Add an instance owner/operator role (env-declared via OWNER_EMAIL)

Provenance had no system-level owner: ownership was only per-tree
(TreeMembership), so a self-hosted instance had no operator account and no
instance-admin surface. This adds one, declared by environment per the project's
twelve-factor rule.

- OWNER_EMAIL (comma-separated): the account(s) named here are instance owners.
  Derived at request time — no DB column, no migration, can't drift from the env,
  survives DB resets. is_instance_owner()/InstanceOwner dependency in api/deps.py.
- Ownership requires a VERIFIED email (independent of REQUIRE_EMAIL_VERIFICATION).
  Registration is open, so without this an attacker could seize the role by
  registering the owner address first; verification ties it to inbox control.
- GET /api/v1/admin/instance (owner-only): operational status — version, env,
  user/tree counts, configured AI providers. Deliberately exposes no tree data
  or PII: instance ownership is an operator role, NOT a privacy-engine bypass.
- /users/me reports is_instance_owner; frontend gains an owner-only /admin page
  and a conditional sidebar link (server-enforced, not just client-hidden).

Found-and-fixed by an adversarial security review before merge: the
verified-email land-grab (above) and a frontend null-deref where the admin page
crashed on 401/5xx instead of failing closed.

Docs: .env.example + ARCHITECTURE (notes the not-a-privacy-bypass boundary and
the verified-email requirement). Tests: owner matching, the land-grab guard,
/users/me, and owner-only /admin. Suite 96 passing.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Signed-off-by: Justin Paul <justin@jpaul.me>
This commit is contained in:
2026-06-09 23:16:45 -04:00
parent 6fbad3106d
commit c5631d3eab
15 changed files with 467 additions and 5 deletions
+30
View File
@@ -54,6 +54,36 @@ async def get_current_user_or_none(request: Request, session: SessionDep) -> Use
CurrentUserOrNone = Annotated[User | None, Depends(get_current_user_or_none)]
def is_instance_owner(user: User) -> bool:
"""Whether this account is an instance owner/operator — i.e. its email is
named in OWNER_EMAIL *and* that email has been verified. Instance ownership
is an operational/config role; it does NOT bypass the privacy engine or grant
access to others' tree data.
The verified-email requirement is load-bearing: registration is open and (by
default) doesn't require verification, so without it an attacker could claim
the owner email by registering it before the operator does — a land-grab to
the highest role with no proof of inbox control. Requiring verification ties
ownership to actual control of the named inbox regardless of the global
REQUIRE_EMAIL_VERIFICATION setting. (Self-hosts without SMTP can verify via
the link the console mailer prints to the operator-controlled logs.)"""
owners = get_settings().owner_emails()
return (
bool(owners)
and user.email_verified_at is not None
and user.email.strip().lower() in owners
)
async def require_instance_owner(current: CurrentUser) -> User:
if not is_instance_owner(current):
raise HTTPException(status.HTTP_403_FORBIDDEN, "instance owner only")
return current
InstanceOwner = Annotated[User, Depends(require_instance_owner)]
def get_mailer() -> Mailer:
settings = get_settings()
if settings.mailer == "smtp" and settings.smtp_host:
+2
View File
@@ -3,6 +3,7 @@
from fastapi import APIRouter
from app.api.v1 import (
admin,
ai,
auth,
citations,
@@ -38,3 +39,4 @@ api_router.include_router(public.router)
api_router.include_router(members.router)
api_router.include_router(proposals.router)
api_router.include_router(ai.router)
api_router.include_router(admin.router)
+38
View File
@@ -0,0 +1,38 @@
"""Instance-admin surface — owner-only (OWNER_EMAIL). Operational status and
instance-wide configuration. Deliberately exposes no tree contents or PII:
instance ownership is an operator role, not a privacy bypass."""
from sqlalchemy import func, select
from fastapi import APIRouter
from app.api.deps import InstanceOwner, SessionDep, configured_llm_providers
from app.core.config import get_settings
from app.models.tree import Tree
from app.models.user import User
from app.schemas.admin import InstanceStatus
from app.schemas.ai_policy import ConfiguredProvider
router = APIRouter(prefix="/admin", tags=["admin"])
@router.get("/instance", response_model=InstanceStatus)
async def instance_status(owner: InstanceOwner, session: SessionDep) -> InstanceStatus:
"""Operator dashboard data. Requires the caller to be an instance owner."""
s = get_settings()
user_count = await session.scalar(
select(func.count()).select_from(User).where(User.deleted_at.is_(None))
)
tree_count = await session.scalar(
select(func.count()).select_from(Tree).where(Tree.deleted_at.is_(None))
)
return InstanceStatus(
version=s.version,
env=s.app_env,
owner_emails=sorted(s.owner_emails()),
require_email_verification=s.require_email_verification,
user_count=user_count or 0,
tree_count=tree_count or 0,
default_llm_provider=s.default_llm_provider,
ai_providers=[ConfiguredProvider(**p) for p in configured_llm_providers()],
)
+9 -3
View File
@@ -1,15 +1,21 @@
from fastapi import APIRouter, File, Form, Response, UploadFile
from app.api.deps import CurrentUser, ObjectStoreDep, SessionDep
from app.api.deps import CurrentUser, ObjectStoreDep, SessionDep, is_instance_owner
from app.schemas.user import UserRead, UserSelfPersonUpdate
from app.services import account_service, user_service
router = APIRouter(prefix="/users", tags=["users"])
def _me(user) -> UserRead:
out = UserRead.model_validate(user)
out.is_instance_owner = is_instance_owner(user)
return out
@router.get("/me", response_model=UserRead)
async def read_me(current: CurrentUser) -> UserRead:
return UserRead.model_validate(current)
return _me(current)
@router.patch("/me/self-person", response_model=UserRead)
@@ -20,7 +26,7 @@ async def set_self_person(
user = await user_service.set_self_person(
session, user=current, person_id=data.self_person_id
)
return UserRead.model_validate(user)
return _me(user)
@router.get("/me/export")
+12
View File
@@ -22,6 +22,18 @@ class Settings(BaseSettings):
version: str = "0.0.0"
app_env: str = Field(default="development", description="development | production")
# --- Instance owner / operator ---
# Email(s) of the instance owner(s) — the operator(s) who run this server.
# The matching account(s) get instance-admin rights (instance-wide settings;
# see /api/v1/admin). Comma-separated for several. Empty = no designated
# owner (the instance has no operator account). Derived at request time, so
# changing it takes effect immediately with no migration or DB state.
owner_email: str = ""
def owner_emails(self) -> frozenset[str]:
"""Normalized (lowercased, trimmed) owner emails; empty if none set."""
return frozenset(e.strip().lower() for e in self.owner_email.split(",") if e.strip())
# SQLAlchemy async URL, e.g. postgresql+asyncpg://user:pass@host:5432/db
database_url: str = Field(
default="postgresql+asyncpg://provenance:provenance@localhost:5432/provenance",
+20
View File
@@ -0,0 +1,20 @@
"""Instance-admin schemas. Operator-facing, owner-only — operational status and
config, never tree data or PII (instance ownership doesn't bypass privacy)."""
from pydantic import BaseModel
from app.schemas.ai_policy import ConfiguredProvider
class InstanceStatus(BaseModel):
version: str
env: str
# Operator account(s) — the email(s) named in OWNER_EMAIL.
owner_emails: list[str]
require_email_verification: bool
# Aggregate, non-identifying counts (live rows only).
user_count: int
tree_count: int
# Instance-wide AI configuration (no secrets).
default_llm_provider: str
ai_providers: list[ConfiguredProvider]
+3
View File
@@ -21,6 +21,9 @@ class UserRead(BaseModel):
email_verified_at: datetime | None
self_person_id: uuid.UUID | None = None
created_at: datetime
# Operational role, not a DB column: true when this account's email is named
# in OWNER_EMAIL. Set by the API layer (see users.read_me).
is_instance_owner: bool = False
class UserSelfPersonUpdate(BaseModel):
+72
View File
@@ -0,0 +1,72 @@
"""Instance owner (OWNER_EMAIL): the operator account + the owner-only /admin
surface. Ownership is derived from the env at request time — no DB column — and
requires a *verified* email so the owner address can't be land-grabbed by
whoever registers it first."""
from datetime import datetime, timezone
from sqlalchemy import text
from app.api.deps import is_instance_owner
from app.core.config import get_settings
from app.models.user import User
from tests.conftest import auth, register
VERIFIED = datetime(2020, 1, 1, tzinfo=timezone.utc)
def test_is_instance_owner_matches_case_insensitively(monkeypatch):
monkeypatch.setattr(get_settings(), "owner_email", "Owner@Example.com, second@ex.com")
assert is_instance_owner(User(email="owner@example.com", email_verified_at=VERIFIED)) is True
assert is_instance_owner(User(email="SECOND@ex.com", email_verified_at=VERIFIED)) is True
assert is_instance_owner(User(email="nope@ex.com", email_verified_at=VERIFIED)) is False
def test_unverified_owner_email_is_not_owner(monkeypatch):
"""The land-grab guard: a matching email with no verification is NOT owner."""
monkeypatch.setattr(get_settings(), "owner_email", "boss@ex.com")
assert is_instance_owner(User(email="boss@ex.com", email_verified_at=None)) is False
assert is_instance_owner(User(email="boss@ex.com", email_verified_at=VERIFIED)) is True
def test_no_owner_when_unset(monkeypatch):
monkeypatch.setattr(get_settings(), "owner_email", "")
# An empty OWNER_EMAIL designates no owner — and must never match the (also
# empty-string-normalizing) edges.
assert is_instance_owner(User(email="anyone@ex.com", email_verified_at=VERIFIED)) is False
assert is_instance_owner(User(email="", email_verified_at=VERIFIED)) is False
monkeypatch.setattr(get_settings(), "owner_email", " , ")
assert is_instance_owner(User(email="", email_verified_at=VERIFIED)) is False
async def _verify(db_session, email: str) -> None:
await db_session.execute(
text("UPDATE users SET email_verified_at = now() WHERE email = :e"), {"e": email}
)
await db_session.commit()
async def test_me_reports_instance_owner(client, db_session, monkeypatch):
monkeypatch.setattr(get_settings(), "owner_email", "boss@ex.com")
boss = auth(await register(client, "boss@ex.com"))
other = auth(await register(client, "peon@ex.com"))
await _verify(db_session, "boss@ex.com")
assert (await client.get("/api/v1/users/me", headers=boss)).json()["is_instance_owner"] is True
assert (await client.get("/api/v1/users/me", headers=other)).json()["is_instance_owner"] is False
async def test_admin_instance_is_owner_only(client, db_session, monkeypatch):
monkeypatch.setattr(get_settings(), "owner_email", "boss@ex.com")
boss = auth(await register(client, "boss@ex.com"))
other = auth(await register(client, "peon@ex.com"))
await _verify(db_session, "boss@ex.com")
assert (await client.get("/api/v1/admin/instance")).status_code == 401 # anon
assert (await client.get("/api/v1/admin/instance", headers=other)).status_code == 403 # non-owner
r = await client.get("/api/v1/admin/instance", headers=boss)
assert r.status_code == 200
body = r.json()
assert body["owner_emails"] == ["boss@ex.com"]
assert body["user_count"] >= 2
assert "ai_providers" in body and "default_llm_provider" in body