"""Tree membership management: list / add / change-role / remove. Only an owner may change membership. A tree must always keep at least one owner. The member list (which exposes user emails) is visible only to members — never to a non-member viewing a public/unlisted tree. """ import uuid from sqlalchemy import func, select from sqlalchemy.ext.asyncio import AsyncSession from app.models.enums import MembershipRole from app.models.tree import Tree, TreeMembership from app.models.user import User from app.services import privacy from app.services.audit import record_audit from app.services.exceptions import Conflict, Forbidden, NotFound async def _require_owner(session: AsyncSession, *, actor_id: uuid.UUID, tree: Tree) -> None: if await privacy.get_membership_role(session, actor_id, tree.id) is not MembershipRole.owner: raise Forbidden("only the owner can manage members") async def _owner_count(session: AsyncSession, tree_id: uuid.UUID) -> int: return ( await session.execute( select(func.count()) .select_from(TreeMembership) .where(TreeMembership.tree_id == tree_id, TreeMembership.role == MembershipRole.owner) ) ).scalar_one() def _row(m: TreeMembership, u: User) -> dict: return { "id": m.id, "user_id": u.id, "email": u.email, "display_name": u.display_name, "role": m.role, "created_at": m.created_at, } async def list_members(session: AsyncSession, *, viewer_id: uuid.UUID, tree: Tree) -> list[dict]: # Member-only: the list exposes emails, so a non-member (even on a public # tree) must not see it. if await privacy.get_membership_role(session, viewer_id, tree.id) is None: raise Forbidden("only members can see the member list") rows = ( await session.execute( select(TreeMembership, User) .join(User, User.id == TreeMembership.user_id) .where(TreeMembership.tree_id == tree.id) .order_by(TreeMembership.created_at) ) ).all() return [_row(m, u) for m, u in rows] async def add_member( session: AsyncSession, *, actor: User, tree: Tree, email: str, role: MembershipRole ) -> dict: await _require_owner(session, actor_id=actor.id, tree=tree) user = ( await session.execute( select(User).where(User.email == email, User.deleted_at.is_(None)) ) ).scalar_one_or_none() if user is None: raise NotFound("no user with that email on this instance") if await privacy.get_membership_role(session, user.id, tree.id) is not None: raise Conflict("that user is already a member") m = TreeMembership(tree_id=tree.id, user_id=user.id, role=role) session.add(m) record_audit( session, action="add_member", entity_type="Tree", entity_id=tree.id, tree_id=tree.id, actor_user_id=actor.id, after={"user_id": str(user.id), "role": role.value}, ) await session.commit() await session.refresh(m) return _row(m, user) async def _get_membership( session: AsyncSession, tree: Tree, membership_id: uuid.UUID ) -> TreeMembership: m = ( await session.execute( select(TreeMembership).where( TreeMembership.id == membership_id, TreeMembership.tree_id == tree.id ) ) ).scalar_one_or_none() if m is None: raise NotFound("member not found") return m async def update_member_role( session: AsyncSession, *, actor: User, tree: Tree, membership_id: uuid.UUID, role: MembershipRole, ) -> dict: await _require_owner(session, actor_id=actor.id, tree=tree) m = await _get_membership(session, tree, membership_id) if ( m.role == MembershipRole.owner and role != MembershipRole.owner and await _owner_count(session, tree.id) <= 1 ): raise Conflict("a tree must keep at least one owner") m.role = role record_audit( session, action="update_member", entity_type="Tree", entity_id=tree.id, tree_id=tree.id, actor_user_id=actor.id, after={"membership_id": str(m.id), "role": role.value}, ) await session.commit() await session.refresh(m) u = (await session.execute(select(User).where(User.id == m.user_id))).scalar_one() return _row(m, u) async def remove_member( session: AsyncSession, *, actor: User, tree: Tree, membership_id: uuid.UUID ) -> None: await _require_owner(session, actor_id=actor.id, tree=tree) m = await _get_membership(session, tree, membership_id) if m.role == MembershipRole.owner and await _owner_count(session, tree.id) <= 1: raise Conflict("a tree must keep at least one owner") await session.delete(m) record_audit( session, action="remove_member", entity_type="Tree", entity_id=tree.id, tree_id=tree.id, actor_user_id=actor.id, after={"membership_id": str(membership_id)}, ) await session.commit()