Trial-data scrapers: gh_plot_reports + agripro_trials + search_trials tool

This PR introduces TRIAL data — yield-performance results from real
field trials — as a SEPARATE data type alongside variety identity.
The two are complementary:

  search_docs  → "What's the disease resistance of DKC62-08RIB?"
                  (variety identity — what it IS)
  search_trials → "Which corn hybrid won the IA 2024 trials?"
                  (performance data — how it PERFORMED)

scrape/sources/gh_plot_reports.py — Golden Harvest plot reports
- 4,618 expected (2024+2025; 2023 deferred to a backfill pass).
- URL: /<crop>/plot-report/<state>/<year>/<plot_id>
- Cross-vendor: each plot lists products from multiple brands
  (NK / DEKALB / Golden Harvest / Enogen / Pioneer / Channel) side
  by side at one cooperator's field — the kind of independent
  comparison data Bayer doesn't publish itself.
- Generic per-column metrics dict (Yield/MST/Test Weight/$/Ac for
  corn+soy, Ton/Acre + Milk + Beef columns for silage).
- Politeness: 1 req/sec, retries on 429/5xx, no redirect-follow.

scrape/sources/agripro_trials.py — AgriPro regional trial PDFs
- 14 unique PDFs (38 sitemap links deduped) at /trials-data
- pdfplumber text extraction, region/year detection from filename
- Verbatim PDF text preserved in chunk body so variety + yield
  number adjacency drives retrieval (AP Iliad's Aberdeen ID yield
  matches a query about "AP Iliad Idaho yield")

rag/chunk.py — chunks_from_trial() dispatching by source
- Plot reports: identity preamble + Top-5 by primary metric + full
  ranking table. Metric labels chosen from the data (corn/soy use
  "Yield", silage uses "Ton/Acre").
- AgriPro PDFs: identity preamble + verbatim trial body inline so
  per-location yields surface for region+variety queries.
- Variety chunks get data_type="variety" metadata; trial chunks get
  data_type="trial". Single Chroma collection; the tool router
  filters by data_type rather than maintaining two collections.

rag/index.py — dispatch by sidecar's data_type field
rag/bm25.py — new filter columns (data_type, year, state)

docs_mcp/server.py — sixth MCP tool: search_trials(crop?, state?,
year?, product?, k=10)
- Filters trial chunks via where={"data_type": "trial", ...}
- Optional product substring post-filter for "DKC62-08RIB Iowa 2024"
  style searches
- search_docs now defaults to data_type="variety" so trial chunks
  don't bleed into variety identity queries
- Tool docstring routes the agent: "use lookup_variety to verify
  identity details on any trial winner you surface"

NK trial endpoint (/NKSeeds/wsProxy.asmx/GetPlotResult) is documented
as deferred — the ASMX-SOAP shape returned empty XML on initial
probe. Bayer per-variety yield data is not publicly indexed at all
— documented in the trial-scope note (DEKALB/Asgrow trial data flows
through Channel reps, not the web). AgRevival research books exist
as 10 large annual PDFs but are deferred (low ROI per parse).

