justin 0e625553e5 gh_plot_reports corpus (4,299 plots) + concurrency + 4-GPU pool
CORPUS: 4,299 GH plot reports added (3,797 written + 502 from the
earlier slow run). A further 319 sitemap-listed URLs 404'd as
discontinued, which accounts for all 4,618 candidates. Combined
with the prior 760 varieties + 14 AgriPro trials = 5,073 total
chunks now indexed.

scrape/sources/gh_plot_reports.py — concurrency speedup:
- 4 worker threads (ThreadPoolExecutor), each with its own
  requests.Session for connection-pool efficiency.
- Shared class-level rate limiter (0.25 sec between ANY two
  requests across all threads). Throughput is capped at 4 req/sec,
  a modest load for a public site.
- Diagnosis vs original 1 req/sec: GH showed no sign of rate
  limiting (zero 429s, zero retries). The 1 sec self-throttle was
  just too conservative. Bench:
    1 worker  / 1.0 sec throttle:  ~0.4 plots/sec (190 min ETA)
    4 workers / 0.25 sec throttle: ~3 plots/sec  (~25 min actual)
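
The ETA figures in the bench follow from dividing the candidate count by the measured rate. A minimal sketch, assuming the 4,618 candidates (the 2024 + 2025 reports listed in the sitemap) and the approximate rates quoted above; `eta_minutes` is a hypothetical helper, not part of the scraper:

```python
def eta_minutes(n_docs: int, docs_per_sec: float) -> float:
    """Wall-clock estimate in minutes at a steady processing rate."""
    if docs_per_sec <= 0:
        raise ValueError("docs_per_sec must be positive")
    return n_docs / docs_per_sec / 60.0


CANDIDATES = 4_618  # 2024 + 2025 plot reports in the sitemap

slow = eta_minutes(CANDIDATES, 0.4)  # 1 worker, 1.0 sec throttle
fast = eta_minutes(CANDIDATES, 3.0)  # 4 workers, 0.25 sec throttle
print(f"slow: {slow:.0f} min, fast: {fast:.0f} min")
```

Both results land close to the quoted numbers (190 min ETA, ~25 min actual); the rates themselves are rounded, so exact agreement is not expected.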

rag/chunk.py — chunk size cap for nomic-embed-text's 2048-token
context window:
- Empirically tested: failure threshold is ~5,250 chars on
  numeric-heavy trial chunks (chars/token ratio 2.4 vs 3.5 for
  prose). Cap at 4,500 chars to be safely under at worst-case
  2.2 chars/token.
- Applied to BOTH variety and trial chunks. Marked truncated
  chunks with metadata.embed_truncated = True; FULL text stays
  in the on-disk .md for get_page to return verbatim.
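
The cap-and-flag behaviour described above can be sketched as follows. This is an illustration only, not the code in rag/chunk.py: the `Chunk` class and `cap_for_embedding` function are hypothetical names, and only the 4,500-char cap, the 2,048-token window, the 2.2 chars/token worst case, and the `embed_truncated` flag come from this commit message.

```python
from dataclasses import dataclass, field

MAX_EMBED_CHARS = 4_500           # cap quoted in the commit message
CONTEXT_TOKENS = 2_048            # nomic-embed-text context window
WORST_CASE_CHARS_PER_TOKEN = 2.2  # worst-case ratio quoted above


@dataclass
class Chunk:
    """Hypothetical container: text sent to the embedder plus metadata."""
    text: str
    metadata: dict = field(default_factory=dict)


def cap_for_embedding(full_text: str, max_chars: int = MAX_EMBED_CHARS) -> Chunk:
    """Truncate only the embedded text; the caller keeps the full text on disk."""
    truncated = len(full_text) > max_chars
    return Chunk(full_text[:max_chars], {"embed_truncated": truncated})


# At the worst-case ratio, a capped chunk still fits the context window.
worst_case_tokens = MAX_EMBED_CHARS / WORST_CASE_CHARS_PER_TOKEN

long_chunk = cap_for_embedding("9" * 6_000)
short_chunk = cap_for_embedding("short trial summary")
```

At 2.2 chars/token the 4,500-char cap works out to about 2,045 tokens, just under the 2,048-token window.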

.gitea/workflows/{refresh,image-only}.yml — OLLAMA_URL pool
restructured for the 4 GPU-pinned endpoints. Bench (50-chunk
batches on nomic-embed-text):

    .0.125:11434  (RTX 40-series)  242 embeds/sec  ← weight ×4
    .0.2:11436    (GPU-pinned)     108 embeds/sec  ← weight ×2
    .0.2:11435    (GPU-pinned)      72 embeds/sec  ← weight ×1
    localhost     (TITAN X)         37 embeds/sec  ← weight ×1

Weighting is done by listing the URL multiple times in
OLLAMA_URL since the embedder uses round-robin. .0.2:11434 is
explicitly EXCLUDED — it isn't pinned to a specific GPU.
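
Weighting by repetition can be sketched like this. The host names are placeholders (the real addresses are abbreviated above), the comma-separated format of OLLAMA_URL is an assumption, and `build_pool` is a hypothetical helper rather than the embedder's actual code:

```python
from collections import Counter
from itertools import cycle


def build_pool(weights: dict[str, int]) -> list[str]:
    """Repeat each endpoint `weight` times so plain round-robin favours it."""
    pool: list[str] = []
    for url, weight in weights.items():
        pool.extend([url] * weight)
    return pool


# Placeholder endpoints; weights mirror the x4 / x2 / x1 / x1 table above.
weights = {
    "http://gpu-a.example:11434": 4,
    "http://gpu-b.example:11436": 2,
    "http://gpu-c.example:11435": 1,
    "http://gpu-d.example:11434": 1,
}

pool = build_pool(weights)
ollama_url = ",".join(pool)  # assumed format: one comma-separated string

# One full round-robin cycle visits each endpoint in proportion to its weight.
rr = cycle(pool)
one_cycle = Counter(next(rr) for _ in range(len(pool)))
```

One side effect of listing repeats consecutively is that the fastest endpoint receives its requests in a burst; interleaving the repeats would spread them out, at the cost of a less readable variable.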

Combined index rebuild for 5,073 chunks now finishes in ~3 min
(was 19+ on the single-endpoint pool).

Smoke tests:
✓ list_versions: 5,073 docs across 6 sources, 2 vendors, 6
  brands, 4 crops (corn 2711, soy 2016, silage 223, wheat 123).
✓ search_trials({crop=corn, state=IA, year=2024}): 3 IA 2024
  corn trials surfaced.
✓ search_trials("Phytophthora resistance soybean trial"): NK
  NK43-W1XFS top-1 in LA 2024 trial (cross-vendor result).
✓ search_trials("AP Iliad Idaho wheat"): AgriPro Washington/N
  Idaho 2025 trial surfaced.
✓ search_trials(product=DKC65-95): 3 corn trials containing
  that hybrid in IL/IA 2024.
✓ search_trials(product=NK1701): 3 corn trials in AR/MS 2024.
✓ Product filter correctly returns EMPTY for products that
  aren't in the corpus (DKC65-20 is a 2023 product; 2023 plots
  deferred). Anti-hallucination contract preserved.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-25 16:46:35 -04:00

848 lines
30 KiB
Python

"""Golden Harvest plot-report scraper: cross-vendor yield trials.

This is the FIRST source in the seed-mcp corpus with ``data_type:
"trial"`` rather than the per-variety identity records all other
scrapers emit. Each document is one head-to-head yield trial at a
specific state/year/site, comparing products across brands (NK,
DEKALB, Golden Harvest, sometimes Pioneer/Channel etc. listed as
competitor entries), i.e. **third-party-feeling cross-vendor data
that Bayer doesn't publish itself**.

Source: ``goldenharvestseeds.com``, the same site as the
``golden_harvest`` variety scraper. ``/sitemap-ghs-hybrids.xml``
(already walked for the variety scraper) lists 8,237 plot reports
across:

    Year    Corn    Soy     Silage  Total
    2023    1,832   1,614   173     3,619
    2024    1,432   1,277   137     2,846
    2025      973     703    96     1,772

Initial scrape: 2024 + 2025 (4,618 reports). 2023 is older data
that's still informative but lower priority. Defer 2023 to a later
backfill pass via ``--include-2023``.

URL shape:
    /<crop>/plot-report/<state-abbrev>/<year>/<plot-id>
    e.g. /corn/plot-report/al/2023/2374765

Per-report data (server-rendered HTML):
  - Cooperator name (h1 area)
  - State (full name, e.g. "Alabama")
  - Planted date / Harvested date
  - Population (seeds/acre), Row Width
  - One <table> with columns:
      Rank | Brand | Product | Traits | Yield (BU/Acre) | %MST |
      Test Weight | Gross Revenue | Entry #

Each row in the results table can be from any seed brand: the
trial is the test, not the catalog. Brand and product are the join
keys back to the per-variety corpus (lookup_variety can pull the
identity record if we have the same brand/product).

Output:
    corpus/gh_plot_reports/<source_key>.md     LLM-visible body
    corpus/gh_plot_reports/<source_key>.json   sidecar metadata

source_key convention: ``ghpr-<crop>-<state>-<year>-<plot_id>``
e.g. ``ghpr-corn-al-2023-2374765``.

CLI:
    python -m scrape.sources.gh_plot_reports --limit 5
    python -m scrape.sources.gh_plot_reports --crop corn --state ia --year 2024
    python -m scrape.sources.gh_plot_reports --include-2023 --force
"""
from __future__ import annotations
import argparse
import json
import logging
import os
import random
import re
import sys
import threading
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from dataclasses import dataclass, field
from datetime import datetime, timezone
from pathlib import Path
from typing import Any
import requests
from bs4 import BeautifulSoup
SCRAPER_VERSION = "0.1.0"
USER_AGENT = "seed-mcp-scraper/0.1 (+https://drawbar.example/contact)"
BASE = "https://www.goldenharvestseeds.com"
SITEMAP_HYBRIDS = f"{BASE}/sitemap-ghs-hybrids.xml"
REPO_ROOT = Path(__file__).resolve().parents[2]
CORPUS_ROOT = Path(os.environ.get("CORPUS_ROOT") or REPO_ROOT / "corpus")
CORPUS_DIR = CORPUS_ROOT / "gh_plot_reports"
# 0.25 sec between any two requests in the WHOLE process (shared
# across worker threads), i.e. a ceiling of 4 req/sec. The site's
# pages take ~1.5 sec to serve on their own, so with 4 concurrent
# workers the observed rate is ~3 plots/sec: still polite, and the
# GH plot-reports scrape of ~4,600 docs finishes in ~25 min instead
# of ~3 hours.
REQ_INTERVAL_SEC = 0.25
DEFAULT_WORKERS = 4
log = logging.getLogger("scrape.gh_plot_reports")
# State name normalization: URL gives a 2-letter abbrev; sidecar keeps
# both forms so search filters can use either.
STATE_NAMES = {
    "al": "Alabama", "ak": "Alaska", "az": "Arizona", "ar": "Arkansas",
    "ca": "California", "co": "Colorado", "ct": "Connecticut",
    "de": "Delaware", "fl": "Florida", "ga": "Georgia", "hi": "Hawaii",
    "id": "Idaho", "il": "Illinois", "in": "Indiana", "ia": "Iowa",
    "ks": "Kansas", "ky": "Kentucky", "la": "Louisiana", "me": "Maine",
    "md": "Maryland", "ma": "Massachusetts", "mi": "Michigan",
    "mn": "Minnesota", "ms": "Mississippi", "mo": "Missouri",
    "mt": "Montana", "ne": "Nebraska", "nv": "Nevada", "nh": "New Hampshire",
    "nj": "New Jersey", "nm": "New Mexico", "ny": "New York",
    "nc": "North Carolina", "nd": "North Dakota", "oh": "Ohio",
    "ok": "Oklahoma", "or": "Oregon", "pa": "Pennsylvania",
    "ri": "Rhode Island", "sc": "South Carolina", "sd": "South Dakota",
    "tn": "Tennessee", "tx": "Texas", "ut": "Utah", "vt": "Vermont",
    "va": "Virginia", "wa": "Washington", "wv": "West Virginia",
    "wi": "Wisconsin", "wy": "Wyoming",
}
# --------------------------------------------------------------------- HTTP


class RateLimitedSession:
    """Thread-safe rate-limited requests.Session wrapper.

    The lock + last-request timestamp are class-level so multiple
    sessions (one per worker thread) share the same global interval.
    Each thread has its own requests.Session for connection-pool
    efficiency, but they all coordinate on the request-cadence
    floor.
    """

    _lock = threading.Lock()
    _last_global: float = 0.0
    _global_interval: float = REQ_INTERVAL_SEC

    def __init__(self, interval: float = REQ_INTERVAL_SEC) -> None:
        self.s = requests.Session()
        self.s.headers["User-Agent"] = USER_AGENT
        # Keep the class-level interval at the most restrictive
        # (largest) value requested so far, so a 1.0s caller can't be
        # loosened by a later 0.25s caller starting another scraper
        # in the same process.
        with RateLimitedSession._lock:
            if interval > RateLimitedSession._global_interval:
                RateLimitedSession._global_interval = interval

    def _wait(self) -> None:
        # Sleeping while holding the lock is deliberate: it serializes
        # request starts across all worker threads.
        with RateLimitedSession._lock:
            delta = time.monotonic() - RateLimitedSession._last_global
            if delta < RateLimitedSession._global_interval:
                time.sleep(RateLimitedSession._global_interval - delta)
            RateLimitedSession._last_global = time.monotonic()

    def request(
        self,
        method: str,
        url: str,
        *,
        max_retries: int = 4,
        timeout: float = 30.0,
        **kw: Any,
    ) -> requests.Response:
        last_exc: Exception | None = None
        resp: requests.Response | None = None
        for attempt in range(max_retries):
            self._wait()
            try:
                resp = self.s.request(method, url, timeout=timeout, **kw)
            except requests.RequestException as exc:
                # Forget any earlier response so the final outcome
                # reflects the most recent attempt.
                last_exc, resp = exc, None
                backoff = min(30.0, (2 ** attempt) + random.random())
                log.warning("network error on %s %s: %s — retry in %.1fs",
                            method, url, exc, backoff)
                time.sleep(backoff)
                continue
            last_exc = None
            if resp.status_code == 429 or 500 <= resp.status_code < 600:
                ra = resp.headers.get("Retry-After")
                if ra and ra.isdigit():
                    backoff = float(ra)
                else:
                    backoff = min(30.0, (2 ** attempt) + random.random())
                log.warning("HTTP %d on %s %s — retry in %.1fs",
                            resp.status_code, method, url, backoff)
                time.sleep(backoff)
                continue
            return resp
        # Retries exhausted: hand back the last 429/5xx response, or
        # re-raise the last network error if the final attempt failed.
        if resp is not None:
            return resp
        if last_exc is not None:
            raise last_exc
        raise ValueError("max_retries must be at least 1")

    def get(self, url: str, **kw: Any) -> requests.Response:
        return self.request("GET", url, **kw)
# --------------------------------------------------------------------- model


@dataclass
class TrialResult:
    rank: int | None = None
    brand: str = ""
    product: str = ""
    traits: str = ""
    # Generic per-column metrics, keyed by the header from the table
    # (e.g. "Yield" / "%MST" / "Ton/Acre" / "Milk Per Acre" /
    # "Beef Per Ton"). Corn + soy use Yield/MST/Test Weight/Gross
    # Revenue; silage uses Ton/Acre + Milk + Beef columns. Storing as
    # an open dict keeps the scraper robust across crop types.
    metrics: dict[str, float | str | None] = field(default_factory=dict)
    entry_num: int | None = None

    # Convenience accessors: back-compat for the chunker that looks
    # up these specific keys.
    @property
    def yield_bu_ac(self) -> float | None:
        v = self.metrics.get("Yield")
        return v if isinstance(v, (int, float)) else None

    @property
    def mst_pct(self) -> float | None:
        v = self.metrics.get("%MST")
        return v if isinstance(v, (int, float)) else None

    @property
    def test_weight(self) -> float | None:
        v = self.metrics.get("Test Weight")
        return v if isinstance(v, (int, float)) else None

    @property
    def gross_revenue_dol_ac(self) -> float | None:
        v = self.metrics.get("Gross Revenue")
        return v if isinstance(v, (int, float)) else None

    @property
    def primary_metric(self) -> tuple[str, float | None]:
        """The first numeric metric, used as the canonical 'yield'
        for ranking in the chunk preamble. Corn/soy: Yield (BU/Ac).
        Silage: Ton/Acre."""
        for k in ("Yield", "Ton/Acre", "Tons/Acre"):
            v = self.metrics.get(k)
            if isinstance(v, (int, float)):
                return (k, v)
        # Fallback to first numeric metric
        for k, v in self.metrics.items():
            if isinstance(v, (int, float)):
                return (k, v)
        return ("", None)


@dataclass
class PlotReport:
    source_key: str
    source_url: str
    crop: str  # "corn" / "soybeans" / "silage"
    state_abbrev: str  # "al"
    state_name: str  # "Alabama"
    year: int
    plot_id: str
    cooperator: str | None = None
    planted_date: str | None = None  # ISO date
    harvested_date: str | None = None  # ISO date
    population: int | None = None
    row_width: int | None = None
    results: list[TrialResult] = field(default_factory=list)
# --------------------------------------------------------------------- discovery

_PLOT_URL_RE = re.compile(
    r".*?/(?P<crop>corn|soybean|silage)/plot-report/"
    r"(?P<state>[a-z]{2})/(?P<year>\d{4})/(?P<plot>\d+)"
)


def discover_plots(
    http: RateLimitedSession,
    *,
    crops: set[str],
    states: set[str] | None,
    years: set[int],
) -> list[tuple[str, str, str, int, str]]:
    """Walk the hybrids sitemap and return matching plot URLs as
    ``[(url, crop, state, year, plot_id), ...]`` tuples. ``crop`` is
    normalized to the schema's terms (soybean → soybeans)."""
    log.info("fetching sitemap %s", SITEMAP_HYBRIDS)
    r = http.get(SITEMAP_HYBRIDS)
    r.raise_for_status()
    entries = re.findall(r"<loc>([^<]+)</loc>", r.text)
    log.info("sitemap parsed: %d total locs", len(entries))
    out: list[tuple[str, str, str, int, str]] = []
    for url in entries:
        m = _PLOT_URL_RE.match(url)
        if not m:
            continue
        crop_url = m.group("crop")
        # Normalize "soybean" → "soybeans" to match the rest of the corpus.
        crop = "soybeans" if crop_url == "soybean" else crop_url
        state = m.group("state").lower()
        year = int(m.group("year"))
        plot = m.group("plot")
        if crops and crop not in crops:
            continue
        if states and state not in states:
            continue
        if years and year not in years:
            continue
        out.append((url, crop, state, year, plot))
    log.info("after filters: %d plot URLs", len(out))
    return out
# --------------------------------------------------------------------- helpers


def source_key_for(crop: str, state: str, year: int, plot_id: str) -> str:
    return f"ghpr-{crop}-{state}-{year}-{plot_id}"


def _parse_date_mdy(s: str) -> str | None:
    """``04/06/23`` → ``2023-04-06``. Two-digit years are assumed to
    be 20xx (sane for current-century trial data)."""
    s = (s or "").strip()
    m = re.match(r"^(\d{1,2})/(\d{1,2})/(\d{2,4})$", s)
    if not m:
        return None
    mo, dy, yr = m.group(1), m.group(2), m.group(3)
    if len(yr) == 2:
        yr = "20" + yr
    try:
        return f"{int(yr):04d}-{int(mo):02d}-{int(dy):02d}"
    except ValueError:
        return None


def _parse_int(s: str | None) -> int | None:
    if not s:
        return None
    s = re.sub(r"[,$]", "", str(s).strip())
    try:
        return int(s)
    except ValueError:
        return None


def _parse_float(s: str | None) -> float | None:
    if not s:
        return None
    s = re.sub(r"[,$]", "", str(s).strip())
    try:
        return float(s)
    except ValueError:
        return None
# --------------------------------------------------------------------- detail


def fetch_plot_detail(
    http: RateLimitedSession,
    url: str,
    crop: str,
    state: str,
    year: int,
    plot_id: str,
) -> PlotReport | None:
    """Fetch one plot-report page and parse it.

    Returns ``None`` when the page 404s (discontinued report)."""
    r = http.get(url)
    if r.status_code == 404:
        return None
    r.raise_for_status()
    soup = BeautifulSoup(r.text, "html.parser")
    prod = PlotReport(
        source_key=source_key_for(crop, state, year, plot_id),
        source_url=url,
        crop=crop,
        state_abbrev=state,
        state_name=STATE_NAMES.get(state, state.upper()),
        year=year,
        plot_id=plot_id,
    )

    # Pull metadata from the header area. The page renders cooperator
    # name + state + key fields as text following the h1.
    h1 = soup.find("h1")
    if h1:
        # Walk up to a parent that includes the metadata strip
        container = h1.parent
        while container is not None and not container.find("table"):
            parent = container.parent
            if parent is None:
                break
            container = parent
        if container:
            text = container.get_text(" | ", strip=True)
            # Cooperator is usually the segment right after the H1.
            # Pattern: "Corn Plot Results | <Name> | <State> | Planted: | ..."
            parts = [p.strip() for p in text.split("|") if p.strip()]
            # Drop the title segment
            if parts and parts[0].lower().startswith(("corn plot", "soybean plot", "silage plot")):
                parts = parts[1:]
            if parts:
                # The first remaining segment is the cooperator, unless
                # it is the state name or a field label ("Planted:").
                cand = parts[0]
                if cand and cand != prod.state_name and not cand.endswith(":"):
                    prod.cooperator = cand

    # Walk the page text for known labeled fields.
    page_text = soup.get_text(" ", strip=True)
    m = re.search(r"Planted:\s*(\d{1,2}/\d{1,2}/\d{2,4})", page_text)
    if m:
        prod.planted_date = _parse_date_mdy(m.group(1))
    m = re.search(r"Harvested:\s*(\d{1,2}/\d{1,2}/\d{2,4})", page_text)
    if m:
        prod.harvested_date = _parse_date_mdy(m.group(1))
    m = re.search(r"Population:\s*([\d,]+)", page_text)
    if m:
        prod.population = _parse_int(m.group(1))
    m = re.search(r"Row Width:\s*(\d+)", page_text)
    if m:
        prod.row_width = _parse_int(m.group(1))

    # Parse the results table. The HTML uses ONE merged cell for
    # "Brand Product Traits" (despite the header containing all
    # three labels); subsequent cells are Yield, %MST, Test Weight,
    # Gross Revenue, Entry #. We split the merged cell using a
    # known-brand prefix match.
    table = soup.find("table")
    if not table:
        return prod
    rows = table.find_all("tr")
    if not rows:
        return prod
    header_cells = [c.get_text(" ", strip=True) for c in rows[0].find_all(["th", "td"])]

    def col_idx(*names: str) -> int | None:
        for n in names:
            for i, h in enumerate(header_cells):
                if n.lower() in h.lower():
                    return i
        return None

    # Position of the merged identity cell, by header containing "Brand".
    i_identity = col_idx("Brand")
    i_rank = col_idx("Rank")
    i_entry = col_idx("Entry")

    # Build a list of (header, index) for the OTHER columns (the
    # metric columns). Skips Rank, Brand-merge-cell, and Entry #.
    metric_columns: list[tuple[str, int]] = []
    skip_idx = {i_identity, i_rank, i_entry}
    for i, h in enumerate(header_cells):
        if i in skip_idx:
            continue
        h_clean = h.strip()
        if h_clean:
            metric_columns.append((h_clean, i))

    for row in rows[1:]:
        cells = [c.get_text(" ", strip=True) for c in row.find_all(["td", "th"])]
        if len(cells) < 2:
            continue

        def cell(i: int | None) -> str:
            return cells[i] if i is not None and 0 <= i < len(cells) else ""

        identity = cell(i_identity).strip()
        # Skip summary rows ("Plot Average", "Trial Average", ...).
        if "average" in identity.lower():
            continue
        brand, product, traits = _split_identity(identity)

        # Collect every metric column verbatim. Numeric where parseable,
        # else preserve the raw string (e.g. "ns" for not-significant).
        metrics: dict[str, float | str | None] = {}
        for h, idx in metric_columns:
            raw = cell(idx).strip()
            if not raw or raw == "-":
                metrics[h] = None
            else:
                f = _parse_float(raw)
                metrics[h] = f if f is not None else raw

        result = TrialResult(
            rank=_parse_int(cell(i_rank)),
            brand=brand,
            product=product,
            traits=traits,
            metrics=metrics,
            entry_num=_parse_int(cell(i_entry)),
        )
        has_data = result.brand or result.product or any(
            v is not None for v in metrics.values()
        )
        if has_data:
            prod.results.append(result)
    return prod
# Known seed brands that can appear in plot-report identity cells.
# The regex below sorts them longest-first so a longer brand always
# wins over a shorter one that happens to be a prefix of it.
_BRAND_NAMES = (
    "Golden Harvest", "WestBred", "AgriPro", "DEKALB", "Pioneer",
    "Channel", "Asgrow", "NK", "Becks", "Beck's", "Brevant",
    "Stine", "Renk", "Wyffels", "LG Seeds", "Croplan", "FS",
    "Local Choice", "Mycogen", "AgriGold", "Hoegemeyer",
)
_BRAND_RE = re.compile(
    r"^(?:"
    + "|".join(re.escape(b) for b in sorted(_BRAND_NAMES, key=len, reverse=True))
    + r")\b",
    re.I,
)


def _split_identity(identity: str) -> tuple[str, str, str]:
    """Split a plot-report identity cell into ``(brand, product, traits)``.

    The HTML emits one merged cell like "NK NK1748-3110 Agrisure ®"
    or "Golden Harvest G16Q82-DV DuracadeViptera™" or just
    "DEKALB DKC65-20". We:

    1. Match the brand against a known-brand list at the start.
    2. The token immediately after the brand is the product.
    3. Anything remaining is the trait stack (free text).
    """
    if not identity:
        return "", "", ""
    s = identity.strip()
    m = _BRAND_RE.match(s)
    if not m:
        # Unknown brand prefix: best effort is first token as brand,
        # second as product, rest as traits.
        parts = s.split(maxsplit=2)
        if len(parts) == 1:
            return parts[0], "", ""
        if len(parts) == 2:
            return parts[0], parts[1], ""
        return parts[0], parts[1], parts[2]
    brand = m.group(0)
    rest = s[len(brand):].strip()
    parts = rest.split(maxsplit=1)
    product = parts[0] if parts else ""
    traits = parts[1].strip() if len(parts) > 1 else ""
    return brand, product, traits
# --------------------------------------------------------------------- render


def render_markdown(p: PlotReport) -> str:
    crop_label = {
        "corn": "Corn", "soybeans": "Soybean", "silage": "Silage",
    }.get(p.crop, p.crop.title())
    head: list[str] = [
        f"# {crop_label} yield trial — {p.state_name}, {p.year}",
        "",
        "- **Source:** Golden Harvest plot report (cross-vendor head-to-head)",
        f"- **Crop:** {crop_label}",
        f"- **State:** {p.state_name} ({p.state_abbrev.upper()})",
        f"- **Year:** {p.year}",
        f"- **Plot ID:** {p.plot_id}",
    ]
    if p.cooperator:
        head.append(f"- **Cooperator:** {p.cooperator}")
    if p.planted_date:
        head.append(f"- **Planted:** {p.planted_date}")
    if p.harvested_date:
        head.append(f"- **Harvested:** {p.harvested_date}")
    if p.population:
        head.append(f"- **Population:** {p.population:,} seeds/acre")
    if p.row_width:
        head.append(f"- **Row width:** {p.row_width}\"")
    head.append(f"- **URL:** {p.source_url}")
    head.append("")
    head.append("---")
    head.append("")

    sections: list[str] = []
    if p.results:
        # Discover all metric columns present across results, in
        # first-seen order. This keeps corn (Yield/MST/...) and silage
        # (Ton/Acre/Milk/Beef) using their own header sets.
        metric_keys: list[str] = []
        seen_keys: set[str] = set()
        for r in p.results:
            for k in r.metrics.keys():
                if k not in seen_keys:
                    seen_keys.add(k)
                    metric_keys.append(k)

        sections.append("## Results (top-down by rank)")
        sections.append("")
        header_cells = ["Rank", "Brand", "Product", "Traits"] + metric_keys
        sections.append("| " + " | ".join(header_cells) + " |")
        sections.append("|" + "|".join(["---"] * len(header_cells)) + "|")
        for r in p.results:
            row = [
                str(r.rank) if r.rank is not None else "-",
                r.brand or "-",
                r.product or "-",
                r.traits or "-",
            ]
            for k in metric_keys:
                v = r.metrics.get(k)
                if v is None:
                    row.append("-")
                elif isinstance(v, (int, float)):
                    # Dollar columns rendered with $ prefix
                    if "Revenue" in k or "$" in k:
                        row.append(f"${v:.2f}")
                    else:
                        row.append(str(v))
                else:
                    row.append(str(v))
            sections.append("| " + " | ".join(row) + " |")
        sections.append("")

        # Compact text summary for embedder signal; uses the primary
        # metric (Yield for corn/soy, Ton/Acre for silage). Entries
        # without a numeric primary metric are left out, so the
        # stated count matches what is actually listed.
        top = [r for r in p.results[:5] if r.primary_metric[1] is not None]
        if top:
            primary_label = top[0].primary_metric[0]
            summary = ", ".join(
                f"{r.product or '?'} ({r.brand or '?'}) {r.primary_metric[1]}"
                for r in top
            )
            sections.append(f"Top {len(top)} by {primary_label}: {summary}.")
            sections.append("")
    # Join once so the blank line after the "---" rule is preserved
    # before the first section heading.
    return "\n".join(head + sections)
# --------------------------------------------------------------------- write


def write_plot(prod: PlotReport, body_md: str) -> None:
    CORPUS_DIR.mkdir(parents=True, exist_ok=True)
    md_path = CORPUS_DIR / f"{prod.source_key}.md"
    json_path = CORPUS_DIR / f"{prod.source_key}.json"
    md_path.write_text(body_md, encoding="utf-8")
    sidecar = {
        "source": "gh_plot_reports",
        "source_key": prod.source_key,
        "data_type": "trial",
        "vendor": "Syngenta",  # Golden Harvest publishes the trial
        "brand": "Golden Harvest",
        "crop": prod.crop,
        "state": prod.state_name,
        "state_abbrev": prod.state_abbrev,
        "year": prod.year,
        "plot_id": prod.plot_id,
        "cooperator": prod.cooperator,
        "planted_date": prod.planted_date,
        "harvested_date": prod.harvested_date,
        "population_seeds_per_acre": prod.population,
        "row_width_in": prod.row_width,
        "results": [
            {
                "rank": r.rank,
                "brand": r.brand,
                "product": r.product,
                "traits": r.traits,
                # All per-column metrics verbatim. Corn/soy: Yield,
                # %MST, Test Weight, Gross Revenue. Silage: Ton/Acre,
                # Milk Per Acre, Milk Per Ton, Beef Per Acre, Beef Per
                # Ton. (Plus any other column the source publishes.)
                "metrics": r.metrics,
                "entry_num": r.entry_num,
            }
            for r in prod.results
        ],
        "n_results": len(prod.results),
        "source_urls": [prod.source_url],
        "fetched_at": datetime.now(timezone.utc).isoformat(),
        "scraper_version": SCRAPER_VERSION,
    }
    json_path.write_text(
        json.dumps(sidecar, indent=2, ensure_ascii=False) + "\n",
        encoding="utf-8",
    )
# --------------------------------------------------------------------- pipeline


def process_plot(
    http: RateLimitedSession,
    *,
    url: str,
    crop: str,
    state: str,
    year: int,
    plot_id: str,
    force: bool,
) -> tuple[str, PlotReport | None]:
    sk = source_key_for(crop, state, year, plot_id)
    md_path = CORPUS_DIR / f"{sk}.md"
    if md_path.exists() and not force:
        return "skipped", None
    try:
        prod = fetch_plot_detail(http, url, crop, state, year, plot_id)
    except Exception as exc:  # noqa: BLE001
        log.error("detail fetch failed for %s: %s", url, exc)
        return "failed", None
    if prod is None:
        return "missing", None
    body = render_markdown(prod)
    write_plot(prod, body)
    return "written", prod


def run(
    *,
    limit: int | None,
    force: bool,
    only_crop: str | None,
    only_state: str | None,
    only_year: int | None,
    include_2023: bool,
    workers: int = DEFAULT_WORKERS,
) -> int:
    CORPUS_DIR.mkdir(parents=True, exist_ok=True)
    crops = {only_crop} if only_crop else {"corn", "soybeans", "silage"}
    states = {only_state} if only_state else None
    if only_year:
        years = {only_year}
    elif include_2023:
        years = {2023, 2024, 2025}
    else:
        years = {2024, 2025}

    # One shared session for sitemap walk (single-threaded).
    discovery_http = RateLimitedSession()
    targets = discover_plots(discovery_http, crops=crops, states=states, years=years)
    if limit is not None:
        targets = targets[:limit]

    counts = {"written": 0, "skipped": 0, "missing": 0, "failed": 0}
    counts_lock = threading.Lock()
    processed_counter = {"n": 0}
    total = len(targets)

    # One requests.Session per worker thread. They share the
    # class-level rate limiter (REQ_INTERVAL_SEC between any two
    # requests across all threads), but each has its own HTTP
    # connection pool.
    thread_local = threading.local()

    def _session() -> RateLimitedSession:
        s = getattr(thread_local, "session", None)
        if s is None:
            s = RateLimitedSession()
            thread_local.session = s
        return s

    def _worker(target: tuple[str, str, str, int, str]) -> tuple[str, Any]:
        url, crop, state, year, plot_id = target
        return process_plot(
            _session(), url=url, crop=crop, state=state, year=year,
            plot_id=plot_id, force=force,
        )

    log.info("dispatching %d plots across %d workers (shared rate limiter %.2f sec/req)",
             total, workers, REQ_INTERVAL_SEC)
    with ThreadPoolExecutor(max_workers=workers) as pool:
        futures = {pool.submit(_worker, t): t for t in targets}
        for fut in as_completed(futures):
            target = futures[fut]
            url, crop, state, year, plot_id = target
            try:
                status, prod = fut.result()
            except Exception as exc:  # noqa: BLE001
                log.error("worker failed for %s: %s", url, exc)
                status, prod = "failed", None
            with counts_lock:
                counts[status] = counts.get(status, 0) + 1
                processed_counter["n"] += 1
                n = processed_counter["n"]
            if (prod is not None and n <= 5) or n % 100 == 0 or status == "failed":
                log.info(
                    "[%d/%d] %s %s | results=%d coop=%s",
                    n, total,
                    source_key_for(crop, state, year, plot_id), status,
                    len(prod.results) if prod else 0,
                    (prod.cooperator if prod else "-") or "-",
                )
    log.info(
        "done: processed=%d written=%d skipped=%d missing=%d failed=%d (of %d candidates)",
        processed_counter["n"], counts["written"], counts["skipped"],
        counts["missing"], counts["failed"], total,
    )
    return 0 if counts["failed"] == 0 else 1
# --------------------------------------------------------------------- CLI


def _build_argparser() -> argparse.ArgumentParser:
    p = argparse.ArgumentParser(
        prog="scrape.sources.gh_plot_reports",
        description="Scrape Golden Harvest cross-vendor plot reports (yield trials).",
    )
    p.add_argument("--limit", type=int, default=None,
                   help="Stop after processing N plots (default: all).")
    p.add_argument("--force", action="store_true",
                   help="Re-fetch even if the markdown file already exists.")
    p.add_argument("--crop", default=None,
                   choices=("corn", "soybeans", "silage"),
                   help="Limit to one crop.")
    p.add_argument("--state", default=None,
                   help="Limit to one state (2-letter abbrev: ia, il, ne, ...).")
    p.add_argument("--year", type=int, default=None, choices=(2023, 2024, 2025),
                   help="Limit to one year.")
    p.add_argument("--include-2023", action="store_true",
                   help="Include 2023 plot reports (default: 2024-2025 only).")
    p.add_argument("--workers", type=int, default=DEFAULT_WORKERS,
                   help=f"Concurrent worker threads (default {DEFAULT_WORKERS}, "
                        f"all share a global {REQ_INTERVAL_SEC}-sec rate limiter).")
    p.add_argument("--log-level", default=os.environ.get("LOG_LEVEL", "INFO"))
    return p


def main(argv: list[str] | None = None) -> int:
    args = _build_argparser().parse_args(argv)
    logging.basicConfig(
        level=args.log_level.upper(),
        format="%(asctime)s %(levelname)s %(name)s %(message)s",
        stream=sys.stderr,
    )
    return run(
        limit=args.limit,
        force=args.force,
        only_crop=args.crop,
        only_state=args.state.lower() if args.state else None,
        only_year=args.year,
        include_2023=args.include_2023,
        workers=args.workers,
    )


if __name__ == "__main__":
    sys.exit(main())