Guard against schema drift (readiness 503 + loud startup log) #239

Merged
justin merged 1 commit from schema-drift-guard into main 2026-06-09 21:56:09 -04:00
4 changed files with 145 additions and 2 deletions
+14 -2
View File
@@ -12,6 +12,7 @@ from sqlalchemy import text
from app.core.config import get_settings from app.core.config import get_settings
from app.core.db import get_engine from app.core.db import get_engine
from app.core.schema_version import schema_is_current
router = APIRouter(tags=["health"]) router = APIRouter(tags=["health"])
@@ -33,9 +34,20 @@ async def ready(response: Response) -> dict:
try: try:
async with get_engine().connect() as conn: async with get_engine().connect() as conn:
await conn.execute(text("SELECT 1")) await conn.execute(text("SELECT 1"))
checks["database"] = "ok" checks["database"] = "ok"
# Schema drift = code ahead of the DB; queries would 500. Fail
# readiness loudly rather than serve a broken surface.
ok, db, expected = await schema_is_current(conn)
if not ok:
checks["schema"] = (
f"drift: db={sorted(db) or ['none']} expected={sorted(expected)} "
"— run 'alembic upgrade head'"
)
response.status_code = status.HTTP_503_SERVICE_UNAVAILABLE
return {"status": "not ready", "checks": checks}
checks["schema"] = "ok"
return {"status": "ready", "checks": checks} return {"status": "ready", "checks": checks}
except Exception as exc: # noqa: BLE001 — surface any failure as "not ready" except Exception as exc: # noqa: BLE001 — surface any failure as "not ready"
checks["database"] = "error" checks.setdefault("database", "error")
response.status_code = status.HTTP_503_SERVICE_UNAVAILABLE response.status_code = status.HTTP_503_SERVICE_UNAVAILABLE
return {"status": "not ready", "checks": checks, "detail": str(exc)} return {"status": "not ready", "checks": checks, "detail": str(exc)}
+59
View File
@@ -0,0 +1,59 @@
"""Schema-drift detection — a safety net for the deploy pipeline.
If a deploy ships code whose models reference a column a migration hasn't added
yet (the code is ahead of the DB), every query against that table 500s with an
opaque ``UndefinedColumnError``. That is exactly the failure that took the tree
list down once: the backend image advanced but ``alembic upgrade head`` hadn't
run on the server.
The real prevention is auto-migrate on deploy (the entrypoint runs
``alembic upgrade head`` when ``RUN_MIGRATIONS=1``). This module is defense in
depth: it makes the drift *loud and explicit* — a readiness failure and a
CRITICAL startup log — instead of a silent storm of 500s, so a half-applied
deploy is obvious within seconds.
"""
from functools import lru_cache
from pathlib import Path
from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncConnection
# app/core/schema_version.py -> backend/ (parents: core, app, backend)
_MIGRATIONS_DIR = Path(__file__).resolve().parents[2] / "migrations"
@lru_cache
def expected_heads() -> frozenset[str]:
    """Return the Alembic head revision id(s) of the bundled migration scripts.

    The scripts are read from ``_MIGRATIONS_DIR``. The result is memoized by
    ``lru_cache``, so the script directory is only inspected on the first
    call; later calls return the cached frozenset.
    """
    # Alembic is imported at call time rather than at module import time.
    from alembic.config import Config
    from alembic.script import ScriptDirectory

    # Build an in-memory config that only knows where the scripts live.
    alembic_cfg = Config()
    alembic_cfg.set_main_option("script_location", str(_MIGRATIONS_DIR))
    scripts = ScriptDirectory.from_config(alembic_cfg)
    return frozenset(scripts.get_heads())
async def db_heads(conn: AsyncConnection) -> frozenset[str] | None:
    """Return the revision id(s) recorded in the database's ``alembic_version``.

    Returns ``None`` when ``public.alembic_version`` does not exist, i.e. the
    database is not Alembic-managed (for example a test database built
    straight from ``create_all``).

    The existence probe uses ``to_regclass``, which yields NULL for a missing
    table instead of raising, so the caller's transaction is not left in a
    failed state.
    """
    version_table = await conn.scalar(
        text("SELECT to_regclass('public.alembic_version')")
    )
    if version_table is None:
        return None

    stamped_rows = await conn.execute(text("SELECT version_num FROM alembic_version"))
    # Each row has a single column: the stamped revision id.
    return frozenset(record[0] for record in stamped_rows)
async def schema_is_current(
    conn: AsyncConnection,
) -> tuple[bool, frozenset[str], frozenset[str]]:
    """Compare the database's stamped revision(s) with the code's head(s).

    Returns a ``(ok, db, expected)`` triple:

    * ``ok`` -- True when the stamped revisions equal ``expected_heads()``,
      or when the database has no ``alembic_version`` table at all (nothing
      to compare, which keeps the check quiet in tests).
    * ``db`` -- the revisions read from the database; an empty frozenset when
      the table is absent.
    * ``expected`` -- the head revisions from the migration scripts.
    """
    code_heads = expected_heads()
    stamped = await db_heads(conn)

    if stamped is not None:
        # NOTE(review): strict equality also reports drift when the database
        # is *ahead* of this build -- confirm that is the intended behaviour.
        return stamped == code_heads, stamped, code_heads

    # No alembic_version table: treat the database as current.
    return True, frozenset(), code_heads
