22e8092faf
Image rebuild (skip scrape) / build (push) Successful in 5m46s
Co-authored-by: claude <claude@jpaul.io> Co-committed-by: claude <claude@jpaul.io>
701 lines
27 KiB
Python
701 lines
27 KiB
Python
"""ProHarvest Seeds plot reports — cross-vendor yield trials (data_type=trial).
|
||
|
||
Source: ``proharvestseeds.com`` exposes a public, no-auth custom REST
|
||
endpoint that the site's plot map calls:
|
||
|
||
GET /wp-json/proharvest/v1/plots?y=<year>
|
||
|
||
It returns one object per plot for that harvest year with
|
||
``{id, title, city, state, county, year, latitude, longitude, file,
|
||
product}`` — where ``file`` is the harvest-report **PDF** and ``product``
|
||
is ``Corn`` / ``Soybean``. ``/wp-json/proharvest/v1/latest-plot-year``
|
||
returns the newest year (currently 2025). Years span 2015–2025.
|
||
|
||
The API gives clean location metadata; the PDF carries the plot
|
||
management block + the head-to-head results table:
|
||
|
||
Entry | Brand | Hybrid/Variety | Seed Trtmt. | % H2O | Test Wt. |
|
||
Yield/Ac. | +/- Ave | Yield Rank
|
||
|
||
Plot types (Focus / Strip / Third Party / Other) include ProHarvest-only
|
||
strip trials AND third-party cross-vendor comparisons, so a single report
|
||
can rank ProHarvest hybrids against DEKALB / Pioneer / etc. — the same
|
||
value class as the Golden Harvest / LG / AgriGold plot reports already in
|
||
the corpus.
|
||
|
||
We emit the **same sidecar shape** as ``agrigold_plot_reports`` /
|
||
``lg_plot_reports`` / ``gh_plot_reports`` (``results: [{rank, brand,
|
||
product, traits, metrics}]``) so the trial chunker's shared
|
||
``_render_gh_plot_chunk`` renderer handles it — ``proharvest_plots`` is
|
||
added to that renderer's source list in ``rag/chunk.py``.
|
||
|
||
Scope: 2024 + 2025 baseline (most recent = most relevant for current
|
||
decisions). Older years (2015–2023) deferred behind ``--include-old``,
|
||
mirroring how the other trial sources staged 2023.
|
||
|
||
Output:
|
||
corpus/proharvest_plots/<source_key>.md
|
||
corpus/proharvest_plots/<source_key>.json
|
||
|
||
source_key: ``phpr-<crop>-<year>-<plot_id>``, e.g. ``phpr-corn-2025-1234``.
|
||
|
||
CLI:
|
||
python -m scrape.sources.proharvest_plots --year 2025 --limit 3
|
||
python -m scrape.sources.proharvest_plots --force
|
||
python -m scrape.sources.proharvest_plots --include-old --force
|
||
"""
|
||
|
||
from __future__ import annotations
|
||
|
||
import argparse
|
||
import io
|
||
import json
|
||
import logging
|
||
import os
|
||
import random
|
||
import re
|
||
import sys
|
||
import time
|
||
from dataclasses import dataclass, field
|
||
from datetime import datetime, timezone
|
||
from pathlib import Path
|
||
from typing import Any
|
||
|
||
import pdfplumber
|
||
import requests
|
||
|
||
# Version stamp for this scraper.
SCRAPER_VERSION = "0.1.0"
# Sent as the User-Agent header on every request made by RateLimitedSession.
USER_AGENT = "seed-mcp-scraper/0.1 (+https://drawbar.example/contact)"
BASE = "https://proharvestseeds.com"
# Plot listing endpoint; queried as ``?y=<year>``.
PLOTS_API = f"{BASE}/wp-json/proharvest/v1/plots"
# Newest-harvest-year endpoint (per the module docstring).
LATEST_YEAR_API = f"{BASE}/wp-json/proharvest/v1/latest-plot-year"

# Years scraped by default.
BASELINE_YEARS = [2024, 2025]
# Years scraped only when --include-old is given.
OLD_YEARS = [2015, 2016, 2017, 2018, 2019, 2020, 2021, 2022, 2023]

# Lower-cased API ``product`` value -> corpus crop name.
PRODUCT_TO_CROP = {"corn": "corn", "soybean": "soybeans", "soybeans": "soybeans"}

# Default minimum spacing, in seconds, between consecutive HTTP requests.
REQ_INTERVAL_SEC = 1.5

# Two directory levels above the directory containing this file.
REPO_ROOT = Path(__file__).resolve().parents[2]
# The CORPUS_ROOT environment variable overrides the in-repo default.
CORPUS_ROOT = Path(os.environ.get("CORPUS_ROOT") or REPO_ROOT / "corpus")
# Output directory for this source's markdown + JSON files.
CORPUS_DIR = CORPUS_ROOT / "proharvest_plots"

