#!/usr/bin/env python3
# Recovered from the git patch "step-3: add detect_structure.py (structure
# profile, no ML deps)"; the same commit also adds `step-3/*/` to .gitignore
# so that the generated per-document output is not committed.
"""
Step 3 — Markdown structure detection.

Analyses the raw Markdown produced by step 2 without modifying it.
Copies the files from step-2/<stem>/ and writes structure_profile.json, which
guides the manual review (step 4) and the adaptive chunker (step 5).

Output in step-3/<stem>/:
    raw.md                  — copy from step-2 (never modify)
    clean.md                — copy from step-2 (to be revised in step 4)
    structure_profile.json  — structural profile

Usage:
    python step-3/detect_structure.py                   # every document in step-2/
    python step-3/detect_structure.py --stem nietzsche  # a single document
    python step-3/detect_structure.py --force           # re-run even if already present
"""

import argparse
import json
import re
import shutil
import sys
from itertools import islice
from pathlib import Path


# ─── Tunables ─────────────────────────────────────────────────────────────────

# Number of leading words inspected by detect_language().
_LANG_SAMPLE_WORDS = 2000

# Header / paragraph counts that select a chunking strategy in analyze().
_MIN_H3_FOR_H3_STRATEGY = 5
_MIN_H2_FOR_H2_STRATEGY = 3
_MIN_PARAGRAPHS_FOR_PARAGRAPH_STRATEGY = 3

# Section lengths (in characters) outside this range trigger a warning.
_SHORT_SECTION_CHARS = 200
_LONG_SECTION_CHARS = 800

# Human-readable description of each structural level, used in the report.
_LIVELLO_DESC = {
    3: "struttura ricca (###)",
    2: "struttura parziale (##)",
    1: "solo paragrafi",
    0: "testo piatto",
}


# ─── Language detection ───────────────────────────────────────────────────────

_IT_WORDS = frozenset([
    "il", "la", "di", "e", "che", "non", "per", "un", "una", "si",
    "con", "da", "del", "della", "dei", "in", "ma", "se", "lo", "le",
    "gli", "al", "alla", "ai", "alle", "sono", "ha", "hanno", "era",
    "erano", "nel", "nella", "nei", "nelle", "questo", "questa", "così",
])

_EN_WORDS = frozenset([
    "the", "of", "and", "to", "in", "is", "that", "it", "was", "for",
    "on", "are", "as", "with", "his", "they", "at", "be", "this", "have",
    "from", "or", "an", "but", "not", "by", "he", "she", "we", "you",
    "which", "their", "been", "has", "would", "there", "when", "will",
])

# A word is a run of Unicode letters: `[^\W\d_]` is "word character that is
# neither a digit nor an underscore". An ASCII-only class would never match
# accented marker words such as "così", and a two-letter minimum would make
# the one-letter marker "e" unreachable.
_WORD_RE = re.compile(r"[^\W\d_]+")


def detect_language(text: str) -> str:
    """Guess whether *text* is Italian or English from common function words.

    Only the first ``_LANG_SAMPLE_WORDS`` words are inspected. Each word is
    lowercased and looked up in the two marker-word sets.

    Returns:
        ``"it"`` or ``"en"`` (ties go to ``"it"``), or ``"unknown"`` when no
        marker word of either language occurs in the sample.
    """
    # Tokenise lazily so a long document is not scanned past the sample.
    tokens = (m.group().lower() for m in _WORD_RE.finditer(text))
    sample = list(islice(tokens, _LANG_SAMPLE_WORDS))

    it_hits = sum(1 for word in sample if word in _IT_WORDS)
    en_hits = sum(1 for word in sample if word in _EN_WORDS)
    if it_hits == 0 and en_hits == 0:
        return "unknown"
    return "it" if it_hits >= en_hits else "en"


# ─── Markdown parsing ─────────────────────────────────────────────────────────

def split_sections(text: str, header_level: int) -> list[str]:
    """Split *text* on headers of exactly the given level (1=h1, 2=h2, 3=h3).

    Returns the body text that follows each matching header line. The
    preamble before the first header and bodies that are only whitespace
    are dropped.
    """
    prefix = "#" * header_level + " "
    parts = re.split(rf"(?m)^{re.escape(prefix)}.+", text)
    # parts[0] is the preamble; everything after it is a section body.
    return [body for body in parts[1:] if body.strip()]


