#!/usr/bin/env python3
"""
Step 9 — Interactive RAG pipeline

Takes a question, retrieves the most relevant chunks from ChromaDB
(retrieval) and generates an answer through Ollama (generation).

Input:  chroma_db/  (ChromaDB collection)
Output: answer printed to the screen

Usage:
    python step-9/rag.py --stem NAME

Inside the interactive loop:
    Domanda: QUESTION       → answer
    Domanda: QUESTION -v    → answer + retrieved chunks
    Domanda: exit           → quit
"""

import argparse
import json
import sys
import urllib.error
import urllib.request
from pathlib import Path

import chromadb

# ─── Configuration ────────────────────────────────────────────────────────────

# config.py lives next to this script; make it importable regardless of the
# directory the script is launched from.
sys.path.insert(0, str(Path(__file__).parent))
import config as _cfg  # noqa: E402

project_root = Path(__file__).parent.parent
CHROMA_DIR = project_root / "chroma_db"
OLLAMA_URL = _cfg.OLLAMA_URL
EMBED_MODEL = _cfg.EMBED_MODEL
LLM_MODEL = _cfg.OLLAMA_MODEL
TOP_K = _cfg.TOP_K
TEMPERATURE = _cfg.TEMPERATURE
NO_THINK = _cfg.NO_THINK
SYSTEM_PROMPT = _cfg.SYSTEM_PROMPT

# Failures that abort a single question without ending the session:
# transport problems (URLError and timeouts are OSError subclasses) plus a
# reply that is not valid JSON (ValueError) or lacks the expected field
# (KeyError).
_RECOVERABLE_ERRORS = (urllib.error.URLError, OSError, KeyError, ValueError)


# ─── Ollama HTTP helper ───────────────────────────────────────────────────────