log = logging.getLogger("scrape.proharvest_plots")
|
||
|
||
|
||
# --------------------------------------------------------------------- HTTP
|
||
|
||
|
||
class RateLimitedSession:
    """A ``requests.Session`` wrapper that paces requests and retries failures.

    Every attempt first waits until at least ``interval`` seconds have
    elapsed since the previous attempt. Network errors, HTTP 429 and
    HTTP 5xx responses are retried with exponential backoff plus jitter.
    """

    def __init__(self, interval: float = REQ_INTERVAL_SEC) -> None:
        self.s = requests.Session()
        self.s.headers["User-Agent"] = USER_AGENT
        self.interval = interval
        # Monotonic timestamp of the most recent attempt (0.0 = none yet).
        self._last = 0.0

    def _wait(self) -> None:
        """Sleep just long enough to keep ``interval`` seconds between attempts."""
        delta = time.monotonic() - self._last
        if delta < self.interval:
            time.sleep(self.interval - delta)
        self._last = time.monotonic()

    def request(self, method: str, url: str, *, max_retries: int = 4,
                timeout: float = 45.0, **kw: Any) -> requests.Response:
        """Issue a request, retrying transient failures up to ``max_retries`` times.

        Args:
            method: HTTP verb passed to ``Session.request``.
            url: Target URL.
            max_retries: Total number of attempts; must be at least 1.
            timeout: Per-attempt timeout in seconds.
            **kw: Extra keyword arguments forwarded to ``Session.request``.

        Returns:
            The first response that is neither 429 nor 5xx. If every attempt
            got a response and the last was still 429/5xx, that last response
            is returned so the caller can ``raise_for_status()``.

        Raises:
            ValueError: If ``max_retries`` is less than 1.
            requests.RequestException: The most recent network error, when
                attempts are exhausted and any attempt failed at the network
                level.
        """
        if max_retries < 1:
            raise ValueError("max_retries must be >= 1")
        last_exc: Exception | None = None
        resp: requests.Response | None = None
        for attempt in range(max_retries):
            # No point sleeping through a backoff when no retry follows.
            is_last = attempt == max_retries - 1
            self._wait()
            try:
                resp = self.s.request(method, url, timeout=timeout, **kw)
            except requests.RequestException as exc:
                last_exc = exc
                if is_last:
                    log.warning("network error on %s %s: %s — giving up",
                                method, url, exc)
                    break
                backoff = min(30.0, (2 ** attempt) + random.random())
                log.warning("network error on %s %s: %s — retry in %.1fs",
                            method, url, exc, backoff)
                time.sleep(backoff)
                continue
            if resp.status_code == 429 or 500 <= resp.status_code < 600:
                if is_last:
                    log.warning("HTTP %d on %s %s — giving up",
                                resp.status_code, method, url)
                    break
                # Honour an integer Retry-After; otherwise back off exponentially.
                ra = resp.headers.get("Retry-After")
                backoff = float(ra) if (ra and ra.isdigit()) else min(
                    30.0, (2 ** attempt) + random.random())
                log.warning("HTTP %d on %s %s — retry in %.1fs",
                            resp.status_code, method, url, backoff)
                time.sleep(backoff)
                continue
            return resp
        if last_exc:
            raise last_exc
        return resp  # type: ignore[return-value]

    def get(self, url: str, **kw: Any) -> requests.Response:
        """Shorthand for ``request("GET", url, **kw)``."""
        return self.request("GET", url, **kw)
|
||
|
||
|
||
# --------------------------------------------------------------------- model
|
||
|
||
|
||
@dataclass
class PHPlot:
    """One ProHarvest plot report: API location metadata plus parsed PDF content."""

    # Identity and location fields, supplied when the plot is constructed.
    source_key: str
    plot_id: int
    crop: str
    year: int
    title: str  # cooperator / plot name
    city: str | None = None
    state: str | None = None
    county: str | None = None
    latitude: float | None = None
    longitude: float | None = None
    pdf_url: str = ""
    # plot management block (from the PDF header)
    company_rep: str | None = None
    planted_date: str | None = None
    harvested_date: str | None = None
    previous_crop: str | None = None
    row_width: str | None = None
    population_seeds_per_acre: int | None = None
    fungicide: str | None = None
    herbicide: str | None = None
    insecticide: str | None = None
    tillage: str | None = None
    irrigation: str | None = None
    # Parsed result rows; each is a dict with rank / brand / product /
    # traits / metrics keys.
    results: list[dict] = field(default_factory=list)
    # Verbatim PDF text — populated only when structured row parsing
    # fails (a foreign-format third-party report), so the data is still
    # embedded + retrievable instead of dropped.
    verbatim_text: str = ""
