From 4c0e0db2a51c2e064ec5ebb4b9cf2ce4e7655b57 Mon Sep 17 00:00:00 2001 From: Davide Grilli Date: Mon, 20 Apr 2026 11:36:18 +0200 Subject: [PATCH] feat(chunks): aggiungi pipeline chunking consolidata Nuova cartella chunks/ con chunker.py (step 5), verify_chunks.py e fix_chunks.py (step 6). Tutto l'I/O va in chunks/<stem>/ invece di step-5/ e step-6/ separati. Input: conversione/<stem>/clean.md --- chunks/chunker.py | 414 ++++++++++++++++++++++++++++++++++++++++ chunks/fix_chunks.py | 283 +++++++++++++++++++++++++++ chunks/verify_chunks.py | 302 +++++++++++++++++++++++++++++ 3 files changed, 999 insertions(+) create mode 100644 chunks/chunker.py create mode 100644 chunks/fix_chunks.py create mode 100644 chunks/verify_chunks.py diff --git a/chunks/chunker.py b/chunks/chunker.py new file mode 100644 index 0000000..2f4c718 --- /dev/null +++ b/chunks/chunker.py @@ -0,0 +1,414 @@ +#!/usr/bin/env python3 +""" +Chunking adattivo + +Divide il Markdown revisionato in chunk semantici pronti per la +vettorizzazione. La strategia dipende dal profilo strutturale del documento.
def split_sentences(text: str) -> list[str]:
    """Split *text* into stripped, non-empty sentences.

    The first pass splits on whitespace that follows ``.``, ``!``, ``?`` or
    ``»`` and precedes an uppercase letter or a double quote. If that yields
    at most one piece, a looser pass splits on any whitespace following the
    same punctuation.
    """
    stripped = text.strip()
    parts = re.split(r'(?<=[.!?»])\s+(?=[A-ZÀÈÉÌÒÙ\"])', stripped)
    if len(parts) <= 1:
        parts = re.split(r'(?<=[.!?»])\s+', stripped)
    return [p.strip() for p in parts if p.strip()]


def slugify(s: str, max_len: int = 60) -> str:
    """Return a lowercase, underscore-separated slug of *s*.

    Characters other than word characters, whitespace and ``-`` are removed;
    runs of whitespace, ``_`` and ``-`` collapse to a single ``_``. The result
    is truncated to *max_len* characters; an empty slug becomes ``"section"``.
    """
    s = s.lower()
    s = re.sub(r'[^\w\s-]', '', s)
    s = re.sub(r'[\s_-]+', '_', s).strip('_')
    return s[:max_len] if s else "section"


def make_sub_chunks(
    body: str,
    prefix: str,
    sezione: str,
    titolo: str,
    max_chars: int,
    overlap_s: int,
) -> list[dict]:
    """Pack the sentences of *body* into chunks of at most *max_chars* characters.

    Sentences are joined with a single space. The limit applies to the joined
    sentences only; *prefix* is prepended afterwards and counted in
    ``n_chars``. A single sentence longer than *max_chars* still gets its own
    chunk.

    When a chunk is closed, its last *overlap_s* sentences are repeated at the
    start of the next one. The overlap is shortened from the front until the
    next sentence fits after it, so no chunk is ever made of repeated
    sentences alone.

    Args:
        body: Text to split.
        prefix: Context label prepended to every chunk text.
        sezione: Section name, stored in the chunk and used in ``chunk_id``.
        titolo: Title, stored in the chunk and used in ``chunk_id``.
        max_chars: Maximum length of the joined sentences of one chunk.
        overlap_s: Number of trailing sentences to repeat in the next chunk.

    Returns:
        A list of dicts with keys ``chunk_id``, ``text``, ``sezione``,
        ``titolo``, ``sub_index`` and ``n_chars``; empty if *body* contains
        no sentence.
    """
    sentences = split_sentences(body)
    if not sentences:
        return []

    base_id = f"{slugify(sezione)}__{slugify(titolo)}"
    chunks: list[dict] = []

    def joined_len(group: list[str]) -> int:
        # Length of " ".join(group) without building the string.
        return sum(map(len, group)) + max(len(group) - 1, 0)

    def emit(group: list[str]) -> None:
        sub_index = len(chunks)
        chunk_text = prefix + " ".join(group)
        chunks.append({
            "chunk_id": f"{base_id}__s{sub_index}",
            "text": chunk_text,
            "sezione": sezione,
            "titolo": titolo,
            "sub_index": sub_index,
            "n_chars": len(chunk_text),
        })

    current: list[str] = []
    current_len = 0
    for sent in sentences:
        if current and current_len + 1 + len(sent) > max_chars:
            emit(current)
            overlap = current[-overlap_s:] if 0 < overlap_s < len(current) else []
            # Keep only as much overlap as leaves room for the new sentence;
            # otherwise the next chunk would contain nothing but repeated text.
            while overlap and joined_len(overlap) + 1 + len(sent) > max_chars:
                overlap = overlap[1:]
            current = overlap
            current_len = joined_len(current)
        current_len += len(sent) + (1 if current else 0)
        current.append(sent)

    emit(current)
    return chunks
