"""Person service. Writes require editor rights on the tree; reads run every person through the privacy engine. Each returned Person gets a transient ``primary_name`` for display (not persisted). """ import uuid from sqlalchemy import select from sqlalchemy.ext.asyncio import AsyncSession from app.models.enums import PersonPrivacy from app.models.person import Name, Person from app.models.tree import Tree from app.models.user import User from app.services import privacy from app.services.audit import record_audit from app.services.exceptions import Forbidden, NotFound from app.services.privacy import Visibility def _format_name(name: Name) -> str | None: parts = [name.given, name.surname] joined = " ".join(p for p in parts if p) return joined or name.display_name async def _attach_primary_name(session: AsyncSession, person: Person) -> None: stmt = ( select(Name) .where(Name.person_id == person.id, Name.deleted_at.is_(None)) .order_by(Name.is_primary.desc(), Name.sort_order) ) name = (await session.execute(stmt)).scalars().first() # Transient display attribute consumed by the PersonRead schema. person.primary_name = _format_name(name) if name is not None else None async def create_person( session: AsyncSession, *, actor: User, tree: Tree, given: str | None = None, surname: str | None = None, gender: str | None = None, is_living: bool | None = None, privacy_setting: PersonPrivacy = PersonPrivacy.inherit, notes: str | None = None, ) -> Person: if not await privacy.can_edit_tree(session, user_id=actor.id, tree=tree): raise Forbidden("not an editor of this tree") person = Person( tree_id=tree.id, gender=gender, is_living=is_living, privacy=privacy_setting, notes=notes, ) session.add(person) await session.flush() # assign person.id if given or surname: session.add( Name( tree_id=tree.id, person_id=person.id, name_type="birth", given=given, surname=surname, is_primary=True, ) ) record_audit( session, action="create", entity_type="Person", entity_id=person.id, tree_id=tree.id, actor_user_id=actor.id, after={"given": given, "surname": surname}, ) await session.commit() await session.refresh(person) await _attach_primary_name(session, person) return person async def get_person( session: AsyncSession, *, viewer_id: uuid.UUID, tree: Tree, person_id: uuid.UUID ) -> Person: if not await privacy.can_view_tree(session, user_id=viewer_id, tree=tree): raise Forbidden("not permitted to view this tree") person = ( await session.execute( select(Person).where( Person.id == person_id, Person.tree_id == tree.id, Person.deleted_at.is_(None), ) ) ).scalar_one_or_none() if person is None: raise NotFound("person not found") # Run the single person through the privacy engine (redaction lands Phase 2). if ( await privacy.person_visibility(session, user_id=viewer_id, tree=tree, person=person) == Visibility.hidden ): raise NotFound("person not found") await _attach_primary_name(session, person) return person async def list_persons( session: AsyncSession, *, viewer_id: uuid.UUID, tree: Tree ) -> list[Person]: if not await privacy.can_view_tree(session, user_id=viewer_id, tree=tree): raise Forbidden("not permitted to view this tree") stmt = ( select(Person) .where(Person.tree_id == tree.id, Person.deleted_at.is_(None)) .order_by(Person.created_at) ) persons = list((await session.execute(stmt)).scalars().all()) visible: list[Person] = [] for person in persons: if ( await privacy.person_visibility( session, user_id=viewer_id, tree=tree, person=person ) == Visibility.hidden ): continue await _attach_primary_name(session, person) visible.append(person) return visible