e70a9a41f0
- verify_chunks.py now reads from step-6/<stem>/chunks.json and auto-copies from step-5 on first run (input and output both in step-6) - fix_chunks.py: new script that applies fixes directly on chunks.json (merge too-short/incomplete, split too-long, remove empty, add prefix) supports --dry-run to preview changes before applying - step6-fix.md skill updated to use fix_chunks.py workflow: dry-run → user approval → apply → re-verify
321 lines
11 KiB
Python
321 lines
11 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Step 6 — Fix chunk
|
|
|
|
Applica correzioni dirette su step-6/<stem>/chunks.json basandosi sul report.json
|
|
prodotto da verify_chunks.py. Non tocca clean.md né step-5.
|
|
|
|
Fixes applicati:
|
|
empty → rimuove il chunk
|
|
incomplete → fonde con il chunk successivo (la frase continua)
|
|
no_prefix → aggiunge prefisso [sezione > titolo] se mancante
|
|
too_short → fonde con il chunk adiacente nello stesso sezione
|
|
too_long → spezza all'ultimo confine di paragrafo/frase entro MAX_CHARS
|
|
|
|
Input: step-6/<stem>/chunks.json + step-6/<stem>/report.json
|
|
Output: step-6/<stem>/chunks.json (sovrascrive)
|
|
|
|
Uso:
|
|
python step-6/fix_chunks.py --stem nietzsche
|
|
python step-6/fix_chunks.py --stem nietzsche --dry-run
|
|
"""
|
|
|
|
import argparse
|
|
import json
|
|
import re
|
|
import sys
|
|
from pathlib import Path
|
|
|
|
MAX_CHARS = 800
|
|
PUNCT_END = re.compile(r"[.!?»)\]'\u2019\"\u201c\u201d\u2018\u2014\u2013-]$")
|
|
|
|
|
|
# ─── Helpers ──────────────────────────────────────────────────────────────────
|
|
|
|
def _prefix(chunk: dict) -> str:
|
|
"""Costruisce il prefisso [sezione > titolo] o [sezione]."""
|
|
sezione = chunk.get("sezione", "")
|
|
titolo = chunk.get("titolo", "")
|
|
if titolo:
|
|
return f"[{sezione} > {titolo}]"
|
|
return f"[{sezione}]"
|
|
|
|
|
|
def _strip_prefix(text: str) -> str:
|
|
"""Rimuove il prefisso [...] iniziale dal testo, se presente."""
|
|
text = text.lstrip()
|
|
if text.startswith("["):
|
|
end = text.find("]")
|
|
if end != -1:
|
|
return text[end + 1:].lstrip("\n")
|
|
return text
|
|
|
|
|
|
def _rebuild_text(chunk: dict, body: str) -> str:
|
|
"""Ricompone il testo completo: prefisso + corpo."""
|
|
return f"{_prefix(chunk)}\n{body}"
|
|
|
|
|
|
def _split_at_boundary(text: str, max_chars: int) -> list[str]:
|
|
"""
|
|
Spezza 'text' in segmenti di al massimo max_chars caratteri,
|
|
tagliando al confine più vicino (doppio newline, poi fine frase).
|
|
Ritorna una lista di segmenti non vuoti.
|
|
"""
|
|
if len(text) <= max_chars:
|
|
return [text]
|
|
|
|
parts = []
|
|
remaining = text
|
|
|
|
while len(remaining) > max_chars:
|
|
# Cerca l'ultimo \n\n entro max_chars
|
|
candidate = remaining[:max_chars]
|
|
split_pos = candidate.rfind("\n\n")
|
|
|
|
if split_pos == -1:
|
|
# Cerca l'ultimo . ? ! entro max_chars
|
|
m = None
|
|
for m in re.finditer(r"[.!?»]\s+", candidate):
|
|
pass
|
|
split_pos = m.end() if m else None
|
|
|
|
if split_pos is None or split_pos == 0:
|
|
# Nessun punto naturale: taglia sul primo spazio dopo max_chars
|
|
sp = remaining.find(" ", max_chars)
|
|
split_pos = sp if sp != -1 else len(remaining)
|
|
|
|
parts.append(remaining[:split_pos].rstrip())
|
|
remaining = remaining[split_pos:].lstrip()
|
|
|
|
if remaining:
|
|
parts.append(remaining)
|
|
|
|
return [p for p in parts if p.strip()]
|
|
|
|
|
|
# ─── Operazioni sui chunk ─────────────────────────────────────────────────────
|
|
|
|
def fix_empty(chunks: list[dict], empty_ids: set[str]) -> tuple[list[dict], int]:
|
|
"""Rimuove i chunk vuoti."""
|
|
before = len(chunks)
|
|
chunks = [c for c in chunks if c["chunk_id"] not in empty_ids]
|
|
return chunks, before - len(chunks)
|
|
|
|
|
|
def fix_no_prefix(chunks: list[dict], no_prefix_ids: set[str]) -> tuple[list[dict], int]:
|
|
"""Aggiunge il prefisso mancante ai chunk che ne sono privi."""
|
|
count = 0
|
|
for c in chunks:
|
|
if c["chunk_id"] in no_prefix_ids:
|
|
body = _strip_prefix(c["text"])
|
|
c["text"] = _rebuild_text(c, body)
|
|
c["n_chars"] = len(c["text"])
|
|
count += 1
|
|
return chunks, count
|
|
|
|
|
|
def fix_incomplete_and_short(chunks: list[dict],
|
|
problem_ids: set[str]) -> tuple[list[dict], int]:
|
|
"""
|
|
Fonde i chunk problematici (incompleti o troppo corti) con il chunk
|
|
immediatamente successivo che appartiene allo stesso sezione.
|
|
Usato sia per 'incomplete' che per 'too_short'.
|
|
"""
|
|
merged = 0
|
|
i = 0
|
|
result: list[dict] = []
|
|
|
|
while i < len(chunks):
|
|
c = chunks[i]
|
|
if c["chunk_id"] in problem_ids and i + 1 < len(chunks):
|
|
nxt = chunks[i + 1]
|
|
# Fonde solo se stesso sezione (o se il successivo è compatibile)
|
|
body_c = _strip_prefix(c["text"])
|
|
body_nxt = _strip_prefix(nxt["text"])
|
|
merged_body = body_c.rstrip() + "\n" + body_nxt.lstrip()
|
|
nxt["text"] = _rebuild_text(nxt, merged_body)
|
|
nxt["n_chars"] = len(nxt["text"])
|
|
# Salta c (è stato assorbito in nxt)
|
|
merged += 1
|
|
i += 1
|
|
continue
|
|
result.append(c)
|
|
i += 1
|
|
|
|
return result, merged
|
|
|
|
|
|
def fix_too_long(chunks: list[dict],
|
|
too_long_ids: set[str],
|
|
max_chars: int) -> tuple[list[dict], int]:
|
|
"""
|
|
Spezza i chunk troppo lunghi al confine naturale più vicino a max_chars.
|
|
Ogni sotto-chunk eredita sezione/titolo e riceve un nuovo sub_index.
|
|
"""
|
|
result: list[dict] = []
|
|
split_count = 0
|
|
|
|
for c in chunks:
|
|
if c["chunk_id"] not in too_long_ids:
|
|
result.append(c)
|
|
continue
|
|
|
|
body = _strip_prefix(c["text"])
|
|
parts = _split_at_boundary(body, max_chars)
|
|
|
|
if len(parts) == 1:
|
|
# Non spezzabile: lascia invariato
|
|
result.append(c)
|
|
continue
|
|
|
|
base_id = re.sub(r"__s\d+$", "", c["chunk_id"])
|
|
base_sub = c.get("sub_index", 0)
|
|
|
|
for j, part in enumerate(parts):
|
|
new_chunk = dict(c)
|
|
new_chunk["sub_index"] = base_sub + j
|
|
new_chunk["chunk_id"] = f"{base_id}__s{base_sub + j}"
|
|
new_chunk["text"] = _rebuild_text(new_chunk, part)
|
|
new_chunk["n_chars"] = len(new_chunk["text"])
|
|
result.append(new_chunk)
|
|
|
|
split_count += 1
|
|
|
|
return result, split_count
|
|
|
|
|
|
def renumber_ids(chunks: list[dict]) -> list[dict]:
|
|
"""
|
|
Rinomina i chunk_id per evitare duplicati dopo merge/split.
|
|
Mantiene il pattern base__sN dove N è il nuovo indice per sezione/titolo.
|
|
"""
|
|
seen: dict[str, int] = {}
|
|
for c in chunks:
|
|
base = re.sub(r"__s\d+$", "", c["chunk_id"])
|
|
idx = seen.get(base, 0)
|
|
c["chunk_id"] = f"{base}__s{idx}"
|
|
c["sub_index"] = idx
|
|
seen[base] = idx + 1
|
|
return chunks
|
|
|
|
|
|
# ─── Core ─────────────────────────────────────────────────────────────────────
|
|
|
|
def fix_stem(stem: str, project_root: Path, max_chars: int, dry_run: bool) -> bool:
|
|
step6_dir = project_root / "step-6" / stem
|
|
chunks_path = step6_dir / "chunks.json"
|
|
report_path = step6_dir / "report.json"
|
|
|
|
if not chunks_path.exists():
|
|
print(f"✗ step-6/{stem}/chunks.json non trovato.")
|
|
print(f" Esegui prima: python step-6/verify_chunks.py --stem {stem}")
|
|
return False
|
|
|
|
if not report_path.exists():
|
|
print(f"✗ step-6/{stem}/report.json non trovato.")
|
|
print(f" Esegui prima: python step-6/verify_chunks.py --stem {stem}")
|
|
return False
|
|
|
|
chunks: list[dict] = json.loads(chunks_path.read_text(encoding="utf-8"))
|
|
report: dict = json.loads(report_path.read_text(encoding="utf-8"))
|
|
|
|
verdict = report.get("verdict", "ok")
|
|
print(f"\nDocumento: {stem} (verdict: {verdict})")
|
|
|
|
if verdict == "ok":
|
|
print(" ✅ Nessun problema — nulla da correggere.")
|
|
return True
|
|
|
|
# Raccogli gli ID per categoria
|
|
empty_ids = {e["chunk_id"] for e in report.get("blockers", {}).get("empty", [])}
|
|
no_prefix_ids = {e["chunk_id"] for e in report.get("blockers", {}).get("no_prefix", [])}
|
|
incomplete_ids = {e["chunk_id"] for e in report.get("blockers", {}).get("incomplete", [])}
|
|
too_short_ids = {e["chunk_id"] for e in report.get("warnings", {}).get("too_short", [])}
|
|
too_long_ids = {e["chunk_id"] for e in report.get("warnings", {}).get("too_long", [])}
|
|
|
|
# Riepilogo operazioni pianificate
|
|
ops: list[str] = []
|
|
if empty_ids:
|
|
ops.append(f" 🗑 rimuovi {len(empty_ids)} chunk vuoti")
|
|
if no_prefix_ids:
|
|
ops.append(f" 🔧 aggiungi prefisso a {len(no_prefix_ids)} chunk")
|
|
if incomplete_ids:
|
|
ops.append(f" 🔗 fondi {len(incomplete_ids)} chunk incompleti col successivo")
|
|
if too_short_ids:
|
|
ops.append(f" 🔗 fondi {len(too_short_ids)} chunk troppo corti col successivo")
|
|
if too_long_ids:
|
|
ops.append(f" ✂️ spezza {len(too_long_ids)} chunk troppo lunghi")
|
|
|
|
if not ops:
|
|
print(" ✅ Nessuna correzione necessaria.")
|
|
return True
|
|
|
|
print("\n Operazioni pianificate:")
|
|
for op in ops:
|
|
print(op)
|
|
|
|
if dry_run:
|
|
print("\n [dry-run] Nessuna modifica applicata.")
|
|
return True
|
|
|
|
n_before = len(chunks)
|
|
|
|
# Applica nell'ordine corretto
|
|
if empty_ids:
|
|
chunks, n = fix_empty(chunks, empty_ids)
|
|
print(f"\n 🗑 Rimossi {n} chunk vuoti.")
|
|
|
|
if no_prefix_ids:
|
|
chunks, n = fix_no_prefix(chunks, no_prefix_ids)
|
|
print(f" 🔧 Aggiunto prefisso a {n} chunk.")
|
|
|
|
# incomplete prima di too_short (entrambi usano merge-forward)
|
|
merge_ids = incomplete_ids | too_short_ids
|
|
if merge_ids:
|
|
chunks, n = fix_incomplete_and_short(chunks, merge_ids)
|
|
print(f" 🔗 Fusi {n} chunk (incompleti + corti).")
|
|
|
|
if too_long_ids:
|
|
# Aggiorna too_long_ids per chunk che potrebbero essere stati rinominati
|
|
# dopo il merge (usa ancora gli ID originali, il merge non tocca too_long)
|
|
chunks, n = fix_too_long(chunks, too_long_ids, max_chars)
|
|
print(f" ✂️ Spezzati {n} chunk lunghi.")
|
|
|
|
# Rinumera per evitare duplicati
|
|
chunks = renumber_ids(chunks)
|
|
|
|
n_after = len(chunks)
|
|
print(f"\n Totale chunk: {n_before} → {n_after}")
|
|
|
|
# Salva
|
|
chunks_path.write_text(
|
|
json.dumps(chunks, ensure_ascii=False, indent=2), encoding="utf-8"
|
|
)
|
|
print(f" ✅ Salvato: step-6/{stem}/chunks.json")
|
|
print(f"\n Riesegui la verifica:")
|
|
print(f" python step-6/verify_chunks.py --stem {stem}")
|
|
|
|
return True
|
|
|
|
|
|
# ─── Entry point ──────────────────────────────────────────────────────────────
|
|
|
|
if __name__ == "__main__":
|
|
project_root = Path(__file__).parent.parent
|
|
|
|
parser = argparse.ArgumentParser(description="Step 6 — Fix chunk")
|
|
parser.add_argument("--stem", required=True, help="Nome del documento (sottocartella di step-6/)")
|
|
parser.add_argument(
|
|
"--max", type=int, default=MAX_CHARS,
|
|
help=f"Soglia massima caratteri per lo split (default: {MAX_CHARS})"
|
|
)
|
|
parser.add_argument(
|
|
"--dry-run", action="store_true",
|
|
help="Mostra le operazioni pianificate senza applicarle"
|
|
)
|
|
args = parser.parse_args()
|
|
|
|
ok = fix_stem(args.stem, project_root, args.max, args.dry_run)
|
|
sys.exit(0 if ok else 1)
|