|
||
|
||
|
||
# --------------------------------------------------------------------- enumerate
|
||
|
||
|
||
def fetch_plots_for_year(http: RateLimitedSession, year: int) -> list[dict]:
    """Return the plot objects the plots API lists for ``year``.

    Raises ``requests.HTTPError`` on a non-success status. A JSON payload
    that is not a list yields an empty list.
    """
    response = http.get(f"{PLOTS_API}?y={year}")
    response.raise_for_status()
    payload = response.json()
    if not isinstance(payload, list):
        return []
    return payload
|
||
|
||
|
||
# --------------------------------------------------------------------- PDF parse
|
||
|
||
|
||
_NUM_RE = re.compile(r"^-?\d+(?:\.\d+)?$")


def _to_num(s: str) -> float | None:
    """Convert a plain decimal string to a number.

    Returns an ``int`` when the value is whole, a ``float`` otherwise, and
    ``None`` for empty input or anything that is not an optionally negative
    decimal (thousands separators and a leading ``+`` are rejected).
    """
    text = (s or "").strip()
    if _NUM_RE.match(text) is None:
        return None
    value = float(text)
    if value.is_integer():
        return int(value)
    return value
|
||
|
||
|
||
# All header field labels on the plot-management lines. Used as the
# boundary for value extraction so an EMPTY field (e.g. "Tillage:"
# with nothing after it) doesn't swallow the next label as its value.
_HEADER_LABELS = [
    "Company Representative", "Planted", "Harvested", "Previous Crop",
    "Herbicide", "Row Width", "Seeding Rate", "Fungicide", "Fertilizer",
    "Tillage", "Insecticide", "Irrigation", "General Plot Comments",
]
_LABEL_BOUNDARY = "|".join(re.escape(l) for l in _HEADER_LABELS)


def _kv(text: str, label: str) -> str | None:
    """Extract the value of a ``Label: value`` field from ``text``.

    The value starts on the same line as the label and runs until the next
    known label, the end of that line, or the end of the text.

    Args:
        text: Extracted PDF text to search.
        label: Field label without the trailing colon.

    Returns:
        The stripped value, or ``None`` when the label is absent or the
        field is empty (nothing, or only dashes, after the colon).
    """
    # Only horizontal whitespace may follow the colon: a ``\s*`` here would
    # step over the newline and capture the whole next line as the value of
    # an empty field that ends its line.
    m = re.search(
        rf"{re.escape(label)}:[ \t]*(.*?)\s*(?=(?:{_LABEL_BOUNDARY}):|\n|$)",
        text)
    if not m:
        return None
    v = m.group(1).strip().strip("-").strip()
    # Guard: a value that is itself a known label means the field was empty.
    if not v or v.rstrip(":") in _HEADER_LABELS:
        return None
    return v
|
||
|
||
|
||
def _parse_header(text: str, plot: PHPlot) -> None:
    """Fill the plot-management fields of ``plot`` from the PDF text.

    Each text field is set to the labelled value or ``None``. The seeding
    rate is stored as an integer (commas removed) only when the field
    contains a number.
    """
    text_fields = (
        ("company_rep", "Company Representative"),
        ("planted_date", "Planted"),
        ("harvested_date", "Harvested"),
        ("previous_crop", "Previous Crop"),
        ("row_width", "Row Width"),
        ("fungicide", "Fungicide"),
        ("herbicide", "Herbicide"),
        ("insecticide", "Insecticide"),
        ("tillage", "Tillage"),
        ("irrigation", "Irrigation"),
    )
    for attr, label in text_fields:
        setattr(plot, attr, _kv(text, label))

    seeding = _kv(text, "Seeding Rate")
    if not seeding:
        return
    number = re.search(r"(\d[\d,]*)", seeding)
    if number is not None:
        plot.population_seeds_per_acre = int(number.group(1).replace(",", ""))
|
||
|
||
|
||
def _norm_label(s: str) -> str:
    """Normalise a table header cell for lookup.

    Trims the text, collapses whitespace runs to single spaces, lower-cases
    it, and drops trailing periods. ``None`` or empty input gives ``""``.
    """
    trimmed = (s or "").strip()
    single_spaced = re.sub(r"\s+", " ", trimmed)
    return single_spaced.lower().rstrip(".")
