Files
seed-mcp/rag/chunk.py
T
justin c737871c4c Trial-data scrapers: gh_plot_reports + agripro_trials + search_trials tool
This PR introduces TRIAL data — yield-performance results from real
field trials — as a SEPARATE data type alongside variety identity.
The two are complementary:

  search_docs  → "What's the disease resistance of DKC62-08RIB?"
                  (variety identity — what it IS)
  search_trials → "Which corn hybrid won the IA 2024 trials?"
                  (performance data — how it PERFORMED)

scrape/sources/gh_plot_reports.py — Golden Harvest plot reports
- 4,618 expected (2024+2025; 2023 deferred to a backfill pass).
- URL: /<crop>/plot-report/<state>/<year>/<plot_id>
- Cross-vendor: each plot lists products from multiple brands
  (NK / DEKALB / Golden Harvest / Enogen / Pioneer / Channel) side
  by side at one cooperator's field — the kind of independent
  comparison data Bayer doesn't publish itself.
- Generic per-column metrics dict (Yield/MST/Test Weight/$/Ac for
  corn+soy, Ton/Acre + Milk + Beef columns for silage).
- Politeness: 1 req/sec, retries on 429/5xx, no redirect-follow.

scrape/sources/agripro_trials.py — AgriPro regional trial PDFs
- 14 unique PDFs (38 sitemap links deduped) at /trials-data
- pdfplumber text extraction, region/year detection from filename
- Verbatim PDF text preserved in chunk body so variety + yield
  number adjacency drives retrieval (AP Iliad's Aberdeen ID yield
  matches a query about "AP Iliad Idaho yield")

rag/chunk.py — chunks_from_trial() dispatching by source
- Plot reports: identity preamble + Top-5 by primary metric + full
  ranking table. Metric labels chosen from the data (corn/soy use
  "Yield", silage uses "Ton/Acre").
- AgriPro PDFs: identity preamble + verbatim trial body inline so
  per-location yields surface for region+variety queries.
- Variety chunks get data_type="variety" metadata; trial chunks get
  data_type="trial". Single Chroma collection; the tool router
  filters by data_type rather than maintaining two collections.

rag/index.py — dispatch by sidecar's data_type field
rag/bm25.py — new filter columns (data_type, year, state)

docs_mcp/server.py — sixth MCP tool: search_trials(crop?, state?,
year?, product?, k=10)
- Filters trial chunks via where={"data_type": "trial", ...}
- Optional product substring post-filter for "DKC62-08RIB Iowa 2024"
  style searches
- search_docs now defaults to data_type="variety" so trial chunks
  don't bleed into variety identity queries
- Tool docstring routes the agent: "use lookup_variety to verify
  identity details on any trial winner you surface"

NK trial endpoint (/NKSeeds/wsProxy.asmx/GetPlotResult) is documented
as deferred — the ASMX-SOAP shape returned empty XML on initial
probe. Bayer per-variety yield data is not publicly indexed at all
— documented in the trial-scope note (DEKALB/Asgrow trial data flows
through Channel reps, not the web). AgRevival research books exist
as 10 large annual PDFs but are deferred (low ROI per parse).

Initial corpus shipped in this PR: 14 AgriPro trial PDFs. The 4,618
Golden Harvest plot reports are scraping in background and will be
added in a follow-up corpus-snapshot PR (~70 min ETA).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-25 15:19:03 -04:00

