c737871c4c
This PR introduces TRIAL data — yield-performance results from real
field trials — as a SEPARATE data type alongside variety identity.
The two are complementary:
search_docs → "What's the disease resistance of DKC62-08RIB?"
(variety identity — what it IS)
search_trials → "Which corn hybrid won the IA 2024 trials?"
(performance data — how it PERFORMED)
scrape/sources/gh_plot_reports.py — Golden Harvest plot reports
- 4,618 expected (2024+2025; 2023 deferred to a backfill pass).
- URL: /<crop>/plot-report/<state>/<year>/<plot_id>
- Cross-vendor: each plot lists products from multiple brands
(NK / DEKALB / Golden Harvest / Enogen / Pioneer / Channel) side
by side at one cooperator's field — the kind of independent
comparison data Bayer doesn't publish itself.
- Generic per-column metrics dict (Yield/MST/Test Weight/$/Ac for
corn+soy, Ton/Acre + Milk + Beef columns for silage).
- Politeness: 1 req/sec, retries on 429/5xx, no redirect-follow.
scrape/sources/agripro_trials.py — AgriPro regional trial PDFs
- 14 unique PDFs (38 sitemap links deduped) at /trials-data
- pdfplumber text extraction, region/year detection from filename
- Verbatim PDF text preserved in chunk body so variety + yield
number adjacency drives retrieval (AP Iliad's Aberdeen ID yield
matches a query about "AP Iliad Idaho yield")
rag/chunk.py — chunks_from_trial() dispatching by source
- Plot reports: identity preamble + Top-5 by primary metric + full
ranking table. Metric labels chosen from the data (corn/soy use
"Yield", silage uses "Ton/Acre").
- AgriPro PDFs: identity preamble + verbatim trial body inline so
per-location yields surface for region+variety queries.
- Variety chunks get data_type="variety" metadata; trial chunks get
data_type="trial". Single Chroma collection; the tool router
filters by data_type rather than maintaining two collections.
rag/index.py — dispatch by sidecar's data_type field
rag/bm25.py — new filter columns (data_type, year, state)
docs_mcp/server.py — sixth MCP tool: search_trials(crop?, state?,
year?, product?, k=10)
- Filters trial chunks via where={"data_type": "trial", ...}
- Optional product substring post-filter for "DKC62-08RIB Iowa 2024"
style searches
- search_docs now defaults to data_type="variety" so trial chunks
don't bleed into variety identity queries
- Tool docstring routes the agent: "use lookup_variety to verify
identity details on any trial winner you surface"
NK trial endpoint (/NKSeeds/wsProxy.asmx/GetPlotResult) is documented
as deferred — the ASMX-SOAP shape returned empty XML on initial
probe. Bayer per-variety yield data is not publicly indexed at all
— documented in the trial-scope note (DEKALB/Asgrow trial data flows
through Channel reps, not the web). AgRevival research books exist
as 10 large annual PDFs but are deferred (low ROI per parse).
Initial corpus shipped in this PR: 14 AgriPro trial PDFs. The 4,618
Golden Harvest plot reports are scraping in background and will be
added in a follow-up corpus-snapshot PR (~70 min ETA).
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
141 lines
4.7 KiB
Python
141 lines
4.7 KiB
Python
"""Build Chroma (and BM25) indexes from the seed corpus on disk.
|
|
|
|
Reads ``corpus/<source>/<source_key>.json`` sidecars, chunks each
|
|
variety via ``rag.chunk.chunks_from_variety``, upserts into Chroma.
|
|
With ``--rebuild``, drops + recreates the collection (clean state).
|
|
With ``--bm25-only``, skips Chroma and rebuilds only the FTS5 index
|
|
— useful for fast iteration when the chunker didn't change.
|
|
|
|
Collection name is ``<PRODUCT_NAME>_docs`` (default: ``crop_seed_docs``).
|
|
Override via the PRODUCT_NAME env var.
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import json
|
|
import logging
|
|
import os
|
|
import time
|
|
from pathlib import Path
|
|
from typing import Iterator
|
|
|
|
import chromadb
|
|
from chromadb.config import Settings
|
|
|
|
from .chunk import chunks_from_variety, chunks_from_trial
|
|
from .embeddings import embedding_function
|
|
|
|
log = logging.getLogger(__name__)
|
|
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(message)s")
|
|
|
|
ROOT = Path(__file__).resolve().parent.parent
|
|
CORPUS = ROOT / "corpus"
|
|
CHROMA_DIR = ROOT / "chroma"
|
|
|
|
PRODUCT_NAME = os.environ.get("PRODUCT_NAME", "crop_seed")
|
|
COLLECTION = f"{PRODUCT_NAME}_docs"
|
|
|
|
|
|
def variety_records() -> Iterator[dict]:
|
|
"""Walk ``corpus/<source>/<source_key>.json``, yield one chunk per
|
|
document.
|
|
|
|
Dispatches by the sidecar's ``data_type`` field:
|
|
- ``"trial"`` → chunks_from_trial (gh_plot_reports, agripro_trials)
|
|
- anything else (or absent) → chunks_from_variety (default)
|
|
|
|
The output shape (id/text/metadata) is identical for both — only
|
|
the chunk text composition and metadata keys differ. Chroma + BM25
|
|
can index both into the same collection; downstream tools filter
|
|
by the ``data_type`` metadata field.
|
|
"""
|
|
if not CORPUS.exists():
|
|
log.error("corpus/ doesn't exist; run a scraper first")
|
|
return
|
|
for source_dir in sorted(CORPUS.iterdir()):
|
|
if not source_dir.is_dir() or source_dir.name.startswith("."):
|
|
continue
|
|
for sidecar_path in sorted(source_dir.glob("*.json")):
|
|
try:
|
|
head = json.loads(sidecar_path.read_text(encoding="utf-8"))
|
|
except (OSError, json.JSONDecodeError) as exc:
|
|
log.warning("skipping unreadable sidecar %s: %s", sidecar_path, exc)
|
|
continue
|
|
if head.get("data_type") == "trial":
|
|
yield from chunks_from_trial(sidecar_path)
|
|
else:
|
|
yield from chunks_from_variety(sidecar_path)
|
|
|
|
|
|
def upsert_to_chroma(records: list[dict]) -> int:
|
|
client = chromadb.PersistentClient(
|
|
path=str(CHROMA_DIR),
|
|
settings=Settings(anonymized_telemetry=False),
|
|
)
|
|
# Drop + recreate for --rebuild semantics.
|
|
try:
|
|
client.delete_collection(COLLECTION)
|
|
except Exception:
|
|
pass
|
|
col = client.create_collection(COLLECTION, embedding_function=embedding_function())
|
|
|
|
BATCH = 64
|
|
total = 0
|
|
for i in range(0, len(records), BATCH):
|
|
chunk = records[i:i + BATCH]
|
|
col.upsert(
|
|
ids=[r["id"] for r in chunk],
|
|
documents=[r["text"] for r in chunk],
|
|
metadatas=[r["metadata"] for r in chunk],
|
|
)
|
|
total += len(chunk)
|
|
log.info("upserted %d / %d chunks", total, len(records))
|
|
return total
|
|
|
|
|
|
def main() -> int:
|
|
p = argparse.ArgumentParser()
|
|
p.add_argument("--rebuild", action="store_true",
|
|
help="Drop and recreate the Chroma collection.")
|
|
p.add_argument("--bm25-only", action="store_true",
|
|
help="Rebuild only the BM25 index, skip Chroma.")
|
|
p.add_argument("--bm25-db", type=Path,
|
|
default=ROOT / "bm25" / f"{PRODUCT_NAME}_docs.db",
|
|
help="Path to the BM25 sqlite db.")
|
|
args = p.parse_args()
|
|
|
|
log.info("reading corpus from %s", CORPUS)
|
|
t0 = time.time()
|
|
records = list(variety_records())
|
|
log.info("loaded %d chunks in %.1fs", len(records), time.time() - t0)
|
|
if not records:
|
|
log.error("no chunks — is corpus/ populated?")
|
|
return 1
|
|
|
|
if args.bm25_only:
|
|
from .bm25 import BM25Index
|
|
log.info("--bm25-only: building FTS5 only")
|
|
BM25Index(args.bm25_db).build(records)
|
|
return 0
|
|
|
|
if not args.rebuild:
|
|
log.info("no --rebuild; nothing to do. (Use --rebuild to upsert.)")
|
|
return 0
|
|
|
|
t_c = time.time()
|
|
n = upsert_to_chroma(records)
|
|
log.info("chroma: %d chunks in %.1fs", n, time.time() - t_c)
|
|
|
|
try:
|
|
from .bm25 import BM25Index
|
|
t_b = time.time()
|
|
BM25Index(args.bm25_db).build(records)
|
|
log.info("bm25 done in %.1fs", time.time() - t_b)
|
|
except ImportError:
|
|
log.info("rag.bm25 not available — skipping BM25 build")
|
|
return 0
|
|
|
|
|
|
if __name__ == "__main__":
|
|
raise SystemExit(main())
|