4009dc0b78
Add the fifth MCP tool — crop_seed_api_lessons(topic?) — backed by docs_mcp/lessons.md, the ONLY source of opinionated content in the server. Everything else (search_docs, get_page, lookup_variety) returns verbatim from vendor catalogs; lessons.md fills the gaps the corpus can't cover. The Pioneer fallback is the critical anti-hallucination piece: Pioneer's ToS bans automation, so the corpus has no Pioneer data. Without this tool, an agent might surface Bayer/Asgrow chunks as mediocre matches for a Pioneer query. The tool's docstring tells the agent to call it on any Pioneer / P-series question; the 'pioneer' section says clearly: "I don't have Pioneer's variety data indexed... please consult Pioneer or an extension service." "Do NOT invent Pioneer hybrid ratings." Other lesson sections cover knowledge the agent needs to interpret search_docs / get_page output correctly: - rating-scales: Bayer 1-9, Golden Harvest 9-to-1, what R/MR/S/Rps1c/R3 mean in soybean disease columns - maturity-semantics: corn RM days vs soybean MG vs wheat class + qualitative early/medium/late - trait-glossary: SSRIB, VT2PRIB, XF, E3, Conkesta, Clearfield, etc. - scn-resistance: race coverage + Peking vs PI 88788 source - regional-listings: how to interpret Bayer's "local profiles" - sources-not-yet-indexed: which vendors aren't in the corpus yet - checking-your-work: always call lookup_variety before quoting Lesson lookup prefers slug-match (returns just `rating-scales` for topic="rating", not every section that mentions ratings); falls back to body-match only when no slug matches. Smoke-tested with topic=pioneer, topic=rating, topic=trait, topic=zzzzzz (no match), and topic=None (full index = 10K chars, 8 sections). Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
845 lines
32 KiB
Python
845 lines
32 KiB
Python
"""seed-mcp — MCP server over public US row-crop seed catalogs.
|
|
|
|
Tools (all read-only):
|
|
|
|
search_docs natural-language retrieval over the corpus
|
|
get_page the full markdown body of one variety + sidecar
|
|
list_versions facet discovery (crops / brands / vendors / sources)
|
|
lookup_variety canonical sidecar JSON by source_key — fact-check
|
|
anything you surface from search_docs against this
|
|
|
|
The contract with the calling agent is **never fabricate**. Every chunk
|
|
we return is verbatim from the source's published catalog (the chunker
|
|
rebuilds chunks deterministically from the sidecar JSON). Every
|
|
response carries the variety's source URL so the agent can cite. The
|
|
``lookup_variety`` tool exists specifically so the agent can validate
|
|
specific rating values without having to trust paraphrased retrieval
|
|
text.
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
import logging
|
|
import os
|
|
import re
|
|
from pathlib import Path
|
|
from typing import Annotated, Any
|
|
|
|
from mcp.server.fastmcp import FastMCP
|
|
from pydantic import Field
|
|
|
|
from .usage import TimedCall
|
|
|
|
log = logging.getLogger(__name__)
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Product-specific configuration.
|
|
# ---------------------------------------------------------------------------
|
|
PRODUCT_NAME = os.environ.get("PRODUCT_NAME", "crop_seed")
|
|
PRODUCT_DOCS_URL = os.environ.get("PRODUCT_DOCS_URL", "https://git.jpaul.io/justin/seed-mcp")
|
|
COLLECTION = f"{PRODUCT_NAME}_docs"
|
|
|
|
# Paths inside the deployed container (and matching layout locally for dev).
|
|
ROOT = Path(__file__).resolve().parent.parent
|
|
CORPUS = ROOT / "corpus"
|
|
CHROMA_DIR = ROOT / "chroma"
|
|
BM25_DB = Path(os.environ.get("BM25_DB", str(ROOT / "bm25" / f"{PRODUCT_NAME}_docs.db")))
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Feature flags.
|
|
# ---------------------------------------------------------------------------
|
|
RERANK_URL = os.environ.get("RERANK_URL", "").rstrip("/") or None
|
|
RERANK_POOL = int(os.environ.get("RERANK_POOL", "50"))
|
|
RERANK_TIMEOUT = float(os.environ.get("RERANK_TIMEOUT", "30"))
|
|
|
|
HYBRID_SEARCH = os.environ.get("HYBRID_SEARCH", "true").lower() in ("true", "1", "yes", "on")
|
|
RRF_K = int(os.environ.get("RRF_K", "60"))
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# FastMCP setup.
|
|
# ---------------------------------------------------------------------------
|
|
mcp = FastMCP(f"{PRODUCT_NAME}-docs", stateless_http=True)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Lazy singletons. Instantiate on first use so the server can start even
|
|
# when (e.g.) Ollama is briefly unreachable — degraded modes fall back
|
|
# gracefully rather than refusing to boot.
|
|
# ---------------------------------------------------------------------------
|
|
_chroma_client = None
|
|
_chroma_collection = None
|
|
_bm25 = None
|
|
_code_index: dict[str, list[tuple[str, str]]] | None = None
|
|
|
|
|
|
def _build_code_index() -> dict[str, list[tuple[str, str]]]:
|
|
"""Walk sidecars once and index varieties by every lookup key a
|
|
farmer or LLM might paste: ``source_key``, ``hybrid_prefix``,
|
|
``product_name`` (normalized), each token from
|
|
``hybrid_prefix``/``product_name`` longer than three chars.
|
|
|
|
Returns a dict mapping the normalized key → list of
|
|
``(source, source_key)`` tuples. Multiple-source matches are kept
|
|
so we can warn the agent about ambiguity.
|
|
"""
|
|
idx: dict[str, list[tuple[str, str]]] = {}
|
|
|
|
def _add(key: str, value: tuple[str, str]) -> None:
|
|
key = (key or "").lower().strip()
|
|
if not key or len(key) < 4:
|
|
return
|
|
idx.setdefault(key, [])
|
|
if value not in idx[key]:
|
|
idx[key].append(value)
|
|
|
|
if not CORPUS.exists():
|
|
return idx
|
|
for source_dir in CORPUS.iterdir():
|
|
if not source_dir.is_dir() or source_dir.name.startswith("."):
|
|
continue
|
|
for sc in source_dir.glob("*.json"):
|
|
try:
|
|
d = json.loads(sc.read_text(encoding="utf-8"))
|
|
except (OSError, json.JSONDecodeError):
|
|
continue
|
|
source = d.get("source") or source_dir.name
|
|
sk = d.get("source_key") or sc.stem
|
|
value = (source, sk)
|
|
_add(sk, value)
|
|
_add(d.get("hybrid_prefix") or "", value)
|
|
_add(d.get("product_name") or "", value)
|
|
# Token-split product_name and hybrid_prefix so "DKC62-08RIB"
|
|
# in a query matches "DKC62-08RIB BRAND BLEND" via the
|
|
# "DKC62-08RIB" token as well as the full string.
|
|
for source_field in (d.get("product_name"), d.get("hybrid_prefix")):
|
|
if not source_field:
|
|
continue
|
|
for tok in re.split(r"[\s()/,]+", source_field):
|
|
tok = tok.strip()
|
|
# Only retain tokens that LOOK like codes — at least
|
|
# one digit, mostly alphanumeric/dash. Skips "BRAND"
|
|
# / "BLEND" / etc.
|
|
if re.match(r"^[A-Za-z]*\d[\w\-]*$", tok):
|
|
_add(tok, value)
|
|
return idx
|
|
|
|
|
|
def _code_lookup() -> dict[str, list[tuple[str, str]]]:
|
|
global _code_index
|
|
if _code_index is None:
|
|
_code_index = _build_code_index()
|
|
log.info("code-index: %d lookup keys built", len(_code_index))
|
|
return _code_index
|
|
|
|
|
|
# Tokens that LOOK like a variety code — at least one digit, otherwise
|
|
# alphanumeric / dash. Catches "DKC62-08RIB", "AG29XF4", "WB1376CLP",
|
|
# "VT2PRIB"; skips ordinary words like "ratings".
|
|
_CODE_TOKEN_RE = re.compile(r"\b([A-Za-z][A-Za-z0-9\-]{2,})\b")
|
|
|
|
|
|
def _exact_code_matches(query: str) -> list[tuple[str, str]]:
|
|
"""Find varieties whose source_key / product_name / token-split
|
|
components contain a query token. Used as a high-confidence pin
|
|
before fuzzy retrieval."""
|
|
idx = _code_lookup()
|
|
if not idx:
|
|
return []
|
|
out: list[tuple[str, str]] = []
|
|
seen: set[tuple[str, str]] = set()
|
|
for m in _CODE_TOKEN_RE.finditer(query or ""):
|
|
tok = m.group(1).lower()
|
|
if len(tok) < 4 or not any(c.isdigit() for c in tok):
|
|
continue
|
|
for v in idx.get(tok, []):
|
|
if v not in seen:
|
|
seen.add(v)
|
|
out.append(v)
|
|
return out
|
|
|
|
|
|
def _collection():
|
|
"""Return the Chroma collection, opening it lazily."""
|
|
global _chroma_client, _chroma_collection
|
|
if _chroma_collection is not None:
|
|
return _chroma_collection
|
|
import chromadb
|
|
from chromadb.config import Settings
|
|
from rag.embeddings import embedding_function
|
|
|
|
_chroma_client = chromadb.PersistentClient(
|
|
path=str(CHROMA_DIR),
|
|
settings=Settings(anonymized_telemetry=False),
|
|
)
|
|
_chroma_collection = _chroma_client.get_collection(
|
|
COLLECTION, embedding_function=embedding_function()
|
|
)
|
|
return _chroma_collection
|
|
|
|
|
|
def _bm25_index():
|
|
"""Return the BM25 index, or None if it doesn't exist on disk."""
|
|
global _bm25
|
|
if _bm25 is not None:
|
|
return _bm25
|
|
from rag.bm25 import BM25Index
|
|
idx = BM25Index(BM25_DB)
|
|
if not idx.exists():
|
|
return None
|
|
_bm25 = idx
|
|
return _bm25
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Helpers — local file IO + filter builder.
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def _build_where(
|
|
crop: str | None,
|
|
brand: str | None,
|
|
vendor: str | None,
|
|
source: str | None,
|
|
source_key: str | None,
|
|
) -> dict | None:
|
|
"""Translate filter args into a Chroma `where` clause."""
|
|
conds: list[dict] = []
|
|
if crop:
|
|
conds.append({"crop": crop.lower()})
|
|
if brand:
|
|
conds.append({"brand": brand.upper()})
|
|
if vendor:
|
|
conds.append({"vendor": vendor})
|
|
if source:
|
|
conds.append({"source": source})
|
|
if source_key:
|
|
conds.append({"source_key": source_key})
|
|
if not conds:
|
|
return None
|
|
if len(conds) == 1:
|
|
return conds[0]
|
|
return {"$and": conds}
|
|
|
|
|
|
def _read_sidecar(source: str, source_key: str) -> dict | None:
|
|
"""Read a variety's sidecar JSON off disk."""
|
|
path = CORPUS / source / f"{source_key}.json"
|
|
if not path.exists():
|
|
return None
|
|
try:
|
|
return json.loads(path.read_text(encoding="utf-8"))
|
|
except (OSError, json.JSONDecodeError) as exc:
|
|
log.warning("sidecar read failed for %s/%s: %s", source, source_key, exc)
|
|
return None
|
|
|
|
|
|
def _read_markdown(source: str, source_key: str) -> str | None:
|
|
path = CORPUS / source / f"{source_key}.md"
|
|
if not path.exists():
|
|
return None
|
|
try:
|
|
return path.read_text(encoding="utf-8")
|
|
except OSError:
|
|
return None
|
|
|
|
|
|
def _format_hit(doc: str, meta: dict, distance: float | None = None) -> str:
|
|
"""Render one retrieval hit as a fenced markdown block with full
|
|
provenance attached. ``doc`` is the chunk text; ``meta`` is the
|
|
chunk's metadata dict."""
|
|
src_url = meta.get("source_url") or ""
|
|
src_key = meta.get("source_key") or ""
|
|
src = meta.get("source") or ""
|
|
vendor = meta.get("vendor") or ""
|
|
brand = meta.get("brand") or ""
|
|
crop = meta.get("crop") or ""
|
|
name = meta.get("product_name") or src_key
|
|
|
|
header = (
|
|
f"### {name} \n"
|
|
f"`{src}::{src_key}` — {vendor} / {brand} / {crop} \n"
|
|
f"<{src_url}>"
|
|
)
|
|
if distance is not None:
|
|
header += f" \n_(distance={distance:.4f})_"
|
|
return f"{header}\n\n{doc.strip()}\n"
|
|
|
|
|
|
def _rrf_fuse(rankings: list[list[str]], k: int = RRF_K) -> list[str]:
|
|
"""Reciprocal Rank Fusion — merge multiple ranked id lists into one.
|
|
|
|
Score(id) = sum over each ranking R of 1 / (k + rank_R(id)).
|
|
Robust to score-scale differences between dense (cosine) and lexical
|
|
(BM25). k=60 is the literature default; not particularly sensitive.
|
|
"""
|
|
scores: dict[str, float] = {}
|
|
for ranking in rankings:
|
|
for rank, doc_id in enumerate(ranking):
|
|
scores[doc_id] = scores.get(doc_id, 0.0) + 1.0 / (k + rank + 1)
|
|
return sorted(scores, key=lambda d: scores[d], reverse=True)
|
|
|
|
|
|
def _structured_ratings_block(sidecar: dict) -> str:
|
|
"""Render the sidecar's grouped characteristics + identity as a
|
|
fact-checkable block, with the source URL pinned at top.
|
|
|
|
This is the response body for ``get_page`` and the embedded
|
|
"canonical data" block agents should trust over any free-text
|
|
paraphrase."""
|
|
lines: list[str] = []
|
|
name = sidecar.get("product_name") or sidecar.get("source_key", "")
|
|
lines.append(f"# {name}")
|
|
lines.append("")
|
|
|
|
facets: list[str] = []
|
|
if sidecar.get("vendor"):
|
|
facets.append(f"**Vendor:** {sidecar['vendor']}")
|
|
if sidecar.get("brand"):
|
|
facets.append(f"**Brand:** {str(sidecar['brand']).title()}")
|
|
if sidecar.get("crop"):
|
|
facets.append(f"**Crop:** {str(sidecar['crop']).capitalize()}")
|
|
if sidecar.get("relative_maturity") and sidecar.get("crop") == "corn":
|
|
facets.append(f"**Relative maturity:** {sidecar['relative_maturity']}")
|
|
if sidecar.get("maturity_group") and sidecar.get("crop") == "soybeans":
|
|
facets.append(f"**Maturity group:** {sidecar['maturity_group']}")
|
|
if sidecar.get("relative_maturity") and sidecar.get("crop") == "wheat":
|
|
facets.append(f"**Maturity:** {sidecar['relative_maturity']}")
|
|
if sidecar.get("wheat_class"):
|
|
facets.append(f"**Wheat class:** {sidecar['wheat_class']}")
|
|
if sidecar.get("release_year"):
|
|
facets.append(f"**Released:** {sidecar['release_year']}")
|
|
if sidecar.get("trait_stack"):
|
|
facets.append(f"**Trait stack:** {', '.join(sidecar['trait_stack'])}")
|
|
if facets:
|
|
lines.append(" · ".join(facets))
|
|
lines.append("")
|
|
|
|
urls = sidecar.get("source_urls") or []
|
|
if urls:
|
|
lines.append(f"**Source:** {urls[0]}")
|
|
lines.append("")
|
|
|
|
scale = sidecar.get("_scale_direction") or "scale direction not declared by source"
|
|
lines.append(f"**Rating scale (as published):** {scale}")
|
|
lines.append("")
|
|
|
|
# Canonical ratings — one table per group, values verbatim.
|
|
for g in sidecar.get("characteristics_groups") or []:
|
|
label = g.get("label") or "Characteristics"
|
|
items = g.get("items") or []
|
|
if not items:
|
|
continue
|
|
lines.append(f"## {label.title()}")
|
|
lines.append("")
|
|
lines.append("| Characteristic | Value |")
|
|
lines.append("|---|---|")
|
|
for it in items:
|
|
ch = (it.get("characteristic") or "").strip()
|
|
v = (it.get("value") or "").strip()
|
|
lines.append(f"| {ch} | {v} |")
|
|
lines.append("")
|
|
|
|
if sidecar.get("positioning_statement"):
|
|
lines.append("## Vendor positioning")
|
|
lines.append("")
|
|
lines.append(sidecar["positioning_statement"].strip())
|
|
lines.append("")
|
|
|
|
if sidecar.get("strengths"):
|
|
lines.append("## Strengths / management notes (vendor copy)")
|
|
lines.append("")
|
|
for s in sidecar["strengths"]:
|
|
lines.append(f"- {s}")
|
|
lines.append("")
|
|
|
|
rec = sidecar.get("regional_recommendations") or []
|
|
if rec:
|
|
names = sorted({
|
|
(r.get("product_list_name") or "").strip()
|
|
for r in rec
|
|
if (r.get("product_list_name") or "").strip()
|
|
})
|
|
if names:
|
|
lines.append("## Listed in vendor regional seed guides")
|
|
lines.append("")
|
|
for n in names:
|
|
lines.append(f"- {n}")
|
|
lines.append("")
|
|
|
|
return "\n".join(lines).rstrip() + "\n"
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Curated lessons — docs_mcp/lessons.md is the canonical source.
|
|
# ---------------------------------------------------------------------------
|
|
LESSONS_FILE = Path(__file__).resolve().parent / "lessons.md"
|
|
_lessons_cache: list[tuple[str, str]] | None = None
|
|
|
|
|
|
def _load_lessons() -> list[tuple[str, str]]:
|
|
"""Parse lessons.md into ``[(slug, body), ...]`` sections.
|
|
|
|
Sections are delimited by ``## <slug>`` headings. The slug is the
|
|
`<slug>` token (whitespace stripped); the body is everything between
|
|
that heading and the next ``## `` heading (or EOF).
|
|
"""
|
|
global _lessons_cache
|
|
if _lessons_cache is not None:
|
|
return _lessons_cache
|
|
out: list[tuple[str, str]] = []
|
|
if not LESSONS_FILE.exists():
|
|
_lessons_cache = out
|
|
return out
|
|
text = LESSONS_FILE.read_text(encoding="utf-8")
|
|
parts = re.split(r"(?m)^## (.+)$", text)
|
|
# parts = [preamble, slug1, body1, slug2, body2, ...]
|
|
for i in range(1, len(parts), 2):
|
|
slug = parts[i].strip()
|
|
body = parts[i + 1] if i + 1 < len(parts) else ""
|
|
# Drop trailing horizontal rule that separates sections.
|
|
body = re.sub(r"\n---\s*$", "", body).strip()
|
|
out.append((slug, body))
|
|
_lessons_cache = out
|
|
return out
|
|
|
|
|
|
# ===========================================================================
|
|
# Tools
|
|
# ===========================================================================
|
|
|
|
@mcp.tool()
|
|
def search_docs(
|
|
query: Annotated[str, Field(description=(
|
|
"Natural-language query about row-crop seed varieties. "
|
|
"Mention crop (corn/soybeans/wheat), maturity (RM days / "
|
|
"MG number / wheat class), traits, disease tolerances, "
|
|
"or soil/region constraints — they all carry retrieval signal."
|
|
))],
|
|
crop: Annotated[
|
|
str | None,
|
|
Field(description="OPTIONAL filter: corn, soybeans, or wheat."),
|
|
] = None,
|
|
brand: Annotated[
|
|
str | None,
|
|
Field(description=(
|
|
"OPTIONAL brand filter — DEKALB, ASGROW, WESTBRED, "
|
|
"GoldenHarvest, NK, AgriPro, Becks. Case-insensitive."
|
|
)),
|
|
] = None,
|
|
vendor: Annotated[
|
|
str | None,
|
|
Field(description="OPTIONAL vendor filter — Bayer, Syngenta, Beck's."),
|
|
] = None,
|
|
source: Annotated[
|
|
str | None,
|
|
Field(description=(
|
|
"OPTIONAL source filter — e.g. 'bayer_seeds'. Use "
|
|
"list_versions() to see what's indexed."
|
|
)),
|
|
] = None,
|
|
k: Annotated[int, Field(description="Number of results to return.", ge=1, le=50)] = 10,
|
|
) -> str:
|
|
"""Search the seed-variety corpus for hybrids/varieties matching a query.
|
|
|
|
Returns the top-k variety chunks with their full source URLs, ratings,
|
|
maturity, traits, and regional listings. Optional filters narrow to one
|
|
crop, brand, vendor, or scraper source. Use **list_versions()** first
|
|
to discover valid facet values. Use **lookup_variety()** on any
|
|
candidate the user is serious about — that returns the canonical
|
|
sidecar so you can verify exact rating values without trusting the
|
|
free-text chunk.
|
|
|
|
Call this tool whenever an ag professional or farmer asks anything
|
|
about choosing a seed variety, comparing hybrids, or finding seed
|
|
matched to a maturity / soil / region / disease constraint.
|
|
|
|
NEVER answer seed-selection questions from prior knowledge alone —
|
|
seed catalogs change yearly and brand-specific. Always search first.
|
|
"""
|
|
with TimedCall("search_docs", {
|
|
"query": query, "crop": crop, "brand": brand,
|
|
"vendor": vendor, "source": source, "k": k,
|
|
}) as _call:
|
|
where = _build_where(crop, brand, vendor, source, None)
|
|
pool_size = max(k * 3, RERANK_POOL)
|
|
|
|
# Exact-code pre-filter. Variety codes ("DKC62-08RIB", "AG29XF4")
|
|
# have NO semantic neighbors — dense retrieval misses them and
|
|
# RRF can let off-topic noise float to top-1. If the query
|
|
# contains a token that exactly matches a variety's identifier,
|
|
# pin those varieties to the top of results.
|
|
pinned_ids: list[str] = []
|
|
for src, sk in _exact_code_matches(query):
|
|
pinned_ids.append(f"{src}::{sk}::0")
|
|
|
|
# Dense retrieval via Chroma.
|
|
try:
|
|
col = _collection()
|
|
except Exception as exc: # noqa: BLE001
|
|
_call.set(error_dense=str(exc), hits_returned=0)
|
|
return (
|
|
"_(retrieval unavailable — Chroma collection not found. "
|
|
"Has the indexer run? `python -m rag.index --rebuild`.)_"
|
|
)
|
|
|
|
try:
|
|
dense = col.query(
|
|
query_texts=[query],
|
|
n_results=pool_size,
|
|
where=where,
|
|
)
|
|
except Exception as exc: # noqa: BLE001
|
|
_call.set(error_dense=str(exc), hits_returned=0)
|
|
return f"_(retrieval failed: {exc})_"
|
|
|
|
dense_ids: list[str] = (dense.get("ids") or [[]])[0]
|
|
dense_docs: list[str] = (dense.get("documents") or [[]])[0]
|
|
dense_metas: list[dict] = (dense.get("metadatas") or [[]])[0]
|
|
dense_dists: list[float] = (dense.get("distances") or [[]])[0]
|
|
|
|
id_to_doc = dict(zip(dense_ids, dense_docs))
|
|
id_to_meta = dict(zip(dense_ids, dense_metas))
|
|
id_to_dist = dict(zip(dense_ids, dense_dists))
|
|
|
|
# Hybrid: optionally fuse with BM25. Both are pulled at pool_size,
|
|
# fused via RRF, top-k returned.
|
|
used_hybrid = False
|
|
if HYBRID_SEARCH:
|
|
bm25 = _bm25_index()
|
|
if bm25 is not None:
|
|
bm25_hits = bm25.query(query, n=pool_size, where=where)
|
|
bm25_ids = [h[0] for h in bm25_hits]
|
|
if bm25_ids:
|
|
fused = _rrf_fuse([dense_ids, bm25_ids])
|
|
fuzzy_ids = fused
|
|
used_hybrid = True
|
|
else:
|
|
fuzzy_ids = dense_ids
|
|
else:
|
|
fuzzy_ids = dense_ids
|
|
else:
|
|
fuzzy_ids = dense_ids
|
|
|
|
# Pin exact-code matches at top, then fill remainder from fuzzy
|
|
# retrieval (deduped). Pinned matches are deterministic and
|
|
# high-confidence; they should never lose to a fuzzy match.
|
|
final_ids: list[str] = []
|
|
seen: set[str] = set()
|
|
for cid in pinned_ids + fuzzy_ids:
|
|
if cid in seen:
|
|
continue
|
|
seen.add(cid)
|
|
final_ids.append(cid)
|
|
if len(final_ids) >= k:
|
|
break
|
|
|
|
# For ids returned by BM25 but not by Chroma, we need their docs/
|
|
# metadata too. Re-fetch by id.
|
|
missing = [i for i in final_ids if i not in id_to_doc]
|
|
if missing:
|
|
try:
|
|
extra = col.get(ids=missing, include=["documents", "metadatas"])
|
|
for cid, doc, meta in zip(
|
|
extra.get("ids") or [],
|
|
extra.get("documents") or [],
|
|
extra.get("metadatas") or [],
|
|
):
|
|
id_to_doc[cid] = doc
|
|
id_to_meta[cid] = meta
|
|
except Exception as exc: # noqa: BLE001
|
|
log.warning("get-by-id for BM25-only hits failed: %s", exc)
|
|
|
|
_call.set(
|
|
hits_returned=len(final_ids),
|
|
hybrid=used_hybrid,
|
|
pool_size=pool_size,
|
|
)
|
|
|
|
if not final_ids:
|
|
return (
|
|
"_(no varieties matched. Try broadening the query or "
|
|
"removing filters — call list_versions() to see what's "
|
|
"indexed.)_"
|
|
)
|
|
|
|
blocks: list[str] = []
|
|
for cid in final_ids:
|
|
doc = id_to_doc.get(cid, "")
|
|
meta = id_to_meta.get(cid, {})
|
|
dist = id_to_dist.get(cid) if not used_hybrid else None
|
|
blocks.append(_format_hit(doc, meta, dist))
|
|
|
|
header = (
|
|
f"# Search results — {len(final_ids)} variety chunk"
|
|
f"{'s' if len(final_ids) != 1 else ''}"
|
|
f"{' (dense + BM25 hybrid)' if used_hybrid else ' (dense only)'}\n"
|
|
f"_Use `lookup_variety(source_key=...)` on any candidate "
|
|
f"to fact-check ratings from the canonical sidecar._\n\n---\n\n"
|
|
)
|
|
return header + "\n---\n\n".join(blocks)
|
|
|
|
|
|
@mcp.tool()
|
|
def get_page(
|
|
source: Annotated[str, Field(description=(
|
|
"Scraper source id — e.g. 'bayer_seeds'. Same as the `source` "
|
|
"field in search_docs results."
|
|
))],
|
|
source_key: Annotated[str, Field(description=(
|
|
"Per-variety stable key — e.g. 'dekalb-dkc62-08rib'. Same as "
|
|
"the `source_key` field in search_docs results."
|
|
))],
|
|
) -> str:
|
|
"""Return the full canonical record for one variety.
|
|
|
|
Emits a structured ratings header (identity, all characteristic
|
|
groups, vendor positioning, regional listings) sourced from the
|
|
variety's sidecar JSON, followed by the raw markdown body the
|
|
chunker indexed.
|
|
|
|
Use this when the user is comparing varieties closely or wants to
|
|
see every published rating value, not just the ones that matched a
|
|
query. The structured header is the canonical fact-check surface;
|
|
use it to validate any specific rating value before quoting it.
|
|
"""
|
|
with TimedCall("get_page", {"source": source, "source_key": source_key}) as _call:
|
|
sidecar = _read_sidecar(source, source_key)
|
|
if sidecar is None:
|
|
_call.set(found=False)
|
|
return (
|
|
f"_(variety not found: source='{source}' "
|
|
f"source_key='{source_key}'. Use list_versions() to see "
|
|
f"available sources or search_docs() to find candidates.)_"
|
|
)
|
|
body = _read_markdown(source, source_key) or ""
|
|
structured = _structured_ratings_block(sidecar)
|
|
_call.set(found=True, body_chars=len(body), groups=len(sidecar.get("characteristics_groups") or []))
|
|
sep = "\n\n---\n\n## Indexed body (chunk source text)\n\n"
|
|
return structured + sep + body
|
|
|
|
|
|
@mcp.tool()
|
|
def list_versions() -> str:
|
|
"""List the available scraper sources and the crop/brand/vendor
|
|
facets across all indexed varieties.
|
|
|
|
Use this BEFORE search_docs() the first time you query, to discover
|
|
which scraper sources, brands, vendors, and crops are actually in
|
|
the corpus right now. The agent should not assume — the corpus
|
|
grows as more vendors are scraped.
|
|
"""
|
|
with TimedCall("list_versions", {}) as _call:
|
|
facets: dict[str, dict[str, int]] = {
|
|
"source": {}, "vendor": {}, "brand": {}, "crop": {},
|
|
}
|
|
n_varieties = 0
|
|
if CORPUS.exists():
|
|
for source_dir in sorted(CORPUS.iterdir()):
|
|
if not source_dir.is_dir() or source_dir.name.startswith("."):
|
|
continue
|
|
for sc in source_dir.glob("*.json"):
|
|
try:
|
|
d = json.loads(sc.read_text(encoding="utf-8"))
|
|
except (OSError, json.JSONDecodeError):
|
|
continue
|
|
n_varieties += 1
|
|
for k in facets:
|
|
v = d.get(k)
|
|
if v:
|
|
facets[k][v] = facets[k].get(v, 0) + 1
|
|
|
|
if n_varieties == 0:
|
|
return (
|
|
"_(no varieties indexed yet — run scrapers + indexer "
|
|
"before calling search_docs)_"
|
|
)
|
|
|
|
_call.set(
|
|
varieties=n_varieties,
|
|
sources=len(facets["source"]),
|
|
vendors=len(facets["vendor"]),
|
|
brands=len(facets["brand"]),
|
|
crops=len(facets["crop"]),
|
|
)
|
|
|
|
lines = [f"# Corpus facets ({n_varieties} varieties indexed)", ""]
|
|
for label, counts in facets.items():
|
|
if not counts:
|
|
continue
|
|
lines.append(f"## {label}")
|
|
lines.append("")
|
|
for k, n in sorted(counts.items(), key=lambda kv: -kv[1]):
|
|
lines.append(f"- `{k}` — {n} varieties")
|
|
lines.append("")
|
|
return "\n".join(lines).rstrip()
|
|
|
|
|
|
@mcp.tool()
|
|
def lookup_variety(
|
|
source_key: Annotated[str, Field(description=(
|
|
"Per-variety stable key — e.g. 'dekalb-dkc62-08rib'."
|
|
))],
|
|
source: Annotated[
|
|
str | None,
|
|
Field(description=(
|
|
"OPTIONAL scraper source ('bayer_seeds' etc). If omitted, "
|
|
"scans all sources for the source_key."
|
|
)),
|
|
] = None,
|
|
) -> str:
|
|
"""Return the canonical sidecar JSON for one variety, verbatim.
|
|
|
|
USE THIS to fact-check any rating value before quoting it to the
|
|
farmer. The output is the exact data the scraper captured from the
|
|
vendor's published catalog — no paraphrasing, no inference.
|
|
|
|
Call this whenever the user (or you) needs an exact value for a
|
|
specific rating, trait, or maturity — not just a semantic match
|
|
from search_docs.
|
|
"""
|
|
with TimedCall("lookup_variety", {"source_key": source_key, "source": source}) as _call:
|
|
if source:
|
|
sidecar = _read_sidecar(source, source_key)
|
|
if sidecar is not None:
|
|
_call.set(found=True, source=source)
|
|
return "```json\n" + json.dumps(sidecar, indent=2, ensure_ascii=False) + "\n```"
|
|
_call.set(found=False, source=source)
|
|
return f"_(variety '{source_key}' not found under source '{source}')_"
|
|
|
|
# No source given — scan all source dirs for a match.
|
|
if not CORPUS.exists():
|
|
_call.set(found=False)
|
|
return "_(corpus is empty — no scrapers have run yet)_"
|
|
matches: list[tuple[str, dict]] = []
|
|
for source_dir in sorted(CORPUS.iterdir()):
|
|
if not source_dir.is_dir() or source_dir.name.startswith("."):
|
|
continue
|
|
sidecar = _read_sidecar(source_dir.name, source_key)
|
|
if sidecar is not None:
|
|
matches.append((source_dir.name, sidecar))
|
|
|
|
if not matches:
|
|
_call.set(found=False)
|
|
return (
|
|
f"_(no variety with source_key '{source_key}' found in any "
|
|
f"source. Use search_docs() to find the right key.)_"
|
|
)
|
|
|
|
_call.set(found=True, matches=len(matches))
|
|
if len(matches) == 1:
|
|
src_name, sidecar = matches[0]
|
|
return (
|
|
f"_(matched in source `{src_name}`)_\n\n"
|
|
"```json\n"
|
|
+ json.dumps(sidecar, indent=2, ensure_ascii=False)
|
|
+ "\n```"
|
|
)
|
|
|
|
# Multi-match: serialize each labeled.
|
|
out: list[str] = [f"_(matched {len(matches)} sources)_\n"]
|
|
for src_name, sidecar in matches:
|
|
out.append(f"### source: `{src_name}`")
|
|
out.append("```json")
|
|
out.append(json.dumps(sidecar, indent=2, ensure_ascii=False))
|
|
out.append("```")
|
|
return "\n".join(out)
|
|
|
|
|
|
@mcp.tool()
|
|
def crop_seed_api_lessons(
|
|
topic: Annotated[
|
|
str | None,
|
|
Field(description=(
|
|
"OPTIONAL topic — match against lesson section slugs or body "
|
|
"(substring, case-insensitive). Known slugs: pioneer, "
|
|
"rating-scales, maturity-semantics, trait-glossary, "
|
|
"scn-resistance, regional-listings, sources-not-yet-indexed, "
|
|
"checking-your-work. Omit for the full curated index."
|
|
)),
|
|
] = None,
|
|
) -> str:
|
|
"""Curated knowledge that does NOT live in the scraped corpus —
|
|
vendor scale-direction notes, trait glossary, maturity semantics,
|
|
SCN resistance interpretation, the **Pioneer fallback policy**,
|
|
and rules for fact-checking your work.
|
|
|
|
Call this tool when:
|
|
|
|
* The user asks about **Pioneer** or any P-series hybrid — Pioneer
|
|
is intentionally NOT scraped (ToS bans it); the lesson tells you
|
|
what to say instead.
|
|
* You need to compare ratings across vendors — different vendors
|
|
publish on different scale directions.
|
|
* You're parsing a trait code or disease abbreviation you don't
|
|
recognize.
|
|
* Before quoting a specific rating value to a farmer — the
|
|
``checking-your-work`` lesson reminds you to call
|
|
``lookup_variety`` to confirm.
|
|
|
|
This tool is **the only source of opinionated content** in the
|
|
server. Everything else returned by search_docs / get_page /
|
|
lookup_variety is verbatim from vendor catalogs.
|
|
"""
|
|
with TimedCall("crop_seed_api_lessons", {"topic": topic}) as _call:
|
|
sections = _load_lessons()
|
|
if not sections:
|
|
_call.set(sections_returned=0)
|
|
return "_(no lessons file present — docs_mcp/lessons.md missing)_"
|
|
|
|
if not topic:
|
|
_call.set(sections_returned=len(sections))
|
|
return "\n\n---\n\n".join(
|
|
f"## {slug}\n\n{body}" for slug, body in sections
|
|
)
|
|
|
|
needle = topic.strip().lower()
|
|
# Prefer slug matches (most specific). Fall back to body match
|
|
# only when no slug matches — keeps a query like "rating" from
|
|
# returning every section that happens to mention the word.
|
|
slug_matches: list[tuple[str, str]] = []
|
|
body_matches: list[tuple[str, str]] = []
|
|
for slug, body in sections:
|
|
if needle in slug.lower():
|
|
slug_matches.append((slug, body))
|
|
elif needle in body.lower():
|
|
body_matches.append((slug, body))
|
|
matched = slug_matches if slug_matches else body_matches
|
|
|
|
_call.set(sections_returned=len(matched), topic=topic)
|
|
if not matched:
|
|
slugs = ", ".join(s for s, _ in sections)
|
|
return (
|
|
f"_(no lesson section matched topic '{topic}'. "
|
|
f"Available slugs: {slugs}.)_"
|
|
)
|
|
return "\n\n---\n\n".join(
|
|
f"## {slug}\n\n{body}" for slug, body in matched
|
|
)
|
|
|
|
|
|
# ===========================================================================
|
|
# Entry point
|
|
# ===========================================================================
|
|
|
|
def main() -> None:
|
|
import argparse
|
|
p = argparse.ArgumentParser(description=f"{PRODUCT_NAME} docs MCP server")
|
|
p.add_argument("--transport", choices=["stdio", "streamable-http", "sse"],
|
|
default=os.environ.get("MCP_TRANSPORT", "stdio"))
|
|
p.add_argument("--host", default=os.environ.get("MCP_HOST", "0.0.0.0"))
|
|
p.add_argument("--port", type=int, default=int(os.environ.get("MCP_PORT", "8000")))
|
|
args = p.parse_args()
|
|
|
|
if args.transport == "stdio":
|
|
mcp.run()
|
|
else:
|
|
mcp.settings.host = args.host
|
|
mcp.settings.port = args.port
|
|
if os.environ.get("MCP_DISABLE_DNS_REBINDING_PROTECTION") in {"1", "true", "yes"}:
|
|
mcp.settings.transport_security.enable_dns_rebinding_protection = False
|
|
mcp.run(transport=args.transport)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|