2c0b7a462e
- extract.py: aggiunge extract_metadata() — title, author, year, pages via fitz - extract.py: aggiunge markdown_page_separator con <!-- page: N --> tra pagine - extract.py: aggiunge replace_invalid_chars=" " per testo più pulito - runner.py: prepend YAML frontmatter (source/title/author/year/pages) al clean.md - runner.py: mostra title e author rilevati durante validazione Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
194 lines
7.5 KiB
Python
194 lines
7.5 KiB
Python
import json
|
|
import sys
|
|
import tempfile
|
|
import threading
|
|
import time
|
|
from pathlib import Path
|
|
|
|
from .extract import validate_pdf, convert_pdf, extract_metadata
|
|
from ._apply import apply_transforms
|
|
from .structure import analyze
|
|
from .report import build_report
|
|
from .validator import _score, _grade
|
|
|
|
|
|
# User-facing (Italian) label for each value of the structure profile's
# ``livello_struttura`` field; looked up when the analysis summary is printed.
_LIVELLO_DESC = {3: "ricca (h3)", 2: "parziale (h2)", 1: "paragrafi", 0: "testo piatto"}
|
|
|
|
|
|
def _build_frontmatter(meta: dict) -> str:
    """Render the YAML front matter block that is prepended to ``clean.md``.

    ``source`` is always emitted. ``title``, ``author``, ``year`` and ``pages``
    are emitted only when present and truthy.

    Title and author are free text, so they are serialised with ``json.dumps``:
    a JSON string literal is also a valid YAML double-quoted scalar, which
    keeps the block parseable when the value contains double quotes,
    backslashes or line breaks. For plain values the output is the same
    ``key: "value"`` line as before.

    Args:
        meta: Metadata mapping. ``source`` is required; the other keys are
            optional.

    Returns:
        The front matter delimited by ``---`` lines, followed by one blank
        line.

    Raises:
        KeyError: If ``meta`` has no ``source`` entry.
    """
    lines = ["---", f"source: {meta['source']}"]

    # Quoted, escaped scalars for free-text fields.
    for key in ("title", "author"):
        value = meta.get(key)
        if value:
            quoted = json.dumps(str(value), ensure_ascii=False)
            lines.append(f"{key}: {quoted}")

    # Bare scalars for the numeric fields.
    for key in ("year", "pages"):
        value = meta.get(key)
        if value:
            lines.append(f"{key}: {value}")

    lines += ["---", ""]
    return "\n".join(lines) + "\n"
|
|
|
|
# Animation frames, cycled one character per redraw.
_SPIN_FRAMES = "⠋⠙⠹⠸⠼⠴⠦⠧⠇⠏"


class _Spinner:
    """Context manager that animates a spinner on stdout from a worker thread.

    While the ``with`` body runs, a daemon thread rewrites the current line
    about every 0.1 s with the next frame, the prefix text and the seconds
    elapsed since the block was entered. On exit the thread is stopped and
    joined, then the line is blanked.
    """

    def __init__(self, prefix: str):
        self._prefix = prefix
        self._t0 = 0.0
        self._stop = threading.Event()
        self._thread = threading.Thread(target=self._run, daemon=True)

    def __enter__(self):
        # Start the clock before the thread so the first frame has a valid t0.
        self._t0 = time.perf_counter()
        self._thread.start()
        return self

    def __exit__(self, *_):
        self._stop.set()
        self._thread.join()
        # Overwrite whatever frame is left on the line, then return to column 0.
        blank = " " * 72
        sys.stdout.write("\r" + blank + "\r")
        sys.stdout.flush()

    def _run(self):
        n_frames = len(_SPIN_FRAMES)
        tick = 0
        while True:
            # wait() doubles as the frame delay; it returns True once stopped.
            if self._stop.wait(0.1):
                return
            seconds = time.perf_counter() - self._t0
            glyph = _SPIN_FRAMES[tick % n_frames]
            sys.stdout.write(f"\r {glyph} {self._prefix} {seconds:.0f}s")
            sys.stdout.flush()
            tick += 1
