feat(chunks): target-based chunking con config centralizzata

Introduce chunks/config.py come unica fonte di verità per tutti i
parametri della pipeline di chunking. TARGET_CHARS + CHUNK_TOLERANCE
sostituiscono MIN_CHARS/MAX_CHARS: il chunker mira a una dimensione
target e si avvicina il più possibile rispettando il vincolo assoluto
di terminare ogni chunk su un confine di frase (punto/punteggiatura).

- config.py: TARGET_CHARS, CHUNK_TOLERANCE, SPLIT_THRESHOLD_FACTOR,
  PROTECT_TABLES, FIX_MAX_ITERATIONS, STRATEGY_OVERRIDES per strategia
- chunker.py: algoritmo target-based (emit quando frase successiva
  sfora upper_body = upper - prefix_len), table protection atomica,
  override MIN/MAX/overlap per ciascuna delle 4 strategie
- verify_chunks.py: soglie derivate da target*(1±tolerance)
- fix_chunks.py: _split_at_boundary sempre su punteggiatura finale,
  loop ricorsivo fix→verify fino a FIX_MAX_ITERATIONS, split solo
  per chunk > upper × SPLIT_THRESHOLD_FACTOR

Risultato su bitcoin: 694 chunk, 0 incompleti, 83% in range [450,750],
tutti terminanti su punteggiatura indipendentemente dalla dimensione.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-05-11 15:45:24 +02:00
parent 508587c5bf
commit 02c785678d
4 changed files with 342 additions and 130 deletions
+134 -86
View File
@@ -20,12 +20,10 @@ import re
import sys
from pathlib import Path
# ─── Parametri ────────────────────────────────────────────────────────────────
MIN_CHARS = 200 # sotto questa soglia → accorpa al chunk successivo
MAX_CHARS = 800 # sopra questa soglia → spezza su frasi
OVERLAP_S = 2 # frasi di overlap tra sotto-chunk dello stesso boundary
_HERE = Path(__file__).resolve().parent
if str(_HERE) not in sys.path:
sys.path.insert(0, str(_HERE))
import config as cfg
# ─── Utilità ──────────────────────────────────────────────────────────────────
@@ -44,73 +42,106 @@ def slugify(s: str, max_len: int = 60) -> str:
return s[:max_len] if s else "section"
_SENT_BOUNDARY = re.compile(r"[.!?»)\]'\u2019\"\u201c\u201d/:|\u2026]$")
def _is_table_block(text: str) -> bool:
"""True se il testo è prevalentemente una tabella Markdown (≥50% righe con |)."""
lines = [l for l in text.strip().splitlines() if l.strip()]
if not lines:
return False
table_lines = sum(1 for l in lines if l.strip().startswith("|"))
return table_lines / len(lines) >= 0.5
def _flush_chunk(
    current: list[str],
    sentences: list[str],
    i: int,
    prefix: str,
    sezione: str,
    titolo: str,
    sub_index: int,
    max_chars: int,
) -> tuple[dict, list[str], int, int]:
    """Emit one chunk, extending it up to a sentence boundary (max +20%).

    While the joined text of ``current`` does not match ``_SENT_BOUNDARY``,
    sentences starting at index ``i`` are appended to ``current`` (in place),
    as long as the accumulated body length stays within ``max_chars * 1.2``.

    Returns a tuple ``(chunk, current, i, sub_index + 1)`` where ``chunk`` is
    the emitted chunk dict, ``current`` is the (possibly extended) sentence
    list, and ``i`` is the index of the first sentence not consumed.
    """
    ceiling = int(max_chars * 1.2)
    # Each sentence is counted with one extra char for its separator.
    used = sum(len(piece) + 1 for piece in current)
    while True:
        if i >= len(sentences):
            break
        if _SENT_BOUNDARY.search(" ".join(current)):
            break
        candidate = sentences[i]
        grown = used + len(candidate) + 1
        if grown > ceiling:
            # Adding this sentence would exceed the hard limit: stop here.
            break
        current.append(candidate)
        used = grown
        i += 1
    text = prefix + " ".join(current)
    record = {
        "chunk_id": f"{slugify(sezione)}__{slugify(titolo)}__s{sub_index}",
        "text": text,
        "sezione": sezione,
        "titolo": titolo,
        "sub_index": sub_index,
        "n_chars": len(text),
    }
    return record, current, i, sub_index + 1
