8b91326481
A logged-in NON-member of a public/unlisted tree could read living people's dates, real alternate names, and media (incl. downloading photos) through the family-view endpoints — only the person LIST was redacted; list_events, list_relationships, list_names, list_media gated on can_view_tree alone. For non-members, these now delegate to the same visibility-filtered reads the public surface uses (person_visibility-driven): living-person events/names dropped, relationships touching a hidden person dropped, media limited to full-visibility persons, and media download (get_media → media_content) 404s for a redacted/unlinked person's media. Members are unchanged. Adds list_public_relationships_for_person / list_public_media / can_view_media to public_view_service. Test: an authed non-member sees no living-person PII across events/names/relationships/media and can't download a living person's file, while the owner still sees everything. Full suite: 72 passed. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com> Signed-off-by: Justin Paul <justin@jpaul.me>
216 lines
6.5 KiB
Python
216 lines
6.5 KiB
Python
"""Name service. A Person carries one or more Name rows — a primary (typically
|
|
the birth/maiden name) plus typed alternates (married, alias, religious, …).
|
|
Exactly one name is primary at a time; it drives display everywhere. Writes
|
|
require editor rights; reads go through the tree's view check, and viewers who
are not members of the tree only receive the visibility-filtered names.
|
|
"""
|
|
|
|
import uuid
|
|
from datetime import UTC, datetime
|
|
|
|
from sqlalchemy import select, update
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
|
|
from app.models.person import Name, Person
|
|
from app.models.tree import Tree
|
|
from app.models.user import User
|
|
from app.services import privacy
|
|
from app.services.audit import record_audit
|
|
from app.services.exceptions import Forbidden, NotFound
|
|
|
|
|
|
async def _get_person(session: AsyncSession, *, tree: Tree, person_id: uuid.UUID) -> Person:
    """Load the person ``person_id`` from ``tree``, ignoring soft-deleted rows.

    Raises:
        NotFound: no row matches the id within this tree, or the matching
            row has ``deleted_at`` set.
    """
    query = select(Person).where(
        Person.id == person_id,
        Person.tree_id == tree.id,
        Person.deleted_at.is_(None),
    )
    result = await session.execute(query)
    found = result.scalar_one_or_none()
    if found is not None:
        return found
    raise NotFound("person not found")
|
|
|
|
|
|
async def _clear_primary(
    session: AsyncSession, *, person_id: uuid.UUID, keep: uuid.UUID | None
) -> None:
    """Unset ``is_primary`` on a person's live primary names.

    ``keep`` is the id of a name whose flag must be left untouched; pass
    ``None`` to demote every currently-primary name of the person. Only
    issues the UPDATE — committing is left to the caller.
    """
    conditions = [
        Name.person_id == person_id,
        Name.deleted_at.is_(None),
        Name.is_primary.is_(True),
    ]
    if keep is not None:
        # Spare the name that is about to be (or already is) the primary.
        conditions.append(Name.id != keep)
    demote = update(Name).where(*conditions).values(is_primary=False)
    await session.execute(demote)
|
|
|
|
|
|
async def list_names(
    session: AsyncSession, *, viewer_id: uuid.UUID, tree: Tree, person_id: uuid.UUID
) -> list[Name]:
    """Return the names of a person that ``viewer_id`` is allowed to see.

    Members of the tree get every live name, primary first, then by
    ``sort_order`` and ``created_at``. Viewers with no membership role are
    handed off to ``public_view_service.list_public_person_names`` so that a
    redacted/hidden person's real names do not leak.

    Raises:
        Forbidden: the viewer may not view this tree at all.
        NotFound: the person does not exist in this tree (or is deleted).
    """
    allowed = await privacy.can_view_tree(session, user_id=viewer_id, tree=tree)
    if not allowed:
        raise Forbidden("not permitted to view this tree")

    # Existence check only; the returned person is not needed here.
    await _get_person(session, tree=tree, person_id=person_id)

    role = await privacy.get_membership_role(session, viewer_id, tree.id)
    if role is None:
        # Non-member: delegate to the visibility-filtered public read.
        # NOTE(review): imported at call time rather than module top —
        # presumably to avoid an import cycle; confirm before hoisting.
        from app.services import public_view_service

        return await public_view_service.list_public_person_names(
            session, viewer_id=viewer_id, tree=tree, person_id=person_id
        )

    query = (
        select(Name)
        .where(Name.person_id == person_id, Name.deleted_at.is_(None))
        .order_by(Name.is_primary.desc(), Name.sort_order, Name.created_at)
    )
    rows = await session.execute(query)
    return list(rows.scalars().all())
