"""Instance owner (OWNER_EMAIL): the operator account + the owner-only /admin surface. Ownership is derived from the env at request time — no DB column — and requires a *verified* email so the owner address can't be land-grabbed by whoever registers it first.""" from datetime import datetime, timezone from sqlalchemy import text from app.api.deps import is_instance_owner from app.core.config import get_settings from app.models.user import User from tests.conftest import auth, register VERIFIED = datetime(2020, 1, 1, tzinfo=timezone.utc) def test_is_instance_owner_matches_case_insensitively(monkeypatch): monkeypatch.setattr(get_settings(), "owner_email", "Owner@Example.com, second@ex.com") assert is_instance_owner(User(email="owner@example.com", email_verified_at=VERIFIED)) is True assert is_instance_owner(User(email="SECOND@ex.com", email_verified_at=VERIFIED)) is True assert is_instance_owner(User(email="nope@ex.com", email_verified_at=VERIFIED)) is False def test_unverified_owner_email_is_not_owner(monkeypatch): """The land-grab guard: a matching email with no verification is NOT owner.""" monkeypatch.setattr(get_settings(), "owner_email", "boss@ex.com") assert is_instance_owner(User(email="boss@ex.com", email_verified_at=None)) is False assert is_instance_owner(User(email="boss@ex.com", email_verified_at=VERIFIED)) is True def test_no_owner_when_unset(monkeypatch): monkeypatch.setattr(get_settings(), "owner_email", "") # An empty OWNER_EMAIL designates no owner — and must never match the (also # empty-string-normalizing) edges. assert is_instance_owner(User(email="anyone@ex.com", email_verified_at=VERIFIED)) is False assert is_instance_owner(User(email="", email_verified_at=VERIFIED)) is False monkeypatch.setattr(get_settings(), "owner_email", " , ") assert is_instance_owner(User(email="", email_verified_at=VERIFIED)) is False async def _verify(db_session, email: str) -> None: await db_session.execute( text("UPDATE users SET email_verified_at = now() WHERE email = :e"), {"e": email} ) await db_session.commit() async def test_me_reports_instance_owner(client, db_session, monkeypatch): monkeypatch.setattr(get_settings(), "owner_email", "boss@ex.com") boss = auth(await register(client, "boss@ex.com")) other = auth(await register(client, "peon@ex.com")) await _verify(db_session, "boss@ex.com") assert (await client.get("/api/v1/users/me", headers=boss)).json()["is_instance_owner"] is True assert (await client.get("/api/v1/users/me", headers=other)).json()["is_instance_owner"] is False async def test_admin_instance_is_owner_only(client, db_session, monkeypatch): monkeypatch.setattr(get_settings(), "owner_email", "boss@ex.com") boss = auth(await register(client, "boss@ex.com")) other = auth(await register(client, "peon@ex.com")) await _verify(db_session, "boss@ex.com") assert (await client.get("/api/v1/admin/instance")).status_code == 401 # anon assert (await client.get("/api/v1/admin/instance", headers=other)).status_code == 403 # non-owner r = await client.get("/api/v1/admin/instance", headers=boss) assert r.status_code == 200 body = r.json() assert body["owner_emails"] == ["boss@ex.com"] assert body["user_count"] >= 2 assert "ai_providers" in body and "default_llm_provider" in body