From 5126e0d97193abcf4ef13444db18ae39742ab60e Mon Sep 17 00:00:00 2001 From: Davide Grilli Date: Mon, 13 Apr 2026 13:36:53 +0200 Subject: [PATCH] step-5: add adaptive chunker chunker.py splits any revised Markdown (step-4) into RAG-ready chunks. Supports 4 strategies driven by structure_profile.json: h3_aware, h2_paragraph_split, paragraph, sliding_window. Respects MIN/MAX_CHARS and sentence-level overlap. Updates .gitignore and README paths. --- .gitignore | 3 + README.md | 16 +- step-5/chunker.py | 457 ++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 468 insertions(+), 8 deletions(-) create mode 100644 step-5/chunker.py diff --git a/.gitignore b/.gitignore index 4bc48a5..bd2b47d 100644 --- a/.gitignore +++ b/.gitignore @@ -37,3 +37,6 @@ step-3/*/ step-4/*/ step-4/revision_log.md +# Output step-5 — chunk generati da chunker.py +step-5/*/ + diff --git a/README.md b/README.md index b3e36da..b9e7ba7 100644 --- a/README.md +++ b/README.md @@ -370,12 +370,12 @@ git commit -m "step-4: revisione nietzsche completata" ### Step 5 — Chunking adattivo **Tipo:** automatico -**Input:** `processed/documento/clean.md` + `structure_profile.json` -**Output:** `processed/documento/chunks.json` -**Script:** `scripts/chunker.py` +**Input:** `step-4//clean.md` + `step-4//structure_profile.json` +**Output:** `step-5//chunks.json` +**Script:** `step-5/chunker.py` ```bash -python scripts/chunker.py processed/documento/clean.md --save +python step-5/chunker.py --stem documento ``` Divide il Markdown pulito in chunk. Usa il profilo strutturale @@ -420,12 +420,12 @@ da solo sarebbe ambiguo. ### Step 6 — Verifica chunk **Tipo:** automatico -**Input:** `processed/documento/chunks.json` +**Input:** `step-5//chunks.json` **Output:** report problemi + statistiche **Script:** `scripts/verify_chunks.py` ```bash -python scripts/verify_chunks.py processed/documento/chunks.json +python scripts/verify_chunks.py step-5/documento/chunks.json ``` Analizza ogni chunk e segnala i problemi. Non corregge nulla. @@ -502,12 +502,12 @@ Ollama è sempre disponibile sul sistema. ### Step 8 — Vettorizzazione **Tipo:** automatico (lento) -**Input:** `processed/documento/chunks.json` +**Input:** `step-5//chunks.json` **Output:** `chroma_db/` popolato **Script:** `scripts/ingest.py` ```bash -python scripts/ingest.py processed/documento/chunks.json +python scripts/ingest.py step-5/documento/chunks.json ``` Trasforma ogni chunk in un vettore numerico e lo salva in ChromaDB. diff --git a/step-5/chunker.py b/step-5/chunker.py new file mode 100644 index 0000000..f8af623 --- /dev/null +++ b/step-5/chunker.py @@ -0,0 +1,457 @@ +#!/usr/bin/env python3 +""" +Step 5 — Chunking adattivo + +Divide il Markdown revisionato (step 4) in chunk semantici pronti per la +vettorizzazione. La strategia dipende dal profilo strutturale del documento. + +Input: step-4//clean.md + step-4//structure_profile.json +Output: step-5//chunks.json + +Uso: + python step-5/chunker.py # tutti i documenti in step-4/ + python step-5/chunker.py --stem documento # un solo documento + python step-5/chunker.py --stem documento --force +""" + +import argparse +import json +import re +import sys +from pathlib import Path + + +# ─── Parametri ──────────────────────────────────────────────────────────────── + +MIN_CHARS = 200 # sotto questa soglia → accorpa al chunk successivo +MAX_CHARS = 800 # sopra questa soglia → spezza su frasi +OVERLAP_S = 2 # frasi di overlap tra sotto-chunk dello stesso boundary + + +# ─── Utilità ────────────────────────────────────────────────────────────────── + +def split_sentences(text: str) -> list[str]: + """ + Divide il testo in frasi senza spezzare abbreviazioni comuni. + Split su punteggiatura finale (.!?») seguita da spazio + lettera maiuscola. + """ + # Split conservativo: solo quando la punteggiatura è seguita da spazio + # e la parola successiva inizia in maiuscolo (o è fine stringa). + parts = re.split(r'(?<=[.!?»])\s+(?=[A-ZÀÈÉÌÒÙA-Z\"])', text.strip()) + # Se non trova nulla con maiuscola, usa split semplice + if len(parts) <= 1: + parts = re.split(r'(?<=[.!?»])\s+', text.strip()) + return [p.strip() for p in parts if p.strip()] + + +def slugify(s: str, max_len: int = 60) -> str: + """Converti una stringa in slug per chunk_id.""" + s = s.lower() + s = re.sub(r'[^\w\s-]', '', s) + s = re.sub(r'[\s_-]+', '_', s).strip('_') + return s[:max_len] if s else "section" + + +def make_sub_chunks( + body: str, + prefix: str, + sezione: str, + titolo: str, + max_chars: int, + overlap_s: int, +) -> list[dict]: + """ + Suddivide un body in sotto-chunk rispettando max_chars. + Aggiunge overlap_s frasi di overlap tra sotto-chunk consecutivi. + Non attraversa mai i confini del body. + """ + sentences = split_sentences(body) + if not sentences: + return [] + + chunks = [] + current: list[str] = [] + current_len = 0 + sub_index = 0 + + i = 0 + while i < len(sentences): + sent = sentences[i] + # +1 per lo spazio di separazione + if not current or current_len + len(sent) + 1 <= max_chars: + current.append(sent) + current_len += len(sent) + (1 if len(current) > 1 else 0) + i += 1 + else: + # Flush del chunk corrente + chunk_text = prefix + " ".join(current) + chunks.append({ + "chunk_id": f"{slugify(sezione)}__{slugify(titolo)}__s{sub_index}", + "text": chunk_text, + "sezione": sezione, + "titolo": titolo, + "sub_index": sub_index, + "n_chars": len(chunk_text), + }) + sub_index += 1 + # Overlap: riparti dalle ultime overlap_s frasi + overlap = current[-overlap_s:] if overlap_s and len(current) > overlap_s else [] + current = overlap[:] + current_len = sum(len(s) + 1 for s in current) + + # Flush delle frasi rimanenti + if current: + chunk_text = prefix + " ".join(current) + chunks.append({ + "chunk_id": f"{slugify(sezione)}__{slugify(titolo)}__s{sub_index}", + "text": chunk_text, + "sezione": sezione, + "titolo": titolo, + "sub_index": sub_index, + "n_chars": len(chunk_text), + }) + + return chunks + + +# ─── Parser Markdown ────────────────────────────────────────────────────────── + +def parse_h3_sections(text: str) -> list[dict]: + """ + Parsa il documento in sezioni (sezione h2, titolo h3, body). + Testo prima del primo header viene assegnato a sezione vuota. + """ + sections = [] + current_h2 = "" + current_h3 = "" + current_body_lines: list[str] = [] + + def flush(): + body = "\n".join(current_body_lines).strip() + if body: + sections.append({ + "sezione": current_h2, + "titolo": current_h3, + "body": body, + }) + + for line in text.splitlines(): + if re.match(r"^# ", line): + # h1 = titolo documento, non crea sezione + flush() + current_h2 = line[2:].strip() + current_h3 = "" + current_body_lines = [] + elif re.match(r"^## ", line): + flush() + current_h2 = line[3:].strip() + current_h3 = "" + current_body_lines = [] + elif re.match(r"^### ", line): + flush() + current_h3 = line[4:].strip() + current_body_lines = [] + else: + current_body_lines.append(line) + + flush() + return sections + + +def parse_h2_sections(text: str) -> list[dict]: + """Parsa il documento in sezioni h2 con il loro testo completo.""" + sections = [] + current_h2 = "" + current_body_lines: list[str] = [] + + def flush(): + body = "\n".join(current_body_lines).strip() + if body: + sections.append({"sezione": current_h2, "body": body}) + + for line in text.splitlines(): + if re.match(r"^## ", line): + flush() + current_h2 = line[3:].strip() + current_body_lines = [] + elif re.match(r"^# ", line): + flush() + current_h2 = line[2:].strip() + current_body_lines = [] + else: + current_body_lines.append(line) + + flush() + return sections + + +# ─── Strategie di chunking ──────────────────────────────────────────────────── + +def chunk_h3_aware(text: str, stem: str) -> list[dict]: + """ + Strategia h3_aware: boundary su ###. + Sezioni piccole (< MIN_CHARS) vengono accorpate alla successiva + purché appartengano allo stesso ## padre. + Sezioni grandi (> MAX_CHARS) vengono suddivise su frasi. + """ + sections = parse_h3_sections(text) + + # Merge greedy: accorpa al successivo se stesso h2 e body piccolo + merged: list[dict] = [] + pending: dict | None = None + + for sec in sections: + if pending is None: + pending = dict(sec) + continue + + if (pending["sezione"] == sec["sezione"] + and len(pending["body"]) < MIN_CHARS): + sep_title = " / ".join(filter(None, [pending["titolo"], sec["titolo"]])) + pending = { + "sezione": pending["sezione"], + "titolo": sep_title or pending["titolo"], + "body": pending["body"] + "\n\n" + sec["body"], + } + else: + merged.append(pending) + pending = dict(sec) + + if pending: + merged.append(pending) + + # Genera chunk con eventuale split su frasi + chunks = [] + for sec in merged: + sezione = sec["sezione"] or stem + titolo = sec["titolo"] or "" + body = sec["body"] + + prefix = f"[{sezione} > {titolo}]\n" if titolo else f"[{sezione}]\n" + sub = make_sub_chunks(body, prefix, sezione, titolo, MAX_CHARS, OVERLAP_S) + chunks.extend(sub) + + return chunks + + +def chunk_h2_paragraph_split(text: str, stem: str) -> list[dict]: + """ + Strategia h2_paragraph_split: boundary su ##. + All'interno di ogni ## i paragrafi vengono usati come sotto-unità. + """ + sections = parse_h2_sections(text) + chunks = [] + + for sec in sections: + sezione = sec["sezione"] or stem + body = sec["body"] + prefix = f"[{sezione}]\n" + + # Suddividi in paragrafi interni (righe vuote doppie) + paragraphs = [ + p.strip() + for p in re.split(r"\n{2,}", body) + if p.strip() and not re.match(r"^#+\s", p.strip()) + ] + + # Merge paragrafi piccoli + merged_pars: list[str] = [] + pending = "" + for par in paragraphs: + if pending and len(pending) < MIN_CHARS: + pending = pending + "\n\n" + par + else: + if pending: + merged_pars.append(pending) + pending = par + if pending: + merged_pars.append(pending) + + for idx, par in enumerate(merged_pars): + sub = make_sub_chunks(par, prefix, sezione, f"par{idx}", MAX_CHARS, OVERLAP_S) + for c in sub: + c["chunk_id"] = f"{slugify(sezione)}__p{idx}__s{c['sub_index']}" + chunks.extend(sub) + + return chunks + + +def chunk_paragraph(text: str, stem: str) -> list[dict]: + """ + Strategia paragraph: boundary su paragrafo (doppia riga vuota). + """ + paragraphs = [ + p.strip() + for p in re.split(r"\n{2,}", text) + if p.strip() and not re.match(r"^#+\s", p.strip()) + ] + prefix = f"[Documento: {stem}]\n" + + # Merge paragrafi piccoli + merged: list[str] = [] + pending = "" + for par in paragraphs: + if pending and len(pending) < MIN_CHARS: + pending = pending + "\n\n" + par + else: + if pending: + merged.append(pending) + pending = par + if pending: + merged.append(pending) + + chunks = [] + for idx, par in enumerate(merged): + sub = make_sub_chunks(par, prefix, stem, f"par{idx}", MAX_CHARS, OVERLAP_S) + for c in sub: + c["chunk_id"] = f"para__{idx}__s{c['sub_index']}" + chunks.extend(sub) + + return chunks + + +def chunk_sliding_window(text: str, stem: str) -> list[dict]: + """ + Strategia sliding_window: finestre di MAX_CHARS con OVERLAP_S frasi di overlap. + Usata per testi piatti senza struttura (livello 0). + """ + sentences = split_sentences(text) + prefix = f"[Documento: {stem}]\n" + + chunks = [] + i = 0 + win_idx = 0 + + while i < len(sentences): + window: list[str] = [] + cur_len = 0 + + j = i + while j < len(sentences): + s = sentences[j] + if window and cur_len + len(s) + 1 > MAX_CHARS: + break + window.append(s) + cur_len += len(s) + (1 if len(window) > 1 else 0) + j += 1 + + if not window: + window = [sentences[i]] + j = i + 1 + + chunk_text = prefix + " ".join(window) + chunks.append({ + "chunk_id": f"win__{win_idx}", + "text": chunk_text, + "sezione": stem, + "titolo": f"finestra {win_idx}", + "sub_index": win_idx, + "n_chars": len(chunk_text), + }) + win_idx += 1 + # Avanza di (window_size - overlap), almeno 1 + i += max(1, len(window) - OVERLAP_S) + + return chunks + + +# ─── Dispatcher ─────────────────────────────────────────────────────────────── + +_STRATEGIES: dict[str, callable] = { + "h3_aware": chunk_h3_aware, + "h2_paragraph_split": chunk_h2_paragraph_split, + "paragraph": chunk_paragraph, + "sliding_window": chunk_sliding_window, +} + + +def chunk_document(clean_md: Path, profile: dict, stem: str) -> list[dict]: + text = clean_md.read_text(encoding="utf-8") + strategia = profile.get("strategia_chunking", "paragraph") + fn = _STRATEGIES.get(strategia, chunk_paragraph) + return fn(text, stem) + + +# ─── Per-document processing ────────────────────────────────────────────────── + +def process_stem(stem: str, project_root: Path, force: bool) -> bool: + step4_dir = project_root / "step-4" / stem + out_dir = project_root / "step-5" / stem + clean_md = step4_dir / "clean.md" + profile_path = step4_dir / "structure_profile.json" + out_file = out_dir / "chunks.json" + + print(f"\nDocumento: {stem}") + + if not clean_md.exists(): + print(f" ✗ clean.md non trovato in step-4/{stem}/ — skip") + return False + if not profile_path.exists(): + print(f" ✗ structure_profile.json non trovato in step-4/{stem}/ — skip") + return False + + if out_file.exists() and not force: + print(f" ⚠️ chunks.json già presente — skip") + print(f" (usa --force per rieseguire)") + return True + + profile = json.loads(profile_path.read_text(encoding="utf-8")) + strategia = profile.get("strategia_chunking", "paragraph") + print(f" Strategia: {strategia}") + + chunks = chunk_document(clean_md, profile, stem) + + if not chunks: + print(f" ✗ Nessun chunk generato — controlla clean.md") + return False + + out_dir.mkdir(parents=True, exist_ok=True) + out_file.write_text( + json.dumps(chunks, ensure_ascii=False, indent=2), encoding="utf-8" + ) + + lengths = [c["n_chars"] for c in chunks] + min_c = min(lengths) + max_c = max(lengths) + avg_c = int(sum(lengths) / len(lengths)) + short = sum(1 for l in lengths if l < MIN_CHARS) + long_ = sum(1 for l in lengths if l > MAX_CHARS * 1.5) + + print(f" Chunk totali: {len(chunks)}") + print(f" Min: {min_c} char Max: {max_c} char Media: {avg_c} char") + if short: + print(f" ⚠️ {short} chunk sotto MIN_CHARS ({MIN_CHARS})") + if long_: + print(f" ⚠️ {long_} chunk sopra MAX_CHARS×1.5 ({int(MAX_CHARS * 1.5)})") + print(f" ✅ chunks.json salvato in step-5/{stem}/") + return True + + +# ─── Entry point ───────────────────────────────────────────────────────────── + +if __name__ == "__main__": + project_root = Path(__file__).parent.parent + + parser = argparse.ArgumentParser(description="Step 5 — Chunking adattivo") + parser.add_argument("--stem", help="Nome del documento (sottocartella di step-4/)") + parser.add_argument("--force", action="store_true", help="Riesegui anche se già presente") + args = parser.parse_args() + + if args.stem: + stems = [args.stem] + else: + step4_dir = project_root / "step-4" + if not step4_dir.exists(): + print(f"Errore: cartella step-4/ non trovata in {project_root}") + sys.exit(1) + stems = sorted(p.name for p in step4_dir.iterdir() if p.is_dir()) + if not stems: + print(f"Errore: nessun documento trovato in step-4/") + sys.exit(1) + + results = [process_stem(s, project_root, args.force) for s in stems] + + ok = sum(results) + total = len(results) + print(f"\n{'✅' if all(results) else '⚠️ '} {ok}/{total} documenti processati") + sys.exit(0 if all(results) else 1)