Files
rag-from-scratch/chunks/verify_chunks.py
T
davide fe0ecc24ad feat(chunks): sentence-boundary flush, math incomplete detection, structure profile export
- chunker: estrai _flush_chunk() con estensione al confine di frase (max 120%)
- verify: rileva chunk matematici incompleti come warning, gestisci hash hex e URL
- conversione: esporta structure_profile.json nell'output dir
2026-04-20 12:28:03 +02:00

335 lines
14 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
Verifica chunk
Analizza chunks/<stem>/chunks.json e segnala ogni anomalia che potrebbe
degradare la qualità del retrieval. Non modifica nulla.
Input: chunks/<stem>/chunks.json
Output: report a schermo + chunks/<stem>/report.json + exit code (0 = OK, 1 = problemi)
Uso:
python chunks/verify_chunks.py --stem documento
python chunks/verify_chunks.py # tutti i documenti in chunks/
python chunks/verify_chunks.py --min 200 --max 800
"""
import argparse
import json
import re
import sys
from pathlib import Path
# ─── Soglie ───────────────────────────────────────────────────────────────────
MIN_CHARS = 200
MAX_CHARS = 800
PUNCT_END = re.compile(
r"[.!?»)\]'\u2019\"\u201c\u201d\u2018\u2014\u2013\u2026]$"
r"|/$" # URL che finisce con /
r"|\|$" # riga di tabella Markdown
r"|:$" # introduzione a lista o formula
)
_HEX_END = re.compile(r"[0-9a-fA-F]{8,}$")
_URL_TAIL = re.compile(r"https?://\S+(\s+\S+){0,3}$") # URL con fino a 3 token extra
_MATH_SYMS = re.compile(r"[∈∑≤≥≠∀∃∫√∞∂±×÷→←↔⊂⊃⊆⊇∩∪·°]")
# ─── Checks ───────────────────────────────────────────────────────────────────
def has_prefix(chunk: dict) -> bool:
return chunk.get("text", "").lstrip().startswith("[")
def is_empty(chunk: dict) -> bool:
return not chunk.get("text", "").strip()
def is_too_short(chunk: dict, min_chars: int) -> bool:
return chunk.get("n_chars", 0) < min_chars
def is_too_long(chunk: dict, max_chars: int) -> bool:
return chunk.get("n_chars", 0) > max_chars * 1.5
def ends_incomplete(chunk: dict) -> bool:
text = chunk.get("text", "").rstrip()
if not text:
return False
text_check = re.sub(r"[_*]+$", "", text).rstrip()
if not text_check:
return False
if PUNCT_END.search(text_check):
return False
if _HEX_END.search(text_check): # hash SHA / codice hex
return False
if _URL_TAIL.search(text_check[-200:]): # URL (con eventuale path dopo spazio)
return False
return True
def is_math_incomplete(chunk: dict) -> bool:
"""Incompleto ma in contesto matematico — degrada a warning invece di blocker."""
return ends_incomplete(chunk) and len(_MATH_SYMS.findall(chunk.get("text", ""))) >= 3
# ─── Report ───────────────────────────────────────────────────────────────────
def _fmt_chunk(c: dict) -> str:
cid = c.get("chunk_id", "?")
n = c.get("n_chars", 0)
preview = c.get("text", "")[:60].replace("\n", " ")
return f" [{cid}] ({n} char) «{preview}»"
def verify_stem(stem: str, project_root: Path, min_chars: int, max_chars: int) -> bool:
chunks_path = project_root / "chunks" / stem / "chunks.json"
print(f"\nDocumento: {stem}")
if not chunks_path.exists():
print(f" ✗ chunks/{stem}/chunks.json non trovato")
print(f" Esegui prima: python chunks/chunker.py --stem {stem}")
return False
chunks: list[dict] = json.loads(chunks_path.read_text(encoding="utf-8"))
if not chunks:
print(f" ✗ chunks.json è vuoto")
return False
# ── Raccogli problemi ──────────────────────────────────────────────────────
empty_chunks = [c for c in chunks if is_empty(c)]
no_prefix = [c for c in chunks if not is_empty(c) and not has_prefix(c)]
too_short = [c for c in chunks if is_too_short(c, min_chars)]
too_long = [c for c in chunks if is_too_long(c, max_chars)]
_incomplete_all = [c for c in chunks if not is_empty(c) and ends_incomplete(c)]
incomplete_math = [c for c in _incomplete_all if is_math_incomplete(c)]
incomplete = [c for c in _incomplete_all if not is_math_incomplete(c)]
# ── Statistiche ───────────────────────────────────────────────────────────
lengths = [c.get("n_chars", 0) for c in chunks]
n_total = len(chunks)
n_ok = n_total - len(set(
c["chunk_id"]
for lst in [empty_chunks, no_prefix, too_short, too_long, incomplete]
for c in lst
))
min_l = min(lengths)
max_l = max(lengths)
avg_l = int(sum(lengths) / n_total)
n_under = sum(1 for l in lengths if l < min_chars)
n_normal = sum(1 for l in lengths if min_chars <= l <= max_chars)
n_over = sum(1 for l in lengths if l > max_chars)
# ── Output ────────────────────────────────────────────────────────────────
print(f" Totale chunk: {n_total}")
print(f" ✅ OK: {n_ok}")
print()
print(f" Distribuzione lunghezze:")
print(f" Min: {min_l} char")
print(f" Max: {max_l} char")
print(f" Media: {avg_l} char")
print(f" < {min_chars} char (sotto MIN): {n_under}")
print(f" {min_chars}{max_chars} char (ideale): {n_normal}")
print(f" > {max_chars} char (sopra MAX): {n_over}")
has_errors = False
if empty_chunks:
has_errors = True
print(f"\n 🔴 {len(empty_chunks)} chunk VUOTI:")
for c in empty_chunks[:5]:
print(f" [{c.get('chunk_id', '?')}]")
if len(empty_chunks) > 5:
print(f" ... e altri {len(empty_chunks) - 5}")
if no_prefix:
has_errors = True
print(f"\n 🔴 {len(no_prefix)} chunk SENZA PREFISSO DI CONTESTO:")
for c in no_prefix[:5]:
print(_fmt_chunk(c))
if len(no_prefix) > 5:
print(f" ... e altri {len(no_prefix) - 5}")
print(f" → Causa probabile: header ### mancanti o malformati nel MD")
if too_short:
has_errors = True
print(f"\n 🟡 {len(too_short)} chunk SOTTO MIN_CHARS ({min_chars}):")
for c in too_short[:5]:
print(_fmt_chunk(c))
if len(too_short) > 5:
print(f" ... e altri {len(too_short) - 5}")
print(f" → Soluzione: abbassa MIN_CHARS o revisiona il MD")
if too_long:
has_errors = True
print(f"\n 🟡 {len(too_long)} chunk SOPRA MAX_CHARS×1.5 ({int(max_chars * 1.5)}):")
for c in too_long[:5]:
print(_fmt_chunk(c))
if len(too_long) > 5:
print(f" ... e altri {len(too_long) - 5}")
print(f" → Soluzione: alza MAX_CHARS o verifica il testo nel MD")
if incomplete:
has_errors = True
print(f"\n 🔴 {len(incomplete)} chunk CHE FINISCONO SENZA PUNTEGGIATURA (frase spezzata):")
for c in incomplete[:5]:
last_line = c.get("text", "").rstrip().split("\n")[-1][-80:]
print(f" [{c.get('chunk_id', '?')}] ...{last_line!r}")
if len(incomplete) > 5:
print(f" ... e altri {len(incomplete) - 5}")
print(f" → Soluzione: correggi le righe spezzate in conversione/{stem}/clean.md")
if incomplete_math:
has_errors = True
print(f"\n 🟡 {len(incomplete_math)} chunk MATEMATICI SENZA PUNTEGGIATURA (formula/espressione):")
for c in incomplete_math[:3]:
last_line = c.get("text", "").rstrip().split("\n")[-1][-80:]
print(f" [{c.get('chunk_id', '?')}] ...{last_line!r}")
if len(incomplete_math) > 3:
print(f" ... e altri {len(incomplete_math) - 3}")
print(f" → Le formule non finiscono con punteggiatura — avviso non bloccante")
# ── Costruisci e salva report.json ────────────────────────────────────────
blockers = empty_chunks + no_prefix + incomplete
warnings = too_short + too_long + incomplete_math
def _chunk_entry(c: dict) -> dict:
return {
"chunk_id": c.get("chunk_id", ""),
"sezione": c.get("sezione", ""),
"titolo": c.get("titolo", ""),
"n_chars": c.get("n_chars", 0),
"last_text": c.get("text", "").rstrip().split("\n")[-1][-120:],
}
verdict = "ok" if not blockers else "blocked"
if not blockers and warnings:
verdict = "warnings_only"
report = {
"stem": stem,
"verdict": verdict,
"stats": {
"total": n_total,
"ok": n_ok,
"min_chars": min_l,
"max_chars": max_l,
"avg_chars": avg_l,
},
"thresholds": {"min_chars": min_chars, "max_chars": max_chars},
"blockers": {
"empty": [_chunk_entry(c) for c in empty_chunks],
"no_prefix": [_chunk_entry(c) for c in no_prefix],
"incomplete": [_chunk_entry(c) for c in incomplete],
},
"warnings": {
"too_short": [_chunk_entry(c) for c in too_short],
"too_long": [_chunk_entry(c) for c in too_long],
"incomplete_math": [_chunk_entry(c) for c in incomplete_math],
},
}
out_dir = project_root / "chunks" / stem
out_dir.mkdir(parents=True, exist_ok=True)
(out_dir / "report.json").write_text(
json.dumps(report, ensure_ascii=False, indent=2), encoding="utf-8"
)
print(f"\n report.json salvato in chunks/{stem}/")
# ── Prossimi passi ────────────────────────────────────────────────────────
print(f"\n {'' * 50}")
print(f" PROSSIMI PASSI")
print(f" {'' * 50}")
if not blockers and not warnings:
print(f" ✅ Tutto OK — procedi alla vettorizzazione:")
print(f" python step-8/ingest.py --stem {stem}")
elif not blockers:
print(f" 🟡 Solo avvisi minori — puoi procedere alla vettorizzazione:")
print(f" python step-8/ingest.py --stem {stem}")
print()
print(f" Oppure, per ottimizzare prima:")
if too_short:
pct = int(len(too_short) / n_total * 100)
print(f"{len(too_short)} chunk corti ({pct}% del totale)")
if too_long:
pct = int(len(too_long) / n_total * 100)
print(f"{len(too_long)} chunk lunghi ({pct}% del totale)")
if too_short or too_long:
print(f" → Esegui: python chunks/fix_chunks.py --stem {stem} --dry-run")
print(f" poi: python chunks/fix_chunks.py --stem {stem}")
print(f" poi: python chunks/verify_chunks.py --stem {stem}")
else:
print(f" 🔴 Problemi bloccanti — correggi prima di procedere:")
print()
if empty_chunks:
print(f"{len(empty_chunks)} chunk vuoti")
print(f" → Controlla conversione/{stem}/clean.md per sezioni prive di testo")
if no_prefix:
print(f"{len(no_prefix)} chunk senza prefisso di contesto")
print(f" → Controlla che gli header ### siano corretti in conversione/{stem}/clean.md")
if incomplete:
print(f"{len(incomplete)} chunk con frase spezzata")
print(f" → Esegui: python chunks/fix_chunks.py --stem {stem}")
print()
print(f" Dopo le correzioni, riesegui nell'ordine:")
print(f" python chunks/chunker.py --stem {stem} --force")
print(f" python chunks/verify_chunks.py --stem {stem}")
print()
if warnings:
print(f" 🟡 Hai anche {len(warnings)} avvisi minori — affrontali dopo aver risolto i 🔴.")
return not blockers
# ─── Entry point ──────────────────────────────────────────────────────────────
if __name__ == "__main__":
project_root = Path(__file__).parent.parent
parser = argparse.ArgumentParser(description="Verifica chunk")
parser.add_argument("--stem", help="Nome del documento (sottocartella di chunks/)")
parser.add_argument(
"--min", type=int, default=MIN_CHARS,
help=f"Soglia minima caratteri (default: {MIN_CHARS})"
)
parser.add_argument(
"--max", type=int, default=MAX_CHARS,
help=f"Soglia massima caratteri (default: {MAX_CHARS})"
)
args = parser.parse_args()
if args.stem:
stems = [args.stem]
else:
chunks_dir = project_root / "chunks"
if not chunks_dir.exists():
print(f"Errore: cartella chunks/ non trovata in {project_root}")
sys.exit(1)
stems = sorted(
p.name for p in chunks_dir.iterdir()
if p.is_dir() and (p / "chunks.json").exists()
)
if not stems:
print("Errore: nessun chunks.json trovato in chunks/")
sys.exit(1)
results = [verify_stem(s, project_root, args.min, args.max) for s in stems]
ok = sum(results)
total = len(results)
print(f"\n{'' if all(results) else '⚠️ '} {ok}/{total} documenti senza problemi")
sys.exit(0 if all(results) else 1)