#!/usr/bin/env python3
"""
Fix chunk

Apply direct corrections to chunks/<stem>/chunks.json based on the
report.json produced by verify_chunks.py. clean.md is never touched.

Fixes applied:
  empty      -> remove the chunk
  incomplete -> merge into the following chunk (the sentence continues)
  no_prefix  -> add the [sezione > titolo] prefix when it is missing
  too_short  -> merge with the adjacent chunk in the same section
  too_long   -> split at the last paragraph/sentence boundary within MAX_CHARS

Input:  chunks/<stem>/chunks.json + chunks/<stem>/report.json
Output: chunks/<stem>/chunks.json (overwritten)

Usage:
  python chunks/fix_chunks.py --stem documento
  python chunks/fix_chunks.py --stem documento --dry-run
"""

import argparse
import contextlib
import io
import json
import re
import sys
from pathlib import Path

# Make the sibling modules (config, verify_chunks) importable when this file
# is run as a script from another working directory.
_HERE = Path(__file__).resolve().parent
if str(_HERE) not in sys.path:
    sys.path.insert(0, str(_HERE))

import config as cfg
from verify_chunks import verify_stem as _verify_stem

# Default upper bound on chunk length: the target size plus the tolerance.
MAX_CHARS = int(cfg.TARGET_CHARS * (1 + cfg.CHUNK_TOLERANCE))


def _load_thresholds(stem_dir: Path) -> int:
    """Return the ``max_chars`` threshold to use for *stem_dir*.

    Reads ``max_chars`` from ``<stem_dir>/meta.json`` (per the original note,
    written by the chunker) and falls back to the config-derived ``MAX_CHARS``
    when the file does not exist or does not carry that key.
    """
    meta = stem_dir / "meta.json"
    if not meta.exists():
        return MAX_CHARS
    data = json.loads(meta.read_text(encoding="utf-8"))
    # A meta.json without the key must not crash the fixer: use the default.
    return data.get("max_chars", MAX_CHARS)


# Matches text whose last character is closing punctuation, a quote or a dash.
PUNCT_END = re.compile(r"[.!?»)\]'\u2019\"\u201c\u201d\u2018\u2014\u2013-]$")


# ─── Helpers ──────────────────────────────────────────────────────────────────

def _prefix(chunk: dict) -> str:
    """Build the bracketed header: ``[sezione > titolo]`` or ``[sezione]``."""
    sezione = chunk.get("sezione", "")
    titolo = chunk.get("titolo", "")
    if titolo:
        return f"[{sezione} > {titolo}]"
    return f"[{sezione}]"


def _strip_prefix(text: str) -> str:
    """Return *text* without a leading ``[...]`` header, if there is one.

    Leading whitespace is always removed. When the text then starts with
    ``[``, everything up to the first ``]`` is dropped together with the
    newlines that follow it.
    """
    text = text.lstrip()
    if text.startswith("["):
        end = text.find("]")
        if end != -1:
            return text[end + 1:].lstrip("\n")
    # NOTE(review): any leading "[...]" is treated as the header, including
    # body text that merely begins with a bracket - confirm this is intended.
    return text


def _rebuild_text(chunk: dict, body: str) -> str:
    """Return *body* preceded by the chunk's header on its own line."""
    return f"{_prefix(chunk)}\n{body}"


# Strong sentence end: . ! ? followed by whitespace + an uppercase letter or a quotation mark.
# Split points for over-long chunks. The strong pattern is a terminator
# (. ! ? ») followed by whitespace and then a character from the lookahead
# class (capital letters, «, quotes, "("). Commas and colons are never used as
# split points, so that a split does not create incomplete chunks; "; " is
# only a fallback.
# NOTE(review): the lookahead range \xc0-\xff also admits lowercase accented
# letters and makes the narrower \xc0-\xd6 / \xd8-\xde ranges redundant -
# confirm whether uppercase-only was intended.
_STRONG_END = re.compile(
    r'[.!?\xbb]\s+(?=[A-Z\xc0-\xd6\xd8-\xde\xc0-\xff\xab\x22\x27(])'
)
_SECONDARY_END = re.compile(r';\s+')


