Files
crop-chem-docs/docs_mcp/server.py
T
justin 1a45280e45 rename: ppls-docs → crop-chem-docs
Repo/project rename to better reflect scope. PPLS is EPA's term for
their Pesticide Product Label System — accurate when the corpus was
EPA-only, narrow now that it also pulls from Bayer's own catalog
(and may expand to Syngenta/Corteva/BASF/FMC labels in the future).
crop-chem-docs scopes flexibly without acronyms to explain.

Renames:
- directory:           ppls-docs            → crop-chem-docs
- PRODUCT_NAME:        ppls                 → crop_chem
- Chroma collection:   ppls_docs            → crop_chem_docs  (in-place via .modify(), no re-embed)
- BM25 db:             bm25/ppls_docs.db    → bm25/crop_chem_docs.db
- MCP tool name:       ppls_api_lessons     → crop_chem_api_lessons
- FastMCP server name: ppls-docs            → crop-chem-docs
- Env vars:            PPLS_CORPUS_ROOT     → CORPUS_ROOT
                       PPLS_CHROMA_DIR      → CHROMA_DIR_OVERRIDE
- User-Agent:          ppls-docs-scraper    → crop-chem-docs-scraper

Preserved (intentional, correct):
- epa_ppls (source id) — refers specifically to EPA's PPLS database
- "EPA PPLS" mentions in regulatory text (lessons.md, server docstrings)
- PPLS_API_BASE / PPLS_PDF_BASE / PPLS_INDEX_URL_TEMPLATE in
  scrape/sources/epa_ppls.py — these point at EPA's actual endpoints

Memory entries get updated in a follow-up commit so the rename is
isolated.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-24 12:25:59 -04:00

732 lines
29 KiB
Python

"""MCP server for the crop-chem-docs pesticide label corpus.
Adapted from the docs-mcp-template (which targeted versioned software
docs) for the EPA pesticide-labels domain: ``bundle_id`` → ``source``,
``page_id`` → ``source_key`` (slug for MFRs, EPA Reg No for EPA PPLS),
and ``version``/``platform`` filters → product-class / registrant /
signal-word filters. See ``scrape/README.md`` for the corpus schema.
Phase progression in this file:
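Phase 3 — search_docs, get_page, list_versions, corpus_status
Phase 6 — reranker integration in search_docs (RERANK_URL env gate)
Phase 8 — BM25 + hybrid retrieval (HYBRID_SEARCH env gate)
Phase 11 — curated lessons tool (crop_chem_api_lessons)
Phase 12 — find_doc_inconsistencies (stub only, not implemented yet)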
Standard MCP tool names (search_docs / get_page / list_versions) are
preserved so clients that expect a docs MCP shape still work; the
docstrings make the labels-domain semantics explicit.
"""
from __future__ import annotations
import json
import logging
import os
import re
from pathlib import Path
from typing import Annotated
from mcp.server.fastmcp import FastMCP
from pydantic import Field
from .usage import TimedCall
log = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Product configuration.
#
# Everything in this section is read from the environment once, at import
# time; changing an env var afterwards does not affect a running server.
# ---------------------------------------------------------------------------
# Base name from which the Chroma collection name, the default BM25 db
# filename, and the FastMCP server name are derived.
PRODUCT_NAME = os.environ.get("PRODUCT_NAME", "crop_chem")
# Defaults to EPA's PPLS search page.
# NOTE(review): not referenced anywhere else in the visible part of this
# module — confirm it is still needed.
PRODUCT_DOCS_URL = os.environ.get(
"PRODUCT_DOCS_URL",
"https://ordspub.epa.gov/ords/pesticides/f?p=PPLS:1",
)
COLLECTION = f"{PRODUCT_NAME}_docs"
# Paths — corpus on (possibly) external storage; the indexes default to the
# repo root but can be relocated with CHROMA_DIR_OVERRIDE / BM25_DB.
# CORPUS_ROOT and CHROMA_DIR_OVERRIDE use `or`, so an empty value falls back
# to the default; BM25_DB uses a .get() default, so an empty value is taken
# literally.
REPO_ROOT = Path(__file__).resolve().parent.parent
CORPUS_ROOT = Path(os.environ.get("CORPUS_ROOT") or REPO_ROOT / "corpus")
CHROMA_DIR = Path(os.environ.get("CHROMA_DIR_OVERRIDE") or REPO_ROOT / "chroma")
BM25_DB = Path(os.environ.get("BM25_DB",
str(REPO_ROOT / "bm25" / f"{PRODUCT_NAME}_docs.db")))
SOURCES_JSON = REPO_ROOT / "sources.json"
# ---------------------------------------------------------------------------
# Feature flags (enabled in later phases).
#
# The int()/float() conversions below raise ValueError at import time if the
# corresponding env var is set to something non-numeric.
# ---------------------------------------------------------------------------
# Base URL of the reranker; trailing slashes are stripped and an empty value
# becomes None, which disables reranking.
RERANK_URL = os.environ.get("RERANK_URL", "").rstrip("/") or None
# Maximum number of candidates sent to the reranker in one request.
RERANK_POOL = int(os.environ.get("RERANK_POOL", "50"))
# Timeout passed to the reranker HTTP request.
RERANK_TIMEOUT = float(os.environ.get("RERANK_TIMEOUT", "30"))
# Accepts true/1/yes/on in any letter case; anything else means off.
HYBRID_SEARCH = os.environ.get("HYBRID_SEARCH", "").lower() in ("true", "1", "yes", "on")
# Constant added to each rank in reciprocal-rank fusion: 1 / (RRF_K + rank).
RRF_K = int(os.environ.get("RRF_K", "60"))
# ---------------------------------------------------------------------------
# FastMCP setup.
# ---------------------------------------------------------------------------
# PRODUCT_NAME is identifier-style (underscores) because it also names the
# Chroma collection and the BM25 file. The advertised server name is the
# hyphenated form, so the default "crop_chem" yields "crop-chem-docs" rather
# than the mixed "crop_chem-docs".
mcp = FastMCP(f"{PRODUCT_NAME.replace('_', '-')}-docs", stateless_http=True)
# ---------------------------------------------------------------------------
# Lazy helpers.
# ---------------------------------------------------------------------------
_chroma_collection = None
_sources_cache: dict[str, dict] | None = None
def _sources() -> dict[str, dict]:
"""Load sources.json as {source_id: source_dict}."""
global _sources_cache
if _sources_cache is not None:
return _sources_cache
if not SOURCES_JSON.exists():
_sources_cache = {}
return _sources_cache
try:
items = json.loads(SOURCES_JSON.read_text(encoding="utf-8"))
except (OSError, json.JSONDecodeError) as exc:
log.warning("sources.json unreadable: %s", exc)
items = []
_sources_cache = {s["id"]: s for s in items if "id" in s}
return _sources_cache
def _collection():
    """Return the process-wide Chroma collection, opening it on first use.

    chromadb and the embedding function are imported inside the function, so
    importing this module does not load them. The handle is cached only after
    ``get_collection`` succeeds; a failed attempt is retried on the next call.
    """
    global _chroma_collection
    if _chroma_collection is None:
        import chromadb
        from chromadb.config import Settings
        from rag.embeddings import embedding_function

        store = chromadb.PersistentClient(
            path=str(CHROMA_DIR),
            settings=Settings(anonymized_telemetry=False),
        )
        _chroma_collection = store.get_collection(
            COLLECTION, embedding_function=embedding_function()
        )
    return _chroma_collection
def _build_where(
source: str | None,
product_class: str | None,
registrant_contains: str | None,
signal_word: str | None,
epa_reg_no: str | None,
) -> dict | None:
"""Translate filter args into a Chroma `where` clause.
Chroma's where supports exact-match per field (and $and/$or). For
`registrant_contains` we can only do exact equality at the where level,
so substring matching is applied post-query in Python.
"""
conds: list[dict] = []
if source:
conds.append({"source": source})
if product_class:
conds.append({"product_class": product_class})
if signal_word:
conds.append({"signal_word": signal_word})
if epa_reg_no:
conds.append({"epa_reg_no": epa_reg_no})
if not conds:
return None
if len(conds) == 1:
return conds[0]
return {"$and": conds}
def _read_label(source: str, source_key: str) -> tuple[str, dict] | None:
"""Read a label off disk. Returns (markdown_body, metadata_dict) or None."""
md_path = CORPUS_ROOT / source / f"{source_key}.md"
json_path = CORPUS_ROOT / source / f"{source_key}.json"
if not md_path.exists() or not json_path.exists():
return None
try:
return md_path.read_text(encoding="utf-8"), json.loads(
json_path.read_text(encoding="utf-8")
)
except (OSError, json.JSONDecodeError):
return None
def _format_hit(doc: str, meta: dict, score: float) -> str:
    """Render one search hit as a markdown block.

    Layout: a ``###`` heading (product, EPA Reg No, score), a source line
    with optional class / signal / section tags, optional registrant /
    active-ingredient / label-PDF bullets, a blank line, then the stripped
    chunk text. Missing or empty metadata values are omitted or replaced by
    placeholders.
    """
    def value_of(key: str, fallback: str = ""):
        return meta.get(key) or fallback

    title = meta.get("product_name") or meta.get("source_key") or "(unknown)"
    heading = f"### {title} (EPA Reg {value_of('epa_reg_no', '—')}) · score={score:.3f}"

    source_line = f"- **Source:** `{value_of('source', '?')}/{value_of('source_key', '?')}`"
    inline_tags = (
        ("class", value_of("product_class")),
        ("signal", value_of("signal_word")),
        ("section", value_of("section")),
    )
    for tag, value in inline_tags:
        if value:
            source_line += f" · {tag}: {value}"

    rows = [heading, source_line]
    bullets = (
        ("Registrant", value_of("registrant")),
        ("Active ingredients", value_of("active_ingredients")),
        ("Label PDF", value_of("label_url")),
    )
    rows.extend(f"- **{label}:** {value}" for label, value in bullets if value)

    return "\n".join(rows) + "\n\n" + doc.strip() + "\n"
# ===========================================================================
# Tools
# ===========================================================================
@mcp.tool()
def search_docs(
    query: Annotated[
        str,
        Field(description="Natural-language query about pesticide labels — "
                          "products, crops, pests, application rates, REI/PHI, "
                          "tank-mix restrictions, signal words, active ingredients."),
    ],
    source: Annotated[
        str | None,
        Field(description="OPTIONAL source id to restrict the search (e.g. "
                          "'bayer', 'epa_ppls'). Use list_versions() to discover "
                          "available sources."),
    ] = None,
    product_class: Annotated[
        str | None,
        Field(description="OPTIONAL product class filter: 'herbicide', "
                          "'fungicide', 'insecticide', 'seed-treatment'. "
                          "Often null for EPA PPLS records."),
    ] = None,
    registrant_contains: Annotated[
        str | None,
        Field(description="OPTIONAL substring of the registrant company name "
                          "(case-insensitive). Use to scope to a manufacturer "
                          "(e.g., 'SYNGENTA', 'BAYER', 'CORTEVA')."),
    ] = None,
    signal_word: Annotated[
        str | None,
        Field(description="OPTIONAL EPA signal word filter: 'Danger', 'Warning', "
                          "'Caution', or 'No Signal Word'."),
    ] = None,
    epa_reg_no: Annotated[
        str | None,
        Field(description="OPTIONAL exact EPA Registration Number (e.g. "
                          "'524-591', '524-475-12345'). Narrows to chunks from "
                          "just that registration."),
    ] = None,
    k: Annotated[int, Field(description="Number of results to return.", ge=1, le=50)] = 10,
) -> str:
    """Search the EPA / manufacturer pesticide-label corpus.

    Returns the top-k most relevant label chunks for a natural-language
    query. Each hit shows product name, EPA Reg No, registrant, signal
    word, active ingredients, and a link to the source PDF.

    Call this proactively whenever the user asks anything that should
    be answerable from a pesticide product label — application rates,
    target pests, target crops, re-entry intervals (REI), pre-harvest
    intervals (PHI), tank-mix restrictions, signal words, environmental
    hazards, storage requirements, etc.

    The corpus is scoped to US row crops (corn / soybeans / wheat).
    For products outside that scope, results will be empty or marginal.
    """
    # Failures are reported as italic markdown strings rather than raised, so
    # the tool always returns text. The docstring above is left verbatim
    # (presumably it is what clients see as the tool description).
    usage_args = {
        "query": query, "source": source, "product_class": product_class,
        "registrant_contains": registrant_contains, "signal_word": signal_word,
        "epa_reg_no": epa_reg_no, "k": k,
        "hybrid": HYBRID_SEARCH, "rerank": bool(RERANK_URL),
    }
    with TimedCall("search_docs", usage_args) as _call:
        # Open the collection up front so an unavailable backend is reported
        # separately from a failed query.
        try:
            _collection()
        except Exception as exc:  # noqa: BLE001
            _call.set(hits_returned=0, error=str(exc))
            return f"_(search backend unavailable: {exc})_"

        where = _build_where(source, product_class, registrant_contains,
                             signal_word, epa_reg_no)

        # Over-fetch: fusion/reranking need a meaningful candidate pool, and
        # the registrant substring filter discards hits after the query.
        fusion_factor = 5 if (HYBRID_SEARCH or RERANK_URL) else 2
        filter_factor = 4 if registrant_contains else 2
        pool = max(k * fusion_factor, k * filter_factor)

        try:
            hits = _search_chunks(query, pool, where, registrant_contains)
        except Exception as exc:  # noqa: BLE001
            _call.set(hits_returned=0, error=str(exc))
            return f"_(search failed: {exc})_"

        # Phase 6: rerank the pool before truncating; on failure keep the
        # base ordering.
        if RERANK_URL and len(hits) > 1:
            try:
                hits = _rerank_pool(query, hits)
            except Exception as exc:  # noqa: BLE001
                log.warning("rerank failed (%s) — falling back to base order", exc)

        hits = hits[:k]
        _call.set(hits_returned=len(hits))
        if not hits:
            return "_(no results — try broadening the query, dropping filters, or check list_versions() for valid sources/classes)_"

        if HYBRID_SEARCH:
            mode = "hybrid-rrf+rerank" if RERANK_URL else "hybrid-rrf"
        else:
            mode = "dense+rerank" if RERANK_URL else "dense"

        rendered = [
            f"# Search results for {query!r} ({len(hits)} hits, mode={mode})",
            "",
        ]
        rendered.extend(_format_hit(doc, meta, score) for doc, meta, score in hits)
        return "\n".join(rendered)
def _search_chunks(
    query: str,
    pool: int,
    where: dict | None,
    registrant_contains: str | None,
) -> list[tuple[str, dict, float]]:
    """Retrieve and score candidate chunks for a query.

    Always runs a dense Chroma query. When ``HYBRID_SEARCH`` is on, a BM25
    query is run as well and the two rankings are fused with reciprocal-rank
    fusion: each chunk scores the sum of ``1 / (RRF_K + rank)`` over the
    rankings it appears in. Without hybrid search the score is the dense
    similarity.

    Args:
        query: Search text.
        pool: Number of candidates requested from each retriever.
        where: Chroma ``where`` clause with the exact-match filters, or None.
        registrant_contains: Case-insensitive substring the chunk's
            ``registrant`` metadata must contain; applied after retrieval.

    Returns:
        ``(doc_text, metadata, score)`` tuples sorted by score, best first.

    Raises:
        Whatever the Chroma calls raise. A BM25 failure is logged and that
        call degrades to dense-only ranking.
    """
    col = _collection()

    # --- dense (Chroma) ----------------------------------------------------
    dense_res = col.query(query_texts=[query], n_results=pool, where=where)
    # `or [[]]` also covers a key that is present but None/empty.
    dense_ids = (dense_res.get("ids") or [[]])[0]
    dense_docs = (dense_res.get("documents") or [[]])[0]
    dense_metas = (dense_res.get("metadatas") or [[]])[0]
    dense_dists = (dense_res.get("distances") or [[]])[0]

    chunk_pool: dict[str, dict] = {}
    ranked_dense = zip(dense_ids, dense_docs, dense_metas, dense_dists)
    for rank, (cid, doc, meta, dist) in enumerate(ranked_dense, start=1):
        chunk_pool[cid] = {
            "doc": doc, "meta": meta or {},
            # NOTE(review): treats 1 - distance as a similarity, which
            # presumes a cosine-style distance — confirm the collection's
            # metric.
            "dense_sim": max(0.0, 1.0 - float(dist)),
            "dense_rank": rank, "bm25_rank": None,
        }

    # --- BM25 (Phase 8 hybrid) --------------------------------------------
    if HYBRID_SEARCH:
        try:
            from rag.bm25 import BM25Index
            bm25 = BM25Index(BM25_DB)
            bm25_hits = bm25.query(query, n=pool)
        except Exception as exc:  # noqa: BLE001
            log.warning("bm25 query failed (%s) — dense-only this call", exc)
            bm25_hits = []

        missing_ids = [cid for cid, _ in bm25_hits if cid not in chunk_pool]
        if missing_ids:
            # The BM25 query above is not given the metadata filters, so
            # apply `where` while hydrating. Without it, BM25-only hits
            # would bypass the source / class / signal-word / reg-no filters
            # that the dense query honours.
            got = col.get(ids=missing_ids, where=where,
                          include=["documents", "metadatas"])
            hydrated = zip(got.get("ids") or [], got.get("documents") or [],
                           got.get("metadatas") or [])
            for cid, doc, meta in hydrated:
                chunk_pool[cid] = {
                    "doc": doc, "meta": meta or {},
                    "dense_sim": 0.0, "dense_rank": None, "bm25_rank": None,
                }

        # Hits filtered out during hydration are absent from the pool and
        # are skipped here.
        for rank, (cid, _bm25_score) in enumerate(bm25_hits, start=1):
            if cid in chunk_pool:
                chunk_pool[cid]["bm25_rank"] = rank

    # --- RRF fusion or dense-only score -----------------------------------
    needle = registrant_contains.upper() if registrant_contains else None
    out: list[tuple[str, dict, float]] = []
    for info in chunk_pool.values():
        meta = info["meta"]
        if needle is not None and needle not in (meta.get("registrant") or "").upper():
            continue
        if HYBRID_SEARCH:
            score = 0.0
            # Ranks start at 1, so a falsy rank means "not in that ranking".
            if info["dense_rank"]:
                score += 1.0 / (RRF_K + info["dense_rank"])
            if info["bm25_rank"]:
                score += 1.0 / (RRF_K + info["bm25_rank"])
        else:
            score = info["dense_sim"]
        out.append((info["doc"], meta, score))

    out.sort(key=lambda hit: -hit[2])
    return out
def _rerank_pool(
    query: str,
    pool: list[tuple[str, dict, float]],
) -> list[tuple[str, dict, float]]:
    """Reorder the head of *pool* with a llama.cpp ``/v1/rerank`` endpoint.

    Only the first ``RERANK_POOL`` hits are sent. Documents are truncated to
    2000 chars for the request (the jina-reranker GGUF rejects the ENTIRE
    batch if any pair exceeds n_ctx_train=1024); the full text is what goes
    back to the caller.

    Args:
        query: The user's search text.
        pool: ``(doc_text, metadata, score)`` hits in base order.

    Returns:
        Hits the reranker scored, best first and carrying the reranker's
        score; then any head hits it did not return, then the hits beyond
        ``RERANK_POOL`` — both in their original order with their original
        scores. No hit is ever dropped, so an empty or partial response
        leaves the base ordering intact instead of emptying the results.

    Raises:
        httpx.HTTPError: On transport failure or a non-2xx response.
    """
    import httpx

    head = pool[:RERANK_POOL]
    if not head:
        return pool

    resp = httpx.post(
        f"{RERANK_URL}/v1/rerank",
        json={"query": query, "documents": [doc[:2000] for doc, _meta, _s in head]},
        timeout=RERANK_TIMEOUT,
    )
    resp.raise_for_status()
    results = resp.json().get("results") or []

    rescored: list[tuple[str, dict, float]] = []
    scored_idx: set[int] = set()
    for item in results:
        idx = item.get("index")
        # Indices refer to the documents we sent, so bound by len(head).
        # bool is an int subclass and is excluded; repeated indices are
        # ignored.
        if (
            isinstance(idx, bool)
            or not isinstance(idx, int)
            or not 0 <= idx < len(head)
            or idx in scored_idx
        ):
            continue
        score = item.get("relevance_score") or item.get("score") or 0.0
        doc, meta, _base_score = head[idx]
        rescored.append((doc, meta, float(score)))
        scored_idx.add(idx)
    rescored.sort(key=lambda hit: -hit[2])

    unscored = [hit for i, hit in enumerate(head) if i not in scored_idx]
    return rescored + unscored + pool[len(head):]
@mcp.tool()
def get_page(
    source: Annotated[
        str,
        Field(description="Source id (e.g. 'bayer', 'epa_ppls'). See "
                          "list_versions()."),
    ],
    source_key: Annotated[
        str,
        Field(description="Per-source primary key — a product slug for "
                          "manufacturer sources ('warrant', 'huskie') or an "
                          "EPA Reg No for EPA PPLS ('524-475')."),
    ],
) -> str:
    """Return the full markdown of one pesticide label, with metadata header.

    Use this after search_docs surfaces a relevant label and you (or the
    user) want the complete text — not just the matched chunks. Useful
    when answering nuanced questions about a specific product's
    directions, restrictions, or tank-mix table.
    """
    with TimedCall("get_page", {"source": source, "source_key": source_key}) as _call:
        found = _read_label(source, source_key)
        if found is None:
            _call.set(found=False)
            return f"Label not found: {source}/{source_key}"

        md, meta = found
        _call.set(found=True, label_chars=len(md))

        label_info = meta.get("label") or {}
        # Only dict entries that carry a non-empty "name" are listed.
        active_names = [
            entry["name"]
            for entry in (meta.get("active_ingredients") or [])
            if isinstance(entry, dict) and entry.get("name")
        ]

        header = [
            f"# {meta.get('product_name') or source_key}",
            "",
            f"- **EPA Reg No:** {meta.get('epa_reg_no') or '(unknown)'}",
            f"- **Source:** {source}/{source_key}",
        ]
        # Optional bullets, emitted in this order and only when non-empty.
        optional_rows = (
            ("Registrant", meta.get("registrant")),
            ("Product class", meta.get("product_class")),
            ("Signal word", meta.get("signal_word")),
            ("Active ingredients", ", ".join(active_names)),
            ("Label accepted", label_info.get("accepted_date")),
            ("Label PDF", label_info.get("url")),
        )
        header.extend(
            f"- **{title}:** {value}" for title, value in optional_rows if value
        )
        # The horizontal rule separates the generated header from the label.
        header += ["", "---", ""]
        return "\n".join(header) + md
@mcp.tool()
def list_versions() -> str:
    """List the available sources, product classes, and registrants in the corpus.

    Use this to discover valid filter values for search_docs. The corpus
    is scoped to US row-crop pesticide labels (corn / soybeans / wheat).

    Despite the name (preserved for MCP-client compatibility), this
    returns labels-domain facets — not software-version facets.
    """
    with TimedCall("list_versions", {}) as _call:
        cat = _sources()
        lines: list[str] = ["# crop-chem-docs corpus"]

        # Live counts are best-effort: the response must still be useful
        # when Chroma is unreachable or the corpus directory is absent.
        try:
            chunk_count = _collection().count()
        except Exception:  # noqa: BLE001
            chunk_count = None

        label_count = None
        if CORPUS_ROOT.exists():
            label_count = len([
                path for path in CORPUS_ROOT.glob("*/*.json")
                if not path.name.startswith(".")
            ])

        if not (chunk_count is None and label_count is None):
            lines.append("")
        if label_count is not None:
            lines.append(f"- **Labels indexed:** {label_count:,}")
        if chunk_count is not None:
            lines.append(f"- **Chunks indexed:** {chunk_count:,}")

        # Source-level summary from sources.json.
        if not cat:
            lines.append("\n_(sources.json missing — corpus may not be initialized)_")
        else:
            lines.append("\n## Sources\n")
            for sid, entry in sorted(cat.items()):
                title = entry.get("title") or sid
                stype = entry.get("type") or ""
                lines.append(f"- `{sid}` *({stype})* — {title}")
                if entry.get("scope_filter"):
                    lines.append(f"  - scope: {entry['scope_filter']}")

        # Facets, if Chroma is reachable. Chroma cannot enumerate distinct
        # metadata values directly, so they are discovered from a sample
        # capped at 50,000 chunks; values outside the sample are not listed.
        try:
            sample = _collection().get(limit=50000, include=["metadatas"])
            metas = sample.get("metadatas") or []

            def distinct(field: str) -> list:
                return sorted({m.get(field) for m in metas if m.get(field)})

            classes = distinct("product_class")
            signals = distinct("signal_word")
            registrants = distinct("registrant")
            _call.set(sources=len(cat), classes=len(classes),
                      signals=len(signals), registrants=len(registrants))

            if classes:
                lines.append("\n## Product classes\n")
                lines.extend(f"- `{name}`" for name in classes)
            if signals:
                lines.append("\n## Signal words\n")
                lines.extend(f"- `{name}`" for name in signals)
            if registrants:
                lines.append(f"\n## Registrants ({len(registrants)})\n")
                # Only the first 50 are listed; the remainder is summarised.
                lines.extend(f"- {name}" for name in registrants[:50])
                if len(registrants) > 50:
                    lines.append(f"- _(…{len(registrants)-50} more)_")
        except Exception as exc:  # noqa: BLE001
            log.debug("could not sample Chroma metadata: %s", exc)
            _call.set(sources=len(cat), classes=0, signals=0, registrants=0)

        return "\n".join(lines)
@mcp.tool()
def corpus_status() -> str:
    """Report counts + freshness of the indexed label corpus.

    Use to confirm the search backend is healthy, see how many labels are
    indexed, and check which sources are currently feeding the corpus.
    Cheap — no embedder call.
    """
    with TimedCall("corpus_status", {}) as _call:
        # Heading uses the project name, matching list_versions(); "PPLS" is
        # only one of the sources.
        lines: list[str] = ["# crop-chem-docs corpus status\n"]

        if not CORPUS_ROOT.exists():
            lines.append(f"_(corpus root {CORPUS_ROOT} doesn't exist)_")
            _call.set(labels=0, chunks=0, sources=0)
            return "\n".join(lines)

        # On-disk corpus: one directory per source, one .json per label.
        # Dot-prefixed directories and files are skipped so the total agrees
        # with the "Labels indexed" count in list_versions().
        labels_by_source: dict[str, int] = {}
        for source_dir in sorted(CORPUS_ROOT.iterdir()):
            if not source_dir.is_dir() or source_dir.name.startswith("."):
                continue
            n = sum(
                1 for p in source_dir.glob("*.json")
                if not p.name.startswith(".")
            )
            if n:
                labels_by_source[source_dir.name] = n

        total_labels = sum(labels_by_source.values())
        lines.append(f"- **Corpus root:** `{CORPUS_ROOT}`")
        lines.append(f"- **Total labels on disk:** {total_labels:,}")

        # Chroma — an unreachable index is reported, not raised.
        try:
            col = _collection()
            chunks = col.count()
            lines.append(f"- **Chunks in Chroma:** {chunks:,}")
            lines.append(f"- **Chroma dir:** `{CHROMA_DIR}`")
            lines.append(f"- **Collection:** `{COLLECTION}`")
        except Exception as exc:  # noqa: BLE001
            chunks = 0
            lines.append(f"- **Chroma:** _unavailable_ ({exc})")

        # BM25
        if BM25_DB.exists():
            lines.append(f"- **BM25 db:** `{BM25_DB}` ({BM25_DB.stat().st_size / 1024 / 1024:.0f} MB)")
        else:
            lines.append("- **BM25 db:** _not built_")

        if labels_by_source:
            lines.append("\n## Labels per source\n")
            # Largest source first.
            for src, n in sorted(labels_by_source.items(), key=lambda kv: -kv[1]):
                lines.append(f"- `{src}`: {n:,} labels")

        # Active feature flags
        flags = []
        if RERANK_URL:
            flags.append(f"RERANK_URL=`{RERANK_URL}`")
        if HYBRID_SEARCH:
            flags.append("HYBRID_SEARCH=on")
        if flags:
            lines.append("\n## Active feature flags\n")
            lines.extend(f"- {flag}" for flag in flags)

        _call.set(labels=total_labels, chunks=chunks, sources=len(labels_by_source))
        return "\n".join(lines)
# ---------------------------------------------------------------------------
# Phase 11 — Curated agronomy / label-handling knowledge
# ---------------------------------------------------------------------------
LESSONS_PATH = Path(__file__).resolve().parent / "lessons.md"
_lessons_cache: tuple[str, list[tuple[str, str]]] | None = None # (full, sections)
def _load_lessons() -> tuple[str, list[tuple[str, str]]]:
"""Read lessons.md once, split into (topic_slug, body) sections."""
global _lessons_cache
if _lessons_cache is not None:
return _lessons_cache
if not LESSONS_PATH.exists():
_lessons_cache = ("", [])
return _lessons_cache
full = LESSONS_PATH.read_text(encoding="utf-8")
sections: list[tuple[str, str]] = []
# Split on lines like "## Topic: <slug>" (case-sensitive marker)
parts = re.split(r"^## Topic:\s+(\S+)\s*$", full, flags=re.MULTILINE)
# parts = [preamble, slug1, body1, slug2, body2, ...]
for i in range(1, len(parts), 2):
slug = parts[i].strip()
body = parts[i + 1].strip() if i + 1 < len(parts) else ""
sections.append((slug, body))
_lessons_cache = (full, sections)
return _lessons_cache
@mcp.tool()
def crop_chem_api_lessons(
    topic: Annotated[
        str | None,
        Field(description="OPTIONAL: topic slug or substring (e.g., "
                          "'rup-handling', 'dicamba', 'rei'). Omit to get "
                          "the full table of contents."),
    ] = None,
) -> str:
    """Surface curated agronomy + label-handling knowledge that supplements
    the raw label corpus.

    **Call this proactively whenever you're about to give a pesticide
    recommendation from search_docs results.** The lessons cover:
    EPA signal words, REI/PHI fundamentals, RUP handling, 2(ee)/24(c)
    supplemental labels, tank-mix and resistance-management
    fundamentals (HRAC/FRAC/IRAC groups), product-specific application
    rules (glufosinate, dicamba), Lake Erie watershed considerations
    for Ohio, SCN context for soybean, drift management, and the
    canonical recommendation format the farmer expects.

    Without these lessons, your recommendations risk being technically
    correct but missing the regulatory framing, resistance group
    callouts, RUP applicator requirements, or off-target-damage
    warnings that make them actionable. Call this first; cite specific
    lessons in your response.
    """
    with TimedCall("crop_chem_api_lessons", {"topic": topic}) as _call:
        _full, sections = _load_lessons()

        # No sections: lessons.md is absent or has no "## Topic:" markers.
        if not sections:
            _call.set(sections=0)
            return "_(lessons.md not found — Phase 11 knowledge layer not populated)_"

        # No topic requested: one table-of-contents line per section.
        if not topic:
            _call.set(sections=len(sections), returned="toc")
            toc = [
                "# Crop-Chem API lessons — table of contents",
                "",
                "Call `crop_chem_api_lessons(topic='<slug>')` to fetch a specific section.",
                "",
            ]
            for slug, body in sections:
                # Headline = first non-blank line that is not a list item,
                # table row or code fence, capped at 140 characters.
                stripped_lines = (raw.strip() for raw in body.splitlines())
                headline = next(
                    (
                        text[:140]
                        for text in stripped_lines
                        if text and not text.startswith(("-", "*", "|", "```"))
                    ),
                    "",
                )
                toc.append(f"- **`{slug}`** — {headline}")
            return "\n".join(toc)

        # Case-insensitive substring match against the slugs; every matching
        # section is returned.
        wanted = topic.lower()
        hits = [(slug, body) for slug, body in sections if wanted in slug.lower()]
        if not hits:
            _call.set(sections=0, returned="no-match")
            available = ", ".join(f"`{slug}`" for slug, _body in sections)
            return (f"_(no lesson matched topic `{topic}`. Available: {available})_")

        _call.set(sections=len(hits), returned="match")
        return "\n\n---\n\n".join(
            f"## Topic: {slug}\n\n{body}" for slug, body in hits
        )
# ---------------------------------------------------------------------------
# Stubs for later phases
# ---------------------------------------------------------------------------
# @mcp.tool() # Phase 12
# def find_doc_inconsistencies(scope_query: str, ...) -> str: ...
# ===========================================================================
# Entry point
# ===========================================================================
def main() -> None:
    """Parse CLI options and run the MCP server.

    Every option defaults to an environment variable (MCP_TRANSPORT,
    MCP_HOST, MCP_PORT) and then to a built-in value. ``stdio`` runs with the
    server's defaults; the HTTP transports first apply host and port, and
    optionally switch off DNS-rebinding protection.
    """
    import argparse

    env = os.environ
    parser = argparse.ArgumentParser(description=f"{PRODUCT_NAME} docs MCP server")
    parser.add_argument(
        "--transport",
        choices=["stdio", "streamable-http", "sse"],
        default=env.get("MCP_TRANSPORT", "stdio"),
    )
    parser.add_argument("--host", default=env.get("MCP_HOST", "0.0.0.0"))
    parser.add_argument("--port", type=int, default=int(env.get("MCP_PORT", "8000")))
    args = parser.parse_args()

    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s %(levelname)s %(name)s %(message)s",
    )

    if args.transport == "stdio":
        mcp.run()
        return

    mcp.settings.host = args.host
    mcp.settings.port = args.port
    # Opt-out is exact-match and case-sensitive: only "1", "true" or "yes".
    if env.get("MCP_DISABLE_DNS_REBINDING_PROTECTION") in {"1", "true", "yes"}:
        mcp.settings.transport_security.enable_dns_rebinding_protection = False
    mcp.run(transport=args.transport)


if __name__ == "__main__":
    main()