253 lines
9.0 KiB
Python
253 lines
9.0 KiB
Python
|
|
#!/usr/bin/env python3
|
||
|
|
"""
|
||
|
|
Pipeline RAG interattiva
|
||
|
|
|
||
|
|
Riceve una domanda, recupera i chunk più rilevanti da ChromaDB (retrieval)
|
||
|
|
e genera una risposta tramite Ollama (generation).
|
||
|
|
|
||
|
|
Input: chroma_db/<stem> (collection ChromaDB)
|
||
|
|
Output: risposta a schermo
|
||
|
|
|
||
|
|
Uso:
|
||
|
|
python rag.py --stem <nome>
|
||
|
|
|
||
|
|
Nel loop interattivo:
|
||
|
|
Domanda: <testo> → risposta
|
||
|
|
Domanda: <testo> -v → risposta + chunk recuperati
|
||
|
|
Domanda: exit → uscita
|
||
|
|
"""
|
||
|
|
|
||
|
|
import argparse
|
||
|
|
import json
|
||
|
|
import sys
|
||
|
|
import urllib.error
|
||
|
|
import urllib.request
|
||
|
|
from pathlib import Path
|
||
|
|
|
||
|
|
import chromadb
|
||
|
|
|
||
|
|
# ─── Configurazione ───────────────────────────────────────────────────────────
|
||
|
|
|
||
|
|
sys.path.insert(0, str(Path(__file__).parent))
|
||
|
|
import config as _cfg
|
||
|
|
|
||
|
|
project_root = Path(__file__).parent
|
||
|
|
CHROMA_DIR = project_root / "chroma_db"
|
||
|
|
|
||
|
|
OLLAMA_URL = _cfg.OLLAMA_URL
|
||
|
|
EMBED_MODEL = _cfg.EMBED_MODEL
|
||
|
|
LLM_MODEL = _cfg.OLLAMA_MODEL
|
||
|
|
TOP_K = _cfg.TOP_K
|
||
|
|
TEMPERATURE = _cfg.TEMPERATURE
|
||
|
|
NO_THINK = _cfg.NO_THINK
|
||
|
|
SYSTEM_PROMPT = _cfg.SYSTEM_PROMPT
|
||
|
|
|
||
|
|
|
||
|
|
# ─── Embedding ────────────────────────────────────────────────────────────────
|
||
|
|
|
||
|
|
def embed(text: str) -> list[float]:
|
||
|
|
"""Genera il vettore della domanda tramite Ollama."""
|
||
|
|
payload = json.dumps({"model": EMBED_MODEL, "prompt": text}).encode()
|
||
|
|
req = urllib.request.Request(
|
||
|
|
f"{OLLAMA_URL}/api/embeddings",
|
||
|
|
data=payload,
|
||
|
|
headers={"Content-Type": "application/json"},
|
||
|
|
method="POST",
|
||
|
|
)
|
||
|
|
with urllib.request.urlopen(req, timeout=30) as resp:
|
||
|
|
return json.loads(resp.read())["embedding"]
|
||
|
|
|
||
|
|
|
||
|
|
# ─── Generazione ──────────────────────────────────────────────────────────────
|
||
|
|
|
||
|
|
def call_ollama(prompt: str, system: str = "") -> str:
|
||
|
|
"""Chiama Ollama /api/generate e ritorna la risposta."""
|
||
|
|
payload = json.dumps({
|
||
|
|
"model": LLM_MODEL,
|
||
|
|
"system": system,
|
||
|
|
"prompt": prompt,
|
||
|
|
"stream": False,
|
||
|
|
"think": not NO_THINK,
|
||
|
|
"options": {"temperature": TEMPERATURE},
|
||
|
|
}).encode()
|
||
|
|
req = urllib.request.Request(
|
||
|
|
f"{OLLAMA_URL}/api/generate",
|
||
|
|
data=payload,
|
||
|
|
headers={"Content-Type": "application/json"},
|
||
|
|
method="POST",
|
||
|
|
)
|
||
|
|
with urllib.request.urlopen(req, timeout=300) as resp:
|
||
|
|
return json.loads(resp.read())["response"].strip()
|
||
|
|
|
||
|
|
|
||
|
|
# ─── Retrieval ────────────────────────────────────────────────────────────────
|
||
|
|
|
||
|
|
def retrieve(collection: chromadb.Collection, question: str) -> list[dict]:
|
||
|
|
"""
|
||
|
|
Genera l'embedding della domanda e recupera i TOP_K chunk più simili.
|
||
|
|
Ritorna lista di dict con chiavi: text, sezione, titolo, distance.
|
||
|
|
"""
|
||
|
|
vector = embed(question)
|
||
|
|
results = collection.query(
|
||
|
|
query_embeddings=[vector],
|
||
|
|
n_results=TOP_K,
|
||
|
|
include=["documents", "metadatas", "distances"],
|
||
|
|
)
|
||
|
|
chunks = []
|
||
|
|
for text, meta, dist in zip(
|
||
|
|
results["documents"][0],
|
||
|
|
results["metadatas"][0],
|
||
|
|
results["distances"][0],
|
||
|
|
):
|
||
|
|
chunks.append({
|
||
|
|
"text": text,
|
||
|
|
"sezione": meta.get("sezione", ""),
|
||
|
|
"titolo": meta.get("titolo", ""),
|
||
|
|
"distance": dist,
|
||
|
|
})
|
||
|
|
return chunks
|
||
|
|
|
||
|
|
|
||
|
|
# ─── Prompt ───────────────────────────────────────────────────────────────────
|
||
|
|
|
||
|
|
def build_prompt(question: str, chunks: list[dict]) -> str:
|
||
|
|
"""Ritorna (system, user_prompt) separati per l'API Ollama."""
|
||
|
|
context_parts = []
|
||
|
|
for i, c in enumerate(chunks, start=1):
|
||
|
|
header = f"[Contesto {i}"
|
||
|
|
if c["sezione"]:
|
||
|
|
header += f" — {c['sezione']}"
|
||
|
|
if c["titolo"]:
|
||
|
|
header += f" > {c['titolo']}"
|
||
|
|
header += "]"
|
||
|
|
context_parts.append(f"{header}\n{c['text']}")
|
||
|
|
|
||
|
|
context = "\n\n".join(context_parts)
|
||
|
|
user_prompt = f"{context}\n\nDomanda: {question}"
|
||
|
|
return SYSTEM_PROMPT, user_prompt
|
||
|
|
|
||
|
|
|
||
|
|
# ─── Loop interattivo ─────────────────────────────────────────────────────────
|
||
|
|
|
||
|
|
def answer(question: str, collection: chromadb.Collection, verbose: bool) -> None:
|
||
|
|
try:
|
||
|
|
chunks = retrieve(collection, question)
|
||
|
|
except (urllib.error.URLError, OSError) as e:
|
||
|
|
print(f"❌ Errore embedding: {e}")
|
||
|
|
return
|
||
|
|
|
||
|
|
if verbose:
|
||
|
|
print("\n── Chunk recuperati ──────────────────────────────────────────")
|
||
|
|
for i, c in enumerate(chunks, start=1):
|
||
|
|
loc = c["sezione"]
|
||
|
|
if c["titolo"]:
|
||
|
|
loc += f" > {c['titolo']}"
|
||
|
|
sim = 1 - c["distance"]
|
||
|
|
print(f" [{i}] {loc} (similarità: {sim:.3f})")
|
||
|
|
print(f" {c['text'][:120].replace(chr(10), ' ')}...")
|
||
|
|
print("──────────────────────────────────────────────────────────────\n")
|
||
|
|
|
||
|
|
system, prompt = build_prompt(question, chunks)
|
||
|
|
|
||
|
|
try:
|
||
|
|
response = call_ollama(prompt, system=system)
|
||
|
|
except (urllib.error.URLError, OSError) as e:
|
||
|
|
print(f"❌ Errore generazione: {e}")
|
||
|
|
return
|
||
|
|
|
||
|
|
print(f"\n{response}\n")
|
||
|
|
|
||
|
|
|
||
|
|
def run_loop(collection: chromadb.Collection) -> None:
|
||
|
|
print("── Loop RAG ─────────────────────────────────────── (exit per uscire)\n")
|
||
|
|
while True:
|
||
|
|
try:
|
||
|
|
raw = input("Domanda: ").strip()
|
||
|
|
except (EOFError, KeyboardInterrupt):
|
||
|
|
print("\nUscita.")
|
||
|
|
break
|
||
|
|
|
||
|
|
if not raw:
|
||
|
|
continue
|
||
|
|
if raw.lower() == "exit":
|
||
|
|
break
|
||
|
|
|
||
|
|
verbose = raw.endswith(" -v")
|
||
|
|
question = raw[:-3].strip() if verbose else raw
|
||
|
|
|
||
|
|
answer(question, collection, verbose)
|
||
|
|
|
||
|
|
|
||
|
|
# ─── Entry point ──────────────────────────────────────────────────────────────
|
||
|
|
|
||
|
|
def _build_epilog() -> str:
|
||
|
|
lines = [
|
||
|
|
"Uso:",
|
||
|
|
" python rag.py --stem <nome>",
|
||
|
|
"",
|
||
|
|
"Loop interattivo:",
|
||
|
|
" <domanda> risposta basata sul documento",
|
||
|
|
" <domanda> -v risposta + chunk recuperati con score di similarità",
|
||
|
|
" exit termina",
|
||
|
|
]
|
||
|
|
if CHROMA_DIR.exists():
|
||
|
|
try:
|
||
|
|
client = chromadb.PersistentClient(path=str(CHROMA_DIR))
|
||
|
|
names = [c.name for c in client.list_collections()]
|
||
|
|
if names:
|
||
|
|
lines += ["", f"Collection disponibili: {', '.join(names)}"]
|
||
|
|
else:
|
||
|
|
lines += ["", "Nessuna collection trovata — eseguire prima: python step-8/ingest.py"]
|
||
|
|
except Exception:
|
||
|
|
pass
|
||
|
|
return "\n".join(lines)
|
||
|
|
|
||
|
|
|
||
|
|
def main() -> int:
|
||
|
|
parser = argparse.ArgumentParser(
|
||
|
|
description=(
|
||
|
|
"Pipeline RAG interattiva\n\n"
|
||
|
|
"Risponde a domande in linguaggio naturale su un documento\n"
|
||
|
|
"indicizzato in ChromaDB da step-8/ingest.py."
|
||
|
|
),
|
||
|
|
epilog=_build_epilog(),
|
||
|
|
formatter_class=argparse.RawDescriptionHelpFormatter,
|
||
|
|
)
|
||
|
|
parser.add_argument(
|
||
|
|
"--stem",
|
||
|
|
required=True,
|
||
|
|
help=(
|
||
|
|
"Nome della collection ChromaDB da interrogare. "
|
||
|
|
"Le collection vengono create da: python step-8/ingest.py --stem <nome>"
|
||
|
|
),
|
||
|
|
)
|
||
|
|
args = parser.parse_args()
|
||
|
|
|
||
|
|
print("─── Pipeline RAG ────────────────────────────────────────────\n")
|
||
|
|
print(f" Documento : {args.stem}")
|
||
|
|
print(f" Modello : {LLM_MODEL}")
|
||
|
|
print(f" Top-K : {TOP_K}")
|
||
|
|
print(f" Thinking : {'off' if NO_THINK else 'on'}")
|
||
|
|
print()
|
||
|
|
|
||
|
|
if not CHROMA_DIR.exists():
|
||
|
|
print("❌ chroma_db/ non trovata — esegui prima step-8")
|
||
|
|
return 1
|
||
|
|
|
||
|
|
client = chromadb.PersistentClient(path=str(CHROMA_DIR))
|
||
|
|
collections = [c.name for c in client.list_collections()]
|
||
|
|
if args.stem not in collections:
|
||
|
|
print(f"❌ Collection '{args.stem}' non trovata in chroma_db/")
|
||
|
|
print(f" → python step-8/ingest.py --stem {args.stem}")
|
||
|
|
return 1
|
||
|
|
|
||
|
|
collection = client.get_collection(args.stem)
|
||
|
|
print(f"✅ Collection '{args.stem}' caricata ({collection.count()} chunk)\n")
|
||
|
|
|
||
|
|
run_loop(collection)
|
||
|
|
return 0
|
||
|
|
|
||
|
|
|
||
|
|
if __name__ == "__main__":
|
||
|
|
sys.exit(main())
|