Files
ag-bids-mcp/ag_bids_mcp/format.py
T
justin ffc705f485
CI / test (push) Successful in 17s
CI / build-push (push) Successful in 5s
Show source location (city/state/zip) in list_sources output
/api/data/sources now returns per-source geo; surface it as a Location column
in the sources table.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-05-30 11:07:34 -04:00

413 lines
15 KiB
Python

"""JSON-from-ag-monitor → markdown helpers.
One function per @mcp.tool. Markdown is what FastMCP tools return; Claude /
OpenWebUI render it nicely. Cents → dollars formatting is centralized here so
all tools use the same precision (4 decimals for $/bu, 2 decimals for $/ton).
"""
from __future__ import annotations
from typing import Optional
def _bu(cents: Optional[int]) -> str:
if cents is None:
return "—"
return f"${cents / 100:.4f}"
def _ton(cents: Optional[int]) -> str:
if cents is None:
return "—"
return f"${cents / 100:.2f}"
def _basis(cents: Optional[int]) -> str:
if cents is None:
return "—"
sign = "+" if cents >= 0 else ""
return f"{sign}{cents / 100:.2f}"
def _delta_arrow(cents: Optional[int]) -> str:
if cents is None or cents == 0:
return "—"
return "▲" if cents > 0 else "▼"
# Commodity keys treated as grain. The basis-movement formatters keep only
# rows whose "commodity" value is one of these.
GRAIN = ("corn", "soy", "wheat")
def _basis_move(delta_cents: Optional[int]) -> str:
"""Describe a basis change: positive = cash strengthened vs futures."""
if delta_cents is None:
return "—"
if delta_cents == 0:
return "→ flat"
arrow = "▲" if delta_cents > 0 else "▼"
word = "stronger" if delta_cents > 0 else "weaker"
return f"{arrow} {_basis(delta_cents)} ({word})"
def _build_series(rows: list[dict], default_commodity: Optional[str] = None) -> dict:
"""Group history rows into ordered per-(source, commodity, delivery) lists.
Rows arrive ordered by fetched_at ASC, so each series list stays in time
order. `default_commodity` backfills rows that don't carry their own (older
payload shape / single-commodity queries)."""
series: dict[tuple, list[dict]] = {}
for r in rows:
com = r.get("commodity") or default_commodity or "?"
series.setdefault((r.get("source_name"), com, r.get("delivery")), []).append(r)
return series
def _first_last(points: list[dict], field: str):
"""First and last non-null values of `field` across an ordered point list."""
vals = [p.get(field) for p in points if p.get(field) is not None]
if not vals:
return None, None
return vals[0], vals[-1]
# ---------- best_local_bid ----------
def fmt_best(commodity: str, payload: dict) -> str:
    """Render the best current bid for `commodity` as a markdown snippet.

    Reads `payload["best"]` (the winning bid record) and `payload["today"]`.
    When there is no best bid a "no bids posted" message is returned under
    the same heading.
    """
    heading = f"### Best place to sell {commodity} today ({payload.get('today')})\n\n"
    top = payload.get("best")
    if not top:
        return heading + f"No current-month {commodity} bids posted across the tracked sources.\n"

    contract = top.get("futures_contract") or "—"
    fetched = top.get("fetched_at") or "?"
    return (
        heading
        + f"**{top['source_name']}** — delivery **{top['delivery']}** — "
        + f"bid **{_bu(top['bid_cents'])}/bu** (basis {_basis(top.get('basis_cents'))}, "
        + f"futures {contract})\n\n"
        + f"_Fetched {fetched}_\n"
    )