def count_headers(text: str, level: int) -> int:
    """Count lines of *text* that start with exactly *level* ``#`` and a space."""
    prefix = "#" * level + " "
    return len(re.findall(rf"(?m)^{re.escape(prefix)}", text))


def count_paragraphs(text: str) -> int:
    """Count blank-line-separated blocks that are non-empty and not headers.

    A block whose first non-blank characters are ``#`` followed by
    whitespace is treated as a header and is not counted.
    """
    blocks = re.split(r"\n{2,}", text)
    return sum(
        1
        for block in blocks
        if block.strip() and not re.match(r"^#+\s", block.strip())
    )


# ─── Core analysis ────────────────────────────────────────────────────────────

def analyze(raw_md_path: Path) -> dict:
    """Build the structure profile for the Markdown file at *raw_md_path*.

    The file is read as UTF-8. The header and paragraph counts select a
    structural level, a primary boundary and a chunking strategy; the mean
    section length and the length warnings are computed over the sections
    that boundary produces.

    Returns:
        A JSON-serialisable dict with the keys ``livello_struttura``,
        ``n_h1``, ``n_h2``, ``n_h3``, ``n_paragrafi``, ``boundary_primario``,
        ``lingua_rilevata``, ``lunghezza_media_sezione``,
        ``strategia_chunking`` and ``avvertenze``.

    Raises:
        OSError: if the file cannot be read.
        UnicodeDecodeError: if the file is not valid UTF-8.
    """
    text = raw_md_path.read_text(encoding="utf-8")

    n_h1 = count_headers(text, 1)
    n_h2 = count_headers(text, 2)
    n_h3 = count_headers(text, 3)
    n_paragrafi = count_paragraphs(text)

    # Determine structural level and primary boundary.
    if n_h3 >= _MIN_H3_FOR_H3_STRATEGY:
        livello, boundary, strategia = 3, "h3", "h3_aware"
        section_bodies = split_sections(text, 3)
    elif n_h2 >= _MIN_H2_FOR_H2_STRATEGY:
        livello, boundary, strategia = 2, "h2", "h2_paragraph_split"
        section_bodies = split_sections(text, 2)
    elif (
        n_h1 + n_h2 + n_h3 >= 1
        or n_paragrafi >= _MIN_PARAGRAPHS_FOR_PARAGRAPH_STRATEGY
    ):
        # Too few headers to chunk on: fall back to blank-line blocks.
        # NOTE(review): header blocks are kept as sections here, unlike in
        # count_paragraphs(), so they count towards the length statistics.
        livello, boundary, strategia = 1, "paragrafo", "paragraph"
        section_bodies = [b for b in re.split(r"\n{2,}", text) if b.strip()]
    else:
        livello, boundary, strategia = 0, "nessuno", "sliding_window"
        section_bodies = [text] if text.strip() else []

    lengths = [len(body) for body in section_bodies if body.strip()]
    lunghezza_media = sum(lengths) // len(lengths) if lengths else 0

    lingua = detect_language(text)

    avvertenze = []
    short = sum(1 for length in lengths if length < _SHORT_SECTION_CHARS)
    long_ = sum(1 for length in lengths if length > _LONG_SECTION_CHARS)
    if short:
        avvertenze.append(
            f"{short} sezioni sotto i {_SHORT_SECTION_CHARS} caratteri — verranno accorpate"
        )
    if long_:
        avvertenze.append(
            f"{long_} sezioni sopra i {_LONG_SECTION_CHARS} caratteri — verranno divise"
        )

    return {
        "livello_struttura": livello,
        "n_h1": n_h1,
        "n_h2": n_h2,
        "n_h3": n_h3,
        "n_paragrafi": n_paragrafi,
        "boundary_primario": boundary,
        "lingua_rilevata": lingua,
        "lunghezza_media_sezione": lunghezza_media,
        "strategia_chunking": strategia,
        "avvertenze": avvertenze,
    }


