#!/usr/bin/env python3
"""Adaptive chunking.

Split the revised Markdown of a document into semantic chunks ready for
vectorisation. The chunking strategy is taken from the document's structural
profile (key ``strategia_chunking``).

Input:  conversione/{stem}/clean.md + conversione/{stem}/structure_profile.json
Output: chunks/{stem}/chunks.json (+ chunks/{stem}/meta.json)

Usage:
    python chunks/chunker.py                           # every document in conversione/
    python chunks/chunker.py --stem documento          # a single document
    python chunks/chunker.py --stem documento --force  # overwrite existing output
"""

import argparse
import json
import re
import sys
from collections.abc import Callable
from pathlib import Path

# Make the sibling ``config`` module importable regardless of the working
# directory the script is launched from.
_HERE = Path(__file__).resolve().parent
if str(_HERE) not in sys.path:
    sys.path.insert(0, str(_HERE))

import config as cfg


# ─── Utilities ────────────────────────────────────────────────────────────────

# Sentence boundary: closing punctuation, whitespace, then an upper-case letter
# (including the accented capitals listed) or a double quote.
_SENTENCE_SPLIT_STRICT = re.compile(r'(?<=[.!?»])\s+(?=[A-ZÀÈÉÌÒÙ"])')
# Fallback boundary: closing punctuation followed by any whitespace.
_SENTENCE_SPLIT_LOOSE = re.compile(r'(?<=[.!?»])\s+')
# A Markdown heading at the very start of a string ("# ", "## ", ...).
_HEADING_LINE = re.compile(r"#+(?:\s|$)")
# One or more blank lines separating paragraphs.
_PARAGRAPH_BREAK = re.compile(r"\n{2,}")

# Strategy used when the profile names none, or names one that is not known.
_DEFAULT_STRATEGY = "paragraph"


def split_sentences(text: str) -> list[str]:
    """Split *text* into sentences.

    The strict pattern is tried first; if it yields a single piece the looser
    pattern (any whitespace after ``.``, ``!``, ``?`` or ``»``) is used instead.
    Empty pieces are discarded and every sentence is stripped.
    """
    stripped = text.strip()
    parts = _SENTENCE_SPLIT_STRICT.split(stripped)
    if len(parts) <= 1:
        parts = _SENTENCE_SPLIT_LOOSE.split(stripped)
    return [p.strip() for p in parts if p.strip()]


def slugify(s: str, max_len: int = 60) -> str:
    """Return a lower-case, underscore-separated slug of at most *max_len* chars.

    Characters other than word characters, whitespace and ``-`` are removed;
    runs of whitespace, ``_`` and ``-`` collapse to a single ``_``. An empty
    result becomes ``"section"``.
    """
    s = s.lower()
    s = re.sub(r'[^\w\s-]', '', s)
    s = re.sub(r'[\s_-]+', '_', s).strip('_')
    return s[:max_len] if s else "section"


def _is_table_block(text: str) -> bool:
    """Return True when at least half of the non-blank lines start with ``|``."""
    lines = [line for line in text.strip().splitlines() if line.strip()]
    if not lines:
        return False
    table_lines = sum(1 for line in lines if line.strip().startswith("|"))
    return table_lines / len(lines) >= 0.5


def _ov(strategy: str) -> tuple[int, float, int]:
    """Return ``(target_chars, tolerance, overlap)`` for *strategy*.

    Values come from ``cfg.STRATEGY_OVERRIDES[strategy]`` when present and fall
    back to ``cfg.TARGET_CHARS``, ``cfg.CHUNK_TOLERANCE`` and
    ``cfg.OVERLAP_SENTENCES`` otherwise.
    """
    ov = cfg.STRATEGY_OVERRIDES.get(strategy, {})
    target = ov.get("target_chars", cfg.TARGET_CHARS)
    tolerance = ov.get("tolerance", cfg.CHUNK_TOLERANCE)
    overlap = ov.get("overlap", cfg.OVERLAP_SENTENCES)
    return target, tolerance, overlap


def _build_chunk(chunk_id: str, text: str, sezione: str, titolo: str,
                 sub_index: int) -> dict:
    """Build one chunk record; key order matches the serialised JSON layout."""
    return {
        "chunk_id": chunk_id,
        "text": text,
        "sezione": sezione,
        "titolo": titolo,
        "sub_index": sub_index,
        "n_chars": len(text),
    }


