Files
Zerto_Exporter/app/zvma10/zvma.py
T
2023-12-19 12:18:46 -05:00

834 lines
32 KiB
Python

import atexit
import threading
import ssl
import json
import os
import time
import logging
import socket
import requests
import urllib3
from urllib3.exceptions import InsecureRequestWarning
from urllib.parse import urlencode
from urllib.parse import urlparse
from time import sleep
from typing import List, Dict, Tuple, Union, Any, Optional
from requests.structures import CaseInsensitiveDict
from logging.handlers import RotatingFileHandler
from posthog import Posthog
import uuid
from requests import Request, Session
from .version import VERSION
class zvmsite:
def __init__(self, host, username=None, password=None, port: int = 443, verify_ssl: bool = False, client_id="zerto-client", client_secret=None, grant_type="password", loglevel="debug", stats: bool = True):
    """Store connection settings and prepare logging, identity and statistics.

    Authentication is not started here; ``connect`` launches the background
    thread that obtains the token.

    Args:
        host: ZVM host name or address used to build ``base_url``.
        username: User name sent with the token request for non
            ``client_credentials`` grants.
        password: Password sent alongside ``username``.
        port: HTTPS port used to build ``base_url``.
        verify_ssl: Passed as ``verify`` to the HTTP calls; when false,
            urllib3's ``InsecureRequestWarning`` is silenced.
        client_id: Client id sent with the token request.
        client_secret: Secret sent when ``grant_type`` is
            ``"client_credentials"``.
        grant_type: Grant type sent with the token request.
        loglevel: Logging level name; upper-cased before use.
        stats: When true, a PostHog client is created and events are sent.
    """
    # Connection settings.
    self.stats = stats
    self.host, self.port = host, port
    self.username, self.password = username, password
    self.verify_ssl = verify_ssl
    self.base_url = f"https://{self.host}:{self.port}"
    if not self.verify_ssl:
        # Unverified HTTPS would otherwise emit a warning on every request.
        urllib3.disable_warnings(InsecureRequestWarning)

    # OAuth settings and token bookkeeping.
    self.client_id = client_id
    self.client_secret = client_secret
    self.grant_type = grant_type
    self.__auththread__ = None
    self.__version__ = VERSION
    self.token = None
    self.expiresIn = 0
    self.token_expire_time = None

    # Site details, filled in later by __set_zvm_version__.
    self.site_id = None
    self.site_name = None
    self.site_type = None
    self.site_type_version = None
    self.zvm_version = dict.fromkeys(("full", "major", "minor", "update", "patch"))

    # Headers shared by the API calls.
    self.__user_agent_string__ = "zerto_python_sdk_jpaul"
    self.apiheader = CaseInsensitiveDict()
    self.apiheader["Accept"] = "application/json"
    self.apiheader['User-Agent'] = self.__user_agent_string__

    self.__connected__ = False
    self._running = False
    self.LOGLEVEL = loglevel.upper()
    self.setup_logging()
    # Make sure the auth loop is asked to stop when the interpreter exits.
    atexit.register(self.disconnect)
    self._running = True

    # Identifier used as the PostHog distinct id.
    self.uuid = self.load_or_generate_uuid()
    if self.stats:
        self.setup_posthog()
        self.posthog.capture(self.uuid, 'ZVMA10 Python Module Loaded')
        self.log.debug("Sent PostHog Hook")
def __authhandler__(self):
    """Obtain an access token and keep renewing it while ``_running`` is set.

    Meant to be the target of the thread started by ``connect``. While the
    remaining lifetime (``expiresIn``) is at least 30 seconds the loop sleeps
    10 seconds and decrements the counter; below that it posts the configured
    grant to the token endpoint via ``make_api_request``.

    On success it stores ``token``, sets the ``Authorization`` entry of
    ``apiheader``, refreshes ``expiresIn`` and marks the instance connected.
    On failure it waits with exponential backoff (capped at 60 seconds) and
    retries; the backoff is reset after every successful authentication.

    If the instance is already marked connected the method logs and returns
    without entering the loop.
    """
    self.log.info(f"Log Level set to {self.LOGLEVEL}")
    if self.__connected__:
        self.log.info("Authentication thread is already running")
        print("Auth thread already running")
        return

    if not self.verify_ssl:
        # Verification itself is controlled by the ``verify`` argument used
        # in make_api_request; this only records the setting.
        self.log.debug("Disabling SSL verification")

    max_backoff_seconds = 60  # upper bound so repeated failures never sleep for hours
    retries = 0
    while self._running:
        if self.expiresIn >= 30:
            sleep(10)
            self.expiresIn -= 10
            continue

        self.log.debug(f"Authenticating to the server: {self.host}")
        headers = {"Content-Type": "application/x-www-form-urlencoded"}
        data = {
            "grant_type": self.grant_type,
            "client_id": self.client_id,
        }
        if self.grant_type == "client_credentials":
            data["client_secret"] = self.client_secret
        else:
            data["username"] = self.username
            data["password"] = self.password

        uri = self.construct_url(path="auth/realms/zerto/protocol/openid-connect/token")
        response = self.make_api_request("POST", uri, data=data, headers=headers)

        # Only a JSON object can carry the token fields; anything else is a failure.
        if isinstance(response, dict) and 'access_token' in response and 'expires_in' in response:
            self.token = str(response['access_token'])
            self.apiheader["Authorization"] = "Bearer " + self.token
            self.expiresIn = int(response['expires_in'])
            self.log.info("Authentication successful")
            self.__connected__ = True
            retries = 0  # a later failure starts backing off from scratch
        else:
            self.log.error("Authentication failed")
            sleep(min(2 ** retries, max_backoff_seconds))
            retries += 1
