From 0a8d98279c50cb8fc3c0d54ac35558350d7544f5 Mon Sep 17 00:00:00 2001 From: Davide Grilli Date: Fri, 17 Apr 2026 11:53:38 +0200 Subject: [PATCH] feat(conversione): robustezza e 7 nuovi transform MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - check_pdf: file < 1KB, campione esteso 15pp, MemoryError - convert_pdf: validazione output ≥ 100 char - analyze: rilevamento gerarchia invertita h3 > h2 - _detect_language: supporto FR/DE/ES - 7 nuovi transform: fix_math_symbols, remove_recurring_lines, normalize_numbered_headings, remove_toc_page_list, restore_poetry_lines, demote_verse_headers, remove_watermarks - bug fix: tabelle MD, garbage headers lowercase, empty headers - run(): MemoryError / UnicodeDecodeError / PermissionError --- conversione/pipeline.py | 331 ++++++++++++++++++++++++++++++++++++++-- 1 file changed, 317 insertions(+), 14 deletions(-) diff --git a/conversione/pipeline.py b/conversione/pipeline.py index 783f3d7..099acec 100644 --- a/conversione/pipeline.py +++ b/conversione/pipeline.py @@ -69,8 +69,11 @@ def check_pdf(pdf_path: Path) -> tuple[bool, str]: return False, f"File non trovato: {pdf_path}" if pdf_path.suffix.lower() != ".pdf": return False, f"Non è un PDF: {pdf_path.name}" - if pdf_path.stat().st_size == 0: + size = pdf_path.stat().st_size + if size == 0: return False, "File vuoto" + if size < 1024: + return False, f"File troppo piccolo ({size} byte) — probabilmente corrotto" try: import pdfplumber @@ -84,11 +87,26 @@ def check_pdf(pdf_path: Path) -> tuple[bool, str]: if len((pdf.pages[i].extract_text() or "").strip()) > 50 ) if pages_with_text == 0: + # Estende il campione: copertine immagine o pagine bianche iniziali + extended = min(15, n_pages) + if extended > sample: + ext_with_text = sum( + 1 for i in range(sample, extended) + if len((pdf.pages[i].extract_text() or "").strip()) > 50 + ) + if ext_with_text > 0: + return True, ( + f"{n_pages} pagine — prime {sample} vuote, " + f"testo trovato in pagine successive " + f"(possibile copertina immagine)" + ) return False, ( - f"Nessun testo nelle prime {sample} pagine " - f"— probabilmente scansionato (usa modalità hybrid)" + f"Nessun testo nelle prime {extended} pagine " + f"— probabilmente scansionato (OCR non supportato)" ) return True, f"{n_pages} pagine, testo digitale confermato" + except MemoryError: + return False, "Memoria esaurita durante l'apertura del PDF" except Exception as e: msg = str(e).lower() if "password" in msg or "encrypted" in msg: @@ -131,6 +149,13 @@ def convert_pdf(pdf_path: Path, out_dir: Path) -> Path: raise RuntimeError(f"Nessun file .md prodotto in {out_dir}") md_file = candidates[0] + content = md_file.read_text(encoding="utf-8", errors="replace").strip() + if len(content) < 100: + raise RuntimeError( + f"opendataloader ha prodotto un file .md quasi vuoto ({len(content)} char) " + f"— il PDF potrebbe essere corrotto o non supportato" + ) + return md_file @@ -139,6 +164,9 @@ def convert_pdf(pdf_path: Path, out_dir: Path) -> Path: _TOC_KEYWORDS = frozenset([ "indice", "index", "contents", "table of contents", "sommario", "inhaltsverzeichnis", "inhalt", + "indice generale", "indice analitico", "indice dei contenuti", + "elenco dei capitoli", "argomenti", "table des matières", + "tabla de contenidos", "содержание", ]) _ORDINALS_IT = { @@ -166,6 +194,7 @@ def _is_allcaps_line(line: str) -> bool: len(letters) >= 3 and all(c.isupper() for c in letters) and not stripped.startswith("#") + and not stripped.startswith("|") # esclude righe tabella Markdown ) @@ -457,6 +486,48 @@ def _t_extract_capitolo(text: str) -> tuple[str, int]: return text, 0 +_NUMBERED_HDR_RE = re.compile( + r"^(#{1,6})\s+(\d+(?:\.\d+)*)\.\s+(.+)$", + re.MULTILINE, +) + + +def _t_normalize_numbered_headings(text: str) -> tuple[str, int]: + """Corregge livelli header per documenti con numerazione decimale. + + Assegna livello heading in base alla profondità numerica usando come base + il livello corrente degli header di profondità minima. + Attivo solo se il documento ha almeno 2 profondità di numerazione. + """ + all_matches = list(_NUMBERED_HDR_RE.finditer(text)) + if not all_matches: + return text, 0 + + pairs = [ + (m.group(2).count(".") + 1, len(m.group(1))) + for m in all_matches + ] + depths = [d for d, _ in pairs] + min_depth, max_depth = min(depths), max(depths) + if max_depth == min_depth: + return text, 0 + + base_level = min(lv for d, lv in pairs if d == min_depth) + count = 0 + + def _repl(m: re.Match) -> str: + nonlocal count + hashes, num, title = m.group(1), m.group(2), m.group(3) + depth = num.count(".") + 1 + new_level = min(base_level + (depth - min_depth), 6) + if new_level == len(hashes): + return m.group(0) + count += 1 + return f"{'#' * new_level} {num}. {title}" + + return _NUMBERED_HDR_RE.sub(_repl, text), count + + def _t_normalize_header_levels(text: str) -> tuple[str, int]: """Normalizza h4+ → h3; rimuove header vuoti.""" text = re.sub(r"^#{3,6}\s*$", "", text, flags=re.MULTILINE) @@ -519,6 +590,30 @@ def _t_remove_toc(text: str) -> tuple[str, int]: return "\n".join(new_lines), 1 if removed else 0 +def _t_remove_toc_page_list(text: str) -> tuple[str, int]: + """Rimuovi voci lista TOC con numero di pagina finale. + + Intercetta indici come '- Canto I 1', '- Canto XXIX 119' (eventualmente + fusi su una riga: '- Canto XXIX 119 - Canto XXX 123') che opendataloader + non separa dall'indice del PDF. + """ + count = 0 + lines = text.split("\n") + new_lines = [] + for line in lines: + stripped = line.strip() + # Voce TOC fusa: "- X N - Y M" — le separiamo e le scartiamo entrambe + if re.match(r"^\s*-\s+.{2,50}\s+\d{1,4}\s+-\s+.{2,50}\s+\d{1,4}\s*$", stripped): + count += 2 + continue + # Voce TOC semplice: "- Testo ... NN" dove NN è un numero pagina isolato + if re.match(r"^\s*-\s+\S.{1,60}\s+\d{1,4}\s*$", stripped): + count += 1 + continue + new_lines.append(line) + return "\n".join(new_lines), count + + def _t_allcaps_to_headers(text: str) -> tuple[str, int]: """Converti righe ALL-CAPS standalone → ## header.""" count = 0 @@ -619,10 +714,11 @@ def _t_merge_paragraphs(text: str) -> tuple[str, int]: i + 1 < len(blocks) and stripped and not stripped.startswith("#") + and not stripped.startswith("|") # non unire righe tabella in avanti and stripped[-1] not in _SENTENCE_END ): nxt = blocks[i + 1].strip() - if not nxt or nxt.startswith("#") or re.match(r"^\d+\.", nxt): + if not nxt or nxt.startswith("#") or nxt.startswith("|") or re.match(r"^\d+\.", nxt): break b = stripped + " " + nxt stripped = b.strip() @@ -651,6 +747,97 @@ def _t_collapse_blank_lines(text: str) -> tuple[str, int]: return re.sub(r"\n{3,}", "\n\n", text), 0 +def _t_demote_verse_headers(text: str) -> tuple[str, int]: + """Demoti header che sono in realtà terzine/versi. + + opendataloader promuove a ## le iscrizioni e i testi in evidenza nel PDF + (corpo maggiore, centrato). Si riconoscono perché: + - terminano con un numero nudo (numero di verso: 3, 6, 9, …) + - contengono punteggiatura interna di fine verso (', ' o '. ') + Esempio: '## «per me si va ne la città dolente, ... gente. 3' + → paragrafo normale senza il numero finale. + """ + count = 0 + + def _demote(m: re.Match) -> str: + nonlocal count + hashes, content = m.group(1), m.group(2).strip() + # Deve terminare con numero nudo (numero di verso ≤ 9999) + if not re.search(r"\s\d{1,4}\s*$", content): + return m.group(0) + # Deve contenere punteggiatura interna (è un blocco di più versi) + inner = re.sub(r"\s\d{1,4}\s*$", "", content) + if not re.search(r"[,;:.!?»\"\']\s+[A-Za-zÀ-ÿ«\"]", inner): + return m.group(0) + count += 1 + # Rimuovi il numero di verso finale e restituisci come testo normale + clean = re.sub(r"\s\d{1,4}\s*$", "", content) + return clean + + text = re.sub( + r"^(#{1,6})\s+(.{20,})$", + _demote, + text, + flags=re.MULTILINE, + ) + return text, count + + +def _t_restore_poetry_lines(text: str) -> tuple[str, int]: + """Ripristina line break di poesia distrutti da keep_line_breaks=False. + + Quando il PDF è poesia (terzine dantesche, sonetti, ecc.) opendataloader + con keep_line_breaks=False produce un unico paragrafo con i numeri di verso + (3, 6, 9 … oppure 1, 2, 3 …) incorporati inline: + 'smarrita. 3 Ahi quanto a dir qual era è cosa dura … paura! 6 Tant'è …' + + Il transform rileva blocchi con numeri di verso in progressione aritmetica + e li separa in righe, con riga vuota ogni 3 versi (terzina). + """ + count = 0 + blocks = text.split("\n\n") + result = [] + + # Pattern: numero isolato preceduto da punteggiatura-fine-verso e seguito + # da lettera maiuscola (inizio verso successivo). + _VERSE_NUM_RE = re.compile( + r'([.!?»\'\"]\s+)(\d+)(\s+)(?=[A-ZÀ-Ùa-zà-ù«"‟])' + ) + + for block in blocks: + stripped = block.strip() + if not stripped or stripped.startswith("#"): + result.append(block) + continue + + matches = list(_VERSE_NUM_RE.finditer(stripped)) + if len(matches) < 2: + result.append(block) + continue + + nums = [int(m.group(2)) for m in matches] + diffs = [nums[i + 1] - nums[i] for i in range(len(nums) - 1)] + # Accetta progressioni con passo costante 1–5 (terzine: 3, endecasillabi: 1) + if not diffs or len(set(diffs)) > 2 or not (1 <= diffs[0] <= 5): + result.append(block) + continue + + step = diffs[0] + + def _replace_verse_num(m: re.Match) -> str: + n = int(m.group(2)) + # Ogni 'step' versi → riga vuota (inizio nuova terzina/strofa) + sep = "\n\n" if n % (step * 3) == 0 else "\n" + return m.group(1).rstrip() + sep + + new_block = _VERSE_NUM_RE.sub(_replace_verse_num, stripped) + if new_block != stripped: + count += len(matches) + result.append(new_block) + + return "\n\n".join(result), count + + def _t_remove_urls(text: str) -> tuple[str, int]: """Rimuovi righe che sono solo URL (watermark, footer di piattaforme).""" return re.sub(r"(?m)^(https?://|www\.)\S+\s*$", "", text), 0 @@ -664,7 +851,14 @@ def _t_remove_empty_headers(text: str) -> tuple[str, int]: stripped = block.strip() if re.match(r"^#{1,6} ", stripped) and "\n" not in stripped: next_stripped = blocks[i + 1].strip() if i + 1 < len(blocks) else "" - if not next_stripped or re.match(r"^#{1,6} ", next_stripped): + # Non rimuovere un header breve se il successivo è un header molto lungo + # (> 80 char): quasi certamente è testo PDF mal classificato come heading. + next_is_long_header = ( + re.match(r"^#{1,6} ", next_stripped) and len(next_stripped) > 80 + ) + if not next_stripped or ( + re.match(r"^#{1,6} ", next_stripped) and not next_is_long_header + ): continue cleaned.append(block) return re.sub(r"\n{3,}", "\n\n", "\n\n".join(cleaned)), 0 @@ -686,6 +880,11 @@ def _t_remove_garbage_headers(text: str) -> tuple[str, int]: return True if len(content) > 60 and re.search(r"[!%#]\w|\w[!%#]|\b\w+-\s*\w", content): return True + # Frammento di frase: inizia con minuscola ed è abbastanza lungo + # (testo spezzato dalla tabella che opendataloader ha promosso a heading) + first_alpha = next((c for c in content if c.isalpha()), None) + if first_alpha and first_alpha.islower() and len(content) > 40: + return True return False count = 0 @@ -728,6 +927,58 @@ def _t_remove_frontmatter(text: str) -> tuple[str, int]: return re.sub(r"\n{3,}", "\n\n", "\n\n".join(cleaned)), count +_WATERMARK_RE = re.compile( + r"^(BOZZA|DRAFT|CONFIDENTIAL|RISERVATO|PROVVISORIO|SAMPLE|SPECIMEN" + r"|DO NOT DISTRIBUTE|NON DISTRIBUIRE|COPY|COPIA)\s*$", + re.IGNORECASE | re.MULTILINE, +) + + +def _t_remove_watermarks(text: str) -> tuple[str, int]: + """Rimuovi righe standalone con testo watermark comune.""" + lines = text.split("\n") + result, count = [], 0 + for line in lines: + if _WATERMARK_RE.match(line): + count += 1 + else: + result.append(line) + return "\n".join(result), count + + +def _t_fix_math_symbols(text: str) -> tuple[str, int]: + """Rimuovi righe composte solo da simboli box/placeholder (font non estratti).""" + lines = text.split("\n") + result, count = [], 0 + for line in lines: + if line.strip() and re.match(r"^[\s□■▪▫◆◇●○•\u25a0-\u25ff]+$", line): + count += 1 + else: + result.append(line) + return "\n".join(result), count + + +def _t_remove_recurring_lines(text: str) -> tuple[str, int]: + """Rimuovi righe corte che si ripetono ≥3 volte (header/footer di pagina).""" + from collections import Counter + lines = text.split("\n") + short_lines = [ + ln.strip() for ln in lines + if 3 < len(ln.strip()) < 80 and not ln.strip().startswith("#") + ] + freq = Counter(short_lines) + recurring = {ln for ln, c in freq.items() if c >= 3} + if not recurring: + return text, 0 + result, count = [], 0 + for line in lines: + if line.strip() in recurring: + count += 1 + else: + result.append(line) + return "\n".join(result), count + + # ─── [3b] Pipeline delle trasformazioni ────────────────────────────────────── def apply_transforms(text: str) -> tuple[str, dict]: @@ -746,26 +997,33 @@ def apply_transforms(text: str) -> tuple[str, dict]: ("n_accenti_corretti", _t_fix_accents), ("n_moltiplicazioni_corrette", _t_fix_multiplication), ("n_micro_corretti", _t_fix_micro), + ("n_simboli_math_rimossi", _t_fix_math_symbols), ("n_formule_rimossi", _t_remove_formula_labels), ("n_dotleader_rimossi", _t_remove_dotleaders), + ("n_righe_ricorrenti_rimosse", _t_remove_recurring_lines), ("n_header_concat_fixati", _t_fix_header_concat), (None, _t_extract_capitolo), + ("n_header_numerati_normalizzati", _t_normalize_numbered_headings), (None, _t_normalize_header_levels), ("n_articoli_estratti", _t_extract_articles), (None, _t_remove_header_bold), (None, _t_normalize_allcaps_headers), ("toc_rimosso", _t_remove_toc), + ("n_toc_page_list_rimossi", _t_remove_toc_page_list), ("n_header_allcaps", _t_allcaps_to_headers), ("n_sezioni_numerate", partial(_t_numbered_sections, has_exercises=_has_ex)), ("n_ambienti_matematici", _t_extract_math), ("n_paragrafi_uniti", _t_merge_paragraphs), (None, _t_normalize_whitespace), (None, _t_collapse_blank_lines), + ("n_versi_ripristinati", _t_restore_poetry_lines), + ("n_header_verso_demotati", _t_demote_verse_headers), (None, _t_remove_urls), (None, _t_remove_empty_headers), ("n_titoli_uniti", _t_merge_title_headers), ("n_garbage_headers_rimossi", _t_remove_garbage_headers), ("n_frontmatter_rimossi", _t_remove_frontmatter), + ("n_watermark_rimossi", _t_remove_watermarks), ] stats: dict = {} @@ -792,16 +1050,35 @@ _EN_WORDS = frozenset([ "from", "or", "an", "but", "not", "by", "he", "she", "we", "you", "which", "their", "been", "has", "would", "there", "when", "will", ]) +_FR_WORDS = frozenset([ + "le", "les", "de", "du", "des", "et", "un", "une", "est", "que", + "pour", "dans", "sur", "avec", "qui", "par", "pas", "plus", "au", + "ce", "se", "ou", "mais", "comme", "aussi", +]) +_DE_WORDS = frozenset([ + "der", "die", "das", "und", "in", "von", "zu", "den", "mit", "ist", + "auf", "eine", "als", "dem", "des", "sich", "nicht", "auch", "werden", + "bei", "nach", "oder", "wenn", "wird", "war", +]) +_ES_WORDS = frozenset([ + "el", "los", "las", "de", "en", "un", "una", "es", "que", "por", + "con", "del", "para", "como", "pero", "sus", "son", "los", "hay", + "todo", "esta", "este", "ser", "más", "ya", +]) def _detect_language(text: str) -> str: words = re.findall(r"\b[a-zA-Z]{2,}\b", text.lower()) sample = words[:2000] - it = sum(1 for w in sample if w in _IT_WORDS) - en = sum(1 for w in sample if w in _EN_WORDS) - if it == 0 and en == 0: - return "unknown" - return "it" if it >= en else "en" + scores = { + "it": sum(1 for w in sample if w in _IT_WORDS), + "en": sum(1 for w in sample if w in _EN_WORDS), + "fr": sum(1 for w in sample if w in _FR_WORDS), + "de": sum(1 for w in sample if w in _DE_WORDS), + "es": sum(1 for w in sample if w in _ES_WORDS), + } + best = max(scores, key=scores.get) + return best if scores[best] > 0 else "unknown" def _count_headers(text: str, level: int) -> int: @@ -850,6 +1127,17 @@ def analyze(md_path: Path) -> dict: if n_h3 >= 5: livello, boundary, strategia = 3, "h3", "h3_aware" section_bodies = _split_sections(text, 3) + # Gerarchia invertita: h3 sono capitoli enormi, h2 sono sottosezioni più brevi. + # Succede quando opendataloader classifica titoli capitolo come h6 (→ normalizzati + # a h3) e le sottosezioni ALL-CAPS diventano ## (h2). In questo caso h2 è + # il boundary corretto per il chunking. + if n_h2 >= 3: + h2_bodies = _split_sections(text, 2) + avg_h3 = sum(len(b) for b in section_bodies) / len(section_bodies) if section_bodies else 0 + avg_h2 = sum(len(b) for b in h2_bodies) / len(h2_bodies) if h2_bodies else 0 + if avg_h3 > 5000 and avg_h2 < avg_h3 * 0.7: + livello, boundary, strategia = 2, "h2", "h2_paragraph_split" + section_bodies = h2_bodies elif n_h2 >= 3: livello, boundary, strategia = 2, "h2", "h2_paragraph_split" section_bodies = _split_sections(text, 2) @@ -1035,10 +1323,17 @@ def run(stem: str, project_root: Path, force: bool) -> bool: with tempfile.TemporaryDirectory() as tmp: try: md_file = convert_pdf(pdf_path, Path(tmp)) + except MemoryError: + print(" ✗ Memoria esaurita durante la conversione") + return False except Exception as e: print(f" ✗ Conversione fallita: {e}") return False - raw_text = md_file.read_text(encoding="utf-8") + try: + raw_text = md_file.read_text(encoding="utf-8") + except UnicodeDecodeError as e: + print(f" ✗ Errore encoding nel file prodotto: {e}") + return False size_kb = len(raw_text.encode()) // 1024 n_lines = raw_text.count("\n") @@ -1052,10 +1347,14 @@ def run(stem: str, project_root: Path, force: bool) -> bool: print(f" Accenti corretti: {t_stats['n_accenti_corretti']}") print(f" Dot-leader rimossi: {t_stats['n_dotleader_rimossi']}") print(f" Header concat fixati: {t_stats['n_header_concat_fixati']}") + print(f" Header num. normaliz.: {t_stats['n_header_numerati_normalizzati']}") print(f" Articoli → ###: {t_stats['n_articoli_estratti']}") print(f" Ambienti matematici: {t_stats['n_ambienti_matematici']}") print(f" Titoli header uniti: {t_stats['n_titoli_uniti']}") print(f" TOC rimosso: {'sì' if t_stats['toc_rimosso'] else 'no'}") + print(f" TOC voci pagina rim.: {t_stats['n_toc_page_list_rimossi']}") + print(f" Versi poesia riprist.: {t_stats['n_versi_ripristinati']}") + print(f" Header verso demotati: {t_stats['n_header_verso_demotati']}") print(f" ALL-CAPS → ##: {t_stats['n_header_allcaps']}") print(f" Sezioni → ###: {t_stats['n_sezioni_numerate']}") print(f" Paragrafi uniti: {t_stats['n_paragrafi_uniti']}") @@ -1063,9 +1362,13 @@ def run(stem: str, project_root: Path, force: bool) -> bool: # ── [4] Profilo strutturale ──────────────────────────────────────────── print(" [4/4] Analisi struttura...") - out_dir.mkdir(parents=True, exist_ok=True) - raw_out.write_text(raw_text, encoding="utf-8") - clean_out.write_text(clean_text, encoding="utf-8") + try: + out_dir.mkdir(parents=True, exist_ok=True) + raw_out.write_text(raw_text, encoding="utf-8") + clean_out.write_text(clean_text, encoding="utf-8") + except PermissionError as e: + print(f" ✗ Permesso negato durante la scrittura: {e}") + return False profile = analyze(clean_out) _LIVELLO_DESC = {3: "ricca (h3)", 2: "parziale (h2)", 1: "paragrafi", 0: "testo piatto"}