Files
justin 831c3e19de
CI / test (push) Successful in 19s
CI / build-push (push) Successful in 5s
latest_prices: include_expired passthrough for historical data (#2)
2026-06-08 19:45:03 -04:00

127 lines
4.5 KiB
Python

"""Thin HTTPS client for ag-monitor's /api/data/* endpoints.
The MCP server doesn't talk to SQLite directly — it goes over HTTPS to the
ag-monitor service on a different LAN host. One X-API-Key header guards
every call. Failures bubble up as ``AgBidsError`` so tool functions can
turn them into user-friendly markdown.
"""
from __future__ import annotations
import logging
import os
from typing import Any
import httpx
logger = logging.getLogger(__name__)
class AgBidsError(RuntimeError):
"""Anything went wrong calling ag-monitor."""
def _config() -> tuple[str, str, float]:
url = os.environ.get("AG_BIDS_API_URL", "https://agbids.paul.farm").rstrip("/")
key = os.environ.get("AG_BIDS_API_KEY", "")
timeout = float(os.environ.get("AG_BIDS_API_TIMEOUT_SECS", "20"))
if not key:
raise AgBidsError("AG_BIDS_API_KEY is not set in the environment")
return url, key, timeout
def _get(path: str, **params: Any) -> dict | list:
url_base, key, timeout = _config()
url = f"{url_base}{path}"
# Drop None values so the upstream doesn't see empty query strings.
clean: dict[str, Any] = {k: v for k, v in params.items() if v is not None and v != ""}
try:
r = httpx.get(url, params=clean, timeout=timeout,
headers={"X-API-Key": key, "Accept": "application/json"})
except httpx.RequestError as e:
raise AgBidsError(f"GET {url} failed: {e}") from e
if r.status_code != 200:
raise AgBidsError(
f"GET {url} returned {r.status_code}: {r.text[:200]}"
)
try:
return r.json()
except ValueError as e:
raise AgBidsError(f"GET {url} returned non-JSON: {r.text[:200]}") from e
# ---- Public client API (mirrors /api/data/* shape) ----
def latest(commodity: str | None = None, source: str | None = None,
delivery: str | None = None, kind: str | None = None,
zip: str | None = None, lat: float | None = None,
lng: float | None = None, radius_miles: float | None = None,
include_expired: bool = False) -> dict:
# Only send include_expired when True — _get drops None, so the default
# call keeps its existing query string (current + future months only).
return _get("/api/data/latest",
commodity=commodity, source=source, delivery=delivery, kind=kind,
zip=zip, lat=lat, lng=lng, radius_miles=radius_miles,
include_expired=True if include_expired else None)
def history(commodity: str | None = None, source_id: int | None = None,
delivery: str | None = None, days: int = 30) -> dict:
return _get("/api/data/history",
commodity=commodity, source_id=source_id,
delivery=delivery, days=days)
def best(commodity: str, zip: str | None = None, lat: float | None = None,
lng: float | None = None, radius_miles: float | None = None) -> dict:
return _get("/api/data/best", commodity=commodity,
zip=zip, lat=lat, lng=lng, radius_miles=radius_miles)
def price_trend(commodity: str, geo: str = "US", years: int = 10) -> dict:
return _get("/api/data/price-trend", commodity=commodity, geo=geo, years=years)
def price_series(commodity: str, geo: str = "US",
start_year: int | None = None, end_year: int | None = None) -> dict:
return _get("/api/data/price-series", commodity=commodity, geo=geo,
start_year=start_year, end_year=end_year)
def input_cost_trend(item: str, years: int = 10, geo: str | None = None) -> dict:
return _get("/api/data/input-cost-trend", item=item, years=years, geo=geo)
def input_cost_series(item: str, geo: str | None = None) -> dict:
return _get("/api/data/input-cost-series", item=item, geo=geo)
def input_cost_geographies(item: str) -> dict:
return _get("/api/data/input-cost-geographies", item=item)
def nutrient_cost(geo: str | None = None) -> dict:
return _get("/api/data/nutrient-cost", geo=geo)
def futures(commodity: str, delivery: str | None = None) -> dict:
return _get("/api/data/futures", commodity=commodity, delivery=delivery)
def inputs(product: str | None = None) -> dict:
return _get("/api/data/inputs", product=product)
def sources() -> dict:
return _get("/api/data/sources")
def deliveries(commodity: str) -> dict:
return _get("/api/data/deliveries", commodity=commodity)
def todays_summary() -> dict:
"""Brief snapshot — same blob the morning email digest uses."""
return _get("/api/brief/snapshot", kind="morning")