1a45280e45
Repo/project rename to better reflect scope. PPLS is EPA's term for
their Pesticide Product Label System — accurate when the corpus was
EPA-only, narrow now that it also pulls from Bayer's own catalog
(and may expand to Syngenta/Corteva/BASF/FMC labels in the future).
crop-chem-docs scopes flexibly without acronyms to explain.
Renames:
- directory: ppls-docs → crop-chem-docs
- PRODUCT_NAME: ppls → crop_chem
- Chroma collection: ppls_docs → crop_chem_docs (in-place via .modify(), no re-embed)
- BM25 db: bm25/ppls_docs.db → bm25/crop_chem_docs.db
- MCP tool name: ppls_api_lessons → crop_chem_api_lessons
- FastMCP server name: ppls-docs → crop-chem-docs
- Env vars: PPLS_CORPUS_ROOT → CORPUS_ROOT
PPLS_CHROMA_DIR → CHROMA_DIR_OVERRIDE
- User-Agent: ppls-docs-scraper → crop-chem-docs-scraper
Preserved (intentional, correct):
- epa_ppls (source id) — refers specifically to EPA's PPLS database
- "EPA PPLS" mentions in regulatory text (lessons.md, server docstrings)
- PPLS_API_BASE / PPLS_PDF_BASE / PPLS_INDEX_URL_TEMPLATE in
scrape/sources/epa_ppls.py — these point at EPA's actual endpoints
Memory entries get updated in a follow-up commit so the rename is
isolated.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
743 lines · 26 KiB · Python
"""Bayer Crop Science US label scraper.
|
|
|
|
Pulls herbicide / fungicide / insecticide / seed-treatment product
|
|
metadata and label PDFs from https://www.cropscience.bayer.us, extracts
|
|
each PDF to markdown, and writes a metadata sidecar JSON per product.
|
|
|
|
Output:
|
|
corpus/bayer/<slug>.md extracted label text
|
|
corpus/bayer/<slug>.json metadata sidecar (see SIDECAR_SCHEMA in
|
|
PLAN.md / this repo's CLAUDE.md)
|
|
|
|
The scraper resolves Bayer's rotating Next.js ``buildId`` from the
|
|
homepage at runtime, then walks the catalog JSON API for each product
|
|
class. It extracts the rest of the label/MSDS/supplemental download
|
|
URLs from each product page's ``__NEXT_DATA__`` JSON island — this is
|
|
strictly cheaper and more stable than scraping rendered HTML.
|
|
|
|
robots.txt for cropscience.bayer.us explicitly allows scraping for
|
|
"search engine indexing or artificial intelligence retrieval augmented
|
|
generation" use cases, which is what this corpus feeds.
|
|
|
|
CLI:
|
|
|
|
python -m scrape.sources.bayer --limit 20
|
|
python -m scrape.sources.bayer --limit 20 --force
|
|
python -m scrape.sources.bayer --product warrant
|
|
python -m scrape.sources.bayer --class herbicide --limit 5
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import io
|
|
import json
|
|
import logging
|
|
import os
|
|
import random
|
|
import re
|
|
import sys
|
|
import time
|
|
from dataclasses import dataclass, field
|
|
from datetime import datetime, timezone
|
|
from pathlib import Path
|
|
from typing import Any, Iterable
|
|
|
|
import requests
|
|
from pypdf import PdfReader
|
|
|
|
# Version stamp recorded in every sidecar's "scraper_version" field.
SCRAPER_VERSION = "0.1.0"
# Sent as the User-Agent header on every request made by RateLimitedSession.
USER_AGENT = "crop-chem-docs-scraper/0.1 (+https://drawbar.example/contact)"
# Site origin; catalog API and product-page URLs are built from this.
BASE = "https://www.cropscience.bayer.us"

# Catalog product-type values used in the Next.js data API.
PRODUCT_TYPES = ("Herbicide", "Fungicide", "Insecticide", "Seed_Treatment")

# Map product-type filter -> the canonical "product_class" we record
# in the sidecar (matches the legacy URL segments).
PRODUCT_CLASS = {
    "Herbicide": "herbicide",
    "Fungicide": "fungicide",
    "Insecticide": "insecticide",
    "Seed_Treatment": "seed-treatment",
}

# Repo root: for scrape/sources/bayer.py the repo root is three directory
# levels above this file, i.e. ``parents[2]``.
# Corpus root is overridable via CORPUS_ROOT for routing the
# corpus to external storage (USB drive, NAS mount, etc.) without
# editing the repo. An unset or empty CORPUS_ROOT falls back to
# ``<repo root>/corpus``.
REPO_ROOT = Path(__file__).resolve().parents[2]
CORPUS_ROOT = Path(os.environ.get("CORPUS_ROOT") or REPO_ROOT / "corpus")
CORPUS_DIR = CORPUS_ROOT / "bayer"

# Politeness: target ~1 req/sec to Bayer. Each HTTP method goes through
# a minimum-interval sleeper (RateLimitedSession._wait) that enforces this
# without per-call asyncio.
REQ_INTERVAL_SEC = 1.0

log = logging.getLogger("scrape.bayer")
|
|
|
|
|
|
# --------------------------------------------------------------------- HTTP
|
|
|
|
|
|
class RateLimitedSession:
    """``requests.Session`` wrapper with sleep-based rate limiting and
    polite retries on 429/5xx.

    Every request first waits until at least ``interval`` seconds have
    passed since the previous one. The wrapped session is exposed as
    ``self.s`` and sends ``USER_AGENT`` on every call.
    """

    def __init__(self, interval: float = REQ_INTERVAL_SEC) -> None:
        self.s = requests.Session()
        self.s.headers["User-Agent"] = USER_AGENT
        self.interval = interval
        # Monotonic timestamp of the previous request; 0.0 means "none yet".
        self._last = 0.0

    def _wait(self) -> None:
        """Sleep just long enough to keep ``interval`` seconds between requests."""
        delta = time.monotonic() - self._last
        if delta < self.interval:
            time.sleep(self.interval - delta)
        self._last = time.monotonic()

    @staticmethod
    def _retry_delay(attempt: int, retry_after: str | None = None) -> float:
        """Return the number of seconds to sleep before the next retry.

        A ``Retry-After`` value given as whole seconds is honored as-is; any
        other form (absent, HTTP-date) falls back to exponential backoff with
        jitter, capped at 30 seconds.
        """
        if retry_after and retry_after.isdigit():
            return float(retry_after)
        return min(30.0, (2 ** attempt) + random.random())

    def request(
        self,
        method: str,
        url: str,
        *,
        max_retries: int = 4,
        timeout: float = 30.0,
        **kw: Any,
    ) -> requests.Response:
        """Send a rate-limited request, retrying network errors and 429/5xx.

        Args:
            method: HTTP method name passed to ``requests.Session.request``.
            url: Absolute URL to request.
            max_retries: Total number of attempts. Values below 1 are treated
                as 1 so the call always makes at least one request.
            timeout: Per-attempt timeout in seconds.
            **kw: Forwarded unchanged to ``requests.Session.request``.

        Returns:
            The first response that is neither 429 nor 5xx, or — when every
            attempt is exhausted — the last (still bad) response so the
            caller can inspect its status.

        Raises:
            requests.RequestException: If the final attempt failed at the
                network level.
        """
        attempts = max(1, max_retries)
        last_exc: Exception | None = None
        resp: requests.Response | None = None
        for attempt in range(attempts):
            final_attempt = attempt == attempts - 1
            self._wait()
            try:
                resp = self.s.request(method, url, timeout=timeout, **kw)
            except requests.RequestException as exc:
                last_exc, resp = exc, None
                if final_attempt:
                    # Nothing left to retry: don't sleep before giving up.
                    break
                backoff = self._retry_delay(attempt)
                log.warning("network error on %s %s: %s — retry in %.1fs",
                            method, url, exc, backoff)
                time.sleep(backoff)
                continue

            # We got an HTTP response, so any earlier network error is stale.
            last_exc = None
            if resp.status_code != 429 and not 500 <= resp.status_code < 600:
                return resp
            if final_attempt:
                log.warning("HTTP %d on %s %s — giving up after %d attempt(s)",
                            resp.status_code, method, url, attempts)
                break
            # Honor Retry-After if present, else exponential backoff.
            backoff = self._retry_delay(attempt, resp.headers.get("Retry-After"))
            log.warning("HTTP %d on %s %s — retry in %.1fs",
                        resp.status_code, method, url, backoff)
            time.sleep(backoff)

        if last_exc is not None:
            raise last_exc
        # Final response (still bad) returned for caller to handle.
        return resp

    def get(self, url: str, **kw: Any) -> requests.Response:
        """Rate-limited GET; see :meth:`request` for retry semantics."""
        return self.request("GET", url, **kw)

    def head(self, url: str, **kw: Any) -> requests.Response:
        """Rate-limited HEAD that follows redirects unless told otherwise."""
        kw.setdefault("allow_redirects", True)
        return self.request("HEAD", url, **kw)
|
|
|
|
|
|
# --------------------------------------------------------------------- model
|
|
|
|
|
|
@dataclass
class SupplementalDoc:
    """One non-primary PDF (MSDS, 24c, 2EE, bulletin, ...) linked from a
    product page."""

    kind: str  # tag produced by classify_supplemental(), e.g. "MSDS"
    title: str  # document title as listed on the product page
    url: str  # download URL of the PDF
    last_modified: str | None = None  # ISO 8601 from a HEAD request, if resolved
|
|
|
|
|
|
@dataclass
class BayerProduct:
    """Everything the scraper knows about one Bayer product.

    ``walk_catalog`` creates instances with only the first four fields set;
    ``fetch_product_detail`` and ``process_product`` fill in the rest.
    """

    slug: str  # filesystem-safe slug, e.g. "warrant"
    catalog_slug: str  # bayer's seoSlug, e.g. "warrant-herbicide"
    product_url_path: str  # e.g. "/crop-protection/herbicide/warrant-herbicide"
    product_class: str  # "herbicide" | "fungicide" | ...
    product_name: str = ""
    epa_reg_no: str | None = None  # normalized by normalize_epa_reg()
    active_ingredients: list[dict] = field(default_factory=list)  # [{name, cas, percent}]
    label_url: str | None = None  # primary label PDF, if one was found
    label_filename: str | None = None  # last path segment of label_url
    label_last_modified: str | None = None  # ISO 8601 from a HEAD request
    label_page_count: int | None = None
    label_text_layer: bool | None = None  # False when no text could be extracted
    supplemental_pdfs: list[SupplementalDoc] = field(default_factory=list)
    source_page_url: str = ""  # absolute URL of the product page
|
|
|
|
|
|
# --------------------------------------------------------------------- helpers
|
|
|
|
|
|
# Captures the JSON payload between the opening and closing tags of the
# Next.js data island; DOTALL lets the payload span multiple lines.
_NEXT_DATA_RE = re.compile(
    r'<script id="__NEXT_DATA__"[^>]*>(.*?)</script>',
    flags=re.DOTALL,
)


def parse_next_data(html: str) -> dict[str, Any]:
    """Return the decoded ``__NEXT_DATA__`` JSON island of a Next.js page.

    Raises:
        RuntimeError: If the page contains no ``__NEXT_DATA__`` script tag.
    """
    match = _NEXT_DATA_RE.search(html)
    if match is None:
        raise RuntimeError("no __NEXT_DATA__ script tag found")
    payload = match.group(1)
    return json.loads(payload)
|
|
|
|
|
|
def fetch_build_id(http: RateLimitedSession) -> str:
    """Read the current Next.js ``buildId`` off the Bayer homepage.

    Raises:
        RuntimeError: If the homepage data island carries no ``buildId``.
    """
    homepage = http.get(f"{BASE}/")
    homepage.raise_for_status()
    build_id = parse_next_data(homepage.text).get("buildId")
    if build_id:
        log.info("resolved Bayer buildId=%s", build_id)
        return build_id
    raise RuntimeError("buildId missing from homepage __NEXT_DATA__")
|
|
|
|
|
|
def normalize_epa_reg(raw: str | None) -> str | None:
    """Convert Bayer's padded reg number to canonical EPA form.

    Example: ``0000524-00591-AA-0000000`` -> ``524-591``.
    The trailing ``-AA-0000000`` is a Bayer-internal qualifier we
    don't surface. We keep ``524-591-<sub>`` if a third segment other
    than the default ``AA`` appears (rare).

    Surrounding whitespace on the value and on each segment is ignored so
    that equal registrations always normalize to the same string.

    Returns:
        The normalized number, the stripped input when it has no ``-``
        separator, or ``None`` for empty / whitespace-only / ``None`` input.
    """
    if not raw:
        return None
    cleaned = raw.strip()
    if not cleaned:
        return None
    parts = [segment.strip() for segment in cleaned.split("-")]
    if len(parts) < 2:
        return cleaned
    # Drop zero padding but keep a lone "0" for an all-zero segment.
    company = parts[0].lstrip("0") or "0"
    product = parts[1].lstrip("0") or "0"
    epa = f"{company}-{product}"
    # If the third segment is something other than the default "AA",
    # it's likely a distributor sub-reg. Preserve it.
    if len(parts) >= 3 and parts[2] and parts[2] != "AA":
        epa += f"-{parts[2]}"
    return epa
|
|
|
|
|
|
# State suffix after a 24c marker in an upper-cased URL, e.g. "_24C_CA_".
# The two letters must not be followed by another letter (so "24C_LABEL"
# is not read as state "LA"). A ``\b`` cannot be used here because "_" is
# a word character and Bayer's asset names are underscore-delimited.
_STATE_24C_RE = re.compile(r"24[_-]?C[_-]([A-Z]{2})(?![A-Z])")

# Checked in order; the first group with any needle present in the
# upper-cased "title url" blob wins. "SDS" also covers "MSDS".
_KIND_KEYWORDS = (
    (("2EE", "2_EE"), "2EE"),
    (("SDS", "SAFETY DATA"), "MSDS"),
    (("BULLETIN",), "Bulletin"),
    (("SUPPLEMENTAL",), "Supplemental"),
    (("LABEL",), "Label"),
)


def classify_supplemental(title: str, url: str) -> str:
    """Classify a supplemental/auxiliary doc by its title or URL.

    Returns a short kind tag like ``2EE``, ``24C``, ``24C-CA``,
    ``Bulletin``, ``MSDS``, ``Supplemental``, ``Label``, or ``Other``.
    The exact tag isn't load-bearing for the scraper — it's metadata to
    help the chunker/agent later. Best-effort matching; ambiguous =
    ``Other``. ``None`` or empty arguments are treated as empty strings.
    """
    t = (title or "").upper()
    u = (url or "").upper()
    blob = f"{t} {u}"

    # State-specific 24c labels usually carry a two-letter state code,
    # but Bayer's titles rarely encode it. Best we can do is flag 24c.
    if "24C" in blob:
        # Try to spot a state suffix in the URL (e.g. "_24c_ca").
        m = _STATE_24C_RE.search(u)
        if m:
            return f"24C-{m.group(1)}"
        return "24C"

    for needles, kind in _KIND_KEYWORDS:
        if any(needle in blob for needle in needles):
            return kind
    return "Other"
|
|
|
|
|
|
def safe_slug(catalog_slug: str, product_class: str) -> str:
    """Return ``catalog_slug`` without a trailing ``-<product_class>``.

    ``warrant-herbicide`` with class ``herbicide`` becomes ``warrant``.
    Slugs that do not end with the class word (seed-treatment slugs
    sometimes split or omit it) are returned unchanged.
    """
    tail = "-" + product_class
    if not catalog_slug.endswith(tail):
        return catalog_slug
    return catalog_slug[: len(catalog_slug) - len(tail)]
|
|
|
|
|
|
def iso_from_http_date(http_date: str | None) -> str | None:
    """Convert an RFC 1123 HTTP date to an ISO 8601 UTC timestamp.

    A parsed date without timezone information is taken to be UTC.
    Returns ``None`` for empty input or anything that cannot be parsed.
    """
    if not http_date:
        return None
    try:
        from email.utils import parsedate_to_datetime

        parsed = parsedate_to_datetime(http_date)
        aware = parsed if parsed.tzinfo is not None else parsed.replace(tzinfo=timezone.utc)
        return aware.astimezone(timezone.utc).isoformat()
    except Exception:  # noqa: BLE001
        return None
|
|
|
|
|
|
# --------------------------------------------------------------------- catalog
|
|
|
|
|
|
def walk_catalog(
    http: RateLimitedSession, build_id: str
) -> Iterable[BayerProduct]:
    """Generate a ``BayerProduct`` stub for each catalog entry of every class.

    Only catalog-level fields (slug, catalog slug, URL path, class) are
    set on the stubs; EPA reg, ingredients and PDFs are filled in later
    by :func:`fetch_product_detail`.

    For each product type the catalog is paged until a non-200 response,
    an empty page, or the reported ``total`` has been reached.
    """
    catalog_endpoint = f"{BASE}/_next/data/{build_id}/crop-protection/catalog.json"
    for ptype in PRODUCT_TYPES:
        product_class = PRODUCT_CLASS[ptype]
        page_no = 1
        consumed = 0
        while True:
            resp = http.get(f"{catalog_endpoint}?productType={ptype}&p={page_no}")
            if resp.status_code != 200:
                log.warning("catalog %s p=%d -> HTTP %d, stopping class",
                            ptype, page_no, resp.status_code)
                break

            page_props = resp.json().get("pageProps", {})
            entries = page_props.get("serverProducts") or []
            total = page_props.get("total") or 0
            if not entries:
                break

            for entry in entries:
                seo_slug = entry.get("seoSlug") or ""
                url_path = entry.get("productURL") or ""
                # Entries missing either field cannot be fetched later; skip.
                if seo_slug and url_path:
                    yield BayerProduct(
                        slug=safe_slug(seo_slug, product_class),
                        catalog_slug=seo_slug,
                        product_url_path=url_path,
                        product_class=product_class,
                    )

            # Skipped entries still count toward the reported total.
            consumed += len(entries)
            if consumed >= total:
                break
            page_no += 1
|
|
|
|
|
|
# --------------------------------------------------------------------- detail
|
|
|
|
|
|
def _choose_label_url(
    download_url: Any, important: list, additional: list
) -> str | None:
    """Pick the primary label PDF: ``downloadLabelUrl`` first, then the
    first important document titled like a label, then simply the first
    PDF listed anywhere."""
    if download_url and looks_like_pdf(download_url):
        return download_url
    for doc in important:
        doc_title = (doc.get("title") or "").lower()
        doc_url = doc.get("url") or ""
        # "sds" also matches "msds", so one check excludes both.
        if looks_like_pdf(doc_url) and "label" in doc_title and "sds" not in doc_title:
            return doc_url
    for doc in important + additional:
        doc_url = doc.get("url") or ""
        if looks_like_pdf(doc_url):
            return doc_url
    return None


def _gather_supplementals(docs: list, label_url: str | None) -> list[SupplementalDoc]:
    """Turn every distinct PDF in ``docs`` except the primary label into a
    ``SupplementalDoc``, preserving listing order."""
    already_seen: set[str] = {label_url} if label_url else set()
    collected: list[SupplementalDoc] = []
    for doc in docs:
        doc_url = doc.get("url") or ""
        doc_title = doc.get("title") or ""
        if not doc_url or doc_url in already_seen or not looks_like_pdf(doc_url):
            continue
        already_seen.add(doc_url)
        collected.append(
            SupplementalDoc(
                kind=classify_supplemental(doc_title, doc_url),
                title=doc_title,
                url=doc_url,
            )
        )
    return collected


def fetch_product_detail(
    http: RateLimitedSession, prod: BayerProduct
) -> BayerProduct:
    """Fill a catalog stub from its product page's ``__NEXT_DATA__``.

    Sets the product name, EPA reg number, active ingredients, primary
    label URL/filename and the list of supplemental PDFs on ``prod`` and
    returns the same object. HTTP and parse errors propagate to the caller.
    """
    prod.source_page_url = BASE + prod.product_url_path
    page = http.get(prod.source_page_url)
    page.raise_for_status()

    page_props = (parse_next_data(page.text).get("props") or {}).get("pageProps") or {}
    details = page_props.get("productDetails") or {}

    prod.product_name = details.get("productLabel") or details.get("productName") or prod.slug
    prod.epa_reg_no = normalize_epa_reg(details.get("registrationNumber"))

    # Bayer's product page exposes ingredient names only — no CAS or percent.
    # Conform to the canonical schema by emitting objects with name set and
    # the other fields null; downstream consumers can hydrate from EPA PPLS.
    ingredients: list[dict] = []
    for item in details.get("activeIngredients") or []:
        ingredient_name = item.get("ingredient")
        if ingredient_name:
            ingredients.append({"name": ingredient_name, "cas": None, "percent": None})
    prod.active_ingredients = ingredients

    important = (page_props.get("importantDocuments") or {}).get("labelData") or []
    additional = (page_props.get("additionalDownloads") or {}).get("labelData") or []

    prod.label_url = _choose_label_url(
        page_props.get("downloadLabelUrl"), important, additional
    )
    if prod.label_url:
        # Last URL segment is the Scene7 asset id (e.g. "Warrant_2025pdf").
        prod.label_filename = prod.label_url.rsplit("/", 1)[-1]

    # Collect ALL other PDFs as supplementals (label/MSDS/24c/2EE/bulletin
    # /etc.). The kind tag is best-effort; the chunker can refine later.
    prod.supplemental_pdfs = _gather_supplementals(important + additional, prod.label_url)
    return prod
|
|
|
|
|
|
def looks_like_pdf(url: str) -> bool:
    """True if the URL is one of Bayer's PDF endpoints.

    Bayer serves PDFs via Adobe Scene7 with the literal ``pdf`` (no
    dot) appended to the asset ID, plus some assets on cs-contentapi
    with a real ``.pdf`` extension. Both forms end in ``pdf``, so one
    case-insensitive suffix check covers them.

    The check is applied to the whole URL and also to the URL with any
    ``?query`` / ``#fragment`` removed, so ``.../label.pdf?v=3`` is
    recognized too. Empty input returns ``False``.
    """
    if not url:
        return False
    lowered = url.lower()
    if lowered.endswith("pdf"):
        return True
    # Ignore a trailing query string or fragment when testing the path.
    path_only = lowered.split("#", 1)[0].split("?", 1)[0]
    return path_only.endswith("pdf")
|
|
|
|
|
|
# --------------------------------------------------------------------- PDF
|
|
|
|
|
|
def head_last_modified(http: RateLimitedSession, url: str) -> str | None:
    """Look up a PDF's ``Last-Modified`` header with a HEAD request.

    Returns the value as ISO 8601, or ``None`` when the request fails,
    the status is not 200, or the header is missing/unparseable.
    """
    try:
        resp = http.head(url)
    except requests.RequestException as exc:
        log.warning("HEAD failed for %s: %s", url, exc)
        return None
    if resp.status_code == 200:
        return iso_from_http_date(resp.headers.get("Last-Modified"))
    log.warning("HEAD %s -> HTTP %d", url, resp.status_code)
    return None
|
|
|
|
|
|
def fetch_pdf_text(http: RateLimitedSession, url: str) -> tuple[str, int, bool]:
    """Download a PDF and return ``(text, page_count, has_text_layer)``.

    Page texts are joined with blank lines and passed through
    :func:`normalize_text` so the resulting markdown diffs cleanly. A page
    whose extraction raises contributes an empty string (and a warning).
    ``has_text_layer`` is False when no text at all was extracted, which
    is what a scanned PDF produces.
    """
    resp = http.get(url)
    resp.raise_for_status()
    # A wrong Content-Type is only logged; parsing is still attempted.
    if "pdf" not in (resp.headers.get("Content-Type") or "").lower():
        log.warning("expected PDF Content-Type at %s, got %s",
                    url, resp.headers.get("Content-Type"))

    reader = PdfReader(io.BytesIO(resp.content))
    page_count = len(reader.pages)

    def _page_text(page: Any) -> str:
        try:
            return page.extract_text() or ""
        except Exception as exc:  # noqa: BLE001
            log.warning("pypdf extract_text failed on a page of %s: %s",
                        url, exc)
            return ""

    normalized = normalize_text("\n\n".join(_page_text(page) for page in reader.pages))
    return normalized, page_count, bool(normalized.strip())
|
|
|
|
|
|
def normalize_text(s: str) -> str:
    """Tidy extracted PDF text.

    Replaces the NBSPs pypdf often leaves behind with plain spaces, strips
    trailing spaces/tabs from each line, collapses runs of three or more
    newlines to a single blank line, and returns the result trimmed with
    exactly one trailing newline.
    """
    cleaned = s.replace("\u00a0", " ")
    # Order matters: trailing whitespace must go before newline runs are
    # measured, otherwise "\n \n\n" would not be seen as three newlines.
    for pattern, replacement in ((r"[ \t]+\n", "\n"), (r"\n{3,}", "\n\n")):
        cleaned = re.sub(pattern, replacement, cleaned)
    return f"{cleaned.strip()}\n"
|
|
|
|
|
|
# --------------------------------------------------------------------- write
|
|
|
|
|
|
def write_product(prod: BayerProduct, body_md: str) -> None:
    """Write ``<slug>.md`` and ``<slug>.json`` for ``prod`` into CORPUS_DIR.

    The markdown file gets a short human-readable header followed by
    ``body_md``; the JSON sidecar holds the canonical metadata. See
    scrape/README.md for the schema. The markdown is written first.
    """
    CORPUS_DIR.mkdir(parents=True, exist_ok=True)
    markdown_file = CORPUS_DIR / f"{prod.slug}.md"
    sidecar_file = CORPUS_DIR / f"{prod.slug}.json"

    # Lightweight markdown frontmatter for human eyeballing — canonical
    # metadata lives in the sidecar.
    title = prod.product_name or prod.slug
    ingredient_names = [a["name"] for a in prod.active_ingredients if a.get("name")]
    ai_summary = ", ".join(ingredient_names) or "(unknown)"
    header = "".join([
        f"# {title}\n\n",
        f"- **Product class:** {prod.product_class}\n",
        f"- **EPA Reg No:** {prod.epa_reg_no or '(unknown)'}\n",
        f"- **Active ingredients:** {ai_summary}\n",
        f"- **Source:** {prod.source_page_url}\n",
        f"- **Label PDF:** {prod.label_url or '(none on page)'}\n\n",
        "---\n\n",
    ])
    markdown_file.write_text(header + body_md, encoding="utf-8")

    label_section = {
        "url": prod.label_url,
        "filename": prod.label_filename,
        "accepted_date": None,
        "last_modified": prod.label_last_modified,
        "page_count": prod.label_page_count,
        "text_layer": prod.label_text_layer,
    }
    supplemental_section = []
    for doc in prod.supplemental_pdfs:
        supplemental_section.append({
            "kind": doc.kind,
            "title": doc.title,
            "url": doc.url,
            "last_modified": doc.last_modified,
        })
    # Key order is kept stable so sidecars diff cleanly between runs.
    sidecar = {
        "source": "bayer",
        "source_key": prod.slug,
        "epa_reg_no": prod.epa_reg_no,
        "product_name": prod.product_name,
        "product_class": prod.product_class,
        "registrant": None,
        "active_ingredients": prod.active_ingredients,
        "signal_word": None,
        "label": label_section,
        "supplemental_documents": supplemental_section,
        "source_urls": {
            "product_page": prod.source_page_url,
            "label_api": None,
            "label_index": None,
        },
        "fetched_at": datetime.now(timezone.utc).isoformat(),
        "scraper_version": SCRAPER_VERSION,
    }
    serialized = json.dumps(sidecar, indent=2, ensure_ascii=False) + "\n"
    sidecar_file.write_text(serialized, encoding="utf-8")
|
|
|
|
|
|
# --------------------------------------------------------------------- pipeline
|
|
|
|
|
|
def process_product(
    http: RateLimitedSession,
    prod: BayerProduct,
    *,
    force: bool,
    seen_regs: set[str] | None = None,
) -> str:
    """Fetch detail + PDF and write to disk. Returns a status string
    suitable for logging: ``written``, ``skipped``, ``dup-skip``,
    ``no-pdf``, ``failed``.

    Args:
        http: Rate-limited session used for every request.
        prod: Catalog stub; populated in place as details are resolved.
        force: When False, a product whose markdown file already exists
            is skipped without any network traffic.
        seen_regs: Optional dedup set. It is mutated: the EPA reg no of a
            product written by this call (``written`` or ``no-pdf``) is
            added, so subsequent calls within the same run can dedup
            against products served under multiple catalog product-type
            queries (the seed-treatment query in particular re-serves
            herbicide / fungicide / insecticide products that have
            seed-treatment use sites).
    """
    md_path = CORPUS_DIR / f"{prod.slug}.md"
    if md_path.exists() and not force:
        return "skipped"
    try:
        fetch_product_detail(http, prod)
    except Exception as exc:  # noqa: BLE001
        log.error("detail fetch failed for %s: %s", prod.slug, exc)
        return "failed"

    # Dedup: same EPA reg no already written in this run under a
    # different catalog product-type (and thus a different slug).
    if seen_regs is not None and prod.epa_reg_no and prod.epa_reg_no in seen_regs:
        log.info("dup-skip %s (epa=%s already processed under canonical class)",
                 prod.slug, prod.epa_reg_no)
        return "dup-skip"

    # Resolve Last-Modified for label + supplementals (HEAD only, cheap).
    if prod.label_url:
        prod.label_last_modified = head_last_modified(http, prod.label_url)
    for s in prod.supplemental_pdfs:
        s.last_modified = head_last_modified(http, s.url)

    if not prod.label_url:
        # Some Bayer products have no public label PDF (e.g. product was
        # discontinued or the page only carries a Product Bulletin). We
        # still record the metadata sidecar so the catalog is complete,
        # but write a stub body so the file count reflects reality.
        log.info("%s — no label PDF; writing metadata only", prod.slug)
        prod.label_text_layer = False
        write_product(prod, "_(No label PDF was found on the product page.)_\n")
        status = "no-pdf"
    else:
        try:
            body, page_count, text_layer = fetch_pdf_text(http, prod.label_url)
        except Exception as exc:  # noqa: BLE001
            log.error("PDF fetch/extract failed for %s (%s): %s",
                      prod.slug, prod.label_url, exc)
            return "failed"

        prod.label_page_count = page_count
        prod.label_text_layer = text_layer
        if not body.strip():
            log.warning("%s — extracted PDF was empty (scanned?)", prod.slug)
            body = "[SCANNED PDF — OCR REQUIRED]\n"

        write_product(prod, body)
        status = "written"

    # Honor the documented contract: remember what was just written so a
    # later duplicate is skipped. Adding is idempotent, so callers that
    # also record the reg no themselves are unaffected.
    if seen_regs is not None and prod.epa_reg_no:
        seen_regs.add(prod.epa_reg_no)
    return status
|
|
|
|
|
|
def _load_seen_regs() -> set[str]:
|
|
"""Hydrate the seen-EPA-reg-no set from existing sidecars on disk
|
|
so dedup survives across runs (e.g., a re-run with the seed-treatment
|
|
query won't re-write products already on disk under their canonical
|
|
slug)."""
|
|
seen: set[str] = set()
|
|
if not CORPUS_DIR.exists():
|
|
return seen
|
|
for f in CORPUS_DIR.glob("*.json"):
|
|
try:
|
|
data = json.loads(f.read_text(encoding="utf-8"))
|
|
reg = data.get("epa_reg_no")
|
|
if reg:
|
|
seen.add(reg)
|
|
except (OSError, json.JSONDecodeError):
|
|
continue
|
|
return seen
|
|
|
|
|
|
def run(
    *,
    limit: int | None,
    force: bool,
    only_product: str | None,
    only_class: str | None,
) -> int:
    """Walk the catalog, process the selected products, and log a summary.

    Args:
        limit: Maximum number of products to hand to ``process_product``
            (skipped ones count too); ``None`` means no limit.
        force: Re-process products whose markdown already exists and start
            with an empty dedup set instead of loading it from disk.
        only_product: If set, keep only the product whose slug or catalog
            slug equals this value.
        only_class: If set, keep only products of this product class.

    Returns:
        ``0`` on success, ``1`` if any product failed, ``2`` if
        ``only_product`` matched nothing.
    """
    CORPUS_DIR.mkdir(parents=True, exist_ok=True)
    http = RateLimitedSession()
    build_id = fetch_build_id(http)

    def _selected(candidate: BayerProduct) -> bool:
        if only_class and candidate.product_class != only_class:
            return False
        if only_product and only_product not in (candidate.slug, candidate.catalog_slug):
            return False
        return True

    products: list[BayerProduct] = [
        candidate for candidate in walk_catalog(http, build_id) if _selected(candidate)
    ]

    if only_product and not products:
        log.error("no product matched --product=%s", only_product)
        return 2

    log.info("catalog yielded %d candidate product(s)", len(products))

    # Seed the dedup set from disk so re-runs and force-runs both behave.
    seen_regs: set[str] = set() if force else _load_seen_regs()
    if seen_regs:
        log.info("dedup: %d EPA reg nos pre-loaded from existing corpus", len(seen_regs))

    counts = dict.fromkeys(("written", "skipped", "dup-skip", "no-pdf", "failed"), 0)
    limit_label = str(limit) if limit else "all"
    # A zero or negative limit selects nothing, matching a loop that
    # breaks before its first iteration.
    batch = products if limit is None else products[: max(limit, 0)]
    processed = 0
    for processed, prod in enumerate(batch, start=1):
        status = process_product(http, prod, force=force, seen_regs=seen_regs)
        counts[status] = counts.get(status, 0) + 1
        if status in ("written", "no-pdf") and prod.epa_reg_no:
            seen_regs.add(prod.epa_reg_no)
        ingredient_list = ",".join(
            a["name"] for a in prod.active_ingredients if a.get("name")
        )
        log.info(
            "[%d/%s] %s %s | class=%s epa=%s ai=%s label=%s",
            processed, limit_label,
            prod.slug, status,
            prod.product_class,
            prod.epa_reg_no or "-",
            ingredient_list or "-",
            prod.label_url or "-",
        )

    log.info(
        "done: processed=%d written=%d skipped=%d dup-skip=%d no-pdf=%d failed=%d",
        processed,
        counts["written"], counts["skipped"], counts["dup-skip"],
        counts["no-pdf"], counts["failed"],
    )
    return 1 if counts["failed"] else 0
|
|
|
|
|
|
# --------------------------------------------------------------------- CLI
|
|
|
|
|
|
def _build_argparser() -> argparse.ArgumentParser:
|
|
p = argparse.ArgumentParser(
|
|
prog="scrape.sources.bayer",
|
|
description="Scrape Bayer Crop Science US product labels.",
|
|
)
|
|
p.add_argument(
|
|
"--limit", type=int, default=None,
|
|
help="Stop after processing N products (default: all).",
|
|
)
|
|
p.add_argument(
|
|
"--force", action="store_true",
|
|
help="Re-download even if the markdown file already exists.",
|
|
)
|
|
p.add_argument(
|
|
"--product", default=None,
|
|
help="Process a single product by slug (e.g. 'warrant' or "
|
|
"'warrant-herbicide').",
|
|
)
|
|
p.add_argument(
|
|
"--class", dest="product_class", default=None,
|
|
choices=sorted(set(PRODUCT_CLASS.values())),
|
|
help="Limit to one product class.",
|
|
)
|
|
p.add_argument(
|
|
"--log-level", default=os.environ.get("LOG_LEVEL", "INFO"),
|
|
help="Python logging level (default INFO).",
|
|
)
|
|
return p
|
|
|
|
|
|
def main(argv: list[str] | None = None) -> int:
    """CLI entry point: parse ``argv``, set up logging to stderr, and run.

    ``argv=None`` lets argparse read ``sys.argv``. Returns the exit status
    produced by :func:`run`.
    """
    args = _build_argparser().parse_args(argv)
    logging_config = {
        "level": args.log_level.upper(),
        "format": "%(asctime)s %(levelname)s %(name)s %(message)s",
        "stream": sys.stderr,
    }
    logging.basicConfig(**logging_config)
    exit_status = run(
        limit=args.limit,
        force=args.force,
        only_product=args.product,
        only_class=args.product_class,
    )
    return exit_status
|
|
|
|
|
|
# Script entry point: run the CLI and use main()'s return value as the
# process exit status.
if __name__ == "__main__":
    sys.exit(main())
|