"""Run all retrievers against eval/queries.jsonl, emit a markdown report. For seed-mcp, the "expected" answer for many queries isn't a single chunk — it's "a chunk satisfying these constraints." So per-query scoring is one of: expected_source_keys — at least one of these source_keys appears in top-k (used for variety-code queries with a single canonical answer) expected_metadata — all top-k must match these key=value constraints (e.g. crop=corn, year=2024) expected_substrings — at least one top-k chunk's text/metadata contains each substring (e.g. "SCN" must appear when querying SCN resistance) must_not_contain_source_keys — anti-hallucination: NO top-k chunk's source_key may contain these tokens (Pioneer fallback queries) expected_empty — top-k MUST be empty (anti-hallucination) expect_lessons_call — the agent should call api_lessons; not measurable from retrieval alone, recorded as an advisory note Metrics computed per retriever: recall_known — fraction of queries where the retriever returned a chunk satisfying the query's expectations precision_top1 — fraction of queries where the FIRST result satisfied expectations mrr — mean reciprocal rank of the FIRST satisfying chunk Plus a per-query breakdown table so you can see exactly where each retriever wins or loses. Usage: python -m eval.run_eval \\ --queries eval/queries.jsonl \\ --k 5 \\ --rerank-url http://localhost:18080 \\ --output eval/results/baseline.md """ from __future__ import annotations import argparse import json import logging import os import sys import time from pathlib import Path # Add repo root for imports sys.path.insert(0, str(Path(__file__).resolve().parent.parent)) from eval.retrievers import build_all_retrievers # noqa: E402 logging.getLogger("chromadb").setLevel(logging.ERROR) logging.getLogger("httpx").setLevel(logging.ERROR) def load_queries(path: Path) -> list[dict]: with open(path) as fh: return [json.loads(line) for line in fh if line.strip()] def _doc_satisfies(meta: dict, doc: str, query_spec: dict) -> bool: """Does this single retrieved (metadata, doc) tuple satisfy the query spec? Used by the 'first satisfying' metric.""" sk = meta.get("source_key") or "" # exact source_key match if "expected_source_keys" in query_spec: for want in query_spec["expected_source_keys"]: if want.lower() == sk.lower(): return True return False # all metadata constraints match if "expected_metadata" in query_spec: for k, v in query_spec["expected_metadata"].items(): mv = meta.get(k) if isinstance(v, int): if mv != v: return False else: if (mv or "").lower() != str(v).lower(): return False # if no substring requirement, metadata match is enough if "expected_substrings" not in query_spec: return True # at least one substring present (in doc OR metadata values) if "expected_substrings" in query_spec: haystack = (doc + " " + " ".join(str(v) for v in meta.values())).lower() return any(s.lower() in haystack for s in query_spec["expected_substrings"]) return False def _evaluate_one(retriever, query_spec: dict, k: int, col) -> dict: """Return per-query metrics for one retriever.""" query = query_spec["query"] filters = dict(query_spec.get("filters") or {}) # search_trials queries imply data_type=trial; search_docs implies variety tool = query_spec.get("tool", "search_docs") if tool == "search_trials": filters.setdefault("data_type", "trial") elif tool == "search_docs": filters.setdefault("data_type", "variety") # 'product' is a server-side post-filter, not Chroma; strip product = filters.pop("product", None) t0 = time.monotonic() ids = retriever.retrieve(query, k, filters) elapsed_ms = (time.monotonic() - t0) * 1000 # Anti-hallucination queries: expected_empty should return nothing # (BUT we still allow the retriever to surface chunks if the # product filter would filter them out at the server level — so # we re-apply the product filter here). if product: try: extra = col.get(ids=ids, include=["documents"]) id_to_doc = dict(zip(extra.get("ids") or [], extra.get("documents") or [])) except Exception: id_to_doc = {} ids = [cid for cid in ids if product.lower() in id_to_doc.get(cid, "").lower()] if query_spec.get("expected_empty"): passed = len(ids) == 0 return { "query": query, "retriever": retriever.name, "k": k, "n_hits": len(ids), "rank_first_match": None, "passed": passed, "elapsed_ms": round(elapsed_ms, 1), "kind": "expected_empty", } if "must_not_contain_source_keys" in query_spec: bad_tokens = [t.lower() for t in query_spec["must_not_contain_source_keys"]] try: extra = col.get(ids=ids, include=["metadatas"]) metas = extra.get("metadatas") or [] except Exception: metas = [] # PASS = no top-k chunk's source_key contains a forbidden token for m in metas: sk = (m.get("source_key") or "").lower() if any(t in sk for t in bad_tokens): return { "query": query, "retriever": retriever.name, "k": k, "n_hits": len(ids), "rank_first_match": None, "passed": False, "elapsed_ms": round(elapsed_ms, 1), "kind": "must_not_contain", } return { "query": query, "retriever": retriever.name, "k": k, "n_hits": len(ids), "rank_first_match": None, "passed": True, "elapsed_ms": round(elapsed_ms, 1), "kind": "must_not_contain", } # Positive-match query: pull docs+meta and check each try: extra = col.get(ids=ids, include=["documents", "metadatas"]) docs = extra.get("documents") or [] metas = extra.get("metadatas") or [] ext_ids = extra.get("ids") or [] order_idx = {cid: i for i, cid in enumerate(ext_ids)} except Exception: docs = [] metas = [] order_idx = {} rank_first = None for rank, cid in enumerate(ids, start=1): i = order_idx.get(cid) if i is None: continue if _doc_satisfies(metas[i], docs[i], query_spec): rank_first = rank break return { "query": query, "retriever": retriever.name, "k": k, "n_hits": len(ids), "rank_first_match": rank_first, "passed": rank_first is not None, "elapsed_ms": round(elapsed_ms, 1), "kind": "positive", } def _aggregate(results: list[dict]) -> dict: """Aggregate per-query results into MRR / recall / precision@1.""" by_retriever: dict[str, list[dict]] = {} for r in results: by_retriever.setdefault(r["retriever"], []).append(r) out: dict[str, dict] = {} for name, rows in by_retriever.items(): n = len(rows) passed = sum(1 for r in rows if r["passed"]) ranks = [r["rank_first_match"] for r in rows if r["passed"] and r.get("rank_first_match")] mrr = sum(1.0 / r for r in ranks) / n if n else 0.0 precision1 = sum(1 for r in rows if r["passed"] and r.get("rank_first_match") == 1) / n if n else 0.0 avg_ms = sum(r["elapsed_ms"] for r in rows) / n if n else 0.0 out[name] = { "n_queries": n, "passed": passed, "recall_known": passed / n if n else 0.0, "precision_top1": precision1, "mrr": mrr, "avg_latency_ms": round(avg_ms, 1), } return out def _emit_markdown(queries: list[dict], results: list[dict], summary: dict, k: int) -> str: lines: list[str] = [] lines.append(f"# seed-mcp retrieval eval — k={k}") lines.append("") lines.append(f"_{len(queries)} golden queries × {len(summary)} retrievers_") lines.append("") lines.append("## Summary") lines.append("") lines.append("| Retriever | Passed | Recall | P@1 | MRR | Avg ms |") lines.append("|---|---|---|---|---|---|") for name in sorted(summary, key=lambda n: -summary[n]["mrr"]): s = summary[name] lines.append( f"| **{name}** | {s['passed']}/{s['n_queries']} " f"| {s['recall_known']:.2%} | {s['precision_top1']:.2%} " f"| {s['mrr']:.3f} | {s['avg_latency_ms']:.0f} |" ) lines.append("") lines.append("**Recall** = % of queries where ≥1 top-k chunk satisfied the spec. " "**P@1** = % where the very first result satisfied it. " "**MRR** = mean of `1 / rank-of-first-satisfying-result` (0 if missed).") lines.append("") # Per-query breakdown lines.append("## Per-query results") lines.append("") by_query: dict[str, list[dict]] = {} for r in results: by_query.setdefault(r["query"], []).append(r) retriever_names = sorted({r["retriever"] for r in results}) header = "| Query | " + " | ".join(retriever_names) + " |" sep = "|" + "---|" * (len(retriever_names) + 1) lines.append(header) lines.append(sep) for q in queries: cells = [f"`{q['query'][:60]}`"] for name in retriever_names: r = next((x for x in by_query.get(q["query"], []) if x["retriever"] == name), None) if r is None: cells.append("?") elif r["passed"]: rk = r.get("rank_first_match") cells.append(f"✅ #{rk}" if rk else "✅") else: cells.append("❌") lines.append("| " + " | ".join(cells) + " |") lines.append("") return "\n".join(lines) + "\n" def main() -> int: p = argparse.ArgumentParser() p.add_argument("--queries", type=Path, default=Path("eval/queries.jsonl")) p.add_argument("--k", type=int, default=5) p.add_argument("--output", type=Path, default=Path("eval/results/baseline.md")) p.add_argument("--rerank-url", default=os.environ.get("RERANK_URL", "")) p.add_argument("--product-name", default=os.environ.get("PRODUCT_NAME", "crop_seed")) args = p.parse_args() if not args.queries.exists(): print(f"queries file not found: {args.queries}") return 1 queries = load_queries(args.queries) print(f"loaded {len(queries)} queries") # Connect to Chroma + BM25 import chromadb from chromadb.config import Settings from rag.embeddings import embedding_function from rag.bm25 import BM25Index repo_root = Path(__file__).resolve().parent.parent client = chromadb.PersistentClient( path=str(repo_root / "chroma"), settings=Settings(anonymized_telemetry=False), ) col = client.get_collection(f"{args.product_name}_docs", embedding_function=embedding_function()) bm25 = BM25Index(repo_root / "bm25" / f"{args.product_name}_docs.db") print(f"chroma: {col.count()} chunks; bm25: {bm25.count()} chunks") retrievers = build_all_retrievers(col, bm25, args.rerank_url or None) print(f"retrievers: {[r.name for r in retrievers]}") all_results: list[dict] = [] for r in retrievers: print(f"running {r.name}...") for q in queries: res = _evaluate_one(r, q, args.k, col) all_results.append(res) summary = _aggregate(all_results) md = _emit_markdown(queries, all_results, summary, args.k) args.output.parent.mkdir(parents=True, exist_ok=True) args.output.write_text(md, encoding="utf-8") print(f"\nreport: {args.output}") print() # Print summary to stdout too for line in md.split("\n"): if line.startswith("|"): print(line) if line.startswith("## Per-query"): break return 0 if __name__ == "__main__": raise SystemExit(main())