Files
rag-from-scratch/conversione/pipeline.py
T
davide b7994100e7 feat(pdf-to-md): aggiungi pipeline automatica PDF → Markdown pulito
Nuova cartella conversione/ con pipeline.py che sostituisce
step-0+1+2+3+4 in un singolo comando senza operazioni manuali.
Usa opendataloader-pdf (algoritmo XY-Cut++ per ordine di lettura).

Trasformazioni strutturali:
- accenti backtick da PDF LaTeX (es. `e→è, puo`→può)
- rimozione dot-leader TOC e numerali romani pagina (i, ii, iii)
- normalizzazione header a gerarchia uniforme h1/h2/h3
- conversione sezioni numerate e aforismi → header ###
- rilevamento sezioni Esercizi → disabilita conversione numerazioni
- watermark URL rimossi, header vuoti scartati
2026-04-16 15:27:53 +02:00

691 lines
27 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
conversion/pipeline.py — PDF → clean Markdown (pipeline automatica)
Sostituisce step-0 + step-1 + step-2 + step-3 + step-4 in un solo comando,
senza operazioni manuali.
Usa opendataloader-pdf (algoritmo XY-Cut++ per ordine di lettura corretto,
testo fluente, struttura preservata) al posto di pymupdf4llm.
Output (compatibile con step-5+):
conversion/<stem>/raw.md — output grezzo opendataloader (immutabile)
conversion/<stem>/clean.md — MD pulito e strutturato
conversion/<stem>/structure_profile.json
Uso:
python conversion/pipeline.py --stem <nome>
python conversion/pipeline.py # tutti i PDF in sources/
python conversion/pipeline.py --stem <nome> --force # forza riesecuzione
Prerequisiti:
pip install opendataloader-pdf
Java 11+ sul PATH (https://adoptium.net/)
"""
import argparse
import json
import re
import subprocess
import sys
import tempfile
from pathlib import Path
# ─── Verifica dipendenze ──────────────────────────────────────────────────────
def _check_deps() -> None:
try:
import opendataloader_pdf # noqa: F401
except ImportError:
print("Errore: opendataloader-pdf non installato.")
print(" pip install opendataloader-pdf")
sys.exit(1)
try:
result = subprocess.run(
["java", "-version"],
capture_output=True, text=True,
)
if result.returncode != 0:
raise FileNotFoundError
except FileNotFoundError:
print("Errore: Java 11+ non trovato sul PATH.")
print(" Installa da https://adoptium.net/")
sys.exit(1)
# ─── [1] Validazione PDF (step-0 + step-1) ────────────────────────────────────
def check_pdf(pdf_path: Path) -> tuple[bool, str]:
"""
Validazione rapida: esistenza, leggibilità, testo estraibile.
Restituisce (ok, messaggio).
"""
if not pdf_path.exists():
return False, f"File non trovato: {pdf_path}"
if pdf_path.suffix.lower() != ".pdf":
return False, f"Non è un PDF: {pdf_path.name}"
if pdf_path.stat().st_size == 0:
return False, "File vuoto"
try:
import pdfplumber
with pdfplumber.open(pdf_path) as pdf:
n_pages = len(pdf.pages)
if n_pages == 0:
return False, "PDF senza pagine"
sample = min(5, n_pages)
pages_with_text = sum(
1 for i in range(sample)
if len((pdf.pages[i].extract_text() or "").strip()) > 50
)
if pages_with_text == 0:
return False, (
f"Nessun testo nelle prime {sample} pagine "
f"— probabilmente scansionato (usa modalità hybrid)"
)
return True, f"{n_pages} pagine, testo digitale confermato"
except Exception as e:
msg = str(e).lower()
if "password" in msg or "encrypted" in msg:
return False, "PDF protetto da password"
return False, f"Impossibile aprire: {e}"
# ─── [2] Conversione PDF → Markdown (step-2) ─────────────────────────────────
def convert_pdf(pdf_path: Path, out_dir: Path) -> Path:
"""
Converte il PDF in Markdown tramite opendataloader-pdf.
Scrive il file nella out_dir e restituisce il percorso.
Parametri scelti per output RAG-ottimale:
- keep_line_breaks=False → testo fluente, no hard-wrap PDF
- reading_order="xycut" → corregge ordine multi-colonna (XY-Cut++)
- sanitize=False → preserva il testo originale (no anonimizzazione PII)
"""
import opendataloader_pdf
out_dir.mkdir(parents=True, exist_ok=True)
opendataloader_pdf.convert(
input_path=str(pdf_path),
output_dir=str(out_dir),
format="markdown",
keep_line_breaks=False,
reading_order="xycut",
sanitize=False,
image_output="off", # nessuna immagine estratta né referenziata
quiet=True, # sopprime i log Java
)
# Il file output si chiama <stem>.md
md_file = out_dir / f"{pdf_path.stem}.md"
if not md_file.exists():
candidates = list(out_dir.glob("*.md"))
if not candidates:
raise RuntimeError(f"Nessun file .md prodotto in {out_dir}")
md_file = candidates[0]
return md_file
# ─── [3] Pulizia strutturale (step-4 / revise.py) ────────────────────────────
#
# Logica identica a step-4/revise.py — mantenuta sincronizzata.
_TOC_KEYWORDS = frozenset([
"indice", "index", "contents", "table of contents",
"sommario", "inhaltsverzeichnis", "inhalt",
])
_ORDINALS_IT = {
"PRIMO": "I", "SECONDO": "II", "TERZO": "III", "QUARTO": "IV",
"QUINTO": "V", "SESTO": "VI", "SETTIMO": "VII", "OTTAVO": "VIII",
"NONO": "IX", "DECIMO": "X",
}
_ORDINALS_EN = {
"ONE": "1", "TWO": "2", "THREE": "3", "FOUR": "4", "FIVE": "5",
"SIX": "6", "SEVEN": "7", "EIGHT": "8", "NINE": "9", "TEN": "10",
}
def _sentence_case(s: str) -> str:
if not s:
return s
lower = s.lower()
return lower[0].upper() + lower[1:]
def _is_allcaps_line(line: str) -> bool:
stripped = line.strip()
letters = [c for c in stripped if c.isalpha()]
return (
len(letters) >= 3
and all(c.isupper() for c in letters)
and not stripped.startswith("#")
)
def _allcaps_to_header(raw_line: str) -> str:
text = raw_line.strip().rstrip(".").rstrip("?").strip()
_ORD_IT_PAT = "|".join(_ORDINALS_IT.keys())
m = re.match(rf"^CAPITOLO ({_ORD_IT_PAT})\. (.+)", text)
if m:
roman = _ORDINALS_IT[m.group(1)]
titolo = m.group(2).rstrip(".").rstrip("?").strip()
return f"## Capitolo {roman}{_sentence_case(titolo)}"
_ORD_EN_PAT = "|".join(_ORDINALS_EN.keys())
m = re.match(rf"^CHAPTER ({_ORD_EN_PAT}|\d+)\.? (.+)", text)
if m:
n = _ORDINALS_EN.get(m.group(1), m.group(1))
titolo = m.group(2).rstrip(".").rstrip("?").strip()
return f"## Chapter {n}{_sentence_case(titolo)}"
m = re.match(r"^([IVXLCDM]+|[0-9]+)\. (.+)", text)
if m:
return f"## {m.group(1)}. {_sentence_case(m.group(2).rstrip('.').strip())}"
return f"## {_sentence_case(text)}"
def apply_transforms(text: str) -> tuple[str, dict]:
"""
Applica le trasformazioni strutturali al Markdown grezzo.
Restituisce (testo_modificato, statistiche).
"""
stats = {
"toc_rimosso": False,
"n_immagini_rimosse": 0,
"n_accenti_corretti": 0,
"n_dotleader_rimossi": 0,
"n_header_concat_fixati": 0,
"n_header_allcaps": 0,
"n_sezioni_numerate": 0,
"n_paragrafi_uniti": 0,
}
# 0. Rimuovi riferimenti immagini (artefatti opendataloader-pdf)
stats["n_immagini_rimosse"] = len(re.findall(r"!\[[^\]]*\]\([^)]*\)", text))
text = re.sub(r"!\[[^\]]*\]\([^)]*\)\s*", "", text)
# 0a. Fix artefatti backtick da PDF LaTeX: `e→è, e`→è, sar`a→sarà, ecc.
# I PDF prodotti da LaTeX estraggono gli accenti gravi come backtick separati
# dalla vocale accentata. Esempi: "`e" → "è", "puo`" → "può", "sar`a" → "sarà"
_ACCENT_MAP = {
"e": "è", "E": "È", "a": "à", "A": "À",
"u": "ù", "U": "Ù", "i": "ì", "I": "Ì", "o": "ò", "O": "Ò",
}
n_bt_before = text.count("`")
text = re.sub(r"`([eEaAuUiIoO])", lambda m: _ACCENT_MAP[m.group(1)], text)
text = re.sub(r"([eEaAuUiIoO])`", lambda m: _ACCENT_MAP[m.group(1)], text)
stats["n_accenti_corretti"] = n_bt_before - text.count("`")
# 0b_pre. Rimuovi righe con dot-leader (voci di indice/sommario)
# Esempi: "- 1.1 Alfabeto greco . . . . . . 1", "3.4 Continuità . . . . 205"
# Pattern: almeno 3 occorrenze di ". " consecutive nella riga
stats["n_dotleader_rimossi"] = len(
re.findall(r"^[^\n]*(?:\. ){3,}[^\n]*$", text, re.MULTILINE)
)
text = re.sub(r"^[^\n]*(?:\. ){3,}[^\n]*$", "", text, flags=re.MULTILINE)
# 0b_pre2. Rimuovi righe che sono solo numerali romani (indicatori di pagina TOC)
# Esempi: "i", "ii", "iii", "iv", "v" su riga isolata (footer pagine indice LaTeX)
# Questi impedirebbero al transform 9 di rimuovere le entry TOC rimaste senza corpo.
text = re.sub(
r"(?m)^(i{1,3}|iv|vi{0,3}|ix|xi{0,2}|x)$",
"",
text,
flags=re.IGNORECASE,
)
# Flag documento: rilevamento sezioni esercizi (es. libri di testo accademici)
# Usato per disabilitare transform 4b che convertirebbe i numeri degli esercizi in header.
_has_exercise_sections = bool(re.search(r"\bEsercizi\b", text, re.IGNORECASE))
# 0b. Fix header + body concatenati senza separatore
# "##### 11 TitoloCorpodel testo..." → "##### 11 Titolo\n\nCorpo del testo..."
def _fix_header_concat(m: re.Match) -> str:
hashes = m.group(1)
full = m.group(2).strip()
if len(full) < 60:
return m.group(0)
# Cerca split: lettera minuscola (incluse accentate) seguita da maiuscola
# Salta i primi ~10 char per non spezzare il numero della sezione
skip = min(10, len(full) // 3)
split = re.search(r"(?<=[a-zàèéìíòóùúä])(?=[A-ZÀÈÉÌÍÒÓÙÚ])", full[skip:])
if split:
pos = skip + split.start()
title = full[:pos].strip()
body = full[pos:].strip()
if len(title) >= 5 and len(body) >= 15:
stats["n_header_concat_fixati"] += 1
return f"{hashes} {title}\n\n{body}"
return m.group(0)
text = re.sub(r"^(#{2,6})\s+(.{40,})$", _fix_header_concat, text, flags=re.MULTILINE)
# 0c. Estrai "Capitolo N: TITOLO" inline nel corpo del testo → ## header separato
# "Capitolo 3: IL TITOLO DEL CAPITOLO - 16 Primo..." → "## Capitolo 3: ..."
# "Capitolo 1 : TITOLO CAPITOLO" → "## Capitolo 1: ..."
def _extract_capitolo(m: re.Match) -> str:
num = m.group(1)
titolo = _sentence_case(m.group(2).strip().rstrip("- ").strip())
return f"\n\n## Capitolo {num}: {titolo}\n\n"
text = re.sub(
r"\bCapitolo\s+(\d+)\s*[:\s]\s*([A-ZÀÈÉÌÍÒÓÙÚ\'L][A-ZÀÈÉÌÍÒÓÙÚ\s\'\.,\(\)]{5,80}?)"
r"(?=\s*[-]\s*\d|\s*\n|\s*$)",
_extract_capitolo,
text,
)
# 0d. Normalizza header di sezione a livello uniforme ###
# "#### N Titolo" → "### N. Titolo" (numerati: aggiunge punto)
# "#### B) Titolo" → "### B) Titolo" (lettera: solo cambio livello)
# "#### " → rimosso (vuoti)
text = re.sub(
r"^#{3,6}\s*$",
"",
text,
flags=re.MULTILINE,
)
text = re.sub(
r"^(#{3,6})\s+(\d{1,3})\s+(.+)$",
lambda m: f"### {m.group(2)}. {m.group(3)}",
text,
flags=re.MULTILINE,
)
text = re.sub(
r"^#{4,6}\s+(.+)$",
r"### \1",
text,
flags=re.MULTILINE,
)
# 1. Rimuovi **bold** negli header esistenti: ## **Titolo** → ## Titolo
text = re.sub(
r"^(#{1,6})\s+\*\*(.+?)\*\*\s*$",
r"\1 \2",
text, flags=re.MULTILINE,
)
# 1b. Normalizza header ALL-CAPS → sentence-case
def _norm_allcaps_header(m: re.Match) -> str:
hashes, content = m.group(1), m.group(2).strip()
letters = [c for c in content if c.isalpha()]
if letters and all(c.isupper() for c in letters):
return f"{hashes} {_sentence_case(content)}"
return m.group(0)
text = re.sub(r"^(#{1,6}) (.+)$", _norm_allcaps_header, text, flags=re.MULTILINE)
# 2. Rimuovi righe TOC: header "# Indice", "# Contents", ecc.
# Rimuove la riga stessa; le voci subordinate (dot-leader) sono già rimosse da 0b_pre.
# L'header rimasto senza corpo viene poi eliminato dal transform 9.
lines = text.split("\n")
new_lines = []
for line in lines:
# Stripping del prefisso markdown (##, #, ecc.) prima del confronto keyword
bare = re.sub(r"^#+\s*", "", line.strip())
first_word = bare.split(".")[0].strip().lower()
if first_word in _TOC_KEYWORDS:
stats["toc_rimosso"] = True
else:
new_lines.append(line)
text = "\n".join(new_lines)
# 3. Converti righe ALL-CAPS standalone → ## header
blocks = text.split("\n\n")
new_blocks = []
for block in blocks:
stripped = block.strip()
if "\n" not in stripped and _is_allcaps_line(stripped):
new_blocks.append(_allcaps_to_header(stripped))
stats["n_header_allcaps"] += 1
else:
sub_lines = block.split("\n")
converted = []
for ln in sub_lines:
if _is_allcaps_line(ln) and len(ln.strip()) > 3:
converted.append(_allcaps_to_header(ln))
stats["n_header_allcaps"] += 1
else:
converted.append(ln)
new_blocks.append("\n".join(converted))
text = "\n\n".join(new_blocks)
# 4. Converti sezioni numerate "N. testo" → "### N.\n\ntesto"
# Guarda che il testo non sia una frase completa (es. esercizi numerati):
# se termina con "." ed è più lungo di 40 caratteri, è probabilmente una frase,
# non un titolo di sezione → lascia invariato.
def _num_repl(m: re.Match) -> str:
content = m.group(2).strip()
if content.endswith(".") and len(content) > 40:
return m.group(0)
stats["n_sezioni_numerate"] += 1
return f"### {m.group(1)}.\n\n{content}"
text = re.sub(r"^(\d+)\.\s+(.+)$", _num_repl, text, flags=re.MULTILINE)
def _num_letter_repl(m: re.Match) -> str:
stats["n_sezioni_numerate"] += 1
return f"### {m.group(1)}{m.group(2)}.\n\n{m.group(3).strip()}"
text = re.sub(r"^(\d+)\s*([a-z])\.\s+(.+)$", _num_letter_repl, text, flags=re.MULTILINE)
# 4b. Converti "- N. testo" sezioni con punto → "### N.\n\ntesto"
# "- 1. Testo del primo punto..." → "### 1.\n\nTesto del primo punto..."
# Deve precedere 4c: "- N." ha il punto, "- N testo" no.
# Disabilitato se il documento contiene sezioni "Esercizi": in quel caso i
# "- N. testo" sono numerazioni di esercizi, non header di sezione.
if not _has_exercise_sections:
def _aphorism_repl(m: re.Match) -> str:
stats["n_sezioni_numerate"] += 1
return f"\n\n### {m.group(1)}.\n\n{m.group(2).strip()}"
text = re.sub(
r"^-\s+(\d{1,3})\.\s+(.{10,})$",
_aphorism_repl,
text,
flags=re.MULTILINE,
)
# 4c. Converti "- N testo" list item numerati → "### N.\n\ntesto"
# "- 12 Titolo sezione Corpo della sezione..." → "### 12. Titolo sezione\n\nCorpo..."
# Non tocca "- a) testo", "- 1) testo" (già gestiti come liste)
def _list_section_repl(m: re.Match) -> str:
num = m.group(1)
content = m.group(2).strip()
stats["n_sezioni_numerate"] += 1
# Separa titolo da corpo: il titolo finisce dove una lettera minuscola
# è seguita da spazio e maiuscola (confine fine-titolo / inizio-corpo)
split = re.search(r"(?<=[a-zàèéìíòóùú])\s+(?=[A-ZÀÈÉÌÍÒÓÙÚ])", content)
if split and split.start() >= 3:
title = content[: split.start()].strip()
body = content[split.end() :].strip()
if len(body) >= 20:
return f"\n\n### {num}. {title}\n\n{body}"
# Nessun body inline: il content è solo il titolo
return f"\n\n### {num}. {content}"
text = re.sub(
r"^-\s+(\d{1,3})\s+([A-ZÀÈÉÌÍÒÓÙÚ\'L].{10,})$",
_list_section_repl,
text,
flags=re.MULTILINE,
)
# 5. Unisci paragrafi spezzati da salti pagina PDF
_SENTENCE_END = set(".?!»)\"'")
blocks = text.split("\n\n")
merged = []
i = 0
while i < len(blocks):
b = blocks[i]
stripped = b.strip()
while (
i + 1 < len(blocks)
and stripped
and not stripped.startswith("#")
and stripped[-1] not in _SENTENCE_END
):
nxt = blocks[i + 1].strip()
if not nxt or nxt.startswith("#") or re.match(r"^\d+\.", nxt):
break
b = stripped + " " + nxt
stripped = b.strip()
stats["n_paragrafi_uniti"] += 1
i += 1
merged.append(b)
i += 1
text = "\n\n".join(merged)
# 6. Normalizza whitespace multiplo interno alle righe
lines = text.split("\n")
text = "\n".join(
re.sub(r" +", " ", line) if line.strip() else line
for line in lines
)
# 7. Riduci righe vuote multiple a doppie
text = re.sub(r"\n{3,}", "\n\n", text)
# 8. Rimuovi righe che sono solo URL (watermark, footer di piattaforme)
text = re.sub(r"(?m)^(https?://|www\.)\S+\s*$", "", text)
# 9. Rimuovi header senza corpo: header seguito solo da righe vuote e poi
# da un altro header o dalla fine del testo (sezioni vuote / watermark)
blocks = re.split(r"\n{2,}", text)
cleaned = []
for i, block in enumerate(blocks):
stripped = block.strip()
if re.match(r"^#{1,6} ", stripped) and "\n" not in stripped:
next_stripped = blocks[i + 1].strip() if i + 1 < len(blocks) else ""
if not next_stripped or re.match(r"^#{1,6} ", next_stripped):
continue # header senza corpo → scarta
cleaned.append(block)
text = re.sub(r"\n{3,}", "\n\n", "\n\n".join(cleaned))
return text, stats
# ─── [4] Rilevamento struttura (step-3 / detect_structure.py) ────────────────
#
# Logica identica a step-3/detect_structure.py — mantenuta sincronizzata.
_IT_WORDS = frozenset([
"il", "la", "di", "e", "che", "non", "per", "un", "una", "si",
"con", "da", "del", "della", "dei", "in", "ma", "se", "lo", "le",
"gli", "al", "alla", "ai", "alle", "sono", "ha", "hanno", "era",
"erano", "nel", "nella", "nei", "nelle", "questo", "questa", "così",
])
_EN_WORDS = frozenset([
"the", "of", "and", "to", "in", "is", "that", "it", "was", "for",
"on", "are", "as", "with", "his", "they", "at", "be", "this", "have",
"from", "or", "an", "but", "not", "by", "he", "she", "we", "you",
"which", "their", "been", "has", "would", "there", "when", "will",
])
def _detect_language(text: str) -> str:
words = re.findall(r"\b[a-zA-Z]{2,}\b", text.lower())
sample = words[:2000]
it = sum(1 for w in sample if w in _IT_WORDS)
en = sum(1 for w in sample if w in _EN_WORDS)
if it == 0 and en == 0:
return "unknown"
return "it" if it >= en else "en"
def _count_headers(text: str, level: int) -> int:
prefix = "#" * level + " "
return len(re.findall(rf"(?m)^{re.escape(prefix)}", text))
def _count_paragraphs(text: str) -> int:
blocks = re.split(r"\n{2,}", text)
return sum(1 for b in blocks if b.strip() and not re.match(r"^#+\s", b.strip()))
def _split_sections(text: str, level: int) -> list[str]:
prefix = "#" * level + " "
parts = re.split(rf"(?m)^{re.escape(prefix)}.+", text)
return [p for p in parts[1:] if p.strip()]
def analyze(md_path: Path) -> dict:
text = md_path.read_text(encoding="utf-8")
n_h1 = _count_headers(text, 1)
n_h2 = _count_headers(text, 2)
n_h3 = _count_headers(text, 3)
n_paragrafi = _count_paragraphs(text)
if n_h3 >= 5:
livello, boundary, strategia = 3, "h3", "h3_aware"
section_bodies = _split_sections(text, 3)
elif n_h2 >= 3:
livello, boundary, strategia = 2, "h2", "h2_paragraph_split"
section_bodies = _split_sections(text, 2)
elif n_h1 + n_h2 + n_h3 >= 1:
livello, boundary, strategia = 1, "paragrafo", "paragraph"
section_bodies = [b for b in re.split(r"\n{2,}", text) if b.strip()]
elif n_paragrafi >= 3:
livello, boundary, strategia = 1, "paragrafo", "paragraph"
section_bodies = [b for b in re.split(r"\n{2,}", text) if b.strip()]
else:
livello, boundary, strategia = 0, "nessuno", "sliding_window"
section_bodies = [text] if text.strip() else []
lengths = [len(b) for b in section_bodies if b.strip()]
lunghezza_media = int(sum(lengths) / len(lengths)) if lengths else 0
lingua = _detect_language(text)
avvertenze = []
short = sum(1 for l in lengths if l < 200)
long_ = sum(1 for l in lengths if l > 800)
if short:
avvertenze.append(f"{short} sezioni sotto i 200 caratteri — verranno accorpate")
if long_:
avvertenze.append(f"{long_} sezioni sopra i 800 caratteri — verranno divise")
return {
"livello_struttura": livello,
"n_h1": n_h1,
"n_h2": n_h2,
"n_h3": n_h3,
"n_paragrafi": n_paragrafi,
"boundary_primario": boundary,
"lingua_rilevata": lingua,
"lunghezza_media_sezione": lunghezza_media,
"strategia_chunking": strategia,
"avvertenze": avvertenze,
}
# ─── Pipeline principale ──────────────────────────────────────────────────────
def run(stem: str, project_root: Path, force: bool) -> bool:
pdf_path = project_root / "sources" / f"{stem}.pdf"
out_dir = project_root / "conversion" / stem
raw_out = out_dir / "raw.md"
clean_out = out_dir / "clean.md"
profile_out = out_dir / "structure_profile.json"
print(f"\n{'' * 52}")
print(f" {stem}")
print(f"{'' * 52}")
if clean_out.exists() and not force:
print(f" ⚠️ conversion/{stem}/clean.md già presente — skip")
print(f" (usa --force per rieseguire)")
return True
# ── [1] Validazione ────────────────────────────────────────────────────
print(" [1/4] Validazione PDF...")
ok, msg = check_pdf(pdf_path)
if not ok:
print(f"{msg}")
return False
print(f"{msg}")
# ── [2] Conversione ────────────────────────────────────────────────────
print(" [2/4] Conversione PDF → Markdown (opendataloader-pdf)...")
with tempfile.TemporaryDirectory() as tmp:
try:
md_file = convert_pdf(pdf_path, Path(tmp))
except Exception as e:
print(f" ✗ Conversione fallita: {e}")
return False
raw_text = md_file.read_text(encoding="utf-8")
size_kb = len(raw_text.encode()) // 1024
n_lines = raw_text.count("\n")
print(f" ✅ Markdown grezzo: {size_kb} KB, {n_lines} righe")
# ── [3] Pulizia strutturale ────────────────────────────────────────────
print(" [3/4] Pulizia strutturale...")
clean_text, t_stats = apply_transforms(raw_text)
reduction = 100 * (1 - len(clean_text) / len(raw_text)) if raw_text else 0
print(f" ✅ Immagini rimosse: {t_stats['n_immagini_rimosse']}")
print(f" Accenti corretti: {t_stats['n_accenti_corretti']}")
print(f" Dot-leader rimossi: {t_stats['n_dotleader_rimossi']}")
print(f" Header concat fixati: {t_stats['n_header_concat_fixati']}")
print(f" TOC rimosso: {'' if t_stats['toc_rimosso'] else 'no'}")
print(f" ALL-CAPS → ##: {t_stats['n_header_allcaps']}")
print(f" Sezioni → ###: {t_stats['n_sezioni_numerate']}")
print(f" Paragrafi uniti: {t_stats['n_paragrafi_uniti']}")
print(f" Riduzione testo: {reduction:.0f}%")
# ── [4] Profilo strutturale ────────────────────────────────────────────
print(" [4/4] Analisi struttura...")
out_dir.mkdir(parents=True, exist_ok=True)
raw_out.write_text(raw_text, encoding="utf-8")
clean_out.write_text(clean_text, encoding="utf-8")
profile = analyze(clean_out)
profile_out.write_text(json.dumps(profile, ensure_ascii=False, indent=2), encoding="utf-8")
_LIVELLO_DESC = {3: "ricca (h3)", 2: "parziale (h2)", 1: "paragrafi", 0: "testo piatto"}
print(f" ✅ Struttura: livello {profile['livello_struttura']}{_LIVELLO_DESC[profile['livello_struttura']]}")
print(f" h1={profile['n_h1']} h2={profile['n_h2']} h3={profile['n_h3']} "
f"paragrafi={profile['n_paragrafi']}")
print(f" Strategia chunking: {profile['strategia_chunking']}")
print(f" Lingua rilevata: {profile['lingua_rilevata']}")
for w in profile["avvertenze"]:
print(f" ⚠️ {w}")
print(f"\n Output:")
print(f" conversion/{stem}/raw.md (immutabile)")
print(f" conversion/{stem}/clean.md")
print(f" conversion/{stem}/structure_profile.json")
print(f"\n Prossimo passo: python step-5/chunker.py --stem {stem}")
return True
# ─── Entry point ─────────────────────────────────────────────────────────────
if __name__ == "__main__":
project_root = Path(__file__).parent.parent
parser = argparse.ArgumentParser(
description="Pipeline PDF → clean Markdown (sostituisce step 0+1+2+3+4)",
epilog=(
"Output compatibile con step-5+.\n"
"Prerequisiti: pip install opendataloader-pdf + Java 11+ sul PATH"
),
)
parser.add_argument(
"--stem",
help="Nome del documento (PDF in sources/<stem>.pdf). "
"Se omesso, elabora tutti i PDF in sources/.",
)
parser.add_argument(
"--force",
action="store_true",
help="Riesegui anche se clean.md è già presente",
)
args = parser.parse_args()
_check_deps()
if args.stem:
stems = [args.stem]
else:
sources_dir = project_root / "sources"
if not sources_dir.exists():
print("Errore: cartella sources/ non trovata")
sys.exit(1)
stems = sorted(p.stem for p in sources_dir.glob("*.pdf"))
if not stems:
print("Errore: nessun PDF trovato in sources/")
sys.exit(1)
results = [run(s, project_root, args.force) for s in stems]
ok = sum(results)
total = len(results)
print(f"\n{'' if all(results) else '⚠️ '} {ok}/{total} documenti convertiti")
sys.exit(0 if all(results) else 1)