Files
davide 5126e0d971 step-5: add adaptive chunker
chunker.py splits any revised Markdown (step-4) into RAG-ready chunks.
Supports 4 strategies driven by structure_profile.json: h3_aware,
h2_paragraph_split, paragraph, sliding_window. Respects MIN/MAX_CHARS
and sentence-level overlap. Updates .gitignore and README paths.
2026-04-13 13:48:51 +02:00

458 lines
15 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
Step 5 — Chunking adattivo
Divide il Markdown revisionato (step 4) in chunk semantici pronti per la
vettorizzazione. La strategia dipende dal profilo strutturale del documento.
Input: step-4/<stem>/clean.md + step-4/<stem>/structure_profile.json
Output: step-5/<stem>/chunks.json
Uso:
python step-5/chunker.py # tutti i documenti in step-4/
python step-5/chunker.py --stem documento # un solo documento
python step-5/chunker.py --stem documento --force
"""
import argparse
import json
import re
import sys
from pathlib import Path
# ─── Parametri ────────────────────────────────────────────────────────────────
MIN_CHARS = 200 # sotto questa soglia → accorpa al chunk successivo
MAX_CHARS = 800 # sopra questa soglia → spezza su frasi
OVERLAP_S = 2 # frasi di overlap tra sotto-chunk dello stesso boundary
# ─── Utilità ──────────────────────────────────────────────────────────────────
def split_sentences(text: str) -> list[str]:
"""
Divide il testo in frasi senza spezzare abbreviazioni comuni.
Split su punteggiatura finale (.!?») seguita da spazio + lettera maiuscola.
"""
# Split conservativo: solo quando la punteggiatura è seguita da spazio
# e la parola successiva inizia in maiuscolo (o è fine stringa).
parts = re.split(r'(?<=[.!?»])\s+(?=[A-ZÀÈÉÌÒÙA-Z\"])', text.strip())
# Se non trova nulla con maiuscola, usa split semplice
if len(parts) <= 1:
parts = re.split(r'(?<=[.!?»])\s+', text.strip())
return [p.strip() for p in parts if p.strip()]
def slugify(s: str, max_len: int = 60) -> str:
"""Converti una stringa in slug per chunk_id."""
s = s.lower()
s = re.sub(r'[^\w\s-]', '', s)
s = re.sub(r'[\s_-]+', '_', s).strip('_')
return s[:max_len] if s else "section"
def make_sub_chunks(
body: str,
prefix: str,
sezione: str,
titolo: str,
max_chars: int,
overlap_s: int,
) -> list[dict]:
"""
Suddivide un body in sotto-chunk rispettando max_chars.
Aggiunge overlap_s frasi di overlap tra sotto-chunk consecutivi.
Non attraversa mai i confini del body.
"""
sentences = split_sentences(body)
if not sentences:
return []
chunks = []
current: list[str] = []
current_len = 0
sub_index = 0
i = 0
while i < len(sentences):
sent = sentences[i]
# +1 per lo spazio di separazione
if not current or current_len + len(sent) + 1 <= max_chars:
current.append(sent)
current_len += len(sent) + (1 if len(current) > 1 else 0)
i += 1
else:
# Flush del chunk corrente
chunk_text = prefix + " ".join(current)
chunks.append({
"chunk_id": f"{slugify(sezione)}__{slugify(titolo)}__s{sub_index}",
"text": chunk_text,
"sezione": sezione,
"titolo": titolo,
"sub_index": sub_index,
"n_chars": len(chunk_text),
})
sub_index += 1
# Overlap: riparti dalle ultime overlap_s frasi
overlap = current[-overlap_s:] if overlap_s and len(current) > overlap_s else []
current = overlap[:]
current_len = sum(len(s) + 1 for s in current)
# Flush delle frasi rimanenti
if current:
chunk_text = prefix + " ".join(current)
chunks.append({
"chunk_id": f"{slugify(sezione)}__{slugify(titolo)}__s{sub_index}",
"text": chunk_text,
"sezione": sezione,
"titolo": titolo,
"sub_index": sub_index,
"n_chars": len(chunk_text),
})
return chunks
# ─── Parser Markdown ──────────────────────────────────────────────────────────
def parse_h3_sections(text: str) -> list[dict]:
"""
Parsa il documento in sezioni (sezione h2, titolo h3, body).
Testo prima del primo header viene assegnato a sezione vuota.
"""
sections = []
current_h2 = ""
current_h3 = ""
current_body_lines: list[str] = []
def flush():
body = "\n".join(current_body_lines).strip()
if body:
sections.append({
"sezione": current_h2,
"titolo": current_h3,
"body": body,
})
for line in text.splitlines():
if re.match(r"^# ", line):
# h1 = titolo documento, non crea sezione
flush()
current_h2 = line[2:].strip()
current_h3 = ""
current_body_lines = []
elif re.match(r"^## ", line):
flush()
current_h2 = line[3:].strip()
current_h3 = ""
current_body_lines = []
elif re.match(r"^### ", line):
flush()
current_h3 = line[4:].strip()
current_body_lines = []
else:
current_body_lines.append(line)
flush()
return sections
def parse_h2_sections(text: str) -> list[dict]:
"""Parsa il documento in sezioni h2 con il loro testo completo."""
sections = []
current_h2 = ""
current_body_lines: list[str] = []
def flush():
body = "\n".join(current_body_lines).strip()
if body:
sections.append({"sezione": current_h2, "body": body})
for line in text.splitlines():
if re.match(r"^## ", line):
flush()
current_h2 = line[3:].strip()
current_body_lines = []
elif re.match(r"^# ", line):
flush()
current_h2 = line[2:].strip()
current_body_lines = []
else:
current_body_lines.append(line)
flush()
return sections
# ─── Strategie di chunking ────────────────────────────────────────────────────
def chunk_h3_aware(text: str, stem: str) -> list[dict]:
"""
Strategia h3_aware: boundary su ###.
Sezioni piccole (< MIN_CHARS) vengono accorpate alla successiva
purché appartengano allo stesso ## padre.
Sezioni grandi (> MAX_CHARS) vengono suddivise su frasi.
"""
sections = parse_h3_sections(text)
# Merge greedy: accorpa al successivo se stesso h2 e body piccolo
merged: list[dict] = []
pending: dict | None = None
for sec in sections:
if pending is None:
pending = dict(sec)
continue
if (pending["sezione"] == sec["sezione"]
and len(pending["body"]) < MIN_CHARS):
sep_title = " / ".join(filter(None, [pending["titolo"], sec["titolo"]]))
pending = {
"sezione": pending["sezione"],
"titolo": sep_title or pending["titolo"],
"body": pending["body"] + "\n\n" + sec["body"],
}
else:
merged.append(pending)
pending = dict(sec)
if pending:
merged.append(pending)
# Genera chunk con eventuale split su frasi
chunks = []
for sec in merged:
sezione = sec["sezione"] or stem
titolo = sec["titolo"] or ""
body = sec["body"]
prefix = f"[{sezione} > {titolo}]\n" if titolo else f"[{sezione}]\n"
sub = make_sub_chunks(body, prefix, sezione, titolo, MAX_CHARS, OVERLAP_S)
chunks.extend(sub)
return chunks
def chunk_h2_paragraph_split(text: str, stem: str) -> list[dict]:
"""
Strategia h2_paragraph_split: boundary su ##.
All'interno di ogni ## i paragrafi vengono usati come sotto-unità.
"""
sections = parse_h2_sections(text)
chunks = []
for sec in sections:
sezione = sec["sezione"] or stem
body = sec["body"]
prefix = f"[{sezione}]\n"
# Suddividi in paragrafi interni (righe vuote doppie)
paragraphs = [
p.strip()
for p in re.split(r"\n{2,}", body)
if p.strip() and not re.match(r"^#+\s", p.strip())
]
# Merge paragrafi piccoli
merged_pars: list[str] = []
pending = ""
for par in paragraphs:
if pending and len(pending) < MIN_CHARS:
pending = pending + "\n\n" + par
else:
if pending:
merged_pars.append(pending)
pending = par
if pending:
merged_pars.append(pending)
for idx, par in enumerate(merged_pars):
sub = make_sub_chunks(par, prefix, sezione, f"par{idx}", MAX_CHARS, OVERLAP_S)
for c in sub:
c["chunk_id"] = f"{slugify(sezione)}__p{idx}__s{c['sub_index']}"
chunks.extend(sub)
return chunks
def chunk_paragraph(text: str, stem: str) -> list[dict]:
"""
Strategia paragraph: boundary su paragrafo (doppia riga vuota).
"""
paragraphs = [
p.strip()
for p in re.split(r"\n{2,}", text)
if p.strip() and not re.match(r"^#+\s", p.strip())
]
prefix = f"[Documento: {stem}]\n"
# Merge paragrafi piccoli
merged: list[str] = []
pending = ""
for par in paragraphs:
if pending and len(pending) < MIN_CHARS:
pending = pending + "\n\n" + par
else:
if pending:
merged.append(pending)
pending = par
if pending:
merged.append(pending)
chunks = []
for idx, par in enumerate(merged):
sub = make_sub_chunks(par, prefix, stem, f"par{idx}", MAX_CHARS, OVERLAP_S)
for c in sub:
c["chunk_id"] = f"para__{idx}__s{c['sub_index']}"
chunks.extend(sub)
return chunks
def chunk_sliding_window(text: str, stem: str) -> list[dict]:
"""
Strategia sliding_window: finestre di MAX_CHARS con OVERLAP_S frasi di overlap.
Usata per testi piatti senza struttura (livello 0).
"""
sentences = split_sentences(text)
prefix = f"[Documento: {stem}]\n"
chunks = []
i = 0
win_idx = 0
while i < len(sentences):
window: list[str] = []
cur_len = 0
j = i
while j < len(sentences):
s = sentences[j]
if window and cur_len + len(s) + 1 > MAX_CHARS:
break
window.append(s)
cur_len += len(s) + (1 if len(window) > 1 else 0)
j += 1
if not window:
window = [sentences[i]]
j = i + 1
chunk_text = prefix + " ".join(window)
chunks.append({
"chunk_id": f"win__{win_idx}",
"text": chunk_text,
"sezione": stem,
"titolo": f"finestra {win_idx}",
"sub_index": win_idx,
"n_chars": len(chunk_text),
})
win_idx += 1
# Avanza di (window_size - overlap), almeno 1
i += max(1, len(window) - OVERLAP_S)
return chunks
# ─── Dispatcher ───────────────────────────────────────────────────────────────
_STRATEGIES: dict[str, callable] = {
"h3_aware": chunk_h3_aware,
"h2_paragraph_split": chunk_h2_paragraph_split,
"paragraph": chunk_paragraph,
"sliding_window": chunk_sliding_window,
}
def chunk_document(clean_md: Path, profile: dict, stem: str) -> list[dict]:
text = clean_md.read_text(encoding="utf-8")
strategia = profile.get("strategia_chunking", "paragraph")
fn = _STRATEGIES.get(strategia, chunk_paragraph)
return fn(text, stem)
# ─── Per-document processing ──────────────────────────────────────────────────
def process_stem(stem: str, project_root: Path, force: bool) -> bool:
step4_dir = project_root / "step-4" / stem
out_dir = project_root / "step-5" / stem
clean_md = step4_dir / "clean.md"
profile_path = step4_dir / "structure_profile.json"
out_file = out_dir / "chunks.json"
print(f"\nDocumento: {stem}")
if not clean_md.exists():
print(f" ✗ clean.md non trovato in step-4/{stem}/ — skip")
return False
if not profile_path.exists():
print(f" ✗ structure_profile.json non trovato in step-4/{stem}/ — skip")
return False
if out_file.exists() and not force:
print(f" ⚠️ chunks.json già presente — skip")
print(f" (usa --force per rieseguire)")
return True
profile = json.loads(profile_path.read_text(encoding="utf-8"))
strategia = profile.get("strategia_chunking", "paragraph")
print(f" Strategia: {strategia}")
chunks = chunk_document(clean_md, profile, stem)
if not chunks:
print(f" ✗ Nessun chunk generato — controlla clean.md")
return False
out_dir.mkdir(parents=True, exist_ok=True)
out_file.write_text(
json.dumps(chunks, ensure_ascii=False, indent=2), encoding="utf-8"
)
lengths = [c["n_chars"] for c in chunks]
min_c = min(lengths)
max_c = max(lengths)
avg_c = int(sum(lengths) / len(lengths))
short = sum(1 for l in lengths if l < MIN_CHARS)
long_ = sum(1 for l in lengths if l > MAX_CHARS * 1.5)
print(f" Chunk totali: {len(chunks)}")
print(f" Min: {min_c} char Max: {max_c} char Media: {avg_c} char")
if short:
print(f" ⚠️ {short} chunk sotto MIN_CHARS ({MIN_CHARS})")
if long_:
print(f" ⚠️ {long_} chunk sopra MAX_CHARS×1.5 ({int(MAX_CHARS * 1.5)})")
print(f" ✅ chunks.json salvato in step-5/{stem}/")
return True
# ─── Entry point ─────────────────────────────────────────────────────────────
if __name__ == "__main__":
project_root = Path(__file__).parent.parent
parser = argparse.ArgumentParser(description="Step 5 — Chunking adattivo")
parser.add_argument("--stem", help="Nome del documento (sottocartella di step-4/)")
parser.add_argument("--force", action="store_true", help="Riesegui anche se già presente")
args = parser.parse_args()
if args.stem:
stems = [args.stem]
else:
step4_dir = project_root / "step-4"
if not step4_dir.exists():
print(f"Errore: cartella step-4/ non trovata in {project_root}")
sys.exit(1)
stems = sorted(p.name for p in step4_dir.iterdir() if p.is_dir())
if not stems:
print(f"Errore: nessun documento trovato in step-4/")
sys.exit(1)
results = [process_stem(s, project_root, args.force) for s in stems]
ok = sum(results)
total = len(results)
print(f"\n{'' if all(results) else '⚠️ '} {ok}/{total} documenti processati")
sys.exit(0 if all(results) else 1)