#!/usr/bin/env python3 """ Step 8 — Vettorizzazione Legge i chunk prodotti da step-6, genera gli embedding tramite Ollama e li indicizza in ChromaDB (persistente). Il modello di embedding viene letto da config.py (EMBED_MODEL). Puoi sovrascriverlo con --model, ma deve corrispondere al modello che userai in rag.py — altrimenti riesegui con --force dopo aver cambiato. Input: step-6//chunks.json Output: chroma_db/ (collection ChromaDB) Uso: python step-8/ingest.py --stem # singolo documento python step-8/ingest.py # tutti gli stem trovati python step-8/ingest.py --stem --force # sovrascrive collection python step-8/ingest.py --model bge-m3 # override modello """ import argparse import json import sys import time import urllib.error import urllib.request from pathlib import Path import chromadb # ─── Configurazione ──────────────────────────────────────────────────────────── project_root = Path(__file__).parent.parent CHUNKS_DIR = project_root / "step-6" CHROMA_DIR = project_root / "chroma_db" sys.path.insert(0, str(project_root)) from config import EMBED_MODEL, OLLAMA_URL # noqa: E402 EMBED_ENDPOINT = f"{OLLAMA_URL}/api/embeddings" # ─── Ollama ──────────────────────────────────────────────────────────────────── def embed(text: str, model: str) -> list[float]: """Chiama Ollama /api/embeddings e ritorna il vettore.""" payload = json.dumps({"model": model, "prompt": text}).encode() req = urllib.request.Request( EMBED_ENDPOINT, data=payload, headers={"Content-Type": "application/json"}, method="POST", ) with urllib.request.urlopen(req, timeout=60) as resp: data = json.loads(resp.read()) return data["embedding"] def check_ollama(model: str) -> bool: """Verifica che Ollama sia attivo e che il modello di embedding sia disponibile.""" try: req = urllib.request.Request(f"{OLLAMA_URL}/api/tags", method="GET") with urllib.request.urlopen(req, timeout=10) as resp: data = json.loads(resp.read()) models = [m["name"] for m in data.get("models", [])] found = any( m == model or m.startswith(model + ":") for m in models ) if found: print(f"✅ Ollama OK — {model} disponibile") return True print(f"❌ Modello {model} non trovato in Ollama") print(f" → ollama pull {model}") return False except (urllib.error.URLError, OSError): print("❌ Ollama non raggiungibile — assicurati che sia in esecuzione") print(" → ollama serve") return False # ─── ChromaDB ───────────────────────────────────────────────────────────────── def get_client() -> chromadb.PersistentClient: CHROMA_DIR.mkdir(parents=True, exist_ok=True) return chromadb.PersistentClient(path=str(CHROMA_DIR)) def collection_exists(client: chromadb.PersistentClient, stem: str) -> bool: return any(c.name == stem for c in client.list_collections()) # ─── Ingestione ─────────────────────────────────────────────────────────────── def ingest(stem: str, force: bool, model: str = EMBED_MODEL) -> bool: """ Legge step-6//chunks.json, genera embedding e popola ChromaDB. Ritorna True se completato con successo, False altrimenti. """ chunks_path = CHUNKS_DIR / stem / "chunks.json" if not chunks_path.exists(): print(f"❌ File non trovato: {chunks_path}") return False with open(chunks_path, encoding="utf-8") as f: chunks = json.load(f) if not chunks: print(f"⚠️ {stem}: chunks.json è vuoto — skip") return False client = get_client() if collection_exists(client, stem): if not force: print(f"⚠️ Collection '{stem}' già presente in ChromaDB — skip") print(f" → usa --force per sovrascrivere") return True # non è un errore, è uno skip client.delete_collection(stem) print(f"🗑️ Collection '{stem}' rimossa (--force)") collection = client.create_collection( name=stem, metadata={"hnsw:space": "cosine"}, ) total = len(chunks) print(f"📦 {total} chunk da ingestire\n") ids = [] embeddings = [] documents = [] metadatas = [] start = time.monotonic() durations: list[float] = [] for i, chunk in enumerate(chunks, start=1): t0 = time.monotonic() vector = embed(chunk["text"], model) t1 = time.monotonic() durations.append(t1 - t0) ids.append(chunk["chunk_id"]) embeddings.append(vector) documents.append(chunk["text"]) metadatas.append({ "sezione": chunk.get("sezione", ""), "titolo": chunk.get("titolo", ""), "sub_index": chunk.get("sub_index", 0), }) avg = sum(durations) / len(durations) eta = int(avg * (total - i)) done = f"[{i:>{len(str(total))}}/{total}]" cid = chunk["chunk_id"][:50] line = f" {done} ✓ {cid:<50} ETA: {eta}s" print(f"{line:<80}", end="\r", flush=True) # Upsert in batch da 100 per non sovraccaricare la memoria if len(ids) == 100: collection.add( ids=ids, embeddings=embeddings, documents=documents, metadatas=metadatas, ) ids, embeddings, documents, metadatas = [], [], [], [] # Upsert dei rimanenti if ids: collection.add( ids=ids, embeddings=embeddings, documents=documents, metadatas=metadatas, ) elapsed = int(time.monotonic() - start) print() # nuova riga dopo il \r print(f"\n✅ Ingestione completata in {elapsed}s — {total}/{total} chunk salvati") print(f" Collection '{stem}' in {CHROMA_DIR}/") return True # ─── Entry point ────────────────────────────────────────────────────────────── def find_stems() -> list[str]: """Ritorna tutti gli stem che hanno un chunks.json in step-6/.""" return sorted( p.parent.name for p in CHUNKS_DIR.glob("*/chunks.json") ) def main() -> int: parser = argparse.ArgumentParser( description="Step 8 — Vettorizzazione chunk in ChromaDB" ) parser.add_argument("--stem", help="Nome del documento (senza --stem = tutti)") parser.add_argument("--force", action="store_true", help="Sovrascrive la collection se già esistente") parser.add_argument("--model", default=EMBED_MODEL, help=f"Modello embedding Ollama (default da config.py: {EMBED_MODEL})") args = parser.parse_args() print("─── Step 8 — Vettorizzazione ─────────────────────────────────────────\n") if not check_ollama(args.model): return 1 stems = [args.stem] if args.stem else find_stems() if not stems: print("❌ Nessun chunks.json trovato in step-6/") return 1 print() results = [] for stem in stems: if len(stems) > 1: print(f"── {stem} ──") results.append(ingest(stem, force=args.force, model=args.model)) if len(stems) > 1: print() return 0 if all(results) else 1 if __name__ == "__main__": sys.exit(main())