Add media (object storage) and the background worker (Phase 1)
Media model + migration; an ObjectStore interface with an S3/MinIO (boto3) implementation behind the service layer. Upload (multipart) stores bytes in object storage + a metadata row (checksum, size, content-type, optional attach to person/event/source); list returns presigned URLs; delete is soft. Editor-gated, privacy-filtered, audited. 24 tests pass (object store faked). Introduces the worker container (same image, 'python -m app.worker'): its first job is the scheduled 30-day soft-delete purge across tables + media object cleanup. Compose gains worker + S3 env on backend/worker; dev override builds the worker too. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com> Signed-off-by: Justin Paul <justin@jpaul.me>
This commit is contained in:
@@ -10,6 +10,8 @@ from app.core.db import get_session
|
||||
from app.integrations.mailer.base import Mailer
|
||||
from app.integrations.mailer.console import ConsoleMailer
|
||||
from app.integrations.mailer.smtp import SMTPMailer
|
||||
from app.integrations.objectstore.base import ObjectStore
|
||||
from app.integrations.objectstore.s3 import S3ObjectStore
|
||||
from app.models.user import User
|
||||
from app.services import auth_service
|
||||
|
||||
@@ -46,3 +48,10 @@ def get_mailer() -> Mailer:
|
||||
|
||||
|
||||
MailerDep = Annotated[Mailer, Depends(get_mailer)]
|
||||
|
||||
|
||||
def get_objectstore() -> ObjectStore:
    """Dependency provider: build an S3-backed ``ObjectStore`` from app settings.

    A new ``S3ObjectStore`` is constructed on every call, using whatever
    ``get_settings()`` returns at that moment.
    """
    settings = get_settings()
    return S3ObjectStore(settings)


