"""ag-bids MCP server. Mirrors the zerto-docs-rag layout — FastMCP, ``@mcp.tool()`` decorators returning markdown strings, dual stdio/streamable-http transport. No in-container auth. The MCP's port 8000 isn't exposed outside the private ``mcp-servers_mcp`` Docker network; the only thing that can reach it is the MetaMCP gateway, and MetaMCP handles auth at the gateway→client edge. Run locally (stdio for Claude Desktop): MCP_TRANSPORT=stdio python -m ag_bids_mcp.server Run in a container (HTTP for MetaMCP upstream): python -m ag_bids_mcp.server # default transport = streamable-http """ from __future__ import annotations import argparse import logging import os import sys from typing import Annotated from mcp.server.fastmcp import FastMCP from pydantic import Field from ag_bids_mcp import client, format as fmt from ag_bids_mcp.usage import track logging.basicConfig( level=os.environ.get("LOG_LEVEL", "INFO"), format="%(asctime)s %(levelname)s %(name)s %(message)s", stream=sys.stderr, ) log = logging.getLogger("ag-bids-mcp") mcp = FastMCP("ag-bids", stateless_http=True) VALID_GRAIN = {"corn", "soy", "wheat"} VALID_INPUT = {"lime", "map", "potash"} # ============================================================================ # Tools # ============================================================================ @mcp.tool() def best_local_bid( commodity: Annotated[ str, Field(description="Grain to look up: 'corn', 'soy' (soybeans), or 'wheat'.") ], ) -> str: """Return the highest local bid for *this calendar month's* delivery for the given grain. This is the "where should I haul today" answer.""" commodity = commodity.strip().lower() with track("best_local_bid", commodity=commodity): if commodity not in VALID_GRAIN: return f"`commodity` must be one of: {sorted(VALID_GRAIN)}" payload = client.best(commodity) return fmt.fmt_best(commodity, payload) @mcp.tool() def futures_quote( commodity: Annotated[ str, Field(description="Grain: 'corn', 'soy' (soybeans), or 'wheat'.") ], delivery: Annotated[ str | None, Field(description="Delivery month (e.g. 'Jul 2026') to resolve the listed " "contract. Omit for the continuous nearby."), ] = None, ) -> str: """CBOT futures price and change for a commodity (optionally one delivery month). Reports the latest price, today's session open, the prior day's close, and both moves: change since the open and change on the day (vs the previous settle). With `delivery` it resolves the listed contract that month prices against; without it, the continuous nearby.""" cm = commodity.strip().lower() with track("futures_quote", commodity=cm, delivery=delivery): if cm not in VALID_GRAIN: return f"`commodity` must be one of: {sorted(VALID_GRAIN)}" payload = client.futures(commodity=cm, delivery=delivery) return fmt.fmt_futures(payload) @mcp.tool() def current_lime_price() -> str: """Latest lime prices on file across all sources. Lime is rarely posted on public bid pages — entries usually come from manual admin input.""" with track("current_lime_price"): payload = client.inputs(product="lime") return fmt.fmt_inputs(payload) @mcp.tool() def current_input_price( product: Annotated[ str | None, Field(description="One of: 'lime', 'map', 'potash'. Omit for all three."), ] = None, ) -> str: """Latest fertilizer / lime prices ($/ton).""" p = product.strip().lower() if product else None with track("current_input_price", product=p): if p is not None and p not in VALID_INPUT: return f"`product` must be one of: {sorted(VALID_INPUT)} (or omit)" payload = client.inputs(product=p) return fmt.fmt_inputs(payload) @mcp.tool() def latest_prices( commodity: Annotated[ str | None, Field(description="Filter to one commodity (corn / soy / wheat / map / potash / lime)."), ] = None, source: Annotated[ str | None, Field(description="Filter to one source by exact display name."), ] = None, delivery: Annotated[ str | None, Field(description="Filter to one delivery label (e.g. 'May 2026', 'Oct/Nov 2026')."), ] = None, kind: Annotated[ str | None, Field(description="Filter to one commodity kind: 'grain' or 'fertilizer'. Omit for all."), ] = None, ) -> str: """Snapshot of the latest scraped bid per (source, commodity, delivery). Every filter is optional and AND'd together — pivot by elevator, crop, delivery, or kind in any combination.""" cm = commodity.strip().lower() if commodity else None with track("latest_prices", commodity=cm, source=source, delivery=delivery, kind=kind): payload = client.latest(commodity=cm, source=source, delivery=delivery, kind=kind) return fmt.fmt_latest(payload) def _filter_source(payload: dict, source: str | None) -> dict: """Narrow a history payload's rows to one source display name, in place.""" if source: payload["rows"] = [ r for r in payload.get("rows") or [] if r.get("source_name") == source ] return payload @mcp.tool() def price_history( commodity: Annotated[ str | None, Field(description="Filter to one crop (corn / soy / wheat / map / potash / " "lime). Omit to span every crop."), ] = None, source: Annotated[ str | None, Field(description="Filter to one elevator by exact display name. Omit for all."), ] = None, delivery: Annotated[ str | None, Field(description="Filter to one delivery label (e.g. 'Jul 2026'). Omit for all."), ] = None, days: Annotated[ int, Field(ge=1, le=365, description="Lookback window in days.") ] = 30, ) -> str: """Price history per (elevator, crop, delivery), with every filter optional. Pivot it however you like — one crop at one elevator, every elevator for one crop, one delivery period across all crops, etc. Each series gets a ▲/▼ bid trend plus its basis first→last; the raw bid/basis/futures points are included when the window has fewer than ~60 samples.""" cm = commodity.strip().lower() if commodity else None with track("price_history", commodity=cm, source=source, delivery=delivery, days=days): payload = client.history(commodity=cm, delivery=delivery, days=days) return fmt.fmt_history(_filter_source(payload, source)) @mcp.tool() def basis_movement( commodity: Annotated[ str | None, Field(description="Filter to one grain (corn / soy / wheat). Omit to span all grains."), ] = None, source: Annotated[ str | None, Field(description="Filter to one elevator by exact display name. Omit for all."), ] = None, delivery: Annotated[ str | None, Field(description="Filter to one delivery label (e.g. 'Jul 2026'). Omit for all."), ] = None, days: Annotated[ int, Field(ge=1, le=365, description="Lookback window in days.") ] = 30, ) -> str: """Aggregated basis trend — one headline line per crop (the cheap overview). Rolls every matching elevator/delivery series up to a single average basis first→last per crop, with how far it moved (positive = cash strengthened vs futures). Use this for 'how is basis moving overall', optionally narrowed to a crop and/or elevator. For the per-elevator breakdown use basis_detail.""" cm = commodity.strip().lower() if commodity else None with track("basis_movement", commodity=cm, source=source, delivery=delivery, days=days): if cm is not None and cm not in VALID_GRAIN: return f"`commodity` must be one of {sorted(VALID_GRAIN)} (or omit for all grains)" payload = client.history(commodity=cm, delivery=delivery, days=days) return fmt.fmt_basis_movement(_filter_source(payload, source)) @mcp.tool() def basis_detail( commodity: Annotated[ str | None, Field(description="Filter to one grain (corn / soy / wheat). Omit to span all grains."), ] = None, source: Annotated[ str | None, Field(description="Filter to one elevator by exact display name. Omit for all."), ] = None, delivery: Annotated[ str | None, Field(description="Filter to one delivery label (e.g. 'Jul 2026'). Omit for all."), ] = None, days: Annotated[ int, Field(ge=1, le=365, description="Lookback window in days.") ] = 30, ) -> str: """Per-(elevator, crop, delivery) basis trend — the drill-down for basis_movement. One compact row per series: basis first→last over the window and how far it moved. All filters optional, so you can scope to one crop, one elevator, one delivery, or any combination.""" cm = commodity.strip().lower() if commodity else None with track("basis_detail", commodity=cm, source=source, delivery=delivery, days=days): if cm is not None and cm not in VALID_GRAIN: return f"`commodity` must be one of {sorted(VALID_GRAIN)} (or omit for all grains)" payload = client.history(commodity=cm, delivery=delivery, days=days) return fmt.fmt_basis_detail(_filter_source(payload, source)) @mcp.tool() def list_sources() -> str: """All active scrapers + their last-success timestamps and any pending failures.""" with track("list_sources"): payload = client.sources() return fmt.fmt_sources(payload) @mcp.tool() def list_commodities() -> str: """The complete set of commodities tracked by ag-monitor.""" with track("list_commodities"): return fmt.fmt_commodities() @mcp.tool() def list_deliveries( commodity: Annotated[ str, Field(description="Commodity whose posted delivery labels you want.") ], ) -> str: """All posted delivery labels for a commodity, sorted chronologically.""" cm = commodity.strip().lower() with track("list_deliveries", commodity=cm): payload = client.deliveries(cm) return fmt.fmt_deliveries(payload) @mcp.tool() def source_health() -> str: """Operational status of every source: healthy, stale, or down.""" with track("source_health"): payload = client.sources() return fmt.fmt_health(payload) @mcp.tool() def todays_summary() -> str: """Today's market snapshot — same blob used by the morning email brief. Includes CBOT corn + soy continuous futures vs the previous trading day's close, and the best local bid for each commodity's current-month delivery.""" with track("todays_summary"): payload = client.todays_summary() return fmt.fmt_summary(payload) # ============================================================================ # Entry point # ============================================================================ def main() -> None: p = argparse.ArgumentParser() p.add_argument( "--transport", default=os.environ.get("MCP_TRANSPORT", "stdio"), choices=["stdio", "streamable-http", "sse"], ) p.add_argument("--host", default=os.environ.get("MCP_HOST", "0.0.0.0")) p.add_argument("--port", type=int, default=int(os.environ.get("MCP_PORT", "8000"))) args = p.parse_args() if args.transport == "stdio": log.info("starting ag-bids MCP on stdio") mcp.run() return # Same DNS-rebinding logic as zerto-docs-rag: behind a Docker DNS name # like "ag-bids-mcp:8000", FastMCP's default localhost-only check would # 421 every request. allowed_hosts = os.environ.get("MCP_ALLOWED_HOSTS") allowed_origins = os.environ.get("MCP_ALLOWED_ORIGINS") if ( os.environ.get("MCP_DISABLE_DNS_REBINDING_PROTECTION") in {"1", "true", "yes"} or allowed_hosts == "*" or allowed_origins == "*" ): mcp.settings.transport_security.enable_dns_rebinding_protection = False else: if allowed_hosts: mcp.settings.transport_security.allowed_hosts = [ h.strip() for h in allowed_hosts.split(",") if h.strip() ] if allowed_origins: mcp.settings.transport_security.allowed_origins = [ o.strip() for o in allowed_origins.split(",") if o.strip() ] mcp.settings.host = args.host mcp.settings.port = args.port log.info("starting ag-bids MCP on %s://%s:%s", args.transport, args.host, args.port) mcp.run(transport=args.transport) if __name__ == "__main__": main()