1601 lines
60 KiB
Python
1601 lines
60 KiB
Python
#!/usr/bin/env python3
|
||
"""
|
||
conversione/pipeline.py — PDF → clean Markdown (pipeline automatica)
|
||
|
||
Converte un PDF grezzo in Markdown strutturato e pulito, pronto per la
|
||
suddivisione in chunk. Gestisce validazione, estrazione testo, pulizia
|
||
strutturale e rilevamento automatico della struttura del documento.
|
||
|
||
Usa opendataloader-pdf (algoritmo XY-Cut++ per ordine di lettura corretto,
|
||
testo fluente, struttura preservata).
|
||
|
||
Output per ciascuno stem:
|
||
conversione/<stem>/raw.md — Markdown grezzo (immutabile)
|
||
conversione/<stem>/clean.md — Markdown pulito e strutturato
|
||
conversione/<stem>/structure_profile.json
|
||
|
||
Uso:
|
||
python conversione/pipeline.py --stem <nome>
|
||
python conversione/pipeline.py # tutti i PDF in sources/
|
||
python conversione/pipeline.py --stem <nome> --force # forza riesecuzione
|
||
|
||
Prerequisiti:
|
||
pip install opendataloader-pdf
|
||
Java 11+ sul PATH (https://adoptium.net/)
|
||
"""
|
||
|
||
import argparse
|
||
import json
|
||
import re
|
||
import subprocess
|
||
import sys
|
||
import tempfile
|
||
from collections import Counter
|
||
from datetime import datetime
|
||
from functools import partial
|
||
from pathlib import Path
|
||
|
||
|
||
# ─── Verifica dipendenze ──────────────────────────────────────────────────────
|
||
|
||
def _check_deps() -> None:
|
||
try:
|
||
import opendataloader_pdf # noqa: F401
|
||
except ImportError:
|
||
print("Errore: opendataloader-pdf non installato.")
|
||
print(" pip install opendataloader-pdf")
|
||
sys.exit(1)
|
||
|
||
try:
|
||
result = subprocess.run(
|
||
["java", "-version"],
|
||
capture_output=True, text=True,
|
||
)
|
||
if result.returncode != 0:
|
||
raise FileNotFoundError
|
||
except FileNotFoundError:
|
||
print("Errore: Java 11+ non trovato sul PATH.")
|
||
print(" Installa da https://adoptium.net/")
|
||
sys.exit(1)
|
||
|
||
|
||
# ─── [1] Validazione PDF ─────────────────────────────────────────────────────
|
||
|
||
def check_pdf(pdf_path: Path) -> tuple[bool, str]:
|
||
"""
|
||
Validazione rapida: esistenza, leggibilità, testo estraibile.
|
||
Restituisce (ok, messaggio).
|
||
"""
|
||
if not pdf_path.exists():
|
||
return False, f"File non trovato: {pdf_path}"
|
||
if pdf_path.suffix.lower() != ".pdf":
|
||
return False, f"Non è un PDF: {pdf_path.name}"
|
||
size = pdf_path.stat().st_size
|
||
if size == 0:
|
||
return False, "File vuoto"
|
||
if size < 1024:
|
||
return False, f"File troppo piccolo ({size} byte) — probabilmente corrotto"
|
||
|
||
try:
|
||
import pdfplumber
|
||
with pdfplumber.open(pdf_path) as pdf:
|
||
n_pages = len(pdf.pages)
|
||
if n_pages == 0:
|
||
return False, "PDF senza pagine"
|
||
sample = min(5, n_pages)
|
||
pages_with_text = sum(
|
||
1 for i in range(sample)
|
||
if len((pdf.pages[i].extract_text() or "").strip()) > 50
|
||
)
|
||
if pages_with_text == 0:
|
||
# Estende il campione: copertine immagine o pagine bianche iniziali
|
||
extended = min(15, n_pages)
|
||
if extended > sample:
|
||
ext_with_text = sum(
|
||
1 for i in range(sample, extended)
|
||
if len((pdf.pages[i].extract_text() or "").strip()) > 50
|
||
)
|
||
if ext_with_text > 0:
|
||
return True, (
|
||
f"{n_pages} pagine — prime {sample} vuote, "
|
||
f"testo trovato in pagine successive "
|
||
f"(possibile copertina immagine)"
|
||
)
|
||
return False, (
|
||
f"Nessun testo nelle prime {extended} pagine "
|
||
f"— probabilmente scansionato (OCR non supportato)"
|
||
)
|
||
return True, f"{n_pages} pagine, testo digitale confermato"
|
||
except MemoryError:
|
||
return False, "Memoria esaurita durante l'apertura del PDF"
|
||
except Exception as e:
|
||
msg = str(e).lower()
|
||
if "password" in msg or "encrypted" in msg:
|
||
return False, "PDF protetto da password"
|
||
return False, f"Impossibile aprire: {e}"
|
||
|
||
|
||
# ─── [2] Conversione PDF → Markdown ─────────────────────────────────────────
|
||
|
||
def convert_pdf(pdf_path: Path, out_dir: Path) -> Path:
|
||
"""
|
||
Converte il PDF in Markdown tramite opendataloader-pdf.
|
||
Scrive il file nella out_dir e restituisce il percorso.
|
||
|
||
Parametri scelti per output RAG-ottimale:
|
||
- keep_line_breaks=False → testo fluente, no hard-wrap PDF
|
||
- reading_order="xycut" → corregge ordine multi-colonna (XY-Cut++)
|
||
- sanitize=False → preserva il testo originale (no anonimizzazione PII)
|
||
"""
|
||
import opendataloader_pdf
|
||
|
||
out_dir.mkdir(parents=True, exist_ok=True)
|
||
|
||
opendataloader_pdf.convert(
|
||
input_path=str(pdf_path),
|
||
output_dir=str(out_dir),
|
||
format="markdown",
|
||
keep_line_breaks=False,
|
||
reading_order="xycut",
|
||
sanitize=False,
|
||
image_output="off", # nessuna immagine estratta né referenziata
|
||
quiet=True, # sopprime i log Java
|
||
)
|
||
|
||
# Il file output si chiama <stem>.md
|
||
md_file = out_dir / f"{pdf_path.stem}.md"
|
||
if not md_file.exists():
|
||
candidates = list(out_dir.glob("*.md"))
|
||
if not candidates:
|
||
raise RuntimeError(f"Nessun file .md prodotto in {out_dir}")
|
||
md_file = candidates[0]
|
||
|
||
content = md_file.read_text(encoding="utf-8", errors="replace").strip()
|
||
if len(content) < 100:
|
||
raise RuntimeError(
|
||
f"opendataloader ha prodotto un file .md quasi vuoto ({len(content)} char) "
|
||
f"— il PDF potrebbe essere corrotto o non supportato"
|
||
)
|
||
|
||
return md_file
|
||
|
||
|
||
# ─── [3] Pulizia strutturale ─────────────────────────────────────────────────
|
||
|
||
_TOC_KEYWORDS = frozenset([
|
||
"indice", "index", "contents", "table of contents",
|
||
"sommario", "inhaltsverzeichnis", "inhalt",
|
||
"indice generale", "indice analitico", "indice dei contenuti",
|
||
"elenco dei capitoli", "argomenti", "table des matières",
|
||
"tabla de contenidos", "содержание",
|
||
])
|
||
|
||
_ORDINALS_IT = {
|
||
"PRIMO": "I", "SECONDO": "II", "TERZO": "III", "QUARTO": "IV",
|
||
"QUINTO": "V", "SESTO": "VI", "SETTIMO": "VII", "OTTAVO": "VIII",
|
||
"NONO": "IX", "DECIMO": "X",
|
||
}
|
||
_ORDINALS_EN = {
|
||
"ONE": "1", "TWO": "2", "THREE": "3", "FOUR": "4", "FIVE": "5",
|
||
"SIX": "6", "SEVEN": "7", "EIGHT": "8", "NINE": "9", "TEN": "10",
|
||
}
|
||
|
||
|
||
def _sentence_case(s: str) -> str:
|
||
if not s:
|
||
return s
|
||
lower = s.lower()
|
||
return lower[0].upper() + lower[1:]
|
||
|
||
|
||
def _is_allcaps_line(line: str) -> bool:
|
||
stripped = line.strip()
|
||
letters = [c for c in stripped if c.isalpha()]
|
||
return (
|
||
len(letters) >= 3
|
||
and all(c.isupper() for c in letters)
|
||
and not stripped.startswith("#")
|
||
and not stripped.startswith("|") # esclude righe tabella Markdown
|
||
)
|
||
|
||
|
||
def _allcaps_to_header(raw_line: str) -> str:
|
||
# Rimuovi eventuale prefisso di lista "- " o "* " prima di creare l'header
|
||
text = re.sub(r"^[-*+]\s+", "", raw_line.strip())
|
||
text = text.rstrip(".").rstrip("?").strip()
|
||
|
||
_ORD_IT_PAT = "|".join(_ORDINALS_IT.keys())
|
||
m = re.match(rf"^CAPITOLO ({_ORD_IT_PAT})\. (.+)", text)
|
||
if m:
|
||
roman = _ORDINALS_IT[m.group(1)]
|
||
titolo = m.group(2).rstrip(".").rstrip("?").strip()
|
||
return f"## Capitolo {roman} — {_sentence_case(titolo)}"
|
||
|
||
_ORD_EN_PAT = "|".join(_ORDINALS_EN.keys())
|
||
m = re.match(rf"^CHAPTER ({_ORD_EN_PAT}|\d+)\.? (.+)", text)
|
||
if m:
|
||
n = _ORDINALS_EN.get(m.group(1), m.group(1))
|
||
titolo = m.group(2).rstrip(".").rstrip("?").strip()
|
||
return f"## Chapter {n} — {_sentence_case(titolo)}"
|
||
|
||
m = re.match(r"^([IVXLCDM]+|[0-9]+)\. (.+)", text)
|
||
if m:
|
||
return f"## {m.group(1)}. {_sentence_case(m.group(2).rstrip('.').strip())}"
|
||
|
||
return f"## {_sentence_case(text)}"
|
||
|
||
|
||
def _extract_math_environments(text: str) -> tuple[str, int]:
|
||
"""
|
||
Converte paragrafi che iniziano con ambienti matematici in header ###.
|
||
|
||
'Teorema 1.6.3 (principio di induzione) Sia A ⊆ N...'
|
||
→ '### Teorema 1.6.3 (principio di induzione)\n\nSia A ⊆ N...'
|
||
|
||
Riconosce: Definizione, Teorema, Lemma, Proposizione, Corollario,
|
||
Osservazione, Nota, Esempio (solo con numero di sezione).
|
||
Non tocca paragrafi che già iniziano con un header Markdown.
|
||
Deve girare PRIMA del merge paragrafi (step 5) per sfruttare i blocchi intatti.
|
||
"""
|
||
_ENVS = (
|
||
r"Definizione|Definition|Teorema|Theorem|Lemma|"
|
||
r"Proposizione|Proposition|Corollario|Corollary|"
|
||
r"Osservazione|Remark|Nota|Note|Esempio|Example"
|
||
)
|
||
count = 0
|
||
blocks = text.split("\n\n")
|
||
result = []
|
||
|
||
for block in blocks:
|
||
stripped = block.strip()
|
||
if not stripped or stripped.startswith("#"):
|
||
result.append(block)
|
||
continue
|
||
|
||
m = re.match(
|
||
rf"^({_ENVS})\s+((?:\d+\.?){{1,4}})\s*(.*)",
|
||
stripped,
|
||
re.DOTALL,
|
||
)
|
||
if not m:
|
||
result.append(block)
|
||
continue
|
||
|
||
env = m.group(1)
|
||
num = m.group(2).rstrip(".")
|
||
rest = m.group(3).strip()
|
||
|
||
# Titolo opzionale tra parentesi: "(principio di induzione)"
|
||
title_m = re.match(r"^(\([^)]{2,60}\))\s+(.*)", rest, re.DOTALL)
|
||
if title_m:
|
||
header = f"### {env} {num} {title_m.group(1)}"
|
||
body = title_m.group(2).strip()
|
||
else:
|
||
header = f"### {env} {num}."
|
||
body = rest
|
||
|
||
result.append(f"{header}\n\n{body}" if body else header)
|
||
count += 1
|
||
|
||
return "\n\n".join(result), count
|
||
|
||
|
||
def _merge_title_headers(text: str) -> tuple[str, int]:
|
||
"""
|
||
Fonde header numerici isolati con il sottotitolo breve che li segue.
|
||
|
||
'### N.\n\nSottotitolo (riga singola ≤ 80 char, senza punto finale)'
|
||
→ '### N. Sottotitolo'
|
||
|
||
Caso tipico: parti di un'opera (es. Nietzsche) dove il numero di sezione
|
||
e il titolo della sezione sono in blocchi Markdown separati.
|
||
Non tocca header con titolo già inline né header seguiti da testo lungo.
|
||
"""
|
||
count = 0
|
||
blocks = re.split(r"\n{2,}", text)
|
||
result = []
|
||
i = 0
|
||
while i < len(blocks):
|
||
block = blocks[i]
|
||
stripped = block.strip()
|
||
if (
|
||
re.match(r"^#{2,3} \d+\.\s*$", stripped)
|
||
and i + 1 < len(blocks)
|
||
):
|
||
nxt = blocks[i + 1].strip()
|
||
# Sottotitolo valido: riga singola, ≤ 80 char, non header, non numerazione pura
|
||
if (
|
||
nxt
|
||
and "\n" not in nxt
|
||
and len(nxt) <= 80
|
||
and not nxt.startswith("#")
|
||
and not re.match(r"^\d+[\.\)]\s", nxt)
|
||
):
|
||
result.append(stripped.rstrip() + " " + nxt)
|
||
count += 1
|
||
i += 2
|
||
continue
|
||
result.append(block)
|
||
i += 1
|
||
return re.sub(r"\n{3,}", "\n\n", "\n\n".join(result)), count
|
||
|
||
|
||
def _extract_article_headers(text: str) -> tuple[str, int]:
|
||
"""
|
||
Converte voci di articolo dal formato lista Markdown al formato header ###.
|
||
|
||
'- Art. N[suffix]. Titolo. Corpo testo...' → '### Art. N[suffix]. Titolo.\n\nCorpo testo...'
|
||
'- Art. N[suffix]. (…) (1)' → '### Art. N[suffix].\n\n(…) (1)'
|
||
|
||
Gestisce suffissi come: Art. 4-bis., Art. 14-ter., Art. 1-quinquies.
|
||
Il titolo è la prima frase con iniziale maiuscola che termina con '.' prima di
|
||
ulteriore testo (es. "Leggi. La formazione..." → titolo "Leggi", corpo "La formazione...").
|
||
Se il testo non ha titolo separabile, tutto diventa il corpo.
|
||
"""
|
||
count = 0
|
||
|
||
def _repl(m: re.Match) -> str:
|
||
nonlocal count
|
||
num = m.group(1)
|
||
rest = m.group(2).strip()
|
||
|
||
# Titolo: frase con iniziale maiuscola, max 75 char, termina con '.',
|
||
# seguita da almeno un'altra frase (minimo 5 char) che inizia con maiuscola
|
||
# o con '(' / cifra (note a piè o continuazione corpo).
|
||
title_m = re.match(
|
||
r"^([A-ZÀÈÉÌÍÒÓÙÚ].{1,74}?)\.\s+([A-ZÀÈÉÌÍÒÓÙÚ\(\d].{4,})",
|
||
rest,
|
||
)
|
||
if title_m:
|
||
count += 1
|
||
return (
|
||
f"### Art. {num}. {title_m.group(1)}.\n\n"
|
||
f"{title_m.group(2).strip()}"
|
||
)
|
||
|
||
# Nessun titolo separabile: tutto è corpo
|
||
if rest:
|
||
count += 1
|
||
return f"### Art. {num}.\n\n{rest}"
|
||
|
||
# Articolo senza testo inline (es. "- Art. 5. (…) (1)" già estratto sopra,
|
||
# oppure articolo vuoto nella lista)
|
||
count += 1
|
||
return f"### Art. {num}."
|
||
|
||
text = re.sub(
|
||
r"^-\s+Art\.\s+([\d]+[a-z\-]*)\.\s*(.*)",
|
||
_repl,
|
||
text,
|
||
flags=re.MULTILINE,
|
||
)
|
||
return text, count
|
||
|
||
|
||
# ─── [3a] Funzioni di trasformazione ─────────────────────────────────────────
|
||
|
||
# Mapping PUA Unicode (U+F020-U+F0FF) → simboli corretti per font Symbol/Wingdings.
|
||
# Il font Symbol di Windows codifica lettere greche e operatori matematici nel
|
||
# range Private Use Area invece dei codepoint Unicode standard.
|
||
_SYMBOL_PUA_MAP: dict[str, str] = {
|
||
"\uf020": " ", # space
|
||
"\uf028": "(",
|
||
"\uf029": ")",
|
||
"\uf02b": "+",
|
||
"\uf02d": "\u2212", # minus
|
||
"\uf02e": ".",
|
||
"\uf02f": "/",
|
||
"\uf030": "0", "\uf031": "1", "\uf032": "2", "\uf033": "3", "\uf034": "4",
|
||
"\uf035": "5", "\uf036": "6", "\uf037": "7", "\uf038": "8", "\uf039": "9",
|
||
"\uf03a": ":", "\uf03b": ";", "\uf03c": "<", "\uf03d": "=", "\uf03e": ">",
|
||
"\uf040": "\u2245", # congruent
|
||
"\uf041": "\u0391", # Alpha
|
||
"\uf042": "\u0392", # Beta
|
||
"\uf043": "\u03a7", # Chi
|
||
"\uf044": "\u0394", # Delta
|
||
"\uf045": "\u0395", # Epsilon
|
||
"\uf046": "\u03a6", # Phi
|
||
"\uf047": "\u0393", # Gamma
|
||
"\uf048": "\u0397", # Eta
|
||
"\uf049": "\u0399", # Iota
|
||
"\uf04a": "\u03d1", # theta variant
|
||
"\uf04b": "\u039a", # Kappa
|
||
"\uf04c": "\u039b", # Lambda
|
||
"\uf04d": "\u039c", # Mu
|
||
"\uf04e": "\u039d", # Nu
|
||
"\uf04f": "\u039f", # Omicron
|
||
"\uf050": "\u03a0", # Pi
|
||
"\uf051": "\u0398", # Theta
|
||
"\uf052": "\u03a1", # Rho
|
||
"\uf053": "\u03a3", # Sigma
|
||
"\uf054": "\u03a4", # Tau
|
||
"\uf055": "\u03a5", # Upsilon
|
||
"\uf056": "\u03c2", # sigma final
|
||
"\uf057": "\u03a9", # Omega
|
||
"\uf058": "\u039e", # Xi
|
||
"\uf059": "\u03a8", # Psi
|
||
"\uf05a": "\u0396", # Zeta
|
||
"\uf05b": "[",
|
||
"\uf05c": "\u2234", # therefore
|
||
"\uf05d": "]",
|
||
"\uf05e": "\u22a5", # perpendicular
|
||
"\uf061": "\u03b1", # alpha
|
||
"\uf062": "\u03b2", # beta
|
||
"\uf063": "\u03c7", # chi
|
||
"\uf064": "\u03b4", # delta
|
||
"\uf065": "\u03b5", # epsilon
|
||
"\uf066": "\u03c6", # phi
|
||
"\uf067": "\u03b3", # gamma
|
||
"\uf068": "\u03b7", # eta
|
||
"\uf069": "\u03b9", # iota
|
||
"\uf06a": "\u03d5", # phi variant
|
||
"\uf06b": "\u03ba", # kappa
|
||
"\uf06c": "\u03bb", # lambda
|
||
"\uf06d": "\u03bc", # mu
|
||
"\uf06e": "\u03bd", # nu
|
||
"\uf06f": "\u03bf", # omicron
|
||
"\uf070": "\u03c0", # pi
|
||
"\uf071": "\u03b8", # theta
|
||
"\uf072": "\u03c1", # rho
|
||
"\uf073": "\u03c3", # sigma
|
||
"\uf074": "\u03c4", # tau
|
||
"\uf075": "\u03c5", # upsilon
|
||
"\uf076": "\u03d6", # pi symbol
|
||
"\uf077": "\u03c9", # omega
|
||
"\uf078": "\u03be", # xi
|
||
"\uf079": "\u03c8", # psi
|
||
"\uf07a": "\u03b6", # zeta
|
||
"\uf07b": "{",
|
||
"\uf07c": "|",
|
||
"\uf07d": "}",
|
||
"\uf07e": "~",
|
||
"\uf0b1": "\u00b1", # plus-minus
|
||
"\uf0b7": "\u2022", # bullet
|
||
"\uf0ba": "\u221a", # square root
|
||
"\uf0bc": "\u2264", # less or equal
|
||
"\uf0bd": "\u2265", # greater or equal
|
||
"\uf0be": "\u221d", # proportional
|
||
"\uf0d7": "\u00d7", # multiplication
|
||
"\uf0f7": "\u00f7", # division
|
||
"\uf0b4": "\u00d7", # alternate multiply
|
||
"\uf0bb": "\u2260", # not equal
|
||
"\uf0b9": "\u2260", # not equal alternate
|
||
"\uf0b3": "\u2265", # greater or equal alternate
|
||
"\uf0b2": "\u2032", # prime
|
||
"\uf02a": "*",
|
||
"\uf02c": ",",
|
||
"\uf0a3": "\u2264", # less or equal (Symbol 0xA3)
|
||
"\uf0a7": "\u2022", # bullet (Wingdings 0xA7)
|
||
"\uf0a8": "\u2022", # bullet variant
|
||
"\uf0ae": "\u2192", # right arrow (Symbol 0xAE)
|
||
"\uf0b8": "\u00f7", # division / range separator
|
||
"\uf0eb": "", # Wingdings decorative icon (rimosso)
|
||
"\uf0f0": "\u2192", # right arrow variant
|
||
"\uf0db": "", # bracket extension piece (non ricostruibile)
|
||
"\uf0dc": "", # bracket extension piece
|
||
"\uf0dd": "", # bracket extension piece
|
||
"\uf0de": "", # brace middle piece (non ricostruibile)
|
||
"\uf0df": "", # brace extension piece
|
||
}
|
||
|
||
_SYMBOL_PUA_RE = re.compile(
|
||
"[" + "".join(re.escape(k) for k in _SYMBOL_PUA_MAP) + "]"
|
||
)
|
||
|
||
|
||
def _t_fix_symbol_font(text: str) -> tuple[str, int]:
|
||
"""Rimappa caratteri PUA font Symbol (U+F020-U+F0FF) in simboli Unicode corretti."""
|
||
count = [0]
|
||
|
||
def _repl(m: re.Match) -> str:
|
||
count[0] += 1
|
||
return _SYMBOL_PUA_MAP[m.group(0)]
|
||
|
||
result = _SYMBOL_PUA_RE.sub(_repl, text)
|
||
return result, count[0]
|
||
|
||
|
||
def _t_remove_images(text: str) -> tuple[str, int]:
|
||
n = len(re.findall(r"!\[[^\]]*\]\([^)]*\)", text))
|
||
text = re.sub(r"!\[[^\]]*\]\([^)]*\)\s*", "", text)
|
||
return text, n
|
||
|
||
|
||
# Superscript Unicode: ¹²³⁴⁵⁶⁷⁸⁹⁰
|
||
_SUPERSCRIPT_RE = re.compile(r'[\u00b9\u00b2\u00b3\u2070\u2074-\u2079]+')
|
||
# Riga corpo-nota: inizia con superscript o [N]
|
||
_FOOTNOTE_BODY_RE = re.compile(
|
||
r'^([\u00b9\u00b2\u00b3\u2070\u2074-\u2079]+\s+|\[\d{1,3}\]\s+)'
|
||
)
|
||
|
||
|
||
def _t_remove_footnotes(text: str) -> tuple[str, int]:
|
||
"""Rimuovi marcatori footnote superscript inline e righe corpo-nota."""
|
||
lines = text.split("\n")
|
||
result, count = [], 0
|
||
for line in lines:
|
||
stripped = line.strip()
|
||
# Corpo nota: riga breve che inizia con ¹ o [N]
|
||
if stripped and _FOOTNOTE_BODY_RE.match(stripped) and len(stripped) < 300:
|
||
count += 1
|
||
continue
|
||
cleaned = _SUPERSCRIPT_RE.sub("", line)
|
||
if cleaned != line:
|
||
count += 1
|
||
result.append(cleaned)
|
||
return "\n".join(result), count
|
||
|
||
|
||
def _t_fix_br(text: str) -> tuple[str, int]:
|
||
n = len(re.findall(r"<br>", text, re.IGNORECASE))
|
||
text = re.sub(r"<br>\s*", " ", text, flags=re.IGNORECASE)
|
||
return text, n
|
||
|
||
|
||
def _t_fix_tabsep(text: str) -> tuple[str, int]:
|
||
_pat = re.compile(r"(?m)^\|\s*\|\s*$|^\|---\|?\s*$")
|
||
n = len(_pat.findall(text))
|
||
text = _pat.sub("", text)
|
||
return text, n
|
||
|
||
|
||
def _t_fix_accents(text: str) -> tuple[str, int]:
|
||
"""Fix artefatti backtick da PDF LaTeX: `e→è, e`→è, sar`a→sarà, ecc."""
|
||
_ACCENT_MAP = {
|
||
"e": "è", "E": "È", "a": "à", "A": "À",
|
||
"u": "ù", "U": "Ù", "i": "ì", "I": "Ì", "o": "ò", "O": "Ò",
|
||
}
|
||
n_bt_before = text.count("`")
|
||
text = re.sub(r"`([eEaAuUiIoO])", lambda m: _ACCENT_MAP[m.group(1)], text)
|
||
text = re.sub(r"([eEaAuUiIoO])`", lambda m: _ACCENT_MAP[m.group(1)], text)
|
||
n_accenti = n_bt_before - text.count("`")
|
||
# Backtick orfani: artefatti LaTeX rimasti dopo la correzione vocale
|
||
n_bt_orfani = text.count("`")
|
||
if n_bt_orfani:
|
||
text = re.sub(r"`", "", text)
|
||
n_accenti += n_bt_orfani
|
||
return text, n_accenti
|
||
|
||
|
||
def _t_fix_multiplication(text: str) -> tuple[str, int]:
|
||
"""Fix segno di moltiplicazione "→× (encoding font PDF non-standard)."""
|
||
n = len(re.findall(r'(?<=[0-9])"(?=[0-9(])', text))
|
||
text = re.sub(r'(?<=[0-9])"(?=[0-9(])', '×', text)
|
||
return text, n
|
||
|
||
|
||
def _t_fix_micro(text: str) -> tuple[str, int]:
|
||
"""Fix prefisso micro !→µ prima di unità SI note."""
|
||
_SI_UNITS_RE = r'[mAsgVWFHTKNJClΩ]'
|
||
n = len(re.findall(rf'\d\s*!(?={_SI_UNITS_RE})', text))
|
||
text = re.sub(rf'(\d)\s*!({_SI_UNITS_RE})', r'\1 µ\2', text)
|
||
return text, n
|
||
|
||
|
||
def _t_remove_formula_labels(text: str) -> tuple[str, int]:
|
||
"""Rimuovi label formule inline [N.M] — es. [3.4], [10.7]."""
|
||
n = len(re.findall(r"\[\d+\.\d+\]", text))
|
||
text = re.sub(r"\s*\[\d+\.\d+\]\s*", " ", text)
|
||
return text, n
|
||
|
||
|
||
def _t_remove_dotleaders(text: str) -> tuple[str, int]:
|
||
"""Rimuovi righe con dot-leader e numerali romani isolati (footer TOC)."""
|
||
_DOTLEADER_RE = r"^[^\n]*(?:(?:\. ){3,}|\.{4,})[^\n]*$"
|
||
n = len(re.findall(_DOTLEADER_RE, text, re.MULTILINE))
|
||
text = re.sub(_DOTLEADER_RE, "", text, flags=re.MULTILINE)
|
||
text = re.sub(
|
||
r"(?m)^(i{1,3}|iv|vi{0,3}|ix|xi{0,2}|x)$",
|
||
"",
|
||
text,
|
||
flags=re.IGNORECASE,
|
||
)
|
||
return text, n
|
||
|
||
|
||
def _t_fix_header_concat(text: str) -> tuple[str, int]:
|
||
"""Fix header + body concatenati senza separatore."""
|
||
count = 0
|
||
|
||
def _fix(m: re.Match) -> str:
|
||
nonlocal count
|
||
hashes = m.group(1)
|
||
full = m.group(2).strip()
|
||
if len(full) < 60:
|
||
return m.group(0)
|
||
skip = min(10, len(full) // 3)
|
||
split = re.search(r"(?<=[a-zàèéìíòóùúä])(?=[A-ZÀÈÉÌÍÒÓÙÚ])", full[skip:])
|
||
if split:
|
||
pos = skip + split.start()
|
||
title = full[:pos].strip()
|
||
body = full[pos:].strip()
|
||
if len(title) >= 5 and len(body) >= 15:
|
||
count += 1
|
||
return f"{hashes} {title}\n\n{body}"
|
||
return m.group(0)
|
||
|
||
text = re.sub(r"^(#{2,6})\s+(.{40,})$", _fix, text, flags=re.MULTILINE)
|
||
return text, count
|
||
|
||
|
||
def _t_extract_capitolo(text: str) -> tuple[str, int]:
|
||
"""Estrai 'Capitolo N: TITOLO' inline nel corpo del testo → ## header."""
|
||
def _repl(m: re.Match) -> str:
|
||
num = m.group(1)
|
||
titolo = _sentence_case(m.group(2).strip().rstrip("- ").strip())
|
||
return f"\n\n## Capitolo {num}: {titolo}\n\n"
|
||
|
||
text = re.sub(
|
||
r"\bCapitolo\s+(\d+)\s*[:\s]\s*([A-ZÀÈÉÌÍÒÓÙÚ\'L][A-ZÀÈÉÌÍÒÓÙÚ\s\'\.,\(\)]{5,80}?)"
|
||
r"(?=\s*[-–]\s*\d|\s*\n|\s*$)",
|
||
_repl,
|
||
text,
|
||
)
|
||
return text, 0
|
||
|
||
|
||
_NUMBERED_HDR_RE = re.compile(
|
||
r"^(#{1,6})\s+(\d+(?:\.\d+)*)\.\s+(.+)$",
|
||
re.MULTILINE,
|
||
)
|
||
|
||
|
||
def _t_normalize_numbered_headings(text: str) -> tuple[str, int]:
|
||
"""Corregge livelli header per documenti con numerazione decimale.
|
||
|
||
Assegna livello heading in base alla profondità numerica usando come base
|
||
il livello corrente degli header di profondità minima.
|
||
Attivo solo se il documento ha almeno 2 profondità di numerazione.
|
||
"""
|
||
all_matches = list(_NUMBERED_HDR_RE.finditer(text))
|
||
if not all_matches:
|
||
return text, 0
|
||
|
||
pairs = [
|
||
(m.group(2).count(".") + 1, len(m.group(1)))
|
||
for m in all_matches
|
||
]
|
||
depths = [d for d, _ in pairs]
|
||
min_depth, max_depth = min(depths), max(depths)
|
||
if max_depth == min_depth:
|
||
return text, 0
|
||
|
||
base_level = min(lv for d, lv in pairs if d == min_depth)
|
||
count = 0
|
||
|
||
def _repl(m: re.Match) -> str:
|
||
nonlocal count
|
||
hashes, num, title = m.group(1), m.group(2), m.group(3)
|
||
depth = num.count(".") + 1
|
||
new_level = min(base_level + (depth - min_depth), 6)
|
||
if new_level == len(hashes):
|
||
return m.group(0)
|
||
count += 1
|
||
return f"{'#' * new_level} {num}. {title}"
|
||
|
||
return _NUMBERED_HDR_RE.sub(_repl, text), count
|
||
|
||
|
||
def _t_normalize_header_levels(text: str) -> tuple[str, int]:
|
||
"""Normalizza h4+ → h3; rimuove header vuoti; rimuove numero pagina '| N' finale."""
|
||
text = re.sub(r"^#{3,6}\s*$", "", text, flags=re.MULTILINE)
|
||
text = re.sub(
|
||
r"^(#{3,6})\s+(\d{1,3})\s+(.+)$",
|
||
lambda m: f"### {m.group(2)}. {m.group(3)}",
|
||
text,
|
||
flags=re.MULTILINE,
|
||
)
|
||
text = re.sub(r"^#{4,6}\s+(.+)$", r"### \1", text, flags=re.MULTILINE)
|
||
return text, 0
|
||
|
||
|
||
def _t_extract_articles(text: str) -> tuple[str, int]:
|
||
"""Converti voci articolo '- Art. N.' → '### Art. N.'"""
|
||
return _extract_article_headers(text)
|
||
|
||
|
||
def _t_remove_header_bold(text: str) -> tuple[str, int]:
|
||
"""Rimuovi **bold** negli header esistenti."""
|
||
text = re.sub(
|
||
r"^(#{1,6})\s+\*\*(.+?)\*\*\s*$",
|
||
r"\1 \2",
|
||
text, flags=re.MULTILINE,
|
||
)
|
||
return text, 0
|
||
|
||
|
||
def _t_normalize_allcaps_headers(text: str) -> tuple[str, int]:
|
||
"""Normalizza header ALL-CAPS → sentence-case."""
|
||
def _norm(m: re.Match) -> str:
|
||
hashes, content = m.group(1), m.group(2).strip()
|
||
letters = [c for c in content if c.isalpha()]
|
||
if letters and all(c.isupper() for c in letters):
|
||
return f"{hashes} {_sentence_case(content)}"
|
||
return m.group(0)
|
||
|
||
text = re.sub(r"^(#{1,6}) (.+)$", _norm, text, flags=re.MULTILINE)
|
||
return text, 0
|
||
|
||
|
||
def _t_remove_toc(text: str) -> tuple[str, int]:
|
||
"""Rimuovi header TOC e voci lista numerate che seguono."""
|
||
lines = text.split("\n")
|
||
new_lines = []
|
||
_in_toc = False
|
||
removed = False
|
||
for line in lines:
|
||
bare = re.sub(r"^#+\s*", "", line.strip())
|
||
first_word = bare.split(".")[0].strip().lower()
|
||
if first_word in _TOC_KEYWORDS:
|
||
removed = True
|
||
_in_toc = True
|
||
continue
|
||
if _in_toc:
|
||
if re.match(r"^\s*$", line) or re.match(r"^\s*[-*+]\s+\d", line):
|
||
continue
|
||
# Voce TOC con numero pagina finale (sicuro: siamo gia in contesto TOC)
|
||
if re.match(r"^\s*[-*+]\s+.{2,70}\s+\d{1,3}\s*$", line):
|
||
continue
|
||
# Riga di testo lungo = probabilmente abstract o corpo, non voce di indice
|
||
if len(line.strip()) > 200:
|
||
_in_toc = False
|
||
new_lines.append(line)
|
||
continue
|
||
_in_toc = False
|
||
new_lines.append(line)
|
||
return "\n".join(new_lines), 1 if removed else 0
|
||
|
||
|
||
|
||
def _t_allcaps_to_headers(text: str) -> tuple[str, int]:
|
||
"""Converti righe ALL-CAPS standalone → ## header."""
|
||
count = 0
|
||
blocks = text.split("\n\n")
|
||
new_blocks = []
|
||
for block in blocks:
|
||
stripped = block.strip()
|
||
if "\n" not in stripped and _is_allcaps_line(stripped):
|
||
new_blocks.append(_allcaps_to_header(stripped))
|
||
count += 1
|
||
else:
|
||
sub_lines = block.split("\n")
|
||
converted = []
|
||
for ln in sub_lines:
|
||
if _is_allcaps_line(ln) and len(ln.strip()) > 3:
|
||
converted.append(_allcaps_to_header(ln))
|
||
count += 1
|
||
else:
|
||
converted.append(ln)
|
||
new_blocks.append("\n".join(converted))
|
||
return "\n\n".join(new_blocks), count
|
||
|
||
|
||
_BIB_MARKERS_RE = re.compile(
|
||
r'\b(pp?\.|vol\.|n\.\s*\d|ed\.|edn\.|ISBN|DOI|arXiv)\b'
|
||
r'|\b(19|20)\d{2}\b',
|
||
re.IGNORECASE,
|
||
)
|
||
|
||
|
||
def _t_numbered_sections(text: str, has_exercises: bool = False) -> tuple[str, int]:
|
||
"""Converti sezioni numerate 'N. testo' / '- N. testo' / '- N testo' → ### header."""
|
||
count = 0
|
||
|
||
def _num_repl(m: re.Match) -> str:
|
||
nonlocal count
|
||
content = m.group(2).strip()
|
||
if content.endswith(".") and len(content) > 40:
|
||
return m.group(0)
|
||
if _BIB_MARKERS_RE.search(content):
|
||
return m.group(0)
|
||
count += 1
|
||
return f"### {m.group(1)}.\n\n{content}"
|
||
|
||
text = re.sub(r"^(\d+)\.\s+(.+)$", _num_repl, text, flags=re.MULTILINE)
|
||
|
||
def _num_letter_repl(m: re.Match) -> str:
|
||
nonlocal count
|
||
count += 1
|
||
return f"### {m.group(1)}{m.group(2)}.\n\n{m.group(3).strip()}"
|
||
|
||
text = re.sub(r"^(\d+)\s*([a-z])\.\s+(.+)$", _num_letter_repl, text, flags=re.MULTILINE)
|
||
|
||
# Disabilitato se il documento contiene sezioni "Esercizi": in quel caso i
|
||
# "- N. testo" sono numerazioni di esercizi, non header di sezione.
|
||
if not has_exercises:
|
||
def _aphorism_repl(m: re.Match) -> str:
|
||
nonlocal count
|
||
content = m.group(2).strip()
|
||
if _BIB_MARKERS_RE.search(content):
|
||
return m.group(0)
|
||
count += 1
|
||
return f"\n\n### {m.group(1)}.\n\n{content}"
|
||
|
||
text = re.sub(
|
||
r"^-\s+(\d{1,3})\.\s+(.{10,})$",
|
||
_aphorism_repl,
|
||
text,
|
||
flags=re.MULTILINE,
|
||
)
|
||
|
||
def _list_section_repl(m: re.Match) -> str:
|
||
nonlocal count
|
||
num = m.group(1)
|
||
content = m.group(2).strip()
|
||
if _BIB_MARKERS_RE.search(content):
|
||
return m.group(0)
|
||
count += 1
|
||
split = re.search(r"(?<=[a-zàèéìíòóùú])\s+(?=[A-ZÀÈÉÌÍÒÓÙÚ])", content)
|
||
if split and split.start() >= 3:
|
||
title = content[: split.start()].strip()
|
||
body = content[split.end():].strip()
|
||
if len(body) >= 20:
|
||
return f"\n\n### {num}. {title}\n\n{body}"
|
||
return f"\n\n### {num}. {content}"
|
||
|
||
text = re.sub(
|
||
r"^-\s+(\d{1,3})\s+([A-ZÀÈÉÌÍÒÓÙÚ\'L].{10,})$",
|
||
_list_section_repl,
|
||
text,
|
||
flags=re.MULTILINE,
|
||
)
|
||
return text, count
|
||
|
||
|
||
def _t_extract_math(text: str) -> tuple[str, int]:
|
||
"""Converti ambienti matematici (Teorema/Definizione/...) → ### header."""
|
||
return _extract_math_environments(text)
|
||
|
||
|
||
def _t_merge_paragraphs(text: str) -> tuple[str, int]:
|
||
"""Unisci paragrafi spezzati da salti pagina PDF."""
|
||
_SENTENCE_END = set(".?!»)\"'")
|
||
blocks = text.split("\n\n")
|
||
merged = []
|
||
count = 0
|
||
i = 0
|
||
while i < len(blocks):
|
||
b = blocks[i]
|
||
stripped = b.strip()
|
||
while (
|
||
i + 1 < len(blocks)
|
||
and stripped
|
||
and not stripped.startswith("#")
|
||
and not stripped.startswith("|") # non unire righe tabella in avanti
|
||
and stripped[-1] not in _SENTENCE_END
|
||
):
|
||
nxt = blocks[i + 1].strip()
|
||
if not nxt or nxt.startswith("#") or nxt.startswith("|") or re.match(r"^\d+\.", nxt) or re.match(r"^[-*+]\s", nxt):
|
||
break
|
||
b = stripped + " " + nxt
|
||
stripped = b.strip()
|
||
count += 1
|
||
i += 1
|
||
merged.append(b)
|
||
i += 1
|
||
text = "\n\n".join(merged)
|
||
# Secondo pass: rimuovi prefisso |---| eventualmente rimasto dopo il merge
|
||
text = re.sub(r"(?m)^\|---\|\s*", "", text)
|
||
return text, count
|
||
|
||
|
||
def _t_normalize_whitespace(text: str) -> tuple[str, int]:
|
||
"""Normalizza whitespace multiplo interno alle righe."""
|
||
lines = text.split("\n")
|
||
text = "\n".join(
|
||
re.sub(r" +", " ", line) if line.strip() else line
|
||
for line in lines
|
||
)
|
||
return text, 0
|
||
|
||
|
||
def _t_collapse_blank_lines(text: str) -> tuple[str, int]:
|
||
"""Riduci righe vuote multiple a doppie."""
|
||
return re.sub(r"\n{3,}", "\n\n", text), 0
|
||
|
||
|
||
def _t_demote_verse_headers(text: str) -> tuple[str, int]:
|
||
"""Demoti header che sono in realtà terzine/versi.
|
||
|
||
opendataloader promuove a ## le iscrizioni e i testi in evidenza nel PDF
|
||
(corpo maggiore, centrato). Si riconoscono perché:
|
||
- terminano con un numero nudo (numero di verso: 3, 6, 9, …)
|
||
- contengono punteggiatura interna di fine verso (', ' o '. ')
|
||
Esempio: '## «per me si va ne la città dolente, ... gente. 3'
|
||
→ paragrafo normale senza il numero finale.
|
||
"""
|
||
count = 0
|
||
|
||
def _demote(m: re.Match) -> str:
|
||
nonlocal count
|
||
hashes, content = m.group(1), m.group(2).strip()
|
||
# Deve terminare con numero nudo (numero di verso ≤ 9999)
|
||
if not re.search(r"\s\d{1,4}\s*$", content):
|
||
return m.group(0)
|
||
# Deve contenere punteggiatura interna (è un blocco di più versi)
|
||
inner = re.sub(r"\s\d{1,4}\s*$", "", content)
|
||
if not re.search(r"[,;:.!?»\"\']\s+[A-Za-zÀ-ÿ«\"]", inner):
|
||
return m.group(0)
|
||
count += 1
|
||
# Rimuovi il numero di verso finale e restituisci come testo normale
|
||
clean = re.sub(r"\s\d{1,4}\s*$", "", content)
|
||
return clean
|
||
|
||
text = re.sub(
|
||
r"^(#{1,6})\s+(.{20,})$",
|
||
_demote,
|
||
text,
|
||
flags=re.MULTILINE,
|
||
)
|
||
return text, count
|
||
|
||
|
||
def _t_restore_poetry_lines(text: str) -> tuple[str, int]:
|
||
"""Ripristina line break di poesia distrutti da keep_line_breaks=False.
|
||
|
||
Quando il PDF è poesia (terzine dantesche, sonetti, ecc.) opendataloader
|
||
con keep_line_breaks=False produce un unico paragrafo con i numeri di verso
|
||
(3, 6, 9 … oppure 1, 2, 3 …) incorporati inline:
|
||
'smarrita. 3 Ahi quanto a dir qual era è cosa dura … paura! 6 Tant'è …'
|
||
|
||
Il transform rileva blocchi con numeri di verso in progressione aritmetica
|
||
e li separa in righe, con riga vuota ogni 3 versi (terzina).
|
||
"""
|
||
count = 0
|
||
blocks = text.split("\n\n")
|
||
result = []
|
||
|
||
# Pattern: numero isolato preceduto da punteggiatura-fine-verso e seguito
|
||
# da lettera maiuscola (inizio verso successivo).
|
||
_VERSE_NUM_RE = re.compile(
|
||
r'([.!?»\'\"]\s+)(\d+)(\s+)(?=[A-ZÀ-Ùa-zà-ù«"‟])'
|
||
)
|
||
|
||
for block in blocks:
|
||
stripped = block.strip()
|
||
if not stripped or stripped.startswith("#"):
|
||
result.append(block)
|
||
continue
|
||
|
||
matches = list(_VERSE_NUM_RE.finditer(stripped))
|
||
if len(matches) < 2:
|
||
result.append(block)
|
||
continue
|
||
|
||
nums = [int(m.group(2)) for m in matches]
|
||
diffs = [nums[i + 1] - nums[i] for i in range(len(nums) - 1)]
|
||
# Accetta progressioni con passo costante 1–5 (terzine: 3, endecasillabi: 1)
|
||
if not diffs or len(set(diffs)) > 2 or not (1 <= diffs[0] <= 5):
|
||
result.append(block)
|
||
continue
|
||
|
||
step = diffs[0]
|
||
|
||
def _replace_verse_num(m: re.Match) -> str:
|
||
n = int(m.group(2))
|
||
# Ogni 'step' versi → riga vuota (inizio nuova terzina/strofa)
|
||
sep = "\n\n" if n % (step * 3) == 0 else "\n"
|
||
return m.group(1).rstrip() + sep
|
||
|
||
new_block = _VERSE_NUM_RE.sub(_replace_verse_num, stripped)
|
||
if new_block != stripped:
|
||
count += len(matches)
|
||
result.append(new_block)
|
||
|
||
return "\n\n".join(result), count
|
||
|
||
|
||
def _t_remove_urls(text: str) -> tuple[str, int]:
|
||
"""Rimuovi righe che sono solo URL (watermark, footer di piattaforme)."""
|
||
return re.sub(r"(?m)^(https?://|www\.)\S+\s*$", "", text), 0
|
||
|
||
|
||
def _t_remove_empty_headers(text: str) -> tuple[str, int]:
|
||
"""Rimuovi header senza corpo (sezioni vuote / watermark)."""
|
||
blocks = re.split(r"\n{2,}", text)
|
||
cleaned = []
|
||
for i, block in enumerate(blocks):
|
||
stripped = block.strip()
|
||
if re.match(r"^#{1,6} ", stripped) and "\n" not in stripped:
|
||
next_stripped = blocks[i + 1].strip() if i + 1 < len(blocks) else ""
|
||
# Non rimuovere un header breve se il successivo è un header molto lungo
|
||
# (> 80 char): quasi certamente è testo PDF mal classificato come heading.
|
||
next_is_long_header = (
|
||
re.match(r"^#{1,6} ", next_stripped) and len(next_stripped) > 80
|
||
)
|
||
if not next_stripped or (
|
||
re.match(r"^#{1,6} ", next_stripped) and not next_is_long_header
|
||
):
|
||
continue
|
||
cleaned.append(block)
|
||
return re.sub(r"\n{3,}", "\n\n", "\n\n".join(cleaned)), 0
|
||
|
||
|
||
def _t_merge_title_headers(text: str) -> tuple[str, int]:
|
||
"""Fondi header numerici isolati con il sottotitolo breve successivo."""
|
||
return _merge_title_headers(text)
|
||
|
||
|
||
def _t_remove_garbage_headers(text: str) -> tuple[str, int]:
|
||
"""Rimuovi garbage headers: simboli, abbreviazioni matematiche, frammenti formula."""
|
||
def _is_garbage_header(content: str) -> bool:
|
||
if content.lstrip().startswith("..."):
|
||
return True
|
||
if not re.search(r"[A-Za-zÀ-ÿ\u0391-\u03c9]{2,}", content):
|
||
return True
|
||
if re.fullmatch(r"\(?\s*[A-Za-z]{1,4}\s*\)?", content.strip()):
|
||
return True
|
||
if len(content) > 60 and re.search(r"[!%#]\w|\w[!%#]|\b\w+-\s*\w", content):
|
||
return True
|
||
# Frammento di frase: inizia con minuscola ed e abbastanza lungo
|
||
first_alpha = next((c for c in content if c.isalpha()), None)
|
||
if first_alpha and first_alpha.islower() and len(content) > 40:
|
||
return True
|
||
# Formula matematica: variabile singola (o breve) seguita da = o operatore
|
||
if re.match(r"^[A-Za-z\u0391-\u03c9_]{1,3}\s*[=<>≤≥]", content.strip()):
|
||
return True
|
||
# Didascalia figura/tabella: "Figura N..." o "Figure N..." o "Tabella N..."
|
||
if re.match(r"^(Figura|Figure|Fig\.|Tabella|Table|Tab\.)\s+\d", content.strip(), re.IGNORECASE):
|
||
return True
|
||
return False
|
||
|
||
count = 0
|
||
lines = text.split("\n")
|
||
new_lines = []
|
||
for line in lines:
|
||
m = re.match(r"^#{1,6} (.+)$", line)
|
||
if m and _is_garbage_header(m.group(1)):
|
||
count += 1
|
||
continue
|
||
new_lines.append(line)
|
||
text = "\n".join(new_lines)
|
||
text = re.sub(r"\n{3,}", "\n\n", text)
|
||
return text, count
|
||
|
||
|
||
def _t_remove_frontmatter(text: str) -> tuple[str, int]:
|
||
"""Rimuovi sezioni frontmatter: URL, email, affiliazione, copyright."""
|
||
_FM_RE = re.compile(
|
||
r"https?://|www\.|@[A-Za-z]|\bUniversit[àa]\b|\bDipartimento\b|"
|
||
r"\bCopyright\b|\bLicenza\b|\bEdizione\b|"
|
||
r"protetto da|tutti i diritti",
|
||
re.IGNORECASE,
|
||
)
|
||
blocks = re.split(r"\n{2,}", text)
|
||
cleaned = []
|
||
count = 0
|
||
total = len(blocks)
|
||
cutoff = max(5, min(15, int(total * 0.20)))
|
||
for i, block in enumerate(blocks):
|
||
stripped = block.strip()
|
||
# Frontmatter compare solo nelle prime sezioni del documento
|
||
if i >= cutoff:
|
||
cleaned.append(block)
|
||
continue
|
||
if not re.match(r"^### ", stripped) or re.match(r"^### \d", stripped):
|
||
cleaned.append(block)
|
||
continue
|
||
body = blocks[i + 1].strip() if i + 1 < len(blocks) else ""
|
||
is_fm_body = len(body) < 250 and _FM_RE.search(body)
|
||
is_fm_hdr = _FM_RE.search(stripped)
|
||
if is_fm_body or is_fm_hdr:
|
||
count += 1
|
||
continue
|
||
cleaned.append(block)
|
||
return re.sub(r"\n{3,}", "\n\n", "\n\n".join(cleaned)), count
|
||
|
||
|
||
_WATERMARK_RE = re.compile(
|
||
r"^(BOZZA|DRAFT|CONFIDENTIAL|RISERVATO|PROVVISORIO|SAMPLE|SPECIMEN"
|
||
r"|DO NOT DISTRIBUTE|NON DISTRIBUIRE|COPY|COPIA)\s*$",
|
||
re.IGNORECASE | re.MULTILINE,
|
||
)
|
||
|
||
|
||
def _t_remove_watermarks(text: str) -> tuple[str, int]:
|
||
"""Rimuovi righe standalone con testo watermark comune."""
|
||
lines = text.split("\n")
|
||
result, count = [], 0
|
||
for line in lines:
|
||
if _WATERMARK_RE.match(line):
|
||
count += 1
|
||
else:
|
||
result.append(line)
|
||
return "\n".join(result), count
|
||
|
||
|
||
def _t_fix_math_symbols(text: str) -> tuple[str, int]:
|
||
"""Rimuovi righe composte solo da simboli box/placeholder (font non estratti)."""
|
||
lines = text.split("\n")
|
||
result, count = [], 0
|
||
for line in lines:
|
||
if line.strip() and re.match(r"^[\s□■▪▫◆◇●○•\u25a0-\u25ff]+$", line):
|
||
count += 1
|
||
else:
|
||
result.append(line)
|
||
return "\n".join(result), count
|
||
|
||
|
||
def _t_remove_recurring_lines(text: str) -> tuple[str, int]:
|
||
"""Rimuovi righe corte che si ripetono ≥5 volte (header/footer di pagina)."""
|
||
lines = text.split("\n")
|
||
short_lines = [
|
||
ln.strip() for ln in lines
|
||
if 3 < len(ln.strip()) < 80
|
||
and not ln.strip().startswith("#")
|
||
and not ln.strip().startswith("|")
|
||
]
|
||
freq = Counter(short_lines)
|
||
recurring = {ln for ln, c in freq.items() if c >= 5}
|
||
if not recurring:
|
||
return text, 0
|
||
result, count = [], 0
|
||
for line in lines:
|
||
if line.strip() in recurring:
|
||
count += 1
|
||
else:
|
||
result.append(line)
|
||
return "\n".join(result), count
|
||
|
||
|
||
# ─── [3b] Pipeline delle trasformazioni ──────────────────────────────────────
|
||
|
||
def apply_transforms(text: str) -> tuple[str, dict]:
|
||
"""
|
||
Applica le trasformazioni strutturali al Markdown grezzo.
|
||
Restituisce (testo_modificato, statistiche).
|
||
"""
|
||
# Flag calcolato prima del loop: disabilita il transform 4b nei documenti
|
||
# con sezioni "Esercizi" (i "- N. testo" sarebbero numerazioni, non header).
|
||
_has_ex = bool(re.search(r"\b(Esercizi|Exercises|Problems|Homework)\b", text, re.IGNORECASE))
|
||
|
||
_transforms: list[tuple[str | None, object]] = [
|
||
("n_simboli_pua_corretti", _t_fix_symbol_font),
|
||
("n_immagini_rimosse", _t_remove_images),
|
||
("n_br_rimossi", _t_fix_br),
|
||
("n_tabsep_rimossi", _t_fix_tabsep),
|
||
("n_note_rimosse", _t_remove_footnotes),
|
||
("n_accenti_corretti", _t_fix_accents),
|
||
("n_moltiplicazioni_corrette", _t_fix_multiplication),
|
||
("n_micro_corretti", _t_fix_micro),
|
||
("n_simboli_math_rimossi", _t_fix_math_symbols),
|
||
("n_formule_rimossi", _t_remove_formula_labels),
|
||
("n_dotleader_rimossi", _t_remove_dotleaders),
|
||
("n_righe_ricorrenti_rimosse", _t_remove_recurring_lines),
|
||
("n_header_concat_fixati", _t_fix_header_concat),
|
||
(None, _t_extract_capitolo),
|
||
("n_header_numerati_normalizzati", _t_normalize_numbered_headings),
|
||
(None, _t_normalize_header_levels),
|
||
("n_articoli_estratti", _t_extract_articles),
|
||
(None, _t_remove_header_bold),
|
||
(None, _t_normalize_allcaps_headers),
|
||
("toc_rimosso", _t_remove_toc),
|
||
("n_header_allcaps", _t_allcaps_to_headers),
|
||
("n_sezioni_numerate", partial(_t_numbered_sections, has_exercises=_has_ex)),
|
||
("n_ambienti_matematici", _t_extract_math),
|
||
("n_paragrafi_uniti", _t_merge_paragraphs),
|
||
(None, _t_normalize_whitespace),
|
||
(None, _t_collapse_blank_lines),
|
||
("n_versi_ripristinati", _t_restore_poetry_lines),
|
||
("n_header_verso_demotati", _t_demote_verse_headers),
|
||
(None, _t_remove_urls),
|
||
(None, _t_remove_empty_headers),
|
||
("n_titoli_uniti", _t_merge_title_headers),
|
||
(None, lambda t: (re.sub(r"(?m)^(#{1,6}.+?)\s*\|\s*\d{1,3}\s*$", r"\1", t), 0)),
|
||
("n_garbage_headers_rimossi", _t_remove_garbage_headers),
|
||
("n_frontmatter_rimossi", _t_remove_frontmatter),
|
||
("n_watermark_rimossi", _t_remove_watermarks),
|
||
]
|
||
|
||
stats: dict = {}
|
||
for stat_key, fn in _transforms:
|
||
text, n = fn(text)
|
||
if stat_key:
|
||
stats[stat_key] = stats.get(stat_key, 0) + n
|
||
|
||
stats["toc_rimosso"] = bool(stats.get("toc_rimosso", 0))
|
||
return text, stats
|
||
|
||
|
||
# ─── [4] Rilevamento struttura ───────────────────────────────────────────────
|
||
|
||
_IT_WORDS = frozenset([
|
||
"il", "la", "di", "e", "che", "non", "per", "un", "una", "si",
|
||
"con", "da", "del", "della", "dei", "in", "ma", "se", "lo", "le",
|
||
"gli", "al", "alla", "ai", "alle", "sono", "ha", "hanno", "era",
|
||
"erano", "nel", "nella", "nei", "nelle", "questo", "questa", "così",
|
||
])
|
||
_EN_WORDS = frozenset([
|
||
"the", "of", "and", "to", "in", "is", "that", "it", "was", "for",
|
||
"on", "are", "as", "with", "his", "they", "at", "be", "this", "have",
|
||
"from", "or", "an", "but", "not", "by", "he", "she", "we", "you",
|
||
"which", "their", "been", "has", "would", "there", "when", "will",
|
||
])
|
||
_FR_WORDS = frozenset([
|
||
"le", "les", "de", "du", "des", "et", "un", "une", "est", "que",
|
||
"pour", "dans", "sur", "avec", "qui", "par", "pas", "plus", "au",
|
||
"ce", "se", "ou", "mais", "comme", "aussi",
|
||
])
|
||
_DE_WORDS = frozenset([
|
||
"der", "die", "das", "und", "in", "von", "zu", "den", "mit", "ist",
|
||
"auf", "eine", "als", "dem", "des", "sich", "nicht", "auch", "werden",
|
||
"bei", "nach", "oder", "wenn", "wird", "war",
|
||
])
|
||
_ES_WORDS = frozenset([
|
||
"el", "los", "las", "de", "en", "un", "una", "es", "que", "por",
|
||
"con", "del", "para", "como", "pero", "sus", "son", "los", "hay",
|
||
"todo", "esta", "este", "ser", "más", "ya",
|
||
])
|
||
|
||
|
||
def _detect_language(text: str) -> str:
|
||
words = re.findall(r"\b[a-zA-Z]{2,}\b", text.lower())
|
||
sample = words[:2000]
|
||
scores = {
|
||
"it": sum(1 for w in sample if w in _IT_WORDS),
|
||
"en": sum(1 for w in sample if w in _EN_WORDS),
|
||
"fr": sum(1 for w in sample if w in _FR_WORDS),
|
||
"de": sum(1 for w in sample if w in _DE_WORDS),
|
||
"es": sum(1 for w in sample if w in _ES_WORDS),
|
||
}
|
||
best = max(scores, key=scores.get)
|
||
return best if scores[best] > 0 else "unknown"
|
||
|
||
|
||
def _count_headers(text: str, level: int) -> int:
|
||
prefix = "#" * level + " "
|
||
return len(re.findall(rf"(?m)^{re.escape(prefix)}", text))
|
||
|
||
|
||
def _count_paragraphs(text: str) -> int:
|
||
blocks = re.split(r"\n{2,}", text)
|
||
return sum(1 for b in blocks if b.strip() and not re.match(r"^#+\s", b.strip()))
|
||
|
||
|
||
def _split_sections(text: str, level: int) -> list[str]:
|
||
prefix = "#" * level + " "
|
||
parts = re.split(rf"(?m)^{re.escape(prefix)}.+", text)
|
||
return [p for p in parts[1:] if p.strip()]
|
||
|
||
|
||
def _parse_sections_with_body(text: str, level: int = 3) -> list[tuple[str, str]]:
|
||
"""Restituisce lista di (header_line, body_text) per tutti gli header al livello dato."""
|
||
prefix = "#" * level + " "
|
||
lines = text.split("\n")
|
||
sections: list[tuple[str, str]] = []
|
||
cur_hdr: str | None = None
|
||
cur_body: list[str] = []
|
||
for line in lines:
|
||
if line.startswith(prefix):
|
||
if cur_hdr is not None:
|
||
sections.append((cur_hdr, "\n".join(cur_body).strip()))
|
||
cur_hdr = line
|
||
cur_body = []
|
||
elif cur_hdr is not None:
|
||
cur_body.append(line)
|
||
if cur_hdr is not None:
|
||
sections.append((cur_hdr, "\n".join(cur_body).strip()))
|
||
return sections
|
||
|
||
|
||
def analyze(md_path: Path) -> dict:
|
||
text = md_path.read_text(encoding="utf-8")
|
||
n_h1 = _count_headers(text, 1)
|
||
n_h2 = _count_headers(text, 2)
|
||
n_h3 = _count_headers(text, 3)
|
||
n_paragrafi = _count_paragraphs(text)
|
||
|
||
if n_h3 >= 5:
|
||
livello, boundary, strategia = 3, "h3", "h3_aware"
|
||
section_bodies = _split_sections(text, 3)
|
||
# Gerarchia invertita: h3 sono capitoli enormi, h2 sono sottosezioni più brevi.
|
||
# Succede quando opendataloader classifica titoli capitolo come h6 (→ normalizzati
|
||
# a h3) e le sottosezioni ALL-CAPS diventano ## (h2). In questo caso h2 è
|
||
# il boundary corretto per il chunking.
|
||
if n_h2 >= 3:
|
||
h2_bodies = _split_sections(text, 2)
|
||
avg_h3 = sum(len(b) for b in section_bodies) / len(section_bodies) if section_bodies else 0
|
||
avg_h2 = sum(len(b) for b in h2_bodies) / len(h2_bodies) if h2_bodies else 0
|
||
if avg_h3 > 5000 and avg_h2 < avg_h3 * 0.7:
|
||
livello, boundary, strategia = 2, "h2", "h2_paragraph_split"
|
||
section_bodies = h2_bodies
|
||
elif n_h2 >= 3:
|
||
livello, boundary, strategia = 2, "h2", "h2_paragraph_split"
|
||
section_bodies = _split_sections(text, 2)
|
||
elif n_h1 + n_h2 + n_h3 >= 1:
|
||
livello, boundary, strategia = 1, "paragrafo", "paragraph"
|
||
section_bodies = [b for b in re.split(r"\n{2,}", text) if b.strip()]
|
||
elif n_paragrafi >= 3:
|
||
livello, boundary, strategia = 1, "paragrafo", "paragraph"
|
||
section_bodies = [b for b in re.split(r"\n{2,}", text) if b.strip()]
|
||
else:
|
||
livello, boundary, strategia = 0, "nessuno", "sliding_window"
|
||
section_bodies = [text] if text.strip() else []
|
||
|
||
lengths = [len(b) for b in section_bodies if b.strip()]
|
||
lunghezza_media = int(sum(lengths) / len(lengths)) if lengths else 0
|
||
lingua = _detect_language(text)
|
||
|
||
avvertenze = []
|
||
short = sum(1 for l in lengths if l < 200)
|
||
long_ = sum(1 for l in lengths if l > 800)
|
||
if short:
|
||
avvertenze.append(f"{short} sezioni sotto i 200 caratteri — verranno accorpate")
|
||
if long_:
|
||
avvertenze.append(f"{long_} sezioni sopra i 800 caratteri — verranno divise")
|
||
|
||
return {
|
||
"livello_struttura": livello,
|
||
"n_h1": n_h1,
|
||
"n_h2": n_h2,
|
||
"n_h3": n_h3,
|
||
"n_paragrafi": n_paragrafi,
|
||
"boundary_primario": boundary,
|
||
"lingua_rilevata": lingua,
|
||
"lunghezza_media_sezione": lunghezza_media,
|
||
"strategia_chunking": strategia,
|
||
"avvertenze": avvertenze,
|
||
}
|
||
|
||
|
||
# ─── Report di conversione ───────────────────────────────────────────────────
|
||
|
||
def build_report(
|
||
stem: str,
|
||
out_dir: Path,
|
||
clean_text: str,
|
||
t_stats: dict,
|
||
profile: dict,
|
||
reduction: float,
|
||
) -> Path:
|
||
"""
|
||
Genera conversione/<stem>/report.json con tutte le metriche di qualità:
|
||
statistiche trasformazioni, struttura, distribuzione lunghezze, anomalie
|
||
e problemi residui. Leggibile da validate.py per la validazione batch.
|
||
"""
|
||
text_lines = clean_text.split("\n")
|
||
|
||
# ── Raccolta sezioni ### con corpo ────────────────────────────────────
|
||
sections = _parse_sections_with_body(clean_text, 3)
|
||
lengths = [len(body) for _, body in sections]
|
||
|
||
# ── Distribuzione lunghezze ───────────────────────────────────────────
|
||
def _pct(data: list[int], p: float) -> int:
|
||
if not data:
|
||
return 0
|
||
s = sorted(data)
|
||
return s[max(0, min(len(s) - 1, int(len(s) * p)))]
|
||
|
||
distribution = {
|
||
"min": min(lengths) if lengths else 0,
|
||
"p25": _pct(lengths, 0.25),
|
||
"mediana": _pct(lengths, 0.50),
|
||
"p75": _pct(lengths, 0.75),
|
||
"max": max(lengths) if lengths else 0,
|
||
}
|
||
|
||
# ── Anomalie ──────────────────────────────────────────────────────────
|
||
bare_hdrs = [
|
||
{"header": hdr, "corpo_inizio": body[:120].replace("\n", " ")}
|
||
for hdr, body in sections
|
||
if re.match(r"^### \d+\.\s*$", hdr) and len(body.strip()) < 30
|
||
]
|
||
|
||
short_secs = [
|
||
{"header": hdr, "chars": length, "testo": body[:80].replace("\n", " ")}
|
||
for (hdr, body), length in zip(sections, lengths)
|
||
if 0 < length < 150
|
||
]
|
||
|
||
long_secs = [
|
||
{"header": hdr, "chars": length}
|
||
for (hdr, _), length in zip(sections, lengths)
|
||
if length > 1500
|
||
]
|
||
|
||
# ── Problemi residui (max 10 esempi ciascuno) ─────────────────────────
|
||
def _scan(pattern: str, max_n: int = 10) -> list[dict]:
|
||
hits = []
|
||
for i, line in enumerate(text_lines):
|
||
if re.search(pattern, line) and not re.match(r"^#+ ", line):
|
||
hits.append({"riga": i + 1, "testo": line.strip()[:120]})
|
||
if len(hits) >= max_n:
|
||
break
|
||
return hits
|
||
|
||
residui = {
|
||
"backtick": _scan(r"`"),
|
||
"dotleader": _scan(r"(?:\. ){3,}"),
|
||
"url": _scan(r"^(https?://|www\.)\S+"),
|
||
"immagini": _scan(r"!\[[^\]]*\]\([^)]*\)"),
|
||
"br_inline": _scan(r"<br>"),
|
||
"simboli_encoding": _scan(r'(?<=[0-9A-Za-z])[!"](?=[0-9A-Za-z])'),
|
||
"formule_inline": _scan(r"\[\d+\.\d+\]"),
|
||
"footnote_markers": _scan(r'[\u00b9\u00b2\u00b3\u2070\u2074-\u2079]'),
|
||
"pua_markers": _scan(r'[\ue000-\uf8ff]'),
|
||
}
|
||
|
||
# ── Composizione report ───────────────────────────────────────────────
|
||
report = {
|
||
"stem": stem,
|
||
"timestamp": datetime.now().strftime("%Y-%m-%d %H:%M"),
|
||
"transforms": {
|
||
**t_stats,
|
||
"riduzione_pct": round(reduction),
|
||
},
|
||
"structure": profile,
|
||
"distribution": distribution,
|
||
"anomalie": {
|
||
"bare_headers": len(bare_hdrs),
|
||
"short_sections": len(short_secs),
|
||
"long_sections": len(long_secs),
|
||
"bare_headers_list": bare_hdrs,
|
||
"short_sections_list": short_secs,
|
||
"long_sections_list": long_secs,
|
||
},
|
||
"residui": {
|
||
"backtick": len(residui["backtick"]),
|
||
"dotleader": len(residui["dotleader"]),
|
||
"url": len(residui["url"]),
|
||
"immagini": len(residui["immagini"]),
|
||
"br_inline": len(residui["br_inline"]),
|
||
"simboli_encoding": len(residui["simboli_encoding"]),
|
||
"formule_inline": len(residui["formule_inline"]),
|
||
"footnote_markers": len(residui["footnote_markers"]),
|
||
"pua_markers": len(residui["pua_markers"]),
|
||
"backtick_esempi": residui["backtick"],
|
||
"dotleader_esempi": residui["dotleader"],
|
||
"url_esempi": residui["url"],
|
||
"immagini_esempi": residui["immagini"],
|
||
"br_inline_esempi": residui["br_inline"],
|
||
"simboli_encoding_esempi": residui["simboli_encoding"],
|
||
"formule_inline_esempi": residui["formule_inline"],
|
||
"footnote_markers_esempi": residui["footnote_markers"],
|
||
"pua_markers_esempi": residui["pua_markers"],
|
||
},
|
||
}
|
||
|
||
report_path = out_dir / "report.json"
|
||
report_path.write_text(json.dumps(report, ensure_ascii=False, indent=2), encoding="utf-8")
|
||
return report_path
|
||
|
||
|
||
# ─── Pipeline principale ──────────────────────────────────────────────────────
|
||
|
||
def run(stem: str, project_root: Path, force: bool) -> bool:
|
||
pdf_path = project_root / "sources" / f"{stem}.pdf"
|
||
out_dir = project_root / "conversione" / stem
|
||
raw_out = out_dir / "raw.md"
|
||
clean_out = out_dir / "clean.md"
|
||
|
||
print(f"\n{'─' * 52}")
|
||
print(f" {stem}")
|
||
print(f"{'─' * 52}")
|
||
|
||
if clean_out.exists() and not force:
|
||
print(f" ⚠️ conversione/{stem}/clean.md già presente — skip")
|
||
print(f" (usa --force per rieseguire)")
|
||
return True
|
||
|
||
# ── [1] Validazione ────────────────────────────────────────────────────
|
||
print(" [1/4] Validazione PDF...")
|
||
ok, msg = check_pdf(pdf_path)
|
||
if not ok:
|
||
print(f" ✗ {msg}")
|
||
return False
|
||
print(f" ✅ {msg}")
|
||
|
||
# ── [2] Conversione ────────────────────────────────────────────────────
|
||
print(" [2/4] Conversione PDF → Markdown (opendataloader-pdf)...")
|
||
with tempfile.TemporaryDirectory() as tmp:
|
||
try:
|
||
md_file = convert_pdf(pdf_path, Path(tmp))
|
||
except MemoryError:
|
||
print(" ✗ Memoria esaurita durante la conversione")
|
||
return False
|
||
except Exception as e:
|
||
print(f" ✗ Conversione fallita: {e}")
|
||
return False
|
||
try:
|
||
raw_text = md_file.read_text(encoding="utf-8")
|
||
except UnicodeDecodeError as e:
|
||
print(f" ✗ Errore encoding nel file prodotto: {e}")
|
||
return False
|
||
|
||
size_kb = len(raw_text.encode()) // 1024
|
||
n_lines = raw_text.count("\n")
|
||
print(f" ✅ Markdown grezzo: {size_kb} KB, {n_lines} righe")
|
||
|
||
# ── [3] Pulizia strutturale ────────────────────────────────────────────
|
||
print(" [3/4] Pulizia strutturale...")
|
||
clean_text, t_stats = apply_transforms(raw_text)
|
||
reduction = 100 * (1 - len(clean_text) / len(raw_text)) if raw_text else 0
|
||
print(f" ✅ Simboli PUA corretti: {t_stats['n_simboli_pua_corretti']}")
|
||
print(f" Immagini rimosse: {t_stats['n_immagini_rimosse']}")
|
||
print(f" Note rimossa: {t_stats['n_note_rimosse']}")
|
||
print(f" Accenti corretti: {t_stats['n_accenti_corretti']}")
|
||
print(f" Dot-leader rimossi: {t_stats['n_dotleader_rimossi']}")
|
||
print(f" Header concat fixati: {t_stats['n_header_concat_fixati']}")
|
||
print(f" Header num. normaliz.: {t_stats['n_header_numerati_normalizzati']}")
|
||
print(f" Articoli → ###: {t_stats['n_articoli_estratti']}")
|
||
print(f" Ambienti matematici: {t_stats['n_ambienti_matematici']}")
|
||
print(f" Titoli header uniti: {t_stats['n_titoli_uniti']}")
|
||
print(f" TOC rimosso: {'sì' if t_stats['toc_rimosso'] else 'no'}")
|
||
print(f" Versi poesia riprist.: {t_stats['n_versi_ripristinati']}")
|
||
print(f" Header verso demotati: {t_stats['n_header_verso_demotati']}")
|
||
print(f" ALL-CAPS → ##: {t_stats['n_header_allcaps']}")
|
||
print(f" Sezioni → ###: {t_stats['n_sezioni_numerate']}")
|
||
print(f" Paragrafi uniti: {t_stats['n_paragrafi_uniti']}")
|
||
print(f" Riduzione testo: {reduction:.0f}%")
|
||
|
||
# ── [4] Profilo strutturale ────────────────────────────────────────────
|
||
print(" [4/4] Analisi struttura...")
|
||
try:
|
||
out_dir.mkdir(parents=True, exist_ok=True)
|
||
raw_out.write_text(raw_text, encoding="utf-8")
|
||
clean_out.write_text(clean_text, encoding="utf-8")
|
||
except PermissionError as e:
|
||
print(f" ✗ Permesso negato durante la scrittura: {e}")
|
||
return False
|
||
profile = analyze(clean_out)
|
||
|
||
_LIVELLO_DESC = {3: "ricca (h3)", 2: "parziale (h2)", 1: "paragrafi", 0: "testo piatto"}
|
||
print(f" ✅ Struttura: livello {profile['livello_struttura']} — {_LIVELLO_DESC[profile['livello_struttura']]}")
|
||
print(f" h1={profile['n_h1']} h2={profile['n_h2']} h3={profile['n_h3']} "
|
||
f"paragrafi={profile['n_paragrafi']}")
|
||
print(f" Strategia chunking: {profile['strategia_chunking']}")
|
||
print(f" Lingua rilevata: {profile['lingua_rilevata']}")
|
||
for w in profile["avvertenze"]:
|
||
print(f" ⚠️ {w}")
|
||
|
||
build_report(stem, out_dir, clean_text, t_stats, profile, reduction)
|
||
|
||
print(f"\n Output:")
|
||
print(f" conversione/{stem}/raw.md (immutabile)")
|
||
print(f" conversione/{stem}/clean.md")
|
||
print(f" conversione/{stem}/report.json")
|
||
print(f"\n clean.md pronto per la suddivisione in chunk.")
|
||
return True
|
||
|
||
|
||
# ─── Entry point ─────────────────────────────────────────────────────────────
|
||
|
||
if __name__ == "__main__":
|
||
project_root = Path(__file__).parent.parent
|
||
|
||
parser = argparse.ArgumentParser(
|
||
description="Pipeline PDF → clean Markdown strutturato, pronto per chunking",
|
||
epilog="Prerequisiti: pip install opendataloader-pdf + Java 11+ sul PATH",
|
||
)
|
||
parser.add_argument(
|
||
"--stem",
|
||
help="Nome del documento (PDF in sources/<stem>.pdf). "
|
||
"Se omesso, elabora tutti i PDF in sources/.",
|
||
)
|
||
parser.add_argument(
|
||
"--force",
|
||
action="store_true",
|
||
help="Riesegui anche se clean.md è già presente",
|
||
)
|
||
args = parser.parse_args()
|
||
|
||
_check_deps()
|
||
|
||
if args.stem:
|
||
stems = [args.stem]
|
||
else:
|
||
sources_dir = project_root / "sources"
|
||
if not sources_dir.exists():
|
||
print("Errore: cartella sources/ non trovata")
|
||
sys.exit(1)
|
||
stems = sorted(p.stem for p in sources_dir.glob("*.pdf"))
|
||
if not stems:
|
||
print("Errore: nessun PDF trovato in sources/")
|
||
sys.exit(1)
|
||
|
||
results = [run(s, project_root, args.force) for s in stems]
|
||
ok = sum(results)
|
||
total = len(results)
|
||
print(f"\n{'✅' if all(results) else '⚠️ '} {ok}/{total} documenti convertiti")
|
||
sys.exit(0 if all(results) else 1)
|