"""Read-only, redaction-safe projections for the public viewing surface. INVARIANT (CLAUDE.md #2): everything returned here has passed through ``privacy.person_visibility``. A non-member must never receive a possibly-living person's real name, dates, alternate names, or media. The rules: - persons : redacted (living → "Living person"); hidden dropped. - relationships : only when BOTH endpoints are non-hidden (a link to a redacted person is fine — the name is already hidden). - events : only for FULL-visibility persons; partnership events only when BOTH partners are full (a marriage date would leak a living partner's timeline otherwise). - names : only for FULL-visibility persons. - media : NOT exposed yet (deferred — see docs/design/tree-visibility.md). - citations : only when the cited fact resolves to FULL person(s). - sources : only when they back at least one visible citation. A tree that isn't viewable raises NotFound (never Forbidden) so the public surface can't be used to probe whether a private tree exists. """ import uuid from sqlalchemy import or_, select from sqlalchemy.ext.asyncio import AsyncSession from app.models.enums import TreeVisibility from app.models.event import Event from app.models.media import Media from app.models.person import Name, Person from app.models.relationship import Relationship from app.models.source import Citation, Source from app.models.tree import Tree from app.services import privacy from app.services.exceptions import NotFound from app.services.person_service import ( _attach_primary_name, _attach_primary_names, _redact, ) from app.services.privacy import Visibility async def get_public_tree( session: AsyncSession, *, viewer_id: uuid.UUID | None, tree_id: uuid.UUID ) -> Tree: tree = ( await session.execute( select(Tree).where(Tree.id == tree_id, Tree.deleted_at.is_(None)) ) ).scalar_one_or_none() # 404 (not 403) when not viewable: don't reveal that a private tree exists. if tree is None or not await privacy.can_view_tree(session, user_id=viewer_id, tree=tree): raise NotFound("tree not found") return tree async def _persons(session: AsyncSession, tree: Tree) -> list[Person]: return list( ( await session.execute( select(Person).where(Person.tree_id == tree.id, Person.deleted_at.is_(None)) ) ) .scalars() .all() ) async def _visibility_map( session: AsyncSession, *, viewer_id: uuid.UUID | None, tree: Tree, persons: list[Person] ) -> dict[uuid.UUID, Visibility]: return { p.id: await privacy.person_visibility( session, user_id=viewer_id, tree=tree, person=p ) for p in persons } async def list_public_persons( session: AsyncSession, *, viewer_id: uuid.UUID | None, tree: Tree ) -> list[Person]: out: list[Person] = [] full: list[Person] = [] for p in await _persons(session, tree): vis = await privacy.person_visibility(session, user_id=viewer_id, tree=tree, person=p) if vis == Visibility.hidden: continue if vis == Visibility.redacted: _redact(p) else: full.append(p) out.append(p) await _attach_primary_names(session, full) # one query, not one per person return out async def get_public_person( session: AsyncSession, *, viewer_id: uuid.UUID | None, tree: Tree, person_id: uuid.UUID ) -> Person: person = ( await session.execute( select(Person).where( Person.id == person_id, Person.tree_id == tree.id, Person.deleted_at.is_(None), ) ) ).scalar_one_or_none() if person is None: raise NotFound("person not found") vis = await privacy.person_visibility(session, user_id=viewer_id, tree=tree, person=person) if vis == Visibility.hidden: raise NotFound("person not found") if vis == Visibility.redacted: _redact(person) else: await _attach_primary_name(session, person) return person async def _person_visibility( session: AsyncSession, *, viewer_id: uuid.UUID | None, tree: Tree, person_id: uuid.UUID ) -> Visibility | None: person = ( await session.execute( select(Person).where( Person.id == person_id, Person.tree_id == tree.id, Person.deleted_at.is_(None), ) ) ).scalar_one_or_none() if person is None: return None return await privacy.person_visibility(session, user_id=viewer_id, tree=tree, person=person) async def list_public_relationships( session: AsyncSession, *, viewer_id: uuid.UUID | None, tree: Tree ) -> list[Relationship]: persons = await _persons(session, tree) vis = await _visibility_map(session, viewer_id=viewer_id, tree=tree, persons=persons) nonhidden = {pid for pid, v in vis.items() if v != Visibility.hidden} rels = list( ( await session.execute( select(Relationship).where( Relationship.tree_id == tree.id, Relationship.deleted_at.is_(None) ) ) ) .scalars() .all() ) return [ r for r in rels if r.person_from_id in nonhidden and r.person_to_id in nonhidden ] async def list_public_events( session: AsyncSession, *, viewer_id: uuid.UUID | None, tree: Tree ) -> list[Event]: persons = await _persons(session, tree) vis = await _visibility_map(session, viewer_id=viewer_id, tree=tree, persons=persons) full = {pid for pid, v in vis.items() if v == Visibility.full} rels = { r.id: r for r in ( await session.execute( select(Relationship).where( Relationship.tree_id == tree.id, Relationship.deleted_at.is_(None) ) ) ) .scalars() .all() } events = list( ( await session.execute( select(Event).where(Event.tree_id == tree.id, Event.deleted_at.is_(None)) ) ) .scalars() .all() ) out: list[Event] = [] for e in events: if e.person_id is not None: if e.person_id in full: out.append(e) elif e.relationship_id is not None: r = rels.get(e.relationship_id) if r is not None and r.person_from_id in full and r.person_to_id in full: out.append(e) return out async def list_public_person_names( session: AsyncSession, *, viewer_id: uuid.UUID | None, tree: Tree, person_id: uuid.UUID ) -> list[Name]: vis = await _person_visibility(session, viewer_id=viewer_id, tree=tree, person_id=person_id) if vis is None: raise NotFound("person not found") if vis != Visibility.full: return [] # redacted/hidden → no names (the real name must not leak) return list( ( await session.execute( select(Name) .where(Name.person_id == person_id, Name.deleted_at.is_(None)) .order_by(Name.is_primary.desc(), Name.sort_order, Name.created_at) ) ) .scalars() .all() ) async def list_public_person_events( session: AsyncSession, *, viewer_id: uuid.UUID | None, tree: Tree, person_id: uuid.UUID ) -> list[Event]: vis = await _person_visibility(session, viewer_id=viewer_id, tree=tree, person_id=person_id) if vis is None: raise NotFound("person not found") if vis != Visibility.full: return [] # redacted/hidden → no dates return list( ( await session.execute( select(Event) .where( Event.person_id == person_id, Event.tree_id == tree.id, Event.deleted_at.is_(None), ) .order_by(Event.date_start.nulls_last(), Event.created_at) ) ) .scalars() .all() ) async def list_public_relationships_for_person( session: AsyncSession, *, viewer_id: uuid.UUID | None, tree: Tree, person_id: uuid.UUID ) -> list[Relationship]: persons = await _persons(session, tree) vis = await _visibility_map(session, viewer_id=viewer_id, tree=tree, persons=persons) if vis.get(person_id) in (None, Visibility.hidden): return [] nonhidden = {pid for pid, v in vis.items() if v != Visibility.hidden} rels = list( ( await session.execute( select(Relationship).where( Relationship.tree_id == tree.id, Relationship.deleted_at.is_(None), or_( Relationship.person_from_id == person_id, Relationship.person_to_id == person_id, ), ) ) ) .scalars() .all() ) return [r for r in rels if r.person_from_id in nonhidden and r.person_to_id in nonhidden] async def list_public_media( session: AsyncSession, *, viewer_id: uuid.UUID | None, tree: Tree ) -> list[Media]: """Only media linked to a FULL-visibility person. Media without a person (or linked only to an event/source) is not exposed to non-members — a photo of a living person must never leak.""" persons = await _persons(session, tree) vis = await _visibility_map(session, viewer_id=viewer_id, tree=tree, persons=persons) full = {pid for pid, v in vis.items() if v == Visibility.full} media = list( ( await session.execute( select(Media).where(Media.tree_id == tree.id, Media.deleted_at.is_(None)) ) ) .scalars() .all() ) return [m for m in media if m.person_id is not None and m.person_id in full] async def can_view_media( session: AsyncSession, *, viewer_id: uuid.UUID | None, tree: Tree, media: Media ) -> bool: """Whether a non-member may see/download a single media item: only when it is linked to a FULL-visibility person.""" if media.person_id is None: return False vis = await _person_visibility( session, viewer_id=viewer_id, tree=tree, person_id=media.person_id ) return vis == Visibility.full async def _full_person_ids( session: AsyncSession, *, viewer_id: uuid.UUID | None, tree: Tree ) -> set[uuid.UUID]: persons = await _persons(session, tree) vis = await _visibility_map(session, viewer_id=viewer_id, tree=tree, persons=persons) return {pid for pid, v in vis.items() if v == Visibility.full} async def list_public_citations( session: AsyncSession, *, viewer_id: uuid.UUID | None, tree: Tree ) -> list[Citation]: """Only citations whose cited fact resolves to FULL-visibility person(s). A citation on a redacted/hidden person's fact (or a partnership where either partner isn't full) is dropped — its existence plus page/detail would leak that the person has that sourced fact. Mirrors the events/names rule (FULL only).""" full = await _full_person_ids(session, viewer_id=viewer_id, tree=tree) async def _by_id(model): rows = ( await session.execute( select(model).where(model.tree_id == tree.id, model.deleted_at.is_(None)) ) ).scalars().all() return {r.id: r for r in rows} names = await _by_id(Name) rels = await _by_id(Relationship) events = await _by_id(Event) def target_is_full(c: Citation) -> bool: if c.person_id is not None: return c.person_id in full if c.name_id is not None: n = names.get(c.name_id) return n is not None and n.person_id in full if c.event_id is not None: e = events.get(c.event_id) if e is None: return False if e.person_id is not None: return e.person_id in full if e.relationship_id is not None: r = rels.get(e.relationship_id) return r is not None and r.person_from_id in full and r.person_to_id in full return False if c.relationship_id is not None: r = rels.get(c.relationship_id) return r is not None and r.person_from_id in full and r.person_to_id in full return False citations = ( await session.execute( select(Citation) .where(Citation.tree_id == tree.id, Citation.deleted_at.is_(None)) .order_by(Citation.created_at) ) ).scalars().all() return [c for c in citations if target_is_full(c)] async def list_public_sources( session: AsyncSession, *, viewer_id: uuid.UUID | None, tree: Tree ) -> list[Source]: """Only sources backing at least one visible citation. A source used solely for a redacted/hidden person's facts is withheld — its title or notes could name that living person.""" visible = await list_public_citations(session, viewer_id=viewer_id, tree=tree) cited = {c.source_id for c in visible} sources = ( await session.execute( select(Source) .where(Source.tree_id == tree.id, Source.deleted_at.is_(None)) .order_by(Source.title) ) ).scalars().all() return [s for s in sources if s.id in cited] async def get_public_source( session: AsyncSession, *, viewer_id: uuid.UUID | None, tree: Tree, source_id: uuid.UUID ) -> Source: for s in await list_public_sources(session, viewer_id=viewer_id, tree=tree): if s.id == source_id: return s # 404 (not 403): don't reveal that a withheld source exists. raise NotFound("source not found") async def list_public_trees( session: AsyncSession, *, viewer_id: uuid.UUID | None, q: str | None = None, limit: int = 50, offset: int = 0, ) -> list[Tree]: # Anonymous: only `public`. Authenticated: also `site_members`. Never list # `unlisted` (reachable by link only) or `private`. allowed = [TreeVisibility.public] if viewer_id is not None: allowed.append(TreeVisibility.site_members) stmt = select(Tree).where( Tree.deleted_at.is_(None), Tree.visibility.in_(allowed) ) if q and q.strip(): stmt = stmt.where(Tree.name.ilike(f"%{q.strip()}%")) stmt = stmt.order_by(Tree.name).limit(min(limit, 100)).offset(max(offset, 0)) return list((await session.execute(stmt)).scalars().all())