From 796bafac633cbd46037db56e5e909e0553a60152 Mon Sep 17 00:00:00 2001 From: Justin Date: Sun, 12 Apr 2026 19:35:31 -0400 Subject: [PATCH] feat: add zroc-planner Python vCenter collector Co-Authored-By: Claude Sonnet 4.6 --- zroc-planner/Dockerfile | 50 ++++ zroc-planner/README.md | 187 +++++++++++++++ zroc-planner/collector.py | 428 ++++++++++++++++++++++++++++++++++ zroc-planner/config.py | 79 +++++++ zroc-planner/requirements.txt | 6 + zroc-planner/server.py | 214 +++++++++++++++++ 6 files changed, 964 insertions(+) create mode 100644 zroc-planner/Dockerfile create mode 100644 zroc-planner/README.md create mode 100644 zroc-planner/collector.py create mode 100644 zroc-planner/config.py create mode 100644 zroc-planner/requirements.txt create mode 100644 zroc-planner/server.py diff --git a/zroc-planner/Dockerfile b/zroc-planner/Dockerfile new file mode 100644 index 0000000..ae16056 --- /dev/null +++ b/zroc-planner/Dockerfile @@ -0,0 +1,50 @@ +# ── Stage 1: dependency builder ─────────────────────────────────────────────── +FROM python:3.12-slim AS builder + +WORKDIR /build + +# Install build tools needed to compile pyvmomi's C extension if required +RUN apt-get update -qq && apt-get install -y --no-install-recommends \ + gcc \ + && rm -rf /var/lib/apt/lists/* + +COPY requirements.txt . 
+RUN pip install --no-cache-dir --prefix=/install -r requirements.txt + + +# ── Stage 2: runtime image ──────────────────────────────────────────────────── +FROM python:3.12-slim + +# Non-root user for security +RUN groupadd -r collector && useradd -r -g collector collector + +WORKDIR /app + +# Copy installed packages from builder +COPY --from=builder /install /usr/local + +# Copy application source +COPY config.py collector.py server.py ./ + +USER collector + +# Metrics port +EXPOSE 9272 + +# Health check — Docker will poll this every 30s +HEALTHCHECK --interval=30s --timeout=10s --start-period=60s --retries=3 \ + CMD python - <<'EOF' +import urllib.request, sys +try: + r = urllib.request.urlopen("http://localhost:9272/health", timeout=8) + import json + d = json.load(r) + sys.exit(0 if d.get("status") == "ok" else 1) +except Exception: + sys.exit(1) +EOF + +ENV PYTHONUNBUFFERED=1 \ + PYTHONDONTWRITEBYTECODE=1 + +ENTRYPOINT ["python", "server.py"] diff --git a/zroc-planner/README.md b/zroc-planner/README.md new file mode 100644 index 0000000..cc30cb8 --- /dev/null +++ b/zroc-planner/README.md @@ -0,0 +1,187 @@ +# zROC Planner — vCenter Metrics Collector + +A Python-based Prometheus exporter that replaces zPlanner's PowerCLI scripts. +It queries vCenter for per-VM virtual-disk I/O statistics using the pyvmomi SDK +and exposes them on a `/metrics` endpoint for Prometheus to scrape. 
+ +## Metrics exposed + +| Prometheus metric | Unit | Description | +|---|---|---| +| `vcenter_vm_disk_write_iops` | IOPS | Write IOPS (sum across all disk instances) | +| `vcenter_vm_disk_write_throughput_mbps` | MB/s | Write throughput (sum across all disk instances) | +| `vcenter_vm_disk_write_latency_ms` | ms | Write latency (mean across all disk instances) | + +Every metric carries these labels: + +| Label | Example | Notes | +|---|---|---| +| `vm_name` | `web-prod-01` | VM display name | +| `vm_moref` | `vm-1234` | vCenter Managed Object Reference (stable ID) | +| `cluster` | `Cluster-01` | Compute cluster name | +| `host` | `esxi-01.corp` | ESXi host the VM is running on | +| `datacenter` | `DC-East` | vCenter datacenter name | + +### Self-monitoring metrics + +| Metric | Description | +|---|---| +| `vcenter_collector_last_collection_timestamp_seconds` | Unix timestamp of the last successful poll | +| `vcenter_collector_last_collection_duration_seconds` | How long the last poll took | +| `vcenter_collector_last_vm_count` | VMs collected in the last cycle | +| `vcenter_collector_cycles_total` | Running count of completed cycles | + +## Configuration + +All settings are environment variables. 
+ +| Variable | Default | Description | +|---|---|---| +| `VCENTER_HOST` | `vcenter.local` | vCenter hostname or IP | +| `VCENTER_USER` | `administrator@vsphere.local` | vCenter username (read-only is sufficient) | +| `VCENTER_PASSWORD` | _(required)_ | vCenter password | +| `VCENTER_PORT` | `443` | vCenter HTTPS port | +| `VCENTER_SSL_VERIFY` | `false` | Set `true` to enforce TLS certificate validation | +| `POLL_INTERVAL` | `300` | Seconds between collection cycles | +| `BATCH_SIZE` | `100` | VMs per QueryPerf call (VMware recommends ≤ 200) | +| `BATCH_DELAY` | `0.5` | Seconds to sleep between batches | +| `VM_INVENTORY_TTL` | `600` | Seconds between VM inventory refreshes | +| `PERF_INTERVAL_ID` | `300` | vCenter rollup interval (300 = 5-minute stats) | +| `HTTP_HOST` | `0.0.0.0` | IP the HTTP server binds to | +| `HTTP_PORT` | `9272` | Port for `/metrics` and `/health` | +| `LOG_LEVEL` | `INFO` | Python log level (`DEBUG`, `INFO`, `WARNING`, `ERROR`) | + +## Running + +### Docker (recommended) + +```bash +docker build -t zroc-planner:latest . 
+ +docker run -d \ + --name zroc-planner \ + -p 9272:9272 \ + -e VCENTER_HOST=vcenter.corp.example \ + -e VCENTER_USER=svc-readonly@vsphere.local \ + -e VCENTER_PASSWORD=supersecret \ + -e POLL_INTERVAL=300 \ + -e BATCH_SIZE=100 \ + zroc-planner:latest +``` + +### Docker Compose + +Add to your existing `docker-compose.yaml`: + +```yaml + zroc-planner: + build: ./zroc-planner + container_name: zroc-planner + hostname: zroc-planner + ports: + - "9272:9272" + environment: + - VCENTER_HOST=vcenter.corp.example + - VCENTER_USER=svc-readonly@vsphere.local + - VCENTER_PASSWORD=supersecret + - POLL_INTERVAL=300 + - BATCH_SIZE=100 + - BATCH_DELAY=0.5 + - VM_INVENTORY_TTL=600 + - LOG_LEVEL=INFO + networks: + - back-tier + restart: always +``` + +Then add a scrape job to `prometheus/prometheus.yml`: + +```yaml +scrape_configs: + - job_name: vcenter_planner + scrape_interval: 300s + scrape_timeout: 30s + static_configs: + - targets: ['zroc-planner:9272'] +``` + +### Local (dev) + +```bash +python -m venv venv +source venv/bin/activate +pip install -r requirements.txt + +export VCENTER_HOST=vcenter.local +export VCENTER_USER=administrator@vsphere.local +export VCENTER_PASSWORD=password + +python server.py +``` + +## Endpoints + +| Path | Description | +|---|---| +| `GET /metrics` | Prometheus text exposition | +| `GET /health` | JSON health check (200 = healthy, 503 = degraded) | + +### Health check response + +```json +{ + "status": "ok", + "last_collection_time": 1712345678.0, + "last_collection_duration_seconds": 4.23, + "last_vm_count": 3412, + "last_error": null, + "collection_cycles": 42, + "stale": false +} +``` + +## Scaling notes + +| Environment size | Recommended settings | +|---|---| +| ≤ 500 VMs | `BATCH_SIZE=100`, `BATCH_DELAY=0.5` | +| 500–5 000 VMs | `BATCH_SIZE=150`, `BATCH_DELAY=0.5` | +| 5 000–15 000 VMs | `BATCH_SIZE=200`, `BATCH_DELAY=1.0` | + +Collection of 15 000 VMs at `BATCH_SIZE=200` with `BATCH_DELAY=1.0` takes +roughly 75 batches × ~1 s each = **~75 
seconds**, comfortably within the +300-second poll window. + +## vCenter permissions + +The service account only needs the read-only role on the vCenter root: + +- `System.View` +- `System.Read` (both are part of the built-in Read-Only role; no `Performance.*` privilege is needed to query counters) + +A standard **Read-Only** vCenter role is sufficient. + +## Architecture + +``` +┌────────────────────────────────────────────────────────────┐ +│ server.py (main thread) │ +│ HTTPServer /metrics /health │ +└────────────────────────┬───────────────────────────────────┘ + │ reads from +┌────────────────────────▼───────────────────────────────────┐ +│ collector.MetricStore (thread-safe dict) │ +└────────────────────────▲───────────────────────────────────┘ + │ writes to +┌────────────────────────┴───────────────────────────────────┐ +│ collector.VCenterCollector (daemon thread) │ +│ ┌──────────────────────────────────────────────────┐ │ +│ │ every POLL_INTERVAL seconds: │ │ +│ │ 1. ensure vCenter session alive │ │ +│ │ 2. refresh VM inventory (if TTL expired) │ │ +│ │ 3. for each batch of BATCH_SIZE VMs: │ │ +│ │ QueryPerf → parse → MetricStore.update │ │ +│ │ sleep(BATCH_DELAY) │ │ +│ └──────────────────────────────────────────────────┘ │ +└────────────────────────────────────────────────────────────┘ +``` diff --git a/zroc-planner/collector.py b/zroc-planner/collector.py new file mode 100644 index 0000000..4672362 --- /dev/null +++ b/zroc-planner/collector.py @@ -0,0 +1,428 @@ +""" +vCenter metrics collector for zROC Planner. + +Collects virtual-disk I/O statistics from vCenter using pyvmomi's +QueryPerf API and publishes them as Prometheus gauges. + +Design highlights +----------------- +* VM inventory is cached and refreshed every VM_INVENTORY_TTL seconds, + so normal polling never touches the inventory API. +* Performance counter IDs are looked up once and cached for the life of + the session. +* VMs are queried in batches (BATCH_SIZE) with a short sleep between + batches so vCenter is never flooded. 
class MetricStore:
    """Hold the most recent labels and metric values for each VM.

    The per-VM mapping is only touched while ``_lock`` is held.  The public
    ``last_*`` / ``collection_cycles`` attributes are plain fields that are
    assigned directly, without taking the lock.
    """

    def __init__(self) -> None:
        # Summary of the most recent collection cycle.
        self.collection_cycles: int = 0
        self.last_error: Optional[str] = None
        self.last_vm_count: int = 0
        self.last_collection_duration: float = 0.0
        self.last_collection_time: Optional[float] = None
        # vm_moref -> {"labels": {...}, "metrics": {...}}
        self._data: dict[str, dict] = {}
        self._lock = threading.Lock()

    def update(self, vm_moref: str, labels: dict, metrics: dict) -> None:
        """Replace whatever is stored for ``vm_moref`` with a new entry."""
        entry = {"labels": labels, "metrics": metrics}
        with self._lock:
            self._data[vm_moref] = entry

    def clear(self) -> None:
        """Forget every stored VM entry."""
        with self._lock:
            self._data.clear()

    def snapshot(self) -> dict:
        """Return a shallow copy of the moref -> entry mapping."""
        with self._lock:
            return self._data.copy()

    def remove_stale(self, active_morefs: set[str]) -> None:
        """Drop every entry whose moref is not in ``active_morefs``."""
        with self._lock:
            # The key difference is materialised as a new set, so popping
            # while looping over it is safe.
            for moref in self._data.keys() - active_morefs:
                self._data.pop(moref)
def _disconnect(si: Optional[vim.ServiceInstance]) -> None:
    """Best-effort logout of a vCenter session.

    Args:
        si: The service instance to close, or ``None`` (in which case nothing
            happens).

    Never raises: callers use this while already handling a failure, so an
    error from ``Disconnect`` is logged at DEBUG level and then dropped.
    """
    if si is None:
        return
    try:
        Disconnect(si)
    except Exception as exc:
        # Still swallowed on purpose, but no longer invisible when debugging.
        log.debug("Ignoring error while disconnecting from vCenter: %s", exc)
def _build_counter_map(si: vim.ServiceInstance) -> dict[tuple, int]:
    """Index vCenter's performance counters by ``(group, name, rollup)``.

    Args:
        si: Service instance whose ``perfManager.perfCounter`` list is read.

    Returns:
        A dict mapping ``(groupInfo.key, nameInfo.key, rollupType)`` to the
        counter's integer ``key``.  If two counters share a triple, the later
        one in the list wins.
    """
    perf_manager = si.RetrieveContent().perfManager
    counter_ids: dict[tuple, int] = {
        (info.groupInfo.key, info.nameInfo.key, info.rollupType): info.key
        for info in perf_manager.perfCounter
    }
    log.debug("Loaded %d performance counter definitions", len(counter_ids))
    return counter_ids
def _make_perf_query_spec(
    vm: vim.VirtualMachine,
    counter_ids: list[int],
    interval_id: int,
) -> vim.PerformanceManager.QuerySpec:
    """Build the QueryPerf spec for one VM.

    Args:
        vm: Entity the spec targets.
        counter_ids: Integer counter ids to request; each one is requested
            with ``instance="*"``.
        interval_id: Value passed through as the spec's ``intervalId``.

    Returns:
        A ``QuerySpec`` limited to a single sample (``maxSample=1``), since
        only the most recent value is needed.
    """
    perf = vim.PerformanceManager
    metric_ids = []
    for counter_id in counter_ids:
        metric_ids.append(perf.MetricId(counterId=counter_id, instance="*"))

    spec = perf.QuerySpec(
        entity=vm,
        metricId=metric_ids,
        intervalId=interval_id,
        maxSample=1,
    )
    return spec
def _aggregate_disk_metrics(entity_metric, id_to_label: dict[int, str]) -> dict[str, float]:
    """Fold one entity's per-instance series into one value per metric label.

    Latency is averaged over the instances that reported a value, throughput
    is summed and divided by 1024 (KB/s -> MB/s), everything else is summed.
    Series with an unknown counter id, no samples, or a negative latest sample
    are ignored.  Returns an empty dict when nothing usable was found.
    """
    sums: dict[str, float] = {}
    counts: dict[str, int] = {}

    # ``value`` is treated as optional so an entity without any series is
    # skipped instead of raising and failing the whole batch.
    for series in entity_metric.value or ():
        label = id_to_label.get(series.id.counterId)
        if label is None or not series.value:
            continue

        latest = series.value[-1]  # most recent sample
        if latest < 0:  # vCenter uses -1 for "no data"
            continue

        sums[label] = sums.get(label, 0.0) + latest
        counts[label] = counts.get(label, 0) + 1

    metrics: dict[str, float] = {}
    for label, total in sums.items():
        if label == "disk_write_latency":
            # average latency across disk instances
            metrics[label] = total / counts[label]
        elif label == "disk_write_throughput":
            # vCenter returns KB/s; convert to MB/s
            metrics[label] = total / 1024.0
        else:
            metrics[label] = total
    return metrics


def _parse_results(
    results: list,
    label_cache: dict[str, dict],
    label_to_counter_id: dict[str, int],
    metric_store: Optional["MetricStore"] = None,
) -> None:
    """Parse EntityMetric results and publish them to a metric store.

    Args:
        results: EntityMetric-like objects exposing ``entity._moId`` and
            ``value`` (a list of series with ``id.counterId`` and ``value``).
        label_cache: moref -> label dict; a VM missing from the cache gets
            empty labels apart from ``vm_moref``.
        label_to_counter_id: Metric label -> integer counter id.
        metric_store: Object with an ``update(moref, labels, metrics)`` method.
            Defaults to the module-level ``store``.

    Entities that yield no usable metric are skipped, leaving any previously
    stored entry for them untouched.
    """
    sink = store if metric_store is None else metric_store
    # Build reverse map: counter_id -> label
    id_to_label = {cid: label for label, cid in label_to_counter_id.items()}

    for entity_metric in results:
        moref = entity_metric.entity._moId
        metrics = _aggregate_disk_metrics(entity_metric, id_to_label)
        if not metrics:
            continue

        labels = label_cache.get(moref)
        if labels is None:
            labels = {"vm_name": "", "vm_moref": moref,
                      "cluster": "", "host": "", "datacenter": ""}
        sink.update(moref, labels, metrics)
class VCenterCollector:
    """Poll vCenter on a background thread and publish results to ``store``.

    Each cycle (``_collect_once``) checks the session, refreshes the cached
    list of powered-on VMs once ``config.VM_INVENTORY_TTL`` seconds have
    passed, then runs QueryPerf over that list in batches.  Any failure tears
    the session down so the next cycle reconnects and re-reads the inventory.
    """

    # Used when config.BATCH_SIZE is zero or negative; range() would otherwise
    # raise (step 0) or yield nothing (negative step).
    _FALLBACK_BATCH_SIZE = 100

    # "Never refreshed" marker for the inventory timestamp.  time.monotonic()
    # has an undefined reference point and can be smaller than the TTL (e.g.
    # shortly after boot), so 0.0 is not a safe sentinel; -inf always compares
    # as expired.
    _NEVER = float("-inf")

    def __init__(self) -> None:
        self._si: Optional[vim.ServiceInstance] = None
        self._pm: Optional[vim.PerformanceManager] = None
        self._counter_ids: dict[str, int] = {}      # label -> int counter id
        self._vm_cache: list[vim.VirtualMachine] = []
        self._label_cache: dict[str, dict] = {}     # moref -> labels dict
        self._vm_cache_ts: float = self._NEVER
        self._stop_event = threading.Event()

    # ── public API ────────────────────────────────────────────────────────────

    def start(self) -> None:
        """Start the polling loop on a daemon thread named ``collector``."""
        t = threading.Thread(target=self._run_loop, name="collector", daemon=True)
        t.start()
        log.info("Collector thread started")

    def stop(self) -> None:
        """Signal the loop to exit; it is checked between batches and polls."""
        self._stop_event.set()

    # ── internal ──────────────────────────────────────────────────────────────

    @staticmethod
    def _normalize_batch_size(size: int) -> int:
        """Return ``size`` if it is at least 1, else the fallback batch size."""
        return size if size >= 1 else VCenterCollector._FALLBACK_BATCH_SIZE

    def _reset_session(self) -> None:
        """Drop the session and everything derived from it."""
        _disconnect(self._si)
        self._si = None
        self._pm = None
        self._counter_ids = {}
        # Force inventory refresh after reconnect
        self._vm_cache_ts = self._NEVER

    def _ensure_connected(self) -> None:
        """Reuse a live session or open a new one and resolve counter ids."""
        if self._si is not None:
            # Ping the session to detect stale connections
            try:
                self._si.CurrentTime()
                return
            except Exception as exc:
                log.warning("vCenter session appears stale (%s), reconnecting…", exc)
                self._reset_session()

        self._si = _connect()
        content = self._si.RetrieveContent()
        self._pm = content.perfManager
        counter_map = _build_counter_map(self._si)
        self._counter_ids = _resolve_counter_ids(counter_map)
        log.info("Counter IDs resolved: %s", self._counter_ids)

    def _refresh_vm_inventory(self) -> None:
        """Reload the powered-on VM list and labels if the TTL has expired."""
        now = time.monotonic()
        if now - self._vm_cache_ts < config.VM_INVENTORY_TTL:
            return

        log.info("Refreshing VM inventory…")
        vms = _get_all_vms(self._si)
        self._vm_cache = [
            vm for vm in vms
            if vm.runtime and vm.runtime.powerState == vim.VirtualMachinePowerState.poweredOn
        ]
        self._label_cache = {vm._moId: _get_vm_labels(vm) for vm in self._vm_cache}
        self._vm_cache_ts = now
        log.info("VM inventory: %d powered-on VMs", len(self._vm_cache))

        # Remove metrics for VMs no longer in inventory
        store.remove_stale(set(self._label_cache.keys()))

    def _collect_once(self) -> None:
        """Run one collection cycle and record its summary on ``store``.

        Returns early, without touching the summary fields, when there are no
        powered-on VMs or no counter ids.  A failing batch ends the cycle and
        resets the session.
        """
        t0 = time.monotonic()
        self._ensure_connected()
        self._refresh_vm_inventory()

        if not self._vm_cache:
            log.warning("No powered-on VMs found; skipping QueryPerf")
            return

        if not self._counter_ids:
            log.error("No performance counter IDs resolved; cannot collect metrics")
            return

        batch_size = self._normalize_batch_size(config.BATCH_SIZE)
        if batch_size != config.BATCH_SIZE:
            log.warning("Invalid BATCH_SIZE=%s; using %d instead",
                        config.BATCH_SIZE, batch_size)

        counter_id_list = list(self._counter_ids.values())
        total_vms = len(self._vm_cache)
        collected = 0
        errors = 0

        log.info("Starting QueryPerf collection for %d VMs in batches of %d",
                 total_vms, batch_size)

        for batch_start in range(0, total_vms, batch_size):
            if self._stop_event.is_set():
                break

            batch = self._vm_cache[batch_start: batch_start + batch_size]
            try:
                results = _query_batch(
                    self._pm, batch, counter_id_list, config.PERF_INTERVAL_ID
                )
                _parse_results(results, self._label_cache, self._counter_ids)
                collected += len(batch)
            except Exception as exc:
                errors += len(batch)
                log.error("Batch %d–%d failed: %s",
                          batch_start, batch_start + len(batch), exc)
                # Assume session is broken; force reconnect next cycle
                self._reset_session()
                break

            if batch_start + batch_size < total_vms:
                # Pause between batches, but wake immediately on stop() so
                # shutdown is not delayed by BATCH_DELAY.
                if self._stop_event.wait(timeout=config.BATCH_DELAY):
                    break

        elapsed = time.monotonic() - t0
        store.last_collection_time = time.time()
        store.last_collection_duration = elapsed
        store.last_vm_count = collected
        store.last_error = None if errors == 0 else f"{errors} VMs failed"
        store.collection_cycles += 1

        log.info(
            "Collection complete: %d/%d VMs, %.1fs elapsed%s",
            collected, total_vms, elapsed,
            f", {errors} errors" if errors else "",
        )

    def _run_loop(self) -> None:
        """Thread body: collect, then wait POLL_INTERVAL, until stopped."""
        while not self._stop_event.is_set():
            try:
                self._collect_once()
            except Exception as exc:
                store.last_error = str(exc)
                log.exception("Unhandled error in collection cycle: %s", exc)
                self._reset_session()

            # Sleep until next poll, but wake up early if stopped
            self._stop_event.wait(timeout=config.POLL_INTERVAL)

        log.info("Collector thread exiting")
        _disconnect(self._si)
def configure_logging() -> None:
    """Configure the root logger from ``LOG_LEVEL``.

    ``LOG_LEVEL`` is looked up as an attribute of the ``logging`` module.  The
    result is used only if it is an integer level; an unknown name, or a name
    that resolves to some other attribute (for example ``BASIC_FORMAT``, a
    string), falls back to ``logging.INFO`` instead of making ``basicConfig``
    raise.
    """
    level = getattr(logging, LOG_LEVEL.strip(), None)
    if not isinstance(level, int):
        level = logging.INFO
    logging.basicConfig(
        format="%(asctime)s %(levelname)-8s %(name)s: %(message)s",
        datefmt="%Y-%m-%dT%H:%M:%S",
        level=level,
    )
def _build_prometheus_output() -> str:
    """Render the metric store in Prometheus text exposition format.

    Returns:
        The full response body: one group per VM metric family (``# HELP``,
        ``# TYPE``, then every sample of that family), followed by the
        collector's self-monitoring metrics, terminated by a newline.
    """
    snapshot = col.store.snapshot()

    # The exposition format requires all samples of a metric family to form
    # one contiguous group after its HELP/TYPE lines.  Emitting VM by VM would
    # interleave the families, so bucket the samples per family first.
    samples_by_family: dict[str, list[str]] = {}
    for entry in snapshot.values():
        label_str = _render_labels(entry["labels"])
        for col_key, value in entry["metrics"].items():
            prom_name = _METRIC_NAME_MAP.get(col_key)
            if prom_name is None:
                continue
            samples_by_family.setdefault(prom_name, []).append(
                f"{prom_name}{label_str} {value:.4f}"
            )

    lines: list[str] = []
    for prom_name, samples in samples_by_family.items():
        mtype, mhelp = _METRIC_HELP[prom_name]
        lines.append(f"# HELP {prom_name} {mhelp}")
        lines.append(f"# TYPE {prom_name} {mtype}")
        lines.extend(samples)

    # Collector self-metrics
    lines += [
        "# HELP vcenter_collector_last_collection_timestamp_seconds Unix timestamp of the last completed collection cycle",
        "# TYPE vcenter_collector_last_collection_timestamp_seconds gauge",
    ]
    ts = col.store.last_collection_time
    # ``ts`` is None until the first cycle completes; report 0 in that case.
    lines.append(f"vcenter_collector_last_collection_timestamp_seconds {ts or 0:.0f}")

    lines += [
        "# HELP vcenter_collector_last_collection_duration_seconds Duration of the last collection cycle in seconds",
        "# TYPE vcenter_collector_last_collection_duration_seconds gauge",
    ]
    lines.append(f"vcenter_collector_last_collection_duration_seconds {col.store.last_collection_duration:.3f}")

    lines += [
        "# HELP vcenter_collector_last_vm_count Number of VMs collected in the last cycle",
        "# TYPE vcenter_collector_last_vm_count gauge",
    ]
    lines.append(f"vcenter_collector_last_vm_count {col.store.last_vm_count}")

    lines += [
        "# HELP vcenter_collector_cycles_total Total number of completed collection cycles",
        "# TYPE vcenter_collector_cycles_total counter",
    ]
    lines.append(f"vcenter_collector_cycles_total {col.store.collection_cycles}")

    return "\n".join(lines) + "\n"
def main() -> None:
    """Entry point: start the collector thread and serve HTTP until stopped.

    SIGTERM and SIGINT stop the collector and exit with status 0; the
    ``finally`` block closes the listening socket on the way out.
    """
    # Imported here so this function does not depend on a change to the
    # module's import block.
    from http.server import ThreadingHTTPServer

    config.configure_logging()
    log.info("zROC Planner vCenter Collector starting")
    log.info(
        "Config: vCenter=%s port=%s poll_interval=%ss batch_size=%d",
        config.VCENTER_HOST, config.VCENTER_PORT,
        config.POLL_INTERVAL, config.BATCH_SIZE,
    )
    if not config.VCENTER_PASSWORD:
        # Documented as required; warn instead of exiting so the existing
        # retry-every-cycle behaviour is preserved.
        log.warning("VCENTER_PASSWORD is empty; vCenter logins will most likely fail")

    # Bind before starting the collector so a port conflict fails fast, before
    # any vCenter session is opened.  A threaded server keeps /health
    # responsive while another client is slow or idle on its connection.
    httpd = ThreadingHTTPServer((config.HTTP_HOST, config.HTTP_PORT), _Handler)

    # Start the background collection loop
    c = col.VCenterCollector()
    c.start()

    # Graceful shutdown on SIGTERM / SIGINT
    def _shutdown(signum, frame) -> None:
        log.info("Caught signal %d, shutting down…", signum)
        c.stop()
        sys.exit(0)

    signal.signal(signal.SIGTERM, _shutdown)
    signal.signal(signal.SIGINT, _shutdown)

    log.info("HTTP server listening on %s:%d", config.HTTP_HOST, config.HTTP_PORT)
    log.info("Metrics endpoint: http://%s:%d/metrics", config.HTTP_HOST, config.HTTP_PORT)
    log.info("Health endpoint: http://%s:%d/health", config.HTTP_HOST, config.HTTP_PORT)

    try:
        # Blocks until a signal handler raises SystemExit in this thread.
        httpd.serve_forever()
    except KeyboardInterrupt:
        pass
    finally:
        c.stop()
        httpd.server_close()
        log.info("Server stopped")


if __name__ == "__main__":
    main()