def _joined_len(parts: list[str]) -> int:
    """Length of ``" ".join(parts)`` without building the string."""
    return sum(len(p) for p in parts) + max(0, len(parts) - 1)


def _split_paragraphs(text: str) -> list[str]:
    """Split *text* on blank lines and strip leading heading lines.

    Only the heading lines at the start of a paragraph are removed; body text
    that follows a heading without a blank line is kept instead of being
    discarded together with the heading. Paragraphs left empty are dropped.
    """
    paragraphs: list[str] = []
    for block in _PARAGRAPH_BREAK.split(text):
        body = block.strip()
        while _HEADING_LINE.match(body):
            # Drop the heading line and look at what follows it.
            body = body.partition("\n")[2].strip()
        if body:
            paragraphs.append(body)
    return paragraphs


def _merge_short_paragraphs(paragraphs: list[str], lower: int) -> list[str]:
    """Greedily append following paragraphs while the pending one is < *lower* chars."""
    merged: list[str] = []
    pending = ""
    for par in paragraphs:
        if pending and len(pending) < lower:
            pending = f"{pending}\n\n{par}"
        else:
            if pending:
                merged.append(pending)
            pending = par
    if pending:
        merged.append(pending)
    return merged


# ─── Core: target-oriented split into sub-chunks ──────────────────────────────

def make_sub_chunks(
    body: str,
    prefix: str,
    sezione: str,
    titolo: str,
    target: int,
    tolerance: float,
    overlap_s: int,
) -> list[dict]:
    """Split *body* into sentence-aligned chunks bounded by the upper limit.

    ``upper = target × (1 + tolerance)`` is the maximum chunk size including
    *prefix*. Whole sentences are accumulated until the next one would exceed
    the limit; the chunk is then emitted and the next one starts with the last
    *overlap_s* sentences of the emitted chunk. If those overlap sentences plus
    the next sentence would already exceed the limit, the oldest overlap
    sentences are dropped so the new chunk stays within bounds whenever the
    sentence itself fits.

    A single sentence longer than the limit is emitted as is. When
    ``cfg.PROTECT_TABLES`` is set and *body* is mostly a Markdown table, the
    whole body is returned as one chunk regardless of size.

    Args:
        body: Text of the section (without heading).
        prefix: Context header prepended to every chunk.
        sezione: Section name stored in each chunk and used in the chunk id.
        titolo: Title stored in each chunk and used in the chunk id.
        target: Target chunk size in characters.
        tolerance: Relative tolerance applied to *target*.
        overlap_s: Number of trailing sentences repeated in the next chunk;
            values below 1 disable the overlap.

    Returns:
        The chunk records in document order; empty when *body* has no sentences.
    """
    id_base = f"{slugify(sezione)}__{slugify(titolo)}"

    if cfg.PROTECT_TABLES and _is_table_block(body):
        return [_build_chunk(f"{id_base}__s0", prefix + body, sezione, titolo, 0)]

    # The limit applies to the body alone: final n_chars = len(prefix) + body.
    upper_body = max(1, int(target * (1 + tolerance)) - len(prefix))
    sentences = split_sentences(body)
    if not sentences:
        return []

    keep = max(0, overlap_s)
    chunks: list[dict] = []
    current: list[str] = []
    current_len = 0

    def _emit() -> None:
        """Emit ``current`` as a chunk and restart from its overlap tail."""
        nonlocal current, current_len
        sub_index = len(chunks)
        chunks.append(_build_chunk(
            f"{id_base}__s{sub_index}",
            prefix + " ".join(current),
            sezione,
            titolo,
            sub_index,
        ))
        # Carry an overlap only if the chunk held more than the overlap itself,
        # otherwise the next chunk would repeat the whole previous one.
        current = current[-keep:] if keep and len(current) > keep else []
        current_len = _joined_len(current)

    for sent in sentences:
        sep = 1 if current else 0
        if current_len + sep + len(sent) <= upper_body:
            current.append(sent)
            current_len += sep + len(sent)
        elif current:
            _emit()
            # Drop the oldest overlap sentences until the new sentence fits.
            while current and current_len + 1 + len(sent) > upper_body:
                current.pop(0)
                current_len = _joined_len(current)
            current.append(sent)
            current_len = _joined_len(current)
        else:
            # A single sentence already exceeds the limit: emit it unchanged.
            current = [sent]
            current_len = len(sent)
            _emit()

    if current:
        sub_index = len(chunks)
        chunks.append(_build_chunk(
            f"{id_base}__s{sub_index}",
            prefix + " ".join(current),
            sezione,
            titolo,
            sub_index,
        ))

    return chunks


