a766756a05
Phase 2 — Chunking and indexing
- rag/chunk.py: replace template chunker with seed-variety-specific
chunks_from_variety(). One chunk per variety (varieties are small
and named-rating retrieval signal is best kept together). Output
is rebuilt deterministically from the sidecar JSON: every value is
verbatim from the source, only framing language ("Disease ratings
(1-9, 9=best):") is template glue. Anti-hallucination contract:
same sidecar in → same chunk out, never a fabricated rating.
Metadata flattened to Chroma-safe primitives (str/int/float/bool):
source, source_key, vendor, brand, crop, product_name,
product_id, source_url, rm (corn), mg (soy), wheat_class,
release_year, trait_codes_csv, rating_scale.
- rag/index.py: walks corpus/<source>/<source_key>.json sidecars
via the new chunker. Default PRODUCT_NAME=crop_seed so the
Chroma collection is crop_seed_docs.
- rag/bm25.py: filterable columns updated to seed-domain facets
(source/vendor/brand/crop/source_key) instead of the template's
version/platform/product.
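A minimal sketch of the metadata flattening described in the rag/chunk.py entry, assuming the sidecar is a plain dict. The name flatten_metadata and the rule that lists become a "<key>_csv" string are assumptions for illustration (the latter suggested by trait_codes_csv), not the actual code in rag/chunk.py.

```python
from typing import Any, Dict, Union

Primitive = Union[str, int, float, bool]


def flatten_metadata(sidecar: Dict[str, Any]) -> Dict[str, Primitive]:
    """Hypothetical helper: keep only Chroma-safe scalars from a sidecar dict."""
    flat: Dict[str, Primitive] = {}
    for key, value in sidecar.items():
        if value is None:
            # Missing facets (e.g. mg on a corn variety) are skipped, not stored as None.
            continue
        if isinstance(value, (str, int, float, bool)):
            flat[key] = value
        elif isinstance(value, (list, tuple)):
            # Assumption: lists are joined into one comma-separated string.
            flat[f"{key}_csv"] = ",".join(str(v) for v in value)
        # Nested dicts (e.g. rating tables) are left out here; they stay in the chunk text.
    return flat


example = flatten_metadata(
    {"crop": "corn", "rm": 94, "mg": None, "trait_codes": ["VT2P", "RIB"], "ratings": {"x": 1}}
)
```

In this sketch `example` keeps crop and rm, drops the None and the nested dict, and carries the traits as a single trait_codes_csv string.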
Phase 3 — MCP server tools wired up
- search_docs: hybrid dense (Chroma) + BM25 (FTS5) retrieval with
RRF fusion. Optional filters: crop, brand, vendor, source.
Variety-code prefilter pins exact source_key / product_name /
hybrid_prefix matches at the top — dense embeddings have no
semantic neighbor for tokens like "DKC62-08RIB" and RRF can let
noise float to #1 without this pin. Each response carries the
variety's source URL inline so the agent can cite.
- get_page(source, source_key): emits a structured ratings header
(verbatim from sidecar, table per characteristics group, vendor
positioning, regional listings) followed by the raw indexed body.
This is the canonical fact-check surface.
- list_versions(): facet discovery — distinct sources, vendors,
brands, crops across the corpus.
- lookup_variety(source_key, source?): returns the raw sidecar JSON
for one variety. The agent should call this BEFORE quoting any
specific rating value to a farmer — guaranteed verbatim.
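The RRF fusion and variety-code pin used by search_docs can be sketched roughly as below. This is an illustration under stated assumptions, not the server's code: rrf_fuse is a hypothetical name, k=60 is the constant commonly used for RRF (the commit does not state the value), and pinned IDs are simply placed ahead of the fused list.

```python
from collections import defaultdict
from typing import Dict, Iterable, List, Sequence


def rrf_fuse(
    rankings: Iterable[Sequence[str]],
    k: int = 60,
    pinned: Sequence[str] = (),
) -> List[str]:
    """Fuse ranked ID lists with Reciprocal Rank Fusion, then pin exact matches on top."""
    scores: Dict[str, float] = defaultdict(float)
    for ranking in rankings:
        for rank, doc_id in enumerate(ranking, start=1):
            # Each list contributes 1 / (k + rank) for every ID it contains.
            scores[doc_id] += 1.0 / (k + rank)
    # Highest fused score first; ties broken by ID so the order is deterministic.
    fused = sorted(scores, key=lambda d: (-scores[d], d))
    pins = list(dict.fromkeys(pinned))  # de-duplicate, keep caller order
    pin_set = set(pins)
    return pins + [d for d in fused if d not in pin_set]


dense = ["chunk-a", "chunk-b", "chunk-c"]
bm25 = ["chunk-c", "chunk-a", "DKC62-08RIB"]
fused = rrf_fuse([dense, bm25])
fused_pinned = rrf_fuse([dense, bm25], pinned=["DKC62-08RIB"])
```

Without the pin, the exact variety code lands last in `fused` because only one retriever saw it; with the pin it leads `fused_pinned`, which is the behavior the prefilter is meant to guarantee.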
Smoke tests against 475 indexed Bayer varieties:
- list_versions returns 475 varieties, 1 source, 1 vendor, 3 brands,
3 crops with correct per-brand counts (288/102/85).
- Semantic ag queries find the right candidates: short-season
drought-tolerant corn → DKC44-97RIB at RM 94 (in 90-95 band);
SCN+MG3 soybean → Asgrow XF varieties with explicit SCN R3 ratings;
Phytophthora Rps3a soy → AG07XF4 (right gene); stripe-rust
wheat → WestBred WB1376CLP (Yellow Rust 2 = best).
- Variety-code lookups work via prefilter: DKC62-08RIB, AG29XF4,
WB6430 all return as #1 hit. BM25 confirms ranking unambiguously
(top-1 score -13.2 vs -8.5 for #2 on "DKC62-08RIB ratings").
- Server boots cleanly in stdio AND streamable-http modes.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
231 lines
8.5 KiB
Python
"""SQLite FTS5-backed BM25 retrieval over the same chunks Chroma indexes.

Hybrid retrieval (BM25 + dense + Reciprocal Rank Fusion) addresses a
single-tower dense embedding's weakness on rare technical tokens:
for seed-mcp that's variety codes ("DKC62-08RIB"), trait codes
("XF", "VT2PRIB"), disease abbreviations ("SCN", "SDS", "Goss's"),
and Rps gene names ("Rps1c", "Rps3a"). Dense embeddings don't bridge
queries like "Rps3a soybean" cleanly into the relevant chunk; BM25
matches them directly. Fused with the dense ranking via RRF, the
hybrid result is strictly better than either alone for the queries
we expect from the farm-advisor agent.

Why SQLite FTS5:
  - In the stdlib. Zero new deps.
  - On-disk. Same persistence model as Chroma: Docker COPY the dir,
    `rag.index --rebuild` regenerates from corpus.
  - Built-in `bm25()` ranking function. No knobs to tune that matter
    for our use case (k1=1.2, b=0.75 defaults are fine).
  - Builds <1k chunks in milliseconds; adds nothing to rebuild time.

Schema is two tables to keep filtering clean. FTS5 doesn't filter
nicely on its own columns, so the metadata lives in a separate table
that shares rowids with the FTS table and joins on rowid. For seed-mcp
the filterable columns are seed-domain facets (source, vendor, brand,
crop, source_key) rather than the docs-template version/platform.
"""
from __future__ import annotations

import logging
import re
import sqlite3
from pathlib import Path
from typing import Any

log = logging.getLogger(__name__)

# Default location: bm25/<product>_docs.db at the repo root, next to chroma/.
ROOT = Path(__file__).resolve().parent.parent
DEFAULT_DB_DIR = ROOT / "bm25"
DEFAULT_DB_NAME = "crop_seed_docs.db"

# Columns we expose as filterable metadata. Mirrors what
# ``docs_mcp.server._build_where`` accepts so the same filter dict
# works for both Chroma and BM25 without per-retriever translation.
FILTER_COLUMNS = ("source", "vendor", "brand", "crop", "source_key", "ordinal")


# Allowlist tokenizer for free-text queries. FTS5's parser chokes on
# lots of punctuation we routinely see in farmer queries ("Rps1c",
# "SCN-resistant", "0.05 MG", em-dashes). Rather than blocklist every
# operator, keep alphanumerics, underscores, and whitespace, and replace
# everything else with a space.
_KEEP_RE = re.compile(r"[^A-Za-z0-9_\s]")
# FTS5 reserves these Boolean operator KEYWORDS at the token level.
_BOOLEAN_KW_RE = re.compile(r"(?<!\w)(AND|OR|NOT|NEAR)(?!\w)")


def _sanitize_query(text: str) -> str:
    """Reduce a natural-language query to an FTS5 OR-of-tokens query.

    See ``crop-chem-docs`` for the rationale; same transformation
    applies here. OR semantics maximizes recall; BM25 already
    weights documents with more query-term matches higher.
    """
    cleaned = _KEEP_RE.sub(" ", text)
    cleaned = _BOOLEAN_KW_RE.sub(" ", cleaned)
    tokens = cleaned.split()
    if not tokens:
        return ""
    return " OR ".join(tokens)


def _where_to_sql(where: dict | None) -> tuple[str, list[Any]]:
    """Translate a Chroma-shaped filter dict into a SQL fragment + params.

    Accepts the same shapes ``docs_mcp.server._build_where`` produces:

        None                      → ("", [])
        {"crop": "corn"}          → ("AND m.crop = ?", ["corn"])
        {"$and": [{...}, {...}]}  → ("AND m.X = ? AND m.Y = ?", [...])

    Unknown keys are silently dropped (defensive: better to over-match
    than to crash on a filter we don't know).
    """
    if not where:
        return "", []
    parts: list[str] = []
    params: list[Any] = []

    def _emit_eq(cond: dict[str, Any]) -> None:
        for k, v in cond.items():
            if k in FILTER_COLUMNS:
                parts.append(f"m.{k} = ?")
                params.append(v)

    if "$and" in where:
        for sub in where["$and"]:
            _emit_eq(sub)
    else:
        _emit_eq(where)
    if not parts:
        return "", []
    return "AND " + " AND ".join(parts), params


class BM25Index:
    """Thin wrapper around an FTS5-backed sqlite db.

    Single-writer model. Reads are connection-per-call (sqlite handles
    concurrency through file locks; for our read-heavy workload that's
    fine and avoids cross-thread connection sharing issues with the MCP
    server's request handlers).
    """

    def __init__(self, db_path: Path | None = None):
        self.db_path = Path(db_path) if db_path else (DEFAULT_DB_DIR / DEFAULT_DB_NAME)

    # -- build ----------------------------------------------------------

    def build(self, records: list[dict]) -> int:
        """Rebuild the index from scratch from `records`.

        `records` is the same list ``rag.index.variety_records`` produces:
        ``[{"id": ..., "text": ..., "metadata": {...}}, ...]``. Bulk
        insert wrapped in a transaction.
        """
        self.db_path.parent.mkdir(parents=True, exist_ok=True)
        if self.db_path.exists():
            self.db_path.unlink()
        with sqlite3.connect(self.db_path) as con:
            con.executescript(self._schema_sql())
            con.executemany(
                "INSERT INTO chunks_meta "
                "(id, source, vendor, brand, crop, source_key, ordinal) "
                "VALUES (?, ?, ?, ?, ?, ?, ?)",
                [
                    (
                        r["id"],
                        r["metadata"].get("source") or "",
                        r["metadata"].get("vendor") or "",
                        r["metadata"].get("brand") or "",
                        r["metadata"].get("crop") or "",
                        r["metadata"].get("source_key") or "",
                        int(r["metadata"].get("ordinal") or 0),
                    )
                    for r in records
                ],
            )
            # The db file is freshly created, so chunks_meta rowids run 1..N
            # in insertion order; i + 1 keeps each FTS row aligned with its
            # metadata row for the rowid join in query().
            con.executemany(
                "INSERT INTO chunks_fts (rowid, text) VALUES (?, ?)",
                [
                    (i + 1, r["text"])
                    for i, r in enumerate(records)
                ],
            )
            con.commit()
        log.info("bm25: indexed %d chunks → %s", len(records), self.db_path)
        return len(records)

    # -- query ----------------------------------------------------------

    def query(
        self,
        text: str,
        n: int = 200,
        where: dict | None = None,
    ) -> list[tuple[str, float]]:
        """Return up to `n` (chunk_id, bm25_score) pairs, lowest score first.

        FTS5's bm25() returns NEGATIVE numbers: more relevant docs have
        smaller (more negative) scores. We order ASC so the first row is
        the most relevant.
        """
        sanitized = _sanitize_query(text)
        if not sanitized:
            return []
        where_sql, params = _where_to_sql(where)
        sql = (
            "SELECT m.id, bm25(chunks_fts) AS score "
            "FROM chunks_fts "
            "JOIN chunks_meta m ON m.rowid = chunks_fts.rowid "
            f"WHERE chunks_fts MATCH ? {where_sql} "
            "ORDER BY bm25(chunks_fts) "
            "LIMIT ?"
        )
        try:
            with sqlite3.connect(self.db_path) as con:
                cur = con.execute(sql, [sanitized, *params, n])
                return [(row[0], float(row[1])) for row in cur.fetchall()]
        except sqlite3.OperationalError as e:
            log.warning("bm25 query failed (%s); query=%r", e, sanitized[:80])
            return []

    def exists(self) -> bool:
        return self.db_path.exists()

    def count(self) -> int:
        if not self.exists():
            return 0
        try:
            with sqlite3.connect(self.db_path) as con:
                return con.execute("SELECT COUNT(*) FROM chunks_meta").fetchone()[0]
        except sqlite3.OperationalError:
            return 0

    # -- schema ---------------------------------------------------------

    @staticmethod
    def _schema_sql() -> str:
        return """
        CREATE TABLE chunks_meta (
            rowid INTEGER PRIMARY KEY AUTOINCREMENT,
            id TEXT UNIQUE NOT NULL,
            source TEXT,
            vendor TEXT,
            brand TEXT,
            crop TEXT,
            source_key TEXT,
            ordinal INTEGER
        );
        CREATE INDEX idx_meta_source ON chunks_meta(source);
        CREATE INDEX idx_meta_crop ON chunks_meta(crop);
        CREATE INDEX idx_meta_brand ON chunks_meta(brand);
        CREATE INDEX idx_meta_source_key ON chunks_meta(source_key);

        CREATE VIRTUAL TABLE chunks_fts USING fts5(
            text,
            tokenize = 'porter unicode61 remove_diacritics 2'
        );
        """
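
The two-table layout and the ascending bm25() ordering can be tried in isolation with a small in-memory database. The sketch below is only an illustration: it assumes the local SQLite build includes FTS5, trims chunks_meta to two columns, uses a simpler tokenizer string, and the row texts are placeholders rather than corpus content. The search helper is hypothetical and takes an already-sanitized MATCH string.

```python
import sqlite3

con = sqlite3.connect(":memory:")
con.executescript(
    """
    CREATE TABLE chunks_meta (rowid INTEGER PRIMARY KEY, id TEXT UNIQUE NOT NULL, crop TEXT);
    CREATE VIRTUAL TABLE chunks_fts USING fts5(text, tokenize = 'porter unicode61');
    """
)

# Placeholder rows: (chunk id, crop facet, chunk text).
docs = [
    ("corn-1", "corn", "DKC62-08RIB corn hybrid summary text"),
    ("soy-1", "soybean", "AG29XF4 soybean summary mentioning SCN"),
    ("soy-2", "soybean", "soybean summary mentioning the Rps3a gene"),
]
for rowid, (doc_id, crop, text) in enumerate(docs, start=1):
    # Same rowid in both tables, so the join below lines text up with metadata.
    con.execute("INSERT INTO chunks_meta (rowid, id, crop) VALUES (?, ?, ?)", (rowid, doc_id, crop))
    con.execute("INSERT INTO chunks_fts (rowid, text) VALUES (?, ?)", (rowid, text))


def search(match, crop=None):
    """Hypothetical helper: run an FTS5 MATCH with an optional crop filter."""
    sql = (
        "SELECT m.id, bm25(chunks_fts) AS score "
        "FROM chunks_fts "
        "JOIN chunks_meta m ON m.rowid = chunks_fts.rowid "
        "WHERE chunks_fts MATCH ? "
    )
    params = [match]
    if crop is not None:
        sql += "AND m.crop = ? "
        params.append(crop)
    # bm25() is negative; ascending order puts the best match first.
    sql += "ORDER BY bm25(chunks_fts)"
    return con.execute(sql, params).fetchall()


rows = search("Rps3a OR soybean")
code_rows = search("DKC62 OR 08RIB")
filtered = search("DKC62 OR 08RIB", crop="soybean")
```

Here `rows` lists soy-2 ahead of soy-1 because it matches both terms, `code_rows` finds corn-1 through the split variety-code tokens, and `filtered` is empty because the crop facet excludes the only match.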