# ---------- futures quote + change ----------
def fmt_futures(payload: dict) -> str:
    """Render a CBOT futures quote and its intraday/daily change as markdown.

    The heading names the commodity and contract, followed by the delivery
    label or "(continuous nearby)" when no delivery is given. Open and
    previous close are listed only when present; the two change lines are
    always listed, with an explanatory placeholder when the value is missing.
    """
    delivery = payload.get("delivery")
    suffix = f" ({delivery})" if delivery else " (continuous nearby)"
    heading = f"### CBOT futures — {payload.get('commodity')} {payload.get('contract')}{suffix}"

    quote = payload.get("quote")
    if not quote:
        return f"{heading}\n\nNo futures quote on file yet for this contract.\n"

    out = [
        heading,
        "",
        f"- **Last**: {_bu(quote.get('last_cents'))} "
        f"_(settle/last for {quote.get('settle_date') or '?'})_",
    ]

    # Optional price levels: shown only when the quote carries them.
    for label, key in (("Open", "open_cents"), ("Prev close", "prev_close_cents")):
        level = quote.get(key)
        if level is not None:
            out.append(f"- **{label}**: {_bu(level)}")

    # Change lines: always shown, with a reason when the figure is unavailable.
    for label, key, reason in (
        ("Change since open", "change_since_open_cents", "no open captured yet"),
        ("Change on day", "change_on_day_cents", "no prior settle yet"),
    ):
        change = quote.get(key)
        if change is None:
            out.append(f"- **{label}**: — ({reason})")
        else:
            out.append(f"- **{label}**: {_delta_arrow(change)} {_basis(change)}")

    return "\n".join(out) + "\n"
# ---------- inputs / fertilizer ----------
def fmt_inputs(payload: dict) -> str:
    """Render input (fertilizer / lime) prices as a markdown table.

    When `payload["product"]` is set the heading is that product upper-cased;
    otherwise a generic fertilizer + lime heading is used. With no rows a
    short "no prices on file" message is returned instead of a table.
    """
    rows = payload.get("rows") or []
    product = payload.get("product")
    if product:
        title = f"### {product.upper()} prices"
    else:
        title = "### Fertilizer + lime prices"

    if not rows:
        return f"{title}\n\nNo {product or 'any tracked input'} prices on file.\n"

    table = [
        title,
        "",
        "| Source | Product | Delivery | Price ($/ton) | Fetched |",
        "|---|---|---|---:|---|",
    ]
    table.extend(
        f"| {row['source_name']} | {row.get('display_name') or row['commodity']} | "
        f"{row['delivery']} | {_ton(row.get('bid_cents'))} | {row.get('fetched_at') or '?'} |"
        for row in rows
    )
    return "\n".join(table) + "\n"
# ---------- latest snapshot ----------
def fmt_latest(payload: dict) -> str:
    """Render the latest price snapshot as a markdown table.

    One table row per entry in `payload["rows"]`. Rows whose
    `commodity_kind` is "fertilizer" are priced with the $/ton formatter; all
    others use the $/bu formatter.
    """
    rows = payload.get("rows") or []
    if not rows:
        return "### Latest prices\n\nNo rows match those filters.\n"

    table = [
        "### Latest prices",
        "",
        "| Source | Commodity | Delivery | Bid | Basis | Futures | Fetched |",
        "|---|---|---|---:|---:|---|---|",
    ]
    for row in rows:
        label = row.get("display_name") or row["commodity"]
        if row.get("commodity_kind") == "fertilizer":
            price = _ton(row.get("bid_cents"))
        else:
            price = _bu(row.get("bid_cents"))
        contract = row.get("futures_contract") or "—"
        fetched = row.get("fetched_at") or "?"
        table.append(
            f"| {row['source_name']} | {label} | "
            f"{row['delivery']} | {price} | "
            f"{_basis(row.get('basis_cents'))} | {contract} | "
            f"{fetched} |"
        )
    return "\n".join(table) + "\n"
