Files
rag-from-scratch/chunks/verify_chunks.py
T
davide 85ae95bf8d feat(verify): nuovi check, istogramma ASCII e sezioni per documento
Nuovi check bloccanti:
  - prefisso malformato ([ senza ] o contenuto vuoto)
  - corpo vuoto dopo prefisso valido

Nuovi warning:
  - tabelle Markdown senza riga separatore |---|
  - chunk con corpo identico (duplicati da overlap/merge)

Output migliorato:
  - istogramma ASCII con marcatori ← MIN / ← MAX
  - top 5 sezioni per volume di chunk
  - mediana (p50) nelle statistiche di lunghezza

report.json arricchito: p50_chars, sections, malformed_prefix,
body_empty, broken_tables, duplicate_bodies.

PUNCT_END esteso con \d[\d.,/]*$ per numeri, anni, riferimenti normativi.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-20 16:07:51 +02:00

482 lines
19 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
Verifica chunk
Analizza chunks/<stem>/chunks.json e segnala ogni anomalia che potrebbe
degradare la qualità del retrieval. Non modifica nulla.
Input: chunks/<stem>/chunks.json
Output: report a schermo + chunks/<stem>/report.json + exit code (0 = OK, 1 = problemi)
Uso:
python chunks/verify_chunks.py --stem documento
python chunks/verify_chunks.py # tutti i documenti in chunks/
python chunks/verify_chunks.py --min 200 --max 800
"""
import argparse
import json
import re
import sys
from collections import Counter
from pathlib import Path
_HERE = Path(__file__).resolve().parent
if str(_HERE) not in sys.path:
sys.path.insert(0, str(_HERE))
import config as cfg
# ─── Thresholds ───────────────────────────────────────────────────────────────
# Default length bounds, read from the project-local ``config`` module.
MIN_CHARS = cfg.MIN_CHARS
MAX_CHARS = cfg.MAX_CHARS
# Chunk endings that ends_incomplete() accepts as a complete tail.
PUNCT_END = re.compile(
    r"[.!?\xbb)\]'\"“”‘—–…]$"
    r"|/$"            # URL ending with /
    r"|\|$"           # Markdown table row
    r"|;$"            # end of a legal clause
    r"|:$"            # introduction to a list or formula
    r"|\d[\d.,/]*$"   # number, year, version, regulatory reference
)
# Eight or more hexadecimal digits at the very end of the text.
_HEX_END = re.compile(r"[0-9a-fA-F]{8,}$")
# A URL (http://, https:// or www.) followed by at most three more
# whitespace-separated tokens before the end of the text.
_URL_TAIL = re.compile(r"(https?://|www\.)\S+(\s+\S+){0,3}$")
# Single mathematical symbols; is_math_incomplete() counts their occurrences.
_MATH_SYMS = re.compile(r"[∈∑≤≥≠∀∃∫√∞∂±×÷→←↔⊂⊃⊆⊇∩∪·°]")
# A Roman numeral, as a whole word, at the very end of the text.
_ROMAN_END = re.compile(r"\b(I{1,3}|IV|VI{0,3}|IX|XI{0,2}|XIV|XV|XVI{0,2}|XIX|XX{0,2})$")
# A line that starts and ends with | and otherwise holds only whitespace,
# dashes, pipes and colons, i.e. a Markdown table separator row.
_TABLE_SEP = re.compile(r"^\s*\|[\s\-|:]+\|\s*$")
def _load_thresholds(stem_dir: Path) -> tuple[int, int]:
    """Return the ``(min_chars, max_chars)`` thresholds for one document.

    When ``meta.json`` exists in *stem_dir*, its ``min_chars`` and
    ``max_chars`` entries are used; an entry that is absent falls back to the
    corresponding module default instead of raising ``KeyError``. Without a
    ``meta.json`` the module defaults are returned.

    Raises:
        json.JSONDecodeError: if ``meta.json`` exists but is not valid JSON.
    """
    meta_path = stem_dir / "meta.json"
    if not meta_path.exists():
        return MIN_CHARS, MAX_CHARS
    meta = json.loads(meta_path.read_text(encoding="utf-8"))
    return meta.get("min_chars", MIN_CHARS), meta.get("max_chars", MAX_CHARS)
def _strip_prefix(text: str) -> str:
text = text.lstrip()
if text.startswith("["):
end = text.find("]")
if end != -1:
return text[end + 1:].lstrip("\n")
return text
# ─── Checks ───────────────────────────────────────────────────────────────────
def is_empty(chunk: dict) -> bool:
    """Return True when the chunk has no ``text`` or only whitespace in it."""
    content = chunk.get("text", "")
    return content.strip() == ""
def has_prefix(chunk: dict) -> bool:
    """Return True when the chunk text, ignoring leading whitespace, starts with ``[``."""
    content = chunk.get("text", "")
    return content.lstrip()[:1] == "["
def is_prefix_malformed(chunk: dict) -> bool:
    """Return True when the text starts with ``[`` but the prefix is broken.

    The prefix is broken when the first line has no closing ``]`` or when
    nothing but whitespace sits between the brackets. Text that does not start
    with ``[`` is never reported here.
    """
    stripped = chunk.get("text", "").lstrip()
    if stripped[:1] != "[":
        return False
    header, _, _ = stripped.partition("\n")
    # Split what follows the opening bracket at the first closing bracket.
    label, closer, _ = header[1:].partition("]")
    if not closer:
        return True
    return not label.strip()
def is_body_empty(chunk: dict) -> bool:
    """Return True when a closed ``[...]`` prefix is followed by no body text.

    Text that does not start with ``[``, or that has no ``]`` at all, yields
    False.
    """
    stripped = chunk.get("text", "").lstrip()
    if not stripped.startswith("["):
        return False
    _, closer, remainder = stripped.partition("]")
    return bool(closer) and not remainder.strip()
def is_too_short(chunk: dict, min_chars: int) -> bool:
    """Return True when the chunk's ``n_chars`` (0 if absent) is below *min_chars*."""
    length = chunk.get("n_chars", 0)
    return min_chars > length
def is_too_long(chunk: dict, max_chars: int) -> bool:
    """Return True when the chunk's ``n_chars`` (0 if absent) exceeds *max_chars*."""
    length = chunk.get("n_chars", 0)
    return max_chars < length
def ends_incomplete(chunk: dict) -> bool:
    """Return True when the chunk text does not end in a recognised way.

    Trailing whitespace and trailing Markdown emphasis markers (``_`` / ``*``)
    are ignored. The remaining tail counts as complete when it matches
    ``PUNCT_END``, ``_HEX_END`` or ``_ROMAN_END``, or when its last 200
    characters match ``_URL_TAIL``. Empty text is never reported.
    """
    stripped = chunk.get("text", "").rstrip()
    tail = re.sub(r"[_*]+$", "", stripped).rstrip() if stripped else ""
    if not tail:
        return False
    complete = (
        PUNCT_END.search(tail)
        or _HEX_END.search(tail)
        or _ROMAN_END.search(tail)
        # Only the final 200 characters are scanned for a trailing URL.
        or _URL_TAIL.search(tail[-200:])
    )
    return complete is None
def is_math_incomplete(chunk: dict) -> bool:
    """Return True for an incomplete-ending chunk that is rich in math symbols.

    The chunk must be flagged by ``ends_incomplete`` and contain at least
    ``cfg.MATH_SYMS_MIN`` characters matched by ``_MATH_SYMS``.
    """
    if not ends_incomplete(chunk):
        return False
    symbol_count = len(_MATH_SYMS.findall(chunk.get("text", "")))
    return symbol_count >= cfg.MATH_SYMS_MIN
def is_table_broken(chunk: dict) -> bool:
    """Return True for a Markdown table (>= 2 rows starting with |) lacking a |---| separator row."""
    rows = [
        row for row in chunk.get("text", "").splitlines()
        if row.strip().startswith("|")
    ]
    if len(rows) < 2:
        return False
    for row in rows:
        if _TABLE_SEP.match(row):
            return False
    return True
def find_duplicate_bodies(chunks: list[dict]) -> list[dict]:
    """Return one report entry per chunk whose body repeats an earlier chunk.

    The body is the chunk text with the ``[...]`` prefix removed and
    surrounding whitespace stripped. Bodies shorter than 30 characters are
    ignored. The first chunk with a given body is the reference; every later
    chunk with the same body produces an entry whose ``duplicate_of`` is the
    reference's ``chunk_id``.

    A chunk without ``chunk_id`` is reported with an empty id instead of
    raising ``KeyError``, matching the other accessors in this module.
    """
    first_seen: dict[str, str] = {}
    dupes = []
    for c in chunks:
        body = _strip_prefix(c.get("text", "")).strip()
        if len(body) < 30:
            continue
        cid = c.get("chunk_id", "")
        if body not in first_seen:
            first_seen[body] = cid
            continue
        dupes.append({
            "chunk_id": cid,
            "duplicate_of": first_seen[body],
            "sezione": c.get("sezione", ""),
            "titolo": c.get("titolo", ""),
            "n_chars": c.get("n_chars", 0),
            # Holds the first 120 characters of the body, despite the key name.
            "last_text": body[:120],
        })
    return dupes
# ─── Istogramma ───────────────────────────────────────────────────────────────
def _ascii_histogram(lengths: list[int], min_t: int, max_t: int,
n_bins: int = 10, bar_width: int = 28) -> list[str]:
if not lengths:
return []
lo, hi = min(lengths), max(lengths)
if lo == hi:
return [f" {lo:>5}{hi:<5}{'' * bar_width}{len(lengths)}"]
step = (hi - lo) / n_bins
bins = [0] * n_bins
for l in lengths:
idx = min(int((l - lo) / step), n_bins - 1)
bins[idx] += 1
max_count = max(bins) or 1
lines = []
for i, count in enumerate(bins):
lo_b = int(lo + i * step)
hi_b = int(lo + (i + 1) * step)
bar = "" * round(count / max_count * bar_width)
note = ""
if lo_b <= min_t < hi_b:
note = " ← MIN"
elif lo_b <= max_t < hi_b:
note = " ← MAX"
lines.append(f" {lo_b:>5}{hi_b:<5}{bar:<{bar_width}}{count}{note}")
return lines
# ─── Helpers output ───────────────────────────────────────────────────────────
def _fmt_chunk(c: dict) -> str:
cid = c.get("chunk_id", "?")
n = c.get("n_chars", 0)
preview = c.get("text", "")[:60].replace("\n", " ")
return f" [{cid}] ({n} char) «{preview}»"
def _chunk_entry(c: dict) -> dict:
return {
"chunk_id": c.get("chunk_id", ""),
"sezione": c.get("sezione", ""),
"titolo": c.get("titolo", ""),
"n_chars": c.get("n_chars", 0),
"last_text": c.get("text", "").rstrip().split("\n")[-1][-120:],
}
def _print_list(items: list[dict], limit: int = 5) -> None:
    """Print the first *limit* chunks, then a line counting the ones left out."""
    shown = items[:limit]
    for entry in shown:
        print(_fmt_chunk(entry))
    remaining = len(items) - limit
    if remaining > 0:
        print(f" ... e altri {remaining}")
# ─── Core ─────────────────────────────────────────────────────────────────────
def verify_stem(stem: str, project_root: Path, min_chars: int, max_chars: int) -> bool:
    """Verify the chunks of one document, print a report and write ``report.json``.

    Args:
        stem: Document name, i.e. the sub-folder of ``chunks/``.
        project_root: Directory that contains the ``chunks/`` folder.
        min_chars: Lower length threshold, used when the document has no
            ``meta.json``.
        max_chars: Upper length threshold, used when the document has no
            ``meta.json``.

    Returns:
        True when no blocking problem was found. False when there are
        blockers, or when ``chunks.json`` is missing or empty (no report is
        written in those two cases).
    """
    stem_dir = project_root / "chunks" / stem
    chunks_path = stem_dir / "chunks.json"
    # Thresholds stored in the document's meta.json take precedence; without
    # that file the caller's values (e.g. --min / --max) are kept.
    if (stem_dir / "meta.json").exists():
        min_chars, max_chars = _load_thresholds(stem_dir)
    print(f"\nDocumento: {stem}")
    if not chunks_path.exists():
        print(f" ✗ chunks/{stem}/chunks.json non trovato")
        print(f" Esegui prima: python chunks/chunker.py --stem {stem}")
        return False
    chunks: list[dict] = json.loads(chunks_path.read_text(encoding="utf-8"))
    if not chunks:
        print(f" ✗ chunks.json è vuoto")
        return False

    # ── Collect problems ──────────────────────────────────────────────────────
    # The prefix checks are layered so that a chunk lands in at most one of
    # empty / no_prefix / malformed_prefix / body_empty.
    empty_chunks = [c for c in chunks if is_empty(c)]
    no_prefix = [c for c in chunks if not is_empty(c) and not has_prefix(c)]
    malformed_prefix = [c for c in chunks
                        if not is_empty(c) and has_prefix(c) and is_prefix_malformed(c)]
    body_empty = [c for c in chunks
                  if not is_empty(c) and has_prefix(c)
                  and not is_prefix_malformed(c) and is_body_empty(c)]
    too_short = [c for c in chunks if is_too_short(c, min_chars)]
    too_long = [c for c in chunks if is_too_long(c, max_chars)]
    # Incomplete endings are blockers, except in math-heavy chunks, where they
    # are downgraded to warnings.
    _incomplete_all = [c for c in chunks if not is_empty(c) and ends_incomplete(c)]
    incomplete_math = [c for c in _incomplete_all if is_math_incomplete(c)]
    incomplete = [c for c in _incomplete_all if not is_math_incomplete(c)]
    broken_tables = [c for c in chunks if is_table_broken(c)]
    duplicates = find_duplicate_bodies(chunks)

    # ── Statistics ────────────────────────────────────────────────────────────
    lengths = [c.get("n_chars", 0) for c in chunks]
    n_total = len(chunks)
    # Count distinct chunk objects: this works even when chunk_id is missing
    # or repeated, and counts a chunk with several blockers only once.
    blocker_ids = {
        id(c)
        for lst in [empty_chunks, no_prefix, malformed_prefix, body_empty, incomplete]
        for c in lst
    }
    n_ok = n_total - len(blocker_ids)
    min_l = min(lengths)
    max_l = max(lengths)
    avg_l = int(sum(lengths) / n_total)
    # Upper median: for an even count this is the higher of the two middle values.
    p50 = sorted(lengths)[n_total // 2]
    n_under = sum(1 for n in lengths if n < min_chars)
    n_norm = sum(1 for n in lengths if min_chars <= n <= max_chars)
    n_over = sum(1 for n in lengths if n > max_chars)
    section_counts = Counter(c.get("sezione", "") or "" for c in chunks)

    # ── Statistics output ─────────────────────────────────────────────────────
    print(f" Totale: {n_total} | ✅ OK: {n_ok}")
    print()
    print(f" Lunghezze — min {min_l} p50 {p50} media {avg_l} max {max_l}")
    print(f" Fasce — <{min_chars}: {n_under} | {min_chars}–{max_chars}: {n_norm} | >{max_chars}: {n_over}")
    print()
    print(" Istogramma:")
    for line in _ascii_histogram(lengths, min_chars, max_chars):
        print(line)
    print()
    print(" Top sezioni:")
    for sezione, count in section_counts.most_common(5):
        # One glyph per chunk, capped at 35 so the line stays readable.
        bar = "█" * min(count, 35)
        print(f" {bar} {count:>4} {sezione[:65]}")

    # ── Blockers ──────────────────────────────────────────────────────────────
    if empty_chunks:
        print(f"\n 🔴 {len(empty_chunks)} chunk VUOTI:")
        for c in empty_chunks[:5]:
            print(f" [{c.get('chunk_id', '?')}]")
        if len(empty_chunks) > 5:
            print(f" ... e altri {len(empty_chunks) - 5}")
    if no_prefix:
        print(f"\n 🔴 {len(no_prefix)} chunk SENZA PREFISSO DI CONTESTO:")
        _print_list(no_prefix)
        print(f" → Causa probabile: heading mancanti nel clean.md")
    if malformed_prefix:
        print(f"\n 🔴 {len(malformed_prefix)} chunk con PREFISSO MALFORMATO ([ senza ] o vuoto):")
        _print_list(malformed_prefix)
        print(f" → Causa probabile: heading con caratteri speciali nel clean.md")
    if body_empty:
        print(f"\n 🔴 {len(body_empty)} chunk con CORPO VUOTO (solo prefisso):")
        _print_list(body_empty)
        print(f" → Causa probabile: sezioni senza testo nel clean.md")
    if incomplete:
        print(f"\n 🔴 {len(incomplete)} chunk con FRASE SPEZZATA:")
        for c in incomplete[:5]:
            last_line = c.get("text", "").rstrip().split("\n")[-1][-80:]
            print(f" [{c.get('chunk_id', '?')}] ...{last_line!r}")
        if len(incomplete) > 5:
            print(f" ... e altri {len(incomplete) - 5}")
        print(f" → Soluzione: python chunks/fix_chunks.py --stem {stem}")

    # ── Warnings ──────────────────────────────────────────────────────────────
    if too_short:
        print(f"\n 🟡 {len(too_short)} chunk SOTTO MIN_CHARS ({min_chars}):")
        _print_list(too_short)
    if too_long:
        print(f"\n 🟡 {len(too_long)} chunk SOPRA MAX ({max_chars}):")
        _print_list(too_long)
        print(f" → Causa: frasi non suddivisibili o blocchi atomici (tabelle/liste)")
    if incomplete_math:
        print(f"\n 🟡 {len(incomplete_math)} chunk MATEMATICI senza punteggiatura finale:")
        for c in incomplete_math[:3]:
            last_line = c.get("text", "").rstrip().split("\n")[-1][-80:]
            print(f" [{c.get('chunk_id', '?')}] ...{last_line!r}")
        if len(incomplete_math) > 3:
            print(f" ... e altri {len(incomplete_math) - 3}")
    if broken_tables:
        print(f"\n 🟡 {len(broken_tables)} TABELLE senza riga separatore |---|:")
        _print_list(broken_tables, limit=3)
        print(f" → Le tabelle potrebbero non renderizzarsi nel retrieval")
    if duplicates:
        print(f"\n 🟡 {len(duplicates)} DUPLICATI (corpo identico):")
        for e in duplicates[:5]:
            print(f" [{e['chunk_id']}] ≡ [{e['duplicate_of']}] «{e['last_text'][:60]}»")
        if len(duplicates) > 5:
            print(f" ... e altri {len(duplicates) - 5}")
        print(f" → Causa probabile: fix_chunks merge multipli o sezioni ripetute")

    # ── report.json ───────────────────────────────────────────────────────────
    blockers = empty_chunks + no_prefix + malformed_prefix + body_empty + incomplete
    warnings = too_short + too_long + incomplete_math + broken_tables
    # Duplicates are kept apart from `warnings` (their entries have a different
    # shape) but still prevent a clean "ok" verdict.
    verdict = "blocked" if blockers else ("warnings_only" if (warnings or duplicates) else "ok")
    report = {
        "stem": stem,
        "verdict": verdict,
        "stats": {
            "total": n_total,
            "ok": n_ok,
            "min_chars": min_l,
            "max_chars": max_l,
            "avg_chars": avg_l,
            "p50_chars": p50,
            "under_min": n_under,
            "in_range": n_norm,
            "over_max": n_over,
            "sections": [{"sezione": s, "n_chunks": n}
                         for s, n in section_counts.most_common()],
        },
        "thresholds": {
            "min_chars": min_chars,
            "max_chars": max_chars,
            "target_chars": cfg.MAX_CHARS,
        },
        "blockers": {
            "empty": [_chunk_entry(c) for c in empty_chunks],
            "no_prefix": [_chunk_entry(c) for c in no_prefix],
            "malformed_prefix": [_chunk_entry(c) for c in malformed_prefix],
            "body_empty": [_chunk_entry(c) for c in body_empty],
            "incomplete": [_chunk_entry(c) for c in incomplete],
        },
        "warnings": {
            "too_short": [_chunk_entry(c) for c in too_short],
            "too_long": [_chunk_entry(c) for c in too_long],
            "incomplete_math": [_chunk_entry(c) for c in incomplete_math],
            "broken_tables": [_chunk_entry(c) for c in broken_tables],
            "duplicate_bodies": duplicates,
        },
    }
    stem_dir.mkdir(parents=True, exist_ok=True)
    (stem_dir / "report.json").write_text(
        json.dumps(report, ensure_ascii=False, indent=2), encoding="utf-8"
    )
    print(f"\n report.json → chunks/{stem}/")

    # ── Next steps ────────────────────────────────────────────────────────────
    rule = "─" * 50
    print(f"\n {rule}")
    print(f" Verdict: {verdict.upper()}")
    print(f" {rule}")
    if verdict == "ok":
        print(f" ✅ Tutto OK — procedi alla vettorizzazione:")
        print(f" python ingestion/ingest.py --stem {stem}")
    elif verdict == "warnings_only":
        print(f" 🟡 Solo avvisi — puoi procedere alla vettorizzazione:")
        print(f" python ingestion/ingest.py --stem {stem}")
        if too_short or too_long:
            print()
            print(f" Per ottimizzare prima:")
            print(f" python chunks/fix_chunks.py --stem {stem} --dry-run")
            print(f" python chunks/fix_chunks.py --stem {stem}")
    else:
        print(f" 🔴 {len(blockers)} problemi bloccanti — correggi prima di procedere:")
        if empty_chunks or body_empty:
            print(f" • chunk vuoti/senza corpo → controlla sources/{stem}/auto/{stem}_clean.md")
        if no_prefix or malformed_prefix:
            print(f" • prefisso mancante/malformato → controlla gli heading in {stem}_clean.md")
        if incomplete:
            print(f" • frasi spezzate → python chunks/fix_chunks.py --stem {stem}")
        print()
        print(f" Dopo le correzioni:")
        print(f" python chunks/chunker.py --stem {stem} --force")
        print(f" python chunks/verify_chunks.py --stem {stem}")
        if warnings:
            print()
            print(f" 🟡 Hai anche {len(warnings)} avvisi — affrontali dopo aver risolto i 🔴.")
    return not blockers
# ─── Entry point ──────────────────────────────────────────────────────────────
if __name__ == "__main__":
    # This script lives in <project_root>/chunks/, so the root is one level up.
    project_root = Path(__file__).parent.parent
    parser = argparse.ArgumentParser(description="Verifica chunk")
    parser.add_argument("--stem", help="Nome del documento (sottocartella di chunks/)")
    parser.add_argument(
        "--min", type=int, default=cfg.MIN_CHARS,
        help=f"Soglia minima caratteri (default: {cfg.MIN_CHARS})"
    )
    parser.add_argument(
        "--max", type=int, default=cfg.MAX_CHARS,
        help=f"Soglia massima caratteri (default: {cfg.MAX_CHARS})"
    )
    args = parser.parse_args()
    if args.stem:
        stems = [args.stem]
    else:
        # No --stem: verify every sub-folder of chunks/ that holds a chunks.json.
        chunks_dir = project_root / "chunks"
        if not chunks_dir.exists():
            print(f"Errore: cartella chunks/ non trovata in {project_root}")
            sys.exit(1)
        stems = sorted(
            p.name for p in chunks_dir.iterdir()
            if p.is_dir() and (p / "chunks.json").exists()
        )
        if not stems:
            print("Errore: nessun chunks.json trovato in chunks/")
            sys.exit(1)
    # A list, not a generator: every document is verified even after a failure.
    results = [verify_stem(s, project_root, args.min, args.max) for s in stems]
    ok = sum(results)
    total = len(results)
    all_ok = all(results)
    print(f"\n{'✅' if all_ok else '⚠️ '} {ok}/{total} documenti senza problemi bloccanti")
    # Exit code 0 = no blocking problem in any document, 1 otherwise.
    sys.exit(0 if all_ok else 1)