diff --git a/chunks/verify_chunks.py b/chunks/verify_chunks.py index 22ec8e0..b20fac7 100644 --- a/chunks/verify_chunks.py +++ b/chunks/verify_chunks.py @@ -18,6 +18,7 @@ import argparse import json import re import sys +from collections import Counter from pathlib import Path _HERE = Path(__file__).resolve().parent @@ -26,41 +27,74 @@ if str(_HERE) not in sys.path: import config as cfg -# ─── Soglie (derivate dal target, sovrascrivibili da CLI) ──────────────────── +# ─── Soglie ─────────────────────────────────────────────────────────────────── + +MIN_CHARS = cfg.MIN_CHARS +MAX_CHARS = cfg.MAX_CHARS -MIN_CHARS = int(cfg.TARGET_CHARS * (1 - cfg.CHUNK_TOLERANCE)) -MAX_CHARS = int(cfg.TARGET_CHARS * (1 + cfg.CHUNK_TOLERANCE)) PUNCT_END = re.compile( - r"[.!?\xbb)\]'\u2019\"\u201c\u201d\u2018\u2014\u2013\u2026]$" - r"|/$" # URL che finisce con / - r"|\|$" # riga di tabella Markdown - r"|;$" # fine clausola legale (testo giuridico) - r"|:$" # introduzione a lista o formula + r"[.!?\xbb)\]'’\"“”‘—–…]$" + r"|/$" # URL che finisce con / + r"|\|$" # riga di tabella Markdown + r"|;$" # fine clausola legale + r"|:$" # introduzione a lista o formula + r"|\d[\d.,/]*$" # numero, anno, versione, riferimento normativo ) -_HEX_END = re.compile(r"[0-9a-fA-F]{8,}$") -_URL_TAIL = re.compile(r"(https?://|www\.)\S+(\s+\S+){0,3}$") # URL con fino a 3 token extra -_MATH_SYMS = re.compile(r"[∈∑≤≥≠∀∃∫√∞∂±×÷→←↔⊂⊃⊆⊇∩∪·°]") -_ROMAN_END = re.compile(r"\b(I{1,3}|IV|VI{0,3}|IX|XI{0,2}|XIV|XV|XVI{0,2}|XIX|XX{0,2})$") +_HEX_END = re.compile(r"[0-9a-fA-F]{8,}$") +_URL_TAIL = re.compile(r"(https?://|www\.)\S+(\s+\S+){0,3}$") +_MATH_SYMS = re.compile(r"[∈∑≤≥≠∀∃∫√∞∂±×÷→←↔⊂⊃⊆⊇∩∪·°]") +_ROMAN_END = re.compile(r"\b(I{1,3}|IV|VI{0,3}|IX|XI{0,2}|XIV|XV|XVI{0,2}|XIX|XX{0,2})$") +_TABLE_SEP = re.compile(r"^\s*\|[\s\-|:]+\|\s*$") - -def _load_thresholds(stem_dir: "Path") -> "tuple[int, int]": - """Legge min/max da meta.json (scritto dal chunker) o usa i default da config.""" +def _load_thresholds(stem_dir: Path) -> tuple[int, int]: meta = stem_dir / "meta.json" if meta.exists(): - import json as _json - m = _json.loads(meta.read_text(encoding="utf-8")) + m = json.loads(meta.read_text(encoding="utf-8")) return m["min_chars"], m["max_chars"] return MIN_CHARS, MAX_CHARS + +def _strip_prefix(text: str) -> str: + text = text.lstrip() + if text.startswith("["): + end = text.find("]") + if end != -1: + return text[end + 1:].lstrip("\n") + return text + + # ─── Checks ─────────────────────────────────────────────────────────────────── +def is_empty(chunk: dict) -> bool: + return not chunk.get("text", "").strip() + + def has_prefix(chunk: dict) -> bool: return chunk.get("text", "").lstrip().startswith("[") -def is_empty(chunk: dict) -> bool: - return not chunk.get("text", "").strip() +def is_prefix_malformed(chunk: dict) -> bool: + """Inizia con [ ma il prefisso non chiude con ] o ha contenuto vuoto.""" + text = chunk.get("text", "").lstrip() + if not text.startswith("["): + return False + first_line = text.split("\n")[0] + end = first_line.find("]") + if end == -1: + return True + return len(first_line[1:end].strip()) == 0 + + +def is_body_empty(chunk: dict) -> bool: + """Prefisso valido ma nessun testo nel corpo.""" + text = chunk.get("text", "").lstrip() + if not text.startswith("["): + return False + end = text.find("]") + if end == -1: + return False + return len(text[end + 1:].strip()) == 0 def is_too_short(chunk: dict, min_chars: int) -> bool: @@ -80,21 +114,81 @@ def ends_incomplete(chunk: dict) -> bool: return False if PUNCT_END.search(text_check): return False - if _HEX_END.search(text_check): # hash SHA / codice hex + if _HEX_END.search(text_check): return False - if _ROMAN_END.search(text_check): # numero romano finale (indice/riferimento PDF) + if _ROMAN_END.search(text_check): return False - if _URL_TAIL.search(text_check[-200:]): # URL (con eventuale path dopo spazio) + if _URL_TAIL.search(text_check[-200:]): return False return True def is_math_incomplete(chunk: dict) -> bool: - """Incompleto ma in contesto matematico — degrada a warning invece di blocker.""" return ends_incomplete(chunk) and len(_MATH_SYMS.findall(chunk.get("text", ""))) >= cfg.MATH_SYMS_MIN -# ─── Report ─────────────────────────────────────────────────────────────────── +def is_table_broken(chunk: dict) -> bool: + """Tabella Markdown (≥2 righe con |) senza riga separatore |---|.""" + text = chunk.get("text", "") + pipe_lines = [l for l in text.splitlines() if "|" in l and l.strip().startswith("|")] + if len(pipe_lines) < 2: + return False + return not any(_TABLE_SEP.match(l) for l in pipe_lines) + + +def find_duplicate_bodies(chunks: list[dict]) -> list[dict]: + """Chunk con testo body identico (prefisso escluso). Ignora corpi < 30 char.""" + seen: dict[str, str] = {} + dupes = [] + for c in chunks: + body = _strip_prefix(c.get("text", "")).strip() + if len(body) < 30: + continue + cid = c["chunk_id"] + if body in seen: + dupes.append({ + "chunk_id": cid, + "duplicate_of": seen[body], + "sezione": c.get("sezione", ""), + "titolo": c.get("titolo", ""), + "n_chars": c.get("n_chars", 0), + "last_text": body[:120], + }) + else: + seen[body] = cid + return dupes + + +# ─── Istogramma ─────────────────────────────────────────────────────────────── + +def _ascii_histogram(lengths: list[int], min_t: int, max_t: int, + n_bins: int = 10, bar_width: int = 28) -> list[str]: + if not lengths: + return [] + lo, hi = min(lengths), max(lengths) + if lo == hi: + return [f" {lo:>5}–{hi:<5} │{'█' * bar_width}│ {len(lengths)}"] + step = (hi - lo) / n_bins + bins = [0] * n_bins + for l in lengths: + idx = min(int((l - lo) / step), n_bins - 1) + bins[idx] += 1 + max_count = max(bins) or 1 + lines = [] + for i, count in enumerate(bins): + lo_b = int(lo + i * step) + hi_b = int(lo + (i + 1) * step) + bar = "█" * round(count / max_count * bar_width) + note = "" + if lo_b <= min_t < hi_b: + note = " ← MIN" + elif lo_b <= max_t < hi_b: + note = " ← MAX" + lines.append(f" {lo_b:>5}–{hi_b:<5} │{bar:<{bar_width}}│ {count}{note}") + return lines + + +# ─── Helpers output ─────────────────────────────────────────────────────────── def _fmt_chunk(c: dict) -> str: cid = c.get("chunk_id", "?") @@ -103,6 +197,25 @@ def _fmt_chunk(c: dict) -> str: return f" [{cid}] ({n} char) «{preview}»" +def _chunk_entry(c: dict) -> dict: + return { + "chunk_id": c.get("chunk_id", ""), + "sezione": c.get("sezione", ""), + "titolo": c.get("titolo", ""), + "n_chars": c.get("n_chars", 0), + "last_text": c.get("text", "").rstrip().split("\n")[-1][-120:], + } + + +def _print_list(items: list[dict], limit: int = 5) -> None: + for c in items[:limit]: + print(_fmt_chunk(c)) + if len(items) > limit: + print(f" ... e altri {len(items) - limit}") + + +# ─── Core ───────────────────────────────────────────────────────────────────── + def verify_stem(stem: str, project_root: Path, min_chars: int, max_chars: int) -> bool: stem_dir = project_root / "chunks" / stem chunks_path = stem_dir / "chunks.json" @@ -123,48 +236,60 @@ def verify_stem(stem: str, project_root: Path, min_chars: int, max_chars: int) - # ── Raccogli problemi ────────────────────────────────────────────────────── - empty_chunks = [c for c in chunks if is_empty(c)] - no_prefix = [c for c in chunks if not is_empty(c) and not has_prefix(c)] - too_short = [c for c in chunks if is_too_short(c, min_chars)] - too_long = [c for c in chunks if is_too_long(c, max_chars)] - _incomplete_all = [c for c in chunks if not is_empty(c) and ends_incomplete(c)] - incomplete_math = [c for c in _incomplete_all if is_math_incomplete(c)] - incomplete = [c for c in _incomplete_all if not is_math_incomplete(c)] + empty_chunks = [c for c in chunks if is_empty(c)] + no_prefix = [c for c in chunks if not is_empty(c) and not has_prefix(c)] + malformed_prefix = [c for c in chunks + if not is_empty(c) and has_prefix(c) and is_prefix_malformed(c)] + body_empty = [c for c in chunks + if not is_empty(c) and has_prefix(c) + and not is_prefix_malformed(c) and is_body_empty(c)] + too_short = [c for c in chunks if is_too_short(c, min_chars)] + too_long = [c for c in chunks if is_too_long(c, max_chars)] + _incomplete_all = [c for c in chunks if not is_empty(c) and ends_incomplete(c)] + incomplete_math = [c for c in _incomplete_all if is_math_incomplete(c)] + incomplete = [c for c in _incomplete_all if not is_math_incomplete(c)] + broken_tables = [c for c in chunks if is_table_broken(c)] + duplicates = find_duplicate_bodies(chunks) # ── Statistiche ─────────────────────────────────────────────────────────── - lengths = [c.get("n_chars", 0) for c in chunks] - n_total = len(chunks) - n_ok = n_total - len(set( + lengths = [c.get("n_chars", 0) for c in chunks] + n_total = len(chunks) + blocker_ids = set( c["chunk_id"] - for lst in [empty_chunks, no_prefix, too_short, too_long, incomplete] + for lst in [empty_chunks, no_prefix, malformed_prefix, body_empty, incomplete] for c in lst - )) - min_l = min(lengths) - max_l = max(lengths) - avg_l = int(sum(lengths) / n_total) + ) + n_ok = n_total - len(blocker_ids) + min_l = min(lengths) + max_l = max(lengths) + avg_l = int(sum(lengths) / n_total) + p50 = sorted(lengths)[n_total // 2] + n_under = sum(1 for l in lengths if l < min_chars) + n_norm = sum(1 for l in lengths if min_chars <= l <= max_chars) + n_over = sum(1 for l in lengths if l > max_chars) - n_under = sum(1 for l in lengths if l < min_chars) - n_normal = sum(1 for l in lengths if min_chars <= l <= max_chars) - n_over = sum(1 for l in lengths if l > max_chars) + section_counts = Counter(c.get("sezione", "—") or "—" for c in chunks) - # ── Output ──────────────────────────────────────────────────────────────── + # ── Output statistiche ──────────────────────────────────────────────────── - print(f" Totale chunk: {n_total}") - print(f" ✅ OK: {n_ok}") + print(f" Totale: {n_total} | ✅ OK: {n_ok}") print() - print(f" Distribuzione lunghezze:") - print(f" Min: {min_l} char") - print(f" Max: {max_l} char") - print(f" Media: {avg_l} char") - print(f" < {min_chars} char (sotto MIN): {n_under}") - print(f" {min_chars}–{max_chars} char (ideale): {n_normal}") - print(f" > {max_chars} char (sopra MAX): {n_over}") + print(f" Lunghezze — min {min_l} p50 {p50} media {avg_l} max {max_l}") + print(f" Fasce — <{min_chars}: {n_under} | {min_chars}–{max_chars}: {n_norm} | >{max_chars}: {n_over}") + print() + print(" Istogramma:") + for line in _ascii_histogram(lengths, min_chars, max_chars): + print(line) + print() + print(" Top sezioni:") + for sezione, count in section_counts.most_common(5): + bar = "▪" * min(count, 35) + print(f" {bar} {count:>4} {sezione[:65]}") - has_errors = False + # ── Blockers ────────────────────────────────────────────────────────────── if empty_chunks: - has_errors = True print(f"\n 🔴 {len(empty_chunks)} chunk VUOTI:") for c in empty_chunks[:5]: print(f" [{c.get('chunk_id', '?')}]") @@ -172,95 +297,102 @@ def verify_stem(stem: str, project_root: Path, min_chars: int, max_chars: int) - print(f" ... e altri {len(empty_chunks) - 5}") if no_prefix: - has_errors = True print(f"\n 🔴 {len(no_prefix)} chunk SENZA PREFISSO DI CONTESTO:") - for c in no_prefix[:5]: - print(_fmt_chunk(c)) - if len(no_prefix) > 5: - print(f" ... e altri {len(no_prefix) - 5}") - print(f" → Causa probabile: header ### mancanti o malformati nel MD") + _print_list(no_prefix) + print(f" → Causa probabile: heading mancanti nel clean.md") - if too_short: - has_errors = True - print(f"\n 🟡 {len(too_short)} chunk SOTTO MIN_CHARS ({min_chars}):") - for c in too_short[:5]: - print(_fmt_chunk(c)) - if len(too_short) > 5: - print(f" ... e altri {len(too_short) - 5}") - print(f" → Soluzione: abbassa MIN_CHARS o revisiona il MD") + if malformed_prefix: + print(f"\n 🔴 {len(malformed_prefix)} chunk con PREFISSO MALFORMATO ([ senza ] o vuoto):") + _print_list(malformed_prefix) + print(f" → Causa probabile: heading con caratteri speciali nel clean.md") - if too_long: - has_errors = True - print(f"\n 🟡 {len(too_long)} chunk SOPRA MAX ({max_chars}):") - for c in too_long[:5]: - print(_fmt_chunk(c)) - if len(too_long) > 5: - print(f" ... e altri {len(too_long) - 5}") - print(f" → Causa probabile: frasi singole lunghe (liste/paragrafi non suddivisibili)") + if body_empty: + print(f"\n 🔴 {len(body_empty)} chunk con CORPO VUOTO (solo prefisso):") + _print_list(body_empty) + print(f" → Causa probabile: sezioni senza testo nel clean.md") if incomplete: - has_errors = True - print(f"\n 🔴 {len(incomplete)} chunk CHE FINISCONO SENZA PUNTEGGIATURA (frase spezzata):") + print(f"\n 🔴 {len(incomplete)} chunk con FRASE SPEZZATA:") for c in incomplete[:5]: last_line = c.get("text", "").rstrip().split("\n")[-1][-80:] print(f" [{c.get('chunk_id', '?')}] ...{last_line!r}") if len(incomplete) > 5: print(f" ... e altri {len(incomplete) - 5}") - print(f" → Soluzione: correggi le righe spezzate in conversione/{stem}/clean.md") + print(f" → Soluzione: python chunks/fix_chunks.py --stem {stem}") + + # ── Warnings ────────────────────────────────────────────────────────────── + + if too_short: + print(f"\n 🟡 {len(too_short)} chunk SOTTO MIN_CHARS ({min_chars}):") + _print_list(too_short) + + if too_long: + print(f"\n 🟡 {len(too_long)} chunk SOPRA MAX ({max_chars}):") + _print_list(too_long) + print(f" → Causa: frasi non suddivisibili o blocchi atomici (tabelle/liste)") if incomplete_math: - has_errors = True - print(f"\n 🟡 {len(incomplete_math)} chunk MATEMATICI SENZA PUNTEGGIATURA (formula/espressione):") + print(f"\n 🟡 {len(incomplete_math)} chunk MATEMATICI senza punteggiatura finale:") for c in incomplete_math[:3]: last_line = c.get("text", "").rstrip().split("\n")[-1][-80:] print(f" [{c.get('chunk_id', '?')}] ...{last_line!r}") if len(incomplete_math) > 3: print(f" ... e altri {len(incomplete_math) - 3}") - print(f" → Le formule non finiscono con punteggiatura — avviso non bloccante") - # ── Costruisci e salva report.json ──────────────────────────────────────── + if broken_tables: + print(f"\n 🟡 {len(broken_tables)} TABELLE senza riga separatore |---|:") + _print_list(broken_tables, limit=3) + print(f" → Le tabelle potrebbero non renderizzarsi nel retrieval") - blockers = empty_chunks + no_prefix + incomplete - warnings = too_short + too_long + incomplete_math + if duplicates: + print(f"\n 🟡 {len(duplicates)} DUPLICATI (corpo identico):") + for e in duplicates[:5]: + print(f" [{e['chunk_id']}] ≡ [{e['duplicate_of']}] «{e['last_text'][:60]}»") + if len(duplicates) > 5: + print(f" ... e altri {len(duplicates) - 5}") + print(f" → Causa probabile: fix_chunks merge multipli o sezioni ripetute") - def _chunk_entry(c: dict) -> dict: - return { - "chunk_id": c.get("chunk_id", ""), - "sezione": c.get("sezione", ""), - "titolo": c.get("titolo", ""), - "n_chars": c.get("n_chars", 0), - "last_text": c.get("text", "").rstrip().split("\n")[-1][-120:], - } + # ── Report.json ─────────────────────────────────────────────────────────── - verdict = "ok" if not blockers else "blocked" - if not blockers and warnings: - verdict = "warnings_only" + blockers = empty_chunks + no_prefix + malformed_prefix + body_empty + incomplete + warnings = too_short + too_long + incomplete_math + broken_tables + + verdict = "blocked" if blockers else ("warnings_only" if (warnings or duplicates) else "ok") report = { "stem": stem, "verdict": verdict, "stats": { - "total": n_total, - "ok": n_ok, + "total": n_total, + "ok": n_ok, "min_chars": min_l, "max_chars": max_l, "avg_chars": avg_l, + "p50_chars": p50, + "under_min": n_under, + "in_range": n_norm, + "over_max": n_over, + "sections": [{"sezione": s, "n_chunks": n} + for s, n in section_counts.most_common()], }, "thresholds": { - "min_chars": min_chars, - "max_chars": max_chars, - "target_chars": cfg.TARGET_CHARS, - "chunk_tolerance": cfg.CHUNK_TOLERANCE, + "min_chars": min_chars, + "max_chars": max_chars, + "target_chars": cfg.MAX_CHARS, }, "blockers": { - "empty": [_chunk_entry(c) for c in empty_chunks], - "no_prefix": [_chunk_entry(c) for c in no_prefix], - "incomplete": [_chunk_entry(c) for c in incomplete], + "empty": [_chunk_entry(c) for c in empty_chunks], + "no_prefix": [_chunk_entry(c) for c in no_prefix], + "malformed_prefix": [_chunk_entry(c) for c in malformed_prefix], + "body_empty": [_chunk_entry(c) for c in body_empty], + "incomplete": [_chunk_entry(c) for c in incomplete], }, "warnings": { "too_short": [_chunk_entry(c) for c in too_short], "too_long": [_chunk_entry(c) for c in too_long], "incomplete_math": [_chunk_entry(c) for c in incomplete_math], + "broken_tables": [_chunk_entry(c) for c in broken_tables], + "duplicate_bodies": duplicates, }, } @@ -269,53 +401,42 @@ def verify_stem(stem: str, project_root: Path, min_chars: int, max_chars: int) - (out_dir / "report.json").write_text( json.dumps(report, ensure_ascii=False, indent=2), encoding="utf-8" ) - print(f"\n report.json salvato in chunks/{stem}/") + print(f"\n report.json → chunks/{stem}/") # ── Prossimi passi ──────────────────────────────────────────────────────── print(f"\n {'─' * 50}") - print(f" PROSSIMI PASSI") + print(f" Verdict: {verdict.upper()}") print(f" {'─' * 50}") - if not blockers and not warnings: + if verdict == "ok": print(f" ✅ Tutto OK — procedi alla vettorizzazione:") print(f" python ingestion/ingest.py --stem {stem}") - elif not blockers: - print(f" 🟡 Solo avvisi minori — puoi procedere alla vettorizzazione:") + elif verdict == "warnings_only": + print(f" 🟡 Solo avvisi — puoi procedere alla vettorizzazione:") print(f" python ingestion/ingest.py --stem {stem}") - print() - print(f" Oppure, per ottimizzare prima:") - if too_short: - pct = int(len(too_short) / n_total * 100) - print(f" • {len(too_short)} chunk corti ({pct}% del totale)") - if too_long: - pct = int(len(too_long) / n_total * 100) - print(f" • {len(too_long)} chunk lunghi ({pct}% del totale)") if too_short or too_long: - print(f" → Esegui: python chunks/fix_chunks.py --stem {stem} --dry-run") - print(f" poi: python chunks/fix_chunks.py --stem {stem}") - print(f" poi: python chunks/verify_chunks.py --stem {stem}") + print() + print(f" Per ottimizzare prima:") + print(f" python chunks/fix_chunks.py --stem {stem} --dry-run") + print(f" python chunks/fix_chunks.py --stem {stem}") else: - print(f" 🔴 Problemi bloccanti — correggi prima di procedere:") - print() - if empty_chunks: - print(f" • {len(empty_chunks)} chunk vuoti") - print(f" → Controlla conversione/{stem}/clean.md per sezioni prive di testo") - if no_prefix: - print(f" • {len(no_prefix)} chunk senza prefisso di contesto") - print(f" → Controlla che gli header ### siano corretti in conversione/{stem}/clean.md") + print(f" 🔴 {len(blockers)} problemi bloccanti — correggi prima di procedere:") + if empty_chunks or body_empty: + print(f" • chunk vuoti/senza corpo → controlla sources/{stem}/auto/{stem}_clean.md") + if no_prefix or malformed_prefix: + print(f" • prefisso mancante/malformato → controlla gli heading in {stem}_clean.md") if incomplete: - print(f" • {len(incomplete)} chunk con frase spezzata") - print(f" → Esegui: python chunks/fix_chunks.py --stem {stem}") + print(f" • frasi spezzate → python chunks/fix_chunks.py --stem {stem}") print() - print(f" Dopo le correzioni, riesegui nell'ordine:") + print(f" Dopo le correzioni:") print(f" python chunks/chunker.py --stem {stem} --force") print(f" python chunks/verify_chunks.py --stem {stem}") - print() if warnings: - print(f" 🟡 Hai anche {len(warnings)} avvisi minori — affrontali dopo aver risolto i 🔴.") + print() + print(f" 🟡 Hai anche {len(warnings)} avvisi — affrontali dopo aver risolto i 🔴.") return not blockers @@ -327,15 +448,13 @@ if __name__ == "__main__": parser = argparse.ArgumentParser(description="Verifica chunk") parser.add_argument("--stem", help="Nome del documento (sottocartella di chunks/)") - _min_def = int(cfg.TARGET_CHARS * (1 - cfg.CHUNK_TOLERANCE)) - _max_def = int(cfg.TARGET_CHARS * (1 + cfg.CHUNK_TOLERANCE)) parser.add_argument( - "--min", type=int, default=_min_def, - help=f"Soglia minima caratteri (default: TARGET×(1-TOL) = {_min_def})" + "--min", type=int, default=cfg.MIN_CHARS, + help=f"Soglia minima caratteri (default: {cfg.MIN_CHARS})" ) parser.add_argument( - "--max", type=int, default=_max_def, - help=f"Soglia massima caratteri (default: TARGET×(1+TOL) = {_max_def})" + "--max", type=int, default=cfg.MAX_CHARS, + help=f"Soglia massima caratteri (default: {cfg.MAX_CHARS})" ) args = parser.parse_args() @@ -358,5 +477,5 @@ if __name__ == "__main__": ok = sum(results) total = len(results) - print(f"\n{'✅' if all(results) else '⚠️ '} {ok}/{total} documenti senza problemi") + print(f"\n{'✅' if all(results) else '⚠️ '} {ok}/{total} documenti senza problemi bloccanti") sys.exit(0 if all(results) else 1)