Files
provenance/backend/app/services/public_view_service.py
T
justin a6179037c2 Close citation/source living-person leak; add on-demand tree purge
Two changes.

1. Privacy fix (NN#2/NN#3) — the citation and source list endpoints gated only
   on can_view_tree, so a non-member on a public/unlisted/site_members tree could
   enumerate citations and sources tied to a redacted living person, leaking that
   the person exists and has sourced facts (and possibly their name via a source
   title). #46 closed this for events/media/names/relationships but not
   citations/sources. Now citation_service.list_citations and
   source_service.{list_sources,get_source} delegate non-member reads to
   public_view_service, mirroring the #46 pattern:
   - citations: shown only when the cited fact resolves to FULL-visibility
     person(s) — covers the person_id, name_id, event_id (person or both-partner),
     and relationship_id (both-partner) target paths.
   - sources: shown only when they back at least one visible citation; a withheld
     source 404s (don't reveal it exists).
   Tests cover all four citation target types + source withholding + member-sees-all.

2. On-demand tree purge — owners can permanently delete a soft-deleted tree now
   instead of waiting out the 30-day auto-purge window. POST /trees/{id}/purge
   (owner-only): the tree must already be in the trash, and the caller retypes its
   name to confirm. Media objects are deleted from storage, then a single
   DELETE on trees cascades all tree-owned rows via the tree_id ON DELETE CASCADE;
   the audit entry survives (tree_id SET NULL). Frontend adds a "Delete forever"
   button to the Recently-deleted list. No migration.

Suite: 102 passing.
Signed-off-by: Justin Paul <justin@jpaul.me>
2026-06-10 22:38:59 -04:00

411 lines
14 KiB
Python

"""Read-only, redaction-safe projections for the public viewing surface.
INVARIANT (CLAUDE.md #2): everything returned here has passed through
``privacy.person_visibility``. A non-member must never receive a possibly-living
person's real name, dates, alternate names, or media. The rules:
- persons : redacted (living → "Living person"); hidden dropped.
- relationships : only when BOTH endpoints are non-hidden (a link to a
redacted person is fine — the name is already hidden).
- events : only for FULL-visibility persons; partnership events only
when BOTH partners are full (a marriage date would leak a
living partner's timeline otherwise).
- names : only for FULL-visibility persons.
- media : only when linked to a FULL-visibility person; all other media stays withheld (see docs/design/tree-visibility.md).
- citations : only when the cited fact resolves to FULL person(s).
- sources : only when they back at least one visible citation.
A tree that isn't viewable raises NotFound (never Forbidden) so the public
surface can't be used to probe whether a private tree exists.
"""
import uuid
from sqlalchemy import or_, select
from sqlalchemy.ext.asyncio import AsyncSession
from app.models.enums import TreeVisibility
from app.models.event import Event
from app.models.media import Media
from app.models.person import Name, Person
from app.models.relationship import Relationship
from app.models.source import Citation, Source
from app.models.tree import Tree
from app.services import privacy
from app.services.exceptions import NotFound
from app.services.person_service import _attach_primary_name, _redact
from app.services.privacy import Visibility
async def get_public_tree(
    session: AsyncSession, *, viewer_id: uuid.UUID | None, tree_id: uuid.UUID
) -> Tree:
    """Load a non-deleted tree and confirm the viewer may see it.

    Raises:
        NotFound: when no live tree has ``tree_id`` or when
            ``privacy.can_view_tree`` rejects the viewer. Both cases use the
            same 404-style error (never a 403) so the existence of a private
            tree is not revealed.
    """
    stmt = select(Tree).where(Tree.id == tree_id, Tree.deleted_at.is_(None))
    found = (await session.execute(stmt)).scalar_one_or_none()
    if found is None:
        raise NotFound("tree not found")
    viewable = await privacy.can_view_tree(session, user_id=viewer_id, tree=found)
    if not viewable:
        raise NotFound("tree not found")
    return found
async def _persons(session: AsyncSession, tree: Tree) -> list[Person]:
    """Return every non-deleted ``Person`` row that belongs to ``tree``."""
    stmt = select(Person).where(
        Person.tree_id == tree.id, Person.deleted_at.is_(None)
    )
    result = await session.execute(stmt)
    return list(result.scalars().all())
async def _visibility_map(
    session: AsyncSession, *, viewer_id: uuid.UUID | None, tree: Tree, persons: list[Person]
) -> dict[uuid.UUID, Visibility]:
    """Map each person's id to the visibility the viewer is granted.

    ``privacy.person_visibility`` is awaited once per person, in list order.
    """
    levels: dict[uuid.UUID, Visibility] = {}
    for person in persons:
        pid = person.id
        levels[pid] = await privacy.person_visibility(
            session, user_id=viewer_id, tree=tree, person=person
        )
    return levels
async def list_public_persons(
    session: AsyncSession, *, viewer_id: uuid.UUID | None, tree: Tree
) -> list[Person]:
    """List the tree's persons as the viewer is allowed to see them.

    Hidden persons are dropped. Redacted persons are passed through
    ``_redact``; every other person gets its primary name attached via
    ``_attach_primary_name``.
    """
    visible: list[Person] = []
    for person in await _persons(session, tree):
        level = await privacy.person_visibility(
            session, user_id=viewer_id, tree=tree, person=person
        )
        if level != Visibility.hidden:
            if level == Visibility.redacted:
                _redact(person)
            else:
                await _attach_primary_name(session, person)
            visible.append(person)
    return visible
async def get_public_person(
    session: AsyncSession, *, viewer_id: uuid.UUID | None, tree: Tree, person_id: uuid.UUID
) -> Person:
    """Fetch one person from ``tree`` in the form the viewer may see.

    Raises:
        NotFound: when no live person with ``person_id`` exists in the tree,
            or when the person is hidden from this viewer.
    """
    stmt = select(Person).where(
        Person.id == person_id,
        Person.tree_id == tree.id,
        Person.deleted_at.is_(None),
    )
    found = (await session.execute(stmt)).scalar_one_or_none()
    if found is None:
        raise NotFound("person not found")

    level = await privacy.person_visibility(
        session, user_id=viewer_id, tree=tree, person=found
    )
    # Hidden is reported exactly like "does not exist".
    if level == Visibility.hidden:
        raise NotFound("person not found")

    if level == Visibility.redacted:
        _redact(found)
    else:
        await _attach_primary_name(session, found)
    return found
async def _person_visibility(
    session: AsyncSession, *, viewer_id: uuid.UUID | None, tree: Tree, person_id: uuid.UUID
) -> Visibility | None:
    """Resolve the viewer's visibility for a single person id.

    Returns ``None`` when no live person with that id exists in ``tree``;
    otherwise returns whatever ``privacy.person_visibility`` decides.
    """
    stmt = select(Person).where(
        Person.id == person_id,
        Person.tree_id == tree.id,
        Person.deleted_at.is_(None),
    )
    found = (await session.execute(stmt)).scalar_one_or_none()
    if found is not None:
        return await privacy.person_visibility(
            session, user_id=viewer_id, tree=tree, person=found
        )
    return None
async def list_public_relationships(
    session: AsyncSession, *, viewer_id: uuid.UUID | None, tree: Tree
) -> list[Relationship]:
    """List the tree's relationships whose two endpoints are both non-hidden.

    A link to a redacted person is kept; only hidden (or unknown) endpoints
    cause the relationship to be dropped.
    """
    people = await _persons(session, tree)
    levels = await _visibility_map(session, viewer_id=viewer_id, tree=tree, persons=people)
    shown_ids = {pid for pid, level in levels.items() if level != Visibility.hidden}

    stmt = select(Relationship).where(
        Relationship.tree_id == tree.id, Relationship.deleted_at.is_(None)
    )
    rows = (await session.execute(stmt)).scalars().all()

    kept: list[Relationship] = []
    for rel in rows:
        # Both endpoints must be in the non-hidden set.
        if {rel.person_from_id, rel.person_to_id} <= shown_ids:
            kept.append(rel)
    return kept
async def list_public_events(
    session: AsyncSession, *, viewer_id: uuid.UUID | None, tree: Tree
) -> list[Event]:
    """List the tree's events that are attached to FULL-visibility persons.

    A person event is kept only when that person is full. A relationship
    event is kept only when the relationship is live and BOTH of its
    endpoints are full. Events with neither link are dropped.
    """
    full_ids = await _full_person_ids(session, viewer_id=viewer_id, tree=tree)

    rel_stmt = select(Relationship).where(
        Relationship.tree_id == tree.id, Relationship.deleted_at.is_(None)
    )
    rel_by_id = {
        rel.id: rel for rel in (await session.execute(rel_stmt)).scalars().all()
    }

    event_stmt = select(Event).where(
        Event.tree_id == tree.id, Event.deleted_at.is_(None)
    )
    event_rows = (await session.execute(event_stmt)).scalars().all()

    def _is_visible(event: Event) -> bool:
        # The person link takes precedence over the relationship link.
        if event.person_id is not None:
            return event.person_id in full_ids
        if event.relationship_id is None:
            return False
        rel = rel_by_id.get(event.relationship_id)
        return (
            rel is not None
            and rel.person_from_id in full_ids
            and rel.person_to_id in full_ids
        )

    return [event for event in event_rows if _is_visible(event)]
async def list_public_person_names(
    session: AsyncSession, *, viewer_id: uuid.UUID | None, tree: Tree, person_id: uuid.UUID
) -> list[Name]:
    """List a person's non-deleted names, primary first, for a FULL person.

    Returns an empty list when the person is redacted or hidden, so the real
    name cannot leak.

    Raises:
        NotFound: when no live person with ``person_id`` exists in the tree.
    """
    level = await _person_visibility(
        session, viewer_id=viewer_id, tree=tree, person_id=person_id
    )
    if level is None:
        raise NotFound("person not found")
    if level == Visibility.full:
        stmt = (
            select(Name)
            .where(Name.person_id == person_id, Name.deleted_at.is_(None))
            .order_by(Name.is_primary.desc(), Name.sort_order, Name.created_at)
        )
        result = await session.execute(stmt)
        return list(result.scalars().all())
    return []
async def list_public_person_events(
    session: AsyncSession, *, viewer_id: uuid.UUID | None, tree: Tree, person_id: uuid.UUID
) -> list[Event]:
    """List a FULL-visibility person's own events, ordered by start date.

    Returns an empty list when the person is redacted or hidden, so no dates
    are exposed. Events with no start date sort last; ties fall back to
    creation time.

    Raises:
        NotFound: when no live person with ``person_id`` exists in the tree.
    """
    level = await _person_visibility(
        session, viewer_id=viewer_id, tree=tree, person_id=person_id
    )
    if level is None:
        raise NotFound("person not found")
    if level == Visibility.full:
        stmt = (
            select(Event)
            .where(
                Event.person_id == person_id,
                Event.tree_id == tree.id,
                Event.deleted_at.is_(None),
            )
            .order_by(Event.date_start.nulls_last(), Event.created_at)
        )
        result = await session.execute(stmt)
        return list(result.scalars().all())
    return []
async def list_public_relationships_for_person(
    session: AsyncSession, *, viewer_id: uuid.UUID | None, tree: Tree, person_id: uuid.UUID
) -> list[Relationship]:
    """List the relationships touching ``person_id`` that the viewer may see.

    Returns an empty list when the person is unknown in the tree or hidden.
    Otherwise a relationship is kept only when both endpoints are non-hidden.
    """
    people = await _persons(session, tree)
    levels = await _visibility_map(session, viewer_id=viewer_id, tree=tree, persons=people)

    own_level = levels.get(person_id)
    if own_level is None or own_level == Visibility.hidden:
        return []

    shown_ids = {pid for pid, level in levels.items() if level != Visibility.hidden}
    touches_person = or_(
        Relationship.person_from_id == person_id,
        Relationship.person_to_id == person_id,
    )
    stmt = select(Relationship).where(
        Relationship.tree_id == tree.id,
        Relationship.deleted_at.is_(None),
        touches_person,
    )
    rows = (await session.execute(stmt)).scalars().all()
    return [
        rel
        for rel in rows
        if rel.person_from_id in shown_ids and rel.person_to_id in shown_ids
    ]
async def list_public_media(
    session: AsyncSession, *, viewer_id: uuid.UUID | None, tree: Tree
) -> list[Media]:
    """List the tree's media that is linked to a FULL-visibility person.

    Media with no person link (including items linked only to an event or a
    source) is never returned to non-members, so a photo of a living person
    cannot leak.
    """
    full_ids = await _full_person_ids(session, viewer_id=viewer_id, tree=tree)
    stmt = select(Media).where(Media.tree_id == tree.id, Media.deleted_at.is_(None))
    rows = (await session.execute(stmt)).scalars().all()

    shown: list[Media] = []
    for item in rows:
        if item.person_id is None:
            continue
        if item.person_id in full_ids:
            shown.append(item)
    return shown
async def can_view_media(
    session: AsyncSession, *, viewer_id: uuid.UUID | None, tree: Tree, media: Media
) -> bool:
    """Decide whether a non-member may see or download one media item.

    True only when the item is linked to a person and that person resolves
    to FULL visibility for the viewer.
    """
    linked_person = media.person_id
    if linked_person is not None:
        level = await _person_visibility(
            session, viewer_id=viewer_id, tree=tree, person_id=linked_person
        )
        return level == Visibility.full
    return False
async def _full_person_ids(
    session: AsyncSession, *, viewer_id: uuid.UUID | None, tree: Tree
) -> set[uuid.UUID]:
    """Return the ids of the tree's persons that the viewer sees at FULL visibility."""
    people = await _persons(session, tree)
    levels = await _visibility_map(session, viewer_id=viewer_id, tree=tree, persons=people)
    full_ids: set[uuid.UUID] = set()
    for pid, level in levels.items():
        if level == Visibility.full:
            full_ids.add(pid)
    return full_ids
async def list_public_citations(
    session: AsyncSession, *, viewer_id: uuid.UUID | None, tree: Tree
) -> list[Citation]:
    """List the tree's citations whose cited fact resolves to FULL person(s).

    The first non-null target on a citation decides, checked in this order:
    person, name, event, relationship. Name targets resolve to the name's
    person; event targets resolve to the event's person or, failing that, to
    both partners of its relationship; relationship targets require both
    partners to be full. A citation with no target, or whose target row is
    missing or soft-deleted, is dropped. Results are ordered by creation time.

    Citations on a redacted or hidden person's fact are withheld because
    their mere existence (plus page/detail) would reveal that the person has
    that sourced fact.
    """
    full_ids = await _full_person_ids(session, viewer_id=viewer_id, tree=tree)

    async def _index(model):
        # Live rows of ``model`` in this tree, keyed by primary key.
        stmt = select(model).where(model.tree_id == tree.id, model.deleted_at.is_(None))
        found = (await session.execute(stmt)).scalars().all()
        return {row.id: row for row in found}

    name_by_id = await _index(Name)
    rel_by_id = await _index(Relationship)
    event_by_id = await _index(Event)

    def _both_partners_full(relationship_id) -> bool:
        rel = rel_by_id.get(relationship_id)
        if rel is None:
            return False
        return rel.person_from_id in full_ids and rel.person_to_id in full_ids

    def _event_is_full(event_id) -> bool:
        event = event_by_id.get(event_id)
        if event is None:
            return False
        if event.person_id is not None:
            return event.person_id in full_ids
        if event.relationship_id is not None:
            return _both_partners_full(event.relationship_id)
        return False

    def _is_visible(citation: Citation) -> bool:
        if citation.person_id is not None:
            return citation.person_id in full_ids
        if citation.name_id is not None:
            name = name_by_id.get(citation.name_id)
            return name is not None and name.person_id in full_ids
        if citation.event_id is not None:
            return _event_is_full(citation.event_id)
        if citation.relationship_id is not None:
            return _both_partners_full(citation.relationship_id)
        return False

    citation_stmt = (
        select(Citation)
        .where(Citation.tree_id == tree.id, Citation.deleted_at.is_(None))
        .order_by(Citation.created_at)
    )
    rows = (await session.execute(citation_stmt)).scalars().all()
    return [citation for citation in rows if _is_visible(citation)]
async def list_public_sources(
    session: AsyncSession, *, viewer_id: uuid.UUID | None, tree: Tree
) -> list[Source]:
    """List the tree's sources that back at least one visible citation.

    A source cited only for redacted or hidden persons' facts is withheld,
    since its title or notes could name that living person. Results are
    ordered by title.
    """
    visible_citations = await list_public_citations(
        session, viewer_id=viewer_id, tree=tree
    )
    backed_ids = set()
    for citation in visible_citations:
        backed_ids.add(citation.source_id)

    stmt = (
        select(Source)
        .where(Source.tree_id == tree.id, Source.deleted_at.is_(None))
        .order_by(Source.title)
    )
    rows = (await session.execute(stmt)).scalars().all()
    return [source for source in rows if source.id in backed_ids]
async def get_public_source(
    session: AsyncSession, *, viewer_id: uuid.UUID | None, tree: Tree, source_id: uuid.UUID
) -> Source:
    """Return one source, provided it appears in ``list_public_sources``.

    Raises:
        NotFound: when the source is absent from the viewer's public source
            list. A 404-style error (not 403) is used so a withheld source's
            existence is not revealed.
    """
    candidates = await list_public_sources(session, viewer_id=viewer_id, tree=tree)
    match = next((source for source in candidates if source.id == source_id), None)
    if match is None:
        raise NotFound("source not found")
    return match
async def list_public_trees(
    session: AsyncSession,
    *,
    viewer_id: uuid.UUID | None,
    q: str | None = None,
    limit: int = 50,
    offset: int = 0,
) -> list[Tree]:
    """List non-deleted trees that may appear in the public directory.

    Anonymous viewers see only ``public`` trees; authenticated viewers also
    see ``site_members`` trees. ``unlisted`` (reachable by link only) and
    ``private`` trees are never listed.

    Args:
        viewer_id: the authenticated user's id, or ``None`` when anonymous.
        q: optional case-insensitive substring filter on the tree name.
            Surrounding whitespace is ignored, and ``%`` / ``_`` in the term
            are matched literally rather than as LIKE wildcards.
        limit: page size, clamped to the range 0..100.
        offset: rows to skip; negative values are treated as 0.

    Returns:
        Trees ordered by name, with the id as a tiebreaker so that pages
        stay stable when several trees share a name.
    """
    allowed = [TreeVisibility.public]
    if viewer_id is not None:
        allowed.append(TreeVisibility.site_members)
    stmt = select(Tree).where(
        Tree.deleted_at.is_(None), Tree.visibility.in_(allowed)
    )

    term = q.strip() if q else ""
    if term:
        # The term is user input: escape LIKE metacharacters so a query such
        # as "%" or "_" cannot act as a wildcard. "/" is the escape character
        # and is doubled first so it survives as a literal.
        escaped = term.replace("/", "//").replace("%", "/%").replace("_", "/_")
        stmt = stmt.where(Tree.name.ilike(f"%{escaped}%", escape="/"))

    # Clamp both paging values; a negative LIMIT/OFFSET is rejected by the
    # database rather than treated as "none".
    page_size = max(0, min(limit, 100))
    skip = max(offset, 0)
    stmt = stmt.order_by(Tree.name, Tree.id).limit(page_size).offset(skip)
    return list((await session.execute(stmt)).scalars().all())