Files
morpheus-docs/docs_mcp/server.py
T
justin fa448f94e1 build out morpheus-docs MCP stack, mirroring hvm-docs through Phases 1-13
Initial scaffold: the docs-mcp-template clone with all the
HVM-validated stack ported across, customized for Morpheus
Enterprise (PRODUCT_NAME=morpheus, server name morpheus-docs).

Bundles (live-discovered 2026-05-22; 1710 cataloged pages total):
* morpheus_user_manual_8_1_0  sd00007510en_us  568 pages (Feb 2026)
* morpheus_user_manual_8_1_1  sd00007621en_us  569 pages (Mar 2026)
* morpheus_user_manual_8_1_2  sd00007732en_us  569 pages (Apr 2026)
* morpheus_release_notes_8_1_0  sd00007496en_us  single-doc
* morpheus_release_notes_8_1_1  sd00007610en_us  single-doc
* morpheus_release_notes_8_1_2  sd00007733en_us  single-doc
* morpheus_quickspecs            a50009231enw     html-file (live
  curl_cffi against www.hpe.com; all 12+ Enterprise SKUs captured —
  S6E64..S6E73AAE for new/renewal/upgrade × 1/3/5-yr terms, plus
  services SKUs HA124A1#V38/V39 and H46SBA1).

No Deployment Guide or Qualification Matrix on HPE Support for
Morpheus Enterprise specifically — the only QM (sd00006551en_us)
covers HVM clusters managed by Morpheus and lives in hvm-docs.

Stack carried forward from hvm-docs:
* rag/{index,chunk,embeddings,bm25}.py — including the
  MAX_CHARS=4000 chunk-cap fix for table-dense content
* docs_mcp/{server,usage}.py — 11 MCP tools, BM25-default search,
  cross-encoder rerank, hybrid behind HYBRID_SEARCH=true,
  morpheus_api_lessons (renamed from hvm_api_lessons), env-gated
  submit_doc_bug
* docs_mcp/api_lessons.md — Morpheus-specific scaffold covering
  licensing model, HVM elevation path, REST vs Plugin API, with
  TODO markers for sections to flesh out from real ops experience
* scrape/{runner,quickspecs,changelog,bundles}.py — TOC + single-doc
  + html-file modes, curl_cffi Chrome120 for www.hpe.com edge bypass
* eval/{retrievers,run_eval}.py + queries.jsonl scaffold (4 placeholder
  queries; populate after first scrape)
* scripts/{rerank_server,usage_report,registry_gc}.py
* .gitea/workflows/{refresh,image-only}.yml — same Gitea Actions
  setup zerto-docs uses (push LAN, pull public-URL, GPU Ollama pool)
* deploy/docker-compose.yml — morpheus-docs-mcp service definition,
  shared jina-rerank sidecar, Watchtower-labeled
* Dockerfile, requirements.txt, requirements-rerank.txt

Verified locally: scrape produced 1599 .md pages (some TOC entries
are parent-only and yield no body), 6353 chunks all under the 4 KB
cap, MCP server boots and lists 11 tools cleanly.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-22 15:26:24 -04:00

1301 lines
58 KiB
Python

"""MCP server skeleton — fill in PRODUCT_NAME and the tool bodies.
This file is the template's structural anchor. The phases described in
PLAN.md add or extend pieces of this file:
Phase 3 — search_docs, get_page, list_versions stubs (you are here)
Phase 6 — reranker integration in search_docs
Phase 8 — BM25 + hybrid retrieval (HYBRID_SEARCH env gate, _rrf_fuse)
Phase 9 — diff_versions, list_cluster, bundle_changelog
Phase 10 — TimedCall wiring (already imported below)
Phase 11 — <product>_api_lessons tool
Phase 12 — find_doc_inconsistencies, submit_doc_bug
Phase 13 — weekly_digest + _digest_history reader
Every stub below has a docstring + `raise NotImplementedError`. Replace
the body when you reach the corresponding phase. Keep the signatures
stable across products — clients depend on them.
"""
from __future__ import annotations
import datetime as _dt
import difflib
import json
import logging
import os
import re
from pathlib import Path
from typing import Annotated
from mcp.server.fastmcp import FastMCP
from pydantic import Field
from .usage import TimedCall
log = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Product-specific configuration. Set these for each new build.
# ---------------------------------------------------------------------------
PRODUCT_NAME = os.environ.get("PRODUCT_NAME", "morpheus")
PRODUCT_DOCS_URL = os.environ.get(
"PRODUCT_DOCS_URL",
"https://support.hpe.com/hpesc/public/docDisplay?docId=sd00007732en_us",
)
COLLECTION = f"{PRODUCT_NAME}_docs"
# Paths inside the deployed container (and matching layout locally for dev).
ROOT = Path(__file__).resolve().parent.parent
CORPUS = ROOT / "corpus"
CHROMA_DIR = ROOT / "chroma"
BM25_DB = Path(os.environ.get("BM25_DB", str(ROOT / "bm25" / f"{PRODUCT_NAME}_docs.db")))
BUNDLES_JSON = ROOT / "bundles.json"
DIGEST_HISTORY_PATH = CORPUS / ".digest" / "history.jsonl"
API_LESSONS_MD = Path(__file__).resolve().parent / "api_lessons.md"
# ---------------------------------------------------------------------------
# Feature flags (Phase 6 / 8 / 12 enable these as you ship each phase).
# ---------------------------------------------------------------------------
RERANK_URL = os.environ.get("RERANK_URL", "").rstrip("/") or None
RERANK_POOL = int(os.environ.get("RERANK_POOL", "50"))
RERANK_TIMEOUT = float(os.environ.get("RERANK_TIMEOUT", "30"))
HYBRID_SEARCH = os.environ.get("HYBRID_SEARCH", "").lower() in ("true", "1", "yes", "on")
RRF_K = int(os.environ.get("RRF_K", "60"))
DOC_BUG_SUBMIT_ENABLED = os.environ.get("DOC_BUG_SUBMIT_ENABLED", "").lower() in ("true", "1", "yes", "on")
DOC_BUG_API_URL = os.environ.get("DOC_BUG_API_URL", "") # product-specific endpoint
DOC_BUG_TIMEOUT = float(os.environ.get("DOC_BUG_TIMEOUT", "15"))
# ---------------------------------------------------------------------------
# FastMCP setup.
#
# stateless_http=True — every request creates an ephemeral session and
# discards it on return. Critical for production: clients don't get
# 404 storms when the container is recreated by Watchtower.
# ---------------------------------------------------------------------------
mcp = FastMCP(f"{PRODUCT_NAME}-docs", stateless_http=True)
# ---------------------------------------------------------------------------
# Lazy helpers — instantiate expensive things only when actually needed,
# so the server still starts when (e.g.) Ollama is briefly unreachable.
# ---------------------------------------------------------------------------
def _bundles() -> dict[str, dict]:
"""Cached load of bundles.json into a {slug: bundle_dict} mapping.
bundles.json is the product-specific catalog written by the Phase 1
scraper. See PLAN.md Phase 1 for the schema.
"""
if not BUNDLES_JSON.exists():
return {}
cat = json.loads(BUNDLES_JSON.read_text())
return {b["slug"]: b for b in cat}
def _build_where(version: str | None, platform: str | None, bundle_id: str | None) -> dict | None:
"""Translate filter args into a Chroma `where` clause."""
conds: list[dict] = []
if version:
conds.append({"version": version})
if platform:
conds.append({"platform": platform})
if bundle_id:
conds.append({"bundle_id": bundle_id})
if not conds:
return None
if len(conds) == 1:
return conds[0]
return {"$and": conds}
def _where_for_bm25(version: str | None, platform: str | None, bundle_id: str | None) -> dict | None:
"""BM25Index.query takes a flat dict of equality filters."""
w: dict[str, str] = {}
if version: w["version"] = version
if platform: w["platform"] = platform
if bundle_id: w["bundle_id"] = bundle_id
return w or None
def _read_page(bundle_id: str, page_id: str) -> tuple[str, dict] | None:
"""Read a corpus page off disk. Returns (markdown_body, metadata_dict)."""
md_path = CORPUS / bundle_id / (page_id + ".md")
json_path = CORPUS / bundle_id / (page_id + ".json")
if not md_path.exists() or not json_path.exists():
return None
return md_path.read_text(), json.loads(json_path.read_text())
_CHROMA = None
_BM25 = None
def _collection():
"""Lazy Chroma collection handle. Cached after first call."""
global _CHROMA
if _CHROMA is None:
import chromadb
from chromadb.config import Settings
from rag.embeddings import embedding_function
client = chromadb.PersistentClient(
path=str(CHROMA_DIR),
settings=Settings(anonymized_telemetry=False),
)
_CHROMA = client.get_collection(COLLECTION, embedding_function=embedding_function())
return _CHROMA
def _bm25():
"""Lazy BM25Index handle. None if the FTS5 db isn't built."""
global _BM25
if _BM25 is None:
if not BM25_DB.exists():
return None
try:
from rag.bm25 import BM25Index
_BM25 = BM25Index(str(BM25_DB))
except Exception as e: # defensive: hybrid must never block dense
log.warning("BM25 unavailable, falling back to dense-only: %s", e)
return None
return _BM25
def _enrich_from_chroma(col, chunk_ids: list[str], fused: list | None) -> tuple[list[str], list[dict], list[float]]:
"""Fetch document text + metadata for a list of chunk ids from Chroma, in order."""
if not chunk_ids:
return [], [], []
g = col.get(ids=chunk_ids, include=["documents", "metadatas"])
by_id = {i: (d, m) for i, d, m in zip(g["ids"], g["documents"], g["metadatas"])}
docs = [by_id[i][0] for i in chunk_ids if i in by_id]
metas = [by_id[i][1] for i in chunk_ids if i in by_id]
if fused is not None:
dists = [1.0 - score for _id, score, _src in fused[:len(docs)]]
else:
dists = [0.0] * len(docs)
return docs, metas, dists
def _rerank(query: str, candidates: list[tuple[str, str]]) -> list[tuple[str, str]] | None:
"""POST to RERANK_URL /v1/rerank, return candidates re-ordered by relevance.
`candidates` is `[(chunk_id, text), ...]`. Texts are truncated to ~2000 chars
before sending so we never blow past jina-reranker's 1024-token per-pair
cap (which 400s the entire batch). The full untruncated text still goes
back to the user from Chroma; truncation is reranking-only.
Returns None on any failure — caller treats that as "skip reranking,
keep retrieval-order candidates."
"""
if not RERANK_URL or not candidates:
return None
try:
import httpx
payload = {
"query": query,
"documents": [(text or "")[:2000] for _cid, text in candidates],
"top_n": len(candidates),
}
with httpx.Client(timeout=RERANK_TIMEOUT) as c:
r = c.post(f"{RERANK_URL}/v1/rerank", json=payload)
r.raise_for_status()
results = r.json().get("results") or []
order = [candidates[item["index"]] for item in results
if isinstance(item.get("index"), int) and 0 <= item["index"] < len(candidates)]
return order or None
except Exception as e:
log.warning("rerank failed, keeping retrieval order: %s", e)
return None
def _rrf_fuse(*ranked_lists: list[str], k: int = RRF_K) -> list[tuple[str, float, dict]]:
"""Reciprocal Rank Fusion. Each ranked list is a sequence of ids in
descending relevance. Returns [(id, fused_score, per_retriever_contrib), ...]
sorted by score desc."""
scores: dict[str, float] = {}
sources: dict[str, dict] = {}
names = ("dense", "bm25", "extra")
for idx, lst in enumerate(ranked_lists):
src = names[idx] if idx < len(names) else f"r{idx}"
for rank, ident in enumerate(lst, start=1):
scores[ident] = scores.get(ident, 0.0) + 1.0 / (k + rank)
sources.setdefault(ident, {})[src] = rank
ranked = sorted(scores.items(), key=lambda kv: -kv[1])
return [(ident, score, sources[ident]) for ident, score in ranked]
def _source_url(bundle_id: str, page_id: str) -> str:
"""Build the canonical docs portal URL for a (bundle, page) pair."""
b = _bundles().get(bundle_id)
if not b:
return ""
doc_id = b.get("doc_id", "")
if page_id.startswith("GUID-"):
return f"https://support.hpe.com/hpesc/public/docDisplay?docId={doc_id}&page={page_id}.html"
return f"https://support.hpe.com/hpesc/public/docDisplay?docId={doc_id}"
# ===========================================================================
# Tools
# ===========================================================================
@mcp.tool()
def search_docs(
query: Annotated[str, Field(description=f"Natural-language query about {PRODUCT_NAME}.")],
version: Annotated[
str | None,
Field(description="OPTIONAL version filter — restrict to one product version."),
] = None,
platform: Annotated[
str | None,
Field(description="OPTIONAL platform filter. Set to one of the platforms listed by list_versions(); omit for all platforms."),
] = None,
bundle_id: Annotated[
str | None,
Field(description="OPTIONAL bundle filter — pin to a specific doc bundle slug."),
] = None,
k: Annotated[int, Field(description="Number of results to return.", ge=1, le=50)] = 10,
) -> str:
"""Search the HPE Morpheus Enterprise (Morpheus) docs corpus.
Returns the top-k most relevant chunks (with full source page URLs)
given a natural-language query. Optional filters narrow the search
to one version, one platform, or one bundle. Use list_versions()
first if you need to discover the available facet values.
Call this tool whenever the user asks anything that should be
answerable from the official product documentation — install,
upgrade, configuration, backups, networking, HVM clusters, the
Morpheus UI, or any 8.1.x release-notes question.
"""
with TimedCall("search_docs", {
"query": query, "version": version, "platform": platform,
"bundle_id": bundle_id, "k": k,
}) as _call:
try:
col = _collection()
except Exception as e:
log.exception("chroma collection unavailable")
_call.set(hits_returned=0, error=str(e))
return f"_(search backend unavailable: {e})_"
where = _build_where(version, platform, bundle_id)
bm25_where = _where_for_bm25(version, platform, bundle_id)
pool = max(k * 5, 50)
# Retrieval mode selection. Eval on this corpus (2026-05-22, 22 golden
# queries) showed BM25 MRR=0.88 vs dense MRR=0.54 vs hybrid MRR=0.69 —
# HPE structured docs use controlled vocabulary, so lexical match wins.
# Dense is kept as fallback when BM25 has no tokens to chew on (e.g.
# purely stopword queries). HYBRID_SEARCH=true forces RRF fusion.
bm = _bm25()
docs: list[str] = []
metas: list[dict] = []
dists: list[float] = []
retrieval_mode = "dense"
top1_source = "dense_only"
if HYBRID_SEARCH and bm is not None:
try:
dense_res = col.query(query_texts=[query], n_results=pool, where=where)
dense_ids = (dense_res.get("ids") or [[]])[0]
bm_hits = bm.query(query, n=pool, where=bm25_where)
bm_ids = [cid for cid, _s in bm_hits]
fused = _rrf_fuse(dense_ids, bm_ids)
docs, metas, dists = _enrich_from_chroma(col, [c for c, _, _ in fused[:k]], fused)
if fused:
src0 = fused[0][2]
top1_source = ("both" if {"dense", "bm25"} <= set(src0)
else "bm25_only" if "bm25" in src0
else "dense_only")
retrieval_mode = "hybrid"
except Exception as e:
log.warning("hybrid failed, falling back to BM25→dense: %s", e)
if not docs and bm is not None:
try:
bm_hits = bm.query(query, n=k, where=bm25_where)
if bm_hits:
ids = [cid for cid, _s in bm_hits[:k]]
docs, metas, _ = _enrich_from_chroma(col, ids, None)
# FTS5 returns negative scores (lower=better). Map onto a
# similarity-ish [0..1] just for display.
dists = [max(0.0, min(1.0, 1.0 - abs(s) / 20.0)) for _id, s in bm_hits[:k]]
retrieval_mode = "bm25"
top1_source = "bm25_only"
except Exception as e:
log.warning("BM25 retrieval failed, falling back to dense: %s", e)
if not docs:
res = col.query(query_texts=[query], n_results=k, where=where)
docs = (res.get("documents") or [[]])[0]
metas = (res.get("metadatas") or [[]])[0]
dists = (res.get("distances") or [[]])[0]
reranker_fired = False
if RERANK_URL and docs:
# Pull a deeper pool to give the reranker something to chew on.
# We over-fetch up to RERANK_POOL chunks from whichever retriever
# already won, then ask the reranker to pick the final top-k.
pool_size = max(k, RERANK_POOL)
if len(docs) < pool_size:
if retrieval_mode == "bm25":
extra = bm.query(query, n=pool_size, where=bm25_where) if bm else []
extra_ids = [cid for cid, _s in extra]
else:
extra_res = col.query(query_texts=[query], n_results=pool_size, where=where)
extra_ids = (extra_res.get("ids") or [[]])[0]
if extra_ids:
d2, m2, _ = _enrich_from_chroma(col, extra_ids, None)
docs, metas = d2, m2
dists = [0.0] * len(docs)
# Reranker scores chunk_ids — collapse to (id, text) tuples
pairs = list(zip(
[f"{m.get('bundle_id','')}::{m.get('page_id','')}::{m.get('ordinal',0)}" for m in metas],
docs,
))
reranked = _rerank(query, pairs)
if reranked is not None:
# Re-sort docs/metas to match. Recompute distances as descending
# ordinal ranks so display still shows a useful score.
by_cid = {p[0]: i for i, p in enumerate(pairs)}
order = [by_cid[cid] for cid, _t in reranked if cid in by_cid]
docs = [docs[i] for i in order][:k]
metas = [metas[i] for i in order][:k]
dists = [1.0 - (rank / len(reranked)) for rank, _ in enumerate(reranked)][:len(docs)]
reranker_fired = True
else:
docs, metas, dists = docs[:k], metas[:k], dists[:k]
_call.set(hits_returned=len(docs), retrieval_mode=retrieval_mode,
top1_source=top1_source, reranker_fired=reranker_fired)
if not docs:
return f"_No matches for `{query}`._"
out = [f"# {len(docs)} result(s) for `{query}`", ""]
for doc, meta, dist in zip(docs, metas, dists):
bid = meta.get("bundle_id", "")
pid = meta.get("page_id", "")
title = meta.get("title") or pid
ver = meta.get("version") or ""
url = _source_url(bid, pid)
header = f"## {title}"
if ver:
header += f" _(v{ver})_"
out.append(header)
out.append(f"[{bid}/{pid}]({url}) · score={1 - dist:.3f}")
out.append("")
out.append(doc.strip())
out.append("")
return "\n".join(out)
@mcp.tool()
def get_page(
bundle_id: Annotated[str, Field(description="Bundle slug.")],
page_id: Annotated[str, Field(description="Page filename within the bundle.")],
) -> str:
"""Return the full markdown for one page, plus a metadata header.
Use after search_docs surfaces a relevant page and the user (or you)
want the complete text — not just the matched chunks.
"""
with TimedCall("get_page", {"bundle_id": bundle_id, "page_id": page_id}) as _call:
data = _read_page(bundle_id, page_id)
if data is None:
_call.set(found=False)
return f"Page not found: {bundle_id}/{page_id}"
md, meta = data
_call.set(found=True, page_chars=len(md))
title = meta.get("title") or page_id
ver = meta.get("version")
parent = meta.get("parent_title")
url = _source_url(bundle_id, page_id)
header = [f"# {title}"]
ctx = []
if ver:
ctx.append(f"version **{ver}**")
if parent:
ctx.append(f"in **{parent}**")
if ctx:
header.append("_" + " · ".join(ctx) + "_")
header.append(f"[source]({url})")
header.append("")
return "\n".join(header) + "\n" + md
@mcp.tool()
def list_versions() -> str:
"""List the available version/platform facets across all bundles.
Use this to discover valid filter values for search_docs.
"""
with TimedCall("list_versions", {}) as _call:
cat = _bundles()
if not cat:
return "_(no bundles indexed yet — run the scraper + indexer)_"
versions = sorted({b.get("version") for b in cat.values() if b.get("version")})
platforms = sorted({b.get("platform") for b in cat.values() if b.get("platform")})
_call.set(versions=len(versions), platforms=len(platforms))
products = sorted({b.get("product") for b in cat.values() if b.get("product")})
lines = [f"# Facets across {len(cat)} bundle(s)", ""]
if versions:
lines += ["## Versions", ""] + [f"- `{v}`" for v in versions] + [""]
if platforms:
lines += ["## Platforms", ""] + [f"- `{p}`" for p in platforms] + [""]
if products:
lines += ["## Product / doc types", ""] + [f"- {p}" for p in products] + [""]
lines += ["## Bundles", ""]
for slug in sorted(cat):
b = cat[slug]
kind = b.get("product") or ""
ver = b.get("version")
pages = b.get("page_count", "?")
label = f"{kind} {ver}".strip() if ver else kind
lines.append(f"- `{slug}` — {label} ({pages} pages)")
return "\n".join(lines)
# ===========================================================================
# Phase 9 — cross-version tools
# ===========================================================================
def _bundle_pages(bundle_id: str) -> set[str]:
"""Page IDs (= GUID-XXXX) on disk in a bundle. Mirrors rag.index's md_path.stem."""
bd = CORPUS / bundle_id
if not bd.is_dir():
return set()
return {p.stem for p in bd.glob("*.md")}
def _diff_churn(a: str, b: str) -> tuple[int, int]:
"""Cheap (added, removed) line counts for a pair of markdown bodies."""
diff = difflib.unified_diff(a.splitlines(keepends=False),
b.splitlines(keepends=False), n=0)
added = removed = 0
for line in diff:
if line.startswith(("+++", "---", "@@")):
continue
if line.startswith("+"):
added += 1
elif line.startswith("-"):
removed += 1
return added, removed
@mcp.tool()
def list_cluster(
bundle_id: Annotated[str, Field(description="Bundle slug of the source topic.")],
page_id: Annotated[str, Field(description="Page id (GUID-XXXX) of the source topic.")],
) -> str:
"""List cross-version peers of a topic in the HVM docs.
HPE re-mints the docId per product version but keeps page GUIDs stable,
so the scrape pipeline synthesizes `topic_cluster.clustered_topics`
from same-GUID overlap (374/376/376 pages overlap across 8.1.0/.1/.2).
"""
with TimedCall("list_cluster", {"bundle_id": bundle_id, "page_id": page_id}) as _call:
out = _read_page(bundle_id, page_id)
if out is None:
_call.set(found=False)
return f"Not found: {bundle_id}/{page_id}"
_, side = out
cluster = side.get("topic_cluster") or {}
peers = cluster.get("clustered_topics") or []
_call.set(hits_returned=len(peers))
src_label = cluster.get("clustering_title") or side.get("title") or page_id
lines = [f"# Cluster for {bundle_id}/{page_id} ({src_label})", ""]
if not peers:
lines.append("_No peer topics in cluster._")
return "\n".join(lines)
for p in peers:
lines.append(f"- `{p['bundle_id']}/{p['page_id']}` — {p.get('clustering_title') or ''}")
return "\n".join(lines)
@mcp.tool()
def diff_versions(
bundle_id: Annotated[str, Field(description="Bundle slug of the source topic (the 'new' side).")],
page_id: Annotated[str, Field(description="Page id of the source topic.")],
against_bundle_id: Annotated[str, Field(description="Bundle slug to diff against. Must be in the source's cluster, or share the same page_id.")],
context: Annotated[int, Field(description="Lines of context around each hunk.", ge=0, le=10)] = 3,
) -> str:
"""Unified diff of one topic between two bundles (typically two HVM versions).
Two matching strategies, tried in order:
1. `topic_cluster` peer (synthesized from same-GUID overlap by the scraper).
2. Same `page_id` fallback (works because GUIDs are stable across HVM versions).
"""
with TimedCall("diff_versions", {
"bundle_id": bundle_id, "page_id": page_id,
"against_bundle_id": against_bundle_id, "context": context,
}) as _call:
src = _read_page(bundle_id, page_id)
if src is None:
_call.set(matched_via=None, reason="source_not_found")
return f"Source not found: {bundle_id}/{page_id}"
src_md, side = src
cluster = side.get("topic_cluster") or {}
peers = {p["bundle_id"]: p for p in (cluster.get("clustered_topics") or [])}
peer = peers.get(against_bundle_id)
if peer is not None:
peer_page_id = peer["page_id"]
matched_via = "topic_cluster"
elif _read_page(against_bundle_id, page_id) is not None:
peer_page_id = page_id
matched_via = "filename"
else:
_call.set(matched_via=None, reason="no_peer")
valid = list(peers) or ["(no peers)"]
return (f"No match for {bundle_id}/{page_id} in {against_bundle_id}.\n"
f"- No cluster peer. Available peers: {valid}\n"
f"- No page {page_id!r} in {against_bundle_id} either.")
_call.set(matched_via=matched_via)
peer_data = _read_page(against_bundle_id, peer_page_id)
if peer_data is None:
return f"Peer not found in corpus: {against_bundle_id}/{peer_page_id}"
peer_md, _ = peer_data
diff = difflib.unified_diff(peer_md.splitlines(keepends=True),
src_md.splitlines(keepends=True),
fromfile=f"{against_bundle_id}/{peer_page_id}",
tofile=f"{bundle_id}/{page_id}",
n=context)
body = "".join(diff)
header = f"_matched via {matched_via}_\n\n"
if not body.strip():
return header + f"No differences between {bundle_id}/{page_id} and {against_bundle_id}/{peer_page_id}."
return header + f"```diff\n{body}```"
@mcp.tool()
def bundle_changelog(
bundle_id_new: Annotated[str, Field(description="New-side bundle slug, e.g. 'hvm_user_manual_8_1_2'.")],
bundle_id_old: Annotated[str, Field(description="Old-side bundle slug, e.g. 'hvm_user_manual_8_1_1'.")],
min_churn: Annotated[int, Field(description="Min (added + removed) lines to flag a page as changed.", ge=1, le=1000)] = 5,
max_changed: Annotated[int, Field(description="Max changed pages to list (sorted by churn desc).", ge=1, le=500)] = 50,
) -> str:
"""High-level diff between two HVM bundles.
Lists pages added, removed, and changed between an old bundle and a
new one. Match is by page_id (which is the stable GUID — same GUID
across versions = same topic). Use after `list_versions` to discover
valid bundle slugs.
"""
with TimedCall("bundle_changelog", {
"bundle_id_new": bundle_id_new, "bundle_id_old": bundle_id_old,
"min_churn": min_churn, "max_changed": max_changed,
}) as _call:
new_pages = _bundle_pages(bundle_id_new)
old_pages = _bundle_pages(bundle_id_old)
if not new_pages and not old_pages:
_call.set(reason="both_empty")
return f"Neither bundle has pages on disk: {bundle_id_new}, {bundle_id_old}"
if not new_pages:
return f"Bundle not found or empty: {bundle_id_new}"
if not old_pages:
return f"Bundle not found or empty: {bundle_id_old}"
added = sorted(new_pages - old_pages)
removed = sorted(old_pages - new_pages)
common = sorted(new_pages & old_pages)
changed: list[tuple[str, int, int]] = []
for pid in common:
n = _read_page(bundle_id_new, pid)
o = _read_page(bundle_id_old, pid)
if n is None or o is None:
continue
a_lines, r_lines = _diff_churn(o[0], n[0])
if a_lines + r_lines >= min_churn:
changed.append((pid, a_lines, r_lines))
changed.sort(key=lambda t: -(t[1] + t[2]))
_call.set(added=len(added), removed=len(removed),
changed=len(changed), unchanged=len(common) - len(changed))
lines = [
f"# Bundle changelog: {bundle_id_new} vs {bundle_id_old}", "",
f"- pages in new: **{len(new_pages)}**",
f"- pages in old: **{len(old_pages)}**",
f"- common: **{len(common)}**",
f"- **added** (in new only): {len(added)}",
f"- **removed** (in old only): {len(removed)}",
f"- **changed** (≥{min_churn} lines): {len(changed)} of {len(common)} common",
f"- unchanged: {len(common) - len(changed)}", "",
]
if added:
lines += [f"## Added pages ({len(added)})", *(f"- `{p}`" for p in added), ""]
if removed:
lines += [f"## Removed pages ({len(removed)})", *(f"- `{p}`" for p in removed), ""]
if changed:
shown = changed[:max_changed]
lines += [
f"## Changed pages — top {len(shown)} of {len(changed)} by churn", "",
"| page | +lines | -lines | total |", "|---|---|---|---|",
]
for p, a, r in shown:
lines.append(f"| `{p}` | +{a} | -{r} | {a + r} |")
if len(changed) > max_changed:
lines.append(f"\n_({len(changed) - max_changed} more changed pages omitted; raise `max_changed` to see them.)_")
lines.append("\nInspect a specific page: `diff_versions(bundle_id_new, page_id, bundle_id_old)`.")
return "\n".join(lines)
# ===========================================================================
# Phase 13 — weekly digest from corpus/.digest/history.jsonl (built in CI)
# ===========================================================================
_digest_cache: list[dict] | None = None
def _digest_history() -> list[dict]:
"""Lazy load of the digest history JSONL written by scrape.changelog at CI time."""
global _digest_cache
if _digest_cache is not None:
return _digest_cache
if not DIGEST_HISTORY_PATH.exists():
log.warning("digest history not found at %s — weekly_digest will return empty.",
DIGEST_HISTORY_PATH)
_digest_cache = []
return _digest_cache
records: list[dict] = []
try:
with open(DIGEST_HISTORY_PATH) as fh:
for ln, line in enumerate(fh, start=1):
line = line.strip()
if not line:
continue
try:
records.append(json.loads(line))
except json.JSONDecodeError as e:
log.warning("digest history: skipping malformed line %d: %s", ln, e)
except OSError as e:
log.warning("digest history read failed: %s", e)
_digest_cache = records
return _digest_cache
@mcp.tool()
def weekly_digest(
days: Annotated[int, Field(description="How far back to summarize. 7=last week, 30=last month. Horizon ~120 days.", ge=1, le=120)] = 7,
version: Annotated[str | None, Field(description="OPTIONAL version filter, e.g. '8.1.2'.")] = None,
platform: Annotated[str | None, Field(description="OPTIONAL platform filter (HVM bundles don't set platform — leave None).")] = None,
max_bundles: Annotated[int, Field(description="Cap on per-bundle detail blocks.", ge=1, le=100)] = 25,
max_pages_per_bundle: Annotated[int, Field(description="Pages to list per bundle.", ge=1, le=50)] = 10,
) -> str:
"""Summarize what changed in the HVM docs over the past N days.
Call when the user asks *"what's new in HVM docs this week?"*,
*"what changed in 8.1.2?"*, or *"is there anything new since the
last release?"*. Reads the pre-baked digest history JSONL written
by CI from git log over corpus-touching commits.
"""
with TimedCall("weekly_digest", {
"days": days, "version": version, "platform": platform,
"max_bundles": max_bundles, "max_pages_per_bundle": max_pages_per_bundle,
}) as _call:
records = _digest_history()
if not records:
_call.set(returned="empty_no_history", record_count=0)
return ("# Weekly digest\n\n"
f"_No digest history on this image. `{DIGEST_HISTORY_PATH}` is "
"missing — it's populated by the weekly refresh workflow._")
now = _dt.datetime.now(_dt.timezone.utc)
cutoff = now - _dt.timedelta(days=days)
filtered: list[dict] = []
for r in records:
try:
ts = _dt.datetime.fromisoformat(r["timestamp"])
except (KeyError, ValueError):
continue
if ts.tzinfo is None:
ts = ts.replace(tzinfo=_dt.timezone.utc)
if ts >= cutoff:
filtered.append({**r, "_ts": ts})
if not filtered:
_call.set(returned="empty_window", record_count=0)
covers = ""
if records:
oldest = min(records, key=lambda r: r.get("timestamp", ""))
newest = max(records, key=lambda r: r.get("timestamp", ""))
covers = (f"\n\n_(History on this image covers "
f"{oldest.get('timestamp','?')[:10]} through "
f"{newest.get('timestamp','?')[:10]}.)_")
return (f"# Weekly digest — last {days} day{'s' if days != 1 else ''}\n\n"
f"_No corpus changes recorded in this window._" + covers)
cat = _bundles()
def _passes(bid: str) -> bool:
if not (version or platform):
return True
b = cat.get(bid)
if b is None:
return False
if version and b.get("version") != version:
return False
if platform and b.get("platform") != platform:
return False
return True
filtered.sort(key=lambda r: r["_ts"], reverse=True)
per_bundle_pages: dict[str, list[str]] = {}
new_bundles_set: set[str] = set()
drift_bundles_set: set[str] = set()
commits_in_window = 0
for r in filtered:
commits_in_window += 1
for bid in r.get("new_bundles", []):
if _passes(bid):
new_bundles_set.add(bid)
for bid in r.get("json_only_bundles", []):
if _passes(bid):
drift_bundles_set.add(bid)
for bid, pages in (r.get("content_bundles") or {}).items():
if not _passes(bid):
continue
seen = set(per_bundle_pages.get(bid, []))
fresh = [p for p in pages if p not in seen]
if fresh:
per_bundle_pages.setdefault(bid, []).extend(fresh)
total_md = sum(len(p) for p in per_bundle_pages.values())
bundles_ranked = sorted(per_bundle_pages.items(), key=lambda kv: (-len(kv[1]), kv[0]))
_call.set(returned="ok", record_count=commits_in_window,
bundles_changed=len(per_bundle_pages),
new_bundles=len(new_bundles_set))
ts_oldest = filtered[-1]["_ts"].date().isoformat()
ts_newest = filtered[0]["_ts"].date().isoformat()
lines = [
f"# HVM docs digest — last {days} day{'s' if days != 1 else ''}", "",
f"_Window: {ts_oldest}{ts_newest}_ • _Filters: version={version}, platform={platform}_", "",
"## Headline", "",
f"- **{total_md}** page change(s) across **{len(per_bundle_pages)}** bundle(s)",
f"- **{commits_in_window}** corpus-touching commit(s) in this window",
f"- **{len(new_bundles_set)}** bundle(s) newly added",
f"- **{len(drift_bundles_set)}** bundle(s) with sidecar-only drift", "",
]
if not per_bundle_pages and not new_bundles_set:
lines.append(f"_No bundle changes matched the filter in this window._")
return "\n".join(lines)
if new_bundles_set:
lines += ["## New bundles added", ""]
for bid in sorted(new_bundles_set):
b = cat.get(bid, {})
t = b.get("title") or ""
tag = f" *({b.get('version') or '?'})*" if b.get("version") else ""
lines.append(f"- `{bid}`{tag} {t}")
lines.append("")
if bundles_ranked:
top = bundles_ranked[:max_bundles]
remainder = len(bundles_ranked) - len(top)
lines += [f"## Bundles with content changes — top {len(top)}" +
(f" of {len(bundles_ranked)}" if remainder else ""), ""]
for bid, pages in top:
b = cat.get(bid, {})
tag = f" *({b.get('version') or ''})*" if b.get("version") else ""
lines.append(f"### `{bid}`{tag}")
if b.get("title"):
lines.append(f"_{b['title']}_")
lines.append(f"{len(pages)} page change(s).")
for p in pages[:max_pages_per_bundle]:
lines.append(f"- `{p}`")
if len(pages) > max_pages_per_bundle:
lines.append(f" _(+{len(pages) - max_pages_per_bundle} more)_")
lines.append("")
lines.append("\nInspect a specific page: `get_page(bundle_id, page_id)` or `diff_versions(...)`.")
return "\n".join(lines)
@mcp.tool()
def corpus_status() -> str:
"""Freshness + size of the knowledge base.
Combines: (1) image build time (bundles.json mtime in container),
(2) most-recent upstream Published date across bundles, (3) total
bundles / pages / Chroma chunks.
"""
lines: list[str] = ["# Corpus status", ""]
try:
ts = _dt.datetime.fromtimestamp(BUNDLES_JSON.stat().st_mtime, tz=_dt.timezone.utc).isoformat(timespec="seconds")
lines.append(f"- This image built at: **{ts}**")
except OSError:
lines.append("- This image build time: _unknown_")
cat = _bundles()
latest_pub: str | None = None
per_bundle: list[tuple[str, str]] = []
for slug, b in cat.items():
pub = (b.get("dates") or {}).get("Published")
if pub:
if latest_pub is None or pub > latest_pub:
latest_pub = pub
per_bundle.append((slug, pub))
if latest_pub:
lines.append(f"- Most-recent upstream Published date (any bundle): **{latest_pub}**")
lines.append("")
try:
chunk_count = _collection().count()
except Exception:
chunk_count = -1
pages_count = sum(1 for d in (CORPUS.iterdir() if CORPUS.exists() else [])
if d.is_dir() for _ in d.glob("*.md"))
lines += [
f"- Bundles indexed: **{len(cat)}**",
f"- Pages in corpus: **{pages_count}**",
f"- Chunks in Chroma: **{chunk_count}**" if chunk_count >= 0 else "- Chunks in Chroma: _(query failed)_",
"",
]
if per_bundle:
per_bundle.sort(key=lambda kv: kv[1], reverse=True)
lines.append("## Most-recently-edited bundles (by HPE)")
for slug, when in per_bundle[:5]:
b = cat.get(slug, {})
lines.append(f"- `{slug}` — {b.get('title') or slug} (published {when})")
return "\n".join(lines)
# ===========================================================================
# Phase 11 — curated knowledge: morpheus_api_lessons
# ===========================================================================
def _split_lessons_sections(md: str) -> list[tuple[str, str]]:
sections: list[tuple[str, str]] = []
current_title: str | None = None
current_lines: list[str] = []
for line in md.splitlines(keepends=True):
m = re.match(r"^##\s+(.+?)\s*$", line)
if m:
if current_lines:
sections.append((current_title or "(prelude)", "".join(current_lines)))
current_title = m.group(1).strip()
current_lines = [line]
else:
current_lines.append(line)
if current_lines:
sections.append((current_title or "(prelude)", "".join(current_lines)))
return sections
@mcp.tool()
def morpheus_api_lessons(
topic: Annotated[str | None, Field(description="Optional keyword filter — returns only H2 sections whose heading or body contains this substring. Examples: 'manager', 'agent upgrade', 'plugin api', 'worker', 'console keyboard'. Omit for the full doc.")] = None,
) -> str:
"""Curated lessons about HPE Morpheus Enterprise — non-obvious bits
that aren't in the official docs and gotchas learned from real
integration / operation work.
**Call this proactively whenever the user asks you to:**
- install, upgrade, or troubleshoot an HVM cluster or manager
- integrate with HVM (REST API, automation, scripting)
- upgrade across versions (8.1.0 → 8.1.1 → 8.1.2)
- work with HVM Host agents
- configure backups, networking, or storage
- elevate to HPE Morpheus Enterprise
With ``topic=...`` you'll get just the relevant H2 section(s). With
no argument you'll get the full doc — usually the right call when
starting on a new task since the TL;DR at the top primes the rest.
"""
with TimedCall("morpheus_api_lessons", {"topic": topic}) as _call:
try:
md = API_LESSONS_MD.read_text()
except OSError as e:
_call.set(error=str(e))
return f"Lessons doc not present at {API_LESSONS_MD}: {e}"
if not topic:
_call.set(returned="full")
return md
needle = topic.lower()
sections = _split_lessons_sections(md)
kept: list[str] = []
for title, body in sections:
if needle in title.lower() or needle in body.lower():
kept.append(body)
if not kept:
_call.set(returned="empty", topic_matched=False)
return (f"_No sections matched topic={topic!r}. Returning the full document._\n\n" + md)
_call.set(returned="filtered", sections_matched=len(kept))
return f"_Filtered to {len(kept)} section(s) matching topic={topic!r}._\n\n" + "".join(kept)
# ===========================================================================
# Phase 12 — find_doc_inconsistencies + submit_doc_bug
# ===========================================================================
_REDIRECT_PHRASE_RE = re.compile(
r"\bsee\s+(?:the\s+)?[A-Z`\[][^.!?\n]{2,80}(?:for|topic|section|chapter|guide)\b",
re.IGNORECASE,
)
_VERSION_SUFFIX_RE = re.compile(r"_(\d+_\d+_\d+)$")
def _bundle_family(bundle_id: str) -> str:
"""Strip a trailing `_X_Y_Z` version suffix from an HVM bundle slug.
`hvm_user_manual_8_1_0` → `hvm_user_manual`
`hvm_deployment_guide` → `hvm_deployment_guide` (no version)
Same-family bundles are version peers; cross-family pairs (User Manual
vs Release Notes) are intentionally different content.
"""
return _VERSION_SUFFIX_RE.sub("", bundle_id)
def _check_cross_version_drift(bundle_id: str, page_id: str, md: str, meta: dict) -> dict | None:
cluster = (meta.get("topic_cluster") or {}).get("clustered_topics") or []
if not cluster:
return None
src_family = _bundle_family(bundle_id)
src_lines = max(1, len(md.splitlines()))
in_band: list[tuple[int, str, str, int]] = []
out_band: list[tuple[int, str, str, int]] = []
for peer in cluster:
peer_bid = peer.get("bundle_id")
peer_pid = peer.get("page_id")
if not (peer_bid and peer_pid) or peer_bid == bundle_id:
continue
if _bundle_family(peer_bid) != src_family:
continue
peer_data = _read_page(peer_bid, peer_pid)
if peer_data is None:
continue
peer_md, _ = peer_data
added, removed = _diff_churn(md, peer_md)
churn = added + removed
peer_lines = max(1, len(peer_md.splitlines()))
denom = max(src_lines, peer_lines)
pct = (churn * 100) // denom if denom else 0
tup = (churn, peer_bid, peer_pid, peer_lines)
if 10 <= pct <= 60:
in_band.append(tup)
elif churn >= 5:
out_band.append(tup)
if in_band:
chosen = min(in_band, key=lambda t: t[0])
confidence = "high"
elif out_band:
chosen = min(out_band, key=lambda t: t[0])
confidence = "low"
else:
return None
churn, peer_bid, peer_pid, peer_lines = chosen
denom = max(src_lines, peer_lines)
churn_pct = min(100, (churn * 100) // denom) if denom else 0
return {
"check": "cross_version_drift",
"bundle_id": bundle_id, "page_id": page_id,
"page_url": _source_url(bundle_id, page_id),
"peer_bundle_id": peer_bid, "peer_page_id": peer_pid,
"churn_lines": churn, "churn_pct_of_file": churn_pct,
"confidence": confidence,
"summary": (f"Drifts {churn} lines (~{churn_pct}% of file) vs peer "
f"{peer_bid}/{peer_pid}. Inspect with "
f"diff_versions({bundle_id!r}, {page_id!r}, {peer_bid!r})."),
}
def _check_redirect_chain(bundle_id: str, page_id: str, md: str, meta: dict) -> dict | None:
body = re.sub(r"^#[^\n]*\n", "", md, count=1).strip()
if "```" in body:
return None
text_only = re.sub(r"[`\[\]()*_>#-]", "", body)
text_only = re.sub(r"\s+", " ", text_only).strip()
if len(text_only) > 600:
return None
redirect_matches = list(_REDIRECT_PHRASE_RE.finditer(body))
if not redirect_matches:
return None
evidence = redirect_matches[0].group(0).strip()
return {
"check": "redirect_chain",
"bundle_id": bundle_id, "page_id": page_id,
"page_url": _source_url(bundle_id, page_id),
"body_chars": len(text_only),
"redirect_phrase": evidence[:200],
"confidence": "medium",
"summary": (f"Page is {len(text_only)} chars of body text with a "
f'"see ... for ..." redirect: "{evidence[:120]}". '
"Inspect with get_page to confirm."),
}
@mcp.tool()
def find_doc_inconsistencies(
scope_query: Annotated[str, Field(description="Natural-language scope describing what slice to scan. Used as a search to pick candidate pages. Examples: 'backup configuration', 'HVM cluster setup', 'VME manager installation'.")],
version: Annotated[str | None, Field(description="OPTIONAL version filter — e.g. '8.1.2'.")] = None,
platform: Annotated[str | None, Field(description="OPTIONAL platform filter (HVM bundles don't set platform — usually leave None).")] = None,
bundle_id: Annotated[str | None, Field(description="OPTIONAL specific bundle slug to restrict scanning to.")] = None,
max_pages: Annotated[int, Field(description="How many candidate pages to inspect.", ge=5, le=200)] = 30,
checks: Annotated[list[str] | None, Field(description="Which checks to run. Available: 'cross_version_drift', 'redirect_chain'. Defaults to all.")] = None,
) -> str:
"""Scan a scoped set of HVM docs pages for likely documentation bugs.
Surfaces concrete candidates for human review — NOT a stream of
bugs to auto-submit. Workflow:
1. Run this against a focused scope.
2. Review each finding; many will be false positives.
3. For real bugs, drill in with `get_page` / `diff_versions`.
4. Draft a bug report; show the operator; ask explicitly.
5. Only then call `submit_doc_bug`. One bug = one confirmation.
**Do NOT loop submissions.** Even on "submit them all", confirm each
one individually. HPE's docs queue is a shared resource.
"""
with TimedCall("find_doc_inconsistencies", {
"scope_query": scope_query, "version": version, "platform": platform,
"bundle_id": bundle_id, "max_pages": max_pages, "checks": checks,
}) as _call:
all_checks = {"cross_version_drift", "redirect_chain"}
requested = all_checks if checks is None else {c for c in checks if c in all_checks}
if not requested:
_call.set(error="no_valid_checks")
return f"No valid checks requested. Available: {sorted(all_checks)}."
try:
col = _collection()
except Exception as e:
_call.set(error=f"collection: {e}")
return f"Couldn't open Chroma collection: {e}"
where = _build_where(version, platform, bundle_id)
try:
res = col.query(query_texts=[scope_query], n_results=max_pages * 3,
where=where, include=["metadatas"])
except Exception as e:
_call.set(error=f"query: {e}")
return f"Scope query failed: {e}"
seen: set[tuple[str, str]] = set()
candidates: list[tuple[str, str]] = []
for meta in (res.get("metadatas") or [[]])[0]:
key = (meta.get("bundle_id") or "", meta.get("page_id") or "")
if not key[0] or not key[1] or key in seen:
continue
seen.add(key)
candidates.append(key)
if len(candidates) >= max_pages:
break
_call.set(pages_inspected=len(candidates), checks=sorted(requested))
if not candidates:
return f"No pages matched scope `{scope_query}`."
findings: dict[str, list[dict]] = {c: [] for c in requested}
for bid, pid in candidates:
data = _read_page(bid, pid)
if data is None:
continue
md, meta = data
if "cross_version_drift" in requested:
f = _check_cross_version_drift(bid, pid, md, meta)
if f:
findings["cross_version_drift"].append(f)
if "redirect_chain" in requested:
f = _check_redirect_chain(bid, pid, md, meta)
if f:
findings["redirect_chain"].append(f)
findings["cross_version_drift"] = sorted(
findings.get("cross_version_drift", []),
key=lambda f: (-(1 if f["confidence"] == "high" else 0), -f["churn_lines"]))
findings["redirect_chain"] = sorted(
findings.get("redirect_chain", []), key=lambda f: f["body_chars"])
total = sum(len(v) for v in findings.values())
_call.set(findings_total=total,
findings_by_check={k: len(v) for k, v in findings.items()})
lines = [
f"# Doc inconsistency scan — {len(candidates)} pages inspected", "",
f"_Scope_: `{scope_query}` • _Filters_: version={version}, platform={platform}, bundle_id={bundle_id} • _Checks_: {sorted(requested)}", "",
f"**{total} candidate finding{'' if total == 1 else 's'}.** Review each individually. "
"For real bugs, follow up with `get_page` / `diff_versions`, draft the report, "
"show the operator, and only call `submit_doc_bug` after explicit confirmation.", "",
]
if not total:
lines.append("_No findings in this scope._")
return "\n".join(lines)
for check in sorted(requested):
items = findings.get(check, [])
lines += [f"## {check} ({len(items)})", ""]
if not items:
lines.append("_No findings for this check._\n")
continue
for i, f in enumerate(items, 1):
lines.append(f"### {i}. `{f['bundle_id']}/{f['page_id']}` *({f['confidence']} confidence)*")
lines.append(f"- URL: {f['page_url']}")
lines.append(f"- {f['summary']}")
if check == "cross_version_drift":
lines.append(f"- Peer: `{f['peer_bundle_id']}/{f['peer_page_id']}` • churn: {f['churn_lines']} lines ({f['churn_pct_of_file']}% of file)")
elif check == "redirect_chain":
lines.append(f"- Body length: {f['body_chars']} chars • Phrase: *\"{f['redirect_phrase']}\"*")
lines.append("")
lines += ["---",
"_Reminder: `submit_doc_bug` has a real side effect. Draft → show → confirm → submit, one at a time. Do not loop._"]
return "\n".join(lines)
# --- submit_doc_bug ----------------------------------------------------------
# HPE Support DocPortal's "Was this helpful?" widget POSTs to an endpoint
# we haven't sniffed yet. Until DOC_BUG_API_URL is set AND
# DOC_BUG_SUBMIT_ENABLED=true, this tool refuses submission and tells the
# operator to paste manually. When you sniff the endpoint, set both env
# vars and verify the payload shape against the schema below.
_DOC_BUG_ALLOWED_HOSTS = {"support.hpe.com"}
_EMAIL_RE = re.compile(r"^[^@\s]+@[^@\s]+\.[^@\s]+$")
@mcp.tool()
def submit_doc_bug(
page_url: Annotated[str, Field(description="Full URL of the support.hpe.com page the bug is about. Must be a support.hpe.com URL.")],
content: Annotated[str, Field(description="Body of the bug report. Be specific: what the page says, what's wrong, what it should say. Cite exact passages. The docs team reads it verbatim.")],
email: Annotated[str | None, Field(description="OPTIONAL submitter email for follow-up. Omit if anonymous.")] = None,
rating: Annotated[int | None, Field(description="OPTIONAL star rating 1-5 (1-2 for serious bugs, 3 unclear, 4-5 only on explicit request).")] = None,
like: Annotated[bool | None, Field(description="OPTIONAL thumbs-up/down. False for bugs, True for positive feedback.")] = None,
) -> str:
"""Submit a documentation bug to HPE's docs feedback channel.
**⚠️ THIS TOOL HAS A REAL SIDE EFFECT (when enabled). It POSTs to
HPE's docs feedback endpoint and the submission lands in their queue.**
**MANDATORY operator-confirmation workflow:**
1. Draft the bug content yourself. Show the operator the exact text
you intend to submit + the page URL + any rating/email fields.
2. Ask explicitly: *"Submit this bug? (yes/no)"*
3. Only call submit_doc_bug AFTER they answer yes.
4. If they say *"submit them all"*, STILL confirm each one. This
tool MUST NOT be called in a loop without per-bug consent.
**Do not call this autonomously.** Don't preemptively submit while
exploring inconsistencies. Don't call inside an agent loop without
a human in the loop. Misuse will get this MCP blocked at HPE's WAF.
**What makes a good bug report:**
- Specific page URL. One bug = one page.
- Concrete quote of the problem text + version/platform context.
- Suggested correction when you have one.
- Avoid editorializing — factual bugs and broken links best.
"""
with TimedCall("submit_doc_bug", {
"page_url": page_url, "content_len": len(content or ""),
"email_present": bool(email), "rating": rating, "like": like,
}) as _call:
if not DOC_BUG_SUBMIT_ENABLED:
_call.set(error="disabled", outcome="refused_disabled")
return (
"submit_doc_bug is disabled on this MCP deployment "
"(DOC_BUG_SUBMIT_ENABLED is not set). The operator's draft is good — "
f"they can paste it into the feedback widget on {page_url} themselves.\n\n"
"_(For maintainers: sniff HPE's feedback endpoint, set DOC_BUG_API_URL "
"to the POST target, and DOC_BUG_SUBMIT_ENABLED=true to activate.)_"
)
if not DOC_BUG_API_URL:
_call.set(error="no_endpoint", outcome="refused_disabled")
return ("submit_doc_bug is enabled but DOC_BUG_API_URL is empty. "
f"Operator should paste manually at {page_url}.")
if not content or not content.strip():
_call.set(error="empty_content", outcome="refused_invalid")
return "Refused: empty `content`."
if len(content) > 10000:
_call.set(error="content_too_long", outcome="refused_invalid")
return f"Refused: `content` is {len(content)} chars (cap 10000)."
try:
from urllib.parse import urlparse
parsed = urlparse(page_url)
except Exception as e:
_call.set(error=f"url_parse: {e}", outcome="refused_invalid")
return f"Refused: couldn't parse page_url ({e})."
if parsed.scheme not in ("http", "https"):
_call.set(error="bad_scheme", outcome="refused_invalid")
return f"Refused: scheme must be http(s), got {parsed.scheme!r}."
if parsed.hostname not in _DOC_BUG_ALLOWED_HOSTS:
_call.set(error=f"bad_host: {parsed.hostname}", outcome="refused_invalid")
return (f"Refused: page_url host {parsed.hostname!r} isn't a "
f"support.hpe.com URL. submit_doc_bug only accepts bugs against HPE Support pages.")
if email is not None and not _EMAIL_RE.match(email):
_call.set(error="bad_email", outcome="refused_invalid")
return f"Refused: email {email!r} doesn't look valid. Omit if anonymous."
if rating is not None and not (1 <= rating <= 5):
_call.set(error="bad_rating", outcome="refused_invalid")
return f"Refused: rating must be 1-5, got {rating}."
href = f"{parsed.scheme}://{parsed.hostname}{parsed.path}{('?' + parsed.query) if parsed.query else ''}"
payload: dict = {"content": content, "href": href}
if email:
payload["email"] = email
if rating is not None:
payload["rating"] = rating
if like is not None:
payload["like"] = like
try:
import httpx
except ImportError:
_call.set(error="httpx_missing", outcome="refused_runtime")
return "Refused: httpx not available."
headers = {
"Content-Type": "application/json",
"Accept": "application/json",
"User-Agent": "hvm-docs-mcp submit_doc_bug",
"Origin": "https://support.hpe.com",
"Referer": href,
}
try:
with httpx.Client(timeout=DOC_BUG_TIMEOUT) as c:
r = c.post(DOC_BUG_API_URL, json=payload, headers=headers)
except httpx.RequestError as e:
_call.set(error=f"transport: {e}", outcome="failed_transport")
return f"Submission failed (transport): {e}"
comment_id: object = None
body_summary = ""
try:
resp_json = r.json()
comment_id = resp_json.get("commentId") or resp_json.get("id")
body_summary = json.dumps(resp_json)[:300]
except (ValueError, json.JSONDecodeError):
body_summary = (r.text or "")[:300]
_call.set(http_status=r.status_code, comment_id=comment_id,
outcome=("submitted" if r.is_success else "rejected_upstream"))
if r.is_success:
id_note = f" (commentId={comment_id})" if comment_id else ""
return f"Submitted. HTTP {r.status_code}{id_note}. HPE docs team will see this for {href}."
if r.status_code in (401, 403, 429):
return (f"Submission rejected upstream (HTTP {r.status_code}). "
"Likely captcha/auth/rate-limit on anonymous POSTs. "
f"Operator can paste manually at {href}.\n\nResponse (truncated): {body_summary}")
return f"Submission rejected upstream (HTTP {r.status_code}). Response (truncated): {body_summary}"
# ===========================================================================
# Entry point
# ===========================================================================
def main() -> None:
import argparse
p = argparse.ArgumentParser(description=f"{PRODUCT_NAME} docs MCP server")
p.add_argument("--transport", choices=["stdio", "streamable-http", "sse"],
default=os.environ.get("MCP_TRANSPORT", "stdio"))
p.add_argument("--host", default=os.environ.get("MCP_HOST", "0.0.0.0"))
p.add_argument("--port", type=int, default=int(os.environ.get("MCP_PORT", "8000")))
args = p.parse_args()
if args.transport == "stdio":
mcp.run()
else:
mcp.settings.host = args.host
mcp.settings.port = args.port
# DNS-rebinding protection defaults to localhost-only — disable for
# container-network DNS hostnames. See PLAN.md "Hosting" notes.
if os.environ.get("MCP_DISABLE_DNS_REBINDING_PROTECTION") in {"1", "true", "yes"}:
mcp.settings.transport_security.enable_dns_rebinding_protection = False
mcp.run(transport=args.transport)
if __name__ == "__main__":
main()