"""ProHarvest Seeds plot reports — cross-vendor yield trials (data_type=trial). Source: ``proharvestseeds.com`` exposes a public, no-auth custom REST endpoint that the site's plot map calls: GET /wp-json/proharvest/v1/plots?y= It returns one object per plot for that harvest year with ``{id, title, city, state, county, year, latitude, longitude, file, product}`` — where ``file`` is the harvest-report **PDF** and ``product`` is ``Corn`` / ``Soybean``. ``/wp-json/proharvest/v1/latest-plot-year`` returns the newest year (currently 2025). Years span 2015–2025. The API gives clean location metadata; the PDF carries the plot management block + the head-to-head results table: Entry | Brand | Hybrid/Variety | Seed Trtmt. | % H2O | Test Wt. | Yield/Ac. | +/- Ave | Yield Rank Plot types (Focus / Strip / Third Party / Other) include ProHarvest-only strip trials AND third-party cross-vendor comparisons, so a single report can rank ProHarvest hybrids against DEKALB / Pioneer / etc. — the same value class as the Golden Harvest / LG / AgriGold plot reports already in the corpus. We emit the **same sidecar shape** as ``agrigold_plot_reports`` / ``lg_plot_reports`` / ``gh_plot_reports`` (``results: [{rank, brand, product, traits, metrics}]``) so the trial chunker's shared ``_render_gh_plot_chunk`` renderer handles it — ``proharvest_plots`` is added to that renderer's source list in ``rag/chunk.py``. Scope: 2024 + 2025 baseline (most recent = most relevant for current decisions). Older years (2015–2023) deferred behind ``--include-old``, mirroring how the other trial sources staged 2023. Output: corpus/proharvest_plots/.md corpus/proharvest_plots/.json source_key: ``phpr---``, e.g. ``phpr-corn-2025-1234``. 
CLI: python -m scrape.sources.proharvest_plots --year 2025 --limit 3 python -m scrape.sources.proharvest_plots --force python -m scrape.sources.proharvest_plots --include-old --force """ from __future__ import annotations import argparse import io import json import logging import os import random import re import sys import time from dataclasses import dataclass, field from datetime import datetime, timezone from pathlib import Path from typing import Any import pdfplumber import requests SCRAPER_VERSION = "0.1.0" USER_AGENT = "seed-mcp-scraper/0.1 (+https://drawbar.example/contact)" BASE = "https://proharvestseeds.com" PLOTS_API = f"{BASE}/wp-json/proharvest/v1/plots" LATEST_YEAR_API = f"{BASE}/wp-json/proharvest/v1/latest-plot-year" BASELINE_YEARS = [2024, 2025] OLD_YEARS = [2015, 2016, 2017, 2018, 2019, 2020, 2021, 2022, 2023] PRODUCT_TO_CROP = {"corn": "corn", "soybean": "soybeans", "soybeans": "soybeans"} REQ_INTERVAL_SEC = 1.5 REPO_ROOT = Path(__file__).resolve().parents[2] CORPUS_ROOT = Path(os.environ.get("CORPUS_ROOT") or REPO_ROOT / "corpus") CORPUS_DIR = CORPUS_ROOT / "proharvest_plots" log = logging.getLogger("scrape.proharvest_plots") # --------------------------------------------------------------------- HTTP class RateLimitedSession: def __init__(self, interval: float = REQ_INTERVAL_SEC) -> None: self.s = requests.Session() self.s.headers["User-Agent"] = USER_AGENT self.interval = interval self._last = 0.0 def _wait(self) -> None: delta = time.monotonic() - self._last if delta < self.interval: time.sleep(self.interval - delta) self._last = time.monotonic() def request(self, method: str, url: str, *, max_retries: int = 4, timeout: float = 45.0, **kw: Any) -> requests.Response: last_exc: Exception | None = None for attempt in range(max_retries): self._wait() try: resp = self.s.request(method, url, timeout=timeout, **kw) except requests.RequestException as exc: last_exc = exc backoff = min(30.0, (2 ** attempt) + random.random()) log.warning("network 
error on %s %s: %s — retry in %.1fs", method, url, exc, backoff) time.sleep(backoff) continue if resp.status_code == 429 or 500 <= resp.status_code < 600: ra = resp.headers.get("Retry-After") backoff = float(ra) if (ra and ra.isdigit()) else min( 30.0, (2 ** attempt) + random.random()) log.warning("HTTP %d on %s %s — retry in %.1fs", resp.status_code, method, url, backoff) time.sleep(backoff) continue return resp if last_exc: raise last_exc return resp # type: ignore[return-value] def get(self, url: str, **kw: Any) -> requests.Response: return self.request("GET", url, **kw) # --------------------------------------------------------------------- model @dataclass class PHPlot: source_key: str plot_id: int crop: str year: int title: str # cooperator / plot name city: str | None = None state: str | None = None county: str | None = None latitude: float | None = None longitude: float | None = None pdf_url: str = "" # plot management block (from the PDF header) company_rep: str | None = None planted_date: str | None = None harvested_date: str | None = None previous_crop: str | None = None row_width: str | None = None population_seeds_per_acre: int | None = None fungicide: str | None = None herbicide: str | None = None insecticide: str | None = None tillage: str | None = None irrigation: str | None = None results: list[dict] = field(default_factory=list) # Verbatim PDF text — populated only when structured row parsing # fails (a foreign-format third-party report), so the data is still # embedded + retrievable instead of dropped. 
verbatim_text: str = "" # --------------------------------------------------------------------- enumerate def fetch_plots_for_year(http: RateLimitedSession, year: int) -> list[dict]: r = http.get(f"{PLOTS_API}?y={year}") r.raise_for_status() data = r.json() return data if isinstance(data, list) else [] # --------------------------------------------------------------------- PDF parse _NUM_RE = re.compile(r"^-?\d+(?:\.\d+)?$") def _to_num(s: str) -> float | None: s = (s or "").strip() if not s or not _NUM_RE.match(s): return None f = float(s) return int(f) if f.is_integer() else f # All header field labels on the plot-management lines. Used as the # boundary for value extraction so an EMPTY field (e.g. "Tillage:" # with nothing after it) doesn't swallow the next label as its value. _HEADER_LABELS = [ "Company Representative", "Planted", "Harvested", "Previous Crop", "Herbicide", "Row Width", "Seeding Rate", "Fungicide", "Fertilizer", "Tillage", "Insecticide", "Irrigation", "General Plot Comments", ] _LABEL_BOUNDARY = "|".join(re.escape(l) for l in _HEADER_LABELS) def _kv(text: str, label: str) -> str | None: """Extract a 'Label: value' field. The value runs until the next known label, end of line, or end of text — so an empty field returns None instead of capturing the following label.""" m = re.search( rf"{re.escape(label)}:\s*(.*?)\s*(?=(?:{_LABEL_BOUNDARY}):|\n|$)", text) if not m: return None v = m.group(1).strip().strip("-").strip() # Guard: a value that is itself a known label means the field was empty. 
if not v or v.rstrip(":") in _HEADER_LABELS: return None return v def _parse_header(text: str, plot: PHPlot) -> None: plot.company_rep = _kv(text, "Company Representative") plot.planted_date = _kv(text, "Planted") plot.harvested_date = _kv(text, "Harvested") plot.previous_crop = _kv(text, "Previous Crop") plot.row_width = _kv(text, "Row Width") plot.fungicide = _kv(text, "Fungicide") plot.herbicide = _kv(text, "Herbicide") plot.insecticide = _kv(text, "Insecticide") plot.tillage = _kv(text, "Tillage") plot.irrigation = _kv(text, "Irrigation") sr = _kv(text, "Seeding Rate") if sr: m = re.search(r"(\d[\d,]*)", sr) if m: plot.population_seeds_per_acre = int(m.group(1).replace(",", "")) def _norm_label(s: str) -> str: return re.sub(r"\s+", " ", (s or "").strip()).lower().rstrip(".") # header-label -> our metric key (canonical "Yield" so the chunker's # top-N primary-metric picker finds it). _COL_MAP = { "entry": "_entry", "brand": "_brand", "hybrid/variety": "_product", "variety": "_product", "hybrid": "_product", "seed trtmt": "_seed_trtmt", "% h2o": "% H2O", "%h2o": "% H2O", "moisture": "% H2O", "test wt": "Test Wt.", "test weight": "Test Wt.", "yield/ac": "Yield", "yield/acre": "Yield", "yield": "Yield", "+/- ave": "+/- Ave", "+/-ave": "+/- Ave", "yield rank": "_rank", "rank": "_rank", } def _parse_results_from_tables(pdf: pdfplumber.PDF) -> list[dict]: """Walk every table on every page; once we see the header row, map subsequent digit-led rows by column position (None cells dropped on both header + row so they stay parallel).""" results: list[dict] = [] colmap: list[str] | None = None for page in pdf.pages: for table in page.extract_tables() or []: for raw in table: cells = [c for c in raw if c is not None] cells = [c.replace("\n", " ").strip() if isinstance(c, str) else c for c in cells] if not cells: continue labels = [_norm_label(c) for c in cells] if "brand" in labels and any( l in ("hybrid/variety", "variety", "hybrid") for l in labels): colmap = 
[_COL_MAP.get(l, "") for l in labels] continue if colmap is None: continue # data row: first cell must be an integer entry number if not cells or not re.match(r"^\d+$", str(cells[0]).strip()): continue if len(cells) < len(colmap): cells = cells + [""] * (len(colmap) - len(cells)) rec: dict[str, Any] = {} metrics: dict[str, Any] = {} for key, cell in zip(colmap, cells): if not key: continue val = cell.strip() if isinstance(cell, str) else cell if key == "_entry": rec["_entry"] = _to_num(val) elif key == "_brand": rec["brand"] = _strip_check(val) or None elif key == "_product": rec["_raw_product"] = val or "" elif key == "_rank": rec["rank"] = _to_num(val) elif key == "_seed_trtmt": if val: metrics["Seed Trtmt."] = val else: metrics[key] = _to_num(val) if _NUM_RE.match(str(val)) else (val or None) # split hybrid + trait off the product cell raw_prod = _strip_check(rec.pop("_raw_product", "")).strip() parts = raw_prod.split(maxsplit=1) rec["product"] = parts[0] if parts else raw_prod rec["traits"] = parts[1] if len(parts) > 1 else None rec["metrics"] = metrics rec.pop("_entry", None) if rec.get("product"): results.append(rec) # sort by yield rank when present, else by yield desc def _sortkey(r: dict) -> tuple: if isinstance(r.get("rank"), (int, float)): return (0, r["rank"]) y = r.get("metrics", {}).get("Yield") return (1, -y if isinstance(y, (int, float)) else 0) results.sort(key=_sortkey) return results _NUM_TOKEN = re.compile(r"^-?\d+(?:\.\d+)?$") # Strip a "(check)" / "(check₁)" trial annotation from a brand/product token. _CHECK_RE = re.compile(r"\s*\(check[^)]*\)\s*", re.I) # Multi-word seed brands seen in ProHarvest's competitor rows. The naive # "first token = brand" split would chop these (e.g. brand "Golden", # product "Harvest"), so match the longest known phrase first. 
KNOWN_MULTIWORD_BRANDS = [
    "golden harvest", "seed consultants", "partners brand", "fs invision",
    "sun prairie", "dura crop", "nu tech", "local seed", "prairie brand",
    "great lakes", "viking/blueriver",
]


