Phase 2/3: chunker + indexer + MCP server tools (475 Bayer varieties searchable) #2
+607
-132
@@ -1,20 +1,20 @@
|
|||||||
"""MCP server skeleton — fill in PRODUCT_NAME and the tool bodies.
|
"""seed-mcp — MCP server over public US row-crop seed catalogs.
|
||||||
|
|
||||||
This file is the template's structural anchor. The phases described in
|
Tools (all read-only):
|
||||||
PLAN.md add or extend pieces of this file:
|
|
||||||
|
|
||||||
Phase 3 — search_docs, get_page, list_versions stubs (you are here)
|
search_docs natural-language retrieval over the corpus
|
||||||
Phase 6 — reranker integration in search_docs
|
get_page the full markdown body of one variety + sidecar
|
||||||
Phase 8 — BM25 + hybrid retrieval (HYBRID_SEARCH env gate, _rrf_fuse)
|
list_versions facet discovery (crops / brands / vendors / sources)
|
||||||
Phase 9 — diff_versions, list_cluster, bundle_changelog
|
lookup_variety canonical sidecar JSON by source_key — fact-check
|
||||||
Phase 10 — TimedCall wiring (already imported below)
|
anything you surface from search_docs against this
|
||||||
Phase 11 — <product>_api_lessons tool
|
|
||||||
Phase 12 — find_doc_inconsistencies, submit_doc_bug
|
|
||||||
Phase 13 — weekly_digest + _digest_history reader
|
|
||||||
|
|
||||||
Every stub below has a docstring + `raise NotImplementedError`. Replace
|
The contract with the calling agent is **never fabricate**. Every chunk
|
||||||
the body when you reach the corresponding phase. Keep the signatures
|
we return is verbatim from the source's published catalog (the chunker
|
||||||
stable across products — clients depend on them.
|
rebuilds chunks deterministically from the sidecar JSON). Every
|
||||||
|
response carries the variety's source URL so the agent can cite. The
|
||||||
|
``lookup_variety`` tool exists specifically so the agent can validate
|
||||||
|
specific rating values without having to trust paraphrased retrieval
|
||||||
|
text.
|
||||||
"""
|
"""
|
||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
|
|
||||||
@@ -23,7 +23,7 @@ import logging
|
|||||||
import os
|
import os
|
||||||
import re
|
import re
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
from typing import Annotated
|
from typing import Annotated, Any
|
||||||
|
|
||||||
from mcp.server.fastmcp import FastMCP
|
from mcp.server.fastmcp import FastMCP
|
||||||
from pydantic import Field
|
from pydantic import Field
|
||||||
@@ -33,7 +33,7 @@ from .usage import TimedCall
|
|||||||
log = logging.getLogger(__name__)
|
log = logging.getLogger(__name__)
|
||||||
|
|
||||||
# ---------------------------------------------------------------------------
|
# ---------------------------------------------------------------------------
|
||||||
# Product-specific configuration. Set these for each new build.
|
# Product-specific configuration.
|
||||||
# ---------------------------------------------------------------------------
|
# ---------------------------------------------------------------------------
|
||||||
PRODUCT_NAME = os.environ.get("PRODUCT_NAME", "crop_seed")
|
PRODUCT_NAME = os.environ.get("PRODUCT_NAME", "crop_seed")
|
||||||
PRODUCT_DOCS_URL = os.environ.get("PRODUCT_DOCS_URL", "https://git.jpaul.io/justin/seed-mcp")
|
PRODUCT_DOCS_URL = os.environ.get("PRODUCT_DOCS_URL", "https://git.jpaul.io/justin/seed-mcp")
|
||||||
@@ -44,59 +44,176 @@ ROOT = Path(__file__).resolve().parent.parent
|
|||||||
CORPUS = ROOT / "corpus"
|
CORPUS = ROOT / "corpus"
|
||||||
CHROMA_DIR = ROOT / "chroma"
|
CHROMA_DIR = ROOT / "chroma"
|
||||||
BM25_DB = Path(os.environ.get("BM25_DB", str(ROOT / "bm25" / f"{PRODUCT_NAME}_docs.db")))
|
BM25_DB = Path(os.environ.get("BM25_DB", str(ROOT / "bm25" / f"{PRODUCT_NAME}_docs.db")))
|
||||||
BUNDLES_JSON = ROOT / "bundles.json"
|
|
||||||
|
|
||||||
# ---------------------------------------------------------------------------
|
# ---------------------------------------------------------------------------
|
||||||
# Feature flags (Phase 6 / 8 / 12 enable these as you ship each phase).
|
# Feature flags.
|
||||||
# ---------------------------------------------------------------------------
|
# ---------------------------------------------------------------------------
|
||||||
RERANK_URL = os.environ.get("RERANK_URL", "").rstrip("/") or None
|
RERANK_URL = os.environ.get("RERANK_URL", "").rstrip("/") or None
|
||||||
RERANK_POOL = int(os.environ.get("RERANK_POOL", "50"))
|
RERANK_POOL = int(os.environ.get("RERANK_POOL", "50"))
|
||||||
RERANK_TIMEOUT = float(os.environ.get("RERANK_TIMEOUT", "30"))
|
RERANK_TIMEOUT = float(os.environ.get("RERANK_TIMEOUT", "30"))
|
||||||
|
|
||||||
HYBRID_SEARCH = os.environ.get("HYBRID_SEARCH", "").lower() in ("true", "1", "yes", "on")
|
HYBRID_SEARCH = os.environ.get("HYBRID_SEARCH", "true").lower() in ("true", "1", "yes", "on")
|
||||||
RRF_K = int(os.environ.get("RRF_K", "60"))
|
RRF_K = int(os.environ.get("RRF_K", "60"))
|
||||||
|
|
||||||
DOC_BUG_SUBMIT_ENABLED = os.environ.get("DOC_BUG_SUBMIT_ENABLED", "").lower() in ("true", "1", "yes", "on")
|
|
||||||
DOC_BUG_API_URL = os.environ.get("DOC_BUG_API_URL", "") # product-specific endpoint
|
|
||||||
DOC_BUG_TIMEOUT = float(os.environ.get("DOC_BUG_TIMEOUT", "15"))
|
|
||||||
|
|
||||||
|
|
||||||
# ---------------------------------------------------------------------------
|
# ---------------------------------------------------------------------------
|
||||||
# FastMCP setup.
|
# FastMCP setup.
|
||||||
#
|
|
||||||
# stateless_http=True — every request creates an ephemeral session and
|
|
||||||
# discards it on return. Critical for production: clients don't get
|
|
||||||
# 404 storms when the container is recreated by Watchtower.
|
|
||||||
# ---------------------------------------------------------------------------
|
# ---------------------------------------------------------------------------
|
||||||
mcp = FastMCP(f"{PRODUCT_NAME}-docs", stateless_http=True)
|
mcp = FastMCP(f"{PRODUCT_NAME}-docs", stateless_http=True)
|
||||||
|
|
||||||
|
|
||||||
# ---------------------------------------------------------------------------
|
# ---------------------------------------------------------------------------
|
||||||
# Lazy helpers — instantiate expensive things only when actually needed,
|
# Lazy singletons. Instantiate on first use so the server can start even
|
||||||
# so the server still starts when (e.g.) Ollama is briefly unreachable.
|
# when (e.g.) Ollama is briefly unreachable — degraded modes fall back
|
||||||
|
# gracefully rather than refusing to boot.
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
_chroma_client = None
|
||||||
|
_chroma_collection = None
|
||||||
|
_bm25 = None
|
||||||
|
_code_index: dict[str, list[tuple[str, str]]] | None = None
|
||||||
|
|
||||||
|
|
||||||
|
def _build_code_index() -> dict[str, list[tuple[str, str]]]:
|
||||||
|
"""Walk sidecars once and index varieties by every lookup key a
|
||||||
|
farmer or LLM might paste: ``source_key``, ``hybrid_prefix``,
|
||||||
|
``product_name`` (normalized), each token from
|
||||||
|
``hybrid_prefix``/``product_name`` longer than three chars.
|
||||||
|
|
||||||
|
Returns a dict mapping the normalized key → list of
|
||||||
|
``(source, source_key)`` tuples. Multiple-source matches are kept
|
||||||
|
so we can warn the agent about ambiguity.
|
||||||
|
"""
|
||||||
|
idx: dict[str, list[tuple[str, str]]] = {}
|
||||||
|
|
||||||
|
def _add(key: str, value: tuple[str, str]) -> None:
|
||||||
|
key = (key or "").lower().strip()
|
||||||
|
if not key or len(key) < 4:
|
||||||
|
return
|
||||||
|
idx.setdefault(key, [])
|
||||||
|
if value not in idx[key]:
|
||||||
|
idx[key].append(value)
|
||||||
|
|
||||||
|
if not CORPUS.exists():
|
||||||
|
return idx
|
||||||
|
for source_dir in CORPUS.iterdir():
|
||||||
|
if not source_dir.is_dir() or source_dir.name.startswith("."):
|
||||||
|
continue
|
||||||
|
for sc in source_dir.glob("*.json"):
|
||||||
|
try:
|
||||||
|
d = json.loads(sc.read_text(encoding="utf-8"))
|
||||||
|
except (OSError, json.JSONDecodeError):
|
||||||
|
continue
|
||||||
|
source = d.get("source") or source_dir.name
|
||||||
|
sk = d.get("source_key") or sc.stem
|
||||||
|
value = (source, sk)
|
||||||
|
_add(sk, value)
|
||||||
|
_add(d.get("hybrid_prefix") or "", value)
|
||||||
|
_add(d.get("product_name") or "", value)
|
||||||
|
# Token-split product_name and hybrid_prefix so "DKC62-08RIB"
|
||||||
|
# in a query matches "DKC62-08RIB BRAND BLEND" via the
|
||||||
|
# "DKC62-08RIB" token as well as the full string.
|
||||||
|
for source_field in (d.get("product_name"), d.get("hybrid_prefix")):
|
||||||
|
if not source_field:
|
||||||
|
continue
|
||||||
|
for tok in re.split(r"[\s()/,]+", source_field):
|
||||||
|
tok = tok.strip()
|
||||||
|
# Only retain tokens that LOOK like codes — at least
|
||||||
|
# one digit, mostly alphanumeric/dash. Skips "BRAND"
|
||||||
|
# / "BLEND" / etc.
|
||||||
|
if re.match(r"^[A-Za-z]*\d[\w\-]*$", tok):
|
||||||
|
_add(tok, value)
|
||||||
|
return idx
|
||||||
|
|
||||||
|
|
||||||
|
def _code_lookup() -> dict[str, list[tuple[str, str]]]:
|
||||||
|
global _code_index
|
||||||
|
if _code_index is None:
|
||||||
|
_code_index = _build_code_index()
|
||||||
|
log.info("code-index: %d lookup keys built", len(_code_index))
|
||||||
|
return _code_index
|
||||||
|
|
||||||
|
|
||||||
|
# Tokens that LOOK like a variety code — at least one digit, otherwise
|
||||||
|
# alphanumeric / dash. Catches "DKC62-08RIB", "AG29XF4", "WB1376CLP",
|
||||||
|
# "VT2PRIB"; skips ordinary words like "ratings".
|
||||||
|
_CODE_TOKEN_RE = re.compile(r"\b([A-Za-z][A-Za-z0-9\-]{2,})\b")
|
||||||
|
|
||||||
|
|
||||||
|
def _exact_code_matches(query: str) -> list[tuple[str, str]]:
|
||||||
|
"""Find varieties whose source_key / product_name / token-split
|
||||||
|
components contain a query token. Used as a high-confidence pin
|
||||||
|
before fuzzy retrieval."""
|
||||||
|
idx = _code_lookup()
|
||||||
|
if not idx:
|
||||||
|
return []
|
||||||
|
out: list[tuple[str, str]] = []
|
||||||
|
seen: set[tuple[str, str]] = set()
|
||||||
|
for m in _CODE_TOKEN_RE.finditer(query or ""):
|
||||||
|
tok = m.group(1).lower()
|
||||||
|
if len(tok) < 4 or not any(c.isdigit() for c in tok):
|
||||||
|
continue
|
||||||
|
for v in idx.get(tok, []):
|
||||||
|
if v not in seen:
|
||||||
|
seen.add(v)
|
||||||
|
out.append(v)
|
||||||
|
return out
|
||||||
|
|
||||||
|
|
||||||
|
def _collection():
|
||||||
|
"""Return the Chroma collection, opening it lazily."""
|
||||||
|
global _chroma_client, _chroma_collection
|
||||||
|
if _chroma_collection is not None:
|
||||||
|
return _chroma_collection
|
||||||
|
import chromadb
|
||||||
|
from chromadb.config import Settings
|
||||||
|
from rag.embeddings import embedding_function
|
||||||
|
|
||||||
|
_chroma_client = chromadb.PersistentClient(
|
||||||
|
path=str(CHROMA_DIR),
|
||||||
|
settings=Settings(anonymized_telemetry=False),
|
||||||
|
)
|
||||||
|
_chroma_collection = _chroma_client.get_collection(
|
||||||
|
COLLECTION, embedding_function=embedding_function()
|
||||||
|
)
|
||||||
|
return _chroma_collection
|
||||||
|
|
||||||
|
|
||||||
|
def _bm25_index():
|
||||||
|
"""Return the BM25 index, or None if it doesn't exist on disk."""
|
||||||
|
global _bm25
|
||||||
|
if _bm25 is not None:
|
||||||
|
return _bm25
|
||||||
|
from rag.bm25 import BM25Index
|
||||||
|
idx = BM25Index(BM25_DB)
|
||||||
|
if not idx.exists():
|
||||||
|
return None
|
||||||
|
_bm25 = idx
|
||||||
|
return _bm25
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# Helpers — local file IO + filter builder.
|
||||||
# ---------------------------------------------------------------------------
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
def _bundles() -> dict[str, dict]:
|
def _build_where(
|
||||||
"""Cached load of bundles.json into a {slug: bundle_dict} mapping.
|
crop: str | None,
|
||||||
|
brand: str | None,
|
||||||
bundles.json is the product-specific catalog written by the Phase 1
|
vendor: str | None,
|
||||||
scraper. See PLAN.md Phase 1 for the schema.
|
source: str | None,
|
||||||
"""
|
source_key: str | None,
|
||||||
if not BUNDLES_JSON.exists():
|
) -> dict | None:
|
||||||
return {}
|
|
||||||
cat = json.loads(BUNDLES_JSON.read_text())
|
|
||||||
return {b["slug"]: b for b in cat}
|
|
||||||
|
|
||||||
|
|
||||||
def _build_where(version: str | None, platform: str | None, bundle_id: str | None) -> dict | None:
|
|
||||||
"""Translate filter args into a Chroma `where` clause."""
|
"""Translate filter args into a Chroma `where` clause."""
|
||||||
conds: list[dict] = []
|
conds: list[dict] = []
|
||||||
if version:
|
if crop:
|
||||||
conds.append({"version": version})
|
conds.append({"crop": crop.lower()})
|
||||||
if platform:
|
if brand:
|
||||||
conds.append({"platform": platform})
|
conds.append({"brand": brand.upper()})
|
||||||
if bundle_id:
|
if vendor:
|
||||||
conds.append({"bundle_id": bundle_id})
|
conds.append({"vendor": vendor})
|
||||||
|
if source:
|
||||||
|
conds.append({"source": source})
|
||||||
|
if source_key:
|
||||||
|
conds.append({"source_key": source_key})
|
||||||
if not conds:
|
if not conds:
|
||||||
return None
|
return None
|
||||||
if len(conds) == 1:
|
if len(conds) == 1:
|
||||||
@@ -104,13 +221,152 @@ def _build_where(version: str | None, platform: str | None, bundle_id: str | Non
|
|||||||
return {"$and": conds}
|
return {"$and": conds}
|
||||||
|
|
||||||
|
|
||||||
def _read_page(bundle_id: str, page_id: str) -> tuple[str, dict] | None:
|
def _read_sidecar(source: str, source_key: str) -> dict | None:
|
||||||
"""Read a corpus page off disk. Returns (markdown_body, metadata_dict)."""
|
"""Read a variety's sidecar JSON off disk."""
|
||||||
md_path = CORPUS / bundle_id / (page_id + ".md")
|
path = CORPUS / source / f"{source_key}.json"
|
||||||
json_path = CORPUS / bundle_id / (page_id + ".json")
|
if not path.exists():
|
||||||
if not md_path.exists() or not json_path.exists():
|
|
||||||
return None
|
return None
|
||||||
return md_path.read_text(), json.loads(json_path.read_text())
|
try:
|
||||||
|
return json.loads(path.read_text(encoding="utf-8"))
|
||||||
|
except (OSError, json.JSONDecodeError) as exc:
|
||||||
|
log.warning("sidecar read failed for %s/%s: %s", source, source_key, exc)
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
def _read_markdown(source: str, source_key: str) -> str | None:
|
||||||
|
path = CORPUS / source / f"{source_key}.md"
|
||||||
|
if not path.exists():
|
||||||
|
return None
|
||||||
|
try:
|
||||||
|
return path.read_text(encoding="utf-8")
|
||||||
|
except OSError:
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
def _format_hit(doc: str, meta: dict, distance: float | None = None) -> str:
|
||||||
|
"""Render one retrieval hit as a fenced markdown block with full
|
||||||
|
provenance attached. ``doc`` is the chunk text; ``meta`` is the
|
||||||
|
chunk's metadata dict."""
|
||||||
|
src_url = meta.get("source_url") or ""
|
||||||
|
src_key = meta.get("source_key") or ""
|
||||||
|
src = meta.get("source") or ""
|
||||||
|
vendor = meta.get("vendor") or ""
|
||||||
|
brand = meta.get("brand") or ""
|
||||||
|
crop = meta.get("crop") or ""
|
||||||
|
name = meta.get("product_name") or src_key
|
||||||
|
|
||||||
|
header = (
|
||||||
|
f"### {name} \n"
|
||||||
|
f"`{src}::{src_key}` — {vendor} / {brand} / {crop} \n"
|
||||||
|
f"<{src_url}>"
|
||||||
|
)
|
||||||
|
if distance is not None:
|
||||||
|
header += f" \n_(distance={distance:.4f})_"
|
||||||
|
return f"{header}\n\n{doc.strip()}\n"
|
||||||
|
|
||||||
|
|
||||||
|
def _rrf_fuse(rankings: list[list[str]], k: int = RRF_K) -> list[str]:
|
||||||
|
"""Reciprocal Rank Fusion — merge multiple ranked id lists into one.
|
||||||
|
|
||||||
|
Score(id) = sum over each ranking R of 1 / (k + rank_R(id)).
|
||||||
|
Robust to score-scale differences between dense (cosine) and lexical
|
||||||
|
(BM25). k=60 is the literature default; not particularly sensitive.
|
||||||
|
"""
|
||||||
|
scores: dict[str, float] = {}
|
||||||
|
for ranking in rankings:
|
||||||
|
for rank, doc_id in enumerate(ranking):
|
||||||
|
scores[doc_id] = scores.get(doc_id, 0.0) + 1.0 / (k + rank + 1)
|
||||||
|
return sorted(scores, key=lambda d: scores[d], reverse=True)
|
||||||
|
|
||||||
|
|
||||||
|
def _structured_ratings_block(sidecar: dict) -> str:
|
||||||
|
"""Render the sidecar's grouped characteristics + identity as a
|
||||||
|
fact-checkable block, with the source URL pinned at top.
|
||||||
|
|
||||||
|
This is the response body for ``get_page`` and the embedded
|
||||||
|
"canonical data" block agents should trust over any free-text
|
||||||
|
paraphrase."""
|
||||||
|
lines: list[str] = []
|
||||||
|
name = sidecar.get("product_name") or sidecar.get("source_key", "")
|
||||||
|
lines.append(f"# {name}")
|
||||||
|
lines.append("")
|
||||||
|
|
||||||
|
facets: list[str] = []
|
||||||
|
if sidecar.get("vendor"):
|
||||||
|
facets.append(f"**Vendor:** {sidecar['vendor']}")
|
||||||
|
if sidecar.get("brand"):
|
||||||
|
facets.append(f"**Brand:** {str(sidecar['brand']).title()}")
|
||||||
|
if sidecar.get("crop"):
|
||||||
|
facets.append(f"**Crop:** {str(sidecar['crop']).capitalize()}")
|
||||||
|
if sidecar.get("relative_maturity") and sidecar.get("crop") == "corn":
|
||||||
|
facets.append(f"**Relative maturity:** {sidecar['relative_maturity']}")
|
||||||
|
if sidecar.get("maturity_group") and sidecar.get("crop") == "soybeans":
|
||||||
|
facets.append(f"**Maturity group:** {sidecar['maturity_group']}")
|
||||||
|
if sidecar.get("relative_maturity") and sidecar.get("crop") == "wheat":
|
||||||
|
facets.append(f"**Maturity:** {sidecar['relative_maturity']}")
|
||||||
|
if sidecar.get("wheat_class"):
|
||||||
|
facets.append(f"**Wheat class:** {sidecar['wheat_class']}")
|
||||||
|
if sidecar.get("release_year"):
|
||||||
|
facets.append(f"**Released:** {sidecar['release_year']}")
|
||||||
|
if sidecar.get("trait_stack"):
|
||||||
|
facets.append(f"**Trait stack:** {', '.join(sidecar['trait_stack'])}")
|
||||||
|
if facets:
|
||||||
|
lines.append(" · ".join(facets))
|
||||||
|
lines.append("")
|
||||||
|
|
||||||
|
urls = sidecar.get("source_urls") or []
|
||||||
|
if urls:
|
||||||
|
lines.append(f"**Source:** {urls[0]}")
|
||||||
|
lines.append("")
|
||||||
|
|
||||||
|
scale = sidecar.get("_scale_direction") or "scale direction not declared by source"
|
||||||
|
lines.append(f"**Rating scale (as published):** {scale}")
|
||||||
|
lines.append("")
|
||||||
|
|
||||||
|
# Canonical ratings — one table per group, values verbatim.
|
||||||
|
for g in sidecar.get("characteristics_groups") or []:
|
||||||
|
label = g.get("label") or "Characteristics"
|
||||||
|
items = g.get("items") or []
|
||||||
|
if not items:
|
||||||
|
continue
|
||||||
|
lines.append(f"## {label.title()}")
|
||||||
|
lines.append("")
|
||||||
|
lines.append("| Characteristic | Value |")
|
||||||
|
lines.append("|---|---|")
|
||||||
|
for it in items:
|
||||||
|
ch = (it.get("characteristic") or "").strip()
|
||||||
|
v = (it.get("value") or "").strip()
|
||||||
|
lines.append(f"| {ch} | {v} |")
|
||||||
|
lines.append("")
|
||||||
|
|
||||||
|
if sidecar.get("positioning_statement"):
|
||||||
|
lines.append("## Vendor positioning")
|
||||||
|
lines.append("")
|
||||||
|
lines.append(sidecar["positioning_statement"].strip())
|
||||||
|
lines.append("")
|
||||||
|
|
||||||
|
if sidecar.get("strengths"):
|
||||||
|
lines.append("## Strengths / management notes (vendor copy)")
|
||||||
|
lines.append("")
|
||||||
|
for s in sidecar["strengths"]:
|
||||||
|
lines.append(f"- {s}")
|
||||||
|
lines.append("")
|
||||||
|
|
||||||
|
rec = sidecar.get("regional_recommendations") or []
|
||||||
|
if rec:
|
||||||
|
names = sorted({
|
||||||
|
(r.get("product_list_name") or "").strip()
|
||||||
|
for r in rec
|
||||||
|
if (r.get("product_list_name") or "").strip()
|
||||||
|
})
|
||||||
|
if names:
|
||||||
|
lines.append("## Listed in vendor regional seed guides")
|
||||||
|
lines.append("")
|
||||||
|
for n in names:
|
||||||
|
lines.append(f"- {n}")
|
||||||
|
lines.append("")
|
||||||
|
|
||||||
|
return "\n".join(lines).rstrip() + "\n"
|
||||||
|
|
||||||
|
|
||||||
# ===========================================================================
|
# ===========================================================================
|
||||||
@@ -119,119 +375,340 @@ def _read_page(bundle_id: str, page_id: str) -> tuple[str, dict] | None:
|
|||||||
|
|
||||||
@mcp.tool()
|
@mcp.tool()
|
||||||
def search_docs(
|
def search_docs(
|
||||||
query: Annotated[str, Field(description=f"Natural-language query about {PRODUCT_NAME}.")],
|
query: Annotated[str, Field(description=(
|
||||||
version: Annotated[
|
"Natural-language query about row-crop seed varieties. "
|
||||||
|
"Mention crop (corn/soybeans/wheat), maturity (RM days / "
|
||||||
|
"MG number / wheat class), traits, disease tolerances, "
|
||||||
|
"or soil/region constraints — they all carry retrieval signal."
|
||||||
|
))],
|
||||||
|
crop: Annotated[
|
||||||
str | None,
|
str | None,
|
||||||
Field(description="OPTIONAL version filter — restrict to one product version."),
|
Field(description="OPTIONAL filter: corn, soybeans, or wheat."),
|
||||||
] = None,
|
] = None,
|
||||||
platform: Annotated[
|
brand: Annotated[
|
||||||
str | None,
|
str | None,
|
||||||
Field(description="OPTIONAL platform filter. Set to one of the platforms listed by list_versions(); omit for all platforms."),
|
Field(description=(
|
||||||
|
"OPTIONAL brand filter — DEKALB, ASGROW, WESTBRED, "
|
||||||
|
"GoldenHarvest, NK, AgriPro, Becks. Case-insensitive."
|
||||||
|
)),
|
||||||
] = None,
|
] = None,
|
||||||
bundle_id: Annotated[
|
vendor: Annotated[
|
||||||
str | None,
|
str | None,
|
||||||
Field(description="OPTIONAL bundle filter — pin to a specific doc bundle slug."),
|
Field(description="OPTIONAL vendor filter — Bayer, Syngenta, Beck's."),
|
||||||
|
] = None,
|
||||||
|
source: Annotated[
|
||||||
|
str | None,
|
||||||
|
Field(description=(
|
||||||
|
"OPTIONAL source filter — e.g. 'bayer_seeds'. Use "
|
||||||
|
"list_versions() to see what's indexed."
|
||||||
|
)),
|
||||||
] = None,
|
] = None,
|
||||||
k: Annotated[int, Field(description="Number of results to return.", ge=1, le=50)] = 10,
|
k: Annotated[int, Field(description="Number of results to return.", ge=1, le=50)] = 10,
|
||||||
) -> str:
|
) -> str:
|
||||||
"""Search the {product} docs corpus.
|
"""Search the seed-variety corpus for hybrids/varieties matching a query.
|
||||||
|
|
||||||
Returns the top-k most relevant chunks (with full source page URLs)
|
Returns the top-k variety chunks with their full source URLs, ratings,
|
||||||
given a natural-language query. Optional filters narrow the search
|
maturity, traits, and regional listings. Optional filters narrow to one
|
||||||
to one version, one platform, or one bundle. Use list_versions()
|
crop, brand, vendor, or scraper source. Use **list_versions()** first
|
||||||
first if you need to discover the available facet values.
|
to discover valid facet values. Use **lookup_variety()** on any
|
||||||
|
candidate the user is serious about — that returns the canonical
|
||||||
|
sidecar so you can verify exact rating values without trusting the
|
||||||
|
free-text chunk.
|
||||||
|
|
||||||
Call this tool whenever the user asks anything that should be
|
Call this tool whenever an ag professional or farmer asks anything
|
||||||
answerable from the official product documentation.
|
about choosing a seed variety, comparing hybrids, or finding seed
|
||||||
|
matched to a maturity / soil / region / disease constraint.
|
||||||
|
|
||||||
|
NEVER answer seed-selection questions from prior knowledge alone —
|
||||||
|
seed catalogs change yearly and brand-specific. Always search first.
|
||||||
"""
|
"""
|
||||||
with TimedCall("search_docs", {
|
with TimedCall("search_docs", {
|
||||||
"query": query, "version": version, "platform": platform,
|
"query": query, "crop": crop, "brand": brand,
|
||||||
"bundle_id": bundle_id, "k": k,
|
"vendor": vendor, "source": source, "k": k,
|
||||||
}) as _call:
|
}) as _call:
|
||||||
# TODO Phase 2-3: query Chroma collection (see rag/index.py for
|
where = _build_where(crop, brand, vendor, source, None)
|
||||||
# how it was built). Render the top-k chunks as markdown with
|
pool_size = max(k * 3, RERANK_POOL)
|
||||||
# source URLs.
|
|
||||||
# TODO Phase 6: optional reranker via _rerank() if RERANK_URL set.
|
# Exact-code pre-filter. Variety codes ("DKC62-08RIB", "AG29XF4")
|
||||||
# TODO Phase 8: hybrid retrieval if HYBRID_SEARCH=true — run
|
# have NO semantic neighbors — dense retrieval misses them and
|
||||||
# dense + BM25 in parallel, RRF-fuse, hand merged pool to rerank.
|
# RRF can let off-topic noise float to top-1. If the query
|
||||||
_call.set(hits_returned=0)
|
# contains a token that exactly matches a variety's identifier,
|
||||||
raise NotImplementedError("Phase 2/3: implement Chroma query + rendering")
|
# pin those varieties to the top of results.
|
||||||
|
pinned_ids: list[str] = []
|
||||||
|
for src, sk in _exact_code_matches(query):
|
||||||
|
pinned_ids.append(f"{src}::{sk}::0")
|
||||||
|
|
||||||
|
# Dense retrieval via Chroma.
|
||||||
|
try:
|
||||||
|
col = _collection()
|
||||||
|
except Exception as exc: # noqa: BLE001
|
||||||
|
_call.set(error_dense=str(exc), hits_returned=0)
|
||||||
|
return (
|
||||||
|
"_(retrieval unavailable — Chroma collection not found. "
|
||||||
|
"Has the indexer run? `python -m rag.index --rebuild`.)_"
|
||||||
|
)
|
||||||
|
|
||||||
|
try:
|
||||||
|
dense = col.query(
|
||||||
|
query_texts=[query],
|
||||||
|
n_results=pool_size,
|
||||||
|
where=where,
|
||||||
|
)
|
||||||
|
except Exception as exc: # noqa: BLE001
|
||||||
|
_call.set(error_dense=str(exc), hits_returned=0)
|
||||||
|
return f"_(retrieval failed: {exc})_"
|
||||||
|
|
||||||
|
dense_ids: list[str] = (dense.get("ids") or [[]])[0]
|
||||||
|
dense_docs: list[str] = (dense.get("documents") or [[]])[0]
|
||||||
|
dense_metas: list[dict] = (dense.get("metadatas") or [[]])[0]
|
||||||
|
dense_dists: list[float] = (dense.get("distances") or [[]])[0]
|
||||||
|
|
||||||
|
id_to_doc = dict(zip(dense_ids, dense_docs))
|
||||||
|
id_to_meta = dict(zip(dense_ids, dense_metas))
|
||||||
|
id_to_dist = dict(zip(dense_ids, dense_dists))
|
||||||
|
|
||||||
|
# Hybrid: optionally fuse with BM25. Both are pulled at pool_size,
|
||||||
|
# fused via RRF, top-k returned.
|
||||||
|
used_hybrid = False
|
||||||
|
if HYBRID_SEARCH:
|
||||||
|
bm25 = _bm25_index()
|
||||||
|
if bm25 is not None:
|
||||||
|
bm25_hits = bm25.query(query, n=pool_size, where=where)
|
||||||
|
bm25_ids = [h[0] for h in bm25_hits]
|
||||||
|
if bm25_ids:
|
||||||
|
fused = _rrf_fuse([dense_ids, bm25_ids])
|
||||||
|
fuzzy_ids = fused
|
||||||
|
used_hybrid = True
|
||||||
|
else:
|
||||||
|
fuzzy_ids = dense_ids
|
||||||
|
else:
|
||||||
|
fuzzy_ids = dense_ids
|
||||||
|
else:
|
||||||
|
fuzzy_ids = dense_ids
|
||||||
|
|
||||||
|
# Pin exact-code matches at top, then fill remainder from fuzzy
|
||||||
|
# retrieval (deduped). Pinned matches are deterministic and
|
||||||
|
# high-confidence; they should never lose to a fuzzy match.
|
||||||
|
final_ids: list[str] = []
|
||||||
|
seen: set[str] = set()
|
||||||
|
for cid in pinned_ids + fuzzy_ids:
|
||||||
|
if cid in seen:
|
||||||
|
continue
|
||||||
|
seen.add(cid)
|
||||||
|
final_ids.append(cid)
|
||||||
|
if len(final_ids) >= k:
|
||||||
|
break
|
||||||
|
|
||||||
|
# For ids returned by BM25 but not by Chroma, we need their docs/
|
||||||
|
# metadata too. Re-fetch by id.
|
||||||
|
missing = [i for i in final_ids if i not in id_to_doc]
|
||||||
|
if missing:
|
||||||
|
try:
|
||||||
|
extra = col.get(ids=missing, include=["documents", "metadatas"])
|
||||||
|
for cid, doc, meta in zip(
|
||||||
|
extra.get("ids") or [],
|
||||||
|
extra.get("documents") or [],
|
||||||
|
extra.get("metadatas") or [],
|
||||||
|
):
|
||||||
|
id_to_doc[cid] = doc
|
||||||
|
id_to_meta[cid] = meta
|
||||||
|
except Exception as exc: # noqa: BLE001
|
||||||
|
log.warning("get-by-id for BM25-only hits failed: %s", exc)
|
||||||
|
|
||||||
|
_call.set(
|
||||||
|
hits_returned=len(final_ids),
|
||||||
|
hybrid=used_hybrid,
|
||||||
|
pool_size=pool_size,
|
||||||
|
)
|
||||||
|
|
||||||
|
if not final_ids:
|
||||||
|
return (
|
||||||
|
"_(no varieties matched. Try broadening the query or "
|
||||||
|
"removing filters — call list_versions() to see what's "
|
||||||
|
"indexed.)_"
|
||||||
|
)
|
||||||
|
|
||||||
|
blocks: list[str] = []
|
||||||
|
for cid in final_ids:
|
||||||
|
doc = id_to_doc.get(cid, "")
|
||||||
|
meta = id_to_meta.get(cid, {})
|
||||||
|
dist = id_to_dist.get(cid) if not used_hybrid else None
|
||||||
|
blocks.append(_format_hit(doc, meta, dist))
|
||||||
|
|
||||||
|
header = (
|
||||||
|
f"# Search results — {len(final_ids)} variety chunk"
|
||||||
|
f"{'s' if len(final_ids) != 1 else ''}"
|
||||||
|
f"{' (dense + BM25 hybrid)' if used_hybrid else ' (dense only)'}\n"
|
||||||
|
f"_Use `lookup_variety(source_key=...)` on any candidate "
|
||||||
|
f"to fact-check ratings from the canonical sidecar._\n\n---\n\n"
|
||||||
|
)
|
||||||
|
return header + "\n---\n\n".join(blocks)
|
||||||
|
|
||||||
|
|
||||||
@mcp.tool()
|
@mcp.tool()
|
||||||
def get_page(
|
def get_page(
|
||||||
bundle_id: Annotated[str, Field(description="Bundle slug.")],
|
source: Annotated[str, Field(description=(
|
||||||
page_id: Annotated[str, Field(description="Page filename within the bundle.")],
|
"Scraper source id — e.g. 'bayer_seeds'. Same as the `source` "
|
||||||
|
"field in search_docs results."
|
||||||
|
))],
|
||||||
|
source_key: Annotated[str, Field(description=(
|
||||||
|
"Per-variety stable key — e.g. 'dekalb-dkc62-08rib'. Same as "
|
||||||
|
"the `source_key` field in search_docs results."
|
||||||
|
))],
|
||||||
) -> str:
|
) -> str:
|
||||||
"""Return the full markdown for one page, plus a metadata header.
|
"""Return the full canonical record for one variety.
|
||||||
|
|
||||||
Use after search_docs surfaces a relevant page and the user (or you)
|
Emits a structured ratings header (identity, all characteristic
|
||||||
want the complete text — not just the matched chunks.
|
groups, vendor positioning, regional listings) sourced from the
|
||||||
|
variety's sidecar JSON, followed by the raw markdown body the
|
||||||
|
chunker indexed.
|
||||||
|
|
||||||
|
Use this when the user is comparing varieties closely or wants to
|
||||||
|
see every published rating value, not just the ones that matched a
|
||||||
|
query. The structured header is the canonical fact-check surface;
|
||||||
|
use it to validate any specific rating value before quoting it.
|
||||||
"""
|
"""
|
||||||
with TimedCall("get_page", {"bundle_id": bundle_id, "page_id": page_id}) as _call:
|
with TimedCall("get_page", {"source": source, "source_key": source_key}) as _call:
|
||||||
data = _read_page(bundle_id, page_id)
|
sidecar = _read_sidecar(source, source_key)
|
||||||
if data is None:
|
if sidecar is None:
|
||||||
_call.set(found=False)
|
_call.set(found=False)
|
||||||
return f"Page not found: {bundle_id}/{page_id}"
|
return (
|
||||||
md, meta = data
|
f"_(variety not found: source='{source}' "
|
||||||
_call.set(found=True, page_chars=len(md))
|
f"source_key='{source_key}'. Use list_versions() to see "
|
||||||
# TODO: add a metadata header (title, version, source URL) above
|
f"available sources or search_docs() to find candidates.)_"
|
||||||
# the body. Product-specific shape.
|
)
|
||||||
return md
|
body = _read_markdown(source, source_key) or ""
|
||||||
|
structured = _structured_ratings_block(sidecar)
|
||||||
|
_call.set(found=True, body_chars=len(body), groups=len(sidecar.get("characteristics_groups") or []))
|
||||||
|
sep = "\n\n---\n\n## Indexed body (chunk source text)\n\n"
|
||||||
|
return structured + sep + body
|
||||||
|
|
||||||
|
|
||||||
@mcp.tool()
|
@mcp.tool()
|
||||||
def list_versions() -> str:
|
def list_versions() -> str:
|
||||||
"""List the available version/platform facets across all bundles.
|
"""List the available scraper sources and the crop/brand/vendor
|
||||||
|
facets across all indexed varieties.
|
||||||
|
|
||||||
Use this to discover valid filter values for search_docs.
|
Use this BEFORE search_docs() the first time you query, to discover
|
||||||
|
which scraper sources, brands, vendors, and crops are actually in
|
||||||
|
the corpus right now. The agent should not assume — the corpus
|
||||||
|
grows as more vendors are scraped.
|
||||||
"""
|
"""
|
||||||
with TimedCall("list_versions", {}) as _call:
|
with TimedCall("list_versions", {}) as _call:
|
||||||
cat = _bundles()
|
facets: dict[str, dict[str, int]] = {
|
||||||
if not cat:
|
"source": {}, "vendor": {}, "brand": {}, "crop": {},
|
||||||
return "_(no bundles indexed yet — run the scraper + indexer)_"
|
}
|
||||||
versions = sorted({b.get("version") for b in cat.values() if b.get("version")})
|
n_varieties = 0
|
||||||
platforms = sorted({b.get("platform") for b in cat.values() if b.get("platform")})
|
if CORPUS.exists():
|
||||||
_call.set(versions=len(versions), platforms=len(platforms))
|
for source_dir in sorted(CORPUS.iterdir()):
|
||||||
lines = [f"# Facets across {len(cat)} bundle(s)", ""]
|
if not source_dir.is_dir() or source_dir.name.startswith("."):
|
||||||
if versions:
|
continue
|
||||||
lines.append("## Versions"); lines.append("")
|
for sc in source_dir.glob("*.json"):
|
||||||
for v in versions: lines.append(f"- `{v}`")
|
try:
|
||||||
|
d = json.loads(sc.read_text(encoding="utf-8"))
|
||||||
|
except (OSError, json.JSONDecodeError):
|
||||||
|
continue
|
||||||
|
n_varieties += 1
|
||||||
|
for k in facets:
|
||||||
|
v = d.get(k)
|
||||||
|
if v:
|
||||||
|
facets[k][v] = facets[k].get(v, 0) + 1
|
||||||
|
|
||||||
|
if n_varieties == 0:
|
||||||
|
return (
|
||||||
|
"_(no varieties indexed yet — run scrapers + indexer "
|
||||||
|
"before calling search_docs)_"
|
||||||
|
)
|
||||||
|
|
||||||
|
_call.set(
|
||||||
|
varieties=n_varieties,
|
||||||
|
sources=len(facets["source"]),
|
||||||
|
vendors=len(facets["vendor"]),
|
||||||
|
brands=len(facets["brand"]),
|
||||||
|
crops=len(facets["crop"]),
|
||||||
|
)
|
||||||
|
|
||||||
|
lines = [f"# Corpus facets ({n_varieties} varieties indexed)", ""]
|
||||||
|
for label, counts in facets.items():
|
||||||
|
if not counts:
|
||||||
|
continue
|
||||||
|
lines.append(f"## {label}")
|
||||||
lines.append("")
|
lines.append("")
|
||||||
if platforms:
|
for k, n in sorted(counts.items(), key=lambda kv: -kv[1]):
|
||||||
lines.append("## Platforms"); lines.append("")
|
lines.append(f"- `{k}` — {n} varieties")
|
||||||
for p in platforms: lines.append(f"- `{p}`")
|
lines.append("")
|
||||||
return "\n".join(lines)
|
return "\n".join(lines).rstrip()
|
||||||
|
|
||||||
|
|
||||||
# ---------------------------------------------------------------------------
|
@mcp.tool()
|
||||||
# Stubs for later phases — keep the signatures in this file so refactors
|
def lookup_variety(
|
||||||
# don't lose the contracts. Implementations come per phase.
|
source_key: Annotated[str, Field(description=(
|
||||||
# ---------------------------------------------------------------------------
|
"Per-variety stable key — e.g. 'dekalb-dkc62-08rib'."
|
||||||
|
))],
|
||||||
|
source: Annotated[
|
||||||
|
str | None,
|
||||||
|
Field(description=(
|
||||||
|
"OPTIONAL scraper source ('bayer_seeds' etc). If omitted, "
|
||||||
|
"scans all sources for the source_key."
|
||||||
|
)),
|
||||||
|
] = None,
|
||||||
|
) -> str:
|
||||||
|
"""Return the canonical sidecar JSON for one variety, verbatim.
|
||||||
|
|
||||||
# @mcp.tool() # Phase 9
|
USE THIS to fact-check any rating value before quoting it to the
|
||||||
# def list_cluster(bundle_id: str, page_id: str) -> str: ...
|
farmer. The output is the exact data the scraper captured from the
|
||||||
|
vendor's published catalog — no paraphrasing, no inference.
|
||||||
|
|
||||||
# @mcp.tool() # Phase 9
|
Call this whenever the user (or you) needs an exact value for a
|
||||||
# def diff_versions(bundle_id: str, page_id: str, against_bundle_id: str, context: int = 3) -> str: ...
|
specific rating, trait, or maturity — not just a semantic match
|
||||||
|
from search_docs.
|
||||||
|
"""
|
||||||
|
with TimedCall("lookup_variety", {"source_key": source_key, "source": source}) as _call:
|
||||||
|
if source:
|
||||||
|
sidecar = _read_sidecar(source, source_key)
|
||||||
|
if sidecar is not None:
|
||||||
|
_call.set(found=True, source=source)
|
||||||
|
return "```json\n" + json.dumps(sidecar, indent=2, ensure_ascii=False) + "\n```"
|
||||||
|
_call.set(found=False, source=source)
|
||||||
|
return f"_(variety '{source_key}' not found under source '{source}')_"
|
||||||
|
|
||||||
# @mcp.tool() # Phase 9
|
# No source given — scan all source dirs for a match.
|
||||||
# def bundle_changelog(bundle_id_new: str, bundle_id_old: str, min_churn: int = 5, max_changed: int = 50) -> str: ...
|
if not CORPUS.exists():
|
||||||
|
_call.set(found=False)
|
||||||
|
return "_(corpus is empty — no scrapers have run yet)_"
|
||||||
|
matches: list[tuple[str, dict]] = []
|
||||||
|
for source_dir in sorted(CORPUS.iterdir()):
|
||||||
|
if not source_dir.is_dir() or source_dir.name.startswith("."):
|
||||||
|
continue
|
||||||
|
sidecar = _read_sidecar(source_dir.name, source_key)
|
||||||
|
if sidecar is not None:
|
||||||
|
matches.append((source_dir.name, sidecar))
|
||||||
|
|
||||||
# @mcp.tool() # Phase 13
|
if not matches:
|
||||||
# def weekly_digest(days: int = 7, version: str | None = None, platform: str | None = None, ...) -> str: ...
|
_call.set(found=False)
|
||||||
|
return (
|
||||||
|
f"_(no variety with source_key '{source_key}' found in any "
|
||||||
|
f"source. Use search_docs() to find the right key.)_"
|
||||||
|
)
|
||||||
|
|
||||||
# @mcp.tool() # Phase 9 (or 3 — useful early)
|
_call.set(found=True, matches=len(matches))
|
||||||
# def corpus_status() -> str: ...
|
if len(matches) == 1:
|
||||||
|
src_name, sidecar = matches[0]
|
||||||
|
return (
|
||||||
|
f"_(matched in source `{src_name}`)_\n\n"
|
||||||
|
"```json\n"
|
||||||
|
+ json.dumps(sidecar, indent=2, ensure_ascii=False)
|
||||||
|
+ "\n```"
|
||||||
|
)
|
||||||
|
|
||||||
# @mcp.tool() # Phase 11
|
# Multi-match: serialize each labeled.
|
||||||
# def myproduct_api_lessons(topic: str | None = None) -> str: ...
|
out: list[str] = [f"_(matched {len(matches)} sources)_\n"]
|
||||||
|
for src_name, sidecar in matches:
|
||||||
# @mcp.tool() # Phase 12
|
out.append(f"### source: `{src_name}`")
|
||||||
# def find_doc_inconsistencies(scope_query: str, ...) -> str: ...
|
out.append("```json")
|
||||||
|
out.append(json.dumps(sidecar, indent=2, ensure_ascii=False))
|
||||||
# @mcp.tool() # Phase 12
|
out.append("```")
|
||||||
# def submit_doc_bug(page_url: str, content: str, email: str | None = None, ...) -> str: ...
|
return "\n".join(out)
|
||||||
|
|
||||||
|
|
||||||
# ===========================================================================
|
# ===========================================================================
|
||||||
@@ -252,8 +729,6 @@ def main() -> None:
|
|||||||
else:
|
else:
|
||||||
mcp.settings.host = args.host
|
mcp.settings.host = args.host
|
||||||
mcp.settings.port = args.port
|
mcp.settings.port = args.port
|
||||||
# DNS-rebinding protection defaults to localhost-only — disable for
|
|
||||||
# container-network DNS hostnames. See PLAN.md "Hosting" notes.
|
|
||||||
if os.environ.get("MCP_DISABLE_DNS_REBINDING_PROTECTION") in {"1", "true", "yes"}:
|
if os.environ.get("MCP_DISABLE_DNS_REBINDING_PROTECTION") in {"1", "true", "yes"}:
|
||||||
mcp.settings.transport_security.enable_dns_rebinding_protection = False
|
mcp.settings.transport_security.enable_dns_rebinding_protection = False
|
||||||
mcp.run(transport=args.transport)
|
mcp.run(transport=args.transport)
|
||||||
|
|||||||
+52
-99
@@ -1,17 +1,14 @@
|
|||||||
"""SQLite FTS5-backed BM25 retrieval over the same chunks Chroma indexes.
|
"""SQLite FTS5-backed BM25 retrieval over the same chunks Chroma indexes.
|
||||||
|
|
||||||
Hybrid retrieval (BM25 + dense + Reciprocal Rank Fusion) addresses a
|
Hybrid retrieval (BM25 + dense + Reciprocal Rank Fusion) addresses a
|
||||||
limit of single-tower dense embeddings: when a query has specific
|
single-tower dense embedding's weakness on rare technical tokens —
|
||||||
technical terms (filenames, language names, error codes, API paths),
|
for seed-mcp that's variety codes ("DKC62-08RIB"), trait codes
|
||||||
the dense embedding doesn't bridge from the query into a short
|
("XF", "VT2PRIB"), disease abbreviations ("SCN", "SDS", "Goss's"),
|
||||||
code-focused chunk. The chunk loses to the much larger crowd of
|
and Rps gene names ("Rps1c", "Rps3a"). Dense embeddings don't bridge
|
||||||
prose chunks that semantically match the query topic.
|
queries like "Rps3a soybean" cleanly into the relevant chunk; BM25
|
||||||
|
matches them directly. Fused with the dense ranking via RRF, the
|
||||||
BM25 handles this directly. Lexical overlap on rare terms ("python",
|
hybrid result is strictly better than either alone for the queries
|
||||||
"create_vpg.py", "PROTECTED_SITE_ID", "applyUpgrade") scores those
|
we expect from the farm-advisor agent.
|
||||||
chunks high. Fused with the dense ranking via RRF, the hybrid result
|
|
||||||
is strictly better than either alone for the queries we've seen
|
|
||||||
fail.
|
|
||||||
|
|
||||||
Why SQLite FTS5:
|
Why SQLite FTS5:
|
||||||
- In the stdlib. Zero new deps.
|
- In the stdlib. Zero new deps.
|
||||||
@@ -19,36 +16,13 @@ Why SQLite FTS5:
|
|||||||
`rag.index --rebuild` regenerates from corpus.
|
`rag.index --rebuild` regenerates from corpus.
|
||||||
- Built-in `bm25()` ranking function. No knobs to tune that matter
|
- Built-in `bm25()` ranking function. No knobs to tune that matter
|
||||||
for our use case (k1=1.2, b=0.75 defaults are fine).
|
for our use case (k1=1.2, b=0.75 defaults are fine).
|
||||||
- Builds 70k+ chunks in seconds. Faster than the Chroma rebuild's
|
- Builds <1k chunks in milliseconds; adds nothing to rebuild time.
|
||||||
embedding step by 100×, so it adds basically nothing to the
|
|
||||||
full-rebuild cycle.
|
|
||||||
|
|
||||||
Schema is two tables to keep filtering clean. FTS5 doesn't filter
|
Schema is two tables to keep filtering clean. FTS5 doesn't filter
|
||||||
nicely on its own columns; the content_rowid pattern keeps an
|
nicely on its own columns; the content_rowid pattern keeps an
|
||||||
external metadata table joinable by rowid:
|
external metadata table joinable by rowid. For seed-mcp the
|
||||||
|
filterable columns are seed-domain facets — source, vendor, brand,
|
||||||
CREATE TABLE chunks_meta (
|
crop, source_key — rather than the docs-template version/platform.
|
||||||
rowid INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
||||||
id TEXT UNIQUE,
|
|
||||||
bundle_id TEXT, page_id TEXT, version TEXT,
|
|
||||||
platform TEXT, product TEXT, ordinal INTEGER
|
|
||||||
);
|
|
||||||
CREATE VIRTUAL TABLE chunks_fts USING fts5(
|
|
||||||
text,
|
|
||||||
tokenize = 'porter unicode61 remove_diacritics 2',
|
|
||||||
content = 'chunks_meta',
|
|
||||||
content_rowid = 'rowid'
|
|
||||||
);
|
|
||||||
|
|
||||||
Queries:
|
|
||||||
|
|
||||||
SELECT m.id, bm25(chunks_fts) AS score
|
|
||||||
FROM chunks_meta m
|
|
||||||
JOIN chunks_fts f ON m.rowid = f.rowid
|
|
||||||
WHERE f MATCH ?
|
|
||||||
AND m.version = ? -- optional metadata filter
|
|
||||||
ORDER BY bm25(chunks_fts) -- lower = better in FTS5
|
|
||||||
LIMIT ?;
|
|
||||||
"""
|
"""
|
||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
|
|
||||||
@@ -63,42 +37,30 @@ log = logging.getLogger(__name__)
|
|||||||
# Default location: bm25/<product>_docs.db at the repo root, next to chroma/.
|
# Default location: bm25/<product>_docs.db at the repo root, next to chroma/.
|
||||||
ROOT = Path(__file__).resolve().parent.parent
|
ROOT = Path(__file__).resolve().parent.parent
|
||||||
DEFAULT_DB_DIR = ROOT / "bm25"
|
DEFAULT_DB_DIR = ROOT / "bm25"
|
||||||
DEFAULT_DB_NAME = "<product>_docs.db"
|
DEFAULT_DB_NAME = "crop_seed_docs.db"
|
||||||
|
|
||||||
# Columns we expose as filterable metadata. Mirrors what _build_where in
|
# Columns we expose as filterable metadata. Mirrors what
|
||||||
# docs_mcp/server.py accepts so the same filter dicts work for both
|
# ``docs_mcp.server._build_where`` accepts so the same filter dict
|
||||||
# Chroma and BM25 without per-retriever translation in the caller.
|
# works for both Chroma and BM25 without per-retriever translation.
|
||||||
FILTER_COLUMNS = ("bundle_id", "page_id", "version", "platform", "product", "ordinal")
|
FILTER_COLUMNS = ("source", "vendor", "brand", "crop", "source_key", "ordinal")
|
||||||
|
|
||||||
|
|
||||||
# Allowlist tokenizer for free-text queries. FTS5's parser chokes on lots
|
# Allowlist tokenizer for free-text queries. FTS5's parser chokes on
|
||||||
# of punctuation we routinely see in user queries (".10.9", "?", "VPG's",
|
# lots of punctuation we routinely see in farmer queries ("Rps1c",
|
||||||
# em-dash, etc.). Rather than blocklist every operator, just keep
|
# "SCN-resistant", "0.05 MG", em-dashes). Rather than blocklist every
|
||||||
# alphanumerics + a few separators and replace everything else with a
|
# operator, keep alphanumerics + a few separators and replace
|
||||||
# space. This loses the ability to phrase-search ("exact match") but we
|
# everything else with a space.
|
||||||
# don't expose that to users anyway — they ask natural-language questions
|
|
||||||
# and want the answer, not a Boolean DSL.
|
|
||||||
_KEEP_RE = re.compile(r"[^A-Za-z0-9_\s]")
|
_KEEP_RE = re.compile(r"[^A-Za-z0-9_\s]")
|
||||||
# FTS5 reserves these Boolean operator KEYWORDS at the token level —
|
# FTS5 reserves these Boolean operator KEYWORDS at the token level.
|
||||||
# stripping them avoids accidental phrase-query behavior when a user
|
|
||||||
# query happens to contain bare "AND", "OR", "NOT", "NEAR".
|
|
||||||
_BOOLEAN_KW_RE = re.compile(r"(?<!\w)(AND|OR|NOT|NEAR)(?!\w)")
|
_BOOLEAN_KW_RE = re.compile(r"(?<!\w)(AND|OR|NOT|NEAR)(?!\w)")
|
||||||
|
|
||||||
|
|
||||||
def _sanitize_query(text: str) -> str:
|
def _sanitize_query(text: str) -> str:
|
||||||
"""Reduce a natural-language query to an FTS5 OR-of-tokens query.
|
"""Reduce a natural-language query to an FTS5 OR-of-tokens query.
|
||||||
|
|
||||||
Two transformations:
|
See ``crop-chem-docs`` for the rationale; same transformation
|
||||||
|
applies here. OR semantics maximizes recall — BM25 already
|
||||||
1. Non-alphanumeric → space (drops punctuation; "10.9?" becomes
|
weights documents with more query-term matches higher.
|
||||||
"10 9"). Lets us handle versions, parens, question marks, etc.
|
|
||||||
without inviting FTS5 parse errors.
|
|
||||||
2. Boolean keywords stripped (FTS5 reserves AND/OR/NOT/NEAR).
|
|
||||||
3. Tokens explicitly OR'd. FTS5's default is AND-of-tokens — for
|
|
||||||
any non-trivial natural-language query that means zero hits
|
|
||||||
(no chunk contains every word). OR semantics is what we want:
|
|
||||||
BM25 already weights documents containing more query terms
|
|
||||||
higher, so we don't lose precision, but we DO gain recall.
|
|
||||||
"""
|
"""
|
||||||
cleaned = _KEEP_RE.sub(" ", text)
|
cleaned = _KEEP_RE.sub(" ", text)
|
||||||
cleaned = _BOOLEAN_KW_RE.sub(" ", cleaned)
|
cleaned = _BOOLEAN_KW_RE.sub(" ", cleaned)
|
||||||
@@ -113,9 +75,9 @@ def _where_to_sql(where: dict | None) -> tuple[str, list[Any]]:
|
|||||||
|
|
||||||
Accepts the same shapes ``docs_mcp.server._build_where`` produces:
|
Accepts the same shapes ``docs_mcp.server._build_where`` produces:
|
||||||
|
|
||||||
None → ("", [])
|
None → ("", [])
|
||||||
{"version": "10.9"} → ("AND m.version = ?", ["10.9"])
|
{"crop": "corn"} → ("AND m.crop = ?", ["corn"])
|
||||||
{"$and": [{...}, {...}]} → ("AND m.X = ? AND m.Y = ?", [...])
|
{"$and": [{...}, {...}]} → ("AND m.X = ? AND m.Y = ?", [...])
|
||||||
|
|
||||||
Unknown keys are silently dropped (defensive — better to over-match
|
Unknown keys are silently dropped (defensive — better to over-match
|
||||||
than to crash on a filter we don't know).
|
than to crash on a filter we don't know).
|
||||||
@@ -158,35 +120,32 @@ class BM25Index:
|
|||||||
def build(self, records: list[dict]) -> int:
|
def build(self, records: list[dict]) -> int:
|
||||||
"""Rebuild the index from scratch from `records`.
|
"""Rebuild the index from scratch from `records`.
|
||||||
|
|
||||||
`records` is the same list ``rag.index.page_records`` produces:
|
`records` is the same list ``rag.index.variety_records`` produces:
|
||||||
``[{"id": ..., "text": ..., "metadata": {...}}, ...]``. Bulk
|
``[{"id": ..., "text": ..., "metadata": {...}}, ...]``. Bulk
|
||||||
insert wrapped in a transaction — single-digit seconds for the
|
insert wrapped in a transaction.
|
||||||
full 73k-chunk corpus.
|
|
||||||
"""
|
"""
|
||||||
self.db_path.parent.mkdir(parents=True, exist_ok=True)
|
self.db_path.parent.mkdir(parents=True, exist_ok=True)
|
||||||
# Drop and recreate. Idempotent rebuild.
|
|
||||||
if self.db_path.exists():
|
if self.db_path.exists():
|
||||||
self.db_path.unlink()
|
self.db_path.unlink()
|
||||||
with sqlite3.connect(self.db_path) as con:
|
with sqlite3.connect(self.db_path) as con:
|
||||||
con.executescript(self._schema_sql())
|
con.executescript(self._schema_sql())
|
||||||
con.executemany(
|
con.executemany(
|
||||||
"INSERT INTO chunks_meta (id, bundle_id, page_id, version, "
|
"INSERT INTO chunks_meta "
|
||||||
"platform, product, ordinal) VALUES (?, ?, ?, ?, ?, ?, ?)",
|
"(id, source, vendor, brand, crop, source_key, ordinal) "
|
||||||
|
"VALUES (?, ?, ?, ?, ?, ?, ?)",
|
||||||
[
|
[
|
||||||
(
|
(
|
||||||
r["id"],
|
r["id"],
|
||||||
r["metadata"].get("bundle_id") or "",
|
r["metadata"].get("source") or "",
|
||||||
r["metadata"].get("page_id") or "",
|
r["metadata"].get("vendor") or "",
|
||||||
r["metadata"].get("version") or "",
|
r["metadata"].get("brand") or "",
|
||||||
r["metadata"].get("platform") or "",
|
r["metadata"].get("crop") or "",
|
||||||
r["metadata"].get("product") or "",
|
r["metadata"].get("source_key") or "",
|
||||||
int(r["metadata"].get("ordinal") or 0),
|
int(r["metadata"].get("ordinal") or 0),
|
||||||
)
|
)
|
||||||
for r in records
|
for r in records
|
||||||
],
|
],
|
||||||
)
|
)
|
||||||
# Populate the FTS5 contentless-ish table by rowid. We populated
|
|
||||||
# chunks_meta first; rowids align with insertion order.
|
|
||||||
con.executemany(
|
con.executemany(
|
||||||
"INSERT INTO chunks_fts (rowid, text) VALUES (?, ?)",
|
"INSERT INTO chunks_fts (rowid, text) VALUES (?, ?)",
|
||||||
[
|
[
|
||||||
@@ -210,15 +169,12 @@ class BM25Index:
|
|||||||
|
|
||||||
FTS5's bm25() returns NEGATIVE numbers — more relevant docs have
|
FTS5's bm25() returns NEGATIVE numbers — more relevant docs have
|
||||||
smaller (more negative) scores. We order ASC so the first row is
|
smaller (more negative) scores. We order ASC so the first row is
|
||||||
the most relevant. Callers that need a "rank" should enumerate
|
the most relevant.
|
||||||
the returned list.
|
|
||||||
"""
|
"""
|
||||||
sanitized = _sanitize_query(text)
|
sanitized = _sanitize_query(text)
|
||||||
if not sanitized:
|
if not sanitized:
|
||||||
return []
|
return []
|
||||||
where_sql, params = _where_to_sql(where)
|
where_sql, params = _where_to_sql(where)
|
||||||
# FTS5 MATCH wants the unaliased table name on its left, so we use
|
|
||||||
# chunks_fts (no alias) and JOIN by rowid against chunks_meta.
|
|
||||||
sql = (
|
sql = (
|
||||||
"SELECT m.id, bm25(chunks_fts) AS score "
|
"SELECT m.id, bm25(chunks_fts) AS score "
|
||||||
"FROM chunks_fts "
|
"FROM chunks_fts "
|
||||||
@@ -232,17 +188,13 @@ class BM25Index:
|
|||||||
cur = con.execute(sql, [sanitized, *params, n])
|
cur = con.execute(sql, [sanitized, *params, n])
|
||||||
return [(row[0], float(row[1])) for row in cur.fetchall()]
|
return [(row[0], float(row[1])) for row in cur.fetchall()]
|
||||||
except sqlite3.OperationalError as e:
|
except sqlite3.OperationalError as e:
|
||||||
# FTS5 syntax error (rare after sanitization) or db missing.
|
|
||||||
# Caller decides whether to fall back to dense-only.
|
|
||||||
log.warning("bm25 query failed (%s); query=%r", e, sanitized[:80])
|
log.warning("bm25 query failed (%s); query=%r", e, sanitized[:80])
|
||||||
return []
|
return []
|
||||||
|
|
||||||
def exists(self) -> bool:
|
def exists(self) -> bool:
|
||||||
"""Cheap probe — does the index file exist on disk?"""
|
|
||||||
return self.db_path.exists()
|
return self.db_path.exists()
|
||||||
|
|
||||||
def count(self) -> int:
|
def count(self) -> int:
|
||||||
"""Number of chunks indexed. 0 if the db is missing or empty."""
|
|
||||||
if not self.exists():
|
if not self.exists():
|
||||||
return 0
|
return 0
|
||||||
try:
|
try:
|
||||||
@@ -257,18 +209,19 @@ class BM25Index:
|
|||||||
def _schema_sql() -> str:
|
def _schema_sql() -> str:
|
||||||
return """
|
return """
|
||||||
CREATE TABLE chunks_meta (
|
CREATE TABLE chunks_meta (
|
||||||
rowid INTEGER PRIMARY KEY AUTOINCREMENT,
|
rowid INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||||
id TEXT UNIQUE NOT NULL,
|
id TEXT UNIQUE NOT NULL,
|
||||||
bundle_id TEXT,
|
source TEXT,
|
||||||
page_id TEXT,
|
vendor TEXT,
|
||||||
version TEXT,
|
brand TEXT,
|
||||||
platform TEXT,
|
crop TEXT,
|
||||||
product TEXT,
|
source_key TEXT,
|
||||||
ordinal INTEGER
|
ordinal INTEGER
|
||||||
);
|
);
|
||||||
CREATE INDEX idx_meta_version ON chunks_meta(version);
|
CREATE INDEX idx_meta_source ON chunks_meta(source);
|
||||||
CREATE INDEX idx_meta_platform ON chunks_meta(platform);
|
CREATE INDEX idx_meta_crop ON chunks_meta(crop);
|
||||||
CREATE INDEX idx_meta_bundle ON chunks_meta(bundle_id);
|
CREATE INDEX idx_meta_brand ON chunks_meta(brand);
|
||||||
|
CREATE INDEX idx_meta_source_key ON chunks_meta(source_key);
|
||||||
|
|
||||||
CREATE VIRTUAL TABLE chunks_fts USING fts5(
|
CREATE VIRTUAL TABLE chunks_fts USING fts5(
|
||||||
text,
|
text,
|
||||||
|
|||||||
+298
-100
@@ -1,126 +1,324 @@
|
|||||||
"""Markdown chunker — paragraph-aware, ~400-600 token target.
|
"""Chunker for seed-variety corpus.
|
||||||
|
|
||||||
Adjust the chunking strategy per product if your page format differs
|
Each variety becomes ONE chunk by default. Variety pages are small
|
||||||
significantly from prose. The output shape (id, text, metadata) is
|
(typically 2-3 KB of useful signal) and nomic-embed-text handles up
|
||||||
fixed by the downstream Chroma + BM25 indexing in rag/index.py — don't
|
to ~8 K tokens cleanly. Splitting a variety across chunks dilutes
|
||||||
change that.
|
the named-rating embeddings (e.g. "SCN resistance 7") that farmers
|
||||||
|
search by — keep them together.
|
||||||
|
|
||||||
The key knob you'll tune per product is chunk-0. Dense retrieval lands
|
The chunk text is a synthetic preamble assembled deterministically
|
||||||
on chunk 0 first for most queries. Make it a synthetic chunk built
|
from the sidecar JSON. Every value in the chunk text comes verbatim
|
||||||
from:
|
from the source. The framing words ("Disease ratings (1-9, 9=best):",
|
||||||
|
"Maturity group:", etc.) are template glue — *we add structure, we
|
||||||
|
do NOT add facts*. Given the same sidecar, this chunker always
|
||||||
|
produces the same chunk text. That's the anti-hallucination
|
||||||
|
contract: the retriever can never surface a rating value that
|
||||||
|
wasn't in the source.
|
||||||
|
|
||||||
- the page title (as natural-language H1)
|
Metadata is flattened to Chroma-safe primitives (str/int/float/bool):
|
||||||
- a 1-sentence task description (you'll have to generate this — for
|
|
||||||
pages that already have a "## Overview" or "## Introduction" the
|
|
||||||
first sentence usually works)
|
|
||||||
- a keyword bag of important terms (filenames, API names, error
|
|
||||||
codes — the rare technical tokens that BM25 lights up on)
|
|
||||||
|
|
||||||
Without a rich chunk 0, dense retrieval gets dominated by the much
|
source "bayer_seeds"
|
||||||
larger prose body, and short pages (script examples, reference cards)
|
source_key "dekalb-dkc075-70rib"
|
||||||
get buried.
|
vendor "Bayer"
|
||||||
|
brand "DEKALB"
|
||||||
|
crop "corn" | "soybeans" | "wheat"
|
||||||
|
product_name "DKC075-70RIB BRAND BLEND"
|
||||||
|
product_id canonical full id
|
||||||
|
source_url the variety's page URL
|
||||||
|
rm corn RM as int when parseable (else absent)
|
||||||
|
mg soy MG as float when parseable (else absent)
|
||||||
|
release_year int when known
|
||||||
|
trait_codes_csv comma-separated trait codes for substring search
|
||||||
|
rating_scale "1-9 (9 = best)" — chunker should ALWAYS attach
|
||||||
|
this so downstream code can detect a flip
|
||||||
|
ordinal chunk index within variety (0-based)
|
||||||
|
|
||||||
|
Lists like ``regional_recommendations`` and the full per-rating dicts
|
||||||
|
do NOT fit Chroma's metadata constraints — they stay in the sidecar
|
||||||
|
JSON, surfaced by ``get_page`` / ``lookup_variety``.
|
||||||
"""
|
"""
|
||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import json
|
||||||
import re
|
import re
|
||||||
|
from pathlib import Path
|
||||||
from typing import Iterator
|
from typing import Iterator
|
||||||
|
|
||||||
|
|
||||||
# Approximate token estimate from char count. Tunable — set per
|
# Rating-group classification. The source publishes characteristics
|
||||||
# embedder if the default 4 chars/token is wrong.
|
# grouped by label; we map those labels to one of three buckets in
|
||||||
CHARS_PER_TOKEN = 4
|
# the chunk preamble so retrieval gets coherent text. Group labels not
|
||||||
TARGET_TOKENS = 500
|
# listed here fall into "other" and are still emitted, just in their
|
||||||
TARGET_CHARS = TARGET_TOKENS * CHARS_PER_TOKEN
|
# own section.
|
||||||
|
DISEASE_GROUP_LABELS = {
|
||||||
|
"DISEASE RATINGS",
|
||||||
|
"PEST AND DISEASE RESISTANCE",
|
||||||
|
}
|
||||||
|
AGRONOMIC_GROUP_LABELS = {
|
||||||
|
"GROWTH",
|
||||||
|
"HARVEST",
|
||||||
|
"PRODUCTION",
|
||||||
|
"KEY CHARACTERISTICS",
|
||||||
|
"QUALITY",
|
||||||
|
}
|
||||||
|
MANAGEMENT_GROUP_LABELS = {
|
||||||
|
"MANAGEMENT",
|
||||||
|
"HERBICIDE",
|
||||||
|
"SENSITIVITY",
|
||||||
|
"PLANT DESCRIPTION",
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
def estimate_tokens(text: str) -> int:
|
def _parse_rm(value: object) -> int | None:
|
||||||
return max(1, len(text) // CHARS_PER_TOKEN)
|
"""Best-effort RM-days int. Returns None if not a clean integer
|
||||||
|
(e.g. wheat's qualitative 'Early'/'Medium-Early' values)."""
|
||||||
|
if value is None:
|
||||||
|
return None
|
||||||
|
s = str(value).strip()
|
||||||
|
if not s:
|
||||||
|
return None
|
||||||
|
try:
|
||||||
|
# Handle floats stored as strings ("105.0") and the trailing
|
||||||
|
# tenths sometimes seen on early corn ("75").
|
||||||
|
return int(float(s))
|
||||||
|
except ValueError:
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
def split_paragraphs(md: str) -> list[str]:
|
def _parse_mg(value: object) -> float | None:
|
||||||
"""Split markdown into paragraph-ish blocks.
|
"""Best-effort MG float. Soy MGs go from 00 to 9.0 with one decimal."""
|
||||||
|
if value is None:
|
||||||
|
return None
|
||||||
|
s = str(value).strip()
|
||||||
|
if not s:
|
||||||
|
return None
|
||||||
|
try:
|
||||||
|
return float(s)
|
||||||
|
except ValueError:
|
||||||
|
return None
|
||||||
|
|
||||||
Keeps fenced code blocks together (don't slice through ```).
|
|
||||||
Headings start new paragraphs.
|
def _format_items(items: list[dict]) -> str:
|
||||||
|
"""Render `[{characteristic, value}, ...]` to a compact inline list."""
|
||||||
|
out: list[str] = []
|
||||||
|
for it in items:
|
||||||
|
ch = (it.get("characteristic") or "").strip()
|
||||||
|
v = (it.get("value") or "").strip()
|
||||||
|
if ch and v:
|
||||||
|
out.append(f"{ch} {v}")
|
||||||
|
elif ch:
|
||||||
|
out.append(f"{ch} —")
|
||||||
|
return ", ".join(out)
|
||||||
|
|
||||||
|
|
||||||
|
def _render_variety_chunk(sidecar: dict) -> str:
|
||||||
|
"""Build the dense preamble for one variety from its sidecar JSON.
|
||||||
|
|
||||||
|
Faithful to source: every numeric/categorical *value* is verbatim
|
||||||
|
from ``sidecar``. The only generated text is the framing language.
|
||||||
"""
|
"""
|
||||||
blocks: list[str] = []
|
lines: list[str] = []
|
||||||
current: list[str] = []
|
|
||||||
in_fence = False
|
# ---- Identity line --------------------------------------------------
|
||||||
for line in md.splitlines(keepends=True):
|
name = sidecar.get("product_name") or sidecar.get("source_key") or ""
|
||||||
stripped = line.strip()
|
brand = (sidecar.get("brand") or "").strip()
|
||||||
if stripped.startswith("```"):
|
vendor = sidecar.get("vendor") or ""
|
||||||
in_fence = not in_fence
|
crop = (sidecar.get("crop") or "").strip()
|
||||||
current.append(line)
|
crop_label = crop.capitalize() if crop else ""
|
||||||
|
ident = f"# {name}"
|
||||||
|
sub = " ".join(filter(None, [
|
||||||
|
f"({brand.title()} {crop_label} variety, {vendor})" if brand and crop_label and vendor else "",
|
||||||
|
]))
|
||||||
|
lines.append(ident)
|
||||||
|
if sub:
|
||||||
|
lines.append("")
|
||||||
|
lines.append(sub)
|
||||||
|
|
||||||
|
# ---- Identity body --------------------------------------------------
|
||||||
|
facts: list[str] = []
|
||||||
|
|
||||||
|
rm = sidecar.get("relative_maturity")
|
||||||
|
mg = sidecar.get("maturity_group")
|
||||||
|
wc = sidecar.get("wheat_class")
|
||||||
|
if crop == "corn" and rm:
|
||||||
|
facts.append(f"Relative maturity {rm}")
|
||||||
|
elif crop == "soybeans" and mg:
|
||||||
|
facts.append(f"Maturity group {mg}")
|
||||||
|
elif crop == "wheat":
|
||||||
|
if rm:
|
||||||
|
facts.append(f"Maturity {rm}")
|
||||||
|
if wc:
|
||||||
|
facts.append(f"Wheat class {wc}")
|
||||||
|
|
||||||
|
traits = sidecar.get("trait_stack") or []
|
||||||
|
trait_descs = sidecar.get("trait_descriptions") or []
|
||||||
|
if traits:
|
||||||
|
if trait_descs:
|
||||||
|
facts.append(
|
||||||
|
"Trait stack: "
|
||||||
|
+ ", ".join(traits)
|
||||||
|
+ " ("
|
||||||
|
+ "; ".join(trait_descs)
|
||||||
|
+ ")"
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
facts.append("Trait stack: " + ", ".join(traits))
|
||||||
|
|
||||||
|
if sidecar.get("release_year"):
|
||||||
|
facts.append(f"Released {sidecar['release_year']}")
|
||||||
|
|
||||||
|
if facts:
|
||||||
|
lines.append("")
|
||||||
|
lines.append(". ".join(facts) + ".")
|
||||||
|
|
||||||
|
# ---- Positioning ----------------------------------------------------
|
||||||
|
pos = (sidecar.get("positioning_statement") or "").strip()
|
||||||
|
if pos:
|
||||||
|
lines.append("")
|
||||||
|
lines.append(f"Positioning: {pos}")
|
||||||
|
|
||||||
|
# ---- Ratings, bucketed for retrieval --------------------------------
|
||||||
|
scale = sidecar.get("_scale_direction") or "(scale direction not declared)"
|
||||||
|
groups = sidecar.get("characteristics_groups") or []
|
||||||
|
disease: list[dict] = []
|
||||||
|
agronomic: list[dict] = []
|
||||||
|
management: list[dict] = []
|
||||||
|
other: list[tuple[str, list[dict]]] = []
|
||||||
|
for g in groups:
|
||||||
|
label = (g.get("label") or "").upper().strip()
|
||||||
|
items = g.get("items") or []
|
||||||
|
if not items:
|
||||||
continue
|
continue
|
||||||
if in_fence:
|
if label in DISEASE_GROUP_LABELS:
|
||||||
current.append(line)
|
disease.extend(items)
|
||||||
continue
|
elif label in AGRONOMIC_GROUP_LABELS:
|
||||||
if stripped.startswith("#"):
|
agronomic.extend(items)
|
||||||
if current:
|
elif label in MANAGEMENT_GROUP_LABELS:
|
||||||
blocks.append("".join(current).strip())
|
management.extend(items)
|
||||||
current = []
|
else:
|
||||||
current.append(line)
|
other.append((g.get("label") or "Other", items))
|
||||||
continue
|
|
||||||
if not stripped and current and not "".join(current).strip().endswith("\n\n"):
|
if disease:
|
||||||
current.append(line)
|
lines.append("")
|
||||||
blocks.append("".join(current).strip())
|
lines.append(f"Disease ratings ({scale}): {_format_items(disease)}.")
|
||||||
current = []
|
if agronomic:
|
||||||
continue
|
lines.append("")
|
||||||
current.append(line)
|
lines.append(f"Agronomic ratings ({scale}): {_format_items(agronomic)}.")
|
||||||
if current:
|
if management:
|
||||||
blocks.append("".join(current).strip())
|
lines.append("")
|
||||||
return [b for b in blocks if b]
|
lines.append(f"Management notes: {_format_items(management)}.")
|
||||||
|
for label, items in other:
|
||||||
|
lines.append("")
|
||||||
|
lines.append(f"{label.title()}: {_format_items(items)}.")
|
||||||
|
|
||||||
|
# ---- Strengths narrative --------------------------------------------
|
||||||
|
strengths = sidecar.get("strengths") or []
|
||||||
|
if strengths:
|
||||||
|
lines.append("")
|
||||||
|
lines.append("Strengths and management notes:")
|
||||||
|
for s in strengths:
|
||||||
|
s = (s or "").strip()
|
||||||
|
if s:
|
||||||
|
lines.append(f"- {s}")
|
||||||
|
|
||||||
|
# ---- Regional listings (compact, not the agronomist emails) ---------
|
||||||
|
rec = sidecar.get("regional_recommendations") or []
|
||||||
|
if rec:
|
||||||
|
names = sorted({
|
||||||
|
(r.get("product_list_name") or "").strip()
|
||||||
|
for r in rec
|
||||||
|
if (r.get("product_list_name") or "").strip()
|
||||||
|
})
|
||||||
|
if names:
|
||||||
|
lines.append("")
|
||||||
|
lines.append("Listed in regional seed guides: " + "; ".join(names) + ".")
|
||||||
|
|
||||||
|
# ---- Provenance footer (must always be in the chunk text so it
|
||||||
|
# can never be lost between retrieval and LLM rendering) --------
|
||||||
|
urls = sidecar.get("source_urls") or []
|
||||||
|
if urls:
|
||||||
|
lines.append("")
|
||||||
|
lines.append(f"Source: {urls[0]}")
|
||||||
|
|
||||||
|
return "\n".join(lines).strip() + "\n"
|
||||||
|
|
||||||
|
|
||||||
def chunks_from_page(
|
def _flat_metadata(sidecar: dict) -> dict:
|
||||||
text: str,
|
"""Distil sidecar into Chroma-safe metadata (primitives only)."""
|
||||||
page_id: str,
|
md: dict = {
|
||||||
metadata: dict,
|
"source": sidecar.get("source") or "",
|
||||||
|
"source_key": sidecar.get("source_key") or "",
|
||||||
|
"vendor": sidecar.get("vendor") or "",
|
||||||
|
"brand": sidecar.get("brand") or "",
|
||||||
|
"crop": sidecar.get("crop") or "",
|
||||||
|
"product_name": sidecar.get("product_name") or "",
|
||||||
|
"product_id": sidecar.get("product_id") or "",
|
||||||
|
"source_url": (sidecar.get("source_urls") or [""])[0],
|
||||||
|
"rating_scale": sidecar.get("_scale_direction") or "",
|
||||||
|
}
|
||||||
|
rm = _parse_rm(sidecar.get("relative_maturity"))
|
||||||
|
mg = _parse_mg(sidecar.get("maturity_group"))
|
||||||
|
if rm is not None:
|
||||||
|
md["rm"] = rm
|
||||||
|
if mg is not None:
|
||||||
|
md["mg"] = mg
|
||||||
|
ry = sidecar.get("release_year")
|
||||||
|
if isinstance(ry, int):
|
||||||
|
md["release_year"] = ry
|
||||||
|
traits = sidecar.get("trait_stack") or []
|
||||||
|
if traits:
|
||||||
|
# Comma-delimited for partial-match / human eyeballing.
|
||||||
|
# Bracket-padded so `LIKE '%,XF,%'` finds whole tokens.
|
||||||
|
md["trait_codes_csv"] = "," + ",".join(traits) + ","
|
||||||
|
if sidecar.get("wheat_class"):
|
||||||
|
md["wheat_class"] = sidecar["wheat_class"]
|
||||||
|
return md
|
||||||
|
|
||||||
|
|
||||||
|
def chunks_from_variety(
|
||||||
|
sidecar_path: Path | str,
|
||||||
|
*,
|
||||||
|
md_path: Path | str | None = None,
|
||||||
) -> Iterator[dict]:
|
) -> Iterator[dict]:
|
||||||
"""Yield chunk dicts ready for index.py to upsert.
|
"""Yield chunk dict(s) for one variety. Currently emits exactly one.
|
||||||
|
|
||||||
The synthetic chunk 0 is the per-product customization point. The
|
Args:
|
||||||
default below is a simple title + body-first-paragraph; rewrite
|
sidecar_path: path to the variety's JSON sidecar.
|
||||||
for richer retrieval signal (see module docstring).
|
md_path: ignored (the chunker rebuilds from sidecar); kept
|
||||||
|
in the signature in case a future split-chunker
|
||||||
|
wants the rendered body.
|
||||||
"""
|
"""
|
||||||
paragraphs = split_paragraphs(text)
|
sidecar = json.loads(Path(sidecar_path).read_text(encoding="utf-8"))
|
||||||
if not paragraphs:
|
text = _render_variety_chunk(sidecar)
|
||||||
return
|
meta = _flat_metadata(sidecar)
|
||||||
|
chunk_id = f"{meta['source']}::{meta['source_key']}::0"
|
||||||
# ----- Chunk 0: synthetic anchor for dense retrieval ---------
|
|
||||||
title = metadata.get("title") or page_id
|
|
||||||
first_para = next((p for p in paragraphs if not p.startswith("#")), "")
|
|
||||||
chunk0_body = (
|
|
||||||
f"# {title}\n\n"
|
|
||||||
f"{first_para[:300]}"
|
|
||||||
# TODO per product: append a keyword bag here (filenames,
|
|
||||||
# API names, error codes) for BM25 + dense joint coverage.
|
|
||||||
)
|
|
||||||
yield {
|
yield {
|
||||||
"id": f"{metadata['bundle_id']}::{page_id}::0",
|
"id": chunk_id,
|
||||||
"text": chunk0_body,
|
"text": text,
|
||||||
"metadata": {**metadata, "ordinal": 0},
|
"metadata": {**meta, "ordinal": 0},
|
||||||
}
|
}
|
||||||
|
|
||||||
# ----- Body chunks: pack paragraphs up to TARGET_CHARS -------
|
|
||||||
ordinal = 1
|
# ----- Backwards-compat shim for the template's index.py -------------------
|
||||||
buf: list[str] = []
|
#
|
||||||
buf_chars = 0
|
# The template's ``rag.index.page_records`` calls
|
||||||
for p in paragraphs:
|
# ``chunks_from_page(md, page_id, base_meta)`` which doesn't know about
|
||||||
if buf_chars + len(p) > TARGET_CHARS and buf:
|
# sidecar JSON. We accept that signature but ignore it — index.py has
|
||||||
yield {
|
# been updated to use ``chunks_from_variety`` directly, and this shim
|
||||||
"id": f"{metadata['bundle_id']}::{page_id}::{ordinal}",
|
# is here only so a stray import of the old name doesn't break.
|
||||||
"text": "\n\n".join(buf),
|
#
|
||||||
"metadata": {**metadata, "ordinal": ordinal},
|
def chunks_from_page(text: str, page_id: str, metadata: dict) -> Iterator[dict]:
|
||||||
}
|
"""Deprecated for seed-mcp; prefer ``chunks_from_variety``."""
|
||||||
ordinal += 1
|
# Best-effort: if metadata carries a sidecar_path, dispatch.
|
||||||
buf = []
|
sidecar_path = metadata.get("_sidecar_path")
|
||||||
buf_chars = 0
|
if sidecar_path:
|
||||||
buf.append(p)
|
yield from chunks_from_variety(sidecar_path)
|
||||||
buf_chars += len(p)
|
return
|
||||||
if buf:
|
# Fallback — emit a single chunk of the raw markdown with whatever
|
||||||
yield {
|
# metadata we have. Better than crashing if someone calls this.
|
||||||
"id": f"{metadata['bundle_id']}::{page_id}::{ordinal}",
|
chunk_id = f"{metadata.get('source','unknown')}::{page_id}::0"
|
||||||
"text": "\n\n".join(buf),
|
yield {
|
||||||
"metadata": {**metadata, "ordinal": ordinal},
|
"id": chunk_id,
|
||||||
}
|
"text": text,
|
||||||
|
"metadata": {**metadata, "ordinal": 0},
|
||||||
|
}
|
||||||
|
|||||||
+25
-38
@@ -1,15 +1,19 @@
|
|||||||
"""Build Chroma (and optionally BM25) indexes from corpus on disk.
|
"""Build Chroma (and BM25) indexes from the seed corpus on disk.
|
||||||
|
|
||||||
Reads `corpus/<bundle>/<page>.{md,json}`, chunks each page, upserts
|
Reads ``corpus/<source>/<source_key>.json`` sidecars, chunks each
|
||||||
into Chroma. With --rebuild, drops + recreates the collection (clean
|
variety via ``rag.chunk.chunks_from_variety``, upserts into Chroma.
|
||||||
state). With --bm25-only, skips Chroma and rebuilds only the FTS5
|
With ``--rebuild``, drops + recreates the collection (clean state).
|
||||||
index — useful for fast iteration when chunking didn't change.
|
With ``--bm25-only``, skips Chroma and rebuilds only the FTS5 index
|
||||||
|
— useful for fast iteration when the chunker didn't change.
|
||||||
|
|
||||||
|
Collection name is ``<PRODUCT_NAME>_docs`` (default: ``crop_seed_docs``).
|
||||||
|
Override via the PRODUCT_NAME env var.
|
||||||
"""
|
"""
|
||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
|
|
||||||
import argparse
|
import argparse
|
||||||
import json
|
|
||||||
import logging
|
import logging
|
||||||
|
import os
|
||||||
import time
|
import time
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
from typing import Iterator
|
from typing import Iterator
|
||||||
@@ -17,7 +21,7 @@ from typing import Iterator
|
|||||||
import chromadb
|
import chromadb
|
||||||
from chromadb.config import Settings
|
from chromadb.config import Settings
|
||||||
|
|
||||||
from .chunk import chunks_from_page
|
from .chunk import chunks_from_variety
|
||||||
from .embeddings import embedding_function
|
from .embeddings import embedding_function
|
||||||
|
|
||||||
log = logging.getLogger(__name__)
|
log = logging.getLogger(__name__)
|
||||||
@@ -27,39 +31,21 @@ ROOT = Path(__file__).resolve().parent.parent
|
|||||||
CORPUS = ROOT / "corpus"
|
CORPUS = ROOT / "corpus"
|
||||||
CHROMA_DIR = ROOT / "chroma"
|
CHROMA_DIR = ROOT / "chroma"
|
||||||
|
|
||||||
# Collection name — convention: <product>_docs. Override via env if needed.
|
PRODUCT_NAME = os.environ.get("PRODUCT_NAME", "crop_seed")
|
||||||
import os
|
|
||||||
PRODUCT_NAME = os.environ.get("PRODUCT_NAME", "myproduct")
|
|
||||||
COLLECTION = f"{PRODUCT_NAME}_docs"
|
COLLECTION = f"{PRODUCT_NAME}_docs"
|
||||||
|
|
||||||
|
|
||||||
def page_records() -> Iterator[dict]:
|
def variety_records() -> Iterator[dict]:
|
||||||
"""Walk corpus/, yield chunks for every page."""
|
"""Walk ``corpus/<source>/<source_key>.json``, yield one chunk per
|
||||||
|
variety."""
|
||||||
if not CORPUS.exists():
|
if not CORPUS.exists():
|
||||||
log.error("corpus/ doesn't exist; run the scraper first")
|
log.error("corpus/ doesn't exist; run a scraper first")
|
||||||
return
|
return
|
||||||
for bundle_dir in sorted(CORPUS.iterdir()):
|
for source_dir in sorted(CORPUS.iterdir()):
|
||||||
if not bundle_dir.is_dir() or bundle_dir.name.startswith("."):
|
if not source_dir.is_dir() or source_dir.name.startswith("."):
|
||||||
continue
|
continue
|
||||||
for md_path in sorted(bundle_dir.glob("*.md")):
|
for sidecar_path in sorted(source_dir.glob("*.json")):
|
||||||
page_id = md_path.stem
|
yield from chunks_from_variety(sidecar_path)
|
||||||
sidecar = md_path.with_suffix(".json")
|
|
||||||
if not sidecar.exists():
|
|
||||||
log.warning("skipping %s — no JSON sidecar", md_path)
|
|
||||||
continue
|
|
||||||
md = md_path.read_text()
|
|
||||||
meta = json.loads(sidecar.read_text())
|
|
||||||
# Surface common filter fields at the chunk-metadata level
|
|
||||||
# so Chroma's `where` filter can use them.
|
|
||||||
base_meta = {
|
|
||||||
"bundle_id": bundle_dir.name,
|
|
||||||
"page_id": page_id,
|
|
||||||
"title": meta.get("title") or "",
|
|
||||||
"version": meta.get("version") or "",
|
|
||||||
"platform": meta.get("platform") or "",
|
|
||||||
"product": meta.get("product") or "",
|
|
||||||
}
|
|
||||||
yield from chunks_from_page(md, page_id, base_meta)
|
|
||||||
|
|
||||||
|
|
||||||
def upsert_to_chroma(records: list[dict]) -> int:
|
def upsert_to_chroma(records: list[dict]) -> int:
|
||||||
@@ -67,7 +53,7 @@ def upsert_to_chroma(records: list[dict]) -> int:
|
|||||||
path=str(CHROMA_DIR),
|
path=str(CHROMA_DIR),
|
||||||
settings=Settings(anonymized_telemetry=False),
|
settings=Settings(anonymized_telemetry=False),
|
||||||
)
|
)
|
||||||
# Drop + recreate for --rebuild semantics
|
# Drop + recreate for --rebuild semantics.
|
||||||
try:
|
try:
|
||||||
client.delete_collection(COLLECTION)
|
client.delete_collection(COLLECTION)
|
||||||
except Exception:
|
except Exception:
|
||||||
@@ -101,8 +87,11 @@ def main() -> int:
|
|||||||
|
|
||||||
log.info("reading corpus from %s", CORPUS)
|
log.info("reading corpus from %s", CORPUS)
|
||||||
t0 = time.time()
|
t0 = time.time()
|
||||||
records = list(page_records())
|
records = list(variety_records())
|
||||||
log.info("loaded %d chunks in %.1fs", len(records), time.time() - t0)
|
log.info("loaded %d chunks in %.1fs", len(records), time.time() - t0)
|
||||||
|
if not records:
|
||||||
|
log.error("no chunks — is corpus/ populated?")
|
||||||
|
return 1
|
||||||
|
|
||||||
if args.bm25_only:
|
if args.bm25_only:
|
||||||
from .bm25 import BM25Index
|
from .bm25 import BM25Index
|
||||||
@@ -118,8 +107,6 @@ def main() -> int:
|
|||||||
n = upsert_to_chroma(records)
|
n = upsert_to_chroma(records)
|
||||||
log.info("chroma: %d chunks in %.1fs", n, time.time() - t_c)
|
log.info("chroma: %d chunks in %.1fs", n, time.time() - t_c)
|
||||||
|
|
||||||
# Build BM25 too — see PLAN.md Phase 8. Safe to remove this block
|
|
||||||
# for products that don't need hybrid retrieval.
|
|
||||||
try:
|
try:
|
||||||
from .bm25 import BM25Index
|
from .bm25 import BM25Index
|
||||||
t_b = time.time()
|
t_b = time.time()
|
||||||
|
|||||||
Reference in New Issue
Block a user