c5631d3eab
Provenance had no system-level owner: ownership was only per-tree (TreeMembership), so a self-hosted instance had no operator account and no instance-admin surface. This adds one, declared by environment per the project's twelve-factor rule. - OWNER_EMAIL (comma-separated): the account(s) named here are instance owners. Derived at request time — no DB column, no migration, can't drift from the env, survives DB resets. is_instance_owner()/InstanceOwner dependency in api/deps.py. - Ownership requires a VERIFIED email (independent of REQUIRE_EMAIL_VERIFICATION). Registration is open, so without this an attacker could seize the role by registering the owner address first; verification ties it to inbox control. - GET /api/v1/admin/instance (owner-only): operational status — version, env, user/tree counts, configured AI providers. Deliberately exposes no tree data or PII: instance ownership is an operator role, NOT a privacy-engine bypass. - /users/me reports is_instance_owner; frontend gains an owner-only /admin page and a conditional sidebar link (server-enforced, not just client-hidden). Found-and-fixed by an adversarial security review before merge: the verified-email land-grab (above) and a frontend null-deref where the admin page crashed on 401/5xx instead of failing closed. Docs: .env.example + ARCHITECTURE (notes the not-a-privacy-bypass boundary and the verified-email requirement). Tests: owner matching, the land-grab guard, /users/me, and owner-only /admin. Suite 96 passing. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com> Signed-off-by: Justin Paul <justin@jpaul.me>
183 lines
7.1 KiB
Python
183 lines
7.1 KiB
Python
"""Shared API dependencies: DB session, the authenticated user, and the mailer."""
|
|
|
|
from typing import Annotated
|
|
|
|
from fastapi import Depends, HTTPException, Request, status
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
|
|
from app.core.config import get_settings
|
|
from app.core.db import get_session
|
|
from app.integrations.mailer.base import Mailer
|
|
from app.integrations.mailer.console import ConsoleMailer
|
|
from app.integrations.mailer.smtp import SMTPMailer
|
|
from app.integrations.models.base import EmbeddingProvider, LLMProvider
|
|
from app.integrations.models.null import NullEmbeddingProvider, NullLLMProvider
|
|
from app.integrations.objectstore.base import ObjectStore
|
|
from app.integrations.objectstore.s3 import S3ObjectStore
|
|
from app.models.user import User
|
|
from app.services import auth_service
|
|
|
|
# FastAPI dependency alias: a parameter annotated with SessionDep receives the
# AsyncSession produced by get_session.
SessionDep = Annotated[AsyncSession, Depends(get_session)]
|
|
|
|
|
|
def extract_session_token(request: Request) -> str | None:
    """Return the raw session token carried by the request, if any.

    An ``Authorization: Bearer <token>`` header (API clients) takes precedence
    over the session cookie (browser). The scheme is matched
    case-insensitively and surrounding whitespace is stripped from the token.
    Without a bearer header, the value of the cookie named by
    ``settings.cookie_name`` is returned, or ``None`` when that cookie is
    absent.
    """
    header = request.headers.get("authorization")
    if not header or not header.lower().startswith("bearer "):
        return request.cookies.get(get_settings().cookie_name)
    # Drop the 7-character "bearer " scheme prefix, keep the credential.
    return header[len("bearer "):].strip()
|
|
|
|
|
|
async def get_current_user(request: Request, session: SessionDep) -> User:
    """Resolve the authenticated user for this request, or fail with 401.

    Raises:
        HTTPException: 401 "authentication required" when the request carries
            no token at all; 401 "invalid or expired session" when the token
            does not resolve to a user.
    """
    token = extract_session_token(request)
    if token is None:
        raise HTTPException(status.HTTP_401_UNAUTHORIZED, "authentication required")

    resolved = await auth_service.resolve_session_user(session, raw_token=token)
    if resolved is not None:
        return resolved
    raise HTTPException(status.HTTP_401_UNAUTHORIZED, "invalid or expired session")
|
|
|
|
|
|
# Required-auth dependency alias: resolves via get_current_user, which raises
# 401 when no valid session is present.
CurrentUser = Annotated[User, Depends(get_current_user)]
|
|
|
|
|
|
async def get_current_user_or_none(request: Request, session: SessionDep) -> User | None:
    """Optional authentication for public read endpoints.

    Unlike ``get_current_user`` this raises no 401 of its own: a request
    without a token yields ``None`` (anonymous viewer); otherwise the result
    of ``auth_service.resolve_session_user`` is returned as-is.
    """
    token = extract_session_token(request)
    if token is not None:
        return await auth_service.resolve_session_user(session, raw_token=token)
    return None
