Files
seed-mcp/docs_mcp/server.py
T
justin ac40e05734
Image rebuild (skip scrape) / build (push) Failing after 7s
seed-mcp scaffold: clone docs-mcp-template, customize for crop_seed PRODUCT_NAME
Sibling project to crop-chem-docs, same MCP-template lineage. Corpus is
seed/hybrid varieties across 6 vendors instead of pesticide labels.

What's customized vs. the template:
- CLAUDE.md: vendor matrix, build priority, Pioneer fallback policy,
  canonical sidecar schema (per-crop), Golden Harvest disease-scale
  reversal gotcha, no-IPv6 / HTTPS-clone note
- README.md: vendor coverage table, tool list, phase status
- Dockerfile: PRODUCT_NAME=crop_seed default, sources.json (not
  bundles.json), HYBRID_SEARCH=true, OLLAMA_URL + RERANK_URL Docker
  DNS defaults (same llama-rerank sidecar as crop-chem-docs)
- .gitea/workflows/refresh.yml: monthly cron (seed catalogs move
  slowly), 5 GREEN scraper steps, corpus-YYYY.MM.DD tag for Drawbar
  pinning, continue-on-error on GC step
- .gitea/workflows/image-only.yml: paths filter + cancel-in-progress
  concurrency group
- scripts/registry_gc.py: lifted from crop-chem-docs (correct Gitea
  packages API URL + UA header to bypass CF block on default
  Python-urllib UA)
- sources.json: catalog of 6 sources + scope_filter + per-source
  schema notes + Pioneer-exclusion rationale
- scrape/runner.py: dispatcher with --all = GREEN-only
- scrape/sources/{bayer_seeds,golden_harvest,nk,agripro,becks_pfr,
  becks_products}.py: stub modules with implementation notes
- docs_mcp/server.py: PRODUCT_NAME default → crop_seed,
  PRODUCT_DOCS_URL → repo URL

Pioneer is intentionally NOT a source. ToS bans automation; dealer
locator is login-gated. The MCP returns a curated fallback lesson
directing the user to pioneer.com.

Next phases:
- Phase 1: implement bayer_seeds (lift-and-shift from crop-chem-docs
  Bayer scraper; same __NEXT_DATA__ infra)
- Phase 7: curate eval/queries.jsonl
- Phase 11: lessons.md with Pioneer fallback + disease-scale notes

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-25 12:28:49 -04:00

264 lines
11 KiB
Python