def is_authenticated(self) -> bool:
    """Report whether a token is stored and the instance is marked connected.

    Returns:
        ``False`` when no token is stored, otherwise the current value of
        the connected flag.
    """
    if self.token is None:
        return False
    return self.__connected__
def setup_logging(self) -> None:
    """Attach a rotating file handler to the ``Node-Exporter`` logger.

    The log file is ``./logs/Log-<hostname>.log`` (100 MiB per file, five
    backups). The ``./logs`` directory is created when missing, and the
    handler is added only once per log file so that creating several
    instances does not duplicate every log line.

    Sets ``self.log`` and applies ``self.LOGLEVEL`` to it.
    """
    container_id = str(socket.gethostname())
    log_dir = "./logs"
    log_file = f"{log_dir}/Log-{container_id}.log"

    self.log = logging.getLogger("Node-Exporter")
    self.log.setLevel(self.LOGLEVEL)

    # RotatingFileHandler stores the absolute path in ``baseFilename``.
    target = os.path.abspath(log_file)
    already_attached = any(
        isinstance(handler, RotatingFileHandler) and handler.baseFilename == target
        for handler in self.log.handlers
    )
    if already_attached:
        return

    # The handler opens the file on construction, so the directory must exist.
    os.makedirs(log_dir, exist_ok=True)
    log_formatter = logging.Formatter("%(asctime)s;%(levelname)s;%(threadName)s;%(message)s", "%Y-%m-%d %H:%M:%S")
    log_handler = RotatingFileHandler(filename=log_file, maxBytes=1024*1024*100, backupCount=5)
    log_handler.setFormatter(log_formatter)
    self.log.addHandler(log_handler)
def __redact__(self, data) -> Union[Dict[str, Any], str]:
    """Return a copy of ``data`` that is safe to write to the log.

    A key is treated as sensitive when its lower-cased name contains
    ``password``, ``secret`` or ``token``, so ``client_secret`` and
    ``access_token`` are masked as well as the bare names.

    Args:
        data: Request body about to be logged. Usually a dict.

    Returns:
        A new dict with sensitive values replaced by ``"********"``. If
        ``data`` has no ``items`` method (for example a pre-encoded
        string), the whole value is replaced by ``"********"`` because its
        contents cannot be inspected key by key.
    """
    sensitive_markers = ("password", "secret", "token")  # Add any other markers that need redaction
    if not hasattr(data, "items"):
        return "********"
    return {
        key: "********" if any(marker in str(key).lower() for marker in sensitive_markers) else value
        for key, value in data.items()
    }
def load_or_generate_uuid(self) -> str:
    """Return the UUID stored in ``uuid.txt``, creating one when needed.

    The file is read from the current working directory. If it is missing,
    unreadable, or does not contain a valid UUID, a new random UUID is
    generated and written back. Persisting is best effort: a failure to
    write (for example on a read-only file system) is logged when a logger
    is available and the new UUID is still returned.

    Returns:
        The UUID in canonical string form.
    """
    uuid_path = 'uuid.txt'
    try:
        with open(uuid_path, 'r') as file:
            return str(uuid.UUID(file.read().strip()))
    except (OSError, ValueError):
        pass  # Missing, unreadable or invalid: generate a new one below

    new_uuid = str(uuid.uuid4())
    try:
        with open(uuid_path, 'w') as file:
            file.write(new_uuid)
    except OSError as exc:
        log = getattr(self, "log", None)
        if log is not None:
            log.warning(f"Could not persist UUID to {uuid_path}: {exc}")
    return new_uuid
def setup_posthog(self) -> None:
    """Create the PostHog client and register this instance's UUID with it.

    Stores the client on ``self.posthog``, turns on its ``debug`` flag and
    calls ``identify`` with ``self.uuid`` as the distinct id.
    """
    self.posthog = Posthog(
        project_api_key='phc_HflqUkx9majhzm8DZva8pTwXFRnOn99onA9xPpK5HaQ',
        host='https://posthog.jpaul.io',
    )
    client = self.posthog
    client.debug = True
    client.identify(distinct_id=self.uuid)
def construct_url(self, path="", params=None) -> str:
    """Build an absolute URL below ``self.base_url``.

    Args:
        path: Path appended to ``base_url`` after a ``/``.
        params: Optional mapping of query parameters. Entries whose value
            is ``None`` are dropped; the remaining values are converted
            with ``str`` and URL-encoded.

    Returns:
        The full URL. A query string is appended only when at least one
        parameter survives the ``None`` filter, so the result never ends
        in a dangling ``?``.
    """
    full_url = f"{self.base_url}/{path}"
    if not params:
        return full_url
    query_string = urlencode({k: str(v) for k, v in params.items() if v is not None})
    if query_string:
        full_url = f"{full_url}?{query_string}"
    return full_url
def deconstruct_url(self, url) -> Tuple[str, str]:
    """Split ``url`` into its ``scheme://netloc`` prefix and its path.

    The query string and fragment are not part of either returned value.

    Returns:
        A ``(base_url, path)`` tuple.
    """
    parts = urlparse(url)
    return f"{parts.scheme}://{parts.netloc}", parts.path
