"""Background worker. Same image as the backend, run in worker mode (`python -m app.worker`).

First job: the scheduled soft-delete purge — hard-delete rows whose ``deleted_at`` is
older than the recovery window, and remove their objects from storage. More jobs
(media processing, scraping, hints) and a proper queue arrive in later phases.
"""

import asyncio
import logging
import sys
from datetime import UTC, datetime, timedelta

from sqlalchemy import delete, select

from app.core.config import get_settings
from app.core.db import get_sessionmaker
from app.integrations.objectstore.s3 import S3ObjectStore
from app.models import (
    Citation,
    Event,
    Media,
    Name,
    Person,
    Place,
    PlaceName,
    Relationship,
    Source,
    Tree,
    User,
)

logger = logging.getLogger("provenance.worker")

# Child -> parent so foreign keys are satisfied as rows are removed.
_PURGE_ORDER = [Citation, Name, Event, Relationship, PlaceName, Place, Source, Person, Tree, User]


async def _purge_media(sessionmaker, store, cutoff: datetime) -> None:
    """Hard-delete expired soft-deleted ``Media`` rows together with their stored objects.

    A row is removed from the database only after its object was deleted from
    storage. If the object delete fails, the row is kept so the next cycle retries
    it; deleting the row anyway would leave an object in storage that nothing
    references any more. Database errors are rolled back and logged (mirroring
    ``_purge_table``) so one failure here does not abort the table purges that
    follow in the same cycle.

    Args:
        sessionmaker: async session factory, as returned by ``get_sessionmaker()``.
        store: object store whose ``delete_object(key=...)`` coroutine removes an object.
        cutoff: rows whose ``deleted_at`` is strictly earlier than this are purged.
    """
    purged = 0
    async with sessionmaker() as session:
        try:
            result = await session.execute(
                select(Media).where(Media.deleted_at.is_not(None), Media.deleted_at < cutoff)
            )
            rows = result.scalars().all()
            for media in rows:
                try:
                    await store.delete_object(key=media.storage_key)
                except Exception as exc:  # noqa: BLE001
                    # Keep the row so the object delete is retried on the next cycle.
                    logger.warning("object delete failed for %s: %s", media.storage_key, exc)
                    continue
                await session.delete(media)
                purged += 1
            await session.commit()
        except Exception as exc:  # noqa: BLE001
            await session.rollback()
            logger.warning("purge %s failed: %s", Media.__tablename__, exc)
            return
    if purged:
        logger.info("purged %d media", purged)


async def _purge_table(sessionmaker, model, cutoff: datetime) -> None:
    """Bulk hard-delete rows of ``model`` soft-deleted before ``cutoff``.

    Runs in its own session/transaction. Any failure (for example a foreign-key
    violation from rows that still reference a purged parent) is rolled back and
    logged as a warning rather than raised, so later models in the purge order
    are still attempted.

    Args:
        sessionmaker: async session factory, as returned by ``get_sessionmaker()``.
        model: mapped model class with a ``deleted_at`` column.
        cutoff: rows whose ``deleted_at`` is strictly earlier than this are purged.
    """
    async with sessionmaker() as session:
        try:
            res = await session.execute(
                delete(model).where(model.deleted_at.is_not(None), model.deleted_at < cutoff)
            )
            await session.commit()
            if res.rowcount:
                logger.info("purged %d %s", res.rowcount, model.__tablename__)
        except Exception as exc:  # noqa: BLE001
            await session.rollback()
            logger.warning("purge %s failed: %s", model.__tablename__, exc)


async def purge_once(sessionmaker, store) -> None:
    """Run one full purge pass: media first, then every model in ``_PURGE_ORDER``.

    The cutoff is ``now (UTC) - settings.purge_after_days``, computed once per pass
    so every table in the pass uses the same recovery window.
    """
    settings = get_settings()
    cutoff = datetime.now(UTC) - timedelta(days=settings.purge_after_days)
    await _purge_media(sessionmaker, store, cutoff)
    for model in _PURGE_ORDER:
        await _purge_table(sessionmaker, model, cutoff)


async def main() -> None:
    """Worker entry point: set up logging and storage, then purge forever on a fixed interval."""
    logging.basicConfig(
        level=logging.INFO, format="%(levelname)s [%(name)s] %(message)s", stream=sys.stdout
    )
    settings = get_settings()
    store = S3ObjectStore(settings)
    try:
        await store.ensure_bucket()
    except Exception as exc:  # noqa: BLE001
        # Not fatal: the worker keeps running; storage errors surface per object later.
        logger.warning("ensure_bucket failed: %s", exc)
    sessionmaker = get_sessionmaker()
    logger.info(
        "worker started; purge every %ds (recovery window %dd)",
        settings.purge_interval_seconds,
        settings.purge_after_days,
    )
    while True:
        try:
            await purge_once(sessionmaker, store)
        except Exception as exc:  # noqa: BLE001
            # Unexpected failure: keep the traceback so it can be diagnosed.
            logger.warning("purge cycle error: %s", exc, exc_info=True)
        await asyncio.sleep(settings.purge_interval_seconds)


if __name__ == "__main__":
    asyncio.run(main())