97a2a05b24
Adapt docs_mcp/server.py from versioned-software-docs domain to
pesticide-labels domain. Standard MCP tool names preserved
(search_docs / get_page / list_versions) so existing MCP clients
(Claude Desktop, Cursor) still pick them up; docstrings + argument
shape are labels-domain.
Tools shipped:
- search_docs(query, source, product_class, registrant_contains,
signal_word, epa_reg_no, k) — dense Chroma query with optional
filters, post-filtered for registrant substring. Returns top-k
chunks rendered as markdown with product / reg / registrant /
actives / signal / section / label-PDF URL.
- get_page(source, source_key) — full label markdown + metadata
header. source_key is slug for MFR sources, EPA Reg No for EPA PPLS.
- list_versions() — discovers facet values: sources, product
classes, signal words, registrants (samples up to 50K chunks
from Chroma to enumerate distinct metadata values).
- corpus_status() — fast no-embedder counts: labels on disk per
source, chunks in Chroma, BM25 db size, active feature flags.
Wiring:
- Reads PPLS_CORPUS_ROOT + PPLS_CHROMA_DIR (matches the scrapers
and indexer).
- Uses sources.json (not the template's bundles.json).
- Lazy Chroma init so the server starts cleanly even when Ollama
is briefly down (e.g. during HVM corpus rebuilds).
- Phase 6 reranker + Phase 8 hybrid hooks left as feature flags
(RERANK_URL, HYBRID_SEARCH) — fail open to dense-only when unset.
Smoke test against the live 216K-chunk corpus:
- corpus_status: 4,157 labels / 216,467 chunks / 416 MB BM25 ✓
- search_docs("waterhemp control on soybeans", k=2) returns
Tackle Herbicide (FMC, 279-3564, glyph+imazethapyr) and
R14640 Herbicide (Bayer, 524-724, glyph) with section context
(ROUNDUP READY SOYBEANS / SOYBEAN) and dist-derived scores
of 0.76 each — highly relevant.
Run as stdio for Claude Desktop:
PPLS_CORPUS_ROOT=/run/media/justin/USB/ppls-corpus \
OLLAMA_URL=http://gpu1:11434,http://gpu2:11434 \
PRODUCT_NAME=ppls \
python -m docs_mcp.server --transport stdio
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
535 lines
21 KiB
Python
535 lines
21 KiB
Python
"""MCP server for the ppls-docs pesticide label corpus.

Adapted from the docs-mcp-template (which targeted versioned software
docs) for the EPA pesticide-labels domain: ``bundle_id`` → ``source``,
``page_id`` → ``source_key`` (slug for MFRs, EPA Reg No for EPA PPLS),
and ``version``/``platform`` filters → product-class / registrant /
signal-word filters. See ``scrape/README.md`` for the corpus schema.

Phase progression in this file:
  Phase 3 — search_docs, get_page, list_versions, corpus_status (you are here)
  Phase 6 — reranker integration in search_docs
  Phase 8 — BM25 + hybrid retrieval (HYBRID_SEARCH env gate)

Standard MCP tool names (search_docs / get_page / list_versions) are
preserved so clients that expect a docs MCP shape still work; the
docstrings make the labels-domain semantics explicit.
"""
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
import logging
|
|
import os
|
|
import re
|
|
from pathlib import Path
|
|
from typing import Annotated
|
|
|
|
from mcp.server.fastmcp import FastMCP
|
|
from pydantic import Field
|
|
|
|
from .usage import TimedCall
|
|
|
|
# Module-level logger, named after this module.
log = logging.getLogger(__name__)
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Product configuration.
|
|
# ---------------------------------------------------------------------------
|
|
# Short product id; it names the Chroma collection, the default BM25 file and
# the MCP server.
PRODUCT_NAME = os.environ.get("PRODUCT_NAME", "ppls")
PRODUCT_DOCS_URL = os.environ.get(
    "PRODUCT_DOCS_URL",
    "https://ordspub.epa.gov/ords/pesticides/f?p=PPLS:1",
)
COLLECTION = f"{PRODUCT_NAME}_docs"

# Paths — corpus on (possibly) external storage; indexes default to the repo
# root. Every override treats an unset *or empty* environment variable as
# "use the default", so e.g. BM25_DB="" can no longer resolve to Path(".").
REPO_ROOT = Path(__file__).resolve().parent.parent
CORPUS_ROOT = Path(os.environ.get("PPLS_CORPUS_ROOT") or REPO_ROOT / "corpus")
CHROMA_DIR = Path(os.environ.get("PPLS_CHROMA_DIR") or REPO_ROOT / "chroma")
BM25_DB = Path(
    os.environ.get("BM25_DB") or REPO_ROOT / "bm25" / f"{PRODUCT_NAME}_docs.db"
)
SOURCES_JSON = REPO_ROOT / "sources.json"
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Feature flags (enabled in later phases).
|
|
# ---------------------------------------------------------------------------
|
|
# Reranker endpoint with any trailing "/" removed; None when unset or blank.
RERANK_URL = os.getenv("RERANK_URL", "").rstrip("/") or None
# NOTE(review): presumably the candidate pool size and request timeout for the
# reranker — neither is read anywhere in the visible code yet; confirm.
RERANK_POOL = int(os.getenv("RERANK_POOL", "50"))
RERANK_TIMEOUT = float(os.getenv("RERANK_TIMEOUT", "30"))

# Truthy spellings are matched case-insensitively; anything else means off.
HYBRID_SEARCH = os.getenv("HYBRID_SEARCH", "").lower() in {"1", "on", "true", "yes"}
# NOTE(review): presumably the rank-fusion constant for hybrid search — confirm.
RRF_K = int(os.getenv("RRF_K", "60"))
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# FastMCP setup.
|
|
# ---------------------------------------------------------------------------
|
|
# Server instance that the @mcp.tool() functions below register on; its name
# is derived from PRODUCT_NAME.
# NOTE(review): stateless_http=True presumably only matters for the HTTP
# transports — confirm against the FastMCP documentation.
mcp = FastMCP(f"{PRODUCT_NAME}-docs", stateless_http=True)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Lazy helpers.
|
|
# ---------------------------------------------------------------------------
|
|
|
|
# Cached Chroma collection handle; stays None until _collection() first succeeds.
_chroma_collection = None
|
|
# Parsed sources.json keyed by source id; None until _sources() first runs.
_sources_cache: dict[str, dict] | None = None
|
|
|
|
|
|
def _sources() -> dict[str, dict]:
    """Load sources.json as {source_id: source_dict}.

    The result is cached in the module-level ``_sources_cache``, so the file
    is read at most once per process (an empty result is cached too).

    Returns:
        Mapping of each entry's ``"id"`` to the entry itself. Empty when the
        file is missing, unreadable, not valid UTF-8/JSON, or not a JSON list.
        Entries that are not objects or that lack an ``"id"`` are skipped.
    """
    global _sources_cache
    if _sources_cache is not None:
        return _sources_cache
    if not SOURCES_JSON.exists():
        _sources_cache = {}
        return _sources_cache
    try:
        items = json.loads(SOURCES_JSON.read_text(encoding="utf-8"))
    except (OSError, ValueError) as exc:
        # ValueError covers both json.JSONDecodeError and UnicodeDecodeError.
        log.warning("sources.json unreadable: %s", exc)
        items = []
    if not isinstance(items, list):
        # Iterating a dict or string here would index into str objects and
        # raise TypeError, so reject any non-list top level explicitly.
        log.warning(
            "sources.json: expected a JSON list of sources, got %s",
            type(items).__name__,
        )
        items = []
    _sources_cache = {
        s["id"]: s for s in items if isinstance(s, dict) and "id" in s
    }
    return _sources_cache
|
|
|
|
|
|
def _collection():
    """Return the Chroma collection, opening it on first use.

    The heavy imports and the embedder construction are deferred to the first
    call, so importing this module does not require Chroma or the embedding
    backend to be available. A successfully opened handle is cached in the
    module-level ``_chroma_collection``; a failed attempt caches nothing, so
    the next call tries again.
    """
    global _chroma_collection
    if _chroma_collection is None:
        import chromadb
        from chromadb.config import Settings
        from rag.embeddings import embedding_function

        telemetry_off = Settings(anonymized_telemetry=False)
        store = chromadb.PersistentClient(
            path=str(CHROMA_DIR), settings=telemetry_off
        )
        embedder = embedding_function()
        _chroma_collection = store.get_collection(
            COLLECTION, embedding_function=embedder
        )
    return _chroma_collection
|
|
|
|
|
|
def _build_where(
    source: str | None,
    product_class: str | None,
    registrant_contains: str | None,
    signal_word: str | None,
    epa_reg_no: str | None,
) -> dict | None:
    """Build the Chroma ``where`` filter for the exact-match search arguments.

    Each truthy argument becomes one ``{field: value}`` equality clause.
    ``registrant_contains`` is accepted but deliberately produces no clause:
    the where level only offers equality for it, so the substring match is
    applied by the caller after the query.

    Returns:
        ``None`` when no filter applies, the bare clause when exactly one
        does, otherwise ``{"$and": [clauses...]}`` in argument order.
    """
    candidates = (
        ("source", source),
        ("product_class", product_class),
        ("signal_word", signal_word),
        ("epa_reg_no", epa_reg_no),
    )
    clauses: list[dict] = [{field: value} for field, value in candidates if value]
    if len(clauses) > 1:
        return {"$and": clauses}
    return clauses[0] if clauses else None
|
|
|
|
|
|
def _read_label(source: str, source_key: str) -> tuple[str, dict] | None:
    """Read a label off disk. Returns (markdown_body, metadata_dict) or None.

    The label is the pair ``<CORPUS_ROOT>/<source>/<source_key>.md`` and
    ``.json``. Both arguments arrive from MCP tool calls, so the joined paths
    are normalised and must stay inside ``CORPUS_ROOT``; ``..`` segments or an
    absolute ``source`` are treated as "not found" rather than read.

    Returns:
        ``(markdown, metadata)`` on success. ``None`` when the path escapes
        the corpus root, either file is missing or unreadable, the content is
        not valid UTF-8/JSON, or the metadata is not a JSON object.
    """
    # Lexical normalisation (no symlink resolution), so a corpus that is
    # itself reached through symlinks keeps working.
    root = Path(os.path.normpath(CORPUS_ROOT))
    md_path = Path(os.path.normpath(CORPUS_ROOT / source / f"{source_key}.md"))
    json_path = Path(os.path.normpath(CORPUS_ROOT / source / f"{source_key}.json"))
    if root not in md_path.parents or root not in json_path.parents:
        return None
    if not md_path.exists() or not json_path.exists():
        return None
    try:
        body = md_path.read_text(encoding="utf-8")
        meta = json.loads(json_path.read_text(encoding="utf-8"))
    except (OSError, ValueError):
        # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
        return None
    if not isinstance(meta, dict):
        # Callers use meta.get(...); any other JSON shape is unusable.
        return None
    return body, meta
|
|
|
|
|
|
def _format_hit(doc: str, meta: dict, score: float) -> str:
    """Format a single search hit as a markdown section.

    The output is a ``###`` heading (product, EPA Reg No, score to three
    decimals), a source line with optional class / signal / section tags,
    optional registrant / active-ingredient / label-PDF bullets, a blank
    line, and finally the stripped chunk text. Missing or falsy metadata
    values fall back to placeholders or are omitted.
    """
    title = meta.get("product_name") or meta.get("source_key") or "(unknown)"
    reg_no = meta.get("epa_reg_no") or "—"
    origin = meta.get("source") or "?"
    key = meta.get("source_key") or "?"

    pieces = [
        f"### {title} (EPA Reg {reg_no}) · score={score:.3f}\n",
        f"- **Source:** `{origin}/{key}`",
    ]

    # Inline tags appended to the source line, in fixed order.
    for tag, field in (
        ("class", "product_class"),
        ("signal", "signal_word"),
        ("section", "section"),
    ):
        value = meta.get(field) or ""
        if value:
            pieces.append(f" · {tag}: {value}")
    pieces.append("\n")

    # Optional bullet rows, each on its own line.
    for caption, field in (
        ("Registrant", "registrant"),
        ("Active ingredients", "active_ingredients"),
        ("Label PDF", "label_url"),
    ):
        value = meta.get(field) or ""
        if value:
            pieces.append(f"- **{caption}:** {value}\n")

    pieces.extend(["\n", doc.strip(), "\n"])
    return "".join(pieces)
|
|
|
|
|
|
# ===========================================================================
|
|
# Tools
|
|
# ===========================================================================
|
|
|
|
@mcp.tool()
def search_docs(
    query: Annotated[
        str,
        Field(description="Natural-language query about pesticide labels — "
                          "products, crops, pests, application rates, REI/PHI, "
                          "tank-mix restrictions, signal words, active ingredients."),
    ],
    source: Annotated[
        str | None,
        Field(description="OPTIONAL source id to restrict the search (e.g. "
                          "'bayer', 'epa_ppls'). Use list_versions() to discover "
                          "available sources."),
    ] = None,
    product_class: Annotated[
        str | None,
        Field(description="OPTIONAL product class filter: 'herbicide', "
                          "'fungicide', 'insecticide', 'seed-treatment'. "
                          "Often null for EPA PPLS records."),
    ] = None,
    registrant_contains: Annotated[
        str | None,
        Field(description="OPTIONAL substring of the registrant company name "
                          "(case-insensitive). Use to scope to a manufacturer "
                          "(e.g., 'SYNGENTA', 'BAYER', 'CORTEVA')."),
    ] = None,
    signal_word: Annotated[
        str | None,
        Field(description="OPTIONAL EPA signal word filter: 'Danger', 'Warning', "
                          "'Caution', or 'No Signal Word'."),
    ] = None,
    epa_reg_no: Annotated[
        str | None,
        Field(description="OPTIONAL exact EPA Registration Number (e.g. "
                          "'524-591', '524-475-12345'). Narrows to chunks from "
                          "just that registration."),
    ] = None,
    k: Annotated[int, Field(description="Number of results to return.", ge=1, le=50)] = 10,
) -> str:
    """Search the EPA / manufacturer pesticide-label corpus.

    Returns the top-k most relevant label chunks for a natural-language
    query. Each hit shows product name, EPA Reg No, registrant, signal
    word, active ingredients, and a link to the source PDF.

    Call this proactively whenever the user asks anything that should
    be answerable from a pesticide product label — application rates,
    target pests, target crops, re-entry intervals (REI), pre-harvest
    intervals (PHI), tank-mix restrictions, signal words, environmental
    hazards, storage requirements, etc.

    The corpus is scoped to US row crops (corn / soybeans / wheat).
    For products outside that scope, results will be empty or marginal.
    """
    # The docstring above is left untouched because it is presumably published
    # to MCP clients as the tool description. Backend failures are returned as
    # italic markdown notes rather than raised.
    with TimedCall("search_docs", {
        "query": query, "source": source, "product_class": product_class,
        "registrant_contains": registrant_contains, "signal_word": signal_word,
        "epa_reg_no": epa_reg_no, "k": k,
    }) as _call:
        try:
            col = _collection()
        except Exception as exc:  # noqa: BLE001
            _call.set(hits_returned=0, error=str(exc))
            return f"_(search backend unavailable: {exc})_"

        where = _build_where(source, product_class, registrant_contains,
                             signal_word, epa_reg_no)
        # Over-fetch when we'll post-filter on registrant substring, so we
        # still have ~k matches after the filter trims.
        n_fetch = k * 4 if registrant_contains else k
        try:
            res = col.query(query_texts=[query], n_results=n_fetch, where=where)
        except Exception as exc:  # noqa: BLE001
            _call.set(hits_returned=0, error=str(exc))
            return f"_(search failed: {exc})_"

        # A result column may be absent, None or empty; treat all as no hits.
        docs = (res.get("documents") or [[]])[0] or []
        metas = (res.get("metadatas") or [[]])[0] or []
        dists = (res.get("distances") or [[]])[0] or []

        # Upper-case the needle once instead of on every loop iteration.
        needle = registrant_contains.upper() if registrant_contains else None

        # Cosine distance → similarity score (1 - d), clipped to [0, 1] for
        # display (a slightly negative distance must not show as > 1).
        scored: list[tuple[str, dict, float]] = []
        for doc, meta, dist in zip(docs, metas, dists):
            meta = meta or {}  # chunks stored without metadata come back as None
            if needle is not None:
                reg = (meta.get("registrant") or "").upper()
                if needle not in reg:
                    continue
            score = min(1.0, max(0.0, 1.0 - float(dist)))
            scored.append((doc or "", meta, score))
            if len(scored) >= k:
                break

        _call.set(hits_returned=len(scored))
        if not scored:
            return "_(no results — try broadening the query, dropping filters, or check list_versions() for valid sources/classes)_"

        out: list[str] = [
            f"# Search results for {query!r} ({len(scored)} of top-{n_fetch} dense hits)",
            "",
        ]
        out.extend(_format_hit(doc, meta, score) for doc, meta, score in scored)
        return "\n".join(out)
|
|
|
|
|
|
@mcp.tool()
def get_page(
    source: Annotated[
        str,
        Field(description="Source id (e.g. 'bayer', 'epa_ppls'). See "
                          "list_versions()."),
    ],
    source_key: Annotated[
        str,
        Field(description="Per-source primary key — a product slug for "
                          "manufacturer sources ('warrant', 'huskie') or an "
                          "EPA Reg No for EPA PPLS ('524-475')."),
    ],
) -> str:
    """Return the full markdown of one pesticide label, with metadata header.

    Use this after search_docs surfaces a relevant label and you (or the
    user) want the complete text — not just the matched chunks. Useful
    when answering nuanced questions about a specific product's
    directions, restrictions, or tank-mix table.
    """
    # The docstring above is left untouched because it is presumably published
    # to MCP clients as the tool description.
    with TimedCall("get_page", {"source": source, "source_key": source_key}) as _call:
        loaded = _read_label(source, source_key)
        if loaded is None:
            _call.set(found=False)
            return f"Label not found: {source}/{source_key}"

        body, info = loaded
        _call.set(found=True, label_chars=len(body))

        label_info = info.get("label") or {}
        # Only dict entries with a non-empty "name" contribute an ingredient.
        ingredient_names = []
        for entry in info.get("active_ingredients") or []:
            if isinstance(entry, dict) and entry.get("name"):
                ingredient_names.append(entry["name"])

        # Optional header bullets, in display order; falsy values are omitted.
        optional_rows = (
            ("Registrant", info.get("registrant")),
            ("Product class", info.get("product_class")),
            ("Signal word", info.get("signal_word")),
            ("Active ingredients", ", ".join(ingredient_names)),
            ("Label accepted", label_info.get("accepted_date")),
            ("Label PDF", label_info.get("url")),
        )

        parts = [
            f"# {info.get('product_name') or source_key}",
            "",
            f"- **EPA Reg No:** {info.get('epa_reg_no') or '(unknown)'}",
            f"- **Source:** {source}/{source_key}",
        ]
        parts += [
            f"- **{caption}:** {value}" for caption, value in optional_rows if value
        ]
        # The trailing "" makes the join end in a newline, so the label body
        # starts on the line right after the "---" rule.
        parts += ["", "---", ""]
        return "\n".join(parts) + body
|
|
|
|
|
|
@mcp.tool()
def list_versions() -> str:
    """List the available sources, product classes, and registrants in the corpus.

    Use this to discover valid filter values for search_docs. The corpus
    is scoped to US row-crop pesticide labels (corn / soybeans / wheat).

    Despite the name (preserved for MCP-client compatibility), this
    returns labels-domain facets — not software-version facets.
    """
    # The docstring above is left untouched because it is presumably published
    # to MCP clients as the tool description. Every section below is
    # best-effort: a failing backend drops that section, not the whole reply.
    with TimedCall("list_versions", {}) as _call:
        cat = _sources()

        # Source-level summary from sources.json
        lines: list[str] = ["# PPLS docs corpus"]

        # Live counts from Chroma (best-effort; the server should still
        # render a useful response if Chroma is unreachable)
        chunk_count = label_count = None
        try:
            col = _collection()
            chunk_count = col.count()
        except Exception as exc:  # noqa: BLE001
            log.debug("could not count Chroma chunks: %s", exc)

        # On-disk label count. Hidden directories and hidden files are both
        # skipped; corpus_status likewise skips hidden source directories.
        try:
            if CORPUS_ROOT.exists():
                label_count = sum(
                    1 for p in CORPUS_ROOT.glob("*/*.json")
                    if not p.name.startswith(".")
                    and not p.parent.name.startswith(".")
                )
        except OSError as exc:
            # The corpus may live on removable storage; keep going without it.
            log.debug("could not count labels under %s: %s", CORPUS_ROOT, exc)

        if chunk_count is not None or label_count is not None:
            lines.append("")
            if label_count is not None:
                lines.append(f"- **Labels indexed:** {label_count:,}")
            if chunk_count is not None:
                lines.append(f"- **Chunks indexed:** {chunk_count:,}")

        if cat:
            lines.append("\n## Sources\n")
            for sid, s in sorted(cat.items()):
                title = s.get("title") or sid
                stype = s.get("type") or ""
                lines.append(f"- `{sid}` *({stype})* — {title}")
                if s.get("scope_filter"):
                    lines.append(f"  - scope: {s['scope_filter']}")
        else:
            lines.append("\n_(sources.json missing — corpus may not be initialized)_")

        # Per-source facets if Chroma is reachable
        try:
            col = _collection()
            # We can't enumerate distinct metadata values from Chroma directly;
            # walk a sample to discover them. ~50K sample is fine for our
            # ~200K-chunk corpus and keeps this tool fast.
            # NOTE(review): the sample is whatever get(limit=...) returns
            # first, so facets that only occur later in the collection may be
            # missing from the lists below — confirm this is acceptable.
            sample = col.get(limit=50000, include=["metadatas"])
            # Chunks stored without metadata come back as None; skip them.
            metas = [m for m in (sample.get("metadatas") or []) if m]
            classes = sorted({m.get("product_class") for m in metas if m.get("product_class")})
            signals = sorted({m.get("signal_word") for m in metas if m.get("signal_word")})
            registrants = sorted({m.get("registrant") for m in metas if m.get("registrant")})
            _call.set(sources=len(cat), classes=len(classes),
                      signals=len(signals), registrants=len(registrants))
            if classes:
                lines.append("\n## Product classes\n")
                lines.extend(f"- `{c}`" for c in classes)
            if signals:
                lines.append("\n## Signal words\n")
                lines.extend(f"- `{s}`" for s in signals)
            if registrants:
                lines.append(f"\n## Registrants ({len(registrants)})\n")
                # Cap the listing at 50 entries and summarise the rest.
                lines.extend(f"- {r}" for r in registrants[:50])
                if len(registrants) > 50:
                    lines.append(f"- _(…{len(registrants)-50} more)_")
        except Exception as exc:  # noqa: BLE001
            log.debug("could not sample Chroma metadata: %s", exc)
            _call.set(sources=len(cat), classes=0, signals=0, registrants=0)

        return "\n".join(lines)
|
|
|
|
|
|
@mcp.tool()
def corpus_status() -> str:
    """Report counts + freshness of the indexed label corpus.

    Use to confirm the search backend is healthy, see how many labels are
    indexed, and check which sources are currently feeding the corpus.
    Cheap — no embedder call.
    """
    # The docstring above is left untouched because it is presumably published
    # to MCP clients as the tool description.
    with TimedCall("corpus_status", {}) as _call:
        lines: list[str] = ["# PPLS corpus status\n"]

        # Without a corpus root there is nothing to report; return early.
        if not CORPUS_ROOT.exists():
            lines.append(f"_(corpus root {CORPUS_ROOT} doesn't exist)_")
            _call.set(labels=0, chunks=0, sources=0)
            return "\n".join(lines)

        # On-disk corpus: one metadata *.json per label, grouped by source
        # directory. Hidden directories and hidden files are skipped, so the
        # total agrees with list_versions, which also ignores hidden files.
        labels_by_source: dict[str, int] = {}
        for source_dir in sorted(CORPUS_ROOT.iterdir()):
            if not source_dir.is_dir() or source_dir.name.startswith("."):
                continue
            n = sum(
                1 for p in source_dir.glob("*.json")
                if not p.name.startswith(".")
            )
            if n:
                labels_by_source[source_dir.name] = n

        total_labels = sum(labels_by_source.values())
        lines.append(f"- **Corpus root:** `{CORPUS_ROOT}`")
        lines.append(f"- **Total labels on disk:** {total_labels:,}")

        # Chroma
        try:
            col = _collection()
            chunks = col.count()
            lines.append(f"- **Chunks in Chroma:** {chunks:,}")
            lines.append(f"- **Chroma dir:** `{CHROMA_DIR}`")
            lines.append(f"- **Collection:** `{COLLECTION}`")
        except Exception as exc:  # noqa: BLE001
            chunks = 0
            lines.append(f"- **Chroma:** _unavailable_ ({exc})")

        # BM25 — is_file() so a directory at that path is not reported as a
        # built database.
        if BM25_DB.is_file():
            lines.append(f"- **BM25 db:** `{BM25_DB}` ({BM25_DB.stat().st_size / 1024 / 1024:.0f} MB)")
        else:
            lines.append("- **BM25 db:** _not built_")

        if labels_by_source:
            lines.append("\n## Labels per source\n")
            # Largest sources first.
            for src, n in sorted(labels_by_source.items(), key=lambda kv: -kv[1]):
                lines.append(f"- `{src}`: {n:,} labels")

        # Active feature flags
        flags = []
        if RERANK_URL:
            flags.append(f"RERANK_URL=`{RERANK_URL}`")
        if HYBRID_SEARCH:
            flags.append("HYBRID_SEARCH=on")
        if flags:
            lines.append("\n## Active feature flags\n")
            lines.extend(f"- {flag}" for flag in flags)

        _call.set(labels=total_labels, chunks=chunks, sources=len(labels_by_source))
        return "\n".join(lines)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Stubs for later phases — keep the signatures in this file so refactors
|
|
# don't lose the contracts. Implementations come per phase.
|
|
# ---------------------------------------------------------------------------
|
|
|
|
# @mcp.tool() # Phase 12
|
|
# def find_doc_inconsistencies(scope_query: str, ...) -> str: ...
|
|
|
|
# @mcp.tool() # Phase 11
|
|
# def ppls_label_lessons(topic: str | None = None) -> str: ...
|
|
|
|
|
|
# ===========================================================================
|
|
# Entry point
|
|
# ===========================================================================
|
|
|
|
def main() -> None:
    """Parse command-line options and run the MCP server.

    ``--transport``, ``--host`` and ``--port`` default to the
    ``MCP_TRANSPORT``, ``MCP_HOST`` and ``MCP_PORT`` environment variables
    (falling back to ``stdio``, ``0.0.0.0`` and ``8000``). Host and port are
    only applied for the non-stdio transports.
    """
    import argparse

    p = argparse.ArgumentParser(description=f"{PRODUCT_NAME} docs MCP server")
    p.add_argument("--transport", choices=["stdio", "streamable-http", "sse"],
                   default=os.environ.get("MCP_TRANSPORT", "stdio"))
    p.add_argument("--host", default=os.environ.get("MCP_HOST", "0.0.0.0"))
    p.add_argument("--port", type=int, default=int(os.environ.get("MCP_PORT", "8000")))
    args = p.parse_args()

    logging.basicConfig(level=logging.INFO,
                        format="%(asctime)s %(levelname)s %(name)s %(message)s")

    if args.transport == "stdio":
        mcp.run()
        return

    mcp.settings.host = args.host
    mcp.settings.port = args.port
    # Accept the same truthy spellings as HYBRID_SEARCH (case-insensitive,
    # including "on"), so the two boolean env flags behave alike.
    disable_flag = os.environ.get("MCP_DISABLE_DNS_REBINDING_PROTECTION", "").lower()
    if disable_flag in ("true", "1", "yes", "on"):
        # NOTE(review): transport_security may be None on some SDK versions;
        # presumably no protection is configured then, so there is nothing to
        # switch off — confirm against the installed mcp release.
        security = getattr(mcp.settings, "transport_security", None)
        if security is not None:
            security.enable_dns_rebinding_protection = False
    mcp.run(transport=args.transport)


if __name__ == "__main__":
    main()
|