# ---------- price history ----------
def fmt_history(payload: dict, max_rows: int = 60) -> str:
    """Render price history as per-series trend bullets plus an optional raw table.

    Args:
        payload: history response; reads "rows", "commodity" and "days".
            Rows are grouped into (source, commodity, delivery) series.
        max_rows: the raw sample table is appended only when the total number
            of samples does not exceed this value.

    Returns:
        Markdown text. Each series gets one bullet showing sample count, first
        and last bid with the net change, and (when basis is on file) first
        and last basis with the net move.
    """
    rows = payload.get("rows") or []
    commodity = payload.get("commodity")
    days = payload.get("days")
    scope = commodity or "all crops"
    if not rows:
        return f"### Price history — {scope} ({days}d)\n\nNo samples in the window.\n"

    # Per (source, commodity, delivery) trend annotation: first vs last sample.
    series = _build_series(rows, default_commodity=commodity)

    def _sort_key(item):
        # Source and delivery come from .get() and may be None; comparing None
        # with str raises TypeError, so order on a string form instead.
        return tuple("" if part is None else str(part) for part in item[0])

    lines = [f"### Price history — {scope} — last {days} days", ""]
    for (src, com, dlv), pts in sorted(series.items(), key=_sort_key):
        b_first, b_last = _first_last(pts, "bid_cents")
        delta = (b_last - b_first) if (b_first is not None and b_last is not None) else None
        # An explicit arrow separates first and last so the two prices cannot
        # run together into one unreadable token.
        bid_part = f"{_bu(b_first)} → {_bu(b_last)} {_delta_arrow(delta)}"
        if delta is not None:
            bid_part += f" {_basis(delta)}"

        bz_first, bz_last = _first_last(pts, "basis_cents")
        basis_part = ""
        if bz_first is not None:
            basis_part = (f" · basis {_basis(bz_first)} → {_basis(bz_last)} "
                          f"{_basis_move(bz_last - bz_first)}")
        lines.append(
            f"- **{src}** / {com} / {dlv}: {len(pts)} samples · {bid_part}{basis_part}"
        )

    # If the history is shallow include the raw rows too (helpful for charts).
    # Every row belongs to exactly one series, so the total is just len(rows).
    if len(rows) <= max_rows:
        lines.extend([
            "",
            "| Time | Source | Commodity | Delivery | Bid | Basis | Futures |",
            "|---|---|---|---|---:|---:|---:|",
        ])
        for r in rows:
            row_commodity = r.get("commodity") or commodity or "?"
            lines.append(
                f"| {r['fetched_at']} | {r['source_name']} | {row_commodity} | {r['delivery']} | "
                f"{_bu(r.get('bid_cents'))} | {_basis(r.get('basis_cents'))} | "
                f"{_bu(r.get('futures_cents'))} |"
            )
    return "\n".join(lines) + "\n"
# ---------- basis movement ----------
def fmt_basis_movement(payload: dict) -> str:
    """Aggregated basis trend per commodity (the cheap, headline view).

    Rolls every matching (source, delivery) series up to one line per crop:
    average basis first → last across the window and how far it moved. Skips
    non-grain rows and series with no basis on file.

    Args:
        payload: history response; reads "rows" and "days".

    Returns:
        Markdown text with one bullet per commodity, or a short message when
        there are no grain rows or none of the series carries basis data.
    """
    rows = [r for r in (payload.get("rows") or []) if r.get("commodity") in GRAIN]
    days = payload.get("days")
    if not rows:
        return f"### Basis movement — last {days} days\n\nNo grain basis samples in the window.\n"

    agg: dict[str, dict] = {}
    for (src, com, _dlv), pts in _build_series(rows).items():
        first, last = _first_last(pts, "basis_cents")
        if first is None:
            continue
        a = agg.setdefault(com, {"firsts": [], "lasts": [], "elevators": set(), "series": 0})
        a["firsts"].append(first)
        a["lasts"].append(last)
        a["elevators"].add(src)
        a["series"] += 1
    if not agg:
        return f"### Basis movement — last {days} days\n\nNo basis data on the matching series.\n"

    lines = [f"### Basis movement — last {days} days", ""]
    for com in sorted(agg):
        a = agg[com]
        # Averages are rounded to whole cents before formatting.
        avg_first = round(sum(a["firsts"]) / len(a["firsts"]))
        avg_last = round(sum(a["lasts"]) / len(a["lasts"]))
        # The arrow keeps the two signed values apart; without it a pair such
        # as -0.25 and -0.20 reads as a single garbled number.
        lines.append(
            f"- **{com}**: avg basis {_basis(avg_first)} → {_basis(avg_last)} "
            f"{_basis_move(avg_last - avg_first)} · "
            f"{len(a['elevators'])} elevators, {a['series']} series"
        )
    return "\n".join(lines) + "\n"