583 lines
22 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""Chunker for seed-variety corpus.
Each variety becomes ONE chunk by default. Variety pages are small
(typically 2-3 KB of useful signal) and nomic-embed-text handles up
to ~8 K tokens cleanly. Splitting a variety across chunks dilutes
the named-rating embeddings (e.g. "SCN resistance 7") that farmers
search by — keep them together.
The chunk text is a synthetic preamble assembled deterministically
from the sidecar JSON. Every value in the chunk text comes verbatim
from the source. The framing words ("Disease ratings (1-9, 9=best):",
"Maturity group:", etc.) are template glue — *we add structure, we
do NOT add facts*. Given the same sidecar, this chunker always
produces the same chunk text. That's the anti-hallucination
contract: the retriever can never surface a rating value that
wasn't in the source.
Metadata is flattened to Chroma-safe primitives (str/int/float/bool):
source "bayer_seeds"
source_key "dekalb-dkc075-70rib"
vendor "Bayer"
brand "DEKALB"
crop "corn" | "soybeans" | "wheat"
product_name "DKC075-70RIB BRAND BLEND"
product_id canonical full id
source_url the variety's page URL
rm corn RM as int when parseable (else absent)
mg soy MG as float when parseable (else absent)
release_year int when known
trait_codes_csv comma-separated trait codes for substring search
rating_scale "1-9 (9 = best)" — chunker should ALWAYS attach
this so downstream code can detect a flip
ordinal chunk index within variety (0-based)
Lists like ``regional_recommendations`` and the full per-rating dicts
do NOT fit Chroma's metadata constraints — they stay in the sidecar
JSON, surfaced by ``get_page`` / ``lookup_variety``.
"""
from __future__ import annotations
import json
import re
from pathlib import Path
from typing import Iterator
# Rating-group classification. The source publishes characteristics
# grouped by label; we map those labels to one of three buckets in
# the chunk preamble so retrieval gets coherent text. Group labels not
# listed here fall into "other" and are still emitted, just in their
# own section.
DISEASE_GROUP_LABELS = {
"DISEASE RATINGS",
"PEST AND DISEASE RESISTANCE",
}
AGRONOMIC_GROUP_LABELS = {
"GROWTH",
"HARVEST",
"PRODUCTION",
"KEY CHARACTERISTICS",
"QUALITY",
}
MANAGEMENT_GROUP_LABELS = {
"MANAGEMENT",
"HERBICIDE",
"SENSITIVITY",
"PLANT DESCRIPTION",
}
def _parse_rm(value: object) -> int | None:
"""Best-effort RM-days int. Returns None if not a clean integer
(e.g. wheat's qualitative 'Early'/'Medium-Early' values)."""
if value is None:
return None
s = str(value).strip()
if not s:
return None
try:
# Handle floats stored as strings ("105.0") and the trailing
# tenths sometimes seen on early corn ("75").
return int(float(s))
except ValueError:
return None
def _parse_mg(value: object) -> float | None:
"""Best-effort MG float. Soy MGs go from 00 to 9.0 with one decimal."""
if value is None:
return None
s = str(value).strip()
if not s:
return None
try:
return float(s)
except ValueError:
return None
def _format_items(items: list[dict]) -> str:
"""Render `[{characteristic, value}, ...]` to a compact inline list."""
out: list[str] = []
for it in items:
ch = (it.get("characteristic") or "").strip()
v = (it.get("value") or "").strip()
if ch and v:
out.append(f"{ch} {v}")
elif ch:
out.append(f"{ch} —")
return ", ".join(out)
def _render_variety_chunk(sidecar: dict) -> str:
"""Build the dense preamble for one variety from its sidecar JSON.
Faithful to source: every numeric/categorical *value* is verbatim
from ``sidecar``. The only generated text is the framing language.
"""
lines: list[str] = []
# ---- Identity line --------------------------------------------------
name = sidecar.get("product_name") or sidecar.get("source_key") or ""
brand = (sidecar.get("brand") or "").strip()
vendor = sidecar.get("vendor") or ""
crop = (sidecar.get("crop") or "").strip()
crop_label = crop.capitalize() if crop else ""
ident = f"# {name}"
sub = " ".join(filter(None, [
f"({brand.title()} {crop_label} variety, {vendor})" if brand and crop_label and vendor else "",
]))
lines.append(ident)
if sub:
lines.append("")
lines.append(sub)
# ---- Identity body --------------------------------------------------
facts: list[str] = []
rm = sidecar.get("relative_maturity")
mg = sidecar.get("maturity_group")
wc = sidecar.get("wheat_class")
if crop == "corn" and rm:
facts.append(f"Relative maturity {rm}")
elif crop == "soybeans" and mg:
facts.append(f"Maturity group {mg}")
elif crop == "wheat":
if rm:
facts.append(f"Maturity {rm}")
if wc:
facts.append(f"Wheat class {wc}")
traits = sidecar.get("trait_stack") or []
trait_descs = sidecar.get("trait_descriptions") or []
if traits:
if trait_descs:
facts.append(
"Trait stack: "
+ ", ".join(traits)
+ " ("
+ "; ".join(trait_descs)
+ ")"
)
else:
facts.append("Trait stack: " + ", ".join(traits))
if sidecar.get("release_year"):
facts.append(f"Released {sidecar['release_year']}")
if facts:
lines.append("")
lines.append(". ".join(facts) + ".")
# ---- Positioning ----------------------------------------------------
pos = (sidecar.get("positioning_statement") or "").strip()
if pos:
lines.append("")
lines.append(f"Positioning: {pos}")
# ---- Ratings, bucketed for retrieval --------------------------------
scale = sidecar.get("_scale_direction") or "(scale direction not declared)"
groups = sidecar.get("characteristics_groups") or []
disease: list[dict] = []
agronomic: list[dict] = []
management: list[dict] = []
other: list[tuple[str, list[dict]]] = []
for g in groups:
label = (g.get("label") or "").upper().strip()
items = g.get("items") or []
if not items:
continue
if label in DISEASE_GROUP_LABELS:
disease.extend(items)
elif label in AGRONOMIC_GROUP_LABELS:
agronomic.extend(items)
elif label in MANAGEMENT_GROUP_LABELS:
management.extend(items)
else:
other.append((g.get("label") or "Other", items))
if disease:
lines.append("")
lines.append(f"Disease ratings ({scale}): {_format_items(disease)}.")
if agronomic:
lines.append("")
lines.append(f"Agronomic ratings ({scale}): {_format_items(agronomic)}.")
if management:
lines.append("")
lines.append(f"Management notes: {_format_items(management)}.")
for label, items in other:
lines.append("")
lines.append(f"{label.title()}: {_format_items(items)}.")
# ---- Strengths narrative --------------------------------------------
strengths = sidecar.get("strengths") or []
if strengths:
lines.append("")
lines.append("Strengths and management notes:")
for s in strengths:
s = (s or "").strip()
if s:
lines.append(f"- {s}")
# ---- Regional listings (compact, not the agronomist emails) ---------
rec = sidecar.get("regional_recommendations") or []
if rec:
names = sorted({
(r.get("product_list_name") or "").strip()
for r in rec
if (r.get("product_list_name") or "").strip()
})
if names:
lines.append("")
lines.append("Listed in regional seed guides: " + "; ".join(names) + ".")
# ---- Provenance footer (must always be in the chunk text so it
# can never be lost between retrieval and LLM rendering) --------
urls = sidecar.get("source_urls") or []
if urls:
lines.append("")
lines.append(f"Source: {urls[0]}")
return "\n".join(lines).strip() + "\n"
def _flat_metadata(sidecar: dict) -> dict:
"""Distil sidecar into Chroma-safe metadata (primitives only)."""
# Normalize brand to uppercase so cross-vendor filter matching is
# case-stable. Vendors are inconsistent (Bayer uses "DEKALB",
# Syngenta uses "Golden Harvest") and the filter shouldn't have to
# know which is which. The sidecar JSON keeps the original casing
# for display; only Chroma metadata is normalized.
md: dict = {
"source": sidecar.get("source") or "",
"source_key": sidecar.get("source_key") or "",
"data_type": "variety",
"vendor": sidecar.get("vendor") or "",
"brand": (sidecar.get("brand") or "").upper(),
"crop": (sidecar.get("crop") or "").lower(),
"product_name": sidecar.get("product_name") or "",
"product_id": sidecar.get("product_id") or "",
"source_url": (sidecar.get("source_urls") or [""])[0],
"rating_scale": sidecar.get("_scale_direction") or "",
}
rm = _parse_rm(sidecar.get("relative_maturity"))
mg = _parse_mg(sidecar.get("maturity_group"))
if rm is not None:
md["rm"] = rm
if mg is not None:
md["mg"] = mg
ry = sidecar.get("release_year")
if isinstance(ry, int):
md["release_year"] = ry
traits = sidecar.get("trait_stack") or []
if traits:
# Comma-delimited for partial-match / human eyeballing.
# Bracket-padded so `LIKE '%,XF,%'` finds whole tokens.
md["trait_codes_csv"] = "," + ",".join(traits) + ","
if sidecar.get("wheat_class"):
md["wheat_class"] = sidecar["wheat_class"]
return md
def chunks_from_variety(
sidecar_path: Path | str,
*,
md_path: Path | str | None = None,
) -> Iterator[dict]:
"""Yield chunk dict(s) for one variety. Currently emits exactly one.
Args:
sidecar_path: path to the variety's JSON sidecar.
md_path: ignored (the chunker rebuilds from sidecar); kept
in the signature in case a future split-chunker
wants the rendered body.
"""
sidecar = json.loads(Path(sidecar_path).read_text(encoding="utf-8"))
text = _render_variety_chunk(sidecar)
meta = _flat_metadata(sidecar)
chunk_id = f"{meta['source']}::{meta['source_key']}::0"
yield {
"id": chunk_id,
"text": text,
"metadata": {**meta, "ordinal": 0},
}
# ===========================================================================
# Trial chunker — for sidecars with data_type="trial"
# ===========================================================================
#
# Trial documents are a different shape from variety identity:
# - GH plot reports: per-site head-to-head yield comparison across brands
# - AgriPro trial PDFs: regional multi-year multi-location summary
#
# Both produce ONE chunk per document with a preamble that emphasizes
# the trial's location/year/top performers so the embedder gets clean
# signal for queries like "best corn for sandy soil Iowa 2024".
def _render_gh_plot_chunk(sidecar: dict) -> str:
"""Render a Golden Harvest plot report (per-site cross-vendor)."""
lines: list[str] = []
crop = (sidecar.get("crop") or "").lower()
crop_label = {"corn": "Corn", "soybeans": "Soybean", "silage": "Silage"}.get(crop, crop.title())
state = sidecar.get("state") or sidecar.get("state_abbrev") or ""
year = sidecar.get("year") or ""
cooperator = sidecar.get("cooperator") or ""
lines.append(f"# {crop_label} yield trial — {state}, {year}")
lines.append("")
facts = ["Golden Harvest plot report (cross-vendor)"]
if cooperator:
facts.append(f"cooperator {cooperator}")
if sidecar.get("planted_date"):
facts.append(f"planted {sidecar['planted_date']}")
if sidecar.get("harvested_date"):
facts.append(f"harvested {sidecar['harvested_date']}")
if sidecar.get("population_seeds_per_acre"):
facts.append(f"population {sidecar['population_seeds_per_acre']:,} seeds/acre")
if sidecar.get("row_width_in"):
facts.append(f"{sidecar['row_width_in']}\" rows")
lines.append(". ".join(facts) + ".")
lines.append("")
results = sidecar.get("results") or []
if results:
# Pick the primary metric for ranking: corn/soy use "Yield",
# silage uses "Ton/Acre". Find the first metric key with a
# numeric value in the top result.
def _primary(r: dict) -> tuple[str, float | None]:
metrics = r.get("metrics") or {}
# Back-compat: old sidecars had yield_bu_ac directly.
if not metrics and r.get("yield_bu_ac") is not None:
return ("Yield", r["yield_bu_ac"])
for k in ("Yield", "Ton/Acre", "Tons/Acre"):
v = metrics.get(k)
if isinstance(v, (int, float)):
return (k, v)
for k, v in metrics.items():
if isinstance(v, (int, float)):
return (k, v)
return ("", None)
top = results[: min(5, len(results))]
primary_label, _ = _primary(top[0]) if top else ("", None)
rendered_top_parts: list[str] = []
for i, r in enumerate(top):
label, val = _primary(r)
piece = f"#{r.get('rank') or i+1} {r.get('brand','?')} {r.get('product','?')}"
if r.get('traits'):
piece += f" {r['traits']}"
if val is not None:
piece += f" — {val} {label}"
rendered_top_parts.append(piece)
if rendered_top_parts:
lines.append(
f"Top {len(top)} ({crop_label}, {state} {year}): "
+ ", ".join(rendered_top_parts) + "."
)
lines.append("")
# Discover the metric column order from the first result with metrics.
metric_keys: list[str] = []
for r in results:
metrics = r.get("metrics") or {}
if metrics:
metric_keys = list(metrics.keys())
break
# Back-compat: synthesize from legacy fields if no metrics dict.
if not metric_keys and any(
r.get("yield_bu_ac") is not None for r in results
):
metric_keys = ["Yield", "%MST", "Test Weight", "Gross Revenue"]
# Full ranking — preserves every datapoint verbatim.
col_headers = ["rank", "brand", "product", "traits"] + metric_keys
lines.append("Full ranking (" + " | ".join(col_headers) + "):")
for r in results:
row = [
f"#{r.get('rank') or '-'}",
r.get("brand") or "-",
r.get("product") or "-",
r.get("traits") or "-",
]
metrics = r.get("metrics") or {}
# Back-compat shim
if not metrics:
metrics = {
"Yield": r.get("yield_bu_ac"),
"%MST": r.get("mst_pct"),
"Test Weight": r.get("test_weight"),
"Gross Revenue": r.get("gross_revenue_dol_ac"),
}
for k in metric_keys:
v = metrics.get(k)
if v is None:
row.append("-")
elif isinstance(v, (int, float)):
if "Revenue" in k or "$" in k:
row.append(f"${v:.2f}")
else:
row.append(str(v))
else:
row.append(str(v))
lines.append(" " + " | ".join(row))
lines.append("")
urls = sidecar.get("source_urls") or []
if urls:
lines.append(f"Source: {urls[0]}")
return "\n".join(lines).strip() + "\n"
def _render_agripro_trial_chunk(sidecar: dict) -> str:
"""Render an AgriPro regional trial PDF — preamble + verbatim text."""
lines: list[str] = []
title = sidecar.get("title") or sidecar.get("filename") or sidecar.get("source_key", "")
lines.append(f"# {title}")
lines.append("")
facts = ["AgriPro / Syngenta regional wheat trial"]
if sidecar.get("region"):
facts.append(f"region {sidecar['region']}")
if sidecar.get("wheat_class_section"):
facts.append(f"class {sidecar['wheat_class_section']}")
if sidecar.get("years_covered") and len(sidecar["years_covered"]) > 1:
yc = sidecar["years_covered"]
facts.append(f"years {yc[0]}{yc[-1]}")
elif sidecar.get("year"):
facts.append(f"year {sidecar['year']}")
lines.append(". ".join(facts) + ".")
lines.append("")
varieties = sidecar.get("varieties_found") or []
if varieties:
lines.append("Varieties listed: " + ", ".join(varieties) + ".")
lines.append("")
# Verbatim trial data — preserves variety + yield numbers adjacent
# so BM25/dense can match "AP Iliad Aberdeen Idaho" queries.
lines.append("Trial data (verbatim from PDF):")
lines.append("")
# The actual text was in the .md body but isn't in the sidecar
# JSON. We render a brief marker; full text goes in the .md file
# that get_page returns. For embedding signal, the title +
# varieties + region is usually enough.
# If we want the FULL text in the chunk we'd need to either store
# it in the sidecar OR read it from the .md path at chunk time.
# Read from the .md path:
return "\n".join(lines).strip() + "\n"
def _render_trial_chunk(sidecar: dict, md_text: str | None = None) -> str:
"""Dispatch to the right trial renderer by source. Includes the
verbatim trial body for sources whose value lives in the body text
(currently agripro_trials)."""
source = sidecar.get("source")
if source == "gh_plot_reports":
return _render_gh_plot_chunk(sidecar)
if source == "agripro_trials":
header = _render_agripro_trial_chunk(sidecar)
if md_text:
# Strip the markdown frontmatter so the body text is the
# actual trial data, not the per-source preamble.
body = md_text
sep = "## Trial data (verbatim from PDF)"
if sep in body:
body = body.split(sep, 1)[1].strip()
# Strip fence markers
body = re.sub(r"```", "", body).strip()
return header + "\n" + body + "\n"
return header
# Fallback: generic trial render
return _render_gh_plot_chunk(sidecar)
def _flat_trial_metadata(sidecar: dict) -> dict:
"""Chroma-safe metadata for trial chunks. Mirrors variety metadata
plus trial-specific facets (state, year, data_type)."""
md: dict = {
"source": sidecar.get("source") or "",
"source_key": sidecar.get("source_key") or "",
"data_type": sidecar.get("data_type") or "trial",
"vendor": sidecar.get("vendor") or "",
"brand": (sidecar.get("brand") or "").upper(),
"crop": (sidecar.get("crop") or "").lower(),
"source_url": (sidecar.get("source_urls") or [""])[0],
}
year = sidecar.get("year")
if isinstance(year, int):
md["year"] = year
state = sidecar.get("state_abbrev") or sidecar.get("state")
if state:
md["state"] = state.upper() if len(state) <= 3 else state
md["state_abbrev"] = (sidecar.get("state_abbrev") or "").upper()
if sidecar.get("region"):
md["region"] = sidecar["region"]
if sidecar.get("wheat_class_section"):
md["wheat_class"] = sidecar["wheat_class_section"]
if sidecar.get("plot_id"):
md["plot_id"] = sidecar["plot_id"]
if isinstance(sidecar.get("n_results"), int):
md["n_results"] = sidecar["n_results"]
return md
def chunks_from_trial(
sidecar_path: Path | str,
*,
md_path: Path | str | None = None,
) -> Iterator[dict]:
"""Yield chunk dict(s) for one trial document. Emits exactly one
chunk per trial.
Args:
sidecar_path: path to the trial's JSON sidecar.
md_path: path to the trial's markdown body (used for
AgriPro PDFs whose value lives in the verbatim
text). If omitted we infer it from sidecar_path.
"""
sc_path = Path(sidecar_path)
sidecar = json.loads(sc_path.read_text(encoding="utf-8"))
md_text: str | None = None
md_p = Path(md_path) if md_path else sc_path.with_suffix(".md")
if md_p.exists():
md_text = md_p.read_text(encoding="utf-8")
text = _render_trial_chunk(sidecar, md_text=md_text)
meta = _flat_trial_metadata(sidecar)
chunk_id = f"{meta['source']}::{meta['source_key']}::0"
yield {
"id": chunk_id,
"text": text,
"metadata": {**meta, "ordinal": 0},
}
# ----- Backwards-compat shim for the template's index.py -------------------
#
# The template's ``rag.index.page_records`` calls
# ``chunks_from_page(md, page_id, base_meta)`` which doesn't know about
# sidecar JSON. We accept that signature but ignore it — index.py has
# been updated to use ``chunks_from_variety`` directly, and this shim
# is here only so a stray import of the old name doesn't break.
#
def chunks_from_page(text: str, page_id: str, metadata: dict) -> Iterator[dict]:
"""Deprecated for seed-mcp; prefer ``chunks_from_variety``."""
# Best-effort: if metadata carries a sidecar_path, dispatch.
sidecar_path = metadata.get("_sidecar_path")
if sidecar_path:
yield from chunks_from_variety(sidecar_path)
return
# Fallback — emit a single chunk of the raw markdown with whatever
# metadata we have. Better than crashing if someone calls this.
chunk_id = f"{metadata.get('source','unknown')}::{page_id}::0"
yield {
"id": chunk_id,
"text": text,
"metadata": {**metadata, "ordinal": 0},
}