|
|
|
|
|
|
# Optional-auth dependency alias: resolves via get_current_user_or_none, so the
# injected value may be None for anonymous requests.
CurrentUserOrNone = Annotated[User | None, Depends(get_current_user_or_none)]
|
|
|
|
|
|
def is_instance_owner(user: User) -> bool:
    """Report whether ``user`` is an instance owner/operator.

    An account qualifies only when its email is listed in OWNER_EMAIL *and*
    that email has been verified. Instance ownership is an operational/config
    role: it does NOT bypass the privacy engine or grant access to other
    people's tree data.

    The verification requirement is load-bearing. Registration is open and, by
    default, does not require verification, so without this check an attacker
    could register the owner address before the operator does and seize the
    highest role with no proof of inbox control. Requiring a verified email
    ties ownership to real control of the named inbox, independently of the
    global REQUIRE_EMAIL_VERIFICATION setting. (Self-hosts without SMTP can
    verify through the link the console mailer prints to the
    operator-controlled logs.)
    """
    allowed = get_settings().owner_emails()
    if not allowed:
        # No owner configured: nobody holds the role.
        return False
    if user.email_verified_at is None:
        return False
    # Normalise the account email before the membership check.
    return user.email.strip().lower() in allowed
|
|
|
|
|
|
async def require_instance_owner(current: CurrentUser) -> User:
    """Dependency that admits only instance owners.

    Returns the authenticated user unchanged when ``is_instance_owner``
    accepts them.

    Raises:
        HTTPException: 403 "instance owner only" for any other authenticated
            user.
    """
    if is_instance_owner(current):
        return current
    raise HTTPException(status.HTTP_403_FORBIDDEN, "instance owner only")
|
|
|
|
|
|
# Owner-only dependency alias: resolves via require_instance_owner, which
# raises 403 for authenticated users who are not instance owners.
InstanceOwner = Annotated[User, Depends(require_instance_owner)]
|
|
|
|
|
|
def get_mailer() -> Mailer:
    """Pick the mailer implementation from settings.

    Returns an ``SMTPMailer`` only when ``settings.mailer`` is ``"smtp"`` and
    an ``smtp_host`` is set; in every other case a ``ConsoleMailer``.
    """
    cfg = get_settings()
    use_smtp = cfg.mailer == "smtp" and cfg.smtp_host
    return SMTPMailer(cfg) if use_smtp else ConsoleMailer()
|
|
|
|
|
|
# Dependency alias: injects the Mailer chosen by get_mailer.
MailerDep = Annotated[Mailer, Depends(get_mailer)]
|
|
|
|
|
|
def get_objectstore() -> ObjectStore:
    """Return an ``S3ObjectStore`` built from the current settings."""
    cfg = get_settings()
    return S3ObjectStore(cfg)
|
|
|
|
|
|
# Dependency alias: injects the ObjectStore returned by get_objectstore.
ObjectStoreDep = Annotated[ObjectStore, Depends(get_objectstore)]
|
|
|
|
|
|
def build_llm_providers() -> dict[str, LLMProvider]:
    """Build every LLM provider that is configured, keyed by name.

    Several providers can be active at once; select one with
    ``get_llm_provider(name)``. A provider is included when its API key is
    set (anthropic, openai, xai) or, for ollama, when ``ollama_enabled`` is
    set. Entries are inserted in the order anthropic, openai, xai, ollama.
    """
    # Provider classes are imported here, at call time, rather than at module
    # import.
    from app.integrations.models.anthropic_provider import AnthropicLLMProvider
    from app.integrations.models.openai_compat import OpenAICompatibleLLMProvider

    cfg = get_settings()

    def openai_compatible(api_key, base_url, model) -> LLMProvider:
        # openai, xai and ollama all go through the same OpenAI-compatible
        # client and share the global max_tokens setting.
        return OpenAICompatibleLLMProvider(
            api_key=api_key,
            base_url=base_url,
            model=model,
            max_tokens=cfg.llm_max_tokens,
        )

    registry: dict[str, LLMProvider] = {}
    if cfg.anthropic_api_key:
        registry["anthropic"] = AnthropicLLMProvider(
            api_key=cfg.anthropic_api_key,
            model=cfg.anthropic_model,
            max_tokens=cfg.llm_max_tokens,
        )
    if cfg.openai_api_key:
        registry["openai"] = openai_compatible(
            cfg.openai_api_key, cfg.openai_base_url, cfg.openai_model
        )
    if cfg.xai_api_key:
        registry["xai"] = openai_compatible(
            cfg.xai_api_key, cfg.xai_base_url, cfg.xai_model
        )
    if cfg.ollama_enabled:
        # Ollama is constructed without an API key.
        registry["ollama"] = openai_compatible(
            None, cfg.ollama_base_url, cfg.ollama_model
        )
    return registry
