Files
justin 34d30e3134 Add media (object storage) and the background worker (Phase 1)
Media model + migration; an ObjectStore interface with an S3/MinIO (boto3) implementation behind the service layer. Upload (multipart) stores bytes in object storage + a metadata row (checksum, size, content-type, optional attach to person/event/source); list returns presigned URLs; delete is soft. Editor-gated, privacy-filtered, audited. 24 tests pass (object store faked).

Introduces the worker container (same image, 'python -m app.worker'): its first job is the scheduled 30-day soft-delete purge across tables + media object cleanup. Compose gains worker + S3 env on backend/worker; dev override builds the worker too.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Signed-off-by: Justin Paul <justin@jpaul.me>
2026-06-06 21:46:09 -04:00

104 lines
3.4 KiB
Python

"""Background worker. Same image as the backend, run in worker mode
(`python -m app.worker`). First job: the scheduled soft-delete purge — hard-
delete rows whose ``deleted_at`` is older than the recovery window, and remove
their objects from storage. More jobs (media processing, scraping, hints) and a
proper queue arrive in later phases.
"""
import asyncio
import logging
import sys
from datetime import UTC, datetime, timedelta
from sqlalchemy import delete, select
from app.core.config import get_settings
from app.core.db import get_sessionmaker
from app.integrations.objectstore.s3 import S3ObjectStore
from app.models import (
Citation,
Event,
Media,
Name,
Person,
Place,
PlaceName,
Relationship,
Source,
Tree,
User,
)
logger = logging.getLogger("provenance.worker")

# Models whose soft-deleted rows are hard-deleted each cycle, ordered
# child -> parent so that foreign keys stay satisfied as rows are removed.
# Media is not listed: purge_once handles it first via _purge_media, because
# its stored objects must be cleaned up too.
_PURGE_ORDER = [
    Citation,
    Name,
    Event,
    Relationship,
    PlaceName,
    Place,
    Source,
    Person,
    Tree,
    User,
]


async def _purge_media(sessionmaker, store, cutoff: datetime) -> None:
    """Hard-delete soft-deleted media rows older than ``cutoff`` and their objects.

    Each row's object is removed from ``store`` first. The row is deleted only
    when that succeeds, so a failed object delete leaves the row (and therefore
    its ``storage_key``) in place to be retried next cycle instead of orphaning
    the object. Database errors are rolled back and logged rather than raised,
    like ``_purge_table``, so one failure does not abort the rest of the cycle.

    Args:
        sessionmaker: Factory returning an async session context manager.
        store: Object store exposing an awaitable ``delete_object(key=...)``.
        cutoff: Rows whose ``deleted_at`` is strictly earlier than this are purged.
    """
    purged = 0
    async with sessionmaker() as session:
        try:
            rows = (
                await session.execute(
                    select(Media).where(Media.deleted_at.is_not(None), Media.deleted_at < cutoff)
                )
            ).scalars().all()
            for media in rows:
                try:
                    await store.delete_object(key=media.storage_key)
                except Exception as exc:  # noqa: BLE001
                    # Keep the row: it holds the only reference to the object,
                    # so deleting it now would leak the object in storage.
                    logger.warning("object delete failed for %s: %s", media.storage_key, exc)
                    continue
                await session.delete(media)
                purged += 1
            await session.commit()
        except Exception as exc:  # noqa: BLE001
            # Best effort, matching _purge_table: roll back and let the
            # remaining purge steps run.
            await session.rollback()
            logger.warning("purge media failed: %s", exc)
            return
    if purged:
        logger.info("purged %d media", purged)


async def _purge_table(sessionmaker, model, cutoff: datetime) -> None:
    """Bulk hard-delete rows of ``model`` soft-deleted before ``cutoff``.

    Runs in its own session. Any failure (including building the statement)
    is rolled back and logged as a warning, so one table cannot stop the
    others from being purged.
    """
    async with sessionmaker() as session:
        try:
            stmt = delete(model).where(
                model.deleted_at.is_not(None),
                model.deleted_at < cutoff,
            )
            result = await session.execute(stmt)
            await session.commit()
            removed = result.rowcount
            if removed:
                logger.info("purged %d %s", removed, model.__tablename__)
        except Exception as err:  # noqa: BLE001
            await session.rollback()
            logger.warning("purge %s failed: %s", model.__tablename__, err)


async def purge_once(sessionmaker, store) -> None:
    """Run one purge pass.

    The cutoff is "now (UTC) minus ``purge_after_days``". Media is purged
    first, then each model in ``_PURGE_ORDER`` in sequence.
    """
    recovery_window = timedelta(days=get_settings().purge_after_days)
    cutoff = datetime.now(UTC) - recovery_window
    await _purge_media(sessionmaker, store, cutoff)
    for table_model in _PURGE_ORDER:
        await _purge_table(sessionmaker, table_model, cutoff)


async def main() -> None:
    """Worker entry point.

    Configures stdout logging and makes a best-effort attempt to ensure the
    storage bucket exists. It then calls ``purge_once`` forever, sleeping
    ``purge_interval_seconds`` between cycles. Cycle errors are logged, not
    raised, so the loop keeps running.
    """
    logging.basicConfig(
        stream=sys.stdout,
        level=logging.INFO,
        format="%(levelname)s [%(name)s] %(message)s",
    )
    cfg = get_settings()
    object_store = S3ObjectStore(cfg)
    try:
        await object_store.ensure_bucket()
    except Exception as err:  # noqa: BLE001
        logger.warning("ensure_bucket failed: %s", err)

    make_session = get_sessionmaker()
    logger.info(
        "worker started; purge every %ds (recovery window %dd)",
        cfg.purge_interval_seconds,
        cfg.purge_after_days,
    )
    while True:
        try:
            await purge_once(make_session, object_store)
        except Exception as err:  # noqa: BLE001
            logger.warning("purge cycle error: %s", err)
        await asyncio.sleep(cfg.purge_interval_seconds)


if __name__ == "__main__":
    # Reached when started in worker mode: `python -m app.worker`.
    asyncio.run(main())