feat(ingestion): supporto multi-documento in unica collection ChromaDB
Aggiunge la possibilità di unire più documenti in una singola collection ChromaDB, con chunk_id prefissati per stem e metadato source per filtrare. - ingest.py: --stems doc1 doc2 --collection nome (nuovo), --stem (invariato) - rag.py / retrieve.py: --collection, source nei chunk, verbose mostra [source] Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
+92
-49
@@ -94,40 +94,27 @@ def collection_exists(client: chromadb.PersistentClient, stem: str) -> bool:
|
|||||||
|
|
||||||
# ─── Ingestione ───────────────────────────────────────────────────────────────
|
# ─── Ingestione ───────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
def ingest(stem: str, force: bool, model: str = EMBED_MODEL) -> bool:
|
def _ingest_stem(stem: str, collection: chromadb.Collection,
|
||||||
|
model: str, offset: int = 0) -> int:
|
||||||
"""
|
"""
|
||||||
Legge step-6/<stem>/chunks.json, genera embedding e popola ChromaDB.
|
Aggiunge i chunk di uno stem a una collection esistente.
|
||||||
Ritorna True se completato con successo, False altrimenti.
|
Prefissa chunk_id con stem per evitare collisioni multi-documento.
|
||||||
|
Ritorna il numero di chunk aggiunti.
|
||||||
"""
|
"""
|
||||||
chunks_path = CHUNKS_DIR / stem / "chunks.json"
|
chunks_path = CHUNKS_DIR / stem / "chunks.json"
|
||||||
if not chunks_path.exists():
|
if not chunks_path.exists():
|
||||||
print(f"❌ File non trovato: {chunks_path}")
|
print(f"❌ File non trovato: {chunks_path}")
|
||||||
return False
|
return 0
|
||||||
|
|
||||||
with open(chunks_path, encoding="utf-8") as f:
|
with open(chunks_path, encoding="utf-8") as f:
|
||||||
chunks = json.load(f)
|
chunks = json.load(f)
|
||||||
|
|
||||||
if not chunks:
|
if not chunks:
|
||||||
print(f"⚠️ {stem}: chunks.json è vuoto — skip")
|
print(f"⚠️ {stem}: chunks.json è vuoto — skip")
|
||||||
return False
|
return 0
|
||||||
|
|
||||||
client = get_client()
|
|
||||||
|
|
||||||
if collection_exists(client, stem):
|
|
||||||
if not force:
|
|
||||||
print(f"⚠️ Collection '{stem}' già presente in ChromaDB — skip")
|
|
||||||
print(f" → usa --force per sovrascrivere")
|
|
||||||
return True # non è un errore, è uno skip
|
|
||||||
client.delete_collection(stem)
|
|
||||||
print(f"🗑️ Collection '{stem}' rimossa (--force)")
|
|
||||||
|
|
||||||
collection = client.create_collection(
|
|
||||||
name=stem,
|
|
||||||
metadata={"hnsw:space": "cosine"},
|
|
||||||
)
|
|
||||||
|
|
||||||
total = len(chunks)
|
total = len(chunks)
|
||||||
print(f"📦 {total} chunk da ingestire\n")
|
print(f" 📄 {stem}: {total} chunk\n")
|
||||||
|
|
||||||
ids = []
|
ids = []
|
||||||
embeddings = []
|
embeddings = []
|
||||||
@@ -143,10 +130,11 @@ def ingest(stem: str, force: bool, model: str = EMBED_MODEL) -> bool:
|
|||||||
t1 = time.monotonic()
|
t1 = time.monotonic()
|
||||||
durations.append(t1 - t0)
|
durations.append(t1 - t0)
|
||||||
|
|
||||||
ids.append(chunk["chunk_id"])
|
ids.append(f"{stem}__{chunk['chunk_id']}")
|
||||||
embeddings.append(vector)
|
embeddings.append(vector)
|
||||||
documents.append(chunk["text"])
|
documents.append(chunk["text"])
|
||||||
metadatas.append({
|
metadatas.append({
|
||||||
|
"source": stem,
|
||||||
"sezione": chunk.get("sezione", ""),
|
"sezione": chunk.get("sezione", ""),
|
||||||
"titolo": chunk.get("titolo", ""),
|
"titolo": chunk.get("titolo", ""),
|
||||||
"sub_index": chunk.get("sub_index", 0),
|
"sub_index": chunk.get("sub_index", 0),
|
||||||
@@ -154,41 +142,69 @@ def ingest(stem: str, force: bool, model: str = EMBED_MODEL) -> bool:
|
|||||||
|
|
||||||
avg = sum(durations) / len(durations)
|
avg = sum(durations) / len(durations)
|
||||||
eta = int(avg * (total - i))
|
eta = int(avg * (total - i))
|
||||||
done = f"[{i:>{len(str(total))}}/{total}]"
|
done = f"[{offset + i:>6}/{offset + total}]"
|
||||||
cid = chunk["chunk_id"][:50]
|
cid = chunk["chunk_id"][:40]
|
||||||
line = f" {done} ✓ {cid:<50} ETA: {eta}s"
|
print(f" {done} ✓ {stem}/{cid:<40} ETA: {eta}s", end="\r", flush=True)
|
||||||
print(f"{line:<80}", end="\r", flush=True)
|
|
||||||
|
|
||||||
# Upsert in batch da 100 per non sovraccaricare la memoria
|
|
||||||
if len(ids) == 100:
|
if len(ids) == 100:
|
||||||
collection.add(
|
collection.add(ids=ids, embeddings=embeddings,
|
||||||
ids=ids,
|
documents=documents, metadatas=metadatas)
|
||||||
embeddings=embeddings,
|
|
||||||
documents=documents,
|
|
||||||
metadatas=metadatas,
|
|
||||||
)
|
|
||||||
ids, embeddings, documents, metadatas = [], [], [], []
|
ids, embeddings, documents, metadatas = [], [], [], []
|
||||||
|
|
||||||
# Upsert dei rimanenti
|
|
||||||
if ids:
|
if ids:
|
||||||
collection.add(
|
collection.add(ids=ids, embeddings=embeddings,
|
||||||
ids=ids,
|
documents=documents, metadatas=metadatas)
|
||||||
embeddings=embeddings,
|
|
||||||
documents=documents,
|
|
||||||
metadatas=metadatas,
|
|
||||||
)
|
|
||||||
|
|
||||||
elapsed = int(time.monotonic() - start)
|
elapsed = int(time.monotonic() - start)
|
||||||
print() # nuova riga dopo il \r
|
print()
|
||||||
print(f"\n✅ Ingestione completata in {elapsed}s — {total}/{total} chunk salvati")
|
print(f" ✅ {stem}: {total} chunk in {elapsed}s")
|
||||||
print(f" Collection '{stem}' in {CHROMA_DIR}/")
|
return total
|
||||||
|
|
||||||
|
|
||||||
|
def ingest(stem: str, force: bool, model: str = EMBED_MODEL) -> bool:
|
||||||
|
"""Ingest singolo documento nella sua collection dedicata (retrocompatibile)."""
|
||||||
|
return ingest_multi([stem], collection_name=stem, force=force, model=model)
|
||||||
|
|
||||||
|
|
||||||
|
def ingest_multi(stems: list[str], collection_name: str,
|
||||||
|
force: bool, model: str = EMBED_MODEL) -> bool:
|
||||||
|
"""
|
||||||
|
Ingerisce uno o più documenti in una singola collection ChromaDB.
|
||||||
|
I chunk_id sono prefissati con lo stem per evitare collisioni.
|
||||||
|
Il metadato 'source' identifica il documento di provenienza.
|
||||||
|
"""
|
||||||
|
client = get_client()
|
||||||
|
|
||||||
|
if collection_exists(client, collection_name):
|
||||||
|
if not force:
|
||||||
|
print(f"⚠️ Collection '{collection_name}' già presente in ChromaDB — skip")
|
||||||
|
print(f" → usa --force per sovrascrivere")
|
||||||
|
return True
|
||||||
|
client.delete_collection(collection_name)
|
||||||
|
print(f"🗑️ Collection '{collection_name}' rimossa (--force)")
|
||||||
|
|
||||||
|
collection = client.create_collection(
|
||||||
|
name=collection_name,
|
||||||
|
metadata={"hnsw:space": "cosine"},
|
||||||
|
)
|
||||||
|
|
||||||
|
total_chunks = 0
|
||||||
|
for stem in stems:
|
||||||
|
n = _ingest_stem(stem, collection, model, offset=total_chunks)
|
||||||
|
if n == 0 and len(stems) == 1:
|
||||||
|
return False
|
||||||
|
total_chunks += n
|
||||||
|
|
||||||
|
print(f"\n✅ Collection '{collection_name}': {total_chunks} chunk totali")
|
||||||
|
print(f" Documenti: {', '.join(stems)}")
|
||||||
|
print(f" Percorso: {CHROMA_DIR}/")
|
||||||
return True
|
return True
|
||||||
|
|
||||||
|
|
||||||
# ─── Entry point ──────────────────────────────────────────────────────────────
|
# ─── Entry point ──────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
def find_stems() -> list[str]:
|
def find_stems() -> list[str]:
|
||||||
"""Ritorna tutti gli stem che hanno un chunks.json in step-6/."""
|
"""Ritorna tutti gli stem che hanno un chunks.json in chunks/."""
|
||||||
return sorted(
|
return sorted(
|
||||||
p.parent.name
|
p.parent.name
|
||||||
for p in CHUNKS_DIR.glob("*/chunks.json")
|
for p in CHUNKS_DIR.glob("*/chunks.json")
|
||||||
@@ -197,26 +213,53 @@ def find_stems() -> list[str]:
|
|||||||
|
|
||||||
def main() -> int:
|
def main() -> int:
|
||||||
parser = argparse.ArgumentParser(
|
parser = argparse.ArgumentParser(
|
||||||
description="Step 8 — Vettorizzazione chunk in ChromaDB"
|
description="Vettorizzazione chunk in ChromaDB",
|
||||||
|
epilog=(
|
||||||
|
"Esempi:\n"
|
||||||
|
" python ingestion/ingest.py --stem manuale\n"
|
||||||
|
" python ingestion/ingest.py --collection archivio --stems doc1 doc2 doc3\n"
|
||||||
|
" python ingestion/ingest.py --collection archivio --stems doc1 doc2 --force\n"
|
||||||
|
" python ingestion/ingest.py # tutti i documenti, collection separate"
|
||||||
|
),
|
||||||
|
formatter_class=argparse.RawDescriptionHelpFormatter,
|
||||||
)
|
)
|
||||||
parser.add_argument("--stem", help="Nome del documento (senza --stem = tutti)")
|
parser.add_argument("--stem",
|
||||||
|
help="Singolo documento → collection con lo stesso nome")
|
||||||
|
parser.add_argument("--stems", nargs="+", metavar="STEM",
|
||||||
|
help="Uno o più documenti da unire in --collection")
|
||||||
|
parser.add_argument("--collection",
|
||||||
|
help="Nome della collection di destinazione (richiesto con --stems)")
|
||||||
parser.add_argument("--force", action="store_true",
|
parser.add_argument("--force", action="store_true",
|
||||||
help="Sovrascrive la collection se già esistente")
|
help="Sovrascrive la collection se già esistente")
|
||||||
parser.add_argument("--model", default=EMBED_MODEL,
|
parser.add_argument("--model", default=EMBED_MODEL,
|
||||||
help=f"Modello embedding Ollama (default da config.py: {EMBED_MODEL})")
|
help=f"Modello embedding (default: {EMBED_MODEL})")
|
||||||
args = parser.parse_args()
|
args = parser.parse_args()
|
||||||
|
|
||||||
print("─── Step 8 — Vettorizzazione ─────────────────────────────────────────\n")
|
print("─── Vettorizzazione ──────────────────────────────────────────────────\n")
|
||||||
|
|
||||||
if not check_ollama(args.model):
|
if not check_ollama(args.model):
|
||||||
return 1
|
return 1
|
||||||
|
|
||||||
|
# ── Modalità multi-documento ─────────────────────────────────────────────
|
||||||
|
if args.stems or args.collection:
|
||||||
|
if not args.stems:
|
||||||
|
print("❌ --collection richiede --stems (es. --stems doc1 doc2 doc3)")
|
||||||
|
return 1
|
||||||
|
if not args.collection:
|
||||||
|
print("❌ --stems richiede --collection (es. --collection archivio)")
|
||||||
|
return 1
|
||||||
|
print(f" Collection : {args.collection}")
|
||||||
|
print(f" Documenti : {', '.join(args.stems)}\n")
|
||||||
|
ok = ingest_multi(args.stems, args.collection,
|
||||||
|
force=args.force, model=args.model)
|
||||||
|
return 0 if ok else 1
|
||||||
|
|
||||||
|
# ── Modalità singolo / tutti ─────────────────────────────────────────────
|
||||||
stems = [args.stem] if args.stem else find_stems()
|
stems = [args.stem] if args.stem else find_stems()
|
||||||
if not stems:
|
if not stems:
|
||||||
print("❌ Nessun chunks.json trovato in chunks/")
|
print("❌ Nessun chunks.json trovato in chunks/")
|
||||||
return 1
|
return 1
|
||||||
|
|
||||||
print()
|
|
||||||
results = []
|
results = []
|
||||||
for stem in stems:
|
for stem in stems:
|
||||||
if len(stems) > 1:
|
if len(stems) > 1:
|
||||||
|
|||||||
@@ -101,6 +101,7 @@ def retrieve(collection: chromadb.Collection, question: str) -> list[dict]:
|
|||||||
):
|
):
|
||||||
chunks.append({
|
chunks.append({
|
||||||
"text": text,
|
"text": text,
|
||||||
|
"source": meta.get("source", ""),
|
||||||
"sezione": meta.get("sezione", ""),
|
"sezione": meta.get("sezione", ""),
|
||||||
"titolo": meta.get("titolo", ""),
|
"titolo": meta.get("titolo", ""),
|
||||||
"distance": dist,
|
"distance": dist,
|
||||||
@@ -143,7 +144,8 @@ def answer(question: str, collection: chromadb.Collection, verbose: bool) -> Non
|
|||||||
if c["titolo"]:
|
if c["titolo"]:
|
||||||
loc += f" > {c['titolo']}"
|
loc += f" > {c['titolo']}"
|
||||||
sim = 1 - c["distance"]
|
sim = 1 - c["distance"]
|
||||||
print(f" [{i}] {loc} (similarità: {sim:.3f})")
|
src = f"[{c['source']}] " if c.get("source") else ""
|
||||||
|
print(f" [{i}] {src}{loc} (similarità: {sim:.3f})")
|
||||||
print(f" {c['text'][:120].replace(chr(10), ' ')}...")
|
print(f" {c['text'][:120].replace(chr(10), ' ')}...")
|
||||||
print("──────────────────────────────────────────────────────────────\n")
|
print("──────────────────────────────────────────────────────────────\n")
|
||||||
|
|
||||||
@@ -215,16 +217,20 @@ def main() -> int:
|
|||||||
)
|
)
|
||||||
parser.add_argument(
|
parser.add_argument(
|
||||||
"--stem",
|
"--stem",
|
||||||
required=True,
|
help="Collection di un singolo documento (retrocompatibile)",
|
||||||
help=(
|
)
|
||||||
"Nome della collection ChromaDB da interrogare. "
|
parser.add_argument(
|
||||||
"Le collection vengono create da: python ingestion/ingest.py --stem <nome>"
|
"--collection",
|
||||||
),
|
help="Collection multi-documento creata con: ingest.py --collection <nome> --stems ...",
|
||||||
)
|
)
|
||||||
args = parser.parse_args()
|
args = parser.parse_args()
|
||||||
|
|
||||||
|
collection_name = args.collection or args.stem
|
||||||
|
if not collection_name:
|
||||||
|
parser.error("specifica --stem <nome> oppure --collection <nome>")
|
||||||
|
|
||||||
print("─── Pipeline RAG ────────────────────────────────────────────\n")
|
print("─── Pipeline RAG ────────────────────────────────────────────\n")
|
||||||
print(f" Documento : {args.stem}")
|
print(f" Collection : {collection_name}")
|
||||||
print(f" Modello : {LLM_MODEL}")
|
print(f" Modello : {LLM_MODEL}")
|
||||||
print(f" Top-K : {TOP_K}")
|
print(f" Top-K : {TOP_K}")
|
||||||
print(f" Thinking : {'off' if NO_THINK else 'on'}")
|
print(f" Thinking : {'off' if NO_THINK else 'on'}")
|
||||||
@@ -236,13 +242,16 @@ def main() -> int:
|
|||||||
|
|
||||||
client = chromadb.PersistentClient(path=str(CHROMA_DIR))
|
client = chromadb.PersistentClient(path=str(CHROMA_DIR))
|
||||||
collections = [c.name for c in client.list_collections()]
|
collections = [c.name for c in client.list_collections()]
|
||||||
if args.stem not in collections:
|
if collection_name not in collections:
|
||||||
print(f"❌ Collection '{args.stem}' non trovata in chroma_db/")
|
print(f"❌ Collection '{collection_name}' non trovata in chroma_db/")
|
||||||
print(f" → python ingestion/ingest.py --stem {args.stem}")
|
if args.stem:
|
||||||
|
print(f" → python ingestion/ingest.py --stem {collection_name}")
|
||||||
|
else:
|
||||||
|
print(f" → python ingestion/ingest.py --collection {collection_name} --stems doc1 doc2 ...")
|
||||||
return 1
|
return 1
|
||||||
|
|
||||||
collection = client.get_collection(args.stem)
|
collection = client.get_collection(collection_name)
|
||||||
print(f"✅ Collection '{args.stem}' caricata ({collection.count()} chunk)\n")
|
print(f"✅ Collection '{collection_name}' caricata ({collection.count()} chunk)\n")
|
||||||
|
|
||||||
run_loop(collection)
|
run_loop(collection)
|
||||||
return 0
|
return 0
|
||||||
|
|||||||
+21
-9
@@ -85,6 +85,7 @@ def retrieve(collection: chromadb.Collection, query: str, top_k: int) -> list[di
|
|||||||
chunks.append({
|
chunks.append({
|
||||||
"rank": rank,
|
"rank": rank,
|
||||||
"similarity": round(1 - dist, 4),
|
"similarity": round(1 - dist, 4),
|
||||||
|
"source": meta.get("source", ""),
|
||||||
"sezione": meta.get("sezione", ""),
|
"sezione": meta.get("sezione", ""),
|
||||||
"titolo": meta.get("titolo", ""),
|
"titolo": meta.get("titolo", ""),
|
||||||
"text": text,
|
"text": text,
|
||||||
@@ -97,10 +98,11 @@ def retrieve(collection: chromadb.Collection, query: str, top_k: int) -> list[di
|
|||||||
def print_results(chunks: list[dict], full: bool = False) -> None:
|
def print_results(chunks: list[dict], full: bool = False) -> None:
|
||||||
print(f"── {len(chunks)} chunk recuperati ─────────────────────────────────\n")
|
print(f"── {len(chunks)} chunk recuperati ─────────────────────────────────\n")
|
||||||
for c in chunks:
|
for c in chunks:
|
||||||
|
src = f"[{c['source']}] " if c.get("source") else ""
|
||||||
loc = c["sezione"]
|
loc = c["sezione"]
|
||||||
if c["titolo"]:
|
if c["titolo"]:
|
||||||
loc += f" > {c['titolo']}"
|
loc += f" > {c['titolo']}"
|
||||||
print(f" [{c['rank']}] similarità: {c['similarity']:.4f} | {loc}")
|
print(f" [{c['rank']}] similarità: {c['similarity']:.4f} | {src}{loc}")
|
||||||
if full:
|
if full:
|
||||||
print()
|
print()
|
||||||
print(c["text"])
|
print(c["text"])
|
||||||
@@ -177,8 +179,11 @@ def main() -> int:
|
|||||||
)
|
)
|
||||||
parser.add_argument(
|
parser.add_argument(
|
||||||
"--stem",
|
"--stem",
|
||||||
required=True,
|
help="Collection di un singolo documento (retrocompatibile)",
|
||||||
help="Nome della collection ChromaDB da interrogare.",
|
)
|
||||||
|
parser.add_argument(
|
||||||
|
"--collection",
|
||||||
|
help="Collection multi-documento creata con: ingest.py --collection <nome> --stems ...",
|
||||||
)
|
)
|
||||||
parser.add_argument(
|
parser.add_argument(
|
||||||
"--top-k",
|
"--top-k",
|
||||||
@@ -189,8 +194,12 @@ def main() -> int:
|
|||||||
)
|
)
|
||||||
args = parser.parse_args()
|
args = parser.parse_args()
|
||||||
|
|
||||||
|
collection_name = args.collection or args.stem
|
||||||
|
if not collection_name:
|
||||||
|
parser.error("specifica --stem <nome> oppure --collection <nome>")
|
||||||
|
|
||||||
print("─── Retrieval puro ──────────────────────────────────────────\n")
|
print("─── Retrieval puro ──────────────────────────────────────────\n")
|
||||||
print(f" Documento : {args.stem}")
|
print(f" Collection : {collection_name}")
|
||||||
print(f" Embed model : {EMBED_MODEL}")
|
print(f" Embed model : {EMBED_MODEL}")
|
||||||
print(f" Top-K : {args.top_k}")
|
print(f" Top-K : {args.top_k}")
|
||||||
print()
|
print()
|
||||||
@@ -201,13 +210,16 @@ def main() -> int:
|
|||||||
|
|
||||||
client = chromadb.PersistentClient(path=str(CHROMA_DIR))
|
client = chromadb.PersistentClient(path=str(CHROMA_DIR))
|
||||||
collections = [c.name for c in client.list_collections()]
|
collections = [c.name for c in client.list_collections()]
|
||||||
if args.stem not in collections:
|
if collection_name not in collections:
|
||||||
print(f"❌ Collection '{args.stem}' non trovata in chroma_db/", file=sys.stderr)
|
print(f"❌ Collection '{collection_name}' non trovata in chroma_db/", file=sys.stderr)
|
||||||
print(f" → python ingestion/ingest.py --stem {args.stem}", file=sys.stderr)
|
if args.stem:
|
||||||
|
print(f" → python ingestion/ingest.py --stem {collection_name}", file=sys.stderr)
|
||||||
|
else:
|
||||||
|
print(f" → python ingestion/ingest.py --collection {collection_name} --stems doc1 doc2 ...", file=sys.stderr)
|
||||||
return 1
|
return 1
|
||||||
|
|
||||||
collection = client.get_collection(args.stem)
|
collection = client.get_collection(collection_name)
|
||||||
print(f"✅ Collection '{args.stem}' caricata ({collection.count()} chunk)\n")
|
print(f"✅ Collection '{collection_name}' caricata ({collection.count()} chunk)\n")
|
||||||
|
|
||||||
run_loop(collection, args.top_k)
|
run_loop(collection, args.top_k)
|
||||||
return 0
|
return 0
|
||||||
|
|||||||
Reference in New Issue
Block a user