"""Name service. A Person carries one or more Name rows — a primary (typically the birth/maiden name) plus typed alternates (married, alias, religious, …). Exactly one name is primary at a time; it drives display everywhere. Writes require editor rights; reads go through the tree's view check. """ import uuid from datetime import UTC, datetime from sqlalchemy import select, update from sqlalchemy.ext.asyncio import AsyncSession from app.models.person import Name, Person from app.models.tree import Tree from app.models.user import User from app.services import privacy from app.services.audit import record_audit from app.services.exceptions import Forbidden, NotFound async def _get_person(session: AsyncSession, *, tree: Tree, person_id: uuid.UUID) -> Person: person = ( await session.execute( select(Person).where( Person.id == person_id, Person.tree_id == tree.id, Person.deleted_at.is_(None) ) ) ).scalar_one_or_none() if person is None: raise NotFound("person not found") return person async def _clear_primary( session: AsyncSession, *, person_id: uuid.UUID, keep: uuid.UUID | None ) -> None: """Demote every other name so exactly one stays primary.""" stmt = ( update(Name) .where(Name.person_id == person_id, Name.deleted_at.is_(None), Name.is_primary.is_(True)) .values(is_primary=False) ) if keep is not None: stmt = stmt.where(Name.id != keep) await session.execute(stmt) async def list_names( session: AsyncSession, *, viewer_id: uuid.UUID, tree: Tree, person_id: uuid.UUID ) -> list[Name]: if not await privacy.can_view_tree(session, user_id=viewer_id, tree=tree): raise Forbidden("not permitted to view this tree") await _get_person(session, tree=tree, person_id=person_id) # Non-members: a redacted/hidden person's real names must not leak. if await privacy.get_membership_role(session, viewer_id, tree.id) is None: from app.services import public_view_service return await public_view_service.list_public_person_names( session, viewer_id=viewer_id, tree=tree, person_id=person_id ) stmt = ( select(Name) .where(Name.person_id == person_id, Name.deleted_at.is_(None)) .order_by(Name.is_primary.desc(), Name.sort_order, Name.created_at) ) return list((await session.execute(stmt)).scalars().all()) async def create_name( session: AsyncSession, *, actor: User, tree: Tree, person_id: uuid.UUID, name_type: str = "birth", given: str | None = None, surname: str | None = None, prefix: str | None = None, suffix: str | None = None, nickname: str | None = None, is_primary: bool = False, ) -> Name: if not await privacy.can_edit_tree(session, user_id=actor.id, tree=tree): raise Forbidden("not an editor of this tree") await _get_person(session, tree=tree, person_id=person_id) # First name for a person is always primary; otherwise honor the flag. existing = ( await session.execute( select(Name.id).where(Name.person_id == person_id, Name.deleted_at.is_(None)) ) ).first() primary = is_primary or existing is None if primary: await _clear_primary(session, person_id=person_id, keep=None) name = Name( tree_id=tree.id, person_id=person_id, name_type=name_type, given=given, surname=surname, prefix=prefix, suffix=suffix, nickname=nickname, is_primary=primary, ) session.add(name) await session.flush() record_audit( session, action="create", entity_type="Name", entity_id=name.id, tree_id=tree.id, actor_user_id=actor.id, after={"name_type": name_type, "given": given, "surname": surname}, ) await session.commit() await session.refresh(name) return name _NAME_FIELDS = {"name_type", "given", "surname", "prefix", "suffix", "nickname"} async def update_name( session: AsyncSession, *, actor: User, tree: Tree, person_id: uuid.UUID, name_id: uuid.UUID, changes: dict, ) -> Name: if not await privacy.can_edit_tree(session, user_id=actor.id, tree=tree): raise Forbidden("not an editor of this tree") name = ( await session.execute( select(Name).where( Name.id == name_id, Name.person_id == person_id, Name.tree_id == tree.id, Name.deleted_at.is_(None), ) ) ).scalar_one_or_none() if name is None: raise NotFound("name not found") for key in _NAME_FIELDS & changes.keys(): setattr(name, key, changes[key]) if changes.get("is_primary") is True: await _clear_primary(session, person_id=person_id, keep=name.id) name.is_primary = True record_audit( session, action="update", entity_type="Name", entity_id=name.id, tree_id=tree.id, actor_user_id=actor.id, after=changes, ) await session.commit() await session.refresh(name) return name async def delete_name( session: AsyncSession, *, actor: User, tree: Tree, person_id: uuid.UUID, name_id: uuid.UUID, ) -> None: if not await privacy.can_edit_tree(session, user_id=actor.id, tree=tree): raise Forbidden("not an editor of this tree") name = ( await session.execute( select(Name).where( Name.id == name_id, Name.person_id == person_id, Name.tree_id == tree.id, Name.deleted_at.is_(None), ) ) ).scalar_one_or_none() if name is None: raise NotFound("name not found") name.deleted_at = datetime.now(UTC) was_primary = name.is_primary name.is_primary = False record_audit( session, action="delete", entity_type="Name", entity_id=name.id, tree_id=tree.id, actor_user_id=actor.id, ) # Promote another name to primary so the person never loses their display name. if was_primary: nxt = ( await session.execute( select(Name) .where(Name.person_id == person_id, Name.deleted_at.is_(None)) .order_by(Name.sort_order, Name.created_at) ) ).scalars().first() if nxt is not None: nxt.is_primary = True await session.commit()