def _ov(strategy: str) -> tuple[int, float, int]:
    """Return ``(target_chars, tolerance, overlap)`` for a strategy.

    Values come from ``cfg.STRATEGY_OVERRIDES[strategy]`` when present; any
    missing key (or an unknown strategy) falls back to the global
    ``cfg.TARGET_CHARS``, ``cfg.CHUNK_TOLERANCE`` and ``cfg.OVERLAP_SENTENCES``.
    """
    overrides = cfg.STRATEGY_OVERRIDES.get(strategy, {})
    return (
        overrides.get("target_chars", cfg.TARGET_CHARS),
        overrides.get("tolerance", cfg.CHUNK_TOLERANCE),
        overrides.get("overlap", cfg.OVERLAP_SENTENCES),
    )
# ─── Core: split in sotto-chunk orientato al target ───────────────────────────
def make_sub_chunks(
body: str,
prefix: str,
sezione: str,
titolo: str,
max_chars: int,
target: int,
tolerance: float,
overlap_s: int,
) -> list[dict]:
"""Divide body in chunk il più vicini possibile a `target` char.
Logica:
lower = target × (1 - tolerance) → soglia minima per emettere
upper = target × (1 + tolerance) → limite massimo
Si accumulano frasi intere finché la successiva farebbe superare `upper`.
A quel punto si emette (siamo vicini al target) e si riparte con overlap.
Ogni chunk termina sempre su un confine di frase; non attraversa mai
il boundary dell'header corrente.
"""
if cfg.PROTECT_TABLES and _is_table_block(body):
chunk_text = prefix + body
return [{
"chunk_id": f"{slugify(sezione)}__{slugify(titolo)}__s0",
"text": chunk_text,
"sezione": sezione,
"titolo": titolo,
"sub_index": 0,
"n_chars": len(chunk_text),
}]
# Soglia calcolata sul corpo (n_chars finale = prefix_len + body_len).
prefix_len = len(prefix)
upper_body = max(1, int(target * (1 + tolerance)) - prefix_len)
sentences = split_sentences(body)
if not sentences:
return []
chunks = []
chunks: list[dict] = []
current: list[str] = []
current_len = 0
sub_index = 0
i = 0
while i < len(sentences):
sent = sentences[i]
if not current or current_len + len(sent) + 1 <= max_chars:
def _emit() -> None:
nonlocal current, current_len, sub_index
chunk_text = prefix + " ".join(current)
chunks.append({
"chunk_id": f"{slugify(sezione)}__{slugify(titolo)}__s{sub_index}",
"text": chunk_text,
"sezione": sezione,
"titolo": titolo,
"sub_index": sub_index,
"n_chars": len(chunk_text),
})
overlap = current[-overlap_s:] if overlap_s and len(current) > overlap_s else []
current = overlap[:]
# Lunghezza corretta dell'overlap (n-1 spazi tra n frasi).
current_len = sum(len(s) for s in current) + max(0, len(current) - 1)
sub_index += 1
for sent in sentences:
sep = 1 if current else 0
new_len = current_len + sep + len(sent)
if new_len <= upper_body:
# Ancora entro il limite del corpo: aggiungi e continua.
current.append(sent)
current_len += len(sent) + (1 if len(current) > 1 else 0)
i += 1
current_len = new_len
elif current:
# La frase successiva sfora il limite: emetti il chunk corrente
# (che termina su frase completa) poi inizia il nuovo con questa frase.
_emit()
current.append(sent)
current_len += (1 if current[:-1] else 0) + len(sent)
else:
chunk, current, i, sub_index = _flush_chunk(
current, sentences, i, prefix, sezione, titolo, sub_index, max_chars
)
chunks.append(chunk)
overlap = current[-overlap_s:] if overlap_s and len(current) > overlap_s else []
current = overlap[:]
current_len = sum(len(s) + 1 for s in current)
# Chunk vuoto: la singola frase supera già il limite — emettiamo così com'è.
current.append(sent)
current_len = len(sent)
_emit()
if current:
chunk_text = prefix + " ".join(current)
@@ -194,6 +225,9 @@ def parse_h2_sections(text: str) -> list[dict]:
# ─── Strategie di chunking ────────────────────────────────────────────────────
def chunk_h3_aware(text: str, stem: str) -> list[dict]:
target, tolerance, overlap = _ov("h3_aware")
lower = int(target * (1 - tolerance))
sections = parse_h3_sections(text)
merged: list[dict] = []
@@ -205,7 +239,7 @@ def chunk_h3_aware(text: str, stem: str) -> list[dict]:
continue
if (pending["sezione"] == sec["sezione"]
and len(pending["body"]) < MIN_CHARS):
and len(pending["body"]) < lower):
sep_title = " / ".join(filter(None, [pending["titolo"], sec["titolo"]]))
pending = {
"sezione": pending["sezione"],
@@ -222,24 +256,25 @@ def chunk_h3_aware(text: str, stem: str) -> list[dict]:
chunks = []
for sec in merged:
sezione = sec["sezione"] or stem
titolo = sec["titolo"] or ""
body = sec["body"]
prefix = f"[{sezione} > {titolo}]\n" if titolo else f"[{sezione}]\n"
sub = make_sub_chunks(body, prefix, sezione, titolo, MAX_CHARS, OVERLAP_S)
chunks.extend(sub)
titolo = sec["titolo"] or ""
body = sec["body"]
prefix = f"[{sezione} > {titolo}]\n" if titolo else f"[{sezione}]\n"
chunks.extend(make_sub_chunks(body, prefix, sezione, titolo, target, tolerance, overlap))
return chunks
def chunk_h2_paragraph_split(text: str, stem: str) -> list[dict]:
target, tolerance, overlap = _ov("h2_paragraph_split")
lower = int(target * (1 - tolerance))
sections = parse_h2_sections(text)
chunks = []
for sec in sections:
sezione = sec["sezione"] or stem
body = sec["body"]
prefix = f"[{sezione}]\n"
body = sec["body"]
prefix = f"[{sezione}]\n"
paragraphs = [
p.strip()
@@ -250,7 +285,7 @@ def chunk_h2_paragraph_split(text: str, stem: str) -> list[dict]:
merged_pars: list[str] = []
pending = ""
for par in paragraphs:
if pending and len(pending) < MIN_CHARS:
if pending and len(pending) < lower:
pending = pending + "\n\n" + par
else:
if pending:
@@ -260,7 +295,7 @@ def chunk_h2_paragraph_split(text: str, stem: str) -> list[dict]:
merged_pars.append(pending)
for idx, par in enumerate(merged_pars):
sub = make_sub_chunks(par, prefix, sezione, f"par{idx}", MAX_CHARS, OVERLAP_S)
sub = make_sub_chunks(par, prefix, sezione, f"par{idx}", target, tolerance, overlap)
for c in sub:
c["chunk_id"] = f"{slugify(sezione)}__p{idx}__s{c['sub_index']}"
chunks.extend(sub)
@@ -269,6 +304,9 @@ def chunk_h2_paragraph_split(text: str, stem: str) -> list[dict]:
def chunk_paragraph(text: str, stem: str) -> list[dict]:
target, tolerance, overlap = _ov("paragraph")
lower = int(target * (1 - tolerance))
paragraphs = [
p.strip()
for p in re.split(r"\n{2,}", text)
@@ -279,7 +317,7 @@ def chunk_paragraph(text: str, stem: str) -> list[dict]:
merged: list[str] = []
pending = ""
for par in paragraphs:
if pending and len(pending) < MIN_CHARS:
if pending and len(pending) < lower:
pending = pending + "\n\n" + par
else:
if pending:
@@ -290,7 +328,7 @@ def chunk_paragraph(text: str, stem: str) -> list[dict]:
chunks = []
for idx, par in enumerate(merged):
sub = make_sub_chunks(par, prefix, stem, f"par{idx}", MAX_CHARS, OVERLAP_S)
sub = make_sub_chunks(par, prefix, stem, f"par{idx}", target, tolerance, overlap)
for c in sub:
c["chunk_id"] = f"para__{idx}__s{c['sub_index']}"
chunks.extend(sub)
@@ -299,6 +337,9 @@ def chunk_paragraph(text: str, stem: str) -> list[dict]:
def chunk_sliding_window(text: str, stem: str) -> list[dict]:
target, tolerance, overlap = _ov("sliding_window")
upper = int(target * (1 + tolerance))
sentences = split_sentences(text)
prefix = f"[Documento: {stem}]\n"
@@ -313,10 +354,11 @@ def chunk_sliding_window(text: str, stem: str) -> list[dict]:
j = i
while j < len(sentences):
s = sentences[j]
if window and cur_len + len(s) + 1 > MAX_CHARS:
sep = 1 if window else 0
if window and cur_len + sep + len(s) > upper:
break
window.append(s)
cur_len += len(s) + (1 if len(window) > 1 else 0)
cur_len += sep + len(s)
j += 1
if not window:
@@ -333,7 +375,7 @@ def chunk_sliding_window(text: str, stem: str) -> list[dict]:
"n_chars": len(chunk_text),
})
win_idx += 1
i += max(1, len(window) - OVERLAP_S)
i += max(1, len(window) - overlap)
return chunks
@@ -341,28 +383,28 @@ def chunk_sliding_window(text: str, stem: str) -> list[dict]:
# ─── Dispatcher ───────────────────────────────────────────────────────────────
# Dispatch table: strategy name (as read from the document profile) mapped to
# the chunking function that implements it. Each key is listed exactly once;
# duplicate keys in a dict literal are silently overwritten by the last one.
_STRATEGIES: dict[str, callable] = {
    "h3_aware": chunk_h3_aware,
    "h2_paragraph_split": chunk_h2_paragraph_split,
    "paragraph": chunk_paragraph,
    "sliding_window": chunk_sliding_window,
}
def chunk_document(clean_md: Path, profile: dict, stem: str) -> list[dict]:
    """Chunk the Markdown file ``clean_md`` with the strategy named in ``profile``.

    The file is read once as UTF-8. The strategy name is taken from
    ``profile["strategia_chunking"]`` (default ``"paragraph"``); a name that is
    not registered in ``_STRATEGIES`` falls back to ``chunk_paragraph``.

    Returns the list of chunk dicts produced by the selected strategy.
    """
    text = clean_md.read_text(encoding="utf-8")
    strategia = profile.get("strategia_chunking", "paragraph")
    fn = _STRATEGIES.get(strategia, chunk_paragraph)
    return fn(text, stem)
# ─── Per-document processing ──────────────────────────────────────────────────
def process_stem(stem: str, project_root: Path, force: bool) -> bool:
conv_dir = project_root / "conversione" / stem
out_dir = project_root / "chunks" / stem
clean_md = conv_dir / "clean.md"
conv_dir = project_root / "conversione" / stem
out_dir = project_root / "chunks" / stem
clean_md = conv_dir / "clean.md"
profile_path = conv_dir / "structure_profile.json"
out_file = out_dir / "chunks.json"
out_file = out_dir / "chunks.json"
print(f"\nDocumento: {stem}")
@@ -393,19 +435,25 @@ def process_stem(stem: str, project_root: Path, force: bool) -> bool:
json.dumps(chunks, ensure_ascii=False, indent=2), encoding="utf-8"
)
lengths = [c["n_chars"] for c in chunks]
min_c = min(lengths)
max_c = max(lengths)
avg_c = int(sum(lengths) / len(lengths))
short = sum(1 for l in lengths if l < MIN_CHARS)
long_ = sum(1 for l in lengths if l > MAX_CHARS * 1.5)
target, tolerance, _ = _ov(strategia)
lower = int(target * (1 - tolerance))
upper = int(target * (1 + tolerance))
lengths = [c["n_chars"] for c in chunks]
min_c = min(lengths)
max_c = max(lengths)
avg_c = int(sum(lengths) / len(lengths))
short = sum(1 for l in lengths if l < lower)
long_ = sum(1 for l in lengths if l > upper)
print(f" Target: {target} char ±{int(tolerance*100)}% "
f"→ range [{lower}, {upper}]")
print(f" Chunk totali: {len(chunks)}")
print(f" Min: {min_c} char Max: {max_c} char Media: {avg_c} char")
if short:
print(f" ⚠️ {short} chunk sotto MIN_CHARS ({MIN_CHARS})")
print(f" ⚠️ {short} chunk sotto lower ({lower})")
if long_:
print(f" ⚠️ {long_} chunk sopra MAX_CHARS×1.5 ({int(MAX_CHARS * 1.5)})")
print(f" ⚠️ {long_} chunk sopra upper ({upper})")
print(f" ✅ chunks.json salvato in chunks/{stem}/")
return True
+88
View File
@@ -0,0 +1,88 @@
#!/usr/bin/env python3
"""
Configuration parameters for the chunking pipeline.

Edit this file to change the behavior of chunker.py, verify_chunks.py and
fix_chunks.py without touching the application code.
"""
# ─── Target chunk size ────────────────────────────────────────────────────────
#
# TARGET_CHARS is the ideal size the chunker aims for.
# CHUNK_TOLERANCE is the relative tolerance (e.g. 0.25 = ±25%).
#
# acceptable range = [TARGET × (1 - TOL), TARGET × (1 + TOL)]
#
# With TARGET=600 and TOL=0.25 the acceptable range is 450–750 chars; the
# chunker is meant to get as close as possible to 600 while always ending
# a chunk on a sentence boundary.
#
TARGET_CHARS = 600
CHUNK_TOLERANCE = 0.25
# ─── Overlap ──────────────────────────────────────────────────────────────────
# Number of sentences repeated at the start of the next chunk to preserve
# context between adjacent chunks of the same section.
OVERLAP_SENTENCES = 1
# ─── Validation thresholds ────────────────────────────────────────────────────
# fix_chunks.py splits a "too_long" chunk only if it exceeds upper × this factor.
# E.g. upper=750, factor=1.5 → split only for chunks > 1125 chars.
# Chunks in [upper, upper×factor] remain as non-blocking warnings.
SPLIT_THRESHOLD_FACTOR = 1.5
MATH_SYMS_MIN = 3 # min. math symbols needed to downgrade incomplete → incomplete_math
# ─── Patterns and format ──────────────────────────────────────────────────────
SENTENCE_SPLIT_PATTERN = r"(?<=[.!?»])\s+"
PREFIX_TEMPLATE = "[{sezione} > {titolo}]"
# ─── Protection of special content ────────────────────────────────────────────
# If True, a block that is predominantly a Markdown table (≥50% of lines |…|)
# is emitted as an atomic chunk without sentence-splitting.
PROTECT_TABLES = True
# Reserved — unsplittable LaTeX blocks (future implementation).
PROTECT_MATH = True
# ─── Fix behavior ─────────────────────────────────────────────────────────────
# Maximum number of iterations of the fix → verify → fix loop.
# With 1 you get the original behavior (single fix without re-verification).
FIX_MAX_ITERATIONS = 3
# ─── Per-strategy overrides ───────────────────────────────────────────────────
#
# These override TARGET_CHARS / CHUNK_TOLERANCE / OVERLAP_SENTENCES
# for the specific strategy named in structure_profile.json.
# Recognized keys: "target_chars", "tolerance", "overlap".
#
STRATEGY_OVERRIDES: dict[str, dict] = {
"h3_aware": {
# Structured H2→H3 documents: medium chunks, moderate overlap.
"target_chars": 600,
"tolerance": 0.25,
"overlap": 2,
},
"h2_paragraph_split": {
# Flat documents (H2 only): larger chunks, reduced overlap.
"target_chars": 800,
"tolerance": 0.25,
"overlap": 1,
},
"paragraph": {
# Documents without meaningful headers: shorter chunks.
"target_chars": 500,
"tolerance": 0.30,
"overlap": 1,
},
"sliding_window": {
# Linear/narrative text: wide windows, generous overlap.
"target_chars": 800,
"tolerance": 0.25,
"overlap": 3,
},
}
+96 -32
View File
@@ -26,7 +26,13 @@ import re
import sys
from pathlib import Path
MAX_CHARS = 800
_HERE = Path(__file__).resolve().parent
if str(_HERE) not in sys.path:
sys.path.insert(0, str(_HERE))
import config as cfg
from verify_chunks import verify_stem as _verify_stem
MAX_CHARS = int(cfg.TARGET_CHARS * (1 + cfg.CHUNK_TOLERANCE))
PUNCT_END = re.compile(r"[.!?»)\]'\u2019\"\u201c\u201d\u2018\u2014\u2013-]$")
@@ -53,7 +59,11 @@ def _rebuild_text(chunk: dict, body: str) -> str:
return f"{_prefix(chunk)}\n{body}"
_SENT_END = re.compile(r'[.!?»)\]\'"’”…]')
def _split_at_boundary(text: str, max_chars: int) -> list[str]:
"""Spezza text in parti ≤ max_chars, ciascuna terminante su punteggiatura."""
if len(text) <= max_chars:
return [text]
@@ -62,20 +72,28 @@ def _split_at_boundary(text: str, max_chars: int) -> list[str]:
while len(remaining) > max_chars:
candidate = remaining[:max_chars]
split_pos = candidate.rfind("\n\n")
if split_pos == -1:
m = None
for m in re.finditer(r"[.!?»]\s+", candidate):
pass
split_pos = m.end() if m else None
# Cerca l'ultima punteggiatura finale entro max_chars.
last_punct = -1
for m in _SENT_END.finditer(candidate):
last_punct = m.end() # posizione dopo il carattere di punteggiatura
if split_pos is None or split_pos == 0:
sp = remaining.find(" ", max_chars)
split_pos = sp if sp != -1 else len(remaining)
if last_punct > 0:
# Taglia dopo la punteggiatura; il resto inizia alla parola successiva.
first = remaining[:last_punct].rstrip()
remaining = remaining[last_punct:].lstrip()
else:
# Nessuna punteggiatura: taglia all'ultimo spazio disponibile.
sp = candidate.rfind(" ")
if sp > 0:
first = remaining[:sp].rstrip()
remaining = remaining[sp:].lstrip()
else:
first = remaining[:max_chars]
remaining = remaining[max_chars:]
parts.append(remaining[:split_pos].rstrip())
remaining = remaining[split_pos:].lstrip()
if first:
parts.append(first)
if remaining:
parts.append(remaining)
@@ -202,7 +220,15 @@ def fix_stem(stem: str, project_root: Path, max_chars: int, dry_run: bool) -> bo
no_prefix_ids = {e["chunk_id"] for e in report.get("blockers", {}).get("no_prefix", [])}
incomplete_ids = {e["chunk_id"] for e in report.get("blockers", {}).get("incomplete", [])}
too_short_ids = {e["chunk_id"] for e in report.get("warnings", {}).get("too_short", [])}
too_long_ids = {e["chunk_id"] for e in report.get("warnings", {}).get("too_long", [])}
# Spezza solo chunk che superano upper × SPLIT_THRESHOLD_FACTOR,
# non quelli appena oltre upper (che causerebbero split con chunk incompleti).
_split_limit = max_chars * cfg.SPLIT_THRESHOLD_FACTOR
too_long_ids = {
e["chunk_id"]
for e in report.get("warnings", {}).get("too_long", [])
if e.get("n_chars", 0) > _split_limit
}
ops: list[str] = []
if empty_ids:
@@ -230,24 +256,54 @@ def fix_stem(stem: str, project_root: Path, max_chars: int, dry_run: bool) -> bo
n_before = len(chunks)
if empty_ids:
chunks, n = fix_empty(chunks, empty_ids)
print(f"\n 🗑 Rimossi {n} chunk vuoti.")
def _apply_fixes(chunks: list[dict], report: dict) -> list[dict]:
empty_ids_ = {e["chunk_id"] for e in report.get("blockers", {}).get("empty", [])}
no_prefix_ids_ = {e["chunk_id"] for e in report.get("blockers", {}).get("no_prefix", [])}
incomplete_ids_= {e["chunk_id"] for e in report.get("blockers", {}).get("incomplete", [])}
too_short_ids_ = {e["chunk_id"] for e in report.get("warnings", {}).get("too_short", [])}
too_long_ids_ = {
e["chunk_id"]
for e in report.get("warnings", {}).get("too_long", [])
if e.get("n_chars", 0) > max_chars * cfg.SPLIT_THRESHOLD_FACTOR
}
if no_prefix_ids:
chunks, n = fix_no_prefix(chunks, no_prefix_ids)
print(f" 🔧 Aggiunto prefisso a {n} chunk.")
if empty_ids_:
chunks, n = fix_empty(chunks, empty_ids_)
print(f" 🗑 Rimossi {n} chunk vuoti.")
if no_prefix_ids_:
chunks, n = fix_no_prefix(chunks, no_prefix_ids_)
print(f" 🔧 Aggiunto prefisso a {n} chunk.")
merge_ids_ = incomplete_ids_ | too_short_ids_
if merge_ids_:
chunks, n = fix_incomplete_and_short(chunks, merge_ids_)
print(f" 🔗 Fusi {n} chunk (incompleti + corti).")
if too_long_ids_:
chunks, n = fix_too_long(chunks, too_long_ids_, max_chars)
print(f" ✂️ Spezzati {n} chunk lunghi.")
return renumber_ids(chunks)
merge_ids = incomplete_ids | too_short_ids
if merge_ids:
chunks, n = fix_incomplete_and_short(chunks, merge_ids)
print(f" 🔗 Fusi {n} chunk (incompleti + corti).")
chunks = _apply_fixes(chunks, report)
if too_long_ids:
chunks, n = fix_too_long(chunks, too_long_ids, max_chars)
print(f" ✂️ Spezzati {n} chunk lunghi.")
chunks = renumber_ids(chunks)
for iteration in range(1, cfg.FIX_MAX_ITERATIONS):
chunks_path.write_text(
json.dumps(chunks, ensure_ascii=False, indent=2), encoding="utf-8"
)
project_root = chunks_path.parent.parent.parent
_min = int(cfg.TARGET_CHARS * (1 - cfg.CHUNK_TOLERANCE))
_max = int(cfg.TARGET_CHARS * (1 + cfg.CHUNK_TOLERANCE))
_verify_stem(stem, project_root, _min, _max)
report = json.loads(report_path.read_text(encoding="utf-8"))
new_verdict = report.get("verdict", "ok")
if new_verdict in ("ok", "warnings_only"):
break
remaining_blockers = sum(
len(v) for v in report.get("blockers", {}).values()
)
if remaining_blockers == 0:
break
print(f"\n Iterazione {iteration + 1}/{cfg.FIX_MAX_ITERATIONS} "
f"{remaining_blockers} bloccer residui:")
chunks = _apply_fixes(chunks, report)
n_after = len(chunks)
print(f"\n Totale chunk: {n_before}{n_after}")
@@ -256,8 +312,15 @@ def fix_stem(stem: str, project_root: Path, max_chars: int, dry_run: bool) -> bo
json.dumps(chunks, ensure_ascii=False, indent=2), encoding="utf-8"
)
print(f" ✅ Salvato: chunks/{stem}/chunks.json")
print(f"\n Riesegui la verifica:")
print(f" python chunks/verify_chunks.py --stem {stem}")
final_verdict = report.get("verdict", "?")
if final_verdict == "ok":
print(f" ✅ Verdict finale: ok — procedi alla vettorizzazione.")
elif final_verdict == "warnings_only":
print(f" 🟡 Verdict finale: warnings_only — puoi procedere.")
else:
print(f" 🔴 Verdict finale: {final_verdict} — rilancia la verifica manualmente:")
print(f" python chunks/verify_chunks.py --stem {stem}")
return True
@@ -269,9 +332,10 @@ if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Fix chunk")
parser.add_argument("--stem", required=True, help="Nome del documento (sottocartella di chunks/)")
_max_def = int(cfg.TARGET_CHARS * (1 + cfg.CHUNK_TOLERANCE))
parser.add_argument(
"--max", type=int, default=MAX_CHARS,
help=f"Soglia massima caratteri per lo split (default: {MAX_CHARS})"
"--max", type=int, default=_max_def,
help=f"Soglia massima caratteri per lo split (default: TARGET×(1+TOL) = {_max_def})"
)
parser.add_argument(
"--dry-run", action="store_true",
+24 -12
View File
@@ -20,11 +20,16 @@ import re
import sys
from pathlib import Path
_HERE = Path(__file__).resolve().parent
if str(_HERE) not in sys.path:
sys.path.insert(0, str(_HERE))
import config as cfg
# ─── Soglie ───────────────────────────────────────────────────────────────────
MIN_CHARS = 200
MAX_CHARS = 800
# ─── Soglie (derivate dal target, sovrascrivibili da CLI) ────────────────────
MIN_CHARS = int(cfg.TARGET_CHARS * (1 - cfg.CHUNK_TOLERANCE))
MAX_CHARS = int(cfg.TARGET_CHARS * (1 + cfg.CHUNK_TOLERANCE))
PUNCT_END = re.compile(
r"[.!?»)\]'\u2019\"\u201c\u201d\u2018\u2014\u2013\u2026]$"
r"|/$" # URL che finisce con /
@@ -51,7 +56,7 @@ def is_too_short(chunk: dict, min_chars: int) -> bool:
def is_too_long(chunk: dict, max_chars: int) -> bool:
    """Return True when the chunk's recorded ``n_chars`` exceeds ``max_chars``.

    The comparison is against ``max_chars`` itself (no ×1.5 slack), matching
    the "above MAX" wording used in the verification report. A chunk without
    an ``n_chars`` key is treated as having length 0.
    """
    return chunk.get("n_chars", 0) > max_chars
def ends_incomplete(chunk: dict) -> bool:
@@ -72,7 +77,7 @@ def ends_incomplete(chunk: dict) -> bool:
def is_math_incomplete(chunk: dict) -> bool:
    """Incomplete but in a math context — downgraded to a warning instead of a blocker.

    True when ``ends_incomplete(chunk)`` holds and the chunk's ``text``
    contains at least ``cfg.MATH_SYMS_MIN`` matches of ``_MATH_SYMS``.
    """
    if not ends_incomplete(chunk):
        return False
    n_math = len(_MATH_SYMS.findall(chunk.get("text", "")))
    # Threshold comes from the shared config rather than a hard-coded 3.
    return n_math >= cfg.MATH_SYMS_MIN
# ─── Report ───────────────────────────────────────────────────────────────────
@@ -170,12 +175,12 @@ def verify_stem(stem: str, project_root: Path, min_chars: int, max_chars: int) -
if too_long:
has_errors = True
print(f"\n 🟡 {len(too_long)} chunk SOPRA MAX_CHARS×1.5 ({int(max_chars * 1.5)}):")
print(f"\n 🟡 {len(too_long)} chunk SOPRA MAX ({max_chars}):")
for c in too_long[:5]:
print(_fmt_chunk(c))
if len(too_long) > 5:
print(f" ... e altri {len(too_long) - 5}")
print(f"Soluzione: alza MAX_CHARS o verifica il testo nel MD")
print(f"Causa probabile: frasi singole lunghe (liste/paragrafi non suddivisibili)")
if incomplete:
has_errors = True
@@ -225,7 +230,12 @@ def verify_stem(stem: str, project_root: Path, min_chars: int, max_chars: int) -
"max_chars": max_l,
"avg_chars": avg_l,
},
"thresholds": {"min_chars": min_chars, "max_chars": max_chars},
"thresholds": {
"min_chars": min_chars,
"max_chars": max_chars,
"target_chars": cfg.TARGET_CHARS,
"chunk_tolerance": cfg.CHUNK_TOLERANCE,
},
"blockers": {
"empty": [_chunk_entry(c) for c in empty_chunks],
"no_prefix": [_chunk_entry(c) for c in no_prefix],
@@ -301,13 +311,15 @@ if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Verifica chunk")
parser.add_argument("--stem", help="Nome del documento (sottocartella di chunks/)")
_min_def = int(cfg.TARGET_CHARS * (1 - cfg.CHUNK_TOLERANCE))
_max_def = int(cfg.TARGET_CHARS * (1 + cfg.CHUNK_TOLERANCE))
parser.add_argument(
"--min", type=int, default=MIN_CHARS,
help=f"Soglia minima caratteri (default: {MIN_CHARS})"
"--min", type=int, default=_min_def,
help=f"Soglia minima caratteri (default: TARGET×(1-TOL) = {_min_def})"
)
parser.add_argument(
"--max", type=int, default=MAX_CHARS,
help=f"Soglia massima caratteri (default: {MAX_CHARS})"
"--max", type=int, default=_max_def,
help=f"Soglia massima caratteri (default: TARGET×(1+TOL) = {_max_def})"
)
args = parser.parse_args()