02c785678d
Introduce chunks/config.py come unica fonte di verità per tutti i parametri della pipeline di chunking. TARGET_CHARS + CHUNK_TOLERANCE sostituiscono MIN_CHARS/MAX_CHARS: il chunker mira a una dimensione target e si avvicina il più possibile rispettando il vincolo assoluto di terminare ogni chunk su un confine di frase (punto/punteggiatura). - config.py: TARGET_CHARS, CHUNK_TOLERANCE, SPLIT_THRESHOLD_FACTOR, PROTECT_TABLES, FIX_MAX_ITERATIONS, STRATEGY_OVERRIDES per strategia - chunker.py: algoritmo target-based (emit quando frase successiva sfora upper_body = upper - prefix_len), table protection atomica, override MIN/MAX/overlap per ciascuna delle 4 strategie - verify_chunks.py: soglie derivate da target*(1±tolerance) - fix_chunks.py: _split_at_boundary sempre su punteggiatura finale, loop ricorsivo fix→verify fino a FIX_MAX_ITERATIONS, split solo per chunk > upper × SPLIT_THRESHOLD_FACTOR Risultato su bitcoin: 694 chunk, 0 incompleti, 83% in range [450,750], tutti terminanti su punteggiatura indipendentemente dalla dimensione. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
492 lines
16 KiB
Python
492 lines
16 KiB
Python
#!/usr/bin/env python3
|
||
"""
|
||
Chunking adattivo
|
||
|
||
Divide il Markdown revisionato in chunk semantici pronti per la
|
||
vettorizzazione. La strategia dipende dal profilo strutturale del documento.
|
||
|
||
Input: conversione/<stem>/clean.md + conversione/<stem>/structure_profile.json
|
||
Output: chunks/<stem>/chunks.json
|
||
|
||
Uso:
|
||
python chunks/chunker.py # tutti i documenti in conversione/
|
||
python chunks/chunker.py --stem documento # un solo documento
|
||
python chunks/chunker.py --stem documento --force
|
||
"""
|
||
|
||
import argparse
import json
import re
import sys
from collections.abc import Callable
from pathlib import Path
|
||
|
||
# Put this file's own directory on sys.path so that the sibling ``config``
# module can be imported regardless of the current working directory.
_HERE = Path(__file__).resolve().parent
if str(_HERE) not in sys.path:
    sys.path.insert(0, str(_HERE))
import config as cfg
|
||
|
||
|
||
# ─── Utilità ──────────────────────────────────────────────────────────────────
|
||
|
||
# Strict boundary: closing punctuation, whitespace, then an upper-case letter
# (ASCII or one of the accented capitals listed) or a double quote.
_SENT_BOUNDARY_STRICT = re.compile(r'(?<=[.!?»])\s+(?=[A-ZÀÈÉÌÒÙ"])')
# Loose boundary: closing punctuation followed by any whitespace.
_SENT_BOUNDARY_LOOSE = re.compile(r'(?<=[.!?»])\s+')


def split_sentences(text: str) -> list[str]:
    """Split *text* into sentences, keeping the closing punctuation.

    The strict boundary is tried first. If it yields a single piece, the
    text is split again on the loose boundary (punctuation + whitespace),
    which also catches sentences that start with a lower-case letter.

    Returns the stripped, non-empty pieces in their original order; an empty
    or whitespace-only input yields an empty list.
    """
    stripped = text.strip()
    parts = _SENT_BOUNDARY_STRICT.split(stripped)
    if len(parts) <= 1:
        parts = _SENT_BOUNDARY_LOOSE.split(stripped)
    return [piece.strip() for piece in parts if piece.strip()]
|
||
|
||
|
||
def slugify(s: str, max_len: int = 60) -> str:
    """Turn *s* into a lower-case identifier fragment for chunk ids.

    Characters other than word characters, whitespace and hyphens are
    removed; runs of whitespace, underscores and hyphens collapse to a single
    ``_``; the result is cut to *max_len* characters.

    Underscores are stripped again after truncation so that the slug never
    ends in ``_``: chunk ids join slugs with ``__``, and a trailing
    underscore would make that separator ambiguous. If nothing is left,
    ``"section"`` is returned.
    """
    slug = re.sub(r'[^\w\s-]', '', s.lower())
    slug = re.sub(r'[\s_-]+', '_', slug).strip('_')
    slug = slug[:max_len].rstrip('_')
    return slug or "section"