def fmt_basis_detail(payload: dict, max_rows: int = 80) -> str:
    """Per-(elevator, crop, delivery) basis trend — the drill-down view.

    One row per series: basis first → last and how far it moved. Done MCP-side
    so the caller gets a compact table instead of every raw sample.

    Args:
        payload: history response; reads "rows" and "days". Non-grain rows
            are ignored.
        max_rows: table rows stop being added once this many have been
            emitted.

    Returns:
        Markdown table sorted by commodity, elevator, then delivery, or a
        short message when nothing qualifies.
    """
    rows = [r for r in (payload.get("rows") or []) if r.get("commodity") in GRAIN]
    days = payload.get("days")
    if not rows:
        return f"### Basis movement by elevator — last {days} days\n\nNo grain basis samples in the window.\n"

    def _sort_key(item):
        # Sort by commodity, then elevator, then delivery for stable readable
        # output. Source and delivery come from .get() and may be None, which
        # cannot be ordered against str, so compare on a string form.
        src, com, dlv = item[0]
        return tuple("" if part is None else str(part) for part in (com, src, dlv))

    body: list[str] = []
    for (src, com, dlv), pts in sorted(_build_series(rows).items(), key=_sort_key):
        first, last = _first_last(pts, "basis_cents")
        if first is None:
            continue
        body.append(
            f"| {com} | {src} | {dlv} | {_basis(first)} | {_basis(last)} | "
            f"{_basis_move(last - first)} | {len(pts)} |"
        )
        if len(body) >= max_rows:
            break
    if not body:
        return f"### Basis movement by elevator — last {days} days\n\nNo basis data on the matching series.\n"

    header = [
        f"### Basis movement by elevator — last {days} days", "",
        "| Commodity | Elevator | Delivery | First | Last | Move | Samples |",
        "|---|---|---|---:|---:|---|---:|",
    ]
    return "\n".join(header + body) + "\n"
# ---------- sources / health ----------
def _location(s: dict) -> str:
city, state = s.get("city"), s.get("state")
loc = ", ".join(p for p in (city, state) if p)
if s.get("zip"):
loc = f"{loc} {s['zip']}".strip()
return loc or "—"
def fmt_sources(payload: dict) -> str:
    """Render the tracked sources and their fetch status as a markdown table.

    Args:
        payload: sources response; reads "sources", a list of records with
            "name", "kind", optional geo fields, "last_success_at",
            "consecutive_failures" and "last_error".

    Returns:
        Markdown table with one row per source, or a short message when the
        list is empty.
    """
    src = payload.get("sources") or []
    if not src:
        return "### Sources\n\nNo active sources.\n"
    lines = [
        "### Tracked sources", "",
        "| Source | Kind | Location | Last success | Consecutive failures | Last error |",
        "|---|---|---|---|---:|---|",
    ]
    for s in src:
        # Error text is free-form: a newline or a bare pipe inside it would
        # end the row or add a column, so flatten whitespace and escape pipes.
        error = " ".join(str(s.get("last_error") or "").split()).replace("|", "\\|")
        lines.append(
            f"| {s['name']} | {s['kind']} | {_location(s)} | {s.get('last_success_at') or '—'} | "
            f"{s.get('consecutive_failures') or 0} | {error} |"
        )
    return "\n".join(lines) + "\n"
