"""Build Chroma (and optionally BM25) indexes from the corpus on disk.

Reads ``corpus/<bundle_id>/<page_id>.{md,json}`` and chunks each page.

Modes:

* ``--rebuild``: drop and recreate the Chroma collection (clean state),
  upsert every chunk, then rebuild the BM25 index as well when the ``.bm25``
  module can be imported.
* ``--bm25-only``: skip Chroma and rebuild only the FTS5 index — useful for
  fast iteration when chunking didn't change. Takes precedence over
  ``--rebuild`` when both are given.
* neither flag: the corpus is loaded and the chunk count is logged, but
  nothing is written.
"""
from __future__ import annotations

import argparse
import json
import logging
import os
import time
from pathlib import Path
from typing import Iterator

import chromadb
from chromadb.config import Settings

from .chunk import chunks_from_page
from .embeddings import embedding_function

log = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(message)s")

ROOT = Path(__file__).resolve().parent.parent
CORPUS = ROOT / "corpus"
CHROMA_DIR = ROOT / "chroma"

# Collection name — convention: <product>_docs. Override the product via the
# PRODUCT_NAME environment variable if needed.
PRODUCT_NAME = os.environ.get("PRODUCT_NAME", "morpheus")
COLLECTION = f"{PRODUCT_NAME}_docs"


def page_records() -> Iterator[dict]:
    """Walk ``corpus/`` and yield the chunks of every page.

    Each non-hidden sub-directory of ``CORPUS`` is treated as a bundle. Every
    ``*.md`` file in a bundle is a page and must have a ``.json`` sidecar with
    the same stem; pages without one are skipped with a warning. Bundles and
    pages are visited in sorted order so the output order is deterministic.

    Yields:
        Whatever ``chunks_from_page`` produces for each page. Downstream code
        in this module reads the ``"id"``, ``"text"`` and ``"metadata"`` keys
        of each record.

    Raises:
        json.JSONDecodeError: if a sidecar is not valid JSON.
        UnicodeDecodeError: if a page or sidecar is not valid UTF-8.
    """
    if not CORPUS.is_dir():
        log.error("corpus/ doesn't exist; run the scraper first")
        return

    for bundle_dir in sorted(CORPUS.iterdir()):
        if not bundle_dir.is_dir() or bundle_dir.name.startswith("."):
            continue
        for md_path in sorted(bundle_dir.glob("*.md")):
            page_id = md_path.stem
            sidecar = md_path.with_suffix(".json")
            if not sidecar.exists():
                log.warning("skipping %s — no JSON sidecar", md_path)
                continue

            # Decode explicitly as UTF-8 so the result does not depend on the
            # platform's locale encoding.
            md = md_path.read_text(encoding="utf-8")
            meta = json.loads(sidecar.read_text(encoding="utf-8"))

            # Surface common filter fields at the chunk-metadata level
            # so Chroma's `where` filter can use them. Missing or null
            # values are normalized to empty strings.
            base_meta = {
                "bundle_id": bundle_dir.name,
                "page_id": page_id,
                "title": meta.get("title") or "",
                "version": meta.get("version") or "",
                "platform": meta.get("platform") or "",
                "product": meta.get("product") or "",
            }
            yield from chunks_from_page(md, page_id, base_meta)


def upsert_to_chroma(records: list[dict], batch_size: int = 64) -> int:
    """Recreate the Chroma collection and upsert ``records`` into it.

    The collection named ``COLLECTION`` is always dropped (best effort) and
    created afresh, so any previous contents are discarded.

    Args:
        records: Chunk records; each must provide ``"id"``, ``"text"`` and
            ``"metadata"``.
        batch_size: Number of records sent per ``upsert`` call.

    Returns:
        The number of records upserted.

    Raises:
        ValueError: if ``batch_size`` is smaller than 1.
    """
    if batch_size < 1:
        raise ValueError(f"batch_size must be >= 1, got {batch_size}")

    client = chromadb.PersistentClient(
        path=str(CHROMA_DIR),
        settings=Settings(anonymized_telemetry=False),
    )

    # Drop + recreate for --rebuild semantics. Dropping is best effort: the
    # usual failure is that the collection doesn't exist yet.
    # NOTE(review): the exception type raised for a missing collection appears
    # to differ between chromadb releases — confirm for the pinned version and
    # narrow this handler. Any other failure resurfaces in create_collection.
    try:
        client.delete_collection(COLLECTION)
    except Exception as exc:
        log.info("collection %s not dropped (%s); creating it", COLLECTION, exc)

    col = client.create_collection(
        COLLECTION,
        embedding_function=embedding_function(),
    )

    total = 0
    for start in range(0, len(records), batch_size):
        batch = records[start:start + batch_size]
        col.upsert(
            ids=[r["id"] for r in batch],
            documents=[r["text"] for r in batch],
            metadatas=[r["metadata"] for r in batch],
        )
        total += len(batch)
        log.info("upserted %d / %d chunks", total, len(records))
    return total


def main() -> int:
    """Command-line entry point.

    Returns:
        Process exit status: ``0`` on success (including the no-op run without
        flags), ``1`` when an index build was requested but the corpus
        produced no chunks.
    """
    p = argparse.ArgumentParser()
    p.add_argument("--rebuild", action="store_true",
                   help="Drop and recreate the Chroma collection.")
    p.add_argument("--bm25-only", action="store_true",
                   help="Rebuild only the BM25 index, skip Chroma.")
    p.add_argument("--bm25-db", type=Path,
                   default=ROOT / "bm25" / f"{PRODUCT_NAME}_docs.db",
                   help="Path to the BM25 sqlite db.")
    args = p.parse_args()

    log.info("reading corpus from %s", CORPUS)
    t0 = time.perf_counter()
    records = list(page_records())
    log.info("loaded %d chunks in %.1fs", len(records), time.perf_counter() - t0)

    if not (args.bm25_only or args.rebuild):
        log.info("no --rebuild; nothing to do. (Use --rebuild to upsert.)")
        return 0

    # Rebuilding from an empty corpus would wipe a working index and replace
    # it with nothing; refuse instead of destroying data.
    if not records:
        log.error("no chunks loaded from %s; refusing to rebuild indexes", CORPUS)
        return 1

    if args.bm25_only:
        from .bm25 import BM25Index
        log.info("--bm25-only: building FTS5 only")
        BM25Index(args.bm25_db).build(records)
        return 0

    t_c = time.perf_counter()
    n = upsert_to_chroma(records)
    log.info("chroma: %d chunks in %.1fs", n, time.perf_counter() - t_c)

    # Build BM25 too — see PLAN.md Phase 8. Safe to remove this block
    # for products that don't need hybrid retrieval.
    # Only the import is guarded, so an ImportError raised while building is
    # not mistaken for "module not available".
    try:
        from .bm25 import BM25Index
    except ImportError:
        log.info("rag.bm25 not available — skipping BM25 build")
    else:
        t_b = time.perf_counter()
        BM25Index(args.bm25_db).build(records)
        log.info("bm25 done in %.1fs", time.perf_counter() - t_b)
    return 0


if __name__ == "__main__":
    raise SystemExit(main())