From e41fcae248e2be5e7849ab48fa17a6f9642598e3 Mon Sep 17 00:00:00 2001 From: Davide Grilli Date: Thu, 30 Apr 2026 14:59:55 +0200 Subject: [PATCH] refactor: modularizza pipeline in conversione/_pipeline/ MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Sostituisce i file monolitici pipeline.py e validate.py con il package _pipeline/ a responsabilità separate. Entry point unificato in __main__.py (convert + validate dallo stesso comando). Moduli aggiunti: - __main__.py — CLI unificata (--stem, --force, validate, --detail) - _pipeline/__init__.py — re-export pubblico - _pipeline/checker.py — validazione PDF - _pipeline/deps.py — verifica dipendenze Java + opendataloader - _pipeline/structure.py — analyze() + strategia chunking Moduli già committati in precedenza: - _pipeline/converter.py, transforms.py, report.py, runner.py, validator.py Aggiornamenti collaterali: - .gitignore: exception !conversione/_pipeline/** per tracciare il package - CLAUDE.md: documentazione aggiornata alla nuova architettura; fix riferimenti obsoleti a conversione/pipeline.py → conversione/ Co-Authored-By: Claude Sonnet 4.6 --- .gitignore | 2 + CLAUDE.md | 146 ++- conversione/__main__.py | 111 ++ conversione/_pipeline/__init__.py | 19 + conversione/_pipeline/checker.py | 51 + conversione/_pipeline/deps.py | 23 + conversione/_pipeline/structure.py | 141 +++ conversione/pipeline.py | 1603 ---------------------------- conversione/validate.py | 210 ---- 9 files changed, 464 insertions(+), 1842 deletions(-) create mode 100644 conversione/__main__.py create mode 100644 conversione/_pipeline/__init__.py create mode 100644 conversione/_pipeline/checker.py create mode 100644 conversione/_pipeline/deps.py create mode 100644 conversione/_pipeline/structure.py delete mode 100644 conversione/pipeline.py delete mode 100644 conversione/validate.py diff --git a/.gitignore b/.gitignore index e3783db..4ff0772 100644 --- a/.gitignore +++ b/.gitignore @@ -29,6 +29,8 @@ Thumbs.db # Output conversione/ — generati da conversione/pipeline.py conversione/*/ +!conversione/_pipeline/ +!conversione/_pipeline/** # Output chunks/ — generati da chunks/chunker.py e chunks/verify_chunks.py chunks/*/ diff --git a/CLAUDE.md b/CLAUDE.md index 2fddce7..71427fe 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -2,69 +2,157 @@ This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository. +## Missione + +Convertire PDF digitali in Markdown **perfetto per la vettorizzazione RAG**, senza revisione manuale. L'output deve essere testo pulito, strutturato in sezioni semanticamente coerenti, privo di artefatti, pronto per chunking e indicizzazione in un vector store. + +**Non supportato:** PDF scansionati (immagini), PDF protetti da password. + +--- + ## Regole invarianti - **Lingua:** Rispondi sempre in italiano. - **Venv:** Usa `.venv/bin/python` o `source .venv/bin/activate`. Mai `pip`/`python` di sistema. -- **`raw.md` immutabile:** La copia di lavoro è sempre `clean.md`. +- **`raw.md` immutabile:** Non modificare mai `raw.md`. La copia di lavoro è sempre `clean.md`. +- **Obiettivo zero revisioni:** ogni miglioramento alla pipeline deve ridurre i casi in cui il `clean.md` richiede correzioni manuali. --- -## Pipeline +## Setup -``` -PDF → conversione → clean.md -``` +```bash +python -m venv .venv +source .venv/bin/activate +pip install -r requirements.txt -`--stem` = nome PDF senza estensione. +# Java 11+ richiesto da opendataloader-pdf +sudo apt install default-jdk # Ubuntu/Debian/WSL +java -version +``` --- ## Comandi ```bash -# Converti un PDF -python conversione/pipeline.py --stem +# Converti un PDF (posizionalo prima in sources/.pdf) +.venv/bin/python conversione/ --stem # Tutti i PDF in sources/ -python conversione/pipeline.py +.venv/bin/python conversione/ -# Forza riesecuzione -python conversione/pipeline.py --stem --force +# Forza riesecuzione (sovrascrive clean.md esistente) +.venv/bin/python conversione/ --stem --force -# Validazione batch di tutti gli stem convertiti -python conversione/validate.py +# Validazione batch di tutti gli stem +.venv/bin/python conversione/ validate + +# Validazione con dettaglio penalità +.venv/bin/python conversione/ validate --detail + +# Rimuove l'output di uno stem +bash conversione/clear.sh ``` +`--stem` = nome file PDF senza estensione. + --- ## Architettura -### `conversione/pipeline.py` +Il codice è organizzato in `conversione/__main__.py` (entry point) e il package `conversione/_pipeline/` (logica modulare). -Quattro fasi in sequenza: +``` +conversione/ +├── __main__.py # Entry point unificato: convert + validate +├── clear.sh # Rimuove output di uno stem +└── _pipeline/ + ├── __init__.py # Re-export pubblico + ├── deps.py # _check_deps() — verifica opendataloader-pdf e Java + ├── checker.py # check_pdf() — validazione PDF + ├── converter.py # convert_pdf() — wrapper opendataloader + ├── transforms.py # apply_transforms() + tutte le _t_* (~920 righe) + ├── structure.py # analyze() + rilevamento lingua e struttura + ├── report.py # build_report() → report.json + ├── runner.py # run() — orchestrazione 4 fasi + └── validator.py # validate() + _score() + _grade() +``` -1. **Validazione** — verifica che il PDF sia digitale, non protetto, non vuoto. -2. **Estrazione** — usa `opendataloader-pdf` (XY-Cut++) con Java 11+ per ricostruire l'ordine di lettura corretto, anche in documenti multi-colonna. -3. **Pulizia strutturale** — serie di trasformazioni su `raw.md`: fix accenti backtick LaTeX, rimozione TOC e dot-leader, normalizzazione header, unione paragrafi spezzati da salto pagina, rimozione URL watermark, ecc. -4. **Analisi struttura** — rileva gerarchia (`#`/`##`/`###`), lingua, lunghezza media sezioni e scrive `structure_profile.json`. +### `__main__.py` — entry point unificato -Output per ogni stem in `conversione//`: -- `raw.md` — grezzo, immutabile -- `clean.md` — copia di lavoro da revisionare con `/prepare-md` -- `structure_profile.json` — struttura rilevata + `strategia_chunking` (`h3_aware`, `h2_paragraph_split`, `paragraph`, `sliding_window`) -- `report.json` — metriche complete (trasformazioni, anomalie, distribuzione lunghezze) +CLI con due modalità: conversione (default, `--stem`, `--force`) e validazione (subcommand `validate`, con stems opzionali e `--detail`). Aggiunge `conversione/` a `sys.path` e delega a `_pipeline`. Uso: `python conversione/ [--stem X] [--force]` oppure `python conversione/ validate [X] [--detail]`. -### `conversione/validate.py` +### `_pipeline/transforms.py` — cuore della pipeline -Legge i `report.json` di tutti gli stem e stampa una tabella di stato. Segnala: bare header, sezioni corte/lunghe, backtick residui, dot-leader. +Contiene ~35 trasformazioni atomiche (`_t_*`) e l'orchestratore `apply_transforms(text) -> (text, stats)`. Le trasformazioni sono tenute in un unico file perché hanno dipendenze incrociate dense e il loro **ordine è semantico** — non modificarlo senza capire le dipendenze. -### `conversione/clear.sh` +Ordine logico dei gruppi (non separare): +1. **Encoding** — PUA font Symbol, accenti backtick LaTeX, moltiplicazione, micro +2. **Pulizia artefatti** — immagini, `
`, footnote superscript, URL, box symbol, righe ricorrenti, watermark +3. **Struttura header** — fix header+body concatenati, Capitolo inline, normalizzazione livelli numerati, `####`→`###`, bold, ALL-CAPS +4. **Costruzione struttura** — TOC rimosso, ALL-CAPS→`##`, sezioni numerata→`###`, ambienti matematici, articoli +5. **Testo** — merge paragrafi spezzati, whitespace, blank lines, poesia, versi +6. **Rifinitura** — header vuoti, garbage header, merge titoli isolati, frontmatter -Rimuove gli output di conversione per uno stem (`conversione//`). +Costanti di modulo (compilate una volta): `_SYMBOL_PUA_MAP`, `_SYMBOL_PUA_RE`, `_TABSEP_RE`, `_FM_RE`, `_VERSE_NUM_RE`, `_NUMBERED_HDR_RE`, `_BIB_MARKERS_RE`, `_WATERMARK_RE`, `_SUPERSCRIPT_RE`. + +Flag automatico: se il testo contiene "Esercizi/Problems/Homework", `_t_numbered_sections` non converte `- N. testo` in header (sono numerazioni di esercizi, non titoli). + +### `_pipeline/structure.py` — analisi struttura + +`analyze(md_path) -> dict` conta `#`/`##`/`###`, rileva lingua (it/en/fr/de/es), sceglie `strategia_chunking`: + +| Strategia | Condizione | +|-----------|------------| +| `h3_aware` | ≥5 `###` | +| `h2_paragraph_split` | ≥3 `##`, pochi `###` | +| `paragraph` | struttura rada | +| `sliding_window` | testo piatto | + +### `_pipeline/report.py` — metriche qualità + +`build_report()` genera `report.json` con: statistiche trasformazioni, struttura, distribuzione lunghezze sezioni (`min`/`p25`/`mediana`/`p75`/`max`), anomalie (bare headers, sezioni corte/lunghe), residui con esempi (backtick, dot-leader, URL, `
`, simboli encoding, formule inline, footnote, PUA). + +### `validate.py` — scoring + +Assegna un voto 0–100 (A/B/C/D/F) leggendo `report.json`. Penalità principali: + +| Problema | Penalità | Cap | +|----------|----------|-----| +| Struttura assente (livello 0) | −40 | — | +| Struttura piatta (livello 1) | −15 | — | +| Backtick residui | −2/cad | −20 | +| Caratteri PUA font Symbol | −2/cad | −20 | +| Dot-leader | −5/cad | −10 | +| URL/watermark | −5/cad | −15 | +| `
` inline | −2/cad | −15 | +| Bare headers | −3/cad | −15 | + +--- + +## Cosa rende un Markdown perfetto per la vettorizzazione + +- **Struttura semantica:** header Markdown = confini naturali dei chunk; ogni sezione è un'unità concettuale. +- **Testo pulito:** nessun backtick, dot-leader, footnote superscript, carattere PUA, `
`. +- **Paragrafi interi:** nessuna frase troncata da salto pagina PDF. +- **Formule e simboli:** lettere greche e operatori in Unicode standard, non in font-encoding privato. +- **Nessun rumore strutturale:** TOC, header/footer ripetuti, URL, watermark — tutto rimosso. +- **Gerarchia corretta:** h1/h2/h3 riflettono la struttura logica, non il layout tipografico. + +--- + +## Linee guida per migliorare la pipeline + +Quando si aggiunge una trasformazione in `apply_transforms()`: +- Ogni `_t_*` deve restituire `(testo, n_modifiche)` — il contatore alimenta `report.json`. +- Aggiungere la coppia `("stat_key", _t_nuova)` nella lista `_transforms` nel punto logicamente corretto (rispettare i gruppi sopra). +- Compilare i pattern regex a livello di modulo come costanti, non dentro la funzione. +- Testare con `.venv/bin/python conversione/ --stem --force` e confrontare `report.json`. +- Un nuovo tipo di artefatto: prima aggiungerlo come residuo in `report.py` (`_scan`), poi implementare la `_t_*` che lo rimuove. --- ## Skills custom -- `/prepare-md ` — corregge `clean.md`: sillabazione, artefatti, header, paragrafi spezzati, gerarchia. +- `/prepare-md ` — corregge `clean.md` quando la pipeline non basta: sillabazione, artefatti residui, header malformati, gerarchia incoerente. diff --git a/conversione/__main__.py b/conversione/__main__.py new file mode 100644 index 0000000..e552b18 --- /dev/null +++ b/conversione/__main__.py @@ -0,0 +1,111 @@ +#!/usr/bin/env python3 +""" +Pipeline PDF → clean Markdown per vettorizzazione RAG. + +Uso: + # Converti + python conversione/ --stem + python conversione/ --stem --force + python conversione/ # tutti i PDF in sources/ + + # Valida + python conversione/ validate + python conversione/ validate [ ...] --detail + +Prerequisiti: + pip install opendataloader-pdf pdfplumber + Java 11+ sul PATH (https://adoptium.net/) +""" + +import argparse +import sys +from pathlib import Path + +# Rende _pipeline importabile da conversione/ +sys.path.insert(0, str(Path(__file__).parent)) + +from _pipeline import _check_deps, run, validate + + +def _build_parser() -> argparse.ArgumentParser: + parser = argparse.ArgumentParser( + prog="conversione", + description="PDF → clean Markdown strutturato, pronto per chunking RAG", + formatter_class=argparse.RawDescriptionHelpFormatter, + epilog=( + "Esempi:\n" + " python conversione/ --stem manuale\n" + " python conversione/ --stem manuale --force\n" + " python conversione/ validate\n" + " python conversione/ validate manuale --detail" + ), + ) + + # ── Subcommand: validate ────────────────────────────────────────────── + sub = parser.add_subparsers(dest="cmd", metavar="comando") + val = sub.add_parser( + "validate", + help="valida i report.json prodotti dalla conversione", + description="Legge i report.json e assegna un voto 0-100 (A/B/C/D/F).", + ) + val.add_argument( + "stems", + nargs="*", + metavar="STEM", + help="stem da validare. Ometti per tutti.", + ) + val.add_argument( + "--detail", "-d", + action="store_true", + help="mostra il dettaglio delle penalità per ogni documento", + ) + + # ── Opzioni convert (modalità default) ─────────────────────────────── + parser.add_argument( + "--stem", + metavar="NOME", + help="nome del PDF in sources/ (senza estensione). Ometti per tutti.", + ) + parser.add_argument( + "--force", + action="store_true", + help="riesegui anche se clean.md è già presente", + ) + + return parser + + +def main() -> None: + parser = _build_parser() + args = parser.parse_args() + root = Path(__file__).parent.parent + + # ── Validate ───────────────────────────────────────────────────────── + if args.cmd == "validate": + validate(args.stems, root, detail=args.detail) + return + + # ── Convert (default) ──────────────────────────────────────────────── + _check_deps() + + if args.stem: + stems = [args.stem] + else: + sources_dir = root / "sources" + if not sources_dir.exists(): + print("Errore: cartella sources/ non trovata.") + sys.exit(1) + stems = sorted(p.stem for p in sources_dir.glob("*.pdf")) + if not stems: + print("Errore: nessun PDF trovato in sources/.") + sys.exit(1) + + results = [run(s, root, args.force) for s in stems] + ok = sum(results) + total = len(results) + print(f"\n{'✅' if all(results) else '⚠️ '} {ok}/{total} documenti convertiti") + sys.exit(0 if all(results) else 1) + + +if __name__ == "__main__": + main() diff --git a/conversione/_pipeline/__init__.py b/conversione/_pipeline/__init__.py new file mode 100644 index 0000000..5ff882c --- /dev/null +++ b/conversione/_pipeline/__init__.py @@ -0,0 +1,19 @@ +from .deps import _check_deps +from .checker import check_pdf +from .converter import convert_pdf +from .transforms import apply_transforms +from .structure import analyze +from .report import build_report +from .runner import run +from .validator import validate + +__all__ = [ + "_check_deps", + "check_pdf", + "convert_pdf", + "apply_transforms", + "analyze", + "build_report", + "run", + "validate", +] diff --git a/conversione/_pipeline/checker.py b/conversione/_pipeline/checker.py new file mode 100644 index 0000000..87b667a --- /dev/null +++ b/conversione/_pipeline/checker.py @@ -0,0 +1,51 @@ +from pathlib import Path + + +def check_pdf(pdf_path: Path) -> tuple[bool, str]: + """Validazione rapida: esistenza, leggibilità, testo estraibile.""" + if not pdf_path.exists(): + return False, f"File non trovato: {pdf_path}" + if pdf_path.suffix.lower() != ".pdf": + return False, f"Non è un PDF: {pdf_path.name}" + size = pdf_path.stat().st_size + if size == 0: + return False, "File vuoto" + if size < 1024: + return False, f"File troppo piccolo ({size} byte) — probabilmente corrotto" + + try: + import pdfplumber + with pdfplumber.open(pdf_path) as pdf: + n_pages = len(pdf.pages) + if n_pages == 0: + return False, "PDF senza pagine" + sample = min(5, n_pages) + pages_with_text = sum( + 1 for i in range(sample) + if len((pdf.pages[i].extract_text() or "").strip()) > 50 + ) + if pages_with_text == 0: + extended = min(15, n_pages) + if extended > sample: + ext_with_text = sum( + 1 for i in range(sample, extended) + if len((pdf.pages[i].extract_text() or "").strip()) > 50 + ) + if ext_with_text > 0: + return True, ( + f"{n_pages} pagine — prime {sample} vuote, " + f"testo trovato in pagine successive " + f"(possibile copertina immagine)" + ) + return False, ( + f"Nessun testo nelle prime {extended} pagine " + f"— probabilmente scansionato (OCR non supportato)" + ) + return True, f"{n_pages} pagine, testo digitale confermato" + except MemoryError: + return False, "Memoria esaurita durante l'apertura del PDF" + except Exception as e: + msg = str(e).lower() + if "password" in msg or "encrypted" in msg: + return False, "PDF protetto da password" + return False, f"Impossibile aprire: {e}" diff --git a/conversione/_pipeline/deps.py b/conversione/_pipeline/deps.py new file mode 100644 index 0000000..04dec8a --- /dev/null +++ b/conversione/_pipeline/deps.py @@ -0,0 +1,23 @@ +import subprocess +import sys + + +def _check_deps() -> None: + try: + import opendataloader_pdf # noqa: F401 + except ImportError: + print("Errore: opendataloader-pdf non installato.") + print(" pip install opendataloader-pdf") + sys.exit(1) + + try: + result = subprocess.run( + ["java", "-version"], + capture_output=True, text=True, + ) + if result.returncode != 0: + raise FileNotFoundError + except FileNotFoundError: + print("Errore: Java 11+ non trovato sul PATH.") + print(" Installa da https://adoptium.net/") + sys.exit(1) diff --git a/conversione/_pipeline/structure.py b/conversione/_pipeline/structure.py new file mode 100644 index 0000000..fd4442c --- /dev/null +++ b/conversione/_pipeline/structure.py @@ -0,0 +1,141 @@ +import re +from pathlib import Path + +# ─── Rilevamento lingua ─────────────────────────────────────────────────────── + +_IT_WORDS = frozenset([ + "il", "la", "di", "e", "che", "non", "per", "un", "una", "si", + "con", "da", "del", "della", "dei", "in", "ma", "se", "lo", "le", + "gli", "al", "alla", "ai", "alle", "sono", "ha", "hanno", "era", + "erano", "nel", "nella", "nei", "nelle", "questo", "questa", "così", +]) +_EN_WORDS = frozenset([ + "the", "of", "and", "to", "in", "is", "that", "it", "was", "for", + "on", "are", "as", "with", "his", "they", "at", "be", "this", "have", + "from", "or", "an", "but", "not", "by", "he", "she", "we", "you", + "which", "their", "been", "has", "would", "there", "when", "will", +]) +_FR_WORDS = frozenset([ + "le", "les", "de", "du", "des", "et", "un", "une", "est", "que", + "pour", "dans", "sur", "avec", "qui", "par", "pas", "plus", "au", + "ce", "se", "ou", "mais", "comme", "aussi", +]) +_DE_WORDS = frozenset([ + "der", "die", "das", "und", "in", "von", "zu", "den", "mit", "ist", + "auf", "eine", "als", "dem", "des", "sich", "nicht", "auch", "werden", + "bei", "nach", "oder", "wenn", "wird", "war", +]) +_ES_WORDS = frozenset([ + "el", "los", "las", "de", "en", "un", "una", "es", "que", "por", + "con", "del", "para", "como", "pero", "sus", "son", "los", "hay", + "todo", "esta", "este", "ser", "más", "ya", +]) + + +def _detect_language(text: str) -> str: + words = re.findall(r"\b[a-zA-Z]{2,}\b", text.lower()) + sample = words[:2000] + scores = { + "it": sum(1 for w in sample if w in _IT_WORDS), + "en": sum(1 for w in sample if w in _EN_WORDS), + "fr": sum(1 for w in sample if w in _FR_WORDS), + "de": sum(1 for w in sample if w in _DE_WORDS), + "es": sum(1 for w in sample if w in _ES_WORDS), + } + best = max(scores, key=scores.get) + return best if scores[best] > 0 else "unknown" + + +# ─── Analisi struttura ──────────────────────────────────────────────────────── + +def _count_headers(text: str, level: int) -> int: + prefix = "#" * level + " " + return len(re.findall(rf"(?m)^{re.escape(prefix)}", text)) + + +def _count_paragraphs(text: str) -> int: + blocks = re.split(r"\n{2,}", text) + return sum(1 for b in blocks if b.strip() and not re.match(r"^#+\s", b.strip())) + + +def _split_sections(text: str, level: int) -> list[str]: + prefix = "#" * level + " " + parts = re.split(rf"(?m)^{re.escape(prefix)}.+", text) + return [p for p in parts[1:] if p.strip()] + + +def _parse_sections_with_body(text: str, level: int = 3) -> list[tuple[str, str]]: + """Restituisce lista di (header_line, body_text) per tutti gli header al livello dato.""" + prefix = "#" * level + " " + lines = text.split("\n") + sections: list[tuple[str, str]] = [] + cur_hdr: str | None = None + cur_body: list[str] = [] + for line in lines: + if line.startswith(prefix): + if cur_hdr is not None: + sections.append((cur_hdr, "\n".join(cur_body).strip())) + cur_hdr = line + cur_body = [] + elif cur_hdr is not None: + cur_body.append(line) + if cur_hdr is not None: + sections.append((cur_hdr, "\n".join(cur_body).strip())) + return sections + + +def analyze(md_path: Path) -> dict: + text = md_path.read_text(encoding="utf-8") + n_h1 = _count_headers(text, 1) + n_h2 = _count_headers(text, 2) + n_h3 = _count_headers(text, 3) + n_paragrafi = _count_paragraphs(text) + + if n_h3 >= 5: + livello, boundary, strategia = 3, "h3", "h3_aware" + section_bodies = _split_sections(text, 3) + # Se h3 sono enormi e h2 più brevi, h2 è il boundary corretto + if n_h2 >= 3: + h2_bodies = _split_sections(text, 2) + avg_h3 = sum(len(b) for b in section_bodies) / len(section_bodies) if section_bodies else 0 + avg_h2 = sum(len(b) for b in h2_bodies) / len(h2_bodies) if h2_bodies else 0 + if avg_h3 > 5000 and avg_h2 < avg_h3 * 0.7: + livello, boundary, strategia = 2, "h2", "h2_paragraph_split" + section_bodies = h2_bodies + elif n_h2 >= 3: + livello, boundary, strategia = 2, "h2", "h2_paragraph_split" + section_bodies = _split_sections(text, 2) + elif n_h1 + n_h2 + n_h3 >= 1: + livello, boundary, strategia = 1, "paragrafo", "paragraph" + section_bodies = [b for b in re.split(r"\n{2,}", text) if b.strip()] + elif n_paragrafi >= 3: + livello, boundary, strategia = 1, "paragrafo", "paragraph" + section_bodies = [b for b in re.split(r"\n{2,}", text) if b.strip()] + else: + livello, boundary, strategia = 0, "nessuno", "sliding_window" + section_bodies = [text] if text.strip() else [] + + lengths = [len(b) for b in section_bodies if b.strip()] + lunghezza_media = int(sum(lengths) / len(lengths)) if lengths else 0 + lingua = _detect_language(text) + + avvertenze = [] + short = sum(1 for l in lengths if l < 200) + long_ = sum(1 for l in lengths if l > 800) + if short: + avvertenze.append(f"{short} sezioni sotto i 200 caratteri — verranno accorpate") + if long_: + avvertenze.append(f"{long_} sezioni sopra i 800 caratteri — verranno divise") + + return { + "livello_struttura": livello, + "n_h1": n_h1, + "n_h2": n_h2, + "n_h3": n_h3, + "n_paragrafi": n_paragrafi, + "boundary_primario": boundary, + "lingua_rilevata": lingua, + "lunghezza_media_sezione": lunghezza_media, + "strategia_chunking": strategia, + "avvertenze": avvertenze, + } diff --git a/conversione/pipeline.py b/conversione/pipeline.py deleted file mode 100644 index e657da0..0000000 --- a/conversione/pipeline.py +++ /dev/null @@ -1,1603 +0,0 @@ -#!/usr/bin/env python3 -""" -conversione/pipeline.py — PDF → clean Markdown (pipeline automatica) - -Converte un PDF grezzo in Markdown strutturato e pulito, pronto per la -suddivisione in chunk. Gestisce validazione, estrazione testo, pulizia -strutturale e rilevamento automatico della struttura del documento. - -Usa opendataloader-pdf (algoritmo XY-Cut++ per ordine di lettura corretto, -testo fluente, struttura preservata). - -Output per ciascuno stem: - conversione//raw.md — Markdown grezzo (immutabile) - conversione//clean.md — Markdown pulito e strutturato - conversione//structure_profile.json - -Uso: - python conversione/pipeline.py --stem - python conversione/pipeline.py # tutti i PDF in sources/ - python conversione/pipeline.py --stem --force # forza riesecuzione - -Prerequisiti: - pip install opendataloader-pdf - Java 11+ sul PATH (https://adoptium.net/) -""" - -import argparse -import json -import re -import subprocess -import sys -import tempfile -from collections import Counter -from datetime import datetime -from functools import partial -from pathlib import Path - - -# ─── Verifica dipendenze ────────────────────────────────────────────────────── - -def _check_deps() -> None: - try: - import opendataloader_pdf # noqa: F401 - except ImportError: - print("Errore: opendataloader-pdf non installato.") - print(" pip install opendataloader-pdf") - sys.exit(1) - - try: - result = subprocess.run( - ["java", "-version"], - capture_output=True, text=True, - ) - if result.returncode != 0: - raise FileNotFoundError - except FileNotFoundError: - print("Errore: Java 11+ non trovato sul PATH.") - print(" Installa da https://adoptium.net/") - sys.exit(1) - - -# ─── [1] Validazione PDF ───────────────────────────────────────────────────── - -def check_pdf(pdf_path: Path) -> tuple[bool, str]: - """ - Validazione rapida: esistenza, leggibilità, testo estraibile. - Restituisce (ok, messaggio). - """ - if not pdf_path.exists(): - return False, f"File non trovato: {pdf_path}" - if pdf_path.suffix.lower() != ".pdf": - return False, f"Non è un PDF: {pdf_path.name}" - size = pdf_path.stat().st_size - if size == 0: - return False, "File vuoto" - if size < 1024: - return False, f"File troppo piccolo ({size} byte) — probabilmente corrotto" - - try: - import pdfplumber - with pdfplumber.open(pdf_path) as pdf: - n_pages = len(pdf.pages) - if n_pages == 0: - return False, "PDF senza pagine" - sample = min(5, n_pages) - pages_with_text = sum( - 1 for i in range(sample) - if len((pdf.pages[i].extract_text() or "").strip()) > 50 - ) - if pages_with_text == 0: - # Estende il campione: copertine immagine o pagine bianche iniziali - extended = min(15, n_pages) - if extended > sample: - ext_with_text = sum( - 1 for i in range(sample, extended) - if len((pdf.pages[i].extract_text() or "").strip()) > 50 - ) - if ext_with_text > 0: - return True, ( - f"{n_pages} pagine — prime {sample} vuote, " - f"testo trovato in pagine successive " - f"(possibile copertina immagine)" - ) - return False, ( - f"Nessun testo nelle prime {extended} pagine " - f"— probabilmente scansionato (OCR non supportato)" - ) - return True, f"{n_pages} pagine, testo digitale confermato" - except MemoryError: - return False, "Memoria esaurita durante l'apertura del PDF" - except Exception as e: - msg = str(e).lower() - if "password" in msg or "encrypted" in msg: - return False, "PDF protetto da password" - return False, f"Impossibile aprire: {e}" - - -# ─── [2] Conversione PDF → Markdown ───────────────────────────────────────── - -def convert_pdf(pdf_path: Path, out_dir: Path) -> Path: - """ - Converte il PDF in Markdown tramite opendataloader-pdf. - Scrive il file nella out_dir e restituisce il percorso. - - Parametri scelti per output RAG-ottimale: - - keep_line_breaks=False → testo fluente, no hard-wrap PDF - - reading_order="xycut" → corregge ordine multi-colonna (XY-Cut++) - - sanitize=False → preserva il testo originale (no anonimizzazione PII) - """ - import opendataloader_pdf - - out_dir.mkdir(parents=True, exist_ok=True) - - opendataloader_pdf.convert( - input_path=str(pdf_path), - output_dir=str(out_dir), - format="markdown", - keep_line_breaks=False, - reading_order="xycut", - sanitize=False, - image_output="off", # nessuna immagine estratta né referenziata - quiet=True, # sopprime i log Java - ) - - # Il file output si chiama .md - md_file = out_dir / f"{pdf_path.stem}.md" - if not md_file.exists(): - candidates = list(out_dir.glob("*.md")) - if not candidates: - raise RuntimeError(f"Nessun file .md prodotto in {out_dir}") - md_file = candidates[0] - - content = md_file.read_text(encoding="utf-8", errors="replace").strip() - if len(content) < 100: - raise RuntimeError( - f"opendataloader ha prodotto un file .md quasi vuoto ({len(content)} char) " - f"— il PDF potrebbe essere corrotto o non supportato" - ) - - return md_file - - -# ─── [3] Pulizia strutturale ───────────────────────────────────────────────── - -_TOC_KEYWORDS = frozenset([ - "indice", "index", "contents", "table of contents", - "sommario", "inhaltsverzeichnis", "inhalt", - "indice generale", "indice analitico", "indice dei contenuti", - "elenco dei capitoli", "argomenti", "table des matières", - "tabla de contenidos", "содержание", -]) - -_ORDINALS_IT = { - "PRIMO": "I", "SECONDO": "II", "TERZO": "III", "QUARTO": "IV", - "QUINTO": "V", "SESTO": "VI", "SETTIMO": "VII", "OTTAVO": "VIII", - "NONO": "IX", "DECIMO": "X", -} -_ORDINALS_EN = { - "ONE": "1", "TWO": "2", "THREE": "3", "FOUR": "4", "FIVE": "5", - "SIX": "6", "SEVEN": "7", "EIGHT": "8", "NINE": "9", "TEN": "10", -} - - -def _sentence_case(s: str) -> str: - if not s: - return s - lower = s.lower() - return lower[0].upper() + lower[1:] - - -def _is_allcaps_line(line: str) -> bool: - stripped = line.strip() - letters = [c for c in stripped if c.isalpha()] - return ( - len(letters) >= 3 - and all(c.isupper() for c in letters) - and not stripped.startswith("#") - and not stripped.startswith("|") # esclude righe tabella Markdown - ) - - -def _allcaps_to_header(raw_line: str) -> str: - # Rimuovi eventuale prefisso di lista "- " o "* " prima di creare l'header - text = re.sub(r"^[-*+]\s+", "", raw_line.strip()) - text = text.rstrip(".").rstrip("?").strip() - - _ORD_IT_PAT = "|".join(_ORDINALS_IT.keys()) - m = re.match(rf"^CAPITOLO ({_ORD_IT_PAT})\. (.+)", text) - if m: - roman = _ORDINALS_IT[m.group(1)] - titolo = m.group(2).rstrip(".").rstrip("?").strip() - return f"## Capitolo {roman} — {_sentence_case(titolo)}" - - _ORD_EN_PAT = "|".join(_ORDINALS_EN.keys()) - m = re.match(rf"^CHAPTER ({_ORD_EN_PAT}|\d+)\.? (.+)", text) - if m: - n = _ORDINALS_EN.get(m.group(1), m.group(1)) - titolo = m.group(2).rstrip(".").rstrip("?").strip() - return f"## Chapter {n} — {_sentence_case(titolo)}" - - m = re.match(r"^([IVXLCDM]+|[0-9]+)\. (.+)", text) - if m: - return f"## {m.group(1)}. {_sentence_case(m.group(2).rstrip('.').strip())}" - - return f"## {_sentence_case(text)}" - - -def _extract_math_environments(text: str) -> tuple[str, int]: - """ - Converte paragrafi che iniziano con ambienti matematici in header ###. - - 'Teorema 1.6.3 (principio di induzione) Sia A ⊆ N...' - → '### Teorema 1.6.3 (principio di induzione)\n\nSia A ⊆ N...' - - Riconosce: Definizione, Teorema, Lemma, Proposizione, Corollario, - Osservazione, Nota, Esempio (solo con numero di sezione). - Non tocca paragrafi che già iniziano con un header Markdown. - Deve girare PRIMA del merge paragrafi (step 5) per sfruttare i blocchi intatti. - """ - _ENVS = ( - r"Definizione|Definition|Teorema|Theorem|Lemma|" - r"Proposizione|Proposition|Corollario|Corollary|" - r"Osservazione|Remark|Nota|Note|Esempio|Example" - ) - count = 0 - blocks = text.split("\n\n") - result = [] - - for block in blocks: - stripped = block.strip() - if not stripped or stripped.startswith("#"): - result.append(block) - continue - - m = re.match( - rf"^({_ENVS})\s+((?:\d+\.?){{1,4}})\s*(.*)", - stripped, - re.DOTALL, - ) - if not m: - result.append(block) - continue - - env = m.group(1) - num = m.group(2).rstrip(".") - rest = m.group(3).strip() - - # Titolo opzionale tra parentesi: "(principio di induzione)" - title_m = re.match(r"^(\([^)]{2,60}\))\s+(.*)", rest, re.DOTALL) - if title_m: - header = f"### {env} {num} {title_m.group(1)}" - body = title_m.group(2).strip() - else: - header = f"### {env} {num}." - body = rest - - result.append(f"{header}\n\n{body}" if body else header) - count += 1 - - return "\n\n".join(result), count - - -def _merge_title_headers(text: str) -> tuple[str, int]: - """ - Fonde header numerici isolati con il sottotitolo breve che li segue. - - '### N.\n\nSottotitolo (riga singola ≤ 80 char, senza punto finale)' - → '### N. Sottotitolo' - - Caso tipico: parti di un'opera (es. Nietzsche) dove il numero di sezione - e il titolo della sezione sono in blocchi Markdown separati. - Non tocca header con titolo già inline né header seguiti da testo lungo. - """ - count = 0 - blocks = re.split(r"\n{2,}", text) - result = [] - i = 0 - while i < len(blocks): - block = blocks[i] - stripped = block.strip() - if ( - re.match(r"^#{2,3} \d+\.\s*$", stripped) - and i + 1 < len(blocks) - ): - nxt = blocks[i + 1].strip() - # Sottotitolo valido: riga singola, ≤ 80 char, non header, non numerazione pura - if ( - nxt - and "\n" not in nxt - and len(nxt) <= 80 - and not nxt.startswith("#") - and not re.match(r"^\d+[\.\)]\s", nxt) - ): - result.append(stripped.rstrip() + " " + nxt) - count += 1 - i += 2 - continue - result.append(block) - i += 1 - return re.sub(r"\n{3,}", "\n\n", "\n\n".join(result)), count - - -def _extract_article_headers(text: str) -> tuple[str, int]: - """ - Converte voci di articolo dal formato lista Markdown al formato header ###. - - '- Art. N[suffix]. Titolo. Corpo testo...' → '### Art. N[suffix]. Titolo.\n\nCorpo testo...' - '- Art. N[suffix]. (…) (1)' → '### Art. N[suffix].\n\n(…) (1)' - - Gestisce suffissi come: Art. 4-bis., Art. 14-ter., Art. 1-quinquies. - Il titolo è la prima frase con iniziale maiuscola che termina con '.' prima di - ulteriore testo (es. "Leggi. La formazione..." → titolo "Leggi", corpo "La formazione..."). - Se il testo non ha titolo separabile, tutto diventa il corpo. - """ - count = 0 - - def _repl(m: re.Match) -> str: - nonlocal count - num = m.group(1) - rest = m.group(2).strip() - - # Titolo: frase con iniziale maiuscola, max 75 char, termina con '.', - # seguita da almeno un'altra frase (minimo 5 char) che inizia con maiuscola - # o con '(' / cifra (note a piè o continuazione corpo). - title_m = re.match( - r"^([A-ZÀÈÉÌÍÒÓÙÚ].{1,74}?)\.\s+([A-ZÀÈÉÌÍÒÓÙÚ\(\d].{4,})", - rest, - ) - if title_m: - count += 1 - return ( - f"### Art. {num}. {title_m.group(1)}.\n\n" - f"{title_m.group(2).strip()}" - ) - - # Nessun titolo separabile: tutto è corpo - if rest: - count += 1 - return f"### Art. {num}.\n\n{rest}" - - # Articolo senza testo inline (es. "- Art. 5. (…) (1)" già estratto sopra, - # oppure articolo vuoto nella lista) - count += 1 - return f"### Art. {num}." - - text = re.sub( - r"^-\s+Art\.\s+([\d]+[a-z\-]*)\.\s*(.*)", - _repl, - text, - flags=re.MULTILINE, - ) - return text, count - - -# ─── [3a] Funzioni di trasformazione ───────────────────────────────────────── - -# Mapping PUA Unicode (U+F020-U+F0FF) → simboli corretti per font Symbol/Wingdings. -# Il font Symbol di Windows codifica lettere greche e operatori matematici nel -# range Private Use Area invece dei codepoint Unicode standard. -_SYMBOL_PUA_MAP: dict[str, str] = { - "\uf020": " ", # space - "\uf028": "(", - "\uf029": ")", - "\uf02b": "+", - "\uf02d": "\u2212", # minus - "\uf02e": ".", - "\uf02f": "/", - "\uf030": "0", "\uf031": "1", "\uf032": "2", "\uf033": "3", "\uf034": "4", - "\uf035": "5", "\uf036": "6", "\uf037": "7", "\uf038": "8", "\uf039": "9", - "\uf03a": ":", "\uf03b": ";", "\uf03c": "<", "\uf03d": "=", "\uf03e": ">", - "\uf040": "\u2245", # congruent - "\uf041": "\u0391", # Alpha - "\uf042": "\u0392", # Beta - "\uf043": "\u03a7", # Chi - "\uf044": "\u0394", # Delta - "\uf045": "\u0395", # Epsilon - "\uf046": "\u03a6", # Phi - "\uf047": "\u0393", # Gamma - "\uf048": "\u0397", # Eta - "\uf049": "\u0399", # Iota - "\uf04a": "\u03d1", # theta variant - "\uf04b": "\u039a", # Kappa - "\uf04c": "\u039b", # Lambda - "\uf04d": "\u039c", # Mu - "\uf04e": "\u039d", # Nu - "\uf04f": "\u039f", # Omicron - "\uf050": "\u03a0", # Pi - "\uf051": "\u0398", # Theta - "\uf052": "\u03a1", # Rho - "\uf053": "\u03a3", # Sigma - "\uf054": "\u03a4", # Tau - "\uf055": "\u03a5", # Upsilon - "\uf056": "\u03c2", # sigma final - "\uf057": "\u03a9", # Omega - "\uf058": "\u039e", # Xi - "\uf059": "\u03a8", # Psi - "\uf05a": "\u0396", # Zeta - "\uf05b": "[", - "\uf05c": "\u2234", # therefore - "\uf05d": "]", - "\uf05e": "\u22a5", # perpendicular - "\uf061": "\u03b1", # alpha - "\uf062": "\u03b2", # beta - "\uf063": "\u03c7", # chi - "\uf064": "\u03b4", # delta - "\uf065": "\u03b5", # epsilon - "\uf066": "\u03c6", # phi - "\uf067": "\u03b3", # gamma - "\uf068": "\u03b7", # eta - "\uf069": "\u03b9", # iota - "\uf06a": "\u03d5", # phi variant - "\uf06b": "\u03ba", # kappa - "\uf06c": "\u03bb", # lambda - "\uf06d": "\u03bc", # mu - "\uf06e": "\u03bd", # nu - "\uf06f": "\u03bf", # omicron - "\uf070": "\u03c0", # pi - "\uf071": "\u03b8", # theta - "\uf072": "\u03c1", # rho - "\uf073": "\u03c3", # sigma - "\uf074": "\u03c4", # tau - "\uf075": "\u03c5", # upsilon - "\uf076": "\u03d6", # pi symbol - "\uf077": "\u03c9", # omega - "\uf078": "\u03be", # xi - "\uf079": "\u03c8", # psi - "\uf07a": "\u03b6", # zeta - "\uf07b": "{", - "\uf07c": "|", - "\uf07d": "}", - "\uf07e": "~", - "\uf0b1": "\u00b1", # plus-minus - "\uf0b7": "\u2022", # bullet - "\uf0ba": "\u221a", # square root - "\uf0bc": "\u2264", # less or equal - "\uf0bd": "\u2265", # greater or equal - "\uf0be": "\u221d", # proportional - "\uf0d7": "\u00d7", # multiplication - "\uf0f7": "\u00f7", # division - "\uf0b4": "\u00d7", # alternate multiply - "\uf0bb": "\u2260", # not equal - "\uf0b9": "\u2260", # not equal alternate - "\uf0b3": "\u2265", # greater or equal alternate - "\uf0b2": "\u2032", # prime - "\uf02a": "*", - "\uf02c": ",", - "\uf0a3": "\u2264", # less or equal (Symbol 0xA3) - "\uf0a7": "\u2022", # bullet (Wingdings 0xA7) - "\uf0a8": "\u2022", # bullet variant - "\uf0ae": "\u2192", # right arrow (Symbol 0xAE) - "\uf0b8": "\u00f7", # division / range separator - "\uf0eb": "", # Wingdings decorative icon (rimosso) - "\uf0f0": "\u2192", # right arrow variant - "\uf0db": "", # bracket extension piece (non ricostruibile) - "\uf0dc": "", # bracket extension piece - "\uf0dd": "", # bracket extension piece - "\uf0de": "", # brace middle piece (non ricostruibile) - "\uf0df": "", # brace extension piece -} - -_SYMBOL_PUA_RE = re.compile( - "[" + "".join(re.escape(k) for k in _SYMBOL_PUA_MAP) + "]" -) - - -def _t_fix_symbol_font(text: str) -> tuple[str, int]: - """Rimappa caratteri PUA font Symbol (U+F020-U+F0FF) in simboli Unicode corretti.""" - count = [0] - - def _repl(m: re.Match) -> str: - count[0] += 1 - return _SYMBOL_PUA_MAP[m.group(0)] - - result = _SYMBOL_PUA_RE.sub(_repl, text) - return result, count[0] - - -def _t_remove_images(text: str) -> tuple[str, int]: - n = len(re.findall(r"!\[[^\]]*\]\([^)]*\)", text)) - text = re.sub(r"!\[[^\]]*\]\([^)]*\)\s*", "", text) - return text, n - - -# Superscript Unicode: ¹²³⁴⁵⁶⁷⁸⁹⁰ -_SUPERSCRIPT_RE = re.compile(r'[\u00b9\u00b2\u00b3\u2070\u2074-\u2079]+') -# Riga corpo-nota: inizia con superscript o [N] -_FOOTNOTE_BODY_RE = re.compile( - r'^([\u00b9\u00b2\u00b3\u2070\u2074-\u2079]+\s+|\[\d{1,3}\]\s+)' -) - - -def _t_remove_footnotes(text: str) -> tuple[str, int]: - """Rimuovi marcatori footnote superscript inline e righe corpo-nota.""" - lines = text.split("\n") - result, count = [], 0 - for line in lines: - stripped = line.strip() - # Corpo nota: riga breve che inizia con ¹ o [N] - if stripped and _FOOTNOTE_BODY_RE.match(stripped) and len(stripped) < 300: - count += 1 - continue - cleaned = _SUPERSCRIPT_RE.sub("", line) - if cleaned != line: - count += 1 - result.append(cleaned) - return "\n".join(result), count - - -def _t_fix_br(text: str) -> tuple[str, int]: - n = len(re.findall(r"
", text, re.IGNORECASE)) - text = re.sub(r"
\s*", " ", text, flags=re.IGNORECASE) - return text, n - - -def _t_fix_tabsep(text: str) -> tuple[str, int]: - _pat = re.compile(r"(?m)^\|\s*\|\s*$|^\|---\|?\s*$") - n = len(_pat.findall(text)) - text = _pat.sub("", text) - return text, n - - -def _t_fix_accents(text: str) -> tuple[str, int]: - """Fix artefatti backtick da PDF LaTeX: `e→è, e`→è, sar`a→sarà, ecc.""" - _ACCENT_MAP = { - "e": "è", "E": "È", "a": "à", "A": "À", - "u": "ù", "U": "Ù", "i": "ì", "I": "Ì", "o": "ò", "O": "Ò", - } - n_bt_before = text.count("`") - text = re.sub(r"`([eEaAuUiIoO])", lambda m: _ACCENT_MAP[m.group(1)], text) - text = re.sub(r"([eEaAuUiIoO])`", lambda m: _ACCENT_MAP[m.group(1)], text) - n_accenti = n_bt_before - text.count("`") - # Backtick orfani: artefatti LaTeX rimasti dopo la correzione vocale - n_bt_orfani = text.count("`") - if n_bt_orfani: - text = re.sub(r"`", "", text) - n_accenti += n_bt_orfani - return text, n_accenti - - -def _t_fix_multiplication(text: str) -> tuple[str, int]: - """Fix segno di moltiplicazione "→× (encoding font PDF non-standard).""" - n = len(re.findall(r'(?<=[0-9])"(?=[0-9(])', text)) - text = re.sub(r'(?<=[0-9])"(?=[0-9(])', '×', text) - return text, n - - -def _t_fix_micro(text: str) -> tuple[str, int]: - """Fix prefisso micro !→µ prima di unità SI note.""" - _SI_UNITS_RE = r'[mAsgVWFHTKNJClΩ]' - n = len(re.findall(rf'\d\s*!(?={_SI_UNITS_RE})', text)) - text = re.sub(rf'(\d)\s*!({_SI_UNITS_RE})', r'\1 µ\2', text) - return text, n - - -def _t_remove_formula_labels(text: str) -> tuple[str, int]: - """Rimuovi label formule inline [N.M] — es. [3.4], [10.7].""" - n = len(re.findall(r"\[\d+\.\d+\]", text)) - text = re.sub(r"\s*\[\d+\.\d+\]\s*", " ", text) - return text, n - - -def _t_remove_dotleaders(text: str) -> tuple[str, int]: - """Rimuovi righe con dot-leader e numerali romani isolati (footer TOC).""" - _DOTLEADER_RE = r"^[^\n]*(?:(?:\. ){3,}|\.{4,})[^\n]*$" - n = len(re.findall(_DOTLEADER_RE, text, re.MULTILINE)) - text = re.sub(_DOTLEADER_RE, "", text, flags=re.MULTILINE) - text = re.sub( - r"(?m)^(i{1,3}|iv|vi{0,3}|ix|xi{0,2}|x)$", - "", - text, - flags=re.IGNORECASE, - ) - return text, n - - -def _t_fix_header_concat(text: str) -> tuple[str, int]: - """Fix header + body concatenati senza separatore.""" - count = 0 - - def _fix(m: re.Match) -> str: - nonlocal count - hashes = m.group(1) - full = m.group(2).strip() - if len(full) < 60: - return m.group(0) - skip = min(10, len(full) // 3) - split = re.search(r"(?<=[a-zàèéìíòóùúä])(?=[A-ZÀÈÉÌÍÒÓÙÚ])", full[skip:]) - if split: - pos = skip + split.start() - title = full[:pos].strip() - body = full[pos:].strip() - if len(title) >= 5 and len(body) >= 15: - count += 1 - return f"{hashes} {title}\n\n{body}" - return m.group(0) - - text = re.sub(r"^(#{2,6})\s+(.{40,})$", _fix, text, flags=re.MULTILINE) - return text, count - - -def _t_extract_capitolo(text: str) -> tuple[str, int]: - """Estrai 'Capitolo N: TITOLO' inline nel corpo del testo → ## header.""" - def _repl(m: re.Match) -> str: - num = m.group(1) - titolo = _sentence_case(m.group(2).strip().rstrip("- ").strip()) - return f"\n\n## Capitolo {num}: {titolo}\n\n" - - text = re.sub( - r"\bCapitolo\s+(\d+)\s*[:\s]\s*([A-ZÀÈÉÌÍÒÓÙÚ\'L][A-ZÀÈÉÌÍÒÓÙÚ\s\'\.,\(\)]{5,80}?)" - r"(?=\s*[-–]\s*\d|\s*\n|\s*$)", - _repl, - text, - ) - return text, 0 - - -_NUMBERED_HDR_RE = re.compile( - r"^(#{1,6})\s+(\d+(?:\.\d+)*)\.\s+(.+)$", - re.MULTILINE, -) - - -def _t_normalize_numbered_headings(text: str) -> tuple[str, int]: - """Corregge livelli header per documenti con numerazione decimale. - - Assegna livello heading in base alla profondità numerica usando come base - il livello corrente degli header di profondità minima. - Attivo solo se il documento ha almeno 2 profondità di numerazione. - """ - all_matches = list(_NUMBERED_HDR_RE.finditer(text)) - if not all_matches: - return text, 0 - - pairs = [ - (m.group(2).count(".") + 1, len(m.group(1))) - for m in all_matches - ] - depths = [d for d, _ in pairs] - min_depth, max_depth = min(depths), max(depths) - if max_depth == min_depth: - return text, 0 - - base_level = min(lv for d, lv in pairs if d == min_depth) - count = 0 - - def _repl(m: re.Match) -> str: - nonlocal count - hashes, num, title = m.group(1), m.group(2), m.group(3) - depth = num.count(".") + 1 - new_level = min(base_level + (depth - min_depth), 6) - if new_level == len(hashes): - return m.group(0) - count += 1 - return f"{'#' * new_level} {num}. {title}" - - return _NUMBERED_HDR_RE.sub(_repl, text), count - - -def _t_normalize_header_levels(text: str) -> tuple[str, int]: - """Normalizza h4+ → h3; rimuove header vuoti; rimuove numero pagina '| N' finale.""" - text = re.sub(r"^#{3,6}\s*$", "", text, flags=re.MULTILINE) - text = re.sub( - r"^(#{3,6})\s+(\d{1,3})\s+(.+)$", - lambda m: f"### {m.group(2)}. {m.group(3)}", - text, - flags=re.MULTILINE, - ) - text = re.sub(r"^#{4,6}\s+(.+)$", r"### \1", text, flags=re.MULTILINE) - return text, 0 - - -def _t_extract_articles(text: str) -> tuple[str, int]: - """Converti voci articolo '- Art. N.' → '### Art. N.'""" - return _extract_article_headers(text) - - -def _t_remove_header_bold(text: str) -> tuple[str, int]: - """Rimuovi **bold** negli header esistenti.""" - text = re.sub( - r"^(#{1,6})\s+\*\*(.+?)\*\*\s*$", - r"\1 \2", - text, flags=re.MULTILINE, - ) - return text, 0 - - -def _t_normalize_allcaps_headers(text: str) -> tuple[str, int]: - """Normalizza header ALL-CAPS → sentence-case.""" - def _norm(m: re.Match) -> str: - hashes, content = m.group(1), m.group(2).strip() - letters = [c for c in content if c.isalpha()] - if letters and all(c.isupper() for c in letters): - return f"{hashes} {_sentence_case(content)}" - return m.group(0) - - text = re.sub(r"^(#{1,6}) (.+)$", _norm, text, flags=re.MULTILINE) - return text, 0 - - -def _t_remove_toc(text: str) -> tuple[str, int]: - """Rimuovi header TOC e voci lista numerate che seguono.""" - lines = text.split("\n") - new_lines = [] - _in_toc = False - removed = False - for line in lines: - bare = re.sub(r"^#+\s*", "", line.strip()) - first_word = bare.split(".")[0].strip().lower() - if first_word in _TOC_KEYWORDS: - removed = True - _in_toc = True - continue - if _in_toc: - if re.match(r"^\s*$", line) or re.match(r"^\s*[-*+]\s+\d", line): - continue - # Voce TOC con numero pagina finale (sicuro: siamo gia in contesto TOC) - if re.match(r"^\s*[-*+]\s+.{2,70}\s+\d{1,3}\s*$", line): - continue - # Riga di testo lungo = probabilmente abstract o corpo, non voce di indice - if len(line.strip()) > 200: - _in_toc = False - new_lines.append(line) - continue - _in_toc = False - new_lines.append(line) - return "\n".join(new_lines), 1 if removed else 0 - - - -def _t_allcaps_to_headers(text: str) -> tuple[str, int]: - """Converti righe ALL-CAPS standalone → ## header.""" - count = 0 - blocks = text.split("\n\n") - new_blocks = [] - for block in blocks: - stripped = block.strip() - if "\n" not in stripped and _is_allcaps_line(stripped): - new_blocks.append(_allcaps_to_header(stripped)) - count += 1 - else: - sub_lines = block.split("\n") - converted = [] - for ln in sub_lines: - if _is_allcaps_line(ln) and len(ln.strip()) > 3: - converted.append(_allcaps_to_header(ln)) - count += 1 - else: - converted.append(ln) - new_blocks.append("\n".join(converted)) - return "\n\n".join(new_blocks), count - - -_BIB_MARKERS_RE = re.compile( - r'\b(pp?\.|vol\.|n\.\s*\d|ed\.|edn\.|ISBN|DOI|arXiv)\b' - r'|\b(19|20)\d{2}\b', - re.IGNORECASE, -) - - -def _t_numbered_sections(text: str, has_exercises: bool = False) -> tuple[str, int]: - """Converti sezioni numerate 'N. testo' / '- N. testo' / '- N testo' → ### header.""" - count = 0 - - def _num_repl(m: re.Match) -> str: - nonlocal count - content = m.group(2).strip() - if content.endswith(".") and len(content) > 40: - return m.group(0) - if _BIB_MARKERS_RE.search(content): - return m.group(0) - count += 1 - return f"### {m.group(1)}.\n\n{content}" - - text = re.sub(r"^(\d+)\.\s+(.+)$", _num_repl, text, flags=re.MULTILINE) - - def _num_letter_repl(m: re.Match) -> str: - nonlocal count - count += 1 - return f"### {m.group(1)}{m.group(2)}.\n\n{m.group(3).strip()}" - - text = re.sub(r"^(\d+)\s*([a-z])\.\s+(.+)$", _num_letter_repl, text, flags=re.MULTILINE) - - # Disabilitato se il documento contiene sezioni "Esercizi": in quel caso i - # "- N. testo" sono numerazioni di esercizi, non header di sezione. - if not has_exercises: - def _aphorism_repl(m: re.Match) -> str: - nonlocal count - content = m.group(2).strip() - if _BIB_MARKERS_RE.search(content): - return m.group(0) - count += 1 - return f"\n\n### {m.group(1)}.\n\n{content}" - - text = re.sub( - r"^-\s+(\d{1,3})\.\s+(.{10,})$", - _aphorism_repl, - text, - flags=re.MULTILINE, - ) - - def _list_section_repl(m: re.Match) -> str: - nonlocal count - num = m.group(1) - content = m.group(2).strip() - if _BIB_MARKERS_RE.search(content): - return m.group(0) - count += 1 - split = re.search(r"(?<=[a-zàèéìíòóùú])\s+(?=[A-ZÀÈÉÌÍÒÓÙÚ])", content) - if split and split.start() >= 3: - title = content[: split.start()].strip() - body = content[split.end():].strip() - if len(body) >= 20: - return f"\n\n### {num}. {title}\n\n{body}" - return f"\n\n### {num}. {content}" - - text = re.sub( - r"^-\s+(\d{1,3})\s+([A-ZÀÈÉÌÍÒÓÙÚ\'L].{10,})$", - _list_section_repl, - text, - flags=re.MULTILINE, - ) - return text, count - - -def _t_extract_math(text: str) -> tuple[str, int]: - """Converti ambienti matematici (Teorema/Definizione/...) → ### header.""" - return _extract_math_environments(text) - - -def _t_merge_paragraphs(text: str) -> tuple[str, int]: - """Unisci paragrafi spezzati da salti pagina PDF.""" - _SENTENCE_END = set(".?!»)\"'") - blocks = text.split("\n\n") - merged = [] - count = 0 - i = 0 - while i < len(blocks): - b = blocks[i] - stripped = b.strip() - while ( - i + 1 < len(blocks) - and stripped - and not stripped.startswith("#") - and not stripped.startswith("|") # non unire righe tabella in avanti - and stripped[-1] not in _SENTENCE_END - ): - nxt = blocks[i + 1].strip() - if not nxt or nxt.startswith("#") or nxt.startswith("|") or re.match(r"^\d+\.", nxt) or re.match(r"^[-*+]\s", nxt): - break - b = stripped + " " + nxt - stripped = b.strip() - count += 1 - i += 1 - merged.append(b) - i += 1 - text = "\n\n".join(merged) - # Secondo pass: rimuovi prefisso |---| eventualmente rimasto dopo il merge - text = re.sub(r"(?m)^\|---\|\s*", "", text) - return text, count - - -def _t_normalize_whitespace(text: str) -> tuple[str, int]: - """Normalizza whitespace multiplo interno alle righe.""" - lines = text.split("\n") - text = "\n".join( - re.sub(r" +", " ", line) if line.strip() else line - for line in lines - ) - return text, 0 - - -def _t_collapse_blank_lines(text: str) -> tuple[str, int]: - """Riduci righe vuote multiple a doppie.""" - return re.sub(r"\n{3,}", "\n\n", text), 0 - - -def _t_demote_verse_headers(text: str) -> tuple[str, int]: - """Demoti header che sono in realtà terzine/versi. - - opendataloader promuove a ## le iscrizioni e i testi in evidenza nel PDF - (corpo maggiore, centrato). Si riconoscono perché: - - terminano con un numero nudo (numero di verso: 3, 6, 9, …) - - contengono punteggiatura interna di fine verso (', ' o '. ') - Esempio: '## «per me si va ne la città dolente, ... gente. 3' - → paragrafo normale senza il numero finale. - """ - count = 0 - - def _demote(m: re.Match) -> str: - nonlocal count - hashes, content = m.group(1), m.group(2).strip() - # Deve terminare con numero nudo (numero di verso ≤ 9999) - if not re.search(r"\s\d{1,4}\s*$", content): - return m.group(0) - # Deve contenere punteggiatura interna (è un blocco di più versi) - inner = re.sub(r"\s\d{1,4}\s*$", "", content) - if not re.search(r"[,;:.!?»\"\']\s+[A-Za-zÀ-ÿ«\"]", inner): - return m.group(0) - count += 1 - # Rimuovi il numero di verso finale e restituisci come testo normale - clean = re.sub(r"\s\d{1,4}\s*$", "", content) - return clean - - text = re.sub( - r"^(#{1,6})\s+(.{20,})$", - _demote, - text, - flags=re.MULTILINE, - ) - return text, count - - -def _t_restore_poetry_lines(text: str) -> tuple[str, int]: - """Ripristina line break di poesia distrutti da keep_line_breaks=False. - - Quando il PDF è poesia (terzine dantesche, sonetti, ecc.) opendataloader - con keep_line_breaks=False produce un unico paragrafo con i numeri di verso - (3, 6, 9 … oppure 1, 2, 3 …) incorporati inline: - 'smarrita. 3 Ahi quanto a dir qual era è cosa dura … paura! 6 Tant'è …' - - Il transform rileva blocchi con numeri di verso in progressione aritmetica - e li separa in righe, con riga vuota ogni 3 versi (terzina). - """ - count = 0 - blocks = text.split("\n\n") - result = [] - - # Pattern: numero isolato preceduto da punteggiatura-fine-verso e seguito - # da lettera maiuscola (inizio verso successivo). - _VERSE_NUM_RE = re.compile( - r'([.!?»\'\"]\s+)(\d+)(\s+)(?=[A-ZÀ-Ùa-zà-ù«"‟])' - ) - - for block in blocks: - stripped = block.strip() - if not stripped or stripped.startswith("#"): - result.append(block) - continue - - matches = list(_VERSE_NUM_RE.finditer(stripped)) - if len(matches) < 2: - result.append(block) - continue - - nums = [int(m.group(2)) for m in matches] - diffs = [nums[i + 1] - nums[i] for i in range(len(nums) - 1)] - # Accetta progressioni con passo costante 1–5 (terzine: 3, endecasillabi: 1) - if not diffs or len(set(diffs)) > 2 or not (1 <= diffs[0] <= 5): - result.append(block) - continue - - step = diffs[0] - - def _replace_verse_num(m: re.Match) -> str: - n = int(m.group(2)) - # Ogni 'step' versi → riga vuota (inizio nuova terzina/strofa) - sep = "\n\n" if n % (step * 3) == 0 else "\n" - return m.group(1).rstrip() + sep - - new_block = _VERSE_NUM_RE.sub(_replace_verse_num, stripped) - if new_block != stripped: - count += len(matches) - result.append(new_block) - - return "\n\n".join(result), count - - -def _t_remove_urls(text: str) -> tuple[str, int]: - """Rimuovi righe che sono solo URL (watermark, footer di piattaforme).""" - return re.sub(r"(?m)^(https?://|www\.)\S+\s*$", "", text), 0 - - -def _t_remove_empty_headers(text: str) -> tuple[str, int]: - """Rimuovi header senza corpo (sezioni vuote / watermark).""" - blocks = re.split(r"\n{2,}", text) - cleaned = [] - for i, block in enumerate(blocks): - stripped = block.strip() - if re.match(r"^#{1,6} ", stripped) and "\n" not in stripped: - next_stripped = blocks[i + 1].strip() if i + 1 < len(blocks) else "" - # Non rimuovere un header breve se il successivo è un header molto lungo - # (> 80 char): quasi certamente è testo PDF mal classificato come heading. - next_is_long_header = ( - re.match(r"^#{1,6} ", next_stripped) and len(next_stripped) > 80 - ) - if not next_stripped or ( - re.match(r"^#{1,6} ", next_stripped) and not next_is_long_header - ): - continue - cleaned.append(block) - return re.sub(r"\n{3,}", "\n\n", "\n\n".join(cleaned)), 0 - - -def _t_merge_title_headers(text: str) -> tuple[str, int]: - """Fondi header numerici isolati con il sottotitolo breve successivo.""" - return _merge_title_headers(text) - - -def _t_remove_garbage_headers(text: str) -> tuple[str, int]: - """Rimuovi garbage headers: simboli, abbreviazioni matematiche, frammenti formula.""" - def _is_garbage_header(content: str) -> bool: - if content.lstrip().startswith("..."): - return True - if not re.search(r"[A-Za-zÀ-ÿ\u0391-\u03c9]{2,}", content): - return True - if re.fullmatch(r"\(?\s*[A-Za-z]{1,4}\s*\)?", content.strip()): - return True - if len(content) > 60 and re.search(r"[!%#]\w|\w[!%#]|\b\w+-\s*\w", content): - return True - # Frammento di frase: inizia con minuscola ed e abbastanza lungo - first_alpha = next((c for c in content if c.isalpha()), None) - if first_alpha and first_alpha.islower() and len(content) > 40: - return True - # Formula matematica: variabile singola (o breve) seguita da = o operatore - if re.match(r"^[A-Za-z\u0391-\u03c9_]{1,3}\s*[=<>≤≥]", content.strip()): - return True - # Didascalia figura/tabella: "Figura N..." o "Figure N..." o "Tabella N..." - if re.match(r"^(Figura|Figure|Fig\.|Tabella|Table|Tab\.)\s+\d", content.strip(), re.IGNORECASE): - return True - return False - - count = 0 - lines = text.split("\n") - new_lines = [] - for line in lines: - m = re.match(r"^#{1,6} (.+)$", line) - if m and _is_garbage_header(m.group(1)): - count += 1 - continue - new_lines.append(line) - text = "\n".join(new_lines) - text = re.sub(r"\n{3,}", "\n\n", text) - return text, count - - -def _t_remove_frontmatter(text: str) -> tuple[str, int]: - """Rimuovi sezioni frontmatter: URL, email, affiliazione, copyright.""" - _FM_RE = re.compile( - r"https?://|www\.|@[A-Za-z]|\bUniversit[àa]\b|\bDipartimento\b|" - r"\bCopyright\b|\bLicenza\b|\bEdizione\b|" - r"protetto da|tutti i diritti", - re.IGNORECASE, - ) - blocks = re.split(r"\n{2,}", text) - cleaned = [] - count = 0 - total = len(blocks) - cutoff = max(5, min(15, int(total * 0.20))) - for i, block in enumerate(blocks): - stripped = block.strip() - # Frontmatter compare solo nelle prime sezioni del documento - if i >= cutoff: - cleaned.append(block) - continue - if not re.match(r"^### ", stripped) or re.match(r"^### \d", stripped): - cleaned.append(block) - continue - body = blocks[i + 1].strip() if i + 1 < len(blocks) else "" - is_fm_body = len(body) < 250 and _FM_RE.search(body) - is_fm_hdr = _FM_RE.search(stripped) - if is_fm_body or is_fm_hdr: - count += 1 - continue - cleaned.append(block) - return re.sub(r"\n{3,}", "\n\n", "\n\n".join(cleaned)), count - - -_WATERMARK_RE = re.compile( - r"^(BOZZA|DRAFT|CONFIDENTIAL|RISERVATO|PROVVISORIO|SAMPLE|SPECIMEN" - r"|DO NOT DISTRIBUTE|NON DISTRIBUIRE|COPY|COPIA)\s*$", - re.IGNORECASE | re.MULTILINE, -) - - -def _t_remove_watermarks(text: str) -> tuple[str, int]: - """Rimuovi righe standalone con testo watermark comune.""" - lines = text.split("\n") - result, count = [], 0 - for line in lines: - if _WATERMARK_RE.match(line): - count += 1 - else: - result.append(line) - return "\n".join(result), count - - -def _t_fix_math_symbols(text: str) -> tuple[str, int]: - """Rimuovi righe composte solo da simboli box/placeholder (font non estratti).""" - lines = text.split("\n") - result, count = [], 0 - for line in lines: - if line.strip() and re.match(r"^[\s□■▪▫◆◇●○•\u25a0-\u25ff]+$", line): - count += 1 - else: - result.append(line) - return "\n".join(result), count - - -def _t_remove_recurring_lines(text: str) -> tuple[str, int]: - """Rimuovi righe corte che si ripetono ≥5 volte (header/footer di pagina).""" - lines = text.split("\n") - short_lines = [ - ln.strip() for ln in lines - if 3 < len(ln.strip()) < 80 - and not ln.strip().startswith("#") - and not ln.strip().startswith("|") - ] - freq = Counter(short_lines) - recurring = {ln for ln, c in freq.items() if c >= 5} - if not recurring: - return text, 0 - result, count = [], 0 - for line in lines: - if line.strip() in recurring: - count += 1 - else: - result.append(line) - return "\n".join(result), count - - -# ─── [3b] Pipeline delle trasformazioni ────────────────────────────────────── - -def apply_transforms(text: str) -> tuple[str, dict]: - """ - Applica le trasformazioni strutturali al Markdown grezzo. - Restituisce (testo_modificato, statistiche). - """ - # Flag calcolato prima del loop: disabilita il transform 4b nei documenti - # con sezioni "Esercizi" (i "- N. testo" sarebbero numerazioni, non header). - _has_ex = bool(re.search(r"\b(Esercizi|Exercises|Problems|Homework)\b", text, re.IGNORECASE)) - - _transforms: list[tuple[str | None, object]] = [ - ("n_simboli_pua_corretti", _t_fix_symbol_font), - ("n_immagini_rimosse", _t_remove_images), - ("n_br_rimossi", _t_fix_br), - ("n_tabsep_rimossi", _t_fix_tabsep), - ("n_note_rimosse", _t_remove_footnotes), - ("n_accenti_corretti", _t_fix_accents), - ("n_moltiplicazioni_corrette", _t_fix_multiplication), - ("n_micro_corretti", _t_fix_micro), - ("n_simboli_math_rimossi", _t_fix_math_symbols), - ("n_formule_rimossi", _t_remove_formula_labels), - ("n_dotleader_rimossi", _t_remove_dotleaders), - ("n_righe_ricorrenti_rimosse", _t_remove_recurring_lines), - ("n_header_concat_fixati", _t_fix_header_concat), - (None, _t_extract_capitolo), - ("n_header_numerati_normalizzati", _t_normalize_numbered_headings), - (None, _t_normalize_header_levels), - ("n_articoli_estratti", _t_extract_articles), - (None, _t_remove_header_bold), - (None, _t_normalize_allcaps_headers), - ("toc_rimosso", _t_remove_toc), - ("n_header_allcaps", _t_allcaps_to_headers), - ("n_sezioni_numerate", partial(_t_numbered_sections, has_exercises=_has_ex)), - ("n_ambienti_matematici", _t_extract_math), - ("n_paragrafi_uniti", _t_merge_paragraphs), - (None, _t_normalize_whitespace), - (None, _t_collapse_blank_lines), - ("n_versi_ripristinati", _t_restore_poetry_lines), - ("n_header_verso_demotati", _t_demote_verse_headers), - (None, _t_remove_urls), - (None, _t_remove_empty_headers), - ("n_titoli_uniti", _t_merge_title_headers), - (None, lambda t: (re.sub(r"(?m)^(#{1,6}.+?)\s*\|\s*\d{1,3}\s*$", r"\1", t), 0)), - ("n_garbage_headers_rimossi", _t_remove_garbage_headers), - ("n_frontmatter_rimossi", _t_remove_frontmatter), - ("n_watermark_rimossi", _t_remove_watermarks), - ] - - stats: dict = {} - for stat_key, fn in _transforms: - text, n = fn(text) - if stat_key: - stats[stat_key] = stats.get(stat_key, 0) + n - - stats["toc_rimosso"] = bool(stats.get("toc_rimosso", 0)) - return text, stats - - -# ─── [4] Rilevamento struttura ─────────────────────────────────────────────── - -_IT_WORDS = frozenset([ - "il", "la", "di", "e", "che", "non", "per", "un", "una", "si", - "con", "da", "del", "della", "dei", "in", "ma", "se", "lo", "le", - "gli", "al", "alla", "ai", "alle", "sono", "ha", "hanno", "era", - "erano", "nel", "nella", "nei", "nelle", "questo", "questa", "così", -]) -_EN_WORDS = frozenset([ - "the", "of", "and", "to", "in", "is", "that", "it", "was", "for", - "on", "are", "as", "with", "his", "they", "at", "be", "this", "have", - "from", "or", "an", "but", "not", "by", "he", "she", "we", "you", - "which", "their", "been", "has", "would", "there", "when", "will", -]) -_FR_WORDS = frozenset([ - "le", "les", "de", "du", "des", "et", "un", "une", "est", "que", - "pour", "dans", "sur", "avec", "qui", "par", "pas", "plus", "au", - "ce", "se", "ou", "mais", "comme", "aussi", -]) -_DE_WORDS = frozenset([ - "der", "die", "das", "und", "in", "von", "zu", "den", "mit", "ist", - "auf", "eine", "als", "dem", "des", "sich", "nicht", "auch", "werden", - "bei", "nach", "oder", "wenn", "wird", "war", -]) -_ES_WORDS = frozenset([ - "el", "los", "las", "de", "en", "un", "una", "es", "que", "por", - "con", "del", "para", "como", "pero", "sus", "son", "los", "hay", - "todo", "esta", "este", "ser", "más", "ya", -]) - - -def _detect_language(text: str) -> str: - words = re.findall(r"\b[a-zA-Z]{2,}\b", text.lower()) - sample = words[:2000] - scores = { - "it": sum(1 for w in sample if w in _IT_WORDS), - "en": sum(1 for w in sample if w in _EN_WORDS), - "fr": sum(1 for w in sample if w in _FR_WORDS), - "de": sum(1 for w in sample if w in _DE_WORDS), - "es": sum(1 for w in sample if w in _ES_WORDS), - } - best = max(scores, key=scores.get) - return best if scores[best] > 0 else "unknown" - - -def _count_headers(text: str, level: int) -> int: - prefix = "#" * level + " " - return len(re.findall(rf"(?m)^{re.escape(prefix)}", text)) - - -def _count_paragraphs(text: str) -> int: - blocks = re.split(r"\n{2,}", text) - return sum(1 for b in blocks if b.strip() and not re.match(r"^#+\s", b.strip())) - - -def _split_sections(text: str, level: int) -> list[str]: - prefix = "#" * level + " " - parts = re.split(rf"(?m)^{re.escape(prefix)}.+", text) - return [p for p in parts[1:] if p.strip()] - - -def _parse_sections_with_body(text: str, level: int = 3) -> list[tuple[str, str]]: - """Restituisce lista di (header_line, body_text) per tutti gli header al livello dato.""" - prefix = "#" * level + " " - lines = text.split("\n") - sections: list[tuple[str, str]] = [] - cur_hdr: str | None = None - cur_body: list[str] = [] - for line in lines: - if line.startswith(prefix): - if cur_hdr is not None: - sections.append((cur_hdr, "\n".join(cur_body).strip())) - cur_hdr = line - cur_body = [] - elif cur_hdr is not None: - cur_body.append(line) - if cur_hdr is not None: - sections.append((cur_hdr, "\n".join(cur_body).strip())) - return sections - - -def analyze(md_path: Path) -> dict: - text = md_path.read_text(encoding="utf-8") - n_h1 = _count_headers(text, 1) - n_h2 = _count_headers(text, 2) - n_h3 = _count_headers(text, 3) - n_paragrafi = _count_paragraphs(text) - - if n_h3 >= 5: - livello, boundary, strategia = 3, "h3", "h3_aware" - section_bodies = _split_sections(text, 3) - # Gerarchia invertita: h3 sono capitoli enormi, h2 sono sottosezioni più brevi. - # Succede quando opendataloader classifica titoli capitolo come h6 (→ normalizzati - # a h3) e le sottosezioni ALL-CAPS diventano ## (h2). In questo caso h2 è - # il boundary corretto per il chunking. - if n_h2 >= 3: - h2_bodies = _split_sections(text, 2) - avg_h3 = sum(len(b) for b in section_bodies) / len(section_bodies) if section_bodies else 0 - avg_h2 = sum(len(b) for b in h2_bodies) / len(h2_bodies) if h2_bodies else 0 - if avg_h3 > 5000 and avg_h2 < avg_h3 * 0.7: - livello, boundary, strategia = 2, "h2", "h2_paragraph_split" - section_bodies = h2_bodies - elif n_h2 >= 3: - livello, boundary, strategia = 2, "h2", "h2_paragraph_split" - section_bodies = _split_sections(text, 2) - elif n_h1 + n_h2 + n_h3 >= 1: - livello, boundary, strategia = 1, "paragrafo", "paragraph" - section_bodies = [b for b in re.split(r"\n{2,}", text) if b.strip()] - elif n_paragrafi >= 3: - livello, boundary, strategia = 1, "paragrafo", "paragraph" - section_bodies = [b for b in re.split(r"\n{2,}", text) if b.strip()] - else: - livello, boundary, strategia = 0, "nessuno", "sliding_window" - section_bodies = [text] if text.strip() else [] - - lengths = [len(b) for b in section_bodies if b.strip()] - lunghezza_media = int(sum(lengths) / len(lengths)) if lengths else 0 - lingua = _detect_language(text) - - avvertenze = [] - short = sum(1 for l in lengths if l < 200) - long_ = sum(1 for l in lengths if l > 800) - if short: - avvertenze.append(f"{short} sezioni sotto i 200 caratteri — verranno accorpate") - if long_: - avvertenze.append(f"{long_} sezioni sopra i 800 caratteri — verranno divise") - - return { - "livello_struttura": livello, - "n_h1": n_h1, - "n_h2": n_h2, - "n_h3": n_h3, - "n_paragrafi": n_paragrafi, - "boundary_primario": boundary, - "lingua_rilevata": lingua, - "lunghezza_media_sezione": lunghezza_media, - "strategia_chunking": strategia, - "avvertenze": avvertenze, - } - - -# ─── Report di conversione ─────────────────────────────────────────────────── - -def build_report( - stem: str, - out_dir: Path, - clean_text: str, - t_stats: dict, - profile: dict, - reduction: float, -) -> Path: - """ - Genera conversione//report.json con tutte le metriche di qualità: - statistiche trasformazioni, struttura, distribuzione lunghezze, anomalie - e problemi residui. Leggibile da validate.py per la validazione batch. - """ - text_lines = clean_text.split("\n") - - # ── Raccolta sezioni ### con corpo ──────────────────────────────────── - sections = _parse_sections_with_body(clean_text, 3) - lengths = [len(body) for _, body in sections] - - # ── Distribuzione lunghezze ─────────────────────────────────────────── - def _pct(data: list[int], p: float) -> int: - if not data: - return 0 - s = sorted(data) - return s[max(0, min(len(s) - 1, int(len(s) * p)))] - - distribution = { - "min": min(lengths) if lengths else 0, - "p25": _pct(lengths, 0.25), - "mediana": _pct(lengths, 0.50), - "p75": _pct(lengths, 0.75), - "max": max(lengths) if lengths else 0, - } - - # ── Anomalie ────────────────────────────────────────────────────────── - bare_hdrs = [ - {"header": hdr, "corpo_inizio": body[:120].replace("\n", " ")} - for hdr, body in sections - if re.match(r"^### \d+\.\s*$", hdr) and len(body.strip()) < 30 - ] - - short_secs = [ - {"header": hdr, "chars": length, "testo": body[:80].replace("\n", " ")} - for (hdr, body), length in zip(sections, lengths) - if 0 < length < 150 - ] - - long_secs = [ - {"header": hdr, "chars": length} - for (hdr, _), length in zip(sections, lengths) - if length > 1500 - ] - - # ── Problemi residui (max 10 esempi ciascuno) ───────────────────────── - def _scan(pattern: str, max_n: int = 10) -> list[dict]: - hits = [] - for i, line in enumerate(text_lines): - if re.search(pattern, line) and not re.match(r"^#+ ", line): - hits.append({"riga": i + 1, "testo": line.strip()[:120]}) - if len(hits) >= max_n: - break - return hits - - residui = { - "backtick": _scan(r"`"), - "dotleader": _scan(r"(?:\. ){3,}"), - "url": _scan(r"^(https?://|www\.)\S+"), - "immagini": _scan(r"!\[[^\]]*\]\([^)]*\)"), - "br_inline": _scan(r"
"), - "simboli_encoding": _scan(r'(?<=[0-9A-Za-z])[!"](?=[0-9A-Za-z])'), - "formule_inline": _scan(r"\[\d+\.\d+\]"), - "footnote_markers": _scan(r'[\u00b9\u00b2\u00b3\u2070\u2074-\u2079]'), - "pua_markers": _scan(r'[\ue000-\uf8ff]'), - } - - # ── Composizione report ─────────────────────────────────────────────── - report = { - "stem": stem, - "timestamp": datetime.now().strftime("%Y-%m-%d %H:%M"), - "transforms": { - **t_stats, - "riduzione_pct": round(reduction), - }, - "structure": profile, - "distribution": distribution, - "anomalie": { - "bare_headers": len(bare_hdrs), - "short_sections": len(short_secs), - "long_sections": len(long_secs), - "bare_headers_list": bare_hdrs, - "short_sections_list": short_secs, - "long_sections_list": long_secs, - }, - "residui": { - "backtick": len(residui["backtick"]), - "dotleader": len(residui["dotleader"]), - "url": len(residui["url"]), - "immagini": len(residui["immagini"]), - "br_inline": len(residui["br_inline"]), - "simboli_encoding": len(residui["simboli_encoding"]), - "formule_inline": len(residui["formule_inline"]), - "footnote_markers": len(residui["footnote_markers"]), - "pua_markers": len(residui["pua_markers"]), - "backtick_esempi": residui["backtick"], - "dotleader_esempi": residui["dotleader"], - "url_esempi": residui["url"], - "immagini_esempi": residui["immagini"], - "br_inline_esempi": residui["br_inline"], - "simboli_encoding_esempi": residui["simboli_encoding"], - "formule_inline_esempi": residui["formule_inline"], - "footnote_markers_esempi": residui["footnote_markers"], - "pua_markers_esempi": residui["pua_markers"], - }, - } - - report_path = out_dir / "report.json" - report_path.write_text(json.dumps(report, ensure_ascii=False, indent=2), encoding="utf-8") - return report_path - - -# ─── Pipeline principale ────────────────────────────────────────────────────── - -def run(stem: str, project_root: Path, force: bool) -> bool: - pdf_path = project_root / "sources" / f"{stem}.pdf" - out_dir = project_root / "conversione" / stem - raw_out = out_dir / "raw.md" - clean_out = out_dir / "clean.md" - - print(f"\n{'─' * 52}") - print(f" {stem}") - print(f"{'─' * 52}") - - if clean_out.exists() and not force: - print(f" ⚠️ conversione/{stem}/clean.md già presente — skip") - print(f" (usa --force per rieseguire)") - return True - - # ── [1] Validazione ──────────────────────────────────────────────────── - print(" [1/4] Validazione PDF...") - ok, msg = check_pdf(pdf_path) - if not ok: - print(f" ✗ {msg}") - return False - print(f" ✅ {msg}") - - # ── [2] Conversione ──────────────────────────────────────────────────── - print(" [2/4] Conversione PDF → Markdown (opendataloader-pdf)...") - with tempfile.TemporaryDirectory() as tmp: - try: - md_file = convert_pdf(pdf_path, Path(tmp)) - except MemoryError: - print(" ✗ Memoria esaurita durante la conversione") - return False - except Exception as e: - print(f" ✗ Conversione fallita: {e}") - return False - try: - raw_text = md_file.read_text(encoding="utf-8") - except UnicodeDecodeError as e: - print(f" ✗ Errore encoding nel file prodotto: {e}") - return False - - size_kb = len(raw_text.encode()) // 1024 - n_lines = raw_text.count("\n") - print(f" ✅ Markdown grezzo: {size_kb} KB, {n_lines} righe") - - # ── [3] Pulizia strutturale ──────────────────────────────────────────── - print(" [3/4] Pulizia strutturale...") - clean_text, t_stats = apply_transforms(raw_text) - reduction = 100 * (1 - len(clean_text) / len(raw_text)) if raw_text else 0 - print(f" ✅ Simboli PUA corretti: {t_stats['n_simboli_pua_corretti']}") - print(f" Immagini rimosse: {t_stats['n_immagini_rimosse']}") - print(f" Note rimossa: {t_stats['n_note_rimosse']}") - print(f" Accenti corretti: {t_stats['n_accenti_corretti']}") - print(f" Dot-leader rimossi: {t_stats['n_dotleader_rimossi']}") - print(f" Header concat fixati: {t_stats['n_header_concat_fixati']}") - print(f" Header num. normaliz.: {t_stats['n_header_numerati_normalizzati']}") - print(f" Articoli → ###: {t_stats['n_articoli_estratti']}") - print(f" Ambienti matematici: {t_stats['n_ambienti_matematici']}") - print(f" Titoli header uniti: {t_stats['n_titoli_uniti']}") - print(f" TOC rimosso: {'sì' if t_stats['toc_rimosso'] else 'no'}") - print(f" Versi poesia riprist.: {t_stats['n_versi_ripristinati']}") - print(f" Header verso demotati: {t_stats['n_header_verso_demotati']}") - print(f" ALL-CAPS → ##: {t_stats['n_header_allcaps']}") - print(f" Sezioni → ###: {t_stats['n_sezioni_numerate']}") - print(f" Paragrafi uniti: {t_stats['n_paragrafi_uniti']}") - print(f" Riduzione testo: {reduction:.0f}%") - - # ── [4] Profilo strutturale ──────────────────────────────────────────── - print(" [4/4] Analisi struttura...") - try: - out_dir.mkdir(parents=True, exist_ok=True) - raw_out.write_text(raw_text, encoding="utf-8") - clean_out.write_text(clean_text, encoding="utf-8") - except PermissionError as e: - print(f" ✗ Permesso negato durante la scrittura: {e}") - return False - profile = analyze(clean_out) - (out_dir / "structure_profile.json").write_text( - json.dumps(profile, ensure_ascii=False, indent=2), encoding="utf-8" - ) - - _LIVELLO_DESC = {3: "ricca (h3)", 2: "parziale (h2)", 1: "paragrafi", 0: "testo piatto"} - print(f" ✅ Struttura: livello {profile['livello_struttura']} — {_LIVELLO_DESC[profile['livello_struttura']]}") - print(f" h1={profile['n_h1']} h2={profile['n_h2']} h3={profile['n_h3']} " - f"paragrafi={profile['n_paragrafi']}") - print(f" Strategia chunking: {profile['strategia_chunking']}") - print(f" Lingua rilevata: {profile['lingua_rilevata']}") - for w in profile["avvertenze"]: - print(f" ⚠️ {w}") - - build_report(stem, out_dir, clean_text, t_stats, profile, reduction) - - print(f"\n Output:") - print(f" conversione/{stem}/raw.md (immutabile)") - print(f" conversione/{stem}/clean.md") - print(f" conversione/{stem}/report.json") - print(f"\n clean.md pronto per la suddivisione in chunk.") - return True - - -# ─── Entry point ───────────────────────────────────────────────────────────── - -if __name__ == "__main__": - project_root = Path(__file__).parent.parent - - parser = argparse.ArgumentParser( - description="Pipeline PDF → clean Markdown strutturato, pronto per chunking", - epilog="Prerequisiti: pip install opendataloader-pdf + Java 11+ sul PATH", - ) - parser.add_argument( - "--stem", - help="Nome del documento (PDF in sources/.pdf). " - "Se omesso, elabora tutti i PDF in sources/.", - ) - parser.add_argument( - "--force", - action="store_true", - help="Riesegui anche se clean.md è già presente", - ) - args = parser.parse_args() - - _check_deps() - - if args.stem: - stems = [args.stem] - else: - sources_dir = project_root / "sources" - if not sources_dir.exists(): - print("Errore: cartella sources/ non trovata") - sys.exit(1) - stems = sorted(p.stem for p in sources_dir.glob("*.pdf")) - if not stems: - print("Errore: nessun PDF trovato in sources/") - sys.exit(1) - - results = [run(s, project_root, args.force) for s in stems] - ok = sum(results) - total = len(results) - print(f"\n{'✅' if all(results) else '⚠️ '} {ok}/{total} documenti convertiti") - sys.exit(0 if all(results) else 1) diff --git a/conversione/validate.py b/conversione/validate.py deleted file mode 100644 index f2c1ead..0000000 --- a/conversione/validate.py +++ /dev/null @@ -1,210 +0,0 @@ -#!/usr/bin/env python3 -""" -conversione/validate.py — Validazione qualità Markdown - -Legge i report.json prodotti da pipeline.py, stampa una tabella di stato -e assegna un voto (0-100) a ogni documento. - - 90-100 A — ottimo, pronto per il chunker - 75-89 B — buono, qualche sezione lunga ma accettabile - 60-74 C — accettabile, anomalie minori da verificare - 40-59 D — da rivedere, problemi strutturali o residui evidenti - 0-39 F — da riprocessare, struttura assente o testo corrotto - -Uso: - python conversione/validate.py # tutti gli stem - python conversione/validate.py analisi1 # stem specifico - python conversione/validate.py a b c # stem multipli - python conversione/validate.py --detail analisi1 # mostra dettaglio penalità -""" - -import argparse -import json -import sys -from pathlib import Path - - -# ─── Punteggio ─────────────────────────────────────────────────────────────── - -_GRADES = [(90, "A"), (75, "B"), (60, "C"), (40, "D"), (0, "F")] - - -def _score(r: dict) -> tuple[int, list[str]]: - """ - Calcola un punteggio 0-100 sulla qualità del clean.md ai fini della - suddivisione in chunk e vettorizzazione. - Restituisce (score, lista_penalità_applicate). - - Penalità struttura (il chunker non può operare senza header): - struttura assente (livello 0) → −40 - struttura piatta (livello 1) → −15 - - Penalità residui (finiscono nei vettori e degradano il retrieval): - backtick → −2/cad (max −20) - dot-leader → −5/cad (max −10) - URL / watermark → −5/cad (max −15) - immagini residue → −5/cad (max −10) -
inline (artefatti tabelle) → −2/cad (max −15) - simboli encoding (!/" residui) → −1/cad (max −10) - formule inline [N.M] → −1/cad (max −8) - - Penalità anomalie: - bare headers → −3/cad (max −15) - - Non penalizzate (il chunker le normalizza): - sezioni corte, sezioni lunghe, mediana, p25 - """ - score = 100 - detail = [] - structure = r.get("structure", {}) - anomalie = r.get("anomalie", {}) - residui = r.get("residui", {}) - - livello = structure.get("livello_struttura", 0) - - # ── Struttura ───────────────────────────────────────────────────────── - if livello == 0: - score -= 40 - detail.append("struttura assente −40") - elif livello == 1: - score -= 15 - detail.append("struttura piatta −15") - - # ── Residui ─────────────────────────────────────────────────────────── - def _pen(key: str, per_item: int, cap: int, label: str) -> None: - n = residui.get(key, 0) - if n: - p = min(cap, n * per_item) - nonlocal score - score -= p - detail.append(f"{label} ×{n} −{p}") - - _pen("backtick", 2, 20, "backtick") - _pen("dotleader", 5, 10, "dot-leader") - _pen("url", 5, 15, "url") - _pen("immagini", 5, 10, "immagini") - _pen("br_inline", 2, 15, "
inline") - _pen("simboli_encoding", 1, 10, "simboli encoding") - _pen("formule_inline", 1, 8, "formule inline") - _pen("footnote_markers", 1, 8, "footnote residui") - _pen("pua_markers", 2, 20, "caratteri PUA font Symbol") - - # ── Anomalie ────────────────────────────────────────────────────────── - n_bare = anomalie.get("bare_headers", 0) - if n_bare: - p = min(15, n_bare * 3) - score -= p - detail.append(f"bare headers ×{n_bare} −{p}") - - return max(0, score), detail - - -def _grade(score: int) -> str: - return next(g for threshold, g in _GRADES if score >= threshold) - - -# ─── Validazione ───────────────────────────────────────────────────────────── - -def validate(stems: list[str], project_root: Path, detail: bool = False) -> None: - conv_dir = project_root / "conversione" - - paths = ( - [conv_dir / s / "report.json" for s in stems] - if stems - else sorted(conv_dir.glob("*/report.json")) - ) - - if not paths: - print("Nessun report.json trovato in conversione/*/") - sys.exit(0) - - rows = [ - json.loads(p.read_text(encoding="utf-8")) if p.exists() - else {"stem": p.parent.name, "_missing": True} - for p in paths - ] - - # ── Intestazione ───────────────────────────────────────────────────── - col = max(len(r.get("stem", "stem")) for r in rows) + 2 - header = ( - f"{'stem':<{col}}" - f"{'h2':>4}{'h3':>5} " - f"{'strategia':<18}" - f"{'bare':>5}{'corte':>6}{'lunghe':>7}" - f"{'btk':>5}{'br':>4}{'enc':>4}{'url':>4}" - f"{'med':>6}" - f" {'voto':>4} grade" - ) - sep = "─" * len(header) - print(f"\n{header}\n{sep}") - - scores = [] - - # ── Righe ───────────────────────────────────────────────────────────── - for r in rows: - if r.get("_missing"): - print(f"{r['stem']:<{col}} (report.json non trovato)") - continue - - st = r.get("structure", {}) - an = r.get("anomalie", {}) - res = r.get("residui", {}) - dist = r.get("distribution", {}) - s, pen = _score(r) - scores.append(s) - - print( - f"{r['stem']:<{col}}" - f"{st.get('n_h2', 0):>4}" - f"{st.get('n_h3', 0):>5} " - f"{st.get('strategia_chunking','?'):<18}" - f"{an.get('bare_headers', 0):>5}" - f"{an.get('short_sections', 0):>6}" - f"{an.get('long_sections', 0):>7}" - f"{res.get('backtick', 0):>5}" - f"{res.get('br_inline', 0):>4}" - f"{res.get('simboli_encoding', 0):>4}" - f"{res.get('url', 0):>4}" - f"{dist.get('mediana', 0):>6}" - f" {s:>4} {_grade(s)}" - ) - - if detail and pen: - for p in pen: - print(f" {'':>{col}} ↳ {p}") - - # ── Riepilogo ───────────────────────────────────────────────────────── - print(sep) - if scores: - media = sum(scores) / len(scores) - print( - f"Documenti: {len(scores)} " - f"Media: {media:.0f}/100 {_grade(int(media))} " - f"(A≥90 B≥75 C≥60 D≥40 F<40)" - ) - print( - "\nColonne: bare=header vuoti corte=sez<150ch lunghe=sez>1500ch " - "btk=backtick br=
inline enc=simboli encoding med=mediana chars\n" - ) - - -# ─── Entry point ───────────────────────────────────────────────────────────── - -if __name__ == "__main__": - parser = argparse.ArgumentParser( - description="Valida i report Markdown prodotti da pipeline.py", - epilog="Senza argomenti valida tutti gli stem in conversione/*/", - ) - parser.add_argument( - "stems", - nargs="*", - metavar="STEM", - help="stem da validare (es: analisi1). Ometti per tutti.", - ) - parser.add_argument( - "--detail", "-d", - action="store_true", - help="mostra dettaglio penalità per ogni documento", - ) - args = parser.parse_args() - validate(args.stems, Path(__file__).parent.parent, detail=args.detail)