bd71f30ca7
Phase 6 — Reranker integration
- New _rerank(query, [(cid, doc), ...]) helper in server.py calls
llama.cpp's /v1/rerank endpoint, returns reranker-ordered ids
or None on failure (graceful fallback — search never blocks
on the sidecar).
- search_docs + search_trials both call _rerank() on the post-
hybrid pool BEFORE truncating to k. The variety-code prefilter
still pins exact matches on top.
- Per-doc truncation to 2000 chars to fit jina-reranker-v2-base's
per-pair token budget. Full chunk text still returned to the
caller — truncation is rerank-input-only.
- Telemetry adds `reranked: true|false` so usage logs distinguish
reranked calls.
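
Illustrative sketch of the Phase 6 contract (not the code in server.py): truncate rerank inputs, order ids by score, and return None so the caller keeps its input ranking. The helper names `truncate_for_rerank` and `order_ids` are hypothetical; the response shape (a list of objects carrying `index` and `relevance_score`) is the one server.py expects from /v1/rerank.

```python
from typing import Optional

MAX_CHARS = 2000  # rerank-input cap described in the bullet above


def truncate_for_rerank(docs: list[str], max_chars: int = MAX_CHARS) -> list[str]:
    """Cap each doc for the reranker only; callers keep the full text."""
    return [(d or "")[:max_chars] for d in docs]


def order_ids(ids: list[str], results: list[dict]) -> Optional[list[str]]:
    """Map reranker results back to chunk ids, best score first.

    Returns None on an empty or malformed response so the caller can
    fall back to its original ordering.
    """
    if not results:
        return None
    try:
        ranked = sorted(results, key=lambda r: r["relevance_score"], reverse=True)
        return [ids[r["index"]] for r in ranked if 0 <= r["index"] < len(ids)]
    except (KeyError, TypeError):
        return None
```

A caller would treat a None return as "use the hybrid order unchanged", which is what keeps search from blocking on the sidecar.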
Phase 7 — Eval harness
- eval/queries.jsonl: 21 golden queries spanning:
* variety-code lookups (DKC62-08RIB, AG29XF4, WB6430, E085Z5,
AP Iliad)
* semantic variety queries (drought-tolerant corn, SCN MG-3
soy, Rps3a, XtendFlex, HRS stripe rust, SWW PNW, Goss's Wilt)
* trial queries (IA/IN/MN regional, AP Iliad ID, NK1701 head-
to-head, silage Ton/Acre, product=DKC65-95)
* anti-hallucination (Pioneer P1142 fallback, DKC65-20 not-in-
corpus expected_empty)
- eval/retrievers.py: 4 named retrievers — dense, bm25, hybrid
(dense+bm25+RRF), hybrid+rerank — all sharing the same filter
shape as docs_mcp/server.py._build_where.
- eval/run_eval.py: runs each retriever against each query,
reports Recall / Precision@1 / MRR / avg latency. Markdown
output in eval/results/baseline.md.
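
Illustrative sketch of how the three quality metrics are conventionally computed. This is an assumption about the definitions, not the code in eval/run_eval.py: "recall" is taken here as hit-rate within the top k, and the names `reciprocal_rank` and `summarize` are hypothetical.

```python
def reciprocal_rank(ranked: list[str], relevant: set[str]) -> float:
    """1/position of the first relevant id, or 0.0 if none is present."""
    for pos, doc_id in enumerate(ranked, start=1):
        if doc_id in relevant:
            return 1.0 / pos
    return 0.0


def summarize(runs: list[tuple[list[str], set[str]]], k: int = 5) -> dict[str, float]:
    """Aggregate hit-rate@k, precision@1 and MRR over (ranked, relevant) pairs."""
    n = len(runs)
    hits = sum(1 for ranked, rel in runs if any(d in rel for d in ranked[:k]))
    top1 = sum(1 for ranked, rel in runs if ranked[:1] and ranked[0] in rel)
    mrr = sum(reciprocal_rank(ranked[:k], rel) for ranked, rel in runs)
    return {"recall": hits / n, "p_at_1": top1 / n, "mrr": mrr / n}
```

Each run pairs one retriever's ranked ids for a query with that query's set of acceptable ids; the ids themselves are whatever the golden file declares.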
Baseline results (k=5, 21 queries):
| Retriever | Pass | Recall | P@1 | MRR | Avg ms |
|-----------------|-------|--------|-------|-------|--------|
| hybrid+rerank | 21/21 | 100% | 90% | 0.905 | 2064 |
| bm25 | 20/21 | 95% | 81% | 0.833 | 5 |
| hybrid | 15/21 | 71% | 62% | 0.619 | 73 |
| dense | 14/21 | 67% | 38% | 0.440 | 79 |
Key findings:
1. hybrid+rerank wins on quality — 100% pass, 90% P@1.
2. BM25 alone is surprisingly competitive (95% pass) at 5 ms —
excellent fallback when rerank is down. The variety-code
prefilter in search_docs is doing a lot of work here.
3. Dense embedding alone is the WEAKEST configuration on this
corpus — variety identity tokens (DKC62-08RIB, AP Iliad,
Rps3a) have no semantic neighbors, so nomic-embed-text returns
noise. The hybrid (no rerank) layer actively hurts because
RRF dilutes the BM25 ranking with dense noise.
4. Anti-hallucination queries (Pioneer fallback, DKC65-20 not-
in-corpus) pass on ALL retrievers including dense-only —
the must_not_contain + expected_empty design holds.
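
Illustrative sketch of finding 3, using the same RRF scoring as `_rrf_fuse` in server.py (k=60, 1-based rank). The ids below are invented for the example: BM25 puts the exact-code chunk first, dense never returns it, and two unrelated chunks that appear in both lists outrank it after fusion.

```python
def rrf_fuse(rankings: list[list[str]], k: int = 60) -> list[str]:
    """Reciprocal Rank Fusion: score(id) = sum of 1 / (k + rank), rank from 1."""
    scores: dict[str, float] = {}
    for ranking in rankings:
        for rank, doc_id in enumerate(ranking):
            scores[doc_id] = scores.get(doc_id, 0.0) + 1.0 / (k + rank + 1)
    return sorted(scores, key=lambda d: scores[d], reverse=True)


# Hypothetical rankings: "target" is the exact variety-code match.
bm25_ranking = ["target", "n1", "n2"]
dense_ranking = ["n1", "n2", "n3"]

fused = rrf_fuse([bm25_ranking, dense_ranking])
print(fused)  # "target" falls from rank 1 (BM25) to rank 3 after fusion
```

Any id that appears in both lists collects two reciprocal-rank terms, so a BM25-only top hit can lose to mediocre chunks that dense also happened to return.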
Deploy decision: HYBRID_SEARCH=true + RERANK_URL set
(production env already has both — refresh.yml + image-only.yml
+ deploy/docker-compose.yml all configured).
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1209 lines
47 KiB
Python
"""seed-mcp — MCP server over public US row-crop seed catalogs.

Tools (all read-only):

    search_docs     natural-language retrieval over the corpus
    get_page        the full markdown body of one variety + sidecar
    list_versions   facet discovery (crops / brands / vendors / sources)
    lookup_variety  canonical sidecar JSON by source_key — fact-check
                    anything you surface from search_docs against this

The contract with the calling agent is **never fabricate**. Every chunk
we return is verbatim from the source's published catalog (the chunker
rebuilds chunks deterministically from the sidecar JSON). Every
response carries the variety's source URL so the agent can cite. The
``lookup_variety`` tool exists specifically so the agent can validate
specific rating values without having to trust paraphrased retrieval
text.
"""
from __future__ import annotations

import json
import logging
import os
import re
from pathlib import Path
from typing import Annotated, Any

from mcp.server.fastmcp import FastMCP
from pydantic import Field

from .usage import TimedCall

log = logging.getLogger(__name__)

# ---------------------------------------------------------------------------
# Product-specific configuration.
# ---------------------------------------------------------------------------
PRODUCT_NAME = os.environ.get("PRODUCT_NAME", "crop_seed")
PRODUCT_DOCS_URL = os.environ.get("PRODUCT_DOCS_URL", "https://git.jpaul.io/justin/seed-mcp")
COLLECTION = f"{PRODUCT_NAME}_docs"

# Paths inside the deployed container (and matching layout locally for dev).
ROOT = Path(__file__).resolve().parent.parent
CORPUS = ROOT / "corpus"
CHROMA_DIR = ROOT / "chroma"
BM25_DB = Path(os.environ.get("BM25_DB", str(ROOT / "bm25" / f"{PRODUCT_NAME}_docs.db")))

# ---------------------------------------------------------------------------
# Feature flags.
# ---------------------------------------------------------------------------
RERANK_URL = os.environ.get("RERANK_URL", "").rstrip("/") or None
RERANK_POOL = int(os.environ.get("RERANK_POOL", "50"))
RERANK_TIMEOUT = float(os.environ.get("RERANK_TIMEOUT", "30"))

HYBRID_SEARCH = os.environ.get("HYBRID_SEARCH", "true").lower() in ("true", "1", "yes", "on")
RRF_K = int(os.environ.get("RRF_K", "60"))


# ---------------------------------------------------------------------------
# FastMCP setup.
# ---------------------------------------------------------------------------
mcp = FastMCP(f"{PRODUCT_NAME}-docs", stateless_http=True)


# ---------------------------------------------------------------------------
# Lazy singletons. Instantiate on first use so the server can start even
# when (e.g.) Ollama is briefly unreachable — degraded modes fall back
# gracefully rather than refusing to boot.
# ---------------------------------------------------------------------------
_chroma_client = None
_chroma_collection = None
_bm25 = None
_code_index: dict[str, list[tuple[str, str]]] | None = None


def _build_code_index() -> dict[str, list[tuple[str, str]]]:
    """Walk sidecars once and index varieties by every lookup key a
    farmer or LLM might paste: ``source_key``, ``hybrid_prefix``,
    ``product_name`` (normalized), each token from
    ``hybrid_prefix``/``product_name`` longer than three chars.

    Returns a dict mapping the normalized key → list of
    ``(source, source_key)`` tuples. Multiple-source matches are kept
    so we can warn the agent about ambiguity.
    """
    idx: dict[str, list[tuple[str, str]]] = {}

    def _add(key: str, value: tuple[str, str]) -> None:
        key = (key or "").lower().strip()
        if not key or len(key) < 4:
            return
        idx.setdefault(key, [])
        if value not in idx[key]:
            idx[key].append(value)

    if not CORPUS.exists():
        return idx
    for source_dir in CORPUS.iterdir():
        if not source_dir.is_dir() or source_dir.name.startswith("."):
            continue
        for sc in source_dir.glob("*.json"):
            try:
                d = json.loads(sc.read_text(encoding="utf-8"))
            except (OSError, json.JSONDecodeError):
                continue
            source = d.get("source") or source_dir.name
            sk = d.get("source_key") or sc.stem
            value = (source, sk)
            _add(sk, value)
            _add(d.get("hybrid_prefix") or "", value)
            _add(d.get("product_name") or "", value)
            # Token-split product_name and hybrid_prefix so "DKC62-08RIB"
            # in a query matches "DKC62-08RIB BRAND BLEND" via the
            # "DKC62-08RIB" token as well as the full string.
            for source_field in (d.get("product_name"), d.get("hybrid_prefix")):
                if not source_field:
                    continue
                for tok in re.split(r"[\s()/,]+", source_field):
                    tok = tok.strip()
                    # Only retain tokens that LOOK like codes — at least
                    # one digit, mostly alphanumeric/dash. Skips "BRAND"
                    # / "BLEND" / etc.
                    if re.match(r"^[A-Za-z]*\d[\w\-]*$", tok):
                        _add(tok, value)
    return idx


def _code_lookup() -> dict[str, list[tuple[str, str]]]:
    global _code_index
    if _code_index is None:
        _code_index = _build_code_index()
        log.info("code-index: %d lookup keys built", len(_code_index))
    return _code_index


# Candidate variety-code tokens: a leading letter followed by two or more
# alphanumeric / dash characters. The regex alone also matches ordinary
# words like "ratings"; _exact_code_matches() then keeps only tokens of
# four or more characters that contain a digit, e.g. "DKC62-08RIB",
# "AG29XF4", "WB1376CLP", "VT2PRIB".
_CODE_TOKEN_RE = re.compile(r"\b([A-Za-z][A-Za-z0-9\-]{2,})\b")


def _exact_code_matches(query: str) -> list[tuple[str, str]]:
    """Find varieties whose source_key / product_name / token-split
    components contain a query token. Used as a high-confidence pin
    before fuzzy retrieval."""
    idx = _code_lookup()
    if not idx:
        return []
    out: list[tuple[str, str]] = []
    seen: set[tuple[str, str]] = set()
    for m in _CODE_TOKEN_RE.finditer(query or ""):
        tok = m.group(1).lower()
        if len(tok) < 4 or not any(c.isdigit() for c in tok):
            continue
        for v in idx.get(tok, []):
            if v not in seen:
                seen.add(v)
                out.append(v)
    return out


def _collection():
    """Return the Chroma collection, opening it lazily."""
    global _chroma_client, _chroma_collection
    if _chroma_collection is not None:
        return _chroma_collection
    import chromadb
    from chromadb.config import Settings
    from rag.embeddings import embedding_function

    _chroma_client = chromadb.PersistentClient(
        path=str(CHROMA_DIR),
        settings=Settings(anonymized_telemetry=False),
    )
    _chroma_collection = _chroma_client.get_collection(
        COLLECTION, embedding_function=embedding_function()
    )
    return _chroma_collection


def _bm25_index():
    """Return the BM25 index, or None if it doesn't exist on disk."""
    global _bm25
    if _bm25 is not None:
        return _bm25
    from rag.bm25 import BM25Index
    idx = BM25Index(BM25_DB)
    if not idx.exists():
        return None
    _bm25 = idx
    return _bm25


# ---------------------------------------------------------------------------
# Helpers — local file IO + filter builder.
# ---------------------------------------------------------------------------

def _build_where(
    crop: str | None,
    brand: str | None,
    vendor: str | None,
    source: str | None,
    source_key: str | None,
    *,
    data_type: str | None = None,
    state: str | None = None,
    year: int | None = None,
) -> dict | None:
    """Translate filter args into a Chroma `where` clause."""
    conds: list[dict] = []
    if data_type:
        conds.append({"data_type": data_type})
    if crop:
        conds.append({"crop": crop.lower()})
    if brand:
        conds.append({"brand": brand.upper()})
    if vendor:
        conds.append({"vendor": vendor})
    if source:
        conds.append({"source": source})
    if source_key:
        conds.append({"source_key": source_key})
    if state:
        conds.append({"state": state.upper() if len(state) <= 3 else state})
    if year:
        conds.append({"year": int(year)})
    if not conds:
        return None
    if len(conds) == 1:
        return conds[0]
    return {"$and": conds}


def _read_sidecar(source: str, source_key: str) -> dict | None:
    """Read a variety's sidecar JSON off disk."""
    path = CORPUS / source / f"{source_key}.json"
    if not path.exists():
        return None
    try:
        return json.loads(path.read_text(encoding="utf-8"))
    except (OSError, json.JSONDecodeError) as exc:
        log.warning("sidecar read failed for %s/%s: %s", source, source_key, exc)
        return None


def _read_markdown(source: str, source_key: str) -> str | None:
    path = CORPUS / source / f"{source_key}.md"
    if not path.exists():
        return None
    try:
        return path.read_text(encoding="utf-8")
    except OSError:
        return None


def _format_hit(doc: str, meta: dict, distance: float | None = None) -> str:
    """Render one retrieval hit as a fenced markdown block with full
    provenance attached. ``doc`` is the chunk text; ``meta`` is the
    chunk's metadata dict."""
    src_url = meta.get("source_url") or ""
    src_key = meta.get("source_key") or ""
    src = meta.get("source") or ""
    vendor = meta.get("vendor") or ""
    brand = meta.get("brand") or ""
    crop = meta.get("crop") or ""
    name = meta.get("product_name") or src_key

    header = (
        f"### {name} \n"
        f"`{src}::{src_key}` — {vendor} / {brand} / {crop} \n"
        f"<{src_url}>"
    )
    if distance is not None:
        header += f" \n_(distance={distance:.4f})_"
    return f"{header}\n\n{doc.strip()}\n"


def _rrf_fuse(rankings: list[list[str]], k: int = RRF_K) -> list[str]:
    """Reciprocal Rank Fusion — merge multiple ranked id lists into one.

    Score(id) = sum over each ranking R of 1 / (k + rank_R(id)).
    Robust to score-scale differences between dense (cosine) and lexical
    (BM25). k=60 is the literature default; not particularly sensitive.
    """
    scores: dict[str, float] = {}
    for ranking in rankings:
        for rank, doc_id in enumerate(ranking):
            scores[doc_id] = scores.get(doc_id, 0.0) + 1.0 / (k + rank + 1)
    return sorted(scores, key=lambda d: scores[d], reverse=True)


# Per-doc character cap when sending to the reranker. jina-reranker-v2-base
# accepts up to ~1024 tokens PER QUERY+DOC PAIR (n_ctx_train) and rejects
# the WHOLE BATCH if any one pair exceeds it. Truncating each doc to
# ~2000 chars (≈ 500-700 tokens) leaves headroom for the query + chat
# template overhead. The truncation is reranking-only — full chunk text
# still goes back to the LLM caller.
RERANK_DOC_MAX_CHARS = 2000


def _rerank(query: str, candidates: list[tuple[str, str]]) -> list[str] | None:
    """Call the llama.cpp /v1/rerank endpoint and return the candidate
    chunk ids in reranker-preferred order.

    Args:
        query: the user's natural-language query
        candidates: list of ``(chunk_id, chunk_text)`` to rerank.

    Returns:
        A list of chunk_ids ordered best-first by reranker score, OR
        ``None`` if reranking is disabled, the endpoint is unreachable,
        or any other error. The caller treats ``None`` as "fall back to
        the input ranking" — rerank failures must NEVER block a search.

    Anti-hallucination: rerank only reorders chunks the retrievers
    already surfaced. It cannot introduce content not in the corpus.
    """
    if not RERANK_URL or not candidates:
        return None
    try:
        import httpx
    except ImportError:
        return None

    # Truncate each doc to fit the per-pair token budget. jina-reranker
    # rejects the entire batch on any oversize doc.
    docs = [(text[:RERANK_DOC_MAX_CHARS] if text else "") for _cid, text in candidates]
    ids = [cid for cid, _ in candidates]

    try:
        with httpx.Client(timeout=RERANK_TIMEOUT) as c:
            r = c.post(
                f"{RERANK_URL}/v1/rerank",
                json={
                    "model": "rerank",  # llama.cpp ignores this; jina passes through
                    "query": query,
                    "documents": docs,
                },
            )
            r.raise_for_status()
            payload = r.json()
    except Exception as exc:  # noqa: BLE001
        log.warning("rerank request failed (%s) — falling back to input order", exc)
        return None

    results = payload.get("results") or []
    if not results:
        log.warning("rerank returned empty results — falling back to input order")
        return None

    # llama.cpp returns results as [{"index": int, "relevance_score": float}, ...]
    # Higher relevance_score = better; sort descending.
    try:
        ordered = sorted(results, key=lambda r: -r.get("relevance_score", float("-inf")))
        return [ids[r["index"]] for r in ordered if 0 <= r.get("index", -1) < len(ids)]
    except (KeyError, IndexError, TypeError) as exc:
        log.warning("rerank response malformed (%s) — falling back to input order", exc)
        return None


def _structured_ratings_block(sidecar: dict) -> str:
    """Render the sidecar's grouped characteristics + identity as a
    fact-checkable block, with the source URL pinned at top.

    This is the response body for ``get_page`` and the embedded
    "canonical data" block agents should trust over any free-text
    paraphrase."""
    lines: list[str] = []
    name = sidecar.get("product_name") or sidecar.get("source_key", "")
    lines.append(f"# {name}")
    lines.append("")

    facets: list[str] = []
    if sidecar.get("vendor"):
        facets.append(f"**Vendor:** {sidecar['vendor']}")
    if sidecar.get("brand"):
        facets.append(f"**Brand:** {str(sidecar['brand']).title()}")
    if sidecar.get("crop"):
        facets.append(f"**Crop:** {str(sidecar['crop']).capitalize()}")
    if sidecar.get("relative_maturity") and sidecar.get("crop") == "corn":
        facets.append(f"**Relative maturity:** {sidecar['relative_maturity']}")
    if sidecar.get("maturity_group") and sidecar.get("crop") == "soybeans":
        facets.append(f"**Maturity group:** {sidecar['maturity_group']}")
    if sidecar.get("relative_maturity") and sidecar.get("crop") == "wheat":
        facets.append(f"**Maturity:** {sidecar['relative_maturity']}")
    if sidecar.get("wheat_class"):
        facets.append(f"**Wheat class:** {sidecar['wheat_class']}")
    if sidecar.get("release_year"):
        facets.append(f"**Released:** {sidecar['release_year']}")
    if sidecar.get("trait_stack"):
        facets.append(f"**Trait stack:** {', '.join(sidecar['trait_stack'])}")
    if facets:
        lines.append(" · ".join(facets))
        lines.append("")

    urls = sidecar.get("source_urls") or []
    if urls:
        lines.append(f"**Source:** {urls[0]}")
        lines.append("")

    scale = sidecar.get("_scale_direction") or "scale direction not declared by source"
    lines.append(f"**Rating scale (as published):** {scale}")
    lines.append("")

    # Canonical ratings — one table per group, values verbatim.
    for g in sidecar.get("characteristics_groups") or []:
        label = g.get("label") or "Characteristics"
        items = g.get("items") or []
        if not items:
            continue
        lines.append(f"## {label.title()}")
        lines.append("")
        lines.append("| Characteristic | Value |")
        lines.append("|---|---|")
        for it in items:
            ch = (it.get("characteristic") or "").strip()
            v = (it.get("value") or "").strip()
            lines.append(f"| {ch} | {v} |")
        lines.append("")

    if sidecar.get("positioning_statement"):
        lines.append("## Vendor positioning")
        lines.append("")
        lines.append(sidecar["positioning_statement"].strip())
        lines.append("")

    if sidecar.get("strengths"):
        lines.append("## Strengths / management notes (vendor copy)")
        lines.append("")
        for s in sidecar["strengths"]:
            lines.append(f"- {s}")
        lines.append("")

    rec = sidecar.get("regional_recommendations") or []
    if rec:
        names = sorted({
            (r.get("product_list_name") or "").strip()
            for r in rec
            if (r.get("product_list_name") or "").strip()
        })
        if names:
            lines.append("## Listed in vendor regional seed guides")
            lines.append("")
            for n in names:
                lines.append(f"- {n}")
            lines.append("")

    return "\n".join(lines).rstrip() + "\n"


# ---------------------------------------------------------------------------
# Curated lessons — docs_mcp/lessons.md is the canonical source.
# ---------------------------------------------------------------------------
LESSONS_FILE = Path(__file__).resolve().parent / "lessons.md"
_lessons_cache: list[tuple[str, str]] | None = None


def _load_lessons() -> list[tuple[str, str]]:
    """Parse lessons.md into ``[(slug, body), ...]`` sections.

    Sections are delimited by ``## <slug>`` headings. The slug is the
    `<slug>` token (whitespace stripped); the body is everything between
    that heading and the next ``## `` heading (or EOF).
    """
    global _lessons_cache
    if _lessons_cache is not None:
        return _lessons_cache
    out: list[tuple[str, str]] = []
    if not LESSONS_FILE.exists():
        _lessons_cache = out
        return out
    text = LESSONS_FILE.read_text(encoding="utf-8")
    parts = re.split(r"(?m)^## (.+)$", text)
    # parts = [preamble, slug1, body1, slug2, body2, ...]
    for i in range(1, len(parts), 2):
        slug = parts[i].strip()
        body = parts[i + 1] if i + 1 < len(parts) else ""
        # Drop trailing horizontal rule that separates sections.
        body = re.sub(r"\n---\s*$", "", body).strip()
        out.append((slug, body))
    _lessons_cache = out
    return out


# ===========================================================================
# Tools
# ===========================================================================

@mcp.tool()
def search_docs(
    query: Annotated[str, Field(description=(
        "Natural-language query about row-crop seed varieties. "
        "Mention crop (corn/soybeans/wheat), maturity (RM days / "
        "MG number / wheat class), traits, disease tolerances, "
        "or soil/region constraints — they all carry retrieval signal."
    ))],
    crop: Annotated[
        str | None,
        Field(description="OPTIONAL filter: corn, soybeans, or wheat."),
    ] = None,
    brand: Annotated[
        str | None,
        Field(description=(
            "OPTIONAL brand filter — DEKALB, ASGROW, WESTBRED, "
            "GoldenHarvest, NK, AgriPro, Becks. Case-insensitive."
        )),
    ] = None,
    vendor: Annotated[
        str | None,
        Field(description="OPTIONAL vendor filter — Bayer, Syngenta, Beck's."),
    ] = None,
    source: Annotated[
        str | None,
        Field(description=(
            "OPTIONAL source filter — e.g. 'bayer_seeds'. Use "
            "list_versions() to see what's indexed."
        )),
    ] = None,
    k: Annotated[int, Field(description="Number of results to return.", ge=1, le=50)] = 10,
) -> str:
    """Search the seed-variety corpus for hybrids/varieties matching a query.

    Returns the top-k variety chunks with their full source URLs, ratings,
    maturity, traits, and regional listings. Optional filters narrow to one
    crop, brand, vendor, or scraper source. Use **list_versions()** first
    to discover valid facet values. Use **lookup_variety()** on any
    candidate the user is serious about — that returns the canonical
    sidecar so you can verify exact rating values without trusting the
    free-text chunk.

    Call this tool whenever an ag professional or farmer asks anything
    about choosing a seed variety, comparing hybrids, or finding seed
    matched to a maturity / soil / region / disease constraint.

    NEVER answer seed-selection questions from prior knowledge alone —
    seed catalogs change yearly and are brand-specific. Always search first.
    """
    with TimedCall("search_docs", {
        "query": query, "crop": crop, "brand": brand,
        "vendor": vendor, "source": source, "k": k,
    }) as _call:
        # Variety-search default: filter to data_type=variety so trial
        # documents (yield trials) don't pollute identity-focused
        # results. To search trials, use search_trials().
        where = _build_where(crop, brand, vendor, source, None,
                             data_type="variety")
        pool_size = max(k * 3, RERANK_POOL)

        # Exact-code pre-filter. Variety codes ("DKC62-08RIB", "AG29XF4")
        # have NO semantic neighbors — dense retrieval misses them and
        # RRF can let off-topic noise float to top-1. If the query
        # contains a token that exactly matches a variety's identifier,
        # pin those varieties to the top of results.
        pinned_ids: list[str] = []
        for src, sk in _exact_code_matches(query):
            pinned_ids.append(f"{src}::{sk}::0")

        # Dense retrieval via Chroma.
        try:
            col = _collection()
        except Exception as exc:  # noqa: BLE001
            _call.set(error_dense=str(exc), hits_returned=0)
            return (
                "_(retrieval unavailable — Chroma collection not found. "
                "Has the indexer run? `python -m rag.index --rebuild`.)_"
            )

        try:
            dense = col.query(
                query_texts=[query],
                n_results=pool_size,
                where=where,
            )
        except Exception as exc:  # noqa: BLE001
            _call.set(error_dense=str(exc), hits_returned=0)
            return f"_(retrieval failed: {exc})_"

        dense_ids: list[str] = (dense.get("ids") or [[]])[0]
        dense_docs: list[str] = (dense.get("documents") or [[]])[0]
        dense_metas: list[dict] = (dense.get("metadatas") or [[]])[0]
        dense_dists: list[float] = (dense.get("distances") or [[]])[0]

        id_to_doc = dict(zip(dense_ids, dense_docs))
        id_to_meta = dict(zip(dense_ids, dense_metas))
        id_to_dist = dict(zip(dense_ids, dense_dists))

        # Hybrid: optionally fuse with BM25. Both are pulled at pool_size,
        # fused via RRF, top-k returned.
        used_hybrid = False
        if HYBRID_SEARCH:
            bm25 = _bm25_index()
            if bm25 is not None:
                bm25_hits = bm25.query(query, n=pool_size, where=where)
                bm25_ids = [h[0] for h in bm25_hits]
                if bm25_ids:
                    fused = _rrf_fuse([dense_ids, bm25_ids])
                    fuzzy_ids = fused
                    used_hybrid = True
                else:
                    fuzzy_ids = dense_ids
            else:
                fuzzy_ids = dense_ids
        else:
            fuzzy_ids = dense_ids

        # Optional reranker pass over the fuzzy pool BEFORE truncating
        # to k. The cross-encoder is much more accurate at the query/
        # doc pairing than dense embedding alone, especially when the
        # query mentions specific ag terms that share-token-cosine
        # might miss. Skipped if RERANK_URL is unset or the call
        # fails — search is never blocked on the sidecar.
        used_rerank = False
        if RERANK_URL and fuzzy_ids:
            # Need docs to rerank — fetch any missing.
            need = [i for i in fuzzy_ids if i not in id_to_doc]
            if need:
                try:
                    extra = col.get(ids=need[:RERANK_POOL], include=["documents", "metadatas"])
                    for cid, doc, meta in zip(
                        extra.get("ids") or [],
                        extra.get("documents") or [],
                        extra.get("metadatas") or [],
                    ):
                        id_to_doc[cid] = doc
                        id_to_meta[cid] = meta
                except Exception as exc:  # noqa: BLE001
                    log.warning("pre-rerank get-by-id failed: %s", exc)
            pool = [(cid, id_to_doc.get(cid, "")) for cid in fuzzy_ids[:RERANK_POOL]]
            reranked = _rerank(query, pool)
            if reranked:
                # Build the membership set once instead of once per candidate.
                reranked_set = set(reranked)
                fuzzy_ids = reranked + [c for c in fuzzy_ids if c not in reranked_set]
                used_rerank = True

        # Pin exact-code matches at top, then fill remainder from fuzzy
        # retrieval (deduped). Pinned matches are deterministic and
        # high-confidence; they should never lose to a fuzzy match.
        final_ids: list[str] = []
        seen: set[str] = set()
        for cid in pinned_ids + fuzzy_ids:
            if cid in seen:
                continue
            seen.add(cid)
            final_ids.append(cid)
            if len(final_ids) >= k:
                break

        # For ids returned by BM25 but not by Chroma, we need their docs/
        # metadata too. Re-fetch by id.
        missing = [i for i in final_ids if i not in id_to_doc]
        if missing:
            try:
                extra = col.get(ids=missing, include=["documents", "metadatas"])
                for cid, doc, meta in zip(
                    extra.get("ids") or [],
                    extra.get("documents") or [],
                    extra.get("metadatas") or [],
                ):
                    id_to_doc[cid] = doc
                    id_to_meta[cid] = meta
            except Exception as exc:  # noqa: BLE001
                log.warning("get-by-id for BM25-only hits failed: %s", exc)

        _call.set(
            hits_returned=len(final_ids),
            hybrid=used_hybrid,
            reranked=used_rerank,
            pool_size=pool_size,
        )

        if not final_ids:
            return (
                "_(no varieties matched. Try broadening the query or "
                "removing filters — call list_versions() to see what's "
                "indexed.)_"
            )

        blocks: list[str] = []
        for cid in final_ids:
            doc = id_to_doc.get(cid, "")
            meta = id_to_meta.get(cid, {})
            dist = id_to_dist.get(cid) if not used_hybrid else None
            blocks.append(_format_hit(doc, meta, dist))

        header = (
            f"# Search results — {len(final_ids)} variety chunk"
            f"{'s' if len(final_ids) != 1 else ''}"
            f"{' (dense + BM25 hybrid)' if used_hybrid else ' (dense only)'}\n"
            f"_Use `lookup_variety(source_key=...)` on any candidate "
            f"to fact-check ratings from the canonical sidecar._\n\n---\n\n"
        )
        return header + "\n---\n\n".join(blocks)


@mcp.tool()
def get_page(
    source: Annotated[str, Field(description=(
        "Scraper source id — e.g. 'bayer_seeds'. Same as the `source` "
        "field in search_docs results."
    ))],
    source_key: Annotated[str, Field(description=(
        "Per-variety stable key — e.g. 'dekalb-dkc62-08rib'. Same as "
        "the `source_key` field in search_docs results."
    ))],
) -> str:
    """Return the full canonical record for one variety.

    Emits a structured ratings header (identity, all characteristic
    groups, vendor positioning, regional listings) sourced from the
    variety's sidecar JSON, followed by the raw markdown body the
    chunker indexed.

    Use this when the user is comparing varieties closely or wants to
    see every published rating value, not just the ones that matched a
    query. The structured header is the canonical fact-check surface;
    use it to validate any specific rating value before quoting it.
    """
    with TimedCall("get_page", {"source": source, "source_key": source_key}) as _call:
        sidecar = _read_sidecar(source, source_key)
        if sidecar is None:
            _call.set(found=False)
            return (
                f"_(variety not found: source='{source}' "
                f"source_key='{source_key}'. Use list_versions() to see "
                f"available sources or search_docs() to find candidates.)_"
            )
        body = _read_markdown(source, source_key) or ""
        structured = _structured_ratings_block(sidecar)
        _call.set(found=True, body_chars=len(body), groups=len(sidecar.get("characteristics_groups") or []))
        sep = "\n\n---\n\n## Indexed body (chunk source text)\n\n"
        return structured + sep + body


@mcp.tool()
def list_versions() -> str:
    """List the available scraper sources and the crop/brand/vendor
    facets across all indexed varieties.

    Use this BEFORE search_docs() the first time you query, to discover
    which scraper sources, brands, vendors, and crops are actually in
    the corpus right now. The agent should not assume — the corpus
    grows as more vendors are scraped.
    """
    with TimedCall("list_versions", {}) as _call:
        facets: dict[str, dict[str, int]] = {
            "source": {}, "vendor": {}, "brand": {}, "crop": {},
        }
        n_varieties = 0
        if CORPUS.exists():
            for source_dir in sorted(CORPUS.iterdir()):
                if not source_dir.is_dir() or source_dir.name.startswith("."):
                    continue
                for sc in source_dir.glob("*.json"):
                    try:
                        d = json.loads(sc.read_text(encoding="utf-8"))
                    except (OSError, json.JSONDecodeError):
                        continue
                    n_varieties += 1
                    for k in facets:
                        v = d.get(k)
                        if v:
                            facets[k][v] = facets[k].get(v, 0) + 1

        if n_varieties == 0:
            return (
                "_(no varieties indexed yet — run scrapers + indexer "
                "before calling search_docs)_"
            )

        _call.set(
            varieties=n_varieties,
            sources=len(facets["source"]),
            vendors=len(facets["vendor"]),
            brands=len(facets["brand"]),
            crops=len(facets["crop"]),
        )

        lines = [f"# Corpus facets ({n_varieties} varieties indexed)", ""]
        for label, counts in facets.items():
            if not counts:
                continue
            lines.append(f"## {label}")
            lines.append("")
            for k, n in sorted(counts.items(), key=lambda kv: -kv[1]):
                lines.append(f"- `{k}` — {n} varieties")
            lines.append("")
        return "\n".join(lines).rstrip()


@mcp.tool()
|
|
def lookup_variety(
|
|
source_key: Annotated[str, Field(description=(
|
|
"Per-variety stable key — e.g. 'dekalb-dkc62-08rib'."
|
|
))],
|
|
source: Annotated[
|
|
str | None,
|
|
Field(description=(
|
|
"OPTIONAL scraper source ('bayer_seeds' etc). If omitted, "
|
|
"scans all sources for the source_key."
|
|
)),
|
|
] = None,
|
|
) -> str:
|
|
"""Return the canonical sidecar JSON for one variety, verbatim.
|
|
|
|
USE THIS to fact-check any rating value before quoting it to the
|
|
farmer. The output is the exact data the scraper captured from the
|
|
vendor's published catalog — no paraphrasing, no inference.
|
|
|
|
Call this whenever the user (or you) needs an exact value for a
|
|
specific rating, trait, or maturity — not just a semantic match
|
|
from search_docs.
|
|
"""
|
|
with TimedCall("lookup_variety", {"source_key": source_key, "source": source}) as _call:
|
|
if source:
|
|
sidecar = _read_sidecar(source, source_key)
|
|
if sidecar is not None:
|
|
_call.set(found=True, source=source)
|
|
return "```json\n" + json.dumps(sidecar, indent=2, ensure_ascii=False) + "\n```"
|
|
_call.set(found=False, source=source)
|
|
return f"_(variety '{source_key}' not found under source '{source}')_"
|
|
|
|
# No source given — scan all source dirs for a match.
|
|
if not CORPUS.exists():
|
|
_call.set(found=False)
|
|
return "_(corpus is empty — no scrapers have run yet)_"
|
|
matches: list[tuple[str, dict]] = []
|
|
for source_dir in sorted(CORPUS.iterdir()):
|
|
if not source_dir.is_dir() or source_dir.name.startswith("."):
|
|
continue
|
|
sidecar = _read_sidecar(source_dir.name, source_key)
|
|
if sidecar is not None:
|
|
matches.append((source_dir.name, sidecar))
|
|
|
|
if not matches:
|
|
_call.set(found=False)
|
|
return (
|
|
f"_(no variety with source_key '{source_key}' found in any "
|
|
f"source. Use search_docs() to find the right key.)_"
|
|
)
|
|
|
|
_call.set(found=True, matches=len(matches))
|
|
if len(matches) == 1:
|
|
src_name, sidecar = matches[0]
|
|
return (
|
|
f"_(matched in source `{src_name}`)_\n\n"
|
|
"```json\n"
|
|
+ json.dumps(sidecar, indent=2, ensure_ascii=False)
|
|
+ "\n```"
|
|
)
|
|
|
|
# Multi-match: serialize each labeled.
|
|
out: list[str] = [f"_(matched {len(matches)} sources)_\n"]
|
|
for src_name, sidecar in matches:
|
|
out.append(f"### source: `{src_name}`")
|
|
out.append("```json")
|
|
out.append(json.dumps(sidecar, indent=2, ensure_ascii=False))
|
|
out.append("```")
|
|
return "\n".join(out)
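

# Illustrative sketch only, not the server's real helper. `_read_sidecar` is
# defined elsewhere in this module and its on-disk layout is not shown here.
# The sketch ASSUMES one JSON file per variety at
# <corpus>/<source>/<source_key>.json, a guess based on the `*.json` glob in
# `list_versions`. The name `read_sidecar_sketch` and its `corpus` parameter
# are hypothetical.
import json
from pathlib import Path


def read_sidecar_sketch(corpus: Path, source: str, source_key: str) -> "dict | None":
    """Return the parsed sidecar dict, or None if it is missing or unreadable."""
    path = Path(corpus) / source / f"{source_key}.json"  # assumed file layout
    try:
        return json.loads(path.read_text(encoding="utf-8"))
    except (OSError, json.JSONDecodeError):
        # Mirror the facet scan above: treat a bad file as "not found".
        return None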


@mcp.tool()
def search_trials(
    query: Annotated[str, Field(description=(
        "Natural-language query about yield trials. Mention crop, "
        "region or state, year, soil/conditions, and any specific "
        "variety codes you want compared. Examples: "
        "'best corn hybrid 2024 Iowa heavy clay'; "
        "'AP Iliad yield Idaho stripe rust'; "
        "'DKC65-20 vs NK1748 head to head Alabama 2023'."
    ))],
    crop: Annotated[
        str | None,
        Field(description="OPTIONAL: corn, soybeans, silage, or wheat."),
    ] = None,
    state: Annotated[
        str | None,
        Field(description=(
            "OPTIONAL state filter. 2-letter abbrev (IA, IL, NE...) "
            "for Golden Harvest plot reports; full or partial region "
            "name (e.g. 'Pacific Northwest', 'Montana') for AgriPro "
            "trial PDFs."
        )),
    ] = None,
    year: Annotated[
        int | None,
        Field(description="OPTIONAL year filter (e.g. 2024).", ge=2010, le=2030),
    ] = None,
    product: Annotated[
        str | None,
        Field(description=(
            "OPTIONAL variety/hybrid filter — substring match against "
            "the product field. Example: 'DKC62' surfaces trials "
            "containing any DKC62-* hybrid."
        )),
    ] = None,
    k: Annotated[int, Field(description="Number of results to return.", ge=1, le=50)] = 10,
) -> str:
    """Search yield-trial data — head-to-head results from real field
    trials. SEPARATE from variety-identity search.

    Use this when the user wants to know HOW PRODUCTS PERFORMED, not
    what they ARE. Trial data complements `search_docs`:

    * `search_docs` answers: "What's the disease resistance profile
      of DKC62-08RIB?" (variety identity)
    * `search_trials` answers: "Which corn hybrid actually won the
      yield trials in central Iowa in 2024?" (performance data)

    Data sources:

    * **Golden Harvest plot reports** (4,000+ trials) — per-site
      head-to-head comparing products from MULTIPLE BRANDS at one
      cooperator's field. NK, DEKALB, Golden Harvest, sometimes
      others all compete at the same site. Cross-vendor data Bayer
      itself doesn't publish.
    * **AgriPro regional trial PDFs** (~14 PDFs) — multi-year
      multi-location wheat performance for Northern Plains / PNW /
      Plains regions.

    A typical workflow: call this to identify top performers in a
    region/year, then call `lookup_variety(source_key=...)` on the
    leaders to verify identity details (RM, traits, disease ratings).
    """
    with TimedCall("search_trials", {
        "query": query, "crop": crop, "state": state, "year": year,
        "product": product, "k": k,
    }) as _call:
        where = _build_where(
            crop, None, None, None, None,
            data_type="trial",
            state=state,
            year=year,
        )
        pool_size = max(k * 3, RERANK_POOL)

        try:
            col = _collection()
        except Exception as exc:  # noqa: BLE001
            _call.set(error_dense=str(exc), hits_returned=0)
            return (
                "_(retrieval unavailable — Chroma collection not found. "
                "Has the indexer run? `python -m rag.index --rebuild`.)_"
            )

        # If a product filter is set, augment the query with the
        # product code so BM25 + dense both have signal.
        full_query = query
        if product:
            full_query = f"{query} {product}"

        try:
            dense = col.query(
                query_texts=[full_query],
                n_results=pool_size,
                where=where,
            )
        except Exception as exc:  # noqa: BLE001
            _call.set(error_dense=str(exc), hits_returned=0)
            return f"_(trial retrieval failed: {exc})_"

        dense_ids: list[str] = (dense.get("ids") or [[]])[0]
        dense_docs: list[str] = (dense.get("documents") or [[]])[0]
        dense_metas: list[dict] = (dense.get("metadatas") or [[]])[0]
        dense_dists: list[float] = (dense.get("distances") or [[]])[0]

        id_to_doc = dict(zip(dense_ids, dense_docs))
        id_to_meta = dict(zip(dense_ids, dense_metas))
        id_to_dist = dict(zip(dense_ids, dense_dists))

        used_hybrid = False
        if HYBRID_SEARCH:
            bm25 = _bm25_index()
            if bm25 is not None:
                bm25_hits = bm25.query(full_query, n=pool_size, where=where)
                bm25_ids = [h[0] for h in bm25_hits]
                if bm25_ids:
                    fused = _rrf_fuse([dense_ids, bm25_ids])
                    fuzzy_ids = fused
                    used_hybrid = True
                else:
                    fuzzy_ids = dense_ids
            else:
                fuzzy_ids = dense_ids
        else:
            fuzzy_ids = dense_ids

        # Optional reranker pass over the fuzzy pool — same shape as
        # in search_docs. Skipped silently if RERANK_URL is unset or
        # the rerank call fails.
        used_rerank = False
        if RERANK_URL and fuzzy_ids:
            need = [i for i in fuzzy_ids if i not in id_to_doc]
            if need:
                try:
                    extra = col.get(ids=need[:RERANK_POOL], include=["documents", "metadatas"])
                    for cid, doc, meta in zip(
                        extra.get("ids") or [],
                        extra.get("documents") or [],
                        extra.get("metadatas") or [],
                    ):
                        id_to_doc[cid] = doc
                        id_to_meta[cid] = meta
                except Exception as exc:  # noqa: BLE001
                    log.warning("pre-rerank get-by-id failed: %s", exc)
            pool = [(cid, id_to_doc.get(cid, "")) for cid in fuzzy_ids[:RERANK_POOL]]
            reranked = _rerank(full_query, pool)
            if reranked:
                # Build the membership set once instead of once per element.
                reranked_set = set(reranked)
                fuzzy_ids = reranked + [c for c in fuzzy_ids if c not in reranked_set]
                used_rerank = True

        # Optional product-substring post-filter: if user supplied
        # ``product``, require the chunk to actually contain the
        # token. This re-checks the bytes since BM25 only sees stems.
        if product:
            needle = product.lower()

            def _has_product(cid: str) -> bool:
                doc = id_to_doc.get(cid, "")
                if needle in doc.lower():
                    return True
                # Not yet fetched — defer; the get-by-id below will fix.
                return cid not in id_to_doc

            fuzzy_ids = [cid for cid in fuzzy_ids if _has_product(cid)]

        final_ids: list[str] = []
        seen: set[str] = set()
        for cid in fuzzy_ids:
            if cid in seen:
                continue
            seen.add(cid)
            final_ids.append(cid)
            if len(final_ids) >= k:
                break

        missing = [i for i in final_ids if i not in id_to_doc]
        if missing:
            try:
                extra = col.get(ids=missing, include=["documents", "metadatas"])
                for cid, doc, meta in zip(
                    extra.get("ids") or [],
                    extra.get("documents") or [],
                    extra.get("metadatas") or [],
                ):
                    id_to_doc[cid] = doc
                    id_to_meta[cid] = meta
            except Exception as exc:  # noqa: BLE001
                log.warning("get-by-id for BM25-only hits failed: %s", exc)

        # Apply product filter once we have docs from the get-by-id pass.
        if product:
            needle = product.lower()
            final_ids = [cid for cid in final_ids if needle in id_to_doc.get(cid, "").lower()]

        _call.set(
            hits_returned=len(final_ids),
            hybrid=used_hybrid,
            reranked=used_rerank,
            pool_size=pool_size,
            data_type="trial",
        )

        if not final_ids:
            return (
                "_(no trials matched. Try widening — drop the state, "
                "year, or product filter. `list_versions()` shows "
                "which trial sources are indexed.)_"
            )

        blocks: list[str] = []
        for cid in final_ids:
            doc = id_to_doc.get(cid, "")
            meta = id_to_meta.get(cid, {})
            dist = id_to_dist.get(cid) if not used_hybrid else None
            blocks.append(_format_trial_hit(doc, meta, dist))

        header = (
            f"# Trial search results — {len(final_ids)} trial document"
            f"{'s' if len(final_ids) != 1 else ''}"
            f"{' (dense + BM25 hybrid)' if used_hybrid else ' (dense only)'}\n"
            f"_Use `get_page(source=..., source_key=...)` to read the "
            f"full trial body. Use `lookup_variety(source_key=...)` on "
            f"any product code to verify its identity (RM, traits, "
            f"disease ratings)._\n\n---\n\n"
        )
        return header + "\n---\n\n".join(blocks)
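

# Illustrative sketch of the two ranking steps used in `search_trials`. The
# real `_rrf_fuse` and `_rerank` helpers live elsewhere in this module; their
# exact signatures and constants are not shown here. The names below and the
# constant k=60 (a common choice for reciprocal rank fusion) are ASSUMPTIONS,
# not the server's confirmed implementation.
def rrf_fuse_sketch(rankings: list[list[str]], k: int = 60) -> list[str]:
    """Fuse ranked id lists: each id scores sum(1 / (k + rank)) over the lists."""
    scores: dict[str, float] = {}
    for ranking in rankings:
        for rank, cid in enumerate(ranking, start=1):
            scores[cid] = scores.get(cid, 0.0) + 1.0 / (k + rank)
    # sorted() is stable, so tied ids keep their first-seen order.
    return sorted(scores, key=lambda cid: -scores[cid])


def merge_reranked_sketch(reranked: list[str], pool_ids: list[str]) -> list[str]:
    """Put reranker-ordered ids first, then the ids the reranker never saw."""
    head = set(reranked)
    return list(reranked) + [cid for cid in pool_ids if cid not in head]


# Example: an id present in both lists ("b") outranks ids present in only one.
#   rrf_fuse_sketch([["a", "b", "c"], ["b", "d"]])
#   merge_reranked_sketch(["c", "a"], ["a", "b", "c", "d"])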


def _format_trial_hit(doc: str, meta: dict, distance: float | None = None) -> str:
    """Trial-specific result header. Highlights crop/state/year and
    source URL (vs variety hits which emphasize brand + product
    identity)."""
    src_url = meta.get("source_url") or ""
    src_key = meta.get("source_key") or ""
    src = meta.get("source") or ""
    crop = meta.get("crop") or ""
    state = meta.get("state") or ""
    year = meta.get("year") or ""
    region = meta.get("region") or ""

    title_bits = [b for b in [crop.title(), region or state, str(year) if year else ""] if b]
    title = " · ".join(title_bits) if title_bits else src_key

    header = (
        f"### Trial: {title} \n"
        f"`{src}::{src_key}` — {meta.get('vendor', '')} / {meta.get('brand', '')} \n"
        f"<{src_url}>"
    )
    if distance is not None:
        header += f" \n_(distance={distance:.4f})_"
    return f"{header}\n\n{doc.strip()}\n"


@mcp.tool()
def crop_seed_api_lessons(
    topic: Annotated[
        str | None,
        Field(description=(
            "OPTIONAL topic — match against lesson section slugs or body "
            "(substring, case-insensitive). Known slugs: pioneer, "
            "rating-scales, maturity-semantics, trait-glossary, "
            "scn-resistance, regional-listings, sources-not-yet-indexed, "
            "checking-your-work. Omit for the full curated index."
        )),
    ] = None,
) -> str:
    """Curated knowledge that does NOT live in the scraped corpus —
    vendor scale-direction notes, trait glossary, maturity semantics,
    SCN resistance interpretation, the **Pioneer fallback policy**,
    and rules for fact-checking your work.

    Call this tool when:

    * The user asks about **Pioneer** or any P-series hybrid — Pioneer
      is intentionally NOT scraped (ToS bans it); the lesson tells you
      what to say instead.
    * You need to compare ratings across vendors — different vendors
      publish on different scale directions.
    * You're parsing a trait code or disease abbreviation you don't
      recognize.
    * Before quoting a specific rating value to a farmer — the
      ``checking-your-work`` lesson reminds you to call
      ``lookup_variety`` to confirm.

    This tool is **the only source of opinionated content** in the
    server. Everything else returned by search_docs / get_page /
    lookup_variety is verbatim from vendor catalogs.
    """
    with TimedCall("crop_seed_api_lessons", {"topic": topic}) as _call:
        sections = _load_lessons()
        if not sections:
            _call.set(sections_returned=0)
            return "_(no lessons file present — docs_mcp/lessons.md missing)_"

        if not topic:
            _call.set(sections_returned=len(sections))
            return "\n\n---\n\n".join(
                f"## {slug}\n\n{body}" for slug, body in sections
            )

        needle = topic.strip().lower()
        # Prefer slug matches (most specific). Fall back to body match
        # only when no slug matches — keeps a query like "rating" from
        # returning every section that happens to mention the word.
        slug_matches: list[tuple[str, str]] = []
        body_matches: list[tuple[str, str]] = []
        for slug, body in sections:
            if needle in slug.lower():
                slug_matches.append((slug, body))
            elif needle in body.lower():
                body_matches.append((slug, body))
        matched = slug_matches if slug_matches else body_matches

        _call.set(sections_returned=len(matched), topic=topic)
        if not matched:
            slugs = ", ".join(s for s, _ in sections)
            return (
                f"_(no lesson section matched topic '{topic}'. "
                f"Available slugs: {slugs}.)_"
            )
        return "\n\n---\n\n".join(
            f"## {slug}\n\n{body}" for slug, body in matched
        )
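

# Illustrative sketch of the slug-first matching rule in
# `crop_seed_api_lessons`, pulled out as a pure function so it can be tested
# without the lessons file. `match_lessons_sketch` is a hypothetical name and
# the sample sections in the example are made up for illustration.
def match_lessons_sketch(
    sections: list[tuple[str, str]], topic: str
) -> list[tuple[str, str]]:
    """Return slug matches if any exist, otherwise fall back to body matches."""
    needle = topic.strip().lower()
    slug_hits = [(slug, body) for slug, body in sections if needle in slug.lower()]
    if slug_hits:
        return slug_hits
    return [(slug, body) for slug, body in sections if needle in body.lower()]


# Example: "rating" hits the `rating-scales` slug, so the `pioneer` section is
# not returned even though its body also mentions ratings.
#   match_lessons_sketch(
#       [("rating-scales", "Vendors differ."), ("pioneer", "No rating data.")],
#       "rating",
#   )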


# ===========================================================================
# Entry point
# ===========================================================================

def main() -> None:
    import argparse
    p = argparse.ArgumentParser(description=f"{PRODUCT_NAME} docs MCP server")
    p.add_argument("--transport", choices=["stdio", "streamable-http", "sse"],
                   default=os.environ.get("MCP_TRANSPORT", "stdio"))
    p.add_argument("--host", default=os.environ.get("MCP_HOST", "0.0.0.0"))
    p.add_argument("--port", type=int, default=int(os.environ.get("MCP_PORT", "8000")))
    args = p.parse_args()

    if args.transport == "stdio":
        mcp.run()
    else:
        mcp.settings.host = args.host
        mcp.settings.port = args.port
        if os.environ.get("MCP_DISABLE_DNS_REBINDING_PROTECTION") in {"1", "true", "yes"}:
            mcp.settings.transport_security.enable_dns_rebinding_protection = False
        mcp.run(transport=args.transport)


if __name__ == "__main__":
    main()