"""Read-only, redaction-safe projections for the public viewing surface. INVARIANT (CLAUDE.md #2): everything returned here has passed through ``privacy.person_visibility``. A non-member must never receive a possibly-living person's real name, dates, alternate names, or media. The rules: - persons : redacted (living → "Living person"); hidden dropped. - relationships : only when BOTH endpoints are non-hidden (a link to a redacted person is fine — the name is already hidden). - events : only for FULL-visibility persons; partnership events only when BOTH partners are full (a marriage date would leak a living partner's timeline otherwise). - names : only for FULL-visibility persons. - media : NOT exposed yet (deferred — see docs/design/tree-visibility.md). A tree that isn't viewable raises NotFound (never Forbidden) so the public surface can't be used to probe whether a private tree exists. """ import uuid from sqlalchemy import select from sqlalchemy.ext.asyncio import AsyncSession from app.models.enums import TreeVisibility from app.models.event import Event from app.models.person import Name, Person from app.models.relationship import Relationship from app.models.tree import Tree from app.services import privacy from app.services.exceptions import NotFound from app.services.person_service import _attach_primary_name, _redact from app.services.privacy import Visibility async def get_public_tree( session: AsyncSession, *, viewer_id: uuid.UUID | None, tree_id: uuid.UUID ) -> Tree: tree = ( await session.execute( select(Tree).where(Tree.id == tree_id, Tree.deleted_at.is_(None)) ) ).scalar_one_or_none() # 404 (not 403) when not viewable: don't reveal that a private tree exists. if tree is None or not await privacy.can_view_tree(session, user_id=viewer_id, tree=tree): raise NotFound("tree not found") return tree async def _persons(session: AsyncSession, tree: Tree) -> list[Person]: return list( ( await session.execute( select(Person).where(Person.tree_id == tree.id, Person.deleted_at.is_(None)) ) ) .scalars() .all() ) async def _visibility_map( session: AsyncSession, *, viewer_id: uuid.UUID | None, tree: Tree, persons: list[Person] ) -> dict[uuid.UUID, Visibility]: return { p.id: await privacy.person_visibility( session, user_id=viewer_id, tree=tree, person=p ) for p in persons } async def list_public_persons( session: AsyncSession, *, viewer_id: uuid.UUID | None, tree: Tree ) -> list[Person]: out: list[Person] = [] for p in await _persons(session, tree): vis = await privacy.person_visibility(session, user_id=viewer_id, tree=tree, person=p) if vis == Visibility.hidden: continue if vis == Visibility.redacted: _redact(p) else: await _attach_primary_name(session, p) out.append(p) return out async def get_public_person( session: AsyncSession, *, viewer_id: uuid.UUID | None, tree: Tree, person_id: uuid.UUID ) -> Person: person = ( await session.execute( select(Person).where( Person.id == person_id, Person.tree_id == tree.id, Person.deleted_at.is_(None), ) ) ).scalar_one_or_none() if person is None: raise NotFound("person not found") vis = await privacy.person_visibility(session, user_id=viewer_id, tree=tree, person=person) if vis == Visibility.hidden: raise NotFound("person not found") if vis == Visibility.redacted: _redact(person) else: await _attach_primary_name(session, person) return person async def _person_visibility( session: AsyncSession, *, viewer_id: uuid.UUID | None, tree: Tree, person_id: uuid.UUID ) -> Visibility | None: person = ( await session.execute( select(Person).where( Person.id == person_id, Person.tree_id == tree.id, Person.deleted_at.is_(None), ) ) ).scalar_one_or_none() if person is None: return None return await privacy.person_visibility(session, user_id=viewer_id, tree=tree, person=person) async def list_public_relationships( session: AsyncSession, *, viewer_id: uuid.UUID | None, tree: Tree ) -> list[Relationship]: persons = await _persons(session, tree) vis = await _visibility_map(session, viewer_id=viewer_id, tree=tree, persons=persons) nonhidden = {pid for pid, v in vis.items() if v != Visibility.hidden} rels = list( ( await session.execute( select(Relationship).where( Relationship.tree_id == tree.id, Relationship.deleted_at.is_(None) ) ) ) .scalars() .all() ) return [ r for r in rels if r.person_from_id in nonhidden and r.person_to_id in nonhidden ] async def list_public_events( session: AsyncSession, *, viewer_id: uuid.UUID | None, tree: Tree ) -> list[Event]: persons = await _persons(session, tree) vis = await _visibility_map(session, viewer_id=viewer_id, tree=tree, persons=persons) full = {pid for pid, v in vis.items() if v == Visibility.full} rels = { r.id: r for r in ( await session.execute( select(Relationship).where( Relationship.tree_id == tree.id, Relationship.deleted_at.is_(None) ) ) ) .scalars() .all() } events = list( ( await session.execute( select(Event).where(Event.tree_id == tree.id, Event.deleted_at.is_(None)) ) ) .scalars() .all() ) out: list[Event] = [] for e in events: if e.person_id is not None: if e.person_id in full: out.append(e) elif e.relationship_id is not None: r = rels.get(e.relationship_id) if r is not None and r.person_from_id in full and r.person_to_id in full: out.append(e) return out async def list_public_person_names( session: AsyncSession, *, viewer_id: uuid.UUID | None, tree: Tree, person_id: uuid.UUID ) -> list[Name]: vis = await _person_visibility(session, viewer_id=viewer_id, tree=tree, person_id=person_id) if vis is None: raise NotFound("person not found") if vis != Visibility.full: return [] # redacted/hidden → no names (the real name must not leak) return list( ( await session.execute( select(Name) .where(Name.person_id == person_id, Name.deleted_at.is_(None)) .order_by(Name.is_primary.desc(), Name.sort_order, Name.created_at) ) ) .scalars() .all() ) async def list_public_person_events( session: AsyncSession, *, viewer_id: uuid.UUID | None, tree: Tree, person_id: uuid.UUID ) -> list[Event]: vis = await _person_visibility(session, viewer_id=viewer_id, tree=tree, person_id=person_id) if vis is None: raise NotFound("person not found") if vis != Visibility.full: return [] # redacted/hidden → no dates return list( ( await session.execute( select(Event) .where( Event.person_id == person_id, Event.tree_id == tree.id, Event.deleted_at.is_(None), ) .order_by(Event.date_start.nulls_last(), Event.created_at) ) ) .scalars() .all() ) async def list_public_trees( session: AsyncSession, *, viewer_id: uuid.UUID | None, q: str | None = None, limit: int = 50, offset: int = 0, ) -> list[Tree]: # Anonymous: only `public`. Authenticated: also `site_members`. Never list # `unlisted` (reachable by link only) or `private`. allowed = [TreeVisibility.public] if viewer_id is not None: allowed.append(TreeVisibility.site_members) stmt = select(Tree).where( Tree.deleted_at.is_(None), Tree.visibility.in_(allowed) ) if q and q.strip(): stmt = stmt.where(Tree.name.ilike(f"%{q.strip()}%")) stmt = stmt.order_by(Tree.name).limit(min(limit, 100)).offset(max(offset, 0)) return list((await session.execute(stmt)).scalars().all())