def _strip_check(s: str) -> str:
    """Remove any "(check…)" annotation from ``s`` and trim the result."""
    return _CHECK_RE.sub(" ", s or "").strip()


def _split_brand_product(tokens: list[str]) -> tuple[str, str, str | None]:
    """From the middle tokens (between entry and the trailing numerics),
    pull brand / product / traits. Honors known multi-word brands.

    Returns ``(brand, product, traits)``; ``product`` is ``""`` when nothing
    follows the brand and ``traits`` is ``None`` when nothing follows the
    product.
    """
    # Drop stand-alone "(check)" tokens before splitting.
    toks = [t for t in tokens
            if not _CHECK_RE.fullmatch(f"({t.strip('()')})")]
    low = " ".join(toks).lower()

    brand_tokens = 1
    for phrase in sorted(KNOWN_MULTIWORD_BRANDS, key=len, reverse=True):
        if low.startswith(phrase + " "):
            brand_tokens = len(phrase.split())
            break

    brand = _strip_check(" ".join(toks[:brand_tokens])) or (toks[0] if toks else "")
    rest = toks[brand_tokens:]
    product = _strip_check(rest[0]) if rest else ""
    traits = " ".join(rest[1:]) or None
    return brand, product, traits


def _row_ok(r: dict) -> bool:
    """A structurally-sound result row: a real (non-numeric) brand, a
    product code, and a plausible Yield. Used to drop junk rows and to
    decide when a whole plot's parse is too corrupt to trust."""
    brand = (r.get("brand") or "").strip()
    if not brand or brand.isdigit() or len(brand) <= 1:
        return False
    if not (r.get("product") or "").strip():
        return False
    y = r.get("metrics", {}).get("Yield")
    if not isinstance(y, (int, float)) or not (1 < y < 400):
        return False
    rank = r.get("rank")
    if isinstance(rank, (int, float)) and rank > 200:  # a yield leaked into rank
        return False
    return True