def _last_boundary(pattern: re.Pattern, window: str) -> int:
    """Return the index just past the terminator of the last match, or -1."""
    cut = -1
    for m in pattern.finditer(window):
        cut = m.start() + 1  # position right after the terminator character
    return cut


def _split_at_boundary(text: str, max_chars: int) -> list[str]:
    """Split *text* into parts of at most *max_chars* characters.

    Each cut is made at the last strong sentence boundary inside the first
    *max_chars* characters; when there is none, the last ``;`` + whitespace
    boundary is used instead (legal clauses). When neither exists the rest of
    the text is left in one piece: an over-long chunk is only a warning, an
    incomplete chunk is a blocker. The last part may therefore exceed
    *max_chars*.

    Returns the non-blank parts, each stripped at the cut.
    """
    if len(text) <= max_chars:
        return [text]

    parts: list[str] = []
    remaining = text
    while len(remaining) > max_chars:
        window = remaining[:max_chars]
        cut = _last_boundary(_STRONG_END, window)
        if cut <= 0:
            cut = _last_boundary(_SECONDARY_END, window)
        if cut <= 0:
            # No usable boundary: keep the remainder whole.
            break
        head = remaining[:cut].rstrip()
        remaining = remaining[cut:].lstrip()
        if head:
            parts.append(head)

    if remaining:
        parts.append(remaining)
    return [p for p in parts if p.strip()]


# ─── Chunk operations ─────────────────────────────────────────────────────────

def fix_empty(chunks: list[dict], empty_ids: set[str]) -> tuple[list[dict], int]:
    """Drop every chunk whose id is in *empty_ids*.

    Returns the filtered list and the number of chunks removed.
    """
    kept = [c for c in chunks if c["chunk_id"] not in empty_ids]
    return kept, len(chunks) - len(kept)


def fix_no_prefix(chunks: list[dict], no_prefix_ids: set[str]) -> tuple[list[dict], int]:
    """Rewrite the header of every chunk whose id is in *no_prefix_ids*.

    The chunks are modified in place (``text`` and ``n_chars``). Returns the
    same list and the number of chunks rewritten.
    """
    count = 0
    for c in chunks:
        if c["chunk_id"] not in no_prefix_ids:
            continue
        c["text"] = _rebuild_text(c, _strip_prefix(c["text"]))
        c["n_chars"] = len(c["text"])
        count += 1
    return chunks, count


def fix_incomplete_and_short(
    chunks: list[dict],
    problem_ids: set[str],
    *,
    same_section: bool = False,
) -> tuple[list[dict], int]:
    """Merge each problem chunk into the chunk that follows it.

    The body of the problem chunk is put in front of the next chunk's body
    (joined by a newline); the next chunk keeps its own header and metadata
    and is modified in place, and the problem chunk is dropped. A problem
    chunk in last position has nothing to merge into and is kept unchanged.

    Args:
        chunks: ordered chunk dicts.
        problem_ids: ids of the chunks to merge forward.
        same_section: when true, merge only if both chunks carry the same
            ``sezione`` value, so text never ends up under another section's
            header. The default (false) merges unconditionally.

    Returns the resulting list and the number of merges performed.
    """
    merged = 0
    result: list[dict] = []
    last = len(chunks) - 1
    for i, c in enumerate(chunks):
        if c["chunk_id"] in problem_ids and i < last:
            nxt = chunks[i + 1]
            if not same_section or c.get("sezione") == nxt.get("sezione"):
                merged_body = (
                    _strip_prefix(c["text"]).rstrip()
                    + "\n"
                    + _strip_prefix(nxt["text"]).lstrip()
                )
                nxt["text"] = _rebuild_text(nxt, merged_body)
                nxt["n_chars"] = len(nxt["text"])
                merged += 1
                continue
        result.append(c)
    return result, merged


