justin c737871c4c Trial-data scrapers: gh_plot_reports + agripro_trials + search_trials tool
This PR introduces TRIAL data — yield-performance results from real
field trials — as a SEPARATE data type alongside variety identity.
The two are complementary:

  search_docs  → "What's the disease resistance of DKC62-08RIB?"
                  (variety identity — what it IS)
  search_trials → "Which corn hybrid won the IA 2024 trials?"
                  (performance data — how it PERFORMED)

scrape/sources/gh_plot_reports.py — Golden Harvest plot reports
- 4,618 reports expected (2024+2025; 2023 deferred to a backfill pass).
- URL: /<crop>/plot-report/<state>/<year>/<plot_id>
- Cross-vendor: each plot lists products from multiple brands
  (NK / DEKALB / Golden Harvest / Enogen / Pioneer / Channel) side
  by side at one cooperator's field — the kind of independent
  comparison data Bayer doesn't publish itself.
- Generic per-column metrics dict (Yield/MST/Test Weight/$/Ac for
  corn+soy, Ton/Acre + Milk + Beef columns for silage).
- Politeness: 1 req/sec, retries on 429/5xx, no redirect-follow.
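
A minimal sketch of the politeness policy above, under stated
assumptions: `polite_get`, its parameters, the retry budget, and the
doubling backoff are hypothetical and not taken from
gh_plot_reports.py. The `fetch` callable stands in for an HTTP client
configured not to follow redirects, so a 3xx response is handed back
to the caller as-is.

```python
import time
from typing import Callable, Tuple

# Statuses worth retrying: 429 (rate limited) and any 5xx.
RETRYABLE = {429} | set(range(500, 600))


def polite_get(
    fetch: Callable[[str], Tuple[int, bytes]],
    url: str,
    min_interval: float = 1.0,  # 1 req/sec, as stated in the commit message
    max_retries: int = 3,       # assumption: the real retry budget is not documented
    sleep: Callable[[float], None] = time.sleep,
) -> Tuple[int, bytes]:
    for attempt in range(max_retries + 1):
        status, body = fetch(url)
        if status not in RETRYABLE or attempt == max_retries:
            # Success, a non-retryable status (including an unfollowed 3xx),
            # or the retry budget is spent: pause once, then return.
            sleep(min_interval)
            return status, body
        # Retryable failure: wait longer each time (assumed doubling backoff).
        sleep(min_interval * 2 ** attempt)
    raise ValueError("max_retries must be >= 0")
```

Injecting `fetch` and `sleep` keeps the policy testable without
network access.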

scrape/sources/agripro_trials.py — AgriPro regional trial PDFs
- 14 unique PDFs (38 sitemap links deduped) at /trials-data
- pdfplumber text extraction, region/year detection from filename
- Verbatim PDF text preserved in chunk body so variety + yield
  number adjacency drives retrieval (AP Iliad's Aberdeen ID yield
  matches a query about "AP Iliad Idaho yield")

rag/chunk.py — chunks_from_trial() dispatching by source
- Plot reports: identity preamble + Top-5 by primary metric + full
  ranking table. Metric labels chosen from the data (corn/soy use
  "Yield", silage uses "Ton/Acre").
- AgriPro PDFs: identity preamble + verbatim trial body inline so
  per-location yields surface for region+variety queries.
- Variety chunks get data_type="variety" metadata; trial chunks get
  data_type="trial". Single Chroma collection; the tool router
  filters by data_type rather than maintaining two collections.

rag/index.py — dispatch by sidecar's data_type field
rag/bm25.py — new filter columns (data_type, year, state)

docs_mcp/server.py — sixth MCP tool: search_trials(crop?, state?,
year?, product?, k=10)
- Filters trial chunks via where={"data_type": "trial", ...}
- Optional product substring post-filter for "DKC62-08RIB Iowa 2024"
  style searches
- search_docs now defaults to data_type="variety" so trial chunks
  don't bleed into variety identity queries
- Tool docstring routes the agent: "use lookup_variety to verify
  identity details on any trial winner you surface"
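
A sketch of the filtering described above, assuming Chroma-style
`where` dicts (several conditions wrapped in `$and`, the shape that
rag/bm25.py's `_where_to_sql` also accepts). `build_trial_where` and
`filter_by_product` are hypothetical names, not the functions in
docs_mcp/server.py; case-insensitive matching and the `text` key on
each hit are assumptions too.

```python
from typing import Any, Dict, List, Optional


def build_trial_where(
    crop: Optional[str] = None,
    state: Optional[str] = None,
    year: Optional[int] = None,
) -> Dict[str, Any]:
    # Every trial search is pinned to data_type="trial".
    conds: List[Dict[str, Any]] = [{"data_type": "trial"}]
    for key, value in (("crop", crop), ("state", state), ("year", year)):
        if value is not None:
            conds.append({key: value})
    # A single condition stays a plain dict; several are wrapped in $and.
    return conds[0] if len(conds) == 1 else {"$and": conds}


def filter_by_product(
    hits: List[Dict[str, Any]], product: Optional[str]
) -> List[Dict[str, Any]]:
    # Post-filter: keep only hits whose chunk text mentions the product.
    if not product:
        return hits
    needle = product.casefold()
    return [hit for hit in hits if needle in hit["text"].casefold()]


# Made-up sample hits, for illustration only.
hits = [
    {"id": "a", "text": "DKC62-08RIB (sample row)"},
    {"id": "b", "text": "OTHER-HYBRID (sample row)"},
]
print(build_trial_where(crop="corn", state="IA", year=2024))
print([hit["id"] for hit in filter_by_product(hits, "dkc62")])  # ['a']
```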

NK trial endpoint (/NKSeeds/wsProxy.asmx/GetPlotResult) is documented
as deferred — the ASMX-SOAP shape returned empty XML on initial
probe. Bayer per-variety yield data is not publicly indexed at all
— documented in the trial-scope note (DEKALB/Asgrow trial data flows
through Channel reps, not the web). AgRevival research books exist
as 10 large annual PDFs but are deferred (low ROI per parse).

Initial corpus shipped in this PR: 14 AgriPro trial PDFs. The 4,618
Golden Harvest plot reports are being scraped in the background and
will be added in a follow-up corpus-snapshot PR (~70 min ETA).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-25 15:19:03 -04:00


"""SQLite FTS5-backed BM25 retrieval over the same chunks Chroma indexes.

Hybrid retrieval (BM25 + dense + Reciprocal Rank Fusion) addresses a
single-tower dense embedding's weakness on rare technical tokens. For
seed-mcp those are variety codes ("DKC62-08RIB"), trait codes
("XF", "VT2PRIB"), disease abbreviations ("SCN", "SDS", "Goss's"),
and Rps gene names ("Rps1c", "Rps3a"). Dense embeddings don't bridge
queries like "Rps3a soybean" cleanly into the relevant chunk; BM25
matches them directly. Fused with the dense ranking via RRF, the
hybrid result should beat either retriever alone on the queries we
expect from the farm-advisor agent.
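
A minimal sketch of the fusion step, not the code this repo runs:
``rrf_fuse`` is a hypothetical name, and k=60 is the constant commonly
used with RRF rather than a value taken from this codebase. Each input
is a list of chunk ids, best first (for BM25, the ids from
``BM25Index.query`` in the order returned).

```python
from collections import defaultdict
from typing import Dict, List, Tuple


def rrf_fuse(rankings: List[List[str]], k: int = 60) -> List[Tuple[str, float]]:
    # Each ranking adds 1 / (k + rank) to a chunk's score; rank starts at 1.
    scores: Dict[str, float] = defaultdict(float)
    for ranking in rankings:
        for rank, chunk_id in enumerate(ranking, start=1):
            scores[chunk_id] += 1.0 / (k + rank)
    # Highest fused score first; ties broken by chunk id for determinism.
    return sorted(scores.items(), key=lambda kv: (-kv[1], kv[0]))


bm25_ids = ["c3", "c1", "c2"]   # made-up ids, best first
dense_ids = ["c1", "c4", "c3"]
fused = rrf_fuse([bm25_ids, dense_ids])
print([chunk_id for chunk_id, _ in fused])  # ['c1', 'c3', 'c4', 'c2']
```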

Why SQLite FTS5:

- In the stdlib. Zero new deps.
- On-disk. Same persistence model as Chroma: Docker COPY the dir;
  `rag.index --rebuild` regenerates it from the corpus.
- Built-in `bm25()` ranking function with no knobs we need to tune:
  FTS5 hard-codes k1=1.2 and b=0.75, which are fine for our use case.
- Builds <1k chunks in milliseconds; adds nothing to rebuild time.

Schema is two tables to keep filtering clean. FTS5 doesn't filter
nicely on its own columns, so the metadata lives in a separate table
joined to the FTS table by rowid. For seed-mcp the filterable columns
are seed-domain facets (source, vendor, brand, crop, source_key, plus
the trial facets data_type, year, state) rather than the
docs-template version/platform.
"""

from __future__ import annotations

import logging
import re
import sqlite3
from pathlib import Path
from typing import Any

log = logging.getLogger(__name__)

# Default location: bm25/<product>_docs.db at the repo root, next to chroma/.
ROOT = Path(__file__).resolve().parent.parent
DEFAULT_DB_DIR = ROOT / "bm25"
DEFAULT_DB_NAME = "crop_seed_docs.db"

# Columns we expose as filterable metadata. Mirrors what
# ``docs_mcp.server._build_where`` accepts so the same filter dict
# works for both Chroma and BM25 without per-retriever translation.
# data_type / year / state are the trial-facing facets; variety chunks
# get data_type="variety" and leave year and state empty.
FILTER_COLUMNS = (
    "source", "vendor", "brand", "crop", "source_key",
    "data_type", "year", "state", "ordinal",
)

# Allowlist tokenizer for free-text queries. FTS5's query parser chokes
# on punctuation we routinely see in farmer queries ("Goss's",
# "SCN-resistant", "0.05 MG", em-dashes). Rather than blocklist every
# operator, keep alphanumerics, underscores, and whitespace, and
# replace everything else with a space.
_KEEP_RE = re.compile(r"[^A-Za-z0-9_\s]")

# FTS5 reserves these Boolean operator KEYWORDS at the token level.
_BOOLEAN_KW_RE = re.compile(r"(?<!\w)(AND|OR|NOT|NEAR)(?!\w)")


def _sanitize_query(text: str) -> str:
    """Reduce a natural-language query to an FTS5 OR-of-tokens query.

    See ``crop-chem-docs`` for the rationale; same transformation
    applies here. OR semantics maximizes recall; BM25 already
    weights documents with more query-term matches higher.
    """
    cleaned = _KEEP_RE.sub(" ", text)
    cleaned = _BOOLEAN_KW_RE.sub(" ", cleaned)
    tokens = cleaned.split()
    if not tokens:
        return ""
    return " OR ".join(tokens)


def _where_to_sql(where: dict | None) -> tuple[str, list[Any]]:
    """Translate a Chroma-shaped filter dict into a SQL fragment + params.

    Accepts the same shapes ``docs_mcp.server._build_where`` produces:

        None                       → ("", [])
        {"crop": "corn"}           → ("AND m.crop = ?", ["corn"])
        {"$and": [{...}, {...}]}   → ("AND m.X = ? AND m.Y = ?", [...])

    Unknown keys are silently dropped (defensive: better to over-match
    than to crash on a filter we don't know).
    """
    if not where:
        return "", []
    parts: list[str] = []
    params: list[Any] = []

    def _emit_eq(cond: dict[str, Any]) -> None:
        for k, v in cond.items():
            if k in FILTER_COLUMNS:
                parts.append(f"m.{k} = ?")
                params.append(v)

    if "$and" in where:
        for sub in where["$and"]:
            _emit_eq(sub)
    else:
        _emit_eq(where)
    if not parts:
        return "", []
    return "AND " + " AND ".join(parts), params


class BM25Index:
    """Thin wrapper around an FTS5-backed sqlite db.

    Single-writer model. Reads are connection-per-call (sqlite handles
    concurrency through file locks; for our read-heavy workload that's
    fine and avoids cross-thread connection sharing issues with the MCP
    server's request handlers).
    """

    def __init__(self, db_path: Path | None = None):
        self.db_path = Path(db_path) if db_path else (DEFAULT_DB_DIR / DEFAULT_DB_NAME)

    # -- build ----------------------------------------------------------

    def build(self, records: list[dict]) -> int:
        """Rebuild the index from scratch from `records`.

        `records` is the same list ``rag.index.variety_records`` produces:
        ``[{"id": ..., "text": ..., "metadata": {...}}, ...]``. Bulk
        insert wrapped in a transaction.
        """
        self.db_path.parent.mkdir(parents=True, exist_ok=True)
        if self.db_path.exists():
            self.db_path.unlink()
        with sqlite3.connect(self.db_path) as con:
            con.executescript(self._schema_sql())
            con.executemany(
                "INSERT INTO chunks_meta "
                "(id, source, vendor, brand, crop, source_key, "
                " data_type, year, state, ordinal) "
                "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
                [
                    (
                        r["id"],
                        r["metadata"].get("source") or "",
                        r["metadata"].get("vendor") or "",
                        r["metadata"].get("brand") or "",
                        r["metadata"].get("crop") or "",
                        r["metadata"].get("source_key") or "",
                        r["metadata"].get("data_type") or "variety",
                        int(r["metadata"]["year"]) if isinstance(r["metadata"].get("year"), int) else None,
                        r["metadata"].get("state") or "",
                        int(r["metadata"].get("ordinal") or 0),
                    )
                    for r in records
                ],
            )
            # rowid i + 1 lines up with chunks_meta.rowid because the db was
            # just recreated and both tables are filled in the same order.
            con.executemany(
                "INSERT INTO chunks_fts (rowid, text) VALUES (?, ?)",
                [
                    (i + 1, r["text"])
                    for i, r in enumerate(records)
                ],
            )
            con.commit()
        log.info("bm25: indexed %d chunks → %s", len(records), self.db_path)
        return len(records)

    # -- query ----------------------------------------------------------

    def query(
        self,
        text: str,
        n: int = 200,
        where: dict | None = None,
    ) -> list[tuple[str, float]]:
        """Return up to `n` (chunk_id, bm25_score) pairs, lowest score first.

        FTS5's bm25() returns NEGATIVE numbers: more relevant docs have
        smaller (more negative) scores. We order ASC so the first row is
        the most relevant.
        """
        sanitized = _sanitize_query(text)
        if not sanitized:
            return []
        where_sql, params = _where_to_sql(where)
        sql = (
            "SELECT m.id, bm25(chunks_fts) AS score "
            "FROM chunks_fts "
            "JOIN chunks_meta m ON m.rowid = chunks_fts.rowid "
            f"WHERE chunks_fts MATCH ? {where_sql} "
            "ORDER BY bm25(chunks_fts) "
            "LIMIT ?"
        )
        try:
            with sqlite3.connect(self.db_path) as con:
                cur = con.execute(sql, [sanitized, *params, n])
                return [(row[0], float(row[1])) for row in cur.fetchall()]
        except sqlite3.OperationalError as e:
            log.warning("bm25 query failed (%s); query=%r", e, sanitized[:80])
            return []

    def exists(self) -> bool:
        return self.db_path.exists()

    def count(self) -> int:
        if not self.exists():
            return 0
        try:
            with sqlite3.connect(self.db_path) as con:
                return con.execute("SELECT COUNT(*) FROM chunks_meta").fetchone()[0]
        except sqlite3.OperationalError:
            return 0

    # -- schema ---------------------------------------------------------

    @staticmethod
    def _schema_sql() -> str:
        return """
        CREATE TABLE chunks_meta (
            rowid INTEGER PRIMARY KEY AUTOINCREMENT,
            id TEXT UNIQUE NOT NULL,
            source TEXT,
            vendor TEXT,
            brand TEXT,
            crop TEXT,
            source_key TEXT,
            data_type TEXT,
            year INTEGER,
            state TEXT,
            ordinal INTEGER
        );
        CREATE INDEX idx_meta_source ON chunks_meta(source);
        CREATE INDEX idx_meta_crop ON chunks_meta(crop);
        CREATE INDEX idx_meta_brand ON chunks_meta(brand);
        CREATE INDEX idx_meta_source_key ON chunks_meta(source_key);
        CREATE INDEX idx_meta_data_type ON chunks_meta(data_type);
        CREATE INDEX idx_meta_year ON chunks_meta(year);
        CREATE INDEX idx_meta_state ON chunks_meta(state);
        CREATE VIRTUAL TABLE chunks_fts USING fts5(
            text,
            tokenize = 'porter unicode61 remove_diacritics 2'
        );
        """