|
||
|
||
|
||
def _is_table_block(text: str) -> bool:
|
||
"""True se il testo è prevalentemente una tabella Markdown (≥50% righe con |)."""
|
||
lines = [l for l in text.strip().splitlines() if l.strip()]
|
||
if not lines:
|
||
return False
|
||
table_lines = sum(1 for l in lines if l.strip().startswith("|"))
|
||
return table_lines / len(lines) >= 0.5
|
||
|
||
|
||
def _ov(strategy: str) -> tuple[int, float, int]:
    """Return ``(target_chars, tolerance, overlap)`` for *strategy*.

    Each value is read from ``cfg.STRATEGY_OVERRIDES[strategy]`` when that
    entry defines it (keys ``"target_chars"``, ``"tolerance"``,
    ``"overlap"``); otherwise the corresponding module-wide setting
    (``cfg.TARGET_CHARS``, ``cfg.CHUNK_TOLERANCE``, ``cfg.OVERLAP_SENTENCES``)
    is used. A strategy with no override entry gets all three defaults.
    """
    overrides = cfg.STRATEGY_OVERRIDES.get(strategy, {})
    return (
        overrides.get("target_chars", cfg.TARGET_CHARS),
        overrides.get("tolerance", cfg.CHUNK_TOLERANCE),
        overrides.get("overlap", cfg.OVERLAP_SENTENCES),
    )
|
||
|
||
|
||
# ─── Core: split in sotto-chunk orientato al target ───────────────────────────
|
||
|
||
def make_sub_chunks(
    body: str,
    prefix: str,
    sezione: str,
    titolo: str,
    target: int,
    tolerance: float,
    overlap_s: int,
) -> list[dict]:
    """Split *body* into sentence-aligned chunks sized close to *target*.

    The limit for a whole chunk text (prefix included) is
    ``int(target * (1 + tolerance))`` characters, so the budget for the body
    part is that value minus ``len(prefix)`` (never below 1). Whole sentences
    are accumulated until the next one would exceed the budget; the chunk is
    then emitted and the next chunk starts with the last *overlap_s*
    sentences of the emitted one (only when it held more than *overlap_s*
    sentences). Every chunk therefore ends on a sentence boundary.

    Special cases:
      * ``cfg.PROTECT_TABLES`` is true and *body* is a table block: the body
        is returned as one chunk, whatever its size.
      * a single sentence larger than the budget is emitted on its own.
      * a body with no sentences yields an empty list.

    Args:
        body: text of the section/paragraph to split.
        prefix: context header prepended to every chunk text.
        sezione: section name, stored in each chunk and used in its id.
        titolo: title, stored in each chunk and used in its id.
        target: desired chunk size in characters.
        tolerance: allowed relative deviation from *target*.
        overlap_s: number of trailing sentences repeated in the next chunk.

    Returns:
        Chunk dicts with keys ``chunk_id``, ``text``, ``sezione``,
        ``titolo``, ``sub_index`` and ``n_chars``.
    """
    chunks: list[dict] = []

    def _build(body_text: str) -> dict:
        # The running sub-index always equals the number of chunks built so far.
        index = len(chunks)
        chunk_text = prefix + body_text
        return {
            "chunk_id": f"{slugify(sezione)}__{slugify(titolo)}__s{index}",
            "text": chunk_text,
            "sezione": sezione,
            "titolo": titolo,
            "sub_index": index,
            "n_chars": len(chunk_text),
        }

    # Tables are kept atomic: splitting them by sentence would break rows.
    if cfg.PROTECT_TABLES and _is_table_block(body):
        return [_build(body)]

    # Budget for the body alone, since n_chars = len(prefix) + len(body part).
    upper_body = max(1, int(target * (1 + tolerance)) - len(prefix))

    sentences = split_sentences(body)
    if not sentences:
        return []

    current: list[str] = []
    current_len = 0

    def _emit() -> None:
        """Append the accumulated sentences as a chunk and seed the overlap."""
        nonlocal current, current_len
        chunks.append(_build(" ".join(current)))
        if overlap_s and len(current) > overlap_s:
            current = current[-overlap_s:]
        else:
            current = []
        # Joined length of n sentences: their lengths plus n-1 spaces.
        current_len = sum(len(s) for s in current) + max(0, len(current) - 1)

    for sent in sentences:
        sep = 1 if current else 0
        new_len = current_len + sep + len(sent)

        if new_len <= upper_body:
            # Still within the body budget: keep accumulating.
            current.append(sent)
            current_len = new_len
        elif current:
            # The next sentence would overflow: emit what we have (it ends on
            # a full sentence), then start the new chunk with this sentence.
            # NOTE: overlap + sent is not re-checked against the budget here.
            _emit()
            current_len += (1 if current else 0) + len(sent)
            current.append(sent)
        else:
            # Nothing accumulated and the sentence alone exceeds the budget:
            # emit it as is.
            current = [sent]
            _emit()

    if current:
        chunks.append(_build(" ".join(current)))

    return chunks
