#!/usr/bin/env python3
"""
Stage 1 — Markdown optimizer (internal module, called by chunker.py).

Reads MinerU's _content_list_v2.json (primary structure) and _model.json
(layout labels) and produces a clean Markdown file with an H1/H2/H3 hierarchy.

Designed to be document-agnostic: it relies on the structure shared by all
MinerU outputs rather than on textual patterns of one specific source document.

Block construction logic:
  - consecutive L1 titles with no content between them → merged into one H1
    (the first fragment is expected to be the chapter number/identifier)
  - single L1 title → H1
  - any other title level → H2
  - paragraph whose layout label is in MODEL_ABSTRACT_LABELS, or whose text
    matches SOMMARIO_PATTERNS → skipped
  - short paragraph matching H3_DETECTION_RE → H3
  - ordinary paragraph → text
  - layout label in MODEL_SKIP_LABELS → skipped

Cleaning filters:
  - _remove_frontmatter  : drops sections by heading name (FRONTMATTER_HEADINGS)
  - _remove_toc_runs     : drops runs of headings without content (TOC)
  - _remove_frontespizio : drops everything before the first "real" heading
                           (>= MIN_CONTENT_CHARS of actual text)

Input:
    sources/STEM/auto/STEM_content_list_v2.json
    sources/STEM/auto/STEM_model.json    (optional)
Output:
    sources/STEM/auto/STEM_clean.md

Standalone usage:
    python chunks/md_optimizer.py --stem STEM [--force]
    python chunks/md_optimizer.py            # every stem under sources/
"""

import argparse
import json
import math
import re
import sys
from dataclasses import dataclass
from pathlib import Path

# Make the sibling `config` module importable when this file is run directly.
_HERE = Path(__file__).resolve().parent
if str(_HERE) not in sys.path:
    sys.path.insert(0, str(_HERE))

import config as cfg


# ─── Internal data structure ──────────────────────────────────────────────────

@dataclass
class Block:
    kind: str  # "h1" | "h2" | "h3" | "text" | "list" | "table"
    text: str


_HEADING_LEVEL = {"h1": 1, "h2": 2, "h3": 3}
_CONTENT_KINDS = ("text", "list", "table")

# Maximum distance (in v2 units, i.e. 0..1000 per page side) between a block
# centre and a layout-detection centre for the label to be considered a match.
_LABEL_MATCH_TOLERANCE = 80.0

# Patterns compiled from config. They are (re)built by _init_patterns() so that
# tests can change the config and reload them.
_SOMMARIO_RES: list[re.Pattern] = []
_CHAPTER_PREFIX_RES: list[re.Pattern] = []


def _init_patterns() -> None:
    """(Re)compile the case-insensitive pattern lists from the config module."""
    global _SOMMARIO_RES, _CHAPTER_PREFIX_RES
    _SOMMARIO_RES = [re.compile(p, re.IGNORECASE) for p in cfg.SOMMARIO_PATTERNS]
    _CHAPTER_PREFIX_RES = [
        re.compile(p, re.IGNORECASE) for p in cfg.CHAPTER_PREFIX_PATTERNS
    ]


_init_patterns()


def _is_sommario(text: str) -> bool:
    """Return True if *text* matches (from its start) any SOMMARIO pattern."""
    return any(r.match(text) for r in _SOMMARIO_RES)


def _is_chapter_prefix(text: str) -> bool:
    """Return True if *text* is a chapter identifier (e.g. "CAPITOLO 1").

    Used as a fallback when MinerU emits the chapter number as a paragraph
    instead of an L1 title.
    """
    return any(r.match(text) for r in _CHAPTER_PREFIX_RES)


# ─── Loading and indexing _model.json ─────────────────────────────────────────

