Files
zroc/zroc-planner/collector.py
T
2026-04-12 19:35:31 -04:00

429 lines
15 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
vCenter metrics collector for zROC Planner.
Collects virtual-disk I/O statistics from vCenter using pyvmomi's
QueryPerf API and publishes them as Prometheus gauges.
Design highlights
-----------------
* VM inventory is cached and refreshed every VM_INVENTORY_TTL seconds,
so normal polling never touches the inventory API.
* Performance counter IDs are looked up once and cached for the life of
the session.
* VMs are queried in batches (BATCH_SIZE) with a short sleep between
batches so vCenter is never flooded.
* On any vCenter API error the session is torn down and re-established
before the next poll cycle.
"""
import logging
import threading
import time
from datetime import datetime, timezone
from typing import Optional
from pyVim.connect import SmartConnect, Disconnect
from pyVmomi import vim, vmodl
import ssl
import config
log = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Prometheus metric storage
# ---------------------------------------------------------------------------
class MetricStore:
    """Lock-protected holder for the most recent disk metrics of each VM.

    Entries are keyed by VM managed-object reference; every value is a dict
    of the form ``{"labels": ..., "metrics": ...}``. The ``last_*`` and
    ``collection_cycles`` attributes are plain bookkeeping fields; they are
    assigned directly by callers and are not guarded by the lock.
    """

    def __init__(self) -> None:
        self._lock = threading.Lock()
        # vm_moref -> {"labels": <label dict>, "metrics": <metric dict>}
        self._data: dict[str, dict] = {}
        # Bookkeeping about the most recent collection cycle.
        self.last_collection_time: Optional[float] = None
        self.last_collection_duration: float = 0.0
        self.last_vm_count: int = 0
        self.last_error: Optional[str] = None
        self.collection_cycles: int = 0

    def update(self, vm_moref: str, labels: dict, metrics: dict) -> None:
        """Replace whatever is stored for *vm_moref* with *labels*/*metrics*."""
        entry = {"labels": labels, "metrics": metrics}
        with self._lock:
            self._data[vm_moref] = entry

    def clear(self) -> None:
        """Drop every stored entry."""
        with self._lock:
            self._data.clear()

    def snapshot(self) -> dict:
        """Return a shallow copy of the entries, taken under the lock."""
        with self._lock:
            return self._data.copy()

    def remove_stale(self, active_morefs: set[str]) -> None:
        """Delete every entry whose moref is not in *active_morefs*."""
        with self._lock:
            for moref in set(self._data).difference(active_morefs):
                del self._data[moref]


# Module-level store shared with server.py
store = MetricStore()
# ---------------------------------------------------------------------------
# vCenter session helpers
# ---------------------------------------------------------------------------
def _build_ssl_context() -> ssl.SSLContext:
    """Return the TLS context used for the vCenter connection.

    Starts from ``ssl.create_default_context()``. When
    ``config.VCENTER_SSL_VERIFY`` is falsy, hostname checking and certificate
    verification are both turned off.
    """
    context = ssl.create_default_context()
    if config.VCENTER_SSL_VERIFY:
        return context
    # check_hostname must be disabled before verify_mode can be CERT_NONE.
    context.check_hostname = False
    context.verify_mode = ssl.CERT_NONE
    return context
def _connect() -> vim.ServiceInstance:
    """Open a new vCenter session with the connection settings from ``config``.

    Returns the ServiceInstance produced by ``SmartConnect``. Any error raised
    by ``SmartConnect`` propagates to the caller.
    """
    host = config.VCENTER_HOST
    port = config.VCENTER_PORT
    user = config.VCENTER_USER
    log.info("Connecting to vCenter %s:%s as %s", host, port, user)
    service_instance = SmartConnect(
        host=host,
        user=user,
        pwd=config.VCENTER_PASSWORD,
        port=port,
        sslContext=_build_ssl_context(),
        connectionPoolTimeout=60,
    )
    log.info("Connected to vCenter (session established)")
    return service_instance
def _disconnect(si: Optional[vim.ServiceInstance]) -> None:
    """Log out of *si* on a best-effort basis; ``None`` is ignored.

    This function is used during error recovery and shutdown, so it never
    raises. Failures are still logged at DEBUG level rather than silently
    discarded, so they stay diagnosable.
    """
    if si is None:
        return
    try:
        Disconnect(si)
    except Exception as exc:  # deliberate best-effort cleanup: never propagate
        log.debug("Ignoring error while disconnecting from vCenter: %s", exc)
# ---------------------------------------------------------------------------
# Inventory helpers
# ---------------------------------------------------------------------------
def _get_all_vms(si: vim.ServiceInstance) -> list[vim.VirtualMachine]:
    """List every VirtualMachine below the root folder.

    A recursive ContainerView is created for the lookup. It is always
    destroyed afterwards, even if materialising the view fails.
    """
    content = si.RetrieveContent()
    view = content.viewManager.CreateContainerView(
        content.rootFolder, [vim.VirtualMachine], True
    )
    try:
        return list(view.view)
    finally:
        view.Destroy()
def _get_vm_labels(vm: vim.VirtualMachine) -> dict:
    """Return the topology labels for *vm*.

    The result has the keys ``vm_name``, ``vm_moref``, ``cluster``, ``host``
    and ``datacenter``; any label that cannot be determined is ``""``.

    The topology is derived from the VM's current host (``vm.runtime.host``):
      - ``host`` is the host's name.
      - ``cluster`` is the name of the host's parent (a ComputeResource or
        ClusterComputeResource).
      - ``datacenter`` is the first Datacenter found by walking ``parent``
        links upward from there.

    Each managed-object property is read only once into a local. NOTE:
    pyVmomi fetches managed-object properties from vCenter on access, so
    repeated reads cost extra round-trips per VM.

    Any exception is logged at DEBUG and a dict with only ``vm_moref``
    filled in is returned.
    """
    moref = vm._moId or ""
    try:
        name = vm.name or ""
        datacenter = ""
        cluster = ""
        host_name = ""

        runtime = vm.runtime
        host = runtime.host if runtime else None
        if host:
            host_name = host.name or ""
            # Parent of a HostSystem is a ComputeResource / ClusterComputeResource.
            compute = host.parent
            if compute:
                cluster = compute.name or ""
                # Climb through any folders until the owning Datacenter is found.
                node = compute.parent
                while node is not None:
                    if isinstance(node, vim.Datacenter):
                        datacenter = node.name or ""
                        break
                    node = getattr(node, "parent", None)

        return {
            "vm_name": name,
            "vm_moref": moref,
            "cluster": cluster,
            "host": host_name,
            "datacenter": datacenter,
        }
    except Exception as exc:
        log.debug("Could not extract labels for VM %s: %s", vm, exc)
        return {
            "vm_name": "", "vm_moref": moref,
            "cluster": "", "host": "", "datacenter": "",
        }
# ---------------------------------------------------------------------------
# Performance counter ID cache
# ---------------------------------------------------------------------------
def _build_counter_map(si: vim.ServiceInstance) -> dict[tuple, int]:
    """Index every performance counter definition vCenter exposes.

    Keys are ``(group key, name key, rollup type)`` tuples. Values are the
    integer counter ids that QueryPerf metric ids refer to. If two
    definitions share a key, the later one wins.
    """
    perf_manager = si.RetrieveContent().perfManager
    counter_map: dict[tuple, int] = {
        (info.groupInfo.key, info.nameInfo.key, info.rollupType): info.key
        for info in perf_manager.perfCounter
    }
    log.debug("Loaded %d performance counter definitions", len(counter_map))
    return counter_map
def _resolve_counter_ids(counter_map: dict[tuple, int]) -> dict[str, int]:
    """Translate ``config.COUNTERS_WANTED`` into ``{label: counter id}``.

    Each wanted ``(group, name, rollup)`` triple is looked up in
    *counter_map*. Labels whose triple is unknown to vCenter are logged as a
    warning and left out of the result.
    """
    resolved: dict[str, int] = {}
    for label, triple in config.COUNTERS_WANTED.items():
        if triple not in counter_map:
            log.warning("Counter not found in vCenter: %s -> %s", label, triple)
            continue
        resolved[label] = counter_map[triple]
    return resolved
# ---------------------------------------------------------------------------
# QueryPerf batching
# ---------------------------------------------------------------------------
def _make_perf_query_spec(
    vm: vim.VirtualMachine,
    counter_ids: list[int],
    interval_id: int,
) -> vim.PerformanceManager.QuerySpec:
    """Build a QuerySpec asking for the latest sample of each counter on *vm*.

    Every counter is requested with ``instance="*"`` (all instances) at
    sampling interval *interval_id*, limited to a single sample.
    """
    metric_ids = [
        vim.PerformanceManager.MetricId(counterId=counter_id, instance="*")
        for counter_id in counter_ids
    ]
    spec = vim.PerformanceManager.QuerySpec(
        entity=vm,
        metricId=metric_ids,
        intervalId=interval_id,
        maxSample=1,  # only the most recent sample is needed
    )
    return spec
def _query_batch(
    pm: vim.PerformanceManager,
    vms: list[vim.VirtualMachine],
    counter_ids: list[int],
    interval_id: int,
) -> list:
    """Issue one QueryPerf call covering every VM in *vms*.

    Returns the raw EntityMetric list, or ``[]`` if vCenter returned nothing.

    If vCenter rejects the request with ``InvalidArgument``, a warning is
    logged and ``[]`` is returned. Any other error is logged and re-raised,
    so the caller can treat the session as broken.
    """
    query_specs = [
        _make_perf_query_spec(vm, counter_ids, interval_id) for vm in vms
    ]
    try:
        raw = pm.QueryPerf(querySpec=query_specs)
    except vmodl.fault.InvalidArgument as exc:
        log.warning("QueryPerf InvalidArgument for batch of %d VMs: %s", len(vms), exc)
        return []
    except Exception as exc:
        log.error("QueryPerf error for batch: %s", exc)
        raise
    return raw if raw else []
def _aggregate_entity_series(
    series_list: list,
    id_to_label: dict[int, str],
) -> dict[str, float]:
    """Collapse one entity's per-instance series into one value per metric label.

    For each series whose counter id is in *id_to_label*, only the most recent
    sample is used. Negative samples (vCenter's -1 "no data" marker) and empty
    series are skipped.

    Per-device series (non-empty ``instance``) are combined as follows:
      - ``disk_write_latency`` is averaged across devices.
      - ``disk_write_throughput`` is summed and converted from KB/s to MB/s.
      - All other labels are summed.

    A series with an empty ``instance`` is vCenter's pre-aggregated value.
    Because the query uses ``instance="*"``, it can be returned alongside the
    per-device series. Adding it to their sum would double-count, so it is
    used only when a label has no per-device data at all.
    """
    per_instance: dict[str, list[float]] = {}
    aggregate: dict[str, float] = {}
    for series in series_list:
        label = id_to_label.get(series.id.counterId)
        if label is None or not series.value:
            continue
        val = series.value[-1]  # most recent sample
        if val < 0:  # vCenter uses -1 for "no data"
            continue
        if series.id.instance:
            per_instance.setdefault(label, []).append(val)
        else:
            aggregate[label] = val

    metrics: dict[str, float] = {}
    for label in dict.fromkeys([*per_instance, *aggregate]):
        samples = per_instance.get(label) or [aggregate[label]]
        total = float(sum(samples))
        if label == "disk_write_latency":
            # average latency across disk instances
            metrics[label] = total / len(samples)
        elif label == "disk_write_throughput":
            # vCenter returns KB/s; convert to MB/s
            metrics[label] = total / 1024.0
        else:
            metrics[label] = total
    return metrics


def _parse_results(
    results: list,
    label_cache: dict[str, dict],
    label_to_counter_id: dict[str, int],
) -> None:
    """Parse QueryPerf EntityMetric results into the module-level ``store``.

    *label_cache* maps VM moref to its topology labels. VMs missing from it
    get empty labels apart from ``vm_moref``. *label_to_counter_id* maps
    metric labels to counter ids. VMs with no usable samples are not written,
    so any previously stored values for them are left in place.
    """
    # Reverse map: counter_id -> metric label
    id_to_label = {v: k for k, v in label_to_counter_id.items()}
    for entity_metric in results:
        moref = entity_metric.entity._moId
        metrics = _aggregate_entity_series(entity_metric.value, id_to_label)
        if not metrics:
            continue
        labels = label_cache.get(moref, {"vm_name": "", "vm_moref": moref,
                                         "cluster": "", "host": "", "datacenter": ""})
        store.update(moref, labels, metrics)
# ---------------------------------------------------------------------------
# Main collection loop
# ---------------------------------------------------------------------------
class VCenterCollector:
    """Background poller that pushes per-VM disk metrics into ``store``.

    :meth:`start` launches a daemon thread that repeatedly does the following:
      1. Ensures a vCenter session exists.
      2. Refreshes the powered-on VM inventory once ``config.VM_INVENTORY_TTL``
         has elapsed.
      3. Queries performance data in batches.
      4. Waits ``config.POLL_INTERVAL`` seconds, or until :meth:`stop` is called.
    """

    def __init__(self) -> None:
        self._si: Optional[vim.ServiceInstance] = None
        self._pm: Optional[vim.PerformanceManager] = None
        self._counter_ids: dict[str, int] = {}  # label -> int counter id
        self._vm_cache: list[vim.VirtualMachine] = []
        self._label_cache: dict[str, dict] = {}  # moref -> labels dict
        # time.monotonic() of the last inventory refresh; None forces a refresh.
        # 0.0 must not be used as "never": monotonic()'s reference point is
        # undefined (often system boot), so shortly after boot now - 0.0 can be
        # smaller than VM_INVENTORY_TTL and the first refresh would be skipped.
        self._vm_cache_ts: Optional[float] = None
        self._stop_event = threading.Event()

    # ── public API ────────────────────────────────────────────────────────────
    def start(self) -> None:
        """Launch the collection loop in a daemon thread named "collector"."""
        t = threading.Thread(target=self._run_loop, name="collector", daemon=True)
        t.start()
        log.info("Collector thread started")

    def stop(self) -> None:
        """Signal the collection loop to exit; does not wait for the thread."""
        self._stop_event.set()

    # ── internal ──────────────────────────────────────────────────────────────
    def _reset_session(self) -> None:
        """Drop the vCenter session and every piece of state derived from it.

        The next :meth:`_ensure_connected` reconnects and re-resolves counter
        IDs, and the next :meth:`_refresh_vm_inventory` reloads the VM list.
        """
        _disconnect(self._si)
        self._si = None
        self._pm = None
        self._counter_ids = {}
        self._vm_cache_ts = None

    def _ensure_connected(self) -> None:
        """Reuse the current session if it still answers, otherwise reconnect.

        On a (re)connect, the PerformanceManager is cached and the wanted
        counter labels are resolved to counter IDs.
        """
        if self._si is not None:
            # Ping the session to detect stale connections
            try:
                self._si.CurrentTime()
                return
            except Exception as exc:
                log.warning("vCenter session appears stale (%s), reconnecting…", exc)
                self._reset_session()
        self._si = _connect()
        content = self._si.RetrieveContent()
        self._pm = content.perfManager
        counter_map = _build_counter_map(self._si)
        self._counter_ids = _resolve_counter_ids(counter_map)
        log.info("Counter IDs resolved: %s", self._counter_ids)

    def _refresh_vm_inventory(self) -> None:
        """Reload the powered-on VM list and labels once the cache TTL expires.

        After a refresh, metrics for VMs that are no longer in the list are
        removed from ``store``.
        """
        now = time.monotonic()
        if (self._vm_cache_ts is not None
                and now - self._vm_cache_ts < config.VM_INVENTORY_TTL):
            return
        log.info("Refreshing VM inventory…")
        powered_on = vim.VirtualMachinePowerState.poweredOn
        vms = []
        for vm in _get_all_vms(self._si):
            runtime = vm.runtime  # read once: each access is a vCenter property fetch
            if runtime and runtime.powerState == powered_on:
                vms.append(vm)
        self._vm_cache = vms
        self._label_cache = {vm._moId: _get_vm_labels(vm) for vm in self._vm_cache}
        self._vm_cache_ts = now
        log.info("VM inventory: %d powered-on VMs", len(self._vm_cache))
        # Remove metrics for VMs no longer in inventory
        store.remove_stale(set(self._label_cache.keys()))

    def _collect_once(self) -> None:
        """Run one collection cycle and record its statistics in ``store``.

        VMs are queried in batches of ``config.BATCH_SIZE`` with a pause of
        ``config.BATCH_DELAY`` seconds between batches. The first failing
        batch aborts the cycle and resets the session, so the next cycle
        reconnects and refreshes the inventory.
        """
        t0 = time.monotonic()
        self._ensure_connected()
        self._refresh_vm_inventory()
        if not self._vm_cache:
            log.warning("No powered-on VMs found; skipping QueryPerf")
            return
        if not self._counter_ids:
            log.error("No performance counter IDs resolved; cannot collect metrics")
            return

        counter_id_list = list(self._counter_ids.values())
        batch_size = config.BATCH_SIZE
        total_vms = len(self._vm_cache)
        collected = 0
        errors = 0
        log.info("Starting QueryPerf collection for %d VMs in batches of %d",
                 total_vms, batch_size)

        for batch_start in range(0, total_vms, batch_size):
            if self._stop_event.is_set():
                break
            batch = self._vm_cache[batch_start:batch_start + batch_size]
            try:
                results = _query_batch(
                    self._pm, batch, counter_id_list, config.PERF_INTERVAL_ID
                )
                _parse_results(results, self._label_cache, self._counter_ids)
                collected += len(batch)
            except Exception as exc:
                errors += len(batch)
                log.error("Batch %d-%d failed: %s",
                          batch_start, batch_start + len(batch), exc)
                # Assume session is broken; force reconnect next cycle
                self._reset_session()
                break
            if batch_start + batch_size < total_vms:
                # Event.wait rather than time.sleep so stop() takes effect
                # immediately instead of after the remaining batch delays.
                self._stop_event.wait(config.BATCH_DELAY)

        elapsed = time.monotonic() - t0
        store.last_collection_time = time.time()
        store.last_collection_duration = elapsed
        store.last_vm_count = collected
        store.last_error = None if errors == 0 else f"{errors} VMs failed"
        store.collection_cycles += 1
        log.info(
            "Collection complete: %d/%d VMs, %.1fs elapsed%s",
            collected, total_vms, elapsed,
            f", {errors} errors" if errors else "",
        )

    def _run_loop(self) -> None:
        """Thread body: collect, then wait POLL_INTERVAL, until stop() is called."""
        while not self._stop_event.is_set():
            try:
                self._collect_once()
            except Exception as exc:
                store.last_error = str(exc)
                log.exception("Unhandled error in collection cycle: %s", exc)
                self._reset_session()
            # Sleep until next poll, but wake up early if stopped
            self._stop_event.wait(timeout=config.POLL_INTERVAL)
        log.info("Collector thread exiting")
        self._reset_session()