|
||
|
||
|
||
# ─── Parser Markdown ──────────────────────────────────────────────────────────
|
||
|
||
def parse_h3_sections(text: str) -> list[dict]:
    """Split Markdown *text* into sections delimited by ``#``/``##``/``###``.

    A ``# `` or ``## `` heading sets the current section name and clears the
    current title; a ``### `` heading sets the current title. Every other
    line (including deeper headings such as ``####``) is body text.

    Returns one dict per non-empty body, in document order, with keys
    ``sezione`` (last level-1/2 heading, ``""`` if none), ``titolo`` (last
    level-3 heading within that section, ``""`` if none) and ``body``
    (stripped text). Headings with no body produce no entry.
    """
    sections: list[dict] = []
    current_h2 = ""
    current_h3 = ""
    current_body_lines: list[str] = []

    def flush() -> None:
        # Record the lines gathered so far under the headings in force.
        body = "\n".join(current_body_lines).strip()
        if body:
            sections.append({
                "sezione": current_h2,
                "titolo": current_h3,
                "body": body,
            })

    for line in text.splitlines():
        if line.startswith(("# ", "## ")):
            flush()
            # Text after the first space is the heading title.
            current_h2 = line.partition(" ")[2].strip()
            current_h3 = ""
            current_body_lines = []
        elif line.startswith("### "):
            flush()
            current_h3 = line[4:].strip()
            current_body_lines = []
        else:
            current_body_lines.append(line)

    flush()
    return sections
|
||
|
||
|
||
def parse_h2_sections(text: str) -> list[dict]:
    """Split Markdown *text* into sections delimited by ``#``/``##`` headings.

    A ``# `` or ``## `` heading starts a new section named after it. Every
    other line (including ``###`` and deeper headings) is body text.

    Returns one dict per non-empty body, in document order, with keys
    ``sezione`` (heading text, ``""`` for text before the first heading) and
    ``body`` (stripped text). Headings with no body produce no entry.
    """
    sections: list[dict] = []
    current_h2 = ""
    current_body_lines: list[str] = []

    def flush() -> None:
        # Record the lines gathered so far under the current heading.
        body = "\n".join(current_body_lines).strip()
        if body:
            sections.append({"sezione": current_h2, "body": body})

    for line in text.splitlines():
        if line.startswith(("# ", "## ")):
            flush()
            # Text after the first space is the heading title.
            current_h2 = line.partition(" ")[2].strip()
            current_body_lines = []
        else:
            current_body_lines.append(line)

    flush()
    return sections
|
||
|
||
|
||
# ─── Strategie di chunking ────────────────────────────────────────────────────
|
||
|
||
def chunk_h3_aware(text: str, stem: str) -> list[dict]:
    """Chunk *text* section by section, using ``###`` headings as titles.

    Sections come from ``parse_h3_sections``. A section whose body is shorter
    than ``lower = int(target * (1 - tolerance))`` is merged with the next
    section that shares its ``sezione``: bodies are joined with a blank line
    and the non-empty titles with ``" / "``. Each resulting section is then
    split by ``make_sub_chunks`` with the prefix ``[sezione > titolo]`` (or
    ``[sezione]`` when there is no title). *stem* replaces an empty section
    name.
    """
    target, tolerance, overlap = _ov("h3_aware")
    lower = int(target * (1 - tolerance))

    merged: list[dict] = []
    pending: dict | None = None

    for sec in parse_h3_sections(text):
        if pending is None:
            pending = dict(sec)
            continue

        same_section = pending["sezione"] == sec["sezione"]
        if same_section and len(pending["body"]) < lower:
            # Too short on its own: absorb the following sibling section.
            pending = {
                "sezione": pending["sezione"],
                "titolo": " / ".join(filter(None, [pending["titolo"], sec["titolo"]])),
                "body": pending["body"] + "\n\n" + sec["body"],
            }
        else:
            merged.append(pending)
            pending = dict(sec)

    if pending is not None:
        merged.append(pending)

    chunks: list[dict] = []
    for sec in merged:
        sezione = sec["sezione"] or stem
        titolo = sec["titolo"] or ""
        prefix = f"[{sezione} > {titolo}]\n" if titolo else f"[{sezione}]\n"
        chunks.extend(
            make_sub_chunks(sec["body"], prefix, sezione, titolo, target, tolerance, overlap)
        )

    return chunks