def _strip_leading_headings(paragraph: str) -> str:
    """Return *paragraph* without the Markdown heading lines it starts with."""
    lines = paragraph.strip().splitlines()
    first_body = 0
    while first_body < len(lines) and re.match(r"#+(?:\s|$)", lines[first_body]):
        first_body += 1
    return "\n".join(lines[first_body:]).strip()


def chunk_h2_paragraph_split(text: str, stem: str) -> list[dict]:
    """Chunk *text* section by section, splitting each section on blank lines.

    Sections come from ``parse_h2_sections``. Within a section, paragraphs
    shorter than ``MIN_CHARS`` are merged with the following one, and each
    merged paragraph is packed into sentence-based sub-chunks by
    ``make_sub_chunks``.

    Heading lines at the start of a paragraph are removed, but the text that
    follows them in the same paragraph is kept. Previously the whole
    paragraph was dropped, losing body text written directly under a heading
    without a blank line.

    Args:
        text: Markdown source of the document.
        stem: Document name, used as section label when a section has none.

    Returns:
        Chunk dicts with ids of the form ``<section slug>__p<idx>__s<sub_index>``.
    """
    chunks: list[dict] = []

    for sec in parse_h2_sections(text):
        sezione = sec["sezione"] or stem
        prefix = f"[{sezione}]\n"

        paragraphs = [
            cleaned
            for raw in re.split(r"\n{2,}", sec["body"])
            if (cleaned := _strip_leading_headings(raw))
        ]

        # Merge a short paragraph with the one that follows it.
        merged_pars: list[str] = []
        pending = ""
        for par in paragraphs:
            if pending and len(pending) < MIN_CHARS:
                pending = pending + "\n\n" + par
            else:
                if pending:
                    merged_pars.append(pending)
                pending = par
        if pending:
            merged_pars.append(pending)

        section_slug = slugify(sezione)
        for idx, par in enumerate(merged_pars):
            sub = make_sub_chunks(par, prefix, sezione, f"par{idx}", MAX_CHARS, OVERLAP_S)
            for c in sub:
                c["chunk_id"] = f"{section_slug}__p{idx}__s{c['sub_index']}"
            chunks.extend(sub)

    return chunks
