#!/usr/bin/env python3
"""
Step 5 — Adaptive chunking.

Splits the revised Markdown produced by step 4 into semantic chunks ready for
vectorisation. The chunking strategy is selected from the document's
structure profile (key ``strategia_chunking``).

Input:  step-4/<stem>/clean.md + step-4/<stem>/structure_profile.json
Output: step-5/<stem>/chunks.json

Usage:
    python step-5/chunker.py                          # every document in step-4/
    python step-5/chunker.py --stem documento         # a single document
    python step-5/chunker.py --stem documento --force
"""

import argparse
import json
import re
import sys
from collections.abc import Callable
from pathlib import Path

# ─── Parameters ───────────────────────────────────────────────────────────────

MIN_CHARS = 200   # below this threshold a unit is merged with the following one
MAX_CHARS = 800   # above this threshold a unit is split on sentence boundaries
OVERLAP_S = 2     # sentences of overlap between sub-chunks of the same boundary

# Conservative split: closing punctuation, whitespace, then an upper-case
# letter or a double quote.
_SENTENCE_SPLIT_STRICT = re.compile(r'(?<=[.!?»])\s+(?=[A-ZÀÈÉÌÒÙ"])')
# Fallback split: closing punctuation followed by whitespace.
_SENTENCE_SPLIT_LOOSE = re.compile(r'(?<=[.!?»])\s+')
# One or more '#' followed by whitespace, anchored at the start of the string.
_HEADER_START = re.compile(r"^#+\s")
# Two or more consecutive newlines separate paragraphs.
_PARAGRAPH_BREAK = re.compile(r"\n{2,}")


# ─── Utilities ────────────────────────────────────────────────────────────────

def split_sentences(text: str) -> list[str]:
    """
    Split *text* into sentences.

    The first attempt splits only where closing punctuation (``.!?»``) is
    followed by whitespace and an upper-case letter or a double quote, which
    avoids breaking on most abbreviations. If that yields a single piece, a
    plain "punctuation + whitespace" split is used instead.

    Returns the non-empty, stripped pieces in order.
    """
    stripped = text.strip()
    parts = _SENTENCE_SPLIT_STRICT.split(stripped)
    if len(parts) <= 1:
        parts = _SENTENCE_SPLIT_LOOSE.split(stripped)
    return [p.strip() for p in parts if p.strip()]


def slugify(s: str, max_len: int = 60) -> str:
    """
    Turn a string into a slug usable inside a chunk_id.

    Lower-cases, removes everything that is not a word character, whitespace
    or hyphen, collapses runs of whitespace/underscores/hyphens into a single
    underscore, and truncates to *max_len*. Returns ``"section"`` when nothing
    is left.
    """
    s = s.lower()
    s = re.sub(r'[^\w\s-]', '', s)
    s = re.sub(r'[\s_-]+', '_', s).strip('_')
    return s[:max_len] if s else "section"


def _joined_len(sentences: list[str]) -> int:
    """Length of ``" ".join(sentences)`` without building the string."""
    if not sentences:
        return 0
    return sum(len(s) for s in sentences) + len(sentences) - 1


def _build_chunk(
    prefix: str,
    sentences: list[str],
    sezione: str,
    titolo: str,
    sub_index: int,
) -> dict:
    """Assemble the chunk record for one group of sentences."""
    chunk_text = prefix + " ".join(sentences)
    return {
        "chunk_id": f"{slugify(sezione)}__{slugify(titolo)}__s{sub_index}",
        "text": chunk_text,
        "sezione": sezione,
        "titolo": titolo,
        "sub_index": sub_index,
        "n_chars": len(chunk_text),
    }


