gh_plot_reports corpus (4,299 plots) + concurrency + 4-GPU pool
CORPUS — 4,299 GH plot reports added (3,797 written in this run + 502
from the earlier slow run). A further 319 sitemap-listed URLs 404'd as
discontinued and are not counted. Combined with the prior 760 varieties
+ 14 AgriPro trials = 5,073 total chunks now indexed.
scrape/sources/gh_plot_reports.py — concurrency speedup:
- 4 worker threads (ThreadPoolExecutor), each with its own
requests.Session for connection-pool efficiency.
- Shared class-level rate limiter (0.25 sec between ANY two
requests across all threads). Net throughput ~4 req/sec —
well below any rate-limit threshold a public site enforces.
- Diagnosis vs original 1 req/sec: GH had ZERO rate limiting,
zero 429s, zero retries. The 1 sec self-throttle was just too
conservative. Bench:
1 worker / 1.0 sec throttle: ~0.4 plots/sec (190 min ETA)
4 workers / 0.25 sec throttle: ~3 plots/sec (~25 min actual)
rag/chunk.py — chunk size cap for nomic-embed-text's 2048-token
context window:
- Empirically tested: failure threshold is ~5,250 chars on
numeric-heavy trial chunks (chars/token ratio 2.4 vs 3.5 for
prose). Cap at 4,500 chars so a chunk stays under the limit even at
a worst-case 2.2 chars/token (4,500 / 2.2 ≈ 2,045 tokens < 2,048).
- Applied to BOTH variety and trial chunks. Marked truncated
chunks with metadata.embed_truncated = True; FULL text stays
in the on-disk .md for get_page to return verbatim.
.gitea/workflows/{refresh,image-only}.yml — OLLAMA_URL pool
restructured for the 4 GPU-pinned endpoints. Bench (50-chunk
batches on nomic-embed-text):
.0.125:11434 (RTX 40-series) 242 embeds/sec ← weight ×4
.0.2:11436 (GPU-pinned) 108 embeds/sec ← weight ×2
.0.2:11435 (GPU-pinned) 72 embeds/sec ← weight ×1
localhost (TITAN X) 37 embeds/sec ← weight ×1
Weighting is done by listing the URL multiple times in
OLLAMA_URL since the embedder uses round-robin. .0.2:11434 is
explicitly EXCLUDED — it isn't pinned to a specific GPU.
Combined index rebuild for 5,073 chunks now finishes in ~3 min
(was 19+ min on the single-endpoint pool).
Smoke tests:
✓ list_versions: 5,073 docs across 6 sources, 2 vendors, 6
brands, 4 crops (corn 2711, soy 2016, silage 223, wheat 123).
✓ search_trials({crop=corn, state=IA, year=2024}): 3 IA 2024
corn trials surfaced.
✓ search_trials("Phytophthora resistance soybean trial"): NK
NK43-W1XFS top-1 in LA 2024 trial (cross-vendor result).
✓ search_trials("AP Iliad Idaho wheat"): AgriPro Washington/N
Idaho 2025 trial surfaced.
✓ search_trials(product=DKC65-95): 3 corn trials containing
that hybrid in IL/IA 2024.
✓ search_trials(product=NK1701): 3 corn trials in AR/MS 2024.
✓ Product filter correctly returns EMPTY for products that
aren't in the corpus (DKC65-20 is a 2023 product; 2023 plots
deferred). Anti-hallucination contract preserved.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -61,7 +61,9 @@ import os
|
||||
import random
|
||||
import re
|
||||
import sys
|
||||
import threading
|
||||
import time
|
||||
from concurrent.futures import ThreadPoolExecutor, as_completed
|
||||
from dataclasses import dataclass, field
|
||||
from datetime import datetime, timezone
|
||||
from pathlib import Path
|
||||
@@ -79,7 +81,13 @@ REPO_ROOT = Path(__file__).resolve().parents[2]
|
||||
CORPUS_ROOT = Path(os.environ.get("CORPUS_ROOT") or REPO_ROOT / "corpus")
|
||||
CORPUS_DIR = CORPUS_ROOT / "gh_plot_reports"
|
||||
|
||||
REQ_INTERVAL_SEC = 1.0
|
||||
# 0.25 sec between any two requests in the WHOLE process (shared
|
||||
# across worker threads). The site's pages take ~1.5 sec to serve on
|
||||
# their own; combined with concurrent workers this gives ~4 req/sec
|
||||
# net — polite by any normal standard, and the GH plot-reports
|
||||
# scrape on 4,600 docs finishes in ~20 min instead of ~3 hours.
|
||||
REQ_INTERVAL_SEC = 0.25
|
||||
DEFAULT_WORKERS = 4
|
||||
|
||||
log = logging.getLogger("scrape.gh_plot_reports")
|
||||
|
||||
@@ -108,17 +116,35 @@ STATE_NAMES = {
|
||||
|
||||
|
||||
class RateLimitedSession:
|
||||
"""Thread-safe rate-limited requests.Session wrapper.
|
||||
|
||||
The lock + last-request timestamp are class-level so multiple
|
||||
sessions (one per worker thread) share the same global interval.
|
||||
Each thread has its own requests.Session for connection-pool
|
||||
efficiency, but they all coordinate on the request-cadence
|
||||
floor.
|
||||
"""
|
||||
|
||||
_lock = threading.Lock()
|
||||
_last_global: float = 0.0
|
||||
_global_interval: float = REQ_INTERVAL_SEC
|
||||
|
||||
def __init__(self, interval: float = REQ_INTERVAL_SEC) -> None:
|
||||
self.s = requests.Session()
|
||||
self.s.headers["User-Agent"] = USER_AGENT
|
||||
self.interval = interval
|
||||
self._last = 0.0
|
||||
# Raise the class-level interval to the most restrictive (largest)
|
||||
# value any caller asks for, so a 1.0s caller can't be loosened by a
|
||||
# later 0.25s caller starting another scraper in the same process.
|
||||
with RateLimitedSession._lock:
|
||||
if interval > RateLimitedSession._global_interval:
|
||||
RateLimitedSession._global_interval = interval
|
||||
|
||||
def _wait(self) -> None:
|
||||
delta = time.monotonic() - self._last
|
||||
if delta < self.interval:
|
||||
time.sleep(self.interval - delta)
|
||||
self._last = time.monotonic()
|
||||
with RateLimitedSession._lock:
|
||||
delta = time.monotonic() - RateLimitedSession._last_global
|
||||
if delta < RateLimitedSession._global_interval:
|
||||
time.sleep(RateLimitedSession._global_interval - delta)
|
||||
RateLimitedSession._last_global = time.monotonic()
|
||||
|
||||
def request(
|
||||
self,
|
||||
@@ -692,9 +718,9 @@ def run(
|
||||
only_state: str | None,
|
||||
only_year: int | None,
|
||||
include_2023: bool,
|
||||
workers: int = DEFAULT_WORKERS,
|
||||
) -> int:
|
||||
CORPUS_DIR.mkdir(parents=True, exist_ok=True)
|
||||
http = RateLimitedSession()
|
||||
|
||||
crops = {only_crop} if only_crop else {"corn", "soybeans", "silage"}
|
||||
states = {only_state} if only_state else None
|
||||
@@ -705,32 +731,68 @@ def run(
|
||||
else:
|
||||
years = {2024, 2025}
|
||||
|
||||
targets = discover_plots(http, crops=crops, states=states, years=years)
|
||||
# One shared session for sitemap walk (single-threaded).
|
||||
discovery_http = RateLimitedSession()
|
||||
targets = discover_plots(discovery_http, crops=crops, states=states, years=years)
|
||||
if limit is not None:
|
||||
targets = targets[:limit]
|
||||
|
||||
counts = {"written": 0, "skipped": 0, "missing": 0, "failed": 0}
|
||||
processed = 0
|
||||
for url, crop, state, year, plot_id in targets:
|
||||
if limit is not None and processed >= limit:
|
||||
break
|
||||
processed += 1
|
||||
status, prod = process_plot(
|
||||
http, url=url, crop=crop, state=state, year=year,
|
||||
counts_lock = threading.Lock()
|
||||
processed_counter = {"n": 0}
|
||||
total = len(targets)
|
||||
|
||||
# One requests.Session per worker thread — they share the
|
||||
# class-level rate limiter (REQ_INTERVAL_SEC between any two
|
||||
# requests across all threads), but each has its own HTTP
|
||||
# connection pool.
|
||||
thread_local = threading.local()
|
||||
|
||||
def _session() -> RateLimitedSession:
|
||||
s = getattr(thread_local, "session", None)
|
||||
if s is None:
|
||||
s = RateLimitedSession()
|
||||
thread_local.session = s
|
||||
return s
|
||||
|
||||
def _worker(target: tuple[str, str, str, int, str]) -> tuple[str, Any]:
|
||||
url, crop, state, year, plot_id = target
|
||||
return process_plot(
|
||||
_session(), url=url, crop=crop, state=state, year=year,
|
||||
plot_id=plot_id, force=force,
|
||||
)
|
||||
counts[status] = counts.get(status, 0) + 1
|
||||
if prod is not None and processed <= 5 or processed % 100 == 0:
|
||||
log.info(
|
||||
"[%d/%s] %s %s | results=%d coop=%s",
|
||||
processed, str(limit) if limit else len(targets),
|
||||
source_key_for(crop, state, year, plot_id), status,
|
||||
len(prod.results) if prod else 0,
|
||||
(prod.cooperator if prod else "-") or "-",
|
||||
)
|
||||
|
||||
log.info("dispatching %d plots across %d workers (shared rate limiter %.2f sec/req)",
|
||||
total, workers, REQ_INTERVAL_SEC)
|
||||
with ThreadPoolExecutor(max_workers=workers) as pool:
|
||||
futures = {pool.submit(_worker, t): t for t in targets}
|
||||
for fut in as_completed(futures):
|
||||
target = futures[fut]
|
||||
url, crop, state, year, plot_id = target
|
||||
try:
|
||||
status, prod = fut.result()
|
||||
except Exception as exc: # noqa: BLE001
|
||||
log.error("worker failed for %s: %s", url, exc)
|
||||
status, prod = "failed", None
|
||||
|
||||
with counts_lock:
|
||||
counts[status] = counts.get(status, 0) + 1
|
||||
processed_counter["n"] += 1
|
||||
n = processed_counter["n"]
|
||||
|
||||
if (prod is not None and n <= 5) or n % 100 == 0 or status == "failed":
|
||||
log.info(
|
||||
"[%d/%d] %s %s | results=%d coop=%s",
|
||||
n, total,
|
||||
source_key_for(crop, state, year, plot_id), status,
|
||||
len(prod.results) if prod else 0,
|
||||
(prod.cooperator if prod else "-") or "-",
|
||||
)
|
||||
|
||||
log.info(
|
||||
"done: processed=%d written=%d skipped=%d missing=%d failed=%d (of %d candidates)",
|
||||
processed, counts["written"], counts["skipped"],
|
||||
counts["missing"], counts["failed"], len(targets),
|
||||
processed_counter["n"], counts["written"], counts["skipped"],
|
||||
counts["missing"], counts["failed"], total,
|
||||
)
|
||||
return 0 if counts["failed"] == 0 else 1
|
||||
|
||||
@@ -756,6 +818,9 @@ def _build_argparser() -> argparse.ArgumentParser:
|
||||
help="Limit to one year.")
|
||||
p.add_argument("--include-2023", action="store_true",
|
||||
help="Include 2023 plot reports (default: 2024-2025 only).")
|
||||
p.add_argument("--workers", type=int, default=DEFAULT_WORKERS,
|
||||
help=f"Concurrent worker threads (default {DEFAULT_WORKERS}, "
|
||||
f"all share a global {REQ_INTERVAL_SEC}-sec rate limiter).")
|
||||
p.add_argument("--log-level", default=os.environ.get("LOG_LEVEL", "INFO"))
|
||||
return p
|
||||
|
||||
@@ -774,6 +839,7 @@ def main(argv: list[str] | None = None) -> int:
|
||||
only_state=args.state.lower() if args.state else None,
|
||||
only_year=args.year,
|
||||
include_2023=args.include_2023,
|
||||
workers=args.workers,
|
||||
)
|
||||
|
||||
|
||||
|
||||
Reference in New Issue
Block a user