# ─── Markdown parsers ─────────────────────────────────────────────────────────

def parse_h3_sections(text: str) -> list[dict]:
    """Split *text* into sections delimited by ``#``, ``##`` and ``###`` headings.

    ``#`` and ``##`` headings both set ``sezione`` and reset ``titolo``;
    ``###`` headings set ``titolo``. Sections with an empty body are skipped.

    Returns:
        Dicts with keys ``sezione``, ``titolo`` and ``body``.
    """
    sections: list[dict] = []
    h2 = ""
    h3 = ""
    body_lines: list[str] = []

    def flush() -> None:
        body = "\n".join(body_lines).strip()
        if body:
            sections.append({"sezione": h2, "titolo": h3, "body": body})

    for line in text.splitlines():
        if line.startswith("# "):
            flush()
            h2, h3, body_lines = line[2:].strip(), "", []
        elif line.startswith("## "):
            flush()
            h2, h3, body_lines = line[3:].strip(), "", []
        elif line.startswith("### "):
            flush()
            h3, body_lines = line[4:].strip(), []
        else:
            body_lines.append(line)
    flush()
    return sections


def parse_h2_sections(text: str) -> list[dict]:
    """Split *text* into sections delimited by ``#`` and ``##`` headings.

    Deeper headings stay in the body. Sections with an empty body are skipped.

    Returns:
        Dicts with keys ``sezione`` and ``body``.
    """
    sections: list[dict] = []
    h2 = ""
    body_lines: list[str] = []

    def flush() -> None:
        body = "\n".join(body_lines).strip()
        if body:
            sections.append({"sezione": h2, "body": body})

    for line in text.splitlines():
        if line.startswith("## "):
            flush()
            h2, body_lines = line[3:].strip(), []
        elif line.startswith("# "):
            flush()
            h2, body_lines = line[2:].strip(), []
        else:
            body_lines.append(line)
    flush()
    return sections


# ─── Chunking strategies ──────────────────────────────────────────────────────

def chunk_h3_aware(text: str, stem: str) -> list[dict]:
    """Chunk by ``###`` section, merging short sections under the same ``##``.

    A section shorter than ``target × (1 − tolerance)`` absorbs the following
    section of the same ``sezione``; their titles are joined with ``" / "``.
    """
    target, tolerance, overlap = _ov("h3_aware")
    lower = int(target * (1 - tolerance))

    merged: list[dict] = []
    pending = None
    for sec in parse_h3_sections(text):
        if pending is None:
            pending = dict(sec)
        elif pending["sezione"] == sec["sezione"] and len(pending["body"]) < lower:
            joined_title = " / ".join(filter(None, [pending["titolo"], sec["titolo"]]))
            pending = {
                "sezione": pending["sezione"],
                "titolo": joined_title or pending["titolo"],
                "body": pending["body"] + "\n\n" + sec["body"],
            }
        else:
            merged.append(pending)
            pending = dict(sec)
    if pending:
        merged.append(pending)

    chunks: list[dict] = []
    for sec in merged:
        sezione = sec["sezione"] or stem
        titolo = sec["titolo"] or ""
        prefix = f"[{sezione} > {titolo}]\n" if titolo else f"[{sezione}]\n"
        chunks.extend(
            make_sub_chunks(sec["body"], prefix, sezione, titolo,
                            target, tolerance, overlap)
        )
    return chunks


def chunk_h2_paragraph_split(text: str, stem: str) -> list[dict]:
    """Chunk by paragraph inside each ``##`` section, merging short paragraphs."""
    target, tolerance, overlap = _ov("h2_paragraph_split")
    lower = int(target * (1 - tolerance))

    chunks: list[dict] = []
    for sec in parse_h2_sections(text):
        sezione = sec["sezione"] or stem
        prefix = f"[{sezione}]\n"
        merged_pars = _merge_short_paragraphs(_split_paragraphs(sec["body"]), lower)
        for idx, par in enumerate(merged_pars):
            sub = make_sub_chunks(par, prefix, sezione, f"par{idx}",
                                  target, tolerance, overlap)
            for c in sub:
                c["chunk_id"] = f"{slugify(sezione)}__p{idx}__s{c['sub_index']}"
            chunks.extend(sub)
    return chunks