def _post_json(endpoint: str, body: dict, timeout: float) -> dict:
    """POST *body* as JSON to ``OLLAMA_URL + endpoint`` and return the decoded reply.

    Raises:
        urllib.error.URLError / OSError: on connection or HTTP failures.
        ValueError: if the reply body is not valid JSON.
    """
    req = urllib.request.Request(
        f"{OLLAMA_URL}{endpoint}",
        data=json.dumps(body).encode(),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(req, timeout=timeout) as resp:
        return json.loads(resp.read())


# ─── Embedding ────────────────────────────────────────────────────────────────

def embed(text: str) -> list[float]:
    """Return the embedding vector of *text* computed by Ollama.

    Uses the ``EMBED_MODEL`` model through ``/api/embeddings`` with a
    30-second timeout. Raises ``KeyError`` if the reply has no
    ``"embedding"`` field.
    """
    reply = _post_json(
        "/api/embeddings",
        {"model": EMBED_MODEL, "prompt": text},
        timeout=30,
    )
    return reply["embedding"]


# ─── Generation ───────────────────────────────────────────────────────────────

def call_ollama(prompt: str, system: str = "") -> str:
    """Call Ollama ``/api/generate`` and return the stripped answer text.

    Args:
        prompt: user prompt (context + question).
        system: optional system prompt.

    The request is non-streaming, uses ``LLM_MODEL`` and ``TEMPERATURE``, and
    enables the "think" flag unless ``NO_THINK`` is set. Generation may be
    slow, hence the 300-second timeout. Raises ``KeyError`` if the reply has
    no ``"response"`` field.
    """
    reply = _post_json(
        "/api/generate",
        {
            "model": LLM_MODEL,
            "system": system,
            "prompt": prompt,
            "stream": False,
            "think": not NO_THINK,
            "options": {"temperature": TEMPERATURE},
        },
        timeout=300,
    )
    return reply["response"].strip()


# ─── Retrieval ────────────────────────────────────────────────────────────────

def retrieve(collection: chromadb.Collection, question: str) -> list[dict]:
    """Embed *question* and fetch the ``TOP_K`` most similar chunks.

    Returns a list of dicts with keys ``text``, ``sezione``, ``titolo`` and
    ``distance``, in the order returned by the collection query.
    """
    vector = embed(question)
    results = collection.query(
        query_embeddings=[vector],
        n_results=TOP_K,
        include=["documents", "metadatas", "distances"],
    )

    chunks = []
    # Index [0]: a single query embedding was sent, so only the first result
    # row is relevant.
    for text, meta, dist in zip(
        results["documents"][0],
        results["metadatas"][0],
        results["distances"][0],
    ):
        # A chunk stored without metadata may come back as None.
        meta = meta or {}
        chunks.append({
            "text": text,
            "sezione": meta.get("sezione", ""),
            "titolo": meta.get("titolo", ""),
            "distance": dist,
        })
    return chunks


# ─── Prompt ───────────────────────────────────────────────────────────────────

def build_prompt(question: str, chunks: list[dict]) -> tuple[str, str]:
    """Return ``(system, user_prompt)`` as separate strings for the Ollama API.

    Each chunk becomes a "[Contesto N — sezione > titolo]" header followed by
    its text; chunks are separated by a blank line and the question is
    appended at the end.
    """
    context_parts = []
    for i, c in enumerate(chunks, start=1):
        header = f"[Contesto {i}"
        if c["sezione"]:
            header += f" — {c['sezione']}"
        if c["titolo"]:
            header += f" > {c['titolo']}"
        header += "]"
        context_parts.append(f"{header}\n{c['text']}")

    context = "\n\n".join(context_parts)
    user_prompt = f"{context}\n\nDomanda: {question}"
    return SYSTEM_PROMPT, user_prompt


# ─── Interactive loop ─────────────────────────────────────────────────────────

def answer(question: str, collection: chromadb.Collection, verbose: bool) -> None:
    """Answer one question: retrieve context, optionally show it, print the reply.

    Retrieval or generation failures are reported on screen and the function
    returns normally, so the caller's loop keeps running.
    """
    try:
        chunks = retrieve(collection, question)
    except _RECOVERABLE_ERRORS as e:
        print(f"❌ Errore embedding: {e}")
        return

    if verbose:
        print("\n── Chunk recuperati ──────────────────────────────────────────")
        for i, c in enumerate(chunks, start=1):
            loc = c["sezione"]
            if c["titolo"]:
                loc += f" > {c['titolo']}"
            # NOTE(review): "1 - distance" is a similarity only for a cosine
            # distance space — confirm how the collection was created.
            sim = 1 - c["distance"]
            print(f" [{i}] {loc} (similarità: {sim:.3f})")
            # One-line preview: first 120 characters with newlines flattened.
            print(f" {c['text'][:120].replace(chr(10), ' ')}...")
        print("──────────────────────────────────────────────────────────────\n")

    system, prompt = build_prompt(question, chunks)
    try:
        response = call_ollama(prompt, system=system)
    except _RECOVERABLE_ERRORS as e:
        print(f"❌ Errore generazione: {e}")
        return

    print(f"\n{response}\n")


def run_loop(collection: chromadb.Collection) -> None:
    """Read questions from stdin until "exit", EOF or Ctrl+C.

    A trailing " -v" on the input enables verbose mode for that question and
    is removed before the question is processed.
    """
    print("── Loop RAG ─────────────────────────────────────── (exit per uscire)\n")
    while True:
        try:
            raw = input("Domanda: ").strip()
        except (EOFError, KeyboardInterrupt):
            print("\nUscita.")
            break

        if not raw:
            continue
        if raw.lower() == "exit":
            break

        verbose = raw.endswith(" -v")
        question = raw[:-3].strip() if verbose else raw
        try:
            answer(question, collection, verbose)
        except KeyboardInterrupt:
            # Ctrl+C during a slow retrieval/generation: leave cleanly, the
            # same way as Ctrl+C at the prompt, instead of dumping a traceback.
            print("\nUscita.")
            break


# ─── Entry point ──────────────────────────────────────────────────────────────

def _collection_names(client) -> list[str]:
    """Return the names of all collections known to *client*.

    ``list_collections()`` yields Collection objects in most chromadb
    releases but plain name strings in the 0.6.x series; both shapes are
    accepted.
    """
    return [
        c if isinstance(c, str) else c.name
        for c in client.list_collections()
    ]


def _build_epilog() -> str:
    """Build the ``--help`` epilog, listing available collections when possible."""
    # NOTE(review): the argument placeholders in these help lines look like
    # they were lost (e.g. nothing follows "--stem") — confirm intended text.
    lines = [
        "Uso:",
        " python step-9/rag.py --stem ",
        "",
        "Loop interattivo:",
        " risposta basata sul documento",
        " -v risposta + chunk recuperati con score di similarità",
        " exit termina",
    ]
    if CHROMA_DIR.exists():
        try:
            client = chromadb.PersistentClient(path=str(CHROMA_DIR))
            names = _collection_names(client)
            if names:
                lines += ["", f"Collection disponibili: {', '.join(names)}"]
            else:
                lines += ["", "Nessuna collection trovata — eseguire prima: python step-8/ingest.py"]
        except Exception:
            # Deliberate best effort: the collection list is only a hint in
            # the help text, so an unreadable database must not prevent the
            # parser from being built.
            pass
    return "\n".join(lines)


def main() -> int:
    """Parse arguments, open the requested collection and start the loop.

    Returns the process exit code: 0 on normal termination, 1 when the
    ChromaDB directory or the requested collection is missing.
    """
    parser = argparse.ArgumentParser(
        description=(
            "Step 9 — Pipeline RAG interattiva\n\n"
            "Risponde a domande in linguaggio naturale su un documento\n"
            "indicizzato in ChromaDB da step-8/ingest.py."
        ),
        epilog=_build_epilog(),
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    parser.add_argument(
        "--stem",
        required=True,
        help=(
            "Nome della collection ChromaDB da interrogare. "
            "Le collection vengono create da: python step-8/ingest.py --stem "
        ),
    )
    args = parser.parse_args()

    print("─── Step 9 — Pipeline RAG ────────────────────────────────────────────\n")
    print(f" Documento : {args.stem}")
    print(f" Modello : {LLM_MODEL}")
    print(f" Top-K : {TOP_K}")
    print(f" Thinking : {'off' if NO_THINK else 'on'}")
    print()

    if not CHROMA_DIR.exists():
        print("❌ chroma_db/ non trovata — esegui prima step-8")
        return 1

    client = chromadb.PersistentClient(path=str(CHROMA_DIR))
    if args.stem not in _collection_names(client):
        print(f"❌ Collection '{args.stem}' non trovata in chroma_db/")
        print(f" → python step-8/ingest.py --stem {args.stem}")
        return 1

    collection = client.get_collection(args.stem)
    print(f"✅ Collection '{args.stem}' caricata ({collection.count()} chunk)\n")

    run_loop(collection)
    return 0


if __name__ == "__main__":
    sys.exit(main())