Files
ag-bids-mcp/ag_bids_mcp/client.py
T
justin 3340747600 Add basis_movement/basis_detail tools; make history + latest fully filterable
New MCP tools:
- basis_movement: aggregated basis trend, one headline line per crop. The cheap
  "how is basis moving overall" view; optional commodity/source/delivery/days.
- basis_detail: per-(elevator, crop, delivery) basis first→last drill-down.

Both do the aggregation MCP-side and return compact markdown to keep token
burn low, so a client can call the cheap aggregate first and drill in only when
needed.

Flexibility/parity changes:
- price_history: commodity is now optional (spans all crops); groups by
  (source, commodity, delivery); surfaces basis first→last in the summary and
  adds a futures column to the raw table.
- latest_prices: expose the `kind` filter (grain/fertilizer) that the API and
  client already supported.
- client.history(): commodity optional.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-05-29 15:25:39 -04:00

88 lines
2.8 KiB
Python

"""Thin HTTPS client for ag-monitor's /api/data/* endpoints.
The MCP server doesn't talk to SQLite directly — it goes over HTTPS to the
ag-monitor service on a different LAN host. One X-API-Key header guards
every call. Failures bubble up as ``AgBidsError`` so tool functions can
turn them into user-friendly markdown.
"""
from __future__ import annotations
import logging
import os
from typing import Any
import httpx
logger = logging.getLogger(__name__)
class AgBidsError(RuntimeError):
    """Raised when a call to the ag-monitor API cannot be completed.

    The helpers in this module raise it for a missing API key, a transport
    failure, a non-200 response, or a response body that is not JSON.
    """
def _config() -> tuple[str, str, float]:
    """Read the connection settings from the environment.

    Environment variables consulted:
      * ``AG_BIDS_API_URL`` - base URL; defaults to
        ``https://agbids.paul.farm``. Trailing slashes are stripped so a
        path can be appended directly.
      * ``AG_BIDS_API_KEY`` - required; sent as the ``X-API-Key`` header.
      * ``AG_BIDS_API_TIMEOUT_SECS`` - request timeout in seconds;
        defaults to ``20``.

    Returns:
        A ``(url, key, timeout)`` tuple.

    Raises:
        AgBidsError: if the timeout value is not a valid number, or if the
            API key is unset or empty.
    """
    url = os.environ.get("AG_BIDS_API_URL", "https://agbids.paul.farm").rstrip("/")
    key = os.environ.get("AG_BIDS_API_KEY", "")
    raw_timeout = os.environ.get("AG_BIDS_API_TIMEOUT_SECS", "20")
    try:
        timeout = float(raw_timeout)
    except ValueError as e:
        # A malformed timeout is a configuration failure; report it through
        # the module's own error type instead of leaking a bare ValueError.
        raise AgBidsError(
            f"AG_BIDS_API_TIMEOUT_SECS is not a number: {raw_timeout!r}"
        ) from e
    if not key:
        raise AgBidsError("AG_BIDS_API_KEY is not set in the environment")
    return url, key, timeout
def _get(path: str, **params: Any) -> dict | list:
    """Issue an authenticated GET against the ag-monitor API and decode JSON.

    Args:
        path: Request path appended to the configured base URL.
        **params: Query-string parameters. Entries whose value is ``None``
            or an empty string are left out of the request.

    Returns:
        The decoded JSON body of the response.

    Raises:
        AgBidsError: on a transport-level failure, any status code other
            than 200, or a body that cannot be parsed as JSON.
    """
    base, api_key, timeout_secs = _config()
    endpoint = f"{base}{path}"

    # Only forward parameters that carry a value, so the upstream never
    # sees blank query-string entries.
    query: dict[str, Any] = {}
    for name, value in params.items():
        if value is not None and value != "":
            query[name] = value

    request_headers = {"X-API-Key": api_key, "Accept": "application/json"}
    try:
        response = httpx.get(
            endpoint,
            params=query,
            timeout=timeout_secs,
            headers=request_headers,
        )
    except httpx.RequestError as e:
        raise AgBidsError(f"GET {endpoint} failed: {e}") from e

    status = response.status_code
    if status != 200:
        # Truncate the body so an HTML error page doesn't flood the message.
        raise AgBidsError(
            f"GET {endpoint} returned {status}: {response.text[:200]}"
        )

    try:
        payload = response.json()
    except ValueError as e:
        raise AgBidsError(
            f"GET {endpoint} returned non-JSON: {response.text[:200]}"
        ) from e
    return payload
# ---- Public client API (mirrors /api/data/* shape) ----
def latest(commodity: str | None = None, source: str | None = None,
           delivery: str | None = None, kind: str | None = None) -> dict:
    """Query ``/api/data/latest``, optionally narrowed by the given filters.

    Each of ``commodity``, ``source``, ``delivery`` and ``kind`` is passed
    through as a query parameter of the same name.
    """
    filters = {
        "commodity": commodity,
        "source": source,
        "delivery": delivery,
        "kind": kind,
    }
    return _get("/api/data/latest", **filters)
def history(commodity: str | None = None, source_id: int | None = None,
            delivery: str | None = None, days: int = 30) -> dict:
    """Query ``/api/data/history``.

    ``commodity``, ``source_id`` and ``delivery`` are optional filters;
    ``days`` defaults to 30. All four are forwarded as query parameters of
    the same name.
    """
    query = {
        "commodity": commodity,
        "source_id": source_id,
        "delivery": delivery,
        "days": days,
    }
    return _get("/api/data/history", **query)
def best(commodity: str) -> dict:
    """Query ``/api/data/best`` for the given ``commodity``."""
    endpoint = "/api/data/best"
    return _get(endpoint, commodity=commodity)
def inputs(product: str | None = None) -> dict:
    """Query ``/api/data/inputs``, optionally filtered by ``product``."""
    endpoint = "/api/data/inputs"
    return _get(endpoint, product=product)
def sources() -> dict:
    """Query ``/api/data/sources`` with no parameters."""
    result = _get("/api/data/sources")
    return result
def deliveries(commodity: str) -> dict:
    """Query ``/api/data/deliveries`` for the given ``commodity``."""
    endpoint = "/api/data/deliveries"
    return _get(endpoint, commodity=commodity)
def todays_summary() -> dict:
    """Fetch the morning brief snapshot from ``/api/brief/snapshot``.

    This is the same blob the morning email digest uses.
    """
    snapshot = _get("/api/brief/snapshot", kind="morning")
    return snapshot