diff --git a/.claude/commands/step6-fix.md b/.claude/commands/post-chunk.md similarity index 59% rename from .claude/commands/step6-fix.md rename to .claude/commands/post-chunk.md index 66a88a4..9a6ce50 100644 --- a/.claude/commands/step6-fix.md +++ b/.claude/commands/post-chunk.md @@ -1,25 +1,23 @@ --- -description: Verifica i chunk di step 5, mostra i problemi, propone e applica le fix tramite fix_chunks.py con ri-verifica automatica finale. +description: Perfeziona i chunk di un documento (verifica, dry-run, fix, ri-verifica) e li prepara per la vettorizzazione. allowed-tools: Read Bash Grep argument-hint: --- -## Passo 0 β€” Verifica fresca (sempre) +## Passo 0 β€” Verifica fresca -Esegui sempre `verify_chunks.py` per avere un report aggiornato (non fidarti di un report.json preesistente): +Esegui sempre `verify_chunks.py` per un report aggiornato: ```bash -source .venv/bin/activate && python step-6/verify_chunks.py --stem $ARGUMENTS +source .venv/bin/activate && python chunks/verify_chunks.py --stem $ARGUMENTS ``` ---- - Leggi il report appena generato: !`python3 -c " import json, sys try: - r = json.load(open('step-6/$ARGUMENTS/report.json')) + r = json.load(open('chunks/$ARGUMENTS/report.json')) v = r.get('verdict','?') s = r.get('stats', {}) t = r.get('thresholds', {}) @@ -35,7 +33,7 @@ try: print(f' πŸ”΄ {label}: {len(items)}') for c in items[:3]: print(f' [{c[\"chunk_id\"]}] {c[\"n_chars\"]} char β†’ {c[\"last_text\"][-60:]!r}') - for cat, label in [('too_short','Troppo corti'), ('too_long','Troppo lunghi')]: + for cat, label in [('too_short','Troppo corti'), ('too_long','Troppo lunghi'), ('incomplete_math','Math incompleto')]: items = wa.get(cat, []) if items: print(f' 🟑 {label}: {len(items)}') @@ -48,14 +46,14 @@ except Exception as e: print(f'ERRORE lettura report: {e}') ## Se verdict == "ok" -βœ… Nessun problema. Comunica: +βœ… Nessun problema bloccante. Comunica: ``` -βœ… Chunk puliti β€” procedi con la vettorizzazione: +βœ… Chunk pronti β€” procedi con la vettorizzazione: python step-8/ingest.py --stem $ARGUMENTS ``` -Fermati qui. Non eseguire nessun altro passo. +Se ci sono solo 🟑, spiega brevemente i warning e chiedi se l'utente vuole risolverli prima o procedere. --- @@ -64,15 +62,16 @@ Fermati qui. Non eseguire nessun altro passo. ### Passo 1 β€” Dry-run ```bash -source .venv/bin/activate && python step-6/fix_chunks.py --stem $ARGUMENTS --dry-run +source .venv/bin/activate && python chunks/fix_chunks.py --stem $ARGUMENTS --dry-run ``` Spiega in italiano ogni operazione pianificata: -- **rimuovi chunk vuoti** β€” chunk privi di testo, non contribuiscono al retrieval -- **aggiungi prefisso** β€” il prefisso `[sezione > titolo]` fornisce contesto all'embedding; senza, il chunk Γ¨ semanticamente decontestualizzato + +- **rimuovi chunk vuoti** β€” privi di testo, non contribuiscono al retrieval +- **aggiungi prefisso** β€” `[sezione > titolo]` fornisce contesto all'embedding; senza, il chunk Γ¨ decontestualizzato - **fondi incompleti** β€” frase spezzata a metΓ : il chunk corrente e il successivo formano una frase unica -- **fondi troppo corti** β€” chunk sotto MIN_CHARS: troppo brevi per portare informazione semantica utile -- **spezza troppo lunghi** β€” chunk sopra MAX_CHARSΓ—1.5: troppo densi, degradano la precision del retrieval +- **fondi troppo corti** β€” sotto MIN_CHARS: troppo brevi per portare informazione semantica utile +- **spezza troppo lunghi** β€” sopra MAX_CHARSΓ—1.5: troppo densi, degradano la precision del retrieval Se ci sono solo 🟑 (nessun πŸ”΄), informa che si puΓ² procedere anche senza fix e chiedi la preferenza. @@ -85,16 +84,16 @@ Applica solo su risposta affermativa esplicita. ### Passo 3 β€” Applica ```bash -source .venv/bin/activate && python step-6/fix_chunks.py --stem $ARGUMENTS +source .venv/bin/activate && python chunks/fix_chunks.py --stem $ARGUMENTS ``` ### Passo 4 β€” Ri-verifica automatica ```bash -source .venv/bin/activate && python step-6/verify_chunks.py --stem $ARGUMENTS +source .venv/bin/activate && python chunks/verify_chunks.py --stem $ARGUMENTS ``` -Leggi il nuovo `step-6/$ARGUMENTS/report.json` e riporta: +Leggi il nuovo `chunks/$ARGUMENTS/report.json` e riporta: - Nuovo verdict - Delta chunk (N prima β†’ N dopo) - Problemi residui se presenti @@ -104,17 +103,17 @@ Leggi il nuovo `step-6/$ARGUMENTS/report.json` e riporta: Se verdict finale Γ¨ `ok` o `warnings_only` senza πŸ”΄: ``` -βœ… Chunk pronti in step-6/$ARGUMENTS/chunks.json +βœ… Chunk pronti in chunks/$ARGUMENTS/chunks.json Procedi con la vettorizzazione: python step-8/ingest.py --stem $ARGUMENTS ``` -Se rimangono πŸ”΄ dopo il fix (raro β€” testo non spezzabile o struttura anomala): +Se rimangono πŸ”΄ dopo il fix (testo non spezzabile o struttura anomala nel sorgente): ``` πŸ”΄ X problemi residui non risolvibili automaticamente. - Torna a step-4/$ARGUMENTS/clean.md e correggi manualmente le sezioni indicate, + Torna a conversione/$ARGUMENTS/clean.md e correggi manualmente le sezioni indicate, poi riesegui nell'ordine: - python step-5/chunker.py --stem $ARGUMENTS --force - python step-6/verify_chunks.py --stem $ARGUMENTS + python chunks/chunker.py --stem $ARGUMENTS --force + python chunks/verify_chunks.py --stem $ARGUMENTS ``` diff --git a/step-5/chunker.py b/chunks/chunker.py similarity index 80% rename from step-5/chunker.py rename to chunks/chunker.py index b5ba539..188d95a 100644 --- a/step-5/chunker.py +++ b/chunks/chunker.py @@ -6,12 +6,12 @@ Divide il Markdown revisionato in chunk semantici pronti per la vettorizzazione. La strategia dipende dal profilo strutturale del documento. Input: conversione//clean.md + conversione//structure_profile.json -Output: step-5//chunks.json +Output: chunks//chunks.json Uso: - python step-5/chunker.py # tutti i documenti in conversione/ - python step-5/chunker.py --stem documento # un solo documento - python step-5/chunker.py --stem documento --force + python chunks/chunker.py # tutti i documenti in conversione/ + python chunks/chunker.py --stem documento # un solo documento + python chunks/chunker.py --stem documento --force """ import argparse @@ -31,27 +31,54 @@ OVERLAP_S = 2 # frasi di overlap tra sotto-chunk dello stesso boundary # ─── UtilitΓ  ────────────────────────────────────────────────────────────────── def split_sentences(text: str) -> list[str]: - """ - Divide il testo in frasi senza spezzare abbreviazioni comuni. - Split su punteggiatura finale (.!?Β») seguita da spazio + lettera maiuscola. - """ - # Split conservativo: solo quando la punteggiatura Γ¨ seguita da spazio - # e la parola successiva inizia in maiuscolo (o Γ¨ fine stringa). parts = re.split(r'(?<=[.!?Β»])\s+(?=[A-ZΓ€ΓˆΓ‰ΓŒΓ’Γ™A-Z\"])', text.strip()) - # Se non trova nulla con maiuscola, usa split semplice if len(parts) <= 1: parts = re.split(r'(?<=[.!?Β»])\s+', text.strip()) return [p.strip() for p in parts if p.strip()] def slugify(s: str, max_len: int = 60) -> str: - """Converti una stringa in slug per chunk_id.""" s = s.lower() s = re.sub(r'[^\w\s-]', '', s) s = re.sub(r'[\s_-]+', '_', s).strip('_') return s[:max_len] if s else "section" +_SENT_BOUNDARY = re.compile(r"[.!?Β»)\]'\u2019\"\u201c\u201d/:|\u2026]$") + + +def _flush_chunk( + current: list[str], + sentences: list[str], + i: int, + prefix: str, + sezione: str, + titolo: str, + sub_index: int, + max_chars: int, +) -> tuple[dict, list[str], int, int]: + """Emette un chunk, estendendo fino a un confine di frase (max +20%).""" + hard_limit = int(max_chars * 1.2) + current_len = sum(len(s) + 1 for s in current) + while i < len(sentences) and not _SENT_BOUNDARY.search(" ".join(current)): + nxt = sentences[i] + if current_len + len(nxt) + 1 > hard_limit: + break + current.append(nxt) + current_len += len(nxt) + 1 + i += 1 + chunk_text = prefix + " ".join(current) + chunk = { + "chunk_id": f"{slugify(sezione)}__{slugify(titolo)}__s{sub_index}", + "text": chunk_text, + "sezione": sezione, + "titolo": titolo, + "sub_index": sub_index, + "n_chars": len(chunk_text), + } + return chunk, current, i, sub_index + 1 + + def make_sub_chunks( body: str, prefix: str, @@ -60,11 +87,6 @@ def make_sub_chunks( max_chars: int, overlap_s: int, ) -> list[dict]: - """ - Suddivide un body in sotto-chunk rispettando max_chars. - Aggiunge overlap_s frasi di overlap tra sotto-chunk consecutivi. - Non attraversa mai i confini del body. - """ sentences = split_sentences(body) if not sentences: return [] @@ -77,29 +99,19 @@ def make_sub_chunks( i = 0 while i < len(sentences): sent = sentences[i] - # +1 per lo spazio di separazione if not current or current_len + len(sent) + 1 <= max_chars: current.append(sent) current_len += len(sent) + (1 if len(current) > 1 else 0) i += 1 else: - # Flush del chunk corrente - chunk_text = prefix + " ".join(current) - chunks.append({ - "chunk_id": f"{slugify(sezione)}__{slugify(titolo)}__s{sub_index}", - "text": chunk_text, - "sezione": sezione, - "titolo": titolo, - "sub_index": sub_index, - "n_chars": len(chunk_text), - }) - sub_index += 1 - # Overlap: riparti dalle ultime overlap_s frasi + chunk, current, i, sub_index = _flush_chunk( + current, sentences, i, prefix, sezione, titolo, sub_index, max_chars + ) + chunks.append(chunk) overlap = current[-overlap_s:] if overlap_s and len(current) > overlap_s else [] current = overlap[:] current_len = sum(len(s) + 1 for s in current) - # Flush delle frasi rimanenti if current: chunk_text = prefix + " ".join(current) chunks.append({ @@ -117,10 +129,6 @@ def make_sub_chunks( # ─── Parser Markdown ────────────────────────────────────────────────────────── def parse_h3_sections(text: str) -> list[dict]: - """ - Parsa il documento in sezioni (sezione h2, titolo h3, body). - Testo prima del primo header viene assegnato a sezione vuota. - """ sections = [] current_h2 = "" current_h3 = "" @@ -137,7 +145,6 @@ def parse_h3_sections(text: str) -> list[dict]: for line in text.splitlines(): if re.match(r"^# ", line): - # h1 = titolo documento, non crea sezione flush() current_h2 = line[2:].strip() current_h3 = "" @@ -159,7 +166,6 @@ def parse_h3_sections(text: str) -> list[dict]: def parse_h2_sections(text: str) -> list[dict]: - """Parsa il documento in sezioni h2 con il loro testo completo.""" sections = [] current_h2 = "" current_body_lines: list[str] = [] @@ -188,15 +194,8 @@ def parse_h2_sections(text: str) -> list[dict]: # ─── Strategie di chunking ──────────────────────────────────────────────────── def chunk_h3_aware(text: str, stem: str) -> list[dict]: - """ - Strategia h3_aware: boundary su ###. - Sezioni piccole (< MIN_CHARS) vengono accorpate alla successiva - purchΓ© appartengano allo stesso ## padre. - Sezioni grandi (> MAX_CHARS) vengono suddivise su frasi. - """ sections = parse_h3_sections(text) - # Merge greedy: accorpa al successivo se stesso h2 e body piccolo merged: list[dict] = [] pending: dict | None = None @@ -220,7 +219,6 @@ def chunk_h3_aware(text: str, stem: str) -> list[dict]: if pending: merged.append(pending) - # Genera chunk con eventuale split su frasi chunks = [] for sec in merged: sezione = sec["sezione"] or stem @@ -235,10 +233,6 @@ def chunk_h3_aware(text: str, stem: str) -> list[dict]: def chunk_h2_paragraph_split(text: str, stem: str) -> list[dict]: - """ - Strategia h2_paragraph_split: boundary su ##. - All'interno di ogni ## i paragrafi vengono usati come sotto-unitΓ . - """ sections = parse_h2_sections(text) chunks = [] @@ -247,14 +241,12 @@ def chunk_h2_paragraph_split(text: str, stem: str) -> list[dict]: body = sec["body"] prefix = f"[{sezione}]\n" - # Suddividi in paragrafi interni (righe vuote doppie) paragraphs = [ p.strip() for p in re.split(r"\n{2,}", body) if p.strip() and not re.match(r"^#+\s", p.strip()) ] - # Merge paragrafi piccoli merged_pars: list[str] = [] pending = "" for par in paragraphs: @@ -277,9 +269,6 @@ def chunk_h2_paragraph_split(text: str, stem: str) -> list[dict]: def chunk_paragraph(text: str, stem: str) -> list[dict]: - """ - Strategia paragraph: boundary su paragrafo (doppia riga vuota). - """ paragraphs = [ p.strip() for p in re.split(r"\n{2,}", text) @@ -287,7 +276,6 @@ def chunk_paragraph(text: str, stem: str) -> list[dict]: ] prefix = f"[Documento: {stem}]\n" - # Merge paragrafi piccoli merged: list[str] = [] pending = "" for par in paragraphs: @@ -311,10 +299,6 @@ def chunk_paragraph(text: str, stem: str) -> list[dict]: def chunk_sliding_window(text: str, stem: str) -> list[dict]: - """ - Strategia sliding_window: finestre di MAX_CHARS con OVERLAP_S frasi di overlap. - Usata per testi piatti senza struttura (livello 0). - """ sentences = split_sentences(text) prefix = f"[Documento: {stem}]\n" @@ -349,7 +333,6 @@ def chunk_sliding_window(text: str, stem: str) -> list[dict]: "n_chars": len(chunk_text), }) win_idx += 1 - # Avanza di (window_size - overlap), almeno 1 i += max(1, len(window) - OVERLAP_S) return chunks @@ -375,11 +358,11 @@ def chunk_document(clean_md: Path, profile: dict, stem: str) -> list[dict]: # ─── Per-document processing ────────────────────────────────────────────────── def process_stem(stem: str, project_root: Path, force: bool) -> bool: - conv_dir = project_root / "conversione" / stem - out_dir = project_root / "step-5" / stem - clean_md = conv_dir / "clean.md" + conv_dir = project_root / "conversione" / stem + out_dir = project_root / "chunks" / stem + clean_md = conv_dir / "clean.md" profile_path = conv_dir / "structure_profile.json" - out_file = out_dir / "chunks.json" + out_file = out_dir / "chunks.json" print(f"\nDocumento: {stem}") @@ -395,7 +378,7 @@ def process_stem(stem: str, project_root: Path, force: bool) -> bool: print(f" (usa --force per rieseguire)") return True - profile = json.loads(profile_path.read_text(encoding="utf-8")) + profile = json.loads(profile_path.read_text(encoding="utf-8")) strategia = profile.get("strategia_chunking", "paragraph") print(f" Strategia: {strategia}") @@ -423,7 +406,7 @@ def process_stem(stem: str, project_root: Path, force: bool) -> bool: print(f" ⚠️ {short} chunk sotto MIN_CHARS ({MIN_CHARS})") if long_: print(f" ⚠️ {long_} chunk sopra MAX_CHARSΓ—1.5 ({int(MAX_CHARS * 1.5)})") - print(f" βœ… chunks.json salvato in step-5/{stem}/") + print(f" βœ… chunks.json salvato in chunks/{stem}/") return True @@ -444,14 +427,17 @@ if __name__ == "__main__": if not conv_dir.exists(): print(f"Errore: cartella conversione/ non trovata in {project_root}") sys.exit(1) - stems = sorted(p.name for p in conv_dir.iterdir() if p.is_dir() and (p / "clean.md").exists()) + stems = sorted( + p.name for p in conv_dir.iterdir() + if p.is_dir() and (p / "clean.md").exists() + ) if not stems: print(f"Errore: nessun documento trovato in conversione/") sys.exit(1) results = [process_stem(s, project_root, args.force) for s in stems] - ok = sum(results) + ok = sum(results) total = len(results) print(f"\n{'βœ…' if all(results) else '⚠️ '} {ok}/{total} documenti processati") sys.exit(0 if all(results) else 1) diff --git a/step-6/fix_chunks.py b/chunks/fix_chunks.py similarity index 73% rename from step-6/fix_chunks.py rename to chunks/fix_chunks.py index 448bbc0..e817e51 100644 --- a/step-6/fix_chunks.py +++ b/chunks/fix_chunks.py @@ -1,9 +1,9 @@ #!/usr/bin/env python3 """ -Step 6 β€” Fix chunk +Fix chunk -Applica correzioni dirette su step-6//chunks.json basandosi sul report.json -prodotto da verify_chunks.py. Non tocca clean.md nΓ© step-5. +Applica correzioni dirette su chunks//chunks.json basandosi sul +report.json prodotto da verify_chunks.py. Non tocca clean.md. Fixes applicati: empty β†’ rimuove il chunk @@ -12,12 +12,12 @@ Fixes applicati: too_short β†’ fonde con il chunk adiacente nello stesso sezione too_long β†’ spezza all'ultimo confine di paragrafo/frase entro MAX_CHARS -Input: step-6//chunks.json + step-6//report.json -Output: step-6//chunks.json (sovrascrive) +Input: chunks//chunks.json + chunks//report.json +Output: chunks//chunks.json (sovrascrive) Uso: - python step-6/fix_chunks.py --stem nietzsche - python step-6/fix_chunks.py --stem nietzsche --dry-run + python chunks/fix_chunks.py --stem documento + python chunks/fix_chunks.py --stem documento --dry-run """ import argparse @@ -33,7 +33,6 @@ PUNCT_END = re.compile(r"[.!?Β»)\]'\u2019\"\u201c\u201d\u2018\u2014\u2013-]$") # ─── Helpers ────────────────────────────────────────────────────────────────── def _prefix(chunk: dict) -> str: - """Costruisce il prefisso [sezione > titolo] o [sezione].""" sezione = chunk.get("sezione", "") titolo = chunk.get("titolo", "") if titolo: @@ -42,7 +41,6 @@ def _prefix(chunk: dict) -> str: def _strip_prefix(text: str) -> str: - """Rimuove il prefisso [...] iniziale dal testo, se presente.""" text = text.lstrip() if text.startswith("["): end = text.find("]") @@ -52,16 +50,10 @@ def _strip_prefix(text: str) -> str: def _rebuild_text(chunk: dict, body: str) -> str: - """Ricompone il testo completo: prefisso + corpo.""" return f"{_prefix(chunk)}\n{body}" def _split_at_boundary(text: str, max_chars: int) -> list[str]: - """ - Spezza 'text' in segmenti di al massimo max_chars caratteri, - tagliando al confine piΓΉ vicino (doppio newline, poi fine frase). - Ritorna una lista di segmenti non vuoti. - """ if len(text) <= max_chars: return [text] @@ -69,19 +61,16 @@ def _split_at_boundary(text: str, max_chars: int) -> list[str]: remaining = text while len(remaining) > max_chars: - # Cerca l'ultimo \n\n entro max_chars candidate = remaining[:max_chars] split_pos = candidate.rfind("\n\n") if split_pos == -1: - # Cerca l'ultimo . ? ! entro max_chars m = None for m in re.finditer(r"[.!?Β»]\s+", candidate): pass split_pos = m.end() if m else None if split_pos is None or split_pos == 0: - # Nessun punto naturale: taglia sul primo spazio dopo max_chars sp = remaining.find(" ", max_chars) split_pos = sp if sp != -1 else len(remaining) @@ -97,14 +86,12 @@ def _split_at_boundary(text: str, max_chars: int) -> list[str]: # ─── Operazioni sui chunk ───────────────────────────────────────────────────── def fix_empty(chunks: list[dict], empty_ids: set[str]) -> tuple[list[dict], int]: - """Rimuove i chunk vuoti.""" before = len(chunks) chunks = [c for c in chunks if c["chunk_id"] not in empty_ids] return chunks, before - len(chunks) def fix_no_prefix(chunks: list[dict], no_prefix_ids: set[str]) -> tuple[list[dict], int]: - """Aggiunge il prefisso mancante ai chunk che ne sono privi.""" count = 0 for c in chunks: if c["chunk_id"] in no_prefix_ids: @@ -117,11 +104,6 @@ def fix_no_prefix(chunks: list[dict], no_prefix_ids: set[str]) -> tuple[list[dic def fix_incomplete_and_short(chunks: list[dict], problem_ids: set[str]) -> tuple[list[dict], int]: - """ - Fonde i chunk problematici (incompleti o troppo corti) con il chunk - immediatamente successivo che appartiene allo stesso sezione. - Usato sia per 'incomplete' che per 'too_short'. - """ merged = 0 i = 0 result: list[dict] = [] @@ -130,13 +112,11 @@ def fix_incomplete_and_short(chunks: list[dict], c = chunks[i] if c["chunk_id"] in problem_ids and i + 1 < len(chunks): nxt = chunks[i + 1] - # Fonde solo se stesso sezione (o se il successivo Γ¨ compatibile) body_c = _strip_prefix(c["text"]) body_nxt = _strip_prefix(nxt["text"]) merged_body = body_c.rstrip() + "\n" + body_nxt.lstrip() - nxt["text"] = _rebuild_text(nxt, merged_body) + nxt["text"] = _rebuild_text(nxt, merged_body) nxt["n_chars"] = len(nxt["text"]) - # Salta c (Γ¨ stato assorbito in nxt) merged += 1 i += 1 continue @@ -149,10 +129,6 @@ def fix_incomplete_and_short(chunks: list[dict], def fix_too_long(chunks: list[dict], too_long_ids: set[str], max_chars: int) -> tuple[list[dict], int]: - """ - Spezza i chunk troppo lunghi al confine naturale piΓΉ vicino a max_chars. - Ogni sotto-chunk eredita sezione/titolo e riceve un nuovo sub_index. - """ result: list[dict] = [] split_count = 0 @@ -161,15 +137,14 @@ def fix_too_long(chunks: list[dict], result.append(c) continue - body = _strip_prefix(c["text"]) + body = _strip_prefix(c["text"]) parts = _split_at_boundary(body, max_chars) if len(parts) == 1: - # Non spezzabile: lascia invariato result.append(c) continue - base_id = re.sub(r"__s\d+$", "", c["chunk_id"]) + base_id = re.sub(r"__s\d+$", "", c["chunk_id"]) base_sub = c.get("sub_index", 0) for j, part in enumerate(parts): @@ -186,15 +161,11 @@ def fix_too_long(chunks: list[dict], def renumber_ids(chunks: list[dict]) -> list[dict]: - """ - Rinomina i chunk_id per evitare duplicati dopo merge/split. - Mantiene il pattern base__sN dove N Γ¨ il nuovo indice per sezione/titolo. - """ seen: dict[str, int] = {} for c in chunks: base = re.sub(r"__s\d+$", "", c["chunk_id"]) - idx = seen.get(base, 0) - c["chunk_id"] = f"{base}__s{idx}" + idx = seen.get(base, 0) + c["chunk_id"] = f"{base}__s{idx}" c["sub_index"] = idx seen[base] = idx + 1 return chunks @@ -203,18 +174,18 @@ def renumber_ids(chunks: list[dict]) -> list[dict]: # ─── Core ───────────────────────────────────────────────────────────────────── def fix_stem(stem: str, project_root: Path, max_chars: int, dry_run: bool) -> bool: - step6_dir = project_root / "step-6" / stem - chunks_path = step6_dir / "chunks.json" - report_path = step6_dir / "report.json" + stem_dir = project_root / "chunks" / stem + chunks_path = stem_dir / "chunks.json" + report_path = stem_dir / "report.json" if not chunks_path.exists(): - print(f"βœ— step-6/{stem}/chunks.json non trovato.") - print(f" Esegui prima: python step-6/verify_chunks.py --stem {stem}") + print(f"βœ— chunks/{stem}/chunks.json non trovato.") + print(f" Esegui prima: python chunks/chunker.py --stem {stem}") return False if not report_path.exists(): - print(f"βœ— step-6/{stem}/report.json non trovato.") - print(f" Esegui prima: python step-6/verify_chunks.py --stem {stem}") + print(f"βœ— chunks/{stem}/report.json non trovato.") + print(f" Esegui prima: python chunks/verify_chunks.py --stem {stem}") return False chunks: list[dict] = json.loads(chunks_path.read_text(encoding="utf-8")) @@ -227,14 +198,12 @@ def fix_stem(stem: str, project_root: Path, max_chars: int, dry_run: bool) -> bo print(" βœ… Nessun problema β€” nulla da correggere.") return True - # Raccogli gli ID per categoria empty_ids = {e["chunk_id"] for e in report.get("blockers", {}).get("empty", [])} no_prefix_ids = {e["chunk_id"] for e in report.get("blockers", {}).get("no_prefix", [])} incomplete_ids = {e["chunk_id"] for e in report.get("blockers", {}).get("incomplete", [])} too_short_ids = {e["chunk_id"] for e in report.get("warnings", {}).get("too_short", [])} too_long_ids = {e["chunk_id"] for e in report.get("warnings", {}).get("too_long", [])} - # Riepilogo operazioni pianificate ops: list[str] = [] if empty_ids: ops.append(f" πŸ—‘ rimuovi {len(empty_ids)} chunk vuoti") @@ -261,7 +230,6 @@ def fix_stem(stem: str, project_root: Path, max_chars: int, dry_run: bool) -> bo n_before = len(chunks) - # Applica nell'ordine corretto if empty_ids: chunks, n = fix_empty(chunks, empty_ids) print(f"\n πŸ—‘ Rimossi {n} chunk vuoti.") @@ -270,31 +238,26 @@ def fix_stem(stem: str, project_root: Path, max_chars: int, dry_run: bool) -> bo chunks, n = fix_no_prefix(chunks, no_prefix_ids) print(f" πŸ”§ Aggiunto prefisso a {n} chunk.") - # incomplete prima di too_short (entrambi usano merge-forward) merge_ids = incomplete_ids | too_short_ids if merge_ids: chunks, n = fix_incomplete_and_short(chunks, merge_ids) print(f" πŸ”— Fusi {n} chunk (incompleti + corti).") if too_long_ids: - # Aggiorna too_long_ids per chunk che potrebbero essere stati rinominati - # dopo il merge (usa ancora gli ID originali, il merge non tocca too_long) chunks, n = fix_too_long(chunks, too_long_ids, max_chars) print(f" βœ‚οΈ Spezzati {n} chunk lunghi.") - # Rinumera per evitare duplicati chunks = renumber_ids(chunks) n_after = len(chunks) print(f"\n Totale chunk: {n_before} β†’ {n_after}") - # Salva chunks_path.write_text( json.dumps(chunks, ensure_ascii=False, indent=2), encoding="utf-8" ) - print(f" βœ… Salvato: step-6/{stem}/chunks.json") + print(f" βœ… Salvato: chunks/{stem}/chunks.json") print(f"\n Riesegui la verifica:") - print(f" python step-6/verify_chunks.py --stem {stem}") + print(f" python chunks/verify_chunks.py --stem {stem}") return True @@ -304,8 +267,8 @@ def fix_stem(stem: str, project_root: Path, max_chars: int, dry_run: bool) -> bo if __name__ == "__main__": project_root = Path(__file__).parent.parent - parser = argparse.ArgumentParser(description="Step 6 β€” Fix chunk") - parser.add_argument("--stem", required=True, help="Nome del documento (sottocartella di step-6/)") + parser = argparse.ArgumentParser(description="Fix chunk") + parser.add_argument("--stem", required=True, help="Nome del documento (sottocartella di chunks/)") parser.add_argument( "--max", type=int, default=MAX_CHARS, help=f"Soglia massima caratteri per lo split (default: {MAX_CHARS})" diff --git a/step-6/verify_chunks.py b/chunks/verify_chunks.py similarity index 64% rename from step-6/verify_chunks.py rename to chunks/verify_chunks.py index cf881af..d682748 100644 --- a/step-6/verify_chunks.py +++ b/chunks/verify_chunks.py @@ -1,18 +1,17 @@ #!/usr/bin/env python3 """ -Step 6 β€” Verifica chunk +Verifica chunk -Analizza step-5//chunks.json e segnala ogni anomalia che potrebbe -degradare la qualitΓ  del retrieval. Non modifica nulla: se ci sono problemi -torna allo step 4 (revisione MD) o aggiusta i parametri dello step 5. +Analizza chunks//chunks.json e segnala ogni anomalia che potrebbe +degradare la qualitΓ  del retrieval. Non modifica nulla. -Input: step-5//chunks.json -Output: report a schermo + step-6//report.json + exit code (0 = OK, 1 = problemi) +Input: chunks//chunks.json +Output: report a schermo + chunks//report.json + exit code (0 = OK, 1 = problemi) Uso: - python step-6/verify_chunks.py --stem documento - python step-6/verify_chunks.py # tutti i documenti in step-5/ - python step-6/verify_chunks.py --min 200 --max 800 + python chunks/verify_chunks.py --stem documento + python chunks/verify_chunks.py # tutti i documenti in chunks/ + python chunks/verify_chunks.py --min 200 --max 800 """ import argparse @@ -22,17 +21,24 @@ import sys from pathlib import Path -# ─── Soglie (devono coincidere con quelle usate in chunker.py) ──────────────── +# ─── Soglie ─────────────────────────────────────────────────────────────────── MIN_CHARS = 200 MAX_CHARS = 800 -PUNCT_END = re.compile("[.!?Β»)\\]'\u2019\"\u201c\u201d\u2018\u2014\u2013\u2026-]$") +PUNCT_END = re.compile( + r"[.!?Β»)\]'\u2019\"\u201c\u201d\u2018\u2014\u2013\u2026]$" + r"|/$" # URL che finisce con / + r"|\|$" # riga di tabella Markdown + r"|:$" # introduzione a lista o formula +) +_HEX_END = re.compile(r"[0-9a-fA-F]{8,}$") +_URL_TAIL = re.compile(r"https?://\S+(\s+\S+){0,3}$") # URL con fino a 3 token extra +_MATH_SYMS = re.compile(r"[βˆˆβˆ‘β‰€β‰₯β‰ βˆ€βˆƒβˆ«βˆšβˆžβˆ‚Β±Γ—Γ·β†’β†β†”βŠ‚βŠƒβŠ†βŠ‡βˆ©βˆͺΒ·Β°]") # ─── Checks ─────────────────────────────────────────────────────────────────── def has_prefix(chunk: dict) -> bool: - """Il chunk inizia con il prefisso di contesto '[...'.""" return chunk.get("text", "").lstrip().startswith("[") @@ -49,44 +55,44 @@ def is_too_long(chunk: dict, max_chars: int) -> bool: def ends_incomplete(chunk: dict) -> bool: - """L'ultima riga di testo non termina con punteggiatura di fine frase.""" text = chunk.get("text", "").rstrip() if not text: return False - # Rimuovi marcatori markdown finali (_ e *) prima di controllare: - # pattern come _parola._ o _parola!_ sono frasi complete. text_check = re.sub(r"[_*]+$", "", text).rstrip() if not text_check: return False - return not PUNCT_END.search(text_check) + if PUNCT_END.search(text_check): + return False + if _HEX_END.search(text_check): # hash SHA / codice hex + return False + if _URL_TAIL.search(text_check[-200:]): # URL (con eventuale path dopo spazio) + return False + return True + + +def is_math_incomplete(chunk: dict) -> bool: + """Incompleto ma in contesto matematico β€” degrada a warning invece di blocker.""" + return ends_incomplete(chunk) and len(_MATH_SYMS.findall(chunk.get("text", ""))) >= 3 # ─── Report ─────────────────────────────────────────────────────────────────── def _fmt_chunk(c: dict) -> str: - cid = c.get("chunk_id", "?") - n = c.get("n_chars", 0) + cid = c.get("chunk_id", "?") + n = c.get("n_chars", 0) preview = c.get("text", "")[:60].replace("\n", " ") return f" [{cid}] ({n} char) Β«{preview}Β»" def verify_stem(stem: str, project_root: Path, min_chars: int, max_chars: int) -> bool: - """Verifica i chunk di un documento. Ritorna True se nessun bloccante rilevato.""" - import shutil - - chunks_path = project_root / "step-6" / stem / "chunks.json" + chunks_path = project_root / "chunks" / stem / "chunks.json" print(f"\nDocumento: {stem}") if not chunks_path.exists(): - src = project_root / "step-5" / stem / "chunks.json" - if src.exists(): - chunks_path.parent.mkdir(parents=True, exist_ok=True) - shutil.copy2(src, chunks_path) - print(f" β†’ Copiato step-5/{stem}/chunks.json β†’ step-6/{stem}/chunks.json") - else: - print(f" βœ— chunks.json non trovato in step-5/{stem}/ nΓ© in step-6/{stem}/ β€” skip") - return False + print(f" βœ— chunks/{stem}/chunks.json non trovato") + print(f" Esegui prima: python chunks/chunker.py --stem {stem}") + return False chunks: list[dict] = json.loads(chunks_path.read_text(encoding="utf-8")) @@ -96,25 +102,27 @@ def verify_stem(stem: str, project_root: Path, min_chars: int, max_chars: int) - # ── Raccogli problemi ────────────────────────────────────────────────────── - empty_chunks = [c for c in chunks if is_empty(c)] - no_prefix = [c for c in chunks if not is_empty(c) and not has_prefix(c)] - too_short = [c for c in chunks if is_too_short(c, min_chars)] - too_long = [c for c in chunks if is_too_long(c, max_chars)] - incomplete = [c for c in chunks if not is_empty(c) and ends_incomplete(c)] + empty_chunks = [c for c in chunks if is_empty(c)] + no_prefix = [c for c in chunks if not is_empty(c) and not has_prefix(c)] + too_short = [c for c in chunks if is_too_short(c, min_chars)] + too_long = [c for c in chunks if is_too_long(c, max_chars)] + _incomplete_all = [c for c in chunks if not is_empty(c) and ends_incomplete(c)] + incomplete_math = [c for c in _incomplete_all if is_math_incomplete(c)] + incomplete = [c for c in _incomplete_all if not is_math_incomplete(c)] - # ── Statistiche lunghezze ────────────────────────────────────────────────── + # ── Statistiche ─────────────────────────────────────────────────────────── lengths = [c.get("n_chars", 0) for c in chunks] n_total = len(chunks) n_ok = n_total - len(set( - c["chunk_id"] for lst in [empty_chunks, no_prefix, too_short, too_long, incomplete] + c["chunk_id"] + for lst in [empty_chunks, no_prefix, too_short, too_long, incomplete] for c in lst )) - min_l = min(lengths) - max_l = max(lengths) - avg_l = int(sum(lengths) / n_total) + min_l = min(lengths) + max_l = max(lengths) + avg_l = int(sum(lengths) / n_total) - # Distribuzione in fasce n_under = sum(1 for l in lengths if l < min_chars) n_normal = sum(1 for l in lengths if min_chars <= l <= max_chars) n_over = sum(1 for l in lengths if l > max_chars) @@ -149,7 +157,7 @@ def verify_stem(stem: str, project_root: Path, min_chars: int, max_chars: int) - print(_fmt_chunk(c)) if len(no_prefix) > 5: print(f" ... e altri {len(no_prefix) - 5}") - print(f" β†’ Causa probabile: header ### mancanti o malformati nel MD (step 4)") + print(f" β†’ Causa probabile: header ### mancanti o malformati nel MD") if too_short: has_errors = True @@ -158,8 +166,7 @@ def verify_stem(stem: str, project_root: Path, min_chars: int, max_chars: int) - print(_fmt_chunk(c)) if len(too_short) > 5: print(f" ... e altri {len(too_short) - 5}") - print(f" β†’ Causa probabile: sezione isolata senza successivo nello stesso ##") - print(f" β†’ Soluzione: abbassa MIN_CHARS o revisiona il MD (step 4)") + print(f" β†’ Soluzione: abbassa MIN_CHARS o revisiona il MD") if too_long: has_errors = True @@ -168,8 +175,7 @@ def verify_stem(stem: str, project_root: Path, min_chars: int, max_chars: int) - print(_fmt_chunk(c)) if len(too_long) > 5: print(f" ... e altri {len(too_long) - 5}") - print(f" β†’ Causa probabile: frase singola molto lunga non spezzabile") - print(f" β†’ Soluzione: alza MAX_CHARS o verifica il testo nel MD (step 4)") + print(f" β†’ Soluzione: alza MAX_CHARS o verifica il testo nel MD") if incomplete: has_errors = True @@ -179,30 +185,38 @@ def verify_stem(stem: str, project_root: Path, min_chars: int, max_chars: int) - print(f" [{c.get('chunk_id', '?')}] ...{last_line!r}") if len(incomplete) > 5: print(f" ... e altri {len(incomplete) - 5}") - print(f" β†’ Causa probabile: paragrafo spezzato nel MD") print(f" β†’ Soluzione: correggi le righe spezzate in conversione/{stem}/clean.md") - # ── Costruisci e salva report.json ─────────────────────────────────────── + if incomplete_math: + has_errors = True + print(f"\n 🟑 {len(incomplete_math)} chunk MATEMATICI SENZA PUNTEGGIATURA (formula/espressione):") + for c in incomplete_math[:3]: + last_line = c.get("text", "").rstrip().split("\n")[-1][-80:] + print(f" [{c.get('chunk_id', '?')}] ...{last_line!r}") + if len(incomplete_math) > 3: + print(f" ... e altri {len(incomplete_math) - 3}") + print(f" β†’ Le formule non finiscono con punteggiatura β€” avviso non bloccante") + + # ── Costruisci e salva report.json ──────────────────────────────────────── blockers = empty_chunks + no_prefix + incomplete - warnings = too_short + too_long + warnings = too_short + too_long + incomplete_math def _chunk_entry(c: dict) -> dict: return { - "chunk_id": c.get("chunk_id", ""), - "sezione": c.get("sezione", ""), - "titolo": c.get("titolo", ""), - "n_chars": c.get("n_chars", 0), + "chunk_id": c.get("chunk_id", ""), + "sezione": c.get("sezione", ""), + "titolo": c.get("titolo", ""), + "n_chars": c.get("n_chars", 0), "last_text": c.get("text", "").rstrip().split("\n")[-1][-120:], } - if not blockers: - verdict = "ok" if not warnings else "warnings_only" - else: - verdict = "blocked" + verdict = "ok" if not blockers else "blocked" + if not blockers and warnings: + verdict = "warnings_only" report = { - "stem": stem, + "stem": stem, "verdict": verdict, "stats": { "total": n_total, @@ -218,17 +232,18 @@ def verify_stem(stem: str, project_root: Path, min_chars: int, max_chars: int) - "incomplete": [_chunk_entry(c) for c in incomplete], }, "warnings": { - "too_short": [_chunk_entry(c) for c in too_short], - "too_long": [_chunk_entry(c) for c in too_long], + "too_short": [_chunk_entry(c) for c in too_short], + "too_long": [_chunk_entry(c) for c in too_long], + "incomplete_math": [_chunk_entry(c) for c in incomplete_math], }, } - out_dir = project_root / "step-6" / stem + out_dir = project_root / "chunks" / stem out_dir.mkdir(parents=True, exist_ok=True) (out_dir / "report.json").write_text( json.dumps(report, ensure_ascii=False, indent=2), encoding="utf-8" ) - print(f"\n report.json salvato in step-6/{stem}/") + print(f"\n report.json salvato in chunks/{stem}/") # ── Prossimi passi ──────────────────────────────────────────────────────── @@ -241,11 +256,10 @@ def verify_stem(stem: str, project_root: Path, min_chars: int, max_chars: int) - print(f" python step-8/ingest.py --stem {stem}") elif not blockers: - # Solo 🟑: si puΓ² procedere, i warning sono facoltativi print(f" 🟑 Solo avvisi minori β€” puoi procedere alla vettorizzazione:") print(f" python step-8/ingest.py --stem {stem}") print() - print(f" Oppure, per ottimizzare prima di procedere:") + print(f" Oppure, per ottimizzare prima:") if too_short: pct = int(len(too_short) / n_total * 100) print(f" β€’ {len(too_short)} chunk corti ({pct}% del totale)") @@ -253,12 +267,11 @@ def verify_stem(stem: str, project_root: Path, min_chars: int, max_chars: int) - pct = int(len(too_long) / n_total * 100) print(f" β€’ {len(too_long)} chunk lunghi ({pct}% del totale)") if too_short or too_long: - print(f" β†’ Esegui: python step-6/fix_chunks.py --stem {stem} --dry-run") - print(f" poi: python step-6/fix_chunks.py --stem {stem}") - print(f" poi: python step-6/verify_chunks.py --stem {stem}") + print(f" β†’ Esegui: python chunks/fix_chunks.py --stem {stem} --dry-run") + print(f" poi: python chunks/fix_chunks.py --stem {stem}") + print(f" poi: python chunks/verify_chunks.py --stem {stem}") else: - # Ci sono πŸ”΄: non si procede print(f" πŸ”΄ Problemi bloccanti β€” correggi prima di procedere:") print() if empty_chunks: @@ -269,11 +282,11 @@ def verify_stem(stem: str, project_root: Path, min_chars: int, max_chars: int) - print(f" β†’ Controlla che gli header ### siano corretti in conversione/{stem}/clean.md") if incomplete: print(f" β€’ {len(incomplete)} chunk con frase spezzata") - print(f" β†’ Esegui: python step-6/fix_chunks.py --stem {stem}") + print(f" β†’ Esegui: python chunks/fix_chunks.py --stem {stem}") print() print(f" Dopo le correzioni, riesegui nell'ordine:") - print(f" python step-5/chunker.py --stem {stem} --force") - print(f" python step-6/verify_chunks.py --stem {stem}") + print(f" python chunks/chunker.py --stem {stem} --force") + print(f" python chunks/verify_chunks.py --stem {stem}") print() if warnings: print(f" 🟑 Hai anche {len(warnings)} avvisi minori β€” affrontali dopo aver risolto i πŸ”΄.") @@ -286,8 +299,8 @@ def verify_stem(stem: str, project_root: Path, min_chars: int, max_chars: int) - if __name__ == "__main__": project_root = Path(__file__).parent.parent - parser = argparse.ArgumentParser(description="Step 6 β€” Verifica chunk") - parser.add_argument("--stem", help="Nome del documento (sottocartella di step-5/)") + parser = argparse.ArgumentParser(description="Verifica chunk") + parser.add_argument("--stem", help="Nome del documento (sottocartella di chunks/)") parser.add_argument( "--min", type=int, default=MIN_CHARS, help=f"Soglia minima caratteri (default: {MIN_CHARS})" @@ -301,16 +314,16 @@ if __name__ == "__main__": if args.stem: stems = [args.stem] else: - step5_dir = project_root / "step-5" - if not step5_dir.exists(): - print(f"Errore: cartella step-5/ non trovata in {project_root}") + chunks_dir = project_root / "chunks" + if not chunks_dir.exists(): + print(f"Errore: cartella chunks/ non trovata in {project_root}") sys.exit(1) stems = sorted( - p.name for p in step5_dir.iterdir() + p.name for p in chunks_dir.iterdir() if p.is_dir() and (p / "chunks.json").exists() ) if not stems: - print("Errore: nessun chunks.json trovato in step-5/") + print("Errore: nessun chunks.json trovato in chunks/") sys.exit(1) results = [verify_stem(s, project_root, args.min, args.max) for s in stems]