Files
ag-bids-mcp/ag_bids_mcp/format.py
claude bb4219da87
CI / test (push) Successful in 18s
CI / build-push (push) Successful in 5s
feat: nutrient_cost tool — cheapest fertilizer per lb of N/P/K (#1)
Co-authored-by: claude <claude@jpaul.io>
Co-committed-by: claude <claude@jpaul.io>
2026-06-04 15:58:59 -04:00

598 lines
22 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""JSON-from-ag-monitor → markdown helpers.
One function per @mcp.tool. Markdown is what FastMCP tools return; Claude /
OpenWebUI render it nicely. Cents → dollars formatting is centralized here so
all tools use the same precision (4 decimals for $/bu, 2 decimals for $/ton).
"""
from __future__ import annotations
from typing import Optional
def _bu(cents: Optional[int]) -> str:
if cents is None:
return "—"
return f"${cents / 100:.4f}"
def _ton(cents: Optional[int]) -> str:
if cents is None:
return "—"
return f"${cents / 100:.2f}"
def _per_lb(cents: Optional[int]) -> str:
if cents is None:
return "—"
return f"${cents / 100:.2f}"
def _basis(cents: Optional[int]) -> str:
if cents is None:
return "—"
sign = "+" if cents >= 0 else ""
return f"{sign}{cents / 100:.2f}"
def _delta_arrow(cents: Optional[int]) -> str:
if cents is None or cents == 0:
return "—"
return "▲" if cents > 0 else "▼"
GRAIN = ("corn", "soy", "wheat")
def _basis_move(delta_cents: Optional[int]) -> str:
"""Describe a basis change: positive = cash strengthened vs futures."""
if delta_cents is None:
return "—"
if delta_cents == 0:
return "→ flat"
arrow = "▲" if delta_cents > 0 else "▼"
word = "stronger" if delta_cents > 0 else "weaker"
return f"{arrow} {_basis(delta_cents)} ({word})"
def _build_series(rows: list[dict], default_commodity: Optional[str] = None) -> dict:
"""Group history rows into ordered per-(source, commodity, delivery) lists.
Rows arrive ordered by fetched_at ASC, so each series list stays in time
order. `default_commodity` backfills rows that don't carry their own (older
payload shape / single-commodity queries)."""
series: dict[tuple, list[dict]] = {}
for r in rows:
com = r.get("commodity") or default_commodity or "?"
series.setdefault((r.get("source_name"), com, r.get("delivery")), []).append(r)
return series
def _first_last(points: list[dict], field: str):
"""First and last non-null values of `field` across an ordered point list."""
vals = [p.get(field) for p in points if p.get(field) is not None]
if not vals:
return None, None
return vals[0], vals[-1]
# ---------- best_local_bid ----------
def _loc_suffix(d: dict) -> str:
"""' (City, ST)' when a row/result carries city/state, else ''."""
city, state = d.get("city"), d.get("state")
if city and state:
return f" ({city}, {state})"
return f" ({state})" if state else ""
def _mi(d) -> str:
return "—" if d is None else f"{d:.1f} mi"
def _where(center: dict) -> str:
"""Human label for a resolved center point."""
if center.get("source") == "zip":
return f"ZIP {center.get('zip')}"
near = f" (near {center.get('zip')})" if center.get("zip") else ""
return f"{center['lat']:.4f}, {center['lng']:.4f}{near}"
def fmt_best(commodity: str, payload: dict) -> str:
best = payload.get("best")
today = payload.get("today")
center = payload.get("center")
radius = payload.get("radius_miles")
scope = f" within {radius:.0f} mi of {_where(center)}" if center else ""
head = f"### Best place to sell {commodity} today ({today})\n\n"
if not best:
if center:
out = head + f"No current-month {commodity} bids{scope}.\n"
near = payload.get("nearest")
if near:
out += (f"\nNearest tracked elevator: **{near['source_name']}**"
f"{_loc_suffix(near)} — about **{_mi(near.get('distance_miles'))}** "
f"away, outside the {radius:.0f} mi radius.\n")
return out
return head + f"No current-month {commodity} bids posted across the tracked sources.\n"
dist = best.get("distance_miles")
dist_txt = f" — **{_mi(dist)}** away" if dist is not None else ""
out = (
head
+ f"**{best['source_name']}**{_loc_suffix(best)}{dist_txt} — delivery "
f"**{best['delivery']}** — bid **{_bu(best['bid_cents'])}/bu** "
f"(basis {_basis(best.get('basis_cents'))}, "
f"futures {best.get('futures_contract') or '—'})\n"
)
if center:
out += f"\n_Searched{scope}._\n"
out += f"\n_Fetched {best.get('fetched_at') or '?'}_\n"
return out
# ---------- reference price trends (USDA NASS / EIA) ----------
def _unit_money(cents, unit: str) -> str:
if cents is None:
return "—"
dollars = cents / 100
if unit == "$/gal":
return f"${dollars:.3f}/gal"
if unit == "$/bu":
return f"${dollars:.2f}/bu"
if unit == "$/ton":
return f"${dollars:.2f}/ton"
return f"${dollars:.2f}"
def _signed(cents) -> str:
if cents is None:
return "—"
return f"{'+' if cents >= 0 else ''}${abs(cents)/100:.2f}"
def _pct(p) -> str:
return "—" if p is None else f"{'+' if p >= 0 else ''}{abs(p):.1f}%"
def _ym(period: str) -> str:
# 2026-04-01 -> "Apr 2026"; weekly date -> as-is
try:
y, m, d = period.split("-")
return f"{_MONTH_NAMES_3[int(m)-1]} {y}" if d == "01" else period
except Exception:
return period
_MONTH_NAMES_3 = ["Jan", "Feb", "Mar", "Apr", "May", "Jun",
"Jul", "Aug", "Sep", "Oct", "Nov", "Dec"]
def fmt_price_trend(payload: dict, label: str = "price received") -> str:
name = payload.get("commodity") or payload.get("item")
geo = payload.get("geo")
label = payload.get("label") or label
unit = payload.get("unit") or "$/bu"
t = payload.get("trend")
loc = f" — {geo}" if geo else ""
if not t:
return f"### {name}{loc}{label}\n\nNo data on file.\n"
when = _ym(t["period"])
lines = [f"### {name}{loc}{label}, {when}: {_unit_money(t['value_cents'], unit)}", ""]
arrow = _delta_arrow(t.get("change_cents"))
lines.append(f"- **Change:** {arrow} {_signed(t.get('change_cents'))} ({_pct(t.get('change_pct'))}) vs prior period")
if t.get("yoy_cents") is not None:
lines.append(f"- **Year-over-year:** {_signed(t.get('yoy_change_cents'))} ({_pct(t.get('yoy_pct'))})")
s = t.get("seasonal")
if s:
lines.append(
f"- **Seasonal:** {s['percentile']}th pct vs last {s['sample_years']} same-months "
f"(normal {_unit_money(s['normal_cents'], unit)}, range "
f"{_unit_money(s['min_cents'], unit)}{_unit_money(s['max_cents'], unit)}) "
f{_pct(s.get('vs_normal_pct'))} vs normal")
lines.append(f"- **Recent direction:** {_delta_arrow({'up':1,'down':-1,'flat':0}[t['recent_direction']])} {t['recent_direction']}")
lines.append(f"\n_Source: {payload.get('source') or 'USDA NASS'} · {t['points']} periods on file_")
return "\n".join(lines) + "\n"
def fmt_price_series(payload: dict, max_points: int = 60) -> str:
name = payload.get("commodity") or payload.get("item")
geo = payload.get("geo")
unit = payload.get("unit") or "$/bu"
series = payload.get("series") or []
loc = f" — {geo}" if geo else ""
if not series:
return f"### {name}{loc} series\n\nNo data on file.\n"
shown = series[-max_points:]
head = [f"### {name}{loc}{payload.get('count', len(series))} periods "
f"(showing last {len(shown)})", "",
"| Period | Price |", "|---|---:|"]
body = [f"| {_ym(p['period'])} | {_unit_money(p['value_cents'], unit)} |" for p in shown]
return "\n".join(head + body) + "\n"
_NUTRIENT_LABEL = {"n": "Nitrogen (N)", "p2o5": "Phosphate (P₂O₅)", "k2o": "Potash (K₂O)"}
def fmt_nutrient_cost(payload: dict) -> str:
"""Cheapest fertilizer per pound of N / P2O5 / K2O for a region."""
geo = payload.get("geo") or "Cornbelt"
src = payload.get("source") or "USDA AgTransport"
products = payload.get("products") or []
cheapest = payload.get("cheapest") or {}
if not products:
return f"### Fertilizer value per nutrient — {geo}\n\nNo data on file.\n"
by_item = {p["item"]: p for p in products}
lines = [f"### Fertilizer value per pound of nutrient — {geo}", ""]
for nut in ("n", "p2o5", "k2o"):
it = cheapest.get(nut)
prod = by_item.get(it or "")
if prod is not None:
c = (prod.get("cost_per_lb") or {}).get(nut)
lines.append(
f"- **Cheapest {_NUTRIENT_LABEL[nut]}:** "
f"{prod.get('label') or it} at {_per_lb(c)}/lb"
)
lines += ["", "| Product | Grade | $/ton | $/lb N | $/lb P₂O₅ | $/lb K₂O |",
"|---|---|---:|---:|---:|---:|"]
def _nkey(p: dict):
v = (p.get("cost_per_lb") or {}).get("n")
return (v is None, v if v is not None else 0)
for p in sorted(products, key=_nkey): # cheapest N first, blanks last
a = p.get("analysis") or {}
cpl = p.get("cost_per_lb") or {}
grade = (a.get("grade") or "") + ("*" if a.get("grade_assumed") else "")
lines.append(
f"| {p.get('label') or p['item']} | {grade} | {_ton(p.get('price_cents_per_ton'))} | "
f"{_per_lb(cpl.get('n'))} | {_per_lb(cpl.get('p2o5'))} | {_per_lb(cpl.get('k2o'))} |"
)
period = next((p.get("period") for p in products if p.get("period")), None)
assumed = any((p.get("analysis") or {}).get("grade_assumed") for p in products)
foot = f"_Source: {src}"
if period:
foot += f" · as of {_ym(period)}"
foot += " · $/lb = $/ton ÷ (analysis% × 2000 lb)"
if assumed:
foot += " · *UAN grade assumed 32-0-0"
lines.append("\n" + foot + "_")
return "\n".join(lines) + "\n"
# ---------- futures quote + change ----------
def fmt_futures(payload: dict) -> str:
commodity = payload.get("commodity")
delivery = payload.get("delivery")
contract = payload.get("contract")
q = payload.get("quote")
scope = f"{commodity} {contract}" + (f" ({delivery})" if delivery else " (continuous nearby)")
if not q:
return f"### CBOT futures — {scope}\n\nNo futures quote on file yet for this contract.\n"
last = q.get("last_cents")
open_c = q.get("open_cents")
prev = q.get("prev_close_cents")
d_open = q.get("change_since_open_cents")
d_day = q.get("change_on_day_cents")
lines = [f"### CBOT futures — {scope}", ""]
lines.append(f"- **Last**: {_bu(last)} _(settle/last for {q.get('settle_date') or '?'})_")
if open_c is not None:
lines.append(f"- **Open**: {_bu(open_c)}")
if prev is not None:
lines.append(f"- **Prev close**: {_bu(prev)}")
if d_open is not None:
lines.append(f"- **Change since open**: {_delta_arrow(d_open)} {_basis(d_open)}")
else:
lines.append("- **Change since open**: — (no open captured yet)")
if d_day is not None:
lines.append(f"- **Change on day**: {_delta_arrow(d_day)} {_basis(d_day)}")
else:
lines.append("- **Change on day**: — (no prior settle yet)")
return "\n".join(lines) + "\n"
# ---------- inputs / fertilizer ----------
def fmt_inputs(payload: dict) -> str:
rows = payload.get("rows") or []
product = payload.get("product")
title = f"### {product.upper()} prices" if product else "### Fertilizer + lime prices"
if not rows:
scope = product or "any tracked input"
return f"{title}\n\nNo {scope} prices on file.\n"
lines = [
title, "",
"| Source | Product | Delivery | Price ($/ton) | Fetched |",
"|---|---|---|---:|---|",
]
for r in rows:
lines.append(
f"| {r['source_name']} | {r.get('display_name') or r['commodity']} | "
f"{r['delivery']} | {_ton(r.get('bid_cents'))} | {r.get('fetched_at') or '?'} |"
)
return "\n".join(lines) + "\n"
# ---------- latest snapshot ----------
def fmt_latest(payload: dict) -> str:
rows = payload.get("rows") or []
center = payload.get("center")
radius = payload.get("radius_miles")
if not rows:
head = "### Latest prices\n\n"
if center:
return head + f"No sources within {radius:.0f} mi of {_where(center)}.\n"
return head + "No rows match those filters.\n"
title = "### Latest prices"
if center:
title += f" — within {radius:.0f} mi of {_where(center)}"
dist_col = " Distance |" if center else ""
dist_sep = " ---: |" if center else ""
lines = [
title, "",
f"| Source | Commodity | Delivery | Bid | Basis | Futures |{dist_col} Fetched |",
f"|---|---|---|---:|---:|---|{dist_sep}---|",
]
for r in rows:
unit_fmt = _ton if r.get("commodity_kind") == "fertilizer" else _bu
dist_cell = f" {_mi(r.get('distance_miles'))} |" if center else ""
lines.append(
f"| {r['source_name']} | {r.get('display_name') or r['commodity']} | "
f"{r['delivery']} | {unit_fmt(r.get('bid_cents'))} | "
f"{_basis(r.get('basis_cents'))} | {r.get('futures_contract') or '—'} |"
f"{dist_cell} {r.get('fetched_at') or '?'} |"
)
return "\n".join(lines) + "\n"
# ---------- price history ----------
def fmt_history(payload: dict, max_rows: int = 60) -> str:
rows = payload.get("rows") or []
commodity = payload.get("commodity")
days = payload.get("days")
scope = commodity or "all crops"
if not rows:
return f"### Price history — {scope} ({days}d)\n\nNo samples in the window.\n"
# Per (source, commodity, delivery) trend annotation: first vs last sample.
series = _build_series(rows, default_commodity=commodity)
def _com(r: dict) -> str:
return r.get("commodity") or commodity or "?"
lines = [f"### Price history — {scope} — last {days} days", ""]
for (src, com, dlv), pts in sorted(series.items()):
if not pts:
continue
b_first, b_last = _first_last(pts, "bid_cents")
delta = (b_last - b_first) if (b_first is not None and b_last is not None) else None
arrow = _delta_arrow(delta)
bz_first, bz_last = _first_last(pts, "basis_cents")
basis_part = ""
if bz_first is not None:
basis_part = (f" · basis {_basis(bz_first)}{_basis(bz_last)} "
f"{_basis_move(bz_last - bz_first)}")
lines.append(
f"- **{src}** / {com} / {dlv}: {len(pts)} samples · "
f"{_bu(b_first)}{_bu(b_last)} {arrow} "
f"{_basis(delta) if delta is not None else ''}{basis_part}".rstrip()
)
# If the history is shallow include the raw rows too (helpful for charts).
if sum(len(p) for p in series.values()) <= max_rows:
lines.extend([
"",
"| Time | Source | Commodity | Delivery | Bid | Basis | Futures |",
"|---|---|---|---|---:|---:|---:|",
])
for r in rows[-max_rows:]:
lines.append(
f"| {r['fetched_at']} | {r['source_name']} | {_com(r)} | {r['delivery']} | "
f"{_bu(r.get('bid_cents'))} | {_basis(r.get('basis_cents'))} | "
f"{_bu(r.get('futures_cents'))} |"
)
return "\n".join(lines) + "\n"
# ---------- basis movement ----------
def fmt_basis_movement(payload: dict) -> str:
"""Aggregated basis trend per commodity (the cheap, headline view).
Rolls every matching (source, delivery) series up to one line per crop:
average basis first→last across the window and how far it moved. Skips
non-grain rows and series with no basis on file."""
rows = [r for r in (payload.get("rows") or []) if r.get("commodity") in GRAIN]
days = payload.get("days")
if not rows:
return f"### Basis movement — last {days} days\n\nNo grain basis samples in the window.\n"
series = _build_series(rows)
agg: dict[str, dict] = {}
for (src, com, _dlv), pts in series.items():
first, last = _first_last(pts, "basis_cents")
if first is None:
continue
a = agg.setdefault(com, {"firsts": [], "lasts": [], "elevators": set(), "series": 0})
a["firsts"].append(first)
a["lasts"].append(last)
a["elevators"].add(src)
a["series"] += 1
if not agg:
return f"### Basis movement — last {days} days\n\nNo basis data on the matching series.\n"
lines = [f"### Basis movement — last {days} days", ""]
for com in sorted(agg):
a = agg[com]
avg_first = round(sum(a["firsts"]) / len(a["firsts"]))
avg_last = round(sum(a["lasts"]) / len(a["lasts"]))
lines.append(
f"- **{com}**: avg basis {_basis(avg_first)}{_basis(avg_last)} "
f"{_basis_move(avg_last - avg_first)} · "
f"{len(a['elevators'])} elevators, {a['series']} series"
)
return "\n".join(lines) + "\n"
def fmt_basis_detail(payload: dict, max_rows: int = 80) -> str:
"""Per-(elevator, crop, delivery) basis trend — the drill-down view.
One row per series: basis first→last and how far it moved. Done MCP-side so
the caller gets a compact table instead of every raw sample."""
rows = [r for r in (payload.get("rows") or []) if r.get("commodity") in GRAIN]
days = payload.get("days")
if not rows:
return f"### Basis movement by elevator — last {days} days\n\nNo grain basis samples in the window.\n"
series = _build_series(rows)
body: list[str] = []
# Sort by commodity, then elevator, then delivery for stable readable output.
for (src, com, dlv), pts in sorted(series.items(), key=lambda kv: (kv[0][1], kv[0][0], kv[0][2])):
first, last = _first_last(pts, "basis_cents")
if first is None:
continue
body.append(
f"| {com} | {src} | {dlv} | {_basis(first)} | {_basis(last)} | "
f"{_basis_move(last - first)} | {len(pts)} |"
)
if len(body) >= max_rows:
break
if not body:
return f"### Basis movement by elevator — last {days} days\n\nNo basis data on the matching series.\n"
header = [
f"### Basis movement by elevator — last {days} days", "",
"| Commodity | Elevator | Delivery | First | Last | Move | Samples |",
"|---|---|---|---:|---:|---|---:|",
]
return "\n".join(header + body) + "\n"
# ---------- sources / health ----------
def _location(s: dict) -> str:
city, state = s.get("city"), s.get("state")
loc = ", ".join(p for p in (city, state) if p)
if s.get("zip"):
loc = f"{loc} {s['zip']}".strip()
return loc or "—"
def fmt_sources(payload: dict) -> str:
src = payload.get("sources") or []
if not src:
return "### Sources\n\nNo active sources.\n"
lines = [
"### Tracked sources", "",
"| Source | Kind | Location | Last success | Consecutive failures | Last error |",
"|---|---|---|---|---:|---|",
]
for s in src:
lines.append(
f"| {s['name']} | {s['kind']} | {_location(s)} | {s.get('last_success_at') or '—'} | "
f"{s.get('consecutive_failures') or 0} | {s.get('last_error') or ''} |"
)
return "\n".join(lines) + "\n"
def fmt_health(payload: dict) -> str:
src = payload.get("sources") or []
healthy, stale, down = [], [], []
for s in src:
n = s.get("consecutive_failures") or 0
if n >= 3:
down.append(s)
elif not s.get("last_success_at"):
stale.append(s)
else:
healthy.append(s)
lines = ["### Source health", ""]
lines.append(f"- ✅ Healthy: **{len(healthy)}**")
lines.append(f"- ⚠ Stale (never succeeded): **{len(stale)}**")
lines.append(f"- ❌ Down (3+ consecutive failures): **{len(down)}**")
if stale or down:
lines.append("")
lines.append("| Source | State | Last success | Failures | Error |")
lines.append("|---|---|---|---:|---|")
for s in stale + down:
state = "stale" if s in stale else "down"
lines.append(
f"| {s['name']} | {state} | {s.get('last_success_at') or '—'} | "
f"{s.get('consecutive_failures') or 0} | "
f"{(s.get('last_error') or '')[:80]} |"
)
return "\n".join(lines) + "\n"
# ---------- list helpers ----------
def fmt_deliveries(payload: dict) -> str:
commodity = payload.get("commodity")
labels = payload.get("deliveries") or []
if not labels:
return f"### {commodity} deliveries\n\nNo posted deliveries.\n"
return (
f"### Posted delivery labels for {commodity}\n\n"
+ "\n".join(f"- {x}" for x in labels)
+ "\n"
)
def fmt_commodities() -> str:
return (
"### Tracked commodities\n\n"
"- **corn** ($/bu)\n"
"- **soy** — soybeans ($/bu)\n"
"- **wheat** ($/bu) — tracked but excluded from daily brief emails\n"
"- **map** — MAP 11-52-0 ($/ton)\n"
"- **potash** — Potash 0-0-60 ($/ton)\n"
"- **lime** ($/ton)\n"
)
# ---------- today's summary ----------
def fmt_summary(payload: dict) -> str:
today = payload.get("today")
prev = payload.get("prev_trading_day")
lines = [f"### Market summary — today {today} vs prev trading day {prev}", ""]
for c in payload.get("commodities", []):
f = c.get("futures") or {}
chg = f.get("change_cents")
arrow = _delta_arrow(chg)
lines.append(
f"**{c['display_name']}** — CBOT {f.get('contract','?')} "
f"last {_bu(f.get('today_last_cents'))}, prev close "
f"{_bu(f.get('prev_close_cents'))} {arrow} {_basis(chg) if chg is not None else ''}"
)
bft = c.get("best_for_today")
if bft:
lines.append(
f" · Best for today's delivery ({bft['delivery']}): "
f"**{bft['source_name']}** @ {_bu(bft['bid_cents'])} "
f"(basis {_basis(bft.get('basis_cents'))})"
)
else:
lines.append(" · No current-month local bid posted.")
lines.append("")
return "\n".join(lines).rstrip() + "\n"