|
|
|
|
|
|
async def create_name(
    session: AsyncSession,
    *,
    actor: User,
    tree: Tree,
    person_id: uuid.UUID,
    name_type: str = "birth",
    given: str | None = None,
    surname: str | None = None,
    prefix: str | None = None,
    suffix: str | None = None,
    nickname: str | None = None,
    is_primary: bool = False,
) -> Name:
    """Add a name to a person, audit the change, and commit.

    The new name becomes primary when ``is_primary`` is set or when the
    person has no other live name yet; in that case every existing primary
    is demoted first.

    Returns:
        The persisted ``Name``, refreshed after commit.

    Raises:
        Forbidden: ``actor`` cannot edit this tree.
        NotFound: the person does not exist in this tree (or is deleted).
    """
    may_edit = await privacy.can_edit_tree(session, user_id=actor.id, tree=tree)
    if not may_edit:
        raise Forbidden("not an editor of this tree")
    await _get_person(session, tree=tree, person_id=person_id)

    # A person's very first name is always primary, whatever the flag says.
    probe = await session.execute(
        select(Name.id).where(Name.person_id == person_id, Name.deleted_at.is_(None))
    )
    has_names = probe.first() is not None
    make_primary = is_primary or not has_names
    if make_primary:
        await _clear_primary(session, person_id=person_id, keep=None)

    created = Name(
        tree_id=tree.id,
        person_id=person_id,
        name_type=name_type,
        given=given,
        surname=surname,
        prefix=prefix,
        suffix=suffix,
        nickname=nickname,
        is_primary=make_primary,
    )
    session.add(created)
    # Flush so the row has its id before the audit entry references it.
    await session.flush()

    audit_payload = {"name_type": name_type, "given": given, "surname": surname}
    record_audit(
        session,
        action="create",
        entity_type="Name",
        entity_id=created.id,
        tree_id=tree.id,
        actor_user_id=actor.id,
        after=audit_payload,
    )
    await session.commit()
    await session.refresh(created)
    return created
|
|
|
|
|
|
# Whitelist of Name attributes that update_name copies from its `changes`
# dict via setattr; `is_primary` is deliberately absent and handled separately.
_NAME_FIELDS = {"name_type", "given", "surname", "prefix", "suffix", "nickname"}
|
|
|
|
|
|
async def update_name(
    session: AsyncSession,
    *,
    actor: User,
    tree: Tree,
    person_id: uuid.UUID,
    name_id: uuid.UUID,
    changes: dict,
) -> Name:
    """Apply ``changes`` to one of a person's names, audit, and commit.

    Only keys listed in ``_NAME_FIELDS`` are copied onto the row. Passing
    ``is_primary`` as exactly ``True`` promotes this name and demotes the
    person's other primaries; any other value for that key is ignored.

    Returns:
        The updated ``Name``, refreshed after commit.

    Raises:
        Forbidden: ``actor`` cannot edit this tree.
        NotFound: no live name with this id exists for the person in the tree.
    """
    may_edit = await privacy.can_edit_tree(session, user_id=actor.id, tree=tree)
    if not may_edit:
        raise Forbidden("not an editor of this tree")

    lookup = select(Name).where(
        Name.id == name_id,
        Name.person_id == person_id,
        Name.tree_id == tree.id,
        Name.deleted_at.is_(None),
    )
    name = (await session.execute(lookup)).scalar_one_or_none()
    if name is None:
        raise NotFound("name not found")

    # Copy only whitelisted fields; unknown keys in `changes` are skipped.
    for field, value in changes.items():
        if field in _NAME_FIELDS:
            setattr(name, field, value)

    wants_primary = changes.get("is_primary") is True
    if wants_primary:
        await _clear_primary(session, person_id=person_id, keep=name.id)
        name.is_primary = True

    # The audit entry stores the raw change set as received.
    record_audit(
        session,
        action="update",
        entity_type="Name",
        entity_id=name.id,
        tree_id=tree.id,
        actor_user_id=actor.id,
        after=changes,
    )
    await session.commit()
    await session.refresh(name)
    return name
|
|
|
|
|
|
async def delete_name(
    session: AsyncSession,
    *,
    actor: User,
    tree: Tree,
    person_id: uuid.UUID,
    name_id: uuid.UUID,
) -> None:
    """Soft-delete one of a person's names, audit the change, and commit.

    The row is kept but stamped with ``deleted_at`` (current UTC time) and
    loses its primary flag. If it was the primary name, the first remaining
    live name (by ``sort_order``, then ``created_at``) is promoted so the
    person keeps a display name whenever another name exists.

    Raises:
        Forbidden: ``actor`` cannot edit this tree.
        NotFound: no live name with this id exists for the person in the tree.
    """
    if not await privacy.can_edit_tree(session, user_id=actor.id, tree=tree):
        raise Forbidden("not an editor of this tree")

    name = (
        await session.execute(
            select(Name).where(
                Name.id == name_id,
                Name.person_id == person_id,
                Name.tree_id == tree.id,
                Name.deleted_at.is_(None),
            )
        )
    ).scalar_one_or_none()
    if name is None:
        raise NotFound("name not found")

    # Remember the flag before clearing it; it decides whether to promote.
    was_primary = name.is_primary
    name.deleted_at = datetime.now(UTC)
    name.is_primary = False

    record_audit(
        session,
        action="delete",
        entity_type="Name",
        entity_id=name.id,
        tree_id=tree.id,
        actor_user_id=actor.id,
    )

    # Promote another name to primary so the person never loses their display name.
    if was_primary:
        successor = (
            await session.execute(
                select(Name)
                .where(
                    Name.person_id == person_id,
                    Name.deleted_at.is_(None),
                    # Exclude the row being deleted explicitly: its new
                    # deleted_at is only pending in the session, so without
                    # this the query depends on autoflush having run and
                    # could otherwise pick (and re-promote) the deleted name.
                    Name.id != name.id,
                )
                .order_by(Name.sort_order, Name.created_at)
            )
        ).scalars().first()
        if successor is not None:
            successor.is_primary = True

    await session.commit()
|