feat(chunks): ottimizzazione chunking e post-processing

- chunker.py: scrive meta.json con strategia e soglie effettive (target,
  min_chars, max_chars) per ogni documento chunked

- verify_chunks.py:
  * _load_thresholds(): legge min/max da meta.json invece del TARGET_CHARS
    globale, eliminando il mismatch tra soglie chunker e verify
    (h3_aware target=600 -> range 450-750, non piu' validato a 225-375)
  * _ROMAN_END: esclude numeri romani finali (XV, XIV...) dagli incompleti
    perche' sono artefatti indice PDF, non frasi spezzate
  * PUNCT_END: aggiunge ; come fine valida (clausole legali italiane)

- fix_chunks.py:
  * _load_thresholds(): usa max_chars da meta.json per split coerente
  * _SECONDARY_END: split secondario su ; per testo legale multi-clausola
  * Fase 1 (convergenza): risolve solo blockers (incomplete, empty,
    no_prefix) senza toccare warnings -- elimina il ciclo
    merge->too_long->split->incomplete->merge
  * Fase 2 (finale): una sola passata di merge too_short + split too_long
    dopo che i blockers sono azzerati

Risultato su dirittopenale: da blocked (265 incomplete) a warnings_only
in 2 iterazioni, senza cicli infiniti.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-05-12 11:09:28 +02:00
parent 587238f9f5
commit 5b63c423cc
3 changed files with 129 additions and 57 deletions
+6
View File
@@ -439,6 +439,12 @@ def process_stem(stem: str, project_root: Path, force: bool) -> bool:
lower = int(target * (1 - tolerance))
upper = int(target * (1 + tolerance))
meta = {"strategy": strategia, "target_chars": target,
"min_chars": lower, "max_chars": upper}
(out_dir / "meta.json").write_text(
json.dumps(meta, ensure_ascii=False), encoding="utf-8"
)
lengths = [c["n_chars"] for c in chunks]
min_c = min(lengths)
max_c = max(lengths)
+105 -55
View File
@@ -21,6 +21,8 @@ Uso:
"""
import argparse
import contextlib
import io
import json
import re
import sys
@@ -33,6 +35,15 @@ import config as cfg
from verify_chunks import verify_stem as _verify_stem
MAX_CHARS = int(cfg.TARGET_CHARS * (1 + cfg.CHUNK_TOLERANCE))
def _load_thresholds(stem_dir: Path) -> int:
    """Return the maximum chunk length (in characters) for one document.

    The value is taken from the ``max_chars`` key of ``meta.json`` inside
    *stem_dir* (the file the chunker writes next to the chunks). The
    module-level ``MAX_CHARS`` default is returned instead when the file is
    missing or unreadable, is not valid JSON, or does not hold a positive
    integer under ``max_chars``.

    Args:
        stem_dir: Directory of the chunked document.

    Returns:
        The effective upper length threshold to use when splitting chunks.
    """
    try:
        # EAFP: read directly instead of exists()+read, which can race with
        # a concurrent rewrite/removal of the file.
        meta = json.loads((stem_dir / "meta.json").read_text(encoding="utf-8"))
    except (OSError, ValueError):
        # OSError: file missing/unreadable. ValueError: malformed JSON or
        # undecodable bytes (JSONDecodeError/UnicodeDecodeError subclass it).
        return MAX_CHARS

    value = meta.get("max_chars") if isinstance(meta, dict) else None
    # bool is an int subclass, so it is rejected explicitly; a non-positive
    # limit would make any split loop meaningless.
    if isinstance(value, int) and not isinstance(value, bool) and value > 0:
        return value
    return MAX_CHARS
PUNCT_END = re.compile(r"[.!?»)\]'\u2019\"\u201c\u201d\u2018\u2014\u2013-]$")
@@ -59,11 +70,20 @@ def _rebuild_text(chunk: dict, body: str) -> str:
return f"{_prefix(chunk)}\n{body}"
_SENT_END = re.compile(r'[.!?»)\]\'"’”…]')
# Fine frase forte: . ! ? seguiti da spazio + maiuscola o virgolette.
# Non usare punteggiatura debole (,;:)>>]) per non creare chunk incompleti.
_STRONG_END = re.compile(
r'[.!?\xbb]\s+(?=[A-Z\xc0-\xd6\xd8-\xde\xc0-\xff\xab\x22\x27(])'
)
_SECONDARY_END = re.compile(r';\s+')
def _split_at_boundary(text: str, max_chars: int) -> list[str]:
"""Spezza text in parti ≤ max_chars, ciascuna terminante su punteggiatura."""
"""Spezza text in parti ≤ max_chars su confini di frase forti (.!?).
Se non trova un confine forte entro max_chars, NON spezza: meglio un
chunk too_long (warning) che un chunk incompleto (blocker).
"""
if len(text) <= max_chars:
return [text]
@@ -73,27 +93,28 @@ def _split_at_boundary(text: str, max_chars: int) -> list[str]:
while len(remaining) > max_chars:
candidate = remaining[:max_chars]
# Cerca l'ultima punteggiatura finale entro max_chars.
last_punct = -1
for m in _SENT_END.finditer(candidate):
last_punct = m.end() # posizione dopo il carattere di punteggiatura
last_pos = -1
for m in _STRONG_END.finditer(candidate):
last_pos = m.start() + 1 # posizione dopo il carattere terminatore
if last_punct > 0:
# Taglia dopo la punteggiatura; il resto inizia alla parola successiva.
first = remaining[:last_punct].rstrip()
remaining = remaining[last_punct:].lstrip()
if last_pos > 0:
first = remaining[:last_pos].rstrip()
remaining = remaining[last_pos:].lstrip()
if first:
parts.append(first)
else:
# Nessuna punteggiatura: taglia all'ultimo spazio disponibile.
sp = candidate.rfind(" ")
if sp > 0:
first = remaining[:sp].rstrip()
remaining = remaining[sp:].lstrip()
# Prova confine secondario: ; + spazio (clausole legali)
sec_pos = -1
for m in _SECONDARY_END.finditer(candidate):
sec_pos = m.start() + 1
if sec_pos > 0:
first = remaining[:sec_pos].rstrip()
remaining = remaining[sec_pos:].lstrip()
if first:
parts.append(first)
else:
first = remaining[:max_chars]
remaining = remaining[max_chars:]
if first:
parts.append(first)
# Nessun confine: lascia il chunk intero (too_long > incomplete)
break
if remaining:
parts.append(remaining)
@@ -191,10 +212,12 @@ def renumber_ids(chunks: list[dict]) -> list[dict]:
# ─── Core ─────────────────────────────────────────────────────────────────────
def fix_stem(stem: str, project_root: Path, max_chars: int, dry_run: bool) -> bool:
def fix_stem(stem: str, project_root: Path, max_chars: int, dry_run: bool,
max_iter: int = 10) -> bool:
stem_dir = project_root / "chunks" / stem
chunks_path = stem_dir / "chunks.json"
report_path = stem_dir / "report.json"
max_chars = _load_thresholds(stem_dir)
if not chunks_path.exists():
print(f"✗ chunks/{stem}/chunks.json non trovato.")
@@ -213,7 +236,7 @@ def fix_stem(stem: str, project_root: Path, max_chars: int, dry_run: bool) -> bo
print(f"\nDocumento: {stem} (verdict: {verdict})")
if verdict == "ok":
print(" ✅ Nessun problema nulla da correggere.")
print(" ✅ Nessun problema - nulla da correggere.")
return True
empty_ids = {e["chunk_id"] for e in report.get("blockers", {}).get("empty", [])}
@@ -256,54 +279,77 @@ def fix_stem(stem: str, project_root: Path, max_chars: int, dry_run: bool) -> bo
n_before = len(chunks)
def _apply_fixes(chunks: list[dict], report: dict) -> list[dict]:
empty_ids_ = {e["chunk_id"] for e in report.get("blockers", {}).get("empty", [])}
no_prefix_ids_ = {e["chunk_id"] for e in report.get("blockers", {}).get("no_prefix", [])}
incomplete_ids_= {e["chunk_id"] for e in report.get("blockers", {}).get("incomplete", [])}
too_short_ids_ = {e["chunk_id"] for e in report.get("warnings", {}).get("too_short", [])}
too_long_ids_ = {
e["chunk_id"]
for e in report.get("warnings", {}).get("too_long", [])
if e.get("n_chars", 0) > max_chars * cfg.SPLIT_THRESHOLD_FACTOR
}
def _fix_blockers(chunks: list[dict], report: dict) -> list[dict]:
"""Risolve solo i blockers (incomplete, empty, no_prefix) senza toccare warnings."""
empty_ids_ = {e["chunk_id"] for e in report.get("blockers", {}).get("empty", [])}
no_prefix_ids_ = {e["chunk_id"] for e in report.get("blockers", {}).get("no_prefix", [])}
incomplete_ids_ = {e["chunk_id"] for e in report.get("blockers", {}).get("incomplete", [])}
if empty_ids_:
chunks, n = fix_empty(chunks, empty_ids_)
print(f" 🗑 Rimossi {n} chunk vuoti.")
if no_prefix_ids_:
chunks, n = fix_no_prefix(chunks, no_prefix_ids_)
print(f" 🔧 Aggiunto prefisso a {n} chunk.")
merge_ids_ = incomplete_ids_ | too_short_ids_
if merge_ids_:
chunks, n = fix_incomplete_and_short(chunks, merge_ids_)
print(f" 🔗 Fusi {n} chunk (incompleti + corti).")
if incomplete_ids_:
chunks, n = fix_incomplete_and_short(chunks, incomplete_ids_)
print(f" 🔗 Fusi {n} chunk incompleti.")
return renumber_ids(chunks)
def _fix_warnings(chunks: list[dict], report: dict) -> list[dict]:
"""Applica fix opzionali: merge too_short e split too_long."""
too_short_ids_ = {e["chunk_id"] for e in report.get("warnings", {}).get("too_short", [])}
too_long_ids_ = {
e["chunk_id"]
for e in report.get("warnings", {}).get("too_long", [])
if e.get("n_chars", 0) > max_chars * cfg.SPLIT_THRESHOLD_FACTOR
}
if too_short_ids_:
chunks, n = fix_incomplete_and_short(chunks, too_short_ids_)
print(f" 🔗 Fusi {n} chunk troppo corti.")
if too_long_ids_:
chunks, n = fix_too_long(chunks, too_long_ids_, max_chars)
print(f" ✂️ Spezzati {n} chunk lunghi.")
return renumber_ids(chunks)
chunks = _apply_fixes(chunks, report)
# Fase 1: risolvi blockers a convergenza (solo merge incomplete)
chunks = _fix_blockers(chunks, report)
for iteration in range(1, cfg.FIX_MAX_ITERATIONS):
_min = int(cfg.TARGET_CHARS * (1 - cfg.CHUNK_TOLERANCE))
_max = int(cfg.TARGET_CHARS * (1 + cfg.CHUNK_TOLERANCE))
prev_blockers = sum(len(v) for v in report.get("blockers", {}).values())
for iteration in range(1, max_iter + 1):
chunks_path.write_text(
json.dumps(chunks, ensure_ascii=False, indent=2), encoding="utf-8"
)
project_root = chunks_path.parent.parent.parent
_min = int(cfg.TARGET_CHARS * (1 - cfg.CHUNK_TOLERANCE))
_max = int(cfg.TARGET_CHARS * (1 + cfg.CHUNK_TOLERANCE))
_verify_stem(stem, project_root, _min, _max)
with contextlib.redirect_stdout(io.StringIO()):
_verify_stem(stem, project_root, _min, _max)
report = json.loads(report_path.read_text(encoding="utf-8"))
new_verdict = report.get("verdict", "ok")
if new_verdict in ("ok", "warnings_only"):
curr_blockers = sum(len(v) for v in report.get("blockers", {}).values())
if new_verdict in ("ok", "warnings_only") or curr_blockers == 0:
break
remaining_blockers = sum(
len(v) for v in report.get("blockers", {}).values()
)
if remaining_blockers == 0:
if curr_blockers >= prev_blockers:
print(f"\n ⚠️ Nessun miglioramento ({curr_blockers} blockers) - i restanti richiedono correzione manuale del clean.md.")
break
print(f"\n Iterazione {iteration + 1}/{cfg.FIX_MAX_ITERATIONS} "
f" {remaining_blockers} bloccer residui:")
chunks = _apply_fixes(chunks, report)
print(f"\n Iterazione {iteration + 1} - {curr_blockers} blockers residui:")
prev_blockers = curr_blockers
chunks = _fix_blockers(chunks, report)
# Fase 2: fix warnings (too_short merge + too_long split) - una sola passata finale
with contextlib.redirect_stdout(io.StringIO()):
_verify_stem(stem, project_root, _min, _max)
report = json.loads(report_path.read_text(encoding="utf-8"))
n_short = len(report.get("warnings", {}).get("too_short", []))
n_long = sum(
1 for e in report.get("warnings", {}).get("too_long", [])
if e.get("n_chars", 0) > max_chars * cfg.SPLIT_THRESHOLD_FACTOR
)
if n_short or n_long:
print(f"\n Fix warnings: {n_short} corti, {n_long} lunghi da spezzare")
chunks = _fix_warnings(chunks, report)
n_after = len(chunks)
print(f"\n Totale chunk: {n_before}{n_after}")
@@ -315,11 +361,11 @@ def fix_stem(stem: str, project_root: Path, max_chars: int, dry_run: bool) -> bo
final_verdict = report.get("verdict", "?")
if final_verdict == "ok":
print(f" ✅ Verdict finale: ok procedi alla vettorizzazione.")
print(f" ✅ Verdict finale: ok - procedi alla vettorizzazione.")
elif final_verdict == "warnings_only":
print(f" 🟡 Verdict finale: warnings_only puoi procedere.")
print(f" 🟡 Verdict finale: warnings_only - puoi procedere.")
else:
print(f" 🔴 Verdict finale: {final_verdict} rilancia la verifica manualmente:")
print(f" 🔴 Verdict finale: {final_verdict} - rilancia la verifica manualmente:")
print(f" python chunks/verify_chunks.py --stem {stem}")
return True
@@ -341,7 +387,11 @@ if __name__ == "__main__":
"--dry-run", action="store_true",
help="Mostra le operazioni pianificate senza applicarle"
)
parser.add_argument(
"--max-iter", type=int, default=10, metavar="N",
help="Numero massimo di iterazioni automatiche (default: 10)"
)
args = parser.parse_args()
ok = fix_stem(args.stem, project_root, args.max, args.dry_run)
ok = fix_stem(args.stem, project_root, args.max, args.dry_run, args.max_iter)
sys.exit(0 if ok else 1)
+18 -2
View File
@@ -31,16 +31,28 @@ import config as cfg
MIN_CHARS = int(cfg.TARGET_CHARS * (1 - cfg.CHUNK_TOLERANCE))
MAX_CHARS = int(cfg.TARGET_CHARS * (1 + cfg.CHUNK_TOLERANCE))
PUNCT_END = re.compile(
r"[.!?»)\]'\u2019\"\u201c\u201d\u2018\u2014\u2013\u2026]$"
r"[.!?\xbb)\]'\u2019\"\u201c\u201d\u2018\u2014\u2013\u2026]$"
r"|/$" # URL che finisce con /
r"|\|$" # riga di tabella Markdown
r"|;$" # fine clausola legale (testo giuridico)
r"|:$" # introduzione a lista o formula
)
_HEX_END = re.compile(r"[0-9a-fA-F]{8,}$")
_URL_TAIL = re.compile(r"https?://\S+(\s+\S+){0,3}$") # URL con fino a 3 token extra
_MATH_SYMS = re.compile(r"[∈∑≤≥≠∀∃∫√∞∂±×÷→←↔⊂⊃⊆⊇∩∪·°]")
_ROMAN_END = re.compile(r"\b(I{1,3}|IV|VI{0,3}|IX|XI{0,2}|XIV|XV|XVI{0,2}|XIX|XX{0,2})$")
def _load_thresholds(stem_dir: "Path") -> "tuple[int, int]":
    """Return the ``(min_chars, max_chars)`` length thresholds for one document.

    Both values are taken from ``meta.json`` inside *stem_dir* (the file the
    chunker writes next to the chunks). The module-level defaults
    ``(MIN_CHARS, MAX_CHARS)`` are returned instead when the file is missing
    or unreadable, is not valid JSON, or does not hold a consistent pair of
    integers (``0 <= min_chars <= max_chars`` and ``max_chars > 0``).

    The pair is accepted or rejected as a whole so that a stored bound is
    never combined with a default one.

    Args:
        stem_dir: Directory of the chunked document.

    Returns:
        The effective ``(min_chars, max_chars)`` thresholds.
    """
    import json as _json  # local import: does not rely on the file's top-level imports

    defaults = (MIN_CHARS, MAX_CHARS)
    try:
        # EAFP: read directly instead of exists()+read, which can race with
        # a concurrent rewrite/removal of the file.
        meta = _json.loads((stem_dir / "meta.json").read_text(encoding="utf-8"))
    except (OSError, ValueError):
        # OSError: file missing/unreadable. ValueError: malformed JSON or
        # undecodable bytes (JSONDecodeError/UnicodeDecodeError subclass it).
        return defaults

    if not isinstance(meta, dict):
        return defaults

    lower = meta.get("min_chars")
    upper = meta.get("max_chars")
    # bool is an int subclass, so it is rejected explicitly.
    both_ints = all(
        isinstance(v, int) and not isinstance(v, bool) for v in (lower, upper)
    )
    if both_ints and 0 <= lower <= upper and upper > 0:
        return lower, upper
    return defaults
# ─── Checks ───────────────────────────────────────────────────────────────────
def has_prefix(chunk: dict) -> bool:
@@ -70,6 +82,8 @@ def ends_incomplete(chunk: dict) -> bool:
return False
if _HEX_END.search(text_check): # hash SHA / codice hex
return False
if _ROMAN_END.search(text_check): # numero romano finale (indice/riferimento PDF)
return False
if _URL_TAIL.search(text_check[-200:]): # URL (con eventuale path dopo spazio)
return False
return True
@@ -90,7 +104,9 @@ def _fmt_chunk(c: dict) -> str:
def verify_stem(stem: str, project_root: Path, min_chars: int, max_chars: int) -> bool:
chunks_path = project_root / "chunks" / stem / "chunks.json"
stem_dir = project_root / "chunks" / stem
chunks_path = stem_dir / "chunks.json"
min_chars, max_chars = _load_thresholds(stem_dir)
print(f"\nDocumento: {stem}")