Files
davide 48567fa5e7 fix(verify): riconosce URL www. come terminatori validi + doc multi-documento
- _URL_TAIL ora matcha anche www.* (non solo https://) — evita falsi blockers
  su watermark tipo www.docsity.com
- README: documenta --collection / --stems per ingestion, retrieve e rag

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-12 11:21:24 +02:00

363 lines
15 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
Verifica chunk
Analizza chunks/<stem>/chunks.json e segnala ogni anomalia che potrebbe
degradare la qualità del retrieval. Non modifica nulla.
Input: chunks/<stem>/chunks.json
Output: report a schermo + chunks/<stem>/report.json + exit code (0 = OK, 1 = problemi)
Uso:
python chunks/verify_chunks.py --stem documento
python chunks/verify_chunks.py # tutti i documenti in chunks/
python chunks/verify_chunks.py --min 200 --max 800
"""
import argparse
import json
import re
import sys
from pathlib import Path
_HERE = Path(__file__).resolve().parent
if str(_HERE) not in sys.path:
sys.path.insert(0, str(_HERE))
import config as cfg
# ─── Thresholds (derived from the target size) ───────────────────────────────
# Module-level defaults for the accepted chunk length, in characters. The
# entry point exposes --min/--max options for the same two values.
# Lower bound: TARGET_CHARS shrunk by the CHUNK_TOLERANCE fraction, truncated to int.
MIN_CHARS = int(cfg.TARGET_CHARS * (1 - cfg.CHUNK_TOLERANCE))
# Upper bound: TARGET_CHARS grown by the CHUNK_TOLERANCE fraction, truncated to int.
MAX_CHARS = int(cfg.TARGET_CHARS * (1 + cfg.CHUNK_TOLERANCE))
# Final characters that count as a legitimate end of chunk: sentence
# punctuation, closing brackets and quotes (» ’ “ ” ‘), dashes (— –) and the
# ellipsis (…), plus a few structural terminators listed one per alternative.
PUNCT_END = re.compile(
    r"[.!?\xbb)\]'\u2019\"\u201c\u201d\u2018\u2014\u2013\u2026]$"
    r"|/$"    # URL ending with /
    r"|\|$"   # Markdown table row
    r"|;$"    # end of a legal clause (juridical text)
    r"|:$"    # introduction to a list or a formula
)
# Eight or more trailing hexadecimal digits (e.g. a hash or hex code).
_HEX_END = re.compile(r"[0-9a-fA-F]{8,}$")
# A URL (http://, https:// or www.) followed by at most 3 extra tokens.
_URL_TAIL = re.compile(r"(https?://|www\.)\S+(\s+\S+){0,3}$")
# Any single mathematical symbol; used to count symbols inside a chunk.
_MATH_SYMS = re.compile(r"[∈∑≤≥≠∀∃∫√∞∂±×÷→←↔⊂⊃⊆⊇∩∪·°]")
# Trailing Roman numeral: I through XX, plus XXX. The quantifiers on the XI and
# XVI alternatives go up to 3 so that XIII and XVIII are recognised as well.
_ROMAN_END = re.compile(r"\b(I{1,3}|IV|VI{0,3}|IX|XI{0,3}|XIV|XVI{0,3}|XIX|XX{0,2})$")
def _load_thresholds(stem_dir: "Path") -> "tuple[int, int]":
    """Return the (min_chars, max_chars) thresholds for one document folder.

    The values are read from ``meta.json`` inside ``stem_dir`` when that file
    exists and holds both the ``min_chars`` and ``max_chars`` keys. In every
    other case the module-level defaults ``MIN_CHARS`` / ``MAX_CHARS`` are
    returned.

    Args:
        stem_dir: Folder that may contain ``meta.json``.

    Returns:
        The pair ``(min_chars, max_chars)``.

    Raises:
        json.JSONDecodeError: If ``meta.json`` exists but is not valid JSON.
    """
    meta_path = stem_dir / "meta.json"
    if not meta_path.exists():
        return MIN_CHARS, MAX_CHARS
    # The module already imports json; no function-local re-import is needed.
    meta = json.loads(meta_path.read_text(encoding="utf-8"))
    try:
        return meta["min_chars"], meta["max_chars"]
    except (KeyError, TypeError):
        # meta.json is present but does not carry both thresholds (or is not
        # an object at all): use the defaults instead of crashing.
        return MIN_CHARS, MAX_CHARS
# ─── Checks ───────────────────────────────────────────────────────────────────
def has_prefix(chunk: dict) -> bool:
    """Tell whether the chunk text, ignoring leading whitespace, opens with "["."""
    body = chunk.get("text", "").lstrip()
    return body[:1] == "["
def is_empty(chunk: dict) -> bool:
    """Tell whether the chunk has no text, or only whitespace."""
    stripped = chunk.get("text", "").strip()
    return stripped == ""
def is_too_short(chunk: dict, min_chars: int) -> bool:
    """Tell whether the chunk's recorded ``n_chars`` (0 if absent) is below ``min_chars``."""
    length = chunk.get("n_chars", 0)
    return min_chars > length
def is_too_long(chunk: dict, max_chars: int) -> bool:
    """Tell whether the chunk's recorded ``n_chars`` (0 if absent) exceeds ``max_chars``."""
    length = chunk.get("n_chars", 0)
    return max_chars < length
def ends_incomplete(chunk: dict) -> bool:
    """Tell whether the chunk text stops without a recognised terminator.

    Trailing whitespace and trailing Markdown emphasis markers (``_`` / ``*``)
    are ignored. A chunk with nothing left after that is not considered
    incomplete. Otherwise the text is complete when it ends with closing
    punctuation, a hex code, a Roman numeral, or a URL (looked for in the
    last 200 characters only).
    """
    trimmed = chunk.get("text", "").rstrip()
    tail = re.sub(r"[_*]+$", "", trimmed).rstrip()
    if not tail:
        return False
    # Each accepted ending paired with the text it is searched in.
    accepted_endings = (
        (PUNCT_END, tail),
        (_HEX_END, tail),          # SHA hash / hex code
        (_ROMAN_END, tail),        # final Roman numeral (index / PDF reference)
        (_URL_TAIL, tail[-200:]),  # URL, possibly with a path after a space
    )
    return not any(pattern.search(target) for pattern, target in accepted_endings)
def is_math_incomplete(chunk: dict) -> bool:
    """Incomplete ending in a mathematical context: downgraded from blocker to warning.

    True when the chunk ends incompletely and its text holds at least
    ``cfg.MATH_SYMS_MIN`` mathematical symbols.
    """
    if not ends_incomplete(chunk):
        return False
    symbol_count = len(_MATH_SYMS.findall(chunk.get("text", "")))
    return symbol_count >= cfg.MATH_SYMS_MIN
# ─── Report ───────────────────────────────────────────────────────────────────
def _fmt_chunk(c: dict) -> str:
    """Build the one-line listing entry for a chunk: id, size, first 60 characters."""
    # Newlines are flattened so the preview stays on a single line.
    snippet = c.get("text", "")[:60].replace("\n", " ")
    size = c.get("n_chars", 0)
    ident = c.get("chunk_id", "?")
    return f" [{ident}] ({size} char) «{snippet}»"
def _tail_preview(c: dict, width: int) -> str:
    """Return the last ``width`` characters of the final line of the chunk text."""
    return c.get("text", "").rstrip().split("\n")[-1][-width:]


def _print_group(title: str, items: list, render, limit: int = 5, hint: str = "") -> None:
    """Print a titled group of chunks: at most ``limit`` entries, a remainder note, a hint."""
    print(title)
    for c in items[:limit]:
        print(render(c))
    if len(items) > limit:
        print(f" ... e altri {len(items) - limit}")
    if hint:
        print(hint)


def verify_stem(stem: str, project_root: Path, min_chars: int, max_chars: int) -> bool:
    """Check the chunks of one document, print a report and write report.json.

    Reads ``chunks/<stem>/chunks.json`` under ``project_root``, classifies the
    chunks into blockers (empty, missing context prefix, broken final
    sentence) and warnings (too short, too long, incomplete but mathematical),
    prints the findings and saves them to ``chunks/<stem>/report.json``.

    Args:
        stem: Document name (subfolder of ``chunks/``).
        project_root: Folder that contains ``chunks/``.
        min_chars: Minimum chunk length. A value different from the
            module default ``MIN_CHARS`` is treated as an explicit override;
            otherwise the value stored in ``meta.json`` (if any) is used.
        max_chars: Maximum chunk length, resolved like ``min_chars`` against
            ``MAX_CHARS``.

    Returns:
        True when there are no blockers (warnings alone do not fail the
        check); False when blockers exist, or chunks.json is missing or empty.
    """
    stem_dir = project_root / "chunks" / stem
    chunks_path = stem_dir / "chunks.json"

    # Threshold precedence: explicit caller value > meta.json > config default.
    # An argument equal to the config default is read as "not overridden".
    meta_min, meta_max = _load_thresholds(stem_dir)
    if min_chars == MIN_CHARS:
        min_chars = meta_min
    if max_chars == MAX_CHARS:
        max_chars = meta_max

    print(f"\nDocumento: {stem}")
    if not chunks_path.exists():
        print(f" ✗ chunks/{stem}/chunks.json non trovato")
        print(f" Esegui prima: python chunks/chunker.py --stem {stem}")
        return False
    chunks: list[dict] = json.loads(chunks_path.read_text(encoding="utf-8"))
    if not chunks:
        print(f" ✗ chunks.json è vuoto")
        return False

    # ── Collect problems ──────────────────────────────────────────────────────
    empty_chunks = [c for c in chunks if is_empty(c)]
    no_prefix = [c for c in chunks if not is_empty(c) and not has_prefix(c)]
    too_short = [c for c in chunks if is_too_short(c, min_chars)]
    too_long = [c for c in chunks if is_too_long(c, max_chars)]
    # Incomplete endings are split in one pass: math-heavy chunks become warnings.
    incomplete_math: list[dict] = []
    incomplete: list[dict] = []
    for c in chunks:
        if is_empty(c) or not ends_incomplete(c):
            continue
        (incomplete_math if is_math_incomplete(c) else incomplete).append(c)

    # ── Statistics ────────────────────────────────────────────────────────────
    lengths = [c.get("n_chars", 0) for c in chunks]
    n_total = len(chunks)
    # Flagged chunks are counted by object identity, so a missing or duplicated
    # chunk_id can neither raise nor skew the number.
    flagged = {
        id(c)
        for group in (empty_chunks, no_prefix, too_short, too_long, incomplete)
        for c in group
    }
    n_ok = n_total - len(flagged)
    min_l = min(lengths)
    max_l = max(lengths)
    avg_l = int(sum(lengths) / n_total)
    n_under = sum(1 for l in lengths if l < min_chars)
    n_normal = sum(1 for l in lengths if min_chars <= l <= max_chars)
    n_over = sum(1 for l in lengths if l > max_chars)

    # ── Output ────────────────────────────────────────────────────────────────
    print(f" Totale chunk: {n_total}")
    print(f" ✅ OK: {n_ok}")
    print()
    print(f" Distribuzione lunghezze:")
    print(f" Min: {min_l} char")
    print(f" Max: {max_l} char")
    print(f" Media: {avg_l} char")
    print(f" < {min_chars} char (sotto MIN): {n_under}")
    # The two bounds need a separator, otherwise they read as a single number.
    print(f" {min_chars}-{max_chars} char (ideale): {n_normal}")
    print(f" > {max_chars} char (sopra MAX): {n_over}")

    def _id_only(c: dict) -> str:
        return f" [{c.get('chunk_id', '?')}]"

    def _id_and_tail(c: dict) -> str:
        return f" [{c.get('chunk_id', '?')}] ...{_tail_preview(c, 80)!r}"

    if empty_chunks:
        _print_group(f"\n 🔴 {len(empty_chunks)} chunk VUOTI:", empty_chunks, _id_only)
    if no_prefix:
        _print_group(
            f"\n 🔴 {len(no_prefix)} chunk SENZA PREFISSO DI CONTESTO:",
            no_prefix,
            _fmt_chunk,
            hint=f" → Causa probabile: header ### mancanti o malformati nel MD",
        )
    if too_short:
        _print_group(
            f"\n 🟡 {len(too_short)} chunk SOTTO MIN_CHARS ({min_chars}):",
            too_short,
            _fmt_chunk,
            hint=f" → Soluzione: abbassa MIN_CHARS o revisiona il MD",
        )
    if too_long:
        _print_group(
            f"\n 🟡 {len(too_long)} chunk SOPRA MAX ({max_chars}):",
            too_long,
            _fmt_chunk,
            hint=f" → Causa probabile: frasi singole lunghe (liste/paragrafi non suddivisibili)",
        )
    if incomplete:
        _print_group(
            f"\n 🔴 {len(incomplete)} chunk CHE FINISCONO SENZA PUNTEGGIATURA (frase spezzata):",
            incomplete,
            _id_and_tail,
            hint=f" → Soluzione: correggi le righe spezzate in conversione/{stem}/clean.md",
        )
    if incomplete_math:
        _print_group(
            f"\n 🟡 {len(incomplete_math)} chunk MATEMATICI SENZA PUNTEGGIATURA (formula/espressione):",
            incomplete_math,
            _id_and_tail,
            limit=3,
            hint=f" → Le formule non finiscono con punteggiatura — avviso non bloccante",
        )

    # ── Build and save report.json ────────────────────────────────────────────
    blockers = empty_chunks + no_prefix + incomplete
    warnings = too_short + too_long + incomplete_math

    def _chunk_entry(c: dict) -> dict:
        return {
            "chunk_id": c.get("chunk_id", ""),
            "sezione": c.get("sezione", ""),
            "titolo": c.get("titolo", ""),
            "n_chars": c.get("n_chars", 0),
            "last_text": _tail_preview(c, 120),
        }

    if blockers:
        verdict = "blocked"
    elif warnings:
        verdict = "warnings_only"
    else:
        verdict = "ok"

    report = {
        "stem": stem,
        "verdict": verdict,
        "stats": {
            "total": n_total,
            "ok": n_ok,
            "min_chars": min_l,
            "max_chars": max_l,
            "avg_chars": avg_l,
        },
        "thresholds": {
            "min_chars": min_chars,
            "max_chars": max_chars,
            "target_chars": cfg.TARGET_CHARS,
            "chunk_tolerance": cfg.CHUNK_TOLERANCE,
        },
        "blockers": {
            "empty": [_chunk_entry(c) for c in empty_chunks],
            "no_prefix": [_chunk_entry(c) for c in no_prefix],
            "incomplete": [_chunk_entry(c) for c in incomplete],
        },
        "warnings": {
            "too_short": [_chunk_entry(c) for c in too_short],
            "too_long": [_chunk_entry(c) for c in too_long],
            "incomplete_math": [_chunk_entry(c) for c in incomplete_math],
        },
    }
    stem_dir.mkdir(parents=True, exist_ok=True)
    (stem_dir / "report.json").write_text(
        json.dumps(report, ensure_ascii=False, indent=2), encoding="utf-8"
    )
    print(f"\n report.json salvato in chunks/{stem}/")

    # ── Next steps ────────────────────────────────────────────────────────────
    # A visible rule: multiplying an empty string would print nothing.
    rule = "─" * 50
    print(f"\n {rule}")
    print(f" PROSSIMI PASSI")
    print(f" {rule}")
    if not blockers and not warnings:
        print(f" ✅ Tutto OK — procedi alla vettorizzazione:")
        print(f" python ingestion/ingest.py --stem {stem}")
    elif not blockers:
        print(f" 🟡 Solo avvisi minori — puoi procedere alla vettorizzazione:")
        print(f" python ingestion/ingest.py --stem {stem}")
        print()
        print(f" Oppure, per ottimizzare prima:")
        if too_short:
            pct = int(len(too_short) / n_total * 100)
            print(f"{len(too_short)} chunk corti ({pct}% del totale)")
        if too_long:
            pct = int(len(too_long) / n_total * 100)
            print(f"{len(too_long)} chunk lunghi ({pct}% del totale)")
        if too_short or too_long:
            print(f" → Esegui: python chunks/fix_chunks.py --stem {stem} --dry-run")
            print(f" poi: python chunks/fix_chunks.py --stem {stem}")
            print(f" poi: python chunks/verify_chunks.py --stem {stem}")
    else:
        print(f" 🔴 Problemi bloccanti — correggi prima di procedere:")
        print()
        if empty_chunks:
            print(f"{len(empty_chunks)} chunk vuoti")
            print(f" → Controlla conversione/{stem}/clean.md per sezioni prive di testo")
        if no_prefix:
            print(f"{len(no_prefix)} chunk senza prefisso di contesto")
            print(f" → Controlla che gli header ### siano corretti in conversione/{stem}/clean.md")
        if incomplete:
            print(f"{len(incomplete)} chunk con frase spezzata")
            print(f" → Esegui: python chunks/fix_chunks.py --stem {stem}")
        print()
        print(f" Dopo le correzioni, riesegui nell'ordine:")
        print(f" python chunks/chunker.py --stem {stem} --force")
        print(f" python chunks/verify_chunks.py --stem {stem}")
        print()
        if warnings:
            print(f" 🟡 Hai anche {len(warnings)} avvisi minori — affrontali dopo aver risolto i 🔴.")
    return not blockers
# ─── Entry point ──────────────────────────────────────────────────────────────
if __name__ == "__main__":
    project_root = Path(__file__).parent.parent

    # Default thresholds shown in --help and used when the flags are omitted.
    _min_def = int(cfg.TARGET_CHARS * (1 - cfg.CHUNK_TOLERANCE))
    _max_def = int(cfg.TARGET_CHARS * (1 + cfg.CHUNK_TOLERANCE))

    parser = argparse.ArgumentParser(description="Verifica chunk")
    parser.add_argument("--stem", help="Nome del documento (sottocartella di chunks/)")
    parser.add_argument(
        "--min",
        type=int,
        default=_min_def,
        help=f"Soglia minima caratteri (default: TARGET×(1-TOL) = {_min_def})",
    )
    parser.add_argument(
        "--max",
        type=int,
        default=_max_def,
        help=f"Soglia massima caratteri (default: TARGET×(1+TOL) = {_max_def})",
    )
    args = parser.parse_args()

    if not args.stem:
        # No document given: check every subfolder of chunks/ holding a chunks.json.
        chunks_dir = project_root / "chunks"
        if not chunks_dir.exists():
            print(f"Errore: cartella chunks/ non trovata in {project_root}")
            sys.exit(1)
        stems = []
        for entry in chunks_dir.iterdir():
            if entry.is_dir() and (entry / "chunks.json").exists():
                stems.append(entry.name)
        stems.sort()
        if not stems:
            print("Errore: nessun chunks.json trovato in chunks/")
            sys.exit(1)
    else:
        stems = [args.stem]

    # Every document is verified, even after an earlier one has failed.
    results = []
    for name in stems:
        results.append(verify_stem(name, project_root, args.min, args.max))

    all_ok = all(results)
    ok = sum(results)
    total = len(results)
    marker = '' if all_ok else '⚠️ '
    print(f"\n{marker} {ok}/{total} documenti senza problemi")
    # Exit code 0 only when no document has blockers.
    sys.exit(0 if all_ok else 1)