def _dedupe_chunk_ids(chunks: list[dict]) -> list[dict]:
    """
    Make every ``chunk_id`` in *chunks* unique, in place.

    The first occurrence of an id is left untouched; later occurrences get a
    ``__dupN`` suffix. Sections with identical headings would otherwise
    produce identical ids.
    """
    used: set[str] = set()
    for chunk in chunks:
        base = chunk["chunk_id"]
        candidate = base
        n = 1
        while candidate in used:
            candidate = f"{base}__dup{n}"
            n += 1
        used.add(candidate)
        chunk["chunk_id"] = candidate
    return chunks


def make_sub_chunks(
    body: str,
    prefix: str,
    sezione: str,
    titolo: str,
    max_chars: int,
    overlap_s: int,
) -> list[dict]:
    """
    Split *body* into sub-chunks of at most *max_chars* characters.

    Sentences are packed greedily. When a sub-chunk is closed, the next one
    starts with up to *overlap_s* trailing sentences of the previous one,
    provided the previous one had more than *overlap_s* sentences. The overlap
    is shortened (possibly to nothing) when it would not leave room for the
    next sentence, so every sub-chunk contains at least one new sentence.

    *max_chars* bounds the joined sentences only: *prefix* is prepended
    afterwards and is included in ``n_chars``. A single sentence longer than
    *max_chars* is emitted on its own, unsplit.

    Returns a list of chunk dicts (``chunk_id``, ``text``, ``sezione``,
    ``titolo``, ``sub_index``, ``n_chars``); empty when *body* has no
    sentences.
    """
    sentences = split_sentences(body)
    if not sentences:
        return []

    chunks: list[dict] = []
    current: list[str] = []
    current_len = 0  # always equal to len(" ".join(current))
    sub_index = 0

    i = 0
    while i < len(sentences):
        sent = sentences[i]
        sep = 1 if current else 0  # separating space, only between sentences
        if not current or current_len + sep + len(sent) <= max_chars:
            current.append(sent)
            current_len += sep + len(sent)
            i += 1
            continue

        # The sentence does not fit: close the current sub-chunk.
        chunks.append(_build_chunk(prefix, current, sezione, titolo, sub_index))
        sub_index += 1

        # Restart from the trailing sentences of the chunk just closed.
        if overlap_s > 0 and len(current) > overlap_s:
            overlap = current[-overlap_s:]
        else:
            overlap = []
        # Drop leading overlap sentences until the pending sentence fits;
        # otherwise the next flush would emit a chunk made only of text
        # already present in the previous chunk.
        while overlap and _joined_len(overlap) + 1 + len(sent) > max_chars:
            overlap = overlap[1:]
        current = list(overlap)
        current_len = _joined_len(current)

    # Flush the remaining sentences.
    if current:
        chunks.append(_build_chunk(prefix, current, sezione, titolo, sub_index))

    return chunks


def _strip_leading_headers(paragraph: str) -> str:
    """
    Remove Markdown header lines from the top of *paragraph*.

    Returns the remaining text, stripped; an empty string when the paragraph
    consisted only of headers.
    """
    rest = paragraph.strip()
    while _HEADER_START.match(rest):
        _, _, rest = rest.partition("\n")
    return rest.strip()


def _split_paragraphs(text: str) -> list[str]:
    """
    Split *text* on blank lines and return the non-empty paragraphs.

    Leading header lines are removed from each paragraph; text that follows a
    header without a blank line in between is kept.
    """
    paragraphs = []
    for raw in _PARAGRAPH_BREAK.split(text):
        body = _strip_leading_headers(raw)
        if body:
            paragraphs.append(body)
    return paragraphs


def _merge_small_paragraphs(paragraphs: list[str]) -> list[str]:
    """
    Greedily merge paragraphs shorter than MIN_CHARS with the following one.

    A short paragraph at the very end has nothing to merge into and is kept
    as is.
    """
    merged: list[str] = []
    pending = ""
    for par in paragraphs:
        if pending and len(pending) < MIN_CHARS:
            pending = pending + "\n\n" + par
        else:
            if pending:
                merged.append(pending)
            pending = par
    if pending:
        merged.append(pending)
    return merged


