Guard against schema drift (readiness 503 + loud startup log) #239
@@ -12,6 +12,7 @@ from sqlalchemy import text
|
||||
|
||||
from app.core.config import get_settings
|
||||
from app.core.db import get_engine
|
||||
from app.core.schema_version import schema_is_current
|
||||
|
||||
router = APIRouter(tags=["health"])
|
||||
|
||||
@@ -33,9 +34,20 @@ async def ready(response: Response) -> dict:
|
||||
try:
|
||||
async with get_engine().connect() as conn:
|
||||
await conn.execute(text("SELECT 1"))
|
||||
checks["database"] = "ok"
|
||||
checks["database"] = "ok"
|
||||
# Schema drift = code ahead of the DB; queries would 500. Fail
|
||||
# readiness loudly rather than serve a broken surface.
|
||||
ok, db, expected = await schema_is_current(conn)
|
||||
if not ok:
|
||||
checks["schema"] = (
|
||||
f"drift: db={sorted(db) or ['none']} expected={sorted(expected)} "
|
||||
"— run 'alembic upgrade head'"
|
||||
)
|
||||
response.status_code = status.HTTP_503_SERVICE_UNAVAILABLE
|
||||
return {"status": "not ready", "checks": checks}
|
||||
checks["schema"] = "ok"
|
||||
return {"status": "ready", "checks": checks}
|
||||
except Exception as exc: # noqa: BLE001 — surface any failure as "not ready"
|
||||
checks["database"] = "error"
|
||||
checks.setdefault("database", "error")
|
||||
response.status_code = status.HTTP_503_SERVICE_UNAVAILABLE
|
||||
return {"status": "not ready", "checks": checks, "detail": str(exc)}
|
||||
|
||||
@@ -0,0 +1,59 @@
|
||||
"""Schema-drift detection — a safety net for the deploy pipeline.
|
||||
|
||||
If a deploy ships code whose models reference a column a migration hasn't added
|
||||
yet (the code is ahead of the DB), every query against that table 500s with an
|
||||
opaque ``UndefinedColumnError``. That is exactly the failure that took the tree
|
||||
list down once: the backend image advanced but ``alembic upgrade head`` hadn't
|
||||
run on the server.
|
||||
|
||||
The real prevention is auto-migrate on deploy (the entrypoint runs
|
||||
``alembic upgrade head`` when ``RUN_MIGRATIONS=1``). This module is defense in
|
||||
depth: it makes the drift *loud and explicit* — a readiness failure and a
|
||||
CRITICAL startup log — instead of a silent storm of 500s, so a half-applied
|
||||
deploy is obvious within seconds.
|
||||
"""
|
||||
|
||||
from functools import lru_cache
|
||||
from pathlib import Path
|
||||
|
||||
from sqlalchemy import text
|
||||
from sqlalchemy.ext.asyncio import AsyncConnection
|
||||
|
||||
# Location of the Alembic script directory, resolved relative to this file:
# app/core/schema_version.py -> backend/ (parents: core, app, backend)
|
||||
_MIGRATIONS_DIR = Path(__file__).resolve().parents[2] / "migrations"
|
||||
|
||||
|
||||
@lru_cache
def expected_heads() -> frozenset[str]:
    """Return the Alembic head revision id(s) of the bundled migration scripts.

    The scripts are static for a given build, so the answer is memoised
    after the first call.
    """
    # Alembic is imported inside the function rather than at module import time.
    from alembic.config import Config
    from alembic.script import ScriptDirectory

    alembic_config = Config()
    alembic_config.set_main_option("script_location", str(_MIGRATIONS_DIR))
    scripts = ScriptDirectory.from_config(alembic_config)
    heads = scripts.get_heads()
    return frozenset(heads)
|
||||
|
||||
|
||||
async def db_heads(conn: AsyncConnection) -> frozenset[str] | None:
    """Return the revision id(s) stored in the database's ``alembic_version`` table.

    Returns ``None`` when that table does not exist, i.e. the database is not
    Alembic-managed (for example a test DB built straight from ``create_all``).
    The existence probe uses ``to_regclass``, which yields NULL for a missing
    table instead of raising, so the caller's transaction is never poisoned.
    """
    version_table = await conn.scalar(
        text("SELECT to_regclass('public.alembic_version')")
    )
    if version_table is None:
        return None

    rows = await conn.execute(text("SELECT version_num FROM alembic_version"))
    stamped = [row[0] for row in rows]
    return frozenset(stamped)
|
||||
|
||||
|
||||
async def schema_is_current(
    conn: AsyncConnection,
) -> tuple[bool, frozenset[str], frozenset[str]]:
    """Compare the database's stamped revision(s) with the code's head(s).

    Returns ``(ok, db, expected)``:

    * ``ok`` — True when the DB revisions equal the code's head(s).
    * ``db`` — the revisions read from the database (empty when unmanaged).
    * ``expected`` — the head(s) from this build's migration scripts.

    A database without an ``alembic_version`` table is not Alembic-managed,
    so there is nothing to compare; it is reported as current, which keeps
    the check quiet in tests.
    """
    code_heads = expected_heads()
    stamped = await db_heads(conn)
    if stamped is not None:
        return stamped == code_heads, stamped, code_heads
    # Unmanaged database: report "current" with an empty DB revision set.
    return True, frozenset(), code_heads
