rag: resilient embedder — rotate/split on endpoint errors; 4-GPU embed pool #3
@@ -22,7 +22,7 @@ env:
|
|||||||
# Two GPU-pinned Ollama containers on the Gitea host — same infra
|
# Two GPU-pinned Ollama containers on the Gitea host — same infra
|
||||||
# zerto-docs uses. :11435 = Titan X, :11436 = 1080 Ti. Indexer
|
# zerto-docs uses. :11435 = Titan X, :11436 = 1080 Ti. Indexer
|
||||||
# round-robins per batch.
|
# round-robins per batch.
|
||||||
OLLAMA_URLS: http://192.168.0.2:11435,http://192.168.0.2:11436
|
OLLAMA_URLS: http://192.168.0.2:11435,http://192.168.0.2:11436,http://192.168.0.125:11434,http://192.168.0.126:11434
|
||||||
EMBED_MODEL: nomic-embed-text
|
EMBED_MODEL: nomic-embed-text
|
||||||
PRODUCT_NAME: morpheus
|
PRODUCT_NAME: morpheus
|
||||||
|
|
||||||
|
|||||||
@@ -34,7 +34,7 @@ env:
|
|||||||
# :11435 owns the Titan X, :11436 owns the 1080 Ti; the indexer
|
# :11435 owns the Titan X, :11436 owns the 1080 Ti; the indexer
|
||||||
# round-robins per batch so both cards run in parallel. The host's
|
# round-robins per batch so both cards run in parallel. The host's
|
||||||
# primary Ollama on :11434 is left alone for OpenWebUI etc.
|
# primary Ollama on :11434 is left alone for OpenWebUI etc.
|
||||||
OLLAMA_URLS: http://192.168.0.2:11435,http://192.168.0.2:11436
|
OLLAMA_URLS: http://192.168.0.2:11435,http://192.168.0.2:11436,http://192.168.0.125:11434,http://192.168.0.126:11434
|
||||||
EMBED_MODEL: nomic-embed-text
|
EMBED_MODEL: nomic-embed-text
|
||||||
|
|
||||||
PRODUCT_NAME: morpheus
|
PRODUCT_NAME: morpheus
|
||||||
|
|||||||
+35
-4
@@ -19,6 +19,7 @@ from __future__ import annotations
|
|||||||
|
|
||||||
import os
|
import os
|
||||||
import logging
|
import logging
|
||||||
|
import time
|
||||||
from typing import Any
|
from typing import Any
|
||||||
|
|
||||||
import httpx
|
import httpx
|
||||||
@@ -43,11 +44,18 @@ EMBED_DIM = int(os.environ.get("EMBED_DIM", "768"))
|
|||||||
|
|
||||||
|
|
||||||
class OllamaEmbeddings(EmbeddingFunction):
|
class OllamaEmbeddings(EmbeddingFunction):
|
||||||
"""Calls /api/embed across N Ollama endpoints, naive round-robin.
|
"""Calls /api/embed across N Ollama endpoints, round-robin per batch.
|
||||||
|
|
||||||
For indexing throughput on multiple GPUs, run one Ollama container
|
For indexing throughput on multiple GPUs, run one Ollama container
|
||||||
per GPU (pinned via NVIDIA_VISIBLE_DEVICES) and pass all their URLs
|
per GPU (pinned via NVIDIA_VISIBLE_DEVICES) and pass all their URLs
|
||||||
in OLLAMA_URL — the embedder picks the next endpoint per batch.
|
in OLLAMA_URL — the embedder picks the next endpoint per batch.
|
||||||
|
|
||||||
|
Resilient (ported from zerto-docs PR #45): a failed call rotates to
|
||||||
|
the next endpoint and retries with backoff instead of failing the
|
||||||
|
whole rebuild. HTTP status errors additionally halve the input —
|
||||||
|
the .0.125 Windows Ollama (4090) 400s when its model runner dies on
|
||||||
|
an oversized input array, and one endpoint rejecting a batch the
|
||||||
|
others accept shouldn't kill a multi-hour index build.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
def __init__(self, urls: list[str] = OLLAMA_URLS, model: str = EMBED_MODEL):
|
def __init__(self, urls: list[str] = OLLAMA_URLS, model: str = EMBED_MODEL):
|
||||||
@@ -56,14 +64,37 @@ class OllamaEmbeddings(EmbeddingFunction):
|
|||||||
self._next = 0
|
self._next = 0
|
||||||
|
|
||||||
def __call__(self, input: Documents) -> Embeddings:
|
def __call__(self, input: Documents) -> Embeddings:
|
||||||
|
return self._embed(list(input), attempt=1)
|
||||||
|
|
||||||
|
def _embed(self, texts: list, attempt: int) -> Embeddings:
|
||||||
url = self.urls[self._next % len(self.urls)]
|
url = self.urls[self._next % len(self.urls)]
|
||||||
self._next += 1
|
self._next += 1
|
||||||
|
try:
|
||||||
with httpx.Client(timeout=300) as c:
|
with httpx.Client(timeout=300) as c:
|
||||||
r = c.post(f"{url}/api/embed",
|
r = c.post(f"{url}/api/embed",
|
||||||
json={"model": self.model, "input": list(input)})
|
json={"model": self.model, "input": texts})
|
||||||
r.raise_for_status()
|
r.raise_for_status()
|
||||||
data = r.json()
|
return r.json().get("embeddings") or []
|
||||||
return data.get("embeddings") or []
|
except (httpx.TransportError, httpx.HTTPStatusError) as e:
|
||||||
|
if isinstance(e, httpx.HTTPStatusError):
|
||||||
|
desc = f"HTTP {e.response.status_code} ({e.response.text[:200]})"
|
||||||
|
else:
|
||||||
|
desc = f"transport error {type(e).__name__}"
|
||||||
|
if attempt >= 5:
|
||||||
|
log.error("%s from %s (%d texts) — giving up after %d attempts",
|
||||||
|
desc, url, len(texts), attempt)
|
||||||
|
raise
|
||||||
|
if isinstance(e, httpx.HTTPStatusError) and len(texts) > 16:
|
||||||
|
mid = len(texts) // 2
|
||||||
|
log.warning("%s from %s — splitting %d texts into %d+%d (attempt %d)",
|
||||||
|
desc, url, len(texts), mid, len(texts) - mid, attempt)
|
||||||
|
return (self._embed(texts[:mid], attempt + 1)
|
||||||
|
+ self._embed(texts[mid:], attempt + 1))
|
||||||
|
backoff = 0.5 * (2 ** (attempt - 1)) # 0.5, 1, 2, 4
|
||||||
|
log.warning("%s (attempt %d, %s) — retrying in %.1fs",
|
||||||
|
desc, attempt, url, backoff)
|
||||||
|
time.sleep(backoff)
|
||||||
|
return self._embed(texts, attempt + 1)
|
||||||
|
|
||||||
def name(self) -> str: # newer chromadb requires this
|
def name(self) -> str: # newer chromadb requires this
|
||||||
return f"ollama:{self.model}"
|
return f"ollama:{self.model}"
|
||||||
|
|||||||
Reference in New Issue
Block a user