Files
davide 5b63c423cc feat(chunks): ottimizzazione chunking e post-processing
- chunker.py: scrive meta.json con strategia e soglie effettive (target,
  min_chars, max_chars) per ogni documento chunked

- verify_chunks.py:
  * _load_thresholds(): legge min/max da meta.json invece del TARGET_CHARS
    globale, eliminando il mismatch tra soglie chunker e verify
    (h3_aware target=600 -> range 450-750, non piu' validato a 225-375)
  * _ROMAN_END: esclude numeri romani finali (XV, XIV...) dagli incompleti
    perche' sono artefatti indice PDF, non frasi spezzate
  * PUNCT_END: aggiunge ; come fine valida (clausole legali italiane)

- fix_chunks.py:
  * _load_thresholds(): usa max_chars da meta.json per split coerente
  * _SECONDARY_END: split secondario su ; per testo legale multi-clausola
  * Fase 1 (convergenza): risolve solo blockers (incomplete, empty,
    no_prefix) senza toccare warnings -- elimina il ciclo
    merge->too_long->split->incomplete->merge
  * Fase 2 (finale): una sola passata di merge too_short + split too_long
    dopo che i blockers sono azzerati

Risultato su dirittopenale: da blocked (265 incomplete) a warnings_only
in 2 iterazioni, senza cicli infiniti.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-12 11:09:28 +02:00

398 lines
15 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
Fix chunk
Applica correzioni dirette su chunks/<stem>/chunks.json basandosi sul
report.json prodotto da verify_chunks.py. Non tocca clean.md.
Fixes applicati:
empty → rimuove il chunk
incomplete → fonde con il chunk successivo (la frase continua)
no_prefix → aggiunge prefisso [sezione > titolo] se mancante
too_short → fonde con il chunk adiacente nello stesso sezione
too_long → spezza all'ultimo confine di paragrafo/frase entro MAX_CHARS
Input: chunks/<stem>/chunks.json + chunks/<stem>/report.json
Output: chunks/<stem>/chunks.json (sovrascrive)
Uso:
python chunks/fix_chunks.py --stem documento
python chunks/fix_chunks.py --stem documento --dry-run
"""
import argparse
import contextlib
import io
import json
import re
import sys
from pathlib import Path
_HERE = Path(__file__).resolve().parent
if str(_HERE) not in sys.path:
sys.path.insert(0, str(_HERE))
import config as cfg
from verify_chunks import verify_stem as _verify_stem
MAX_CHARS = int(cfg.TARGET_CHARS * (1 + cfg.CHUNK_TOLERANCE))
def _load_thresholds(stem_dir: Path) -> int:
"""Legge max_chars da meta.json (scritto dal chunker) o usa il default da config."""
meta = stem_dir / "meta.json"
if meta.exists():
import json as _json
return _json.loads(meta.read_text(encoding="utf-8"))["max_chars"]
return MAX_CHARS
PUNCT_END = re.compile(r"[.!?»)\]'\u2019\"\u201c\u201d\u2018\u2014\u2013-]$")
# ─── Helpers ──────────────────────────────────────────────────────────────────
def _prefix(chunk: dict) -> str:
sezione = chunk.get("sezione", "")
titolo = chunk.get("titolo", "")
if titolo:
return f"[{sezione} > {titolo}]"
return f"[{sezione}]"
def _strip_prefix(text: str) -> str:
text = text.lstrip()
if text.startswith("["):
end = text.find("]")
if end != -1:
return text[end + 1:].lstrip("\n")
return text
def _rebuild_text(chunk: dict, body: str) -> str:
return f"{_prefix(chunk)}\n{body}"
# Fine frase forte: . ! ? seguiti da spazio + maiuscola o virgolette.
# Non usare punteggiatura debole (,;:)>>]) per non creare chunk incompleti.
_STRONG_END = re.compile(
r'[.!?\xbb]\s+(?=[A-Z\xc0-\xd6\xd8-\xde\xc0-\xff\xab\x22\x27(])'
)
_SECONDARY_END = re.compile(r';\s+')
def _split_at_boundary(text: str, max_chars: int) -> list[str]:
"""Spezza text in parti ≤ max_chars su confini di frase forti (.!?).
Se non trova un confine forte entro max_chars, NON spezza: meglio un
chunk too_long (warning) che un chunk incompleto (blocker).
"""
if len(text) <= max_chars:
return [text]
parts = []
remaining = text
while len(remaining) > max_chars:
candidate = remaining[:max_chars]
last_pos = -1
for m in _STRONG_END.finditer(candidate):
last_pos = m.start() + 1 # posizione dopo il carattere terminatore
if last_pos > 0:
first = remaining[:last_pos].rstrip()
remaining = remaining[last_pos:].lstrip()
if first:
parts.append(first)
else:
# Prova confine secondario: ; + spazio (clausole legali)
sec_pos = -1
for m in _SECONDARY_END.finditer(candidate):
sec_pos = m.start() + 1
if sec_pos > 0:
first = remaining[:sec_pos].rstrip()
remaining = remaining[sec_pos:].lstrip()
if first:
parts.append(first)
else:
# Nessun confine: lascia il chunk intero (too_long > incomplete)
break
if remaining:
parts.append(remaining)
return [p for p in parts if p.strip()]
# ─── Operazioni sui chunk ─────────────────────────────────────────────────────
def fix_empty(chunks: list[dict], empty_ids: set[str]) -> tuple[list[dict], int]:
before = len(chunks)
chunks = [c for c in chunks if c["chunk_id"] not in empty_ids]
return chunks, before - len(chunks)
def fix_no_prefix(chunks: list[dict], no_prefix_ids: set[str]) -> tuple[list[dict], int]:
count = 0
for c in chunks:
if c["chunk_id"] in no_prefix_ids:
body = _strip_prefix(c["text"])
c["text"] = _rebuild_text(c, body)
c["n_chars"] = len(c["text"])
count += 1
return chunks, count
def fix_incomplete_and_short(chunks: list[dict],
problem_ids: set[str]) -> tuple[list[dict], int]:
merged = 0
i = 0
result: list[dict] = []
while i < len(chunks):
c = chunks[i]
if c["chunk_id"] in problem_ids and i + 1 < len(chunks):
nxt = chunks[i + 1]
body_c = _strip_prefix(c["text"])
body_nxt = _strip_prefix(nxt["text"])
merged_body = body_c.rstrip() + "\n" + body_nxt.lstrip()
nxt["text"] = _rebuild_text(nxt, merged_body)
nxt["n_chars"] = len(nxt["text"])
merged += 1
i += 1
continue
result.append(c)
i += 1
return result, merged
def fix_too_long(chunks: list[dict],
too_long_ids: set[str],
max_chars: int) -> tuple[list[dict], int]:
result: list[dict] = []
split_count = 0
for c in chunks:
if c["chunk_id"] not in too_long_ids:
result.append(c)
continue
body = _strip_prefix(c["text"])
parts = _split_at_boundary(body, max_chars)
if len(parts) == 1:
result.append(c)
continue
base_id = re.sub(r"__s\d+$", "", c["chunk_id"])
base_sub = c.get("sub_index", 0)
for j, part in enumerate(parts):
new_chunk = dict(c)
new_chunk["sub_index"] = base_sub + j
new_chunk["chunk_id"] = f"{base_id}__s{base_sub + j}"
new_chunk["text"] = _rebuild_text(new_chunk, part)
new_chunk["n_chars"] = len(new_chunk["text"])
result.append(new_chunk)
split_count += 1
return result, split_count
def renumber_ids(chunks: list[dict]) -> list[dict]:
seen: dict[str, int] = {}
for c in chunks:
base = re.sub(r"__s\d+$", "", c["chunk_id"])
idx = seen.get(base, 0)
c["chunk_id"] = f"{base}__s{idx}"
c["sub_index"] = idx
seen[base] = idx + 1
return chunks
# ─── Core ─────────────────────────────────────────────────────────────────────
def fix_stem(stem: str, project_root: Path, max_chars: int, dry_run: bool,
max_iter: int = 10) -> bool:
stem_dir = project_root / "chunks" / stem
chunks_path = stem_dir / "chunks.json"
report_path = stem_dir / "report.json"
max_chars = _load_thresholds(stem_dir)
if not chunks_path.exists():
print(f"✗ chunks/{stem}/chunks.json non trovato.")
print(f" Esegui prima: python chunks/chunker.py --stem {stem}")
return False
if not report_path.exists():
print(f"✗ chunks/{stem}/report.json non trovato.")
print(f" Esegui prima: python chunks/verify_chunks.py --stem {stem}")
return False
chunks: list[dict] = json.loads(chunks_path.read_text(encoding="utf-8"))
report: dict = json.loads(report_path.read_text(encoding="utf-8"))
verdict = report.get("verdict", "ok")
print(f"\nDocumento: {stem} (verdict: {verdict})")
if verdict == "ok":
print(" ✅ Nessun problema - nulla da correggere.")
return True
empty_ids = {e["chunk_id"] for e in report.get("blockers", {}).get("empty", [])}
no_prefix_ids = {e["chunk_id"] for e in report.get("blockers", {}).get("no_prefix", [])}
incomplete_ids = {e["chunk_id"] for e in report.get("blockers", {}).get("incomplete", [])}
too_short_ids = {e["chunk_id"] for e in report.get("warnings", {}).get("too_short", [])}
# Spezza solo chunk che superano upper × SPLIT_THRESHOLD_FACTOR,
# non quelli appena oltre upper (che causerebbero split con chunk incompleti).
_split_limit = max_chars * cfg.SPLIT_THRESHOLD_FACTOR
too_long_ids = {
e["chunk_id"]
for e in report.get("warnings", {}).get("too_long", [])
if e.get("n_chars", 0) > _split_limit
}
ops: list[str] = []
if empty_ids:
ops.append(f" 🗑 rimuovi {len(empty_ids)} chunk vuoti")
if no_prefix_ids:
ops.append(f" 🔧 aggiungi prefisso a {len(no_prefix_ids)} chunk")
if incomplete_ids:
ops.append(f" 🔗 fondi {len(incomplete_ids)} chunk incompleti col successivo")
if too_short_ids:
ops.append(f" 🔗 fondi {len(too_short_ids)} chunk troppo corti col successivo")
if too_long_ids:
ops.append(f" ✂️ spezza {len(too_long_ids)} chunk troppo lunghi")
if not ops:
print(" ✅ Nessuna correzione necessaria.")
return True
print("\n Operazioni pianificate:")
for op in ops:
print(op)
if dry_run:
print("\n [dry-run] Nessuna modifica applicata.")
return True
n_before = len(chunks)
def _fix_blockers(chunks: list[dict], report: dict) -> list[dict]:
"""Risolve solo i blockers (incomplete, empty, no_prefix) senza toccare warnings."""
empty_ids_ = {e["chunk_id"] for e in report.get("blockers", {}).get("empty", [])}
no_prefix_ids_ = {e["chunk_id"] for e in report.get("blockers", {}).get("no_prefix", [])}
incomplete_ids_ = {e["chunk_id"] for e in report.get("blockers", {}).get("incomplete", [])}
if empty_ids_:
chunks, n = fix_empty(chunks, empty_ids_)
print(f" 🗑 Rimossi {n} chunk vuoti.")
if no_prefix_ids_:
chunks, n = fix_no_prefix(chunks, no_prefix_ids_)
print(f" 🔧 Aggiunto prefisso a {n} chunk.")
if incomplete_ids_:
chunks, n = fix_incomplete_and_short(chunks, incomplete_ids_)
print(f" 🔗 Fusi {n} chunk incompleti.")
return renumber_ids(chunks)
def _fix_warnings(chunks: list[dict], report: dict) -> list[dict]:
"""Applica fix opzionali: merge too_short e split too_long."""
too_short_ids_ = {e["chunk_id"] for e in report.get("warnings", {}).get("too_short", [])}
too_long_ids_ = {
e["chunk_id"]
for e in report.get("warnings", {}).get("too_long", [])
if e.get("n_chars", 0) > max_chars * cfg.SPLIT_THRESHOLD_FACTOR
}
if too_short_ids_:
chunks, n = fix_incomplete_and_short(chunks, too_short_ids_)
print(f" 🔗 Fusi {n} chunk troppo corti.")
if too_long_ids_:
chunks, n = fix_too_long(chunks, too_long_ids_, max_chars)
print(f" ✂️ Spezzati {n} chunk lunghi.")
return renumber_ids(chunks)
# Fase 1: risolvi blockers a convergenza (solo merge incomplete)
chunks = _fix_blockers(chunks, report)
_min = int(cfg.TARGET_CHARS * (1 - cfg.CHUNK_TOLERANCE))
_max = int(cfg.TARGET_CHARS * (1 + cfg.CHUNK_TOLERANCE))
prev_blockers = sum(len(v) for v in report.get("blockers", {}).values())
for iteration in range(1, max_iter + 1):
chunks_path.write_text(
json.dumps(chunks, ensure_ascii=False, indent=2), encoding="utf-8"
)
with contextlib.redirect_stdout(io.StringIO()):
_verify_stem(stem, project_root, _min, _max)
report = json.loads(report_path.read_text(encoding="utf-8"))
new_verdict = report.get("verdict", "ok")
curr_blockers = sum(len(v) for v in report.get("blockers", {}).values())
if new_verdict in ("ok", "warnings_only") or curr_blockers == 0:
break
if curr_blockers >= prev_blockers:
print(f"\n ⚠️ Nessun miglioramento ({curr_blockers} blockers) - i restanti richiedono correzione manuale del clean.md.")
break
print(f"\n Iterazione {iteration + 1} - {curr_blockers} blockers residui:")
prev_blockers = curr_blockers
chunks = _fix_blockers(chunks, report)
# Fase 2: fix warnings (too_short merge + too_long split) - una sola passata finale
with contextlib.redirect_stdout(io.StringIO()):
_verify_stem(stem, project_root, _min, _max)
report = json.loads(report_path.read_text(encoding="utf-8"))
n_short = len(report.get("warnings", {}).get("too_short", []))
n_long = sum(
1 for e in report.get("warnings", {}).get("too_long", [])
if e.get("n_chars", 0) > max_chars * cfg.SPLIT_THRESHOLD_FACTOR
)
if n_short or n_long:
print(f"\n Fix warnings: {n_short} corti, {n_long} lunghi da spezzare")
chunks = _fix_warnings(chunks, report)
n_after = len(chunks)
print(f"\n Totale chunk: {n_before}{n_after}")
chunks_path.write_text(
json.dumps(chunks, ensure_ascii=False, indent=2), encoding="utf-8"
)
print(f" ✅ Salvato: chunks/{stem}/chunks.json")
final_verdict = report.get("verdict", "?")
if final_verdict == "ok":
print(f" ✅ Verdict finale: ok - procedi alla vettorizzazione.")
elif final_verdict == "warnings_only":
print(f" 🟡 Verdict finale: warnings_only - puoi procedere.")
else:
print(f" 🔴 Verdict finale: {final_verdict} - rilancia la verifica manualmente:")
print(f" python chunks/verify_chunks.py --stem {stem}")
return True
# ─── Entry point ──────────────────────────────────────────────────────────────
if __name__ == "__main__":
project_root = Path(__file__).parent.parent
parser = argparse.ArgumentParser(description="Fix chunk")
parser.add_argument("--stem", required=True, help="Nome del documento (sottocartella di chunks/)")
_max_def = int(cfg.TARGET_CHARS * (1 + cfg.CHUNK_TOLERANCE))
parser.add_argument(
"--max", type=int, default=_max_def,
help=f"Soglia massima caratteri per lo split (default: TARGET×(1+TOL) = {_max_def})"
)
parser.add_argument(
"--dry-run", action="store_true",
help="Mostra le operazioni pianificate senza applicarle"
)
parser.add_argument(
"--max-iter", type=int, default=10, metavar="N",
help="Numero massimo di iterazioni automatiche (default: 10)"
)
args = parser.parse_args()
ok = fix_stem(args.stem, project_root, args.max, args.dry_run, args.max_iter)
sys.exit(0 if ok else 1)