"""Event service. Writes require editor rights; reads go through the privacy engine. Every event has exactly one subject — a Person or a partnership.""" import uuid from datetime import date from sqlalchemy import select from sqlalchemy.ext.asyncio import AsyncSession from app.models.event import Event from app.models.person import Person from app.models.place import Place from app.models.relationship import Relationship from app.models.tree import Tree from app.models.user import User from app.services import privacy from app.services.audit import record_audit from app.services.exceptions import Conflict, Forbidden, NotFound async def _belongs_to_tree( session: AsyncSession, model: type, id_: uuid.UUID, tree_id: uuid.UUID ) -> bool: row = ( await session.execute( select(model.id).where( model.id == id_, model.tree_id == tree_id, model.deleted_at.is_(None) ) ) ).scalar_one_or_none() return row is not None async def create_event( session: AsyncSession, *, actor: User, tree: Tree, event_type: str, person_id: uuid.UUID | None = None, relationship_id: uuid.UUID | None = None, place_id: uuid.UUID | None = None, date_value: str | None = None, date_start: date | None = None, date_end: date | None = None, date_precision: str | None = None, calendar: str = "gregorian", detail: str | None = None, notes: str | None = None, ) -> Event: if not await privacy.can_edit_tree(session, user_id=actor.id, tree=tree): raise Forbidden("not an editor of this tree") if bool(person_id) == bool(relationship_id): raise Conflict("an event needs exactly one subject: person_id or relationship_id") if person_id and not await _belongs_to_tree(session, Person, person_id, tree.id): raise NotFound("person not found in this tree") if relationship_id and not await _belongs_to_tree( session, Relationship, relationship_id, tree.id ): raise NotFound("relationship not found in this tree") if place_id and not await _belongs_to_tree(session, Place, place_id, tree.id): raise NotFound("place not found in this tree") event = Event( tree_id=tree.id, event_type=event_type, person_id=person_id, relationship_id=relationship_id, place_id=place_id, date_value=date_value, date_start=date_start, date_end=date_end, date_precision=date_precision, calendar=calendar, detail=detail, notes=notes, ) session.add(event) await session.flush() record_audit( session, action="create", entity_type="Event", entity_id=event.id, tree_id=tree.id, actor_user_id=actor.id, after={"event_type": event_type, "person_id": str(person_id) if person_id else None}, ) await session.commit() await session.refresh(event) return event async def list_events_for_person( session: AsyncSession, *, viewer_id: uuid.UUID, tree: Tree, person_id: uuid.UUID ) -> list[Event]: if not await privacy.can_view_tree(session, user_id=viewer_id, tree=tree): raise Forbidden("not permitted to view this tree") stmt = ( select(Event) .where( Event.tree_id == tree.id, Event.person_id == person_id, Event.deleted_at.is_(None), ) .order_by(Event.date_start.nulls_last(), Event.created_at) ) return list((await session.execute(stmt)).scalars().all()) async def delete_event( session: AsyncSession, *, actor: User, tree: Tree, event_id: uuid.UUID ) -> None: if not await privacy.can_edit_tree(session, user_id=actor.id, tree=tree): raise Forbidden("not an editor of this tree") event = ( await session.execute( select(Event).where( Event.id == event_id, Event.tree_id == tree.id, Event.deleted_at.is_(None) ) ) ).scalar_one_or_none() if event is None: raise NotFound("event not found") from datetime import UTC, datetime event.deleted_at = datetime.now(UTC) record_audit( session, action="delete", entity_type="Event", entity_id=event.id, tree_id=tree.id, actor_user_id=actor.id, ) await session.commit()