From b7994100e72a6c7a08e7434da58fdcd57fb63315 Mon Sep 17 00:00:00 2001 From: Davide Grilli Date: Thu, 16 Apr 2026 15:27:45 +0200 Subject: [PATCH] =?UTF-8?q?feat(pdf-to-md):=20aggiungi=20pipeline=20automa?= =?UTF-8?q?tica=20PDF=20=E2=86=92=20Markdown=20pulito?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Nuova cartella conversione/ con pipeline.py che sostituisce step-0+1+2+3+4 in un singolo comando senza operazioni manuali. Usa opendataloader-pdf (algoritmo XY-Cut++ per ordine di lettura). Trasformazioni strutturali: - accenti backtick da PDF LaTeX (es. `e→è, puo`→può) - rimozione dot-leader TOC e numerali romani pagina (i, ii, iii) - normalizzazione header a gerarchia uniforme h1/h2/h3 - conversione sezioni numerate e aforismi → header ### - rilevamento sezioni Esercizi → disabilita conversione numerazioni - watermark URL rimossi, header vuoti scartati --- .gitignore | 3 + conversione/pipeline.py | 690 ++++++++++++++++++++++++++++++++++++++++ requirements.txt | 4 + 3 files changed, 697 insertions(+) create mode 100644 conversione/pipeline.py diff --git a/.gitignore b/.gitignore index 69458fa..0b18250 100644 --- a/.gitignore +++ b/.gitignore @@ -46,3 +46,6 @@ step-5/*/ # Output step-6 — report generati da verify_chunks.py step-6/*/ +# Output conversione/ — generati da conversione/pipeline.py +conversione/*/ + diff --git a/conversione/pipeline.py b/conversione/pipeline.py new file mode 100644 index 0000000..5ed919d --- /dev/null +++ b/conversione/pipeline.py @@ -0,0 +1,690 @@ +#!/usr/bin/env python3 +""" +conversion/pipeline.py — PDF → clean Markdown (pipeline automatica) + +Sostituisce step-0 + step-1 + step-2 + step-3 + step-4 in un solo comando, +senza operazioni manuali. + +Usa opendataloader-pdf (algoritmo XY-Cut++ per ordine di lettura corretto, +testo fluente, struttura preservata) al posto di pymupdf4llm. + +Output (compatibile con step-5+): + conversion//raw.md — output grezzo opendataloader (immutabile) + conversion//clean.md — MD pulito e strutturato + conversion//structure_profile.json + +Uso: + python conversion/pipeline.py --stem + python conversion/pipeline.py # tutti i PDF in sources/ + python conversion/pipeline.py --stem --force # forza riesecuzione + +Prerequisiti: + pip install opendataloader-pdf + Java 11+ sul PATH (https://adoptium.net/) +""" + +import argparse +import json +import re +import subprocess +import sys +import tempfile +from pathlib import Path + + +# ─── Verifica dipendenze ────────────────────────────────────────────────────── + +def _check_deps() -> None: + try: + import opendataloader_pdf # noqa: F401 + except ImportError: + print("Errore: opendataloader-pdf non installato.") + print(" pip install opendataloader-pdf") + sys.exit(1) + + try: + result = subprocess.run( + ["java", "-version"], + capture_output=True, text=True, + ) + if result.returncode != 0: + raise FileNotFoundError + except FileNotFoundError: + print("Errore: Java 11+ non trovato sul PATH.") + print(" Installa da https://adoptium.net/") + sys.exit(1) + + +# ─── [1] Validazione PDF (step-0 + step-1) ──────────────────────────────────── + +def check_pdf(pdf_path: Path) -> tuple[bool, str]: + """ + Validazione rapida: esistenza, leggibilità, testo estraibile. + Restituisce (ok, messaggio). + """ + if not pdf_path.exists(): + return False, f"File non trovato: {pdf_path}" + if pdf_path.suffix.lower() != ".pdf": + return False, f"Non è un PDF: {pdf_path.name}" + if pdf_path.stat().st_size == 0: + return False, "File vuoto" + + try: + import pdfplumber + with pdfplumber.open(pdf_path) as pdf: + n_pages = len(pdf.pages) + if n_pages == 0: + return False, "PDF senza pagine" + sample = min(5, n_pages) + pages_with_text = sum( + 1 for i in range(sample) + if len((pdf.pages[i].extract_text() or "").strip()) > 50 + ) + if pages_with_text == 0: + return False, ( + f"Nessun testo nelle prime {sample} pagine " + f"— probabilmente scansionato (usa modalità hybrid)" + ) + return True, f"{n_pages} pagine, testo digitale confermato" + except Exception as e: + msg = str(e).lower() + if "password" in msg or "encrypted" in msg: + return False, "PDF protetto da password" + return False, f"Impossibile aprire: {e}" + + +# ─── [2] Conversione PDF → Markdown (step-2) ───────────────────────────────── + +def convert_pdf(pdf_path: Path, out_dir: Path) -> Path: + """ + Converte il PDF in Markdown tramite opendataloader-pdf. + Scrive il file nella out_dir e restituisce il percorso. + + Parametri scelti per output RAG-ottimale: + - keep_line_breaks=False → testo fluente, no hard-wrap PDF + - reading_order="xycut" → corregge ordine multi-colonna (XY-Cut++) + - sanitize=False → preserva il testo originale (no anonimizzazione PII) + """ + import opendataloader_pdf + + out_dir.mkdir(parents=True, exist_ok=True) + + opendataloader_pdf.convert( + input_path=str(pdf_path), + output_dir=str(out_dir), + format="markdown", + keep_line_breaks=False, + reading_order="xycut", + sanitize=False, + image_output="off", # nessuna immagine estratta né referenziata + quiet=True, # sopprime i log Java + ) + + # Il file output si chiama .md + md_file = out_dir / f"{pdf_path.stem}.md" + if not md_file.exists(): + candidates = list(out_dir.glob("*.md")) + if not candidates: + raise RuntimeError(f"Nessun file .md prodotto in {out_dir}") + md_file = candidates[0] + + return md_file + + +# ─── [3] Pulizia strutturale (step-4 / revise.py) ──────────────────────────── +# +# Logica identica a step-4/revise.py — mantenuta sincronizzata. + +_TOC_KEYWORDS = frozenset([ + "indice", "index", "contents", "table of contents", + "sommario", "inhaltsverzeichnis", "inhalt", +]) + +_ORDINALS_IT = { + "PRIMO": "I", "SECONDO": "II", "TERZO": "III", "QUARTO": "IV", + "QUINTO": "V", "SESTO": "VI", "SETTIMO": "VII", "OTTAVO": "VIII", + "NONO": "IX", "DECIMO": "X", +} +_ORDINALS_EN = { + "ONE": "1", "TWO": "2", "THREE": "3", "FOUR": "4", "FIVE": "5", + "SIX": "6", "SEVEN": "7", "EIGHT": "8", "NINE": "9", "TEN": "10", +} + + +def _sentence_case(s: str) -> str: + if not s: + return s + lower = s.lower() + return lower[0].upper() + lower[1:] + + +def _is_allcaps_line(line: str) -> bool: + stripped = line.strip() + letters = [c for c in stripped if c.isalpha()] + return ( + len(letters) >= 3 + and all(c.isupper() for c in letters) + and not stripped.startswith("#") + ) + + +def _allcaps_to_header(raw_line: str) -> str: + text = raw_line.strip().rstrip(".").rstrip("?").strip() + + _ORD_IT_PAT = "|".join(_ORDINALS_IT.keys()) + m = re.match(rf"^CAPITOLO ({_ORD_IT_PAT})\. (.+)", text) + if m: + roman = _ORDINALS_IT[m.group(1)] + titolo = m.group(2).rstrip(".").rstrip("?").strip() + return f"## Capitolo {roman} — {_sentence_case(titolo)}" + + _ORD_EN_PAT = "|".join(_ORDINALS_EN.keys()) + m = re.match(rf"^CHAPTER ({_ORD_EN_PAT}|\d+)\.? (.+)", text) + if m: + n = _ORDINALS_EN.get(m.group(1), m.group(1)) + titolo = m.group(2).rstrip(".").rstrip("?").strip() + return f"## Chapter {n} — {_sentence_case(titolo)}" + + m = re.match(r"^([IVXLCDM]+|[0-9]+)\. (.+)", text) + if m: + return f"## {m.group(1)}. {_sentence_case(m.group(2).rstrip('.').strip())}" + + return f"## {_sentence_case(text)}" + + +def apply_transforms(text: str) -> tuple[str, dict]: + """ + Applica le trasformazioni strutturali al Markdown grezzo. + Restituisce (testo_modificato, statistiche). + """ + stats = { + "toc_rimosso": False, + "n_immagini_rimosse": 0, + "n_accenti_corretti": 0, + "n_dotleader_rimossi": 0, + "n_header_concat_fixati": 0, + "n_header_allcaps": 0, + "n_sezioni_numerate": 0, + "n_paragrafi_uniti": 0, + } + + # 0. Rimuovi riferimenti immagini (artefatti opendataloader-pdf) + stats["n_immagini_rimosse"] = len(re.findall(r"!\[[^\]]*\]\([^)]*\)", text)) + text = re.sub(r"!\[[^\]]*\]\([^)]*\)\s*", "", text) + + # 0a. Fix artefatti backtick da PDF LaTeX: `e→è, e`→è, sar`a→sarà, ecc. + # I PDF prodotti da LaTeX estraggono gli accenti gravi come backtick separati + # dalla vocale accentata. Esempi: "`e" → "è", "puo`" → "può", "sar`a" → "sarà" + _ACCENT_MAP = { + "e": "è", "E": "È", "a": "à", "A": "À", + "u": "ù", "U": "Ù", "i": "ì", "I": "Ì", "o": "ò", "O": "Ò", + } + n_bt_before = text.count("`") + text = re.sub(r"`([eEaAuUiIoO])", lambda m: _ACCENT_MAP[m.group(1)], text) + text = re.sub(r"([eEaAuUiIoO])`", lambda m: _ACCENT_MAP[m.group(1)], text) + stats["n_accenti_corretti"] = n_bt_before - text.count("`") + + # 0b_pre. Rimuovi righe con dot-leader (voci di indice/sommario) + # Esempi: "- 1.1 Alfabeto greco . . . . . . 1", "3.4 Continuità . . . . 205" + # Pattern: almeno 3 occorrenze di ". " consecutive nella riga + stats["n_dotleader_rimossi"] = len( + re.findall(r"^[^\n]*(?:\. ){3,}[^\n]*$", text, re.MULTILINE) + ) + text = re.sub(r"^[^\n]*(?:\. ){3,}[^\n]*$", "", text, flags=re.MULTILINE) + + # 0b_pre2. Rimuovi righe che sono solo numerali romani (indicatori di pagina TOC) + # Esempi: "i", "ii", "iii", "iv", "v" su riga isolata (footer pagine indice LaTeX) + # Questi impedirebbero al transform 9 di rimuovere le entry TOC rimaste senza corpo. + text = re.sub( + r"(?m)^(i{1,3}|iv|vi{0,3}|ix|xi{0,2}|x)$", + "", + text, + flags=re.IGNORECASE, + ) + + # Flag documento: rilevamento sezioni esercizi (es. libri di testo accademici) + # Usato per disabilitare transform 4b che convertirebbe i numeri degli esercizi in header. + _has_exercise_sections = bool(re.search(r"\bEsercizi\b", text, re.IGNORECASE)) + + # 0b. Fix header + body concatenati senza separatore + # "##### 11 TitoloCorpodel testo..." → "##### 11 Titolo\n\nCorpo del testo..." + def _fix_header_concat(m: re.Match) -> str: + hashes = m.group(1) + full = m.group(2).strip() + if len(full) < 60: + return m.group(0) + # Cerca split: lettera minuscola (incluse accentate) seguita da maiuscola + # Salta i primi ~10 char per non spezzare il numero della sezione + skip = min(10, len(full) // 3) + split = re.search(r"(?<=[a-zàèéìíòóùúä])(?=[A-ZÀÈÉÌÍÒÓÙÚ])", full[skip:]) + if split: + pos = skip + split.start() + title = full[:pos].strip() + body = full[pos:].strip() + if len(title) >= 5 and len(body) >= 15: + stats["n_header_concat_fixati"] += 1 + return f"{hashes} {title}\n\n{body}" + return m.group(0) + + text = re.sub(r"^(#{2,6})\s+(.{40,})$", _fix_header_concat, text, flags=re.MULTILINE) + + # 0c. Estrai "Capitolo N: TITOLO" inline nel corpo del testo → ## header separato + # "Capitolo 3: IL TITOLO DEL CAPITOLO - 16 Primo..." → "## Capitolo 3: ..." + # "Capitolo 1 : TITOLO CAPITOLO" → "## Capitolo 1: ..." + def _extract_capitolo(m: re.Match) -> str: + num = m.group(1) + titolo = _sentence_case(m.group(2).strip().rstrip("- ").strip()) + return f"\n\n## Capitolo {num}: {titolo}\n\n" + + text = re.sub( + r"\bCapitolo\s+(\d+)\s*[:\s]\s*([A-ZÀÈÉÌÍÒÓÙÚ\'L][A-ZÀÈÉÌÍÒÓÙÚ\s\'\.,\(\)]{5,80}?)" + r"(?=\s*[-–]\s*\d|\s*\n|\s*$)", + _extract_capitolo, + text, + ) + + # 0d. Normalizza header di sezione a livello uniforme ### + # "#### N Titolo" → "### N. Titolo" (numerati: aggiunge punto) + # "#### B) Titolo" → "### B) Titolo" (lettera: solo cambio livello) + # "#### " → rimosso (vuoti) + text = re.sub( + r"^#{3,6}\s*$", + "", + text, + flags=re.MULTILINE, + ) + text = re.sub( + r"^(#{3,6})\s+(\d{1,3})\s+(.+)$", + lambda m: f"### {m.group(2)}. {m.group(3)}", + text, + flags=re.MULTILINE, + ) + text = re.sub( + r"^#{4,6}\s+(.+)$", + r"### \1", + text, + flags=re.MULTILINE, + ) + + # 1. Rimuovi **bold** negli header esistenti: ## **Titolo** → ## Titolo + text = re.sub( + r"^(#{1,6})\s+\*\*(.+?)\*\*\s*$", + r"\1 \2", + text, flags=re.MULTILINE, + ) + + # 1b. Normalizza header ALL-CAPS → sentence-case + def _norm_allcaps_header(m: re.Match) -> str: + hashes, content = m.group(1), m.group(2).strip() + letters = [c for c in content if c.isalpha()] + if letters and all(c.isupper() for c in letters): + return f"{hashes} {_sentence_case(content)}" + return m.group(0) + + text = re.sub(r"^(#{1,6}) (.+)$", _norm_allcaps_header, text, flags=re.MULTILINE) + + # 2. Rimuovi righe TOC: header "# Indice", "# Contents", ecc. + # Rimuove la riga stessa; le voci subordinate (dot-leader) sono già rimosse da 0b_pre. + # L'header rimasto senza corpo viene poi eliminato dal transform 9. + lines = text.split("\n") + new_lines = [] + for line in lines: + # Stripping del prefisso markdown (##, #, ecc.) prima del confronto keyword + bare = re.sub(r"^#+\s*", "", line.strip()) + first_word = bare.split(".")[0].strip().lower() + if first_word in _TOC_KEYWORDS: + stats["toc_rimosso"] = True + else: + new_lines.append(line) + text = "\n".join(new_lines) + + # 3. Converti righe ALL-CAPS standalone → ## header + blocks = text.split("\n\n") + new_blocks = [] + for block in blocks: + stripped = block.strip() + if "\n" not in stripped and _is_allcaps_line(stripped): + new_blocks.append(_allcaps_to_header(stripped)) + stats["n_header_allcaps"] += 1 + else: + sub_lines = block.split("\n") + converted = [] + for ln in sub_lines: + if _is_allcaps_line(ln) and len(ln.strip()) > 3: + converted.append(_allcaps_to_header(ln)) + stats["n_header_allcaps"] += 1 + else: + converted.append(ln) + new_blocks.append("\n".join(converted)) + text = "\n\n".join(new_blocks) + + # 4. Converti sezioni numerate "N. testo" → "### N.\n\ntesto" + # Guarda che il testo non sia una frase completa (es. esercizi numerati): + # se termina con "." ed è più lungo di 40 caratteri, è probabilmente una frase, + # non un titolo di sezione → lascia invariato. + def _num_repl(m: re.Match) -> str: + content = m.group(2).strip() + if content.endswith(".") and len(content) > 40: + return m.group(0) + stats["n_sezioni_numerate"] += 1 + return f"### {m.group(1)}.\n\n{content}" + + text = re.sub(r"^(\d+)\.\s+(.+)$", _num_repl, text, flags=re.MULTILINE) + + def _num_letter_repl(m: re.Match) -> str: + stats["n_sezioni_numerate"] += 1 + return f"### {m.group(1)}{m.group(2)}.\n\n{m.group(3).strip()}" + + text = re.sub(r"^(\d+)\s*([a-z])\.\s+(.+)$", _num_letter_repl, text, flags=re.MULTILINE) + + # 4b. Converti "- N. testo" sezioni con punto → "### N.\n\ntesto" + # "- 1. Testo del primo punto..." → "### 1.\n\nTesto del primo punto..." + # Deve precedere 4c: "- N." ha il punto, "- N testo" no. + # Disabilitato se il documento contiene sezioni "Esercizi": in quel caso i + # "- N. testo" sono numerazioni di esercizi, non header di sezione. + if not _has_exercise_sections: + def _aphorism_repl(m: re.Match) -> str: + stats["n_sezioni_numerate"] += 1 + return f"\n\n### {m.group(1)}.\n\n{m.group(2).strip()}" + + text = re.sub( + r"^-\s+(\d{1,3})\.\s+(.{10,})$", + _aphorism_repl, + text, + flags=re.MULTILINE, + ) + + # 4c. Converti "- N testo" list item numerati → "### N.\n\ntesto" + # "- 12 Titolo sezione Corpo della sezione..." → "### 12. Titolo sezione\n\nCorpo..." + # Non tocca "- a) testo", "- 1) testo" (già gestiti come liste) + def _list_section_repl(m: re.Match) -> str: + num = m.group(1) + content = m.group(2).strip() + stats["n_sezioni_numerate"] += 1 + # Separa titolo da corpo: il titolo finisce dove una lettera minuscola + # è seguita da spazio e maiuscola (confine fine-titolo / inizio-corpo) + split = re.search(r"(?<=[a-zàèéìíòóùú])\s+(?=[A-ZÀÈÉÌÍÒÓÙÚ])", content) + if split and split.start() >= 3: + title = content[: split.start()].strip() + body = content[split.end() :].strip() + if len(body) >= 20: + return f"\n\n### {num}. {title}\n\n{body}" + # Nessun body inline: il content è solo il titolo + return f"\n\n### {num}. {content}" + + text = re.sub( + r"^-\s+(\d{1,3})\s+([A-ZÀÈÉÌÍÒÓÙÚ\'L].{10,})$", + _list_section_repl, + text, + flags=re.MULTILINE, + ) + + # 5. Unisci paragrafi spezzati da salti pagina PDF + _SENTENCE_END = set(".?!»)\"'") + blocks = text.split("\n\n") + merged = [] + i = 0 + while i < len(blocks): + b = blocks[i] + stripped = b.strip() + while ( + i + 1 < len(blocks) + and stripped + and not stripped.startswith("#") + and stripped[-1] not in _SENTENCE_END + ): + nxt = blocks[i + 1].strip() + if not nxt or nxt.startswith("#") or re.match(r"^\d+\.", nxt): + break + b = stripped + " " + nxt + stripped = b.strip() + stats["n_paragrafi_uniti"] += 1 + i += 1 + merged.append(b) + i += 1 + text = "\n\n".join(merged) + + # 6. Normalizza whitespace multiplo interno alle righe + lines = text.split("\n") + text = "\n".join( + re.sub(r" +", " ", line) if line.strip() else line + for line in lines + ) + + # 7. Riduci righe vuote multiple a doppie + text = re.sub(r"\n{3,}", "\n\n", text) + + # 8. Rimuovi righe che sono solo URL (watermark, footer di piattaforme) + text = re.sub(r"(?m)^(https?://|www\.)\S+\s*$", "", text) + + # 9. Rimuovi header senza corpo: header seguito solo da righe vuote e poi + # da un altro header o dalla fine del testo (sezioni vuote / watermark) + blocks = re.split(r"\n{2,}", text) + cleaned = [] + for i, block in enumerate(blocks): + stripped = block.strip() + if re.match(r"^#{1,6} ", stripped) and "\n" not in stripped: + next_stripped = blocks[i + 1].strip() if i + 1 < len(blocks) else "" + if not next_stripped or re.match(r"^#{1,6} ", next_stripped): + continue # header senza corpo → scarta + cleaned.append(block) + text = re.sub(r"\n{3,}", "\n\n", "\n\n".join(cleaned)) + + return text, stats + + +# ─── [4] Rilevamento struttura (step-3 / detect_structure.py) ──────────────── +# +# Logica identica a step-3/detect_structure.py — mantenuta sincronizzata. + +_IT_WORDS = frozenset([ + "il", "la", "di", "e", "che", "non", "per", "un", "una", "si", + "con", "da", "del", "della", "dei", "in", "ma", "se", "lo", "le", + "gli", "al", "alla", "ai", "alle", "sono", "ha", "hanno", "era", + "erano", "nel", "nella", "nei", "nelle", "questo", "questa", "così", +]) +_EN_WORDS = frozenset([ + "the", "of", "and", "to", "in", "is", "that", "it", "was", "for", + "on", "are", "as", "with", "his", "they", "at", "be", "this", "have", + "from", "or", "an", "but", "not", "by", "he", "she", "we", "you", + "which", "their", "been", "has", "would", "there", "when", "will", +]) + + +def _detect_language(text: str) -> str: + words = re.findall(r"\b[a-zA-Z]{2,}\b", text.lower()) + sample = words[:2000] + it = sum(1 for w in sample if w in _IT_WORDS) + en = sum(1 for w in sample if w in _EN_WORDS) + if it == 0 and en == 0: + return "unknown" + return "it" if it >= en else "en" + + +def _count_headers(text: str, level: int) -> int: + prefix = "#" * level + " " + return len(re.findall(rf"(?m)^{re.escape(prefix)}", text)) + + +def _count_paragraphs(text: str) -> int: + blocks = re.split(r"\n{2,}", text) + return sum(1 for b in blocks if b.strip() and not re.match(r"^#+\s", b.strip())) + + +def _split_sections(text: str, level: int) -> list[str]: + prefix = "#" * level + " " + parts = re.split(rf"(?m)^{re.escape(prefix)}.+", text) + return [p for p in parts[1:] if p.strip()] + + +def analyze(md_path: Path) -> dict: + text = md_path.read_text(encoding="utf-8") + n_h1 = _count_headers(text, 1) + n_h2 = _count_headers(text, 2) + n_h3 = _count_headers(text, 3) + n_paragrafi = _count_paragraphs(text) + + if n_h3 >= 5: + livello, boundary, strategia = 3, "h3", "h3_aware" + section_bodies = _split_sections(text, 3) + elif n_h2 >= 3: + livello, boundary, strategia = 2, "h2", "h2_paragraph_split" + section_bodies = _split_sections(text, 2) + elif n_h1 + n_h2 + n_h3 >= 1: + livello, boundary, strategia = 1, "paragrafo", "paragraph" + section_bodies = [b for b in re.split(r"\n{2,}", text) if b.strip()] + elif n_paragrafi >= 3: + livello, boundary, strategia = 1, "paragrafo", "paragraph" + section_bodies = [b for b in re.split(r"\n{2,}", text) if b.strip()] + else: + livello, boundary, strategia = 0, "nessuno", "sliding_window" + section_bodies = [text] if text.strip() else [] + + lengths = [len(b) for b in section_bodies if b.strip()] + lunghezza_media = int(sum(lengths) / len(lengths)) if lengths else 0 + lingua = _detect_language(text) + + avvertenze = [] + short = sum(1 for l in lengths if l < 200) + long_ = sum(1 for l in lengths if l > 800) + if short: + avvertenze.append(f"{short} sezioni sotto i 200 caratteri — verranno accorpate") + if long_: + avvertenze.append(f"{long_} sezioni sopra i 800 caratteri — verranno divise") + + return { + "livello_struttura": livello, + "n_h1": n_h1, + "n_h2": n_h2, + "n_h3": n_h3, + "n_paragrafi": n_paragrafi, + "boundary_primario": boundary, + "lingua_rilevata": lingua, + "lunghezza_media_sezione": lunghezza_media, + "strategia_chunking": strategia, + "avvertenze": avvertenze, + } + + +# ─── Pipeline principale ────────────────────────────────────────────────────── + +def run(stem: str, project_root: Path, force: bool) -> bool: + pdf_path = project_root / "sources" / f"{stem}.pdf" + out_dir = project_root / "conversion" / stem + raw_out = out_dir / "raw.md" + clean_out = out_dir / "clean.md" + profile_out = out_dir / "structure_profile.json" + + print(f"\n{'─' * 52}") + print(f" {stem}") + print(f"{'─' * 52}") + + if clean_out.exists() and not force: + print(f" ⚠️ conversion/{stem}/clean.md già presente — skip") + print(f" (usa --force per rieseguire)") + return True + + # ── [1] Validazione ──────────────────────────────────────────────────── + print(" [1/4] Validazione PDF...") + ok, msg = check_pdf(pdf_path) + if not ok: + print(f" ✗ {msg}") + return False + print(f" ✅ {msg}") + + # ── [2] Conversione ──────────────────────────────────────────────────── + print(" [2/4] Conversione PDF → Markdown (opendataloader-pdf)...") + with tempfile.TemporaryDirectory() as tmp: + try: + md_file = convert_pdf(pdf_path, Path(tmp)) + except Exception as e: + print(f" ✗ Conversione fallita: {e}") + return False + raw_text = md_file.read_text(encoding="utf-8") + + size_kb = len(raw_text.encode()) // 1024 + n_lines = raw_text.count("\n") + print(f" ✅ Markdown grezzo: {size_kb} KB, {n_lines} righe") + + # ── [3] Pulizia strutturale ──────────────────────────────────────────── + print(" [3/4] Pulizia strutturale...") + clean_text, t_stats = apply_transforms(raw_text) + reduction = 100 * (1 - len(clean_text) / len(raw_text)) if raw_text else 0 + print(f" ✅ Immagini rimosse: {t_stats['n_immagini_rimosse']}") + print(f" Accenti corretti: {t_stats['n_accenti_corretti']}") + print(f" Dot-leader rimossi: {t_stats['n_dotleader_rimossi']}") + print(f" Header concat fixati: {t_stats['n_header_concat_fixati']}") + print(f" TOC rimosso: {'sì' if t_stats['toc_rimosso'] else 'no'}") + print(f" ALL-CAPS → ##: {t_stats['n_header_allcaps']}") + print(f" Sezioni → ###: {t_stats['n_sezioni_numerate']}") + print(f" Paragrafi uniti: {t_stats['n_paragrafi_uniti']}") + print(f" Riduzione testo: {reduction:.0f}%") + + # ── [4] Profilo strutturale ──────────────────────────────────────────── + print(" [4/4] Analisi struttura...") + out_dir.mkdir(parents=True, exist_ok=True) + raw_out.write_text(raw_text, encoding="utf-8") + clean_out.write_text(clean_text, encoding="utf-8") + profile = analyze(clean_out) + profile_out.write_text(json.dumps(profile, ensure_ascii=False, indent=2), encoding="utf-8") + + _LIVELLO_DESC = {3: "ricca (h3)", 2: "parziale (h2)", 1: "paragrafi", 0: "testo piatto"} + print(f" ✅ Struttura: livello {profile['livello_struttura']} — {_LIVELLO_DESC[profile['livello_struttura']]}") + print(f" h1={profile['n_h1']} h2={profile['n_h2']} h3={profile['n_h3']} " + f"paragrafi={profile['n_paragrafi']}") + print(f" Strategia chunking: {profile['strategia_chunking']}") + print(f" Lingua rilevata: {profile['lingua_rilevata']}") + for w in profile["avvertenze"]: + print(f" ⚠️ {w}") + + print(f"\n Output:") + print(f" conversion/{stem}/raw.md (immutabile)") + print(f" conversion/{stem}/clean.md") + print(f" conversion/{stem}/structure_profile.json") + print(f"\n Prossimo passo: python step-5/chunker.py --stem {stem}") + return True + + +# ─── Entry point ───────────────────────────────────────────────────────────── + +if __name__ == "__main__": + project_root = Path(__file__).parent.parent + + parser = argparse.ArgumentParser( + description="Pipeline PDF → clean Markdown (sostituisce step 0+1+2+3+4)", + epilog=( + "Output compatibile con step-5+.\n" + "Prerequisiti: pip install opendataloader-pdf + Java 11+ sul PATH" + ), + ) + parser.add_argument( + "--stem", + help="Nome del documento (PDF in sources/.pdf). " + "Se omesso, elabora tutti i PDF in sources/.", + ) + parser.add_argument( + "--force", + action="store_true", + help="Riesegui anche se clean.md è già presente", + ) + args = parser.parse_args() + + _check_deps() + + if args.stem: + stems = [args.stem] + else: + sources_dir = project_root / "sources" + if not sources_dir.exists(): + print("Errore: cartella sources/ non trovata") + sys.exit(1) + stems = sorted(p.stem for p in sources_dir.glob("*.pdf")) + if not stems: + print("Errore: nessun PDF trovato in sources/") + sys.exit(1) + + results = [run(s, project_root, args.force) for s in stems] + ok = sum(results) + total = len(results) + print(f"\n{'✅' if all(results) else '⚠️ '} {ok}/{total} documenti convertiti") + sys.exit(0 if all(results) else 1) diff --git a/requirements.txt b/requirements.txt index a30e6e4..6cc5bce 100644 --- a/requirements.txt +++ b/requirements.txt @@ -4,5 +4,9 @@ pdfplumber==0.11.9 # Step 2 — Conversione PDF → Markdown pymupdf4llm +# conversione/ — Pipeline automatica PDF → clean Markdown (alternativa a step 0+1+2+3+4) +# Richiede anche Java 11+ sul PATH: https://adoptium.net/ +opendataloader-pdf + # Step 8 — Vettorizzazione chromadb