def _load_label_map(model_path: Path) -> dict[int, list[tuple[float, float, str]]]:
    """Return ``{page_idx: [(cx_v2, cy_v2, label), ...]}`` for *model_path*.

    Centre coordinates are converted to the v2 reference system:
    ``v2_coord = model_coord * 1000 / model_page_dim``.

    Detections labelled with one of ``cfg.MODEL_SKIP_LABELS`` are kept in the
    map on purpose: ``_build_blocks`` needs to see them in order to drop the
    matching content blocks. Filtering them here would make that check
    unreachable.

    Returns an empty dict when the file does not exist. Raises ``OSError`` or
    ``ValueError`` (incl. ``json.JSONDecodeError``) when it cannot be read or
    parsed.
    """
    if not model_path.exists():
        return {}

    pages = json.loads(model_path.read_text(encoding="utf-8"))
    label_map: dict[int, list[tuple[float, float, str]]] = {}

    for position, page in enumerate(pages):
        info = page.get("page_info", {})
        # Fall back to the page's position so pages without an explicit number
        # do not all collapse onto key 0.
        page_no = info.get("page_no", position)
        # Fallback page size when the dimensions are missing or zero.
        page_w = info.get("width") or 1350
        page_h = info.get("height") or 1891

        entries: list[tuple[float, float, str]] = []
        for det in page.get("layout_dets", []):
            box = det.get("bbox")
            if not box or len(box) != 4:
                continue  # no usable rectangle: cannot be matched by position
            x0, y0, x1, y1 = box
            cx = (x0 + x1) * 0.5 * 1000.0 / page_w
            cy = (y0 + y1) * 0.5 * 1000.0 / page_h
            entries.append((cx, cy, det.get("label", "")))
        label_map[page_no] = entries

    return label_map


def _get_label(page_idx: int, bbox: list[int], label_map: dict[int, list]) -> str:
    """Return the layout label whose centre is closest to the centre of *bbox*.

    Only detections strictly closer than ``_LABEL_MATCH_TOLERANCE`` v2 units
    are considered. Returns "" when the page has no detections, *bbox* is not
    a 4-element box, or nothing is close enough.
    """
    entries = label_map.get(page_idx)
    if not entries or not bbox or len(bbox) != 4:
        return ""

    x0, y0, x1, y1 = bbox
    cx = (x0 + x1) * 0.5
    cy = (y0 + y1) * 0.5

    best_label = ""
    best_dist = _LABEL_MATCH_TOLERANCE
    for ex, ey, label in entries:
        dist = math.hypot(cx - ex, cy - ey)
        if dist < best_dist:
            best_dist = dist
            best_label = label
    return best_label


# ─── Text extraction from MinerU blocks ───────────────────────────────────────

def _join_text_spans(spans: list) -> str:
    """Join the ``content`` of every span of type "text" with single spaces."""
    return " ".join(
        span.get("content", "") for span in spans if span.get("type") == "text"
    ).strip()


def _text_para(content: dict) -> str:
    """Return the plain text of a paragraph block."""
    return _join_text_spans(content.get("paragraph_content", []))


def _text_title(content: dict) -> str:
    """Return the plain text of a title block."""
    return _join_text_spans(content.get("title_content", []))


def _text_list(content: dict) -> str:
    """Render a list block as Markdown bullet lines (one per non-empty item)."""
    lines = []
    for item in content.get("list_content", []):
        for block in item.get("blocks", []):
            entry = block.get("content", "").strip()
            if entry:
                lines.append(f"- {entry}")
    return "\n".join(lines)


def _is_h3_candidate(text: str) -> bool:
    """Return True if *text* is short enough and matches H3_DETECTION_RE."""
    return (
        len(text) <= cfg.H3_MAX_CHARS
        and bool(re.match(cfg.H3_DETECTION_RE, text))
    )


# ─── Building blocks from the MinerU JSON ─────────────────────────────────────

