Phase 2: chunking + parallel Ollama embeddings + Chroma + BM25 indexes

End-to-end RAG pipeline for the pesticide-labels corpus. From the
4,066 labels on USB, the indexer produces 216,467 chunks, embeds
them via N parallel Ollama endpoints, upserts to Chroma, and builds
a BM25 lexical index.

## Files

- rag/index.py: adapted to labels schema (source / source_key /
  epa_reg_no / product_name / product_class / registrant /
  signal_word / active_ingredients flattened for Chroma where-filter);
  honors PPLS_CORPUS_ROOT (corpus on USB) and PPLS_CHROMA_DIR;
  upsert batch size auto-tuned to 64 * N URLs; --limit + --source
  flags for incremental work.
- rag/chunk.py: label-aware. ALL-CAPS section heading detector
  (heuristic) for EPA labels alongside markdown `#` headings.
  TARGET_CHARS=2000 (~500 tokens), MAX_CHUNK_CHARS=4000 (~1000
  tokens) hard cap with _force_split sentence/char fallback to
  defend against monolithic crop+rate tables. Chunk 0 is a synthetic
  anchor with product name, EPA Reg No, registrant, signal word,
  product class, active ingredients + keyword bag for joint
  dense/BM25 retrieval.
- rag/embeddings.py: parallel-dispatch across N Ollama URLs via
  ThreadPoolExecutor. Each __call__ stride-slices input into N
  shards, fires N concurrent HTTP requests, joins in original order.
  Bisect-resilient on 400 (context-length): recursively splits the
  failing shard down to single doc, logs+drops single bad doc with
  zero-vector placeholder so Chroma upsert never sees a gap. Real
  HTTP/connection errors still propagate.
- requirements.txt: chromadb already pinned via template.

## Run

  PPLS_CORPUS_ROOT=/run/media/justin/USB/ppls-corpus \
    OLLAMA_URL=http://host1:11434,http://host2:11434,...  \
    PRODUCT_NAME=ppls \
    python -m rag.index --rebuild

## Build stats

  - 216,467 chunks across 4,066 labels (~53 chunks/label avg)
  - Wall time: 75.7 min on 4 parallel GPU-backed Ollama endpoints
    (Bayer-Crop / BASF / Corteva / FMC / Nufarm / Syngenta / etc.
    chemistry; production Ollama on trashpanda + 2× 192.168.0.2 +
    1× Windows 192.168.0.125)
  - 473 bisect-drops (0.22%) — all from monolithic-table sections
    in 1970s-90s scanned PDFs whose pypdf extracts tokenized past
    the model's context. Acceptable; the dropped chunks were
    garbled OCR with no useful content.
  - Chroma: 2.2 GB persistent SQLite at ./chroma/
  - BM25: 416 MB SQLite FTS5 at ./bm25/ppls_docs.db

## Smoke-test queries (top-3 dense-only)

  "what can I spray on soybeans to control waterhemp"
    → Rage (glyphosate+carfentrazone), Sencor (metribuzin)
  "REI for dicamba on corn"
    → Nufarm Credit (DICAMBA tank-mix restrictions section)
  "fungicide for wheat head scab"
    → MCW 710 SC (azoxystrobin+tebuconazole), Sercadis (fluxapyroxad)

