#!/usr/bin/env python3
"""
Chunk verification.

Analyses chunks/{stem}/chunks.json and reports every anomaly that could
degrade retrieval quality. The chunks themselves are never modified.

Input:  chunks/{stem}/chunks.json
Output: on-screen report + chunks/{stem}/report.json
        + exit code (0 = OK, 1 = problems)

Usage:
    python chunks/verify_chunks.py --stem documento
    python chunks/verify_chunks.py            # every document in chunks/
    python chunks/verify_chunks.py --min 200 --max 800
"""

import argparse
import json
import re
import sys
from pathlib import Path
from typing import Callable, Optional

# Make the sibling ``config`` module importable regardless of the CWD.
_HERE = Path(__file__).resolve().parent
if str(_HERE) not in sys.path:
    sys.path.insert(0, str(_HERE))

import config as cfg

# ─── Thresholds (derived from the target, overridable from the CLI) ──────────
MIN_CHARS = int(cfg.TARGET_CHARS * (1 - cfg.CHUNK_TOLERANCE))
MAX_CHARS = int(cfg.TARGET_CHARS * (1 + cfg.CHUNK_TOLERANCE))

# Endings that count as a "complete" chunk.
PUNCT_END = re.compile(
    r"[.!?»)\]'\u2019\"\u201c\u201d\u2018\u2014\u2013\u2026]$"
    r"|/$"    # URL ending with /
    r"|\|$"   # Markdown table row
    r"|:$"    # introduction to a list or formula
)
_HEX_END = re.compile(r"[0-9a-fA-F]{8,}$")
_URL_TAIL = re.compile(r"https?://\S+(\s+\S+){0,3}$")  # URL with up to 3 extra tokens
_MATH_SYMS = re.compile(r"[∈∑≤≥≠∀∃∫√∞∂±×÷→←↔⊂⊃⊆⊇∩∪·°]")


# ─── Checks ──────────────────────────────────────────────────────────────────

def has_prefix(chunk: dict) -> bool:
    """Return True when the chunk text (ignoring leading whitespace) starts with ``[``."""
    return chunk.get("text", "").lstrip().startswith("[")


def is_empty(chunk: dict) -> bool:
    """Return True when the chunk has no text or only whitespace."""
    return not chunk.get("text", "").strip()


def is_too_short(chunk: dict, min_chars: int) -> bool:
    """Return True when the chunk's ``n_chars`` (0 if missing) is below ``min_chars``."""
    return chunk.get("n_chars", 0) < min_chars


def is_too_long(chunk: dict, max_chars: int) -> bool:
    """Return True when the chunk's ``n_chars`` (0 if missing) exceeds ``max_chars``."""
    return chunk.get("n_chars", 0) > max_chars


def ends_incomplete(chunk: dict) -> bool:
    """Return True when the chunk text does not end on an accepted terminator.

    Empty text is never reported here (see ``is_empty``). Accepted endings
    are the ``PUNCT_END`` characters, a run of 8+ hex digits, or a URL
    followed by at most three extra tokens.
    """
    text = chunk.get("text", "").rstrip()
    if not text:
        return False
    # Ignore trailing Markdown emphasis markers (``*`` / ``_``) before testing.
    text_check = re.sub(r"[_*]+$", "", text).rstrip()
    if not text_check:
        return False
    if PUNCT_END.search(text_check):
        return False
    if _HEX_END.search(text_check):  # SHA hash / hex code
        return False
    if _URL_TAIL.search(text_check[-200:]):  # URL (possibly with a path after a space)
        return False
    return True


def _has_math_context(chunk: dict) -> bool:
    """Return True when the text holds at least ``cfg.MATH_SYMS_MIN`` math symbols."""
    return len(_MATH_SYMS.findall(chunk.get("text", ""))) >= cfg.MATH_SYMS_MIN


def is_math_incomplete(chunk: dict) -> bool:
    """Incomplete ending in a mathematical context — a warning, not a blocker."""
    return ends_incomplete(chunk) and _has_math_context(chunk)


# ─── Report ──────────────────────────────────────────────────────────────────

def _fmt_chunk(c: dict) -> str:
    """Format one chunk as ``[id] (n char) «first 60 chars»`` on a single line."""
    cid = c.get("chunk_id", "?")
    n = c.get("n_chars", 0)
    preview = c.get("text", "")[:60].replace("\n", " ")
    return f" [{cid}] ({n} char) «{preview}»"


