0bac06b7b6
Image rebuild (skip scrape) / build (push) Successful in 4m48s
Co-authored-by: claude <claude@jpaul.io> Co-committed-by: claude <claude@jpaul.io>
970 lines
38 KiB
Python
970 lines
38 KiB
Python
"""RobSeeCo (Rob-See-Co / Innotech) seed-guide scraper.
|
||
|
||
Source: the 2026 RobSeeCo Seed Guide PDF
|
||
(``robseeco.com/s/2026_RobSeeCo-Seed-Guide_FINAL-LR-Single.pdf`` — a
|
||
302 to a static1.squarespace.com asset). Rob-See-Co is an independent
|
||
seed company serving the Western/Central Corn Belt + Dakotas (Eastern
|
||
Corn Belt via Federal Hybrids / Kiser Seed); the guide carries the
|
||
Rob-See-Co corn line, the Rob-See-Co + Innotech soybean lines, plus
|
||
Masters Choice silage corn and sorghum (both OUT OF SCOPE for the
|
||
row-crop advisor — skipped).
|
||
|
||
This is a marketing-laid-out PDF, the hardest extraction class. The
|
||
value lives in two complementary places per crop:
|
||
|
||
1. A dense **ratings table** (corn p5-8, soy p19-26) — one row per
|
||
hybrid/variety with ~14 single-digit 1-9 agronomic + disease
|
||
ratings. The numeric column headers are ROTATED 90deg (rendered as
|
||
reversed text), so we reconstruct each header label by clustering the
|
||
rotated glyphs by their x-position, ordering bottom-to-top, and
|
||
reversing the string. We then map each numeric data cell to its
|
||
column by **x-center alignment** (the most reliable signal — the
|
||
whitespace-tokenised text drops/duplicates cells around the sparse
|
||
"Fungicide Response" / soil-fit columns and the trailing geo letters).
|
||
|
||
  2. Descriptive **multi-column cards** (corn p9-18 in a 2-column layout,
     soy p21-26 in a 3-column layout) — code + trait variants (corn) +
     3-6 marketing bullets. We split the page into equal-width columns,
     cluster each column into lines, and group consecutive title lines
     (font size 14) + their following bullet lines (size 9.5) into one
     card. Cards enrich the table record with the corn trait-variant
     suffixes (-RR2 / -VT2P / -Conv / …) and the bullets (``strengths``).
|
||
|
||
The table is authoritative for the structured 1-9 ``characteristics_groups``;
|
||
cards add the trait stack + bullets. A table row that won't parse cleanly
|
||
(too few aligned numeric cells) falls back to a **card-only** record
|
||
(identity + bullets) rather than emit mis-assigned ratings.
|
||
|
||
EVERY content page in the PDF is DUPLICATED consecutively (p5 == p6,
|
||
p7 == p8, …), and the two covers carry mirrored text — we dedup by
|
||
source_key and only ever process the first occurrence of a code.
|
||
|
||
Output:
|
||
corpus/robseeco/<source_key>.md
|
||
corpus/robseeco/<source_key>.json
|
||
|
||
source_key: ``robseeco-<code>`` lowercased, e.g. ``robseeco-rc2500``,
|
||
``robseeco-is1162e3``.
|
||
|
||
CLI:
|
||
python -m scrape.sources.robseeco --crop corn --limit 5
|
||
python -m scrape.sources.robseeco --force
|
||
python -m scrape.sources.robseeco --product robseeco-rc2500
|
||
"""
|
||
|
||
from __future__ import annotations
|
||
|
||
import argparse
|
||
import json
|
||
import logging
|
||
import os
|
||
import random
|
||
import re
|
||
import sys
|
||
import time
|
||
from collections import defaultdict
|
||
from dataclasses import dataclass, field
|
||
from datetime import datetime, timezone
|
||
from pathlib import Path
|
||
from typing import Any
|
||
|
||
import pdfplumber
|
||
import requests
|
||
|
||
# Identity of this scraper and the remote resources it reads.
SCRAPER_VERSION = "0.1.0"
USER_AGENT = "seed-mcp-scraper/0.1 (+https://drawbar.example/contact)"
PDF_URL = "https://www.robseeco.com/s/2026_RobSeeCo-Seed-Guide_FINAL-LR-Single.pdf"
PRODUCTS_URL = "https://www.robseeco.com/products"

# Filesystem layout. REPO_ROOT is the third ancestor of this file
# (``parents[2]``). Both the corpus root and the PDF cache location can be
# overridden through environment variables; an unset or empty variable
# falls back to the in-repo default.
REPO_ROOT = Path(__file__).resolve().parents[2]
CORPUS_ROOT = Path(os.environ.get("CORPUS_ROOT") or REPO_ROOT / "corpus")
CORPUS_DIR = CORPUS_ROOT / "robseeco"
PDF_CACHE = Path(os.environ.get("ROBSEECO_PDF_CACHE")
                 or (REPO_ROOT / "var" / "robseeco_2026_seed_guide.pdf"))

# Default minimum spacing (seconds) between HTTP requests; used as the
# default ``interval`` of ``RateLimitedSession``.
REQ_INTERVAL_SEC = 1.5

# Scale direction — taken verbatim from the legend on p7-8 of the guide.
# Emitted as the "Rating scale" line of every rendered markdown document.
SCALE_DIRECTION = (
    "Agronomic and disease ratings 1-9, 9=Best, 1=Worst, -=not available "
    "(higher is better). Plant Height 9=Tall/1=Short; Ear Height 9=High/1=Low. "
    "Planting Rate Guideline L=Low / ML=Medium-Low / M=Medium / MH=Medium-High / "
    "H=High. Disease/pest letter codes (soy): R=resistant, MR=moderately resistant, "
    "S=susceptible. Product Fit Geography A=All, C=Central (IA/MN/WI), "
    "E=East (IN/MI/OH/PA/MD), W=West (ND/SD/NE/KS/OK/TX & West), CW=Central+West."
)

# Single static regional-recommendation entry describing the dealer network;
# the agronomist / variant fields are deliberately left empty (None).
REGIONAL_REC = [{
    "product_list_name": ("RobSeeCo dealer network (Western/Central Corn Belt + "
                          "Dakotas; Eastern Corn Belt via Federal Hybrids/Kiser Seed)"),
    "agronomist": None,
    "agronomist_email": None,
    "variant_id": None,
}]

# Page sections (0-indexed). Content pages are duplicated consecutively,
# so the ranges deliberately span both copies — dedup by source_key
# handles the repeat.
# NOTE: the soy card range is a sub-range of the soy table range; both are
# scanned over the overlapping pages.
CORN_TABLE_PAGES = range(5, 9)      # p5-8
CORN_CARD_PAGES = range(9, 19)      # p9-18
SOY_TABLE_PAGES = range(19, 27)     # p19-26
SOY_CARD_PAGES = range(21, 27)      # p21-26 (soy cards interleave after the table)

