Files
provenance/backend/app/services/relationship_service.py
T
justin fae1162ff8 Prevent duplicate relationships; harden tree render against bad graphs
Root cause of the blank Jung tree: a child double-linked to the same parent
(and, generally, any cycle) made family-chart recurse forever.

Backend (the real fix):
- create_relationship now rejects an equivalent existing edge → 409.
  parent_child is directional (parent→child); partnership/sibling match the
  pair in either order. So you can't link the same two people the same way
  twice. (GEDCOM import already deduped; manual creates didn't.)

Frontend (defense in depth so data can never blank the view):
- Tree view sanitizes the graph before rendering: dedupes parents/spouses,
  drops self-links, and greedily breaks ancestor cycles (a person can't be
  their own ancestor); children are derived from the kept edges. The render is
  wrapped in try/catch and shows a note instead of a blank canvas, telling you
  which conflicting links were skipped.
- Person page surfaces the 409 ("They're already linked that way.").

59 backend tests pass (incl. dup-rejection + reverse-parent-child allowed).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-08 11:35:11 -04:00

206 lines
7.0 KiB
Python

"""Relationship service. Typed, qualified edges between two Persons in a tree.
Writes require editor rights; reads go through the privacy engine."""
import uuid
from datetime import UTC, datetime
from sqlalchemy import and_, or_, select
from sqlalchemy.ext.asyncio import AsyncSession
from app.models.enums import ParentChildQualifier, RelationshipType
from app.models.person import Person
from app.models.relationship import Relationship
from app.models.tree import Tree
from app.models.user import User
from app.services import privacy
from app.services.audit import record_audit
from app.services.exceptions import Conflict, Forbidden, NotFound
async def _person_in_tree(session: AsyncSession, person_id: uuid.UUID, tree_id: uuid.UUID) -> bool:
    """Return True when a non-deleted Person with ``person_id`` exists in ``tree_id``."""
    lookup = select(Person.id).where(
        Person.id == person_id,
        Person.tree_id == tree_id,
        # Rows with deleted_at set are treated as absent.
        Person.deleted_at.is_(None),
    )
    result = await session.execute(lookup)
    return result.scalar_one_or_none() is not None
async def create_relationship(
    session: AsyncSession,
    *,
    actor: User,
    tree: Tree,
    type: RelationshipType,
    person_from_id: uuid.UUID,
    person_to_id: uuid.UUID,
    qualifier: ParentChildQualifier | None = None,
    notes: str | None = None,
) -> Relationship:
    """Create a typed edge between two people in ``tree`` and audit it.

    Args:
        session: Async DB session; the new row is flushed, audited and committed here.
        actor: User performing the write; must be able to edit ``tree``.
        tree: Tree that both people must belong to.
        type: Kind of relationship. ``parent_child`` is directional
            (``person_from_id`` is the parent, ``person_to_id`` the child);
            every other type is treated as symmetric for duplicate detection.
        person_from_id: First endpoint of the edge.
        person_to_id: Second endpoint of the edge.
        qualifier: Optional qualifier, only valid for ``parent_child`` edges.
        notes: Optional free-form notes stored on the edge.

    Returns:
        The persisted, refreshed ``Relationship``.

    Raises:
        Forbidden: ``actor`` is not an editor of ``tree``.
        Conflict: Both endpoints are the same person, a qualifier was given
            for a non-``parent_child`` type, or an equivalent live edge
            already exists.
        NotFound: Either endpoint is not a live person in ``tree``.
    """
    if not await privacy.can_edit_tree(session, user_id=actor.id, tree=tree):
        raise Forbidden("not an editor of this tree")
    if person_from_id == person_to_id:
        raise Conflict("a relationship needs two different people")
    if qualifier is not None and type is not RelationshipType.parent_child:
        raise Conflict("qualifier only applies to parent_child relationships")
    for pid in (person_from_id, person_to_id):
        if not await _person_in_tree(session, pid, tree.id):
            raise NotFound("person not found in this tree")

    # Reject an equivalent existing edge so the same two people can't be linked
    # the same way twice. parent_child is directional (parent -> child), so the
    # reversed edge is a different relationship and is not matched here;
    # partnership/sibling are symmetric, so match the pair in either order.
    forward = and_(
        Relationship.person_from_id == person_from_id,
        Relationship.person_to_id == person_to_id,
    )
    if type is RelationshipType.parent_child:
        pair = forward
    else:
        backward = and_(
            Relationship.person_from_id == person_to_id,
            Relationship.person_to_id == person_from_id,
        )
        pair = or_(forward, backward)

    # Only existence matters. Data created before this check was added may
    # already hold several matching rows (and a symmetric pair can match in
    # both orders), so fetch at most one row instead of using
    # scalar_one_or_none(), which raises when more than one row matches.
    duplicate_probe = (
        select(Relationship.id)
        .where(
            Relationship.tree_id == tree.id,
            Relationship.type == type,
            Relationship.deleted_at.is_(None),
            pair,
        )
        .limit(1)
    )
    existing = (await session.execute(duplicate_probe)).scalars().first()
    if existing is not None:
        raise Conflict("these two people are already linked that way")
    # NOTE(review): this check-then-insert is not atomic on its own; confirm
    # whether a database-level uniqueness constraint backs it up.

    relationship = Relationship(
        tree_id=tree.id,
        type=type,
        person_from_id=person_from_id,
        person_to_id=person_to_id,
        qualifier=qualifier,
        notes=notes,
    )
    session.add(relationship)
    # Flush first so relationship.id is populated for the audit entry.
    await session.flush()
    record_audit(
        session,
        action="create",
        entity_type="Relationship",
        entity_id=relationship.id,
        tree_id=tree.id,
        actor_user_id=actor.id,
        after={"type": type.value, "from": str(person_from_id), "to": str(person_to_id)},
    )
    await session.commit()
    await session.refresh(relationship)
    return relationship
async def list_relationships(
    session: AsyncSession, *, viewer_id: uuid.UUID, tree: Tree
) -> list[Relationship]:
    """Return every non-deleted relationship in the tree, oldest first.

    One call feeds the whole family/pedigree view.

    Raises:
        Forbidden: the viewer is not permitted to view ``tree``.
    """
    may_view = await privacy.can_view_tree(session, user_id=viewer_id, tree=tree)
    if not may_view:
        raise Forbidden("not permitted to view this tree")

    query = select(Relationship).where(
        Relationship.tree_id == tree.id,
        Relationship.deleted_at.is_(None),
    )
    query = query.order_by(Relationship.created_at)
    result = await session.execute(query)
    return list(result.scalars().all())
async def list_relationships_for_person(
    session: AsyncSession, *, viewer_id: uuid.UUID, tree: Tree, person_id: uuid.UUID
) -> list[Relationship]:
    """Return the non-deleted relationships in ``tree`` that touch ``person_id``.

    The person may sit at either end of the edge. Results are ordered by
    creation time.

    Raises:
        Forbidden: the viewer is not permitted to view ``tree``.
    """
    may_view = await privacy.can_view_tree(session, user_id=viewer_id, tree=tree)
    if not may_view:
        raise Forbidden("not permitted to view this tree")

    # Match the person on either endpoint of the edge.
    touches_person = or_(
        Relationship.person_from_id == person_id,
        Relationship.person_to_id == person_id,
    )
    query = select(Relationship).where(
        Relationship.tree_id == tree.id,
        Relationship.deleted_at.is_(None),
        touches_person,
    )
    query = query.order_by(Relationship.created_at)
    result = await session.execute(query)
    return list(result.scalars().all())
async def update_relationship(
    session: AsyncSession, *, actor: User, tree: Tree, relationship_id: uuid.UUID, changes: dict
) -> Relationship:
    """Update the editable fields of a relationship and audit the change.

    Only ``qualifier`` and ``notes`` are editable; any other key in
    ``changes`` is ignored.

    Args:
        session: Async DB session; the update is committed here.
        actor: User performing the write; must be able to edit ``tree``.
        tree: Tree the relationship must belong to.
        relationship_id: Id of the relationship to update.
        changes: Mapping of field name to new value.

    Returns:
        The refreshed ``Relationship``.

    Raises:
        Forbidden: ``actor`` is not an editor of ``tree``.
        NotFound: No live relationship with that id exists in ``tree``.
        Conflict: A non-null qualifier was supplied for a relationship that
            is not ``parent_child``.
    """
    if not await privacy.can_edit_tree(session, user_id=actor.id, tree=tree):
        raise Forbidden("not an editor of this tree")
    relationship = (
        await session.execute(
            select(Relationship).where(
                Relationship.id == relationship_id,
                Relationship.tree_id == tree.id,
                Relationship.deleted_at.is_(None),
            )
        )
    ).scalar_one_or_none()
    if relationship is None:
        raise NotFound("relationship not found")
    # Clearing a qualifier (explicit None) is allowed on any type; setting one
    # is only meaningful on parent_child edges.
    if (
        changes.get("qualifier") is not None
        and relationship.type is not RelationshipType.parent_child
    ):
        raise Conflict("qualifier only applies to parent_child relationships")

    # Restrict to the editable fields so the audit entry records what was
    # actually written, not whatever extra keys the caller passed.
    applied = {key: changes[key] for key in ("qualifier", "notes") if key in changes}
    for key, value in applied.items():
        setattr(relationship, key, value)
    record_audit(
        session,
        action="update",
        entity_type="Relationship",
        entity_id=relationship.id,
        tree_id=tree.id,
        actor_user_id=actor.id,
        after=applied,
    )
    await session.commit()
    await session.refresh(relationship)
    return relationship
async def delete_relationship(
    session: AsyncSession, *, actor: User, tree: Tree, relationship_id: uuid.UUID
) -> None:
    """Soft-delete a relationship by stamping ``deleted_at`` and audit the removal.

    Raises:
        Forbidden: ``actor`` is not an editor of ``tree``.
        NotFound: No live relationship with that id exists in ``tree``.
    """
    may_edit = await privacy.can_edit_tree(session, user_id=actor.id, tree=tree)
    if not may_edit:
        raise Forbidden("not an editor of this tree")

    lookup = select(Relationship).where(
        Relationship.id == relationship_id,
        Relationship.tree_id == tree.id,
        Relationship.deleted_at.is_(None),
    )
    target = (await session.execute(lookup)).scalar_one_or_none()
    if target is None:
        raise NotFound("relationship not found")

    # The row is kept; reads filter on deleted_at being NULL.
    target.deleted_at = datetime.now(UTC)
    record_audit(
        session,
        action="delete",
        entity_type="Relationship",
        entity_id=target.id,
        tree_id=tree.id,
        actor_user_id=actor.id,
    )
    await session.commit()