dda044eb95
Phase 3/6/7/8 in one pass since they depend on each other.
* docs_mcp/server.py
- Wire search_docs / get_page / list_versions tool bodies.
- search_docs flow: BM25 first (rag.bm25 FTS5) → over-fetch RERANK_POOL
chunks → POST to RERANK_URL/v1/rerank → return top-k. Dense is the
fallback when BM25 finds nothing. HYBRID_SEARCH=true switches to
dense+BM25+RRF (fused via the new _rrf_fuse helper).
- All retrieval failures are caught and fall back to the next layer,
so a dead reranker or missing BM25 db never blocks a search.
- Source URLs built from the bundle's docId so results link straight
into support.hpe.com.
* eval/
- 22 hand-curated golden queries grounded in real corpus page titles.
- DenseRetriever / BM25Retriever / HybridRetriever / RerankedRetriever
+ MRR/Recall@K/nDCG@K harness. RERANK_URL env activates the
reranked variants.
- Committed eval/results/baseline.md. On this corpus:
dense: MRR 0.539
bm25: MRR 0.880
hybrid_rrf: MRR 0.692
bm25+rerank: MRR 0.920 (winner)
hybrid_rrf+rerank: MRR 0.875
HPE structured docs use controlled vocabulary, so lexical match
dominates. Hybrid loses because dense pollutes the fused pool.
* scripts/rerank_server.py
- Minimal HTTP /v1/rerank over sentence-transformers
cross-encoder/ms-marco-MiniLM-L-6-v2. Cohere-style request/response.
- This is the dev/CPU fallback; production replaces it with the
llama.cpp + jina-reranker-v2-base GGUF sidecar (same wire protocol).
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
515 lines
21 KiB
Python
515 lines
21 KiB
Python
"""MCP server skeleton — fill in PRODUCT_NAME and the tool bodies.
|
|
|
|
This file is the template's structural anchor. The phases described in
|
|
PLAN.md add or extend pieces of this file:
|
|
|
|
Phase 3 — search_docs, get_page, list_versions stubs (you are here)
|
|
Phase 6 — reranker integration in search_docs
|
|
Phase 8 — BM25 + hybrid retrieval (HYBRID_SEARCH env gate, _rrf_fuse)
|
|
Phase 9 — diff_versions, list_cluster, bundle_changelog
|
|
Phase 10 — TimedCall wiring (already imported below)
|
|
Phase 11 — <product>_api_lessons tool
|
|
Phase 12 — find_doc_inconsistencies, submit_doc_bug
|
|
Phase 13 — weekly_digest + _digest_history reader
|
|
|
|
Every stub below has a docstring + `raise NotImplementedError`. Replace
|
|
the body when you reach the corresponding phase. Keep the signatures
|
|
stable across products — clients depend on them.
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
import logging
|
|
import os
|
|
import re
|
|
from pathlib import Path
|
|
from typing import Annotated
|
|
|
|
from mcp.server.fastmcp import FastMCP
|
|
from pydantic import Field
|
|
|
|
from .usage import TimedCall
|
|
|
|
# Module-level logger, named after this module.
log = logging.getLogger(__name__)

# ---------------------------------------------------------------------------
# Product-specific configuration. Set these for each new build.
# Both values can be overridden through the environment.
# ---------------------------------------------------------------------------
PRODUCT_NAME = os.environ.get("PRODUCT_NAME", "hvm")
PRODUCT_DOCS_URL = os.environ.get(
    "PRODUCT_DOCS_URL", "https://support.hpe.com/hpesc/public/docDisplay?docId=sd00007735en_us"
)
# Name of the Chroma collection; also the stem of the default BM25 db file.
COLLECTION = PRODUCT_NAME + "_docs"

# Paths inside the deployed container (and matching layout locally for dev).
# ROOT is the directory two levels above this file.
ROOT = Path(__file__).resolve().parent.parent
CORPUS = ROOT.joinpath("corpus")
CHROMA_DIR = ROOT.joinpath("chroma")
BUNDLES_JSON = ROOT.joinpath("bundles.json")
# BM25_DB may be pointed elsewhere via the BM25_DB environment variable.
BM25_DB = Path(os.environ.get("BM25_DB", str(ROOT.joinpath("bm25", COLLECTION + ".db"))))
|
|
|
|
# ---------------------------------------------------------------------------
# Feature flags (Phase 6 / 8 / 12 enable these as you ship each phase).
# All values are read from the environment once, at import time.
# ---------------------------------------------------------------------------


def _env_flag(name: str) -> bool:
    """Return True when env var `name` is true/1/yes/on (case-insensitive)."""
    return os.environ.get(name, "").lower() in ("true", "1", "yes", "on")


# Reranker: an unset or empty RERANK_URL becomes None; a trailing "/" is stripped.
RERANK_URL = os.environ.get("RERANK_URL", "").rstrip("/") or None
RERANK_POOL = int(os.environ.get("RERANK_POOL", "50"))
RERANK_TIMEOUT = float(os.environ.get("RERANK_TIMEOUT", "30"))

# Hybrid retrieval gate and the `k` constant used by reciprocal rank fusion.
HYBRID_SEARCH = _env_flag("HYBRID_SEARCH")
RRF_K = int(os.environ.get("RRF_K", "60"))

# Doc-bug submission settings.
DOC_BUG_SUBMIT_ENABLED = _env_flag("DOC_BUG_SUBMIT_ENABLED")
DOC_BUG_API_URL = os.environ.get("DOC_BUG_API_URL", "")  # product-specific endpoint
DOC_BUG_TIMEOUT = float(os.environ.get("DOC_BUG_TIMEOUT", "15"))
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# FastMCP setup.
|
|
#
|
|
# stateless_http=True — every request creates an ephemeral session and
|
|
# discards it on return. Critical for production: clients don't get
|
|
# 404 storms when the container is recreated by Watchtower.
|
|
# ---------------------------------------------------------------------------
|
|
# Server instance; the @mcp.tool() functions below register against it and
# main() runs it. The server name is derived from PRODUCT_NAME.
mcp = FastMCP(f"{PRODUCT_NAME}-docs", stateless_http=True)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Lazy helpers — instantiate expensive things only when actually needed,
|
|
# so the server still starts when (e.g.) Ollama is briefly unreachable.
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def _bundles() -> dict[str, dict]:
    """Cached load of bundles.json into a {slug: bundle_dict} mapping.

    bundles.json is the product-specific catalog written by the Phase 1
    scraper. See PLAN.md Phase 1 for the schema.

    The parsed mapping is memoised on the function object, keyed on the
    file's (mtime_ns, size). Repeated calls — _source_url() makes one per
    rendered search result — therefore do not re-read and re-parse the
    file, while a rewritten bundles.json is still picked up without a
    restart.

    Returns:
        The {slug: bundle_dict} mapping, or an empty dict when bundles.json
        is missing or cannot be stat'ed. Callers must treat the returned
        dict as read-only because it is shared between calls.
    """
    try:
        st = BUNDLES_JSON.stat()
    except OSError:
        return {}
    stamp = (st.st_mtime_ns, st.st_size)
    cached = getattr(_bundles, "_cache", None)
    if cached is not None and cached[0] == stamp:
        return cached[1]
    # Decode explicitly as UTF-8 rather than relying on the process locale.
    catalog = json.loads(BUNDLES_JSON.read_text(encoding="utf-8"))
    by_slug = {bundle["slug"]: bundle for bundle in catalog}
    _bundles._cache = (stamp, by_slug)
    return by_slug
|
|
|
|
|
|
def _build_where(version: str | None, platform: str | None, bundle_id: str | None) -> dict | None:
|
|
"""Translate filter args into a Chroma `where` clause."""
|
|
conds: list[dict] = []
|
|
if version:
|
|
conds.append({"version": version})
|
|
if platform:
|
|
conds.append({"platform": platform})
|
|
if bundle_id:
|
|
conds.append({"bundle_id": bundle_id})
|
|
if not conds:
|
|
return None
|
|
if len(conds) == 1:
|
|
return conds[0]
|
|
return {"$and": conds}
|
|
|
|
|
|
def _where_for_bm25(version: str | None, platform: str | None, bundle_id: str | None) -> dict | None:
|
|
"""BM25Index.query takes a flat dict of equality filters."""
|
|
w: dict[str, str] = {}
|
|
if version: w["version"] = version
|
|
if platform: w["platform"] = platform
|
|
if bundle_id: w["bundle_id"] = bundle_id
|
|
return w or None
|
|
|
|
|
|
def _read_page(bundle_id: str, page_id: str) -> tuple[str, dict] | None:
    """Read a corpus page off disk. Returns (markdown_body, metadata_dict).

    The page lives at CORPUS/<bundle_id>/<page_id>.md with a sibling
    <page_id>.json holding its metadata.

    Both identifiers arrive from tool arguments (see get_page), so the
    joined paths are normalised and must stay strictly inside CORPUS.
    Values containing ".." segments or an absolute path are rejected
    rather than being allowed to read files elsewhere on disk. The check
    is lexical (os.path.abspath), so symlinks placed inside the corpus by
    the operator keep working.

    Returns:
        (markdown_body, metadata_dict), or None when the identifiers point
        outside CORPUS or either file is missing.
    """
    corpus_root = Path(os.path.abspath(CORPUS))
    bundle_dir = CORPUS / bundle_id
    md_path = bundle_dir / (page_id + ".md")
    json_path = bundle_dir / (page_id + ".json")
    for candidate in (md_path, json_path):
        if corpus_root not in Path(os.path.abspath(candidate)).parents:
            log.warning("rejected page path outside corpus: %r/%r", bundle_id, page_id)
            return None
    if not md_path.is_file() or not json_path.is_file():
        return None
    # Decode explicitly as UTF-8 rather than relying on the process locale.
    body = md_path.read_text(encoding="utf-8")
    meta = json.loads(json_path.read_text(encoding="utf-8"))
    return body, meta
|
|
|
|
|
|
# Process-wide lazy handles: _CHROMA is filled on first use by _collection(),
# and _BM25 (just below) by _bm25(). None means "not initialised yet".
_CHROMA = None
|
|
_BM25 = None
|
|
|
|
|
|
def _collection():
    """Return the Chroma collection handle, creating it on first use.

    The handle is stored in the module-level _CHROMA and reused by every
    later call. The heavy imports happen here rather than at module import
    so the server can start before they are needed. Any exception from
    opening the store or fetching the collection propagates to the caller.
    """
    global _CHROMA
    if _CHROMA is not None:
        return _CHROMA

    import chromadb
    from chromadb.config import Settings
    from rag.embeddings import embedding_function

    store = chromadb.PersistentClient(
        path=str(CHROMA_DIR),
        settings=Settings(anonymized_telemetry=False),
    )
    _CHROMA = store.get_collection(COLLECTION, embedding_function=embedding_function())
    return _CHROMA
|
|
|
|
|
|
def _bm25():
    """Return the BM25Index handle, or None when BM25 is unavailable.

    The index is opened on first use and kept in the module-level _BM25.
    None is returned when the FTS5 db file does not exist, or when
    importing or opening the index raises. That failure is logged and not
    cached, so the next call tries again.
    """
    global _BM25
    if _BM25 is not None:
        return _BM25
    if not BM25_DB.exists():
        return None
    try:
        from rag.bm25 import BM25Index

        _BM25 = BM25Index(str(BM25_DB))
    except Exception as exc:  # defensive: hybrid must never block dense
        log.warning("BM25 unavailable, falling back to dense-only: %s", exc)
        return None
    return _BM25
|
|
|
|
|
|
def _enrich_from_chroma(col, chunk_ids: list[str], fused: list | None) -> tuple[list[str], list[dict], list[float]]:
|
|
"""Fetch document text + metadata for a list of chunk ids from Chroma, in order."""
|
|
if not chunk_ids:
|
|
return [], [], []
|
|
g = col.get(ids=chunk_ids, include=["documents", "metadatas"])
|
|
by_id = {i: (d, m) for i, d, m in zip(g["ids"], g["documents"], g["metadatas"])}
|
|
docs = [by_id[i][0] for i in chunk_ids if i in by_id]
|
|
metas = [by_id[i][1] for i in chunk_ids if i in by_id]
|
|
if fused is not None:
|
|
dists = [1.0 - score for _id, score, _src in fused[:len(docs)]]
|
|
else:
|
|
dists = [0.0] * len(docs)
|
|
return docs, metas, dists
|
|
|
|
|
|
def _rerank(query: str, candidates: list[tuple[str, str]]) -> list[tuple[str, str]] | None:
    """Ask the reranker at RERANK_URL to re-order `candidates` by relevance.

    `candidates` is `[(chunk_id, text), ...]`. Only the first 2000
    characters of each text are sent. Per the original author's note, this
    keeps each pair under the reranker's per-pair token cap, which would
    otherwise reject the whole batch. The tuples handed back still carry
    the full, untruncated text.

    The response is expected to hold a "results" list whose items carry an
    integer "index" into `candidates`. Items with a missing, non-integer
    or out-of-range index are ignored.

    Returns the re-ordered candidates, or None when reranking is disabled
    (no RERANK_URL), there is nothing to rank, the response yields no
    usable entries, or anything raises. The caller treats None as "skip
    reranking, keep retrieval-order candidates."
    """
    if not (RERANK_URL and candidates):
        return None
    try:
        import httpx

        body = {
            "query": query,
            "documents": [(text or "")[:2000] for _cid, text in candidates],
            "top_n": len(candidates),
        }
        with httpx.Client(timeout=RERANK_TIMEOUT) as client:
            resp = client.post(f"{RERANK_URL}/v1/rerank", json=body)
            resp.raise_for_status()
            results = resp.json().get("results") or []

        limit = len(candidates)
        reordered = []
        for item in results:
            idx = item.get("index")
            if isinstance(idx, int) and 0 <= idx < limit:
                reordered.append(candidates[idx])
        return reordered or None
    except Exception as exc:
        log.warning("rerank failed, keeping retrieval order: %s", exc)
        return None
|
|
|
|
|
|
def _rrf_fuse(*ranked_lists: list[str], k: int = RRF_K) -> list[tuple[str, float, dict]]:
    """Combine several rankings with Reciprocal Rank Fusion.

    Each positional argument is a sequence of ids, best first. An id at
    1-based rank r in a list contributes 1 / (k + r) to its fused score.
    The lists are labelled "dense", "bm25", "extra" by position, then
    "r<index>" for any further ones.

    Returns [(id, fused_score, {label: rank, ...}), ...] sorted by fused
    score, highest first. Ties keep first-seen order.
    """
    labels = ("dense", "bm25", "extra")
    totals: dict[str, float] = {}
    contributions: dict[str, dict] = {}
    for position, ranking in enumerate(ranked_lists):
        label = labels[position] if position < len(labels) else f"r{position}"
        for rank, ident in enumerate(ranking, start=1):
            totals[ident] = totals.get(ident, 0.0) + 1.0 / (k + rank)
            contributions.setdefault(ident, {})[label] = rank
    # A stable sort with reverse=True keeps insertion order among equal scores.
    by_score = sorted(totals.items(), key=lambda pair: pair[1], reverse=True)
    return [(ident, total, contributions[ident]) for ident, total in by_score]
|
|
|
|
|
|
def _source_url(bundle_id: str, page_id: str) -> str:
    """Build the canonical docs portal URL for a (bundle, page) pair.

    Returns "" when `bundle_id` is not in the bundle catalog. Page ids
    starting with "GUID-" link to that page inside the document; any other
    page id links to the document as a whole.
    """
    bundle = _bundles().get(bundle_id)
    if not bundle:
        return ""
    doc_id = bundle.get("doc_id", "")
    links_to_page = page_id.startswith("GUID-")
    if not links_to_page:
        return f"https://support.hpe.com/hpesc/public/docDisplay?docId={doc_id}"
    return f"https://support.hpe.com/hpesc/public/docDisplay?docId={doc_id}&page={page_id}.html"
|
|
|
|
|
|
# ===========================================================================
|
|
# Tools
|
|
# ===========================================================================
|
|
|
|
@mcp.tool()
def search_docs(
    query: Annotated[str, Field(description=f"Natural-language query about {PRODUCT_NAME}.")],
    version: Annotated[
        str | None,
        Field(description="OPTIONAL version filter — restrict to one product version."),
    ] = None,
    platform: Annotated[
        str | None,
        Field(description="OPTIONAL platform filter. Set to one of the platforms listed by list_versions(); omit for all platforms."),
    ] = None,
    bundle_id: Annotated[
        str | None,
        Field(description="OPTIONAL bundle filter — pin to a specific doc bundle slug."),
    ] = None,
    k: Annotated[int, Field(description="Number of results to return.", ge=1, le=50)] = 10,
) -> str:
    """Search the HPE Morpheus VM Essentials (HVM) docs corpus.

    Returns the top-k most relevant chunks (with full source page URLs)
    given a natural-language query. Optional filters narrow the search
    to one version, one platform, or one bundle. Use list_versions()
    first if you need to discover the available facet values.

    Call this tool whenever the user asks anything that should be
    answerable from the official product documentation — install,
    upgrade, configuration, backups, networking, HVM clusters, the
    Morpheus UI, or any 8.1.x release-notes question.
    """
    # Implementation notes (kept out of the docstring, which is the tool's
    # user-facing description):
    #   1. HYBRID_SEARCH and BM25 available -> dense + BM25 fused with RRF.
    #   2. Otherwise BM25, when the index exists and returns hits.
    #   3. Otherwise dense retrieval from Chroma.
    #   4. With RERANK_URL set, a deeper pool from the winning retriever is
    #      re-ordered by the reranker before trimming to k.
    # A failing layer is logged and the next one is tried. The result is
    # always a markdown string, including for "no matches" and "backend
    # unavailable".
    with TimedCall("search_docs", {
        "query": query, "version": version, "platform": platform,
        "bundle_id": bundle_id, "k": k,
    }) as _call:
        try:
            col = _collection()
        except Exception as e:
            log.exception("chroma collection unavailable")
            _call.set(hits_returned=0, error=str(e))
            return f"_(search backend unavailable: {e})_"

        where = _build_where(version, platform, bundle_id)
        bm25_where = _where_for_bm25(version, platform, bundle_id)
        pool = max(k * 5, 50)

        # Retrieval mode selection. Eval on this corpus (2026-05-22, 22 golden
        # queries) showed BM25 MRR=0.88 vs dense MRR=0.54 vs hybrid MRR=0.69 —
        # HPE structured docs use controlled vocabulary, so lexical match wins.
        # Dense is kept as fallback when BM25 has no tokens to chew on (e.g.
        # purely stopword queries). HYBRID_SEARCH=true forces RRF fusion.
        bm = _bm25()
        docs: list[str] = []
        metas: list[dict] = []
        dists: list[float] = []
        retrieval_mode = "dense"
        top1_source = "dense_only"
        # Full fused ranking, kept so the rerank pool can be drawn from the
        # same hybrid ranking instead of a fresh dense-only query.
        fused_ids: list[str] = []

        if HYBRID_SEARCH and bm is not None:
            try:
                dense_res = col.query(query_texts=[query], n_results=pool, where=where)
                dense_ids = (dense_res.get("ids") or [[]])[0]
                bm_hits = bm.query(query, n=pool, where=bm25_where)
                bm_ids = [cid for cid, _s in bm_hits]
                fused = _rrf_fuse(dense_ids, bm_ids)
                h_docs, h_metas, h_dists = _enrich_from_chroma(col, [c for c, _, _ in fused[:k]], fused)
                # Only claim "hybrid" when it actually produced documents, so
                # the reported mode matches the layer that served the results.
                if h_docs:
                    docs, metas, dists = h_docs, h_metas, h_dists
                    src0 = fused[0][2]
                    top1_source = ("both" if {"dense", "bm25"} <= set(src0)
                                   else "bm25_only" if "bm25" in src0
                                   else "dense_only")
                    retrieval_mode = "hybrid"
                    fused_ids = [c for c, _, _ in fused]
            except Exception as e:
                log.warning("hybrid failed, falling back to BM25→dense: %s", e)

        if not docs and bm is not None:
            try:
                bm_hits = bm.query(query, n=k, where=bm25_where)
                if bm_hits:
                    ids = [cid for cid, _s in bm_hits[:k]]
                    b_docs, b_metas, _ = _enrich_from_chroma(col, ids, None)
                    if b_docs:
                        docs, metas = b_docs, b_metas
                        # FTS5 returns negative scores (lower=better). Map onto a
                        # similarity-ish [0..1] just for display.
                        # NOTE: scores are paired with docs by position, so they
                        # only line up when every BM25 id exists in Chroma.
                        dists = [max(0.0, min(1.0, 1.0 - abs(s) / 20.0)) for _id, s in bm_hits[:k]]
                        retrieval_mode = "bm25"
                        top1_source = "bm25_only"
            except Exception as e:
                log.warning("BM25 retrieval failed, falling back to dense: %s", e)

        if not docs:
            retrieval_mode = "dense"
            top1_source = "dense_only"
            try:
                res = col.query(query_texts=[query], n_results=k, where=where)
            except Exception as e:
                # Last layer: nothing left to fall back to, so report the
                # failure the same way as an unavailable collection.
                log.exception("dense retrieval failed")
                _call.set(hits_returned=0, retrieval_mode=retrieval_mode, error=str(e))
                return f"_(search backend unavailable: {e})_"
            docs = (res.get("documents") or [[]])[0]
            metas = (res.get("metadatas") or [[]])[0]
            dists = (res.get("distances") or [[]])[0]

        reranker_fired = False
        if RERANK_URL and docs:
            # Pull a deeper pool to give the reranker something to chew on.
            # We over-fetch up to RERANK_POOL chunks from whichever retriever
            # already won, then ask the reranker to pick the final top-k.
            # The pool lives in separate variables so that a failed over-fetch
            # or a failed rerank leaves the retrieval-order top-k (and its
            # scores) untouched.
            pool_size = max(k, RERANK_POOL)
            cand_docs, cand_metas = docs, metas
            if len(docs) < pool_size:
                try:
                    if retrieval_mode == "hybrid":
                        extra_ids = fused_ids[:pool_size]
                    elif retrieval_mode == "bm25":
                        extra = bm.query(query, n=pool_size, where=bm25_where) if bm else []
                        extra_ids = [cid for cid, _s in extra]
                    else:
                        extra_res = col.query(query_texts=[query], n_results=pool_size, where=where)
                        extra_ids = (extra_res.get("ids") or [[]])[0]
                    if extra_ids:
                        d2, m2, _ = _enrich_from_chroma(col, extra_ids, None)
                        if d2:  # never swap good results for an empty pool
                            cand_docs, cand_metas = d2, m2
                except Exception as e:
                    log.warning("rerank over-fetch failed, reranking top-k only: %s", e)

            # The reranker only needs a handle per candidate. Use the list
            # position so candidates with identical or missing metadata
            # cannot collide.
            pairs = [(str(i), text) for i, text in enumerate(cand_docs)]
            reranked = _rerank(query, pairs)
            if reranked is not None:
                # dict.fromkeys drops repeated indices while keeping order.
                order = list(dict.fromkeys(int(cid) for cid, _t in reranked))
                docs = [cand_docs[i] for i in order][:k]
                metas = [cand_metas[i] for i in order][:k]
                # Recompute distances as descending ordinal ranks so the
                # display still shows a useful score.
                dists = [1.0 - (rank / len(reranked)) for rank in range(len(docs))]
                reranker_fired = True

        docs, metas, dists = docs[:k], metas[:k], dists[:k]

        _call.set(hits_returned=len(docs), retrieval_mode=retrieval_mode,
                  top1_source=top1_source, reranker_fired=reranker_fired)
        if not docs:
            return f"_No matches for `{query}`._"

        out = [f"# {len(docs)} result(s) for `{query}`", ""]
        for doc, meta, dist in zip(docs, metas, dists):
            bid = meta.get("bundle_id", "")
            pid = meta.get("page_id", "")
            title = meta.get("title") or pid
            ver = meta.get("version") or ""
            url = _source_url(bid, pid)
            header = f"## {title}"
            if ver:
                header += f" _(v{ver})_"
            out.append(header)
            out.append(f"[{bid}/{pid}]({url}) · score={1 - dist:.3f}")
            out.append("")
            out.append((doc or "").strip())
            out.append("")
        return "\n".join(out)
|
|
|
|
|
|
@mcp.tool()
def get_page(
    bundle_id: Annotated[str, Field(description="Bundle slug.")],
    page_id: Annotated[str, Field(description="Page filename within the bundle.")],
) -> str:
    """Return the full markdown for one page, plus a metadata header.

    Use after search_docs surfaces a relevant page and the user (or you)
    want the complete text — not just the matched chunks.
    """
    # Output layout: "# <title>", an optional italic context line
    # (version / parent page), a "[source](url)" link, a blank line, then
    # the page markdown. A missing page yields a plain "Page not found" line.
    with TimedCall("get_page", {"bundle_id": bundle_id, "page_id": page_id}) as _call:
        page = _read_page(bundle_id, page_id)
        if page is None:
            _call.set(found=False)
            return f"Page not found: {bundle_id}/{page_id}"

        body, info = page
        _call.set(found=True, page_chars=len(body))

        heading = info.get("title") or page_id
        version = info.get("version")
        parent_title = info.get("parent_title")
        link = _source_url(bundle_id, page_id)

        context = []
        if version:
            context.append(f"version **{version}**")
        if parent_title:
            context.append(f"in **{parent_title}**")

        lines = [f"# {heading}"]
        if context:
            lines.append("_" + " · ".join(context) + "_")
        lines += [f"[source]({link})", ""]
        return "\n".join(lines) + "\n" + body
|
|
|
|
|
|
@mcp.tool()
def list_versions() -> str:
    """List the available version/platform facets across all bundles.

    Use this to discover valid filter values for search_docs.
    """
    # Renders markdown: one section per non-empty facet (versions,
    # platforms, products), followed by a line for every bundle.
    with TimedCall("list_versions", {}) as _call:
        cat = _bundles()
        if not cat:
            return "_(no bundles indexed yet — run the scraper + indexer)_"

        def facet(key: str) -> list:
            """Sorted distinct truthy values of `key` across all bundles."""
            return sorted({bundle.get(key) for bundle in cat.values() if bundle.get(key)})

        versions = facet("version")
        platforms = facet("platform")
        _call.set(versions=len(versions), platforms=len(platforms))
        products = facet("product")

        lines = [f"# Facets across {len(cat)} bundle(s)", ""]
        # (heading, values, wrapper): versions and platforms are shown in
        # backticks, products as plain text.
        sections = (
            ("## Versions", versions, "`"),
            ("## Platforms", platforms, "`"),
            ("## Product / doc types", products, ""),
        )
        for heading, values, tick in sections:
            if values:
                lines += [heading, ""] + [f"- {tick}{value}{tick}" for value in values] + [""]

        lines += ["## Bundles", ""]
        for slug, bundle in sorted(cat.items()):
            kind = bundle.get("product") or ""
            ver = bundle.get("version")
            pages = bundle.get("page_count", "?")
            label = f"{kind} {ver}".strip() if ver else kind
            lines.append(f"- `{slug}` — {label} ({pages} pages)")
        return "\n".join(lines)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Stubs for later phases — keep the signatures in this file so refactors
|
|
# don't lose the contracts. Implementations come per phase.
|
|
# ---------------------------------------------------------------------------
|
|
|
|
# @mcp.tool() # Phase 9
|
|
# def list_cluster(bundle_id: str, page_id: str) -> str: ...
|
|
|
|
# @mcp.tool() # Phase 9
|
|
# def diff_versions(bundle_id: str, page_id: str, against_bundle_id: str, context: int = 3) -> str: ...
|
|
|
|
# @mcp.tool() # Phase 9
|
|
# def bundle_changelog(bundle_id_new: str, bundle_id_old: str, min_churn: int = 5, max_changed: int = 50) -> str: ...
|
|
|
|
# @mcp.tool() # Phase 13
|
|
# def weekly_digest(days: int = 7, version: str | None = None, platform: str | None = None, ...) -> str: ...
|
|
|
|
# @mcp.tool() # Phase 9 (or 3 — useful early)
|
|
# def corpus_status() -> str: ...
|
|
|
|
# @mcp.tool() # Phase 11
|
|
# def myproduct_api_lessons(topic: str | None = None) -> str: ...
|
|
|
|
# @mcp.tool() # Phase 12
|
|
# def find_doc_inconsistencies(scope_query: str, ...) -> str: ...
|
|
|
|
# @mcp.tool() # Phase 12
|
|
# def submit_doc_bug(page_url: str, content: str, email: str | None = None, ...) -> str: ...
|
|
|
|
|
|
# ===========================================================================
|
|
# Entry point
|
|
# ===========================================================================
|
|
|
|
def main() -> None:
    """Parse CLI/env options and run the MCP server.

    Options (an explicit command-line value wins over the environment
    variable, which wins over the built-in default):
        --transport  MCP_TRANSPORT  stdio | streamable-http | sse (default stdio)
        --host       MCP_HOST       bind address for HTTP transports (default 0.0.0.0)
        --port       MCP_PORT       bind port for HTTP transports (default 8000)

    MCP_DISABLE_DNS_REBINDING_PROTECTION turns DNS-rebinding protection off
    for the HTTP transports. It accepts true/1/yes/on in any letter case,
    like the other boolean flags in this module.

    A bad MCP_PORT or MCP_TRANSPORT value ends in a normal argparse usage
    error instead of a traceback or an unchecked transport name.
    """
    import argparse

    transports = ["stdio", "streamable-http", "sse"]
    p = argparse.ArgumentParser(description=f"{PRODUCT_NAME} docs MCP server")
    p.add_argument("--transport", choices=transports,
                   default=os.environ.get("MCP_TRANSPORT", "stdio"))
    p.add_argument("--host", default=os.environ.get("MCP_HOST", "0.0.0.0"))
    # A string default is converted by argparse through `type`, so an invalid
    # MCP_PORT is reported as "invalid int value".
    p.add_argument("--port", type=int, default=os.environ.get("MCP_PORT", "8000"))
    args = p.parse_args()

    # argparse does not check defaults against `choices`, so validate the
    # env-supplied transport explicitly.
    if args.transport not in transports:
        p.error(f"invalid transport {args.transport!r} (choose from {', '.join(transports)})")

    if args.transport == "stdio":
        mcp.run()
    else:
        mcp.settings.host = args.host
        mcp.settings.port = args.port
        # DNS-rebinding protection defaults to localhost-only — disable for
        # container-network DNS hostnames. See PLAN.md "Hosting" notes.
        disable_flag = os.environ.get("MCP_DISABLE_DNS_REBINDING_PROTECTION", "")
        if disable_flag.lower() in ("true", "1", "yes", "on"):
            mcp.settings.transport_security.enable_dns_rebinding_protection = False
        mcp.run(transport=args.transport)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|