# ─── Markdown parsers ─────────────────────────────────────────────────────────

def parse_h3_sections(text: str) -> list[dict]:
    """
    Parse the document into sections keyed by (h2 ``sezione``, h3 ``titolo``).

    An h1 line is handled like an h2: its text becomes the current
    ``sezione``. Text before the first header is assigned to an empty
    ``sezione`` and ``titolo``. Sections with an empty body are skipped.
    """
    sections: list[dict] = []
    current_h2 = ""
    current_h3 = ""
    body_lines: list[str] = []

    def flush() -> None:
        body = "\n".join(body_lines).strip()
        if body:
            sections.append({
                "sezione": current_h2,
                "titolo": current_h3,
                "body": body,
            })

    for line in text.splitlines():
        if line.startswith("# "):
            flush()
            current_h2 = line[2:].strip()
            current_h3 = ""
            body_lines = []
        elif line.startswith("## "):
            flush()
            current_h2 = line[3:].strip()
            current_h3 = ""
            body_lines = []
        elif line.startswith("### "):
            flush()
            current_h3 = line[4:].strip()
            body_lines = []
        else:
            body_lines.append(line)

    flush()
    return sections


def parse_h2_sections(text: str) -> list[dict]:
    """
    Parse the document into h2 sections with their full text.

    An h1 line is handled like an h2. Deeper headers stay inside the body.
    Sections with an empty body are skipped.
    """
    sections: list[dict] = []
    current_h2 = ""
    body_lines: list[str] = []

    def flush() -> None:
        body = "\n".join(body_lines).strip()
        if body:
            sections.append({"sezione": current_h2, "body": body})

    for line in text.splitlines():
        if line.startswith("## "):
            flush()
            current_h2 = line[3:].strip()
            body_lines = []
        elif line.startswith("# "):
            flush()
            current_h2 = line[2:].strip()
            body_lines = []
        else:
            body_lines.append(line)

    flush()
    return sections


# ─── Chunking strategies ──────────────────────────────────────────────────────

def chunk_h3_aware(text: str, stem: str) -> list[dict]:
    """
    Strategy ``h3_aware``: boundaries on ``###``.

    A section shorter than MIN_CHARS is merged with the following section
    when both share the same ``##`` parent. Sections longer than MAX_CHARS are
    split on sentences. *stem* is used as section name for text that has no
    enclosing header.
    """
    sections = parse_h3_sections(text)

    # Greedy merge: fold the pending section into the next one when it is
    # small and both belong to the same h2.
    merged: list[dict] = []
    pending: dict | None = None

    for sec in sections:
        if pending is None:
            pending = dict(sec)
            continue

        same_parent = pending["sezione"] == sec["sezione"]
        if same_parent and len(pending["body"]) < MIN_CHARS:
            titles = [t for t in (pending["titolo"], sec["titolo"]) if t]
            pending = {
                "sezione": pending["sezione"],
                "titolo": " / ".join(titles),
                "body": pending["body"] + "\n\n" + sec["body"],
            }
        else:
            merged.append(pending)
            pending = dict(sec)

    if pending is not None:
        merged.append(pending)

    # Emit chunks, splitting on sentences where needed.
    chunks: list[dict] = []
    for sec in merged:
        sezione = sec["sezione"] or stem
        titolo = sec["titolo"] or ""
        prefix = f"[{sezione} > {titolo}]\n" if titolo else f"[{sezione}]\n"
        chunks.extend(
            make_sub_chunks(sec["body"], prefix, sezione, titolo, MAX_CHARS, OVERLAP_S)
        )

    return _dedupe_chunk_ids(chunks)


