diff --git a/chunks/chunker.py b/chunks/chunker.py index 188d95a..2a07f57 100644 --- a/chunks/chunker.py +++ b/chunks/chunker.py @@ -20,12 +20,10 @@ import re import sys from pathlib import Path - -# ─── Parametri ──────────────────────────────────────────────────────────────── - -MIN_CHARS = 200 # sotto questa soglia → accorpa al chunk successivo -MAX_CHARS = 800 # sopra questa soglia → spezza su frasi -OVERLAP_S = 2 # frasi di overlap tra sotto-chunk dello stesso boundary +_HERE = Path(__file__).resolve().parent +if str(_HERE) not in sys.path: + sys.path.insert(0, str(_HERE)) +import config as cfg # ─── Utilità ────────────────────────────────────────────────────────────────── @@ -44,73 +42,106 @@ def slugify(s: str, max_len: int = 60) -> str: return s[:max_len] if s else "section" -_SENT_BOUNDARY = re.compile(r"[.!?»)\]'\u2019\"\u201c\u201d/:|\u2026]$") +def _is_table_block(text: str) -> bool: + """True se il testo è prevalentemente una tabella Markdown (≥50% righe con |).""" + lines = [l for l in text.strip().splitlines() if l.strip()] + if not lines: + return False + table_lines = sum(1 for l in lines if l.strip().startswith("|")) + return table_lines / len(lines) >= 0.5 -def _flush_chunk( - current: list[str], - sentences: list[str], - i: int, - prefix: str, - sezione: str, - titolo: str, - sub_index: int, - max_chars: int, -) -> tuple[dict, list[str], int, int]: - """Emette un chunk, estendendo fino a un confine di frase (max +20%).""" - hard_limit = int(max_chars * 1.2) - current_len = sum(len(s) + 1 for s in current) - while i < len(sentences) and not _SENT_BOUNDARY.search(" ".join(current)): - nxt = sentences[i] - if current_len + len(nxt) + 1 > hard_limit: - break - current.append(nxt) - current_len += len(nxt) + 1 - i += 1 - chunk_text = prefix + " ".join(current) - chunk = { - "chunk_id": f"{slugify(sezione)}__{slugify(titolo)}__s{sub_index}", - "text": chunk_text, - "sezione": sezione, - "titolo": titolo, - "sub_index": sub_index, - "n_chars": len(chunk_text), - } - return chunk, current, i, sub_index + 1 +def _ov(strategy: str) -> tuple[int, float, int]: + """Legge (target_chars, tolerance, overlap) dagli override di strategia.""" + ov = cfg.STRATEGY_OVERRIDES.get(strategy, {}) + target = ov.get("target_chars", cfg.TARGET_CHARS) + tolerance = ov.get("tolerance", cfg.CHUNK_TOLERANCE) + overlap = ov.get("overlap", cfg.OVERLAP_SENTENCES) + return target, tolerance, overlap +# ─── Core: split in sotto-chunk orientato al target ─────────────────────────── + def make_sub_chunks( body: str, prefix: str, sezione: str, titolo: str, - max_chars: int, + target: int, + tolerance: float, overlap_s: int, ) -> list[dict]: + """Divide body in chunk il più vicini possibile a `target` char. + + Logica: + lower = target × (1 − tolerance) → soglia minima per emettere + upper = target × (1 + tolerance) → limite massimo + + Si accumulano frasi intere finché la successiva farebbe superare `upper`. + A quel punto si emette (siamo vicini al target) e si riparte con overlap. + Ogni chunk termina sempre su un confine di frase; non attraversa mai + il boundary dell'header corrente. + """ + if cfg.PROTECT_TABLES and _is_table_block(body): + chunk_text = prefix + body + return [{ + "chunk_id": f"{slugify(sezione)}__{slugify(titolo)}__s0", + "text": chunk_text, + "sezione": sezione, + "titolo": titolo, + "sub_index": 0, + "n_chars": len(chunk_text), + }] + + # Soglia calcolata sul corpo (n_chars finale = prefix_len + body_len). + prefix_len = len(prefix) + upper_body = max(1, int(target * (1 + tolerance)) - prefix_len) + sentences = split_sentences(body) if not sentences: return [] - chunks = [] + chunks: list[dict] = [] current: list[str] = [] current_len = 0 sub_index = 0 - i = 0 - while i < len(sentences): - sent = sentences[i] - if not current or current_len + len(sent) + 1 <= max_chars: + def _emit() -> None: + nonlocal current, current_len, sub_index + chunk_text = prefix + " ".join(current) + chunks.append({ + "chunk_id": f"{slugify(sezione)}__{slugify(titolo)}__s{sub_index}", + "text": chunk_text, + "sezione": sezione, + "titolo": titolo, + "sub_index": sub_index, + "n_chars": len(chunk_text), + }) + overlap = current[-overlap_s:] if overlap_s and len(current) > overlap_s else [] + current = overlap[:] + # Lunghezza corretta dell'overlap (n-1 spazi tra n frasi). + current_len = sum(len(s) for s in current) + max(0, len(current) - 1) + sub_index += 1 + + for sent in sentences: + sep = 1 if current else 0 + new_len = current_len + sep + len(sent) + + if new_len <= upper_body: + # Ancora entro il limite del corpo: aggiungi e continua. current.append(sent) - current_len += len(sent) + (1 if len(current) > 1 else 0) - i += 1 + current_len = new_len + elif current: + # La frase successiva sfora il limite: emetti il chunk corrente + # (che termina su frase completa) poi inizia il nuovo con questa frase. + _emit() + current.append(sent) + current_len += (1 if current[:-1] else 0) + len(sent) else: - chunk, current, i, sub_index = _flush_chunk( - current, sentences, i, prefix, sezione, titolo, sub_index, max_chars - ) - chunks.append(chunk) - overlap = current[-overlap_s:] if overlap_s and len(current) > overlap_s else [] - current = overlap[:] - current_len = sum(len(s) + 1 for s in current) + # Chunk vuoto: la singola frase supera già il limite — emettiamo così com'è. + current.append(sent) + current_len = len(sent) + _emit() if current: chunk_text = prefix + " ".join(current) @@ -194,6 +225,9 @@ def parse_h2_sections(text: str) -> list[dict]: # ─── Strategie di chunking ──────────────────────────────────────────────────── def chunk_h3_aware(text: str, stem: str) -> list[dict]: + target, tolerance, overlap = _ov("h3_aware") + lower = int(target * (1 - tolerance)) + sections = parse_h3_sections(text) merged: list[dict] = [] @@ -205,7 +239,7 @@ def chunk_h3_aware(text: str, stem: str) -> list[dict]: continue if (pending["sezione"] == sec["sezione"] - and len(pending["body"]) < MIN_CHARS): + and len(pending["body"]) < lower): sep_title = " / ".join(filter(None, [pending["titolo"], sec["titolo"]])) pending = { "sezione": pending["sezione"], @@ -222,24 +256,25 @@ def chunk_h3_aware(text: str, stem: str) -> list[dict]: chunks = [] for sec in merged: sezione = sec["sezione"] or stem - titolo = sec["titolo"] or "" - body = sec["body"] - - prefix = f"[{sezione} > {titolo}]\n" if titolo else f"[{sezione}]\n" - sub = make_sub_chunks(body, prefix, sezione, titolo, MAX_CHARS, OVERLAP_S) - chunks.extend(sub) + titolo = sec["titolo"] or "" + body = sec["body"] + prefix = f"[{sezione} > {titolo}]\n" if titolo else f"[{sezione}]\n" + chunks.extend(make_sub_chunks(body, prefix, sezione, titolo, target, tolerance, overlap)) return chunks def chunk_h2_paragraph_split(text: str, stem: str) -> list[dict]: + target, tolerance, overlap = _ov("h2_paragraph_split") + lower = int(target * (1 - tolerance)) + sections = parse_h2_sections(text) chunks = [] for sec in sections: sezione = sec["sezione"] or stem - body = sec["body"] - prefix = f"[{sezione}]\n" + body = sec["body"] + prefix = f"[{sezione}]\n" paragraphs = [ p.strip() @@ -250,7 +285,7 @@ def chunk_h2_paragraph_split(text: str, stem: str) -> list[dict]: merged_pars: list[str] = [] pending = "" for par in paragraphs: - if pending and len(pending) < MIN_CHARS: + if pending and len(pending) < lower: pending = pending + "\n\n" + par else: if pending: @@ -260,7 +295,7 @@ def chunk_h2_paragraph_split(text: str, stem: str) -> list[dict]: merged_pars.append(pending) for idx, par in enumerate(merged_pars): - sub = make_sub_chunks(par, prefix, sezione, f"par{idx}", MAX_CHARS, OVERLAP_S) + sub = make_sub_chunks(par, prefix, sezione, f"par{idx}", target, tolerance, overlap) for c in sub: c["chunk_id"] = f"{slugify(sezione)}__p{idx}__s{c['sub_index']}" chunks.extend(sub) @@ -269,6 +304,9 @@ def chunk_h2_paragraph_split(text: str, stem: str) -> list[dict]: def chunk_paragraph(text: str, stem: str) -> list[dict]: + target, tolerance, overlap = _ov("paragraph") + lower = int(target * (1 - tolerance)) + paragraphs = [ p.strip() for p in re.split(r"\n{2,}", text) @@ -279,7 +317,7 @@ def chunk_paragraph(text: str, stem: str) -> list[dict]: merged: list[str] = [] pending = "" for par in paragraphs: - if pending and len(pending) < MIN_CHARS: + if pending and len(pending) < lower: pending = pending + "\n\n" + par else: if pending: @@ -290,7 +328,7 @@ def chunk_paragraph(text: str, stem: str) -> list[dict]: chunks = [] for idx, par in enumerate(merged): - sub = make_sub_chunks(par, prefix, stem, f"par{idx}", MAX_CHARS, OVERLAP_S) + sub = make_sub_chunks(par, prefix, stem, f"par{idx}", target, tolerance, overlap) for c in sub: c["chunk_id"] = f"para__{idx}__s{c['sub_index']}" chunks.extend(sub) @@ -299,6 +337,9 @@ def chunk_paragraph(text: str, stem: str) -> list[dict]: def chunk_sliding_window(text: str, stem: str) -> list[dict]: + target, tolerance, overlap = _ov("sliding_window") + upper = int(target * (1 + tolerance)) + sentences = split_sentences(text) prefix = f"[Documento: {stem}]\n" @@ -313,10 +354,11 @@ def chunk_sliding_window(text: str, stem: str) -> list[dict]: j = i while j < len(sentences): s = sentences[j] - if window and cur_len + len(s) + 1 > MAX_CHARS: + sep = 1 if window else 0 + if window and cur_len + sep + len(s) > upper: break window.append(s) - cur_len += len(s) + (1 if len(window) > 1 else 0) + cur_len += sep + len(s) j += 1 if not window: @@ -333,7 +375,7 @@ def chunk_sliding_window(text: str, stem: str) -> list[dict]: "n_chars": len(chunk_text), }) win_idx += 1 - i += max(1, len(window) - OVERLAP_S) + i += max(1, len(window) - overlap) return chunks @@ -341,28 +383,28 @@ def chunk_sliding_window(text: str, stem: str) -> list[dict]: # ─── Dispatcher ─────────────────────────────────────────────────────────────── _STRATEGIES: dict[str, callable] = { - "h3_aware": chunk_h3_aware, - "h2_paragraph_split": chunk_h2_paragraph_split, - "paragraph": chunk_paragraph, - "sliding_window": chunk_sliding_window, + "h3_aware": chunk_h3_aware, + "h2_paragraph_split": chunk_h2_paragraph_split, + "paragraph": chunk_paragraph, + "sliding_window": chunk_sliding_window, } def chunk_document(clean_md: Path, profile: dict, stem: str) -> list[dict]: - text = clean_md.read_text(encoding="utf-8") + text = clean_md.read_text(encoding="utf-8") strategia = profile.get("strategia_chunking", "paragraph") - fn = _STRATEGIES.get(strategia, chunk_paragraph) + fn = _STRATEGIES.get(strategia, chunk_paragraph) return fn(text, stem) # ─── Per-document processing ────────────────────────────────────────────────── def process_stem(stem: str, project_root: Path, force: bool) -> bool: - conv_dir = project_root / "conversione" / stem - out_dir = project_root / "chunks" / stem - clean_md = conv_dir / "clean.md" + conv_dir = project_root / "conversione" / stem + out_dir = project_root / "chunks" / stem + clean_md = conv_dir / "clean.md" profile_path = conv_dir / "structure_profile.json" - out_file = out_dir / "chunks.json" + out_file = out_dir / "chunks.json" print(f"\nDocumento: {stem}") @@ -393,19 +435,25 @@ def process_stem(stem: str, project_root: Path, force: bool) -> bool: json.dumps(chunks, ensure_ascii=False, indent=2), encoding="utf-8" ) - lengths = [c["n_chars"] for c in chunks] - min_c = min(lengths) - max_c = max(lengths) - avg_c = int(sum(lengths) / len(lengths)) - short = sum(1 for l in lengths if l < MIN_CHARS) - long_ = sum(1 for l in lengths if l > MAX_CHARS * 1.5) + target, tolerance, _ = _ov(strategia) + lower = int(target * (1 - tolerance)) + upper = int(target * (1 + tolerance)) + lengths = [c["n_chars"] for c in chunks] + min_c = min(lengths) + max_c = max(lengths) + avg_c = int(sum(lengths) / len(lengths)) + short = sum(1 for l in lengths if l < lower) + long_ = sum(1 for l in lengths if l > upper) + + print(f" Target: {target} char ±{int(tolerance*100)}% " + f"→ range [{lower}, {upper}]") print(f" Chunk totali: {len(chunks)}") print(f" Min: {min_c} char Max: {max_c} char Media: {avg_c} char") if short: - print(f" ⚠️ {short} chunk sotto MIN_CHARS ({MIN_CHARS})") + print(f" ⚠️ {short} chunk sotto lower ({lower})") if long_: - print(f" ⚠️ {long_} chunk sopra MAX_CHARS×1.5 ({int(MAX_CHARS * 1.5)})") + print(f" ⚠️ {long_} chunk sopra upper ({upper})") print(f" ✅ chunks.json salvato in chunks/{stem}/") return True diff --git a/chunks/config.py b/chunks/config.py new file mode 100644 index 0000000..0eb8e3d --- /dev/null +++ b/chunks/config.py @@ -0,0 +1,88 @@ +#!/usr/bin/env python3 +""" +Parametri di configurazione della pipeline di chunking. + +Modifica questo file per cambiare il comportamento di chunker.py, +verify_chunks.py e fix_chunks.py senza toccare il codice applicativo. +""" + +# ─── Grandezza target dei chunk ─────────────────────────────────────────────── +# +# TARGET_CHARS è la dimensione ideale a cui il chunker mira. +# CHUNK_TOLERANCE è la tolleranza relativa (es. 0.25 = ±25%). +# +# range accettabile = [TARGET × (1 − TOL), TARGET × (1 + TOL)] +# +# Con TARGET=600 e TOL=0.25 → ogni chunk sarà tra 450 e 750 char, +# il più vicino possibile a 600, terminando sempre su un confine di frase. +# +TARGET_CHARS = 600 +CHUNK_TOLERANCE = 0.25 + +# ─── Overlap ────────────────────────────────────────────────────────────────── + +# Numero di frasi ripetute all'inizio del chunk successivo per preservare +# il contesto tra chunk adiacenti della stessa sezione. +OVERLAP_SENTENCES = 1 + +# ─── Soglie di validazione ──────────────────────────────────────────────────── + +# fix_chunks.py spezza un chunk "too_long" solo se supera upper × questo fattore. +# Es. upper=750, fattore=1.5 → split solo per chunk > 1125 char. +# Chunk in [upper, upper×fattore] restano come warning non bloccanti. +SPLIT_THRESHOLD_FACTOR = 1.5 + +MATH_SYMS_MIN = 3 # min. simboli math per declassare incomplete → incomplete_math + +# ─── Pattern e formato ──────────────────────────────────────────────────────── + +SENTENCE_SPLIT_PATTERN = r"(?<=[.!?»])\s+" +PREFIX_TEMPLATE = "[{sezione} > {titolo}]" + +# ─── Protezione contenuti speciali ──────────────────────────────────────────── + +# Se True, un blocco prevalentemente tabella Markdown (≥50% righe |…|) +# viene emesso come chunk atomico senza sentence-splitting. +PROTECT_TABLES = True + +# Riservato — blocchi LaTeX non spezzabili (implementazione futura). +PROTECT_MATH = True + +# ─── Fix behavior ───────────────────────────────────────────────────────────── + +# Numero massimo di iterazioni del loop fix → verify → fix. +# Con 1 si ottiene il comportamento originale (fix singolo senza re-verifica). +FIX_MAX_ITERATIONS = 3 + +# ─── Override per strategia ─────────────────────────────────────────────────── +# +# Sovrascrivono TARGET_CHARS / CHUNK_TOLERANCE / OVERLAP_SENTENCES +# per la specifica strategia indicata in structure_profile.json. +# Chiavi riconosciute: "target_chars", "tolerance", "overlap". +# +STRATEGY_OVERRIDES: dict[str, dict] = { + "h3_aware": { + # Documenti strutturati H2→H3: chunk medi, overlap moderato. + "target_chars": 600, + "tolerance": 0.25, + "overlap": 2, + }, + "h2_paragraph_split": { + # Documenti piatti (solo H2): chunk più ampi, overlap ridotto. + "target_chars": 800, + "tolerance": 0.25, + "overlap": 1, + }, + "paragraph": { + # Documenti senza header significativi: chunk più corti. + "target_chars": 500, + "tolerance": 0.30, + "overlap": 1, + }, + "sliding_window": { + # Testo lineare/narrativo: finestre ampie, overlap generoso. + "target_chars": 800, + "tolerance": 0.25, + "overlap": 3, + }, +} diff --git a/chunks/fix_chunks.py b/chunks/fix_chunks.py index e817e51..794dc2b 100644 --- a/chunks/fix_chunks.py +++ b/chunks/fix_chunks.py @@ -26,7 +26,13 @@ import re import sys from pathlib import Path -MAX_CHARS = 800 +_HERE = Path(__file__).resolve().parent +if str(_HERE) not in sys.path: + sys.path.insert(0, str(_HERE)) +import config as cfg +from verify_chunks import verify_stem as _verify_stem + +MAX_CHARS = int(cfg.TARGET_CHARS * (1 + cfg.CHUNK_TOLERANCE)) PUNCT_END = re.compile(r"[.!?»)\]'\u2019\"\u201c\u201d\u2018\u2014\u2013-]$") @@ -53,7 +59,11 @@ def _rebuild_text(chunk: dict, body: str) -> str: return f"{_prefix(chunk)}\n{body}" +_SENT_END = re.compile(r'[.!?»)\]\'"’”…]') + + def _split_at_boundary(text: str, max_chars: int) -> list[str]: + """Spezza text in parti ≤ max_chars, ciascuna terminante su punteggiatura.""" if len(text) <= max_chars: return [text] @@ -62,20 +72,28 @@ def _split_at_boundary(text: str, max_chars: int) -> list[str]: while len(remaining) > max_chars: candidate = remaining[:max_chars] - split_pos = candidate.rfind("\n\n") - if split_pos == -1: - m = None - for m in re.finditer(r"[.!?»]\s+", candidate): - pass - split_pos = m.end() if m else None + # Cerca l'ultima punteggiatura finale entro max_chars. + last_punct = -1 + for m in _SENT_END.finditer(candidate): + last_punct = m.end() # posizione dopo il carattere di punteggiatura - if split_pos is None or split_pos == 0: - sp = remaining.find(" ", max_chars) - split_pos = sp if sp != -1 else len(remaining) + if last_punct > 0: + # Taglia dopo la punteggiatura; il resto inizia alla parola successiva. + first = remaining[:last_punct].rstrip() + remaining = remaining[last_punct:].lstrip() + else: + # Nessuna punteggiatura: taglia all'ultimo spazio disponibile. + sp = candidate.rfind(" ") + if sp > 0: + first = remaining[:sp].rstrip() + remaining = remaining[sp:].lstrip() + else: + first = remaining[:max_chars] + remaining = remaining[max_chars:] - parts.append(remaining[:split_pos].rstrip()) - remaining = remaining[split_pos:].lstrip() + if first: + parts.append(first) if remaining: parts.append(remaining) @@ -202,7 +220,15 @@ def fix_stem(stem: str, project_root: Path, max_chars: int, dry_run: bool) -> bo no_prefix_ids = {e["chunk_id"] for e in report.get("blockers", {}).get("no_prefix", [])} incomplete_ids = {e["chunk_id"] for e in report.get("blockers", {}).get("incomplete", [])} too_short_ids = {e["chunk_id"] for e in report.get("warnings", {}).get("too_short", [])} - too_long_ids = {e["chunk_id"] for e in report.get("warnings", {}).get("too_long", [])} + + # Spezza solo chunk che superano upper × SPLIT_THRESHOLD_FACTOR, + # non quelli appena oltre upper (che causerebbero split con chunk incompleti). + _split_limit = max_chars * cfg.SPLIT_THRESHOLD_FACTOR + too_long_ids = { + e["chunk_id"] + for e in report.get("warnings", {}).get("too_long", []) + if e.get("n_chars", 0) > _split_limit + } ops: list[str] = [] if empty_ids: @@ -230,24 +256,54 @@ def fix_stem(stem: str, project_root: Path, max_chars: int, dry_run: bool) -> bo n_before = len(chunks) - if empty_ids: - chunks, n = fix_empty(chunks, empty_ids) - print(f"\n 🗑 Rimossi {n} chunk vuoti.") + def _apply_fixes(chunks: list[dict], report: dict) -> list[dict]: + empty_ids_ = {e["chunk_id"] for e in report.get("blockers", {}).get("empty", [])} + no_prefix_ids_ = {e["chunk_id"] for e in report.get("blockers", {}).get("no_prefix", [])} + incomplete_ids_= {e["chunk_id"] for e in report.get("blockers", {}).get("incomplete", [])} + too_short_ids_ = {e["chunk_id"] for e in report.get("warnings", {}).get("too_short", [])} + too_long_ids_ = { + e["chunk_id"] + for e in report.get("warnings", {}).get("too_long", []) + if e.get("n_chars", 0) > max_chars * cfg.SPLIT_THRESHOLD_FACTOR + } - if no_prefix_ids: - chunks, n = fix_no_prefix(chunks, no_prefix_ids) - print(f" 🔧 Aggiunto prefisso a {n} chunk.") + if empty_ids_: + chunks, n = fix_empty(chunks, empty_ids_) + print(f" 🗑 Rimossi {n} chunk vuoti.") + if no_prefix_ids_: + chunks, n = fix_no_prefix(chunks, no_prefix_ids_) + print(f" 🔧 Aggiunto prefisso a {n} chunk.") + merge_ids_ = incomplete_ids_ | too_short_ids_ + if merge_ids_: + chunks, n = fix_incomplete_and_short(chunks, merge_ids_) + print(f" 🔗 Fusi {n} chunk (incompleti + corti).") + if too_long_ids_: + chunks, n = fix_too_long(chunks, too_long_ids_, max_chars) + print(f" ✂️ Spezzati {n} chunk lunghi.") + return renumber_ids(chunks) - merge_ids = incomplete_ids | too_short_ids - if merge_ids: - chunks, n = fix_incomplete_and_short(chunks, merge_ids) - print(f" 🔗 Fusi {n} chunk (incompleti + corti).") + chunks = _apply_fixes(chunks, report) - if too_long_ids: - chunks, n = fix_too_long(chunks, too_long_ids, max_chars) - print(f" ✂️ Spezzati {n} chunk lunghi.") - - chunks = renumber_ids(chunks) + for iteration in range(1, cfg.FIX_MAX_ITERATIONS): + chunks_path.write_text( + json.dumps(chunks, ensure_ascii=False, indent=2), encoding="utf-8" + ) + project_root = chunks_path.parent.parent.parent + _min = int(cfg.TARGET_CHARS * (1 - cfg.CHUNK_TOLERANCE)) + _max = int(cfg.TARGET_CHARS * (1 + cfg.CHUNK_TOLERANCE)) + _verify_stem(stem, project_root, _min, _max) + report = json.loads(report_path.read_text(encoding="utf-8")) + new_verdict = report.get("verdict", "ok") + if new_verdict in ("ok", "warnings_only"): + break + remaining_blockers = sum( + len(v) for v in report.get("blockers", {}).values() + ) + if remaining_blockers == 0: + break + print(f"\n Iterazione {iteration + 1}/{cfg.FIX_MAX_ITERATIONS} " + f"— {remaining_blockers} bloccer residui:") + chunks = _apply_fixes(chunks, report) n_after = len(chunks) print(f"\n Totale chunk: {n_before} → {n_after}") @@ -256,8 +312,15 @@ def fix_stem(stem: str, project_root: Path, max_chars: int, dry_run: bool) -> bo json.dumps(chunks, ensure_ascii=False, indent=2), encoding="utf-8" ) print(f" ✅ Salvato: chunks/{stem}/chunks.json") - print(f"\n Riesegui la verifica:") - print(f" python chunks/verify_chunks.py --stem {stem}") + + final_verdict = report.get("verdict", "?") + if final_verdict == "ok": + print(f" ✅ Verdict finale: ok — procedi alla vettorizzazione.") + elif final_verdict == "warnings_only": + print(f" 🟡 Verdict finale: warnings_only — puoi procedere.") + else: + print(f" 🔴 Verdict finale: {final_verdict} — rilancia la verifica manualmente:") + print(f" python chunks/verify_chunks.py --stem {stem}") return True @@ -269,9 +332,10 @@ if __name__ == "__main__": parser = argparse.ArgumentParser(description="Fix chunk") parser.add_argument("--stem", required=True, help="Nome del documento (sottocartella di chunks/)") + _max_def = int(cfg.TARGET_CHARS * (1 + cfg.CHUNK_TOLERANCE)) parser.add_argument( - "--max", type=int, default=MAX_CHARS, - help=f"Soglia massima caratteri per lo split (default: {MAX_CHARS})" + "--max", type=int, default=_max_def, + help=f"Soglia massima caratteri per lo split (default: TARGET×(1+TOL) = {_max_def})" ) parser.add_argument( "--dry-run", action="store_true", diff --git a/chunks/verify_chunks.py b/chunks/verify_chunks.py index d682748..7452baa 100644 --- a/chunks/verify_chunks.py +++ b/chunks/verify_chunks.py @@ -20,11 +20,16 @@ import re import sys from pathlib import Path +_HERE = Path(__file__).resolve().parent +if str(_HERE) not in sys.path: + sys.path.insert(0, str(_HERE)) +import config as cfg -# ─── Soglie ─────────────────────────────────────────────────────────────────── -MIN_CHARS = 200 -MAX_CHARS = 800 +# ─── Soglie (derivate dal target, sovrascrivibili da CLI) ──────────────────── + +MIN_CHARS = int(cfg.TARGET_CHARS * (1 - cfg.CHUNK_TOLERANCE)) +MAX_CHARS = int(cfg.TARGET_CHARS * (1 + cfg.CHUNK_TOLERANCE)) PUNCT_END = re.compile( r"[.!?»)\]'\u2019\"\u201c\u201d\u2018\u2014\u2013\u2026]$" r"|/$" # URL che finisce con / @@ -51,7 +56,7 @@ def is_too_short(chunk: dict, min_chars: int) -> bool: def is_too_long(chunk: dict, max_chars: int) -> bool: - return chunk.get("n_chars", 0) > max_chars * 1.5 + return chunk.get("n_chars", 0) > max_chars def ends_incomplete(chunk: dict) -> bool: @@ -72,7 +77,7 @@ def ends_incomplete(chunk: dict) -> bool: def is_math_incomplete(chunk: dict) -> bool: """Incompleto ma in contesto matematico — degrada a warning invece di blocker.""" - return ends_incomplete(chunk) and len(_MATH_SYMS.findall(chunk.get("text", ""))) >= 3 + return ends_incomplete(chunk) and len(_MATH_SYMS.findall(chunk.get("text", ""))) >= cfg.MATH_SYMS_MIN # ─── Report ─────────────────────────────────────────────────────────────────── @@ -170,12 +175,12 @@ def verify_stem(stem: str, project_root: Path, min_chars: int, max_chars: int) - if too_long: has_errors = True - print(f"\n 🟡 {len(too_long)} chunk SOPRA MAX_CHARS×1.5 ({int(max_chars * 1.5)}):") + print(f"\n 🟡 {len(too_long)} chunk SOPRA MAX ({max_chars}):") for c in too_long[:5]: print(_fmt_chunk(c)) if len(too_long) > 5: print(f" ... e altri {len(too_long) - 5}") - print(f" → Soluzione: alza MAX_CHARS o verifica il testo nel MD") + print(f" → Causa probabile: frasi singole lunghe (liste/paragrafi non suddivisibili)") if incomplete: has_errors = True @@ -225,7 +230,12 @@ def verify_stem(stem: str, project_root: Path, min_chars: int, max_chars: int) - "max_chars": max_l, "avg_chars": avg_l, }, - "thresholds": {"min_chars": min_chars, "max_chars": max_chars}, + "thresholds": { + "min_chars": min_chars, + "max_chars": max_chars, + "target_chars": cfg.TARGET_CHARS, + "chunk_tolerance": cfg.CHUNK_TOLERANCE, + }, "blockers": { "empty": [_chunk_entry(c) for c in empty_chunks], "no_prefix": [_chunk_entry(c) for c in no_prefix], @@ -301,13 +311,15 @@ if __name__ == "__main__": parser = argparse.ArgumentParser(description="Verifica chunk") parser.add_argument("--stem", help="Nome del documento (sottocartella di chunks/)") + _min_def = int(cfg.TARGET_CHARS * (1 - cfg.CHUNK_TOLERANCE)) + _max_def = int(cfg.TARGET_CHARS * (1 + cfg.CHUNK_TOLERANCE)) parser.add_argument( - "--min", type=int, default=MIN_CHARS, - help=f"Soglia minima caratteri (default: {MIN_CHARS})" + "--min", type=int, default=_min_def, + help=f"Soglia minima caratteri (default: TARGET×(1-TOL) = {_min_def})" ) parser.add_argument( - "--max", type=int, default=MAX_CHARS, - help=f"Soglia massima caratteri (default: {MAX_CHARS})" + "--max", type=int, default=_max_def, + help=f"Soglia massima caratteri (default: TARGET×(1+TOL) = {_max_def})" ) args = parser.parse_args()