Distances 0.16-0.23. Dense-only quality is OK-not-great in spots
(exactly the failure mode Phase 6 reranker + Phase 8 hybrid BM25
fusion address).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-05-24 09:56:49 -04:00
parent 92a95d5e78
commit 38141c362e
3 changed files with 431 additions and 150 deletions
+114 -53
View File
@@ -1,15 +1,21 @@
"""Build Chroma (and optionally BM25) indexes from corpus on disk.
"""Build Chroma (and optionally BM25) indexes from the labels corpus.
Reads `corpus/<bundle>/<page>.{md,json}`, chunks each page, upserts
Reads `corpus/<source>/<source_key>.{md,json}`, chunks each label, upserts
into Chroma. With --rebuild, drops + recreates the collection (clean
state). With --bm25-only, skips Chroma and rebuilds only the FTS5
index — useful for fast iteration when chunking didn't change.
The corpus root honors PPLS_CORPUS_ROOT (matching the scrapers).
The Chroma + BM25 stores stay at the repo root because both rely on
filesystem locking semantics that vfat (typical USB drive) doesn't
provide reliably.
"""
from __future__ import annotations
import argparse
import json
import logging
import os
import time
from pathlib import Path
from typing import Iterator
@@ -17,74 +23,106 @@ from typing import Iterator
import chromadb
from chromadb.config import Settings
from .chunk import chunks_from_page
from .chunk import chunks_from_label
from .embeddings import embedding_function
log = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(message)s")
ROOT = Path(__file__).resolve().parent.parent
CORPUS = ROOT / "corpus"
CHROMA_DIR = ROOT / "chroma"
REPO_ROOT = Path(__file__).resolve().parent.parent
CORPUS_ROOT = Path(os.environ.get("PPLS_CORPUS_ROOT") or REPO_ROOT / "corpus")
CHROMA_DIR = Path(os.environ.get("PPLS_CHROMA_DIR") or REPO_ROOT / "chroma")
# Collection name — convention: <product>_docs. Override via env if needed.
import os
PRODUCT_NAME = os.environ.get("PRODUCT_NAME", "myproduct")
# Collection name — convention: <product>_docs. Override via env.
PRODUCT_NAME = os.environ.get("PRODUCT_NAME", "ppls")
COLLECTION = f"{PRODUCT_NAME}_docs"
def page_records() -> Iterator[dict]:
"""Walk corpus/, yield chunks for every page."""
if not CORPUS.exists():
log.error("corpus/ doesn't exist; run the scraper first")
def _extract_label_metadata(sidecar: dict, source: str, source_key: str) -> dict:
"""Flatten the canonical labels sidecar into a Chroma-friendly metadata
dict (Chroma requires str/int/float/bool values, no nesting/lists)."""
label = sidecar.get("label") or {}
actives = ", ".join(
a["name"] for a in (sidecar.get("active_ingredients") or [])
if isinstance(a, dict) and a.get("name")
)
return {
"source": sidecar.get("source") or source,
"source_key": sidecar.get("source_key") or source_key,
"epa_reg_no": sidecar.get("epa_reg_no") or "",
"product_name": sidecar.get("product_name") or "",
"product_class": sidecar.get("product_class") or "",
"registrant": sidecar.get("registrant") or "",
"signal_word": sidecar.get("signal_word") or "",
"active_ingredients": actives,
"label_url": label.get("url") or "",
"label_accepted_date": label.get("accepted_date") or "",
}
def label_chunks() -> Iterator[dict]:
"""Walk the corpus and yield one chunk dict per chunk per label."""
if not CORPUS_ROOT.exists():
log.error("corpus root %s doesn't exist; run a scraper first", CORPUS_ROOT)
return
for bundle_dir in sorted(CORPUS.iterdir()):
if not bundle_dir.is_dir() or bundle_dir.name.startswith("."):
sources_seen = 0
labels_seen = 0
for source_dir in sorted(CORPUS_ROOT.iterdir()):
if not source_dir.is_dir() or source_dir.name.startswith("."):
continue
for md_path in sorted(bundle_dir.glob("*.md")):
page_id = md_path.stem
sidecar = md_path.with_suffix(".json")
if not sidecar.exists():
sources_seen += 1
source = source_dir.name
for md_path in sorted(source_dir.glob("*.md")):
source_key = md_path.stem
sidecar_path = md_path.with_suffix(".json")
if not sidecar_path.exists():
log.warning("skipping %s — no JSON sidecar", md_path)
continue
md = md_path.read_text()
meta = json.loads(sidecar.read_text())
# Surface common filter fields at the chunk-metadata level
# so Chroma's `where` filter can use them.
base_meta = {
"bundle_id": bundle_dir.name,
"page_id": page_id,
"title": meta.get("title") or "",
"version": meta.get("version") or "",
"platform": meta.get("platform") or "",
"product": meta.get("product") or "",
}
yield from chunks_from_page(md, page_id, base_meta)
try:
md = md_path.read_text(encoding="utf-8")
sidecar = json.loads(sidecar_path.read_text(encoding="utf-8"))
except (OSError, json.JSONDecodeError) as exc:
log.warning("skipping %s — read error: %s", md_path, exc)
continue
base_meta = _extract_label_metadata(sidecar, source, source_key)
labels_seen += 1
yield from chunks_from_label(md, sidecar, base_meta)
log.info("walked %d source(s), %d label(s)", sources_seen, labels_seen)
def upsert_to_chroma(records: list[dict]) -> int:
def upsert_to_chroma(records: list[dict], *, rebuild: bool) -> int:
client = chromadb.PersistentClient(
path=str(CHROMA_DIR),
settings=Settings(anonymized_telemetry=False),
)
# Drop + recreate for --rebuild semantics
try:
client.delete_collection(COLLECTION)
except Exception:
pass
col = client.create_collection(COLLECTION, embedding_function=embedding_function())
if rebuild:
try:
client.delete_collection(COLLECTION)
log.info("dropped existing collection %r", COLLECTION)
except Exception:
pass
col = client.get_or_create_collection(
COLLECTION, embedding_function=embedding_function()
)
BATCH = 64
# Match Chroma upsert batch size to the number of parallel Ollama
# endpoints so each one gets a meaningful per-call shard (~64 docs).
# Overridable via env for tuning.
n_urls = max(1, len([u for u in os.environ.get("OLLAMA_URL",
"http://localhost:11434").split(",") if u.strip()]))
BATCH = int(os.environ.get("INDEX_BATCH") or 64 * n_urls)
log.info("upsert batch size: %d (%d URL(s) × 64)", BATCH, n_urls)
total = 0
for i in range(0, len(records), BATCH):
chunk = records[i:i + BATCH]
batch = records[i:i + BATCH]
col.upsert(
ids=[r["id"] for r in chunk],
documents=[r["text"] for r in chunk],
metadatas=[r["metadata"] for r in chunk],
ids=[r["id"] for r in batch],
documents=[r["text"] for r in batch],
metadatas=[r["metadata"] for r in batch],
)
total += len(chunk)
log.info("upserted %d / %d chunks", total, len(records))
total += len(batch)
if total % 1024 == 0 or total == len(records):
log.info("upserted %d / %d chunks", total, len(records))
return total
@@ -94,19 +132,41 @@ def main() -> int:
help="Drop and recreate the Chroma collection.")
p.add_argument("--bm25-only", action="store_true",
help="Rebuild only the BM25 index, skip Chroma.")
p.add_argument("--limit", type=int, default=None,
help="Limit to N labels (smoke testing).")
p.add_argument("--source", action="append",
help="Restrict to one or more source dirs (repeatable).")
p.add_argument("--bm25-db", type=Path,
default=ROOT / "bm25" / f"{PRODUCT_NAME}_docs.db",
default=REPO_ROOT / "bm25" / f"{PRODUCT_NAME}_docs.db",
help="Path to the BM25 sqlite db.")
args = p.parse_args()
log.info("reading corpus from %s", CORPUS)
log.info("corpus root: %s", CORPUS_ROOT)
log.info("chroma dir: %s", CHROMA_DIR)
log.info("collection: %s", COLLECTION)
t0 = time.time()
records = list(page_records())
log.info("loaded %d chunks in %.1fs", len(records), time.time() - t0)
records = []
label_count = 0
last_label_key: str | None = None
for rec in label_chunks():
if args.source and rec["metadata"]["source"] not in args.source:
continue
if args.limit:
key = (rec["metadata"]["source"], rec["metadata"]["source_key"])
if key != last_label_key:
if label_count >= args.limit:
break
label_count += 1
last_label_key = key
records.append(rec)
log.info("loaded %d chunks from %d label(s) in %.1fs",
len(records), label_count or "(all)", time.time() - t0)
if args.bm25_only:
from .bm25 import BM25Index
log.info("--bm25-only: building FTS5 only")
args.bm25_db.parent.mkdir(parents=True, exist_ok=True)
BM25Index(args.bm25_db).build(records)
return 0
@@ -115,14 +175,15 @@ def main() -> int:
return 0
t_c = time.time()
n = upsert_to_chroma(records)
CHROMA_DIR.mkdir(parents=True, exist_ok=True)
n = upsert_to_chroma(records, rebuild=True)
log.info("chroma: %d chunks in %.1fs", n, time.time() - t_c)
# Build BM25 too — see PLAN.md Phase 8. Safe to remove this block
# for products that don't need hybrid retrieval.
# Build BM25 too — see PLAN.md Phase 8.
try:
from .bm25 import BM25Index
t_b = time.time()
args.bm25_db.parent.mkdir(parents=True, exist_ok=True)
BM25Index(args.bm25_db).build(records)
log.info("bm25 done in %.1fs", time.time() - t_b)
except ImportError: