"""Build Chroma (and BM25) indexes from the seed corpus on disk. Reads ``corpus//.json`` sidecars, chunks each variety via ``rag.chunk.chunks_from_variety``, upserts into Chroma. With ``--rebuild``, drops + recreates the collection (clean state). With ``--bm25-only``, skips Chroma and rebuilds only the FTS5 index — useful for fast iteration when the chunker didn't change. Collection name is ``_docs`` (default: ``crop_seed_docs``). Override via the PRODUCT_NAME env var. """ from __future__ import annotations import argparse import json import logging import os import time from pathlib import Path from typing import Iterator import chromadb from chromadb.config import Settings from .chunk import chunks_from_variety, chunks_from_trial from .embeddings import embedding_function log = logging.getLogger(__name__) logging.basicConfig(level=logging.INFO, format="%(asctime)s %(message)s") ROOT = Path(__file__).resolve().parent.parent CORPUS = ROOT / "corpus" CHROMA_DIR = ROOT / "chroma" PRODUCT_NAME = os.environ.get("PRODUCT_NAME", "crop_seed") COLLECTION = f"{PRODUCT_NAME}_docs" def variety_records() -> Iterator[dict]: """Walk ``corpus//.json``, yield one chunk per document. Dispatches by the sidecar's ``data_type`` field: - ``"trial"`` → chunks_from_trial (gh_plot_reports, agripro_trials) - anything else (or absent) → chunks_from_variety (default) The output shape (id/text/metadata) is identical for both — only the chunk text composition and metadata keys differ. Chroma + BM25 can index both into the same collection; downstream tools filter by the ``data_type`` metadata field. """ if not CORPUS.exists(): log.error("corpus/ doesn't exist; run a scraper first") return for source_dir in sorted(CORPUS.iterdir()): if not source_dir.is_dir() or source_dir.name.startswith("."): continue for sidecar_path in sorted(source_dir.glob("*.json")): try: head = json.loads(sidecar_path.read_text(encoding="utf-8")) except (OSError, json.JSONDecodeError) as exc: log.warning("skipping unreadable sidecar %s: %s", sidecar_path, exc) continue if head.get("data_type") == "trial": yield from chunks_from_trial(sidecar_path) else: yield from chunks_from_variety(sidecar_path) def upsert_to_chroma(records: list[dict]) -> int: client = chromadb.PersistentClient( path=str(CHROMA_DIR), settings=Settings(anonymized_telemetry=False), ) # Drop + recreate for --rebuild semantics. try: client.delete_collection(COLLECTION) except Exception: pass col = client.create_collection(COLLECTION, embedding_function=embedding_function()) BATCH = 64 total = 0 for i in range(0, len(records), BATCH): chunk = records[i:i + BATCH] col.upsert( ids=[r["id"] for r in chunk], documents=[r["text"] for r in chunk], metadatas=[r["metadata"] for r in chunk], ) total += len(chunk) log.info("upserted %d / %d chunks", total, len(records)) return total def main() -> int: p = argparse.ArgumentParser() p.add_argument("--rebuild", action="store_true", help="Drop and recreate the Chroma collection.") p.add_argument("--bm25-only", action="store_true", help="Rebuild only the BM25 index, skip Chroma.") p.add_argument("--bm25-db", type=Path, default=ROOT / "bm25" / f"{PRODUCT_NAME}_docs.db", help="Path to the BM25 sqlite db.") args = p.parse_args() log.info("reading corpus from %s", CORPUS) t0 = time.time() records = list(variety_records()) log.info("loaded %d chunks in %.1fs", len(records), time.time() - t0) if not records: log.error("no chunks — is corpus/ populated?") return 1 if args.bm25_only: from .bm25 import BM25Index log.info("--bm25-only: building FTS5 only") BM25Index(args.bm25_db).build(records) return 0 if not args.rebuild: log.info("no --rebuild; nothing to do. (Use --rebuild to upsert.)") return 0 t_c = time.time() n = upsert_to_chroma(records) log.info("chroma: %d chunks in %.1fs", n, time.time() - t_c) try: from .bm25 import BM25Index t_b = time.time() BM25Index(args.bm25_db).build(records) log.info("bm25 done in %.1fs", time.time() - t_b) except ImportError: log.info("rag.bm25 not available — skipping BM25 build") return 0 if __name__ == "__main__": raise SystemExit(main())