def make_api_request(self, method, url, data=None, json_data=None, headers=None, timeout=3, test=None) -> Optional[Union[Dict[str, Any], str]]:
    """Send one HTTP request and return the decoded response body.

    Args:
        method: HTTP verb. ``"PUT"`` always sends ``json_data`` as a JSON
            body through a prepared request.
        url: Absolute URL to call.
        data: Body passed to ``requests`` unchanged when ``json_data`` is
            not given (form data for dicts).
        json_data: Object serialised with ``json.dumps`` and sent with a
            JSON content type.
        headers: Headers for the request. The mapping is copied, so the
            caller's object (normally the shared ``self.apiheader``) is not
            modified when a ``Content-Type`` is added.
        timeout: Timeout in seconds, applied to every verb including PUT.
        test: Unused; kept for backward compatibility.

    Returns:
        The decoded JSON body on success; an empty string when the server
        answered successfully with an empty body; ``None`` when the request
        failed, the status was an error, or the body was not valid JSON.
        Every failure is logged.
    """
    # Private copy: setting Content-Type below must not leak into self.apiheader.
    headers = CaseInsensitiveDict(headers or {})
    try:
        start_time = time.time()  # Record the start time
        if method == "PUT":
            headers['Content-Type'] = 'application/json'
            prepared_req = Request(method, url, data=json.dumps(json_data), headers=headers).prepare()
            # Never write the bearer token to the log file.
            logged_headers = {
                name: ("********" if name.lower() == "authorization" else value)
                for name, value in prepared_req.headers.items()
            }
            self.log.debug("Prepared Request:")
            self.log.debug(f"URL: {prepared_req.url}")
            self.log.debug(f"Method: {prepared_req.method}")
            self.log.debug(f"Headers: {logged_headers}")
            self.log.debug(f"Body: {prepared_req.body}")
            with Session() as s:
                response = s.send(prepared_req, verify=self.verify_ssl, timeout=timeout)
                self.log.debug(f"Response Status Code: {response.status_code}")
                self.log.debug(response.text)
        elif json_data is not None:
            # If json_data is provided, serialize it as JSON and set the appropriate header
            serialized_data = json.dumps(json_data)
            headers['Content-Type'] = 'application/json'
            self.log.debug(f"API Request using JSON Body: {serialized_data}")
            response = requests.request(method, url, data=serialized_data, headers=headers, timeout=timeout, verify=self.verify_ssl)
        else:
            # If json_data is not provided, use data as-is
            if data:
                self.log.debug(f"API Request using Form/Data Body: {self.__redact__(data)}")
            response = requests.request(method, url, data=data, headers=headers, timeout=timeout, verify=self.verify_ssl)

        elapsed_time_ms = (time.time() - start_time) * 1000
        response.raise_for_status()
        self.log.debug(f'API Request: {method} - {url}')

        # Posthog stats setup
        if self.stats:
            temp_base, temp_path = self.deconstruct_url(url)
            self.posthog.capture(self.uuid, 'API REQUEST',
                {
                    "url": temp_base,
                    "port": self.port,
                    "endpoint": temp_path,
                    "method": method,
                    "response_time_ms": int(elapsed_time_ms),
                    "verify_ssl": self.verify_ssl,
                    "grant_type": self.grant_type,
                    "status_code": str(response.status_code),
                    "sdk_version": self.__version__
                })
            self.log.debug("Sent PostHog Hook")

        if not response.content:
            # Success without a body. Falsy like None, but still lets callers
            # tell it apart from a failed request.
            return ""
        try:
            return response.json()
        except ValueError as exc:
            # Covers the JSON decode errors of both old and new requests releases.
            self.log.error(f"Response from {url} was not valid JSON: {exc}")
            return None
    except requests.exceptions.RequestException as e:
        self.log.error(f"Error while sending API request: {e}")
        # A Response is falsy for 4xx/5xx statuses, so test identity rather
        # than truthiness or the error body would never be logged.
        if e.response is not None:
            self.log.error(f"Response content: {e.response.text}")
        return None
def connect(self) -> None:
    """Start the background authentication thread unless one is alive.

    Before a new thread is started the connected flag and the remaining
    token lifetime are reset. Without that, a thread started after
    ``disconnect`` (or after the previous thread died) would see the stale
    connected flag, report that authentication is already running and exit
    without ever renewing the token.
    """
    current = self.__auththread__
    if current is not None and current.is_alive():
        self.log.info("Already connected to the ZVM")
        return

    # Force the new thread to authenticate from scratch.
    self.__connected__ = False
    self.expiresIn = 0
    self._running = True
    self.__auththread__ = threading.Thread(target=self.__authhandler__, daemon=True)
    self.__auththread__.start()
    self.log.info(f"Starting authentication thread {self.__auththread__.ident}")
def disconnect(self) -> None:
    """Ask the authentication thread to stop and mark the instance disconnected.

    Clears ``_running`` so the loop in the authentication thread ends, waits
    up to five seconds for the thread, then clears the connected flag so
    ``is_authenticated`` no longer reports a session whose token is not
    being renewed.
    """
    self.log.debug("Disconnecting")
    self._running = False
    worker = self.__auththread__
    if worker and worker.is_alive():
        worker.join(timeout=5)
    # Cleared after the join so an authentication that was in flight cannot
    # flip the flag back afterwards.
    self.__connected__ = False
def alert(self, alertidentifier=None) -> Dict[str, Any]:
    """Fetch a single alert from ``v1/alerts/<alertidentifier>``.

    Args:
        alertidentifier: Identifier of the alert to fetch. Required.

    Returns:
        Whatever ``make_api_request`` returns for the GET call.

    Raises:
        ValueError: If ``alertidentifier`` is ``None``.
    """
    if alertidentifier is None:
        # The message previously named get_vpg, a copy/paste leftover.
        self.log.error("Alert identifier is required for alert function.")
        raise ValueError("Alert identifier is required.")
    uri = self.construct_url(f"v1/alerts/{alertidentifier}", {})
    return self.make_api_request("GET", uri, headers=self.apiheader)