# Module-level logger shared by every function in this scraper.
log = logging.getLogger("scrape.robseeco")
|
||
|
||
|
||
# --------------------------------------------------------------------- HTTP
|
||
|
||
|
||
class RateLimitedSession:
    """Polite ``requests`` session with request spacing and retry/backoff.

    We make exactly one request (the PDF) so this is mostly
    belt-and-suspenders, but it keeps the source uniform with the other
    scrapers and gives retry/backoff on a flaky CDN.
    """

    def __init__(self, interval: float = REQ_INTERVAL_SEC) -> None:
        """Create the underlying session.

        Args:
            interval: Minimum number of seconds between two outgoing requests.
        """
        self.s = requests.Session()
        self.s.headers["User-Agent"] = USER_AGENT
        self.interval = interval
        # ``time.monotonic()`` stamp taken right before the previous request.
        self._last = 0.0

    def _wait(self) -> None:
        """Sleep just long enough to keep ``interval`` seconds between requests."""
        delta = time.monotonic() - self._last
        if delta < self.interval:
            time.sleep(self.interval - delta)
        self._last = time.monotonic()

    def request(self, method: str, url: str, *, max_retries: int = 4,
                timeout: float = 90.0, **kw: Any) -> requests.Response:
        """Send a request, retrying network errors and 429 / 5xx responses.

        Args:
            method: HTTP method name.
            url: Target URL.
            max_retries: Total number of attempts. Values below 1 are treated
                as 1 so that the request is always sent at least once.
            timeout: Per-attempt timeout handed to ``requests``.
            **kw: Extra keyword arguments forwarded to ``Session.request``.

        Returns:
            The first response that is neither 429 nor 5xx, or — when every
            attempt was used up — the response of the final attempt (which may
            still carry a 429 / 5xx status for the caller to inspect).

        Raises:
            requests.RequestException: If the final attempt failed at the
                network level.
        """
        attempts = max(1, max_retries)
        resp: requests.Response | None = None
        last_exc: Exception | None = None
        for attempt in range(attempts):
            is_last = attempt == attempts - 1
            self._wait()
            try:
                resp = self.s.request(method, url, timeout=timeout,
                                      allow_redirects=True, **kw)
            except requests.RequestException as exc:
                # Remember only the outcome of the most recent attempt so a
                # stale response is never returned after a later failure.
                resp, last_exc = None, exc
                if is_last:
                    break
                backoff = min(30.0, (2 ** attempt) + random.random())
                log.warning("network error on %s %s: %s — retry in %.1fs",
                            method, url, exc, backoff)
                time.sleep(backoff)
                continue
            # A response arrived: any earlier network error is now stale.
            last_exc = None
            if resp.status_code != 429 and not 500 <= resp.status_code < 600:
                return resp
            if is_last:
                break
            # Honour a numeric Retry-After header; otherwise fall back to
            # capped exponential backoff with jitter.
            ra = resp.headers.get("Retry-After")
            backoff = float(ra) if (ra and ra.isdigit()) else min(
                30.0, (2 ** attempt) + random.random())
            log.warning("HTTP %d on %s %s — retry in %.1fs",
                        resp.status_code, method, url, backoff)
            time.sleep(backoff)
        # No pointless sleep after the last attempt: report its outcome.
        log.warning("giving up on %s %s after %d attempt(s)", method, url, attempts)
        if last_exc is not None:
            raise last_exc
        return resp  # type: ignore[return-value]

    def get(self, url: str, **kw: Any) -> requests.Response:
        """Shorthand for ``request("GET", url, **kw)``."""
        return self.request("GET", url, **kw)
|
||
|
||
|
||
def fetch_pdf(http: RateLimitedSession, *, force: bool) -> Path:
    """Download the seed guide to the cache path (reused unless --force).

    Reproducible for the monthly refresh — the scraper owns the download.

    Args:
        http: Session used to fetch ``PDF_URL``.
        force: When true, ignore an existing cached copy and download again.

    Returns:
        ``PDF_CACHE``, which holds the PDF after the call.

    Raises:
        requests.HTTPError: If the server answers with an error status.
        RuntimeError: If the response neither declares a PDF content type
            nor starts with the ``%PDF`` magic bytes.
    """
    if not force and PDF_CACHE.exists():
        cached_size = PDF_CACHE.stat().st_size
        # Anything at or below 1 MB is treated as not a usable copy.
        if cached_size > 1_000_000:
            log.info("using cached PDF %s (%d bytes)", PDF_CACHE, cached_size)
            return PDF_CACHE
    PDF_CACHE.parent.mkdir(parents=True, exist_ok=True)
    log.info("downloading seed guide %s", PDF_URL)
    r = http.get(PDF_URL)
    r.raise_for_status()
    ct = r.headers.get("Content-Type", "")
    if "pdf" not in ct.lower() and r.content[:4] != b"%PDF":
        raise RuntimeError(f"unexpected content-type {ct!r} for {PDF_URL}")
    # Write to a sibling temp file and rename it into place, so an
    # interrupted write can never leave a truncated file that a later run
    # would accept as a valid cache.
    tmp_path = PDF_CACHE.with_name(PDF_CACHE.name + ".part")
    tmp_path.write_bytes(r.content)
    tmp_path.replace(PDF_CACHE)
    log.info("cached PDF -> %s (%d bytes)", PDF_CACHE, len(r.content))
    return PDF_CACHE
|
||
|
||
|
||
# --------------------------------------------------------------------- model
|
||
|
||
|
||
@dataclass
class RSVariety:
    """One hybrid/variety record assembled from the seed guide.

    Fields:
        source_key: Corpus key, built as ``robseeco-<code>`` lowercased.
        crop: ``"corn"`` or ``"soybeans"``.
        brand: ``"Rob-See-Co"`` or ``"Innotech"``.
        product_name: The base product code.
        relative_maturity: Corn relative maturity (integer), if known.
        maturity_group: Soybean maturity group, if known.
        trait_stack: Trait labels attached to the product.
        positioning: Optional free-text positioning statement.
        strengths: Marketing bullets taken from the descriptive card.
        groups: ``characteristics_groups`` in the shape
            ``[{label, items: [{characteristic, value}]}]``.
        from_table: ``False`` marks a card-only fallback record.
    """

    # --- identity (required) ---
    source_key: str
    crop: str
    brand: str
    product_name: str

    # --- maturity: corn uses relative_maturity, soy uses maturity_group ---
    relative_maturity: int | None = None
    maturity_group: float | None = None

    # --- descriptive content ---
    trait_stack: list[str] = field(default_factory=list)
    positioning: str | None = None
    strengths: list[str] = field(default_factory=list)
    groups: list[dict] = field(default_factory=list)

    # --- provenance ---
    from_table: bool = True
