feat(conversione): robustezza e 7 nuovi transform

- check_pdf: file < 1KB, campione esteso 15pp, MemoryError
- convert_pdf: validazione output ≥ 100 char
- analyze: rilevamento gerarchia invertita h3 > h2
- _detect_language: supporto FR/DE/ES
- 7 nuovi transform: fix_math_symbols, remove_recurring_lines,
  normalize_numbered_headings, remove_toc_page_list,
  restore_poetry_lines, demote_verse_headers, remove_watermarks
- bug fix: tabelle MD, garbage headers lowercase, empty headers
- run(): MemoryError / UnicodeDecodeError / PermissionError
This commit is contained in:
2026-04-17 11:53:38 +02:00
parent 757df26bc2
commit 0a8d98279c
+317 -14
View File
@@ -69,8 +69,11 @@ def check_pdf(pdf_path: Path) -> tuple[bool, str]:
return False, f"File non trovato: {pdf_path}"
if pdf_path.suffix.lower() != ".pdf":
return False, f"Non è un PDF: {pdf_path.name}"
if pdf_path.stat().st_size == 0:
size = pdf_path.stat().st_size
if size == 0:
return False, "File vuoto"
if size < 1024:
return False, f"File troppo piccolo ({size} byte) — probabilmente corrotto"
try:
import pdfplumber
@@ -84,11 +87,26 @@ def check_pdf(pdf_path: Path) -> tuple[bool, str]:
if len((pdf.pages[i].extract_text() or "").strip()) > 50
)
if pages_with_text == 0:
# Estende il campione: copertine immagine o pagine bianche iniziali
extended = min(15, n_pages)
if extended > sample:
ext_with_text = sum(
1 for i in range(sample, extended)
if len((pdf.pages[i].extract_text() or "").strip()) > 50
)
if ext_with_text > 0:
return True, (
f"{n_pages} pagine — prime {sample} vuote, "
f"testo trovato in pagine successive "
f"(possibile copertina immagine)"
)
return False, (
f"Nessun testo nelle prime {sample} pagine "
f"— probabilmente scansionato (usa modalità hybrid)"
f"Nessun testo nelle prime {extended} pagine "
f"— probabilmente scansionato (OCR non supportato)"
)
return True, f"{n_pages} pagine, testo digitale confermato"
except MemoryError:
return False, "Memoria esaurita durante l'apertura del PDF"
except Exception as e:
msg = str(e).lower()
if "password" in msg or "encrypted" in msg:
@@ -131,6 +149,13 @@ def convert_pdf(pdf_path: Path, out_dir: Path) -> Path:
raise RuntimeError(f"Nessun file .md prodotto in {out_dir}")
md_file = candidates[0]
content = md_file.read_text(encoding="utf-8", errors="replace").strip()
if len(content) < 100:
raise RuntimeError(
f"opendataloader ha prodotto un file .md quasi vuoto ({len(content)} char) "
f"— il PDF potrebbe essere corrotto o non supportato"
)
return md_file
@@ -139,6 +164,9 @@ def convert_pdf(pdf_path: Path, out_dir: Path) -> Path:
_TOC_KEYWORDS = frozenset([
"indice", "index", "contents", "table of contents",
"sommario", "inhaltsverzeichnis", "inhalt",
"indice generale", "indice analitico", "indice dei contenuti",
"elenco dei capitoli", "argomenti", "table des matières",
"tabla de contenidos", "содержание",
])
_ORDINALS_IT = {
@@ -166,6 +194,7 @@ def _is_allcaps_line(line: str) -> bool:
len(letters) >= 3
and all(c.isupper() for c in letters)
and not stripped.startswith("#")
and not stripped.startswith("|") # esclude righe tabella Markdown
)
@@ -457,6 +486,48 @@ def _t_extract_capitolo(text: str) -> tuple[str, int]:
return text, 0
_NUMBERED_HDR_RE = re.compile(
r"^(#{1,6})\s+(\d+(?:\.\d+)*)\.\s+(.+)$",
re.MULTILINE,
)
def _t_normalize_numbered_headings(text: str) -> tuple[str, int]:
"""Corregge livelli header per documenti con numerazione decimale.
Assegna livello heading in base alla profondità numerica usando come base
il livello corrente degli header di profondità minima.
Attivo solo se il documento ha almeno 2 profondità di numerazione.
"""
all_matches = list(_NUMBERED_HDR_RE.finditer(text))
if not all_matches:
return text, 0
pairs = [
(m.group(2).count(".") + 1, len(m.group(1)))
for m in all_matches
]
depths = [d for d, _ in pairs]
min_depth, max_depth = min(depths), max(depths)
if max_depth == min_depth:
return text, 0
base_level = min(lv for d, lv in pairs if d == min_depth)
count = 0
def _repl(m: re.Match) -> str:
nonlocal count
hashes, num, title = m.group(1), m.group(2), m.group(3)
depth = num.count(".") + 1
new_level = min(base_level + (depth - min_depth), 6)
if new_level == len(hashes):
return m.group(0)
count += 1
return f"{'#' * new_level} {num}. {title}"
return _NUMBERED_HDR_RE.sub(_repl, text), count
def _t_normalize_header_levels(text: str) -> tuple[str, int]:
"""Normalizza h4+ → h3; rimuove header vuoti."""
text = re.sub(r"^#{3,6}\s*$", "", text, flags=re.MULTILINE)
@@ -519,6 +590,30 @@ def _t_remove_toc(text: str) -> tuple[str, int]:
return "\n".join(new_lines), 1 if removed else 0
def _t_remove_toc_page_list(text: str) -> tuple[str, int]:
"""Rimuovi voci lista TOC con numero di pagina finale.
Intercetta indici come '- Canto I 1', '- Canto XXIX 119' (eventualmente
fusi su una riga: '- Canto XXIX 119 - Canto XXX 123') che opendataloader
non separa dall'indice del PDF.
"""
count = 0
lines = text.split("\n")
new_lines = []
for line in lines:
stripped = line.strip()
# Voce TOC fusa: "- X N - Y M" — le separiamo e le scartiamo entrambe
if re.match(r"^\s*-\s+.{2,50}\s+\d{1,4}\s+-\s+.{2,50}\s+\d{1,4}\s*$", stripped):
count += 2
continue
# Voce TOC semplice: "- Testo ... NN" dove NN è un numero pagina isolato
if re.match(r"^\s*-\s+\S.{1,60}\s+\d{1,4}\s*$", stripped):
count += 1
continue
new_lines.append(line)
return "\n".join(new_lines), count
def _t_allcaps_to_headers(text: str) -> tuple[str, int]:
"""Converti righe ALL-CAPS standalone → ## header."""
count = 0
@@ -619,10 +714,11 @@ def _t_merge_paragraphs(text: str) -> tuple[str, int]:
i + 1 < len(blocks)
and stripped
and not stripped.startswith("#")
and not stripped.startswith("|") # non unire righe tabella in avanti
and stripped[-1] not in _SENTENCE_END
):
nxt = blocks[i + 1].strip()
if not nxt or nxt.startswith("#") or re.match(r"^\d+\.", nxt):
if not nxt or nxt.startswith("#") or nxt.startswith("|") or re.match(r"^\d+\.", nxt):
break
b = stripped + " " + nxt
stripped = b.strip()
@@ -651,6 +747,97 @@ def _t_collapse_blank_lines(text: str) -> tuple[str, int]:
return re.sub(r"\n{3,}", "\n\n", text), 0
def _t_demote_verse_headers(text: str) -> tuple[str, int]:
"""Demoti header che sono in realtà terzine/versi.
opendataloader promuove a ## le iscrizioni e i testi in evidenza nel PDF
(corpo maggiore, centrato). Si riconoscono perché:
- terminano con un numero nudo (numero di verso: 3, 6, 9, …)
- contengono punteggiatura interna di fine verso (', ' o '. ')
Esempio: '## «per me si va ne la città dolente, ... gente. 3'
→ paragrafo normale senza il numero finale.
"""
count = 0
def _demote(m: re.Match) -> str:
nonlocal count
hashes, content = m.group(1), m.group(2).strip()
# Deve terminare con numero nudo (numero di verso ≤ 9999)
if not re.search(r"\s\d{1,4}\s*$", content):
return m.group(0)
# Deve contenere punteggiatura interna (è un blocco di più versi)
inner = re.sub(r"\s\d{1,4}\s*$", "", content)
if not re.search(r"[,;:.!?»\"\']\s+[A-Za-zÀ-ÿ«\"]", inner):
return m.group(0)
count += 1
# Rimuovi il numero di verso finale e restituisci come testo normale
clean = re.sub(r"\s\d{1,4}\s*$", "", content)
return clean
text = re.sub(
r"^(#{1,6})\s+(.{20,})$",
_demote,
text,
flags=re.MULTILINE,
)
return text, count
def _t_restore_poetry_lines(text: str) -> tuple[str, int]:
"""Ripristina line break di poesia distrutti da keep_line_breaks=False.
Quando il PDF è poesia (terzine dantesche, sonetti, ecc.) opendataloader
con keep_line_breaks=False produce un unico paragrafo con i numeri di verso
(3, 6, 9 … oppure 1, 2, 3 …) incorporati inline:
'smarrita. 3 Ahi quanto a dir qual era è cosa dura … paura! 6 Tant'è …'
Il transform rileva blocchi con numeri di verso in progressione aritmetica
e li separa in righe, con riga vuota ogni 3 versi (terzina).
"""
count = 0
blocks = text.split("\n\n")
result = []
# Pattern: numero isolato preceduto da punteggiatura-fine-verso e seguito
# da lettera maiuscola (inizio verso successivo).
_VERSE_NUM_RE = re.compile(
r'([.!?»\'\"]\s+)(\d+)(\s+)(?=[A-ZÀ-Ùa-zà-ù«"‟])'
)
for block in blocks:
stripped = block.strip()
if not stripped or stripped.startswith("#"):
result.append(block)
continue
matches = list(_VERSE_NUM_RE.finditer(stripped))
if len(matches) < 2:
result.append(block)
continue
nums = [int(m.group(2)) for m in matches]
diffs = [nums[i + 1] - nums[i] for i in range(len(nums) - 1)]
# Accetta progressioni con passo costante 15 (terzine: 3, endecasillabi: 1)
if not diffs or len(set(diffs)) > 2 or not (1 <= diffs[0] <= 5):
result.append(block)
continue
step = diffs[0]
def _replace_verse_num(m: re.Match) -> str:
n = int(m.group(2))
# Ogni 'step' versi → riga vuota (inizio nuova terzina/strofa)
sep = "\n\n" if n % (step * 3) == 0 else "\n"
return m.group(1).rstrip() + sep
new_block = _VERSE_NUM_RE.sub(_replace_verse_num, stripped)
if new_block != stripped:
count += len(matches)
result.append(new_block)
return "\n\n".join(result), count
def _t_remove_urls(text: str) -> tuple[str, int]:
"""Rimuovi righe che sono solo URL (watermark, footer di piattaforme)."""
return re.sub(r"(?m)^(https?://|www\.)\S+\s*$", "", text), 0
@@ -664,7 +851,14 @@ def _t_remove_empty_headers(text: str) -> tuple[str, int]:
stripped = block.strip()
if re.match(r"^#{1,6} ", stripped) and "\n" not in stripped:
next_stripped = blocks[i + 1].strip() if i + 1 < len(blocks) else ""
if not next_stripped or re.match(r"^#{1,6} ", next_stripped):
# Non rimuovere un header breve se il successivo è un header molto lungo
# (> 80 char): quasi certamente è testo PDF mal classificato come heading.
next_is_long_header = (
re.match(r"^#{1,6} ", next_stripped) and len(next_stripped) > 80
)
if not next_stripped or (
re.match(r"^#{1,6} ", next_stripped) and not next_is_long_header
):
continue
cleaned.append(block)
return re.sub(r"\n{3,}", "\n\n", "\n\n".join(cleaned)), 0
@@ -686,6 +880,11 @@ def _t_remove_garbage_headers(text: str) -> tuple[str, int]:
return True
if len(content) > 60 and re.search(r"[!%#]\w|\w[!%#]|\b\w+-\s*\w", content):
return True
# Frammento di frase: inizia con minuscola ed è abbastanza lungo
# (testo spezzato dalla tabella che opendataloader ha promosso a heading)
first_alpha = next((c for c in content if c.isalpha()), None)
if first_alpha and first_alpha.islower() and len(content) > 40:
return True
return False
count = 0
@@ -728,6 +927,58 @@ def _t_remove_frontmatter(text: str) -> tuple[str, int]:
return re.sub(r"\n{3,}", "\n\n", "\n\n".join(cleaned)), count
_WATERMARK_RE = re.compile(
r"^(BOZZA|DRAFT|CONFIDENTIAL|RISERVATO|PROVVISORIO|SAMPLE|SPECIMEN"
r"|DO NOT DISTRIBUTE|NON DISTRIBUIRE|COPY|COPIA)\s*$",
re.IGNORECASE | re.MULTILINE,
)
def _t_remove_watermarks(text: str) -> tuple[str, int]:
"""Rimuovi righe standalone con testo watermark comune."""
lines = text.split("\n")
result, count = [], 0
for line in lines:
if _WATERMARK_RE.match(line):
count += 1
else:
result.append(line)
return "\n".join(result), count
def _t_fix_math_symbols(text: str) -> tuple[str, int]:
"""Rimuovi righe composte solo da simboli box/placeholder (font non estratti)."""
lines = text.split("\n")
result, count = [], 0
for line in lines:
if line.strip() and re.match(r"^[\s□■▪▫◆◇●○•\u25a0-\u25ff]+$", line):
count += 1
else:
result.append(line)
return "\n".join(result), count
def _t_remove_recurring_lines(text: str) -> tuple[str, int]:
"""Rimuovi righe corte che si ripetono ≥3 volte (header/footer di pagina)."""
from collections import Counter
lines = text.split("\n")
short_lines = [
ln.strip() for ln in lines
if 3 < len(ln.strip()) < 80 and not ln.strip().startswith("#")
]
freq = Counter(short_lines)
recurring = {ln for ln, c in freq.items() if c >= 3}
if not recurring:
return text, 0
result, count = [], 0
for line in lines:
if line.strip() in recurring:
count += 1
else:
result.append(line)
return "\n".join(result), count
# ─── [3b] Pipeline delle trasformazioni ──────────────────────────────────────
def apply_transforms(text: str) -> tuple[str, dict]:
@@ -746,26 +997,33 @@ def apply_transforms(text: str) -> tuple[str, dict]:
("n_accenti_corretti", _t_fix_accents),
("n_moltiplicazioni_corrette", _t_fix_multiplication),
("n_micro_corretti", _t_fix_micro),
("n_simboli_math_rimossi", _t_fix_math_symbols),
("n_formule_rimossi", _t_remove_formula_labels),
("n_dotleader_rimossi", _t_remove_dotleaders),
("n_righe_ricorrenti_rimosse", _t_remove_recurring_lines),
("n_header_concat_fixati", _t_fix_header_concat),
(None, _t_extract_capitolo),
("n_header_numerati_normalizzati", _t_normalize_numbered_headings),
(None, _t_normalize_header_levels),
("n_articoli_estratti", _t_extract_articles),
(None, _t_remove_header_bold),
(None, _t_normalize_allcaps_headers),
("toc_rimosso", _t_remove_toc),
("n_toc_page_list_rimossi", _t_remove_toc_page_list),
("n_header_allcaps", _t_allcaps_to_headers),
("n_sezioni_numerate", partial(_t_numbered_sections, has_exercises=_has_ex)),
("n_ambienti_matematici", _t_extract_math),
("n_paragrafi_uniti", _t_merge_paragraphs),
(None, _t_normalize_whitespace),
(None, _t_collapse_blank_lines),
("n_versi_ripristinati", _t_restore_poetry_lines),
("n_header_verso_demotati", _t_demote_verse_headers),
(None, _t_remove_urls),
(None, _t_remove_empty_headers),
("n_titoli_uniti", _t_merge_title_headers),
("n_garbage_headers_rimossi", _t_remove_garbage_headers),
("n_frontmatter_rimossi", _t_remove_frontmatter),
("n_watermark_rimossi", _t_remove_watermarks),
]
stats: dict = {}
@@ -792,16 +1050,35 @@ _EN_WORDS = frozenset([
"from", "or", "an", "but", "not", "by", "he", "she", "we", "you",
"which", "their", "been", "has", "would", "there", "when", "will",
])
_FR_WORDS = frozenset([
"le", "les", "de", "du", "des", "et", "un", "une", "est", "que",
"pour", "dans", "sur", "avec", "qui", "par", "pas", "plus", "au",
"ce", "se", "ou", "mais", "comme", "aussi",
])
_DE_WORDS = frozenset([
"der", "die", "das", "und", "in", "von", "zu", "den", "mit", "ist",
"auf", "eine", "als", "dem", "des", "sich", "nicht", "auch", "werden",
"bei", "nach", "oder", "wenn", "wird", "war",
])
_ES_WORDS = frozenset([
"el", "los", "las", "de", "en", "un", "una", "es", "que", "por",
"con", "del", "para", "como", "pero", "sus", "son", "los", "hay",
"todo", "esta", "este", "ser", "más", "ya",
])
def _detect_language(text: str) -> str:
words = re.findall(r"\b[a-zA-Z]{2,}\b", text.lower())
sample = words[:2000]
it = sum(1 for w in sample if w in _IT_WORDS)
en = sum(1 for w in sample if w in _EN_WORDS)
if it == 0 and en == 0:
return "unknown"
return "it" if it >= en else "en"
scores = {
"it": sum(1 for w in sample if w in _IT_WORDS),
"en": sum(1 for w in sample if w in _EN_WORDS),
"fr": sum(1 for w in sample if w in _FR_WORDS),
"de": sum(1 for w in sample if w in _DE_WORDS),
"es": sum(1 for w in sample if w in _ES_WORDS),
}
best = max(scores, key=scores.get)
return best if scores[best] > 0 else "unknown"
def _count_headers(text: str, level: int) -> int:
@@ -850,6 +1127,17 @@ def analyze(md_path: Path) -> dict:
if n_h3 >= 5:
livello, boundary, strategia = 3, "h3", "h3_aware"
section_bodies = _split_sections(text, 3)
# Gerarchia invertita: h3 sono capitoli enormi, h2 sono sottosezioni più brevi.
# Succede quando opendataloader classifica titoli capitolo come h6 (→ normalizzati
# a h3) e le sottosezioni ALL-CAPS diventano ## (h2). In questo caso h2 è
# il boundary corretto per il chunking.
if n_h2 >= 3:
h2_bodies = _split_sections(text, 2)
avg_h3 = sum(len(b) for b in section_bodies) / len(section_bodies) if section_bodies else 0
avg_h2 = sum(len(b) for b in h2_bodies) / len(h2_bodies) if h2_bodies else 0
if avg_h3 > 5000 and avg_h2 < avg_h3 * 0.7:
livello, boundary, strategia = 2, "h2", "h2_paragraph_split"
section_bodies = h2_bodies
elif n_h2 >= 3:
livello, boundary, strategia = 2, "h2", "h2_paragraph_split"
section_bodies = _split_sections(text, 2)
@@ -1035,10 +1323,17 @@ def run(stem: str, project_root: Path, force: bool) -> bool:
with tempfile.TemporaryDirectory() as tmp:
try:
md_file = convert_pdf(pdf_path, Path(tmp))
except MemoryError:
print(" ✗ Memoria esaurita durante la conversione")
return False
except Exception as e:
print(f" ✗ Conversione fallita: {e}")
return False
raw_text = md_file.read_text(encoding="utf-8")
try:
raw_text = md_file.read_text(encoding="utf-8")
except UnicodeDecodeError as e:
print(f" ✗ Errore encoding nel file prodotto: {e}")
return False
size_kb = len(raw_text.encode()) // 1024
n_lines = raw_text.count("\n")
@@ -1052,10 +1347,14 @@ def run(stem: str, project_root: Path, force: bool) -> bool:
print(f" Accenti corretti: {t_stats['n_accenti_corretti']}")
print(f" Dot-leader rimossi: {t_stats['n_dotleader_rimossi']}")
print(f" Header concat fixati: {t_stats['n_header_concat_fixati']}")
print(f" Header num. normaliz.: {t_stats['n_header_numerati_normalizzati']}")
print(f" Articoli → ###: {t_stats['n_articoli_estratti']}")
print(f" Ambienti matematici: {t_stats['n_ambienti_matematici']}")
print(f" Titoli header uniti: {t_stats['n_titoli_uniti']}")
print(f" TOC rimosso: {'' if t_stats['toc_rimosso'] else 'no'}")
print(f" TOC voci pagina rim.: {t_stats['n_toc_page_list_rimossi']}")
print(f" Versi poesia riprist.: {t_stats['n_versi_ripristinati']}")
print(f" Header verso demotati: {t_stats['n_header_verso_demotati']}")
print(f" ALL-CAPS → ##: {t_stats['n_header_allcaps']}")
print(f" Sezioni → ###: {t_stats['n_sezioni_numerate']}")
print(f" Paragrafi uniti: {t_stats['n_paragrafi_uniti']}")
@@ -1063,9 +1362,13 @@ def run(stem: str, project_root: Path, force: bool) -> bool:
# ── [4] Profilo strutturale ────────────────────────────────────────────
print(" [4/4] Analisi struttura...")
out_dir.mkdir(parents=True, exist_ok=True)
raw_out.write_text(raw_text, encoding="utf-8")
clean_out.write_text(clean_text, encoding="utf-8")
try:
out_dir.mkdir(parents=True, exist_ok=True)
raw_out.write_text(raw_text, encoding="utf-8")
clean_out.write_text(clean_text, encoding="utf-8")
except PermissionError as e:
print(f" ✗ Permesso negato durante la scrittura: {e}")
return False
profile = analyze(clean_out)
_LIVELLO_DESC = {3: "ricca (h3)", 2: "parziale (h2)", 1: "paragrafi", 0: "testo piatto"}