|
||
|
||
|
||
# header-label -> our metric key (canonical "Yield" so the chunker's
# top-N primary-metric picker finds it).
# Keys are header texts in normalised form (lower-case, single-spaced, no
# trailing period). Values starting with "_" are structural columns handled
# specially by the table parser; every other value is used verbatim as the
# metric name.
_COL_MAP = {
    # structural columns
    "entry": "_entry",
    "brand": "_brand",
    "hybrid/variety": "_product",
    "variety": "_product",
    "hybrid": "_product",
    "seed trtmt": "_seed_trtmt",
    # moisture
    "% h2o": "% H2O",
    "%h2o": "% H2O",
    "moisture": "% H2O",
    # test weight
    "test wt": "Test Wt.",
    "test weight": "Test Wt.",
    # yield
    "yield/ac": "Yield",
    "yield/acre": "Yield",
    "yield": "Yield",
    # difference from plot average
    "+/- ave": "+/- Ave",
    "+/-ave": "+/- Ave",
    # rank
    "yield rank": "_rank",
    "rank": "_rank",
}
|
||
|
||
|
||
def _parse_results_from_tables(pdf: pdfplumber.PDF) -> list[dict]:
    """Build result rows from the tables pdfplumber extracts.

    Rows are scanned across every table on every page. A row containing
    "brand" plus a hybrid/variety label sets the column layout, which stays
    in force (across tables and pages) until another header row replaces
    it. Later rows whose first cell is an integer are read positionally
    against that layout. ``None`` cells are dropped from header and data
    rows alike so positions stay parallel. The entry number is only used
    to recognise a data row and is not kept.

    Returns rows sorted by rank where a numeric rank exists, followed by
    unranked rows in descending yield order.
    """
    product_headers = ("hybrid/variety", "variety", "hybrid")
    parsed: list[dict] = []
    layout: list[str] | None = None

    for page in pdf.pages:
        for table in page.extract_tables() or []:
            for raw_row in table:
                cells = [
                    c.replace("\n", " ").strip() if isinstance(c, str) else c
                    for c in raw_row
                    if c is not None
                ]
                if not cells:
                    continue

                labels = [_norm_label(c) for c in cells]
                is_header = "brand" in labels and any(
                    lab in product_headers for lab in labels)
                if is_header:
                    layout = [_COL_MAP.get(lab, "") for lab in labels]
                    continue
                if layout is None:
                    continue

                # data row: first cell must be an integer entry number
                if not re.match(r"^\d+$", str(cells[0]).strip()):
                    continue

                missing = len(layout) - len(cells)
                if missing > 0:
                    cells = cells + [""] * missing

                row: dict[str, Any] = {}
                metrics: dict[str, Any] = {}
                product_cell = ""
                for key, cell in zip(layout, cells):
                    if not key or key == "_entry":
                        continue
                    val = cell.strip() if isinstance(cell, str) else cell
                    if key == "_brand":
                        row["brand"] = _strip_check(val) or None
                    elif key == "_product":
                        product_cell = val or ""
                    elif key == "_rank":
                        row["rank"] = _to_num(val)
                    elif key == "_seed_trtmt":
                        if val:
                            metrics["Seed Trtmt."] = val
                    elif _NUM_RE.match(str(val)):
                        metrics[key] = _to_num(val)
                    else:
                        metrics[key] = val or None

                # split hybrid + trait off the product cell
                cleaned = _strip_check(product_cell).strip()
                pieces = cleaned.split(maxsplit=1)
                row["product"] = pieces[0] if pieces else cleaned
                row["traits"] = pieces[1] if len(pieces) > 1 else None
                row["metrics"] = metrics
                if row.get("product"):
                    parsed.append(row)

    def rank_then_yield(row: dict) -> tuple:
        # Ranked rows first (ascending rank); the rest by descending yield.
        rank = row.get("rank")
        if isinstance(rank, (int, float)):
            return (0, rank)
        yld = row.get("metrics", {}).get("Yield")
        return (1, -yld if isinstance(yld, (int, float)) else 0)

    parsed.sort(key=rank_then_yield)
    return parsed
|
||
|
||
|
||
_NUM_TOKEN = re.compile(r"^-?\d+(?:\.\d+)?$")

# Strip a "(check)" / "(check₁)" trial annotation from a brand/product token.
_CHECK_RE = re.compile(r"\s*\(check[^)]*\)\s*", re.I)

# Multi-word seed brands seen in ProHarvest's competitor rows. The naive
# "first token = brand" split would chop these (e.g. brand "Golden",
# product "Harvest"), so match the longest known phrase first.
KNOWN_MULTIWORD_BRANDS = [
    "golden harvest", "seed consultants", "partners brand", "fs invision",
    "sun prairie", "dura crop", "nu tech", "local seed", "prairie brand",
    "great lakes", "viking/blueriver",
]


