Files
justin c5631d3eab Add an instance owner/operator role (env-declared via OWNER_EMAIL)
Provenance had no system-level owner: ownership was only per-tree
(TreeMembership), so a self-hosted instance had no operator account and no
instance-admin surface. This adds one, declared by environment per the project's
twelve-factor rule.

- OWNER_EMAIL (comma-separated): the account(s) named here are instance owners.
  Derived at request time — no DB column, no migration, can't drift from the env,
  survives DB resets. is_instance_owner()/InstanceOwner dependency in api/deps.py.
- Ownership requires a VERIFIED email (independent of REQUIRE_EMAIL_VERIFICATION).
  Registration is open, so without this an attacker could seize the role by
  registering the owner address first; verification ties it to inbox control.
- GET /api/v1/admin/instance (owner-only): operational status — version, env,
  user/tree counts, configured AI providers. Deliberately exposes no tree data
  or PII: instance ownership is an operator role, NOT a privacy-engine bypass.
- /users/me reports is_instance_owner; frontend gains an owner-only /admin page
  and a conditional sidebar link (server-enforced, not just client-hidden).

Found-and-fixed by an adversarial security review before merge: the
verified-email land-grab (above) and a frontend null-deref where the admin page
crashed on 401/5xx instead of failing closed.

Docs: .env.example + ARCHITECTURE (notes the not-a-privacy-bypass boundary and
the verified-email requirement). Tests: owner matching, the land-grab guard,
/users/me, and owner-only /admin. Suite 96 passing.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Signed-off-by: Justin Paul <justin@jpaul.me>
2026-06-09 23:16:45 -04:00

183 lines
7.1 KiB
Python

"""Shared API dependencies: DB session, the authenticated user, and the mailer."""
from typing import Annotated
from fastapi import Depends, HTTPException, Request, status
from sqlalchemy.ext.asyncio import AsyncSession
from app.core.config import get_settings
from app.core.db import get_session
from app.integrations.mailer.base import Mailer
from app.integrations.mailer.console import ConsoleMailer
from app.integrations.mailer.smtp import SMTPMailer
from app.integrations.models.base import EmbeddingProvider, LLMProvider
from app.integrations.models.null import NullEmbeddingProvider, NullLLMProvider
from app.integrations.objectstore.base import ObjectStore
from app.integrations.objectstore.s3 import S3ObjectStore
from app.models.user import User
from app.services import auth_service
# Dependency alias: an AsyncSession that FastAPI obtains through get_session.
SessionDep = Annotated[AsyncSession, Depends(get_session)]
def extract_session_token(request: Request) -> str | None:
    """Pull the raw session token out of a request.

    An ``Authorization: Bearer <token>`` header (API clients) wins over the
    session cookie (browsers). The scheme is matched case-insensitively and the
    token is returned with surrounding whitespace removed. Without a Bearer
    header, the value of the cookie named by ``cookie_name`` in settings is
    returned, or ``None`` when that cookie is absent.
    """
    scheme = "bearer "
    header = request.headers.get("authorization")
    if not header or not header.lower().startswith(scheme):
        # No usable Bearer credential: fall back to the browser session cookie.
        return request.cookies.get(get_settings().cookie_name)
    return header[len(scheme):].strip()
async def get_current_user(request: Request, session: SessionDep) -> User:
    """Resolve the authenticated user for this request, or reject it with 401.

    The token is taken from extract_session_token() and resolved through
    auth_service.resolve_session_user().

    Raises:
        HTTPException: 401 "authentication required" when the request carries
            no token; 401 "invalid or expired session" when the token does not
            resolve to a user. Both responses include a
            ``WWW-Authenticate: Bearer`` challenge.
    """
    # RFC 9110 §15.5.2: every 401 response must carry a WWW-Authenticate challenge.
    challenge = {"WWW-Authenticate": "Bearer"}

    raw_token = extract_session_token(request)
    if raw_token is None:
        raise HTTPException(
            status.HTTP_401_UNAUTHORIZED, "authentication required", headers=challenge
        )
    user = await auth_service.resolve_session_user(session, raw_token=raw_token)
    if user is None:
        raise HTTPException(
            status.HTTP_401_UNAUTHORIZED, "invalid or expired session", headers=challenge
        )
    return user