# Annotated alias so route handlers can declare ``store: ObjectStoreDep``.
ObjectStoreDep = Annotated[ObjectStore, Depends(get_objectstore)]
|
||||
|
||||
@@ -6,6 +6,7 @@ from app.api.v1 import (
|
||||
auth,
|
||||
citations,
|
||||
events,
|
||||
media,
|
||||
persons,
|
||||
relationships,
|
||||
sources,
|
||||
@@ -22,3 +23,4 @@ api_router.include_router(events.router)
|
||||
api_router.include_router(relationships.router)
|
||||
api_router.include_router(sources.router)
|
||||
api_router.include_router(citations.router)
|
||||
api_router.include_router(media.router)
|
||||
|
||||
@@ -0,0 +1,62 @@
|
||||
import uuid
|
||||
|
||||
from fastapi import APIRouter, File, Form, UploadFile, status
|
||||
|
||||
from app.api.deps import CurrentUser, ObjectStoreDep, SessionDep
|
||||
from app.schemas.media import MediaRead
|
||||
from app.services import media_service, tree_service
|
||||
|
||||
# All media endpoints hang off a tree: /trees/{tree_id}/media[...]
router = APIRouter(prefix="/trees", tags=["media"])


def _with_url(media, url: str) -> MediaRead:
    """Convert a media row into its ``MediaRead`` schema and attach ``url``.

    ``media`` is validated via ``MediaRead.model_validate``; the ``url`` field
    is then set on the resulting schema object, which is returned.
    """
    schema = MediaRead.model_validate(media)
    schema.url = url
    return schema
|
||||
|
||||
|
||||
# Multipart upload endpoint. The tree is resolved for the current user first,
# then the uploaded bytes and form metadata are handed to the media service;
# the response carries a presigned download URL for the stored object.
# (Comments rather than a docstring: FastAPI would publish a docstring as the
# OpenAPI operation description.)
@router.post("/{tree_id}/media", response_model=MediaRead, status_code=status.HTTP_201_CREATED)
async def upload_media(
    tree_id: uuid.UUID,
    session: SessionDep,
    current: CurrentUser,
    store: ObjectStoreDep,
    file: UploadFile = File(...),
    title: str | None = Form(None),
    person_id: uuid.UUID | None = Form(None),
    event_id: uuid.UUID | None = Form(None),
    source_id: uuid.UUID | None = Form(None),
) -> MediaRead:
    tree = await tree_service.get_tree(session, viewer_id=current.id, tree_id=tree_id)

    # NOTE(review): the entire upload is read into memory here; no size limit
    # is applied in this handler.
    payload = await file.read()

    # Optional attachment targets are forwarded unchanged to the service.
    attachment = {"person_id": person_id, "event_id": event_id, "source_id": source_id}

    stored = await media_service.upload_media(
        session,
        store,
        actor=current,
        tree=tree,
        data=payload,
        # Fall back to generic values when the client omits them.
        filename=file.filename or "upload",
        content_type=file.content_type or "application/octet-stream",
        title=title,
        **attachment,
    )

    download_url = await store.presigned_get_url(key=stored.storage_key)
    return _with_url(stored, download_url)
|
||||
|
||||
|
||||
# Lists a tree's media as returned by the media service, attaching a presigned
# URL to each item. URLs are generated one at a time, in result order.
@router.get("/{tree_id}/media", response_model=list[MediaRead])
async def list_media(
    tree_id: uuid.UUID, session: SessionDep, current: CurrentUser, store: ObjectStoreDep
) -> list[MediaRead]:
    tree = await tree_service.get_tree(session, viewer_id=current.id, tree_id=tree_id)
    visible = await media_service.list_media(session, viewer_id=current.id, tree=tree)

    response: list[MediaRead] = []
    for item in visible:
        signed_url = await store.presigned_get_url(key=item.storage_key)
        response.append(_with_url(item, signed_url))
    return response
|
||||
|
||||
|
||||
# Deletes one media item of a tree. The tree is resolved for the current user,
# then the media service performs the delete; the endpoint replies 204 with
# no body.
@router.delete("/{tree_id}/media/{media_id}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_media(
    tree_id: uuid.UUID, media_id: uuid.UUID, session: SessionDep, current: CurrentUser
) -> None:
    owning_tree = await tree_service.get_tree(
        session, viewer_id=current.id, tree_id=tree_id
    )
    await media_service.delete_media(
        session, actor=current, tree=owning_tree, media_id=media_id
    )
|
||||
@@ -35,6 +35,18 @@ class Settings(BaseSettings):
|
||||
# Base URL used to build links in outbound email.
|
||||
app_base_url: str = "http://localhost"
|
||||
|
||||
# --- Object storage (S3-compatible / MinIO) ---
|
||||
s3_endpoint_url: str = "http://minio:9000"
|
||||
s3_bucket: str = "provenance"
|
||||
s3_access_key: str = "provenance"
|
||||
s3_secret_key: str = "change-me-too"
|
||||
s3_region: str = "us-east-1"
|
||||
s3_presign_ttl: int = 3600 # seconds
|
||||
|
||||
# --- Worker ---
|
||||
purge_interval_seconds: int = 3600 # how often to run the soft-delete purge
|
||||
purge_after_days: int = 30 # soft-deleted rows older than this are purged
|
||||
|
||||
# --- Email (SMTP) ---
|
||||
mailer: str = Field(default="console", description="console | smtp")
|
||||
smtp_host: str | None = None
|
||||
|
||||
@@ -0,0 +1,22 @@
|
||||
"""ObjectStore interface — pluggable binary storage behind the service layer.
|
||||
|
||||
Implementations are S3-compatible (MinIO for self-host, any S3 otherwise).
|
||||
Methods are async wrappers so the service layer stays non-blocking even though
|
||||
the underlying SDK (boto3) is synchronous.
|
||||
"""
|
||||
|
||||
from abc import ABC, abstractmethod
|
||||
|
||||
|
||||
class ObjectStore(ABC):
    """Abstract async interface for binary object storage.

    Concrete stores must implement all four coroutines below; the class cannot
    be instantiated directly.
    """

    @abstractmethod
    async def ensure_bucket(self) -> None:
        """Make sure the store's bucket is ready for use."""

    @abstractmethod
    async def put_object(self, *, key: str, data: bytes, content_type: str) -> None:
        """Store ``data`` under ``key`` with the given ``content_type``."""

    @abstractmethod
    async def presigned_get_url(self, *, key: str) -> str:
        """Return a presigned URL for fetching the object stored at ``key``."""

    @abstractmethod
    async def delete_object(self, *, key: str) -> None:
        """Remove the object stored at ``key``."""
|
||||
@@ -0,0 +1,56 @@
|
||||
"""S3-compatible ObjectStore (boto3), suitable for MinIO or any S3 provider.
|
||||
|
||||
boto3 is synchronous; each call is dispatched to a thread so request handlers
|
||||
and the worker stay async."""
|
||||
|
||||
import asyncio
|
||||
|
||||
import boto3
|
||||
from botocore.client import Config
|
||||
from botocore.exceptions import ClientError
|
||||
|
||||
from app.core.config import Settings
|
||||
from app.integrations.objectstore.base import ObjectStore
|
||||
|
||||
|
||||
class S3ObjectStore(ObjectStore):
    """``ObjectStore`` implemented on top of a boto3 S3 client.

    The boto3 client is synchronous, so every public coroutine hands the
    blocking call to a worker thread via ``asyncio.to_thread``.
    """

    # HeadBucket error codes that mean "the bucket does not exist". HEAD
    # responses carry no body, so botocore normally reports the HTTP status
    # ("404") as the code; the named variants are accepted defensively.
    _MISSING_BUCKET_CODES = frozenset({"404", "NoSuchBucket", "NotFound"})

    def __init__(self, settings: Settings) -> None:
        """Create the client from the ``s3_*`` settings (SigV4 signing)."""
        self.bucket = settings.s3_bucket
        self.presign_ttl = settings.s3_presign_ttl
        self._client = boto3.client(
            "s3",
            endpoint_url=settings.s3_endpoint_url,
            aws_access_key_id=settings.s3_access_key,
            aws_secret_access_key=settings.s3_secret_key,
            region_name=settings.s3_region,
            config=Config(signature_version="s3v4"),
        )

    @staticmethod
    def _error_code(exc: ClientError) -> str:
        """Extract the service error code from a botocore ``ClientError``."""
        return str(exc.response.get("Error", {}).get("Code", ""))

    def _ensure_bucket_sync(self) -> None:
        """Create the bucket only when HeadBucket reports it as missing.

        Any other HeadBucket failure (access denied, bad credentials, server
        error, ...) is re-raised unchanged instead of being masked by a
        follow-up CreateBucket error.

        Raises:
            ClientError: for every failure other than "bucket missing" or
                losing a creation race to a concurrent caller.
        """
        try:
            self._client.head_bucket(Bucket=self.bucket)
        except ClientError as exc:
            if self._error_code(exc) not in self._MISSING_BUCKET_CODES:
                raise
        else:
            return

        try:
            self._client.create_bucket(Bucket=self.bucket)
        except ClientError as exc:
            # Backend and worker may both start against an empty store; if the
            # other process created the bucket first, the goal is already met.
            if self._error_code(exc) != "BucketAlreadyOwnedByYou":
                raise

    async def ensure_bucket(self) -> None:
        """Ensure the configured bucket exists, creating it if it is missing."""
        await asyncio.to_thread(self._ensure_bucket_sync)

    async def put_object(self, *, key: str, data: bytes, content_type: str) -> None:
        """Upload ``data`` to ``key`` in the configured bucket."""
        await asyncio.to_thread(
            self._client.put_object,
            Bucket=self.bucket,
            Key=key,
            Body=data,
            ContentType=content_type,
        )

    async def presigned_get_url(self, *, key: str) -> str:
        """Return a presigned ``get_object`` URL valid for ``presign_ttl`` seconds."""
        return await asyncio.to_thread(
            self._client.generate_presigned_url,
            "get_object",
            Params={"Bucket": self.bucket, "Key": key},
            ExpiresIn=self.presign_ttl,
        )

    async def delete_object(self, *, key: str) -> None:
        """Delete ``key`` from the configured bucket."""
        await asyncio.to_thread(self._client.delete_object, Bucket=self.bucket, Key=key)
|
||||
@@ -5,6 +5,7 @@ from app.models.audit import AuditEntry
|
||||
from app.models.auth import Session, UserToken
|
||||
from app.models.base import Base
|
||||
from app.models.event import Event
|
||||
from app.models.media import Media
|
||||
from app.models.person import Name, Person
|
||||
from app.models.place import Place, PlaceName
|
||||
from app.models.relationship import Relationship
|
||||
@@ -28,4 +29,5 @@ __all__ = [
|
||||
"AuditEntry",
|
||||
"Session",
|
||||
"UserToken",
|
||||
"Media",
|
||||
]
|
||||
|
||||
@@ -0,0 +1,36 @@
|
||||
"""Media — a binary asset (image, scan, PDF, audio) in object storage. The row
|
||||
holds metadata + checksum + the storage key; the bytes live in the ObjectStore.
|
||||
Optionally attached to a single fact (person, event, or source) for now."""
|
||||
|
||||
import uuid
|
||||
|
||||
from sqlalchemy import BigInteger, ForeignKey, String
|
||||
from sqlalchemy.orm import Mapped, mapped_column
|
||||
|
||||
from app.models.base import Base
|
||||
from app.models.mixins import SoftDelete, TenantScoped, Timestamps, UUIDPrimaryKey
|
||||
|
||||
|
||||
def _nullable_fk(target: str) -> Mapped[uuid.UUID | None]:
    """Indexed foreign-key column to ``target`` that is nulled when the parent row goes."""
    return mapped_column(ForeignKey(target, ondelete="SET NULL"), index=True)


class Media(Base, UUIDPrimaryKey, TenantScoped, Timestamps, SoftDelete):
    """Metadata row for one stored binary object."""

    __tablename__ = "media"

    # Who uploaded the file.
    uploader_id: Mapped[uuid.UUID | None] = _nullable_fk("users.id")

    # Where the bytes live in the ObjectStore; unique per row.
    storage_key: Mapped[str] = mapped_column(String(512), unique=True)

    # Descriptive metadata recorded at upload time.
    original_filename: Mapped[str] = mapped_column(String(512))
    content_type: Mapped[str] = mapped_column(String(128))
    byte_size: Mapped[int] = mapped_column(BigInteger)
    checksum_sha256: Mapped[str] = mapped_column(String(64), index=True)  # hex digest
    title: Mapped[str | None] = mapped_column(String(512))

    # Optional single attachment target.
    person_id: Mapped[uuid.UUID | None] = _nullable_fk("persons.id")
    event_id: Mapped[uuid.UUID | None] = _nullable_fk("events.id")
    source_id: Mapped[uuid.UUID | None] = _nullable_fk("sources.id")
|
||||
@@ -0,0 +1,22 @@
|
||||
import uuid
|
||||
from datetime import datetime
|
||||
|
||||
from pydantic import BaseModel, ConfigDict
|
||||
|
||||
|
||||
# Response schema for a media item. Comments are used instead of a class
# docstring so the generated JSON schema is left unchanged.
class MediaRead(BaseModel):
    # Allow building the schema from attribute-bearing objects (ORM rows) via
    # ``model_validate``.
    model_config = ConfigDict(from_attributes=True)

    id: uuid.UUID
    tree_id: uuid.UUID
    original_filename: str
    content_type: str
    byte_size: int
    checksum_sha256: str
    title: str | None
    # Optional attachment targets; each may be null.
    person_id: uuid.UUID | None
    event_id: uuid.UUID | None
    source_id: uuid.UUID | None
    created_at: datetime
    # Presigned download URL, filled in by the router from the ObjectStore.
    url: str | None = None
|
||||
@@ -0,0 +1,107 @@
|
||||
"""Media service. Bytes go to the ObjectStore; a metadata row goes to the DB.
|
||||
Writes require editor rights; reads go through the privacy engine."""
|
||||
|
||||
import hashlib
|
||||
import uuid
|
||||
from datetime import UTC, datetime
|
||||
|
||||
from sqlalchemy import select
|
||||
from sqlalchemy.ext.asyncio import AsyncSession
|
||||
|
||||
from app.integrations.objectstore.base import ObjectStore
|
||||
from app.models.media import Media
|
||||
from app.models.tree import Tree
|
||||
from app.models.user import User
|
||||
from app.services import privacy
|
||||
from app.services.audit import record_audit
|
||||
from app.services.exceptions import Forbidden, NotFound
|
||||
|
||||
|
||||
# Longest filename segment kept inside a storage key. The key has the shape
# "<tree uuid>/<media uuid>/<name>" (74 characters before the name) and is
# stored in a 512-character column, so the name part must stay bounded.
_MAX_KEY_FILENAME = 255


def _key_filename(filename: str) -> str:
    """Reduce a client-supplied filename to a safe, bounded storage-key segment.

    Only the final path component is kept, every character outside
    ``[A-Za-z0-9._-]`` becomes ``_``, leading dots are dropped (so ``.`` and
    ``..`` cannot appear as a segment), and the tail is kept when truncating so
    the extension survives. Falls back to ``"upload"`` if nothing is left.
    """
    leaf = filename.replace("\\", "/").rsplit("/", 1)[-1]
    cleaned = "".join(
        ch if ch.isascii() and (ch.isalnum() or ch in "._-") else "_" for ch in leaf
    ).lstrip(".")
    return cleaned[-_MAX_KEY_FILENAME:] or "upload"


async def upload_media(
    session: AsyncSession,
    store: ObjectStore,
    *,
    actor: User,
    tree: Tree,
    data: bytes,
    filename: str,
    content_type: str,
    title: str | None = None,
    person_id: uuid.UUID | None = None,
    event_id: uuid.UUID | None = None,
    source_id: uuid.UUID | None = None,
) -> Media:
    """Store ``data`` in the object store and record a ``Media`` row for it.

    Args:
        session: Database session; committed on success, rolled back on failure.
        store: Object store receiving the bytes.
        actor: Uploading user; must be allowed to edit ``tree``.
        tree: Tree the media belongs to.
        data: Raw file contents.
        filename: Client-supplied name; stored verbatim as ``original_filename``
            and in sanitized form as the last segment of the storage key.
        content_type: MIME type recorded on the row and the stored object.
        title: Optional display title.
        person_id: Optional person to attach the media to.
        event_id: Optional event to attach the media to.
        source_id: Optional source to attach the media to.

    Returns:
        The refreshed, committed ``Media`` row.

    Raises:
        Forbidden: If ``actor`` may not edit ``tree``.
    """
    if not await privacy.can_edit_tree(session, user_id=actor.id, tree=tree):
        raise Forbidden("not an editor of this tree")

    # NOTE(review): person_id / event_id / source_id are not checked here to
    # belong to ``tree`` -- confirm whether callers or DB constraints enforce it.

    media_id = uuid.uuid4()
    # The raw filename is untrusted: keep it out of the key except in sanitized
    # form so separators, "..", non-ASCII or oversized names cannot break
    # presigned URLs or overflow the storage_key column.
    key = f"{tree.id}/{media_id}/{_key_filename(filename)}"
    size = len(data)

    await store.ensure_bucket()
    await store.put_object(key=key, data=data, content_type=content_type)

    media = Media(
        id=media_id,
        tree_id=tree.id,
        uploader_id=actor.id,
        storage_key=key,
        original_filename=filename,
        content_type=content_type,
        byte_size=size,
        checksum_sha256=hashlib.sha256(data).hexdigest(),
        title=title,
        person_id=person_id,
        event_id=event_id,
        source_id=source_id,
    )
    try:
        session.add(media)
        await session.flush()
        record_audit(
            session,
            action="create",
            entity_type="Media",
            entity_id=media.id,
            tree_id=tree.id,
            actor_user_id=actor.id,
            after={"filename": filename, "bytes": size},
        )
        await session.commit()
    except Exception:
        # The object is already in storage but no row will reference it:
        # remove it (best effort) so a failed insert does not leak an orphan.
        await session.rollback()
        try:
            await store.delete_object(key=key)
        except Exception as cleanup_exc:  # noqa: BLE001 - the DB error matters more
            import logging

            logging.getLogger(__name__).warning(
                "orphaned object %s could not be removed: %s", key, cleanup_exc
            )
        raise

    await session.refresh(media)
    return media
|
||||
|
||||
|
||||
async def list_media(session: AsyncSession, *, viewer_id: uuid.UUID, tree: Tree) -> list[Media]:
    """Return the tree's non-deleted media, newest first.

    Raises:
        Forbidden: If the viewer is not permitted to view ``tree``.
    """
    allowed = await privacy.can_view_tree(session, user_id=viewer_id, tree=tree)
    if not allowed:
        raise Forbidden("not permitted to view this tree")

    # Soft-deleted rows (deleted_at set) are excluded from the listing.
    query = select(Media).where(Media.tree_id == tree.id, Media.deleted_at.is_(None))
    query = query.order_by(Media.created_at.desc())

    result = await session.execute(query)
    return list(result.scalars().all())
|
||||
|
||||
|
||||
async def delete_media(
    session: AsyncSession, *, actor: User, tree: Tree, media_id: uuid.UUID
) -> None:
    """Soft-delete one media row of ``tree`` and audit the change.

    Raises:
        Forbidden: If ``actor`` may not edit ``tree``.
        NotFound: If no live media row with ``media_id`` exists in ``tree``.
    """
    may_edit = await privacy.can_edit_tree(session, user_id=actor.id, tree=tree)
    if not may_edit:
        raise Forbidden("not an editor of this tree")

    # Scope the lookup to this tree and to rows not already soft-deleted.
    lookup = select(Media).where(
        Media.id == media_id, Media.tree_id == tree.id, Media.deleted_at.is_(None)
    )
    target = (await session.execute(lookup)).scalar_one_or_none()
    if target is None:
        raise NotFound("media not found")

    # Soft delete the row; the object is removed by the worker's purge job.
    target.deleted_at = datetime.now(UTC)
    record_audit(
        session,
        action="delete",
        entity_type="Media",
        entity_id=target.id,
        tree_id=tree.id,
        actor_user_id=actor.id,
    )
    await session.commit()
|
||||
@@ -0,0 +1,103 @@
|
||||
"""Background worker. Same image as the backend, run in worker mode
|
||||
(`python -m app.worker`). First job: the scheduled soft-delete purge — hard-
|
||||
delete rows whose ``deleted_at`` is older than the recovery window, and remove
|
||||
their objects from storage. More jobs (media processing, scraping, hints) and a
|
||||
proper queue arrive in later phases.
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
import logging
|
||||
import sys
|
||||
from datetime import UTC, datetime, timedelta
|
||||
|
||||
from sqlalchemy import delete, select
|
||||
|
||||
from app.core.config import get_settings
|
||||
from app.core.db import get_sessionmaker
|
||||
from app.integrations.objectstore.s3 import S3ObjectStore
|
||||
from app.models import (
|
||||
Citation,
|
||||
Event,
|
||||
Media,
|
||||
Name,
|
||||
Person,
|
||||
Place,
|
||||
PlaceName,
|
||||
Relationship,
|
||||
Source,
|
||||
Tree,
|
||||
User,
|
||||
)
|
||||
|
||||
logger = logging.getLogger("provenance.worker")

# Purge sequence, children before parents, so foreign keys are satisfied as
# rows are removed. Media is not listed: it is purged separately because its
# stored objects must be deleted as well.
_PURGE_ORDER = [
    Citation,
    Name,
    Event,
    Relationship,
    PlaceName,
    Place,
    Source,
    Person,
    Tree,
    User,
]
|
||||
|
||||
|
||||
async def _purge_media(sessionmaker, store, cutoff: datetime) -> None:
    """Hard-delete media soft-deleted before ``cutoff``, objects first.

    For each expired row the stored object is deleted and then the row. If
    the object delete fails, the row is kept so the next purge cycle retries
    it; dropping the row anyway would orphan the object with nothing left
    pointing at it.

    Args:
        sessionmaker: Callable returning an async session context manager.
        store: Object store exposing ``delete_object(key=...)``.
        cutoff: Rows whose ``deleted_at`` is earlier than this are purged.
    """
    purged = 0
    async with sessionmaker() as session:
        expired = (
            await session.execute(
                select(Media).where(Media.deleted_at.is_not(None), Media.deleted_at < cutoff)
            )
        ).scalars().all()
        for media in expired:
            try:
                await store.delete_object(key=media.storage_key)
            except Exception as exc:  # noqa: BLE001
                logger.warning("object delete failed for %s: %s", media.storage_key, exc)
                # Leave the row in place; it is picked up again next cycle.
                continue
            await session.delete(media)
            purged += 1
        await session.commit()
    # Report only rows that were actually removed.
    if purged:
        logger.info("purged %d media", purged)
|
||||
|
||||
|
||||
async def _purge_table(sessionmaker, model, cutoff: datetime) -> None:
    """Bulk-delete rows of ``model`` that were soft-deleted before ``cutoff``.

    Any failure is rolled back and logged as a warning instead of propagating,
    so a problem with one table does not stop the purge of the others.
    """
    async with sessionmaker() as session:
        try:
            # Built inside the try so a problem with the model itself is
            # logged like any other purge failure.
            expired = delete(model).where(
                model.deleted_at.is_not(None), model.deleted_at < cutoff
            )
            outcome = await session.execute(expired)
            await session.commit()
            removed = outcome.rowcount
            if removed:
                logger.info("purged %d %s", removed, model.__tablename__)
        except Exception as exc:  # noqa: BLE001
            await session.rollback()
            logger.warning("purge %s failed: %s", model.__tablename__, exc)
|
||||
|
||||
|
||||
async def purge_once(sessionmaker, store) -> None:
    """Run one purge pass: media (with objects) first, then each table in order.

    The cutoff is "now" (UTC) minus the configured ``purge_after_days``.
    """
    retention = timedelta(days=get_settings().purge_after_days)
    cutoff = datetime.now(UTC) - retention

    await _purge_media(sessionmaker, store, cutoff)
    for table_model in _PURGE_ORDER:
        await _purge_table(sessionmaker, table_model, cutoff)
|
||||
|
||||
|
||||
async def main() -> None:
    """Worker entry point: set up logging and storage, then purge forever.

    Each loop iteration runs one purge pass and then sleeps for
    ``purge_interval_seconds``. Errors from bucket setup or from a purge pass
    are logged as warnings and do not stop the worker.
    """
    logging.basicConfig(
        stream=sys.stdout,
        level=logging.INFO,
        format="%(levelname)s [%(name)s] %(message)s",
    )

    settings = get_settings()
    store = S3ObjectStore(settings)
    try:
        await store.ensure_bucket()
    except Exception as exc:  # noqa: BLE001
        # Carry on without the bucket check; later calls log their own errors.
        logger.warning("ensure_bucket failed: %s", exc)

    sessionmaker = get_sessionmaker()
    logger.info(
        "worker started; purge every %ds (recovery window %dd)",
        settings.purge_interval_seconds,
        settings.purge_after_days,
    )

    while True:
        try:
            await purge_once(sessionmaker, store)
        except Exception as exc:  # noqa: BLE001
            logger.warning("purge cycle error: %s", exc)
        await asyncio.sleep(settings.purge_interval_seconds)


if __name__ == "__main__":
    asyncio.run(main())
|
||||
Reference in New Issue
Block a user