92a95d5e78
Cuts the PPIS-enumeration universe from 102K rows to ~11.5K rows by dropping products from non-row-crop-ag registrants BEFORE the per- product API call. This is the biggest cost lever we have on the EPA scraper — full backfill drops from ~28 h to ~3.5 h. scrape/sources/epa_registrant_allowlist.json holds the 34 verified ag-chem company numbers (Syngenta, Bayer, BASF, Corteva, FMC, Nufarm, ADAMA, UPL, Albaugh, Loveland, AMVAC, Helena, Drexel, Atticus, etc.). Each entry was verified by querying the EPA PPLS API for the first active product registered under that company number. Edit the JSON freely — scraper loads it at run time. Bypass with --no-registrant-filter when you suspect a row-crop product registered to a specialty company not on the list. Why a curated allowlist rather than blacklist consumer brands: the 102K PPIS rows are 89% non-ag-relevant; an allowlist is shorter to maintain and harder to false-positive. Excluded with intent (not omissions): Bayer Environmental Science (turf/ornamental), Scotts (consumer lawn & garden), Wellmark/Zoecon (animal flea/tick), Control Solutions (structural pest), Cleary (turf), PBI/Gordon (mostly turf), Buckman Labs (industrial water). Smoke test --limit 100: - 1239 PPIS rows considered (in first slice of file) - 1139 skipped by registrant filter (no API call paid) - 100 hit API, 81 filtered by row-crop sites, 19 written - = 91% API-call reduction over the prior version Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
704 lines
25 KiB
Python
704 lines
25 KiB
Python
"""EPA PPLS (Pesticide Product Label System) scraper.
|
|
|
|
Enumeration strategy
|
|
====================
|
|
The PPLS Oracle APEX portal (ordspub.epa.gov/ords/pesticides/f?p=PPLS:1)
|
|
is session-stateful and hostile to enumeration, so we use a three-step
|
|
approach that bypasses APEX entirely:
|
|
|
|
1. **List products** via the public PPIS bulk download
|
|
``https://www3.epa.gov/pesticides/PPISdata/product.zip`` — a 107-char
|
|
fixed-width flat file (``product.txt``, ~102K active Section 3
|
|
registrations, refreshed every Tuesday). Gives us the universe of
|
|
EPA Reg Nos (company-product), plus the product name.
|
|
|
|
2. **Hydrate per product** via the PPLS REST data service at
|
|
``https://ordspub.epa.gov/ords/pesticides/cswu/ppls/{regno}`` —
|
|
returns rich JSON: registrant, active ingredients (with CAS + %),
|
|
formulations, status, signal word, AND a ``pdffiles`` array
|
|
listing every stamped label PDF EPA has accepted for the product.
|
|
The most recent entry gives us the canonical PDF filename
|
|
(``{company6}-{product5}-{YYYYMMDD}.pdf``), solving the
|
|
stamped-date-suffix problem without having to guess.
|
|
|
|
3. **Fetch label PDF** from
|
|
``https://www3.epa.gov/pesticides/chem_search/ppls/{filename}``
|
|
and extract text with pypdf. Many EPA labels are scans with no
|
|
text layer — those are flagged ``text_layer: false`` and the .md
|
|
body is a ``[SCANNED PDF — OCR REQUIRED]`` placeholder. OCR is
|
|
deferred to Phase 2.
|
|
|
|
Paths rejected and why
|
|
----------------------
|
|
- ``/ords/pesticides/ppls/{reg}`` (no ``/cswu/`` prefix): returns the
|
|
APEX HTML splash, not JSON. The undocumented ``/cswu/`` prefix is
|
|
the actual ORDS REST handler.
|
|
- Scraping the APEX UI: session-stateful, fragile, blocked.
|
|
- data.gov mirror: redirects to the same APEX page, no extract.
|
|
- NPIRS (Purdue): subscription-walled; PPIS is the same authoritative
|
|
feed anyway.
|
|
|
|
Required sidecar fields (per task spec): ``source``, ``epa_reg_no``,
|
|
``label_pdf_url``, ``fetched_at``. Everything else best-effort.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import io
|
|
import json
|
|
import logging
|
|
import os
|
|
import re
|
|
import sys
|
|
import time
|
|
import zipfile
|
|
from dataclasses import dataclass, field
|
|
from datetime import UTC, datetime
|
|
from pathlib import Path
|
|
from typing import Any, Iterable
|
|
|
|
import httpx
|
|
from pypdf import PdfReader
|
|
from pypdf.errors import PdfReadError
|
|
|
|
# Version string stamped into every sidecar under "scraper_version".
SCRAPER_VERSION = "0.1.0"
# User-Agent header sent on every request made through _client().
USER_AGENT = "ppls-docs-scraper/0.1 (+https://drawbar.example/contact)"

# PPIS bulk index: a zip archive whose product.txt member lists the products.
PPIS_PRODUCT_ZIP_URL = "https://www3.epa.gov/pesticides/PPISdata/product.zip"
# Per-product JSON endpoint; callers append "/{regno}".
PPLS_API_BASE = "https://ordspub.epa.gov/ords/pesticides/cswu/ppls"
# Label PDF location; callers append "/{filename}".
PPLS_PDF_BASE = "https://www3.epa.gov/pesticides/chem_search/ppls"
# APEX page for a single reg no; recorded in the sidecar as "label_index".
PPLS_INDEX_URL_TEMPLATE = (
    "https://ordspub.epa.gov/ords/pesticides/f?p=PPLS:102:::NO::P102_REG_NUM:{regno}"
)

# Third ancestor directory of this file (parents[2]).
REPO_ROOT = Path(__file__).resolve().parents[2]
# Corpus root is overridable via PPLS_CORPUS_ROOT for routing the
# corpus to external storage (USB drive, NAS mount, etc.) without
# editing the repo. An unset or empty variable falls back to
# REPO_ROOT / "corpus".
CORPUS_ROOT = Path(os.environ.get("PPLS_CORPUS_ROOT") or REPO_ROOT / "corpus")
# Directory that receives the per-product .md / .json pairs.
CORPUS_DIR = CORPUS_ROOT / "epa_ppls"

REQUEST_DELAY_SECONDS = 1.1  # polite: ~1 req/sec
# 60 s default for each timeout phase, 15 s for connection establishment.
HTTP_TIMEOUT = httpx.Timeout(60.0, connect=15.0)
# Total number of attempts _get_with_retries makes per URL.
MAX_RETRIES = 4
|
|
|
|
# Row-crop scoping. Each pattern is matched case-insensitively against a
# product's "sites" array from the PPLS API. Word boundaries matter — bare
# "OATS" naively matches "SHIPS, BOATS, SHIPHOLDS"; bare "RICE" matches
# "LICORICE"; bare "RYE" matches "FRYER".
#
# Scope = the three crops the farmer-advisor consumer focuses on: corn,
# soybeans, and wheat. Sweet/seed/pop corn included alongside field corn.
# Empirically (random N=100 sample, 2026-05-23): this narrow allowlist
# matches ~16% of all PPLS products and only loses ~6% of the broader
# "all US row crops" hit set, because corn/soy/wheat dominate ag chemistry
# registrations — almost every product registered for e.g. cotton or
# sorghum is co-registered for at least one of corn/soy/wheat.
ROW_CROP_KEYWORDS = (
    "CORN", "MAIZE", "POPCORN",
    "SOYBEAN", "SOYBEANS",
    "WHEAT",
)
_ROW_CROP_PATTERNS = tuple(
    re.compile(rf"\b{re.escape(kw)}\b", re.IGNORECASE)
    for kw in ROW_CROP_KEYWORDS
)


def matches_row_crop(record: "ProductRecord") -> bool:
    """Return True if any PPLS site of *record* names an in-scope row crop.

    The in-scope crops are exactly ``ROW_CROP_KEYWORDS`` (corn/maize/popcorn,
    soybeans, wheat), matched case-insensitively on word boundaries.

    Args:
        record: Object exposing ``raw_api_item``; its ``"sites"`` value is
            read. Each site entry may be a dict carrying a ``"site"`` key or
            a bare value.

    Returns:
        True when at least one site matches; False when there is no API
        item, no sites, or no match.
    """
    item = record.raw_api_item or {}
    sites = item.get("sites") or []
    # A lone string or dict would otherwise be iterated per character / per
    # key; treat it as a one-element list instead.
    if isinstance(sites, (str, dict)):
        sites = [sites]
    for entry in sites:
        raw_site = entry.get("site") if isinstance(entry, dict) else entry
        if raw_site is None:
            continue
        # Coerce defensively: a non-string value would make Pattern.search
        # raise TypeError and abort the whole scrape.
        site = raw_site if isinstance(raw_site, str) else str(raw_site)
        if any(pattern.search(site) for pattern in _ROW_CROP_PATTERNS):
            return True
    return False
|
|
|
|
|
|
# Registrant allowlist — pre-API filter. Loaded from
# epa_registrant_allowlist.json so the list can be edited without
# touching code. An empty set means "no filtering" (that is what
# --no-registrant-filter and a failed load both produce).
_REGISTRANT_ALLOWLIST_PATH = Path(__file__).resolve().parent / "epa_registrant_allowlist.json"


def _normalize_company_number(value: object) -> str | None:
    """Return *value* as a company-number string with leading zeros stripped.

    Purely numeric values are canonicalized through ``int`` so that 524,
    "524" and "000524" all compare equal to the zero-stripped company
    numbers parsed from PPIS. Returns None for values that cannot name a
    company (None, booleans, blank strings).
    """
    if value is None or isinstance(value, bool):
        return None
    text = str(value).strip()
    if not text:
        return None
    if text.isascii() and text.isdigit():
        return str(int(text))
    return text


def load_registrant_allowlist() -> set[str]:
    """Return the set of EPA company numbers (as strings) whose products
    are worth hitting the API for.

    Reads the ``"companies"`` list of ``_REGISTRANT_ALLOWLIST_PATH`` and
    collects each entry's ``"number"``; entries without one are skipped.

    Returns:
        The normalized company numbers, or an empty set on any load or
        shape error — the caller should treat that as "pass everything
        through".
    """
    try:
        data = json.loads(_REGISTRANT_ALLOWLIST_PATH.read_text(encoding="utf-8"))
        allowlist: set[str] = set()
        for company in data.get("companies", []):
            if not isinstance(company, dict) or "number" not in company:
                continue
            number = _normalize_company_number(company["number"])
            if number is not None:
                allowlist.add(number)
        return allowlist
    except (OSError, ValueError, KeyError, AttributeError, TypeError) as exc:
        # Don't fail the scrape over a missing/broken allowlist — log and
        # fall back to no filtering. ValueError covers both JSON and
        # text-decoding errors; AttributeError/TypeError cover a file whose
        # top-level shape is not {"companies": [...]}.
        logging.getLogger("epa_ppls").warning(
            "could not load registrant allowlist at %s: %s — filter disabled",
            _REGISTRANT_ALLOWLIST_PATH, exc,
        )
        return set()
|
|
|
|
|
|
# Module-wide logger, registered under the fixed name "epa_ppls".
log = logging.getLogger("epa_ppls")
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# HTTP helpers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def _client() -> httpx.Client:
    """Build the HTTP client used for every request in this module.

    The client sends the module's User-Agent, accepts gzip/deflate
    responses, applies ``HTTP_TIMEOUT`` and follows redirects.
    """
    default_headers = {
        "User-Agent": USER_AGENT,
        "Accept-Encoding": "gzip, deflate",
    }
    session = httpx.Client(
        headers=default_headers,
        timeout=HTTP_TIMEOUT,
        follow_redirects=True,
    )
    return session
|
|
|
|
|
|
def _get_with_retries(
    client: httpx.Client, url: str, *, expect_json: bool = False
) -> httpx.Response:
    """GET *url* with exponential backoff on 5xx/429/network errors.

    Up to ``MAX_RETRIES`` attempts are made. Between attempts the function
    sleeps ``min(2 ** attempt, 30)`` seconds; it does not sleep after the
    final attempt.

    Args:
        client: HTTP client used to issue the request.
        url: Absolute URL to fetch.
        expect_json: When True, a response whose Content-Type does not
            mention "json" is treated as a transient failure and retried.

    Returns:
        The first acceptable response.

    Raises:
        RuntimeError: Immediately for a non-retryable error status (for
            example 404 — retrying cannot help), or after all attempts
            have failed.
    """
    failure: str | None = None
    last_exc: Exception | None = None
    for attempt in range(1, MAX_RETRIES + 1):
        try:
            resp = client.get(url)
            status = resp.status_code
            if status == 429 or status >= 500:
                failure = f"HTTP {status}"
                last_exc = None
                log.warning(
                    "HTTP %s on %s (attempt %d/%d)",
                    status, url, attempt, MAX_RETRIES,
                )
            else:
                resp.raise_for_status()
                # ORDS sometimes returns text/html error pages with 200 — sanity
                ctype = resp.headers.get("content-type", "")
                if not expect_json or "json" in ctype.lower():
                    return resp
                failure = f"Expected JSON, got content-type={ctype!r} for {url}"
                last_exc = None
                log.warning(
                    "Unexpected content type on %s (attempt %d/%d): %s",
                    url, attempt, MAX_RETRIES, failure,
                )
        except httpx.HTTPStatusError as exc:
            # A client-error status will not change on retry; fail fast
            # instead of burning the whole backoff schedule on it.
            raise RuntimeError(
                f"GET {url} failed with non-retryable HTTP {exc.response.status_code}"
            ) from exc
        except httpx.HTTPError as exc:
            failure = str(exc)
            last_exc = exc
            log.warning(
                "Network error on %s (attempt %d/%d): %s",
                url, attempt, MAX_RETRIES, exc,
            )
        if attempt < MAX_RETRIES:
            wait = min(2 ** attempt, 30)
            log.warning("retrying %s in %ds", url, wait)
            time.sleep(wait)
    raise RuntimeError(
        f"GET {url} failed after {MAX_RETRIES} attempts: {failure}"
    ) from last_exc
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Enumeration: PPIS bulk product.zip
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
@dataclass
class PpisRow:
    """One row of PPIS product.txt — enough to hydrate via the API."""

    epa_reg_no: str  # "company-product", leading zeros stripped
    product_name: str
    status_flag: str  # 'F' (federal/active) or 'T' (transferred)
    rup_flag: str  # 'Y' or 'N'


def _parse_ppis_line(line: str) -> PpisRow | None:
    """Parse one 107-char PPIS product.txt row.

    Layout (1-indexed, inferred from inspection):
        1-6     company number (zero-padded, may contain trailing spaces)
        7-11    product number (zero-padded, may contain trailing spaces)
        33-102  product name (70 chars, space-padded)
        103     status flag ('F' or 'T')
        106     RUP flag ('Y' or 'N')

    Returns:
        The parsed row, or None when the line is shorter than 106
        characters or either number field is not made of plain ASCII
        digits.
    """
    if len(line) < 106:
        return None
    company_raw = line[0:6].strip()
    product_raw = line[6:11].strip()
    # Require plain digits up front: int() alone would also accept signs
    # and underscores ("+00524", "0005_4") and yield a bogus reg no.
    for field_text in (company_raw, product_raw):
        if not (field_text.isascii() and field_text.isdigit()):
            return None
    # Strip leading zeros for canonical EPA Reg No display
    company = str(int(company_raw))
    product = str(int(product_raw))
    return PpisRow(
        epa_reg_no=f"{company}-{product}",
        product_name=line[32:102].strip(),
        status_flag=line[102:103],
        # The length guard above guarantees column 106 exists.
        rup_flag=line[105:106],
    )
|
|
|
|
|
|
def fetch_ppis_index(client: httpx.Client) -> list[PpisRow]:
    """Download the PPIS product archive and return its parsed rows.

    The ``product.txt`` member is decoded as latin-1 one line at a time;
    lines that ``_parse_ppis_line`` rejects are dropped.
    """
    log.info("Fetching PPIS index from %s", PPIS_PRODUCT_ZIP_URL)
    archive_bytes = _get_with_retries(client, PPIS_PRODUCT_ZIP_URL).content
    with zipfile.ZipFile(io.BytesIO(archive_bytes)) as archive:
        with archive.open("product.txt") as member:
            decoded_lines = (
                raw.decode("latin-1").rstrip("\n").rstrip("\r") for raw in member
            )
            parsed = [
                row
                for row in map(_parse_ppis_line, decoded_lines)
                if row is not None
            ]
    log.info("Parsed %d rows from PPIS index", len(parsed))
    return parsed
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Hydration: PPLS JSON API
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def _zero_pad_regno(regno: str) -> str:
    """Zero-pad a reg no into its canonical filename form.

    ``524-475`` becomes ``000524-00475``; with a distributor suffix,
    ``524-475-12345`` becomes ``000524-00475-12345``. Any other number of
    dash-separated parts is returned unchanged.
    """
    widths_by_part_count = {2: (6, 5), 3: (6, 5, 5)}
    pieces = regno.split("-")
    widths = widths_by_part_count.get(len(pieces))
    if widths is None:
        return regno
    return "-".join(
        f"{int(piece):0{width}d}" for piece, width in zip(pieces, widths)
    )
|
|
|
|
|
|
_MONTHS = {
    "january": 1, "february": 2, "march": 3, "april": 4, "may": 5, "june": 6,
    "july": 7, "august": 8, "september": 9, "october": 10, "november": 11,
    "december": 12,
}


def _parse_label_date(text: str | None) -> str | None:
    """Convert 'October 18, 2016' to '2016-10-18'.

    Only the full English month name followed by ``D, YYYY`` is accepted
    (month name case-insensitive).

    Returns:
        The ISO 8601 date string, or None on any parse issue: empty or
        non-string input, unrecognized layout or month name, or a date
        that does not exist on the calendar (e.g. 'February 31, 2016').
    """
    if not text or not isinstance(text, str):
        return None
    m = re.match(r"^([A-Za-z]+)\s+(\d{1,2}),\s+(\d{4})$", text.strip())
    if not m:
        return None
    month = _MONTHS.get(m.group(1).lower())
    if month is None:
        return None
    try:
        # Building a real datetime validates the day/month/year combination;
        # plain string formatting would let impossible dates through.
        return datetime(int(m.group(3)), month, int(m.group(2))).date().isoformat()
    except ValueError:
        return None
|
|
|
|
|
|
def _http_date_to_iso(http_date: str | None) -> str | None:
|
|
"""RFC1123 'Wed, 19 Oct 2016 17:48:09 GMT' -> ISO 8601 UTC.
|
|
|
|
Returns None on unparseable input. Matches the canonical schema's
|
|
requirement that all timestamps be ISO 8601.
|
|
"""
|
|
if not http_date:
|
|
return None
|
|
try:
|
|
from email.utils import parsedate_to_datetime
|
|
dt = parsedate_to_datetime(http_date)
|
|
if dt.tzinfo is None:
|
|
dt = dt.replace(tzinfo=UTC)
|
|
return dt.astimezone(UTC).isoformat()
|
|
except Exception: # noqa: BLE001
|
|
return None
|
|
|
|
|
|
@dataclass
class ProductRecord:
    """One product as hydrated from the PPLS JSON API.

    Built by ``fetch_product_record``; every field except ``epa_reg_no`` is
    None (or empty) when the API returned no item for the reg no.
    """

    # Reg no exactly as it was passed to the API call.
    epa_reg_no: str
    # API item's "productname".
    product_name: str | None
    # "name" of the first "companyinfo" entry.
    registrant: str | None
    # First dash-separated segment of the reg no.
    registrant_company_number: str | None
    # Dicts with keys "name", "cas", "percent" and "pc_code".
    active_ingredients: list[dict[str, Any]]
    # Download URL of the selected label PDF: PPLS_PDF_BASE + "/" + filename.
    label_pdf_url: str | None
    # "pdffile" value of the selected "pdffiles" entry.
    label_pdf_filename: str | None
    # Accepted date of the selected PDF as YYYY-MM-DD, when parseable.
    label_accepted_date: str | None
    # API item's "product_status".
    registration_status: str | None
    # API item's "signal_word".
    signal_word: str | None
    # The untouched first API item; kept out of repr() to keep logs short.
    raw_api_item: dict[str, Any] | None = field(repr=False, default=None)
|
|
|
|
|
|
def fetch_product_record(client: httpx.Client, regno: str) -> ProductRecord:
    """Query the PPLS API for *regno* and map the answer to a ProductRecord.

    Only the first entry of the payload's ``"items"`` list is used. When
    that list is missing or empty, a record carrying just the reg no is
    returned. Among the item's ``"pdffiles"`` the entry with the latest
    parseable accepted date is selected as the label.
    """
    response = _get_with_retries(client, f"{PPLS_API_BASE}/{regno}", expect_json=True)
    items = response.json().get("items") or []
    if not items:
        return ProductRecord(
            epa_reg_no=regno,
            product_name=None,
            registrant=None,
            registrant_company_number=None,
            active_ingredients=[],
            label_pdf_url=None,
            label_pdf_filename=None,
            label_accepted_date=None,
            registration_status=None,
            signal_word=None,
            raw_api_item=None,
        )

    item = items[0]
    first_company = (item.get("companyinfo") or [{}])[0]
    registrant_name = first_company.get("name")
    company_number = regno.split("-")[0]
    ingredients = [
        {
            "name": ai.get("active_ing"),
            "cas": ai.get("cas_number"),
            "percent": ai.get("active_ing_percent"),
            "pc_code": ai.get("pc_code"),
        }
        for ai in item.get("active_ingredients") or []
    ]

    # The API returns PDFs in date-descending order, but pick the newest
    # explicitly rather than rely on that; entries whose date cannot be
    # parsed sort before every real date.
    pdf_candidates = item.get("pdffiles") or []
    newest: dict[str, Any] | None = None
    if pdf_candidates:
        newest = max(
            pdf_candidates,
            key=lambda entry: (
                _parse_label_date(entry.get("pdffile_accepted_date")) or "0000-00-00"
            ),
        )

    filename = newest.get("pdffile") if newest else None
    url = f"{PPLS_PDF_BASE}/{filename}" if filename else None
    accepted_on = (
        _parse_label_date(newest.get("pdffile_accepted_date")) if newest else None
    )
    return ProductRecord(
        epa_reg_no=regno,
        product_name=item.get("productname"),
        registrant=registrant_name,
        registrant_company_number=company_number,
        active_ingredients=ingredients,
        label_pdf_url=url,
        label_pdf_filename=filename,
        label_accepted_date=accepted_on,
        registration_status=item.get("product_status"),
        signal_word=item.get("signal_word"),
        raw_api_item=item,
    )
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# PDF download + text extraction
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def download_pdf(client: httpx.Client, url: str) -> tuple[bytes, str | None]:
    """Fetch a label PDF.

    Returns the response body together with the raw ``Last-Modified``
    header value, or None for the latter when the header is absent.
    """
    response = _get_with_retries(client, url)
    modified_header = response.headers.get("last-modified")
    payload = response.content
    return payload, modified_header
|
|
|
|
|
|
def extract_pdf_text(pdf_bytes: bytes) -> tuple[str, bool]:
    """Pull the text layer out of a PDF.

    Each page's text has runs of spaces/tabs collapsed to one space and
    three or more consecutive newlines reduced to two; non-empty pages are
    then joined with blank lines.

    Returns:
        ``(text, has_text_layer)``. When the PDF cannot be opened or no
        page yields any text, the result is ``("", False)``.
    """
    try:
        reader = PdfReader(io.BytesIO(pdf_bytes))
    except PdfReadError as exc:
        log.warning("pypdf failed to read PDF: %s", exc)
        return "", False

    def _clean_page(index: int, page: Any) -> str:
        # Extract and normalize a single page; a failing page counts as empty.
        try:
            raw_text = page.extract_text() or ""
        except Exception as exc:  # pypdf can throw on malformed pages
            log.warning("pypdf extract_text failed on page %d: %s", index, exc)
            raw_text = ""
        collapsed = re.sub(r"[ \t]+", " ", raw_text)
        return re.sub(r"\n{3,}", "\n\n", collapsed).strip()

    cleaned_pages = (
        _clean_page(index, page) for index, page in enumerate(reader.pages)
    )
    combined = "\n\n".join(text for text in cleaned_pages if text).strip()
    return combined, bool(combined)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Per-product processing
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def _md_path(regno: str) -> Path:
    """Return the location of the markdown file for *regno* in the corpus."""
    return CORPUS_DIR.joinpath(f"{regno}.md")
|
|
|
|
|
|
def _json_path(regno: str) -> Path:
    """Return the location of the JSON sidecar for *regno* in the corpus."""
    return CORPUS_DIR.joinpath(f"{regno}.json")
|
|
|
|
|
|
def process_one(
    client: httpx.Client,
    regno: str,
    *,
    force: bool = False,
    row_crop_filter: bool = True,
) -> str:
    """Fetch + extract one product and write its .md / .json pair.

    Args:
        client: HTTP client used for the API call and the PDF download.
        regno: EPA Reg No to process.
        force: Re-fetch even when both output files already exist.
        row_crop_filter: Drop products whose sites do not match a row crop.

    Returns:
        One of:
        'skipped'  — both files already on disk and *force* is False;
        'error'    — the API call or the PDF download failed;
        'filtered' — rejected by the row-crop filter;
        'no-pdf'   — the API lists no label PDF (placeholder files written);
        'wrote'    — label downloaded and both files written.

    Sleeps ``REQUEST_DELAY_SECONDS`` after each successful network call.
    """
    md_path = _md_path(regno)
    json_path = _json_path(regno)
    if not force and md_path.exists() and json_path.exists():
        log.info("[%s] skip (already on disk)", regno)
        return "skipped"

    try:
        record = fetch_product_record(client, regno)
    except Exception as exc:
        log.error("[%s] API fetch failed: %s", regno, exc)
        return "error"
    time.sleep(REQUEST_DELAY_SECONDS)

    if row_crop_filter and not matches_row_crop(record):
        log.info("[%s] filtered (not row-crop)", regno)
        return "filtered"

    def _build_sidecar(
        *,
        label_url: str | None,
        label_filename: str | None,
        label_last_modified_iso: str | None,
        page_count: int | None,
        text_layer: bool | None,
    ) -> dict[str, Any]:
        # Assemble the JSON sidecar; only the label-specific values differ
        # between the "no-pdf" and "wrote" outcomes.
        return {
            "source": "epa_ppls",
            "source_key": regno,
            "epa_reg_no": regno,
            "product_name": record.product_name,
            "product_class": None,  # EPA PPLS doesn't expose a clean class taxonomy
            "registrant": record.registrant,
            "active_ingredients": record.active_ingredients,
            "signal_word": record.signal_word,
            "label": {
                "url": label_url,
                "filename": label_filename,
                "accepted_date": record.label_accepted_date,
                "last_modified": label_last_modified_iso,
                "page_count": page_count,
                "text_layer": text_layer,
            },
            "supplemental_documents": [],  # EPA PPLS sidecar omits supplementals; query API per regno
            "source_urls": {
                "product_page": None,
                "label_api": f"{PPLS_API_BASE}/{regno}",
                "label_index": PPLS_INDEX_URL_TEMPLATE.format(regno=regno),
            },
            # EPA-specific extras (kept out of the strict canonical schema but
            # useful for joins back to EPA's data model)
            "registration_status": record.registration_status,
            "registrant_company_number": record.registrant_company_number,
            "fetched_at": datetime.now(UTC).isoformat(),
            "scraper_version": SCRAPER_VERSION,
        }

    if not record.label_pdf_url:
        log.warning("[%s] no label PDF available — writing sidecar only", regno)
        md_path.write_text(
            f"# {record.product_name or regno}\n\n"
            f"EPA Reg No: {regno}\n\n"
            "[NO LABEL PDF AVAILABLE FROM EPA PPLS]\n",
            encoding="utf-8",
        )
        sidecar = _build_sidecar(
            label_url=None, label_filename=None,
            label_last_modified_iso=None,
            page_count=None, text_layer=False,
        )
        json_path.write_text(json.dumps(sidecar, indent=2), encoding="utf-8")
        return "no-pdf"

    try:
        pdf_bytes, last_modified_raw = download_pdf(client, record.label_pdf_url)
    except Exception as exc:
        log.error("[%s] PDF download failed: %s", regno, exc)
        return "error"
    time.sleep(REQUEST_DELAY_SECONDS)

    text, has_text = extract_pdf_text(pdf_bytes)
    last_modified_iso = _http_date_to_iso(last_modified_raw)

    # Page count is best-effort: a PDF that cannot be parsed still gets its
    # files written, with page_count left as None — but say so in the log
    # instead of hiding the failure.
    page_count: int | None = None
    try:
        page_count = len(PdfReader(io.BytesIO(pdf_bytes)).pages)
    except Exception as exc:
        log.warning("[%s] could not determine page count: %s", regno, exc)

    sidecar = _build_sidecar(
        label_url=record.label_pdf_url,
        label_filename=record.label_pdf_filename,
        label_last_modified_iso=last_modified_iso,
        page_count=page_count,
        text_layer=has_text,
    )

    header_lines = [f"# {record.product_name or regno}", ""]
    header_lines.append(f"- EPA Reg No: **{regno}**")
    if record.registrant:
        header_lines.append(f"- Registrant: {record.registrant}")
    if record.signal_word:
        header_lines.append(f"- Signal word: {record.signal_word}")
    if record.active_ingredients:
        ai_strs = []
        for ai in record.active_ingredients:
            ai_name = ai.get("name")
            if not ai_name:
                continue
            ai_percent = ai.get("percent")
            # Omit the percentage when the API did not supply one rather
            # than printing "(None%)".
            ai_strs.append(
                f"{ai_name}" if ai_percent is None else f"{ai_name} ({ai_percent}%)"
            )
        if ai_strs:
            header_lines.append("- Active ingredients: " + "; ".join(ai_strs))
    if record.label_accepted_date:
        header_lines.append(f"- Label accepted: {record.label_accepted_date}")
    header_lines.append(f"- Source PDF: {record.label_pdf_url}")
    header_lines.append("")
    header_lines.append("---")
    header_lines.append("")

    if has_text:
        body = text
    else:
        body = "[SCANNED PDF — OCR REQUIRED]\n\nThis label has no extractable text layer."
        log.info("[%s] PDF has no text layer (scanned)", regno)

    # The trailing "" in header_lines makes the join end with a newline, so
    # the body starts on its own line right after the "---" rule.
    md_content = "\n".join(header_lines) + body + "\n"
    md_path.write_text(md_content, encoding="utf-8")
    json_path.write_text(json.dumps(sidecar, indent=2), encoding="utf-8")
    log.info(
        "[%s] wrote (text_layer=%s, pages=%s, name=%r)",
        regno, has_text, page_count, record.product_name,
    )
    return "wrote"
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# CLI
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def _iter_regnos(
    args: argparse.Namespace,
    client: httpx.Client,
) -> Iterable[str]:
    """Yield the reg nos selected by the CLI arguments.

    Sources are tried in order and the first one that applies wins:
    explicit ``--reg-no`` values, then the ``--seed-file`` lines (blank
    lines and ``#`` comments ignored), then the PPIS bulk index. Only the
    PPIS path applies the registrant allowlist and ``--limit``.
    """
    if args.reg_no:
        yield from args.reg_no
        return

    if args.seed_file:
        with open(args.seed_file, encoding="utf-8") as seed:
            entries = (raw.strip() for raw in seed)
            yield from (
                entry for entry in entries if entry and not entry.startswith("#")
            )
        return

    # Default: enumerate via PPIS bulk index
    ppis_rows = fetch_ppis_index(client)
    allowlist = load_registrant_allowlist() if args.registrant_filter else set()
    if not allowlist:
        log.info("registrant filter OFF: enumerating all PPIS active products")
    else:
        log.info("registrant filter ON: %d companies in allowlist", len(allowlist))

    emitted = 0
    rejected_by_registrant = 0
    for ppis_row in ppis_rows:
        # Transferred-out entries (status_flag 'T') are skipped: their
        # registration has moved to another company-product pairing.
        if ppis_row.status_flag == "T":
            continue
        # Pre-API filter: a product whose registrant is not on the row-crop
        # ag-chem allowlist never costs an API call. Bypass with
        # --no-registrant-filter.
        if allowlist and ppis_row.epa_reg_no.partition("-")[0] not in allowlist:
            rejected_by_registrant += 1
            continue
        yield ppis_row.epa_reg_no
        emitted += 1
        if args.limit and emitted >= args.limit:
            break

    if rejected_by_registrant:
        log.info("registrant filter skipped %d PPIS rows", rejected_by_registrant)
|
|
|
|
|
|
def main(argv: list[str] | None = None) -> int:
    """Command-line entry point: scrape the selected products.

    Args:
        argv: Argument list to parse; None lets argparse read sys.argv.

    Returns:
        Always 0. The per-outcome counts are logged and also printed to
        stderr as a JSON object.
    """
    parser = argparse.ArgumentParser(
        prog="python -m scrape.sources.epa_ppls",
        description="Scrape EPA PPLS pesticide labels into corpus/epa_ppls/.",
    )
    parser.add_argument(
        "--limit", type=int, default=None,
        help="Max products to process when enumerating from PPIS.",
    )
    parser.add_argument(
        "--force", action="store_true",
        help="Re-fetch even if .md/.json already exist.",
    )
    parser.add_argument(
        "--reg-no", action="append", metavar="REGNO",
        help="Process specific EPA Reg No (e.g. 524-475). Repeatable.",
    )
    parser.add_argument(
        "--seed-file", metavar="PATH",
        help="Text file with one EPA Reg No per line (# comments OK).",
    )
    # The help text lists exactly the crops in ROW_CROP_KEYWORDS; it used
    # to advertise cotton, rice and sorghum, which the filter never matched.
    parser.add_argument(
        "--row-crop-filter", action=argparse.BooleanOptionalAction, default=True,
        help="Keep only products with in-scope row-crop sites (corn/maize/"
             "popcorn, soybeans, wheat). Default on; use --no-row-crop-filter "
             "to scrape every PPLS product regardless of crop.",
    )
    parser.add_argument(
        "--registrant-filter", action=argparse.BooleanOptionalAction, default=True,
        help="Pre-API filter at PPIS enumeration time: only consider products "
             "whose company number is in scrape/sources/epa_registrant_allowlist.json "
             "(the major US row-crop ag-chem registrants). Default on, since most "
             "of the PPIS universe is non-ag. --no-registrant-filter to enumerate "
             "everything.",
    )
    parser.add_argument(
        "--log-level", default="INFO",
        choices=["DEBUG", "INFO", "WARNING", "ERROR"],
        help="Logging verbosity (default: INFO).",
    )
    args = parser.parse_args(argv)

    logging.basicConfig(
        stream=sys.stderr,
        level=getattr(logging, args.log_level),
        format="%(asctime)s %(levelname)s %(name)s: %(message)s",
    )

    CORPUS_DIR.mkdir(parents=True, exist_ok=True)

    # Pre-seeded so that every outcome appears in the summary even at zero.
    summary = {"wrote": 0, "skipped": 0, "no-pdf": 0, "filtered": 0, "error": 0}
    with _client() as client:
        for regno in _iter_regnos(args, client):
            result = process_one(
                client, regno,
                force=args.force,
                row_crop_filter=args.row_crop_filter,
            )
            summary[result] = summary.get(result, 0) + 1

    log.info("done: %s", summary)
    print(json.dumps(summary), file=sys.stderr)
    return 0
|
|
|
|
|
|
# Script entry point: main()'s return value becomes the process exit status.
if __name__ == "__main__":
    raise SystemExit(main())
|