"""Summarize usage logs from docs_mcp.usage into a quick scan.
Reads one or more usage.jsonl* files and prints sections for:
- per-tool call counts
- top search_docs queries by frequency
- 0-hit queries (where we returned nothing — high-signal for tuning)
- filter usage histogram (which version / platform / bundle filters get hit)
- reranker effectiveness (calls where the reranker fired vs not)
- hybrid retrieval top-1 attribution (dense vs bm25 vs both)
Usage:
# Default: read /app/var/logs in the production container
python scripts/usage_report.py --logs-dir /path/to/usage/logs
# Last N days only:
python scripts/usage_report.py --logs-dir
--since 7d
# Markdown output (for piping into a weekly digest email, etc):
python scripts/usage_report.py --logs-dir --format markdown
The script doesn't depend on anything in the docs_mcp package — it's a
standalone tool that can run anywhere with the log files available
(scp them off the host, point it at the directory).
----------------------------------------------------------------------
FOLLOW-UP CHECKS
----------------------------------------------------------------------
Pattern: when you ship a retrieval change with a hypothesis attached
(e.g. "hybrid will rescue queries dense misses"), add a note HERE
describing what the usage report should show and at what threshold
the change earns its keep. Future-you running the report a month
later will be glad. Example:
Q: Does the dense leg of hybrid retrieval earn its keep on
real traffic, or could we simplify to BM25-only?
- bm25_only >= 80%% --> dense not doing much; consider
simplifying to BM25 mode
- both >= 50%% --> hybrid is tie-breaking; keep it
- dense_only > bm25_only --> dense is the workhorse; keep
Also worth a glance every month:
- 0-hit queries list (tuning candidates)
- reranker p95 latency drift (slow reranker = bad UX)
- filter usage (does anyone actually use version/platform
filters? if not, simplify the tool surface)
"""
from __future__ import annotations
import argparse
import json
import re
import sys
from collections import Counter, defaultdict
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Any, Iterable
def parse_since(s: str | None) -> datetime | None:
"""Accept '7d', '24h', '30m', or an ISO timestamp. None → no cutoff."""
if not s:
return None
m = re.fullmatch(r"(\d+)([dhm])", s)
if m:
n, unit = int(m.group(1)), m.group(2)
delta = {"d": timedelta(days=n), "h": timedelta(hours=n), "m": timedelta(minutes=n)}[unit]
return datetime.now(timezone.utc) - delta
return datetime.fromisoformat(s.replace("Z", "+00:00"))
def load_events(logs_dir: Path, since: datetime | None) -> Iterable[dict[str, Any]]:
"""Yield every JSONL record across all files in logs_dir."""
if not logs_dir.exists():
print(f"warning: logs dir {logs_dir} does not exist", file=sys.stderr)
return
# usage.jsonl is the active file; usage.jsonl.YYYY-MM-DD are rotated.
files = sorted(logs_dir.glob("usage.jsonl*"))
for f in files:
with open(f) as fh:
for ln, line in enumerate(fh, start=1):
line = line.strip()
if not line:
continue
try:
rec = json.loads(line)
except json.JSONDecodeError as e:
print(f" ! skipping {f}:{ln}: {e}", file=sys.stderr)
continue
if since:
ts = rec.get("ts", "")
try:
rec_ts = datetime.fromisoformat(ts.replace("Z", "+00:00"))
except ValueError:
continue
if rec_ts < since:
continue
yield rec
def main() -> int:
p = argparse.ArgumentParser(description=__doc__)
p.add_argument("--logs-dir", type=Path, default=Path("/app/var/logs"),
help="directory with usage.jsonl* files")
p.add_argument("--since", default=None,
help="time window: '7d', '24h', '30m', or ISO timestamp")
p.add_argument("--top", type=int, default=25,
help="how many top queries / filters to show")
p.add_argument("--format", choices=("text", "markdown"), default="text")
args = p.parse_args()
since = parse_since(args.since)
events = list(load_events(args.logs_dir, since))
if not events:
print("(no events in window)")
return 0
print(f"# Usage report — {len(events)} events"
+ (f" since {since.isoformat()}" if since else "")
+ f" from {args.logs_dir}")
print()
# 1. Per-tool counts
by_tool = Counter(e["tool"] for e in events)
print("## Per-tool call counts")
print()
if args.format == "markdown":
print("| tool | calls |")
print("|---|---|")
for tool, n in by_tool.most_common():
print(f"| `{tool}` | {n} |")
else:
for tool, n in by_tool.most_common():
print(f" {tool:<25s} {n:>6d}")
print()
# 2. Top search_docs queries
search_events = [e for e in events if e["tool"] == "search_docs"]
queries = Counter(e["args"].get("query", "") for e in search_events)
print(f"## Top {args.top} search_docs queries (of {len(search_events)} searches)")
print()
if args.format == "markdown":
print("| count | query |")
print("|---|---|")
for q, n in queries.most_common(args.top):
print(f"| {n} | `{q}` |")
else:
for q, n in queries.most_common(args.top):
print(f" {n:>5d} {q!r}")
print()
# 3. 0-hit queries — the highest-signal data for tuning
zero_hit = [e for e in search_events if e.get("hits_returned") == 0]
zero_q = Counter(e["args"].get("query", "") for e in zero_hit)
print(f"## 0-hit queries ({len(zero_hit)} of {len(search_events)} searches returned nothing)")
print()
if zero_q:
if args.format == "markdown":
print("| count | query | filters |")
print("|---|---|---|")
# Group by query, show filter examples for each
examples_by_query: dict[str, list[dict]] = defaultdict(list)
for e in zero_hit:
examples_by_query[e["args"].get("query", "")].append(e["args"])
for q, n in zero_q.most_common(args.top):
ex = examples_by_query[q][0]
f = {k: v for k, v in ex.items()
if k in ("version", "platform", "bundle_id") and v}
print(f"| {n} | `{q}` | `{f}` |")
else:
for q, n in zero_q.most_common(args.top):
print(f" {n:>5d} {q!r}")
else:
print(" _(no 0-hit queries in window)_")
print()
# 4. Filter usage
filter_use = Counter()
for e in search_events:
a = e["args"]
v = a.get("version")
p_ = a.get("platform")
b = a.get("bundle_id")
if v:
filter_use[f"version={v}"] += 1
if p_:
filter_use[f"platform={p_}"] += 1
if b:
filter_use[f"bundle_id={b}"] += 1
if not (v or p_ or b):
filter_use["(no filter)"] += 1
print(f"## search_docs filter usage")
print()
if args.format == "markdown":
print("| filter | count |")
print("|---|---|")
for f, n in filter_use.most_common(args.top):
print(f"| `{f}` | {n} |")
else:
for f, n in filter_use.most_common(args.top):
print(f" {n:>5d} {f}")
print()
# 5. Reranker effectiveness
reranked = [e for e in search_events if e.get("reranked") is True]
dense_only = [e for e in search_events if e.get("reranked") is False]
print(f"## Reranker activity")
print()
print(f" reranked: {len(reranked):>5d}")
print(f" dense only: {len(dense_only):>5d} (filter too narrow or 0 results)")
if reranked:
elapsed = [e["elapsed_ms"] for e in reranked if e.get("elapsed_ms") is not None]
if elapsed:
elapsed.sort()
p50 = elapsed[len(elapsed) // 2]
p95 = elapsed[int(len(elapsed) * 0.95)]
print(f" reranked latency p50: {p50:.0f} ms, p95: {p95:.0f} ms")
print()
# 6. Hybrid retrieval activity — which retriever contributed the top-1?
# Empty unless HYBRID_SEARCH=true is set on the MCP container.
hybrid_events = [e for e in search_events if e.get("retrieval_mode") == "hybrid"]
if hybrid_events:
by_source = Counter(e.get("top1_source") for e in hybrid_events
if e.get("top1_source"))
print("## Hybrid retrieval — top-1 attribution")
print()
print(f" hybrid mode events: {len(hybrid_events)}")
total = sum(by_source.values()) or 1
for src in ("both", "dense_only", "bm25_only"):
n = by_source.get(src, 0)
pct = 100.0 * n / total
label = {
"both": "in BOTH retrievers' top-N",
"dense_only": "dense found it, BM25 didn't",
"bm25_only": "BM25 found it, dense didn't",
}[src]
print(f" {src:<11s} {n:>5d} ({pct:5.1f}%) — {label}")
rescued = by_source.get("bm25_only", 0)
if rescued and total:
print(f"\n → {rescued} ({100.0 * rescued / total:.1f}%) of hybrid queries had the top-1 "
"result that ONLY BM25 surfaced. Without hybrid those would have been dense-misses.")
return 0
if __name__ == "__main__":
sys.exit(main())