Test auth flows and switch core tests to session auth

New auth suite covers registration, login (incl. wrong-password), email verification, password reset (old sessions + old password rejected), logout revocation, and no-enumeration on reset. Core tenancy tests now authenticate via real sessions. A capturing mailer makes email flows assertable. 13 tests pass.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Signed-off-by: Justin Paul <justin@jpaul.me>
This commit is contained in:
2026-06-06 10:51:51 -04:00
parent 00bfe8bfca
commit 9f8dd960f4
3 changed files with 171 additions and 41 deletions
+45 -3
View File
@@ -3,28 +3,49 @@
DB-backed tests require ``TEST_DATABASE_URL`` (an async URL to a *disposable*
Postgres); without it they skip, so the no-DB unit suite still runs anywhere.
The schema is built from the models via ``create_all`` and dropped per test for
isolation.
isolation. A capturing mailer replaces the real one so email flows are testable.
"""
import os
import pytest
import pytest_asyncio
from httpx import ASGITransport, AsyncClient
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine
import app.models # noqa: F401 — register all models on Base.metadata
from app.api.deps import get_mailer
from app.core.db import get_session
from app.integrations.mailer.base import Mailer
from app.main import app
from app.models import Base
TEST_DATABASE_URL = os.getenv("TEST_DATABASE_URL")
class CapturingMailer(Mailer):
def __init__(self) -> None:
self.verifications: list[tuple[str, str]] = []
self.resets: list[tuple[str, str]] = []
async def send_email_verification(self, *, to: str, link: str) -> None:
self.verifications.append((to, link))
async def send_password_reset(self, *, to: str, link: str) -> None:
self.resets.append((to, link))
_mailer = CapturingMailer()
@pytest.fixture
def mailer() -> CapturingMailer:
return _mailer
@pytest_asyncio.fixture
async def client():
if not TEST_DATABASE_URL:
import pytest
pytest.skip("TEST_DATABASE_URL not set")
engine = create_async_engine(TEST_DATABASE_URL)
@@ -38,10 +59,31 @@ async def client():
async with sessionmaker() as session:
yield session
_mailer.verifications.clear()
_mailer.resets.clear()
app.dependency_overrides[get_session] = _override_session
app.dependency_overrides[get_mailer] = lambda: _mailer
transport = ASGITransport(app=app)
async with AsyncClient(transport=transport, base_url="http://test") as http_client:
yield http_client
app.dependency_overrides.clear()
await engine.dispose()
def token_from_link(link: str) -> str:
return link.split("token=", 1)[1]
async def register(client, email: str, password: str = "password123") -> str:
"""Register a user and return their bearer session token."""
resp = await client.post(
"/api/v1/auth/register", json={"email": email, "password": password}
)
assert resp.status_code == 201, resp.text
return resp.json()["token"]
def auth(token: str) -> dict:
return {"Authorization": f"Bearer {token}"}
+106
View File
@@ -0,0 +1,106 @@
"""Auth flows: registration, login, email verification, password reset, logout."""
from tests.conftest import auth, register, token_from_link
async def test_register_issues_session_and_verification_email(client, mailer):
resp = await client.post(
"/api/v1/auth/register",
json={"email": "new@example.com", "password": "password123", "display_name": "New"},
)
assert resp.status_code == 201, resp.text
body = resp.json()
assert body["token"]
assert body["user"]["email"] == "new@example.com"
assert body["user"]["email_verified_at"] is None
# A verification email was "sent".
assert len(mailer.verifications) == 1
assert mailer.verifications[0][0] == "new@example.com"
async def test_duplicate_email_conflicts(client):
await register(client, "dupe@example.com")
resp = await client.post(
"/api/v1/auth/register", json={"email": "dupe@example.com", "password": "password123"}
)
assert resp.status_code == 409
async def test_login_wrong_password_rejected(client):
await register(client, "user@example.com", password="password123")
resp = await client.post(
"/api/v1/auth/login", json={"email": "user@example.com", "password": "wrong-password"}
)
assert resp.status_code == 401
async def test_login_succeeds_and_me_returns_user(client):
await register(client, "user2@example.com", password="password123")
resp = await client.post(
"/api/v1/auth/login", json={"email": "user2@example.com", "password": "password123"}
)
assert resp.status_code == 200
token = resp.json()["token"]
resp = await client.get("/api/v1/users/me", headers=auth(token))
assert resp.status_code == 200
assert resp.json()["email"] == "user2@example.com"
async def test_email_verification(client, mailer):
await register(client, "verify@example.com")
token = token_from_link(mailer.verifications[0][1])
resp = await client.post("/api/v1/auth/verify-email", json={"token": token})
assert resp.status_code == 204
# Logging in and checking /me shows the address is now verified.
login = await client.post(
"/api/v1/auth/login", json={"email": "verify@example.com", "password": "password123"}
)
me = await client.get("/api/v1/users/me", headers=auth(login.json()["token"]))
assert me.json()["email_verified_at"] is not None
async def test_password_reset_flow_revokes_old_sessions(client, mailer):
old_token = await register(client, "reset@example.com", password="password123")
resp = await client.post(
"/api/v1/auth/request-password-reset", json={"email": "reset@example.com"}
)
assert resp.status_code == 202
reset_token = token_from_link(mailer.resets[0][1])
resp = await client.post(
"/api/v1/auth/reset-password",
json={"token": reset_token, "new_password": "new-password456"},
)
assert resp.status_code == 204
# Old session is revoked.
assert (await client.get("/api/v1/users/me", headers=auth(old_token))).status_code == 401
# Old password no longer works; new one does.
assert (
await client.post(
"/api/v1/auth/login", json={"email": "reset@example.com", "password": "password123"}
)
).status_code == 401
assert (
await client.post(
"/api/v1/auth/login",
json={"email": "reset@example.com", "password": "new-password456"},
)
).status_code == 200
async def test_request_password_reset_unknown_email_still_accepted(client, mailer):
resp = await client.post(
"/api/v1/auth/request-password-reset", json={"email": "nobody@example.com"}
)
assert resp.status_code == 202
assert len(mailer.resets) == 0 # no email sent, but no enumeration either
async def test_logout_revokes_session(client):
token = await register(client, "logout@example.com")
assert (await client.get("/api/v1/users/me", headers=auth(token))).status_code == 200
assert (await client.post("/api/v1/auth/logout", headers=auth(token))).status_code == 204
assert (await client.get("/api/v1/users/me", headers=auth(token))).status_code == 401
+20 -38
View File
@@ -1,91 +1,73 @@
"""End-to-end coverage of the core data model through the API: tenancy, the
privacy seam, and the pre-auth actor shim."""
privacy seam, and real session auth."""
async def _create_user(client, email: str) -> str:
resp = await client.post(
"/api/v1/users", json={"email": email, "display_name": email}
)
assert resp.status_code == 201, resp.text
return resp.json()["id"]
from tests.conftest import auth, register
async def test_tree_and_person_flow(client):
user_id = await _create_user(client, "keeper@example.com")
headers = {"X-User-Id": user_id}
token = await register(client, "keeper@example.com")
resp = await client.post(
"/api/v1/trees", json={"name": "Smith Family", "visibility": "private"}, headers=headers
"/api/v1/trees",
json={"name": "Smith Family", "visibility": "private"},
headers=auth(token),
)
assert resp.status_code == 201, resp.text
tree = resp.json()
assert tree["visibility"] == "private"
assert tree["owner_id"] == user_id
tree_id = tree["id"]
resp = await client.get("/api/v1/trees", headers=headers)
resp = await client.get("/api/v1/trees", headers=auth(token))
assert resp.status_code == 200
assert len(resp.json()) == 1
resp = await client.post(
f"/api/v1/trees/{tree_id}/persons",
json={"given": "John", "surname": "Smith"},
headers=headers,
headers=auth(token),
)
assert resp.status_code == 201, resp.text
person = resp.json()
assert person["primary_name"] == "John Smith"
assert person["tree_id"] == tree_id
resp = await client.get(f"/api/v1/trees/{tree_id}/persons", headers=headers)
resp = await client.get(f"/api/v1/trees/{tree_id}/persons", headers=auth(token))
assert resp.status_code == 200
assert len(resp.json()) == 1
async def test_private_tree_isolated_from_other_users(client):
owner = await _create_user(client, "owner@example.com")
other = await _create_user(client, "stranger@example.com")
owner = await register(client, "owner@example.com")
other = await register(client, "stranger@example.com")
resp = await client.post(
"/api/v1/trees", json={"name": "Private", "visibility": "private"},
headers={"X-User-Id": owner},
"/api/v1/trees", json={"name": "Private", "visibility": "private"}, headers=auth(owner)
)
tree_id = resp.json()["id"]
# A non-member cannot view a private tree, nor list its people.
resp = await client.get(f"/api/v1/trees/{tree_id}", headers={"X-User-Id": other})
resp = await client.get(f"/api/v1/trees/{tree_id}", headers=auth(other))
assert resp.status_code == 403
resp = await client.get(f"/api/v1/trees/{tree_id}/persons", headers={"X-User-Id": other})
resp = await client.get(f"/api/v1/trees/{tree_id}/persons", headers=auth(other))
assert resp.status_code == 403
async def test_public_tree_viewable_but_not_editable_by_non_member(client):
owner = await _create_user(client, "owner2@example.com")
viewer = await _create_user(client, "viewer2@example.com")
owner = await register(client, "owner2@example.com")
viewer = await register(client, "viewer2@example.com")
resp = await client.post(
"/api/v1/trees", json={"name": "Public", "visibility": "public"},
headers={"X-User-Id": owner},
"/api/v1/trees", json={"name": "Public", "visibility": "public"}, headers=auth(owner)
)
tree_id = resp.json()["id"]
# Visible to a non-member...
resp = await client.get(f"/api/v1/trees/{tree_id}", headers={"X-User-Id": viewer})
resp = await client.get(f"/api/v1/trees/{tree_id}", headers=auth(viewer))
assert resp.status_code == 200
# ...but not writable (not an editor).
resp = await client.post(
f"/api/v1/trees/{tree_id}/persons", json={"given": "Nope"},
headers={"X-User-Id": viewer},
f"/api/v1/trees/{tree_id}/persons", json={"given": "Nope"}, headers=auth(viewer)
)
assert resp.status_code == 403
async def test_duplicate_email_conflicts(client):
await _create_user(client, "dupe@example.com")
resp = await client.post("/api/v1/users", json={"email": "dupe@example.com"})
assert resp.status_code == 409
async def test_auth_required_without_header(client):
async def test_auth_required_without_token(client):
resp = await client.get("/api/v1/trees")
assert resp.status_code == 401