|
||
|
||
|
||
# --------------------------------------------------------------------- header reconstruction
|
||
|
||
|
||
def reconstruct_rotated_headers(page: pdfplumber.page.Page,
                                top_max: float) -> list[tuple[float, str]]:
    """Rebuild the rotated column headers found above ``top_max``.

    Only non-upright words whose ``top`` is below ``top_max`` are used.
    They are bucketed by ``round(x0)``; inside a bucket the words are taken
    bottom-first (descending ``top``), concatenated, and the resulting
    string is reversed to obtain the readable label.

    Returns:
        ``[(x_center, label), ...]`` ordered by the rounded ``x0`` bucket,
        where ``x_center`` is the mean horizontal midpoint of the bucket's
        words rounded to two decimals.
    """
    columns: dict[int, list[dict]] = defaultdict(list)
    for word in page.extract_words(use_text_flow=False, keep_blank_chars=False,
                                   extra_attrs=["upright"]):
        if not word["upright"] and word["top"] < top_max:
            columns[round(word["x0"])].append(word)

    headers: list[tuple[float, str]] = []
    for bucket in sorted(columns):
        # Bottom glyph first; a stable sort keeps ties in extraction order.
        glyphs = sorted(columns[bucket], key=lambda w: w["top"], reverse=True)
        # Reversing the concatenation == concatenating the reversed pieces
        # in reverse order.
        label = "".join(g["text"][::-1] for g in reversed(glyphs))
        midpoints = [(g["x0"] + g["x1"]) / 2 for g in glyphs]
        headers.append((round(sum(midpoints) / len(glyphs), 2), label))
    return headers
|
||
|
||
|
||
# --------------------------------------------------------------------- column maps
|
||
#
|
||
# The decoded (reversed) rotated-header label -> a canonical column key.
|
||
# Built once from the reconstructed headers per page; the per-row cell
|
||
# alignment then keys on x-center, so these maps only need to recognise
|
||
# the header *strings* the reconstruction produces.
|
||
|
||
# Canonical corn columns (decoded label fragments -> canonical key).
# We match on the de-spaced reversed string the reconstructor emits.
# Several fragments map to the same canonical key (e.g. "CornNorthern" and
# "BlightLeaf"); presumably these are the separate rotated lines of one
# multi-line header — verify against the PDF if a column goes missing.
CORN_HEADER_CANON = {
    "RM": "RM",
    "SilktoRM": "RM to Silk",
    "BlacklayertoRM": "RM to Blacklayer",
    "BlacklayertoGDU": "GDU to Blacklayer",
    "Emergence": "Emergence",
    "VigorSeedling": "Seedling Vigor",
    "StrengthRoot": "Root Strength",
    "StrengthStalk": "Stalk Strength",
    "SnapGreen": "Green Snap",
    "Staygreen": "Staygreen",
    "Drydown": "Drydown",
    "ToleranceDrought": "Drought Tolerance",
    "HeightPlant": "Plant Height",
    "HeightEar": "Ear Height",
    "WeightTest": "Test Weight",
    "SpotLeafGray": "Gray Leaf Spot",
    # Both the typographic (’) and the ASCII (') apostrophe are accepted.
    "WiltGoss’s": "Goss's Wilt",
    "WiltGoss's": "Goss's Wilt",
    "CornNorthern": "Northern Corn Leaf Blight",
    "BlightLeaf": "Northern Corn Leaf Blight",
    "SpotTar": "Tar Spot",
    "ResponseFungicide": "Fungicide Response (Absence of Disease in Continuous Corn)",
    "DiseaseofAbsencein": "Fungicide Response (Absence of Disease in Continuous Corn)",
    "RatePlanting": "Planting Rate Guideline",
    "Guidelines": "Planting Rate Guideline",
    "SoilsProductiveHighly": "Continuous-corn fit: Highly Productive Soils",
    "SoilsVariable": "Continuous-corn fit: Variable Soils",
    "SoilsDrainedPoorly": "Continuous-corn fit: Poorly Drained Soils",
    "AgronomicCorn:onCorn": "Corn-on-Corn Agronomic Characteristics",
    "Characteristics": "Corn-on-Corn Agronomic Characteristics",
    "RegionRecommended": "Product Fit Geography",
}

# Corn classification of canonical column -> chunker bucket label.
# Canonical keys listed here land in the "disease" bucket.
CORN_DISEASE = {
    "Gray Leaf Spot", "Goss's Wilt", "Northern Corn Leaf Blight", "Tar Spot",
    "Fungicide Response (Absence of Disease in Continuous Corn)",
}
# Canonical keys listed here land in the "agronomic" bucket.
CORN_AGRONOMIC = {
    "Emergence", "Seedling Vigor", "Root Strength", "Stalk Strength",
    "Green Snap", "Staygreen", "Drydown", "Drought Tolerance",
    "Plant Height", "Ear Height", "Test Weight",
}
# Everything else (RM-to-Silk/Blacklayer, GDU, planting rate, soil fit,
# product-fit geography, corn-on-corn) is passthrough placement context.
# This string is the group label used for those passthrough items.
CORN_PASSTHROUGH = "Planting & Placement"
|
||
|
||
# Canonical soy columns (decoded reversed-label fragments -> key). The
# soy reconstruction joins the two label words; match on the de-spaced
# string. Two-word labels like "Maturity Relative" reverse word-order.
# As with corn, several fragments may map to one canonical key.
SOY_HEADER_CANON = {
    "MaturityRelative": "Relative Maturity",
    "TraitHerbicide": "Herbicide Trait",
    "Herbicide": "Herbicide Trait",
    "TraitTolerance": "Herbicide Trait",
    "ResistanceSCN": "SCN Resistance Source",
    "Source": "SCN Resistance Source",
    "ChlorosisIron": "Iron Chlorosis (IDC) Tolerance",
    "(IDC)Tolerance": "Iron Chlorosis (IDC) Tolerance",
    # Placeholder value: build_soy_colmap rewrites it to the gene-resistance
    # key for the first occurrence and drops later occurrences.
    "Phytophthora": "Phytophthora",  # disambiguated by x-order below
    "GeneResistance": "Phytophthora Gene Resistance",
    "(PRR)RatingField": "Phytophthora (PRR) Field Rating",
    "RotStemBrown": "Brown Stem Rot (BSR)",
    "(BSR)": "Brown Stem Rot (BSR)",
    "MoldWhite": "White Mold (SWM) Tolerance",
    "(SWM)Tolerance": "White Mold (SWM) Tolerance",
    "DeathSudden": "Sudden Death Syndrome (SDS) Tolerance",
    "(SDS)Syndrome": "Sudden Death Syndrome (SDS) Tolerance",
    "Emergence": "Emergence",
    "Standability": "Standability",
    "HeightPlant": "Plant Height for Maturity",
    "Maturityfor": "Plant Height for Maturity",
    "WidthCanopy": "Canopy Width / Plant Type",
    "SoilsVariable": "Variable Soils fit",
    "ProductiveHigh": "High Productive Soil fit",
    "Soil": "High Productive Soil fit",
    "StressandVariableEnvironments": "Stress & Variable Environments fit",
    "EnvironmentsYieldHigh": "High Yield Environments fit",
    "RegionRecommended": "Product Fit Geography",
    "Recommended": "Product Fit Geography",
    "Region": "Product Fit Geography",
}
# Canonical soy keys that land in the "disease" bucket.
SOY_DISEASE = {
    "SCN Resistance Source", "Iron Chlorosis (IDC) Tolerance",
    "Phytophthora Gene Resistance", "Phytophthora (PRR) Field Rating",
    "Brown Stem Rot (BSR)", "White Mold (SWM) Tolerance",
    "Sudden Death Syndrome (SDS) Tolerance",
}
# Canonical soy keys that land in the "agronomic" bucket.
SOY_AGRONOMIC = {
    "Emergence", "Standability", "Plant Height for Maturity",
    "Canopy Width / Plant Type",
}
# Group label used for every soy column that is neither disease nor agronomic.
SOY_PASSTHROUGH = "Placement"
|
||
|
||
|
||
def _despace(s: str) -> str:
|
||
return re.sub(r"\s+", "", s or "")
|
||
|
||
|
||
def build_corn_colmap(headers: list[tuple[float, str]]) -> list[tuple[float, str, str]]:
    """Translate reconstructed corn headers into an x-sorted column map.

    Each header label is de-spaced and looked up in ``CORN_HEADER_CANON``;
    labels without a canonical key are dropped.

    Returns:
        ``[(x_center, canonical_key, bucket), ...]`` sorted by ``x_center``,
        with ``bucket`` one of ``"disease"``, ``"agronomic"`` or ``"pass"``.
    """
    def classify(canonical: str) -> str:
        if canonical in CORN_DISEASE:
            return "disease"
        if canonical in CORN_AGRONOMIC:
            return "agronomic"
        return "pass"

    colmap: list[tuple[float, str, str]] = []
    for center, label in headers:
        canonical = CORN_HEADER_CANON.get(_despace(label))
        if canonical:
            colmap.append((center, canonical, classify(canonical)))
    return sorted(colmap, key=lambda entry: entry[0])