def chunk_paragraph(text: str, stem: str) -> list[dict]:
    """Chunk by paragraph over the whole document, merging short paragraphs."""
    target, tolerance, overlap = _ov("paragraph")
    lower = int(target * (1 - tolerance))
    prefix = f"[Documento: {stem}]\n"

    chunks: list[dict] = []
    merged = _merge_short_paragraphs(_split_paragraphs(text), lower)
    for idx, par in enumerate(merged):
        sub = make_sub_chunks(par, prefix, stem, f"par{idx}",
                              target, tolerance, overlap)
        for c in sub:
            c["chunk_id"] = f"para__{idx}__s{c['sub_index']}"
        chunks.extend(sub)
    return chunks


def chunk_sliding_window(text: str, stem: str) -> list[dict]:
    """Chunk with a sentence-level sliding window over the whole document.

    Each window takes as many sentences as fit within the upper limit (prefix
    included, as in ``make_sub_chunks``) and the next window starts ``overlap``
    sentences before the end of the current one. The loop stops as soon as a
    window reaches the last sentence, so no trailing window is a mere subset of
    the previous one.
    """
    target, tolerance, overlap = _ov("sliding_window")
    prefix = f"[Documento: {stem}]\n"
    upper_body = max(1, int(target * (1 + tolerance)) - len(prefix))
    sentences = split_sentences(text)
    step_back = max(0, overlap)

    chunks: list[dict] = []
    start = 0
    while start < len(sentences):
        # The first sentence is always taken, even if it alone exceeds the limit.
        window = [sentences[start]]
        cur_len = len(sentences[start])
        end = start + 1
        while (end < len(sentences)
               and cur_len + 1 + len(sentences[end]) <= upper_body):
            cur_len += 1 + len(sentences[end])
            window.append(sentences[end])
            end += 1

        win_idx = len(chunks)
        chunks.append(_build_chunk(
            f"win__{win_idx}",
            prefix + " ".join(window),
            stem,
            f"finestra {win_idx}",
            win_idx,
        ))

        if end >= len(sentences):
            break
        # Always advance by at least one sentence to guarantee termination.
        start += max(1, len(window) - step_back)
    return chunks


# ─── Dispatcher ───────────────────────────────────────────────────────────────

_STRATEGIES: dict[str, Callable[[str, str], list[dict]]] = {
    "h3_aware": chunk_h3_aware,
    "h2_paragraph_split": chunk_h2_paragraph_split,
    "paragraph": chunk_paragraph,
    "sliding_window": chunk_sliding_window,
}


def _resolve_strategy(profile: dict) -> str:
    """Return the strategy actually applied for *profile*.

    Missing, non-string or unknown values fall back to ``"paragraph"``.
    """
    requested = profile.get("strategia_chunking", _DEFAULT_STRATEGY)
    if isinstance(requested, str) and requested in _STRATEGIES:
        return requested
    return _DEFAULT_STRATEGY


def _dedupe_chunk_ids(chunks: list[dict]) -> list[dict]:
    """Make every ``chunk_id`` unique, in place.

    The first occurrence of an id is left untouched; later duplicates receive a
    ``__dupN`` suffix (N starting at 2). Ids can collide when sections share a
    title, when slugs are truncated, or when paragraph counters restart in
    sections with the same name.
    """
    seen: set[str] = set()
    for chunk in chunks:
        base = chunk["chunk_id"]
        candidate = base
        n = 1
        while candidate in seen:
            n += 1
            candidate = f"{base}__dup{n}"
        seen.add(candidate)
        chunk["chunk_id"] = candidate
    return chunks


def chunk_document(clean_md: Path, profile: dict, stem: str) -> list[dict]:
    """Read *clean_md* and chunk it with the strategy named in *profile*."""
    text = clean_md.read_text(encoding="utf-8")
    fn = _STRATEGIES[_resolve_strategy(profile)]
    return _dedupe_chunk_ids(fn(text, stem))


# ─── Per-document processing ──────────────────────────────────────────────────