def alert_dismiss(self, alertidentifier=None) -> bool:
    """Dismiss an alert via POST ``v1/alerts/<alertidentifier>/dismiss``.

    ``make_api_request`` returns the decoded body (or ``None`` on failure),
    not a response object, so success is judged by that result instead of
    reading a ``status_code`` attribute that does not exist on it.

    Args:
        alertidentifier: Identifier of the alert to dismiss. Required.

    Returns:
        ``True`` when ``make_api_request`` returned something other than
        ``None``; ``False`` otherwise (the failure is logged there too).

    Raises:
        ValueError: If ``alertidentifier`` is ``None``.
    """
    if alertidentifier is None:
        self.log.error("Alert identifier is required for alert_dismiss function.")
        raise ValueError("Alert identifier is required.")
    uri = self.construct_url(f"v1/alerts/{alertidentifier}/dismiss", {})
    response = self.make_api_request("POST", uri, headers=self.apiheader)
    if response is None:
        self.log.error(f"Failed to dismiss alert: {alertidentifier}")
        return False
    return True
def alert_undismiss(self, alertidentifier=None) -> bool:
    """Undismiss an alert via POST ``v1/alerts/<alertidentifier>/undismiss``.

    ``make_api_request`` returns the decoded body (or ``None`` on failure),
    not a response object, so success is judged by that result instead of
    reading a ``status_code`` attribute that does not exist on it.

    Args:
        alertidentifier: Identifier of the alert to undismiss. Required.

    Returns:
        ``True`` when ``make_api_request`` returned something other than
        ``None``; ``False`` otherwise (the failure is logged there too).

    Raises:
        ValueError: If ``alertidentifier`` is ``None``.
    """
    if alertidentifier is None:
        self.log.error("Alert identifier is required for alert_undismiss function.")
        raise ValueError("Alert identifier is required.")
    uri = self.construct_url(f"v1/alerts/{alertidentifier}/undismiss", {})
    response = self.make_api_request("POST", uri, headers=self.apiheader)
    if response is None:
        self.log.error(f"Failed to undismiss alert: {alertidentifier}")
        return False
    return True
def alert_levels(self) -> List[str]:
    """GET ``v1/alerts/levels`` and return the result of ``make_api_request``."""
    endpoint = self.construct_url("v1/alerts/levels", {})
    return self.make_api_request("GET", endpoint, headers=self.apiheader)
def alert_entities(self) -> List[str]:
    """GET ``v1/alerts/entities`` and return the result of ``make_api_request``."""
    endpoint = self.construct_url("v1/alerts/entities", {})
    return self.make_api_request("GET", endpoint, headers=self.apiheader)
def alert_helpidentifiers(self) -> List[str]:
    """GET ``v1/alerts/helpidentifiers`` and return the result of ``make_api_request``."""
    endpoint = self.construct_url("v1/alerts/helpidentifiers", {})
    return self.make_api_request("GET", endpoint, headers=self.apiheader)
def alerts(self, startdate=None, enddate=None, vpgid=None, zorgidentifier=None, level=None,
           entity=None, helpidentifier=None, isdismissed: bool = None) -> List[Dict[str, Any]]:
    """GET ``v1/alerts`` with the given filters.

    Every argument is handed to ``construct_url`` as a query parameter of
    the same name.

    Returns:
        Whatever ``make_api_request`` returns for the GET call.
    """
    filters = dict(
        startdate=startdate,
        enddate=enddate,
        vpgid=vpgid,
        zorgidentifier=zorgidentifier,
        level=level,
        entity=entity,
        helpidentifier=helpidentifier,
        isdismissed=isdismissed,
    )
    endpoint = self.construct_url("v1/alerts", filters)
    return self.make_api_request("GET", endpoint, headers=self.apiheader)
def datastore(self, datastoreidentifier=None):
    """GET ``v1/datastores/<datastoreidentifier>``.

    Returns:
        Whatever ``make_api_request`` returns for the GET call.

    Raises:
        ValueError: If ``datastoreidentifier`` is ``None``.
    """
    if datastoreidentifier is not None:
        endpoint = self.construct_url(f"v1/datastores/{datastoreidentifier}", {})
        return self.make_api_request("GET", endpoint, headers=self.apiheader)
    self.log.error("Datastore identifier is required for get_datastore function.")
    raise ValueError("datastore identifier is required.")
def datastores(self, datadtoreidentifier=None) -> List[Dict[str, Any]]:
    """GET ``v1/datastores``.

    The ``datadtoreidentifier`` argument is accepted but not used.

    Returns:
        Whatever ``make_api_request`` returns for the GET call.
    """
    endpoint = self.construct_url("v1/datastores", {})
    return self.make_api_request("GET", endpoint, headers=self.apiheader)
def encryptiondetection_enable(self):
    """POST to ``v1/encryptionDetection/state`` with ``encryptionDetectionEnabled`` set to True.

    The flag is passed to ``construct_url`` as a query parameter.
    """
    query = dict(encryptionDetectionEnabled=True)
    endpoint = self.construct_url("v1/encryptionDetection/state", query)
    return self.make_api_request("POST", endpoint, headers=self.apiheader)
def encryptiondetection_disable(self):
    """POST to ``v1/encryptionDetection/state`` with ``encryptionDetectionEnabled`` set to False.

    The flag is passed to ``construct_url`` as a query parameter.
    """
    query = dict(encryptionDetectionEnabled=False)
    endpoint = self.construct_url("v1/encryptionDetection/state", query)
    return self.make_api_request("POST", endpoint, headers=self.apiheader)
