Files
provenance/backend/app/services/source_service.py
justin 064bb6ea65 Add sources and citations API (Phase 1: sources-first spine)
Source CRUD (reusable, tree-scoped) and Citation create/list/soft-delete linking one source to exactly one fact (person/event/name/relationship). Editor-gated writes, privacy-filtered reads, audit throughout; tenant + existence validation on source and target. list_citations returns all tree citations so the UI can render 'sourced' indicators in one round-trip. 22 tests pass.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Signed-off-by: Justin Paul <justin@jpaul.me>
2026-06-06 13:17:33 -04:00

113 lines
3.4 KiB
Python

"""Source service. Sources are reusable, tree-scoped records of an origin.
Writes require editor rights; reads go through the privacy engine."""
import uuid
from datetime import UTC, datetime
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from app.models.source import Source
from app.models.tree import Tree
from app.models.user import User
from app.services import privacy
from app.services.audit import record_audit
from app.services.exceptions import Forbidden, NotFound
async def create_source(
    session: AsyncSession,
    *,
    actor: User,
    tree: Tree,
    title: str,
    author: str | None = None,
    source_type: str | None = None,
    repository: str | None = None,
    url: str | None = None,
    citation_text: str | None = None,
    publication_info: str | None = None,
    quality_note: str | None = None,
) -> Source:
    """Create a source inside ``tree`` on behalf of ``actor``.

    The new row is flushed, an audit entry with action ``"create"`` is
    recorded, and the transaction is committed before the refreshed
    ``Source`` is returned.

    Raises:
        Forbidden: if ``privacy.can_edit_tree`` denies ``actor`` on ``tree``.
    """
    may_edit = await privacy.can_edit_tree(session, user_id=actor.id, tree=tree)
    if not may_edit:
        raise Forbidden("not an editor of this tree")

    # Caller-supplied descriptive columns, passed through unchanged.
    fields = {
        "title": title,
        "author": author,
        "source_type": source_type,
        "repository": repository,
        "url": url,
        "citation_text": citation_text,
        "publication_info": publication_info,
        "quality_note": quality_note,
    }
    new_source = Source(tree_id=tree.id, **fields)
    session.add(new_source)

    # Flush before auditing: the audit entry references new_source.id,
    # which is presumably only populated once the row has been flushed.
    await session.flush()
    record_audit(
        session,
        action="create",
        entity_type="Source",
        entity_id=new_source.id,
        tree_id=tree.id,
        actor_user_id=actor.id,
        after={"title": title},
    )

    await session.commit()
    await session.refresh(new_source)
    return new_source
async def list_sources(
    session: AsyncSession,
    *,
    viewer_id: uuid.UUID,
    tree: Tree,
) -> list[Source]:
    """Return the non-deleted sources of ``tree``, ordered by title.

    Raises:
        Forbidden: if ``privacy.can_view_tree`` denies ``viewer_id`` on ``tree``.
    """
    may_view = await privacy.can_view_tree(session, user_id=viewer_id, tree=tree)
    if not may_view:
        raise Forbidden("not permitted to view this tree")

    # Rows with a deleted_at timestamp are excluded from the listing.
    query = select(Source).where(
        Source.tree_id == tree.id,
        Source.deleted_at.is_(None),
    )
    query = query.order_by(Source.title)

    result = await session.execute(query)
    return list(result.scalars().all())
async def get_source(
    session: AsyncSession,
    *,
    viewer_id: uuid.UUID,
    tree: Tree,
    source_id: uuid.UUID,
) -> Source:
    """Fetch one non-deleted source of ``tree`` by its id.

    Raises:
        Forbidden: if ``privacy.can_view_tree`` denies ``viewer_id`` on ``tree``.
        NotFound: if no non-deleted source with ``source_id`` exists in ``tree``.
    """
    may_view = await privacy.can_view_tree(session, user_id=viewer_id, tree=tree)
    if not may_view:
        raise Forbidden("not permitted to view this tree")

    # Matching on tree_id as well as id keeps the lookup scoped to this tree.
    query = select(Source).where(
        Source.id == source_id,
        Source.tree_id == tree.id,
        Source.deleted_at.is_(None),
    )
    result = await session.execute(query)
    found = result.scalar_one_or_none()
    if found is None:
        raise NotFound("source not found")
    return found
async def delete_source(
    session: AsyncSession,
    *,
    actor: User,
    tree: Tree,
    source_id: uuid.UUID,
) -> None:
    """Soft-delete a source of ``tree`` on behalf of ``actor``.

    The row is kept; its ``deleted_at`` is set to the current UTC time, an
    audit entry with action ``"delete"`` is recorded, and the transaction is
    committed.

    Raises:
        Forbidden: if ``privacy.can_edit_tree`` denies ``actor`` on ``tree``.
        NotFound: if no non-deleted source with ``source_id`` exists in ``tree``.
    """
    may_edit = await privacy.can_edit_tree(session, user_id=actor.id, tree=tree)
    if not may_edit:
        raise Forbidden("not an editor of this tree")

    # Only rows not already soft-deleted are eligible, so deleting the same
    # source a second time raises NotFound.
    query = select(Source).where(
        Source.id == source_id,
        Source.tree_id == tree.id,
        Source.deleted_at.is_(None),
    )
    result = await session.execute(query)
    target = result.scalar_one_or_none()
    if target is None:
        raise NotFound("source not found")

    deleted_at = datetime.now(UTC)
    target.deleted_at = deleted_at
    record_audit(
        session,
        action="delete",
        entity_type="Source",
        entity_id=target.id,
        tree_id=tree.id,
        actor_user_id=actor.id,
    )
    await session.commit()