|
||
|
||
|
||
def chunk_h2_paragraph_split(text: str, stem: str) -> list[dict]:
    """Chunk *text* by ``#``/``##`` section, then by paragraph.

    Each section from ``parse_h2_sections`` is split on blank lines.
    Heading lines that open a paragraph are removed; any text following them
    in the same paragraph is kept (previously the whole paragraph was
    discarded, losing that text). Paragraphs shorter than
    ``lower = int(target * (1 - tolerance))`` are merged with the following
    one, and each merged paragraph goes through ``make_sub_chunks`` with the
    prefix ``[sezione]``. Chunk ids are rewritten to
    ``<slug(sezione)>__p<idx>__s<sub_index>``. *stem* replaces an empty
    section name.
    """
    target, tolerance, overlap = _ov("h2_paragraph_split")
    lower = int(target * (1 - tolerance))

    def _without_leading_headings(paragraph: str) -> str:
        # Drop heading line(s) at the start of the paragraph, keep the rest.
        rest = paragraph.strip()
        while re.match(r"^#+\s", rest):
            rest = rest.partition("\n")[2].strip()
        return rest

    chunks: list[dict] = []

    for sec in parse_h2_sections(text):
        sezione = sec["sezione"] or stem
        prefix = f"[{sezione}]\n"

        paragraphs = [
            cleaned
            for cleaned in map(_without_leading_headings, re.split(r"\n{2,}", sec["body"]))
            if cleaned
        ]

        # Merge each too-short paragraph with the one that follows it.
        merged_pars: list[str] = []
        pending = ""
        for par in paragraphs:
            if pending and len(pending) < lower:
                pending = pending + "\n\n" + par
            else:
                if pending:
                    merged_pars.append(pending)
                pending = par
        if pending:
            merged_pars.append(pending)

        for idx, par in enumerate(merged_pars):
            sub = make_sub_chunks(par, prefix, sezione, f"par{idx}", target, tolerance, overlap)
            for chunk in sub:
                chunk["chunk_id"] = f"{slugify(sezione)}__p{idx}__s{chunk['sub_index']}"
            chunks.extend(sub)

    return chunks
|
||
|
||
|
||
def chunk_paragraph(text: str, stem: str) -> list[dict]:
    """Chunk *text* paragraph by paragraph, ignoring heading structure.

    The text is split on blank lines. Heading lines that open a paragraph
    are removed; any text following them in the same paragraph is kept
    (previously the whole paragraph was discarded, losing that text).
    Paragraphs shorter than ``lower = int(target * (1 - tolerance))`` are
    merged with the following one, and each merged paragraph goes through
    ``make_sub_chunks`` with the prefix ``[Documento: <stem>]``. Chunk ids
    are rewritten to ``para__<idx>__s<sub_index>``.
    """
    target, tolerance, overlap = _ov("paragraph")
    lower = int(target * (1 - tolerance))

    def _without_leading_headings(paragraph: str) -> str:
        # Drop heading line(s) at the start of the paragraph, keep the rest.
        rest = paragraph.strip()
        while re.match(r"^#+\s", rest):
            rest = rest.partition("\n")[2].strip()
        return rest

    paragraphs = [
        cleaned
        for cleaned in map(_without_leading_headings, re.split(r"\n{2,}", text))
        if cleaned
    ]
    prefix = f"[Documento: {stem}]\n"

    # Merge each too-short paragraph with the one that follows it.
    merged: list[str] = []
    pending = ""
    for par in paragraphs:
        if pending and len(pending) < lower:
            pending = pending + "\n\n" + par
        else:
            if pending:
                merged.append(pending)
            pending = par
    if pending:
        merged.append(pending)

    chunks: list[dict] = []
    for idx, par in enumerate(merged):
        sub = make_sub_chunks(par, prefix, stem, f"par{idx}", target, tolerance, overlap)
        for chunk in sub:
            chunk["chunk_id"] = f"para__{idx}__s{chunk['sub_index']}"
        chunks.extend(sub)

    return chunks
