Files
crop-chem-docs/scrape/sources/epa_ppls.py
T
justin e9250de8e7 scrape: Phase 1 — Bayer + EPA PPLS scrapers with unified label schema
Adapts the docs-mcp-template scraping layer for the pesticide-labels
domain. The template's bundle/version/platform concepts don't map to
labels (there's no "Bayer 8.1.0" — there's just the current accepted
label per EPA Reg No), so the scraper layer is reshaped around a
"source" abstraction: one source per manufacturer or regulator, one
per-product label per source.

Sources shipped:
  - bayer       — Bayer Crop Science US (Next.js JSON catalog + Scene7 PDFs)
  - epa_ppls    — EPA PPLS via PPIS bulk index + undocumented /cswu/ ORDS REST endpoint

Canonical sidecar schema (see scrape/README.md) unifies fields across
sources:
  - active_ingredients always [{name, cas, percent}]
  - label/* nested (url, filename, accepted_date, last_modified,
    page_count, text_layer)
  - all timestamps normalized to ISO 8601 UTC
  - signal_word surfaced (operationally critical for the farmer advisor)
  - source_key + epa_reg_no separate per-source PK from the
    cross-source join key

bundles.json → sources.json. --bundle → --source. The runner walks
sources.json and dispatches by id; per-source modules remain
independently runnable for development.

PLAN.md gets a one-block domain note up front; later phases (chunking,
embeddings, retrieval, eval) still apply as written.

Smoke test:
  python -m scrape.runner --all --limit 2     # works
  python -m scrape.runner --source bayer --limit 3    # 3 written, idempotent re-run skips
  python -m scrape.runner --source epa_ppls --reg-no 524-475   # Roundup Ultra, 167 pages, ISO last_modified

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-23 18:27:07 -04:00

600 lines
21 KiB
Python

"""EPA PPLS (Pesticide Product Label System) scraper.
Enumeration strategy
====================
The PPLS Oracle APEX portal (ordspub.epa.gov/ords/pesticides/f?p=PPLS:1)
is session-stateful and hostile to enumeration, so we use a three-step
approach that bypasses APEX entirely:
1. **List products** via the public PPIS bulk download
``https://www3.epa.gov/pesticides/PPISdata/product.zip`` — a 107-char
fixed-width flat file (``product.txt``, ~102K active Section 3
registrations, refreshed every Tuesday). Gives us the universe of
EPA Reg Nos (company-product), plus the product name.
2. **Hydrate per product** via the PPLS REST data service at
``https://ordspub.epa.gov/ords/pesticides/cswu/ppls/{regno}`` —
returns rich JSON: registrant, active ingredients (with CAS + %),
formulations, status, signal word, AND a ``pdffiles`` array
listing every stamped label PDF EPA has accepted for the product.
The most recent entry gives us the canonical PDF filename
(``{company6}-{product5}-{YYYYMMDD}.pdf``), solving the
stamped-date-suffix problem without having to guess.
3. **Fetch label PDF** from
``https://www3.epa.gov/pesticides/chem_search/ppls/{filename}``
and extract text with pypdf. Many EPA labels are scans with no
text layer — those are flagged ``text_layer: false`` and the .md
body is a ``[SCANNED PDF — OCR REQUIRED]`` placeholder. OCR is
deferred to Phase 2.
Paths rejected and why
----------------------
- ``/ords/pesticides/ppls/{reg}`` (no ``/cswu/`` prefix): returns the
APEX HTML splash, not JSON. The undocumented ``/cswu/`` prefix is
the actual ORDS REST handler.
- Scraping the APEX UI: session-stateful, fragile, blocked.
- data.gov mirror: redirects to the same APEX page, no extract.
- NPIRS (Purdue): subscription-walled; PPIS is the same authoritative
feed anyway.
Required sidecar fields (per task spec): ``source``, ``epa_reg_no``,
``label_pdf_url`` (emitted nested as ``label.url``), ``fetched_at``.
Everything else is best-effort.
"""
from __future__ import annotations
import argparse
import io
import json
import logging
import re
import sys
import time
import zipfile
from dataclasses import dataclass, field
from datetime import UTC, datetime
from pathlib import Path
from typing import Any, Iterable
import httpx
from pypdf import PdfReader
from pypdf.errors import PdfReadError
# Recorded in every sidecar under "scraper_version".
SCRAPER_VERSION = "0.1.0"
# Sent as the User-Agent header by the client built in _client().
USER_AGENT = "ppls-docs-scraper/0.1 (+https://drawbar.example/contact)"
# PPIS bulk index: a zip archive whose product.txt member lists the products.
PPIS_PRODUCT_ZIP_URL = "https://www3.epa.gov/pesticides/PPISdata/product.zip"
# PPLS JSON endpoint; the EPA Reg No is appended as the last path segment.
PPLS_API_BASE = "https://ordspub.epa.gov/ords/pesticides/cswu/ppls"
# Base URL for label PDFs; the PDF filename reported by the API is appended.
PPLS_PDF_BASE = "https://www3.epa.gov/pesticides/chem_search/ppls"
# APEX page for one product; stored in the sidecar as source_urls.label_index.
PPLS_INDEX_URL_TEMPLATE = (
"https://ordspub.epa.gov/ords/pesticides/f?p=PPLS:102:::NO::P102_REG_NUM:{regno}"
)
# Third ancestor of this file (presumably the repository root, given the
# scrape/sources/ location — verify if the module is moved).
REPO_ROOT = Path(__file__).resolve().parents[2]
# Output directory for the per-product .md and .json files.
CORPUS_DIR = REPO_ROOT / "corpus" / "epa_ppls"
# Pause inserted by process_one() after each API call and PDF download.
REQUEST_DELAY_SECONDS = 1.1 # polite: ~1 req/sec
# 60 s default timeout, 15 s for establishing the connection.
HTTP_TIMEOUT = httpx.Timeout(60.0, connect=15.0)
# Maximum number of GET attempts per URL in _get_with_retries().
MAX_RETRIES = 4
log = logging.getLogger("epa_ppls")
# ---------------------------------------------------------------------------
# HTTP helpers
# ---------------------------------------------------------------------------
def _client() -> httpx.Client:
    """Build the HTTP client used for every request in this module.

    The client identifies itself with ``USER_AGENT``, advertises gzip/deflate
    support, applies ``HTTP_TIMEOUT`` and follows redirects.
    """
    default_headers = {
        "User-Agent": USER_AGENT,
        "Accept-Encoding": "gzip, deflate",
    }
    return httpx.Client(
        headers=default_headers,
        timeout=HTTP_TIMEOUT,
        follow_redirects=True,
    )
def _get_with_retries(
    client: httpx.Client, url: str, *, expect_json: bool = False
) -> httpx.Response:
    """GET ``url`` with exponential backoff on transient failures.

    Retried (up to ``MAX_RETRIES`` attempts): HTTP 429/500/502/503/504,
    transport-level errors, and — when ``expect_json`` is set — a response
    whose content-type is not JSON.

    Not retried: any other error status (e.g. 404). Repeating such a request
    cannot succeed and would only burn backoff time on every missing label.

    Args:
        client: The HTTP client to issue the request with.
        url: Absolute URL to fetch.
        expect_json: Require a JSON content-type on the response.

    Returns:
        The successful response.

    Raises:
        RuntimeError: On a non-retryable error status, or once all attempts
            are exhausted. The underlying httpx error, if any, is chained.
    """
    retryable_statuses = (429, 500, 502, 503, 504)
    last_exc: Exception | None = None
    last_failure = "no attempt made"
    for attempt in range(1, MAX_RETRIES + 1):
        # No point in sleeping after the final attempt; we raise right after.
        wait = 0 if attempt == MAX_RETRIES else min(2 ** attempt, 30)
        try:
            resp = client.get(url)
            if resp.status_code in retryable_statuses:
                last_exc = None
                last_failure = f"HTTP {resp.status_code}"
                log.warning(
                    "HTTP %s on %s (attempt %d/%d) — sleeping %ds",
                    resp.status_code, url, attempt, MAX_RETRIES, wait,
                )
                time.sleep(wait)
                continue
            resp.raise_for_status()
            if expect_json:
                # ORDS sometimes returns text/html error pages with 200 — sanity
                ctype = resp.headers.get("content-type", "")
                if "json" not in ctype.lower():
                    raise httpx.HTTPError(
                        f"Expected JSON, got content-type={ctype!r} for {url}"
                    )
            return resp
        except httpx.HTTPStatusError as exc:
            # Retryable statuses were handled above, so this one is permanent.
            raise RuntimeError(f"GET {url} failed: {exc}") from exc
        except httpx.HTTPError as exc:
            # Covers transport errors (a subclass) and the JSON sanity check.
            last_exc = exc
            last_failure = str(exc)
            log.warning(
                "Network error on %s (attempt %d/%d): %s — sleeping %ds",
                url, attempt, MAX_RETRIES, exc, wait,
            )
            time.sleep(wait)
    raise RuntimeError(
        f"GET {url} failed after {MAX_RETRIES} attempts: {last_failure}"
    ) from last_exc
# ---------------------------------------------------------------------------
# Enumeration: PPIS bulk product.zip
# ---------------------------------------------------------------------------
@dataclass
class PpisRow:
    """A single record of PPIS ``product.txt``.

    Carries only what is needed to look the product up in the PPLS API.
    """

    epa_reg_no: str
    product_name: str
    status_flag: str  # 'F' (federal/active) or 'T' (transferred)
    rup_flag: str  # 'Y' or 'N'
def _parse_ppis_line(line: str) -> PpisRow | None:
    """Parse one fixed-width PPIS product.txt row.

    Layout (1-indexed, inferred from inspection):
        1-6     company number (zero-padded, may contain trailing spaces)
        7-11    product number (zero-padded, may contain trailing spaces)
        33-102  product name (70 chars, space-padded)
        103     status flag ('F' or 'T')
        106     RUP flag ('Y' or 'N')

    Args:
        line: One row with its line terminator already removed.

    Returns:
        The parsed row, or ``None`` when the line is shorter than 106
        characters or either number field is not made of plain digits.
    """
    if len(line) < 106:
        return None
    company_raw = line[0:6].strip()
    product_raw = line[6:11].strip()
    # int() on its own would also accept "+12", "-12" or "1_2" and turn a
    # corrupt field into a plausible-looking reg no, so insist on ASCII digits.
    # An empty field fails isdigit() and is rejected here as well.
    for field_text in (company_raw, product_raw):
        if not (field_text.isascii() and field_text.isdigit()):
            return None
    # Strip leading zeros for canonical EPA Reg No display
    company = str(int(company_raw))
    product = str(int(product_raw))
    return PpisRow(
        epa_reg_no=f"{company}-{product}",
        product_name=line[32:102].strip(),
        status_flag=line[102:103],
        # The length guard above guarantees column 106 exists.
        rup_flag=line[105:106],
    )
def fetch_ppis_index(client: httpx.Client) -> list[PpisRow]:
    """Fetch the PPIS product archive and return every row that parses.

    The ``product.txt`` member is decoded as latin-1 line by line; rows that
    ``_parse_ppis_line`` rejects are dropped.
    """
    log.info("Fetching PPIS index from %s", PPIS_PRODUCT_ZIP_URL)
    archive_bytes = _get_with_retries(client, PPIS_PRODUCT_ZIP_URL).content
    with (
        zipfile.ZipFile(io.BytesIO(archive_bytes)) as archive,
        archive.open("product.txt") as member,
    ):
        candidates = (
            _parse_ppis_line(raw.decode("latin-1").rstrip("\n").rstrip("\r"))
            for raw in member
        )
        # Consume the generator while the archive member is still open.
        rows = [row for row in candidates if row is not None]
    log.info("Parsed %d rows from PPIS index", len(rows))
    return rows
# ---------------------------------------------------------------------------
# Hydration: PPLS JSON API
# ---------------------------------------------------------------------------
def _zero_pad_regno(regno: str) -> str:
"""524-475 -> 000524-00475 (canonical filename form). Distributor suffix
(524-475-12345) -> 000524-00475-12345."""
parts = regno.split("-")
if len(parts) == 2:
c, p = parts
return f"{int(c):06d}-{int(p):05d}"
if len(parts) == 3:
c, p, d = parts
return f"{int(c):06d}-{int(p):05d}-{int(d):05d}"
return regno
_MONTHS = {
"january": 1, "february": 2, "march": 3, "april": 4, "may": 5, "june": 6,
"july": 7, "august": 8, "september": 9, "october": 10, "november": 11,
"december": 12,
}
def _parse_label_date(text: str | None) -> str | None:
"""'October 18, 2016' -> '2016-10-18'. Returns None on any parse issue."""
if not text:
return None
m = re.match(r"^([A-Za-z]+)\s+(\d{1,2}),\s+(\d{4})$", text.strip())
if not m:
return None
month = _MONTHS.get(m.group(1).lower())
if month is None:
return None
try:
return f"{int(m.group(3)):04d}-{month:02d}-{int(m.group(2)):02d}"
except ValueError:
return None
def _http_date_to_iso(http_date: str | None) -> str | None:
"""RFC1123 'Wed, 19 Oct 2016 17:48:09 GMT' -> ISO 8601 UTC.
Returns None on unparseable input. Matches the canonical schema's
requirement that all timestamps be ISO 8601.
"""
if not http_date:
return None
try:
from email.utils import parsedate_to_datetime
dt = parsedate_to_datetime(http_date)
if dt.tzinfo is None:
dt = dt.replace(tzinfo=UTC)
return dt.astimezone(UTC).isoformat()
except Exception: # noqa: BLE001
return None
@dataclass
class ProductRecord:
    """What the PPLS API reported for one EPA Reg No.

    Every field except ``epa_reg_no`` and ``active_ingredients`` may be
    ``None`` when the API has no value for it.
    """

    epa_reg_no: str
    product_name: str | None
    registrant: str | None
    registrant_company_number: str | None
    # One dict per ingredient, as assembled by fetch_product_record().
    active_ingredients: list[dict[str, Any]]
    label_pdf_url: str | None
    label_pdf_filename: str | None
    label_accepted_date: str | None
    registration_status: str | None
    signal_word: str | None
    # Untouched API item; left out of repr() to keep log output short.
    raw_api_item: dict[str, Any] | None = field(default=None, repr=False)
def fetch_product_record(client: httpx.Client, regno: str) -> ProductRecord:
    """Query the PPLS API for ``regno`` and condense the answer.

    When the response has no items, the returned record carries only the reg
    no. Otherwise the first item is used, and the label PDF is the
    ``pdffiles`` entry with the latest parseable accepted date.
    """
    response = _get_with_retries(
        client, f"{PPLS_API_BASE}/{regno}", expect_json=True
    )
    api_items = response.json().get("items") or []
    if not api_items:
        return ProductRecord(
            epa_reg_no=regno,
            product_name=None,
            registrant=None,
            registrant_company_number=None,
            active_ingredients=[],
            label_pdf_url=None,
            label_pdf_filename=None,
            label_accepted_date=None,
            registration_status=None,
            signal_word=None,
            raw_api_item=None,
        )

    item = api_items[0]
    registrant_name = (item.get("companyinfo") or [{}])[0].get("name")
    ingredients = [
        {
            "name": entry.get("active_ing"),
            "cas": entry.get("cas_number"),
            "percent": entry.get("active_ing_percent"),
            "pc_code": entry.get("pc_code"),
        }
        for entry in item.get("active_ingredients") or []
    ]

    def _accepted_sort_key(entry: dict[str, Any]) -> str:
        # Entries without a parseable date sort before every real date.
        return _parse_label_date(entry.get("pdffile_accepted_date")) or "0000-00-00"

    # The API appears to list PDFs newest-first, but pick the latest
    # explicitly rather than rely on that ordering.
    pdf_candidates = item.get("pdffiles") or []
    latest_pdf: dict[str, Any] | None = None
    if pdf_candidates:
        latest_pdf = max(pdf_candidates, key=_accepted_sort_key)

    if latest_pdf:
        pdf_filename = latest_pdf.get("pdffile")
        accepted_date = _parse_label_date(latest_pdf.get("pdffile_accepted_date"))
    else:
        pdf_filename = None
        accepted_date = None
    pdf_url = f"{PPLS_PDF_BASE}/{pdf_filename}" if pdf_filename else None

    return ProductRecord(
        epa_reg_no=regno,
        product_name=item.get("productname"),
        registrant=registrant_name,
        # The company number is the part of the reg no before the first dash.
        registrant_company_number=regno.split("-")[0],
        active_ingredients=ingredients,
        label_pdf_url=pdf_url,
        label_pdf_filename=pdf_filename,
        label_accepted_date=accepted_date,
        registration_status=item.get("product_status"),
        signal_word=item.get("signal_word"),
        raw_api_item=item,
    )
# ---------------------------------------------------------------------------
# PDF download + text extraction
# ---------------------------------------------------------------------------
def download_pdf(client: httpx.Client, url: str) -> tuple[bytes, str | None]:
    """Fetch a label PDF.

    Returns the response body together with the raw ``Last-Modified`` header
    value, or ``None`` in its place when the header is absent.
    """
    response = _get_with_retries(client, url)
    return response.content, response.headers.get("last-modified")
def extract_pdf_text(pdf_bytes: bytes) -> tuple[str, bool]:
    """Extract the text layer of a PDF.

    Pages are extracted one by one, whitespace-normalized and joined with a
    blank line. A page whose extraction fails contributes nothing.

    Args:
        pdf_bytes: Raw PDF file content.

    Returns:
        ``(text, has_text_layer)``. When the PDF cannot be read or no page
        yields any text, the result is ``("", False)``.
    """
    try:
        reader = PdfReader(io.BytesIO(pdf_bytes))
        # pypdf resolves the page tree on first access, so a damaged or
        # encrypted file may only fail here rather than in the constructor.
        # Listing the pages inside the guard keeps one bad PDF from aborting
        # the whole run.
        pages = list(reader.pages)
    except PdfReadError as exc:
        log.warning("pypdf failed to read PDF: %s", exc)
        return "", False
    chunks: list[str] = []
    for i, page in enumerate(pages):
        try:
            page_text = page.extract_text() or ""
        except Exception as exc:  # pypdf can throw on malformed pages
            log.warning("pypdf extract_text failed on page %d: %s", i, exc)
            page_text = ""
        # Collapse runs of spaces/tabs and cap consecutive blank lines.
        page_text = re.sub(r"[ \t]+", " ", page_text)
        page_text = re.sub(r"\n{3,}", "\n\n", page_text).strip()
        if page_text:
            chunks.append(page_text)
    combined = "\n\n".join(chunks).strip()
    return combined, bool(combined)
# ---------------------------------------------------------------------------
# Per-product processing
# ---------------------------------------------------------------------------
def _md_path(regno: str) -> Path:
    """Location of the markdown file for ``regno`` inside ``CORPUS_DIR``."""
    return CORPUS_DIR.joinpath(f"{regno}.md")
def _json_path(regno: str) -> Path:
    """Location of the JSON sidecar for ``regno`` inside ``CORPUS_DIR``."""
    return CORPUS_DIR.joinpath(f"{regno}.json")
def process_one(
    client: httpx.Client,
    regno: str,
    *,
    force: bool = False,
) -> str:
    """Fetch, extract and persist one product; report what happened.

    Writes ``<regno>.md`` (label text or a placeholder) and ``<regno>.json``
    (the sidecar) into ``CORPUS_DIR``.

    Args:
        client: HTTP client used for the API call and the PDF download.
        regno: EPA Reg No, used verbatim in URLs and output filenames.
        force: Re-fetch even when both output files already exist.

    Returns:
        ``"skipped"`` when both files are already on disk and ``force`` is
        false; ``"no-pdf"`` when the API lists no label PDF (placeholder
        files are written); ``"wrote"`` when the label was downloaded and
        both files written; ``"error"`` when the API call or the PDF
        download raised.
    """
    md_path = _md_path(regno)
    json_path = _json_path(regno)
    if not force and md_path.exists() and json_path.exists():
        log.info("[%s] skip (already on disk)", regno)
        return "skipped"
    try:
        record = fetch_product_record(client, regno)
    except Exception as exc:  # noqa: BLE001 - one bad product must not stop the run
        log.error("[%s] API fetch failed: %s", regno, exc)
        return "error"
    time.sleep(REQUEST_DELAY_SECONDS)

    def _build_sidecar(
        *,
        label_url: str | None,
        label_filename: str | None,
        label_last_modified_iso: str | None,
        page_count: int | None,
        text_layer: bool | None,
    ) -> dict[str, Any]:
        """Assemble the sidecar dict from ``record`` plus label details."""
        return {
            "source": "epa_ppls",
            "source_key": regno,
            "epa_reg_no": regno,
            "product_name": record.product_name,
            "product_class": None,  # EPA PPLS doesn't expose a clean class taxonomy
            "registrant": record.registrant,
            "active_ingredients": record.active_ingredients,
            "signal_word": record.signal_word,
            "label": {
                "url": label_url,
                "filename": label_filename,
                "accepted_date": record.label_accepted_date,
                "last_modified": label_last_modified_iso,
                "page_count": page_count,
                "text_layer": text_layer,
            },
            "supplemental_documents": [],  # EPA PPLS sidecar omits supplementals; query API per regno
            "source_urls": {
                "product_page": None,
                "label_api": f"{PPLS_API_BASE}/{regno}",
                "label_index": PPLS_INDEX_URL_TEMPLATE.format(regno=regno),
            },
            # EPA-specific extras (kept out of the strict canonical schema but
            # useful for joins back to EPA's data model)
            "registration_status": record.registration_status,
            "registrant_company_number": record.registrant_company_number,
            "fetched_at": datetime.now(UTC).isoformat(),
            "scraper_version": SCRAPER_VERSION,
        }

    if not record.label_pdf_url:
        log.warning("[%s] no label PDF available — writing sidecar only", regno)
        md_path.write_text(
            f"# {record.product_name or regno}\n\n"
            f"EPA Reg No: {regno}\n\n"
            "[NO LABEL PDF AVAILABLE FROM EPA PPLS]\n",
            encoding="utf-8",
        )
        sidecar = _build_sidecar(
            label_url=None, label_filename=None,
            label_last_modified_iso=None,
            page_count=None, text_layer=False,
        )
        json_path.write_text(json.dumps(sidecar, indent=2), encoding="utf-8")
        return "no-pdf"

    try:
        pdf_bytes, last_modified_raw = download_pdf(client, record.label_pdf_url)
    except Exception as exc:  # noqa: BLE001 - one bad product must not stop the run
        log.error("[%s] PDF download failed: %s", regno, exc)
        return "error"
    time.sleep(REQUEST_DELAY_SECONDS)

    text, has_text = extract_pdf_text(pdf_bytes)
    last_modified_iso = _http_date_to_iso(last_modified_raw)
    page_count: int | None = None
    try:
        page_count = len(PdfReader(io.BytesIO(pdf_bytes)).pages)
    except Exception as exc:  # noqa: BLE001 - page count is optional metadata
        # Keep going with page_count=None, but leave a trace of why.
        log.warning("[%s] could not determine page count: %s", regno, exc)

    sidecar = _build_sidecar(
        label_url=record.label_pdf_url,
        label_filename=record.label_pdf_filename,
        label_last_modified_iso=last_modified_iso,
        page_count=page_count,
        text_layer=has_text,
    )

    header_lines = [f"# {record.product_name or regno}", ""]
    header_lines.append(f"- EPA Reg No: **{regno}**")
    if record.registrant:
        header_lines.append(f"- Registrant: {record.registrant}")
    if record.signal_word:
        header_lines.append(f"- Signal word: {record.signal_word}")
    if record.active_ingredients:
        ai_strs = []
        for ai in record.active_ingredients:
            name = ai.get("name")
            if not name:
                continue
            percent = ai.get("percent")
            # Leave the percentage out when the API gave none, instead of
            # rendering "(None%)".
            if percent is None or percent == "":
                ai_strs.append(f"{name}")
            else:
                ai_strs.append(f"{name} ({percent}%)")
        if ai_strs:
            header_lines.append("- Active ingredients: " + "; ".join(ai_strs))
    if record.label_accepted_date:
        header_lines.append(f"- Label accepted: {record.label_accepted_date}")
    header_lines.append(f"- Source PDF: {record.label_pdf_url}")
    header_lines.append("")
    header_lines.append("---")
    header_lines.append("")

    if has_text:
        body = text
    else:
        body = "[SCANNED PDF — OCR REQUIRED]\n\nThis label has no extractable text layer."
        log.info("[%s] PDF has no text layer (scanned)", regno)

    md_content = "\n".join(header_lines) + body + "\n"
    md_path.write_text(md_content, encoding="utf-8")
    json_path.write_text(json.dumps(sidecar, indent=2), encoding="utf-8")
    log.info(
        "[%s] wrote (text_layer=%s, pages=%s, name=%r)",
        regno, has_text, page_count, record.product_name,
    )
    return "wrote"
# ---------------------------------------------------------------------------
# CLI
# ---------------------------------------------------------------------------
def _iter_regnos(
args: argparse.Namespace,
client: httpx.Client,
) -> Iterable[str]:
"""Yield reg nos to process based on CLI args."""
if args.reg_no:
for r in args.reg_no:
yield r
return
if args.seed_file:
with open(args.seed_file, encoding="utf-8") as fh:
for raw in fh:
line = raw.strip()
if not line or line.startswith("#"):
continue
yield line
return
# Default: enumerate via PPIS bulk index
rows = fetch_ppis_index(client)
count = 0
for row in rows:
# Skip transferred-out (status_flag 'T') entries by default; their
# registration has moved to another company-product pairing.
if row.status_flag == "T":
continue
yield row.epa_reg_no
count += 1
if args.limit and count >= args.limit:
return
def main(argv: list[str] | None = None) -> int:
    """Command-line entry point.

    Parses ``argv``, configures logging to stderr, makes sure ``CORPUS_DIR``
    exists, processes every selected reg no, then logs and prints a JSON
    tally of the outcomes. Always returns 0.
    """
    cli = argparse.ArgumentParser(
        prog="python -m scrape.sources.epa_ppls",
        description="Scrape EPA PPLS pesticide labels into corpus/epa_ppls/.",
    )
    cli.add_argument(
        "--limit",
        type=int,
        default=None,
        help="Max products to process when enumerating from PPIS.",
    )
    cli.add_argument(
        "--force",
        action="store_true",
        help="Re-fetch even if .md/.json already exist.",
    )
    cli.add_argument(
        "--reg-no",
        action="append",
        metavar="REGNO",
        help="Process specific EPA Reg No (e.g. 524-475). Repeatable.",
    )
    cli.add_argument(
        "--seed-file",
        metavar="PATH",
        help="Text file with one EPA Reg No per line (# comments OK).",
    )
    cli.add_argument(
        "--log-level",
        default="INFO",
        choices=["DEBUG", "INFO", "WARNING", "ERROR"],
    )
    options = cli.parse_args(argv)

    logging.basicConfig(
        stream=sys.stderr,
        level=getattr(logging, options.log_level),
        format="%(asctime)s %(levelname)s %(name)s: %(message)s",
    )
    CORPUS_DIR.mkdir(parents=True, exist_ok=True)

    # One counter per outcome that process_one() can report.
    tally = {"wrote": 0, "skipped": 0, "no-pdf": 0, "error": 0}
    with _client() as http:
        for regno in _iter_regnos(options, http):
            outcome = process_one(http, regno, force=options.force)
            tally[outcome] = tally.get(outcome, 0) + 1

    log.info("done: %s", tally)
    print(json.dumps(tally), file=sys.stderr)
    return 0
# Run the CLI when executed as a script; main()'s return value becomes the
# process exit status.
if __name__ == "__main__":
    raise SystemExit(main())