initial: docs-mcp-template — build guide + scaffolded server

Template for building hosted MCP servers over a product's public
documentation. Distilled from one production build; everything
product-specific has been factored out.

Contents:

- PLAN.md — comprehensive build guide. 13 phases from project
  skeleton through weekly_digest. Includes the gotchas
  ("fetch-depth: 0 always", reranker per-pair token limit,
  Cloudflare body cap, dash-not-bash on Gitea runners), the
  decisions worth carrying forward, and a per-product
  customization checklist.

- CLAUDE.md — guidance for Claude Code working in a clone of this
  template. Phase identification table, conventions (env-gating +
  operator confirmation for side-effecting tools, defensive
  fallback for retrieval components), common commands.

- README.md — quick-start summary.

Scaffolded code (all signature-stable, with NotImplementedError
stubs where phase-specific work is required):

  docs_mcp/server.py    FastMCP server, stateless_http=True, with
                        search_docs / get_page / list_versions
                        baseline tools and commented stubs for the
                        rest of the phase set.
  docs_mcp/usage.py     TimedCall telemetry, JSONL, daily rotation,
                        90-day retention. Reusable as-is.
  rag/embeddings.py     Ollama embedder (nomic-embed-text default),
                        load-balanced across N URLs. Reusable.
  rag/chunk.py          Paragraph-aware chunker with synthetic
                        chunk 0. Per-product tunable.
  rag/index.py          Chroma + BM25 builder. --rebuild and
                        --bm25-only flags.
  rag/bm25.py           SQLite FTS5 lexical index. Reusable.
  scrape/changelog.py   --cached / --ref / --json / --history-out.
                        Reusable.
  scrape/README.md      What you write per-product.
  eval/queries.jsonl.example
                        Curate ~25 hand-labeled queries here.
  eval/retrievers.py    Retriever protocol + stub classes.
  eval/run_eval.py      MRR / Recall@K / nDCG@K harness skeleton.
  scripts/usage_report.py
                        Standalone log analyzer; the
                        FOLLOW-UP CHECKS pattern noted in the
                        module docstring.
  scripts/registry_gc.py
                        Gitea container registry cleanup. Reusable.

Deployment + CI:

  Dockerfile               Python 3.12-slim; COPY corpus + chroma
                           + bm25 last for cache efficiency.
  deploy/docker-compose.yml MCP + reranker sidecar + Watchtower.
                           Templated with <placeholders>.
  .gitea/workflows/refresh.yml    Weekly cron + manual dispatch.
                                  fetch-depth: 0, retry-on-race,
                                  three-tag image scheme.
  .gitea/workflows/image-only.yml Code-only ship cycle, ~18min.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-05-22 09:18:17 -04:00
