60657aa6df
The farmer-advisor consumer only cares about US row crops, so the EPA
scraper now drops products without at least one row-crop site in the
PPLS API response. Filter is on by default; --no-row-crop-filter
overrides for one-off broader pulls.
Filter shape:
- Word-boundary regex match against each entry in the API's `sites`
array (e.g., "SOYBEANS (FOLIAR TREATMENT)" → keep, "SHIPS, BOATS,
SHIPHOLDS" → drop even though it contains "OATS" as substring).
- Allowlist covers the major US row + small-grain + oilseed + sugar/
fiber crops, plus alfalfa as a common rotation crop. See
ROW_CROP_KEYWORDS in scrape/sources/epa_ppls.py for the full list.
Cost model:
- 102K PPIS rows still need one API call each (no bulk filter
available upstream), so enumeration still takes ~28h at 1 req/sec.
- But PDF downloads drop from ~67K → ~5-10K (estimated row-crop
hit rate), saving ~17h wall time and ~60GB disk on a full backfill.
Smoke test (4 mixed reg nos):
524-475 Roundup Ultra → kept (CORN/SOYBEANS/COTTON sites)
524-591 Warrant → kept (CORN/SOYBEANS/SORGHUM sites)
100-1486 Advion Cockroach → filtered (building/transport sites only)
432-1276 (Bayer pet flea) → filtered (no row crops)
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
656 lines
23 KiB
Python
656 lines
23 KiB
Python
"""EPA PPLS (Pesticide Product Label System) scraper.
|
|
|
|
Enumeration strategy
|
|
====================
|
|
The PPLS Oracle APEX portal (ordspub.epa.gov/ords/pesticides/f?p=PPLS:1)
|
|
is session-stateful and hostile to enumeration, so we use a three-step
|
|
approach that bypasses APEX entirely:
|
|
|
|
1. **List products** via the public PPIS bulk download
|
|
``https://www3.epa.gov/pesticides/PPISdata/product.zip`` — a 107-char
|
|
fixed-width flat file (``product.txt``, ~102K active Section 3
|
|
registrations, refreshed every Tuesday). Gives us the universe of
|
|
EPA Reg Nos (company-product), plus the product name.
|
|
|
|
2. **Hydrate per product** via the PPLS REST data service at
|
|
``https://ordspub.epa.gov/ords/pesticides/cswu/ppls/{regno}`` —
|
|
returns rich JSON: registrant, active ingredients (with CAS + %),
|
|
formulations, status, signal word, AND a ``pdffiles`` array
|
|
listing every stamped label PDF EPA has accepted for the product.
|
|
The most recent entry gives us the canonical PDF filename
|
|
(``{company6}-{product5}-{YYYYMMDD}.pdf``), solving the
|
|
stamped-date-suffix problem without having to guess.
|
|
|
|
3. **Fetch label PDF** from
|
|
``https://www3.epa.gov/pesticides/chem_search/ppls/{filename}``
|
|
and extract text with pypdf. Many EPA labels are scans with no
|
|
text layer — those are flagged ``text_layer: false`` and the .md
|
|
body is a ``[SCANNED PDF — OCR REQUIRED]`` placeholder. OCR is
|
|
deferred to Phase 2.
|
|
|
|
Paths rejected and why
|
|
----------------------
|
|
- ``/ords/pesticides/ppls/{reg}`` (no ``/cswu/`` prefix): returns the
|
|
APEX HTML splash, not JSON. The undocumented ``/cswu/`` prefix is
|
|
the actual ORDS REST handler.
|
|
- Scraping the APEX UI: session-stateful, fragile, blocked.
|
|
- data.gov mirror: redirects to the same APEX page, no extract.
|
|
- NPIRS (Purdue): subscription-walled; PPIS is the same authoritative
|
|
feed anyway.
|
|
|
|
Required sidecar fields (per task spec): ``source``, ``epa_reg_no``,
|
|
``label_pdf_url``, ``fetched_at``. Everything else best-effort.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import io
|
|
import json
|
|
import logging
|
|
import re
|
|
import sys
|
|
import time
|
|
import zipfile
|
|
from dataclasses import dataclass, field
|
|
from datetime import UTC, datetime
|
|
from pathlib import Path
|
|
from typing import Any, Iterable
|
|
|
|
import httpx
|
|
from pypdf import PdfReader
|
|
from pypdf.errors import PdfReadError
|
|
|
|
# --- Scraper identity --------------------------------------------------------
# SCRAPER_VERSION is written into every sidecar; USER_AGENT is the User-Agent
# header attached to every outgoing request (see _client).
SCRAPER_VERSION = "0.1.0"
USER_AGENT = "ppls-docs-scraper/0.1 (+https://drawbar.example/contact)"

# --- EPA endpoints -----------------------------------------------------------
# PPIS bulk export: the zip holding the fixed-width product.txt listing.
PPIS_PRODUCT_ZIP_URL = "https://www3.epa.gov/pesticides/PPISdata/product.zip"
# ORDS REST handler that returns per-product JSON; callers append "/{regno}".
PPLS_API_BASE = "https://ordspub.epa.gov/ords/pesticides/cswu/ppls"
# Host directory of the stamped label PDFs; callers append "/{filename}".
PPLS_PDF_BASE = "https://www3.epa.gov/pesticides/chem_search/ppls"
# APEX page for a single product; only recorded in the sidecar, never fetched.
PPLS_INDEX_URL_TEMPLATE = (
    "https://ordspub.epa.gov/ords/pesticides/f?p=PPLS:102:::NO::P102_REG_NUM:{regno}"
)

# --- Output locations --------------------------------------------------------
# parents[2] climbs from this file's directory up two more levels.
REPO_ROOT = Path(__file__).resolve().parents[2]
CORPUS_DIR = REPO_ROOT.joinpath("corpus", "epa_ppls")

# --- Network behaviour -------------------------------------------------------
REQUEST_DELAY_SECONDS = 1.1  # polite: ~1 req/sec
HTTP_TIMEOUT = httpx.Timeout(60.0, connect=15.0)
MAX_RETRIES = 4
|
|
|
|
# Row-crop scoping. Each pattern is matched case-insensitively against a
|
|
# product's "sites" array from the PPLS API. Word boundaries matter — bare
|
|
# "OATS" naively matches "SHIPS, BOATS, SHIPHOLDS"; bare "RICE" matches
|
|
# "LICORICE"; bare "RYE" matches "FRYER".
|
|
#
|
|
# Scope = the major US row + small-grain + oilseed + sugar/fiber crops the
|
|
# farmer-advisor consumer cares about. Alfalfa included as a common rotation
|
|
# crop; sweet/seed corn included alongside field corn.
|
|
ROW_CROP_KEYWORDS = (
|
|
"CORN", "MAIZE", "POPCORN",
|
|
"SOYBEAN", "SOYBEANS",
|
|
"COTTON",
|
|
"WHEAT",
|
|
"RICE",
|
|
"SORGHUM", "MILO",
|
|
"BARLEY", "OATS", "RYE",
|
|
"SUNFLOWER", "SUNFLOWERS",
|
|
"PEANUT", "PEANUTS",
|
|
"SUGAR BEET", "SUGAR BEETS",
|
|
"DRY BEAN", "DRY BEANS", "FIELD BEAN", "FIELD BEANS",
|
|
"CANOLA", "RAPESEED",
|
|
"ALFALFA",
|
|
)
|
|
_ROW_CROP_PATTERNS = tuple(
|
|
re.compile(rf"\b{re.escape(kw)}\b", re.IGNORECASE)
|
|
for kw in ROW_CROP_KEYWORDS
|
|
)
|
|
|
|
|
|
def matches_row_crop(record: "ProductRecord") -> bool:
    """Report whether *record* lists at least one row-crop use site.

    Reads the ``sites`` array of the raw PPLS API item attached to the
    record and tests each entry against the word-bounded row-crop patterns
    (CORN, SOYBEANS, COTTON, etc.). A record with no raw API item, or with
    an empty or missing ``sites`` array, never matches.
    """
    api_item = record.raw_api_item or {}

    def _site_text(entry: Any) -> str:
        # Dict entries carry the name under "site"; anything else is
        # stringified as-is.
        if isinstance(entry, dict):
            return entry.get("site") or ""
        return str(entry)

    site_texts = (_site_text(entry) for entry in api_item.get("sites") or [])
    return any(
        pattern.search(text)
        for text in site_texts
        for pattern in _ROW_CROP_PATTERNS
    )
|
|
|
|
# Module-level logger used throughout this file; main() installs the handler
# and level through logging.basicConfig.
log = logging.getLogger("epa_ppls")
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# HTTP helpers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def _client() -> httpx.Client:
    """Build the HTTP client used for every EPA request.

    The client sends ``USER_AGENT``, advertises gzip/deflate support,
    applies ``HTTP_TIMEOUT``, and follows redirects. The caller owns the
    returned client and is responsible for closing it.
    """
    default_headers = {
        "User-Agent": USER_AGENT,
        "Accept-Encoding": "gzip, deflate",
    }
    return httpx.Client(
        headers=default_headers,
        timeout=HTTP_TIMEOUT,
        follow_redirects=True,
    )
|
|
|
|
|
|
# Status codes worth retrying: rate limiting plus transient server errors.
_RETRYABLE_STATUS_CODES = frozenset({429, 500, 502, 503, 504})


def _get_with_retries(
    client: httpx.Client, url: str, *, expect_json: bool = False
) -> httpx.Response:
    """GET *url*, retrying transient failures with exponential backoff.

    Retried (up to ``MAX_RETRIES`` attempts in total):
      - HTTP 429/500/502/503/504,
      - ``httpx.HTTPError`` raised while sending the request,
      - when ``expect_json`` is set, a successful response whose
        content-type is not JSON (ORDS sometimes serves an HTML error page
        with status 200).

    Not retried: any other error status. Repeating e.g. a 404 cannot
    succeed, so it fails immediately instead of burning the backoff budget.

    Args:
        client: the httpx client to issue the request with.
        url: absolute URL to fetch.
        expect_json: require a JSON content-type on the response.

    Returns:
        The first acceptable response.

    Raises:
        RuntimeError: on a non-retryable HTTP status, or when every attempt
            failed. The underlying httpx exception, if any, is chained.
    """
    failure = "no attempts were made"
    cause: Exception | None = None
    for attempt in range(1, MAX_RETRIES + 1):
        try:
            resp = client.get(url)
        except httpx.HTTPError as exc:
            cause = exc
            failure = f"Network error ({exc})"
        else:
            cause = None
            if resp.status_code in _RETRYABLE_STATUS_CODES:
                failure = f"HTTP {resp.status_code}"
            else:
                try:
                    resp.raise_for_status()
                except httpx.HTTPStatusError as exc:
                    raise RuntimeError(
                        f"GET {url} failed with non-retryable "
                        f"HTTP {resp.status_code}"
                    ) from exc
                if not expect_json:
                    return resp
                # ORDS sometimes returns text/html error pages with 200.
                ctype = resp.headers.get("content-type", "")
                if "json" in ctype.lower():
                    return resp
                failure = f"Expected JSON, got content-type={ctype!r}"

        # Only retryable failures reach this point.
        if attempt < MAX_RETRIES:
            wait = min(2 ** attempt, 30)
            log.warning(
                "%s on %s (attempt %d/%d) — sleeping %ds",
                failure, url, attempt, MAX_RETRIES, wait,
            )
            time.sleep(wait)
        else:
            # No point sleeping when there is no next attempt.
            log.warning(
                "%s on %s (attempt %d/%d) — giving up",
                failure, url, attempt, MAX_RETRIES,
            )
    raise RuntimeError(
        f"GET {url} failed after {MAX_RETRIES} attempts: {failure}"
    ) from cause
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Enumeration: PPIS bulk product.zip
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
@dataclass
class PpisRow:
    """A single parsed record from PPIS ``product.txt``.

    Holds just enough to hydrate the product through the PPLS API.
    """

    # Registration number in "company-product" form.
    epa_reg_no: str
    # Product name as it appears in the PPIS row.
    product_name: str
    # 'F' (federal/active) or 'T' (transferred).
    status_flag: str
    # RUP flag: 'Y' or 'N'.
    rup_flag: str
|
|
|
|
|
|
def _parse_ppis_line(line: str) -> PpisRow | None:
|
|
"""Parse one 107-char PPIS product.txt row.
|
|
|
|
Layout (1-indexed, inferred from inspection):
|
|
1-6 company number (zero-padded, may contain trailing spaces)
|
|
7-11 product number (zero-padded, may contain trailing spaces)
|
|
33-102 product name (70 chars, space-padded)
|
|
103 status flag ('F' or 'T')
|
|
106 RUP flag ('Y' or 'N')
|
|
"""
|
|
if len(line) < 106:
|
|
return None
|
|
company_raw = line[0:6].strip()
|
|
product_raw = line[6:11].strip()
|
|
if not company_raw or not product_raw:
|
|
return None
|
|
# Strip leading zeros for canonical EPA Reg No display
|
|
try:
|
|
company = str(int(company_raw))
|
|
product = str(int(product_raw))
|
|
except ValueError:
|
|
return None
|
|
name = line[32:102].strip()
|
|
status_flag = line[102:103]
|
|
rup_flag = line[105:106] if len(line) > 105 else "N"
|
|
return PpisRow(
|
|
epa_reg_no=f"{company}-{product}",
|
|
product_name=name,
|
|
status_flag=status_flag,
|
|
rup_flag=rup_flag,
|
|
)
|
|
|
|
|
|
def fetch_ppis_index(client: httpx.Client) -> list[PpisRow]:
    """Fetch the PPIS bulk archive and return its parseable product rows.

    Downloads ``product.zip`` into memory, reads ``product.txt`` from it
    line by line (decoded as latin-1, with the trailing newline and
    carriage return removed), and keeps every line that
    ``_parse_ppis_line`` accepts.
    """
    log.info("Fetching PPIS index from %s", PPIS_PRODUCT_ZIP_URL)
    archive_bytes = _get_with_retries(client, PPIS_PRODUCT_ZIP_URL).content
    with zipfile.ZipFile(io.BytesIO(archive_bytes)) as archive:
        with archive.open("product.txt") as member:
            candidates = (
                _parse_ppis_line(raw.decode("latin-1").rstrip("\n").rstrip("\r"))
                for raw in member
            )
            # Consume the generator while the archive member is still open.
            rows = [row for row in candidates if row is not None]
    log.info("Parsed %d rows from PPIS index", len(rows))
    return rows
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Hydration: PPLS JSON API
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def _zero_pad_regno(regno: str) -> str:
|
|
"""524-475 -> 000524-00475 (canonical filename form). Distributor suffix
|
|
(524-475-12345) -> 000524-00475-12345."""
|
|
parts = regno.split("-")
|
|
if len(parts) == 2:
|
|
c, p = parts
|
|
return f"{int(c):06d}-{int(p):05d}"
|
|
if len(parts) == 3:
|
|
c, p, d = parts
|
|
return f"{int(c):06d}-{int(p):05d}-{int(d):05d}"
|
|
return regno
|
|
|
|
|
|
_MONTHS = {
|
|
"january": 1, "february": 2, "march": 3, "april": 4, "may": 5, "june": 6,
|
|
"july": 7, "august": 8, "september": 9, "october": 10, "november": 11,
|
|
"december": 12,
|
|
}
|
|
|
|
|
|
def _parse_label_date(text: str | None) -> str | None:
|
|
"""'October 18, 2016' -> '2016-10-18'. Returns None on any parse issue."""
|
|
if not text:
|
|
return None
|
|
m = re.match(r"^([A-Za-z]+)\s+(\d{1,2}),\s+(\d{4})$", text.strip())
|
|
if not m:
|
|
return None
|
|
month = _MONTHS.get(m.group(1).lower())
|
|
if month is None:
|
|
return None
|
|
try:
|
|
return f"{int(m.group(3)):04d}-{month:02d}-{int(m.group(2)):02d}"
|
|
except ValueError:
|
|
return None
|
|
|
|
|
|
def _http_date_to_iso(http_date: str | None) -> str | None:
|
|
"""RFC1123 'Wed, 19 Oct 2016 17:48:09 GMT' -> ISO 8601 UTC.
|
|
|
|
Returns None on unparseable input. Matches the canonical schema's
|
|
requirement that all timestamps be ISO 8601.
|
|
"""
|
|
if not http_date:
|
|
return None
|
|
try:
|
|
from email.utils import parsedate_to_datetime
|
|
dt = parsedate_to_datetime(http_date)
|
|
if dt.tzinfo is None:
|
|
dt = dt.replace(tzinfo=UTC)
|
|
return dt.astimezone(UTC).isoformat()
|
|
except Exception: # noqa: BLE001
|
|
return None
|
|
|
|
|
|
@dataclass
class ProductRecord:
    """Normalized view of one product as returned by the PPLS API.

    Built by ``fetch_product_record``. Apart from ``epa_reg_no``, fields are
    None (or an empty list) when the API returned nothing for them.
    """

    # Registration number exactly as it was requested.
    epa_reg_no: str
    product_name: str | None
    registrant: str | None
    # Company portion of the reg no (the text before the first dash).
    registrant_company_number: str | None
    # One dict per ingredient with keys "name", "cas", "percent", "pc_code".
    active_ingredients: list[dict[str, Any]]
    # URL and filename of the chosen label PDF.
    label_pdf_url: str | None
    label_pdf_filename: str | None
    # Acceptance date of that PDF as YYYY-MM-DD, when it could be parsed.
    label_accepted_date: str | None
    registration_status: str | None
    signal_word: str | None
    # The unmodified API item; excluded from repr().
    raw_api_item: dict[str, Any] | None = field(default=None, repr=False)
|
|
|
|
|
|
def fetch_product_record(client: httpx.Client, regno: str) -> ProductRecord:
    """Hydrate one EPA Reg No from the PPLS API into a ProductRecord.

    Uses the first entry of the response's ``items`` array. When the array
    is empty or missing, returns a record carrying only ``epa_reg_no``.
    The label PDF is the ``pdffiles`` entry with the latest parseable
    accepted date; entries whose date cannot be parsed sort last.

    Raises:
        Whatever ``_get_with_retries`` or JSON decoding raises.
    """
    api_url = f"{PPLS_API_BASE}/{regno}"
    payload = _get_with_retries(client, api_url, expect_json=True).json()
    matches = payload.get("items") or []
    if not matches:
        # No data for this reg no: hand back an otherwise empty record.
        return ProductRecord(
            epa_reg_no=regno,
            product_name=None,
            registrant=None,
            registrant_company_number=None,
            active_ingredients=[],
            label_pdf_url=None,
            label_pdf_filename=None,
            label_accepted_date=None,
            registration_status=None,
            signal_word=None,
            raw_api_item=None,
        )

    item = matches[0]
    company_entries = item.get("companyinfo") or [{}]
    registrant = company_entries[0].get("name")
    company_num = regno.split("-")[0]

    ingredients = [
        {
            "name": ai.get("active_ing"),
            "cas": ai.get("cas_number"),
            "percent": ai.get("active_ing_percent"),
            "pc_code": ai.get("pc_code"),
        }
        for ai in item.get("active_ingredients") or []
    ]

    # The API returns PDFs newest-first, but pick the newest explicitly
    # rather than trusting that order. ISO dates compare correctly as
    # strings; undated entries get a sentinel that sorts below any real date.
    newest: dict[str, Any] | None = None
    candidates = item.get("pdffiles") or []
    if candidates:
        newest = max(
            candidates,
            key=lambda entry: (
                _parse_label_date(entry.get("pdffile_accepted_date")) or "0000-00-00"
            ),
        )

    if newest:
        pdf_filename = newest.get("pdffile")
        accepted = _parse_label_date(newest.get("pdffile_accepted_date"))
    else:
        pdf_filename = None
        accepted = None
    pdf_url = f"{PPLS_PDF_BASE}/{pdf_filename}" if pdf_filename else None

    return ProductRecord(
        epa_reg_no=regno,
        product_name=item.get("productname"),
        registrant=registrant,
        registrant_company_number=company_num,
        active_ingredients=ingredients,
        label_pdf_url=pdf_url,
        label_pdf_filename=pdf_filename,
        label_accepted_date=accepted,
        registration_status=item.get("product_status"),
        signal_word=item.get("signal_word"),
        raw_api_item=item,
    )
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# PDF download + text extraction
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def download_pdf(client: httpx.Client, url: str) -> tuple[bytes, str | None]:
    """Fetch a label PDF.

    Returns the raw response body together with the value of the
    ``Last-Modified`` response header, or None when that header is absent.
    """
    response = _get_with_retries(client, url)
    return response.content, response.headers.get("last-modified")
|
|
|
|
|
|
def extract_pdf_text(pdf_bytes: bytes) -> tuple[str, bool]:
    """Extract the text layer from a PDF.

    Each page is extracted independently. Within a page, runs of spaces and
    tabs collapse to a single space and three or more newlines collapse to
    one blank line. Non-empty pages are joined with a blank line.

    Args:
        pdf_bytes: the complete PDF file contents.

    Returns:
        ``(text, has_text_layer)``. The result is ``("", False)`` when the
        document cannot be opened, when its page list cannot be read, or
        when no page yields any text.
    """
    try:
        # Build the page list inside the guard: pypdf can raise while the
        # page tree is walked, not only in the constructor, and such an
        # error must not escape and abort the whole scrape.
        pages = list(PdfReader(io.BytesIO(pdf_bytes)).pages)
    except Exception as exc:  # noqa: BLE001 — extraction is best-effort
        log.warning("pypdf failed to read PDF: %s", exc)
        return "", False

    chunks: list[str] = []
    for i, page in enumerate(pages):
        try:
            page_text = page.extract_text() or ""
        except Exception as exc:  # pypdf can throw on malformed pages
            log.warning("pypdf extract_text failed on page %d: %s", i, exc)
            page_text = ""
        page_text = re.sub(r"[ \t]+", " ", page_text)
        page_text = re.sub(r"\n{3,}", "\n\n", page_text).strip()
        if page_text:
            chunks.append(page_text)

    combined = "\n\n".join(chunks).strip()
    return combined, bool(combined)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Per-product processing
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def _md_path(regno: str) -> Path:
    """Return the corpus location of the Markdown body for *regno*."""
    return CORPUS_DIR.joinpath(f"{regno}.md")
|
|
|
|
|
|
def _json_path(regno: str) -> Path:
    """Return the corpus location of the JSON sidecar for *regno*."""
    return CORPUS_DIR.joinpath(f"{regno}.json")
|
|
|
|
|
|
def _format_ingredient(ai: dict[str, Any]) -> str:
    """Render one active ingredient as ``name (percent%)`` for the header.

    The percentage is left out when the record has none, rather than
    printing a literal ``(None%)``.
    """
    name = ai.get("name")
    percent = ai.get("percent")
    if percent is None:
        return f"{name}"
    return f"{name} ({percent}%)"


def _render_markdown_header(record: ProductRecord, regno: str) -> str:
    """Build the Markdown header (title, metadata bullets, rule) for a label.

    The returned text ends with the ``---`` rule and a newline, so the label
    body can be appended directly.
    """
    lines = [
        f"# {record.product_name or regno}",
        "",
        f"- EPA Reg No: **{regno}**",
    ]
    if record.registrant:
        lines.append(f"- Registrant: {record.registrant}")
    if record.signal_word:
        lines.append(f"- Signal word: {record.signal_word}")
    ingredient_strs = [
        _format_ingredient(ai)
        for ai in record.active_ingredients
        if ai.get("name")
    ]
    if ingredient_strs:
        lines.append("- Active ingredients: " + "; ".join(ingredient_strs))
    if record.label_accepted_date:
        lines.append(f"- Label accepted: {record.label_accepted_date}")
    lines.append(f"- Source PDF: {record.label_pdf_url}")
    lines.extend(["", "---", ""])
    return "\n".join(lines)


def process_one(
    client: httpx.Client,
    regno: str,
    *,
    force: bool = False,
    row_crop_filter: bool = True,
) -> str:
    """Fetch, filter, and persist one product.

    Steps, in order:
      1. Skip when both the ``.md`` and ``.json`` already exist (unless
         ``force``).
      2. Hydrate the product from the PPLS API.
      3. Drop it when ``row_crop_filter`` is on and no row-crop site matches.
      4. With no label PDF, write a placeholder ``.md`` plus the sidecar.
      5. Otherwise download the PDF, extract its text, and write both files.

    ``REQUEST_DELAY_SECONDS`` is slept after each successful network call.

    Args:
        client: shared HTTP client.
        regno: EPA Reg No, e.g. ``524-475``.
        force: re-fetch even if output already exists.
        row_crop_filter: apply the row-crop site filter.

    Returns:
        One of 'skipped' | 'wrote' | 'no-pdf' | 'error' | 'filtered'.
        API and download failures are logged and reported as 'error'
        rather than raised.
    """
    md_path = _md_path(regno)
    json_path = _json_path(regno)
    if not force and md_path.exists() and json_path.exists():
        log.info("[%s] skip (already on disk)", regno)
        return "skipped"

    try:
        record = fetch_product_record(client, regno)
    except Exception as exc:
        log.error("[%s] API fetch failed: %s", regno, exc)
        return "error"
    time.sleep(REQUEST_DELAY_SECONDS)

    if row_crop_filter and not matches_row_crop(record):
        log.info("[%s] filtered (not row-crop)", regno)
        return "filtered"

    def _build_sidecar(
        *,
        label_url: str | None,
        label_filename: str | None,
        label_last_modified_iso: str | None,
        page_count: int | None,
        text_layer: bool | None,
    ) -> dict[str, Any]:
        # Closes over regno and record; only the label-specific values vary
        # between the no-PDF and the with-PDF sidecar.
        return {
            "source": "epa_ppls",
            "source_key": regno,
            "epa_reg_no": regno,
            "product_name": record.product_name,
            "product_class": None,  # EPA PPLS doesn't expose a clean class taxonomy
            "registrant": record.registrant,
            "active_ingredients": record.active_ingredients,
            "signal_word": record.signal_word,
            "label": {
                "url": label_url,
                "filename": label_filename,
                "accepted_date": record.label_accepted_date,
                "last_modified": label_last_modified_iso,
                "page_count": page_count,
                "text_layer": text_layer,
            },
            "supplemental_documents": [],  # EPA PPLS sidecar omits supplementals; query API per regno
            "source_urls": {
                "product_page": None,
                "label_api": f"{PPLS_API_BASE}/{regno}",
                "label_index": PPLS_INDEX_URL_TEMPLATE.format(regno=regno),
            },
            # EPA-specific extras (kept out of the strict canonical schema but
            # useful for joins back to EPA's data model)
            "registration_status": record.registration_status,
            "registrant_company_number": record.registrant_company_number,
            "fetched_at": datetime.now(UTC).isoformat(),
            "scraper_version": SCRAPER_VERSION,
        }

    if not record.label_pdf_url:
        log.warning("[%s] no label PDF available — writing sidecar only", regno)
        md_path.write_text(
            f"# {record.product_name or regno}\n\n"
            f"EPA Reg No: {regno}\n\n"
            "[NO LABEL PDF AVAILABLE FROM EPA PPLS]\n",
            encoding="utf-8",
        )
        sidecar = _build_sidecar(
            label_url=None,
            label_filename=None,
            label_last_modified_iso=None,
            page_count=None,
            text_layer=False,
        )
        json_path.write_text(json.dumps(sidecar, indent=2), encoding="utf-8")
        return "no-pdf"

    try:
        pdf_bytes, last_modified_raw = download_pdf(client, record.label_pdf_url)
    except Exception as exc:
        log.error("[%s] PDF download failed: %s", regno, exc)
        return "error"
    time.sleep(REQUEST_DELAY_SECONDS)

    text, has_text = extract_pdf_text(pdf_bytes)
    last_modified_iso = _http_date_to_iso(last_modified_raw)

    # Page count is best-effort metadata: record None when pypdf cannot open
    # the file, but leave a trace instead of swallowing the error silently.
    page_count: int | None = None
    try:
        page_count = len(PdfReader(io.BytesIO(pdf_bytes)).pages)
    except Exception as exc:  # noqa: BLE001
        log.debug("[%s] could not count PDF pages: %s", regno, exc)

    sidecar = _build_sidecar(
        label_url=record.label_pdf_url,
        label_filename=record.label_pdf_filename,
        label_last_modified_iso=last_modified_iso,
        page_count=page_count,
        text_layer=has_text,
    )

    if has_text:
        body = text
    else:
        body = "[SCANNED PDF — OCR REQUIRED]\n\nThis label has no extractable text layer."
        log.info("[%s] PDF has no text layer (scanned)", regno)

    md_content = _render_markdown_header(record, regno) + body + "\n"
    md_path.write_text(md_content, encoding="utf-8")
    json_path.write_text(json.dumps(sidecar, indent=2), encoding="utf-8")
    log.info(
        "[%s] wrote (text_layer=%s, pages=%s, name=%r)",
        regno, has_text, page_count, record.product_name,
    )
    return "wrote"
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# CLI
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def _iter_regnos(
|
|
args: argparse.Namespace,
|
|
client: httpx.Client,
|
|
) -> Iterable[str]:
|
|
"""Yield reg nos to process based on CLI args."""
|
|
if args.reg_no:
|
|
for r in args.reg_no:
|
|
yield r
|
|
return
|
|
if args.seed_file:
|
|
with open(args.seed_file, encoding="utf-8") as fh:
|
|
for raw in fh:
|
|
line = raw.strip()
|
|
if not line or line.startswith("#"):
|
|
continue
|
|
yield line
|
|
return
|
|
# Default: enumerate via PPIS bulk index
|
|
rows = fetch_ppis_index(client)
|
|
count = 0
|
|
for row in rows:
|
|
# Skip transferred-out (status_flag 'T') entries by default; their
|
|
# registration has moved to another company-product pairing.
|
|
if row.status_flag == "T":
|
|
continue
|
|
yield row.epa_reg_no
|
|
count += 1
|
|
if args.limit and count >= args.limit:
|
|
return
|
|
|
|
|
|
def main(argv: list[str] | None = None) -> int:
    """Command-line entry point: parse options, scrape, and report.

    Configures logging on stderr, makes sure the corpus directory exists,
    runs ``process_one`` for every selected reg no, then logs the
    per-outcome counts and prints them as JSON on stderr.

    Args:
        argv: argument list; None lets argparse read ``sys.argv``.

    Returns:
        Always 0.
    """
    parser = argparse.ArgumentParser(
        prog="python -m scrape.sources.epa_ppls",
        description="Scrape EPA PPLS pesticide labels into corpus/epa_ppls/.",
    )
    add_option = parser.add_argument
    add_option(
        "--limit",
        type=int,
        default=None,
        help="Max products to process when enumerating from PPIS.",
    )
    add_option(
        "--force",
        action="store_true",
        help="Re-fetch even if .md/.json already exist.",
    )
    add_option(
        "--reg-no",
        action="append",
        metavar="REGNO",
        help="Process specific EPA Reg No (e.g. 524-475). Repeatable.",
    )
    add_option(
        "--seed-file",
        metavar="PATH",
        help="Text file with one EPA Reg No per line (# comments OK).",
    )
    add_option(
        "--row-crop-filter",
        action=argparse.BooleanOptionalAction,
        default=True,
        help="Keep only products with row-crop sites (corn, soy, cotton, "
             "wheat, rice, sorghum, etc.). Default on; use --no-row-crop-filter "
             "to scrape every PPLS product regardless of crop.",
    )
    add_option(
        "--log-level",
        default="INFO",
        choices=["DEBUG", "INFO", "WARNING", "ERROR"],
    )
    options = parser.parse_args(argv)

    logging.basicConfig(
        stream=sys.stderr,
        level=getattr(logging, options.log_level),
        format="%(asctime)s %(levelname)s %(name)s: %(message)s",
    )

    CORPUS_DIR.mkdir(parents=True, exist_ok=True)

    # Pre-seed every known outcome so the report always lists all of them.
    summary = dict.fromkeys(("wrote", "skipped", "no-pdf", "filtered", "error"), 0)
    with _client() as client:
        for regno in _iter_regnos(options, client):
            outcome = process_one(
                client,
                regno,
                force=options.force,
                row_crop_filter=options.row_crop_filter,
            )
            summary[outcome] = summary.get(outcome, 0) + 1

    log.info("done: %s", summary)
    print(json.dumps(summary), file=sys.stderr)
    return 0
|
|
|
|
|
|
if __name__ == "__main__":
    # Use main()'s return value as the process exit status.
    raise SystemExit(main())
|