dd691b0111
The chunker emits any single paragraph as a stand-alone chunk regardless of size. One HVM page had a 14,858-char paragraph (a big config table) — nomic-embed-text 400'd the entire embed batch because the model's context is 2048 tokens. Added a hard-split fallback that splits any oversized chunk on line boundaries to MAX_CHARS=6000 (~1500 tokens, headroom). Also defaulted PRODUCT_NAME to "hvm" in rag/index.py to match server.py. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
135 lines
4.5 KiB
Python
135 lines
4.5 KiB
Python
"""Build Chroma (and optionally BM25) indexes from corpus on disk.
|
|
|
|
Reads `corpus/<bundle>/<page>.{md,json}`, chunks each page, upserts
|
|
into Chroma. With --rebuild, drops + recreates the collection (clean
|
|
state). With --bm25-only, skips Chroma and rebuilds only the FTS5
|
|
index — useful for fast iteration when chunking didn't change.
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import json
|
|
import logging
|
|
import time
|
|
from pathlib import Path
|
|
from typing import Iterator
|
|
|
|
import chromadb
|
|
from chromadb.config import Settings
|
|
|
|
from .chunk import chunks_from_page
|
|
from .embeddings import embedding_function
|
|
|
|
# Module-level logger; basicConfig runs at import time and selects
# timestamped INFO-level output.
log = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(message)s")

# ROOT is the parent of the directory that contains this file; the corpus
# and the Chroma store both live directly under it.
ROOT = Path(__file__).resolve().parents[1]
CORPUS = ROOT / "corpus"
CHROMA_DIR = ROOT / "chroma"

# Collection naming convention is <product>_docs. The product defaults to
# "hvm" and can be overridden with the PRODUCT_NAME environment variable.
import os

PRODUCT_NAME = os.environ.get("PRODUCT_NAME", "hvm")
COLLECTION = PRODUCT_NAME + "_docs"
|
|
|
|
|
|
def page_records() -> Iterator[dict]:
    """Walk ``corpus/`` and yield the chunk records for every page.

    The expected layout is ``corpus/<bundle>/<page>.md`` with a JSON sidecar
    ``<page>.json`` next to each markdown file. Entries directly under
    ``corpus/`` that are not directories, or whose name starts with ``.``,
    are ignored. A markdown file without a sidecar is skipped with a
    warning. Bundles and pages are visited in sorted order.

    If ``corpus/`` does not exist, an error is logged and nothing is
    yielded.

    Yields:
        Whatever ``chunks_from_page`` produces for each page, given the
        markdown text, the page id (the file stem) and the base metadata.
    """
    if not CORPUS.exists():
        log.error("corpus/ doesn't exist; run the scraper first")
        return

    for bundle_dir in sorted(CORPUS.iterdir()):
        if not bundle_dir.is_dir() or bundle_dir.name.startswith("."):
            continue
        for md_path in sorted(bundle_dir.glob("*.md")):
            page_id = md_path.stem
            sidecar = md_path.with_suffix(".json")
            if not sidecar.exists():
                log.warning("skipping %s — no JSON sidecar", md_path)
                continue

            # Decode explicitly as UTF-8: read_text() otherwise uses the
            # locale's preferred encoding, which is not UTF-8 everywhere.
            # NOTE(review): assumes the scraper writes UTF-8 — confirm.
            md = md_path.read_text(encoding="utf-8")
            meta = json.loads(sidecar.read_text(encoding="utf-8"))

            # Surface common filter fields at the chunk-metadata level
            # so Chroma's `where` filter can use them. Missing or falsy
            # sidecar values are normalised to "".
            base_meta = {
                "bundle_id": bundle_dir.name,
                "page_id": page_id,
                "title": meta.get("title") or "",
                "version": meta.get("version") or "",
                "platform": meta.get("platform") or "",
                "product": meta.get("product") or "",
            }
            yield from chunks_from_page(md, page_id, base_meta)
|
|
|
|
|
|
def upsert_to_chroma(records: list[dict]) -> int:
    """Recreate the Chroma collection and load ``records`` into it.

    Any existing collection named ``COLLECTION`` is deleted before a new
    one is created, so every call starts from an empty collection (the
    ``--rebuild`` semantics). Records are then upserted in batches of 64,
    with a progress line logged after each batch.

    Args:
        records: Chunk records; each must provide the keys ``"id"``,
            ``"text"`` and ``"metadata"``.

    Returns:
        The number of records upserted.
    """
    client = chromadb.PersistentClient(
        path=str(CHROMA_DIR),
        settings=Settings(anonymized_telemetry=False),
    )

    # Drop + recreate for --rebuild semantics. The delete stays best-effort
    # (presumably it fails when the collection does not exist yet — confirm
    # against the installed Chroma version), but the reason is logged
    # instead of being discarded silently.
    try:
        client.delete_collection(COLLECTION)
    except Exception as exc:
        log.debug(
            "could not delete collection %s (continuing): %s", COLLECTION, exc
        )
    col = client.create_collection(
        COLLECTION, embedding_function=embedding_function()
    )

    batch_size = 64
    total = 0
    for start in range(0, len(records), batch_size):
        batch = records[start:start + batch_size]
        col.upsert(
            ids=[r["id"] for r in batch],
            documents=[r["text"] for r in batch],
            metadatas=[r["metadata"] for r in batch],
        )
        total += len(batch)
        log.info("upserted %d / %d chunks", total, len(records))
    return total
|
|
|
|
|
|
def main() -> int:
    """Command-line entry point: build the indexes from the corpus on disk.

    Flags:
        --rebuild: drop and recreate the Chroma collection, then build the
            BM25 index as well (when ``rag.bm25`` can be imported).
        --bm25-only: rebuild only the BM25 index and skip Chroma. Takes
            precedence over ``--rebuild`` when both are given.
        --bm25-db: path to the BM25 sqlite database.

    With neither ``--rebuild`` nor ``--bm25-only`` the corpus is still read
    and the chunk count logged, but nothing is written.

    Returns:
        0 on every path that completes without raising.
    """
    p = argparse.ArgumentParser()
    p.add_argument("--rebuild", action="store_true",
                   help="Drop and recreate the Chroma collection.")
    p.add_argument("--bm25-only", action="store_true",
                   help="Rebuild only the BM25 index, skip Chroma.")
    p.add_argument("--bm25-db", type=Path,
                   default=ROOT / "bm25" / f"{PRODUCT_NAME}_docs.db",
                   help="Path to the BM25 sqlite db.")
    args = p.parse_args()

    log.info("reading corpus from %s", CORPUS)
    t0 = time.time()
    records = list(page_records())
    log.info("loaded %d chunks in %.1fs", len(records), time.time() - t0)

    if args.bm25_only:
        from .bm25 import BM25Index
        log.info("--bm25-only: building FTS5 only")
        BM25Index(args.bm25_db).build(records)
        return 0

    if not args.rebuild:
        log.info("no --rebuild; nothing to do. (Use --rebuild to upsert.)")
        return 0

    t_c = time.time()
    n = upsert_to_chroma(records)
    log.info("chroma: %d chunks in %.1fs", n, time.time() - t_c)

    # Build BM25 too — see PLAN.md Phase 8. Safe to remove this block
    # for products that don't need hybrid retrieval.
    # Only the import is guarded: an ImportError raised while building the
    # index is a real failure and must not be reported as "not available".
    try:
        from .bm25 import BM25Index
    except ImportError:
        log.info("rag.bm25 not available — skipping BM25 build")
    else:
        t_b = time.time()
        BM25Index(args.bm25_db).build(records)
        log.info("bm25 done in %.1fs", time.time() - t_b)
    return 0
|
|
|
|
|
|
if __name__ == "__main__":
    # Script entry: hand main()'s return value to the interpreter as the
    # process exit status.
    exit_status = main()
    raise SystemExit(exit_status)
|