def chunk_sliding_window(text: str, stem: str) -> list[dict]:
    """Chunk *text* with a sentence-based sliding window.

    Each window takes as many consecutive sentences as fit in ``MAX_CHARS``
    (always at least one). The next window starts ``OVERLAP_S`` sentences
    before the end of the current one, and always at least one sentence
    further on.

    Windowing stops as soon as a window reaches the last sentence. Advancing
    further would only emit shorter windows made of sentences already present
    in the previous chunk.

    Args:
        text: Source text of the document.
        stem: Document name, used in the prefix and as ``sezione``.

    Returns:
        Chunk dicts with ids ``win__<n>``; empty if *text* has no sentence.
    """
    sentences = split_sentences(text)
    prefix = f"[Documento: {stem}]\n"
    total = len(sentences)

    chunks: list[dict] = []
    start = 0
    while start < total:
        # The first sentence is always taken, even if longer than MAX_CHARS.
        window = [sentences[start]]
        cur_len = len(sentences[start])
        end = start + 1
        while end < total and cur_len + len(sentences[end]) + 1 <= MAX_CHARS:
            cur_len += len(sentences[end]) + 1
            window.append(sentences[end])
            end += 1

        win_idx = len(chunks)
        chunk_text = prefix + " ".join(window)
        chunks.append({
            "chunk_id": f"win__{win_idx}",
            "text": chunk_text,
            "sezione": stem,
            "titolo": f"finestra {win_idx}",
            "sub_index": win_idx,
            "n_chars": len(chunk_text),
        })

        if end >= total:
            break  # every remaining sentence is already in this window
        start += max(1, len(window) - OVERLAP_S)

    return chunks
