Files
davide 5b63c423cc feat(chunks): ottimizzazione chunking e post-processing
- chunker.py: scrive meta.json con strategia e soglie effettive (target,
  min_chars, max_chars) per ogni documento chunked

- verify_chunks.py:
  * _load_thresholds(): legge min/max da meta.json invece del TARGET_CHARS
    globale, eliminando il mismatch tra soglie chunker e verify
    (h3_aware target=600 -> range 450-750, non piu' validato a 225-375)
  * _ROMAN_END: esclude numeri romani finali (XV, XIV...) dagli incompleti
    perche' sono artefatti indice PDF, non frasi spezzate
  * PUNCT_END: aggiunge ; come fine valida (clausole legali italiane)

- fix_chunks.py:
  * _load_thresholds(): usa max_chars da meta.json per split coerente
  * _SECONDARY_END: split secondario su ; per testo legale multi-clausola
  * Fase 1 (convergenza): risolve solo blockers (incomplete, empty,
    no_prefix) senza toccare warnings -- elimina il ciclo
    merge->too_long->split->incomplete->merge
  * Fase 2 (finale): una sola passata di merge too_short + split too_long
    dopo che i blockers sono azzerati

Risultato su dirittopenale: da blocked (265 incomplete) a warnings_only
in 2 iterazioni, senza cicli infiniti.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-12 11:09:28 +02:00

498 lines
16 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
Chunking adattivo
Divide il Markdown revisionato in chunk semantici pronti per la
vettorizzazione. La strategia dipende dal profilo strutturale del documento.
Input: conversione/<stem>/clean.md + conversione/<stem>/structure_profile.json
Output: chunks/<stem>/chunks.json
Uso:
python chunks/chunker.py # tutti i documenti in conversione/
python chunks/chunker.py --stem documento # un solo documento
python chunks/chunker.py --stem documento --force
"""
import argparse
import json
import re
import sys
from pathlib import Path
_HERE = Path(__file__).resolve().parent
if str(_HERE) not in sys.path:
sys.path.insert(0, str(_HERE))
import config as cfg
# ─── Utilità ──────────────────────────────────────────────────────────────────
def split_sentences(text: str) -> list[str]:
    """Split *text* into sentences on terminal punctuation.

    A strict pass breaks only where whitespace follows one of ``. ! ? »`` and
    is followed by an uppercase letter or a double quote.  If that pass does
    not split anything, a looser pass breaks after the punctuation regardless
    of what comes next.

    Returns:
        The non-empty pieces, each stripped of surrounding whitespace.
    """
    stripped = text.strip()
    strict_boundary = r'(?<=[.!?»])\s+(?=[A-ZÀÈÉÌÒÙA-Z\"])'
    loose_boundary = r'(?<=[.!?»])\s+'

    pieces = re.split(strict_boundary, stripped)
    if len(pieces) < 2:
        # Nothing was split (e.g. sentences starting in lowercase): retry
        # without the look-ahead on the following character.
        pieces = re.split(loose_boundary, stripped)
    return [piece for piece in map(str.strip, pieces) if piece]
def slugify(s: str, max_len: int = 60) -> str:
    """Turn *s* into a lowercase, underscore-separated identifier.

    Characters other than word characters, whitespace and hyphens are removed;
    runs of whitespace, underscores and hyphens collapse into one underscore;
    leading and trailing underscores are trimmed.

    Returns:
        The slug cut to ``max_len`` characters, or ``"section"`` when nothing
        is left after cleaning.
    """
    lowered = s.lower()
    cleaned = re.sub(r'[^\w\s-]', '', lowered)
    slug = re.sub(r'[\s_-]+', '_', cleaned).strip('_')
    if not slug:
        return "section"
    return slug[:max_len]
def _is_table_block(text: str) -> bool:
"""True se il testo è prevalentemente una tabella Markdown (≥50% righe con |)."""
lines = [l for l in text.strip().splitlines() if l.strip()]
if not lines:
return False
table_lines = sum(1 for l in lines if l.strip().startswith("|"))
return table_lines / len(lines) >= 0.5
def _ov(strategy: str) -> tuple[int, float, int]:
    """Return ``(target_chars, tolerance, overlap)`` for *strategy*.

    Each value comes from ``cfg.STRATEGY_OVERRIDES[strategy]`` when that entry
    defines it, otherwise from the global ``cfg.TARGET_CHARS``,
    ``cfg.CHUNK_TOLERANCE`` and ``cfg.OVERLAP_SENTENCES`` respectively.
    """
    overrides = cfg.STRATEGY_OVERRIDES.get(strategy, {})
    return (
        overrides.get("target_chars", cfg.TARGET_CHARS),
        overrides.get("tolerance", cfg.CHUNK_TOLERANCE),
        overrides.get("overlap", cfg.OVERLAP_SENTENCES),
    )
# ─── Core: split in sotto-chunk orientato al target ───────────────────────────
def make_sub_chunks(
    body: str,
    prefix: str,
    sezione: str,
    titolo: str,
    target: int,
    tolerance: float,
    overlap_s: int,
) -> list[dict]:
    """Split *body* into sentence-aligned chunks bounded by an upper size.

    The limit is ``upper = int(target * (1 + tolerance))`` characters for the
    whole chunk text, *prefix* included.  Whole sentences are accumulated
    until the next one would push the chunk past ``upper``; the chunk is then
    emitted and the next one is seeded with the trailing ``overlap_s``
    sentences of the emitted chunk (only when that chunk held more than
    ``overlap_s`` sentences).  Overlap sentences are dropped, oldest first,
    when keeping them would push the new chunk past the limit.  A single
    sentence longer than the limit is emitted unsplit, so every chunk ends on
    a sentence boundary.

    When ``cfg.PROTECT_TABLES`` is truthy and *body* is mostly a Markdown
    table, the whole body is returned as a single chunk whatever its size.

    Args:
        body: Text to split.
        prefix: Text prepended to every chunk.
        sezione: Section name stored in each chunk and used in its id.
        titolo: Title stored in each chunk and used in its id.
        target: Desired chunk size in characters.
        tolerance: Fraction of ``target`` allowed above it.
        overlap_s: Number of trailing sentences repeated in the next chunk.

    Returns:
        A list of dicts with keys ``chunk_id``, ``text``, ``sezione``,
        ``titolo``, ``sub_index`` and ``n_chars``; empty when *body* yields
        no sentences.
    """
    id_stem = f"{slugify(sezione)}__{slugify(titolo)}"

    def _record(chunk_text: str, index: int) -> dict:
        # Single place where the chunk dictionary is assembled.
        return {
            "chunk_id": f"{id_stem}__s{index}",
            "text": chunk_text,
            "sezione": sezione,
            "titolo": titolo,
            "sub_index": index,
            "n_chars": len(chunk_text),
        }

    def _joined_len(parts: list[str]) -> int:
        # Length of " ".join(parts): n sentences are separated by n-1 spaces.
        return sum(len(part) for part in parts) + max(0, len(parts) - 1)

    if cfg.PROTECT_TABLES and _is_table_block(body):
        return [_record(prefix + body, 0)]

    # The limit applies to the final text, so the prefix is budgeted up front.
    upper_body = max(1, int(target * (1 + tolerance)) - len(prefix))

    sentences = split_sentences(body)
    if not sentences:
        return []

    chunks: list[dict] = []
    current: list[str] = []
    current_len = 0

    for sent in sentences:
        sep = 1 if current else 0
        if current and current_len + sep + len(sent) > upper_body:
            # The next sentence does not fit: close the current chunk on a
            # complete sentence.  sub_index equals the number of chunks so far.
            chunks.append(_record(prefix + " ".join(current), len(chunks)))

            carried = (
                current[-overlap_s:]
                if overlap_s and len(current) > overlap_s
                else []
            )
            # Keep only as much overlap as still leaves room for `sent`,
            # otherwise overlap + sentence would exceed the limit.
            while carried and _joined_len(carried) + 1 + len(sent) > upper_body:
                carried = carried[1:]

            current = list(carried)
            current_len = _joined_len(current)
            sep = 1 if current else 0

        # A sentence longer than the limit ends up alone in `current` and is
        # emitted as-is by the next iteration or by the final flush.
        current.append(sent)
        current_len += sep + len(sent)

    if current:
        chunks.append(_record(prefix + " ".join(current), len(chunks)))
    return chunks
# ─── Parser Markdown ──────────────────────────────────────────────────────────
def parse_h3_sections(text: str) -> list[dict]:
    """Group the lines of a Markdown text by their H1/H2 and H3 headings.

    Lines starting with ``# `` or ``## `` open a new section and reset the
    current H3 title; lines starting with ``### `` open a new sub-section
    under the current section.  Every other line, deeper headings included,
    is body text.

    Returns:
        Dicts with keys ``sezione``, ``titolo`` and ``body``, in document
        order.  Groups whose body is empty after stripping are omitted.
    """
    # Each group is (section title, sub-section title, body lines); the first
    # one collects whatever precedes the first heading.
    groups: list[tuple[str, str, list[str]]] = [("", "", [])]

    for raw in text.splitlines():
        if raw.startswith(("# ", "## ")):
            groups.append((raw.partition(" ")[2].strip(), "", []))
        elif raw.startswith("### "):
            groups.append((groups[-1][0], raw.partition(" ")[2].strip(), []))
        else:
            groups[-1][2].append(raw)

    sections = []
    for section_title, sub_title, lines in groups:
        content = "\n".join(lines).strip()
        if content:
            sections.append({
                "sezione": section_title,
                "titolo": sub_title,
                "body": content,
            })
    return sections
def parse_h2_sections(text: str) -> list[dict]:
    """Group the lines of a Markdown text by their H1/H2 headings.

    Lines starting with ``# `` or ``## `` open a new section; every other
    line, deeper headings included, is body text of the current section.

    Returns:
        Dicts with keys ``sezione`` and ``body``, in document order.  Groups
        whose body is empty after stripping are omitted.
    """
    # Each group is (section title, body lines); the first one collects
    # whatever precedes the first heading.
    groups: list[tuple[str, list[str]]] = [("", [])]

    for raw in text.splitlines():
        if raw.startswith(("## ", "# ")):
            groups.append((raw.partition(" ")[2].strip(), []))
        else:
            groups[-1][1].append(raw)

    sections = []
    for section_title, lines in groups:
        content = "\n".join(lines).strip()
        if content:
            sections.append({"sezione": section_title, "body": content})
    return sections
# ─── Strategie di chunking ────────────────────────────────────────────────────
def chunk_h3_aware(text: str, stem: str) -> list[dict]:
    """Chunk *text* along its H3 sub-sections.

    Consecutive sub-sections of the same section are merged while the
    accumulated body is shorter than ``int(target * (1 - tolerance))``; merged
    titles are joined with ``" / "``.  Each resulting body is then split by
    ``make_sub_chunks`` with a ``[sezione > titolo]`` (or ``[sezione]``)
    prefix line.  *stem* stands in for a missing section name.
    """
    target, tolerance, overlap = _ov("h3_aware")
    min_body = int(target * (1 - tolerance))

    # The last element of `coalesced` is the group still open for merging.
    coalesced: list[dict] = []
    for section in parse_h3_sections(text):
        last = coalesced[-1] if coalesced else None
        mergeable = (
            last is not None
            and last["sezione"] == section["sezione"]
            and len(last["body"]) < min_body
        )
        if not mergeable:
            coalesced.append(dict(section))
            continue
        titles = [t for t in (last["titolo"], section["titolo"]) if t]
        coalesced[-1] = {
            "sezione": last["sezione"],
            "titolo": " / ".join(titles) or last["titolo"],
            "body": last["body"] + "\n\n" + section["body"],
        }

    result = []
    for entry in coalesced:
        sezione = entry["sezione"] or stem
        titolo = entry["titolo"] or ""
        if titolo:
            prefix = f"[{sezione} > {titolo}]\n"
        else:
            prefix = f"[{sezione}]\n"
        result.extend(
            make_sub_chunks(
                entry["body"], prefix, sezione, titolo, target, tolerance, overlap
            )
        )
    return result
def chunk_h2_paragraph_split(text: str, stem: str) -> list[dict]:
    """Chunk *text* per H1/H2 section, splitting each section by paragraph.

    Within a section, paragraphs (separated by blank lines) are merged while
    the accumulated text is shorter than ``int(target * (1 - tolerance))``;
    each merged paragraph is then split by ``make_sub_chunks`` with a
    ``[sezione]`` prefix line.  Heading lines opening a paragraph are removed,
    but any text following them in the same paragraph is kept.  *stem* stands
    in for a missing section name.

    Returns:
        Chunk dicts whose ``chunk_id`` is
        ``<section slug>__p<paragraph index>__s<sub index>``.
    """
    target, tolerance, overlap = _ov("h2_paragraph_split")
    lower = int(target * (1 - tolerance))

    def _without_leading_headings(block: str) -> str:
        # Drop only the heading line(s) at the top of the paragraph: a heading
        # followed by text without a blank line must not discard that text.
        rest = block.strip()
        while re.match(r"^#+\s", rest):
            rest = rest.partition("\n")[2].strip()
        return rest

    chunks = []
    for sec in parse_h2_sections(text):
        sezione = sec["sezione"] or stem
        prefix = f"[{sezione}]\n"
        paragraphs = [
            par
            for par in map(_without_leading_headings, re.split(r"\n{2,}", sec["body"]))
            if par
        ]

        # Greedy forward merge: keep appending while the pending text is
        # still below the lower threshold.
        merged_pars: list[str] = []
        pending = ""
        for par in paragraphs:
            if pending and len(pending) < lower:
                pending = pending + "\n\n" + par
            else:
                if pending:
                    merged_pars.append(pending)
                pending = par
        if pending:
            merged_pars.append(pending)

        for idx, par in enumerate(merged_pars):
            sub = make_sub_chunks(
                par, prefix, sezione, f"par{idx}", target, tolerance, overlap
            )
            for chunk in sub:
                chunk["chunk_id"] = f"{slugify(sezione)}__p{idx}__s{chunk['sub_index']}"
            chunks.extend(sub)
    return chunks
def chunk_paragraph(text: str, stem: str) -> list[dict]:
    """Chunk *text* by paragraph, ignoring its heading structure.

    Paragraphs (separated by blank lines) are merged while the accumulated
    text is shorter than ``int(target * (1 - tolerance))``; each merged
    paragraph is then split by ``make_sub_chunks`` with a
    ``[Documento: <stem>]`` prefix line.  Heading lines opening a paragraph
    are removed, but any text following them in the same paragraph is kept.

    Returns:
        Chunk dicts whose ``chunk_id`` is
        ``para__<paragraph index>__s<sub index>``.
    """
    target, tolerance, overlap = _ov("paragraph")
    lower = int(target * (1 - tolerance))
    prefix = f"[Documento: {stem}]\n"

    def _without_leading_headings(block: str) -> str:
        # Drop only the heading line(s) at the top of the paragraph: a heading
        # followed by text without a blank line must not discard that text.
        rest = block.strip()
        while re.match(r"^#+\s", rest):
            rest = rest.partition("\n")[2].strip()
        return rest

    paragraphs = [
        par
        for par in map(_without_leading_headings, re.split(r"\n{2,}", text))
        if par
    ]

    # Greedy forward merge: keep appending while the pending text is still
    # below the lower threshold.
    merged: list[str] = []
    pending = ""
    for par in paragraphs:
        if pending and len(pending) < lower:
            pending = pending + "\n\n" + par
        else:
            if pending:
                merged.append(pending)
            pending = par
    if pending:
        merged.append(pending)

    chunks = []
    for idx, par in enumerate(merged):
        sub = make_sub_chunks(
            par, prefix, stem, f"par{idx}", target, tolerance, overlap
        )
        for chunk in sub:
            chunk["chunk_id"] = f"para__{idx}__s{chunk['sub_index']}"
        chunks.extend(sub)
    return chunks
def chunk_sliding_window(text: str, stem: str) -> list[dict]:
    """Chunk *text* with a sentence-based sliding window.

    Each window takes whole sentences while the chunk text (prefix included)
    stays within ``int(target * (1 + tolerance))`` characters; a window always
    holds at least one sentence, even an oversized one.  The next window
    starts ``len(window) - overlap`` sentences later (at least one), so
    consecutive windows share up to ``overlap`` sentences.  Iteration stops as
    soon as a window reaches the last sentence.

    Returns:
        Chunk dicts with ``chunk_id`` ``win__<n>``, ``sezione`` set to *stem*
        and ``titolo`` ``finestra <n>``.
    """
    target, tolerance, overlap = _ov("sliding_window")
    prefix = f"[Documento: {stem}]\n"
    # n_chars counts the prefix too, so budget it here as make_sub_chunks does.
    upper_body = max(1, int(target * (1 + tolerance)) - len(prefix))

    sentences = split_sentences(text)
    total = len(sentences)
    chunks: list[dict] = []

    start = 0
    while start < total:
        # The first sentence is always taken, whatever its length.
        window = [sentences[start]]
        cur_len = len(sentences[start])
        end = start + 1
        while end < total and cur_len + 1 + len(sentences[end]) <= upper_body:
            window.append(sentences[end])
            cur_len += 1 + len(sentences[end])
            end += 1

        win_idx = len(chunks)
        chunk_text = prefix + " ".join(window)
        chunks.append({
            "chunk_id": f"win__{win_idx}",
            "text": chunk_text,
            "sezione": stem,
            "titolo": f"finestra {win_idx}",
            "sub_index": win_idx,
            "n_chars": len(chunk_text),
        })

        if end >= total:
            # The window already covers the last sentence: any further window
            # would only repeat a suffix of this chunk.
            break
        start += max(1, len(window) - overlap)
    return chunks
# ─── Dispatcher ───────────────────────────────────────────────────────────────
# Maps the strategy name found in a structure profile to the function that
# implements it; every function takes (text, stem) and returns the chunk list.
_STRATEGIES: dict[str, callable] = dict(
    h3_aware=chunk_h3_aware,
    h2_paragraph_split=chunk_h2_paragraph_split,
    paragraph=chunk_paragraph,
    sliding_window=chunk_sliding_window,
)
def chunk_document(clean_md: Path, profile: dict, stem: str) -> list[dict]:
    """Chunk the Markdown file *clean_md* with the strategy named in *profile*.

    The strategy is read from ``profile["strategia_chunking"]`` (default
    ``"paragraph"``); names missing from ``_STRATEGIES`` fall back to
    ``chunk_paragraph``.  The file is read as UTF-8.
    """
    markdown = clean_md.read_text(encoding="utf-8")
    strategy_name = profile.get("strategia_chunking", "paragraph")
    chunker = _STRATEGIES.get(strategy_name, chunk_paragraph)
    return chunker(markdown, stem)
# ─── Per-document processing ──────────────────────────────────────────────────
def process_stem(stem: str, project_root: Path, force: bool) -> bool:
    """Chunk one converted document and write its ``chunks.json`` and ``meta.json``.

    Reads ``conversione/<stem>/clean.md`` and
    ``conversione/<stem>/structure_profile.json`` under *project_root* and
    writes into ``chunks/<stem>/``.  ``meta.json`` records the strategy
    actually applied together with its target and min/max thresholds.
    Progress and size statistics are printed to stdout.

    Args:
        stem: Document name (sub-folder of ``conversione/``).
        project_root: Folder containing ``conversione/`` and ``chunks/``.
        force: Re-run even when ``chunks.json`` already exists.

    Returns:
        True when chunks were written or already present (and *force* is
        false); False when an input is missing or unreadable, or when no
        chunk was produced.
    """
    conv_dir = project_root / "conversione" / stem
    out_dir = project_root / "chunks" / stem
    clean_md = conv_dir / "clean.md"
    profile_path = conv_dir / "structure_profile.json"
    out_file = out_dir / "chunks.json"

    print(f"\nDocumento: {stem}")

    if not clean_md.exists():
        print(f" ✗ clean.md non trovato in conversione/{stem}/ — skip")
        return False
    if not profile_path.exists():
        print(f" ✗ structure_profile.json non trovato in conversione/{stem}/ — skip")
        return False
    if out_file.exists() and not force:
        print(" ⚠️ chunks.json già presente — skip")
        print(" (usa --force per rieseguire)")
        return True

    # A broken profile must fail this document only, not the whole batch.
    # ValueError covers both JSON and UTF-8 decoding errors.
    try:
        profile = json.loads(profile_path.read_text(encoding="utf-8"))
    except (OSError, ValueError) as exc:
        print(f" ✗ structure_profile.json non leggibile ({exc}) — skip")
        return False
    if not isinstance(profile, dict):
        print(" ✗ structure_profile.json non è un oggetto JSON — skip")
        return False

    strategia = profile.get("strategia_chunking", "paragraph")
    print(f" Strategia: {strategia}")

    # chunk_document falls back to the paragraph strategy for unknown names;
    # thresholds and meta.json must describe the strategy really applied.
    effective = strategia if strategia in _STRATEGIES else "paragraph"
    if effective != strategia:
        print(f" ⚠️ strategia sconosciuta — uso '{effective}'")

    chunks = chunk_document(clean_md, profile, stem)
    if not chunks:
        print(" ✗ Nessun chunk generato — controlla clean.md")
        return False

    out_dir.mkdir(parents=True, exist_ok=True)
    out_file.write_text(
        json.dumps(chunks, ensure_ascii=False, indent=2), encoding="utf-8"
    )

    target, tolerance, _ = _ov(effective)
    lower = int(target * (1 - tolerance))
    upper = int(target * (1 + tolerance))
    meta = {
        "strategy": effective,
        "target_chars": target,
        "min_chars": lower,
        "max_chars": upper,
    }
    (out_dir / "meta.json").write_text(
        json.dumps(meta, ensure_ascii=False), encoding="utf-8"
    )

    lengths = [chunk["n_chars"] for chunk in chunks]
    min_c = min(lengths)
    max_c = max(lengths)
    avg_c = int(sum(lengths) / len(lengths))
    short = sum(1 for n in lengths if n < lower)
    long_ = sum(1 for n in lengths if n > upper)

    print(f" Target: {target} char ±{int(tolerance*100)}% "
          f"→ range [{lower}, {upper}]")
    print(f" Chunk totali: {len(chunks)}")
    print(f" Min: {min_c} char Max: {max_c} char Media: {avg_c} char")
    if short:
        print(f" ⚠️ {short} chunk sotto lower ({lower})")
    if long_:
        print(f" ⚠️ {long_} chunk sopra upper ({upper})")
    print(f" ✅ chunks.json salvato in chunks/{stem}/")
    return True
# ─── Entry point ─────────────────────────────────────────────────────────────
def main(argv=None) -> int:
    """Command-line entry point: chunk one document or all of them.

    Args:
        argv: Argument list to parse; ``None`` lets argparse use ``sys.argv``.

    Returns:
        0 when every document was processed successfully, 1 otherwise
        (including a missing ``conversione/`` folder or no document found).
    """
    # Resolve the script path so the project root does not depend on how the
    # script was invoked (relative path, symlink).
    project_root = Path(__file__).resolve().parent.parent

    parser = argparse.ArgumentParser(description="Chunking adattivo")
    parser.add_argument("--stem", help="Nome del documento (sottocartella di conversione/)")
    parser.add_argument("--force", action="store_true", help="Riesegui anche se già presente")
    args = parser.parse_args(argv)

    if args.stem:
        stems = [args.stem]
    else:
        conv_dir = project_root / "conversione"
        if not conv_dir.exists():
            print(f"Errore: cartella conversione/ non trovata in {project_root}")
            return 1
        # Only folders that actually contain a clean.md are documents.
        stems = sorted(
            p.name for p in conv_dir.iterdir()
            if p.is_dir() and (p / "clean.md").exists()
        )
        if not stems:
            print("Errore: nessun documento trovato in conversione/")
            return 1

    # Process every document even if an earlier one fails.
    results = [process_stem(s, project_root, args.force) for s in stems]
    ok = sum(results)
    total = len(results)
    print(f"\n{'' if all(results) else '⚠️ '} {ok}/{total} documenti processati")
    return 0 if all(results) else 1


if __name__ == "__main__":
    sys.exit(main())