"""Citation service. A citation links one Source to exactly one fact (person, event, name, or relationship) within a tree — the provenance spine.""" import uuid from datetime import UTC, datetime from sqlalchemy import select from sqlalchemy.ext.asyncio import AsyncSession from app.models.enums import CitationConfidence from app.models.event import Event from app.models.person import Name, Person from app.models.relationship import Relationship from app.models.source import Citation, Source from app.models.tree import Tree from app.models.user import User from app.services import privacy from app.services.audit import record_audit from app.services.exceptions import Conflict, Forbidden, NotFound # Citation target column -> model, for tenant/existence validation. _TARGET_MODELS = { "person_id": Person, "event_id": Event, "name_id": Name, "relationship_id": Relationship, } async def _in_tree(session: AsyncSession, model: type, id_: uuid.UUID, tree_id: uuid.UUID) -> bool: row = ( await session.execute( select(model.id).where( model.id == id_, model.tree_id == tree_id, model.deleted_at.is_(None) ) ) ).scalar_one_or_none() return row is not None async def create_citation( session: AsyncSession, *, actor: User, tree: Tree, source_id: uuid.UUID, person_id: uuid.UUID | None = None, event_id: uuid.UUID | None = None, name_id: uuid.UUID | None = None, relationship_id: uuid.UUID | None = None, page: str | None = None, detail: str | None = None, confidence: CitationConfidence | None = None, ) -> Citation: if not await privacy.can_edit_tree(session, user_id=actor.id, tree=tree): raise Forbidden("not an editor of this tree") targets = { "person_id": person_id, "event_id": event_id, "name_id": name_id, "relationship_id": relationship_id, } set_targets = {k: v for k, v in targets.items() if v is not None} if len(set_targets) != 1: raise Conflict("a citation must reference exactly one fact") if not await _in_tree(session, Source, source_id, tree.id): raise NotFound("source not found in this tree") (target_col, target_id), = set_targets.items() if not await _in_tree(session, _TARGET_MODELS[target_col], target_id, tree.id): raise NotFound("cited fact not found in this tree") citation = Citation( tree_id=tree.id, source_id=source_id, person_id=person_id, event_id=event_id, name_id=name_id, relationship_id=relationship_id, page=page, detail=detail, confidence=confidence, ) session.add(citation) await session.flush() record_audit( session, action="create", entity_type="Citation", entity_id=citation.id, tree_id=tree.id, actor_user_id=actor.id, after={"source_id": str(source_id), target_col: str(target_id)}, ) await session.commit() await session.refresh(citation) return citation async def list_citations( session: AsyncSession, *, viewer_id: uuid.UUID, tree: Tree ) -> list[Citation]: """All citations in the tree — the UI maps them to facts to show 'sourced' indicators in a single round-trip.""" if not await privacy.can_view_tree(session, user_id=viewer_id, tree=tree): raise Forbidden("not permitted to view this tree") stmt = ( select(Citation) .where(Citation.tree_id == tree.id, Citation.deleted_at.is_(None)) .order_by(Citation.created_at) ) return list((await session.execute(stmt)).scalars().all()) async def delete_citation( session: AsyncSession, *, actor: User, tree: Tree, citation_id: uuid.UUID ) -> None: if not await privacy.can_edit_tree(session, user_id=actor.id, tree=tree): raise Forbidden("not an editor of this tree") citation = ( await session.execute( select(Citation).where( Citation.id == citation_id, Citation.tree_id == tree.id, Citation.deleted_at.is_(None), ) ) ).scalar_one_or_none() if citation is None: raise NotFound("citation not found") citation.deleted_at = datetime.now(UTC) record_audit( session, action="delete", entity_type="Citation", entity_id=citation.id, tree_id=tree.id, actor_user_id=actor.id, ) await session.commit()