def fix_too_long(chunks: list[dict], too_long_ids: set[str], max_chars: int) -> tuple[list[dict], int]:
    """Split every chunk whose id is in *too_long_ids* at sentence boundaries.

    Each part becomes a copy of the original chunk with its own ``sub_index``,
    ``chunk_id`` (``<base>__s<n>``), ``text`` and ``n_chars``. A chunk for
    which no split is found is kept unchanged.

    Returns the resulting list and the number of chunks that were split.
    """
    result: list[dict] = []
    split_count = 0
    for c in chunks:
        if c["chunk_id"] not in too_long_ids:
            result.append(c)
            continue

        parts = _split_at_boundary(_strip_prefix(c["text"]), max_chars)
        if len(parts) <= 1:
            # Nothing to split (this also covers a blank body, which yields
            # no parts at all): never drop the chunk.
            result.append(c)
            continue

        base_id = re.sub(r"__s\d+$", "", c["chunk_id"])
        base_sub = c.get("sub_index", 0)
        for j, part in enumerate(parts):
            new_chunk = dict(c)
            new_chunk["sub_index"] = base_sub + j
            new_chunk["chunk_id"] = f"{base_id}__s{base_sub + j}"
            new_chunk["text"] = _rebuild_text(new_chunk, part)
            new_chunk["n_chars"] = len(new_chunk["text"])
            result.append(new_chunk)
        split_count += 1
    return result, split_count


def renumber_ids(chunks: list[dict]) -> list[dict]:
    """Renumber the ``__s<n>`` suffixes so ids sharing a base count from 0.

    Chunks are modified in place (``chunk_id`` and ``sub_index``) in list
    order; an id without a suffix gets ``__s0``. Returns the same list.
    """
    seen: dict[str, int] = {}
    for c in chunks:
        base = re.sub(r"__s\d+$", "", c["chunk_id"])
        idx = seen.get(base, 0)
        c["chunk_id"] = f"{base}__s{idx}"
        c["sub_index"] = idx
        seen[base] = idx + 1
    return chunks


# ─── Core ─────────────────────────────────────────────────────────────────────

def _report_ids(report: dict, level: str, kind: str) -> set[str]:
    """Return the chunk ids listed under ``report[level][kind]``."""
    return {e["chunk_id"] for e in report.get(level, {}).get(kind, [])}


def _oversized_ids(report: dict, split_limit: float) -> set[str]:
    """Return the ids of too_long entries whose ``n_chars`` exceeds *split_limit*."""
    return {
        e["chunk_id"]
        for e in report.get("warnings", {}).get("too_long", [])
        if e.get("n_chars", 0) > split_limit
    }


def _count_blockers(report: dict) -> int:
    """Return the total number of blocker entries in *report*."""
    return sum(len(v) for v in report.get("blockers", {}).values())


def _write_chunks(path: Path, chunks: list[dict]) -> None:
    """Serialize *chunks* to *path* as indented UTF-8 JSON."""
    path.write_text(
        json.dumps(chunks, ensure_ascii=False, indent=2), encoding="utf-8"
    )


