Files
rag-from-scratch/conversione/pipeline.py
T
davide 757df26bc2 refactor(pipeline): modularizza apply_transforms in 26 funzioni _t_xxx
Estrae ogni trasformazione strutturale in una funzione dedicata
_t_xxx(text) -> tuple[str, int], sostituendo la mega-function da
418 righe con un loop su lista di coppie (stat_key, fn). Aggiunge
_parse_sections_with_body() condivisa tra analyze() e build_report().
Output identico verificato su tutti e 5 gli stem esistenti
2026-04-17 09:46:59 +02:00

1130 lines
42 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
conversione/pipeline.py — PDF → clean Markdown (pipeline automatica)
Converte un PDF grezzo in Markdown strutturato e pulito, pronto per la
suddivisione in chunk. Gestisce validazione, estrazione testo, pulizia
strutturale e rilevamento automatico della struttura del documento.
Usa opendataloader-pdf (algoritmo XY-Cut++ per ordine di lettura corretto,
testo fluente, struttura preservata).
Output per ciascuno stem:
conversione/<stem>/raw.md — Markdown grezzo (immutabile)
conversione/<stem>/clean.md — Markdown pulito e strutturato
conversione/<stem>/structure_profile.json
Uso:
python conversione/pipeline.py --stem <nome>
python conversione/pipeline.py # tutti i PDF in sources/
python conversione/pipeline.py --stem <nome> --force # forza riesecuzione
Prerequisiti:
pip install opendataloader-pdf
Java 11+ sul PATH (https://adoptium.net/)
"""
import argparse
import json
import re
import subprocess
import sys
import tempfile
from datetime import datetime
from functools import partial
from pathlib import Path
# ─── Verifica dipendenze ──────────────────────────────────────────────────────
def _check_deps() -> None:
try:
import opendataloader_pdf # noqa: F401
except ImportError:
print("Errore: opendataloader-pdf non installato.")
print(" pip install opendataloader-pdf")
sys.exit(1)
try:
result = subprocess.run(
["java", "-version"],
capture_output=True, text=True,
)
if result.returncode != 0:
raise FileNotFoundError
except FileNotFoundError:
print("Errore: Java 11+ non trovato sul PATH.")
print(" Installa da https://adoptium.net/")
sys.exit(1)
# ─── [1] Validazione PDF ─────────────────────────────────────────────────────
def check_pdf(pdf_path: Path) -> tuple[bool, str]:
"""
Validazione rapida: esistenza, leggibilità, testo estraibile.
Restituisce (ok, messaggio).
"""
if not pdf_path.exists():
return False, f"File non trovato: {pdf_path}"
if pdf_path.suffix.lower() != ".pdf":
return False, f"Non è un PDF: {pdf_path.name}"
if pdf_path.stat().st_size == 0:
return False, "File vuoto"
try:
import pdfplumber
with pdfplumber.open(pdf_path) as pdf:
n_pages = len(pdf.pages)
if n_pages == 0:
return False, "PDF senza pagine"
sample = min(5, n_pages)
pages_with_text = sum(
1 for i in range(sample)
if len((pdf.pages[i].extract_text() or "").strip()) > 50
)
if pages_with_text == 0:
return False, (
f"Nessun testo nelle prime {sample} pagine "
f"— probabilmente scansionato (usa modalità hybrid)"
)
return True, f"{n_pages} pagine, testo digitale confermato"
except Exception as e:
msg = str(e).lower()
if "password" in msg or "encrypted" in msg:
return False, "PDF protetto da password"
return False, f"Impossibile aprire: {e}"
# ─── [2] Conversione PDF → Markdown ─────────────────────────────────────────
def convert_pdf(pdf_path: Path, out_dir: Path) -> Path:
"""
Converte il PDF in Markdown tramite opendataloader-pdf.
Scrive il file nella out_dir e restituisce il percorso.
Parametri scelti per output RAG-ottimale:
- keep_line_breaks=False → testo fluente, no hard-wrap PDF
- reading_order="xycut" → corregge ordine multi-colonna (XY-Cut++)
- sanitize=False → preserva il testo originale (no anonimizzazione PII)
"""
import opendataloader_pdf
out_dir.mkdir(parents=True, exist_ok=True)
opendataloader_pdf.convert(
input_path=str(pdf_path),
output_dir=str(out_dir),
format="markdown",
keep_line_breaks=False,
reading_order="xycut",
sanitize=False,
image_output="off", # nessuna immagine estratta né referenziata
quiet=True, # sopprime i log Java
)
# Il file output si chiama <stem>.md
md_file = out_dir / f"{pdf_path.stem}.md"
if not md_file.exists():
candidates = list(out_dir.glob("*.md"))
if not candidates:
raise RuntimeError(f"Nessun file .md prodotto in {out_dir}")
md_file = candidates[0]
return md_file
# ─── [3] Pulizia strutturale ─────────────────────────────────────────────────
_TOC_KEYWORDS = frozenset([
"indice", "index", "contents", "table of contents",
"sommario", "inhaltsverzeichnis", "inhalt",
])
_ORDINALS_IT = {
"PRIMO": "I", "SECONDO": "II", "TERZO": "III", "QUARTO": "IV",
"QUINTO": "V", "SESTO": "VI", "SETTIMO": "VII", "OTTAVO": "VIII",
"NONO": "IX", "DECIMO": "X",
}
_ORDINALS_EN = {
"ONE": "1", "TWO": "2", "THREE": "3", "FOUR": "4", "FIVE": "5",
"SIX": "6", "SEVEN": "7", "EIGHT": "8", "NINE": "9", "TEN": "10",
}
def _sentence_case(s: str) -> str:
if not s:
return s
lower = s.lower()
return lower[0].upper() + lower[1:]
def _is_allcaps_line(line: str) -> bool:
stripped = line.strip()
letters = [c for c in stripped if c.isalpha()]
return (
len(letters) >= 3
and all(c.isupper() for c in letters)
and not stripped.startswith("#")
)
def _allcaps_to_header(raw_line: str) -> str:
# Rimuovi eventuale prefisso di lista "- " o "* " prima di creare l'header
text = re.sub(r"^[-*+]\s+", "", raw_line.strip())
text = text.rstrip(".").rstrip("?").strip()
_ORD_IT_PAT = "|".join(_ORDINALS_IT.keys())
m = re.match(rf"^CAPITOLO ({_ORD_IT_PAT})\. (.+)", text)
if m:
roman = _ORDINALS_IT[m.group(1)]
titolo = m.group(2).rstrip(".").rstrip("?").strip()
return f"## Capitolo {roman}{_sentence_case(titolo)}"
_ORD_EN_PAT = "|".join(_ORDINALS_EN.keys())
m = re.match(rf"^CHAPTER ({_ORD_EN_PAT}|\d+)\.? (.+)", text)
if m:
n = _ORDINALS_EN.get(m.group(1), m.group(1))
titolo = m.group(2).rstrip(".").rstrip("?").strip()
return f"## Chapter {n}{_sentence_case(titolo)}"
m = re.match(r"^([IVXLCDM]+|[0-9]+)\. (.+)", text)
if m:
return f"## {m.group(1)}. {_sentence_case(m.group(2).rstrip('.').strip())}"
return f"## {_sentence_case(text)}"
def _extract_math_environments(text: str) -> tuple[str, int]:
"""
Converte paragrafi che iniziano con ambienti matematici in header ###.
'Teorema 1.6.3 (principio di induzione) Sia A ⊆ N...'
'### Teorema 1.6.3 (principio di induzione)\n\nSia A ⊆ N...'
Riconosce: Definizione, Teorema, Lemma, Proposizione, Corollario,
Osservazione, Nota, Esempio (solo con numero di sezione).
Non tocca paragrafi che già iniziano con un header Markdown.
Deve girare PRIMA del merge paragrafi (step 5) per sfruttare i blocchi intatti.
"""
_ENVS = (
r"Definizione|Teorema|Lemma|Proposizione|"
r"Corollario|Osservazione|Nota|Esempio"
)
count = 0
blocks = text.split("\n\n")
result = []
for block in blocks:
stripped = block.strip()
if not stripped or stripped.startswith("#"):
result.append(block)
continue
m = re.match(
rf"^({_ENVS})\s+((?:\d+\.?){{1,4}})\s*(.*)",
stripped,
re.DOTALL,
)
if not m:
result.append(block)
continue
env = m.group(1)
num = m.group(2).rstrip(".")
rest = m.group(3).strip()
# Titolo opzionale tra parentesi: "(principio di induzione)"
title_m = re.match(r"^(\([^)]{2,60}\))\s+(.*)", rest, re.DOTALL)
if title_m:
header = f"### {env} {num} {title_m.group(1)}"
body = title_m.group(2).strip()
else:
header = f"### {env} {num}."
body = rest
result.append(f"{header}\n\n{body}" if body else header)
count += 1
return "\n\n".join(result), count
def _merge_title_headers(text: str) -> tuple[str, int]:
"""
Fonde header numerici isolati con il sottotitolo breve che li segue.
'### N.\n\nSottotitolo (riga singola ≤ 80 char, senza punto finale)'
'### N. Sottotitolo'
Caso tipico: parti di un'opera (es. Nietzsche) dove il numero di sezione
e il titolo della sezione sono in blocchi Markdown separati.
Non tocca header con titolo già inline né header seguiti da testo lungo.
"""
count = 0
blocks = re.split(r"\n{2,}", text)
result = []
i = 0
while i < len(blocks):
block = blocks[i]
stripped = block.strip()
if (
re.match(r"^#{2,3} \d+\.\s*$", stripped)
and i + 1 < len(blocks)
):
nxt = blocks[i + 1].strip()
# Sottotitolo valido: riga singola, ≤ 80 char, non header, non numerazione pura
if (
nxt
and "\n" not in nxt
and len(nxt) <= 80
and not nxt.startswith("#")
and not re.match(r"^\d+[\.\)]\s", nxt)
):
result.append(stripped.rstrip() + " " + nxt)
count += 1
i += 2
continue
result.append(block)
i += 1
return re.sub(r"\n{3,}", "\n\n", "\n\n".join(result)), count
def _extract_article_headers(text: str) -> tuple[str, int]:
"""
Converte voci di articolo dal formato lista Markdown al formato header ###.
'- Art. N[suffix]. Titolo. Corpo testo...''### Art. N[suffix]. Titolo.\n\nCorpo testo...'
'- Art. N[suffix]. (…) (1)''### Art. N[suffix].\n\n(…) (1)'
Gestisce suffissi come: Art. 4-bis., Art. 14-ter., Art. 1-quinquies.
Il titolo è la prima frase con iniziale maiuscola che termina con '.' prima di
ulteriore testo (es. "Leggi. La formazione..." → titolo "Leggi", corpo "La formazione...").
Se il testo non ha titolo separabile, tutto diventa il corpo.
"""
count = 0
def _repl(m: re.Match) -> str:
nonlocal count
num = m.group(1)
rest = m.group(2).strip()
# Titolo: frase con iniziale maiuscola, max 75 char, termina con '.',
# seguita da almeno un'altra frase (minimo 5 char) che inizia con maiuscola
# o con '(' / cifra (note a piè o continuazione corpo).
title_m = re.match(
r"^([A-ZÀÈÉÌÍÒÓÙÚ].{1,74}?)\.\s+([A-ZÀÈÉÌÍÒÓÙÚ\(\d].{4,})",
rest,
)
if title_m:
count += 1
return (
f"### Art. {num}. {title_m.group(1)}.\n\n"
f"{title_m.group(2).strip()}"
)
# Nessun titolo separabile: tutto è corpo
if rest:
count += 1
return f"### Art. {num}.\n\n{rest}"
# Articolo senza testo inline (es. "- Art. 5. (…) (1)" già estratto sopra,
# oppure articolo vuoto nella lista)
count += 1
return f"### Art. {num}."
text = re.sub(
r"^-\s+Art\.\s+([\d]+[a-z\-]*)\.\s*(.*)",
_repl,
text,
flags=re.MULTILINE,
)
return text, count
# ─── [3a] Funzioni di trasformazione ─────────────────────────────────────────
def _t_remove_images(text: str) -> tuple[str, int]:
n = len(re.findall(r"!\[[^\]]*\]\([^)]*\)", text))
text = re.sub(r"!\[[^\]]*\]\([^)]*\)\s*", "", text)
return text, n
def _t_fix_br(text: str) -> tuple[str, int]:
n = len(re.findall(r"<br>", text, re.IGNORECASE))
text = re.sub(r"<br>\s*", " ", text, flags=re.IGNORECASE)
return text, n
def _t_fix_tabsep(text: str) -> tuple[str, int]:
_pat = re.compile(r"(?m)^\|\s*\|\s*$|^\|---\|?\s*$")
n = len(_pat.findall(text))
text = _pat.sub("", text)
return text, n
def _t_fix_accents(text: str) -> tuple[str, int]:
"""Fix artefatti backtick da PDF LaTeX: `e→è, e`→è, sar`a→sarà, ecc."""
_ACCENT_MAP = {
"e": "è", "E": "È", "a": "à", "A": "À",
"u": "ù", "U": "Ù", "i": "ì", "I": "Ì", "o": "ò", "O": "Ò",
}
n_bt_before = text.count("`")
text = re.sub(r"`([eEaAuUiIoO])", lambda m: _ACCENT_MAP[m.group(1)], text)
text = re.sub(r"([eEaAuUiIoO])`", lambda m: _ACCENT_MAP[m.group(1)], text)
n_accenti = n_bt_before - text.count("`")
# Backtick orfani: artefatti LaTeX rimasti dopo la correzione vocale
n_bt_orfani = text.count("`")
if n_bt_orfani:
text = re.sub(r"`", "", text)
n_accenti += n_bt_orfani
return text, n_accenti
def _t_fix_multiplication(text: str) -> tuple[str, int]:
"""Fix segno di moltiplicazione "→× (encoding font PDF non-standard)."""
n = len(re.findall(r'(?<=[0-9])"(?=[0-9(])', text))
text = re.sub(r'(?<=[0-9])"(?=[0-9(])', '×', text)
return text, n
def _t_fix_micro(text: str) -> tuple[str, int]:
"""Fix prefisso micro !→µ prima di unità SI note."""
_SI_UNITS_RE = r'[mAsgVWFHTKNJClΩ]'
n = len(re.findall(rf'\d\s*!(?={_SI_UNITS_RE})', text))
text = re.sub(rf'(\d)\s*!({_SI_UNITS_RE})', r'\1 µ\2', text)
return text, n
def _t_remove_formula_labels(text: str) -> tuple[str, int]:
"""Rimuovi label formule inline [N.M] — es. [3.4], [10.7]."""
n = len(re.findall(r"\[\d+\.\d+\]", text))
text = re.sub(r"\s*\[\d+\.\d+\]\s*", " ", text)
return text, n
def _t_remove_dotleaders(text: str) -> tuple[str, int]:
"""Rimuovi righe con dot-leader e numerali romani isolati (footer TOC)."""
_DOTLEADER_RE = r"^[^\n]*(?:(?:\. ){3,}|\.{4,})[^\n]*$"
n = len(re.findall(_DOTLEADER_RE, text, re.MULTILINE))
text = re.sub(_DOTLEADER_RE, "", text, flags=re.MULTILINE)
text = re.sub(
r"(?m)^(i{1,3}|iv|vi{0,3}|ix|xi{0,2}|x)$",
"",
text,
flags=re.IGNORECASE,
)
return text, n
def _t_fix_header_concat(text: str) -> tuple[str, int]:
"""Fix header + body concatenati senza separatore."""
count = 0
def _fix(m: re.Match) -> str:
nonlocal count
hashes = m.group(1)
full = m.group(2).strip()
if len(full) < 60:
return m.group(0)
skip = min(10, len(full) // 3)
split = re.search(r"(?<=[a-zàèéìíòóùúä])(?=[A-ZÀÈÉÌÍÒÓÙÚ])", full[skip:])
if split:
pos = skip + split.start()
title = full[:pos].strip()
body = full[pos:].strip()
if len(title) >= 5 and len(body) >= 15:
count += 1
return f"{hashes} {title}\n\n{body}"
return m.group(0)
text = re.sub(r"^(#{2,6})\s+(.{40,})$", _fix, text, flags=re.MULTILINE)
return text, count
def _t_extract_capitolo(text: str) -> tuple[str, int]:
"""Estrai 'Capitolo N: TITOLO' inline nel corpo del testo → ## header."""
def _repl(m: re.Match) -> str:
num = m.group(1)
titolo = _sentence_case(m.group(2).strip().rstrip("- ").strip())
return f"\n\n## Capitolo {num}: {titolo}\n\n"
text = re.sub(
r"\bCapitolo\s+(\d+)\s*[:\s]\s*([A-ZÀÈÉÌÍÒÓÙÚ\'L][A-ZÀÈÉÌÍÒÓÙÚ\s\'\.,\(\)]{5,80}?)"
r"(?=\s*[-]\s*\d|\s*\n|\s*$)",
_repl,
text,
)
return text, 0
def _t_normalize_header_levels(text: str) -> tuple[str, int]:
"""Normalizza h4+ → h3; rimuove header vuoti."""
text = re.sub(r"^#{3,6}\s*$", "", text, flags=re.MULTILINE)
text = re.sub(
r"^(#{3,6})\s+(\d{1,3})\s+(.+)$",
lambda m: f"### {m.group(2)}. {m.group(3)}",
text,
flags=re.MULTILINE,
)
text = re.sub(r"^#{4,6}\s+(.+)$", r"### \1", text, flags=re.MULTILINE)
return text, 0
def _t_extract_articles(text: str) -> tuple[str, int]:
"""Converti voci articolo '- Art. N.''### Art. N.'"""
return _extract_article_headers(text)
def _t_remove_header_bold(text: str) -> tuple[str, int]:
"""Rimuovi **bold** negli header esistenti."""
text = re.sub(
r"^(#{1,6})\s+\*\*(.+?)\*\*\s*$",
r"\1 \2",
text, flags=re.MULTILINE,
)
return text, 0
def _t_normalize_allcaps_headers(text: str) -> tuple[str, int]:
"""Normalizza header ALL-CAPS → sentence-case."""
def _norm(m: re.Match) -> str:
hashes, content = m.group(1), m.group(2).strip()
letters = [c for c in content if c.isalpha()]
if letters and all(c.isupper() for c in letters):
return f"{hashes} {_sentence_case(content)}"
return m.group(0)
text = re.sub(r"^(#{1,6}) (.+)$", _norm, text, flags=re.MULTILINE)
return text, 0
def _t_remove_toc(text: str) -> tuple[str, int]:
"""Rimuovi header TOC e voci lista numerate che seguono."""
lines = text.split("\n")
new_lines = []
_in_toc = False
removed = False
for line in lines:
bare = re.sub(r"^#+\s*", "", line.strip())
first_word = bare.split(".")[0].strip().lower()
if first_word in _TOC_KEYWORDS:
removed = True
_in_toc = True
continue
if _in_toc:
if re.match(r"^\s*$", line) or re.match(r"^\s*[-*+]\s+\d", line):
continue
_in_toc = False
new_lines.append(line)
return "\n".join(new_lines), 1 if removed else 0
def _t_allcaps_to_headers(text: str) -> tuple[str, int]:
"""Converti righe ALL-CAPS standalone → ## header."""
count = 0
blocks = text.split("\n\n")
new_blocks = []
for block in blocks:
stripped = block.strip()
if "\n" not in stripped and _is_allcaps_line(stripped):
new_blocks.append(_allcaps_to_header(stripped))
count += 1
else:
sub_lines = block.split("\n")
converted = []
for ln in sub_lines:
if _is_allcaps_line(ln) and len(ln.strip()) > 3:
converted.append(_allcaps_to_header(ln))
count += 1
else:
converted.append(ln)
new_blocks.append("\n".join(converted))
return "\n\n".join(new_blocks), count
def _t_numbered_sections(text: str, has_exercises: bool = False) -> tuple[str, int]:
"""Converti sezioni numerate 'N. testo' / '- N. testo' / '- N testo' → ### header."""
count = 0
def _num_repl(m: re.Match) -> str:
nonlocal count
content = m.group(2).strip()
if content.endswith(".") and len(content) > 40:
return m.group(0)
count += 1
return f"### {m.group(1)}.\n\n{content}"
text = re.sub(r"^(\d+)\.\s+(.+)$", _num_repl, text, flags=re.MULTILINE)
def _num_letter_repl(m: re.Match) -> str:
nonlocal count
count += 1
return f"### {m.group(1)}{m.group(2)}.\n\n{m.group(3).strip()}"
text = re.sub(r"^(\d+)\s*([a-z])\.\s+(.+)$", _num_letter_repl, text, flags=re.MULTILINE)
# Disabilitato se il documento contiene sezioni "Esercizi": in quel caso i
# "- N. testo" sono numerazioni di esercizi, non header di sezione.
if not has_exercises:
def _aphorism_repl(m: re.Match) -> str:
nonlocal count
count += 1
return f"\n\n### {m.group(1)}.\n\n{m.group(2).strip()}"
text = re.sub(
r"^-\s+(\d{1,3})\.\s+(.{10,})$",
_aphorism_repl,
text,
flags=re.MULTILINE,
)
def _list_section_repl(m: re.Match) -> str:
nonlocal count
num = m.group(1)
content = m.group(2).strip()
count += 1
split = re.search(r"(?<=[a-zàèéìíòóùú])\s+(?=[A-ZÀÈÉÌÍÒÓÙÚ])", content)
if split and split.start() >= 3:
title = content[: split.start()].strip()
body = content[split.end():].strip()
if len(body) >= 20:
return f"\n\n### {num}. {title}\n\n{body}"
return f"\n\n### {num}. {content}"
text = re.sub(
r"^-\s+(\d{1,3})\s+([A-ZÀÈÉÌÍÒÓÙÚ\'L].{10,})$",
_list_section_repl,
text,
flags=re.MULTILINE,
)
return text, count
def _t_extract_math(text: str) -> tuple[str, int]:
"""Converti ambienti matematici (Teorema/Definizione/...) → ### header."""
return _extract_math_environments(text)
def _t_merge_paragraphs(text: str) -> tuple[str, int]:
"""Unisci paragrafi spezzati da salti pagina PDF."""
_SENTENCE_END = set(".?!»)\"'")
blocks = text.split("\n\n")
merged = []
count = 0
i = 0
while i < len(blocks):
b = blocks[i]
stripped = b.strip()
while (
i + 1 < len(blocks)
and stripped
and not stripped.startswith("#")
and stripped[-1] not in _SENTENCE_END
):
nxt = blocks[i + 1].strip()
if not nxt or nxt.startswith("#") or re.match(r"^\d+\.", nxt):
break
b = stripped + " " + nxt
stripped = b.strip()
count += 1
i += 1
merged.append(b)
i += 1
text = "\n\n".join(merged)
# Secondo pass: rimuovi prefisso |---| eventualmente rimasto dopo il merge
text = re.sub(r"(?m)^\|---\|\s*", "", text)
return text, count
def _t_normalize_whitespace(text: str) -> tuple[str, int]:
"""Normalizza whitespace multiplo interno alle righe."""
lines = text.split("\n")
text = "\n".join(
re.sub(r" +", " ", line) if line.strip() else line
for line in lines
)
return text, 0
def _t_collapse_blank_lines(text: str) -> tuple[str, int]:
"""Riduci righe vuote multiple a doppie."""
return re.sub(r"\n{3,}", "\n\n", text), 0
def _t_remove_urls(text: str) -> tuple[str, int]:
"""Rimuovi righe che sono solo URL (watermark, footer di piattaforme)."""
return re.sub(r"(?m)^(https?://|www\.)\S+\s*$", "", text), 0
def _t_remove_empty_headers(text: str) -> tuple[str, int]:
"""Rimuovi header senza corpo (sezioni vuote / watermark)."""
blocks = re.split(r"\n{2,}", text)
cleaned = []
for i, block in enumerate(blocks):
stripped = block.strip()
if re.match(r"^#{1,6} ", stripped) and "\n" not in stripped:
next_stripped = blocks[i + 1].strip() if i + 1 < len(blocks) else ""
if not next_stripped or re.match(r"^#{1,6} ", next_stripped):
continue
cleaned.append(block)
return re.sub(r"\n{3,}", "\n\n", "\n\n".join(cleaned)), 0
def _t_merge_title_headers(text: str) -> tuple[str, int]:
"""Fondi header numerici isolati con il sottotitolo breve successivo."""
return _merge_title_headers(text)
def _t_remove_garbage_headers(text: str) -> tuple[str, int]:
"""Rimuovi garbage headers: simboli, abbreviazioni matematiche, frammenti formula."""
def _is_garbage_header(content: str) -> bool:
if content.lstrip().startswith("..."):
return True
if not re.search(r"[A-Za-zÀ-ÿ]{2,}", content):
return True
if re.fullmatch(r"\(?\s*[A-Za-z]{1,4}\s*\)?", content.strip()):
return True
if len(content) > 60 and re.search(r"[!%#]\w|\w[!%#]|\b\w+-\s*\w", content):
return True
return False
count = 0
lines = text.split("\n")
new_lines = []
for line in lines:
m = re.match(r"^#{1,6} (.+)$", line)
if m and _is_garbage_header(m.group(1)):
count += 1
continue
new_lines.append(line)
text = "\n".join(new_lines)
text = re.sub(r"\n{3,}", "\n\n", text)
return text, count
def _t_remove_frontmatter(text: str) -> tuple[str, int]:
"""Rimuovi sezioni frontmatter: URL, email, affiliazione, copyright."""
_FM_RE = re.compile(
r"https?://|www\.|@[A-Za-z]|\bUniversit[àa]\b|\bDipartimento\b|"
r"\bCopyright\b|\bLicenza\b|\bEdizione\b|"
r"protetto da|tutti i diritti",
re.IGNORECASE,
)
blocks = re.split(r"\n{2,}", text)
cleaned = []
count = 0
for i, block in enumerate(blocks):
stripped = block.strip()
if not re.match(r"^### ", stripped) or re.match(r"^### \d", stripped):
cleaned.append(block)
continue
body = blocks[i + 1].strip() if i + 1 < len(blocks) else ""
is_fm_body = len(body) < 250 and _FM_RE.search(body)
is_fm_hdr = _FM_RE.search(stripped)
if is_fm_body or is_fm_hdr:
count += 1
continue
cleaned.append(block)
return re.sub(r"\n{3,}", "\n\n", "\n\n".join(cleaned)), count
# ─── [3b] Pipeline delle trasformazioni ──────────────────────────────────────
def apply_transforms(text: str) -> tuple[str, dict]:
"""
Applica le trasformazioni strutturali al Markdown grezzo.
Restituisce (testo_modificato, statistiche).
"""
# Flag calcolato prima del loop: disabilita il transform 4b nei documenti
# con sezioni "Esercizi" (i "- N. testo" sarebbero numerazioni, non header).
_has_ex = bool(re.search(r"\bEsercizi\b", text, re.IGNORECASE))
_transforms: list[tuple[str | None, object]] = [
("n_immagini_rimosse", _t_remove_images),
("n_br_rimossi", _t_fix_br),
("n_tabsep_rimossi", _t_fix_tabsep),
("n_accenti_corretti", _t_fix_accents),
("n_moltiplicazioni_corrette", _t_fix_multiplication),
("n_micro_corretti", _t_fix_micro),
("n_formule_rimossi", _t_remove_formula_labels),
("n_dotleader_rimossi", _t_remove_dotleaders),
("n_header_concat_fixati", _t_fix_header_concat),
(None, _t_extract_capitolo),
(None, _t_normalize_header_levels),
("n_articoli_estratti", _t_extract_articles),
(None, _t_remove_header_bold),
(None, _t_normalize_allcaps_headers),
("toc_rimosso", _t_remove_toc),
("n_header_allcaps", _t_allcaps_to_headers),
("n_sezioni_numerate", partial(_t_numbered_sections, has_exercises=_has_ex)),
("n_ambienti_matematici", _t_extract_math),
("n_paragrafi_uniti", _t_merge_paragraphs),
(None, _t_normalize_whitespace),
(None, _t_collapse_blank_lines),
(None, _t_remove_urls),
(None, _t_remove_empty_headers),
("n_titoli_uniti", _t_merge_title_headers),
("n_garbage_headers_rimossi", _t_remove_garbage_headers),
("n_frontmatter_rimossi", _t_remove_frontmatter),
]
stats: dict = {}
for stat_key, fn in _transforms:
text, n = fn(text)
if stat_key:
stats[stat_key] = stats.get(stat_key, 0) + n
stats["toc_rimosso"] = bool(stats.get("toc_rimosso", 0))
return text, stats
# ─── [4] Rilevamento struttura ───────────────────────────────────────────────
_IT_WORDS = frozenset([
"il", "la", "di", "e", "che", "non", "per", "un", "una", "si",
"con", "da", "del", "della", "dei", "in", "ma", "se", "lo", "le",
"gli", "al", "alla", "ai", "alle", "sono", "ha", "hanno", "era",
"erano", "nel", "nella", "nei", "nelle", "questo", "questa", "così",
])
_EN_WORDS = frozenset([
"the", "of", "and", "to", "in", "is", "that", "it", "was", "for",
"on", "are", "as", "with", "his", "they", "at", "be", "this", "have",
"from", "or", "an", "but", "not", "by", "he", "she", "we", "you",
"which", "their", "been", "has", "would", "there", "when", "will",
])
def _detect_language(text: str) -> str:
words = re.findall(r"\b[a-zA-Z]{2,}\b", text.lower())
sample = words[:2000]
it = sum(1 for w in sample if w in _IT_WORDS)
en = sum(1 for w in sample if w in _EN_WORDS)
if it == 0 and en == 0:
return "unknown"
return "it" if it >= en else "en"
def _count_headers(text: str, level: int) -> int:
prefix = "#" * level + " "
return len(re.findall(rf"(?m)^{re.escape(prefix)}", text))
def _count_paragraphs(text: str) -> int:
blocks = re.split(r"\n{2,}", text)
return sum(1 for b in blocks if b.strip() and not re.match(r"^#+\s", b.strip()))
def _split_sections(text: str, level: int) -> list[str]:
prefix = "#" * level + " "
parts = re.split(rf"(?m)^{re.escape(prefix)}.+", text)
return [p for p in parts[1:] if p.strip()]
def _parse_sections_with_body(text: str, level: int = 3) -> list[tuple[str, str]]:
"""Restituisce lista di (header_line, body_text) per tutti gli header al livello dato."""
prefix = "#" * level + " "
lines = text.split("\n")
sections: list[tuple[str, str]] = []
cur_hdr: str | None = None
cur_body: list[str] = []
for line in lines:
if line.startswith(prefix):
if cur_hdr is not None:
sections.append((cur_hdr, "\n".join(cur_body).strip()))
cur_hdr = line
cur_body = []
elif cur_hdr is not None:
cur_body.append(line)
if cur_hdr is not None:
sections.append((cur_hdr, "\n".join(cur_body).strip()))
return sections
def analyze(md_path: Path) -> dict:
text = md_path.read_text(encoding="utf-8")
n_h1 = _count_headers(text, 1)
n_h2 = _count_headers(text, 2)
n_h3 = _count_headers(text, 3)
n_paragrafi = _count_paragraphs(text)
if n_h3 >= 5:
livello, boundary, strategia = 3, "h3", "h3_aware"
section_bodies = _split_sections(text, 3)
elif n_h2 >= 3:
livello, boundary, strategia = 2, "h2", "h2_paragraph_split"
section_bodies = _split_sections(text, 2)
elif n_h1 + n_h2 + n_h3 >= 1:
livello, boundary, strategia = 1, "paragrafo", "paragraph"
section_bodies = [b for b in re.split(r"\n{2,}", text) if b.strip()]
elif n_paragrafi >= 3:
livello, boundary, strategia = 1, "paragrafo", "paragraph"
section_bodies = [b for b in re.split(r"\n{2,}", text) if b.strip()]
else:
livello, boundary, strategia = 0, "nessuno", "sliding_window"
section_bodies = [text] if text.strip() else []
lengths = [len(b) for b in section_bodies if b.strip()]
lunghezza_media = int(sum(lengths) / len(lengths)) if lengths else 0
lingua = _detect_language(text)
avvertenze = []
short = sum(1 for l in lengths if l < 200)
long_ = sum(1 for l in lengths if l > 800)
if short:
avvertenze.append(f"{short} sezioni sotto i 200 caratteri — verranno accorpate")
if long_:
avvertenze.append(f"{long_} sezioni sopra i 800 caratteri — verranno divise")
return {
"livello_struttura": livello,
"n_h1": n_h1,
"n_h2": n_h2,
"n_h3": n_h3,
"n_paragrafi": n_paragrafi,
"boundary_primario": boundary,
"lingua_rilevata": lingua,
"lunghezza_media_sezione": lunghezza_media,
"strategia_chunking": strategia,
"avvertenze": avvertenze,
}
# ─── Report di conversione ───────────────────────────────────────────────────
def build_report(
stem: str,
out_dir: Path,
clean_text: str,
t_stats: dict,
profile: dict,
reduction: float,
) -> Path:
"""
Genera conversione/<stem>/report.json con tutte le metriche di qualità:
statistiche trasformazioni, struttura, distribuzione lunghezze, anomalie
e problemi residui. Leggibile da validate.py per la validazione batch.
"""
text_lines = clean_text.split("\n")
# ── Raccolta sezioni ### con corpo ────────────────────────────────────
sections = _parse_sections_with_body(clean_text, 3)
lengths = [len(body) for _, body in sections]
# ── Distribuzione lunghezze ───────────────────────────────────────────
def _pct(data: list[int], p: float) -> int:
if not data:
return 0
s = sorted(data)
return s[max(0, min(len(s) - 1, int(len(s) * p)))]
distribution = {
"min": min(lengths) if lengths else 0,
"p25": _pct(lengths, 0.25),
"mediana": _pct(lengths, 0.50),
"p75": _pct(lengths, 0.75),
"max": max(lengths) if lengths else 0,
}
# ── Anomalie ──────────────────────────────────────────────────────────
bare_hdrs = [
{"header": hdr, "corpo_inizio": body[:120].replace("\n", " ")}
for hdr, body in sections
if re.match(r"^### \d+\.\s*$", hdr) and len(body.strip()) < 30
]
short_secs = [
{"header": hdr, "chars": length, "testo": body[:80].replace("\n", " ")}
for (hdr, body), length in zip(sections, lengths)
if 0 < length < 150
]
long_secs = [
{"header": hdr, "chars": length}
for (hdr, _), length in zip(sections, lengths)
if length > 1500
]
# ── Problemi residui (max 10 esempi ciascuno) ─────────────────────────
def _scan(pattern: str, max_n: int = 10) -> list[dict]:
hits = []
for i, line in enumerate(text_lines):
if re.search(pattern, line) and not re.match(r"^#+ ", line):
hits.append({"riga": i + 1, "testo": line.strip()[:120]})
if len(hits) >= max_n:
break
return hits
residui = {
"backtick": _scan(r"`"),
"dotleader": _scan(r"(?:\. ){3,}"),
"url": _scan(r"^(https?://|www\.)\S+"),
"immagini": _scan(r"!\[[^\]]*\]\([^)]*\)"),
"br_inline": _scan(r"<br>"),
"simboli_encoding":_scan(r'(?<=[0-9A-Za-z])[!"](?=[0-9A-Za-z])'),
"formule_inline": _scan(r"\[\d+\.\d+\]"),
}
# ── Composizione report ───────────────────────────────────────────────
report = {
"stem": stem,
"timestamp": datetime.now().strftime("%Y-%m-%d %H:%M"),
"transforms": {
**t_stats,
"riduzione_pct": round(reduction),
},
"structure": profile,
"distribution": distribution,
"anomalie": {
"bare_headers": len(bare_hdrs),
"short_sections": len(short_secs),
"long_sections": len(long_secs),
"bare_headers_list": bare_hdrs,
"short_sections_list": short_secs,
"long_sections_list": long_secs,
},
"residui": {
"backtick": len(residui["backtick"]),
"dotleader": len(residui["dotleader"]),
"url": len(residui["url"]),
"immagini": len(residui["immagini"]),
"br_inline": len(residui["br_inline"]),
"simboli_encoding": len(residui["simboli_encoding"]),
"formule_inline": len(residui["formule_inline"]),
"backtick_esempi": residui["backtick"],
"dotleader_esempi": residui["dotleader"],
"url_esempi": residui["url"],
"immagini_esempi": residui["immagini"],
"br_inline_esempi": residui["br_inline"],
"simboli_encoding_esempi": residui["simboli_encoding"],
"formule_inline_esempi": residui["formule_inline"],
},
}
report_path = out_dir / "report.json"
report_path.write_text(json.dumps(report, ensure_ascii=False, indent=2), encoding="utf-8")
return report_path
# ─── Pipeline principale ──────────────────────────────────────────────────────
def run(stem: str, project_root: Path, force: bool) -> bool:
pdf_path = project_root / "sources" / f"{stem}.pdf"
out_dir = project_root / "conversione" / stem
raw_out = out_dir / "raw.md"
clean_out = out_dir / "clean.md"
print(f"\n{'' * 52}")
print(f" {stem}")
print(f"{'' * 52}")
if clean_out.exists() and not force:
print(f" ⚠️ conversione/{stem}/clean.md già presente — skip")
print(f" (usa --force per rieseguire)")
return True
# ── [1] Validazione ────────────────────────────────────────────────────
print(" [1/4] Validazione PDF...")
ok, msg = check_pdf(pdf_path)
if not ok:
print(f"{msg}")
return False
print(f"{msg}")
# ── [2] Conversione ────────────────────────────────────────────────────
print(" [2/4] Conversione PDF → Markdown (opendataloader-pdf)...")
with tempfile.TemporaryDirectory() as tmp:
try:
md_file = convert_pdf(pdf_path, Path(tmp))
except Exception as e:
print(f" ✗ Conversione fallita: {e}")
return False
raw_text = md_file.read_text(encoding="utf-8")
size_kb = len(raw_text.encode()) // 1024
n_lines = raw_text.count("\n")
print(f" ✅ Markdown grezzo: {size_kb} KB, {n_lines} righe")
# ── [3] Pulizia strutturale ────────────────────────────────────────────
print(" [3/4] Pulizia strutturale...")
clean_text, t_stats = apply_transforms(raw_text)
reduction = 100 * (1 - len(clean_text) / len(raw_text)) if raw_text else 0
print(f" ✅ Immagini rimosse: {t_stats['n_immagini_rimosse']}")
print(f" Accenti corretti: {t_stats['n_accenti_corretti']}")
print(f" Dot-leader rimossi: {t_stats['n_dotleader_rimossi']}")
print(f" Header concat fixati: {t_stats['n_header_concat_fixati']}")
print(f" Articoli → ###: {t_stats['n_articoli_estratti']}")
print(f" Ambienti matematici: {t_stats['n_ambienti_matematici']}")
print(f" Titoli header uniti: {t_stats['n_titoli_uniti']}")
print(f" TOC rimosso: {'' if t_stats['toc_rimosso'] else 'no'}")
print(f" ALL-CAPS → ##: {t_stats['n_header_allcaps']}")
print(f" Sezioni → ###: {t_stats['n_sezioni_numerate']}")
print(f" Paragrafi uniti: {t_stats['n_paragrafi_uniti']}")
print(f" Riduzione testo: {reduction:.0f}%")
# ── [4] Profilo strutturale ────────────────────────────────────────────
print(" [4/4] Analisi struttura...")
out_dir.mkdir(parents=True, exist_ok=True)
raw_out.write_text(raw_text, encoding="utf-8")
clean_out.write_text(clean_text, encoding="utf-8")
profile = analyze(clean_out)
_LIVELLO_DESC = {3: "ricca (h3)", 2: "parziale (h2)", 1: "paragrafi", 0: "testo piatto"}
print(f" ✅ Struttura: livello {profile['livello_struttura']}{_LIVELLO_DESC[profile['livello_struttura']]}")
print(f" h1={profile['n_h1']} h2={profile['n_h2']} h3={profile['n_h3']} "
f"paragrafi={profile['n_paragrafi']}")
print(f" Strategia chunking: {profile['strategia_chunking']}")
print(f" Lingua rilevata: {profile['lingua_rilevata']}")
for w in profile["avvertenze"]:
print(f" ⚠️ {w}")
build_report(stem, out_dir, clean_text, t_stats, profile, reduction)
print(f"\n Output:")
print(f" conversione/{stem}/raw.md (immutabile)")
print(f" conversione/{stem}/clean.md")
print(f" conversione/{stem}/report.json")
print(f"\n clean.md pronto per la suddivisione in chunk.")
return True
# ─── Entry point ─────────────────────────────────────────────────────────────
if __name__ == "__main__":
project_root = Path(__file__).parent.parent
parser = argparse.ArgumentParser(
description="Pipeline PDF → clean Markdown strutturato, pronto per chunking",
epilog="Prerequisiti: pip install opendataloader-pdf + Java 11+ sul PATH",
)
parser.add_argument(
"--stem",
help="Nome del documento (PDF in sources/<stem>.pdf). "
"Se omesso, elabora tutti i PDF in sources/.",
)
parser.add_argument(
"--force",
action="store_true",
help="Riesegui anche se clean.md è già presente",
)
args = parser.parse_args()
_check_deps()
if args.stem:
stems = [args.stem]
else:
sources_dir = project_root / "sources"
if not sources_dir.exists():
print("Errore: cartella sources/ non trovata")
sys.exit(1)
stems = sorted(p.stem for p in sources_dir.glob("*.pdf"))
if not stems:
print("Errore: nessun PDF trovato in sources/")
sys.exit(1)
results = [run(s, project_root, args.force) for s in stems]
ok = sum(results)
total = len(results)
print(f"\n{'' if all(results) else '⚠️ '} {ok}/{total} documenti convertiti")
sys.exit(0 if all(results) else 1)