seed-mcp scaffold: clone docs-mcp-template, customize for crop_seed PRODUCT_NAME

Sibling project to crop-chem-docs, same MCP-template lineage. Corpus is
seed/hybrid varieties across 6 vendors instead of pesticide labels.

What's customized vs. the template:
- CLAUDE.md: vendor matrix, build priority, Pioneer fallback policy,
  canonical sidecar schema (per-crop), Golden Harvest disease-scale
  reversal gotcha, no-IPv6 / HTTPS-clone note
- README.md: vendor coverage table, tool list, phase status
- Dockerfile: PRODUCT_NAME=crop_seed default, sources.json (not
  bundles.json), HYBRID_SEARCH=true, OLLAMA_URL + RERANK_URL Docker
  DNS defaults (same llama-rerank sidecar as crop-chem-docs)
- .gitea/workflows/refresh.yml: monthly cron (seed catalogs move
  slowly), 5 GREEN scraper steps, corpus-YYYY.MM.DD tag for Drawbar
  pinning, continue-on-error on GC step
- .gitea/workflows/image-only.yml: paths filter + cancel-in-progress
  concurrency group
- scripts/registry_gc.py: lifted from crop-chem-docs (correct Gitea
  packages API URL + UA header to bypass CF block on default
  Python-urllib UA)
- sources.json: catalog of 6 sources + scope_filter + per-source
  schema notes + Pioneer-exclusion rationale
- scrape/runner.py: dispatcher with --all = GREEN-only
- scrape/sources/{bayer_seeds,golden_harvest,nk,agripro,becks_pfr,
  becks_products}.py: stub modules with implementation notes
- docs_mcp/server.py: PRODUCT_NAME default → crop_seed,
  PRODUCT_DOCS_URL → repo URL

Pioneer is intentionally NOT a source. ToS bans automation; dealer
locator is login-gated. The MCP returns a curated fallback lesson
directing the user to pioneer.com.

Next phases:
- Phase 1: implement bayer_seeds (lift-and-shift from crop-chem-docs
  Bayer scraper; same __NEXT_DATA__ infra)
- Phase 7: curate eval/queries.jsonl
- Phase 11: lessons.md with Pioneer fallback + disease-scale notes

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-25 12:28:49 -04:00
commit ac40e05734
35 changed files with 3833 additions and 0 deletions
scripts/registry_gc.py (+167)
@@ -0,0 +1,167 @@
"""Gitea container-registry garbage collection.

Prunes old container tags from a Gitea registry package. Always
preserves:

  - The ``latest`` tag (Watchtower auto-pull target)
  - Any ``corpus-*`` tag (production pins; Drawbar may have them locked)
  - The ``--keep-latest`` most-recent OTHER tags (typically commit-sha pins)
  - Anything pushed within ``--keep-days`` days

The actual disk reclaim happens on Gitea's next package GC cron
(admin site settings). This script marks versions for deletion.

Why this script doesn't use the Docker Registry v2 API: that API has
tag listing + manifest delete by digest, but no per-tag created-at
timestamp without an extra blob-fetch round-trip. Gitea's packages
API gives us {tag, created_at} in one call, which is what the keep
policy needs.

The endpoint shape that actually works (matches Gitea 1.21+):

  GET /api/v1/packages/{owner}?type=container&q={name}
    → JSON array, ONE entry per tag, each with id + version=tag + created_at
  DELETE /api/v1/packages/{owner}/container/{name}/{tag}
    → 204 on success, 404 if already gone

Auth: GITEA_TOKEN env var (PAT with delete:packages scope). The
push-only PAT we use as REGISTRY_TOKEN may not be enough: if you
see 403s, mint a separate PAT and pass it as GITEA_TOKEN here.

Usage:

    python scripts/registry_gc.py \\
        --owner justin \\
        --package crop-chem-docs \\
        --keep-days 180 \\
        --keep-latest 6
        [--dry-run]
"""
from __future__ import annotations

import argparse
import json
import os
import sys
from datetime import datetime, timedelta, timezone
from urllib.error import HTTPError
from urllib.request import Request, urlopen

GITEA_HOST = os.environ.get("GITEA_HOST", "https://git.jpaul.io")


def api(token: str, method: str, path: str) -> object:
    # User-Agent matters: Cloudflare in front of git.jpaul.io returns
    # 403 to the default `Python-urllib/3.x` UA. Any non-Python UA
    # passes. Curl works, requests works, we just need to not look
    # like a vanilla urllib script.
    req = Request(
        f"{GITEA_HOST}{path}",
        headers={
            "Authorization": f"token {token}",
            "User-Agent": "crop-chem-docs-registry-gc/0.1",
        },
        method=method,
    )
    try:
        with urlopen(req, timeout=30) as r:
            body = r.read()
            # DELETE answers 204 with an empty body; map that to None.
            return json.loads(body) if body else None
    except HTTPError as e:
        # 404 means "already gone" (or no such package); treat as empty.
        if e.code == 404:
            return None
        raise


def _parse_created(version: dict) -> datetime:
    """Gitea returns RFC3339 with offset like '2026-05-24T16:07:50-04:00'.

    datetime.fromisoformat parses numeric offsets like this since
    Python 3.7; only a trailing 'Z' suffix would need Python 3.11+."""
    return datetime.fromisoformat(version["created_at"])


def main() -> int:
    p = argparse.ArgumentParser()
    p.add_argument("--owner", required=True)
    p.add_argument("--package", required=True)
    p.add_argument("--keep-days", type=int, default=180)
    p.add_argument("--keep-latest", type=int, default=6,
                   help="Keep this many most-recent commit-sha (etc.) "
                        "tags BEFORE applying --keep-days. corpus-* and "
                        ":latest are kept regardless.")
    p.add_argument("--dry-run", action="store_true",
                   help="Show what would be deleted without calling DELETE.")
    args = p.parse_args()

    token = os.environ.get("GITEA_TOKEN")
    if not token:
        print("GITEA_TOKEN env var not set", file=sys.stderr)
        return 1

    # Gitea's q= is a substring match; filter to exact name so we don't
    # accidentally GC a sibling package that shares the prefix.
    versions = api(
        token, "GET",
        f"/api/v1/packages/{args.owner}?type=container&q={args.package}",
    ) or []
    versions = [v for v in versions if v.get("name") == args.package]
    if not versions:
        print(f"no versions found for {args.owner}/{args.package}; nothing to GC")
        return 0

    cutoff = datetime.now(timezone.utc) - timedelta(days=args.keep_days)
    versions.sort(key=_parse_created, reverse=True)  # newest first

    keep: list[tuple[str, str]] = []  # (tag, reason)
    delete: list[dict] = []
    other_kept = 0

    # Rules are checked in priority order; the first match keeps the tag.
    for v in versions:
        tag = v.get("version", "")
        created = _parse_created(v)
        if tag == "latest":
            keep.append((tag, "always-keep (:latest)"))
            continue
        if tag.startswith("corpus-"):
            keep.append((tag, "production pin (corpus-*)"))
            continue
        if other_kept < args.keep_latest:
            other_kept += 1
            keep.append((tag, f"keep-latest #{other_kept}/{args.keep_latest}"))
            continue
        if created >= cutoff:
            keep.append((tag, f"within --keep-days ({args.keep_days})"))
            continue
        delete.append(v)

    print(f"=== {args.owner}/{args.package}: {len(versions)} total tag(s) ===")
    for tag, reason in keep:
        print(f" KEEP {tag:<28} {reason}")
    for v in delete:
        print(f" DEL {v['version']:<28} created={v['created_at']}")

    if not delete:
        print("nothing to delete")
        return 0
    if args.dry_run:
        print(f"--dry-run; would delete {len(delete)} tag(s)")
        return 0

    failed = 0
    for v in delete:
        tag = v["version"]
        try:
            api(token, "DELETE",
                f"/api/v1/packages/{args.owner}/container/{args.package}/{tag}")
            print(f" ✓ deleted {tag}")
        except HTTPError as e:
            print(f" ✗ failed {tag}: HTTP {e.code} {e.reason}", file=sys.stderr)
            failed += 1
    print(f"done: deleted {len(delete) - failed} / {len(delete)} tag(s)")
    return 0 if failed == 0 else 1


if __name__ == "__main__":
    sys.exit(main())
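
The keep policy in main() above can be tried without touching a registry. What follows is a minimal sketch, not part of the commit: `classify`, `NOW`, and `SAMPLE` are hypothetical names, and the sample tags and timestamps are invented for illustration. It applies the same rule order as the loop in main(): `latest`, then `corpus-*`, then the `--keep-latest` newest other tags, then anything newer than the `--keep-days` cutoff.

```python
from datetime import datetime, timedelta, timezone


def classify(versions, keep_days, keep_latest, now):
    """Split package versions into (keep, delete) lists of tag names."""
    cutoff = now - timedelta(days=keep_days)
    ordered = sorted(
        versions,
        key=lambda v: datetime.fromisoformat(v["created_at"]),
        reverse=True,  # newest first, as in main()
    )
    keep, delete, other_kept = [], [], 0
    for v in ordered:
        tag = v["version"]
        created = datetime.fromisoformat(v["created_at"])
        if tag == "latest" or tag.startswith("corpus-"):
            keep.append(tag)          # always preserved
        elif other_kept < keep_latest:
            other_kept += 1
            keep.append(tag)          # one of the N newest other tags
        elif created >= cutoff:
            keep.append(tag)          # still inside the keep-days window
        else:
            delete.append(tag)
    return keep, delete


# Hypothetical sample data, not real registry contents.
NOW = datetime(2026, 5, 25, tzinfo=timezone.utc)
SAMPLE = [
    {"version": "latest", "created_at": "2026-05-24T16:07:50-04:00"},
    {"version": "corpus-2025.01.01", "created_at": "2025-01-01T00:00:00+00:00"},
    {"version": "sha-new", "created_at": "2026-05-20T00:00:00+00:00"},
    {"version": "sha-recent", "created_at": "2026-03-01T00:00:00+00:00"},
    {"version": "sha-old", "created_at": "2025-06-01T00:00:00+00:00"},
]

keep, delete = classify(SAMPLE, keep_days=180, keep_latest=1, now=NOW)
print("keep:  ", keep)
print("delete:", delete)
```

Passing `now` in explicitly, instead of calling `datetime.now()` inside the function, is a choice made for this sketch so the result does not depend on the day it is run.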
scripts/usage_report.py (+251)
@@ -0,0 +1,251 @@
"""Summarize usage logs from docs_mcp.usage into a quick scan.

Reads one or more usage.jsonl* files and prints sections for:

  - per-tool call counts
  - top search_docs queries by frequency
  - 0-hit queries (where we returned nothing; high-signal for tuning)
  - filter usage histogram (which version / platform / bundle filters get hit)
  - reranker effectiveness (calls where the reranker fired vs not)
  - hybrid retrieval top-1 attribution (dense vs bm25 vs both)

Usage:

    # Default: read /app/var/logs in the production container
    python scripts/usage_report.py

    # Explicit log directory:
    python scripts/usage_report.py --logs-dir /path/to/usage/logs

    # Last N days only:
    python scripts/usage_report.py --logs-dir <dir> --since 7d

    # Markdown output (for piping into a weekly digest email, etc):
    python scripts/usage_report.py --logs-dir <dir> --format markdown

The script doesn't depend on anything in the docs_mcp package. It's a
standalone tool that can run anywhere with the log files available
(scp them off the host, point it at the directory).

----------------------------------------------------------------------
FOLLOW-UP CHECKS
----------------------------------------------------------------------

Pattern: when you ship a retrieval change with a hypothesis attached
(e.g. "hybrid will rescue queries dense misses"), add a note HERE
describing what the usage report should show and at what threshold
the change earns its keep. Future-you running the report a month
later will be glad. Example:

    Q: Does the dense leg of hybrid retrieval earn its keep on
       real traffic, or could we simplify to BM25-only?

       - bm25_only >= 80%%       --> dense not doing much; consider
                                    simplifying to BM25 mode
       - both >= 50%%            --> hybrid is tie-breaking; keep it
       - dense_only > bm25_only  --> dense is the workhorse; keep

Also worth a glance every month:

    - 0-hit queries list (tuning candidates)
    - reranker p95 latency drift (slow reranker = bad UX)
    - filter usage (does anyone actually use version/platform
      filters? if not, simplify the tool surface)
"""
from __future__ import annotations

import argparse
import json
import re
import sys
from collections import Counter, defaultdict
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Any, Iterable


def parse_since(s: str | None) -> datetime | None:
    """Accept '7d', '24h', '30m', or an ISO timestamp. None → no cutoff."""
    if not s:
        return None
    m = re.fullmatch(r"(\d+)([dhm])", s)
    if m:
        n, unit = int(m.group(1)), m.group(2)
        delta = {"d": timedelta(days=n), "h": timedelta(hours=n), "m": timedelta(minutes=n)}[unit]
        return datetime.now(timezone.utc) - delta
    dt = datetime.fromisoformat(s.replace("Z", "+00:00"))
    # An ISO timestamp without an offset parses as naive; assume UTC so it
    # can be compared against timezone-aware record timestamps.
    return dt if dt.tzinfo else dt.replace(tzinfo=timezone.utc)


def load_events(logs_dir: Path, since: datetime | None) -> Iterable[dict[str, Any]]:
    """Yield JSONL records from every usage.jsonl* file in logs_dir.

    When `since` is given, records older than it (or with a missing or
    unparseable `ts`) are skipped."""
    if not logs_dir.exists():
        print(f"warning: logs dir {logs_dir} does not exist", file=sys.stderr)
        return
    # usage.jsonl is the active file; usage.jsonl.YYYY-MM-DD are rotated.
    files = sorted(logs_dir.glob("usage.jsonl*"))
    for f in files:
        with open(f, encoding="utf-8") as fh:
            for ln, line in enumerate(fh, start=1):
                line = line.strip()
                if not line:
                    continue
                try:
                    rec = json.loads(line)
                except json.JSONDecodeError as e:
                    print(f" ! skipping {f}:{ln}: {e}", file=sys.stderr)
                    continue
                if since:
                    ts = rec.get("ts", "")
                    try:
                        rec_ts = datetime.fromisoformat(ts.replace("Z", "+00:00"))
                    except (AttributeError, ValueError):
                        # ts missing, not a string, or not ISO-formatted
                        continue
                    if rec_ts.tzinfo is None:
                        # Assume UTC for naive timestamps so the comparison
                        # with an aware `since` cannot raise TypeError.
                        rec_ts = rec_ts.replace(tzinfo=timezone.utc)
                    if rec_ts < since:
                        continue
                yield rec


def main() -> int:
    p = argparse.ArgumentParser(description=__doc__)
    p.add_argument("--logs-dir", type=Path, default=Path("/app/var/logs"),
                   help="directory with usage.jsonl* files")
    p.add_argument("--since", default=None,
                   help="time window: '7d', '24h', '30m', or ISO timestamp")
    p.add_argument("--top", type=int, default=25,
                   help="how many top queries / filters to show")
    p.add_argument("--format", choices=("text", "markdown"), default="text")
    args = p.parse_args()

    since = parse_since(args.since)
    events = list(load_events(args.logs_dir, since))
    if not events:
        print("(no events in window)")
        return 0

    print(f"# Usage report: {len(events)} events"
          + (f" since {since.isoformat()}" if since else "")
          + f" from {args.logs_dir}")
    print()

    # 1. Per-tool counts
    by_tool = Counter(e["tool"] for e in events)
    print("## Per-tool call counts")
    print()
    if args.format == "markdown":
        print("| tool | calls |")
        print("|---|---|")
        for tool, n in by_tool.most_common():
            print(f"| `{tool}` | {n} |")
    else:
        for tool, n in by_tool.most_common():
            print(f" {tool:<25s} {n:>6d}")
    print()

    # 2. Top search_docs queries
    search_events = [e for e in events if e["tool"] == "search_docs"]
    queries = Counter(e["args"].get("query", "") for e in search_events)
    print(f"## Top {args.top} search_docs queries (of {len(search_events)} searches)")
    print()
    if args.format == "markdown":
        print("| count | query |")
        print("|---|---|")
        for q, n in queries.most_common(args.top):
            print(f"| {n} | `{q}` |")
    else:
        for q, n in queries.most_common(args.top):
            print(f" {n:>5d} {q!r}")
    print()

    # 3. 0-hit queries: the highest-signal data for tuning
    zero_hit = [e for e in search_events if e.get("hits_returned") == 0]
    zero_q = Counter(e["args"].get("query", "") for e in zero_hit)
    print(f"## 0-hit queries ({len(zero_hit)} of {len(search_events)} searches returned nothing)")
    print()
    if zero_q:
        if args.format == "markdown":
            print("| count | query | filters |")
            print("|---|---|---|")
            # Group by query, show filter examples for each
            examples_by_query: dict[str, list[dict]] = defaultdict(list)
            for e in zero_hit:
                examples_by_query[e["args"].get("query", "")].append(e["args"])
            for q, n in zero_q.most_common(args.top):
                ex = examples_by_query[q][0]
                f = {k: v for k, v in ex.items()
                     if k in ("version", "platform", "bundle_id") and v}
                print(f"| {n} | `{q}` | `{f}` |")
        else:
            for q, n in zero_q.most_common(args.top):
                print(f" {n:>5d} {q!r}")
    else:
        print(" _(no 0-hit queries in window)_")
    print()

    # 4. Filter usage
    filter_use = Counter()
    for e in search_events:
        a = e["args"]
        v = a.get("version")
        p_ = a.get("platform")
        b = a.get("bundle_id")
        if v:
            filter_use[f"version={v}"] += 1
        if p_:
            filter_use[f"platform={p_}"] += 1
        if b:
            filter_use[f"bundle_id={b}"] += 1
        if not (v or p_ or b):
            filter_use["(no filter)"] += 1
    print("## search_docs filter usage")
    print()
    if args.format == "markdown":
        print("| filter | count |")
        print("|---|---|")
        for f, n in filter_use.most_common(args.top):
            print(f"| `{f}` | {n} |")
    else:
        for f, n in filter_use.most_common(args.top):
            print(f" {n:>5d} {f}")
    print()

    # 5. Reranker effectiveness
    reranked = [e for e in search_events if e.get("reranked") is True]
    dense_only = [e for e in search_events if e.get("reranked") is False]
    print("## Reranker activity")
    print()
    print(f" reranked: {len(reranked):>5d}")
    print(f" dense only: {len(dense_only):>5d} (filter too narrow or 0 results)")
    if reranked:
        elapsed = [e["elapsed_ms"] for e in reranked if e.get("elapsed_ms") is not None]
        if elapsed:
            # Simple index-based percentiles over the sorted latencies.
            elapsed.sort()
            p50 = elapsed[len(elapsed) // 2]
            p95 = elapsed[int(len(elapsed) * 0.95)]
            print(f" reranked latency p50: {p50:.0f} ms, p95: {p95:.0f} ms")
    print()

    # 6. Hybrid retrieval activity: which retriever contributed the top-1?
    # Empty unless HYBRID_SEARCH=true is set on the MCP container.
    hybrid_events = [e for e in search_events if e.get("retrieval_mode") == "hybrid"]
    if hybrid_events:
        by_source = Counter(e.get("top1_source") for e in hybrid_events
                            if e.get("top1_source"))
        print("## Hybrid retrieval: top-1 attribution")
        print()
        print(f" hybrid mode events: {len(hybrid_events)}")
        total = sum(by_source.values()) or 1
        for src in ("both", "dense_only", "bm25_only"):
            n = by_source.get(src, 0)
            pct = 100.0 * n / total
            label = {
                "both": "in BOTH retrievers' top-N",
                "dense_only": "dense found it, BM25 didn't",
                "bm25_only": "BM25 found it, dense didn't",
            }[src]
            print(f" {src:<11s} {n:>5d} ({pct:5.1f}%) - {label}")
        rescued = by_source.get("bm25_only", 0)
        if rescued and total:
            print(f"\n{rescued} ({100.0 * rescued / total:.1f}%) of hybrid queries had a top-1 "
                  "result that ONLY BM25 surfaced. Without hybrid those would have been dense-misses.")
    return 0


if __name__ == "__main__":
    sys.exit(main())
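
The FOLLOW-UP CHECKS thresholds in the module docstring could also be turned into a mechanical check on the top-1 attribution counts. The following is only a sketch under stated assumptions, not part of the commit: `hybrid_verdict` is a hypothetical helper, the order in which the three rules are tested is a guess (the docstring does not rank them), the "no data" and "inconclusive" outcomes are additions, and the sample events are invented. They use only the `retrieval_mode` and `top1_source` fields that the report itself reads.

```python
from collections import Counter


def hybrid_verdict(by_source):
    """Map top-1 attribution counts to one of the docstring's conclusions."""
    total = sum(by_source.values())
    if total == 0:
        return "no data"  # assumption: nothing to judge yet
    bm25 = by_source.get("bm25_only", 0)
    dense = by_source.get("dense_only", 0)
    both = by_source.get("both", 0)
    # Rule order below is an assumption; the docstring lists the rules
    # without saying which one wins when several apply.
    if bm25 / total >= 0.80:
        return "dense not doing much; consider simplifying to BM25 mode"
    if both / total >= 0.50:
        return "hybrid is tie-breaking; keep it"
    if dense > bm25:
        return "dense is the workhorse; keep"
    return "inconclusive"  # assumption: no rule matched


# Hypothetical usage records, invented for illustration.
events = [
    {"tool": "search_docs", "retrieval_mode": "hybrid", "top1_source": "both"},
    {"tool": "search_docs", "retrieval_mode": "hybrid", "top1_source": "both"},
    {"tool": "search_docs", "retrieval_mode": "hybrid", "top1_source": "both"},
    {"tool": "search_docs", "retrieval_mode": "hybrid", "top1_source": "bm25_only"},
    {"tool": "search_docs", "retrieval_mode": "hybrid", "top1_source": "dense_only"},
    {"tool": "search_docs", "retrieval_mode": "dense"},  # ignored: not hybrid
]

by_source = Counter(
    e["top1_source"]
    for e in events
    if e.get("retrieval_mode") == "hybrid" and e.get("top1_source")
)
verdict = hybrid_verdict(by_source)
print(dict(by_source), "->", verdict)
```

With real traffic the counts would come from the same `by_source` counter that section 6 of the report builds, so a helper like this could sit next to the printed table rather than replace it.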