def _build_blocks(pages: list, label_map: dict) -> list[Block]:
    """Build the list of Block objects from the MinerU page structure.

    Handling of consecutive H1 titles (generic, no language-specific pattern):
      - Every L1 title is buffered as the "pending H1".
      - If another L1 title (or a chapter-prefix paragraph) follows with no
        content in between, the fragments are merged into a single H1 joined
        by " — ". This covers the common MinerU output where the chapter
        number/identifier and its title are two separate blocks.
      - When other content arrives (paragraph, list, table, H2, noise, unknown
        type), the pending H1 is emitted as it is.

    Blocks whose matched layout label is in MODEL_SKIP_LABELS or
    MODEL_ABSTRACT_LABELS are dropped without flushing the pending H1.
    """
    blocks: list[Block] = []
    pending_h1 = ""  # L1 title waiting for a possible merge

    def _flush_h1() -> None:
        nonlocal pending_h1
        if pending_h1:
            blocks.append(Block(kind="h1", text=pending_h1))
            pending_h1 = ""

    def _queue_h1(fragment: str) -> None:
        nonlocal pending_h1
        pending_h1 = f"{pending_h1} — {fragment}" if pending_h1 else fragment

    for page_idx, page in enumerate(pages):
        for item in page:
            kind = item.get("type", "")
            content = item.get("content", {})
            bbox = item.get("bbox", [0, 0, 0, 0])

            # ── Noisy MinerU types ───────────────────────────────────────────
            if kind in cfg.NOISE_TYPES:
                _flush_h1()
                continue

            model_label = _get_label(page_idx, bbox, label_map)

            # ── Noisy layout labels ──────────────────────────────────────────
            if model_label in cfg.MODEL_SKIP_LABELS:
                continue

            # ── In-document summaries flagged by the layout model ────────────
            if model_label in cfg.MODEL_ABSTRACT_LABELS:
                continue

            # ── Titles ───────────────────────────────────────────────────────
            if kind == "title":
                text = _text_title(content)
                if not text:
                    continue
                level = min(content.get("level", 2), 3)
                if level == 1:
                    _queue_h1(text)
                else:
                    # Every level other than 1 is rendered as H2; emit the
                    # pending H1 first so the hierarchy stays ordered.
                    _flush_h1()
                    blocks.append(Block(kind="h2", text=text))

            # ── Paragraphs ───────────────────────────────────────────────────
            elif kind == "paragraph":
                text = _text_para(content)
                if not text:
                    continue

                # Textual fallback for summaries the label did not catch.
                if _is_sommario(text):
                    continue

                # Chapter prefix emitted as a paragraph (e.g. "CAPITOLO 1"):
                # buffer it so it merges with the following L1 title.
                if _is_chapter_prefix(text):
                    _queue_h1(text)
                    continue

                _flush_h1()
                para_kind = "h3" if _is_h3_candidate(text) else "text"
                blocks.append(Block(kind=para_kind, text=text))

            # ── Lists ────────────────────────────────────────────────────────
            elif kind == "list":
                _flush_h1()
                text = _text_list(content)
                if text:
                    blocks.append(Block(kind="list", text=text))

            # ── Tables ───────────────────────────────────────────────────────
            elif kind == "table":
                _flush_h1()
                body = content.get("table_body", "")
                if body:
                    blocks.append(Block(kind="table", text=body))

            # ── Images (optional) ────────────────────────────────────────────
            elif kind == "image" and not cfg.SKIP_IMAGES:
                _flush_h1()
                src = content.get("image_source", {}).get("path", "")
                caption = " ".join(
                    c.get("content", "") for c in content.get("image_caption", [])
                ).strip()
                if src:
                    blocks.append(Block(kind="text", text=f"![{caption}]({src})"))

            # ── Anything else still separates two L1 titles ──────────────────
            else:
                _flush_h1()

    _flush_h1()  # final flush
    return blocks


# ─── Content-check helpers ────────────────────────────────────────────────────

def _has_content(blocks: list[Block], idx: int) -> bool:
    """Return True if a text/list/table block follows heading *idx* before the
    next heading of the same or a higher level.

    Returns False when ``blocks[idx]`` is not a heading.
    """
    level = _HEADING_LEVEL.get(blocks[idx].kind)
    if level is None:
        return False
    # Index scan instead of slicing: no copy of the list tail per call.
    for k in range(idx + 1, len(blocks)):
        nxt = blocks[k]
        nxt_level = _HEADING_LEVEL.get(nxt.kind)
        if nxt_level is not None and nxt_level <= level:
            return False
        if nxt.kind in _CONTENT_KINDS:
            return True
    return False


def _has_real_content(blocks: list[Block], idx: int) -> bool:
    """Return True if the text under heading *idx* totals at least
    MIN_CONTENT_CHARS characters.

    The section ends at the next heading of the same or a higher level.
    This tells title pages (short copyright notes) apart from sections with
    actual content. Returns False when ``blocks[idx]`` is not a heading.
    """
    level = _HEADING_LEVEL.get(blocks[idx].kind)
    if level is None:
        return False
    total = 0
    for k in range(idx + 1, len(blocks)):
        nxt = blocks[k]
        nxt_level = _HEADING_LEVEL.get(nxt.kind)
        if nxt_level is not None and nxt_level <= level:
            break
        if nxt.kind in _CONTENT_KINDS:
            total += len(nxt.text)
            if total >= cfg.MIN_CONTENT_CHARS:
                return True
    return False


# ─── Cleaning filters ─────────────────────────────────────────────────────────