def _assign_metrics(nums: list[float]) -> dict:
    """Map a row's trailing numeric run to metric columns, anchored from
    the RIGHT (Yield Rank, +/- Ave, Yield/Ac. are always the last three).

    Optional leading columns vary: soybean reports often drop Test Wt., so
    a row can carry 3, 4, or 5 numerics:

        5 → % H2O, Test Wt., Yield, +/- Ave, Rank
        4 → % H2O, Yield, +/- Ave, Rank        (no Test Wt.)
        3 → Yield, +/- Ave, Rank               (no moisture/test wt.)

    ``nums`` must hold at least three values; any numerics beyond the last
    five are ignored.
    """
    n = len(nums)
    yld, ave, rank = nums[-3], nums[-2], nums[-1]
    if n >= 5:
        h2o, testwt = nums[-5], nums[-4]
    elif n == 4:
        h2o, testwt = nums[-4], None
    else:
        h2o, testwt = None, None

    # Emit in a readable order (Yield is the primary metric the chunker's
    # top-N picker keys on).
    m: dict = {"Yield": yld}
    if h2o is not None:
        m["% H2O"] = h2o
    if testwt is not None:
        m["Test Wt."] = testwt
    m["+/- Ave"] = ave
    return {"rank": int(rank), "metrics": m}