def _last_line(c: dict, width: int) -> str:
    """Return the last ``width`` characters of the chunk's final text line."""
    return c.get("text", "").rstrip().split("\n")[-1][-width:]


def _fmt_tail(c: dict) -> str:
    """Format one chunk as ``[id] ...'<tail of last line>'``."""
    return f" [{c.get('chunk_id', '?')}] ...{_last_line(c, 80)!r}"


def _print_sample(items: list, limit: int, render: Callable[[dict], str]) -> None:
    """Print the first ``limit`` items via ``render`` plus a count of the rest."""
    for c in items[:limit]:
        print(render(c))
    if len(items) > limit:
        print(f" ... e altri {len(items) - limit}")


def _load_chunks(chunks_path: Path, stem: str) -> Optional[list]:
    """Load and validate chunks.json; print the reason and return None on failure."""
    if not chunks_path.exists():
        print(f" ✗ chunks/{stem}/chunks.json non trovato")
        print(f" Esegui prima: python chunks/chunker.py --stem {stem}")
        return None
    try:
        chunks = json.loads(chunks_path.read_text(encoding="utf-8"))
    except (OSError, ValueError) as exc:
        # ValueError covers both json.JSONDecodeError and UnicodeDecodeError:
        # a broken file must fail this document, not abort the whole run.
        print(f" ✗ chunks.json non leggibile: {exc}")
        return None
    if not chunks:
        print(" ✗ chunks.json è vuoto")
        return None
    if not isinstance(chunks, list) or not all(isinstance(c, dict) for c in chunks):
        print(" ✗ chunks.json non è una lista di chunk")
        return None
    return chunks


def _print_next_steps(
    stem: str,
    n_total: int,
    *,
    empty_chunks: list,
    no_prefix: list,
    incomplete: list,
    too_short: list,
    too_long: list,
    incomplete_math: list,
) -> None:
    """Print the "next steps" section matching the blockers/warnings found."""
    blockers = empty_chunks + no_prefix + incomplete
    warnings = too_short + too_long + incomplete_math

    print(f"\n {'─' * 50}")
    print(" PROSSIMI PASSI")
    print(f" {'─' * 50}")

    if not blockers and not warnings:
        print(" ✅ Tutto OK — procedi alla vettorizzazione:")
        print(f" python ingestion/ingest.py --stem {stem}")
        return

    if not blockers:
        print(" 🟡 Solo avvisi minori — puoi procedere alla vettorizzazione:")
        print(f" python ingestion/ingest.py --stem {stem}")
        # Only offer the optimisation path when there is something to optimise
        # (math-only warnings have no fix step).
        if too_short or too_long:
            print()
            print(" Oppure, per ottimizzare prima:")
            if too_short:
                pct = int(len(too_short) / n_total * 100)
                print(f" • {len(too_short)} chunk corti ({pct}% del totale)")
            if too_long:
                pct = int(len(too_long) / n_total * 100)
                print(f" • {len(too_long)} chunk lunghi ({pct}% del totale)")
            print(f" → Esegui: python chunks/fix_chunks.py --stem {stem} --dry-run")
            print(f" poi: python chunks/fix_chunks.py --stem {stem}")
            print(f" poi: python chunks/verify_chunks.py --stem {stem}")
        return

    print(" 🔴 Problemi bloccanti — correggi prima di procedere:")
    print()
    if empty_chunks:
        print(f" • {len(empty_chunks)} chunk vuoti")
        print(f" → Controlla conversione/{stem}/clean.md per sezioni prive di testo")
    if no_prefix:
        print(f" • {len(no_prefix)} chunk senza prefisso di contesto")
        print(f" → Controlla che gli header ### siano corretti in conversione/{stem}/clean.md")
    if incomplete:
        print(f" • {len(incomplete)} chunk con frase spezzata")
        print(f" → Esegui: python chunks/fix_chunks.py --stem {stem}")
    print()
    print(" Dopo le correzioni, riesegui nell'ordine:")
    print(f" python chunks/chunker.py --stem {stem} --force")
    print(f" python chunks/verify_chunks.py --stem {stem}")
    print()
    if warnings:
        print(f" 🟡 Hai anche {len(warnings)} avvisi minori — affrontali dopo aver risolto i 🔴.")