def fix_stem(stem: str, project_root: Path, max_chars: int, dry_run: bool, max_iter: int = 10) -> bool:
    """Fix ``chunks/<stem>/chunks.json`` according to its ``report.json``.

    Phase 1 fixes the blockers (empty, no_prefix, incomplete) and re-runs the
    verification until none are left, no progress is made, or *max_iter*
    rounds have run. Phase 2 makes a single pass over the warnings (too_short
    merge, too_long split). chunks.json is overwritten, and report.json is
    rewritten by each verification run.

    Args:
        stem: document name (sub-folder of ``chunks/``).
        project_root: folder that contains ``chunks/``.
        max_chars: split threshold, used when the stem has no ``meta.json``;
            a ``meta.json`` takes precedence.
        dry_run: only print the planned operations.
        max_iter: maximum number of blocker-fixing rounds.

    Returns:
        False when chunks.json or report.json is missing, True otherwise.
    """
    stem_dir = project_root / "chunks" / stem
    chunks_path = stem_dir / "chunks.json"
    report_path = stem_dir / "report.json"

    # The stem's own meta.json wins when present; otherwise honour the
    # caller's value instead of silently discarding it.
    if (stem_dir / "meta.json").exists():
        max_chars = _load_thresholds(stem_dir)

    if not chunks_path.exists():
        print(f"✗ chunks/{stem}/chunks.json non trovato.")
        print(f" Esegui prima: python chunks/chunker.py --stem {stem}")
        return False
    if not report_path.exists():
        print(f"✗ chunks/{stem}/report.json non trovato.")
        print(f" Esegui prima: python chunks/verify_chunks.py --stem {stem}")
        return False

    chunks: list[dict] = json.loads(chunks_path.read_text(encoding="utf-8"))
    report: dict = json.loads(report_path.read_text(encoding="utf-8"))

    verdict = report.get("verdict", "ok")
    print(f"\nDocumento: {stem} (verdict: {verdict})")
    if verdict == "ok":
        print(" ✅ Nessun problema - nulla da correggere.")
        return True

    # Only split chunks beyond max_chars × SPLIT_THRESHOLD_FACTOR, not those
    # barely over the limit (splitting those would create incomplete chunks).
    split_limit = max_chars * cfg.SPLIT_THRESHOLD_FACTOR

    empty_ids = _report_ids(report, "blockers", "empty")
    no_prefix_ids = _report_ids(report, "blockers", "no_prefix")
    incomplete_ids = _report_ids(report, "blockers", "incomplete")
    too_short_ids = _report_ids(report, "warnings", "too_short")
    too_long_ids = _oversized_ids(report, split_limit)

    ops: list[str] = []
    if empty_ids:
        ops.append(f" 🗑 rimuovi {len(empty_ids)} chunk vuoti")
    if no_prefix_ids:
        ops.append(f" 🔧 aggiungi prefisso a {len(no_prefix_ids)} chunk")
    if incomplete_ids:
        ops.append(f" 🔗 fondi {len(incomplete_ids)} chunk incompleti col successivo")
    if too_short_ids:
        ops.append(f" 🔗 fondi {len(too_short_ids)} chunk troppo corti col successivo")
    if too_long_ids:
        ops.append(f" ✂️ spezza {len(too_long_ids)} chunk troppo lunghi")

    if not ops:
        print(" ✅ Nessuna correzione necessaria.")
        return True

    print("\n Operazioni pianificate:")
    for op in ops:
        print(op)

    if dry_run:
        print("\n [dry-run] Nessuna modifica applicata.")
        return True

    n_before = len(chunks)

    # Bounds handed to the verifier, derived from config.
    _min = int(cfg.TARGET_CHARS * (1 - cfg.CHUNK_TOLERANCE))
    _max = int(cfg.TARGET_CHARS * (1 + cfg.CHUNK_TOLERANCE))

    def _reverify() -> dict:
        """Re-run the verifier silently and return the report it wrote."""
        with contextlib.redirect_stdout(io.StringIO()):
            _verify_stem(stem, project_root, _min, _max)
        return json.loads(report_path.read_text(encoding="utf-8"))

    def _fix_blockers(chunks: list[dict], report: dict) -> list[dict]:
        """Fix only the blockers (empty, no_prefix, incomplete)."""
        empty_ids_ = _report_ids(report, "blockers", "empty")
        no_prefix_ids_ = _report_ids(report, "blockers", "no_prefix")
        incomplete_ids_ = _report_ids(report, "blockers", "incomplete")
        if empty_ids_:
            chunks, n = fix_empty(chunks, empty_ids_)
            print(f" 🗑 Rimossi {n} chunk vuoti.")
        if no_prefix_ids_:
            chunks, n = fix_no_prefix(chunks, no_prefix_ids_)
            print(f" 🔧 Aggiunto prefisso a {n} chunk.")
        if incomplete_ids_:
            chunks, n = fix_incomplete_and_short(chunks, incomplete_ids_)
            print(f" 🔗 Fusi {n} chunk incompleti.")
        return renumber_ids(chunks)

    def _fix_warnings(chunks: list[dict], report: dict) -> list[dict]:
        """Apply the optional fixes: merge too_short, split too_long."""
        too_short_ids_ = _report_ids(report, "warnings", "too_short")
        too_long_ids_ = _oversized_ids(report, split_limit)
        if too_short_ids_:
            # Short chunks are only merged within their own section.
            chunks, n = fix_incomplete_and_short(
                chunks, too_short_ids_, same_section=True
            )
            print(f" 🔗 Fusi {n} chunk troppo corti.")
        if too_long_ids_:
            chunks, n = fix_too_long(chunks, too_long_ids_, max_chars)
            print(f" ✂️ Spezzati {n} chunk lunghi.")
        return renumber_ids(chunks)

    # Phase 1: fix blockers until convergence.
    chunks = _fix_blockers(chunks, report)
    prev_blockers = _count_blockers(report)

    for iteration in range(1, max_iter + 1):
        _write_chunks(chunks_path, chunks)
        report = _reverify()
        new_verdict = report.get("verdict", "ok")
        curr_blockers = _count_blockers(report)

        if new_verdict in ("ok", "warnings_only") or curr_blockers == 0:
            break
        if curr_blockers >= prev_blockers:
            print(f"\n ⚠️ Nessun miglioramento ({curr_blockers} blockers) - i restanti richiedono correzione manuale del clean.md.")
            break

        print(f"\n Iterazione {iteration + 1} - {curr_blockers} blockers residui:")
        prev_blockers = curr_blockers
        chunks = _fix_blockers(chunks, report)

    # Phase 2: warnings (too_short merge + too_long split), one final pass.
    # Save first: when the loop above ran out of rounds (or never ran) the
    # chunks on disk are older than the ones in memory, and the report must
    # describe the chunks that are about to be fixed.
    _write_chunks(chunks_path, chunks)
    report = _reverify()
    n_short = len(report.get("warnings", {}).get("too_short", []))
    n_long = len(_oversized_ids(report, split_limit))
    fixed_warnings = bool(n_short or n_long)
    if fixed_warnings:
        print(f"\n Fix warnings: {n_short} corti, {n_long} lunghi da spezzare")
        chunks = _fix_warnings(chunks, report)

    n_after = len(chunks)
    print(f"\n Totale chunk: {n_before} → {n_after}")

    _write_chunks(chunks_path, chunks)
    print(f" ✅ Salvato: chunks/{stem}/chunks.json")

    if fixed_warnings:
        # The warning pass changed the chunks: refresh report.json so the
        # verdict below (and the file on disk) describes what was saved.
        report = _reverify()

    final_verdict = report.get("verdict", "?")
    if final_verdict == "ok":
        print(" ✅ Verdict finale: ok - procedi alla vettorizzazione.")
    elif final_verdict == "warnings_only":
        print(" 🟡 Verdict finale: warnings_only - puoi procedere.")
    else:
        print(f" 🔴 Verdict finale: {final_verdict} - rilancia la verifica manualmente:")
        print(f" python chunks/verify_chunks.py --stem {stem}")

    return True


# ─── Entry point ──────────────────────────────────────────────────────────────

if __name__ == "__main__":
    project_root = Path(__file__).parent.parent

    parser = argparse.ArgumentParser(description="Fix chunk")
    parser.add_argument("--stem", required=True,
                        help="Nome del documento (sottocartella di chunks/)")
    _max_def = int(cfg.TARGET_CHARS * (1 + cfg.CHUNK_TOLERANCE))
    parser.add_argument(
        "--max", type=int, default=_max_def,
        help=f"Soglia massima caratteri per lo split (default: TARGET×(1+TOL) = {_max_def})"
    )
    parser.add_argument(
        "--dry-run", action="store_true",
        help="Mostra le operazioni pianificate senza applicarle"
    )
    parser.add_argument(
        "--max-iter", type=int, default=10, metavar="N",
        help="Numero massimo di iterazioni automatiche (default: 10)"
    )
    args = parser.parse_args()

    ok = fix_stem(args.stem, project_root, args.max, args.dry_run, args.max_iter)
    sys.exit(0 if ok else 1)