def _parse_results_from_text(text: str) -> list[dict]:
    """Fallback row parser for PDFs whose tables have no ruling lines
    (pdfplumber returns whole rows as one cell). Anchors on the trailing
    numeric run, which is positionally stable across layouts and column
    counts.

    Lines are ignored until a header line mentioning "brand" plus "hybrid" or
    "variety" has been seen. Results are returned sorted by rank.
    """
    results: list[dict] = []
    started = False
    for line in text.splitlines():
        low = line.lower()
        if not started:
            if "brand" in low and ("hybrid" in low or "variety" in low):
                started = True
            continue

        toks = line.split()
        if len(toks) < 5 or not toks[0].isdigit():
            continue

        # trailing run of numeric tokens
        j = len(toks)
        while j > 0 and _NUM_TOKEN.match(toks[j - 1]):
            j -= 1
        nums = [float(t) for t in toks[j:]]
        if len(nums) < 3:
            continue
        # rank must be a whole number; if the last token has a decimal the
        # row is malformed (wrapped) — skip it rather than guess.
        if "." in toks[-1]:
            continue

        mid = toks[1:j]  # brand + hybrid + optional trait/trtmt
        if len(mid) < 2:
            continue

        rec = _assign_metrics(nums)
        brand, product, traits = _split_brand_product(mid)
        rec["brand"] = brand
        rec["product"] = product
        rec["traits"] = traits
        results.append(rec)

    results.sort(key=lambda r: r["rank"])
    return results