# Dependency alias: the authenticated User resolved by get_current_user.
CurrentUser = Annotated[User, Depends(get_current_user)]
async def get_current_user_or_none(request: Request, session: SessionDep) -> User | None:
    """Optional authentication for public read endpoints.

    Does not reject the request: with no token present the result is ``None``
    (anonymous viewer); otherwise it is whatever
    auth_service.resolve_session_user() returns for that token.
    """
    token = extract_session_token(request)
    if token is not None:
        return await auth_service.resolve_session_user(session, raw_token=token)
    return None
# Dependency alias: the User from get_current_user_or_none, or None when anonymous.
CurrentUserOrNone = Annotated[User | None, Depends(get_current_user_or_none)]
def is_instance_owner(user: User) -> bool:
    """Report whether ``user`` is an instance owner/operator.

    Both conditions must hold: the account's email (trimmed and lower-cased) is
    listed in OWNER_EMAIL, and that email has been verified. When no owner
    emails are configured, nobody is an owner.

    Instance ownership is an operational/config role only. It does NOT bypass
    the privacy engine and grants no access to other users' tree data.

    Why verification is mandatory: registration is open and, by default, does
    not require a verified email. Without this check anyone could register the
    operator's address before the operator does and take the highest role
    without ever proving control of the inbox. The check applies regardless of
    the global REQUIRE_EMAIL_VERIFICATION setting. (Self-hosts without SMTP can
    still verify using the link the console mailer prints to the
    operator-controlled logs.)
    """
    owner_emails = get_settings().owner_emails()
    if not owner_emails:
        return False
    # Load-bearing guard: an unverified account never qualifies.
    if user.email_verified_at is None:
        return False
    normalized = user.email.strip().lower()
    return normalized in owner_emails
async def require_instance_owner(current: CurrentUser) -> User:
    """Admit only instance owners.

    Returns the authenticated user unchanged when is_instance_owner() accepts
    them.

    Raises:
        HTTPException: 403 "instance owner only" for any other authenticated
            user.
    """
    if is_instance_owner(current):
        return current
    raise HTTPException(status.HTTP_403_FORBIDDEN, "instance owner only")
# Dependency alias: the current User, admitted only when require_instance_owner passes.
InstanceOwner = Annotated[User, Depends(require_instance_owner)]
def get_mailer() -> Mailer:
    """Choose the mailer implementation from settings.

    Returns an SMTPMailer when ``mailer`` is "smtp" and ``smtp_host`` is set;
    in every other case returns a ConsoleMailer.
    """
    cfg = get_settings()
    smtp_selected = cfg.mailer == "smtp" and cfg.smtp_host
    return SMTPMailer(cfg) if smtp_selected else ConsoleMailer()
# Dependency alias: the Mailer chosen by get_mailer.
MailerDep = Annotated[Mailer, Depends(get_mailer)]
def get_objectstore() -> ObjectStore:
    """Build the object store for the API: an S3ObjectStore made from the current settings."""
    store: ObjectStore = S3ObjectStore(get_settings())
    return store
# Dependency alias: the ObjectStore built by get_objectstore.
ObjectStoreDep = Annotated[ObjectStore, Depends(get_objectstore)]
def build_llm_providers() -> dict[str, LLMProvider]:
    """Instantiate every configured LLM provider, keyed by provider name.

    "anthropic", "openai" and "xai" are included when their API key is set;
    "ollama" when ``ollama_enabled`` is true. OpenAI, xAI and Ollama all use
    the OpenAI-compatible client, each with its own base URL and model; Ollama
    is created with no API key. Several providers may be active at once — pick
    one with get_llm_provider(name).
    """
    # Provider modules are imported at call time rather than at module import.
    from app.integrations.models.anthropic_provider import AnthropicLLMProvider
    from app.integrations.models.openai_compat import OpenAICompatibleLLMProvider

    cfg = get_settings()

    def openai_compatible(api_key, base_url, model) -> LLMProvider:
        # Shared constructor for the providers that speak the OpenAI-compatible API.
        return OpenAICompatibleLLMProvider(
            api_key=api_key, base_url=base_url, model=model, max_tokens=cfg.llm_max_tokens,
        )

    registry: dict[str, LLMProvider] = {}
    if cfg.anthropic_api_key:
        registry["anthropic"] = AnthropicLLMProvider(
            api_key=cfg.anthropic_api_key,
            model=cfg.anthropic_model,
            max_tokens=cfg.llm_max_tokens,
        )
    if cfg.openai_api_key:
        registry["openai"] = openai_compatible(
            cfg.openai_api_key, cfg.openai_base_url, cfg.openai_model
        )
    if cfg.xai_api_key:
        registry["xai"] = openai_compatible(cfg.xai_api_key, cfg.xai_base_url, cfg.xai_model)
    if cfg.ollama_enabled:
        registry["ollama"] = openai_compatible(None, cfg.ollama_base_url, cfg.ollama_model)
    return registry