def _strip_check(s: str) -> str:
    """Remove every "(check…)" annotation from ``s`` and trim the result.

    Each annotation, with its surrounding whitespace, becomes a single
    space. ``None`` or empty input gives ``""``.
    """
    if not s:
        return ""
    return _CHECK_RE.sub(" ", s).strip()
|
||
|
||
|
||
def _split_brand_product(tokens: list[str]) -> tuple[str, str, str | None]:
    """Split a row's middle tokens into ``(brand, product, traits)``.

    Tokens that are themselves a "(check…)" annotation are discarded. The
    brand is the longest known multi-word brand the row starts with, else
    the first token. The product is the next token, and any remaining
    tokens are joined as traits (``None`` when there are none).
    """
    kept = [t for t in tokens if not _CHECK_RE.fullmatch(f"({t.strip('()')})")]
    lowered = " ".join(kept).lower()

    # Longest phrase first so a longer brand wins over a shorter prefix.
    by_length = sorted(KNOWN_MULTIWORD_BRANDS, key=len, reverse=True)
    n_brand = next(
        (len(phrase.split()) for phrase in by_length
         if lowered.startswith(phrase + " ")),
        1,
    )

    brand = _strip_check(" ".join(kept[:n_brand]))
    if not brand:
        brand = kept[0] if kept else ""

    remainder = kept[n_brand:]
    product = _strip_check(remainder[0]) if remainder else ""
    traits = " ".join(remainder[1:]) or None
    return brand, product, traits
|
||
|
||
|
||
def _row_ok(r: dict) -> bool:
    """Report whether a parsed result row is structurally sound.

    A sound row has a brand longer than one character that is not all
    digits, a non-blank product, a numeric Yield strictly between 1 and
    400, and no numeric rank above 200 (a rank that high means a yield
    value leaked into the rank column).
    """
    brand = (r.get("brand") or "").strip()
    if len(brand) <= 1 or brand.isdigit():
        return False

    if not (r.get("product") or "").strip():
        return False

    yld = r.get("metrics", {}).get("Yield")
    plausible_yield = isinstance(yld, (int, float)) and 1 < yld < 400
    if not plausible_yield:
        return False

    rank = r.get("rank")
    return not (isinstance(rank, (int, float)) and rank > 200)
|
||
|
||
|
||
def _assign_metrics(nums: list[float]) -> dict:
    """Label a row's trailing numbers, counting columns from the RIGHT.

    The last three values are always Yield/Ac., +/- Ave and Yield Rank.
    Columns before them are optional (soybean reports often omit Test Wt.):

        5 values -> % H2O, Test Wt., Yield, +/- Ave, Rank
        4 values -> % H2O, Yield, +/- Ave, Rank
        3 values -> Yield, +/- Ave, Rank

    Returns ``{"rank": int, "metrics": {...}}`` with Yield listed first,
    since it is the primary metric the chunker's top-N picker keys on.
    Raises ``IndexError`` for fewer than three values.
    """
    yld, ave, rank = nums[-3], nums[-2], nums[-1]
    count = len(nums)

    metrics: dict = {"Yield": yld}
    if count >= 5:
        metrics["% H2O"] = nums[-5]
        metrics["Test Wt."] = nums[-4]
    elif count == 4:
        metrics["% H2O"] = nums[-4]
    metrics["+/- Ave"] = ave

    return {"rank": int(rank), "metrics": metrics}
|
||
|
||
|
||
def _parse_results_from_text(text: str) -> list[dict]:
    """Parse result rows straight from the extracted text.

    This is the fallback for PDFs whose tables have no ruling lines, where
    pdfplumber returns a whole row as one cell. Lines are ignored until the
    column-header line (one containing "brand" and "hybrid" or "variety").
    After it, each line that starts with an integer entry number is split
    into middle tokens (brand / product / traits) and a trailing run of
    numbers, which is labelled from the right.

    Returns the rows sorted by rank.
    """
    lines = iter(text.splitlines())

    # Consume everything up to and including the column-header line.
    for line in lines:
        lowered = line.lower()
        if "brand" in lowered and ("hybrid" in lowered or "variety" in lowered):
            break

    rows: list[dict] = []
    for line in lines:
        toks = line.split()
        if len(toks) < 5 or not toks[0].isdigit():
            continue

        # Find where the trailing run of numeric tokens begins.
        tail_start = len(toks)
        for tok in reversed(toks):
            if not _NUM_TOKEN.match(tok):
                break
            tail_start -= 1
        nums = [float(t) for t in toks[tail_start:]]

        # rank must be a whole number; if the last token has a decimal the
        # row is malformed (wrapped) — skip it rather than guess.
        if len(nums) < 3 or "." in toks[-1]:
            continue

        middle = toks[1:tail_start]  # brand + hybrid + optional trait/trtmt
        if len(middle) < 2:
            continue

        row = _assign_metrics(nums)
        row["brand"], row["product"], row["traits"] = _split_brand_product(middle)
        rows.append(row)

    return sorted(rows, key=lambda r: r["rank"])