|
|
|
|
|
|
def configured_llm_providers() -> list[dict]:
    """List configured LLM providers as ``{"name", "model"}`` dicts.

    Intended for the AI admin view: only the provider name and model are
    reported, never credentials. Applies the same enablement conditions as
    ``build_llm_providers()`` but constructs no clients.
    """
    cfg = get_settings()
    # (name, enabled-if-truthy, model), in the order they are reported.
    candidates = (
        ("anthropic", cfg.anthropic_api_key, cfg.anthropic_model),
        ("openai", cfg.openai_api_key, cfg.openai_model),
        ("xai", cfg.xai_api_key, cfg.xai_model),
        ("ollama", cfg.ollama_enabled, cfg.ollama_model),
    )
    return [
        {"name": name, "model": model}
        for name, enabled, model in candidates
        if enabled
    ]
|
|
|
|
|
|
def get_llm_provider(name: str | None = None) -> LLMProvider:
    """Return the named LLM provider, falling back to the configured default.

    When ``name`` is empty or ``None``, ``settings.default_llm_provider`` is
    used. If the selected name is not among the configured providers, a
    ``NullLLMProvider`` is returned instead.
    """
    available = build_llm_providers()
    wanted = name or get_settings().default_llm_provider
    chosen = available.get(wanted)
    return chosen or NullLLMProvider()
|
|
|
|
|
|
# Dependency alias: injects the LLMProvider returned by get_llm_provider.
# NOTE(review): get_llm_provider takes `name: str | None = None`; when used via
# Depends, FastAPI presumably exposes that as an optional `name` query
# parameter on the route — confirm this is intended.
LLMProviderDep = Annotated[LLMProvider, Depends(get_llm_provider)]
|
|
|
|
|
|
def build_embedding_providers() -> dict[str, EmbeddingProvider]:
    """Build every embedding provider that is configured, keyed by name.

    ``"openai"`` is included when ``openai_api_key`` is set and ``"ollama"``
    when ``ollama_enabled`` is set, inserted in that order. Both use the
    OpenAI-compatible embedding client and the shared
    ``embedding_dimensions`` setting.
    """
    # Imported here, at call time, rather than at module import.
    from app.integrations.models.openai_compat import OpenAICompatibleEmbeddingProvider

    cfg = get_settings()

    def openai_compatible(api_key, base_url, model) -> EmbeddingProvider:
        return OpenAICompatibleEmbeddingProvider(
            api_key=api_key,
            base_url=base_url,
            model=model,
            dimensions=cfg.embedding_dimensions,
        )

    registry: dict[str, EmbeddingProvider] = {}
    if cfg.openai_api_key:
        registry["openai"] = openai_compatible(
            cfg.openai_api_key, cfg.openai_base_url, cfg.openai_embedding_model
        )
    if cfg.ollama_enabled:
        # Ollama is constructed without an API key.
        registry["ollama"] = openai_compatible(
            None, cfg.ollama_base_url, cfg.ollama_embedding_model
        )
    return registry
|
|
|
|
|
|
def get_embedding_provider(name: str | None = None) -> EmbeddingProvider:
    """Return the named embedding provider, falling back to the default.

    When ``name`` is empty or ``None``, ``settings.default_embedding_provider``
    is used. If the selected name is not among the configured providers, a
    ``NullEmbeddingProvider`` is returned instead.
    """
    available = build_embedding_providers()
    wanted = name or get_settings().default_embedding_provider
    chosen = available.get(wanted)
    return chosen or NullEmbeddingProvider()
|
|
|
|
|
|
# Dependency alias: injects the EmbeddingProvider returned by
# get_embedding_provider.
# NOTE(review): as with the LLM dependency, the `name` parameter presumably
# surfaces as an optional query parameter through FastAPI — confirm intended.
EmbeddingProviderDep = Annotated[EmbeddingProvider, Depends(get_embedding_provider)]
|