diff --git a/chunks/chunker.py b/chunks/chunker.py index 9fa7b84..480a5ff 100644 --- a/chunks/chunker.py +++ b/chunks/chunker.py @@ -1,17 +1,29 @@ #!/usr/bin/env python3 """ -Chunking adattivo +Pipeline di chunking unificata (Stage 1 + Stage 2) -Divide il Markdown revisionato in chunk semantici pronti per la -vettorizzazione. La strategia dipende dal profilo strutturale del documento. +Stage 1 — Ottimizzazione Markdown (md_optimizer): + Legge _content_list_v2.json + _model.json di MinerU e produce _clean.md + con gerarchia H1/H2/H3 pulita (TOC, frontespizi e sommari rimossi). -Input: conversione//clean.md + conversione//structure_profile.json -Output: chunks//chunks.json +Stage 2 — Chunking semantico: + Divide il _clean.md in chunk semantici: + - un chunk per paragrafo (mai due paragrafi nello stesso chunk) + - split a confine di frase se il paragrafo supera MAX_CHARS + - overlap di OVERLAP_SENTENCES frasi tra chunk consecutivi + - tabelle e liste sono blocchi atomici (non si spezzano) + +Input: sources//auto/_content_list_v2.json + sources//auto/_model.json (opzionale) +Output: sources//auto/_clean.md + chunks//chunks.json + chunks//meta.json Uso: - python chunks/chunker.py # tutti i documenti in conversione/ - python chunks/chunker.py --stem documento # un solo documento - python chunks/chunker.py --stem documento --force + python chunks/chunker.py --stem + python chunks/chunker.py # tutti gli stem in sources/ + python chunks/chunker.py --stem --force + python chunks/chunker.py --stem --skip-optimize # salta Stage 1 """ import argparse @@ -24,474 +36,272 @@ _HERE = Path(__file__).resolve().parent if str(_HERE) not in sys.path: sys.path.insert(0, str(_HERE)) import config as cfg +from md_optimizer import optimize as _optimize_md # ─── Utilità ────────────────────────────────────────────────────────────────── def split_sentences(text: str) -> list[str]: - parts = re.split(r'(?<=[.!?»])\s+(?=[A-ZÀÈÉÌÒÙA-Z\"])', text.strip()) - if len(parts) <= 1: - parts = re.split(r'(?<=[.!?»])\s+', text.strip()) + parts = re.split(cfg.SENTENCE_SPLIT_RE, text.strip()) return [p.strip() for p in parts if p.strip()] -def slugify(s: str, max_len: int = 60) -> str: - s = s.lower() - s = re.sub(r'[^\w\s-]', '', s) - s = re.sub(r'[\s_-]+', '_', s).strip('_') - return s[:max_len] if s else "section" - - -def _is_table_block(text: str) -> bool: - """True se il testo è prevalentemente una tabella Markdown (≥50% righe con |).""" - lines = [l for l in text.strip().splitlines() if l.strip()] - if not lines: - return False - table_lines = sum(1 for l in lines if l.strip().startswith("|")) - return table_lines / len(lines) >= 0.5 - - -def _ov(strategy: str) -> tuple[int, float, int]: - """Legge (target_chars, tolerance, overlap) dagli override di strategia.""" - ov = cfg.STRATEGY_OVERRIDES.get(strategy, {}) - target = ov.get("target_chars", cfg.TARGET_CHARS) - tolerance = ov.get("tolerance", cfg.CHUNK_TOLERANCE) - overlap = ov.get("overlap", cfg.OVERLAP_SENTENCES) - return target, tolerance, overlap - - -# ─── Core: split in sotto-chunk orientato al target ─────────────────────────── - -def make_sub_chunks( - body: str, - prefix: str, - sezione: str, - titolo: str, - target: int, - tolerance: float, - overlap_s: int, -) -> list[dict]: - """Divide body in chunk il più vicini possibile a `target` char. - - Logica: - lower = target × (1 − tolerance) → soglia minima per emettere - upper = target × (1 + tolerance) → limite massimo - - Si accumulano frasi intere finché la successiva farebbe superare `upper`. - A quel punto si emette (siamo vicini al target) e si riparte con overlap. - Ogni chunk termina sempre su un confine di frase; non attraversa mai - il boundary dell'header corrente. - """ - if cfg.PROTECT_TABLES and _is_table_block(body): - chunk_text = prefix + body - return [{ - "chunk_id": f"{slugify(sezione)}__{slugify(titolo)}__s0", - "text": chunk_text, - "sezione": sezione, - "titolo": titolo, - "sub_index": 0, - "n_chars": len(chunk_text), - }] - - # Soglia calcolata sul corpo (n_chars finale = prefix_len + body_len). - prefix_len = len(prefix) - upper_body = max(1, int(target * (1 + tolerance)) - prefix_len) - - sentences = split_sentences(body) - if not sentences: - return [] - - chunks: list[dict] = [] - current: list[str] = [] - current_len = 0 - sub_index = 0 - - def _emit() -> None: - nonlocal current, current_len, sub_index - chunk_text = prefix + " ".join(current) - chunks.append({ - "chunk_id": f"{slugify(sezione)}__{slugify(titolo)}__s{sub_index}", - "text": chunk_text, - "sezione": sezione, - "titolo": titolo, - "sub_index": sub_index, - "n_chars": len(chunk_text), - }) - overlap = current[-overlap_s:] if overlap_s and len(current) > overlap_s else [] - current = overlap[:] - # Lunghezza corretta dell'overlap (n-1 spazi tra n frasi). - current_len = sum(len(s) for s in current) + max(0, len(current) - 1) - sub_index += 1 - - for sent in sentences: - sep = 1 if current else 0 - new_len = current_len + sep + len(sent) - - if new_len <= upper_body: - # Ancora entro il limite del corpo: aggiungi e continua. - current.append(sent) - current_len = new_len - elif current: - # La frase successiva sfora il limite: emetti il chunk corrente - # (che termina su frase completa) poi inizia il nuovo con questa frase. - _emit() - current.append(sent) - current_len += (1 if current[:-1] else 0) + len(sent) - else: - # Chunk vuoto: la singola frase supera già il limite — emettiamo così com'è. - current.append(sent) - current_len = len(sent) - _emit() - - if current: - chunk_text = prefix + " ".join(current) - chunks.append({ - "chunk_id": f"{slugify(sezione)}__{slugify(titolo)}__s{sub_index}", - "text": chunk_text, - "sezione": sezione, - "titolo": titolo, - "sub_index": sub_index, - "n_chars": len(chunk_text), - }) - - return chunks +def context_to_meta(context: str) -> tuple[str, str]: + """Divide 'H1 > H2 > H3' in (sezione, titolo) per ingest/verify.""" + parts = [p.strip() for p in context.split(" > ") if p.strip()] + if len(parts) >= 2: + return " > ".join(parts[:-1]), parts[-1] + return (parts[0] if parts else ""), "" # ─── Parser Markdown ────────────────────────────────────────────────────────── -def parse_h3_sections(text: str) -> list[dict]: - sections = [] - current_h2 = "" - current_h3 = "" - current_body_lines: list[str] = [] +def parse_paragraphs(text: str) -> list[dict]: + """Estrae blocchi dal _clean.md con il loro contesto heading. - def flush(): - body = "\n".join(current_body_lines).strip() + Restituisce: [{"context": "H1 > H2 > H3", "text": "...", "kind": "text|table|list"}] + + Ogni riga vuota chiude il paragrafo corrente. Tabelle (righe con |) e + liste (righe con -) vengono accumulate come blocchi atomici. + """ + h1 = h2 = h3 = "" + result: list[dict] = [] + buf: list[str] = [] + cur_kind = "text" + + def flush() -> None: + body = "\n".join(buf).strip() if body: - sections.append({ - "sezione": current_h2, - "titolo": current_h3, - "body": body, - }) + parts = [p for p in [h1, h2, h3] if p] + context = " > ".join(parts) if parts else "documento" + result.append({"context": context, "text": body, "kind": cur_kind}) + buf.clear() for line in text.splitlines(): if re.match(r"^# ", line): flush() - current_h2 = line[2:].strip() - current_h3 = "" - current_body_lines = [] + h1, h2, h3 = line[2:].strip(), "", "" + cur_kind = "text" elif re.match(r"^## ", line): flush() - current_h2 = line[3:].strip() - current_h3 = "" - current_body_lines = [] + h2, h3 = line[3:].strip(), "" + cur_kind = "text" elif re.match(r"^### ", line): flush() - current_h3 = line[4:].strip() - current_body_lines = [] + h3 = line[4:].strip() + cur_kind = "text" + elif line.strip().startswith("|"): + if cur_kind != "table": + flush() + cur_kind = "table" + buf.append(line) + elif line.strip().startswith("- "): + if cur_kind != "list": + flush() + cur_kind = "list" + buf.append(line) + elif line.strip() == "": + flush() + cur_kind = "text" else: - current_body_lines.append(line) + if cur_kind in ("table", "list"): + flush() + cur_kind = "text" + buf.append(line) flush() - return sections + return result -def parse_h2_sections(text: str) -> list[dict]: - sections = [] - current_h2 = "" - current_body_lines: list[str] = [] +# ─── Chunking ───────────────────────────────────────────────────────────────── - def flush(): - body = "\n".join(current_body_lines).strip() - if body: - sections.append({"sezione": current_h2, "body": body}) +def make_chunks(paragraphs: list[dict]) -> list[dict]: + """Genera chunk dal risultato di parse_paragraphs. - for line in text.splitlines(): - if re.match(r"^## ", line): - flush() - current_h2 = line[3:].strip() - current_body_lines = [] - elif re.match(r"^# ", line): - flush() - current_h2 = line[2:].strip() - current_body_lines = [] - else: - current_body_lines.append(line) + Regole: + - un chunk = un paragrafo (o sotto-parte se > MAX_CHARS) + - split solo a confine di frase; una frase che supera MAX_CHARS è emessa intera + - l'ultima frase del chunk N viene preposta al chunk N+1 (overlap) + - tabelle e liste: blocco atomico (mai spezzato) + """ + chunks: list[dict] = [] + overlap_tail: list[str] = [] + idx = 0 - flush() - return sections + for para in paragraphs: + text = para["text"] + context = para["context"] + kind = para["kind"] + sezione, titolo = context_to_meta(context) - -# ─── Strategie di chunking ──────────────────────────────────────────────────── - -def chunk_h3_aware(text: str, stem: str) -> list[dict]: - target, tolerance, overlap = _ov("h3_aware") - lower = int(target * (1 - tolerance)) - - sections = parse_h3_sections(text) - - merged: list[dict] = [] - pending: dict | None = None - - for sec in sections: - if pending is None: - pending = dict(sec) + # ── Blocchi atomici (tabelle, liste) ────────────────────────────────── + if kind in ("table", "list"): + prefix = " ".join(overlap_tail) + " " if overlap_tail else "" + body = (prefix + text).strip() + chunk_text = f"[{context}]\n{body}" + chunks.append({ + "chunk_id": f"c{idx}", + "text": chunk_text, + "sezione": sezione, + "titolo": titolo, + "context": context, + "n_chars": len(chunk_text), + }) + idx += 1 + sents = split_sentences(text) + overlap_tail = sents[-cfg.OVERLAP_SENTENCES:] if cfg.OVERLAP_SENTENCES else [] continue - if (pending["sezione"] == sec["sezione"] - and len(pending["body"]) < lower): - sep_title = " / ".join(filter(None, [pending["titolo"], sec["titolo"]])) - pending = { - "sezione": pending["sezione"], - "titolo": sep_title or pending["titolo"], - "body": pending["body"] + "\n\n" + sec["body"], - } - else: - merged.append(pending) - pending = dict(sec) + # ── Paragrafo testo: split a confine di frase ───────────────────────── + sents = split_sentences(text) + if not sents: + continue - if pending: - merged.append(pending) + current: list[str] = list(overlap_tail) + has_primary: bool = False - chunks = [] - for sec in merged: - sezione = sec["sezione"] or stem - titolo = sec["titolo"] or "" - body = sec["body"] - prefix = f"[{sezione} > {titolo}]\n" if titolo else f"[{sezione}]\n" - chunks.extend(make_sub_chunks(body, prefix, sezione, titolo, target, tolerance, overlap)) + for sent in sents: + candidate_len = len(" ".join(current + [sent])) - return chunks - - -def chunk_h2_paragraph_split(text: str, stem: str) -> list[dict]: - target, tolerance, overlap = _ov("h2_paragraph_split") - lower = int(target * (1 - tolerance)) - - sections = parse_h2_sections(text) - chunks = [] - - for sec in sections: - sezione = sec["sezione"] or stem - body = sec["body"] - prefix = f"[{sezione}]\n" - - paragraphs = [ - p.strip() - for p in re.split(r"\n{2,}", body) - if p.strip() and not re.match(r"^#+\s", p.strip()) - ] - - merged_pars: list[str] = [] - pending = "" - for par in paragraphs: - if pending and len(pending) < lower: - pending = pending + "\n\n" + par + if candidate_len <= cfg.MAX_CHARS or not has_primary: + current.append(sent) + has_primary = True else: - if pending: - merged_pars.append(pending) - pending = par - if pending: - merged_pars.append(pending) + body = " ".join(current) + chunk_text = f"[{context}]\n{body}" + chunks.append({ + "chunk_id": f"c{idx}", + "text": chunk_text, + "sezione": sezione, + "titolo": titolo, + "context": context, + "n_chars": len(chunk_text), + }) + idx += 1 + overlap_tail = current[-cfg.OVERLAP_SENTENCES:] if cfg.OVERLAP_SENTENCES else [] + current = list(overlap_tail) + [sent] + has_primary = True - for idx, par in enumerate(merged_pars): - sub = make_sub_chunks(par, prefix, sezione, f"par{idx}", target, tolerance, overlap) - for c in sub: - c["chunk_id"] = f"{slugify(sezione)}__p{idx}__s{c['sub_index']}" - chunks.extend(sub) + if has_primary: + body = " ".join(current) + chunk_text = f"[{context}]\n{body}" + chunks.append({ + "chunk_id": f"c{idx}", + "text": chunk_text, + "sezione": sezione, + "titolo": titolo, + "context": context, + "n_chars": len(chunk_text), + }) + idx += 1 + overlap_tail = current[-cfg.OVERLAP_SENTENCES:] if cfg.OVERLAP_SENTENCES else [] return chunks -def chunk_paragraph(text: str, stem: str) -> list[dict]: - target, tolerance, overlap = _ov("paragraph") - lower = int(target * (1 - tolerance)) +# ─── Pipeline per documento ─────────────────────────────────────────────────── - paragraphs = [ - p.strip() - for p in re.split(r"\n{2,}", text) - if p.strip() and not re.match(r"^#+\s", p.strip()) - ] - prefix = f"[Documento: {stem}]\n" +def process_stem(stem: str, project_root: Path, + force: bool, skip_optimize: bool) -> bool: + """Esegue Stage 1 (ottimizzazione MD) + Stage 2 (chunking) per un documento.""" - merged: list[str] = [] - pending = "" - for par in paragraphs: - if pending and len(pending) < lower: - pending = pending + "\n\n" + par - else: - if pending: - merged.append(pending) - pending = par - if pending: - merged.append(pending) + # ── Stage 1: ottimizzazione Markdown ────────────────────────────────────── + if not skip_optimize: + ok = _optimize_md(stem, project_root, force=force) + if not ok: + return False + else: + print(f"\n[Stage 1] skip (--skip-optimize)") - chunks = [] - for idx, par in enumerate(merged): - sub = make_sub_chunks(par, prefix, stem, f"par{idx}", target, tolerance, overlap) - for c in sub: - c["chunk_id"] = f"para__{idx}__s{c['sub_index']}" - chunks.extend(sub) + # ── Stage 2: chunking ───────────────────────────────────────────────────── + clean_md = project_root / "sources" / stem / "auto" / f"{stem}_clean.md" + out_dir = project_root / "chunks" / stem + out_file = out_dir / "chunks.json" - return chunks - - -def chunk_sliding_window(text: str, stem: str) -> list[dict]: - target, tolerance, overlap = _ov("sliding_window") - upper = int(target * (1 + tolerance)) - - sentences = split_sentences(text) - prefix = f"[Documento: {stem}]\n" - - chunks = [] - i = 0 - win_idx = 0 - - while i < len(sentences): - window: list[str] = [] - cur_len = 0 - - j = i - while j < len(sentences): - s = sentences[j] - sep = 1 if window else 0 - if window and cur_len + sep + len(s) > upper: - break - window.append(s) - cur_len += sep + len(s) - j += 1 - - if not window: - window = [sentences[i]] - j = i + 1 - - chunk_text = prefix + " ".join(window) - chunks.append({ - "chunk_id": f"win__{win_idx}", - "text": chunk_text, - "sezione": stem, - "titolo": f"finestra {win_idx}", - "sub_index": win_idx, - "n_chars": len(chunk_text), - }) - win_idx += 1 - i += max(1, len(window) - overlap) - - return chunks - - -# ─── Dispatcher ─────────────────────────────────────────────────────────────── - -_STRATEGIES: dict[str, callable] = { - "h3_aware": chunk_h3_aware, - "h2_paragraph_split": chunk_h2_paragraph_split, - "paragraph": chunk_paragraph, - "sliding_window": chunk_sliding_window, -} - - -def chunk_document(clean_md: Path, profile: dict, stem: str) -> list[dict]: - text = clean_md.read_text(encoding="utf-8") - strategia = profile.get("strategia_chunking", "paragraph") - fn = _STRATEGIES.get(strategia, chunk_paragraph) - return fn(text, stem) - - -# ─── Per-document processing ────────────────────────────────────────────────── - -def process_stem(stem: str, project_root: Path, force: bool) -> bool: - conv_dir = project_root / "conversione" / stem - out_dir = project_root / "chunks" / stem - clean_md = conv_dir / "clean.md" - profile_path = conv_dir / "structure_profile.json" - out_file = out_dir / "chunks.json" - - print(f"\nDocumento: {stem}") + print(f"[Stage 2] Chunking: {stem}") if not clean_md.exists(): - print(f" ✗ clean.md non trovato in conversione/{stem}/ — skip") - return False - if not profile_path.exists(): - print(f" ✗ structure_profile.json non trovato in conversione/{stem}/ — skip") + print(f" ✗ {stem}_clean.md non trovato") return False if out_file.exists() and not force: - print(f" ⚠️ chunks.json già presente — skip") - print(f" (usa --force per rieseguire)") + print(f" ↩ chunks.json già presente — skip chunking") return True - profile = json.loads(profile_path.read_text(encoding="utf-8")) - strategia = profile.get("strategia_chunking", "paragraph") - print(f" Strategia: {strategia}") + text = clean_md.read_text(encoding="utf-8") + paragraphs = parse_paragraphs(text) - chunks = chunk_document(clean_md, profile, stem) + if not paragraphs: + print(f" ✗ Nessun paragrafo estratto da {clean_md.name}") + return False + + chunks = make_chunks(paragraphs) if not chunks: - print(f" ✗ Nessun chunk generato — controlla clean.md") + print(f" ✗ Nessun chunk generato") return False out_dir.mkdir(parents=True, exist_ok=True) out_file.write_text( json.dumps(chunks, ensure_ascii=False, indent=2), encoding="utf-8" ) - - target, tolerance, _ = _ov(strategia) - lower = int(target * (1 - tolerance)) - upper = int(target * (1 + tolerance)) - - meta = {"strategy": strategia, "target_chars": target, - "min_chars": lower, "max_chars": upper} (out_dir / "meta.json").write_text( - json.dumps(meta, ensure_ascii=False), encoding="utf-8" + json.dumps({ + "min_chars": cfg.MIN_CHARS, + "max_chars": cfg.MAX_CHARS, + "target_chars": cfg.MAX_CHARS, + "overlap": cfg.OVERLAP_SENTENCES, + "strategy": "paragraph_overlap", + }, ensure_ascii=False), + encoding="utf-8", ) - lengths = [c["n_chars"] for c in chunks] - min_c = min(lengths) - max_c = max(lengths) - avg_c = int(sum(lengths) / len(lengths)) - short = sum(1 for l in lengths if l < lower) - long_ = sum(1 for l in lengths if l > upper) + lengths = [c["n_chars"] for c in chunks] + over_max = sum(1 for l in lengths if l > cfg.MAX_CHARS) + under_min = sum(1 for l in lengths if l < cfg.MIN_CHARS) + avg = int(sum(lengths) / len(lengths)) - print(f" Target: {target} char ±{int(tolerance*100)}% " - f"→ range [{lower}, {upper}]") - print(f" Chunk totali: {len(chunks)}") - print(f" Min: {min_c} char Max: {max_c} char Media: {avg_c} char") - if short: - print(f" ⚠️ {short} chunk sotto lower ({lower})") - if long_: - print(f" ⚠️ {long_} chunk sopra upper ({upper})") - print(f" ✅ chunks.json salvato in chunks/{stem}/") + print(f" ✅ {len(chunks)} chunk | media {avg} char | max {max(lengths)} char") + if over_max: + print(f" ⚠️ {over_max} chunk superano MAX_CHARS={cfg.MAX_CHARS}") + if under_min: + print(f" ℹ️ {under_min} chunk sotto MIN_CHARS={cfg.MIN_CHARS}") + print(f" → chunks/{stem}/chunks.json") return True -# ─── Entry point ───────────────────────────────────────────────────────────── +# ─── Entry point ────────────────────────────────────────────────────────────── if __name__ == "__main__": project_root = Path(__file__).parent.parent - parser = argparse.ArgumentParser(description="Chunking adattivo") - parser.add_argument("--stem", help="Nome del documento (sottocartella di conversione/)") - parser.add_argument("--force", action="store_true", help="Riesegui anche se già presente") + parser = argparse.ArgumentParser( + description="Pipeline unificata MinerU → _clean.md → chunks.json" + ) + parser.add_argument("--stem", help="Nome documento (sottocartella di sources/)") + parser.add_argument("--force", action="store_true", + help="Rigenera _clean.md e chunks.json anche se esistono") + parser.add_argument("--skip-optimize", action="store_true", + help="Salta Stage 1 (usa _clean.md già presente)") args = parser.parse_args() if args.stem: stems = [args.stem] else: - conv_dir = project_root / "conversione" - if not conv_dir.exists(): - print(f"Errore: cartella conversione/ non trovata in {project_root}") - sys.exit(1) + sources_dir = project_root / "sources" stems = sorted( - p.name for p in conv_dir.iterdir() - if p.is_dir() and (p / "clean.md").exists() + p.name for p in sources_dir.iterdir() + if p.is_dir() + and (p / "auto" / f"{p.name}_content_list_v2.json").exists() ) if not stems: - print(f"Errore: nessun documento trovato in conversione/") + print("Errore: nessun documento MinerU trovato in sources/") sys.exit(1) - results = [process_stem(s, project_root, args.force) for s in stems] - - ok = sum(results) - total = len(results) - print(f"\n{'✅' if all(results) else '⚠️ '} {ok}/{total} documenti processati") + results = [ + process_stem(s, project_root, args.force, args.skip_optimize) + for s in stems + ] + ok = sum(results) + print(f"\n{'✅' if all(results) else '⚠️ '} {ok}/{len(results)} documenti processati") sys.exit(0 if all(results) else 1) diff --git a/chunks/config.py b/chunks/config.py index 1a73450..f355959 100644 --- a/chunks/config.py +++ b/chunks/config.py @@ -1,88 +1,113 @@ #!/usr/bin/env python3 """ -Parametri di configurazione della pipeline di chunking. +Parametri della pipeline chunks: chunker.py (+ md_optimizer interno) + verify/fix. -Modifica questo file per cambiare il comportamento di chunker.py, -verify_chunks.py e fix_chunks.py senza toccare il codice applicativo. +La pipeline è unificata: chunker.py esegue prima l'ottimizzazione del Markdown +(Stage 1, equivalente a md_optimizer.py) e poi il chunking (Stage 2). + +I parametri sono pensati per essere generici rispetto agli output di MinerU: +i file *_content_list_v2.json e *_model.json hanno sempre la stessa struttura, +indipendentemente dal documento sorgente. """ -# ─── Grandezza target dei chunk ─────────────────────────────────────────────── -# -# TARGET_CHARS è la dimensione ideale a cui il chunker mira. -# CHUNK_TOLERANCE è la tolleranza relativa (es. 0.25 = ±25%). -# -# range accettabile = [TARGET × (1 − TOL), TARGET × (1 + TOL)] -# -# Con TARGET=600 e TOL=0.25 → ogni chunk sarà tra 450 e 750 char, -# il più vicino possibile a 600, terminando sempre su un confine di frase. -# -TARGET_CHARS = 300 -CHUNK_TOLERANCE = 0.25 +# ─── Stage 1 — md_optimizer (pulizia Markdown) ─────────────────────────────── -# ─── Overlap ────────────────────────────────────────────────────────────────── - -# Numero di frasi ripetute all'inizio del chunk successivo per preservare -# il contesto tra chunk adiacenti della stessa sezione. -OVERLAP_SENTENCES = 1 - -# ─── Soglie di validazione ──────────────────────────────────────────────────── - -# fix_chunks.py spezza un chunk "too_long" solo se supera upper × questo fattore. -# Es. upper=750, fattore=1.5 → split solo per chunk > 1125 char. -# Chunk in [upper, upper×fattore] restano come warning non bloccanti. -SPLIT_THRESHOLD_FACTOR = 1.5 - -MATH_SYMS_MIN = 3 # min. simboli math per declassare incomplete → incomplete_math - -# ─── Pattern e formato ──────────────────────────────────────────────────────── - -SENTENCE_SPLIT_PATTERN = r"(?<=[.!?»])\s+" -PREFIX_TEMPLATE = "[{sezione} > {titolo}]" - -# ─── Protezione contenuti speciali ──────────────────────────────────────────── - -# Se True, un blocco prevalentemente tabella Markdown (≥50% righe |…|) -# viene emesso come chunk atomico senza sentence-splitting. -PROTECT_TABLES = True - -# Riservato — blocchi LaTeX non spezzabili (implementazione futura). -PROTECT_MATH = True - -# ─── Fix behavior ───────────────────────────────────────────────────────────── - -# Numero massimo di iterazioni del loop fix → verify → fix. -# Con 1 si ottiene il comportamento originale (fix singolo senza re-verifica). -FIX_MAX_ITERATIONS = 3 - -# ─── Override per strategia ─────────────────────────────────────────────────── -# -# Sovrascrivono TARGET_CHARS / CHUNK_TOLERANCE / OVERLAP_SENTENCES -# per la specifica strategia indicata in structure_profile.json. -# Chiavi riconosciute: "target_chars", "tolerance", "overlap". -# -STRATEGY_OVERRIDES: dict[str, dict] = { - "h3_aware": { - # Documenti strutturati H2→H3: chunk medi, overlap moderato. - "target_chars": 600, - "tolerance": 0.25, - "overlap": 2, - }, - "h2_paragraph_split": { - # Documenti piatti (solo H2): chunk più ampi, overlap ridotto. - "target_chars": 800, - "tolerance": 0.25, - "overlap": 1, - }, - "paragraph": { - # Documenti senza header significativi: chunk più corti. - "target_chars": 500, - "tolerance": 0.30, - "overlap": 1, - }, - "sliding_window": { - # Testo lineare/narrativo: finestre ampie, overlap generoso. - "target_chars": 800, - "tolerance": 0.25, - "overlap": 3, - }, +# Tipi MinerU da ignorare completamente. +NOISE_TYPES: set[str] = { + "page_header", "page_number", "page_footer", "index", "page_aside_text", } + +# Paragrafi promossi a H3 se testo ≤ H3_MAX_CHARS e matcha H3_DETECTION_RE. +# Regex generica: riga che inizia con numero seguito da punto e spazio. +# Per disabilitare la promozione a H3: imposta H3_DETECTION_RE = r"(?!)" +H3_DETECTION_RE: str = r"^\d+\.\s+\S" +H3_MAX_CHARS: int = 120 + +# Se True, i blocchi immagine non vengono inclusi nel Markdown. +SKIP_IMAGES: bool = True + +# Heading le cui sezioni vengono rimosse completamente (titolo + tutto il contenuto). +# Match case-insensitive: il testo dell'heading deve essere uguale o iniziare +# con uno dei valori seguenti. +# Nota: specifici per documento — impostare set vuoto per documenti non italiani +# o senza sezioni di frontmatter note. +FRONTMATTER_HEADINGS: set[str] = { + "sommario", + "indice", + "autori", + "abbreviazioni", + "atti normativi", + "specifici provvedimenti normativi", + "abbreviazioni generiche", +} + +# Numero minimo di heading consecutivi senza testo per riconoscere un TOC. +# Abbassare se il documento ha molti capitoli corti senza sottosezioni. +MIN_TOC_HEADINGS: int = 5 + +# Caratteri minimi di testo reale sotto un heading perché sia considerato +# "contenuto vero" (non frontespizio/copyright). +# Impostare >= lunghezza massima del testo di copyright/copertina nel documento. +MIN_CONTENT_CHARS: int = 2500 + +# Pattern per riconoscere prefissi di capitolo in blocchi paragraph. +# MinerU talvolta produce il numero/identificatore di capitolo come paragraph +# anziché come title L1 (comportamento non uniforme). Questi pattern permettono +# di bufferizzare tali paragrafi e fonderli col titolo L1 successivo. +# Impostare lista vuota [] per disabilitare. +CHAPTER_PREFIX_PATTERNS: list[str] = [ + r"^(CAPITOLO|PARTE)\s+(\d+|[IVXLCDM]+)\b", # italiano + r"^(CHAPTER|PART|SECTION)\s+(\d+|[IVXLCDM]+)\b", # inglese + r"^(CHAPITRE|PARTIE)\s+(\d+|[IVXLCDM]+)\b", # francese + r"^(KAPITEL|TEIL)\s+(\d+|[IVXLCDM]+)\b", # tedesco +] + +# Pattern testuali (regex) per riconoscere paragrafi "sommario interno" da saltare. +# Usati come fallback quando _model.json non assegna label "abstract". +# Generici: un pattern per paragrafo che inizia con indice/sommario di sezione. +# Per disabilitare: impostare lista vuota []. +SOMMARIO_PATTERNS: list[str] = [ + r"^SOMMARIO\s*:", # italiano + r"^SUMMARY\s*:", # inglese + r"^RÉSUMÉ\s*:", # francese + r"^ÍNDICE\s*:", # spagnolo/portoghese + r"^INHALT\s*:", # tedesco +] + +# ─── _model.json label sets ─────────────────────────────────────────────────── + +# Label di layout da saltare completamente. +MODEL_SKIP_LABELS: set[str] = { + "header", "number", "footer_image", "ocr_text", "aside_text", +} + +# Label che identifica indici/sommari interni (da saltare). +MODEL_ABSTRACT_LABELS: set[str] = {"abstract"} + +# ─── Stage 2 — chunker ──────────────────────────────────────────────────────── + +# Lunghezza massima di un chunk (caratteri, prefisso incluso). +# Paragrafi che superano questo limite vengono spezzati a confine di frase. +# Una singola frase che supera MAX_CHARS viene emessa intera (non si spezza mai). +MAX_CHARS: int = 1200 + +# Lunghezza minima attesa (warning in verify_chunks, non blocker). +MIN_CHARS: int = 80 + +# Frasi di overlap: l'ultima frase del chunk N viene preposta al chunk N+1. +OVERLAP_SENTENCES: int = 1 + +# Regex per rilevare il confine di fine frase per lo split. +# Split solo prima di lettera maiuscola o virgolette — evita split su abbreviazioni. +SENTENCE_SPLIT_RE: str = r"(?<=[.!?»])\s+(?=[A-ZÀÈÉÌÒÙ\"])" + + +# ─── verify_chunks.py / fix_chunks.py ───────────────────────────────────────── + +# fix_chunks spezza un chunk too_long solo se supera MAX_CHARS × questo fattore. +SPLIT_THRESHOLD_FACTOR: float = 1.5 + +MATH_SYMS_MIN: int = 3 +PROTECT_TABLES: bool = True +PROTECT_MATH: bool = True +FIX_MAX_ITERATIONS: int = 3 diff --git a/chunks/fix_chunks.py b/chunks/fix_chunks.py index 45b6862..9f94fca 100644 --- a/chunks/fix_chunks.py +++ b/chunks/fix_chunks.py @@ -34,7 +34,7 @@ if str(_HERE) not in sys.path: import config as cfg from verify_chunks import verify_stem as _verify_stem -MAX_CHARS = int(cfg.TARGET_CHARS * (1 + cfg.CHUNK_TOLERANCE)) +MAX_CHARS = cfg.MAX_CHARS def _load_thresholds(stem_dir: Path) -> int: @@ -314,8 +314,8 @@ def fix_stem(stem: str, project_root: Path, max_chars: int, dry_run: bool, # Fase 1: risolvi blockers a convergenza (solo merge incomplete) chunks = _fix_blockers(chunks, report) - _min = int(cfg.TARGET_CHARS * (1 - cfg.CHUNK_TOLERANCE)) - _max = int(cfg.TARGET_CHARS * (1 + cfg.CHUNK_TOLERANCE)) + _min = cfg.MIN_CHARS + _max = cfg.MAX_CHARS prev_blockers = sum(len(v) for v in report.get("blockers", {}).values()) for iteration in range(1, max_iter + 1): @@ -378,7 +378,7 @@ if __name__ == "__main__": parser = argparse.ArgumentParser(description="Fix chunk") parser.add_argument("--stem", required=True, help="Nome del documento (sottocartella di chunks/)") - _max_def = int(cfg.TARGET_CHARS * (1 + cfg.CHUNK_TOLERANCE)) + _max_def = cfg.MAX_CHARS parser.add_argument( "--max", type=int, default=_max_def, help=f"Soglia massima caratteri per lo split (default: TARGET×(1+TOL) = {_max_def})" diff --git a/chunks/md_optimizer.py b/chunks/md_optimizer.py new file mode 100644 index 0000000..e1db5e1 --- /dev/null +++ b/chunks/md_optimizer.py @@ -0,0 +1,540 @@ +#!/usr/bin/env python3 +""" +Stage 1 — Ottimizzatore Markdown (modulo interno, chiamato da chunker.py) + +Legge _content_list_v2.json (struttura primaria) e _model.json (label di +layout) di MinerU e produce un Markdown pulito con gerarchia H1/H2/H3. + +Progettato per essere generico rispetto al documento: sfrutta la struttura +comune di tutti gli output MinerU senza dipendere da pattern testuali +specifici del documento sorgente. + +Logica di costruzione blocchi: + - title L1 consecutivi senza contenuto tra loro → fusi in un H1 unico + (il primo frammento è sempre il numero/identificatore del capitolo) + - title L1 singolo → H1 + - title L2 → H2 + - paragraph con label "abstract" o che matcha SOMMARIO_PATTERNS → skip + - paragraph breve che matcha H3_DETECTION_RE → H3 + - paragraph normale → testo + - label MODEL_SKIP_LABELS → skip + +Filtri di pulizia: + - _remove_frontmatter : rimuove sezioni per nome (FRONTMATTER_HEADINGS) + - _remove_toc_runs : rimuove sequenze di heading senza contenuto (TOC) + - _remove_frontespizio : rimuove contenuto prima del primo heading "vero" + (>= MIN_CONTENT_CHARS di testo reale) + +Input: sources//auto/_content_list_v2.json + sources//auto/_model.json (opzionale) +Output: sources//auto/_clean.md + +Uso standalone: + python chunks/md_optimizer.py --stem [--force] + python chunks/md_optimizer.py # tutti gli stem in sources/ +""" + +import argparse +import json +import re +import sys +from dataclasses import dataclass +from pathlib import Path + +_HERE = Path(__file__).resolve().parent +if str(_HERE) not in sys.path: + sys.path.insert(0, str(_HERE)) +import config as cfg + + +# ─── Struttura dati interna ─────────────────────────────────────────────────── + +@dataclass +class Block: + kind: str # "h1" | "h2" | "h3" | "text" | "list" | "table" + text: str + + +_HEADING_LEVEL = {"h1": 1, "h2": 2, "h3": 3} + +# Pattern compilati da config (inizializzati lazy per permettere hot-reload in test) +_SOMMARIO_RES: list[re.Pattern] = [] +_CHAPTER_PREFIX_RES: list[re.Pattern] = [] + +def _init_patterns() -> None: + global _SOMMARIO_RES, _CHAPTER_PREFIX_RES + _SOMMARIO_RES = [re.compile(p, re.IGNORECASE) for p in cfg.SOMMARIO_PATTERNS] + _CHAPTER_PREFIX_RES = [re.compile(p, re.IGNORECASE) for p in cfg.CHAPTER_PREFIX_PATTERNS] + +_init_patterns() + + +def _is_sommario(text: str) -> bool: + return any(r.match(text) for r in _SOMMARIO_RES) + + +def _is_chapter_prefix(text: str) -> bool: + """True se il testo è un identificatore di capitolo (es. "CAPITOLO 1"). + + Usato come fallback quando MinerU produce il numero del capitolo come + paragraph anziché come title L1. + """ + return any(r.match(text) for r in _CHAPTER_PREFIX_RES) + + +# ─── Caricamento e indicizzazione _model.json ───────────────────────────────── + +def _load_label_map(model_path: Path) -> dict[int, list[tuple[float, float, str]]]: + """Restituisce {page_idx: [(cx_v2, cy_v2, label), ...]} + + Le coordinate cx/cy sono nel sistema di riferimento v2: + v2_coord = model_coord * 1000 / model_page_dim + """ + if not model_path.exists(): + return {} + + pages = json.loads(model_path.read_text(encoding="utf-8")) + label_map: dict[int, list[tuple[float, float, str]]] = {} + + for page in pages: + info = page.get("page_info", {}) + page_no = info.get("page_no", 0) + pw = info.get("width", 1350) + ph = info.get("height", 1891) + + entries: list[tuple[float, float, str]] = [] + for det in page.get("layout_dets", []): + label = det.get("label", "") + if label in cfg.MODEL_SKIP_LABELS: + continue + x0, y0, x1, y1 = det["bbox"] + cx = (x0 + x1) * 0.5 * 1000.0 / pw + cy = (y0 + y1) * 0.5 * 1000.0 / ph + entries.append((cx, cy, label)) + + label_map[page_no] = entries + + return label_map + + +def _get_label(page_idx: int, bbox: list[int], + label_map: dict[int, list]) -> str: + """Restituisce il label model.json il cui centro è più vicino al centro + del bbox v2 (tolleranza 80 unità v2 ≈ 8% della larghezza pagina).""" + entries = label_map.get(page_idx) + if not entries: + return "" + x0, y0, x1, y1 = bbox + cx = (x0 + x1) * 0.5 + cy = (y0 + y1) * 0.5 + + best_label = "" + best_dist = 80.0 + + for ex, ey, label in entries: + dist = ((cx - ex) ** 2 + (cy - ey) ** 2) ** 0.5 + if dist < best_dist: + best_dist = dist + best_label = label + + return best_label + + +# ─── Estrazione testo dai blocchi MinerU ────────────────────────────────────── + +def _text_para(content: dict) -> str: + return " ".join( + p["content"] for p in content.get("paragraph_content", []) + if p.get("type") == "text" + ).strip() + + +def _text_title(content: dict) -> str: + return " ".join( + p["content"] for p in content.get("title_content", []) + if p.get("type") == "text" + ).strip() + + +def _text_list(content: dict) -> str: + lines = [] + for item in content.get("list_content", []): + for block in item.get("blocks", []): + t = block.get("content", "").strip() + if t: + lines.append(f"- {t}") + return "\n".join(lines) + + +def _is_h3_candidate(text: str) -> bool: + return ( + len(text) <= cfg.H3_MAX_CHARS + and bool(re.match(cfg.H3_DETECTION_RE, text)) + ) + + +# ─── Build blocchi da JSON MinerU ───────────────────────────────────────────── + +def _build_blocks(pages: list, label_map: dict) -> list[Block]: + """Costruisce la lista di Block dalla struttura MinerU. + + Logica per i titoli H1 consecutivi (generica, senza pattern lingua-specifica): + - Ogni title L1 viene bufferizzato come "pending_h1". + - Se arriva un altro title L1 subito dopo (senza contenuto tra loro), + i due frammenti vengono fusi in un unico H1 con " — " come separatore. + Questo gestisce il pattern comune di MinerU dove il numero/identificatore + del capitolo e il suo titolo sono due blocchi separati. + - Quando arriva contenuto non-titolo (paragrafo, lista, H2), il pending_h1 + viene emesso così com'è. + """ + blocks: list[Block] = [] + pending_h1: str = "" # titolo L1 in attesa di conferma/merge + + def _flush_h1() -> None: + nonlocal pending_h1 + if pending_h1: + blocks.append(Block(kind="h1", text=pending_h1)) + pending_h1 = "" + + for page_idx, page in enumerate(pages): + for item in page: + kind = item.get("type", "") + content = item.get("content", {}) + bbox = item.get("bbox", [0, 0, 0, 0]) + + # ── Tipi MinerU rumorosi ───────────────────────────────────────── + if kind in cfg.NOISE_TYPES: + _flush_h1() + continue + + model_label = _get_label(page_idx, bbox, label_map) + + # ── Label model rumorosi ───────────────────────────────────────── + if model_label in cfg.MODEL_SKIP_LABELS: + continue + + # ── Sommari interni (abstract label o pattern testuale) ────────── + if model_label in cfg.MODEL_ABSTRACT_LABELS: + continue + + # ── Titoli ─────────────────────────────────────────────────────── + if kind == "title": + text = _text_title(content) + if not text: + continue + level = min(content.get("level", 2), 3) + + if level == 1: + if pending_h1: + # Due title L1 consecutivi: fondi il precedente col corrente + merged = f"{pending_h1} — {text}" + pending_h1 = merged + else: + pending_h1 = text + else: + # H2: emetti prima il pending H1 se esiste + _flush_h1() + blocks.append(Block(kind="h2", text=text)) + + # ── Paragrafi ──────────────────────────────────────────────────── + elif kind == "paragraph": + text = _text_para(content) + if not text: + continue + + # Sommario interno: salta (fallback testuale se label non copre) + if _is_sommario(text): + continue + + # Prefisso di capitolo come paragraph (es. "CAPITOLO 1"): + # bufferizza come pending H1, verrà fuso col titolo L1 successivo + if _is_chapter_prefix(text): + if pending_h1: + pending_h1 = f"{pending_h1} — {text}" + else: + pending_h1 = text + continue + + _flush_h1() + + if _is_h3_candidate(text): + blocks.append(Block(kind="h3", text=text)) + else: + blocks.append(Block(kind="text", text=text)) + + # ── Liste ──────────────────────────────────────────────────────── + elif kind == "list": + _flush_h1() + text = _text_list(content) + if text: + blocks.append(Block(kind="list", text=text)) + + # ── Tabelle ────────────────────────────────────────────────────── + elif kind == "table": + _flush_h1() + body = content.get("table_body", "") + if body: + blocks.append(Block(kind="table", text=body)) + + # ── Immagini (opzionale) ───────────────────────────────────────── + elif kind == "image" and not cfg.SKIP_IMAGES: + _flush_h1() + src = content.get("image_source", {}).get("path", "") + caption = " ".join( + c.get("content", "") for c in content.get("image_caption", []) + ).strip() + if src: + blocks.append(Block(kind="text", text=f"![{caption}]({src})")) + + else: + _flush_h1() + + _flush_h1() # flush finale + return blocks + + +# ─── Helpers content check ──────────────────────────────────────────────────── + +def _has_content(blocks: list[Block], idx: int) -> bool: + """True se esiste almeno un blocco testo/lista/tabella prima del prossimo + heading di livello uguale o superiore.""" + level = _HEADING_LEVEL.get(blocks[idx].kind) + if level is None: + return False + for b in blocks[idx + 1:]: + blevel = _HEADING_LEVEL.get(b.kind) + if blevel is not None and blevel <= level: + return False + if b.kind in ("text", "list", "table"): + return True + return False + + +def _has_real_content(blocks: list[Block], idx: int) -> bool: + """True se il totale caratteri di testo sotto questo heading >= + MIN_CONTENT_CHARS. Permette di distinguere frontespizi (copyright breve) + da sezioni con contenuto vero.""" + level = _HEADING_LEVEL.get(blocks[idx].kind) + if level is None: + return False + total = 0 + for b in blocks[idx + 1:]: + blevel = _HEADING_LEVEL.get(b.kind) + if blevel is not None and blevel <= level: + break + if b.kind in ("text", "list", "table"): + total += len(b.text) + if total >= cfg.MIN_CONTENT_CHARS: + return True + return False + + +# ─── Filtri di pulizia ──────────────────────────────────────────────────────── + +def _remove_frontmatter(blocks: list[Block]) -> list[Block]: + """Rimuove le sezioni il cui heading è in FRONTMATTER_HEADINGS, insieme + a tutto il loro contenuto. + + Il salto continua finché non si trova un heading non-frontmatter — + questo elimina anche sezioni TOC consecutive in un colpo solo. + """ + def _norm(text: str) -> str: + t = text.strip().lower() + # Rimuovi eventuale prefisso "Xxx N — " (identificatore capitolo) + return re.sub(r"^\S+\s+\S+\s+[—\-]\s*", "", t) + + def _is_fm(text: str) -> bool: + core = _norm(text) + return any( + core == fm or core.startswith(fm + " ") + for fm in cfg.FRONTMATTER_HEADINGS + ) + + if not cfg.FRONTMATTER_HEADINGS: + return blocks + + result: list[Block] = [] + i = 0 + while i < len(blocks): + b = blocks[i] + if b.kind in _HEADING_LEVEL and _is_fm(b.text): + level = _HEADING_LEVEL[b.kind] + i += 1 + while i < len(blocks): + nxt = blocks[i] + nxt_level = _HEADING_LEVEL.get(nxt.kind) + if nxt_level is not None and nxt_level <= level and not _is_fm(nxt.text): + break + i += 1 + continue + result.append(b) + i += 1 + return result + + +def _remove_toc_runs(blocks: list[Block]) -> list[Block]: + """Rimuove sequenze di MIN_TOC_HEADINGS o più heading consecutivi senza + testo reale tra loro (TOC residuo). + + "Consecutivi" tolera micro-testi brevi (≤ 120 chars) intercalati tra + i heading (es. attribuzioni autori nel TOC). + """ + def _is_toc_entry(idx: int) -> bool: + b = blocks[idx] + if b.kind not in _HEADING_LEVEL: + return False + level = _HEADING_LEVEL[b.kind] + for b2 in blocks[idx + 1:]: + blevel = _HEADING_LEVEL.get(b2.kind) + if blevel is not None and blevel <= level: + return True + if b2.kind in ("text", "list", "table") and len(b2.text) > 20: + return False + return True + + result: list[Block] = [] + i = 0 + while i < len(blocks): + b = blocks[i] + if b.kind in _HEADING_LEVEL and _is_toc_entry(i): + j = i + 1 + toc_count = 1 + while j < len(blocks): + bj = blocks[j] + if bj.kind in _HEADING_LEVEL: + if _is_toc_entry(j): + toc_count += 1 + j += 1 + continue + else: + break + if bj.kind in ("text", "list") and len(bj.text) <= 120: + j += 1 + continue + break + if toc_count >= cfg.MIN_TOC_HEADINGS: + i = j + continue + result.append(b) + i += 1 + return result + + +def _remove_frontespizio(blocks: list[Block]) -> list[Block]: + """Rimuove tutto il contenuto prima del primo heading con contenuto reale + (>= MIN_CONTENT_CHARS): copertine, copyright, pagine iniziali.""" + for i, b in enumerate(blocks): + if b.kind in _HEADING_LEVEL and _has_real_content(blocks, i): + return blocks[i:] + return blocks + + +def filter_blocks(blocks: list[Block]) -> list[Block]: + blocks = _remove_frontmatter(blocks) + blocks = _remove_toc_runs(blocks) + blocks = _remove_frontespizio(blocks) + return blocks + + +# ─── Rendering ──────────────────────────────────────────────────────────────── + +def _render(blocks: list[Block]) -> str: + lines: list[str] = [] + prev_was_heading = False + + for b in blocks: + if b.kind in ("h1", "h2", "h3"): + prefix = "#" * _HEADING_LEVEL[b.kind] + if lines and not prev_was_heading: + lines.append("") + lines.append(f"{prefix} {b.text}") + prev_was_heading = True + else: + lines.append("") + lines.append(b.text) + prev_was_heading = False + + md = "\n".join(lines).strip() + "\n" + return re.sub(r"\n{3,}", "\n\n", md) + + +# ─── Core ───────────────────────────────────────────────────────────────────── + +def optimize(stem: str, project_root: Path, force: bool = False) -> bool: + """Esegue Stage 1: _content_list_v2.json + _model.json → _clean.md. + + Restituisce True se il file è stato prodotto (o era già presente e + force=False), False in caso di errore. + """ + auto_dir = project_root / "sources" / stem / "auto" + json_path = auto_dir / f"{stem}_content_list_v2.json" + model_path = auto_dir / f"{stem}_model.json" + out_path = auto_dir / f"{stem}_clean.md" + + print(f"\n[Stage 1] Documento: {stem}") + + if not json_path.exists(): + print(f" ✗ {json_path.name} non trovato") + return False + + if out_path.exists() and not force: + print(f" ↩ {out_path.name} già presente — skip ottimizzazione") + return True + + pages = json.loads(json_path.read_text(encoding="utf-8")) + + if model_path.exists(): + label_map = _load_label_map(model_path) + n_labels = sum(len(v) for v in label_map.values()) + print(f" 📐 {model_path.name} ({n_labels} label)") + else: + label_map = {} + print(f" ℹ️ {model_path.name} non trovato — nessun enrichment layout") + + blocks = _build_blocks(pages, label_map) + n_raw = len(blocks) + blocks = filter_blocks(blocks) + n_filtered = n_raw - len(blocks) + + md = _render(blocks) + out_path.write_text(md, encoding="utf-8") + + n_h1 = len(re.findall(r"^# ", md, re.MULTILINE)) + n_h2 = len(re.findall(r"^## ", md, re.MULTILINE)) + n_h3 = len(re.findall(r"^### ", md, re.MULTILINE)) + print(f" ✅ {out_path.name} " + f"({md.count(chr(10))} righe — H1={n_h1} H2={n_h2} H3={n_h3} " + f"rimossi={n_filtered}/{n_raw})") + return True + + +# ─── Entry point standalone ─────────────────────────────────────────────────── + +if __name__ == "__main__": + project_root = Path(__file__).parent.parent + + parser = argparse.ArgumentParser( + description="Stage 1: _content_list_v2.json + _model.json → _clean.md" + ) + parser.add_argument("--stem", help="Nome documento (sottocartella di sources/)") + parser.add_argument("--force", action="store_true", + help="Rigenera anche se _clean.md esiste già") + args = parser.parse_args() + + if args.stem: + stems = [args.stem] + else: + sources_dir = project_root / "sources" + stems = sorted( + p.name for p in sources_dir.iterdir() + if p.is_dir() + and (p / "auto" / f"{p.name}_content_list_v2.json").exists() + ) + if not stems: + print("Errore: nessun documento MinerU trovato in sources/") + sys.exit(1) + + results = [optimize(s, project_root, args.force) for s in stems] + ok = sum(results) + print(f"\n{'✅' if all(results) else '⚠️ '} {ok}/{len(results)} documenti processati") + sys.exit(0 if all(results) else 1)