"""Source service. Sources are reusable, tree-scoped records of an origin. Writes require editor rights; reads go through the privacy engine.""" import uuid from datetime import UTC, datetime from sqlalchemy import select from sqlalchemy.ext.asyncio import AsyncSession from app.models.source import Source from app.models.tree import Tree from app.models.user import User from app.services import privacy from app.services.audit import record_audit from app.services.exceptions import Forbidden, NotFound async def create_source( session: AsyncSession, *, actor: User, tree: Tree, title: str, author: str | None = None, source_type: str | None = None, repository: str | None = None, url: str | None = None, citation_text: str | None = None, publication_info: str | None = None, quality_note: str | None = None, ) -> Source: if not await privacy.can_edit_tree(session, user_id=actor.id, tree=tree): raise Forbidden("not an editor of this tree") source = Source( tree_id=tree.id, title=title, author=author, source_type=source_type, repository=repository, url=url, citation_text=citation_text, publication_info=publication_info, quality_note=quality_note, ) session.add(source) await session.flush() record_audit( session, action="create", entity_type="Source", entity_id=source.id, tree_id=tree.id, actor_user_id=actor.id, after={"title": title}, ) await session.commit() await session.refresh(source) return source async def list_sources(session: AsyncSession, *, viewer_id: uuid.UUID, tree: Tree) -> list[Source]: if not await privacy.can_view_tree(session, user_id=viewer_id, tree=tree): raise Forbidden("not permitted to view this tree") # Non-members see only sources backing a visible citation (see citation # redaction) — a source used solely for a redacted person could name them. if await privacy.get_membership_role(session, viewer_id, tree.id) is None: from app.services import public_view_service return await public_view_service.list_public_sources( session, viewer_id=viewer_id, tree=tree ) stmt = ( select(Source) .where(Source.tree_id == tree.id, Source.deleted_at.is_(None)) .order_by(Source.title) ) return list((await session.execute(stmt)).scalars().all()) async def get_source( session: AsyncSession, *, viewer_id: uuid.UUID, tree: Tree, source_id: uuid.UUID ) -> Source: if not await privacy.can_view_tree(session, user_id=viewer_id, tree=tree): raise Forbidden("not permitted to view this tree") if await privacy.get_membership_role(session, viewer_id, tree.id) is None: from app.services import public_view_service return await public_view_service.get_public_source( session, viewer_id=viewer_id, tree=tree, source_id=source_id ) source = ( await session.execute( select(Source).where( Source.id == source_id, Source.tree_id == tree.id, Source.deleted_at.is_(None) ) ) ).scalar_one_or_none() if source is None: raise NotFound("source not found") return source _SOURCE_FIELDS = { "title", "author", "source_type", "repository", "url", "citation_text", "publication_info", "quality_note", } async def update_source( session: AsyncSession, *, actor: User, tree: Tree, source_id: uuid.UUID, changes: dict ) -> Source: if not await privacy.can_edit_tree(session, user_id=actor.id, tree=tree): raise Forbidden("not an editor of this tree") source = ( await session.execute( select(Source).where( Source.id == source_id, Source.tree_id == tree.id, Source.deleted_at.is_(None) ) ) ).scalar_one_or_none() if source is None: raise NotFound("source not found") for key in _SOURCE_FIELDS & changes.keys(): setattr(source, key, changes[key]) record_audit( session, action="update", entity_type="Source", entity_id=source.id, tree_id=tree.id, actor_user_id=actor.id, after=changes, ) await session.commit() await session.refresh(source) return source async def delete_source( session: AsyncSession, *, actor: User, tree: Tree, source_id: uuid.UUID ) -> None: if not await privacy.can_edit_tree(session, user_id=actor.id, tree=tree): raise Forbidden("not an editor of this tree") source = ( await session.execute( select(Source).where( Source.id == source_id, Source.tree_id == tree.id, Source.deleted_at.is_(None) ) ) ).scalar_one_or_none() if source is None: raise NotFound("source not found") source.deleted_at = datetime.now(UTC) record_audit( session, action="delete", entity_type="Source", entity_id=source.id, tree_id=tree.id, actor_user_id=actor.id, ) await session.commit()