Add ProHarvest Seeds: 119 varieties + 161 cross-vendor plot reports (#16)
Image rebuild (skip scrape) / build (push) Successful in 5m46s

Co-authored-by: claude <claude@jpaul.io>
Co-committed-by: claude <claude@jpaul.io>
This commit was merged in pull request #16.
This commit is contained in:
2026-06-04 21:05:30 -04:00
committed by Claude (agent)
parent e356633d4f
commit 22e8092faf
567 changed files with 80023 additions and 8 deletions
+546
View File
@@ -0,0 +1,546 @@
"""ProHarvest Seeds scraper — independent regional brand (Hindsboro, IL).
Source: ``proharvestseeds.com`` — WordPress site exposing a public,
no-auth REST API. robots.txt is permissive (only ``/?s=``, ``/search/``,
``/dealer-files/*``, ``/dealer-section/*`` disallowed; the catalog +
``/wp-json/`` are open). Independent family-owned seed company; corn /
soybeans / wheat (plus forage / cover-crop lines that are out of scope
for the row-crop advisor).
Two-step ingestion:
1. **Enumerate** via the WP REST API. ``/wp/v2/seed`` is the variety
custom-post-type; ``/wp/v2/seed-type`` is the crop taxonomy. We pull
every variety whose seed-type is one of the row-crop terms
(corn-hybrid / soybean / wheat) — ignoring alfalfa / forage / grass /
cover-crop / sweet-corn terms. The REST payload gives the canonical
id / slug / title / permalink, but ``acf`` and ``content`` are NOT
registered to REST (both come back empty), so the ratings have to
come from the detail page.
2. **Parse the detail page.** Each ``/seed/<slug>/`` page server-renders
the agronomic data as ``<h2>`` spec sections, each a flat run of
``<strong>label</strong><div>value</div>`` pairs (General
Characteristics / Agronomic Features / Disease Tolerance / Soil
Adaptability / Nitrogen Application/Timing / Recommended Seeding
Rates). The relative maturity sits in an ``<h1>Maturity: 111
Days</h1>`` heading.
Rating scales are **mixed** and preserved verbatim (the chunker never
fabricates a value):
- Disease Tolerance: **1-9 numeric** (9 = best / most tolerant, per
industry norm; ``NA`` = not rated). Direction is the same as
Bayer/NK so no flip is needed.
- General Characteristics / Agronomic Features: **qualitative**
(Excellent / Very Good / Good / Average / …) with a few raw numerics
(GDD, Kernel Rows).
- Soil Adaptability: ``HR`` (highly recommended) / ``R`` (recommended)
/ etc.
Unlike the Ebbert's scraper (which left ``characteristics_groups`` empty
and relied on a verbatim body), we parse the spec sections into
structured ``characteristics_groups`` so the qualitative + numeric
ratings land in the embedded chunk and are actually retrievable.
Output:
corpus/proharvest/<source_key>.md
corpus/proharvest/<source_key>.json
source_key: ``proharvest-<slug>`` lowercased, e.g. ``proharvest-81p11``.
CLI:
python -m scrape.sources.proharvest --crop corn --limit 5
python -m scrape.sources.proharvest --force
python -m scrape.sources.proharvest --product proharvest-81p11
"""
from __future__ import annotations
import argparse
import json
import logging
import os
import random
import re
import sys
import time
from dataclasses import dataclass, field
from datetime import datetime, timezone
from pathlib import Path
from typing import Any
import requests
from bs4 import BeautifulSoup, NavigableString, Tag
# Written to every sidecar as "scraper_version".
SCRAPER_VERSION = "0.1.0"
# Sent as the User-Agent header on every request made by RateLimitedSession.
USER_AGENT = "seed-mcp-scraper/0.1 (+https://drawbar.example/contact)"
BASE = "https://proharvestseeds.com"
# Root of the WordPress REST endpoints used for discovery.
WP = f"{BASE}/wp-json/wp/v2"
# seed-type taxonomy slug -> chunker crop value. The chunker keys on
# "soybeans" (plural) for the MG branch, so map accordingly. Everything
# not listed here (alfalfa / forage / grass / cover-crop / sweet-corn /
# blends) is out of scope for the row-crop advisor.
CROP_TYPES = {
    "corn-hybrid": "corn",
    "soybean": "soybeans",
    "wheat": "wheat",
}
# robots.txt declares no Crawl-delay for "*", but we stay polite — the
# row-crop catalog is only ~120 detail pages.
# Default minimum spacing (seconds) between requests.
REQ_INTERVAL_SEC = 1.5
# Human-readable legend for the mixed rating scales; emitted in the
# markdown header ("Rating scale") and the sidecar ("_scale_direction").
RATING_SCALE_DIRECTION = (
    "disease 1-9, 9=best/most-tolerant, NA=not rated; "
    "agronomic/general qualitative (Excellent/Very Good/Good/Average); "
    "soil HR=highly recommended/R=recommended"
)
# Detail-page <h2> spec sections we extract, in display order. The
# value maps the page header to a characteristics_groups label the
# chunker buckets: DISEASE RATINGS -> disease framing, AGRONOMIC
# CHARACTERISTICS -> agronomic framing; the rest pass through verbatim
# as their own titled section (still embedded + retrievable).
SPEC_SECTIONS = {
    "General Characteristics": "GENERAL CHARACTERISTICS",
    "Agronomic Features": "AGRONOMIC CHARACTERISTICS",
    "Disease Tolerance": "DISEASE RATINGS",
    "Soil Adaptability": "SOIL ADAPTABILITY",
    "Nitrogen Application/Timing": "NITROGEN APPLICATION/TIMING",
    "Recommended Seeding Rates": "RECOMMENDED SEEDING RATES",
}
REPO_ROOT = Path(__file__).resolve().parents[2]
# The CORPUS_ROOT environment variable overrides the default <repo>/corpus.
CORPUS_ROOT = Path(os.environ.get("CORPUS_ROOT") or REPO_ROOT / "corpus")
# Output directory for this source's .md / .json pairs.
CORPUS_DIR = CORPUS_ROOT / "proharvest"
log = logging.getLogger("scrape.proharvest")
# --------------------------------------------------------------------- HTTP
class RateLimitedSession:
    """Polite ``requests`` session: spaced-out calls plus retry with backoff.

    ProHarvest's row-crop catalog is small (~120 detail pages) so
    1.5 s/req still finishes in a few min.
    """

    # Ceiling on a server-supplied Retry-After, so a single oversized
    # header value cannot stall the whole scrape.
    MAX_RETRY_AFTER_SEC = 120.0

    def __init__(self, interval: float = REQ_INTERVAL_SEC) -> None:
        self.s = requests.Session()
        self.s.headers["User-Agent"] = USER_AGENT
        self.interval = interval
        self._last = 0.0

    def _wait(self) -> None:
        """Sleep until at least ``interval`` seconds separate two requests."""
        delta = time.monotonic() - self._last
        if delta < self.interval:
            time.sleep(self.interval - delta)
        self._last = time.monotonic()

    def request(self, method: str, url: str, *, max_retries: int = 4,
                timeout: float = 30.0, **kw: Any) -> requests.Response:
        """Issue a request, retrying on network errors, HTTP 429 and 5xx.

        Returns the first response that is not 429/5xx. When every attempt
        is used up, the outcome of the *last* attempt decides: its
        response is returned (callers use ``raise_for_status``), or its
        network exception is re-raised. ``max_retries`` below 1 is treated
        as a single attempt.
        """
        attempts = max(1, max_retries)
        last_exc: Exception | None = None
        last_resp: requests.Response | None = None
        for attempt in range(attempts):
            final = attempt == attempts - 1
            self._wait()
            try:
                resp = self.s.request(method, url, timeout=timeout, **kw)
            except requests.RequestException as exc:
                last_exc, last_resp = exc, None
                if final:
                    # Nothing left to retry: don't sleep, just report.
                    log.warning("network error on %s %s: %s — giving up after %d attempts",
                                method, url, exc, attempts)
                    break
                backoff = min(30.0, (2 ** attempt) + random.random())
                log.warning("network error on %s %s: %s — retry in %.1fs",
                            method, url, exc, backoff)
                time.sleep(backoff)
                continue
            if resp.status_code != 429 and not 500 <= resp.status_code < 600:
                return resp
            last_exc, last_resp = None, resp
            if final:
                log.warning("HTTP %d on %s %s — giving up after %d attempts",
                            resp.status_code, method, url, attempts)
                break
            ra = resp.headers.get("Retry-After")
            if ra and ra.isdigit():
                # Honor the server's hint, but bounded.
                backoff = min(float(ra), self.MAX_RETRY_AFTER_SEC)
            else:
                backoff = min(30.0, (2 ** attempt) + random.random())
            log.warning("HTTP %d on %s %s — retry in %.1fs",
                        resp.status_code, method, url, backoff)
            time.sleep(backoff)
        if last_exc is not None:
            raise last_exc
        return last_resp  # type: ignore[return-value]

    def get(self, url: str, **kw: Any) -> requests.Response:
        """Shorthand for ``request("GET", url, ...)``."""
        return self.request("GET", url, **kw)

    def get_json(self, url: str, **kw: Any) -> Any:
        """GET *url*, raise on an HTTP error status, and decode the JSON body."""
        r = self.get(url, **kw)
        r.raise_for_status()
        return r.json()
# --------------------------------------------------------------------- model
@dataclass
class PHVariety:
    """One ProHarvest variety: REST identity plus data parsed from its page.

    Built by ``parse_detail``; consumed by ``render_markdown`` and
    ``write_variety``.
    """

    # "proharvest-<slug>" lowercased; also the stem of the output files.
    source_key: str
    # Detail-page URL the record was parsed from.
    source_url: str
    crop: str  # chunker value: corn / soybeans / wheat
    product_name: str = ""  # "81P11"
    relative_maturity: int | None = None  # corn (days)
    maturity_group: float | None = None  # soy
    wheat_maturity: str | None = None  # wheat qualitative
    # Trait names resolved from the record's "seed-trait" term ids.
    trait_stack: list[str] = field(default_factory=list)
    # Best-effort positioning blurb; None when no suitable paragraph exists.
    positioning: str | None = None
    # [{label, items:[{characteristic, value}]}] — chunker source of truth
    groups: list[dict] = field(default_factory=list)
# --------------------------------------------------------------------- discovery (REST)
def _taxonomy_map(http: RateLimitedSession, taxonomy: str) -> dict[int, str]:
    """Map term id -> display name for one WP taxonomy, following pagination.

    The name falls back to the slug, then to the stringified id. Paging
    stops on an HTTP 400 (requested past the last page), an empty page,
    or a page shorter than the 100-item page size.
    """
    names: dict[int, str] = {}
    page_no = 0
    while True:
        page_no += 1
        resp = http.get(
            f"{WP}/{taxonomy}?per_page=100&page={page_no}&_fields=id,name,slug")
        if resp.status_code == 400:  # past last page
            return names
        resp.raise_for_status()
        batch = resp.json()
        if not batch:
            return names
        names.update({
            term["id"]: term.get("name") or term.get("slug") or str(term["id"])
            for term in batch
        })
        if len(batch) < 100:
            return names
def _type_slug_to_id(http: RateLimitedSession) -> dict[str, int]:
    """Return slug -> term id for the first 100 ``seed-type`` terms."""
    terms = http.get_json(f"{WP}/seed-type?per_page=100&_fields=id,slug")
    return {term["slug"]: term["id"] for term in terms}
def discover(http: RateLimitedSession, *, only_crop: str | None) -> list[dict]:
    """Return REST seed records for the in-scope row crops.

    Every returned record gets a ``"_crop"`` key holding the chunker crop
    value. When ``only_crop`` is given, only that crop's seed-type is
    queried.
    """
    slug_ids = _type_slug_to_id(http)
    found: list[dict] = []
    wanted = [(slug, crop) for slug, crop in CROP_TYPES.items()
              if not only_crop or crop == only_crop]
    for type_slug, crop in wanted:
        term_id = slug_ids.get(type_slug)
        if term_id is None:
            log.warning("seed-type %r not found in taxonomy — skipping", type_slug)
            continue
        page_no = 1
        while True:
            resp = http.get(
                f"{WP}/seed?seed-type={term_id}&per_page=100&page={page_no}"
                "&_fields=id,slug,title,link,seed-trait")
            if resp.status_code == 400:  # asked for a page past the end
                break
            resp.raise_for_status()
            batch = resp.json()
            if not batch:
                break
            for seed in batch:
                seed["_crop"] = crop
                found.append(seed)
            if len(batch) < 100:  # short page == last page
                break
            page_no += 1
        log.info("seed-type %-12s (%s): cumulative %d", type_slug, crop, len(found))
    return found
# --------------------------------------------------------------------- detail parse
_MATURITY_RE = re.compile(r"([0-9]+(?:\.[0-9]+)?)")
def _clean(s: str) -> str:
return re.sub(r"\s+", " ", s or "").strip()
def _direct_text(el: Tag) -> str:
    """Return the cleaned text of *el*'s own string children only.

    Text inside nested tags is ignored.
    """
    own_strings = [child for child in el.children
                   if isinstance(child, NavigableString)]
    return _clean("".join(own_strings))
def _parse_maturity(soup: BeautifulSoup, crop: str) -> tuple[int | None, float | None, str | None]:
    """Read the first heading starting with "Maturity" and interpret it by crop.

    Returns ``(rm, mg, wheat_maturity)`` with at most one slot filled:
    corn -> the first number truncated to int, soybeans -> the first
    number as float, any other crop -> the text after "Maturity:".
    All three are None when no such heading exists.
    """
    heading_texts = (h.get_text(" ", strip=True)
                     for h in soup.find_all(["h1", "h2", "h3"]))
    heading = next(
        (txt for txt in heading_texts if re.match(r"^Maturity\b", txt, re.I)),
        None)
    if not heading:
        return None, None, None

    number = _MATURITY_RE.search(heading)
    if crop == "corn":
        rm = int(float(number.group(1))) if number else None
        return rm, None, None
    if crop == "soybeans":
        mg = float(number.group(1)) if number else None
        return None, mg, None

    # wheat — keep the qualitative phrase after "Maturity:"
    _, colon, tail = heading.partition(":")
    phrase = tail.strip() if colon else heading
    return None, None, (phrase or None)
def _parse_groups(soup: BeautifulSoup) -> list[dict]:
    """Turn each known spec <h2> section into a characteristics group.

    A section is the flat run of <strong>label</strong><div>value</div>
    elements between its <h2> and the next <h2>. Returns a list of
    ``{"label": ..., "items": [{"characteristic": ..., "value": ...}]}``;
    sections without any label are omitted.
    """
    groups: list[dict] = []
    for heading in soup.find_all("h2"):
        label = SPEC_SECTIONS.get(_clean(heading.get_text(" ", strip=True)))
        if not label:
            continue

        # Flatten the section into ("k", label) / ("v", value) entries.
        seq: list[tuple[str, str]] = []
        for node in heading.find_all_next():
            if node.name == "h2":
                break
            if not isinstance(node, Tag):
                continue
            if node.name == "strong":
                kind, text = "k", _clean(node.get_text(" ", strip=True))
            elif node.name == "div":
                # Direct text only, so wrapper divs don't repeat their children.
                kind, text = "v", _direct_text(node)
            else:
                continue
            if text:
                seq.append((kind, text))

        # Pair every key with the value directly after it. A key with no
        # following value (or two keys in a row) is kept with an empty
        # string value so the label itself never silently drops.
        items: list[dict] = []
        for pos, (kind, text) in enumerate(seq):
            if kind != "k":
                continue
            following = seq[pos + 1] if pos + 1 < len(seq) else None
            value = following[1] if following is not None and following[0] == "v" else ""
            items.append({"characteristic": text, "value": value})

        if items:
            groups.append({"label": label, "items": items})
    return groups
def _parse_positioning(soup: BeautifulSoup) -> str | None:
    """Best-effort marketing/positioning blurb.

    Returns the first <p> of at least 40 cleaned characters found after
    the page's first <h1> and before the next <h2>, else None.
    """
    title = soup.find("h1")
    if not title:
        return None
    for node in title.find_all_next():
        if node.name == "h2":
            return None
        if not (isinstance(node, Tag) and node.name == "p"):
            continue
        blurb = _clean(node.get_text(" ", strip=True))
        if len(blurb) >= 40:
            return blurb
    return None
def parse_detail(http: RateLimitedSession, rec: dict,
                 trait_names: dict[int, str]) -> PHVariety:
    """Fetch one variety's detail page and assemble a ``PHVariety``.

    *rec* is a REST seed record carrying ``"_crop"`` (added by
    ``discover``). *trait_names* maps seed-trait term ids to names; ids
    missing from it are dropped. Raises ``requests.HTTPError`` when the
    page returns an error status.
    """
    crop = rec["_crop"]
    slug = rec["slug"]
    page_url = rec.get("link") or f"{BASE}/seed/{slug}/"
    rendered_title = (rec.get("title") or {}).get("rendered", "")
    display_name = _clean(rendered_title) or slug.upper()

    resp = http.get(page_url)
    resp.raise_for_status()
    soup = BeautifulSoup(resp.text, "html.parser")

    rm, mg, wheat_mat = _parse_maturity(soup, crop)
    spec_groups = _parse_groups(soup)
    blurb = _parse_positioning(soup)
    trait_ids = rec.get("seed-trait") or []
    traits = [trait_names[tid] for tid in trait_ids if tid in trait_names]

    return PHVariety(
        source_key=f"proharvest-{slug.lower()}",
        source_url=page_url,
        crop=crop,
        product_name=display_name,
        relative_maturity=rm,
        maturity_group=mg,
        wheat_maturity=wheat_mat,
        trait_stack=traits,
        positioning=blurb,
        groups=spec_groups,
    )
# --------------------------------------------------------------------- render
def render_markdown(v: PHVariety) -> str:
    """Render a variety as markdown.

    Layout: title, identity bullet list, optional italic positioning
    blurb, then one ``##`` section per characteristics group.
    """
    crop_names = {"corn": "Corn", "soybeans": "Soybeans", "wheat": "Wheat"}
    crop_label = crop_names.get(v.crop, v.crop.title())

    lines: list[str] = [
        f"# {v.product_name}",
        "",
        "- **Vendor:** ProHarvest Seeds (independent regional brand)",
        "- **Brand:** ProHarvest Seeds",
        f"- **Crop:** {crop_label}",
    ]

    # At most one maturity bullet, chosen by crop.
    if v.crop == "corn" and v.relative_maturity is not None:
        lines.append(f"- **Relative maturity:** {v.relative_maturity} days")
    elif v.crop == "soybeans" and v.maturity_group is not None:
        lines.append(f"- **Maturity group:** {v.maturity_group}")
    elif v.crop == "wheat" and v.wheat_maturity:
        lines.append(f"- **Maturity:** {v.wheat_maturity}")

    if v.trait_stack:
        lines.append(f"- **Trait(s):** {', '.join(v.trait_stack)}")
    lines.extend([
        f"- **Source:** {v.source_url}",
        f"- **Rating scale:** {RATING_SCALE_DIRECTION}",
        "- **Service area:** Independent dealer network — Eastern/Central Corn Belt (IL/IN/OH/MO/IA/KS/NE)",
        "",
    ])
    if v.positioning:
        lines.extend(["---", "", f"_{v.positioning}_", ""])
    lines.extend(["---", ""])

    for group in v.groups:
        lines.extend([f"## {group['label'].title()}", ""])
        lines.extend(
            f"- **{item['characteristic']}:** {item['value'] or ''}"
            for item in group["items"])
        lines.append("")
    return "\n".join(lines)
def write_variety(v: PHVariety, body_md: str) -> None:
    """Write ``<source_key>.md`` and its ``<source_key>.json`` sidecar.

    Both files go to ``CORPUS_DIR``, which is created when missing.
    """
    CORPUS_DIR.mkdir(parents=True, exist_ok=True)
    md_path = CORPUS_DIR / f"{v.source_key}.md"
    json_path = CORPUS_DIR / f"{v.source_key}.json"
    md_path.write_text(body_md, encoding="utf-8")

    # Wheat maturity is qualitative; surface it through relative_maturity
    # (as a string) so the chunker's wheat "Maturity {rm}" branch renders it.
    if v.crop == "wheat" and v.wheat_maturity:
        relative_maturity = v.wheat_maturity
    else:
        relative_maturity = v.relative_maturity

    dealer_network = {
        "product_list_name": "ProHarvest dealer network (Eastern/Central Corn Belt — IL/IN/OH/MO/IA/KS/NE)",
        "agronomist": None,
        "agronomist_email": None,
        "variant_id": None,
    }
    sidecar = {
        "source": "proharvest",
        "source_key": v.source_key,
        "vendor": "ProHarvest Seeds",
        "brand": "ProHarvest Seeds",
        "product_name": v.product_name,
        "product_id": v.product_name,
        "crop": v.crop,
        "release_year": None,
        "relative_maturity": relative_maturity,
        "maturity_group": v.maturity_group,
        "wheat_class": None,
        "trait_stack": v.trait_stack,
        "trait_descriptions": [],
        "positioning_statement": v.positioning,
        "strengths": [],
        "characteristics_groups": v.groups,
        "_scale_direction": RATING_SCALE_DIRECTION,
        "regional_recommendations": [dealer_network],
        "image_url": None,
        "source_urls": [v.source_url],
        "sitemap_last_modified": None,
        "fetched_at": datetime.now(timezone.utc).isoformat(),
        "scraper_version": SCRAPER_VERSION,
    }
    payload = json.dumps(sidecar, indent=2, ensure_ascii=False) + "\n"
    json_path.write_text(payload, encoding="utf-8")
# --------------------------------------------------------------------- pipeline
def run(*, limit: int | None, force: bool,
        only_crop: str | None, only_product: str | None) -> int:
    """Discover varieties, scrape each detail page, and write the corpus files.

    Args:
        limit: stop after this many records have been visited (records
            skipped because their markdown already exists count too).
        force: re-fetch even when the markdown file already exists.
        only_crop: restrict discovery to one chunker crop value.
        only_product: restrict to one variety, by source_key or slug.

    Returns:
        0 on completion, 2 when ``only_product`` matched nothing. A
        failure on a single detail page is logged and counted, not fatal.
    """
    CORPUS_DIR.mkdir(parents=True, exist_ok=True)
    http = RateLimitedSession()
    trait_names = _taxonomy_map(http, "seed-trait")
    records = discover(http, only_crop=only_crop)
    if only_product:
        key = only_product.lower()
        records = [r for r in records
                   if f"proharvest-{r['slug'].lower()}" == key
                   or r["slug"].lower() == key]
        if not records:
            log.error("no variety matched --product=%s", only_product)
            return 2

    counts = {"written": 0, "skipped": 0, "empty": 0, "failed": 0}
    total = len(records)
    processed = 0
    for rec in records:
        if limit is not None and processed >= limit:
            break
        processed += 1
        source_key = f"proharvest-{rec['slug'].lower()}"
        md_path = CORPUS_DIR / f"{source_key}.md"
        if md_path.exists() and not force:
            counts["skipped"] += 1
            log.info("[%d/%d] %s skipped", processed, total, source_key)
            continue
        try:
            v = parse_detail(http, rec, trait_names)
        except requests.RequestException as exc:
            # RequestException also covers the connection errors/timeouts
            # the session re-raises once its retries are spent; catching
            # only HTTPError let one flaky page abort the entire run.
            counts["failed"] += 1
            log.error("[%d/%d] %s detail fetch failed: %s",
                      processed, total, source_key, exc)
            continue
        if not v.groups:
            counts["empty"] += 1
            log.warning("[%d/%d] %s — no spec groups parsed (still writing identity)",
                        processed, total, source_key)
        write_variety(v, render_markdown(v))
        counts["written"] += 1
        # Pick the first maturity that is set; "is not None" keeps a
        # legitimate 0 / 0.0 from being reported as missing.
        maturity = next(
            (m for m in (v.relative_maturity, v.maturity_group, v.wheat_maturity)
             if m is not None),
            "-")
        log.info("[%d/%d] %s written | crop=%s rm/mg=%s groups=%d traits=%s",
                 processed, total, source_key, v.crop, maturity,
                 len(v.groups), ",".join(v.trait_stack) or "-")
    log.info("done: processed=%d written=%d skipped=%d empty_groups=%d failed=%d (of %d)",
             processed, counts["written"], counts["skipped"], counts["empty"],
             counts["failed"], total)
    return 0
# --------------------------------------------------------------------- CLI
def _build_argparser() -> argparse.ArgumentParser:
    """Build the command-line parser for this scraper."""
    crop_choices = sorted(set(CROP_TYPES.values()))
    default_level = os.environ.get("LOG_LEVEL", "INFO")

    parser = argparse.ArgumentParser(
        prog="scrape.sources.proharvest",
        description="Scrape ProHarvest Seeds (independent Corn Belt brand) — "
                    "corn / soybeans / wheat via the WP REST API + detail pages.",
    )
    parser.add_argument(
        "--limit", type=int, default=None,
        help="Stop after processing N varieties (default: all).")
    parser.add_argument(
        "--force", action="store_true",
        help="Re-fetch even if the markdown file already exists.")
    parser.add_argument(
        "--crop", default=None, choices=crop_choices,
        help="Limit to one crop (corn / soybeans / wheat).")
    parser.add_argument(
        "--product", default=None,
        help="Process a single variety by source_key or slug.")
    parser.add_argument("--log-level", default=default_level)
    return parser
def main(argv: list[str] | None = None) -> int:
    """CLI entry point: parse *argv*, configure logging, run the scraper.

    Returns the exit code produced by ``run``.
    """
    opts = _build_argparser().parse_args(argv)
    logging.basicConfig(
        level=opts.log_level.upper(),
        format="%(asctime)s %(levelname)s %(name)s %(message)s",
        stream=sys.stderr,
    )
    exit_code = run(
        limit=opts.limit,
        force=opts.force,
        only_crop=opts.crop,
        only_product=opts.product,
    )
    return exit_code


if __name__ == "__main__":
    sys.exit(main())
+700
View File
@@ -0,0 +1,700 @@
"""ProHarvest Seeds plot reports — cross-vendor yield trials (data_type=trial).
Source: ``proharvestseeds.com`` exposes a public, no-auth custom REST
endpoint that the site's plot map calls:
GET /wp-json/proharvest/v1/plots?y=<year>
It returns one object per plot for that harvest year with
``{id, title, city, state, county, year, latitude, longitude, file,
product}`` — where ``file`` is the harvest-report **PDF** and ``product``
is ``Corn`` / ``Soybean``. ``/wp-json/proharvest/v1/latest-plot-year``
returns the newest year (currently 2025). Years span 2015–2025.
The API gives clean location metadata; the PDF carries the plot
management block + the head-to-head results table:
Entry | Brand | Hybrid/Variety | Seed Trtmt. | % H2O | Test Wt. |
Yield/Ac. | +/- Ave | Yield Rank
Plot types (Focus / Strip / Third Party / Other) include ProHarvest-only
strip trials AND third-party cross-vendor comparisons, so a single report
can rank ProHarvest hybrids against DEKALB / Pioneer / etc. — the same
value class as the Golden Harvest / LG / AgriGold plot reports already in
the corpus.
We emit the **same sidecar shape** as ``agrigold_plot_reports`` /
``lg_plot_reports`` / ``gh_plot_reports`` (``results: [{rank, brand,
product, traits, metrics}]``) so the trial chunker's shared
``_render_gh_plot_chunk`` renderer handles it — ``proharvest_plots`` is
added to that renderer's source list in ``rag/chunk.py``.
Scope: 2024 + 2025 baseline (most recent = most relevant for current
decisions). Older years (2015–2023) deferred behind ``--include-old``,
mirroring how the other trial sources staged 2023.
Output:
corpus/proharvest_plots/<source_key>.md
corpus/proharvest_plots/<source_key>.json
source_key: ``phpr-<crop>-<year>-<plot_id>``, e.g. ``phpr-corn-2025-1234``.
CLI:
python -m scrape.sources.proharvest_plots --year 2025 --limit 3
python -m scrape.sources.proharvest_plots --force
python -m scrape.sources.proharvest_plots --include-old --force
"""
from __future__ import annotations
import argparse
import io
import json
import logging
import os
import random
import re
import sys
import time
from dataclasses import dataclass, field
from datetime import datetime, timezone
from pathlib import Path
from typing import Any
import pdfplumber
import requests
SCRAPER_VERSION = "0.1.0"
# Sent as the User-Agent header on every request made by RateLimitedSession.
USER_AGENT = "seed-mcp-scraper/0.1 (+https://drawbar.example/contact)"
BASE = "https://proharvestseeds.com"
# Per-year plot listing; fetch_plots_for_year queries it as "?y=<year>".
PLOTS_API = f"{BASE}/wp-json/proharvest/v1/plots"
# Per the module docstring this returns the newest harvest year.
# NOTE(review): no caller is visible in this part of the file.
LATEST_YEAR_API = f"{BASE}/wp-json/proharvest/v1/latest-plot-year"
# Years in the baseline scope vs. the older years the module docstring
# describes as deferred behind --include-old.
BASELINE_YEARS = [2024, 2025]
OLD_YEARS = [2015, 2016, 2017, 2018, 2019, 2020, 2021, 2022, 2023]
# Presumably lower-cased API "product" value -> chunker crop value;
# the lookup site is outside the visible part of the file — verify.
PRODUCT_TO_CROP = {"corn": "corn", "soybean": "soybeans", "soybeans": "soybeans"}
# Default minimum spacing (seconds) between requests.
REQ_INTERVAL_SEC = 1.5
REPO_ROOT = Path(__file__).resolve().parents[2]
# The CORPUS_ROOT environment variable overrides the default <repo>/corpus.
CORPUS_ROOT = Path(os.environ.get("CORPUS_ROOT") or REPO_ROOT / "corpus")
CORPUS_DIR = CORPUS_ROOT / "proharvest_plots"
log = logging.getLogger("scrape.proharvest_plots")
# --------------------------------------------------------------------- HTTP
class RateLimitedSession:
    """Polite ``requests`` session: spaced-out calls plus retry with backoff."""

    # Ceiling on a server-supplied Retry-After, so a single oversized
    # header value cannot stall the whole scrape.
    MAX_RETRY_AFTER_SEC = 120.0

    def __init__(self, interval: float = REQ_INTERVAL_SEC) -> None:
        self.s = requests.Session()
        self.s.headers["User-Agent"] = USER_AGENT
        self.interval = interval
        self._last = 0.0

    def _wait(self) -> None:
        """Sleep until at least ``interval`` seconds separate two requests."""
        delta = time.monotonic() - self._last
        if delta < self.interval:
            time.sleep(self.interval - delta)
        self._last = time.monotonic()

    def request(self, method: str, url: str, *, max_retries: int = 4,
                timeout: float = 45.0, **kw: Any) -> requests.Response:
        """Issue a request, retrying on network errors, HTTP 429 and 5xx.

        Returns the first response that is not 429/5xx. When every attempt
        is used up, the outcome of the *last* attempt decides: its
        response is returned (callers use ``raise_for_status``), or its
        network exception is re-raised. ``max_retries`` below 1 is treated
        as a single attempt.
        """
        attempts = max(1, max_retries)
        last_exc: Exception | None = None
        last_resp: requests.Response | None = None
        for attempt in range(attempts):
            final = attempt == attempts - 1
            self._wait()
            try:
                resp = self.s.request(method, url, timeout=timeout, **kw)
            except requests.RequestException as exc:
                last_exc, last_resp = exc, None
                if final:
                    # Nothing left to retry: don't sleep, just report.
                    log.warning("network error on %s %s: %s — giving up after %d attempts",
                                method, url, exc, attempts)
                    break
                backoff = min(30.0, (2 ** attempt) + random.random())
                log.warning("network error on %s %s: %s — retry in %.1fs",
                            method, url, exc, backoff)
                time.sleep(backoff)
                continue
            if resp.status_code != 429 and not 500 <= resp.status_code < 600:
                return resp
            last_exc, last_resp = None, resp
            if final:
                log.warning("HTTP %d on %s %s — giving up after %d attempts",
                            resp.status_code, method, url, attempts)
                break
            ra = resp.headers.get("Retry-After")
            if ra and ra.isdigit():
                # Honor the server's hint, but bounded.
                backoff = min(float(ra), self.MAX_RETRY_AFTER_SEC)
            else:
                backoff = min(30.0, (2 ** attempt) + random.random())
            log.warning("HTTP %d on %s %s — retry in %.1fs",
                        resp.status_code, method, url, backoff)
            time.sleep(backoff)
        if last_exc is not None:
            raise last_exc
        return last_resp  # type: ignore[return-value]

    def get(self, url: str, **kw: Any) -> requests.Response:
        """Shorthand for ``request("GET", url, ...)``."""
        return self.request("GET", url, **kw)
# --------------------------------------------------------------------- model
@dataclass
class PHPlot:
    """One plot report: identity/location plus what was parsed from its PDF.

    ``_parse_header`` fills the management fields; ``parse_pdf`` fills
    ``results`` or, when the structured parse is rejected,
    ``verbatim_text``.
    """

    source_key: str
    plot_id: int
    crop: str
    year: int
    title: str  # cooperator / plot name
    city: str | None = None
    state: str | None = None
    county: str | None = None
    latitude: float | None = None
    longitude: float | None = None
    # URL of the harvest-report PDF that parse_pdf downloads.
    pdf_url: str = ""
    # plot management block (from the PDF header)
    company_rep: str | None = None
    planted_date: str | None = None
    harvested_date: str | None = None
    previous_crop: str | None = None
    row_width: str | None = None
    # First digit run of the "Seeding Rate" field, thousands commas removed.
    population_seeds_per_acre: int | None = None
    fungicide: str | None = None
    herbicide: str | None = None
    insecticide: str | None = None
    tillage: str | None = None
    irrigation: str | None = None
    # Result rows that passed the _row_ok sanity gate.
    results: list[dict] = field(default_factory=list)
    # Verbatim PDF text — populated only when structured row parsing
    # fails (a foreign-format third-party report), so the data is still
    # embedded + retrievable instead of dropped.
    verbatim_text: str = ""
# --------------------------------------------------------------------- enumerate
def fetch_plots_for_year(http: RateLimitedSession, year: int) -> list[dict]:
    """Return the plot objects listed for *year*.

    Any JSON payload that is not a list yields an empty list. Raises
    ``requests.HTTPError`` on an error status.
    """
    resp = http.get(f"{PLOTS_API}?y={year}")
    resp.raise_for_status()
    payload = resp.json()
    if isinstance(payload, list):
        return payload
    return []
# --------------------------------------------------------------------- PDF parse
_NUM_RE = re.compile(r"^-?\d+(?:\.\d+)?$")
def _to_num(s: str) -> float | None:
s = (s or "").strip()
if not s or not _NUM_RE.match(s):
return None
f = float(s)
return int(f) if f.is_integer() else f
# All header field labels on the plot-management lines. Used as the
# boundary for value extraction so an EMPTY field (e.g. "Tillage:"
# with nothing after it) doesn't swallow the next label as its value.
_HEADER_LABELS = [
"Company Representative", "Planted", "Harvested", "Previous Crop",
"Herbicide", "Row Width", "Seeding Rate", "Fungicide", "Fertilizer",
"Tillage", "Insecticide", "Irrigation", "General Plot Comments",
]
_LABEL_BOUNDARY = "|".join(re.escape(l) for l in _HEADER_LABELS)
def _kv(text: str, label: str) -> str | None:
"""Extract a 'Label: value' field. The value runs until the next
known label, end of line, or end of text — so an empty field returns
None instead of capturing the following label."""
m = re.search(
rf"{re.escape(label)}:\s*(.*?)\s*(?=(?:{_LABEL_BOUNDARY}):|\n|$)",
text)
if not m:
return None
v = m.group(1).strip().strip("-").strip()
# Guard: a value that is itself a known label means the field was empty.
if not v or v.rstrip(":") in _HEADER_LABELS:
return None
return v
def _parse_header(text: str, plot: PHPlot) -> None:
    """Copy the plot-management fields found in *text* onto *plot*.

    Every text field is set to its extracted value or None. The seeding
    rate is stored as an int only when the field contains a digit run.
    """
    text_fields = (
        ("company_rep", "Company Representative"),
        ("planted_date", "Planted"),
        ("harvested_date", "Harvested"),
        ("previous_crop", "Previous Crop"),
        ("row_width", "Row Width"),
        ("fungicide", "Fungicide"),
        ("herbicide", "Herbicide"),
        ("insecticide", "Insecticide"),
        ("tillage", "Tillage"),
        ("irrigation", "Irrigation"),
    )
    for attr, label in text_fields:
        setattr(plot, attr, _kv(text, label))

    rate = _kv(text, "Seeding Rate")
    if not rate:
        return
    digits = re.search(r"(\d[\d,]*)", rate)
    if digits:
        plot.population_seeds_per_acre = int(digits.group(1).replace(",", ""))
def _norm_label(s: str) -> str:
return re.sub(r"\s+", " ", (s or "").strip()).lower().rstrip(".")
# header-label -> our metric key (canonical "Yield" so the chunker's
# top-N primary-metric picker finds it).
# Keys are header cells after _norm_label (lower-cased, trailing "."
# removed). Values starting with "_" mark structural columns that
# _parse_results_from_tables handles specially; any other value is the
# name the cell is stored under in the row's "metrics" dict.
_COL_MAP = {
    "entry": "_entry",
    "brand": "_brand",
    "hybrid/variety": "_product",
    "variety": "_product",
    "hybrid": "_product",
    "seed trtmt": "_seed_trtmt",
    "% h2o": "% H2O",
    "%h2o": "% H2O",
    "moisture": "% H2O",
    "test wt": "Test Wt.",
    "test weight": "Test Wt.",
    "yield/ac": "Yield",
    "yield/acre": "Yield",
    "yield": "Yield",
    "+/- ave": "+/- Ave",
    "+/-ave": "+/- Ave",
    "yield rank": "_rank",
    "rank": "_rank",
}
def _parse_results_from_tables(pdf: pdfplumber.PDF) -> list[dict]:
    """Extract result rows from the tables pdfplumber detects.

    Walk every table on every page; once we see the header row, map
    subsequent digit-led rows by column position (None cells dropped on
    both header + row so they stay parallel).

    Returns dicts with ``brand``, ``product``, ``traits``, ``metrics``
    and, when the header has a rank column, ``rank``. Rows with a numeric
    rank come first in ascending order; the rest follow by descending
    Yield. Rows whose product cell is empty are dropped.
    """
    results: list[dict] = []
    # Column layout from the most recent header row; it is never reset,
    # so it carries over to later tables and pages.
    colmap: list[str] | None = None
    for page in pdf.pages:
        for table in page.extract_tables() or []:
            for raw in table:
                cells = [c for c in raw if c is not None]
                cells = [c.replace("\n", " ").strip() if isinstance(c, str) else c
                         for c in cells]
                if not cells:
                    continue
                labels = [_norm_label(c) for c in cells]
                # Header row: has a "brand" cell plus a hybrid/variety cell.
                if "brand" in labels and any(
                        l in ("hybrid/variety", "variety", "hybrid") for l in labels):
                    # Unknown header labels map to "" and are skipped below.
                    colmap = [_COL_MAP.get(l, "") for l in labels]
                    continue
                if colmap is None:
                    continue
                # data row: first cell must be an integer entry number
                if not cells or not re.match(r"^\d+$", str(cells[0]).strip()):
                    continue
                # Pad short rows so every mapped column gets a cell.
                if len(cells) < len(colmap):
                    cells = cells + [""] * (len(colmap) - len(cells))
                rec: dict[str, Any] = {}
                metrics: dict[str, Any] = {}
                for key, cell in zip(colmap, cells):
                    if not key:
                        continue
                    val = cell.strip() if isinstance(cell, str) else cell
                    if key == "_entry":
                        rec["_entry"] = _to_num(val)
                    elif key == "_brand":
                        rec["brand"] = _strip_check(val) or None
                    elif key == "_product":
                        rec["_raw_product"] = val or ""
                    elif key == "_rank":
                        rec["rank"] = _to_num(val)
                    elif key == "_seed_trtmt":
                        if val:
                            metrics["Seed Trtmt."] = val
                    else:
                        # Numeric cells become numbers; other text is kept
                        # verbatim and an empty cell becomes None.
                        metrics[key] = _to_num(val) if _NUM_RE.match(str(val)) else (val or None)
                # split hybrid + trait off the product cell
                raw_prod = _strip_check(rec.pop("_raw_product", "")).strip()
                parts = raw_prod.split(maxsplit=1)
                rec["product"] = parts[0] if parts else raw_prod
                rec["traits"] = parts[1] if len(parts) > 1 else None
                rec["metrics"] = metrics
                # The entry number only identifies a data row; it is not emitted.
                rec.pop("_entry", None)
                if rec.get("product"):
                    results.append(rec)

    # sort by yield rank when present, else by yield desc
    def _sortkey(r: dict) -> tuple:
        if isinstance(r.get("rank"), (int, float)):
            return (0, r["rank"])
        y = r.get("metrics", {}).get("Yield")
        return (1, -y if isinstance(y, (int, float)) else 0)

    results.sort(key=_sortkey)
    return results
_NUM_TOKEN = re.compile(r"^-?\d+(?:\.\d+)?$")
# Strip a "(check)" / "(check₁)" trial annotation from a brand/product token.
_CHECK_RE = re.compile(r"\s*\(check[^)]*\)\s*", re.I)
# Multi-word seed brands seen in ProHarvest's competitor rows. The naive
# "first token = brand" split would chop these (e.g. brand "Golden",
# product "Harvest"), so match the longest known phrase first.
KNOWN_MULTIWORD_BRANDS = [
"golden harvest", "seed consultants", "partners brand", "fs invision",
"sun prairie", "dura crop", "nu tech", "local seed", "prairie brand",
"great lakes", "viking/blueriver",
]
def _strip_check(s: str) -> str:
return _CHECK_RE.sub(" ", s or "").strip()
def _split_brand_product(tokens: list[str]) -> tuple[str, str, str | None]:
"""From the middle tokens (between entry and the trailing numerics),
pull brand / product / traits. Honors known multi-word brands."""
toks = [t for t in tokens if not _CHECK_RE.fullmatch(f"({t.strip('()')})")]
joined = " ".join(toks)
low = joined.lower()
brand_tokens = 1
for phrase in sorted(KNOWN_MULTIWORD_BRANDS, key=len, reverse=True):
if low.startswith(phrase + " "):
brand_tokens = len(phrase.split())
break
brand = _strip_check(" ".join(toks[:brand_tokens])) or (toks[0] if toks else "")
rest = toks[brand_tokens:]
product = _strip_check(rest[0]) if rest else ""
traits = " ".join(rest[1:]) or None
return brand, product, traits
def _row_ok(r: dict) -> bool:
"""A structurally-sound result row: a real (non-numeric) brand, a
product code, and a plausible Yield. Used to drop junk rows and to
decide when a whole plot's parse is too corrupt to trust."""
brand = (r.get("brand") or "").strip()
if not brand or brand.isdigit() or len(brand) <= 1:
return False
if not (r.get("product") or "").strip():
return False
y = r.get("metrics", {}).get("Yield")
if not isinstance(y, (int, float)) or not (1 < y < 400):
return False
rank = r.get("rank")
if isinstance(rank, (int, float)) and rank > 200: # a yield leaked into rank
return False
return True
def _assign_metrics(nums: list[float]) -> dict:
"""Map a row's trailing numeric run to metric columns, anchored from
the RIGHT (Yield Rank, +/- Ave, Yield/Ac. are always the last three).
Optional leading columns vary: soybean reports often drop Test Wt.,
so a row can carry 3, 4, or 5 numerics:
5 → % H2O, Test Wt., Yield, +/- Ave, Rank
4 → % H2O, Yield, +/- Ave, Rank (no Test Wt.)
3 → Yield, +/- Ave, Rank (no moisture/test wt.)
"""
n = len(nums)
rank = nums[-1]
ave = nums[-2]
yld = nums[-3]
h2o = nums[-4] if n >= 4 else None
testwt = None
if n >= 5:
testwt = nums[-4]
h2o = nums[-5]
# Emit in a readable order (Yield is the primary metric the chunker's
# top-N picker keys on).
m: dict = {"Yield": yld}
if h2o is not None:
m["% H2O"] = h2o
if testwt is not None:
m["Test Wt."] = testwt
m["+/- Ave"] = ave
return {"rank": int(rank), "metrics": m}
def _parse_results_from_text(text: str) -> list[dict]:
    """Fallback row parser for PDFs whose tables have no ruling lines.

    In such PDFs pdfplumber returns whole rows as one cell, so rows are
    read from the extracted text instead. Parsing anchors on each line's
    trailing numeric run, which is positionally stable across layouts
    and column counts. Returns rows sorted by rank.
    """
    lines = iter(text.splitlines())

    # Skip everything up to and including the table header line.
    for line in lines:
        lowered = line.lower()
        if "brand" in lowered and ("hybrid" in lowered or "variety" in lowered):
            break

    rows: list[dict] = []
    for line in lines:
        toks = line.split()
        if len(toks) < 5 or not toks[0].isdigit():
            continue

        # Find where the trailing run of numeric tokens starts.
        split_at = len(toks)
        for tok in reversed(toks):
            if not _NUM_TOKEN.match(tok):
                break
            split_at -= 1
        nums = [float(tok) for tok in toks[split_at:]]
        if len(nums) < 3:
            continue
        # rank must be a whole number; if the last token has a decimal the
        # row is malformed (wrapped) — skip it rather than guess.
        if "." in toks[-1]:
            continue
        middle = toks[1:split_at]  # brand + hybrid + optional trait/trtmt
        if len(middle) < 2:
            continue

        row = _assign_metrics(nums)
        row["brand"], row["product"], row["traits"] = _split_brand_product(middle)
        rows.append(row)

    rows.sort(key=lambda row: row["rank"])
    return rows
def parse_pdf(http: RateLimitedSession, plot: PHPlot) -> None:
    """Download the plot's PDF and fill in ``plot`` from its contents.

    On return ``plot.results`` holds the structured rows that passed the
    sanity gate; when none did, it is an empty list and
    ``plot.verbatim_text`` carries the stripped text layer instead.

    Raises whatever ``raise_for_status()`` raises on a bad HTTP response;
    errors from pdfplumber or the parsers are not caught here.
    """
    resp = http.get(plot.pdf_url)
    resp.raise_for_status()

    with pdfplumber.open(io.BytesIO(resp.content)) as pdf:
        page_texts = [(page.extract_text() or "") for page in pdf.pages]
        text = "\n".join(page_texts)
        _parse_header(text, plot)
        results = _parse_results_from_tables(pdf)
        if not results:
            # Tables had no ruling lines → parse the verbatim text rows.
            results = _parse_results_from_text(text)

    # Sanity-gate the structured parse. Off-template reports (e.g. a
    # university land-lab with extra RM / harvest-weight columns and a
    # multi-line header) parse into junk rows — numeric brands, a yield
    # leaked into the rank, empty metrics. Keep only rows that pass
    # _row_ok, and accept the parse only when at least two rows AND at
    # least 60% of the parsed rows survive.
    kept = [row for row in results if _row_ok(row)]
    if len(kept) >= max(2, 0.6 * len(results)):
        plot.results = kept
        return

    # Foreign / off-template report — discard the structured parse and keep
    # the verbatim text so the cross-vendor data isn't lost or silently
    # corrupted.
    plot.results = []
    plot.verbatim_text = text.strip()
# --------------------------------------------------------------------- render
def render_markdown(plot: PHPlot) -> str:
    """Render one plot report as the Markdown body stored in the corpus.

    Two layouts share the same title and bullet header:

    * **Verbatim** — used when ``plot.results`` is empty and
      ``plot.verbatim_text`` is not: the header is followed by the PDF text
      in a fenced block under "## Trial data (verbatim from PDF)".
    * **Structured** — otherwise: the header gains the management bullets
      that have a value, followed by a "## Results (head-to-head)" table
      with one line per entry in ``plot.results``. Missing cells are
      rendered as ``-``.

    The title's parenthetical is built from the non-empty parts only, so a
    plot with neither city nor state reads "(2024)" instead of "(, 2024)".

    Args:
        plot: The parsed plot; only its attributes are read.

    Returns:
        The Markdown document as a single string ending in a newline.
    """
    crop_label = {"corn": "Corn", "soybeans": "Soybean"}.get(plot.crop, plot.crop.title())
    loc = ", ".join(filter(None, [plot.city, plot.state]))
    # city/state can both be None; drop the empty location rather than
    # emitting a dangling ", " in front of the year.
    where_when = ", ".join(filter(None, [loc, str(plot.year)]))
    head: list[str] = [
        f"# {crop_label} yield trial — {plot.title} ({where_when})",
        "",
        "- **Publisher:** ProHarvest Seeds (cross-vendor plot report)",
        f"- **Crop:** {crop_label}",
        f"- **Year:** {plot.year}",
    ]
    # Both layouts put the location bullet directly after the year.
    if loc:
        county_suffix = f" · {plot.county} County" if plot.county else ""
        head.append(f"- **Location:** {loc}{county_suffix}")

    if not plot.results and plot.verbatim_text:
        # Foreign-format report — emit the header we did parse, then the
        # verbatim PDF text under the separator the chunker reads.
        head += [f"- **Source PDF:** {plot.pdf_url}", "", "---", "",
                 "## Trial data (verbatim from PDF)", "", "```",
                 plot.verbatim_text, "```", ""]
        return "\n".join(head)

    population = (f"{plot.population_seeds_per_acre:,} seeds/acre"
                  if plot.population_seeds_per_acre else None)
    management = [
        ("Cooperator", plot.title), ("Company rep", plot.company_rep),
        ("Planted", plot.planted_date), ("Harvested", plot.harvested_date),
        ("Previous crop", plot.previous_crop), ("Row width", plot.row_width),
        ("Population", population),
        ("Tillage", plot.tillage), ("Irrigation", plot.irrigation),
        ("Fungicide", plot.fungicide),
    ]
    # Only bullets that actually carry a value are emitted.
    head.extend(f"- **{label}:** {val}" for label, val in management if val)

    head += [f"- **Source PDF:** {plot.pdf_url}", "", "---", "",
             "## Results (head-to-head)", "",
             "| Rank | Brand | Hybrid/Variety | Trait | Yield/Ac | % H2O | Test Wt | +/- Ave |",
             "|---|---|---|---|---|---|---|---|"]
    for r in plot.results:
        m = r.get("metrics", {})
        head.append("| {rank} | {brand} | {prod} | {tr} | {y} | {h2o} | {tw} | {ave} |".format(
            rank=r.get("rank", "-"), brand=r.get("brand") or "-",
            prod=r.get("product") or "-", tr=r.get("traits") or "-",
            y=m.get("Yield", "-"), h2o=m.get("% H2O", "-"),
            tw=m.get("Test Wt.", "-"), ave=m.get("+/- Ave", "-")))
    head.append("")
    return "\n".join(head)
def write_plot(plot: PHPlot, body_md: str) -> None:
    """Write a plot's sidecar JSON and Markdown body into ``CORPUS_DIR``.

    Both files are named after ``plot.source_key`` (``.json`` / ``.md``)
    and written as UTF-8.

    The sidecar is serialised before anything touches the disk, and the
    Markdown file is written last. ``run()`` treats an existing ``.md`` as
    "already scraped", so writing it first would let a failure while
    producing the sidecar leave a plot permanently without metadata (later
    runs would skip it). With this order an interrupted write is simply
    redone on the next run.

    Args:
        plot: The parsed plot whose fields populate the sidecar.
        body_md: The Markdown document to store next to the sidecar.
    """
    CORPUS_DIR.mkdir(parents=True, exist_ok=True)
    sidecar = {
        "source": "proharvest_plots",
        "source_key": plot.source_key,
        "data_type": "trial",
        "vendor": "ProHarvest Seeds",
        "brand": "ProHarvest Seeds",
        "crop": plot.crop,
        "state": plot.state,
        "state_abbrev": (plot.state or "").lower() or None,
        "city": plot.city,
        "county": plot.county,
        "year": plot.year,
        "plot_id": plot.plot_id,
        "cooperator": plot.title,
        "latitude": plot.latitude,
        "longitude": plot.longitude,
        "company_representative": plot.company_rep,
        "planted_date": plot.planted_date,
        "harvested_date": plot.harvested_date,
        "previous_crop": plot.previous_crop,
        "row_width": plot.row_width,
        "population_seeds_per_acre": plot.population_seeds_per_acre,
        "fungicide": plot.fungicide,
        "herbicide": plot.herbicide,
        "insecticide": plot.insecticide,
        "tillage": plot.tillage,
        "irrigation": plot.irrigation,
        "results": plot.results,
        "n_results": len(plot.results),
        # True when no structured rows could be parsed and the body holds
        # the verbatim PDF text instead (foreign-format third-party report).
        "raw_text": bool(not plot.results and plot.verbatim_text),
        "source_urls": [plot.pdf_url],
        "fetched_at": datetime.now(timezone.utc).isoformat(),
        "scraper_version": SCRAPER_VERSION,
    }
    # Serialise up front: a non-JSON-able field must fail before any file
    # exists, not after the skip marker has been written.
    sidecar_json = json.dumps(sidecar, indent=2, ensure_ascii=False) + "\n"
    (CORPUS_DIR / f"{plot.source_key}.json").write_text(sidecar_json, encoding="utf-8")
    # The .md doubles as the "already scraped" marker, so it goes last.
    (CORPUS_DIR / f"{plot.source_key}.md").write_text(body_md, encoding="utf-8")
# --------------------------------------------------------------------- pipeline
def run(*, years: list[int], limit: int | None, force: bool,
        only_plot: str | None) -> int:
    """Enumerate plots year by year, parse each PDF and write corpus files.

    Every processed plot lands in exactly one bucket of the final summary
    line: written (structured), written (verbatim), skipped (already on
    disk), image_skip (no rows and no usable text), no_pdf (the API record
    has no file URL) or failed (bad plot id or PDF parse error).

    Args:
        years: Harvest years to enumerate, in processing order.
        limit: Stop once this many plots have been processed (``None`` =
            no cap). Plots that are skipped still count toward the cap.
        force: Re-scrape plots whose Markdown file already exists.
        only_plot: Restrict the run to one plot, matched against either
            the ``source_key`` or the bare plot id.

    Returns:
        Always ``0``; per-plot problems are logged and tallied rather than
        reflected in the return value.
    """
    CORPUS_DIR.mkdir(parents=True, exist_ok=True)
    http = RateLimitedSession()
    counts = {"written": 0, "verbatim": 0, "skipped": 0, "image_skip": 0,
              "no_pdf": 0, "failed": 0}
    processed = 0
    for year in years:
        try:
            plots = fetch_plots_for_year(http, year)
        except requests.HTTPError as exc:
            log.error("year %s enumeration failed: %s", year, exc)
            continue
        log.info("year %s: %d plots", year, len(plots))
        for pj in plots:
            if limit is not None and processed >= limit:
                break
            product = str(pj.get("product") or "").strip().lower()
            crop = PRODUCT_TO_CROP.get(product)
            if not crop:
                continue  # skip non-row-crop products if any appear
            pid = pj.get("id")
            source_key = f"phpr-{crop}-{year}-{pid}"
            if only_plot and source_key != only_plot and str(pid) != only_plot:
                continue
            processed += 1
            md_path = CORPUS_DIR / f"{source_key}.md"
            if md_path.exists() and not force:
                counts["skipped"] += 1
                log.info("[%d] %s skipped", processed, source_key)
                continue
            pdf_url = pj.get("file") or ""
            if not pdf_url:
                # Tally it so the summary buckets add up to `processed`.
                counts["no_pdf"] += 1
                log.warning("%s has no PDF file — skipping", source_key)
                continue
            try:
                plot_id = int(pid)
            except (TypeError, ValueError):
                # One malformed API record must not abort the whole scrape.
                counts["failed"] += 1
                log.error("[%d] %s has a non-integer plot id %r — skipping",
                          processed, source_key, pid)
                continue
            plot = PHPlot(
                source_key=source_key, plot_id=plot_id, crop=crop, year=int(year),
                title=(pj.get("title") or "").strip(),
                city=(pj.get("city") or "").strip() or None,
                state=(pj.get("state") or "").strip() or None,
                county=(pj.get("county") or "").strip() or None,
                latitude=pj.get("latitude"), longitude=pj.get("longitude"),
                pdf_url=pdf_url)
            try:
                parse_pdf(http, plot)
            except Exception as exc:  # PDF parse is best-effort
                counts["failed"] += 1
                log.error("[%d] %s PDF parse failed: %s", processed, source_key, exc)
                continue
            has_text_table = (
                len(plot.verbatim_text) >= 300
                and len(re.findall(r"\d", plot.verbatim_text)) >= 30)
            if not plot.results and not has_text_table:
                # No structured rows AND no real text layer with numbers →
                # image-only / unparseable PDF. Skip, but count it (no
                # silent cap). (Column headers vary — e.g. "Bu/Acre" vs
                # "Yield" — so we gate on digit density, not a keyword.)
                counts["image_skip"] += 1
                log.warning("[%d] %s — no rows + no data text (image PDF?); skipping",
                            processed, source_key)
                continue
            write_plot(plot, render_markdown(plot))
            if plot.results:
                counts["written"] += 1
                log.info("[%d] %s written | %s %s, %s | %d results",
                         processed, source_key, plot.crop, plot.state, plot.year,
                         len(plot.results))
            else:
                counts["verbatim"] += 1
                log.info("[%d] %s written VERBATIM (foreign-format) | %s %s, %s | %d chars",
                         processed, source_key, plot.crop, plot.state, plot.year,
                         len(plot.verbatim_text))
        # The inner break only leaves the per-year loop; stop the year
        # loop too once the cap is reached.
        if limit is not None and processed >= limit:
            break
    log.info("done: processed=%d written(structured)=%d written(verbatim)=%d "
             "skipped=%d image_skip=%d no_pdf=%d failed=%d",
             processed, counts["written"], counts["verbatim"],
             counts["skipped"], counts["image_skip"], counts["no_pdf"],
             counts["failed"])
    return 0
# --------------------------------------------------------------------- CLI
def _build_argparser() -> argparse.ArgumentParser:
p = argparse.ArgumentParser(
prog="scrape.sources.proharvest_plots",
description="Scrape ProHarvest Seeds plot reports (cross-vendor yield "
"trials) via the proharvest/v1/plots API + harvest-report PDFs.")
p.add_argument("--year", type=int, default=None,
help="Scrape a single year (default: 2024+2025 baseline).")
p.add_argument("--include-old", action="store_true",
help="Also scrape 20152023 (deferred by default).")
p.add_argument("--limit", type=int, default=None,
help="Stop after processing N plots (default: all).")
p.add_argument("--force", action="store_true",
help="Re-fetch even if the markdown file already exists.")
p.add_argument("--plot", default=None,
help="Process a single plot by source_key or plot id.")
p.add_argument("--log-level", default=os.environ.get("LOG_LEVEL", "INFO"))
return p
def main(argv: list[str] | None = None) -> int:
    """Command-line entry point: parse options, set up logging, run.

    Args:
        argv: Argument list to parse; ``None`` lets argparse fall back to
            ``sys.argv``.

    Returns:
        The value returned by ``run()``.
    """
    opts = _build_argparser().parse_args(argv)
    logging.basicConfig(
        stream=sys.stderr,
        format="%(asctime)s %(levelname)s %(name)s %(message)s",
        level=opts.log_level.upper())

    # An explicit --year wins over --include-old; with neither, only the
    # baseline years are scraped.
    if opts.year is not None:
        selected_years = [opts.year]
    else:
        selected_years = (OLD_YEARS + BASELINE_YEARS
                          if opts.include_old else BASELINE_YEARS)

    return run(years=selected_years, limit=opts.limit, force=opts.force,
               only_plot=opts.plot)
# Script entry point: when the module is executed directly, run main() and
# use its return value as the process exit status.
if __name__ == "__main__":
    sys.exit(main())