|
||
|
||
|
||
def parse_pdf(http: RateLimitedSession, plot: PHPlot) -> None:
    """Download the plot's PDF and populate ``plot`` from it.

    The management header is always parsed. Result rows come from the
    extracted tables, or from the raw text when the tables yield nothing.
    Rows are then sanity-checked: the structured parse is kept only when
    at least two rows, and at least 60% of all rows, are sound. Otherwise
    ``plot.results`` is left empty and the PDF text is stored in
    ``plot.verbatim_text``.

    Raises ``requests.HTTPError`` when the download returns an error status.
    """
    response = http.get(plot.pdf_url)
    response.raise_for_status()

    with pdfplumber.open(io.BytesIO(response.content)) as pdf:
        text = "\n".join((p.extract_text() or "") for p in pdf.pages)
        _parse_header(text, plot)
        rows = _parse_results_from_tables(pdf)

    if not rows:
        # Tables had no ruling lines → parse the verbatim text rows.
        rows = _parse_results_from_text(text)

    # Sanity-gate the structured parse. Off-template reports (e.g. a
    # university land-lab with extra RM / harvest-weight columns and a
    # multi-line header) parse into junk rows — numeric brands, a yield
    # leaked into the rank, empty metrics. Drop bad rows; if too few
    # survive, discard the structured parse entirely.
    sound = [row for row in rows if _row_ok(row)]
    trusted = len(sound) >= 2 and len(sound) >= 0.6 * len(rows)
    plot.results = sound if trusted else []

    if not plot.results:
        # Foreign / off-template report — keep the verbatim text so the
        # cross-vendor data isn't lost.
        plot.verbatim_text = text.strip()
|
||
|
||
|
||
# --------------------------------------------------------------------- render
|
||
|
||
|
||
def render_markdown(plot: PHPlot) -> str:
    """Render a plot as the markdown body stored in the corpus.

    The document opens with a heading and a metadata bullet list. A plot
    with structured results gets its management fields and a head-to-head
    results table. A plot with only verbatim text gets that text in a
    fenced block instead.

    Args:
        plot: The plot to render. It is only read, never modified.

    Returns:
        The markdown text, ending with a newline.
    """
    crop_label = {"corn": "Corn", "soybeans": "Soybean"}.get(plot.crop, plot.crop.title())
    loc = ", ".join(filter(None, [plot.city, plot.state]))
    # Join location and year so a plot without city/state renders "(2025)"
    # rather than "(, 2025)".
    where = ", ".join(filter(None, [loc, str(plot.year)]))
    head: list[str] = [
        f"# {crop_label} yield trial — {plot.title} ({where})",
        "",
        "- **Publisher:** ProHarvest Seeds (cross-vendor plot report)",
        f"- **Crop:** {crop_label}",
        f"- **Year:** {plot.year}",
    ]
    if loc:
        head.append(f"- **Location:** {loc}"
                    + (f" · {plot.county} County" if plot.county else ""))

    if not plot.results and plot.verbatim_text:
        # Foreign-format report — emit the verbatim PDF text under the
        # separator the chunker reads.
        head += [f"- **Source PDF:** {plot.pdf_url}", "", "---", "",
                 "## Trial data (verbatim from PDF)", "", "```",
                 plot.verbatim_text, "```", ""]
        return "\n".join(head)

    for label, val in [
        ("Cooperator", plot.title), ("Company rep", plot.company_rep),
        ("Planted", plot.planted_date), ("Harvested", plot.harvested_date),
        ("Previous crop", plot.previous_crop), ("Row width", plot.row_width),
        ("Population", f"{plot.population_seeds_per_acre:,} seeds/acre"
         if plot.population_seeds_per_acre else None),
        ("Tillage", plot.tillage), ("Irrigation", plot.irrigation),
        ("Fungicide", plot.fungicide),
    ]:
        # Unset fields are omitted rather than rendered empty.
        if val:
            head.append(f"- **{label}:** {val}")

    head += [f"- **Source PDF:** {plot.pdf_url}", "", "---", "",
             "## Results (head-to-head)", "",
             "| Rank | Brand | Hybrid/Variety | Trait | Yield/Ac | % H2O | Test Wt | +/- Ave |",
             "|---|---|---|---|---|---|---|---|"]
    for r in plot.results:
        m = r.get("metrics", {})
        head.append("| {rank} | {brand} | {prod} | {tr} | {y} | {h2o} | {tw} | {ave} |".format(
            rank=r.get("rank", "-"), brand=r.get("brand") or "-",
            prod=r.get("product") or "-", tr=r.get("traits") or "-",
            y=m.get("Yield", "-"), h2o=m.get("% H2O", "-"),
            tw=m.get("Test Wt.", "-"), ave=m.get("+/- Ave", "-")))
    head.append("")
    return "\n".join(head)
