"""Person service. Writes require editor rights on the tree; reads run every person through the privacy engine. Each returned Person gets a transient ``primary_name`` for display (not persisted). """ import uuid from datetime import UTC, datetime from sqlalchemy import func, or_, select, update from sqlalchemy.ext.asyncio import AsyncSession from app.models.enums import PersonPrivacy, RelationshipType from app.models.person import Name, Person from app.models.relationship import Relationship from app.models.tree import Tree from app.models.user import User from app.services import privacy from app.services.audit import record_audit from app.services.exceptions import Forbidden, NotFound from app.services.privacy import Visibility def _format_name(name: Name) -> str | None: parts = [name.given, name.surname] joined = " ".join(p for p in parts if p) return joined or name.display_name def _redact(person: Person) -> None: """Minimise a possibly-living person for a non-member view (transient only — never committed).""" person.primary_name = "Living person" person.gender = None person.is_living = True async def _attach_primary_name(session: AsyncSession, person: Person) -> None: stmt = ( select(Name) .where(Name.person_id == person.id, Name.deleted_at.is_(None)) .order_by(Name.is_primary.desc(), Name.sort_order) ) name = (await session.execute(stmt)).scalars().first() # Transient display attribute consumed by the PersonRead schema. person.primary_name = _format_name(name) if name is not None else None async def create_person( session: AsyncSession, *, actor: User, tree: Tree, given: str | None = None, surname: str | None = None, gender: str | None = None, is_living: bool | None = None, privacy_setting: PersonPrivacy = PersonPrivacy.inherit, notes: str | None = None, ) -> Person: if not await privacy.can_edit_tree(session, user_id=actor.id, tree=tree): raise Forbidden("not an editor of this tree") person = Person( tree_id=tree.id, gender=gender, is_living=is_living, privacy=privacy_setting, notes=notes, ) session.add(person) await session.flush() # assign person.id if given or surname: session.add( Name( tree_id=tree.id, person_id=person.id, name_type="birth", given=given, surname=surname, is_primary=True, ) ) record_audit( session, action="create", entity_type="Person", entity_id=person.id, tree_id=tree.id, actor_user_id=actor.id, after={"given": given, "surname": surname}, ) await session.commit() await session.refresh(person) await _attach_primary_name(session, person) return person _PERSON_FIELDS = {"gender", "is_living", "privacy", "notes"} async def update_person( session: AsyncSession, *, actor: User, tree: Tree, person_id: uuid.UUID, changes: dict ) -> Person: if not await privacy.can_edit_tree(session, user_id=actor.id, tree=tree): raise Forbidden("not an editor of this tree") person = ( await session.execute( select(Person).where( Person.id == person_id, Person.tree_id == tree.id, Person.deleted_at.is_(None) ) ) ).scalar_one_or_none() if person is None: raise NotFound("person not found") for key in _PERSON_FIELDS & changes.keys(): setattr(person, key, changes[key]) if "given" in changes or "surname" in changes: name = ( await session.execute( select(Name) .where(Name.person_id == person.id, Name.deleted_at.is_(None)) .order_by(Name.is_primary.desc(), Name.sort_order) ) ).scalars().first() if name is None: name = Name(tree_id=tree.id, person_id=person.id, name_type="birth", is_primary=True) session.add(name) if "given" in changes: name.given = changes["given"] if "surname" in changes: name.surname = changes["surname"] name.display_name = None # rebuild display from parts record_audit( session, action="update", entity_type="Person", entity_id=person.id, tree_id=tree.id, actor_user_id=actor.id, after=changes, ) await session.commit() await session.refresh(person) await _attach_primary_name(session, person) return person async def get_person( session: AsyncSession, *, viewer_id: uuid.UUID, tree: Tree, person_id: uuid.UUID ) -> Person: if not await privacy.can_view_tree(session, user_id=viewer_id, tree=tree): raise Forbidden("not permitted to view this tree") person = ( await session.execute( select(Person).where( Person.id == person_id, Person.tree_id == tree.id, Person.deleted_at.is_(None), ) ) ).scalar_one_or_none() if person is None: raise NotFound("person not found") # Run the single person through the privacy engine (redaction lands Phase 2). vis = await privacy.person_visibility( session, user_id=viewer_id, tree=tree, person=person ) if vis == Visibility.hidden: raise NotFound("person not found") if vis == Visibility.redacted: _redact(person) else: await _attach_primary_name(session, person) return person async def _children_of( session: AsyncSession, *, tree_id: uuid.UUID, parent_id: uuid.UUID ) -> list[uuid.UUID]: rows = ( await session.execute( select(Relationship.person_to_id).where( Relationship.tree_id == tree_id, Relationship.deleted_at.is_(None), Relationship.type == RelationshipType.parent_child, Relationship.person_from_id == parent_id, ) ) ).scalars().all() return list(rows) async def _soft_delete_one( session: AsyncSession, *, actor: User, tree: Tree, person: Person, now: datetime ) -> None: """Soft-delete a single person and the relationships touching them, so no dangling edges are left to break the tree view.""" person.deleted_at = now rels = ( await session.execute( select(Relationship).where( Relationship.tree_id == tree.id, Relationship.deleted_at.is_(None), or_( Relationship.person_from_id == person.id, Relationship.person_to_id == person.id, ), ) ) ).scalars().all() for rel in rels: rel.deleted_at = now record_audit( session, action="delete", entity_type="Person", entity_id=person.id, tree_id=tree.id, actor_user_id=actor.id, after={"cascaded_relationships": len(rels)}, ) async def delete_person( session: AsyncSession, *, actor: User, tree: Tree, person_id: uuid.UUID, cascade: bool = False, ) -> int: """Soft-delete a person. Always removes the relationships that touch them (preventing dangling edges). With ``cascade=True``, recursively deletes their descendants too — handy for pruning a bad GEDCOM import. Returns the number of persons deleted.""" if not await privacy.can_edit_tree(session, user_id=actor.id, tree=tree): raise Forbidden("not an editor of this tree") person = ( await session.execute( select(Person).where( Person.id == person_id, Person.tree_id == tree.id, Person.deleted_at.is_(None) ) ) ).scalar_one_or_none() if person is None: raise NotFound("person not found") now = datetime.now(UTC) # Gather the set of persons to delete. For cascade, walk descendants # breadth-first, guarding against cycles. to_delete: list[Person] = [person] if cascade: seen = {person.id} frontier = [person.id] while frontier: nxt: list[uuid.UUID] = [] for pid in frontier: for child_id in await _children_of(session, tree_id=tree.id, parent_id=pid): if child_id not in seen: seen.add(child_id) nxt.append(child_id) frontier = nxt extra_ids = [pid for pid in seen if pid != person.id] if extra_ids: extra = ( await session.execute( select(Person).where( Person.id.in_(extra_ids), Person.tree_id == tree.id, Person.deleted_at.is_(None), ) ) ).scalars().all() to_delete.extend(extra) for p in to_delete: await _soft_delete_one(session, actor=actor, tree=tree, person=p, now=now) # Soft delete leaves the row in place, so the DB-level "ON DELETE SET NULL" # never fires — clear any account's self-person link to a deleted person. await session.execute( update(User) .where(User.self_person_id.in_([p.id for p in to_delete])) .values(self_person_id=None) ) await session.commit() return len(to_delete) async def restore_person( session: AsyncSession, *, actor: User, tree: Tree, person_id: uuid.UUID ) -> Person: if not await privacy.can_edit_tree(session, user_id=actor.id, tree=tree): raise Forbidden("not an editor of this tree") person = ( await session.execute( select(Person).where( Person.id == person_id, Person.tree_id == tree.id, Person.deleted_at.is_not(None) ) ) ).scalar_one_or_none() if person is None: raise NotFound("deleted person not found") person.deleted_at = None record_audit( session, action="restore", entity_type="Person", entity_id=person.id, tree_id=tree.id, actor_user_id=actor.id, ) await session.commit() await session.refresh(person) await _attach_primary_name(session, person) return person async def list_deleted_persons( session: AsyncSession, *, viewer_id: uuid.UUID, tree: Tree ) -> list[Person]: if not await privacy.can_view_tree(session, user_id=viewer_id, tree=tree): raise Forbidden("not permitted to view this tree") stmt = ( select(Person) .where(Person.tree_id == tree.id, Person.deleted_at.is_not(None)) .order_by(Person.deleted_at.desc()) ) persons = list((await session.execute(stmt)).scalars().all()) for person in persons: await _attach_primary_name(session, person) return persons async def list_persons( session: AsyncSession, *, viewer_id: uuid.UUID, tree: Tree ) -> list[Person]: if not await privacy.can_view_tree(session, user_id=viewer_id, tree=tree): raise Forbidden("not permitted to view this tree") stmt = ( select(Person) .where(Person.tree_id == tree.id, Person.deleted_at.is_(None)) .order_by(Person.created_at) ) persons = list((await session.execute(stmt)).scalars().all()) visible: list[Person] = [] for person in persons: vis = await privacy.person_visibility( session, user_id=viewer_id, tree=tree, person=person ) if vis == Visibility.hidden: continue if vis == Visibility.redacted: _redact(person) else: await _attach_primary_name(session, person) visible.append(person) return visible async def search_persons( session: AsyncSession, *, viewer_id: uuid.UUID, tree: Tree, query: str, limit: int = 50 ) -> list[Person]: if not await privacy.can_view_tree(session, user_id=viewer_id, tree=tree): raise Forbidden("not permitted to view this tree") q = query.strip() if not q: return [] like = f"%{q}%" score = func.greatest( func.similarity(func.coalesce(Name.given, ""), q), func.similarity(func.coalesce(Name.surname, ""), q), ) sub = ( select(Name.person_id.label("pid"), func.max(score).label("score")) .where( Name.tree_id == tree.id, Name.deleted_at.is_(None), or_( Name.given.op("%")(q), Name.surname.op("%")(q), Name.given.ilike(like), Name.surname.ilike(like), ), ) .group_by(Name.person_id) .order_by(func.max(score).desc()) .limit(limit) .subquery() ) stmt = ( select(Person) .join(sub, sub.c.pid == Person.id) .where(Person.tree_id == tree.id, Person.deleted_at.is_(None)) .order_by(sub.c.score.desc()) ) persons = list((await session.execute(stmt)).scalars().all()) out: list[Person] = [] for person in persons: vis = await privacy.person_visibility( session, user_id=viewer_id, tree=tree, person=person ) if vis == Visibility.hidden: continue if vis == Visibility.redacted: _redact(person) else: await _attach_primary_name(session, person) out.append(person) return out