def _remove_frontmatter(blocks: list[Block]) -> list[Block]:
    """Drop sections whose heading is listed in FRONTMATTER_HEADINGS, together
    with all of their content.

    Skipping continues until a non-frontmatter heading of the same or a higher
    level is found, so consecutive frontmatter sections go away in one pass.
    """
    def _norm(text: str) -> str:
        lowered = text.strip().lower()
        # Strip a leading "Xxx N — " chapter identifier, if present.
        return re.sub(r"^\S+\s+\S+\s+[—\-]\s*", "", lowered)

    def _is_fm(text: str) -> bool:
        core = _norm(text)
        return any(
            core == fm or core.startswith(fm + " ")
            for fm in cfg.FRONTMATTER_HEADINGS
        )

    if not cfg.FRONTMATTER_HEADINGS:
        return blocks

    kept: list[Block] = []
    total = len(blocks)
    i = 0
    while i < total:
        current = blocks[i]
        if current.kind in _HEADING_LEVEL and _is_fm(current.text):
            level = _HEADING_LEVEL[current.kind]
            i += 1
            while i < total:
                nxt = blocks[i]
                nxt_level = _HEADING_LEVEL.get(nxt.kind)
                if (
                    nxt_level is not None
                    and nxt_level <= level
                    and not _is_fm(nxt.text)
                ):
                    break
                i += 1
            continue
        kept.append(current)
        i += 1
    return kept


def _remove_toc_runs(blocks: list[Block]) -> list[Block]:
    """Drop runs of MIN_TOC_HEADINGS or more consecutive headings that have no
    real text between them (leftover table of contents).

    A heading counts as a TOC entry when no text/list/table block longer than
    20 characters appears before the next heading of the same or a higher
    level. Inside a run, text/list blocks of up to 120 characters are skipped
    over (e.g. author attributions in the TOC).

    NOTE(review): a text block directly after a TOC-entry heading is already
    limited to 20 characters by the entry test, so the 120-character tolerance
    looks mostly ineffective — confirm which threshold is intended.
    """
    total = len(blocks)

    def _is_toc_entry(idx: int) -> bool:
        level = _HEADING_LEVEL.get(blocks[idx].kind)
        if level is None:
            return False
        for k in range(idx + 1, total):
            nxt = blocks[k]
            nxt_level = _HEADING_LEVEL.get(nxt.kind)
            if nxt_level is not None and nxt_level <= level:
                return True
            if nxt.kind in _CONTENT_KINDS and len(nxt.text) > 20:
                return False
        return True

    # Evaluate each block once; the run detection below may visit the same
    # heading several times.
    toc_flags = [_is_toc_entry(k) for k in range(total)]

    kept: list[Block] = []
    i = 0
    while i < total:
        if toc_flags[i]:
            j = i + 1
            toc_count = 1
            while j < total:
                candidate = blocks[j]
                if candidate.kind in _HEADING_LEVEL:
                    if not toc_flags[j]:
                        break
                    toc_count += 1
                elif not (
                    candidate.kind in ("text", "list")
                    and len(candidate.text) <= 120
                ):
                    break
                j += 1
            if toc_count >= cfg.MIN_TOC_HEADINGS:
                i = j  # drop the whole run, short texts included
                continue
        kept.append(blocks[i])
        i += 1
    return kept


def _remove_frontespizio(blocks: list[Block]) -> list[Block]:
    """Drop everything before the first heading with real content
    (>= MIN_CONTENT_CHARS): covers, copyright notes, opening pages.

    Returns *blocks* unchanged when no such heading exists.
    """
    for i, block in enumerate(blocks):
        if block.kind in _HEADING_LEVEL and _has_real_content(blocks, i):
            return blocks[i:]
    return blocks


def filter_blocks(blocks: list[Block]) -> list[Block]:
    """Apply the cleaning filters in order: frontmatter, TOC runs, title page."""
    blocks = _remove_frontmatter(blocks)
    blocks = _remove_toc_runs(blocks)
    blocks = _remove_frontespizio(blocks)
    return blocks


# ─── Rendering ────────────────────────────────────────────────────────────────

def _render(blocks: list[Block]) -> str:
    """Render *blocks* as Markdown text ending with a single newline.

    A blank line precedes every content block and every heading that follows
    content. Consecutive headings are not separated by a blank line. Runs of
    three or more newlines are collapsed to two.
    """
    lines: list[str] = []
    prev_was_heading = False
    for block in blocks:
        level = _HEADING_LEVEL.get(block.kind)
        if level is not None:
            if lines and not prev_was_heading:
                lines.append("")
            lines.append(f"{'#' * level} {block.text}")
            prev_was_heading = True
        else:
            lines.append("")
            lines.append(block.text)
            prev_was_heading = False
    md = "\n".join(lines).strip() + "\n"
    return re.sub(r"\n{3,}", "\n\n", md)