def encryptiondetection_status(self):
    """GET ``v1/encryptionDetection/state`` and return the result of ``make_api_request``."""
    endpoint = self.construct_url("v1/encryptionDetection/state", {})
    return self.make_api_request("GET", endpoint, headers=self.apiheader)
def encryptiondetection_metrics_vms(self):
    """GET ``v1/encryptionDetection/metrics/vms`` and return the result of ``make_api_request``."""
    endpoint = self.construct_url("v1/encryptionDetection/metrics/vms", {})
    return self.make_api_request("GET", endpoint, headers=self.apiheader)
def encryptiondetection_metrics_volumes(self):
    """GET ``v1/encryptionDetection/metrics/volumes`` and return the result of ``make_api_request``."""
    endpoint = self.construct_url("v1/encryptionDetection/metrics/volumes", {})
    return self.make_api_request("GET", endpoint, headers=self.apiheader)
def encryptiondetection_metrics_vpgs(self):
    """GET ``v1/encryptionDetection/metrics/vpgs`` and return the result of ``make_api_request``."""
    endpoint = self.construct_url("v1/encryptionDetection/metrics/vpgs", {})
    return self.make_api_request("GET", endpoint, headers=self.apiheader)
def encryptiondetection_suspected_vms(self):
    """GET ``v1/encryptionDetection/suspected/vms`` and return the result of ``make_api_request``."""
    endpoint = self.construct_url("v1/encryptionDetection/suspected/vms", {})
    return self.make_api_request("GET", endpoint, headers=self.apiheader)
def encryptiondetection_suspected_volumes(self):
    """GET ``v1/encryptionDetection/suspected/volumes`` and return the result of ``make_api_request``."""
    endpoint = self.construct_url("v1/encryptionDetection/suspected/volumes", {})
    return self.make_api_request("GET", endpoint, headers=self.apiheader)
def encryptiondetection_suspected_vpgs(self):
    """GET ``v1/encryptionDetection/suspected/vpgs`` and return the result of ``make_api_request``."""
    endpoint = self.construct_url("v1/encryptionDetection/suspected/vpgs", {})
    return self.make_api_request("GET", endpoint, headers=self.apiheader)
def event(self, eventidentifier=None):
    """GET ``v1/events/<eventidentifier>``.

    Returns:
        Whatever ``make_api_request`` returns for the GET call.

    Raises:
        ValueError: If ``eventidentifier`` is ``None``.
    """
    if eventidentifier is not None:
        endpoint = self.construct_url(f"v1/events/{eventidentifier}", {})
        return self.make_api_request("GET", endpoint, headers=self.apiheader)
    self.log.error("Event identifier is required for get event function.")
    raise ValueError("Event identifier is required.")
def event_types(self):
    """GET ``v1/events/types`` and return the result of ``make_api_request``."""
    endpoint = self.construct_url("v1/events/types", {})
    return self.make_api_request("GET", endpoint, headers=self.apiheader)
def event_entities(self):
    """GET ``v1/events/entities`` and return the result of ``make_api_request``."""
    endpoint = self.construct_url("v1/events/entities", {})
    return self.make_api_request("GET", endpoint, headers=self.apiheader)
def event_categories(self):
    """GET ``v1/events/categories`` and return the result of ``make_api_request``."""
    endpoint = self.construct_url("v1/events/categories", {})
    return self.make_api_request("GET", endpoint, headers=self.apiheader)
def events(self, startdate=None, enddate=None, vpgid=None, sitename=None, zorgidentifier=None, eventtype=None,
           entitytype=None, category=None, username=None, alertidentifier=None) -> List[Dict[str, Any]]:
    """GET ``v1/events`` with the given filters.

    Every argument is handed to ``construct_url`` as a query parameter of
    the same name.

    Returns:
        Whatever ``make_api_request`` returns for the GET call.
    """
    filters = dict(
        startdate=startdate,
        enddate=enddate,
        vpgid=vpgid,
        sitename=sitename,
        zorgidentifier=zorgidentifier,
        eventtype=eventtype,
        entitytype=entitytype,
        category=category,
        username=username,
        alertidentifier=alertidentifier,
    )
    endpoint = self.construct_url("v1/events", filters)
    return self.make_api_request("GET", endpoint, headers=self.apiheader)
def local_site(self):
    """GET ``v1/localsite`` and return the result of ``make_api_request``."""
    endpoint = self.construct_url("v1/localsite", {})
    return self.make_api_request("GET", endpoint, headers=self.apiheader)
def local_site_pairing_statues(self):
    """GET ``v1/localsite/pairingstatuses`` and return the result of ``make_api_request``."""
    endpoint = self.construct_url("v1/localsite/pairingstatuses", {})
    return self.make_api_request("GET", endpoint, headers=self.apiheader)
def local_site_send_billing(self):
    """POST to ``v1/localsite/settings/sendusage`` with no body.

    Returns:
        Whatever ``make_api_request`` returns for the POST call.
    """
    endpoint = self.construct_url("v1/localsite/settings/sendusage", {})
    return self.make_api_request("POST", endpoint, headers=self.apiheader)
def local_site_banner(self):
    """GET the login banner settings.

    Returns:
        Whatever ``make_api_request`` returns for the GET call.
    """
    # "logingbanner" is deliberate: the path is spelled that way in Zerto too.
    endpoint = self.construct_url("v1/localsite/settings/logingbanner", {})
    return self.make_api_request("GET", endpoint, headers=self.apiheader)
