mirror of
https://github.com/recklessop/zroc.git
synced 2026-07-03 05:23:13 -04:00
0f988fa449
Extend collector to pull total provisioned virtual disk capacity per VM using VirtualDisk device enumeration, and expose it as a Prometheus gauge. Used by the zroc-ui Planner page for mirror volume storage estimates. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
449 lines
16 KiB
Python
449 lines
16 KiB
Python
"""
|
||
vCenter metrics collector for zROC Planner.
|
||
|
||
Collects virtual-disk I/O statistics from vCenter using pyvmomi's
|
||
QueryPerf API and publishes them as Prometheus gauges.
|
||
|
||
Design highlights
|
||
-----------------
|
||
* VM inventory is cached and refreshed every VM_INVENTORY_TTL seconds,
|
||
so normal polling never touches the inventory API.
|
||
* Performance counter IDs are looked up once and cached for the life of
|
||
the session.
|
||
* VMs are queried in batches (BATCH_SIZE) with a short sleep between
|
||
batches so vCenter is never flooded.
|
||
* On any vCenter API error the session is torn down and re-established
|
||
before the next poll cycle.
|
||
"""
|
||
|
||
import logging
|
||
import threading
|
||
import time
|
||
from datetime import datetime, timezone
|
||
from typing import Optional
|
||
|
||
from pyVim.connect import SmartConnect, Disconnect
|
||
from pyVmomi import vim, vmodl
|
||
import ssl
|
||
|
||
import config
|
||
|
||
log = logging.getLogger(__name__)
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Prometheus metric storage
|
||
# ---------------------------------------------------------------------------
|
||
|
||
class MetricStore:
|
||
"""Thread-safe store for the latest per-VM disk metrics."""
|
||
|
||
def __init__(self) -> None:
|
||
self._lock = threading.Lock()
|
||
# key: vm_moref value: dict of label + metric values
|
||
self._data: dict[str, dict] = {}
|
||
self.last_collection_time: Optional[float] = None
|
||
self.last_collection_duration: float = 0.0
|
||
self.last_vm_count: int = 0
|
||
self.last_error: Optional[str] = None
|
||
self.collection_cycles: int = 0
|
||
|
||
def update(self, vm_moref: str, labels: dict, metrics: dict) -> None:
|
||
with self._lock:
|
||
self._data[vm_moref] = {"labels": labels, "metrics": metrics}
|
||
|
||
def clear(self) -> None:
|
||
with self._lock:
|
||
self._data.clear()
|
||
|
||
def snapshot(self) -> dict:
|
||
with self._lock:
|
||
return dict(self._data)
|
||
|
||
def remove_stale(self, active_morefs: set[str]) -> None:
|
||
with self._lock:
|
||
stale = [k for k in self._data if k not in active_morefs]
|
||
for k in stale:
|
||
del self._data[k]
|
||
|
||
|
||
# Module-level store shared with server.py
|
||
store = MetricStore()
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# vCenter session helpers
|
||
# ---------------------------------------------------------------------------
|
||
|
||
def _build_ssl_context() -> ssl.SSLContext:
|
||
ctx = ssl.create_default_context()
|
||
if not config.VCENTER_SSL_VERIFY:
|
||
ctx.check_hostname = False
|
||
ctx.verify_mode = ssl.CERT_NONE
|
||
return ctx
|
||
|
||
|
||
def _connect() -> vim.ServiceInstance:
|
||
log.info("Connecting to vCenter %s:%s as %s",
|
||
config.VCENTER_HOST, config.VCENTER_PORT, config.VCENTER_USER)
|
||
si = SmartConnect(
|
||
host=config.VCENTER_HOST,
|
||
user=config.VCENTER_USER,
|
||
pwd=config.VCENTER_PASSWORD,
|
||
port=config.VCENTER_PORT,
|
||
sslContext=_build_ssl_context(),
|
||
connectionPoolTimeout=60,
|
||
)
|
||
log.info("Connected to vCenter (session established)")
|
||
return si
|
||
|
||
|
||
def _disconnect(si: Optional[vim.ServiceInstance]) -> None:
|
||
if si is not None:
|
||
try:
|
||
Disconnect(si)
|
||
except Exception:
|
||
pass
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Inventory helpers
|
||
# ---------------------------------------------------------------------------
|
||
|
||
def _get_all_vms(si: vim.ServiceInstance) -> list[vim.VirtualMachine]:
|
||
"""Return every VM object in the inventory using a ContainerView."""
|
||
content = si.RetrieveContent()
|
||
container = content.viewManager.CreateContainerView(
|
||
content.rootFolder, [vim.VirtualMachine], True
|
||
)
|
||
try:
|
||
vms = list(container.view)
|
||
finally:
|
||
container.Destroy()
|
||
return vms
|
||
|
||
|
||
def _get_vm_labels(vm: vim.VirtualMachine) -> dict:
|
||
"""Extract topology labels for a VM. Tolerates missing attributes."""
|
||
try:
|
||
name = vm.name or ""
|
||
moref = vm._moId or ""
|
||
|
||
# Walk the parent chain: VM → ResourcePool/vApp → ComputeResource → host
|
||
datacenter = ""
|
||
cluster = ""
|
||
host_name = ""
|
||
|
||
runtime = vm.runtime
|
||
if runtime and runtime.host:
|
||
h = runtime.host
|
||
host_name = h.name if h.name else ""
|
||
# parent of HostSystem is ComputeResource or ClusterComputeResource
|
||
cr = h.parent
|
||
if cr:
|
||
cluster = cr.name if cr.name else ""
|
||
# parent of ComputeResource is Datacenter or folder chain
|
||
node = cr.parent
|
||
while node is not None:
|
||
if isinstance(node, vim.Datacenter):
|
||
datacenter = node.name
|
||
break
|
||
node = getattr(node, "parent", None)
|
||
|
||
return {
|
||
"vm_name": name,
|
||
"vm_moref": moref,
|
||
"cluster": cluster,
|
||
"host": host_name,
|
||
"datacenter": datacenter,
|
||
}
|
||
except Exception as exc:
|
||
log.debug("Could not extract labels for VM %s: %s", vm, exc)
|
||
return {
|
||
"vm_name": "", "vm_moref": vm._moId or "",
|
||
"cluster": "", "host": "", "datacenter": "",
|
||
}
|
||
|
||
|
||
def _get_vm_provisioned_gb(vm: vim.VirtualMachine) -> float:
|
||
"""Return total provisioned virtual disk capacity in GB for a VM."""
|
||
total_kb = 0
|
||
try:
|
||
for device in vm.config.hardware.device:
|
||
if isinstance(device, vim.vm.device.VirtualDisk):
|
||
total_kb += device.capacityInKB
|
||
except Exception as exc:
|
||
log.debug("Could not read disk capacity for VM %s: %s", vm._moId, exc)
|
||
return total_kb / (1024 * 1024) # KB -> GB
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Performance counter ID cache
|
||
# ---------------------------------------------------------------------------
|
||
|
||
def _build_counter_map(si: vim.ServiceInstance) -> dict[tuple, int]:
|
||
"""Return a mapping of (group, name, rollup) -> counter key (integer id)."""
|
||
pm = si.RetrieveContent().perfManager
|
||
result: dict[tuple, int] = {}
|
||
for c in pm.perfCounter:
|
||
key = (c.groupInfo.key, c.nameInfo.key, c.rollupType)
|
||
result[key] = c.key
|
||
log.debug("Loaded %d performance counter definitions", len(result))
|
||
return result
|
||
|
||
|
||
def _resolve_counter_ids(counter_map: dict[tuple, int]) -> dict[str, int]:
|
||
"""Map our human labels to integer counter IDs. Warn on missing counters."""
|
||
resolved: dict[str, int] = {}
|
||
for label, triple in config.COUNTERS_WANTED.items():
|
||
cid = counter_map.get(triple)
|
||
if cid is None:
|
||
log.warning("Counter not found in vCenter: %s -> %s", label, triple)
|
||
else:
|
||
resolved[label] = cid
|
||
return resolved
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# QueryPerf batching
|
||
# ---------------------------------------------------------------------------
|
||
|
||
def _make_perf_query_spec(
|
||
vm: vim.VirtualMachine,
|
||
counter_ids: list[int],
|
||
interval_id: int,
|
||
) -> vim.PerformanceManager.QuerySpec:
|
||
metrics = [
|
||
vim.PerformanceManager.MetricId(counterId=cid, instance="*")
|
||
for cid in counter_ids
|
||
]
|
||
return vim.PerformanceManager.QuerySpec(
|
||
entity=vm,
|
||
metricId=metrics,
|
||
intervalId=interval_id,
|
||
maxSample=1, # we only need the most recent sample
|
||
)
|
||
|
||
|
||
def _query_batch(
|
||
pm: vim.PerformanceManager,
|
||
vms: list[vim.VirtualMachine],
|
||
counter_ids: list[int],
|
||
interval_id: int,
|
||
) -> list:
|
||
"""Run QueryPerf for a single batch of VMs. Returns raw EntityMetric list."""
|
||
specs = [_make_perf_query_spec(vm, counter_ids, interval_id) for vm in vms]
|
||
try:
|
||
results = pm.QueryPerf(querySpec=specs)
|
||
return results or []
|
||
except vmodl.fault.InvalidArgument as exc:
|
||
log.warning("QueryPerf InvalidArgument for batch of %d VMs: %s", len(vms), exc)
|
||
return []
|
||
except Exception as exc:
|
||
log.error("QueryPerf error for batch: %s", exc)
|
||
raise
|
||
|
||
|
||
def _parse_results(
|
||
results: list,
|
||
label_cache: dict[str, dict],
|
||
label_to_counter_id: dict[str, int],
|
||
provisioned_gb_cache: Optional[dict[str, float]] = None,
|
||
) -> None:
|
||
"""Parse EntityMetric results and update the global MetricStore."""
|
||
# Build reverse map: counter_id -> label
|
||
id_to_label = {v: k for k, v in label_to_counter_id.items()}
|
||
|
||
for entity_metric in results:
|
||
moref = entity_metric.entity._moId
|
||
labels = label_cache.get(moref, {"vm_name": "", "vm_moref": moref,
|
||
"cluster": "", "host": "", "datacenter": ""})
|
||
|
||
# Aggregate across all disk instances: sum IOPS/throughput, average latency
|
||
sums: dict[str, float] = {}
|
||
counts: dict[str, int] = {}
|
||
|
||
for series in entity_metric.value:
|
||
cid = series.id.counterId
|
||
metric_label = id_to_label.get(cid)
|
||
if metric_label is None:
|
||
continue
|
||
if not series.value:
|
||
continue
|
||
|
||
val = series.value[-1] # most recent sample
|
||
if val < 0: # vCenter uses -1 for "no data"
|
||
continue
|
||
|
||
sums[metric_label] = sums.get(metric_label, 0.0) + val
|
||
counts[metric_label] = counts.get(metric_label, 0) + 1
|
||
|
||
if not sums:
|
||
continue
|
||
|
||
metrics: dict[str, float] = {}
|
||
for lbl, total in sums.items():
|
||
if lbl == "disk_write_latency":
|
||
# average latency across disk instances
|
||
metrics[lbl] = total / counts[lbl]
|
||
elif lbl == "disk_write_throughput":
|
||
# vCenter returns KB/s; convert to MB/s
|
||
metrics[lbl] = total / 1024.0
|
||
else:
|
||
metrics[lbl] = total
|
||
|
||
if provisioned_gb_cache:
|
||
metrics["disk_provisioned_gb"] = provisioned_gb_cache.get(moref, 0.0)
|
||
|
||
store.update(moref, labels, metrics)
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Main collection loop
|
||
# ---------------------------------------------------------------------------
|
||
|
||
class VCenterCollector:
|
||
def __init__(self) -> None:
|
||
self._si: Optional[vim.ServiceInstance] = None
|
||
self._pm: Optional[vim.PerformanceManager] = None
|
||
self._counter_ids: dict[str, int] = {} # label -> int counter id
|
||
self._vm_cache: list[vim.VirtualMachine] = []
|
||
self._label_cache: dict[str, dict] = {} # moref -> labels dict
|
||
self._vm_cache_ts: float = 0.0
|
||
self._stop_event = threading.Event()
|
||
|
||
# ── public API ────────────────────────────────────────────────────────────
|
||
|
||
def start(self) -> None:
|
||
t = threading.Thread(target=self._run_loop, name="collector", daemon=True)
|
||
t.start()
|
||
log.info("Collector thread started")
|
||
|
||
def stop(self) -> None:
|
||
self._stop_event.set()
|
||
|
||
# ── internal ──────────────────────────────────────────────────────────────
|
||
|
||
def _ensure_connected(self) -> None:
|
||
if self._si is not None:
|
||
# Ping the session to detect stale connections
|
||
try:
|
||
self._si.CurrentTime()
|
||
return
|
||
except Exception as exc:
|
||
log.warning("vCenter session appears stale (%s), reconnecting…", exc)
|
||
_disconnect(self._si)
|
||
self._si = None
|
||
self._pm = None
|
||
self._counter_ids = {}
|
||
# Force inventory refresh after reconnect
|
||
self._vm_cache_ts = 0.0
|
||
|
||
self._si = _connect()
|
||
content = self._si.RetrieveContent()
|
||
self._pm = content.perfManager
|
||
counter_map = _build_counter_map(self._si)
|
||
self._counter_ids = _resolve_counter_ids(counter_map)
|
||
log.info("Counter IDs resolved: %s", self._counter_ids)
|
||
|
||
def _refresh_vm_inventory(self) -> None:
|
||
now = time.monotonic()
|
||
if now - self._vm_cache_ts < config.VM_INVENTORY_TTL:
|
||
return
|
||
|
||
log.info("Refreshing VM inventory…")
|
||
vms = _get_all_vms(self._si)
|
||
self._vm_cache = [
|
||
vm for vm in vms
|
||
if vm.runtime and vm.runtime.powerState == vim.VirtualMachinePowerState.poweredOn
|
||
]
|
||
self._label_cache = {vm._moId: _get_vm_labels(vm) for vm in self._vm_cache}
|
||
self._provisioned_gb_cache: dict[str, float] = {
|
||
vm._moId: _get_vm_provisioned_gb(vm) for vm in self._vm_cache
|
||
}
|
||
self._vm_cache_ts = now
|
||
log.info("VM inventory: %d powered-on VMs", len(self._vm_cache))
|
||
|
||
# Remove metrics for VMs no longer in inventory
|
||
store.remove_stale(set(self._label_cache.keys()))
|
||
|
||
def _collect_once(self) -> None:
|
||
t0 = time.monotonic()
|
||
self._ensure_connected()
|
||
self._refresh_vm_inventory()
|
||
|
||
if not self._vm_cache:
|
||
log.warning("No powered-on VMs found; skipping QueryPerf")
|
||
return
|
||
|
||
if not self._counter_ids:
|
||
log.error("No performance counter IDs resolved; cannot collect metrics")
|
||
return
|
||
|
||
counter_id_list = list(self._counter_ids.values())
|
||
total_vms = len(self._vm_cache)
|
||
collected = 0
|
||
errors = 0
|
||
|
||
log.info("Starting QueryPerf collection for %d VMs in batches of %d",
|
||
total_vms, config.BATCH_SIZE)
|
||
|
||
for batch_start in range(0, total_vms, config.BATCH_SIZE):
|
||
if self._stop_event.is_set():
|
||
break
|
||
|
||
batch = self._vm_cache[batch_start: batch_start + config.BATCH_SIZE]
|
||
try:
|
||
results = _query_batch(
|
||
self._pm, batch, counter_id_list, config.PERF_INTERVAL_ID
|
||
)
|
||
_parse_results(results, self._label_cache, self._counter_ids,
|
||
self._provisioned_gb_cache)
|
||
collected += len(batch)
|
||
except Exception as exc:
|
||
errors += len(batch)
|
||
log.error("Batch %d–%d failed: %s",
|
||
batch_start, batch_start + len(batch), exc)
|
||
# Assume session is broken; force reconnect next cycle
|
||
_disconnect(self._si)
|
||
self._si = None
|
||
self._pm = None
|
||
self._counter_ids = {}
|
||
self._vm_cache_ts = 0.0
|
||
break
|
||
|
||
if batch_start + config.BATCH_SIZE < total_vms:
|
||
time.sleep(config.BATCH_DELAY)
|
||
|
||
elapsed = time.monotonic() - t0
|
||
store.last_collection_time = time.time()
|
||
store.last_collection_duration = elapsed
|
||
store.last_vm_count = collected
|
||
store.last_error = None if errors == 0 else f"{errors} VMs failed"
|
||
store.collection_cycles += 1
|
||
|
||
log.info(
|
||
"Collection complete: %d/%d VMs, %.1fs elapsed%s",
|
||
collected, total_vms, elapsed,
|
||
f", {errors} errors" if errors else "",
|
||
)
|
||
|
||
def _run_loop(self) -> None:
|
||
while not self._stop_event.is_set():
|
||
try:
|
||
self._collect_once()
|
||
except Exception as exc:
|
||
store.last_error = str(exc)
|
||
log.exception("Unhandled error in collection cycle: %s", exc)
|
||
_disconnect(self._si)
|
||
self._si = None
|
||
self._pm = None
|
||
self._counter_ids = {}
|
||
self._vm_cache_ts = 0.0
|
||
|
||
# Sleep until next poll, but wake up early if stopped
|
||
self._stop_event.wait(timeout=config.POLL_INTERVAL)
|
||
|
||
log.info("Collector thread exiting")
|
||
_disconnect(self._si)
|