def configured_llm_providers() -> list[dict]:
    """List the configured LLM providers as ``{"name", "model"}`` dicts.

    Intended for the AI admin view, so no secrets are included. Uses the same
    enablement conditions and ordering as build_llm_providers(), but never
    constructs a client.
    """
    cfg = get_settings()
    # (provider name, enabling setting, model name) in display order.
    candidates = (
        ("anthropic", cfg.anthropic_api_key, cfg.anthropic_model),
        ("openai", cfg.openai_api_key, cfg.openai_model),
        ("xai", cfg.xai_api_key, cfg.xai_model),
        ("ollama", cfg.ollama_enabled, cfg.ollama_model),
    )
    return [
        {"name": provider, "model": model}
        for provider, enabled, model in candidates
        if enabled
    ]
def get_llm_provider(name: str | None = None) -> LLMProvider:
    """Select one LLM provider.

    Looks up ``name`` — or, when it is None or empty, ``default_llm_provider``
    from settings — among the providers returned by build_llm_providers().
    Falls back to a NullLLMProvider when nothing usable is registered under
    that name.

    NOTE(review): this function is also wired in as a FastAPI dependency
    (LLMProviderDep). FastAPI reads plain ``str`` parameters of a dependency
    from the query string, so a request may be able to choose the provider
    with ``?name=...`` — confirm that is intended.
    """
    available = build_llm_providers()
    wanted = name or get_settings().default_llm_provider
    selected = available.get(wanted)
    return selected or NullLLMProvider()
# Dependency alias: the LLMProvider returned by get_llm_provider.
LLMProviderDep = Annotated[LLMProvider, Depends(get_llm_provider)]
def build_embedding_providers() -> dict[str, EmbeddingProvider]:
    """Instantiate every configured embedding provider, keyed by provider name.

    "openai" is present when an OpenAI API key is set and "ollama" when
    ``ollama_enabled`` is true. Both use the OpenAI-compatible embedding client
    and share the ``embedding_dimensions`` setting; Ollama is created with no
    API key.
    """
    # Imported at call time rather than at module import.
    from app.integrations.models.openai_compat import OpenAICompatibleEmbeddingProvider

    cfg = get_settings()
    # (provider name, enabling setting, client keyword arguments) in insertion order.
    specs = (
        (
            "openai",
            cfg.openai_api_key,
            {
                "api_key": cfg.openai_api_key,
                "base_url": cfg.openai_base_url,
                "model": cfg.openai_embedding_model,
            },
        ),
        (
            "ollama",
            cfg.ollama_enabled,
            {
                "api_key": None,
                "base_url": cfg.ollama_base_url,
                "model": cfg.ollama_embedding_model,
            },
        ),
    )
    return {
        provider: OpenAICompatibleEmbeddingProvider(
            dimensions=cfg.embedding_dimensions, **client_kwargs
        )
        for provider, enabled, client_kwargs in specs
        if enabled
    }
def get_embedding_provider(name: str | None = None) -> EmbeddingProvider:
    """Select one embedding provider.

    Looks up ``name`` — or, when it is None or empty,
    ``default_embedding_provider`` from settings — among the providers returned
    by build_embedding_providers(). Falls back to a NullEmbeddingProvider when
    nothing usable is registered under that name.

    NOTE(review): this function is also wired in as a FastAPI dependency
    (EmbeddingProviderDep). FastAPI reads plain ``str`` parameters of a
    dependency from the query string, so a request may be able to choose the
    provider with ``?name=...`` — confirm that is intended.
    """
    available = build_embedding_providers()
    wanted = name or get_settings().default_embedding_provider
    selected = available.get(wanted)
    return selected or NullEmbeddingProvider()
# Dependency alias: the EmbeddingProvider returned by get_embedding_provider.
EmbeddingProviderDep = Annotated[EmbeddingProvider, Depends(get_embedding_provider)]