def parse_pdf(http: RateLimitedSession, plot: PHPlot) -> None:
    """Download ``plot.pdf_url`` and fill ``plot`` from it, in place.

    Sets the management-header fields, then either ``plot.results``
    (structured rows that passed the sanity gate) or ``plot.verbatim_text``
    (the stripped PDF text, when no trustworthy rows were recovered).

    Raises whatever the download (``raise_for_status``) or pdfplumber raises;
    the caller treats the parse as best-effort.
    """
    r = http.get(plot.pdf_url)
    r.raise_for_status()
    with pdfplumber.open(io.BytesIO(r.content)) as pdf:
        text = "\n".join((p.extract_text() or "") for p in pdf.pages)
        _parse_header(text, plot)
        results = _parse_results_from_tables(pdf)

    if not results:
        # Tables had no ruling lines → parse the verbatim text rows.
        results = _parse_results_from_text(text)

    # Sanity-gate the structured parse. Off-template reports (e.g. a
    # university land-lab with extra RM / harvest-weight columns and a
    # multi-line header) parse into junk rows — numeric brands, a yield
    # leaked into the rank, empty metrics. Drop bad rows; if too few
    # survive, discard the structured parse entirely and keep the
    # verbatim text instead so nothing is silently corrupted.
    good = [row for row in results if _row_ok(row)]
    if len(good) >= 2 and len(good) >= 0.6 * len(results):
        plot.results = good
    else:
        plot.results = []

    if not plot.results:
        # Foreign / off-template report — keep the verbatim text so the
        # cross-vendor data isn't lost.
        plot.verbatim_text = text.strip()


# --------------------------------------------------------------------- render

def render_markdown(plot: PHPlot) -> str:
    """Render the markdown body for one plot.

    Structured plots get a metadata bullet list followed by a results table.
    Plots with no parsed rows but with ``verbatim_text`` get the location and
    source bullets followed by the raw PDF text in a fenced block.
    """
    crop_label = {"corn": "Corn", "soybeans": "Soybean"}.get(plot.crop, plot.crop.title())
    loc = ", ".join(filter(None, [plot.city, plot.state]))
    # Build the parenthesised suffix from the parts that exist, so a plot with
    # neither city nor state reads "(2025)" rather than "(, 2025)".
    where = ", ".join(filter(None, [loc, str(plot.year)]))

    head: list[str] = [
        f"# {crop_label} yield trial — {plot.title} ({where})",
        "",
        "- **Publisher:** ProHarvest Seeds (cross-vendor plot report)",
        f"- **Crop:** {crop_label}",
        f"- **Year:** {plot.year}",
    ]
    if loc:
        county = f" · {plot.county} County" if plot.county else ""
        head.append(f"- **Location:** {loc}" + county)

    if not plot.results and plot.verbatim_text:
        # Foreign-format report — emit only the location + source bullets,
        # then the verbatim PDF text under the separator the chunker reads.
        head += [f"- **Source PDF:** {plot.pdf_url}", "", "---", "",
                 "## Trial data (verbatim from PDF)", "",
                 "```", plot.verbatim_text, "```", ""]
        return "\n".join(head)

    population = (f"{plot.population_seeds_per_acre:,} seeds/acre"
                  if plot.population_seeds_per_acre else None)
    for label, val in [
        ("Cooperator", plot.title),
        ("Company rep", plot.company_rep),
        ("Planted", plot.planted_date),
        ("Harvested", plot.harvested_date),
        ("Previous crop", plot.previous_crop),
        ("Row width", plot.row_width),
        ("Population", population),
        ("Tillage", plot.tillage),
        ("Irrigation", plot.irrigation),
        ("Fungicide", plot.fungicide),
    ]:
        if val:
            head.append(f"- **{label}:** {val}")

    head += [f"- **Source PDF:** {plot.pdf_url}", "", "---", "",
             "## Results (head-to-head)", "",
             "| Rank | Brand | Hybrid/Variety | Trait | Yield/Ac | % H2O | Test Wt | +/- Ave |",
             "|---|---|---|---|---|---|---|---|"]
    for r in plot.results:
        m = r.get("metrics", {})
        head.append("| {rank} | {brand} | {prod} | {tr} | {y} | {h2o} | {tw} | {ave} |".format(
            rank=r.get("rank", "-"),
            brand=r.get("brand") or "-",
            prod=r.get("product") or "-",
            tr=r.get("traits") or "-",
            y=m.get("Yield", "-"),
            h2o=m.get("% H2O", "-"),
            tw=m.get("Test Wt.", "-"),
            ave=m.get("+/- Ave", "-")))
    head.append("")
    return "\n".join(head)