def local_site_banner_update(self, enabled: bool = None, loginbanner = None):
    """PUT new login banner settings.

    Args:
        enabled: Sent as ``isLoginBannerEnabled``.
        loginbanner: Sent as ``loginBanner``.

    Returns:
        Whatever ``make_api_request`` returns for the PUT call.
    """
    payload = dict(isLoginBannerEnabled=enabled, loginBanner=loginbanner)
    # "logingbanner" is deliberate: the path is spelled that way in Zerto too.
    endpoint = self.construct_url("v1/localsite/settings/logingbanner", {})
    return self.make_api_request("PUT", endpoint, json_data=payload, headers=self.apiheader)
def license(self):
    """GET ``v1/license`` and return the result of ``make_api_request``."""
    endpoint = self.construct_url("v1/license", {})
    return self.make_api_request("GET", endpoint, headers=self.apiheader)
def license_delete(self):
    """Send DELETE to ``v1/license`` and return the result of ``make_api_request``."""
    endpoint = self.construct_url("v1/license", {})
    return self.make_api_request("DELETE", endpoint, headers=self.apiheader)
def license_apply(self, license=None):
    """PUT a license key to ``v1/license``.

    Args:
        license: License key, sent as ``licenseKey`` in the JSON body.

    Returns:
        Whatever ``make_api_request`` returns for the PUT call.

    Raises:
        ValueError: If ``license`` is ``None``.
    """
    if license is None:
        self.log.error("A license key is required for apply license function.")
        raise ValueError("License key is required.")
    payload = {"licenseKey": license}
    endpoint = self.construct_url("v1/license", {})
    return self.make_api_request("PUT", endpoint, json_data=payload, headers=self.apiheader)
def peer_sites(self) -> List[Dict[str, Any]]:
    """GET ``v1/peersites`` and return the result of ``make_api_request``."""
    endpoint = self.construct_url("v1/peersites", {})
    return self.make_api_request("GET", endpoint, headers=self.apiheader)
def peer_site(self, siteidentifier=None):
    """GET ``v1/peersites/<siteidentifier>``.

    Returns:
        Whatever ``make_api_request`` returns for the GET call.

    Raises:
        ValueError: If ``siteidentifier`` is ``None``.
    """
    if siteidentifier is not None:
        endpoint = self.construct_url(f"v1/peersites/{siteidentifier}", {})
        return self.make_api_request("GET", endpoint, headers=self.apiheader)
    self.log.error("Site identifier is required for get site function.")
    raise ValueError("Site identifier is required.")
def peer_sites_pairing_statues(self):
    """GET ``v1/peersites/pairingstatuses`` and return the result of ``make_api_request``."""
    endpoint = self.construct_url("v1/peersites/pairingstatuses", {})
    return self.make_api_request("GET", endpoint, headers=self.apiheader)
def peer_site_add(self, hostname=None, port=None, token=None):
    """POST a pairing request to ``v1/peersites``.

    Args:
        hostname: Sent as ``hostname`` in the JSON body. Required.
        port: Sent as ``port`` in the JSON body. Required.
        token: Sent as ``token`` in the JSON body. Required.

    Returns:
        Whatever ``make_api_request`` returns for the POST call.

    Raises:
        ValueError: If any argument is ``None``; the message lists every
            missing name.
    """
    payload = {"hostname": hostname, "port": port, "token": token}
    absent = [name for name, value in payload.items() if value is None]
    if absent:
        error_message = f"Missing required parameter(s): {', '.join(absent)} for pair site function."
        self.log.error(error_message)
        raise ValueError(error_message)
    endpoint = self.construct_url("v1/peersites", {})
    return self.make_api_request("POST", endpoint, json_data=payload, headers=self.apiheader)
def peer_site_delete(self, siteidentifier=None, keepdisks: bool = True):
    """Send DELETE to ``v1/peersites/<siteidentifier>`` to remove a peer site.

    Args:
        siteidentifier: Identifier of the peer site. Required.
        keepdisks: Sent as ``iskeeptargetdisks`` in the JSON body.

    Returns:
        Whatever ``make_api_request`` returns for the DELETE call.

    Raises:
        ValueError: If ``siteidentifier`` is ``None``.
    """
    if siteidentifier is None:
        self.log.error("Site identifier is required for delete site function.")
        raise ValueError("Site identifier is required.")
    data = {
        "iskeeptargetdisks": keepdisks
    }
    uri = self.construct_url(f"v1/peersites/{siteidentifier}", {})
    # make_api_request takes the body as ``json_data``; the previous
    # ``json=`` keyword raised TypeError on every call.
    return self.make_api_request("DELETE", uri, json_data=data, headers=self.apiheader)
def peer_site_pairing_token(self):
    """POST to ``v1/peersites/generatetoken`` with no body.

    Returns:
        Whatever ``make_api_request`` returns for the POST call.
    """
    endpoint = self.construct_url("v1/peersites/generatetoken", {})
    return self.make_api_request("POST", endpoint, headers=self.apiheader)
def tasks(self, startedbeforedate=None, startedafterdate=None, completedbeforedate=None, completedafterdate=None, tasktype=None, status=None) -> List[Dict[str, Any]]:
    """GET ``v1/tasks`` with the given filters.

    Arguments become query parameters of the same name, except
    ``tasktype`` which is sent as ``type``.

    Returns:
        Whatever ``make_api_request`` returns for the GET call.
    """
    filters = dict(
        startedbeforedate=startedbeforedate,
        startedafterdate=startedafterdate,
        completedbeforedate=completedbeforedate,
        completedafterdate=completedafterdate,
        type=tasktype,
        status=status,
    )
    endpoint = self.construct_url("v1/tasks", filters)
    return self.make_api_request("GET", endpoint, headers=self.apiheader)