|
||
|
||
|
||
def build_soy_colmap(headers: list[tuple[float, str]]) -> list[tuple[float, str, str]]:
    """Translate reconstructed soy headers into an x-sorted column map.

    Headers are visited left to right. A bare ``"Phytophthora"`` label is
    mapped to the gene-resistance column the first time it is seen and
    ignored afterwards (the ``(PRR)RatingField`` label marks the next
    column). Labels without a canonical key are dropped.

    Returns:
        ``[(x_center, canonical_key, bucket), ...]`` sorted by ``x_center``,
        with ``bucket`` one of ``"disease"``, ``"agronomic"`` or ``"pass"``.
    """
    colmap: list[tuple[float, str, str]] = []
    phyto_claimed = False
    for center, label in sorted(headers, key=lambda entry: entry[0]):
        canonical = SOY_HEADER_CANON.get(_despace(label))
        if canonical == "Phytophthora":
            if phyto_claimed:
                continue
            phyto_claimed = True
            canonical = "Phytophthora Gene Resistance"
        if not canonical:
            continue
        if canonical in SOY_DISEASE:
            kind = "disease"
        elif canonical in SOY_AGRONOMIC:
            kind = "agronomic"
        else:
            kind = "pass"
        colmap.append((center, canonical, kind))
    return sorted(colmap, key=lambda entry: entry[0])
|
||
|
||
|
||
# --------------------------------------------------------------------- table row parsing
|
||
|
||
|
||
# Corn product code in a table row: either ``RC`` + four digits or the
# hyphenated ``<letter><2 digits>-<2 digits>`` form (whole token must match).
_CORN_CODE = re.compile(r"^(RC\d{4}|[A-Z]\d{2}-\d{2})$")
# Soy product code in a table row: starts with ``IS`` or ``RS`` followed by
# a digit; whatever comes after that first digit is accepted as-is.
_SOY_CODE = re.compile(r"^(IS|RS)\d.*$")
|
||
|
||
|
||
def _nearest_col(cx: float, colmap: list[tuple[float, str, str]],
|
||
tol: float) -> tuple[str, str] | None:
|
||
best = None
|
||
bestd = tol
|
||
for x, key, bucket in colmap:
|
||
d = abs(x - cx)
|
||
if d < bestd:
|
||
bestd = d
|
||
best = (key, bucket)
|
||
return best
|
||
|
||
|
||
def _row_words(page: pdfplumber.page.Page) -> dict[int, list[dict]]:
|
||
"""Group upright words on a page into rows keyed by rounded top."""
|
||
words = [w for w in page.extract_words(use_text_flow=False,
|
||
keep_blank_chars=False,
|
||
extra_attrs=["upright"])
|
||
if w["upright"]]
|
||
rows: dict[int, list[dict]] = defaultdict(list)
|
||
for w in words:
|
||
# snap near-equal tops together
|
||
key = None
|
||
for k in list(rows):
|
||
if abs(k - w["top"]) < 4:
|
||
key = k
|
||
break
|
||
rows[key if key is not None else round(w["top"])].append(w)
|
||
return rows
|
||
|
||
|
||
def parse_corn_table(page: pdfplumber.page.Page,
                     colmap: list[tuple[float, str, str]]) -> list[RSVariety]:
    """Parse every corn data row on a ratings-table page.

    A row qualifies when its left-most word is a corn code starting at
    ``x0 <= 80`` and at least one of the remaining cells is numeric. Rows
    that ``_assemble_corn`` rejects are left out of the result.
    """
    numeric = re.compile(r"-?\d+(?:\.\d+)?")
    parsed: list[RSVariety] = []
    for row in _row_words(page).values():
        ordered = sorted(row, key=lambda w: w["x0"])
        if not ordered:
            continue
        lead, *cells = ordered
        code = lead["text"]
        if lead["x0"] > 80 or not _CORN_CODE.match(code):
            continue
        # A genuine data row carries numeric cells (the RM among them).
        if not any(numeric.fullmatch(cell["text"]) for cell in cells):
            continue
        variety = _assemble_corn(code, cells, colmap)
        if variety:
            parsed.append(variety)
    return parsed