def write_plot(plot: PHPlot, body_md: str) -> None:
    """Write ``<source_key>.md`` and the ``<source_key>.json`` sidecar
    under ``CORPUS_DIR``, creating the directory if needed."""
    CORPUS_DIR.mkdir(parents=True, exist_ok=True)
    (CORPUS_DIR / f"{plot.source_key}.md").write_text(body_md, encoding="utf-8")
    sidecar = {
        "source": "proharvest_plots",
        "source_key": plot.source_key,
        "data_type": "trial",
        "vendor": "ProHarvest Seeds",
        "brand": "ProHarvest Seeds",
        "crop": plot.crop,
        "state": plot.state,
        "state_abbrev": (plot.state or "").lower() or None,
        "city": plot.city,
        "county": plot.county,
        "year": plot.year,
        "plot_id": plot.plot_id,
        "cooperator": plot.title,
        "latitude": plot.latitude,
        "longitude": plot.longitude,
        "company_representative": plot.company_rep,
        "planted_date": plot.planted_date,
        "harvested_date": plot.harvested_date,
        "previous_crop": plot.previous_crop,
        "row_width": plot.row_width,
        "population_seeds_per_acre": plot.population_seeds_per_acre,
        "fungicide": plot.fungicide,
        "herbicide": plot.herbicide,
        "insecticide": plot.insecticide,
        "tillage": plot.tillage,
        "irrigation": plot.irrigation,
        "results": plot.results,
        "n_results": len(plot.results),
        # True when no structured rows could be parsed and the body holds
        # the verbatim PDF text instead (foreign-format third-party report).
        "raw_text": bool(not plot.results and plot.verbatim_text),
        "source_urls": [plot.pdf_url],
        "fetched_at": datetime.now(timezone.utc).isoformat(),
        "scraper_version": SCRAPER_VERSION,
    }
    (CORPUS_DIR / f"{plot.source_key}.json").write_text(
        json.dumps(sidecar, indent=2, ensure_ascii=False) + "\n",
        encoding="utf-8")


# --------------------------------------------------------------------- pipeline

