feat(conversione): pipeline unificata PDF → Markdown, sostituisce step-0..4

Consolida in un unico modulo (conversione/pipeline.py) l'intera catena di
conversione che prima era distribuita tra step-0 (validazione PDF), step-1
(ispezione qualità), step-2 (conversione pymupdf4llm), step-3 (rilevamento
struttura) e step-4 (revisione euristica).

Miglioramenti principali rispetto agli step separati:
- libreria di conversione XY-Cut++ (ordine di lettura più preciso)
- 30+ trasformazioni euristiche vs le 7 originali di step-4
- validazione, struttura e profilo chunking prodotti in un solo passaggio
- validate.py con scoring orientato a chunking/vettorizzazione
This commit is contained in:
2026-04-17 16:05:11 +02:00
14 changed files with 2314 additions and 1319 deletions
+199
View File
@@ -0,0 +1,199 @@
---
description: Legge un file Markdown, individua tutti i problemi che compromettono il chunking (artefatti, sillabazione, header malformati, paragrafi spezzati, gerarchia incoerente, sezioni vuote) e applica le correzioni direttamente sul file senza chiedere conferma per i casi chiari.
allowed-tools: Read Bash Grep Edit
argument-hint: <path/to/clean.md oppure stem>
---
Risolvi il percorso del file da preparare:
!`python3 -c "
import sys, json, re
from pathlib import Path
arg = '$ARGUMENTS'.strip()
root = Path('.')
candidates = [
Path(arg),
root / arg,
root / 'conversione' / arg / 'clean.md',
root / 'step-4' / arg / 'clean.md',
]
md_path = None
for p in candidates:
if p.exists() and p.suffix == '.md':
md_path = p
break
if not md_path:
print('ERRORE: file non trovato per:', arg)
sys.exit(1)
print('MD_PATH=' + str(md_path))
# Cerca profilo strutturale (report.json o structure_profile.json)
stem = md_path.parent.name
profile_candidates = [
md_path.parent / 'report.json',
md_path.parent / 'structure_profile.json',
root / 'step-4' / stem / 'structure_profile.json',
root / 'conversione' / stem / 'report.json',
]
for sp in profile_candidates:
if sp.exists():
try:
d = json.load(open(sp))
st = d.get('structure', d)
print(f'STRATEGIA={st.get(\"strategia_chunking\",\"?\")}')
print(f'LINGUA={st.get(\"lingua_rilevata\",\"?\")}')
print(f'H1={st.get(\"n_h1\",0)} H2={st.get(\"n_h2\",0)} H3={st.get(\"n_h3\",0)}')
for a in st.get('avvertenze', []):
print(f'AVVISO: {a}')
except Exception:
pass
break
# Statistiche file
text = md_path.read_text(encoding='utf-8')
lines = text.split('\n')
pua = len(re.findall(r'[\ue000-\uf8ff]', text))
print(f'RIGHE={len(lines)} CHARS={len(text)}')
if pua:
print(f'PUA_RESIDUI={pua}')
" 2>/dev/null`
Se l'output contiene `ERRORE`, comunica il percorso non trovato e fermati.
---
Leggi il file completo identificato da `MD_PATH` nell'output sopra. Poi esegui **tutti** i controlli e applica le correzioni nell'ordine indicato.
I parametri di riferimento per il chunking sono: **MIN_CHARS=200, MAX_CHARS=800**.
---
## Controllo 1 — Sillabazione residua
Cerca blocchi di testo (non header) dove una riga termina con `-` e la successiva inizia con lettera minuscola: è un'interruzione di parola non risolta da PDF.
Esempio da correggere:
```
...il meccanismo di decen-
tralizzazione permette...
```
`...il meccanismo di decentralizzazione permette...`
**Applica** ogni fusione con Edit. Se la parola ricomposta sembra errata, segnala invece di correggere.
---
## Controllo 2 — Artefatti di pagina
Righe standalone che sono esclusivamente:
- Un numero intero isolato (numero di pagina)
- Titolo del libro / nome autore che si ripete identico 3+ volte nel documento
- Intestazioni di capitolo che si ripetono (es. `## 3. Termodinamica` appare sia come header legittimo che come riga di testo duplicata)
**Applica** la rimozione con Edit per le ripetizioni chiaramente decorative. Segnala i casi ambigui.
---
## Controllo 3 — Numeri di pagina in header
Header che terminano con ` | N` o ` N` dove N è un numero isolato (residuo di indice non rimosso):
- `### 16. Link vari | 109``### 16. Link vari`
- `## Capitolo 3 42``## Capitolo 3`
**Applica** con Edit.
---
## Controllo 4 — Header malformati
Per ogni header (`#`, `##`, `###`):
**a) ALL-CAPS non convertito:**
`## TERMODINAMICA DEI PROCESSI``## Termodinamica dei processi`
Usa sentence case (prima lettera maiuscola, resto minuscolo salvo nomi propri evidenti).
**Applica**.
**b) Livello h4/h5/h6:**
`#### Sottosezione``### Sottosezione`
**Applica**.
**c) Testo troppo lungo (> 120 char):**
Probabilmente non è un header ma testo estratto erroneamente. Rimuovi i `#` iniziali lasciando il testo come paragrafo normale.
**Applica** se chiaramente non è un titolo. Segnala se ambiguo.
**d) Header duplicati:**
Se lo stesso header appare due volte, rimuovi la seconda occorrenza (o la prima se è quella fuori contesto).
**Applica**.
---
## Controllo 5 — Paragrafi spezzati
Blocchi di testo (non header, non liste) che terminano senza punteggiatura finale (`.?!»)`).
Se il blocco successivo non inizia con lettera maiuscola e non è un header/lista, i due blocchi sono parte della stessa frase spezzata da un salto pagina PDF.
**Applica** la fusione solo quando sei certo (la congiunzione è evidente: inizia con congiunzione, continua la frase in modo inequivocabile). Segnala i casi dubbi invece di correggere.
---
## Controllo 6 — Sezioni quasi-vuote o vuote
Sezione (header + corpo) con corpo < 100 caratteri:
- Se il contenuto è evidentemente una sottosezione o introduzione di ciò che segue (e non ha senso da solo), rimuovi l'header e unisci il testo alla sezione precedente o successiva.
- Se è un header di capitolo che introduce legittime sottosezioni (`##` seguito da `###`), lascia invariato.
**Applica** le fusioni sicure. Segnala quelle ambigue.
---
## Controllo 7 — Gerarchia heading
Verifica che la gerarchia sia coerente. Problemi da correggere:
- Più di un `# ` (h1) nel documento → il secondo e successivi diventano `## ` salvo che siano chiaramente titoli di parti distinte
- `### ` prima del primo `## ` → abbassa il `###` a `## ` o aggiungi un `## ` genitore appropriato
- `## ` prima del primo `# ` in documenti con h1 → lascia invariato (alcuni documenti non hanno h1)
**Applica** solo le correzioni di livello sicure. Segnala le ristrutturazioni che richiedono giudizio.
---
## Controllo 8 — Sezioni troppo lunghe senza struttura
Sezione (## o ###) con corpo > 3000 caratteri e nessun header figlio al suo interno: il chunker la spezzerà su frasi in modo meccanico, perdendo coerenza semantica.
Se il testo contiene chiari cambio-argomento (paragrafi separati da riga vuota, con transizioni come "Inoltre...", "In secondo luogo...", "Un altro aspetto..."), considera di aggiungere un `### ` per suddividere semanticamente.
**Non aggiungere header inventati.** Segnala le sezioni candidate e proponi i titoli: applica solo su risposta affermativa.
---
## Report finale
Dopo aver applicato tutte le correzioni automatiche, mostra:
```
File: <path>
Correzioni applicate: N totali
Sillabazione risolta: N
Artefatti pagina rimossi: N
Numeri pagina in header: N
Header normalizzati: N (ALL-CAPS, livello, lunghezza, duplicati)
Paragrafi fusi: N
Sezioni quasi-vuote risolte:N
Gerarchia corretta: N
Problemi aperti (richiedono giudizio manuale):
[riga N] <descrizione precisa>
...
```
Se non ci sono problemi aperti: **"Markdown pronto per il chunking."**
Se ci sono problemi aperti: elencali e chiedi quali applicare.
-115
View File
@@ -1,115 +0,0 @@
---
description: Revisione qualitativa del clean.md dopo il pre-processing automatico (step 4). Trova artefatti residui, paragrafi spezzati e header errati, poi propone le correzioni.
allowed-tools: Read Bash Grep Edit
argument-hint: <stem>
---
Esegui la revisione qualitativa di `step-4/$ARGUMENTS/clean.md`.
**Cosa è già stato fatto automaticamente (revision_log):**
!`grep -A 12 "^## $ARGUMENTS" step-4/revision_log.md 2>/dev/null || echo "(nessun log trovato per questo stem)"`
**Profilo strutturale attuale:**
!`python3 -c "
import json, sys
try:
d = json.load(open('step-4/$ARGUMENTS/structure_profile.json'))
print(f'Livello: {d[\"livello_struttura\"]} Strategia: {d[\"strategia_chunking\"]}')
print(f'h1={d[\"n_h1\"]} h2={d[\"n_h2\"]} h3={d[\"n_h3\"]} paragrafi={d[\"n_paragrafi\"]}')
print(f'Lunghezza media sezione: {d[\"lunghezza_media_sezione\"]} char')
for a in d.get('avvertenze', []): print(f' ⚠️ {a}')
except Exception as e: print(f'ERRORE: {e}')
" 2>/dev/null`
---
Analizza `step-4/$ARGUMENTS/clean.md` eseguendo i grep seguenti e ragionando sui risultati. Per ogni check: esegui il grep, conta i risultati, riporta i casi concreti (max 5 esempi con numero di riga).
## Check 1 — Sillabazione residua
Righe che terminano con trattino seguito da testo nella riga successiva (artefatto PDF non risolto):
```bash
grep -n "\-$" step-4/$ARGUMENTS/clean.md | head -20
```
Segnala se presenti: numero di riga, testo della riga e della riga successiva.
## Check 2 — Righe orfane (artefatti PDF)
Righe standalone (non header `#`, non vuote) di meno di 60 caratteri che sembrano artefatti:
```bash
grep -n "^[^#\-\*\|].\{1,59\}$" step-4/$ARGUMENTS/clean.md | grep -v "^\s*$" | head -30
```
Valuta ogni riga: è testo normale breve (legittimo) o artefatto (numero di pagina, nome autore isolato, riga di intestazione ripetuta)?
## Check 3 — Paragrafi con frase spezzata
Blocchi di testo che terminano senza punteggiatura di fine frase (`.?!»)`):
```bash
grep -n "[^.!?»)\]\'\"]$" step-4/$ARGUMENTS/clean.md | grep -v "^[0-9]*:#" | grep -v "^[0-9]*:\s*$" | grep -v "^\s*[-\*]" | head -20
```
Riporta i casi più sospetti (righe brevi che finiscono a metà concetto).
## Check 4 — Header sospetti
```bash
grep -n "^##\? " step-4/$ARGUMENTS/clean.md | head -40
```
Verifica:
- `##` o `###` con contenuto interamente MAIUSCOLO non convertito → segnala
- Header duplicati (stesso testo che appare due volte) → segnala
- `##` con testo > 80 caratteri (probabile testo che non è un header) → segnala
- Salti di livello anomali (es. `###` senza un `##` padre) → segnala
## Check 5 — Sezioni quasi vuote
```bash
python3 -c "
import re, sys
text = open('step-4/$ARGUMENTS/clean.md').read()
sections = re.split(r'^(#{1,3} .+)$', text, flags=re.MULTILINE)
for i in range(1, len(sections)-1, 2):
header = sections[i].strip()
body = sections[i+1].strip() if i+1 < len(sections) else ''
if len(body) < 80 and body:
print(f'{header!r} → {len(body)} char: {body[:60]!r}')
elif not body:
print(f'{header!r} → VUOTA')
" 2>/dev/null | head -20
```
Sezioni con body < 80 char o vuote compromettono il chunking. Segnala quelle che non hanno senso come sezione autonoma.
## Check 6 — Gerarchia strutturale
```bash
grep -n "^#\{1,3\} " step-4/$ARGUMENTS/clean.md | head -50
```
Verifica che la gerarchia sia coerente: `# → ## → ###`. Segnala se ci sono `###` prima del primo `##`, o `##` prima del primo `#`, o `#` multipli (più di un h1).
---
## Report finale
```
🔴 BLOCCANTI (compromettono il chunking o il retrieval)
[riga N] descrizione precisa del problema
...
🟡 MINORI (artefatti visibili, non bloccanti)
[riga N] descrizione
...
🟢 OK — nessun problema rilevato in questa categoria
```
Poi chiedi: **"Applico le correzioni per i 🔴? E per i 🟡?"**
Applica solo ciò che viene esplicitamente approvato. Usa Edit per ogni modifica — mai riscrivere l'intero file.
+3
View File
@@ -46,3 +46,6 @@ step-5/*/
# Output step-6 — report generati da verify_chunks.py
step-6/*/
# Output conversione/ — generati da conversione/pipeline.py
conversione/*/
+33 -40
View File
@@ -4,83 +4,76 @@
- **Lingua:** Rispondi sempre in italiano.
- **Venv obbligatorio:** Usa `.venv/bin/python` o attiva con `source .venv/bin/activate`. Mai `pip`/`python` di sistema.
- **Non modificare `raw.md`:** `step-2/<stem>/raw.md` è immutabile. La copia di lavoro è `step-4/<stem>/clean.md`.
- **Non modificare `raw.md`:** Il file `raw.md` di ogni stem è immutabile. La copia di lavoro è sempre `clean.md`.
---
## Pipeline (ordine obbligatorio)
## Pipeline (operazioni in ordine)
```
PDF (sources/) → step-0 → step-1 → step-2 → step-3
→ step-4 (CRITICO: revisione manuale clean.md)
→ step-5 → step-6 → step-7 (Ollama) → step-8 → step-9
PDF (sources/)
→ conversione (PDF → clean.md + structure_profile.json)
→ chunking (clean.md → chunks.json)
→ verifica (chunks.json → report + fix automatici)
→ vettorizzazione (chunks.json → ChromaDB)
→ retrieval (query → risposta via Ollama)
```
Il parametro `--stem` identifica il documento (nome PDF senza `.pdf`). Lo stem è anche il nome della collection ChromaDB.
Comandi tipici:
```bash
source .venv/bin/activate
python step-4/revise.py --stem <stem>
python step-5/chunker.py --stem <stem>
python step-6/verify_chunks.py --stem <stem>
python step-8/ingest.py --stem <stem>
python step-9/rag.py --stem <stem>
```
---
## File critici
| File | Ruolo |
|---|---|
| `step-9/config.py` | Fonte di verità: `EMBED_MODEL`, `OLLAMA_MODEL`, `TOP_K`, `TEMPERATURE`, `SYSTEM_PROMPT` |
| `step-5/chunker.py` | Chunking adattivo — `MIN_CHARS=200`, `MAX_CHARS=800`, `OVERLAP_S=2` |
| `step-6/verify_chunks.py` | Verifica chunk — stesse soglie di `chunker.py` |
| `step-6/fix_chunks.py` | Fix automatici su chunk anomali |
| `step-4/revise.py` | Pre-processing MD automatico (8 trasformazioni euristiche) |
| `step-8/ingest.py` | Vettorizzazione ChromaDB — legge `EMBED_MODEL` da `config.py` |
| `step-9/rag.py` | Pipeline RAG interattiva |
| `config.py` | Fonte di verità: `EMBED_MODEL`, `OLLAMA_MODEL`, `TOP_K`, `TEMPERATURE`, `SYSTEM_PROMPT` |
| `chunker.py` | Chunking adattivo — `MIN_CHARS=200`, `MAX_CHARS=800`, `OVERLAP_S=2` |
| `verify_chunks.py` | Verifica chunk — stesse soglie di `chunker.py` |
| `fix_chunks.py` | Fix automatici su chunk anomali |
| `ingest.py` | Vettorizzazione ChromaDB — legge `EMBED_MODEL` da `config.py` |
| `rag.py` | Pipeline RAG interattiva |
| `conversione/pipeline.py` | Conversione PDF → clean Markdown strutturato |
---
## Regole di assistenza
**Modifica `EMBED_MODEL` in `step-9/config.py`:**
**Modifica `EMBED_MODEL` in `config.py`:**
Avvisa sempre che serve rieseguire la vettorizzazione:
```bash
python step-8/ingest.py --stem <stem> --force
python ingest.py --stem <stem> --force
```
`ingest.py` importa `EMBED_MODEL` direttamente da `config.py` — la coerenza è critica: se violata non produce errori ma restituisce risultati insensati.
**Modifica soglie chunking (`MIN_CHARS`, `MAX_CHARS`, `OVERLAP_S`):**
I valori compaiono in tre file che vanno sincronizzati manualmente:
1. `step-5/chunker.py`
2. `step-6/verify_chunks.py`
3. `step-6/fix_chunks.py`
I valori compaiono in più file che vanno sincronizzati manualmente:
- `chunker.py`
- `verify_chunks.py`
- `fix_chunks.py`
**Step 4 — revisione clean.md:**
`revise.py` applica trasformazioni automatiche, ma il risultato va sempre revisionato a mano. La qualità del RAG dipende da `clean.md` più di qualsiasi parametro tecnico. Suggerisci sempre `/step4-review <stem>` dopo `revise.py`.
**Conversione PDF → Markdown:**
`conversione/pipeline.py` produce `raw.md` e `clean.md`. Il `clean.md` va sempre revisionato dopo la conversione automatica — la qualità del RAG dipende da esso più di qualsiasi parametro tecnico. Suggerisci sempre `/prepare-md conversione/<stem>/clean.md` dopo la conversione.
**Step 6 — verifica chunk:**
Dopo `verify_chunks.py`, usa `/step6-fix <stem>` prima di passare a step-8.
**Verifica chunk:**
Dopo `verify_chunks.py`, usa `/step6-fix <stem>` prima di procedere con la vettorizzazione.
---
## Skills custom
- `/step4-review <stem>` — Revisione qualitativa `clean.md`: artefatti, paragrafi spezzati, header errati.
- `/prepare-md <path>` — Revisione e correzione automatica di qualsiasi `clean.md`: sillabazione, artefatti, header malformati, paragrafi spezzati, gerarchia, sezioni vuote. Accetta path completo (`conversione/bitcoin/clean.md`) o stem (`bitcoin`).
- `/step6-fix <stem>` — Dry-run e applicazione fix chunk tramite `fix_chunks.py`.
---
## Struttura directory per stem
## Output per stem
```
step-2/<stem>/raw.md ← immutabile
step-4/<stem>/clean.md ← copia di lavoro
step-4/<stem>/structure_profile.json
step-5/<stem>/chunks.json
step-6/<stem>/report.json
chroma_db/<stem>/ ← collection ChromaDB
conversione/<stem>/raw.md ← immutabile
conversione/<stem>/clean.md ← copia di lavoro
conversione/<stem>/structure_profile.json
<stem>/chunks.json
<stem>/report.json
chroma_db/<stem>/ ← collection ChromaDB
```
+236
View File
@@ -0,0 +1,236 @@
# conversione — PDF → Markdown pulito
Pipeline automatica che trasforma un PDF grezzo in Markdown strutturato e
pronto per la suddivisione in chunk. Gestisce l'intero processo: validazione
del PDF, estrazione del testo, pulizia strutturale e analisi della struttura
del documento.
## Requisiti
### Python
```
pip install opendataloader-pdf pdfplumber
```
### Java 11+
`opendataloader-pdf` richiede Java sul PATH. Se non è installato:
```bash
# Ubuntu / Debian / WSL
sudo apt install default-jdk
# Verifica
java -version
```
Download alternativo: https://adoptium.net/
---
## Utilizzo
Posiziona il PDF in `sources/<nome>.pdf`, poi:
```bash
# Singolo documento
python conversione/pipeline.py --stem <nome>
# Tutti i PDF in sources/
python conversione/pipeline.py
# Forza la riesecuzione (sovrascrive output esistente)
python conversione/pipeline.py --stem <nome> --force
```
Il parametro `--stem` è il nome del file PDF senza estensione.
Esempio: `sources/analisi1.pdf``--stem analisi1`
---
## Output
Per ogni stem vengono prodotti tre file in `conversione/<stem>/`:
| File | Descrizione |
|------|-------------|
| `raw.md` | Markdown grezzo estratto dal PDF — **non modificare** |
| `clean.md` | Markdown pulito e strutturato — input per il chunker |
| `report.json` | Metriche complete di qualità della conversione |
### report.json
Contiene tutto ciò che serve per valutare la conversione: statistiche
trasformazioni, struttura rilevata, distribuzione lunghezze sezioni,
anomalie e problemi residui con esempi.
```json
{
"stem": "dirittoprivato",
"timestamp": "2026-04-16 15:41",
"transforms": {
"n_accenti_corretti": 0,
"n_dotleader_rimossi": 0,
"toc_rimosso": false,
"n_sezioni_numerate": 63,
"riduzione_pct": 1
},
"structure": {
"livello_struttura": 3,
"n_h1": 0, "n_h2": 6, "n_h3": 163,
"lingua_rilevata": "it",
"strategia_chunking": "h3_aware",
"avvertenze": []
},
"distribution": { "min": 12, "p25": 312, "mediana": 681, "p75": 1197, "max": 6120 },
"anomalie": {
"bare_headers": 0,
"short_sections": 1,
"long_sections": 39,
"bare_headers_list": [],
"short_sections_list": [...],
"long_sections_list": [...]
},
"residui": {
"backtick": 0, "dotleader": 0, "url": 0, "immagini": 0,
"backtick_esempi": []
}
}
```
**`strategia_chunking`** indica come suddividere il documento in chunk:
| Valore | Significato |
|--------|-------------|
| `h3_aware` | Documento ricco di sezioni `###` — usa i `###` come boundary |
| `h2_paragraph_split` | Struttura parziale `##` — suddividi per paragrafo dentro ogni `##` |
| `paragraph` | Nessuna gerarchia chiara — suddividi per paragrafo |
| `sliding_window` | Testo piatto — usa finestra scorrevole |
---
## Validazione batch
Dopo aver convertito uno o più documenti, esegui `validate.py` per ottenere
una tabella di stato su tutti gli stem:
```bash
python conversione/validate.py
```
Output di esempio:
```
stem h2 h3 strategia bare corte lunghe backtick dotlead url status
──────────────────────────────────────────────────────────────────────────────────────────────
analisi1 13 279 h3_aware 0 36 151 10 0 0 ⚠️
dirittoprivato 6 163 h3_aware 0 1 39 0 0 0 ✅
nietzsche 4 303 h3_aware 6 104 100 0 0 0 ⚠️
──────────────────────────────────────────────────────────────────────────────────────────────
Totale: 3 ✅ 1 ⚠️ 2 ❌ 0
```
**Legenda colonne:**
| Colonna | Significato | Soglia warning |
|---------|-------------|----------------|
| `bare` | Header solo-numero senza corpo (`### 1.` vuoto) | ≥ 1 |
| `corte` | Sezioni con corpo < 150 chars | informativo |
| `lunghe` | Sezioni con corpo > 1500 chars | ≥ 80 |
| `backtick` | Backtick `` ` `` residui nel testo | ≥ 1 |
| `dotlead` | Dot-leader residui (`. . . .`) | ≥ 1 |
**Stato:**
- ✅ nessuna anomalia critica
- ⚠️ anomalie presenti, documento processabile ma da verificare
- ❌ struttura non rilevata (`livello_struttura = 0`) o > 50 backtick residui
---
## Cosa fa la pipeline
La pipeline esegue quattro fasi in sequenza.
### Fase 1 — Validazione
Verifica che il PDF esista, non sia vuoto, non sia protetto da password e
contenga testo digitale estraibile. I PDF scansionati (immagini) non sono
supportati.
### Fase 2 — Estrazione testo
Usa `opendataloader-pdf` con l'algoritmo **XY-Cut++** per ricostruire il
corretto ordine di lettura anche in documenti multi-colonna. Le immagini
vengono ignorate completamente — il `clean.md` non contiene mai riferimenti
a immagini.
### Fase 3 — Pulizia strutturale
Serie di trasformazioni applicate al Markdown grezzo:
| Trasformazione | Problema risolto |
|----------------|-----------------|
| Rimozione riferimenti immagini | Artefatti `![...]()` lasciati dal convertitore |
| Fix accenti backtick LaTeX | `` `e``→`è`, ``puo` ``→`può`, ``sar`a``→`sarà` |
| Rimozione dot-leader TOC | `- 1.1 Titolo . . . . . 42` (voci indice) |
| Rimozione numerali romani pagina | `i`, `ii`, `iii` su riga isolata (footer LaTeX) |
| Fix header + body concatenati | `### 11 TitoloCorpo testo...` → header + paragrafo separati |
| Estrazione header Capitolo inline | `Capitolo 3: IL TITOLO` nel corpo → `## Capitolo 3: ...` |
| Normalizzazione livelli header | `####`, `#####` → `###` (gerarchia uniforme a 3 livelli) |
| Rimozione bold negli header | `## **Titolo**` → `## Titolo` |
| Normalizzazione ALL-CAPS header | `## IL TITOLO` → `## Il titolo` |
| Rimozione TOC | Blocchi indice/sommario rilevati per keyword |
| ALL-CAPS standalone → header | Righe in maiuscolo isolate → `## Titolo` |
| Sezioni numerate → header | `N. Titolo sezione` → `### N.` + corpo |
| Sezioni con punto → header | `- N. Testo aphorismo...` → `### N.` + corpo |
| Sezioni lista numerate → header | `- N Titolo Corpo testo...` → `### N. Titolo` + corpo |
| Unione paragrafi spezzati | Paragrafi tagliati dal salto pagina PDF ricongiunti |
| Normalizzazione whitespace | Spazi multipli ridotti a singoli |
| Riduzione righe vuote | Tre o più righe vuote consecutive → due |
| Rimozione URL watermark | `www.piattaforma.com`, `https://...` su riga isolata |
| Rimozione header senza corpo | Sezioni vuote e header watermark scartati |
> **Rilevamento automatico tipo documento**: se il documento contiene sezioni
> "Esercizi" (libri di testo accademici), la conversione dei numeri di esercizio
> in header viene disabilitata automaticamente.
### Fase 4 — Analisi struttura
Rileva la gerarchia del documento (conteggio `#`/`##`/`###`), la lingua
(italiano / inglese / sconosciuta), la lunghezza media delle sezioni e
suggerisce la strategia di chunking ottimale. I risultati sono scritti in
`structure_profile.json`.
---
## Tipi di documento supportati
| Tipo | Esempi | Note |
|------|--------|------|
| Testo giuridico / accademico | Manuali, dispense, codici | Header numerati `N.` e `N.N` |
| Filosofia / saggistica | Aforismi numerati, capitoli | Pattern `- N. testo` |
| Matematica / LaTeX | Analisi, algebra, fisica | Fix accenti, TOC, numerali romani |
| Testo generico strutturato | Qualsiasi PDF digitale | Paragrafi e header standard |
**Non supportati**: PDF scansionati (solo immagini), PDF protetti da password.
---
## Log di esecuzione
Durante l'esecuzione la pipeline stampa le statistiche di ogni trasformazione:
```
[3/4] Pulizia strutturale...
✅ Immagini rimosse: 0
Accenti corretti: 3701
Dot-leader rimossi: 53
Header concat fixati: 0
TOC rimosso: sì
ALL-CAPS → ##: 14
Sezioni → ###: 279
Paragrafi uniti: 12998
Riduzione testo: 3%
```
Se un documento è già stato convertito, la pipeline lo salta automaticamente.
Usa `--force` per rieseguire.
+29
View File
@@ -0,0 +1,29 @@
#!/usr/bin/env bash
set -euo pipefail
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
cd "$SCRIPT_DIR"
mapfile -t dirs < <(find . -maxdepth 1 -mindepth 1 -type d | sort)
if [[ ${#dirs[@]} -eq 0 ]]; then
echo "Nessuna cartella da cancellare."
exit 0
fi
echo "Cartelle che verranno cancellate:"
for d in "${dirs[@]}"; do
echo " $d"
done
if [[ "${1:-}" != "-f" ]]; then
read -r -p "Confermi? [s/N] " answer
[[ "$answer" =~ ^[sS]$ ]] || { echo "Annullato."; exit 0; }
fi
for d in "${dirs[@]}"; do
rm -rf "$d"
echo "Rimossa: $d"
done
echo "Pulizia completata."
File diff suppressed because it is too large Load Diff
+210
View File
@@ -0,0 +1,210 @@
#!/usr/bin/env python3
"""
conversione/validate.py Validazione qualità Markdown
Legge i report.json prodotti da pipeline.py, stampa una tabella di stato
e assegna un voto (0-100) a ogni documento.
90-100 A ottimo, pronto per il chunker
75-89 B buono, qualche sezione lunga ma accettabile
60-74 C accettabile, anomalie minori da verificare
40-59 D da rivedere, problemi strutturali o residui evidenti
0-39 F da riprocessare, struttura assente o testo corrotto
Uso:
python conversione/validate.py # tutti gli stem
python conversione/validate.py analisi1 # stem specifico
python conversione/validate.py a b c # stem multipli
python conversione/validate.py --detail analisi1 # mostra dettaglio penalità
"""
import argparse
import json
import sys
from pathlib import Path
# ─── Punteggio ───────────────────────────────────────────────────────────────
_GRADES = [(90, "A"), (75, "B"), (60, "C"), (40, "D"), (0, "F")]
def _score(r: dict) -> tuple[int, list[str]]:
"""
Calcola un punteggio 0-100 sulla qualità del clean.md ai fini della
suddivisione in chunk e vettorizzazione.
Restituisce (score, lista_penalità_applicate).
Penalità struttura (il chunker non può operare senza header):
struttura assente (livello 0) 40
struttura piatta (livello 1) 15
Penalità residui (finiscono nei vettori e degradano il retrieval):
backtick 2/cad (max 20)
dot-leader 5/cad (max 10)
URL / watermark 5/cad (max 15)
immagini residue 5/cad (max 10)
<br> inline (artefatti tabelle) 2/cad (max 15)
simboli encoding (!/" residui) → 1/cad (max 10)
formule inline [N.M] 1/cad (max 8)
Penalità anomalie:
bare headers 3/cad (max 15)
Non penalizzate (il chunker le normalizza):
sezioni corte, sezioni lunghe, mediana, p25
"""
score = 100
detail = []
structure = r.get("structure", {})
anomalie = r.get("anomalie", {})
residui = r.get("residui", {})
livello = structure.get("livello_struttura", 0)
# ── Struttura ─────────────────────────────────────────────────────────
if livello == 0:
score -= 40
detail.append("struttura assente 40")
elif livello == 1:
score -= 15
detail.append("struttura piatta 15")
# ── Residui ───────────────────────────────────────────────────────────
def _pen(key: str, per_item: int, cap: int, label: str) -> None:
n = residui.get(key, 0)
if n:
p = min(cap, n * per_item)
nonlocal score
score -= p
detail.append(f"{label} ×{n} {p}")
_pen("backtick", 2, 20, "backtick")
_pen("dotleader", 5, 10, "dot-leader")
_pen("url", 5, 15, "url")
_pen("immagini", 5, 10, "immagini")
_pen("br_inline", 2, 15, "<br> inline")
_pen("simboli_encoding", 1, 10, "simboli encoding")
_pen("formule_inline", 1, 8, "formule inline")
_pen("footnote_markers", 1, 8, "footnote residui")
_pen("pua_markers", 2, 20, "caratteri PUA font Symbol")
# ── Anomalie ──────────────────────────────────────────────────────────
n_bare = anomalie.get("bare_headers", 0)
if n_bare:
p = min(15, n_bare * 3)
score -= p
detail.append(f"bare headers ×{n_bare} {p}")
return max(0, score), detail
def _grade(score: int) -> str:
return next(g for threshold, g in _GRADES if score >= threshold)
# ─── Validazione ─────────────────────────────────────────────────────────────
def validate(stems: list[str], project_root: Path, detail: bool = False) -> None:
conv_dir = project_root / "conversione"
paths = (
[conv_dir / s / "report.json" for s in stems]
if stems
else sorted(conv_dir.glob("*/report.json"))
)
if not paths:
print("Nessun report.json trovato in conversione/*/")
sys.exit(0)
rows = [
json.loads(p.read_text(encoding="utf-8")) if p.exists()
else {"stem": p.parent.name, "_missing": True}
for p in paths
]
# ── Intestazione ─────────────────────────────────────────────────────
col = max(len(r.get("stem", "stem")) for r in rows) + 2
header = (
f"{'stem':<{col}}"
f"{'h2':>4}{'h3':>5} "
f"{'strategia':<18}"
f"{'bare':>5}{'corte':>6}{'lunghe':>7}"
f"{'btk':>5}{'br':>4}{'enc':>4}{'url':>4}"
f"{'med':>6}"
f" {'voto':>4} grade"
)
sep = "" * len(header)
print(f"\n{header}\n{sep}")
scores = []
# ── Righe ─────────────────────────────────────────────────────────────
for r in rows:
if r.get("_missing"):
print(f"{r['stem']:<{col}} (report.json non trovato)")
continue
st = r.get("structure", {})
an = r.get("anomalie", {})
res = r.get("residui", {})
dist = r.get("distribution", {})
s, pen = _score(r)
scores.append(s)
print(
f"{r['stem']:<{col}}"
f"{st.get('n_h2', 0):>4}"
f"{st.get('n_h3', 0):>5} "
f"{st.get('strategia_chunking','?'):<18}"
f"{an.get('bare_headers', 0):>5}"
f"{an.get('short_sections', 0):>6}"
f"{an.get('long_sections', 0):>7}"
f"{res.get('backtick', 0):>5}"
f"{res.get('br_inline', 0):>4}"
f"{res.get('simboli_encoding', 0):>4}"
f"{res.get('url', 0):>4}"
f"{dist.get('mediana', 0):>6}"
f" {s:>4} {_grade(s)}"
)
if detail and pen:
for p in pen:
print(f" {'':>{col}}{p}")
# ── Riepilogo ─────────────────────────────────────────────────────────
print(sep)
if scores:
media = sum(scores) / len(scores)
print(
f"Documenti: {len(scores)} "
f"Media: {media:.0f}/100 {_grade(int(media))} "
f"(A≥90 B≥75 C≥60 D≥40 F<40)"
)
print(
"\nColonne: bare=header vuoti corte=sez<150ch lunghe=sez>1500ch "
"btk=backtick br=<br>inline enc=simboli encoding med=mediana chars\n"
)
# ─── Entry point ─────────────────────────────────────────────────────────────
if __name__ == "__main__":
parser = argparse.ArgumentParser(
description="Valida i report Markdown prodotti da pipeline.py",
epilog="Senza argomenti valida tutti gli stem in conversione/*/",
)
parser.add_argument(
"stems",
nargs="*",
metavar="STEM",
help="stem da validare (es: analisi1). Ometti per tutti.",
)
parser.add_argument(
"--detail", "-d",
action="store_true",
help="mostra dettaglio penalità per ogni documento",
)
args = parser.parse_args()
validate(args.stems, Path(__file__).parent.parent, detail=args.detail)
+4
View File
@@ -4,5 +4,9 @@ pdfplumber==0.11.9
# Step 2 — Conversione PDF → Markdown
pymupdf4llm
# conversione/ — Pipeline automatica PDF → clean Markdown (alternativa a step 0+1+2+3+4)
# Richiede anche Java 11+ sul PATH: https://adoptium.net/
opendataloader-pdf
# Step 8 — Vettorizzazione
chromadb
-229
View File
@@ -1,229 +0,0 @@
#!/usr/bin/env python3
"""
Step 0 Verifica idoneità PDF
Legge tutti i PDF in sources/ e salva un report per ognuno in step-0/.
Uso:
python step-0/check_pdf.py
Output:
step-0/<nome_pdf>_step0_report.txt
"""
import sys
import statistics
from datetime import datetime
from pathlib import Path
def check_pdf(pdf_path: str, save: bool = True) -> None:
try:
import pdfplumber
except ImportError:
print("Errore: pdfplumber non è installato.")
print(" pip install pdfplumber")
sys.exit(1)
path = Path(pdf_path)
if not path.exists():
print(f"Errore: file non trovato — {pdf_path}")
sys.exit(1)
if path.suffix.lower() != ".pdf":
print(f"Errore: il file non è un PDF — {pdf_path}")
sys.exit(1)
lines = [] # righe del report
results = [] # (etichetta, stato, messaggio)
def out(text=""):
lines.append(text)
print(text)
out(f"Step 0 — Verifica idoneità PDF")
out(f"File: {path.name}")
out(f"Data: {datetime.now().strftime('%Y-%m-%d %H:%M')}")
out("=" * 50)
# ------------------------------------------------------------------ #
# Criterio 1 — Non protetto da password
# ------------------------------------------------------------------ #
try:
with pdfplumber.open(path) as pdf:
n_pages = len(pdf.pages)
results.append(("Non protetto da password", "PASS", f"{n_pages} pagine"))
except Exception as e:
msg = str(e).lower()
if "password" in msg or "encrypted" in msg or "decrypt" in msg:
results.append(("Non protetto da password", "FAIL",
"Il PDF è cifrato — non può essere elaborato"))
else:
results.append(("Non protetto da password", "FAIL",
f"Impossibile aprire il file: {e}"))
_render_results(results, out)
_maybe_save(lines, path, save)
return
# ------------------------------------------------------------------ #
# Lettura pagine — una sola passata
# ------------------------------------------------------------------ #
char_counts = []
line_lengths = []
all_text = ""
empty_pages = 0
with pdfplumber.open(path) as pdf:
for page in pdf.pages:
text = page.extract_text() or ""
all_text += text + "\n"
chars = len(text.strip())
char_counts.append(chars)
if chars == 0:
empty_pages += 1
for line in text.splitlines():
stripped = line.strip()
if stripped:
line_lengths.append(len(stripped))
total_pages = len(char_counts)
pages_with_text = sum(1 for c in char_counts if c > 50)
text_coverage = pages_with_text / total_pages if total_pages > 0 else 0
# ------------------------------------------------------------------ #
# Criterio 2 — Testo estraibile
# ------------------------------------------------------------------ #
if text_coverage >= 0.7:
results.append(("Testo estraibile", "PASS",
f"{pages_with_text}/{total_pages} pagine con testo ({text_coverage:.0%})"))
elif text_coverage >= 0.4:
results.append(("Testo estraibile", "WARN",
f"Solo {pages_with_text}/{total_pages} pagine con testo — revisione estesa necessaria"))
else:
results.append(("Testo estraibile", "FAIL",
f"Solo {pages_with_text}/{total_pages} pagine con testo — probabilmente scansionato"))
# ------------------------------------------------------------------ #
# Criterio 3 — Generato digitalmente (non scansionato)
# ------------------------------------------------------------------ #
pages_text_only = [c for c in char_counts if c > 0]
avg_chars = statistics.mean(pages_text_only) if pages_text_only else 0
if avg_chars >= 300:
results.append(("Generato digitalmente (non scansionato)", "PASS",
f"Media {avg_chars:.0f} char/pagina"))
elif avg_chars >= 100:
results.append(("Generato digitalmente (non scansionato)", "WARN",
f"Media bassa: {avg_chars:.0f} char/pagina — alcune pagine potrebbero essere immagini"))
else:
results.append(("Generato digitalmente (non scansionato)", "FAIL",
f"Media molto bassa: {avg_chars:.0f} char/pagina — il PDF sembra scansionato"))
# ------------------------------------------------------------------ #
# Criterio 4 — Pagine vuote
# ------------------------------------------------------------------ #
if empty_pages == 0:
results.append(("Pagine vuote", "PASS", "Nessuna pagina vuota"))
elif empty_pages <= total_pages * 0.05:
results.append(("Pagine vuote", "WARN",
f"{empty_pages} pagine vuote (≤ 5%) — probabilmente copertine o separatori"))
else:
results.append(("Pagine vuote", "WARN",
f"{empty_pages} pagine vuote ({empty_pages/total_pages:.0%}) — controllare"))
# ------------------------------------------------------------------ #
# Criterio desiderabile — Layout a colonne singola
# ------------------------------------------------------------------ #
if line_lengths:
median_len = statistics.median(line_lengths)
short_lines = sum(1 for l in line_lengths if l < median_len * 0.4)
short_ratio = short_lines / len(line_lengths)
if short_ratio < 0.15:
results.append(("Layout a colonne singola (desiderabile)", "PASS",
f"Righe corte: {short_ratio:.0%} — struttura lineare"))
elif short_ratio < 0.35:
results.append(("Layout a colonne singola (desiderabile)", "WARN",
f"Righe corte: {short_ratio:.0%} — possibile layout a colonne parziale"))
else:
results.append(("Layout a colonne singola (desiderabile)", "WARN",
f"Righe corte: {short_ratio:.0%} — probabile layout a colonne multiple"))
else:
results.append(("Layout a colonne singola (desiderabile)", "WARN",
"Impossibile analizzare (nessuna riga estratta)"))
# ------------------------------------------------------------------ #
# Criterio desiderabile — Struttura logica (titoli)
# ------------------------------------------------------------------ #
candidate_headings = [
line.strip() for line in all_text.splitlines()
if 3 <= len(line.strip()) <= 80
and line.strip()[0].isupper()
and not line.strip().endswith(".")
and not line.strip().endswith(",")
and len(line.strip().split()) <= 10
]
heading_density = len(candidate_headings) / total_pages if total_pages > 0 else 0
if heading_density >= 1.0:
results.append(("Struttura logica riconoscibile (desiderabile)", "PASS",
f"~{len(candidate_headings)} possibili titoli rilevati ({heading_density:.1f}/pagina)"))
elif heading_density >= 0.3:
results.append(("Struttura logica riconoscibile (desiderabile)", "WARN",
f"~{len(candidate_headings)} possibili titoli ({heading_density:.1f}/pagina) — struttura parziale"))
else:
results.append(("Struttura logica riconoscibile (desiderabile)", "WARN",
"Pochi titoli rilevati — testo narrativo o struttura non standard"))
_render_results(results, out)
_maybe_save(lines, path, save)
def _render_results(results: list, out) -> None:
icons = {"PASS": "", "WARN": "⚠️ ", "FAIL": ""}
out()
for label, status, message in results:
icon = icons.get(status, " ")
out(f" {icon} {label}")
out(f" {message}")
out()
fails = [r for r in results if r[1] == "FAIL"]
warns = [r for r in results if r[1] == "WARN"]
if fails:
out("ESITO: ❌ PDF NON IDONEO")
out(" Criteri obbligatori non soddisfatti — scegli un PDF diverso.")
elif warns:
out("ESITO: ⚠️ PDF ACCETTABILE CON CAUTELA")
out(" Procedi, ma aspettati più lavoro nella revisione manuale (step 4).")
else:
out("ESITO: ✅ PDF IDONEO")
out(" Tutti i criteri soddisfatti — procedi con lo step 1.")
out()
def _maybe_save(lines: list, pdf_path: Path, save: bool) -> None:
if not save:
return
script_dir = Path(__file__).parent
out_file = script_dir / f"{pdf_path.stem}_step0_report.txt"
out_file.write_text("\n".join(lines), encoding="utf-8")
print(f"Report salvato in: {out_file}")
if __name__ == "__main__":
project_root = Path(__file__).parent.parent
sources_dir = project_root / "sources"
if not sources_dir.exists():
print(f"Errore: cartella sources/ non trovata in {project_root}")
sys.exit(1)
pdfs = sorted(sources_dir.glob("*.pdf"))
if not pdfs:
print(f"Errore: nessun PDF trovato in {sources_dir}")
sys.exit(1)
for pdf in pdfs:
check_pdf(str(pdf), save=True)
if len(pdfs) > 1:
print("-" * 50)
-199
View File
@@ -1,199 +0,0 @@
#!/usr/bin/env python3
"""
Step 1 Ispezione automatica PDF
Analizza il PDF pagina per pagina e produce un report con score (0100)
e lista dei problemi per pagina. Serve per capire la qualità del documento
e mappare i problemi prima della revisione manuale (step 4).
Uso:
python step1/inspect.py
Output:
step1/<nome_pdf>_step1_report.txt
"""
import re
import sys
import statistics
from collections import Counter
from datetime import datetime
from pathlib import Path
# ── Penalità per il calcolo dello score ───────────────────────────────────
SYLLABIF_PENALTY = 0.3 # per occorrenza di sillabazione
COLUMN_PENALTY = 3.0 # per pagina con layout a colonne
UNICODE_PENALTY = 1.5 # per pagina con caratteri anomali
EMPTY_PENALTY = 1.0 # per pagina vuota
HEADER_FOOTER_PEN = 5.0 # fisso se intestazioni/piè ripetitivi rilevati
def inspect_pdf(pdf_path: str, save: bool = True) -> None:
try:
import pdfplumber
except ImportError:
print("Errore: pdfplumber non è installato.")
print(" pip install pdfplumber")
sys.exit(1)
path = Path(pdf_path)
if not path.exists():
print(f"Errore: file non trovato — {pdf_path}")
sys.exit(1)
lines = []
def out(text=""):
lines.append(text)
print(text)
out("Step 1 — Ispezione automatica PDF")
out(f"File: {path.name}")
out(f"Data: {datetime.now().strftime('%Y-%m-%d %H:%M')}")
out("=" * 50)
# ── Lettura pagine ─────────────────────────────────────────────────────
with pdfplumber.open(path) as pdf:
n_pages = len(pdf.pages)
pages_text = [page.extract_text() or "" for page in pdf.pages]
# ── Analisi per pagina ─────────────────────────────────────────────────
issues = [] # (page_num, descrizione) — page_num=0 → problema globale
deductions = 0.0
first_lines = [] # prima riga significativa di ogni pagina (per header)
last_lines = [] # ultima riga significativa di ogni pagina (per footer)
for i, text in enumerate(pages_text):
page_num = i + 1
stripped = text.strip()
# 1. Pagina vuota
if len(stripped) < 50:
issues.append((page_num, "pagina vuota"))
deductions += EMPTY_PENALTY
continue
page_lines = text.splitlines()
nonempty = [l.strip() for l in page_lines if l.strip()]
# Raccogli prima/ultima riga per il controllo header/footer
if nonempty:
first_lines.append(nonempty[0])
last_lines.append(nonempty[-1])
# 2. Sillabazione a fine riga (es. "estra-" + a capo)
syllabif = sum(
1 for line in page_lines
if re.search(r'\b\w{2,}-$', line.rstrip())
)
if syllabif:
label = "occorrenza" if syllabif == 1 else "occorrenze"
issues.append((page_num, f"sillabazione rilevata ({syllabif} {label})"))
deductions += syllabif * SYLLABIF_PENALTY
# 3. Layout a colonne (righe molto corte e numerose)
if len(nonempty) >= 10:
median_len = statistics.median(len(l) for l in nonempty)
short_ratio = sum(1 for l in nonempty if len(l) < median_len * 0.4) / len(nonempty)
if short_ratio > 0.35:
issues.append((page_num, f"possibile layout a colonne ({short_ratio:.0%} righe corte)"))
deductions += COLUMN_PENALTY
# 4. Caratteri Unicode anomali
# (control chars esclusi \n \t \r, replacement char, PUA block)
anomalies = re.findall(
r'[\x00-\x08\x0b\x0c\x0e-\x1f\x7f-\x9f\ufffd\ue000-\uf8ff]', text
)
if anomalies:
issues.append((page_num, f"caratteri Unicode anomali ({len(anomalies)} trovati)"))
deductions += UNICODE_PENALTY
# ── Intestazioni e piè di pagina ripetitivi ────────────────────────────
def _check_repetition(line_list: list, label: str) -> None:
nonlocal deductions
if not line_list:
return
threshold = max(3, len(line_list) * 0.25)
repeated = [
(txt, cnt) for txt, cnt in Counter(line_list).items()
if cnt >= threshold and len(txt) > 3
]
if repeated:
deductions += HEADER_FOOTER_PEN
for txt, cnt in repeated[:3]:
issues.append((0, f"{label} ripetitivo: \"{txt[:45]}\" ({cnt} volte)"))
_check_repetition(first_lines, "intestazione")
_check_repetition(last_lines, "piè di pagina")
# ── Score ──────────────────────────────────────────────────────────────
score = max(0, round(100 - deductions))
# ── Riepilogo ──────────────────────────────────────────────────────────
pages_with_issues = len({p for p, _ in issues if p > 0})
out()
out(f"Score: {score}/100")
out(f"Pagine totali: {n_pages}")
out(f"Pagine con problemi: {pages_with_issues}")
out()
if issues:
global_issues = [(p, d) for p, d in issues if p == 0]
page_issues = sorted([(p, d) for p, d in issues if p > 0])
for _, desc in global_issues:
out(f" ⚠️ {desc}")
for page_num, desc in page_issues:
out(f" Pagina {page_num:>4}: {desc}")
else:
out(" Nessun problema rilevato.")
out()
# ── Prossimi passi ─────────────────────────────────────────────────────
out("PROSSIMI PASSI:")
if score >= 70:
out(" → conversione con marker funzionerà bene")
elif score >= 40:
out(" → conversione possibile, attendi più errori nella revisione")
else:
out(" → qualità bassa — valuta una fonte PDF migliore")
attention_pages = sorted({p for p, _ in issues if p > 0})
if attention_pages:
sample = ", ".join(str(p) for p in attention_pages[:10])
if len(attention_pages) > 10:
sample += f" … e altre {len(attention_pages) - 10}"
out(f" → attenzione alle pagine {sample} nella revisione manuale")
out()
_maybe_save(lines, path, save)
def _maybe_save(lines: list, pdf_path: Path, save: bool) -> None:
if not save:
return
script_dir = Path(__file__).parent
out_file = script_dir / f"{pdf_path.stem}_step1_report.txt"
out_file.write_text("\n".join(lines), encoding="utf-8")
print(f"Report salvato in: {out_file}")
if __name__ == "__main__":
project_root = Path(__file__).parent.parent
sources_dir = project_root / "sources"
if not sources_dir.exists():
print(f"Errore: cartella sources/ non trovata in {project_root}")
sys.exit(1)
pdfs = sorted(sources_dir.glob("*.pdf"))
if not pdfs:
print(f"Errore: nessun PDF trovato in {sources_dir}")
sys.exit(1)
for pdf in pdfs:
inspect_pdf(str(pdf), save=True)
if len(pdfs) > 1:
print("-" * 50)
-80
View File
@@ -1,80 +0,0 @@
#!/usr/bin/env python3
"""
Step 2 Conversione PDF Markdown grezzo
Usa pymupdf4llm (PyMuPDF puro C, zero modelli ML, ~30-50 MB RAM)
per convertire ogni PDF in sources/ e organizza l'output in:
step-2/<stem>/raw.md MD grezzo, non modificare mai
step-2/<stem>/clean.md copia di lavoro per lo step 4
Uso:
python step-2/convert_pdf.py # tutti i PDF in sources/
python step-2/convert_pdf.py --pdf sources/doc.pdf # un solo PDF
"""
import argparse
import shutil
import sys
from pathlib import Path
import pymupdf4llm
def convert_pdf(pdf_path: Path, project_root: Path) -> bool:
stem = pdf_path.stem
out_dir = project_root / "step-2" / stem
raw_md = out_dir / "raw.md"
clean_md = out_dir / "clean.md"
print(f"\nConversione: {pdf_path.name}")
print(f" Output: step-2/{stem}/")
if raw_md.exists():
print(f" ⚠️ raw.md già presente — skip")
print(f" (elimina {raw_md} per riconvertire)")
return True
out_dir.mkdir(parents=True, exist_ok=True)
print(f" Conversione in corso...")
md_text = pymupdf4llm.to_markdown(str(pdf_path))
raw_md.write_text(md_text, encoding="utf-8")
shutil.copy2(raw_md, clean_md)
size_kb = raw_md.stat().st_size // 1024
print(f" ✅ raw.md salvato ({size_kb} KB)")
print(f" ✅ clean.md creato (copia di lavoro per step 4)")
return True
if __name__ == "__main__":
project_root = Path(__file__).parent.parent
parser = argparse.ArgumentParser(description="Step 2 — Conversione PDF → Markdown")
parser.add_argument("--pdf", help="Percorso di un singolo PDF da convertire")
args = parser.parse_args()
if args.pdf:
pdf_path = Path(args.pdf)
if not pdf_path.exists():
print(f"Errore: file non trovato — {args.pdf}")
sys.exit(1)
pdfs = [pdf_path]
else:
sources_dir = project_root / "sources"
if not sources_dir.exists():
print(f"Errore: cartella sources/ non trovata in {project_root}")
sys.exit(1)
pdfs = sorted(sources_dir.glob("*.pdf"))
if not pdfs:
print(f"Errore: nessun PDF trovato in {sources_dir}")
sys.exit(1)
results = [convert_pdf(p, project_root) for p in pdfs]
ok_count = sum(results)
total = len(results)
print(f"\n{'' if all(results) else '⚠️ '} {ok_count}/{total} PDF convertiti")
sys.exit(0 if all(results) else 1)
-223
View File
@@ -1,223 +0,0 @@
#!/usr/bin/env python3
"""
Step 3 Rilevamento struttura Markdown
Analizza il Markdown grezzo prodotto dallo step 2 senza modificarlo.
Copia i file da step-2/<stem>/ e produce structure_profile.json che
guida la revisione manuale (step 4) e il chunker adattivo (step 5).
Output in step-3/<stem>/:
raw.md copia da step-2 (non modificare mai)
clean.md copia da step-2 (da revisionare nello step 4)
structure_profile.json profilo strutturale
Uso:
python step-3/detect_structure.py # tutti i documenti in step-2/
python step-3/detect_structure.py --stem nietzsche # un solo documento
python step-3/detect_structure.py --force # riesegui anche se già presente
"""
import argparse
import json
import re
import shutil
import sys
from pathlib import Path
# ─── Language detection ───────────────────────────────────────────────────────
_IT_WORDS = frozenset([
"il", "la", "di", "e", "che", "non", "per", "un", "una", "si",
"con", "da", "del", "della", "dei", "in", "ma", "se", "lo", "le",
"gli", "al", "alla", "ai", "alle", "sono", "ha", "hanno", "era",
"erano", "nel", "nella", "nei", "nelle", "questo", "questa", "così",
])
_EN_WORDS = frozenset([
"the", "of", "and", "to", "in", "is", "that", "it", "was", "for",
"on", "are", "as", "with", "his", "they", "at", "be", "this", "have",
"from", "or", "an", "but", "not", "by", "he", "she", "we", "you",
"which", "their", "been", "has", "would", "there", "when", "will",
])
def detect_language(text: str) -> str:
words = re.findall(r'\b[a-zA-Z]{2,}\b', text.lower())
sample = words[:2000]
it = sum(1 for w in sample if w in _IT_WORDS)
en = sum(1 for w in sample if w in _EN_WORDS)
if it == 0 and en == 0:
return "unknown"
return "it" if it >= en else "en"
# ─── Markdown parsing ─────────────────────────────────────────────────────────
def split_sections(text: str, header_level: int) -> list[str]:
"""
Split text on headers of the given level (1=h1, 2=h2, 3=h3).
Returns list of body texts for each matching section.
"""
prefix = "#" * header_level + " "
parts = re.split(rf'(?m)^{re.escape(prefix)}.+', text)
# parts[0] is preamble, rest are section bodies
return [p for p in parts[1:] if p.strip()]
def count_headers(text: str, level: int) -> int:
prefix = "#" * level + " "
return len(re.findall(rf'(?m)^{re.escape(prefix)}', text))
def count_paragraphs(text: str) -> int:
"""Count non-empty, non-header paragraph blocks."""
blocks = re.split(r'\n{2,}', text)
return sum(1 for b in blocks if b.strip() and not re.match(r'^#+\s', b.strip()))
# ─── Core analysis ────────────────────────────────────────────────────────────
def analyze(raw_md_path: Path) -> dict:
text = raw_md_path.read_text(encoding="utf-8")
n_h1 = count_headers(text, 1)
n_h2 = count_headers(text, 2)
n_h3 = count_headers(text, 3)
n_paragrafi = count_paragraphs(text)
# Determine structural level and primary boundary
if n_h3 >= 5:
livello = 3
boundary = "h3"
strategia = "h3_aware"
section_bodies = split_sections(text, 3)
elif n_h2 >= 3:
livello = 2
boundary = "h2"
strategia = "h2_paragraph_split"
section_bodies = split_sections(text, 2)
elif n_h1 + n_h2 + n_h3 >= 1:
livello = 1
boundary = "paragrafo"
strategia = "paragraph"
section_bodies = [b for b in re.split(r'\n{2,}', text) if b.strip()]
else:
if n_paragrafi >= 3:
livello = 1
boundary = "paragrafo"
strategia = "paragraph"
section_bodies = [b for b in re.split(r'\n{2,}', text) if b.strip()]
else:
livello = 0
boundary = "nessuno"
strategia = "sliding_window"
section_bodies = [text] if text.strip() else []
lengths = [len(b) for b in section_bodies if b.strip()]
lunghezza_media = int(sum(lengths) / len(lengths)) if lengths else 0
lingua = detect_language(text)
avvertenze = []
short = sum(1 for l in lengths if l < 200)
long_ = sum(1 for l in lengths if l > 800)
if short:
avvertenze.append(f"{short} sezioni sotto i 200 caratteri — verranno accorpate")
if long_:
avvertenze.append(f"{long_} sezioni sopra i 800 caratteri — verranno divise")
return {
"livello_struttura": livello,
"n_h1": n_h1,
"n_h2": n_h2,
"n_h3": n_h3,
"n_paragrafi": n_paragrafi,
"boundary_primario": boundary,
"lingua_rilevata": lingua,
"lunghezza_media_sezione": lunghezza_media,
"strategia_chunking": strategia,
"avvertenze": avvertenze,
}
# ─── Per-document processing ─────────────────────────────────────────────────
def process_stem(stem: str, project_root: Path, force: bool) -> bool:
src_dir = project_root / "step-2" / stem
out_dir = project_root / "step-3" / stem
raw_src = src_dir / "raw.md"
clean_src = src_dir / "clean.md"
profile_out = out_dir / "structure_profile.json"
print(f"\nDocumento: {stem}")
if not raw_src.exists():
print(f" ✗ raw.md non trovato in step-2/{stem}/ — skip")
return False
if profile_out.exists() and not force:
print(f" ⚠️ structure_profile.json già presente — skip")
print(f" (usa --force per rieseguire)")
return True
out_dir.mkdir(parents=True, exist_ok=True)
# Copy files from step-2
shutil.copy2(raw_src, out_dir / "raw.md")
if clean_src.exists():
shutil.copy2(clean_src, out_dir / "clean.md")
print(f" Copiati raw.md e clean.md da step-2/{stem}/")
# Analyze
print(f" Analisi struttura in corso...")
profile = analyze(out_dir / "raw.md")
profile_out.write_text(json.dumps(profile, ensure_ascii=False, indent=2), encoding="utf-8")
# Report
_LIVELLO_DESC = {
3: "struttura ricca (###)",
2: "struttura parziale (##)",
1: "solo paragrafi",
0: "testo piatto",
}
print(f" ✅ Livello {profile['livello_struttura']}{_LIVELLO_DESC[profile['livello_struttura']]}")
print(f" h1={profile['n_h1']} h2={profile['n_h2']} h3={profile['n_h3']} paragrafi={profile['n_paragrafi']}")
print(f" Boundary: {profile['boundary_primario']} | Strategia: {profile['strategia_chunking']}")
print(f" Lingua: {profile['lingua_rilevata']} | Lunghezza media sezione: {profile['lunghezza_media_sezione']} char")
for w in profile["avvertenze"]:
print(f" ⚠️ {w}")
print(f" ✅ structure_profile.json salvato")
return True
# ─── Entry point ─────────────────────────────────────────────────────────────
if __name__ == "__main__":
project_root = Path(__file__).parent.parent
parser = argparse.ArgumentParser(description="Step 3 — Rilevamento struttura Markdown")
parser.add_argument("--stem", help="Nome del documento (sottocartella di step-2/)")
parser.add_argument("--force", action="store_true", help="Riesegui anche se già presente")
args = parser.parse_args()
if args.stem:
stems = [args.stem]
else:
step2_dir = project_root / "step-2"
if not step2_dir.exists():
print(f"Errore: cartella step-2/ non trovata in {project_root}")
sys.exit(1)
stems = sorted(p.name for p in step2_dir.iterdir() if p.is_dir())
if not stems:
print(f"Errore: nessun documento trovato in step-2/")
sys.exit(1)
results = [process_stem(s, project_root, args.force) for s in stems]
ok = sum(results)
total = len(results)
print(f"\n{'' if all(results) else '⚠️ '} {ok}/{total} documenti analizzati")
sys.exit(0 if all(results) else 1)
-433
View File
@@ -1,433 +0,0 @@
#!/usr/bin/env python3
"""
Step 4 Revisione automatica del Markdown
Trasforma clean.md da step-3 rivelando la struttura latente del documento.
Le trasformazioni sono euristiche universali che funzionano su qualsiasi PDF:
- Normalizza whitespace multiplo (artefatto PDF)
- Riduce righe vuote multiple
- Rimuove marcatori **bold** nelle intestazioni esistenti
- Converte righe ALL-CAPS standalone ## header (euristico, qualsiasi lingua)
- Converte sezioni numerate "N. testo" ### N. (qualsiasi numerazione)
- Rimuove blocchi TOC (righe che iniziano con parole-chiave indice)
Per ogni documento viene ricalcolato il profilo strutturale: il livello può
salire (es. livello 1 3) se le strutture latenti vengono rilevate.
Output in step-4/<stem>/:
raw.md copia da step-3 (non modificare mai)
clean.md MD revisionato
structure_profile.json profilo aggiornato dopo la revisione
Uso:
python step-4/revise.py # tutti i documenti in step-3/
python step-4/revise.py --stem nietzsche # un solo documento
python step-4/revise.py --force # riesegui anche se già presente
"""
import argparse
import json
import re
import shutil
import sys
from datetime import date
from pathlib import Path
# Riusa la funzione analyze() già scritta nello step 3
sys.path.insert(0, str(Path(__file__).parent.parent / "step-3"))
from detect_structure import analyze # noqa: E402
# ─── Costanti ─────────────────────────────────────────────────────────────────
# Parole-chiave che identificano blocchi TOC (da rimuovere)
_TOC_KEYWORDS = frozenset([
"indice", "index", "contents", "table of contents",
"sommario", "inhaltsverzeichnis", "inhalt",
])
# Preposizioni/articoli da non capitalizzare nel title-case
_STOP_IT_EN = frozenset([
# italiano
"di", "del", "della", "dei", "delle", "da", "in", "e", "il", "la",
"lo", "le", "gli", "un", "una", "per", "a", "al", "alla", "ai",
"alle", "con", "su", "sul", "sulla", "che", "o",
# inglese
"of", "the", "a", "an", "and", "or", "but", "in", "on", "at",
"to", "for", "with", "by", "from", "as",
])
# Ordinali italiani → romani (per titoli come "CAPITOLO PRIMO")
_ORDINALS_IT = {
"PRIMO": "I", "SECONDO": "II", "TERZO": "III", "QUARTO": "IV",
"QUINTO": "V", "SESTO": "VI", "SETTIMO": "VII", "OTTAVO": "VIII",
"NONO": "IX", "DECIMO": "X",
}
# Ordinali inglesi → arabici (per "CHAPTER ONE")
_ORDINALS_EN = {
"ONE": "1", "TWO": "2", "THREE": "3", "FOUR": "4", "FIVE": "5",
"SIX": "6", "SEVEN": "7", "EIGHT": "8", "NINE": "9", "TEN": "10",
}
# ─── Utilità ──────────────────────────────────────────────────────────────────
def _sentence_case(s: str) -> str:
"""
Sentence-case: prima lettera maiuscola, resto minuscolo.
Corretto per l'italiano e accettabile per l'inglese accademico.
"""
if not s:
return s
lower = s.lower()
return lower[0].upper() + lower[1:]
def _is_allcaps_line(line: str) -> bool:
"""
True se la riga è una candidata per conversione a ## header.
Criterio: tutti i caratteri alfabetici sono maiuscoli, lunghezza >= 3.
"""
stripped = line.strip()
letters = [c for c in stripped if c.isalpha()]
return (
len(letters) >= 3
and all(c.isupper() for c in letters)
and not stripped.startswith("#")
)
def _allcaps_to_header(raw_line: str) -> str:
"""
Converte una riga ALL-CAPS in un ## header title-case.
Riconosce pattern specifici (CAPITOLO ORDINE, CHAPTER N) come bonus,
ma funziona in modalità generica su qualsiasi testo.
"""
text = raw_line.strip().rstrip('.').rstrip('?').strip()
# ── Pattern italiano: "CAPITOLO PRIMO. TITOLO DEL CAPITOLO"
_ORD_IT_PAT = "|".join(_ORDINALS_IT.keys())
m = re.match(rf'^CAPITOLO ({_ORD_IT_PAT})\. (.+)', text)
if m:
roman = _ORDINALS_IT[m.group(1)]
titolo = m.group(2).rstrip('.').rstrip('?').strip()
return f"## Capitolo {roman}{_sentence_case(titolo)}"
# ── Pattern inglese: "CHAPTER ONE. TITLE" o "CHAPTER 1. TITLE"
_ORD_EN_PAT = "|".join(_ORDINALS_EN.keys())
m = re.match(rf'^CHAPTER ({_ORD_EN_PAT}|\d+)\.? (.+)', text)
if m:
n = _ORDINALS_EN.get(m.group(1), m.group(1))
titolo = m.group(2).rstrip('.').rstrip('?').strip()
return f"## Chapter {n}{_sentence_case(titolo)}"
# ── Pattern generico con numerazione romana o arabica nel prefisso
m = re.match(r'^([IVXLCDM]+|[0-9]+)\. (.+)', text)
if m:
n = m.group(1)
titolo = m.group(2).rstrip('.').strip()
return f"## {n}. {_sentence_case(titolo)}"
# ── Caso generico: tutto maiuscolo senza pattern riconoscibile
return f"## {_sentence_case(text)}"
def _is_toc_line(line: str) -> bool:
"""True se la riga è l'intestazione di un blocco indice/TOC."""
first_word = line.strip().split('.')[0].strip().lower()
return first_word in _TOC_KEYWORDS
# ─── Trasformazioni ────────────────────────────────────────────────────────────
def apply_transforms(text: str) -> tuple[str, dict]:
"""
Applica tutte le trasformazioni strutturali al testo MD.
Restituisce (testo_modificato, statistiche).
"""
stats = {
"toc_rimosso": False,
"n_header_allcaps": 0,
"n_sezioni_numerate": 0,
"n_paragrafi_uniti": 0,
}
# ── 1. Rimuovi marcatori **bold** nelle intestazioni esistenti
# ## **Titolo** → ## Titolo
text = re.sub(
r'^(#{1,6})\s+\*\*(.+?)\*\*\s*$',
r'\1 \2',
text, flags=re.MULTILINE,
)
# ── 1b. Normalizza header esistenti con contenuto ALL-CAPS → sentence-case
# ## AL DI LA' DEL BENE E DEL MALE → ## Al di la' del bene e del male
def _norm_allcaps_header(m: re.Match) -> str:
hashes = m.group(1)
content = m.group(2).strip()
letters = [c for c in content if c.isalpha()]
if letters and all(c.isupper() for c in letters):
return f"{hashes} {_sentence_case(content)}"
return m.group(0)
text = re.sub(
r'^(#{1,6}) (.+)$',
_norm_allcaps_header,
text, flags=re.MULTILINE,
)
# ── 2. Rimuovi blocco TOC (riga indice + contenuto inline sulla stessa riga)
# "INDICE. Capitolo 1 Capitolo 2 ..." → rimossa
lines = text.split('\n')
new_lines = []
for line in lines:
if _is_toc_line(line):
stats["toc_rimosso"] = True
else:
new_lines.append(line)
text = '\n'.join(new_lines)
# ── 3. Converti righe ALL-CAPS standalone → ## header
# Una riga è "standalone" se è preceduta/seguita da riga vuota
# oppure si trova all'inizio/fine del documento.
blocks = text.split('\n\n')
new_blocks = []
for block in blocks:
stripped = block.strip()
# Blocco standalone = un'unica riga (nessun \n interno rilevante)
if '\n' not in stripped and _is_allcaps_line(stripped):
new_blocks.append(_allcaps_to_header(stripped))
stats["n_header_allcaps"] += 1
else:
# Controlla riga per riga per righe ALL-CAPS seguite da altri contenuti
sub_lines = block.split('\n')
converted = []
for ln in sub_lines:
if _is_allcaps_line(ln) and len(ln.strip()) > 3:
converted.append(_allcaps_to_header(ln))
stats["n_header_allcaps"] += 1
else:
converted.append(ln)
new_blocks.append('\n'.join(converted))
text = '\n\n'.join(new_blocks)
# ── 4. Converti sezioni numerate "N. testo" → "### N.\n\ntesto"
# Riconosce: "1. Testo", "42. Testo" (due o più spazi dopo il punto)
def _num_repl(m: re.Match) -> str:
num = m.group(1)
testo = m.group(2).strip()
stats["n_sezioni_numerate"] += 1
return f"### {num}.\n\n{testo}"
# Pattern standard: "1. testo" o "1. testo"
text = re.sub(
r'^(\d+)\.\s+(.+)$',
_num_repl,
text, flags=re.MULTILINE,
)
# Pattern con lettera-suffisso: "65 a. testo" o "65a. testo"
def _num_letter_repl(m: re.Match) -> str:
num = m.group(1) + m.group(2)
testo = m.group(3).strip()
stats["n_sezioni_numerate"] += 1
return f"### {num}.\n\n{testo}"
text = re.sub(
r'^(\d+)\s*([a-z])\.\s+(.+)$',
_num_letter_repl,
text, flags=re.MULTILINE,
)
# ── 5. Unisci paragrafi spezzati da salti pagina PDF
# Criterio: blocco A non finisce con punteggiatura di fine frase,
# blocco B non inizia con maiuscola "di sezione" né è un header.
# Unione sicura: mai attraverso confini ###/##.
_SENTENCE_END = set('.?!»)\'"')
blocks = text.split('\n\n')
merged = []
i = 0
while i < len(blocks):
b = blocks[i]
stripped = b.strip()
# Prova a unire con il successivo se la frase è spezzata
while (
i + 1 < len(blocks)
and stripped
and not stripped.startswith('#')
and stripped[-1] not in _SENTENCE_END
):
nxt = blocks[i + 1].strip()
# Non unire se il successivo è un header o è vuoto
if not nxt or nxt.startswith('#'):
break
# Non unire se il successivo inizia con una cifra seguita da punto
# (sarebbe l'inizio di un nuovo aforisma non ancora convertito)
if re.match(r'^\d+\.', nxt):
break
b = stripped + ' ' + nxt
stripped = b.strip()
stats["n_paragrafi_uniti"] += 1
i += 1
merged.append(b)
i += 1
text = '\n\n'.join(merged)
# ── 6. Normalizza whitespace multiplo interno alle righe
# "parola parola" → "parola parola" (inclusi gli header)
lines = text.split('\n')
normalized = []
for line in lines:
if not line.strip():
normalized.append(line)
else:
normalized.append(re.sub(r' +', ' ', line))
text = '\n'.join(normalized)
# ── 7. Riduci righe vuote multiple a doppie
text = re.sub(r'\n{3,}', '\n\n', text)
return text, stats
# ─── Aggiornamento revision log ────────────────────────────────────────────────
def update_revision_log(
log_path: Path,
stem: str,
profile_before: dict,
profile_after: dict,
t_stats: dict,
) -> None:
header_exists = log_path.exists() and log_path.stat().st_size > 0
avv = profile_after.get("avvertenze", [])
avv_str = "; ".join(avv) if avv else "nessuna"
entry = f"""
## {stem} — {date.today().isoformat()}
**Trasformazioni automatiche:**
- Normalizzazione whitespace multiplo e righe vuote
- Blocco TOC rimosso: {'' if t_stats['toc_rimosso'] else 'no'}
- Righe ALL-CAPS ## header: {t_stats['n_header_allcaps']}
- Sezioni numerate ### header: {t_stats['n_sezioni_numerate']}
- Paragrafi uniti (salti pagina PDF): {t_stats['n_paragrafi_uniti']}
- Livello struttura: {profile_before.get('livello_struttura', '?')} {profile_after.get('livello_struttura', '?')}
**Avvertenze residue:** {avv_str}
**Revisioni manuali pendenti:**
- [ ] Verificare conversioni ALL-CAPS errate
- [ ] Controllare sezioni troppo corte o troppo lunghe
"""
if not header_exists:
log_path.write_text("# Revision log\n" + entry, encoding="utf-8")
else:
existing = log_path.read_text(encoding="utf-8")
log_path.write_text(existing + entry, encoding="utf-8")
# ─── Per-document processing ─────────────────────────────────────────────────
def process_stem(stem: str, project_root: Path, force: bool) -> bool:
src_dir = project_root / "step-3" / stem
out_dir = project_root / "step-4" / stem
raw_src = src_dir / "raw.md"
clean_src = src_dir / "clean.md"
profile_src = src_dir / "structure_profile.json"
clean_out = out_dir / "clean.md"
profile_out = out_dir / "structure_profile.json"
print(f"\nDocumento: {stem}")
if not clean_src.exists():
print(f" ✗ clean.md non trovato in step-3/{stem}/ — skip")
return False
if clean_out.exists() and not force:
print(f" ⚠️ clean.md già presente — skip")
print(f" (usa --force per rieseguire)")
return True
out_dir.mkdir(parents=True, exist_ok=True)
# Copia raw.md immutabile (riferimento)
if raw_src.exists():
shutil.copy2(raw_src, out_dir / "raw.md")
print(f" Copiato raw.md da step-3/{stem}/")
# Leggi profilo step-3 (per confronto nel report)
profile_before: dict = {}
if profile_src.exists():
profile_before = json.loads(profile_src.read_text(encoding="utf-8"))
# Applica trasformazioni
print(f" Applicazione trasformazioni strutturali...")
text = clean_src.read_text(encoding="utf-8")
text_revised, t_stats = apply_transforms(text)
# Salva clean.md revisionato
clean_out.write_text(text_revised, encoding="utf-8")
# Ricalcola profilo sul nuovo clean.md
profile_after = analyze(clean_out)
profile_out.write_text(
json.dumps(profile_after, ensure_ascii=False, indent=2),
encoding="utf-8",
)
# Report
lv_b = profile_before.get("livello_struttura", "?")
lv_a = profile_after["livello_struttura"]
_STRAT = {3: "h3_aware", 2: "h2_paragraph_split", 1: "paragraph", 0: "sliding_window"}
print(f" ✅ Livello struttura: {lv_b}{lv_a} ({_STRAT.get(lv_a, '?')})")
print(f" h2: {profile_before.get('n_h2','?')}{profile_after['n_h2']}")
print(f" h3: {profile_before.get('n_h3','?')}{profile_after['n_h3']}")
print(f" TOC rimosso: {'' if t_stats['toc_rimosso'] else 'no'}")
print(f" Righe ALL-CAPS → ##: {t_stats['n_header_allcaps']}")
print(f" Sezioni numerate → ###: {t_stats['n_sezioni_numerate']}")
print(f" Paragrafi uniti (salti pagina): {t_stats['n_paragrafi_uniti']}")
for w in profile_after["avvertenze"]:
print(f" ⚠️ {w}")
# Aggiorna revision log (direttamente in step-4/, non in sottocartella)
log_path = project_root / "step-4" / "revision_log.md"
update_revision_log(log_path, stem, profile_before, profile_after, t_stats)
print(f" ✅ step-4/revision_log.md aggiornato")
print(f" ✅ structure_profile.json salvato")
return True
# ─── Entry point ─────────────────────────────────────────────────────────────
if __name__ == "__main__":
project_root = Path(__file__).parent.parent
parser = argparse.ArgumentParser(description="Step 4 — Revisione automatica Markdown")
parser.add_argument("--stem", help="Nome del documento (sottocartella di step-3/)")
parser.add_argument("--force", action="store_true", help="Riesegui anche se già presente")
args = parser.parse_args()
if args.stem:
stems = [args.stem]
else:
step3_dir = project_root / "step-3"
if not step3_dir.exists():
print(f"Errore: cartella step-3/ non trovata in {project_root}")
sys.exit(1)
stems = sorted(p.name for p in step3_dir.iterdir() if p.is_dir())
if not stems:
print(f"Errore: nessun documento trovato in step-3/")
sys.exit(1)
results = [process_stem(s, project_root, args.force) for s in stems]
ok = sum(results)
total = len(results)
print(f"\n{'' if all(results) else '⚠️ '} {ok}/{total} documenti revisionati")
sys.exit(0 if all(results) else 1)