Initial corpus shipped in this PR: 14 AgriPro trial PDFs. The 4,618
Golden Harvest plot reports are scraping in background and will be
added in a follow-up corpus-snapshot PR (~70 min ETA).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-05-25 15:19:03 -04:00
parent 7b3da908e0
commit c737871c4c
35 changed files with 3302 additions and 25 deletions
+483
View File
@@ -0,0 +1,483 @@
"""AgriPro trial-PDF scraper.
Source: ``agriprowheat.com/trials-data`` — a single page listing
~38 PDF links to regional wheat trial summary documents. Each PDF
is a multi-year multi-location performance test comparing AgriPro
varieties against competitors (LCS, Norwest, PNW, UI, etc.).
Discovery: walk ``/trials-data``, collect every ``href="*.pdf"``.
Per-PDF content (parsed via pdfplumber):
- First line: usually the title (e.g.
"2024 Pacific Northwest Combined Summary, Three-Year Data")
- A multi-column table with one row per variety. Columns vary by
PDF but typically include: 3-yr combined yield, 2-yr combined,
most-recent-year yield, plus per-location yields with location
names in the header.
- Footer notes: locations covered, LSD/CV statistical caveats,
copyright.
Trial PDFs are stable text-extractable (no charts). We capture the
full per-page text verbatim in the chunk body — preserving
variety-name + yield-number adjacency for the embedder — plus
metadata derived from the title (region, year, crop class). This is
a deliberate trade-off: perfect table parsing across the PDF
variants would be brittle; verbatim text preserves every data point
and the embedder + BM25 between them can match queries like
"AP Iliad yield Aberdeen Idaho" reliably.
Output:
corpus/agripro_trials/<source_key>.md
corpus/agripro_trials/<source_key>.json
source_key convention: ``agt-<slugified-filename-stem>`` lowercased,
e.g. ``agt-2024-pnw-combined``.
CLI:
python -m scrape.sources.agripro_trials --limit 5
python -m scrape.sources.agripro_trials --force
"""
from __future__ import annotations
import argparse
import io
import json
import logging
import os
import random
import re
import sys
import time
from dataclasses import dataclass, field
from datetime import datetime, timezone
from pathlib import Path
from typing import Any
import requests
from bs4 import BeautifulSoup
import pdfplumber
SCRAPER_VERSION = "0.1.0"
USER_AGENT = "seed-mcp-scraper/0.1 (+https://drawbar.example/contact)"
BASE = "https://agriprowheat.com"
LIST_URL = f"{BASE}/trials-data"
REPO_ROOT = Path(__file__).resolve().parents[2]
CORPUS_ROOT = Path(os.environ.get("CORPUS_ROOT") or REPO_ROOT / "corpus")
CORPUS_DIR = CORPUS_ROOT / "agripro_trials"
REQ_INTERVAL_SEC = 1.0
log = logging.getLogger("scrape.agripro_trials")
# Region name patterns we recognize in PDF filenames / titles. The
# value is a human-readable normalized region.
REGION_PATTERNS = (
(re.compile(r"\bPNW\b|Pacific Northwest", re.I), "Pacific Northwest"),
(re.compile(r"\bNE Colorado\b|Northeast Colorado", re.I), "NE Colorado"),
(re.compile(r"\bSC KS\b|South Central Kansas", re.I), "SC Kansas / N Central OK"),
(re.compile(r"\bWestern Plains\b", re.I), "Western Plains"),
(re.compile(r"\bCentral Plains\b", re.I), "Central Plains"),
(re.compile(r"\bPlains Irrigated\b", re.I), "Plains Irrigated"),
(re.compile(r"\bWashington[/:]?N? *Idaho\b", re.I), "WA / N. Idaho"),
(re.compile(r"\bSouthern Idaho\b", re.I), "Southern Idaho"),
(re.compile(r"\bMontana\b", re.I), "Montana"),
(re.compile(r"\bNP Perf Data\b|Northern Plains", re.I), "Northern Plains"),
(re.compile(r"\bWheat after Soy\b", re.I), "Wheat-after-Soy rotation"),
)
# --------------------------------------------------------------------- HTTP
class RateLimitedSession:
def __init__(self, interval: float = REQ_INTERVAL_SEC) -> None:
self.s = requests.Session()
self.s.headers["User-Agent"] = USER_AGENT
self.interval = interval
self._last = 0.0
def _wait(self) -> None:
delta = time.monotonic() - self._last
if delta < self.interval:
time.sleep(self.interval - delta)
self._last = time.monotonic()
def request(
self,
method: str,
url: str,
*,
max_retries: int = 4,
timeout: float = 60.0,
**kw: Any,
) -> requests.Response:
last_exc: Exception | None = None
for attempt in range(max_retries):
self._wait()
try:
resp = self.s.request(method, url, timeout=timeout, **kw)
except requests.RequestException as exc:
last_exc = exc
backoff = min(30.0, (2 ** attempt) + random.random())
log.warning("network error on %s %s: %s — retry in %.1fs",
method, url, exc, backoff)
time.sleep(backoff)
continue
if resp.status_code == 429 or 500 <= resp.status_code < 600:
ra = resp.headers.get("Retry-After")
backoff = float(ra) if (ra and ra.isdigit()) else min(30.0, (2 ** attempt) + random.random())
log.warning("HTTP %d on %s %s — retry in %.1fs",
resp.status_code, method, url, backoff)
time.sleep(backoff)
continue
return resp
if last_exc:
raise last_exc
return resp # type: ignore[return-value]
def get(self, url: str, **kw: Any) -> requests.Response:
return self.request("GET", url, **kw)
# --------------------------------------------------------------------- model
@dataclass
class TrialPDF:
source_key: str
source_url: str
pdf_url: str
filename: str
title: str | None = None
year: int | None = None
years_covered: list[int] = field(default_factory=list)
region: str | None = None
wheat_class_section: str | None = None # e.g. "Soft White Winter Wheat" — derived from PDF text
page_text: str = ""
varieties_found: list[str] = field(default_factory=list)
# --------------------------------------------------------------------- discovery
def discover_pdfs(http: RateLimitedSession) -> list[tuple[str, str, str, str]]:
"""Return ``[(pdf_url, filename, section_heading, section_anchor), ...]``
for every PDF on /trials-data.
De-duplicates by pdf_url — multiple section headings may link to
the same PDF (e.g. a multi-state summary).
"""
log.info("fetching trials index %s", LIST_URL)
r = http.get(LIST_URL)
r.raise_for_status()
soup = BeautifulSoup(r.text, "html.parser")
seen: dict[str, tuple[str, str, str, str]] = {}
for a in soup.find_all("a", href=re.compile(r"\.pdf(?:$|\?)", re.I)):
href = a["href"]
from urllib.parse import urljoin
full = urljoin(LIST_URL, href)
fn = href.rsplit("/", 1)[-1]
# Section context — closest preceding h2/h3/h4
section = ""
parent = a.parent
for _ in range(10):
if parent is None:
break
head = parent.find_previous(["h2", "h3", "h4"])
if head:
section = head.get_text(strip=True)
break
parent = parent.parent
if full not in seen:
seen[full] = (full, fn, section, href)
out = list(seen.values())
log.info("trial PDFs found: %d (deduped from %d total links)",
len(out),
sum(1 for a in soup.find_all("a", href=re.compile(r"\.pdf", re.I))))
return out
# --------------------------------------------------------------------- helpers
def source_key_for(filename: str) -> str:
"""``2024 PNW Combined.pdf`` → ``agt-2024-pnw-combined``."""
from urllib.parse import unquote
stem = unquote(filename).rsplit(".", 1)[0]
slug = re.sub(r"[^a-zA-Z0-9]+", "-", stem).strip("-").lower()
return f"agt-{slug}"
def _detect_region(text: str) -> str | None:
for pat, label in REGION_PATTERNS:
if pat.search(text):
return label
return None
def _detect_years(text: str) -> list[int]:
"""Return sorted years found in the PDF title / first lines.
Filters to 2010-2030 to ignore page numbers / table values."""
years = sorted({
int(y) for y in re.findall(r"\b(20[1-3]\d)\b", text[:600])
})
return years
def _detect_wheat_class_section(text: str) -> str | None:
"""The trial PDFs typically have a class label line like
'Soft White Winter Wheat' near the top of the table."""
for label in (
"Hard Red Winter Wheat", "Hard Red Spring Wheat",
"Hard White Spring Wheat", "Hard White Winter Wheat",
"Soft White Winter Wheat", "Soft White Spring Wheat",
"Soft Red Winter Wheat", "Durum",
):
if re.search(r"\b" + re.escape(label) + r"\b", text[:1500], re.I):
return label
return None
# Variety name patterns we expect to see in AgriPro trial PDFs.
# AgriPro varieties = AP <name>, SY <name>; competitors include
# LCS <name>, UI <name>, PNW <name>, Norwest <name>.
_VARIETY_LINE_RE = re.compile(
r"^(?:AP|SY|LCS|UI|PNW|Norwest|WB|Stine|Pioneer)\b[A-Za-z0-9 \-+]*",
)
def _detect_varieties(text: str) -> list[str]:
out: list[str] = []
seen: set[str] = set()
for line in text.splitlines():
line = line.strip()
if not line:
continue
m = _VARIETY_LINE_RE.match(line)
if m:
# Up to first run of digits / spaces — variety name only
name_match = re.match(r"^([A-Za-z][A-Za-z0-9 \-+]*?)\s+\d", line)
name = name_match.group(1).strip() if name_match else m.group(0).strip()
# Trim trailing single tokens that are clearly stats
if name and name not in seen and len(name) <= 40:
seen.add(name)
out.append(name)
return out
# --------------------------------------------------------------------- detail
def fetch_pdf_detail(
http: RateLimitedSession,
pdf_url: str,
filename: str,
) -> TrialPDF | None:
"""Download + parse one trial PDF."""
r = http.get(pdf_url)
if r.status_code == 404:
return None
r.raise_for_status()
try:
with pdfplumber.open(io.BytesIO(r.content)) as pdf:
pages_text = []
for p in pdf.pages:
t = p.extract_text() or ""
pages_text.append(t)
text = "\n\n".join(pages_text).strip()
except Exception as exc: # noqa: BLE001
log.warning("PDF parse failed for %s: %s", pdf_url, exc)
return None
title = ""
if text:
# First non-empty line is usually the title.
for line in text.splitlines():
line = line.strip()
if line:
title = line
break
region = _detect_region(filename) or _detect_region(title or "")
years = _detect_years(title + "\n" + filename)
wheat_class_section = _detect_wheat_class_section(text)
varieties = _detect_varieties(text)
return TrialPDF(
source_key=source_key_for(filename),
source_url=LIST_URL,
pdf_url=pdf_url,
filename=filename,
title=title or None,
year=years[-1] if years else None,
years_covered=years,
region=region,
wheat_class_section=wheat_class_section,
page_text=text,
varieties_found=varieties,
)
# --------------------------------------------------------------------- render
def render_markdown(p: TrialPDF) -> str:
head: list[str] = [
f"# {p.title or p.filename}",
"",
"- **Source:** AgriPro (Syngenta) regional trial PDF",
"- **Vendor:** Syngenta",
"- **Brand:** AgriPro",
"- **Crop:** Wheat",
"- **Data type:** trial",
]
if p.region:
head.append(f"- **Region:** {p.region}")
if p.wheat_class_section:
head.append(f"- **Wheat class:** {p.wheat_class_section}")
if p.year:
head.append(f"- **Year:** {p.year}")
if p.years_covered and len(p.years_covered) > 1:
head.append(f"- **Years covered:** {p.years_covered[0]}{p.years_covered[-1]}")
head.append(f"- **PDF:** {p.pdf_url}")
head.append(f"- **Index page:** {p.source_url}")
if p.varieties_found:
head.append(
f"- **Varieties listed:** {', '.join(p.varieties_found[:30])}"
+ ("" if len(p.varieties_found) > 30 else "")
)
head.append("")
head.append("---")
head.append("")
head.append("## Trial data (verbatim from PDF)")
head.append("")
head.append("```")
head.append(p.page_text)
head.append("```")
return "\n".join(head)
# --------------------------------------------------------------------- write
def write_pdf(prod: TrialPDF, body_md: str) -> None:
CORPUS_DIR.mkdir(parents=True, exist_ok=True)
md_path = CORPUS_DIR / f"{prod.source_key}.md"
json_path = CORPUS_DIR / f"{prod.source_key}.json"
md_path.write_text(body_md, encoding="utf-8")
sidecar = {
"source": "agripro_trials",
"source_key": prod.source_key,
"data_type": "trial",
"vendor": "Syngenta",
"brand": "AgriPro",
"crop": "wheat",
"title": prod.title,
"filename": prod.filename,
"region": prod.region,
"wheat_class_section": prod.wheat_class_section,
"year": prod.year,
"years_covered": prod.years_covered,
"varieties_found": prod.varieties_found,
"pdf_url": prod.pdf_url,
"source_urls": [prod.source_url, prod.pdf_url],
"page_text_chars": len(prod.page_text),
"fetched_at": datetime.now(timezone.utc).isoformat(),
"scraper_version": SCRAPER_VERSION,
}
json_path.write_text(
json.dumps(sidecar, indent=2, ensure_ascii=False) + "\n",
encoding="utf-8",
)
# --------------------------------------------------------------------- pipeline
def process_pdf(
http: RateLimitedSession,
*,
pdf_url: str,
filename: str,
force: bool,
) -> tuple[str, TrialPDF | None]:
sk = source_key_for(filename)
md_path = CORPUS_DIR / f"{sk}.md"
if md_path.exists() and not force:
return "skipped", None
try:
prod = fetch_pdf_detail(http, pdf_url, filename)
except Exception as exc: # noqa: BLE001
log.error("PDF fetch/parse failed for %s: %s", pdf_url, exc)
return "failed", None
if prod is None:
return "missing", None
body = render_markdown(prod)
write_pdf(prod, body)
return "written", prod
def run(*, limit: int | None, force: bool) -> int:
CORPUS_DIR.mkdir(parents=True, exist_ok=True)
http = RateLimitedSession()
targets = discover_pdfs(http)
counts = {"written": 0, "skipped": 0, "missing": 0, "failed": 0}
processed = 0
for pdf_url, filename, _section, _href in targets:
if limit is not None and processed >= limit:
break
processed += 1
status, prod = process_pdf(
http, pdf_url=pdf_url, filename=filename, force=force,
)
counts[status] = counts.get(status, 0) + 1
log.info(
"[%d/%d] %s %s | region=%s year=%s varieties=%d chars=%d",
processed, len(targets),
source_key_for(filename), status,
(prod.region if prod else "-") or "-",
prod.year if prod else "-",
len(prod.varieties_found) if prod else 0,
len(prod.page_text) if prod else 0,
)
log.info(
"done: processed=%d written=%d skipped=%d missing=%d failed=%d (of %d PDFs)",
processed, counts["written"], counts["skipped"],
counts["missing"], counts["failed"], len(targets),
)
return 0 if counts["failed"] == 0 else 1
# --------------------------------------------------------------------- CLI
def _build_argparser() -> argparse.ArgumentParser:
p = argparse.ArgumentParser(
prog="scrape.sources.agripro_trials",
description="Scrape AgriPro regional trial PDFs.",
)
p.add_argument("--limit", type=int, default=None,
help="Stop after processing N PDFs (default: all).")
p.add_argument("--force", action="store_true",
help="Re-fetch even if the markdown file already exists.")
p.add_argument("--log-level", default=os.environ.get("LOG_LEVEL", "INFO"))
return p
def main(argv: list[str] | None = None) -> int:
args = _build_argparser().parse_args(argv)
logging.basicConfig(
level=args.log_level.upper(),
format="%(asctime)s %(levelname)s %(name)s %(message)s",
stream=sys.stderr,
)
return run(limit=args.limit, force=args.force)
if __name__ == "__main__":
sys.exit(main())
+781
View File
@@ -0,0 +1,781 @@
"""Golden Harvest plot-report scraper — cross-vendor yield trials.
This is the FIRST source in the seed-mcp corpus with ``data_type:
"trial"`` rather than the per-variety identity records all other
scrapers emit. Each document is one head-to-head yield trial at a
specific state/year/site, comparing products across brands (NK,
DEKALB, Golden Harvest, sometimes Pioneer/Channel etc. listed as
competitor entries) — i.e. **third-party-feeling cross-vendor data
that Bayer doesn't publish itself**.
Source: ``goldenharvestseeds.com`` — same site as ``golden_harvest``
variety scraper. ``/sitemap-ghs-hybrids.xml`` (already walked for
the variety scraper) lists 8,237 plot reports across:
Year Corn Soy Silage Total
2023 1,832 1,614 173 3,619
2024 1,432 1,277 137 2,846
2025 973 703 96 1,772
Initial scrape: 2024 + 2025 (4,618 reports). 2023 is older data
that's still informative but lower priority. Defer 2023 to a later
backfill pass via ``--include-2023``.
URL shape:
/<crop>/plot-report/<state-abbrev>/<year>/<plot-id>
e.g. /corn/plot-report/al/2023/2374765
Per-report data (server-rendered HTML):
- Cooperator name (h1 area)
- State (full name, e.g. "Alabama")
- Planted date / Harvested date
- Population (seeds/acre), Row Width
- One <table> with columns:
Rank | Brand | Product | Traits | Yield (BU/Acre) | %MST |
Test Weight | Gross Revenue | Entry #
Each row in the results table can be from any seed brand — the
trial is the test, not the catalog. Brand and product are the join
keys back to the per-variety corpus (lookup_variety can pull the
identity record if we have the same brand/product).
Output:
corpus/gh_plot_reports/<source_key>.md LLM-visible body
corpus/gh_plot_reports/<source_key>.json sidecar metadata
source_key convention: ``ghpr-<crop>-<state>-<year>-<plot_id>``
e.g. ``ghpr-corn-al-2023-2374765``.
CLI:
python -m scrape.sources.gh_plot_reports --limit 5
python -m scrape.sources.gh_plot_reports --crop corn --state ia --year 2024
python -m scrape.sources.gh_plot_reports --include-2023 --force
"""
from __future__ import annotations
import argparse
import json
import logging
import os
import random
import re
import sys
import time
from dataclasses import dataclass, field
from datetime import datetime, timezone
from pathlib import Path
from typing import Any
import requests
from bs4 import BeautifulSoup
SCRAPER_VERSION = "0.1.0"
USER_AGENT = "seed-mcp-scraper/0.1 (+https://drawbar.example/contact)"
BASE = "https://www.goldenharvestseeds.com"
SITEMAP_HYBRIDS = f"{BASE}/sitemap-ghs-hybrids.xml"
REPO_ROOT = Path(__file__).resolve().parents[2]
CORPUS_ROOT = Path(os.environ.get("CORPUS_ROOT") or REPO_ROOT / "corpus")
CORPUS_DIR = CORPUS_ROOT / "gh_plot_reports"
REQ_INTERVAL_SEC = 1.0
log = logging.getLogger("scrape.gh_plot_reports")
# State name normalization: URL gives a 2-letter abbrev; sidecar keeps
# both forms so search filters can use either.
STATE_NAMES = {
"al": "Alabama", "ak": "Alaska", "az": "Arizona", "ar": "Arkansas",
"ca": "California", "co": "Colorado", "ct": "Connecticut",
"de": "Delaware", "fl": "Florida", "ga": "Georgia", "hi": "Hawaii",
"id": "Idaho", "il": "Illinois", "in": "Indiana", "ia": "Iowa",
"ks": "Kansas", "ky": "Kentucky", "la": "Louisiana", "me": "Maine",
"md": "Maryland", "ma": "Massachusetts", "mi": "Michigan",
"mn": "Minnesota", "ms": "Mississippi", "mo": "Missouri",
"mt": "Montana", "ne": "Nebraska", "nv": "Nevada", "nh": "New Hampshire",
"nj": "New Jersey", "nm": "New Mexico", "ny": "New York",
"nc": "North Carolina", "nd": "North Dakota", "oh": "Ohio",
"ok": "Oklahoma", "or": "Oregon", "pa": "Pennsylvania",
"ri": "Rhode Island", "sc": "South Carolina", "sd": "South Dakota",
"tn": "Tennessee", "tx": "Texas", "ut": "Utah", "vt": "Vermont",
"va": "Virginia", "wa": "Washington", "wv": "West Virginia",
"wi": "Wisconsin", "wy": "Wyoming",
}
# --------------------------------------------------------------------- HTTP
class RateLimitedSession:
def __init__(self, interval: float = REQ_INTERVAL_SEC) -> None:
self.s = requests.Session()
self.s.headers["User-Agent"] = USER_AGENT
self.interval = interval
self._last = 0.0
def _wait(self) -> None:
delta = time.monotonic() - self._last
if delta < self.interval:
time.sleep(self.interval - delta)
self._last = time.monotonic()
def request(
self,
method: str,
url: str,
*,
max_retries: int = 4,
timeout: float = 30.0,
**kw: Any,
) -> requests.Response:
last_exc: Exception | None = None
for attempt in range(max_retries):
self._wait()
try:
resp = self.s.request(method, url, timeout=timeout, **kw)
except requests.RequestException as exc:
last_exc = exc
backoff = min(30.0, (2 ** attempt) + random.random())
log.warning("network error on %s %s: %s — retry in %.1fs",
method, url, exc, backoff)
time.sleep(backoff)
continue
if resp.status_code == 429 or 500 <= resp.status_code < 600:
ra = resp.headers.get("Retry-After")
backoff = float(ra) if (ra and ra.isdigit()) else min(30.0, (2 ** attempt) + random.random())
log.warning("HTTP %d on %s %s — retry in %.1fs",
resp.status_code, method, url, backoff)
time.sleep(backoff)
continue
return resp
if last_exc:
raise last_exc
return resp # type: ignore[return-value]
def get(self, url: str, **kw: Any) -> requests.Response:
return self.request("GET", url, **kw)
# --------------------------------------------------------------------- model
@dataclass
class TrialResult:
rank: int | None = None
brand: str = ""
product: str = ""
traits: str = ""
# Generic per-column metrics — keyed by the header from the table
# (e.g. "Yield" / "%MST" / "Ton/Acre" / "Milk Per Acre" /
# "Beef Per Ton"). Corn + soy use Yield/MST/Test Weight/Gross
# Revenue; silage uses Ton/Acre + Milk + Beef columns. Storing as
# an open dict keeps the scraper robust across crop types.
metrics: dict[str, float | str | None] = field(default_factory=dict)
entry_num: int | None = None
# Convenience accessors — back-compat for the chunker that looks
# up these specific keys.
@property
def yield_bu_ac(self) -> float | None:
v = self.metrics.get("Yield")
return v if isinstance(v, (int, float)) else None
@property
def mst_pct(self) -> float | None:
v = self.metrics.get("%MST")
return v if isinstance(v, (int, float)) else None
@property
def test_weight(self) -> float | None:
v = self.metrics.get("Test Weight")
return v if isinstance(v, (int, float)) else None
@property
def gross_revenue_dol_ac(self) -> float | None:
v = self.metrics.get("Gross Revenue")
return v if isinstance(v, (int, float)) else None
@property
def primary_metric(self) -> tuple[str, float | None]:
"""The first numeric metric — used as the canonical 'yield'
for ranking in the chunk preamble. Corn/soy: Yield (BU/Ac).
Silage: Ton/Acre."""
for k in ("Yield", "Ton/Acre", "Tons/Acre"):
v = self.metrics.get(k)
if isinstance(v, (int, float)):
return (k, v)
# Fallback to first numeric metric
for k, v in self.metrics.items():
if isinstance(v, (int, float)):
return (k, v)
return ("", None)
@dataclass
class PlotReport:
source_key: str
source_url: str
crop: str # "corn" / "soybeans" / "silage"
state_abbrev: str # "al"
state_name: str # "Alabama"
year: int
plot_id: str
cooperator: str | None = None
planted_date: str | None = None # ISO date
harvested_date: str | None = None # ISO date
population: int | None = None
row_width: int | None = None
results: list[TrialResult] = field(default_factory=list)
# --------------------------------------------------------------------- discovery
_PLOT_URL_RE = re.compile(
r".*?/(?P<crop>corn|soybean|silage)/plot-report/"
r"(?P<state>[a-z]{2})/(?P<year>\d{4})/(?P<plot>\d+)"
)
def discover_plots(
http: RateLimitedSession,
*,
crops: set[str],
states: set[str] | None,
years: set[int],
) -> list[tuple[str, str, str, int, str]]:
"""Walk the hybrids sitemap and return matching plot URLs as
``[(url, crop, state, year, plot_id), ...]`` tuples. ``crop`` is
normalized to the schema's terms (soybean → soybeans)."""
log.info("fetching sitemap %s", SITEMAP_HYBRIDS)
r = http.get(SITEMAP_HYBRIDS)
r.raise_for_status()
entries = re.findall(r"<loc>([^<]+)</loc>", r.text)
log.info("sitemap parsed: %d total locs", len(entries))
out: list[tuple[str, str, str, int, str]] = []
for url in entries:
m = _PLOT_URL_RE.match(url)
if not m:
continue
crop_url = m.group("crop")
# Normalize "soybean" → "soybeans" to match the rest of the corpus.
crop = "soybeans" if crop_url == "soybean" else crop_url
state = m.group("state").lower()
year = int(m.group("year"))
plot = m.group("plot")
if crops and crop not in crops:
continue
if states and state not in states:
continue
if years and year not in years:
continue
out.append((url, crop, state, year, plot))
log.info("after filters: %d plot URLs", len(out))
return out
# --------------------------------------------------------------------- helpers
def source_key_for(crop: str, state: str, year: int, plot_id: str) -> str:
return f"ghpr-{crop}-{state}-{year}-{plot_id}"
def _parse_date_mdy(s: str) -> str | None:
"""``04/06/23`` → ``2023-04-06``. Two-digit years are assumed to
be 20xx (sane for current-century trial data)."""
s = (s or "").strip()
m = re.match(r"^(\d{1,2})/(\d{1,2})/(\d{2,4})$", s)
if not m:
return None
mo, dy, yr = m.group(1), m.group(2), m.group(3)
if len(yr) == 2:
yr = "20" + yr
try:
return f"{int(yr):04d}-{int(mo):02d}-{int(dy):02d}"
except ValueError:
return None
def _parse_int(s: str | None) -> int | None:
if not s:
return None
s = re.sub(r"[,$]", "", str(s).strip())
try:
return int(s)
except ValueError:
return None
def _parse_float(s: str | None) -> float | None:
if not s:
return None
s = re.sub(r"[,$]", "", str(s).strip())
try:
return float(s)
except ValueError:
return None
# --------------------------------------------------------------------- detail
def fetch_plot_detail(
http: RateLimitedSession,
url: str,
crop: str,
state: str,
year: int,
plot_id: str,
) -> PlotReport | None:
"""Fetch one plot-report page and parse it."""
r = http.get(url)
if r.status_code == 404:
return None
r.raise_for_status()
soup = BeautifulSoup(r.text, "html.parser")
prod = PlotReport(
source_key=source_key_for(crop, state, year, plot_id),
source_url=url,
crop=crop,
state_abbrev=state,
state_name=STATE_NAMES.get(state, state.upper()),
year=year,
plot_id=plot_id,
)
# Pull metadata from the header area. The page renders cooperator
# name + state + key fields as text following the h1.
h1 = soup.find("h1")
if h1:
# Walk up to a parent that includes the metadata strip
container = h1.parent
while container is not None and not container.find("table"):
parent = container.parent
if parent is None:
break
container = parent
if container:
text = container.get_text(" | ", strip=True)
# Cooperator is usually the segment right after the H1.
# Pattern: "Corn Plot Results | <Name> | <State> | Planted: | ..."
parts = [p.strip() for p in text.split("|") if p.strip()]
# Drop the title segment
if parts and parts[0].lower().startswith(("corn plot", "soybean plot", "silage plot")):
parts = parts[1:]
if parts:
# First segment that doesn't match a state name is the cooperator
cand = parts[0]
if cand and cand != prod.state_name and not cand.endswith(":"):
prod.cooperator = cand
# Walk the page text for known labeled fields.
page_text = soup.get_text(" ", strip=True)
m = re.search(r"Planted:\s*(\d{1,2}/\d{1,2}/\d{2,4})", page_text)
if m:
prod.planted_date = _parse_date_mdy(m.group(1))
m = re.search(r"Harvested:\s*(\d{1,2}/\d{1,2}/\d{2,4})", page_text)
if m:
prod.harvested_date = _parse_date_mdy(m.group(1))
m = re.search(r"Population:\s*([\d,]+)", page_text)
if m:
prod.population = _parse_int(m.group(1))
m = re.search(r"Row Width:\s*(\d+)", page_text)
if m:
prod.row_width = _parse_int(m.group(1))
# Parse the results table. The HTML uses ONE merged cell for
# "Brand Product Traits" (despite the header containing all
# three labels); subsequent cells are Yield, %MST, Test Weight,
# Gross Revenue, Entry #. We split the merged cell using a
# known-brand prefix match.
table = soup.find("table")
if not table:
return prod
rows = table.find_all("tr")
if not rows:
return prod
header_cells = [c.get_text(" ", strip=True) for c in rows[0].find_all(["th", "td"])]
def col_idx(*names: str) -> int | None:
for n in names:
for i, h in enumerate(header_cells):
if n.lower() in h.lower():
return i
return None
# Position of the merged identity cell, by header containing "Brand".
i_identity = col_idx("Brand")
i_rank = col_idx("Rank")
i_entry = col_idx("Entry")
# Build a list of (header, index) for the OTHER columns (the
# metric columns). Skips Rank, Brand-merge-cell, and Entry #.
metric_columns: list[tuple[str, int]] = []
skip_idx = {i_identity, i_rank, i_entry}
for i, h in enumerate(header_cells):
if i in skip_idx:
continue
h_clean = h.strip()
if h_clean:
metric_columns.append((h_clean, i))
for row in rows[1:]:
cells = [c.get_text(" ", strip=True) for c in row.find_all(["td", "th"])]
if len(cells) < 2:
continue
def cell(i: int | None) -> str:
return cells[i] if i is not None and 0 <= i < len(cells) else ""
identity = cell(i_identity).strip()
if any(k in identity.lower() for k in ("plot average", "trial average", "average")):
continue
brand, product, traits = _split_identity(identity)
# Collect every metric column verbatim. Numeric where parseable,
# else preserve the raw string (e.g. "ns" for not-significant).
metrics: dict[str, float | str | None] = {}
for h, idx in metric_columns:
raw = cell(idx).strip()
if not raw or raw == "-":
metrics[h] = None
else:
f = _parse_float(raw)
metrics[h] = f if f is not None else raw
result = TrialResult(
rank=_parse_int(cell(i_rank)),
brand=brand,
product=product,
traits=traits,
metrics=metrics,
entry_num=_parse_int(cell(i_entry)),
)
has_data = result.brand or result.product or any(
v is not None for v in metrics.values()
)
if has_data:
prod.results.append(result)
return prod
# Known seed brands that can appear in plot-report identity cells.
# Sorted longest-first so multi-word brands match before sub-strings.
_BRAND_NAMES = (
"Golden Harvest", "WestBred", "AgriPro", "DEKALB", "Pioneer",
"Channel", "Asgrow", "NK", "Becks", "Beck's", "Brevant",
"Stine", "Renk", "Wyffels", "LG Seeds", "Croplan", "FS",
"Local Choice", "Mycogen", "AgriGold", "Hoegemeyer",
)
_BRAND_RE = re.compile(
r"^(?:" + "|".join(re.escape(b) for b in _BRAND_NAMES) + r")\b",
re.I,
)
def _split_identity(identity: str) -> tuple[str, str, str]:
"""Split a plot-report identity cell into ``(brand, product, traits)``.
The HTML emits one merged cell like "NK NK1748-3110 Agrisure ®"
or "Golden Harvest G16Q82-DV DuracadeViptera™" or just
"DEKALB DKC65-20". We:
1. Match the brand against a known-brand list at the start.
2. The token immediately after the brand is the product.
3. Anything remaining is the trait stack (free text).
"""
if not identity:
return "", "", ""
s = identity.strip()
m = _BRAND_RE.match(s)
if not m:
# Unknown brand prefix — best-effort: first token is brand,
# second is product, rest is traits.
parts = s.split(maxsplit=2)
if len(parts) == 1:
return parts[0], "", ""
if len(parts) == 2:
return parts[0], parts[1], ""
return parts[0], parts[1], parts[2]
brand = m.group(0)
rest = s[len(brand):].strip()
parts = rest.split(maxsplit=1)
product = parts[0] if parts else ""
traits = parts[1].strip() if len(parts) > 1 else ""
return brand, product, traits
# --------------------------------------------------------------------- render
def render_markdown(p: PlotReport) -> str:
crop_label = {
"corn": "Corn", "soybeans": "Soybean", "silage": "Silage",
}.get(p.crop, p.crop.title())
head: list[str] = [
f"# {crop_label} yield trial — {p.state_name}, {p.year}",
"",
f"- **Source:** Golden Harvest plot report (cross-vendor head-to-head)",
f"- **Crop:** {crop_label}",
f"- **State:** {p.state_name} ({p.state_abbrev.upper()})",
f"- **Year:** {p.year}",
f"- **Plot ID:** {p.plot_id}",
]
if p.cooperator:
head.append(f"- **Cooperator:** {p.cooperator}")
if p.planted_date:
head.append(f"- **Planted:** {p.planted_date}")
if p.harvested_date:
head.append(f"- **Harvested:** {p.harvested_date}")
if p.population:
head.append(f"- **Population:** {p.population:,} seeds/acre")
if p.row_width:
head.append(f"- **Row width:** {p.row_width}\"")
head.append(f"- **URL:** {p.source_url}")
head.append("")
head.append("---")
head.append("")
sections: list[str] = []
if p.results:
# Discover all metric columns present across results, in
# first-seen order. This keeps corn (Yield/MST/...) and silage
# (Ton/Acre/Milk/Beef) using their own header sets.
metric_keys: list[str] = []
seen_keys: set[str] = set()
for r in p.results:
for k in r.metrics.keys():
if k not in seen_keys:
seen_keys.add(k)
metric_keys.append(k)
sections.append("## Results (top-down by rank)")
sections.append("")
header_cells = ["Rank", "Brand", "Product", "Traits"] + metric_keys
sections.append("| " + " | ".join(header_cells) + " |")
sections.append("|" + "|".join(["---"] * len(header_cells)) + "|")
for r in p.results:
row = [
str(r.rank) if r.rank is not None else "-",
r.brand or "-",
r.product or "-",
r.traits or "-",
]
for k in metric_keys:
v = r.metrics.get(k)
if v is None:
row.append("-")
elif isinstance(v, (int, float)):
# Dollar columns rendered with $ prefix
if "Revenue" in k or "$" in k:
row.append(f"${v:.2f}")
else:
row.append(str(v))
else:
row.append(str(v))
sections.append("| " + " | ".join(row) + " |")
sections.append("")
# Compact text summary for embedder signal — uses the primary
# metric (Yield for corn/soy, Ton/Acre for silage).
top = p.results[: min(5, len(p.results))]
if top:
primary_label, _ = top[0].primary_metric
if primary_label:
summary = ", ".join(
f"{r.product or '?'} ({r.brand or '?'}) {r.primary_metric[1]}"
for r in top
if r.primary_metric[1] is not None
)
if summary:
sections.append(f"Top {len(top)} by {primary_label}: {summary}.")
sections.append("")
return "\n".join(head) + "\n".join(sections)
# --------------------------------------------------------------------- write
def write_plot(prod: PlotReport, body_md: str) -> None:
CORPUS_DIR.mkdir(parents=True, exist_ok=True)
md_path = CORPUS_DIR / f"{prod.source_key}.md"
json_path = CORPUS_DIR / f"{prod.source_key}.json"
md_path.write_text(body_md, encoding="utf-8")
sidecar = {
"source": "gh_plot_reports",
"source_key": prod.source_key,
"data_type": "trial",
"vendor": "Syngenta", # Golden Harvest publishes the trial
"brand": "Golden Harvest",
"crop": prod.crop,
"state": prod.state_name,
"state_abbrev": prod.state_abbrev,
"year": prod.year,
"plot_id": prod.plot_id,
"cooperator": prod.cooperator,
"planted_date": prod.planted_date,
"harvested_date": prod.harvested_date,
"population_seeds_per_acre": prod.population,
"row_width_in": prod.row_width,
"results": [
{
"rank": r.rank,
"brand": r.brand,
"product": r.product,
"traits": r.traits,
# All per-column metrics verbatim. Corn/soy: Yield,
# %MST, Test Weight, Gross Revenue. Silage: Ton/Acre,
# Milk Per Acre, Milk Per Ton, Beef Per Acre, Beef Per
# Ton. (Plus any other column the source publishes.)
"metrics": r.metrics,
"entry_num": r.entry_num,
}
for r in prod.results
],
"n_results": len(prod.results),
"source_urls": [prod.source_url],
"fetched_at": datetime.now(timezone.utc).isoformat(),
"scraper_version": SCRAPER_VERSION,
}
json_path.write_text(
json.dumps(sidecar, indent=2, ensure_ascii=False) + "\n",
encoding="utf-8",
)
# --------------------------------------------------------------------- pipeline
def process_plot(
http: RateLimitedSession,
*,
url: str,
crop: str,
state: str,
year: int,
plot_id: str,
force: bool,
) -> tuple[str, PlotReport | None]:
sk = source_key_for(crop, state, year, plot_id)
md_path = CORPUS_DIR / f"{sk}.md"
if md_path.exists() and not force:
return "skipped", None
try:
prod = fetch_plot_detail(http, url, crop, state, year, plot_id)
except Exception as exc: # noqa: BLE001
log.error("detail fetch failed for %s: %s", url, exc)
return "failed", None
if prod is None:
return "missing", None
body = render_markdown(prod)
write_plot(prod, body)
return "written", prod
def run(
*,
limit: int | None,
force: bool,
only_crop: str | None,
only_state: str | None,
only_year: int | None,
include_2023: bool,
) -> int:
CORPUS_DIR.mkdir(parents=True, exist_ok=True)
http = RateLimitedSession()
crops = {only_crop} if only_crop else {"corn", "soybeans", "silage"}
states = {only_state} if only_state else None
if only_year:
years = {only_year}
elif include_2023:
years = {2023, 2024, 2025}
else:
years = {2024, 2025}
targets = discover_plots(http, crops=crops, states=states, years=years)
counts = {"written": 0, "skipped": 0, "missing": 0, "failed": 0}
processed = 0
for url, crop, state, year, plot_id in targets:
if limit is not None and processed >= limit:
break
processed += 1
status, prod = process_plot(
http, url=url, crop=crop, state=state, year=year,
plot_id=plot_id, force=force,
)
counts[status] = counts.get(status, 0) + 1
if prod is not None and processed <= 5 or processed % 100 == 0:
log.info(
"[%d/%s] %s %s | results=%d coop=%s",
processed, str(limit) if limit else len(targets),
source_key_for(crop, state, year, plot_id), status,
len(prod.results) if prod else 0,
(prod.cooperator if prod else "-") or "-",
)
log.info(
"done: processed=%d written=%d skipped=%d missing=%d failed=%d (of %d candidates)",
processed, counts["written"], counts["skipped"],
counts["missing"], counts["failed"], len(targets),
)
return 0 if counts["failed"] == 0 else 1
# --------------------------------------------------------------------- CLI
def _build_argparser() -> argparse.ArgumentParser:
p = argparse.ArgumentParser(
prog="scrape.sources.gh_plot_reports",
description="Scrape Golden Harvest cross-vendor plot reports (yield trials).",
)
p.add_argument("--limit", type=int, default=None,
help="Stop after processing N plots (default: all).")
p.add_argument("--force", action="store_true",
help="Re-fetch even if the markdown file already exists.")
p.add_argument("--crop", default=None,
choices=("corn", "soybeans", "silage"),
help="Limit to one crop.")
p.add_argument("--state", default=None,
help="Limit to one state (2-letter abbrev: ia, il, ne, ...).")
p.add_argument("--year", type=int, default=None, choices=(2023, 2024, 2025),
help="Limit to one year.")
p.add_argument("--include-2023", action="store_true",
help="Include 2023 plot reports (default: 2024-2025 only).")
p.add_argument("--log-level", default=os.environ.get("LOG_LEVEL", "INFO"))
return p
def main(argv: list[str] | None = None) -> int:
args = _build_argparser().parse_args(argv)
logging.basicConfig(
level=args.log_level.upper(),
format="%(asctime)s %(levelname)s %(name)s %(message)s",
stream=sys.stderr,
)
return run(
limit=args.limit,
force=args.force,
only_crop=args.crop,
only_state=args.state.lower() if args.state else None,
only_year=args.year,
include_2023=args.include_2023,
)
if __name__ == "__main__":
sys.exit(main())