def chunk_h2_paragraph_split(text: str, stem: str) -> list[dict]:
    """
    Strategy ``h2_paragraph_split``: boundaries on ``##``.

    Inside each ``##`` section the paragraphs (separated by blank lines) are
    the sub-units; small paragraphs are merged with the following one and
    large ones are split on sentences.
    """
    chunks: list[dict] = []

    for sec in parse_h2_sections(text):
        sezione = sec["sezione"] or stem
        prefix = f"[{sezione}]\n"

        merged_pars = _merge_small_paragraphs(_split_paragraphs(sec["body"]))

        for idx, par in enumerate(merged_pars):
            sub = make_sub_chunks(par, prefix, sezione, f"par{idx}", MAX_CHARS, OVERLAP_S)
            for c in sub:
                c["chunk_id"] = f"{slugify(sezione)}__p{idx}__s{c['sub_index']}"
            chunks.extend(sub)

    return _dedupe_chunk_ids(chunks)


def chunk_paragraph(text: str, stem: str) -> list[dict]:
    """
    Strategy ``paragraph``: boundaries on paragraphs (blank lines).

    Header lines are discarded; small paragraphs are merged with the
    following one and large ones are split on sentences.
    """
    prefix = f"[Documento: {stem}]\n"
    merged = _merge_small_paragraphs(_split_paragraphs(text))

    chunks: list[dict] = []
    for idx, par in enumerate(merged):
        sub = make_sub_chunks(par, prefix, stem, f"par{idx}", MAX_CHARS, OVERLAP_S)
        for c in sub:
            c["chunk_id"] = f"para__{idx}__s{c['sub_index']}"
        chunks.extend(sub)

    return chunks


def chunk_sliding_window(text: str, stem: str) -> list[dict]:
    """
    Strategy ``sliding_window``: windows of up to MAX_CHARS characters with
    OVERLAP_S sentences of overlap.

    Intended for flat text without structure. Each window holds at least one
    sentence; generation stops as soon as a window reaches the last sentence,
    so no window is a mere suffix of the previous one.
    """
    sentences = split_sentences(text)
    prefix = f"[Documento: {stem}]\n"
    chunks: list[dict] = []

    start = 0
    win_idx = 0
    while start < len(sentences):
        # The first sentence always enters the window, whatever its length.
        window = [sentences[start]]
        cur_len = len(sentences[start])
        end = start + 1
        while end < len(sentences) and cur_len + 1 + len(sentences[end]) <= MAX_CHARS:
            cur_len += 1 + len(sentences[end])
            window.append(sentences[end])
            end += 1

        chunk_text = prefix + " ".join(window)
        chunks.append({
            "chunk_id": f"win__{win_idx}",
            "text": chunk_text,
            "sezione": stem,
            "titolo": f"finestra {win_idx}",
            "sub_index": win_idx,
            "n_chars": len(chunk_text),
        })
        win_idx += 1

        # Everything has been covered: further windows would only repeat
        # the tail of this one.
        if end >= len(sentences):
            break

        # Advance by (window size - overlap), at least one sentence.
        start += max(1, len(window) - OVERLAP_S)

    return chunks


# ─── Dispatcher ───────────────────────────────────────────────────────────────

_STRATEGIES: dict[str, Callable[[str, str], list[dict]]] = {
    "h3_aware": chunk_h3_aware,
    "h2_paragraph_split": chunk_h2_paragraph_split,
    "paragraph": chunk_paragraph,
    "sliding_window": chunk_sliding_window,
}


def chunk_document(clean_md: Path, profile: dict, stem: str) -> list[dict]:
    """
    Read *clean_md* and chunk it with the strategy named in *profile*.

    The strategy is taken from ``profile["strategia_chunking"]``; a missing
    or unknown value falls back to the ``paragraph`` strategy.
    """
    text = clean_md.read_text(encoding="utf-8")
    strategia = profile.get("strategia_chunking", "paragraph")
    fn = _STRATEGIES.get(strategia, chunk_paragraph)
    return fn(text, stem)


# ─── Per-document processing ──────────────────────────────────────────────────