def run(*, years: list[int], limit: int | None, force: bool,
        only_plot: str | None) -> int:
    """Enumerate, parse and write plot reports for ``years``.

    Args:
        years: Harvest years to enumerate, in order.
        limit: Stop once this many plots have been processed (skipped plots
            count towards it); ``None`` means no limit.
        force: Re-process plots whose markdown file already exists.
        only_plot: Restrict to one plot, matched by source_key or plot id.

    Returns:
        Always 0; per-plot failures are logged and counted, not raised.
    """
    CORPUS_DIR.mkdir(parents=True, exist_ok=True)
    http = RateLimitedSession()
    counts = {"written": 0, "verbatim": 0, "skipped": 0,
              "image_skip": 0, "failed": 0}
    processed = 0

    for year in years:
        try:
            plots = fetch_plots_for_year(http, year)
        except (requests.RequestException, ValueError) as exc:
            # RequestException covers connection errors/timeouts as well as
            # HTTP status errors; ValueError covers a non-JSON body. One bad
            # year must not abort the remaining years.
            log.error("year %s enumeration failed: %s", year, exc)
            continue
        log.info("year %s: %d plots", year, len(plots))

        for pj in plots:
            if limit is not None and processed >= limit:
                break
            product = str(pj.get("product") or "").strip().lower()
            crop = PRODUCT_TO_CROP.get(product)
            if not crop:
                continue  # skip non-row-crop products if any appear
            pid = pj.get("id")
            source_key = f"phpr-{crop}-{year}-{pid}"
            if only_plot and source_key != only_plot and str(pid) != only_plot:
                continue
            processed += 1

            md_path = CORPUS_DIR / f"{source_key}.md"
            if md_path.exists() and not force:
                counts["skipped"] += 1
                log.info("[%d] %s skipped", processed, source_key)
                continue

            pdf_url = pj.get("file") or ""
            if not pdf_url:
                log.warning("%s has no PDF file — skipping", source_key)
                continue

            try:
                plot_id = int(pid)
            except (TypeError, ValueError):
                # A record without a usable id is a per-plot failure, not a
                # reason to crash the whole run.
                counts["failed"] += 1
                log.error("[%d] %s has unusable plot id %r — skipping",
                          processed, source_key, pid)
                continue

            plot = PHPlot(
                source_key=source_key,
                plot_id=plot_id,
                crop=crop,
                year=int(year),
                title=(pj.get("title") or "").strip(),
                city=(pj.get("city") or "").strip() or None,
                state=(pj.get("state") or "").strip() or None,
                county=(pj.get("county") or "").strip() or None,
                latitude=pj.get("latitude"),
                longitude=pj.get("longitude"),
                pdf_url=pdf_url)

            try:
                parse_pdf(http, plot)
            except Exception as exc:  # PDF parse is best-effort
                counts["failed"] += 1
                log.error("[%d] %s PDF parse failed: %s", processed, source_key, exc)
                continue

            has_text_table = (
                len(plot.verbatim_text) >= 300
                and len(re.findall(r"\d", plot.verbatim_text)) >= 30)
            if not plot.results and not has_text_table:
                # No structured rows AND no real text layer with numbers →
                # image-only / unparseable PDF. Skip, but count it (no
                # silent cap). (Column headers vary — e.g. "Bu/Acre" vs
                # "Yield" — so we gate on digit density, not a keyword.)
                counts["image_skip"] += 1
                log.warning("[%d] %s — no rows + no data text (image PDF?); skipping",
                            processed, source_key)
                continue

            write_plot(plot, render_markdown(plot))
            if plot.results:
                counts["written"] += 1
                log.info("[%d] %s written | %s %s, %s | %d results",
                         processed, source_key, plot.crop, plot.state,
                         plot.year, len(plot.results))
            else:
                counts["verbatim"] += 1
                log.info("[%d] %s written VERBATIM (foreign-format) | %s %s, %s | %d chars",
                         processed, source_key, plot.crop, plot.state,
                         plot.year, len(plot.verbatim_text))

        if limit is not None and processed >= limit:
            break

    log.info("done: processed=%d written(structured)=%d written(verbatim)=%d "
             "skipped=%d image_skip=%d failed=%d",
             processed, counts["written"], counts["verbatim"],
             counts["skipped"], counts["image_skip"], counts["failed"])
    return 0


# --------------------------------------------------------------------- CLI

def _build_argparser() -> argparse.ArgumentParser:
    """Build the command-line parser for this scraper."""
    p = argparse.ArgumentParser(
        prog="scrape.sources.proharvest_plots",
        description="Scrape ProHarvest Seeds plot reports (cross-vendor yield "
                    "trials) via the proharvest/v1/plots API + harvest-report PDFs.")
    p.add_argument("--year", type=int, default=None,
                   help="Scrape a single year (default: 2024+2025 baseline).")
    p.add_argument("--include-old", action="store_true",
                   help="Also scrape 2015–2023 (deferred by default).")
    p.add_argument("--limit", type=int, default=None,
                   help="Stop after processing N plots (default: all).")
    p.add_argument("--force", action="store_true",
                   help="Re-fetch even if the markdown file already exists.")
    p.add_argument("--plot", default=None,
                   help="Process a single plot by source_key or plot id.")
    p.add_argument("--log-level", default=os.environ.get("LOG_LEVEL", "INFO"))
    return p


def main(argv: list[str] | None = None) -> int:
    """CLI entry point: parse arguments, configure logging, call ``run``.

    ``--year`` takes precedence over ``--include-old``.
    """
    args = _build_argparser().parse_args(argv)
    logging.basicConfig(
        level=args.log_level.upper(),
        format="%(asctime)s %(levelname)s %(name)s %(message)s",
        stream=sys.stderr)
    if args.year is not None:
        years = [args.year]
    elif args.include_old:
        years = OLD_YEARS + BASELINE_YEARS
    else:
        years = BASELINE_YEARS
    return run(years=years, limit=args.limit, force=args.force,
               only_plot=args.plot)


if __name__ == "__main__":
    sys.exit(main())