4c0e0db2a5
Nuova cartella chunks/ con chunker.py (step 5), verify_chunks.py e fix_chunks.py (step 6). Tutto l'I/O va in chunks/<stem>/ invece di step-5/ e step-6/ separati. Input: conversione/<stem>/clean.md
303 lines
12 KiB
Python
303 lines
12 KiB
Python
#!/usr/bin/env python3
|
||
"""
|
||
Verifica chunk
|
||
|
||
Analizza chunks/<stem>/chunks.json e segnala ogni anomalia che potrebbe
|
||
degradare la qualità del retrieval. Non modifica nulla.
|
||
|
||
Input: chunks/<stem>/chunks.json
|
||
Output: report a schermo + chunks/<stem>/report.json + exit code (0 = OK, 1 = problemi)
|
||
|
||
Uso:
|
||
python chunks/verify_chunks.py --stem documento
|
||
python chunks/verify_chunks.py # tutti i documenti in chunks/
|
||
python chunks/verify_chunks.py --min 200 --max 800
|
||
"""
|
||
|
||
import argparse
|
||
import json
|
||
import re
|
||
import sys
|
||
from pathlib import Path
|
||
|
||
|
||
# ─── Soglie ───────────────────────────────────────────────────────────────────
|
||
|
||
MIN_CHARS = 200
|
||
MAX_CHARS = 800
|
||
PUNCT_END = re.compile("[.!?»)\\]'\u2019\"\u201c\u201d\u2018\u2014\u2013\u2026-]$")
|
||
|
||
|
||
# ─── Checks ───────────────────────────────────────────────────────────────────
|
||
|
||
def has_prefix(chunk: dict) -> bool:
|
||
return chunk.get("text", "").lstrip().startswith("[")
|
||
|
||
|
||
def is_empty(chunk: dict) -> bool:
|
||
return not chunk.get("text", "").strip()
|
||
|
||
|
||
def is_too_short(chunk: dict, min_chars: int) -> bool:
|
||
return chunk.get("n_chars", 0) < min_chars
|
||
|
||
|
||
def is_too_long(chunk: dict, max_chars: int) -> bool:
|
||
return chunk.get("n_chars", 0) > max_chars * 1.5
|
||
|
||
|
||
def ends_incomplete(chunk: dict) -> bool:
|
||
text = chunk.get("text", "").rstrip()
|
||
if not text:
|
||
return False
|
||
text_check = re.sub(r"[_*]+$", "", text).rstrip()
|
||
if not text_check:
|
||
return False
|
||
return not PUNCT_END.search(text_check)
|
||
|
||
|
||
# ─── Report ───────────────────────────────────────────────────────────────────
|
||
|
||
def _fmt_chunk(c: dict) -> str:
|
||
cid = c.get("chunk_id", "?")
|
||
n = c.get("n_chars", 0)
|
||
preview = c.get("text", "")[:60].replace("\n", " ")
|
||
return f" [{cid}] ({n} char) «{preview}»"
|
||
|
||
|
||
def verify_stem(stem: str, project_root: Path, min_chars: int, max_chars: int) -> bool:
|
||
chunks_path = project_root / "chunks" / stem / "chunks.json"
|
||
|
||
print(f"\nDocumento: {stem}")
|
||
|
||
if not chunks_path.exists():
|
||
print(f" ✗ chunks/{stem}/chunks.json non trovato")
|
||
print(f" Esegui prima: python chunks/chunker.py --stem {stem}")
|
||
return False
|
||
|
||
chunks: list[dict] = json.loads(chunks_path.read_text(encoding="utf-8"))
|
||
|
||
if not chunks:
|
||
print(f" ✗ chunks.json è vuoto")
|
||
return False
|
||
|
||
# ── Raccogli problemi ──────────────────────────────────────────────────────
|
||
|
||
empty_chunks = [c for c in chunks if is_empty(c)]
|
||
no_prefix = [c for c in chunks if not is_empty(c) and not has_prefix(c)]
|
||
too_short = [c for c in chunks if is_too_short(c, min_chars)]
|
||
too_long = [c for c in chunks if is_too_long(c, max_chars)]
|
||
incomplete = [c for c in chunks if not is_empty(c) and ends_incomplete(c)]
|
||
|
||
# ── Statistiche ───────────────────────────────────────────────────────────
|
||
|
||
lengths = [c.get("n_chars", 0) for c in chunks]
|
||
n_total = len(chunks)
|
||
n_ok = n_total - len(set(
|
||
c["chunk_id"]
|
||
for lst in [empty_chunks, no_prefix, too_short, too_long, incomplete]
|
||
for c in lst
|
||
))
|
||
min_l = min(lengths)
|
||
max_l = max(lengths)
|
||
avg_l = int(sum(lengths) / n_total)
|
||
|
||
n_under = sum(1 for l in lengths if l < min_chars)
|
||
n_normal = sum(1 for l in lengths if min_chars <= l <= max_chars)
|
||
n_over = sum(1 for l in lengths if l > max_chars)
|
||
|
||
# ── Output ────────────────────────────────────────────────────────────────
|
||
|
||
print(f" Totale chunk: {n_total}")
|
||
print(f" ✅ OK: {n_ok}")
|
||
print()
|
||
print(f" Distribuzione lunghezze:")
|
||
print(f" Min: {min_l} char")
|
||
print(f" Max: {max_l} char")
|
||
print(f" Media: {avg_l} char")
|
||
print(f" < {min_chars} char (sotto MIN): {n_under}")
|
||
print(f" {min_chars}–{max_chars} char (ideale): {n_normal}")
|
||
print(f" > {max_chars} char (sopra MAX): {n_over}")
|
||
|
||
has_errors = False
|
||
|
||
if empty_chunks:
|
||
has_errors = True
|
||
print(f"\n 🔴 {len(empty_chunks)} chunk VUOTI:")
|
||
for c in empty_chunks[:5]:
|
||
print(f" [{c.get('chunk_id', '?')}]")
|
||
if len(empty_chunks) > 5:
|
||
print(f" ... e altri {len(empty_chunks) - 5}")
|
||
|
||
if no_prefix:
|
||
has_errors = True
|
||
print(f"\n 🔴 {len(no_prefix)} chunk SENZA PREFISSO DI CONTESTO:")
|
||
for c in no_prefix[:5]:
|
||
print(_fmt_chunk(c))
|
||
if len(no_prefix) > 5:
|
||
print(f" ... e altri {len(no_prefix) - 5}")
|
||
print(f" → Causa probabile: header ### mancanti o malformati nel MD")
|
||
|
||
if too_short:
|
||
has_errors = True
|
||
print(f"\n 🟡 {len(too_short)} chunk SOTTO MIN_CHARS ({min_chars}):")
|
||
for c in too_short[:5]:
|
||
print(_fmt_chunk(c))
|
||
if len(too_short) > 5:
|
||
print(f" ... e altri {len(too_short) - 5}")
|
||
print(f" → Soluzione: abbassa MIN_CHARS o revisiona il MD")
|
||
|
||
if too_long:
|
||
has_errors = True
|
||
print(f"\n 🟡 {len(too_long)} chunk SOPRA MAX_CHARS×1.5 ({int(max_chars * 1.5)}):")
|
||
for c in too_long[:5]:
|
||
print(_fmt_chunk(c))
|
||
if len(too_long) > 5:
|
||
print(f" ... e altri {len(too_long) - 5}")
|
||
print(f" → Soluzione: alza MAX_CHARS o verifica il testo nel MD")
|
||
|
||
if incomplete:
|
||
has_errors = True
|
||
print(f"\n 🔴 {len(incomplete)} chunk CHE FINISCONO SENZA PUNTEGGIATURA (frase spezzata):")
|
||
for c in incomplete[:5]:
|
||
last_line = c.get("text", "").rstrip().split("\n")[-1][-80:]
|
||
print(f" [{c.get('chunk_id', '?')}] ...{last_line!r}")
|
||
if len(incomplete) > 5:
|
||
print(f" ... e altri {len(incomplete) - 5}")
|
||
print(f" → Soluzione: correggi le righe spezzate in conversione/{stem}/clean.md")
|
||
|
||
# ── Costruisci e salva report.json ────────────────────────────────────────
|
||
|
||
blockers = empty_chunks + no_prefix + incomplete
|
||
warnings = too_short + too_long
|
||
|
||
def _chunk_entry(c: dict) -> dict:
|
||
return {
|
||
"chunk_id": c.get("chunk_id", ""),
|
||
"sezione": c.get("sezione", ""),
|
||
"titolo": c.get("titolo", ""),
|
||
"n_chars": c.get("n_chars", 0),
|
||
"last_text": c.get("text", "").rstrip().split("\n")[-1][-120:],
|
||
}
|
||
|
||
verdict = "ok" if not blockers else "blocked"
|
||
if not blockers and warnings:
|
||
verdict = "warnings_only"
|
||
|
||
report = {
|
||
"stem": stem,
|
||
"verdict": verdict,
|
||
"stats": {
|
||
"total": n_total,
|
||
"ok": n_ok,
|
||
"min_chars": min_l,
|
||
"max_chars": max_l,
|
||
"avg_chars": avg_l,
|
||
},
|
||
"thresholds": {"min_chars": min_chars, "max_chars": max_chars},
|
||
"blockers": {
|
||
"empty": [_chunk_entry(c) for c in empty_chunks],
|
||
"no_prefix": [_chunk_entry(c) for c in no_prefix],
|
||
"incomplete": [_chunk_entry(c) for c in incomplete],
|
||
},
|
||
"warnings": {
|
||
"too_short": [_chunk_entry(c) for c in too_short],
|
||
"too_long": [_chunk_entry(c) for c in too_long],
|
||
},
|
||
}
|
||
|
||
out_dir = project_root / "chunks" / stem
|
||
out_dir.mkdir(parents=True, exist_ok=True)
|
||
(out_dir / "report.json").write_text(
|
||
json.dumps(report, ensure_ascii=False, indent=2), encoding="utf-8"
|
||
)
|
||
print(f"\n report.json salvato in chunks/{stem}/")
|
||
|
||
# ── Prossimi passi ────────────────────────────────────────────────────────
|
||
|
||
print(f"\n {'─' * 50}")
|
||
print(f" PROSSIMI PASSI")
|
||
print(f" {'─' * 50}")
|
||
|
||
if not blockers and not warnings:
|
||
print(f" ✅ Tutto OK — procedi alla vettorizzazione:")
|
||
print(f" python step-8/ingest.py --stem {stem}")
|
||
|
||
elif not blockers:
|
||
print(f" 🟡 Solo avvisi minori — puoi procedere alla vettorizzazione:")
|
||
print(f" python step-8/ingest.py --stem {stem}")
|
||
print()
|
||
print(f" Oppure, per ottimizzare prima:")
|
||
if too_short:
|
||
pct = int(len(too_short) / n_total * 100)
|
||
print(f" • {len(too_short)} chunk corti ({pct}% del totale)")
|
||
if too_long:
|
||
pct = int(len(too_long) / n_total * 100)
|
||
print(f" • {len(too_long)} chunk lunghi ({pct}% del totale)")
|
||
if too_short or too_long:
|
||
print(f" → Esegui: python chunks/fix_chunks.py --stem {stem} --dry-run")
|
||
print(f" poi: python chunks/fix_chunks.py --stem {stem}")
|
||
print(f" poi: python chunks/verify_chunks.py --stem {stem}")
|
||
|
||
else:
|
||
print(f" 🔴 Problemi bloccanti — correggi prima di procedere:")
|
||
print()
|
||
if empty_chunks:
|
||
print(f" • {len(empty_chunks)} chunk vuoti")
|
||
print(f" → Controlla conversione/{stem}/clean.md per sezioni prive di testo")
|
||
if no_prefix:
|
||
print(f" • {len(no_prefix)} chunk senza prefisso di contesto")
|
||
print(f" → Controlla che gli header ### siano corretti in conversione/{stem}/clean.md")
|
||
if incomplete:
|
||
print(f" • {len(incomplete)} chunk con frase spezzata")
|
||
print(f" → Esegui: python chunks/fix_chunks.py --stem {stem}")
|
||
print()
|
||
print(f" Dopo le correzioni, riesegui nell'ordine:")
|
||
print(f" python chunks/chunker.py --stem {stem} --force")
|
||
print(f" python chunks/verify_chunks.py --stem {stem}")
|
||
print()
|
||
if warnings:
|
||
print(f" 🟡 Hai anche {len(warnings)} avvisi minori — affrontali dopo aver risolto i 🔴.")
|
||
|
||
return not blockers
|
||
|
||
|
||
# ─── Entry point ──────────────────────────────────────────────────────────────
|
||
|
||
if __name__ == "__main__":
|
||
project_root = Path(__file__).parent.parent
|
||
|
||
parser = argparse.ArgumentParser(description="Verifica chunk")
|
||
parser.add_argument("--stem", help="Nome del documento (sottocartella di chunks/)")
|
||
parser.add_argument(
|
||
"--min", type=int, default=MIN_CHARS,
|
||
help=f"Soglia minima caratteri (default: {MIN_CHARS})"
|
||
)
|
||
parser.add_argument(
|
||
"--max", type=int, default=MAX_CHARS,
|
||
help=f"Soglia massima caratteri (default: {MAX_CHARS})"
|
||
)
|
||
args = parser.parse_args()
|
||
|
||
if args.stem:
|
||
stems = [args.stem]
|
||
else:
|
||
chunks_dir = project_root / "chunks"
|
||
if not chunks_dir.exists():
|
||
print(f"Errore: cartella chunks/ non trovata in {project_root}")
|
||
sys.exit(1)
|
||
stems = sorted(
|
||
p.name for p in chunks_dir.iterdir()
|
||
if p.is_dir() and (p / "chunks.json").exists()
|
||
)
|
||
if not stems:
|
||
print("Errore: nessun chunks.json trovato in chunks/")
|
||
sys.exit(1)
|
||
|
||
results = [verify_stem(s, project_root, args.min, args.max) for s in stems]
|
||
|
||
ok = sum(results)
|
||
total = len(results)
|
||
print(f"\n{'✅' if all(results) else '⚠️ '} {ok}/{total} documenti senza problemi")
|
||
sys.exit(0 if all(results) else 1)
|