clean_md = conv_dir / "clean.md" + profile_path = conv_dir / "structure_profile.json" + out_file = out_dir / "chunks.json" + + print(f"\nDocumento: {stem}") + + if not clean_md.exists(): + print(f" ✗ clean.md non trovato in conversione/{stem}/ — skip") + return False + if not profile_path.exists(): + print(f" ✗ structure_profile.json non trovato in conversione/{stem}/ — skip") + return False + + if out_file.exists() and not force: + print(f" ⚠️ chunks.json già presente — skip") + print(f" (usa --force per rieseguire)") + return True + + profile = json.loads(profile_path.read_text(encoding="utf-8")) + strategia = profile.get("strategia_chunking", "paragraph") + print(f" Strategia: {strategia}") + + chunks = chunk_document(clean_md, profile, stem) + + if not chunks: + print(f" ✗ Nessun chunk generato — controlla clean.md") + return False + + out_dir.mkdir(parents=True, exist_ok=True) + out_file.write_text( + json.dumps(chunks, ensure_ascii=False, indent=2), encoding="utf-8" + ) + + lengths = [c["n_chars"] for c in chunks] + min_c = min(lengths) + max_c = max(lengths) + avg_c = int(sum(lengths) / len(lengths)) + short = sum(1 for l in lengths if l < MIN_CHARS) + long_ = sum(1 for l in lengths if l > MAX_CHARS * 1.5) + + print(f" Chunk totali: {len(chunks)}") + print(f" Min: {min_c} char Max: {max_c} char Media: {avg_c} char") + if short: + print(f" ⚠️ {short} chunk sotto MIN_CHARS ({MIN_CHARS})") + if long_: + print(f" ⚠️ {long_} chunk sopra MAX_CHARS×1.5 ({int(MAX_CHARS * 1.5)})") + print(f" ✅ chunks.json salvato in chunks/{stem}/") + return True + + +# ─── Entry point ───────────────────────────────────────────────────────────── + +if __name__ == "__main__": + project_root = Path(__file__).parent.parent + + parser = argparse.ArgumentParser(description="Chunking adattivo") + parser.add_argument("--stem", help="Nome del documento (sottocartella di conversione/)") + parser.add_argument("--force", action="store_true", help="Riesegui anche se già presente") + args = 
parser.parse_args() + + if args.stem: + stems = [args.stem] + else: + conv_dir = project_root / "conversione" + if not conv_dir.exists(): + print(f"Errore: cartella conversione/ non trovata in {project_root}") + sys.exit(1) + stems = sorted( + p.name for p in conv_dir.iterdir() + if p.is_dir() and (p / "clean.md").exists() + ) + if not stems: + print(f"Errore: nessun documento trovato in conversione/") + sys.exit(1) + + results = [process_stem(s, project_root, args.force) for s in stems] + + ok = sum(results) + total = len(results) + print(f"\n{'✅' if all(results) else '⚠️ '} {ok}/{total} documenti processati") + sys.exit(0 if all(results) else 1) diff --git a/chunks/fix_chunks.py b/chunks/fix_chunks.py new file mode 100644 index 0000000..e817e51 --- /dev/null +++ b/chunks/fix_chunks.py @@ -0,0 +1,283 @@ +#!/usr/bin/env python3 +""" +Fix chunk + +Applica correzioni dirette su chunks//chunks.json basandosi sul +report.json prodotto da verify_chunks.py. Non tocca clean.md. + +Fixes applicati: + empty → rimuove il chunk + incomplete → fonde con il chunk successivo (la frase continua) + no_prefix → aggiunge prefisso [sezione > titolo] se mancante + too_short → fonde con il chunk adiacente nello stesso sezione + too_long → spezza all'ultimo confine di paragrafo/frase entro MAX_CHARS + +Input: chunks//chunks.json + chunks//report.json +Output: chunks//chunks.json (sovrascrive) + +Uso: + python chunks/fix_chunks.py --stem documento + python chunks/fix_chunks.py --stem documento --dry-run +""" + +import argparse +import json +import re +import sys +from pathlib import Path + +MAX_CHARS = 800 +PUNCT_END = re.compile(r"[.!?»)\]'\u2019\"\u201c\u201d\u2018\u2014\u2013-]$") + + +# ─── Helpers ────────────────────────────────────────────────────────────────── + +def _prefix(chunk: dict) -> str: + sezione = chunk.get("sezione", "") + titolo = chunk.get("titolo", "") + if titolo: + return f"[{sezione} > {titolo}]" + return f"[{sezione}]" + + +def _strip_prefix(text: str) -> str: + 
text = text.lstrip() + if text.startswith("["): + end = text.find("]") + if end != -1: + return text[end + 1:].lstrip("\n") + return text + + +def _rebuild_text(chunk: dict, body: str) -> str: + return f"{_prefix(chunk)}\n{body}" + + +def _split_at_boundary(text: str, max_chars: int) -> list[str]: + if len(text) <= max_chars: + return [text] + + parts = [] + remaining = text + + while len(remaining) > max_chars: + candidate = remaining[:max_chars] + split_pos = candidate.rfind("\n\n") + + if split_pos == -1: + m = None + for m in re.finditer(r"[.!?»]\s+", candidate): + pass + split_pos = m.end() if m else None + + if split_pos is None or split_pos == 0: + sp = remaining.find(" ", max_chars) + split_pos = sp if sp != -1 else len(remaining) + + parts.append(remaining[:split_pos].rstrip()) + remaining = remaining[split_pos:].lstrip() + + if remaining: + parts.append(remaining) + + return [p for p in parts if p.strip()] + + +# ─── Operazioni sui chunk ───────────────────────────────────────────────────── + +def fix_empty(chunks: list[dict], empty_ids: set[str]) -> tuple[list[dict], int]: + before = len(chunks) + chunks = [c for c in chunks if c["chunk_id"] not in empty_ids] + return chunks, before - len(chunks) + + +def fix_no_prefix(chunks: list[dict], no_prefix_ids: set[str]) -> tuple[list[dict], int]: + count = 0 + for c in chunks: + if c["chunk_id"] in no_prefix_ids: + body = _strip_prefix(c["text"]) + c["text"] = _rebuild_text(c, body) + c["n_chars"] = len(c["text"]) + count += 1 + return chunks, count + + +def fix_incomplete_and_short(chunks: list[dict], + problem_ids: set[str]) -> tuple[list[dict], int]: + merged = 0 + i = 0 + result: list[dict] = [] + + while i < len(chunks): + c = chunks[i] + if c["chunk_id"] in problem_ids and i + 1 < len(chunks): + nxt = chunks[i + 1] + body_c = _strip_prefix(c["text"]) + body_nxt = _strip_prefix(nxt["text"]) + merged_body = body_c.rstrip() + "\n" + body_nxt.lstrip() + nxt["text"] = _rebuild_text(nxt, merged_body) + 
def fix_stem(stem: str, project_root: Path, max_chars: int, dry_run: bool) -> bool:
    """Apply the fixes listed in ``report.json`` to ``chunks.json`` for *stem*.

    Both files are read from ``<project_root>/chunks/<stem>/``. Fixes run in
    this order: remove empty chunks, add missing prefixes, merge incomplete
    and too-short chunks into the following one, split too-long chunks.
    Chunk ids are then renumbered and ``chunks.json`` is overwritten.

    The report identifies chunks by ``chunk_id`` and a successful run
    renumbers those ids. A report older than ``chunks.json`` is therefore
    rejected, because its ids may now point at different chunks.

    Args:
        stem: Document name (sub-folder of ``chunks/``).
        project_root: Root folder containing ``chunks/``.
        max_chars: Size limit used when splitting too-long chunks.
        dry_run: If true, only print the planned operations.

    Returns:
        ``False`` if an input file is missing or the report is stale,
        ``True`` otherwise (including when there is nothing to fix).
    """
    stem_dir = project_root / "chunks" / stem
    chunks_path = stem_dir / "chunks.json"
    report_path = stem_dir / "report.json"

    if not chunks_path.exists():
        print(f"✗ chunks/{stem}/chunks.json non trovato.")
        print(f" Esegui prima: python chunks/chunker.py --stem {stem}")
        return False

    if not report_path.exists():
        print(f"✗ chunks/{stem}/report.json non trovato.")
        print(f" Esegui prima: python chunks/verify_chunks.py --stem {stem}")
        return False

    # chunks.json written after the report means the report describes an
    # earlier set of chunk ids (e.g. this script already ran once).
    if report_path.stat().st_mtime < chunks_path.stat().st_mtime:
        print(f"✗ chunks/{stem}/report.json è più vecchio di chunks.json.")
        print(f" Riesegui prima: python chunks/verify_chunks.py --stem {stem}")
        return False

    chunks: list[dict] = json.loads(chunks_path.read_text(encoding="utf-8"))
    report: dict = json.loads(report_path.read_text(encoding="utf-8"))

    verdict = report.get("verdict", "ok")
    print(f"\nDocumento: {stem} (verdict: {verdict})")

    if verdict == "ok":
        print(" ✅ Nessun problema — nulla da correggere.")
        return True

    def reported_ids(group: str, kind: str) -> set[str]:
        return {e["chunk_id"] for e in report.get(group, {}).get(kind, [])}

    empty_ids = reported_ids("blockers", "empty")
    no_prefix_ids = reported_ids("blockers", "no_prefix")
    incomplete_ids = reported_ids("blockers", "incomplete")
    too_short_ids = reported_ids("warnings", "too_short")
    too_long_ids = reported_ids("warnings", "too_long")

    ops: list[str] = []
    if empty_ids:
        ops.append(f" 🗑 rimuovi {len(empty_ids)} chunk vuoti")
    if no_prefix_ids:
        ops.append(f" 🔧 aggiungi prefisso a {len(no_prefix_ids)} chunk")
    if incomplete_ids:
        ops.append(f" 🔗 fondi {len(incomplete_ids)} chunk incompleti col successivo")
    if too_short_ids:
        ops.append(f" 🔗 fondi {len(too_short_ids)} chunk troppo corti col successivo")
    if too_long_ids:
        ops.append(f" ✂️ spezza {len(too_long_ids)} chunk troppo lunghi")

    if not ops:
        print(" ✅ Nessuna correzione necessaria.")
        return True

    print("\n Operazioni pianificate:")
    for op in ops:
        print(op)

    if dry_run:
        print("\n [dry-run] Nessuna modifica applicata.")
        return True

    n_before = len(chunks)

    if empty_ids:
        chunks, n = fix_empty(chunks, empty_ids)
        print(f"\n 🗑 Rimossi {n} chunk vuoti.")

    if no_prefix_ids:
        chunks, n = fix_no_prefix(chunks, no_prefix_ids)
        print(f" 🔧 Aggiunto prefisso a {n} chunk.")

    merge_ids = incomplete_ids | too_short_ids
    if merge_ids:
        chunks, n = fix_incomplete_and_short(chunks, merge_ids)
        print(f" 🔗 Fusi {n} chunk (incompleti + corti).")

    if too_long_ids:
        chunks, n = fix_too_long(chunks, too_long_ids, max_chars)
        print(f" ✂️ Spezzati {n} chunk lunghi.")

    chunks = renumber_ids(chunks)

    n_after = len(chunks)
    print(f"\n Totale chunk: {n_before} → {n_after}")

    chunks_path.write_text(
        json.dumps(chunks, ensure_ascii=False, indent=2), encoding="utf-8"
    )
    print(f" ✅ Salvato: chunks/{stem}/chunks.json")
    print("\n Riesegui la verifica:")
    print(f" python chunks/verify_chunks.py --stem {stem}")

    return True