|
||
|
||
|
||
def _assemble_corn(code: str, cells: list[dict],
                   colmap: list[tuple[float, str, str]]) -> RSVariety | None:
    """Build a corn record from one table row.

    Every cell is assigned to the column whose header x-center is nearest
    to the cell's own x-center (tolerance 13). The ``RM`` column feeds
    ``relative_maturity``; all other hits become characteristic items in
    their column's bucket.

    Returns:
        The assembled ``RSVariety``, or ``None`` when fewer than 8 cells
        were placed or no relative maturity could be parsed — the caller
        then falls back to a card-only record.
    """
    sections: dict[str, list[dict]] = {"disease": [], "agronomic": [], "pass": []}
    rel_maturity: int | None = None
    placed = 0
    for cell in cells:
        center = (cell["x0"] + cell["x1"]) / 2
        # Cells at the far right (page-number column) are not ratings.
        if center > 738:
            continue
        hit = _nearest_col(center, colmap, tol=13.0)
        if not hit:
            continue
        column, bucket = hit
        text = cell["text"].strip()
        if column == "RM":
            try:
                rel_maturity = int(float(text))
            except ValueError:
                pass
            continue
        # An empty cell is recorded as "-" so the column still shows up.
        entry = {"characteristic": column, "value": text or "-"}
        sections.get(bucket, sections["pass"]).append(entry)
        placed += 1

    # A clean row aligns into a healthy number of columns.
    if rel_maturity is None or placed < 8:
        return None

    labelled = (
        ("AGRONOMIC CHARACTERISTICS", sections["agronomic"]),
        ("DISEASE RATINGS", sections["disease"]),
        (CORN_PASSTHROUGH, sections["pass"]),
    )
    groups = [{"label": label, "items": items} for label, items in labelled if items]
    return RSVariety(
        source_key=f"robseeco-{code.lower()}",
        crop="corn",
        brand="Rob-See-Co",
        product_name=code,
        relative_maturity=rel_maturity,
        groups=groups,
    )
|
||
|
||
|
||
def parse_soy_table(page: pdfplumber.page.Page,
                    colmap: list[tuple[float, str, str]]) -> list[RSVariety]:
    """Parse every soybean data row on a ratings-table page.

    A row qualifies when its left-most word is a soy code starting at
    ``x0 <= 80``. Rows that ``_assemble_soy`` rejects are left out.
    """
    parsed: list[RSVariety] = []
    for row in _row_words(page).values():
        ordered = sorted(row, key=lambda w: w["x0"])
        if not ordered:
            continue
        lead, *cells = ordered
        code = lead["text"]
        if lead["x0"] > 80 or not _SOY_CODE.match(code):
            continue
        variety = _assemble_soy(code, cells, colmap)
        if variety:
            parsed.append(variety)
    return parsed
|
||
|
||
|
||
def _assemble_soy(code: str, cells: list[dict],
                  colmap: list[tuple[float, str, str]]) -> RSVariety | None:
    """Build a soybean record from one table row.

    Cells are assigned to the nearest header column (tolerance 16). Cells
    landing in the same column are merged with a space, because e.g. a
    Phytophthora gene cell can come back as two tokens
    (``"Rps1c, Rps3a"``). ``Relative Maturity`` feeds ``maturity_group``
    and ``Herbicide Trait`` feeds the trait stack; the remaining columns
    become characteristic items.

    Returns:
        The assembled ``RSVariety``, or ``None`` when fewer than 6 cells
        were placed or no maturity group could be parsed.
    """
    texts_by_col: dict[str, list[str]] = defaultdict(list)
    bucket_of: dict[str, str] = {}
    placed = 0
    for cell in cells:
        center = (cell["x0"] + cell["x1"]) / 2
        if center > 738:
            continue
        hit = _nearest_col(center, colmap, tol=16.0)
        if not hit:
            continue
        column, bucket = hit
        texts_by_col[column].append(cell["text"].strip())
        bucket_of[column] = bucket
        placed += 1

    maturity: float | None = None
    herbicide: str | None = None
    sections: dict[str, list[dict]] = {"disease": [], "agronomic": [], "pass": []}
    for column, parts in texts_by_col.items():
        value = " ".join(filter(None, parts)).strip()
        if column == "Relative Maturity":
            try:
                maturity = float(value)
            except ValueError:
                pass
        elif column == "Herbicide Trait":
            herbicide = value or None
        else:
            entry = {"characteristic": column, "value": value or "-"}
            sections.get(bucket_of[column], sections["pass"]).append(entry)

    if maturity is None or placed < 6:
        return None

    # The trait column wins; the code suffix is only a fallback.
    traits = [herbicide] if herbicide else _trait_from_soy_code(code)
    labelled = (
        ("AGRONOMIC CHARACTERISTICS", sections["agronomic"]),
        ("DISEASE RATINGS", sections["disease"]),
        (SOY_PASSTHROUGH, sections["pass"]),
    )
    groups = [{"label": label, "items": items} for label, items in labelled if items]
    return RSVariety(
        source_key=f"robseeco-{code.lower()}",
        crop="soybeans",
        brand="Innotech" if code.upper().startswith("IS") else "Rob-See-Co",
        product_name=code,
        maturity_group=maturity,
        trait_stack=traits,
        groups=groups,
    )
|
||
|
||
|
||
def _trait_from_soy_code(code: str) -> list[str]:
|
||
"""Fallback trait from the code suffix (E3 / E3S / XF / XFS)."""
|
||
m = re.search(r"(E3S|E3|XFS|XF)$", code.upper())
|
||
if not m:
|
||
return []
|
||
mapping = {"E3": "E3", "E3S": "E3/STS", "XF": "XF", "XFS": "XF/STS"}
|
||
return [mapping[m.group(1)]]
|
||
|
||
|
||
# --------------------------------------------------------------------- card parsing
|
||
|
||
|
||
# Try the hyphenated D94-26 form FIRST so a code like ``N97-55-Organic``
# splits to base ``N97-55`` + trait ``Organic`` rather than base ``N97``.
# Group 1 is the base code; optional group 2 is whatever non-space text
# follows a single ``-`` after the base (the trait suffix).
_CARD_CODE = re.compile(r"^([A-Z]\d{2}-\d{2}|[A-Z]{1,3}\d{2,4}[A-Z]?)(?:-(\S+))?$")

# Recognised corn trait suffixes — used to filter card-derived traits so a
# mis-split sibling fragment (e.g. "55-Conv") never leaks into trait_stack.
_KNOWN_CORN_TRAITS = {
    "Conv", "RR2", "VT2P", "DGVT2P", "GT", "GTA", "GT/LL", "GT/LLA", "SS",
    "SSP", "TRE", "AA", "DV", "D", "V", "VZ", "PCE", "3110A", "Artesian",
    "Organic",
}

# A soybean card code is the full variety code (IS0325E3 / RS3437XFS) —
# no trait suffix, so the whole token is the base.
# Matching is case-insensitive (``re.I``).
_SOY_CARD_CODE = re.compile(r"^(IS|RS)\d{2,5}(E3S|E3|XFS|XF)$", re.I)
|
||
|
||
|
||
def _card_base(token: str) -> tuple[str, str | None]:
    """Split a card title token into ``(base_code, trait_suffix)``.

    Examples: ``RC4185-VT2P`` -> ``("RC4185", "VT2P")``;
    ``D94-26-VT2P`` -> ``("D94-26", "VT2P")``;
    ``IS0325E3`` -> ``("IS0325E3", None)``.

    Soybean codes and tokens matching neither pattern are returned whole
    with ``None`` as the suffix.
    """
    if _SOY_CARD_CODE.match(token) is not None:
        return token, None
    parsed = _CARD_CODE.match(token)
    if parsed is None:
        return token, None
    base, suffix = parsed.groups()
    return base, suffix
