search: BM25-default + cross-encoder rerank, hybrid behind env gate

Phases 3, 6, 7, and 8 land in one pass because they depend on each other.

* docs_mcp/server.py
  - Wire search_docs / get_page / list_versions tool bodies.
  - search_docs flow: BM25 first (rag.bm25 FTS5) → over-fetch RERANK_POOL
    chunks → POST to RERANK_URL/v1/rerank → return top-k. Dense is the
    fallback when BM25 finds nothing. HYBRID_SEARCH=true switches to
    dense+BM25+RRF (fused via the new _rrf_fuse helper).
  - Every retrieval failure is caught and the search falls back to the
    next layer, so a dead reranker or missing BM25 db never blocks a search.
  - Source URLs built from the bundle's docId so results link straight
    into support.hpe.com.

* eval/
  - 22 hand-curated golden queries grounded in real corpus page titles.
  - DenseRetriever / BM25Retriever / HybridRetriever / RerankedRetriever
    + MRR/Recall@K/nDCG@K harness. RERANK_URL env activates the
    reranked variants.
  - Committed eval/results/baseline.md. On this corpus:
        dense:                MRR 0.539
        bm25:                 MRR 0.880
        hybrid_rrf:           MRR 0.692
        bm25+rerank:          MRR 0.920  (winner)
        hybrid_rrf+rerank:    MRR 0.875
    HPE structured docs use controlled vocabulary, so lexical match
    dominates. Hybrid loses because dense pollutes the fused pool.

* scripts/rerank_server.py
  - Minimal HTTP /v1/rerank over sentence-transformers
    cross-encoder/ms-marco-MiniLM-L-6-v2. Cohere-style request/response.
  - This is the dev/CPU fallback; production replaces it with the
    llama.cpp + jina-reranker-v2-base GGUF sidecar (same wire protocol).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-05-22 13:06:51 -04:00
