diff --git a/step-0/check_pdf.py b/step-0/check_pdf.py deleted file mode 100644 index 2bee9b8..0000000 --- a/step-0/check_pdf.py +++ /dev/null @@ -1,229 +0,0 @@ -#!/usr/bin/env python3 -""" -Step 0 — Verifica idoneità PDF - -Legge tutti i PDF in sources/ e salva un report per ognuno in step-0/. - -Uso: - python step-0/check_pdf.py - -Output: - step-0/_step0_report.txt -""" - -import sys -import statistics -from datetime import datetime -from pathlib import Path - - -def check_pdf(pdf_path: str, save: bool = True) -> None: - try: - import pdfplumber - except ImportError: - print("Errore: pdfplumber non è installato.") - print(" pip install pdfplumber") - sys.exit(1) - - path = Path(pdf_path) - if not path.exists(): - print(f"Errore: file non trovato — {pdf_path}") - sys.exit(1) - if path.suffix.lower() != ".pdf": - print(f"Errore: il file non è un PDF — {pdf_path}") - sys.exit(1) - - lines = [] # righe del report - results = [] # (etichetta, stato, messaggio) - - def out(text=""): - lines.append(text) - print(text) - - out(f"Step 0 — Verifica idoneità PDF") - out(f"File: {path.name}") - out(f"Data: {datetime.now().strftime('%Y-%m-%d %H:%M')}") - out("=" * 50) - - # ------------------------------------------------------------------ # - # Criterio 1 — Non protetto da password - # ------------------------------------------------------------------ # - try: - with pdfplumber.open(path) as pdf: - n_pages = len(pdf.pages) - results.append(("Non protetto da password", "PASS", f"{n_pages} pagine")) - except Exception as e: - msg = str(e).lower() - if "password" in msg or "encrypted" in msg or "decrypt" in msg: - results.append(("Non protetto da password", "FAIL", - "Il PDF è cifrato — non può essere elaborato")) - else: - results.append(("Non protetto da password", "FAIL", - f"Impossibile aprire il file: {e}")) - _render_results(results, out) - _maybe_save(lines, path, save) - return - - # ------------------------------------------------------------------ # - # Lettura pagine — una sola passata - # ------------------------------------------------------------------ # - char_counts = [] - line_lengths = [] - all_text = "" - empty_pages = 0 - - with pdfplumber.open(path) as pdf: - for page in pdf.pages: - text = page.extract_text() or "" - all_text += text + "\n" - chars = len(text.strip()) - char_counts.append(chars) - if chars == 0: - empty_pages += 1 - for line in text.splitlines(): - stripped = line.strip() - if stripped: - line_lengths.append(len(stripped)) - - total_pages = len(char_counts) - pages_with_text = sum(1 for c in char_counts if c > 50) - text_coverage = pages_with_text / total_pages if total_pages > 0 else 0 - - # ------------------------------------------------------------------ # - # Criterio 2 — Testo estraibile - # ------------------------------------------------------------------ # - if text_coverage >= 0.7: - results.append(("Testo estraibile", "PASS", - f"{pages_with_text}/{total_pages} pagine con testo ({text_coverage:.0%})")) - elif text_coverage >= 0.4: - results.append(("Testo estraibile", "WARN", - f"Solo {pages_with_text}/{total_pages} pagine con testo — revisione estesa necessaria")) - else: - results.append(("Testo estraibile", "FAIL", - f"Solo {pages_with_text}/{total_pages} pagine con testo — probabilmente scansionato")) - - # ------------------------------------------------------------------ # - # Criterio 3 — Generato digitalmente (non scansionato) - # ------------------------------------------------------------------ # - pages_text_only = [c for c in char_counts if c > 0] - avg_chars = statistics.mean(pages_text_only) if pages_text_only else 0 - - if avg_chars >= 300: - results.append(("Generato digitalmente (non scansionato)", "PASS", - f"Media {avg_chars:.0f} char/pagina")) - elif avg_chars >= 100: - results.append(("Generato digitalmente (non scansionato)", "WARN", - f"Media bassa: {avg_chars:.0f} char/pagina — alcune pagine potrebbero essere immagini")) - else: - results.append(("Generato digitalmente (non scansionato)", "FAIL", - f"Media molto bassa: {avg_chars:.0f} char/pagina — il PDF sembra scansionato")) - - # ------------------------------------------------------------------ # - # Criterio 4 — Pagine vuote - # ------------------------------------------------------------------ # - if empty_pages == 0: - results.append(("Pagine vuote", "PASS", "Nessuna pagina vuota")) - elif empty_pages <= total_pages * 0.05: - results.append(("Pagine vuote", "WARN", - f"{empty_pages} pagine vuote (≤ 5%) — probabilmente copertine o separatori")) - else: - results.append(("Pagine vuote", "WARN", - f"{empty_pages} pagine vuote ({empty_pages/total_pages:.0%}) — controllare")) - - # ------------------------------------------------------------------ # - # Criterio desiderabile — Layout a colonne singola - # ------------------------------------------------------------------ # - if line_lengths: - median_len = statistics.median(line_lengths) - short_lines = sum(1 for l in line_lengths if l < median_len * 0.4) - short_ratio = short_lines / len(line_lengths) - if short_ratio < 0.15: - results.append(("Layout a colonne singola (desiderabile)", "PASS", - f"Righe corte: {short_ratio:.0%} — struttura lineare")) - elif short_ratio < 0.35: - results.append(("Layout a colonne singola (desiderabile)", "WARN", - f"Righe corte: {short_ratio:.0%} — possibile layout a colonne parziale")) - else: - results.append(("Layout a colonne singola (desiderabile)", "WARN", - f"Righe corte: {short_ratio:.0%} — probabile layout a colonne multiple")) - else: - results.append(("Layout a colonne singola (desiderabile)", "WARN", - "Impossibile analizzare (nessuna riga estratta)")) - - # ------------------------------------------------------------------ # - # Criterio desiderabile — Struttura logica (titoli) - # ------------------------------------------------------------------ # - candidate_headings = [ - line.strip() for line in all_text.splitlines() - if 3 <= len(line.strip()) <= 80 - and line.strip()[0].isupper() - and not line.strip().endswith(".") - and not line.strip().endswith(",") - and len(line.strip().split()) <= 10 - ] - heading_density = len(candidate_headings) / total_pages if total_pages > 0 else 0 - - if heading_density >= 1.0: - results.append(("Struttura logica riconoscibile (desiderabile)", "PASS", - f"~{len(candidate_headings)} possibili titoli rilevati ({heading_density:.1f}/pagina)")) - elif heading_density >= 0.3: - results.append(("Struttura logica riconoscibile (desiderabile)", "WARN", - f"~{len(candidate_headings)} possibili titoli ({heading_density:.1f}/pagina) — struttura parziale")) - else: - results.append(("Struttura logica riconoscibile (desiderabile)", "WARN", - "Pochi titoli rilevati — testo narrativo o struttura non standard")) - - _render_results(results, out) - _maybe_save(lines, path, save) - - -def _render_results(results: list, out) -> None: - icons = {"PASS": "✅", "WARN": "⚠️ ", "FAIL": "❌"} - out() - for label, status, message in results: - icon = icons.get(status, " ") - out(f" {icon} {label}") - out(f" {message}") - out() - - fails = [r for r in results if r[1] == "FAIL"] - warns = [r for r in results if r[1] == "WARN"] - - if fails: - out("ESITO: ❌ PDF NON IDONEO") - out(" Criteri obbligatori non soddisfatti — scegli un PDF diverso.") - elif warns: - out("ESITO: ⚠️ PDF ACCETTABILE CON CAUTELA") - out(" Procedi, ma aspettati più lavoro nella revisione manuale (step 4).") - else: - out("ESITO: ✅ PDF IDONEO") - out(" Tutti i criteri soddisfatti — procedi con lo step 1.") - out() - - -def _maybe_save(lines: list, pdf_path: Path, save: bool) -> None: - if not save: - return - script_dir = Path(__file__).parent - out_file = script_dir / f"{pdf_path.stem}_step0_report.txt" - out_file.write_text("\n".join(lines), encoding="utf-8") - print(f"Report salvato in: {out_file}") - - -if __name__ == "__main__": - project_root = Path(__file__).parent.parent - sources_dir = project_root / "sources" - - if not sources_dir.exists(): - print(f"Errore: cartella sources/ non trovata in {project_root}") - sys.exit(1) - - pdfs = sorted(sources_dir.glob("*.pdf")) - if not pdfs: - print(f"Errore: nessun PDF trovato in {sources_dir}") - sys.exit(1) - - for pdf in pdfs: - check_pdf(str(pdf), save=True) - if len(pdfs) > 1: - print("-" * 50) diff --git a/step-1/inspect_pdf.py b/step-1/inspect_pdf.py deleted file mode 100644 index 0c2bfdd..0000000 --- a/step-1/inspect_pdf.py +++ /dev/null @@ -1,199 +0,0 @@ -#!/usr/bin/env python3 -""" -Step 1 — Ispezione automatica PDF - -Analizza il PDF pagina per pagina e produce un report con score (0–100) -e lista dei problemi per pagina. Serve per capire la qualità del documento -e mappare i problemi prima della revisione manuale (step 4). - -Uso: - python step1/inspect.py - -Output: - step1/_step1_report.txt -""" - -import re -import sys -import statistics -from collections import Counter -from datetime import datetime -from pathlib import Path - - -# ── Penalità per il calcolo dello score ─────────────────────────────────── -SYLLABIF_PENALTY = 0.3 # per occorrenza di sillabazione -COLUMN_PENALTY = 3.0 # per pagina con layout a colonne -UNICODE_PENALTY = 1.5 # per pagina con caratteri anomali -EMPTY_PENALTY = 1.0 # per pagina vuota -HEADER_FOOTER_PEN = 5.0 # fisso se intestazioni/piè ripetitivi rilevati - - -def inspect_pdf(pdf_path: str, save: bool = True) -> None: - try: - import pdfplumber - except ImportError: - print("Errore: pdfplumber non è installato.") - print(" pip install pdfplumber") - sys.exit(1) - - path = Path(pdf_path) - if not path.exists(): - print(f"Errore: file non trovato — {pdf_path}") - sys.exit(1) - - lines = [] - - def out(text=""): - lines.append(text) - print(text) - - out("Step 1 — Ispezione automatica PDF") - out(f"File: {path.name}") - out(f"Data: {datetime.now().strftime('%Y-%m-%d %H:%M')}") - out("=" * 50) - - # ── Lettura pagine ───────────────────────────────────────────────────── - with pdfplumber.open(path) as pdf: - n_pages = len(pdf.pages) - pages_text = [page.extract_text() or "" for page in pdf.pages] - - # ── Analisi per pagina ───────────────────────────────────────────────── - issues = [] # (page_num, descrizione) — page_num=0 → problema globale - deductions = 0.0 - - first_lines = [] # prima riga significativa di ogni pagina (per header) - last_lines = [] # ultima riga significativa di ogni pagina (per footer) - - for i, text in enumerate(pages_text): - page_num = i + 1 - stripped = text.strip() - - # 1. Pagina vuota - if len(stripped) < 50: - issues.append((page_num, "pagina vuota")) - deductions += EMPTY_PENALTY - continue - - page_lines = text.splitlines() - nonempty = [l.strip() for l in page_lines if l.strip()] - - # Raccogli prima/ultima riga per il controllo header/footer - if nonempty: - first_lines.append(nonempty[0]) - last_lines.append(nonempty[-1]) - - # 2. Sillabazione a fine riga (es. "estra-" + a capo) - syllabif = sum( - 1 for line in page_lines - if re.search(r'\b\w{2,}-$', line.rstrip()) - ) - if syllabif: - label = "occorrenza" if syllabif == 1 else "occorrenze" - issues.append((page_num, f"sillabazione rilevata ({syllabif} {label})")) - deductions += syllabif * SYLLABIF_PENALTY - - # 3. Layout a colonne (righe molto corte e numerose) - if len(nonempty) >= 10: - median_len = statistics.median(len(l) for l in nonempty) - short_ratio = sum(1 for l in nonempty if len(l) < median_len * 0.4) / len(nonempty) - if short_ratio > 0.35: - issues.append((page_num, f"possibile layout a colonne ({short_ratio:.0%} righe corte)")) - deductions += COLUMN_PENALTY - - # 4. Caratteri Unicode anomali - # (control chars esclusi \n \t \r, replacement char, PUA block) - anomalies = re.findall( - r'[\x00-\x08\x0b\x0c\x0e-\x1f\x7f-\x9f\ufffd\ue000-\uf8ff]', text - ) - if anomalies: - issues.append((page_num, f"caratteri Unicode anomali ({len(anomalies)} trovati)")) - deductions += UNICODE_PENALTY - - # ── Intestazioni e piè di pagina ripetitivi ──────────────────────────── - def _check_repetition(line_list: list, label: str) -> None: - nonlocal deductions - if not line_list: - return - threshold = max(3, len(line_list) * 0.25) - repeated = [ - (txt, cnt) for txt, cnt in Counter(line_list).items() - if cnt >= threshold and len(txt) > 3 - ] - if repeated: - deductions += HEADER_FOOTER_PEN - for txt, cnt in repeated[:3]: - issues.append((0, f"{label} ripetitivo: \"{txt[:45]}\" ({cnt} volte)")) - - _check_repetition(first_lines, "intestazione") - _check_repetition(last_lines, "piè di pagina") - - # ── Score ────────────────────────────────────────────────────────────── - score = max(0, round(100 - deductions)) - - # ── Riepilogo ────────────────────────────────────────────────────────── - pages_with_issues = len({p for p, _ in issues if p > 0}) - out() - out(f"Score: {score}/100") - out(f"Pagine totali: {n_pages}") - out(f"Pagine con problemi: {pages_with_issues}") - out() - - if issues: - global_issues = [(p, d) for p, d in issues if p == 0] - page_issues = sorted([(p, d) for p, d in issues if p > 0]) - for _, desc in global_issues: - out(f" ⚠️ {desc}") - for page_num, desc in page_issues: - out(f" Pagina {page_num:>4}: {desc}") - else: - out(" Nessun problema rilevato.") - - out() - - # ── Prossimi passi ───────────────────────────────────────────────────── - out("PROSSIMI PASSI:") - if score >= 70: - out(" → conversione con marker funzionerà bene") - elif score >= 40: - out(" → conversione possibile, attendi più errori nella revisione") - else: - out(" → qualità bassa — valuta una fonte PDF migliore") - - attention_pages = sorted({p for p, _ in issues if p > 0}) - if attention_pages: - sample = ", ".join(str(p) for p in attention_pages[:10]) - if len(attention_pages) > 10: - sample += f" … e altre {len(attention_pages) - 10}" - out(f" → attenzione alle pagine {sample} nella revisione manuale") - out() - - _maybe_save(lines, path, save) - - -def _maybe_save(lines: list, pdf_path: Path, save: bool) -> None: - if not save: - return - script_dir = Path(__file__).parent - out_file = script_dir / f"{pdf_path.stem}_step1_report.txt" - out_file.write_text("\n".join(lines), encoding="utf-8") - print(f"Report salvato in: {out_file}") - - -if __name__ == "__main__": - project_root = Path(__file__).parent.parent - sources_dir = project_root / "sources" - - if not sources_dir.exists(): - print(f"Errore: cartella sources/ non trovata in {project_root}") - sys.exit(1) - - pdfs = sorted(sources_dir.glob("*.pdf")) - if not pdfs: - print(f"Errore: nessun PDF trovato in {sources_dir}") - sys.exit(1) - - for pdf in pdfs: - inspect_pdf(str(pdf), save=True) - if len(pdfs) > 1: - print("-" * 50) diff --git a/step-2/convert_pdf.py b/step-2/convert_pdf.py deleted file mode 100644 index efc6376..0000000 --- a/step-2/convert_pdf.py +++ /dev/null @@ -1,80 +0,0 @@ -#!/usr/bin/env python3 -""" -Step 2 — Conversione PDF → Markdown grezzo - -Usa pymupdf4llm (PyMuPDF puro C, zero modelli ML, ~30-50 MB RAM) -per convertire ogni PDF in sources/ e organizza l'output in: - step-2//raw.md — MD grezzo, non modificare mai - step-2//clean.md — copia di lavoro per lo step 4 - -Uso: - python step-2/convert_pdf.py # tutti i PDF in sources/ - python step-2/convert_pdf.py --pdf sources/doc.pdf # un solo PDF -""" - -import argparse -import shutil -import sys -from pathlib import Path - -import pymupdf4llm - - -def convert_pdf(pdf_path: Path, project_root: Path) -> bool: - stem = pdf_path.stem - out_dir = project_root / "step-2" / stem - raw_md = out_dir / "raw.md" - clean_md = out_dir / "clean.md" - - print(f"\nConversione: {pdf_path.name}") - print(f" Output: step-2/{stem}/") - - if raw_md.exists(): - print(f" ⚠️ raw.md già presente — skip") - print(f" (elimina {raw_md} per riconvertire)") - return True - - out_dir.mkdir(parents=True, exist_ok=True) - - print(f" Conversione in corso...") - md_text = pymupdf4llm.to_markdown(str(pdf_path)) - - raw_md.write_text(md_text, encoding="utf-8") - shutil.copy2(raw_md, clean_md) - - size_kb = raw_md.stat().st_size // 1024 - print(f" ✅ raw.md salvato ({size_kb} KB)") - print(f" ✅ clean.md creato (copia di lavoro per step 4)") - return True - - -if __name__ == "__main__": - project_root = Path(__file__).parent.parent - - parser = argparse.ArgumentParser(description="Step 2 — Conversione PDF → Markdown") - parser.add_argument("--pdf", help="Percorso di un singolo PDF da convertire") - args = parser.parse_args() - - if args.pdf: - pdf_path = Path(args.pdf) - if not pdf_path.exists(): - print(f"Errore: file non trovato — {args.pdf}") - sys.exit(1) - pdfs = [pdf_path] - else: - sources_dir = project_root / "sources" - if not sources_dir.exists(): - print(f"Errore: cartella sources/ non trovata in {project_root}") - sys.exit(1) - pdfs = sorted(sources_dir.glob("*.pdf")) - if not pdfs: - print(f"Errore: nessun PDF trovato in {sources_dir}") - sys.exit(1) - - results = [convert_pdf(p, project_root) for p in pdfs] - - ok_count = sum(results) - total = len(results) - print(f"\n{'✅' if all(results) else '⚠️ '} {ok_count}/{total} PDF convertiti") - - sys.exit(0 if all(results) else 1) diff --git a/step-3/detect_structure.py b/step-3/detect_structure.py deleted file mode 100644 index e3a426b..0000000 --- a/step-3/detect_structure.py +++ /dev/null @@ -1,223 +0,0 @@ -#!/usr/bin/env python3 -""" -Step 3 — Rilevamento struttura Markdown - -Analizza il Markdown grezzo prodotto dallo step 2 senza modificarlo. -Copia i file da step-2// e produce structure_profile.json che -guida la revisione manuale (step 4) e il chunker adattivo (step 5). - -Output in step-3//: - raw.md — copia da step-2 (non modificare mai) - clean.md — copia da step-2 (da revisionare nello step 4) - structure_profile.json — profilo strutturale - -Uso: - python step-3/detect_structure.py # tutti i documenti in step-2/ - python step-3/detect_structure.py --stem nietzsche # un solo documento - python step-3/detect_structure.py --force # riesegui anche se già presente -""" - -import argparse -import json -import re -import shutil -import sys -from pathlib import Path - - -# ─── Language detection ─────────────────────────────────────────────────────── - -_IT_WORDS = frozenset([ - "il", "la", "di", "e", "che", "non", "per", "un", "una", "si", - "con", "da", "del", "della", "dei", "in", "ma", "se", "lo", "le", - "gli", "al", "alla", "ai", "alle", "sono", "ha", "hanno", "era", - "erano", "nel", "nella", "nei", "nelle", "questo", "questa", "così", -]) - -_EN_WORDS = frozenset([ - "the", "of", "and", "to", "in", "is", "that", "it", "was", "for", - "on", "are", "as", "with", "his", "they", "at", "be", "this", "have", - "from", "or", "an", "but", "not", "by", "he", "she", "we", "you", - "which", "their", "been", "has", "would", "there", "when", "will", -]) - - -def detect_language(text: str) -> str: - words = re.findall(r'\b[a-zA-Z]{2,}\b', text.lower()) - sample = words[:2000] - it = sum(1 for w in sample if w in _IT_WORDS) - en = sum(1 for w in sample if w in _EN_WORDS) - if it == 0 and en == 0: - return "unknown" - return "it" if it >= en else "en" - - -# ─── Markdown parsing ───────────────────────────────────────────────────────── - -def split_sections(text: str, header_level: int) -> list[str]: - """ - Split text on headers of the given level (1=h1, 2=h2, 3=h3). - Returns list of body texts for each matching section. - """ - prefix = "#" * header_level + " " - parts = re.split(rf'(?m)^{re.escape(prefix)}.+', text) - # parts[0] is preamble, rest are section bodies - return [p for p in parts[1:] if p.strip()] - - -def count_headers(text: str, level: int) -> int: - prefix = "#" * level + " " - return len(re.findall(rf'(?m)^{re.escape(prefix)}', text)) - - -def count_paragraphs(text: str) -> int: - """Count non-empty, non-header paragraph blocks.""" - blocks = re.split(r'\n{2,}', text) - return sum(1 for b in blocks if b.strip() and not re.match(r'^#+\s', b.strip())) - - -# ─── Core analysis ──────────────────────────────────────────────────────────── - -def analyze(raw_md_path: Path) -> dict: - text = raw_md_path.read_text(encoding="utf-8") - - n_h1 = count_headers(text, 1) - n_h2 = count_headers(text, 2) - n_h3 = count_headers(text, 3) - n_paragrafi = count_paragraphs(text) - - # Determine structural level and primary boundary - if n_h3 >= 5: - livello = 3 - boundary = "h3" - strategia = "h3_aware" - section_bodies = split_sections(text, 3) - elif n_h2 >= 3: - livello = 2 - boundary = "h2" - strategia = "h2_paragraph_split" - section_bodies = split_sections(text, 2) - elif n_h1 + n_h2 + n_h3 >= 1: - livello = 1 - boundary = "paragrafo" - strategia = "paragraph" - section_bodies = [b for b in re.split(r'\n{2,}', text) if b.strip()] - else: - if n_paragrafi >= 3: - livello = 1 - boundary = "paragrafo" - strategia = "paragraph" - section_bodies = [b for b in re.split(r'\n{2,}', text) if b.strip()] - else: - livello = 0 - boundary = "nessuno" - strategia = "sliding_window" - section_bodies = [text] if text.strip() else [] - - lengths = [len(b) for b in section_bodies if b.strip()] - lunghezza_media = int(sum(lengths) / len(lengths)) if lengths else 0 - - lingua = detect_language(text) - - avvertenze = [] - short = sum(1 for l in lengths if l < 200) - long_ = sum(1 for l in lengths if l > 800) - if short: - avvertenze.append(f"{short} sezioni sotto i 200 caratteri — verranno accorpate") - if long_: - avvertenze.append(f"{long_} sezioni sopra i 800 caratteri — verranno divise") - - return { - "livello_struttura": livello, - "n_h1": n_h1, - "n_h2": n_h2, - "n_h3": n_h3, - "n_paragrafi": n_paragrafi, - "boundary_primario": boundary, - "lingua_rilevata": lingua, - "lunghezza_media_sezione": lunghezza_media, - "strategia_chunking": strategia, - "avvertenze": avvertenze, - } - - -# ─── Per-document processing ───────────────────────────────────────────────── - -def process_stem(stem: str, project_root: Path, force: bool) -> bool: - src_dir = project_root / "step-2" / stem - out_dir = project_root / "step-3" / stem - raw_src = src_dir / "raw.md" - clean_src = src_dir / "clean.md" - profile_out = out_dir / "structure_profile.json" - - print(f"\nDocumento: {stem}") - - if not raw_src.exists(): - print(f" ✗ raw.md non trovato in step-2/{stem}/ — skip") - return False - - if profile_out.exists() and not force: - print(f" ⚠️ structure_profile.json già presente — skip") - print(f" (usa --force per rieseguire)") - return True - - out_dir.mkdir(parents=True, exist_ok=True) - - # Copy files from step-2 - shutil.copy2(raw_src, out_dir / "raw.md") - if clean_src.exists(): - shutil.copy2(clean_src, out_dir / "clean.md") - print(f" Copiati raw.md e clean.md da step-2/{stem}/") - - # Analyze - print(f" Analisi struttura in corso...") - profile = analyze(out_dir / "raw.md") - - profile_out.write_text(json.dumps(profile, ensure_ascii=False, indent=2), encoding="utf-8") - - # Report - _LIVELLO_DESC = { - 3: "struttura ricca (###)", - 2: "struttura parziale (##)", - 1: "solo paragrafi", - 0: "testo piatto", - } - print(f" ✅ Livello {profile['livello_struttura']} — {_LIVELLO_DESC[profile['livello_struttura']]}") - print(f" h1={profile['n_h1']} h2={profile['n_h2']} h3={profile['n_h3']} paragrafi={profile['n_paragrafi']}") - print(f" Boundary: {profile['boundary_primario']} | Strategia: {profile['strategia_chunking']}") - print(f" Lingua: {profile['lingua_rilevata']} | Lunghezza media sezione: {profile['lunghezza_media_sezione']} char") - for w in profile["avvertenze"]: - print(f" ⚠️ {w}") - print(f" ✅ structure_profile.json salvato") - return True - - -# ─── Entry point ───────────────────────────────────────────────────────────── - -if __name__ == "__main__": - project_root = Path(__file__).parent.parent - - parser = argparse.ArgumentParser(description="Step 3 — Rilevamento struttura Markdown") - parser.add_argument("--stem", help="Nome del documento (sottocartella di step-2/)") - parser.add_argument("--force", action="store_true", help="Riesegui anche se già presente") - args = parser.parse_args() - - if args.stem: - stems = [args.stem] - else: - step2_dir = project_root / "step-2" - if not step2_dir.exists(): - print(f"Errore: cartella step-2/ non trovata in {project_root}") - sys.exit(1) - stems = sorted(p.name for p in step2_dir.iterdir() if p.is_dir()) - if not stems: - print(f"Errore: nessun documento trovato in step-2/") - sys.exit(1) - - results = [process_stem(s, project_root, args.force) for s in stems] - - ok = sum(results) - total = len(results) - print(f"\n{'✅' if all(results) else '⚠️ '} {ok}/{total} documenti analizzati") - - sys.exit(0 if all(results) else 1) diff --git a/step-4/revise.py b/step-4/revise.py deleted file mode 100644 index cf703a2..0000000 --- a/step-4/revise.py +++ /dev/null @@ -1,433 +0,0 @@ -#!/usr/bin/env python3 -""" -Step 4 — Revisione automatica del Markdown - -Trasforma clean.md da step-3 rivelando la struttura latente del documento. -Le trasformazioni sono euristiche universali che funzionano su qualsiasi PDF: - - - Normalizza whitespace multiplo (artefatto PDF) - - Riduce righe vuote multiple - - Rimuove marcatori **bold** nelle intestazioni esistenti - - Converte righe ALL-CAPS standalone → ## header (euristico, qualsiasi lingua) - - Converte sezioni numerate "N. testo" → ### N. (qualsiasi numerazione) - - Rimuove blocchi TOC (righe che iniziano con parole-chiave indice) - -Per ogni documento viene ricalcolato il profilo strutturale: il livello può -salire (es. livello 1 → 3) se le strutture latenti vengono rilevate. - -Output in step-4//: - raw.md — copia da step-3 (non modificare mai) - clean.md — MD revisionato - structure_profile.json — profilo aggiornato dopo la revisione - -Uso: - python step-4/revise.py # tutti i documenti in step-3/ - python step-4/revise.py --stem nietzsche # un solo documento - python step-4/revise.py --force # riesegui anche se già presente -""" - -import argparse -import json -import re -import shutil -import sys -from datetime import date -from pathlib import Path - -# Riusa la funzione analyze() già scritta nello step 3 -sys.path.insert(0, str(Path(__file__).parent.parent / "step-3")) -from detect_structure import analyze # noqa: E402 - - -# ─── Costanti ───────────────────────────────────────────────────────────────── - -# Parole-chiave che identificano blocchi TOC (da rimuovere) -_TOC_KEYWORDS = frozenset([ - "indice", "index", "contents", "table of contents", - "sommario", "inhaltsverzeichnis", "inhalt", -]) - -# Preposizioni/articoli da non capitalizzare nel title-case -_STOP_IT_EN = frozenset([ - # italiano - "di", "del", "della", "dei", "delle", "da", "in", "e", "il", "la", - "lo", "le", "gli", "un", "una", "per", "a", "al", "alla", "ai", - "alle", "con", "su", "sul", "sulla", "che", "o", - # inglese - "of", "the", "a", "an", "and", "or", "but", "in", "on", "at", - "to", "for", "with", "by", "from", "as", -]) - -# Ordinali italiani → romani (per titoli come "CAPITOLO PRIMO") -_ORDINALS_IT = { - "PRIMO": "I", "SECONDO": "II", "TERZO": "III", "QUARTO": "IV", - "QUINTO": "V", "SESTO": "VI", "SETTIMO": "VII", "OTTAVO": "VIII", - "NONO": "IX", "DECIMO": "X", -} - -# Ordinali inglesi → arabici (per "CHAPTER ONE") -_ORDINALS_EN = { - "ONE": "1", "TWO": "2", "THREE": "3", "FOUR": "4", "FIVE": "5", - "SIX": "6", "SEVEN": "7", "EIGHT": "8", "NINE": "9", "TEN": "10", -} - - -# ─── Utilità ────────────────────────────────────────────────────────────────── - -def _sentence_case(s: str) -> str: - """ - Sentence-case: prima lettera maiuscola, resto minuscolo. - Corretto per l'italiano e accettabile per l'inglese accademico. - """ - if not s: - return s - lower = s.lower() - return lower[0].upper() + lower[1:] - - -def _is_allcaps_line(line: str) -> bool: - """ - True se la riga è una candidata per conversione a ## header. - Criterio: tutti i caratteri alfabetici sono maiuscoli, lunghezza >= 3. - """ - stripped = line.strip() - letters = [c for c in stripped if c.isalpha()] - return ( - len(letters) >= 3 - and all(c.isupper() for c in letters) - and not stripped.startswith("#") - ) - - -def _allcaps_to_header(raw_line: str) -> str: - """ - Converte una riga ALL-CAPS in un ## header title-case. - Riconosce pattern specifici (CAPITOLO ORDINE, CHAPTER N) come bonus, - ma funziona in modalità generica su qualsiasi testo. - """ - text = raw_line.strip().rstrip('.').rstrip('?').strip() - - # ── Pattern italiano: "CAPITOLO PRIMO. TITOLO DEL CAPITOLO" - _ORD_IT_PAT = "|".join(_ORDINALS_IT.keys()) - m = re.match(rf'^CAPITOLO ({_ORD_IT_PAT})\. (.+)', text) - if m: - roman = _ORDINALS_IT[m.group(1)] - titolo = m.group(2).rstrip('.').rstrip('?').strip() - return f"## Capitolo {roman} — {_sentence_case(titolo)}" - - # ── Pattern inglese: "CHAPTER ONE. TITLE" o "CHAPTER 1. TITLE" - _ORD_EN_PAT = "|".join(_ORDINALS_EN.keys()) - m = re.match(rf'^CHAPTER ({_ORD_EN_PAT}|\d+)\.? (.+)', text) - if m: - n = _ORDINALS_EN.get(m.group(1), m.group(1)) - titolo = m.group(2).rstrip('.').rstrip('?').strip() - return f"## Chapter {n} — {_sentence_case(titolo)}" - - # ── Pattern generico con numerazione romana o arabica nel prefisso - m = re.match(r'^([IVXLCDM]+|[0-9]+)\. (.+)', text) - if m: - n = m.group(1) - titolo = m.group(2).rstrip('.').strip() - return f"## {n}. {_sentence_case(titolo)}" - - # ── Caso generico: tutto maiuscolo senza pattern riconoscibile - return f"## {_sentence_case(text)}" - - -def _is_toc_line(line: str) -> bool: - """True se la riga è l'intestazione di un blocco indice/TOC.""" - first_word = line.strip().split('.')[0].strip().lower() - return first_word in _TOC_KEYWORDS - - -# ─── Trasformazioni ──────────────────────────────────────────────────────────── - -def apply_transforms(text: str) -> tuple[str, dict]: - """ - Applica tutte le trasformazioni strutturali al testo MD. - Restituisce (testo_modificato, statistiche). - """ - stats = { - "toc_rimosso": False, - "n_header_allcaps": 0, - "n_sezioni_numerate": 0, - "n_paragrafi_uniti": 0, - } - - # ── 1. Rimuovi marcatori **bold** nelle intestazioni esistenti - # ## **Titolo** → ## Titolo - text = re.sub( - r'^(#{1,6})\s+\*\*(.+?)\*\*\s*$', - r'\1 \2', - text, flags=re.MULTILINE, - ) - - # ── 1b. Normalizza header esistenti con contenuto ALL-CAPS → sentence-case - # ## AL DI LA' DEL BENE E DEL MALE → ## Al di la' del bene e del male - def _norm_allcaps_header(m: re.Match) -> str: - hashes = m.group(1) - content = m.group(2).strip() - letters = [c for c in content if c.isalpha()] - if letters and all(c.isupper() for c in letters): - return f"{hashes} {_sentence_case(content)}" - return m.group(0) - - text = re.sub( - r'^(#{1,6}) (.+)$', - _norm_allcaps_header, - text, flags=re.MULTILINE, - ) - - # ── 2. Rimuovi blocco TOC (riga indice + contenuto inline sulla stessa riga) - # "INDICE. Capitolo 1 Capitolo 2 ..." → rimossa - lines = text.split('\n') - new_lines = [] - for line in lines: - if _is_toc_line(line): - stats["toc_rimosso"] = True - else: - new_lines.append(line) - text = '\n'.join(new_lines) - - # ── 3. Converti righe ALL-CAPS standalone → ## header - # Una riga è "standalone" se è preceduta/seguita da riga vuota - # oppure si trova all'inizio/fine del documento. - blocks = text.split('\n\n') - new_blocks = [] - for block in blocks: - stripped = block.strip() - # Blocco standalone = un'unica riga (nessun \n interno rilevante) - if '\n' not in stripped and _is_allcaps_line(stripped): - new_blocks.append(_allcaps_to_header(stripped)) - stats["n_header_allcaps"] += 1 - else: - # Controlla riga per riga per righe ALL-CAPS seguite da altri contenuti - sub_lines = block.split('\n') - converted = [] - for ln in sub_lines: - if _is_allcaps_line(ln) and len(ln.strip()) > 3: - converted.append(_allcaps_to_header(ln)) - stats["n_header_allcaps"] += 1 - else: - converted.append(ln) - new_blocks.append('\n'.join(converted)) - text = '\n\n'.join(new_blocks) - - # ── 4. Converti sezioni numerate "N. testo" → "### N.\n\ntesto" - # Riconosce: "1. Testo", "42. Testo" (due o più spazi dopo il punto) - def _num_repl(m: re.Match) -> str: - num = m.group(1) - testo = m.group(2).strip() - stats["n_sezioni_numerate"] += 1 - return f"### {num}.\n\n{testo}" - - # Pattern standard: "1. testo" o "1. testo" - text = re.sub( - r'^(\d+)\.\s+(.+)$', - _num_repl, - text, flags=re.MULTILINE, - ) - - # Pattern con lettera-suffisso: "65 a. testo" o "65a. testo" - def _num_letter_repl(m: re.Match) -> str: - num = m.group(1) + m.group(2) - testo = m.group(3).strip() - stats["n_sezioni_numerate"] += 1 - return f"### {num}.\n\n{testo}" - - text = re.sub( - r'^(\d+)\s*([a-z])\.\s+(.+)$', - _num_letter_repl, - text, flags=re.MULTILINE, - ) - - # ── 5. Unisci paragrafi spezzati da salti pagina PDF - # Criterio: blocco A non finisce con punteggiatura di fine frase, - # blocco B non inizia con maiuscola "di sezione" né è un header. - # Unione sicura: mai attraverso confini ###/##. - _SENTENCE_END = set('.?!»)\'"') - blocks = text.split('\n\n') - merged = [] - i = 0 - while i < len(blocks): - b = blocks[i] - stripped = b.strip() - # Prova a unire con il successivo se la frase è spezzata - while ( - i + 1 < len(blocks) - and stripped - and not stripped.startswith('#') - and stripped[-1] not in _SENTENCE_END - ): - nxt = blocks[i + 1].strip() - # Non unire se il successivo è un header o è vuoto - if not nxt or nxt.startswith('#'): - break - # Non unire se il successivo inizia con una cifra seguita da punto - # (sarebbe l'inizio di un nuovo aforisma non ancora convertito) - if re.match(r'^\d+\.', nxt): - break - b = stripped + ' ' + nxt - stripped = b.strip() - stats["n_paragrafi_uniti"] += 1 - i += 1 - merged.append(b) - i += 1 - text = '\n\n'.join(merged) - - # ── 6. Normalizza whitespace multiplo interno alle righe - # "parola parola" → "parola parola" (inclusi gli header) - lines = text.split('\n') - normalized = [] - for line in lines: - if not line.strip(): - normalized.append(line) - else: - normalized.append(re.sub(r' +', ' ', line)) - text = '\n'.join(normalized) - - # ── 7. Riduci righe vuote multiple a doppie - text = re.sub(r'\n{3,}', '\n\n', text) - - return text, stats - - -# ─── Aggiornamento revision log ──────────────────────────────────────────────── - -def update_revision_log( - log_path: Path, - stem: str, - profile_before: dict, - profile_after: dict, - t_stats: dict, -) -> None: - header_exists = log_path.exists() and log_path.stat().st_size > 0 - - avv = profile_after.get("avvertenze", []) - avv_str = "; ".join(avv) if avv else "nessuna" - - entry = f""" -## {stem} — {date.today().isoformat()} - -**Trasformazioni automatiche:** -- Normalizzazione whitespace multiplo e righe vuote -- Blocco TOC rimosso: {'sì' if t_stats['toc_rimosso'] else 'no'} -- Righe ALL-CAPS → ## header: {t_stats['n_header_allcaps']} -- Sezioni numerate → ### header: {t_stats['n_sezioni_numerate']} -- Paragrafi uniti (salti pagina PDF): {t_stats['n_paragrafi_uniti']} -- Livello struttura: {profile_before.get('livello_struttura', '?')} → {profile_after.get('livello_struttura', '?')} - -**Avvertenze residue:** {avv_str} - -**Revisioni manuali pendenti:** -- [ ] Verificare conversioni ALL-CAPS errate -- [ ] Controllare sezioni troppo corte o troppo lunghe -""" - - if not header_exists: - log_path.write_text("# Revision log\n" + entry, encoding="utf-8") - else: - existing = log_path.read_text(encoding="utf-8") - log_path.write_text(existing + entry, encoding="utf-8") - - -# ─── Per-document processing ───────────────────────────────────────────────── - -def process_stem(stem: str, project_root: Path, force: bool) -> bool: - src_dir = project_root / "step-3" / stem - out_dir = project_root / "step-4" / stem - raw_src = src_dir / "raw.md" - clean_src = src_dir / "clean.md" - profile_src = src_dir / "structure_profile.json" - clean_out = out_dir / "clean.md" - profile_out = out_dir / "structure_profile.json" - - print(f"\nDocumento: {stem}") - - if not clean_src.exists(): - print(f" ✗ clean.md non trovato in step-3/{stem}/ — skip") - return False - - if clean_out.exists() and not force: - print(f" ⚠️ clean.md già presente — skip") - print(f" (usa --force per rieseguire)") - return True - - out_dir.mkdir(parents=True, exist_ok=True) - - # Copia raw.md immutabile (riferimento) - if raw_src.exists(): - shutil.copy2(raw_src, out_dir / "raw.md") - print(f" Copiato raw.md da step-3/{stem}/") - - # Leggi profilo step-3 (per confronto nel report) - profile_before: dict = {} - if profile_src.exists(): - profile_before = json.loads(profile_src.read_text(encoding="utf-8")) - - # Applica trasformazioni - print(f" Applicazione trasformazioni strutturali...") - text = clean_src.read_text(encoding="utf-8") - text_revised, t_stats = apply_transforms(text) - - # Salva clean.md revisionato - clean_out.write_text(text_revised, encoding="utf-8") - - # Ricalcola profilo sul nuovo clean.md - profile_after = analyze(clean_out) - profile_out.write_text( - json.dumps(profile_after, ensure_ascii=False, indent=2), - encoding="utf-8", - ) - - # Report - lv_b = profile_before.get("livello_struttura", "?") - lv_a = profile_after["livello_struttura"] - _STRAT = {3: "h3_aware", 2: "h2_paragraph_split", 1: "paragraph", 0: "sliding_window"} - print(f" ✅ Livello struttura: {lv_b} → {lv_a} ({_STRAT.get(lv_a, '?')})") - print(f" h2: {profile_before.get('n_h2','?')} → {profile_after['n_h2']}") - print(f" h3: {profile_before.get('n_h3','?')} → {profile_after['n_h3']}") - print(f" TOC rimosso: {'sì' if t_stats['toc_rimosso'] else 'no'}") - print(f" Righe ALL-CAPS → ##: {t_stats['n_header_allcaps']}") - print(f" Sezioni numerate → ###: {t_stats['n_sezioni_numerate']}") - print(f" Paragrafi uniti (salti pagina): {t_stats['n_paragrafi_uniti']}") - for w in profile_after["avvertenze"]: - print(f" ⚠️ {w}") - - # Aggiorna revision log (direttamente in step-4/, non in sottocartella) - log_path = project_root / "step-4" / "revision_log.md" - update_revision_log(log_path, stem, profile_before, profile_after, t_stats) - print(f" ✅ step-4/revision_log.md aggiornato") - print(f" ✅ structure_profile.json salvato") - return True - - -# ─── Entry point ───────────────────────────────────────────────────────────── - -if __name__ == "__main__": - project_root = Path(__file__).parent.parent - - parser = argparse.ArgumentParser(description="Step 4 — Revisione automatica Markdown") - parser.add_argument("--stem", help="Nome del documento (sottocartella di step-3/)") - parser.add_argument("--force", action="store_true", help="Riesegui anche se già presente") - args = parser.parse_args() - - if args.stem: - stems = [args.stem] - else: - step3_dir = project_root / "step-3" - if not step3_dir.exists(): - print(f"Errore: cartella step-3/ non trovata in {project_root}") - sys.exit(1) - stems = sorted(p.name for p in step3_dir.iterdir() if p.is_dir()) - if not stems: - print(f"Errore: nessun documento trovato in step-3/") - sys.exit(1) - - results = [process_stem(s, project_root, args.force) for s in stems] - - ok = sum(results) - total = len(results) - print(f"\n{'✅' if all(results) else '⚠️ '} {ok}/{total} documenti revisionati") - - sys.exit(0 if all(results) else 1)