PUNCT_END = re.compile("[.!?»)\\]'\u2019\"\u201c\u201d\u2018\u2014\u2013\u2026-]$")


def has_prefix(chunk: dict) -> bool:
    """Return True if the chunk text starts with ``[`` (ignoring leading whitespace)."""
    return chunk.get("text", "").lstrip().startswith("[")


def is_empty(chunk: dict) -> bool:
    """Return True if the chunk text is missing or only whitespace."""
    return not chunk.get("text", "").strip()


def is_too_short(chunk: dict, min_chars: int) -> bool:
    """Return True if the stored ``n_chars`` is below *min_chars*."""
    return chunk.get("n_chars", 0) < min_chars


def is_too_long(chunk: dict, max_chars: int) -> bool:
    """Return True if the stored ``n_chars`` exceeds 1.5 times *max_chars*."""
    return chunk.get("n_chars", 0) > max_chars * 1.5


def ends_incomplete(chunk: dict) -> bool:
    """Return True if the chunk text does not end with closing punctuation.

    Trailing ``_`` and ``*`` (Markdown emphasis markers) are ignored. Empty
    text is never reported as incomplete.
    """
    text = chunk.get("text", "").rstrip()
    if not text:
        return False
    text_check = re.sub(r"[_*]+$", "", text).rstrip()
    if not text_check:
        return False
    return not PUNCT_END.search(text_check)