parent dd691b0111
commit dda044eb95
8 changed files with 864 additions and 57 deletions
+271 -20
View File
@@ -35,8 +35,11 @@ log = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Product-specific configuration. Set these for each new build.
# ---------------------------------------------------------------------------
# NOTE(review): PRODUCT_NAME and PRODUCT_DOCS_URL are each assigned twice
# below; the later assignment is the one that takes effect. This looks like
# removed/added diff lines shown together — confirm against the real file.
PRODUCT_NAME = os.environ.get("PRODUCT_NAME", "myproduct")
PRODUCT_DOCS_URL = os.environ.get("PRODUCT_DOCS_URL", "https://docs.example.com")
PRODUCT_NAME = os.environ.get("PRODUCT_NAME", "hvm")
PRODUCT_DOCS_URL = os.environ.get(
"PRODUCT_DOCS_URL",
"https://support.hpe.com/hpesc/public/docDisplay?docId=sd00007735en_us",
)
# Collection name is derived from the product name.
COLLECTION = f"{PRODUCT_NAME}_docs"
# Paths inside the deployed container (and matching layout locally for dev).
@@ -104,6 +107,15 @@ def _build_where(version: str | None, platform: str | None, bundle_id: str | Non
return {"$and": conds}
def _where_for_bm25(version: str | None, platform: str | None, bundle_id: str | None) -> dict | None:
    """Build the flat equality-filter dict that BM25Index.query takes.

    Only facets with a truthy value are included. Returns None when no
    facet is set.
    """
    facets = (
        ("version", version),
        ("platform", platform),
        ("bundle_id", bundle_id),
    )
    filters: dict[str, str] = {key: value for key, value in facets if value}
    if not filters:
        return None
    return filters
def _read_page(bundle_id: str, page_id: str) -> tuple[str, dict] | None:
"""Read a corpus page off disk. Returns (markdown_body, metadata_dict)."""
md_path = CORPUS / bundle_id / (page_id + ".md")
@@ -113,6 +125,115 @@ def _read_page(bundle_id: str, page_id: str) -> tuple[str, dict] | None:
return md_path.read_text(), json.loads(json_path.read_text())
# Module-level caches, filled lazily on first use by _collection() and _bm25().
_CHROMA = None  # cached Chroma collection handle
_BM25 = None  # cached BM25Index handle
def _collection():
    """Return the Chroma collection named COLLECTION, opening it on first use.

    The handle is stored in the module-level _CHROMA and reused by every
    later call.
    """
    global _CHROMA
    if _CHROMA is not None:
        return _CHROMA

    # Imported here so the dependencies load only when the collection is
    # first opened.
    import chromadb
    from chromadb.config import Settings
    from rag.embeddings import embedding_function

    store_path = str(CHROMA_DIR)
    chroma_settings = Settings(anonymized_telemetry=False)
    client = chromadb.PersistentClient(path=store_path, settings=chroma_settings)
    _CHROMA = client.get_collection(COLLECTION, embedding_function=embedding_function())
    return _CHROMA
def _bm25():
    """Return the shared BM25Index, or None when it cannot be provided.

    None is returned if the BM25_DB file does not exist, or if importing or
    constructing BM25Index raises (a warning is logged in that case). A
    successfully built index is cached in _BM25 and reused.
    """
    global _BM25
    if _BM25 is not None:
        return _BM25
    if not BM25_DB.exists():
        return None
    try:
        from rag.bm25 import BM25Index
        index = BM25Index(str(BM25_DB))
    except Exception as e:  # defensive: hybrid must never block dense
        log.warning("BM25 unavailable, falling back to dense-only: %s", e)
        return None
    _BM25 = index
    return _BM25
def _enrich_from_chroma(col, chunk_ids: list[str], fused: list | None) -> tuple[list[str], list[dict], list[float]]:
    """Fetch document text + metadata for chunk ids from Chroma, in request order.

    Args:
        col: object exposing ``get(ids=..., include=[...])`` that returns a
            mapping with parallel "ids", "documents" and "metadatas" lists.
        chunk_ids: ids to fetch; the output follows this order.
        fused: optional ``[(id, score, sources), ...]`` list. When given, the
            distance of each returned chunk is ``1.0 - score`` for that id.

    Returns:
        ``(docs, metas, dists)`` as three parallel lists. Ids that ``col.get``
        does not return are dropped from all three. With ``fused=None`` every
        distance is 0.0.
    """
    if not chunk_ids:
        return [], [], []
    fetched = col.get(ids=chunk_ids, include=["documents", "metadatas"])
    by_id = {
        cid: (doc, meta)
        for cid, doc, meta in zip(fetched["ids"], fetched["documents"], fetched["metadatas"])
    }
    # Keep only ids the store actually returned, preserving the caller's order.
    found = [cid for cid in chunk_ids if cid in by_id]
    docs = [by_id[cid][0] for cid in found]
    metas = [by_id[cid][1] for cid in found]
    if fused is None:
        dists = [0.0] * len(found)
    else:
        # Look scores up by id rather than by position: if an id is missing
        # from the store, a positional slice of `fused` would shift every
        # later score onto the wrong document.
        score_by_id = {cid: score for cid, score, _src in fused}
        # An id with no fused entry is treated as score 0.0 (distance 1.0).
        dists = [1.0 - score_by_id.get(cid, 0.0) for cid in found]
    return docs, metas, dists
def _rerank(query: str, candidates: list[tuple[str, str]]) -> list[tuple[str, str]] | None:
    """POST to RERANK_URL /v1/rerank, return candidates re-ordered by relevance.

    `candidates` is `[(chunk_id, text), ...]`. Texts are truncated to ~2000 chars
    before sending so we never blow past jina-reranker's 1024-token per-pair
    cap (which 400s the entire batch). The full untruncated text still goes
    back to the user from Chroma; truncation is reranking-only.

    Each candidate appears at most once in the result: response items whose
    "index" is not an in-range integer, or repeats an index already seen,
    are ignored.

    Returns None on any failure — caller treats that as "skip reranking,
    keep retrieval-order candidates."
    """
    if not RERANK_URL or not candidates:
        return None
    try:
        import httpx

        payload = {
            "query": query,
            "documents": [(text or "")[:2000] for _cid, text in candidates],
            "top_n": len(candidates),
        }
        # A trailing slash on RERANK_URL would otherwise give "//v1/rerank".
        endpoint = f"{RERANK_URL.rstrip('/')}/v1/rerank"
        with httpx.Client(timeout=RERANK_TIMEOUT) as c:
            r = c.post(endpoint, json=payload)
            r.raise_for_status()
            results = r.json().get("results") or []

        order: list[tuple[str, str]] = []
        seen: set[int] = set()
        for item in results:
            idx = item.get("index")
            # bool is a subclass of int; a JSON true/false is not a position.
            if isinstance(idx, bool) or not isinstance(idx, int):
                continue
            if not 0 <= idx < len(candidates) or idx in seen:
                continue
            seen.add(idx)
            order.append(candidates[idx])
        return order or None
    except Exception as e:
        log.warning("rerank failed, keeping retrieval order: %s", e)
        return None
def _rrf_fuse(*ranked_lists: list[str], k: int = RRF_K) -> list[tuple[str, float, dict]]:
    """Fuse several ranked id lists with Reciprocal Rank Fusion.

    Each id earns ``1 / (k + rank)`` from every list it appears in, where
    rank is 1-based. Lists are labelled by position: "dense", "bm25",
    "extra", then "r<idx>" for any further ones.

    Returns ``[(id, fused_score, {label: rank}), ...]`` ordered by fused
    score, highest first.
    """
    labels = ("dense", "bm25", "extra")
    totals: dict[str, float] = {}
    ranks_by_id: dict[str, dict] = {}
    for position, ranking in enumerate(ranked_lists):
        label = f"r{position}" if position >= len(labels) else labels[position]
        for rank, chunk_id in enumerate(ranking, start=1):
            totals[chunk_id] = totals.get(chunk_id, 0.0) + 1.0 / (k + rank)
            ranks_by_id.setdefault(chunk_id, {})[label] = rank
    # sorted() is stable, so ids with equal scores keep first-seen order.
    ordered = sorted(totals.items(), key=lambda pair: pair[1], reverse=True)
    return [(chunk_id, total, ranks_by_id[chunk_id]) for chunk_id, total in ordered]
def _source_url(bundle_id: str, page_id: str) -> str:
    """Build the canonical docs portal URL for a (bundle, page) pair.

    Returns "" when the bundle is unknown or has no ``doc_id``. Page ids
    starting with "GUID-" link to that page inside the document; any other
    page id links to the document as a whole.
    """
    bundle = _bundles().get(bundle_id)
    if not bundle:
        return ""
    doc_id = bundle.get("doc_id", "")
    if not doc_id:
        # Without a docId the portal link would be "...docId=" and cannot
        # resolve; return the same "no URL" value used for unknown bundles.
        return ""
    base = f"https://support.hpe.com/hpesc/public/docDisplay?docId={doc_id}"
    if page_id.startswith("GUID-"):
        return f"{base}&page={page_id}.html"
    return base
# ===========================================================================
# Tools
# ===========================================================================
@@ -134,7 +255,7 @@ def search_docs(
] = None,
k: Annotated[int, Field(description="Number of results to return.", ge=1, le=50)] = 10,
) -> str:
"""Search the {product} docs corpus.
"""Search the HPE Morpheus VM Essentials (HVM) docs corpus.
Returns the top-k most relevant chunks (with full source page URLs)
given a natural-language query. Optional filters narrow the search
@@ -142,20 +263,130 @@ def search_docs(
first if you need to discover the available facet values.
Call this tool whenever the user asks anything that should be
answerable from the official product documentation.
answerable from the official product documentation — install,
upgrade, configuration, backups, networking, HVM clusters, the
Morpheus UI, or any 8.1.x release-notes question.
"""
with TimedCall("search_docs", {
"query": query, "version": version, "platform": platform,
"bundle_id": bundle_id, "k": k,
}) as _call:
# TODO Phase 2-3: query Chroma collection (see rag/index.py for
# how it was built). Render the top-k chunks as markdown with
# source URLs.
# TODO Phase 6: optional reranker via _rerank() if RERANK_URL set.
# TODO Phase 8: hybrid retrieval if HYBRID_SEARCH=true — run
# dense + BM25 in parallel, RRF-fuse, hand merged pool to rerank.
_call.set(hits_returned=0)
raise NotImplementedError("Phase 2/3: implement Chroma query + rendering")
try:
col = _collection()
except Exception as e:
log.exception("chroma collection unavailable")
_call.set(hits_returned=0, error=str(e))
return f"_(search backend unavailable: {e})_"
where = _build_where(version, platform, bundle_id)
bm25_where = _where_for_bm25(version, platform, bundle_id)
pool = max(k * 5, 50)
# Retrieval mode selection. Eval on this corpus (2026-05-22, 22 golden
# queries) showed BM25 MRR=0.88 vs dense MRR=0.54 vs hybrid MRR=0.69 —
# HPE structured docs use controlled vocabulary, so lexical match wins.
# Dense is kept as fallback when BM25 has no tokens to chew on (e.g.
# purely stopword queries). HYBRID_SEARCH=true forces RRF fusion.
bm = _bm25()
docs: list[str] = []
metas: list[dict] = []
dists: list[float] = []
retrieval_mode = "dense"
top1_source = "dense_only"
if HYBRID_SEARCH and bm is not None:
try:
dense_res = col.query(query_texts=[query], n_results=pool, where=where)
dense_ids = (dense_res.get("ids") or [[]])[0]
bm_hits = bm.query(query, n=pool, where=bm25_where)
bm_ids = [cid for cid, _s in bm_hits]
fused = _rrf_fuse(dense_ids, bm_ids)
docs, metas, dists = _enrich_from_chroma(col, [c for c, _, _ in fused[:k]], fused)
if fused:
src0 = fused[0][2]
top1_source = ("both" if {"dense", "bm25"} <= set(src0)
else "bm25_only" if "bm25" in src0
else "dense_only")
retrieval_mode = "hybrid"
except Exception as e:
log.warning("hybrid failed, falling back to BM25→dense: %s", e)
if not docs and bm is not None:
try:
bm_hits = bm.query(query, n=k, where=bm25_where)
if bm_hits:
ids = [cid for cid, _s in bm_hits[:k]]
docs, metas, _ = _enrich_from_chroma(col, ids, None)
# FTS5 returns negative scores (lower=better). Map onto a
# similarity-ish [0..1] just for display.
dists = [max(0.0, min(1.0, 1.0 - abs(s) / 20.0)) for _id, s in bm_hits[:k]]
retrieval_mode = "bm25"
top1_source = "bm25_only"
except Exception as e:
log.warning("BM25 retrieval failed, falling back to dense: %s", e)
if not docs:
res = col.query(query_texts=[query], n_results=k, where=where)
docs = (res.get("documents") or [[]])[0]
metas = (res.get("metadatas") or [[]])[0]
dists = (res.get("distances") or [[]])[0]
reranker_fired = False
if RERANK_URL and docs:
# Pull a deeper pool to give the reranker something to chew on.
# We over-fetch up to RERANK_POOL chunks from whichever retriever
# already won, then ask the reranker to pick the final top-k.
pool_size = max(k, RERANK_POOL)
if len(docs) < pool_size:
if retrieval_mode == "bm25":
extra = bm.query(query, n=pool_size, where=bm25_where) if bm else []
extra_ids = [cid for cid, _s in extra]
else:
extra_res = col.query(query_texts=[query], n_results=pool_size, where=where)
extra_ids = (extra_res.get("ids") or [[]])[0]
if extra_ids:
d2, m2, _ = _enrich_from_chroma(col, extra_ids, None)
docs, metas = d2, m2
dists = [0.0] * len(docs)
# Reranker scores chunk_ids — collapse to (id, text) tuples
pairs = list(zip(
[f"{m.get('bundle_id','')}::{m.get('page_id','')}::{m.get('ordinal',0)}" for m in metas],
docs,
))
reranked = _rerank(query, pairs)
if reranked is not None:
# Re-sort docs/metas to match. Recompute distances as descending
# ordinal ranks so display still shows a useful score.
by_cid = {p[0]: i for i, p in enumerate(pairs)}
order = [by_cid[cid] for cid, _t in reranked if cid in by_cid]
docs = [docs[i] for i in order][:k]
metas = [metas[i] for i in order][:k]
dists = [1.0 - (rank / len(reranked)) for rank, _ in enumerate(reranked)][:len(docs)]
reranker_fired = True
else:
docs, metas, dists = docs[:k], metas[:k], dists[:k]
_call.set(hits_returned=len(docs), retrieval_mode=retrieval_mode,
top1_source=top1_source, reranker_fired=reranker_fired)
if not docs:
return f"_No matches for `{query}`._"
out = [f"# {len(docs)} result(s) for `{query}`", ""]
for doc, meta, dist in zip(docs, metas, dists):
bid = meta.get("bundle_id", "")
pid = meta.get("page_id", "")
title = meta.get("title") or pid
ver = meta.get("version") or ""
url = _source_url(bid, pid)
header = f"## {title}"
if ver:
header += f" _(v{ver})_"
out.append(header)
out.append(f"[{bid}/{pid}]({url}) · score={1 - dist:.3f}")
out.append("")
out.append(doc.strip())
out.append("")
return "\n".join(out)
@mcp.tool()
@@ -175,9 +406,21 @@ def get_page(
return f"Page not found: {bundle_id}/{page_id}"
md, meta = data
_call.set(found=True, page_chars=len(md))
# TODO: add a metadata header (title, version, source URL) above
# the body. Product-specific shape.
return md
title = meta.get("title") or page_id
ver = meta.get("version")
parent = meta.get("parent_title")
url = _source_url(bundle_id, page_id)
header = [f"# {title}"]
ctx = []
if ver:
ctx.append(f"version **{ver}**")
if parent:
ctx.append(f"in **{parent}**")
if ctx:
header.append("_" + " · ".join(ctx) + "_")
header.append(f"[source]({url})")
header.append("")
return "\n".join(header) + "\n" + md
@mcp.tool()
@@ -193,14 +436,22 @@ def list_versions() -> str:
versions = sorted({b.get("version") for b in cat.values() if b.get("version")})
platforms = sorted({b.get("platform") for b in cat.values() if b.get("platform")})
_call.set(versions=len(versions), platforms=len(platforms))
products = sorted({b.get("product") for b in cat.values() if b.get("product")})
lines = [f"# Facets across {len(cat)} bundle(s)", ""]
if versions:
lines.append("## Versions"); lines.append("")
for v in versions: lines.append(f"- `{v}`")
lines.append("")
lines += ["## Versions", ""] + [f"- `{v}`" for v in versions] + [""]
if platforms:
lines.append("## Platforms"); lines.append("")
for p in platforms: lines.append(f"- `{p}`")
lines += ["## Platforms", ""] + [f"- `{p}`" for p in platforms] + [""]
if products:
lines += ["## Product / doc types", ""] + [f"- {p}" for p in products] + [""]
lines += ["## Bundles", ""]
for slug in sorted(cat):
b = cat[slug]
kind = b.get("product") or ""
ver = b.get("version")
pages = b.get("page_count", "?")
label = f"{kind} {ver}".strip() if ver else kind
lines.append(f"- `{slug}` — {label} ({pages} pages)")
return "\n".join(lines)