Files
justin 8b91326481 Fix leak: redact per-person on authed non-member reads
A logged-in NON-member of a public/unlisted tree could read living people's
dates, real alternate names, and media (incl. downloading photos) through the
family-view endpoints — only the person LIST was redacted; list_events,
list_relationships, list_names, list_media gated on can_view_tree alone.

For non-members, these now delegate to the same visibility-filtered reads the
public surface uses (person_visibility-driven): living-person events/names
dropped, relationships touching a hidden person dropped, media limited to
full-visibility persons, and media download (get_media → media_content) 404s
for a redacted/unlinked person's media. Members are unchanged.

Adds list_public_relationships_for_person / list_public_media / can_view_media
to public_view_service. Test: an authed non-member sees no living-person PII
across events/names/relationships/media and can't download a living person's
file, while the owner still sees everything. Full suite: 72 passed.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Signed-off-by: Justin Paul <justin@jpaul.me>
2026-06-09 09:26:53 -04:00

216 lines
6.5 KiB
Python

"""Name service. A Person carries one or more Name rows — a primary (typically
the birth/maiden name) plus typed alternates (married, alias, religious, …).
Exactly one name is primary at a time; it drives display everywhere. Writes
require editor rights; reads go through the tree's view check.
"""
import uuid
from datetime import UTC, datetime
from sqlalchemy import select, update
from sqlalchemy.ext.asyncio import AsyncSession
from app.models.person import Name, Person
from app.models.tree import Tree
from app.models.user import User
from app.services import privacy
from app.services.audit import record_audit
from app.services.exceptions import Forbidden, NotFound
async def _get_person(session: AsyncSession, *, tree: Tree, person_id: uuid.UUID) -> Person:
    """Fetch a person by id, scoped to ``tree`` and ignoring soft-deleted rows.

    Raises:
        NotFound: when no live person with that id exists in the tree.
    """
    query = select(Person).where(
        Person.id == person_id,
        Person.tree_id == tree.id,
        Person.deleted_at.is_(None),
    )
    result = await session.execute(query)
    found = result.scalar_one_or_none()
    if found is not None:
        return found
    raise NotFound("person not found")
async def _clear_primary(
    session: AsyncSession, *, person_id: uuid.UUID, keep: uuid.UUID | None
) -> None:
    """Unset ``is_primary`` on a person's live primary names.

    When ``keep`` is given, the name with that id is left untouched so it can
    remain (or become) the single primary; with ``keep=None`` every live
    primary name of the person is demoted.
    """
    conditions = [
        Name.person_id == person_id,
        Name.deleted_at.is_(None),
        Name.is_primary.is_(True),
    ]
    if keep is not None:
        conditions.append(Name.id != keep)
    demote = update(Name).where(*conditions).values(is_primary=False)
    await session.execute(demote)
async def list_names(
    session: AsyncSession, *, viewer_id: uuid.UUID, tree: Tree, person_id: uuid.UUID
) -> list[Name]:
    """Return the names of one person as visible to ``viewer_id``.

    Viewers with a membership role on the tree get every live name, primary
    first, then by ``sort_order`` and ``created_at``. Viewers without a role
    get whatever ``public_view_service.list_public_person_names`` returns.

    Raises:
        Forbidden: when the viewer may not view the tree at all.
        NotFound: when the person does not exist in the tree.
    """
    allowed = await privacy.can_view_tree(session, user_id=viewer_id, tree=tree)
    if not allowed:
        raise Forbidden("not permitted to view this tree")

    # Called only for its NotFound side effect; the row itself is not needed.
    await _get_person(session, tree=tree, person_id=person_id)

    role = await privacy.get_membership_role(session, viewer_id, tree.id)
    if role is None:
        # Non-members: a redacted/hidden person's real names must not leak,
        # so hand the read over to the public, visibility-filtered path.
        from app.services import public_view_service

        return await public_view_service.list_public_person_names(
            session, viewer_id=viewer_id, tree=tree, person_id=person_id
        )

    query = (
        select(Name)
        .where(Name.person_id == person_id, Name.deleted_at.is_(None))
        .order_by(Name.is_primary.desc(), Name.sort_order, Name.created_at)
    )
    rows = await session.execute(query)
    return list(rows.scalars().all())
