justin 38141c362e Phase 2: chunking + parallel Ollama embeddings + Chroma + BM25 indexes
End-to-end RAG pipeline for the pesticide-labels corpus. From the
4,066 labels on USB, the indexer produces 216,467 chunks, embeds
them via N parallel Ollama endpoints, upserts to Chroma, and builds
a BM25 lexical index.

## Files

- rag/index.py: adapted to the labels schema (source / source_key /
  epa_reg_no / product_name / product_class / registrant /
  signal_word / active_ingredients, the last flattened so Chroma
  `where` filters can match on it); honors PPLS_CORPUS_ROOT (corpus
  on USB) and PPLS_CHROMA_DIR; upsert batch size auto-tuned to
  64 * N URLs; --limit and --source flags for incremental work.
- rag/chunk.py: label-aware. ALL-CAPS section heading detector
  (heuristic) for EPA labels alongside markdown `#` headings.
  TARGET_CHARS=2000 (~500 tokens), MAX_CHUNK_CHARS=4000 (~1000
  tokens) hard cap with _force_split sentence/char fallback to
  defend against monolithic crop+rate tables. Chunk 0 is a synthetic
  anchor with product name, EPA Reg No, registrant, signal word,
  product class, active ingredients + keyword bag for joint
  dense/BM25 retrieval.
- rag/embeddings.py: parallel dispatch across N Ollama URLs via
  ThreadPoolExecutor. Each __call__ stride-slices its input into N
  shards, fires N concurrent HTTP requests, and joins the results in
  original order. Bisect-resilient on HTTP 400 (context length
  exceeded): it recursively splits the failing shard down to a single
  doc, then logs and drops that doc with a zero-vector placeholder so
  the Chroma upsert never sees a gap. Other HTTP/connection errors
  still propagate.
- requirements.txt: chromadb already pinned via template.
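
The dispatch-and-bisect behavior described for rag/embeddings.py can be sketched roughly as below. This is a minimal illustration under stated assumptions, not the module itself: `embed_fn(url, docs)` is a hypothetical stand-in for one HTTP request to an Ollama endpoint, and `ContextLengthError` is a hypothetical exception assumed to be raised when that request comes back as a 400 for context length.

```python
from __future__ import annotations

import logging
from collections.abc import Callable, Sequence
from concurrent.futures import ThreadPoolExecutor

log = logging.getLogger(__name__)

Vector = list[float]
# Hypothetical signature: embed_fn(url, docs) returns one vector per doc.
EmbedFn = Callable[[str, Sequence[str]], list[Vector]]


class ContextLengthError(Exception):
    """Hypothetical stand-in for an HTTP 400 (context length exceeded)."""


def embed_shard(embed_fn: EmbedFn, url: str, docs: Sequence[str], dim: int) -> list[Vector]:
    """Embed one shard. On a context-length error, bisect until the bad doc
    is isolated, then replace it with a zero vector. Any other exception
    (HTTP, connection) propagates unchanged."""
    if not docs:
        return []
    try:
        return embed_fn(url, docs)
    except ContextLengthError:
        if len(docs) == 1:
            log.warning("dropping doc over context length (%d chars)", len(docs[0]))
            return [[0.0] * dim]  # placeholder keeps the upsert gap-free
        mid = len(docs) // 2
        return embed_shard(embed_fn, url, docs[:mid], dim) + embed_shard(
            embed_fn, url, docs[mid:], dim
        )


def embed_parallel(
    embed_fn: EmbedFn, urls: Sequence[str], docs: Sequence[str], dim: int
) -> list[Vector]:
    """Stride-slice docs across N endpoints, embed the shards concurrently,
    and re-interleave the results into the original order."""
    n = len(urls)
    shards = [list(docs[i::n]) for i in range(n)]  # shard i: docs i, i+n, i+2n, ...
    with ThreadPoolExecutor(max_workers=n) as pool:
        results = list(
            pool.map(lambda u_s: embed_shard(embed_fn, u_s[0], u_s[1], dim), zip(urls, shards))
        )
    out: list[Vector] = [[] for _ in docs]
    for i, vectors in enumerate(results):
        out[i::n] = vectors  # undo the stride slice
    return out
```

Stride slicing gives each endpoint an equal share of the batch (within one doc), so the shards can be put back in order with a single extended-slice assignment per endpoint.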

## Run

  PPLS_CORPUS_ROOT=/run/media/justin/USB/ppls-corpus \
    OLLAMA_URL=http://host1:11434,http://host2:11434,...  \
    PRODUCT_NAME=ppls \
    python -m rag.index --rebuild
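
The Run line passes several endpoints as one comma-separated OLLAMA_URL, and rag/index.py sizes its upsert batches at 64 * N URLs. A minimal sketch of that parsing and sizing follows; the helper names and the error on an empty list are assumptions for illustration, not the script's actual code. The 64 * N rule presumably gives each endpoint 64 docs per batch once the batch is stride-sliced, though the commit does not say so.

```python
from __future__ import annotations

import os
from collections.abc import Mapping

UPSERT_BATCH_PER_URL = 64  # per the commit note: batch size = 64 * N URLs


def ollama_urls(env: Mapping[str, str] = os.environ) -> list[str]:
    """Hypothetical helper: split OLLAMA_URL on commas, dropping blanks
    and trailing slashes."""
    urls = [u.strip().rstrip("/") for u in env.get("OLLAMA_URL", "").split(",")]
    urls = [u for u in urls if u]
    if not urls:
        raise ValueError("OLLAMA_URL must list at least one endpoint")
    return urls


def upsert_batch_size(urls: list[str]) -> int:
    """Hypothetical helper: 64 docs per endpoint per upsert batch."""
    return UPSERT_BATCH_PER_URL * len(urls)
```

With four endpoints this gives a batch of 256.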

## Build stats

  - 216,467 chunks across 4,066 labels (~53 chunks/label avg;
    Bayer-Crop / BASF / Corteva / FMC / Nufarm / Syngenta / etc.
    chemistry)
  - Wall time: 75.7 min on 4 parallel GPU-backed Ollama endpoints
    (production Ollama on trashpanda, 2× on 192.168.0.2, and 1× on
    Windows at 192.168.0.125)
  - 473 bisect-drops (0.22%), all from monolithic-table sections
    in 1970s-90s scanned PDFs whose pypdf text extraction tokenized
    past the model's context window. Acceptable; the dropped chunks
    were garbled OCR with no useful content.
  - Chroma: 2.2 GB persistent SQLite at ./chroma/
  - BM25: 416 MB SQLite FTS5 at ./bm25/ppls_docs.db
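
The headline numbers cross-check with a little arithmetic. The per-endpoint rate on the last line assumes the four endpoints shared the load evenly, which stride slicing suggests but the stats above do not state.

```latex
\begin{aligned}
\frac{216{,}467\ \text{chunks}}{4{,}066\ \text{labels}} &\approx 53.2\ \text{chunks/label} \\
\frac{473\ \text{dropped}}{216{,}467\ \text{chunks}} &\approx 0.22\% \\
\frac{216{,}467\ \text{chunks}}{75.7 \times 60\ \text{s}} = \frac{216{,}467}{4{,}542\ \text{s}} &\approx 47.7\ \text{chunks/s} \approx 4 \times 11.9\ \text{chunks/s}
\end{aligned}
```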

## Smoke-test queries (top-3 dense-only)

  "what can I spray on soybeans to control waterhemp"
    → Rage (glyphosate+carfentrazone), Sencor (metribuzin)
  "REI for dicamba on corn"
    → Nufarm Credit (DICAMBA tank-mix restrictions section)
  "fungicide for wheat head scab"
    → MCW 710 SC (azoxystrobin+tebuconazole), Sercadis (fluxapyroxad)

Distances 0.16-0.23. Dense-only quality is OK but not great in
spots; that is exactly the failure mode the Phase 6 reranker and
Phase 8 hybrid BM25 fusion are meant to address.
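
The lexical side of that hybrid fusion runs against the SQLite FTS5 index listed under Build stats. A minimal sketch of a BM25-ranked FTS5 lookup follows, using Python's built-in `sqlite3` (which assumes an SQLite build with FTS5 enabled, as most standard builds are). The one-table schema with `chunk_id` and `body` columns is a hypothetical stand-in, since the real ppls_docs.db schema isn't shown here.

```python
import re
import sqlite3


def build_index(rows: list[tuple[str, str]]) -> sqlite3.Connection:
    """Hypothetical schema: one FTS5 table of (chunk id, chunk text)."""
    conn = sqlite3.connect(":memory:")
    # chunk_id is stored but not tokenized; only body is searchable.
    conn.execute("CREATE VIRTUAL TABLE chunks USING fts5(chunk_id UNINDEXED, body)")
    conn.executemany("INSERT INTO chunks (chunk_id, body) VALUES (?, ?)", rows)
    return conn


def bm25_search(conn: sqlite3.Connection, query: str, k: int = 3) -> list[tuple[str, float]]:
    """Top-k chunks by FTS5's bm25(); lower (more negative) scores rank higher."""
    # Quote each word so punctuation in user text (colons, hyphens,
    # question marks) can't break FTS5 query syntax; OR the terms for recall.
    terms = re.findall(r"\w+", query)
    if not terms:
        return []
    match = " OR ".join(f'"{t}"' for t in terms)
    return conn.execute(
        "SELECT chunk_id, bm25(chunks) AS score FROM chunks "
        "WHERE chunks MATCH ? ORDER BY score LIMIT ?",
        (match, k),
    ).fetchall()
```

FTS5's default `unicode61` tokenizer treats hyphens as separators, so a hyphenated EPA Reg No is indexed as separate number tokens; keeping them adjacent would take a phrase query.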

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-24 09:56:49 -04:00

258 lines
9.6 KiB
Python

"""Label chunker — section-aware first, paragraph-aware fallback, ~500 token target.
EPA pesticide labels have very consistent section headings (DIRECTIONS
FOR USE, PRECAUTIONARY STATEMENTS, FIRST AID, ENVIRONMENTAL HAZARDS,
STORAGE AND DISPOSAL, RESTRICTIONS, etc.). When pypdf extracts the
text it preserves these as ALL-CAPS lines but doesn't reliably mark
them as markdown headings. This chunker detects them heuristically
and uses them as natural chunk boundaries — that keeps "what's the
PHI for Warrant on soybeans" returning the directions block, not a
half-paragraph from environmental hazards.
The output shape (id, text, metadata) is fixed by the downstream
Chroma + BM25 indexing in rag/index.py — don't change it.
Chunk 0 is a synthetic anchor crafted specifically for label retrieval:
it includes product name, EPA Reg No, registrant, signal word, and
active ingredients up front, then appends a keyword bag so BM25 hits
on exact terms (chemistry names, reg numbers, manufacturer brands).
"""

from __future__ import annotations

import re
from typing import Iterator

CHARS_PER_TOKEN = 4
TARGET_TOKENS = 500
TARGET_CHARS = TARGET_TOKENS * CHARS_PER_TOKEN
MIN_CHUNK_CHARS = 200  # don't emit microscopic chunks; merge upward

# Hard ceiling per chunk. nomic-embed-text trains at n_ctx=2048; we leave
# headroom for tokenizer variance. A single paragraph longer than this
# gets force-split at the nearest sentence (or, failing that, at the
# nearest char boundary) so no chunk can blow the embedder's context
# window. EPA labels sometimes have monolithic crop+rate tables or
# all-caps precautionary blocks that exceed TARGET_CHARS by 10×.
MAX_CHUNK_CHARS = 4000  # ~1000 tokens; tightened after seeing 400s from
                        # an older Ollama instance with a stricter context limit

# Heuristic detector for EPA-label-style ALL-CAPS section headings.
# - Line is ALL CAPS (with optional punctuation, ampersands, digits, parens)
# - Length between 3 and 80 chars
# - Doesn't start with a list bullet, table delimiter, or markdown stuff
_SECTION_HEADING_RE = re.compile(
    r"^[A-Z0-9][A-Z0-9 \-\&,\(\)/\.\:]{2,79}$"
)


def estimate_tokens(text: str) -> int:
    return max(1, len(text) // CHARS_PER_TOKEN)


def _looks_like_section_heading(line: str) -> bool:
    """True if line is a plausible EPA-label section heading."""
    s = line.strip()
    if not (3 <= len(s) <= 80):
        return False
    # Must contain at least one letter; reject pure-numeric lines
    if not any(c.isalpha() for c in s):
        return False
    # Must be all caps: quick check via .upper() round-trip
    if s != s.upper():
        return False
    # Reject obvious table rows (many digits, commas, percents)
    if sum(c.isdigit() for c in s) > len(s) // 2:
        return False
    # Reject lines that start with non-heading punctuation
    if s[0] in "•·-*[(\"":
        return False
    return bool(_SECTION_HEADING_RE.match(s))


def split_into_blocks(md: str) -> list[tuple[str, str]]:
    """Split label markdown into (kind, text) blocks.

    kind ∈ {"heading", "para"}. Headings are either markdown `#` lines
    or detected ALL-CAPS section headings. Paragraphs are runs of
    non-blank lines between headings or blank-line separators.
    """
    blocks: list[tuple[str, str]] = []
    current: list[str] = []
    for raw in md.splitlines():
        line = raw.rstrip()
        if line.startswith("#"):
            if current:
                blocks.append(("para", "\n".join(current).strip()))
                current = []
            blocks.append(("heading", line.lstrip("#").strip()))
            continue
        if _looks_like_section_heading(line):
            if current:
                blocks.append(("para", "\n".join(current).strip()))
                current = []
            blocks.append(("heading", line.strip()))
            continue
        if not line:
            if current:
                blocks.append(("para", "\n".join(current).strip()))
                current = []
            continue
        current.append(line)
    if current:
        blocks.append(("para", "\n".join(current).strip()))
    return [b for b in blocks if b[1]]


def _build_chunk0(sidecar: dict, meta: dict) -> str:
    """Synthetic anchor chunk: front-loads everything a farmer might
    search by (product name, EPA reg, registrant, actives, signal word,
    class) so dense retrieval and BM25 both land cleanly."""
    product_name = sidecar.get("product_name") or meta.get("source_key") or "(unnamed)"
    epa = sidecar.get("epa_reg_no") or "—"
    registrant = sidecar.get("registrant") or ""
    signal = sidecar.get("signal_word") or "—"
    pclass = sidecar.get("product_class") or ""
    actives_list = [
        a["name"] for a in (sidecar.get("active_ingredients") or [])
        if isinstance(a, dict) and a.get("name")
    ]
    actives = "; ".join(actives_list) or "—"
    src = sidecar.get("source") or meta.get("source") or ""
    header = (
        f"# {product_name}\n\n"
        f"EPA Reg No: {epa}\n"
        f"Registrant: {registrant or '(unknown)'}\n"
        f"Source: {src}\n"
        f"Product class: {pclass or '(unspecified)'}\n"
        f"Signal word: {signal}\n"
        f"Active ingredients: {actives}\n"
    )
    # Keyword bag for BM25: repeats the high-signal exact terms.
    bag_terms: list[str] = []
    if product_name:
        bag_terms.append(product_name)
    if epa and epa != "—":
        bag_terms.append(epa)
    if registrant:
        bag_terms.append(registrant)
    bag_terms.extend(actives_list)
    if pclass:
        bag_terms.append(pclass)
    keyword_bag = ("Keywords: " + ", ".join(bag_terms)) if bag_terms else ""
    return header + ("\n" + keyword_bag + "\n" if keyword_bag else "")


def _force_split(text: str, max_chars: int = MAX_CHUNK_CHARS) -> list[str]:
    """Split an oversized paragraph at sentence boundaries when possible,
    falling back to brutal char-boundary splits. Used as a last resort
    so MAX_CHUNK_CHARS is genuinely enforced."""
    if len(text) <= max_chars:
        return [text]
    # Try sentence-ish splits first
    pieces: list[str] = []
    buf = ""
    for sent in re.split(r"(?<=[.!?])\s+", text):
        if not sent:
            continue
        if buf and len(buf) + 1 + len(sent) > max_chars:
            pieces.append(buf)
            buf = sent
        else:
            buf = (buf + " " + sent) if buf else sent
        # Sentence alone exceeds limit: brutal split
        while len(buf) > max_chars:
            pieces.append(buf[:max_chars])
            buf = buf[max_chars:]
    if buf:
        pieces.append(buf)
    return pieces


def chunks_from_label(
    md: str,
    sidecar: dict,
    metadata: dict,
) -> Iterator[dict]:
    """Yield chunk dicts ready for rag.index to upsert.

    Chunk 0 is the synthetic anchor (always emitted). Body chunks pack
    label sections together, splitting only when ~TARGET_CHARS is
    reached. Each chunk is tagged with the current section heading
    so retrieval can surface section context.
    """
    source = metadata["source"]
    source_key = metadata["source_key"]

    # Chunk 0
    yield {
        "id": f"{source}::{source_key}::0",
        "text": _build_chunk0(sidecar, metadata),
        "metadata": {**metadata, "ordinal": 0, "section": "header"},
    }

    blocks = split_into_blocks(md)
    if not blocks:
        return

    ordinal = 1
    buf: list[str] = []
    buf_chars = 0
    current_section = ""

    def flush(force: bool = False) -> Iterator[dict]:
        """Emit the buffer as one chunk. Buffers under MIN_CHUNK_CHARS are
        held back so they merge into the next chunk, except on the final
        flush (force=True), so a short trailing section isn't dropped."""
        nonlocal ordinal, buf, buf_chars
        if not buf or (buf_chars < MIN_CHUNK_CHARS and not force):
            return
        text = "\n\n".join(buf).strip()
        yield {
            "id": f"{source}::{source_key}::{ordinal}",
            "text": text,
            "metadata": {**metadata, "ordinal": ordinal, "section": current_section[:80]},
        }
        ordinal += 1
        buf = []
        buf_chars = 0

    def reseed_section() -> None:
        """Repeat the section heading at the top of an empty buffer so a
        continuation chunk keeps its section context. Skipped when flush()
        held back a small buffer, which would otherwise duplicate the
        heading."""
        nonlocal buf_chars
        if current_section and not buf:
            buf.append(f"## {current_section}")
            buf_chars += len(current_section) + 4

    for kind, text in blocks:
        if kind == "heading":
            yield from flush()
            current_section = text
            buf.append(f"## {text}")
            buf_chars += len(text) + 4
            continue
        # Defend against oversized paragraphs (massive crop/rate tables,
        # all-caps precautionary blocks): split them first.
        for piece in _force_split(text):
            # If this piece would push us past TARGET (and we already
            # have a reasonable buffer), flush before adding.
            if buf_chars + len(piece) > TARGET_CHARS and buf_chars >= MIN_CHUNK_CHARS:
                yield from flush()
                reseed_section()
            # If the piece alone exceeds TARGET (still under MAX after
            # force-split), emit it as its own chunk to avoid bloating.
            if len(piece) > TARGET_CHARS:
                yield from flush()
                reseed_section()
                buf.append(piece)
                buf_chars += len(piece)
                yield from flush()
                continue
            buf.append(piece)
            buf_chars += len(piece)
    yield from flush(force=True)