def verify_stem(stem: str, project_root: Path, min_chars: int, max_chars: int) -> bool:
    """Verify one document's chunks, print a report and write report.json.

    Args:
        stem: Name of the document sub-folder under ``chunks/``.
        project_root: Directory that contains the ``chunks/`` folder.
        min_chars: Chunks with fewer ``n_chars`` are flagged as too short.
        max_chars: Chunks with more ``n_chars`` are flagged as too long.

    Returns:
        True when no blocking problem was found (warnings are allowed);
        False when blockers exist or chunks.json is missing, unreadable,
        empty or malformed.
    """
    chunks_path = project_root / "chunks" / stem / "chunks.json"
    print(f"\nDocumento: {stem}")

    chunks = _load_chunks(chunks_path, stem)
    if chunks is None:
        return False

    # ── Collect problems ─────────────────────────────────────────────────────
    empty_chunks = [c for c in chunks if is_empty(c)]
    no_prefix = [c for c in chunks if not is_empty(c) and not has_prefix(c)]
    too_short = [c for c in chunks if is_too_short(c, min_chars)]
    too_long = [c for c in chunks if is_too_long(c, max_chars)]

    # Single pass: ends_incomplete() is already False for empty text, and the
    # math test is evaluated once per incomplete chunk.
    incomplete: list = []
    incomplete_math: list = []
    for c in chunks:
        if ends_incomplete(c):
            (incomplete_math if _has_math_context(c) else incomplete).append(c)

    # ── Statistics ───────────────────────────────────────────────────────────
    lengths = [c.get("n_chars", 0) for c in chunks]
    n_total = len(chunks)
    # Count flagged chunks by object identity so a missing or duplicated
    # ``chunk_id`` can neither crash nor skew the figure.
    flagged = {
        id(c)
        for group in (empty_chunks, no_prefix, too_short, too_long, incomplete)
        for c in group
    }
    n_ok = n_total - len(flagged)
    min_l = min(lengths)
    max_l = max(lengths)
    avg_l = int(sum(lengths) / n_total)
    n_under = sum(1 for length in lengths if length < min_chars)
    n_normal = sum(1 for length in lengths if min_chars <= length <= max_chars)
    n_over = sum(1 for length in lengths if length > max_chars)

    # ── Output ───────────────────────────────────────────────────────────────
    print(f" Totale chunk: {n_total}")
    print(f" ✅ OK: {n_ok}")
    print()
    print(" Distribuzione lunghezze:")
    print(f" Min: {min_l} char")
    print(f" Max: {max_l} char")
    print(f" Media: {avg_l} char")
    print(f" < {min_chars} char (sotto MIN): {n_under}")
    print(f" {min_chars}–{max_chars} char (ideale): {n_normal}")
    print(f" > {max_chars} char (sopra MAX): {n_over}")

    if empty_chunks:
        print(f"\n 🔴 {len(empty_chunks)} chunk VUOTI:")
        _print_sample(empty_chunks, 5, lambda c: f" [{c.get('chunk_id', '?')}]")

    if no_prefix:
        print(f"\n 🔴 {len(no_prefix)} chunk SENZA PREFISSO DI CONTESTO:")
        _print_sample(no_prefix, 5, _fmt_chunk)
        print(" → Causa probabile: header ### mancanti o malformati nel MD")

    if too_short:
        print(f"\n 🟡 {len(too_short)} chunk SOTTO MIN_CHARS ({min_chars}):")
        _print_sample(too_short, 5, _fmt_chunk)
        print(" → Soluzione: abbassa MIN_CHARS o revisiona il MD")

    if too_long:
        print(f"\n 🟡 {len(too_long)} chunk SOPRA MAX ({max_chars}):")
        _print_sample(too_long, 5, _fmt_chunk)
        print(" → Causa probabile: frasi singole lunghe (liste/paragrafi non suddivisibili)")

    if incomplete:
        print(f"\n 🔴 {len(incomplete)} chunk CHE FINISCONO SENZA PUNTEGGIATURA (frase spezzata):")
        _print_sample(incomplete, 5, _fmt_tail)
        print(f" → Soluzione: correggi le righe spezzate in conversione/{stem}/clean.md")

    if incomplete_math:
        print(f"\n 🟡 {len(incomplete_math)} chunk MATEMATICI SENZA PUNTEGGIATURA (formula/espressione):")
        _print_sample(incomplete_math, 3, _fmt_tail)
        print(" → Le formule non finiscono con punteggiatura — avviso non bloccante")

    # ── Build and save report.json ───────────────────────────────────────────
    blockers = empty_chunks + no_prefix + incomplete
    warnings = too_short + too_long + incomplete_math

    def _chunk_entry(c: dict) -> dict:
        return {
            "chunk_id": c.get("chunk_id", ""),
            "sezione": c.get("sezione", ""),
            "titolo": c.get("titolo", ""),
            "n_chars": c.get("n_chars", 0),
            "last_text": _last_line(c, 120),
        }

    if blockers:
        verdict = "blocked"
    elif warnings:
        verdict = "warnings_only"
    else:
        verdict = "ok"

    report = {
        "stem": stem,
        "verdict": verdict,
        "stats": {
            "total": n_total,
            "ok": n_ok,
            "min_chars": min_l,
            "max_chars": max_l,
            "avg_chars": avg_l,
        },
        "thresholds": {
            "min_chars": min_chars,
            "max_chars": max_chars,
            "target_chars": cfg.TARGET_CHARS,
            "chunk_tolerance": cfg.CHUNK_TOLERANCE,
        },
        "blockers": {
            "empty": [_chunk_entry(c) for c in empty_chunks],
            "no_prefix": [_chunk_entry(c) for c in no_prefix],
            "incomplete": [_chunk_entry(c) for c in incomplete],
        },
        "warnings": {
            "too_short": [_chunk_entry(c) for c in too_short],
            "too_long": [_chunk_entry(c) for c in too_long],
            "incomplete_math": [_chunk_entry(c) for c in incomplete_math],
        },
    }

    out_dir = project_root / "chunks" / stem
    out_dir.mkdir(parents=True, exist_ok=True)
    (out_dir / "report.json").write_text(
        json.dumps(report, ensure_ascii=False, indent=2), encoding="utf-8"
    )
    print(f"\n report.json salvato in chunks/{stem}/")

    # ── Next steps ───────────────────────────────────────────────────────────
    _print_next_steps(
        stem,
        n_total,
        empty_chunks=empty_chunks,
        no_prefix=no_prefix,
        incomplete=incomplete,
        too_short=too_short,
        too_long=too_long,
        incomplete_math=incomplete_math,
    )

    return not blockers