# ─── Per-document processing ─────────────────────────────────────────────────

def process_stem(stem: str, project_root: Path, force: bool) -> bool:
    """Copy one document from step-2 into step-3 and write its profile.

    Args:
        stem: name of the document sub-folder under ``step-2/``.
        project_root: folder that contains ``step-2/`` and ``step-3/``.
        force: when true, redo the work even if the profile already exists.

    Returns:
        ``True`` when the profile was written or was already present;
        ``False`` when ``raw.md`` is missing or an I/O or decoding error
        occurred (the error is printed, so a batch run can continue).
    """
    src_dir = project_root / "step-2" / stem
    out_dir = project_root / "step-3" / stem
    raw_src = src_dir / "raw.md"
    clean_src = src_dir / "clean.md"
    profile_out = out_dir / "structure_profile.json"

    print(f"\nDocumento: {stem}")

    if not raw_src.exists():
        print(f" ✗ raw.md non trovato in step-2/{stem}/ — skip")
        return False

    if profile_out.exists() and not force:
        print(" ⚠️ structure_profile.json già presente — skip")
        print(" (usa --force per rieseguire)")
        return True

    try:
        out_dir.mkdir(parents=True, exist_ok=True)

        # Copy files from step-2; clean.md is optional.
        shutil.copy2(raw_src, out_dir / "raw.md")
        has_clean = clean_src.exists()
        if has_clean:
            shutil.copy2(clean_src, out_dir / "clean.md")
            print(f" Copiati raw.md e clean.md da step-2/{stem}/")
        else:
            # Do not claim clean.md was copied when it does not exist.
            print(f" Copiato raw.md da step-2/{stem}/ (clean.md assente)")

        # Analyze the copy, so the step-2 original is only ever read by copy2.
        print(" Analisi struttura in corso...")
        profile = analyze(out_dir / "raw.md")

        profile_out.write_text(
            json.dumps(profile, ensure_ascii=False, indent=2),
            encoding="utf-8",
        )
    except (OSError, UnicodeDecodeError) as exc:
        # One unreadable document must not abort the whole batch.
        print(f" ✗ errore durante l'elaborazione: {exc}")
        return False

    # Report
    livello = profile["livello_struttura"]
    print(f" ✅ Livello {livello} — {_LIVELLO_DESC[livello]}")
    print(f" h1={profile['n_h1']} h2={profile['n_h2']} h3={profile['n_h3']} paragrafi={profile['n_paragrafi']}")
    print(f" Boundary: {profile['boundary_primario']} | Strategia: {profile['strategia_chunking']}")
    print(f" Lingua: {profile['lingua_rilevata']} | Lunghezza media sezione: {profile['lunghezza_media_sezione']} char")
    for warning in profile["avvertenze"]:
        print(f" ⚠️ {warning}")
    print(" ✅ structure_profile.json salvato")
    return True


# ─── Entry point ─────────────────────────────────────────────────────────────

def main() -> int:
    """Parse the command line, process the documents, return the exit code.

    Returns 0 when every document was processed successfully and 1
    otherwise, including when ``step-2/`` is missing or has no document
    sub-folders.
    """
    project_root = Path(__file__).parent.parent

    parser = argparse.ArgumentParser(description="Step 3 — Rilevamento struttura Markdown")
    parser.add_argument("--stem", help="Nome del documento (sottocartella di step-2/)")
    parser.add_argument("--force", action="store_true", help="Riesegui anche se già presente")
    args = parser.parse_args()

    if args.stem:
        stems = [args.stem]
    else:
        step2_dir = project_root / "step-2"
        if not step2_dir.exists():
            print(f"Errore: cartella step-2/ non trovata in {project_root}")
            return 1
        stems = sorted(p.name for p in step2_dir.iterdir() if p.is_dir())
        if not stems:
            print("Errore: nessun documento trovato in step-2/")
            return 1

    results = [process_stem(s, project_root, args.force) for s in stems]

    ok = sum(results)
    total = len(results)
    print(f"\n{'✅' if all(results) else '⚠️ '} {ok}/{total} documenti analizzati")

    return 0 if all(results) else 1


if __name__ == "__main__":
    sys.exit(main())