From e9250de8e799781e690de593f5917364d1d766ca Mon Sep 17 00:00:00 2001 From: Justin Paul Date: Sat, 23 May 2026 18:27:07 -0400 Subject: [PATCH] =?UTF-8?q?scrape:=20Phase=201=20=E2=80=94=20Bayer=20+=20E?= =?UTF-8?q?PA=20PPLS=20scrapers=20with=20unified=20label=20schema?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adapts the docs-mcp-template scraping layer for the pesticide-labels domain. The template's bundle/version/platform concepts don't map to labels (there's no "Bayer 8.1.0" — there's just the current accepted label per EPA Reg No), so the scraper layer is reshaped around a "source" abstraction: one source per manufacturer or regulator, one per-product label per source. Sources shipped: - bayer — Bayer Crop Science US (Next.js JSON catalog + Scene7 PDFs) - epa_ppls — EPA PPLS via PPIS bulk index + undocumented /cswu/ ORDS REST endpoint Canonical sidecar schema (see scrape/README.md) unifies fields across sources: - active_ingredients always [{name, cas, percent}] - label/* nested (url, filename, accepted_date, last_modified, page_count, text_layer) - all timestamps normalized to ISO 8601 UTC - signal_word surfaced (operationally critical for the farmer advisor) - source_key + epa_reg_no separate per-source PK from the cross-source join key bundles.json → sources.json. --bundle → --source. The runner walks sources.json and dispatches by id; per-source modules remain independently runnable for development. PLAN.md gets a one-block domain note up front; later phases (chunking, embeddings, retrieval, eval) still apply as written. 
Smoke test: python -m scrape.runner --all --limit 2 # works python -m scrape.runner --source bayer --limit 3 # 3 written, idempotent re-run skips python -m scrape.runner --source epa_ppls --reg-no 524-475 # Roundup Ultra, 167 pages, ISO last_modified Co-Authored-By: Claude Opus 4.7 (1M context) --- .gitignore | 1 + PLAN.md | 13 + requirements.txt | 1 + scrape/README.md | 159 ++++++--- scrape/runner.py | 87 +++++ scrape/sources/__init__.py | 0 scrape/sources/bayer.py | 696 +++++++++++++++++++++++++++++++++++++ scrape/sources/epa_ppls.py | 599 +++++++++++++++++++++++++++++++ sources.json | 20 ++ 9 files changed, 1531 insertions(+), 45 deletions(-) create mode 100644 scrape/runner.py create mode 100644 scrape/sources/__init__.py create mode 100644 scrape/sources/bayer.py create mode 100644 scrape/sources/epa_ppls.py create mode 100644 sources.json diff --git a/.gitignore b/.gitignore index fbc0883..0597037 100644 --- a/.gitignore +++ b/.gitignore @@ -29,3 +29,4 @@ var/ .vscode/ .idea/ *.swp +.claude/ diff --git a/PLAN.md b/PLAN.md index e369d88..369c109 100644 --- a/PLAN.md +++ b/PLAN.md @@ -9,6 +9,19 @@ any LLM client (Claude Desktop, Claude Code, Cursor, Copilot) can call to answer questions against the docs, surface what changed recently, and flag likely inconsistencies. +> **Domain note for ppls-docs.** This template was originally written +> for versioned software product documentation (Zoomin bundles, Hugo +> sites, etc.). For ppls-docs the domain is pesticide product labels — +> the "bundle" abstraction has been replaced with "source" +> (manufacturer or regulator), and "page" with "product label". The +> canonical on-disk schema lives in [`scrape/README.md`](scrape/README.md), +> not in this document. References below to `bundles.json`, `bundle_id`, +> `--bundle`, `version`, and `platform` are template artifacts — read +> them as `sources.json`, `source_id`, `--source`, and (mostly) +> not-applicable. 
Phase 1 (scraper) is the most heavily adapted; later +> phases (chunking, embeddings, retrieval, eval) apply largely as +> written. + --- ## What you're building diff --git a/requirements.txt b/requirements.txt index b9982a9..e5873e0 100644 --- a/requirements.txt +++ b/requirements.txt @@ -10,6 +10,7 @@ ollama>=0.4.0 # if using Ollama-hosted embedder; swap if not # Scraping (Phase 1; adjust per product) beautifulsoup4>=4.12 requests>=2.31 +pypdf>=4.0 # PDF -> text for label extraction # playwright>=1.40 # uncomment if you need headless browser fallback # Evaluation diff --git a/scrape/README.md b/scrape/README.md index 44d6df3..99d8ae7 100644 --- a/scrape/README.md +++ b/scrape/README.md @@ -1,59 +1,128 @@ # scrape/ -Product-specific. **You implement this for each product.** The -template gives you the contract; the extraction logic depends on -the upstream doc portal. +Per-source scrapers for pesticide / herbicide product labels. Each +module under `scrape/sources/` pulls a single upstream catalog and +writes its results into `corpus/<source>/` using the canonical +sidecar schema documented below. -See `PLAN.md` Phase 1 for the corpus layout the rest of the pipeline -expects. +## Architecture -## What you write - -At minimum, two scripts: - -### `scrape/bundles.py` - -Discovers the upstream portal's bundle catalog and writes -`bundles.json` at the repo root. One entry per bundle (versioned doc -set) with the schema in PLAN.md. - -```bash -python -m scrape.bundles +``` +sources.json — registry of active sources +scrape/runner.py — thin dispatcher (--source | --all) +scrape/sources/<source>.py — one source per file +corpus/<source>/<source_key>.md — extracted label text (markdown) +corpus/<source>/<source_key>.json — canonical metadata sidecar ``` -### `scrape/runner.py` +`<source_key>` is the per-source primary key — a slug for manufacturer +sources (e.g. `warrant`, `roundup-powermax-3`) or an EPA Reg No +for regulator sources (e.g. `524-475`).
The sidecar's +`epa_reg_no` field is the cross-source join key that lets the +corpus consumer reconcile records from different sources for the +same product. -Scrapes the pages of each bundle (or a single bundle with `--bundle -`). Writes: - -- `corpus//.md` — extracted markdown body -- `corpus//.json` — per-page metadata sidecar +## CLI ```bash -python -m scrape.runner --all --force --concurrency 6 -python -m scrape.runner --bundle Admin.VC.HTML.10.9 +# Run a single source +python -m scrape.runner --source bayer --limit 20 +python -m scrape.runner --source epa_ppls --reg-no 524-475 + +# Run every source registered in sources.json +python -m scrape.runner --all --limit 50 + +# Per-source modules also run standalone +python -m scrape.sources.bayer --class herbicide --limit 5 +python -m scrape.sources.epa_ppls --seed-file seeds.txt ``` -## Tips +Every scraper is **idempotent** by default — re-running with the +same arguments skips records already on disk. Use `--force` to +re-fetch. -- **Sniff before you scrape.** Almost every modern doc portal is an - SPA that calls a backend API. Open the browser's Network tab, - click around, find the underlying JSON. Scraping the API is 10× - cheaper and 100× more reliable than scraping the rendered HTML. -- **Idempotent re-scrapes.** Without `--force`, the runner should - skip pages already on disk so a resume doesn't have to re-fetch - everything. With `--force`, re-fetch every page — that's the - weekly cron mode that catches edits. -- **Respect the portal.** Backoff on 429s. Set a recognizable - user-agent so the portal owner can identify you if they want to. -- **Whitespace normalize.** Markdown that round-trips through HTML - often has extra blank lines. Normalize to a single blank between - paragraphs so diffs are clean (the changelog summary and digest - tools care about line counts). +## Canonical sidecar schema -## What's already reusable +Every `corpus/<source>/<source_key>.json` conforms to this shape.
Fields +that don't apply to a given source are `null` (not omitted) so the +JSON is uniform across sources. -`scrape/changelog.py` is fully product-agnostic and ready to use -as-is. It walks `git diff --name-status` output to produce a -structured summary, and walks `git log` for the digest history -(Phase 13). +```json +{ + "source": "bayer", + "source_key": "warrant", + "epa_reg_no": "524-591", + "product_name": "Warrant Herbicide", + "product_class": "herbicide", + "registrant": null, + "active_ingredients": [ + {"name": "acetochlor", "cas": "34256-82-1", "percent": 35.4} + ], + "signal_word": "Caution", + "label": { + "url": "https://cs-assets.bayer.com/is/content/bayer/Warrant_2025pdf", + "filename": "Warrant_2025pdf", + "accepted_date": "2024-01-15", + "last_modified": "2026-05-15T20:21:54+00:00", + "page_count": 24, + "text_layer": true + }, + "supplemental_documents": [ + {"kind": "2EE", "title": "Warrant tank-mix 2EE — cotton", + "url": "https://cs-assets.bayer.com/.../...pdf", + "last_modified": "2026-04-01T12:00:00+00:00"} + ], + "source_urls": { + "product_page": "https://www.cropscience.bayer.us/products/herbicides/warrant/label-msds", + "label_api": null, + "label_index": null + }, + "fetched_at": "2026-05-23T22:05:29+00:00", + "scraper_version": "0.1.0" +} +``` + +### Field reference + +| Field | Type | Required | Notes | +|---|---|---|---| +| `source` | string | yes | Matches an `id` in `sources.json`. | +| `source_key` | string | yes | Per-source primary key. Filesystem-safe. | +| `epa_reg_no` | string \| null | best-effort | Canonical EPA registration (e.g. `524-591`, or `524-591-12345` with distributor suffix). The cross-source join key. | +| `product_name` | string \| null | yes | Display name. | +| `product_class` | string \| null | best-effort | One of `herbicide`, `fungicide`, `insecticide`, `seed-treatment`, `rodenticide`, `other`. EPA PPLS leaves this `null`; manufacturer sources usually know. 
| +| `registrant` | string \| null | best-effort | Required-ish for regulator sources, often `null` for MFR sources where redundant. | +| `active_ingredients` | array of objects | yes (may be empty) | `[{name, cas, percent}]`. `cas` and `percent` are `null` when the source doesn't expose them. | +| `signal_word` | string \| null | best-effort | `Danger`, `Warning`, `Caution`, or `null`. Operationally critical for the farmer advisor. | +| `label.url` | string \| null | yes | Direct URL of the current label PDF. | +| `label.filename` | string \| null | best-effort | Last URL segment, useful for diffing revisions. | +| `label.accepted_date` | ISO date \| null | best-effort | EPA-stamped acceptance date. MFR sources may not expose this. | +| `label.last_modified` | ISO 8601 datetime \| null | best-effort | From the PDF's HTTP `Last-Modified` header. Always normalized to ISO 8601 UTC. | +| `label.page_count` | int \| null | best-effort | After download. | +| `label.text_layer` | bool \| null | best-effort | `false` for scanned PDFs that need OCR. | +| `supplemental_documents` | array | yes (may be empty) | 24(c) labels, 2(ee) bulletins, MSDS/SDS, product bulletins. EPA PPLS leaves this empty (those are separate API calls). | +| `source_urls.product_page` | string \| null | best-effort | The HTML product page on the source site. | +| `source_urls.label_api` | string \| null | best-effort | The JSON API endpoint that returned this record (for traceability). | +| `source_urls.label_index` | string \| null | best-effort | The human-readable index/search URL. | +| `fetched_at` | ISO 8601 datetime | yes | When this sidecar was generated. | +| `scraper_version` | string | yes | Source module's `SCRAPER_VERSION` constant. | + +Sources may add their own extra fields beyond the canonical schema +(EPA's sidecars carry `registration_status` and +`registrant_company_number`, for instance). Consumers should ignore +unknown fields. + +## Adding a new source + +1. 
Write `scrape/sources/<source>.py` exposing a `main(argv: list[str]) -> int` + that accepts at minimum `--limit N` and `--force`. +2. Conform to the canonical sidecar schema. Add source-specific + extras as additional top-level keys if they don't fit. +3. Add an entry to `sources.json` (`id`, `title`, `type`, `homepage`, + `scraper`, `scraper_version`, `license_note`). +4. Scrapers MUST be polite: rate-limit to ≤1 req/sec, set a real + User-Agent identifying the project, retry with backoff on 429/5xx, + and respect robots.txt unless an explicit carve-out exists (e.g. + Bayer's RAG allowlist). +5. Scrapers MUST be idempotent: skip records already on disk unless + `--force` is set. diff --git a/scrape/runner.py b/scrape/runner.py new file mode 100644 index 0000000..1e20bbb --- /dev/null +++ b/scrape/runner.py @@ -0,0 +1,87 @@ +"""Thin dispatcher that routes ``--source <id>`` to the right per-source +scraper module. + +For ppls-docs the convention is **one source per scraper module** under +``scrape.sources.<id>``. Each module is independently runnable via +``python -m scrape.sources.<id>`` and accepts its own flags — this +runner is a convenience shim for CI / the weekly refresh workflow. + +Examples: + + python -m scrape.runner --source bayer --limit 20 + python -m scrape.runner --source epa_ppls --limit 20 + python -m scrape.runner --all # walk every source in sources.json + +Anything after the recognized flags is passed through to the source +scraper, so: + + python -m scrape.runner --source bayer --force --product warrant + +just dispatches to ``scrape.sources.bayer`` with ``--force --product +warrant`` as argv.
+""" + +from __future__ import annotations + +import argparse +import importlib +import json +import sys +from pathlib import Path + +REPO_ROOT = Path(__file__).resolve().parents[1] +SOURCES_JSON = REPO_ROOT / "sources.json" + + +def _load_sources() -> list[dict]: + if not SOURCES_JSON.exists(): + return [] + try: + return json.loads(SOURCES_JSON.read_text()) + except json.JSONDecodeError: + return [] + + +def _run_source(source_id: str, passthrough: list[str]) -> int: + mod_name = f"scrape.sources.{source_id}" + try: + mod = importlib.import_module(mod_name) + except ImportError as exc: + print(f"runner: no source module {mod_name}: {exc}", file=sys.stderr) + return 2 + main = getattr(mod, "main", None) + if not callable(main): + print(f"runner: {mod_name} has no main() entrypoint", file=sys.stderr) + return 2 + return int(main(passthrough) or 0) + + +def main(argv: list[str] | None = None) -> int: + parser = argparse.ArgumentParser(prog="scrape.runner") + parser.add_argument("--source", help="Source id (matches sources.json)") + parser.add_argument("--all", action="store_true", + help="Run every source listed in sources.json") + args, passthrough = parser.parse_known_args(argv) + + if not args.source and not args.all: + parser.error("specify --source or --all") + + sources = _load_sources() + if args.all: + ids = [s["id"] for s in sources if "id" in s] + if not ids: + print("runner: sources.json is empty or missing", file=sys.stderr) + return 2 + else: + # If the source isn't registered in sources.json yet, dispatch anyway + # so the scraper can be exercised during initial development. 
+ ids = [args.source] + + rc = 0 + for sid in ids: + rc |= _run_source(sid, passthrough) + return rc + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/scrape/sources/__init__.py b/scrape/sources/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/scrape/sources/bayer.py b/scrape/sources/bayer.py new file mode 100644 index 0000000..ed0da6f --- /dev/null +++ b/scrape/sources/bayer.py @@ -0,0 +1,696 @@ +"""Bayer Crop Science US label scraper. + +Pulls herbicide / fungicide / insecticide / seed-treatment product +metadata and label PDFs from https://www.cropscience.bayer.us, extracts +each PDF to markdown, and writes a metadata sidecar JSON per product. + +Output: + corpus/bayer/.md extracted label text + corpus/bayer/.json metadata sidecar (see SIDECAR_SCHEMA in + PLAN.md / this repo's CLAUDE.md) + +The scraper resolves Bayer's rotating Next.js ``buildId`` from the +homepage at runtime, then walks the catalog JSON API for each product +class. It extracts the rest of the label/MSDS/supplemental download +URLs from each product page's ``__NEXT_DATA__`` JSON island — this is +strictly cheaper and more stable than scraping rendered HTML. + +robots.txt for cropscience.bayer.us explicitly allows scraping for +"search engine indexing or artificial intelligence retrieval augmented +generation" use cases, which is what this corpus feeds. 
+ +CLI: + + python -m scrape.sources.bayer --limit 20 + python -m scrape.sources.bayer --limit 20 --force + python -m scrape.sources.bayer --product warrant + python -m scrape.sources.bayer --class herbicide --limit 5 +""" + +from __future__ import annotations + +import argparse +import io +import json +import logging +import os +import random +import re +import sys +import time +from dataclasses import dataclass, field +from datetime import datetime, timezone +from pathlib import Path +from typing import Any, Iterable + +import requests +from pypdf import PdfReader + +SCRAPER_VERSION = "0.1.0" +USER_AGENT = "ppls-docs-scraper/0.1 (+https://drawbar.example/contact)" +BASE = "https://www.cropscience.bayer.us" + +# Catalog product-type values used in the Next.js data API. +PRODUCT_TYPES = ("Herbicide", "Fungicide", "Insecticide", "Seed_Treatment") + +# Map product-type filter -> the canonical "product_class" we record +# in the sidecar (matches the legacy URL segments). +PRODUCT_CLASS = { + "Herbicide": "herbicide", + "Fungicide": "fungicide", + "Insecticide": "insecticide", + "Seed_Treatment": "seed-treatment", +} + +# Repo root: scrape/sources/bayer.py -> repo root is 3 parents up. +REPO_ROOT = Path(__file__).resolve().parents[2] +CORPUS_DIR = REPO_ROOT / "corpus" / "bayer" + +# Politeness: target ~1 req/sec to Bayer. Each HTTP method goes through +# a tiny token-bucket sleeper to enforce this without per-call asyncio. 
+REQ_INTERVAL_SEC = 1.0 + +log = logging.getLogger("scrape.bayer") + + +# --------------------------------------------------------------------- HTTP + + +class RateLimitedSession: + """``requests.Session`` wrapper with sleep-based rate limiting and + polite retries on 429/5xx.""" + + def __init__(self, interval: float = REQ_INTERVAL_SEC) -> None: + self.s = requests.Session() + self.s.headers["User-Agent"] = USER_AGENT + self.interval = interval + self._last = 0.0 + + def _wait(self) -> None: + delta = time.monotonic() - self._last + if delta < self.interval: + time.sleep(self.interval - delta) + self._last = time.monotonic() + + def request( + self, + method: str, + url: str, + *, + max_retries: int = 4, + timeout: float = 30.0, + **kw: Any, + ) -> requests.Response: + last_exc: Exception | None = None + for attempt in range(max_retries): + self._wait() + try: + resp = self.s.request(method, url, timeout=timeout, **kw) + except requests.RequestException as exc: + last_exc = exc + backoff = min(30.0, (2 ** attempt) + random.random()) + log.warning("network error on %s %s: %s — retry in %.1fs", + method, url, exc, backoff) + time.sleep(backoff) + continue + if resp.status_code in (429,) or 500 <= resp.status_code < 600: + # Honor Retry-After if present, else exponential backoff. + ra = resp.headers.get("Retry-After") + if ra and ra.isdigit(): + backoff = float(ra) + else: + backoff = min(30.0, (2 ** attempt) + random.random()) + log.warning("HTTP %d on %s %s — retry in %.1fs", + resp.status_code, method, url, backoff) + time.sleep(backoff) + continue + return resp + if last_exc: + raise last_exc + # Final response (still bad) returned for caller to handle. 
+ return resp + + def get(self, url: str, **kw: Any) -> requests.Response: + return self.request("GET", url, **kw) + + def head(self, url: str, **kw: Any) -> requests.Response: + kw.setdefault("allow_redirects", True) + return self.request("HEAD", url, **kw) + + +# --------------------------------------------------------------------- model + + +@dataclass +class SupplementalDoc: + kind: str + title: str + url: str + last_modified: str | None = None + + +@dataclass +class BayerProduct: + slug: str # filesystem-safe slug, e.g. "warrant" + catalog_slug: str # bayer's seoSlug, e.g. "warrant-herbicide" + product_url_path: str # e.g. "/crop-protection/herbicide/warrant-herbicide" + product_class: str # "herbicide" | "fungicide" | ... + product_name: str = "" + epa_reg_no: str | None = None + active_ingredients: list[dict] = field(default_factory=list) # [{name, cas, percent}] + label_url: str | None = None + label_filename: str | None = None + label_last_modified: str | None = None + label_page_count: int | None = None + label_text_layer: bool | None = None + supplemental_pdfs: list[SupplementalDoc] = field(default_factory=list) + source_page_url: str = "" + + +# --------------------------------------------------------------------- helpers + + +_NEXT_DATA_RE = re.compile( + r'', re.S +) + + +def parse_next_data(html: str) -> dict[str, Any]: + """Pull the ``__NEXT_DATA__`` JSON blob out of a Next.js page.""" + m = _NEXT_DATA_RE.search(html) + if not m: + raise RuntimeError("no __NEXT_DATA__ script tag found") + return json.loads(m.group(1)) + + +def fetch_build_id(http: RateLimitedSession) -> str: + """Grab the rotating ``buildId`` from the Bayer homepage.""" + r = http.get(BASE + "/") + r.raise_for_status() + data = parse_next_data(r.text) + bid = data.get("buildId") + if not bid: + raise RuntimeError("buildId missing from homepage __NEXT_DATA__") + log.info("resolved Bayer buildId=%s", bid) + return bid + + +def normalize_epa_reg(raw: str | None) -> str | None: + 
"""Convert Bayer's padded reg number to canonical EPA form. + + Example: ``0000524-00591-AA-0000000`` -> ``524-591``. + The trailing ``-AA-0000000`` is a Bayer-internal qualifier we + don't surface. We keep ``524-591/`` if a non-empty sub-reg + appears (rare). + """ + if not raw: + return None + parts = raw.split("-") + if len(parts) < 2: + return raw.strip() or None + company = parts[0].lstrip("0") or "0" + product = parts[1].lstrip("0") or "0" + epa = f"{company}-{product}" + # If the third segment is something other than the default "AA", + # it's likely a distributor sub-reg. Preserve it. + if len(parts) >= 3 and parts[2] and parts[2] != "AA": + epa += f"-{parts[2]}" + return epa + + +def classify_supplemental(title: str, url: str) -> str: + """Classify a supplemental/auxiliary doc by its title or URL. + + Returns a short kind tag like ``2EE``, ``24C``, ``24C-CA``, + ``Bulletin``, ``MSDS``, ``Label``, or ``Other``. The exact tag + isn't load-bearing for the scraper — it's metadata to help the + chunker/agent later. Best-effort regex; ambiguous = ``Other``. + """ + t = (title or "").upper() + u = (url or "").upper() + blob = f"{t} {u}" + + # State-specific 24c labels usually carry a two-letter state code, + # but Bayer's titles rarely encode it. Best we can do is flag 24c. + if "24C" in blob or "SECTION_24C" in blob or "SECTION 24C" in blob: + # Try to spot a state suffix in the URL (e.g. "_24c_ca"). 
+ m = re.search(r"24[_-]?C[_-]([A-Z]{2})\b", u) + if m: + return f"24C-{m.group(1)}" + return "24C" + if "2EE" in blob or "2_EE" in blob: + return "2EE" + if "MSDS" in blob or "SDS" in blob or "SAFETY DATA" in blob: + return "MSDS" + if "BULLETIN" in blob: + return "Bulletin" + if "SUPPLEMENTAL" in blob: + return "Supplemental" + if "LABEL" in blob: + return "Label" + return "Other" + + +def safe_slug(catalog_slug: str, product_class: str) -> str: + """Strip the trailing class suffix so ``warrant-herbicide`` becomes + ``warrant``; falls back to the full slug for slugs that don't end + with the class word.""" + suffix = f"-{product_class}" + if catalog_slug.endswith(suffix): + return catalog_slug[: -len(suffix)] + # seed-treatment is sometimes split or omitted; just return as-is. + return catalog_slug + + +def iso_from_http_date(http_date: str | None) -> str | None: + """RFC1123 -> ISO 8601 UTC. Returns None if unparseable.""" + if not http_date: + return None + try: + from email.utils import parsedate_to_datetime + dt = parsedate_to_datetime(http_date) + if dt.tzinfo is None: + dt = dt.replace(tzinfo=timezone.utc) + return dt.astimezone(timezone.utc).isoformat() + except Exception: # noqa: BLE001 + return None + + +# --------------------------------------------------------------------- catalog + + +def walk_catalog( + http: RateLimitedSession, build_id: str +) -> Iterable[BayerProduct]: + """Yield ``BayerProduct`` stubs for every product across all classes. + + Stubs carry only catalog-level info (slug, URL, class). The detail + fetch (EPA reg, ingredients, PDFs) happens later via + :func:`fetch_product_detail`. 
+ """ + for ptype in PRODUCT_TYPES: + product_class = PRODUCT_CLASS[ptype] + page = 1 + seen = 0 + while True: + url = ( + f"{BASE}/_next/data/{build_id}/crop-protection/catalog.json" + f"?productType={ptype}&p={page}" + ) + r = http.get(url) + if r.status_code != 200: + log.warning("catalog %s p=%d -> HTTP %d, stopping class", + ptype, page, r.status_code) + break + data = r.json().get("pageProps", {}) + products = data.get("serverProducts") or [] + total = data.get("total") or 0 + if not products: + break + for p in products: + slug = p.get("seoSlug") or "" + product_url = p.get("productURL") or "" + if not slug or not product_url: + continue + yield BayerProduct( + slug=safe_slug(slug, product_class), + catalog_slug=slug, + product_url_path=product_url, + product_class=product_class, + ) + seen += len(products) + if seen >= total: + break + page += 1 + + +# --------------------------------------------------------------------- detail + + +def fetch_product_detail( + http: RateLimitedSession, prod: BayerProduct +) -> BayerProduct: + """Populate EPA reg, active ingredients, and the full PDF list on + a catalog stub by fetching its product page __NEXT_DATA__.""" + page_url = BASE + prod.product_url_path + prod.source_page_url = page_url + r = http.get(page_url) + r.raise_for_status() + data = parse_next_data(r.text) + pp = (data.get("props") or {}).get("pageProps") or {} + pd = pp.get("productDetails") or {} + + prod.product_name = pd.get("productLabel") or pd.get("productName") or prod.slug + prod.epa_reg_no = normalize_epa_reg(pd.get("registrationNumber")) + # Bayer's product page exposes ingredient names only — no CAS or percent. + # Conform to the canonical schema by emitting objects with name set and + # the other fields null; downstream consumers can hydrate from EPA PPLS. 
+ prod.active_ingredients = [ + {"name": a.get("ingredient"), "cas": None, "percent": None} + for a in (pd.get("activeIngredients") or []) + if a.get("ingredient") + ] + + # Primary label: prefer downloadLabelUrl, then importantDocuments. + important = (pp.get("importantDocuments") or {}).get("labelData") or [] + additional = (pp.get("additionalDownloads") or {}).get("labelData") or [] + download_url = pp.get("downloadLabelUrl") + + label_url: str | None = None + if download_url and looks_like_pdf(download_url): + label_url = download_url + else: + # First entry titled "Label" or simply the first PDF. + for d in important: + t = (d.get("title") or "").lower() + u = d.get("url") or "" + if not looks_like_pdf(u): + continue + if "label" in t and "msds" not in t and "sds" not in t: + label_url = u + break + if not label_url: + for d in important + additional: + u = d.get("url") or "" + if looks_like_pdf(u): + label_url = u + break + + prod.label_url = label_url + if label_url: + # Last URL segment is the Scene7 asset id (e.g. "Warrant_2025pdf"). + prod.label_filename = label_url.rsplit("/", 1)[-1] + + # Collect ALL other PDFs as supplementals (label/MSDS/24c/2EE/bulletin + # /etc.). The kind tag is best-effort; the chunker can refine later. + supplementals: list[SupplementalDoc] = [] + seen_urls: set[str] = set() + if label_url: + seen_urls.add(label_url) + for d in important + additional: + u = d.get("url") or "" + t = d.get("title") or "" + if not u or u in seen_urls: + continue + if not looks_like_pdf(u): + continue + seen_urls.add(u) + supplementals.append(SupplementalDoc( + kind=classify_supplemental(t, u), + title=t, + url=u, + )) + prod.supplemental_pdfs = supplementals + + return prod + + +def looks_like_pdf(url: str) -> bool: + """True if the URL is one of Bayer's PDF endpoints. + + Bayer serves PDFs via Adobe Scene7 with the literal ``pdf`` (no + dot) appended to the asset ID, plus some assets on cs-contentapi + with a real ``.pdf`` extension. 
+ """ + u = url.lower() + if u.endswith("pdf"): + return True + if u.endswith(".pdf"): + return True + return False + + +# --------------------------------------------------------------------- PDF + + +def head_last_modified(http: RateLimitedSession, url: str) -> str | None: + """Resolve Last-Modified for a PDF URL. Returns ISO 8601 or None.""" + try: + r = http.head(url) + except requests.RequestException as exc: + log.warning("HEAD failed for %s: %s", url, exc) + return None + if r.status_code != 200: + log.warning("HEAD %s -> HTTP %d", url, r.status_code) + return None + return iso_from_http_date(r.headers.get("Last-Modified")) + + +def fetch_pdf_text(http: RateLimitedSession, url: str) -> tuple[str, int, bool]: + """Download a PDF and return ``(text, page_count, has_text_layer)``. + + Concatenates all pages, normalizes whitespace, and collapses runs + of blank lines so the resulting markdown diffs cleanly. ``has_text_layer`` + is False for scanned PDFs whose pypdf extract produced no text. + """ + r = http.get(url) + r.raise_for_status() + if "pdf" not in (r.headers.get("Content-Type") or "").lower(): + log.warning("expected PDF Content-Type at %s, got %s", + url, r.headers.get("Content-Type")) + reader = PdfReader(io.BytesIO(r.content)) + page_count = len(reader.pages) + chunks: list[str] = [] + for page in reader.pages: + try: + text = page.extract_text() or "" + except Exception as exc: # noqa: BLE001 + log.warning("pypdf extract_text failed on a page of %s: %s", + url, exc) + text = "" + chunks.append(text) + raw = "\n\n".join(chunks) + normalized = normalize_text(raw) + has_text_layer = bool(normalized.strip()) + return normalized, page_count, has_text_layer + + +def normalize_text(s: str) -> str: + # Strip trailing spaces per line, collapse 3+ blank lines to 2, + # and trim NBSPs that pypdf often leaves behind. 
+ s = s.replace("\u00a0", " ") + s = re.sub(r"[ \t]+\n", "\n", s) + s = re.sub(r"\n{3,}", "\n\n", s) + return s.strip() + "\n" + + +# --------------------------------------------------------------------- write + + +def write_product(prod: BayerProduct, body_md: str) -> None: + """Write the canonical sidecar + markdown body. See scrape/README.md + for the schema.""" + CORPUS_DIR.mkdir(parents=True, exist_ok=True) + md_path = CORPUS_DIR / f"{prod.slug}.md" + json_path = CORPUS_DIR / f"{prod.slug}.json" + + # Lightweight markdown frontmatter for human eyeballing — canonical + # metadata lives in the sidecar. + title = prod.product_name or prod.slug + ai_summary = ", ".join(a["name"] for a in prod.active_ingredients if a.get("name")) or "(unknown)" + header = ( + f"# {title}\n\n" + f"- **Product class:** {prod.product_class}\n" + f"- **EPA Reg No:** {prod.epa_reg_no or '(unknown)'}\n" + f"- **Active ingredients:** {ai_summary}\n" + f"- **Source:** {prod.source_page_url}\n" + f"- **Label PDF:** {prod.label_url or '(none on page)'}\n\n" + "---\n\n" + ) + md_path.write_text(header + body_md, encoding="utf-8") + + sidecar = { + "source": "bayer", + "source_key": prod.slug, + "epa_reg_no": prod.epa_reg_no, + "product_name": prod.product_name, + "product_class": prod.product_class, + "registrant": None, + "active_ingredients": prod.active_ingredients, + "signal_word": None, + "label": { + "url": prod.label_url, + "filename": prod.label_filename, + "accepted_date": None, + "last_modified": prod.label_last_modified, + "page_count": prod.label_page_count, + "text_layer": prod.label_text_layer, + }, + "supplemental_documents": [ + { + "kind": s.kind, + "title": s.title, + "url": s.url, + "last_modified": s.last_modified, + } + for s in prod.supplemental_pdfs + ], + "source_urls": { + "product_page": prod.source_page_url, + "label_api": None, + "label_index": None, + }, + "fetched_at": datetime.now(timezone.utc).isoformat(), + "scraper_version": SCRAPER_VERSION, + } + 
def process_product(
    http: RateLimitedSession,
    prod: BayerProduct,
    *,
    force: bool,
) -> str:
    """Fetch one product's detail page and label PDF, then write it to disk.

    Args:
        http: Shared rate-limited session used for every request.
        prod: Catalog entry to hydrate. It is mutated in place with the
            label metadata discovered along the way.
        force: Re-process the product even when its output already exists.

    Returns:
        A status string suitable for logging: ``written``, ``skipped``,
        ``no-pdf`` or ``failed``.
    """
    md_path = CORPUS_DIR / f"{prod.slug}.md"
    json_path = CORPUS_DIR / f"{prod.slug}.json"
    # write_product() emits the markdown first and the sidecar second, so a
    # run interrupted between the two leaves a .md without its .json.
    # Require both before skipping; checking the .md alone would make every
    # later run skip that product and never produce its canonical metadata.
    if not force and md_path.exists() and json_path.exists():
        return "skipped"
    try:
        fetch_product_detail(http, prod)
    except Exception as exc:  # noqa: BLE001
        log.error("detail fetch failed for %s: %s", prod.slug, exc)
        return "failed"

    # Resolve Last-Modified for label + supplementals (HEAD only, cheap).
    # NOTE(review): these calls sit outside any try block — confirm that
    # head_last_modified() swallows its own network errors.
    if prod.label_url:
        prod.label_last_modified = head_last_modified(http, prod.label_url)
    for s in prod.supplemental_pdfs:
        s.last_modified = head_last_modified(http, s.url)

    if not prod.label_url:
        # Some Bayer products have no public label PDF (e.g. product was
        # discontinued or the page only carries a Product Bulletin). We
        # still record the metadata sidecar so the catalog is complete,
        # but write a stub body so the file count reflects reality.
        log.info("%s — no label PDF; writing metadata only", prod.slug)
        prod.label_text_layer = False
        write_product(prod, "_(No label PDF was found on the product page.)_\n")
        return "no-pdf"

    try:
        body, page_count, text_layer = fetch_pdf_text(http, prod.label_url)
    except Exception as exc:  # noqa: BLE001
        log.error("PDF fetch/extract failed for %s (%s): %s",
                  prod.slug, prod.label_url, exc)
        return "failed"

    prod.label_page_count = page_count
    prod.label_text_layer = text_layer
    if not body.strip():
        # An empty extraction still gets a file, with an explicit marker
        # in place of the missing text.
        log.warning("%s — extracted PDF was empty (scanned?)", prod.slug)
        body = "[SCANNED PDF — OCR REQUIRED]\n"

    write_product(prod, body)
    return "written"
def _build_argparser() -> argparse.ArgumentParser:
    """Construct the command-line parser for the Bayer scraper.

    Options are declared as data and registered in order, so the ``--help``
    listing follows the order of the table below.
    """
    known_classes = sorted(set(PRODUCT_CLASS.values()))
    option_specs = [
        ("--limit", {
            "type": int,
            "default": None,
            "help": "Stop after processing N products (default: all).",
        }),
        ("--force", {
            "action": "store_true",
            "help": "Re-download even if the markdown file already exists.",
        }),
        ("--product", {
            "default": None,
            "help": "Process a single product by slug (e.g. 'warrant' or "
                    "'warrant-herbicide').",
        }),
        ("--class", {
            # "class" is a Python keyword, so expose it as args.product_class.
            "dest": "product_class",
            "default": None,
            "choices": known_classes,
            "help": "Limit to one product class.",
        }),
        ("--log-level", {
            "default": os.environ.get("LOG_LEVEL", "INFO"),
            "help": "Python logging level (default INFO).",
        }),
    ]

    parser = argparse.ArgumentParser(
        prog="scrape.sources.bayer",
        description="Scrape Bayer Crop Science US product labels.",
    )
    for flag, spec in option_specs:
        parser.add_argument(flag, **spec)
    return parser
**List products** via the public PPIS bulk download + ``https://www3.epa.gov/pesticides/PPISdata/product.zip`` — a 107-char + fixed-width flat file (``product.txt``, ~102K active Section 3 + registrations, refreshed every Tuesday). Gives us the universe of + EPA Reg Nos (company-product), plus the product name. + +2. **Hydrate per product** via the PPLS REST data service at + ``https://ordspub.epa.gov/ords/pesticides/cswu/ppls/{regno}`` — + returns rich JSON: registrant, active ingredients (with CAS + %), + formulations, status, signal word, AND a ``pdffiles`` array + listing every stamped label PDF EPA has accepted for the product. + The most recent entry gives us the canonical PDF filename + (``{company6}-{product5}-{YYYYMMDD}.pdf``), solving the + stamped-date-suffix problem without having to guess. + +3. **Fetch label PDF** from + ``https://www3.epa.gov/pesticides/chem_search/ppls/{filename}`` + and extract text with pypdf. Many EPA labels are scans with no + text layer — those are flagged ``text_layer: false`` and the .md + body is a ``[SCANNED PDF — OCR REQUIRED]`` placeholder. OCR is + deferred to Phase 2. + +Paths rejected and why +---------------------- +- ``/ords/pesticides/ppls/{reg}`` (no ``/cswu/`` prefix): returns the + APEX HTML splash, not JSON. The undocumented ``/cswu/`` prefix is + the actual ORDS REST handler. +- Scraping the APEX UI: session-stateful, fragile, blocked. +- data.gov mirror: redirects to the same APEX page, no extract. +- NPIRS (Purdue): subscription-walled; PPIS is the same authoritative + feed anyway. + +Required sidecar fields (per task spec): ``source``, ``epa_reg_no``, +``label.url`` (null when EPA lists no label PDF), ``fetched_at``. +Everything else best-effort. 
def _get_with_retries(
    client: httpx.Client, url: str, *, expect_json: bool = False
) -> httpx.Response:
    """GET ``url``, retrying transient failures with exponential backoff.

    Retried: HTTP 429/500/502/503/504, transport-level errors, and (when
    ``expect_json`` is set) a successful response whose content type is not
    JSON. Any other error status (e.g. 404) fails immediately, because
    repeating the request cannot change the outcome.

    Args:
        client: The httpx client to issue the request with.
        url: Absolute URL to fetch.
        expect_json: When true, require a JSON ``content-type`` on success.

    Returns:
        The successful response.

    Raises:
        RuntimeError: On a non-retryable error status, or once
            ``MAX_RETRIES`` attempts have all failed.
    """
    retryable_statuses = (429, 500, 502, 503, 504)
    last_failure: object = None
    for attempt in range(1, MAX_RETRIES + 1):
        # Nothing follows the final attempt, so there is no point sleeping
        # after it: the wait collapses to zero.
        wait = min(2 ** attempt, 30) if attempt < MAX_RETRIES else 0
        try:
            resp = client.get(url)
            if resp.status_code in retryable_statuses:
                # Remember the status so the final error names a cause
                # instead of reporting "None".
                last_failure = f"HTTP {resp.status_code}"
                log.warning(
                    "HTTP %s on %s (attempt %d/%d) — sleeping %ds",
                    resp.status_code, url, attempt, MAX_RETRIES, wait,
                )
                time.sleep(wait)
                continue
            resp.raise_for_status()
            if expect_json:
                # ORDS sometimes returns text/html error pages with 200 — sanity
                ctype = resp.headers.get("content-type", "")
                if "json" not in ctype.lower():
                    raise httpx.HTTPError(
                        f"Expected JSON, got content-type={ctype!r} for {url}"
                    )
            return resp
        except httpx.HTTPStatusError as exc:
            # Retryable statuses were handled above, so whatever reaches
            # here (404, 403, ...) will not improve with another attempt.
            raise RuntimeError(
                f"GET {url} failed with non-retryable "
                f"HTTP {exc.response.status_code}"
            ) from exc
        except httpx.HTTPError as exc:
            # httpx.HTTPError is the common base class, so this also covers
            # transport errors and the content-type mismatch raised above.
            last_failure = exc
            log.warning(
                "Network error on %s (attempt %d/%d): %s — sleeping %ds",
                url, attempt, MAX_RETRIES, exc, wait,
            )
            time.sleep(wait)
    raise RuntimeError(
        f"GET {url} failed after {MAX_RETRIES} attempts: {last_failure}"
    )
def fetch_ppis_index(client: httpx.Client) -> list[PpisRow]:
    """Fetch the PPIS ``product.zip`` archive and return its parsable rows.

    Every line of the archive's ``product.txt`` member is handed to
    ``_parse_ppis_line``; lines it rejects (returns ``None`` for) are dropped.
    """
    log.info("Fetching PPIS index from %s", PPIS_PRODUCT_ZIP_URL)
    archive_bytes = _get_with_retries(client, PPIS_PRODUCT_ZIP_URL).content
    with zipfile.ZipFile(io.BytesIO(archive_bytes)) as archive, \
            archive.open("product.txt") as member:
        # Decode each raw line and trim its line terminator before parsing.
        decoded_lines = (
            raw.decode("latin-1").rstrip("\n").rstrip("\r") for raw in member
        )
        parsed = [
            row for row in map(_parse_ppis_line, decoded_lines)
            if row is not None
        ]
    log.info("Parsed %d rows from PPIS index", len(parsed))
    return parsed
Distributor suffix + (524-475-12345) -> 000524-00475-12345.""" + parts = regno.split("-") + if len(parts) == 2: + c, p = parts + return f"{int(c):06d}-{int(p):05d}" + if len(parts) == 3: + c, p, d = parts + return f"{int(c):06d}-{int(p):05d}-{int(d):05d}" + return regno + + +_MONTHS = { + "january": 1, "february": 2, "march": 3, "april": 4, "may": 5, "june": 6, + "july": 7, "august": 8, "september": 9, "october": 10, "november": 11, + "december": 12, +} + + +def _parse_label_date(text: str | None) -> str | None: + """'October 18, 2016' -> '2016-10-18'. Returns None on any parse issue.""" + if not text: + return None + m = re.match(r"^([A-Za-z]+)\s+(\d{1,2}),\s+(\d{4})$", text.strip()) + if not m: + return None + month = _MONTHS.get(m.group(1).lower()) + if month is None: + return None + try: + return f"{int(m.group(3)):04d}-{month:02d}-{int(m.group(2)):02d}" + except ValueError: + return None + + +def _http_date_to_iso(http_date: str | None) -> str | None: + """RFC1123 'Wed, 19 Oct 2016 17:48:09 GMT' -> ISO 8601 UTC. + + Returns None on unparseable input. Matches the canonical schema's + requirement that all timestamps be ISO 8601. 
@dataclass
class ProductRecord:
    """One product as hydrated from the PPLS JSON API by fetch_product_record.

    Every field except ``epa_reg_no`` is None (or an empty list) when the
    API returns no items for the registration number.
    """

    # Registration number exactly as passed to fetch_product_record.
    epa_reg_no: str
    # API "productname".
    product_name: str | None
    # "name" of the first "companyinfo" entry.
    registrant: str | None
    # Portion of the registration number before the first "-".
    registrant_company_number: str | None
    # One dict per ingredient with keys: name, cas, percent, pc_code.
    active_ingredients: list[dict[str, Any]]
    # PPLS_PDF_BASE joined with label_pdf_filename; None if no PDF is listed.
    label_pdf_url: str | None
    # "pdffile" of the "pdffiles" entry with the latest accepted date.
    label_pdf_filename: str | None
    # That entry's accepted date as YYYY-MM-DD, or None if unparseable.
    label_accepted_date: str | None
    # API "product_status".
    registration_status: str | None
    # API "signal_word".
    signal_word: str | None
    # First item of the API response, kept verbatim; excluded from repr().
    raw_api_item: dict[str, Any] | None = field(repr=False, default=None)
def extract_pdf_text(pdf_bytes: bytes) -> tuple[str, bool]:
    """Extract the text layer of a PDF held in memory.

    Pages are extracted one by one, whitespace-normalized, and joined with
    blank lines. A page whose extraction fails is logged and skipped.

    Args:
        pdf_bytes: Raw bytes of the PDF document.

    Returns:
        ``(text, has_text_layer)``. When the document cannot be read, or no
        page yields any text, the result is ``("", False)``.
    """
    try:
        reader = PdfReader(io.BytesIO(pdf_bytes))
        # Enumerate the pages inside the try as well: the page tree is only
        # walked on access, so a PdfReadError raised there would otherwise
        # escape and abort the caller instead of yielding ("", False).
        pages = list(reader.pages)
    except PdfReadError as exc:
        log.warning("pypdf failed to read PDF: %s", exc)
        return "", False
    chunks: list[str] = []
    for i, page in enumerate(pages):
        try:
            page_text = page.extract_text() or ""
        except Exception as exc:  # pypdf can throw on malformed pages
            log.warning("pypdf extract_text failed on page %d: %s", i, exc)
            page_text = ""
        # Collapse runs of spaces/tabs and cap consecutive blank lines.
        page_text = re.sub(r"[ \t]+", " ", page_text)
        page_text = re.sub(r"\n{3,}", "\n\n", page_text).strip()
        if page_text:
            chunks.append(page_text)
    combined = "\n\n".join(chunks).strip()
    return combined, bool(combined)
Returns 'skipped'|'wrote'|'no-pdf'|'error'.""" + md_path = _md_path(regno) + json_path = _json_path(regno) + if not force and md_path.exists() and json_path.exists(): + log.info("[%s] skip (already on disk)", regno) + return "skipped" + + try: + record = fetch_product_record(client, regno) + except Exception as exc: + log.error("[%s] API fetch failed: %s", regno, exc) + return "error" + time.sleep(REQUEST_DELAY_SECONDS) + + def _build_sidecar( + *, + label_url: str | None, + label_filename: str | None, + label_last_modified_iso: str | None, + page_count: int | None, + text_layer: bool | None, + ) -> dict[str, Any]: + return { + "source": "epa_ppls", + "source_key": regno, + "epa_reg_no": regno, + "product_name": record.product_name, + "product_class": None, # EPA PPLS doesn't expose a clean class taxonomy + "registrant": record.registrant, + "active_ingredients": record.active_ingredients, + "signal_word": record.signal_word, + "label": { + "url": label_url, + "filename": label_filename, + "accepted_date": record.label_accepted_date, + "last_modified": label_last_modified_iso, + "page_count": page_count, + "text_layer": text_layer, + }, + "supplemental_documents": [], # EPA PPLS sidecar omits supplementals; query API per regno + "source_urls": { + "product_page": None, + "label_api": f"{PPLS_API_BASE}/{regno}", + "label_index": PPLS_INDEX_URL_TEMPLATE.format(regno=regno), + }, + # EPA-specific extras (kept out of the strict canonical schema but + # useful for joins back to EPA's data model) + "registration_status": record.registration_status, + "registrant_company_number": record.registrant_company_number, + "fetched_at": datetime.now(UTC).isoformat(), + "scraper_version": SCRAPER_VERSION, + } + + if not record.label_pdf_url: + log.warning("[%s] no label PDF available — writing sidecar only", regno) + md_path.write_text( + f"# {record.product_name or regno}\n\n" + f"EPA Reg No: {regno}\n\n" + "[NO LABEL PDF AVAILABLE FROM EPA PPLS]\n", + encoding="utf-8", + ) + 
sidecar = _build_sidecar( + label_url=None, label_filename=None, + label_last_modified_iso=None, + page_count=None, text_layer=False, + ) + json_path.write_text(json.dumps(sidecar, indent=2), encoding="utf-8") + return "no-pdf" + + try: + pdf_bytes, last_modified_raw = download_pdf(client, record.label_pdf_url) + except Exception as exc: + log.error("[%s] PDF download failed: %s", regno, exc) + return "error" + time.sleep(REQUEST_DELAY_SECONDS) + + text, has_text = extract_pdf_text(pdf_bytes) + last_modified_iso = _http_date_to_iso(last_modified_raw) + + page_count: int | None = None + try: + page_count = len(PdfReader(io.BytesIO(pdf_bytes)).pages) + except Exception: + pass + + sidecar = _build_sidecar( + label_url=record.label_pdf_url, + label_filename=record.label_pdf_filename, + label_last_modified_iso=last_modified_iso, + page_count=page_count, + text_layer=has_text, + ) + + header_lines = [f"# {record.product_name or regno}", ""] + header_lines.append(f"- EPA Reg No: **{regno}**") + if record.registrant: + header_lines.append(f"- Registrant: {record.registrant}") + if record.signal_word: + header_lines.append(f"- Signal word: {record.signal_word}") + if record.active_ingredients: + ai_strs = [ + f"{ai.get('name')} ({ai.get('percent')}%)" + for ai in record.active_ingredients + if ai.get("name") + ] + if ai_strs: + header_lines.append("- Active ingredients: " + "; ".join(ai_strs)) + if record.label_accepted_date: + header_lines.append(f"- Label accepted: {record.label_accepted_date}") + header_lines.append(f"- Source PDF: {record.label_pdf_url}") + header_lines.append("") + header_lines.append("---") + header_lines.append("") + + if has_text: + body = text + else: + body = "[SCANNED PDF — OCR REQUIRED]\n\nThis label has no extractable text layer." 
+ log.info("[%s] PDF has no text layer (scanned)", regno) + + md_content = "\n".join(header_lines) + body + "\n" + md_path.write_text(md_content, encoding="utf-8") + json_path.write_text(json.dumps(sidecar, indent=2), encoding="utf-8") + log.info( + "[%s] wrote (text_layer=%s, pages=%s, name=%r)", + regno, has_text, page_count, record.product_name, + ) + return "wrote" + + +# --------------------------------------------------------------------------- +# CLI +# --------------------------------------------------------------------------- + + +def _iter_regnos( + args: argparse.Namespace, + client: httpx.Client, +) -> Iterable[str]: + """Yield reg nos to process based on CLI args.""" + if args.reg_no: + for r in args.reg_no: + yield r + return + if args.seed_file: + with open(args.seed_file, encoding="utf-8") as fh: + for raw in fh: + line = raw.strip() + if not line or line.startswith("#"): + continue + yield line + return + # Default: enumerate via PPIS bulk index + rows = fetch_ppis_index(client) + count = 0 + for row in rows: + # Skip transferred-out (status_flag 'T') entries by default; their + # registration has moved to another company-product pairing. + if row.status_flag == "T": + continue + yield row.epa_reg_no + count += 1 + if args.limit and count >= args.limit: + return + + +def main(argv: list[str] | None = None) -> int: + parser = argparse.ArgumentParser( + prog="python -m scrape.sources.epa_ppls", + description="Scrape EPA PPLS pesticide labels into corpus/epa_ppls/.", + ) + parser.add_argument( + "--limit", type=int, default=None, + help="Max products to process when enumerating from PPIS.", + ) + parser.add_argument( + "--force", action="store_true", + help="Re-fetch even if .md/.json already exist.", + ) + parser.add_argument( + "--reg-no", action="append", metavar="REGNO", + help="Process specific EPA Reg No (e.g. 524-475). 
Repeatable.", + ) + parser.add_argument( + "--seed-file", metavar="PATH", + help="Text file with one EPA Reg No per line (# comments OK).", + ) + parser.add_argument( + "--log-level", default="INFO", + choices=["DEBUG", "INFO", "WARNING", "ERROR"], + ) + args = parser.parse_args(argv) + + logging.basicConfig( + stream=sys.stderr, + level=getattr(logging, args.log_level), + format="%(asctime)s %(levelname)s %(name)s: %(message)s", + ) + + CORPUS_DIR.mkdir(parents=True, exist_ok=True) + + summary = {"wrote": 0, "skipped": 0, "no-pdf": 0, "error": 0} + with _client() as client: + for regno in _iter_regnos(args, client): + result = process_one(client, regno, force=args.force) + summary[result] = summary.get(result, 0) + 1 + + log.info("done: %s", summary) + print(json.dumps(summary), file=sys.stderr) + return 0 + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/sources.json b/sources.json new file mode 100644 index 0000000..0c92a42 --- /dev/null +++ b/sources.json @@ -0,0 +1,20 @@ +[ + { + "id": "bayer", + "title": "Bayer Crop Science US — Product Labels", + "type": "manufacturer", + "homepage": "https://www.cropscience.bayer.us", + "scraper": "scrape.sources.bayer", + "scraper_version": "0.1.0", + "license_note": "robots.txt explicitly permits scraping for AI retrieval-augmented generation (verified 2026-05)" + }, + { + "id": "epa_ppls", + "title": "EPA Pesticide Product Label System", + "type": "regulator", + "homepage": "https://ordspub.epa.gov/ords/pesticides/f?p=PPLS:1", + "scraper": "scrape.sources.epa_ppls", + "scraper_version": "0.1.0", + "license_note": "US federal government — public domain (no ToS restriction)" + } +]