diff --git a/step-5/chunker.py b/step-5/chunker.py deleted file mode 100644 index b5ba539..0000000 --- a/step-5/chunker.py +++ /dev/null @@ -1,457 +0,0 @@ -#!/usr/bin/env python3 -""" -Chunking adattivo - -Divide il Markdown revisionato in chunk semantici pronti per la -vettorizzazione. La strategia dipende dal profilo strutturale del documento. - -Input: conversione//clean.md + conversione//structure_profile.json -Output: step-5//chunks.json - -Uso: - python step-5/chunker.py # tutti i documenti in conversione/ - python step-5/chunker.py --stem documento # un solo documento - python step-5/chunker.py --stem documento --force -""" - -import argparse -import json -import re -import sys -from pathlib import Path - - -# ─── Parametri ──────────────────────────────────────────────────────────────── - -MIN_CHARS = 200 # sotto questa soglia → accorpa al chunk successivo -MAX_CHARS = 800 # sopra questa soglia → spezza su frasi -OVERLAP_S = 2 # frasi di overlap tra sotto-chunk dello stesso boundary - - -# ─── Utilità ────────────────────────────────────────────────────────────────── - -def split_sentences(text: str) -> list[str]: - """ - Divide il testo in frasi senza spezzare abbreviazioni comuni. - Split su punteggiatura finale (.!?») seguita da spazio + lettera maiuscola. - """ - # Split conservativo: solo quando la punteggiatura è seguita da spazio - # e la parola successiva inizia in maiuscolo (o è fine stringa). - parts = re.split(r'(?<=[.!?»])\s+(?=[A-ZÀÈÉÌÒÙA-Z\"])', text.strip()) - # Se non trova nulla con maiuscola, usa split semplice - if len(parts) <= 1: - parts = re.split(r'(?<=[.!?»])\s+', text.strip()) - return [p.strip() for p in parts if p.strip()] - - -def slugify(s: str, max_len: int = 60) -> str: - """Converti una stringa in slug per chunk_id.""" - s = s.lower() - s = re.sub(r'[^\w\s-]', '', s) - s = re.sub(r'[\s_-]+', '_', s).strip('_') - return s[:max_len] if s else "section" - - -def make_sub_chunks( - body: str, - prefix: str, - sezione: str, - titolo: str, - max_chars: int, - overlap_s: int, -) -> list[dict]: - """ - Suddivide un body in sotto-chunk rispettando max_chars. - Aggiunge overlap_s frasi di overlap tra sotto-chunk consecutivi. - Non attraversa mai i confini del body. - """ - sentences = split_sentences(body) - if not sentences: - return [] - - chunks = [] - current: list[str] = [] - current_len = 0 - sub_index = 0 - - i = 0 - while i < len(sentences): - sent = sentences[i] - # +1 per lo spazio di separazione - if not current or current_len + len(sent) + 1 <= max_chars: - current.append(sent) - current_len += len(sent) + (1 if len(current) > 1 else 0) - i += 1 - else: - # Flush del chunk corrente - chunk_text = prefix + " ".join(current) - chunks.append({ - "chunk_id": f"{slugify(sezione)}__{slugify(titolo)}__s{sub_index}", - "text": chunk_text, - "sezione": sezione, - "titolo": titolo, - "sub_index": sub_index, - "n_chars": len(chunk_text), - }) - sub_index += 1 - # Overlap: riparti dalle ultime overlap_s frasi - overlap = current[-overlap_s:] if overlap_s and len(current) > overlap_s else [] - current = overlap[:] - current_len = sum(len(s) + 1 for s in current) - - # Flush delle frasi rimanenti - if current: - chunk_text = prefix + " ".join(current) - chunks.append({ - "chunk_id": f"{slugify(sezione)}__{slugify(titolo)}__s{sub_index}", - "text": chunk_text, - "sezione": sezione, - "titolo": titolo, - "sub_index": sub_index, - "n_chars": len(chunk_text), - }) - - return chunks - - -# ─── Parser Markdown ────────────────────────────────────────────────────────── - -def parse_h3_sections(text: str) -> list[dict]: - """ - Parsa il documento in sezioni (sezione h2, titolo h3, body). - Testo prima del primo header viene assegnato a sezione vuota. - """ - sections = [] - current_h2 = "" - current_h3 = "" - current_body_lines: list[str] = [] - - def flush(): - body = "\n".join(current_body_lines).strip() - if body: - sections.append({ - "sezione": current_h2, - "titolo": current_h3, - "body": body, - }) - - for line in text.splitlines(): - if re.match(r"^# ", line): - # h1 = titolo documento, non crea sezione - flush() - current_h2 = line[2:].strip() - current_h3 = "" - current_body_lines = [] - elif re.match(r"^## ", line): - flush() - current_h2 = line[3:].strip() - current_h3 = "" - current_body_lines = [] - elif re.match(r"^### ", line): - flush() - current_h3 = line[4:].strip() - current_body_lines = [] - else: - current_body_lines.append(line) - - flush() - return sections - - -def parse_h2_sections(text: str) -> list[dict]: - """Parsa il documento in sezioni h2 con il loro testo completo.""" - sections = [] - current_h2 = "" - current_body_lines: list[str] = [] - - def flush(): - body = "\n".join(current_body_lines).strip() - if body: - sections.append({"sezione": current_h2, "body": body}) - - for line in text.splitlines(): - if re.match(r"^## ", line): - flush() - current_h2 = line[3:].strip() - current_body_lines = [] - elif re.match(r"^# ", line): - flush() - current_h2 = line[2:].strip() - current_body_lines = [] - else: - current_body_lines.append(line) - - flush() - return sections - - -# ─── Strategie di chunking ──────────────────────────────────────────────────── - -def chunk_h3_aware(text: str, stem: str) -> list[dict]: - """ - Strategia h3_aware: boundary su ###. - Sezioni piccole (< MIN_CHARS) vengono accorpate alla successiva - purché appartengano allo stesso ## padre. - Sezioni grandi (> MAX_CHARS) vengono suddivise su frasi. - """ - sections = parse_h3_sections(text) - - # Merge greedy: accorpa al successivo se stesso h2 e body piccolo - merged: list[dict] = [] - pending: dict | None = None - - for sec in sections: - if pending is None: - pending = dict(sec) - continue - - if (pending["sezione"] == sec["sezione"] - and len(pending["body"]) < MIN_CHARS): - sep_title = " / ".join(filter(None, [pending["titolo"], sec["titolo"]])) - pending = { - "sezione": pending["sezione"], - "titolo": sep_title or pending["titolo"], - "body": pending["body"] + "\n\n" + sec["body"], - } - else: - merged.append(pending) - pending = dict(sec) - - if pending: - merged.append(pending) - - # Genera chunk con eventuale split su frasi - chunks = [] - for sec in merged: - sezione = sec["sezione"] or stem - titolo = sec["titolo"] or "" - body = sec["body"] - - prefix = f"[{sezione} > {titolo}]\n" if titolo else f"[{sezione}]\n" - sub = make_sub_chunks(body, prefix, sezione, titolo, MAX_CHARS, OVERLAP_S) - chunks.extend(sub) - - return chunks - - -def chunk_h2_paragraph_split(text: str, stem: str) -> list[dict]: - """ - Strategia h2_paragraph_split: boundary su ##. - All'interno di ogni ## i paragrafi vengono usati come sotto-unità. - """ - sections = parse_h2_sections(text) - chunks = [] - - for sec in sections: - sezione = sec["sezione"] or stem - body = sec["body"] - prefix = f"[{sezione}]\n" - - # Suddividi in paragrafi interni (righe vuote doppie) - paragraphs = [ - p.strip() - for p in re.split(r"\n{2,}", body) - if p.strip() and not re.match(r"^#+\s", p.strip()) - ] - - # Merge paragrafi piccoli - merged_pars: list[str] = [] - pending = "" - for par in paragraphs: - if pending and len(pending) < MIN_CHARS: - pending = pending + "\n\n" + par - else: - if pending: - merged_pars.append(pending) - pending = par - if pending: - merged_pars.append(pending) - - for idx, par in enumerate(merged_pars): - sub = make_sub_chunks(par, prefix, sezione, f"par{idx}", MAX_CHARS, OVERLAP_S) - for c in sub: - c["chunk_id"] = f"{slugify(sezione)}__p{idx}__s{c['sub_index']}" - chunks.extend(sub) - - return chunks - - -def chunk_paragraph(text: str, stem: str) -> list[dict]: - """ - Strategia paragraph: boundary su paragrafo (doppia riga vuota). - """ - paragraphs = [ - p.strip() - for p in re.split(r"\n{2,}", text) - if p.strip() and not re.match(r"^#+\s", p.strip()) - ] - prefix = f"[Documento: {stem}]\n" - - # Merge paragrafi piccoli - merged: list[str] = [] - pending = "" - for par in paragraphs: - if pending and len(pending) < MIN_CHARS: - pending = pending + "\n\n" + par - else: - if pending: - merged.append(pending) - pending = par - if pending: - merged.append(pending) - - chunks = [] - for idx, par in enumerate(merged): - sub = make_sub_chunks(par, prefix, stem, f"par{idx}", MAX_CHARS, OVERLAP_S) - for c in sub: - c["chunk_id"] = f"para__{idx}__s{c['sub_index']}" - chunks.extend(sub) - - return chunks - - -def chunk_sliding_window(text: str, stem: str) -> list[dict]: - """ - Strategia sliding_window: finestre di MAX_CHARS con OVERLAP_S frasi di overlap. - Usata per testi piatti senza struttura (livello 0). - """ - sentences = split_sentences(text) - prefix = f"[Documento: {stem}]\n" - - chunks = [] - i = 0 - win_idx = 0 - - while i < len(sentences): - window: list[str] = [] - cur_len = 0 - - j = i - while j < len(sentences): - s = sentences[j] - if window and cur_len + len(s) + 1 > MAX_CHARS: - break - window.append(s) - cur_len += len(s) + (1 if len(window) > 1 else 0) - j += 1 - - if not window: - window = [sentences[i]] - j = i + 1 - - chunk_text = prefix + " ".join(window) - chunks.append({ - "chunk_id": f"win__{win_idx}", - "text": chunk_text, - "sezione": stem, - "titolo": f"finestra {win_idx}", - "sub_index": win_idx, - "n_chars": len(chunk_text), - }) - win_idx += 1 - # Avanza di (window_size - overlap), almeno 1 - i += max(1, len(window) - OVERLAP_S) - - return chunks - - -# ─── Dispatcher ─────────────────────────────────────────────────────────────── - -_STRATEGIES: dict[str, callable] = { - "h3_aware": chunk_h3_aware, - "h2_paragraph_split": chunk_h2_paragraph_split, - "paragraph": chunk_paragraph, - "sliding_window": chunk_sliding_window, -} - - -def chunk_document(clean_md: Path, profile: dict, stem: str) -> list[dict]: - text = clean_md.read_text(encoding="utf-8") - strategia = profile.get("strategia_chunking", "paragraph") - fn = _STRATEGIES.get(strategia, chunk_paragraph) - return fn(text, stem) - - -# ─── Per-document processing ────────────────────────────────────────────────── - -def process_stem(stem: str, project_root: Path, force: bool) -> bool: - conv_dir = project_root / "conversione" / stem - out_dir = project_root / "step-5" / stem - clean_md = conv_dir / "clean.md" - profile_path = conv_dir / "structure_profile.json" - out_file = out_dir / "chunks.json" - - print(f"\nDocumento: {stem}") - - if not clean_md.exists(): - print(f" ✗ clean.md non trovato in conversione/{stem}/ — skip") - return False - if not profile_path.exists(): - print(f" ✗ structure_profile.json non trovato in conversione/{stem}/ — skip") - return False - - if out_file.exists() and not force: - print(f" ⚠️ chunks.json già presente — skip") - print(f" (usa --force per rieseguire)") - return True - - profile = json.loads(profile_path.read_text(encoding="utf-8")) - strategia = profile.get("strategia_chunking", "paragraph") - print(f" Strategia: {strategia}") - - chunks = chunk_document(clean_md, profile, stem) - - if not chunks: - print(f" ✗ Nessun chunk generato — controlla clean.md") - return False - - out_dir.mkdir(parents=True, exist_ok=True) - out_file.write_text( - json.dumps(chunks, ensure_ascii=False, indent=2), encoding="utf-8" - ) - - lengths = [c["n_chars"] for c in chunks] - min_c = min(lengths) - max_c = max(lengths) - avg_c = int(sum(lengths) / len(lengths)) - short = sum(1 for l in lengths if l < MIN_CHARS) - long_ = sum(1 for l in lengths if l > MAX_CHARS * 1.5) - - print(f" Chunk totali: {len(chunks)}") - print(f" Min: {min_c} char Max: {max_c} char Media: {avg_c} char") - if short: - print(f" ⚠️ {short} chunk sotto MIN_CHARS ({MIN_CHARS})") - if long_: - print(f" ⚠️ {long_} chunk sopra MAX_CHARS×1.5 ({int(MAX_CHARS * 1.5)})") - print(f" ✅ chunks.json salvato in step-5/{stem}/") - return True - - -# ─── Entry point ───────────────────────────────────────────────────────────── - -if __name__ == "__main__": - project_root = Path(__file__).parent.parent - - parser = argparse.ArgumentParser(description="Chunking adattivo") - parser.add_argument("--stem", help="Nome del documento (sottocartella di conversione/)") - parser.add_argument("--force", action="store_true", help="Riesegui anche se già presente") - args = parser.parse_args() - - if args.stem: - stems = [args.stem] - else: - conv_dir = project_root / "conversione" - if not conv_dir.exists(): - print(f"Errore: cartella conversione/ non trovata in {project_root}") - sys.exit(1) - stems = sorted(p.name for p in conv_dir.iterdir() if p.is_dir() and (p / "clean.md").exists()) - if not stems: - print(f"Errore: nessun documento trovato in conversione/") - sys.exit(1) - - results = [process_stem(s, project_root, args.force) for s in stems] - - ok = sum(results) - total = len(results) - print(f"\n{'✅' if all(results) else '⚠️ '} {ok}/{total} documenti processati") - sys.exit(0 if all(results) else 1) diff --git a/step-6/fix_chunks.py b/step-6/fix_chunks.py deleted file mode 100644 index 448bbc0..0000000 --- a/step-6/fix_chunks.py +++ /dev/null @@ -1,320 +0,0 @@ -#!/usr/bin/env python3 -""" -Step 6 — Fix chunk - -Applica correzioni dirette su step-6//chunks.json basandosi sul report.json -prodotto da verify_chunks.py. Non tocca clean.md né step-5. - -Fixes applicati: - empty → rimuove il chunk - incomplete → fonde con il chunk successivo (la frase continua) - no_prefix → aggiunge prefisso [sezione > titolo] se mancante - too_short → fonde con il chunk adiacente nello stesso sezione - too_long → spezza all'ultimo confine di paragrafo/frase entro MAX_CHARS - -Input: step-6//chunks.json + step-6//report.json -Output: step-6//chunks.json (sovrascrive) - -Uso: - python step-6/fix_chunks.py --stem nietzsche - python step-6/fix_chunks.py --stem nietzsche --dry-run -""" - -import argparse -import json -import re -import sys -from pathlib import Path - -MAX_CHARS = 800 -PUNCT_END = re.compile(r"[.!?»)\]'\u2019\"\u201c\u201d\u2018\u2014\u2013-]$") - - -# ─── Helpers ────────────────────────────────────────────────────────────────── - -def _prefix(chunk: dict) -> str: - """Costruisce il prefisso [sezione > titolo] o [sezione].""" - sezione = chunk.get("sezione", "") - titolo = chunk.get("titolo", "") - if titolo: - return f"[{sezione} > {titolo}]" - return f"[{sezione}]" - - -def _strip_prefix(text: str) -> str: - """Rimuove il prefisso [...] iniziale dal testo, se presente.""" - text = text.lstrip() - if text.startswith("["): - end = text.find("]") - if end != -1: - return text[end + 1:].lstrip("\n") - return text - - -def _rebuild_text(chunk: dict, body: str) -> str: - """Ricompone il testo completo: prefisso + corpo.""" - return f"{_prefix(chunk)}\n{body}" - - -def _split_at_boundary(text: str, max_chars: int) -> list[str]: - """ - Spezza 'text' in segmenti di al massimo max_chars caratteri, - tagliando al confine più vicino (doppio newline, poi fine frase). - Ritorna una lista di segmenti non vuoti. - """ - if len(text) <= max_chars: - return [text] - - parts = [] - remaining = text - - while len(remaining) > max_chars: - # Cerca l'ultimo \n\n entro max_chars - candidate = remaining[:max_chars] - split_pos = candidate.rfind("\n\n") - - if split_pos == -1: - # Cerca l'ultimo . ? ! entro max_chars - m = None - for m in re.finditer(r"[.!?»]\s+", candidate): - pass - split_pos = m.end() if m else None - - if split_pos is None or split_pos == 0: - # Nessun punto naturale: taglia sul primo spazio dopo max_chars - sp = remaining.find(" ", max_chars) - split_pos = sp if sp != -1 else len(remaining) - - parts.append(remaining[:split_pos].rstrip()) - remaining = remaining[split_pos:].lstrip() - - if remaining: - parts.append(remaining) - - return [p for p in parts if p.strip()] - - -# ─── Operazioni sui chunk ───────────────────────────────────────────────────── - -def fix_empty(chunks: list[dict], empty_ids: set[str]) -> tuple[list[dict], int]: - """Rimuove i chunk vuoti.""" - before = len(chunks) - chunks = [c for c in chunks if c["chunk_id"] not in empty_ids] - return chunks, before - len(chunks) - - -def fix_no_prefix(chunks: list[dict], no_prefix_ids: set[str]) -> tuple[list[dict], int]: - """Aggiunge il prefisso mancante ai chunk che ne sono privi.""" - count = 0 - for c in chunks: - if c["chunk_id"] in no_prefix_ids: - body = _strip_prefix(c["text"]) - c["text"] = _rebuild_text(c, body) - c["n_chars"] = len(c["text"]) - count += 1 - return chunks, count - - -def fix_incomplete_and_short(chunks: list[dict], - problem_ids: set[str]) -> tuple[list[dict], int]: - """ - Fonde i chunk problematici (incompleti o troppo corti) con il chunk - immediatamente successivo che appartiene allo stesso sezione. - Usato sia per 'incomplete' che per 'too_short'. - """ - merged = 0 - i = 0 - result: list[dict] = [] - - while i < len(chunks): - c = chunks[i] - if c["chunk_id"] in problem_ids and i + 1 < len(chunks): - nxt = chunks[i + 1] - # Fonde solo se stesso sezione (o se il successivo è compatibile) - body_c = _strip_prefix(c["text"]) - body_nxt = _strip_prefix(nxt["text"]) - merged_body = body_c.rstrip() + "\n" + body_nxt.lstrip() - nxt["text"] = _rebuild_text(nxt, merged_body) - nxt["n_chars"] = len(nxt["text"]) - # Salta c (è stato assorbito in nxt) - merged += 1 - i += 1 - continue - result.append(c) - i += 1 - - return result, merged - - -def fix_too_long(chunks: list[dict], - too_long_ids: set[str], - max_chars: int) -> tuple[list[dict], int]: - """ - Spezza i chunk troppo lunghi al confine naturale più vicino a max_chars. - Ogni sotto-chunk eredita sezione/titolo e riceve un nuovo sub_index. - """ - result: list[dict] = [] - split_count = 0 - - for c in chunks: - if c["chunk_id"] not in too_long_ids: - result.append(c) - continue - - body = _strip_prefix(c["text"]) - parts = _split_at_boundary(body, max_chars) - - if len(parts) == 1: - # Non spezzabile: lascia invariato - result.append(c) - continue - - base_id = re.sub(r"__s\d+$", "", c["chunk_id"]) - base_sub = c.get("sub_index", 0) - - for j, part in enumerate(parts): - new_chunk = dict(c) - new_chunk["sub_index"] = base_sub + j - new_chunk["chunk_id"] = f"{base_id}__s{base_sub + j}" - new_chunk["text"] = _rebuild_text(new_chunk, part) - new_chunk["n_chars"] = len(new_chunk["text"]) - result.append(new_chunk) - - split_count += 1 - - return result, split_count - - -def renumber_ids(chunks: list[dict]) -> list[dict]: - """ - Rinomina i chunk_id per evitare duplicati dopo merge/split. - Mantiene il pattern base__sN dove N è il nuovo indice per sezione/titolo. - """ - seen: dict[str, int] = {} - for c in chunks: - base = re.sub(r"__s\d+$", "", c["chunk_id"]) - idx = seen.get(base, 0) - c["chunk_id"] = f"{base}__s{idx}" - c["sub_index"] = idx - seen[base] = idx + 1 - return chunks - - -# ─── Core ───────────────────────────────────────────────────────────────────── - -def fix_stem(stem: str, project_root: Path, max_chars: int, dry_run: bool) -> bool: - step6_dir = project_root / "step-6" / stem - chunks_path = step6_dir / "chunks.json" - report_path = step6_dir / "report.json" - - if not chunks_path.exists(): - print(f"✗ step-6/{stem}/chunks.json non trovato.") - print(f" Esegui prima: python step-6/verify_chunks.py --stem {stem}") - return False - - if not report_path.exists(): - print(f"✗ step-6/{stem}/report.json non trovato.") - print(f" Esegui prima: python step-6/verify_chunks.py --stem {stem}") - return False - - chunks: list[dict] = json.loads(chunks_path.read_text(encoding="utf-8")) - report: dict = json.loads(report_path.read_text(encoding="utf-8")) - - verdict = report.get("verdict", "ok") - print(f"\nDocumento: {stem} (verdict: {verdict})") - - if verdict == "ok": - print(" ✅ Nessun problema — nulla da correggere.") - return True - - # Raccogli gli ID per categoria - empty_ids = {e["chunk_id"] for e in report.get("blockers", {}).get("empty", [])} - no_prefix_ids = {e["chunk_id"] for e in report.get("blockers", {}).get("no_prefix", [])} - incomplete_ids = {e["chunk_id"] for e in report.get("blockers", {}).get("incomplete", [])} - too_short_ids = {e["chunk_id"] for e in report.get("warnings", {}).get("too_short", [])} - too_long_ids = {e["chunk_id"] for e in report.get("warnings", {}).get("too_long", [])} - - # Riepilogo operazioni pianificate - ops: list[str] = [] - if empty_ids: - ops.append(f" 🗑 rimuovi {len(empty_ids)} chunk vuoti") - if no_prefix_ids: - ops.append(f" 🔧 aggiungi prefisso a {len(no_prefix_ids)} chunk") - if incomplete_ids: - ops.append(f" 🔗 fondi {len(incomplete_ids)} chunk incompleti col successivo") - if too_short_ids: - ops.append(f" 🔗 fondi {len(too_short_ids)} chunk troppo corti col successivo") - if too_long_ids: - ops.append(f" ✂️ spezza {len(too_long_ids)} chunk troppo lunghi") - - if not ops: - print(" ✅ Nessuna correzione necessaria.") - return True - - print("\n Operazioni pianificate:") - for op in ops: - print(op) - - if dry_run: - print("\n [dry-run] Nessuna modifica applicata.") - return True - - n_before = len(chunks) - - # Applica nell'ordine corretto - if empty_ids: - chunks, n = fix_empty(chunks, empty_ids) - print(f"\n 🗑 Rimossi {n} chunk vuoti.") - - if no_prefix_ids: - chunks, n = fix_no_prefix(chunks, no_prefix_ids) - print(f" 🔧 Aggiunto prefisso a {n} chunk.") - - # incomplete prima di too_short (entrambi usano merge-forward) - merge_ids = incomplete_ids | too_short_ids - if merge_ids: - chunks, n = fix_incomplete_and_short(chunks, merge_ids) - print(f" 🔗 Fusi {n} chunk (incompleti + corti).") - - if too_long_ids: - # Aggiorna too_long_ids per chunk che potrebbero essere stati rinominati - # dopo il merge (usa ancora gli ID originali, il merge non tocca too_long) - chunks, n = fix_too_long(chunks, too_long_ids, max_chars) - print(f" ✂️ Spezzati {n} chunk lunghi.") - - # Rinumera per evitare duplicati - chunks = renumber_ids(chunks) - - n_after = len(chunks) - print(f"\n Totale chunk: {n_before} → {n_after}") - - # Salva - chunks_path.write_text( - json.dumps(chunks, ensure_ascii=False, indent=2), encoding="utf-8" - ) - print(f" ✅ Salvato: step-6/{stem}/chunks.json") - print(f"\n Riesegui la verifica:") - print(f" python step-6/verify_chunks.py --stem {stem}") - - return True - - -# ─── Entry point ────────────────────────────────────────────────────────────── - -if __name__ == "__main__": - project_root = Path(__file__).parent.parent - - parser = argparse.ArgumentParser(description="Step 6 — Fix chunk") - parser.add_argument("--stem", required=True, help="Nome del documento (sottocartella di step-6/)") - parser.add_argument( - "--max", type=int, default=MAX_CHARS, - help=f"Soglia massima caratteri per lo split (default: {MAX_CHARS})" - ) - parser.add_argument( - "--dry-run", action="store_true", - help="Mostra le operazioni pianificate senza applicarle" - ) - args = parser.parse_args() - - ok = fix_stem(args.stem, project_root, args.max, args.dry_run) - sys.exit(0 if ok else 1) diff --git a/step-6/verify_chunks.py b/step-6/verify_chunks.py deleted file mode 100644 index cf881af..0000000 --- a/step-6/verify_chunks.py +++ /dev/null @@ -1,321 +0,0 @@ -#!/usr/bin/env python3 -""" -Step 6 — Verifica chunk - -Analizza step-5//chunks.json e segnala ogni anomalia che potrebbe -degradare la qualità del retrieval. Non modifica nulla: se ci sono problemi -torna allo step 4 (revisione MD) o aggiusta i parametri dello step 5. - -Input: step-5//chunks.json -Output: report a schermo + step-6//report.json + exit code (0 = OK, 1 = problemi) - -Uso: - python step-6/verify_chunks.py --stem documento - python step-6/verify_chunks.py # tutti i documenti in step-5/ - python step-6/verify_chunks.py --min 200 --max 800 -""" - -import argparse -import json -import re -import sys -from pathlib import Path - - -# ─── Soglie (devono coincidere con quelle usate in chunker.py) ──────────────── - -MIN_CHARS = 200 -MAX_CHARS = 800 -PUNCT_END = re.compile("[.!?»)\\]'\u2019\"\u201c\u201d\u2018\u2014\u2013\u2026-]$") - - -# ─── Checks ─────────────────────────────────────────────────────────────────── - -def has_prefix(chunk: dict) -> bool: - """Il chunk inizia con il prefisso di contesto '[...'.""" - return chunk.get("text", "").lstrip().startswith("[") - - -def is_empty(chunk: dict) -> bool: - return not chunk.get("text", "").strip() - - -def is_too_short(chunk: dict, min_chars: int) -> bool: - return chunk.get("n_chars", 0) < min_chars - - -def is_too_long(chunk: dict, max_chars: int) -> bool: - return chunk.get("n_chars", 0) > max_chars * 1.5 - - -def ends_incomplete(chunk: dict) -> bool: - """L'ultima riga di testo non termina con punteggiatura di fine frase.""" - text = chunk.get("text", "").rstrip() - if not text: - return False - # Rimuovi marcatori markdown finali (_ e *) prima di controllare: - # pattern come _parola._ o _parola!_ sono frasi complete. - text_check = re.sub(r"[_*]+$", "", text).rstrip() - if not text_check: - return False - return not PUNCT_END.search(text_check) - - -# ─── Report ─────────────────────────────────────────────────────────────────── - -def _fmt_chunk(c: dict) -> str: - cid = c.get("chunk_id", "?") - n = c.get("n_chars", 0) - preview = c.get("text", "")[:60].replace("\n", " ") - return f" [{cid}] ({n} char) «{preview}»" - - -def verify_stem(stem: str, project_root: Path, min_chars: int, max_chars: int) -> bool: - """Verifica i chunk di un documento. Ritorna True se nessun bloccante rilevato.""" - import shutil - - chunks_path = project_root / "step-6" / stem / "chunks.json" - - print(f"\nDocumento: {stem}") - - if not chunks_path.exists(): - src = project_root / "step-5" / stem / "chunks.json" - if src.exists(): - chunks_path.parent.mkdir(parents=True, exist_ok=True) - shutil.copy2(src, chunks_path) - print(f" → Copiato step-5/{stem}/chunks.json → step-6/{stem}/chunks.json") - else: - print(f" ✗ chunks.json non trovato in step-5/{stem}/ né in step-6/{stem}/ — skip") - return False - - chunks: list[dict] = json.loads(chunks_path.read_text(encoding="utf-8")) - - if not chunks: - print(f" ✗ chunks.json è vuoto") - return False - - # ── Raccogli problemi ────────────────────────────────────────────────────── - - empty_chunks = [c for c in chunks if is_empty(c)] - no_prefix = [c for c in chunks if not is_empty(c) and not has_prefix(c)] - too_short = [c for c in chunks if is_too_short(c, min_chars)] - too_long = [c for c in chunks if is_too_long(c, max_chars)] - incomplete = [c for c in chunks if not is_empty(c) and ends_incomplete(c)] - - # ── Statistiche lunghezze ────────────────────────────────────────────────── - - lengths = [c.get("n_chars", 0) for c in chunks] - n_total = len(chunks) - n_ok = n_total - len(set( - c["chunk_id"] for lst in [empty_chunks, no_prefix, too_short, too_long, incomplete] - for c in lst - )) - min_l = min(lengths) - max_l = max(lengths) - avg_l = int(sum(lengths) / n_total) - - # Distribuzione in fasce - n_under = sum(1 for l in lengths if l < min_chars) - n_normal = sum(1 for l in lengths if min_chars <= l <= max_chars) - n_over = sum(1 for l in lengths if l > max_chars) - - # ── Output ──────────────────────────────────────────────────────────────── - - print(f" Totale chunk: {n_total}") - print(f" ✅ OK: {n_ok}") - print() - print(f" Distribuzione lunghezze:") - print(f" Min: {min_l} char") - print(f" Max: {max_l} char") - print(f" Media: {avg_l} char") - print(f" < {min_chars} char (sotto MIN): {n_under}") - print(f" {min_chars}–{max_chars} char (ideale): {n_normal}") - print(f" > {max_chars} char (sopra MAX): {n_over}") - - has_errors = False - - if empty_chunks: - has_errors = True - print(f"\n 🔴 {len(empty_chunks)} chunk VUOTI:") - for c in empty_chunks[:5]: - print(f" [{c.get('chunk_id', '?')}]") - if len(empty_chunks) > 5: - print(f" ... e altri {len(empty_chunks) - 5}") - - if no_prefix: - has_errors = True - print(f"\n 🔴 {len(no_prefix)} chunk SENZA PREFISSO DI CONTESTO:") - for c in no_prefix[:5]: - print(_fmt_chunk(c)) - if len(no_prefix) > 5: - print(f" ... e altri {len(no_prefix) - 5}") - print(f" → Causa probabile: header ### mancanti o malformati nel MD (step 4)") - - if too_short: - has_errors = True - print(f"\n 🟡 {len(too_short)} chunk SOTTO MIN_CHARS ({min_chars}):") - for c in too_short[:5]: - print(_fmt_chunk(c)) - if len(too_short) > 5: - print(f" ... e altri {len(too_short) - 5}") - print(f" → Causa probabile: sezione isolata senza successivo nello stesso ##") - print(f" → Soluzione: abbassa MIN_CHARS o revisiona il MD (step 4)") - - if too_long: - has_errors = True - print(f"\n 🟡 {len(too_long)} chunk SOPRA MAX_CHARS×1.5 ({int(max_chars * 1.5)}):") - for c in too_long[:5]: - print(_fmt_chunk(c)) - if len(too_long) > 5: - print(f" ... e altri {len(too_long) - 5}") - print(f" → Causa probabile: frase singola molto lunga non spezzabile") - print(f" → Soluzione: alza MAX_CHARS o verifica il testo nel MD (step 4)") - - if incomplete: - has_errors = True - print(f"\n 🔴 {len(incomplete)} chunk CHE FINISCONO SENZA PUNTEGGIATURA (frase spezzata):") - for c in incomplete[:5]: - last_line = c.get("text", "").rstrip().split("\n")[-1][-80:] - print(f" [{c.get('chunk_id', '?')}] ...{last_line!r}") - if len(incomplete) > 5: - print(f" ... e altri {len(incomplete) - 5}") - print(f" → Causa probabile: paragrafo spezzato nel MD") - print(f" → Soluzione: correggi le righe spezzate in conversione/{stem}/clean.md") - - # ── Costruisci e salva report.json ─────────────────────────────────────── - - blockers = empty_chunks + no_prefix + incomplete - warnings = too_short + too_long - - def _chunk_entry(c: dict) -> dict: - return { - "chunk_id": c.get("chunk_id", ""), - "sezione": c.get("sezione", ""), - "titolo": c.get("titolo", ""), - "n_chars": c.get("n_chars", 0), - "last_text": c.get("text", "").rstrip().split("\n")[-1][-120:], - } - - if not blockers: - verdict = "ok" if not warnings else "warnings_only" - else: - verdict = "blocked" - - report = { - "stem": stem, - "verdict": verdict, - "stats": { - "total": n_total, - "ok": n_ok, - "min_chars": min_l, - "max_chars": max_l, - "avg_chars": avg_l, - }, - "thresholds": {"min_chars": min_chars, "max_chars": max_chars}, - "blockers": { - "empty": [_chunk_entry(c) for c in empty_chunks], - "no_prefix": [_chunk_entry(c) for c in no_prefix], - "incomplete": [_chunk_entry(c) for c in incomplete], - }, - "warnings": { - "too_short": [_chunk_entry(c) for c in too_short], - "too_long": [_chunk_entry(c) for c in too_long], - }, - } - - out_dir = project_root / "step-6" / stem - out_dir.mkdir(parents=True, exist_ok=True) - (out_dir / "report.json").write_text( - json.dumps(report, ensure_ascii=False, indent=2), encoding="utf-8" - ) - print(f"\n report.json salvato in step-6/{stem}/") - - # ── Prossimi passi ──────────────────────────────────────────────────────── - - print(f"\n {'─' * 50}") - print(f" PROSSIMI PASSI") - print(f" {'─' * 50}") - - if not blockers and not warnings: - print(f" ✅ Tutto OK — procedi alla vettorizzazione:") - print(f" python step-8/ingest.py --stem {stem}") - - elif not blockers: - # Solo 🟡: si può procedere, i warning sono facoltativi - print(f" 🟡 Solo avvisi minori — puoi procedere alla vettorizzazione:") - print(f" python step-8/ingest.py --stem {stem}") - print() - print(f" Oppure, per ottimizzare prima di procedere:") - if too_short: - pct = int(len(too_short) / n_total * 100) - print(f" • {len(too_short)} chunk corti ({pct}% del totale)") - if too_long: - pct = int(len(too_long) / n_total * 100) - print(f" • {len(too_long)} chunk lunghi ({pct}% del totale)") - if too_short or too_long: - print(f" → Esegui: python step-6/fix_chunks.py --stem {stem} --dry-run") - print(f" poi: python step-6/fix_chunks.py --stem {stem}") - print(f" poi: python step-6/verify_chunks.py --stem {stem}") - - else: - # Ci sono 🔴: non si procede - print(f" 🔴 Problemi bloccanti — correggi prima di procedere:") - print() - if empty_chunks: - print(f" • {len(empty_chunks)} chunk vuoti") - print(f" → Controlla conversione/{stem}/clean.md per sezioni prive di testo") - if no_prefix: - print(f" • {len(no_prefix)} chunk senza prefisso di contesto") - print(f" → Controlla che gli header ### siano corretti in conversione/{stem}/clean.md") - if incomplete: - print(f" • {len(incomplete)} chunk con frase spezzata") - print(f" → Esegui: python step-6/fix_chunks.py --stem {stem}") - print() - print(f" Dopo le correzioni, riesegui nell'ordine:") - print(f" python step-5/chunker.py --stem {stem} --force") - print(f" python step-6/verify_chunks.py --stem {stem}") - print() - if warnings: - print(f" 🟡 Hai anche {len(warnings)} avvisi minori — affrontali dopo aver risolto i 🔴.") - - return not blockers - - -# ─── Entry point ────────────────────────────────────────────────────────────── - -if __name__ == "__main__": - project_root = Path(__file__).parent.parent - - parser = argparse.ArgumentParser(description="Step 6 — Verifica chunk") - parser.add_argument("--stem", help="Nome del documento (sottocartella di step-5/)") - parser.add_argument( - "--min", type=int, default=MIN_CHARS, - help=f"Soglia minima caratteri (default: {MIN_CHARS})" - ) - parser.add_argument( - "--max", type=int, default=MAX_CHARS, - help=f"Soglia massima caratteri (default: {MAX_CHARS})" - ) - args = parser.parse_args() - - if args.stem: - stems = [args.stem] - else: - step5_dir = project_root / "step-5" - if not step5_dir.exists(): - print(f"Errore: cartella step-5/ non trovata in {project_root}") - sys.exit(1) - stems = sorted( - p.name for p in step5_dir.iterdir() - if p.is_dir() and (p / "chunks.json").exists() - ) - if not stems: - print("Errore: nessun chunks.json trovato in step-5/") - sys.exit(1) - - results = [verify_stem(s, project_root, args.min, args.max) for s in stems] - - ok = sum(results) - total = len(results) - print(f"\n{'✅' if all(results) else '⚠️ '} {ok}/{total} documenti senza problemi") - sys.exit(0 if all(results) else 1)