|
||
|
||
|
||
def _is_card_code(token: str) -> bool:
    """Tell whether ``token`` matches the soy or the corn card-code pattern."""
    return any(pattern.match(token) for pattern in (_SOY_CARD_CODE, _CARD_CODE))
|
||
|
||
|
||
def parse_cards(page: pdfplumber.page.Page, *, ncols: int = 2) -> list[dict]:
    """Extract the descriptive cards of one card page.

    The page is cut into ``ncols`` equal-width columns (corn cards use a
    2-column layout, soy cards a 3-column layout). Inside a column a card
    is a run of size-14 code lines followed by its size-9.5 bullet lines;
    sub-11pt lines that do not start with a bullet are folded into the
    preceding bullet.

    Returns:
        ``[{"bases": set, "traits": set, "bullets": [str]}, ...]`` in
        column order, top to bottom within each column.
    """
    page_width = page.width
    upright = [w for w in page.extract_words(use_text_flow=False,
                                             keep_blank_chars=False,
                                             extra_attrs=["upright", "size"])
               if w["upright"]]
    cards: list[dict] = []
    for col in range(ncols):
        left = page_width * col / ncols
        right = page_width * (col + 1) / ncols
        # Only words in the body band of the page belong to cards.
        members = sorted(
            (w for w in upright
             if left <= w["x0"] < right and 140 < w["top"] < 1180),
            key=lambda w: (round(w["top"]), w["x0"]),
        )

        # Cluster the column's words into text lines by vertical proximity
        # to the first word of the current line.
        lines: list[dict] = []
        for word in members:
            if lines and abs(word["top"] - lines[-1]["top"]) < 5:
                lines[-1]["ws"].append(word)
            else:
                lines.append({"top": word["top"], "ws": [word]})

        card: dict | None = None
        for line in lines:
            ordered = sorted(line["ws"], key=lambda w: w["x0"])
            text = " ".join(w["text"] for w in ordered).strip()
            size = max(w["size"] for w in ordered)
            token = text.replace(" ", "")
            is_title = size >= 13 and _is_card_code(token) and len(token) <= 20
            if is_title:
                # Consecutive title lines share a card; a title after
                # bullets (or the first title of the column) opens a new one.
                if card is None or card["bullets"]:
                    card = {"bases": set(), "traits": set(), "bullets": []}
                    cards.append(card)
                base, trait = _card_base(token)
                card["bases"].add(base)
                if trait:
                    card["traits"].add(trait)
                continue
            if text.startswith("•"):
                if card is not None:
                    card["bullets"].append(_clean_bullet(text.lstrip("• ")))
                continue
            if size < 11 and card is not None and card["bullets"]:
                card["bullets"][-1] = _clean_bullet(card["bullets"][-1] + " " + text)
    return cards
|
||
|
||
|
||
def _clean_bullet(s: str) -> str:
|
||
s = re.sub(r"\s+", " ", (s or "")).strip()
|
||
s = re.sub(r"\s*®", "®", s) # "Artesian ®" -> "Artesian®"
|
||
s = re.sub(r"\s*\bNEW\b\s*$", "", s).strip() # trailing NEW badge
|
||
return s
|
||
|
||
|
||
def build_card_groups(pdf: pdfplumber.PDF, pages: range, *, ncols: int = 2) -> list[dict]:
    """Collect the distinct card groups found on the given card pages.

    Each group is ``{"bases": set (upper-cased), "traits": set,
    "bullets": [non-empty str]}``. A page whose sorted tuple of card bases
    was already seen (or is empty) is skipped, so the duplicated page copy
    doesn't double the cards. A card can group several sibling codes for
    one hybrid (e.g. legacy A94-16 / G94-86 alongside the table's D94-26);
    keeping the grouping lets the caller fold those into the table record
    instead of minting duplicate identities.
    """
    distinct: list[dict] = []
    signatures: set[tuple] = set()
    for index in pages:
        page_cards = parse_cards(pdf.pages[index], ncols=ncols)
        signature = tuple(sorted(
            base for card in page_cards for base in card["bases"]))
        if not signature or signature in signatures:
            continue
        signatures.add(signature)
        distinct.extend(
            {
                "bases": {base.upper() for base in card["bases"]},
                "traits": set(card["traits"]),
                "bullets": [bullet for bullet in card["bullets"] if bullet],
            }
            for card in page_cards
            if card["bases"]
        )
    return distinct
|
||
|
||
|
||
# --------------------------------------------------------------------- extraction orchestration
|
||
|
||
|
||
# Corn trait suffix ordering preference for a stable trait_stack.
|
||
# Position in this list is the primary sort rank used by
# ``_sorted_corn_traits`` when it orders a trait_stack.
_CORN_TRAIT_ORDER = ["Conv", "RR2", "VT2P", "DGVT2P", "GT", "GTA", "GT/LL",
                     "GT/LLA", "SS", "SSP", "TRE", "AA", "DV", "D", "V", "VZ",
                     "PCE", "3110A", "Artesian", "Organic"]
|
||
|
||
|
||
def _sorted_corn_traits(traits: set[str]) -> list[str]:
    """Return the recognised traits in the preferred, stable order.

    Traits absent from ``_KNOWN_CORN_TRAITS`` are discarded. The rest are
    ordered by their index in ``_CORN_TRAIT_ORDER`` (unranked traits go
    last), with the trait name as tie-breaker.
    """
    rank = {name: position for position, name in enumerate(_CORN_TRAIT_ORDER)}
    recognised = {trait for trait in traits if trait in _KNOWN_CORN_TRAITS}

    def sort_key(trait: str) -> tuple[int, str]:
        return rank.get(trait, 999), trait

    return sorted(recognised, key=sort_key)
