feat: Phases 9/11/12/13 — diff / lessons / inconsistencies / digest (#3)

This commit was merged in pull request #3.
This commit is contained in:
2026-05-22 13:58:21 -04:00
parent 761552fe69
commit 79d3455de5
2 changed files with 1042 additions and 20 deletions
+806 -20
View File
@@ -18,6 +18,8 @@ stable across products — clients depend on them.
"""
from __future__ import annotations
import datetime as _dt
import difflib
import json
import logging
import os
@@ -48,6 +50,8 @@ CORPUS = ROOT / "corpus"
CHROMA_DIR = ROOT / "chroma"
BM25_DB = Path(os.environ.get("BM25_DB", str(ROOT / "bm25" / f"{PRODUCT_NAME}_docs.db")))
BUNDLES_JSON = ROOT / "bundles.json"
DIGEST_HISTORY_PATH = CORPUS / ".digest" / "history.jsonl"
API_LESSONS_MD = Path(__file__).resolve().parent / "api_lessons.md"
# ---------------------------------------------------------------------------
# Feature flags (Phase 6 / 8 / 12 enable these as you ship each phase).
@@ -455,34 +459,816 @@ def list_versions() -> str:
return "\n".join(lines)
# ---------------------------------------------------------------------------
# Stubs for later phases — keep the signatures in this file so refactors
# don't lose the contracts. Implementations come per phase.
# ---------------------------------------------------------------------------
# ===========================================================================
# Phase 9 — cross-version tools
# ===========================================================================
# @mcp.tool() # Phase 9
# def list_cluster(bundle_id: str, page_id: str) -> str: ...
def _bundle_pages(bundle_id: str) -> set[str]:
    """Return the page ids present on disk for one bundle.

    A page id is the stem of every ``*.md`` entry directly inside
    ``CORPUS / bundle_id`` (the original note says this mirrors how
    ``rag.index`` derives ids from ``md_path.stem``). A bundle whose
    directory does not exist yields an empty set.
    """
    bundle_dir = CORPUS / bundle_id
    page_ids: set[str] = set()
    if bundle_dir.is_dir():
        for md_path in bundle_dir.glob("*.md"):
            page_ids.add(md_path.stem)
    return page_ids
# @mcp.tool() # Phase 9
# def diff_versions(bundle_id: str, page_id: str, against_bundle_id: str, context: int = 3) -> str: ...
# @mcp.tool() # Phase 9
# def bundle_changelog(bundle_id_new: str, bundle_id_old: str, min_churn: int = 5, max_changed: int = 50) -> str: ...
def _diff_churn(a: str, b: str) -> tuple[int, int]:
"""Cheap (added, removed) line counts for a pair of markdown bodies."""
diff = difflib.unified_diff(a.splitlines(keepends=False),
b.splitlines(keepends=False), n=0)
added = removed = 0
for line in diff:
if line.startswith(("+++", "---", "@@")):
continue
if line.startswith("+"):
added += 1
elif line.startswith("-"):
removed += 1
return added, removed
# @mcp.tool() # Phase 13
# def weekly_digest(days: int = 7, version: str | None = None, platform: str | None = None, ...) -> str: ...
# @mcp.tool() # Phase 9 (or 3 — useful early)
# def corpus_status() -> str: ...
@mcp.tool()
def list_cluster(
    bundle_id: Annotated[str, Field(description="Bundle slug of the source topic.")],
    page_id: Annotated[str, Field(description="Page id (GUID-XXXX) of the source topic.")],
) -> str:
    """List cross-version peers of a topic in the HVM docs.

    HPE re-mints the docId per product version but keeps page GUIDs stable,
    so the scrape pipeline synthesizes `topic_cluster.clustered_topics`
    from same-GUID overlap (374/376/376 pages overlap across 8.1.0/.1/.2).

    Returns a markdown bullet list of ``bundle_id/page_id`` peers, or a
    "Not found" message when the source page cannot be read.
    """
    with TimedCall("list_cluster", {"bundle_id": bundle_id, "page_id": page_id}) as _call:
        out = _read_page(bundle_id, page_id)
        if out is None:
            _call.set(found=False)
            return f"Not found: {bundle_id}/{page_id}"
        _, side = out
        cluster = side.get("topic_cluster") or {}
        # Sidecar data is scraped; skip entries missing either id instead of
        # failing the whole call with a KeyError while rendering.
        peers = [
            p for p in (cluster.get("clustered_topics") or [])
            if isinstance(p, dict) and p.get("bundle_id") and p.get("page_id")
        ]
        _call.set(hits_returned=len(peers))
        src_label = cluster.get("clustering_title") or side.get("title") or page_id
        lines = [f"# Cluster for {bundle_id}/{page_id} ({src_label})", ""]
        if not peers:
            lines.append("_No peer topics in cluster._")
            return "\n".join(lines)
        for p in peers:
            lines.append(f"- `{p['bundle_id']}/{p['page_id']}` — {p.get('clustering_title') or ''}")
        return "\n".join(lines)
# @mcp.tool() # Phase 12
# def find_doc_inconsistencies(scope_query: str, ...) -> str: ...
# @mcp.tool() # Phase 12
# def submit_doc_bug(page_url: str, content: str, email: str | None = None, ...) -> str: ...
@mcp.tool()
def diff_versions(
    bundle_id: Annotated[str, Field(description="Bundle slug of the source topic (the 'new' side).")],
    page_id: Annotated[str, Field(description="Page id of the source topic.")],
    against_bundle_id: Annotated[str, Field(description="Bundle slug to diff against. Must be in the source's cluster, or share the same page_id.")],
    context: Annotated[int, Field(description="Lines of context around each hunk.", ge=0, le=10)] = 3,
) -> str:
    """Unified diff of one topic between two bundles (typically two HVM versions).

    Two matching strategies, tried in order:

    1. `topic_cluster` peer (synthesized from same-GUID overlap by the scraper).
    2. Same `page_id` fallback (works because GUIDs are stable across HVM versions).

    The diff reads from ``against_bundle_id`` (old side) to ``bundle_id``
    (new side). Lines are compared without their line endings, the same
    way ``_diff_churn`` compares them.
    """
    with TimedCall("diff_versions", {
        "bundle_id": bundle_id, "page_id": page_id,
        "against_bundle_id": against_bundle_id, "context": context,
    }) as _call:
        src = _read_page(bundle_id, page_id)
        if src is None:
            _call.set(matched_via=None, reason="source_not_found")
            return f"Source not found: {bundle_id}/{page_id}"
        src_md, side = src
        cluster = side.get("topic_cluster") or {}
        # Ignore malformed peer entries rather than raising KeyError.
        peers = {
            p["bundle_id"]: p
            for p in (cluster.get("clustered_topics") or [])
            if isinstance(p, dict) and p.get("bundle_id") and p.get("page_id")
        }
        peer = peers.get(against_bundle_id)
        if peer is not None:
            peer_page_id = peer["page_id"]
            matched_via = "topic_cluster"
            peer_data = _read_page(against_bundle_id, peer_page_id)
        else:
            # Same-GUID fallback: read once and reuse the result below.
            peer_page_id = page_id
            matched_via = "filename"
            peer_data = _read_page(against_bundle_id, page_id)
            if peer_data is None:
                _call.set(matched_via=None, reason="no_peer")
                valid = list(peers) or ["(no peers)"]
                return (f"No match for {bundle_id}/{page_id} in {against_bundle_id}.\n"
                        f"- No cluster peer. Available peers: {valid}\n"
                        f"- No page {page_id!r} in {against_bundle_id} either.")
        _call.set(matched_via=matched_via)
        if peer_data is None:
            return f"Peer not found in corpus: {against_bundle_id}/{peer_page_id}"
        peer_md, _ = peer_data
        # lineterm="" plus an explicit "\n".join keeps every diff line on its
        # own row. With keepends=True a final line lacking a newline would be
        # glued to the next diff line or to the closing code fence.
        diff = difflib.unified_diff(peer_md.splitlines(),
                                    src_md.splitlines(),
                                    fromfile=f"{against_bundle_id}/{peer_page_id}",
                                    tofile=f"{bundle_id}/{page_id}",
                                    n=context, lineterm="")
        body = "\n".join(diff)
        header = f"_matched via {matched_via}_\n\n"
        if not body:
            return header + f"No differences between {bundle_id}/{page_id} and {against_bundle_id}/{peer_page_id}."
        return header + f"```diff\n{body}\n```"
@mcp.tool()
def bundle_changelog(
    bundle_id_new: Annotated[str, Field(description="New-side bundle slug, e.g. 'hvm_user_manual_8_1_2'.")],
    bundle_id_old: Annotated[str, Field(description="Old-side bundle slug, e.g. 'hvm_user_manual_8_1_1'.")],
    min_churn: Annotated[int, Field(description="Min (added + removed) lines to flag a page as changed.", ge=1, le=1000)] = 5,
    max_changed: Annotated[int, Field(description="Max changed pages to list (sorted by churn desc).", ge=1, le=500)] = 50,
) -> str:
    """High-level diff between two HVM bundles.

    Lists pages added, removed, and changed between an old bundle and a
    new one. Match is by page_id (which is the stable GUID — same GUID
    across versions = same topic). Use after `list_versions` to discover
    valid bundle slugs.
    """
    call_args = {
        "bundle_id_new": bundle_id_new, "bundle_id_old": bundle_id_old,
        "min_churn": min_churn, "max_changed": max_changed,
    }
    with TimedCall("bundle_changelog", call_args) as _call:
        pages_new = _bundle_pages(bundle_id_new)
        pages_old = _bundle_pages(bundle_id_old)
        if not (pages_new or pages_old):
            _call.set(reason="both_empty")
            return f"Neither bundle has pages on disk: {bundle_id_new}, {bundle_id_old}"
        # Exactly one side may still be empty; report the new side first.
        for slug, pages in ((bundle_id_new, pages_new), (bundle_id_old, pages_old)):
            if not pages:
                return f"Bundle not found or empty: {slug}"

        only_new = sorted(pages_new - pages_old)
        only_old = sorted(pages_old - pages_new)
        shared = sorted(pages_new & pages_old)

        # (page_id, added_lines, removed_lines) for pages at or above min_churn.
        churned: list[tuple[str, int, int]] = []
        for pid in shared:
            new_page = _read_page(bundle_id_new, pid)
            old_page = _read_page(bundle_id_old, pid)
            if new_page is None or old_page is None:
                continue
            plus, minus = _diff_churn(old_page[0], new_page[0])
            if plus + minus >= min_churn:
                churned.append((pid, plus, minus))
        # Highest total churn first; ties keep page-id order (stable sort).
        churned.sort(key=lambda row: row[1] + row[2], reverse=True)

        quiet = len(shared) - len(churned)
        _call.set(added=len(only_new), removed=len(only_old),
                  changed=len(churned), unchanged=quiet)

        report: list[str] = []
        report.append(f"# Bundle changelog: {bundle_id_new} vs {bundle_id_old}")
        report.append("")
        report.append(f"- pages in new: **{len(pages_new)}**")
        report.append(f"- pages in old: **{len(pages_old)}**")
        report.append(f"- common: **{len(shared)}**")
        report.append(f"- **added** (in new only): {len(only_new)}")
        report.append(f"- **removed** (in old only): {len(only_old)}")
        report.append(f"- **changed** (≥{min_churn} lines): {len(churned)} of {len(shared)} common")
        report.append(f"- unchanged: {quiet}")
        report.append("")

        if only_new:
            report.append(f"## Added pages ({len(only_new)})")
            report.extend(f"- `{pid}`" for pid in only_new)
            report.append("")
        if only_old:
            report.append(f"## Removed pages ({len(only_old)})")
            report.extend(f"- `{pid}`" for pid in only_old)
            report.append("")
        if churned:
            visible = churned[:max_changed]
            report.append(f"## Changed pages — top {len(visible)} of {len(churned)} by churn")
            report.append("")
            report.append("| page | +lines | -lines | total |")
            report.append("|---|---|---|---|")
            report.extend(
                f"| `{pid}` | +{plus} | -{minus} | {plus + minus} |"
                for pid, plus, minus in visible
            )
            hidden = len(churned) - max_changed
            if hidden > 0:
                report.append(f"\n_({hidden} more changed pages omitted; raise `max_changed` to see them.)_")
            report.append("\nInspect a specific page: `diff_versions(bundle_id_new, page_id, bundle_id_old)`.")
        return "\n".join(report)
# ===========================================================================
# Phase 13 — weekly digest from corpus/.digest/history.jsonl (built in CI)
# ===========================================================================
# Parsed digest history, loaded on first use; None means "not loaded yet".
_digest_cache: list[dict] | None = None


def _digest_history() -> list[dict]:
    """Lazy load of the digest history JSONL written by scrape.changelog at CI time.

    Returns the cached list after the first call. A missing file yields an
    empty list (with a warning). Blank lines, malformed JSON lines, and
    lines that decode to something other than a JSON object are skipped,
    so callers can rely on every record being a ``dict``. If reading
    fails part-way, the records parsed so far are kept.
    """
    global _digest_cache
    if _digest_cache is not None:
        return _digest_cache
    if not DIGEST_HISTORY_PATH.exists():
        log.warning("digest history not found at %s — weekly_digest will return empty.",
                    DIGEST_HISTORY_PATH)
        _digest_cache = []
        return _digest_cache
    records: list[dict] = []
    try:
        # JSON is UTF-8 by specification; do not depend on the container locale.
        with open(DIGEST_HISTORY_PATH, encoding="utf-8") as fh:
            for ln, line in enumerate(fh, start=1):
                line = line.strip()
                if not line:
                    continue
                try:
                    record = json.loads(line)
                except json.JSONDecodeError as e:
                    log.warning("digest history: skipping malformed line %d: %s", ln, e)
                    continue
                if not isinstance(record, dict):
                    log.warning("digest history: skipping non-object line %d", ln)
                    continue
                records.append(record)
    except (OSError, UnicodeDecodeError) as e:
        log.warning("digest history read failed: %s", e)
    _digest_cache = records
    return _digest_cache
@mcp.tool()
def weekly_digest(
    days: Annotated[int, Field(description="How far back to summarize. 7=last week, 30=last month. Horizon ~120 days.", ge=1, le=120)] = 7,
    version: Annotated[str | None, Field(description="OPTIONAL version filter, e.g. '8.1.2'.")] = None,
    platform: Annotated[str | None, Field(description="OPTIONAL platform filter (HVM bundles don't set platform — leave None).")] = None,
    max_bundles: Annotated[int, Field(description="Cap on per-bundle detail blocks.", ge=1, le=100)] = 25,
    max_pages_per_bundle: Annotated[int, Field(description="Pages to list per bundle.", ge=1, le=50)] = 10,
) -> str:
    """Summarize what changed in the HVM docs over the past N days.

    Call when the user asks *"what's new in HVM docs this week?"*,
    *"what changed in 8.1.2?"*, or *"is there anything new since the
    last release?"*. Reads the pre-baked digest history JSONL written
    by CI from git log over corpus-touching commits.

    Records with a missing or unparseable ``timestamp`` are ignored.
    Naive timestamps are interpreted as UTC.
    """
    with TimedCall("weekly_digest", {
        "days": days, "version": version, "platform": platform,
        "max_bundles": max_bundles, "max_pages_per_bundle": max_pages_per_bundle,
    }) as _call:
        # Only JSON objects can carry the fields read below.
        records = [r for r in _digest_history() if isinstance(r, dict)]
        if not records:
            _call.set(returned="empty_no_history", record_count=0)
            return ("# Weekly digest\n\n"
                    f"_No digest history on this image. `{DIGEST_HISTORY_PATH}` is "
                    "missing — it's populated by the weekly refresh workflow._")

        now = _dt.datetime.now(_dt.timezone.utc)
        cutoff = now - _dt.timedelta(days=days)
        filtered: list[dict] = []
        for r in records:
            try:
                ts = _dt.datetime.fromisoformat(r["timestamp"])
            except (KeyError, TypeError, ValueError):
                # TypeError covers a non-string timestamp (e.g. null or a number).
                continue
            if ts.tzinfo is None:
                ts = ts.replace(tzinfo=_dt.timezone.utc)
            if ts >= cutoff:
                filtered.append({**r, "_ts": ts})

        if not filtered:
            _call.set(returned="empty_window", record_count=0)
            stamps = sorted(str(r["timestamp"]) for r in records if r.get("timestamp"))
            oldest = stamps[0][:10] if stamps else "?"
            newest = stamps[-1][:10] if stamps else "?"
            covers = (f"\n\n_(History on this image covers "
                      f"{oldest} through "
                      f"{newest}.)_")
            return (f"# Weekly digest — last {days} day{'s' if days != 1 else ''}\n\n"
                    f"_No corpus changes recorded in this window._" + covers)

        cat = _bundles()

        def _passes(bid: str) -> bool:
            """True when the bundle satisfies the optional version/platform filters."""
            if not (version or platform):
                return True
            b = cat.get(bid)
            if b is None:
                return False
            if version and b.get("version") != version:
                return False
            if platform and b.get("platform") != platform:
                return False
            return True

        filtered.sort(key=lambda r: r["_ts"], reverse=True)
        per_bundle_pages: dict[str, list[str]] = {}
        # Parallel membership sets so de-duplication stays O(1) per page
        # instead of rebuilding a set from the list for every record.
        seen_pages: dict[str, set[str]] = {}
        new_bundles_set: set[str] = set()
        drift_bundles_set: set[str] = set()
        commits_in_window = 0
        for r in filtered:
            commits_in_window += 1
            for bid in r.get("new_bundles") or []:
                if _passes(bid):
                    new_bundles_set.add(bid)
            for bid in r.get("json_only_bundles") or []:
                if _passes(bid):
                    drift_bundles_set.add(bid)
            for bid, pages in (r.get("content_bundles") or {}).items():
                if not _passes(bid):
                    continue
                seen = seen_pages.setdefault(bid, set())
                for p in pages or []:
                    if p in seen:
                        continue
                    seen.add(p)
                    # Created lazily so bundles with no fresh pages are not listed.
                    per_bundle_pages.setdefault(bid, []).append(p)

        total_md = sum(len(p) for p in per_bundle_pages.values())
        bundles_ranked = sorted(per_bundle_pages.items(), key=lambda kv: (-len(kv[1]), kv[0]))
        _call.set(returned="ok", record_count=commits_in_window,
                  bundles_changed=len(per_bundle_pages),
                  new_bundles=len(new_bundles_set))

        ts_oldest = filtered[-1]["_ts"].date().isoformat()
        ts_newest = filtered[0]["_ts"].date().isoformat()
        lines = [
            f"# HVM docs digest — last {days} day{'s' if days != 1 else ''}", "",
            # The two dates need a separator; without one they render as a
            # single run of digits.
            f"_Window: {ts_oldest} → {ts_newest}_ • _Filters: version={version}, platform={platform}_", "",
            "## Headline", "",
            f"- **{total_md}** page change(s) across **{len(per_bundle_pages)}** bundle(s)",
            f"- **{commits_in_window}** corpus-touching commit(s) in this window",
            f"- **{len(new_bundles_set)}** bundle(s) newly added",
            f"- **{len(drift_bundles_set)}** bundle(s) with sidecar-only drift", "",
        ]
        if not per_bundle_pages and not new_bundles_set:
            lines.append("_No bundle changes matched the filter in this window._")
            return "\n".join(lines)

        if new_bundles_set:
            lines += ["## New bundles added", ""]
            for bid in sorted(new_bundles_set):
                b = cat.get(bid, {})
                t = b.get("title") or ""
                tag = f" *({b.get('version')})*" if b.get("version") else ""
                lines.append(f"- `{bid}`{tag} {t}")
            lines.append("")

        if bundles_ranked:
            top = bundles_ranked[:max_bundles]
            remainder = len(bundles_ranked) - len(top)
            lines += [f"## Bundles with content changes — top {len(top)}" +
                      (f" of {len(bundles_ranked)}" if remainder else ""), ""]
            for bid, pages in top:
                b = cat.get(bid, {})
                tag = f" *({b.get('version')})*" if b.get("version") else ""
                lines.append(f"### `{bid}`{tag}")
                if b.get("title"):
                    lines.append(f"_{b['title']}_")
                lines.append(f"{len(pages)} page change(s).")
                for p in pages[:max_pages_per_bundle]:
                    lines.append(f"- `{p}`")
                if len(pages) > max_pages_per_bundle:
                    lines.append(f"  _(+{len(pages) - max_pages_per_bundle} more)_")
                lines.append("")
        lines.append("\nInspect a specific page: `get_page(bundle_id, page_id)` or `diff_versions(...)`.")
        return "\n".join(lines)
@mcp.tool()
def corpus_status() -> str:
    """Freshness + size of the knowledge base.

    Combines: (1) image build time (bundles.json mtime in container),
    (2) most-recent upstream Published date across bundles, (3) total
    bundles / pages / Chroma chunks.
    """
    report: list[str] = ["# Corpus status", ""]

    # (1) Build time, taken from the mtime of bundles.json.
    try:
        mtime = BUNDLES_JSON.stat().st_mtime
        built_at = _dt.datetime.fromtimestamp(mtime, tz=_dt.timezone.utc).isoformat(timespec="seconds")
        report.append(f"- This image built at: **{built_at}**")
    except OSError:
        report.append("- This image build time: _unknown_")

    # (2) Newest upstream "Published" value; only bundles that carry one count.
    catalog = _bundles()
    dated: list[tuple[str, str]] = []
    for slug, info in catalog.items():
        published = (info.get("dates") or {}).get("Published")
        if not published:
            continue
        dated.append((slug, published))
    newest_pub = max((when for _, when in dated), default=None)
    if newest_pub:
        report.append(f"- Most-recent upstream Published date (any bundle): **{newest_pub}**")
    report.append("")

    # (3) Sizes. A failed Chroma query is reported in the output, not raised.
    try:
        chunk_total = _collection().count()
    except Exception:
        chunk_total = -1
    page_total = 0
    if CORPUS.exists():
        for entry in CORPUS.iterdir():
            if entry.is_dir():
                page_total += sum(1 for _ in entry.glob("*.md"))

    if chunk_total >= 0:
        chunk_line = f"- Chunks in Chroma: **{chunk_total}**"
    else:
        chunk_line = "- Chunks in Chroma: _(query failed)_"
    report.extend([
        f"- Bundles indexed: **{len(catalog)}**",
        f"- Pages in corpus: **{page_total}**",
        chunk_line,
        "",
    ])

    if dated:
        report.append("## Most-recently-edited bundles (by HPE)")
        for slug, when in sorted(dated, key=lambda pair: pair[1], reverse=True)[:5]:
            info = catalog.get(slug, {})
            report.append(f"- `{slug}` — {info.get('title') or slug} (published {when})")
    return "\n".join(report)
# ===========================================================================
# Phase 11 — curated knowledge: hvm_api_lessons
# ===========================================================================
def _split_lessons_sections(md: str) -> list[tuple[str, str]]:
sections: list[tuple[str, str]] = []
current_title: str | None = None
current_lines: list[str] = []
for line in md.splitlines(keepends=True):
m = re.match(r"^##\s+(.+?)\s*$", line)
if m:
if current_lines:
sections.append((current_title or "(prelude)", "".join(current_lines)))
current_title = m.group(1).strip()
current_lines = [line]
else:
current_lines.append(line)
if current_lines:
sections.append((current_title or "(prelude)", "".join(current_lines)))
return sections
@mcp.tool()
def hvm_api_lessons(
    topic: Annotated[str | None, Field(description="Optional keyword filter — returns only H2 sections whose heading or body contains this substring. Examples: 'manager', 'agent upgrade', 'plugin api', 'worker', 'console keyboard'. Omit for the full doc.")] = None,
) -> str:
    """Curated lessons about HPE Morpheus VM Essentials — non-obvious bits
    that aren't in the official docs and gotchas learned from real
    integration / operation work.

    **Call this proactively whenever the user asks you to:**

    - install, upgrade, or troubleshoot an HVM cluster or manager
    - integrate with HVM (REST API, automation, scripting)
    - upgrade across versions (8.1.0 → 8.1.1 → 8.1.2)
    - work with HVM Host agents
    - configure backups, networking, or storage
    - elevate to HPE Morpheus Enterprise

    With ``topic=...`` you'll get just the relevant H2 section(s). With
    no argument you'll get the full doc — usually the right call when
    starting on a new task since the TL;DR at the top primes the rest.
    """
    with TimedCall("hvm_api_lessons", {"topic": topic}) as _call:
        try:
            # Explicit UTF-8: the doc contains non-ASCII punctuation and the
            # container locale should not decide how it is decoded.
            md = API_LESSONS_MD.read_text(encoding="utf-8")
        except (OSError, UnicodeDecodeError) as e:
            _call.set(error=str(e))
            return f"Lessons doc not present at {API_LESSONS_MD}: {e}"
        # A blank or whitespace-only topic is treated like no topic; as a
        # substring it would match nearly every section anyway.
        needle = (topic or "").strip().lower()
        if not needle:
            _call.set(returned="full")
            return md
        sections = _split_lessons_sections(md)
        kept: list[str] = []
        for title, body in sections:
            if needle in title.lower() or needle in body.lower():
                kept.append(body)
        if not kept:
            _call.set(returned="empty", topic_matched=False)
            return (f"_No sections matched topic={topic!r}. Returning the full document._\n\n" + md)
        _call.set(returned="filtered", sections_matched=len(kept))
        return f"_Filtered to {len(kept)} section(s) matching topic={topic!r}._\n\n" + "".join(kept)
# ===========================================================================
# Phase 12 — find_doc_inconsistencies + submit_doc_bug
# ===========================================================================
# Matches a "see ... for/topic/section/chapter/guide" phrase on a single line:
# the word "see", an optional "the", one starting character (a letter, a
# backtick, or "["), then 2-80 characters that are not sentence punctuation
# or a newline, ending in one of the listed keywords. Because IGNORECASE is
# set, the `A-Z` range also accepts lowercase letters.
_REDIRECT_PHRASE_RE = re.compile(
r"\bsee\s+(?:the\s+)?[A-Z`\[][^.!?\n]{2,80}(?:for|topic|section|chapter|guide)\b",
re.IGNORECASE,
)
_VERSION_SUFFIX_RE = re.compile(r"_(\d+_\d+_\d+)$")
def _bundle_family(bundle_id: str) -> str:
"""Strip a trailing `_X_Y_Z` version suffix from an HVM bundle slug.
`hvm_user_manual_8_1_0` → `hvm_user_manual`
`hvm_deployment_guide` → `hvm_deployment_guide` (no version)
Same-family bundles are version peers; cross-family pairs (User Manual
vs Release Notes) are intentionally different content.
"""
return _VERSION_SUFFIX_RE.sub("", bundle_id)
def _check_cross_version_drift(bundle_id: str, page_id: str, md: str, meta: dict) -> dict | None:
    """Flag a page whose text drifts from a same-family cluster peer.

    Every peer listed in ``meta["topic_cluster"]["clustered_topics"]`` that
    belongs to the same bundle family (see ``_bundle_family``) and can be
    read from the corpus is compared against ``md`` with ``_diff_churn``.
    Churn is expressed as a percentage of the longer of the two files.

    - Peers with 10-60% churn are "in band"; the one with the least churn
      is reported with ``confidence="high"``.
    - Otherwise, peers with at least 5 churned lines are considered and the
      one with the least churn is reported with ``confidence="low"``.

    Returns a finding dict, or ``None`` when there is nothing to report.
    """
    peers = (meta.get("topic_cluster") or {}).get("clustered_topics") or []
    if not peers:
        return None

    family = _bundle_family(bundle_id)
    own_len = max(1, len(md.splitlines()))
    # Candidate tuples: (churn, peer_bundle_id, peer_page_id, peer_line_count)
    strong: list[tuple[int, str, str, int]] = []
    weak: list[tuple[int, str, str, int]] = []

    for entry in peers:
        other_bid = entry.get("bundle_id")
        other_pid = entry.get("page_id")
        if not other_bid or not other_pid or other_bid == bundle_id:
            continue
        if _bundle_family(other_bid) != family:
            continue
        loaded = _read_page(other_bid, other_pid)
        if loaded is None:
            continue
        other_md = loaded[0]
        plus, minus = _diff_churn(md, other_md)
        total = plus + minus
        other_len = max(1, len(other_md.splitlines()))
        # Both lengths are clamped to >= 1, so the divisor is never zero.
        share = (total * 100) // max(own_len, other_len)
        candidate = (total, other_bid, other_pid, other_len)
        if 10 <= share <= 60:
            strong.append(candidate)
        elif total >= 5:
            weak.append(candidate)

    if strong:
        pool, confidence = strong, "high"
    elif weak:
        pool, confidence = weak, "low"
    else:
        return None

    churn, peer_bid, peer_pid, peer_lines = min(pool, key=lambda c: c[0])
    churn_pct = min(100, (churn * 100) // max(own_len, peer_lines))
    summary = (f"Drifts {churn} lines (~{churn_pct}% of file) vs peer "
               f"{peer_bid}/{peer_pid}. Inspect with "
               f"diff_versions({bundle_id!r}, {page_id!r}, {peer_bid!r}).")
    return {
        "check": "cross_version_drift",
        "bundle_id": bundle_id, "page_id": page_id,
        "page_url": _source_url(bundle_id, page_id),
        "peer_bundle_id": peer_bid, "peer_page_id": peer_pid,
        "churn_lines": churn, "churn_pct_of_file": churn_pct,
        "confidence": confidence,
        "summary": summary,
    }
def _check_redirect_chain(bundle_id: str, page_id: str, md: str, meta: dict) -> dict | None:
    """Flag a short page that mostly points the reader somewhere else.

    A page qualifies when, after dropping a leading ``#`` heading line:
    it contains no fenced code block, its text with markdown punctuation
    removed is at most 600 characters, and it contains a phrase matched by
    ``_REDIRECT_PHRASE_RE``. ``meta`` is accepted for signature parity with
    the other checks and is not used.

    Returns a finding dict with ``confidence="medium"``, or ``None``.
    """
    body = re.sub(r"^#[^\n]*\n", "", md, count=1).strip()
    if "```" in body:
        return None

    # Approximate prose length: strip markdown punctuation, collapse whitespace.
    plain = re.sub(r"\s+", " ", re.sub(r"[`\[\]()*_>#-]", "", body)).strip()
    plain_len = len(plain)
    if plain_len > 600:
        return None

    first_hit = _REDIRECT_PHRASE_RE.search(body)
    if first_hit is None:
        return None
    evidence = first_hit.group(0).strip()

    return {
        "check": "redirect_chain",
        "bundle_id": bundle_id, "page_id": page_id,
        "page_url": _source_url(bundle_id, page_id),
        "body_chars": plain_len,
        "redirect_phrase": evidence[:200],
        "confidence": "medium",
        "summary": (f"Page is {plain_len} chars of body text with a "
                    f'"see ... for ..." redirect: "{evidence[:120]}". '
                    "Inspect with get_page to confirm."),
    }
@mcp.tool()
def find_doc_inconsistencies(
    scope_query: Annotated[str, Field(description="Natural-language scope describing what slice to scan. Used as a search to pick candidate pages. Examples: 'backup configuration', 'HVM cluster setup', 'VME manager installation'.")],
    version: Annotated[str | None, Field(description="OPTIONAL version filter — e.g. '8.1.2'.")] = None,
    platform: Annotated[str | None, Field(description="OPTIONAL platform filter (HVM bundles don't set platform — usually leave None).")] = None,
    bundle_id: Annotated[str | None, Field(description="OPTIONAL specific bundle slug to restrict scanning to.")] = None,
    max_pages: Annotated[int, Field(description="How many candidate pages to inspect.", ge=5, le=200)] = 30,
    checks: Annotated[list[str] | None, Field(description="Which checks to run. Available: 'cross_version_drift', 'redirect_chain'. Defaults to all.")] = None,
) -> str:
    """Scan a scoped set of HVM docs pages for likely documentation bugs.

    Surfaces concrete candidates for human review — NOT a stream of
    bugs to auto-submit. Workflow:

    1. Run this against a focused scope.
    2. Review each finding; many will be false positives.
    3. For real bugs, drill in with `get_page` / `diff_versions`.
    4. Draft a bug report; show the operator; ask explicitly.
    5. Only then call `submit_doc_bug`. One bug = one confirmation.

    **Do NOT loop submissions.** Even on "submit them all", confirm each
    one individually. HPE's docs queue is a shared resource.
    """
    with TimedCall("find_doc_inconsistencies", {
        "scope_query": scope_query, "version": version, "platform": platform,
        "bundle_id": bundle_id, "max_pages": max_pages, "checks": checks,
    }) as _call:
        all_checks = {"cross_version_drift", "redirect_chain"}
        requested = all_checks if checks is None else {c for c in checks if c in all_checks}
        if not requested:
            _call.set(error="no_valid_checks")
            return f"No valid checks requested. Available: {sorted(all_checks)}."

        try:
            col = _collection()
        except Exception as e:
            _call.set(error=f"collection: {e}")
            return f"Couldn't open Chroma collection: {e}"

        where = _build_where(version, platform, bundle_id)
        try:
            # Over-fetch: several chunks usually map to the same page.
            res = col.query(query_texts=[scope_query], n_results=max_pages * 3,
                            where=where, include=["metadatas"])
        except Exception as e:
            _call.set(error=f"query: {e}")
            return f"Scope query failed: {e}"

        seen: set[tuple[str, str]] = set()
        candidates: list[tuple[str, str]] = []
        for meta in (res.get("metadatas") or [[]])[0] or []:
            if not isinstance(meta, dict):
                # A chunk stored without metadata cannot name its page.
                continue
            key = (meta.get("bundle_id") or "", meta.get("page_id") or "")
            if not key[0] or not key[1] or key in seen:
                continue
            seen.add(key)
            candidates.append(key)
            if len(candidates) >= max_pages:
                break

        _call.set(pages_inspected=len(candidates), checks=sorted(requested))
        if not candidates:
            return f"No pages matched scope `{scope_query}`."

        findings: dict[str, list[dict]] = {c: [] for c in requested}
        for bid, pid in candidates:
            data = _read_page(bid, pid)
            if data is None:
                continue
            md, meta = data
            if "cross_version_drift" in requested:
                f = _check_cross_version_drift(bid, pid, md, meta)
                if f:
                    findings["cross_version_drift"].append(f)
            if "redirect_chain" in requested:
                f = _check_redirect_chain(bid, pid, md, meta)
                if f:
                    findings["redirect_chain"].append(f)

        # Sort only the checks that actually ran, so telemetry does not
        # report zero-count entries for checks the caller never requested.
        sort_keys = {
            # High-confidence drift first, then largest churn.
            "cross_version_drift": lambda f: (0 if f["confidence"] == "high" else 1, -f["churn_lines"]),
            # Shortest bodies first.
            "redirect_chain": lambda f: f["body_chars"],
        }
        for check in requested:
            findings[check].sort(key=sort_keys[check])

        total = sum(len(v) for v in findings.values())
        _call.set(findings_total=total,
                  findings_by_check={k: len(v) for k, v in findings.items()})

        lines = [
            f"# Doc inconsistency scan — {len(candidates)} pages inspected", "",
            f"_Scope_: `{scope_query}` • _Filters_: version={version}, platform={platform}, bundle_id={bundle_id} • _Checks_: {sorted(requested)}", "",
            f"**{total} candidate finding{'' if total == 1 else 's'}.** Review each individually. "
            "For real bugs, follow up with `get_page` / `diff_versions`, draft the report, "
            "show the operator, and only call `submit_doc_bug` after explicit confirmation.", "",
        ]
        if not total:
            lines.append("_No findings in this scope._")
            return "\n".join(lines)

        for check in sorted(requested):
            items = findings.get(check, [])
            lines += [f"## {check} ({len(items)})", ""]
            if not items:
                lines.append("_No findings for this check._\n")
                continue
            for i, f in enumerate(items, 1):
                lines.append(f"### {i}. `{f['bundle_id']}/{f['page_id']}` *({f['confidence']} confidence)*")
                lines.append(f"- URL: {f['page_url']}")
                lines.append(f"- {f['summary']}")
                if check == "cross_version_drift":
                    lines.append(f"- Peer: `{f['peer_bundle_id']}/{f['peer_page_id']}` • churn: {f['churn_lines']} lines ({f['churn_pct_of_file']}% of file)")
                elif check == "redirect_chain":
                    lines.append(f"- Body length: {f['body_chars']} chars • Phrase: *\"{f['redirect_phrase']}\"*")
                lines.append("")
        lines += ["---",
                  "_Reminder: `submit_doc_bug` has a real side effect. Draft → show → confirm → submit, one at a time. Do not loop._"]
        return "\n".join(lines)
# --- submit_doc_bug ----------------------------------------------------------
# HPE Support DocPortal's "Was this helpful?" widget POSTs to an endpoint
# we haven't sniffed yet. Until DOC_BUG_API_URL is set AND
# DOC_BUG_SUBMIT_ENABLED=true, this tool refuses submission and tells the
# operator to paste manually. When you sniff the endpoint, set both env
# vars and verify the payload shape against the schema below.
# Hostnames a bug report's page_url may point at (compared by exact match).
_DOC_BUG_ALLOWED_HOSTS = {"support.hpe.com"}
# Loose shape check only: one "@", no whitespace, and a "." somewhere in the
# part after the "@". Not a full address validator.
_EMAIL_RE = re.compile(r"^[^@\s]+@[^@\s]+\.[^@\s]+$")
@mcp.tool()
def submit_doc_bug(
    page_url: Annotated[str, Field(description="Full URL of the support.hpe.com page the bug is about. Must be a support.hpe.com URL.")],
    content: Annotated[str, Field(description="Body of the bug report. Be specific: what the page says, what's wrong, what it should say. Cite exact passages. The docs team reads it verbatim.")],
    email: Annotated[str | None, Field(description="OPTIONAL submitter email for follow-up. Omit if anonymous.")] = None,
    rating: Annotated[int | None, Field(description="OPTIONAL star rating 1-5 (1-2 for serious bugs, 3 unclear, 4-5 only on explicit request).")] = None,
    like: Annotated[bool | None, Field(description="OPTIONAL thumbs-up/down. False for bugs, True for positive feedback.")] = None,
) -> str:
    """Submit a documentation bug to HPE's docs feedback channel.

    **⚠️ THIS TOOL HAS A REAL SIDE EFFECT (when enabled). It POSTs to
    HPE's docs feedback endpoint and the submission lands in their queue.**

    **MANDATORY operator-confirmation workflow:**

    1. Draft the bug content yourself. Show the operator the exact text
       you intend to submit + the page URL + any rating/email fields.
    2. Ask explicitly: *"Submit this bug? (yes/no)"*
    3. Only call submit_doc_bug AFTER they answer yes.
    4. If they say *"submit them all"*, STILL confirm each one. This
       tool MUST NOT be called in a loop without per-bug consent.

    **Do not call this autonomously.** Don't preemptively submit while
    exploring inconsistencies. Don't call inside an agent loop without
    a human in the loop. Misuse will get this MCP blocked at HPE's WAF.

    **What makes a good bug report:**

    - Specific page URL. One bug = one page.
    - Concrete quote of the problem text + version/platform context.
    - Suggested correction when you have one.
    - Avoid editorializing — factual bugs and broken links best.
    """
    with TimedCall("submit_doc_bug", {
        "page_url": page_url, "content_len": len(content or ""),
        "email_present": bool(email), "rating": rating, "like": like,
    }) as _call:
        # --- Deployment gates: refuse before validating anything. ---
        if not DOC_BUG_SUBMIT_ENABLED:
            _call.set(error="disabled", outcome="refused_disabled")
            return (
                "submit_doc_bug is disabled on this MCP deployment "
                "(DOC_BUG_SUBMIT_ENABLED is not set). The operator's draft is good — "
                f"they can paste it into the feedback widget on {page_url} themselves.\n\n"
                "_(For maintainers: sniff HPE's feedback endpoint, set DOC_BUG_API_URL "
                "to the POST target, and DOC_BUG_SUBMIT_ENABLED=true to activate.)_"
            )
        if not DOC_BUG_API_URL:
            _call.set(error="no_endpoint", outcome="refused_disabled")
            return ("submit_doc_bug is enabled but DOC_BUG_API_URL is empty. "
                    f"Operator should paste manually at {page_url}.")

        # --- Input validation. ---
        if not content or not content.strip():
            _call.set(error="empty_content", outcome="refused_invalid")
            return "Refused: empty `content`."
        if len(content) > 10000:
            _call.set(error="content_too_long", outcome="refused_invalid")
            return f"Refused: `content` is {len(content)} chars (cap 10000)."

        try:
            from urllib.parse import urlparse
            parsed = urlparse(page_url)
        except Exception as e:
            _call.set(error=f"url_parse: {e}", outcome="refused_invalid")
            return f"Refused: couldn't parse page_url ({e})."
        if parsed.scheme not in ("http", "https"):
            _call.set(error="bad_scheme", outcome="refused_invalid")
            return f"Refused: scheme must be http(s), got {parsed.scheme!r}."
        if parsed.hostname not in _DOC_BUG_ALLOWED_HOSTS:
            _call.set(error=f"bad_host: {parsed.hostname}", outcome="refused_invalid")
            return (f"Refused: page_url host {parsed.hostname!r} isn't a "
                    f"support.hpe.com URL. submit_doc_bug only accepts bugs against HPE Support pages.")

        if email is not None and not _EMAIL_RE.match(email):
            _call.set(error="bad_email", outcome="refused_invalid")
            return f"Refused: email {email!r} doesn't look valid. Omit if anonymous."
        if rating is not None and not (1 <= rating <= 5):
            _call.set(error="bad_rating", outcome="refused_invalid")
            return f"Refused: rating must be 1-5, got {rating}."

        # Rebuild the URL from scheme, host, path and query only; userinfo,
        # port and fragment from the caller's URL are not forwarded.
        href = f"{parsed.scheme}://{parsed.hostname}{parsed.path}{('?' + parsed.query) if parsed.query else ''}"
        payload: dict = {"content": content, "href": href}
        if email:
            payload["email"] = email
        if rating is not None:
            payload["rating"] = rating
        if like is not None:
            payload["like"] = like

        try:
            import httpx
        except ImportError:
            _call.set(error="httpx_missing", outcome="refused_runtime")
            return "Refused: httpx not available."

        headers = {
            "Content-Type": "application/json",
            "Accept": "application/json",
            "User-Agent": "hvm-docs-mcp submit_doc_bug",
            "Origin": "https://support.hpe.com",
            "Referer": href,
        }
        try:
            with httpx.Client(timeout=DOC_BUG_TIMEOUT) as c:
                r = c.post(DOC_BUG_API_URL, json=payload, headers=headers)
        except httpx.RequestError as e:
            _call.set(error=f"transport: {e}", outcome="failed_transport")
            return f"Submission failed (transport): {e}"

        comment_id: object = None
        body_summary = ""
        try:
            resp_json = r.json()
        except ValueError:
            # Not JSON (json.JSONDecodeError is a ValueError): fall back to text.
            body_summary = (r.text or "")[:300]
        else:
            # The response shape is unverified. A JSON array/string/number has
            # no .get(); reading the id must not crash after the POST has
            # already landed upstream.
            if isinstance(resp_json, dict):
                comment_id = resp_json.get("commentId") or resp_json.get("id")
            body_summary = json.dumps(resp_json)[:300]

        _call.set(http_status=r.status_code, comment_id=comment_id,
                  outcome=("submitted" if r.is_success else "rejected_upstream"))

        if r.is_success:
            id_note = f" (commentId={comment_id})" if comment_id else ""
            return f"Submitted. HTTP {r.status_code}{id_note}. HPE docs team will see this for {href}."
        if r.status_code in (401, 403, 429):
            return (f"Submission rejected upstream (HTTP {r.status_code}). "
                    "Likely captcha/auth/rate-limit on anonymous POSTs. "
                    f"Operator can paste manually at {href}.\n\nResponse (truncated): {body_summary}")
        return f"Submission rejected upstream (HTTP {r.status_code}). Response (truncated): {body_summary}"
# ===========================================================================