def task(self, taskidentifier=None):
    """GET ``v1/tasks/<taskidentifier>``.

    Returns:
        Whatever ``make_api_request`` returns for the GET call.

    Raises:
        ValueError: If ``taskidentifier`` is ``None``.
    """
    if taskidentifier is not None:
        endpoint = self.construct_url(f"v1/tasks/{taskidentifier}", {})
        return self.make_api_request("GET", endpoint, headers=self.apiheader)
    self.log.error("Task identifier is required for function.")
    raise ValueError("Task identifier is required.")
def task_types(self):
    """GET ``v1/tasks/types`` and return the result of ``make_api_request``."""
    endpoint = self.construct_url("v1/tasks/types", {})
    return self.make_api_request("GET", endpoint, headers=self.apiheader)
def vpg(self, vpgidentifier=None):
    """GET ``v1/vpgs/<vpgidentifier>``.

    Args:
        vpgidentifier: Identifier of the VPG to fetch. Required.

    Returns:
        Whatever ``make_api_request`` returns for the GET call.

    Raises:
        ValueError: If ``vpgidentifier`` is ``None``.
    """
    if vpgidentifier is None:
        self.log.error("Vpg identifier is required for get_vpg function.")
        # The message previously said "VM identifier", which pointed callers
        # at the wrong argument.
        raise ValueError("VPG identifier is required.")
    uri = self.construct_url(f"v1/vpgs/{vpgidentifier}", {})
    return self.make_api_request("GET", uri, headers=self.apiheader)
def vpgs(self, vpgid=None, vpgname=None, vpgstatus=None, vpgsubstatus=None, protectedsitetype=None,
         recoverysitetype=None, protectedsiteidentifier=None, recoverysiteidentifier=None,
         zorgidentifier=None, priority=None, serviceprofileidentifier=None) -> List[Dict[str, Any]]:
    """GET ``v1/vpgs`` with the given filters.

    Every argument is handed to ``construct_url`` as a query parameter of
    the same name.

    Returns:
        Whatever ``make_api_request`` returns for the GET call.
    """
    filters = dict(
        vpgid=vpgid,
        vpgname=vpgname,
        vpgstatus=vpgstatus,
        vpgsubstatus=vpgsubstatus,
        protectedsitetype=protectedsitetype,
        recoverysitetype=recoverysitetype,
        protectedsiteidentifier=protectedsiteidentifier,
        recoverysiteidentifier=recoverysiteidentifier,
        zorgidentifier=zorgidentifier,
        priority=priority,
        serviceprofileidentifier=serviceprofileidentifier,
    )
    endpoint = self.construct_url("v1/vpgs", filters)
    return self.make_api_request("GET", endpoint, headers=self.apiheader)
def vpg_delete(self, vpgidentifier=None, keeprecoveryvolumes=True, force=True):
    """Delete a VPG by sending DELETE to ``v1/vpgs/<vpgidentifier>``.

    The options travel as a JSON body. The previous implementation issued a
    form-encoded POST to the same path, which is not a delete request.
    NOTE(review): verb and body format follow the Zerto VPG API as
    documented; confirm against the ZVM version in use.

    Args:
        vpgidentifier: Identifier of the VPG to delete. Required.
        keeprecoveryvolumes: Sent as ``keepRecoveryVolumes``.
        force: Sent as ``force``.

    Returns:
        Whatever ``make_api_request`` returns for the DELETE call.

    Raises:
        ValueError: If ``vpgidentifier`` is ``None``.
    """
    if vpgidentifier is None:
        self.log.error("VPG identifier is required for delete_vpg function.")
        raise ValueError("VPG identifier is required.")
    # URL with vpgidentifier in the path
    uri = self.construct_url(f"v1/vpgs/{vpgidentifier}")
    # Options sent in the request body
    data = {
        "keepRecoveryVolumes": keeprecoveryvolumes,
        "force": force
    }
    return self.make_api_request("DELETE", uri, json_data=data, headers=self.apiheader)
def vms(self, vmidentifier=None, vmname=None, vpgstatus=None, vpgsubstatus=None, protectedsitetype=None,
        recoverysitetype=None, protectedsiteidentifier=None, recoverysiteidentifier=None,
        zorgname=None, priority=None, includebackupvms: bool = None, includemountedvms: bool = None) -> List[Dict[str, Any]]:
    """GET ``v1/vms`` with the given filters.

    Every argument is handed to ``construct_url`` as a query parameter of
    the same name.

    Returns:
        Whatever ``make_api_request`` returns for the GET call.
    """
    filters = dict(
        vmidentifier=vmidentifier,
        vmname=vmname,
        vpgstatus=vpgstatus,
        vpgsubstatus=vpgsubstatus,
        protectedsitetype=protectedsitetype,
        recoverysitetype=recoverysitetype,
        protectedsiteidentifier=protectedsiteidentifier,
        recoverysiteidentifier=recoverysiteidentifier,
        zorgname=zorgname,
        priority=priority,
        includebackupvms=includebackupvms,
        includemountedvms=includemountedvms,
    )
    endpoint = self.construct_url("v1/vms", filters)
    return self.make_api_request("GET", endpoint, headers=self.apiheader)
