Phase 2/3: chunker + indexer + MCP server tools (475 Bayer varieties searchable) #2

Merged
justin merged 1 commits from phase-2-3-retrieval into main 2026-05-25 13:14:58 -04:00
4 changed files with 982 additions and 369 deletions
+607 -132
View File
@@ -1,20 +1,20 @@
"""MCP server skeleton — fill in PRODUCT_NAME and the tool bodies. """seed-mcp — MCP server over public US row-crop seed catalogs.
This file is the template's structural anchor. The phases described in Tools (all read-only):
PLAN.md add or extend pieces of this file:
Phase 3 — search_docs, get_page, list_versions stubs (you are here) search_docs natural-language retrieval over the corpus
Phase 6 — reranker integration in search_docs get_page the full markdown body of one variety + sidecar
Phase 8 — BM25 + hybrid retrieval (HYBRID_SEARCH env gate, _rrf_fuse) list_versions facet discovery (crops / brands / vendors / sources)
Phase 9 — diff_versions, list_cluster, bundle_changelog lookup_variety canonical sidecar JSON by source_key — fact-check
Phase 10 — TimedCall wiring (already imported below) anything you surface from search_docs against this
Phase 11 — <product>_api_lessons tool
Phase 12 — find_doc_inconsistencies, submit_doc_bug
Phase 13 — weekly_digest + _digest_history reader
Every stub below has a docstring + `raise NotImplementedError`. Replace The contract with the calling agent is **never fabricate**. Every chunk
the body when you reach the corresponding phase. Keep the signatures we return is verbatim from the source's published catalog (the chunker
stable across products — clients depend on them. rebuilds chunks deterministically from the sidecar JSON). Every
response carries the variety's source URL so the agent can cite. The
``lookup_variety`` tool exists specifically so the agent can validate
specific rating values without having to trust paraphrased retrieval
text.
""" """
from __future__ import annotations from __future__ import annotations
@@ -23,7 +23,7 @@ import logging
import os import os
import re import re
from pathlib import Path from pathlib import Path
from typing import Annotated from typing import Annotated, Any
from mcp.server.fastmcp import FastMCP from mcp.server.fastmcp import FastMCP
from pydantic import Field from pydantic import Field
@@ -33,7 +33,7 @@ from .usage import TimedCall
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
# Product-specific configuration. Set these for each new build. # Product-specific configuration.
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
PRODUCT_NAME = os.environ.get("PRODUCT_NAME", "crop_seed") PRODUCT_NAME = os.environ.get("PRODUCT_NAME", "crop_seed")
PRODUCT_DOCS_URL = os.environ.get("PRODUCT_DOCS_URL", "https://git.jpaul.io/justin/seed-mcp") PRODUCT_DOCS_URL = os.environ.get("PRODUCT_DOCS_URL", "https://git.jpaul.io/justin/seed-mcp")
@@ -44,59 +44,176 @@ ROOT = Path(__file__).resolve().parent.parent
CORPUS = ROOT / "corpus" CORPUS = ROOT / "corpus"
CHROMA_DIR = ROOT / "chroma" CHROMA_DIR = ROOT / "chroma"
BM25_DB = Path(os.environ.get("BM25_DB", str(ROOT / "bm25" / f"{PRODUCT_NAME}_docs.db"))) BM25_DB = Path(os.environ.get("BM25_DB", str(ROOT / "bm25" / f"{PRODUCT_NAME}_docs.db")))
BUNDLES_JSON = ROOT / "bundles.json"
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
# Feature flags (Phase 6 / 8 / 12 enable these as you ship each phase). # Feature flags.
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
RERANK_URL = os.environ.get("RERANK_URL", "").rstrip("/") or None RERANK_URL = os.environ.get("RERANK_URL", "").rstrip("/") or None
RERANK_POOL = int(os.environ.get("RERANK_POOL", "50")) RERANK_POOL = int(os.environ.get("RERANK_POOL", "50"))
RERANK_TIMEOUT = float(os.environ.get("RERANK_TIMEOUT", "30")) RERANK_TIMEOUT = float(os.environ.get("RERANK_TIMEOUT", "30"))
HYBRID_SEARCH = os.environ.get("HYBRID_SEARCH", "").lower() in ("true", "1", "yes", "on") HYBRID_SEARCH = os.environ.get("HYBRID_SEARCH", "true").lower() in ("true", "1", "yes", "on")
RRF_K = int(os.environ.get("RRF_K", "60")) RRF_K = int(os.environ.get("RRF_K", "60"))
DOC_BUG_SUBMIT_ENABLED = os.environ.get("DOC_BUG_SUBMIT_ENABLED", "").lower() in ("true", "1", "yes", "on")
DOC_BUG_API_URL = os.environ.get("DOC_BUG_API_URL", "") # product-specific endpoint
DOC_BUG_TIMEOUT = float(os.environ.get("DOC_BUG_TIMEOUT", "15"))
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
# FastMCP setup. # FastMCP setup.
#
# stateless_http=True — every request creates an ephemeral session and
# discards it on return. Critical for production: clients don't get
# 404 storms when the container is recreated by Watchtower.
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
mcp = FastMCP(f"{PRODUCT_NAME}-docs", stateless_http=True) mcp = FastMCP(f"{PRODUCT_NAME}-docs", stateless_http=True)
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
# Lazy helpers — instantiate expensive things only when actually needed, # Lazy singletons. Instantiate on first use so the server can start even
# so the server still starts when (e.g.) Ollama is briefly unreachable. # when (e.g.) Ollama is briefly unreachable — degraded modes fall back
# gracefully rather than refusing to boot.
# ---------------------------------------------------------------------------
_chroma_client = None
_chroma_collection = None
_bm25 = None
_code_index: dict[str, list[tuple[str, str]]] | None = None
def _build_code_index() -> dict[str, list[tuple[str, str]]]:
"""Walk sidecars once and index varieties by every lookup key a
farmer or LLM might paste: ``source_key``, ``hybrid_prefix``,
``product_name`` (normalized), each token from
``hybrid_prefix``/``product_name`` longer than three chars.
Returns a dict mapping the normalized key → list of
``(source, source_key)`` tuples. Multiple-source matches are kept
so we can warn the agent about ambiguity.
"""
idx: dict[str, list[tuple[str, str]]] = {}
def _add(key: str, value: tuple[str, str]) -> None:
key = (key or "").lower().strip()
if not key or len(key) < 4:
return
idx.setdefault(key, [])
if value not in idx[key]:
idx[key].append(value)
if not CORPUS.exists():
return idx
for source_dir in CORPUS.iterdir():
if not source_dir.is_dir() or source_dir.name.startswith("."):
continue
for sc in source_dir.glob("*.json"):
try:
d = json.loads(sc.read_text(encoding="utf-8"))
except (OSError, json.JSONDecodeError):
continue
source = d.get("source") or source_dir.name
sk = d.get("source_key") or sc.stem
value = (source, sk)
_add(sk, value)
_add(d.get("hybrid_prefix") or "", value)
_add(d.get("product_name") or "", value)
# Token-split product_name and hybrid_prefix so "DKC62-08RIB"
# in a query matches "DKC62-08RIB BRAND BLEND" via the
# "DKC62-08RIB" token as well as the full string.
for source_field in (d.get("product_name"), d.get("hybrid_prefix")):
if not source_field:
continue
for tok in re.split(r"[\s()/,]+", source_field):
tok = tok.strip()
# Only retain tokens that LOOK like codes — at least
# one digit, mostly alphanumeric/dash. Skips "BRAND"
# / "BLEND" / etc.
if re.match(r"^[A-Za-z]*\d[\w\-]*$", tok):
_add(tok, value)
return idx
def _code_lookup() -> dict[str, list[tuple[str, str]]]:
global _code_index
if _code_index is None:
_code_index = _build_code_index()
log.info("code-index: %d lookup keys built", len(_code_index))
return _code_index
# Tokens that LOOK like a variety code — at least one digit, otherwise
# alphanumeric / dash. Catches "DKC62-08RIB", "AG29XF4", "WB1376CLP",
# "VT2PRIB"; skips ordinary words like "ratings".
_CODE_TOKEN_RE = re.compile(r"\b([A-Za-z][A-Za-z0-9\-]{2,})\b")
def _exact_code_matches(query: str) -> list[tuple[str, str]]:
"""Find varieties whose source_key / product_name / token-split
components contain a query token. Used as a high-confidence pin
before fuzzy retrieval."""
idx = _code_lookup()
if not idx:
return []
out: list[tuple[str, str]] = []
seen: set[tuple[str, str]] = set()
for m in _CODE_TOKEN_RE.finditer(query or ""):
tok = m.group(1).lower()
if len(tok) < 4 or not any(c.isdigit() for c in tok):
continue
for v in idx.get(tok, []):
if v not in seen:
seen.add(v)
out.append(v)
return out
def _collection():
"""Return the Chroma collection, opening it lazily."""
global _chroma_client, _chroma_collection
if _chroma_collection is not None:
return _chroma_collection
import chromadb
from chromadb.config import Settings
from rag.embeddings import embedding_function
_chroma_client = chromadb.PersistentClient(
path=str(CHROMA_DIR),
settings=Settings(anonymized_telemetry=False),
)
_chroma_collection = _chroma_client.get_collection(
COLLECTION, embedding_function=embedding_function()
)
return _chroma_collection
def _bm25_index():
"""Return the BM25 index, or None if it doesn't exist on disk."""
global _bm25
if _bm25 is not None:
return _bm25
from rag.bm25 import BM25Index
idx = BM25Index(BM25_DB)
if not idx.exists():
return None
_bm25 = idx
return _bm25
# ---------------------------------------------------------------------------
# Helpers — local file IO + filter builder.
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
def _bundles() -> dict[str, dict]: def _build_where(
"""Cached load of bundles.json into a {slug: bundle_dict} mapping. crop: str | None,
brand: str | None,
bundles.json is the product-specific catalog written by the Phase 1 vendor: str | None,
scraper. See PLAN.md Phase 1 for the schema. source: str | None,
""" source_key: str | None,
if not BUNDLES_JSON.exists(): ) -> dict | None:
return {}
cat = json.loads(BUNDLES_JSON.read_text())
return {b["slug"]: b for b in cat}
def _build_where(version: str | None, platform: str | None, bundle_id: str | None) -> dict | None:
"""Translate filter args into a Chroma `where` clause.""" """Translate filter args into a Chroma `where` clause."""
conds: list[dict] = [] conds: list[dict] = []
if version: if crop:
conds.append({"version": version}) conds.append({"crop": crop.lower()})
if platform: if brand:
conds.append({"platform": platform}) conds.append({"brand": brand.upper()})
if bundle_id: if vendor:
conds.append({"bundle_id": bundle_id}) conds.append({"vendor": vendor})
if source:
conds.append({"source": source})
if source_key:
conds.append({"source_key": source_key})
if not conds: if not conds:
return None return None
if len(conds) == 1: if len(conds) == 1:
@@ -104,13 +221,152 @@ def _build_where(version: str | None, platform: str | None, bundle_id: str | Non
return {"$and": conds} return {"$and": conds}
def _read_page(bundle_id: str, page_id: str) -> tuple[str, dict] | None: def _read_sidecar(source: str, source_key: str) -> dict | None:
"""Read a corpus page off disk. Returns (markdown_body, metadata_dict).""" """Read a variety's sidecar JSON off disk."""
md_path = CORPUS / bundle_id / (page_id + ".md") path = CORPUS / source / f"{source_key}.json"
json_path = CORPUS / bundle_id / (page_id + ".json") if not path.exists():
if not md_path.exists() or not json_path.exists():
return None return None
return md_path.read_text(), json.loads(json_path.read_text()) try:
return json.loads(path.read_text(encoding="utf-8"))
except (OSError, json.JSONDecodeError) as exc:
log.warning("sidecar read failed for %s/%s: %s", source, source_key, exc)
return None
def _read_markdown(source: str, source_key: str) -> str | None:
path = CORPUS / source / f"{source_key}.md"
if not path.exists():
return None
try:
return path.read_text(encoding="utf-8")
except OSError:
return None
def _format_hit(doc: str, meta: dict, distance: float | None = None) -> str:
"""Render one retrieval hit as a fenced markdown block with full
provenance attached. ``doc`` is the chunk text; ``meta`` is the
chunk's metadata dict."""
src_url = meta.get("source_url") or ""
src_key = meta.get("source_key") or ""
src = meta.get("source") or ""
vendor = meta.get("vendor") or ""
brand = meta.get("brand") or ""
crop = meta.get("crop") or ""
name = meta.get("product_name") or src_key
header = (
f"### {name} \n"
f"`{src}::{src_key}` — {vendor} / {brand} / {crop} \n"
f"<{src_url}>"
)
if distance is not None:
header += f" \n_(distance={distance:.4f})_"
return f"{header}\n\n{doc.strip()}\n"
def _rrf_fuse(rankings: list[list[str]], k: int = RRF_K) -> list[str]:
"""Reciprocal Rank Fusion — merge multiple ranked id lists into one.
Score(id) = sum over each ranking R of 1 / (k + rank_R(id)).
Robust to score-scale differences between dense (cosine) and lexical
(BM25). k=60 is the literature default; not particularly sensitive.
"""
scores: dict[str, float] = {}
for ranking in rankings:
for rank, doc_id in enumerate(ranking):
scores[doc_id] = scores.get(doc_id, 0.0) + 1.0 / (k + rank + 1)
return sorted(scores, key=lambda d: scores[d], reverse=True)
def _structured_ratings_block(sidecar: dict) -> str:
"""Render the sidecar's grouped characteristics + identity as a
fact-checkable block, with the source URL pinned at top.
This is the response body for ``get_page`` and the embedded
"canonical data" block agents should trust over any free-text
paraphrase."""
lines: list[str] = []
name = sidecar.get("product_name") or sidecar.get("source_key", "")
lines.append(f"# {name}")
lines.append("")
facets: list[str] = []
if sidecar.get("vendor"):
facets.append(f"**Vendor:** {sidecar['vendor']}")
if sidecar.get("brand"):
facets.append(f"**Brand:** {str(sidecar['brand']).title()}")
if sidecar.get("crop"):
facets.append(f"**Crop:** {str(sidecar['crop']).capitalize()}")
if sidecar.get("relative_maturity") and sidecar.get("crop") == "corn":
facets.append(f"**Relative maturity:** {sidecar['relative_maturity']}")
if sidecar.get("maturity_group") and sidecar.get("crop") == "soybeans":
facets.append(f"**Maturity group:** {sidecar['maturity_group']}")
if sidecar.get("relative_maturity") and sidecar.get("crop") == "wheat":
facets.append(f"**Maturity:** {sidecar['relative_maturity']}")
if sidecar.get("wheat_class"):
facets.append(f"**Wheat class:** {sidecar['wheat_class']}")
if sidecar.get("release_year"):
facets.append(f"**Released:** {sidecar['release_year']}")
if sidecar.get("trait_stack"):
facets.append(f"**Trait stack:** {', '.join(sidecar['trait_stack'])}")
if facets:
lines.append(" · ".join(facets))
lines.append("")
urls = sidecar.get("source_urls") or []
if urls:
lines.append(f"**Source:** {urls[0]}")
lines.append("")
scale = sidecar.get("_scale_direction") or "scale direction not declared by source"
lines.append(f"**Rating scale (as published):** {scale}")
lines.append("")
# Canonical ratings — one table per group, values verbatim.
for g in sidecar.get("characteristics_groups") or []:
label = g.get("label") or "Characteristics"
items = g.get("items") or []
if not items:
continue
lines.append(f"## {label.title()}")
lines.append("")
lines.append("| Characteristic | Value |")
lines.append("|---|---|")
for it in items:
ch = (it.get("characteristic") or "").strip()
v = (it.get("value") or "").strip()
lines.append(f"| {ch} | {v} |")
lines.append("")
if sidecar.get("positioning_statement"):
lines.append("## Vendor positioning")
lines.append("")
lines.append(sidecar["positioning_statement"].strip())
lines.append("")
if sidecar.get("strengths"):
lines.append("## Strengths / management notes (vendor copy)")
lines.append("")
for s in sidecar["strengths"]:
lines.append(f"- {s}")
lines.append("")
rec = sidecar.get("regional_recommendations") or []
if rec:
names = sorted({
(r.get("product_list_name") or "").strip()
for r in rec
if (r.get("product_list_name") or "").strip()
})
if names:
lines.append("## Listed in vendor regional seed guides")
lines.append("")
for n in names:
lines.append(f"- {n}")
lines.append("")
return "\n".join(lines).rstrip() + "\n"
# =========================================================================== # ===========================================================================
@@ -119,119 +375,340 @@ def _read_page(bundle_id: str, page_id: str) -> tuple[str, dict] | None:
@mcp.tool() @mcp.tool()
def search_docs( def search_docs(
query: Annotated[str, Field(description=f"Natural-language query about {PRODUCT_NAME}.")], query: Annotated[str, Field(description=(
version: Annotated[ "Natural-language query about row-crop seed varieties. "
"Mention crop (corn/soybeans/wheat), maturity (RM days / "
"MG number / wheat class), traits, disease tolerances, "
"or soil/region constraints — they all carry retrieval signal."
))],
crop: Annotated[
str | None, str | None,
Field(description="OPTIONAL version filter — restrict to one product version."), Field(description="OPTIONAL filter: corn, soybeans, or wheat."),
] = None, ] = None,
platform: Annotated[ brand: Annotated[
str | None, str | None,
Field(description="OPTIONAL platform filter. Set to one of the platforms listed by list_versions(); omit for all platforms."), Field(description=(
"OPTIONAL brand filter — DEKALB, ASGROW, WESTBRED, "
"GoldenHarvest, NK, AgriPro, Becks. Case-insensitive."
)),
] = None, ] = None,
bundle_id: Annotated[ vendor: Annotated[
str | None, str | None,
Field(description="OPTIONAL bundle filter — pin to a specific doc bundle slug."), Field(description="OPTIONAL vendor filter — Bayer, Syngenta, Beck's."),
] = None,
source: Annotated[
str | None,
Field(description=(
"OPTIONAL source filter — e.g. 'bayer_seeds'. Use "
"list_versions() to see what's indexed."
)),
] = None, ] = None,
k: Annotated[int, Field(description="Number of results to return.", ge=1, le=50)] = 10, k: Annotated[int, Field(description="Number of results to return.", ge=1, le=50)] = 10,
) -> str: ) -> str:
"""Search the {product} docs corpus. """Search the seed-variety corpus for hybrids/varieties matching a query.
Returns the top-k most relevant chunks (with full source page URLs) Returns the top-k variety chunks with their full source URLs, ratings,
given a natural-language query. Optional filters narrow the search maturity, traits, and regional listings. Optional filters narrow to one
to one version, one platform, or one bundle. Use list_versions() crop, brand, vendor, or scraper source. Use **list_versions()** first
first if you need to discover the available facet values. to discover valid facet values. Use **lookup_variety()** on any
candidate the user is serious about — that returns the canonical
sidecar so you can verify exact rating values without trusting the
free-text chunk.
Call this tool whenever the user asks anything that should be Call this tool whenever an ag professional or farmer asks anything
answerable from the official product documentation. about choosing a seed variety, comparing hybrids, or finding seed
matched to a maturity / soil / region / disease constraint.
NEVER answer seed-selection questions from prior knowledge alone —
seed catalogs change yearly and brand-specific. Always search first.
""" """
with TimedCall("search_docs", { with TimedCall("search_docs", {
"query": query, "version": version, "platform": platform, "query": query, "crop": crop, "brand": brand,
"bundle_id": bundle_id, "k": k, "vendor": vendor, "source": source, "k": k,
}) as _call: }) as _call:
# TODO Phase 2-3: query Chroma collection (see rag/index.py for where = _build_where(crop, brand, vendor, source, None)
# how it was built). Render the top-k chunks as markdown with pool_size = max(k * 3, RERANK_POOL)
# source URLs.
# TODO Phase 6: optional reranker via _rerank() if RERANK_URL set. # Exact-code pre-filter. Variety codes ("DKC62-08RIB", "AG29XF4")
# TODO Phase 8: hybrid retrieval if HYBRID_SEARCH=true — run # have NO semantic neighbors — dense retrieval misses them and
# dense + BM25 in parallel, RRF-fuse, hand merged pool to rerank. # RRF can let off-topic noise float to top-1. If the query
_call.set(hits_returned=0) # contains a token that exactly matches a variety's identifier,
raise NotImplementedError("Phase 2/3: implement Chroma query + rendering") # pin those varieties to the top of results.
pinned_ids: list[str] = []
for src, sk in _exact_code_matches(query):
pinned_ids.append(f"{src}::{sk}::0")
# Dense retrieval via Chroma.
try:
col = _collection()
except Exception as exc: # noqa: BLE001
_call.set(error_dense=str(exc), hits_returned=0)
return (
"_(retrieval unavailable — Chroma collection not found. "
"Has the indexer run? `python -m rag.index --rebuild`.)_"
)
try:
dense = col.query(
query_texts=[query],
n_results=pool_size,
where=where,
)
except Exception as exc: # noqa: BLE001
_call.set(error_dense=str(exc), hits_returned=0)
return f"_(retrieval failed: {exc})_"
dense_ids: list[str] = (dense.get("ids") or [[]])[0]
dense_docs: list[str] = (dense.get("documents") or [[]])[0]
dense_metas: list[dict] = (dense.get("metadatas") or [[]])[0]
dense_dists: list[float] = (dense.get("distances") or [[]])[0]
id_to_doc = dict(zip(dense_ids, dense_docs))
id_to_meta = dict(zip(dense_ids, dense_metas))
id_to_dist = dict(zip(dense_ids, dense_dists))
# Hybrid: optionally fuse with BM25. Both are pulled at pool_size,
# fused via RRF, top-k returned.
used_hybrid = False
if HYBRID_SEARCH:
bm25 = _bm25_index()
if bm25 is not None:
bm25_hits = bm25.query(query, n=pool_size, where=where)
bm25_ids = [h[0] for h in bm25_hits]
if bm25_ids:
fused = _rrf_fuse([dense_ids, bm25_ids])
fuzzy_ids = fused
used_hybrid = True
else:
fuzzy_ids = dense_ids
else:
fuzzy_ids = dense_ids
else:
fuzzy_ids = dense_ids
# Pin exact-code matches at top, then fill remainder from fuzzy
# retrieval (deduped). Pinned matches are deterministic and
# high-confidence; they should never lose to a fuzzy match.
final_ids: list[str] = []
seen: set[str] = set()
for cid in pinned_ids + fuzzy_ids:
if cid in seen:
continue
seen.add(cid)
final_ids.append(cid)
if len(final_ids) >= k:
break
# For ids returned by BM25 but not by Chroma, we need their docs/
# metadata too. Re-fetch by id.
missing = [i for i in final_ids if i not in id_to_doc]
if missing:
try:
extra = col.get(ids=missing, include=["documents", "metadatas"])
for cid, doc, meta in zip(
extra.get("ids") or [],
extra.get("documents") or [],
extra.get("metadatas") or [],
):
id_to_doc[cid] = doc
id_to_meta[cid] = meta
except Exception as exc: # noqa: BLE001
log.warning("get-by-id for BM25-only hits failed: %s", exc)
_call.set(
hits_returned=len(final_ids),
hybrid=used_hybrid,
pool_size=pool_size,
)
if not final_ids:
return (
"_(no varieties matched. Try broadening the query or "
"removing filters — call list_versions() to see what's "
"indexed.)_"
)
blocks: list[str] = []
for cid in final_ids:
doc = id_to_doc.get(cid, "")
meta = id_to_meta.get(cid, {})
dist = id_to_dist.get(cid) if not used_hybrid else None
blocks.append(_format_hit(doc, meta, dist))
header = (
f"# Search results — {len(final_ids)} variety chunk"
f"{'s' if len(final_ids) != 1 else ''}"
f"{' (dense + BM25 hybrid)' if used_hybrid else ' (dense only)'}\n"
f"_Use `lookup_variety(source_key=...)` on any candidate "
f"to fact-check ratings from the canonical sidecar._\n\n---\n\n"
)
return header + "\n---\n\n".join(blocks)
@mcp.tool() @mcp.tool()
def get_page( def get_page(
bundle_id: Annotated[str, Field(description="Bundle slug.")], source: Annotated[str, Field(description=(
page_id: Annotated[str, Field(description="Page filename within the bundle.")], "Scraper source id — e.g. 'bayer_seeds'. Same as the `source` "
"field in search_docs results."
))],
source_key: Annotated[str, Field(description=(
"Per-variety stable key — e.g. 'dekalb-dkc62-08rib'. Same as "
"the `source_key` field in search_docs results."
))],
) -> str: ) -> str:
"""Return the full markdown for one page, plus a metadata header. """Return the full canonical record for one variety.
Use after search_docs surfaces a relevant page and the user (or you) Emits a structured ratings header (identity, all characteristic
want the complete text — not just the matched chunks. groups, vendor positioning, regional listings) sourced from the
variety's sidecar JSON, followed by the raw markdown body the
chunker indexed.
Use this when the user is comparing varieties closely or wants to
see every published rating value, not just the ones that matched a
query. The structured header is the canonical fact-check surface;
use it to validate any specific rating value before quoting it.
""" """
with TimedCall("get_page", {"bundle_id": bundle_id, "page_id": page_id}) as _call: with TimedCall("get_page", {"source": source, "source_key": source_key}) as _call:
data = _read_page(bundle_id, page_id) sidecar = _read_sidecar(source, source_key)
if data is None: if sidecar is None:
_call.set(found=False) _call.set(found=False)
return f"Page not found: {bundle_id}/{page_id}" return (
md, meta = data f"_(variety not found: source='{source}' "
_call.set(found=True, page_chars=len(md)) f"source_key='{source_key}'. Use list_versions() to see "
# TODO: add a metadata header (title, version, source URL) above f"available sources or search_docs() to find candidates.)_"
# the body. Product-specific shape. )
return md body = _read_markdown(source, source_key) or ""
structured = _structured_ratings_block(sidecar)
_call.set(found=True, body_chars=len(body), groups=len(sidecar.get("characteristics_groups") or []))
sep = "\n\n---\n\n## Indexed body (chunk source text)\n\n"
return structured + sep + body
@mcp.tool() @mcp.tool()
def list_versions() -> str: def list_versions() -> str:
"""List the available version/platform facets across all bundles. """List the available scraper sources and the crop/brand/vendor
facets across all indexed varieties.
Use this to discover valid filter values for search_docs. Use this BEFORE search_docs() the first time you query, to discover
which scraper sources, brands, vendors, and crops are actually in
the corpus right now. The agent should not assume — the corpus
grows as more vendors are scraped.
""" """
with TimedCall("list_versions", {}) as _call: with TimedCall("list_versions", {}) as _call:
cat = _bundles() facets: dict[str, dict[str, int]] = {
if not cat: "source": {}, "vendor": {}, "brand": {}, "crop": {},
return "_(no bundles indexed yet — run the scraper + indexer)_" }
versions = sorted({b.get("version") for b in cat.values() if b.get("version")}) n_varieties = 0
platforms = sorted({b.get("platform") for b in cat.values() if b.get("platform")}) if CORPUS.exists():
_call.set(versions=len(versions), platforms=len(platforms)) for source_dir in sorted(CORPUS.iterdir()):
lines = [f"# Facets across {len(cat)} bundle(s)", ""] if not source_dir.is_dir() or source_dir.name.startswith("."):
if versions: continue
lines.append("## Versions"); lines.append("") for sc in source_dir.glob("*.json"):
for v in versions: lines.append(f"- `{v}`") try:
d = json.loads(sc.read_text(encoding="utf-8"))
except (OSError, json.JSONDecodeError):
continue
n_varieties += 1
for k in facets:
v = d.get(k)
if v:
facets[k][v] = facets[k].get(v, 0) + 1
if n_varieties == 0:
return (
"_(no varieties indexed yet — run scrapers + indexer "
"before calling search_docs)_"
)
_call.set(
varieties=n_varieties,
sources=len(facets["source"]),
vendors=len(facets["vendor"]),
brands=len(facets["brand"]),
crops=len(facets["crop"]),
)
lines = [f"# Corpus facets ({n_varieties} varieties indexed)", ""]
for label, counts in facets.items():
if not counts:
continue
lines.append(f"## {label}")
lines.append("") lines.append("")
if platforms: for k, n in sorted(counts.items(), key=lambda kv: -kv[1]):
lines.append("## Platforms"); lines.append("") lines.append(f"- `{k}` — {n} varieties")
for p in platforms: lines.append(f"- `{p}`") lines.append("")
return "\n".join(lines) return "\n".join(lines).rstrip()
# --------------------------------------------------------------------------- @mcp.tool()
# Stubs for later phases — keep the signatures in this file so refactors def lookup_variety(
# don't lose the contracts. Implementations come per phase. source_key: Annotated[str, Field(description=(
# --------------------------------------------------------------------------- "Per-variety stable key — e.g. 'dekalb-dkc62-08rib'."
))],
source: Annotated[
str | None,
Field(description=(
"OPTIONAL scraper source ('bayer_seeds' etc). If omitted, "
"scans all sources for the source_key."
)),
] = None,
) -> str:
"""Return the canonical sidecar JSON for one variety, verbatim.
# @mcp.tool() # Phase 9 USE THIS to fact-check any rating value before quoting it to the
# def list_cluster(bundle_id: str, page_id: str) -> str: ... farmer. The output is the exact data the scraper captured from the
vendor's published catalog — no paraphrasing, no inference.
# @mcp.tool() # Phase 9 Call this whenever the user (or you) needs an exact value for a
# def diff_versions(bundle_id: str, page_id: str, against_bundle_id: str, context: int = 3) -> str: ... specific rating, trait, or maturity — not just a semantic match
from search_docs.
"""
with TimedCall("lookup_variety", {"source_key": source_key, "source": source}) as _call:
if source:
sidecar = _read_sidecar(source, source_key)
if sidecar is not None:
_call.set(found=True, source=source)
return "```json\n" + json.dumps(sidecar, indent=2, ensure_ascii=False) + "\n```"
_call.set(found=False, source=source)
return f"_(variety '{source_key}' not found under source '{source}')_"
# @mcp.tool() # Phase 9 # No source given — scan all source dirs for a match.
# def bundle_changelog(bundle_id_new: str, bundle_id_old: str, min_churn: int = 5, max_changed: int = 50) -> str: ... if not CORPUS.exists():
_call.set(found=False)
return "_(corpus is empty — no scrapers have run yet)_"
matches: list[tuple[str, dict]] = []
for source_dir in sorted(CORPUS.iterdir()):
if not source_dir.is_dir() or source_dir.name.startswith("."):
continue
sidecar = _read_sidecar(source_dir.name, source_key)
if sidecar is not None:
matches.append((source_dir.name, sidecar))
# @mcp.tool() # Phase 13 if not matches:
# def weekly_digest(days: int = 7, version: str | None = None, platform: str | None = None, ...) -> str: ... _call.set(found=False)
return (
f"_(no variety with source_key '{source_key}' found in any "
f"source. Use search_docs() to find the right key.)_"
)
# @mcp.tool() # Phase 9 (or 3 — useful early) _call.set(found=True, matches=len(matches))
# def corpus_status() -> str: ... if len(matches) == 1:
src_name, sidecar = matches[0]
return (
f"_(matched in source `{src_name}`)_\n\n"
"```json\n"
+ json.dumps(sidecar, indent=2, ensure_ascii=False)
+ "\n```"
)
# @mcp.tool() # Phase 11 # Multi-match: serialize each labeled.
# def myproduct_api_lessons(topic: str | None = None) -> str: ... out: list[str] = [f"_(matched {len(matches)} sources)_\n"]
for src_name, sidecar in matches:
# @mcp.tool() # Phase 12 out.append(f"### source: `{src_name}`")
# def find_doc_inconsistencies(scope_query: str, ...) -> str: ... out.append("```json")
out.append(json.dumps(sidecar, indent=2, ensure_ascii=False))
# @mcp.tool() # Phase 12 out.append("```")
# def submit_doc_bug(page_url: str, content: str, email: str | None = None, ...) -> str: ... return "\n".join(out)
# =========================================================================== # ===========================================================================
@@ -252,8 +729,6 @@ def main() -> None:
else: else:
mcp.settings.host = args.host mcp.settings.host = args.host
mcp.settings.port = args.port mcp.settings.port = args.port
# DNS-rebinding protection defaults to localhost-only — disable for
# container-network DNS hostnames. See PLAN.md "Hosting" notes.
if os.environ.get("MCP_DISABLE_DNS_REBINDING_PROTECTION") in {"1", "true", "yes"}: if os.environ.get("MCP_DISABLE_DNS_REBINDING_PROTECTION") in {"1", "true", "yes"}:
mcp.settings.transport_security.enable_dns_rebinding_protection = False mcp.settings.transport_security.enable_dns_rebinding_protection = False
mcp.run(transport=args.transport) mcp.run(transport=args.transport)
+52 -99
View File
@@ -1,17 +1,14 @@
"""SQLite FTS5-backed BM25 retrieval over the same chunks Chroma indexes. """SQLite FTS5-backed BM25 retrieval over the same chunks Chroma indexes.
Hybrid retrieval (BM25 + dense + Reciprocal Rank Fusion) addresses a Hybrid retrieval (BM25 + dense + Reciprocal Rank Fusion) addresses a
limit of single-tower dense embeddings: when a query has specific single-tower dense embedding's weakness on rare technical tokens —
technical terms (filenames, language names, error codes, API paths), for seed-mcp that's variety codes ("DKC62-08RIB"), trait codes
the dense embedding doesn't bridge from the query into a short ("XF", "VT2PRIB"), disease abbreviations ("SCN", "SDS", "Goss's"),
code-focused chunk. The chunk loses to the much larger crowd of and Rps gene names ("Rps1c", "Rps3a"). Dense embeddings don't bridge
prose chunks that semantically match the query topic. queries like "Rps3a soybean" cleanly into the relevant chunk; BM25
matches them directly. Fused with the dense ranking via RRF, the
BM25 handles this directly. Lexical overlap on rare terms ("python", hybrid result is strictly better than either alone for the queries
"create_vpg.py", "PROTECTED_SITE_ID", "applyUpgrade") scores those we expect from the farm-advisor agent.
chunks high. Fused with the dense ranking via RRF, the hybrid result
is strictly better than either alone for the queries we've seen
fail.
Why SQLite FTS5: Why SQLite FTS5:
- In the stdlib. Zero new deps. - In the stdlib. Zero new deps.
@@ -19,36 +16,13 @@ Why SQLite FTS5:
`rag.index --rebuild` regenerates from corpus. `rag.index --rebuild` regenerates from corpus.
- Built-in `bm25()` ranking function. No knobs to tune that matter - Built-in `bm25()` ranking function. No knobs to tune that matter
for our use case (k1=1.2, b=0.75 defaults are fine). for our use case (k1=1.2, b=0.75 defaults are fine).
- Builds 70k+ chunks in seconds. Faster than the Chroma rebuild's - Builds <1k chunks in milliseconds; adds nothing to rebuild time.
embedding step by 100×, so it adds basically nothing to the
full-rebuild cycle.
Schema is two tables to keep filtering clean. FTS5 doesn't filter Schema is two tables to keep filtering clean. FTS5 doesn't filter
nicely on its own columns; the content_rowid pattern keeps an nicely on its own columns; the content_rowid pattern keeps an
external metadata table joinable by rowid: external metadata table joinable by rowid. For seed-mcp the
filterable columns are seed-domain facets — source, vendor, brand,
CREATE TABLE chunks_meta ( crop, source_key — rather than the docs-template version/platform.
rowid INTEGER PRIMARY KEY AUTOINCREMENT,
id TEXT UNIQUE,
bundle_id TEXT, page_id TEXT, version TEXT,
platform TEXT, product TEXT, ordinal INTEGER
);
CREATE VIRTUAL TABLE chunks_fts USING fts5(
text,
tokenize = 'porter unicode61 remove_diacritics 2',
content = 'chunks_meta',
content_rowid = 'rowid'
);
Queries:
SELECT m.id, bm25(chunks_fts) AS score
FROM chunks_meta m
JOIN chunks_fts f ON m.rowid = f.rowid
WHERE f MATCH ?
AND m.version = ? -- optional metadata filter
ORDER BY bm25(chunks_fts) -- lower = better in FTS5
LIMIT ?;
""" """
from __future__ import annotations from __future__ import annotations
@@ -63,42 +37,30 @@ log = logging.getLogger(__name__)
# Default location: bm25/<product>_docs.db at the repo root, next to chroma/. # Default location: bm25/<product>_docs.db at the repo root, next to chroma/.
ROOT = Path(__file__).resolve().parent.parent ROOT = Path(__file__).resolve().parent.parent
DEFAULT_DB_DIR = ROOT / "bm25" DEFAULT_DB_DIR = ROOT / "bm25"
DEFAULT_DB_NAME = "<product>_docs.db" DEFAULT_DB_NAME = "crop_seed_docs.db"
# Columns we expose as filterable metadata. Mirrors what _build_where in # Columns we expose as filterable metadata. Mirrors what
# docs_mcp/server.py accepts so the same filter dicts work for both # ``docs_mcp.server._build_where`` accepts so the same filter dict
# Chroma and BM25 without per-retriever translation in the caller. # works for both Chroma and BM25 without per-retriever translation.
FILTER_COLUMNS = ("bundle_id", "page_id", "version", "platform", "product", "ordinal") FILTER_COLUMNS = ("source", "vendor", "brand", "crop", "source_key", "ordinal")
# Allowlist tokenizer for free-text queries. FTS5's parser chokes on lots # Allowlist tokenizer for free-text queries. FTS5's parser chokes on
# of punctuation we routinely see in user queries (".10.9", "?", "VPG's", # lots of punctuation we routinely see in farmer queries ("Rps1c",
# em-dash, etc.). Rather than blocklist every operator, just keep # "SCN-resistant", "0.05 MG", em-dashes). Rather than blocklist every
# alphanumerics + a few separators and replace everything else with a # operator, keep alphanumerics + a few separators and replace
# space. This loses the ability to phrase-search ("exact match") but we # everything else with a space.
# don't expose that to users anyway — they ask natural-language questions
# and want the answer, not a Boolean DSL.
_KEEP_RE = re.compile(r"[^A-Za-z0-9_\s]") _KEEP_RE = re.compile(r"[^A-Za-z0-9_\s]")
# FTS5 reserves these Boolean operator KEYWORDS at the token level # FTS5 reserves these Boolean operator KEYWORDS at the token level.
# stripping them avoids accidental phrase-query behavior when a user
# query happens to contain bare "AND", "OR", "NOT", "NEAR".
_BOOLEAN_KW_RE = re.compile(r"(?<!\w)(AND|OR|NOT|NEAR)(?!\w)") _BOOLEAN_KW_RE = re.compile(r"(?<!\w)(AND|OR|NOT|NEAR)(?!\w)")
def _sanitize_query(text: str) -> str: def _sanitize_query(text: str) -> str:
"""Reduce a natural-language query to an FTS5 OR-of-tokens query. """Reduce a natural-language query to an FTS5 OR-of-tokens query.
Two transformations: See ``crop-chem-docs`` for the rationale; same transformation
applies here. OR semantics maximizes recall — BM25 already
1. Non-alphanumeric → space (drops punctuation; "10.9?" becomes weights documents with more query-term matches higher.
"10 9"). Lets us handle versions, parens, question marks, etc.
without inviting FTS5 parse errors.
2. Boolean keywords stripped (FTS5 reserves AND/OR/NOT/NEAR).
3. Tokens explicitly OR'd. FTS5's default is AND-of-tokens — for
any non-trivial natural-language query that means zero hits
(no chunk contains every word). OR semantics is what we want:
BM25 already weights documents containing more query terms
higher, so we don't lose precision, but we DO gain recall.
""" """
cleaned = _KEEP_RE.sub(" ", text) cleaned = _KEEP_RE.sub(" ", text)
cleaned = _BOOLEAN_KW_RE.sub(" ", cleaned) cleaned = _BOOLEAN_KW_RE.sub(" ", cleaned)
@@ -113,9 +75,9 @@ def _where_to_sql(where: dict | None) -> tuple[str, list[Any]]:
Accepts the same shapes ``docs_mcp.server._build_where`` produces: Accepts the same shapes ``docs_mcp.server._build_where`` produces:
None → ("", []) None → ("", [])
{"version": "10.9"} → ("AND m.version = ?", ["10.9"]) {"crop": "corn"} → ("AND m.crop = ?", ["corn"])
{"$and": [{...}, {...}]} → ("AND m.X = ? AND m.Y = ?", [...]) {"$and": [{...}, {...}]} → ("AND m.X = ? AND m.Y = ?", [...])
Unknown keys are silently dropped (defensive — better to over-match Unknown keys are silently dropped (defensive — better to over-match
than to crash on a filter we don't know). than to crash on a filter we don't know).
@@ -158,35 +120,32 @@ class BM25Index:
def build(self, records: list[dict]) -> int: def build(self, records: list[dict]) -> int:
"""Rebuild the index from scratch from `records`. """Rebuild the index from scratch from `records`.
`records` is the same list ``rag.index.page_records`` produces: `records` is the same list ``rag.index.variety_records`` produces:
``[{"id": ..., "text": ..., "metadata": {...}}, ...]``. Bulk ``[{"id": ..., "text": ..., "metadata": {...}}, ...]``. Bulk
insert wrapped in a transaction — single-digit seconds for the insert wrapped in a transaction.
full 73k-chunk corpus.
""" """
self.db_path.parent.mkdir(parents=True, exist_ok=True) self.db_path.parent.mkdir(parents=True, exist_ok=True)
# Drop and recreate. Idempotent rebuild.
if self.db_path.exists(): if self.db_path.exists():
self.db_path.unlink() self.db_path.unlink()
with sqlite3.connect(self.db_path) as con: with sqlite3.connect(self.db_path) as con:
con.executescript(self._schema_sql()) con.executescript(self._schema_sql())
con.executemany( con.executemany(
"INSERT INTO chunks_meta (id, bundle_id, page_id, version, " "INSERT INTO chunks_meta "
"platform, product, ordinal) VALUES (?, ?, ?, ?, ?, ?, ?)", "(id, source, vendor, brand, crop, source_key, ordinal) "
"VALUES (?, ?, ?, ?, ?, ?, ?)",
[ [
( (
r["id"], r["id"],
r["metadata"].get("bundle_id") or "", r["metadata"].get("source") or "",
r["metadata"].get("page_id") or "", r["metadata"].get("vendor") or "",
r["metadata"].get("version") or "", r["metadata"].get("brand") or "",
r["metadata"].get("platform") or "", r["metadata"].get("crop") or "",
r["metadata"].get("product") or "", r["metadata"].get("source_key") or "",
int(r["metadata"].get("ordinal") or 0), int(r["metadata"].get("ordinal") or 0),
) )
for r in records for r in records
], ],
) )
# Populate the FTS5 contentless-ish table by rowid. We populated
# chunks_meta first; rowids align with insertion order.
con.executemany( con.executemany(
"INSERT INTO chunks_fts (rowid, text) VALUES (?, ?)", "INSERT INTO chunks_fts (rowid, text) VALUES (?, ?)",
[ [
@@ -210,15 +169,12 @@ class BM25Index:
FTS5's bm25() returns NEGATIVE numbers — more relevant docs have FTS5's bm25() returns NEGATIVE numbers — more relevant docs have
smaller (more negative) scores. We order ASC so the first row is smaller (more negative) scores. We order ASC so the first row is
the most relevant. Callers that need a "rank" should enumerate the most relevant.
the returned list.
""" """
sanitized = _sanitize_query(text) sanitized = _sanitize_query(text)
if not sanitized: if not sanitized:
return [] return []
where_sql, params = _where_to_sql(where) where_sql, params = _where_to_sql(where)
# FTS5 MATCH wants the unaliased table name on its left, so we use
# chunks_fts (no alias) and JOIN by rowid against chunks_meta.
sql = ( sql = (
"SELECT m.id, bm25(chunks_fts) AS score " "SELECT m.id, bm25(chunks_fts) AS score "
"FROM chunks_fts " "FROM chunks_fts "
@@ -232,17 +188,13 @@ class BM25Index:
cur = con.execute(sql, [sanitized, *params, n]) cur = con.execute(sql, [sanitized, *params, n])
return [(row[0], float(row[1])) for row in cur.fetchall()] return [(row[0], float(row[1])) for row in cur.fetchall()]
except sqlite3.OperationalError as e: except sqlite3.OperationalError as e:
# FTS5 syntax error (rare after sanitization) or db missing.
# Caller decides whether to fall back to dense-only.
log.warning("bm25 query failed (%s); query=%r", e, sanitized[:80]) log.warning("bm25 query failed (%s); query=%r", e, sanitized[:80])
return [] return []
def exists(self) -> bool: def exists(self) -> bool:
"""Cheap probe — does the index file exist on disk?"""
return self.db_path.exists() return self.db_path.exists()
def count(self) -> int: def count(self) -> int:
"""Number of chunks indexed. 0 if the db is missing or empty."""
if not self.exists(): if not self.exists():
return 0 return 0
try: try:
@@ -257,18 +209,19 @@ class BM25Index:
def _schema_sql() -> str: def _schema_sql() -> str:
return """ return """
CREATE TABLE chunks_meta ( CREATE TABLE chunks_meta (
rowid INTEGER PRIMARY KEY AUTOINCREMENT, rowid INTEGER PRIMARY KEY AUTOINCREMENT,
id TEXT UNIQUE NOT NULL, id TEXT UNIQUE NOT NULL,
bundle_id TEXT, source TEXT,
page_id TEXT, vendor TEXT,
version TEXT, brand TEXT,
platform TEXT, crop TEXT,
product TEXT, source_key TEXT,
ordinal INTEGER ordinal INTEGER
); );
CREATE INDEX idx_meta_version ON chunks_meta(version); CREATE INDEX idx_meta_source ON chunks_meta(source);
CREATE INDEX idx_meta_platform ON chunks_meta(platform); CREATE INDEX idx_meta_crop ON chunks_meta(crop);
CREATE INDEX idx_meta_bundle ON chunks_meta(bundle_id); CREATE INDEX idx_meta_brand ON chunks_meta(brand);
CREATE INDEX idx_meta_source_key ON chunks_meta(source_key);
CREATE VIRTUAL TABLE chunks_fts USING fts5( CREATE VIRTUAL TABLE chunks_fts USING fts5(
text, text,
+298 -100
View File
@@ -1,126 +1,324 @@
"""Markdown chunker — paragraph-aware, ~400-600 token target. """Chunker for seed-variety corpus.
Adjust the chunking strategy per product if your page format differs Each variety becomes ONE chunk by default. Variety pages are small
significantly from prose. The output shape (id, text, metadata) is (typically 2-3 KB of useful signal) and nomic-embed-text handles up
fixed by the downstream Chroma + BM25 indexing in rag/index.py — don't to ~8 K tokens cleanly. Splitting a variety across chunks dilutes
change that. the named-rating embeddings (e.g. "SCN resistance 7") that farmers
search by — keep them together.
The key knob you'll tune per product is chunk-0. Dense retrieval lands The chunk text is a synthetic preamble assembled deterministically
on chunk 0 first for most queries. Make it a synthetic chunk built from the sidecar JSON. Every value in the chunk text comes verbatim
from: from the source. The framing words ("Disease ratings (1-9, 9=best):",
"Maturity group:", etc.) are template glue — *we add structure, we
do NOT add facts*. Given the same sidecar, this chunker always
produces the same chunk text. That's the anti-hallucination
contract: the retriever can never surface a rating value that
wasn't in the source.
- the page title (as natural-language H1) Metadata is flattened to Chroma-safe primitives (str/int/float/bool):
- a 1-sentence task description (you'll have to generate this — for
pages that already have a "## Overview" or "## Introduction" the
first sentence usually works)
- a keyword bag of important terms (filenames, API names, error
codes — the rare technical tokens that BM25 lights up on)
Without a rich chunk 0, dense retrieval gets dominated by the much source "bayer_seeds"
larger prose body, and short pages (script examples, reference cards) source_key "dekalb-dkc075-70rib"
get buried. vendor "Bayer"
brand "DEKALB"
crop "corn" | "soybeans" | "wheat"
product_name "DKC075-70RIB BRAND BLEND"
product_id canonical full id
source_url the variety's page URL
rm corn RM as int when parseable (else absent)
mg soy MG as float when parseable (else absent)
release_year int when known
trait_codes_csv comma-separated trait codes for substring search
rating_scale "1-9 (9 = best)" — chunker should ALWAYS attach
this so downstream code can detect a flip
ordinal chunk index within variety (0-based)
Lists like ``regional_recommendations`` and the full per-rating dicts
do NOT fit Chroma's metadata constraints — they stay in the sidecar
JSON, surfaced by ``get_page`` / ``lookup_variety``.
""" """
from __future__ import annotations from __future__ import annotations
import json
import re import re
from pathlib import Path
from typing import Iterator from typing import Iterator
# Approximate token estimate from char count. Tunable — set per # Rating-group classification. The source publishes characteristics
# embedder if the default 4 chars/token is wrong. # grouped by label; we map those labels to one of three buckets in
CHARS_PER_TOKEN = 4 # the chunk preamble so retrieval gets coherent text. Group labels not
TARGET_TOKENS = 500 # listed here fall into "other" and are still emitted, just in their
TARGET_CHARS = TARGET_TOKENS * CHARS_PER_TOKEN # own section.
DISEASE_GROUP_LABELS = {
"DISEASE RATINGS",
"PEST AND DISEASE RESISTANCE",
}
AGRONOMIC_GROUP_LABELS = {
"GROWTH",
"HARVEST",
"PRODUCTION",
"KEY CHARACTERISTICS",
"QUALITY",
}
MANAGEMENT_GROUP_LABELS = {
"MANAGEMENT",
"HERBICIDE",
"SENSITIVITY",
"PLANT DESCRIPTION",
}
def estimate_tokens(text: str) -> int: def _parse_rm(value: object) -> int | None:
return max(1, len(text) // CHARS_PER_TOKEN) """Best-effort RM-days int. Returns None if not a clean integer
(e.g. wheat's qualitative 'Early'/'Medium-Early' values)."""
if value is None:
return None
s = str(value).strip()
if not s:
return None
try:
# Handle floats stored as strings ("105.0") and the trailing
# tenths sometimes seen on early corn ("75").
return int(float(s))
except ValueError:
return None
def split_paragraphs(md: str) -> list[str]: def _parse_mg(value: object) -> float | None:
"""Split markdown into paragraph-ish blocks. """Best-effort MG float. Soy MGs go from 00 to 9.0 with one decimal."""
if value is None:
return None
s = str(value).strip()
if not s:
return None
try:
return float(s)
except ValueError:
return None
Keeps fenced code blocks together (don't slice through ```).
Headings start new paragraphs. def _format_items(items: list[dict]) -> str:
"""Render `[{characteristic, value}, ...]` to a compact inline list."""
out: list[str] = []
for it in items:
ch = (it.get("characteristic") or "").strip()
v = (it.get("value") or "").strip()
if ch and v:
out.append(f"{ch} {v}")
elif ch:
out.append(f"{ch}")
return ", ".join(out)
def _render_variety_chunk(sidecar: dict) -> str:
"""Build the dense preamble for one variety from its sidecar JSON.
Faithful to source: every numeric/categorical *value* is verbatim
from ``sidecar``. The only generated text is the framing language.
""" """
blocks: list[str] = [] lines: list[str] = []
current: list[str] = []
in_fence = False # ---- Identity line --------------------------------------------------
for line in md.splitlines(keepends=True): name = sidecar.get("product_name") or sidecar.get("source_key") or ""
stripped = line.strip() brand = (sidecar.get("brand") or "").strip()
if stripped.startswith("```"): vendor = sidecar.get("vendor") or ""
in_fence = not in_fence crop = (sidecar.get("crop") or "").strip()
current.append(line) crop_label = crop.capitalize() if crop else ""
ident = f"# {name}"
sub = " ".join(filter(None, [
f"({brand.title()} {crop_label} variety, {vendor})" if brand and crop_label and vendor else "",
]))
lines.append(ident)
if sub:
lines.append("")
lines.append(sub)
# ---- Identity body --------------------------------------------------
facts: list[str] = []
rm = sidecar.get("relative_maturity")
mg = sidecar.get("maturity_group")
wc = sidecar.get("wheat_class")
if crop == "corn" and rm:
facts.append(f"Relative maturity {rm}")
elif crop == "soybeans" and mg:
facts.append(f"Maturity group {mg}")
elif crop == "wheat":
if rm:
facts.append(f"Maturity {rm}")
if wc:
facts.append(f"Wheat class {wc}")
traits = sidecar.get("trait_stack") or []
trait_descs = sidecar.get("trait_descriptions") or []
if traits:
if trait_descs:
facts.append(
"Trait stack: "
+ ", ".join(traits)
+ " ("
+ "; ".join(trait_descs)
+ ")"
)
else:
facts.append("Trait stack: " + ", ".join(traits))
if sidecar.get("release_year"):
facts.append(f"Released {sidecar['release_year']}")
if facts:
lines.append("")
lines.append(". ".join(facts) + ".")
# ---- Positioning ----------------------------------------------------
pos = (sidecar.get("positioning_statement") or "").strip()
if pos:
lines.append("")
lines.append(f"Positioning: {pos}")
# ---- Ratings, bucketed for retrieval --------------------------------
scale = sidecar.get("_scale_direction") or "(scale direction not declared)"
groups = sidecar.get("characteristics_groups") or []
disease: list[dict] = []
agronomic: list[dict] = []
management: list[dict] = []
other: list[tuple[str, list[dict]]] = []
for g in groups:
label = (g.get("label") or "").upper().strip()
items = g.get("items") or []
if not items:
continue continue
if in_fence: if label in DISEASE_GROUP_LABELS:
current.append(line) disease.extend(items)
continue elif label in AGRONOMIC_GROUP_LABELS:
if stripped.startswith("#"): agronomic.extend(items)
if current: elif label in MANAGEMENT_GROUP_LABELS:
blocks.append("".join(current).strip()) management.extend(items)
current = [] else:
current.append(line) other.append((g.get("label") or "Other", items))
continue
if not stripped and current and not "".join(current).strip().endswith("\n\n"): if disease:
current.append(line) lines.append("")
blocks.append("".join(current).strip()) lines.append(f"Disease ratings ({scale}): {_format_items(disease)}.")
current = [] if agronomic:
continue lines.append("")
current.append(line) lines.append(f"Agronomic ratings ({scale}): {_format_items(agronomic)}.")
if current: if management:
blocks.append("".join(current).strip()) lines.append("")
return [b for b in blocks if b] lines.append(f"Management notes: {_format_items(management)}.")
for label, items in other:
lines.append("")
lines.append(f"{label.title()}: {_format_items(items)}.")
# ---- Strengths narrative --------------------------------------------
strengths = sidecar.get("strengths") or []
if strengths:
lines.append("")
lines.append("Strengths and management notes:")
for s in strengths:
s = (s or "").strip()
if s:
lines.append(f"- {s}")
# ---- Regional listings (compact, not the agronomist emails) ---------
rec = sidecar.get("regional_recommendations") or []
if rec:
names = sorted({
(r.get("product_list_name") or "").strip()
for r in rec
if (r.get("product_list_name") or "").strip()
})
if names:
lines.append("")
lines.append("Listed in regional seed guides: " + "; ".join(names) + ".")
# ---- Provenance footer (must always be in the chunk text so it
# can never be lost between retrieval and LLM rendering) --------
urls = sidecar.get("source_urls") or []
if urls:
lines.append("")
lines.append(f"Source: {urls[0]}")
return "\n".join(lines).strip() + "\n"
def chunks_from_page( def _flat_metadata(sidecar: dict) -> dict:
text: str, """Distil sidecar into Chroma-safe metadata (primitives only)."""
page_id: str, md: dict = {
metadata: dict, "source": sidecar.get("source") or "",
"source_key": sidecar.get("source_key") or "",
"vendor": sidecar.get("vendor") or "",
"brand": sidecar.get("brand") or "",
"crop": sidecar.get("crop") or "",
"product_name": sidecar.get("product_name") or "",
"product_id": sidecar.get("product_id") or "",
"source_url": (sidecar.get("source_urls") or [""])[0],
"rating_scale": sidecar.get("_scale_direction") or "",
}
rm = _parse_rm(sidecar.get("relative_maturity"))
mg = _parse_mg(sidecar.get("maturity_group"))
if rm is not None:
md["rm"] = rm
if mg is not None:
md["mg"] = mg
ry = sidecar.get("release_year")
if isinstance(ry, int):
md["release_year"] = ry
traits = sidecar.get("trait_stack") or []
if traits:
# Comma-delimited for partial-match / human eyeballing.
# Bracket-padded so `LIKE '%,XF,%'` finds whole tokens.
md["trait_codes_csv"] = "," + ",".join(traits) + ","
if sidecar.get("wheat_class"):
md["wheat_class"] = sidecar["wheat_class"]
return md
def chunks_from_variety(
sidecar_path: Path | str,
*,
md_path: Path | str | None = None,
) -> Iterator[dict]: ) -> Iterator[dict]:
"""Yield chunk dicts ready for index.py to upsert. """Yield chunk dict(s) for one variety. Currently emits exactly one.
The synthetic chunk 0 is the per-product customization point. The Args:
default below is a simple title + body-first-paragraph; rewrite sidecar_path: path to the variety's JSON sidecar.
for richer retrieval signal (see module docstring). md_path: ignored (the chunker rebuilds from sidecar); kept
in the signature in case a future split-chunker
wants the rendered body.
""" """
paragraphs = split_paragraphs(text) sidecar = json.loads(Path(sidecar_path).read_text(encoding="utf-8"))
if not paragraphs: text = _render_variety_chunk(sidecar)
return meta = _flat_metadata(sidecar)
chunk_id = f"{meta['source']}::{meta['source_key']}::0"
# ----- Chunk 0: synthetic anchor for dense retrieval ---------
title = metadata.get("title") or page_id
first_para = next((p for p in paragraphs if not p.startswith("#")), "")
chunk0_body = (
f"# {title}\n\n"
f"{first_para[:300]}"
# TODO per product: append a keyword bag here (filenames,
# API names, error codes) for BM25 + dense joint coverage.
)
yield { yield {
"id": f"{metadata['bundle_id']}::{page_id}::0", "id": chunk_id,
"text": chunk0_body, "text": text,
"metadata": {**metadata, "ordinal": 0}, "metadata": {**meta, "ordinal": 0},
} }
# ----- Body chunks: pack paragraphs up to TARGET_CHARS -------
ordinal = 1 # ----- Backwards-compat shim for the template's index.py -------------------
buf: list[str] = [] #
buf_chars = 0 # The template's ``rag.index.page_records`` calls
for p in paragraphs: # ``chunks_from_page(md, page_id, base_meta)`` which doesn't know about
if buf_chars + len(p) > TARGET_CHARS and buf: # sidecar JSON. We accept that signature but ignore it — index.py has
yield { # been updated to use ``chunks_from_variety`` directly, and this shim
"id": f"{metadata['bundle_id']}::{page_id}::{ordinal}", # is here only so a stray import of the old name doesn't break.
"text": "\n\n".join(buf), #
"metadata": {**metadata, "ordinal": ordinal}, def chunks_from_page(text: str, page_id: str, metadata: dict) -> Iterator[dict]:
} """Deprecated for seed-mcp; prefer ``chunks_from_variety``."""
ordinal += 1 # Best-effort: if metadata carries a sidecar_path, dispatch.
buf = [] sidecar_path = metadata.get("_sidecar_path")
buf_chars = 0 if sidecar_path:
buf.append(p) yield from chunks_from_variety(sidecar_path)
buf_chars += len(p) return
if buf: # Fallback — emit a single chunk of the raw markdown with whatever
yield { # metadata we have. Better than crashing if someone calls this.
"id": f"{metadata['bundle_id']}::{page_id}::{ordinal}", chunk_id = f"{metadata.get('source','unknown')}::{page_id}::0"
"text": "\n\n".join(buf), yield {
"metadata": {**metadata, "ordinal": ordinal}, "id": chunk_id,
} "text": text,
"metadata": {**metadata, "ordinal": 0},
}
+25 -38
View File
@@ -1,15 +1,19 @@
"""Build Chroma (and optionally BM25) indexes from corpus on disk. """Build Chroma (and BM25) indexes from the seed corpus on disk.
Reads `corpus/<bundle>/<page>.{md,json}`, chunks each page, upserts Reads ``corpus/<source>/<source_key>.json`` sidecars, chunks each
into Chroma. With --rebuild, drops + recreates the collection (clean variety via ``rag.chunk.chunks_from_variety``, upserts into Chroma.
state). With --bm25-only, skips Chroma and rebuilds only the FTS5 With ``--rebuild``, drops + recreates the collection (clean state).
index — useful for fast iteration when chunking didn't change. With ``--bm25-only``, skips Chroma and rebuilds only the FTS5 index
— useful for fast iteration when the chunker didn't change.
Collection name is ``<PRODUCT_NAME>_docs`` (default: ``crop_seed_docs``).
Override via the PRODUCT_NAME env var.
""" """
from __future__ import annotations from __future__ import annotations
import argparse import argparse
import json
import logging import logging
import os
import time import time
from pathlib import Path from pathlib import Path
from typing import Iterator from typing import Iterator
@@ -17,7 +21,7 @@ from typing import Iterator
import chromadb import chromadb
from chromadb.config import Settings from chromadb.config import Settings
from .chunk import chunks_from_page from .chunk import chunks_from_variety
from .embeddings import embedding_function from .embeddings import embedding_function
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
@@ -27,39 +31,21 @@ ROOT = Path(__file__).resolve().parent.parent
CORPUS = ROOT / "corpus" CORPUS = ROOT / "corpus"
CHROMA_DIR = ROOT / "chroma" CHROMA_DIR = ROOT / "chroma"
# Collection name — convention: <product>_docs. Override via env if needed. PRODUCT_NAME = os.environ.get("PRODUCT_NAME", "crop_seed")
import os
PRODUCT_NAME = os.environ.get("PRODUCT_NAME", "myproduct")
COLLECTION = f"{PRODUCT_NAME}_docs" COLLECTION = f"{PRODUCT_NAME}_docs"
def page_records() -> Iterator[dict]: def variety_records() -> Iterator[dict]:
"""Walk corpus/, yield chunks for every page.""" """Walk ``corpus/<source>/<source_key>.json``, yield one chunk per
variety."""
if not CORPUS.exists(): if not CORPUS.exists():
log.error("corpus/ doesn't exist; run the scraper first") log.error("corpus/ doesn't exist; run a scraper first")
return return
for bundle_dir in sorted(CORPUS.iterdir()): for source_dir in sorted(CORPUS.iterdir()):
if not bundle_dir.is_dir() or bundle_dir.name.startswith("."): if not source_dir.is_dir() or source_dir.name.startswith("."):
continue continue
for md_path in sorted(bundle_dir.glob("*.md")): for sidecar_path in sorted(source_dir.glob("*.json")):
page_id = md_path.stem yield from chunks_from_variety(sidecar_path)
sidecar = md_path.with_suffix(".json")
if not sidecar.exists():
log.warning("skipping %s — no JSON sidecar", md_path)
continue
md = md_path.read_text()
meta = json.loads(sidecar.read_text())
# Surface common filter fields at the chunk-metadata level
# so Chroma's `where` filter can use them.
base_meta = {
"bundle_id": bundle_dir.name,
"page_id": page_id,
"title": meta.get("title") or "",
"version": meta.get("version") or "",
"platform": meta.get("platform") or "",
"product": meta.get("product") or "",
}
yield from chunks_from_page(md, page_id, base_meta)
def upsert_to_chroma(records: list[dict]) -> int: def upsert_to_chroma(records: list[dict]) -> int:
@@ -67,7 +53,7 @@ def upsert_to_chroma(records: list[dict]) -> int:
path=str(CHROMA_DIR), path=str(CHROMA_DIR),
settings=Settings(anonymized_telemetry=False), settings=Settings(anonymized_telemetry=False),
) )
# Drop + recreate for --rebuild semantics # Drop + recreate for --rebuild semantics.
try: try:
client.delete_collection(COLLECTION) client.delete_collection(COLLECTION)
except Exception: except Exception:
@@ -101,8 +87,11 @@ def main() -> int:
log.info("reading corpus from %s", CORPUS) log.info("reading corpus from %s", CORPUS)
t0 = time.time() t0 = time.time()
records = list(page_records()) records = list(variety_records())
log.info("loaded %d chunks in %.1fs", len(records), time.time() - t0) log.info("loaded %d chunks in %.1fs", len(records), time.time() - t0)
if not records:
log.error("no chunks — is corpus/ populated?")
return 1
if args.bm25_only: if args.bm25_only:
from .bm25 import BM25Index from .bm25 import BM25Index
@@ -118,8 +107,6 @@ def main() -> int:
n = upsert_to_chroma(records) n = upsert_to_chroma(records)
log.info("chroma: %d chunks in %.1fs", n, time.time() - t_c) log.info("chroma: %d chunks in %.1fs", n, time.time() - t_c)
# Build BM25 too — see PLAN.md Phase 8. Safe to remove this block
# for products that don't need hybrid retrieval.
try: try:
from .bm25 import BM25Index from .bm25 import BM25Index
t_b = time.time() t_b = time.time()