|
||
|
||
|
||
def _group_for_base(groups: list[dict], base: str) -> dict | None:
|
||
for g in groups:
|
||
if base in g["bases"]:
|
||
return g
|
||
return None
|
||
|
||
|
||
def _collect_corn(pdf: pdfplumber.PDF, varieties: dict[str, RSVariety]) -> None:
    """Add the corn records of ``pdf`` to ``varieties`` (keyed by source_key)."""
    card_groups = build_card_groups(pdf, CORN_CARD_PAGES)
    table_bases: set[str] = set()

    # Pass 1: table records (authoritative ratings), enriched from the
    # card group that contains the base.
    for page_index in CORN_TABLE_PAGES:
        page = pdf.pages[page_index]
        colmap = build_corn_colmap(reconstruct_rotated_headers(page, top_max=320))
        if len(colmap) < 12:
            log.warning("p%d corn header reconstruction thin (%d cols) — skipping",
                        page_index, len(colmap))
            continue
        for variety in parse_corn_table(page, colmap):
            if variety.source_key in varieties:
                continue
            base = variety.product_name.upper()
            group = _group_for_base(card_groups, base)
            if group:
                variety.trait_stack = _sorted_corn_traits(group["traits"])
                variety.strengths = list(group["bullets"])
            varieties[variety.source_key] = variety
            table_bases.add(base)

    # Pass 2: card-only fallback — one record per card group that has
    # NO table-backed sibling. Sibling legacy codes that share a card
    # with a table hybrid fold into that record (no duplicate identity).
    for group in card_groups:
        if not group["bullets"] or group["bases"] & table_bases:
            continue
        candidates = [b for b in group["bases"] if _CORN_CODE.match(b)]
        if not candidates:
            continue
        # Deterministic primary base: RC#### codes first, then alphabetical.
        primary = min(candidates, key=lambda b: (not b.startswith("RC"), b))
        key = f"robseeco-{primary.lower()}"
        if key in varieties:
            continue
        varieties[key] = RSVariety(
            source_key=key, crop="corn", brand="Rob-See-Co",
            product_name=primary,
            trait_stack=_sorted_corn_traits(group["traits"]),
            strengths=list(group["bullets"]), from_table=False)


def _collect_soy(pdf: pdfplumber.PDF, varieties: dict[str, RSVariety]) -> None:
    """Add the soybean records of ``pdf`` to ``varieties`` (keyed by source_key)."""
    card_groups = build_card_groups(pdf, SOY_CARD_PAGES, ncols=3)
    table_bases: set[str] = set()

    # Table records first, enriched with the bullets of the matching card.
    for page_index in SOY_TABLE_PAGES:
        page = pdf.pages[page_index]
        colmap = build_soy_colmap(reconstruct_rotated_headers(page, top_max=330))
        if len(colmap) < 8:
            continue
        for variety in parse_soy_table(page, colmap):
            if variety.source_key in varieties:
                continue
            base = variety.product_name.upper()
            group = _group_for_base(card_groups, base)
            if group:
                variety.strengths = list(group["bullets"])
            varieties[variety.source_key] = variety
            table_bases.add(base)

    # Card-only fallback for groups without a table-backed code.
    for group in card_groups:
        if not group["bullets"] or group["bases"] & table_bases:
            continue
        candidates = sorted(b for b in group["bases"] if _SOY_CODE.match(b))
        if not candidates:
            continue
        primary = candidates[0]
        key = f"robseeco-{primary.lower()}"
        if key in varieties:
            continue
        varieties[key] = RSVariety(
            source_key=key, crop="soybeans",
            brand="Innotech" if primary.startswith("IS") else "Rob-See-Co",
            product_name=primary,
            trait_stack=_trait_from_soy_code(primary),
            strengths=list(group["bullets"]), from_table=False)


def extract(pdf: pdfplumber.PDF, *, only_crop: str | None) -> list[RSVariety]:
    """Extract every variety record from the seed-guide PDF.

    Args:
        pdf: The opened seed guide.
        only_crop: ``"corn"`` or ``"soybeans"`` to restrict the run to one
            crop; ``None`` processes both (corn first).

    Returns:
        One record per distinct ``source_key``, in first-seen order: table
        rows first (enriched from their descriptive card), then card-only
        fallbacks for cards that have no table-backed code.
    """
    varieties: dict[str, RSVariety] = {}
    if only_crop in (None, "corn"):
        _collect_corn(pdf, varieties)
    if only_crop in (None, "soybeans"):
        _collect_soy(pdf, varieties)
    return list(varieties.values())
|
||
|
||
|
||
# --------------------------------------------------------------------- render
|
||
|
||
|
||
def render_markdown(v: RSVariety) -> str:
    """Build the Markdown document for a single variety.

    Layout, top to bottom: an H1 with the product name, a bullet list of
    identity metadata, an optional card-only notice, a ``---`` rule, one
    ``##`` section per characteristics group, and finally a ``## Strengths``
    section when the variety has any strengths.

    Args:
        v: The variety record to render.

    Returns:
        The document as one newline-joined string.
    """
    is_corn = v.crop == "corn"
    lines: list[str] = [
        f"# {v.product_name}",
        "",
        "- **Vendor:** RobSeeCo",
        f"- **Brand:** {v.brand}",
        f"- **Crop:** {'Corn' if is_corn else 'Soybeans'}",
    ]

    # Maturity is reported in the unit that matches the crop, and only when
    # a value is actually present.
    if is_corn:
        if v.relative_maturity is not None:
            lines.append(f"- **Relative maturity:** {v.relative_maturity} days")
    elif v.crop == "soybeans" and v.maturity_group is not None:
        lines.append(f"- **Maturity group:** {v.maturity_group}")

    if v.trait_stack:
        traits = ", ".join(v.trait_stack)
        lines.append(f"- **Trait(s):** {traits}")

    lines.extend([
        f"- **Source:** {PRODUCTS_URL}",
        f"- **Rating scale:** {SCALE_DIRECTION}",
        "- **Service area:** RobSeeCo dealer network — Western/Central "
        "Corn Belt + Dakotas (Eastern Corn Belt via Federal Hybrids/Kiser Seed)",
        "",
    ])

    # Records that did not come from a ratings-table row get an explicit note.
    if not v.from_table:
        lines.extend([
            "_Identity from the descriptive card; no structured ratings table "
            "row matched._",
            "",
        ])

    lines.extend(["---", ""])

    for group in v.groups:
        lines.extend([f"## {group['label'].title()}", ""])
        # A falsy value is shown as an em dash placeholder.
        lines.extend(
            f"- **{item['characteristic']}:** {item['value'] or '—'}"
            for item in group["items"]
        )
        lines.append("")

    if v.strengths:
        lines.extend(["## Strengths", ""])
        lines.extend(f"- {strength}" for strength in v.strengths)
        lines.append("")

    return "\n".join(lines)
