"""SQLite FTS5-backed BM25 retrieval over the same chunks Chroma indexes.

Hybrid retrieval (BM25 + dense + Reciprocal Rank Fusion) addresses a
single-tower dense embedding's weakness on rare technical tokens — for
seed-mcp that's variety codes ("DKC62-08RIB"), trait codes ("XF",
"VT2PRIB"), disease abbreviations ("SCN", "SDS", "Goss's"), and Rps gene
names ("Rps1c", "Rps3a"). Dense embeddings don't reliably map queries like
"Rps3a soybean" onto the relevant chunk; BM25 matches those tokens
directly. Fused with the dense ranking via RRF, the hybrid result should
beat either retriever alone for the queries we expect from the
farm-advisor agent.

Why SQLite FTS5:

- In the stdlib. Zero new deps.
- On-disk. Same persistence model as Chroma — Docker COPY the dir;
  ``rag.index --rebuild`` regenerates it from the corpus.
- Built-in ``bm25()`` ranking function. No knobs that matter for our use
  case (the k1=1.2, b=0.75 defaults are fine).
- Builds an index of <1k chunks in milliseconds; adds nothing noticeable
  to rebuild time.

The schema is two tables to keep filtering clean. FTS5 doesn't filter
nicely on its own columns, so filterable metadata lives in an ordinary
table (``chunks_meta``) whose rows share rowids with the full-text table
(``chunks_fts``); queries JOIN the two on rowid. For seed-mcp the
filterable columns are seed-domain facets — source, vendor, brand, crop,
source_key — plus trial facets, rather than the docs-template
version/platform.
"""

from __future__ import annotations

import logging
import re
import sqlite3
from pathlib import Path
from typing import Any

log = logging.getLogger(__name__)

# Default location: bm25/crop_seed_docs.db at the repo root, next to chroma/.
ROOT = Path(__file__).resolve().parent.parent
DEFAULT_DB_DIR = ROOT / "bm25"
DEFAULT_DB_NAME = "crop_seed_docs.db"

