8d972fa7c6
Aggiunge la possibilità di unire più documenti in una singola collection ChromaDB, con chunk_id prefissati per stem e metadato source per filtrare. - ingest.py: --stems doc1 doc2 --collection nome (nuovo), --stem (invariato) - rag.py / retrieve.py: --collection, source nei chunk, verbose mostra [source] Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
276 lines
10 KiB
Python
276 lines
10 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Step 8 — Vettorizzazione
|
|
|
|
Legge i chunk prodotti da chunks/, genera gli embedding tramite Ollama
|
|
e li indicizza in ChromaDB (persistente).
|
|
|
|
Il modello di embedding viene letto da config.py (EMBED_MODEL).
|
|
Puoi sovrascriverlo con --model, ma deve corrispondere al modello che
|
|
userai in rag.py — altrimenti riesegui con --force dopo aver cambiato.
|
|
|
|
Input: chunks/<stem>/chunks.json
|
|
Output: chroma_db/<stem> (collection ChromaDB)
|
|
|
|
Uso:
|
|
python ingestion/ingest.py --stem <nome> # singolo documento
|
|
python ingestion/ingest.py # tutti gli stem trovati
|
|
python ingestion/ingest.py --stem <nome> --force # sovrascrive collection
|
|
python ingestion/ingest.py --model bge-m3 # override modello
|
|
"""
|
|
|
|
import argparse
|
|
import json
|
|
import sys
|
|
import time
|
|
import urllib.error
|
|
import urllib.request
|
|
from pathlib import Path
|
|
|
|
import chromadb
|
|
|
|
# ─── Configurazione ────────────────────────────────────────────────────────────
|
|
|
|
project_root = Path(__file__).parent.parent
|
|
|
|
CHUNKS_DIR = project_root / "chunks"
|
|
CHROMA_DIR = project_root / "chroma_db"
|
|
|
|
sys.path.insert(0, str(project_root))
|
|
from config import EMBED_MODEL, OLLAMA_URL # noqa: E402
|
|
|
|
EMBED_ENDPOINT = f"{OLLAMA_URL}/api/embeddings"
|
|
|
|
|
|
# ─── Ollama ────────────────────────────────────────────────────────────────────
|
|
|
|
def embed(text: str, model: str) -> list[float]:
|
|
"""Chiama Ollama /api/embeddings e ritorna il vettore."""
|
|
payload = json.dumps({"model": model, "prompt": text}).encode()
|
|
req = urllib.request.Request(
|
|
EMBED_ENDPOINT,
|
|
data=payload,
|
|
headers={"Content-Type": "application/json"},
|
|
method="POST",
|
|
)
|
|
with urllib.request.urlopen(req, timeout=60) as resp:
|
|
data = json.loads(resp.read())
|
|
return data["embedding"]
|
|
|
|
|
|
def check_ollama(model: str) -> bool:
|
|
"""Verifica che Ollama sia attivo e che il modello di embedding sia disponibile."""
|
|
try:
|
|
req = urllib.request.Request(f"{OLLAMA_URL}/api/tags", method="GET")
|
|
with urllib.request.urlopen(req, timeout=10) as resp:
|
|
data = json.loads(resp.read())
|
|
models = [m["name"] for m in data.get("models", [])]
|
|
found = any(
|
|
m == model or m.startswith(model + ":")
|
|
for m in models
|
|
)
|
|
if found:
|
|
print(f"✅ Ollama OK — {model} disponibile")
|
|
return True
|
|
print(f"❌ Modello {model} non trovato in Ollama")
|
|
print(f" → ollama pull {model}")
|
|
return False
|
|
except (urllib.error.URLError, OSError):
|
|
print("❌ Ollama non raggiungibile — assicurati che sia in esecuzione")
|
|
print(" → ollama serve")
|
|
return False
|
|
|
|
|
|
# ─── ChromaDB ─────────────────────────────────────────────────────────────────
|
|
|
|
def get_client() -> chromadb.PersistentClient:
|
|
CHROMA_DIR.mkdir(parents=True, exist_ok=True)
|
|
return chromadb.PersistentClient(path=str(CHROMA_DIR))
|
|
|
|
|
|
def collection_exists(client: chromadb.PersistentClient, stem: str) -> bool:
|
|
return any(c.name == stem for c in client.list_collections())
|
|
|
|
|
|
# ─── Ingestione ───────────────────────────────────────────────────────────────
|
|
|
|
def _ingest_stem(stem: str, collection: chromadb.Collection,
|
|
model: str, offset: int = 0) -> int:
|
|
"""
|
|
Aggiunge i chunk di uno stem a una collection esistente.
|
|
Prefissa chunk_id con stem per evitare collisioni multi-documento.
|
|
Ritorna il numero di chunk aggiunti.
|
|
"""
|
|
chunks_path = CHUNKS_DIR / stem / "chunks.json"
|
|
if not chunks_path.exists():
|
|
print(f"❌ File non trovato: {chunks_path}")
|
|
return 0
|
|
|
|
with open(chunks_path, encoding="utf-8") as f:
|
|
chunks = json.load(f)
|
|
|
|
if not chunks:
|
|
print(f"⚠️ {stem}: chunks.json è vuoto — skip")
|
|
return 0
|
|
|
|
total = len(chunks)
|
|
print(f" 📄 {stem}: {total} chunk\n")
|
|
|
|
ids = []
|
|
embeddings = []
|
|
documents = []
|
|
metadatas = []
|
|
|
|
start = time.monotonic()
|
|
durations: list[float] = []
|
|
|
|
for i, chunk in enumerate(chunks, start=1):
|
|
t0 = time.monotonic()
|
|
vector = embed(chunk["text"], model)
|
|
t1 = time.monotonic()
|
|
durations.append(t1 - t0)
|
|
|
|
ids.append(f"{stem}__{chunk['chunk_id']}")
|
|
embeddings.append(vector)
|
|
documents.append(chunk["text"])
|
|
metadatas.append({
|
|
"source": stem,
|
|
"sezione": chunk.get("sezione", ""),
|
|
"titolo": chunk.get("titolo", ""),
|
|
"sub_index": chunk.get("sub_index", 0),
|
|
})
|
|
|
|
avg = sum(durations) / len(durations)
|
|
eta = int(avg * (total - i))
|
|
done = f"[{offset + i:>6}/{offset + total}]"
|
|
cid = chunk["chunk_id"][:40]
|
|
print(f" {done} ✓ {stem}/{cid:<40} ETA: {eta}s", end="\r", flush=True)
|
|
|
|
if len(ids) == 100:
|
|
collection.add(ids=ids, embeddings=embeddings,
|
|
documents=documents, metadatas=metadatas)
|
|
ids, embeddings, documents, metadatas = [], [], [], []
|
|
|
|
if ids:
|
|
collection.add(ids=ids, embeddings=embeddings,
|
|
documents=documents, metadatas=metadatas)
|
|
|
|
elapsed = int(time.monotonic() - start)
|
|
print()
|
|
print(f" ✅ {stem}: {total} chunk in {elapsed}s")
|
|
return total
|
|
|
|
|
|
def ingest(stem: str, force: bool, model: str = EMBED_MODEL) -> bool:
|
|
"""Ingest singolo documento nella sua collection dedicata (retrocompatibile)."""
|
|
return ingest_multi([stem], collection_name=stem, force=force, model=model)
|
|
|
|
|
|
def ingest_multi(stems: list[str], collection_name: str,
|
|
force: bool, model: str = EMBED_MODEL) -> bool:
|
|
"""
|
|
Ingerisce uno o più documenti in una singola collection ChromaDB.
|
|
I chunk_id sono prefissati con lo stem per evitare collisioni.
|
|
Il metadato 'source' identifica il documento di provenienza.
|
|
"""
|
|
client = get_client()
|
|
|
|
if collection_exists(client, collection_name):
|
|
if not force:
|
|
print(f"⚠️ Collection '{collection_name}' già presente in ChromaDB — skip")
|
|
print(f" → usa --force per sovrascrivere")
|
|
return True
|
|
client.delete_collection(collection_name)
|
|
print(f"🗑️ Collection '{collection_name}' rimossa (--force)")
|
|
|
|
collection = client.create_collection(
|
|
name=collection_name,
|
|
metadata={"hnsw:space": "cosine"},
|
|
)
|
|
|
|
total_chunks = 0
|
|
for stem in stems:
|
|
n = _ingest_stem(stem, collection, model, offset=total_chunks)
|
|
if n == 0 and len(stems) == 1:
|
|
return False
|
|
total_chunks += n
|
|
|
|
print(f"\n✅ Collection '{collection_name}': {total_chunks} chunk totali")
|
|
print(f" Documenti: {', '.join(stems)}")
|
|
print(f" Percorso: {CHROMA_DIR}/")
|
|
return True
|
|
|
|
|
|
# ─── Entry point ──────────────────────────────────────────────────────────────
|
|
|
|
def find_stems() -> list[str]:
|
|
"""Ritorna tutti gli stem che hanno un chunks.json in chunks/."""
|
|
return sorted(
|
|
p.parent.name
|
|
for p in CHUNKS_DIR.glob("*/chunks.json")
|
|
)
|
|
|
|
|
|
def main() -> int:
|
|
parser = argparse.ArgumentParser(
|
|
description="Vettorizzazione chunk in ChromaDB",
|
|
epilog=(
|
|
"Esempi:\n"
|
|
" python ingestion/ingest.py --stem manuale\n"
|
|
" python ingestion/ingest.py --collection archivio --stems doc1 doc2 doc3\n"
|
|
" python ingestion/ingest.py --collection archivio --stems doc1 doc2 --force\n"
|
|
" python ingestion/ingest.py # tutti i documenti, collection separate"
|
|
),
|
|
formatter_class=argparse.RawDescriptionHelpFormatter,
|
|
)
|
|
parser.add_argument("--stem",
|
|
help="Singolo documento → collection con lo stesso nome")
|
|
parser.add_argument("--stems", nargs="+", metavar="STEM",
|
|
help="Uno o più documenti da unire in --collection")
|
|
parser.add_argument("--collection",
|
|
help="Nome della collection di destinazione (richiesto con --stems)")
|
|
parser.add_argument("--force", action="store_true",
|
|
help="Sovrascrive la collection se già esistente")
|
|
parser.add_argument("--model", default=EMBED_MODEL,
|
|
help=f"Modello embedding (default: {EMBED_MODEL})")
|
|
args = parser.parse_args()
|
|
|
|
print("─── Vettorizzazione ──────────────────────────────────────────────────\n")
|
|
|
|
if not check_ollama(args.model):
|
|
return 1
|
|
|
|
# ── Modalità multi-documento ─────────────────────────────────────────────
|
|
if args.stems or args.collection:
|
|
if not args.stems:
|
|
print("❌ --collection richiede --stems (es. --stems doc1 doc2 doc3)")
|
|
return 1
|
|
if not args.collection:
|
|
print("❌ --stems richiede --collection (es. --collection archivio)")
|
|
return 1
|
|
print(f" Collection : {args.collection}")
|
|
print(f" Documenti : {', '.join(args.stems)}\n")
|
|
ok = ingest_multi(args.stems, args.collection,
|
|
force=args.force, model=args.model)
|
|
return 0 if ok else 1
|
|
|
|
# ── Modalità singolo / tutti ─────────────────────────────────────────────
|
|
stems = [args.stem] if args.stem else find_stems()
|
|
if not stems:
|
|
print("❌ Nessun chunks.json trovato in chunks/")
|
|
return 1
|
|
|
|
results = []
|
|
for stem in stems:
|
|
if len(stems) > 1:
|
|
print(f"── {stem} ──")
|
|
results.append(ingest(stem, force=args.force, model=args.model))
|
|
if len(stems) > 1:
|
|
print()
|
|
|
|
return 0 if all(results) else 1
|
|
|
|
|
|
if __name__ == "__main__":
|
|
sys.exit(main())
|