Files
justin 831c3e19de
CI / test (push) Successful in 19s
CI / build-push (push) Successful in 5s
latest_prices: include_expired passthrough for historical data (#2)
2026-06-08 19:45:03 -04:00

543 lines
21 KiB
Python

"""ag-bids MCP server.
Mirrors the zerto-docs-rag layout — FastMCP, ``@mcp.tool()`` decorators
returning markdown strings, dual stdio/streamable-http transport.
No in-container auth. The MCP's port 8000 isn't exposed outside the private
``mcp-servers_mcp`` Docker network; the only thing that can reach it is the
MetaMCP gateway, and MetaMCP handles auth at the gateway→client edge.
Run locally (stdio for Claude Desktop):
MCP_TRANSPORT=stdio python -m ag_bids_mcp.server
Run in a container (HTTP for MetaMCP upstream):
python -m ag_bids_mcp.server # default transport = streamable-http
"""
from __future__ import annotations
import argparse
import logging
import os
import sys
from typing import Annotated
from mcp.server.fastmcp import FastMCP
from pydantic import Field
from ag_bids_mcp import client, format as fmt
from ag_bids_mcp.usage import track
logging.basicConfig(
level=os.environ.get("LOG_LEVEL", "INFO"),
format="%(asctime)s %(levelname)s %(name)s %(message)s",
stream=sys.stderr,
)
log = logging.getLogger("ag-bids-mcp")
mcp = FastMCP("ag-bids", stateless_http=True)
VALID_GRAIN = {"corn", "soy", "wheat"}
VALID_INPUT = {"lime", "map", "potash"}
# ============================================================================
# Tools
# ============================================================================
def _location_error(zip: str | None, lat: float | None, lng: float | None) -> str | None:
"""Friendly guard for the (zip XOR gps) rule before hitting the API."""
if zip and (lat is not None or lng is not None):
return "Pass either `zip` or `lat`+`lng`, not both."
if (lat is None) != (lng is None):
return "GPS needs both `lat` and `lng`."
return None
@mcp.tool()
def best_local_bid(
commodity: Annotated[
str, Field(description="Grain to look up: 'corn', 'soy' (soybeans), or 'wheat'.")
],
zip: Annotated[
str | None,
Field(description="Your 5-digit ZIP — limits the search to nearby elevators."),
] = None,
lat: Annotated[
float | None, Field(description="Your latitude (use with `lng` instead of a ZIP).")
] = None,
lng: Annotated[float | None, Field(description="Your longitude (use with `lat`).")] = None,
radius_miles: Annotated[
float | None,
Field(description="Search radius in miles around the location (default 50). "
"Ignored unless a ZIP or lat/lng is given."),
] = None,
) -> str:
"""Return the highest local bid for *this calendar month's* delivery for
the given grain — the "where should I haul today" answer.
Pass a `zip` (or `lat`+`lng`) with optional `radius_miles` to restrict to
elevators near the farmer; results then show the distance to each. Without a
location it considers every scraped elevator. If nothing is in range, it
reports the nearest source and its distance (a coverage gap, not an error).
Note: cash-bid coverage is currently concentrated in Ohio."""
commodity = commodity.strip().lower()
with track("best_local_bid", commodity=commodity, zip=zip or "", radius=radius_miles or 0):
if commodity not in VALID_GRAIN:
return f"`commodity` must be one of: {sorted(VALID_GRAIN)}"
err = _location_error(zip, lat, lng)
if err:
return err
payload = client.best(commodity, zip=zip, lat=lat, lng=lng, radius_miles=radius_miles)
return fmt.fmt_best(commodity, payload)
@mcp.tool()
def futures_quote(
commodity: Annotated[
str, Field(description="Grain: 'corn', 'soy' (soybeans), or 'wheat'.")
],
delivery: Annotated[
str | None,
Field(description="Delivery month (e.g. 'Jul 2026') to resolve the listed "
"contract. Omit for the continuous nearby."),
] = None,
) -> str:
"""CBOT futures price and change for a commodity (optionally one delivery month).
Reports the latest price, today's session open, the prior day's close, and
both moves: change since the open and change on the day (vs the previous
settle). With `delivery` it resolves the listed contract that month prices
against; without it, the continuous nearby."""
cm = commodity.strip().lower()
with track("futures_quote", commodity=cm, delivery=delivery):
if cm not in VALID_GRAIN:
return f"`commodity` must be one of: {sorted(VALID_GRAIN)}"
payload = client.futures(commodity=cm, delivery=delivery)
return fmt.fmt_futures(payload)
@mcp.tool()
def current_lime_price() -> str:
"""Latest lime prices on file across all sources. Lime is rarely posted on
public bid pages — entries usually come from manual admin input."""
with track("current_lime_price"):
payload = client.inputs(product="lime")
return fmt.fmt_inputs(payload)
@mcp.tool()
def current_input_price(
product: Annotated[
str | None,
Field(description="One of: 'lime', 'map', 'potash'. Omit for all three."),
] = None,
) -> str:
"""Latest fertilizer / lime prices ($/ton)."""
p = product.strip().lower() if product else None
with track("current_input_price", product=p):
if p is not None and p not in VALID_INPUT:
return f"`product` must be one of: {sorted(VALID_INPUT)} (or omit)"
payload = client.inputs(product=p)
return fmt.fmt_inputs(payload)
@mcp.tool()
def latest_prices(
commodity: Annotated[
str | None,
Field(description="Filter to one commodity (corn / soy / wheat / map / potash / lime)."),
] = None,
source: Annotated[
str | None,
Field(description="Filter to one source by exact display name."),
] = None,
delivery: Annotated[
str | None,
Field(description="Filter to one delivery label (e.g. 'May 2026', 'Oct/Nov 2026')."),
] = None,
kind: Annotated[
str | None,
Field(description="Filter to one commodity kind: 'grain' or 'fertilizer'. Omit for all."),
] = None,
zip: Annotated[
str | None,
Field(description="Your 5-digit ZIP — keep only nearby sources, nearest first."),
] = None,
lat: Annotated[
float | None, Field(description="Your latitude (use with `lng` instead of a ZIP).")
] = None,
lng: Annotated[float | None, Field(description="Your longitude (use with `lat`).")] = None,
radius_miles: Annotated[
float | None,
Field(description="Search radius in miles around the location (default 50). "
"Ignored unless a ZIP or lat/lng is given."),
] = None,
include_expired: Annotated[
bool,
Field(description="Include past/expired delivery months. Default off — only "
"the current and future months are shown. Set true only when "
"the farmer explicitly asks for historical/past delivery data."),
] = False,
) -> str:
"""Snapshot of the latest scraped bid per (source, commodity, delivery).
Every filter is optional and AND'd together — pivot by elevator, crop,
delivery, or kind in any combination. Pass a `zip` (or `lat`+`lng`) with
optional `radius_miles` to keep only elevators near the farmer, sorted
nearest-first with the distance shown.
By default this shows current + future delivery months only; expired
months are rolled off. Set `include_expired=true` for a historical view."""
cm = commodity.strip().lower() if commodity else None
with track("latest_prices", commodity=cm, source=source, delivery=delivery, kind=kind,
zip=zip or "", radius=radius_miles or 0):
err = _location_error(zip, lat, lng)
if err:
return err
payload = client.latest(commodity=cm, source=source, delivery=delivery, kind=kind,
zip=zip, lat=lat, lng=lng, radius_miles=radius_miles,
include_expired=include_expired)
return fmt.fmt_latest(payload)
def _filter_source(payload: dict, source: str | None) -> dict:
"""Narrow a history payload's rows to one source display name, in place."""
if source:
payload["rows"] = [
r for r in payload.get("rows") or [] if r.get("source_name") == source
]
return payload
@mcp.tool()
def price_history(
commodity: Annotated[
str | None,
Field(description="Filter to one crop (corn / soy / wheat / map / potash / "
"lime). Omit to span every crop."),
] = None,
source: Annotated[
str | None,
Field(description="Filter to one elevator by exact display name. Omit for all."),
] = None,
delivery: Annotated[
str | None,
Field(description="Filter to one delivery label (e.g. 'Jul 2026'). Omit for all."),
] = None,
days: Annotated[
int, Field(ge=1, le=365, description="Lookback window in days.")
] = 30,
) -> str:
"""Price history per (elevator, crop, delivery), with every filter optional.
Pivot it however you like — one crop at one elevator, every elevator for one
crop, one delivery period across all crops, etc. Each series gets a ▲/▼ bid
trend plus its basis first→last; the raw bid/basis/futures points are
included when the window has fewer than ~60 samples."""
cm = commodity.strip().lower() if commodity else None
with track("price_history", commodity=cm, source=source, delivery=delivery, days=days):
payload = client.history(commodity=cm, delivery=delivery, days=days)
return fmt.fmt_history(_filter_source(payload, source))
@mcp.tool()
def basis_movement(
commodity: Annotated[
str | None,
Field(description="Filter to one grain (corn / soy / wheat). Omit to span all grains."),
] = None,
source: Annotated[
str | None,
Field(description="Filter to one elevator by exact display name. Omit for all."),
] = None,
delivery: Annotated[
str | None,
Field(description="Filter to one delivery label (e.g. 'Jul 2026'). Omit for all."),
] = None,
days: Annotated[
int, Field(ge=1, le=365, description="Lookback window in days.")
] = 30,
) -> str:
"""Aggregated basis trend — one headline line per crop (the cheap overview).
Rolls every matching elevator/delivery series up to a single average basis
first→last per crop, with how far it moved (positive = cash strengthened vs
futures). Use this for 'how is basis moving overall', optionally narrowed to
a crop and/or elevator. For the per-elevator breakdown use basis_detail."""
cm = commodity.strip().lower() if commodity else None
with track("basis_movement", commodity=cm, source=source, delivery=delivery, days=days):
if cm is not None and cm not in VALID_GRAIN:
return f"`commodity` must be one of {sorted(VALID_GRAIN)} (or omit for all grains)"
payload = client.history(commodity=cm, delivery=delivery, days=days)
return fmt.fmt_basis_movement(_filter_source(payload, source))
@mcp.tool()
def basis_detail(
commodity: Annotated[
str | None,
Field(description="Filter to one grain (corn / soy / wheat). Omit to span all grains."),
] = None,
source: Annotated[
str | None,
Field(description="Filter to one elevator by exact display name. Omit for all."),
] = None,
delivery: Annotated[
str | None,
Field(description="Filter to one delivery label (e.g. 'Jul 2026'). Omit for all."),
] = None,
days: Annotated[
int, Field(ge=1, le=365, description="Lookback window in days.")
] = 30,
) -> str:
"""Per-(elevator, crop, delivery) basis trend — the drill-down for basis_movement.
One compact row per series: basis first→last over the window and how far it
moved. All filters optional, so you can scope to one crop, one elevator, one
delivery, or any combination."""
cm = commodity.strip().lower() if commodity else None
with track("basis_detail", commodity=cm, source=source, delivery=delivery, days=days):
if cm is not None and cm not in VALID_GRAIN:
return f"`commodity` must be one of {sorted(VALID_GRAIN)} (or omit for all grains)"
payload = client.history(commodity=cm, delivery=delivery, days=days)
return fmt.fmt_basis_detail(_filter_source(payload, source))
@mcp.tool()
def price_trend(
commodity: Annotated[
str, Field(description="Grain: 'corn', 'soy', or 'wheat'.")
],
geo: Annotated[
str,
Field(description="'US' or a 2-letter state (e.g. 'OH', 'IA'). Default US."),
] = "US",
years: Annotated[
int, Field(ge=1, le=120, description="Baseline window for the seasonal normal/percentile."),
] = 10,
) -> str:
"""USDA NASS monthly price *received by farmers* ($/bu) with the change.
Returns the latest real price plus month-over-month and year-over-year moves
(dollars + percent) and where it sits seasonally (percentile vs the same
month over the last N years, normal, and range). This is the macro/seasonal
benchmark to compare local cash bids against — corn history goes back to 1908."""
cm = commodity.strip().lower()
g = geo.strip().upper()
with track("price_trend", commodity=cm, geo=g, years=years):
if cm not in VALID_GRAIN:
return f"`commodity` must be one of: {sorted(VALID_GRAIN)}"
return fmt.fmt_price_trend(client.price_trend(commodity=cm, geo=g, years=years))
@mcp.tool()
def price_series(
commodity: Annotated[
str, Field(description="Grain: 'corn', 'soy', or 'wheat'.")
],
geo: Annotated[
str, Field(description="'US' or a 2-letter state. Default US."),
] = "US",
start_year: Annotated[
int | None, Field(description="Optional first year (default: ~last 5 yrs of the full series).")
] = None,
end_year: Annotated[int | None, Field(description="Optional last year.")] = None,
) -> str:
"""Raw monthly price-received series ($/bu) for charting / drill-down.
Use price_trend for the headline; use this when you need the actual monthly
numbers. Defaults to the recent window unless you pass a start_year."""
cm = commodity.strip().lower()
g = geo.strip().upper()
with track("price_series", commodity=cm, geo=g):
if cm not in VALID_GRAIN:
return f"`commodity` must be one of: {sorted(VALID_GRAIN)}"
# Default to a readable recent window if no range given.
if start_year is None and end_year is None:
from datetime import datetime
start_year = datetime.utcnow().year - 5
payload = client.price_series(commodity=cm, geo=g, start_year=start_year, end_year=end_year)
return fmt.fmt_price_series(payload)
# Fuel is national (US); fertilizer is regional ($/ton, USDA AgTransport).
VALID_INPUTS = {"diesel", "urea", "uan", "anhydrous", "dap", "map", "potash"}
_FERT_INPUTS = {"urea", "uan", "anhydrous", "dap", "map", "potash"}
@mcp.tool()
def input_cost_trend(
item: Annotated[
str,
Field(description="Input to price: 'diesel' (U.S. retail $/gal) or a fertilizer "
"($/ton) — 'urea', 'uan', 'anhydrous', 'dap', 'map', 'potash'."),
],
geo: Annotated[
str | None,
Field(description="Fertilizer region (default 'Cornbelt'). Other regions: "
"'U.S. Gulf NOLA', 'Northern Plains', 'Southern Plains', "
"'Southeast', 'Northeast', 'California', 'Pacific Northwest', "
"'South Central', 'Central Florida', 'Tampa'. Ignored for diesel (US)."),
] = None,
years: Annotated[
int, Field(ge=1, le=120, description="Baseline window for the seasonal normal/percentile."),
] = 10,
) -> str:
"""Real input cost with the change — retail diesel ($/gal) or fertilizer ($/ton).
Latest real price + period-over-period and year-over-year moves + seasonal
percentile/range. Diesel is U.S. weekly (back to 1994); fertilizer is monthly
regional (USDA AgTransport, Cornbelt default, back to 2023). This is the
input-cost side for the advisor. For a one-off current fertilizer quote from a
local dealer feed, current_input_price (DTN) still applies."""
it = item.strip().lower()
g = geo.strip() if geo else None
with track("input_cost_trend", item=it, geo=g or "", years=years):
if it not in VALID_INPUTS:
return f"`item` must be one of: {sorted(VALID_INPUTS)}"
return fmt.fmt_price_trend(client.input_cost_trend(item=it, years=years, geo=g))
@mcp.tool()
def input_cost_series(
item: Annotated[
str, Field(description="Input: 'diesel' or a fertilizer ('urea', 'uan', "
"'anhydrous', 'dap', 'map', 'potash').")],
geo: Annotated[
str | None,
Field(description="Fertilizer region (default 'Cornbelt'). Ignored for diesel."),
] = None,
) -> str:
"""Raw historical series for a tracked input cost (diesel $/gal or fertilizer $/ton)."""
it = item.strip().lower()
g = geo.strip() if geo else None
with track("input_cost_series", item=it, geo=g or ""):
if it not in VALID_INPUTS:
return f"`item` must be one of: {sorted(VALID_INPUTS)}"
return fmt.fmt_price_series(client.input_cost_series(item=it, geo=g))
@mcp.tool()
def nutrient_cost(
geo: Annotated[
str | None,
Field(description="Fertilizer region (default 'Cornbelt'). Same regions as "
"input_cost_trend (e.g. 'U.S. Gulf NOLA', 'Northern Plains')."),
] = None,
) -> str:
"""Cheapest fertilizer per POUND of nutrient — "what's the best value for N (or P/K)?".
Converts each fertilizer's regional $/ton (USDA AgTransport, latest month) into
$/lb of N, P2O5, and K2O from its grade, ranks them, and names the cheapest
source of each nutrient (anhydrous usually wins on N; DAP/MAP are phosphate
buys; potash is the K source). Use THIS — not input_cost_trend ($/ton only) —
whenever the grower asks which fertilizer is the best buy / best nitrogen value.
UAN grade is assumed 32-0-0."""
g = geo.strip() if geo else None
with track("nutrient_cost", geo=g or ""):
return fmt.fmt_nutrient_cost(client.nutrient_cost(geo=g))
@mcp.tool()
def list_sources() -> str:
"""All active scrapers + their last-success timestamps and any pending failures."""
with track("list_sources"):
payload = client.sources()
return fmt.fmt_sources(payload)
@mcp.tool()
def list_commodities() -> str:
"""The complete set of commodities tracked by ag-monitor."""
with track("list_commodities"):
return fmt.fmt_commodities()
@mcp.tool()
def list_deliveries(
commodity: Annotated[
str, Field(description="Commodity whose posted delivery labels you want.")
],
) -> str:
"""All posted delivery labels for a commodity, sorted chronologically."""
cm = commodity.strip().lower()
with track("list_deliveries", commodity=cm):
payload = client.deliveries(cm)
return fmt.fmt_deliveries(payload)
@mcp.tool()
def source_health() -> str:
"""Operational status of every source: healthy, stale, or down."""
with track("source_health"):
payload = client.sources()
return fmt.fmt_health(payload)
@mcp.tool()
def todays_summary() -> str:
"""Today's market snapshot — same blob used by the morning email brief.
Includes CBOT corn + soy continuous futures vs the previous trading
day's close, and the best local bid for each commodity's current-month
delivery."""
with track("todays_summary"):
payload = client.todays_summary()
return fmt.fmt_summary(payload)
# ============================================================================
# Entry point
# ============================================================================
def main() -> None:
p = argparse.ArgumentParser()
p.add_argument(
"--transport",
default=os.environ.get("MCP_TRANSPORT", "stdio"),
choices=["stdio", "streamable-http", "sse"],
)
p.add_argument("--host", default=os.environ.get("MCP_HOST", "0.0.0.0"))
p.add_argument("--port", type=int, default=int(os.environ.get("MCP_PORT", "8000")))
args = p.parse_args()
if args.transport == "stdio":
log.info("starting ag-bids MCP on stdio")
mcp.run()
return
# Same DNS-rebinding logic as zerto-docs-rag: behind a Docker DNS name
# like "ag-bids-mcp:8000", FastMCP's default localhost-only check would
# 421 every request.
allowed_hosts = os.environ.get("MCP_ALLOWED_HOSTS")
allowed_origins = os.environ.get("MCP_ALLOWED_ORIGINS")
if (
os.environ.get("MCP_DISABLE_DNS_REBINDING_PROTECTION") in {"1", "true", "yes"}
or allowed_hosts == "*"
or allowed_origins == "*"
):
mcp.settings.transport_security.enable_dns_rebinding_protection = False
else:
if allowed_hosts:
mcp.settings.transport_security.allowed_hosts = [
h.strip() for h in allowed_hosts.split(",") if h.strip()
]
if allowed_origins:
mcp.settings.transport_security.allowed_origins = [
o.strip() for o in allowed_origins.split(",") if o.strip()
]
mcp.settings.host = args.host
mcp.settings.port = args.port
log.info("starting ag-bids MCP on %s://%s:%s", args.transport, args.host, args.port)
mcp.run(transport=args.transport)
if __name__ == "__main__":
main()