5b63c423cc
- chunker.py: scrive meta.json con strategia e soglie effettive (target,
min_chars, max_chars) per ogni documento chunked
- verify_chunks.py:
* _load_thresholds(): legge min/max da meta.json invece del TARGET_CHARS
globale, eliminando il mismatch tra soglie chunker e verify
(h3_aware target=600 -> range 450-750, non piu' validato a 225-375)
* _ROMAN_END: esclude numeri romani finali (XV, XIV...) dagli incompleti
perche' sono artefatti indice PDF, non frasi spezzate
* PUNCT_END: aggiunge ; come fine valida (clausole legali italiane)
- fix_chunks.py:
* _load_thresholds(): usa max_chars da meta.json per split coerente
* _SECONDARY_END: split secondario su ; per testo legale multi-clausola
* Fase 1 (convergenza): risolve solo blockers (incomplete, empty,
no_prefix) senza toccare warnings -- elimina il ciclo
merge->too_long->split->incomplete->merge
* Fase 2 (finale): una sola passata di merge too_short + split too_long
dopo che i blockers sono azzerati
Risultato su dirittopenale: da blocked (265 incomplete) a warnings_only
in 2 iterazioni, senza cicli infiniti.
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
398 lines
15 KiB
Python
398 lines
15 KiB
Python
#!/usr/bin/env python3
|
||
"""
|
||
Fix chunk
|
||
|
||
Applica correzioni dirette su chunks/<stem>/chunks.json basandosi sul
|
||
report.json prodotto da verify_chunks.py. Non tocca clean.md.
|
||
|
||
Fixes applicati:
|
||
empty → rimuove il chunk
|
||
incomplete → fonde con il chunk successivo (la frase continua)
|
||
no_prefix → aggiunge prefisso [sezione > titolo] se mancante
|
||
too_short → fonde con il chunk adiacente nello stesso sezione
|
||
too_long → spezza all'ultimo confine di paragrafo/frase entro MAX_CHARS
|
||
|
||
Input: chunks/<stem>/chunks.json + chunks/<stem>/report.json
|
||
Output: chunks/<stem>/chunks.json (sovrascrive)
|
||
|
||
Uso:
|
||
python chunks/fix_chunks.py --stem documento
|
||
python chunks/fix_chunks.py --stem documento --dry-run
|
||
"""
|
||
|
||
import argparse
|
||
import contextlib
|
||
import io
|
||
import json
|
||
import re
|
||
import sys
|
||
from pathlib import Path
|
||
|
||
_HERE = Path(__file__).resolve().parent
|
||
if str(_HERE) not in sys.path:
|
||
sys.path.insert(0, str(_HERE))
|
||
import config as cfg
|
||
from verify_chunks import verify_stem as _verify_stem
|
||
|
||
MAX_CHARS = int(cfg.TARGET_CHARS * (1 + cfg.CHUNK_TOLERANCE))
|
||
|
||
|
||
def _load_thresholds(stem_dir: Path) -> int:
|
||
"""Legge max_chars da meta.json (scritto dal chunker) o usa il default da config."""
|
||
meta = stem_dir / "meta.json"
|
||
if meta.exists():
|
||
import json as _json
|
||
return _json.loads(meta.read_text(encoding="utf-8"))["max_chars"]
|
||
return MAX_CHARS
|
||
PUNCT_END = re.compile(r"[.!?»)\]'\u2019\"\u201c\u201d\u2018\u2014\u2013-]$")
|
||
|
||
|
||
# ─── Helpers ──────────────────────────────────────────────────────────────────
|
||
|
||
def _prefix(chunk: dict) -> str:
|
||
sezione = chunk.get("sezione", "")
|
||
titolo = chunk.get("titolo", "")
|
||
if titolo:
|
||
return f"[{sezione} > {titolo}]"
|
||
return f"[{sezione}]"
|
||
|
||
|
||
def _strip_prefix(text: str) -> str:
|
||
text = text.lstrip()
|
||
if text.startswith("["):
|
||
end = text.find("]")
|
||
if end != -1:
|
||
return text[end + 1:].lstrip("\n")
|
||
return text
|
||
|
||
|
||
def _rebuild_text(chunk: dict, body: str) -> str:
|
||
return f"{_prefix(chunk)}\n{body}"
|
||
|
||
|
||
# Fine frase forte: . ! ? seguiti da spazio + maiuscola o virgolette.
|
||
# Non usare punteggiatura debole (,;:)>>]) per non creare chunk incompleti.
|
||
_STRONG_END = re.compile(
|
||
r'[.!?\xbb]\s+(?=[A-Z\xc0-\xd6\xd8-\xde\xc0-\xff\xab\x22\x27(])'
|
||
)
|
||
_SECONDARY_END = re.compile(r';\s+')
|
||
|
||
|
||
def _split_at_boundary(text: str, max_chars: int) -> list[str]:
|
||
"""Spezza text in parti ≤ max_chars su confini di frase forti (.!?).
|
||
|
||
Se non trova un confine forte entro max_chars, NON spezza: meglio un
|
||
chunk too_long (warning) che un chunk incompleto (blocker).
|
||
"""
|
||
if len(text) <= max_chars:
|
||
return [text]
|
||
|
||
parts = []
|
||
remaining = text
|
||
|
||
while len(remaining) > max_chars:
|
||
candidate = remaining[:max_chars]
|
||
|
||
last_pos = -1
|
||
for m in _STRONG_END.finditer(candidate):
|
||
last_pos = m.start() + 1 # posizione dopo il carattere terminatore
|
||
|
||
if last_pos > 0:
|
||
first = remaining[:last_pos].rstrip()
|
||
remaining = remaining[last_pos:].lstrip()
|
||
if first:
|
||
parts.append(first)
|
||
else:
|
||
# Prova confine secondario: ; + spazio (clausole legali)
|
||
sec_pos = -1
|
||
for m in _SECONDARY_END.finditer(candidate):
|
||
sec_pos = m.start() + 1
|
||
if sec_pos > 0:
|
||
first = remaining[:sec_pos].rstrip()
|
||
remaining = remaining[sec_pos:].lstrip()
|
||
if first:
|
||
parts.append(first)
|
||
else:
|
||
# Nessun confine: lascia il chunk intero (too_long > incomplete)
|
||
break
|
||
|
||
if remaining:
|
||
parts.append(remaining)
|
||
|
||
return [p for p in parts if p.strip()]
|
||
|
||
|
||
# ─── Operazioni sui chunk ─────────────────────────────────────────────────────
|
||
|
||
def fix_empty(chunks: list[dict], empty_ids: set[str]) -> tuple[list[dict], int]:
|
||
before = len(chunks)
|
||
chunks = [c for c in chunks if c["chunk_id"] not in empty_ids]
|
||
return chunks, before - len(chunks)
|
||
|
||
|
||
def fix_no_prefix(chunks: list[dict], no_prefix_ids: set[str]) -> tuple[list[dict], int]:
|
||
count = 0
|
||
for c in chunks:
|
||
if c["chunk_id"] in no_prefix_ids:
|
||
body = _strip_prefix(c["text"])
|
||
c["text"] = _rebuild_text(c, body)
|
||
c["n_chars"] = len(c["text"])
|
||
count += 1
|
||
return chunks, count
|
||
|
||
|
||
def fix_incomplete_and_short(chunks: list[dict],
|
||
problem_ids: set[str]) -> tuple[list[dict], int]:
|
||
merged = 0
|
||
i = 0
|
||
result: list[dict] = []
|
||
|
||
while i < len(chunks):
|
||
c = chunks[i]
|
||
if c["chunk_id"] in problem_ids and i + 1 < len(chunks):
|
||
nxt = chunks[i + 1]
|
||
body_c = _strip_prefix(c["text"])
|
||
body_nxt = _strip_prefix(nxt["text"])
|
||
merged_body = body_c.rstrip() + "\n" + body_nxt.lstrip()
|
||
nxt["text"] = _rebuild_text(nxt, merged_body)
|
||
nxt["n_chars"] = len(nxt["text"])
|
||
merged += 1
|
||
i += 1
|
||
continue
|
||
result.append(c)
|
||
i += 1
|
||
|
||
return result, merged
|
||
|
||
|
||
def fix_too_long(chunks: list[dict],
|
||
too_long_ids: set[str],
|
||
max_chars: int) -> tuple[list[dict], int]:
|
||
result: list[dict] = []
|
||
split_count = 0
|
||
|
||
for c in chunks:
|
||
if c["chunk_id"] not in too_long_ids:
|
||
result.append(c)
|
||
continue
|
||
|
||
body = _strip_prefix(c["text"])
|
||
parts = _split_at_boundary(body, max_chars)
|
||
|
||
if len(parts) == 1:
|
||
result.append(c)
|
||
continue
|
||
|
||
base_id = re.sub(r"__s\d+$", "", c["chunk_id"])
|
||
base_sub = c.get("sub_index", 0)
|
||
|
||
for j, part in enumerate(parts):
|
||
new_chunk = dict(c)
|
||
new_chunk["sub_index"] = base_sub + j
|
||
new_chunk["chunk_id"] = f"{base_id}__s{base_sub + j}"
|
||
new_chunk["text"] = _rebuild_text(new_chunk, part)
|
||
new_chunk["n_chars"] = len(new_chunk["text"])
|
||
result.append(new_chunk)
|
||
|
||
split_count += 1
|
||
|
||
return result, split_count
|
||
|
||
|
||
def renumber_ids(chunks: list[dict]) -> list[dict]:
|
||
seen: dict[str, int] = {}
|
||
for c in chunks:
|
||
base = re.sub(r"__s\d+$", "", c["chunk_id"])
|
||
idx = seen.get(base, 0)
|
||
c["chunk_id"] = f"{base}__s{idx}"
|
||
c["sub_index"] = idx
|
||
seen[base] = idx + 1
|
||
return chunks
|
||
|
||
|
||
# ─── Core ─────────────────────────────────────────────────────────────────────
|
||
|
||
def fix_stem(stem: str, project_root: Path, max_chars: int, dry_run: bool,
|
||
max_iter: int = 10) -> bool:
|
||
stem_dir = project_root / "chunks" / stem
|
||
chunks_path = stem_dir / "chunks.json"
|
||
report_path = stem_dir / "report.json"
|
||
max_chars = _load_thresholds(stem_dir)
|
||
|
||
if not chunks_path.exists():
|
||
print(f"✗ chunks/{stem}/chunks.json non trovato.")
|
||
print(f" Esegui prima: python chunks/chunker.py --stem {stem}")
|
||
return False
|
||
|
||
if not report_path.exists():
|
||
print(f"✗ chunks/{stem}/report.json non trovato.")
|
||
print(f" Esegui prima: python chunks/verify_chunks.py --stem {stem}")
|
||
return False
|
||
|
||
chunks: list[dict] = json.loads(chunks_path.read_text(encoding="utf-8"))
|
||
report: dict = json.loads(report_path.read_text(encoding="utf-8"))
|
||
|
||
verdict = report.get("verdict", "ok")
|
||
print(f"\nDocumento: {stem} (verdict: {verdict})")
|
||
|
||
if verdict == "ok":
|
||
print(" ✅ Nessun problema - nulla da correggere.")
|
||
return True
|
||
|
||
empty_ids = {e["chunk_id"] for e in report.get("blockers", {}).get("empty", [])}
|
||
no_prefix_ids = {e["chunk_id"] for e in report.get("blockers", {}).get("no_prefix", [])}
|
||
incomplete_ids = {e["chunk_id"] for e in report.get("blockers", {}).get("incomplete", [])}
|
||
too_short_ids = {e["chunk_id"] for e in report.get("warnings", {}).get("too_short", [])}
|
||
|
||
# Spezza solo chunk che superano upper × SPLIT_THRESHOLD_FACTOR,
|
||
# non quelli appena oltre upper (che causerebbero split con chunk incompleti).
|
||
_split_limit = max_chars * cfg.SPLIT_THRESHOLD_FACTOR
|
||
too_long_ids = {
|
||
e["chunk_id"]
|
||
for e in report.get("warnings", {}).get("too_long", [])
|
||
if e.get("n_chars", 0) > _split_limit
|
||
}
|
||
|
||
ops: list[str] = []
|
||
if empty_ids:
|
||
ops.append(f" 🗑 rimuovi {len(empty_ids)} chunk vuoti")
|
||
if no_prefix_ids:
|
||
ops.append(f" 🔧 aggiungi prefisso a {len(no_prefix_ids)} chunk")
|
||
if incomplete_ids:
|
||
ops.append(f" 🔗 fondi {len(incomplete_ids)} chunk incompleti col successivo")
|
||
if too_short_ids:
|
||
ops.append(f" 🔗 fondi {len(too_short_ids)} chunk troppo corti col successivo")
|
||
if too_long_ids:
|
||
ops.append(f" ✂️ spezza {len(too_long_ids)} chunk troppo lunghi")
|
||
|
||
if not ops:
|
||
print(" ✅ Nessuna correzione necessaria.")
|
||
return True
|
||
|
||
print("\n Operazioni pianificate:")
|
||
for op in ops:
|
||
print(op)
|
||
|
||
if dry_run:
|
||
print("\n [dry-run] Nessuna modifica applicata.")
|
||
return True
|
||
|
||
n_before = len(chunks)
|
||
|
||
def _fix_blockers(chunks: list[dict], report: dict) -> list[dict]:
|
||
"""Risolve solo i blockers (incomplete, empty, no_prefix) senza toccare warnings."""
|
||
empty_ids_ = {e["chunk_id"] for e in report.get("blockers", {}).get("empty", [])}
|
||
no_prefix_ids_ = {e["chunk_id"] for e in report.get("blockers", {}).get("no_prefix", [])}
|
||
incomplete_ids_ = {e["chunk_id"] for e in report.get("blockers", {}).get("incomplete", [])}
|
||
if empty_ids_:
|
||
chunks, n = fix_empty(chunks, empty_ids_)
|
||
print(f" 🗑 Rimossi {n} chunk vuoti.")
|
||
if no_prefix_ids_:
|
||
chunks, n = fix_no_prefix(chunks, no_prefix_ids_)
|
||
print(f" 🔧 Aggiunto prefisso a {n} chunk.")
|
||
if incomplete_ids_:
|
||
chunks, n = fix_incomplete_and_short(chunks, incomplete_ids_)
|
||
print(f" 🔗 Fusi {n} chunk incompleti.")
|
||
return renumber_ids(chunks)
|
||
|
||
def _fix_warnings(chunks: list[dict], report: dict) -> list[dict]:
|
||
"""Applica fix opzionali: merge too_short e split too_long."""
|
||
too_short_ids_ = {e["chunk_id"] for e in report.get("warnings", {}).get("too_short", [])}
|
||
too_long_ids_ = {
|
||
e["chunk_id"]
|
||
for e in report.get("warnings", {}).get("too_long", [])
|
||
if e.get("n_chars", 0) > max_chars * cfg.SPLIT_THRESHOLD_FACTOR
|
||
}
|
||
if too_short_ids_:
|
||
chunks, n = fix_incomplete_and_short(chunks, too_short_ids_)
|
||
print(f" 🔗 Fusi {n} chunk troppo corti.")
|
||
if too_long_ids_:
|
||
chunks, n = fix_too_long(chunks, too_long_ids_, max_chars)
|
||
print(f" ✂️ Spezzati {n} chunk lunghi.")
|
||
return renumber_ids(chunks)
|
||
|
||
# Fase 1: risolvi blockers a convergenza (solo merge incomplete)
|
||
chunks = _fix_blockers(chunks, report)
|
||
|
||
_min = int(cfg.TARGET_CHARS * (1 - cfg.CHUNK_TOLERANCE))
|
||
_max = int(cfg.TARGET_CHARS * (1 + cfg.CHUNK_TOLERANCE))
|
||
prev_blockers = sum(len(v) for v in report.get("blockers", {}).values())
|
||
|
||
for iteration in range(1, max_iter + 1):
|
||
chunks_path.write_text(
|
||
json.dumps(chunks, ensure_ascii=False, indent=2), encoding="utf-8"
|
||
)
|
||
with contextlib.redirect_stdout(io.StringIO()):
|
||
_verify_stem(stem, project_root, _min, _max)
|
||
report = json.loads(report_path.read_text(encoding="utf-8"))
|
||
new_verdict = report.get("verdict", "ok")
|
||
curr_blockers = sum(len(v) for v in report.get("blockers", {}).values())
|
||
|
||
if new_verdict in ("ok", "warnings_only") or curr_blockers == 0:
|
||
break
|
||
if curr_blockers >= prev_blockers:
|
||
print(f"\n ⚠️ Nessun miglioramento ({curr_blockers} blockers) - i restanti richiedono correzione manuale del clean.md.")
|
||
break
|
||
|
||
print(f"\n Iterazione {iteration + 1} - {curr_blockers} blockers residui:")
|
||
prev_blockers = curr_blockers
|
||
chunks = _fix_blockers(chunks, report)
|
||
|
||
# Fase 2: fix warnings (too_short merge + too_long split) - una sola passata finale
|
||
with contextlib.redirect_stdout(io.StringIO()):
|
||
_verify_stem(stem, project_root, _min, _max)
|
||
report = json.loads(report_path.read_text(encoding="utf-8"))
|
||
n_short = len(report.get("warnings", {}).get("too_short", []))
|
||
n_long = sum(
|
||
1 for e in report.get("warnings", {}).get("too_long", [])
|
||
if e.get("n_chars", 0) > max_chars * cfg.SPLIT_THRESHOLD_FACTOR
|
||
)
|
||
if n_short or n_long:
|
||
print(f"\n Fix warnings: {n_short} corti, {n_long} lunghi da spezzare")
|
||
chunks = _fix_warnings(chunks, report)
|
||
|
||
n_after = len(chunks)
|
||
print(f"\n Totale chunk: {n_before} → {n_after}")
|
||
|
||
chunks_path.write_text(
|
||
json.dumps(chunks, ensure_ascii=False, indent=2), encoding="utf-8"
|
||
)
|
||
print(f" ✅ Salvato: chunks/{stem}/chunks.json")
|
||
|
||
final_verdict = report.get("verdict", "?")
|
||
if final_verdict == "ok":
|
||
print(f" ✅ Verdict finale: ok - procedi alla vettorizzazione.")
|
||
elif final_verdict == "warnings_only":
|
||
print(f" 🟡 Verdict finale: warnings_only - puoi procedere.")
|
||
else:
|
||
print(f" 🔴 Verdict finale: {final_verdict} - rilancia la verifica manualmente:")
|
||
print(f" python chunks/verify_chunks.py --stem {stem}")
|
||
|
||
return True
|
||
|
||
|
||
# ─── Entry point ──────────────────────────────────────────────────────────────
|
||
|
||
if __name__ == "__main__":
|
||
project_root = Path(__file__).parent.parent
|
||
|
||
parser = argparse.ArgumentParser(description="Fix chunk")
|
||
parser.add_argument("--stem", required=True, help="Nome del documento (sottocartella di chunks/)")
|
||
_max_def = int(cfg.TARGET_CHARS * (1 + cfg.CHUNK_TOLERANCE))
|
||
parser.add_argument(
|
||
"--max", type=int, default=_max_def,
|
||
help=f"Soglia massima caratteri per lo split (default: TARGET×(1+TOL) = {_max_def})"
|
||
)
|
||
parser.add_argument(
|
||
"--dry-run", action="store_true",
|
||
help="Mostra le operazioni pianificate senza applicarle"
|
||
)
|
||
parser.add_argument(
|
||
"--max-iter", type=int, default=10, metavar="N",
|
||
help="Numero massimo di iterazioni automatiche (default: 10)"
|
||
)
|
||
args = parser.parse_args()
|
||
|
||
ok = fix_stem(args.stem, project_root, args.max, args.dry_run, args.max_iter)
|
||
sys.exit(0 if ok else 1)
|