|
|
|
|
|
|
def _convert_to_raw_markdown(pdf_path: Path) -> tuple:
    """Convert *pdf_path* to Markdown text while a spinner is displayed.

    Nothing is printed here apart from the spinner itself. Failures are
    returned as a ready-to-print message so the caller can report them once
    the spinner thread has stopped and its line has been cleared.

    Returns:
        ``(raw_text, elapsed, error)``. On success ``error`` is ``None``.
        On failure ``raw_text`` is ``None`` and ``error`` holds the message.
    """
    raw_text = None
    error = None
    with _Spinner("opendataloader-pdf in esecuzione..."):
        t0 = time.perf_counter()
        # The produced file lives in the temporary directory, so it has to be
        # read before the directory is removed.
        with tempfile.TemporaryDirectory() as tmp:
            try:
                md_file = convert_pdf(pdf_path, Path(tmp))
            except MemoryError:
                error = " ✗ Memoria esaurita durante la conversione"
            except Exception as e:
                error = f" ✗ Conversione fallita: {e}"
            else:
                try:
                    raw_text = md_file.read_text(encoding="utf-8")
                except UnicodeDecodeError as e:
                    error = f" ✗ Errore encoding nel file prodotto: {e}"
        elapsed = time.perf_counter() - t0
    return raw_text, elapsed, error


def _print_transform_stats(t: dict, reduction: float) -> None:
    """Print the counters returned by ``apply_transforms`` and the reduction."""
    print(" ✅ Encoding")
    print(f" Simboli PUA corretti: {t['n_simboli_pua_corretti']}")
    print(f" Accenti corretti: {t['n_accenti_corretti']}")
    print(" Artefatti")
    print(f" Immagini rimosse: {t['n_immagini_rimosse']}")
    print(f" <br> rimossi: {t['n_br_rimossi']}")
    print(f" Note rimosse: {t['n_note_rimosse']}")
    print(f" Dot-leader rimossi: {t['n_dotleader_rimossi']}")
    print(f" Righe ricorrenti rim.: {t['n_righe_ricorrenti_rimosse']}")
    print(f" URL rimossi: {t['n_url_rimossi']}")
    print(f" Watermark rimossi: {t['n_watermark_rimossi']}")
    print(" Header")
    print(f" Header concat fixati: {t['n_header_concat_fixati']}")
    print(f" Header num. normaliz.: {t['n_header_numerati_normalizzati']}")
    print(" Struttura")
    print(f" TOC rimosso: {'sì' if t['toc_rimosso'] else 'no'}")
    print(f" TOC orfani rimossi: {t['n_toc_orfani_rimossi']}")
    print(f" ALL-CAPS → ##: {t['n_header_allcaps']}")
    print(f" Sezioni → ###: {t['n_sezioni_numerate']}")
    print(f" Ambienti matematici: {t['n_ambienti_matematici']}")
    print(f" Articoli → ###: {t['n_articoli_estratti']}")
    print(" Testo")
    print(f" Paragrafi uniti: {t['n_paragrafi_uniti']}")
    print(f" Versi poesia riprist.: {t['n_versi_ripristinati']}")
    print(f" Header verso demotati: {t['n_header_verso_demotati']}")
    print(" Rifinitura")
    print(f" Garbage header rim.: {t['n_garbage_headers_rimossi']}")
    print(f" Titoli header uniti: {t['n_titoli_uniti']}")
    print(f" Formula-hdr demotati: {t['n_formula_headers_demotati']}")
    print(f" Frontmatter rimossi: {t['n_frontmatter_rimossi']}")
    print(f" Riduzione testo: {reduction:.0f}%")


