Files
provenance/backend/tests/test_auth.py
T
justin 660fe7b37f Security: gate sessions on verified email (opt-in)
Backlog §2.10: registration issued a live session and email_verified_at was
written but never read, so an unverified user had full access and there was no
switch to require verification.

Add REQUIRE_EMAIL_VERIFICATION (default false). When true:
- resolve_session_user returns None for a user whose email_verified_at is null —
  the single read-side gate covering every authenticated request, incl. the
  session minted at registration.
- login raises 403 ("email not verified") instead of issuing a useless token.

Default false on purpose: self-hosts without SMTP, and accounts created before
this gate existed (email_verified_at null), must not be locked out. Operators
enable it once mail works and accounts are verified. Documented in .env.example.

Tests: default-off keeps unverified accounts working; on → register's session
won't resolve (401), login is 403, and after verify-email both work. 75 passed.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Signed-off-by: Justin Paul <justin@jpaul.me>
2026-06-09 11:22:54 -04:00

148 lines
5.7 KiB
Python

"""Auth flows: registration, login, email verification, password reset, logout."""
from tests.conftest import auth, register, token_from_link
async def test_register_issues_session_and_verification_email(client, mailer):
resp = await client.post(
"/api/v1/auth/register",
json={"email": "new@example.com", "password": "password123", "display_name": "New"},
)
assert resp.status_code == 201, resp.text
body = resp.json()
assert body["token"]
assert body["user"]["email"] == "new@example.com"
assert body["user"]["email_verified_at"] is None
# A verification email was "sent".
assert len(mailer.verifications) == 1
assert mailer.verifications[0][0] == "new@example.com"
async def test_duplicate_email_conflicts(client):
await register(client, "dupe@example.com")
resp = await client.post(
"/api/v1/auth/register", json={"email": "dupe@example.com", "password": "password123"}
)
assert resp.status_code == 409
async def test_login_wrong_password_rejected(client):
await register(client, "user@example.com", password="password123")
resp = await client.post(
"/api/v1/auth/login", json={"email": "user@example.com", "password": "wrong-password"}
)
assert resp.status_code == 401
async def test_login_succeeds_and_me_returns_user(client):
await register(client, "user2@example.com", password="password123")
resp = await client.post(
"/api/v1/auth/login", json={"email": "user2@example.com", "password": "password123"}
)
assert resp.status_code == 200
token = resp.json()["token"]
resp = await client.get("/api/v1/users/me", headers=auth(token))
assert resp.status_code == 200
assert resp.json()["email"] == "user2@example.com"
async def test_email_verification(client, mailer):
await register(client, "verify@example.com")
token = token_from_link(mailer.verifications[0][1])
resp = await client.post("/api/v1/auth/verify-email", json={"token": token})
assert resp.status_code == 204
# Logging in and checking /me shows the address is now verified.
login = await client.post(
"/api/v1/auth/login", json={"email": "verify@example.com", "password": "password123"}
)
me = await client.get("/api/v1/users/me", headers=auth(login.json()["token"]))
assert me.json()["email_verified_at"] is not None
async def test_password_reset_flow_revokes_old_sessions(client, mailer):
old_token = await register(client, "reset@example.com", password="password123")
resp = await client.post(
"/api/v1/auth/request-password-reset", json={"email": "reset@example.com"}
)
assert resp.status_code == 202
reset_token = token_from_link(mailer.resets[0][1])
resp = await client.post(
"/api/v1/auth/reset-password",
json={"token": reset_token, "new_password": "new-password456"},
)
assert resp.status_code == 204
# Old session is revoked.
assert (await client.get("/api/v1/users/me", headers=auth(old_token))).status_code == 401
# Old password no longer works; new one does.
assert (
await client.post(
"/api/v1/auth/login", json={"email": "reset@example.com", "password": "password123"}
)
).status_code == 401
assert (
await client.post(
"/api/v1/auth/login",
json={"email": "reset@example.com", "password": "new-password456"},
)
).status_code == 200
async def test_request_password_reset_unknown_email_still_accepted(client, mailer):
resp = await client.post(
"/api/v1/auth/request-password-reset", json={"email": "nobody@example.com"}
)
assert resp.status_code == 202
assert len(mailer.resets) == 0 # no email sent, but no enumeration either
async def test_logout_revokes_session(client):
token = await register(client, "logout@example.com")
assert (await client.get("/api/v1/users/me", headers=auth(token))).status_code == 200
assert (await client.post("/api/v1/auth/logout", headers=auth(token))).status_code == 204
assert (await client.get("/api/v1/users/me", headers=auth(token))).status_code == 401
async def test_unverified_user_works_by_default(client):
# Default (require_email_verification off): unverified accounts work as before.
token = await register(client, "open@example.com")
assert (await client.get("/api/v1/users/me", headers=auth(token))).status_code == 200
async def test_verification_gate_blocks_until_verified(client, mailer, monkeypatch):
from app.core.config import get_settings
monkeypatch.setattr(get_settings(), "require_email_verification", True)
reg = await client.post(
"/api/v1/auth/register", json={"email": "gate@example.com", "password": "password123"}
)
assert reg.status_code == 201
token = reg.json()["token"]
# The session issued at registration does not resolve while unverified...
assert (await client.get("/api/v1/users/me", headers=auth(token))).status_code == 401
# ...and login is refused with 403 (not 401 — credentials are valid).
blocked = await client.post(
"/api/v1/auth/login", json={"email": "gate@example.com", "password": "password123"}
)
assert blocked.status_code == 403
# Verify via the emailed link.
link = mailer.verifications[-1][1]
assert (
await client.post("/api/v1/auth/verify-email", json={"token": token_from_link(link)})
).status_code == 204
# Now login works and the session resolves.
ok = await client.post(
"/api/v1/auth/login", json={"email": "gate@example.com", "password": "password123"}
)
assert ok.status_code == 200
assert (
await client.get("/api/v1/users/me", headers=auth(ok.json()["token"]))
).status_code == 200