# ─── Core ─────────────────────────────────────────────────────────────────────

def optimize(stem: str, project_root: Path, force: bool = False) -> bool:
    """Run Stage 1: _content_list_v2.json + _model.json → _clean.md.

    Args:
        stem: Document name (sub-folder of ``sources/`` and file-name prefix).
        project_root: Directory that contains ``sources/``.
        force: Regenerate the output even if it already exists.

    Returns:
        True if the file was produced (or was already present and *force* is
        False); False on error: missing or unreadable content list, or output
        that cannot be written. An unreadable _model.json is not an error:
        the run continues without layout enrichment.
    """
    auto_dir = project_root / "sources" / stem / "auto"
    json_path = auto_dir / f"{stem}_content_list_v2.json"
    model_path = auto_dir / f"{stem}_model.json"
    out_path = auto_dir / f"{stem}_clean.md"

    print(f"\n[Stage 1] Documento: {stem}")

    if not json_path.exists():
        print(f" ✗ {json_path.name} non trovato")
        return False

    if out_path.exists() and not force:
        print(f" ↩ {out_path.name} già presente — skip ottimizzazione")
        return True

    # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
    try:
        pages = json.loads(json_path.read_text(encoding="utf-8"))
    except (OSError, ValueError) as exc:
        print(f" ✗ {json_path.name} illeggibile: {exc}")
        return False

    label_map: dict[int, list[tuple[float, float, str]]] = {}
    if model_path.exists():
        try:
            label_map = _load_label_map(model_path)
        except (OSError, ValueError) as exc:
            # The model file is optional: degrade instead of failing the run.
            print(f" ⚠️ {model_path.name} illeggibile ({exc}) — nessun enrichment layout")
        else:
            n_labels = sum(len(v) for v in label_map.values())
            print(f" 📐 {model_path.name} ({n_labels} label)")
    else:
        print(f" ℹ️ {model_path.name} non trovato — nessun enrichment layout")

    blocks = _build_blocks(pages, label_map)
    n_raw = len(blocks)
    blocks = filter_blocks(blocks)
    n_filtered = n_raw - len(blocks)

    md = _render(blocks)
    try:
        out_path.write_text(md, encoding="utf-8")
    except OSError as exc:
        print(f" ✗ {out_path.name} non scrivibile: {exc}")
        return False

    n_h1 = len(re.findall(r"^# ", md, re.MULTILINE))
    n_h2 = len(re.findall(r"^## ", md, re.MULTILINE))
    n_h3 = len(re.findall(r"^### ", md, re.MULTILINE))
    print(f" ✅ {out_path.name} "
          f"({md.count(chr(10))} righe — H1={n_h1} H2={n_h2} H3={n_h3} "
          f"rimossi={n_filtered}/{n_raw})")
    return True


# ─── Standalone entry point ───────────────────────────────────────────────────

def main() -> None:
    """Command-line entry point: optimize one stem, or every stem in sources/.

    Exits with status 0 when every document succeeded, 1 otherwise (including
    when no MinerU document is found).
    """
    project_root = Path(__file__).parent.parent

    parser = argparse.ArgumentParser(
        description="Stage 1: _content_list_v2.json + _model.json → _clean.md"
    )
    parser.add_argument("--stem", help="Nome documento (sottocartella di sources/)")
    parser.add_argument("--force", action="store_true",
                        help="Rigenera anche se _clean.md esiste già")
    args = parser.parse_args()

    if args.stem:
        stems = [args.stem]
    else:
        sources_dir = project_root / "sources"
        # A missing sources/ directory means "no documents", not a traceback.
        candidates = sources_dir.iterdir() if sources_dir.is_dir() else ()
        stems = sorted(
            p.name for p in candidates
            if p.is_dir() and (p / "auto" / f"{p.name}_content_list_v2.json").exists()
        )

    if not stems:
        print("Errore: nessun documento MinerU trovato in sources/")
        sys.exit(1)

    results = [optimize(s, project_root, args.force) for s in stems]
    ok = sum(results)
    print(f"\n{'✅' if all(results) else '⚠️ '} {ok}/{len(results)} documenti processati")
    sys.exit(0 if all(results) else 1)


if __name__ == "__main__":
    main()