|
||
|
||
|
||
def write_plot(plot: PHPlot, body_md: str) -> None:
    """Write a plot's markdown body and JSON sidecar into ``CORPUS_DIR``.

    Both files are named after ``plot.source_key``. The markdown file is
    written first, then the sidecar holding the plot metadata, the parsed
    results and provenance fields.
    """
    CORPUS_DIR.mkdir(parents=True, exist_ok=True)
    md_path = CORPUS_DIR / f"{plot.source_key}.md"
    json_path = CORPUS_DIR / f"{plot.source_key}.json"

    md_path.write_text(body_md, encoding="utf-8")

    # True when no structured rows could be parsed and the body holds
    # the verbatim PDF text instead (foreign-format third-party report).
    is_raw_text = bool(not plot.results and plot.verbatim_text)
    state_abbrev = (plot.state or "").lower() or None

    sidecar = {
        "source": "proharvest_plots",
        "source_key": plot.source_key,
        "data_type": "trial",
        "vendor": "ProHarvest Seeds",
        "brand": "ProHarvest Seeds",
        "crop": plot.crop,
        "state": plot.state,
        "state_abbrev": state_abbrev,
        "city": plot.city,
        "county": plot.county,
        "year": plot.year,
        "plot_id": plot.plot_id,
        "cooperator": plot.title,
        "latitude": plot.latitude,
        "longitude": plot.longitude,
        "company_representative": plot.company_rep,
        "planted_date": plot.planted_date,
        "harvested_date": plot.harvested_date,
        "previous_crop": plot.previous_crop,
        "row_width": plot.row_width,
        "population_seeds_per_acre": plot.population_seeds_per_acre,
        "fungicide": plot.fungicide,
        "herbicide": plot.herbicide,
        "insecticide": plot.insecticide,
        "tillage": plot.tillage,
        "irrigation": plot.irrigation,
        "results": plot.results,
        "n_results": len(plot.results),
        "raw_text": is_raw_text,
        "source_urls": [plot.pdf_url],
        "fetched_at": datetime.now(timezone.utc).isoformat(),
        "scraper_version": SCRAPER_VERSION,
    }
    serialized = json.dumps(sidecar, indent=2, ensure_ascii=False) + "\n"
    json_path.write_text(serialized, encoding="utf-8")
