"""Label chunker — section-aware first, paragraph-aware fallback, ~500 token target.

EPA pesticide labels have very consistent section headings (DIRECTIONS FOR
USE, PRECAUTIONARY STATEMENTS, FIRST AID, ENVIRONMENTAL HAZARDS, STORAGE AND
DISPOSAL, RESTRICTIONS, etc.). When pypdf extracts the text it preserves these
as ALL-CAPS lines but doesn't reliably mark them as markdown headings. This
chunker detects them heuristically and uses them as natural chunk boundaries —
that keeps "what's the PHI for Warrant on soybeans" returning the directions
block, not a half-paragraph from environmental hazards.

The output shape (id, text, metadata) is fixed by the downstream Chroma + BM25
indexing in rag/index.py — don't change it.

Chunk 0 is a synthetic anchor crafted specifically for label retrieval: it
includes product name, EPA Reg No, registrant, signal word, and active
ingredients up front, then appends a keyword bag so BM25 hits on exact terms
(chemistry names, reg numbers, manufacturer brands).
"""

from __future__ import annotations

import re
from typing import Iterator

CHARS_PER_TOKEN = 4
TARGET_TOKENS = 500
TARGET_CHARS = TARGET_TOKENS * CHARS_PER_TOKEN
MIN_CHUNK_CHARS = 200  # don't emit microscopic chunks; merge upward

# Hard ceiling per chunk. nomic-embed-text trains at n_ctx=2048; we leave
# headroom for tokenizer variance. A single paragraph longer than this
# gets force-split at the nearest sentence (or, failing that, at the
# nearest char boundary) so no chunk can blow the embedder's context
# window. EPA labels sometimes have monolithic crop+rate tables or
# all-caps precautionary blocks that exceed TARGET_CHARS by 10×.
MAX_CHUNK_CHARS = 4000  # ~1000 tokens; tightened after seeing 400s from
                        # an older Ollama instance with a stricter context limit

# Heuristic detector for EPA-label-style ALL-CAPS section headings.
#   - Line is ALL CAPS (with optional punctuation, ampersands, digits, parens)
#   - Length between 3 and 80 chars
#   - Doesn't start with a list bullet, table delimiter, or markdown stuff
_SECTION_HEADING_RE = re.compile(
    r"^[A-Z0-9][A-Z0-9 \-\&,\(\)/\.\:]{2,79}$"
)


