34d30e3134
Media model + migration; an ObjectStore interface with an S3/MinIO (boto3) implementation behind the service layer. Upload (multipart) stores bytes in object storage + a metadata row (checksum, size, content-type, optional attach to person/event/source); list returns presigned URLs; delete is soft. Editor-gated, privacy-filtered, audited. 24 tests pass (object store faked). Introduces the worker container (same image, 'python -m app.worker'): its first job is the scheduled 30-day soft-delete purge across tables + media object cleanup. Compose gains worker + S3 env on backend/worker; dev override builds the worker too. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com> Signed-off-by: Justin Paul <justin@jpaul.me>
104 lines
3.4 KiB
Python
104 lines
3.4 KiB
Python
"""Background worker. Same image as the backend, run in worker mode
|
|
(`python -m app.worker`). First job: the scheduled soft-delete purge — hard-
|
|
delete rows whose ``deleted_at`` is older than the recovery window, and remove
|
|
their objects from storage. More jobs (media processing, scraping, hints) and a
|
|
proper queue arrive in later phases.
|
|
"""
|
|
|
|
import asyncio
|
|
import logging
|
|
import sys
|
|
from datetime import UTC, datetime, timedelta
|
|
|
|
from sqlalchemy import delete, select
|
|
|
|
from app.core.config import get_settings
|
|
from app.core.db import get_sessionmaker
|
|
from app.integrations.objectstore.s3 import S3ObjectStore
|
|
from app.models import (
|
|
Citation,
|
|
Event,
|
|
Media,
|
|
Name,
|
|
Person,
|
|
Place,
|
|
PlaceName,
|
|
Relationship,
|
|
Source,
|
|
Tree,
|
|
User,
|
|
)
|
|
|
|
logger = logging.getLogger("provenance.worker")

# Hard-delete order for the soft-deletable tables: dependent (child) tables are
# listed before the tables they point at, so foreign keys stay satisfied while
# rows are removed. Media is deliberately absent -- _purge_media handles it on
# its own because each row also owns an object in storage.
_PURGE_ORDER = [
    Citation,
    Name,
    Event,
    Relationship,
    PlaceName,
    Place,
    Source,
    Person,
    Tree,
    User,
]
|
|
|
|
|
|
async def _purge_media(sessionmaker, store, cutoff: datetime) -> None:
    """Hard-delete soft-deleted media older than ``cutoff`` and their stored objects.

    For each expired row, the object at ``media.storage_key`` is removed from
    ``store`` first. The row is deleted only when that succeeds. If the object
    delete fails, the row (the only record of the storage key) is kept, so the
    next purge cycle retries it instead of leaving an orphaned object in storage.

    A failed commit is rolled back and logged rather than raised. This matches
    ``_purge_table``, so the remaining table purges in ``purge_once`` still run.

    Args:
        sessionmaker: factory returning an async session context manager.
        store: object store exposing ``async delete_object(key=...)``.
        cutoff: rows with ``deleted_at`` strictly before this are purged.
    """
    async with sessionmaker() as session:
        rows = (
            await session.execute(
                select(Media).where(Media.deleted_at.is_not(None), Media.deleted_at < cutoff)
            )
        ).scalars().all()

        purged = 0
        for media in rows:
            try:
                await store.delete_object(key=media.storage_key)
            except Exception as exc:  # noqa: BLE001
                # Keep the row so the key is not lost; retried next cycle.
                # NOTE(review): retrying assumes delete_object tolerates a key
                # that is already gone (S3 DeleteObject does) -- confirm for
                # the store implementation.
                logger.warning("object delete failed for %s: %s", media.storage_key, exc)
                continue
            await session.delete(media)
            purged += 1

        if not purged:
            return

        try:
            await session.commit()
        except Exception as exc:  # noqa: BLE001
            await session.rollback()
            logger.warning("purge media failed: %s", exc)
            return

        logger.info("purged %d media", purged)
|
|
|
|
|
|
async def _purge_table(sessionmaker, model, cutoff: datetime) -> None:
    """Bulk hard-delete every ``model`` row soft-deleted before ``cutoff``.

    Uses a dedicated session and transaction. Any failure is rolled back and
    logged instead of raised, so one table's error (for example a foreign-key
    violation) does not stop the caller from purging the remaining tables.
    """
    async with sessionmaker() as session:
        try:
            column = model.deleted_at
            outcome = await session.execute(
                delete(model).where(column.is_not(None), column < cutoff)
            )
            await session.commit()
            count = outcome.rowcount
            if count:
                logger.info("purged %d %s", count, model.__tablename__)
        except Exception as exc:  # noqa: BLE001
            await session.rollback()
            logger.warning("purge %s failed: %s", model.__tablename__, exc)
|
|
|
|
|
|
async def purge_once(sessionmaker, store) -> None:
    """Run a single purge pass over media and then every table in ``_PURGE_ORDER``.

    The cutoff is "now" in UTC minus ``purge_after_days`` from the settings.
    Anything soft-deleted before that instant is hard-deleted.
    """
    recovery_window = timedelta(days=get_settings().purge_after_days)
    cutoff = datetime.now(UTC) - recovery_window

    # Media goes first, since it also has objects in storage to remove.
    await _purge_media(sessionmaker, store, cutoff)
    for table_model in _PURGE_ORDER:
        await _purge_table(sessionmaker, table_model, cutoff)
|
|
|
|
|
|
async def main() -> None:
    """Long-running worker: set up logging and storage, then purge on a timer.

    Bucket creation is attempted once at startup; a failure is logged and
    startup continues. Errors from a purge cycle are logged and the loop keeps
    going, sleeping ``purge_interval_seconds`` between cycles. The loop never
    returns on its own.
    """
    logging.basicConfig(
        stream=sys.stdout,
        format="%(levelname)s [%(name)s] %(message)s",
        level=logging.INFO,
    )
    settings = get_settings()

    object_store = S3ObjectStore(settings)
    try:
        await object_store.ensure_bucket()
    except Exception as exc:  # noqa: BLE001
        logger.warning("ensure_bucket failed: %s", exc)

    session_factory = get_sessionmaker()
    logger.info(
        "worker started; purge every %ds (recovery window %dd)",
        settings.purge_interval_seconds,
        settings.purge_after_days,
    )

    while True:
        try:
            await purge_once(session_factory, object_store)
        except Exception as exc:  # noqa: BLE001
            # A single bad cycle must not kill the worker; retry next interval.
            logger.warning("purge cycle error: %s", exc)
        await asyncio.sleep(settings.purge_interval_seconds)


if __name__ == "__main__":
    # Reached via `python -m app.worker`.
    asyncio.run(main())
|