def process_stem(stem: str, project_root: Path, force: bool) -> bool:
    """Chunk one document and write ``chunks.json`` and ``meta.json``.

    Args:
        stem: Name of the document sub-folder under ``conversione/``.
        project_root: Folder containing ``conversione/`` and ``chunks/``.
        force: Regenerate the output even when ``chunks.json`` already exists.

    Returns:
        True when the output exists at the end (freshly written or already
        present), False when an input is missing or invalid or no chunk was
        produced.
    """
    conv_dir = project_root / "conversione" / stem
    out_dir = project_root / "chunks" / stem
    clean_md = conv_dir / "clean.md"
    profile_path = conv_dir / "structure_profile.json"
    out_file = out_dir / "chunks.json"

    print(f"\nDocumento: {stem}")

    if not clean_md.exists():
        print(f" ✗ clean.md non trovato in conversione/{stem}/ — skip")
        return False
    if not profile_path.exists():
        print(f" ✗ structure_profile.json non trovato in conversione/{stem}/ — skip")
        return False
    if out_file.exists() and not force:
        print(" ⚠️ chunks.json già presente — skip")
        print(" (usa --force per rieseguire)")
        return True

    # A broken profile must fail this document only, not the whole batch.
    try:
        profile = json.loads(profile_path.read_text(encoding="utf-8"))
    except (json.JSONDecodeError, UnicodeDecodeError) as exc:
        print(f" ✗ structure_profile.json non valido ({exc}) — skip")
        return False
    if not isinstance(profile, dict):
        print(" ✗ structure_profile.json non valido (atteso un oggetto JSON) — skip")
        return False

    # Report and record the strategy that is really applied, so the statistics
    # and meta.json use the same target/tolerance as the chunker.
    requested = profile.get("strategia_chunking", _DEFAULT_STRATEGY)
    strategia = _resolve_strategy(profile)
    if strategia != requested:
        print(f" ⚠️ Strategia sconosciuta {requested!r} — uso {strategia!r}")
    print(f" Strategia: {strategia}")

    chunks = chunk_document(clean_md, profile, stem)
    if not chunks:
        print(" ✗ Nessun chunk generato — controlla clean.md")
        return False

    out_dir.mkdir(parents=True, exist_ok=True)
    out_file.write_text(
        json.dumps(chunks, ensure_ascii=False, indent=2), encoding="utf-8"
    )

    target, tolerance, _ = _ov(strategia)
    lower = int(target * (1 - tolerance))
    upper = int(target * (1 + tolerance))
    meta = {
        "strategy": strategia,
        "target_chars": target,
        "min_chars": lower,
        "max_chars": upper,
    }
    (out_dir / "meta.json").write_text(
        json.dumps(meta, ensure_ascii=False), encoding="utf-8"
    )

    lengths = [c["n_chars"] for c in chunks]
    min_c = min(lengths)
    max_c = max(lengths)
    avg_c = int(sum(lengths) / len(lengths))
    short = sum(1 for n in lengths if n < lower)
    long_ = sum(1 for n in lengths if n > upper)

    print(f" Target: {target} char ±{int(tolerance*100)}% "
          f"→ range [{lower}, {upper}]")
    print(f" Chunk totali: {len(chunks)}")
    print(f" Min: {min_c} char Max: {max_c} char Media: {avg_c} char")
    if short:
        print(f" ⚠️ {short} chunk sotto lower ({lower})")
    if long_:
        print(f" ⚠️ {long_} chunk sopra upper ({upper})")
    print(f" ✅ chunks.json salvato in chunks/{stem}/")
    return True


# ─── Entry point ──────────────────────────────────────────────────────────────

def main() -> int:
    """Parse the command line, process the selected documents, return the exit code."""
    project_root = Path(__file__).parent.parent

    parser = argparse.ArgumentParser(description="Chunking adattivo")
    parser.add_argument("--stem", help="Nome del documento (sottocartella di conversione/)")
    parser.add_argument("--force", action="store_true", help="Riesegui anche se già presente")
    args = parser.parse_args()

    if args.stem:
        stems = [args.stem]
    else:
        conv_dir = project_root / "conversione"
        if not conv_dir.exists():
            print(f"Errore: cartella conversione/ non trovata in {project_root}")
            return 1
        stems = sorted(
            p.name for p in conv_dir.iterdir()
            if p.is_dir() and (p / "clean.md").exists()
        )
        if not stems:
            print("Errore: nessun documento trovato in conversione/")
            return 1

    results = [process_stem(s, project_root, args.force) for s in stems]
    ok = sum(results)
    total = len(results)
    print(f"\n{'✅' if all(results) else '⚠️ '} {ok}/{total} documenti processati")
    return 0 if all(results) else 1


if __name__ == "__main__":
    sys.exit(main())