def fmt_health(payload: dict) -> str:
    """Summarise source health as counts plus a table of problem sources.

    Classification, checked in this order:
      * down    - 3 or more consecutive failures
      * stale   - no last_success_at on record
      * healthy - everything else

    Args:
        payload: sources response; reads "sources".

    Returns:
        Markdown text: three count bullets, followed by a table of stale and
        then down sources when there are any.
    """
    healthy = 0
    flagged: dict[str, list[dict]] = {"stale": [], "down": []}
    for s in payload.get("sources") or []:
        if (s.get("consecutive_failures") or 0) >= 3:
            flagged["down"].append(s)
        elif not s.get("last_success_at"):
            flagged["stale"].append(s)
        else:
            healthy += 1

    lines = [
        "### Source health",
        "",
        f"- ✅ Healthy: **{healthy}**",
        f"- ⚠ Stale (never succeeded): **{len(flagged['stale'])}**",
        f"- ❌ Down (3+ consecutive failures): **{len(flagged['down'])}**",
    ]

    # Carry the state label with each record instead of re-deriving it by
    # searching the stale list for every table row.
    problems = [(state, s) for state in ("stale", "down") for s in flagged[state]]
    if problems:
        lines.append("")
        lines.append("| Source | State | Last success | Failures | Error |")
        lines.append("|---|---|---|---:|---|")
        for state, s in problems:
            # Flatten whitespace so a multi-line error stays on one table row,
            # cap the length, then escape pipes (after the cut, so an escape
            # sequence is never split).
            error = " ".join(str(s.get("last_error") or "").split())[:80].replace("|", "\\|")
            lines.append(
                f"| {s['name']} | {state} | {s.get('last_success_at') or '—'} | "
                f"{s.get('consecutive_failures') or 0} | "
                f"{error} |"
            )
    return "\n".join(lines) + "\n"
# ---------- list helpers ----------
def fmt_deliveries(payload: dict) -> str:
    """Render the posted delivery labels for a commodity as a bullet list.

    Reads `payload["commodity"]` and `payload["deliveries"]`; with no labels
    a short "no posted deliveries" message is returned instead.
    """
    commodity = payload.get("commodity")
    labels = payload.get("deliveries") or []
    if labels:
        bullets = "".join(f"- {label}\n" for label in labels)
        return f"### Posted delivery labels for {commodity}\n\n{bullets}"
    return f"### {commodity} deliveries\n\nNo posted deliveries.\n"
def fmt_commodities() -> str:
    """Return the static markdown list of tracked commodities and their units."""
    entries = (
        "**corn** ($/bu)",
        "**soy** — soybeans ($/bu)",
        "**wheat** ($/bu) — tracked but excluded from daily brief emails",
        "**map** — MAP 11-52-0 ($/ton)",
        "**potash** — Potash 0-0-60 ($/ton)",
        "**lime** ($/ton)",
    )
    return "### Tracked commodities\n\n" + "".join(f"- {entry}\n" for entry in entries)
# ---------- today's summary ----------
def fmt_summary(payload: dict) -> str:
    """Render today's market summary: futures move and best local bid per crop.

    Args:
        payload: summary response; reads "today", "prev_trading_day" and
            "commodities". Each commodity entry supplies "display_name", an
            optional "futures" dict and an optional "best_for_today" bid.

    Returns:
        Markdown text with a heading and, per commodity, a futures line
        followed by either the best bid for today's delivery or a note that
        none is posted.
    """
    today = payload.get("today")
    prev = payload.get("prev_trading_day")
    lines = [f"### Market summary — today {today} vs prev trading day {prev}", ""]
    # `or []` also covers an explicit null, which .get(key, []) would pass
    # through and fail to iterate.
    for c in payload.get("commodities") or []:
        f = c.get("futures") or {}
        chg = f.get("change_cents")
        arrow = _delta_arrow(chg)
        lines.append(
            f"**{c['display_name']}** — CBOT {f.get('contract','?')} "
            f"last {_bu(f.get('today_last_cents'))}, prev close "
            f"{_bu(f.get('prev_close_cents'))} {arrow} {_basis(chg) if chg is not None else ''}"
        )
        bft = c.get("best_for_today")
        if bft:
            lines.append(
                f"  · Best for today's delivery ({bft['delivery']}): "
                f"**{bft['source_name']}** @ {_bu(bft['bid_cents'])} "
                f"(basis {_basis(bft.get('basis_cents'))})"
            )
        else:
            lines.append("  · No current-month local bid posted.")
        lines.append("")
    # Drop the trailing blank separator before adding the final newline.
    return "\n".join(lines).rstrip() + "\n"