|
||
|
||
|
||
def chunk_sliding_window(text: str, stem: str) -> list[dict]:
    """Chunk *text* with a sliding window of whole sentences.

    Each window starts at a sentence and grows while the joined sentences fit
    the body budget ``int(target * (1 + tolerance)) - len(prefix)`` (never
    below 1), so that ``n_chars`` — which includes the prefix — respects the
    same limit used by ``make_sub_chunks``. A window always holds at least
    one sentence, even if that sentence alone exceeds the budget. The next
    window starts ``max(1, len(window) - overlap)`` sentences later.

    Iteration stops as soon as a window reaches the last sentence: any later
    window would only repeat a tail already contained in that chunk.

    Returns chunk dicts with keys ``chunk_id`` (``win__<n>``), ``text``,
    ``sezione`` (*stem*), ``titolo``, ``sub_index`` and ``n_chars``.
    """
    target, tolerance, overlap = _ov("sliding_window")
    prefix = f"[Documento: {stem}]\n"
    upper_body = max(1, int(target * (1 + tolerance)) - len(prefix))

    sentences = split_sentences(text)
    total = len(sentences)

    chunks: list[dict] = []
    start = 0

    while start < total:
        # The first sentence is always taken, whatever its length.
        window = [sentences[start]]
        cur_len = len(sentences[start])
        end = start + 1

        while end < total:
            candidate = sentences[end]
            grown = cur_len + 1 + len(candidate)  # +1 for the joining space
            if grown > upper_body:
                break
            window.append(candidate)
            cur_len = grown
            end += 1

        win_idx = len(chunks)
        chunk_text = prefix + " ".join(window)
        chunks.append({
            "chunk_id": f"win__{win_idx}",
            "text": chunk_text,
            "sezione": stem,
            "titolo": f"finestra {win_idx}",
            "sub_index": win_idx,
            "n_chars": len(chunk_text),
        })

        if end >= total:
            # The window covered the final sentence: nothing new remains.
            break
        start += max(1, len(window) - overlap)

    return chunks
|
||
|
||
|
||
# ─── Dispatcher ───────────────────────────────────────────────────────────────
|
||
|
||
# Maps a ``strategia_chunking`` profile value to the function implementing
# it; every function takes ``(text, stem)`` and returns a list of chunk dicts.
_STRATEGIES: dict[str, Callable[[str, str], list[dict]]] = {
    "h3_aware": chunk_h3_aware,
    "h2_paragraph_split": chunk_h2_paragraph_split,
    "paragraph": chunk_paragraph,
    "sliding_window": chunk_sliding_window,
}
|
||
|
||
|
||
def chunk_document(clean_md: Path, profile: dict, stem: str) -> list[dict]:
    """Chunk the Markdown file *clean_md* with the strategy named in *profile*.

    The strategy is ``profile["strategia_chunking"]``; a missing key or a
    name not present in ``_STRATEGIES`` falls back to ``chunk_paragraph``.
    The file is read as UTF-8.
    """
    text = clean_md.read_text(encoding="utf-8")
    strategia = profile.get("strategia_chunking", "paragraph")
    chunker = _STRATEGIES.get(strategia, chunk_paragraph)
    return chunker(text, stem)