def run(stem: str, project_root: Path, force: bool) -> bool:
    """Run the four-step conversion pipeline for ``sources/<stem>.pdf``.

    Steps: validate the PDF and read its metadata, convert it to raw
    Markdown, apply the cleaning transforms (the result gets a YAML front
    matter prepended), then analyse the structure of the cleaned file and
    build the quality report. Progress and results are printed to stdout.

    Output goes to ``conversione/<stem>/``: ``raw.md``, ``clean.md`` and
    ``structure_profile.json`` are written here, and the report is produced
    by ``build_report``.

    Args:
        stem: File name of the PDF without extension; also the name of the
            output directory.
        project_root: Directory containing ``sources/`` and ``conversione/``.
        force: When false, an existing ``clean.md`` makes the run a no-op.

    Returns:
        ``True`` when the pipeline completed or was skipped because the
        output already exists. ``False`` when validation, conversion or
        writing the output failed (the reason is printed).
    """
    pdf_path = project_root / "sources" / f"{stem}.pdf"
    out_dir = project_root / "conversione" / stem
    raw_out = out_dir / "raw.md"
    clean_out = out_dir / "clean.md"

    print(f"\n{'─' * 52}")
    print(f" {stem}")
    print(f"{'─' * 52}")

    if clean_out.exists() and not force:
        print(f" ⚠️ conversione/{stem}/clean.md già presente — skip")
        print(" (usa --force per rieseguire)")
        return True

    # [1] Validation + metadata
    print(" [1/4] Validazione PDF...")
    # A missing file is reported by validate_pdf below; only avoid stat() here.
    pdf_mb = pdf_path.stat().st_size / (1024 * 1024) if pdf_path.exists() else 0
    print(f" File: {pdf_path.name} ({pdf_mb:.1f} MB)")
    ok, msg = validate_pdf(pdf_path)
    if not ok:
        print(f" ✗ {msg}")
        return False
    print(f" ✅ {msg}")
    meta = extract_metadata(pdf_path)
    if meta["title"]:
        print(f" Titolo: {meta['title']}")
    if meta["author"]:
        print(f" Autore: {meta['author']}")

    # [2] Conversion
    print(" [2/4] Conversione PDF → Markdown (opendataloader-pdf)...")
    raw_text, elapsed, error = _convert_to_raw_markdown(pdf_path)
    if error is not None:
        # Printed only now, after the spinner has stopped, so the message is
        # not appended to a stale spinner frame or overwritten by the thread.
        print(error)
        return False

    size_kb = len(raw_text.encode()) // 1024
    n_lines = raw_text.count("\n")
    print(f" ✅ Markdown grezzo: {size_kb} KB, {n_lines} righe ({elapsed:.1f}s)")

    # [3] Structural cleaning
    print(" [3/4] Pulizia strutturale...")

    def _on_step(i: int, total: int, label: str) -> None:
        # Progress is redrawn in place on a single line.
        sys.stdout.write(f"\r [{i}/{total}] {label:<45}")
        sys.stdout.flush()

    clean_text, t = apply_transforms(raw_text, on_step=_on_step)
    sys.stdout.write("\r" + " " * 72 + "\r")
    sys.stdout.flush()
    # Measure the reduction on the cleaned text itself, before the front
    # matter is added, so the metadata header does not skew the figure.
    reduction = 100 * (1 - len(clean_text) / len(raw_text)) if raw_text else 0
    clean_text = _build_frontmatter(meta) + clean_text
    _print_transform_stats(t, reduction)

    # [4] Structure profile
    print(" [4/4] Analisi struttura...")
    try:
        out_dir.mkdir(parents=True, exist_ok=True)
        raw_out.write_text(raw_text, encoding="utf-8")
        clean_out.write_text(clean_text, encoding="utf-8")
    except PermissionError as e:
        print(f" ✗ Permesso negato durante la scrittura: {e}")
        return False

    # analyze() reads the file that was just written.
    profile = analyze(clean_out)
    (out_dir / "structure_profile.json").write_text(
        json.dumps(profile, ensure_ascii=False, indent=2), encoding="utf-8"
    )

    level = profile["livello_struttura"]
    print(f" ✅ Struttura: livello {level} — "
          f"{_LIVELLO_DESC[level]}")
    print(f" h1={profile['n_h1']} h2={profile['n_h2']} h3={profile['n_h3']} "
          f"paragrafi={profile['n_paragrafi']}")
    print(f" Strategia chunking: {profile['strategia_chunking']}")
    print(f" Lingua rilevata: {profile['lingua_rilevata']}")
    for w in profile["avvertenze"]:
        print(f" ⚠️ {w}")

    report_path = build_report(stem, out_dir, clean_text, t, profile, reduction)
    report_data = json.loads(report_path.read_text(encoding="utf-8"))
    score, _ = _score(report_data)

    print(f"\n Output → conversione/{stem}/")
    print(" raw.md (immutabile) clean.md report.json")
    print(f" Punteggio qualità: {score}/100 {_grade(score)}")
    return True
|