def process_stem(stem: str, project_root: Path, force: bool) -> bool:
    """
    Chunk one document and write ``step-5/<stem>/chunks.json``.

    Returns True when the output exists at the end (either freshly written or
    already present and *force* is False), False when an input is missing or
    unreadable or no chunk was produced. Progress is reported on stdout.
    """
    step4_dir = project_root / "step-4" / stem
    out_dir = project_root / "step-5" / stem
    clean_md = step4_dir / "clean.md"
    profile_path = step4_dir / "structure_profile.json"
    out_file = out_dir / "chunks.json"

    print(f"\nDocumento: {stem}")

    if not clean_md.exists():
        print(f" ✗ clean.md non trovato in step-4/{stem}/ — skip")
        return False
    if not profile_path.exists():
        print(f" ✗ structure_profile.json non trovato in step-4/{stem}/ — skip")
        return False
    if out_file.exists() and not force:
        print(" ⚠️ chunks.json già presente — skip")
        print(" (usa --force per rieseguire)")
        return True

    # A broken profile must fail this document only, not the whole batch.
    try:
        profile = json.loads(profile_path.read_text(encoding="utf-8"))
    except (OSError, ValueError) as exc:
        print(f" ✗ structure_profile.json non leggibile ({exc}) — skip")
        return False
    if not isinstance(profile, dict):
        print(" ✗ structure_profile.json non è un oggetto JSON — skip")
        return False

    strategia = profile.get("strategia_chunking", "paragraph")
    print(f" Strategia: {strategia}")
    if strategia not in _STRATEGIES:
        # chunk_document falls back to the paragraph strategy in this case.
        print(" ⚠️ Strategia sconosciuta — uso 'paragraph'")

    chunks = chunk_document(clean_md, profile, stem)

    if not chunks:
        print(" ✗ Nessun chunk generato — controlla clean.md")
        return False

    out_dir.mkdir(parents=True, exist_ok=True)
    out_file.write_text(
        json.dumps(chunks, ensure_ascii=False, indent=2),
        encoding="utf-8",
    )

    lengths = [c["n_chars"] for c in chunks]
    min_c = min(lengths)
    max_c = max(lengths)
    avg_c = int(sum(lengths) / len(lengths))
    short = sum(1 for n in lengths if n < MIN_CHARS)
    long_ = sum(1 for n in lengths if n > MAX_CHARS * 1.5)

    print(f" Chunk totali: {len(chunks)}")
    print(f" Min: {min_c} char Max: {max_c} char Media: {avg_c} char")
    if short:
        print(f" ⚠️ {short} chunk sotto MIN_CHARS ({MIN_CHARS})")
    if long_:
        print(f" ⚠️ {long_} chunk sopra MAX_CHARS×1.5 ({int(MAX_CHARS * 1.5)})")
    print(f" ✅ chunks.json salvato in step-5/{stem}/")
    return True


# ─── Entry point ──────────────────────────────────────────────────────────────

def main() -> int:
    """Parse the command line, process the documents, return the exit code."""
    project_root = Path(__file__).parent.parent

    parser = argparse.ArgumentParser(description="Step 5 — Chunking adattivo")
    parser.add_argument("--stem", help="Nome del documento (sottocartella di step-4/)")
    parser.add_argument("--force", action="store_true", help="Riesegui anche se già presente")
    args = parser.parse_args()

    if args.stem:
        stems = [args.stem]
    else:
        step4_dir = project_root / "step-4"
        if not step4_dir.exists():
            print(f"Errore: cartella step-4/ non trovata in {project_root}")
            return 1
        stems = sorted(p.name for p in step4_dir.iterdir() if p.is_dir())

    if not stems:
        print("Errore: nessun documento trovato in step-4/")
        return 1

    results = [process_stem(s, project_root, args.force) for s in stems]
    ok = sum(results)
    total = len(results)
    print(f"\n{'✅' if all(results) else '⚠️ '} {ok}/{total} documenti processati")
    return 0 if all(results) else 1


if __name__ == "__main__":
    sys.exit(main())