step-8: add ingest.py, align README
- ingest.py: embed chunks via Ollama nomic-embed-text, index in ChromaDB (cosine space); --stem / --force / batch-100 / ETA display - README: fix step-8 input path (step-5 → step-6), script path (scripts/ → step-8/), add --force explanation and real timings
This commit is contained in:
@@ -0,0 +1,224 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Step 8 — Vettorizzazione
|
||||
|
||||
Legge i chunk prodotti da step-6, genera gli embedding tramite Ollama
|
||||
(nomic-embed-text) e li indicizza in ChromaDB (persistente).
|
||||
|
||||
Input: step-6/<stem>/chunks.json
|
||||
Output: chroma_db/<stem> (collection ChromaDB)
|
||||
|
||||
Uso:
|
||||
python step-8/ingest.py --stem <nome> # singolo documento
|
||||
python step-8/ingest.py # tutti gli stem trovati
|
||||
python step-8/ingest.py --stem <nome> --force # sovrascrive collection
|
||||
"""
|
||||
|
||||
import argparse
|
||||
import json
|
||||
import sys
|
||||
import time
|
||||
import urllib.error
|
||||
import urllib.request
|
||||
from pathlib import Path
|
||||
|
||||
import chromadb
|
||||
|
||||
# ─── Costanti ─────────────────────────────────────────────────────────────────
|
||||
|
||||
project_root = Path(__file__).parent.parent
|
||||
|
||||
CHUNKS_DIR = project_root / "step-6"
|
||||
CHROMA_DIR = project_root / "chroma_db"
|
||||
|
||||
OLLAMA_URL = "http://localhost:11434"
|
||||
EMBED_MODEL = "nomic-embed-text"
|
||||
EMBED_ENDPOINT = f"{OLLAMA_URL}/api/embeddings"
|
||||
|
||||
|
||||
# ─── Ollama ────────────────────────────────────────────────────────────────────
|
||||
|
||||
def embed(text: str) -> list[float]:
|
||||
"""Chiama Ollama /api/embeddings e ritorna il vettore."""
|
||||
payload = json.dumps({"model": EMBED_MODEL, "prompt": text}).encode()
|
||||
req = urllib.request.Request(
|
||||
EMBED_ENDPOINT,
|
||||
data=payload,
|
||||
headers={"Content-Type": "application/json"},
|
||||
method="POST",
|
||||
)
|
||||
with urllib.request.urlopen(req, timeout=60) as resp:
|
||||
data = json.loads(resp.read())
|
||||
return data["embedding"]
|
||||
|
||||
|
||||
def check_ollama() -> bool:
|
||||
"""Verifica che Ollama sia attivo e che nomic-embed-text sia disponibile."""
|
||||
try:
|
||||
req = urllib.request.Request(f"{OLLAMA_URL}/api/tags", method="GET")
|
||||
with urllib.request.urlopen(req, timeout=10) as resp:
|
||||
data = json.loads(resp.read())
|
||||
models = [m["name"] for m in data.get("models", [])]
|
||||
found = any(
|
||||
m == EMBED_MODEL or m.startswith(EMBED_MODEL + ":")
|
||||
for m in models
|
||||
)
|
||||
if found:
|
||||
print(f"✅ Ollama OK — {EMBED_MODEL} disponibile")
|
||||
return True
|
||||
print(f"❌ Modello {EMBED_MODEL} non trovato in Ollama")
|
||||
print(f" → ollama pull {EMBED_MODEL}")
|
||||
return False
|
||||
except (urllib.error.URLError, OSError):
|
||||
print("❌ Ollama non raggiungibile — assicurati che sia in esecuzione")
|
||||
print(" → ollama serve")
|
||||
return False
|
||||
|
||||
|
||||
# ─── ChromaDB ─────────────────────────────────────────────────────────────────
|
||||
|
||||
def get_client() -> chromadb.PersistentClient:
|
||||
CHROMA_DIR.mkdir(parents=True, exist_ok=True)
|
||||
return chromadb.PersistentClient(path=str(CHROMA_DIR))
|
||||
|
||||
|
||||
def collection_exists(client: chromadb.PersistentClient, stem: str) -> bool:
|
||||
return any(c.name == stem for c in client.list_collections())
|
||||
|
||||
|
||||
# ─── Ingestione ───────────────────────────────────────────────────────────────
|
||||
|
||||
def ingest(stem: str, force: bool) -> bool:
|
||||
"""
|
||||
Legge step-6/<stem>/chunks.json, genera embedding e popola ChromaDB.
|
||||
Ritorna True se completato con successo, False altrimenti.
|
||||
"""
|
||||
chunks_path = CHUNKS_DIR / stem / "chunks.json"
|
||||
if not chunks_path.exists():
|
||||
print(f"❌ File non trovato: {chunks_path}")
|
||||
return False
|
||||
|
||||
with open(chunks_path, encoding="utf-8") as f:
|
||||
chunks = json.load(f)
|
||||
|
||||
if not chunks:
|
||||
print(f"⚠️ {stem}: chunks.json è vuoto — skip")
|
||||
return False
|
||||
|
||||
client = get_client()
|
||||
|
||||
if collection_exists(client, stem):
|
||||
if not force:
|
||||
print(f"⚠️ Collection '{stem}' già presente in ChromaDB — skip")
|
||||
print(f" → usa --force per sovrascrivere")
|
||||
return True # non è un errore, è uno skip
|
||||
client.delete_collection(stem)
|
||||
print(f"🗑️ Collection '{stem}' rimossa (--force)")
|
||||
|
||||
collection = client.create_collection(
|
||||
name=stem,
|
||||
metadata={"hnsw:space": "cosine"},
|
||||
)
|
||||
|
||||
total = len(chunks)
|
||||
print(f"📦 {total} chunk da ingestire\n")
|
||||
|
||||
ids = []
|
||||
embeddings = []
|
||||
documents = []
|
||||
metadatas = []
|
||||
|
||||
start = time.monotonic()
|
||||
durations: list[float] = []
|
||||
|
||||
for i, chunk in enumerate(chunks, start=1):
|
||||
t0 = time.monotonic()
|
||||
vector = embed(chunk["text"])
|
||||
t1 = time.monotonic()
|
||||
durations.append(t1 - t0)
|
||||
|
||||
ids.append(chunk["chunk_id"])
|
||||
embeddings.append(vector)
|
||||
documents.append(chunk["text"])
|
||||
metadatas.append({
|
||||
"sezione": chunk.get("sezione", ""),
|
||||
"titolo": chunk.get("titolo", ""),
|
||||
"sub_index": chunk.get("sub_index", 0),
|
||||
})
|
||||
|
||||
avg = sum(durations) / len(durations)
|
||||
eta = int(avg * (total - i))
|
||||
done = f"[{i:>{len(str(total))}}/{total}]"
|
||||
cid = chunk["chunk_id"][:50]
|
||||
line = f" {done} ✓ {cid:<50} ETA: {eta}s"
|
||||
print(f"{line:<80}", end="\r", flush=True)
|
||||
|
||||
# Upsert in batch da 100 per non sovraccaricare la memoria
|
||||
if len(ids) == 100:
|
||||
collection.add(
|
||||
ids=ids,
|
||||
embeddings=embeddings,
|
||||
documents=documents,
|
||||
metadatas=metadatas,
|
||||
)
|
||||
ids, embeddings, documents, metadatas = [], [], [], []
|
||||
|
||||
# Upsert dei rimanenti
|
||||
if ids:
|
||||
collection.add(
|
||||
ids=ids,
|
||||
embeddings=embeddings,
|
||||
documents=documents,
|
||||
metadatas=metadatas,
|
||||
)
|
||||
|
||||
elapsed = int(time.monotonic() - start)
|
||||
print() # nuova riga dopo il \r
|
||||
print(f"\n✅ Ingestione completata in {elapsed}s — {total}/{total} chunk salvati")
|
||||
print(f" Collection '{stem}' in {CHROMA_DIR}/")
|
||||
return True
|
||||
|
||||
|
||||
# ─── Entry point ──────────────────────────────────────────────────────────────
|
||||
|
||||
def find_stems() -> list[str]:
|
||||
"""Ritorna tutti gli stem che hanno un chunks.json in step-6/."""
|
||||
return sorted(
|
||||
p.parent.name
|
||||
for p in CHUNKS_DIR.glob("*/chunks.json")
|
||||
)
|
||||
|
||||
|
||||
def main() -> int:
|
||||
parser = argparse.ArgumentParser(
|
||||
description="Step 8 — Vettorizzazione chunk in ChromaDB"
|
||||
)
|
||||
parser.add_argument("--stem", help="Nome del documento (senza --stem = tutti)")
|
||||
parser.add_argument("--force", action="store_true",
|
||||
help="Sovrascrive la collection se già esistente")
|
||||
args = parser.parse_args()
|
||||
|
||||
print("─── Step 8 — Vettorizzazione ─────────────────────────────────────────\n")
|
||||
|
||||
if not check_ollama():
|
||||
return 1
|
||||
|
||||
stems = [args.stem] if args.stem else find_stems()
|
||||
if not stems:
|
||||
print("❌ Nessun chunks.json trovato in step-6/")
|
||||
return 1
|
||||
|
||||
print()
|
||||
results = []
|
||||
for stem in stems:
|
||||
if len(stems) > 1:
|
||||
print(f"── {stem} ──")
|
||||
results.append(ingest(stem, force=args.force))
|
||||
if len(stems) > 1:
|
||||
print()
|
||||
|
||||
return 0 if all(results) else 1
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
sys.exit(main())
|
||||
Reference in New Issue
Block a user