From b29f5636fc6d73ded990785569e9097c335d913b Mon Sep 17 00:00:00 2001 From: Justin Paul Date: Wed, 10 Jun 2026 15:47:03 -0400 Subject: [PATCH] =?UTF-8?q?rag:=20resilient=20embedder=20=E2=80=94=20rotat?= =?UTF-8?q?e/split=20on=20endpoint=20errors;=204-GPU=20embed=20pool=20(#3)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .gitea/workflows/image-only.yml | 2 +- .gitea/workflows/refresh.yml | 2 +- rag/embeddings.py | 45 ++++++++++++++++++++++++++++----- 3 files changed, 40 insertions(+), 9 deletions(-) diff --git a/.gitea/workflows/image-only.yml b/.gitea/workflows/image-only.yml index 4e09574..01fd5eb 100644 --- a/.gitea/workflows/image-only.yml +++ b/.gitea/workflows/image-only.yml @@ -22,7 +22,7 @@ env: # Two GPU-pinned Ollama containers on the Gitea host — same infra # zerto-docs uses. :11435 = Titan X, :11436 = 1080 Ti. Indexer # round-robins per batch. - OLLAMA_URLS: http://192.168.0.2:11435,http://192.168.0.2:11436 + OLLAMA_URLS: http://192.168.0.2:11435,http://192.168.0.2:11436,http://192.168.0.125:11434,http://192.168.0.126:11434 EMBED_MODEL: nomic-embed-text PRODUCT_NAME: morpheus diff --git a/.gitea/workflows/refresh.yml b/.gitea/workflows/refresh.yml index caef74a..67f2977 100644 --- a/.gitea/workflows/refresh.yml +++ b/.gitea/workflows/refresh.yml @@ -34,7 +34,7 @@ env: # :11435 owns the Titan X, :11436 owns the 1080 Ti; the indexer # round-robins per batch so both cards run in parallel. The host's # primary Ollama on :11434 is left alone for OpenWebUI etc. - OLLAMA_URLS: http://192.168.0.2:11435,http://192.168.0.2:11436 + OLLAMA_URLS: http://192.168.0.2:11435,http://192.168.0.2:11436,http://192.168.0.125:11434,http://192.168.0.126:11434 EMBED_MODEL: nomic-embed-text PRODUCT_NAME: morpheus diff --git a/rag/embeddings.py b/rag/embeddings.py index a072f7a..c1341e5 100644 --- a/rag/embeddings.py +++ b/rag/embeddings.py @@ -19,6 +19,7 @@ from __future__ import annotations import os import logging +import time from typing import Any import httpx @@ -43,11 +44,18 @@ EMBED_DIM = int(os.environ.get("EMBED_DIM", "768")) class OllamaEmbeddings(EmbeddingFunction): - """Calls /api/embed across N Ollama endpoints, naive round-robin. + """Calls /api/embed across N Ollama endpoints, round-robin per batch. For indexing throughput on multiple GPUs, run one Ollama container per GPU (pinned via NVIDIA_VISIBLE_DEVICES) and pass all their URLs in OLLAMA_URL — the embedder picks the next endpoint per batch. + + Resilient (ported from zerto-docs PR #45): a failed call rotates to + the next endpoint and retries with backoff instead of failing the + whole rebuild. HTTP status errors additionally halve the input — + the .0.125 Windows Ollama (4090) 400s when its model runner dies on + an oversized input array, and one endpoint rejecting a batch the + others accept shouldn't kill a multi-hour index build. """ def __init__(self, urls: list[str] = OLLAMA_URLS, model: str = EMBED_MODEL): @@ -56,14 +64,37 @@ class OllamaEmbeddings(EmbeddingFunction): self._next = 0 def __call__(self, input: Documents) -> Embeddings: + return self._embed(list(input), attempt=1) + + def _embed(self, texts: list, attempt: int) -> Embeddings: url = self.urls[self._next % len(self.urls)] self._next += 1 - with httpx.Client(timeout=300) as c: - r = c.post(f"{url}/api/embed", - json={"model": self.model, "input": list(input)}) - r.raise_for_status() - data = r.json() - return data.get("embeddings") or [] + try: + with httpx.Client(timeout=300) as c: + r = c.post(f"{url}/api/embed", + json={"model": self.model, "input": texts}) + r.raise_for_status() + return r.json().get("embeddings") or [] + except (httpx.TransportError, httpx.HTTPStatusError) as e: + if isinstance(e, httpx.HTTPStatusError): + desc = f"HTTP {e.response.status_code} ({e.response.text[:200]})" + else: + desc = f"transport error {type(e).__name__}" + if attempt >= 5: + log.error("%s from %s (%d texts) — giving up after %d attempts", + desc, url, len(texts), attempt) + raise + if isinstance(e, httpx.HTTPStatusError) and len(texts) > 16: + mid = len(texts) // 2 + log.warning("%s from %s — splitting %d texts into %d+%d (attempt %d)", + desc, url, len(texts), mid, len(texts) - mid, attempt) + return (self._embed(texts[:mid], attempt + 1) + + self._embed(texts[mid:], attempt + 1)) + backoff = 0.5 * (2 ** (attempt - 1)) # 0.5, 1, 2, 4 + log.warning("%s (attempt %d, %s) — retrying in %.1fs", + desc, attempt, url, backoff) + time.sleep(backoff) + return self._embed(texts, attempt + 1) def name(self) -> str: # newer chromadb requires this return f"ollama:{self.model}"