|
||||
@@ -7,6 +7,7 @@ engine is the single enforcement point for reads.
|
||||
|
||||
import logging
|
||||
import sys
|
||||
from contextlib import asynccontextmanager
|
||||
|
||||
from fastapi import FastAPI, Request
|
||||
from fastapi.responses import JSONResponse
|
||||
@@ -14,6 +15,8 @@ from fastapi.responses import JSONResponse
|
||||
from app.api.health import router as health_router
|
||||
from app.api.v1 import api_router
|
||||
from app.core.config import get_settings
|
||||
from app.core.db import get_engine
|
||||
from app.core.schema_version import schema_is_current
|
||||
from app.services.exceptions import Conflict, Forbidden, NotFound
|
||||
|
||||
|
||||
@@ -30,6 +33,32 @@ def _configure_logging() -> None:
|
||||
app_logger.propagate = False
|
||||
|
||||
|
||||
async def _check_schema_drift() -> None:
    """Log a CRITICAL message at startup if the DB revision differs from the code's.

    The entrypoint runs migrations when RUN_MIGRATIONS=1; this covers the case
    where that did not happen, so a half-applied deploy shows up in the logs
    rather than as a silent storm of 500s. The check is advisory only: any
    failure while performing it is logged as a warning and startup continues.
    """
    log = logging.getLogger("provenance")
    try:
        async with get_engine().connect() as connection:
            in_sync, db_revisions, code_revisions = await schema_is_current(connection)
        if in_sync:
            return
        # An existing but empty version table is reported as "none".
        stamped = sorted(db_revisions) or ["none"]
        log.critical(
            "SCHEMA DRIFT: database is at %s but this build expects %s. "
            "Run 'alembic upgrade head' — queries will fail until migrated.",
            stamped,
            sorted(code_revisions),
        )
    except Exception as err:  # noqa: BLE001 — advisory only; never block startup
        log.warning("schema drift check skipped: %s", err)
|
||||
|
||||
|
||||
@asynccontextmanager
async def _lifespan(app: FastAPI):
    """Lifespan context for the app: run the schema-drift check, then yield.

    Nothing follows the ``yield``, so there is no teardown step.
    """
    # Startup: advisory schema check (logs only).
    await _check_schema_drift()
    # Hand control back to the framework for the lifetime of the application.
    yield
|
||||
|
||||
|
||||
def _register_error_handlers(app: FastAPI) -> None:
|
||||
@app.exception_handler(NotFound)
|
||||
async def _not_found(request: Request, exc: NotFound) -> JSONResponse:
|
||||
@@ -51,6 +80,7 @@ def create_app() -> FastAPI:
|
||||
title=settings.app_name,
|
||||
version=settings.version,
|
||||
description="Provenance API — family and land provenance.",
|
||||
lifespan=_lifespan,
|
||||
)
|
||||
app.include_router(health_router)
|
||||
app.include_router(api_router)
|
||||
|
||||
@@ -0,0 +1,42 @@
|
||||
"""Schema-drift guard: the DB-vs-code head check behind /health/ready and the
|
||||
startup log. Regression cover for the outage where the backend image shipped
|
||||
ahead of an un-applied migration and every trees query 500'd."""
|
||||
|
||||
from sqlalchemy import text
|
||||
|
||||
from app.core.schema_version import db_heads, expected_heads, schema_is_current
|
||||
|
||||
|
||||
def test_expected_heads_is_a_single_known_head():
    """The bundled migrations form a linear history with exactly one head."""
    revisions = expected_heads()
    # Linear migration history → exactly one head.
    assert len(revisions) == 1
    # ...and every head is a non-empty string revision id.
    for revision in revisions:
        assert isinstance(revision, str) and revision
|
||||
|
||||
|
||||
async def test_schema_is_current_detects_drift(db_session):
    """Walk the drift check through unmanaged → drifted → current states."""
    connection = await db_session.connection()

    # Phase 1: no alembic_version table. The test DB is built from create_all,
    # so it is not Alembic-managed and the check treats it as current.
    await connection.execute(text("DROP TABLE IF EXISTS alembic_version"))
    assert await db_heads(connection) is None
    in_sync, _, _ = await schema_is_current(connection)
    assert in_sync is True

    # Phase 2: stamp a revision the code does not know → drift is reported.
    await connection.execute(
        text("CREATE TABLE alembic_version (version_num varchar(32) NOT NULL)")
    )
    await connection.execute(
        text("INSERT INTO alembic_version (version_num) VALUES ('0000deadbeef')")
    )
    in_sync, stamped, code_heads = await schema_is_current(connection)
    assert in_sync is False
    assert stamped == frozenset({"0000deadbeef"})

    # Phase 3: re-stamp with the code's real head → reported as current again.
    real_head = next(iter(code_heads))
    await connection.execute(text("DELETE FROM alembic_version"))
    await connection.execute(
        text("INSERT INTO alembic_version (version_num) VALUES (:h)"),
        {"h": real_head},
    )
    in_sync, _, _ = await schema_is_current(connection)
    assert in_sync is True

    # Cleanup: leave no alembic_version table behind for other tests.
    await connection.execute(text("DROP TABLE IF EXISTS alembic_version"))
|
||||
Reference in New Issue
Block a user