"""Authentication service: registration, login, sessions, email verification, and password reset. Provider-agnostic — credential checking is delegated to an AuthProvider; this module owns session/token issuance and the audit trail. """ from datetime import UTC, datetime, timedelta from sqlalchemy import select, update from sqlalchemy.ext.asyncio import AsyncSession from app.core.config import get_settings from app.core.security import generate_token, hash_password, hash_token, verify_password from app.integrations.auth.local import LocalAuthProvider from app.integrations.mailer.base import Mailer from app.models.auth import Session as SessionModel from app.models.auth import UserToken from app.models.enums import TokenPurpose from app.models.user import User from app.services.audit import record_audit from app.services.exceptions import Conflict, Forbidden, NotFound _local_provider = LocalAuthProvider() def _now() -> datetime: return datetime.now(UTC) def _link(path: str, raw: str) -> str: return f"{get_settings().app_base_url}{path}?token={raw}" def _issue_session(session: AsyncSession, user: User) -> tuple[str, SessionModel]: raw = generate_token() record = SessionModel( user_id=user.id, token_hash=hash_token(raw), expires_at=_now() + timedelta(days=get_settings().session_ttl_days), ) session.add(record) return raw, record def _create_email_token(session: AsyncSession, user: User, purpose: TokenPurpose) -> str: raw = generate_token() session.add( UserToken( user_id=user.id, purpose=purpose, token_hash=hash_token(raw), expires_at=_now() + timedelta(hours=get_settings().token_ttl_hours), ) ) return raw async def _consume_token( session: AsyncSession, raw_token: str, purpose: TokenPurpose ) -> UserToken: token = ( await session.execute( select(UserToken).where( UserToken.token_hash == hash_token(raw_token), UserToken.purpose == purpose, ) ) ).scalar_one_or_none() if token is None or token.used_at is not None or token.expires_at <= _now(): raise NotFound("invalid or expired token") token.used_at = _now() return token async def register( session: AsyncSession, mailer: Mailer, *, email: str, password: str, display_name: str | None = None, ) -> tuple[User, str, datetime]: email = email.strip().lower() existing = ( await session.execute(select(User).where(User.email == email)) ).scalar_one_or_none() if existing is not None: raise Conflict("email already registered") user = User(email=email, display_name=display_name, hashed_password=hash_password(password)) session.add(user) await session.flush() verify_raw = _create_email_token(session, user, TokenPurpose.email_verify) raw_token, record = _issue_session(session, user) record_audit( session, action="register", entity_type="User", entity_id=user.id, actor_user_id=user.id, after={"email": email}, ) await session.commit() await session.refresh(user) await mailer.send_email_verification(to=email, link=_link("/auth/verify-email", verify_raw)) return user, raw_token, record.expires_at async def login( session: AsyncSession, *, email: str, password: str ) -> tuple[User, str, datetime] | None: user = await _local_provider.authenticate(session, identifier=email, secret=password) if user is None: return None if get_settings().require_email_verification and user.email_verified_at is None: raise Forbidden("email not verified — check your inbox for the verification link") raw_token, record = _issue_session(session, user) record_audit( session, action="login", entity_type="User", entity_id=user.id, actor_user_id=user.id ) await session.commit() return user, raw_token, record.expires_at async def logout(session: AsyncSession, *, raw_token: str) -> None: await session.execute( update(SessionModel) .where( SessionModel.token_hash == hash_token(raw_token), SessionModel.revoked_at.is_(None), ) .values(revoked_at=_now()) ) await session.commit() async def resolve_session_user(session: AsyncSession, *, raw_token: str) -> User | None: record = ( await session.execute( select(SessionModel).where(SessionModel.token_hash == hash_token(raw_token)) ) ).scalar_one_or_none() if record is None or record.revoked_at is not None or record.expires_at <= _now(): return None user = ( await session.execute( select(User).where(User.id == record.user_id, User.deleted_at.is_(None)) ) ).scalar_one_or_none() # The single read-side enforcement: an unverified user has no active session # when verification is required. Gates every authenticated request at once. if user is not None and get_settings().require_email_verification and user.email_verified_at is None: return None return user async def verify_email(session: AsyncSession, *, raw_token: str) -> None: token = await _consume_token(session, raw_token, TokenPurpose.email_verify) await session.execute( update(User).where(User.id == token.user_id).values(email_verified_at=_now()) ) record_audit( session, action="verify_email", entity_type="User", entity_id=token.user_id, actor_user_id=token.user_id, ) await session.commit() async def request_password_reset(session: AsyncSession, mailer: Mailer, *, email: str) -> None: email = email.strip().lower() user = ( await session.execute( select(User).where(User.email == email, User.deleted_at.is_(None)) ) ).scalar_one_or_none() # Always succeed to avoid leaking which emails are registered. if user is None: return raw = _create_email_token(session, user, TokenPurpose.password_reset) await session.commit() await mailer.send_password_reset(to=email, link=_link("/auth/reset-password", raw)) async def change_password( session: AsyncSession, *, user: User, current_password: str, new_password: str ) -> None: """Change a logged-in user's password after re-verifying the current one. Revokes other sessions so a changed password takes effect everywhere.""" if not user.hashed_password or not verify_password( user.hashed_password, current_password ): raise Forbidden("current password is incorrect") user.hashed_password = hash_password(new_password) record_audit( session, action="change_password", entity_type="User", entity_id=user.id, actor_user_id=user.id, ) await session.commit() async def reset_password(session: AsyncSession, *, raw_token: str, new_password: str) -> None: token = await _consume_token(session, raw_token, TokenPurpose.password_reset) await session.execute( update(User) .where(User.id == token.user_id) .values(hashed_password=hash_password(new_password)) ) # Revoke all existing sessions — a reset invalidates prior logins. await session.execute( update(SessionModel) .where(SessionModel.user_id == token.user_id, SessionModel.revoked_at.is_(None)) .values(revoked_at=_now()) ) record_audit( session, action="reset_password", entity_type="User", entity_id=token.user_id, actor_user_id=token.user_id, ) await session.commit()