"""AgriPro trial-PDF scraper. Source: ``agriprowheat.com/trials-data`` — a single page listing ~38 PDF links to regional wheat trial summary documents. Each PDF is a multi-year multi-location performance test comparing AgriPro varieties against competitors (LCS, Norwest, PNW, UI, etc.). Discovery: walk ``/trials-data``, collect every ``href="*.pdf"``. Per-PDF content (parsed via pdfplumber): - First line: usually the title (e.g. "2024 Pacific Northwest Combined Summary, Three-Year Data") - A multi-column table with one row per variety. Columns vary by PDF but typically include: 3-yr combined yield, 2-yr combined, most-recent-year yield, plus per-location yields with location names in the header. - Footer notes: locations covered, LSD/CV statistical caveats, copyright. Trial PDFs are stable text-extractable (no charts). We capture the full per-page text verbatim in the chunk body — preserving variety-name + yield-number adjacency for the embedder — plus metadata derived from the title (region, year, crop class). This is a deliberate trade-off: perfect table parsing across the PDF variants would be brittle; verbatim text preserves every data point and the embedder + BM25 between them can match queries like "AP Iliad yield Aberdeen Idaho" reliably. Output: corpus/agripro_trials/.md corpus/agripro_trials/.json source_key convention: ``agt-`` lowercased, e.g. ``agt-2024-pnw-combined``. CLI: python -m scrape.sources.agripro_trials --limit 5 python -m scrape.sources.agripro_trials --force """ from __future__ import annotations import argparse import io import json import logging import os import random import re import sys import time from dataclasses import dataclass, field from datetime import datetime, timezone from pathlib import Path from typing import Any import requests from bs4 import BeautifulSoup import pdfplumber SCRAPER_VERSION = "0.1.0" USER_AGENT = "seed-mcp-scraper/0.1 (+https://drawbar.example/contact)" BASE = "https://agriprowheat.com" LIST_URL = f"{BASE}/trials-data" REPO_ROOT = Path(__file__).resolve().parents[2] CORPUS_ROOT = Path(os.environ.get("CORPUS_ROOT") or REPO_ROOT / "corpus") CORPUS_DIR = CORPUS_ROOT / "agripro_trials" REQ_INTERVAL_SEC = 1.0 log = logging.getLogger("scrape.agripro_trials") # Region name patterns we recognize in PDF filenames / titles. The # value is a human-readable normalized region. REGION_PATTERNS = ( (re.compile(r"\bPNW\b|Pacific Northwest", re.I), "Pacific Northwest"), (re.compile(r"\bNE Colorado\b|Northeast Colorado", re.I), "NE Colorado"), (re.compile(r"\bSC KS\b|South Central Kansas", re.I), "SC Kansas / N Central OK"), (re.compile(r"\bWestern Plains\b", re.I), "Western Plains"), (re.compile(r"\bCentral Plains\b", re.I), "Central Plains"), (re.compile(r"\bPlains Irrigated\b", re.I), "Plains Irrigated"), (re.compile(r"\bWashington[/:]?N? *Idaho\b", re.I), "WA / N. Idaho"), (re.compile(r"\bSouthern Idaho\b", re.I), "Southern Idaho"), (re.compile(r"\bMontana\b", re.I), "Montana"), (re.compile(r"\bNP Perf Data\b|Northern Plains", re.I), "Northern Plains"), (re.compile(r"\bWheat after Soy\b", re.I), "Wheat-after-Soy rotation"), ) # --------------------------------------------------------------------- HTTP class RateLimitedSession: def __init__(self, interval: float = REQ_INTERVAL_SEC) -> None: self.s = requests.Session() self.s.headers["User-Agent"] = USER_AGENT self.interval = interval self._last = 0.0 def _wait(self) -> None: delta = time.monotonic() - self._last if delta < self.interval: time.sleep(self.interval - delta) self._last = time.monotonic() def request( self, method: str, url: str, *, max_retries: int = 4, timeout: float = 60.0, **kw: Any, ) -> requests.Response: last_exc: Exception | None = None for attempt in range(max_retries): self._wait() try: resp = self.s.request(method, url, timeout=timeout, **kw) except requests.RequestException as exc: last_exc = exc backoff = min(30.0, (2 ** attempt) + random.random()) log.warning("network error on %s %s: %s — retry in %.1fs", method, url, exc, backoff) time.sleep(backoff) continue if resp.status_code == 429 or 500 <= resp.status_code < 600: ra = resp.headers.get("Retry-After") backoff = float(ra) if (ra and ra.isdigit()) else min(30.0, (2 ** attempt) + random.random()) log.warning("HTTP %d on %s %s — retry in %.1fs", resp.status_code, method, url, backoff) time.sleep(backoff) continue return resp if last_exc: raise last_exc return resp # type: ignore[return-value] def get(self, url: str, **kw: Any) -> requests.Response: return self.request("GET", url, **kw) # --------------------------------------------------------------------- model @dataclass class TrialPDF: source_key: str source_url: str pdf_url: str filename: str title: str | None = None year: int | None = None years_covered: list[int] = field(default_factory=list) region: str | None = None wheat_class_section: str | None = None # e.g. "Soft White Winter Wheat" — derived from PDF text page_text: str = "" varieties_found: list[str] = field(default_factory=list) # --------------------------------------------------------------------- discovery def discover_pdfs(http: RateLimitedSession) -> list[tuple[str, str, str, str]]: """Return ``[(pdf_url, filename, section_heading, section_anchor), ...]`` for every PDF on /trials-data. De-duplicates by pdf_url — multiple section headings may link to the same PDF (e.g. a multi-state summary). """ log.info("fetching trials index %s", LIST_URL) r = http.get(LIST_URL) r.raise_for_status() soup = BeautifulSoup(r.text, "html.parser") seen: dict[str, tuple[str, str, str, str]] = {} for a in soup.find_all("a", href=re.compile(r"\.pdf(?:$|\?)", re.I)): href = a["href"] from urllib.parse import urljoin full = urljoin(LIST_URL, href) fn = href.rsplit("/", 1)[-1] # Section context — closest preceding h2/h3/h4 section = "" parent = a.parent for _ in range(10): if parent is None: break head = parent.find_previous(["h2", "h3", "h4"]) if head: section = head.get_text(strip=True) break parent = parent.parent if full not in seen: seen[full] = (full, fn, section, href) out = list(seen.values()) log.info("trial PDFs found: %d (deduped from %d total links)", len(out), sum(1 for a in soup.find_all("a", href=re.compile(r"\.pdf", re.I)))) return out # --------------------------------------------------------------------- helpers def source_key_for(filename: str) -> str: """``2024 PNW Combined.pdf`` → ``agt-2024-pnw-combined``.""" from urllib.parse import unquote stem = unquote(filename).rsplit(".", 1)[0] slug = re.sub(r"[^a-zA-Z0-9]+", "-", stem).strip("-").lower() return f"agt-{slug}" def _detect_region(text: str) -> str | None: for pat, label in REGION_PATTERNS: if pat.search(text): return label return None def _detect_years(text: str) -> list[int]: """Return sorted years found in the PDF title / first lines. Filters to 2010-2030 to ignore page numbers / table values.""" years = sorted({ int(y) for y in re.findall(r"\b(20[1-3]\d)\b", text[:600]) }) return years def _detect_wheat_class_section(text: str) -> str | None: """The trial PDFs typically have a class label line like 'Soft White Winter Wheat' near the top of the table.""" for label in ( "Hard Red Winter Wheat", "Hard Red Spring Wheat", "Hard White Spring Wheat", "Hard White Winter Wheat", "Soft White Winter Wheat", "Soft White Spring Wheat", "Soft Red Winter Wheat", "Durum", ): if re.search(r"\b" + re.escape(label) + r"\b", text[:1500], re.I): return label return None # Variety name patterns we expect to see in AgriPro trial PDFs. # AgriPro varieties = AP , SY ; competitors include # LCS , UI , PNW , Norwest . _VARIETY_LINE_RE = re.compile( r"^(?:AP|SY|LCS|UI|PNW|Norwest|WB|Stine|Pioneer)\b[A-Za-z0-9 \-+]*", ) def _detect_varieties(text: str) -> list[str]: out: list[str] = [] seen: set[str] = set() for line in text.splitlines(): line = line.strip() if not line: continue m = _VARIETY_LINE_RE.match(line) if m: # Up to first run of digits / spaces — variety name only name_match = re.match(r"^([A-Za-z][A-Za-z0-9 \-+]*?)\s+\d", line) name = name_match.group(1).strip() if name_match else m.group(0).strip() # Trim trailing single tokens that are clearly stats if name and name not in seen and len(name) <= 40: seen.add(name) out.append(name) return out # --------------------------------------------------------------------- detail def fetch_pdf_detail( http: RateLimitedSession, pdf_url: str, filename: str, ) -> TrialPDF | None: """Download + parse one trial PDF.""" r = http.get(pdf_url) if r.status_code == 404: return None r.raise_for_status() try: with pdfplumber.open(io.BytesIO(r.content)) as pdf: pages_text = [] for p in pdf.pages: t = p.extract_text() or "" pages_text.append(t) text = "\n\n".join(pages_text).strip() except Exception as exc: # noqa: BLE001 log.warning("PDF parse failed for %s: %s", pdf_url, exc) return None title = "" if text: # First non-empty line is usually the title. for line in text.splitlines(): line = line.strip() if line: title = line break region = _detect_region(filename) or _detect_region(title or "") years = _detect_years(title + "\n" + filename) wheat_class_section = _detect_wheat_class_section(text) varieties = _detect_varieties(text) return TrialPDF( source_key=source_key_for(filename), source_url=LIST_URL, pdf_url=pdf_url, filename=filename, title=title or None, year=years[-1] if years else None, years_covered=years, region=region, wheat_class_section=wheat_class_section, page_text=text, varieties_found=varieties, ) # --------------------------------------------------------------------- render def render_markdown(p: TrialPDF) -> str: head: list[str] = [ f"# {p.title or p.filename}", "", "- **Source:** AgriPro (Syngenta) regional trial PDF", "- **Vendor:** Syngenta", "- **Brand:** AgriPro", "- **Crop:** Wheat", "- **Data type:** trial", ] if p.region: head.append(f"- **Region:** {p.region}") if p.wheat_class_section: head.append(f"- **Wheat class:** {p.wheat_class_section}") if p.year: head.append(f"- **Year:** {p.year}") if p.years_covered and len(p.years_covered) > 1: head.append(f"- **Years covered:** {p.years_covered[0]}–{p.years_covered[-1]}") head.append(f"- **PDF:** {p.pdf_url}") head.append(f"- **Index page:** {p.source_url}") if p.varieties_found: head.append( f"- **Varieties listed:** {', '.join(p.varieties_found[:30])}" + ("…" if len(p.varieties_found) > 30 else "") ) head.append("") head.append("---") head.append("") head.append("## Trial data (verbatim from PDF)") head.append("") head.append("```") head.append(p.page_text) head.append("```") return "\n".join(head) # --------------------------------------------------------------------- write def write_pdf(prod: TrialPDF, body_md: str) -> None: CORPUS_DIR.mkdir(parents=True, exist_ok=True) md_path = CORPUS_DIR / f"{prod.source_key}.md" json_path = CORPUS_DIR / f"{prod.source_key}.json" md_path.write_text(body_md, encoding="utf-8") sidecar = { "source": "agripro_trials", "source_key": prod.source_key, "data_type": "trial", "vendor": "Syngenta", "brand": "AgriPro", "crop": "wheat", "title": prod.title, "filename": prod.filename, "region": prod.region, "wheat_class_section": prod.wheat_class_section, "year": prod.year, "years_covered": prod.years_covered, "varieties_found": prod.varieties_found, "pdf_url": prod.pdf_url, "source_urls": [prod.source_url, prod.pdf_url], "page_text_chars": len(prod.page_text), "fetched_at": datetime.now(timezone.utc).isoformat(), "scraper_version": SCRAPER_VERSION, } json_path.write_text( json.dumps(sidecar, indent=2, ensure_ascii=False) + "\n", encoding="utf-8", ) # --------------------------------------------------------------------- pipeline def process_pdf( http: RateLimitedSession, *, pdf_url: str, filename: str, force: bool, ) -> tuple[str, TrialPDF | None]: sk = source_key_for(filename) md_path = CORPUS_DIR / f"{sk}.md" if md_path.exists() and not force: return "skipped", None try: prod = fetch_pdf_detail(http, pdf_url, filename) except Exception as exc: # noqa: BLE001 log.error("PDF fetch/parse failed for %s: %s", pdf_url, exc) return "failed", None if prod is None: return "missing", None body = render_markdown(prod) write_pdf(prod, body) return "written", prod def run(*, limit: int | None, force: bool) -> int: CORPUS_DIR.mkdir(parents=True, exist_ok=True) http = RateLimitedSession() targets = discover_pdfs(http) counts = {"written": 0, "skipped": 0, "missing": 0, "failed": 0} processed = 0 for pdf_url, filename, _section, _href in targets: if limit is not None and processed >= limit: break processed += 1 status, prod = process_pdf( http, pdf_url=pdf_url, filename=filename, force=force, ) counts[status] = counts.get(status, 0) + 1 log.info( "[%d/%d] %s %s | region=%s year=%s varieties=%d chars=%d", processed, len(targets), source_key_for(filename), status, (prod.region if prod else "-") or "-", prod.year if prod else "-", len(prod.varieties_found) if prod else 0, len(prod.page_text) if prod else 0, ) log.info( "done: processed=%d written=%d skipped=%d missing=%d failed=%d (of %d PDFs)", processed, counts["written"], counts["skipped"], counts["missing"], counts["failed"], len(targets), ) return 0 if counts["failed"] == 0 else 1 # --------------------------------------------------------------------- CLI def _build_argparser() -> argparse.ArgumentParser: p = argparse.ArgumentParser( prog="scrape.sources.agripro_trials", description="Scrape AgriPro regional trial PDFs.", ) p.add_argument("--limit", type=int, default=None, help="Stop after processing N PDFs (default: all).") p.add_argument("--force", action="store_true", help="Re-fetch even if the markdown file already exists.") p.add_argument("--log-level", default=os.environ.get("LOG_LEVEL", "INFO")) return p def main(argv: list[str] | None = None) -> int: args = _build_argparser().parse_args(argv) logging.basicConfig( level=args.log_level.upper(), format="%(asctime)s %(levelname)s %(name)s %(message)s", stream=sys.stderr, ) return run(limit=args.limit, force=args.force) if __name__ == "__main__": sys.exit(main())