async def create_name(
    session: AsyncSession,
    *,
    actor: User,
    tree: Tree,
    person_id: uuid.UUID,
    name_type: str = "birth",
    given: str | None = None,
    surname: str | None = None,
    prefix: str | None = None,
    suffix: str | None = None,
    nickname: str | None = None,
    is_primary: bool = False,
) -> Name:
    """Create a name for a person, audit it, commit, and return the fresh row.

    The new name becomes primary when ``is_primary`` is set or when the person
    has no live names yet; in that case any existing primary is demoted first.

    Raises:
        Forbidden: when ``actor`` cannot edit the tree.
        NotFound: when the person does not exist in the tree.
    """
    can_edit = await privacy.can_edit_tree(session, user_id=actor.id, tree=tree)
    if not can_edit:
        raise Forbidden("not an editor of this tree")
    await _get_person(session, tree=tree, person_id=person_id)

    # A person's very first name is forced to primary regardless of the flag.
    live_name_ids = select(Name.id).where(
        Name.person_id == person_id, Name.deleted_at.is_(None)
    )
    has_names = (await session.execute(live_name_ids)).first() is not None
    make_primary = is_primary or not has_names
    if make_primary:
        await _clear_primary(session, person_id=person_id, keep=None)

    name = Name(
        tree_id=tree.id,
        person_id=person_id,
        name_type=name_type,
        given=given,
        surname=surname,
        prefix=prefix,
        suffix=suffix,
        nickname=nickname,
        is_primary=make_primary,
    )
    session.add(name)
    # Flush before auditing so name.id is available for the audit entry.
    await session.flush()

    audit_snapshot = {"name_type": name_type, "given": given, "surname": surname}
    record_audit(
        session,
        action="create",
        entity_type="Name",
        entity_id=name.id,
        tree_id=tree.id,
        actor_user_id=actor.id,
        after=audit_snapshot,
    )
    await session.commit()
    await session.refresh(name)
    return name
# Keys of a `changes` dict that update_name copies straight onto a Name row.
# "is_primary" is not listed here; update_name handles it separately.
_NAME_FIELDS = {"name_type", "given", "surname", "prefix", "suffix", "nickname"}
async def update_name(
    session: AsyncSession,
    *,
    actor: User,
    tree: Tree,
    person_id: uuid.UUID,
    name_id: uuid.UUID,
    changes: dict,
) -> Name:
    """Apply ``changes`` to one live name, audit it, commit, and return the row.

    Only keys listed in ``_NAME_FIELDS`` are copied onto the name. A literal
    ``True`` under ``"is_primary"`` promotes this name and demotes the person's
    other primary names; any other value for that key is ignored. The audit
    entry records ``changes`` exactly as passed in.

    Raises:
        Forbidden: when ``actor`` cannot edit the tree.
        NotFound: when the name does not exist for this person in this tree.
    """
    can_edit = await privacy.can_edit_tree(session, user_id=actor.id, tree=tree)
    if not can_edit:
        raise Forbidden("not an editor of this tree")

    lookup = select(Name).where(
        Name.id == name_id,
        Name.person_id == person_id,
        Name.tree_id == tree.id,
        Name.deleted_at.is_(None),
    )
    name = (await session.execute(lookup)).scalar_one_or_none()
    if name is None:
        raise NotFound("name not found")

    # Copy only whitelisted fields; unknown keys in `changes` are skipped.
    for field, value in changes.items():
        if field in _NAME_FIELDS:
            setattr(name, field, value)

    wants_primary = changes.get("is_primary") is True
    if wants_primary:
        await _clear_primary(session, person_id=person_id, keep=name.id)
        name.is_primary = True

    record_audit(
        session,
        action="update",
        entity_type="Name",
        entity_id=name.id,
        tree_id=tree.id,
        actor_user_id=actor.id,
        after=changes,
    )
    await session.commit()
    await session.refresh(name)
    return name
async def delete_name(
    session: AsyncSession,
    *,
    actor: User,
    tree: Tree,
    person_id: uuid.UUID,
    name_id: uuid.UUID,
) -> None:
    """Soft-delete one of a person's names and keep a primary name in place.

    The name is marked deleted by stamping ``deleted_at`` and its primary flag
    is cleared. If it was the primary, the first remaining live name (ordered
    by ``sort_order``, then ``created_at``) is promoted. The deletion is
    audited and the whole change is committed.

    Raises:
        Forbidden: when ``actor`` cannot edit the tree.
        NotFound: when the name does not exist for this person in this tree.
    """
    if not await privacy.can_edit_tree(session, user_id=actor.id, tree=tree):
        raise Forbidden("not an editor of this tree")

    name = (
        await session.execute(
            select(Name).where(
                Name.id == name_id,
                Name.person_id == person_id,
                Name.tree_id == tree.id,
                Name.deleted_at.is_(None),
            )
        )
    ).scalar_one_or_none()
    if name is None:
        raise NotFound("name not found")

    was_primary = name.is_primary
    name.deleted_at = datetime.now(UTC)
    name.is_primary = False
    record_audit(
        session,
        action="delete",
        entity_type="Name",
        entity_id=name.id,
        tree_id=tree.id,
        actor_user_id=actor.id,
    )

    # Promote another name to primary so the person never loses their display name.
    if was_primary:
        nxt = (
            await session.execute(
                select(Name)
                .where(
                    Name.person_id == person_id,
                    Name.deleted_at.is_(None),
                    # Exclude the row being deleted explicitly: without this the
                    # query only skips it if the pending deleted_at change has
                    # already been flushed (i.e. the session autoflushes).
                    Name.id != name_id,
                )
                .order_by(Name.sort_order, Name.created_at)
            )
        ).scalars().first()
        if nxt is not None:
            nxt.is_primary = True

    await session.commit()