Phase 2/3: chunker + indexer + MCP server tools

Phase 2 — Chunking and indexing
- rag/chunk.py: replace template chunker with seed-variety-specific
  chunks_from_variety(). One chunk per variety (varieties are small
  and named-rating retrieval signal is best kept together). Output
  is rebuilt deterministically from the sidecar JSON: every value is
  verbatim from the source, only framing language ("Disease ratings
  (1-9, 9=best):") is template glue. Anti-hallucination contract:
  same sidecar in → same chunk out, never a fabricated rating.
  Metadata flattened to Chroma-safe primitives (str/int/float/bool):
  source, source_key, vendor, brand, crop, product_name,
  product_id, source_url, rm (corn), mg (soy), wheat_class,
  release_year, trait_codes_csv, rating_scale.
- rag/index.py: walks corpus/<source>/<source_key>.json sidecars
  via the new chunker. Default PRODUCT_NAME=crop_seed so the
  Chroma collection is crop_seed_docs.
- rag/bm25.py: filterable columns updated to seed-domain facets
  (source/vendor/brand/crop/source_key) instead of the template's
  version/platform/product.

Phase 3 — MCP server tools wired up
- search_docs: hybrid dense (Chroma) + BM25 (FTS5) retrieval with
  RRF fusion. Optional filters: crop, brand, vendor, source.
  Variety-code prefilter pins exact source_key / product_name /
  hybrid_prefix matches at the top — dense embeddings have no
  semantic neighbor for tokens like "DKC62-08RIB" and RRF can let
  noise float to #1 without this pin. Each response carries the
  variety's source URL inline so the agent can cite.
- get_page(source, source_key): emits a structured ratings header
  (verbatim from sidecar, table per characteristics group, vendor
  positioning, regional listings) followed by the raw indexed body.
  This is the canonical fact-check surface.
- list_versions(): facet discovery — distinct sources, vendors,
  brands, crops across the corpus.
- lookup_variety(source_key, source?): returns the raw sidecar JSON
  for one variety. The agent should call this BEFORE quoting any
  specific rating value to a farmer — guaranteed verbatim.

Smoke tests against 475 indexed Bayer varieties:
- list_versions returns 475 varieties, 1 source, 1 vendor, 3 brands,
  3 crops with correct per-brand counts (288/102/85).
- Semantic ag queries find the right candidates: short-season
  drought-tolerant corn → DKC44-97RIB at RM 94 (in 90-95 band);
  SCN+MG3 soybean → Asgrow XF varieties with explicit SCN R3 ratings;
  Phytophthora Rps3a soy → AG07XF4 (right gene); stripe-rust
  wheat → WestBred WB1376CLP (Yellow Rust 2 = best).
- Variety-code lookups work via prefilter: DKC62-08RIB, AG29XF4,
  WB6430 all return as #1 hit. BM25 confirms ranking unambiguously
  (top-1 score -13.2 vs -8.5 for #2 on "DKC62-08RIB ratings").
- Server boots cleanly in stdio AND streamable-http modes.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-05-25 13:14:16 -04:00
parent 0fb8d9d92d
commit a766756a05
4 changed files with 982 additions and 369 deletions
+607 -132
View File
@@ -1,20 +1,20 @@
"""MCP server skeleton — fill in PRODUCT_NAME and the tool bodies.
"""seed-mcp — MCP server over public US row-crop seed catalogs.
This file is the template's structural anchor. The phases described in
PLAN.md add or extend pieces of this file:
Tools (all read-only):
Phase 3 — search_docs, get_page, list_versions stubs (you are here)
Phase 6 — reranker integration in search_docs
Phase 8 — BM25 + hybrid retrieval (HYBRID_SEARCH env gate, _rrf_fuse)
Phase 9 — diff_versions, list_cluster, bundle_changelog
Phase 10 — TimedCall wiring (already imported below)
Phase 11 — <product>_api_lessons tool
Phase 12 — find_doc_inconsistencies, submit_doc_bug
Phase 13 — weekly_digest + _digest_history reader
search_docs natural-language retrieval over the corpus
get_page the full markdown body of one variety + sidecar
list_versions facet discovery (crops / brands / vendors / sources)
lookup_variety canonical sidecar JSON by source_key — fact-check
anything you surface from search_docs against this
Every stub below has a docstring + `raise NotImplementedError`. Replace
the body when you reach the corresponding phase. Keep the signatures
stable across products — clients depend on them.
The contract with the calling agent is **never fabricate**. Every chunk
we return is verbatim from the source's published catalog (the chunker
rebuilds chunks deterministically from the sidecar JSON). Every
response carries the variety's source URL so the agent can cite. The
``lookup_variety`` tool exists specifically so the agent can validate
specific rating values without having to trust paraphrased retrieval
text.
"""
from __future__ import annotations
@@ -23,7 +23,7 @@ import logging
import os
import re
from pathlib import Path
from typing import Annotated
from typing import Annotated, Any
from mcp.server.fastmcp import FastMCP
from pydantic import Field
@@ -33,7 +33,7 @@ from .usage import TimedCall
log = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Product-specific configuration. Set these for each new build.
# Product-specific configuration.
# ---------------------------------------------------------------------------
PRODUCT_NAME = os.environ.get("PRODUCT_NAME", "crop_seed")
PRODUCT_DOCS_URL = os.environ.get("PRODUCT_DOCS_URL", "https://git.jpaul.io/justin/seed-mcp")
@@ -44,59 +44,176 @@ ROOT = Path(__file__).resolve().parent.parent
CORPUS = ROOT / "corpus"
CHROMA_DIR = ROOT / "chroma"
BM25_DB = Path(os.environ.get("BM25_DB", str(ROOT / "bm25" / f"{PRODUCT_NAME}_docs.db")))
BUNDLES_JSON = ROOT / "bundles.json"
# ---------------------------------------------------------------------------
# Feature flags (Phase 6 / 8 / 12 enable these as you ship each phase).
# Feature flags.
# ---------------------------------------------------------------------------
RERANK_URL = os.environ.get("RERANK_URL", "").rstrip("/") or None
RERANK_POOL = int(os.environ.get("RERANK_POOL", "50"))
RERANK_TIMEOUT = float(os.environ.get("RERANK_TIMEOUT", "30"))
HYBRID_SEARCH = os.environ.get("HYBRID_SEARCH", "").lower() in ("true", "1", "yes", "on")
HYBRID_SEARCH = os.environ.get("HYBRID_SEARCH", "true").lower() in ("true", "1", "yes", "on")
RRF_K = int(os.environ.get("RRF_K", "60"))
DOC_BUG_SUBMIT_ENABLED = os.environ.get("DOC_BUG_SUBMIT_ENABLED", "").lower() in ("true", "1", "yes", "on")
DOC_BUG_API_URL = os.environ.get("DOC_BUG_API_URL", "") # product-specific endpoint
DOC_BUG_TIMEOUT = float(os.environ.get("DOC_BUG_TIMEOUT", "15"))
# ---------------------------------------------------------------------------
# FastMCP setup.
#
# stateless_http=True — every request creates an ephemeral session and
# discards it on return. Critical for production: clients don't get
# 404 storms when the container is recreated by Watchtower.
# ---------------------------------------------------------------------------
mcp = FastMCP(f"{PRODUCT_NAME}-docs", stateless_http=True)
# ---------------------------------------------------------------------------
# Lazy helpers — instantiate expensive things only when actually needed,
# so the server still starts when (e.g.) Ollama is briefly unreachable.
# Lazy singletons. Instantiate on first use so the server can start even
# when (e.g.) Ollama is briefly unreachable — degraded modes fall back
# gracefully rather than refusing to boot.
# ---------------------------------------------------------------------------
_chroma_client = None
_chroma_collection = None
_bm25 = None
_code_index: dict[str, list[tuple[str, str]]] | None = None
def _build_code_index() -> dict[str, list[tuple[str, str]]]:
"""Walk sidecars once and index varieties by every lookup key a
farmer or LLM might paste: ``source_key``, ``hybrid_prefix``,
``product_name`` (normalized), each token from
``hybrid_prefix``/``product_name`` longer than three chars.
Returns a dict mapping the normalized key → list of
``(source, source_key)`` tuples. Multiple-source matches are kept
so we can warn the agent about ambiguity.
"""
idx: dict[str, list[tuple[str, str]]] = {}
def _add(key: str, value: tuple[str, str]) -> None:
key = (key or "").lower().strip()
if not key or len(key) < 4:
return
idx.setdefault(key, [])
if value not in idx[key]:
idx[key].append(value)
if not CORPUS.exists():
return idx
for source_dir in CORPUS.iterdir():
if not source_dir.is_dir() or source_dir.name.startswith("."):
continue
for sc in source_dir.glob("*.json"):
try:
d = json.loads(sc.read_text(encoding="utf-8"))
except (OSError, json.JSONDecodeError):
continue
source = d.get("source") or source_dir.name
sk = d.get("source_key") or sc.stem
value = (source, sk)
_add(sk, value)
_add(d.get("hybrid_prefix") or "", value)
_add(d.get("product_name") or "", value)
# Token-split product_name and hybrid_prefix so "DKC62-08RIB"
# in a query matches "DKC62-08RIB BRAND BLEND" via the
# "DKC62-08RIB" token as well as the full string.
for source_field in (d.get("product_name"), d.get("hybrid_prefix")):
if not source_field:
continue
for tok in re.split(r"[\s()/,]+", source_field):
tok = tok.strip()
# Only retain tokens that LOOK like codes — at least
# one digit, mostly alphanumeric/dash. Skips "BRAND"
# / "BLEND" / etc.
if re.match(r"^[A-Za-z]*\d[\w\-]*$", tok):
_add(tok, value)
return idx
def _code_lookup() -> dict[str, list[tuple[str, str]]]:
global _code_index
if _code_index is None:
_code_index = _build_code_index()
log.info("code-index: %d lookup keys built", len(_code_index))
return _code_index
# Tokens that LOOK like a variety code — at least one digit, otherwise
# alphanumeric / dash. Catches "DKC62-08RIB", "AG29XF4", "WB1376CLP",
# "VT2PRIB"; skips ordinary words like "ratings".
_CODE_TOKEN_RE = re.compile(r"\b([A-Za-z][A-Za-z0-9\-]{2,})\b")
def _exact_code_matches(query: str) -> list[tuple[str, str]]:
"""Find varieties whose source_key / product_name / token-split
components contain a query token. Used as a high-confidence pin
before fuzzy retrieval."""
idx = _code_lookup()
if not idx:
return []
out: list[tuple[str, str]] = []
seen: set[tuple[str, str]] = set()
for m in _CODE_TOKEN_RE.finditer(query or ""):
tok = m.group(1).lower()
if len(tok) < 4 or not any(c.isdigit() for c in tok):
continue
for v in idx.get(tok, []):
if v not in seen:
seen.add(v)
out.append(v)
return out
def _collection():
"""Return the Chroma collection, opening it lazily."""
global _chroma_client, _chroma_collection
if _chroma_collection is not None:
return _chroma_collection
import chromadb
from chromadb.config import Settings
from rag.embeddings import embedding_function
_chroma_client = chromadb.PersistentClient(
path=str(CHROMA_DIR),
settings=Settings(anonymized_telemetry=False),
)
_chroma_collection = _chroma_client.get_collection(
COLLECTION, embedding_function=embedding_function()
)
return _chroma_collection
def _bm25_index():
"""Return the BM25 index, or None if it doesn't exist on disk."""
global _bm25
if _bm25 is not None:
return _bm25
from rag.bm25 import BM25Index
idx = BM25Index(BM25_DB)
if not idx.exists():
return None
_bm25 = idx
return _bm25
# ---------------------------------------------------------------------------
# Helpers — local file IO + filter builder.
# ---------------------------------------------------------------------------
def _bundles() -> dict[str, dict]:
"""Cached load of bundles.json into a {slug: bundle_dict} mapping.
bundles.json is the product-specific catalog written by the Phase 1
scraper. See PLAN.md Phase 1 for the schema.
"""
if not BUNDLES_JSON.exists():
return {}
cat = json.loads(BUNDLES_JSON.read_text())
return {b["slug"]: b for b in cat}
def _build_where(version: str | None, platform: str | None, bundle_id: str | None) -> dict | None:
def _build_where(
crop: str | None,
brand: str | None,
vendor: str | None,
source: str | None,
source_key: str | None,
) -> dict | None:
"""Translate filter args into a Chroma `where` clause."""
conds: list[dict] = []
if version:
conds.append({"version": version})
if platform:
conds.append({"platform": platform})
if bundle_id:
conds.append({"bundle_id": bundle_id})
if crop:
conds.append({"crop": crop.lower()})
if brand:
conds.append({"brand": brand.upper()})
if vendor:
conds.append({"vendor": vendor})
if source:
conds.append({"source": source})
if source_key:
conds.append({"source_key": source_key})
if not conds:
return None
if len(conds) == 1:
@@ -104,13 +221,152 @@ def _build_where(version: str | None, platform: str | None, bundle_id: str | Non
return {"$and": conds}
def _read_page(bundle_id: str, page_id: str) -> tuple[str, dict] | None:
"""Read a corpus page off disk. Returns (markdown_body, metadata_dict)."""
md_path = CORPUS / bundle_id / (page_id + ".md")
json_path = CORPUS / bundle_id / (page_id + ".json")
if not md_path.exists() or not json_path.exists():
def _read_sidecar(source: str, source_key: str) -> dict | None:
"""Read a variety's sidecar JSON off disk."""
path = CORPUS / source / f"{source_key}.json"
if not path.exists():
return None
return md_path.read_text(), json.loads(json_path.read_text())
try:
return json.loads(path.read_text(encoding="utf-8"))
except (OSError, json.JSONDecodeError) as exc:
log.warning("sidecar read failed for %s/%s: %s", source, source_key, exc)
return None
def _read_markdown(source: str, source_key: str) -> str | None:
path = CORPUS / source / f"{source_key}.md"
if not path.exists():
return None
try:
return path.read_text(encoding="utf-8")
except OSError:
return None
def _format_hit(doc: str, meta: dict, distance: float | None = None) -> str:
"""Render one retrieval hit as a fenced markdown block with full
provenance attached. ``doc`` is the chunk text; ``meta`` is the
chunk's metadata dict."""
src_url = meta.get("source_url") or ""
src_key = meta.get("source_key") or ""
src = meta.get("source") or ""
vendor = meta.get("vendor") or ""
brand = meta.get("brand") or ""
crop = meta.get("crop") or ""
name = meta.get("product_name") or src_key
header = (
f"### {name} \n"
f"`{src}::{src_key}` — {vendor} / {brand} / {crop} \n"
f"<{src_url}>"
)
if distance is not None:
header += f" \n_(distance={distance:.4f})_"
return f"{header}\n\n{doc.strip()}\n"
def _rrf_fuse(rankings: list[list[str]], k: int = RRF_K) -> list[str]:
"""Reciprocal Rank Fusion — merge multiple ranked id lists into one.
Score(id) = sum over each ranking R of 1 / (k + rank_R(id)).
Robust to score-scale differences between dense (cosine) and lexical
(BM25). k=60 is the literature default; not particularly sensitive.
"""
scores: dict[str, float] = {}
for ranking in rankings:
for rank, doc_id in enumerate(ranking):
scores[doc_id] = scores.get(doc_id, 0.0) + 1.0 / (k + rank + 1)
return sorted(scores, key=lambda d: scores[d], reverse=True)
def _structured_ratings_block(sidecar: dict) -> str:
"""Render the sidecar's grouped characteristics + identity as a
fact-checkable block, with the source URL pinned at top.
This is the response body for ``get_page`` and the embedded
"canonical data" block agents should trust over any free-text
paraphrase."""
lines: list[str] = []
name = sidecar.get("product_name") or sidecar.get("source_key", "")
lines.append(f"# {name}")
lines.append("")
facets: list[str] = []
if sidecar.get("vendor"):
facets.append(f"**Vendor:** {sidecar['vendor']}")
if sidecar.get("brand"):
facets.append(f"**Brand:** {str(sidecar['brand']).title()}")
if sidecar.get("crop"):
facets.append(f"**Crop:** {str(sidecar['crop']).capitalize()}")
if sidecar.get("relative_maturity") and sidecar.get("crop") == "corn":
facets.append(f"**Relative maturity:** {sidecar['relative_maturity']}")
if sidecar.get("maturity_group") and sidecar.get("crop") == "soybeans":
facets.append(f"**Maturity group:** {sidecar['maturity_group']}")
if sidecar.get("relative_maturity") and sidecar.get("crop") == "wheat":
facets.append(f"**Maturity:** {sidecar['relative_maturity']}")
if sidecar.get("wheat_class"):
facets.append(f"**Wheat class:** {sidecar['wheat_class']}")
if sidecar.get("release_year"):
facets.append(f"**Released:** {sidecar['release_year']}")
if sidecar.get("trait_stack"):
facets.append(f"**Trait stack:** {', '.join(sidecar['trait_stack'])}")
if facets:
lines.append(" · ".join(facets))
lines.append("")
urls = sidecar.get("source_urls") or []
if urls:
lines.append(f"**Source:** {urls[0]}")
lines.append("")
scale = sidecar.get("_scale_direction") or "scale direction not declared by source"
lines.append(f"**Rating scale (as published):** {scale}")
lines.append("")
# Canonical ratings — one table per group, values verbatim.
for g in sidecar.get("characteristics_groups") or []:
label = g.get("label") or "Characteristics"
items = g.get("items") or []
if not items:
continue
lines.append(f"## {label.title()}")
lines.append("")
lines.append("| Characteristic | Value |")
lines.append("|---|---|")
for it in items:
ch = (it.get("characteristic") or "").strip()
v = (it.get("value") or "").strip()
lines.append(f"| {ch} | {v} |")
lines.append("")
if sidecar.get("positioning_statement"):
lines.append("## Vendor positioning")
lines.append("")
lines.append(sidecar["positioning_statement"].strip())
lines.append("")
if sidecar.get("strengths"):
lines.append("## Strengths / management notes (vendor copy)")
lines.append("")
for s in sidecar["strengths"]:
lines.append(f"- {s}")
lines.append("")
rec = sidecar.get("regional_recommendations") or []
if rec:
names = sorted({
(r.get("product_list_name") or "").strip()
for r in rec
if (r.get("product_list_name") or "").strip()
})
if names:
lines.append("## Listed in vendor regional seed guides")
lines.append("")
for n in names:
lines.append(f"- {n}")
lines.append("")
return "\n".join(lines).rstrip() + "\n"
# ===========================================================================
@@ -119,119 +375,340 @@ def _read_page(bundle_id: str, page_id: str) -> tuple[str, dict] | None:
@mcp.tool()
def search_docs(
query: Annotated[str, Field(description=f"Natural-language query about {PRODUCT_NAME}.")],
version: Annotated[
query: Annotated[str, Field(description=(
"Natural-language query about row-crop seed varieties. "
"Mention crop (corn/soybeans/wheat), maturity (RM days / "
"MG number / wheat class), traits, disease tolerances, "
"or soil/region constraints — they all carry retrieval signal."
))],
crop: Annotated[
str | None,
Field(description="OPTIONAL version filter — restrict to one product version."),
Field(description="OPTIONAL filter: corn, soybeans, or wheat."),
] = None,
platform: Annotated[
brand: Annotated[
str | None,
Field(description="OPTIONAL platform filter. Set to one of the platforms listed by list_versions(); omit for all platforms."),
Field(description=(
"OPTIONAL brand filter — DEKALB, ASGROW, WESTBRED, "
"GoldenHarvest, NK, AgriPro, Becks. Case-insensitive."
)),
] = None,
bundle_id: Annotated[
vendor: Annotated[
str | None,
Field(description="OPTIONAL bundle filter — pin to a specific doc bundle slug."),
Field(description="OPTIONAL vendor filter — Bayer, Syngenta, Beck's."),
] = None,
source: Annotated[
str | None,
Field(description=(
"OPTIONAL source filter — e.g. 'bayer_seeds'. Use "
"list_versions() to see what's indexed."
)),
] = None,
k: Annotated[int, Field(description="Number of results to return.", ge=1, le=50)] = 10,
) -> str:
"""Search the {product} docs corpus.
"""Search the seed-variety corpus for hybrids/varieties matching a query.
Returns the top-k most relevant chunks (with full source page URLs)
given a natural-language query. Optional filters narrow the search
to one version, one platform, or one bundle. Use list_versions()
first if you need to discover the available facet values.
Returns the top-k variety chunks with their full source URLs, ratings,
maturity, traits, and regional listings. Optional filters narrow to one
crop, brand, vendor, or scraper source. Use **list_versions()** first
to discover valid facet values. Use **lookup_variety()** on any
candidate the user is serious about — that returns the canonical
sidecar so you can verify exact rating values without trusting the
free-text chunk.
Call this tool whenever the user asks anything that should be
answerable from the official product documentation.
Call this tool whenever an ag professional or farmer asks anything
about choosing a seed variety, comparing hybrids, or finding seed
matched to a maturity / soil / region / disease constraint.
NEVER answer seed-selection questions from prior knowledge alone —
seed catalogs change yearly and brand-specific. Always search first.
"""
with TimedCall("search_docs", {
"query": query, "version": version, "platform": platform,
"bundle_id": bundle_id, "k": k,
"query": query, "crop": crop, "brand": brand,
"vendor": vendor, "source": source, "k": k,
}) as _call:
# TODO Phase 2-3: query Chroma collection (see rag/index.py for
# how it was built). Render the top-k chunks as markdown with
# source URLs.
# TODO Phase 6: optional reranker via _rerank() if RERANK_URL set.
# TODO Phase 8: hybrid retrieval if HYBRID_SEARCH=true — run
# dense + BM25 in parallel, RRF-fuse, hand merged pool to rerank.
_call.set(hits_returned=0)
raise NotImplementedError("Phase 2/3: implement Chroma query + rendering")
where = _build_where(crop, brand, vendor, source, None)
pool_size = max(k * 3, RERANK_POOL)
# Exact-code pre-filter. Variety codes ("DKC62-08RIB", "AG29XF4")
# have NO semantic neighbors — dense retrieval misses them and
# RRF can let off-topic noise float to top-1. If the query
# contains a token that exactly matches a variety's identifier,
# pin those varieties to the top of results.
pinned_ids: list[str] = []
for src, sk in _exact_code_matches(query):
pinned_ids.append(f"{src}::{sk}::0")
# Dense retrieval via Chroma.
try:
col = _collection()
except Exception as exc: # noqa: BLE001
_call.set(error_dense=str(exc), hits_returned=0)
return (
"_(retrieval unavailable — Chroma collection not found. "
"Has the indexer run? `python -m rag.index --rebuild`.)_"
)
try:
dense = col.query(
query_texts=[query],
n_results=pool_size,
where=where,
)
except Exception as exc: # noqa: BLE001
_call.set(error_dense=str(exc), hits_returned=0)
return f"_(retrieval failed: {exc})_"
dense_ids: list[str] = (dense.get("ids") or [[]])[0]
dense_docs: list[str] = (dense.get("documents") or [[]])[0]
dense_metas: list[dict] = (dense.get("metadatas") or [[]])[0]
dense_dists: list[float] = (dense.get("distances") or [[]])[0]
id_to_doc = dict(zip(dense_ids, dense_docs))
id_to_meta = dict(zip(dense_ids, dense_metas))
id_to_dist = dict(zip(dense_ids, dense_dists))
# Hybrid: optionally fuse with BM25. Both are pulled at pool_size,
# fused via RRF, top-k returned.
used_hybrid = False
if HYBRID_SEARCH:
bm25 = _bm25_index()
if bm25 is not None:
bm25_hits = bm25.query(query, n=pool_size, where=where)
bm25_ids = [h[0] for h in bm25_hits]
if bm25_ids:
fused = _rrf_fuse([dense_ids, bm25_ids])
fuzzy_ids = fused
used_hybrid = True
else:
fuzzy_ids = dense_ids
else:
fuzzy_ids = dense_ids
else:
fuzzy_ids = dense_ids
# Pin exact-code matches at top, then fill remainder from fuzzy
# retrieval (deduped). Pinned matches are deterministic and
# high-confidence; they should never lose to a fuzzy match.
final_ids: list[str] = []
seen: set[str] = set()
for cid in pinned_ids + fuzzy_ids:
if cid in seen:
continue
seen.add(cid)
final_ids.append(cid)
if len(final_ids) >= k:
break
# For ids returned by BM25 but not by Chroma, we need their docs/
# metadata too. Re-fetch by id.
missing = [i for i in final_ids if i not in id_to_doc]
if missing:
try:
extra = col.get(ids=missing, include=["documents", "metadatas"])
for cid, doc, meta in zip(
extra.get("ids") or [],
extra.get("documents") or [],
extra.get("metadatas") or [],
):
id_to_doc[cid] = doc
id_to_meta[cid] = meta
except Exception as exc: # noqa: BLE001
log.warning("get-by-id for BM25-only hits failed: %s", exc)
_call.set(
hits_returned=len(final_ids),
hybrid=used_hybrid,
pool_size=pool_size,
)
if not final_ids:
return (
"_(no varieties matched. Try broadening the query or "
"removing filters — call list_versions() to see what's "
"indexed.)_"
)
blocks: list[str] = []
for cid in final_ids:
doc = id_to_doc.get(cid, "")
meta = id_to_meta.get(cid, {})
dist = id_to_dist.get(cid) if not used_hybrid else None
blocks.append(_format_hit(doc, meta, dist))
header = (
f"# Search results — {len(final_ids)} variety chunk"
f"{'s' if len(final_ids) != 1 else ''}"
f"{' (dense + BM25 hybrid)' if used_hybrid else ' (dense only)'}\n"
f"_Use `lookup_variety(source_key=...)` on any candidate "
f"to fact-check ratings from the canonical sidecar._\n\n---\n\n"
)
return header + "\n---\n\n".join(blocks)
@mcp.tool()
def get_page(
bundle_id: Annotated[str, Field(description="Bundle slug.")],
page_id: Annotated[str, Field(description="Page filename within the bundle.")],
source: Annotated[str, Field(description=(
"Scraper source id — e.g. 'bayer_seeds'. Same as the `source` "
"field in search_docs results."
))],
source_key: Annotated[str, Field(description=(
"Per-variety stable key — e.g. 'dekalb-dkc62-08rib'. Same as "
"the `source_key` field in search_docs results."
))],
) -> str:
"""Return the full markdown for one page, plus a metadata header.
"""Return the full canonical record for one variety.
Use after search_docs surfaces a relevant page and the user (or you)
want the complete text — not just the matched chunks.
Emits a structured ratings header (identity, all characteristic
groups, vendor positioning, regional listings) sourced from the
variety's sidecar JSON, followed by the raw markdown body the
chunker indexed.
Use this when the user is comparing varieties closely or wants to
see every published rating value, not just the ones that matched a
query. The structured header is the canonical fact-check surface;
use it to validate any specific rating value before quoting it.
"""
with TimedCall("get_page", {"bundle_id": bundle_id, "page_id": page_id}) as _call:
data = _read_page(bundle_id, page_id)
if data is None:
with TimedCall("get_page", {"source": source, "source_key": source_key}) as _call:
sidecar = _read_sidecar(source, source_key)
if sidecar is None:
_call.set(found=False)
return f"Page not found: {bundle_id}/{page_id}"
md, meta = data
_call.set(found=True, page_chars=len(md))
# TODO: add a metadata header (title, version, source URL) above
# the body. Product-specific shape.
return md
return (
f"_(variety not found: source='{source}' "
f"source_key='{source_key}'. Use list_versions() to see "
f"available sources or search_docs() to find candidates.)_"
)
body = _read_markdown(source, source_key) or ""
structured = _structured_ratings_block(sidecar)
_call.set(found=True, body_chars=len(body), groups=len(sidecar.get("characteristics_groups") or []))
sep = "\n\n---\n\n## Indexed body (chunk source text)\n\n"
return structured + sep + body
@mcp.tool()
def list_versions() -> str:
"""List the available version/platform facets across all bundles.
"""List the available scraper sources and the crop/brand/vendor
facets across all indexed varieties.
Use this to discover valid filter values for search_docs.
Use this BEFORE search_docs() the first time you query, to discover
which scraper sources, brands, vendors, and crops are actually in
the corpus right now. The agent should not assume — the corpus
grows as more vendors are scraped.
"""
with TimedCall("list_versions", {}) as _call:
cat = _bundles()
if not cat:
return "_(no bundles indexed yet — run the scraper + indexer)_"
versions = sorted({b.get("version") for b in cat.values() if b.get("version")})
platforms = sorted({b.get("platform") for b in cat.values() if b.get("platform")})
_call.set(versions=len(versions), platforms=len(platforms))
lines = [f"# Facets across {len(cat)} bundle(s)", ""]
if versions:
lines.append("## Versions"); lines.append("")
for v in versions: lines.append(f"- `{v}`")
facets: dict[str, dict[str, int]] = {
"source": {}, "vendor": {}, "brand": {}, "crop": {},
}
n_varieties = 0
if CORPUS.exists():
for source_dir in sorted(CORPUS.iterdir()):
if not source_dir.is_dir() or source_dir.name.startswith("."):
continue
for sc in source_dir.glob("*.json"):
try:
d = json.loads(sc.read_text(encoding="utf-8"))
except (OSError, json.JSONDecodeError):
continue
n_varieties += 1
for k in facets:
v = d.get(k)
if v:
facets[k][v] = facets[k].get(v, 0) + 1
if n_varieties == 0:
return (
"_(no varieties indexed yet — run scrapers + indexer "
"before calling search_docs)_"
)
_call.set(
varieties=n_varieties,
sources=len(facets["source"]),
vendors=len(facets["vendor"]),
brands=len(facets["brand"]),
crops=len(facets["crop"]),
)
lines = [f"# Corpus facets ({n_varieties} varieties indexed)", ""]
for label, counts in facets.items():
if not counts:
continue
lines.append(f"## {label}")
lines.append("")
if platforms:
lines.append("## Platforms"); lines.append("")
for p in platforms: lines.append(f"- `{p}`")
return "\n".join(lines)
for k, n in sorted(counts.items(), key=lambda kv: -kv[1]):
lines.append(f"- `{k}` — {n} varieties")
lines.append("")
return "\n".join(lines).rstrip()
# ---------------------------------------------------------------------------
# Stubs for later phases — keep the signatures in this file so refactors
# don't lose the contracts. Implementations come per phase.
# ---------------------------------------------------------------------------
@mcp.tool()
def lookup_variety(
source_key: Annotated[str, Field(description=(
"Per-variety stable key — e.g. 'dekalb-dkc62-08rib'."
))],
source: Annotated[
str | None,
Field(description=(
"OPTIONAL scraper source ('bayer_seeds' etc). If omitted, "
"scans all sources for the source_key."
)),
] = None,
) -> str:
"""Return the canonical sidecar JSON for one variety, verbatim.
# @mcp.tool() # Phase 9
# def list_cluster(bundle_id: str, page_id: str) -> str: ...
USE THIS to fact-check any rating value before quoting it to the
farmer. The output is the exact data the scraper captured from the
vendor's published catalog — no paraphrasing, no inference.
# @mcp.tool() # Phase 9
# def diff_versions(bundle_id: str, page_id: str, against_bundle_id: str, context: int = 3) -> str: ...
Call this whenever the user (or you) needs an exact value for a
specific rating, trait, or maturity — not just a semantic match
from search_docs.
"""
with TimedCall("lookup_variety", {"source_key": source_key, "source": source}) as _call:
if source:
sidecar = _read_sidecar(source, source_key)
if sidecar is not None:
_call.set(found=True, source=source)
return "```json\n" + json.dumps(sidecar, indent=2, ensure_ascii=False) + "\n```"
_call.set(found=False, source=source)
return f"_(variety '{source_key}' not found under source '{source}')_"
# @mcp.tool() # Phase 9
# def bundle_changelog(bundle_id_new: str, bundle_id_old: str, min_churn: int = 5, max_changed: int = 50) -> str: ...
# No source given — scan all source dirs for a match.
if not CORPUS.exists():
_call.set(found=False)
return "_(corpus is empty — no scrapers have run yet)_"
matches: list[tuple[str, dict]] = []
for source_dir in sorted(CORPUS.iterdir()):
if not source_dir.is_dir() or source_dir.name.startswith("."):
continue
sidecar = _read_sidecar(source_dir.name, source_key)
if sidecar is not None:
matches.append((source_dir.name, sidecar))
# @mcp.tool() # Phase 13
# def weekly_digest(days: int = 7, version: str | None = None, platform: str | None = None, ...) -> str: ...
if not matches:
_call.set(found=False)
return (
f"_(no variety with source_key '{source_key}' found in any "
f"source. Use search_docs() to find the right key.)_"
)
# @mcp.tool() # Phase 9 (or 3 — useful early)
# def corpus_status() -> str: ...
_call.set(found=True, matches=len(matches))
if len(matches) == 1:
src_name, sidecar = matches[0]
return (
f"_(matched in source `{src_name}`)_\n\n"
"```json\n"
+ json.dumps(sidecar, indent=2, ensure_ascii=False)
+ "\n```"
)
# @mcp.tool() # Phase 11
# def myproduct_api_lessons(topic: str | None = None) -> str: ...
# @mcp.tool() # Phase 12
# def find_doc_inconsistencies(scope_query: str, ...) -> str: ...
# @mcp.tool() # Phase 12
# def submit_doc_bug(page_url: str, content: str, email: str | None = None, ...) -> str: ...
# Multi-match: serialize each labeled.
out: list[str] = [f"_(matched {len(matches)} sources)_\n"]
for src_name, sidecar in matches:
out.append(f"### source: `{src_name}`")
out.append("```json")
out.append(json.dumps(sidecar, indent=2, ensure_ascii=False))
out.append("```")
return "\n".join(out)
# ===========================================================================
@@ -252,8 +729,6 @@ def main() -> None:
else:
mcp.settings.host = args.host
mcp.settings.port = args.port
# DNS-rebinding protection defaults to localhost-only — disable for
# container-network DNS hostnames. See PLAN.md "Hosting" notes.
if os.environ.get("MCP_DISABLE_DNS_REBINDING_PROTECTION") in {"1", "true", "yes"}:
mcp.settings.transport_security.enable_dns_rebinding_protection = False
mcp.run(transport=args.transport)