Files
provenance/backend/app/services/public_view_service.py
T
justin 8b91326481 Fix leak: redact per-person on authed non-member reads
A logged-in NON-member of a public/unlisted tree could read living people's
dates, real alternate names, and media (incl. downloading photos) through the
family-view endpoints — only the person LIST was redacted; list_events,
list_relationships, list_names, list_media gated on can_view_tree alone.

For non-members, these now delegate to the same visibility-filtered reads the
public surface uses (person_visibility-driven): living-person events/names
dropped, relationships touching a hidden person dropped, media limited to
full-visibility persons, and media download (get_media → media_content) 404s
for a redacted/unlinked person's media. Members are unchanged.

Adds list_public_relationships_for_person / list_public_media / can_view_media
to public_view_service. Test: an authed non-member sees no living-person PII
across events/names/relationships/media and can't download a living person's
file, while the owner still sees everything. Full suite: 72 passed.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Signed-off-by: Justin Paul <justin@jpaul.me>
2026-06-09 09:26:53 -04:00

319 lines
11 KiB
Python

"""Read-only, redaction-safe projections for the public viewing surface.
INVARIANT (CLAUDE.md #2): everything returned here has passed through
``privacy.person_visibility``. A non-member must never receive a possibly-living
person's real name, dates, alternate names, or media. The rules:
- persons : redacted (living → "Living person"); hidden dropped.
- relationships : only when BOTH endpoints are non-hidden (a link to a
redacted person is fine — the name is already hidden).
- events : only for FULL-visibility persons; partnership events only
when BOTH partners are full (a marriage date would leak a
living partner's timeline otherwise).
- names : only for FULL-visibility persons.
- media : NOT exposed yet (deferred — see docs/design/tree-visibility.md).
A tree that isn't viewable raises NotFound (never Forbidden) so the public
surface can't be used to probe whether a private tree exists.
"""
import uuid
from sqlalchemy import or_, select
from sqlalchemy.ext.asyncio import AsyncSession
from app.models.enums import TreeVisibility
from app.models.event import Event
from app.models.media import Media
from app.models.person import Name, Person
from app.models.relationship import Relationship
from app.models.tree import Tree
from app.services import privacy
from app.services.exceptions import NotFound
from app.services.person_service import _attach_primary_name, _redact
from app.services.privacy import Visibility
async def get_public_tree(
session: AsyncSession, *, viewer_id: uuid.UUID | None, tree_id: uuid.UUID
) -> Tree:
tree = (
await session.execute(
select(Tree).where(Tree.id == tree_id, Tree.deleted_at.is_(None))
)
).scalar_one_or_none()
# 404 (not 403) when not viewable: don't reveal that a private tree exists.
if tree is None or not await privacy.can_view_tree(session, user_id=viewer_id, tree=tree):
raise NotFound("tree not found")
return tree
async def _persons(session: AsyncSession, tree: Tree) -> list[Person]:
return list(
(
await session.execute(
select(Person).where(Person.tree_id == tree.id, Person.deleted_at.is_(None))
)
)
.scalars()
.all()
)
async def _visibility_map(
session: AsyncSession, *, viewer_id: uuid.UUID | None, tree: Tree, persons: list[Person]
) -> dict[uuid.UUID, Visibility]:
return {
p.id: await privacy.person_visibility(
session, user_id=viewer_id, tree=tree, person=p
)
for p in persons
}
async def list_public_persons(
session: AsyncSession, *, viewer_id: uuid.UUID | None, tree: Tree
) -> list[Person]:
out: list[Person] = []
for p in await _persons(session, tree):
vis = await privacy.person_visibility(session, user_id=viewer_id, tree=tree, person=p)
if vis == Visibility.hidden:
continue
if vis == Visibility.redacted:
_redact(p)
else:
await _attach_primary_name(session, p)
out.append(p)
return out
async def get_public_person(
session: AsyncSession, *, viewer_id: uuid.UUID | None, tree: Tree, person_id: uuid.UUID
) -> Person:
person = (
await session.execute(
select(Person).where(
Person.id == person_id,
Person.tree_id == tree.id,
Person.deleted_at.is_(None),
)
)
).scalar_one_or_none()
if person is None:
raise NotFound("person not found")
vis = await privacy.person_visibility(session, user_id=viewer_id, tree=tree, person=person)
if vis == Visibility.hidden:
raise NotFound("person not found")
if vis == Visibility.redacted:
_redact(person)
else:
await _attach_primary_name(session, person)
return person
async def _person_visibility(
session: AsyncSession, *, viewer_id: uuid.UUID | None, tree: Tree, person_id: uuid.UUID
) -> Visibility | None:
person = (
await session.execute(
select(Person).where(
Person.id == person_id,
Person.tree_id == tree.id,
Person.deleted_at.is_(None),
)
)
).scalar_one_or_none()
if person is None:
return None
return await privacy.person_visibility(session, user_id=viewer_id, tree=tree, person=person)
async def list_public_relationships(
session: AsyncSession, *, viewer_id: uuid.UUID | None, tree: Tree
) -> list[Relationship]:
persons = await _persons(session, tree)
vis = await _visibility_map(session, viewer_id=viewer_id, tree=tree, persons=persons)
nonhidden = {pid for pid, v in vis.items() if v != Visibility.hidden}
rels = list(
(
await session.execute(
select(Relationship).where(
Relationship.tree_id == tree.id, Relationship.deleted_at.is_(None)
)
)
)
.scalars()
.all()
)
return [
r for r in rels if r.person_from_id in nonhidden and r.person_to_id in nonhidden
]
async def list_public_events(
session: AsyncSession, *, viewer_id: uuid.UUID | None, tree: Tree
) -> list[Event]:
persons = await _persons(session, tree)
vis = await _visibility_map(session, viewer_id=viewer_id, tree=tree, persons=persons)
full = {pid for pid, v in vis.items() if v == Visibility.full}
rels = {
r.id: r
for r in (
await session.execute(
select(Relationship).where(
Relationship.tree_id == tree.id, Relationship.deleted_at.is_(None)
)
)
)
.scalars()
.all()
}
events = list(
(
await session.execute(
select(Event).where(Event.tree_id == tree.id, Event.deleted_at.is_(None))
)
)
.scalars()
.all()
)
out: list[Event] = []
for e in events:
if e.person_id is not None:
if e.person_id in full:
out.append(e)
elif e.relationship_id is not None:
r = rels.get(e.relationship_id)
if r is not None and r.person_from_id in full and r.person_to_id in full:
out.append(e)
return out
async def list_public_person_names(
session: AsyncSession, *, viewer_id: uuid.UUID | None, tree: Tree, person_id: uuid.UUID
) -> list[Name]:
vis = await _person_visibility(session, viewer_id=viewer_id, tree=tree, person_id=person_id)
if vis is None:
raise NotFound("person not found")
if vis != Visibility.full:
return [] # redacted/hidden → no names (the real name must not leak)
return list(
(
await session.execute(
select(Name)
.where(Name.person_id == person_id, Name.deleted_at.is_(None))
.order_by(Name.is_primary.desc(), Name.sort_order, Name.created_at)
)
)
.scalars()
.all()
)
async def list_public_person_events(
session: AsyncSession, *, viewer_id: uuid.UUID | None, tree: Tree, person_id: uuid.UUID
) -> list[Event]:
vis = await _person_visibility(session, viewer_id=viewer_id, tree=tree, person_id=person_id)
if vis is None:
raise NotFound("person not found")
if vis != Visibility.full:
return [] # redacted/hidden → no dates
return list(
(
await session.execute(
select(Event)
.where(
Event.person_id == person_id,
Event.tree_id == tree.id,
Event.deleted_at.is_(None),
)
.order_by(Event.date_start.nulls_last(), Event.created_at)
)
)
.scalars()
.all()
)
async def list_public_relationships_for_person(
session: AsyncSession, *, viewer_id: uuid.UUID | None, tree: Tree, person_id: uuid.UUID
) -> list[Relationship]:
persons = await _persons(session, tree)
vis = await _visibility_map(session, viewer_id=viewer_id, tree=tree, persons=persons)
if vis.get(person_id) in (None, Visibility.hidden):
return []
nonhidden = {pid for pid, v in vis.items() if v != Visibility.hidden}
rels = list(
(
await session.execute(
select(Relationship).where(
Relationship.tree_id == tree.id,
Relationship.deleted_at.is_(None),
or_(
Relationship.person_from_id == person_id,
Relationship.person_to_id == person_id,
),
)
)
)
.scalars()
.all()
)
return [r for r in rels if r.person_from_id in nonhidden and r.person_to_id in nonhidden]
async def list_public_media(
session: AsyncSession, *, viewer_id: uuid.UUID | None, tree: Tree
) -> list[Media]:
"""Only media linked to a FULL-visibility person. Media without a person (or
linked only to an event/source) is not exposed to non-members — a photo of a
living person must never leak."""
persons = await _persons(session, tree)
vis = await _visibility_map(session, viewer_id=viewer_id, tree=tree, persons=persons)
full = {pid for pid, v in vis.items() if v == Visibility.full}
media = list(
(
await session.execute(
select(Media).where(Media.tree_id == tree.id, Media.deleted_at.is_(None))
)
)
.scalars()
.all()
)
return [m for m in media if m.person_id is not None and m.person_id in full]
async def can_view_media(
session: AsyncSession, *, viewer_id: uuid.UUID | None, tree: Tree, media: Media
) -> bool:
"""Whether a non-member may see/download a single media item: only when it is
linked to a FULL-visibility person."""
if media.person_id is None:
return False
vis = await _person_visibility(
session, viewer_id=viewer_id, tree=tree, person_id=media.person_id
)
return vis == Visibility.full
async def list_public_trees(
session: AsyncSession,
*,
viewer_id: uuid.UUID | None,
q: str | None = None,
limit: int = 50,
offset: int = 0,
) -> list[Tree]:
# Anonymous: only `public`. Authenticated: also `site_members`. Never list
# `unlisted` (reachable by link only) or `private`.
allowed = [TreeVisibility.public]
if viewer_id is not None:
allowed.append(TreeVisibility.site_members)
stmt = select(Tree).where(
Tree.deleted_at.is_(None), Tree.visibility.in_(allowed)
)
if q and q.strip():
stmt = stmt.where(Tree.name.ilike(f"%{q.strip()}%"))
stmt = stmt.order_by(Tree.name).limit(min(limit, 100)).offset(max(offset, 0))
return list((await session.execute(stmt)).scalars().all())