diff --git a/docs_mcp/server.py b/docs_mcp/server.py index 3dfda4e4..cce034bb 100644 --- a/docs_mcp/server.py +++ b/docs_mcp/server.py @@ -1,20 +1,20 @@ -"""MCP server skeleton — fill in PRODUCT_NAME and the tool bodies. +"""seed-mcp — MCP server over public US row-crop seed catalogs. -This file is the template's structural anchor. The phases described in -PLAN.md add or extend pieces of this file: +Tools (all read-only): - Phase 3 — search_docs, get_page, list_versions stubs (you are here) - Phase 6 — reranker integration in search_docs - Phase 8 — BM25 + hybrid retrieval (HYBRID_SEARCH env gate, _rrf_fuse) - Phase 9 — diff_versions, list_cluster, bundle_changelog - Phase 10 — TimedCall wiring (already imported below) - Phase 11 — _api_lessons tool - Phase 12 — find_doc_inconsistencies, submit_doc_bug - Phase 13 — weekly_digest + _digest_history reader + search_docs natural-language retrieval over the corpus + get_page the full markdown body of one variety + sidecar + list_versions facet discovery (crops / brands / vendors / sources) + lookup_variety canonical sidecar JSON by source_key — fact-check + anything you surface from search_docs against this -Every stub below has a docstring + `raise NotImplementedError`. Replace -the body when you reach the corresponding phase. Keep the signatures -stable across products — clients depend on them. +The contract with the calling agent is **never fabricate**. Every chunk +we return is verbatim from the source's published catalog (the chunker +rebuilds chunks deterministically from the sidecar JSON). Every +response carries the variety's source URL so the agent can cite. The +``lookup_variety`` tool exists specifically so the agent can validate +specific rating values without having to trust paraphrased retrieval +text. """ from __future__ import annotations @@ -23,7 +23,7 @@ import logging import os import re from pathlib import Path -from typing import Annotated +from typing import Annotated, Any from mcp.server.fastmcp import FastMCP from pydantic import Field @@ -33,7 +33,7 @@ from .usage import TimedCall log = logging.getLogger(__name__) # --------------------------------------------------------------------------- -# Product-specific configuration. Set these for each new build. +# Product-specific configuration. # --------------------------------------------------------------------------- PRODUCT_NAME = os.environ.get("PRODUCT_NAME", "crop_seed") PRODUCT_DOCS_URL = os.environ.get("PRODUCT_DOCS_URL", "https://git.jpaul.io/justin/seed-mcp") @@ -44,59 +44,176 @@ ROOT = Path(__file__).resolve().parent.parent CORPUS = ROOT / "corpus" CHROMA_DIR = ROOT / "chroma" BM25_DB = Path(os.environ.get("BM25_DB", str(ROOT / "bm25" / f"{PRODUCT_NAME}_docs.db"))) -BUNDLES_JSON = ROOT / "bundles.json" # --------------------------------------------------------------------------- -# Feature flags (Phase 6 / 8 / 12 enable these as you ship each phase). +# Feature flags. # --------------------------------------------------------------------------- RERANK_URL = os.environ.get("RERANK_URL", "").rstrip("/") or None RERANK_POOL = int(os.environ.get("RERANK_POOL", "50")) RERANK_TIMEOUT = float(os.environ.get("RERANK_TIMEOUT", "30")) -HYBRID_SEARCH = os.environ.get("HYBRID_SEARCH", "").lower() in ("true", "1", "yes", "on") +HYBRID_SEARCH = os.environ.get("HYBRID_SEARCH", "true").lower() in ("true", "1", "yes", "on") RRF_K = int(os.environ.get("RRF_K", "60")) -DOC_BUG_SUBMIT_ENABLED = os.environ.get("DOC_BUG_SUBMIT_ENABLED", "").lower() in ("true", "1", "yes", "on") -DOC_BUG_API_URL = os.environ.get("DOC_BUG_API_URL", "") # product-specific endpoint -DOC_BUG_TIMEOUT = float(os.environ.get("DOC_BUG_TIMEOUT", "15")) - # --------------------------------------------------------------------------- # FastMCP setup. -# -# stateless_http=True — every request creates an ephemeral session and -# discards it on return. Critical for production: clients don't get -# 404 storms when the container is recreated by Watchtower. # --------------------------------------------------------------------------- mcp = FastMCP(f"{PRODUCT_NAME}-docs", stateless_http=True) # --------------------------------------------------------------------------- -# Lazy helpers — instantiate expensive things only when actually needed, -# so the server still starts when (e.g.) Ollama is briefly unreachable. +# Lazy singletons. Instantiate on first use so the server can start even +# when (e.g.) Ollama is briefly unreachable — degraded modes fall back +# gracefully rather than refusing to boot. +# --------------------------------------------------------------------------- +_chroma_client = None +_chroma_collection = None +_bm25 = None +_code_index: dict[str, list[tuple[str, str]]] | None = None + + +def _build_code_index() -> dict[str, list[tuple[str, str]]]: + """Walk sidecars once and index varieties by every lookup key a + farmer or LLM might paste: ``source_key``, ``hybrid_prefix``, + ``product_name`` (normalized), each token from + ``hybrid_prefix``/``product_name`` longer than three chars. + + Returns a dict mapping the normalized key → list of + ``(source, source_key)`` tuples. Multiple-source matches are kept + so we can warn the agent about ambiguity. + """ + idx: dict[str, list[tuple[str, str]]] = {} + + def _add(key: str, value: tuple[str, str]) -> None: + key = (key or "").lower().strip() + if not key or len(key) < 4: + return + idx.setdefault(key, []) + if value not in idx[key]: + idx[key].append(value) + + if not CORPUS.exists(): + return idx + for source_dir in CORPUS.iterdir(): + if not source_dir.is_dir() or source_dir.name.startswith("."): + continue + for sc in source_dir.glob("*.json"): + try: + d = json.loads(sc.read_text(encoding="utf-8")) + except (OSError, json.JSONDecodeError): + continue + source = d.get("source") or source_dir.name + sk = d.get("source_key") or sc.stem + value = (source, sk) + _add(sk, value) + _add(d.get("hybrid_prefix") or "", value) + _add(d.get("product_name") or "", value) + # Token-split product_name and hybrid_prefix so "DKC62-08RIB" + # in a query matches "DKC62-08RIB BRAND BLEND" via the + # "DKC62-08RIB" token as well as the full string. + for source_field in (d.get("product_name"), d.get("hybrid_prefix")): + if not source_field: + continue + for tok in re.split(r"[\s()/,]+", source_field): + tok = tok.strip() + # Only retain tokens that LOOK like codes — at least + # one digit, mostly alphanumeric/dash. Skips "BRAND" + # / "BLEND" / etc. + if re.match(r"^[A-Za-z]*\d[\w\-]*$", tok): + _add(tok, value) + return idx + + +def _code_lookup() -> dict[str, list[tuple[str, str]]]: + global _code_index + if _code_index is None: + _code_index = _build_code_index() + log.info("code-index: %d lookup keys built", len(_code_index)) + return _code_index + + +# Tokens that LOOK like a variety code — at least one digit, otherwise +# alphanumeric / dash. Catches "DKC62-08RIB", "AG29XF4", "WB1376CLP", +# "VT2PRIB"; skips ordinary words like "ratings". +_CODE_TOKEN_RE = re.compile(r"\b([A-Za-z][A-Za-z0-9\-]{2,})\b") + + +def _exact_code_matches(query: str) -> list[tuple[str, str]]: + """Find varieties whose source_key / product_name / token-split + components contain a query token. Used as a high-confidence pin + before fuzzy retrieval.""" + idx = _code_lookup() + if not idx: + return [] + out: list[tuple[str, str]] = [] + seen: set[tuple[str, str]] = set() + for m in _CODE_TOKEN_RE.finditer(query or ""): + tok = m.group(1).lower() + if len(tok) < 4 or not any(c.isdigit() for c in tok): + continue + for v in idx.get(tok, []): + if v not in seen: + seen.add(v) + out.append(v) + return out + + +def _collection(): + """Return the Chroma collection, opening it lazily.""" + global _chroma_client, _chroma_collection + if _chroma_collection is not None: + return _chroma_collection + import chromadb + from chromadb.config import Settings + from rag.embeddings import embedding_function + + _chroma_client = chromadb.PersistentClient( + path=str(CHROMA_DIR), + settings=Settings(anonymized_telemetry=False), + ) + _chroma_collection = _chroma_client.get_collection( + COLLECTION, embedding_function=embedding_function() + ) + return _chroma_collection + + +def _bm25_index(): + """Return the BM25 index, or None if it doesn't exist on disk.""" + global _bm25 + if _bm25 is not None: + return _bm25 + from rag.bm25 import BM25Index + idx = BM25Index(BM25_DB) + if not idx.exists(): + return None + _bm25 = idx + return _bm25 + + +# --------------------------------------------------------------------------- +# Helpers — local file IO + filter builder. # --------------------------------------------------------------------------- -def _bundles() -> dict[str, dict]: - """Cached load of bundles.json into a {slug: bundle_dict} mapping. - - bundles.json is the product-specific catalog written by the Phase 1 - scraper. See PLAN.md Phase 1 for the schema. - """ - if not BUNDLES_JSON.exists(): - return {} - cat = json.loads(BUNDLES_JSON.read_text()) - return {b["slug"]: b for b in cat} - - -def _build_where(version: str | None, platform: str | None, bundle_id: str | None) -> dict | None: +def _build_where( + crop: str | None, + brand: str | None, + vendor: str | None, + source: str | None, + source_key: str | None, +) -> dict | None: """Translate filter args into a Chroma `where` clause.""" conds: list[dict] = [] - if version: - conds.append({"version": version}) - if platform: - conds.append({"platform": platform}) - if bundle_id: - conds.append({"bundle_id": bundle_id}) + if crop: + conds.append({"crop": crop.lower()}) + if brand: + conds.append({"brand": brand.upper()}) + if vendor: + conds.append({"vendor": vendor}) + if source: + conds.append({"source": source}) + if source_key: + conds.append({"source_key": source_key}) if not conds: return None if len(conds) == 1: @@ -104,13 +221,152 @@ def _build_where(version: str | None, platform: str | None, bundle_id: str | Non return {"$and": conds} -def _read_page(bundle_id: str, page_id: str) -> tuple[str, dict] | None: - """Read a corpus page off disk. Returns (markdown_body, metadata_dict).""" - md_path = CORPUS / bundle_id / (page_id + ".md") - json_path = CORPUS / bundle_id / (page_id + ".json") - if not md_path.exists() or not json_path.exists(): +def _read_sidecar(source: str, source_key: str) -> dict | None: + """Read a variety's sidecar JSON off disk.""" + path = CORPUS / source / f"{source_key}.json" + if not path.exists(): return None - return md_path.read_text(), json.loads(json_path.read_text()) + try: + return json.loads(path.read_text(encoding="utf-8")) + except (OSError, json.JSONDecodeError) as exc: + log.warning("sidecar read failed for %s/%s: %s", source, source_key, exc) + return None + + +def _read_markdown(source: str, source_key: str) -> str | None: + path = CORPUS / source / f"{source_key}.md" + if not path.exists(): + return None + try: + return path.read_text(encoding="utf-8") + except OSError: + return None + + +def _format_hit(doc: str, meta: dict, distance: float | None = None) -> str: + """Render one retrieval hit as a fenced markdown block with full + provenance attached. ``doc`` is the chunk text; ``meta`` is the + chunk's metadata dict.""" + src_url = meta.get("source_url") or "" + src_key = meta.get("source_key") or "" + src = meta.get("source") or "" + vendor = meta.get("vendor") or "" + brand = meta.get("brand") or "" + crop = meta.get("crop") or "" + name = meta.get("product_name") or src_key + + header = ( + f"### {name} \n" + f"`{src}::{src_key}` — {vendor} / {brand} / {crop} \n" + f"<{src_url}>" + ) + if distance is not None: + header += f" \n_(distance={distance:.4f})_" + return f"{header}\n\n{doc.strip()}\n" + + +def _rrf_fuse(rankings: list[list[str]], k: int = RRF_K) -> list[str]: + """Reciprocal Rank Fusion — merge multiple ranked id lists into one. + + Score(id) = sum over each ranking R of 1 / (k + rank_R(id)). + Robust to score-scale differences between dense (cosine) and lexical + (BM25). k=60 is the literature default; not particularly sensitive. + """ + scores: dict[str, float] = {} + for ranking in rankings: + for rank, doc_id in enumerate(ranking): + scores[doc_id] = scores.get(doc_id, 0.0) + 1.0 / (k + rank + 1) + return sorted(scores, key=lambda d: scores[d], reverse=True) + + +def _structured_ratings_block(sidecar: dict) -> str: + """Render the sidecar's grouped characteristics + identity as a + fact-checkable block, with the source URL pinned at top. + + This is the response body for ``get_page`` and the embedded + "canonical data" block agents should trust over any free-text + paraphrase.""" + lines: list[str] = [] + name = sidecar.get("product_name") or sidecar.get("source_key", "") + lines.append(f"# {name}") + lines.append("") + + facets: list[str] = [] + if sidecar.get("vendor"): + facets.append(f"**Vendor:** {sidecar['vendor']}") + if sidecar.get("brand"): + facets.append(f"**Brand:** {str(sidecar['brand']).title()}") + if sidecar.get("crop"): + facets.append(f"**Crop:** {str(sidecar['crop']).capitalize()}") + if sidecar.get("relative_maturity") and sidecar.get("crop") == "corn": + facets.append(f"**Relative maturity:** {sidecar['relative_maturity']}") + if sidecar.get("maturity_group") and sidecar.get("crop") == "soybeans": + facets.append(f"**Maturity group:** {sidecar['maturity_group']}") + if sidecar.get("relative_maturity") and sidecar.get("crop") == "wheat": + facets.append(f"**Maturity:** {sidecar['relative_maturity']}") + if sidecar.get("wheat_class"): + facets.append(f"**Wheat class:** {sidecar['wheat_class']}") + if sidecar.get("release_year"): + facets.append(f"**Released:** {sidecar['release_year']}") + if sidecar.get("trait_stack"): + facets.append(f"**Trait stack:** {', '.join(sidecar['trait_stack'])}") + if facets: + lines.append(" · ".join(facets)) + lines.append("") + + urls = sidecar.get("source_urls") or [] + if urls: + lines.append(f"**Source:** {urls[0]}") + lines.append("") + + scale = sidecar.get("_scale_direction") or "scale direction not declared by source" + lines.append(f"**Rating scale (as published):** {scale}") + lines.append("") + + # Canonical ratings — one table per group, values verbatim. + for g in sidecar.get("characteristics_groups") or []: + label = g.get("label") or "Characteristics" + items = g.get("items") or [] + if not items: + continue + lines.append(f"## {label.title()}") + lines.append("") + lines.append("| Characteristic | Value |") + lines.append("|---|---|") + for it in items: + ch = (it.get("characteristic") or "").strip() + v = (it.get("value") or "").strip() + lines.append(f"| {ch} | {v} |") + lines.append("") + + if sidecar.get("positioning_statement"): + lines.append("## Vendor positioning") + lines.append("") + lines.append(sidecar["positioning_statement"].strip()) + lines.append("") + + if sidecar.get("strengths"): + lines.append("## Strengths / management notes (vendor copy)") + lines.append("") + for s in sidecar["strengths"]: + lines.append(f"- {s}") + lines.append("") + + rec = sidecar.get("regional_recommendations") or [] + if rec: + names = sorted({ + (r.get("product_list_name") or "").strip() + for r in rec + if (r.get("product_list_name") or "").strip() + }) + if names: + lines.append("## Listed in vendor regional seed guides") + lines.append("") + for n in names: + lines.append(f"- {n}") + lines.append("") + + return "\n".join(lines).rstrip() + "\n" # =========================================================================== @@ -119,119 +375,340 @@ def _read_page(bundle_id: str, page_id: str) -> tuple[str, dict] | None: @mcp.tool() def search_docs( - query: Annotated[str, Field(description=f"Natural-language query about {PRODUCT_NAME}.")], - version: Annotated[ + query: Annotated[str, Field(description=( + "Natural-language query about row-crop seed varieties. " + "Mention crop (corn/soybeans/wheat), maturity (RM days / " + "MG number / wheat class), traits, disease tolerances, " + "or soil/region constraints — they all carry retrieval signal." + ))], + crop: Annotated[ str | None, - Field(description="OPTIONAL version filter — restrict to one product version."), + Field(description="OPTIONAL filter: corn, soybeans, or wheat."), ] = None, - platform: Annotated[ + brand: Annotated[ str | None, - Field(description="OPTIONAL platform filter. Set to one of the platforms listed by list_versions(); omit for all platforms."), + Field(description=( + "OPTIONAL brand filter — DEKALB, ASGROW, WESTBRED, " + "GoldenHarvest, NK, AgriPro, Becks. Case-insensitive." + )), ] = None, - bundle_id: Annotated[ + vendor: Annotated[ str | None, - Field(description="OPTIONAL bundle filter — pin to a specific doc bundle slug."), + Field(description="OPTIONAL vendor filter — Bayer, Syngenta, Beck's."), + ] = None, + source: Annotated[ + str | None, + Field(description=( + "OPTIONAL source filter — e.g. 'bayer_seeds'. Use " + "list_versions() to see what's indexed." + )), ] = None, k: Annotated[int, Field(description="Number of results to return.", ge=1, le=50)] = 10, ) -> str: - """Search the {product} docs corpus. + """Search the seed-variety corpus for hybrids/varieties matching a query. - Returns the top-k most relevant chunks (with full source page URLs) - given a natural-language query. Optional filters narrow the search - to one version, one platform, or one bundle. Use list_versions() - first if you need to discover the available facet values. + Returns the top-k variety chunks with their full source URLs, ratings, + maturity, traits, and regional listings. Optional filters narrow to one + crop, brand, vendor, or scraper source. Use **list_versions()** first + to discover valid facet values. Use **lookup_variety()** on any + candidate the user is serious about — that returns the canonical + sidecar so you can verify exact rating values without trusting the + free-text chunk. - Call this tool whenever the user asks anything that should be - answerable from the official product documentation. + Call this tool whenever an ag professional or farmer asks anything + about choosing a seed variety, comparing hybrids, or finding seed + matched to a maturity / soil / region / disease constraint. + + NEVER answer seed-selection questions from prior knowledge alone — + seed catalogs change yearly and brand-specific. Always search first. """ with TimedCall("search_docs", { - "query": query, "version": version, "platform": platform, - "bundle_id": bundle_id, "k": k, + "query": query, "crop": crop, "brand": brand, + "vendor": vendor, "source": source, "k": k, }) as _call: - # TODO Phase 2-3: query Chroma collection (see rag/index.py for - # how it was built). Render the top-k chunks as markdown with - # source URLs. - # TODO Phase 6: optional reranker via _rerank() if RERANK_URL set. - # TODO Phase 8: hybrid retrieval if HYBRID_SEARCH=true — run - # dense + BM25 in parallel, RRF-fuse, hand merged pool to rerank. - _call.set(hits_returned=0) - raise NotImplementedError("Phase 2/3: implement Chroma query + rendering") + where = _build_where(crop, brand, vendor, source, None) + pool_size = max(k * 3, RERANK_POOL) + + # Exact-code pre-filter. Variety codes ("DKC62-08RIB", "AG29XF4") + # have NO semantic neighbors — dense retrieval misses them and + # RRF can let off-topic noise float to top-1. If the query + # contains a token that exactly matches a variety's identifier, + # pin those varieties to the top of results. + pinned_ids: list[str] = [] + for src, sk in _exact_code_matches(query): + pinned_ids.append(f"{src}::{sk}::0") + + # Dense retrieval via Chroma. + try: + col = _collection() + except Exception as exc: # noqa: BLE001 + _call.set(error_dense=str(exc), hits_returned=0) + return ( + "_(retrieval unavailable — Chroma collection not found. " + "Has the indexer run? `python -m rag.index --rebuild`.)_" + ) + + try: + dense = col.query( + query_texts=[query], + n_results=pool_size, + where=where, + ) + except Exception as exc: # noqa: BLE001 + _call.set(error_dense=str(exc), hits_returned=0) + return f"_(retrieval failed: {exc})_" + + dense_ids: list[str] = (dense.get("ids") or [[]])[0] + dense_docs: list[str] = (dense.get("documents") or [[]])[0] + dense_metas: list[dict] = (dense.get("metadatas") or [[]])[0] + dense_dists: list[float] = (dense.get("distances") or [[]])[0] + + id_to_doc = dict(zip(dense_ids, dense_docs)) + id_to_meta = dict(zip(dense_ids, dense_metas)) + id_to_dist = dict(zip(dense_ids, dense_dists)) + + # Hybrid: optionally fuse with BM25. Both are pulled at pool_size, + # fused via RRF, top-k returned. + used_hybrid = False + if HYBRID_SEARCH: + bm25 = _bm25_index() + if bm25 is not None: + bm25_hits = bm25.query(query, n=pool_size, where=where) + bm25_ids = [h[0] for h in bm25_hits] + if bm25_ids: + fused = _rrf_fuse([dense_ids, bm25_ids]) + fuzzy_ids = fused + used_hybrid = True + else: + fuzzy_ids = dense_ids + else: + fuzzy_ids = dense_ids + else: + fuzzy_ids = dense_ids + + # Pin exact-code matches at top, then fill remainder from fuzzy + # retrieval (deduped). Pinned matches are deterministic and + # high-confidence; they should never lose to a fuzzy match. + final_ids: list[str] = [] + seen: set[str] = set() + for cid in pinned_ids + fuzzy_ids: + if cid in seen: + continue + seen.add(cid) + final_ids.append(cid) + if len(final_ids) >= k: + break + + # For ids returned by BM25 but not by Chroma, we need their docs/ + # metadata too. Re-fetch by id. + missing = [i for i in final_ids if i not in id_to_doc] + if missing: + try: + extra = col.get(ids=missing, include=["documents", "metadatas"]) + for cid, doc, meta in zip( + extra.get("ids") or [], + extra.get("documents") or [], + extra.get("metadatas") or [], + ): + id_to_doc[cid] = doc + id_to_meta[cid] = meta + except Exception as exc: # noqa: BLE001 + log.warning("get-by-id for BM25-only hits failed: %s", exc) + + _call.set( + hits_returned=len(final_ids), + hybrid=used_hybrid, + pool_size=pool_size, + ) + + if not final_ids: + return ( + "_(no varieties matched. Try broadening the query or " + "removing filters — call list_versions() to see what's " + "indexed.)_" + ) + + blocks: list[str] = [] + for cid in final_ids: + doc = id_to_doc.get(cid, "") + meta = id_to_meta.get(cid, {}) + dist = id_to_dist.get(cid) if not used_hybrid else None + blocks.append(_format_hit(doc, meta, dist)) + + header = ( + f"# Search results — {len(final_ids)} variety chunk" + f"{'s' if len(final_ids) != 1 else ''}" + f"{' (dense + BM25 hybrid)' if used_hybrid else ' (dense only)'}\n" + f"_Use `lookup_variety(source_key=...)` on any candidate " + f"to fact-check ratings from the canonical sidecar._\n\n---\n\n" + ) + return header + "\n---\n\n".join(blocks) @mcp.tool() def get_page( - bundle_id: Annotated[str, Field(description="Bundle slug.")], - page_id: Annotated[str, Field(description="Page filename within the bundle.")], + source: Annotated[str, Field(description=( + "Scraper source id — e.g. 'bayer_seeds'. Same as the `source` " + "field in search_docs results." + ))], + source_key: Annotated[str, Field(description=( + "Per-variety stable key — e.g. 'dekalb-dkc62-08rib'. Same as " + "the `source_key` field in search_docs results." + ))], ) -> str: - """Return the full markdown for one page, plus a metadata header. + """Return the full canonical record for one variety. - Use after search_docs surfaces a relevant page and the user (or you) - want the complete text — not just the matched chunks. + Emits a structured ratings header (identity, all characteristic + groups, vendor positioning, regional listings) sourced from the + variety's sidecar JSON, followed by the raw markdown body the + chunker indexed. + + Use this when the user is comparing varieties closely or wants to + see every published rating value, not just the ones that matched a + query. The structured header is the canonical fact-check surface; + use it to validate any specific rating value before quoting it. """ - with TimedCall("get_page", {"bundle_id": bundle_id, "page_id": page_id}) as _call: - data = _read_page(bundle_id, page_id) - if data is None: + with TimedCall("get_page", {"source": source, "source_key": source_key}) as _call: + sidecar = _read_sidecar(source, source_key) + if sidecar is None: _call.set(found=False) - return f"Page not found: {bundle_id}/{page_id}" - md, meta = data - _call.set(found=True, page_chars=len(md)) - # TODO: add a metadata header (title, version, source URL) above - # the body. Product-specific shape. - return md + return ( + f"_(variety not found: source='{source}' " + f"source_key='{source_key}'. Use list_versions() to see " + f"available sources or search_docs() to find candidates.)_" + ) + body = _read_markdown(source, source_key) or "" + structured = _structured_ratings_block(sidecar) + _call.set(found=True, body_chars=len(body), groups=len(sidecar.get("characteristics_groups") or [])) + sep = "\n\n---\n\n## Indexed body (chunk source text)\n\n" + return structured + sep + body @mcp.tool() def list_versions() -> str: - """List the available version/platform facets across all bundles. + """List the available scraper sources and the crop/brand/vendor + facets across all indexed varieties. - Use this to discover valid filter values for search_docs. + Use this BEFORE search_docs() the first time you query, to discover + which scraper sources, brands, vendors, and crops are actually in + the corpus right now. The agent should not assume — the corpus + grows as more vendors are scraped. """ with TimedCall("list_versions", {}) as _call: - cat = _bundles() - if not cat: - return "_(no bundles indexed yet — run the scraper + indexer)_" - versions = sorted({b.get("version") for b in cat.values() if b.get("version")}) - platforms = sorted({b.get("platform") for b in cat.values() if b.get("platform")}) - _call.set(versions=len(versions), platforms=len(platforms)) - lines = [f"# Facets across {len(cat)} bundle(s)", ""] - if versions: - lines.append("## Versions"); lines.append("") - for v in versions: lines.append(f"- `{v}`") + facets: dict[str, dict[str, int]] = { + "source": {}, "vendor": {}, "brand": {}, "crop": {}, + } + n_varieties = 0 + if CORPUS.exists(): + for source_dir in sorted(CORPUS.iterdir()): + if not source_dir.is_dir() or source_dir.name.startswith("."): + continue + for sc in source_dir.glob("*.json"): + try: + d = json.loads(sc.read_text(encoding="utf-8")) + except (OSError, json.JSONDecodeError): + continue + n_varieties += 1 + for k in facets: + v = d.get(k) + if v: + facets[k][v] = facets[k].get(v, 0) + 1 + + if n_varieties == 0: + return ( + "_(no varieties indexed yet — run scrapers + indexer " + "before calling search_docs)_" + ) + + _call.set( + varieties=n_varieties, + sources=len(facets["source"]), + vendors=len(facets["vendor"]), + brands=len(facets["brand"]), + crops=len(facets["crop"]), + ) + + lines = [f"# Corpus facets ({n_varieties} varieties indexed)", ""] + for label, counts in facets.items(): + if not counts: + continue + lines.append(f"## {label}") lines.append("") - if platforms: - lines.append("## Platforms"); lines.append("") - for p in platforms: lines.append(f"- `{p}`") - return "\n".join(lines) + for k, n in sorted(counts.items(), key=lambda kv: -kv[1]): + lines.append(f"- `{k}` — {n} varieties") + lines.append("") + return "\n".join(lines).rstrip() -# --------------------------------------------------------------------------- -# Stubs for later phases — keep the signatures in this file so refactors -# don't lose the contracts. Implementations come per phase. -# --------------------------------------------------------------------------- +@mcp.tool() +def lookup_variety( + source_key: Annotated[str, Field(description=( + "Per-variety stable key — e.g. 'dekalb-dkc62-08rib'." + ))], + source: Annotated[ + str | None, + Field(description=( + "OPTIONAL scraper source ('bayer_seeds' etc). If omitted, " + "scans all sources for the source_key." + )), + ] = None, +) -> str: + """Return the canonical sidecar JSON for one variety, verbatim. -# @mcp.tool() # Phase 9 -# def list_cluster(bundle_id: str, page_id: str) -> str: ... + USE THIS to fact-check any rating value before quoting it to the + farmer. The output is the exact data the scraper captured from the + vendor's published catalog — no paraphrasing, no inference. -# @mcp.tool() # Phase 9 -# def diff_versions(bundle_id: str, page_id: str, against_bundle_id: str, context: int = 3) -> str: ... + Call this whenever the user (or you) needs an exact value for a + specific rating, trait, or maturity — not just a semantic match + from search_docs. + """ + with TimedCall("lookup_variety", {"source_key": source_key, "source": source}) as _call: + if source: + sidecar = _read_sidecar(source, source_key) + if sidecar is not None: + _call.set(found=True, source=source) + return "```json\n" + json.dumps(sidecar, indent=2, ensure_ascii=False) + "\n```" + _call.set(found=False, source=source) + return f"_(variety '{source_key}' not found under source '{source}')_" -# @mcp.tool() # Phase 9 -# def bundle_changelog(bundle_id_new: str, bundle_id_old: str, min_churn: int = 5, max_changed: int = 50) -> str: ... + # No source given — scan all source dirs for a match. + if not CORPUS.exists(): + _call.set(found=False) + return "_(corpus is empty — no scrapers have run yet)_" + matches: list[tuple[str, dict]] = [] + for source_dir in sorted(CORPUS.iterdir()): + if not source_dir.is_dir() or source_dir.name.startswith("."): + continue + sidecar = _read_sidecar(source_dir.name, source_key) + if sidecar is not None: + matches.append((source_dir.name, sidecar)) -# @mcp.tool() # Phase 13 -# def weekly_digest(days: int = 7, version: str | None = None, platform: str | None = None, ...) -> str: ... + if not matches: + _call.set(found=False) + return ( + f"_(no variety with source_key '{source_key}' found in any " + f"source. Use search_docs() to find the right key.)_" + ) -# @mcp.tool() # Phase 9 (or 3 — useful early) -# def corpus_status() -> str: ... + _call.set(found=True, matches=len(matches)) + if len(matches) == 1: + src_name, sidecar = matches[0] + return ( + f"_(matched in source `{src_name}`)_\n\n" + "```json\n" + + json.dumps(sidecar, indent=2, ensure_ascii=False) + + "\n```" + ) -# @mcp.tool() # Phase 11 -# def myproduct_api_lessons(topic: str | None = None) -> str: ... - -# @mcp.tool() # Phase 12 -# def find_doc_inconsistencies(scope_query: str, ...) -> str: ... - -# @mcp.tool() # Phase 12 -# def submit_doc_bug(page_url: str, content: str, email: str | None = None, ...) -> str: ... + # Multi-match: serialize each labeled. + out: list[str] = [f"_(matched {len(matches)} sources)_\n"] + for src_name, sidecar in matches: + out.append(f"### source: `{src_name}`") + out.append("```json") + out.append(json.dumps(sidecar, indent=2, ensure_ascii=False)) + out.append("```") + return "\n".join(out) # =========================================================================== @@ -252,8 +729,6 @@ def main() -> None: else: mcp.settings.host = args.host mcp.settings.port = args.port - # DNS-rebinding protection defaults to localhost-only — disable for - # container-network DNS hostnames. See PLAN.md "Hosting" notes. if os.environ.get("MCP_DISABLE_DNS_REBINDING_PROTECTION") in {"1", "true", "yes"}: mcp.settings.transport_security.enable_dns_rebinding_protection = False mcp.run(transport=args.transport) diff --git a/rag/bm25.py b/rag/bm25.py index 06982e01..79507371 100644 --- a/rag/bm25.py +++ b/rag/bm25.py @@ -1,17 +1,14 @@ """SQLite FTS5-backed BM25 retrieval over the same chunks Chroma indexes. Hybrid retrieval (BM25 + dense + Reciprocal Rank Fusion) addresses a -limit of single-tower dense embeddings: when a query has specific -technical terms (filenames, language names, error codes, API paths), -the dense embedding doesn't bridge from the query into a short -code-focused chunk. The chunk loses to the much larger crowd of -prose chunks that semantically match the query topic. - -BM25 handles this directly. Lexical overlap on rare terms ("python", -"create_vpg.py", "PROTECTED_SITE_ID", "applyUpgrade") scores those -chunks high. Fused with the dense ranking via RRF, the hybrid result -is strictly better than either alone for the queries we've seen -fail. +single-tower dense embedding's weakness on rare technical tokens — +for seed-mcp that's variety codes ("DKC62-08RIB"), trait codes +("XF", "VT2PRIB"), disease abbreviations ("SCN", "SDS", "Goss's"), +and Rps gene names ("Rps1c", "Rps3a"). Dense embeddings don't bridge +queries like "Rps3a soybean" cleanly into the relevant chunk; BM25 +matches them directly. Fused with the dense ranking via RRF, the +hybrid result is strictly better than either alone for the queries +we expect from the farm-advisor agent. Why SQLite FTS5: - In the stdlib. Zero new deps. @@ -19,36 +16,13 @@ Why SQLite FTS5: `rag.index --rebuild` regenerates from corpus. - Built-in `bm25()` ranking function. No knobs to tune that matter for our use case (k1=1.2, b=0.75 defaults are fine). - - Builds 70k+ chunks in seconds. Faster than the Chroma rebuild's - embedding step by 100×, so it adds basically nothing to the - full-rebuild cycle. + - Builds <1k chunks in milliseconds; adds nothing to rebuild time. Schema is two tables to keep filtering clean. FTS5 doesn't filter nicely on its own columns; the content_rowid pattern keeps an -external metadata table joinable by rowid: - - CREATE TABLE chunks_meta ( - rowid INTEGER PRIMARY KEY AUTOINCREMENT, - id TEXT UNIQUE, - bundle_id TEXT, page_id TEXT, version TEXT, - platform TEXT, product TEXT, ordinal INTEGER - ); - CREATE VIRTUAL TABLE chunks_fts USING fts5( - text, - tokenize = 'porter unicode61 remove_diacritics 2', - content = 'chunks_meta', - content_rowid = 'rowid' - ); - -Queries: - - SELECT m.id, bm25(chunks_fts) AS score - FROM chunks_meta m - JOIN chunks_fts f ON m.rowid = f.rowid - WHERE f MATCH ? - AND m.version = ? -- optional metadata filter - ORDER BY bm25(chunks_fts) -- lower = better in FTS5 - LIMIT ?; +external metadata table joinable by rowid. For seed-mcp the +filterable columns are seed-domain facets — source, vendor, brand, +crop, source_key — rather than the docs-template version/platform. """ from __future__ import annotations @@ -63,42 +37,30 @@ log = logging.getLogger(__name__) # Default location: bm25/_docs.db at the repo root, next to chroma/. ROOT = Path(__file__).resolve().parent.parent DEFAULT_DB_DIR = ROOT / "bm25" -DEFAULT_DB_NAME = "_docs.db" +DEFAULT_DB_NAME = "crop_seed_docs.db" -# Columns we expose as filterable metadata. Mirrors what _build_where in -# docs_mcp/server.py accepts so the same filter dicts work for both -# Chroma and BM25 without per-retriever translation in the caller. -FILTER_COLUMNS = ("bundle_id", "page_id", "version", "platform", "product", "ordinal") +# Columns we expose as filterable metadata. Mirrors what +# ``docs_mcp.server._build_where`` accepts so the same filter dict +# works for both Chroma and BM25 without per-retriever translation. +FILTER_COLUMNS = ("source", "vendor", "brand", "crop", "source_key", "ordinal") -# Allowlist tokenizer for free-text queries. FTS5's parser chokes on lots -# of punctuation we routinely see in user queries (".10.9", "?", "VPG's", -# em-dash, etc.). Rather than blocklist every operator, just keep -# alphanumerics + a few separators and replace everything else with a -# space. This loses the ability to phrase-search ("exact match") but we -# don't expose that to users anyway — they ask natural-language questions -# and want the answer, not a Boolean DSL. +# Allowlist tokenizer for free-text queries. FTS5's parser chokes on +# lots of punctuation we routinely see in farmer queries ("Rps1c", +# "SCN-resistant", "0.05 MG", em-dashes). Rather than blocklist every +# operator, keep alphanumerics + a few separators and replace +# everything else with a space. _KEEP_RE = re.compile(r"[^A-Za-z0-9_\s]") -# FTS5 reserves these Boolean operator KEYWORDS at the token level — -# stripping them avoids accidental phrase-query behavior when a user -# query happens to contain bare "AND", "OR", "NOT", "NEAR". +# FTS5 reserves these Boolean operator KEYWORDS at the token level. _BOOLEAN_KW_RE = re.compile(r"(? str: """Reduce a natural-language query to an FTS5 OR-of-tokens query. - Two transformations: - - 1. Non-alphanumeric → space (drops punctuation; "10.9?" becomes - "10 9"). Lets us handle versions, parens, question marks, etc. - without inviting FTS5 parse errors. - 2. Boolean keywords stripped (FTS5 reserves AND/OR/NOT/NEAR). - 3. Tokens explicitly OR'd. FTS5's default is AND-of-tokens — for - any non-trivial natural-language query that means zero hits - (no chunk contains every word). OR semantics is what we want: - BM25 already weights documents containing more query terms - higher, so we don't lose precision, but we DO gain recall. + See ``crop-chem-docs`` for the rationale; same transformation + applies here. OR semantics maximizes recall — BM25 already + weights documents with more query-term matches higher. """ cleaned = _KEEP_RE.sub(" ", text) cleaned = _BOOLEAN_KW_RE.sub(" ", cleaned) @@ -113,9 +75,9 @@ def _where_to_sql(where: dict | None) -> tuple[str, list[Any]]: Accepts the same shapes ``docs_mcp.server._build_where`` produces: - None → ("", []) - {"version": "10.9"} → ("AND m.version = ?", ["10.9"]) - {"$and": [{...}, {...}]} → ("AND m.X = ? AND m.Y = ?", [...]) + None → ("", []) + {"crop": "corn"} → ("AND m.crop = ?", ["corn"]) + {"$and": [{...}, {...}]} → ("AND m.X = ? AND m.Y = ?", [...]) Unknown keys are silently dropped (defensive — better to over-match than to crash on a filter we don't know). @@ -158,35 +120,32 @@ class BM25Index: def build(self, records: list[dict]) -> int: """Rebuild the index from scratch from `records`. - `records` is the same list ``rag.index.page_records`` produces: + `records` is the same list ``rag.index.variety_records`` produces: ``[{"id": ..., "text": ..., "metadata": {...}}, ...]``. Bulk - insert wrapped in a transaction — single-digit seconds for the - full 73k-chunk corpus. + insert wrapped in a transaction. """ self.db_path.parent.mkdir(parents=True, exist_ok=True) - # Drop and recreate. Idempotent rebuild. if self.db_path.exists(): self.db_path.unlink() with sqlite3.connect(self.db_path) as con: con.executescript(self._schema_sql()) con.executemany( - "INSERT INTO chunks_meta (id, bundle_id, page_id, version, " - "platform, product, ordinal) VALUES (?, ?, ?, ?, ?, ?, ?)", + "INSERT INTO chunks_meta " + "(id, source, vendor, brand, crop, source_key, ordinal) " + "VALUES (?, ?, ?, ?, ?, ?, ?)", [ ( r["id"], - r["metadata"].get("bundle_id") or "", - r["metadata"].get("page_id") or "", - r["metadata"].get("version") or "", - r["metadata"].get("platform") or "", - r["metadata"].get("product") or "", + r["metadata"].get("source") or "", + r["metadata"].get("vendor") or "", + r["metadata"].get("brand") or "", + r["metadata"].get("crop") or "", + r["metadata"].get("source_key") or "", int(r["metadata"].get("ordinal") or 0), ) for r in records ], ) - # Populate the FTS5 contentless-ish table by rowid. We populated - # chunks_meta first; rowids align with insertion order. con.executemany( "INSERT INTO chunks_fts (rowid, text) VALUES (?, ?)", [ @@ -210,15 +169,12 @@ class BM25Index: FTS5's bm25() returns NEGATIVE numbers — more relevant docs have smaller (more negative) scores. We order ASC so the first row is - the most relevant. Callers that need a "rank" should enumerate - the returned list. + the most relevant. """ sanitized = _sanitize_query(text) if not sanitized: return [] where_sql, params = _where_to_sql(where) - # FTS5 MATCH wants the unaliased table name on its left, so we use - # chunks_fts (no alias) and JOIN by rowid against chunks_meta. sql = ( "SELECT m.id, bm25(chunks_fts) AS score " "FROM chunks_fts " @@ -232,17 +188,13 @@ class BM25Index: cur = con.execute(sql, [sanitized, *params, n]) return [(row[0], float(row[1])) for row in cur.fetchall()] except sqlite3.OperationalError as e: - # FTS5 syntax error (rare after sanitization) or db missing. - # Caller decides whether to fall back to dense-only. log.warning("bm25 query failed (%s); query=%r", e, sanitized[:80]) return [] def exists(self) -> bool: - """Cheap probe — does the index file exist on disk?""" return self.db_path.exists() def count(self) -> int: - """Number of chunks indexed. 0 if the db is missing or empty.""" if not self.exists(): return 0 try: @@ -257,18 +209,19 @@ class BM25Index: def _schema_sql() -> str: return """ CREATE TABLE chunks_meta ( - rowid INTEGER PRIMARY KEY AUTOINCREMENT, - id TEXT UNIQUE NOT NULL, - bundle_id TEXT, - page_id TEXT, - version TEXT, - platform TEXT, - product TEXT, - ordinal INTEGER + rowid INTEGER PRIMARY KEY AUTOINCREMENT, + id TEXT UNIQUE NOT NULL, + source TEXT, + vendor TEXT, + brand TEXT, + crop TEXT, + source_key TEXT, + ordinal INTEGER ); - CREATE INDEX idx_meta_version ON chunks_meta(version); - CREATE INDEX idx_meta_platform ON chunks_meta(platform); - CREATE INDEX idx_meta_bundle ON chunks_meta(bundle_id); + CREATE INDEX idx_meta_source ON chunks_meta(source); + CREATE INDEX idx_meta_crop ON chunks_meta(crop); + CREATE INDEX idx_meta_brand ON chunks_meta(brand); + CREATE INDEX idx_meta_source_key ON chunks_meta(source_key); CREATE VIRTUAL TABLE chunks_fts USING fts5( text, diff --git a/rag/chunk.py b/rag/chunk.py index b8d73176..b53bb220 100644 --- a/rag/chunk.py +++ b/rag/chunk.py @@ -1,126 +1,324 @@ -"""Markdown chunker — paragraph-aware, ~400-600 token target. +"""Chunker for seed-variety corpus. -Adjust the chunking strategy per product if your page format differs -significantly from prose. The output shape (id, text, metadata) is -fixed by the downstream Chroma + BM25 indexing in rag/index.py — don't -change that. +Each variety becomes ONE chunk by default. Variety pages are small +(typically 2-3 KB of useful signal) and nomic-embed-text handles up +to ~8 K tokens cleanly. Splitting a variety across chunks dilutes +the named-rating embeddings (e.g. "SCN resistance 7") that farmers +search by — keep them together. -The key knob you'll tune per product is chunk-0. Dense retrieval lands -on chunk 0 first for most queries. Make it a synthetic chunk built -from: +The chunk text is a synthetic preamble assembled deterministically +from the sidecar JSON. Every value in the chunk text comes verbatim +from the source. The framing words ("Disease ratings (1-9, 9=best):", +"Maturity group:", etc.) are template glue — *we add structure, we +do NOT add facts*. Given the same sidecar, this chunker always +produces the same chunk text. That's the anti-hallucination +contract: the retriever can never surface a rating value that +wasn't in the source. - - the page title (as natural-language H1) - - a 1-sentence task description (you'll have to generate this — for - pages that already have a "## Overview" or "## Introduction" the - first sentence usually works) - - a keyword bag of important terms (filenames, API names, error - codes — the rare technical tokens that BM25 lights up on) +Metadata is flattened to Chroma-safe primitives (str/int/float/bool): -Without a rich chunk 0, dense retrieval gets dominated by the much -larger prose body, and short pages (script examples, reference cards) -get buried. + source "bayer_seeds" + source_key "dekalb-dkc075-70rib" + vendor "Bayer" + brand "DEKALB" + crop "corn" | "soybeans" | "wheat" + product_name "DKC075-70RIB BRAND BLEND" + product_id canonical full id + source_url the variety's page URL + rm corn RM as int when parseable (else absent) + mg soy MG as float when parseable (else absent) + release_year int when known + trait_codes_csv comma-separated trait codes for substring search + rating_scale "1-9 (9 = best)" — chunker should ALWAYS attach + this so downstream code can detect a flip + ordinal chunk index within variety (0-based) + +Lists like ``regional_recommendations`` and the full per-rating dicts +do NOT fit Chroma's metadata constraints — they stay in the sidecar +JSON, surfaced by ``get_page`` / ``lookup_variety``. """ from __future__ import annotations +import json import re +from pathlib import Path from typing import Iterator -# Approximate token estimate from char count. Tunable — set per -# embedder if the default 4 chars/token is wrong. -CHARS_PER_TOKEN = 4 -TARGET_TOKENS = 500 -TARGET_CHARS = TARGET_TOKENS * CHARS_PER_TOKEN +# Rating-group classification. The source publishes characteristics +# grouped by label; we map those labels to one of three buckets in +# the chunk preamble so retrieval gets coherent text. Group labels not +# listed here fall into "other" and are still emitted, just in their +# own section. +DISEASE_GROUP_LABELS = { + "DISEASE RATINGS", + "PEST AND DISEASE RESISTANCE", +} +AGRONOMIC_GROUP_LABELS = { + "GROWTH", + "HARVEST", + "PRODUCTION", + "KEY CHARACTERISTICS", + "QUALITY", +} +MANAGEMENT_GROUP_LABELS = { + "MANAGEMENT", + "HERBICIDE", + "SENSITIVITY", + "PLANT DESCRIPTION", +} -def estimate_tokens(text: str) -> int: - return max(1, len(text) // CHARS_PER_TOKEN) +def _parse_rm(value: object) -> int | None: + """Best-effort RM-days int. Returns None if not a clean integer + (e.g. wheat's qualitative 'Early'/'Medium-Early' values).""" + if value is None: + return None + s = str(value).strip() + if not s: + return None + try: + # Handle floats stored as strings ("105.0") and the trailing + # tenths sometimes seen on early corn ("75"). + return int(float(s)) + except ValueError: + return None -def split_paragraphs(md: str) -> list[str]: - """Split markdown into paragraph-ish blocks. +def _parse_mg(value: object) -> float | None: + """Best-effort MG float. Soy MGs go from 00 to 9.0 with one decimal.""" + if value is None: + return None + s = str(value).strip() + if not s: + return None + try: + return float(s) + except ValueError: + return None - Keeps fenced code blocks together (don't slice through ```). - Headings start new paragraphs. + +def _format_items(items: list[dict]) -> str: + """Render `[{characteristic, value}, ...]` to a compact inline list.""" + out: list[str] = [] + for it in items: + ch = (it.get("characteristic") or "").strip() + v = (it.get("value") or "").strip() + if ch and v: + out.append(f"{ch} {v}") + elif ch: + out.append(f"{ch} —") + return ", ".join(out) + + +def _render_variety_chunk(sidecar: dict) -> str: + """Build the dense preamble for one variety from its sidecar JSON. + + Faithful to source: every numeric/categorical *value* is verbatim + from ``sidecar``. The only generated text is the framing language. """ - blocks: list[str] = [] - current: list[str] = [] - in_fence = False - for line in md.splitlines(keepends=True): - stripped = line.strip() - if stripped.startswith("```"): - in_fence = not in_fence - current.append(line) + lines: list[str] = [] + + # ---- Identity line -------------------------------------------------- + name = sidecar.get("product_name") or sidecar.get("source_key") or "" + brand = (sidecar.get("brand") or "").strip() + vendor = sidecar.get("vendor") or "" + crop = (sidecar.get("crop") or "").strip() + crop_label = crop.capitalize() if crop else "" + ident = f"# {name}" + sub = " ".join(filter(None, [ + f"({brand.title()} {crop_label} variety, {vendor})" if brand and crop_label and vendor else "", + ])) + lines.append(ident) + if sub: + lines.append("") + lines.append(sub) + + # ---- Identity body -------------------------------------------------- + facts: list[str] = [] + + rm = sidecar.get("relative_maturity") + mg = sidecar.get("maturity_group") + wc = sidecar.get("wheat_class") + if crop == "corn" and rm: + facts.append(f"Relative maturity {rm}") + elif crop == "soybeans" and mg: + facts.append(f"Maturity group {mg}") + elif crop == "wheat": + if rm: + facts.append(f"Maturity {rm}") + if wc: + facts.append(f"Wheat class {wc}") + + traits = sidecar.get("trait_stack") or [] + trait_descs = sidecar.get("trait_descriptions") or [] + if traits: + if trait_descs: + facts.append( + "Trait stack: " + + ", ".join(traits) + + " (" + + "; ".join(trait_descs) + + ")" + ) + else: + facts.append("Trait stack: " + ", ".join(traits)) + + if sidecar.get("release_year"): + facts.append(f"Released {sidecar['release_year']}") + + if facts: + lines.append("") + lines.append(". ".join(facts) + ".") + + # ---- Positioning ---------------------------------------------------- + pos = (sidecar.get("positioning_statement") or "").strip() + if pos: + lines.append("") + lines.append(f"Positioning: {pos}") + + # ---- Ratings, bucketed for retrieval -------------------------------- + scale = sidecar.get("_scale_direction") or "(scale direction not declared)" + groups = sidecar.get("characteristics_groups") or [] + disease: list[dict] = [] + agronomic: list[dict] = [] + management: list[dict] = [] + other: list[tuple[str, list[dict]]] = [] + for g in groups: + label = (g.get("label") or "").upper().strip() + items = g.get("items") or [] + if not items: continue - if in_fence: - current.append(line) - continue - if stripped.startswith("#"): - if current: - blocks.append("".join(current).strip()) - current = [] - current.append(line) - continue - if not stripped and current and not "".join(current).strip().endswith("\n\n"): - current.append(line) - blocks.append("".join(current).strip()) - current = [] - continue - current.append(line) - if current: - blocks.append("".join(current).strip()) - return [b for b in blocks if b] + if label in DISEASE_GROUP_LABELS: + disease.extend(items) + elif label in AGRONOMIC_GROUP_LABELS: + agronomic.extend(items) + elif label in MANAGEMENT_GROUP_LABELS: + management.extend(items) + else: + other.append((g.get("label") or "Other", items)) + + if disease: + lines.append("") + lines.append(f"Disease ratings ({scale}): {_format_items(disease)}.") + if agronomic: + lines.append("") + lines.append(f"Agronomic ratings ({scale}): {_format_items(agronomic)}.") + if management: + lines.append("") + lines.append(f"Management notes: {_format_items(management)}.") + for label, items in other: + lines.append("") + lines.append(f"{label.title()}: {_format_items(items)}.") + + # ---- Strengths narrative -------------------------------------------- + strengths = sidecar.get("strengths") or [] + if strengths: + lines.append("") + lines.append("Strengths and management notes:") + for s in strengths: + s = (s or "").strip() + if s: + lines.append(f"- {s}") + + # ---- Regional listings (compact, not the agronomist emails) --------- + rec = sidecar.get("regional_recommendations") or [] + if rec: + names = sorted({ + (r.get("product_list_name") or "").strip() + for r in rec + if (r.get("product_list_name") or "").strip() + }) + if names: + lines.append("") + lines.append("Listed in regional seed guides: " + "; ".join(names) + ".") + + # ---- Provenance footer (must always be in the chunk text so it + # can never be lost between retrieval and LLM rendering) -------- + urls = sidecar.get("source_urls") or [] + if urls: + lines.append("") + lines.append(f"Source: {urls[0]}") + + return "\n".join(lines).strip() + "\n" -def chunks_from_page( - text: str, - page_id: str, - metadata: dict, +def _flat_metadata(sidecar: dict) -> dict: + """Distil sidecar into Chroma-safe metadata (primitives only).""" + md: dict = { + "source": sidecar.get("source") or "", + "source_key": sidecar.get("source_key") or "", + "vendor": sidecar.get("vendor") or "", + "brand": sidecar.get("brand") or "", + "crop": sidecar.get("crop") or "", + "product_name": sidecar.get("product_name") or "", + "product_id": sidecar.get("product_id") or "", + "source_url": (sidecar.get("source_urls") or [""])[0], + "rating_scale": sidecar.get("_scale_direction") or "", + } + rm = _parse_rm(sidecar.get("relative_maturity")) + mg = _parse_mg(sidecar.get("maturity_group")) + if rm is not None: + md["rm"] = rm + if mg is not None: + md["mg"] = mg + ry = sidecar.get("release_year") + if isinstance(ry, int): + md["release_year"] = ry + traits = sidecar.get("trait_stack") or [] + if traits: + # Comma-delimited for partial-match / human eyeballing. + # Bracket-padded so `LIKE '%,XF,%'` finds whole tokens. + md["trait_codes_csv"] = "," + ",".join(traits) + "," + if sidecar.get("wheat_class"): + md["wheat_class"] = sidecar["wheat_class"] + return md + + +def chunks_from_variety( + sidecar_path: Path | str, + *, + md_path: Path | str | None = None, ) -> Iterator[dict]: - """Yield chunk dicts ready for index.py to upsert. + """Yield chunk dict(s) for one variety. Currently emits exactly one. - The synthetic chunk 0 is the per-product customization point. The - default below is a simple title + body-first-paragraph; rewrite - for richer retrieval signal (see module docstring). + Args: + sidecar_path: path to the variety's JSON sidecar. + md_path: ignored (the chunker rebuilds from sidecar); kept + in the signature in case a future split-chunker + wants the rendered body. """ - paragraphs = split_paragraphs(text) - if not paragraphs: - return - - # ----- Chunk 0: synthetic anchor for dense retrieval --------- - title = metadata.get("title") or page_id - first_para = next((p for p in paragraphs if not p.startswith("#")), "") - chunk0_body = ( - f"# {title}\n\n" - f"{first_para[:300]}" - # TODO per product: append a keyword bag here (filenames, - # API names, error codes) for BM25 + dense joint coverage. - ) + sidecar = json.loads(Path(sidecar_path).read_text(encoding="utf-8")) + text = _render_variety_chunk(sidecar) + meta = _flat_metadata(sidecar) + chunk_id = f"{meta['source']}::{meta['source_key']}::0" yield { - "id": f"{metadata['bundle_id']}::{page_id}::0", - "text": chunk0_body, - "metadata": {**metadata, "ordinal": 0}, + "id": chunk_id, + "text": text, + "metadata": {**meta, "ordinal": 0}, } - # ----- Body chunks: pack paragraphs up to TARGET_CHARS ------- - ordinal = 1 - buf: list[str] = [] - buf_chars = 0 - for p in paragraphs: - if buf_chars + len(p) > TARGET_CHARS and buf: - yield { - "id": f"{metadata['bundle_id']}::{page_id}::{ordinal}", - "text": "\n\n".join(buf), - "metadata": {**metadata, "ordinal": ordinal}, - } - ordinal += 1 - buf = [] - buf_chars = 0 - buf.append(p) - buf_chars += len(p) - if buf: - yield { - "id": f"{metadata['bundle_id']}::{page_id}::{ordinal}", - "text": "\n\n".join(buf), - "metadata": {**metadata, "ordinal": ordinal}, - } + +# ----- Backwards-compat shim for the template's index.py ------------------- +# +# The template's ``rag.index.page_records`` calls +# ``chunks_from_page(md, page_id, base_meta)`` which doesn't know about +# sidecar JSON. We accept that signature but ignore it — index.py has +# been updated to use ``chunks_from_variety`` directly, and this shim +# is here only so a stray import of the old name doesn't break. +# +def chunks_from_page(text: str, page_id: str, metadata: dict) -> Iterator[dict]: + """Deprecated for seed-mcp; prefer ``chunks_from_variety``.""" + # Best-effort: if metadata carries a sidecar_path, dispatch. + sidecar_path = metadata.get("_sidecar_path") + if sidecar_path: + yield from chunks_from_variety(sidecar_path) + return + # Fallback — emit a single chunk of the raw markdown with whatever + # metadata we have. Better than crashing if someone calls this. + chunk_id = f"{metadata.get('source','unknown')}::{page_id}::0" + yield { + "id": chunk_id, + "text": text, + "metadata": {**metadata, "ordinal": 0}, + } diff --git a/rag/index.py b/rag/index.py index 8d1c74f8..91bb9412 100644 --- a/rag/index.py +++ b/rag/index.py @@ -1,15 +1,19 @@ -"""Build Chroma (and optionally BM25) indexes from corpus on disk. +"""Build Chroma (and BM25) indexes from the seed corpus on disk. -Reads `corpus//.{md,json}`, chunks each page, upserts -into Chroma. With --rebuild, drops + recreates the collection (clean -state). With --bm25-only, skips Chroma and rebuilds only the FTS5 -index — useful for fast iteration when chunking didn't change. +Reads ``corpus//.json`` sidecars, chunks each +variety via ``rag.chunk.chunks_from_variety``, upserts into Chroma. +With ``--rebuild``, drops + recreates the collection (clean state). +With ``--bm25-only``, skips Chroma and rebuilds only the FTS5 index +— useful for fast iteration when the chunker didn't change. + +Collection name is ``_docs`` (default: ``crop_seed_docs``). +Override via the PRODUCT_NAME env var. """ from __future__ import annotations import argparse -import json import logging +import os import time from pathlib import Path from typing import Iterator @@ -17,7 +21,7 @@ from typing import Iterator import chromadb from chromadb.config import Settings -from .chunk import chunks_from_page +from .chunk import chunks_from_variety from .embeddings import embedding_function log = logging.getLogger(__name__) @@ -27,39 +31,21 @@ ROOT = Path(__file__).resolve().parent.parent CORPUS = ROOT / "corpus" CHROMA_DIR = ROOT / "chroma" -# Collection name — convention: _docs. Override via env if needed. -import os -PRODUCT_NAME = os.environ.get("PRODUCT_NAME", "myproduct") +PRODUCT_NAME = os.environ.get("PRODUCT_NAME", "crop_seed") COLLECTION = f"{PRODUCT_NAME}_docs" -def page_records() -> Iterator[dict]: - """Walk corpus/, yield chunks for every page.""" +def variety_records() -> Iterator[dict]: + """Walk ``corpus//.json``, yield one chunk per + variety.""" if not CORPUS.exists(): - log.error("corpus/ doesn't exist; run the scraper first") + log.error("corpus/ doesn't exist; run a scraper first") return - for bundle_dir in sorted(CORPUS.iterdir()): - if not bundle_dir.is_dir() or bundle_dir.name.startswith("."): + for source_dir in sorted(CORPUS.iterdir()): + if not source_dir.is_dir() or source_dir.name.startswith("."): continue - for md_path in sorted(bundle_dir.glob("*.md")): - page_id = md_path.stem - sidecar = md_path.with_suffix(".json") - if not sidecar.exists(): - log.warning("skipping %s — no JSON sidecar", md_path) - continue - md = md_path.read_text() - meta = json.loads(sidecar.read_text()) - # Surface common filter fields at the chunk-metadata level - # so Chroma's `where` filter can use them. - base_meta = { - "bundle_id": bundle_dir.name, - "page_id": page_id, - "title": meta.get("title") or "", - "version": meta.get("version") or "", - "platform": meta.get("platform") or "", - "product": meta.get("product") or "", - } - yield from chunks_from_page(md, page_id, base_meta) + for sidecar_path in sorted(source_dir.glob("*.json")): + yield from chunks_from_variety(sidecar_path) def upsert_to_chroma(records: list[dict]) -> int: @@ -67,7 +53,7 @@ def upsert_to_chroma(records: list[dict]) -> int: path=str(CHROMA_DIR), settings=Settings(anonymized_telemetry=False), ) - # Drop + recreate for --rebuild semantics + # Drop + recreate for --rebuild semantics. try: client.delete_collection(COLLECTION) except Exception: @@ -101,8 +87,11 @@ def main() -> int: log.info("reading corpus from %s", CORPUS) t0 = time.time() - records = list(page_records()) + records = list(variety_records()) log.info("loaded %d chunks in %.1fs", len(records), time.time() - t0) + if not records: + log.error("no chunks — is corpus/ populated?") + return 1 if args.bm25_only: from .bm25 import BM25Index @@ -118,8 +107,6 @@ def main() -> int: n = upsert_to_chroma(records) log.info("chroma: %d chunks in %.1fs", n, time.time() - t_c) - # Build BM25 too — see PLAN.md Phase 8. Safe to remove this block - # for products that don't need hybrid retrieval. try: from .bm25 import BM25Index t_b = time.time()