# Columns we expose as filterable metadata. Mirrors what
# ``docs_mcp.server._build_where`` accepts so the same filter dict
# works for both Chroma and BM25 without per-retriever translation.
# data_type / year / state are trial-specific facets; variety chunks
# leave year and state empty (data_type defaults to "variety" when the
# index is built).
FILTER_COLUMNS = ( "source", "vendor", "brand", "crop", "source_key", "data_type", "year", "state", "ordinal", ) # Allowlist tokenizer for free-text queries. FTS5's parser chokes on # lots of punctuation we routinely see in farmer queries ("Rps1c", # "SCN-resistant", "0.05 MG", em-dashes). Rather than blocklist every # operator, keep alphanumerics + a few separators and replace # everything else with a space. _KEEP_RE = re.compile(r"[^A-Za-z0-9_\s]") # FTS5 reserves these Boolean operator KEYWORDS at the token level. _BOOLEAN_KW_RE = re.compile(r"(? str: """Reduce a natural-language query to an FTS5 OR-of-tokens query. See ``crop-chem-docs`` for the rationale; same transformation applies here. OR semantics maximizes recall — BM25 already weights documents with more query-term matches higher. """ cleaned = _KEEP_RE.sub(" ", text) cleaned = _BOOLEAN_KW_RE.sub(" ", cleaned) tokens = cleaned.split() if not tokens: return "" return " OR ".join(tokens) def _where_to_sql(where: dict | None) -> tuple[str, list[Any]]: """Translate a Chroma-shaped filter dict into a SQL fragment + params. Accepts the same shapes ``docs_mcp.server._build_where`` produces: None → ("", []) {"crop": "corn"} → ("AND m.crop = ?", ["corn"]) {"$and": [{...}, {...}]} → ("AND m.X = ? AND m.Y = ?", [...]) Unknown keys are silently dropped (defensive — better to over-match than to crash on a filter we don't know). """ if not where: return "", [] parts: list[str] = [] params: list[Any] = [] def _emit_eq(cond: dict[str, Any]) -> None: for k, v in cond.items(): if k in FILTER_COLUMNS: parts.append(f"m.{k} = ?") params.append(v) if "$and" in where: for sub in where["$and"]: _emit_eq(sub) else: _emit_eq(where) if not parts: return "", [] return "AND " + " AND ".join(parts), params class BM25Index: """Thin wrapper around an FTS5-backed sqlite db. Single-writer model. 
Reads are connection-per-call (sqlite handles concurrency through file locks; for our read-heavy workload that's fine and avoids cross-thread connection sharing issues with the MCP server's request handlers). """ def __init__(self, db_path: Path | None = None): self.db_path = Path(db_path) if db_path else (DEFAULT_DB_DIR / DEFAULT_DB_NAME) # -- build ---------------------------------------------------------- def build(self, records: list[dict]) -> int: """Rebuild the index from scratch from `records`. `records` is the same list ``rag.index.variety_records`` produces: ``[{"id": ..., "text": ..., "metadata": {...}}, ...]``. Bulk insert wrapped in a transaction. """ self.db_path.parent.mkdir(parents=True, exist_ok=True) if self.db_path.exists(): self.db_path.unlink() with sqlite3.connect(self.db_path) as con: con.executescript(self._schema_sql()) con.executemany( "INSERT INTO chunks_meta " "(id, source, vendor, brand, crop, source_key, " " data_type, year, state, ordinal) " "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", [ ( r["id"], r["metadata"].get("source") or "", r["metadata"].get("vendor") or "", r["metadata"].get("brand") or "", r["metadata"].get("crop") or "", r["metadata"].get("source_key") or "", r["metadata"].get("data_type") or "variety", int(r["metadata"]["year"]) if isinstance(r["metadata"].get("year"), int) else None, r["metadata"].get("state") or "", int(r["metadata"].get("ordinal") or 0), ) for r in records ], ) con.executemany( "INSERT INTO chunks_fts (rowid, text) VALUES (?, ?)", [ (i + 1, r["text"]) for i, r in enumerate(records) ], ) con.commit() log.info("bm25: indexed %d chunks → %s", len(records), self.db_path) return len(records) # -- query ---------------------------------------------------------- def query( self, text: str, n: int = 200, where: dict | None = None, ) -> list[tuple[str, float]]: """Return up to `n` (chunk_id, bm25_score) pairs, lowest score first. 
FTS5's bm25() returns NEGATIVE numbers — more relevant docs have smaller (more negative) scores. We order ASC so the first row is the most relevant. """ sanitized = _sanitize_query(text) if not sanitized: return [] where_sql, params = _where_to_sql(where) sql = ( "SELECT m.id, bm25(chunks_fts) AS score " "FROM chunks_fts " "JOIN chunks_meta m ON m.rowid = chunks_fts.rowid " f"WHERE chunks_fts MATCH ? {where_sql} " "ORDER BY bm25(chunks_fts) " "LIMIT ?" ) try: with sqlite3.connect(self.db_path) as con: cur = con.execute(sql, [sanitized, *params, n]) return [(row[0], float(row[1])) for row in cur.fetchall()] except sqlite3.OperationalError as e: log.warning("bm25 query failed (%s); query=%r", e, sanitized[:80]) return [] def exists(self) -> bool: return self.db_path.exists() def count(self) -> int: if not self.exists(): return 0 try: with sqlite3.connect(self.db_path) as con: return con.execute("SELECT COUNT(*) FROM chunks_meta").fetchone()[0] except sqlite3.OperationalError: return 0 # -- schema --------------------------------------------------------- @staticmethod def _schema_sql() -> str: return """ CREATE TABLE chunks_meta ( rowid INTEGER PRIMARY KEY AUTOINCREMENT, id TEXT UNIQUE NOT NULL, source TEXT, vendor TEXT, brand TEXT, crop TEXT, source_key TEXT, data_type TEXT, year INTEGER, state TEXT, ordinal INTEGER ); CREATE INDEX idx_meta_source ON chunks_meta(source); CREATE INDEX idx_meta_crop ON chunks_meta(crop); CREATE INDEX idx_meta_brand ON chunks_meta(brand); CREATE INDEX idx_meta_source_key ON chunks_meta(source_key); CREATE INDEX idx_meta_data_type ON chunks_meta(data_type); CREATE INDEX idx_meta_year ON chunks_meta(year); CREATE INDEX idx_meta_state ON chunks_meta(state); CREATE VIRTUAL TABLE chunks_fts USING fts5( text, tokenize = 'porter unicode61 remove_diacritics 2' ); """