"""Golden Harvest plot-report scraper — cross-vendor yield trials. This is the FIRST source in the seed-mcp corpus with ``data_type: "trial"`` rather than the per-variety identity records all other scrapers emit. Each document is one head-to-head yield trial at a specific state/year/site, comparing products across brands (NK, DEKALB, Golden Harvest, sometimes Pioneer/Channel etc. listed as competitor entries) — i.e. **third-party-feeling cross-vendor data that Bayer doesn't publish itself**. Source: ``goldenharvestseeds.com`` — same site as ``golden_harvest`` variety scraper. ``/sitemap-ghs-hybrids.xml`` (already walked for the variety scraper) lists 8,237 plot reports across: Year Corn Soy Silage Total 2023 1,832 1,614 173 3,619 2024 1,432 1,277 137 2,846 2025 973 703 96 1,772 Initial scrape: 2024 + 2025 (4,618 reports). 2023 is older data that's still informative but lower priority. Defer 2023 to a later backfill pass via ``--include-2023``. URL shape: //plot-report/// e.g. /corn/plot-report/al/2023/2374765 Per-report data (server-rendered HTML): - Cooperator name (h1 area) - State (full name, e.g. "Alabama") - Planted date / Harvested date - Population (seeds/acre), Row Width - One with columns: Rank | Brand | Product | Traits | Yield (BU/Acre) | %MST | Test Weight | Gross Revenue | Entry # Each row in the results table can be from any seed brand — the trial is the test, not the catalog. Brand and product are the join keys back to the per-variety corpus (lookup_variety can pull the identity record if we have the same brand/product). Output: corpus/gh_plot_reports/.md LLM-visible body corpus/gh_plot_reports/.json sidecar metadata source_key convention: ``ghpr----`` e.g. ``ghpr-corn-al-2023-2374765``. CLI: python -m scrape.sources.gh_plot_reports --limit 5 python -m scrape.sources.gh_plot_reports --crop corn --state ia --year 2024 python -m scrape.sources.gh_plot_reports --include-2023 --force """ from __future__ import annotations import argparse import json import logging import os import random import re import sys import threading import time from concurrent.futures import ThreadPoolExecutor, as_completed from dataclasses import dataclass, field from datetime import datetime, timezone from pathlib import Path from typing import Any import requests from bs4 import BeautifulSoup SCRAPER_VERSION = "0.1.0" USER_AGENT = "seed-mcp-scraper/0.1 (+https://drawbar.example/contact)" BASE = "https://www.goldenharvestseeds.com" SITEMAP_HYBRIDS = f"{BASE}/sitemap-ghs-hybrids.xml" REPO_ROOT = Path(__file__).resolve().parents[2] CORPUS_ROOT = Path(os.environ.get("CORPUS_ROOT") or REPO_ROOT / "corpus") CORPUS_DIR = CORPUS_ROOT / "gh_plot_reports" # 0.25 sec between any two requests in the WHOLE process (shared # across worker threads). The site's pages take ~1.5 sec to serve on # their own; combined with concurrent workers this gives ~4 req/sec # net — polite by any normal standard, and the GH plot-reports # scrape on 4,600 docs finishes in ~20 min instead of ~3 hours. REQ_INTERVAL_SEC = 0.25 DEFAULT_WORKERS = 4 log = logging.getLogger("scrape.gh_plot_reports") # State name normalization: URL gives a 2-letter abbrev; sidecar keeps # both forms so search filters can use either. STATE_NAMES = { "al": "Alabama", "ak": "Alaska", "az": "Arizona", "ar": "Arkansas", "ca": "California", "co": "Colorado", "ct": "Connecticut", "de": "Delaware", "fl": "Florida", "ga": "Georgia", "hi": "Hawaii", "id": "Idaho", "il": "Illinois", "in": "Indiana", "ia": "Iowa", "ks": "Kansas", "ky": "Kentucky", "la": "Louisiana", "me": "Maine", "md": "Maryland", "ma": "Massachusetts", "mi": "Michigan", "mn": "Minnesota", "ms": "Mississippi", "mo": "Missouri", "mt": "Montana", "ne": "Nebraska", "nv": "Nevada", "nh": "New Hampshire", "nj": "New Jersey", "nm": "New Mexico", "ny": "New York", "nc": "North Carolina", "nd": "North Dakota", "oh": "Ohio", "ok": "Oklahoma", "or": "Oregon", "pa": "Pennsylvania", "ri": "Rhode Island", "sc": "South Carolina", "sd": "South Dakota", "tn": "Tennessee", "tx": "Texas", "ut": "Utah", "vt": "Vermont", "va": "Virginia", "wa": "Washington", "wv": "West Virginia", "wi": "Wisconsin", "wy": "Wyoming", } # --------------------------------------------------------------------- HTTP class RateLimitedSession: """Thread-safe rate-limited requests.Session wrapper. The lock + last-request timestamp are class-level so multiple sessions (one per worker thread) share the same global interval. Each thread has its own requests.Session for connection-pool efficiency, but they all coordinate on the request-cadence floor. """ _lock = threading.Lock() _last_global: float = 0.0 _global_interval: float = REQ_INTERVAL_SEC def __init__(self, interval: float = REQ_INTERVAL_SEC) -> None: self.s = requests.Session() self.s.headers["User-Agent"] = USER_AGENT # Set the class-level interval to the most-restrictive caller # (so a 0.25s caller can't be overridden by a later 1.0s # caller starting another scraper in the same process). with RateLimitedSession._lock: if interval > RateLimitedSession._global_interval: RateLimitedSession._global_interval = interval def _wait(self) -> None: with RateLimitedSession._lock: delta = time.monotonic() - RateLimitedSession._last_global if delta < RateLimitedSession._global_interval: time.sleep(RateLimitedSession._global_interval - delta) RateLimitedSession._last_global = time.monotonic() def request( self, method: str, url: str, *, max_retries: int = 4, timeout: float = 30.0, **kw: Any, ) -> requests.Response: last_exc: Exception | None = None for attempt in range(max_retries): self._wait() try: resp = self.s.request(method, url, timeout=timeout, **kw) except requests.RequestException as exc: last_exc = exc backoff = min(30.0, (2 ** attempt) + random.random()) log.warning("network error on %s %s: %s — retry in %.1fs", method, url, exc, backoff) time.sleep(backoff) continue if resp.status_code == 429 or 500 <= resp.status_code < 600: ra = resp.headers.get("Retry-After") backoff = float(ra) if (ra and ra.isdigit()) else min(30.0, (2 ** attempt) + random.random()) log.warning("HTTP %d on %s %s — retry in %.1fs", resp.status_code, method, url, backoff) time.sleep(backoff) continue return resp if last_exc: raise last_exc return resp # type: ignore[return-value] def get(self, url: str, **kw: Any) -> requests.Response: return self.request("GET", url, **kw) # --------------------------------------------------------------------- model @dataclass class TrialResult: rank: int | None = None brand: str = "" product: str = "" traits: str = "" # Generic per-column metrics — keyed by the header from the table # (e.g. "Yield" / "%MST" / "Ton/Acre" / "Milk Per Acre" / # "Beef Per Ton"). Corn + soy use Yield/MST/Test Weight/Gross # Revenue; silage uses Ton/Acre + Milk + Beef columns. Storing as # an open dict keeps the scraper robust across crop types. metrics: dict[str, float | str | None] = field(default_factory=dict) entry_num: int | None = None # Convenience accessors — back-compat for the chunker that looks # up these specific keys. @property def yield_bu_ac(self) -> float | None: v = self.metrics.get("Yield") return v if isinstance(v, (int, float)) else None @property def mst_pct(self) -> float | None: v = self.metrics.get("%MST") return v if isinstance(v, (int, float)) else None @property def test_weight(self) -> float | None: v = self.metrics.get("Test Weight") return v if isinstance(v, (int, float)) else None @property def gross_revenue_dol_ac(self) -> float | None: v = self.metrics.get("Gross Revenue") return v if isinstance(v, (int, float)) else None @property def primary_metric(self) -> tuple[str, float | None]: """The first numeric metric — used as the canonical 'yield' for ranking in the chunk preamble. Corn/soy: Yield (BU/Ac). Silage: Ton/Acre.""" for k in ("Yield", "Ton/Acre", "Tons/Acre"): v = self.metrics.get(k) if isinstance(v, (int, float)): return (k, v) # Fallback to first numeric metric for k, v in self.metrics.items(): if isinstance(v, (int, float)): return (k, v) return ("", None) @dataclass class PlotReport: source_key: str source_url: str crop: str # "corn" / "soybeans" / "silage" state_abbrev: str # "al" state_name: str # "Alabama" year: int plot_id: str cooperator: str | None = None planted_date: str | None = None # ISO date harvested_date: str | None = None # ISO date population: int | None = None row_width: int | None = None results: list[TrialResult] = field(default_factory=list) # --------------------------------------------------------------------- discovery _PLOT_URL_RE = re.compile( r".*?/(?Pcorn|soybean|silage)/plot-report/" r"(?P[a-z]{2})/(?P\d{4})/(?P\d+)" ) def discover_plots( http: RateLimitedSession, *, crops: set[str], states: set[str] | None, years: set[int], ) -> list[tuple[str, str, str, int, str]]: """Walk the hybrids sitemap and return matching plot URLs as ``[(url, crop, state, year, plot_id), ...]`` tuples. ``crop`` is normalized to the schema's terms (soybean → soybeans).""" log.info("fetching sitemap %s", SITEMAP_HYBRIDS) r = http.get(SITEMAP_HYBRIDS) r.raise_for_status() entries = re.findall(r"([^<]+)", r.text) log.info("sitemap parsed: %d total locs", len(entries)) out: list[tuple[str, str, str, int, str]] = [] for url in entries: m = _PLOT_URL_RE.match(url) if not m: continue crop_url = m.group("crop") # Normalize "soybean" → "soybeans" to match the rest of the corpus. crop = "soybeans" if crop_url == "soybean" else crop_url state = m.group("state").lower() year = int(m.group("year")) plot = m.group("plot") if crops and crop not in crops: continue if states and state not in states: continue if years and year not in years: continue out.append((url, crop, state, year, plot)) log.info("after filters: %d plot URLs", len(out)) return out # --------------------------------------------------------------------- helpers def source_key_for(crop: str, state: str, year: int, plot_id: str) -> str: return f"ghpr-{crop}-{state}-{year}-{plot_id}" def _parse_date_mdy(s: str) -> str | None: """``04/06/23`` → ``2023-04-06``. Two-digit years are assumed to be 20xx (sane for current-century trial data).""" s = (s or "").strip() m = re.match(r"^(\d{1,2})/(\d{1,2})/(\d{2,4})$", s) if not m: return None mo, dy, yr = m.group(1), m.group(2), m.group(3) if len(yr) == 2: yr = "20" + yr try: return f"{int(yr):04d}-{int(mo):02d}-{int(dy):02d}" except ValueError: return None def _parse_int(s: str | None) -> int | None: if not s: return None s = re.sub(r"[,$]", "", str(s).strip()) try: return int(s) except ValueError: return None def _parse_float(s: str | None) -> float | None: if not s: return None s = re.sub(r"[,$]", "", str(s).strip()) try: return float(s) except ValueError: return None # --------------------------------------------------------------------- detail def fetch_plot_detail( http: RateLimitedSession, url: str, crop: str, state: str, year: int, plot_id: str, ) -> PlotReport | None: """Fetch one plot-report page and parse it.""" r = http.get(url) if r.status_code == 404: return None r.raise_for_status() soup = BeautifulSoup(r.text, "html.parser") prod = PlotReport( source_key=source_key_for(crop, state, year, plot_id), source_url=url, crop=crop, state_abbrev=state, state_name=STATE_NAMES.get(state, state.upper()), year=year, plot_id=plot_id, ) # Pull metadata from the header area. The page renders cooperator # name + state + key fields as text following the h1. h1 = soup.find("h1") if h1: # Walk up to a parent that includes the metadata strip container = h1.parent while container is not None and not container.find("table"): parent = container.parent if parent is None: break container = parent if container: text = container.get_text(" | ", strip=True) # Cooperator is usually the segment right after the H1. # Pattern: "Corn Plot Results | | | Planted: | ..." parts = [p.strip() for p in text.split("|") if p.strip()] # Drop the title segment if parts and parts[0].lower().startswith(("corn plot", "soybean plot", "silage plot")): parts = parts[1:] if parts: # First segment that doesn't match a state name is the cooperator cand = parts[0] if cand and cand != prod.state_name and not cand.endswith(":"): prod.cooperator = cand # Walk the page text for known labeled fields. page_text = soup.get_text(" ", strip=True) m = re.search(r"Planted:\s*(\d{1,2}/\d{1,2}/\d{2,4})", page_text) if m: prod.planted_date = _parse_date_mdy(m.group(1)) m = re.search(r"Harvested:\s*(\d{1,2}/\d{1,2}/\d{2,4})", page_text) if m: prod.harvested_date = _parse_date_mdy(m.group(1)) m = re.search(r"Population:\s*([\d,]+)", page_text) if m: prod.population = _parse_int(m.group(1)) m = re.search(r"Row Width:\s*(\d+)", page_text) if m: prod.row_width = _parse_int(m.group(1)) # Parse the results table. The HTML uses ONE merged cell for # "Brand Product Traits" (despite the header containing all # three labels); subsequent cells are Yield, %MST, Test Weight, # Gross Revenue, Entry #. We split the merged cell using a # known-brand prefix match. table = soup.find("table") if not table: return prod rows = table.find_all("tr") if not rows: return prod header_cells = [c.get_text(" ", strip=True) for c in rows[0].find_all(["th", "td"])] def col_idx(*names: str) -> int | None: for n in names: for i, h in enumerate(header_cells): if n.lower() in h.lower(): return i return None # Position of the merged identity cell, by header containing "Brand". i_identity = col_idx("Brand") i_rank = col_idx("Rank") i_entry = col_idx("Entry") # Build a list of (header, index) for the OTHER columns (the # metric columns). Skips Rank, Brand-merge-cell, and Entry #. metric_columns: list[tuple[str, int]] = [] skip_idx = {i_identity, i_rank, i_entry} for i, h in enumerate(header_cells): if i in skip_idx: continue h_clean = h.strip() if h_clean: metric_columns.append((h_clean, i)) for row in rows[1:]: cells = [c.get_text(" ", strip=True) for c in row.find_all(["td", "th"])] if len(cells) < 2: continue def cell(i: int | None) -> str: return cells[i] if i is not None and 0 <= i < len(cells) else "" identity = cell(i_identity).strip() if any(k in identity.lower() for k in ("plot average", "trial average", "average")): continue brand, product, traits = _split_identity(identity) # Collect every metric column verbatim. Numeric where parseable, # else preserve the raw string (e.g. "ns" for not-significant). metrics: dict[str, float | str | None] = {} for h, idx in metric_columns: raw = cell(idx).strip() if not raw or raw == "-": metrics[h] = None else: f = _parse_float(raw) metrics[h] = f if f is not None else raw result = TrialResult( rank=_parse_int(cell(i_rank)), brand=brand, product=product, traits=traits, metrics=metrics, entry_num=_parse_int(cell(i_entry)), ) has_data = result.brand or result.product or any( v is not None for v in metrics.values() ) if has_data: prod.results.append(result) return prod # Known seed brands that can appear in plot-report identity cells. # Sorted longest-first so multi-word brands match before sub-strings. _BRAND_NAMES = ( "Golden Harvest", "WestBred", "AgriPro", "DEKALB", "Pioneer", "Channel", "Asgrow", "NK", "Becks", "Beck's", "Brevant", "Stine", "Renk", "Wyffels", "LG Seeds", "Croplan", "FS", "Local Choice", "Mycogen", "AgriGold", "Hoegemeyer", ) _BRAND_RE = re.compile( r"^(?:" + "|".join(re.escape(b) for b in _BRAND_NAMES) + r")\b", re.I, ) def _split_identity(identity: str) -> tuple[str, str, str]: """Split a plot-report identity cell into ``(brand, product, traits)``. The HTML emits one merged cell like "NK NK1748-3110 Agrisure ®" or "Golden Harvest G16Q82-DV DuracadeViptera™" or just "DEKALB DKC65-20". We: 1. Match the brand against a known-brand list at the start. 2. The token immediately after the brand is the product. 3. Anything remaining is the trait stack (free text). """ if not identity: return "", "", "" s = identity.strip() m = _BRAND_RE.match(s) if not m: # Unknown brand prefix — best-effort: first token is brand, # second is product, rest is traits. parts = s.split(maxsplit=2) if len(parts) == 1: return parts[0], "", "" if len(parts) == 2: return parts[0], parts[1], "" return parts[0], parts[1], parts[2] brand = m.group(0) rest = s[len(brand):].strip() parts = rest.split(maxsplit=1) product = parts[0] if parts else "" traits = parts[1].strip() if len(parts) > 1 else "" return brand, product, traits # --------------------------------------------------------------------- render def render_markdown(p: PlotReport) -> str: crop_label = { "corn": "Corn", "soybeans": "Soybean", "silage": "Silage", }.get(p.crop, p.crop.title()) head: list[str] = [ f"# {crop_label} yield trial — {p.state_name}, {p.year}", "", f"- **Source:** Golden Harvest plot report (cross-vendor head-to-head)", f"- **Crop:** {crop_label}", f"- **State:** {p.state_name} ({p.state_abbrev.upper()})", f"- **Year:** {p.year}", f"- **Plot ID:** {p.plot_id}", ] if p.cooperator: head.append(f"- **Cooperator:** {p.cooperator}") if p.planted_date: head.append(f"- **Planted:** {p.planted_date}") if p.harvested_date: head.append(f"- **Harvested:** {p.harvested_date}") if p.population: head.append(f"- **Population:** {p.population:,} seeds/acre") if p.row_width: head.append(f"- **Row width:** {p.row_width}\"") head.append(f"- **URL:** {p.source_url}") head.append("") head.append("---") head.append("") sections: list[str] = [] if p.results: # Discover all metric columns present across results, in # first-seen order. This keeps corn (Yield/MST/...) and silage # (Ton/Acre/Milk/Beef) using their own header sets. metric_keys: list[str] = [] seen_keys: set[str] = set() for r in p.results: for k in r.metrics.keys(): if k not in seen_keys: seen_keys.add(k) metric_keys.append(k) sections.append("## Results (top-down by rank)") sections.append("") header_cells = ["Rank", "Brand", "Product", "Traits"] + metric_keys sections.append("| " + " | ".join(header_cells) + " |") sections.append("|" + "|".join(["---"] * len(header_cells)) + "|") for r in p.results: row = [ str(r.rank) if r.rank is not None else "-", r.brand or "-", r.product or "-", r.traits or "-", ] for k in metric_keys: v = r.metrics.get(k) if v is None: row.append("-") elif isinstance(v, (int, float)): # Dollar columns rendered with $ prefix if "Revenue" in k or "$" in k: row.append(f"${v:.2f}") else: row.append(str(v)) else: row.append(str(v)) sections.append("| " + " | ".join(row) + " |") sections.append("") # Compact text summary for embedder signal — uses the primary # metric (Yield for corn/soy, Ton/Acre for silage). top = p.results[: min(5, len(p.results))] if top: primary_label, _ = top[0].primary_metric if primary_label: summary = ", ".join( f"{r.product or '?'} ({r.brand or '?'}) {r.primary_metric[1]}" for r in top if r.primary_metric[1] is not None ) if summary: sections.append(f"Top {len(top)} by {primary_label}: {summary}.") sections.append("") return "\n".join(head) + "\n".join(sections) # --------------------------------------------------------------------- write def write_plot(prod: PlotReport, body_md: str) -> None: CORPUS_DIR.mkdir(parents=True, exist_ok=True) md_path = CORPUS_DIR / f"{prod.source_key}.md" json_path = CORPUS_DIR / f"{prod.source_key}.json" md_path.write_text(body_md, encoding="utf-8") sidecar = { "source": "gh_plot_reports", "source_key": prod.source_key, "data_type": "trial", "vendor": "Syngenta", # Golden Harvest publishes the trial "brand": "Golden Harvest", "crop": prod.crop, "state": prod.state_name, "state_abbrev": prod.state_abbrev, "year": prod.year, "plot_id": prod.plot_id, "cooperator": prod.cooperator, "planted_date": prod.planted_date, "harvested_date": prod.harvested_date, "population_seeds_per_acre": prod.population, "row_width_in": prod.row_width, "results": [ { "rank": r.rank, "brand": r.brand, "product": r.product, "traits": r.traits, # All per-column metrics verbatim. Corn/soy: Yield, # %MST, Test Weight, Gross Revenue. Silage: Ton/Acre, # Milk Per Acre, Milk Per Ton, Beef Per Acre, Beef Per # Ton. (Plus any other column the source publishes.) "metrics": r.metrics, "entry_num": r.entry_num, } for r in prod.results ], "n_results": len(prod.results), "source_urls": [prod.source_url], "fetched_at": datetime.now(timezone.utc).isoformat(), "scraper_version": SCRAPER_VERSION, } json_path.write_text( json.dumps(sidecar, indent=2, ensure_ascii=False) + "\n", encoding="utf-8", ) # --------------------------------------------------------------------- pipeline def process_plot( http: RateLimitedSession, *, url: str, crop: str, state: str, year: int, plot_id: str, force: bool, ) -> tuple[str, PlotReport | None]: sk = source_key_for(crop, state, year, plot_id) md_path = CORPUS_DIR / f"{sk}.md" if md_path.exists() and not force: return "skipped", None try: prod = fetch_plot_detail(http, url, crop, state, year, plot_id) except Exception as exc: # noqa: BLE001 log.error("detail fetch failed for %s: %s", url, exc) return "failed", None if prod is None: return "missing", None body = render_markdown(prod) write_plot(prod, body) return "written", prod def run( *, limit: int | None, force: bool, only_crop: str | None, only_state: str | None, only_year: int | None, include_2023: bool, workers: int = DEFAULT_WORKERS, ) -> int: CORPUS_DIR.mkdir(parents=True, exist_ok=True) crops = {only_crop} if only_crop else {"corn", "soybeans", "silage"} states = {only_state} if only_state else None if only_year: years = {only_year} elif include_2023: years = {2023, 2024, 2025} else: years = {2024, 2025} # One shared session for sitemap walk (single-threaded). discovery_http = RateLimitedSession() targets = discover_plots(discovery_http, crops=crops, states=states, years=years) if limit is not None: targets = targets[:limit] counts = {"written": 0, "skipped": 0, "missing": 0, "failed": 0} counts_lock = threading.Lock() processed_counter = {"n": 0} total = len(targets) # One requests.Session per worker thread — they share the # class-level rate limiter (REQ_INTERVAL_SEC between any two # requests across all threads), but each has its own HTTP # connection pool. thread_local = threading.local() def _session() -> RateLimitedSession: s = getattr(thread_local, "session", None) if s is None: s = RateLimitedSession() thread_local.session = s return s def _worker(target: tuple[str, str, str, int, str]) -> tuple[str, Any]: url, crop, state, year, plot_id = target return process_plot( _session(), url=url, crop=crop, state=state, year=year, plot_id=plot_id, force=force, ) log.info("dispatching %d plots across %d workers (shared rate limiter %.2f sec/req)", total, workers, REQ_INTERVAL_SEC) with ThreadPoolExecutor(max_workers=workers) as pool: futures = {pool.submit(_worker, t): t for t in targets} for fut in as_completed(futures): target = futures[fut] url, crop, state, year, plot_id = target try: status, prod = fut.result() except Exception as exc: # noqa: BLE001 log.error("worker failed for %s: %s", url, exc) status, prod = "failed", None with counts_lock: counts[status] = counts.get(status, 0) + 1 processed_counter["n"] += 1 n = processed_counter["n"] if (prod is not None and n <= 5) or n % 100 == 0 or status == "failed": log.info( "[%d/%d] %s %s | results=%d coop=%s", n, total, source_key_for(crop, state, year, plot_id), status, len(prod.results) if prod else 0, (prod.cooperator if prod else "-") or "-", ) log.info( "done: processed=%d written=%d skipped=%d missing=%d failed=%d (of %d candidates)", processed_counter["n"], counts["written"], counts["skipped"], counts["missing"], counts["failed"], total, ) return 0 if counts["failed"] == 0 else 1 # --------------------------------------------------------------------- CLI def _build_argparser() -> argparse.ArgumentParser: p = argparse.ArgumentParser( prog="scrape.sources.gh_plot_reports", description="Scrape Golden Harvest cross-vendor plot reports (yield trials).", ) p.add_argument("--limit", type=int, default=None, help="Stop after processing N plots (default: all).") p.add_argument("--force", action="store_true", help="Re-fetch even if the markdown file already exists.") p.add_argument("--crop", default=None, choices=("corn", "soybeans", "silage"), help="Limit to one crop.") p.add_argument("--state", default=None, help="Limit to one state (2-letter abbrev: ia, il, ne, ...).") p.add_argument("--year", type=int, default=None, choices=(2023, 2024, 2025), help="Limit to one year.") p.add_argument("--include-2023", action="store_true", help="Include 2023 plot reports (default: 2024-2025 only).") p.add_argument("--workers", type=int, default=DEFAULT_WORKERS, help=f"Concurrent worker threads (default {DEFAULT_WORKERS}, " f"all share a global {REQ_INTERVAL_SEC}-sec rate limiter).") p.add_argument("--log-level", default=os.environ.get("LOG_LEVEL", "INFO")) return p def main(argv: list[str] | None = None) -> int: args = _build_argparser().parse_args(argv) logging.basicConfig( level=args.log_level.upper(), format="%(asctime)s %(levelname)s %(name)s %(message)s", stream=sys.stderr, ) return run( limit=args.limit, force=args.force, only_crop=args.crop, only_state=args.state.lower() if args.state else None, only_year=args.year, include_2023=args.include_2023, workers=args.workers, ) if __name__ == "__main__": sys.exit(main())