|
||
|
||
|
||
def write_variety(v: RSVariety) -> None:
    """Write a variety's Markdown document and JSON sidecar into CORPUS_DIR.

    Two files are produced, both named after ``v.source_key``:
    ``<source_key>.md`` (from ``render_markdown``) and ``<source_key>.json``
    (the metadata sidecar built below).

    Both payloads are fully rendered/serialized before anything is written,
    and the ``.md`` file is written last. ``run`` treats an existing ``.md``
    as "already processed", so the ``.md`` must never exist without its
    sidecar: if serialization or the sidecar write fails, no ``.md`` is left
    behind and the next run retries the variety instead of skipping it.

    Args:
        v: The variety record to persist.

    Raises:
        OSError: If the directory or either file cannot be written.
        TypeError: If the sidecar holds a value ``json.dumps`` cannot encode.
    """
    CORPUS_DIR.mkdir(parents=True, exist_ok=True)

    sidecar = {
        "source": "robseeco",
        "source_key": v.source_key,
        "vendor": "RobSeeCo",
        "brand": v.brand,
        "product_name": v.product_name,
        "product_id": v.product_name,
        "crop": v.crop,
        "release_year": None,
        "relative_maturity": v.relative_maturity,
        "maturity_group": v.maturity_group,
        "wheat_class": None,
        "trait_stack": v.trait_stack,
        "trait_descriptions": [],
        "positioning_statement": v.positioning,
        "strengths": v.strengths,
        "characteristics_groups": v.groups,
        "_scale_direction": SCALE_DIRECTION,
        "regional_recommendations": REGIONAL_REC,
        "image_url": None,
        "source_urls": [PRODUCTS_URL],
        "sitemap_last_modified": None,
        "fetched_at": datetime.now(timezone.utc).isoformat(),
        "scraper_version": SCRAPER_VERSION,
    }

    # Produce both payloads up front so a rendering or encoding error cannot
    # leave a half-written pair on disk.
    markdown_text = render_markdown(v)
    sidecar_text = json.dumps(sidecar, indent=2, ensure_ascii=False) + "\n"

    # Sidecar first, Markdown last: the .md file is the "done" marker.
    (CORPUS_DIR / f"{v.source_key}.json").write_text(sidecar_text, encoding="utf-8")
    (CORPUS_DIR / f"{v.source_key}.md").write_text(markdown_text, encoding="utf-8")
|
||
|
||
|
||
# --------------------------------------------------------------------- pipeline
|
||
|
||
|
||
def run(*, limit: int | None, force: bool,
        only_crop: str | None, only_product: str | None) -> int:
    """Fetch the guide PDF, extract varieties, and write them to the corpus.

    Args:
        limit: Maximum number of varieties to process (skipped files count
            towards it); ``None`` means no cap.
        force: Passed to ``fetch_pdf`` and, when true, existing ``.md`` files
            are overwritten instead of skipped.
        only_crop: Passed to ``extract`` to restrict the crop.
        only_product: If set, keep only the variety whose ``source_key`` or
            ``product_name`` equals it (case-insensitively).

    Returns:
        ``0`` on completion, ``2`` when ``only_product`` matched nothing.
    """
    CORPUS_DIR.mkdir(parents=True, exist_ok=True)
    session = RateLimitedSession()
    pdf_file = fetch_pdf(session, force=force)

    with pdfplumber.open(pdf_file) as pdf:
        varieties = extract(pdf, only_crop=only_crop)

    if only_product:
        wanted = only_product.lower()
        varieties = [
            cand for cand in varieties
            if wanted in (cand.source_key, cand.product_name.lower())
        ]
        if not varieties:
            log.error("no variety matched --product=%s", only_product)
            return 2

    # Stable, deterministic processing order.
    varieties = sorted(varieties, key=lambda v: (v.crop, v.brand, v.source_key))
    total = len(varieties)

    # The cap applies to processed items, whether written or skipped; a
    # non-positive limit processes nothing.
    batch = varieties if limit is None else varieties[:max(limit, 0)]

    counts = {"written": 0, "skipped": 0, "card_only": 0,
              "corn": 0, "soybeans": 0}
    for position, v in enumerate(batch, start=1):
        target = CORPUS_DIR / f"{v.source_key}.md"
        if not force and target.exists():
            counts["skipped"] += 1
            log.info("[%d/%d] %s skipped", position, total, v.source_key)
            continue

        write_variety(v)
        counts["written"] += 1
        counts[v.crop] += 1
        card_only = not v.from_table
        if card_only:
            counts["card_only"] += 1

        maturity = v.relative_maturity if v.crop == "corn" else v.maturity_group
        log.info("[%d/%d] %s written | crop=%s brand=%s rm/mg=%s traits=%s "
                 "groups=%d strengths=%d%s",
                 position, total, v.source_key, v.crop, v.brand,
                 maturity,
                 ",".join(v.trait_stack) or "-", len(v.groups), len(v.strengths),
                 " [CARD-ONLY]" if card_only else "")

    log.info("done: processed=%d written=%d skipped=%d card_only=%d "
             "| corn=%d soybeans=%d (of %d)",
             len(batch), counts["written"], counts["skipped"], counts["card_only"],
             counts["corn"], counts["soybeans"], total)
    return 0
|
||
|
||
|
||
# --------------------------------------------------------------------- CLI
|
||
|
||
|
||
def _build_argparser() -> argparse.ArgumentParser:
    """Create the command-line parser for this scraper.

    Options: ``--limit`` (int), ``--force`` (flag), ``--crop`` (``corn`` or
    ``soybeans``), ``--product`` and ``--log-level`` (defaults to the
    ``LOG_LEVEL`` environment variable, else ``INFO``).

    Returns:
        The configured parser.
    """
    summary = ("Scrape the RobSeeCo (Rob-See-Co / Innotech) 2026 Seed Guide "
               "PDF — corn + soybean variety identity + 1-9 ratings.")
    parser = argparse.ArgumentParser(
        prog="scrape.sources.robseeco",
        description=summary,
    )
    parser.add_argument(
        "--limit",
        type=int,
        default=None,
        help="Stop after writing N varieties (default: all).",
    )
    parser.add_argument(
        "--force",
        action="store_true",
        help="Re-download the PDF and re-write all variety files.",
    )
    parser.add_argument(
        "--crop",
        choices=["corn", "soybeans"],
        default=None,
        help="Limit to one crop.",
    )
    parser.add_argument(
        "--product",
        default=None,
        help="Process a single variety by source_key or code.",
    )
    # Environment variable supplies the default so CI can raise verbosity.
    env_level = os.environ.get("LOG_LEVEL", "INFO")
    parser.add_argument("--log-level", default=env_level)
    return parser
|
||
|
||
|
||
def main(argv: list[str] | None = None) -> int:
    """Command-line entry point: parse arguments, set up logging, run.

    Args:
        argv: Argument list to parse; ``None`` lets argparse read
            ``sys.argv``.

    Returns:
        The exit status produced by ``run``.
    """
    opts = _build_argparser().parse_args(argv)

    # Log to stderr at the requested level (name is upper-cased first).
    log_format = "%(asctime)s %(levelname)s %(name)s %(message)s"
    logging.basicConfig(
        stream=sys.stderr,
        format=log_format,
        level=opts.log_level.upper(),
    )

    return run(
        limit=opts.limit,
        force=opts.force,
        only_crop=opts.crop,
        only_product=opts.product,
    )
|
||
|
||
|
||
# Script entry: propagate main()'s return value as the process exit status.
if __name__ == "__main__":
    raise SystemExit(main())
|