seed-mcp/docs_mcp/server.py
justin bd71f30ca7 Phase 6/7: wire rerank + eval harness — 100% pass on 21 golden queries
Phase 6 — Reranker integration
- New _rerank(query, [(cid, doc), ...]) helper in server.py calls
  llama.cpp's /v1/rerank endpoint, returns reranker-ordered ids
  or None on failure (graceful fallback — search never blocks
  on the sidecar).
- search_docs + search_trials both call _rerank() on the post-
  hybrid pool BEFORE truncating to k. The variety-code prefilter
  still pins exact matches on top.
- Per-doc truncation to 2000 chars to fit jina-reranker-v2-base's
  per-pair token budget. Full chunk text still returned to the
  caller — truncation is rerank-input-only.
- Telemetry adds `reranked: true|false` so usage logs distinguish
  reranked calls.
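
The ordering rule in the bullets above (rerank the post-hybrid pool, keep the input order when the reranker returns None, pin exact code matches, then truncate to k) can be sketched as a pure function. `order_results` and its parameter names are hypothetical and exist only for illustration; in server.py the same steps are written inline in search_docs.

```python
from typing import Optional


def order_results(
    pinned: list[str],
    fuzzy: list[str],
    reranked: Optional[list[str]],
    k: int,
) -> list[str]:
    """Hypothetical helper: final id order for one search call."""
    if reranked:
        # Reranker order first, then any pool ids the reranker did not return.
        reranked_set = set(reranked)
        fuzzy = reranked + [cid for cid in fuzzy if cid not in reranked_set]
    # If reranked is None (sidecar down or disabled), fuzzy keeps its input order.
    final: list[str] = []
    seen: set[str] = set()
    for cid in pinned + fuzzy:
        if cid not in seen:
            seen.add(cid)
            final.append(cid)
        if len(final) >= k:
            break
    return final
```

Because truncation happens last, a pinned exact match always survives, and a reranker outage only changes the order of the fuzzy tail.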

Phase 7 — Eval harness
- eval/queries.jsonl: 21 golden queries spanning:
    * variety-code lookups (DKC62-08RIB, AG29XF4, WB6430, E085Z5,
      AP Iliad)
    * semantic variety queries (drought-tolerant corn, SCN MG-3
      soy, Rps3a, XtendFlex, HRS stripe rust, SWW PNW, Goss's Wilt)
    * trial queries (IA/IN/MN regional, AP Iliad ID, NK1701 head-
      to-head, silage Ton/Acre, product=DKC65-95)
    * anti-hallucination (Pioneer P1142 fallback, DKC65-20 not-in-
      corpus expected_empty)
- eval/retrievers.py: 4 named retrievers — dense, bm25, hybrid
  (dense+bm25+RRF), hybrid+rerank — all sharing the same filter
  shape as docs_mcp/server.py._build_where.
- eval/run_eval.py: runs each retriever against each query,
  reports Recall / Precision@1 / MRR / avg latency. Markdown
  output in eval/results/baseline.md.
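
As a rough sketch of how one golden query might be judged: the `must_not_contain` and `expected_empty` names come from this commit message, while `expected_any`, the record layout, and the function name `query_passes` are assumptions made for illustration and may not match eval/queries.jsonl.

```python
def query_passes(golden: dict, returned_ids: list[str], returned_text: str) -> bool:
    """Hypothetical pass/fail rule for one golden query."""
    if golden.get("expected_empty"):
        # Not-in-corpus query: passing means the retriever returned nothing.
        return not returned_ids
    for banned in golden.get("must_not_contain", []):
        # Any banned string in the returned text fails the query outright.
        if banned.lower() in returned_text.lower():
            return False
    # Otherwise at least one expected id must appear in the results.
    expected = golden.get("expected_any", [])
    return any(cid in returned_ids for cid in expected)
```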

Baseline results (k=5, 21 queries):

  | Retriever       | Pass  | Recall | P@1   | MRR   | Avg ms |
  |-----------------|-------|--------|-------|-------|--------|
  | hybrid+rerank   | 21/21 | 100%   | 90%   | 0.905 | 2064   |
  | bm25            | 20/21 |  95%   | 81%   | 0.833 |    5   |
  | hybrid          | 15/21 |  71%   | 62%   | 0.619 |   73   |
  | dense           | 14/21 |  67%   | 38%   | 0.440 |   79   |
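
A minimal sketch of how the three quality columns could be computed. It assumes each query has a set of relevant ids and treats Recall as the share of queries with at least one relevant id in the top k; the Recall column above equals Pass/21 in every row, which is consistent with that reading, but the exact definition in eval/run_eval.py is not shown here. `score_run` is a hypothetical name.

```python
def score_run(runs: list[tuple[list[str], set[str]]], k: int = 5) -> dict[str, float]:
    """runs holds one (ranked_ids, relevant_ids) pair per query."""
    hits = p_at_1 = rr_sum = 0.0
    for ranked, relevant in runs:
        top = ranked[:k]
        # 1-based rank of the first relevant id, or None if none is in the top k.
        first = next((i + 1 for i, cid in enumerate(top) if cid in relevant), None)
        if first is not None:
            hits += 1
            rr_sum += 1.0 / first
            if first == 1:
                p_at_1 += 1
    n = len(runs) or 1
    return {"recall": hits / n, "p_at_1": p_at_1 / n, "mrr": rr_sum / n}
```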

Key findings:
1. hybrid+rerank wins on quality — 100% pass, 90% P@1.
2. BM25 alone is surprisingly competitive (95% pass) at 5 ms —
   excellent fallback when rerank is down. The variety-code
   prefilter in search_docs is doing a lot of work here.
3. Dense embedding alone is the WEAKEST configuration on this
   corpus — variety identity tokens (DKC62-08RIB, AP Iliad,
   Rps3a) have no semantic neighbors, so nomic-embed-text returns
   noise. The hybrid (no rerank) layer actively hurts because
   RRF dilutes the BM25 ranking with dense noise.
4. Anti-hallucination queries (Pioneer fallback, DKC65-20 not-
   in-corpus) pass on ALL retrievers including dense-only —
   the must_not_contain + expected_empty design holds.
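
Finding 3 can be reproduced on a toy example. The function below mirrors `_rrf_fuse` from server.py; the ids A, B, C, X, Y are made up. BM25 ranks the correct exact-code hit A first, dense retrieval misses A entirely, and after fusion A no longer leads because B is credited by both rankings.

```python
def rrf_fuse(rankings: list[list[str]], k: int = 60) -> list[str]:
    """Reciprocal Rank Fusion: score(id) = sum of 1 / (k + rank), rank 1-based."""
    scores: dict[str, float] = {}
    for ranking in rankings:
        for rank, doc_id in enumerate(ranking):
            scores[doc_id] = scores.get(doc_id, 0.0) + 1.0 / (k + rank + 1)
    return sorted(scores, key=lambda d: scores[d], reverse=True)


bm25 = ["A", "B", "C"]    # toy ids: A is the correct exact-code hit
dense = ["X", "B", "Y"]   # dense misses A and ranks noise first
fused = rrf_fuse([dense, bm25])
print(fused)
```

A appears in only one list, so it is outscored by B, which both lists rank second, and it merely ties with the dense noise X. This is the dilution the finding describes.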

Deploy decision: HYBRID_SEARCH=true + RERANK_URL set
(production env already has both — refresh.yml + image-only.yml
+ deploy/docker-compose.yml all configured).
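
For reference, a small sketch of how these two settings resolve, following the parsing in the feature-flag section of server.py. `read_flags` and the example URL are hypothetical; the real module reads `os.environ` at import time.

```python
def read_flags(env: dict[str, str]) -> dict[str, object]:
    """Hypothetical helper mirroring the RERANK_URL / HYBRID_SEARCH parsing."""
    # An empty or missing RERANK_URL disables reranking entirely.
    rerank_url = env.get("RERANK_URL", "").rstrip("/") or None
    # HYBRID_SEARCH defaults to on; only the listed spellings count as true.
    hybrid = env.get("HYBRID_SEARCH", "true").lower() in ("true", "1", "yes", "on")
    return {
        "rerank_url": rerank_url,
        "hybrid": hybrid,
        "rerank_enabled": rerank_url is not None,
    }
```

With an empty environment this yields hybrid search on and reranking off, so the hybrid+rerank configuration from the table needs RERANK_URL set explicitly.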

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-25 17:02:57 -04:00


"""seed-mcp — MCP server over public US row-crop seed catalogs.

Tools (all read-only):
    search_docs     natural-language retrieval over the corpus
    search_trials   natural-language retrieval over yield-trial data
    get_page        the full markdown body of one variety + sidecar
    list_versions   facet discovery (crops / brands / vendors / sources)
    lookup_variety  canonical sidecar JSON by source_key — fact-check
                    anything you surface from search_docs against this

The contract with the calling agent is **never fabricate**. Every chunk
we return is verbatim from the source's published catalog (the chunker
rebuilds chunks deterministically from the sidecar JSON). Every
response carries the variety's source URL so the agent can cite. The
``lookup_variety`` tool exists specifically so the agent can validate
specific rating values without having to trust paraphrased retrieval
text.
"""
from __future__ import annotations
import json
import logging
import os
import re
from pathlib import Path
from typing import Annotated, Any
from mcp.server.fastmcp import FastMCP
from pydantic import Field
from .usage import TimedCall
log = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Product-specific configuration.
# ---------------------------------------------------------------------------
PRODUCT_NAME = os.environ.get("PRODUCT_NAME", "crop_seed")
PRODUCT_DOCS_URL = os.environ.get("PRODUCT_DOCS_URL", "https://git.jpaul.io/justin/seed-mcp")
COLLECTION = f"{PRODUCT_NAME}_docs"
# Paths inside the deployed container (and matching layout locally for dev).
ROOT = Path(__file__).resolve().parent.parent
CORPUS = ROOT / "corpus"
CHROMA_DIR = ROOT / "chroma"
BM25_DB = Path(os.environ.get("BM25_DB", str(ROOT / "bm25" / f"{PRODUCT_NAME}_docs.db")))
# ---------------------------------------------------------------------------
# Feature flags.
# ---------------------------------------------------------------------------
RERANK_URL = os.environ.get("RERANK_URL", "").rstrip("/") or None
RERANK_POOL = int(os.environ.get("RERANK_POOL", "50"))
RERANK_TIMEOUT = float(os.environ.get("RERANK_TIMEOUT", "30"))
HYBRID_SEARCH = os.environ.get("HYBRID_SEARCH", "true").lower() in ("true", "1", "yes", "on")
RRF_K = int(os.environ.get("RRF_K", "60"))
# ---------------------------------------------------------------------------
# FastMCP setup.
# ---------------------------------------------------------------------------
mcp = FastMCP(f"{PRODUCT_NAME}-docs", stateless_http=True)
# ---------------------------------------------------------------------------
# Lazy singletons. Instantiate on first use so the server can start even
# when (e.g.) Ollama is briefly unreachable — degraded modes fall back
# gracefully rather than refusing to boot.
# ---------------------------------------------------------------------------
_chroma_client = None
_chroma_collection = None
_bm25 = None
_code_index: dict[str, list[tuple[str, str]]] | None = None
def _build_code_index() -> dict[str, list[tuple[str, str]]]:
    """Walk sidecars once and index varieties by every lookup key a
    farmer or LLM might paste: ``source_key``, ``hybrid_prefix``,
    ``product_name`` (normalized), each token from
    ``hybrid_prefix``/``product_name`` longer than three chars.

    Returns a dict mapping the normalized key → list of
    ``(source, source_key)`` tuples. Multiple-source matches are kept
    so we can warn the agent about ambiguity.
    """
    idx: dict[str, list[tuple[str, str]]] = {}

    def _add(key: str, value: tuple[str, str]) -> None:
        key = (key or "").lower().strip()
        if not key or len(key) < 4:
            return
        idx.setdefault(key, [])
        if value not in idx[key]:
            idx[key].append(value)

    if not CORPUS.exists():
        return idx
    for source_dir in CORPUS.iterdir():
        if not source_dir.is_dir() or source_dir.name.startswith("."):
            continue
        for sc in source_dir.glob("*.json"):
            try:
                d = json.loads(sc.read_text(encoding="utf-8"))
            except (OSError, json.JSONDecodeError):
                continue
            source = d.get("source") or source_dir.name
            sk = d.get("source_key") or sc.stem
            value = (source, sk)
            _add(sk, value)
            _add(d.get("hybrid_prefix") or "", value)
            _add(d.get("product_name") or "", value)
            # Token-split product_name and hybrid_prefix so "DKC62-08RIB"
            # in a query matches "DKC62-08RIB BRAND BLEND" via the
            # "DKC62-08RIB" token as well as the full string.
            for source_field in (d.get("product_name"), d.get("hybrid_prefix")):
                if not source_field:
                    continue
                for tok in re.split(r"[\s()/,]+", source_field):
                    tok = tok.strip()
                    # Only retain tokens that LOOK like codes — at least
                    # one digit, mostly alphanumeric/dash. Skips "BRAND"
                    # / "BLEND" / etc.
                    if re.match(r"^[A-Za-z]*\d[\w\-]*$", tok):
                        _add(tok, value)
    return idx
def _code_lookup() -> dict[str, list[tuple[str, str]]]:
    global _code_index
    if _code_index is None:
        _code_index = _build_code_index()
        log.info("code-index: %d lookup keys built", len(_code_index))
    return _code_index


# Candidate variety-code tokens: a leading letter followed by two or more
# alphanumeric / dash characters. Catches "DKC62-08RIB", "AG29XF4",
# "WB1376CLP", "VT2PRIB". The regex alone also matches ordinary words like
# "ratings"; _exact_code_matches() drops those by requiring at least one
# digit and a minimum length of four.
_CODE_TOKEN_RE = re.compile(r"\b([A-Za-z][A-Za-z0-9\-]{2,})\b")


def _exact_code_matches(query: str) -> list[tuple[str, str]]:
    """Find varieties whose source_key / product_name / token-split
    components contain a query token. Used as a high-confidence pin
    before fuzzy retrieval."""
    idx = _code_lookup()
    if not idx:
        return []
    out: list[tuple[str, str]] = []
    seen: set[tuple[str, str]] = set()
    for m in _CODE_TOKEN_RE.finditer(query or ""):
        tok = m.group(1).lower()
        if len(tok) < 4 or not any(c.isdigit() for c in tok):
            continue
        for v in idx.get(tok, []):
            if v not in seen:
                seen.add(v)
                out.append(v)
    return out
def _collection():
    """Return the Chroma collection, opening it lazily."""
    global _chroma_client, _chroma_collection
    if _chroma_collection is not None:
        return _chroma_collection
    import chromadb
    from chromadb.config import Settings
    from rag.embeddings import embedding_function

    _chroma_client = chromadb.PersistentClient(
        path=str(CHROMA_DIR),
        settings=Settings(anonymized_telemetry=False),
    )
    _chroma_collection = _chroma_client.get_collection(
        COLLECTION, embedding_function=embedding_function()
    )
    return _chroma_collection


def _bm25_index():
    """Return the BM25 index, or None if it doesn't exist on disk."""
    global _bm25
    if _bm25 is not None:
        return _bm25
    from rag.bm25 import BM25Index

    idx = BM25Index(BM25_DB)
    if not idx.exists():
        return None
    _bm25 = idx
    return _bm25
# ---------------------------------------------------------------------------
# Helpers — local file IO + filter builder.
# ---------------------------------------------------------------------------
def _build_where(
    crop: str | None,
    brand: str | None,
    vendor: str | None,
    source: str | None,
    source_key: str | None,
    *,
    data_type: str | None = None,
    state: str | None = None,
    year: int | None = None,
) -> dict | None:
    """Translate filter args into a Chroma `where` clause."""
    conds: list[dict] = []
    if data_type:
        conds.append({"data_type": data_type})
    if crop:
        conds.append({"crop": crop.lower()})
    if brand:
        conds.append({"brand": brand.upper()})
    if vendor:
        conds.append({"vendor": vendor})
    if source:
        conds.append({"source": source})
    if source_key:
        conds.append({"source_key": source_key})
    if state:
        conds.append({"state": state.upper() if len(state) <= 3 else state})
    if year:
        conds.append({"year": int(year)})
    if not conds:
        return None
    if len(conds) == 1:
        return conds[0]
    return {"$and": conds}
def _read_sidecar(source: str, source_key: str) -> dict | None:
    """Read a variety's sidecar JSON off disk."""
    path = CORPUS / source / f"{source_key}.json"
    if not path.exists():
        return None
    try:
        return json.loads(path.read_text(encoding="utf-8"))
    except (OSError, json.JSONDecodeError) as exc:
        log.warning("sidecar read failed for %s/%s: %s", source, source_key, exc)
        return None


def _read_markdown(source: str, source_key: str) -> str | None:
    """Read a variety's markdown body off disk, or None if unavailable."""
    path = CORPUS / source / f"{source_key}.md"
    if not path.exists():
        return None
    try:
        return path.read_text(encoding="utf-8")
    except OSError:
        return None
def _format_hit(doc: str, meta: dict, distance: float | None = None) -> str:
    """Render one retrieval hit as a fenced markdown block with full
    provenance attached. ``doc`` is the chunk text; ``meta`` is the
    chunk's metadata dict."""
    src_url = meta.get("source_url") or ""
    src_key = meta.get("source_key") or ""
    src = meta.get("source") or ""
    vendor = meta.get("vendor") or ""
    brand = meta.get("brand") or ""
    crop = meta.get("crop") or ""
    name = meta.get("product_name") or src_key
    header = (
        f"### {name} \n"
        f"`{src}::{src_key}` — {vendor} / {brand} / {crop} \n"
        f"<{src_url}>"
    )
    if distance is not None:
        header += f" \n_(distance={distance:.4f})_"
    return f"{header}\n\n{doc.strip()}\n"
def _rrf_fuse(rankings: list[list[str]], k: int = RRF_K) -> list[str]:
    """Reciprocal Rank Fusion — merge multiple ranked id lists into one.

    Score(id) = sum over each ranking R of 1 / (k + rank_R(id)).
    Robust to score-scale differences between dense (cosine) and lexical
    (BM25). k=60 is the literature default; not particularly sensitive.
    """
    scores: dict[str, float] = {}
    for ranking in rankings:
        # enumerate() is 0-based, so rank + 1 is the 1-based rank_R(id).
        for rank, doc_id in enumerate(ranking):
            scores[doc_id] = scores.get(doc_id, 0.0) + 1.0 / (k + rank + 1)
    return sorted(scores, key=lambda d: scores[d], reverse=True)
# Per-doc character cap when sending to the reranker. jina-reranker-v2-base
# accepts up to ~1024 tokens PER QUERY+DOC PAIR (n_ctx_train) and rejects
# the WHOLE BATCH if any one pair exceeds it. Truncating each doc to
# ~2000 chars (≈ 500-700 tokens) leaves headroom for the query + chat
# template overhead. The truncation is reranking-only — full chunk text
# still goes back to the LLM caller.
RERANK_DOC_MAX_CHARS = 2000


def _rerank(query: str, candidates: list[tuple[str, str]]) -> list[str] | None:
    """Call the llama.cpp /v1/rerank endpoint and return the candidate
    chunk ids in reranker-preferred order.

    Args:
        query: the user's natural-language query
        candidates: list of ``(chunk_id, chunk_text)`` to rerank.

    Returns:
        A list of chunk_ids ordered best-first by reranker score, OR
        ``None`` if reranking is disabled, the endpoint is unreachable,
        or any other error. The caller treats ``None`` as "fall back to
        the input ranking" — rerank failures must NEVER block a search.

    Anti-hallucination: rerank only reorders chunks the retrievers
    already surfaced. It cannot introduce content not in the corpus.
    """
    if not RERANK_URL or not candidates:
        return None
    try:
        import httpx
    except ImportError:
        return None
    # Truncate each doc to fit the per-pair token budget. jina-reranker
    # rejects the entire batch on any oversize doc.
    docs = [(text[:RERANK_DOC_MAX_CHARS] if text else "") for _cid, text in candidates]
    ids = [cid for cid, _ in candidates]
    try:
        with httpx.Client(timeout=RERANK_TIMEOUT) as c:
            r = c.post(
                f"{RERANK_URL}/v1/rerank",
                json={
                    "model": "rerank",  # llama.cpp ignores this; jina passes through
                    "query": query,
                    "documents": docs,
                },
            )
            r.raise_for_status()
            payload = r.json()
    except Exception as exc:  # noqa: BLE001
        log.warning("rerank request failed (%s) — falling back to input order", exc)
        return None
    results = payload.get("results") or []
    if not results:
        log.warning("rerank returned empty results — falling back to input order")
        return None
    # llama.cpp returns results as [{"index": int, "relevance_score": float}, ...]
    # Higher relevance_score = better; sort descending.
    try:
        ordered = sorted(results, key=lambda r: -r.get("relevance_score", float("-inf")))
        return [ids[r["index"]] for r in ordered if 0 <= r.get("index", -1) < len(ids)]
    except (KeyError, IndexError, TypeError) as exc:
        log.warning("rerank response malformed (%s) — falling back to input order", exc)
        return None
def _structured_ratings_block(sidecar: dict) -> str:
    """Render the sidecar's grouped characteristics + identity as a
    fact-checkable block, with the source URL pinned at top.

    This is the response body for ``get_page`` and the embedded
    "canonical data" block agents should trust over any free-text
    paraphrase."""
    lines: list[str] = []
    name = sidecar.get("product_name") or sidecar.get("source_key", "")
    lines.append(f"# {name}")
    lines.append("")
    facets: list[str] = []
    if sidecar.get("vendor"):
        facets.append(f"**Vendor:** {sidecar['vendor']}")
    if sidecar.get("brand"):
        facets.append(f"**Brand:** {str(sidecar['brand']).title()}")
    if sidecar.get("crop"):
        facets.append(f"**Crop:** {str(sidecar['crop']).capitalize()}")
    if sidecar.get("relative_maturity") and sidecar.get("crop") == "corn":
        facets.append(f"**Relative maturity:** {sidecar['relative_maturity']}")
    if sidecar.get("maturity_group") and sidecar.get("crop") == "soybeans":
        facets.append(f"**Maturity group:** {sidecar['maturity_group']}")
    if sidecar.get("relative_maturity") and sidecar.get("crop") == "wheat":
        facets.append(f"**Maturity:** {sidecar['relative_maturity']}")
    if sidecar.get("wheat_class"):
        facets.append(f"**Wheat class:** {sidecar['wheat_class']}")
    if sidecar.get("release_year"):
        facets.append(f"**Released:** {sidecar['release_year']}")
    if sidecar.get("trait_stack"):
        facets.append(f"**Trait stack:** {', '.join(sidecar['trait_stack'])}")
    if facets:
        lines.append(" · ".join(facets))
        lines.append("")
    urls = sidecar.get("source_urls") or []
    if urls:
        lines.append(f"**Source:** {urls[0]}")
        lines.append("")
    scale = sidecar.get("_scale_direction") or "scale direction not declared by source"
    lines.append(f"**Rating scale (as published):** {scale}")
    lines.append("")
    # Canonical ratings — one table per group, values verbatim.
    for g in sidecar.get("characteristics_groups") or []:
        label = g.get("label") or "Characteristics"
        items = g.get("items") or []
        if not items:
            continue
        lines.append(f"## {label.title()}")
        lines.append("")
        lines.append("| Characteristic | Value |")
        lines.append("|---|---|")
        for it in items:
            ch = (it.get("characteristic") or "").strip()
            v = (it.get("value") or "").strip()
            lines.append(f"| {ch} | {v} |")
        lines.append("")
    if sidecar.get("positioning_statement"):
        lines.append("## Vendor positioning")
        lines.append("")
        lines.append(sidecar["positioning_statement"].strip())
        lines.append("")
    if sidecar.get("strengths"):
        lines.append("## Strengths / management notes (vendor copy)")
        lines.append("")
        for s in sidecar["strengths"]:
            lines.append(f"- {s}")
        lines.append("")
    rec = sidecar.get("regional_recommendations") or []
    if rec:
        names = sorted({
            (r.get("product_list_name") or "").strip()
            for r in rec
            if (r.get("product_list_name") or "").strip()
        })
        if names:
            lines.append("## Listed in vendor regional seed guides")
            lines.append("")
            for n in names:
                lines.append(f"- {n}")
            lines.append("")
    return "\n".join(lines).rstrip() + "\n"
# ---------------------------------------------------------------------------
# Curated lessons — docs_mcp/lessons.md is the canonical source.
# ---------------------------------------------------------------------------
LESSONS_FILE = Path(__file__).resolve().parent / "lessons.md"
_lessons_cache: list[tuple[str, str]] | None = None


def _load_lessons() -> list[tuple[str, str]]:
    """Parse lessons.md into ``[(slug, body), ...]`` sections.

    Sections are delimited by ``## <slug>`` headings. The slug is the
    `<slug>` token (whitespace stripped); the body is everything between
    that heading and the next ``## `` heading (or EOF).
    """
    global _lessons_cache
    if _lessons_cache is not None:
        return _lessons_cache
    out: list[tuple[str, str]] = []
    if not LESSONS_FILE.exists():
        _lessons_cache = out
        return out
    text = LESSONS_FILE.read_text(encoding="utf-8")
    parts = re.split(r"(?m)^## (.+)$", text)
    # parts = [preamble, slug1, body1, slug2, body2, ...]
    for i in range(1, len(parts), 2):
        slug = parts[i].strip()
        body = parts[i + 1] if i + 1 < len(parts) else ""
        # Drop trailing horizontal rule that separates sections.
        body = re.sub(r"\n---\s*$", "", body).strip()
        out.append((slug, body))
    _lessons_cache = out
    return out
# ===========================================================================
# Tools
# ===========================================================================
@mcp.tool()
def search_docs(
    query: Annotated[str, Field(description=(
        "Natural-language query about row-crop seed varieties. "
        "Mention crop (corn/soybeans/wheat), maturity (RM days / "
        "MG number / wheat class), traits, disease tolerances, "
        "or soil/region constraints — they all carry retrieval signal."
    ))],
    crop: Annotated[
        str | None,
        Field(description="OPTIONAL filter: corn, soybeans, or wheat."),
    ] = None,
    brand: Annotated[
        str | None,
        Field(description=(
            "OPTIONAL brand filter — DEKALB, ASGROW, WESTBRED, "
            "GoldenHarvest, NK, AgriPro, Becks. Case-insensitive."
        )),
    ] = None,
    vendor: Annotated[
        str | None,
        Field(description="OPTIONAL vendor filter — Bayer, Syngenta, Beck's."),
    ] = None,
    source: Annotated[
        str | None,
        Field(description=(
            "OPTIONAL source filter — e.g. 'bayer_seeds'. Use "
            "list_versions() to see what's indexed."
        )),
    ] = None,
    k: Annotated[int, Field(description="Number of results to return.", ge=1, le=50)] = 10,
) -> str:
    """Search the seed-variety corpus for hybrids/varieties matching a query.

    Returns the top-k variety chunks with their full source URLs, ratings,
    maturity, traits, and regional listings. Optional filters narrow to one
    crop, brand, vendor, or scraper source. Use **list_versions()** first
    to discover valid facet values. Use **lookup_variety()** on any
    candidate the user is serious about — that returns the canonical
    sidecar so you can verify exact rating values without trusting the
    free-text chunk.

    Call this tool whenever an ag professional or farmer asks anything
    about choosing a seed variety, comparing hybrids, or finding seed
    matched to a maturity / soil / region / disease constraint.

    NEVER answer seed-selection questions from prior knowledge alone —
    seed catalogs change yearly and are brand-specific. Always search first.
    """
    with TimedCall("search_docs", {
        "query": query, "crop": crop, "brand": brand,
        "vendor": vendor, "source": source, "k": k,
    }) as _call:
        # Variety-search default: filter to data_type=variety so trial
        # documents (yield trials) don't pollute identity-focused
        # results. To search trials, use search_trials().
        where = _build_where(crop, brand, vendor, source, None,
                             data_type="variety")
        pool_size = max(k * 3, RERANK_POOL)
        # Exact-code pre-filter. Variety codes ("DKC62-08RIB", "AG29XF4")
        # have NO semantic neighbors — dense retrieval misses them and
        # RRF can let off-topic noise float to top-1. If the query
        # contains a token that exactly matches a variety's identifier,
        # pin those varieties to the top of results.
        pinned_ids: list[str] = []
        for src, sk in _exact_code_matches(query):
            pinned_ids.append(f"{src}::{sk}::0")
        # Dense retrieval via Chroma.
        try:
            col = _collection()
        except Exception as exc:  # noqa: BLE001
            _call.set(error_dense=str(exc), hits_returned=0)
            return (
                "_(retrieval unavailable — Chroma collection not found. "
                "Has the indexer run? `python -m rag.index --rebuild`.)_"
            )
        try:
            dense = col.query(
                query_texts=[query],
                n_results=pool_size,
                where=where,
            )
        except Exception as exc:  # noqa: BLE001
            _call.set(error_dense=str(exc), hits_returned=0)
            return f"_(retrieval failed: {exc})_"
        dense_ids: list[str] = (dense.get("ids") or [[]])[0]
        dense_docs: list[str] = (dense.get("documents") or [[]])[0]
        dense_metas: list[dict] = (dense.get("metadatas") or [[]])[0]
        dense_dists: list[float] = (dense.get("distances") or [[]])[0]
        id_to_doc = dict(zip(dense_ids, dense_docs))
        id_to_meta = dict(zip(dense_ids, dense_metas))
        id_to_dist = dict(zip(dense_ids, dense_dists))
        # Hybrid: optionally fuse with BM25. Both are pulled at pool_size,
        # fused via RRF, top-k returned.
        used_hybrid = False
        if HYBRID_SEARCH:
            bm25 = _bm25_index()
            if bm25 is not None:
                bm25_hits = bm25.query(query, n=pool_size, where=where)
                bm25_ids = [h[0] for h in bm25_hits]
                if bm25_ids:
                    fused = _rrf_fuse([dense_ids, bm25_ids])
                    fuzzy_ids = fused
                    used_hybrid = True
                else:
                    fuzzy_ids = dense_ids
            else:
                fuzzy_ids = dense_ids
        else:
            fuzzy_ids = dense_ids
        # Optional reranker pass over the fuzzy pool BEFORE truncating
        # to k. The cross-encoder is much more accurate at the query/
        # doc pairing than dense embedding alone, especially when the
        # query mentions specific ag terms that share-token-cosine
        # might miss. Skipped if RERANK_URL is unset or the call
        # fails — search is never blocked on the sidecar.
        used_rerank = False
        if RERANK_URL and fuzzy_ids:
            # Need docs to rerank — fetch any missing.
            need = [i for i in fuzzy_ids if i not in id_to_doc]
            if need:
                try:
                    extra = col.get(ids=need[:RERANK_POOL], include=["documents", "metadatas"])
                    for cid, doc, meta in zip(
                        extra.get("ids") or [],
                        extra.get("documents") or [],
                        extra.get("metadatas") or [],
                    ):
                        id_to_doc[cid] = doc
                        id_to_meta[cid] = meta
                except Exception as exc:  # noqa: BLE001
                    log.warning("pre-rerank get-by-id failed: %s", exc)
            pool = [(cid, id_to_doc.get(cid, "")) for cid in fuzzy_ids[:RERANK_POOL]]
            reranked = _rerank(query, pool)
            if reranked:
                # Build the membership set once rather than once per id.
                reranked_set = set(reranked)
                fuzzy_ids = reranked + [c for c in fuzzy_ids if c not in reranked_set]
                used_rerank = True
        # Pin exact-code matches at top, then fill remainder from fuzzy
        # retrieval (deduped). Pinned matches are deterministic and
        # high-confidence; they should never lose to a fuzzy match.
        final_ids: list[str] = []
        seen: set[str] = set()
        for cid in pinned_ids + fuzzy_ids:
            if cid in seen:
                continue
            seen.add(cid)
            final_ids.append(cid)
            if len(final_ids) >= k:
                break
        # For ids returned by BM25 but not by Chroma, we need their docs/
        # metadata too. Re-fetch by id.
        missing = [i for i in final_ids if i not in id_to_doc]
        if missing:
            try:
                extra = col.get(ids=missing, include=["documents", "metadatas"])
                for cid, doc, meta in zip(
                    extra.get("ids") or [],
                    extra.get("documents") or [],
                    extra.get("metadatas") or [],
                ):
                    id_to_doc[cid] = doc
                    id_to_meta[cid] = meta
            except Exception as exc:  # noqa: BLE001
                log.warning("get-by-id for BM25-only hits failed: %s", exc)
        _call.set(
            hits_returned=len(final_ids),
            hybrid=used_hybrid,
            reranked=used_rerank,
            pool_size=pool_size,
        )
        if not final_ids:
            return (
                "_(no varieties matched. Try broadening the query or "
                "removing filters — call list_versions() to see what's "
                "indexed.)_"
            )
        blocks: list[str] = []
        for cid in final_ids:
            doc = id_to_doc.get(cid, "")
            meta = id_to_meta.get(cid, {})
            dist = id_to_dist.get(cid) if not used_hybrid else None
            blocks.append(_format_hit(doc, meta, dist))
        header = (
            f"# Search results — {len(final_ids)} variety chunk"
            f"{'s' if len(final_ids) != 1 else ''}"
            f"{' (dense + BM25 hybrid)' if used_hybrid else ' (dense only)'}\n"
            f"_Use `lookup_variety(source_key=...)` on any candidate "
            f"to fact-check ratings from the canonical sidecar._\n\n---\n\n"
        )
        return header + "\n---\n\n".join(blocks)
@mcp.tool()
def get_page(
    source: Annotated[str, Field(description=(
        "Scraper source id — e.g. 'bayer_seeds'. Same as the `source` "
        "field in search_docs results."
    ))],
    source_key: Annotated[str, Field(description=(
        "Per-variety stable key — e.g. 'dekalb-dkc62-08rib'. Same as "
        "the `source_key` field in search_docs results."
    ))],
) -> str:
    """Return the full canonical record for one variety.

    Emits a structured ratings header (identity, all characteristic
    groups, vendor positioning, regional listings) sourced from the
    variety's sidecar JSON, followed by the raw markdown body the
    chunker indexed.

    Use this when the user is comparing varieties closely or wants to
    see every published rating value, not just the ones that matched a
    query. The structured header is the canonical fact-check surface;
    use it to validate any specific rating value before quoting it.
    """
    with TimedCall("get_page", {"source": source, "source_key": source_key}) as _call:
        sidecar = _read_sidecar(source, source_key)
        if sidecar is None:
            _call.set(found=False)
            return (
                f"_(variety not found: source='{source}' "
                f"source_key='{source_key}'. Use list_versions() to see "
                f"available sources or search_docs() to find candidates.)_"
            )
        body = _read_markdown(source, source_key) or ""
        structured = _structured_ratings_block(sidecar)
        _call.set(found=True, body_chars=len(body), groups=len(sidecar.get("characteristics_groups") or []))
        sep = "\n\n---\n\n## Indexed body (chunk source text)\n\n"
        return structured + sep + body
@mcp.tool()
def list_versions() -> str:
    """List the available scraper sources and the crop/brand/vendor
    facets across all indexed varieties.

    Use this BEFORE search_docs() the first time you query, to discover
    which scraper sources, brands, vendors, and crops are actually in
    the corpus right now. The agent should not assume — the corpus
    grows as more vendors are scraped.
    """
    with TimedCall("list_versions", {}) as _call:
        facets: dict[str, dict[str, int]] = {
            "source": {}, "vendor": {}, "brand": {}, "crop": {},
        }
        n_varieties = 0
        if CORPUS.exists():
            for source_dir in sorted(CORPUS.iterdir()):
                if not source_dir.is_dir() or source_dir.name.startswith("."):
                    continue
                for sc in source_dir.glob("*.json"):
                    try:
                        d = json.loads(sc.read_text(encoding="utf-8"))
                    except (OSError, json.JSONDecodeError):
                        continue
                    n_varieties += 1
                    for k in facets:
                        v = d.get(k)
                        if v:
                            facets[k][v] = facets[k].get(v, 0) + 1
        if n_varieties == 0:
            return (
                "_(no varieties indexed yet — run scrapers + indexer "
                "before calling search_docs)_"
            )
        _call.set(
            varieties=n_varieties,
            sources=len(facets["source"]),
            vendors=len(facets["vendor"]),
            brands=len(facets["brand"]),
            crops=len(facets["crop"]),
        )
        lines = [f"# Corpus facets ({n_varieties} varieties indexed)", ""]
        for label, counts in facets.items():
            if not counts:
                continue
            lines.append(f"## {label}")
            lines.append("")
            for k, n in sorted(counts.items(), key=lambda kv: -kv[1]):
                lines.append(f"- `{k}` — {n} varieties")
            lines.append("")
        return "\n".join(lines).rstrip()
@mcp.tool()
def lookup_variety(
    source_key: Annotated[str, Field(description=(
        "Per-variety stable key — e.g. 'dekalb-dkc62-08rib'."
    ))],
    source: Annotated[
        str | None,
        Field(description=(
            "OPTIONAL scraper source ('bayer_seeds' etc). If omitted, "
            "scans all sources for the source_key."
        )),
    ] = None,
) -> str:
    """Return the canonical sidecar JSON for one variety, verbatim.

    USE THIS to fact-check any rating value before quoting it to the
    farmer. The output is the exact data the scraper captured from the
    vendor's published catalog — no paraphrasing, no inference.

    Call this whenever the user (or you) needs an exact value for a
    specific rating, trait, or maturity — not just a semantic match
    from search_docs.
    """
    with TimedCall("lookup_variety", {"source_key": source_key, "source": source}) as _call:
        if source:
            sidecar = _read_sidecar(source, source_key)
            if sidecar is not None:
                _call.set(found=True, source=source)
                return "```json\n" + json.dumps(sidecar, indent=2, ensure_ascii=False) + "\n```"
            _call.set(found=False, source=source)
            return f"_(variety '{source_key}' not found under source '{source}')_"
        # No source given — scan all source dirs for a match.
        if not CORPUS.exists():
            _call.set(found=False)
            return "_(corpus is empty — no scrapers have run yet)_"
        matches: list[tuple[str, dict]] = []
        for source_dir in sorted(CORPUS.iterdir()):
            if not source_dir.is_dir() or source_dir.name.startswith("."):
                continue
            sidecar = _read_sidecar(source_dir.name, source_key)
            if sidecar is not None:
                matches.append((source_dir.name, sidecar))
        if not matches:
            _call.set(found=False)
            return (
                f"_(no variety with source_key '{source_key}' found in any "
                f"source. Use search_docs() to find the right key.)_"
            )
        _call.set(found=True, matches=len(matches))
        if len(matches) == 1:
            src_name, sidecar = matches[0]
            return (
                f"_(matched in source `{src_name}`)_\n\n"
                "```json\n"
                + json.dumps(sidecar, indent=2, ensure_ascii=False)
                + "\n```"
            )
        # Multi-match: serialize each labeled.
        out: list[str] = [f"_(matched {len(matches)} sources)_\n"]
        for src_name, sidecar in matches:
            out.append(f"### source: `{src_name}`")
            out.append("```json")
            out.append(json.dumps(sidecar, indent=2, ensure_ascii=False))
            out.append("```")
        return "\n".join(out)
@mcp.tool()
def search_trials(
    query: Annotated[str, Field(description=(
        "Natural-language query about yield trials. Mention crop, "
        "region or state, year, soil/conditions, and any specific "
        "variety codes you want compared. Examples: "
        "'best corn hybrid 2024 Iowa heavy clay'; "
        "'AP Iliad yield Idaho stripe rust'; "
        "'DKC65-20 vs NK1748 head to head Alabama 2023'."
    ))],
    crop: Annotated[
        str | None,
        Field(description="OPTIONAL: corn, soybeans, silage, or wheat."),
    ] = None,
    state: Annotated[
        str | None,
        Field(description=(
            "OPTIONAL state filter. 2-letter abbrev (IA, IL, NE...) "
            "for Golden Harvest plot reports; full or partial region "
            "name (e.g. 'Pacific Northwest', 'Montana') for AgriPro "
            "trial PDFs."
        )),
    ] = None,
    year: Annotated[
        int | None,
        Field(description="OPTIONAL year filter (e.g. 2024).", ge=2010, le=2030),
    ] = None,
    product: Annotated[
        str | None,
        Field(description=(
            "OPTIONAL variety/hybrid filter: case-insensitive substring "
            "match against the trial text. Example: 'DKC62' surfaces "
            "trials containing any DKC62-* hybrid."
        )),
    ] = None,
    k: Annotated[int, Field(description="Number of results to return.", ge=1, le=50)] = 10,
) -> str:
    """Search yield-trial data — head-to-head results from real field
    trials. SEPARATE from variety-identity search.

    Use this when the user wants to know HOW PRODUCTS PERFORMED, not
    what they ARE. Trial data complements `search_docs`:

      * `search_docs` answers: "What's the disease resistance profile
        of DKC62-08RIB?" (variety identity)
      * `search_trials` answers: "Which corn hybrid actually won the
        yield trials in central Iowa in 2024?" (performance data)

    Data sources:

      * **Golden Harvest plot reports** (4,000+ trials) — per-site
        head-to-head comparing products from MULTIPLE BRANDS at one
        cooperator's field. NK, DEKALB, Golden Harvest, sometimes
        others all compete at the same site. Cross-vendor data Bayer
        itself doesn't publish.
      * **AgriPro regional trial PDFs** (~14 PDFs) — multi-year
        multi-location wheat performance for Northern Plains / PNW /
        Plains regions.

    A typical workflow: call this to identify top performers in a
    region/year, then call `lookup_variety(source_key=...)` on the
    leaders to verify identity details (RM, traits, disease ratings).
    """
    with TimedCall("search_trials", {
        "query": query, "crop": crop, "state": state, "year": year,
        "product": product, "k": k,
    }) as _call:
        where = _build_where(
            crop, None, None, None, None,
            data_type="trial",
            state=state,
            year=year,
        )
        pool_size = max(k * 3, RERANK_POOL)

        try:
            col = _collection()
        except Exception as exc:  # noqa: BLE001
            _call.set(error_dense=str(exc), hits_returned=0)
            return (
                "_(retrieval unavailable — Chroma collection not found. "
                "Has the indexer run? `python -m rag.index --rebuild`.)_"
            )

        # If a product filter is set, augment the query with the
        # product code so BM25 + dense both have signal.
        full_query = query
        if product:
            full_query = f"{query} {product}"

        try:
            dense = col.query(
                query_texts=[full_query],
                n_results=pool_size,
                where=where,
            )
        except Exception as exc:  # noqa: BLE001
            _call.set(error_dense=str(exc), hits_returned=0)
            return f"_(trial retrieval failed: {exc})_"

        dense_ids: list[str] = (dense.get("ids") or [[]])[0]
        dense_docs: list[str] = (dense.get("documents") or [[]])[0]
        dense_metas: list[dict] = (dense.get("metadatas") or [[]])[0]
        dense_dists: list[float] = (dense.get("distances") or [[]])[0]
        id_to_doc = dict(zip(dense_ids, dense_docs))
        id_to_meta = dict(zip(dense_ids, dense_metas))
        id_to_dist = dict(zip(dense_ids, dense_dists))

        # Default to the dense ranking; switch to the RRF-fused ranking
        # only when hybrid search is enabled and BM25 returned hits.
        fuzzy_ids = dense_ids
        used_hybrid = False
        if HYBRID_SEARCH:
            bm25 = _bm25_index()
            if bm25 is not None:
                bm25_hits = bm25.query(full_query, n=pool_size, where=where)
                bm25_ids = [h[0] for h in bm25_hits]
                if bm25_ids:
                    fuzzy_ids = _rrf_fuse([dense_ids, bm25_ids])
                    used_hybrid = True

        # Optional reranker pass over the fuzzy pool. Same shape as
        # in search_docs. Skipped silently if RERANK_URL is unset or
        # the rerank call fails.
        used_rerank = False
        if RERANK_URL and fuzzy_ids:
            # BM25-only hits have no text yet; fetch it so the reranker
            # scores real documents instead of empty strings.
            need = [i for i in fuzzy_ids if i not in id_to_doc]
            if need:
                try:
                    extra = col.get(ids=need[:RERANK_POOL], include=["documents", "metadatas"])
                    for cid, doc, meta in zip(
                        extra.get("ids") or [],
                        extra.get("documents") or [],
                        extra.get("metadatas") or [],
                    ):
                        id_to_doc[cid] = doc
                        id_to_meta[cid] = meta
                except Exception as exc:  # noqa: BLE001
                    log.warning("pre-rerank get-by-id failed: %s", exc)
            pool = [(cid, id_to_doc.get(cid, "")) for cid in fuzzy_ids[:RERANK_POOL]]
            reranked = _rerank(full_query, pool)
            if reranked:
                # Reranked ids lead; ids outside the rerank pool keep
                # their fused order behind them. Build the set once
                # rather than once per element.
                reranked_set = set(reranked)
                fuzzy_ids = reranked + [c for c in fuzzy_ids if c not in reranked_set]
                used_rerank = True

        # Optional product-substring post-filter: if user supplied
        # ``product``, require the chunk to actually contain the
        # token. This re-checks the bytes since BM25 only sees stems.
        if product:
            needle = product.lower()

            def _has_product(cid: str) -> bool:
                doc = id_to_doc.get(cid, "")
                if needle in doc.lower():
                    return True
                # Not yet fetched: defer; the get-by-id below will fix.
                return cid not in id_to_doc

            fuzzy_ids = [cid for cid in fuzzy_ids if _has_product(cid)]

        # De-duplicate while preserving order, then truncate to k.
        final_ids: list[str] = []
        seen: set[str] = set()
        for cid in fuzzy_ids:
            if cid in seen:
                continue
            seen.add(cid)
            final_ids.append(cid)
            if len(final_ids) >= k:
                break

        missing = [i for i in final_ids if i not in id_to_doc]
        if missing:
            try:
                extra = col.get(ids=missing, include=["documents", "metadatas"])
                for cid, doc, meta in zip(
                    extra.get("ids") or [],
                    extra.get("documents") or [],
                    extra.get("metadatas") or [],
                ):
                    id_to_doc[cid] = doc
                    id_to_meta[cid] = meta
            except Exception as exc:  # noqa: BLE001
                log.warning("get-by-id for BM25-only hits failed: %s", exc)

        # Apply product filter once we have docs from the get-by-id pass.
        if product:
            needle = product.lower()
            final_ids = [cid for cid in final_ids if needle in id_to_doc.get(cid, "").lower()]

        _call.set(
            hits_returned=len(final_ids),
            hybrid=used_hybrid,
            reranked=used_rerank,
            pool_size=pool_size,
            data_type="trial",
        )

        if not final_ids:
            return (
                "_(no trials matched. Try widening — drop the state, "
                "year, or product filter. `list_versions()` shows "
                "which trial sources are indexed.)_"
            )

        blocks: list[str] = []
        for cid in final_ids:
            doc = id_to_doc.get(cid, "")
            meta = id_to_meta.get(cid, {})
            # Distances are only meaningful for the dense-only ordering.
            dist = id_to_dist.get(cid) if not used_hybrid else None
            blocks.append(_format_trial_hit(doc, meta, dist))

        header = (
            f"# Trial search results — {len(final_ids)} trial document"
            f"{'s' if len(final_ids) != 1 else ''}"
            f"{' (dense + BM25 hybrid)' if used_hybrid else ' (dense only)'}\n"
            f"_Use `get_page(source=..., source_key=...)` to read the "
            f"full trial body. Use `lookup_variety(source_key=...)` on "
            f"any product code to verify its identity (RM, traits, "
            f"disease ratings)._\n\n---\n\n"
        )
        return header + "\n---\n\n".join(blocks)

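
# ---------------------------------------------------------------------------
# Illustrative sketch (not part of the server's call path).
#
# search_trials() merges the dense and BM25 id lists with _rrf_fuse(), which
# is defined elsewhere in this module. As a rough guide to what a helper of
# that kind does, here is a minimal reciprocal rank fusion sketch.
# Assumptions: the name _sketch_rrf_fuse, the constant 60, and the tie-break
# rule are hypothetical and may differ from the real _rrf_fuse.
# ---------------------------------------------------------------------------
def _sketch_rrf_fuse(rankings: list[list[str]], rrf_k: int = 60) -> list[str]:
    """Fuse ranked id lists: score(id) = sum over lists of 1 / (rrf_k + rank)."""
    scores: dict[str, float] = {}
    for ranking in rankings:
        for rank, cid in enumerate(ranking, start=1):
            scores[cid] = scores.get(cid, 0.0) + 1.0 / (rrf_k + rank)
    # Highest fused score first. sorted() is stable, so ids with equal
    # scores keep the order in which they were first seen.
    return sorted(scores, key=lambda cid: scores[cid], reverse=True)


# Usage: an id ranked by both retrievers outranks ids seen by only one.
#   _sketch_rrf_fuse([["a", "b", "c"], ["b", "d"]])  ->  ["b", "a", "d", "c"]
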
def _format_trial_hit(doc: str, meta: dict, distance: float | None = None) -> str:
    """Trial-specific result header. Highlights crop/state/year and
    source URL (vs variety hits which emphasize brand + product
    identity)."""
    src_url = meta.get("source_url") or ""
    src_key = meta.get("source_key") or ""
    src = meta.get("source") or ""
    crop = meta.get("crop") or ""
    state = meta.get("state") or ""
    year = meta.get("year") or ""
    region = meta.get("region") or ""
    # Prefer the region label over the state when both are present.
    title_bits = [b for b in [crop.title(), region or state, str(year) if year else ""] if b]
    title = " · ".join(title_bits) if title_bits else src_key
    # Two trailing spaces before "\n" produce a Markdown hard line break.
    header = (
        f"### Trial: {title}  \n"
        f"`{src}::{src_key}` — {meta.get('vendor', '')} / {meta.get('brand', '')}  \n"
        f"<{src_url}>"
    )
    if distance is not None:
        header += f"  \n_(distance={distance:.4f})_"
    return f"{header}\n\n{doc.strip()}\n"

@mcp.tool()
def crop_seed_api_lessons(
    topic: Annotated[
        str | None,
        Field(description=(
            "OPTIONAL topic — match against lesson section slugs or body "
            "(substring, case-insensitive). Known slugs: pioneer, "
            "rating-scales, maturity-semantics, trait-glossary, "
            "scn-resistance, regional-listings, sources-not-yet-indexed, "
            "checking-your-work. Omit for the full curated index."
        )),
    ] = None,
) -> str:
    """Curated knowledge that does NOT live in the scraped corpus —
    vendor scale-direction notes, trait glossary, maturity semantics,
    SCN resistance interpretation, the **Pioneer fallback policy**,
    and rules for fact-checking your work.

    Call this tool when:

      * The user asks about **Pioneer** or any P-series hybrid — Pioneer
        is intentionally NOT scraped (ToS bans it); the lesson tells you
        what to say instead.
      * You need to compare ratings across vendors — different vendors
        publish on different scale directions.
      * You're parsing a trait code or disease abbreviation you don't
        recognize.
      * You're about to quote a specific rating value to a farmer — the
        ``checking-your-work`` lesson reminds you to call
        ``lookup_variety`` to confirm.

    This tool is **the only source of opinionated content** in the
    server. Everything else returned by search_docs / get_page /
    lookup_variety is verbatim from vendor catalogs.
    """
    with TimedCall("crop_seed_api_lessons", {"topic": topic}) as _call:
        sections = _load_lessons()
        if not sections:
            _call.set(sections_returned=0)
            return "_(no lessons file present — docs_mcp/lessons.md missing)_"

        if not topic:
            _call.set(sections_returned=len(sections))
            return "\n\n---\n\n".join(
                f"## {slug}\n\n{body}" for slug, body in sections
            )

        needle = topic.strip().lower()
        # Prefer slug matches (most specific). Fall back to body matches
        # only when no slug matches, which keeps a query like "rating"
        # from returning every section that happens to mention the word.
        slug_matches: list[tuple[str, str]] = []
        body_matches: list[tuple[str, str]] = []
        for slug, body in sections:
            if needle in slug.lower():
                slug_matches.append((slug, body))
            elif needle in body.lower():
                body_matches.append((slug, body))
        matched = slug_matches if slug_matches else body_matches

        _call.set(sections_returned=len(matched), topic=topic)
        if not matched:
            slugs = ", ".join(s for s, _ in sections)
            return (
                f"_(no lesson section matched topic '{topic}'. "
                f"Available slugs: {slugs}.)_"
            )
        return "\n\n---\n\n".join(
            f"## {slug}\n\n{body}" for slug, body in matched
        )

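
# ---------------------------------------------------------------------------
# Illustrative sketch (not part of the server's call path).
#
# crop_seed_api_lessons() expects _load_lessons() to return a list of
# (slug, body) pairs. That loader is defined elsewhere in this module.
# Assuming, purely for illustration, that lessons.md marks each section with
# a "## <slug>" heading, a loader of that shape could split the text as
# below. The function name and the heading convention are hypothetical.
# ---------------------------------------------------------------------------
def _sketch_split_lessons(text: str) -> list[tuple[str, str]]:
    """Split markdown text into (slug, body) pairs at '## ' headings."""
    sections: list[tuple[str, str]] = []
    slug = None  # slug of the section currently being collected
    body: list[str] = []
    for line in text.splitlines():
        if line.startswith("## "):
            # A new heading closes the previous section, if any.
            if slug is not None:
                sections.append((slug, "\n".join(body).strip()))
            slug, body = line[3:].strip(), []
        elif slug is not None:
            body.append(line)
        # Lines before the first heading are ignored.
    if slug is not None:
        sections.append((slug, "\n".join(body).strip()))
    return sections


# Usage: text before the first heading is dropped; each heading opens a section.
#   _sketch_split_lessons("intro\n## pioneer\nSay X.\n\n## rating-scales\nA\nB\n")
#   ->  [("pioneer", "Say X."), ("rating-scales", "A\nB")]
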
# ===========================================================================
# Entry point
# ===========================================================================

def main() -> None:
    import argparse

    p = argparse.ArgumentParser(description=f"{PRODUCT_NAME} docs MCP server")
    p.add_argument("--transport", choices=["stdio", "streamable-http", "sse"],
                   default=os.environ.get("MCP_TRANSPORT", "stdio"))
    p.add_argument("--host", default=os.environ.get("MCP_HOST", "0.0.0.0"))
    p.add_argument("--port", type=int, default=int(os.environ.get("MCP_PORT", "8000")))
    args = p.parse_args()

    if args.transport == "stdio":
        mcp.run()
    else:
        mcp.settings.host = args.host
        mcp.settings.port = args.port
        if os.environ.get("MCP_DISABLE_DNS_REBINDING_PROTECTION") in {"1", "true", "yes"}:
            mcp.settings.transport_security.enable_dns_rebinding_protection = False
        mcp.run(transport=args.transport)


if __name__ == "__main__":
    main()