"""Person service. Writes require editor rights on the tree; reads run every person through the privacy engine. Each returned Person gets a transient ``primary_name`` for display (not persisted). """ import uuid from datetime import UTC, datetime from sqlalchemy import func, or_, select from sqlalchemy.ext.asyncio import AsyncSession from app.models.enums import PersonPrivacy from app.models.person import Name, Person from app.models.tree import Tree from app.models.user import User from app.services import privacy from app.services.audit import record_audit from app.services.exceptions import Forbidden, NotFound from app.services.privacy import Visibility def _format_name(name: Name) -> str | None: parts = [name.given, name.surname] joined = " ".join(p for p in parts if p) return joined or name.display_name def _redact(person: Person) -> None: """Minimise a possibly-living person for a non-member view (transient only — never committed).""" person.primary_name = "Living person" person.gender = None person.is_living = True async def _attach_primary_name(session: AsyncSession, person: Person) -> None: stmt = ( select(Name) .where(Name.person_id == person.id, Name.deleted_at.is_(None)) .order_by(Name.is_primary.desc(), Name.sort_order) ) name = (await session.execute(stmt)).scalars().first() # Transient display attribute consumed by the PersonRead schema. person.primary_name = _format_name(name) if name is not None else None async def create_person( session: AsyncSession, *, actor: User, tree: Tree, given: str | None = None, surname: str | None = None, gender: str | None = None, is_living: bool | None = None, privacy_setting: PersonPrivacy = PersonPrivacy.inherit, notes: str | None = None, ) -> Person: if not await privacy.can_edit_tree(session, user_id=actor.id, tree=tree): raise Forbidden("not an editor of this tree") person = Person( tree_id=tree.id, gender=gender, is_living=is_living, privacy=privacy_setting, notes=notes, ) session.add(person) await session.flush() # assign person.id if given or surname: session.add( Name( tree_id=tree.id, person_id=person.id, name_type="birth", given=given, surname=surname, is_primary=True, ) ) record_audit( session, action="create", entity_type="Person", entity_id=person.id, tree_id=tree.id, actor_user_id=actor.id, after={"given": given, "surname": surname}, ) await session.commit() await session.refresh(person) await _attach_primary_name(session, person) return person async def get_person( session: AsyncSession, *, viewer_id: uuid.UUID, tree: Tree, person_id: uuid.UUID ) -> Person: if not await privacy.can_view_tree(session, user_id=viewer_id, tree=tree): raise Forbidden("not permitted to view this tree") person = ( await session.execute( select(Person).where( Person.id == person_id, Person.tree_id == tree.id, Person.deleted_at.is_(None), ) ) ).scalar_one_or_none() if person is None: raise NotFound("person not found") # Run the single person through the privacy engine (redaction lands Phase 2). vis = await privacy.person_visibility( session, user_id=viewer_id, tree=tree, person=person ) if vis == Visibility.hidden: raise NotFound("person not found") if vis == Visibility.redacted: _redact(person) else: await _attach_primary_name(session, person) return person async def delete_person( session: AsyncSession, *, actor: User, tree: Tree, person_id: uuid.UUID ) -> None: if not await privacy.can_edit_tree(session, user_id=actor.id, tree=tree): raise Forbidden("not an editor of this tree") person = ( await session.execute( select(Person).where( Person.id == person_id, Person.tree_id == tree.id, Person.deleted_at.is_(None) ) ) ).scalar_one_or_none() if person is None: raise NotFound("person not found") person.deleted_at = datetime.now(UTC) record_audit( session, action="delete", entity_type="Person", entity_id=person.id, tree_id=tree.id, actor_user_id=actor.id, ) await session.commit() async def restore_person( session: AsyncSession, *, actor: User, tree: Tree, person_id: uuid.UUID ) -> Person: if not await privacy.can_edit_tree(session, user_id=actor.id, tree=tree): raise Forbidden("not an editor of this tree") person = ( await session.execute( select(Person).where( Person.id == person_id, Person.tree_id == tree.id, Person.deleted_at.is_not(None) ) ) ).scalar_one_or_none() if person is None: raise NotFound("deleted person not found") person.deleted_at = None record_audit( session, action="restore", entity_type="Person", entity_id=person.id, tree_id=tree.id, actor_user_id=actor.id, ) await session.commit() await session.refresh(person) await _attach_primary_name(session, person) return person async def list_deleted_persons( session: AsyncSession, *, viewer_id: uuid.UUID, tree: Tree ) -> list[Person]: if not await privacy.can_view_tree(session, user_id=viewer_id, tree=tree): raise Forbidden("not permitted to view this tree") stmt = ( select(Person) .where(Person.tree_id == tree.id, Person.deleted_at.is_not(None)) .order_by(Person.deleted_at.desc()) ) persons = list((await session.execute(stmt)).scalars().all()) for person in persons: await _attach_primary_name(session, person) return persons async def list_persons( session: AsyncSession, *, viewer_id: uuid.UUID, tree: Tree ) -> list[Person]: if not await privacy.can_view_tree(session, user_id=viewer_id, tree=tree): raise Forbidden("not permitted to view this tree") stmt = ( select(Person) .where(Person.tree_id == tree.id, Person.deleted_at.is_(None)) .order_by(Person.created_at) ) persons = list((await session.execute(stmt)).scalars().all()) visible: list[Person] = [] for person in persons: vis = await privacy.person_visibility( session, user_id=viewer_id, tree=tree, person=person ) if vis == Visibility.hidden: continue if vis == Visibility.redacted: _redact(person) else: await _attach_primary_name(session, person) visible.append(person) return visible async def search_persons( session: AsyncSession, *, viewer_id: uuid.UUID, tree: Tree, query: str, limit: int = 50 ) -> list[Person]: if not await privacy.can_view_tree(session, user_id=viewer_id, tree=tree): raise Forbidden("not permitted to view this tree") q = query.strip() if not q: return [] like = f"%{q}%" score = func.greatest( func.similarity(func.coalesce(Name.given, ""), q), func.similarity(func.coalesce(Name.surname, ""), q), ) sub = ( select(Name.person_id.label("pid"), func.max(score).label("score")) .where( Name.tree_id == tree.id, Name.deleted_at.is_(None), or_( Name.given.op("%")(q), Name.surname.op("%")(q), Name.given.ilike(like), Name.surname.ilike(like), ), ) .group_by(Name.person_id) .order_by(func.max(score).desc()) .limit(limit) .subquery() ) stmt = ( select(Person) .join(sub, sub.c.pid == Person.id) .where(Person.tree_id == tree.id, Person.deleted_at.is_(None)) .order_by(sub.c.score.desc()) ) persons = list((await session.execute(stmt)).scalars().all()) out: list[Person] = [] for person in persons: vis = await privacy.person_visibility( session, user_id=viewer_id, tree=tree, person=person ) if vis == Visibility.hidden: continue if vis == Visibility.redacted: _redact(person) else: await _attach_primary_name(session, person) out.append(person) return out