"""seed-mcp — MCP server over public US row-crop seed catalogs. Tools (all read-only): search_docs natural-language retrieval over the corpus get_page the full markdown body of one variety + sidecar list_versions facet discovery (crops / brands / vendors / sources) lookup_variety canonical sidecar JSON by source_key — fact-check anything you surface from search_docs against this The contract with the calling agent is **never fabricate**. Every chunk we return is verbatim from the source's published catalog (the chunker rebuilds chunks deterministically from the sidecar JSON). Every response carries the variety's source URL so the agent can cite. The ``lookup_variety`` tool exists specifically so the agent can validate specific rating values without having to trust paraphrased retrieval text. """ from __future__ import annotations import json import logging import os import re from pathlib import Path from typing import Annotated, Any from mcp.server.fastmcp import FastMCP from pydantic import Field from .usage import TimedCall log = logging.getLogger(__name__) # --------------------------------------------------------------------------- # Product-specific configuration. # --------------------------------------------------------------------------- PRODUCT_NAME = os.environ.get("PRODUCT_NAME", "crop_seed") PRODUCT_DOCS_URL = os.environ.get("PRODUCT_DOCS_URL", "https://git.jpaul.io/justin/seed-mcp") COLLECTION = f"{PRODUCT_NAME}_docs" # Paths inside the deployed container (and matching layout locally for dev). ROOT = Path(__file__).resolve().parent.parent CORPUS = ROOT / "corpus" CHROMA_DIR = ROOT / "chroma" BM25_DB = Path(os.environ.get("BM25_DB", str(ROOT / "bm25" / f"{PRODUCT_NAME}_docs.db"))) # --------------------------------------------------------------------------- # Feature flags. # --------------------------------------------------------------------------- RERANK_URL = os.environ.get("RERANK_URL", "").rstrip("/") or None RERANK_POOL = int(os.environ.get("RERANK_POOL", "50")) RERANK_TIMEOUT = float(os.environ.get("RERANK_TIMEOUT", "30")) HYBRID_SEARCH = os.environ.get("HYBRID_SEARCH", "true").lower() in ("true", "1", "yes", "on") RRF_K = int(os.environ.get("RRF_K", "60")) # --------------------------------------------------------------------------- # FastMCP setup. # --------------------------------------------------------------------------- mcp = FastMCP(f"{PRODUCT_NAME}-docs", stateless_http=True) # --------------------------------------------------------------------------- # Lazy singletons. Instantiate on first use so the server can start even # when (e.g.) Ollama is briefly unreachable — degraded modes fall back # gracefully rather than refusing to boot. # --------------------------------------------------------------------------- _chroma_client = None _chroma_collection = None _bm25 = None _code_index: dict[str, list[tuple[str, str]]] | None = None def _build_code_index() -> dict[str, list[tuple[str, str]]]: """Walk sidecars once and index varieties by every lookup key a farmer or LLM might paste: ``source_key``, ``hybrid_prefix``, ``product_name`` (normalized), each token from ``hybrid_prefix``/``product_name`` longer than three chars. Returns a dict mapping the normalized key → list of ``(source, source_key)`` tuples. Multiple-source matches are kept so we can warn the agent about ambiguity. """ idx: dict[str, list[tuple[str, str]]] = {} def _add(key: str, value: tuple[str, str]) -> None: key = (key or "").lower().strip() if not key or len(key) < 4: return idx.setdefault(key, []) if value not in idx[key]: idx[key].append(value) if not CORPUS.exists(): return idx for source_dir in CORPUS.iterdir(): if not source_dir.is_dir() or source_dir.name.startswith("."): continue for sc in source_dir.glob("*.json"): try: d = json.loads(sc.read_text(encoding="utf-8")) except (OSError, json.JSONDecodeError): continue source = d.get("source") or source_dir.name sk = d.get("source_key") or sc.stem value = (source, sk) _add(sk, value) _add(d.get("hybrid_prefix") or "", value) _add(d.get("product_name") or "", value) # Token-split product_name and hybrid_prefix so "DKC62-08RIB" # in a query matches "DKC62-08RIB BRAND BLEND" via the # "DKC62-08RIB" token as well as the full string. for source_field in (d.get("product_name"), d.get("hybrid_prefix")): if not source_field: continue for tok in re.split(r"[\s()/,]+", source_field): tok = tok.strip() # Only retain tokens that LOOK like codes — at least # one digit, mostly alphanumeric/dash. Skips "BRAND" # / "BLEND" / etc. if re.match(r"^[A-Za-z]*\d[\w\-]*$", tok): _add(tok, value) return idx def _code_lookup() -> dict[str, list[tuple[str, str]]]: global _code_index if _code_index is None: _code_index = _build_code_index() log.info("code-index: %d lookup keys built", len(_code_index)) return _code_index # Tokens that LOOK like a variety code — at least one digit, otherwise # alphanumeric / dash. Catches "DKC62-08RIB", "AG29XF4", "WB1376CLP", # "VT2PRIB"; skips ordinary words like "ratings". _CODE_TOKEN_RE = re.compile(r"\b([A-Za-z][A-Za-z0-9\-]{2,})\b") def _exact_code_matches(query: str) -> list[tuple[str, str]]: """Find varieties whose source_key / product_name / token-split components contain a query token. Used as a high-confidence pin before fuzzy retrieval.""" idx = _code_lookup() if not idx: return [] out: list[tuple[str, str]] = [] seen: set[tuple[str, str]] = set() for m in _CODE_TOKEN_RE.finditer(query or ""): tok = m.group(1).lower() if len(tok) < 4 or not any(c.isdigit() for c in tok): continue for v in idx.get(tok, []): if v not in seen: seen.add(v) out.append(v) return out def _collection(): """Return the Chroma collection, opening it lazily.""" global _chroma_client, _chroma_collection if _chroma_collection is not None: return _chroma_collection import chromadb from chromadb.config import Settings from rag.embeddings import embedding_function _chroma_client = chromadb.PersistentClient( path=str(CHROMA_DIR), settings=Settings(anonymized_telemetry=False), ) _chroma_collection = _chroma_client.get_collection( COLLECTION, embedding_function=embedding_function() ) return _chroma_collection def _bm25_index(): """Return the BM25 index, or None if it doesn't exist on disk.""" global _bm25 if _bm25 is not None: return _bm25 from rag.bm25 import BM25Index idx = BM25Index(BM25_DB) if not idx.exists(): return None _bm25 = idx return _bm25 # --------------------------------------------------------------------------- # Helpers — local file IO + filter builder. # --------------------------------------------------------------------------- def _build_where( crop: str | None, brand: str | None, vendor: str | None, source: str | None, source_key: str | None, ) -> dict | None: """Translate filter args into a Chroma `where` clause.""" conds: list[dict] = [] if crop: conds.append({"crop": crop.lower()}) if brand: conds.append({"brand": brand.upper()}) if vendor: conds.append({"vendor": vendor}) if source: conds.append({"source": source}) if source_key: conds.append({"source_key": source_key}) if not conds: return None if len(conds) == 1: return conds[0] return {"$and": conds} def _read_sidecar(source: str, source_key: str) -> dict | None: """Read a variety's sidecar JSON off disk.""" path = CORPUS / source / f"{source_key}.json" if not path.exists(): return None try: return json.loads(path.read_text(encoding="utf-8")) except (OSError, json.JSONDecodeError) as exc: log.warning("sidecar read failed for %s/%s: %s", source, source_key, exc) return None def _read_markdown(source: str, source_key: str) -> str | None: path = CORPUS / source / f"{source_key}.md" if not path.exists(): return None try: return path.read_text(encoding="utf-8") except OSError: return None def _format_hit(doc: str, meta: dict, distance: float | None = None) -> str: """Render one retrieval hit as a fenced markdown block with full provenance attached. ``doc`` is the chunk text; ``meta`` is the chunk's metadata dict.""" src_url = meta.get("source_url") or "" src_key = meta.get("source_key") or "" src = meta.get("source") or "" vendor = meta.get("vendor") or "" brand = meta.get("brand") or "" crop = meta.get("crop") or "" name = meta.get("product_name") or src_key header = ( f"### {name} \n" f"`{src}::{src_key}` — {vendor} / {brand} / {crop} \n" f"<{src_url}>" ) if distance is not None: header += f" \n_(distance={distance:.4f})_" return f"{header}\n\n{doc.strip()}\n" def _rrf_fuse(rankings: list[list[str]], k: int = RRF_K) -> list[str]: """Reciprocal Rank Fusion — merge multiple ranked id lists into one. Score(id) = sum over each ranking R of 1 / (k + rank_R(id)). Robust to score-scale differences between dense (cosine) and lexical (BM25). k=60 is the literature default; not particularly sensitive. """ scores: dict[str, float] = {} for ranking in rankings: for rank, doc_id in enumerate(ranking): scores[doc_id] = scores.get(doc_id, 0.0) + 1.0 / (k + rank + 1) return sorted(scores, key=lambda d: scores[d], reverse=True) def _structured_ratings_block(sidecar: dict) -> str: """Render the sidecar's grouped characteristics + identity as a fact-checkable block, with the source URL pinned at top. This is the response body for ``get_page`` and the embedded "canonical data" block agents should trust over any free-text paraphrase.""" lines: list[str] = [] name = sidecar.get("product_name") or sidecar.get("source_key", "") lines.append(f"# {name}") lines.append("") facets: list[str] = [] if sidecar.get("vendor"): facets.append(f"**Vendor:** {sidecar['vendor']}") if sidecar.get("brand"): facets.append(f"**Brand:** {str(sidecar['brand']).title()}") if sidecar.get("crop"): facets.append(f"**Crop:** {str(sidecar['crop']).capitalize()}") if sidecar.get("relative_maturity") and sidecar.get("crop") == "corn": facets.append(f"**Relative maturity:** {sidecar['relative_maturity']}") if sidecar.get("maturity_group") and sidecar.get("crop") == "soybeans": facets.append(f"**Maturity group:** {sidecar['maturity_group']}") if sidecar.get("relative_maturity") and sidecar.get("crop") == "wheat": facets.append(f"**Maturity:** {sidecar['relative_maturity']}") if sidecar.get("wheat_class"): facets.append(f"**Wheat class:** {sidecar['wheat_class']}") if sidecar.get("release_year"): facets.append(f"**Released:** {sidecar['release_year']}") if sidecar.get("trait_stack"): facets.append(f"**Trait stack:** {', '.join(sidecar['trait_stack'])}") if facets: lines.append(" · ".join(facets)) lines.append("") urls = sidecar.get("source_urls") or [] if urls: lines.append(f"**Source:** {urls[0]}") lines.append("") scale = sidecar.get("_scale_direction") or "scale direction not declared by source" lines.append(f"**Rating scale (as published):** {scale}") lines.append("") # Canonical ratings — one table per group, values verbatim. for g in sidecar.get("characteristics_groups") or []: label = g.get("label") or "Characteristics" items = g.get("items") or [] if not items: continue lines.append(f"## {label.title()}") lines.append("") lines.append("| Characteristic | Value |") lines.append("|---|---|") for it in items: ch = (it.get("characteristic") or "").strip() v = (it.get("value") or "").strip() lines.append(f"| {ch} | {v} |") lines.append("") if sidecar.get("positioning_statement"): lines.append("## Vendor positioning") lines.append("") lines.append(sidecar["positioning_statement"].strip()) lines.append("") if sidecar.get("strengths"): lines.append("## Strengths / management notes (vendor copy)") lines.append("") for s in sidecar["strengths"]: lines.append(f"- {s}") lines.append("") rec = sidecar.get("regional_recommendations") or [] if rec: names = sorted({ (r.get("product_list_name") or "").strip() for r in rec if (r.get("product_list_name") or "").strip() }) if names: lines.append("## Listed in vendor regional seed guides") lines.append("") for n in names: lines.append(f"- {n}") lines.append("") return "\n".join(lines).rstrip() + "\n" # =========================================================================== # Tools # =========================================================================== @mcp.tool() def search_docs( query: Annotated[str, Field(description=( "Natural-language query about row-crop seed varieties. " "Mention crop (corn/soybeans/wheat), maturity (RM days / " "MG number / wheat class), traits, disease tolerances, " "or soil/region constraints — they all carry retrieval signal." ))], crop: Annotated[ str | None, Field(description="OPTIONAL filter: corn, soybeans, or wheat."), ] = None, brand: Annotated[ str | None, Field(description=( "OPTIONAL brand filter — DEKALB, ASGROW, WESTBRED, " "GoldenHarvest, NK, AgriPro, Becks. Case-insensitive." )), ] = None, vendor: Annotated[ str | None, Field(description="OPTIONAL vendor filter — Bayer, Syngenta, Beck's."), ] = None, source: Annotated[ str | None, Field(description=( "OPTIONAL source filter — e.g. 'bayer_seeds'. Use " "list_versions() to see what's indexed." )), ] = None, k: Annotated[int, Field(description="Number of results to return.", ge=1, le=50)] = 10, ) -> str: """Search the seed-variety corpus for hybrids/varieties matching a query. Returns the top-k variety chunks with their full source URLs, ratings, maturity, traits, and regional listings. Optional filters narrow to one crop, brand, vendor, or scraper source. Use **list_versions()** first to discover valid facet values. Use **lookup_variety()** on any candidate the user is serious about — that returns the canonical sidecar so you can verify exact rating values without trusting the free-text chunk. Call this tool whenever an ag professional or farmer asks anything about choosing a seed variety, comparing hybrids, or finding seed matched to a maturity / soil / region / disease constraint. NEVER answer seed-selection questions from prior knowledge alone — seed catalogs change yearly and brand-specific. Always search first. """ with TimedCall("search_docs", { "query": query, "crop": crop, "brand": brand, "vendor": vendor, "source": source, "k": k, }) as _call: where = _build_where(crop, brand, vendor, source, None) pool_size = max(k * 3, RERANK_POOL) # Exact-code pre-filter. Variety codes ("DKC62-08RIB", "AG29XF4") # have NO semantic neighbors — dense retrieval misses them and # RRF can let off-topic noise float to top-1. If the query # contains a token that exactly matches a variety's identifier, # pin those varieties to the top of results. pinned_ids: list[str] = [] for src, sk in _exact_code_matches(query): pinned_ids.append(f"{src}::{sk}::0") # Dense retrieval via Chroma. try: col = _collection() except Exception as exc: # noqa: BLE001 _call.set(error_dense=str(exc), hits_returned=0) return ( "_(retrieval unavailable — Chroma collection not found. " "Has the indexer run? `python -m rag.index --rebuild`.)_" ) try: dense = col.query( query_texts=[query], n_results=pool_size, where=where, ) except Exception as exc: # noqa: BLE001 _call.set(error_dense=str(exc), hits_returned=0) return f"_(retrieval failed: {exc})_" dense_ids: list[str] = (dense.get("ids") or [[]])[0] dense_docs: list[str] = (dense.get("documents") or [[]])[0] dense_metas: list[dict] = (dense.get("metadatas") or [[]])[0] dense_dists: list[float] = (dense.get("distances") or [[]])[0] id_to_doc = dict(zip(dense_ids, dense_docs)) id_to_meta = dict(zip(dense_ids, dense_metas)) id_to_dist = dict(zip(dense_ids, dense_dists)) # Hybrid: optionally fuse with BM25. Both are pulled at pool_size, # fused via RRF, top-k returned. used_hybrid = False if HYBRID_SEARCH: bm25 = _bm25_index() if bm25 is not None: bm25_hits = bm25.query(query, n=pool_size, where=where) bm25_ids = [h[0] for h in bm25_hits] if bm25_ids: fused = _rrf_fuse([dense_ids, bm25_ids]) fuzzy_ids = fused used_hybrid = True else: fuzzy_ids = dense_ids else: fuzzy_ids = dense_ids else: fuzzy_ids = dense_ids # Pin exact-code matches at top, then fill remainder from fuzzy # retrieval (deduped). Pinned matches are deterministic and # high-confidence; they should never lose to a fuzzy match. final_ids: list[str] = [] seen: set[str] = set() for cid in pinned_ids + fuzzy_ids: if cid in seen: continue seen.add(cid) final_ids.append(cid) if len(final_ids) >= k: break # For ids returned by BM25 but not by Chroma, we need their docs/ # metadata too. Re-fetch by id. missing = [i for i in final_ids if i not in id_to_doc] if missing: try: extra = col.get(ids=missing, include=["documents", "metadatas"]) for cid, doc, meta in zip( extra.get("ids") or [], extra.get("documents") or [], extra.get("metadatas") or [], ): id_to_doc[cid] = doc id_to_meta[cid] = meta except Exception as exc: # noqa: BLE001 log.warning("get-by-id for BM25-only hits failed: %s", exc) _call.set( hits_returned=len(final_ids), hybrid=used_hybrid, pool_size=pool_size, ) if not final_ids: return ( "_(no varieties matched. Try broadening the query or " "removing filters — call list_versions() to see what's " "indexed.)_" ) blocks: list[str] = [] for cid in final_ids: doc = id_to_doc.get(cid, "") meta = id_to_meta.get(cid, {}) dist = id_to_dist.get(cid) if not used_hybrid else None blocks.append(_format_hit(doc, meta, dist)) header = ( f"# Search results — {len(final_ids)} variety chunk" f"{'s' if len(final_ids) != 1 else ''}" f"{' (dense + BM25 hybrid)' if used_hybrid else ' (dense only)'}\n" f"_Use `lookup_variety(source_key=...)` on any candidate " f"to fact-check ratings from the canonical sidecar._\n\n---\n\n" ) return header + "\n---\n\n".join(blocks) @mcp.tool() def get_page( source: Annotated[str, Field(description=( "Scraper source id — e.g. 'bayer_seeds'. Same as the `source` " "field in search_docs results." ))], source_key: Annotated[str, Field(description=( "Per-variety stable key — e.g. 'dekalb-dkc62-08rib'. Same as " "the `source_key` field in search_docs results." ))], ) -> str: """Return the full canonical record for one variety. Emits a structured ratings header (identity, all characteristic groups, vendor positioning, regional listings) sourced from the variety's sidecar JSON, followed by the raw markdown body the chunker indexed. Use this when the user is comparing varieties closely or wants to see every published rating value, not just the ones that matched a query. The structured header is the canonical fact-check surface; use it to validate any specific rating value before quoting it. """ with TimedCall("get_page", {"source": source, "source_key": source_key}) as _call: sidecar = _read_sidecar(source, source_key) if sidecar is None: _call.set(found=False) return ( f"_(variety not found: source='{source}' " f"source_key='{source_key}'. Use list_versions() to see " f"available sources or search_docs() to find candidates.)_" ) body = _read_markdown(source, source_key) or "" structured = _structured_ratings_block(sidecar) _call.set(found=True, body_chars=len(body), groups=len(sidecar.get("characteristics_groups") or [])) sep = "\n\n---\n\n## Indexed body (chunk source text)\n\n" return structured + sep + body @mcp.tool() def list_versions() -> str: """List the available scraper sources and the crop/brand/vendor facets across all indexed varieties. Use this BEFORE search_docs() the first time you query, to discover which scraper sources, brands, vendors, and crops are actually in the corpus right now. The agent should not assume — the corpus grows as more vendors are scraped. """ with TimedCall("list_versions", {}) as _call: facets: dict[str, dict[str, int]] = { "source": {}, "vendor": {}, "brand": {}, "crop": {}, } n_varieties = 0 if CORPUS.exists(): for source_dir in sorted(CORPUS.iterdir()): if not source_dir.is_dir() or source_dir.name.startswith("."): continue for sc in source_dir.glob("*.json"): try: d = json.loads(sc.read_text(encoding="utf-8")) except (OSError, json.JSONDecodeError): continue n_varieties += 1 for k in facets: v = d.get(k) if v: facets[k][v] = facets[k].get(v, 0) + 1 if n_varieties == 0: return ( "_(no varieties indexed yet — run scrapers + indexer " "before calling search_docs)_" ) _call.set( varieties=n_varieties, sources=len(facets["source"]), vendors=len(facets["vendor"]), brands=len(facets["brand"]), crops=len(facets["crop"]), ) lines = [f"# Corpus facets ({n_varieties} varieties indexed)", ""] for label, counts in facets.items(): if not counts: continue lines.append(f"## {label}") lines.append("") for k, n in sorted(counts.items(), key=lambda kv: -kv[1]): lines.append(f"- `{k}` — {n} varieties") lines.append("") return "\n".join(lines).rstrip() @mcp.tool() def lookup_variety( source_key: Annotated[str, Field(description=( "Per-variety stable key — e.g. 'dekalb-dkc62-08rib'." ))], source: Annotated[ str | None, Field(description=( "OPTIONAL scraper source ('bayer_seeds' etc). If omitted, " "scans all sources for the source_key." )), ] = None, ) -> str: """Return the canonical sidecar JSON for one variety, verbatim. USE THIS to fact-check any rating value before quoting it to the farmer. The output is the exact data the scraper captured from the vendor's published catalog — no paraphrasing, no inference. Call this whenever the user (or you) needs an exact value for a specific rating, trait, or maturity — not just a semantic match from search_docs. """ with TimedCall("lookup_variety", {"source_key": source_key, "source": source}) as _call: if source: sidecar = _read_sidecar(source, source_key) if sidecar is not None: _call.set(found=True, source=source) return "```json\n" + json.dumps(sidecar, indent=2, ensure_ascii=False) + "\n```" _call.set(found=False, source=source) return f"_(variety '{source_key}' not found under source '{source}')_" # No source given — scan all source dirs for a match. if not CORPUS.exists(): _call.set(found=False) return "_(corpus is empty — no scrapers have run yet)_" matches: list[tuple[str, dict]] = [] for source_dir in sorted(CORPUS.iterdir()): if not source_dir.is_dir() or source_dir.name.startswith("."): continue sidecar = _read_sidecar(source_dir.name, source_key) if sidecar is not None: matches.append((source_dir.name, sidecar)) if not matches: _call.set(found=False) return ( f"_(no variety with source_key '{source_key}' found in any " f"source. Use search_docs() to find the right key.)_" ) _call.set(found=True, matches=len(matches)) if len(matches) == 1: src_name, sidecar = matches[0] return ( f"_(matched in source `{src_name}`)_\n\n" "```json\n" + json.dumps(sidecar, indent=2, ensure_ascii=False) + "\n```" ) # Multi-match: serialize each labeled. out: list[str] = [f"_(matched {len(matches)} sources)_\n"] for src_name, sidecar in matches: out.append(f"### source: `{src_name}`") out.append("```json") out.append(json.dumps(sidecar, indent=2, ensure_ascii=False)) out.append("```") return "\n".join(out) # =========================================================================== # Entry point # =========================================================================== def main() -> None: import argparse p = argparse.ArgumentParser(description=f"{PRODUCT_NAME} docs MCP server") p.add_argument("--transport", choices=["stdio", "streamable-http", "sse"], default=os.environ.get("MCP_TRANSPORT", "stdio")) p.add_argument("--host", default=os.environ.get("MCP_HOST", "0.0.0.0")) p.add_argument("--port", type=int, default=int(os.environ.get("MCP_PORT", "8000"))) args = p.parse_args() if args.transport == "stdio": mcp.run() else: mcp.settings.host = args.host mcp.settings.port = args.port if os.environ.get("MCP_DISABLE_DNS_REBINDING_PROTECTION") in {"1", "true", "yes"}: mcp.settings.transport_security.enable_dns_rebinding_protection = False mcp.run(transport=args.transport) if __name__ == "__main__": main()