From 8d972fa7c6fc5b5a931e973569e221a29cdd9a54 Mon Sep 17 00:00:00 2001 From: Davide Grilli Date: Tue, 12 May 2026 11:21:17 +0200 Subject: [PATCH] feat(ingestion): supporto multi-documento in unica collection ChromaDB MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Aggiunge la possibilità di unire più documenti in una singola collection ChromaDB, con chunk_id prefissati per stem e metadato source per filtrare. - ingest.py: --stems doc1 doc2 --collection nome (nuovo), --stem (invariato) - rag.py / retrieve.py: --collection, source nei chunk, verbose mostra [source] Co-Authored-By: Claude Sonnet 4.6 --- ingestion/ingest.py | 141 +++++++++++++++++++++++++++++--------------- rag.py | 39 +++++++----- retrieve.py | 30 +++++++--- 3 files changed, 137 insertions(+), 73 deletions(-) diff --git a/ingestion/ingest.py b/ingestion/ingest.py index d9b0a3d..4a2c6d7 100644 --- a/ingestion/ingest.py +++ b/ingestion/ingest.py @@ -94,40 +94,27 @@ def collection_exists(client: chromadb.PersistentClient, stem: str) -> bool: # ─── Ingestione ─────────────────────────────────────────────────────────────── -def ingest(stem: str, force: bool, model: str = EMBED_MODEL) -> bool: +def _ingest_stem(stem: str, collection: chromadb.Collection, + model: str, offset: int = 0) -> int: """ - Legge step-6//chunks.json, genera embedding e popola ChromaDB. - Ritorna True se completato con successo, False altrimenti. + Aggiunge i chunk di uno stem a una collection esistente. + Prefissa chunk_id con stem per evitare collisioni multi-documento. + Ritorna il numero di chunk aggiunti. """ chunks_path = CHUNKS_DIR / stem / "chunks.json" if not chunks_path.exists(): print(f"❌ File non trovato: {chunks_path}") - return False + return 0 with open(chunks_path, encoding="utf-8") as f: chunks = json.load(f) if not chunks: print(f"⚠️ {stem}: chunks.json è vuoto — skip") - return False - - client = get_client() - - if collection_exists(client, stem): - if not force: - print(f"⚠️ Collection '{stem}' già presente in ChromaDB — skip") - print(f" → usa --force per sovrascrivere") - return True # non è un errore, è uno skip - client.delete_collection(stem) - print(f"🗑️ Collection '{stem}' rimossa (--force)") - - collection = client.create_collection( - name=stem, - metadata={"hnsw:space": "cosine"}, - ) + return 0 total = len(chunks) - print(f"📦 {total} chunk da ingestire\n") + print(f" 📄 {stem}: {total} chunk\n") ids = [] embeddings = [] @@ -143,10 +130,11 @@ def ingest(stem: str, force: bool, model: str = EMBED_MODEL) -> bool: t1 = time.monotonic() durations.append(t1 - t0) - ids.append(chunk["chunk_id"]) + ids.append(f"{stem}__{chunk['chunk_id']}") embeddings.append(vector) documents.append(chunk["text"]) metadatas.append({ + "source": stem, "sezione": chunk.get("sezione", ""), "titolo": chunk.get("titolo", ""), "sub_index": chunk.get("sub_index", 0), @@ -154,41 +142,69 @@ def ingest(stem: str, force: bool, model: str = EMBED_MODEL) -> bool: avg = sum(durations) / len(durations) eta = int(avg * (total - i)) - done = f"[{i:>{len(str(total))}}/{total}]" - cid = chunk["chunk_id"][:50] - line = f" {done} ✓ {cid:<50} ETA: {eta}s" - print(f"{line:<80}", end="\r", flush=True) + done = f"[{offset + i:>6}/{offset + total}]" + cid = chunk["chunk_id"][:40] + print(f" {done} ✓ {stem}/{cid:<40} ETA: {eta}s", end="\r", flush=True) - # Upsert in batch da 100 per non sovraccaricare la memoria if len(ids) == 100: - collection.add( - ids=ids, - embeddings=embeddings, - documents=documents, - metadatas=metadatas, - ) + collection.add(ids=ids, embeddings=embeddings, + documents=documents, metadatas=metadatas) ids, embeddings, documents, metadatas = [], [], [], [] - # Upsert dei rimanenti if ids: - collection.add( - ids=ids, - embeddings=embeddings, - documents=documents, - metadatas=metadatas, - ) + collection.add(ids=ids, embeddings=embeddings, + documents=documents, metadatas=metadatas) elapsed = int(time.monotonic() - start) - print() # nuova riga dopo il \r - print(f"\n✅ Ingestione completata in {elapsed}s — {total}/{total} chunk salvati") - print(f" Collection '{stem}' in {CHROMA_DIR}/") + print() + print(f" ✅ {stem}: {total} chunk in {elapsed}s") + return total + + +def ingest(stem: str, force: bool, model: str = EMBED_MODEL) -> bool: + """Ingest singolo documento nella sua collection dedicata (retrocompatibile).""" + return ingest_multi([stem], collection_name=stem, force=force, model=model) + + +def ingest_multi(stems: list[str], collection_name: str, + force: bool, model: str = EMBED_MODEL) -> bool: + """ + Ingerisce uno o più documenti in una singola collection ChromaDB. + I chunk_id sono prefissati con lo stem per evitare collisioni. + Il metadato 'source' identifica il documento di provenienza. + """ + client = get_client() + + if collection_exists(client, collection_name): + if not force: + print(f"⚠️ Collection '{collection_name}' già presente in ChromaDB — skip") + print(f" → usa --force per sovrascrivere") + return True + client.delete_collection(collection_name) + print(f"🗑️ Collection '{collection_name}' rimossa (--force)") + + collection = client.create_collection( + name=collection_name, + metadata={"hnsw:space": "cosine"}, + ) + + total_chunks = 0 + for stem in stems: + n = _ingest_stem(stem, collection, model, offset=total_chunks) + if n == 0 and len(stems) == 1: + return False + total_chunks += n + + print(f"\n✅ Collection '{collection_name}': {total_chunks} chunk totali") + print(f" Documenti: {', '.join(stems)}") + print(f" Percorso: {CHROMA_DIR}/") return True # ─── Entry point ────────────────────────────────────────────────────────────── def find_stems() -> list[str]: - """Ritorna tutti gli stem che hanno un chunks.json in step-6/.""" + """Ritorna tutti gli stem che hanno un chunks.json in chunks/.""" return sorted( p.parent.name for p in CHUNKS_DIR.glob("*/chunks.json") @@ -197,26 +213,53 @@ def find_stems() -> list[str]: def main() -> int: parser = argparse.ArgumentParser( - description="Step 8 — Vettorizzazione chunk in ChromaDB" + description="Vettorizzazione chunk in ChromaDB", + epilog=( + "Esempi:\n" + " python ingestion/ingest.py --stem manuale\n" + " python ingestion/ingest.py --collection archivio --stems doc1 doc2 doc3\n" + " python ingestion/ingest.py --collection archivio --stems doc1 doc2 --force\n" + " python ingestion/ingest.py # tutti i documenti, collection separate" + ), + formatter_class=argparse.RawDescriptionHelpFormatter, ) - parser.add_argument("--stem", help="Nome del documento (senza --stem = tutti)") + parser.add_argument("--stem", + help="Singolo documento → collection con lo stesso nome") + parser.add_argument("--stems", nargs="+", metavar="STEM", + help="Uno o più documenti da unire in --collection") + parser.add_argument("--collection", + help="Nome della collection di destinazione (richiesto con --stems)") parser.add_argument("--force", action="store_true", help="Sovrascrive la collection se già esistente") parser.add_argument("--model", default=EMBED_MODEL, - help=f"Modello embedding Ollama (default da config.py: {EMBED_MODEL})") + help=f"Modello embedding (default: {EMBED_MODEL})") args = parser.parse_args() - print("─── Step 8 — Vettorizzazione ─────────────────────────────────────────\n") + print("─── Vettorizzazione ──────────────────────────────────────────────────\n") if not check_ollama(args.model): return 1 + # ── Modalità multi-documento ───────────────────────────────────────────── + if args.stems or args.collection: + if not args.stems: + print("❌ --collection richiede --stems (es. --stems doc1 doc2 doc3)") + return 1 + if not args.collection: + print("❌ --stems richiede --collection (es. --collection archivio)") + return 1 + print(f" Collection : {args.collection}") + print(f" Documenti : {', '.join(args.stems)}\n") + ok = ingest_multi(args.stems, args.collection, + force=args.force, model=args.model) + return 0 if ok else 1 + + # ── Modalità singolo / tutti ───────────────────────────────────────────── stems = [args.stem] if args.stem else find_stems() if not stems: print("❌ Nessun chunks.json trovato in chunks/") return 1 - print() results = [] for stem in stems: if len(stems) > 1: diff --git a/rag.py b/rag.py index 8749427..992aa54 100644 --- a/rag.py +++ b/rag.py @@ -101,6 +101,7 @@ def retrieve(collection: chromadb.Collection, question: str) -> list[dict]: ): chunks.append({ "text": text, + "source": meta.get("source", ""), "sezione": meta.get("sezione", ""), "titolo": meta.get("titolo", ""), "distance": dist, @@ -143,7 +144,8 @@ def answer(question: str, collection: chromadb.Collection, verbose: bool) -> Non if c["titolo"]: loc += f" > {c['titolo']}" sim = 1 - c["distance"] - print(f" [{i}] {loc} (similarità: {sim:.3f})") + src = f"[{c['source']}] " if c.get("source") else "" + print(f" [{i}] {src}{loc} (similarità: {sim:.3f})") print(f" {c['text'][:120].replace(chr(10), ' ')}...") print("──────────────────────────────────────────────────────────────\n") @@ -215,19 +217,23 @@ def main() -> int: ) parser.add_argument( "--stem", - required=True, - help=( - "Nome della collection ChromaDB da interrogare. " - "Le collection vengono create da: python ingestion/ingest.py --stem " - ), + help="Collection di un singolo documento (retrocompatibile)", + ) + parser.add_argument( + "--collection", + help="Collection multi-documento creata con: ingest.py --collection --stems ...", ) args = parser.parse_args() + collection_name = args.collection or args.stem + if not collection_name: + parser.error("specifica --stem oppure --collection ") + print("─── Pipeline RAG ────────────────────────────────────────────\n") - print(f" Documento : {args.stem}") - print(f" Modello : {LLM_MODEL}") - print(f" Top-K : {TOP_K}") - print(f" Thinking : {'off' if NO_THINK else 'on'}") + print(f" Collection : {collection_name}") + print(f" Modello : {LLM_MODEL}") + print(f" Top-K : {TOP_K}") + print(f" Thinking : {'off' if NO_THINK else 'on'}") print() if not CHROMA_DIR.exists(): @@ -236,13 +242,16 @@ def main() -> int: client = chromadb.PersistentClient(path=str(CHROMA_DIR)) collections = [c.name for c in client.list_collections()] - if args.stem not in collections: - print(f"❌ Collection '{args.stem}' non trovata in chroma_db/") - print(f" → python ingestion/ingest.py --stem {args.stem}") + if collection_name not in collections: + print(f"❌ Collection '{collection_name}' non trovata in chroma_db/") + if args.stem: + print(f" → python ingestion/ingest.py --stem {collection_name}") + else: + print(f" → python ingestion/ingest.py --collection {collection_name} --stems doc1 doc2 ...") return 1 - collection = client.get_collection(args.stem) - print(f"✅ Collection '{args.stem}' caricata ({collection.count()} chunk)\n") + collection = client.get_collection(collection_name) + print(f"✅ Collection '{collection_name}' caricata ({collection.count()} chunk)\n") run_loop(collection) return 0 diff --git a/retrieve.py b/retrieve.py index 7e9644e..02a3868 100644 --- a/retrieve.py +++ b/retrieve.py @@ -85,6 +85,7 @@ def retrieve(collection: chromadb.Collection, query: str, top_k: int) -> list[di chunks.append({ "rank": rank, "similarity": round(1 - dist, 4), + "source": meta.get("source", ""), "sezione": meta.get("sezione", ""), "titolo": meta.get("titolo", ""), "text": text, @@ -97,10 +98,11 @@ def retrieve(collection: chromadb.Collection, query: str, top_k: int) -> list[di def print_results(chunks: list[dict], full: bool = False) -> None: print(f"── {len(chunks)} chunk recuperati ─────────────────────────────────\n") for c in chunks: + src = f"[{c['source']}] " if c.get("source") else "" loc = c["sezione"] if c["titolo"]: loc += f" > {c['titolo']}" - print(f" [{c['rank']}] similarità: {c['similarity']:.4f} | {loc}") + print(f" [{c['rank']}] similarità: {c['similarity']:.4f} | {src}{loc}") if full: print() print(c["text"]) @@ -177,8 +179,11 @@ def main() -> int: ) parser.add_argument( "--stem", - required=True, - help="Nome della collection ChromaDB da interrogare.", + help="Collection di un singolo documento (retrocompatibile)", + ) + parser.add_argument( + "--collection", + help="Collection multi-documento creata con: ingest.py --collection --stems ...", ) parser.add_argument( "--top-k", @@ -189,8 +194,12 @@ def main() -> int: ) args = parser.parse_args() + collection_name = args.collection or args.stem + if not collection_name: + parser.error("specifica --stem oppure --collection ") + print("─── Retrieval puro ──────────────────────────────────────────\n") - print(f" Documento : {args.stem}") + print(f" Collection : {collection_name}") print(f" Embed model : {EMBED_MODEL}") print(f" Top-K : {args.top_k}") print() @@ -201,13 +210,16 @@ def main() -> int: client = chromadb.PersistentClient(path=str(CHROMA_DIR)) collections = [c.name for c in client.list_collections()] - if args.stem not in collections: - print(f"❌ Collection '{args.stem}' non trovata in chroma_db/", file=sys.stderr) - print(f" → python ingestion/ingest.py --stem {args.stem}", file=sys.stderr) + if collection_name not in collections: + print(f"❌ Collection '{collection_name}' non trovata in chroma_db/", file=sys.stderr) + if args.stem: + print(f" → python ingestion/ingest.py --stem {collection_name}", file=sys.stderr) + else: + print(f" → python ingestion/ingest.py --collection {collection_name} --stems doc1 doc2 ...", file=sys.stderr) return 1 - collection = client.get_collection(args.stem) - print(f"✅ Collection '{args.stem}' caricata ({collection.count()} chunk)\n") + collection = client.get_collection(collection_name) + print(f"✅ Collection '{collection_name}' caricata ({collection.count()} chunk)\n") run_loop(collection, args.top_k) return 0