fa448f94e1
Initial scaffold: the docs-mcp-template clone with all the
HVM-validated stack ported across, customized for Morpheus
Enterprise (PRODUCT_NAME=morpheus, server name morpheus-docs).
Bundles (live-discovered 2026-05-22; 1710 cataloged pages total):
* morpheus_user_manual_8_1_0 sd00007510en_us 568 pages (Feb 2026)
* morpheus_user_manual_8_1_1 sd00007621en_us 569 pages (Mar 2026)
* morpheus_user_manual_8_1_2 sd00007732en_us 569 pages (Apr 2026)
* morpheus_release_notes_8_1_0 sd00007496en_us single-doc
* morpheus_release_notes_8_1_1 sd00007610en_us single-doc
* morpheus_release_notes_8_1_2 sd00007733en_us single-doc
* morpheus_quickspecs a50009231enw html-file (live
curl_cffi against www.hpe.com; all 12+ Enterprise SKUs captured —
S6E64..S6E73AAE for new/renewal/upgrade × 1/3/5-yr terms, plus
services SKUs HA124A1#V38/V39 and H46SBA1).
No Deployment Guide or Qualification Matrix on HPE Support for
Morpheus Enterprise specifically — the only QM (sd00006551en_us)
covers HVM clusters managed by Morpheus and lives in hvm-docs.
Stack carried forward from hvm-docs:
* rag/{index,chunk,embeddings,bm25}.py — including the
MAX_CHARS=4000 chunk-cap fix for table-dense content
* docs_mcp/{server,usage}.py — 11 MCP tools, BM25-default search,
cross-encoder rerank, hybrid behind HYBRID_SEARCH=true,
morpheus_api_lessons (renamed from hvm_api_lessons), env-gated
submit_doc_bug
* docs_mcp/api_lessons.md — Morpheus-specific scaffold covering
licensing model, HVM elevation path, REST vs Plugin API, with
TODO markers for sections to flesh out from real ops experience
* scrape/{runner,quickspecs,changelog,bundles}.py — TOC + single-doc
+ html-file modes, curl_cffi Chrome120 for www.hpe.com edge bypass
* eval/{retrievers,run_eval}.py + queries.jsonl scaffold (4 placeholder
queries; populate after first scrape)
* scripts/{rerank_server,usage_report,registry_gc}.py
* .gitea/workflows/{refresh,image-only}.yml — same Gitea Actions
setup zerto-docs uses (push LAN, pull public-URL, GPU Ollama pool)
* deploy/docker-compose.yml — morpheus-docs-mcp service definition,
shared jina-rerank sidecar, Watchtower-labeled
* Dockerfile, requirements.txt, requirements-rerank.txt
Verified locally: scrape produced 1599 .md pages (some TOC entries
are parent-only and yield no body), 6353 chunks all under the 4 KB
cap, MCP server boots and lists 11 tools cleanly.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1301 lines
58 KiB
Python
1301 lines
58 KiB
Python
"""MCP server skeleton — fill in PRODUCT_NAME and the tool bodies.
|
|
|
|
This file started life as the template's structural anchor. The phases
described in PLAN.md add or extend pieces of this file:

  Phase 3  — search_docs, get_page, list_versions
  Phase 6  — reranker integration in search_docs
  Phase 8  — BM25 + hybrid retrieval (HYBRID_SEARCH env gate, _rrf_fuse)
  Phase 9  — diff_versions, list_cluster, bundle_changelog
  Phase 10 — TimedCall wiring (already imported below)
  Phase 11 — <product>_api_lessons tool
  Phase 12 — find_doc_inconsistencies, submit_doc_bug
  Phase 13 — weekly_digest + _digest_history reader

The Phase 3-10 and Phase 13 tools in this file are implemented. Any tool
that is still a stub (a docstring + `raise NotImplementedError`) gets its
body replaced when the corresponding phase lands. Keep the signatures
stable across products — clients depend on them.
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import datetime as _dt
|
|
import difflib
|
|
import json
|
|
import logging
|
|
import os
|
|
import re
|
|
from pathlib import Path
|
|
from typing import Annotated
|
|
|
|
from mcp.server.fastmcp import FastMCP
|
|
from pydantic import Field
|
|
|
|
from .usage import TimedCall
|
|
|
|
log = logging.getLogger(__name__)
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Product-specific configuration. Set these for each new build.
|
|
# ---------------------------------------------------------------------------
|
|
# Product identity. Both values can be overridden through the environment so
# the same module serves other products built from the template.
PRODUCT_NAME = os.getenv("PRODUCT_NAME", "morpheus")
PRODUCT_DOCS_URL = os.getenv(
    "PRODUCT_DOCS_URL",
    "https://support.hpe.com/hpesc/public/docDisplay?docId=sd00007732en_us",
)
# Name shared by the Chroma collection and the default BM25 database file.
COLLECTION = f"{PRODUCT_NAME}_docs"

# Paths inside the deployed container (and matching layout locally for dev).
# ROOT is the directory that contains this package.
_PACKAGE_DIR = Path(__file__).resolve().parent
ROOT = _PACKAGE_DIR.parent
CORPUS = ROOT / "corpus"
CHROMA_DIR = ROOT / "chroma"
_DEFAULT_BM25_DB = ROOT / "bm25" / f"{COLLECTION}.db"
BM25_DB = Path(os.getenv("BM25_DB", str(_DEFAULT_BM25_DB)))
BUNDLES_JSON = ROOT / "bundles.json"
DIGEST_HISTORY_PATH = CORPUS / ".digest" / "history.jsonl"
API_LESSONS_MD = _PACKAGE_DIR / "api_lessons.md"
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Feature flags (Phase 6 / 8 / 12 enable these as you ship each phase).
|
|
# ---------------------------------------------------------------------------
|
|
# Tokens (case-insensitive, surrounding whitespace ignored) that switch a
# boolean feature flag on. Anything else, including unset, means off.
_TRUTHY_ENV_VALUES = ("true", "1", "yes", "on")


def _env_flag(name: str) -> bool:
    """Return True when environment variable *name* holds a truthy token.

    Centralizes the parsing that every boolean flag below shares, so all
    flags accept exactly the same spellings.
    """
    return os.environ.get(name, "").strip().lower() in _TRUTHY_ENV_VALUES


# Reranker sidecar. An empty/unset RERANK_URL becomes None, which disables
# reranking entirely; a trailing slash is dropped so paths can be appended.
RERANK_URL = os.environ.get("RERANK_URL", "").strip().rstrip("/") or None
RERANK_POOL = int(os.environ.get("RERANK_POOL", "50"))
RERANK_TIMEOUT = float(os.environ.get("RERANK_TIMEOUT", "30"))

# Hybrid (dense + BM25) retrieval and the constant used by _rrf_fuse.
HYBRID_SEARCH = _env_flag("HYBRID_SEARCH")
RRF_K = int(os.environ.get("RRF_K", "60"))

# Doc-bug submission is env-gated and off by default.
DOC_BUG_SUBMIT_ENABLED = _env_flag("DOC_BUG_SUBMIT_ENABLED")
DOC_BUG_API_URL = os.environ.get("DOC_BUG_API_URL", "")  # product-specific endpoint
DOC_BUG_TIMEOUT = float(os.environ.get("DOC_BUG_TIMEOUT", "15"))
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# FastMCP setup.
|
|
#
|
|
# stateless_http=True — every request creates an ephemeral session and
|
|
# discards it on return. Critical for production: clients don't get
|
|
# 404 storms when the container is recreated by Watchtower.
|
|
# ---------------------------------------------------------------------------
|
|
mcp = FastMCP(f"{PRODUCT_NAME}-docs", stateless_http=True)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Lazy helpers — instantiate expensive things only when actually needed,
|
|
# so the server still starts when (e.g.) Ollama is briefly unreachable.
|
|
# ---------------------------------------------------------------------------
|
|
|
|
# (mtime_ns, parsed mapping) from the last successful load; None until then.
_BUNDLES_CACHE: tuple[int, dict[str, dict]] | None = None


def _bundles() -> dict[str, dict]:
    """Cached load of bundles.json into a {slug: bundle_dict} mapping.

    bundles.json is the product-specific catalog written by the Phase 1
    scraper. See PLAN.md Phase 1 for the schema.

    The parsed mapping is cached and reused until the file's modification
    time changes, so per-result callers do not re-read and re-parse the
    catalog on every call. Callers must treat the returned mapping as
    read-only because it is shared between calls.

    Returns:
        The mapping, or an empty dict when bundles.json does not exist.
    """
    global _BUNDLES_CACHE
    if not BUNDLES_JSON.exists():
        return {}
    stamp = BUNDLES_JSON.stat().st_mtime_ns
    if _BUNDLES_CACHE is not None and _BUNDLES_CACHE[0] == stamp:
        return _BUNDLES_CACHE[1]
    catalog = json.loads(BUNDLES_JSON.read_text(encoding="utf-8"))
    mapping = {entry["slug"]: entry for entry in catalog}
    _BUNDLES_CACHE = (stamp, mapping)
    return mapping
|
|
|
|
|
|
def _build_where(version: str | None, platform: str | None, bundle_id: str | None) -> dict | None:
    """Translate filter args into a Chroma `where` clause.

    Falsy filters are ignored. No filter gives None, a single filter gives
    its bare equality clause, and several are wrapped in an `$and` list in
    version / platform / bundle_id order.
    """
    requested = (("version", version), ("platform", platform), ("bundle_id", bundle_id))
    clauses: list[dict] = [{field: value} for field, value in requested if value]
    if len(clauses) > 1:
        return {"$and": clauses}
    return clauses[0] if clauses else None
|
|
|
|
|
|
def _where_for_bm25(version: str | None, platform: str | None, bundle_id: str | None) -> dict | None:
    """Build the flat equality-filter dict that BM25Index.query takes.

    Falsy filters are left out; returns None when nothing remains.
    """
    requested = {"version": version, "platform": platform, "bundle_id": bundle_id}
    filters: dict[str, str] = {field: value for field, value in requested.items() if value}
    return filters if filters else None
|
|
|
|
|
|
def _read_page(bundle_id: str, page_id: str) -> tuple[str, dict] | None:
    """Read a corpus page off disk.

    Args:
        bundle_id: Bundle slug, used as the directory name under CORPUS.
        page_id: Page file stem; `<page_id>.md` and `<page_id>.json` must
            both exist.

    Returns:
        (markdown_body, metadata_dict), or None when either file is missing
        or the requested location falls outside CORPUS.
    """
    bundle_dir = CORPUS / bundle_id
    md_path = bundle_dir / (page_id + ".md")
    json_path = bundle_dir / (page_id + ".json")

    # Both ids arrive straight from tool arguments. Normalize lexically (so
    # symlinked corpus directories keep working) and refuse anything that
    # climbs out of the corpus via ".." or an absolute path.
    corpus_root = Path(os.path.abspath(CORPUS))
    for candidate in (md_path, json_path):
        try:
            Path(os.path.abspath(candidate)).relative_to(corpus_root)
        except ValueError:
            log.warning("rejected page path outside corpus: %r / %r", bundle_id, page_id)
            return None

    if not md_path.exists() or not json_path.exists():
        return None
    # Decode explicitly as UTF-8 so the result does not depend on the
    # process locale.
    body = md_path.read_text(encoding="utf-8")
    sidecar = json.loads(json_path.read_text(encoding="utf-8"))
    return body, sidecar
|
|
|
|
|
|
# Process-wide handles, created on first use by _collection() / _bm25().
_CHROMA = None
_BM25 = None


def _collection():
    """Return the Chroma collection handle, opening it on the first call.

    The chromadb and embedding imports are deferred to the first call; the
    resulting handle is kept in the module-level `_CHROMA` and reused.
    """
    global _CHROMA
    if _CHROMA is not None:
        return _CHROMA

    import chromadb
    from chromadb.config import Settings
    from rag.embeddings import embedding_function

    store = chromadb.PersistentClient(
        path=str(CHROMA_DIR),
        settings=Settings(anonymized_telemetry=False),
    )
    _CHROMA = store.get_collection(COLLECTION, embedding_function=embedding_function())
    return _CHROMA
|
|
|
|
|
|
def _bm25():
    """Return the BM25Index handle, opening it on first use.

    Returns None when the FTS5 database file is absent or the index cannot
    be opened. Only a successfully opened index is cached in `_BM25`, so a
    later call retries after a failure.
    """
    global _BM25
    if _BM25 is not None:
        return _BM25
    if not BM25_DB.exists():
        return None
    try:
        from rag.bm25 import BM25Index

        _BM25 = BM25Index(str(BM25_DB))
    except Exception as exc:  # defensive: hybrid must never block dense
        log.warning("BM25 unavailable, falling back to dense-only: %s", exc)
        return None
    return _BM25
|
|
|
|
|
|
def _enrich_from_chroma(col, chunk_ids: list[str], fused: list | None) -> tuple[list[str], list[dict], list[float]]:
    """Fetch document text + metadata for a list of chunk ids from Chroma, in order.

    Args:
        col: Collection-like object whose `get(ids=..., include=...)` returns
            a dict with parallel "ids", "documents" and "metadatas" lists.
        chunk_ids: Ids to fetch; the output follows this order.
        fused: Optional `[(id, score, sources), ...]` as produced by
            `_rrf_fuse`. When given, each distance is `1.0 - score` of that
            chunk; when None every distance is 0.0.

    Returns:
        (documents, metadatas, distances) as three parallel lists. Ids that
        Chroma does not return are dropped from all three.
    """
    if not chunk_ids:
        return [], [], []
    fetched = col.get(ids=chunk_ids, include=["documents", "metadatas"])
    by_id = {
        cid: (doc, meta)
        for cid, doc, meta in zip(fetched["ids"], fetched["documents"], fetched["metadatas"])
    }
    found = [cid for cid in chunk_ids if cid in by_id]
    docs = [by_id[cid][0] for cid in found]
    metas = [by_id[cid][1] for cid in found]
    if fused is None:
        dists = [0.0] * len(found)
    else:
        # Look scores up by id rather than by position: when Chroma is
        # missing one of the requested ids, positional pairing would shift
        # every later score onto the wrong chunk.
        score_by_id = {ident: score for ident, score, _src in fused}
        dists = [1.0 - score_by_id.get(cid, 0.0) for cid in found]
    return docs, metas, dists
|
|
|
|
|
|
def _rerank(query: str, candidates: list[tuple[str, str]]) -> list[tuple[str, str]] | None:
    """Ask the reranker at RERANK_URL to re-order `candidates` by relevance.

    `candidates` is `[(chunk_id, text), ...]`. Each text is cut to 2000
    characters before it is sent, to stay under jina-reranker's 1024-token
    per-pair cap (exceeding it 400s the entire batch). Only the payload is
    truncated; the tuples handed back are the caller's originals.

    Returns None when reranking is disabled, there is nothing to rank, the
    response contains no usable index, or anything fails — the caller
    treats None as "skip reranking, keep retrieval-order candidates."
    """
    if not (RERANK_URL and candidates):
        return None
    try:
        import httpx

        texts = [(text or "")[:2000] for _cid, text in candidates]
        body = {"query": query, "documents": texts, "top_n": len(candidates)}
        with httpx.Client(timeout=RERANK_TIMEOUT) as client:
            resp = client.post(f"{RERANK_URL}/v1/rerank", json=body)
            resp.raise_for_status()
            ranked = resp.json().get("results") or []

        total = len(candidates)
        reordered = []
        for item in ranked:
            position = item.get("index")
            # Ignore entries whose index is absent or out of range.
            if isinstance(position, int) and 0 <= position < total:
                reordered.append(candidates[position])
        return reordered or None
    except Exception as exc:
        log.warning("rerank failed, keeping retrieval order: %s", exc)
        return None
|
|
|
|
|
|
def _rrf_fuse(*ranked_lists: list[str], k: int = RRF_K) -> list[tuple[str, float, dict]]:
    """Reciprocal Rank Fusion over any number of ranked id lists.

    Each positional argument is a sequence of ids in descending relevance.
    An id at 1-based rank r in a list contributes 1 / (k + r) to its fused
    score. The lists are labelled "dense", "bm25", "extra" by position, and
    "r<index>" beyond that.

    Returns [(id, fused_score, {label: rank, ...}), ...] sorted by score
    descending; ties keep first-seen order.
    """
    labels = ("dense", "bm25", "extra")
    totals: dict[str, float] = {}
    provenance: dict[str, dict] = {}
    for position, ranking in enumerate(ranked_lists):
        label = labels[position] if position < len(labels) else f"r{position}"
        rank = 0
        for ident in ranking:
            rank += 1
            totals[ident] = totals.get(ident, 0.0) + 1.0 / (k + rank)
            provenance.setdefault(ident, {})[label] = rank
    # sorted() is stable, so equal scores stay in insertion order.
    by_score = sorted(totals, key=lambda ident: -totals[ident])
    return [(ident, totals[ident], provenance[ident]) for ident in by_score]
|
|
|
|
|
|
def _source_url(bundle_id: str, page_id: str) -> str:
    """Build the canonical docs portal URL for a (bundle, page) pair.

    Returns "" when the bundle is not in the catalog or has no `doc_id`,
    because a link with an empty docId points nowhere useful. Page ids that
    start with "GUID-" link to that page inside the document; any other
    page id links to the document as a whole.
    """
    from urllib.parse import quote

    bundle = _bundles().get(bundle_id) or {}
    doc_id = bundle.get("doc_id") or ""
    if not doc_id:
        return ""
    # Percent-encode both ids: page_id can come straight from a tool
    # argument and the URL is embedded in a markdown link. Well-formed ids
    # (letters, digits, "-", "_") pass through unchanged.
    base = f"https://support.hpe.com/hpesc/public/docDisplay?docId={quote(str(doc_id), safe='')}"
    if page_id.startswith("GUID-"):
        return f"{base}&page={quote(page_id, safe='')}.html"
    return base
|
|
|
|
|
|
# ===========================================================================
|
|
# Tools
|
|
# ===========================================================================
|
|
|
|
@mcp.tool()
def search_docs(
    query: Annotated[str, Field(description=f"Natural-language query about {PRODUCT_NAME}.")],
    version: Annotated[
        str | None,
        Field(description="OPTIONAL version filter — restrict to one product version."),
    ] = None,
    platform: Annotated[
        str | None,
        Field(description="OPTIONAL platform filter. Set to one of the platforms listed by list_versions(); omit for all platforms."),
    ] = None,
    bundle_id: Annotated[
        str | None,
        Field(description="OPTIONAL bundle filter — pin to a specific doc bundle slug."),
    ] = None,
    k: Annotated[int, Field(description="Number of results to return.", ge=1, le=50)] = 10,
) -> str:
    """Search the HPE Morpheus Enterprise (Morpheus) docs corpus.

    Returns the top-k most relevant chunks (with full source page URLs)
    given a natural-language query. Optional filters narrow the search
    to one version, one platform, or one bundle. Use list_versions()
    first if you need to discover the available facet values.

    Call this tool whenever the user asks anything that should be
    answerable from the official product documentation — install,
    upgrade, configuration, backups, networking, HVM clusters, the
    Morpheus UI, or any 8.1.x release-notes question.
    """
    with TimedCall("search_docs", {
        "query": query, "version": version, "platform": platform,
        "bundle_id": bundle_id, "k": k,
    }) as _call:
        try:
            col = _collection()
        except Exception as e:
            log.exception("chroma collection unavailable")
            _call.set(hits_returned=0, error=str(e))
            return f"_(search backend unavailable: {e})_"

        where = _build_where(version, platform, bundle_id)
        bm25_where = _where_for_bm25(version, platform, bundle_id)
        pool = max(k * 5, 50)

        # Retrieval mode selection. Eval on this corpus (2026-05-22, 22 golden
        # queries) showed BM25 MRR=0.88 vs dense MRR=0.54 vs hybrid MRR=0.69 —
        # HPE structured docs use controlled vocabulary, so lexical match wins.
        # Dense is kept as fallback when BM25 has no tokens to chew on (e.g.
        # purely stopword queries). HYBRID_SEARCH=true forces RRF fusion.
        #
        # Throughout, `dists` holds distances (lower = better); the display
        # loop at the bottom prints `1 - dist` as the score.
        bm = _bm25()
        docs: list[str] = []
        metas: list[dict] = []
        dists: list[float] = []
        retrieval_mode = "dense"
        top1_source = "dense_only"

        # 1) Hybrid: fuse dense and BM25 rankings with RRF.
        if HYBRID_SEARCH and bm is not None:
            try:
                dense_res = col.query(query_texts=[query], n_results=pool, where=where)
                dense_ids = (dense_res.get("ids") or [[]])[0]
                bm_hits = bm.query(query, n=pool, where=bm25_where)
                bm_ids = [cid for cid, _s in bm_hits]
                fused = _rrf_fuse(dense_ids, bm_ids)
                docs, metas, dists = _enrich_from_chroma(col, [c for c, _, _ in fused[:k]], fused)
                if fused:
                    src0 = fused[0][2]
                    top1_source = ("both" if {"dense", "bm25"} <= set(src0)
                                   else "bm25_only" if "bm25" in src0
                                   else "dense_only")
                retrieval_mode = "hybrid"
            except Exception as e:
                log.warning("hybrid failed, falling back to BM25→dense: %s", e)

        # 2) BM25 (the default whenever the index is available).
        if not docs and bm is not None:
            try:
                bm_hits = bm.query(query, n=k, where=bm25_where)
                if bm_hits:
                    ids = [cid for cid, _s in bm_hits[:k]]
                    docs, metas, _ = _enrich_from_chroma(col, ids, None)
                    # FTS5 returns negative scores (lower=better). Map onto a
                    # similarity-ish [0..1] just for display.
                    dists = [max(0.0, min(1.0, 1.0 - abs(s) / 20.0)) for _id, s in bm_hits[:k]]
                    retrieval_mode = "bm25"
                    top1_source = "bm25_only"
            except Exception as e:
                log.warning("BM25 retrieval failed, falling back to dense: %s", e)

        # 3) Dense fallback.
        if not docs:
            res = col.query(query_texts=[query], n_results=k, where=where)
            docs = (res.get("documents") or [[]])[0]
            metas = (res.get("metadatas") or [[]])[0]
            dists = (res.get("distances") or [[]])[0]

        reranker_fired = False
        if RERANK_URL and docs:
            # Pull a deeper pool to give the reranker something to chew on.
            # We over-fetch up to RERANK_POOL chunks from whichever retriever
            # already won, then ask the reranker to pick the final top-k.
            # The pool is kept in separate variables so that a failed
            # over-fetch or a failed rerank leaves the retrieval results
            # (and their scores) untouched.
            cand_docs, cand_metas = docs, metas
            pool_size = max(k, RERANK_POOL)
            if len(docs) < pool_size:
                try:
                    if retrieval_mode == "bm25":
                        extra = bm.query(query, n=pool_size, where=bm25_where) if bm else []
                        extra_ids = [cid for cid, _s in extra]
                    else:
                        extra_res = col.query(query_texts=[query], n_results=pool_size, where=where)
                        extra_ids = (extra_res.get("ids") or [[]])[0]
                    if extra_ids:
                        d2, m2, _ = _enrich_from_chroma(col, extra_ids, None)
                        if d2:  # never swap real hits for an empty pool
                            cand_docs, cand_metas = d2, m2
                except Exception as e:
                    # The rerank stage is optional; a hiccup here must not
                    # abort a search that already has results.
                    log.warning("rerank over-fetch failed, reranking retrieved hits only: %s", e)

            # Reranker scores chunk_ids — collapse to (id, text) tuples
            pairs = list(zip(
                [f"{m.get('bundle_id','')}::{m.get('page_id','')}::{m.get('ordinal',0)}" for m in cand_metas],
                cand_docs,
            ))
            reranked = _rerank(query, pairs)
            if reranked is not None:
                # Re-sort docs/metas to match the reranker's order.
                by_cid = {cid: i for i, (cid, _text) in enumerate(pairs)}
                order = [by_cid[cid] for cid, _t in reranked if cid in by_cid]
                docs = [cand_docs[i] for i in order][:k]
                metas = [cand_metas[i] for i in order][:k]
                # Rank-derived distances: the best hit gets distance 0 and
                # therefore the highest displayed score (1 - dist).
                dists = [rank / len(reranked) for rank in range(len(docs))]
                reranker_fired = True

        # Whatever path produced the results, never return more than k.
        docs, metas, dists = docs[:k], metas[:k], dists[:k]

        _call.set(hits_returned=len(docs), retrieval_mode=retrieval_mode,
                  top1_source=top1_source, reranker_fired=reranker_fired)
        if not docs:
            return f"_No matches for `{query}`._"

        out = [f"# {len(docs)} result(s) for `{query}`", ""]
        for doc, meta, dist in zip(docs, metas, dists):
            bid = meta.get("bundle_id", "")
            pid = meta.get("page_id", "")
            title = meta.get("title") or pid
            ver = meta.get("version") or ""
            url = _source_url(bid, pid)
            header = f"## {title}"
            if ver:
                header += f" _(v{ver})_"
            out.append(header)
            out.append(f"[{bid}/{pid}]({url}) · score={1 - dist:.3f}")
            out.append("")
            out.append(doc.strip())
            out.append("")
        return "\n".join(out)
|
|
|
|
|
|
@mcp.tool()
def get_page(
    bundle_id: Annotated[str, Field(description="Bundle slug.")],
    page_id: Annotated[str, Field(description="Page filename within the bundle.")],
) -> str:
    """Return the full markdown for one page, plus a metadata header.

    Use after search_docs surfaces a relevant page and the user (or you)
    want the complete text — not just the matched chunks.
    """
    with TimedCall("get_page", {"bundle_id": bundle_id, "page_id": page_id}) as _call:
        data = _read_page(bundle_id, page_id)
        if data is None:
            _call.set(found=False)
            return f"Page not found: {bundle_id}/{page_id}"
        md, meta = data
        _call.set(found=True, page_chars=len(md))

        title = meta.get("title") or page_id
        ver = meta.get("version")
        parent = meta.get("parent_title")
        url = _source_url(bundle_id, page_id)

        # Header layout: title, optional italic context line, optional
        # source link, blank line, then the page body.
        header = [f"# {title}"]
        ctx = []
        if ver:
            ctx.append(f"version **{ver}**")
        if parent:
            ctx.append(f"in **{parent}**")
        if ctx:
            header.append("_" + " · ".join(ctx) + "_")
        if url:
            # _source_url returns "" when it cannot resolve the bundle;
            # skip the link rather than emit a dangling "[source]()".
            header.append(f"[source]({url})")
        header.append("")
        return "\n".join(header) + "\n" + md
|
|
|
|
|
|
def _version_sort_key(value) -> list:
    """Natural-order sort key: digit runs compare as numbers, so 8.1.10 > 8.1.2."""
    # re.split with a capturing group alternates text / digit-run / text ...,
    # so odd positions are always digit runs and same-position items always
    # share a type when two keys are compared.
    parts = re.split(r"(\d+)", str(value))
    return [int(tok) if idx % 2 else tok for idx, tok in enumerate(parts)]


@mcp.tool()
def list_versions() -> str:
    """List the available version/platform facets across all bundles.

    Use this to discover valid filter values for search_docs.
    """
    with TimedCall("list_versions", {}) as _call:
        cat = _bundles()
        if not cat:
            _call.set(versions=0, platforms=0)
            return "_(no bundles indexed yet — run the scraper + indexer)_"

        # Versions use natural ordering; plain string sorting would list
        # "8.1.10" before "8.1.2".
        versions = sorted({b.get("version") for b in cat.values() if b.get("version")},
                          key=_version_sort_key)
        platforms = sorted({b.get("platform") for b in cat.values() if b.get("platform")})
        products = sorted({b.get("product") for b in cat.values() if b.get("product")})
        _call.set(versions=len(versions), platforms=len(platforms))

        lines = [f"# Facets across {len(cat)} bundle(s)", ""]
        if versions:
            lines += ["## Versions", ""] + [f"- `{v}`" for v in versions] + [""]
        if platforms:
            lines += ["## Platforms", ""] + [f"- `{p}`" for p in platforms] + [""]
        if products:
            lines += ["## Product / doc types", ""] + [f"- {p}" for p in products] + [""]

        lines += ["## Bundles", ""]
        for slug in sorted(cat):
            b = cat[slug]
            kind = b.get("product") or ""
            ver = b.get("version")
            pages = b.get("page_count", "?")
            label = f"{kind} {ver}".strip() if ver else kind
            lines.append(f"- `{slug}` — {label} ({pages} pages)")
        return "\n".join(lines)
|
|
|
|
|
|
# ===========================================================================
|
|
# Phase 9 — cross-version tools
|
|
# ===========================================================================
|
|
|
|
def _bundle_pages(bundle_id: str) -> set[str]:
    """Return the page ids present on disk for a bundle.

    A page id is the stem of a `*.md` file directly inside
    `CORPUS/<bundle_id>` (mirrors rag.index's md_path.stem). An unknown
    bundle gives an empty set.
    """
    bundle_dir = CORPUS / bundle_id
    stems: set[str] = set()
    if bundle_dir.is_dir():
        for md_file in bundle_dir.glob("*.md"):
            stems.add(md_file.stem)
    return stems
|
|
|
|
|
|
def _diff_churn(a: str, b: str) -> tuple[int, int]:
    """Cheap (added, removed) line counts for a pair of markdown bodies.

    Args:
        a: The old text.
        b: The new text.

    Returns:
        (added, removed): lines present only in `b`, and lines present only
        in `a`, according to difflib's line matching.
    """
    # Count straight from the matcher's opcodes. Recognising diff headers by
    # their "+++" / "---" prefix in rendered output also swallows real
    # content: a removed markdown rule "---" renders as "----", and an added
    # line beginning "++" renders as "+++...".
    matcher = difflib.SequenceMatcher(None, a.splitlines(), b.splitlines())
    added = removed = 0
    for tag, a_lo, a_hi, b_lo, b_hi in matcher.get_opcodes():
        if tag == "equal":
            continue
        removed += a_hi - a_lo
        added += b_hi - b_lo
    return added, removed
|
|
|
|
|
|
@mcp.tool()
def list_cluster(
    bundle_id: Annotated[str, Field(description="Bundle slug of the source topic.")],
    page_id: Annotated[str, Field(description="Page id (GUID-XXXX) of the source topic.")],
) -> str:
    """List cross-version peers of a topic in the Morpheus Enterprise docs.

    HPE re-mints the docId per product version but keeps page GUIDs stable,
    so the scrape pipeline synthesizes `topic_cluster.clustered_topics`
    from same-GUID overlap across versions.
    """
    with TimedCall("list_cluster", {"bundle_id": bundle_id, "page_id": page_id}) as _call:
        out = _read_page(bundle_id, page_id)
        if out is None:
            _call.set(found=False)
            return f"Not found: {bundle_id}/{page_id}"
        _, side = out
        cluster = side.get("topic_cluster") or {}
        peers = cluster.get("clustered_topics") or []
        _call.set(hits_returned=len(peers))
        src_label = cluster.get("clustering_title") or side.get("title") or page_id
        lines = [f"# Cluster for {bundle_id}/{page_id} ({src_label})", ""]
        if not peers:
            lines.append("_No peer topics in cluster._")
            return "\n".join(lines)
        for p in peers:
            # Sidecars are scraper output; tolerate an entry that lacks an
            # id instead of failing the whole listing.
            if not isinstance(p, dict):
                continue
            peer_bundle = p.get("bundle_id", "?")
            peer_page = p.get("page_id", "?")
            lines.append(f"- `{peer_bundle}/{peer_page}` — {p.get('clustering_title') or ''}")
        return "\n".join(lines)
|
|
|
|
|
|
@mcp.tool()
def diff_versions(
    bundle_id: Annotated[str, Field(description="Bundle slug of the source topic (the 'new' side).")],
    page_id: Annotated[str, Field(description="Page id of the source topic.")],
    against_bundle_id: Annotated[str, Field(description="Bundle slug to diff against. Must be in the source's cluster, or share the same page_id.")],
    context: Annotated[int, Field(description="Lines of context around each hunk.", ge=0, le=10)] = 3,
) -> str:
    """Unified diff of one topic between two bundles (typically two Morpheus versions).

    Two matching strategies, tried in order:

    1. `topic_cluster` peer (synthesized from same-GUID overlap by the scraper).
    2. Same `page_id` fallback (works because GUIDs are stable across versions).

    The diff reads from `against_bundle_id` (old side) to `bundle_id` (new
    side). Line endings are not compared.
    """
    with TimedCall("diff_versions", {
        "bundle_id": bundle_id, "page_id": page_id,
        "against_bundle_id": against_bundle_id, "context": context,
    }) as _call:
        src = _read_page(bundle_id, page_id)
        if src is None:
            _call.set(matched_via=None, reason="source_not_found")
            return f"Source not found: {bundle_id}/{page_id}"
        src_md, side = src
        cluster = side.get("topic_cluster") or {}
        peers = {
            p["bundle_id"]: p
            for p in (cluster.get("clustered_topics") or [])
            if isinstance(p, dict) and "bundle_id" in p
        }

        # Resolve the peer page and read it exactly once.
        peer = peers.get(against_bundle_id)
        if peer is not None and "page_id" in peer:
            peer_page_id = peer["page_id"]
            matched_via = "topic_cluster"
            peer_data = _read_page(against_bundle_id, peer_page_id)
        else:
            peer_page_id = page_id
            matched_via = "filename"
            peer_data = _read_page(against_bundle_id, page_id)
            if peer_data is None:
                _call.set(matched_via=None, reason="no_peer")
                valid = list(peers) or ["(no peers)"]
                return (f"No match for {bundle_id}/{page_id} in {against_bundle_id}.\n"
                        f"- No cluster peer. Available peers: {valid}\n"
                        f"- No page {page_id!r} in {against_bundle_id} either.")

        if peer_data is None:
            # The cluster names a peer page that is not on disk.
            _call.set(matched_via=matched_via, reason="peer_not_in_corpus")
            return f"Peer not found in corpus: {against_bundle_id}/{peer_page_id}"
        _call.set(matched_via=matched_via)
        peer_md, _ = peer_data

        # Diff newline-free lines and join with "\n". Keeping line endings
        # makes a final line without a newline run straight into whatever
        # follows it, including the closing code fence.
        diff = difflib.unified_diff(peer_md.splitlines(),
                                    src_md.splitlines(),
                                    fromfile=f"{against_bundle_id}/{peer_page_id}",
                                    tofile=f"{bundle_id}/{page_id}",
                                    n=context,
                                    lineterm="")
        body = "\n".join(diff)
        header = f"_matched via {matched_via}_\n\n"
        if not body.strip():
            return header + f"No differences between {bundle_id}/{page_id} and {against_bundle_id}/{peer_page_id}."
        return header + f"```diff\n{body}\n```"
|
|
|
|
|
|
@mcp.tool()
def bundle_changelog(
    bundle_id_new: Annotated[str, Field(description="New-side bundle slug, e.g. 'morpheus_user_manual_8_1_2'.")],
    bundle_id_old: Annotated[str, Field(description="Old-side bundle slug, e.g. 'morpheus_user_manual_8_1_1'.")],
    min_churn: Annotated[int, Field(description="Min (added + removed) lines to flag a page as changed.", ge=1, le=1000)] = 5,
    max_changed: Annotated[int, Field(description="Max changed pages to list (sorted by churn desc).", ge=1, le=500)] = 50,
) -> str:
    """High-level diff between two doc bundles (typically two versions of one manual).

    Lists pages added, removed, and changed between an old bundle and a
    new one. Match is by page_id (which is the stable GUID — same GUID
    across versions = same topic). Use after `list_versions` to discover
    valid bundle slugs.
    """
    with TimedCall("bundle_changelog", {
        "bundle_id_new": bundle_id_new, "bundle_id_old": bundle_id_old,
        "min_churn": min_churn, "max_changed": max_changed,
    }) as _call:
        new_pages = _bundle_pages(bundle_id_new)
        old_pages = _bundle_pages(bundle_id_old)
        if not new_pages and not old_pages:
            _call.set(reason="both_empty")
            return f"Neither bundle has pages on disk: {bundle_id_new}, {bundle_id_old}"
        if not new_pages:
            _call.set(reason="new_empty")
            return f"Bundle not found or empty: {bundle_id_new}"
        if not old_pages:
            _call.set(reason="old_empty")
            return f"Bundle not found or empty: {bundle_id_old}"

        added = sorted(new_pages - old_pages)
        removed = sorted(old_pages - new_pages)
        common = sorted(new_pages & old_pages)

        # (page_id, added_lines, removed_lines) for every common page whose
        # churn reaches the threshold. Pages without a readable sidecar on
        # either side are left out and so count as unchanged.
        changed: list[tuple[str, int, int]] = []
        for pid in common:
            n = _read_page(bundle_id_new, pid)
            o = _read_page(bundle_id_old, pid)
            if n is None or o is None:
                continue
            a_lines, r_lines = _diff_churn(o[0], n[0])
            if a_lines + r_lines >= min_churn:
                changed.append((pid, a_lines, r_lines))
        changed.sort(key=lambda t: -(t[1] + t[2]))
        _call.set(added=len(added), removed=len(removed),
                  changed=len(changed), unchanged=len(common) - len(changed))

        lines = [
            f"# Bundle changelog: {bundle_id_new} vs {bundle_id_old}", "",
            f"- pages in new: **{len(new_pages)}**",
            f"- pages in old: **{len(old_pages)}**",
            f"- common: **{len(common)}**",
            f"- **added** (in new only): {len(added)}",
            f"- **removed** (in old only): {len(removed)}",
            f"- **changed** (≥{min_churn} lines): {len(changed)} of {len(common)} common",
            f"- unchanged: {len(common) - len(changed)}", "",
        ]
        if added:
            lines += [f"## Added pages ({len(added)})", *(f"- `{p}`" for p in added), ""]
        if removed:
            lines += [f"## Removed pages ({len(removed)})", *(f"- `{p}`" for p in removed), ""]
        if changed:
            shown = changed[:max_changed]
            lines += [
                f"## Changed pages — top {len(shown)} of {len(changed)} by churn", "",
                "| page | +lines | -lines | total |", "|---|---|---|---|",
            ]
            for p, a, r in shown:
                lines.append(f"| `{p}` | +{a} | -{r} | {a + r} |")
            if len(changed) > max_changed:
                lines.append(f"\n_({len(changed) - max_changed} more changed pages omitted; raise `max_changed` to see them.)_")
            # The follow-up hint only applies when there are changed pages.
            lines.append("\nInspect a specific page: `diff_versions(bundle_id_new, page_id, bundle_id_old)`.")
        return "\n".join(lines)
|
|
|
|
|
|
# ===========================================================================
|
|
# Phase 13 — weekly digest from corpus/.digest/history.jsonl (built in CI)
|
|
# ===========================================================================
|
|
|
|
# Parsed digest records; None until the first _digest_history() call.
_digest_cache: list[dict] | None = None


def _digest_history() -> list[dict]:
    """Lazy load of the digest history JSONL written by scrape.changelog at CI time.

    The file is read once and the result is cached for the life of the
    process, including the empty result when the file is missing.

    Returns:
        One dict per well-formed line, in file order. Blank lines, lines
        that are not valid JSON, and JSON values that are not objects are
        skipped with a warning.
    """
    global _digest_cache
    if _digest_cache is not None:
        return _digest_cache
    if not DIGEST_HISTORY_PATH.exists():
        log.warning("digest history not found at %s — weekly_digest will return empty.",
                    DIGEST_HISTORY_PATH)
        _digest_cache = []
        return _digest_cache

    records: list[dict] = []
    try:
        # Explicit UTF-8 so decoding does not depend on the process locale.
        with open(DIGEST_HISTORY_PATH, encoding="utf-8") as fh:
            for ln, line in enumerate(fh, start=1):
                line = line.strip()
                if not line:
                    continue
                try:
                    record = json.loads(line)
                except json.JSONDecodeError as e:
                    log.warning("digest history: skipping malformed line %d: %s", ln, e)
                    continue
                if not isinstance(record, dict):
                    # Consumers index records by key (e.g. r["timestamp"]);
                    # a bare list, string or number would raise there.
                    log.warning("digest history: skipping non-object record on line %d", ln)
                    continue
                records.append(record)
    except OSError as e:
        # Keep whatever was parsed before the read failed.
        log.warning("digest history read failed: %s", e)
    _digest_cache = records
    return _digest_cache
|
|
|
|
|
|
@mcp.tool()
|
|
def weekly_digest(
|
|
days: Annotated[int, Field(description="How far back to summarize. 7=last week, 30=last month. Horizon ~120 days.", ge=1, le=120)] = 7,
|
|
version: Annotated[str | None, Field(description="OPTIONAL version filter, e.g. '8.1.2'.")] = None,
|
|
platform: Annotated[str | None, Field(description="OPTIONAL platform filter (HVM bundles don't set platform — leave None).")] = None,
|
|
max_bundles: Annotated[int, Field(description="Cap on per-bundle detail blocks.", ge=1, le=100)] = 25,
|
|
max_pages_per_bundle: Annotated[int, Field(description="Pages to list per bundle.", ge=1, le=50)] = 10,
|
|
) -> str:
|
|
"""Summarize what changed in the HVM docs over the past N days.
|
|
|
|
Call when the user asks *"what's new in HVM docs this week?"*,
|
|
*"what changed in 8.1.2?"*, or *"is there anything new since the
|
|
last release?"*. Reads the pre-baked digest history JSONL written
|
|
by CI from git log over corpus-touching commits.
|
|
"""
|
|
with TimedCall("weekly_digest", {
|
|
"days": days, "version": version, "platform": platform,
|
|
"max_bundles": max_bundles, "max_pages_per_bundle": max_pages_per_bundle,
|
|
}) as _call:
|
|
records = _digest_history()
|
|
if not records:
|
|
_call.set(returned="empty_no_history", record_count=0)
|
|
return ("# Weekly digest\n\n"
|
|
f"_No digest history on this image. `{DIGEST_HISTORY_PATH}` is "
|
|
"missing — it's populated by the weekly refresh workflow._")
|
|
|
|
now = _dt.datetime.now(_dt.timezone.utc)
|
|
cutoff = now - _dt.timedelta(days=days)
|
|
filtered: list[dict] = []
|
|
for r in records:
|
|
try:
|
|
ts = _dt.datetime.fromisoformat(r["timestamp"])
|
|
except (KeyError, ValueError):
|
|
continue
|
|
if ts.tzinfo is None:
|
|
ts = ts.replace(tzinfo=_dt.timezone.utc)
|
|
if ts >= cutoff:
|
|
filtered.append({**r, "_ts": ts})
|
|
|
|
if not filtered:
|
|
_call.set(returned="empty_window", record_count=0)
|
|
covers = ""
|
|
if records:
|
|
oldest = min(records, key=lambda r: r.get("timestamp", ""))
|
|
newest = max(records, key=lambda r: r.get("timestamp", ""))
|
|
covers = (f"\n\n_(History on this image covers "
|
|
f"{oldest.get('timestamp','?')[:10]} through "
|
|
f"{newest.get('timestamp','?')[:10]}.)_")
|
|
return (f"# Weekly digest — last {days} day{'s' if days != 1 else ''}\n\n"
|
|
f"_No corpus changes recorded in this window._" + covers)
|
|
|
|
cat = _bundles()
|
|
def _passes(bid: str) -> bool:
|
|
if not (version or platform):
|
|
return True
|
|
b = cat.get(bid)
|
|
if b is None:
|
|
return False
|
|
if version and b.get("version") != version:
|
|
return False
|
|
if platform and b.get("platform") != platform:
|
|
return False
|
|
return True
|
|
|
|
filtered.sort(key=lambda r: r["_ts"], reverse=True)
|
|
per_bundle_pages: dict[str, list[str]] = {}
|
|
new_bundles_set: set[str] = set()
|
|
drift_bundles_set: set[str] = set()
|
|
commits_in_window = 0
|
|
for r in filtered:
|
|
commits_in_window += 1
|
|
for bid in r.get("new_bundles", []):
|
|
if _passes(bid):
|
|
new_bundles_set.add(bid)
|
|
for bid in r.get("json_only_bundles", []):
|
|
if _passes(bid):
|
|
drift_bundles_set.add(bid)
|
|
for bid, pages in (r.get("content_bundles") or {}).items():
|
|
if not _passes(bid):
|
|
continue
|
|
seen = set(per_bundle_pages.get(bid, []))
|
|
fresh = [p for p in pages if p not in seen]
|
|
if fresh:
|
|
per_bundle_pages.setdefault(bid, []).extend(fresh)
|
|
|
|
total_md = sum(len(p) for p in per_bundle_pages.values())
|
|
bundles_ranked = sorted(per_bundle_pages.items(), key=lambda kv: (-len(kv[1]), kv[0]))
|
|
_call.set(returned="ok", record_count=commits_in_window,
|
|
bundles_changed=len(per_bundle_pages),
|
|
new_bundles=len(new_bundles_set))
|
|
|
|
ts_oldest = filtered[-1]["_ts"].date().isoformat()
|
|
ts_newest = filtered[0]["_ts"].date().isoformat()
|
|
lines = [
|
|
f"# HVM docs digest — last {days} day{'s' if days != 1 else ''}", "",
|
|
f"_Window: {ts_oldest} → {ts_newest}_ • _Filters: version={version}, platform={platform}_", "",
|
|
"## Headline", "",
|
|
f"- **{total_md}** page change(s) across **{len(per_bundle_pages)}** bundle(s)",
|
|
f"- **{commits_in_window}** corpus-touching commit(s) in this window",
|
|
f"- **{len(new_bundles_set)}** bundle(s) newly added",
|
|
f"- **{len(drift_bundles_set)}** bundle(s) with sidecar-only drift", "",
|
|
]
|
|
if not per_bundle_pages and not new_bundles_set:
|
|
lines.append(f"_No bundle changes matched the filter in this window._")
|
|
return "\n".join(lines)
|
|
if new_bundles_set:
|
|
lines += ["## New bundles added", ""]
|
|
for bid in sorted(new_bundles_set):
|
|
b = cat.get(bid, {})
|
|
t = b.get("title") or ""
|
|
tag = f" *({b.get('version') or '?'})*" if b.get("version") else ""
|
|
lines.append(f"- `{bid}`{tag} {t}")
|
|
lines.append("")
|
|
if bundles_ranked:
|
|
top = bundles_ranked[:max_bundles]
|
|
remainder = len(bundles_ranked) - len(top)
|
|
lines += [f"## Bundles with content changes — top {len(top)}" +
|
|
(f" of {len(bundles_ranked)}" if remainder else ""), ""]
|
|
for bid, pages in top:
|
|
b = cat.get(bid, {})
|
|
tag = f" *({b.get('version') or ''})*" if b.get("version") else ""
|
|
lines.append(f"### `{bid}`{tag}")
|
|
if b.get("title"):
|
|
lines.append(f"_{b['title']}_")
|
|
lines.append(f"{len(pages)} page change(s).")
|
|
for p in pages[:max_pages_per_bundle]:
|
|
lines.append(f"- `{p}`")
|
|
if len(pages) > max_pages_per_bundle:
|
|
lines.append(f" _(+{len(pages) - max_pages_per_bundle} more)_")
|
|
lines.append("")
|
|
lines.append("\nInspect a specific page: `get_page(bundle_id, page_id)` or `diff_versions(...)`.")
|
|
return "\n".join(lines)
|
|
|
|
|
|
@mcp.tool()
def corpus_status() -> str:
    """Freshness + size of the knowledge base.

    Combines: (1) image build time (bundles.json mtime in container),
    (2) most-recent upstream Published date across bundles, (3) total
    bundles / pages / Chroma chunks.
    """
    report: list[str] = ["# Corpus status", ""]

    # (1) The mtime of bundles.json stands in for the image build time.
    try:
        built_at = _dt.datetime.fromtimestamp(
            BUNDLES_JSON.stat().st_mtime, tz=_dt.timezone.utc
        ).isoformat(timespec="seconds")
        report.append(f"- This image built at: **{built_at}**")
    except OSError:
        report.append("- This image build time: _unknown_")

    # (2) Gather every bundle that carries a Published date. The values are
    # compared as plain strings, so ordering is lexical.
    # NOTE(review): presumably these are ISO-style dates — confirm upstream.
    catalog = _bundles()
    dated: list[tuple[str, str]] = []
    for slug, info in catalog.items():
        published = (info.get("dates") or {}).get("Published")
        if published:
            dated.append((slug, published))
    if dated:
        newest = max(when for _, when in dated)
        report.append(f"- Most-recent upstream Published date (any bundle): **{newest}**")
    report.append("")

    # (3) Size counters. A failing Chroma count is reported, not raised.
    try:
        chunk_total = _collection().count()
    except Exception:
        chunk_total = -1

    page_total = 0
    if CORPUS.exists():
        for entry in CORPUS.iterdir():
            if entry.is_dir():
                page_total += sum(1 for _ in entry.glob("*.md"))

    if chunk_total >= 0:
        chunk_line = f"- Chunks in Chroma: **{chunk_total}**"
    else:
        chunk_line = "- Chunks in Chroma: _(query failed)_"
    report.extend([
        f"- Bundles indexed: **{len(catalog)}**",
        f"- Pages in corpus: **{page_total}**",
        chunk_line,
        "",
    ])

    if dated:
        # Newest first; only the top five are listed.
        ranked = sorted(dated, key=lambda pair: pair[1], reverse=True)
        report.append("## Most-recently-edited bundles (by HPE)")
        for slug, when in ranked[:5]:
            title = catalog.get(slug, {}).get("title") or slug
            report.append(f"- `{slug}` — {title} (published {when})")
    return "\n".join(report)
|
|
|
|
|
|
# ===========================================================================
|
|
# Phase 11 — curated knowledge: morpheus_api_lessons
|
|
# ===========================================================================
|
|
|
|
def _split_lessons_sections(md: str) -> list[tuple[str, str]]:
|
|
sections: list[tuple[str, str]] = []
|
|
current_title: str | None = None
|
|
current_lines: list[str] = []
|
|
for line in md.splitlines(keepends=True):
|
|
m = re.match(r"^##\s+(.+?)\s*$", line)
|
|
if m:
|
|
if current_lines:
|
|
sections.append((current_title or "(prelude)", "".join(current_lines)))
|
|
current_title = m.group(1).strip()
|
|
current_lines = [line]
|
|
else:
|
|
current_lines.append(line)
|
|
if current_lines:
|
|
sections.append((current_title or "(prelude)", "".join(current_lines)))
|
|
return sections
|
|
|
|
|
|
@mcp.tool()
def morpheus_api_lessons(
    topic: Annotated[str | None, Field(description="Optional keyword filter — returns only H2 sections whose heading or body contains this substring. Examples: 'manager', 'agent upgrade', 'plugin api', 'worker', 'console keyboard'. Omit for the full doc.")] = None,
) -> str:
    """Curated lessons about HPE Morpheus Enterprise — non-obvious bits
    that aren't in the official docs and gotchas learned from real
    integration / operation work.

    **Call this proactively whenever the user asks you to:**
    - install, upgrade, or troubleshoot an HVM cluster or manager
    - integrate with HVM (REST API, automation, scripting)
    - upgrade across versions (8.1.0 → 8.1.1 → 8.1.2)
    - work with HVM Host agents
    - configure backups, networking, or storage
    - elevate to HPE Morpheus Enterprise

    With ``topic=...`` you'll get just the relevant H2 section(s). With
    no argument you'll get the full doc — usually the right call when
    starting on a new task since the TL;DR at the top primes the rest.
    """
    with TimedCall("morpheus_api_lessons", {"topic": topic}) as _call:
        try:
            # Decode explicitly as UTF-8 so the result does not depend on the
            # container's locale.
            md = API_LESSONS_MD.read_text(encoding="utf-8")
        except OSError as e:
            _call.set(error=str(e))
            return f"Lessons doc not present at {API_LESSONS_MD}: {e}"

        # Normalise the filter once: padding is ignored, and a blank or
        # whitespace-only topic means "no filter" rather than a literal
        # search for spaces.
        needle = (topic or "").strip().lower()
        if not needle:
            _call.set(returned="full")
            return md

        # Case-insensitive substring match against heading and section text.
        kept = [
            body
            for title, body in _split_lessons_sections(md)
            if needle in title.lower() or needle in body.lower()
        ]
        if not kept:
            # Fall back to the whole document rather than returning nothing.
            _call.set(returned="empty", topic_matched=False)
            return (f"_No sections matched topic={topic!r}. Returning the full document._\n\n" + md)
        _call.set(returned="filtered", sections_matched=len(kept))
        return f"_Filtered to {len(kept)} section(s) matching topic={topic!r}._\n\n" + "".join(kept)
|
|
|
|
|
|
# ===========================================================================
|
|
# Phase 12 — find_doc_inconsistencies + submit_doc_bug
|
|
# ===========================================================================
|
|
|
|
_REDIRECT_PHRASE_RE = re.compile(
|
|
r"\bsee\s+(?:the\s+)?[A-Z`\[][^.!?\n]{2,80}(?:for|topic|section|chapter|guide)\b",
|
|
re.IGNORECASE,
|
|
)
|
|
_VERSION_SUFFIX_RE = re.compile(r"_(\d+_\d+_\d+)$")
|
|
|
|
|
|
def _bundle_family(bundle_id: str) -> str:
|
|
"""Strip a trailing `_X_Y_Z` version suffix from an HVM bundle slug.
|
|
|
|
`hvm_user_manual_8_1_0` → `hvm_user_manual`
|
|
`hvm_deployment_guide` → `hvm_deployment_guide` (no version)
|
|
|
|
Same-family bundles are version peers; cross-family pairs (User Manual
|
|
vs Release Notes) are intentionally different content.
|
|
"""
|
|
return _VERSION_SUFFIX_RE.sub("", bundle_id)
|
|
|
|
|
|
def _check_cross_version_drift(bundle_id: str, page_id: str, md: str, meta: dict) -> dict | None:
|
|
cluster = (meta.get("topic_cluster") or {}).get("clustered_topics") or []
|
|
if not cluster:
|
|
return None
|
|
src_family = _bundle_family(bundle_id)
|
|
src_lines = max(1, len(md.splitlines()))
|
|
in_band: list[tuple[int, str, str, int]] = []
|
|
out_band: list[tuple[int, str, str, int]] = []
|
|
for peer in cluster:
|
|
peer_bid = peer.get("bundle_id")
|
|
peer_pid = peer.get("page_id")
|
|
if not (peer_bid and peer_pid) or peer_bid == bundle_id:
|
|
continue
|
|
if _bundle_family(peer_bid) != src_family:
|
|
continue
|
|
peer_data = _read_page(peer_bid, peer_pid)
|
|
if peer_data is None:
|
|
continue
|
|
peer_md, _ = peer_data
|
|
added, removed = _diff_churn(md, peer_md)
|
|
churn = added + removed
|
|
peer_lines = max(1, len(peer_md.splitlines()))
|
|
denom = max(src_lines, peer_lines)
|
|
pct = (churn * 100) // denom if denom else 0
|
|
tup = (churn, peer_bid, peer_pid, peer_lines)
|
|
if 10 <= pct <= 60:
|
|
in_band.append(tup)
|
|
elif churn >= 5:
|
|
out_band.append(tup)
|
|
if in_band:
|
|
chosen = min(in_band, key=lambda t: t[0])
|
|
confidence = "high"
|
|
elif out_band:
|
|
chosen = min(out_band, key=lambda t: t[0])
|
|
confidence = "low"
|
|
else:
|
|
return None
|
|
churn, peer_bid, peer_pid, peer_lines = chosen
|
|
denom = max(src_lines, peer_lines)
|
|
churn_pct = min(100, (churn * 100) // denom) if denom else 0
|
|
return {
|
|
"check": "cross_version_drift",
|
|
"bundle_id": bundle_id, "page_id": page_id,
|
|
"page_url": _source_url(bundle_id, page_id),
|
|
"peer_bundle_id": peer_bid, "peer_page_id": peer_pid,
|
|
"churn_lines": churn, "churn_pct_of_file": churn_pct,
|
|
"confidence": confidence,
|
|
"summary": (f"Drifts {churn} lines (~{churn_pct}% of file) vs peer "
|
|
f"{peer_bid}/{peer_pid}. Inspect with "
|
|
f"diff_versions({bundle_id!r}, {page_id!r}, {peer_bid!r})."),
|
|
}
|
|
|
|
|
|
def _check_redirect_chain(bundle_id: str, page_id: str, md: str, meta: dict) -> dict | None:
|
|
body = re.sub(r"^#[^\n]*\n", "", md, count=1).strip()
|
|
if "```" in body:
|
|
return None
|
|
text_only = re.sub(r"[`\[\]()*_>#-]", "", body)
|
|
text_only = re.sub(r"\s+", " ", text_only).strip()
|
|
if len(text_only) > 600:
|
|
return None
|
|
redirect_matches = list(_REDIRECT_PHRASE_RE.finditer(body))
|
|
if not redirect_matches:
|
|
return None
|
|
evidence = redirect_matches[0].group(0).strip()
|
|
return {
|
|
"check": "redirect_chain",
|
|
"bundle_id": bundle_id, "page_id": page_id,
|
|
"page_url": _source_url(bundle_id, page_id),
|
|
"body_chars": len(text_only),
|
|
"redirect_phrase": evidence[:200],
|
|
"confidence": "medium",
|
|
"summary": (f"Page is {len(text_only)} chars of body text with a "
|
|
f'"see ... for ..." redirect: "{evidence[:120]}". '
|
|
"Inspect with get_page to confirm."),
|
|
}
|
|
|
|
|
|
@mcp.tool()
def find_doc_inconsistencies(
    scope_query: Annotated[str, Field(description="Natural-language scope describing what slice to scan. Used as a search to pick candidate pages. Examples: 'backup configuration', 'HVM cluster setup', 'VME manager installation'.")],
    version: Annotated[str | None, Field(description="OPTIONAL version filter — e.g. '8.1.2'.")] = None,
    platform: Annotated[str | None, Field(description="OPTIONAL platform filter (HVM bundles don't set platform — usually leave None).")] = None,
    bundle_id: Annotated[str | None, Field(description="OPTIONAL specific bundle slug to restrict scanning to.")] = None,
    max_pages: Annotated[int, Field(description="How many candidate pages to inspect.", ge=5, le=200)] = 30,
    checks: Annotated[list[str] | None, Field(description="Which checks to run. Available: 'cross_version_drift', 'redirect_chain'. Defaults to all.")] = None,
) -> str:
    """Scan a scoped set of HVM docs pages for likely documentation bugs.

    Surfaces concrete candidates for human review — NOT a stream of
    bugs to auto-submit. Workflow:

    1. Run this against a focused scope.
    2. Review each finding; many will be false positives.
    3. For real bugs, drill in with `get_page` / `diff_versions`.
    4. Draft a bug report; show the operator; ask explicitly.
    5. Only then call `submit_doc_bug`. One bug = one confirmation.

    **Do NOT loop submissions.** Even on "submit them all", confirm each
    one individually. HPE's docs queue is a shared resource.
    """
    with TimedCall("find_doc_inconsistencies", {
        "scope_query": scope_query, "version": version, "platform": platform,
        "bundle_id": bundle_id, "max_pages": max_pages, "checks": checks,
    }) as _call:
        # One entry per available check: the function that runs it and the
        # key used to order its findings in the report.
        runners = {
            "cross_version_drift": _check_cross_version_drift,
            "redirect_chain": _check_redirect_chain,
        }
        sort_keys = {
            # High-confidence findings first, then largest churn first.
            "cross_version_drift": lambda f: (-(1 if f["confidence"] == "high" else 0), -f["churn_lines"]),
            # Shortest bodies first.
            "redirect_chain": lambda f: f["body_chars"],
        }
        all_checks = set(runners)
        # Unknown check names are dropped; an empty result is an error.
        requested = all_checks if checks is None else {c for c in checks if c in all_checks}
        if not requested:
            _call.set(error="no_valid_checks")
            return f"No valid checks requested. Available: {sorted(all_checks)}."
        try:
            col = _collection()
        except Exception as e:
            _call.set(error=f"collection: {e}")
            return f"Couldn't open Chroma collection: {e}"
        where = _build_where(version, platform, bundle_id)
        try:
            # Over-fetch: several chunks usually belong to the same page, so
            # ask for 3x and de-duplicate down to max_pages below.
            res = col.query(query_texts=[scope_query], n_results=max_pages * 3,
                            where=where, include=["metadatas"])
        except Exception as e:
            _call.set(error=f"query: {e}")
            return f"Scope query failed: {e}"

        seen: set[tuple[str, str]] = set()
        candidates: list[tuple[str, str]] = []
        # Tolerate a missing/None result list and None metadata rows instead
        # of crashing on `.get`.
        rows = (res.get("metadatas") or [[]])[0] or []
        for meta in rows:
            if not meta:
                continue
            key = (meta.get("bundle_id") or "", meta.get("page_id") or "")
            if not key[0] or not key[1] or key in seen:
                continue
            seen.add(key)
            candidates.append(key)
            if len(candidates) >= max_pages:
                break
        _call.set(pages_inspected=len(candidates), checks=sorted(requested))
        if not candidates:
            return f"No pages matched scope `{scope_query}`."

        ordered_checks = sorted(requested)
        findings: dict[str, list[dict]] = {c: [] for c in requested}
        for bid, pid in candidates:
            data = _read_page(bid, pid)
            if data is None:
                continue
            md, meta = data
            for check in ordered_checks:
                finding = runners[check](bid, pid, md, meta)
                if finding:
                    findings[check].append(finding)

        # Order only the checks that actually ran, so `findings` (and the
        # per-check telemetry below) never lists a check that was not
        # requested.
        for check in ordered_checks:
            findings[check].sort(key=sort_keys[check])

        total = sum(len(v) for v in findings.values())
        _call.set(findings_total=total,
                  findings_by_check={k: len(v) for k, v in findings.items()})
        lines = [
            f"# Doc inconsistency scan — {len(candidates)} pages inspected", "",
            f"_Scope_: `{scope_query}` • _Filters_: version={version}, platform={platform}, bundle_id={bundle_id} • _Checks_: {sorted(requested)}", "",
            f"**{total} candidate finding{'' if total == 1 else 's'}.** Review each individually. "
            "For real bugs, follow up with `get_page` / `diff_versions`, draft the report, "
            "show the operator, and only call `submit_doc_bug` after explicit confirmation.", "",
        ]
        if not total:
            lines.append("_No findings in this scope._")
            return "\n".join(lines)
        for check in ordered_checks:
            items = findings.get(check, [])
            lines += [f"## {check} ({len(items)})", ""]
            if not items:
                lines.append("_No findings for this check._\n")
                continue
            for i, f in enumerate(items, 1):
                lines.append(f"### {i}. `{f['bundle_id']}/{f['page_id']}` *({f['confidence']} confidence)*")
                lines.append(f"- URL: {f['page_url']}")
                lines.append(f"- {f['summary']}")
                if check == "cross_version_drift":
                    lines.append(f"- Peer: `{f['peer_bundle_id']}/{f['peer_page_id']}` • churn: {f['churn_lines']} lines ({f['churn_pct_of_file']}% of file)")
                elif check == "redirect_chain":
                    lines.append(f"- Body length: {f['body_chars']} chars • Phrase: *\"{f['redirect_phrase']}\"*")
                lines.append("")
        lines += ["---",
                  "_Reminder: `submit_doc_bug` has a real side effect. Draft → show → confirm → submit, one at a time. Do not loop._"]
        return "\n".join(lines)
|
|
|
|
|
|
# --- submit_doc_bug ----------------------------------------------------------
|
|
# HPE Support DocPortal's "Was this helpful?" widget POSTs to an endpoint
|
|
# we haven't sniffed yet. Until DOC_BUG_API_URL is set AND
|
|
# DOC_BUG_SUBMIT_ENABLED=true, this tool refuses submission and tells the
|
|
# operator to paste manually. When you sniff the endpoint, set both env
|
|
# vars and verify the payload shape against the schema below.
|
|
|
|
# Only pages on these hosts may be reported.
_DOC_BUG_ALLOWED_HOSTS = {"support.hpe.com"}
# Deliberately loose shape check: something@something.something, no spaces.
_EMAIL_RE = re.compile(r"^[^@\s]+@[^@\s]+\.[^@\s]+$")


@mcp.tool()
def submit_doc_bug(
    page_url: Annotated[str, Field(description="Full URL of the support.hpe.com page the bug is about. Must be a support.hpe.com URL.")],
    content: Annotated[str, Field(description="Body of the bug report. Be specific: what the page says, what's wrong, what it should say. Cite exact passages. The docs team reads it verbatim.")],
    email: Annotated[str | None, Field(description="OPTIONAL submitter email for follow-up. Omit if anonymous.")] = None,
    rating: Annotated[int | None, Field(description="OPTIONAL star rating 1-5 (1-2 for serious bugs, 3 unclear, 4-5 only on explicit request).")] = None,
    like: Annotated[bool | None, Field(description="OPTIONAL thumbs-up/down. False for bugs, True for positive feedback.")] = None,
) -> str:
    """Submit a documentation bug to HPE's docs feedback channel.

    **⚠️ THIS TOOL HAS A REAL SIDE EFFECT (when enabled). It POSTs to
    HPE's docs feedback endpoint and the submission lands in their queue.**

    **MANDATORY operator-confirmation workflow:**

    1. Draft the bug content yourself. Show the operator the exact text
       you intend to submit + the page URL + any rating/email fields.
    2. Ask explicitly: *"Submit this bug? (yes/no)"*
    3. Only call submit_doc_bug AFTER they answer yes.
    4. If they say *"submit them all"*, STILL confirm each one. This
       tool MUST NOT be called in a loop without per-bug consent.

    **Do not call this autonomously.** Don't preemptively submit while
    exploring inconsistencies. Don't call inside an agent loop without
    a human in the loop. Misuse will get this MCP blocked at HPE's WAF.

    **What makes a good bug report:**
    - Specific page URL. One bug = one page.
    - Concrete quote of the problem text + version/platform context.
    - Suggested correction when you have one.
    - Avoid editorializing — factual bugs and broken links best.
    """
    with TimedCall("submit_doc_bug", {
        "page_url": page_url, "content_len": len(content or ""),
        "email_present": bool(email), "rating": rating, "like": like,
    }) as _call:
        # --- deployment gates: refuse before validating anything ----------
        if not DOC_BUG_SUBMIT_ENABLED:
            _call.set(error="disabled", outcome="refused_disabled")
            return (
                "submit_doc_bug is disabled on this MCP deployment "
                "(DOC_BUG_SUBMIT_ENABLED is not set). The operator's draft is good — "
                f"they can paste it into the feedback widget on {page_url} themselves.\n\n"
                "_(For maintainers: sniff HPE's feedback endpoint, set DOC_BUG_API_URL "
                "to the POST target, and DOC_BUG_SUBMIT_ENABLED=true to activate.)_"
            )
        if not DOC_BUG_API_URL:
            _call.set(error="no_endpoint", outcome="refused_disabled")
            return ("submit_doc_bug is enabled but DOC_BUG_API_URL is empty. "
                    f"Operator should paste manually at {page_url}.")

        # --- input validation ---------------------------------------------
        if not content or not content.strip():
            _call.set(error="empty_content", outcome="refused_invalid")
            return "Refused: empty `content`."
        if len(content) > 10000:
            _call.set(error="content_too_long", outcome="refused_invalid")
            return f"Refused: `content` is {len(content)} chars (cap 10000)."
        try:
            from urllib.parse import urlparse
            parsed = urlparse(page_url)
        except Exception as e:
            _call.set(error=f"url_parse: {e}", outcome="refused_invalid")
            return f"Refused: couldn't parse page_url ({e})."
        if parsed.scheme not in ("http", "https"):
            _call.set(error="bad_scheme", outcome="refused_invalid")
            return f"Refused: scheme must be http(s), got {parsed.scheme!r}."
        if parsed.hostname not in _DOC_BUG_ALLOWED_HOSTS:
            _call.set(error=f"bad_host: {parsed.hostname}", outcome="refused_invalid")
            return (f"Refused: page_url host {parsed.hostname!r} isn't a "
                    f"support.hpe.com URL. submit_doc_bug only accepts bugs against HPE Support pages.")
        if email is not None and not _EMAIL_RE.match(email):
            _call.set(error="bad_email", outcome="refused_invalid")
            return f"Refused: email {email!r} doesn't look valid. Omit if anonymous."
        if rating is not None and not (1 <= rating <= 5):
            _call.set(error="bad_rating", outcome="refused_invalid")
            return f"Refused: rating must be 1-5, got {rating}."

        # Rebuild the URL from validated parts only: scheme, host, path and
        # query. Port, credentials and fragment are intentionally dropped.
        href = f"{parsed.scheme}://{parsed.hostname}{parsed.path}{('?' + parsed.query) if parsed.query else ''}"
        payload: dict = {"content": content, "href": href}
        if email:
            payload["email"] = email
        if rating is not None:
            payload["rating"] = rating
        if like is not None:
            payload["like"] = like

        # httpx is imported lazily so the server still runs without it.
        try:
            import httpx
        except ImportError:
            _call.set(error="httpx_missing", outcome="refused_runtime")
            return "Refused: httpx not available."

        # NOTE(review): the User-Agent still carries the "hvm-docs-mcp" name;
        # confirm whether this deployment should identify differently.
        headers = {
            "Content-Type": "application/json",
            "Accept": "application/json",
            "User-Agent": "hvm-docs-mcp submit_doc_bug",
            "Origin": "https://support.hpe.com",
            "Referer": href,
        }
        try:
            with httpx.Client(timeout=DOC_BUG_TIMEOUT) as c:
                r = c.post(DOC_BUG_API_URL, json=payload, headers=headers)
        except httpx.RequestError as e:
            _call.set(error=f"transport: {e}", outcome="failed_transport")
            return f"Submission failed (transport): {e}"

        # From here on the POST has already happened, so response parsing
        # must never raise: an exception now would hide a submission that
        # went through and invite a duplicate on retry.
        comment_id: object = None
        body_summary = ""
        try:
            resp_json = r.json()
        except ValueError:  # includes json.JSONDecodeError
            body_summary = (r.text or "")[:300]
        else:
            # Valid JSON is not necessarily an object (could be a list,
            # string, null, ...); only an object can carry an id.
            if isinstance(resp_json, dict):
                comment_id = resp_json.get("commentId") or resp_json.get("id")
            body_summary = json.dumps(resp_json)[:300]
        _call.set(http_status=r.status_code, comment_id=comment_id,
                  outcome=("submitted" if r.is_success else "rejected_upstream"))
        if r.is_success:
            id_note = f" (commentId={comment_id})" if comment_id else ""
            return f"Submitted. HTTP {r.status_code}{id_note}. HPE docs team will see this for {href}."
        if r.status_code in (401, 403, 429):
            return (f"Submission rejected upstream (HTTP {r.status_code}). "
                    "Likely captcha/auth/rate-limit on anonymous POSTs. "
                    f"Operator can paste manually at {href}.\n\nResponse (truncated): {body_summary}")
        return f"Submission rejected upstream (HTTP {r.status_code}). Response (truncated): {body_summary}"
|
|
|
|
|
|
# ===========================================================================
|
|
# Entry point
|
|
# ===========================================================================
|
|
|
|
def main() -> None:
    """Parse CLI/env settings and start the MCP server.

    ``--transport``, ``--host`` and ``--port`` default to the
    ``MCP_TRANSPORT``, ``MCP_HOST`` and ``MCP_PORT`` environment variables
    (falling back to ``stdio``, ``0.0.0.0`` and ``8000``). The stdio
    transport ignores host and port.
    """
    import argparse

    env = os.environ
    parser = argparse.ArgumentParser(description=f"{PRODUCT_NAME} docs MCP server")
    parser.add_argument(
        "--transport",
        choices=["stdio", "streamable-http", "sse"],
        default=env.get("MCP_TRANSPORT", "stdio"),
    )
    parser.add_argument("--host", default=env.get("MCP_HOST", "0.0.0.0"))
    parser.add_argument("--port", type=int, default=int(env.get("MCP_PORT", "8000")))
    opts = parser.parse_args()

    if opts.transport == "stdio":
        mcp.run()
        return

    # Network transports: apply the bind address before starting.
    mcp.settings.host = opts.host
    mcp.settings.port = opts.port
    # DNS-rebinding protection defaults to localhost-only — disable for
    # container-network DNS hostnames. See PLAN.md "Hosting" notes.
    disable_flag = env.get("MCP_DISABLE_DNS_REBINDING_PROTECTION")
    if disable_flag in {"1", "true", "yes"}:
        mcp.settings.transport_security.enable_dns_rebinding_protection = False
    mcp.run(transport=opts.transport)


if __name__ == "__main__":
    main()
|