"""MCP server for the ppls-docs pesticide label corpus. Adapted from the docs-mcp-template (which targeted versioned software docs) for the EPA pesticide-labels domain: ``bundle_id`` → ``source``, ``page_id`` → ``source_key`` (slug for MFRs, EPA Reg No for EPA PPLS), and ``version``/``platform`` filters → product-class / registrant / signal-word filters. See ``scrape/README.md`` for the corpus schema. Phase progression in this file: Phase 3 — search_docs, get_page, list_versions, corpus_status (you are here) Phase 6 — reranker integration in search_docs Phase 8 — BM25 + hybrid retrieval (HYBRID_SEARCH env gate) Standard MCP tool names (search_docs / get_page / list_versions) are preserved so clients that expect a docs MCP shape still work; the docstrings make the labels-domain semantics explicit. """ from __future__ import annotations import json import logging import os import re from pathlib import Path from typing import Annotated from mcp.server.fastmcp import FastMCP from pydantic import Field from .usage import TimedCall log = logging.getLogger(__name__) # --------------------------------------------------------------------------- # Product configuration. # --------------------------------------------------------------------------- PRODUCT_NAME = os.environ.get("PRODUCT_NAME", "ppls") PRODUCT_DOCS_URL = os.environ.get( "PRODUCT_DOCS_URL", "https://ordspub.epa.gov/ords/pesticides/f?p=PPLS:1", ) COLLECTION = f"{PRODUCT_NAME}_docs" # Paths — corpus on (possibly) external storage, indexes always at repo root. REPO_ROOT = Path(__file__).resolve().parent.parent CORPUS_ROOT = Path(os.environ.get("PPLS_CORPUS_ROOT") or REPO_ROOT / "corpus") CHROMA_DIR = Path(os.environ.get("PPLS_CHROMA_DIR") or REPO_ROOT / "chroma") BM25_DB = Path(os.environ.get("BM25_DB", str(REPO_ROOT / "bm25" / f"{PRODUCT_NAME}_docs.db"))) SOURCES_JSON = REPO_ROOT / "sources.json" # --------------------------------------------------------------------------- # Feature flags (enabled in later phases). # --------------------------------------------------------------------------- RERANK_URL = os.environ.get("RERANK_URL", "").rstrip("/") or None RERANK_POOL = int(os.environ.get("RERANK_POOL", "50")) RERANK_TIMEOUT = float(os.environ.get("RERANK_TIMEOUT", "30")) HYBRID_SEARCH = os.environ.get("HYBRID_SEARCH", "").lower() in ("true", "1", "yes", "on") RRF_K = int(os.environ.get("RRF_K", "60")) # --------------------------------------------------------------------------- # FastMCP setup. # --------------------------------------------------------------------------- mcp = FastMCP(f"{PRODUCT_NAME}-docs", stateless_http=True) # --------------------------------------------------------------------------- # Lazy helpers. # --------------------------------------------------------------------------- _chroma_collection = None _sources_cache: dict[str, dict] | None = None def _sources() -> dict[str, dict]: """Load sources.json as {source_id: source_dict}.""" global _sources_cache if _sources_cache is not None: return _sources_cache if not SOURCES_JSON.exists(): _sources_cache = {} return _sources_cache try: items = json.loads(SOURCES_JSON.read_text(encoding="utf-8")) except (OSError, json.JSONDecodeError) as exc: log.warning("sources.json unreadable: %s", exc) items = [] _sources_cache = {s["id"]: s for s in items if "id" in s} return _sources_cache def _collection(): """Get the Chroma collection (lazy — only loads the embedder when first queried, so the server starts cleanly even if Ollama is briefly down).""" global _chroma_collection if _chroma_collection is not None: return _chroma_collection import chromadb from chromadb.config import Settings from rag.embeddings import embedding_function client = chromadb.PersistentClient( path=str(CHROMA_DIR), settings=Settings(anonymized_telemetry=False), ) _chroma_collection = client.get_collection( COLLECTION, embedding_function=embedding_function() ) return _chroma_collection def _build_where( source: str | None, product_class: str | None, registrant_contains: str | None, signal_word: str | None, epa_reg_no: str | None, ) -> dict | None: """Translate filter args into a Chroma `where` clause. Chroma's where supports exact-match per field (and $and/$or). For `registrant_contains` we can only do exact equality at the where level, so substring matching is applied post-query in Python. """ conds: list[dict] = [] if source: conds.append({"source": source}) if product_class: conds.append({"product_class": product_class}) if signal_word: conds.append({"signal_word": signal_word}) if epa_reg_no: conds.append({"epa_reg_no": epa_reg_no}) if not conds: return None if len(conds) == 1: return conds[0] return {"$and": conds} def _read_label(source: str, source_key: str) -> tuple[str, dict] | None: """Read a label off disk. Returns (markdown_body, metadata_dict) or None.""" md_path = CORPUS_ROOT / source / f"{source_key}.md" json_path = CORPUS_ROOT / source / f"{source_key}.json" if not md_path.exists() or not json_path.exists(): return None try: return md_path.read_text(encoding="utf-8"), json.loads( json_path.read_text(encoding="utf-8") ) except (OSError, json.JSONDecodeError): return None def _format_hit(doc: str, meta: dict, score: float) -> str: """Render one search hit as a markdown block.""" product = meta.get("product_name") or meta.get("source_key") or "(unknown)" reg = meta.get("epa_reg_no") or "—" registrant = meta.get("registrant") or "" actives = meta.get("active_ingredients") or "" pclass = meta.get("product_class") or "" signal = meta.get("signal_word") or "" section = meta.get("section") or "" source = meta.get("source") or "?" source_key = meta.get("source_key") or "?" label_url = meta.get("label_url") or "" header = ( f"### {product} (EPA Reg {reg}) · score={score:.3f}\n" f"- **Source:** `{source}/{source_key}`" + (f" · class: {pclass}" if pclass else "") + (f" · signal: {signal}" if signal else "") + (f" · section: {section}" if section else "") + "\n" + (f"- **Registrant:** {registrant}\n" if registrant else "") + (f"- **Active ingredients:** {actives}\n" if actives else "") + (f"- **Label PDF:** {label_url}\n" if label_url else "") ) return header + "\n" + doc.strip() + "\n" # =========================================================================== # Tools # =========================================================================== @mcp.tool() def search_docs( query: Annotated[ str, Field(description="Natural-language query about pesticide labels — " "products, crops, pests, application rates, REI/PHI, " "tank-mix restrictions, signal words, active ingredients."), ], source: Annotated[ str | None, Field(description="OPTIONAL source id to restrict the search (e.g. " "'bayer', 'epa_ppls'). Use list_versions() to discover " "available sources."), ] = None, product_class: Annotated[ str | None, Field(description="OPTIONAL product class filter: 'herbicide', " "'fungicide', 'insecticide', 'seed-treatment'. " "Often null for EPA PPLS records."), ] = None, registrant_contains: Annotated[ str | None, Field(description="OPTIONAL substring of the registrant company name " "(case-insensitive). Use to scope to a manufacturer " "(e.g., 'SYNGENTA', 'BAYER', 'CORTEVA')."), ] = None, signal_word: Annotated[ str | None, Field(description="OPTIONAL EPA signal word filter: 'Danger', 'Warning', " "'Caution', or 'No Signal Word'."), ] = None, epa_reg_no: Annotated[ str | None, Field(description="OPTIONAL exact EPA Registration Number (e.g. " "'524-591', '524-475-12345'). Narrows to chunks from " "just that registration."), ] = None, k: Annotated[int, Field(description="Number of results to return.", ge=1, le=50)] = 10, ) -> str: """Search the EPA / manufacturer pesticide-label corpus. Returns the top-k most relevant label chunks for a natural-language query. Each hit shows product name, EPA Reg No, registrant, signal word, active ingredients, and a link to the source PDF. Call this proactively whenever the user asks anything that should be answerable from a pesticide product label — application rates, target pests, target crops, re-entry intervals (REI), pre-harvest intervals (PHI), tank-mix restrictions, signal words, environmental hazards, storage requirements, etc. The corpus is scoped to US row crops (corn / soybeans / wheat). For products outside that scope, results will be empty or marginal. """ with TimedCall("search_docs", { "query": query, "source": source, "product_class": product_class, "registrant_contains": registrant_contains, "signal_word": signal_word, "epa_reg_no": epa_reg_no, "k": k, "hybrid": HYBRID_SEARCH, "rerank": bool(RERANK_URL), }) as _call: try: col = _collection() except Exception as exc: # noqa: BLE001 _call.set(hits_returned=0, error=str(exc)) return f"_(search backend unavailable: {exc})_" where = _build_where(source, product_class, registrant_contains, signal_word, epa_reg_no) # Over-fetch — we need a meaningful pool for fusion/reranking, # and registrant_contains filtering trims down post-query. pool = max(k * (5 if (HYBRID_SEARCH or RERANK_URL) else 2), k * (4 if registrant_contains else 2)) scored: list[tuple[str, dict, float]] = [] try: scored = _search_chunks(query, pool, where, registrant_contains) except Exception as exc: # noqa: BLE001 _call.set(hits_returned=0, error=str(exc)) return f"_(search failed: {exc})_" # Optionally rerank the pool (Phase 6) before truncating to k. if RERANK_URL and len(scored) > 1: try: scored = _rerank_pool(query, scored) except Exception as exc: # noqa: BLE001 log.warning("rerank failed (%s) — falling back to base order", exc) scored = scored[:k] _call.set(hits_returned=len(scored)) if not scored: return "_(no results — try broadening the query, dropping filters, or check list_versions() for valid sources/classes)_" mode = "hybrid-rrf+rerank" if (HYBRID_SEARCH and RERANK_URL) else \ "hybrid-rrf" if HYBRID_SEARCH else \ "dense+rerank" if RERANK_URL else "dense" out: list[str] = [ f"# Search results for {query!r} ({len(scored)} hits, mode={mode})", "", ] for doc, meta, score in scored: out.append(_format_hit(doc, meta, score)) return "\n".join(out) def _search_chunks( query: str, pool: int, where: dict | None, registrant_contains: str | None, ) -> list[tuple[str, dict, float]]: """Run dense (and optionally BM25-hybrid) chunk retrieval, return list of (doc_text, metadata, score) sorted by score descending. Filters by ``registrant_contains`` post-query.""" col = _collection() # --- dense (Chroma) ---------------------------------------------------- dense_res = col.query(query_texts=[query], n_results=pool, where=where) dense_ids = dense_res.get("ids", [[]])[0] dense_docs = dense_res.get("documents", [[]])[0] dense_metas = dense_res.get("metadatas", [[]])[0] dense_dists = dense_res.get("distances", [[]])[0] chunk_pool: dict[str, dict] = {} for cid, doc, meta, dist in zip(dense_ids, dense_docs, dense_metas, dense_dists): chunk_pool[cid] = { "doc": doc, "meta": meta or {}, "dense_sim": max(0.0, 1.0 - float(dist)), "dense_rank": None, "bm25_rank": None, } for rank, cid in enumerate(dense_ids, start=1): chunk_pool[cid]["dense_rank"] = rank # --- BM25 (Phase 8 hybrid) -------------------------------------------- if HYBRID_SEARCH: try: from rag.bm25 import BM25Index bm25 = BM25Index(BM25_DB) bm25_hits = bm25.query(query, n=pool) except Exception as exc: # noqa: BLE001 log.warning("bm25 query failed (%s) — dense-only this call", exc) bm25_hits = [] missing_ids = [cid for cid, _ in bm25_hits if cid not in chunk_pool] if missing_ids: got = col.get(ids=missing_ids, include=["documents", "metadatas"]) for cid, doc, meta in zip(got.get("ids", []), got.get("documents", []), got.get("metadatas", [])): chunk_pool[cid] = { "doc": doc, "meta": meta or {}, "dense_sim": 0.0, "dense_rank": None, "bm25_rank": None, } for rank, (cid, _bm25_score) in enumerate(bm25_hits, start=1): if cid in chunk_pool: chunk_pool[cid]["bm25_rank"] = rank # --- RRF fusion or dense-only score ----------------------------------- out: list[tuple[str, dict, float]] = [] for cid, info in chunk_pool.items(): meta = info["meta"] if registrant_contains: reg = (meta.get("registrant") or "").upper() if registrant_contains.upper() not in reg: continue if HYBRID_SEARCH: rrf = 0.0 if info["dense_rank"]: rrf += 1.0 / (RRF_K + info["dense_rank"]) if info["bm25_rank"]: rrf += 1.0 / (RRF_K + info["bm25_rank"]) score = rrf else: score = info["dense_sim"] out.append((info["doc"], meta, score)) out.sort(key=lambda x: -x[2]) return out def _rerank_pool( query: str, pool: list[tuple[str, dict, float]], ) -> list[tuple[str, dict, float]]: """Send (query, doc_text) pairs to a llama.cpp /v1/rerank endpoint and reorder by relevance score. Truncates docs to 2000 chars (the jina-reranker GGUF rejects the ENTIRE batch if any pair exceeds n_ctx_train=1024; full text still goes back to the user).""" import httpx docs_truncated = [d[:2000] for d, _meta, _s in pool[:RERANK_POOL]] if not docs_truncated: return pool r = httpx.post( f"{RERANK_URL}/v1/rerank", json={"query": query, "documents": docs_truncated}, timeout=RERANK_TIMEOUT, ) r.raise_for_status() data = r.json() results = data.get("results") or [] rescored: list[tuple[str, dict, float]] = [] for r_item in results: idx = r_item.get("index") score = r_item.get("relevance_score") or r_item.get("score") or 0.0 if isinstance(idx, int) and 0 <= idx < len(pool): doc, meta, _ = pool[idx] rescored.append((doc, meta, float(score))) rescored.sort(key=lambda x: -x[2]) # Anything in the original pool past RERANK_POOL stays at the tail # in original order (rare — we usually rerank the entire pool). seen = {id(item) for item in rescored} tail = [p for p in pool[RERANK_POOL:] if id(p) not in seen] return rescored + tail @mcp.tool() def get_page( source: Annotated[ str, Field(description="Source id (e.g. 'bayer', 'epa_ppls'). See " "list_versions()."), ], source_key: Annotated[ str, Field(description="Per-source primary key — a product slug for " "manufacturer sources ('warrant', 'huskie') or an " "EPA Reg No for EPA PPLS ('524-475')."), ], ) -> str: """Return the full markdown of one pesticide label, with metadata header. Use this after search_docs surfaces a relevant label and you (or the user) want the complete text — not just the matched chunks. Useful when answering nuanced questions about a specific product's directions, restrictions, or tank-mix table. """ with TimedCall("get_page", {"source": source, "source_key": source_key}) as _call: data = _read_label(source, source_key) if data is None: _call.set(found=False) return f"Label not found: {source}/{source_key}" md, meta = data _call.set(found=True, label_chars=len(md)) label = meta.get("label") or {} actives_list = [ a["name"] for a in (meta.get("active_ingredients") or []) if isinstance(a, dict) and a.get("name") ] header_lines = [ f"# {meta.get('product_name') or source_key}", "", f"- **EPA Reg No:** {meta.get('epa_reg_no') or '(unknown)'}", f"- **Source:** {source}/{source_key}", ] if meta.get("registrant"): header_lines.append(f"- **Registrant:** {meta['registrant']}") if meta.get("product_class"): header_lines.append(f"- **Product class:** {meta['product_class']}") if meta.get("signal_word"): header_lines.append(f"- **Signal word:** {meta['signal_word']}") if actives_list: header_lines.append(f"- **Active ingredients:** {', '.join(actives_list)}") if label.get("accepted_date"): header_lines.append(f"- **Label accepted:** {label['accepted_date']}") if label.get("url"): header_lines.append(f"- **Label PDF:** {label['url']}") header_lines.extend(["", "---", ""]) return "\n".join(header_lines) + md @mcp.tool() def list_versions() -> str: """List the available sources, product classes, and registrants in the corpus. Use this to discover valid filter values for search_docs. The corpus is scoped to US row-crop pesticide labels (corn / soybeans / wheat). Despite the name (preserved for MCP-client compatibility), this returns labels-domain facets — not software-version facets. """ with TimedCall("list_versions", {}) as _call: cat = _sources() # Source-level summary from sources.json lines: list[str] = ["# PPLS docs corpus"] # Live counts from Chroma (best-effort; the server should still # render a useful response if Chroma is unreachable) chunk_count = label_count = None try: col = _collection() chunk_count = col.count() except Exception: # noqa: BLE001 pass if CORPUS_ROOT.exists(): label_count = sum( 1 for p in CORPUS_ROOT.glob("*/*.json") if not p.name.startswith(".") ) if chunk_count is not None or label_count is not None: lines.append("") if label_count is not None: lines.append(f"- **Labels indexed:** {label_count:,}") if chunk_count is not None: lines.append(f"- **Chunks indexed:** {chunk_count:,}") if cat: lines.append("\n## Sources\n") for sid, s in sorted(cat.items()): title = s.get("title") or sid stype = s.get("type") or "" lines.append(f"- `{sid}` *({stype})* — {title}") if s.get("scope_filter"): lines.append(f" - scope: {s['scope_filter']}") else: lines.append("\n_(sources.json missing — corpus may not be initialized)_") # Per-source facets if Chroma is reachable try: col = _collection() # We can't enumerate distinct metadata values from Chroma directly; # walk a sample to discover them. ~50K sample is fine for our # ~200K-chunk corpus and keeps this tool fast. sample = col.get(limit=50000, include=["metadatas"]) metas = sample.get("metadatas") or [] classes = sorted({m.get("product_class") for m in metas if m.get("product_class")}) signals = sorted({m.get("signal_word") for m in metas if m.get("signal_word")}) registrants = sorted({m.get("registrant") for m in metas if m.get("registrant")}) _call.set(sources=len(cat), classes=len(classes), signals=len(signals), registrants=len(registrants)) if classes: lines.append("\n## Product classes\n") for c in classes: lines.append(f"- `{c}`") if signals: lines.append("\n## Signal words\n") for s in signals: lines.append(f"- `{s}`") if registrants: lines.append(f"\n## Registrants ({len(registrants)})\n") for r in registrants[:50]: lines.append(f"- {r}") if len(registrants) > 50: lines.append(f"- _(…{len(registrants)-50} more)_") except Exception as exc: # noqa: BLE001 log.debug("could not sample Chroma metadata: %s", exc) _call.set(sources=len(cat), classes=0, signals=0, registrants=0) return "\n".join(lines) @mcp.tool() def corpus_status() -> str: """Report counts + freshness of the indexed label corpus. Use to confirm the search backend is healthy, see how many labels are indexed, and check which sources are currently feeding the corpus. Cheap — no embedder call. """ with TimedCall("corpus_status", {}) as _call: lines: list[str] = ["# PPLS corpus status\n"] # On-disk corpus labels_by_source: dict[str, int] = {} if CORPUS_ROOT.exists(): for source_dir in sorted(CORPUS_ROOT.iterdir()): if not source_dir.is_dir() or source_dir.name.startswith("."): continue n = sum(1 for _ in source_dir.glob("*.json")) if n: labels_by_source[source_dir.name] = n else: lines.append(f"_(corpus root {CORPUS_ROOT} doesn't exist)_") _call.set(labels=0, chunks=0, sources=0) return "\n".join(lines) total_labels = sum(labels_by_source.values()) lines.append(f"- **Corpus root:** `{CORPUS_ROOT}`") lines.append(f"- **Total labels on disk:** {total_labels:,}") # Chroma try: col = _collection() chunks = col.count() lines.append(f"- **Chunks in Chroma:** {chunks:,}") lines.append(f"- **Chroma dir:** `{CHROMA_DIR}`") lines.append(f"- **Collection:** `{COLLECTION}`") except Exception as exc: # noqa: BLE001 chunks = 0 lines.append(f"- **Chroma:** _unavailable_ ({exc})") # BM25 if BM25_DB.exists(): lines.append(f"- **BM25 db:** `{BM25_DB}` ({BM25_DB.stat().st_size / 1024 / 1024:.0f} MB)") else: lines.append("- **BM25 db:** _not built_") if labels_by_source: lines.append("\n## Labels per source\n") for src, n in sorted(labels_by_source.items(), key=lambda kv: -kv[1]): lines.append(f"- `{src}`: {n:,} labels") # Active feature flags flags = [] if RERANK_URL: flags.append(f"RERANK_URL=`{RERANK_URL}`") if HYBRID_SEARCH: flags.append("HYBRID_SEARCH=on") if flags: lines.append("\n## Active feature flags\n") for f in flags: lines.append(f"- {f}") _call.set(labels=total_labels, chunks=chunks, sources=len(labels_by_source)) return "\n".join(lines) # --------------------------------------------------------------------------- # Stubs for later phases — keep the signatures in this file so refactors # don't lose the contracts. Implementations come per phase. # --------------------------------------------------------------------------- # @mcp.tool() # Phase 12 # def find_doc_inconsistencies(scope_query: str, ...) -> str: ... # @mcp.tool() # Phase 11 # def ppls_label_lessons(topic: str | None = None) -> str: ... # =========================================================================== # Entry point # =========================================================================== def main() -> None: import argparse p = argparse.ArgumentParser(description=f"{PRODUCT_NAME} docs MCP server") p.add_argument("--transport", choices=["stdio", "streamable-http", "sse"], default=os.environ.get("MCP_TRANSPORT", "stdio")) p.add_argument("--host", default=os.environ.get("MCP_HOST", "0.0.0.0")) p.add_argument("--port", type=int, default=int(os.environ.get("MCP_PORT", "8000"))) args = p.parse_args() logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(name)s %(message)s") if args.transport == "stdio": mcp.run() else: mcp.settings.host = args.host mcp.settings.port = args.port if os.environ.get("MCP_DISABLE_DNS_REBINDING_PROTECTION") in {"1", "true", "yes"}: mcp.settings.transport_security.enable_dns_rebinding_protection = False mcp.run(transport=args.transport) if __name__ == "__main__": main()