|
||
|
||
|
||
# ─── Per-document processing ──────────────────────────────────────────────────
|
||
|
||
def process_stem(stem: str, project_root: Path, force: bool) -> bool:
    """Chunk one document and write ``chunks/<stem>/chunks.json``.

    Reads ``conversione/<stem>/clean.md`` and
    ``conversione/<stem>/structure_profile.json`` under *project_root*,
    runs ``chunk_document`` and prints a size summary.

    Returns:
        True when chunks were written, or when the output already exists and
        *force* is false. False when an input file is missing, the profile is
        not a valid JSON object, or no chunk was produced; in those cases a
        message is printed and nothing is written.
    """
    conv_dir = project_root / "conversione" / stem
    out_dir = project_root / "chunks" / stem
    clean_md = conv_dir / "clean.md"
    profile_path = conv_dir / "structure_profile.json"
    out_file = out_dir / "chunks.json"

    print(f"\nDocumento: {stem}")

    if not clean_md.exists():
        print(f" ✗ clean.md non trovato in conversione/{stem}/ — skip")
        return False
    if not profile_path.exists():
        print(f" ✗ structure_profile.json non trovato in conversione/{stem}/ — skip")
        return False

    if out_file.exists() and not force:
        print(" ⚠️ chunks.json già presente — skip")
        print(" (usa --force per rieseguire)")
        return True

    # A broken profile must fail this document only, not abort the batch.
    try:
        profile = json.loads(profile_path.read_text(encoding="utf-8"))
    except json.JSONDecodeError as exc:
        print(f" ✗ structure_profile.json non valido ({exc}) — skip")
        return False
    if not isinstance(profile, dict):
        print(" ✗ structure_profile.json non valido (atteso un oggetto JSON) — skip")
        return False

    strategia = profile.get("strategia_chunking", "paragraph")
    print(f" Strategia: {strategia}")

    chunks = chunk_document(clean_md, profile, stem)

    if not chunks:
        print(" ✗ Nessun chunk generato — controlla clean.md")
        return False

    out_dir.mkdir(parents=True, exist_ok=True)
    out_file.write_text(
        json.dumps(chunks, ensure_ascii=False, indent=2), encoding="utf-8"
    )

    # chunk_document falls back to the paragraph strategy for unknown names,
    # so the summary must use that strategy's thresholds as well.
    effective = strategia if strategia in _STRATEGIES else "paragraph"
    target, tolerance, _ = _ov(effective)
    lower = int(target * (1 - tolerance))
    upper = int(target * (1 + tolerance))

    lengths = [chunk["n_chars"] for chunk in chunks]
    min_c = min(lengths)
    max_c = max(lengths)
    avg_c = int(sum(lengths) / len(lengths))
    short = sum(1 for size in lengths if size < lower)
    long_ = sum(1 for size in lengths if size > upper)

    # round() rather than int(): 0.29 * 100 is 28.999… in floating point.
    print(f" Target: {target} char ±{round(tolerance * 100)}% "
          f"→ range [{lower}, {upper}]")
    print(f" Chunk totali: {len(chunks)}")
    print(f" Min: {min_c} char Max: {max_c} char Media: {avg_c} char")
    if short:
        print(f" ⚠️ {short} chunk sotto lower ({lower})")
    if long_:
        print(f" ⚠️ {long_} chunk sopra upper ({upper})")
    print(f" ✅ chunks.json salvato in chunks/{stem}/")
    return True
|
||
|
||
|
||
# ─── Entry point ─────────────────────────────────────────────────────────────
|
||
|
||
def main() -> None:
    """Command-line entry point: chunk one document or all of them.

    With ``--stem`` only that document is processed; otherwise every
    sub-directory of ``conversione/`` containing a ``clean.md`` is processed
    in name order. The project root is the parent of this file's directory.
    Exits with status 0 when every document succeeded, 1 otherwise (also
    when ``conversione/`` is missing or holds no document).
    """
    project_root = Path(__file__).parent.parent

    parser = argparse.ArgumentParser(description="Chunking adattivo")
    parser.add_argument("--stem", help="Nome del documento (sottocartella di conversione/)")
    parser.add_argument("--force", action="store_true", help="Riesegui anche se già presente")
    args = parser.parse_args()

    if args.stem:
        stems = [args.stem]
    else:
        conv_dir = project_root / "conversione"
        if not conv_dir.exists():
            print(f"Errore: cartella conversione/ non trovata in {project_root}")
            sys.exit(1)
        stems = sorted(
            p.name for p in conv_dir.iterdir()
            if p.is_dir() and (p / "clean.md").exists()
        )
        if not stems:
            print("Errore: nessun documento trovato in conversione/")
            sys.exit(1)

    results = [process_stem(s, project_root, args.force) for s in stems]

    ok = sum(results)
    total = len(results)
    all_ok = all(results)
    print(f"\n{'✅' if all_ok else '⚠️ '} {ok}/{total} documenti processati")
    sys.exit(0 if all_ok else 1)


if __name__ == "__main__":
    main()
|