"""MCP server skeleton — fill in PRODUCT_NAME and the tool bodies.
This file is the template's structural anchor. The phases described in
PLAN.md add or extend pieces of this file:
Phase 3 — search_docs, get_page, list_versions stubs (you are here)
Phase 6 — reranker integration in search_docs
Phase 8 — BM25 + hybrid retrieval (HYBRID_SEARCH env gate, _rrf_fuse)
Phase 9 — diff_versions, list_cluster, bundle_changelog
Phase 10 — TimedCall wiring (already imported below)
Phase 11 — <product>_api_lessons tool
Phase 12 — find_doc_inconsistencies, submit_doc_bug
Phase 13 — weekly_digest + _digest_history reader
Every stub below has a docstring + `raise NotImplementedError`. Replace
the body when you reach the corresponding phase. Keep the signatures
stable across products — clients depend on them.
"""
from __future__ import annotations
import json
import logging
import os
import re
from pathlib import Path
from typing import Annotated
from mcp.server.fastmcp import FastMCP
from pydantic import Field
from .usage import TimedCall
log = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Product-specific configuration. Set these for each new build.
# ---------------------------------------------------------------------------
PRODUCT_NAME = os.environ.get("PRODUCT_NAME", "crop_seed")
PRODUCT_DOCS_URL = os.environ.get("PRODUCT_DOCS_URL", "https://git.jpaul.io/justin/seed-mcp")
COLLECTION = f"{PRODUCT_NAME}_docs"
# Paths inside the deployed container (and matching layout locally for dev).
ROOT = Path(__file__).resolve().parent.parent
CORPUS = ROOT / "corpus"
CHROMA_DIR = ROOT / "chroma"
BM25_DB = Path(os.environ.get("BM25_DB", str(ROOT / "bm25" / f"{PRODUCT_NAME}_docs.db")))
BUNDLES_JSON = ROOT / "bundles.json"
# ---------------------------------------------------------------------------
# Feature flags (Phase 6 / 8 / 12 enable these as you ship each phase).
# ---------------------------------------------------------------------------
RERANK_URL = os.environ.get("RERANK_URL", "").rstrip("/") or None
RERANK_POOL = int(os.environ.get("RERANK_POOL", "50"))
RERANK_TIMEOUT = float(os.environ.get("RERANK_TIMEOUT", "30"))
HYBRID_SEARCH = os.environ.get("HYBRID_SEARCH", "").lower() in ("true", "1", "yes", "on")
RRF_K = int(os.environ.get("RRF_K", "60"))
DOC_BUG_SUBMIT_ENABLED = os.environ.get("DOC_BUG_SUBMIT_ENABLED", "").lower() in ("true", "1", "yes", "on")
DOC_BUG_API_URL = os.environ.get("DOC_BUG_API_URL", "") # product-specific endpoint
DOC_BUG_TIMEOUT = float(os.environ.get("DOC_BUG_TIMEOUT", "15"))
# ---------------------------------------------------------------------------
# FastMCP setup.
#
# stateless_http=True — every request creates an ephemeral session and
# discards it on return. Critical for production: clients don't get
# 404 storms when the container is recreated by Watchtower.
# ---------------------------------------------------------------------------
mcp = FastMCP(f"{PRODUCT_NAME}-docs", stateless_http=True)
# ---------------------------------------------------------------------------
# Lazy helpers — instantiate expensive things only when actually needed,
# so the server still starts when (e.g.) Ollama is briefly unreachable.
# ---------------------------------------------------------------------------
def _bundles() -> dict[str, dict]:
"""Cached load of bundles.json into a {slug: bundle_dict} mapping.
bundles.json is the product-specific catalog written by the Phase 1
scraper. See PLAN.md Phase 1 for the schema.
"""
if not BUNDLES_JSON.exists():
return {}
cat = json.loads(BUNDLES_JSON.read_text())
return {b["slug"]: b for b in cat}
def _build_where(version: str | None, platform: str | None, bundle_id: str | None) -> dict | None:
"""Translate filter args into a Chroma `where` clause."""
conds: list[dict] = []
if version:
conds.append({"version": version})
if platform:
conds.append({"platform": platform})
if bundle_id:
conds.append({"bundle_id": bundle_id})
if not conds:
return None
if len(conds) == 1:
return conds[0]
return {"$and": conds}
def _read_page(bundle_id: str, page_id: str) -> tuple[str, dict] | None:
"""Read a corpus page off disk. Returns (markdown_body, metadata_dict)."""
md_path = CORPUS / bundle_id / (page_id + ".md")
json_path = CORPUS / bundle_id / (page_id + ".json")
if not md_path.exists() or not json_path.exists():
return None
return md_path.read_text(), json.loads(json_path.read_text())
# ===========================================================================
# Tools
# ===========================================================================
@mcp.tool()
def search_docs(
query: Annotated[str, Field(description=f"Natural-language query about {PRODUCT_NAME}.")],
version: Annotated[
str | None,
Field(description="OPTIONAL version filter — restrict to one product version."),
] = None,
platform: Annotated[
str | None,
Field(description="OPTIONAL platform filter. Set to one of the platforms listed by list_versions(); omit for all platforms."),
] = None,
bundle_id: Annotated[
str | None,
Field(description="OPTIONAL bundle filter — pin to a specific doc bundle slug."),
] = None,
k: Annotated[int, Field(description="Number of results to return.", ge=1, le=50)] = 10,
) -> str:
"""Search the {product} docs corpus.
Returns the top-k most relevant chunks (with full source page URLs)
given a natural-language query. Optional filters narrow the search
to one version, one platform, or one bundle. Use list_versions()
first if you need to discover the available facet values.
Call this tool whenever the user asks anything that should be
answerable from the official product documentation.
"""
with TimedCall("search_docs", {
"query": query, "version": version, "platform": platform,
"bundle_id": bundle_id, "k": k,
}) as _call:
# TODO Phase 2-3: query Chroma collection (see rag/index.py for
# how it was built). Render the top-k chunks as markdown with
# source URLs.
# TODO Phase 6: optional reranker via _rerank() if RERANK_URL set.
# TODO Phase 8: hybrid retrieval if HYBRID_SEARCH=true — run
# dense + BM25 in parallel, RRF-fuse, hand merged pool to rerank.
_call.set(hits_returned=0)
raise NotImplementedError("Phase 2/3: implement Chroma query + rendering")
@mcp.tool()
def get_page(
bundle_id: Annotated[str, Field(description="Bundle slug.")],
page_id: Annotated[str, Field(description="Page filename within the bundle.")],
) -> str:
"""Return the full markdown for one page, plus a metadata header.
Use after search_docs surfaces a relevant page and the user (or you)
want the complete text — not just the matched chunks.
"""
with TimedCall("get_page", {"bundle_id": bundle_id, "page_id": page_id}) as _call:
data = _read_page(bundle_id, page_id)
if data is None:
_call.set(found=False)
return f"Page not found: {bundle_id}/{page_id}"
md, meta = data
_call.set(found=True, page_chars=len(md))
# TODO: add a metadata header (title, version, source URL) above
# the body. Product-specific shape.
return md
@mcp.tool()
def list_versions() -> str:
"""List the available version/platform facets across all bundles.
Use this to discover valid filter values for search_docs.
"""
with TimedCall("list_versions", {}) as _call:
cat = _bundles()
if not cat:
return "_(no bundles indexed yet — run the scraper + indexer)_"
versions = sorted({b.get("version") for b in cat.values() if b.get("version")})
platforms = sorted({b.get("platform") for b in cat.values() if b.get("platform")})
_call.set(versions=len(versions), platforms=len(platforms))
lines = [f"# Facets across {len(cat)} bundle(s)", ""]
if versions:
lines.append("## Versions"); lines.append("")
for v in versions: lines.append(f"- `{v}`")
lines.append("")
if platforms:
lines.append("## Platforms"); lines.append("")
for p in platforms: lines.append(f"- `{p}`")
return "\n".join(lines)
# ---------------------------------------------------------------------------
# Stubs for later phases — keep the signatures in this file so refactors
# don't lose the contracts. Implementations come per phase.
# ---------------------------------------------------------------------------
# @mcp.tool() # Phase 9
# def list_cluster(bundle_id: str, page_id: str) -> str: ...
# @mcp.tool() # Phase 9
# def diff_versions(bundle_id: str, page_id: str, against_bundle_id: str, context: int = 3) -> str: ...
# @mcp.tool() # Phase 9
# def bundle_changelog(bundle_id_new: str, bundle_id_old: str, min_churn: int = 5, max_changed: int = 50) -> str: ...
# @mcp.tool() # Phase 13
# def weekly_digest(days: int = 7, version: str | None = None, platform: str | None = None, ...) -> str: ...
# @mcp.tool() # Phase 9 (or 3 — useful early)
# def corpus_status() -> str: ...
# @mcp.tool() # Phase 11
# def myproduct_api_lessons(topic: str | None = None) -> str: ...
# @mcp.tool() # Phase 12
# def find_doc_inconsistencies(scope_query: str, ...) -> str: ...
# @mcp.tool() # Phase 12
# def submit_doc_bug(page_url: str, content: str, email: str | None = None, ...) -> str: ...
# ===========================================================================
# Entry point
# ===========================================================================
def main() -> None:
import argparse
p = argparse.ArgumentParser(description=f"{PRODUCT_NAME} docs MCP server")
p.add_argument("--transport", choices=["stdio", "streamable-http", "sse"],
default=os.environ.get("MCP_TRANSPORT", "stdio"))
p.add_argument("--host", default=os.environ.get("MCP_HOST", "0.0.0.0"))
p.add_argument("--port", type=int, default=int(os.environ.get("MCP_PORT", "8000")))
args = p.parse_args()
if args.transport == "stdio":
mcp.run()
else:
mcp.settings.host = args.host
mcp.settings.port = args.port
# DNS-rebinding protection defaults to localhost-only — disable for
# container-network DNS hostnames. See PLAN.md "Hosting" notes.
if os.environ.get("MCP_DISABLE_DNS_REBINDING_PROTECTION") in {"1", "true", "yes"}:
mcp.settings.transport_security.enable_dns_rebinding_protection = False
mcp.run(transport=args.transport)
if __name__ == "__main__":
main()