seed-mcp/rag/bm25.py
justin a766756a05 Phase 2/3: chunker + indexer + MCP server tools
Phase 2 — Chunking and indexing
- rag/chunk.py: replace template chunker with seed-variety-specific
  chunks_from_variety(). One chunk per variety (varieties are small
  and named-rating retrieval signal is best kept together). Output
  is rebuilt deterministically from the sidecar JSON: every value is
  verbatim from the source, only framing language ("Disease ratings
  (1-9, 9=best):") is template glue. Anti-hallucination contract:
  same sidecar in → same chunk out, never a fabricated rating.
  Metadata flattened to Chroma-safe primitives (str/int/float/bool):
  source, source_key, vendor, brand, crop, product_name,
  product_id, source_url, rm (corn), mg (soy), wheat_class,
  release_year, trait_codes_csv, rating_scale.
- rag/index.py: walks corpus/<source>/<source_key>.json sidecars
  via the new chunker. Default PRODUCT_NAME=crop_seed so the
  Chroma collection is crop_seed_docs.
- rag/bm25.py: filterable columns updated to seed-domain facets
  (source/vendor/brand/crop/source_key) instead of the template's
  version/platform/product.
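The metadata flattening step described for `rag/chunk.py` can be sketched roughly as follows. This is a minimal illustration, not the chunker's code: `flatten_metadata`, the `<key>_csv` naming rule, and the sample sidecar (including the made-up `EXAMPLE-01` variety) are assumptions. Values are copied through or skipped, never defaulted, to mirror the anti-hallucination contract.

```python
from __future__ import annotations

from typing import Any

# The four value types the commit lists as Chroma-safe metadata.
CHROMA_SAFE = (str, int, float, bool)


def flatten_metadata(sidecar: dict[str, Any]) -> dict[str, str | int | float | bool]:
    """Hypothetical helper: keep primitives, join string lists, skip the rest."""
    flat: dict[str, str | int | float | bool] = {}
    for key, value in sidecar.items():
        if value is None:
            continue  # absent in the source: omit it, never invent a default
        if isinstance(value, CHROMA_SAFE):
            flat[key] = value
        elif isinstance(value, list) and all(isinstance(v, str) for v in value):
            flat[f"{key}_csv"] = ",".join(value)  # e.g. trait_codes -> trait_codes_csv
        # Nested dicts (rating tables) are skipped; they belong in the chunk text.
    return flat


# Made-up sidecar for a hypothetical corn variety (not real vendor data).
sidecar = {
    "source": "bayer",
    "source_key": "EXAMPLE-01",
    "crop": "corn",
    "rm": 100,
    "mg": None,
    "trait_codes": ["VT2P", "RIB"],
    "ratings": {"Goss's Wilt": 7},
}
meta = flatten_metadata(sidecar)
print(meta)
```

Skipping unsupported values (rather than stringifying a ratings dict) keeps the metadata filterable while the full rating table stays in the chunk text, which the commit says is rebuilt verbatim from the sidecar.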

Phase 3 — MCP server tools wired up
- search_docs: hybrid dense (Chroma) + BM25 (FTS5) retrieval with
  RRF fusion. Optional filters: crop, brand, vendor, source.
  Variety-code prefilter pins exact source_key / product_name /
  hybrid_prefix matches at the top — dense embeddings have no
  semantic neighbor for tokens like "DKC62-08RIB" and RRF can let
  noise float to #1 without this pin. Each response carries the
  variety's source URL inline so the agent can cite.
- get_page(source, source_key): emits a structured ratings header
  (verbatim from sidecar, table per characteristics group, vendor
  positioning, regional listings) followed by the raw indexed body.
  This is the canonical fact-check surface.
- list_versions(): facet discovery — distinct sources, vendors,
  brands, crops across the corpus.
- lookup_variety(source_key, source?): returns the raw sidecar JSON
  for one variety. The agent should call this BEFORE quoting any
  specific rating value to a farmer — guaranteed verbatim.
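The fusion and pinning in `search_docs` can be illustrated with standard Reciprocal Rank Fusion, where each id scores the sum of 1/(k + rank) over the rankings it appears in. A minimal sketch under assumptions: `rrf_fuse`, `k=60` (the constant commonly used with RRF), the `pinned` argument standing in for the variety-code prefilter, and the toy ids are all hypothetical, not the server's actual code.

```python
from __future__ import annotations


def rrf_fuse(
    rankings: list[list[str]],
    k: int = 60,
    pinned: list[str] | None = None,
    limit: int = 10,
) -> list[str]:
    """Reciprocal Rank Fusion over ranked id lists, with pinned ids first.

    Each id scores sum(1 / (k + rank)) across the lists it appears in
    (rank starts at 1). Ids in ``pinned`` (standing in for exact
    source_key / product_name prefilter hits) are emitted first, in order.
    """
    scores: dict[str, float] = {}
    for ranking in rankings:
        for rank, doc_id in enumerate(ranking, start=1):
            scores[doc_id] = scores.get(doc_id, 0.0) + 1.0 / (k + rank)
    # Highest fused score first; tie-break on id so output is deterministic.
    fused = sorted(scores, key=lambda d: (-scores[d], d))
    out: list[str] = []
    seen: set[str] = set()
    for doc_id in [*(pinned or []), *fused]:
        if doc_id not in seen:  # a pinned id is not repeated further down
            seen.add(doc_id)
            out.append(doc_id)
    return out[:limit]


# Toy rankings: dense has no neighbor for the variety code, BM25 matches it.
dense = ["noise-a", "var-2", "var-3"]
bm25 = ["var-1", "noise-a", "var-2"]
print(rrf_fuse([dense, bm25]))                    # noise-a wins on fusion alone
print(rrf_fuse([dense, bm25], pinned=["var-1"]))  # exact code hit pinned to #1
```

A chunk that appears in both lists (here `noise-a`) can outscore one that only BM25 ranks first (`var-1`), which is the failure mode the pin guards against for exact variety-code matches.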

Smoke tests against 475 indexed Bayer varieties:
- list_versions returns 475 varieties, 1 source, 1 vendor, 3 brands,
  3 crops with correct per-brand counts (288/102/85).
- Semantic ag queries find the right candidates: short-season
  drought-tolerant corn → DKC44-97RIB at RM 94 (in 90-95 band);
  SCN+MG3 soybean → Asgrow XF varieties with explicit SCN R3 ratings;
  Phytophthora Rps3a soy → AG07XF4 (right gene); stripe-rust
  wheat → WestBred WB1376CLP (Yellow Rust 2 = best).
- Variety-code lookups work via prefilter: DKC62-08RIB, AG29XF4,
  WB6430 all return as #1 hit. BM25 confirms ranking unambiguously
  (top-1 score -13.2 vs -8.5 for #2 on "DKC62-08RIB ratings").
- Server boots cleanly in stdio AND streamable-http modes.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-25 13:14:16 -04:00


"""SQLite FTS5-backed BM25 retrieval over the same chunks Chroma indexes.

Hybrid retrieval (BM25 + dense + Reciprocal Rank Fusion) addresses a
single-tower dense embedding's weakness on rare technical tokens. For
seed-mcp those are variety codes ("DKC62-08RIB"), trait codes
("XF", "VT2PRIB"), disease abbreviations ("SCN", "SDS", "Goss's"),
and Rps gene names ("Rps1c", "Rps3a"). Dense embeddings don't bridge
queries like "Rps3a soybean" cleanly into the relevant chunk; BM25
matches them directly. Fused with the dense ranking via RRF, the
hybrid result is strictly better than either alone for the queries
we expect from the farm-advisor agent.

Why SQLite FTS5:
  - In the stdlib (``sqlite3``). Zero new deps.
  - On-disk. Same persistence model as Chroma: Docker COPY the dir,
    and ``rag.index --rebuild`` regenerates it from the corpus.
  - Built-in ``bm25()`` ranking function. No knobs to tune that matter
    for our use case (FTS5's fixed k1=1.2, b=0.75 are fine).
  - Builds <1k chunks in milliseconds; adds nothing to rebuild time.

Schema is two tables to keep filtering clean. FTS5 doesn't filter
nicely on its own columns, so facet metadata lives in an ordinary
``chunks_meta`` table that shares its rowid with the ``chunks_fts``
virtual table; queries join the two on rowid. For seed-mcp the
filterable columns are seed-domain facets (source, vendor, brand,
crop, source_key) rather than the docs-template version/platform.
"""
from __future__ import annotations

import logging
import re
import sqlite3
from pathlib import Path
from typing import Any

log = logging.getLogger(__name__)

# Default location: bm25/<product>_docs.db at the repo root, next to chroma/.
ROOT = Path(__file__).resolve().parent.parent
DEFAULT_DB_DIR = ROOT / "bm25"
DEFAULT_DB_NAME = "crop_seed_docs.db"

# Columns we expose as filterable metadata. Mirrors what
# ``docs_mcp.server._build_where`` accepts so the same filter dict
# works for both Chroma and BM25 without per-retriever translation.
FILTER_COLUMNS = ("source", "vendor", "brand", "crop", "source_key", "ordinal")


# Allowlist filter for free-text queries. FTS5's query parser chokes on
# lots of punctuation we routinely see in farmer queries ("Rps1c",
# "SCN-resistant", "0.05 MG", em-dashes). Rather than blocklist every
# operator, keep ASCII letters, digits, underscore, and whitespace, and
# replace everything else with a space.
_KEEP_RE = re.compile(r"[^A-Za-z0-9_\s]")

# FTS5 treats these uppercase barewords as query operators; strip them so
# they are never parsed as syntax. (Lowercase "and"/"or" are plain tokens.)
_BOOLEAN_KW_RE = re.compile(r"(?<!\w)(AND|OR|NOT|NEAR)(?!\w)")


def _sanitize_query(text: str) -> str:
    """Reduce a natural-language query to an FTS5 OR-of-tokens query.

    See ``crop-chem-docs`` for the rationale; the same transformation
    applies here. OR semantics maximizes recall, and BM25 already
    weights documents that match more query terms higher.
    """
    cleaned = _KEEP_RE.sub(" ", text)
    cleaned = _BOOLEAN_KW_RE.sub(" ", cleaned)
    tokens = cleaned.split()
    if not tokens:
        return ""
    return " OR ".join(tokens)


def _where_to_sql(where: dict | None) -> tuple[str, list[Any]]:
    """Translate a Chroma-shaped filter dict into a SQL fragment + params.

    Accepts the same shapes ``docs_mcp.server._build_where`` produces:

        None                      → ("", [])
        {"crop": "corn"}          → ("AND m.crop = ?", ["corn"])
        {"$and": [{...}, {...}]}  → ("AND m.X = ? AND m.Y = ?", [...])

    Unknown keys are silently dropped (defensive: better to over-match
    than to crash on a filter we don't know).
    """
    if not where:
        return "", []
    parts: list[str] = []
    params: list[Any] = []

    def _emit_eq(cond: dict[str, Any]) -> None:
        for k, v in cond.items():
            # Only allowlisted column names are interpolated into the SQL;
            # values always go through ? placeholders.
            if k in FILTER_COLUMNS:
                parts.append(f"m.{k} = ?")
                params.append(v)

    if "$and" in where:
        for sub in where["$and"]:
            _emit_eq(sub)
    else:
        _emit_eq(where)
    if not parts:
        return "", []
    return "AND " + " AND ".join(parts), params


class BM25Index:
    """Thin wrapper around an FTS5-backed sqlite db.

    Single-writer model. Reads are connection-per-call (sqlite handles
    concurrency through file locks; for our read-heavy workload that's
    fine and avoids cross-thread connection sharing issues with the MCP
    server's request handlers).
    """

    def __init__(self, db_path: Path | None = None):
        self.db_path = Path(db_path) if db_path else (DEFAULT_DB_DIR / DEFAULT_DB_NAME)

    # -- build ----------------------------------------------------------

    def build(self, records: list[dict]) -> int:
        """Rebuild the index from scratch from `records`.

        `records` is the same list ``rag.index.variety_records`` produces:
        ``[{"id": ..., "text": ..., "metadata": {...}}, ...]``. Bulk
        insert wrapped in a transaction. Both tables receive the same
        explicit rowid (1..N) so the query-time rowid join stays aligned.
        """
        self.db_path.parent.mkdir(parents=True, exist_ok=True)
        if self.db_path.exists():
            self.db_path.unlink()
        con = sqlite3.connect(self.db_path)
        try:
            con.executescript(self._schema_sql())
            # `with con:` commits on success and rolls back on error,
            # but does not close the connection; `finally` does that.
            with con:
                con.executemany(
                    "INSERT INTO chunks_meta "
                    "(rowid, id, source, vendor, brand, crop, source_key, ordinal) "
                    "VALUES (?, ?, ?, ?, ?, ?, ?, ?)",
                    [
                        (
                            i + 1,
                            r["id"],
                            r["metadata"].get("source") or "",
                            r["metadata"].get("vendor") or "",
                            r["metadata"].get("brand") or "",
                            r["metadata"].get("crop") or "",
                            r["metadata"].get("source_key") or "",
                            int(r["metadata"].get("ordinal") or 0),
                        )
                        for i, r in enumerate(records)
                    ],
                )
                con.executemany(
                    "INSERT INTO chunks_fts (rowid, text) VALUES (?, ?)",
                    [(i + 1, r["text"]) for i, r in enumerate(records)],
                )
        finally:
            con.close()
        log.info("bm25: indexed %d chunks → %s", len(records), self.db_path)
        return len(records)

    # -- query ----------------------------------------------------------

    def query(
        self,
        text: str,
        n: int = 200,
        where: dict | None = None,
    ) -> list[tuple[str, float]]:
        """Return up to `n` (chunk_id, bm25_score) pairs, lowest score first.

        FTS5's bm25() returns NEGATIVE numbers: more relevant docs have
        smaller (more negative) scores. We order ASC so the first row is
        the most relevant. Returns [] if the index hasn't been built yet.
        """
        sanitized = _sanitize_query(text)
        # Bail out before connecting: sqlite3.connect() on a missing path
        # would silently create an empty db file.
        if not sanitized or not self.exists():
            return []
        where_sql, params = _where_to_sql(where)
        sql = (
            "SELECT m.id, bm25(chunks_fts) AS score "
            "FROM chunks_fts "
            "JOIN chunks_meta m ON m.rowid = chunks_fts.rowid "
            f"WHERE chunks_fts MATCH ? {where_sql} "
            "ORDER BY bm25(chunks_fts) "
            "LIMIT ?"
        )
        con = None
        try:
            con = sqlite3.connect(self.db_path)
            cur = con.execute(sql, [sanitized, *params, n])
            return [(row[0], float(row[1])) for row in cur.fetchall()]
        except sqlite3.OperationalError as e:
            log.warning("bm25 query failed (%s); query=%r", e, sanitized[:80])
            return []
        finally:
            if con is not None:
                con.close()

    def exists(self) -> bool:
        return self.db_path.exists()

    def count(self) -> int:
        if not self.exists():
            return 0
        con = None
        try:
            con = sqlite3.connect(self.db_path)
            return con.execute("SELECT COUNT(*) FROM chunks_meta").fetchone()[0]
        except sqlite3.OperationalError:
            return 0
        finally:
            if con is not None:
                con.close()

    # -- schema ---------------------------------------------------------

    @staticmethod
    def _schema_sql() -> str:
        return """
            CREATE TABLE chunks_meta (
                rowid      INTEGER PRIMARY KEY AUTOINCREMENT,
                id         TEXT UNIQUE NOT NULL,
                source     TEXT,
                vendor     TEXT,
                brand      TEXT,
                crop       TEXT,
                source_key TEXT,
                ordinal    INTEGER
            );
            CREATE INDEX idx_meta_source ON chunks_meta(source);
            CREATE INDEX idx_meta_crop ON chunks_meta(crop);
            CREATE INDEX idx_meta_brand ON chunks_meta(brand);
            CREATE INDEX idx_meta_source_key ON chunks_meta(source_key);
            CREATE VIRTUAL TABLE chunks_fts USING fts5(
                text,
                tokenize = 'porter unicode61 remove_diacritics 2'
            );
        """
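To sanity-check the query shape without the corpus, the same two-table pattern can be exercised against an in-memory database. A self-contained sketch: it re-implements the sanitizer inline instead of importing `rag.bm25`, the three chunks and `CODE-0x` names are made up, and it assumes the linked SQLite was compiled with FTS5 (the case for most CPython builds, though not guaranteed).

```python
import re
import sqlite3

# Same allowlist + operator stripping as _sanitize_query, inlined here.
_DROP_RE = re.compile(r"[^A-Za-z0-9_\s]")
_OPS_RE = re.compile(r"(?<!\w)(AND|OR|NOT|NEAR)(?!\w)")


def sanitize(text: str) -> str:
    tokens = _OPS_RE.sub(" ", _DROP_RE.sub(" ", text)).split()
    return " OR ".join(tokens)


con = sqlite3.connect(":memory:")
con.executescript(
    """
    CREATE TABLE chunks_meta (rowid INTEGER PRIMARY KEY, id TEXT UNIQUE NOT NULL, crop TEXT);
    CREATE VIRTUAL TABLE chunks_fts USING fts5(
        text, tokenize = 'porter unicode61 remove_diacritics 2'
    );
    """
)
# Made-up chunks. Both tables get the same explicit rowid so they join 1:1.
docs = [
    ("corn-a", "corn", "Variety CODE-01. Goss's Wilt tolerance."),
    ("soy-a", "soybean", "Variety CODE-02. SCN resistance, Rps3a gene."),
    ("soy-b", "soybean", "Variety CODE-03. Phytophthora tolerance."),
]
with con:  # one transaction for the bulk insert
    con.executemany(
        "INSERT INTO chunks_meta (rowid, id, crop) VALUES (?, ?, ?)",
        [(i + 1, doc_id, crop) for i, (doc_id, crop, _) in enumerate(docs)],
    )
    con.executemany(
        "INSERT INTO chunks_fts (rowid, text) VALUES (?, ?)",
        [(i + 1, text) for i, (_, _, text) in enumerate(docs)],
    )

q = sanitize("Rps3a soybean -- SCN AND Phytophthora?")
rows = con.execute(
    "SELECT m.id, bm25(chunks_fts) AS score FROM chunks_fts "
    "JOIN chunks_meta m ON m.rowid = chunks_fts.rowid "
    "WHERE chunks_fts MATCH ? AND m.crop = ? "
    "ORDER BY bm25(chunks_fts) LIMIT ?",
    [q, "soybean", 10],
).fetchall()
con.close()
print(q)     # Rps3a OR soybean OR SCN OR Phytophthora
print(rows)  # soy-a (two matching terms) before soy-b; both scores negative
```

The crop filter removes `corn-a` before ranking, and the chunk matching two query terms sorts first, consistent with the ascending `ORDER BY bm25(...)` convention in `BM25Index.query`.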