crop-chem-docs/rag/index.py
justin 38141c362e Phase 2: chunking + parallel Ollama embeddings + Chroma + BM25 indexes
End-to-end RAG pipeline for the pesticide-labels corpus. From the
4,066 labels on USB, the indexer produces 216,467 chunks, embeds
them via N parallel Ollama endpoints, upserts to Chroma, and builds
a BM25 lexical index.

## Files

- rag/index.py: adapted to labels schema (source / source_key /
  epa_reg_no / product_name / product_class / registrant /
  signal_word / active_ingredients flattened for Chroma where-filter);
  honors PPLS_CORPUS_ROOT (corpus on USB) and PPLS_CHROMA_DIR;
  upsert batch size auto-tuned to 64 * N URLs; --limit + --source
  flags for incremental work.
- rag/chunk.py: label-aware. ALL-CAPS section heading detector
  (heuristic) for EPA labels alongside markdown `#` headings.
  TARGET_CHARS=2000 (~500 tokens), MAX_CHUNK_CHARS=4000 (~1000
  tokens) hard cap with _force_split sentence/char fallback to
  defend against monolithic crop+rate tables. Chunk 0 is a synthetic
  anchor with product name, EPA Reg No, registrant, signal word,
  product class, active ingredients + keyword bag for joint
  dense/BM25 retrieval.
- rag/embeddings.py: parallel-dispatch across N Ollama URLs via
  ThreadPoolExecutor. Each __call__ stride-slices input into N
  shards, fires N concurrent HTTP requests, joins in original order.
  Bisect-resilient on 400 (context-length): recursively splits the
  failing shard down to single doc, logs+drops single bad doc with
  zero-vector placeholder so Chroma upsert never sees a gap. Real
  HTTP/connection errors still propagate.
- requirements.txt: chromadb already pinned via template.
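
The shard / join / bisect behavior described for rag/embeddings.py can be sketched roughly as below. This is an illustration under stated assumptions, not the code in the repo: `embed_with_bisect`, `parallel_embed`, `ContextLengthError`, and `fake_embed` are hypothetical names, and the fake endpoint stands in for a real Ollama HTTP call.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, List, Sequence

Vector = List[float]
EmbedFn = Callable[[str, List[str]], List[Vector]]  # (url, docs) -> vectors


class ContextLengthError(Exception):
    """Hypothetical stand-in for an HTTP 400 'input too long' response."""


def embed_with_bisect(embed: EmbedFn, url: str, docs: List[str], dim: int) -> List[Vector]:
    """Embed one shard; on a context-length failure, halve it and retry."""
    if not docs:
        return []
    try:
        return embed(url, docs)
    except ContextLengthError:
        if len(docs) == 1:
            # Single bad doc: drop it, but keep its slot with a zero vector
            # so the caller still gets one vector per input.
            return [[0.0] * dim]
        mid = len(docs) // 2
        return (embed_with_bisect(embed, url, docs[:mid], dim)
                + embed_with_bisect(embed, url, docs[mid:], dim))


def parallel_embed(embed: EmbedFn, urls: Sequence[str], docs: List[str], dim: int) -> List[Vector]:
    """Stride-slice docs into len(urls) shards, embed concurrently, rejoin in order."""
    n = len(urls)
    shards = [docs[i::n] for i in range(n)]  # shard i gets docs i, i+n, i+2n, ...
    with ThreadPoolExecutor(max_workers=n) as pool:
        results = list(pool.map(
            lambda pair: embed_with_bisect(embed, pair[0], pair[1], dim),
            zip(urls, shards),
        ))
    out: List[Vector] = [[] for _ in docs]
    for i, vectors in enumerate(results):
        out[i::n] = vectors  # inverse of the stride slice restores input order
    return out


def fake_embed(url: str, docs: List[str]) -> List[Vector]:
    """Fake endpoint (assumption): rejects any batch containing a doc over 10 chars."""
    if any(len(d) > 10 for d in docs):
        raise ContextLengthError
    return [[float(len(d)), 1.0] for d in docs]


docs = ["a", "bb", "x" * 50, "dddd", "eeeee"]
vectors = parallel_embed(
    fake_embed, ["http://host1:11434", "http://host2:11434"], docs, dim=2
)
# The oversized third doc comes back as [0.0, 0.0]; the others keep their positions.
```

Only `ContextLengthError` is caught in this sketch, so any other exception raised by the embed callable still propagates, matching the "real HTTP/connection errors still propagate" note above.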

## Run

  PPLS_CORPUS_ROOT=/run/media/justin/USB/ppls-corpus \
    OLLAMA_URL=http://host1:11434,http://host2:11434,...  \
    PRODUCT_NAME=ppls \
    python -m rag.index --rebuild
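
OLLAMA_URL is a comma-separated list, and the upsert batch size follows from how many endpoints it names (64 per endpoint unless INDEX_BATCH overrides it). A minimal sketch of that sizing rule; `upsert_batch_size` is a hypothetical helper name used only for illustration, mirroring the inline logic in rag/index.py:

```python
from __future__ import annotations


def upsert_batch_size(ollama_url: str, index_batch: str | None = None,
                      per_endpoint: int = 64) -> int:
    """Return the Chroma upsert batch size for a comma-separated URL list."""
    # Ignore empty entries such as a trailing comma.
    urls = [u.strip() for u in ollama_url.split(",") if u.strip()]
    n_urls = max(1, len(urls))
    # An explicit INDEX_BATCH value wins over the per-endpoint default.
    return int(index_batch) if index_batch else per_endpoint * n_urls


two = upsert_batch_size("http://host1:11434,http://host2:11434")
four = upsert_batch_size("http://a:11434,http://b:11434,http://c:11434,http://d:11434,")
forced = upsert_batch_size("http://host1:11434", index_batch="100")
```

With four endpoints, as in the build below, each upsert would carry 256 chunks, so every endpoint receives a shard of about 64 documents per call.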

## Build stats

  - 216,467 chunks across 4,066 labels (~53 chunks/label avg)
  - Wall time: 75.7 min on 4 parallel GPU-backed Ollama endpoints
    (production Ollama on trashpanda + 2× 192.168.0.2 +
    1× Windows 192.168.0.125). The corpus covers Bayer-Crop / BASF /
    Corteva / FMC / Nufarm / Syngenta / etc. chemistry.
  - 473 bisect-drops (0.22%) — all from monolithic-table sections
    in 1970s-90s scanned PDFs whose pypdf extracts tokenized past
    the model's context. Acceptable; the dropped chunks were
    garbled OCR with no useful content.
  - Chroma: 2.2 GB persistent SQLite at ./chroma/
  - BM25: 416 MB SQLite FTS5 at ./bm25/ppls_docs.db

## Smoke-test queries (top-3 dense-only)

  "what can I spray on soybeans to control waterhemp"
    → Rage (glyphosate+carfentrazone), Sencor (metribuzin)
  "REI for dicamba on corn"
    → Nufarm Credit (DICAMBA tank-mix restrictions section)
  "fungicide for wheat head scab"
    → MCW 710 SC (azoxystrobin+tebuconazole), Sercadis (fluxapyroxad)

Distances 0.16-0.23. Dense-only quality is OK-not-great in spots
(exactly the failure mode Phase 6 reranker + Phase 8 hybrid BM25
fusion address).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-24 09:56:49 -04:00

196 lines
7.5 KiB
Python

"""Build Chroma (and optionally BM25) indexes from the labels corpus.

Reads `corpus/<source>/<source_key>.{md,json}`, chunks each label, upserts
into Chroma. With --rebuild, drops + recreates the collection (clean
state) and then builds the BM25 index too. With --bm25-only, skips Chroma
and rebuilds only the FTS5 index, which is useful for fast iteration when
chunking didn't change. With neither flag, the script loads and counts
chunks but writes nothing.

The corpus root honors PPLS_CORPUS_ROOT (matching the scrapers).
The Chroma + BM25 stores stay at the repo root by default
(PPLS_CHROMA_DIR and --bm25-db override the locations) because both rely
on filesystem locking semantics that vfat (typical USB drive) doesn't
provide reliably.
"""
from __future__ import annotations

import argparse
import json
import logging
import os
import time
from pathlib import Path
from typing import Iterator

import chromadb
from chromadb.config import Settings

from .chunk import chunks_from_label
from .embeddings import embedding_function

log = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(message)s")

REPO_ROOT = Path(__file__).resolve().parent.parent
CORPUS_ROOT = Path(os.environ.get("PPLS_CORPUS_ROOT") or REPO_ROOT / "corpus")
CHROMA_DIR = Path(os.environ.get("PPLS_CHROMA_DIR") or REPO_ROOT / "chroma")

# Collection name. Convention: <product>_docs. Override via env.
PRODUCT_NAME = os.environ.get("PRODUCT_NAME", "ppls")
COLLECTION = f"{PRODUCT_NAME}_docs"


def _extract_label_metadata(sidecar: dict, source: str, source_key: str) -> dict:
    """Flatten the canonical labels sidecar into a Chroma-friendly metadata
    dict (Chroma requires str/int/float/bool values, no nesting/lists)."""
    label = sidecar.get("label") or {}
    actives = ", ".join(
        a["name"] for a in (sidecar.get("active_ingredients") or [])
        if isinstance(a, dict) and a.get("name")
    )
    return {
        "source": sidecar.get("source") or source,
        "source_key": sidecar.get("source_key") or source_key,
        "epa_reg_no": sidecar.get("epa_reg_no") or "",
        "product_name": sidecar.get("product_name") or "",
        "product_class": sidecar.get("product_class") or "",
        "registrant": sidecar.get("registrant") or "",
        "signal_word": sidecar.get("signal_word") or "",
        "active_ingredients": actives,
        "label_url": label.get("url") or "",
        "label_accepted_date": label.get("accepted_date") or "",
    }


def label_chunks() -> Iterator[dict]:
    """Walk the corpus and yield one chunk dict per chunk per label."""
    if not CORPUS_ROOT.exists():
        log.error("corpus root %s doesn't exist; run a scraper first", CORPUS_ROOT)
        return
    sources_seen = 0
    labels_seen = 0
    for source_dir in sorted(CORPUS_ROOT.iterdir()):
        if not source_dir.is_dir() or source_dir.name.startswith("."):
            continue
        sources_seen += 1
        source = source_dir.name
        for md_path in sorted(source_dir.glob("*.md")):
            source_key = md_path.stem
            sidecar_path = md_path.with_suffix(".json")
            if not sidecar_path.exists():
                log.warning("skipping %s - no JSON sidecar", md_path)
                continue
            try:
                md = md_path.read_text(encoding="utf-8")
                sidecar = json.loads(sidecar_path.read_text(encoding="utf-8"))
            except (OSError, json.JSONDecodeError) as exc:
                log.warning("skipping %s - read error: %s", md_path, exc)
                continue
            base_meta = _extract_label_metadata(sidecar, source, source_key)
            labels_seen += 1
            yield from chunks_from_label(md, sidecar, base_meta)
    log.info("walked %d source(s), %d label(s)", sources_seen, labels_seen)


def upsert_to_chroma(records: list[dict], *, rebuild: bool) -> int:
    """Upsert chunk records into the Chroma collection; return the count."""
    client = chromadb.PersistentClient(
        path=str(CHROMA_DIR),
        settings=Settings(anonymized_telemetry=False),
    )
    if rebuild:
        try:
            client.delete_collection(COLLECTION)
            log.info("dropped existing collection %r", COLLECTION)
        except Exception:
            # Best effort: the collection may simply not exist yet.
            pass
    col = client.get_or_create_collection(
        COLLECTION, embedding_function=embedding_function()
    )
    # Match Chroma upsert batch size to the number of parallel Ollama
    # endpoints so each one gets a meaningful per-call shard (~64 docs).
    # Overridable via env (INDEX_BATCH) for tuning.
    urls = os.environ.get("OLLAMA_URL", "http://localhost:11434").split(",")
    n_urls = max(1, len([u for u in urls if u.strip()]))
    BATCH = int(os.environ.get("INDEX_BATCH") or 64 * n_urls)
    log.info("upsert batch size: %d (%d URL(s) × 64)", BATCH, n_urls)
    total = 0
    for i in range(0, len(records), BATCH):
        batch = records[i:i + BATCH]
        col.upsert(
            ids=[r["id"] for r in batch],
            documents=[r["text"] for r in batch],
            metadatas=[r["metadata"] for r in batch],
        )
        total += len(batch)
        if total % 1024 == 0 or total == len(records):
            log.info("upserted %d / %d chunks", total, len(records))
    return total


def main() -> int:
    p = argparse.ArgumentParser()
    p.add_argument("--rebuild", action="store_true",
                   help="Drop and recreate the Chroma collection.")
    p.add_argument("--bm25-only", action="store_true",
                   help="Rebuild only the BM25 index, skip Chroma.")
    p.add_argument("--limit", type=int, default=None,
                   help="Limit to N labels (smoke testing).")
    p.add_argument("--source", action="append",
                   help="Restrict to one or more source dirs (repeatable).")
    p.add_argument("--bm25-db", type=Path,
                   default=REPO_ROOT / "bm25" / f"{PRODUCT_NAME}_docs.db",
                   help="Path to the BM25 sqlite db.")
    args = p.parse_args()

    log.info("corpus root: %s", CORPUS_ROOT)
    log.info("chroma dir: %s", CHROMA_DIR)
    log.info("collection: %s", COLLECTION)

    t0 = time.time()
    records = []
    label_count = 0
    # A label is identified by (source, source_key); label_chunks() yields
    # all chunks of one label consecutively, so a key change means a new label.
    last_label_key: tuple[str, str] | None = None
    for rec in label_chunks():
        if args.source and rec["metadata"]["source"] not in args.source:
            continue
        if args.limit:
            key = (rec["metadata"]["source"], rec["metadata"]["source_key"])
            if key != last_label_key:
                if label_count >= args.limit:
                    break
                label_count += 1
                last_label_key = key
        records.append(rec)
    # %s rather than %d: the value is the string "(all)" when --limit is unset.
    log.info("loaded %d chunks from %s label(s) in %.1fs",
             len(records), label_count or "(all)", time.time() - t0)

    if args.bm25_only:
        from .bm25 import BM25Index
        log.info("--bm25-only: building FTS5 only")
        args.bm25_db.parent.mkdir(parents=True, exist_ok=True)
        BM25Index(args.bm25_db).build(records)
        return 0

    if not args.rebuild:
        log.info("no --rebuild; nothing to do. (Use --rebuild to upsert.)")
        return 0

    t_c = time.time()
    CHROMA_DIR.mkdir(parents=True, exist_ok=True)
    n = upsert_to_chroma(records, rebuild=True)
    log.info("chroma: %d chunks in %.1fs", n, time.time() - t_c)

    # Build BM25 too; see PLAN.md Phase 8.
    try:
        from .bm25 import BM25Index
        t_b = time.time()
        args.bm25_db.parent.mkdir(parents=True, exist_ok=True)
        BM25Index(args.bm25_db).build(records)
        log.info("bm25 done in %.1fs", time.time() - t_b)
    except ImportError:
        log.info("rag.bm25 not available; skipping BM25 build")
    return 0


if __name__ == "__main__":
    raise SystemExit(main())
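
For illustration, the metadata flattening done for Chroma can be exercised on a made-up sidecar. This is a standalone sketch, not part of the module: `flatten_sidecar` is a hypothetical, trimmed-down copy of the flattening logic, and the sample product, registration number, and URL are invented placeholders, not corpus data.

```python
def flatten_sidecar(sidecar: dict, source: str, source_key: str) -> dict:
    """Reduce a nested label sidecar to scalar-only metadata values."""
    label = sidecar.get("label") or {}
    # A list of {"name": ...} dicts becomes one comma-separated string;
    # malformed entries (non-dicts, missing names) are skipped.
    actives = ", ".join(
        a["name"] for a in (sidecar.get("active_ingredients") or [])
        if isinstance(a, dict) and a.get("name")
    )
    return {
        "source": sidecar.get("source") or source,
        "source_key": sidecar.get("source_key") or source_key,
        "epa_reg_no": sidecar.get("epa_reg_no") or "",
        "product_name": sidecar.get("product_name") or "",
        "registrant": sidecar.get("registrant") or "",
        "signal_word": sidecar.get("signal_word") or "",
        "active_ingredients": actives,
        "label_url": label.get("url") or "",
    }


# Hypothetical sidecar: every value here is a placeholder.
sidecar = {
    "product_name": "Example Herbicide 4L",
    "epa_reg_no": "0000-000",
    "signal_word": "CAUTION",
    "active_ingredients": [
        {"name": "glyphosate", "pct": 41.0},
        {"name": "carfentrazone-ethyl"},
        "malformed entry",
    ],
    "label": {"url": "https://example.invalid/label.pdf"},
}
meta = flatten_sidecar(sidecar, source="ppls", source_key="0000-000")
all_scalar = all(isinstance(v, (str, int, float, bool)) for v in meta.values())
```

Missing fields fall back to empty strings rather than `None`, so every key is present with a scalar value and can be used in a Chroma where-filter.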