Initial commit: ag-bids MCP server

Exposes live + historical ag-bids commodity data (from the ag-monitor
service at agbids.paul.farm) as MCP tools, sitting behind MetaMCP at
https://mcp.jpaul.io/metamcp/ag-bids/mcp.

Pattern mirrors zerto-docs-rag with one addition: HTTP Basic auth in
front of the streamable-HTTP transport so namespace guessers can't reach
the tools. Stdio transport is unaffected (used by local Claude Desktop
dev).

Tools (markdown returns, ~15 LOC each):
  best_local_bid(commodity)       — where to sell corn/soy/wheat today,
                                    for the current calendar month only
  current_lime_price()            — latest lime quotes ($/ton)
  current_input_price(product?)   — MAP / Potash / Lime
  latest_prices(...)              — filtered snapshot
  price_history(...)              — per-(source,delivery) trend
  list_sources / list_commodities / list_deliveries
  source_health()                 — healthy / stale / down buckets
  todays_summary()                — same shape as morning brief snapshot

Data path: ag-bids-mcp -> X-API-Key -> /api/data/* on ag-monitor
(reuses BRIEF_API_KEY).

Tests: 24 covering the httpx client, markdown formatters, HTTP Basic
middleware (401/200), and JSONL usage logging.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-05-20 11:34:38 -04:00
commit 875a190983
18 changed files with 1558 additions and 0 deletions
+21
View File
@@ -0,0 +1,21 @@
# --- MCP transport ---
# stdio for local Claude Desktop dev, streamable-http for the deployed container.
MCP_TRANSPORT=streamable-http
MCP_HOST=0.0.0.0
MCP_PORT=8000
# --- Upstream: ag-monitor (where the data actually lives) ---
# Reuse the same key ag-monitor already issues for /api/brief/* — both clients
# are internal-trusted, so a separate key is unnecessary friction.
AG_BIDS_API_URL=https://agbids.paul.farm
AG_BIDS_API_KEY=
AG_BIDS_API_TIMEOUT_SECS=20
# --- HTTP Basic auth (REQUIRED — server refuses to start without both) ---
# These guard the MCP itself. MetaMCP injects them on every upstream call.
AG_BIDS_MCP_USER=
AG_BIDS_MCP_PASS=
# --- Per-tool-call usage logging ---
USAGE_LOG_DIR=/app/var/logs
USAGE_LOG_KEEP_DAYS=90
+10
View File
@@ -0,0 +1,10 @@
.env
.env.local
__pycache__/
*.pyc
.venv/
venv/
.pytest_cache/
var/
*.log
.DS_Store
+29
View File
@@ -0,0 +1,29 @@
FROM python:3.12-slim
ENV PYTHONDONTWRITEBYTECODE=1 \
PYTHONUNBUFFERED=1 \
PIP_NO_CACHE_DIR=1
RUN apt-get update && apt-get install -y --no-install-recommends \
curl ca-certificates \
&& rm -rf /var/lib/apt/lists/*
WORKDIR /app
COPY requirements.txt ./
RUN pip install --no-cache-dir -r requirements.txt
COPY ag_bids_mcp ./ag_bids_mcp
RUN mkdir -p /app/var/logs
# Streamable-HTTP transport in container; switch to stdio via env for dev.
ENV MCP_TRANSPORT=streamable-http \
MCP_HOST=0.0.0.0 \
MCP_PORT=8000 \
USAGE_LOG_DIR=/app/var/logs \
USAGE_LOG_KEEP_DAYS=90
EXPOSE 8000
CMD ["python", "-m", "ag_bids_mcp.server"]
+95
View File
@@ -0,0 +1,95 @@
# ag-bids-mcp
Model Context Protocol server exposing live + historical commodity price data
collected by [ag-monitor](https://git.jpaul.io/justin/ag-bids). Sits behind
the user's MetaMCP gateway at `https://mcp.jpaul.io/metamcp/ag-bids/mcp`.
## What it does
Lets an LLM client (Claude Desktop, OpenWebUI, anything that speaks MCP) ask
plain-English questions like:
- "What's the best place to sell corn today?"
- "What's the current price of lime?"
- "Show me the corn basis trend at Andersons Greenville over the last 30 days."
- "Which sources are stale or failing?"
## How it talks to data
All data is read from ag-monitor over HTTPS:
```
ag-bids-mcp ── X-API-Key ─► https://agbids.paul.farm/api/data/*
```
Endpoints used: `/api/data/latest`, `/history`, `/best`, `/inputs`, `/sources`,
`/deliveries`. See the [ag-monitor source](https://git.jpaul.io/justin/ag-bids)
for the contract.
## Authentication
This MCP enforces **HTTP Basic** auth in front of the FastMCP HTTP transport.
Set both:
```
AG_BIDS_MCP_USER=<your username>
AG_BIDS_MCP_PASS=<your password>
```
MetaMCP is configured to inject `Authorization: Basic <b64>` on every upstream
call. Direct access without the header returns `401`.
If either env var is unset the server refuses to start (fail closed).
## Local dev (stdio)
```bash
python3 -m venv venv && source venv/bin/activate
pip install -r requirements.txt
cp .env.example .env # fill in AG_BIDS_API_KEY + AG_BIDS_MCP_USER/PASS
MCP_TRANSPORT=stdio python -m ag_bids_mcp.server
```
Wire into Claude Desktop's `claude_desktop_config.json`:
```json
{
"mcpServers": {
"ag-bids": {
"command": "/path/to/venv/bin/python",
"args": ["-m", "ag_bids_mcp.server"],
"env": {
"MCP_TRANSPORT": "stdio",
"AG_BIDS_API_URL": "https://agbids.paul.farm",
"AG_BIDS_API_KEY": "...",
"AG_BIDS_MCP_USER": "x",
"AG_BIDS_MCP_PASS": "y"
}
}
}
}
```
(The Basic auth check is skipped automatically when `MCP_TRANSPORT=stdio` since
stdio has no HTTP layer.)
## Deploy (MetaMCP host)
See [deploy/README.md](deploy/README.md). Container image is pulled from
`git.jpaul.io/justin/ag-bids-mcp:latest`; Watchtower on the MetaMCP host
auto-updates it every 5 minutes.
## Tools exposed
| Tool | Returns |
|---|---|
| `best_local_bid(commodity)` | Where to sell `corn`, `soy`, or `wheat` for this month's delivery — markdown one-liner + table |
| `current_lime_price()` | Latest lime quotes across all manual-entry sources |
| `current_input_price(product?)` | MAP / Potash / Lime — all three or one |
| `latest_prices(commodity?, source?, delivery?)` | Snapshot table, optionally filtered |
| `price_history(commodity, source?, delivery?, days?)` | Compact time series |
| `list_sources()` | Active scrapers + last-success timestamps |
| `list_commodities()` | corn, soy, wheat, map, potash, lime |
| `list_deliveries(commodity)` | Posted delivery labels, chronological |
| `source_health()` | Stale / failing / healthy summary |
| `todays_summary()` | Same shape as the morning brief snapshot |
View File
+76
View File
@@ -0,0 +1,76 @@
"""HTTP Basic auth in front of the FastMCP Streamable-HTTP transport.
MetaMCP can inject ``Authorization: Basic <b64>`` on every upstream call,
so this is the simplest robust gate. Two env vars (``AG_BIDS_MCP_USER`` and
``AG_BIDS_MCP_PASS``) are required at process startup; the server fails
closed if either is missing.
Stdio transport (local dev with Claude Desktop) skips this entirely — no
HTTP layer exists in stdio mode.
"""
from __future__ import annotations
import base64
import logging
import os
import secrets
from starlette.requests import Request
from starlette.responses import PlainTextResponse, Response
log = logging.getLogger(__name__)
REALM = "ag-bids MCP"
def expected_credentials() -> tuple[str, str]:
"""Return the (user, pass) the server enforces. Raises if missing."""
u = os.environ.get("AG_BIDS_MCP_USER", "")
p = os.environ.get("AG_BIDS_MCP_PASS", "")
if not u or not p:
raise RuntimeError(
"AG_BIDS_MCP_USER and AG_BIDS_MCP_PASS must both be set for HTTP "
"Basic auth on the ag-bids MCP server."
)
return u, p
def _decode_basic(header: str) -> tuple[str, str] | None:
if not header or not header.lower().startswith("basic "):
return None
try:
decoded = base64.b64decode(header.split(" ", 1)[1]).decode("utf-8")
except (ValueError, UnicodeDecodeError):
return None
user, _, pw = decoded.partition(":")
return user, pw
def _check(presented_user: str, presented_pass: str) -> bool:
expected_user, expected_pass = expected_credentials()
return (
secrets.compare_digest(presented_user, expected_user)
and secrets.compare_digest(presented_pass, expected_pass)
)
def _unauthorized() -> Response:
return PlainTextResponse(
"Unauthorized",
status_code=401,
headers={"WWW-Authenticate": f'Basic realm="{REALM}"'},
)
async def basic_auth_middleware(request: Request, call_next):
"""Starlette middleware that 401s anything missing valid Basic creds."""
creds = _decode_basic(request.headers.get("authorization", ""))
if creds is None:
log.info("auth: missing/malformed Authorization header (path=%s)", request.url.path)
return _unauthorized()
user, pw = creds
if not _check(user, pw):
log.info("auth: bad credentials (user=%r path=%s)", user, request.url.path)
return _unauthorized()
return await call_next(request)
+87
View File
@@ -0,0 +1,87 @@
"""Thin HTTPS client for ag-monitor's /api/data/* endpoints.
The MCP server doesn't talk to SQLite directly — it goes over HTTPS to the
ag-monitor service on a different LAN host. One X-API-Key header guards
every call. Failures bubble up as ``AgBidsError`` so tool functions can
turn them into user-friendly markdown.
"""
from __future__ import annotations
import logging
import os
from typing import Any
import httpx
logger = logging.getLogger(__name__)
class AgBidsError(RuntimeError):
"""Anything went wrong calling ag-monitor."""
def _config() -> tuple[str, str, float]:
url = os.environ.get("AG_BIDS_API_URL", "https://agbids.paul.farm").rstrip("/")
key = os.environ.get("AG_BIDS_API_KEY", "")
timeout = float(os.environ.get("AG_BIDS_API_TIMEOUT_SECS", "20"))
if not key:
raise AgBidsError("AG_BIDS_API_KEY is not set in the environment")
return url, key, timeout
def _get(path: str, **params: Any) -> dict | list:
url_base, key, timeout = _config()
url = f"{url_base}{path}"
# Drop None values so the upstream doesn't see empty query strings.
clean: dict[str, Any] = {k: v for k, v in params.items() if v is not None and v != ""}
try:
r = httpx.get(url, params=clean, timeout=timeout,
headers={"X-API-Key": key, "Accept": "application/json"})
except httpx.RequestError as e:
raise AgBidsError(f"GET {url} failed: {e}") from e
if r.status_code != 200:
raise AgBidsError(
f"GET {url} returned {r.status_code}: {r.text[:200]}"
)
try:
return r.json()
except ValueError as e:
raise AgBidsError(f"GET {url} returned non-JSON: {r.text[:200]}") from e
# ---- Public client API (mirrors /api/data/* shape) ----
def latest(commodity: str | None = None, source: str | None = None,
delivery: str | None = None, kind: str | None = None) -> dict:
return _get("/api/data/latest",
commodity=commodity, source=source, delivery=delivery, kind=kind)
def history(commodity: str, source_id: int | None = None,
delivery: str | None = None, days: int = 30) -> dict:
return _get("/api/data/history",
commodity=commodity, source_id=source_id,
delivery=delivery, days=days)
def best(commodity: str) -> dict:
return _get("/api/data/best", commodity=commodity)
def inputs(product: str | None = None) -> dict:
return _get("/api/data/inputs", product=product)
def sources() -> dict:
return _get("/api/data/sources")
def deliveries(commodity: str) -> dict:
return _get("/api/data/deliveries", commodity=commodity)
def todays_summary() -> dict:
"""Brief snapshot — same blob the morning email digest uses."""
return _get("/api/brief/snapshot", kind="morning")
+249
View File
@@ -0,0 +1,249 @@
"""JSON-from-ag-monitor → markdown helpers.
One function per @mcp.tool. Markdown is what FastMCP tools return; Claude /
OpenWebUI render it nicely. Cents → dollars formatting is centralized here so
all tools use the same precision (4 decimals for $/bu, 2 decimals for $/ton).
"""
from __future__ import annotations
from typing import Optional
def _bu(cents: Optional[int]) -> str:
if cents is None:
return ""
return f"${cents / 100:.4f}"
def _ton(cents: Optional[int]) -> str:
if cents is None:
return ""
return f"${cents / 100:.2f}"
def _basis(cents: Optional[int]) -> str:
if cents is None:
return ""
sign = "+" if cents >= 0 else ""
return f"{sign}{cents / 100:.2f}"
def _delta_arrow(cents: Optional[int]) -> str:
if cents is None or cents == 0:
return ""
return "" if cents > 0 else ""
# ---------- best_local_bid ----------
def fmt_best(commodity: str, payload: dict) -> str:
best = payload.get("best")
today = payload.get("today")
if not best:
return (
f"### Best place to sell {commodity} today ({today})\n\n"
f"No current-month {commodity} bids posted across the tracked sources.\n"
)
return (
f"### Best place to sell {commodity} today ({today})\n\n"
f"**{best['source_name']}** — delivery **{best['delivery']}** — "
f"bid **{_bu(best['bid_cents'])}/bu** (basis {_basis(best.get('basis_cents'))}, "
f"futures {best.get('futures_contract') or ''})\n\n"
f"_Fetched {best.get('fetched_at') or '?'}_\n"
)
# ---------- inputs / fertilizer ----------
def fmt_inputs(payload: dict) -> str:
rows = payload.get("rows") or []
product = payload.get("product")
title = f"### {product.upper()} prices" if product else "### Fertilizer + lime prices"
if not rows:
scope = product or "any tracked input"
return f"{title}\n\nNo {scope} prices on file.\n"
lines = [
title, "",
"| Source | Product | Delivery | Price ($/ton) | Fetched |",
"|---|---|---|---:|---|",
]
for r in rows:
lines.append(
f"| {r['source_name']} | {r.get('display_name') or r['commodity']} | "
f"{r['delivery']} | {_ton(r.get('bid_cents'))} | {r.get('fetched_at') or '?'} |"
)
return "\n".join(lines) + "\n"
# ---------- latest snapshot ----------
def fmt_latest(payload: dict) -> str:
rows = payload.get("rows") or []
if not rows:
return "### Latest prices\n\nNo rows match those filters.\n"
lines = [
"### Latest prices", "",
"| Source | Commodity | Delivery | Bid | Basis | Futures | Fetched |",
"|---|---|---|---:|---:|---|---|",
]
for r in rows:
unit_fmt = _ton if r.get("commodity_kind") == "fertilizer" else _bu
lines.append(
f"| {r['source_name']} | {r.get('display_name') or r['commodity']} | "
f"{r['delivery']} | {unit_fmt(r.get('bid_cents'))} | "
f"{_basis(r.get('basis_cents'))} | {r.get('futures_contract') or ''} | "
f"{r.get('fetched_at') or '?'} |"
)
return "\n".join(lines) + "\n"
# ---------- price history ----------
def fmt_history(payload: dict, max_rows: int = 60) -> str:
rows = payload.get("rows") or []
commodity = payload.get("commodity")
days = payload.get("days")
if not rows:
return f"### {commodity} history ({days}d)\n\nNo samples in the window.\n"
# Per (source, delivery) trend annotation: first vs last sample.
series: dict[tuple, list[dict]] = {}
for r in rows:
series.setdefault((r["source_name"], r["delivery"]), []).append(r)
lines = [f"### {commodity} price history — last {days} days", ""]
for (src, dlv), pts in sorted(series.items()):
if not pts:
continue
first = pts[0].get("bid_cents")
last = pts[-1].get("bid_cents")
delta = (last - first) if (first is not None and last is not None) else None
arrow = _delta_arrow(delta)
lines.append(
f"- **{src}** / {dlv}: {len(pts)} samples · "
f"{_bu(first)}{_bu(last)} {arrow} {_basis(delta) if delta is not None else ''}".rstrip()
)
# If the history is shallow include the raw rows too (helpful for charts).
if sum(len(p) for p in series.values()) <= max_rows:
lines.extend([
"",
"| Time | Source | Delivery | Bid | Basis |",
"|---|---|---|---:|---:|",
])
for r in rows[-max_rows:]:
lines.append(
f"| {r['fetched_at']} | {r['source_name']} | {r['delivery']} | "
f"{_bu(r.get('bid_cents'))} | {_basis(r.get('basis_cents'))} |"
)
return "\n".join(lines) + "\n"
# ---------- sources / health ----------
def fmt_sources(payload: dict) -> str:
src = payload.get("sources") or []
if not src:
return "### Sources\n\nNo active sources.\n"
lines = [
"### Tracked sources", "",
"| Source | Kind | Last success | Consecutive failures | Last error |",
"|---|---|---|---:|---|",
]
for s in src:
lines.append(
f"| {s['name']} | {s['kind']} | {s.get('last_success_at') or ''} | "
f"{s.get('consecutive_failures') or 0} | {s.get('last_error') or ''} |"
)
return "\n".join(lines) + "\n"
def fmt_health(payload: dict) -> str:
src = payload.get("sources") or []
healthy, stale, down = [], [], []
for s in src:
n = s.get("consecutive_failures") or 0
if n >= 3:
down.append(s)
elif not s.get("last_success_at"):
stale.append(s)
else:
healthy.append(s)
lines = ["### Source health", ""]
lines.append(f"- ✅ Healthy: **{len(healthy)}**")
lines.append(f"- ⚠ Stale (never succeeded): **{len(stale)}**")
lines.append(f"- ❌ Down (3+ consecutive failures): **{len(down)}**")
if stale or down:
lines.append("")
lines.append("| Source | State | Last success | Failures | Error |")
lines.append("|---|---|---|---:|---|")
for s in stale + down:
state = "stale" if s in stale else "down"
lines.append(
f"| {s['name']} | {state} | {s.get('last_success_at') or ''} | "
f"{s.get('consecutive_failures') or 0} | "
f"{(s.get('last_error') or '')[:80]} |"
)
return "\n".join(lines) + "\n"
# ---------- list helpers ----------
def fmt_deliveries(payload: dict) -> str:
commodity = payload.get("commodity")
labels = payload.get("deliveries") or []
if not labels:
return f"### {commodity} deliveries\n\nNo posted deliveries.\n"
return (
f"### Posted delivery labels for {commodity}\n\n"
+ "\n".join(f"- {x}" for x in labels)
+ "\n"
)
def fmt_commodities() -> str:
return (
"### Tracked commodities\n\n"
"- **corn** ($/bu)\n"
"- **soy** — soybeans ($/bu)\n"
"- **wheat** ($/bu) — tracked but excluded from daily brief emails\n"
"- **map** — MAP 11-52-0 ($/ton)\n"
"- **potash** — Potash 0-0-60 ($/ton)\n"
"- **lime** ($/ton)\n"
)
# ---------- today's summary ----------
def fmt_summary(payload: dict) -> str:
today = payload.get("today")
prev = payload.get("prev_trading_day")
lines = [f"### Market summary — today {today} vs prev trading day {prev}", ""]
for c in payload.get("commodities", []):
f = c.get("futures") or {}
chg = f.get("change_cents")
arrow = _delta_arrow(chg)
lines.append(
f"**{c['display_name']}** — CBOT {f.get('contract','?')} "
f"last {_bu(f.get('today_last_cents'))}, prev close "
f"{_bu(f.get('prev_close_cents'))} {arrow} {_basis(chg) if chg is not None else ''}"
)
bft = c.get("best_for_today")
if bft:
lines.append(
f" · Best for today's delivery ({bft['delivery']}): "
f"**{bft['source_name']}** @ {_bu(bft['bid_cents'])} "
f"(basis {_basis(bft.get('basis_cents'))})"
)
else:
lines.append(" · No current-month local bid posted.")
lines.append("")
return "\n".join(lines).rstrip() + "\n"
+275
View File
@@ -0,0 +1,275 @@
"""ag-bids MCP server.
Mirrors the zerto-docs-rag layout — FastMCP, ``@mcp.tool()`` decorators
returning markdown strings, dual stdio/streamable-http transport — with one
addition: HTTP Basic auth in front of the streamable-http transport. MetaMCP
upstream config injects ``Authorization: Basic <b64>`` so the proxied call
sails through; direct hits without the header get 401.
Run locally (stdio for Claude Desktop):
MCP_TRANSPORT=stdio python -m ag_bids_mcp.server
Run in a container (HTTP for MetaMCP upstream):
python -m ag_bids_mcp.server # default transport = streamable-http
"""
from __future__ import annotations
import argparse
import logging
import os
import sys
from typing import Annotated
from mcp.server.fastmcp import FastMCP
from pydantic import Field
from ag_bids_mcp import client, format as fmt
from ag_bids_mcp.auth import basic_auth_middleware, expected_credentials
from ag_bids_mcp.usage import track
logging.basicConfig(
level=os.environ.get("LOG_LEVEL", "INFO"),
format="%(asctime)s %(levelname)s %(name)s %(message)s",
stream=sys.stderr,
)
log = logging.getLogger("ag-bids-mcp")
mcp = FastMCP("ag-bids", stateless_http=True)
VALID_GRAIN = {"corn", "soy", "wheat"}
VALID_INPUT = {"lime", "map", "potash"}
# ============================================================================
# Tools
# ============================================================================
@mcp.tool()
def best_local_bid(
commodity: Annotated[
str, Field(description="Grain to look up: 'corn', 'soy' (soybeans), or 'wheat'.")
],
) -> str:
"""Return the highest local bid for *this calendar month's* delivery for
the given grain. This is the "where should I haul today" answer."""
commodity = commodity.strip().lower()
with track("best_local_bid", commodity=commodity):
if commodity not in VALID_GRAIN:
return f"`commodity` must be one of: {sorted(VALID_GRAIN)}"
payload = client.best(commodity)
return fmt.fmt_best(commodity, payload)
@mcp.tool()
def current_lime_price() -> str:
"""Latest lime prices on file across all sources. Lime is rarely posted on
public bid pages — entries usually come from manual admin input."""
with track("current_lime_price"):
payload = client.inputs(product="lime")
return fmt.fmt_inputs(payload)
@mcp.tool()
def current_input_price(
product: Annotated[
str | None,
Field(description="One of: 'lime', 'map', 'potash'. Omit for all three."),
] = None,
) -> str:
"""Latest fertilizer / lime prices ($/ton)."""
p = product.strip().lower() if product else None
with track("current_input_price", product=p):
if p is not None and p not in VALID_INPUT:
return f"`product` must be one of: {sorted(VALID_INPUT)} (or omit)"
payload = client.inputs(product=p)
return fmt.fmt_inputs(payload)
@mcp.tool()
def latest_prices(
commodity: Annotated[
str | None,
Field(description="Filter to one commodity (corn / soy / wheat / map / potash / lime)."),
] = None,
source: Annotated[
str | None,
Field(description="Filter to one source by exact display name."),
] = None,
delivery: Annotated[
str | None,
Field(description="Filter to one delivery label (e.g. 'May 2026', 'Oct/Nov 2026')."),
] = None,
) -> str:
"""Snapshot of the latest scraped bid per (source, commodity, delivery)."""
cm = commodity.strip().lower() if commodity else None
with track("latest_prices", commodity=cm, source=source, delivery=delivery):
payload = client.latest(commodity=cm, source=source, delivery=delivery)
return fmt.fmt_latest(payload)
@mcp.tool()
def price_history(
commodity: Annotated[
str, Field(description="One of corn / soy / wheat / map / potash / lime.")
],
source: Annotated[
str | None,
Field(description="Optional source display name to narrow the chart."),
] = None,
delivery: Annotated[
str | None,
Field(description="Optional delivery label to narrow the chart."),
] = None,
days: Annotated[
int, Field(ge=1, le=365, description="Lookback window in days.")
] = 30,
) -> str:
"""Compact price history per (source, delivery) for the chosen commodity.
Returns per-series ▲/▼ trend annotations plus the raw points if the
window has fewer than ~60 samples."""
cm = commodity.strip().lower()
with track("price_history", commodity=cm, source=source, delivery=delivery, days=days):
# source_id lookup would require an extra call; for now we accept
# source by name and let users filter the markdown output. The
# underlying /api/data/history accepts source_id; pass-through is
# source-agnostic.
payload = client.history(commodity=cm, delivery=delivery, days=days)
# If user asked for a single source, filter rows post-fetch.
if source:
payload["rows"] = [r for r in payload.get("rows") or [] if r.get("source_name") == source]
return fmt.fmt_history(payload)
@mcp.tool()
def list_sources() -> str:
"""All active scrapers + their last-success timestamps and any pending failures."""
with track("list_sources"):
payload = client.sources()
return fmt.fmt_sources(payload)
@mcp.tool()
def list_commodities() -> str:
"""The complete set of commodities tracked by ag-monitor."""
with track("list_commodities"):
return fmt.fmt_commodities()
@mcp.tool()
def list_deliveries(
commodity: Annotated[
str, Field(description="Commodity whose posted delivery labels you want.")
],
) -> str:
"""All posted delivery labels for a commodity, sorted chronologically."""
cm = commodity.strip().lower()
with track("list_deliveries", commodity=cm):
payload = client.deliveries(cm)
return fmt.fmt_deliveries(payload)
@mcp.tool()
def source_health() -> str:
"""Operational status of every source: healthy, stale, or down."""
with track("source_health"):
payload = client.sources()
return fmt.fmt_health(payload)
@mcp.tool()
def todays_summary() -> str:
"""Today's market snapshot — same blob used by the morning email brief.
Includes CBOT corn + soy continuous futures vs the previous trading
day's close, and the best local bid for each commodity's current-month
delivery."""
with track("todays_summary"):
payload = client.todays_summary()
return fmt.fmt_summary(payload)
# ============================================================================
# Entry point
# ============================================================================
def _streamable_app_with_auth():
"""Attach Basic-auth middleware directly to FastMCP's Starlette app.
Earlier we tried mounting the FastMCP app under a parent Starlette app to
install middleware. That broke the streamable-HTTP transport because the
parent's lifespan didn't trigger FastMCP's internal session-manager
task-group startup, so requests hit "Task group is not initialized".
Adding middleware on the FastMCP-provided app keeps the original lifespan
intact.
"""
from starlette.middleware.base import BaseHTTPMiddleware
app = mcp.streamable_http_app()
app.add_middleware(BaseHTTPMiddleware, dispatch=basic_auth_middleware)
return app
def main() -> None:
p = argparse.ArgumentParser()
p.add_argument(
"--transport",
default=os.environ.get("MCP_TRANSPORT", "stdio"),
choices=["stdio", "streamable-http", "sse"],
)
p.add_argument("--host", default=os.environ.get("MCP_HOST", "0.0.0.0"))
p.add_argument("--port", type=int, default=int(os.environ.get("MCP_PORT", "8000")))
args = p.parse_args()
if args.transport == "stdio":
# stdio has no HTTP layer → no Basic auth to enforce. Useful for
# local Claude Desktop development.
log.info("starting ag-bids MCP on stdio (auth bypassed in stdio mode)")
mcp.run()
return
# HTTP transports: fail closed if Basic-auth credentials are unset.
expected_credentials()
# Same DNS-rebinding logic as zerto-docs-rag: behind a Docker DNS name
# like "ag-bids-mcp:8000", FastMCP's default localhost-only check would
# 421 every request.
allowed_hosts = os.environ.get("MCP_ALLOWED_HOSTS")
allowed_origins = os.environ.get("MCP_ALLOWED_ORIGINS")
if (
os.environ.get("MCP_DISABLE_DNS_REBINDING_PROTECTION") in {"1", "true", "yes"}
or allowed_hosts == "*"
or allowed_origins == "*"
):
mcp.settings.transport_security.enable_dns_rebinding_protection = False
else:
if allowed_hosts:
mcp.settings.transport_security.allowed_hosts = [
h.strip() for h in allowed_hosts.split(",") if h.strip()
]
if allowed_origins:
mcp.settings.transport_security.allowed_origins = [
o.strip() for o in allowed_origins.split(",") if o.strip()
]
if args.transport == "sse":
# SSE transport: same direct-add-middleware pattern.
from starlette.middleware.base import BaseHTTPMiddleware
app = mcp.sse_app()
app.add_middleware(BaseHTTPMiddleware, dispatch=basic_auth_middleware)
else:
app = _streamable_app_with_auth()
import uvicorn
log.info("starting ag-bids MCP on %s://%s:%s (Basic auth enforced)",
args.transport, args.host, args.port)
uvicorn.run(app, host=args.host, port=args.port, log_level="info")
if __name__ == "__main__":
main()
+93
View File
@@ -0,0 +1,93 @@
"""Per-tool-call usage logging (one JSONL line per invocation).
Trimmed from zerto-docs-rag/docs_mcp/usage.py. Captures: timestamp, tool
name, args (commodity / source / delivery / etc — all non-PII), success
flag, error class on failure, elapsed ms. Useful for: spotting hot tools,
seeing which queries fail upstream, weekly summaries.
Writes to ``$USAGE_LOG_DIR/usage-YYYY-MM-DD.jsonl``, one file per day,
auto-rotated. Old files beyond ``USAGE_LOG_KEEP_DAYS`` are deleted on
each write.
"""
from __future__ import annotations
import json
import logging
import os
import time
from contextlib import contextmanager
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Any
log = logging.getLogger(__name__)
def _log_dir() -> Path | None:
raw = os.environ.get("USAGE_LOG_DIR", "")
if not raw:
return None
p = Path(raw)
try:
p.mkdir(parents=True, exist_ok=True)
except OSError as e:
log.warning("usage log dir %s unavailable: %s", raw, e)
return None
return p
def _keep_days() -> int:
try:
return int(os.environ.get("USAGE_LOG_KEEP_DAYS", "90"))
except ValueError:
return 90
def _prune(dir_: Path) -> None:
cutoff = (datetime.now(timezone.utc) - timedelta(days=_keep_days())).date()
for f in dir_.glob("usage-*.jsonl"):
try:
date_part = f.stem.removeprefix("usage-")
file_date = datetime.strptime(date_part, "%Y-%m-%d").date()
if file_date < cutoff:
f.unlink()
except (ValueError, OSError):
continue
def write(record: dict[str, Any]) -> None:
dir_ = _log_dir()
if dir_ is None:
return
today = datetime.now(timezone.utc).date().isoformat()
path = dir_ / f"usage-{today}.jsonl"
try:
with path.open("a", encoding="utf-8") as fp:
json.dump(record, fp, default=str)
fp.write("\n")
_prune(dir_)
except OSError as e:
log.warning("usage log write failed: %s", e)
@contextmanager
def track(tool: str, **fields: Any):
"""Wrap a tool body. Logs on both success and failure with elapsed ms."""
started = time.perf_counter()
record: dict[str, Any] = {
"ts": datetime.now(timezone.utc).isoformat(timespec="seconds"),
"tool": tool,
**fields,
}
try:
yield record
record["ok"] = True
except Exception as e:
record["ok"] = False
record["error_class"] = type(e).__name__
record["error_msg"] = str(e)[:200]
raise
finally:
record["elapsed_ms"] = int((time.perf_counter() - started) * 1000)
write(record)
+101
View File
@@ -0,0 +1,101 @@
# Deploying `ag-bids-mcp` behind MetaMCP
This runs on the **MetaMCP host (192.168.0.2)** alongside `zerto-docs-mcp`.
It joins the same `mcp` Docker network so MetaMCP can proxy to it by container
DNS name (`http://ag-bids-mcp:8000/mcp`).
## One-time setup (on 192.168.0.2)
### 1. Add env vars
Edit `/home/justin/zerto-docs-rag/deploy/.env`:
```ini
# Copy this from ag-monitor's .env on 192.168.0.126 (the BRIEF_API_KEY value).
# Same key powers /api/data/* and /api/brief/*.
AG_BIDS_API_KEY=<paste>
# Credentials MetaMCP will send to the MCP via Basic auth. Generate two
# random values — they're not user-facing, only MetaMCP knows them.
AG_BIDS_MCP_USER=agbids-svc
AG_BIDS_MCP_PASS=<generate: python -c "import secrets; print(secrets.token_urlsafe(32))">
```
### 2. Paste the service block into compose
Open `/home/justin/zerto-docs-rag/deploy/docker-compose.yml` and append the
`ag-bids-mcp:` block from [docker-compose.snippet.yml](docker-compose.snippet.yml)
inside `services:`, alongside `zerto-docs-mcp:`. Keep the same indentation
(2 spaces).
### 3. Build + push the image
On any dev machine with `docker` and the Gitea registry login:
```bash
cd ~/github/ag-bids-mcp
docker login git.jpaul.io # use your Gitea PAT
docker build -t git.jpaul.io/justin/ag-bids-mcp:latest .
docker push git.jpaul.io/justin/ag-bids-mcp:latest
```
### 4. Start the container
```bash
cd /home/justin/zerto-docs-rag/deploy
docker compose pull ag-bids-mcp
docker compose up -d ag-bids-mcp
docker compose logs -f ag-bids-mcp # confirm "starting ag-bids MCP on streamable-http://0.0.0.0:8000 (Basic auth enforced)"
```
Watchtower (already running on the host) will auto-pull updates every 5
minutes from this point forward.
### 5. Register the namespace in MetaMCP
In the MetaMCP web UI at `https://mcp.jpaul.io`:
1. **Create namespace** → name: `ag-bids`
2. **Add upstream MCP server** to that namespace:
- **Transport:** Streamable HTTP
- **URL:** `http://ag-bids-mcp:8000/mcp`
- **Authentication:** Basic
- **Username:** matches `AG_BIDS_MCP_USER`
- **Password:** matches `AG_BIDS_MCP_PASS`
3. Save.
Public endpoint becomes:
**`https://mcp.jpaul.io/metamcp/ag-bids/mcp`**
### 6. Smoke test from the LAN
```bash
# 401 without creds
curl -i http://192.168.0.2:8000/mcp 2>&1 | head -3
# 200 with creds (Initialize handshake will succeed; the MCP doesn't have
# a plain GET, so a curl probe just confirms auth)
curl -i -u "$AG_BIDS_MCP_USER:$AG_BIDS_MCP_PASS" http://192.168.0.2:8000/mcp 2>&1 | head -3
```
Then in a real MCP client (Claude Desktop / OpenWebUI / etc.) configured against
`https://mcp.jpaul.io/metamcp/ag-bids/mcp`, try:
- **"What's the best place to sell corn today?"** → calls `best_local_bid("corn")`
- **"What's the current price of lime?"** → calls `current_lime_price()`
- **"Are any sources down?"** → calls `source_health()`
## Rotating credentials
To rotate the Basic password: change `AG_BIDS_MCP_PASS` in the `.env` on
192.168.0.2 → `docker compose up -d ag-bids-mcp` to restart with the new
value → update the MetaMCP namespace's upstream Basic password to match.
To rotate the upstream API key: change `BRIEF_API_KEY` in ag-monitor's `.env`
on 192.168.0.126 + restart `api` there, then update `AG_BIDS_API_KEY` on
192.168.0.2 + restart `ag-bids-mcp`.
## Observability
- Per-tool-call usage logs: `/home/justin/zerto-docs-rag/deploy/ag-bids-mcp-logs/usage-YYYY-MM-DD.jsonl`
- Container stdout: `docker compose logs ag-bids-mcp`
- Successful auth → no log line; failed auth → INFO line with the offending path
+38
View File
@@ -0,0 +1,38 @@
# Paste this service block into /home/justin/zerto-docs-rag/deploy/docker-compose.yml
# on 192.168.0.2 (the MetaMCP host), alongside zerto-docs-mcp. It joins the
# same `mcp` Docker network so MetaMCP can reach it by container DNS name.
#
# Required env vars (set in the same .env that already powers the rest of the
# MetaMCP stack):
# AG_BIDS_API_KEY — copy from ag-monitor's .env (BRIEF_API_KEY)
# AG_BIDS_MCP_USER — username MetaMCP will send in Basic auth
# AG_BIDS_MCP_PASS — password MetaMCP will send in Basic auth
ag-bids-mcp:
container_name: ag-bids-mcp
image: git.jpaul.io/justin/ag-bids-mcp:latest
pull_policy: always
restart: unless-stopped
environment:
MCP_TRANSPORT: streamable-http
MCP_HOST: 0.0.0.0
MCP_PORT: "8000"
# Behind a Docker DNS name, FastMCP's localhost-only rebinding-protection
# would 421 every call from MetaMCP. Disable it; the mcp network is private.
MCP_DISABLE_DNS_REBINDING_PROTECTION: "1"
# --- upstream ag-monitor (Cloudflare Tunnel from .0.126) ---
AG_BIDS_API_URL: https://agbids.paul.farm
AG_BIDS_API_KEY: ${AG_BIDS_API_KEY}
AG_BIDS_API_TIMEOUT_SECS: "20"
# --- HTTP Basic auth in front of this MCP ---
AG_BIDS_MCP_USER: ${AG_BIDS_MCP_USER}
AG_BIDS_MCP_PASS: ${AG_BIDS_MCP_PASS}
# --- per-tool-call JSONL usage log ---
USAGE_LOG_DIR: /app/var/logs
USAGE_LOG_KEEP_DAYS: "90"
volumes:
# Survive container recreates (Watchtower rolls this every ~5 min).
- ./ag-bids-mcp-logs:/app/var/logs
networks: [mcp]
labels:
com.centurylinklabs.watchtower.enable: "true"
+5
View File
@@ -0,0 +1,5 @@
mcp>=1.10
httpx>=0.27
pydantic>=2.7
starlette>=0.39
uvicorn[standard]>=0.30
View File
+102
View File
@@ -0,0 +1,102 @@
"""HTTP Basic middleware tests."""
from __future__ import annotations
import base64
import importlib
import os
import sys
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))
def _reload_auth(monkeypatch, user="alice", password="hunter2"):
monkeypatch.setenv("AG_BIDS_MCP_USER", user)
monkeypatch.setenv("AG_BIDS_MCP_PASS", password)
from ag_bids_mcp import auth
importlib.reload(auth)
return auth
def _b64(creds: str) -> str:
return base64.b64encode(creds.encode()).decode()
def test_expected_credentials_requires_both(monkeypatch):
monkeypatch.delenv("AG_BIDS_MCP_USER", raising=False)
monkeypatch.delenv("AG_BIDS_MCP_PASS", raising=False)
from ag_bids_mcp import auth
importlib.reload(auth)
import pytest
with pytest.raises(RuntimeError):
auth.expected_credentials()
monkeypatch.setenv("AG_BIDS_MCP_USER", "alice")
monkeypatch.delenv("AG_BIDS_MCP_PASS", raising=False)
importlib.reload(auth)
with pytest.raises(RuntimeError):
auth.expected_credentials()
def test_middleware_via_starlette_app(monkeypatch):
"""End-to-end: a Starlette app with the middleware returns 401 / 200 correctly."""
auth = _reload_auth(monkeypatch, "alice", "hunter2")
from starlette.applications import Starlette
from starlette.middleware import Middleware
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.responses import PlainTextResponse
from starlette.routing import Route
from starlette.testclient import TestClient
async def hello(request):
return PlainTextResponse("ok")
app = Starlette(
routes=[Route("/x", endpoint=hello)],
middleware=[Middleware(BaseHTTPMiddleware, dispatch=auth.basic_auth_middleware)],
)
c = TestClient(app)
# No header -> 401 + WWW-Authenticate
r = c.get("/x")
assert r.status_code == 401
assert r.headers.get("www-authenticate", "").startswith("Basic")
# Wrong creds -> 401
r = c.get("/x", headers={"Authorization": "Basic " + _b64("alice:wrong")})
assert r.status_code == 401
# Wrong username, right password -> 401
r = c.get("/x", headers={"Authorization": "Basic " + _b64("eve:hunter2")})
assert r.status_code == 401
# Right creds -> 200
r = c.get("/x", headers={"Authorization": "Basic " + _b64("alice:hunter2")})
assert r.status_code == 200
assert r.text == "ok"
def test_malformed_authorization_header(monkeypatch):
auth = _reload_auth(monkeypatch)
from starlette.applications import Starlette
from starlette.middleware import Middleware
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.responses import PlainTextResponse
from starlette.routing import Route
from starlette.testclient import TestClient
async def hello(request):
return PlainTextResponse("ok")
app = Starlette(
routes=[Route("/x", endpoint=hello)],
middleware=[Middleware(BaseHTTPMiddleware, dispatch=auth.basic_auth_middleware)],
)
c = TestClient(app)
# Not even base64
r = c.get("/x", headers={"Authorization": "Basic !!!not_b64!!!"})
assert r.status_code == 401
# Bearer instead of Basic
r = c.get("/x", headers={"Authorization": "Bearer abc"})
assert r.status_code == 401
+137
View File
@@ -0,0 +1,137 @@
"""HTTP client unit tests (no live network)."""
from __future__ import annotations
import importlib
import os
import sys
# Make the package importable when pytest runs from repo root
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))
def _reload_client(monkeypatch, key="abc", url="http://test.invalid"):
monkeypatch.setenv("AG_BIDS_API_URL", url)
monkeypatch.setenv("AG_BIDS_API_KEY", key)
from ag_bids_mcp import client
importlib.reload(client)
return client
def test_missing_api_key_raises(monkeypatch):
monkeypatch.setenv("AG_BIDS_API_URL", "http://test.invalid")
monkeypatch.delenv("AG_BIDS_API_KEY", raising=False)
from ag_bids_mcp import client
importlib.reload(client)
import pytest
with pytest.raises(client.AgBidsError):
client.latest()
def test_get_sends_api_key_header(monkeypatch):
client = _reload_client(monkeypatch, key="topsecret")
captured = {}
class FakeResp:
status_code = 200
text = ""
def json(self):
return {"count": 0, "rows": []}
def fake_get(url, params=None, timeout=None, headers=None):
captured["url"] = url
captured["params"] = dict(params or {})
captured["headers"] = dict(headers or {})
return FakeResp()
monkeypatch.setattr(client.httpx, "get", fake_get)
out = client.latest(commodity="corn")
assert captured["headers"]["X-API-Key"] == "topsecret"
assert captured["url"].endswith("/api/data/latest")
assert captured["params"] == {"commodity": "corn"}
assert out == {"count": 0, "rows": []}
def test_get_drops_none_params(monkeypatch):
client = _reload_client(monkeypatch)
captured = {}
class FakeResp:
status_code = 200
text = ""
def json(self): return {"count": 0, "rows": []}
def fake_get(url, params=None, timeout=None, headers=None):
captured["params"] = dict(params or {})
return FakeResp()
monkeypatch.setattr(client.httpx, "get", fake_get)
client.latest(commodity="corn", source=None, delivery="", kind=None)
# None and "" should both be dropped
assert captured["params"] == {"commodity": "corn"}
def test_get_raises_on_non_200(monkeypatch):
client = _reload_client(monkeypatch)
class FakeResp:
status_code = 401
text = "no key"
def json(self): return {}
monkeypatch.setattr(client.httpx, "get", lambda *a, **k: FakeResp())
import pytest
with pytest.raises(client.AgBidsError):
client.best("corn")
def test_get_raises_on_network_error(monkeypatch):
client = _reload_client(monkeypatch)
def boom(*a, **k):
raise client.httpx.ConnectError("network is unreachable")
monkeypatch.setattr(client.httpx, "get", boom)
import pytest
with pytest.raises(client.AgBidsError):
client.sources()
def test_each_endpoint_hits_expected_path(monkeypatch):
client = _reload_client(monkeypatch)
calls = []
class FakeResp:
status_code = 200
text = ""
def json(self): return {}
def fake_get(url, params=None, timeout=None, headers=None):
calls.append((url, dict(params or {})))
return FakeResp()
monkeypatch.setattr(client.httpx, "get", fake_get)
client.latest()
client.history("corn", days=7)
client.best("soy")
client.inputs(product="lime")
client.sources()
client.deliveries("corn")
client.todays_summary()
paths = [u.replace("http://test.invalid", "") for u, _ in calls]
assert paths == [
"/api/data/latest",
"/api/data/history",
"/api/data/best",
"/api/data/inputs",
"/api/data/sources",
"/api/data/deliveries",
"/api/brief/snapshot",
]
# history call sent commodity + days
assert calls[1][1] == {"commodity": "corn", "days": 7}
# best call sent only commodity
assert calls[2][1] == {"commodity": "soy"}
# todays_summary uses kind=morning
assert calls[6][1] == {"kind": "morning"}
+163
View File
@@ -0,0 +1,163 @@
"""Markdown formatter tests — no network, just JSON-in / markdown-out."""
from __future__ import annotations
import os
import sys
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))
from ag_bids_mcp import format as fmt
def test_fmt_best_with_winner():
payload = {
"commodity": "corn", "today": "2026-05-20",
"best": {
"source_name": "Mercer Landmark — St Henry",
"delivery": "May 2026",
"bid_cents": 491, "basis_cents": 33,
"futures_contract": "ZCK26",
"fetched_at": "2026-05-20T15:00:00+00:00",
},
}
out = fmt.fmt_best("corn", payload)
assert "Mercer Landmark — St Henry" in out
assert "$4.9100/bu" in out
assert "May 2026" in out
assert "+0.33" in out
def test_fmt_best_no_winner():
payload = {"commodity": "wheat", "today": "2026-05-20", "best": None}
out = fmt.fmt_best("wheat", payload)
assert "No current-month" in out
assert "wheat" in out
def test_fmt_inputs_lime_table():
payload = {
"product": "lime", "count": 2,
"rows": [
{"source_name": "Bob's Lime Quote", "commodity": "lime",
"display_name": "Lime", "delivery": "spot",
"bid_cents": 42000, "commodity_kind": "fertilizer",
"fetched_at": "2026-05-20T15:00:00+00:00"},
{"source_name": "Coop X", "commodity": "lime",
"display_name": "Lime", "delivery": "spot",
"bid_cents": 45500, "commodity_kind": "fertilizer",
"fetched_at": "2026-05-20T15:01:00+00:00"},
],
}
out = fmt.fmt_inputs(payload)
assert "LIME prices" in out
assert "$420.00" in out
assert "$455.00" in out
assert "Bob's Lime Quote" in out
def test_fmt_inputs_empty():
out = fmt.fmt_inputs({"product": "lime", "rows": []})
assert "No lime prices on file." in out
def test_fmt_latest_table_picks_correct_unit():
payload = {"count": 2, "rows": [
{"source_name": "Bambauer", "commodity": "corn",
"display_name": "Corn", "delivery": "May 2026",
"bid_cents": 458, "basis_cents": 2, "commodity_kind": "grain",
"futures_contract": "ZCN26", "fetched_at": "2026-05-20T15:00:00+00:00"},
{"source_name": "Coop X", "commodity": "lime",
"display_name": "Lime", "delivery": "spot",
"bid_cents": 42000, "basis_cents": None, "commodity_kind": "fertilizer",
"futures_contract": None, "fetched_at": "2026-05-20T15:00:00+00:00"},
]}
out = fmt.fmt_latest(payload)
assert "$4.5800" in out # grain in /bu (4 decimals)
assert "$420.00" in out # fertilizer in /ton (2 decimals)
def test_fmt_history_with_trend_arrow():
payload = {
"commodity": "corn", "days": 7,
"rows": [
{"source_name": "Andersons", "delivery": "May 2026",
"bid_cents": 480, "basis_cents": 25,
"fetched_at": "2026-05-15T15:00:00+00:00"},
{"source_name": "Andersons", "delivery": "May 2026",
"bid_cents": 496, "basis_cents": 30,
"fetched_at": "2026-05-20T15:00:00+00:00"},
],
}
out = fmt.fmt_history(payload)
assert "" in out
assert "$4.8000" in out
assert "$4.9600" in out
def test_fmt_sources_table():
payload = {"sources": [
{"id": 1, "name": "Test Elev", "kind": "elevator",
"last_success_at": "2026-05-20T14:55:00+00:00",
"consecutive_failures": 0, "last_error": None},
]}
out = fmt.fmt_sources(payload)
assert "Test Elev" in out
assert "elevator" in out
def test_fmt_health_buckets():
payload = {"sources": [
{"name": "Healthy", "kind": "elevator",
"last_success_at": "2026-05-20T14:55:00+00:00",
"consecutive_failures": 0, "last_error": None},
{"name": "Stale", "kind": "elevator",
"last_success_at": None,
"consecutive_failures": 0, "last_error": None},
{"name": "Down", "kind": "elevator",
"last_success_at": "2026-05-15T14:55:00+00:00",
"consecutive_failures": 5, "last_error": "boom"},
]}
out = fmt.fmt_health(payload)
assert "Healthy: **1**" in out
assert "Stale (never succeeded): **1**" in out
assert "Down" in out and "**1**" in out
# Down section lists boom
assert "boom" in out
def test_fmt_deliveries():
out = fmt.fmt_deliveries({"commodity": "corn",
"deliveries": ["May 2026", "Jul 2026"]})
assert "- May 2026" in out
assert "- Jul 2026" in out
def test_fmt_commodities_lists_all_six():
out = fmt.fmt_commodities()
for name in ("corn", "soy", "wheat", "map", "potash", "lime"):
assert name in out.lower()
def test_fmt_summary_includes_best_for_today():
payload = {
"today": "2026-05-20", "prev_trading_day": "2026-05-19",
"commodities": [
{"symbol": "corn", "display_name": "Corn",
"futures": {"contract": "ZC=F", "today_last_cents": 458,
"prev_close_cents": 455, "change_cents": 3},
"best_for_today": {
"source_name": "Mercer Landmark — St Henry",
"delivery": "May 2026", "bid_cents": 491, "basis_cents": 33,
}},
{"symbol": "soy", "display_name": "Soybeans",
"futures": {"contract": "ZS=F", "today_last_cents": 1180,
"prev_close_cents": 1177, "change_cents": 3},
"best_for_today": None},
],
}
out = fmt.fmt_summary(payload)
assert "Corn" in out and "Soybeans" in out
assert "Mercer Landmark — St Henry" in out
assert "$4.5800" in out # corn last
assert "No current-month local bid posted" in out # soy fallback
+77
View File
@@ -0,0 +1,77 @@
"""Usage-log smoke tests — make sure JSONL records land on disk."""
from __future__ import annotations
import importlib
import json
import os
import sys
from pathlib import Path
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))
def test_track_writes_jsonl_record(monkeypatch, tmp_path):
monkeypatch.setenv("USAGE_LOG_DIR", str(tmp_path))
monkeypatch.setenv("USAGE_LOG_KEEP_DAYS", "90")
from ag_bids_mcp import usage
importlib.reload(usage)
with usage.track("best_local_bid", commodity="corn"):
pass
files = list(tmp_path.glob("usage-*.jsonl"))
assert len(files) == 1
line = files[0].read_text().strip().splitlines()[-1]
rec = json.loads(line)
assert rec["tool"] == "best_local_bid"
assert rec["commodity"] == "corn"
assert rec["ok"] is True
assert "elapsed_ms" in rec
def test_track_records_failure(monkeypatch, tmp_path):
monkeypatch.setenv("USAGE_LOG_DIR", str(tmp_path))
from ag_bids_mcp import usage
importlib.reload(usage)
import pytest
with pytest.raises(RuntimeError):
with usage.track("price_history", commodity="corn"):
raise RuntimeError("boom")
files = list(tmp_path.glob("usage-*.jsonl"))
rec = json.loads(files[0].read_text().splitlines()[-1])
assert rec["ok"] is False
assert rec["error_class"] == "RuntimeError"
assert "boom" in rec["error_msg"]
def test_track_no_log_dir_is_silent(monkeypatch):
monkeypatch.delenv("USAGE_LOG_DIR", raising=False)
from ag_bids_mcp import usage
importlib.reload(usage)
# Should not raise even though no log dir is set
with usage.track("noop"):
pass
def test_prune_removes_old_files(monkeypatch, tmp_path):
from datetime import datetime, timedelta, timezone
monkeypatch.setenv("USAGE_LOG_DIR", str(tmp_path))
monkeypatch.setenv("USAGE_LOG_KEEP_DAYS", "1")
from ag_bids_mcp import usage
importlib.reload(usage)
# Create a fake old log file
old_date = (datetime.now(timezone.utc) - timedelta(days=5)).date().isoformat()
old_file = tmp_path / f"usage-{old_date}.jsonl"
old_file.write_text('{"old": true}\n')
assert old_file.exists()
# Writing a new record triggers prune
with usage.track("ping"):
pass
assert not old_file.exists(), "expected old log file to be pruned"