step-5: add adaptive chunker

chunker.py splits any revised Markdown (step-4) into RAG-ready chunks.
Supports 4 strategies driven by structure_profile.json: h3_aware,
h2_paragraph_split, paragraph, sliding_window. Respects MIN/MAX_CHARS
and sentence-level overlap. Updates .gitignore and README paths.
This commit is contained in:
2026-04-13 13:36:53 +02:00
parent 1631dff80d
commit 5126e0d971
3 changed files with 468 additions and 8 deletions
+3
View File
@@ -37,3 +37,6 @@ step-3/*/
step-4/*/
step-4/revision_log.md
# Output step-5 — chunk generati da chunker.py
step-5/*/
+8 -8
View File
@@ -370,12 +370,12 @@ git commit -m "step-4: revisione nietzsche completata"
### Step 5 — Chunking adattivo
**Tipo:** automatico
**Input:** `processed/documento/clean.md` + `structure_profile.json`
**Output:** `processed/documento/chunks.json`
**Script:** `scripts/chunker.py`
**Input:** `step-4/<stem>/clean.md` + `step-4/<stem>/structure_profile.json`
**Output:** `step-5/<stem>/chunks.json`
**Script:** `step-5/chunker.py`
```bash
python scripts/chunker.py processed/documento/clean.md --save
python step-5/chunker.py --stem documento
```
Divide il Markdown pulito in chunk. Usa il profilo strutturale
@@ -420,12 +420,12 @@ da solo sarebbe ambiguo.
### Step 6 — Verifica chunk
**Tipo:** automatico
**Input:** `processed/documento/chunks.json`
**Input:** `step-5/<stem>/chunks.json`
**Output:** report problemi + statistiche
**Script:** `scripts/verify_chunks.py`
```bash
python scripts/verify_chunks.py processed/documento/chunks.json
python scripts/verify_chunks.py step-5/documento/chunks.json
```
Analizza ogni chunk e segnala i problemi. Non corregge nulla.
@@ -502,12 +502,12 @@ Ollama è sempre disponibile sul sistema.
### Step 8 — Vettorizzazione
**Tipo:** automatico (lento)
**Input:** `processed/documento/chunks.json`
**Input:** `step-5/<stem>/chunks.json`
**Output:** `chroma_db/` popolato
**Script:** `scripts/ingest.py`
```bash
python scripts/ingest.py processed/documento/chunks.json
python scripts/ingest.py step-5/documento/chunks.json
```
Trasforma ogni chunk in un vettore numerico e lo salva in ChromaDB.
+457
View File
@@ -0,0 +1,457 @@
#!/usr/bin/env python3
"""
Step 5 — Chunking adattivo
Divide il Markdown revisionato (step 4) in chunk semantici pronti per la
vettorizzazione. La strategia dipende dal profilo strutturale del documento.
Input: step-4/<stem>/clean.md + step-4/<stem>/structure_profile.json
Output: step-5/<stem>/chunks.json
Uso:
python step-5/chunker.py # tutti i documenti in step-4/
python step-5/chunker.py --stem documento # un solo documento
python step-5/chunker.py --stem documento --force
"""
import argparse
import json
import re
import sys
from pathlib import Path
# ─── Parametri ────────────────────────────────────────────────────────────────
MIN_CHARS = 200 # sotto questa soglia → accorpa al chunk successivo
MAX_CHARS = 800 # sopra questa soglia → spezza su frasi
OVERLAP_S = 2 # frasi di overlap tra sotto-chunk dello stesso boundary
# ─── Utilità ──────────────────────────────────────────────────────────────────
def split_sentences(text: str) -> list[str]:
"""
Divide il testo in frasi senza spezzare abbreviazioni comuni.
Split su punteggiatura finale (.!?») seguita da spazio + lettera maiuscola.
"""
# Split conservativo: solo quando la punteggiatura è seguita da spazio
# e la parola successiva inizia in maiuscolo (o è fine stringa).
parts = re.split(r'(?<=[.!?»])\s+(?=[A-ZÀÈÉÌÒÙA-Z\"])', text.strip())
# Se non trova nulla con maiuscola, usa split semplice
if len(parts) <= 1:
parts = re.split(r'(?<=[.!?»])\s+', text.strip())
return [p.strip() for p in parts if p.strip()]
def slugify(s: str, max_len: int = 60) -> str:
"""Converti una stringa in slug per chunk_id."""
s = s.lower()
s = re.sub(r'[^\w\s-]', '', s)
s = re.sub(r'[\s_-]+', '_', s).strip('_')
return s[:max_len] if s else "section"
def make_sub_chunks(
body: str,
prefix: str,
sezione: str,
titolo: str,
max_chars: int,
overlap_s: int,
) -> list[dict]:
"""
Suddivide un body in sotto-chunk rispettando max_chars.
Aggiunge overlap_s frasi di overlap tra sotto-chunk consecutivi.
Non attraversa mai i confini del body.
"""
sentences = split_sentences(body)
if not sentences:
return []
chunks = []
current: list[str] = []
current_len = 0
sub_index = 0
i = 0
while i < len(sentences):
sent = sentences[i]
# +1 per lo spazio di separazione
if not current or current_len + len(sent) + 1 <= max_chars:
current.append(sent)
current_len += len(sent) + (1 if len(current) > 1 else 0)
i += 1
else:
# Flush del chunk corrente
chunk_text = prefix + " ".join(current)
chunks.append({
"chunk_id": f"{slugify(sezione)}__{slugify(titolo)}__s{sub_index}",
"text": chunk_text,
"sezione": sezione,
"titolo": titolo,
"sub_index": sub_index,
"n_chars": len(chunk_text),
})
sub_index += 1
# Overlap: riparti dalle ultime overlap_s frasi
overlap = current[-overlap_s:] if overlap_s and len(current) > overlap_s else []
current = overlap[:]
current_len = sum(len(s) + 1 for s in current)
# Flush delle frasi rimanenti
if current:
chunk_text = prefix + " ".join(current)
chunks.append({
"chunk_id": f"{slugify(sezione)}__{slugify(titolo)}__s{sub_index}",
"text": chunk_text,
"sezione": sezione,
"titolo": titolo,
"sub_index": sub_index,
"n_chars": len(chunk_text),
})
return chunks
# ─── Parser Markdown ──────────────────────────────────────────────────────────
def parse_h3_sections(text: str) -> list[dict]:
"""
Parsa il documento in sezioni (sezione h2, titolo h3, body).
Testo prima del primo header viene assegnato a sezione vuota.
"""
sections = []
current_h2 = ""
current_h3 = ""
current_body_lines: list[str] = []
def flush():
body = "\n".join(current_body_lines).strip()
if body:
sections.append({
"sezione": current_h2,
"titolo": current_h3,
"body": body,
})
for line in text.splitlines():
if re.match(r"^# ", line):
# h1 = titolo documento, non crea sezione
flush()
current_h2 = line[2:].strip()
current_h3 = ""
current_body_lines = []
elif re.match(r"^## ", line):
flush()
current_h2 = line[3:].strip()
current_h3 = ""
current_body_lines = []
elif re.match(r"^### ", line):
flush()
current_h3 = line[4:].strip()
current_body_lines = []
else:
current_body_lines.append(line)
flush()
return sections
def parse_h2_sections(text: str) -> list[dict]:
"""Parsa il documento in sezioni h2 con il loro testo completo."""
sections = []
current_h2 = ""
current_body_lines: list[str] = []
def flush():
body = "\n".join(current_body_lines).strip()
if body:
sections.append({"sezione": current_h2, "body": body})
for line in text.splitlines():
if re.match(r"^## ", line):
flush()
current_h2 = line[3:].strip()
current_body_lines = []
elif re.match(r"^# ", line):
flush()
current_h2 = line[2:].strip()
current_body_lines = []
else:
current_body_lines.append(line)
flush()
return sections
# ─── Strategie di chunking ────────────────────────────────────────────────────
def chunk_h3_aware(text: str, stem: str) -> list[dict]:
"""
Strategia h3_aware: boundary su ###.
Sezioni piccole (< MIN_CHARS) vengono accorpate alla successiva
purché appartengano allo stesso ## padre.
Sezioni grandi (> MAX_CHARS) vengono suddivise su frasi.
"""
sections = parse_h3_sections(text)
# Merge greedy: accorpa al successivo se stesso h2 e body piccolo
merged: list[dict] = []
pending: dict | None = None
for sec in sections:
if pending is None:
pending = dict(sec)
continue
if (pending["sezione"] == sec["sezione"]
and len(pending["body"]) < MIN_CHARS):
sep_title = " / ".join(filter(None, [pending["titolo"], sec["titolo"]]))
pending = {
"sezione": pending["sezione"],
"titolo": sep_title or pending["titolo"],
"body": pending["body"] + "\n\n" + sec["body"],
}
else:
merged.append(pending)
pending = dict(sec)
if pending:
merged.append(pending)
# Genera chunk con eventuale split su frasi
chunks = []
for sec in merged:
sezione = sec["sezione"] or stem
titolo = sec["titolo"] or ""
body = sec["body"]
prefix = f"[{sezione} > {titolo}]\n" if titolo else f"[{sezione}]\n"
sub = make_sub_chunks(body, prefix, sezione, titolo, MAX_CHARS, OVERLAP_S)
chunks.extend(sub)
return chunks
def chunk_h2_paragraph_split(text: str, stem: str) -> list[dict]:
"""
Strategia h2_paragraph_split: boundary su ##.
All'interno di ogni ## i paragrafi vengono usati come sotto-unità.
"""
sections = parse_h2_sections(text)
chunks = []
for sec in sections:
sezione = sec["sezione"] or stem
body = sec["body"]
prefix = f"[{sezione}]\n"
# Suddividi in paragrafi interni (righe vuote doppie)
paragraphs = [
p.strip()
for p in re.split(r"\n{2,}", body)
if p.strip() and not re.match(r"^#+\s", p.strip())
]
# Merge paragrafi piccoli
merged_pars: list[str] = []
pending = ""
for par in paragraphs:
if pending and len(pending) < MIN_CHARS:
pending = pending + "\n\n" + par
else:
if pending:
merged_pars.append(pending)
pending = par
if pending:
merged_pars.append(pending)
for idx, par in enumerate(merged_pars):
sub = make_sub_chunks(par, prefix, sezione, f"par{idx}", MAX_CHARS, OVERLAP_S)
for c in sub:
c["chunk_id"] = f"{slugify(sezione)}__p{idx}__s{c['sub_index']}"
chunks.extend(sub)
return chunks
def chunk_paragraph(text: str, stem: str) -> list[dict]:
"""
Strategia paragraph: boundary su paragrafo (doppia riga vuota).
"""
paragraphs = [
p.strip()
for p in re.split(r"\n{2,}", text)
if p.strip() and not re.match(r"^#+\s", p.strip())
]
prefix = f"[Documento: {stem}]\n"
# Merge paragrafi piccoli
merged: list[str] = []
pending = ""
for par in paragraphs:
if pending and len(pending) < MIN_CHARS:
pending = pending + "\n\n" + par
else:
if pending:
merged.append(pending)
pending = par
if pending:
merged.append(pending)
chunks = []
for idx, par in enumerate(merged):
sub = make_sub_chunks(par, prefix, stem, f"par{idx}", MAX_CHARS, OVERLAP_S)
for c in sub:
c["chunk_id"] = f"para__{idx}__s{c['sub_index']}"
chunks.extend(sub)
return chunks
def chunk_sliding_window(text: str, stem: str) -> list[dict]:
"""
Strategia sliding_window: finestre di MAX_CHARS con OVERLAP_S frasi di overlap.
Usata per testi piatti senza struttura (livello 0).
"""
sentences = split_sentences(text)
prefix = f"[Documento: {stem}]\n"
chunks = []
i = 0
win_idx = 0
while i < len(sentences):
window: list[str] = []
cur_len = 0
j = i
while j < len(sentences):
s = sentences[j]
if window and cur_len + len(s) + 1 > MAX_CHARS:
break
window.append(s)
cur_len += len(s) + (1 if len(window) > 1 else 0)
j += 1
if not window:
window = [sentences[i]]
j = i + 1
chunk_text = prefix + " ".join(window)
chunks.append({
"chunk_id": f"win__{win_idx}",
"text": chunk_text,
"sezione": stem,
"titolo": f"finestra {win_idx}",
"sub_index": win_idx,
"n_chars": len(chunk_text),
})
win_idx += 1
# Avanza di (window_size - overlap), almeno 1
i += max(1, len(window) - OVERLAP_S)
return chunks
# ─── Dispatcher ───────────────────────────────────────────────────────────────
_STRATEGIES: dict[str, callable] = {
"h3_aware": chunk_h3_aware,
"h2_paragraph_split": chunk_h2_paragraph_split,
"paragraph": chunk_paragraph,
"sliding_window": chunk_sliding_window,
}
def chunk_document(clean_md: Path, profile: dict, stem: str) -> list[dict]:
text = clean_md.read_text(encoding="utf-8")
strategia = profile.get("strategia_chunking", "paragraph")
fn = _STRATEGIES.get(strategia, chunk_paragraph)
return fn(text, stem)
# ─── Per-document processing ──────────────────────────────────────────────────
def process_stem(stem: str, project_root: Path, force: bool) -> bool:
step4_dir = project_root / "step-4" / stem
out_dir = project_root / "step-5" / stem
clean_md = step4_dir / "clean.md"
profile_path = step4_dir / "structure_profile.json"
out_file = out_dir / "chunks.json"
print(f"\nDocumento: {stem}")
if not clean_md.exists():
print(f" ✗ clean.md non trovato in step-4/{stem}/ — skip")
return False
if not profile_path.exists():
print(f" ✗ structure_profile.json non trovato in step-4/{stem}/ — skip")
return False
if out_file.exists() and not force:
print(f" ⚠️ chunks.json già presente — skip")
print(f" (usa --force per rieseguire)")
return True
profile = json.loads(profile_path.read_text(encoding="utf-8"))
strategia = profile.get("strategia_chunking", "paragraph")
print(f" Strategia: {strategia}")
chunks = chunk_document(clean_md, profile, stem)
if not chunks:
print(f" ✗ Nessun chunk generato — controlla clean.md")
return False
out_dir.mkdir(parents=True, exist_ok=True)
out_file.write_text(
json.dumps(chunks, ensure_ascii=False, indent=2), encoding="utf-8"
)
lengths = [c["n_chars"] for c in chunks]
min_c = min(lengths)
max_c = max(lengths)
avg_c = int(sum(lengths) / len(lengths))
short = sum(1 for l in lengths if l < MIN_CHARS)
long_ = sum(1 for l in lengths if l > MAX_CHARS * 1.5)
print(f" Chunk totali: {len(chunks)}")
print(f" Min: {min_c} char Max: {max_c} char Media: {avg_c} char")
if short:
print(f" ⚠️ {short} chunk sotto MIN_CHARS ({MIN_CHARS})")
if long_:
print(f" ⚠️ {long_} chunk sopra MAX_CHARS×1.5 ({int(MAX_CHARS * 1.5)})")
print(f" ✅ chunks.json salvato in step-5/{stem}/")
return True
# ─── Entry point ─────────────────────────────────────────────────────────────
if __name__ == "__main__":
project_root = Path(__file__).parent.parent
parser = argparse.ArgumentParser(description="Step 5 — Chunking adattivo")
parser.add_argument("--stem", help="Nome del documento (sottocartella di step-4/)")
parser.add_argument("--force", action="store_true", help="Riesegui anche se già presente")
args = parser.parse_args()
if args.stem:
stems = [args.stem]
else:
step4_dir = project_root / "step-4"
if not step4_dir.exists():
print(f"Errore: cartella step-4/ non trovata in {project_root}")
sys.exit(1)
stems = sorted(p.name for p in step4_dir.iterdir() if p.is_dir())
if not stems:
print(f"Errore: nessun documento trovato in step-4/")
sys.exit(1)
results = [process_stem(s, project_root, args.force) for s in stems]
ok = sum(results)
total = len(results)
print(f"\n{'' if all(results) else '⚠️ '} {ok}/{total} documenti processati")
sys.exit(0 if all(results) else 1)