def vm(self, vmidentifier=None, vpgidentifier=None, includebackupvms: bool = None, includemountedvms: bool = None):
    """GET ``v1/vms/<vmidentifier>``.

    Args:
        vmidentifier: Identifier of the VM. Required.
        vpgidentifier: Optional query parameter of the same name.
        includebackupvms: Optional query parameter of the same name.
        includemountedvms: Optional query parameter of the same name.

    Returns:
        Whatever ``make_api_request`` returns for the GET call.

    Raises:
        ValueError: If ``vmidentifier`` is ``None``.
    """
    if vmidentifier is None:
        self.log.error("VM identifier is required for get_vm function.")
        raise ValueError("VM identifier is required.")
    query = dict(
        vpgidentifier=vpgidentifier,
        includebackupvms=includebackupvms,
        includemountedvms=includemountedvms,
    )
    endpoint = self.construct_url(f"v1/vms/{vmidentifier}", query)
    return self.make_api_request("GET", endpoint, headers=self.apiheader)
def vm_pointintime(self, vmidentifier=None, vpgidentifier=None, includebackupvms: bool = None, includemountedvms: bool = None):
    """GET ``v1/vms/<vmidentifier>/pointsintime``.

    Args:
        vmidentifier: Identifier of the VM. Required.
        vpgidentifier: Optional query parameter of the same name.
        includebackupvms: Optional query parameter of the same name.
        includemountedvms: Optional query parameter of the same name.

    Returns:
        Whatever ``make_api_request`` returns for the GET call.

    Raises:
        ValueError: If ``vmidentifier`` is ``None``.
    """
    if vmidentifier is None:
        # The log line previously named get_vm, which sent readers to the
        # wrong method.
        self.log.error("VM identifier is required for vm_pointintime function.")
        raise ValueError("VM identifier is required.")
    params = {
        'vpgidentifier': vpgidentifier,
        'includebackupvms': includebackupvms,
        'includemountedvms': includemountedvms
    }
    uri = self.construct_url(f"v1/vms/{vmidentifier}/pointsintime", params)
    return self.make_api_request("GET", uri, headers=self.apiheader)
def volumes(self, volumetype=None, vpgidentifier=None, datastoreidentifier=None, protectedvmidentifier=None, owningvmidentifier=None) -> List[Dict[str, Any]]:
    """GET ``v1/volumes`` with the given filters.

    Args:
        volumetype: Optional volume type. When given it must match, ignoring
            case, one of scratch, journal, recovery, protected or appliance.
            The value is sent with its original casing.
        vpgidentifier: Optional query parameter of the same name.
        datastoreidentifier: Optional query parameter of the same name.
        protectedvmidentifier: Optional query parameter of the same name.
        owningvmidentifier: Optional query parameter of the same name.

    Returns:
        Whatever ``make_api_request`` returns for the GET call.

    Raises:
        ValueError: If ``volumetype`` is given but is not a known type.
    """
    if volumetype:
        valid_volumetypes = ["scratch", "journal", "recovery", "protected", "appliance"]
        # Compare in lower case so the check ignores the caller's casing.
        recognised = volumetype.lower() in valid_volumetypes
        if not recognised:
            raise ValueError(f"Invalid volumetype: {volumetype}. Must be one of {', '.join(valid_volumetypes)}")
    filters = dict(
        volumetype=volumetype,
        vpgidentifier=vpgidentifier,
        datastoreidentifier=datastoreidentifier,
        protectedvmidentifier=protectedvmidentifier,
        owningvmidentifier=owningvmidentifier,
    )
    endpoint = self.construct_url("v1/volumes", filters)
    return self.make_api_request("GET", endpoint, headers=self.apiheader)
def __set_zvm_version__(self):
    """Read ``v1/localsite`` and cache the site details and ZVM version.

    Fills ``site_id``, ``site_name``, ``site_type``, ``site_type_version``
    and ``zvm_version``. The full version string is split on ``.``: the
    first two parts become ``major`` and ``minor``, and the first and second
    character of the third part become ``update`` and ``patch`` (``"0"``
    when the character is missing). Parts beyond the third are ignored, so
    four-part versions no longer raise on unpacking.

    Nothing is changed when the request fails or does not return a JSON
    object.
    """
    uri = self.construct_url("v1/localsite")
    response = self.make_api_request("GET", uri, headers=self.apiheader)
    if not response or not isinstance(response, dict):
        return

    self.site_id = str(response.get('SiteIdentifier', ''))
    self.site_name = str(response.get('SiteName', ''))
    self.zvm_version['full'] = str(response.get('Version', ''))
    self.site_type_version = str(response.get('SiteTypeVersion', ''))
    self.site_type = str(response.get('SiteType', ''))

    # Break out ZVM version strings
    version_parts = self.zvm_version['full'].split(".")
    if len(version_parts) >= 3:
        major, minor, build = version_parts[:3]
        self.zvm_version['major'] = major
        self.zvm_version['minor'] = minor
        # Guard both characters: the third part may be empty or one character.
        self.zvm_version['update'] = build[0] if build else "0"
        self.zvm_version['patch'] = build[1] if len(build) > 1 else "0"
    self.log.info(f"Site ID: {self.site_id}, Site Name: {self.site_name}, Site Type: {self.site_type}")
def version(self):
    """Return the cached ZVM version mapping, loading it on first use.

    Returns:
        ``self.zvm_version`` when the instance is connected and running;
        otherwise the string ``"Error: Not Connected to ZVM"``.
    """
    if not (self.__connected__ and self._running):
        return "Error: Not Connected to ZVM"
    if self.zvm_version['full'] is None:
        # Not fetched yet: query the local site once and cache the result.
        self.__set_zvm_version__()
    return self.zvm_version