Files
rag-from-scratch/conversione/pipeline.py
T
davide ea721774da feat(pipeline): 10 nuovi transform e metriche residui estese
- 0_br: rimozione tag <br> residui da tabelle PDF
- 0_tabsep: rimozione separatori | | e |---| (doppio pass pre/post merge)
- 0a2: correzione encoding " → × (moltiplicazione, solo digit-before)
- 0a3: correzione encoding ! → µ prima di unità SI
- 0a4: rimozione label formule inline [N.M]
- 9c: filtro garbage headers — simboli puri, abbreviazioni brevi, prefisso ...
- 9d: rimozione sezioni frontmatter (URL, email, copyright, affiliazione)
- build_report: tracking esteso br_inline, simboli_encoding, formule_inline
2026-04-17 09:19:53 +02:00

1108 lines
44 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
conversione/pipeline.py — PDF → clean Markdown (pipeline automatica)
Converte un PDF grezzo in Markdown strutturato e pulito, pronto per la
suddivisione in chunk. Gestisce validazione, estrazione testo, pulizia
strutturale e rilevamento automatico della struttura del documento.
Usa opendataloader-pdf (algoritmo XY-Cut++ per ordine di lettura corretto,
testo fluente, struttura preservata).
Output per ciascuno stem:
conversione/<stem>/raw.md — Markdown grezzo (immutabile)
conversione/<stem>/clean.md — Markdown pulito e strutturato
conversione/<stem>/structure_profile.json
Uso:
python conversione/pipeline.py --stem <nome>
python conversione/pipeline.py # tutti i PDF in sources/
python conversione/pipeline.py --stem <nome> --force # forza riesecuzione
Prerequisiti:
pip install opendataloader-pdf
Java 11+ sul PATH (https://adoptium.net/)
"""
import argparse
import json
import re
import subprocess
import sys
import tempfile
from datetime import datetime
from pathlib import Path
# ─── Verifica dipendenze ──────────────────────────────────────────────────────
def _check_deps() -> None:
try:
import opendataloader_pdf # noqa: F401
except ImportError:
print("Errore: opendataloader-pdf non installato.")
print(" pip install opendataloader-pdf")
sys.exit(1)
try:
result = subprocess.run(
["java", "-version"],
capture_output=True, text=True,
)
if result.returncode != 0:
raise FileNotFoundError
except FileNotFoundError:
print("Errore: Java 11+ non trovato sul PATH.")
print(" Installa da https://adoptium.net/")
sys.exit(1)
# ─── [1] Validazione PDF ─────────────────────────────────────────────────────
def check_pdf(pdf_path: Path) -> tuple[bool, str]:
"""
Validazione rapida: esistenza, leggibilità, testo estraibile.
Restituisce (ok, messaggio).
"""
if not pdf_path.exists():
return False, f"File non trovato: {pdf_path}"
if pdf_path.suffix.lower() != ".pdf":
return False, f"Non è un PDF: {pdf_path.name}"
if pdf_path.stat().st_size == 0:
return False, "File vuoto"
try:
import pdfplumber
with pdfplumber.open(pdf_path) as pdf:
n_pages = len(pdf.pages)
if n_pages == 0:
return False, "PDF senza pagine"
sample = min(5, n_pages)
pages_with_text = sum(
1 for i in range(sample)
if len((pdf.pages[i].extract_text() or "").strip()) > 50
)
if pages_with_text == 0:
return False, (
f"Nessun testo nelle prime {sample} pagine "
f"— probabilmente scansionato (usa modalità hybrid)"
)
return True, f"{n_pages} pagine, testo digitale confermato"
except Exception as e:
msg = str(e).lower()
if "password" in msg or "encrypted" in msg:
return False, "PDF protetto da password"
return False, f"Impossibile aprire: {e}"
# ─── [2] Conversione PDF → Markdown ─────────────────────────────────────────
def convert_pdf(pdf_path: Path, out_dir: Path) -> Path:
"""
Converte il PDF in Markdown tramite opendataloader-pdf.
Scrive il file nella out_dir e restituisce il percorso.
Parametri scelti per output RAG-ottimale:
- keep_line_breaks=False → testo fluente, no hard-wrap PDF
- reading_order="xycut" → corregge ordine multi-colonna (XY-Cut++)
- sanitize=False → preserva il testo originale (no anonimizzazione PII)
"""
import opendataloader_pdf
out_dir.mkdir(parents=True, exist_ok=True)
opendataloader_pdf.convert(
input_path=str(pdf_path),
output_dir=str(out_dir),
format="markdown",
keep_line_breaks=False,
reading_order="xycut",
sanitize=False,
image_output="off", # nessuna immagine estratta né referenziata
quiet=True, # sopprime i log Java
)
# Il file output si chiama <stem>.md
md_file = out_dir / f"{pdf_path.stem}.md"
if not md_file.exists():
candidates = list(out_dir.glob("*.md"))
if not candidates:
raise RuntimeError(f"Nessun file .md prodotto in {out_dir}")
md_file = candidates[0]
return md_file
# ─── [3] Pulizia strutturale ─────────────────────────────────────────────────
_TOC_KEYWORDS = frozenset([
"indice", "index", "contents", "table of contents",
"sommario", "inhaltsverzeichnis", "inhalt",
])
_ORDINALS_IT = {
"PRIMO": "I", "SECONDO": "II", "TERZO": "III", "QUARTO": "IV",
"QUINTO": "V", "SESTO": "VI", "SETTIMO": "VII", "OTTAVO": "VIII",
"NONO": "IX", "DECIMO": "X",
}
_ORDINALS_EN = {
"ONE": "1", "TWO": "2", "THREE": "3", "FOUR": "4", "FIVE": "5",
"SIX": "6", "SEVEN": "7", "EIGHT": "8", "NINE": "9", "TEN": "10",
}
def _sentence_case(s: str) -> str:
if not s:
return s
lower = s.lower()
return lower[0].upper() + lower[1:]
def _is_allcaps_line(line: str) -> bool:
stripped = line.strip()
letters = [c for c in stripped if c.isalpha()]
return (
len(letters) >= 3
and all(c.isupper() for c in letters)
and not stripped.startswith("#")
)
def _allcaps_to_header(raw_line: str) -> str:
# Rimuovi eventuale prefisso di lista "- " o "* " prima di creare l'header
text = re.sub(r"^[-*+]\s+", "", raw_line.strip())
text = text.rstrip(".").rstrip("?").strip()
_ORD_IT_PAT = "|".join(_ORDINALS_IT.keys())
m = re.match(rf"^CAPITOLO ({_ORD_IT_PAT})\. (.+)", text)
if m:
roman = _ORDINALS_IT[m.group(1)]
titolo = m.group(2).rstrip(".").rstrip("?").strip()
return f"## Capitolo {roman}{_sentence_case(titolo)}"
_ORD_EN_PAT = "|".join(_ORDINALS_EN.keys())
m = re.match(rf"^CHAPTER ({_ORD_EN_PAT}|\d+)\.? (.+)", text)
if m:
n = _ORDINALS_EN.get(m.group(1), m.group(1))
titolo = m.group(2).rstrip(".").rstrip("?").strip()
return f"## Chapter {n}{_sentence_case(titolo)}"
m = re.match(r"^([IVXLCDM]+|[0-9]+)\. (.+)", text)
if m:
return f"## {m.group(1)}. {_sentence_case(m.group(2).rstrip('.').strip())}"
return f"## {_sentence_case(text)}"
def _extract_math_environments(text: str) -> tuple[str, int]:
"""
Converte paragrafi che iniziano con ambienti matematici in header ###.
'Teorema 1.6.3 (principio di induzione) Sia A ⊆ N...'
'### Teorema 1.6.3 (principio di induzione)\n\nSia A ⊆ N...'
Riconosce: Definizione, Teorema, Lemma, Proposizione, Corollario,
Osservazione, Nota, Esempio (solo con numero di sezione).
Non tocca paragrafi che già iniziano con un header Markdown.
Deve girare PRIMA del merge paragrafi (step 5) per sfruttare i blocchi intatti.
"""
_ENVS = (
r"Definizione|Teorema|Lemma|Proposizione|"
r"Corollario|Osservazione|Nota|Esempio"
)
count = 0
blocks = text.split("\n\n")
result = []
for block in blocks:
stripped = block.strip()
if not stripped or stripped.startswith("#"):
result.append(block)
continue
m = re.match(
rf"^({_ENVS})\s+((?:\d+\.?){{1,4}})\s*(.*)",
stripped,
re.DOTALL,
)
if not m:
result.append(block)
continue
env = m.group(1)
num = m.group(2).rstrip(".")
rest = m.group(3).strip()
# Titolo opzionale tra parentesi: "(principio di induzione)"
title_m = re.match(r"^(\([^)]{2,60}\))\s+(.*)", rest, re.DOTALL)
if title_m:
header = f"### {env} {num} {title_m.group(1)}"
body = title_m.group(2).strip()
else:
header = f"### {env} {num}."
body = rest
result.append(f"{header}\n\n{body}" if body else header)
count += 1
return "\n\n".join(result), count
def _merge_title_headers(text: str) -> tuple[str, int]:
"""
Fonde header numerici isolati con il sottotitolo breve che li segue.
'### N.\n\nSottotitolo (riga singola ≤ 80 char, senza punto finale)'
'### N. Sottotitolo'
Caso tipico: parti di un'opera (es. Nietzsche) dove il numero di sezione
e il titolo della sezione sono in blocchi Markdown separati.
Non tocca header con titolo già inline né header seguiti da testo lungo.
"""
count = 0
blocks = re.split(r"\n{2,}", text)
result = []
i = 0
while i < len(blocks):
block = blocks[i]
stripped = block.strip()
if (
re.match(r"^#{2,3} \d+\.\s*$", stripped)
and i + 1 < len(blocks)
):
nxt = blocks[i + 1].strip()
# Sottotitolo valido: riga singola, ≤ 80 char, non header, non numerazione pura
if (
nxt
and "\n" not in nxt
and len(nxt) <= 80
and not nxt.startswith("#")
and not re.match(r"^\d+[\.\)]\s", nxt)
):
result.append(stripped.rstrip() + " " + nxt)
count += 1
i += 2
continue
result.append(block)
i += 1
return re.sub(r"\n{3,}", "\n\n", "\n\n".join(result)), count
def _extract_article_headers(text: str) -> tuple[str, int]:
"""
Converte voci di articolo dal formato lista Markdown al formato header ###.
'- Art. N[suffix]. Titolo. Corpo testo...''### Art. N[suffix]. Titolo.\n\nCorpo testo...'
'- Art. N[suffix]. (…) (1)''### Art. N[suffix].\n\n(…) (1)'
Gestisce suffissi come: Art. 4-bis., Art. 14-ter., Art. 1-quinquies.
Il titolo è la prima frase con iniziale maiuscola che termina con '.' prima di
ulteriore testo (es. "Leggi. La formazione..." → titolo "Leggi", corpo "La formazione...").
Se il testo non ha titolo separabile, tutto diventa il corpo.
"""
count = 0
def _repl(m: re.Match) -> str:
nonlocal count
num = m.group(1)
rest = m.group(2).strip()
# Titolo: frase con iniziale maiuscola, max 75 char, termina con '.',
# seguita da almeno un'altra frase (minimo 5 char) che inizia con maiuscola
# o con '(' / cifra (note a piè o continuazione corpo).
title_m = re.match(
r"^([A-ZÀÈÉÌÍÒÓÙÚ].{1,74}?)\.\s+([A-ZÀÈÉÌÍÒÓÙÚ\(\d].{4,})",
rest,
)
if title_m:
count += 1
return (
f"### Art. {num}. {title_m.group(1)}.\n\n"
f"{title_m.group(2).strip()}"
)
# Nessun titolo separabile: tutto è corpo
if rest:
count += 1
return f"### Art. {num}.\n\n{rest}"
# Articolo senza testo inline (es. "- Art. 5. (…) (1)" già estratto sopra,
# oppure articolo vuoto nella lista)
count += 1
return f"### Art. {num}."
text = re.sub(
r"^-\s+Art\.\s+([\d]+[a-z\-]*)\.\s*(.*)",
_repl,
text,
flags=re.MULTILINE,
)
return text, count
def apply_transforms(text: str) -> tuple[str, dict]:
"""
Applica le trasformazioni strutturali al Markdown grezzo.
Restituisce (testo_modificato, statistiche).
"""
stats = {
"toc_rimosso": False,
"n_immagini_rimosse": 0,
"n_accenti_corretti": 0,
"n_moltiplicazioni_corrette": 0,
"n_micro_corretti": 0,
"n_br_rimossi": 0,
"n_formule_rimossi": 0,
"n_garbage_headers_rimossi": 0,
"n_frontmatter_rimossi": 0,
"n_dotleader_rimossi": 0,
"n_header_concat_fixati": 0,
"n_articoli_estratti": 0,
"n_ambienti_matematici": 0,
"n_titoli_uniti": 0,
"n_header_allcaps": 0,
"n_sezioni_numerate": 0,
"n_paragrafi_uniti": 0,
"n_tabsep_rimossi": 0,
}
# 0. Rimuovi riferimenti immagini (artefatti opendataloader-pdf)
stats["n_immagini_rimosse"] = len(re.findall(r"!\[[^\]]*\]\([^)]*\)", text))
text = re.sub(r"!\[[^\]]*\]\([^)]*\)\s*", "", text)
# 0_br. Rimuovi tag <br> residui da tabelle e blocchi formula PDF
# Nelle celle di tabella produce spazio; nel testo inline elimina rumore.
stats["n_br_rimossi"] = len(re.findall(r"<br>", text, re.IGNORECASE))
text = re.sub(r"<br>\s*", " ", text, flags=re.IGNORECASE)
# 0_tabsep. Rimuovi separatori tabella PDF: "| |" (riga vuota) e "|---|" (separatore).
# Nascono da tabelle non strutturate nel PDF. Rimossi PRIMA del merge paragrafi
# (step 5) altrimenti "|---|" viene fuso con il paragrafo successivo producendo
# righe tipo "|---| Una caratterizzazione analoga...".
_pat_tabsep = re.compile(r"(?m)^\|\s*\|\s*$|^\|---\|?\s*$")
stats["n_tabsep_rimossi"] = len(_pat_tabsep.findall(text))
text = _pat_tabsep.sub("", text)
# 0a. Fix artefatti backtick da PDF LaTeX: `e→è, e`→è, sar`a→sarà, ecc.
# I PDF prodotti da LaTeX estraggono gli accenti gravi come backtick separati
# dalla vocale accentata. Esempi: "`e" → "è", "puo`" → "può", "sar`a" → "sarà"
_ACCENT_MAP = {
"e": "è", "E": "È", "a": "à", "A": "À",
"u": "ù", "U": "Ù", "i": "ì", "I": "Ì", "o": "ò", "O": "Ò",
}
n_bt_before = text.count("`")
text = re.sub(r"`([eEaAuUiIoO])", lambda m: _ACCENT_MAP[m.group(1)], text)
text = re.sub(r"([eEaAuUiIoO])`", lambda m: _ACCENT_MAP[m.group(1)], text)
stats["n_accenti_corretti"] = n_bt_before - text.count("`")
# Backtick orfani: artefatti LaTeX rimasti dopo la correzione vocale
# (es. "propriet`" da "proprietà", "continuit`" da "continuità").
# In testi PDF non esistono backtick legittimi → rimozione sicura.
n_bt_orfani = text.count("`")
if n_bt_orfani:
text = re.sub(r"`", "", text)
stats["n_accenti_corretti"] += n_bt_orfani
# 0a2. Fix segno di moltiplicazione "→× (encoding font PDF non-standard)
# Esempi: 2"107 → 2×107, 2"(10-2 m)3 → 2×(10-2 m)3
# Lookbehind SOLO su cifra: evita falsi positivi tipo t1"t0 (→ limite)
# o h"hf (→ differenza) dove la lettera prima della " non indica prodotto.
_n_cross = len(re.findall(r'(?<=[0-9])"(?=[0-9(])', text))
text = re.sub(r'(?<=[0-9])"(?=[0-9(])', '×', text)
stats["n_moltiplicazioni_corrette"] = _n_cross
# 0a3. Fix prefisso micro !→µ prima di unità SI note
# "1 !m" → "1 µm", "1 !A" → "1 µA", "3 !s-1" → "3 µs-1"
# Pattern stretto: cifra + spazio opzionale + ! + lettera unità SI a scelta ristretta.
# Non tocca "4! steradianti" (spazio dopo !) né "mol!K" (non preceduto da cifra).
_SI_UNITS_RE = r'[mAsgVWFHTKNJClΩ]'
_n_micro = len(re.findall(rf'\d\s*!(?={_SI_UNITS_RE})', text))
text = re.sub(rf'(\d)\s*!({_SI_UNITS_RE})', r'\1 µ\2', text)
stats["n_micro_corretti"] = _n_micro
# 0a4. Rimuovi label formule inline [N.M] — es. [3.4], [10.7], [5.25]
# Non aggiungono valore semantico per il RAG; restano come rumore numerico.
# Preserva [N] senza punto (riferimenti bibliografici/note legittime).
n_form_before = len(re.findall(r"\[\d+\.\d+\]", text))
text = re.sub(r"\s*\[\d+\.\d+\]\s*", " ", text)
stats["n_formule_rimossi"] = n_form_before
# 0b_pre. Rimuovi righe con dot-leader (voci di indice/sommario)
# Esempi: "- 1.1 Alfabeto greco . . . . . . 1", "3.4 Continuità . . . . 205"
# Pattern: almeno 3 occorrenze di ". " consecutive nella riga
# Cattura sia ". . . ." (spazi) sia "......." (punti continui, tipici dei TOC PDF)
_DOTLEADER_RE = r"^[^\n]*(?:(?:\. ){3,}|\.{4,})[^\n]*$"
stats["n_dotleader_rimossi"] = len(
re.findall(_DOTLEADER_RE, text, re.MULTILINE)
)
text = re.sub(_DOTLEADER_RE, "", text, flags=re.MULTILINE)
# 0b_pre2. Rimuovi righe che sono solo numerali romani (indicatori di pagina TOC)
# Esempi: "i", "ii", "iii", "iv", "v" su riga isolata (footer pagine indice LaTeX)
# Questi impedirebbero al transform 9 di rimuovere le entry TOC rimaste senza corpo.
text = re.sub(
r"(?m)^(i{1,3}|iv|vi{0,3}|ix|xi{0,2}|x)$",
"",
text,
flags=re.IGNORECASE,
)
# Flag documento: rilevamento sezioni esercizi (es. libri di testo accademici)
# Usato per disabilitare transform 4b che convertirebbe i numeri degli esercizi in header.
_has_exercise_sections = bool(re.search(r"\bEsercizi\b", text, re.IGNORECASE))
# 0b. Fix header + body concatenati senza separatore
# "##### 11 TitoloCorpodel testo..." → "##### 11 Titolo\n\nCorpo del testo..."
def _fix_header_concat(m: re.Match) -> str:
hashes = m.group(1)
full = m.group(2).strip()
if len(full) < 60:
return m.group(0)
# Cerca split: lettera minuscola (incluse accentate) seguita da maiuscola
# Salta i primi ~10 char per non spezzare il numero della sezione
skip = min(10, len(full) // 3)
split = re.search(r"(?<=[a-zàèéìíòóùúä])(?=[A-ZÀÈÉÌÍÒÓÙÚ])", full[skip:])
if split:
pos = skip + split.start()
title = full[:pos].strip()
body = full[pos:].strip()
if len(title) >= 5 and len(body) >= 15:
stats["n_header_concat_fixati"] += 1
return f"{hashes} {title}\n\n{body}"
return m.group(0)
text = re.sub(r"^(#{2,6})\s+(.{40,})$", _fix_header_concat, text, flags=re.MULTILINE)
# 0c. Estrai "Capitolo N: TITOLO" inline nel corpo del testo → ## header separato
# "Capitolo 3: IL TITOLO DEL CAPITOLO - 16 Primo..." → "## Capitolo 3: ..."
# "Capitolo 1 : TITOLO CAPITOLO" → "## Capitolo 1: ..."
def _extract_capitolo(m: re.Match) -> str:
num = m.group(1)
titolo = _sentence_case(m.group(2).strip().rstrip("- ").strip())
return f"\n\n## Capitolo {num}: {titolo}\n\n"
text = re.sub(
r"\bCapitolo\s+(\d+)\s*[:\s]\s*([A-ZÀÈÉÌÍÒÓÙÚ\'L][A-ZÀÈÉÌÍÒÓÙÚ\s\'\.,\(\)]{5,80}?)"
r"(?=\s*[-]\s*\d|\s*\n|\s*$)",
_extract_capitolo,
text,
)
# 0d. Normalizza header di sezione a livello uniforme ###
# "#### N Titolo" → "### N. Titolo" (numerati: aggiunge punto)
# "#### B) Titolo" → "### B) Titolo" (lettera: solo cambio livello)
# "#### " → rimosso (vuoti)
text = re.sub(
r"^#{3,6}\s*$",
"",
text,
flags=re.MULTILINE,
)
text = re.sub(
r"^(#{3,6})\s+(\d{1,3})\s+(.+)$",
lambda m: f"### {m.group(2)}. {m.group(3)}",
text,
flags=re.MULTILINE,
)
text = re.sub(
r"^#{4,6}\s+(.+)$",
r"### \1",
text,
flags=re.MULTILINE,
)
# 0e. Converti voci articolo "- Art. N. Titolo. Corpo" → "### Art. N. Titolo.\n\nCorpo"
# Eseguito dopo la promozione h4+ → h3 (0d) per non duplicare Art. già header.
# Eseguito prima del merge paragrafi (5): il boundary ### previene la fusione.
text, n_art = _extract_article_headers(text)
stats["n_articoli_estratti"] = n_art
# 1. Rimuovi **bold** negli header esistenti: ## **Titolo** → ## Titolo
text = re.sub(
r"^(#{1,6})\s+\*\*(.+?)\*\*\s*$",
r"\1 \2",
text, flags=re.MULTILINE,
)
# 1b. Normalizza header ALL-CAPS → sentence-case
def _norm_allcaps_header(m: re.Match) -> str:
hashes, content = m.group(1), m.group(2).strip()
letters = [c for c in content if c.isalpha()]
if letters and all(c.isupper() for c in letters):
return f"{hashes} {_sentence_case(content)}"
return m.group(0)
text = re.sub(r"^(#{1,6}) (.+)$", _norm_allcaps_header, text, flags=re.MULTILINE)
# 2. Rimuovi righe TOC: header "# Indice", "# Contents", ecc.
# + le voci lista numeriche che seguono (TOC senza dot-leader, es. Nietzsche):
# "- 1. Dei pregiudizi dei filosofi" → rimossa se viene subito dopo un header TOC.
# Le voci con dot-leader sono già rimosse da 0b_pre.
# Gli header rimasti senza corpo vengono poi eliminati dal transform 9.
lines = text.split("\n")
new_lines = []
_in_toc = False
for line in lines:
bare = re.sub(r"^#+\s*", "", line.strip())
first_word = bare.split(".")[0].strip().lower()
if first_word in _TOC_KEYWORDS:
stats["toc_rimosso"] = True
_in_toc = True
continue
if _in_toc:
# Salta righe vuote e voci lista numeriche (- N. Titolo / - N Titolo)
if re.match(r"^\s*$", line) or re.match(r"^\s*[-*+]\s+\d", line):
continue
_in_toc = False
new_lines.append(line)
text = "\n".join(new_lines)
# 3. Converti righe ALL-CAPS standalone → ## header
blocks = text.split("\n\n")
new_blocks = []
for block in blocks:
stripped = block.strip()
if "\n" not in stripped and _is_allcaps_line(stripped):
new_blocks.append(_allcaps_to_header(stripped))
stats["n_header_allcaps"] += 1
else:
sub_lines = block.split("\n")
converted = []
for ln in sub_lines:
if _is_allcaps_line(ln) and len(ln.strip()) > 3:
converted.append(_allcaps_to_header(ln))
stats["n_header_allcaps"] += 1
else:
converted.append(ln)
new_blocks.append("\n".join(converted))
text = "\n\n".join(new_blocks)
# 4. Converti sezioni numerate "N. testo" → "### N.\n\ntesto"
# Guarda che il testo non sia una frase completa (es. esercizi numerati):
# se termina con "." ed è più lungo di 40 caratteri, è probabilmente una frase,
# non un titolo di sezione → lascia invariato.
def _num_repl(m: re.Match) -> str:
content = m.group(2).strip()
if content.endswith(".") and len(content) > 40:
return m.group(0)
stats["n_sezioni_numerate"] += 1
return f"### {m.group(1)}.\n\n{content}"
text = re.sub(r"^(\d+)\.\s+(.+)$", _num_repl, text, flags=re.MULTILINE)
def _num_letter_repl(m: re.Match) -> str:
stats["n_sezioni_numerate"] += 1
return f"### {m.group(1)}{m.group(2)}.\n\n{m.group(3).strip()}"
text = re.sub(r"^(\d+)\s*([a-z])\.\s+(.+)$", _num_letter_repl, text, flags=re.MULTILINE)
# 4b. Converti "- N. testo" sezioni con punto → "### N.\n\ntesto"
# "- 1. Testo del primo punto..." → "### 1.\n\nTesto del primo punto..."
# Deve precedere 4c: "- N." ha il punto, "- N testo" no.
# Disabilitato se il documento contiene sezioni "Esercizi": in quel caso i
# "- N. testo" sono numerazioni di esercizi, non header di sezione.
if not _has_exercise_sections:
def _aphorism_repl(m: re.Match) -> str:
stats["n_sezioni_numerate"] += 1
return f"\n\n### {m.group(1)}.\n\n{m.group(2).strip()}"
text = re.sub(
r"^-\s+(\d{1,3})\.\s+(.{10,})$",
_aphorism_repl,
text,
flags=re.MULTILINE,
)
# 4c. Converti "- N testo" list item numerati → "### N.\n\ntesto"
# "- 12 Titolo sezione Corpo della sezione..." → "### 12. Titolo sezione\n\nCorpo..."
# Non tocca "- a) testo", "- 1) testo" (già gestiti come liste)
def _list_section_repl(m: re.Match) -> str:
num = m.group(1)
content = m.group(2).strip()
stats["n_sezioni_numerate"] += 1
# Separa titolo da corpo: il titolo finisce dove una lettera minuscola
# è seguita da spazio e maiuscola (confine fine-titolo / inizio-corpo)
split = re.search(r"(?<=[a-zàèéìíòóùú])\s+(?=[A-ZÀÈÉÌÍÒÓÙÚ])", content)
if split and split.start() >= 3:
title = content[: split.start()].strip()
body = content[split.end() :].strip()
if len(body) >= 20:
return f"\n\n### {num}. {title}\n\n{body}"
# Nessun body inline: il content è solo il titolo
return f"\n\n### {num}. {content}"
text = re.sub(
r"^-\s+(\d{1,3})\s+([A-ZÀÈÉÌÍÒÓÙÚ\'L].{10,})$",
_list_section_repl,
text,
flags=re.MULTILINE,
)
# 4d. Converti ambienti matematici (Teorema/Definizione/...) → ### header
# Eseguito prima del merge paragrafi (5) per sfruttare i blocchi intatti.
text, n_math = _extract_math_environments(text)
stats["n_ambienti_matematici"] = n_math
# 5. Unisci paragrafi spezzati da salti pagina PDF
_SENTENCE_END = set(".?!»)\"'")
blocks = text.split("\n\n")
merged = []
i = 0
while i < len(blocks):
b = blocks[i]
stripped = b.strip()
while (
i + 1 < len(blocks)
and stripped
and not stripped.startswith("#")
and stripped[-1] not in _SENTENCE_END
):
nxt = blocks[i + 1].strip()
if not nxt or nxt.startswith("#") or re.match(r"^\d+\.", nxt):
break
b = stripped + " " + nxt
stripped = b.strip()
stats["n_paragrafi_uniti"] += 1
i += 1
merged.append(b)
i += 1
text = "\n\n".join(merged)
# Secondo pass: rimuovi prefisso |---| eventualmente rimasto dopo il merge paragrafi
text = re.sub(r"(?m)^\|---\|\s*", "", text)
# 6. Normalizza whitespace multiplo interno alle righe
lines = text.split("\n")
text = "\n".join(
re.sub(r" +", " ", line) if line.strip() else line
for line in lines
)
# 7. Riduci righe vuote multiple a doppie
text = re.sub(r"\n{3,}", "\n\n", text)
# 8. Rimuovi righe che sono solo URL (watermark, footer di piattaforme)
text = re.sub(r"(?m)^(https?://|www\.)\S+\s*$", "", text)
# 9. Rimuovi header senza corpo: header seguito solo da righe vuote e poi
# da un altro header o dalla fine del testo (sezioni vuote / watermark)
blocks = re.split(r"\n{2,}", text)
cleaned = []
for i, block in enumerate(blocks):
stripped = block.strip()
if re.match(r"^#{1,6} ", stripped) and "\n" not in stripped:
next_stripped = blocks[i + 1].strip() if i + 1 < len(blocks) else ""
if not next_stripped or re.match(r"^#{1,6} ", next_stripped):
continue # header senza corpo → scarta
cleaned.append(block)
text = re.sub(r"\n{3,}", "\n\n", "\n\n".join(cleaned))
# 9b. Fondi header numerici isolati con il sottotitolo breve successivo
# "### N.\n\nSottotitolo" → "### N. Sottotitolo" (es. parti Nietzsche)
text, n_titoli = _merge_title_headers(text)
stats["n_titoli_uniti"] = n_titoli
# 9c. Rimuovi garbage headers: header ### senza parole reali o con solo
# abbreviazioni matematiche. Esempi: "### ( vm)", "### #", "### ! =",
# "### (am)", "### 2. Il valore di hf si deter- mina risolvendo mg(h!hf)"
# Questi nascono da espressioni matematiche scambiate per titoli di sezione.
# Il corpo rimane nel testo e viene accorpato alla sezione precedente.
def _is_garbage_header(content: str) -> bool:
# Header con prefisso "..." — frammento di formula (es. "...Di", "...vi")
if content.lstrip().startswith("..."):
return True
# Nessuna sequenza alfabetica ≥ 2 char
if not re.search(r"[A-Za-zÀ-ÿ]{2,}", content):
return True
# Abbreviazione corta in parentesi opzionali: "(vm)", "( am)", "(am)"
if re.fullmatch(r"\(?\s*[A-Za-z]{1,4}\s*\)?", content.strip()):
return True
# Header molto lungo (>60ch) con artefatti formula inline
if len(content) > 60 and re.search(r"[!%#]\w|\w[!%#]|\b\w+-\s*\w", content):
return True
return False
lines = text.split("\n")
new_lines = []
for line in lines:
m = re.match(r"^#{1,6} (.+)$", line)
if m and _is_garbage_header(m.group(1)):
stats["n_garbage_headers_rimossi"] += 1
continue
new_lines.append(line)
text = "\n".join(new_lines)
text = re.sub(r"\n{3,}", "\n\n", text)
# 9d. Rimuovi sezioni frontmatter: header senza numero + corpo corto con
# URL, email, affiliazione, copyright, edizione — metadati non-contenuto.
_FM_RE = re.compile(
r"https?://|www\.|@[A-Za-z]|\bUniversit[àa]\b|\bDipartimento\b|"
r"\bCopyright\b|\bLicenza\b|\bEdizione\b|"
r"protetto da|tutti i diritti",
re.IGNORECASE,
)
blocks = re.split(r"\n{2,}", text)
cleaned = []
for i, block in enumerate(blocks):
stripped = block.strip()
if not re.match(r"^### ", stripped) or re.match(r"^### \d", stripped):
cleaned.append(block)
continue
body = blocks[i + 1].strip() if i + 1 < len(blocks) else ""
is_fm_body = len(body) < 250 and _FM_RE.search(body)
is_fm_hdr = _FM_RE.search(stripped)
if is_fm_body or is_fm_hdr:
stats["n_frontmatter_rimossi"] += 1
continue
cleaned.append(block)
text = re.sub(r"\n{3,}", "\n\n", "\n\n".join(cleaned))
return text, stats
# ─── [4] Rilevamento struttura ───────────────────────────────────────────────
_IT_WORDS = frozenset([
"il", "la", "di", "e", "che", "non", "per", "un", "una", "si",
"con", "da", "del", "della", "dei", "in", "ma", "se", "lo", "le",
"gli", "al", "alla", "ai", "alle", "sono", "ha", "hanno", "era",
"erano", "nel", "nella", "nei", "nelle", "questo", "questa", "così",
])
_EN_WORDS = frozenset([
"the", "of", "and", "to", "in", "is", "that", "it", "was", "for",
"on", "are", "as", "with", "his", "they", "at", "be", "this", "have",
"from", "or", "an", "but", "not", "by", "he", "she", "we", "you",
"which", "their", "been", "has", "would", "there", "when", "will",
])
def _detect_language(text: str) -> str:
words = re.findall(r"\b[a-zA-Z]{2,}\b", text.lower())
sample = words[:2000]
it = sum(1 for w in sample if w in _IT_WORDS)
en = sum(1 for w in sample if w in _EN_WORDS)
if it == 0 and en == 0:
return "unknown"
return "it" if it >= en else "en"
def _count_headers(text: str, level: int) -> int:
prefix = "#" * level + " "
return len(re.findall(rf"(?m)^{re.escape(prefix)}", text))
def _count_paragraphs(text: str) -> int:
blocks = re.split(r"\n{2,}", text)
return sum(1 for b in blocks if b.strip() and not re.match(r"^#+\s", b.strip()))
def _split_sections(text: str, level: int) -> list[str]:
prefix = "#" * level + " "
parts = re.split(rf"(?m)^{re.escape(prefix)}.+", text)
return [p for p in parts[1:] if p.strip()]
def analyze(md_path: Path) -> dict:
text = md_path.read_text(encoding="utf-8")
n_h1 = _count_headers(text, 1)
n_h2 = _count_headers(text, 2)
n_h3 = _count_headers(text, 3)
n_paragrafi = _count_paragraphs(text)
if n_h3 >= 5:
livello, boundary, strategia = 3, "h3", "h3_aware"
section_bodies = _split_sections(text, 3)
elif n_h2 >= 3:
livello, boundary, strategia = 2, "h2", "h2_paragraph_split"
section_bodies = _split_sections(text, 2)
elif n_h1 + n_h2 + n_h3 >= 1:
livello, boundary, strategia = 1, "paragrafo", "paragraph"
section_bodies = [b for b in re.split(r"\n{2,}", text) if b.strip()]
elif n_paragrafi >= 3:
livello, boundary, strategia = 1, "paragrafo", "paragraph"
section_bodies = [b for b in re.split(r"\n{2,}", text) if b.strip()]
else:
livello, boundary, strategia = 0, "nessuno", "sliding_window"
section_bodies = [text] if text.strip() else []
lengths = [len(b) for b in section_bodies if b.strip()]
lunghezza_media = int(sum(lengths) / len(lengths)) if lengths else 0
lingua = _detect_language(text)
avvertenze = []
short = sum(1 for l in lengths if l < 200)
long_ = sum(1 for l in lengths if l > 800)
if short:
avvertenze.append(f"{short} sezioni sotto i 200 caratteri — verranno accorpate")
if long_:
avvertenze.append(f"{long_} sezioni sopra i 800 caratteri — verranno divise")
return {
"livello_struttura": livello,
"n_h1": n_h1,
"n_h2": n_h2,
"n_h3": n_h3,
"n_paragrafi": n_paragrafi,
"boundary_primario": boundary,
"lingua_rilevata": lingua,
"lunghezza_media_sezione": lunghezza_media,
"strategia_chunking": strategia,
"avvertenze": avvertenze,
}
# ─── Report di conversione ───────────────────────────────────────────────────
def build_report(
stem: str,
out_dir: Path,
clean_text: str,
t_stats: dict,
profile: dict,
reduction: float,
) -> Path:
"""
Genera conversione/<stem>/report.json con tutte le metriche di qualità:
statistiche trasformazioni, struttura, distribuzione lunghezze, anomalie
e problemi residui. Leggibile da validate.py per la validazione batch.
"""
text_lines = clean_text.split("\n")
# ── Raccolta sezioni ### con corpo ────────────────────────────────────
sections: list[tuple[str, str]] = []
cur_hdr: str | None = None
cur_body: list[str] = []
for line in text_lines:
if re.match(r"^### ", line):
if cur_hdr is not None:
sections.append((cur_hdr, "\n".join(cur_body).strip()))
cur_hdr = line
cur_body = []
elif cur_hdr is not None:
cur_body.append(line)
if cur_hdr is not None:
sections.append((cur_hdr, "\n".join(cur_body).strip()))
lengths = [len(body) for _, body in sections]
# ── Distribuzione lunghezze ───────────────────────────────────────────
def _pct(data: list[int], p: float) -> int:
if not data:
return 0
s = sorted(data)
return s[max(0, min(len(s) - 1, int(len(s) * p)))]
distribution = {
"min": min(lengths) if lengths else 0,
"p25": _pct(lengths, 0.25),
"mediana": _pct(lengths, 0.50),
"p75": _pct(lengths, 0.75),
"max": max(lengths) if lengths else 0,
}
# ── Anomalie ──────────────────────────────────────────────────────────
# Header solo-numero senza corpo sostanziale: anomalia solo se il corpo
# è vuoto o < 30 chars. Un body lungo è una sezione numerata legittima
# (es. aforismi numerati dove il numero è l'identificatore della sezione).
bare_hdrs = [
{"header": hdr, "corpo_inizio": body[:120].replace("\n", " ")}
for hdr, body in sections
if re.match(r"^### \d+\.\s*$", hdr) and len(body.strip()) < 30
]
short_secs = [
{"header": hdr, "chars": length, "testo": body[:80].replace("\n", " ")}
for (hdr, body), length in zip(sections, lengths)
if 0 < length < 150
]
long_secs = [
{"header": hdr, "chars": length}
for (hdr, _), length in zip(sections, lengths)
if length > 1500
]
# ── Problemi residui (max 10 esempi ciascuno) ─────────────────────────
def _scan(pattern: str, max_n: int = 10) -> list[dict]:
hits = []
for i, line in enumerate(text_lines):
if re.search(pattern, line) and not re.match(r"^#+ ", line):
hits.append({"riga": i + 1, "testo": line.strip()[:120]})
if len(hits) >= max_n:
break
return hits
residui = {
"backtick": _scan(r"`"),
"dotleader": _scan(r"(?:\. ){3,}"),
"url": _scan(r"^(https?://|www\.)\S+"),
"immagini": _scan(r"!\[[^\]]*\]\([^)]*\)"),
"br_inline": _scan(r"<br>"),
"simboli_encoding":_scan(r'(?<=[0-9A-Za-z])[!"](?=[0-9A-Za-z])'),
"formule_inline": _scan(r"\[\d+\.\d+\]"),
}
# ── Composizione report ───────────────────────────────────────────────
report = {
"stem": stem,
"timestamp": datetime.now().strftime("%Y-%m-%d %H:%M"),
"transforms": {
**t_stats,
"riduzione_pct": round(reduction),
},
"structure": profile,
"distribution": distribution,
"anomalie": {
"bare_headers": len(bare_hdrs),
"short_sections": len(short_secs),
"long_sections": len(long_secs),
"bare_headers_list": bare_hdrs,
"short_sections_list": short_secs,
"long_sections_list": long_secs,
},
"residui": {
"backtick": len(residui["backtick"]),
"dotleader": len(residui["dotleader"]),
"url": len(residui["url"]),
"immagini": len(residui["immagini"]),
"br_inline": len(residui["br_inline"]),
"simboli_encoding": len(residui["simboli_encoding"]),
"formule_inline": len(residui["formule_inline"]),
"backtick_esempi": residui["backtick"],
"dotleader_esempi": residui["dotleader"],
"url_esempi": residui["url"],
"immagini_esempi": residui["immagini"],
"br_inline_esempi": residui["br_inline"],
"simboli_encoding_esempi": residui["simboli_encoding"],
"formule_inline_esempi": residui["formule_inline"],
},
}
report_path = out_dir / "report.json"
report_path.write_text(json.dumps(report, ensure_ascii=False, indent=2), encoding="utf-8")
return report_path
# ─── Pipeline principale ──────────────────────────────────────────────────────
def run(stem: str, project_root: Path, force: bool) -> bool:
pdf_path = project_root / "sources" / f"{stem}.pdf"
out_dir = project_root / "conversione" / stem
raw_out = out_dir / "raw.md"
clean_out = out_dir / "clean.md"
print(f"\n{'' * 52}")
print(f" {stem}")
print(f"{'' * 52}")
if clean_out.exists() and not force:
print(f" ⚠️ conversione/{stem}/clean.md già presente — skip")
print(f" (usa --force per rieseguire)")
return True
# ── [1] Validazione ────────────────────────────────────────────────────
print(" [1/4] Validazione PDF...")
ok, msg = check_pdf(pdf_path)
if not ok:
print(f"{msg}")
return False
print(f"{msg}")
# ── [2] Conversione ────────────────────────────────────────────────────
print(" [2/4] Conversione PDF → Markdown (opendataloader-pdf)...")
with tempfile.TemporaryDirectory() as tmp:
try:
md_file = convert_pdf(pdf_path, Path(tmp))
except Exception as e:
print(f" ✗ Conversione fallita: {e}")
return False
raw_text = md_file.read_text(encoding="utf-8")
size_kb = len(raw_text.encode()) // 1024
n_lines = raw_text.count("\n")
print(f" ✅ Markdown grezzo: {size_kb} KB, {n_lines} righe")
# ── [3] Pulizia strutturale ────────────────────────────────────────────
print(" [3/4] Pulizia strutturale...")
clean_text, t_stats = apply_transforms(raw_text)
reduction = 100 * (1 - len(clean_text) / len(raw_text)) if raw_text else 0
print(f" ✅ Immagini rimosse: {t_stats['n_immagini_rimosse']}")
print(f" Accenti corretti: {t_stats['n_accenti_corretti']}")
print(f" Dot-leader rimossi: {t_stats['n_dotleader_rimossi']}")
print(f" Header concat fixati: {t_stats['n_header_concat_fixati']}")
print(f" Articoli → ###: {t_stats['n_articoli_estratti']}")
print(f" Ambienti matematici: {t_stats['n_ambienti_matematici']}")
print(f" Titoli header uniti: {t_stats['n_titoli_uniti']}")
print(f" TOC rimosso: {'' if t_stats['toc_rimosso'] else 'no'}")
print(f" ALL-CAPS → ##: {t_stats['n_header_allcaps']}")
print(f" Sezioni → ###: {t_stats['n_sezioni_numerate']}")
print(f" Paragrafi uniti: {t_stats['n_paragrafi_uniti']}")
print(f" Riduzione testo: {reduction:.0f}%")
# ── [4] Profilo strutturale ────────────────────────────────────────────
print(" [4/4] Analisi struttura...")
out_dir.mkdir(parents=True, exist_ok=True)
raw_out.write_text(raw_text, encoding="utf-8")
clean_out.write_text(clean_text, encoding="utf-8")
profile = analyze(clean_out)
_LIVELLO_DESC = {3: "ricca (h3)", 2: "parziale (h2)", 1: "paragrafi", 0: "testo piatto"}
print(f" ✅ Struttura: livello {profile['livello_struttura']}{_LIVELLO_DESC[profile['livello_struttura']]}")
print(f" h1={profile['n_h1']} h2={profile['n_h2']} h3={profile['n_h3']} "
f"paragrafi={profile['n_paragrafi']}")
print(f" Strategia chunking: {profile['strategia_chunking']}")
print(f" Lingua rilevata: {profile['lingua_rilevata']}")
for w in profile["avvertenze"]:
print(f" ⚠️ {w}")
build_report(stem, out_dir, clean_text, t_stats, profile, reduction)
print(f"\n Output:")
print(f" conversione/{stem}/raw.md (immutabile)")
print(f" conversione/{stem}/clean.md")
print(f" conversione/{stem}/report.json")
print(f"\n clean.md pronto per la suddivisione in chunk.")
return True
# ─── Entry point ─────────────────────────────────────────────────────────────
if __name__ == "__main__":
project_root = Path(__file__).parent.parent
parser = argparse.ArgumentParser(
description="Pipeline PDF → clean Markdown strutturato, pronto per chunking",
epilog="Prerequisiti: pip install opendataloader-pdf + Java 11+ sul PATH",
)
parser.add_argument(
"--stem",
help="Nome del documento (PDF in sources/<stem>.pdf). "
"Se omesso, elabora tutti i PDF in sources/.",
)
parser.add_argument(
"--force",
action="store_true",
help="Riesegui anche se clean.md è già presente",
)
args = parser.parse_args()
_check_deps()
if args.stem:
stems = [args.stem]
else:
sources_dir = project_root / "sources"
if not sources_dir.exists():
print("Errore: cartella sources/ non trovata")
sys.exit(1)
stems = sorted(p.stem for p in sources_dir.glob("*.pdf"))
if not stems:
print("Errore: nessun PDF trovato in sources/")
sys.exit(1)
results = [run(s, project_root, args.force) for s in stems]
ok = sum(results)
total = len(results)
print(f"\n{'' if all(results) else '⚠️ '} {ok}/{total} documenti convertiti")
sys.exit(0 if all(results) else 1)