|
||
|
||
|
||
# --------------------------------------------------------------------- pipeline
|
||
|
||
|
||
def run(*, years: list[int], limit: int | None, force: bool,
        only_plot: str | None) -> int:
    """Scrape the given harvest years into the corpus directory.

    Args:
        years: Harvest years to enumerate, in order.
        limit: Stop after this many plots have been processed, counting
            plots skipped because their output already exists. ``None``
            means no limit.
        force: Re-fetch plots whose markdown file already exists.
        only_plot: Restrict the run to one plot, matched against either
            the source key or the plot id.

    Returns:
        Always 0. Per-plot failures are logged and counted, not raised.
    """
    CORPUS_DIR.mkdir(parents=True, exist_ok=True)
    http = RateLimitedSession()
    counts = {"written": 0, "verbatim": 0, "skipped": 0, "image_skip": 0, "failed": 0}
    processed = 0

    for year in years:
        try:
            plots = fetch_plots_for_year(http, year)
        except (requests.RequestException, ValueError) as exc:
            # RequestException also covers connection errors and timeouts
            # re-raised once retries are exhausted; ValueError covers a
            # body that is not valid JSON. One bad year must not abort
            # the remaining years.
            log.error("year %s enumeration failed: %s", year, exc)
            continue
        log.info("year %s: %d plots", year, len(plots))
        for pj in plots:
            if limit is not None and processed >= limit:
                break
            if not isinstance(pj, dict):
                log.warning("year %s: unexpected plot entry %r — skipping", year, pj)
                continue
            product = str(pj.get("product") or "").strip().lower()
            crop = PRODUCT_TO_CROP.get(product)
            if not crop:
                continue  # skip non-row-crop products if any appear
            pid = pj.get("id")
            try:
                plot_id = int(pid)
            except (TypeError, ValueError):
                # Without a numeric id no source key or PHPlot can be built.
                log.warning("year %s: plot entry has no usable id (%r) — skipping",
                            year, pid)
                continue
            source_key = f"phpr-{crop}-{year}-{pid}"
            if only_plot and source_key != only_plot and str(pid) != only_plot:
                continue
            processed += 1
            md_path = CORPUS_DIR / f"{source_key}.md"
            if md_path.exists() and not force:
                counts["skipped"] += 1
                log.info("[%d] %s skipped", processed, source_key)
                continue
            pdf_url = pj.get("file") or ""
            if not pdf_url:
                log.warning("%s has no PDF file — skipping", source_key)
                continue
            plot = PHPlot(
                source_key=source_key, plot_id=plot_id, crop=crop, year=int(year),
                title=(pj.get("title") or "").strip(),
                city=(pj.get("city") or "").strip() or None,
                state=(pj.get("state") or "").strip() or None,
                county=(pj.get("county") or "").strip() or None,
                latitude=pj.get("latitude"), longitude=pj.get("longitude"),
                pdf_url=pdf_url)
            try:
                parse_pdf(http, plot)
            except Exception as exc:  # PDF parse is best-effort
                counts["failed"] += 1
                log.error("[%d] %s PDF parse failed: %s", processed, source_key, exc)
                continue
            has_text_table = (
                len(plot.verbatim_text) >= 300
                and len(re.findall(r"\d", plot.verbatim_text)) >= 30)
            if not plot.results and not has_text_table:
                # No structured rows AND no real text layer with numbers →
                # image-only / unparseable PDF. Skip, but count it (no
                # silent cap). (Column headers vary — e.g. "Bu/Acre" vs
                # "Yield" — so we gate on digit density, not a keyword.)
                counts["image_skip"] += 1
                log.warning("[%d] %s — no rows + no data text (image PDF?); skipping",
                            processed, source_key)
                continue
            write_plot(plot, render_markdown(plot))
            if plot.results:
                counts["written"] += 1
                log.info("[%d] %s written | %s %s, %s | %d results",
                         processed, source_key, plot.crop, plot.state, plot.year,
                         len(plot.results))
            else:
                counts["verbatim"] += 1
                log.info("[%d] %s written VERBATIM (foreign-format) | %s %s, %s | %d chars",
                         processed, source_key, plot.crop, plot.state, plot.year,
                         len(plot.verbatim_text))
        if limit is not None and processed >= limit:
            break

    log.info("done: processed=%d written(structured)=%d written(verbatim)=%d "
             "skipped=%d image_skip=%d failed=%d",
             processed, counts["written"], counts["verbatim"],
             counts["skipped"], counts["image_skip"], counts["failed"])
    return 0
|
||
|
||
|
||
# --------------------------------------------------------------------- CLI
|
||
|
||
|
||
def _build_argparser() -> argparse.ArgumentParser:
    """Create the command-line parser for the scraper."""
    parser = argparse.ArgumentParser(
        prog="scrape.sources.proharvest_plots",
        description="Scrape ProHarvest Seeds plot reports (cross-vendor yield "
                    "trials) via the proharvest/v1/plots API + harvest-report PDFs.",
    )
    parser.add_argument(
        "--year",
        type=int,
        default=None,
        help="Scrape a single year (default: 2024+2025 baseline).",
    )
    parser.add_argument(
        "--include-old",
        action="store_true",
        help="Also scrape 2015–2023 (deferred by default).",
    )
    parser.add_argument(
        "--limit",
        type=int,
        default=None,
        help="Stop after processing N plots (default: all).",
    )
    parser.add_argument(
        "--force",
        action="store_true",
        help="Re-fetch even if the markdown file already exists.",
    )
    parser.add_argument(
        "--plot",
        default=None,
        help="Process a single plot by source_key or plot id.",
    )
    # The LOG_LEVEL environment variable supplies the default level.
    parser.add_argument("--log-level", default=os.environ.get("LOG_LEVEL", "INFO"))
    return parser
|
||
|
||
|
||
def main(argv: list[str] | None = None) -> int:
    """CLI entry point: parse arguments, set up logging, run the scrape.

    ``--year`` takes precedence over ``--include-old``. With neither, only
    the baseline years are scraped. Returns the exit code from ``run``.
    """
    args = _build_argparser().parse_args(argv)
    logging.basicConfig(
        level=args.log_level.upper(),
        format="%(asctime)s %(levelname)s %(name)s %(message)s",
        stream=sys.stderr,
    )

    if args.year is not None:
        selected_years = [args.year]
    else:
        selected_years = (
            OLD_YEARS + BASELINE_YEARS if args.include_old else BASELINE_YEARS
        )

    return run(
        years=selected_years,
        limit=args.limit,
        force=args.force,
        only_plot=args.plot,
    )
|
||
|
||
|
||
if __name__ == "__main__":
    # Use main()'s return value as the process exit status.
    raise SystemExit(main())
|