seed-mcp/rag/index.py
justin ac40e05734
seed-mcp scaffold: clone docs-mcp-template, customize for crop_seed PRODUCT_NAME
Sibling project to crop-chem-docs, same MCP-template lineage. Corpus is
seed/hybrid varieties across 6 vendors instead of pesticide labels.

What's customized vs. the template:
- CLAUDE.md: vendor matrix, build priority, Pioneer fallback policy,
  canonical sidecar schema (per-crop), Golden Harvest disease-scale
  reversal gotcha, no-IPv6 / HTTPS-clone note
- README.md: vendor coverage table, tool list, phase status
- Dockerfile: PRODUCT_NAME=crop_seed default, sources.json (not
  bundles.json), HYBRID_SEARCH=true, OLLAMA_URL + RERANK_URL Docker
  DNS defaults (same llama-rerank sidecar as crop-chem-docs)
- .gitea/workflows/refresh.yml: monthly cron (seed catalogs move
  slowly), 5 GREEN scraper steps, corpus-YYYY.MM.DD tag for Drawbar
  pinning, continue-on-error on GC step
- .gitea/workflows/image-only.yml: paths filter + cancel-in-progress
  concurrency group
- scripts/registry_gc.py: lifted from crop-chem-docs (correct Gitea
  packages API URL + UA header to bypass CF block on default
  Python-urllib UA)
- sources.json: catalog of 6 sources + scope_filter + per-source
  schema notes + Pioneer-exclusion rationale
- scrape/runner.py: dispatcher with --all = GREEN-only
- scrape/sources/{bayer_seeds,golden_harvest,nk,agripro,becks_pfr,
  becks_products}.py: stub modules with implementation notes
- docs_mcp/server.py: PRODUCT_NAME default → crop_seed,
  PRODUCT_DOCS_URL → repo URL
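
The UA-header note for scripts/registry_gc.py can be illustrated with a minimal sketch. It assumes only the standard library; the UA string and the URL below are hypothetical placeholders, not the values the script actually uses.

```python
from __future__ import annotations

import urllib.request

# Hypothetical UA string (assumption); the value registry_gc.py sends is not shown here.
USER_AGENT = "seed-mcp-registry-gc/0.1"


def build_request(url: str) -> urllib.request.Request:
    """Build a request that carries an explicit User-Agent header.

    Without this, urllib sends "Python-urllib/<version>", which is the
    default UA the commit message says gets blocked.
    """
    return urllib.request.Request(url, headers={"User-Agent": USER_AGENT})


# Placeholder URL (assumption); no network call is made here.
req = build_request("https://gitea.example.invalid/placeholder")
print(req.get_header("User-agent"))
```

Note that urllib stores header names capitalized as "User-agent", so that is the key to use with get_header.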

Pioneer is intentionally NOT a source. ToS bans automation; dealer
locator is login-gated. The MCP returns a curated fallback lesson
directing the user to pioneer.com.
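
A rough sketch of the fallback behavior described above, assuming a plain substring match on the query. The function name, the matching rule, and the lesson wording are all hypothetical; the curated text belongs in lessons.md (Phase 11).

```python
from __future__ import annotations

# Hypothetical wording (assumption); the real curated lesson is not part of this commit view.
PIONEER_FALLBACK = (
    "Pioneer products are not indexed in this corpus. "
    "See pioneer.com for current product information."
)


def fallback_for(query: str) -> str | None:
    """Return the curated fallback when the query names Pioneer, else None."""
    if "pioneer" in query.lower():
        return PIONEER_FALLBACK
    return None
```

A substring check is the simplest possible trigger; a real implementation would likely need to handle product-number patterns and avoid false positives.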

Next phases:
- Phase 1: implement bayer_seeds (lift-and-shift from crop-chem-docs
  Bayer scraper; same __NEXT_DATA__ infra)
- Phase 7: curate eval/queries.jsonl
- Phase 11: lessons.md with Pioneer fallback + disease-scale notes

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-25 12:28:49 -04:00

135 lines
4.5 KiB
Python

"""Build Chroma (and optionally BM25) indexes from corpus on disk.
Reads `corpus/<bundle>/<page>.{md,json}`, chunks each page, upserts
into Chroma. With --rebuild, drops + recreates the collection (clean
state). With --bm25-only, skips Chroma and rebuilds only the FTS5
index — useful for fast iteration when chunking didn't change.
"""
from __future__ import annotations

import argparse
import json
import logging
import os
import time
from pathlib import Path
from typing import Iterator

import chromadb
from chromadb.config import Settings

from .chunk import chunks_from_page
from .embeddings import embedding_function

log = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(message)s")

ROOT = Path(__file__).resolve().parent.parent
CORPUS = ROOT / "corpus"
CHROMA_DIR = ROOT / "chroma"

# Collection name. Convention: <product>_docs. Override via env if needed.
PRODUCT_NAME = os.environ.get("PRODUCT_NAME", "myproduct")
COLLECTION = f"{PRODUCT_NAME}_docs"


def page_records() -> Iterator[dict]:
    """Walk corpus/, yield chunks for every page."""
    if not CORPUS.exists():
        log.error("corpus/ doesn't exist; run the scraper first")
        return
    for bundle_dir in sorted(CORPUS.iterdir()):
        # Skip stray files and hidden directories.
        if not bundle_dir.is_dir() or bundle_dir.name.startswith("."):
            continue
        for md_path in sorted(bundle_dir.glob("*.md")):
            page_id = md_path.stem
            sidecar = md_path.with_suffix(".json")
            if not sidecar.exists():
                log.warning("skipping %s: no JSON sidecar", md_path)
                continue
            # Read as UTF-8 explicitly so the result does not depend on locale.
            md = md_path.read_text(encoding="utf-8")
            meta = json.loads(sidecar.read_text(encoding="utf-8"))
            # Surface common filter fields at the chunk-metadata level
            # so Chroma's `where` filter can use them. `or ""` keeps every
            # value a string even when the sidecar holds null.
            base_meta = {
                "bundle_id": bundle_dir.name,
                "page_id": page_id,
                "title": meta.get("title") or "",
                "version": meta.get("version") or "",
                "platform": meta.get("platform") or "",
                "product": meta.get("product") or "",
            }
            yield from chunks_from_page(md, page_id, base_meta)


def upsert_to_chroma(records: list[dict]) -> int:
    """Drop and recreate the collection, then upsert records in batches."""
    client = chromadb.PersistentClient(
        path=str(CHROMA_DIR),
        settings=Settings(anonymized_telemetry=False),
    )
    # Drop + recreate for --rebuild semantics
    try:
        client.delete_collection(COLLECTION)
    except Exception:
        # Nothing to drop, e.g. on the first build.
        pass
    col = client.create_collection(COLLECTION, embedding_function=embedding_function())
    BATCH = 64
    total = 0
    for i in range(0, len(records), BATCH):
        chunk = records[i:i + BATCH]
        col.upsert(
            ids=[r["id"] for r in chunk],
            documents=[r["text"] for r in chunk],
            metadatas=[r["metadata"] for r in chunk],
        )
        total += len(chunk)
        log.info("upserted %d / %d chunks", total, len(records))
    return total


def main() -> int:
    p = argparse.ArgumentParser()
    p.add_argument("--rebuild", action="store_true",
                   help="Drop and recreate the Chroma collection.")
    p.add_argument("--bm25-only", action="store_true",
                   help="Rebuild only the BM25 index, skip Chroma.")
    p.add_argument("--bm25-db", type=Path,
                   default=ROOT / "bm25" / f"{PRODUCT_NAME}_docs.db",
                   help="Path to the BM25 sqlite db.")
    args = p.parse_args()

    log.info("reading corpus from %s", CORPUS)
    t0 = time.time()
    records = list(page_records())
    log.info("loaded %d chunks in %.1fs", len(records), time.time() - t0)

    if args.bm25_only:
        from .bm25 import BM25Index
        log.info("--bm25-only: building FTS5 only")
        BM25Index(args.bm25_db).build(records)
        return 0

    if not args.rebuild:
        log.info("no --rebuild; nothing to do. (Use --rebuild to upsert.)")
        return 0

    t_c = time.time()
    n = upsert_to_chroma(records)
    log.info("chroma: %d chunks in %.1fs", n, time.time() - t_c)

    # Build BM25 too; see PLAN.md Phase 8. Safe to remove this block
    # for products that don't need hybrid retrieval.
    try:
        from .bm25 import BM25Index
        t_b = time.time()
        BM25Index(args.bm25_db).build(records)
        log.info("bm25 done in %.1fs", time.time() - t_b)
    except ImportError:
        log.info("rag.bm25 not available; skipping BM25 build")
    return 0


if __name__ == "__main__":
    raise SystemExit(main())
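
The indexer above only relies on each record having "id", "text", and "metadata" keys, and on slicing the record list into batches of 64. The sketch below shows that record shape and the batching in isolation, without Chroma. `toy_chunks_from_page` is a hypothetical stand-in: the real `chunks_from_page` lives in rag/chunk.py and is not shown here, so the paragraph-packing rule, the `max_chars` parameter, the `page_id#n` ID scheme, and the `chunk_index` field are all assumptions.

```python
from __future__ import annotations

from typing import Iterator


def toy_chunks_from_page(md: str, page_id: str, base_meta: dict,
                         max_chars: int = 200) -> Iterator[dict]:
    """Pack blank-line-separated paragraphs into chunks of roughly max_chars.

    A single paragraph longer than max_chars is emitted as its own
    oversized chunk; this toy version never splits inside a paragraph.
    """
    buf = ""
    n = 0
    for para in (p.strip() for p in md.split("\n\n")):
        if not para:
            continue
        if buf and len(buf) + len(para) + 2 > max_chars:
            # Current buffer is full: emit it and start a new one.
            yield {"id": f"{page_id}#{n}", "text": buf,
                   "metadata": {**base_meta, "chunk_index": n}}
            n += 1
            buf = para
        else:
            buf = f"{buf}\n\n{para}" if buf else para
    if buf:
        yield {"id": f"{page_id}#{n}", "text": buf,
               "metadata": {**base_meta, "chunk_index": n}}


def batches(items: list, size: int = 64) -> Iterator[list]:
    """Yield consecutive slices of at most `size` items, as the upsert loop does."""
    for i in range(0, len(items), size):
        yield items[i:i + size]


page = "a" * 150 + "\n\n" + "b" * 150 + "\n\n" + "c" * 10
records = list(toy_chunks_from_page(page, "example-page", {"bundle_id": "demo"}))
batch_sizes = [len(b) for b in batches(list(range(130)), 64)]
```

IDs must be unique across the whole corpus for the upsert to keep every chunk, so a real ID scheme would probably also include the bundle name if page IDs can repeat between bundles.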