Files
provenance/backend/app/services/source_service.py
T
justin cf5518c7ec Full-CRUD sweep: update endpoints for tree, source, citation, relationship, media
Closes the rule #8 gap at the API layer: PATCH endpoints + service updates for Tree (name/description/visibility), Source, Citation (page/detail/confidence), Relationship (qualifier/notes), and Media (title/attachment) — editor-gated and audited. Every core entity now has create/read/update/delete. Edit UIs for these land in the frontend batch. 37 tests pass.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Signed-off-by: Justin Paul <justin@jpaul.me>
2026-06-07 09:53:17 -04:00

149 lines
4.4 KiB
Python

"""Source service. Sources are reusable, tree-scoped records of an origin.
Writes require editor rights; reads go through the privacy engine."""
import uuid
from datetime import UTC, datetime
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from app.models.source import Source
from app.models.tree import Tree
from app.models.user import User
from app.services import privacy
from app.services.audit import record_audit
from app.services.exceptions import Forbidden, NotFound
async def create_source(
session: AsyncSession,
*,
actor: User,
tree: Tree,
title: str,
author: str | None = None,
source_type: str | None = None,
repository: str | None = None,
url: str | None = None,
citation_text: str | None = None,
publication_info: str | None = None,
quality_note: str | None = None,
) -> Source:
if not await privacy.can_edit_tree(session, user_id=actor.id, tree=tree):
raise Forbidden("not an editor of this tree")
source = Source(
tree_id=tree.id,
title=title,
author=author,
source_type=source_type,
repository=repository,
url=url,
citation_text=citation_text,
publication_info=publication_info,
quality_note=quality_note,
)
session.add(source)
await session.flush()
record_audit(
session,
action="create",
entity_type="Source",
entity_id=source.id,
tree_id=tree.id,
actor_user_id=actor.id,
after={"title": title},
)
await session.commit()
await session.refresh(source)
return source
async def list_sources(session: AsyncSession, *, viewer_id: uuid.UUID, tree: Tree) -> list[Source]:
if not await privacy.can_view_tree(session, user_id=viewer_id, tree=tree):
raise Forbidden("not permitted to view this tree")
stmt = (
select(Source)
.where(Source.tree_id == tree.id, Source.deleted_at.is_(None))
.order_by(Source.title)
)
return list((await session.execute(stmt)).scalars().all())
async def get_source(
session: AsyncSession, *, viewer_id: uuid.UUID, tree: Tree, source_id: uuid.UUID
) -> Source:
if not await privacy.can_view_tree(session, user_id=viewer_id, tree=tree):
raise Forbidden("not permitted to view this tree")
source = (
await session.execute(
select(Source).where(
Source.id == source_id, Source.tree_id == tree.id, Source.deleted_at.is_(None)
)
)
).scalar_one_or_none()
if source is None:
raise NotFound("source not found")
return source
_SOURCE_FIELDS = {
"title", "author", "source_type", "repository", "url", "citation_text",
"publication_info", "quality_note",
}
async def update_source(
session: AsyncSession, *, actor: User, tree: Tree, source_id: uuid.UUID, changes: dict
) -> Source:
if not await privacy.can_edit_tree(session, user_id=actor.id, tree=tree):
raise Forbidden("not an editor of this tree")
source = (
await session.execute(
select(Source).where(
Source.id == source_id, Source.tree_id == tree.id, Source.deleted_at.is_(None)
)
)
).scalar_one_or_none()
if source is None:
raise NotFound("source not found")
for key in _SOURCE_FIELDS & changes.keys():
setattr(source, key, changes[key])
record_audit(
session,
action="update",
entity_type="Source",
entity_id=source.id,
tree_id=tree.id,
actor_user_id=actor.id,
after=changes,
)
await session.commit()
await session.refresh(source)
return source
async def delete_source(
session: AsyncSession, *, actor: User, tree: Tree, source_id: uuid.UUID
) -> None:
if not await privacy.can_edit_tree(session, user_id=actor.id, tree=tree):
raise Forbidden("not an editor of this tree")
source = (
await session.execute(
select(Source).where(
Source.id == source_id, Source.tree_id == tree.id, Source.deleted_at.is_(None)
)
)
).scalar_one_or_none()
if source is None:
raise NotFound("source not found")
source.deleted_at = datetime.now(UTC)
record_audit(
session,
action="delete",
entity_type="Source",
entity_id=source.id,
tree_id=tree.id,
actor_user_id=actor.id,
)
await session.commit()