+30
View File
@@ -7,6 +7,7 @@ engine is the single enforcement point for reads.
import logging import logging
import sys import sys
from contextlib import asynccontextmanager
from fastapi import FastAPI, Request from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse from fastapi.responses import JSONResponse
@@ -14,6 +15,8 @@ from fastapi.responses import JSONResponse
from app.api.health import router as health_router from app.api.health import router as health_router
from app.api.v1 import api_router from app.api.v1 import api_router
from app.core.config import get_settings from app.core.config import get_settings
from app.core.db import get_engine
from app.core.schema_version import schema_is_current
from app.services.exceptions import Conflict, Forbidden, NotFound from app.services.exceptions import Conflict, Forbidden, NotFound
@@ -30,6 +33,32 @@ def _configure_logging() -> None:
app_logger.propagate = False app_logger.propagate = False
async def _check_schema_drift() -> None:
    """Log a CRITICAL message at startup when the DB and the code disagree.

    Opens a connection from ``get_engine()``, runs ``schema_is_current`` and,
    if the result is not ok, logs the database revisions next to the expected
    ones together with the remediation command.

    The check is advisory: any exception raised while connecting, querying
    or logging is caught and reported as a WARNING, so it never propagates
    out of this function.
    """
    log = logging.getLogger("provenance")
    try:
        async with get_engine().connect() as conn:
            in_sync, stamped, wanted = await schema_is_current(conn)
        if in_sync:
            return
        # An empty revision set is shown as ["none"] so the message stays readable.
        stamped_label = sorted(stamped) or ["none"]
        log.critical(
            "SCHEMA DRIFT: database is at %s but this build expects %s. "
            "Run 'alembic upgrade head' — queries will fail until migrated.",
            stamped_label,
            sorted(wanted),
        )
    except Exception as exc:  # noqa: BLE001 — advisory only; never block startup
        log.warning("schema drift check skipped: %s", exc)
@asynccontextmanager
async def _lifespan(app: FastAPI):
    """Application lifespan hook: run the schema-drift check, then yield.

    ``app`` is accepted as part of the lifespan signature but is not used
    here. Nothing runs after the ``yield``, so there is no shutdown work.
    """
    # Startup phase.
    await _check_schema_drift()
    # Hand control back for the lifetime of the application.
    yield
def _register_error_handlers(app: FastAPI) -> None: def _register_error_handlers(app: FastAPI) -> None:
@app.exception_handler(NotFound) @app.exception_handler(NotFound)
async def _not_found(request: Request, exc: NotFound) -> JSONResponse: async def _not_found(request: Request, exc: NotFound) -> JSONResponse:
@@ -51,6 +80,7 @@ def create_app() -> FastAPI:
title=settings.app_name, title=settings.app_name,
version=settings.version, version=settings.version,
description="Provenance API — family and land provenance.", description="Provenance API — family and land provenance.",
lifespan=_lifespan,
) )
app.include_router(health_router) app.include_router(health_router)
app.include_router(api_router) app.include_router(api_router)
+42
View File
@@ -0,0 +1,42 @@
"""Schema-drift guard: the DB-vs-code head check behind /health/ready and the
startup log. Regression cover for the outage where the backend image shipped
ahead of an un-applied migration and every trees query 500'd."""
from sqlalchemy import text
from app.core.schema_version import db_heads, expected_heads, schema_is_current
def test_expected_heads_is_a_single_known_head():
    """The migration scripts expose exactly one head, a non-empty string."""
    found = expected_heads()
    # A linear migration history has exactly one head.
    assert len(found) == 1
    # Every head must be a real (non-empty, string) revision id.
    for revision in found:
        assert revision and isinstance(revision, str)
async def test_schema_is_current_detects_drift(db_session):
    """Walk ``schema_is_current`` through unmanaged, drifted and current states.

    The ``alembic_version`` table created here is dropped in a ``finally``
    block, so a failing assertion part-way through cannot leave a stamped
    table behind for other tests that share the database.
    """
    conn = await db_session.connection()
    try:
        # No alembic_version table -> not Alembic-managed -> treated as current.
        await conn.execute(text("DROP TABLE IF EXISTS alembic_version"))
        assert await db_heads(conn) is None
        ok, _, _ = await schema_is_current(conn)
        assert ok is True

        # Stamp an old/wrong revision -> drift detected.
        await conn.execute(
            text("CREATE TABLE alembic_version (version_num varchar(32) NOT NULL)")
        )
        await conn.execute(
            text("INSERT INTO alembic_version (version_num) VALUES ('0000deadbeef')")
        )
        ok, db, expected = await schema_is_current(conn)
        assert ok is False
        assert db == frozenset({"0000deadbeef"})

        # Stamp the code's real head -> current again.
        head = next(iter(expected))
        await conn.execute(text("DELETE FROM alembic_version"))
        await conn.execute(
            text("INSERT INTO alembic_version (version_num) VALUES (:h)"), {"h": head}
        )
        ok, _, _ = await schema_is_current(conn)
        assert ok is True
    finally:
        # Always clean up, including when an assertion above fails.
        await conn.execute(text("DROP TABLE IF EXISTS alembic_version"))