# ─── Entry point ─────────────────────────────────────────────────────────────

def main(argv: Optional[list] = None) -> int:
    """Parse the CLI, verify the requested documents and return the exit code.

    Returns 0 when every document is free of blockers, 1 otherwise (including
    when ``chunks/`` is missing or holds no chunks.json).
    """
    project_root = Path(__file__).parent.parent

    parser = argparse.ArgumentParser(description="Verifica chunk")
    parser.add_argument("--stem", help="Nome del documento (sottocartella di chunks/)")
    parser.add_argument(
        "--min", type=int, default=MIN_CHARS,
        help=f"Soglia minima caratteri (default: TARGET×(1-TOL) = {MIN_CHARS})"
    )
    parser.add_argument(
        "--max", type=int, default=MAX_CHARS,
        help=f"Soglia massima caratteri (default: TARGET×(1+TOL) = {MAX_CHARS})"
    )
    args = parser.parse_args(argv)

    # An inverted range would flag every chunk as both too short and too long.
    if args.min > args.max:
        parser.error("--min non può essere maggiore di --max")

    if args.stem:
        stems = [args.stem]
    else:
        chunks_dir = project_root / "chunks"
        if not chunks_dir.exists():
            print(f"Errore: cartella chunks/ non trovata in {project_root}")
            return 1
        stems = sorted(
            p.name for p in chunks_dir.iterdir()
            if p.is_dir() and (p / "chunks.json").exists()
        )
        if not stems:
            print("Errore: nessun chunks.json trovato in chunks/")
            return 1

    results = [verify_stem(s, project_root, args.min, args.max) for s in stems]
    ok = sum(results)
    total = len(results)
    print(f"\n{'✅' if all(results) else '⚠️ '} {ok}/{total} documenti senza problemi")
    return 0 if all(results) else 1


if __name__ == "__main__":
    sys.exit(main())