def _fmt_chunk(c: dict) -> str:
    """Format one chunk as ``[id] (n char) «preview»`` for the console report."""
    cid = c.get("chunk_id", "?")
    n = c.get("n_chars", 0)
    preview = c.get("text", "")[:60].replace("\n", " ")
    return f" [{cid}] ({n} char) «{preview}»"


def verify_stem(stem: str, project_root: Path, min_chars: int, max_chars: int) -> bool:
    """Check ``chunks/<stem>/chunks.json``, print a report and write ``report.json``.

    Blocking problems are empty chunks, chunks without a context prefix and
    chunks that end without closing punctuation. Too-short and too-long
    chunks are warnings only.

    The number of OK chunks counts each chunk object once, so it stays
    correct when ids repeat and does not fail when a chunk has no
    ``chunk_id``.

    Args:
        stem: Document name (sub-folder of ``chunks/``).
        project_root: Root folder containing ``chunks/``.
        min_chars: Threshold below which a chunk is too short.
        max_chars: A chunk is too long above 1.5 times this value.

    Returns:
        ``True`` if there are no blocking problems; ``False`` otherwise, or
        if ``chunks.json`` is missing or empty.
    """
    chunks_path = project_root / "chunks" / stem / "chunks.json"

    print(f"\nDocumento: {stem}")

    if not chunks_path.exists():
        print(f" ✗ chunks/{stem}/chunks.json non trovato")
        print(f" Esegui prima: python chunks/chunker.py --stem {stem}")
        return False

    chunks: list[dict] = json.loads(chunks_path.read_text(encoding="utf-8"))

    if not chunks:
        print(" ✗ chunks.json è vuoto")
        return False

    # ── Collect problems ──────────────────────────────────────────────────────

    empty_chunks = [c for c in chunks if is_empty(c)]
    no_prefix = [c for c in chunks if not is_empty(c) and not has_prefix(c)]
    too_short = [c for c in chunks if is_too_short(c, min_chars)]
    too_long = [c for c in chunks if is_too_long(c, max_chars)]
    incomplete = [c for c in chunks if not is_empty(c) and ends_incomplete(c)]

    # ── Statistics ────────────────────────────────────────────────────────────

    lengths = [c.get("n_chars", 0) for c in chunks]
    n_total = len(chunks)
    # Identify flagged chunks by object identity rather than by chunk_id.
    flagged = {
        id(c)
        for group in (empty_chunks, no_prefix, too_short, too_long, incomplete)
        for c in group
    }
    n_ok = n_total - len(flagged)
    min_l = min(lengths)
    max_l = max(lengths)
    avg_l = int(sum(lengths) / n_total)

    n_under = sum(1 for n in lengths if n < min_chars)
    n_normal = sum(1 for n in lengths if min_chars <= n <= max_chars)
    n_over = sum(1 for n in lengths if n > max_chars)

    # ── Console output ────────────────────────────────────────────────────────

    print(f" Totale chunk: {n_total}")
    print(f" ✅ OK: {n_ok}")
    print()
    print(" Distribuzione lunghezze:")
    print(f" Min: {min_l} char")
    print(f" Max: {max_l} char")
    print(f" Media: {avg_l} char")
    print(f" < {min_chars} char (sotto MIN): {n_under}")
    print(f" {min_chars}–{max_chars} char (ideale): {n_normal}")
    print(f" > {max_chars} char (sopra MAX): {n_over}")

    def _print_more(items: list[dict]) -> None:
        # Only the first five examples of each kind are listed.
        if len(items) > 5:
            print(f" ... e altri {len(items) - 5}")

    if empty_chunks:
        print(f"\n 🔴 {len(empty_chunks)} chunk VUOTI:")
        for c in empty_chunks[:5]:
            print(f" [{c.get('chunk_id', '?')}]")
        _print_more(empty_chunks)

    if no_prefix:
        print(f"\n 🔴 {len(no_prefix)} chunk SENZA PREFISSO DI CONTESTO:")
        for c in no_prefix[:5]:
            print(_fmt_chunk(c))
        _print_more(no_prefix)
        print(" → Causa probabile: header ### mancanti o malformati nel MD")

    if too_short:
        print(f"\n 🟡 {len(too_short)} chunk SOTTO MIN_CHARS ({min_chars}):")
        for c in too_short[:5]:
            print(_fmt_chunk(c))
        _print_more(too_short)
        print(" → Soluzione: abbassa MIN_CHARS o revisiona il MD")

    if too_long:
        print(f"\n 🟡 {len(too_long)} chunk SOPRA MAX_CHARS×1.5 ({int(max_chars * 1.5)}):")
        for c in too_long[:5]:
            print(_fmt_chunk(c))
        _print_more(too_long)
        print(" → Soluzione: alza MAX_CHARS o verifica il testo nel MD")

    if incomplete:
        print(f"\n 🔴 {len(incomplete)} chunk CHE FINISCONO SENZA PUNTEGGIATURA (frase spezzata):")
        for c in incomplete[:5]:
            last_line = c.get("text", "").rstrip().split("\n")[-1][-80:]
            print(f" [{c.get('chunk_id', '?')}] ...{last_line!r}")
        _print_more(incomplete)
        print(f" → Soluzione: correggi le righe spezzate in conversione/{stem}/clean.md")

    # ── Build and save report.json ────────────────────────────────────────────

    blockers = empty_chunks + no_prefix + incomplete
    warnings = too_short + too_long

    def _chunk_entry(c: dict) -> dict:
        return {
            "chunk_id": c.get("chunk_id", ""),
            "sezione": c.get("sezione", ""),
            "titolo": c.get("titolo", ""),
            "n_chars": c.get("n_chars", 0),
            "last_text": c.get("text", "").rstrip().split("\n")[-1][-120:],
        }

    if blockers:
        verdict = "blocked"
    elif warnings:
        verdict = "warnings_only"
    else:
        verdict = "ok"

    report = {
        "stem": stem,
        "verdict": verdict,
        "stats": {
            "total": n_total,
            "ok": n_ok,
            "min_chars": min_l,
            "max_chars": max_l,
            "avg_chars": avg_l,
        },
        "thresholds": {"min_chars": min_chars, "max_chars": max_chars},
        "blockers": {
            "empty": [_chunk_entry(c) for c in empty_chunks],
            "no_prefix": [_chunk_entry(c) for c in no_prefix],
            "incomplete": [_chunk_entry(c) for c in incomplete],
        },
        "warnings": {
            "too_short": [_chunk_entry(c) for c in too_short],
            "too_long": [_chunk_entry(c) for c in too_long],
        },
    }

    out_dir = project_root / "chunks" / stem
    out_dir.mkdir(parents=True, exist_ok=True)
    (out_dir / "report.json").write_text(
        json.dumps(report, ensure_ascii=False, indent=2), encoding="utf-8"
    )
    print(f"\n report.json salvato in chunks/{stem}/")

    # ── Next steps ────────────────────────────────────────────────────────────

    print(f"\n {'─' * 50}")
    print(" PROSSIMI PASSI")
    print(f" {'─' * 50}")

    if not blockers and not warnings:
        print(" ✅ Tutto OK — procedi alla vettorizzazione:")
        print(f" python step-8/ingest.py --stem {stem}")

    elif not blockers:
        print(" 🟡 Solo avvisi minori — puoi procedere alla vettorizzazione:")
        print(f" python step-8/ingest.py --stem {stem}")
        print()
        print(" Oppure, per ottimizzare prima:")
        if too_short:
            pct = int(len(too_short) / n_total * 100)
            print(f" • {len(too_short)} chunk corti ({pct}% del totale)")
        if too_long:
            pct = int(len(too_long) / n_total * 100)
            print(f" • {len(too_long)} chunk lunghi ({pct}% del totale)")
        if too_short or too_long:
            print(f" → Esegui: python chunks/fix_chunks.py --stem {stem} --dry-run")
            print(f" poi: python chunks/fix_chunks.py --stem {stem}")
            print(f" poi: python chunks/verify_chunks.py --stem {stem}")

    else:
        print(" 🔴 Problemi bloccanti — correggi prima di procedere:")
        print()
        if empty_chunks:
            print(f" • {len(empty_chunks)} chunk vuoti")
            print(f" → Controlla conversione/{stem}/clean.md per sezioni prive di testo")
        if no_prefix:
            print(f" • {len(no_prefix)} chunk senza prefisso di contesto")
            print(f" → Controlla che gli header ### siano corretti in conversione/{stem}/clean.md")
        if incomplete:
            print(f" • {len(incomplete)} chunk con frase spezzata")
            print(f" → Esegui: python chunks/fix_chunks.py --stem {stem}")
        print()
        print(" Dopo le correzioni, riesegui nell'ordine:")
        print(f" python chunks/chunker.py --stem {stem} --force")
        print(f" python chunks/verify_chunks.py --stem {stem}")
        print()
        if warnings:
            print(f" 🟡 Hai anche {len(warnings)} avvisi minori — affrontali dopo aver risolto i 🔴.")

    return not blockers
not chunks_dir.exists(): + print(f"Errore: cartella chunks/ non trovata in {project_root}") + sys.exit(1) + stems = sorted( + p.name for p in chunks_dir.iterdir() + if p.is_dir() and (p / "chunks.json").exists() + ) + if not stems: + print("Errore: nessun chunks.json trovato in chunks/") + sys.exit(1) + + results = [verify_stem(s, project_root, args.min, args.max) for s in stems] + + ok = sum(results) + total = len(results) + print(f"\n{'✅' if all(results) else '⚠️ '} {ok}/{total} documenti senza problemi") + sys.exit(0 if all(results) else 1)