Files
rag-from-scratch/conversione/pipeline.py
T
davide ebd2a43f84 feat: integra pipeline PDF→Markdown a 9 stadi e test suite
Porta da main la riscrittura completa di conversione/_pipeline/ (9 stadi
PyMuPDF) e la suite tests/ senza modificare chunks/, step-8/, rag.py,
ollama/, retrieve.py, config.py.

requirements.txt: aggiunge PyMuPDF>=1.24.0 e pytest>=8.0, mantiene chromadb,
rimuove opendataloader-pdf e pymupdf4llm.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-11 14:46:16 +02:00

1601 lines
60 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
conversione/pipeline.py — PDF → clean Markdown (pipeline automatica)
Converte un PDF grezzo in Markdown strutturato e pulito, pronto per la
suddivisione in chunk. Gestisce validazione, estrazione testo, pulizia
strutturale e rilevamento automatico della struttura del documento.
Usa opendataloader-pdf (algoritmo XY-Cut++ per ordine di lettura corretto,
testo fluente, struttura preservata).
Output per ciascuno stem:
conversione/<stem>/raw.md — Markdown grezzo (immutabile)
conversione/<stem>/clean.md — Markdown pulito e strutturato
conversione/<stem>/structure_profile.json
Uso:
python conversione/pipeline.py --stem <nome>
python conversione/pipeline.py # tutti i PDF in sources/
python conversione/pipeline.py --stem <nome> --force # forza riesecuzione
Prerequisiti:
pip install opendataloader-pdf
Java 11+ sul PATH (https://adoptium.net/)
"""
import argparse
import json
import re
import subprocess
import sys
import tempfile
from collections import Counter
from datetime import datetime
from functools import partial
from pathlib import Path
# ─── Verifica dipendenze ──────────────────────────────────────────────────────
def _check_deps() -> None:
try:
import opendataloader_pdf # noqa: F401
except ImportError:
print("Errore: opendataloader-pdf non installato.")
print(" pip install opendataloader-pdf")
sys.exit(1)
try:
result = subprocess.run(
["java", "-version"],
capture_output=True, text=True,
)
if result.returncode != 0:
raise FileNotFoundError
except FileNotFoundError:
print("Errore: Java 11+ non trovato sul PATH.")
print(" Installa da https://adoptium.net/")
sys.exit(1)
# ─── [1] Validazione PDF ─────────────────────────────────────────────────────
def check_pdf(pdf_path: Path) -> tuple[bool, str]:
"""
Validazione rapida: esistenza, leggibilità, testo estraibile.
Restituisce (ok, messaggio).
"""
if not pdf_path.exists():
return False, f"File non trovato: {pdf_path}"
if pdf_path.suffix.lower() != ".pdf":
return False, f"Non è un PDF: {pdf_path.name}"
size = pdf_path.stat().st_size
if size == 0:
return False, "File vuoto"
if size < 1024:
return False, f"File troppo piccolo ({size} byte) — probabilmente corrotto"
try:
import pdfplumber
with pdfplumber.open(pdf_path) as pdf:
n_pages = len(pdf.pages)
if n_pages == 0:
return False, "PDF senza pagine"
sample = min(5, n_pages)
pages_with_text = sum(
1 for i in range(sample)
if len((pdf.pages[i].extract_text() or "").strip()) > 50
)
if pages_with_text == 0:
# Estende il campione: copertine immagine o pagine bianche iniziali
extended = min(15, n_pages)
if extended > sample:
ext_with_text = sum(
1 for i in range(sample, extended)
if len((pdf.pages[i].extract_text() or "").strip()) > 50
)
if ext_with_text > 0:
return True, (
f"{n_pages} pagine — prime {sample} vuote, "
f"testo trovato in pagine successive "
f"(possibile copertina immagine)"
)
return False, (
f"Nessun testo nelle prime {extended} pagine "
f"— probabilmente scansionato (OCR non supportato)"
)
return True, f"{n_pages} pagine, testo digitale confermato"
except MemoryError:
return False, "Memoria esaurita durante l'apertura del PDF"
except Exception as e:
msg = str(e).lower()
if "password" in msg or "encrypted" in msg:
return False, "PDF protetto da password"
return False, f"Impossibile aprire: {e}"
# ─── [2] Conversione PDF → Markdown ─────────────────────────────────────────
def convert_pdf(pdf_path: Path, out_dir: Path) -> Path:
"""
Converte il PDF in Markdown tramite opendataloader-pdf.
Scrive il file nella out_dir e restituisce il percorso.
Parametri scelti per output RAG-ottimale:
- keep_line_breaks=False → testo fluente, no hard-wrap PDF
- reading_order="xycut" → corregge ordine multi-colonna (XY-Cut++)
- sanitize=False → preserva il testo originale (no anonimizzazione PII)
"""
import opendataloader_pdf
out_dir.mkdir(parents=True, exist_ok=True)
opendataloader_pdf.convert(
input_path=str(pdf_path),
output_dir=str(out_dir),
format="markdown",
keep_line_breaks=False,
reading_order="xycut",
sanitize=False,
image_output="off", # nessuna immagine estratta né referenziata
quiet=True, # sopprime i log Java
)
# Il file output si chiama <stem>.md
md_file = out_dir / f"{pdf_path.stem}.md"
if not md_file.exists():
candidates = list(out_dir.glob("*.md"))
if not candidates:
raise RuntimeError(f"Nessun file .md prodotto in {out_dir}")
md_file = candidates[0]
content = md_file.read_text(encoding="utf-8", errors="replace").strip()
if len(content) < 100:
raise RuntimeError(
f"opendataloader ha prodotto un file .md quasi vuoto ({len(content)} char) "
f"— il PDF potrebbe essere corrotto o non supportato"
)
return md_file
# ─── [3] Pulizia strutturale ─────────────────────────────────────────────────
_TOC_KEYWORDS = frozenset([
"indice", "index", "contents", "table of contents",
"sommario", "inhaltsverzeichnis", "inhalt",
"indice generale", "indice analitico", "indice dei contenuti",
"elenco dei capitoli", "argomenti", "table des matières",
"tabla de contenidos", "содержание",
])
_ORDINALS_IT = {
"PRIMO": "I", "SECONDO": "II", "TERZO": "III", "QUARTO": "IV",
"QUINTO": "V", "SESTO": "VI", "SETTIMO": "VII", "OTTAVO": "VIII",
"NONO": "IX", "DECIMO": "X",
}
_ORDINALS_EN = {
"ONE": "1", "TWO": "2", "THREE": "3", "FOUR": "4", "FIVE": "5",
"SIX": "6", "SEVEN": "7", "EIGHT": "8", "NINE": "9", "TEN": "10",
}
def _sentence_case(s: str) -> str:
if not s:
return s
lower = s.lower()
return lower[0].upper() + lower[1:]
def _is_allcaps_line(line: str) -> bool:
stripped = line.strip()
letters = [c for c in stripped if c.isalpha()]
return (
len(letters) >= 3
and all(c.isupper() for c in letters)
and not stripped.startswith("#")
and not stripped.startswith("|") # esclude righe tabella Markdown
)
def _allcaps_to_header(raw_line: str) -> str:
# Rimuovi eventuale prefisso di lista "- " o "* " prima di creare l'header
text = re.sub(r"^[-*+]\s+", "", raw_line.strip())
text = text.rstrip(".").rstrip("?").strip()
_ORD_IT_PAT = "|".join(_ORDINALS_IT.keys())
m = re.match(rf"^CAPITOLO ({_ORD_IT_PAT})\. (.+)", text)
if m:
roman = _ORDINALS_IT[m.group(1)]
titolo = m.group(2).rstrip(".").rstrip("?").strip()
return f"## Capitolo {roman}{_sentence_case(titolo)}"
_ORD_EN_PAT = "|".join(_ORDINALS_EN.keys())
m = re.match(rf"^CHAPTER ({_ORD_EN_PAT}|\d+)\.? (.+)", text)
if m:
n = _ORDINALS_EN.get(m.group(1), m.group(1))
titolo = m.group(2).rstrip(".").rstrip("?").strip()
return f"## Chapter {n}{_sentence_case(titolo)}"
m = re.match(r"^([IVXLCDM]+|[0-9]+)\. (.+)", text)
if m:
return f"## {m.group(1)}. {_sentence_case(m.group(2).rstrip('.').strip())}"
return f"## {_sentence_case(text)}"
def _extract_math_environments(text: str) -> tuple[str, int]:
"""
Converte paragrafi che iniziano con ambienti matematici in header ###.
'Teorema 1.6.3 (principio di induzione) Sia A ⊆ N...'
'### Teorema 1.6.3 (principio di induzione)\n\nSia A ⊆ N...'
Riconosce: Definizione, Teorema, Lemma, Proposizione, Corollario,
Osservazione, Nota, Esempio (solo con numero di sezione).
Non tocca paragrafi che già iniziano con un header Markdown.
Deve girare PRIMA del merge paragrafi (step 5) per sfruttare i blocchi intatti.
"""
_ENVS = (
r"Definizione|Definition|Teorema|Theorem|Lemma|"
r"Proposizione|Proposition|Corollario|Corollary|"
r"Osservazione|Remark|Nota|Note|Esempio|Example"
)
count = 0
blocks = text.split("\n\n")
result = []
for block in blocks:
stripped = block.strip()
if not stripped or stripped.startswith("#"):
result.append(block)
continue
m = re.match(
rf"^({_ENVS})\s+((?:\d+\.?){{1,4}})\s*(.*)",
stripped,
re.DOTALL,
)
if not m:
result.append(block)
continue
env = m.group(1)
num = m.group(2).rstrip(".")
rest = m.group(3).strip()
# Titolo opzionale tra parentesi: "(principio di induzione)"
title_m = re.match(r"^(\([^)]{2,60}\))\s+(.*)", rest, re.DOTALL)
if title_m:
header = f"### {env} {num} {title_m.group(1)}"
body = title_m.group(2).strip()
else:
header = f"### {env} {num}."
body = rest
result.append(f"{header}\n\n{body}" if body else header)
count += 1
return "\n\n".join(result), count
def _merge_title_headers(text: str) -> tuple[str, int]:
"""
Fonde header numerici isolati con il sottotitolo breve che li segue.
'### N.\n\nSottotitolo (riga singola ≤ 80 char, senza punto finale)'
'### N. Sottotitolo'
Caso tipico: parti di un'opera (es. Nietzsche) dove il numero di sezione
e il titolo della sezione sono in blocchi Markdown separati.
Non tocca header con titolo già inline né header seguiti da testo lungo.
"""
count = 0
blocks = re.split(r"\n{2,}", text)
result = []
i = 0
while i < len(blocks):
block = blocks[i]
stripped = block.strip()
if (
re.match(r"^#{2,3} \d+\.\s*$", stripped)
and i + 1 < len(blocks)
):
nxt = blocks[i + 1].strip()
# Sottotitolo valido: riga singola, ≤ 80 char, non header, non numerazione pura
if (
nxt
and "\n" not in nxt
and len(nxt) <= 80
and not nxt.startswith("#")
and not re.match(r"^\d+[\.\)]\s", nxt)
):
result.append(stripped.rstrip() + " " + nxt)
count += 1
i += 2
continue
result.append(block)
i += 1
return re.sub(r"\n{3,}", "\n\n", "\n\n".join(result)), count
def _extract_article_headers(text: str) -> tuple[str, int]:
"""
Converte voci di articolo dal formato lista Markdown al formato header ###.
'- Art. N[suffix]. Titolo. Corpo testo...''### Art. N[suffix]. Titolo.\n\nCorpo testo...'
'- Art. N[suffix]. (…) (1)''### Art. N[suffix].\n\n(…) (1)'
Gestisce suffissi come: Art. 4-bis., Art. 14-ter., Art. 1-quinquies.
Il titolo è la prima frase con iniziale maiuscola che termina con '.' prima di
ulteriore testo (es. "Leggi. La formazione..." → titolo "Leggi", corpo "La formazione...").
Se il testo non ha titolo separabile, tutto diventa il corpo.
"""
count = 0
def _repl(m: re.Match) -> str:
nonlocal count
num = m.group(1)
rest = m.group(2).strip()
# Titolo: frase con iniziale maiuscola, max 75 char, termina con '.',
# seguita da almeno un'altra frase (minimo 5 char) che inizia con maiuscola
# o con '(' / cifra (note a piè o continuazione corpo).
title_m = re.match(
r"^([A-ZÀÈÉÌÍÒÓÙÚ].{1,74}?)\.\s+([A-ZÀÈÉÌÍÒÓÙÚ\(\d].{4,})",
rest,
)
if title_m:
count += 1
return (
f"### Art. {num}. {title_m.group(1)}.\n\n"
f"{title_m.group(2).strip()}"
)
# Nessun titolo separabile: tutto è corpo
if rest:
count += 1
return f"### Art. {num}.\n\n{rest}"
# Articolo senza testo inline (es. "- Art. 5. (…) (1)" già estratto sopra,
# oppure articolo vuoto nella lista)
count += 1
return f"### Art. {num}."
text = re.sub(
r"^-\s+Art\.\s+([\d]+[a-z\-]*)\.\s*(.*)",
_repl,
text,
flags=re.MULTILINE,
)
return text, count
# ─── [3a] Funzioni di trasformazione ─────────────────────────────────────────
# Mapping PUA Unicode (U+F020-U+F0FF) → simboli corretti per font Symbol/Wingdings.
# Il font Symbol di Windows codifica lettere greche e operatori matematici nel
# range Private Use Area invece dei codepoint Unicode standard.
_SYMBOL_PUA_MAP: dict[str, str] = {
"\uf020": " ", # space
"\uf028": "(",
"\uf029": ")",
"\uf02b": "+",
"\uf02d": "\u2212", # minus
"\uf02e": ".",
"\uf02f": "/",
"\uf030": "0", "\uf031": "1", "\uf032": "2", "\uf033": "3", "\uf034": "4",
"\uf035": "5", "\uf036": "6", "\uf037": "7", "\uf038": "8", "\uf039": "9",
"\uf03a": ":", "\uf03b": ";", "\uf03c": "<", "\uf03d": "=", "\uf03e": ">",
"\uf040": "\u2245", # congruent
"\uf041": "\u0391", # Alpha
"\uf042": "\u0392", # Beta
"\uf043": "\u03a7", # Chi
"\uf044": "\u0394", # Delta
"\uf045": "\u0395", # Epsilon
"\uf046": "\u03a6", # Phi
"\uf047": "\u0393", # Gamma
"\uf048": "\u0397", # Eta
"\uf049": "\u0399", # Iota
"\uf04a": "\u03d1", # theta variant
"\uf04b": "\u039a", # Kappa
"\uf04c": "\u039b", # Lambda
"\uf04d": "\u039c", # Mu
"\uf04e": "\u039d", # Nu
"\uf04f": "\u039f", # Omicron
"\uf050": "\u03a0", # Pi
"\uf051": "\u0398", # Theta
"\uf052": "\u03a1", # Rho
"\uf053": "\u03a3", # Sigma
"\uf054": "\u03a4", # Tau
"\uf055": "\u03a5", # Upsilon
"\uf056": "\u03c2", # sigma final
"\uf057": "\u03a9", # Omega
"\uf058": "\u039e", # Xi
"\uf059": "\u03a8", # Psi
"\uf05a": "\u0396", # Zeta
"\uf05b": "[",
"\uf05c": "\u2234", # therefore
"\uf05d": "]",
"\uf05e": "\u22a5", # perpendicular
"\uf061": "\u03b1", # alpha
"\uf062": "\u03b2", # beta
"\uf063": "\u03c7", # chi
"\uf064": "\u03b4", # delta
"\uf065": "\u03b5", # epsilon
"\uf066": "\u03c6", # phi
"\uf067": "\u03b3", # gamma
"\uf068": "\u03b7", # eta
"\uf069": "\u03b9", # iota
"\uf06a": "\u03d5", # phi variant
"\uf06b": "\u03ba", # kappa
"\uf06c": "\u03bb", # lambda
"\uf06d": "\u03bc", # mu
"\uf06e": "\u03bd", # nu
"\uf06f": "\u03bf", # omicron
"\uf070": "\u03c0", # pi
"\uf071": "\u03b8", # theta
"\uf072": "\u03c1", # rho
"\uf073": "\u03c3", # sigma
"\uf074": "\u03c4", # tau
"\uf075": "\u03c5", # upsilon
"\uf076": "\u03d6", # pi symbol
"\uf077": "\u03c9", # omega
"\uf078": "\u03be", # xi
"\uf079": "\u03c8", # psi
"\uf07a": "\u03b6", # zeta
"\uf07b": "{",
"\uf07c": "|",
"\uf07d": "}",
"\uf07e": "~",
"\uf0b1": "\u00b1", # plus-minus
"\uf0b7": "\u2022", # bullet
"\uf0ba": "\u221a", # square root
"\uf0bc": "\u2264", # less or equal
"\uf0bd": "\u2265", # greater or equal
"\uf0be": "\u221d", # proportional
"\uf0d7": "\u00d7", # multiplication
"\uf0f7": "\u00f7", # division
"\uf0b4": "\u00d7", # alternate multiply
"\uf0bb": "\u2260", # not equal
"\uf0b9": "\u2260", # not equal alternate
"\uf0b3": "\u2265", # greater or equal alternate
"\uf0b2": "\u2032", # prime
"\uf02a": "*",
"\uf02c": ",",
"\uf0a3": "\u2264", # less or equal (Symbol 0xA3)
"\uf0a7": "\u2022", # bullet (Wingdings 0xA7)
"\uf0a8": "\u2022", # bullet variant
"\uf0ae": "\u2192", # right arrow (Symbol 0xAE)
"\uf0b8": "\u00f7", # division / range separator
"\uf0eb": "", # Wingdings decorative icon (rimosso)
"\uf0f0": "\u2192", # right arrow variant
"\uf0db": "", # bracket extension piece (non ricostruibile)
"\uf0dc": "", # bracket extension piece
"\uf0dd": "", # bracket extension piece
"\uf0de": "", # brace middle piece (non ricostruibile)
"\uf0df": "", # brace extension piece
}
_SYMBOL_PUA_RE = re.compile(
"[" + "".join(re.escape(k) for k in _SYMBOL_PUA_MAP) + "]"
)
def _t_fix_symbol_font(text: str) -> tuple[str, int]:
"""Rimappa caratteri PUA font Symbol (U+F020-U+F0FF) in simboli Unicode corretti."""
count = [0]
def _repl(m: re.Match) -> str:
count[0] += 1
return _SYMBOL_PUA_MAP[m.group(0)]
result = _SYMBOL_PUA_RE.sub(_repl, text)
return result, count[0]
def _t_remove_images(text: str) -> tuple[str, int]:
n = len(re.findall(r"!\[[^\]]*\]\([^)]*\)", text))
text = re.sub(r"!\[[^\]]*\]\([^)]*\)\s*", "", text)
return text, n
# Superscript Unicode: ¹²³⁴⁵⁶⁷⁸⁹⁰
_SUPERSCRIPT_RE = re.compile(r'[\u00b9\u00b2\u00b3\u2070\u2074-\u2079]+')
# Riga corpo-nota: inizia con superscript o [N]
_FOOTNOTE_BODY_RE = re.compile(
r'^([\u00b9\u00b2\u00b3\u2070\u2074-\u2079]+\s+|\[\d{1,3}\]\s+)'
)
def _t_remove_footnotes(text: str) -> tuple[str, int]:
"""Rimuovi marcatori footnote superscript inline e righe corpo-nota."""
lines = text.split("\n")
result, count = [], 0
for line in lines:
stripped = line.strip()
# Corpo nota: riga breve che inizia con ¹ o [N]
if stripped and _FOOTNOTE_BODY_RE.match(stripped) and len(stripped) < 300:
count += 1
continue
cleaned = _SUPERSCRIPT_RE.sub("", line)
if cleaned != line:
count += 1
result.append(cleaned)
return "\n".join(result), count
def _t_fix_br(text: str) -> tuple[str, int]:
n = len(re.findall(r"<br>", text, re.IGNORECASE))
text = re.sub(r"<br>\s*", " ", text, flags=re.IGNORECASE)
return text, n
def _t_fix_tabsep(text: str) -> tuple[str, int]:
_pat = re.compile(r"(?m)^\|\s*\|\s*$|^\|---\|?\s*$")
n = len(_pat.findall(text))
text = _pat.sub("", text)
return text, n
def _t_fix_accents(text: str) -> tuple[str, int]:
"""Fix artefatti backtick da PDF LaTeX: `e→è, e`→è, sar`a→sarà, ecc."""
_ACCENT_MAP = {
"e": "è", "E": "È", "a": "à", "A": "À",
"u": "ù", "U": "Ù", "i": "ì", "I": "Ì", "o": "ò", "O": "Ò",
}
n_bt_before = text.count("`")
text = re.sub(r"`([eEaAuUiIoO])", lambda m: _ACCENT_MAP[m.group(1)], text)
text = re.sub(r"([eEaAuUiIoO])`", lambda m: _ACCENT_MAP[m.group(1)], text)
n_accenti = n_bt_before - text.count("`")
# Backtick orfani: artefatti LaTeX rimasti dopo la correzione vocale
n_bt_orfani = text.count("`")
if n_bt_orfani:
text = re.sub(r"`", "", text)
n_accenti += n_bt_orfani
return text, n_accenti
def _t_fix_multiplication(text: str) -> tuple[str, int]:
"""Fix segno di moltiplicazione "→× (encoding font PDF non-standard)."""
n = len(re.findall(r'(?<=[0-9])"(?=[0-9(])', text))
text = re.sub(r'(?<=[0-9])"(?=[0-9(])', '×', text)
return text, n
def _t_fix_micro(text: str) -> tuple[str, int]:
"""Fix prefisso micro !→µ prima di unità SI note."""
_SI_UNITS_RE = r'[mAsgVWFHTKNJClΩ]'
n = len(re.findall(rf'\d\s*!(?={_SI_UNITS_RE})', text))
text = re.sub(rf'(\d)\s*!({_SI_UNITS_RE})', r'\1 µ\2', text)
return text, n
def _t_remove_formula_labels(text: str) -> tuple[str, int]:
"""Rimuovi label formule inline [N.M] — es. [3.4], [10.7]."""
n = len(re.findall(r"\[\d+\.\d+\]", text))
text = re.sub(r"\s*\[\d+\.\d+\]\s*", " ", text)
return text, n
def _t_remove_dotleaders(text: str) -> tuple[str, int]:
"""Rimuovi righe con dot-leader e numerali romani isolati (footer TOC)."""
_DOTLEADER_RE = r"^[^\n]*(?:(?:\. ){3,}|\.{4,})[^\n]*$"
n = len(re.findall(_DOTLEADER_RE, text, re.MULTILINE))
text = re.sub(_DOTLEADER_RE, "", text, flags=re.MULTILINE)
text = re.sub(
r"(?m)^(i{1,3}|iv|vi{0,3}|ix|xi{0,2}|x)$",
"",
text,
flags=re.IGNORECASE,
)
return text, n
def _t_fix_header_concat(text: str) -> tuple[str, int]:
"""Fix header + body concatenati senza separatore."""
count = 0
def _fix(m: re.Match) -> str:
nonlocal count
hashes = m.group(1)
full = m.group(2).strip()
if len(full) < 60:
return m.group(0)
skip = min(10, len(full) // 3)
split = re.search(r"(?<=[a-zàèéìíòóùúä])(?=[A-ZÀÈÉÌÍÒÓÙÚ])", full[skip:])
if split:
pos = skip + split.start()
title = full[:pos].strip()
body = full[pos:].strip()
if len(title) >= 5 and len(body) >= 15:
count += 1
return f"{hashes} {title}\n\n{body}"
return m.group(0)
text = re.sub(r"^(#{2,6})\s+(.{40,})$", _fix, text, flags=re.MULTILINE)
return text, count
def _t_extract_capitolo(text: str) -> tuple[str, int]:
"""Estrai 'Capitolo N: TITOLO' inline nel corpo del testo → ## header."""
def _repl(m: re.Match) -> str:
num = m.group(1)
titolo = _sentence_case(m.group(2).strip().rstrip("- ").strip())
return f"\n\n## Capitolo {num}: {titolo}\n\n"
text = re.sub(
r"\bCapitolo\s+(\d+)\s*[:\s]\s*([A-ZÀÈÉÌÍÒÓÙÚ\'L][A-ZÀÈÉÌÍÒÓÙÚ\s\'\.,\(\)]{5,80}?)"
r"(?=\s*[-]\s*\d|\s*\n|\s*$)",
_repl,
text,
)
return text, 0
_NUMBERED_HDR_RE = re.compile(
r"^(#{1,6})\s+(\d+(?:\.\d+)*)\.\s+(.+)$",
re.MULTILINE,
)
def _t_normalize_numbered_headings(text: str) -> tuple[str, int]:
"""Corregge livelli header per documenti con numerazione decimale.
Assegna livello heading in base alla profondità numerica usando come base
il livello corrente degli header di profondità minima.
Attivo solo se il documento ha almeno 2 profondità di numerazione.
"""
all_matches = list(_NUMBERED_HDR_RE.finditer(text))
if not all_matches:
return text, 0
pairs = [
(m.group(2).count(".") + 1, len(m.group(1)))
for m in all_matches
]
depths = [d for d, _ in pairs]
min_depth, max_depth = min(depths), max(depths)
if max_depth == min_depth:
return text, 0
base_level = min(lv for d, lv in pairs if d == min_depth)
count = 0
def _repl(m: re.Match) -> str:
nonlocal count
hashes, num, title = m.group(1), m.group(2), m.group(3)
depth = num.count(".") + 1
new_level = min(base_level + (depth - min_depth), 6)
if new_level == len(hashes):
return m.group(0)
count += 1
return f"{'#' * new_level} {num}. {title}"
return _NUMBERED_HDR_RE.sub(_repl, text), count
def _t_normalize_header_levels(text: str) -> tuple[str, int]:
"""Normalizza h4+ → h3; rimuove header vuoti; rimuove numero pagina '| N' finale."""
text = re.sub(r"^#{3,6}\s*$", "", text, flags=re.MULTILINE)
text = re.sub(
r"^(#{3,6})\s+(\d{1,3})\s+(.+)$",
lambda m: f"### {m.group(2)}. {m.group(3)}",
text,
flags=re.MULTILINE,
)
text = re.sub(r"^#{4,6}\s+(.+)$", r"### \1", text, flags=re.MULTILINE)
return text, 0
def _t_extract_articles(text: str) -> tuple[str, int]:
"""Converti voci articolo '- Art. N.''### Art. N.'"""
return _extract_article_headers(text)
def _t_remove_header_bold(text: str) -> tuple[str, int]:
"""Rimuovi **bold** negli header esistenti."""
text = re.sub(
r"^(#{1,6})\s+\*\*(.+?)\*\*\s*$",
r"\1 \2",
text, flags=re.MULTILINE,
)
return text, 0
def _t_normalize_allcaps_headers(text: str) -> tuple[str, int]:
"""Normalizza header ALL-CAPS → sentence-case."""
def _norm(m: re.Match) -> str:
hashes, content = m.group(1), m.group(2).strip()
letters = [c for c in content if c.isalpha()]
if letters and all(c.isupper() for c in letters):
return f"{hashes} {_sentence_case(content)}"
return m.group(0)
text = re.sub(r"^(#{1,6}) (.+)$", _norm, text, flags=re.MULTILINE)
return text, 0
def _t_remove_toc(text: str) -> tuple[str, int]:
"""Rimuovi header TOC e voci lista numerate che seguono."""
lines = text.split("\n")
new_lines = []
_in_toc = False
removed = False
for line in lines:
bare = re.sub(r"^#+\s*", "", line.strip())
first_word = bare.split(".")[0].strip().lower()
if first_word in _TOC_KEYWORDS:
removed = True
_in_toc = True
continue
if _in_toc:
if re.match(r"^\s*$", line) or re.match(r"^\s*[-*+]\s+\d", line):
continue
# Voce TOC con numero pagina finale (sicuro: siamo gia in contesto TOC)
if re.match(r"^\s*[-*+]\s+.{2,70}\s+\d{1,3}\s*$", line):
continue
# Riga di testo lungo = probabilmente abstract o corpo, non voce di indice
if len(line.strip()) > 200:
_in_toc = False
new_lines.append(line)
continue
_in_toc = False
new_lines.append(line)
return "\n".join(new_lines), 1 if removed else 0
def _t_allcaps_to_headers(text: str) -> tuple[str, int]:
"""Converti righe ALL-CAPS standalone → ## header."""
count = 0
blocks = text.split("\n\n")
new_blocks = []
for block in blocks:
stripped = block.strip()
if "\n" not in stripped and _is_allcaps_line(stripped):
new_blocks.append(_allcaps_to_header(stripped))
count += 1
else:
sub_lines = block.split("\n")
converted = []
for ln in sub_lines:
if _is_allcaps_line(ln) and len(ln.strip()) > 3:
converted.append(_allcaps_to_header(ln))
count += 1
else:
converted.append(ln)
new_blocks.append("\n".join(converted))
return "\n\n".join(new_blocks), count
_BIB_MARKERS_RE = re.compile(
r'\b(pp?\.|vol\.|n\.\s*\d|ed\.|edn\.|ISBN|DOI|arXiv)\b'
r'|\b(19|20)\d{2}\b',
re.IGNORECASE,
)
def _t_numbered_sections(text: str, has_exercises: bool = False) -> tuple[str, int]:
"""Converti sezioni numerate 'N. testo' / '- N. testo' / '- N testo' → ### header."""
count = 0
def _num_repl(m: re.Match) -> str:
nonlocal count
content = m.group(2).strip()
if content.endswith(".") and len(content) > 40:
return m.group(0)
if _BIB_MARKERS_RE.search(content):
return m.group(0)
count += 1
return f"### {m.group(1)}.\n\n{content}"
text = re.sub(r"^(\d+)\.\s+(.+)$", _num_repl, text, flags=re.MULTILINE)
def _num_letter_repl(m: re.Match) -> str:
nonlocal count
count += 1
return f"### {m.group(1)}{m.group(2)}.\n\n{m.group(3).strip()}"
text = re.sub(r"^(\d+)\s*([a-z])\.\s+(.+)$", _num_letter_repl, text, flags=re.MULTILINE)
# Disabilitato se il documento contiene sezioni "Esercizi": in quel caso i
# "- N. testo" sono numerazioni di esercizi, non header di sezione.
if not has_exercises:
def _aphorism_repl(m: re.Match) -> str:
nonlocal count
content = m.group(2).strip()
if _BIB_MARKERS_RE.search(content):
return m.group(0)
count += 1
return f"\n\n### {m.group(1)}.\n\n{content}"
text = re.sub(
r"^-\s+(\d{1,3})\.\s+(.{10,})$",
_aphorism_repl,
text,
flags=re.MULTILINE,
)
def _list_section_repl(m: re.Match) -> str:
nonlocal count
num = m.group(1)
content = m.group(2).strip()
if _BIB_MARKERS_RE.search(content):
return m.group(0)
count += 1
split = re.search(r"(?<=[a-zàèéìíòóùú])\s+(?=[A-ZÀÈÉÌÍÒÓÙÚ])", content)
if split and split.start() >= 3:
title = content[: split.start()].strip()
body = content[split.end():].strip()
if len(body) >= 20:
return f"\n\n### {num}. {title}\n\n{body}"
return f"\n\n### {num}. {content}"
text = re.sub(
r"^-\s+(\d{1,3})\s+([A-ZÀÈÉÌÍÒÓÙÚ\'L].{10,})$",
_list_section_repl,
text,
flags=re.MULTILINE,
)
return text, count
def _t_extract_math(text: str) -> tuple[str, int]:
"""Converti ambienti matematici (Teorema/Definizione/...) → ### header."""
return _extract_math_environments(text)
def _t_merge_paragraphs(text: str) -> tuple[str, int]:
"""Unisci paragrafi spezzati da salti pagina PDF."""
_SENTENCE_END = set(".?!»)\"'")
blocks = text.split("\n\n")
merged = []
count = 0
i = 0
while i < len(blocks):
b = blocks[i]
stripped = b.strip()
while (
i + 1 < len(blocks)
and stripped
and not stripped.startswith("#")
and not stripped.startswith("|") # non unire righe tabella in avanti
and stripped[-1] not in _SENTENCE_END
):
nxt = blocks[i + 1].strip()
if not nxt or nxt.startswith("#") or nxt.startswith("|") or re.match(r"^\d+\.", nxt) or re.match(r"^[-*+]\s", nxt):
break
b = stripped + " " + nxt
stripped = b.strip()
count += 1
i += 1
merged.append(b)
i += 1
text = "\n\n".join(merged)
# Secondo pass: rimuovi prefisso |---| eventualmente rimasto dopo il merge
text = re.sub(r"(?m)^\|---\|\s*", "", text)
return text, count
def _t_normalize_whitespace(text: str) -> tuple[str, int]:
"""Normalizza whitespace multiplo interno alle righe."""
lines = text.split("\n")
text = "\n".join(
re.sub(r" +", " ", line) if line.strip() else line
for line in lines
)
return text, 0
def _t_collapse_blank_lines(text: str) -> tuple[str, int]:
"""Riduci righe vuote multiple a doppie."""
return re.sub(r"\n{3,}", "\n\n", text), 0
def _t_demote_verse_headers(text: str) -> tuple[str, int]:
"""Demoti header che sono in realtà terzine/versi.
opendataloader promuove a ## le iscrizioni e i testi in evidenza nel PDF
(corpo maggiore, centrato). Si riconoscono perché:
- terminano con un numero nudo (numero di verso: 3, 6, 9, …)
- contengono punteggiatura interna di fine verso (', ' o '. ')
Esempio: '## «per me si va ne la città dolente, ... gente. 3'
→ paragrafo normale senza il numero finale.
"""
count = 0
def _demote(m: re.Match) -> str:
nonlocal count
hashes, content = m.group(1), m.group(2).strip()
# Deve terminare con numero nudo (numero di verso ≤ 9999)
if not re.search(r"\s\d{1,4}\s*$", content):
return m.group(0)
# Deve contenere punteggiatura interna (è un blocco di più versi)
inner = re.sub(r"\s\d{1,4}\s*$", "", content)
if not re.search(r"[,;:.!?»\"\']\s+[A-Za-zÀ-ÿ«\"]", inner):
return m.group(0)
count += 1
# Rimuovi il numero di verso finale e restituisci come testo normale
clean = re.sub(r"\s\d{1,4}\s*$", "", content)
return clean
text = re.sub(
r"^(#{1,6})\s+(.{20,})$",
_demote,
text,
flags=re.MULTILINE,
)
return text, count
def _t_restore_poetry_lines(text: str) -> tuple[str, int]:
"""Ripristina line break di poesia distrutti da keep_line_breaks=False.
Quando il PDF è poesia (terzine dantesche, sonetti, ecc.) opendataloader
con keep_line_breaks=False produce un unico paragrafo con i numeri di verso
(3, 6, 9 … oppure 1, 2, 3 …) incorporati inline:
'smarrita. 3 Ahi quanto a dir qual era è cosa dura … paura! 6 Tant'è …'
Il transform rileva blocchi con numeri di verso in progressione aritmetica
e li separa in righe, con riga vuota ogni 3 versi (terzina).
"""
count = 0
blocks = text.split("\n\n")
result = []
# Pattern: numero isolato preceduto da punteggiatura-fine-verso e seguito
# da lettera maiuscola (inizio verso successivo).
_VERSE_NUM_RE = re.compile(
r'([.!?»\'\"]\s+)(\d+)(\s+)(?=[A-ZÀ-Ùa-zà-ù«"‟])'
)
for block in blocks:
stripped = block.strip()
if not stripped or stripped.startswith("#"):
result.append(block)
continue
matches = list(_VERSE_NUM_RE.finditer(stripped))
if len(matches) < 2:
result.append(block)
continue
nums = [int(m.group(2)) for m in matches]
diffs = [nums[i + 1] - nums[i] for i in range(len(nums) - 1)]
# Accetta progressioni con passo costante 15 (terzine: 3, endecasillabi: 1)
if not diffs or len(set(diffs)) > 2 or not (1 <= diffs[0] <= 5):
result.append(block)
continue
step = diffs[0]
def _replace_verse_num(m: re.Match) -> str:
n = int(m.group(2))
# Ogni 'step' versi → riga vuota (inizio nuova terzina/strofa)
sep = "\n\n" if n % (step * 3) == 0 else "\n"
return m.group(1).rstrip() + sep
new_block = _VERSE_NUM_RE.sub(_replace_verse_num, stripped)
if new_block != stripped:
count += len(matches)
result.append(new_block)
return "\n\n".join(result), count
def _t_remove_urls(text: str) -> tuple[str, int]:
"""Rimuovi righe che sono solo URL (watermark, footer di piattaforme)."""
return re.sub(r"(?m)^(https?://|www\.)\S+\s*$", "", text), 0
def _t_remove_empty_headers(text: str) -> tuple[str, int]:
"""Rimuovi header senza corpo (sezioni vuote / watermark)."""
blocks = re.split(r"\n{2,}", text)
cleaned = []
for i, block in enumerate(blocks):
stripped = block.strip()
if re.match(r"^#{1,6} ", stripped) and "\n" not in stripped:
next_stripped = blocks[i + 1].strip() if i + 1 < len(blocks) else ""
# Non rimuovere un header breve se il successivo è un header molto lungo
# (> 80 char): quasi certamente è testo PDF mal classificato come heading.
next_is_long_header = (
re.match(r"^#{1,6} ", next_stripped) and len(next_stripped) > 80
)
if not next_stripped or (
re.match(r"^#{1,6} ", next_stripped) and not next_is_long_header
):
continue
cleaned.append(block)
return re.sub(r"\n{3,}", "\n\n", "\n\n".join(cleaned)), 0
def _t_merge_title_headers(text: str) -> tuple[str, int]:
"""Fondi header numerici isolati con il sottotitolo breve successivo."""
return _merge_title_headers(text)
def _t_remove_garbage_headers(text: str) -> tuple[str, int]:
"""Rimuovi garbage headers: simboli, abbreviazioni matematiche, frammenti formula."""
def _is_garbage_header(content: str) -> bool:
if content.lstrip().startswith("..."):
return True
if not re.search(r"[A-Za-zÀ-ÿ\u0391-\u03c9]{2,}", content):
return True
if re.fullmatch(r"\(?\s*[A-Za-z]{1,4}\s*\)?", content.strip()):
return True
if len(content) > 60 and re.search(r"[!%#]\w|\w[!%#]|\b\w+-\s*\w", content):
return True
# Frammento di frase: inizia con minuscola ed e abbastanza lungo
first_alpha = next((c for c in content if c.isalpha()), None)
if first_alpha and first_alpha.islower() and len(content) > 40:
return True
# Formula matematica: variabile singola (o breve) seguita da = o operatore
if re.match(r"^[A-Za-z\u0391-\u03c9_]{1,3}\s*[=<>≤≥]", content.strip()):
return True
# Didascalia figura/tabella: "Figura N..." o "Figure N..." o "Tabella N..."
if re.match(r"^(Figura|Figure|Fig\.|Tabella|Table|Tab\.)\s+\d", content.strip(), re.IGNORECASE):
return True
return False
count = 0
lines = text.split("\n")
new_lines = []
for line in lines:
m = re.match(r"^#{1,6} (.+)$", line)
if m and _is_garbage_header(m.group(1)):
count += 1
continue
new_lines.append(line)
text = "\n".join(new_lines)
text = re.sub(r"\n{3,}", "\n\n", text)
return text, count
def _t_remove_frontmatter(text: str) -> tuple[str, int]:
"""Rimuovi sezioni frontmatter: URL, email, affiliazione, copyright."""
_FM_RE = re.compile(
r"https?://|www\.|@[A-Za-z]|\bUniversit[àa]\b|\bDipartimento\b|"
r"\bCopyright\b|\bLicenza\b|\bEdizione\b|"
r"protetto da|tutti i diritti",
re.IGNORECASE,
)
blocks = re.split(r"\n{2,}", text)
cleaned = []
count = 0
total = len(blocks)
cutoff = max(5, min(15, int(total * 0.20)))
for i, block in enumerate(blocks):
stripped = block.strip()
# Frontmatter compare solo nelle prime sezioni del documento
if i >= cutoff:
cleaned.append(block)
continue
if not re.match(r"^### ", stripped) or re.match(r"^### \d", stripped):
cleaned.append(block)
continue
body = blocks[i + 1].strip() if i + 1 < len(blocks) else ""
is_fm_body = len(body) < 250 and _FM_RE.search(body)
is_fm_hdr = _FM_RE.search(stripped)
if is_fm_body or is_fm_hdr:
count += 1
continue
cleaned.append(block)
return re.sub(r"\n{3,}", "\n\n", "\n\n".join(cleaned)), count
_WATERMARK_RE = re.compile(
r"^(BOZZA|DRAFT|CONFIDENTIAL|RISERVATO|PROVVISORIO|SAMPLE|SPECIMEN"
r"|DO NOT DISTRIBUTE|NON DISTRIBUIRE|COPY|COPIA)\s*$",
re.IGNORECASE | re.MULTILINE,
)
def _t_remove_watermarks(text: str) -> tuple[str, int]:
"""Rimuovi righe standalone con testo watermark comune."""
lines = text.split("\n")
result, count = [], 0
for line in lines:
if _WATERMARK_RE.match(line):
count += 1
else:
result.append(line)
return "\n".join(result), count
def _t_fix_math_symbols(text: str) -> tuple[str, int]:
"""Rimuovi righe composte solo da simboli box/placeholder (font non estratti)."""
lines = text.split("\n")
result, count = [], 0
for line in lines:
if line.strip() and re.match(r"^[\s□■▪▫◆◇●○•\u25a0-\u25ff]+$", line):
count += 1
else:
result.append(line)
return "\n".join(result), count
def _t_remove_recurring_lines(text: str) -> tuple[str, int]:
"""Rimuovi righe corte che si ripetono ≥5 volte (header/footer di pagina)."""
lines = text.split("\n")
short_lines = [
ln.strip() for ln in lines
if 3 < len(ln.strip()) < 80
and not ln.strip().startswith("#")
and not ln.strip().startswith("|")
]
freq = Counter(short_lines)
recurring = {ln for ln, c in freq.items() if c >= 5}
if not recurring:
return text, 0
result, count = [], 0
for line in lines:
if line.strip() in recurring:
count += 1
else:
result.append(line)
return "\n".join(result), count
# ─── [3b] Pipeline delle trasformazioni ──────────────────────────────────────
def apply_transforms(text: str) -> tuple[str, dict]:
"""
Applica le trasformazioni strutturali al Markdown grezzo.
Restituisce (testo_modificato, statistiche).
"""
# Flag calcolato prima del loop: disabilita il transform 4b nei documenti
# con sezioni "Esercizi" (i "- N. testo" sarebbero numerazioni, non header).
_has_ex = bool(re.search(r"\b(Esercizi|Exercises|Problems|Homework)\b", text, re.IGNORECASE))
_transforms: list[tuple[str | None, object]] = [
("n_simboli_pua_corretti", _t_fix_symbol_font),
("n_immagini_rimosse", _t_remove_images),
("n_br_rimossi", _t_fix_br),
("n_tabsep_rimossi", _t_fix_tabsep),
("n_note_rimosse", _t_remove_footnotes),
("n_accenti_corretti", _t_fix_accents),
("n_moltiplicazioni_corrette", _t_fix_multiplication),
("n_micro_corretti", _t_fix_micro),
("n_simboli_math_rimossi", _t_fix_math_symbols),
("n_formule_rimossi", _t_remove_formula_labels),
("n_dotleader_rimossi", _t_remove_dotleaders),
("n_righe_ricorrenti_rimosse", _t_remove_recurring_lines),
("n_header_concat_fixati", _t_fix_header_concat),
(None, _t_extract_capitolo),
("n_header_numerati_normalizzati", _t_normalize_numbered_headings),
(None, _t_normalize_header_levels),
("n_articoli_estratti", _t_extract_articles),
(None, _t_remove_header_bold),
(None, _t_normalize_allcaps_headers),
("toc_rimosso", _t_remove_toc),
("n_header_allcaps", _t_allcaps_to_headers),
("n_sezioni_numerate", partial(_t_numbered_sections, has_exercises=_has_ex)),
("n_ambienti_matematici", _t_extract_math),
("n_paragrafi_uniti", _t_merge_paragraphs),
(None, _t_normalize_whitespace),
(None, _t_collapse_blank_lines),
("n_versi_ripristinati", _t_restore_poetry_lines),
("n_header_verso_demotati", _t_demote_verse_headers),
(None, _t_remove_urls),
(None, _t_remove_empty_headers),
("n_titoli_uniti", _t_merge_title_headers),
(None, lambda t: (re.sub(r"(?m)^(#{1,6}.+?)\s*\|\s*\d{1,3}\s*$", r"\1", t), 0)),
("n_garbage_headers_rimossi", _t_remove_garbage_headers),
("n_frontmatter_rimossi", _t_remove_frontmatter),
("n_watermark_rimossi", _t_remove_watermarks),
]
stats: dict = {}
for stat_key, fn in _transforms:
text, n = fn(text)
if stat_key:
stats[stat_key] = stats.get(stat_key, 0) + n
stats["toc_rimosso"] = bool(stats.get("toc_rimosso", 0))
return text, stats
# ─── [4] Rilevamento struttura ───────────────────────────────────────────────
_IT_WORDS = frozenset([
"il", "la", "di", "e", "che", "non", "per", "un", "una", "si",
"con", "da", "del", "della", "dei", "in", "ma", "se", "lo", "le",
"gli", "al", "alla", "ai", "alle", "sono", "ha", "hanno", "era",
"erano", "nel", "nella", "nei", "nelle", "questo", "questa", "così",
])
_EN_WORDS = frozenset([
"the", "of", "and", "to", "in", "is", "that", "it", "was", "for",
"on", "are", "as", "with", "his", "they", "at", "be", "this", "have",
"from", "or", "an", "but", "not", "by", "he", "she", "we", "you",
"which", "their", "been", "has", "would", "there", "when", "will",
])
_FR_WORDS = frozenset([
"le", "les", "de", "du", "des", "et", "un", "une", "est", "que",
"pour", "dans", "sur", "avec", "qui", "par", "pas", "plus", "au",
"ce", "se", "ou", "mais", "comme", "aussi",
])
_DE_WORDS = frozenset([
"der", "die", "das", "und", "in", "von", "zu", "den", "mit", "ist",
"auf", "eine", "als", "dem", "des", "sich", "nicht", "auch", "werden",
"bei", "nach", "oder", "wenn", "wird", "war",
])
_ES_WORDS = frozenset([
"el", "los", "las", "de", "en", "un", "una", "es", "que", "por",
"con", "del", "para", "como", "pero", "sus", "son", "los", "hay",
"todo", "esta", "este", "ser", "más", "ya",
])
def _detect_language(text: str) -> str:
words = re.findall(r"\b[a-zA-Z]{2,}\b", text.lower())
sample = words[:2000]
scores = {
"it": sum(1 for w in sample if w in _IT_WORDS),
"en": sum(1 for w in sample if w in _EN_WORDS),
"fr": sum(1 for w in sample if w in _FR_WORDS),
"de": sum(1 for w in sample if w in _DE_WORDS),
"es": sum(1 for w in sample if w in _ES_WORDS),
}
best = max(scores, key=scores.get)
return best if scores[best] > 0 else "unknown"
def _count_headers(text: str, level: int) -> int:
prefix = "#" * level + " "
return len(re.findall(rf"(?m)^{re.escape(prefix)}", text))
def _count_paragraphs(text: str) -> int:
blocks = re.split(r"\n{2,}", text)
return sum(1 for b in blocks if b.strip() and not re.match(r"^#+\s", b.strip()))
def _split_sections(text: str, level: int) -> list[str]:
prefix = "#" * level + " "
parts = re.split(rf"(?m)^{re.escape(prefix)}.+", text)
return [p for p in parts[1:] if p.strip()]
def _parse_sections_with_body(text: str, level: int = 3) -> list[tuple[str, str]]:
"""Restituisce lista di (header_line, body_text) per tutti gli header al livello dato."""
prefix = "#" * level + " "
lines = text.split("\n")
sections: list[tuple[str, str]] = []
cur_hdr: str | None = None
cur_body: list[str] = []
for line in lines:
if line.startswith(prefix):
if cur_hdr is not None:
sections.append((cur_hdr, "\n".join(cur_body).strip()))
cur_hdr = line
cur_body = []
elif cur_hdr is not None:
cur_body.append(line)
if cur_hdr is not None:
sections.append((cur_hdr, "\n".join(cur_body).strip()))
return sections
def analyze(md_path: Path) -> dict:
text = md_path.read_text(encoding="utf-8")
n_h1 = _count_headers(text, 1)
n_h2 = _count_headers(text, 2)
n_h3 = _count_headers(text, 3)
n_paragrafi = _count_paragraphs(text)
if n_h3 >= 5:
livello, boundary, strategia = 3, "h3", "h3_aware"
section_bodies = _split_sections(text, 3)
# Gerarchia invertita: h3 sono capitoli enormi, h2 sono sottosezioni più brevi.
# Succede quando opendataloader classifica titoli capitolo come h6 (→ normalizzati
# a h3) e le sottosezioni ALL-CAPS diventano ## (h2). In questo caso h2 è
# il boundary corretto per il chunking.
if n_h2 >= 3:
h2_bodies = _split_sections(text, 2)
avg_h3 = sum(len(b) for b in section_bodies) / len(section_bodies) if section_bodies else 0
avg_h2 = sum(len(b) for b in h2_bodies) / len(h2_bodies) if h2_bodies else 0
if avg_h3 > 5000 and avg_h2 < avg_h3 * 0.7:
livello, boundary, strategia = 2, "h2", "h2_paragraph_split"
section_bodies = h2_bodies
elif n_h2 >= 3:
livello, boundary, strategia = 2, "h2", "h2_paragraph_split"
section_bodies = _split_sections(text, 2)
elif n_h1 + n_h2 + n_h3 >= 1:
livello, boundary, strategia = 1, "paragrafo", "paragraph"
section_bodies = [b for b in re.split(r"\n{2,}", text) if b.strip()]
elif n_paragrafi >= 3:
livello, boundary, strategia = 1, "paragrafo", "paragraph"
section_bodies = [b for b in re.split(r"\n{2,}", text) if b.strip()]
else:
livello, boundary, strategia = 0, "nessuno", "sliding_window"
section_bodies = [text] if text.strip() else []
lengths = [len(b) for b in section_bodies if b.strip()]
lunghezza_media = int(sum(lengths) / len(lengths)) if lengths else 0
lingua = _detect_language(text)
avvertenze = []
short = sum(1 for l in lengths if l < 200)
long_ = sum(1 for l in lengths if l > 800)
if short:
avvertenze.append(f"{short} sezioni sotto i 200 caratteri — verranno accorpate")
if long_:
avvertenze.append(f"{long_} sezioni sopra i 800 caratteri — verranno divise")
return {
"livello_struttura": livello,
"n_h1": n_h1,
"n_h2": n_h2,
"n_h3": n_h3,
"n_paragrafi": n_paragrafi,
"boundary_primario": boundary,
"lingua_rilevata": lingua,
"lunghezza_media_sezione": lunghezza_media,
"strategia_chunking": strategia,
"avvertenze": avvertenze,
}
# ─── Report di conversione ───────────────────────────────────────────────────
def build_report(
stem: str,
out_dir: Path,
clean_text: str,
t_stats: dict,
profile: dict,
reduction: float,
) -> Path:
"""
Genera conversione/<stem>/report.json con tutte le metriche di qualità:
statistiche trasformazioni, struttura, distribuzione lunghezze, anomalie
e problemi residui. Leggibile da validate.py per la validazione batch.
"""
text_lines = clean_text.split("\n")
# ── Raccolta sezioni ### con corpo ────────────────────────────────────
sections = _parse_sections_with_body(clean_text, 3)
lengths = [len(body) for _, body in sections]
# ── Distribuzione lunghezze ───────────────────────────────────────────
def _pct(data: list[int], p: float) -> int:
if not data:
return 0
s = sorted(data)
return s[max(0, min(len(s) - 1, int(len(s) * p)))]
distribution = {
"min": min(lengths) if lengths else 0,
"p25": _pct(lengths, 0.25),
"mediana": _pct(lengths, 0.50),
"p75": _pct(lengths, 0.75),
"max": max(lengths) if lengths else 0,
}
# ── Anomalie ──────────────────────────────────────────────────────────
bare_hdrs = [
{"header": hdr, "corpo_inizio": body[:120].replace("\n", " ")}
for hdr, body in sections
if re.match(r"^### \d+\.\s*$", hdr) and len(body.strip()) < 30
]
short_secs = [
{"header": hdr, "chars": length, "testo": body[:80].replace("\n", " ")}
for (hdr, body), length in zip(sections, lengths)
if 0 < length < 150
]
long_secs = [
{"header": hdr, "chars": length}
for (hdr, _), length in zip(sections, lengths)
if length > 1500
]
# ── Problemi residui (max 10 esempi ciascuno) ─────────────────────────
def _scan(pattern: str, max_n: int = 10) -> list[dict]:
hits = []
for i, line in enumerate(text_lines):
if re.search(pattern, line) and not re.match(r"^#+ ", line):
hits.append({"riga": i + 1, "testo": line.strip()[:120]})
if len(hits) >= max_n:
break
return hits
residui = {
"backtick": _scan(r"`"),
"dotleader": _scan(r"(?:\. ){3,}"),
"url": _scan(r"^(https?://|www\.)\S+"),
"immagini": _scan(r"!\[[^\]]*\]\([^)]*\)"),
"br_inline": _scan(r"<br>"),
"simboli_encoding": _scan(r'(?<=[0-9A-Za-z])[!"](?=[0-9A-Za-z])'),
"formule_inline": _scan(r"\[\d+\.\d+\]"),
"footnote_markers": _scan(r'[\u00b9\u00b2\u00b3\u2070\u2074-\u2079]'),
"pua_markers": _scan(r'[\ue000-\uf8ff]'),
}
# ── Composizione report ───────────────────────────────────────────────
report = {
"stem": stem,
"timestamp": datetime.now().strftime("%Y-%m-%d %H:%M"),
"transforms": {
**t_stats,
"riduzione_pct": round(reduction),
},
"structure": profile,
"distribution": distribution,
"anomalie": {
"bare_headers": len(bare_hdrs),
"short_sections": len(short_secs),
"long_sections": len(long_secs),
"bare_headers_list": bare_hdrs,
"short_sections_list": short_secs,
"long_sections_list": long_secs,
},
"residui": {
"backtick": len(residui["backtick"]),
"dotleader": len(residui["dotleader"]),
"url": len(residui["url"]),
"immagini": len(residui["immagini"]),
"br_inline": len(residui["br_inline"]),
"simboli_encoding": len(residui["simboli_encoding"]),
"formule_inline": len(residui["formule_inline"]),
"footnote_markers": len(residui["footnote_markers"]),
"pua_markers": len(residui["pua_markers"]),
"backtick_esempi": residui["backtick"],
"dotleader_esempi": residui["dotleader"],
"url_esempi": residui["url"],
"immagini_esempi": residui["immagini"],
"br_inline_esempi": residui["br_inline"],
"simboli_encoding_esempi": residui["simboli_encoding"],
"formule_inline_esempi": residui["formule_inline"],
"footnote_markers_esempi": residui["footnote_markers"],
"pua_markers_esempi": residui["pua_markers"],
},
}
report_path = out_dir / "report.json"
report_path.write_text(json.dumps(report, ensure_ascii=False, indent=2), encoding="utf-8")
return report_path
# ─── Pipeline principale ──────────────────────────────────────────────────────
def run(stem: str, project_root: Path, force: bool) -> bool:
pdf_path = project_root / "sources" / f"{stem}.pdf"
out_dir = project_root / "conversione" / stem
raw_out = out_dir / "raw.md"
clean_out = out_dir / "clean.md"
print(f"\n{'' * 52}")
print(f" {stem}")
print(f"{'' * 52}")
if clean_out.exists() and not force:
print(f" ⚠️ conversione/{stem}/clean.md già presente — skip")
print(f" (usa --force per rieseguire)")
return True
# ── [1] Validazione ────────────────────────────────────────────────────
print(" [1/4] Validazione PDF...")
ok, msg = check_pdf(pdf_path)
if not ok:
print(f"{msg}")
return False
print(f"{msg}")
# ── [2] Conversione ────────────────────────────────────────────────────
print(" [2/4] Conversione PDF → Markdown (opendataloader-pdf)...")
with tempfile.TemporaryDirectory() as tmp:
try:
md_file = convert_pdf(pdf_path, Path(tmp))
except MemoryError:
print(" ✗ Memoria esaurita durante la conversione")
return False
except Exception as e:
print(f" ✗ Conversione fallita: {e}")
return False
try:
raw_text = md_file.read_text(encoding="utf-8")
except UnicodeDecodeError as e:
print(f" ✗ Errore encoding nel file prodotto: {e}")
return False
size_kb = len(raw_text.encode()) // 1024
n_lines = raw_text.count("\n")
print(f" ✅ Markdown grezzo: {size_kb} KB, {n_lines} righe")
# ── [3] Pulizia strutturale ────────────────────────────────────────────
print(" [3/4] Pulizia strutturale...")
clean_text, t_stats = apply_transforms(raw_text)
reduction = 100 * (1 - len(clean_text) / len(raw_text)) if raw_text else 0
print(f" ✅ Simboli PUA corretti: {t_stats['n_simboli_pua_corretti']}")
print(f" Immagini rimosse: {t_stats['n_immagini_rimosse']}")
print(f" Note rimossa: {t_stats['n_note_rimosse']}")
print(f" Accenti corretti: {t_stats['n_accenti_corretti']}")
print(f" Dot-leader rimossi: {t_stats['n_dotleader_rimossi']}")
print(f" Header concat fixati: {t_stats['n_header_concat_fixati']}")
print(f" Header num. normaliz.: {t_stats['n_header_numerati_normalizzati']}")
print(f" Articoli → ###: {t_stats['n_articoli_estratti']}")
print(f" Ambienti matematici: {t_stats['n_ambienti_matematici']}")
print(f" Titoli header uniti: {t_stats['n_titoli_uniti']}")
print(f" TOC rimosso: {'' if t_stats['toc_rimosso'] else 'no'}")
print(f" Versi poesia riprist.: {t_stats['n_versi_ripristinati']}")
print(f" Header verso demotati: {t_stats['n_header_verso_demotati']}")
print(f" ALL-CAPS → ##: {t_stats['n_header_allcaps']}")
print(f" Sezioni → ###: {t_stats['n_sezioni_numerate']}")
print(f" Paragrafi uniti: {t_stats['n_paragrafi_uniti']}")
print(f" Riduzione testo: {reduction:.0f}%")
# ── [4] Profilo strutturale ────────────────────────────────────────────
print(" [4/4] Analisi struttura...")
try:
out_dir.mkdir(parents=True, exist_ok=True)
raw_out.write_text(raw_text, encoding="utf-8")
clean_out.write_text(clean_text, encoding="utf-8")
except PermissionError as e:
print(f" ✗ Permesso negato durante la scrittura: {e}")
return False
profile = analyze(clean_out)
_LIVELLO_DESC = {3: "ricca (h3)", 2: "parziale (h2)", 1: "paragrafi", 0: "testo piatto"}
print(f" ✅ Struttura: livello {profile['livello_struttura']}{_LIVELLO_DESC[profile['livello_struttura']]}")
print(f" h1={profile['n_h1']} h2={profile['n_h2']} h3={profile['n_h3']} "
f"paragrafi={profile['n_paragrafi']}")
print(f" Strategia chunking: {profile['strategia_chunking']}")
print(f" Lingua rilevata: {profile['lingua_rilevata']}")
for w in profile["avvertenze"]:
print(f" ⚠️ {w}")
build_report(stem, out_dir, clean_text, t_stats, profile, reduction)
print(f"\n Output:")
print(f" conversione/{stem}/raw.md (immutabile)")
print(f" conversione/{stem}/clean.md")
print(f" conversione/{stem}/report.json")
print(f"\n clean.md pronto per la suddivisione in chunk.")
return True
# ─── Entry point ─────────────────────────────────────────────────────────────
if __name__ == "__main__":
project_root = Path(__file__).parent.parent
parser = argparse.ArgumentParser(
description="Pipeline PDF → clean Markdown strutturato, pronto per chunking",
epilog="Prerequisiti: pip install opendataloader-pdf + Java 11+ sul PATH",
)
parser.add_argument(
"--stem",
help="Nome del documento (PDF in sources/<stem>.pdf). "
"Se omesso, elabora tutti i PDF in sources/.",
)
parser.add_argument(
"--force",
action="store_true",
help="Riesegui anche se clean.md è già presente",
)
args = parser.parse_args()
_check_deps()
if args.stem:
stems = [args.stem]
else:
sources_dir = project_root / "sources"
if not sources_dir.exists():
print("Errore: cartella sources/ non trovata")
sys.exit(1)
stems = sorted(p.stem for p in sources_dir.glob("*.pdf"))
if not stems:
print("Errore: nessun PDF trovato in sources/")
sys.exit(1)
results = [run(s, project_root, args.force) for s in stems]
ok = sum(results)
total = len(results)
print(f"\n{'' if all(results) else '⚠️ '} {ok}/{total} documenti convertiti")
sys.exit(0 if all(results) else 1)