commit 9ba615c8ee
26 changed files with 3280 additions and 0 deletions
View File
+263
View File
@@ -0,0 +1,263 @@
"""MCP server skeleton — fill in PRODUCT_NAME and the tool bodies.
This file is the template's structural anchor. The phases described in
PLAN.md add or extend pieces of this file:
Phase 3 — search_docs, get_page, list_versions stubs (you are here)
Phase 6 — reranker integration in search_docs
Phase 8 — BM25 + hybrid retrieval (HYBRID_SEARCH env gate, _rrf_fuse)
Phase 9 — diff_versions, list_cluster, bundle_changelog
Phase 10 — TimedCall wiring (already imported below)
Phase 11 — <product>_api_lessons tool
Phase 12 — find_doc_inconsistencies, submit_doc_bug
Phase 13 — weekly_digest + _digest_history reader
Every stub below has a docstring + `raise NotImplementedError`. Replace
the body when you reach the corresponding phase. Keep the signatures
stable across products — clients depend on them.
"""
from __future__ import annotations
import json
import logging
import os
import re
from pathlib import Path
from typing import Annotated
from mcp.server.fastmcp import FastMCP
from pydantic import Field
from .usage import TimedCall
log = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Product-specific configuration. Set these for each new build.
# ---------------------------------------------------------------------------
PRODUCT_NAME = os.environ.get("PRODUCT_NAME", "myproduct")
PRODUCT_DOCS_URL = os.environ.get("PRODUCT_DOCS_URL", "https://docs.example.com")
COLLECTION = f"{PRODUCT_NAME}_docs"
# Paths inside the deployed container (and matching layout locally for dev).
ROOT = Path(__file__).resolve().parent.parent
CORPUS = ROOT / "corpus"
CHROMA_DIR = ROOT / "chroma"
BM25_DB = Path(os.environ.get("BM25_DB", str(ROOT / "bm25" / f"{PRODUCT_NAME}_docs.db")))
BUNDLES_JSON = ROOT / "bundles.json"
# ---------------------------------------------------------------------------
# Feature flags (Phase 6 / 8 / 12 enable these as you ship each phase).
# ---------------------------------------------------------------------------
RERANK_URL = os.environ.get("RERANK_URL", "").rstrip("/") or None
RERANK_POOL = int(os.environ.get("RERANK_POOL", "50"))
RERANK_TIMEOUT = float(os.environ.get("RERANK_TIMEOUT", "30"))
HYBRID_SEARCH = os.environ.get("HYBRID_SEARCH", "").lower() in ("true", "1", "yes", "on")
RRF_K = int(os.environ.get("RRF_K", "60"))
DOC_BUG_SUBMIT_ENABLED = os.environ.get("DOC_BUG_SUBMIT_ENABLED", "").lower() in ("true", "1", "yes", "on")
DOC_BUG_API_URL = os.environ.get("DOC_BUG_API_URL", "") # product-specific endpoint
DOC_BUG_TIMEOUT = float(os.environ.get("DOC_BUG_TIMEOUT", "15"))
# ---------------------------------------------------------------------------
# FastMCP setup.
#
# stateless_http=True — every request creates an ephemeral session and
# discards it on return. Critical for production: clients don't get
# 404 storms when the container is recreated by Watchtower.
# ---------------------------------------------------------------------------
mcp = FastMCP(f"{PRODUCT_NAME}-docs", stateless_http=True)
# ---------------------------------------------------------------------------
# Lazy helpers — instantiate expensive things only when actually needed,
# so the server still starts when (e.g.) Ollama is briefly unreachable.
# ---------------------------------------------------------------------------
def _bundles() -> dict[str, dict]:
"""Cached load of bundles.json into a {slug: bundle_dict} mapping.
bundles.json is the product-specific catalog written by the Phase 1
scraper. See PLAN.md Phase 1 for the schema.
"""
if not BUNDLES_JSON.exists():
return {}
cat = json.loads(BUNDLES_JSON.read_text())
return {b["slug"]: b for b in cat}
def _build_where(version: str | None, platform: str | None, bundle_id: str | None) -> dict | None:
"""Translate filter args into a Chroma `where` clause."""
conds: list[dict] = []
if version:
conds.append({"version": version})
if platform:
conds.append({"platform": platform})
if bundle_id:
conds.append({"bundle_id": bundle_id})
if not conds:
return None
if len(conds) == 1:
return conds[0]
return {"$and": conds}
def _read_page(bundle_id: str, page_id: str) -> tuple[str, dict] | None:
"""Read a corpus page off disk. Returns (markdown_body, metadata_dict)."""
md_path = CORPUS / bundle_id / (page_id + ".md")
json_path = CORPUS / bundle_id / (page_id + ".json")
if not md_path.exists() or not json_path.exists():
return None
return md_path.read_text(), json.loads(json_path.read_text())
# ===========================================================================
# Tools
# ===========================================================================
@mcp.tool()
def search_docs(
query: Annotated[str, Field(description=f"Natural-language query about {PRODUCT_NAME}.")],
version: Annotated[
str | None,
Field(description="OPTIONAL version filter — restrict to one product version."),
] = None,
platform: Annotated[
str | None,
Field(description="OPTIONAL platform filter. Set to one of the platforms listed by list_versions(); omit for all platforms."),
] = None,
bundle_id: Annotated[
str | None,
Field(description="OPTIONAL bundle filter — pin to a specific doc bundle slug."),
] = None,
k: Annotated[int, Field(description="Number of results to return.", ge=1, le=50)] = 10,
) -> str:
"""Search the {product} docs corpus.
Returns the top-k most relevant chunks (with full source page URLs)
given a natural-language query. Optional filters narrow the search
to one version, one platform, or one bundle. Use list_versions()
first if you need to discover the available facet values.
Call this tool whenever the user asks anything that should be
answerable from the official product documentation.
"""
with TimedCall("search_docs", {
"query": query, "version": version, "platform": platform,
"bundle_id": bundle_id, "k": k,
}) as _call:
# TODO Phase 2-3: query Chroma collection (see rag/index.py for
# how it was built). Render the top-k chunks as markdown with
# source URLs.
# TODO Phase 6: optional reranker via _rerank() if RERANK_URL set.
# TODO Phase 8: hybrid retrieval if HYBRID_SEARCH=true — run
# dense + BM25 in parallel, RRF-fuse, hand merged pool to rerank.
_call.set(hits_returned=0)
raise NotImplementedError("Phase 2/3: implement Chroma query + rendering")
@mcp.tool()
def get_page(
bundle_id: Annotated[str, Field(description="Bundle slug.")],
page_id: Annotated[str, Field(description="Page filename within the bundle.")],
) -> str:
"""Return the full markdown for one page, plus a metadata header.
Use after search_docs surfaces a relevant page and the user (or you)
want the complete text — not just the matched chunks.
"""
with TimedCall("get_page", {"bundle_id": bundle_id, "page_id": page_id}) as _call:
data = _read_page(bundle_id, page_id)
if data is None:
_call.set(found=False)
return f"Page not found: {bundle_id}/{page_id}"
md, meta = data
_call.set(found=True, page_chars=len(md))
# TODO: add a metadata header (title, version, source URL) above
# the body. Product-specific shape.
return md
@mcp.tool()
def list_versions() -> str:
"""List the available version/platform facets across all bundles.
Use this to discover valid filter values for search_docs.
"""
with TimedCall("list_versions", {}) as _call:
cat = _bundles()
if not cat:
return "_(no bundles indexed yet — run the scraper + indexer)_"
versions = sorted({b.get("version") for b in cat.values() if b.get("version")})
platforms = sorted({b.get("platform") for b in cat.values() if b.get("platform")})
_call.set(versions=len(versions), platforms=len(platforms))
lines = [f"# Facets across {len(cat)} bundle(s)", ""]
if versions:
lines.append("## Versions"); lines.append("")
for v in versions: lines.append(f"- `{v}`")
lines.append("")
if platforms:
lines.append("## Platforms"); lines.append("")
for p in platforms: lines.append(f"- `{p}`")
return "\n".join(lines)
# ---------------------------------------------------------------------------
# Stubs for later phases — keep the signatures in this file so refactors
# don't lose the contracts. Implementations come per phase.
# ---------------------------------------------------------------------------
# @mcp.tool() # Phase 9
# def list_cluster(bundle_id: str, page_id: str) -> str: ...
# @mcp.tool() # Phase 9
# def diff_versions(bundle_id: str, page_id: str, against_bundle_id: str, context: int = 3) -> str: ...
# @mcp.tool() # Phase 9
# def bundle_changelog(bundle_id_new: str, bundle_id_old: str, min_churn: int = 5, max_changed: int = 50) -> str: ...
# @mcp.tool() # Phase 13
# def weekly_digest(days: int = 7, version: str | None = None, platform: str | None = None, ...) -> str: ...
# @mcp.tool() # Phase 9 (or 3 — useful early)
# def corpus_status() -> str: ...
# @mcp.tool() # Phase 11
# def myproduct_api_lessons(topic: str | None = None) -> str: ...
# @mcp.tool() # Phase 12
# def find_doc_inconsistencies(scope_query: str, ...) -> str: ...
# @mcp.tool() # Phase 12
# def submit_doc_bug(page_url: str, content: str, email: str | None = None, ...) -> str: ...
# ===========================================================================
# Entry point
# ===========================================================================
def main() -> None:
import argparse
p = argparse.ArgumentParser(description=f"{PRODUCT_NAME} docs MCP server")
p.add_argument("--transport", choices=["stdio", "streamable-http", "sse"],
default=os.environ.get("MCP_TRANSPORT", "stdio"))
p.add_argument("--host", default=os.environ.get("MCP_HOST", "0.0.0.0"))
p.add_argument("--port", type=int, default=int(os.environ.get("MCP_PORT", "8000")))
args = p.parse_args()
if args.transport == "stdio":
mcp.run()
else:
mcp.settings.host = args.host
mcp.settings.port = args.port
# DNS-rebinding protection defaults to localhost-only — disable for
# container-network DNS hostnames. See PLAN.md "Hosting" notes.
if os.environ.get("MCP_DISABLE_DNS_REBINDING_PROTECTION") in {"1", "true", "yes"}:
mcp.settings.transport_security.enable_dns_rebinding_protection = False
mcp.run(transport=args.transport)
if __name__ == "__main__":
main()
+127
View File
@@ -0,0 +1,127 @@
"""Per-call usage telemetry — JSONL with daily rotation and retention.
Reusable as-is across products. Drop the import + `with TimedCall(...)`
into any tool body and the call gets logged with the tool name, args,
elapsed time, and any extra fields the tool sets via `_call.set(...)`.
The log file is `var/logs/usage.jsonl` by default (override with the
`USAGE_LOG_DIR` env). Daily rotation; files older than
`USAGE_LOG_KEEP_DAYS` (default 90) are deleted on next write.
Layout of one record:
{
"ts": "2026-05-22T13:14:15+00:00",
"tool": "search_docs",
"args": {"query": "...", "version": "10.9", "k": 10},
"elapsed_ms": 142.5,
"hits_returned": 7, # optional, set by the tool
"reranked": true, # optional, set by the tool
// ... any other key the tool sets via _call.set(...)
}
"""
from __future__ import annotations
import json
import os
import time
import threading
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Any
USAGE_LOG_DIR = Path(os.environ.get("USAGE_LOG_DIR", "var/logs"))
USAGE_LOG_KEEP_DAYS = int(os.environ.get("USAGE_LOG_KEEP_DAYS", "90"))
# Single global lock to serialize writes from multiple request handlers.
# JSONL appends are atomic at the OS level for short records on most
# filesystems, but the lock is cheap and saves you from cross-platform
# surprises.
_lock = threading.Lock()
_last_rotation_check: float = 0.0
def _maybe_rotate() -> None:
"""Move usage.jsonl → usage.jsonl.<yesterday> if the date has rolled.
Cheap to call; we only do filesystem work when a day has actually
passed since the last check.
"""
global _last_rotation_check
now = time.time()
if now - _last_rotation_check < 300: # 5 min cap between checks
return
_last_rotation_check = now
USAGE_LOG_DIR.mkdir(parents=True, exist_ok=True)
active = USAGE_LOG_DIR / "usage.jsonl"
if active.exists():
try:
mtime = datetime.fromtimestamp(active.stat().st_mtime, tz=timezone.utc).date()
today = datetime.now(timezone.utc).date()
if mtime < today:
rotated = USAGE_LOG_DIR / f"usage.jsonl.{mtime.isoformat()}"
if not rotated.exists():
active.rename(rotated)
except OSError:
pass
# Retention: delete usage.jsonl.YYYY-MM-DD files older than the
# retention window. The active file is never deleted by this.
cutoff = datetime.now(timezone.utc).date() - timedelta(days=USAGE_LOG_KEEP_DAYS)
for f in USAGE_LOG_DIR.glob("usage.jsonl.*"):
try:
datestamp = f.name.split(".", 2)[-1]
if datetime.fromisoformat(datestamp).date() < cutoff:
f.unlink()
except (ValueError, OSError):
continue
class TimedCall:
"""Context manager that captures one tool call's telemetry record.
Usage:
with TimedCall("search_docs", {"query": q, ...}) as call:
... do the work ...
call.set(hits_returned=len(results), reranked=True)
On exit, writes one JSONL record to usage.jsonl. Exceptions are
captured into the `error` field; the exception is re-raised so
the tool's caller sees the failure.
"""
def __init__(self, tool: str, args: dict[str, Any]):
self.tool = tool
self.args = args
self.extra: dict[str, Any] = {}
self._t0: float = 0.0
def set(self, **kwargs: Any) -> None:
"""Attach extra fields to the eventual telemetry record."""
self.extra.update(kwargs)
def __enter__(self) -> "TimedCall":
self._t0 = time.perf_counter()
return self
def __exit__(self, exc_type, exc_val, exc_tb) -> None:
elapsed_ms = (time.perf_counter() - self._t0) * 1000.0
record: dict[str, Any] = {
"ts": datetime.now(timezone.utc).isoformat(),
"tool": self.tool,
"args": self.args,
"elapsed_ms": round(elapsed_ms, 2),
}
if exc_type is not None:
record["error"] = f"{exc_type.__name__}: {exc_val}"
record.update(self.extra)
_maybe_rotate()
with _lock:
USAGE_LOG_DIR.mkdir(parents=True, exist_ok=True)
with open(USAGE_LOG_DIR / "usage.jsonl", "a") as fh:
fh.write(json.dumps(record, separators=(",", ":")) + "\n")
# Don't swallow the exception — the caller still needs to see it.