diff --git a/.claude/commands/prepare-md.md b/.claude/commands/prepare-md.md new file mode 100644 index 0000000..0ed1f30 --- /dev/null +++ b/.claude/commands/prepare-md.md @@ -0,0 +1,199 @@ +--- +description: Legge un file Markdown, individua tutti i problemi che compromettono il chunking (artefatti, sillabazione, header malformati, paragrafi spezzati, gerarchia incoerente, sezioni vuote) e applica le correzioni direttamente sul file senza chiedere conferma per i casi chiari. +allowed-tools: Read Bash Grep Edit +argument-hint: +--- + +Risolvi il percorso del file da preparare: + +!`python3 -c " +import sys, json, re +from pathlib import Path + +arg = '$ARGUMENTS'.strip() +root = Path('.') + +candidates = [ + Path(arg), + root / arg, + root / 'conversione' / arg / 'clean.md', + root / 'step-4' / arg / 'clean.md', +] + +md_path = None +for p in candidates: + if p.exists() and p.suffix == '.md': + md_path = p + break + +if not md_path: + print('ERRORE: file non trovato per:', arg) + sys.exit(1) + +print('MD_PATH=' + str(md_path)) + +# Cerca profilo strutturale (report.json o structure_profile.json) +stem = md_path.parent.name +profile_candidates = [ + md_path.parent / 'report.json', + md_path.parent / 'structure_profile.json', + root / 'step-4' / stem / 'structure_profile.json', + root / 'conversione' / stem / 'report.json', +] +for sp in profile_candidates: + if sp.exists(): + try: + d = json.load(open(sp)) + st = d.get('structure', d) + print(f'STRATEGIA={st.get(\"strategia_chunking\",\"?\")}') + print(f'LINGUA={st.get(\"lingua_rilevata\",\"?\")}') + print(f'H1={st.get(\"n_h1\",0)} H2={st.get(\"n_h2\",0)} H3={st.get(\"n_h3\",0)}') + for a in st.get('avvertenze', []): + print(f'AVVISO: {a}') + except Exception: + pass + break + +# Statistiche file +text = md_path.read_text(encoding='utf-8') +lines = text.split('\n') +pua = len(re.findall(r'[\ue000-\uf8ff]', text)) +print(f'RIGHE={len(lines)} CHARS={len(text)}') +if pua: + print(f'PUA_RESIDUI={pua}') +" 2>/dev/null` + +Se l'output contiene `ERRORE`, comunica il percorso non trovato e fermati. + +--- + +Leggi il file completo identificato da `MD_PATH` nell'output sopra. Poi esegui **tutti** i controlli e applica le correzioni nell'ordine indicato. + +I parametri di riferimento per il chunking sono: **MIN_CHARS=200, MAX_CHARS=800**. + +--- + +## Controllo 1 — Sillabazione residua + +Cerca blocchi di testo (non header) dove una riga termina con `-` e la successiva inizia con lettera minuscola: è un'interruzione di parola non risolta da PDF. + +Esempio da correggere: +``` +...il meccanismo di decen- +tralizzazione permette... +``` +→ `...il meccanismo di decentralizzazione permette...` + +**Applica** ogni fusione con Edit. Se la parola ricomposta sembra errata, segnala invece di correggere. + +--- + +## Controllo 2 — Artefatti di pagina + +Righe standalone che sono esclusivamente: +- Un numero intero isolato (numero di pagina) +- Titolo del libro / nome autore che si ripete identico 3+ volte nel documento +- Intestazioni di capitolo che si ripetono (es. `## 3. Termodinamica` appare sia come header legittimo che come riga di testo duplicata) + +**Applica** la rimozione con Edit per le ripetizioni chiaramente decorative. Segnala i casi ambigui. + +--- + +## Controllo 3 — Numeri di pagina in header + +Header che terminano con ` | N` o ` N` dove N è un numero isolato (residuo di indice non rimosso): +- `### 16. Link vari | 109` → `### 16. Link vari` +- `## Capitolo 3 42` → `## Capitolo 3` + +**Applica** con Edit. + +--- + +## Controllo 4 — Header malformati + +Per ogni header (`#`, `##`, `###`): + +**a) ALL-CAPS non convertito:** +`## TERMODINAMICA DEI PROCESSI` → `## Termodinamica dei processi` +Usa sentence case (prima lettera maiuscola, resto minuscolo salvo nomi propri evidenti). +**Applica**. + +**b) Livello h4/h5/h6:** +`#### Sottosezione` → `### Sottosezione` +**Applica**. + +**c) Testo troppo lungo (> 120 char):** +Probabilmente non è un header ma testo estratto erroneamente. Rimuovi i `#` iniziali lasciando il testo come paragrafo normale. +**Applica** se chiaramente non è un titolo. Segnala se ambiguo. + +**d) Header duplicati:** +Se lo stesso header appare due volte, rimuovi la seconda occorrenza (o la prima se è quella fuori contesto). +**Applica**. + +--- + +## Controllo 5 — Paragrafi spezzati + +Blocchi di testo (non header, non liste) che terminano senza punteggiatura finale (`.?!»)`). + +Se il blocco successivo non inizia con lettera maiuscola e non è un header/lista, i due blocchi sono parte della stessa frase spezzata da un salto pagina PDF. + +**Applica** la fusione solo quando sei certo (la congiunzione è evidente: inizia con congiunzione, continua la frase in modo inequivocabile). Segnala i casi dubbi invece di correggere. + +--- + +## Controllo 6 — Sezioni quasi-vuote o vuote + +Sezione (header + corpo) con corpo < 100 caratteri: +- Se il contenuto è evidentemente una sottosezione o introduzione di ciò che segue (e non ha senso da solo), rimuovi l'header e unisci il testo alla sezione precedente o successiva. +- Se è un header di capitolo che introduce legittime sottosezioni (`##` seguito da `###`), lascia invariato. + +**Applica** le fusioni sicure. Segnala quelle ambigue. + +--- + +## Controllo 7 — Gerarchia heading + +Verifica che la gerarchia sia coerente. Problemi da correggere: + +- Più di un `# ` (h1) nel documento → il secondo e successivi diventano `## ` salvo che siano chiaramente titoli di parti distinte +- `### ` prima del primo `## ` → abbassa il `###` a `## ` o aggiungi un `## ` genitore appropriato +- `## ` prima del primo `# ` in documenti con h1 → lascia invariato (alcuni documenti non hanno h1) + +**Applica** solo le correzioni di livello sicure. Segnala le ristrutturazioni che richiedono giudizio. + +--- + +## Controllo 8 — Sezioni troppo lunghe senza struttura + +Sezione (## o ###) con corpo > 3000 caratteri e nessun header figlio al suo interno: il chunker la spezzerà su frasi in modo meccanico, perdendo coerenza semantica. + +Se il testo contiene chiari cambio-argomento (paragrafi separati da riga vuota, con transizioni come "Inoltre...", "In secondo luogo...", "Un altro aspetto..."), considera di aggiungere un `### ` per suddividere semanticamente. + +**Non aggiungere header inventati.** Segnala le sezioni candidate e proponi i titoli: applica solo su risposta affermativa. + +--- + +## Report finale + +Dopo aver applicato tutte le correzioni automatiche, mostra: + +``` +File: +Correzioni applicate: N totali + + Sillabazione risolta: N + Artefatti pagina rimossi: N + Numeri pagina in header: N + Header normalizzati: N (ALL-CAPS, livello, lunghezza, duplicati) + Paragrafi fusi: N + Sezioni quasi-vuote risolte:N + Gerarchia corretta: N + +Problemi aperti (richiedono giudizio manuale): + [riga N] + ... +``` + +Se non ci sono problemi aperti: **"Markdown pronto per il chunking."** +Se ci sono problemi aperti: elencali e chiedi quali applicare. diff --git a/.claude/commands/step4-review.md b/.claude/commands/step4-review.md deleted file mode 100644 index 61c5566..0000000 --- a/.claude/commands/step4-review.md +++ /dev/null @@ -1,115 +0,0 @@ ---- -description: Revisione qualitativa del clean.md dopo il pre-processing automatico (step 4). Trova artefatti residui, paragrafi spezzati e header errati, poi propone le correzioni. -allowed-tools: Read Bash Grep Edit -argument-hint: ---- - -Esegui la revisione qualitativa di `step-4/$ARGUMENTS/clean.md`. - -**Cosa è già stato fatto automaticamente (revision_log):** -!`grep -A 12 "^## $ARGUMENTS" step-4/revision_log.md 2>/dev/null || echo "(nessun log trovato per questo stem)"` - -**Profilo strutturale attuale:** -!`python3 -c " -import json, sys -try: - d = json.load(open('step-4/$ARGUMENTS/structure_profile.json')) - print(f'Livello: {d[\"livello_struttura\"]} Strategia: {d[\"strategia_chunking\"]}') - print(f'h1={d[\"n_h1\"]} h2={d[\"n_h2\"]} h3={d[\"n_h3\"]} paragrafi={d[\"n_paragrafi\"]}') - print(f'Lunghezza media sezione: {d[\"lunghezza_media_sezione\"]} char') - for a in d.get('avvertenze', []): print(f' ⚠️ {a}') -except Exception as e: print(f'ERRORE: {e}') -" 2>/dev/null` - ---- - -Analizza `step-4/$ARGUMENTS/clean.md` eseguendo i grep seguenti e ragionando sui risultati. Per ogni check: esegui il grep, conta i risultati, riporta i casi concreti (max 5 esempi con numero di riga). - -## Check 1 — Sillabazione residua - -Righe che terminano con trattino seguito da testo nella riga successiva (artefatto PDF non risolto): - -```bash -grep -n "\-$" step-4/$ARGUMENTS/clean.md | head -20 -``` - -Segnala se presenti: numero di riga, testo della riga e della riga successiva. - -## Check 2 — Righe orfane (artefatti PDF) - -Righe standalone (non header `#`, non vuote) di meno di 60 caratteri che sembrano artefatti: - -```bash -grep -n "^[^#\-\*\|].\{1,59\}$" step-4/$ARGUMENTS/clean.md | grep -v "^\s*$" | head -30 -``` - -Valuta ogni riga: è testo normale breve (legittimo) o artefatto (numero di pagina, nome autore isolato, riga di intestazione ripetuta)? - -## Check 3 — Paragrafi con frase spezzata - -Blocchi di testo che terminano senza punteggiatura di fine frase (`.?!»)`): - -```bash -grep -n "[^.!?»)\]\'\"]$" step-4/$ARGUMENTS/clean.md | grep -v "^[0-9]*:#" | grep -v "^[0-9]*:\s*$" | grep -v "^\s*[-\*]" | head -20 -``` - -Riporta i casi più sospetti (righe brevi che finiscono a metà concetto). - -## Check 4 — Header sospetti - -```bash -grep -n "^##\? " step-4/$ARGUMENTS/clean.md | head -40 -``` - -Verifica: -- `##` o `###` con contenuto interamente MAIUSCOLO non convertito → segnala -- Header duplicati (stesso testo che appare due volte) → segnala -- `##` con testo > 80 caratteri (probabile testo che non è un header) → segnala -- Salti di livello anomali (es. `###` senza un `##` padre) → segnala - -## Check 5 — Sezioni quasi vuote - -```bash -python3 -c " -import re, sys -text = open('step-4/$ARGUMENTS/clean.md').read() -sections = re.split(r'^(#{1,3} .+)$', text, flags=re.MULTILINE) -for i in range(1, len(sections)-1, 2): - header = sections[i].strip() - body = sections[i+1].strip() if i+1 < len(sections) else '' - if len(body) < 80 and body: - print(f'{header!r} → {len(body)} char: {body[:60]!r}') - elif not body: - print(f'{header!r} → VUOTA') -" 2>/dev/null | head -20 -``` - -Sezioni con body < 80 char o vuote compromettono il chunking. Segnala quelle che non hanno senso come sezione autonoma. - -## Check 6 — Gerarchia strutturale - -```bash -grep -n "^#\{1,3\} " step-4/$ARGUMENTS/clean.md | head -50 -``` - -Verifica che la gerarchia sia coerente: `# → ## → ###`. Segnala se ci sono `###` prima del primo `##`, o `##` prima del primo `#`, o `#` multipli (più di un h1). - ---- - -## Report finale - -``` -🔴 BLOCCANTI (compromettono il chunking o il retrieval) - [riga N] descrizione precisa del problema - ... - -🟡 MINORI (artefatti visibili, non bloccanti) - [riga N] descrizione - ... - -🟢 OK — nessun problema rilevato in questa categoria -``` - -Poi chiedi: **"Applico le correzioni per i 🔴? E per i 🟡?"** - -Applica solo ciò che viene esplicitamente approvato. Usa Edit per ogni modifica — mai riscrivere l'intero file. diff --git a/.gitignore b/.gitignore index 69458fa..0b18250 100644 --- a/.gitignore +++ b/.gitignore @@ -46,3 +46,6 @@ step-5/*/ # Output step-6 — report generati da verify_chunks.py step-6/*/ +# Output conversione/ — generati da conversione/pipeline.py +conversione/*/ + diff --git a/CLAUDE.md b/CLAUDE.md index 4b25071..698d0e4 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -4,83 +4,76 @@ - **Lingua:** Rispondi sempre in italiano. - **Venv obbligatorio:** Usa `.venv/bin/python` o attiva con `source .venv/bin/activate`. Mai `pip`/`python` di sistema. -- **Non modificare `raw.md`:** `step-2//raw.md` è immutabile. La copia di lavoro è `step-4//clean.md`. +- **Non modificare `raw.md`:** Il file `raw.md` di ogni stem è immutabile. La copia di lavoro è sempre `clean.md`. --- -## Pipeline (ordine obbligatorio) +## Pipeline (operazioni in ordine) ``` -PDF (sources/) → step-0 → step-1 → step-2 → step-3 - → step-4 (CRITICO: revisione manuale clean.md) - → step-5 → step-6 → step-7 (Ollama) → step-8 → step-9 +PDF (sources/) + → conversione (PDF → clean.md + structure_profile.json) + → chunking (clean.md → chunks.json) + → verifica (chunks.json → report + fix automatici) + → vettorizzazione (chunks.json → ChromaDB) + → retrieval (query → risposta via Ollama) ``` Il parametro `--stem` identifica il documento (nome PDF senza `.pdf`). Lo stem è anche il nome della collection ChromaDB. -Comandi tipici: -```bash -source .venv/bin/activate -python step-4/revise.py --stem -python step-5/chunker.py --stem -python step-6/verify_chunks.py --stem -python step-8/ingest.py --stem -python step-9/rag.py --stem -``` - --- ## File critici | File | Ruolo | |---|---| -| `step-9/config.py` | Fonte di verità: `EMBED_MODEL`, `OLLAMA_MODEL`, `TOP_K`, `TEMPERATURE`, `SYSTEM_PROMPT` | -| `step-5/chunker.py` | Chunking adattivo — `MIN_CHARS=200`, `MAX_CHARS=800`, `OVERLAP_S=2` | -| `step-6/verify_chunks.py` | Verifica chunk — stesse soglie di `chunker.py` | -| `step-6/fix_chunks.py` | Fix automatici su chunk anomali | -| `step-4/revise.py` | Pre-processing MD automatico (8 trasformazioni euristiche) | -| `step-8/ingest.py` | Vettorizzazione ChromaDB — legge `EMBED_MODEL` da `config.py` | -| `step-9/rag.py` | Pipeline RAG interattiva | +| `config.py` | Fonte di verità: `EMBED_MODEL`, `OLLAMA_MODEL`, `TOP_K`, `TEMPERATURE`, `SYSTEM_PROMPT` | +| `chunker.py` | Chunking adattivo — `MIN_CHARS=200`, `MAX_CHARS=800`, `OVERLAP_S=2` | +| `verify_chunks.py` | Verifica chunk — stesse soglie di `chunker.py` | +| `fix_chunks.py` | Fix automatici su chunk anomali | +| `ingest.py` | Vettorizzazione ChromaDB — legge `EMBED_MODEL` da `config.py` | +| `rag.py` | Pipeline RAG interattiva | +| `conversione/pipeline.py` | Conversione PDF → clean Markdown strutturato | --- ## Regole di assistenza -**Modifica `EMBED_MODEL` in `step-9/config.py`:** +**Modifica `EMBED_MODEL` in `config.py`:** Avvisa sempre che serve rieseguire la vettorizzazione: ```bash -python step-8/ingest.py --stem --force +python ingest.py --stem --force ``` `ingest.py` importa `EMBED_MODEL` direttamente da `config.py` — la coerenza è critica: se violata non produce errori ma restituisce risultati insensati. **Modifica soglie chunking (`MIN_CHARS`, `MAX_CHARS`, `OVERLAP_S`):** -I valori compaiono in tre file che vanno sincronizzati manualmente: -1. `step-5/chunker.py` -2. `step-6/verify_chunks.py` -3. `step-6/fix_chunks.py` +I valori compaiono in più file che vanno sincronizzati manualmente: +- `chunker.py` +- `verify_chunks.py` +- `fix_chunks.py` -**Step 4 — revisione clean.md:** -`revise.py` applica trasformazioni automatiche, ma il risultato va sempre revisionato a mano. La qualità del RAG dipende da `clean.md` più di qualsiasi parametro tecnico. Suggerisci sempre `/step4-review ` dopo `revise.py`. +**Conversione PDF → Markdown:** +`conversione/pipeline.py` produce `raw.md` e `clean.md`. Il `clean.md` va sempre revisionato dopo la conversione automatica — la qualità del RAG dipende da esso più di qualsiasi parametro tecnico. Suggerisci sempre `/prepare-md conversione//clean.md` dopo la conversione. -**Step 6 — verifica chunk:** -Dopo `verify_chunks.py`, usa `/step6-fix ` prima di passare a step-8. +**Verifica chunk:** +Dopo `verify_chunks.py`, usa `/step6-fix ` prima di procedere con la vettorizzazione. --- ## Skills custom -- `/step4-review ` — Revisione qualitativa `clean.md`: artefatti, paragrafi spezzati, header errati. +- `/prepare-md ` — Revisione e correzione automatica di qualsiasi `clean.md`: sillabazione, artefatti, header malformati, paragrafi spezzati, gerarchia, sezioni vuote. Accetta path completo (`conversione/bitcoin/clean.md`) o stem (`bitcoin`). - `/step6-fix ` — Dry-run e applicazione fix chunk tramite `fix_chunks.py`. --- -## Struttura directory per stem +## Output per stem ``` -step-2//raw.md ← immutabile -step-4//clean.md ← copia di lavoro -step-4//structure_profile.json -step-5//chunks.json -step-6//report.json -chroma_db// ← collection ChromaDB +conversione//raw.md ← immutabile +conversione//clean.md ← copia di lavoro +conversione//structure_profile.json +/chunks.json +/report.json +chroma_db// ← collection ChromaDB ``` diff --git a/conversione/README.md b/conversione/README.md new file mode 100644 index 0000000..a5ef3fb --- /dev/null +++ b/conversione/README.md @@ -0,0 +1,236 @@ +# conversione — PDF → Markdown pulito + +Pipeline automatica che trasforma un PDF grezzo in Markdown strutturato e +pronto per la suddivisione in chunk. Gestisce l'intero processo: validazione +del PDF, estrazione del testo, pulizia strutturale e analisi della struttura +del documento. + +## Requisiti + +### Python +``` +pip install opendataloader-pdf pdfplumber +``` + +### Java 11+ +`opendataloader-pdf` richiede Java sul PATH. Se non è installato: + +```bash +# Ubuntu / Debian / WSL +sudo apt install default-jdk + +# Verifica +java -version +``` + +Download alternativo: https://adoptium.net/ + +--- + +## Utilizzo + +Posiziona il PDF in `sources/.pdf`, poi: + +```bash +# Singolo documento +python conversione/pipeline.py --stem + +# Tutti i PDF in sources/ +python conversione/pipeline.py + +# Forza la riesecuzione (sovrascrive output esistente) +python conversione/pipeline.py --stem --force +``` + +Il parametro `--stem` è il nome del file PDF senza estensione. +Esempio: `sources/analisi1.pdf` → `--stem analisi1` + +--- + +## Output + +Per ogni stem vengono prodotti tre file in `conversione//`: + +| File | Descrizione | +|------|-------------| +| `raw.md` | Markdown grezzo estratto dal PDF — **non modificare** | +| `clean.md` | Markdown pulito e strutturato — input per il chunker | +| `report.json` | Metriche complete di qualità della conversione | + +### report.json + +Contiene tutto ciò che serve per valutare la conversione: statistiche +trasformazioni, struttura rilevata, distribuzione lunghezze sezioni, +anomalie e problemi residui con esempi. + +```json +{ + "stem": "dirittoprivato", + "timestamp": "2026-04-16 15:41", + "transforms": { + "n_accenti_corretti": 0, + "n_dotleader_rimossi": 0, + "toc_rimosso": false, + "n_sezioni_numerate": 63, + "riduzione_pct": 1 + }, + "structure": { + "livello_struttura": 3, + "n_h1": 0, "n_h2": 6, "n_h3": 163, + "lingua_rilevata": "it", + "strategia_chunking": "h3_aware", + "avvertenze": [] + }, + "distribution": { "min": 12, "p25": 312, "mediana": 681, "p75": 1197, "max": 6120 }, + "anomalie": { + "bare_headers": 0, + "short_sections": 1, + "long_sections": 39, + "bare_headers_list": [], + "short_sections_list": [...], + "long_sections_list": [...] + }, + "residui": { + "backtick": 0, "dotleader": 0, "url": 0, "immagini": 0, + "backtick_esempi": [] + } +} +``` + +**`strategia_chunking`** indica come suddividere il documento in chunk: + +| Valore | Significato | +|--------|-------------| +| `h3_aware` | Documento ricco di sezioni `###` — usa i `###` come boundary | +| `h2_paragraph_split` | Struttura parziale `##` — suddividi per paragrafo dentro ogni `##` | +| `paragraph` | Nessuna gerarchia chiara — suddividi per paragrafo | +| `sliding_window` | Testo piatto — usa finestra scorrevole | + +--- + +## Validazione batch + +Dopo aver convertito uno o più documenti, esegui `validate.py` per ottenere +una tabella di stato su tutti gli stem: + +```bash +python conversione/validate.py +``` + +Output di esempio: + +``` +stem h2 h3 strategia bare corte lunghe backtick dotlead url status +────────────────────────────────────────────────────────────────────────────────────────────── +analisi1 13 279 h3_aware 0 36 151 10 0 0 ⚠️ +dirittoprivato 6 163 h3_aware 0 1 39 0 0 0 ✅ +nietzsche 4 303 h3_aware 6 104 100 0 0 0 ⚠️ +────────────────────────────────────────────────────────────────────────────────────────────── +Totale: 3 ✅ 1 ⚠️ 2 ❌ 0 +``` + +**Legenda colonne:** + +| Colonna | Significato | Soglia warning | +|---------|-------------|----------------| +| `bare` | Header solo-numero senza corpo (`### 1.` vuoto) | ≥ 1 | +| `corte` | Sezioni con corpo < 150 chars | informativo | +| `lunghe` | Sezioni con corpo > 1500 chars | ≥ 80 | +| `backtick` | Backtick `` ` `` residui nel testo | ≥ 1 | +| `dotlead` | Dot-leader residui (`. . . .`) | ≥ 1 | + +**Stato:** +- ✅ nessuna anomalia critica +- ⚠️ anomalie presenti, documento processabile ma da verificare +- ❌ struttura non rilevata (`livello_struttura = 0`) o > 50 backtick residui + +--- + +## Cosa fa la pipeline + +La pipeline esegue quattro fasi in sequenza. + +### Fase 1 — Validazione + +Verifica che il PDF esista, non sia vuoto, non sia protetto da password e +contenga testo digitale estraibile. I PDF scansionati (immagini) non sono +supportati. + +### Fase 2 — Estrazione testo + +Usa `opendataloader-pdf` con l'algoritmo **XY-Cut++** per ricostruire il +corretto ordine di lettura anche in documenti multi-colonna. Le immagini +vengono ignorate completamente — il `clean.md` non contiene mai riferimenti +a immagini. + +### Fase 3 — Pulizia strutturale + +Serie di trasformazioni applicate al Markdown grezzo: + +| Trasformazione | Problema risolto | +|----------------|-----------------| +| Rimozione riferimenti immagini | Artefatti `![...]()` lasciati dal convertitore | +| Fix accenti backtick LaTeX | `` `e``→`è`, ``puo` ``→`può`, ``sar`a``→`sarà` | +| Rimozione dot-leader TOC | `- 1.1 Titolo . . . . . 42` (voci indice) | +| Rimozione numerali romani pagina | `i`, `ii`, `iii` su riga isolata (footer LaTeX) | +| Fix header + body concatenati | `### 11 TitoloCorpo testo...` → header + paragrafo separati | +| Estrazione header Capitolo inline | `Capitolo 3: IL TITOLO` nel corpo → `## Capitolo 3: ...` | +| Normalizzazione livelli header | `####`, `#####` → `###` (gerarchia uniforme a 3 livelli) | +| Rimozione bold negli header | `## **Titolo**` → `## Titolo` | +| Normalizzazione ALL-CAPS header | `## IL TITOLO` → `## Il titolo` | +| Rimozione TOC | Blocchi indice/sommario rilevati per keyword | +| ALL-CAPS standalone → header | Righe in maiuscolo isolate → `## Titolo` | +| Sezioni numerate → header | `N. Titolo sezione` → `### N.` + corpo | +| Sezioni con punto → header | `- N. Testo aphorismo...` → `### N.` + corpo | +| Sezioni lista numerate → header | `- N Titolo Corpo testo...` → `### N. Titolo` + corpo | +| Unione paragrafi spezzati | Paragrafi tagliati dal salto pagina PDF ricongiunti | +| Normalizzazione whitespace | Spazi multipli ridotti a singoli | +| Riduzione righe vuote | Tre o più righe vuote consecutive → due | +| Rimozione URL watermark | `www.piattaforma.com`, `https://...` su riga isolata | +| Rimozione header senza corpo | Sezioni vuote e header watermark scartati | + +> **Rilevamento automatico tipo documento**: se il documento contiene sezioni +> "Esercizi" (libri di testo accademici), la conversione dei numeri di esercizio +> in header viene disabilitata automaticamente. + +### Fase 4 — Analisi struttura + +Rileva la gerarchia del documento (conteggio `#`/`##`/`###`), la lingua +(italiano / inglese / sconosciuta), la lunghezza media delle sezioni e +suggerisce la strategia di chunking ottimale. I risultati sono scritti in +`structure_profile.json`. + +--- + +## Tipi di documento supportati + +| Tipo | Esempi | Note | +|------|--------|------| +| Testo giuridico / accademico | Manuali, dispense, codici | Header numerati `N.` e `N.N` | +| Filosofia / saggistica | Aforismi numerati, capitoli | Pattern `- N. testo` | +| Matematica / LaTeX | Analisi, algebra, fisica | Fix accenti, TOC, numerali romani | +| Testo generico strutturato | Qualsiasi PDF digitale | Paragrafi e header standard | + +**Non supportati**: PDF scansionati (solo immagini), PDF protetti da password. + +--- + +## Log di esecuzione + +Durante l'esecuzione la pipeline stampa le statistiche di ogni trasformazione: + +``` + [3/4] Pulizia strutturale... + ✅ Immagini rimosse: 0 + Accenti corretti: 3701 + Dot-leader rimossi: 53 + Header concat fixati: 0 + TOC rimosso: sì + ALL-CAPS → ##: 14 + Sezioni → ###: 279 + Paragrafi uniti: 12998 + Riduzione testo: 3% +``` + +Se un documento è già stato convertito, la pipeline lo salta automaticamente. +Usa `--force` per rieseguire. diff --git a/conversione/clear.sh b/conversione/clear.sh new file mode 100755 index 0000000..3774610 --- /dev/null +++ b/conversione/clear.sh @@ -0,0 +1,29 @@ +#!/usr/bin/env bash +set -euo pipefail + +SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" +cd "$SCRIPT_DIR" + +mapfile -t dirs < <(find . -maxdepth 1 -mindepth 1 -type d | sort) + +if [[ ${#dirs[@]} -eq 0 ]]; then + echo "Nessuna cartella da cancellare." + exit 0 +fi + +echo "Cartelle che verranno cancellate:" +for d in "${dirs[@]}"; do + echo " $d" +done + +if [[ "${1:-}" != "-f" ]]; then + read -r -p "Confermi? [s/N] " answer + [[ "$answer" =~ ^[sS]$ ]] || { echo "Annullato."; exit 0; } +fi + +for d in "${dirs[@]}"; do + rm -rf "$d" + echo "Rimossa: $d" +done + +echo "Pulizia completata." diff --git a/conversione/pipeline.py b/conversione/pipeline.py new file mode 100644 index 0000000..eedf436 --- /dev/null +++ b/conversione/pipeline.py @@ -0,0 +1,1600 @@ +#!/usr/bin/env python3 +""" +conversione/pipeline.py — PDF → clean Markdown (pipeline automatica) + +Converte un PDF grezzo in Markdown strutturato e pulito, pronto per la +suddivisione in chunk. Gestisce validazione, estrazione testo, pulizia +strutturale e rilevamento automatico della struttura del documento. + +Usa opendataloader-pdf (algoritmo XY-Cut++ per ordine di lettura corretto, +testo fluente, struttura preservata). + +Output per ciascuno stem: + conversione//raw.md — Markdown grezzo (immutabile) + conversione//clean.md — Markdown pulito e strutturato + conversione//structure_profile.json + +Uso: + python conversione/pipeline.py --stem + python conversione/pipeline.py # tutti i PDF in sources/ + python conversione/pipeline.py --stem --force # forza riesecuzione + +Prerequisiti: + pip install opendataloader-pdf + Java 11+ sul PATH (https://adoptium.net/) +""" + +import argparse +import json +import re +import subprocess +import sys +import tempfile +from collections import Counter +from datetime import datetime +from functools import partial +from pathlib import Path + + +# ─── Verifica dipendenze ────────────────────────────────────────────────────── + +def _check_deps() -> None: + try: + import opendataloader_pdf # noqa: F401 + except ImportError: + print("Errore: opendataloader-pdf non installato.") + print(" pip install opendataloader-pdf") + sys.exit(1) + + try: + result = subprocess.run( + ["java", "-version"], + capture_output=True, text=True, + ) + if result.returncode != 0: + raise FileNotFoundError + except FileNotFoundError: + print("Errore: Java 11+ non trovato sul PATH.") + print(" Installa da https://adoptium.net/") + sys.exit(1) + + +# ─── [1] Validazione PDF ───────────────────────────────────────────────────── + +def check_pdf(pdf_path: Path) -> tuple[bool, str]: + """ + Validazione rapida: esistenza, leggibilità, testo estraibile. + Restituisce (ok, messaggio). + """ + if not pdf_path.exists(): + return False, f"File non trovato: {pdf_path}" + if pdf_path.suffix.lower() != ".pdf": + return False, f"Non è un PDF: {pdf_path.name}" + size = pdf_path.stat().st_size + if size == 0: + return False, "File vuoto" + if size < 1024: + return False, f"File troppo piccolo ({size} byte) — probabilmente corrotto" + + try: + import pdfplumber + with pdfplumber.open(pdf_path) as pdf: + n_pages = len(pdf.pages) + if n_pages == 0: + return False, "PDF senza pagine" + sample = min(5, n_pages) + pages_with_text = sum( + 1 for i in range(sample) + if len((pdf.pages[i].extract_text() or "").strip()) > 50 + ) + if pages_with_text == 0: + # Estende il campione: copertine immagine o pagine bianche iniziali + extended = min(15, n_pages) + if extended > sample: + ext_with_text = sum( + 1 for i in range(sample, extended) + if len((pdf.pages[i].extract_text() or "").strip()) > 50 + ) + if ext_with_text > 0: + return True, ( + f"{n_pages} pagine — prime {sample} vuote, " + f"testo trovato in pagine successive " + f"(possibile copertina immagine)" + ) + return False, ( + f"Nessun testo nelle prime {extended} pagine " + f"— probabilmente scansionato (OCR non supportato)" + ) + return True, f"{n_pages} pagine, testo digitale confermato" + except MemoryError: + return False, "Memoria esaurita durante l'apertura del PDF" + except Exception as e: + msg = str(e).lower() + if "password" in msg or "encrypted" in msg: + return False, "PDF protetto da password" + return False, f"Impossibile aprire: {e}" + + +# ─── [2] Conversione PDF → Markdown ───────────────────────────────────────── + +def convert_pdf(pdf_path: Path, out_dir: Path) -> Path: + """ + Converte il PDF in Markdown tramite opendataloader-pdf. + Scrive il file nella out_dir e restituisce il percorso. + + Parametri scelti per output RAG-ottimale: + - keep_line_breaks=False → testo fluente, no hard-wrap PDF + - reading_order="xycut" → corregge ordine multi-colonna (XY-Cut++) + - sanitize=False → preserva il testo originale (no anonimizzazione PII) + """ + import opendataloader_pdf + + out_dir.mkdir(parents=True, exist_ok=True) + + opendataloader_pdf.convert( + input_path=str(pdf_path), + output_dir=str(out_dir), + format="markdown", + keep_line_breaks=False, + reading_order="xycut", + sanitize=False, + image_output="off", # nessuna immagine estratta né referenziata + quiet=True, # sopprime i log Java + ) + + # Il file output si chiama .md + md_file = out_dir / f"{pdf_path.stem}.md" + if not md_file.exists(): + candidates = list(out_dir.glob("*.md")) + if not candidates: + raise RuntimeError(f"Nessun file .md prodotto in {out_dir}") + md_file = candidates[0] + + content = md_file.read_text(encoding="utf-8", errors="replace").strip() + if len(content) < 100: + raise RuntimeError( + f"opendataloader ha prodotto un file .md quasi vuoto ({len(content)} char) " + f"— il PDF potrebbe essere corrotto o non supportato" + ) + + return md_file + + +# ─── [3] Pulizia strutturale ───────────────────────────────────────────────── + +_TOC_KEYWORDS = frozenset([ + "indice", "index", "contents", "table of contents", + "sommario", "inhaltsverzeichnis", "inhalt", + "indice generale", "indice analitico", "indice dei contenuti", + "elenco dei capitoli", "argomenti", "table des matières", + "tabla de contenidos", "содержание", +]) + +_ORDINALS_IT = { + "PRIMO": "I", "SECONDO": "II", "TERZO": "III", "QUARTO": "IV", + "QUINTO": "V", "SESTO": "VI", "SETTIMO": "VII", "OTTAVO": "VIII", + "NONO": "IX", "DECIMO": "X", +} +_ORDINALS_EN = { + "ONE": "1", "TWO": "2", "THREE": "3", "FOUR": "4", "FIVE": "5", + "SIX": "6", "SEVEN": "7", "EIGHT": "8", "NINE": "9", "TEN": "10", +} + + +def _sentence_case(s: str) -> str: + if not s: + return s + lower = s.lower() + return lower[0].upper() + lower[1:] + + +def _is_allcaps_line(line: str) -> bool: + stripped = line.strip() + letters = [c for c in stripped if c.isalpha()] + return ( + len(letters) >= 3 + and all(c.isupper() for c in letters) + and not stripped.startswith("#") + and not stripped.startswith("|") # esclude righe tabella Markdown + ) + + +def _allcaps_to_header(raw_line: str) -> str: + # Rimuovi eventuale prefisso di lista "- " o "* " prima di creare l'header + text = re.sub(r"^[-*+]\s+", "", raw_line.strip()) + text = text.rstrip(".").rstrip("?").strip() + + _ORD_IT_PAT = "|".join(_ORDINALS_IT.keys()) + m = re.match(rf"^CAPITOLO ({_ORD_IT_PAT})\. (.+)", text) + if m: + roman = _ORDINALS_IT[m.group(1)] + titolo = m.group(2).rstrip(".").rstrip("?").strip() + return f"## Capitolo {roman} — {_sentence_case(titolo)}" + + _ORD_EN_PAT = "|".join(_ORDINALS_EN.keys()) + m = re.match(rf"^CHAPTER ({_ORD_EN_PAT}|\d+)\.? (.+)", text) + if m: + n = _ORDINALS_EN.get(m.group(1), m.group(1)) + titolo = m.group(2).rstrip(".").rstrip("?").strip() + return f"## Chapter {n} — {_sentence_case(titolo)}" + + m = re.match(r"^([IVXLCDM]+|[0-9]+)\. (.+)", text) + if m: + return f"## {m.group(1)}. {_sentence_case(m.group(2).rstrip('.').strip())}" + + return f"## {_sentence_case(text)}" + + +def _extract_math_environments(text: str) -> tuple[str, int]: + """ + Converte paragrafi che iniziano con ambienti matematici in header ###. + + 'Teorema 1.6.3 (principio di induzione) Sia A ⊆ N...' + → '### Teorema 1.6.3 (principio di induzione)\n\nSia A ⊆ N...' + + Riconosce: Definizione, Teorema, Lemma, Proposizione, Corollario, + Osservazione, Nota, Esempio (solo con numero di sezione). + Non tocca paragrafi che già iniziano con un header Markdown. + Deve girare PRIMA del merge paragrafi (step 5) per sfruttare i blocchi intatti. + """ + _ENVS = ( + r"Definizione|Definition|Teorema|Theorem|Lemma|" + r"Proposizione|Proposition|Corollario|Corollary|" + r"Osservazione|Remark|Nota|Note|Esempio|Example" + ) + count = 0 + blocks = text.split("\n\n") + result = [] + + for block in blocks: + stripped = block.strip() + if not stripped or stripped.startswith("#"): + result.append(block) + continue + + m = re.match( + rf"^({_ENVS})\s+((?:\d+\.?){{1,4}})\s*(.*)", + stripped, + re.DOTALL, + ) + if not m: + result.append(block) + continue + + env = m.group(1) + num = m.group(2).rstrip(".") + rest = m.group(3).strip() + + # Titolo opzionale tra parentesi: "(principio di induzione)" + title_m = re.match(r"^(\([^)]{2,60}\))\s+(.*)", rest, re.DOTALL) + if title_m: + header = f"### {env} {num} {title_m.group(1)}" + body = title_m.group(2).strip() + else: + header = f"### {env} {num}." + body = rest + + result.append(f"{header}\n\n{body}" if body else header) + count += 1 + + return "\n\n".join(result), count + + +def _merge_title_headers(text: str) -> tuple[str, int]: + """ + Fonde header numerici isolati con il sottotitolo breve che li segue. + + '### N.\n\nSottotitolo (riga singola ≤ 80 char, senza punto finale)' + → '### N. Sottotitolo' + + Caso tipico: parti di un'opera (es. Nietzsche) dove il numero di sezione + e il titolo della sezione sono in blocchi Markdown separati. + Non tocca header con titolo già inline né header seguiti da testo lungo. + """ + count = 0 + blocks = re.split(r"\n{2,}", text) + result = [] + i = 0 + while i < len(blocks): + block = blocks[i] + stripped = block.strip() + if ( + re.match(r"^#{2,3} \d+\.\s*$", stripped) + and i + 1 < len(blocks) + ): + nxt = blocks[i + 1].strip() + # Sottotitolo valido: riga singola, ≤ 80 char, non header, non numerazione pura + if ( + nxt + and "\n" not in nxt + and len(nxt) <= 80 + and not nxt.startswith("#") + and not re.match(r"^\d+[\.\)]\s", nxt) + ): + result.append(stripped.rstrip() + " " + nxt) + count += 1 + i += 2 + continue + result.append(block) + i += 1 + return re.sub(r"\n{3,}", "\n\n", "\n\n".join(result)), count + + +def _extract_article_headers(text: str) -> tuple[str, int]: + """ + Converte voci di articolo dal formato lista Markdown al formato header ###. + + '- Art. N[suffix]. Titolo. Corpo testo...' → '### Art. N[suffix]. Titolo.\n\nCorpo testo...' + '- Art. N[suffix]. (…) (1)' → '### Art. N[suffix].\n\n(…) (1)' + + Gestisce suffissi come: Art. 4-bis., Art. 14-ter., Art. 1-quinquies. + Il titolo è la prima frase con iniziale maiuscola che termina con '.' prima di + ulteriore testo (es. "Leggi. La formazione..." → titolo "Leggi", corpo "La formazione..."). + Se il testo non ha titolo separabile, tutto diventa il corpo. + """ + count = 0 + + def _repl(m: re.Match) -> str: + nonlocal count + num = m.group(1) + rest = m.group(2).strip() + + # Titolo: frase con iniziale maiuscola, max 75 char, termina con '.', + # seguita da almeno un'altra frase (minimo 5 char) che inizia con maiuscola + # o con '(' / cifra (note a piè o continuazione corpo). + title_m = re.match( + r"^([A-ZÀÈÉÌÍÒÓÙÚ].{1,74}?)\.\s+([A-ZÀÈÉÌÍÒÓÙÚ\(\d].{4,})", + rest, + ) + if title_m: + count += 1 + return ( + f"### Art. {num}. {title_m.group(1)}.\n\n" + f"{title_m.group(2).strip()}" + ) + + # Nessun titolo separabile: tutto è corpo + if rest: + count += 1 + return f"### Art. {num}.\n\n{rest}" + + # Articolo senza testo inline (es. "- Art. 5. (…) (1)" già estratto sopra, + # oppure articolo vuoto nella lista) + count += 1 + return f"### Art. {num}." + + text = re.sub( + r"^-\s+Art\.\s+([\d]+[a-z\-]*)\.\s*(.*)", + _repl, + text, + flags=re.MULTILINE, + ) + return text, count + + +# ─── [3a] Funzioni di trasformazione ───────────────────────────────────────── + +# Mapping PUA Unicode (U+F020-U+F0FF) → simboli corretti per font Symbol/Wingdings. +# Il font Symbol di Windows codifica lettere greche e operatori matematici nel +# range Private Use Area invece dei codepoint Unicode standard. +_SYMBOL_PUA_MAP: dict[str, str] = { + "\uf020": " ", # space + "\uf028": "(", + "\uf029": ")", + "\uf02b": "+", + "\uf02d": "\u2212", # minus + "\uf02e": ".", + "\uf02f": "/", + "\uf030": "0", "\uf031": "1", "\uf032": "2", "\uf033": "3", "\uf034": "4", + "\uf035": "5", "\uf036": "6", "\uf037": "7", "\uf038": "8", "\uf039": "9", + "\uf03a": ":", "\uf03b": ";", "\uf03c": "<", "\uf03d": "=", "\uf03e": ">", + "\uf040": "\u2245", # congruent + "\uf041": "\u0391", # Alpha + "\uf042": "\u0392", # Beta + "\uf043": "\u03a7", # Chi + "\uf044": "\u0394", # Delta + "\uf045": "\u0395", # Epsilon + "\uf046": "\u03a6", # Phi + "\uf047": "\u0393", # Gamma + "\uf048": "\u0397", # Eta + "\uf049": "\u0399", # Iota + "\uf04a": "\u03d1", # theta variant + "\uf04b": "\u039a", # Kappa + "\uf04c": "\u039b", # Lambda + "\uf04d": "\u039c", # Mu + "\uf04e": "\u039d", # Nu + "\uf04f": "\u039f", # Omicron + "\uf050": "\u03a0", # Pi + "\uf051": "\u0398", # Theta + "\uf052": "\u03a1", # Rho + "\uf053": "\u03a3", # Sigma + "\uf054": "\u03a4", # Tau + "\uf055": "\u03a5", # Upsilon + "\uf056": "\u03c2", # sigma final + "\uf057": "\u03a9", # Omega + "\uf058": "\u039e", # Xi + "\uf059": "\u03a8", # Psi + "\uf05a": "\u0396", # Zeta + "\uf05b": "[", + "\uf05c": "\u2234", # therefore + "\uf05d": "]", + "\uf05e": "\u22a5", # perpendicular + "\uf061": "\u03b1", # alpha + "\uf062": "\u03b2", # beta + "\uf063": "\u03c7", # chi + "\uf064": "\u03b4", # delta + "\uf065": "\u03b5", # epsilon + "\uf066": "\u03c6", # phi + "\uf067": "\u03b3", # gamma + "\uf068": "\u03b7", # eta + "\uf069": "\u03b9", # iota + "\uf06a": "\u03d5", # phi variant + "\uf06b": "\u03ba", # kappa + "\uf06c": "\u03bb", # lambda + "\uf06d": "\u03bc", # mu + "\uf06e": "\u03bd", # nu + "\uf06f": "\u03bf", # omicron + "\uf070": "\u03c0", # pi + "\uf071": "\u03b8", # theta + "\uf072": "\u03c1", # rho + "\uf073": "\u03c3", # sigma + "\uf074": "\u03c4", # tau + "\uf075": "\u03c5", # upsilon + "\uf076": "\u03d6", # pi symbol + "\uf077": "\u03c9", # omega + "\uf078": "\u03be", # xi + "\uf079": "\u03c8", # psi + "\uf07a": "\u03b6", # zeta + "\uf07b": "{", + "\uf07c": "|", + "\uf07d": "}", + "\uf07e": "~", + "\uf0b1": "\u00b1", # plus-minus + "\uf0b7": "\u2022", # bullet + "\uf0ba": "\u221a", # square root + "\uf0bc": "\u2264", # less or equal + "\uf0bd": "\u2265", # greater or equal + "\uf0be": "\u221d", # proportional + "\uf0d7": "\u00d7", # multiplication + "\uf0f7": "\u00f7", # division + "\uf0b4": "\u00d7", # alternate multiply + "\uf0bb": "\u2260", # not equal + "\uf0b9": "\u2260", # not equal alternate + "\uf0b3": "\u2265", # greater or equal alternate + "\uf0b2": "\u2032", # prime + "\uf02a": "*", + "\uf02c": ",", + "\uf0a3": "\u2264", # less or equal (Symbol 0xA3) + "\uf0a7": "\u2022", # bullet (Wingdings 0xA7) + "\uf0a8": "\u2022", # bullet variant + "\uf0ae": "\u2192", # right arrow (Symbol 0xAE) + "\uf0b8": "\u00f7", # division / range separator + "\uf0eb": "", # Wingdings decorative icon (rimosso) + "\uf0f0": "\u2192", # right arrow variant + "\uf0db": "", # bracket extension piece (non ricostruibile) + "\uf0dc": "", # bracket extension piece + "\uf0dd": "", # bracket extension piece + "\uf0de": "", # brace middle piece (non ricostruibile) + "\uf0df": "", # brace extension piece +} + +_SYMBOL_PUA_RE = re.compile( + "[" + "".join(re.escape(k) for k in _SYMBOL_PUA_MAP) + "]" +) + + +def _t_fix_symbol_font(text: str) -> tuple[str, int]: + """Rimappa caratteri PUA font Symbol (U+F020-U+F0FF) in simboli Unicode corretti.""" + count = [0] + + def _repl(m: re.Match) -> str: + count[0] += 1 + return _SYMBOL_PUA_MAP[m.group(0)] + + result = _SYMBOL_PUA_RE.sub(_repl, text) + return result, count[0] + + +def _t_remove_images(text: str) -> tuple[str, int]: + n = len(re.findall(r"!\[[^\]]*\]\([^)]*\)", text)) + text = re.sub(r"!\[[^\]]*\]\([^)]*\)\s*", "", text) + return text, n + + +# Superscript Unicode: ¹²³⁴⁵⁶⁷⁸⁹⁰ +_SUPERSCRIPT_RE = re.compile(r'[\u00b9\u00b2\u00b3\u2070\u2074-\u2079]+') +# Riga corpo-nota: inizia con superscript o [N] +_FOOTNOTE_BODY_RE = re.compile( + r'^([\u00b9\u00b2\u00b3\u2070\u2074-\u2079]+\s+|\[\d{1,3}\]\s+)' +) + + +def _t_remove_footnotes(text: str) -> tuple[str, int]: + """Rimuovi marcatori footnote superscript inline e righe corpo-nota.""" + lines = text.split("\n") + result, count = [], 0 + for line in lines: + stripped = line.strip() + # Corpo nota: riga breve che inizia con ¹ o [N] + if stripped and _FOOTNOTE_BODY_RE.match(stripped) and len(stripped) < 300: + count += 1 + continue + cleaned = _SUPERSCRIPT_RE.sub("", line) + if cleaned != line: + count += 1 + result.append(cleaned) + return "\n".join(result), count + + +def _t_fix_br(text: str) -> tuple[str, int]: + n = len(re.findall(r"
", text, re.IGNORECASE)) + text = re.sub(r"
\s*", " ", text, flags=re.IGNORECASE) + return text, n + + +def _t_fix_tabsep(text: str) -> tuple[str, int]: + _pat = re.compile(r"(?m)^\|\s*\|\s*$|^\|---\|?\s*$") + n = len(_pat.findall(text)) + text = _pat.sub("", text) + return text, n + + +def _t_fix_accents(text: str) -> tuple[str, int]: + """Fix artefatti backtick da PDF LaTeX: `e→è, e`→è, sar`a→sarà, ecc.""" + _ACCENT_MAP = { + "e": "è", "E": "È", "a": "à", "A": "À", + "u": "ù", "U": "Ù", "i": "ì", "I": "Ì", "o": "ò", "O": "Ò", + } + n_bt_before = text.count("`") + text = re.sub(r"`([eEaAuUiIoO])", lambda m: _ACCENT_MAP[m.group(1)], text) + text = re.sub(r"([eEaAuUiIoO])`", lambda m: _ACCENT_MAP[m.group(1)], text) + n_accenti = n_bt_before - text.count("`") + # Backtick orfani: artefatti LaTeX rimasti dopo la correzione vocale + n_bt_orfani = text.count("`") + if n_bt_orfani: + text = re.sub(r"`", "", text) + n_accenti += n_bt_orfani + return text, n_accenti + + +def _t_fix_multiplication(text: str) -> tuple[str, int]: + """Fix segno di moltiplicazione "→× (encoding font PDF non-standard).""" + n = len(re.findall(r'(?<=[0-9])"(?=[0-9(])', text)) + text = re.sub(r'(?<=[0-9])"(?=[0-9(])', '×', text) + return text, n + + +def _t_fix_micro(text: str) -> tuple[str, int]: + """Fix prefisso micro !→µ prima di unità SI note.""" + _SI_UNITS_RE = r'[mAsgVWFHTKNJClΩ]' + n = len(re.findall(rf'\d\s*!(?={_SI_UNITS_RE})', text)) + text = re.sub(rf'(\d)\s*!({_SI_UNITS_RE})', r'\1 µ\2', text) + return text, n + + +def _t_remove_formula_labels(text: str) -> tuple[str, int]: + """Rimuovi label formule inline [N.M] — es. [3.4], [10.7].""" + n = len(re.findall(r"\[\d+\.\d+\]", text)) + text = re.sub(r"\s*\[\d+\.\d+\]\s*", " ", text) + return text, n + + +def _t_remove_dotleaders(text: str) -> tuple[str, int]: + """Rimuovi righe con dot-leader e numerali romani isolati (footer TOC).""" + _DOTLEADER_RE = r"^[^\n]*(?:(?:\. ){3,}|\.{4,})[^\n]*$" + n = len(re.findall(_DOTLEADER_RE, text, re.MULTILINE)) + text = re.sub(_DOTLEADER_RE, "", text, flags=re.MULTILINE) + text = re.sub( + r"(?m)^(i{1,3}|iv|vi{0,3}|ix|xi{0,2}|x)$", + "", + text, + flags=re.IGNORECASE, + ) + return text, n + + +def _t_fix_header_concat(text: str) -> tuple[str, int]: + """Fix header + body concatenati senza separatore.""" + count = 0 + + def _fix(m: re.Match) -> str: + nonlocal count + hashes = m.group(1) + full = m.group(2).strip() + if len(full) < 60: + return m.group(0) + skip = min(10, len(full) // 3) + split = re.search(r"(?<=[a-zàèéìíòóùúä])(?=[A-ZÀÈÉÌÍÒÓÙÚ])", full[skip:]) + if split: + pos = skip + split.start() + title = full[:pos].strip() + body = full[pos:].strip() + if len(title) >= 5 and len(body) >= 15: + count += 1 + return f"{hashes} {title}\n\n{body}" + return m.group(0) + + text = re.sub(r"^(#{2,6})\s+(.{40,})$", _fix, text, flags=re.MULTILINE) + return text, count + + +def _t_extract_capitolo(text: str) -> tuple[str, int]: + """Estrai 'Capitolo N: TITOLO' inline nel corpo del testo → ## header.""" + def _repl(m: re.Match) -> str: + num = m.group(1) + titolo = _sentence_case(m.group(2).strip().rstrip("- ").strip()) + return f"\n\n## Capitolo {num}: {titolo}\n\n" + + text = re.sub( + r"\bCapitolo\s+(\d+)\s*[:\s]\s*([A-ZÀÈÉÌÍÒÓÙÚ\'L][A-ZÀÈÉÌÍÒÓÙÚ\s\'\.,\(\)]{5,80}?)" + r"(?=\s*[-–]\s*\d|\s*\n|\s*$)", + _repl, + text, + ) + return text, 0 + + +_NUMBERED_HDR_RE = re.compile( + r"^(#{1,6})\s+(\d+(?:\.\d+)*)\.\s+(.+)$", + re.MULTILINE, +) + + +def _t_normalize_numbered_headings(text: str) -> tuple[str, int]: + """Corregge livelli header per documenti con numerazione decimale. + + Assegna livello heading in base alla profondità numerica usando come base + il livello corrente degli header di profondità minima. + Attivo solo se il documento ha almeno 2 profondità di numerazione. + """ + all_matches = list(_NUMBERED_HDR_RE.finditer(text)) + if not all_matches: + return text, 0 + + pairs = [ + (m.group(2).count(".") + 1, len(m.group(1))) + for m in all_matches + ] + depths = [d for d, _ in pairs] + min_depth, max_depth = min(depths), max(depths) + if max_depth == min_depth: + return text, 0 + + base_level = min(lv for d, lv in pairs if d == min_depth) + count = 0 + + def _repl(m: re.Match) -> str: + nonlocal count + hashes, num, title = m.group(1), m.group(2), m.group(3) + depth = num.count(".") + 1 + new_level = min(base_level + (depth - min_depth), 6) + if new_level == len(hashes): + return m.group(0) + count += 1 + return f"{'#' * new_level} {num}. {title}" + + return _NUMBERED_HDR_RE.sub(_repl, text), count + + +def _t_normalize_header_levels(text: str) -> tuple[str, int]: + """Normalizza h4+ → h3; rimuove header vuoti; rimuove numero pagina '| N' finale.""" + text = re.sub(r"^#{3,6}\s*$", "", text, flags=re.MULTILINE) + text = re.sub( + r"^(#{3,6})\s+(\d{1,3})\s+(.+)$", + lambda m: f"### {m.group(2)}. {m.group(3)}", + text, + flags=re.MULTILINE, + ) + text = re.sub(r"^#{4,6}\s+(.+)$", r"### \1", text, flags=re.MULTILINE) + return text, 0 + + +def _t_extract_articles(text: str) -> tuple[str, int]: + """Converti voci articolo '- Art. N.' → '### Art. N.'""" + return _extract_article_headers(text) + + +def _t_remove_header_bold(text: str) -> tuple[str, int]: + """Rimuovi **bold** negli header esistenti.""" + text = re.sub( + r"^(#{1,6})\s+\*\*(.+?)\*\*\s*$", + r"\1 \2", + text, flags=re.MULTILINE, + ) + return text, 0 + + +def _t_normalize_allcaps_headers(text: str) -> tuple[str, int]: + """Normalizza header ALL-CAPS → sentence-case.""" + def _norm(m: re.Match) -> str: + hashes, content = m.group(1), m.group(2).strip() + letters = [c for c in content if c.isalpha()] + if letters and all(c.isupper() for c in letters): + return f"{hashes} {_sentence_case(content)}" + return m.group(0) + + text = re.sub(r"^(#{1,6}) (.+)$", _norm, text, flags=re.MULTILINE) + return text, 0 + + +def _t_remove_toc(text: str) -> tuple[str, int]: + """Rimuovi header TOC e voci lista numerate che seguono.""" + lines = text.split("\n") + new_lines = [] + _in_toc = False + removed = False + for line in lines: + bare = re.sub(r"^#+\s*", "", line.strip()) + first_word = bare.split(".")[0].strip().lower() + if first_word in _TOC_KEYWORDS: + removed = True + _in_toc = True + continue + if _in_toc: + if re.match(r"^\s*$", line) or re.match(r"^\s*[-*+]\s+\d", line): + continue + # Voce TOC con numero pagina finale (sicuro: siamo gia in contesto TOC) + if re.match(r"^\s*[-*+]\s+.{2,70}\s+\d{1,3}\s*$", line): + continue + # Riga di testo lungo = probabilmente abstract o corpo, non voce di indice + if len(line.strip()) > 200: + _in_toc = False + new_lines.append(line) + continue + _in_toc = False + new_lines.append(line) + return "\n".join(new_lines), 1 if removed else 0 + + + +def _t_allcaps_to_headers(text: str) -> tuple[str, int]: + """Converti righe ALL-CAPS standalone → ## header.""" + count = 0 + blocks = text.split("\n\n") + new_blocks = [] + for block in blocks: + stripped = block.strip() + if "\n" not in stripped and _is_allcaps_line(stripped): + new_blocks.append(_allcaps_to_header(stripped)) + count += 1 + else: + sub_lines = block.split("\n") + converted = [] + for ln in sub_lines: + if _is_allcaps_line(ln) and len(ln.strip()) > 3: + converted.append(_allcaps_to_header(ln)) + count += 1 + else: + converted.append(ln) + new_blocks.append("\n".join(converted)) + return "\n\n".join(new_blocks), count + + +_BIB_MARKERS_RE = re.compile( + r'\b(pp?\.|vol\.|n\.\s*\d|ed\.|edn\.|ISBN|DOI|arXiv)\b' + r'|\b(19|20)\d{2}\b', + re.IGNORECASE, +) + + +def _t_numbered_sections(text: str, has_exercises: bool = False) -> tuple[str, int]: + """Converti sezioni numerate 'N. testo' / '- N. testo' / '- N testo' → ### header.""" + count = 0 + + def _num_repl(m: re.Match) -> str: + nonlocal count + content = m.group(2).strip() + if content.endswith(".") and len(content) > 40: + return m.group(0) + if _BIB_MARKERS_RE.search(content): + return m.group(0) + count += 1 + return f"### {m.group(1)}.\n\n{content}" + + text = re.sub(r"^(\d+)\.\s+(.+)$", _num_repl, text, flags=re.MULTILINE) + + def _num_letter_repl(m: re.Match) -> str: + nonlocal count + count += 1 + return f"### {m.group(1)}{m.group(2)}.\n\n{m.group(3).strip()}" + + text = re.sub(r"^(\d+)\s*([a-z])\.\s+(.+)$", _num_letter_repl, text, flags=re.MULTILINE) + + # Disabilitato se il documento contiene sezioni "Esercizi": in quel caso i + # "- N. testo" sono numerazioni di esercizi, non header di sezione. + if not has_exercises: + def _aphorism_repl(m: re.Match) -> str: + nonlocal count + content = m.group(2).strip() + if _BIB_MARKERS_RE.search(content): + return m.group(0) + count += 1 + return f"\n\n### {m.group(1)}.\n\n{content}" + + text = re.sub( + r"^-\s+(\d{1,3})\.\s+(.{10,})$", + _aphorism_repl, + text, + flags=re.MULTILINE, + ) + + def _list_section_repl(m: re.Match) -> str: + nonlocal count + num = m.group(1) + content = m.group(2).strip() + if _BIB_MARKERS_RE.search(content): + return m.group(0) + count += 1 + split = re.search(r"(?<=[a-zàèéìíòóùú])\s+(?=[A-ZÀÈÉÌÍÒÓÙÚ])", content) + if split and split.start() >= 3: + title = content[: split.start()].strip() + body = content[split.end():].strip() + if len(body) >= 20: + return f"\n\n### {num}. {title}\n\n{body}" + return f"\n\n### {num}. {content}" + + text = re.sub( + r"^-\s+(\d{1,3})\s+([A-ZÀÈÉÌÍÒÓÙÚ\'L].{10,})$", + _list_section_repl, + text, + flags=re.MULTILINE, + ) + return text, count + + +def _t_extract_math(text: str) -> tuple[str, int]: + """Converti ambienti matematici (Teorema/Definizione/...) → ### header.""" + return _extract_math_environments(text) + + +def _t_merge_paragraphs(text: str) -> tuple[str, int]: + """Unisci paragrafi spezzati da salti pagina PDF.""" + _SENTENCE_END = set(".?!»)\"'") + blocks = text.split("\n\n") + merged = [] + count = 0 + i = 0 + while i < len(blocks): + b = blocks[i] + stripped = b.strip() + while ( + i + 1 < len(blocks) + and stripped + and not stripped.startswith("#") + and not stripped.startswith("|") # non unire righe tabella in avanti + and stripped[-1] not in _SENTENCE_END + ): + nxt = blocks[i + 1].strip() + if not nxt or nxt.startswith("#") or nxt.startswith("|") or re.match(r"^\d+\.", nxt) or re.match(r"^[-*+]\s", nxt): + break + b = stripped + " " + nxt + stripped = b.strip() + count += 1 + i += 1 + merged.append(b) + i += 1 + text = "\n\n".join(merged) + # Secondo pass: rimuovi prefisso |---| eventualmente rimasto dopo il merge + text = re.sub(r"(?m)^\|---\|\s*", "", text) + return text, count + + +def _t_normalize_whitespace(text: str) -> tuple[str, int]: + """Normalizza whitespace multiplo interno alle righe.""" + lines = text.split("\n") + text = "\n".join( + re.sub(r" +", " ", line) if line.strip() else line + for line in lines + ) + return text, 0 + + +def _t_collapse_blank_lines(text: str) -> tuple[str, int]: + """Riduci righe vuote multiple a doppie.""" + return re.sub(r"\n{3,}", "\n\n", text), 0 + + +def _t_demote_verse_headers(text: str) -> tuple[str, int]: + """Demoti header che sono in realtà terzine/versi. + + opendataloader promuove a ## le iscrizioni e i testi in evidenza nel PDF + (corpo maggiore, centrato). Si riconoscono perché: + - terminano con un numero nudo (numero di verso: 3, 6, 9, …) + - contengono punteggiatura interna di fine verso (', ' o '. ') + Esempio: '## «per me si va ne la città dolente, ... gente. 3' + → paragrafo normale senza il numero finale. + """ + count = 0 + + def _demote(m: re.Match) -> str: + nonlocal count + hashes, content = m.group(1), m.group(2).strip() + # Deve terminare con numero nudo (numero di verso ≤ 9999) + if not re.search(r"\s\d{1,4}\s*$", content): + return m.group(0) + # Deve contenere punteggiatura interna (è un blocco di più versi) + inner = re.sub(r"\s\d{1,4}\s*$", "", content) + if not re.search(r"[,;:.!?»\"\']\s+[A-Za-zÀ-ÿ«\"]", inner): + return m.group(0) + count += 1 + # Rimuovi il numero di verso finale e restituisci come testo normale + clean = re.sub(r"\s\d{1,4}\s*$", "", content) + return clean + + text = re.sub( + r"^(#{1,6})\s+(.{20,})$", + _demote, + text, + flags=re.MULTILINE, + ) + return text, count + + +def _t_restore_poetry_lines(text: str) -> tuple[str, int]: + """Ripristina line break di poesia distrutti da keep_line_breaks=False. + + Quando il PDF è poesia (terzine dantesche, sonetti, ecc.) opendataloader + con keep_line_breaks=False produce un unico paragrafo con i numeri di verso + (3, 6, 9 … oppure 1, 2, 3 …) incorporati inline: + 'smarrita. 3 Ahi quanto a dir qual era è cosa dura … paura! 6 Tant'è …' + + Il transform rileva blocchi con numeri di verso in progressione aritmetica + e li separa in righe, con riga vuota ogni 3 versi (terzina). + """ + count = 0 + blocks = text.split("\n\n") + result = [] + + # Pattern: numero isolato preceduto da punteggiatura-fine-verso e seguito + # da lettera maiuscola (inizio verso successivo). + _VERSE_NUM_RE = re.compile( + r'([.!?»\'\"]\s+)(\d+)(\s+)(?=[A-ZÀ-Ùa-zà-ù«"‟])' + ) + + for block in blocks: + stripped = block.strip() + if not stripped or stripped.startswith("#"): + result.append(block) + continue + + matches = list(_VERSE_NUM_RE.finditer(stripped)) + if len(matches) < 2: + result.append(block) + continue + + nums = [int(m.group(2)) for m in matches] + diffs = [nums[i + 1] - nums[i] for i in range(len(nums) - 1)] + # Accetta progressioni con passo costante 1–5 (terzine: 3, endecasillabi: 1) + if not diffs or len(set(diffs)) > 2 or not (1 <= diffs[0] <= 5): + result.append(block) + continue + + step = diffs[0] + + def _replace_verse_num(m: re.Match) -> str: + n = int(m.group(2)) + # Ogni 'step' versi → riga vuota (inizio nuova terzina/strofa) + sep = "\n\n" if n % (step * 3) == 0 else "\n" + return m.group(1).rstrip() + sep + + new_block = _VERSE_NUM_RE.sub(_replace_verse_num, stripped) + if new_block != stripped: + count += len(matches) + result.append(new_block) + + return "\n\n".join(result), count + + +def _t_remove_urls(text: str) -> tuple[str, int]: + """Rimuovi righe che sono solo URL (watermark, footer di piattaforme).""" + return re.sub(r"(?m)^(https?://|www\.)\S+\s*$", "", text), 0 + + +def _t_remove_empty_headers(text: str) -> tuple[str, int]: + """Rimuovi header senza corpo (sezioni vuote / watermark).""" + blocks = re.split(r"\n{2,}", text) + cleaned = [] + for i, block in enumerate(blocks): + stripped = block.strip() + if re.match(r"^#{1,6} ", stripped) and "\n" not in stripped: + next_stripped = blocks[i + 1].strip() if i + 1 < len(blocks) else "" + # Non rimuovere un header breve se il successivo è un header molto lungo + # (> 80 char): quasi certamente è testo PDF mal classificato come heading. + next_is_long_header = ( + re.match(r"^#{1,6} ", next_stripped) and len(next_stripped) > 80 + ) + if not next_stripped or ( + re.match(r"^#{1,6} ", next_stripped) and not next_is_long_header + ): + continue + cleaned.append(block) + return re.sub(r"\n{3,}", "\n\n", "\n\n".join(cleaned)), 0 + + +def _t_merge_title_headers(text: str) -> tuple[str, int]: + """Fondi header numerici isolati con il sottotitolo breve successivo.""" + return _merge_title_headers(text) + + +def _t_remove_garbage_headers(text: str) -> tuple[str, int]: + """Rimuovi garbage headers: simboli, abbreviazioni matematiche, frammenti formula.""" + def _is_garbage_header(content: str) -> bool: + if content.lstrip().startswith("..."): + return True + if not re.search(r"[A-Za-zÀ-ÿ\u0391-\u03c9]{2,}", content): + return True + if re.fullmatch(r"\(?\s*[A-Za-z]{1,4}\s*\)?", content.strip()): + return True + if len(content) > 60 and re.search(r"[!%#]\w|\w[!%#]|\b\w+-\s*\w", content): + return True + # Frammento di frase: inizia con minuscola ed e abbastanza lungo + first_alpha = next((c for c in content if c.isalpha()), None) + if first_alpha and first_alpha.islower() and len(content) > 40: + return True + # Formula matematica: variabile singola (o breve) seguita da = o operatore + if re.match(r"^[A-Za-z\u0391-\u03c9_]{1,3}\s*[=<>≤≥]", content.strip()): + return True + # Didascalia figura/tabella: "Figura N..." o "Figure N..." o "Tabella N..." + if re.match(r"^(Figura|Figure|Fig\.|Tabella|Table|Tab\.)\s+\d", content.strip(), re.IGNORECASE): + return True + return False + + count = 0 + lines = text.split("\n") + new_lines = [] + for line in lines: + m = re.match(r"^#{1,6} (.+)$", line) + if m and _is_garbage_header(m.group(1)): + count += 1 + continue + new_lines.append(line) + text = "\n".join(new_lines) + text = re.sub(r"\n{3,}", "\n\n", text) + return text, count + + +def _t_remove_frontmatter(text: str) -> tuple[str, int]: + """Rimuovi sezioni frontmatter: URL, email, affiliazione, copyright.""" + _FM_RE = re.compile( + r"https?://|www\.|@[A-Za-z]|\bUniversit[àa]\b|\bDipartimento\b|" + r"\bCopyright\b|\bLicenza\b|\bEdizione\b|" + r"protetto da|tutti i diritti", + re.IGNORECASE, + ) + blocks = re.split(r"\n{2,}", text) + cleaned = [] + count = 0 + total = len(blocks) + cutoff = max(5, min(15, int(total * 0.20))) + for i, block in enumerate(blocks): + stripped = block.strip() + # Frontmatter compare solo nelle prime sezioni del documento + if i >= cutoff: + cleaned.append(block) + continue + if not re.match(r"^### ", stripped) or re.match(r"^### \d", stripped): + cleaned.append(block) + continue + body = blocks[i + 1].strip() if i + 1 < len(blocks) else "" + is_fm_body = len(body) < 250 and _FM_RE.search(body) + is_fm_hdr = _FM_RE.search(stripped) + if is_fm_body or is_fm_hdr: + count += 1 + continue + cleaned.append(block) + return re.sub(r"\n{3,}", "\n\n", "\n\n".join(cleaned)), count + + +_WATERMARK_RE = re.compile( + r"^(BOZZA|DRAFT|CONFIDENTIAL|RISERVATO|PROVVISORIO|SAMPLE|SPECIMEN" + r"|DO NOT DISTRIBUTE|NON DISTRIBUIRE|COPY|COPIA)\s*$", + re.IGNORECASE | re.MULTILINE, +) + + +def _t_remove_watermarks(text: str) -> tuple[str, int]: + """Rimuovi righe standalone con testo watermark comune.""" + lines = text.split("\n") + result, count = [], 0 + for line in lines: + if _WATERMARK_RE.match(line): + count += 1 + else: + result.append(line) + return "\n".join(result), count + + +def _t_fix_math_symbols(text: str) -> tuple[str, int]: + """Rimuovi righe composte solo da simboli box/placeholder (font non estratti).""" + lines = text.split("\n") + result, count = [], 0 + for line in lines: + if line.strip() and re.match(r"^[\s□■▪▫◆◇●○•\u25a0-\u25ff]+$", line): + count += 1 + else: + result.append(line) + return "\n".join(result), count + + +def _t_remove_recurring_lines(text: str) -> tuple[str, int]: + """Rimuovi righe corte che si ripetono ≥5 volte (header/footer di pagina).""" + lines = text.split("\n") + short_lines = [ + ln.strip() for ln in lines + if 3 < len(ln.strip()) < 80 + and not ln.strip().startswith("#") + and not ln.strip().startswith("|") + ] + freq = Counter(short_lines) + recurring = {ln for ln, c in freq.items() if c >= 5} + if not recurring: + return text, 0 + result, count = [], 0 + for line in lines: + if line.strip() in recurring: + count += 1 + else: + result.append(line) + return "\n".join(result), count + + +# ─── [3b] Pipeline delle trasformazioni ────────────────────────────────────── + +def apply_transforms(text: str) -> tuple[str, dict]: + """ + Applica le trasformazioni strutturali al Markdown grezzo. + Restituisce (testo_modificato, statistiche). + """ + # Flag calcolato prima del loop: disabilita il transform 4b nei documenti + # con sezioni "Esercizi" (i "- N. testo" sarebbero numerazioni, non header). + _has_ex = bool(re.search(r"\b(Esercizi|Exercises|Problems|Homework)\b", text, re.IGNORECASE)) + + _transforms: list[tuple[str | None, object]] = [ + ("n_simboli_pua_corretti", _t_fix_symbol_font), + ("n_immagini_rimosse", _t_remove_images), + ("n_br_rimossi", _t_fix_br), + ("n_tabsep_rimossi", _t_fix_tabsep), + ("n_note_rimosse", _t_remove_footnotes), + ("n_accenti_corretti", _t_fix_accents), + ("n_moltiplicazioni_corrette", _t_fix_multiplication), + ("n_micro_corretti", _t_fix_micro), + ("n_simboli_math_rimossi", _t_fix_math_symbols), + ("n_formule_rimossi", _t_remove_formula_labels), + ("n_dotleader_rimossi", _t_remove_dotleaders), + ("n_righe_ricorrenti_rimosse", _t_remove_recurring_lines), + ("n_header_concat_fixati", _t_fix_header_concat), + (None, _t_extract_capitolo), + ("n_header_numerati_normalizzati", _t_normalize_numbered_headings), + (None, _t_normalize_header_levels), + ("n_articoli_estratti", _t_extract_articles), + (None, _t_remove_header_bold), + (None, _t_normalize_allcaps_headers), + ("toc_rimosso", _t_remove_toc), + ("n_header_allcaps", _t_allcaps_to_headers), + ("n_sezioni_numerate", partial(_t_numbered_sections, has_exercises=_has_ex)), + ("n_ambienti_matematici", _t_extract_math), + ("n_paragrafi_uniti", _t_merge_paragraphs), + (None, _t_normalize_whitespace), + (None, _t_collapse_blank_lines), + ("n_versi_ripristinati", _t_restore_poetry_lines), + ("n_header_verso_demotati", _t_demote_verse_headers), + (None, _t_remove_urls), + (None, _t_remove_empty_headers), + ("n_titoli_uniti", _t_merge_title_headers), + (None, lambda t: (re.sub(r"(?m)^(#{1,6}.+?)\s*\|\s*\d{1,3}\s*$", r"\1", t), 0)), + ("n_garbage_headers_rimossi", _t_remove_garbage_headers), + ("n_frontmatter_rimossi", _t_remove_frontmatter), + ("n_watermark_rimossi", _t_remove_watermarks), + ] + + stats: dict = {} + for stat_key, fn in _transforms: + text, n = fn(text) + if stat_key: + stats[stat_key] = stats.get(stat_key, 0) + n + + stats["toc_rimosso"] = bool(stats.get("toc_rimosso", 0)) + return text, stats + + +# ─── [4] Rilevamento struttura ─────────────────────────────────────────────── + +_IT_WORDS = frozenset([ + "il", "la", "di", "e", "che", "non", "per", "un", "una", "si", + "con", "da", "del", "della", "dei", "in", "ma", "se", "lo", "le", + "gli", "al", "alla", "ai", "alle", "sono", "ha", "hanno", "era", + "erano", "nel", "nella", "nei", "nelle", "questo", "questa", "così", +]) +_EN_WORDS = frozenset([ + "the", "of", "and", "to", "in", "is", "that", "it", "was", "for", + "on", "are", "as", "with", "his", "they", "at", "be", "this", "have", + "from", "or", "an", "but", "not", "by", "he", "she", "we", "you", + "which", "their", "been", "has", "would", "there", "when", "will", +]) +_FR_WORDS = frozenset([ + "le", "les", "de", "du", "des", "et", "un", "une", "est", "que", + "pour", "dans", "sur", "avec", "qui", "par", "pas", "plus", "au", + "ce", "se", "ou", "mais", "comme", "aussi", +]) +_DE_WORDS = frozenset([ + "der", "die", "das", "und", "in", "von", "zu", "den", "mit", "ist", + "auf", "eine", "als", "dem", "des", "sich", "nicht", "auch", "werden", + "bei", "nach", "oder", "wenn", "wird", "war", +]) +_ES_WORDS = frozenset([ + "el", "los", "las", "de", "en", "un", "una", "es", "que", "por", + "con", "del", "para", "como", "pero", "sus", "son", "los", "hay", + "todo", "esta", "este", "ser", "más", "ya", +]) + + +def _detect_language(text: str) -> str: + words = re.findall(r"\b[a-zA-Z]{2,}\b", text.lower()) + sample = words[:2000] + scores = { + "it": sum(1 for w in sample if w in _IT_WORDS), + "en": sum(1 for w in sample if w in _EN_WORDS), + "fr": sum(1 for w in sample if w in _FR_WORDS), + "de": sum(1 for w in sample if w in _DE_WORDS), + "es": sum(1 for w in sample if w in _ES_WORDS), + } + best = max(scores, key=scores.get) + return best if scores[best] > 0 else "unknown" + + +def _count_headers(text: str, level: int) -> int: + prefix = "#" * level + " " + return len(re.findall(rf"(?m)^{re.escape(prefix)}", text)) + + +def _count_paragraphs(text: str) -> int: + blocks = re.split(r"\n{2,}", text) + return sum(1 for b in blocks if b.strip() and not re.match(r"^#+\s", b.strip())) + + +def _split_sections(text: str, level: int) -> list[str]: + prefix = "#" * level + " " + parts = re.split(rf"(?m)^{re.escape(prefix)}.+", text) + return [p for p in parts[1:] if p.strip()] + + +def _parse_sections_with_body(text: str, level: int = 3) -> list[tuple[str, str]]: + """Restituisce lista di (header_line, body_text) per tutti gli header al livello dato.""" + prefix = "#" * level + " " + lines = text.split("\n") + sections: list[tuple[str, str]] = [] + cur_hdr: str | None = None + cur_body: list[str] = [] + for line in lines: + if line.startswith(prefix): + if cur_hdr is not None: + sections.append((cur_hdr, "\n".join(cur_body).strip())) + cur_hdr = line + cur_body = [] + elif cur_hdr is not None: + cur_body.append(line) + if cur_hdr is not None: + sections.append((cur_hdr, "\n".join(cur_body).strip())) + return sections + + +def analyze(md_path: Path) -> dict: + text = md_path.read_text(encoding="utf-8") + n_h1 = _count_headers(text, 1) + n_h2 = _count_headers(text, 2) + n_h3 = _count_headers(text, 3) + n_paragrafi = _count_paragraphs(text) + + if n_h3 >= 5: + livello, boundary, strategia = 3, "h3", "h3_aware" + section_bodies = _split_sections(text, 3) + # Gerarchia invertita: h3 sono capitoli enormi, h2 sono sottosezioni più brevi. + # Succede quando opendataloader classifica titoli capitolo come h6 (→ normalizzati + # a h3) e le sottosezioni ALL-CAPS diventano ## (h2). In questo caso h2 è + # il boundary corretto per il chunking. + if n_h2 >= 3: + h2_bodies = _split_sections(text, 2) + avg_h3 = sum(len(b) for b in section_bodies) / len(section_bodies) if section_bodies else 0 + avg_h2 = sum(len(b) for b in h2_bodies) / len(h2_bodies) if h2_bodies else 0 + if avg_h3 > 5000 and avg_h2 < avg_h3 * 0.7: + livello, boundary, strategia = 2, "h2", "h2_paragraph_split" + section_bodies = h2_bodies + elif n_h2 >= 3: + livello, boundary, strategia = 2, "h2", "h2_paragraph_split" + section_bodies = _split_sections(text, 2) + elif n_h1 + n_h2 + n_h3 >= 1: + livello, boundary, strategia = 1, "paragrafo", "paragraph" + section_bodies = [b for b in re.split(r"\n{2,}", text) if b.strip()] + elif n_paragrafi >= 3: + livello, boundary, strategia = 1, "paragrafo", "paragraph" + section_bodies = [b for b in re.split(r"\n{2,}", text) if b.strip()] + else: + livello, boundary, strategia = 0, "nessuno", "sliding_window" + section_bodies = [text] if text.strip() else [] + + lengths = [len(b) for b in section_bodies if b.strip()] + lunghezza_media = int(sum(lengths) / len(lengths)) if lengths else 0 + lingua = _detect_language(text) + + avvertenze = [] + short = sum(1 for l in lengths if l < 200) + long_ = sum(1 for l in lengths if l > 800) + if short: + avvertenze.append(f"{short} sezioni sotto i 200 caratteri — verranno accorpate") + if long_: + avvertenze.append(f"{long_} sezioni sopra i 800 caratteri — verranno divise") + + return { + "livello_struttura": livello, + "n_h1": n_h1, + "n_h2": n_h2, + "n_h3": n_h3, + "n_paragrafi": n_paragrafi, + "boundary_primario": boundary, + "lingua_rilevata": lingua, + "lunghezza_media_sezione": lunghezza_media, + "strategia_chunking": strategia, + "avvertenze": avvertenze, + } + + +# ─── Report di conversione ─────────────────────────────────────────────────── + +def build_report( + stem: str, + out_dir: Path, + clean_text: str, + t_stats: dict, + profile: dict, + reduction: float, +) -> Path: + """ + Genera conversione//report.json con tutte le metriche di qualità: + statistiche trasformazioni, struttura, distribuzione lunghezze, anomalie + e problemi residui. Leggibile da validate.py per la validazione batch. + """ + text_lines = clean_text.split("\n") + + # ── Raccolta sezioni ### con corpo ──────────────────────────────────── + sections = _parse_sections_with_body(clean_text, 3) + lengths = [len(body) for _, body in sections] + + # ── Distribuzione lunghezze ─────────────────────────────────────────── + def _pct(data: list[int], p: float) -> int: + if not data: + return 0 + s = sorted(data) + return s[max(0, min(len(s) - 1, int(len(s) * p)))] + + distribution = { + "min": min(lengths) if lengths else 0, + "p25": _pct(lengths, 0.25), + "mediana": _pct(lengths, 0.50), + "p75": _pct(lengths, 0.75), + "max": max(lengths) if lengths else 0, + } + + # ── Anomalie ────────────────────────────────────────────────────────── + bare_hdrs = [ + {"header": hdr, "corpo_inizio": body[:120].replace("\n", " ")} + for hdr, body in sections + if re.match(r"^### \d+\.\s*$", hdr) and len(body.strip()) < 30 + ] + + short_secs = [ + {"header": hdr, "chars": length, "testo": body[:80].replace("\n", " ")} + for (hdr, body), length in zip(sections, lengths) + if 0 < length < 150 + ] + + long_secs = [ + {"header": hdr, "chars": length} + for (hdr, _), length in zip(sections, lengths) + if length > 1500 + ] + + # ── Problemi residui (max 10 esempi ciascuno) ───────────────────────── + def _scan(pattern: str, max_n: int = 10) -> list[dict]: + hits = [] + for i, line in enumerate(text_lines): + if re.search(pattern, line) and not re.match(r"^#+ ", line): + hits.append({"riga": i + 1, "testo": line.strip()[:120]}) + if len(hits) >= max_n: + break + return hits + + residui = { + "backtick": _scan(r"`"), + "dotleader": _scan(r"(?:\. ){3,}"), + "url": _scan(r"^(https?://|www\.)\S+"), + "immagini": _scan(r"!\[[^\]]*\]\([^)]*\)"), + "br_inline": _scan(r"
"), + "simboli_encoding": _scan(r'(?<=[0-9A-Za-z])[!"](?=[0-9A-Za-z])'), + "formule_inline": _scan(r"\[\d+\.\d+\]"), + "footnote_markers": _scan(r'[\u00b9\u00b2\u00b3\u2070\u2074-\u2079]'), + "pua_markers": _scan(r'[\ue000-\uf8ff]'), + } + + # ── Composizione report ─────────────────────────────────────────────── + report = { + "stem": stem, + "timestamp": datetime.now().strftime("%Y-%m-%d %H:%M"), + "transforms": { + **t_stats, + "riduzione_pct": round(reduction), + }, + "structure": profile, + "distribution": distribution, + "anomalie": { + "bare_headers": len(bare_hdrs), + "short_sections": len(short_secs), + "long_sections": len(long_secs), + "bare_headers_list": bare_hdrs, + "short_sections_list": short_secs, + "long_sections_list": long_secs, + }, + "residui": { + "backtick": len(residui["backtick"]), + "dotleader": len(residui["dotleader"]), + "url": len(residui["url"]), + "immagini": len(residui["immagini"]), + "br_inline": len(residui["br_inline"]), + "simboli_encoding": len(residui["simboli_encoding"]), + "formule_inline": len(residui["formule_inline"]), + "footnote_markers": len(residui["footnote_markers"]), + "pua_markers": len(residui["pua_markers"]), + "backtick_esempi": residui["backtick"], + "dotleader_esempi": residui["dotleader"], + "url_esempi": residui["url"], + "immagini_esempi": residui["immagini"], + "br_inline_esempi": residui["br_inline"], + "simboli_encoding_esempi": residui["simboli_encoding"], + "formule_inline_esempi": residui["formule_inline"], + "footnote_markers_esempi": residui["footnote_markers"], + "pua_markers_esempi": residui["pua_markers"], + }, + } + + report_path = out_dir / "report.json" + report_path.write_text(json.dumps(report, ensure_ascii=False, indent=2), encoding="utf-8") + return report_path + + +# ─── Pipeline principale ────────────────────────────────────────────────────── + +def run(stem: str, project_root: Path, force: bool) -> bool: + pdf_path = project_root / "sources" / f"{stem}.pdf" + out_dir = project_root / "conversione" / stem + raw_out = out_dir / "raw.md" + clean_out = out_dir / "clean.md" + + print(f"\n{'─' * 52}") + print(f" {stem}") + print(f"{'─' * 52}") + + if clean_out.exists() and not force: + print(f" ⚠️ conversione/{stem}/clean.md già presente — skip") + print(f" (usa --force per rieseguire)") + return True + + # ── [1] Validazione ──────────────────────────────────────────────────── + print(" [1/4] Validazione PDF...") + ok, msg = check_pdf(pdf_path) + if not ok: + print(f" ✗ {msg}") + return False + print(f" ✅ {msg}") + + # ── [2] Conversione ──────────────────────────────────────────────────── + print(" [2/4] Conversione PDF → Markdown (opendataloader-pdf)...") + with tempfile.TemporaryDirectory() as tmp: + try: + md_file = convert_pdf(pdf_path, Path(tmp)) + except MemoryError: + print(" ✗ Memoria esaurita durante la conversione") + return False + except Exception as e: + print(f" ✗ Conversione fallita: {e}") + return False + try: + raw_text = md_file.read_text(encoding="utf-8") + except UnicodeDecodeError as e: + print(f" ✗ Errore encoding nel file prodotto: {e}") + return False + + size_kb = len(raw_text.encode()) // 1024 + n_lines = raw_text.count("\n") + print(f" ✅ Markdown grezzo: {size_kb} KB, {n_lines} righe") + + # ── [3] Pulizia strutturale ──────────────────────────────────────────── + print(" [3/4] Pulizia strutturale...") + clean_text, t_stats = apply_transforms(raw_text) + reduction = 100 * (1 - len(clean_text) / len(raw_text)) if raw_text else 0 + print(f" ✅ Simboli PUA corretti: {t_stats['n_simboli_pua_corretti']}") + print(f" Immagini rimosse: {t_stats['n_immagini_rimosse']}") + print(f" Note rimossa: {t_stats['n_note_rimosse']}") + print(f" Accenti corretti: {t_stats['n_accenti_corretti']}") + print(f" Dot-leader rimossi: {t_stats['n_dotleader_rimossi']}") + print(f" Header concat fixati: {t_stats['n_header_concat_fixati']}") + print(f" Header num. normaliz.: {t_stats['n_header_numerati_normalizzati']}") + print(f" Articoli → ###: {t_stats['n_articoli_estratti']}") + print(f" Ambienti matematici: {t_stats['n_ambienti_matematici']}") + print(f" Titoli header uniti: {t_stats['n_titoli_uniti']}") + print(f" TOC rimosso: {'sì' if t_stats['toc_rimosso'] else 'no'}") + print(f" Versi poesia riprist.: {t_stats['n_versi_ripristinati']}") + print(f" Header verso demotati: {t_stats['n_header_verso_demotati']}") + print(f" ALL-CAPS → ##: {t_stats['n_header_allcaps']}") + print(f" Sezioni → ###: {t_stats['n_sezioni_numerate']}") + print(f" Paragrafi uniti: {t_stats['n_paragrafi_uniti']}") + print(f" Riduzione testo: {reduction:.0f}%") + + # ── [4] Profilo strutturale ──────────────────────────────────────────── + print(" [4/4] Analisi struttura...") + try: + out_dir.mkdir(parents=True, exist_ok=True) + raw_out.write_text(raw_text, encoding="utf-8") + clean_out.write_text(clean_text, encoding="utf-8") + except PermissionError as e: + print(f" ✗ Permesso negato durante la scrittura: {e}") + return False + profile = analyze(clean_out) + + _LIVELLO_DESC = {3: "ricca (h3)", 2: "parziale (h2)", 1: "paragrafi", 0: "testo piatto"} + print(f" ✅ Struttura: livello {profile['livello_struttura']} — {_LIVELLO_DESC[profile['livello_struttura']]}") + print(f" h1={profile['n_h1']} h2={profile['n_h2']} h3={profile['n_h3']} " + f"paragrafi={profile['n_paragrafi']}") + print(f" Strategia chunking: {profile['strategia_chunking']}") + print(f" Lingua rilevata: {profile['lingua_rilevata']}") + for w in profile["avvertenze"]: + print(f" ⚠️ {w}") + + build_report(stem, out_dir, clean_text, t_stats, profile, reduction) + + print(f"\n Output:") + print(f" conversione/{stem}/raw.md (immutabile)") + print(f" conversione/{stem}/clean.md") + print(f" conversione/{stem}/report.json") + print(f"\n clean.md pronto per la suddivisione in chunk.") + return True + + +# ─── Entry point ───────────────────────────────────────────────────────────── + +if __name__ == "__main__": + project_root = Path(__file__).parent.parent + + parser = argparse.ArgumentParser( + description="Pipeline PDF → clean Markdown strutturato, pronto per chunking", + epilog="Prerequisiti: pip install opendataloader-pdf + Java 11+ sul PATH", + ) + parser.add_argument( + "--stem", + help="Nome del documento (PDF in sources/.pdf). " + "Se omesso, elabora tutti i PDF in sources/.", + ) + parser.add_argument( + "--force", + action="store_true", + help="Riesegui anche se clean.md è già presente", + ) + args = parser.parse_args() + + _check_deps() + + if args.stem: + stems = [args.stem] + else: + sources_dir = project_root / "sources" + if not sources_dir.exists(): + print("Errore: cartella sources/ non trovata") + sys.exit(1) + stems = sorted(p.stem for p in sources_dir.glob("*.pdf")) + if not stems: + print("Errore: nessun PDF trovato in sources/") + sys.exit(1) + + results = [run(s, project_root, args.force) for s in stems] + ok = sum(results) + total = len(results) + print(f"\n{'✅' if all(results) else '⚠️ '} {ok}/{total} documenti convertiti") + sys.exit(0 if all(results) else 1) diff --git a/conversione/validate.py b/conversione/validate.py new file mode 100644 index 0000000..f2c1ead --- /dev/null +++ b/conversione/validate.py @@ -0,0 +1,210 @@ +#!/usr/bin/env python3 +""" +conversione/validate.py — Validazione qualità Markdown + +Legge i report.json prodotti da pipeline.py, stampa una tabella di stato +e assegna un voto (0-100) a ogni documento. + + 90-100 A — ottimo, pronto per il chunker + 75-89 B — buono, qualche sezione lunga ma accettabile + 60-74 C — accettabile, anomalie minori da verificare + 40-59 D — da rivedere, problemi strutturali o residui evidenti + 0-39 F — da riprocessare, struttura assente o testo corrotto + +Uso: + python conversione/validate.py # tutti gli stem + python conversione/validate.py analisi1 # stem specifico + python conversione/validate.py a b c # stem multipli + python conversione/validate.py --detail analisi1 # mostra dettaglio penalità +""" + +import argparse +import json +import sys +from pathlib import Path + + +# ─── Punteggio ─────────────────────────────────────────────────────────────── + +_GRADES = [(90, "A"), (75, "B"), (60, "C"), (40, "D"), (0, "F")] + + +def _score(r: dict) -> tuple[int, list[str]]: + """ + Calcola un punteggio 0-100 sulla qualità del clean.md ai fini della + suddivisione in chunk e vettorizzazione. + Restituisce (score, lista_penalità_applicate). + + Penalità struttura (il chunker non può operare senza header): + struttura assente (livello 0) → −40 + struttura piatta (livello 1) → −15 + + Penalità residui (finiscono nei vettori e degradano il retrieval): + backtick → −2/cad (max −20) + dot-leader → −5/cad (max −10) + URL / watermark → −5/cad (max −15) + immagini residue → −5/cad (max −10) +
inline (artefatti tabelle) → −2/cad (max −15) + simboli encoding (!/" residui) → −1/cad (max −10) + formule inline [N.M] → −1/cad (max −8) + + Penalità anomalie: + bare headers → −3/cad (max −15) + + Non penalizzate (il chunker le normalizza): + sezioni corte, sezioni lunghe, mediana, p25 + """ + score = 100 + detail = [] + structure = r.get("structure", {}) + anomalie = r.get("anomalie", {}) + residui = r.get("residui", {}) + + livello = structure.get("livello_struttura", 0) + + # ── Struttura ───────────────────────────────────────────────────────── + if livello == 0: + score -= 40 + detail.append("struttura assente −40") + elif livello == 1: + score -= 15 + detail.append("struttura piatta −15") + + # ── Residui ─────────────────────────────────────────────────────────── + def _pen(key: str, per_item: int, cap: int, label: str) -> None: + n = residui.get(key, 0) + if n: + p = min(cap, n * per_item) + nonlocal score + score -= p + detail.append(f"{label} ×{n} −{p}") + + _pen("backtick", 2, 20, "backtick") + _pen("dotleader", 5, 10, "dot-leader") + _pen("url", 5, 15, "url") + _pen("immagini", 5, 10, "immagini") + _pen("br_inline", 2, 15, "
inline") + _pen("simboli_encoding", 1, 10, "simboli encoding") + _pen("formule_inline", 1, 8, "formule inline") + _pen("footnote_markers", 1, 8, "footnote residui") + _pen("pua_markers", 2, 20, "caratteri PUA font Symbol") + + # ── Anomalie ────────────────────────────────────────────────────────── + n_bare = anomalie.get("bare_headers", 0) + if n_bare: + p = min(15, n_bare * 3) + score -= p + detail.append(f"bare headers ×{n_bare} −{p}") + + return max(0, score), detail + + +def _grade(score: int) -> str: + return next(g for threshold, g in _GRADES if score >= threshold) + + +# ─── Validazione ───────────────────────────────────────────────────────────── + +def validate(stems: list[str], project_root: Path, detail: bool = False) -> None: + conv_dir = project_root / "conversione" + + paths = ( + [conv_dir / s / "report.json" for s in stems] + if stems + else sorted(conv_dir.glob("*/report.json")) + ) + + if not paths: + print("Nessun report.json trovato in conversione/*/") + sys.exit(0) + + rows = [ + json.loads(p.read_text(encoding="utf-8")) if p.exists() + else {"stem": p.parent.name, "_missing": True} + for p in paths + ] + + # ── Intestazione ───────────────────────────────────────────────────── + col = max(len(r.get("stem", "stem")) for r in rows) + 2 + header = ( + f"{'stem':<{col}}" + f"{'h2':>4}{'h3':>5} " + f"{'strategia':<18}" + f"{'bare':>5}{'corte':>6}{'lunghe':>7}" + f"{'btk':>5}{'br':>4}{'enc':>4}{'url':>4}" + f"{'med':>6}" + f" {'voto':>4} grade" + ) + sep = "─" * len(header) + print(f"\n{header}\n{sep}") + + scores = [] + + # ── Righe ───────────────────────────────────────────────────────────── + for r in rows: + if r.get("_missing"): + print(f"{r['stem']:<{col}} (report.json non trovato)") + continue + + st = r.get("structure", {}) + an = r.get("anomalie", {}) + res = r.get("residui", {}) + dist = r.get("distribution", {}) + s, pen = _score(r) + scores.append(s) + + print( + f"{r['stem']:<{col}}" + f"{st.get('n_h2', 0):>4}" + f"{st.get('n_h3', 0):>5} " + f"{st.get('strategia_chunking','?'):<18}" + f"{an.get('bare_headers', 0):>5}" + f"{an.get('short_sections', 0):>6}" + f"{an.get('long_sections', 0):>7}" + f"{res.get('backtick', 0):>5}" + f"{res.get('br_inline', 0):>4}" + f"{res.get('simboli_encoding', 0):>4}" + f"{res.get('url', 0):>4}" + f"{dist.get('mediana', 0):>6}" + f" {s:>4} {_grade(s)}" + ) + + if detail and pen: + for p in pen: + print(f" {'':>{col}} ↳ {p}") + + # ── Riepilogo ───────────────────────────────────────────────────────── + print(sep) + if scores: + media = sum(scores) / len(scores) + print( + f"Documenti: {len(scores)} " + f"Media: {media:.0f}/100 {_grade(int(media))} " + f"(A≥90 B≥75 C≥60 D≥40 F<40)" + ) + print( + "\nColonne: bare=header vuoti corte=sez<150ch lunghe=sez>1500ch " + "btk=backtick br=
inline enc=simboli encoding med=mediana chars\n" + ) + + +# ─── Entry point ───────────────────────────────────────────────────────────── + +if __name__ == "__main__": + parser = argparse.ArgumentParser( + description="Valida i report Markdown prodotti da pipeline.py", + epilog="Senza argomenti valida tutti gli stem in conversione/*/", + ) + parser.add_argument( + "stems", + nargs="*", + metavar="STEM", + help="stem da validare (es: analisi1). Ometti per tutti.", + ) + parser.add_argument( + "--detail", "-d", + action="store_true", + help="mostra dettaglio penalità per ogni documento", + ) + args = parser.parse_args() + validate(args.stems, Path(__file__).parent.parent, detail=args.detail) diff --git a/requirements.txt b/requirements.txt index a30e6e4..6cc5bce 100644 --- a/requirements.txt +++ b/requirements.txt @@ -4,5 +4,9 @@ pdfplumber==0.11.9 # Step 2 — Conversione PDF → Markdown pymupdf4llm +# conversione/ — Pipeline automatica PDF → clean Markdown (alternativa a step 0+1+2+3+4) +# Richiede anche Java 11+ sul PATH: https://adoptium.net/ +opendataloader-pdf + # Step 8 — Vettorizzazione chromadb diff --git a/step-0/check_pdf.py b/step-0/check_pdf.py deleted file mode 100644 index 2bee9b8..0000000 --- a/step-0/check_pdf.py +++ /dev/null @@ -1,229 +0,0 @@ -#!/usr/bin/env python3 -""" -Step 0 — Verifica idoneità PDF - -Legge tutti i PDF in sources/ e salva un report per ognuno in step-0/. - -Uso: - python step-0/check_pdf.py - -Output: - step-0/_step0_report.txt -""" - -import sys -import statistics -from datetime import datetime -from pathlib import Path - - -def check_pdf(pdf_path: str, save: bool = True) -> None: - try: - import pdfplumber - except ImportError: - print("Errore: pdfplumber non è installato.") - print(" pip install pdfplumber") - sys.exit(1) - - path = Path(pdf_path) - if not path.exists(): - print(f"Errore: file non trovato — {pdf_path}") - sys.exit(1) - if path.suffix.lower() != ".pdf": - print(f"Errore: il file non è un PDF — {pdf_path}") - sys.exit(1) - - lines = [] # righe del report - results = [] # (etichetta, stato, messaggio) - - def out(text=""): - lines.append(text) - print(text) - - out(f"Step 0 — Verifica idoneità PDF") - out(f"File: {path.name}") - out(f"Data: {datetime.now().strftime('%Y-%m-%d %H:%M')}") - out("=" * 50) - - # ------------------------------------------------------------------ # - # Criterio 1 — Non protetto da password - # ------------------------------------------------------------------ # - try: - with pdfplumber.open(path) as pdf: - n_pages = len(pdf.pages) - results.append(("Non protetto da password", "PASS", f"{n_pages} pagine")) - except Exception as e: - msg = str(e).lower() - if "password" in msg or "encrypted" in msg or "decrypt" in msg: - results.append(("Non protetto da password", "FAIL", - "Il PDF è cifrato — non può essere elaborato")) - else: - results.append(("Non protetto da password", "FAIL", - f"Impossibile aprire il file: {e}")) - _render_results(results, out) - _maybe_save(lines, path, save) - return - - # ------------------------------------------------------------------ # - # Lettura pagine — una sola passata - # ------------------------------------------------------------------ # - char_counts = [] - line_lengths = [] - all_text = "" - empty_pages = 0 - - with pdfplumber.open(path) as pdf: - for page in pdf.pages: - text = page.extract_text() or "" - all_text += text + "\n" - chars = len(text.strip()) - char_counts.append(chars) - if chars == 0: - empty_pages += 1 - for line in text.splitlines(): - stripped = line.strip() - if stripped: - line_lengths.append(len(stripped)) - - total_pages = len(char_counts) - pages_with_text = sum(1 for c in char_counts if c > 50) - text_coverage = pages_with_text / total_pages if total_pages > 0 else 0 - - # ------------------------------------------------------------------ # - # Criterio 2 — Testo estraibile - # ------------------------------------------------------------------ # - if text_coverage >= 0.7: - results.append(("Testo estraibile", "PASS", - f"{pages_with_text}/{total_pages} pagine con testo ({text_coverage:.0%})")) - elif text_coverage >= 0.4: - results.append(("Testo estraibile", "WARN", - f"Solo {pages_with_text}/{total_pages} pagine con testo — revisione estesa necessaria")) - else: - results.append(("Testo estraibile", "FAIL", - f"Solo {pages_with_text}/{total_pages} pagine con testo — probabilmente scansionato")) - - # ------------------------------------------------------------------ # - # Criterio 3 — Generato digitalmente (non scansionato) - # ------------------------------------------------------------------ # - pages_text_only = [c for c in char_counts if c > 0] - avg_chars = statistics.mean(pages_text_only) if pages_text_only else 0 - - if avg_chars >= 300: - results.append(("Generato digitalmente (non scansionato)", "PASS", - f"Media {avg_chars:.0f} char/pagina")) - elif avg_chars >= 100: - results.append(("Generato digitalmente (non scansionato)", "WARN", - f"Media bassa: {avg_chars:.0f} char/pagina — alcune pagine potrebbero essere immagini")) - else: - results.append(("Generato digitalmente (non scansionato)", "FAIL", - f"Media molto bassa: {avg_chars:.0f} char/pagina — il PDF sembra scansionato")) - - # ------------------------------------------------------------------ # - # Criterio 4 — Pagine vuote - # ------------------------------------------------------------------ # - if empty_pages == 0: - results.append(("Pagine vuote", "PASS", "Nessuna pagina vuota")) - elif empty_pages <= total_pages * 0.05: - results.append(("Pagine vuote", "WARN", - f"{empty_pages} pagine vuote (≤ 5%) — probabilmente copertine o separatori")) - else: - results.append(("Pagine vuote", "WARN", - f"{empty_pages} pagine vuote ({empty_pages/total_pages:.0%}) — controllare")) - - # ------------------------------------------------------------------ # - # Criterio desiderabile — Layout a colonne singola - # ------------------------------------------------------------------ # - if line_lengths: - median_len = statistics.median(line_lengths) - short_lines = sum(1 for l in line_lengths if l < median_len * 0.4) - short_ratio = short_lines / len(line_lengths) - if short_ratio < 0.15: - results.append(("Layout a colonne singola (desiderabile)", "PASS", - f"Righe corte: {short_ratio:.0%} — struttura lineare")) - elif short_ratio < 0.35: - results.append(("Layout a colonne singola (desiderabile)", "WARN", - f"Righe corte: {short_ratio:.0%} — possibile layout a colonne parziale")) - else: - results.append(("Layout a colonne singola (desiderabile)", "WARN", - f"Righe corte: {short_ratio:.0%} — probabile layout a colonne multiple")) - else: - results.append(("Layout a colonne singola (desiderabile)", "WARN", - "Impossibile analizzare (nessuna riga estratta)")) - - # ------------------------------------------------------------------ # - # Criterio desiderabile — Struttura logica (titoli) - # ------------------------------------------------------------------ # - candidate_headings = [ - line.strip() for line in all_text.splitlines() - if 3 <= len(line.strip()) <= 80 - and line.strip()[0].isupper() - and not line.strip().endswith(".") - and not line.strip().endswith(",") - and len(line.strip().split()) <= 10 - ] - heading_density = len(candidate_headings) / total_pages if total_pages > 0 else 0 - - if heading_density >= 1.0: - results.append(("Struttura logica riconoscibile (desiderabile)", "PASS", - f"~{len(candidate_headings)} possibili titoli rilevati ({heading_density:.1f}/pagina)")) - elif heading_density >= 0.3: - results.append(("Struttura logica riconoscibile (desiderabile)", "WARN", - f"~{len(candidate_headings)} possibili titoli ({heading_density:.1f}/pagina) — struttura parziale")) - else: - results.append(("Struttura logica riconoscibile (desiderabile)", "WARN", - "Pochi titoli rilevati — testo narrativo o struttura non standard")) - - _render_results(results, out) - _maybe_save(lines, path, save) - - -def _render_results(results: list, out) -> None: - icons = {"PASS": "✅", "WARN": "⚠️ ", "FAIL": "❌"} - out() - for label, status, message in results: - icon = icons.get(status, " ") - out(f" {icon} {label}") - out(f" {message}") - out() - - fails = [r for r in results if r[1] == "FAIL"] - warns = [r for r in results if r[1] == "WARN"] - - if fails: - out("ESITO: ❌ PDF NON IDONEO") - out(" Criteri obbligatori non soddisfatti — scegli un PDF diverso.") - elif warns: - out("ESITO: ⚠️ PDF ACCETTABILE CON CAUTELA") - out(" Procedi, ma aspettati più lavoro nella revisione manuale (step 4).") - else: - out("ESITO: ✅ PDF IDONEO") - out(" Tutti i criteri soddisfatti — procedi con lo step 1.") - out() - - -def _maybe_save(lines: list, pdf_path: Path, save: bool) -> None: - if not save: - return - script_dir = Path(__file__).parent - out_file = script_dir / f"{pdf_path.stem}_step0_report.txt" - out_file.write_text("\n".join(lines), encoding="utf-8") - print(f"Report salvato in: {out_file}") - - -if __name__ == "__main__": - project_root = Path(__file__).parent.parent - sources_dir = project_root / "sources" - - if not sources_dir.exists(): - print(f"Errore: cartella sources/ non trovata in {project_root}") - sys.exit(1) - - pdfs = sorted(sources_dir.glob("*.pdf")) - if not pdfs: - print(f"Errore: nessun PDF trovato in {sources_dir}") - sys.exit(1) - - for pdf in pdfs: - check_pdf(str(pdf), save=True) - if len(pdfs) > 1: - print("-" * 50) diff --git a/step-1/inspect_pdf.py b/step-1/inspect_pdf.py deleted file mode 100644 index 0c2bfdd..0000000 --- a/step-1/inspect_pdf.py +++ /dev/null @@ -1,199 +0,0 @@ -#!/usr/bin/env python3 -""" -Step 1 — Ispezione automatica PDF - -Analizza il PDF pagina per pagina e produce un report con score (0–100) -e lista dei problemi per pagina. Serve per capire la qualità del documento -e mappare i problemi prima della revisione manuale (step 4). - -Uso: - python step1/inspect.py - -Output: - step1/_step1_report.txt -""" - -import re -import sys -import statistics -from collections import Counter -from datetime import datetime -from pathlib import Path - - -# ── Penalità per il calcolo dello score ─────────────────────────────────── -SYLLABIF_PENALTY = 0.3 # per occorrenza di sillabazione -COLUMN_PENALTY = 3.0 # per pagina con layout a colonne -UNICODE_PENALTY = 1.5 # per pagina con caratteri anomali -EMPTY_PENALTY = 1.0 # per pagina vuota -HEADER_FOOTER_PEN = 5.0 # fisso se intestazioni/piè ripetitivi rilevati - - -def inspect_pdf(pdf_path: str, save: bool = True) -> None: - try: - import pdfplumber - except ImportError: - print("Errore: pdfplumber non è installato.") - print(" pip install pdfplumber") - sys.exit(1) - - path = Path(pdf_path) - if not path.exists(): - print(f"Errore: file non trovato — {pdf_path}") - sys.exit(1) - - lines = [] - - def out(text=""): - lines.append(text) - print(text) - - out("Step 1 — Ispezione automatica PDF") - out(f"File: {path.name}") - out(f"Data: {datetime.now().strftime('%Y-%m-%d %H:%M')}") - out("=" * 50) - - # ── Lettura pagine ───────────────────────────────────────────────────── - with pdfplumber.open(path) as pdf: - n_pages = len(pdf.pages) - pages_text = [page.extract_text() or "" for page in pdf.pages] - - # ── Analisi per pagina ───────────────────────────────────────────────── - issues = [] # (page_num, descrizione) — page_num=0 → problema globale - deductions = 0.0 - - first_lines = [] # prima riga significativa di ogni pagina (per header) - last_lines = [] # ultima riga significativa di ogni pagina (per footer) - - for i, text in enumerate(pages_text): - page_num = i + 1 - stripped = text.strip() - - # 1. Pagina vuota - if len(stripped) < 50: - issues.append((page_num, "pagina vuota")) - deductions += EMPTY_PENALTY - continue - - page_lines = text.splitlines() - nonempty = [l.strip() for l in page_lines if l.strip()] - - # Raccogli prima/ultima riga per il controllo header/footer - if nonempty: - first_lines.append(nonempty[0]) - last_lines.append(nonempty[-1]) - - # 2. Sillabazione a fine riga (es. "estra-" + a capo) - syllabif = sum( - 1 for line in page_lines - if re.search(r'\b\w{2,}-$', line.rstrip()) - ) - if syllabif: - label = "occorrenza" if syllabif == 1 else "occorrenze" - issues.append((page_num, f"sillabazione rilevata ({syllabif} {label})")) - deductions += syllabif * SYLLABIF_PENALTY - - # 3. Layout a colonne (righe molto corte e numerose) - if len(nonempty) >= 10: - median_len = statistics.median(len(l) for l in nonempty) - short_ratio = sum(1 for l in nonempty if len(l) < median_len * 0.4) / len(nonempty) - if short_ratio > 0.35: - issues.append((page_num, f"possibile layout a colonne ({short_ratio:.0%} righe corte)")) - deductions += COLUMN_PENALTY - - # 4. Caratteri Unicode anomali - # (control chars esclusi \n \t \r, replacement char, PUA block) - anomalies = re.findall( - r'[\x00-\x08\x0b\x0c\x0e-\x1f\x7f-\x9f\ufffd\ue000-\uf8ff]', text - ) - if anomalies: - issues.append((page_num, f"caratteri Unicode anomali ({len(anomalies)} trovati)")) - deductions += UNICODE_PENALTY - - # ── Intestazioni e piè di pagina ripetitivi ──────────────────────────── - def _check_repetition(line_list: list, label: str) -> None: - nonlocal deductions - if not line_list: - return - threshold = max(3, len(line_list) * 0.25) - repeated = [ - (txt, cnt) for txt, cnt in Counter(line_list).items() - if cnt >= threshold and len(txt) > 3 - ] - if repeated: - deductions += HEADER_FOOTER_PEN - for txt, cnt in repeated[:3]: - issues.append((0, f"{label} ripetitivo: \"{txt[:45]}\" ({cnt} volte)")) - - _check_repetition(first_lines, "intestazione") - _check_repetition(last_lines, "piè di pagina") - - # ── Score ────────────────────────────────────────────────────────────── - score = max(0, round(100 - deductions)) - - # ── Riepilogo ────────────────────────────────────────────────────────── - pages_with_issues = len({p for p, _ in issues if p > 0}) - out() - out(f"Score: {score}/100") - out(f"Pagine totali: {n_pages}") - out(f"Pagine con problemi: {pages_with_issues}") - out() - - if issues: - global_issues = [(p, d) for p, d in issues if p == 0] - page_issues = sorted([(p, d) for p, d in issues if p > 0]) - for _, desc in global_issues: - out(f" ⚠️ {desc}") - for page_num, desc in page_issues: - out(f" Pagina {page_num:>4}: {desc}") - else: - out(" Nessun problema rilevato.") - - out() - - # ── Prossimi passi ───────────────────────────────────────────────────── - out("PROSSIMI PASSI:") - if score >= 70: - out(" → conversione con marker funzionerà bene") - elif score >= 40: - out(" → conversione possibile, attendi più errori nella revisione") - else: - out(" → qualità bassa — valuta una fonte PDF migliore") - - attention_pages = sorted({p for p, _ in issues if p > 0}) - if attention_pages: - sample = ", ".join(str(p) for p in attention_pages[:10]) - if len(attention_pages) > 10: - sample += f" … e altre {len(attention_pages) - 10}" - out(f" → attenzione alle pagine {sample} nella revisione manuale") - out() - - _maybe_save(lines, path, save) - - -def _maybe_save(lines: list, pdf_path: Path, save: bool) -> None: - if not save: - return - script_dir = Path(__file__).parent - out_file = script_dir / f"{pdf_path.stem}_step1_report.txt" - out_file.write_text("\n".join(lines), encoding="utf-8") - print(f"Report salvato in: {out_file}") - - -if __name__ == "__main__": - project_root = Path(__file__).parent.parent - sources_dir = project_root / "sources" - - if not sources_dir.exists(): - print(f"Errore: cartella sources/ non trovata in {project_root}") - sys.exit(1) - - pdfs = sorted(sources_dir.glob("*.pdf")) - if not pdfs: - print(f"Errore: nessun PDF trovato in {sources_dir}") - sys.exit(1) - - for pdf in pdfs: - inspect_pdf(str(pdf), save=True) - if len(pdfs) > 1: - print("-" * 50) diff --git a/step-2/convert_pdf.py b/step-2/convert_pdf.py deleted file mode 100644 index efc6376..0000000 --- a/step-2/convert_pdf.py +++ /dev/null @@ -1,80 +0,0 @@ -#!/usr/bin/env python3 -""" -Step 2 — Conversione PDF → Markdown grezzo - -Usa pymupdf4llm (PyMuPDF puro C, zero modelli ML, ~30-50 MB RAM) -per convertire ogni PDF in sources/ e organizza l'output in: - step-2//raw.md — MD grezzo, non modificare mai - step-2//clean.md — copia di lavoro per lo step 4 - -Uso: - python step-2/convert_pdf.py # tutti i PDF in sources/ - python step-2/convert_pdf.py --pdf sources/doc.pdf # un solo PDF -""" - -import argparse -import shutil -import sys -from pathlib import Path - -import pymupdf4llm - - -def convert_pdf(pdf_path: Path, project_root: Path) -> bool: - stem = pdf_path.stem - out_dir = project_root / "step-2" / stem - raw_md = out_dir / "raw.md" - clean_md = out_dir / "clean.md" - - print(f"\nConversione: {pdf_path.name}") - print(f" Output: step-2/{stem}/") - - if raw_md.exists(): - print(f" ⚠️ raw.md già presente — skip") - print(f" (elimina {raw_md} per riconvertire)") - return True - - out_dir.mkdir(parents=True, exist_ok=True) - - print(f" Conversione in corso...") - md_text = pymupdf4llm.to_markdown(str(pdf_path)) - - raw_md.write_text(md_text, encoding="utf-8") - shutil.copy2(raw_md, clean_md) - - size_kb = raw_md.stat().st_size // 1024 - print(f" ✅ raw.md salvato ({size_kb} KB)") - print(f" ✅ clean.md creato (copia di lavoro per step 4)") - return True - - -if __name__ == "__main__": - project_root = Path(__file__).parent.parent - - parser = argparse.ArgumentParser(description="Step 2 — Conversione PDF → Markdown") - parser.add_argument("--pdf", help="Percorso di un singolo PDF da convertire") - args = parser.parse_args() - - if args.pdf: - pdf_path = Path(args.pdf) - if not pdf_path.exists(): - print(f"Errore: file non trovato — {args.pdf}") - sys.exit(1) - pdfs = [pdf_path] - else: - sources_dir = project_root / "sources" - if not sources_dir.exists(): - print(f"Errore: cartella sources/ non trovata in {project_root}") - sys.exit(1) - pdfs = sorted(sources_dir.glob("*.pdf")) - if not pdfs: - print(f"Errore: nessun PDF trovato in {sources_dir}") - sys.exit(1) - - results = [convert_pdf(p, project_root) for p in pdfs] - - ok_count = sum(results) - total = len(results) - print(f"\n{'✅' if all(results) else '⚠️ '} {ok_count}/{total} PDF convertiti") - - sys.exit(0 if all(results) else 1) diff --git a/step-3/detect_structure.py b/step-3/detect_structure.py deleted file mode 100644 index e3a426b..0000000 --- a/step-3/detect_structure.py +++ /dev/null @@ -1,223 +0,0 @@ -#!/usr/bin/env python3 -""" -Step 3 — Rilevamento struttura Markdown - -Analizza il Markdown grezzo prodotto dallo step 2 senza modificarlo. -Copia i file da step-2// e produce structure_profile.json che -guida la revisione manuale (step 4) e il chunker adattivo (step 5). - -Output in step-3//: - raw.md — copia da step-2 (non modificare mai) - clean.md — copia da step-2 (da revisionare nello step 4) - structure_profile.json — profilo strutturale - -Uso: - python step-3/detect_structure.py # tutti i documenti in step-2/ - python step-3/detect_structure.py --stem nietzsche # un solo documento - python step-3/detect_structure.py --force # riesegui anche se già presente -""" - -import argparse -import json -import re -import shutil -import sys -from pathlib import Path - - -# ─── Language detection ─────────────────────────────────────────────────────── - -_IT_WORDS = frozenset([ - "il", "la", "di", "e", "che", "non", "per", "un", "una", "si", - "con", "da", "del", "della", "dei", "in", "ma", "se", "lo", "le", - "gli", "al", "alla", "ai", "alle", "sono", "ha", "hanno", "era", - "erano", "nel", "nella", "nei", "nelle", "questo", "questa", "così", -]) - -_EN_WORDS = frozenset([ - "the", "of", "and", "to", "in", "is", "that", "it", "was", "for", - "on", "are", "as", "with", "his", "they", "at", "be", "this", "have", - "from", "or", "an", "but", "not", "by", "he", "she", "we", "you", - "which", "their", "been", "has", "would", "there", "when", "will", -]) - - -def detect_language(text: str) -> str: - words = re.findall(r'\b[a-zA-Z]{2,}\b', text.lower()) - sample = words[:2000] - it = sum(1 for w in sample if w in _IT_WORDS) - en = sum(1 for w in sample if w in _EN_WORDS) - if it == 0 and en == 0: - return "unknown" - return "it" if it >= en else "en" - - -# ─── Markdown parsing ───────────────────────────────────────────────────────── - -def split_sections(text: str, header_level: int) -> list[str]: - """ - Split text on headers of the given level (1=h1, 2=h2, 3=h3). - Returns list of body texts for each matching section. - """ - prefix = "#" * header_level + " " - parts = re.split(rf'(?m)^{re.escape(prefix)}.+', text) - # parts[0] is preamble, rest are section bodies - return [p for p in parts[1:] if p.strip()] - - -def count_headers(text: str, level: int) -> int: - prefix = "#" * level + " " - return len(re.findall(rf'(?m)^{re.escape(prefix)}', text)) - - -def count_paragraphs(text: str) -> int: - """Count non-empty, non-header paragraph blocks.""" - blocks = re.split(r'\n{2,}', text) - return sum(1 for b in blocks if b.strip() and not re.match(r'^#+\s', b.strip())) - - -# ─── Core analysis ──────────────────────────────────────────────────────────── - -def analyze(raw_md_path: Path) -> dict: - text = raw_md_path.read_text(encoding="utf-8") - - n_h1 = count_headers(text, 1) - n_h2 = count_headers(text, 2) - n_h3 = count_headers(text, 3) - n_paragrafi = count_paragraphs(text) - - # Determine structural level and primary boundary - if n_h3 >= 5: - livello = 3 - boundary = "h3" - strategia = "h3_aware" - section_bodies = split_sections(text, 3) - elif n_h2 >= 3: - livello = 2 - boundary = "h2" - strategia = "h2_paragraph_split" - section_bodies = split_sections(text, 2) - elif n_h1 + n_h2 + n_h3 >= 1: - livello = 1 - boundary = "paragrafo" - strategia = "paragraph" - section_bodies = [b for b in re.split(r'\n{2,}', text) if b.strip()] - else: - if n_paragrafi >= 3: - livello = 1 - boundary = "paragrafo" - strategia = "paragraph" - section_bodies = [b for b in re.split(r'\n{2,}', text) if b.strip()] - else: - livello = 0 - boundary = "nessuno" - strategia = "sliding_window" - section_bodies = [text] if text.strip() else [] - - lengths = [len(b) for b in section_bodies if b.strip()] - lunghezza_media = int(sum(lengths) / len(lengths)) if lengths else 0 - - lingua = detect_language(text) - - avvertenze = [] - short = sum(1 for l in lengths if l < 200) - long_ = sum(1 for l in lengths if l > 800) - if short: - avvertenze.append(f"{short} sezioni sotto i 200 caratteri — verranno accorpate") - if long_: - avvertenze.append(f"{long_} sezioni sopra i 800 caratteri — verranno divise") - - return { - "livello_struttura": livello, - "n_h1": n_h1, - "n_h2": n_h2, - "n_h3": n_h3, - "n_paragrafi": n_paragrafi, - "boundary_primario": boundary, - "lingua_rilevata": lingua, - "lunghezza_media_sezione": lunghezza_media, - "strategia_chunking": strategia, - "avvertenze": avvertenze, - } - - -# ─── Per-document processing ───────────────────────────────────────────────── - -def process_stem(stem: str, project_root: Path, force: bool) -> bool: - src_dir = project_root / "step-2" / stem - out_dir = project_root / "step-3" / stem - raw_src = src_dir / "raw.md" - clean_src = src_dir / "clean.md" - profile_out = out_dir / "structure_profile.json" - - print(f"\nDocumento: {stem}") - - if not raw_src.exists(): - print(f" ✗ raw.md non trovato in step-2/{stem}/ — skip") - return False - - if profile_out.exists() and not force: - print(f" ⚠️ structure_profile.json già presente — skip") - print(f" (usa --force per rieseguire)") - return True - - out_dir.mkdir(parents=True, exist_ok=True) - - # Copy files from step-2 - shutil.copy2(raw_src, out_dir / "raw.md") - if clean_src.exists(): - shutil.copy2(clean_src, out_dir / "clean.md") - print(f" Copiati raw.md e clean.md da step-2/{stem}/") - - # Analyze - print(f" Analisi struttura in corso...") - profile = analyze(out_dir / "raw.md") - - profile_out.write_text(json.dumps(profile, ensure_ascii=False, indent=2), encoding="utf-8") - - # Report - _LIVELLO_DESC = { - 3: "struttura ricca (###)", - 2: "struttura parziale (##)", - 1: "solo paragrafi", - 0: "testo piatto", - } - print(f" ✅ Livello {profile['livello_struttura']} — {_LIVELLO_DESC[profile['livello_struttura']]}") - print(f" h1={profile['n_h1']} h2={profile['n_h2']} h3={profile['n_h3']} paragrafi={profile['n_paragrafi']}") - print(f" Boundary: {profile['boundary_primario']} | Strategia: {profile['strategia_chunking']}") - print(f" Lingua: {profile['lingua_rilevata']} | Lunghezza media sezione: {profile['lunghezza_media_sezione']} char") - for w in profile["avvertenze"]: - print(f" ⚠️ {w}") - print(f" ✅ structure_profile.json salvato") - return True - - -# ─── Entry point ───────────────────────────────────────────────────────────── - -if __name__ == "__main__": - project_root = Path(__file__).parent.parent - - parser = argparse.ArgumentParser(description="Step 3 — Rilevamento struttura Markdown") - parser.add_argument("--stem", help="Nome del documento (sottocartella di step-2/)") - parser.add_argument("--force", action="store_true", help="Riesegui anche se già presente") - args = parser.parse_args() - - if args.stem: - stems = [args.stem] - else: - step2_dir = project_root / "step-2" - if not step2_dir.exists(): - print(f"Errore: cartella step-2/ non trovata in {project_root}") - sys.exit(1) - stems = sorted(p.name for p in step2_dir.iterdir() if p.is_dir()) - if not stems: - print(f"Errore: nessun documento trovato in step-2/") - sys.exit(1) - - results = [process_stem(s, project_root, args.force) for s in stems] - - ok = sum(results) - total = len(results) - print(f"\n{'✅' if all(results) else '⚠️ '} {ok}/{total} documenti analizzati") - - sys.exit(0 if all(results) else 1) diff --git a/step-4/revise.py b/step-4/revise.py deleted file mode 100644 index cf703a2..0000000 --- a/step-4/revise.py +++ /dev/null @@ -1,433 +0,0 @@ -#!/usr/bin/env python3 -""" -Step 4 — Revisione automatica del Markdown - -Trasforma clean.md da step-3 rivelando la struttura latente del documento. -Le trasformazioni sono euristiche universali che funzionano su qualsiasi PDF: - - - Normalizza whitespace multiplo (artefatto PDF) - - Riduce righe vuote multiple - - Rimuove marcatori **bold** nelle intestazioni esistenti - - Converte righe ALL-CAPS standalone → ## header (euristico, qualsiasi lingua) - - Converte sezioni numerate "N. testo" → ### N. (qualsiasi numerazione) - - Rimuove blocchi TOC (righe che iniziano con parole-chiave indice) - -Per ogni documento viene ricalcolato il profilo strutturale: il livello può -salire (es. livello 1 → 3) se le strutture latenti vengono rilevate. - -Output in step-4//: - raw.md — copia da step-3 (non modificare mai) - clean.md — MD revisionato - structure_profile.json — profilo aggiornato dopo la revisione - -Uso: - python step-4/revise.py # tutti i documenti in step-3/ - python step-4/revise.py --stem nietzsche # un solo documento - python step-4/revise.py --force # riesegui anche se già presente -""" - -import argparse -import json -import re -import shutil -import sys -from datetime import date -from pathlib import Path - -# Riusa la funzione analyze() già scritta nello step 3 -sys.path.insert(0, str(Path(__file__).parent.parent / "step-3")) -from detect_structure import analyze # noqa: E402 - - -# ─── Costanti ───────────────────────────────────────────────────────────────── - -# Parole-chiave che identificano blocchi TOC (da rimuovere) -_TOC_KEYWORDS = frozenset([ - "indice", "index", "contents", "table of contents", - "sommario", "inhaltsverzeichnis", "inhalt", -]) - -# Preposizioni/articoli da non capitalizzare nel title-case -_STOP_IT_EN = frozenset([ - # italiano - "di", "del", "della", "dei", "delle", "da", "in", "e", "il", "la", - "lo", "le", "gli", "un", "una", "per", "a", "al", "alla", "ai", - "alle", "con", "su", "sul", "sulla", "che", "o", - # inglese - "of", "the", "a", "an", "and", "or", "but", "in", "on", "at", - "to", "for", "with", "by", "from", "as", -]) - -# Ordinali italiani → romani (per titoli come "CAPITOLO PRIMO") -_ORDINALS_IT = { - "PRIMO": "I", "SECONDO": "II", "TERZO": "III", "QUARTO": "IV", - "QUINTO": "V", "SESTO": "VI", "SETTIMO": "VII", "OTTAVO": "VIII", - "NONO": "IX", "DECIMO": "X", -} - -# Ordinali inglesi → arabici (per "CHAPTER ONE") -_ORDINALS_EN = { - "ONE": "1", "TWO": "2", "THREE": "3", "FOUR": "4", "FIVE": "5", - "SIX": "6", "SEVEN": "7", "EIGHT": "8", "NINE": "9", "TEN": "10", -} - - -# ─── Utilità ────────────────────────────────────────────────────────────────── - -def _sentence_case(s: str) -> str: - """ - Sentence-case: prima lettera maiuscola, resto minuscolo. - Corretto per l'italiano e accettabile per l'inglese accademico. - """ - if not s: - return s - lower = s.lower() - return lower[0].upper() + lower[1:] - - -def _is_allcaps_line(line: str) -> bool: - """ - True se la riga è una candidata per conversione a ## header. - Criterio: tutti i caratteri alfabetici sono maiuscoli, lunghezza >= 3. - """ - stripped = line.strip() - letters = [c for c in stripped if c.isalpha()] - return ( - len(letters) >= 3 - and all(c.isupper() for c in letters) - and not stripped.startswith("#") - ) - - -def _allcaps_to_header(raw_line: str) -> str: - """ - Converte una riga ALL-CAPS in un ## header title-case. - Riconosce pattern specifici (CAPITOLO ORDINE, CHAPTER N) come bonus, - ma funziona in modalità generica su qualsiasi testo. - """ - text = raw_line.strip().rstrip('.').rstrip('?').strip() - - # ── Pattern italiano: "CAPITOLO PRIMO. TITOLO DEL CAPITOLO" - _ORD_IT_PAT = "|".join(_ORDINALS_IT.keys()) - m = re.match(rf'^CAPITOLO ({_ORD_IT_PAT})\. (.+)', text) - if m: - roman = _ORDINALS_IT[m.group(1)] - titolo = m.group(2).rstrip('.').rstrip('?').strip() - return f"## Capitolo {roman} — {_sentence_case(titolo)}" - - # ── Pattern inglese: "CHAPTER ONE. TITLE" o "CHAPTER 1. TITLE" - _ORD_EN_PAT = "|".join(_ORDINALS_EN.keys()) - m = re.match(rf'^CHAPTER ({_ORD_EN_PAT}|\d+)\.? (.+)', text) - if m: - n = _ORDINALS_EN.get(m.group(1), m.group(1)) - titolo = m.group(2).rstrip('.').rstrip('?').strip() - return f"## Chapter {n} — {_sentence_case(titolo)}" - - # ── Pattern generico con numerazione romana o arabica nel prefisso - m = re.match(r'^([IVXLCDM]+|[0-9]+)\. (.+)', text) - if m: - n = m.group(1) - titolo = m.group(2).rstrip('.').strip() - return f"## {n}. {_sentence_case(titolo)}" - - # ── Caso generico: tutto maiuscolo senza pattern riconoscibile - return f"## {_sentence_case(text)}" - - -def _is_toc_line(line: str) -> bool: - """True se la riga è l'intestazione di un blocco indice/TOC.""" - first_word = line.strip().split('.')[0].strip().lower() - return first_word in _TOC_KEYWORDS - - -# ─── Trasformazioni ──────────────────────────────────────────────────────────── - -def apply_transforms(text: str) -> tuple[str, dict]: - """ - Applica tutte le trasformazioni strutturali al testo MD. - Restituisce (testo_modificato, statistiche). - """ - stats = { - "toc_rimosso": False, - "n_header_allcaps": 0, - "n_sezioni_numerate": 0, - "n_paragrafi_uniti": 0, - } - - # ── 1. Rimuovi marcatori **bold** nelle intestazioni esistenti - # ## **Titolo** → ## Titolo - text = re.sub( - r'^(#{1,6})\s+\*\*(.+?)\*\*\s*$', - r'\1 \2', - text, flags=re.MULTILINE, - ) - - # ── 1b. Normalizza header esistenti con contenuto ALL-CAPS → sentence-case - # ## AL DI LA' DEL BENE E DEL MALE → ## Al di la' del bene e del male - def _norm_allcaps_header(m: re.Match) -> str: - hashes = m.group(1) - content = m.group(2).strip() - letters = [c for c in content if c.isalpha()] - if letters and all(c.isupper() for c in letters): - return f"{hashes} {_sentence_case(content)}" - return m.group(0) - - text = re.sub( - r'^(#{1,6}) (.+)$', - _norm_allcaps_header, - text, flags=re.MULTILINE, - ) - - # ── 2. Rimuovi blocco TOC (riga indice + contenuto inline sulla stessa riga) - # "INDICE. Capitolo 1 Capitolo 2 ..." → rimossa - lines = text.split('\n') - new_lines = [] - for line in lines: - if _is_toc_line(line): - stats["toc_rimosso"] = True - else: - new_lines.append(line) - text = '\n'.join(new_lines) - - # ── 3. Converti righe ALL-CAPS standalone → ## header - # Una riga è "standalone" se è preceduta/seguita da riga vuota - # oppure si trova all'inizio/fine del documento. - blocks = text.split('\n\n') - new_blocks = [] - for block in blocks: - stripped = block.strip() - # Blocco standalone = un'unica riga (nessun \n interno rilevante) - if '\n' not in stripped and _is_allcaps_line(stripped): - new_blocks.append(_allcaps_to_header(stripped)) - stats["n_header_allcaps"] += 1 - else: - # Controlla riga per riga per righe ALL-CAPS seguite da altri contenuti - sub_lines = block.split('\n') - converted = [] - for ln in sub_lines: - if _is_allcaps_line(ln) and len(ln.strip()) > 3: - converted.append(_allcaps_to_header(ln)) - stats["n_header_allcaps"] += 1 - else: - converted.append(ln) - new_blocks.append('\n'.join(converted)) - text = '\n\n'.join(new_blocks) - - # ── 4. Converti sezioni numerate "N. testo" → "### N.\n\ntesto" - # Riconosce: "1. Testo", "42. Testo" (due o più spazi dopo il punto) - def _num_repl(m: re.Match) -> str: - num = m.group(1) - testo = m.group(2).strip() - stats["n_sezioni_numerate"] += 1 - return f"### {num}.\n\n{testo}" - - # Pattern standard: "1. testo" o "1. testo" - text = re.sub( - r'^(\d+)\.\s+(.+)$', - _num_repl, - text, flags=re.MULTILINE, - ) - - # Pattern con lettera-suffisso: "65 a. testo" o "65a. testo" - def _num_letter_repl(m: re.Match) -> str: - num = m.group(1) + m.group(2) - testo = m.group(3).strip() - stats["n_sezioni_numerate"] += 1 - return f"### {num}.\n\n{testo}" - - text = re.sub( - r'^(\d+)\s*([a-z])\.\s+(.+)$', - _num_letter_repl, - text, flags=re.MULTILINE, - ) - - # ── 5. Unisci paragrafi spezzati da salti pagina PDF - # Criterio: blocco A non finisce con punteggiatura di fine frase, - # blocco B non inizia con maiuscola "di sezione" né è un header. - # Unione sicura: mai attraverso confini ###/##. - _SENTENCE_END = set('.?!»)\'"') - blocks = text.split('\n\n') - merged = [] - i = 0 - while i < len(blocks): - b = blocks[i] - stripped = b.strip() - # Prova a unire con il successivo se la frase è spezzata - while ( - i + 1 < len(blocks) - and stripped - and not stripped.startswith('#') - and stripped[-1] not in _SENTENCE_END - ): - nxt = blocks[i + 1].strip() - # Non unire se il successivo è un header o è vuoto - if not nxt or nxt.startswith('#'): - break - # Non unire se il successivo inizia con una cifra seguita da punto - # (sarebbe l'inizio di un nuovo aforisma non ancora convertito) - if re.match(r'^\d+\.', nxt): - break - b = stripped + ' ' + nxt - stripped = b.strip() - stats["n_paragrafi_uniti"] += 1 - i += 1 - merged.append(b) - i += 1 - text = '\n\n'.join(merged) - - # ── 6. Normalizza whitespace multiplo interno alle righe - # "parola parola" → "parola parola" (inclusi gli header) - lines = text.split('\n') - normalized = [] - for line in lines: - if not line.strip(): - normalized.append(line) - else: - normalized.append(re.sub(r' +', ' ', line)) - text = '\n'.join(normalized) - - # ── 7. Riduci righe vuote multiple a doppie - text = re.sub(r'\n{3,}', '\n\n', text) - - return text, stats - - -# ─── Aggiornamento revision log ──────────────────────────────────────────────── - -def update_revision_log( - log_path: Path, - stem: str, - profile_before: dict, - profile_after: dict, - t_stats: dict, -) -> None: - header_exists = log_path.exists() and log_path.stat().st_size > 0 - - avv = profile_after.get("avvertenze", []) - avv_str = "; ".join(avv) if avv else "nessuna" - - entry = f""" -## {stem} — {date.today().isoformat()} - -**Trasformazioni automatiche:** -- Normalizzazione whitespace multiplo e righe vuote -- Blocco TOC rimosso: {'sì' if t_stats['toc_rimosso'] else 'no'} -- Righe ALL-CAPS → ## header: {t_stats['n_header_allcaps']} -- Sezioni numerate → ### header: {t_stats['n_sezioni_numerate']} -- Paragrafi uniti (salti pagina PDF): {t_stats['n_paragrafi_uniti']} -- Livello struttura: {profile_before.get('livello_struttura', '?')} → {profile_after.get('livello_struttura', '?')} - -**Avvertenze residue:** {avv_str} - -**Revisioni manuali pendenti:** -- [ ] Verificare conversioni ALL-CAPS errate -- [ ] Controllare sezioni troppo corte o troppo lunghe -""" - - if not header_exists: - log_path.write_text("# Revision log\n" + entry, encoding="utf-8") - else: - existing = log_path.read_text(encoding="utf-8") - log_path.write_text(existing + entry, encoding="utf-8") - - -# ─── Per-document processing ───────────────────────────────────────────────── - -def process_stem(stem: str, project_root: Path, force: bool) -> bool: - src_dir = project_root / "step-3" / stem - out_dir = project_root / "step-4" / stem - raw_src = src_dir / "raw.md" - clean_src = src_dir / "clean.md" - profile_src = src_dir / "structure_profile.json" - clean_out = out_dir / "clean.md" - profile_out = out_dir / "structure_profile.json" - - print(f"\nDocumento: {stem}") - - if not clean_src.exists(): - print(f" ✗ clean.md non trovato in step-3/{stem}/ — skip") - return False - - if clean_out.exists() and not force: - print(f" ⚠️ clean.md già presente — skip") - print(f" (usa --force per rieseguire)") - return True - - out_dir.mkdir(parents=True, exist_ok=True) - - # Copia raw.md immutabile (riferimento) - if raw_src.exists(): - shutil.copy2(raw_src, out_dir / "raw.md") - print(f" Copiato raw.md da step-3/{stem}/") - - # Leggi profilo step-3 (per confronto nel report) - profile_before: dict = {} - if profile_src.exists(): - profile_before = json.loads(profile_src.read_text(encoding="utf-8")) - - # Applica trasformazioni - print(f" Applicazione trasformazioni strutturali...") - text = clean_src.read_text(encoding="utf-8") - text_revised, t_stats = apply_transforms(text) - - # Salva clean.md revisionato - clean_out.write_text(text_revised, encoding="utf-8") - - # Ricalcola profilo sul nuovo clean.md - profile_after = analyze(clean_out) - profile_out.write_text( - json.dumps(profile_after, ensure_ascii=False, indent=2), - encoding="utf-8", - ) - - # Report - lv_b = profile_before.get("livello_struttura", "?") - lv_a = profile_after["livello_struttura"] - _STRAT = {3: "h3_aware", 2: "h2_paragraph_split", 1: "paragraph", 0: "sliding_window"} - print(f" ✅ Livello struttura: {lv_b} → {lv_a} ({_STRAT.get(lv_a, '?')})") - print(f" h2: {profile_before.get('n_h2','?')} → {profile_after['n_h2']}") - print(f" h3: {profile_before.get('n_h3','?')} → {profile_after['n_h3']}") - print(f" TOC rimosso: {'sì' if t_stats['toc_rimosso'] else 'no'}") - print(f" Righe ALL-CAPS → ##: {t_stats['n_header_allcaps']}") - print(f" Sezioni numerate → ###: {t_stats['n_sezioni_numerate']}") - print(f" Paragrafi uniti (salti pagina): {t_stats['n_paragrafi_uniti']}") - for w in profile_after["avvertenze"]: - print(f" ⚠️ {w}") - - # Aggiorna revision log (direttamente in step-4/, non in sottocartella) - log_path = project_root / "step-4" / "revision_log.md" - update_revision_log(log_path, stem, profile_before, profile_after, t_stats) - print(f" ✅ step-4/revision_log.md aggiornato") - print(f" ✅ structure_profile.json salvato") - return True - - -# ─── Entry point ───────────────────────────────────────────────────────────── - -if __name__ == "__main__": - project_root = Path(__file__).parent.parent - - parser = argparse.ArgumentParser(description="Step 4 — Revisione automatica Markdown") - parser.add_argument("--stem", help="Nome del documento (sottocartella di step-3/)") - parser.add_argument("--force", action="store_true", help="Riesegui anche se già presente") - args = parser.parse_args() - - if args.stem: - stems = [args.stem] - else: - step3_dir = project_root / "step-3" - if not step3_dir.exists(): - print(f"Errore: cartella step-3/ non trovata in {project_root}") - sys.exit(1) - stems = sorted(p.name for p in step3_dir.iterdir() if p.is_dir()) - if not stems: - print(f"Errore: nessun documento trovato in step-3/") - sys.exit(1) - - results = [process_stem(s, project_root, args.force) for s in stems] - - ok = sum(results) - total = len(results) - print(f"\n{'✅' if all(results) else '⚠️ '} {ok}/{total} documenti revisionati") - - sys.exit(0 if all(results) else 1)