"""Per-call usage telemetry — JSONL with daily rotation and retention. Reusable as-is across products. Drop the import + `with TimedCall(...)` into any tool body and the call gets logged with the tool name, args, elapsed time, and any extra fields the tool sets via `_call.set(...)`. The log file is `var/logs/usage.jsonl` by default (override with the `USAGE_LOG_DIR` env). Daily rotation; files older than `USAGE_LOG_KEEP_DAYS` (default 90) are deleted on next write. Layout of one record: { "ts": "2026-05-22T13:14:15+00:00", "tool": "search_docs", "args": {"query": "...", "version": "10.9", "k": 10}, "elapsed_ms": 142.5, "hits_returned": 7, # optional, set by the tool "reranked": true, # optional, set by the tool // ... any other key the tool sets via _call.set(...) } """ from __future__ import annotations import json import os import time import threading from datetime import datetime, timedelta, timezone from pathlib import Path from typing import Any USAGE_LOG_DIR = Path(os.environ.get("USAGE_LOG_DIR", "var/logs")) USAGE_LOG_KEEP_DAYS = int(os.environ.get("USAGE_LOG_KEEP_DAYS", "90")) # Single global lock to serialize writes from multiple request handlers. # JSONL appends are atomic at the OS level for short records on most # filesystems, but the lock is cheap and saves you from cross-platform # surprises. _lock = threading.Lock() _last_rotation_check: float = 0.0 def _maybe_rotate() -> None: """Move usage.jsonl → usage.jsonl. if the date has rolled. Cheap to call; we only do filesystem work when a day has actually passed since the last check. """ global _last_rotation_check now = time.time() if now - _last_rotation_check < 300: # 5 min cap between checks return _last_rotation_check = now USAGE_LOG_DIR.mkdir(parents=True, exist_ok=True) active = USAGE_LOG_DIR / "usage.jsonl" if active.exists(): try: mtime = datetime.fromtimestamp(active.stat().st_mtime, tz=timezone.utc).date() today = datetime.now(timezone.utc).date() if mtime < today: rotated = USAGE_LOG_DIR / f"usage.jsonl.{mtime.isoformat()}" if not rotated.exists(): active.rename(rotated) except OSError: pass # Retention: delete usage.jsonl.YYYY-MM-DD files older than the # retention window. The active file is never deleted by this. cutoff = datetime.now(timezone.utc).date() - timedelta(days=USAGE_LOG_KEEP_DAYS) for f in USAGE_LOG_DIR.glob("usage.jsonl.*"): try: datestamp = f.name.split(".", 2)[-1] if datetime.fromisoformat(datestamp).date() < cutoff: f.unlink() except (ValueError, OSError): continue class TimedCall: """Context manager that captures one tool call's telemetry record. Usage: with TimedCall("search_docs", {"query": q, ...}) as call: ... do the work ... call.set(hits_returned=len(results), reranked=True) On exit, writes one JSONL record to usage.jsonl. Exceptions are captured into the `error` field; the exception is re-raised so the tool's caller sees the failure. """ def __init__(self, tool: str, args: dict[str, Any]): self.tool = tool self.args = args self.extra: dict[str, Any] = {} self._t0: float = 0.0 def set(self, **kwargs: Any) -> None: """Attach extra fields to the eventual telemetry record.""" self.extra.update(kwargs) def __enter__(self) -> "TimedCall": self._t0 = time.perf_counter() return self def __exit__(self, exc_type, exc_val, exc_tb) -> None: elapsed_ms = (time.perf_counter() - self._t0) * 1000.0 record: dict[str, Any] = { "ts": datetime.now(timezone.utc).isoformat(), "tool": self.tool, "args": self.args, "elapsed_ms": round(elapsed_ms, 2), } if exc_type is not None: record["error"] = f"{exc_type.__name__}: {exc_val}" record.update(self.extra) _maybe_rotate() with _lock: USAGE_LOG_DIR.mkdir(parents=True, exist_ok=True) with open(USAGE_LOG_DIR / "usage.jsonl", "a") as fh: fh.write(json.dumps(record, separators=(",", ":")) + "\n") # Don't swallow the exception — the caller still needs to see it.