"""JSON-from-ag-monitor → markdown helpers. One function per @mcp.tool. Markdown is what FastMCP tools return; Claude / OpenWebUI render it nicely. Cents → dollars formatting is centralized here so all tools use the same precision (4 decimals for $/bu, 2 decimals for $/ton). """ from __future__ import annotations from typing import Optional def _bu(cents: Optional[int]) -> str: if cents is None: return "—" return f"${cents / 100:.4f}" def _ton(cents: Optional[int]) -> str: if cents is None: return "—" return f"${cents / 100:.2f}" def _per_lb(cents: Optional[int]) -> str: if cents is None: return "—" return f"${cents / 100:.2f}" def _basis(cents: Optional[int]) -> str: if cents is None: return "—" sign = "+" if cents >= 0 else "" return f"{sign}{cents / 100:.2f}" def _delta_arrow(cents: Optional[int]) -> str: if cents is None or cents == 0: return "—" return "▲" if cents > 0 else "▼" GRAIN = ("corn", "soy", "wheat") def _basis_move(delta_cents: Optional[int]) -> str: """Describe a basis change: positive = cash strengthened vs futures.""" if delta_cents is None: return "—" if delta_cents == 0: return "→ flat" arrow = "▲" if delta_cents > 0 else "▼" word = "stronger" if delta_cents > 0 else "weaker" return f"{arrow} {_basis(delta_cents)} ({word})" def _build_series(rows: list[dict], default_commodity: Optional[str] = None) -> dict: """Group history rows into ordered per-(source, commodity, delivery) lists. Rows arrive ordered by fetched_at ASC, so each series list stays in time order. `default_commodity` backfills rows that don't carry their own (older payload shape / single-commodity queries).""" series: dict[tuple, list[dict]] = {} for r in rows: com = r.get("commodity") or default_commodity or "?" series.setdefault((r.get("source_name"), com, r.get("delivery")), []).append(r) return series def _first_last(points: list[dict], field: str): """First and last non-null values of `field` across an ordered point list.""" vals = [p.get(field) for p in points if p.get(field) is not None] if not vals: return None, None return vals[0], vals[-1] # ---------- best_local_bid ---------- def _loc_suffix(d: dict) -> str: """' (City, ST)' when a row/result carries city/state, else ''.""" city, state = d.get("city"), d.get("state") if city and state: return f" ({city}, {state})" return f" ({state})" if state else "" def _mi(d) -> str: return "—" if d is None else f"{d:.1f} mi" def _where(center: dict) -> str: """Human label for a resolved center point.""" if center.get("source") == "zip": return f"ZIP {center.get('zip')}" near = f" (near {center.get('zip')})" if center.get("zip") else "" return f"{center['lat']:.4f}, {center['lng']:.4f}{near}" def fmt_best(commodity: str, payload: dict) -> str: best = payload.get("best") today = payload.get("today") center = payload.get("center") radius = payload.get("radius_miles") scope = f" within {radius:.0f} mi of {_where(center)}" if center else "" head = f"### Best place to sell {commodity} today ({today})\n\n" if not best: if center: out = head + f"No current-month {commodity} bids{scope}.\n" near = payload.get("nearest") if near: out += (f"\nNearest tracked elevator: **{near['source_name']}**" f"{_loc_suffix(near)} — about **{_mi(near.get('distance_miles'))}** " f"away, outside the {radius:.0f} mi radius.\n") return out return head + f"No current-month {commodity} bids posted across the tracked sources.\n" dist = best.get("distance_miles") dist_txt = f" — **{_mi(dist)}** away" if dist is not None else "" out = ( head + f"**{best['source_name']}**{_loc_suffix(best)}{dist_txt} — delivery " f"**{best['delivery']}** — bid **{_bu(best['bid_cents'])}/bu** " f"(basis {_basis(best.get('basis_cents'))}, " f"futures {best.get('futures_contract') or '—'})\n" ) if center: out += f"\n_Searched{scope}._\n" out += f"\n_Fetched {best.get('fetched_at') or '?'}_\n" return out # ---------- reference price trends (USDA NASS / EIA) ---------- def _unit_money(cents, unit: str) -> str: if cents is None: return "—" dollars = cents / 100 if unit == "$/gal": return f"${dollars:.3f}/gal" if unit == "$/bu": return f"${dollars:.2f}/bu" if unit == "$/ton": return f"${dollars:.2f}/ton" return f"${dollars:.2f}" def _signed(cents) -> str: if cents is None: return "—" return f"{'+' if cents >= 0 else '−'}${abs(cents)/100:.2f}" def _pct(p) -> str: return "—" if p is None else f"{'+' if p >= 0 else '−'}{abs(p):.1f}%" def _ym(period: str) -> str: # 2026-04-01 -> "Apr 2026"; weekly date -> as-is try: y, m, d = period.split("-") return f"{_MONTH_NAMES_3[int(m)-1]} {y}" if d == "01" else period except Exception: return period _MONTH_NAMES_3 = ["Jan", "Feb", "Mar", "Apr", "May", "Jun", "Jul", "Aug", "Sep", "Oct", "Nov", "Dec"] def fmt_price_trend(payload: dict, label: str = "price received") -> str: name = payload.get("commodity") or payload.get("item") geo = payload.get("geo") label = payload.get("label") or label unit = payload.get("unit") or "$/bu" t = payload.get("trend") loc = f" — {geo}" if geo else "" if not t: return f"### {name}{loc} — {label}\n\nNo data on file.\n" when = _ym(t["period"]) lines = [f"### {name}{loc} — {label}, {when}: {_unit_money(t['value_cents'], unit)}", ""] arrow = _delta_arrow(t.get("change_cents")) lines.append(f"- **Change:** {arrow} {_signed(t.get('change_cents'))} ({_pct(t.get('change_pct'))}) vs prior period") if t.get("yoy_cents") is not None: lines.append(f"- **Year-over-year:** {_signed(t.get('yoy_change_cents'))} ({_pct(t.get('yoy_pct'))})") s = t.get("seasonal") if s: lines.append( f"- **Seasonal:** {s['percentile']}th pct vs last {s['sample_years']} same-months " f"(normal {_unit_money(s['normal_cents'], unit)}, range " f"{_unit_money(s['min_cents'], unit)}–{_unit_money(s['max_cents'], unit)}) " f"· {_pct(s.get('vs_normal_pct'))} vs normal") lines.append(f"- **Recent direction:** {_delta_arrow({'up':1,'down':-1,'flat':0}[t['recent_direction']])} {t['recent_direction']}") lines.append(f"\n_Source: {payload.get('source') or 'USDA NASS'} · {t['points']} periods on file_") return "\n".join(lines) + "\n" def fmt_price_series(payload: dict, max_points: int = 60) -> str: name = payload.get("commodity") or payload.get("item") geo = payload.get("geo") unit = payload.get("unit") or "$/bu" series = payload.get("series") or [] loc = f" — {geo}" if geo else "" if not series: return f"### {name}{loc} series\n\nNo data on file.\n" shown = series[-max_points:] head = [f"### {name}{loc} — {payload.get('count', len(series))} periods " f"(showing last {len(shown)})", "", "| Period | Price |", "|---|---:|"] body = [f"| {_ym(p['period'])} | {_unit_money(p['value_cents'], unit)} |" for p in shown] return "\n".join(head + body) + "\n" _NUTRIENT_LABEL = {"n": "Nitrogen (N)", "p2o5": "Phosphate (P₂O₅)", "k2o": "Potash (K₂O)"} def fmt_nutrient_cost(payload: dict) -> str: """Cheapest fertilizer per pound of N / P2O5 / K2O for a region.""" geo = payload.get("geo") or "Cornbelt" src = payload.get("source") or "USDA AgTransport" products = payload.get("products") or [] cheapest = payload.get("cheapest") or {} if not products: return f"### Fertilizer value per nutrient — {geo}\n\nNo data on file.\n" by_item = {p["item"]: p for p in products} lines = [f"### Fertilizer value per pound of nutrient — {geo}", ""] for nut in ("n", "p2o5", "k2o"): it = cheapest.get(nut) prod = by_item.get(it or "") if prod is not None: c = (prod.get("cost_per_lb") or {}).get(nut) lines.append( f"- **Cheapest {_NUTRIENT_LABEL[nut]}:** " f"{prod.get('label') or it} at {_per_lb(c)}/lb" ) lines += ["", "| Product | Grade | $/ton | $/lb N | $/lb P₂O₅ | $/lb K₂O |", "|---|---|---:|---:|---:|---:|"] def _nkey(p: dict): v = (p.get("cost_per_lb") or {}).get("n") return (v is None, v if v is not None else 0) for p in sorted(products, key=_nkey): # cheapest N first, blanks last a = p.get("analysis") or {} cpl = p.get("cost_per_lb") or {} grade = (a.get("grade") or "") + ("*" if a.get("grade_assumed") else "") lines.append( f"| {p.get('label') or p['item']} | {grade} | {_ton(p.get('price_cents_per_ton'))} | " f"{_per_lb(cpl.get('n'))} | {_per_lb(cpl.get('p2o5'))} | {_per_lb(cpl.get('k2o'))} |" ) period = next((p.get("period") for p in products if p.get("period")), None) assumed = any((p.get("analysis") or {}).get("grade_assumed") for p in products) foot = f"_Source: {src}" if period: foot += f" · as of {_ym(period)}" foot += " · $/lb = $/ton ÷ (analysis% × 2000 lb)" if assumed: foot += " · *UAN grade assumed 32-0-0" lines.append("\n" + foot + "_") return "\n".join(lines) + "\n" # ---------- futures quote + change ---------- def fmt_futures(payload: dict) -> str: commodity = payload.get("commodity") delivery = payload.get("delivery") contract = payload.get("contract") q = payload.get("quote") scope = f"{commodity} {contract}" + (f" ({delivery})" if delivery else " (continuous nearby)") if not q: return f"### CBOT futures — {scope}\n\nNo futures quote on file yet for this contract.\n" last = q.get("last_cents") open_c = q.get("open_cents") prev = q.get("prev_close_cents") d_open = q.get("change_since_open_cents") d_day = q.get("change_on_day_cents") lines = [f"### CBOT futures — {scope}", ""] lines.append(f"- **Last**: {_bu(last)} _(settle/last for {q.get('settle_date') or '?'})_") if open_c is not None: lines.append(f"- **Open**: {_bu(open_c)}") if prev is not None: lines.append(f"- **Prev close**: {_bu(prev)}") if d_open is not None: lines.append(f"- **Change since open**: {_delta_arrow(d_open)} {_basis(d_open)}") else: lines.append("- **Change since open**: — (no open captured yet)") if d_day is not None: lines.append(f"- **Change on day**: {_delta_arrow(d_day)} {_basis(d_day)}") else: lines.append("- **Change on day**: — (no prior settle yet)") return "\n".join(lines) + "\n" # ---------- inputs / fertilizer ---------- def fmt_inputs(payload: dict) -> str: rows = payload.get("rows") or [] product = payload.get("product") title = f"### {product.upper()} prices" if product else "### Fertilizer + lime prices" if not rows: scope = product or "any tracked input" return f"{title}\n\nNo {scope} prices on file.\n" lines = [ title, "", "| Source | Product | Delivery | Price ($/ton) | Fetched |", "|---|---|---|---:|---|", ] for r in rows: lines.append( f"| {r['source_name']} | {r.get('display_name') or r['commodity']} | " f"{r['delivery']} | {_ton(r.get('bid_cents'))} | {r.get('fetched_at') or '?'} |" ) return "\n".join(lines) + "\n" # ---------- latest snapshot ---------- def fmt_latest(payload: dict) -> str: rows = payload.get("rows") or [] center = payload.get("center") radius = payload.get("radius_miles") if not rows: head = "### Latest prices\n\n" if center: return head + f"No sources within {radius:.0f} mi of {_where(center)}.\n" return head + "No rows match those filters.\n" title = "### Latest prices" if center: title += f" — within {radius:.0f} mi of {_where(center)}" dist_col = " Distance |" if center else "" dist_sep = " ---: |" if center else "" lines = [ title, "", f"| Source | Commodity | Delivery | Bid | Basis | Futures |{dist_col} Fetched |", f"|---|---|---|---:|---:|---|{dist_sep}---|", ] for r in rows: unit_fmt = _ton if r.get("commodity_kind") == "fertilizer" else _bu dist_cell = f" {_mi(r.get('distance_miles'))} |" if center else "" lines.append( f"| {r['source_name']} | {r.get('display_name') or r['commodity']} | " f"{r['delivery']} | {unit_fmt(r.get('bid_cents'))} | " f"{_basis(r.get('basis_cents'))} | {r.get('futures_contract') or '—'} |" f"{dist_cell} {r.get('fetched_at') or '?'} |" ) return "\n".join(lines) + "\n" # ---------- price history ---------- def fmt_history(payload: dict, max_rows: int = 60) -> str: rows = payload.get("rows") or [] commodity = payload.get("commodity") days = payload.get("days") scope = commodity or "all crops" if not rows: return f"### Price history — {scope} ({days}d)\n\nNo samples in the window.\n" # Per (source, commodity, delivery) trend annotation: first vs last sample. series = _build_series(rows, default_commodity=commodity) def _com(r: dict) -> str: return r.get("commodity") or commodity or "?" lines = [f"### Price history — {scope} — last {days} days", ""] for (src, com, dlv), pts in sorted(series.items()): if not pts: continue b_first, b_last = _first_last(pts, "bid_cents") delta = (b_last - b_first) if (b_first is not None and b_last is not None) else None arrow = _delta_arrow(delta) bz_first, bz_last = _first_last(pts, "basis_cents") basis_part = "" if bz_first is not None: basis_part = (f" · basis {_basis(bz_first)} → {_basis(bz_last)} " f"{_basis_move(bz_last - bz_first)}") lines.append( f"- **{src}** / {com} / {dlv}: {len(pts)} samples · " f"{_bu(b_first)} → {_bu(b_last)} {arrow} " f"{_basis(delta) if delta is not None else ''}{basis_part}".rstrip() ) # If the history is shallow include the raw rows too (helpful for charts). if sum(len(p) for p in series.values()) <= max_rows: lines.extend([ "", "| Time | Source | Commodity | Delivery | Bid | Basis | Futures |", "|---|---|---|---|---:|---:|---:|", ]) for r in rows[-max_rows:]: lines.append( f"| {r['fetched_at']} | {r['source_name']} | {_com(r)} | {r['delivery']} | " f"{_bu(r.get('bid_cents'))} | {_basis(r.get('basis_cents'))} | " f"{_bu(r.get('futures_cents'))} |" ) return "\n".join(lines) + "\n" # ---------- basis movement ---------- def fmt_basis_movement(payload: dict) -> str: """Aggregated basis trend per commodity (the cheap, headline view). Rolls every matching (source, delivery) series up to one line per crop: average basis first→last across the window and how far it moved. Skips non-grain rows and series with no basis on file.""" rows = [r for r in (payload.get("rows") or []) if r.get("commodity") in GRAIN] days = payload.get("days") if not rows: return f"### Basis movement — last {days} days\n\nNo grain basis samples in the window.\n" series = _build_series(rows) agg: dict[str, dict] = {} for (src, com, _dlv), pts in series.items(): first, last = _first_last(pts, "basis_cents") if first is None: continue a = agg.setdefault(com, {"firsts": [], "lasts": [], "elevators": set(), "series": 0}) a["firsts"].append(first) a["lasts"].append(last) a["elevators"].add(src) a["series"] += 1 if not agg: return f"### Basis movement — last {days} days\n\nNo basis data on the matching series.\n" lines = [f"### Basis movement — last {days} days", ""] for com in sorted(agg): a = agg[com] avg_first = round(sum(a["firsts"]) / len(a["firsts"])) avg_last = round(sum(a["lasts"]) / len(a["lasts"])) lines.append( f"- **{com}**: avg basis {_basis(avg_first)} → {_basis(avg_last)} " f"{_basis_move(avg_last - avg_first)} · " f"{len(a['elevators'])} elevators, {a['series']} series" ) return "\n".join(lines) + "\n" def fmt_basis_detail(payload: dict, max_rows: int = 80) -> str: """Per-(elevator, crop, delivery) basis trend — the drill-down view. One row per series: basis first→last and how far it moved. Done MCP-side so the caller gets a compact table instead of every raw sample.""" rows = [r for r in (payload.get("rows") or []) if r.get("commodity") in GRAIN] days = payload.get("days") if not rows: return f"### Basis movement by elevator — last {days} days\n\nNo grain basis samples in the window.\n" series = _build_series(rows) body: list[str] = [] # Sort by commodity, then elevator, then delivery for stable readable output. for (src, com, dlv), pts in sorted(series.items(), key=lambda kv: (kv[0][1], kv[0][0], kv[0][2])): first, last = _first_last(pts, "basis_cents") if first is None: continue body.append( f"| {com} | {src} | {dlv} | {_basis(first)} | {_basis(last)} | " f"{_basis_move(last - first)} | {len(pts)} |" ) if len(body) >= max_rows: break if not body: return f"### Basis movement by elevator — last {days} days\n\nNo basis data on the matching series.\n" header = [ f"### Basis movement by elevator — last {days} days", "", "| Commodity | Elevator | Delivery | First | Last | Move | Samples |", "|---|---|---|---:|---:|---|---:|", ] return "\n".join(header + body) + "\n" # ---------- sources / health ---------- def _location(s: dict) -> str: city, state = s.get("city"), s.get("state") loc = ", ".join(p for p in (city, state) if p) if s.get("zip"): loc = f"{loc} {s['zip']}".strip() return loc or "—" def fmt_sources(payload: dict) -> str: src = payload.get("sources") or [] if not src: return "### Sources\n\nNo active sources.\n" lines = [ "### Tracked sources", "", "| Source | Kind | Location | Last success | Consecutive failures | Last error |", "|---|---|---|---|---:|---|", ] for s in src: lines.append( f"| {s['name']} | {s['kind']} | {_location(s)} | {s.get('last_success_at') or '—'} | " f"{s.get('consecutive_failures') or 0} | {s.get('last_error') or ''} |" ) return "\n".join(lines) + "\n" def fmt_health(payload: dict) -> str: src = payload.get("sources") or [] healthy, stale, down = [], [], [] for s in src: n = s.get("consecutive_failures") or 0 if n >= 3: down.append(s) elif not s.get("last_success_at"): stale.append(s) else: healthy.append(s) lines = ["### Source health", ""] lines.append(f"- ✅ Healthy: **{len(healthy)}**") lines.append(f"- ⚠ Stale (never succeeded): **{len(stale)}**") lines.append(f"- ❌ Down (3+ consecutive failures): **{len(down)}**") if stale or down: lines.append("") lines.append("| Source | State | Last success | Failures | Error |") lines.append("|---|---|---|---:|---|") for s in stale + down: state = "stale" if s in stale else "down" lines.append( f"| {s['name']} | {state} | {s.get('last_success_at') or '—'} | " f"{s.get('consecutive_failures') or 0} | " f"{(s.get('last_error') or '')[:80]} |" ) return "\n".join(lines) + "\n" # ---------- list helpers ---------- def fmt_deliveries(payload: dict) -> str: commodity = payload.get("commodity") labels = payload.get("deliveries") or [] if not labels: return f"### {commodity} deliveries\n\nNo posted deliveries.\n" return ( f"### Posted delivery labels for {commodity}\n\n" + "\n".join(f"- {x}" for x in labels) + "\n" ) def fmt_commodities() -> str: return ( "### Tracked commodities\n\n" "- **corn** ($/bu)\n" "- **soy** — soybeans ($/bu)\n" "- **wheat** ($/bu) — tracked but excluded from daily brief emails\n" "- **map** — MAP 11-52-0 ($/ton)\n" "- **potash** — Potash 0-0-60 ($/ton)\n" "- **lime** ($/ton)\n" ) # ---------- today's summary ---------- def fmt_summary(payload: dict) -> str: today = payload.get("today") prev = payload.get("prev_trading_day") lines = [f"### Market summary — today {today} vs prev trading day {prev}", ""] for c in payload.get("commodities", []): f = c.get("futures") or {} chg = f.get("change_cents") arrow = _delta_arrow(chg) lines.append( f"**{c['display_name']}** — CBOT {f.get('contract','?')} " f"last {_bu(f.get('today_last_cents'))}, prev close " f"{_bu(f.get('prev_close_cents'))} {arrow} {_basis(chg) if chg is not None else ''}" ) bft = c.get("best_for_today") if bft: lines.append( f" · Best for today's delivery ({bft['delivery']}): " f"**{bft['source_name']}** @ {_bu(bft['bid_cents'])} " f"(basis {_basis(bft.get('basis_cents'))})" ) else: lines.append(" · No current-month local bid posted.") lines.append("") return "\n".join(lines).rstrip() + "\n"