def estimate_tokens(text: str) -> int:
    """Return a rough token count for *text* (CHARS_PER_TOKEN chars per token, minimum 1)."""
    return max(1, len(text) // CHARS_PER_TOKEN)


def _looks_like_section_heading(line: str) -> bool:
    """Return True if *line* is a plausible EPA-label section heading.

    The line is stripped first. It must match ``_SECTION_HEADING_RE``, contain
    at least one letter, and must not be digit-heavy.
    """
    s = line.strip()
    # The pattern alone enforces the 3-80 char length, the ALL-CAPS/ASCII
    # alphabet, and a leading letter or digit (so bullets, brackets, quotes
    # and dashes can never start a heading).
    if not _SECTION_HEADING_RE.match(s):
        return False
    # Must contain at least one letter; reject pure-numeric lines.
    if not any(c.isalpha() for c in s):
        return False
    # Reject obvious table rows: more than half of the characters are digits.
    return sum(c.isdigit() for c in s) <= len(s) // 2


def split_into_blocks(md: str) -> list[tuple[str, str]]:
    """Split label markdown into (kind, text) blocks.

    kind ∈ {"heading", "para"}. Headings are either markdown `#` lines or
    detected ALL-CAPS section headings. Paragraphs are runs of non-blank
    lines between headings or blank-line separators. Blocks whose text is
    empty (e.g. a bare ``#`` line) are dropped.
    """
    blocks: list[tuple[str, str]] = []
    pending: list[str] = []

    def close_paragraph() -> None:
        # Emit the accumulated paragraph lines (if any) as one "para" block.
        if pending:
            blocks.append(("para", "\n".join(pending).strip()))
            pending.clear()

    for raw in md.splitlines():
        line = raw.rstrip()
        if line.startswith("#"):
            close_paragraph()
            blocks.append(("heading", line.lstrip("#").strip()))
        elif _looks_like_section_heading(line):
            close_paragraph()
            blocks.append(("heading", line.strip()))
        elif line:
            pending.append(line)
        else:
            # Blank line: paragraph separator.
            close_paragraph()
    close_paragraph()

    return [b for b in blocks if b[1]]


def _build_chunk0(sidecar: dict, meta: dict) -> str:
    """Build the synthetic anchor chunk text.

    Front-loads everything a farmer might search by (product name, EPA reg,
    registrant, actives, signal word, class) so dense retrieval and BM25 both
    land cleanly.

    Args:
        sidecar: Label sidecar data. Keys read here: ``product_name``,
            ``epa_reg_no``, ``registrant``, ``signal_word``,
            ``product_class``, ``active_ingredients`` (a list; only dict
            entries with a truthy ``name`` are used) and ``source``.
        meta: Chunk metadata; ``source_key`` and ``source`` are used as
            fallbacks for the product name and the source.

    Returns:
        A markdown header block, followed by a ``Keywords:`` line when at
        least one real (non-placeholder) term is available.
    """
    real_name = sidecar.get("product_name") or meta.get("source_key")
    product_name = real_name or "(unnamed)"
    epa = sidecar.get("epa_reg_no") or "—"
    registrant = sidecar.get("registrant") or ""
    signal = sidecar.get("signal_word") or "—"
    pclass = sidecar.get("product_class") or ""
    # str() keeps the joins below safe if the sidecar carries non-string names.
    actives_list = [
        str(a["name"])
        for a in (sidecar.get("active_ingredients") or [])
        if isinstance(a, dict) and a.get("name")
    ]
    actives = "; ".join(actives_list) or "—"
    src = sidecar.get("source") or meta.get("source") or ""

    header = (
        f"# {product_name}\n\n"
        f"EPA Reg No: {epa}\n"
        f"Registrant: {registrant or '(unknown)'}\n"
        f"Source: {src}\n"
        f"Product class: {pclass or '(unspecified)'}\n"
        f"Signal word: {signal}\n"
        f"Active ingredients: {actives}\n"
    )

    # Keyword bag for BM25 — repeats the high-signal exact terms. Display
    # placeholders ("(unnamed)", "—") are deliberately kept out of the bag.
    candidates = [
        real_name,
        epa if epa != "—" else "",
        registrant,
        *actives_list,
        pclass,
    ]
    bag_terms = [str(term) for term in candidates if term]
    if not bag_terms:
        return header
    return header + "\nKeywords: " + ", ".join(bag_terms) + "\n"


def _force_split(text: str, max_chars: int = MAX_CHUNK_CHARS) -> list[str]:
    """Split an oversized paragraph into pieces of at most *max_chars* chars.

    Splits at sentence boundaries (whitespace following ``.``, ``!`` or ``?``)
    when possible, falling back to brutal char-boundary splits. Used as a last
    resort so MAX_CHUNK_CHARS is genuinely enforced. Sentences packed into the
    same piece are re-joined with a single space, so the original whitespace
    at those sentence boundaries is not preserved.

    Raises:
        ValueError: if *text* needs splitting but *max_chars* is not positive.
    """
    if len(text) <= max_chars:
        return [text]
    if max_chars < 1:
        # A zero-width slice never shrinks the remainder, so the brutal-split
        # loop below could not terminate.
        raise ValueError("max_chars must be a positive integer")

    pieces: list[str] = []
    buf = ""
    # Try sentence-ish splits first.
    for sent in re.split(r"(?<=[.!?])\s+", text):
        if not sent:
            continue
        if buf and len(buf) + 1 + len(sent) > max_chars:
            pieces.append(buf)
            buf = sent
        else:
            buf = f"{buf} {sent}" if buf else sent
        # Sentence alone exceeds limit — brutal split.
        while len(buf) > max_chars:
            pieces.append(buf[:max_chars])
            buf = buf[max_chars:]
    if buf:
        pieces.append(buf)
    return pieces


def chunks_from_label(
    md: str,
    sidecar: dict,
    metadata: dict,
) -> Iterator[dict]:
    """Yield chunk dicts ready for rag.index to upsert.

    Chunk 0 is the synthetic anchor (always emitted). Body chunks pack label
    sections together, splitting only when ~TARGET_CHARS is reached. Each
    chunk is tagged with the current section heading so retrieval can surface
    section context, and a chunk that continues a section starts with a
    repeated ``## <section>`` line.

    Buffers smaller than MIN_CHUNK_CHARS are not emitted on their own; they
    are carried forward and merged into the next chunk. At end of input the
    remaining buffer is emitted even if it is small, provided it holds at
    least one paragraph piece, so the tail of the label is never dropped.

    Paragraph pieces are capped at MAX_CHUNK_CHARS; a chunk's text can exceed
    that by the section heading and any carried sub-minimum buffer.

    Args:
        md: Label text as markdown / extracted plain text.
        sidecar: Label sidecar data used to build chunk 0.
        metadata: Base metadata copied into every chunk; must contain
            ``source`` and ``source_key``.

    Yields:
        Dicts with keys ``id`` (``"{source}::{source_key}::{ordinal}"``),
        ``text`` and ``metadata`` (base metadata plus ``ordinal`` and
        ``section``, the latter truncated to 80 chars).

    Raises:
        KeyError: if *metadata* lacks ``source`` or ``source_key``.
    """
    source = metadata["source"]
    source_key = metadata["source_key"]

    # Chunk 0
    yield {
        "id": f"{source}::{source_key}::0",
        "text": _build_chunk0(sidecar, metadata),
        "metadata": {**metadata, "ordinal": 0, "section": "header"},
    }

    blocks = split_into_blocks(md)
    if not blocks:
        return

    ordinal = 1
    buf: list[str] = []
    buf_chars = 0
    buf_has_body = False  # True once the buffer holds a paragraph piece
    current_section = ""

    def flush(final: bool = False) -> Iterator[dict]:
        """Emit the buffer as one chunk unless it is too small to stand alone."""
        nonlocal ordinal, buf, buf_chars, buf_has_body
        if not buf:
            return
        undersized = buf_chars < MIN_CHUNK_CHARS
        # Undersized buffers are kept so they merge into the next chunk. At
        # end of input there is no next chunk, so real content is emitted
        # anyway (a dangling heading with no body is still skipped).
        if undersized and not (final and buf_has_body):
            return
        yield {
            "id": f"{source}::{source_key}::{ordinal}",
            "text": "\n\n".join(buf).strip(),
            "metadata": {
                **metadata,
                "ordinal": ordinal,
                "section": current_section[:80],
            },
        }
        ordinal += 1
        buf = []
        buf_chars = 0
        buf_has_body = False

    def add_piece(piece: str) -> None:
        """Append a paragraph piece, re-seeding the section heading if needed."""
        nonlocal buf_chars, buf_has_body
        # A non-empty buffer already carries the current heading (it is added
        # when the heading is seen, or here). Seeding only into an empty
        # buffer repeats the heading after a flush without duplicating it.
        if not buf and current_section:
            buf.append(f"## {current_section}")
            buf_chars += len(current_section) + 4
        buf.append(piece)
        buf_chars += len(piece)
        buf_has_body = True

    for kind, text in blocks:
        if kind == "heading":
            yield from flush()
            current_section = text
            buf.append(f"## {text}")
            buf_chars += len(text) + 4
            continue

        # Defend against oversized paragraphs (massive crop/rate tables,
        # all-caps precautionary blocks) — split them first.
        for piece in _force_split(text):
            # If this piece would push us past TARGET (and we already have a
            # reasonable buffer), flush before adding.
            if buf_chars >= MIN_CHUNK_CHARS and buf_chars + len(piece) > TARGET_CHARS:
                yield from flush()
            add_piece(piece)
            # A piece that alone exceeds TARGET (still under MAX after the
            # force-split) is emitted right away to avoid bloating the chunk.
            if len(piece) > TARGET_CHARS:
                yield from flush()

    yield from flush(final=True)