Files
zvml-python-sdk/zvml/vpgs.py
T

1079 lines
49 KiB
Python

# Legal Disclaimer
# This script is an example script and is not supported under any Zerto support program or service.
# The author and Zerto further disclaim all implied warranties including, without limitation,
# any implied warranties of merchantability or of fitness for a particular purpose.
# In no event shall Zerto, its authors or anyone else involved in the creation,
# production or delivery of the scripts be liable for any damages whatsoever (including,
# without limitation, damages for loss of business profits, business interruption, loss of business
# information, or other pecuniary loss) arising out of the use of or the inability to use the sample
# scripts or documentation, even if the author or Zerto has been advised of the possibility of such damages.
# The entire risk arising out of the use or performance of the sample scripts and documentation remains with you.
import requests
import logging
import time
import json
from .tasks import Tasks
from .common import ZertoVPGStatus, ZertoVPGSubstatus, ZertoProtectedSiteType, ZertoRecoverySiteType, ZertoVPGPriority
from .localsite import LocalSite
from typing import Optional, Union, Dict, List
class VPGs:
def __init__(self, client):
self.client = client
self.tasks = Tasks(client)
def list_vpgs(self,
vpg_name: str = None,
vpg_identifier: str = None,
status: ZertoVPGStatus = None,
sub_status: ZertoVPGSubstatus = None,
protected_site_type: ZertoProtectedSiteType = None,
recovery_site_type: ZertoRecoverySiteType = None,
protected_site_identifier: str = None,
recovery_site_identifier: str = None,
organization_name: str = None,
zorg_identifier: str = None,
priority: ZertoVPGPriority = None,
service_profile_identifier: str = None,
backup_enabled: bool = None) -> Dict | List[Dict]:
"""
Get information about VPGs. If vpg_identifier or vpg_name is provided, returns a single VPG.
Otherwise, returns a list of VPGs that match the filter criteria.
Args:
vpg_name: Get a specific VPG by name
vpg_identifier: Get a specific VPG by identifier
status: Filter by VPG status
sub_status: Filter by VPG sub-status
protected_site_type: The protected site type
recovery_site_type: The recovery site type
protected_site_identifier: The identifier of the protected site
recovery_site_identifier: The identifier of the recovery site
organization_name: Filter by ZORG name
zorg_identifier: Filter by ZORG identifier
priority: Filter by VPG priority
service_profile_identifier: Filter by service profile ID
backup_enabled: Deprecated parameter
Returns:
Dict: When vpg_identifier or vpg_name is provided
List[Dict]: When filtering VPGs without specific identifier
"""
# Construct the base URL
if vpg_identifier:
url = f"https://{self.client.zvm_address}/v1/vpgs/{vpg_identifier}"
else:
url = f"https://{self.client.zvm_address}/v1/vpgs"
headers = {
'Content-Type': 'application/json',
'Authorization': f'Bearer {self.client.token}'
}
# Only include query parameters if we're not getting a specific VPG
params = {}
if not vpg_identifier:
params = {
'name': vpg_name,
'status': status.get_name_by_value(status.value) if status else None,
'subStatus': sub_status.get_name_by_value(sub_status.value) if sub_status else None,
'protectedSiteType': protected_site_type.get_name_by_value(protected_site_type.value) if protected_site_type else None,
'recoverySiteType': recovery_site_type.get_name_by_value(recovery_site_type.value) if recovery_site_type else None,
'protectedSiteIdentifier': protected_site_identifier,
'recoverySiteIdentifier': recovery_site_identifier,
'organizationName': organization_name,
'zorgIdentifier': zorg_identifier,
'priority': priority.get_name_by_value(priority.value) if priority else None,
'serviceProfileIdentifier': service_profile_identifier,
'backupEnabled': backup_enabled
}
# Remove None values from params
params = {k: v for k, v in params.items() if v is not None}
logging.info(f"VPGs.list_vpgs: Fetching VPGs with parameters:")
if vpg_identifier:
logging.info(f" vpg_identifier: {vpg_identifier}")
for key, value in params.items():
logging.info(f" {key}: {value}")
try:
response = requests.get(
url,
headers=headers,
params=params,
verify=self.client.verify_certificate,
timeout=30
)
response.raise_for_status()
result = response.json()
# If we're querying by name, return the first matching VPG
if vpg_name and isinstance(result, list):
matching_vpg = next((vpg for vpg in result if vpg.get("VpgName") == vpg_name), None)
if matching_vpg:
logging.info(f"Successfully retrieved VPG details for {vpg_name}")
return matching_vpg
logging.warning(f"No VPG found with name {vpg_name}")
return {}
if vpg_identifier:
logging.info(f"Successfully retrieved VPG details for {vpg_identifier}")
else:
logging.info(f"Successfully retrieved {len(result)} VPGs")
return result
except requests.exceptions.RequestException as e:
if e.response is not None:
logging.error(f"HTTPError: {e.response.status_code} - {e.response.reason}")
try:
error_details = e.response.json()
logging.error(f"Error Message: {error_details.get('Message', 'No detailed error message available')}")
except ValueError:
logging.error(f"Response content: {e.response.text}")
else:
logging.error("HTTPError occurred with no response attached.")
raise
def commit_vpg(self, vpg_settings_id, vpg_name, sync=False, expected_status=ZertoVPGStatus.Initializing, timeout=30, interval=5):
logging.info(f'VPGs.commit_vpg(zvm_address={self.client.zvm_address}, vpg_settings_id={vpg_settings_id}, vpg_name={vpg_name}, sync={sync})')
commit_uri = f"https://{self.client.zvm_address}/v1/vpgSettings/{vpg_settings_id}/commit"
headers = {
'Content-Type': 'application/json',
'Authorization': f'Bearer {self.client.token}'
}
try:
response = requests.post(commit_uri, headers=headers, verify=self.client.verify_certificate)
response.raise_for_status()
task_id = response.json()
logging.info(f"VPGSettings {vpg_settings_id} successfully committed, {vpg_name} is created, task_id={task_id}")
if sync:
# Wait for task completion
self.tasks.wait_for_task_completion(task_id, timeout=timeout, interval=interval)
logging.debug('sleeping 5 seconds ...') # looks like there is a bug in the Zerto API, so there is a need to sleep for 5 seconds
time.sleep(5)
self.wait_for_vpg_ready(vpg_name=vpg_name, timeout=30, interval=5, expected_status=expected_status)
return task_id
return task_id
except requests.exceptions.RequestException as e:
if e.response is not None:
logging.error(f"HTTPError: {e.response.status_code} - {e.response.reason}")
try:
error_details = e.response.json()
logging.error(f"Error Message: {error_details.get('Message', 'No detailed error message available')}")
except ValueError:
logging.error(f"Response content: {e.response.text}")
else:
logging.error("HTTPError occurred with no response attached.")
raise
except Exception as e:
logging.error(f"Unexpected error: {e}")
raise
def create_vpg(self, basic, journal, recovery, networks, scratch = None, sync=True, status: ZertoVPGStatus = ZertoVPGStatus.Initializing, timeout=30, interval=5):
vpg_name = basic.get("Name")
logging.info(f'VPGs.create_vpg(zvm_address={self.client.zvm_address}, vpg_name={vpg_name}, sync={sync})')
vpg_settings_id = self.create_vpg_settings(basic, journal, recovery, networks, scratch, vpg_identifier=None)
return self.commit_vpg(vpg_settings_id, vpg_name, sync, expected_status=status, timeout=timeout, interval=interval)
def wait_for_vpg_ready(self, vpg_name, timeout=180, interval=5, expected_status=ZertoVPGStatus.Initializing):
logging.debug(f'VPGs.wait_for_vpg_ready(zvm_address={self.client.zvm_address}, vpg_name={vpg_name}, timeout={timeout}, interval={interval}, expected_status={ZertoVPGStatus.get_name_by_value(expected_status.value)})')
start_time = time.time()
while True:
time.sleep(interval)
vpg_info = self.list_vpgs(vpg_name=vpg_name)
# get status and convert string into enum
logging.debug(f"VPG status: {vpg_info.get('Status')}")
vpg_status: ZertoVPGStatus = ZertoVPGStatus(vpg_info.get("Status"))
logging.debug(f"Checking VPG status for {vpg_name}: Expected status = {ZertoVPGStatus.get_name_by_value(expected_status.value)}, Current status = {ZertoVPGStatus.get_name_by_value(vpg_status.value)}")
# If VPG is in the expected status or passed the Initializing status too quickly and is in another status
if vpg_status == expected_status or (expected_status == ZertoVPGStatus.Initializing and vpg_status.value > ZertoVPGStatus.Initializing.value):
logging.info(f"VPG {vpg_name} is now in the expected state: {ZertoVPGStatus.get_name_by_value(vpg_status.value)}")
return vpg_info
# Check if the timeout has been reached
elapsed_time = time.time() - start_time
if elapsed_time > timeout:
raise TimeoutError(f"VPG {vpg_name} did not reach the {ZertoVPGStatus.get_name_by_value(expected_status.value)} state within the allotted time. Current status: {ZertoVPGStatus.get_name_by_value(vpg_status.value)}")
def add_vm_to_vpg_by_name(self, vpg_name, vm_name):
logging.info(f'VPGs.add_vm_to_vpg_by_name(zvm_address={self.client.zvm_address}, vpg_name={vpg_name}, vm_name={vm_name})')
local_site = LocalSite(self.client.zvm_address, self.client.token)
local_site_identifier = local_site.get_local_site()['SiteIdentifier']
vms = self.client.virtualization_sites.get_virtualization_site_vms(site_identifier=local_site_identifier)
vm_dict = {vm.get('VmName'): vm for vm in vms}
if vm_name not in vm_dict:
logging.error(f"VM with name '{vm_name}' not found.")
return
vm_id = vm_dict[vm_name]['VmIdentifier']
vm_payload = {
"VmIdentifier": vm_id
}
return self.add_vm_to_vpg(vpg_name, vm_payload)
def add_vm_to_vpg(self, vpg_name, vm_list_payload):
logging.info(f'VPGs.add_vm_to_vpg(zvm_address={self.client.zvm_address}, vpg_name={vpg_name})')
vpg = self.list_vpgs(vpg_name=vpg_name)
if not vpg:
logging.error(f"VPG with name '{vpg_name}' not found.")
return
vpg_identifier = vpg['VpgIdentifier']
logging.info(f"Found VPG '{vpg_name}' with Identifier: {vpg_identifier}")
new_vpg_settings_id = self.create_vpg_settings(basic=None, journal=None, recovery=None, networks=None, vpg_identifier=vpg_identifier)
logging.info(f"Adding VMs to VPGSettings ID: {new_vpg_settings_id}")
logging.debug(f"VM List Payload: {json.dumps(vm_list_payload, indent=4)}")
vms_uri = f"https://{self.client.zvm_address}/v1/vpgSettings/{new_vpg_settings_id}/vms"
headers = {
'Content-Type': 'application/json',
'Authorization': f'Bearer {self.client.token}'
}
try:
response = requests.post(vms_uri, headers=headers, json=vm_list_payload, verify=self.client.verify_certificate)
response.raise_for_status()
logging.info(f"Successfully added VMs to VPG {new_vpg_settings_id}.")
self.commit_vpg(new_vpg_settings_id, vpg_name, sync=True, expected_status=ZertoVPGStatus.Initializing)
return
except requests.exceptions.RequestException as e:
if e.response is not None:
try:
error_details = e.response.json()
error_message = error_details.get('Message', '')
# Extract the actual error message after "Exception occurred in API:"
if 'Exception occurred in API:' in error_message:
detailed_error = error_message.split('Exception occurred in API:', 1)[1].strip()
# Remove trailing semicolon if present
detailed_error = detailed_error.rstrip(';')
# Log only the detailed error message
logging.error(f"Failed to add VM to VPG: {detailed_error}")
else:
logging.error(f"Failed to add VM to VPG: {error_message}")
# Log the full error details at debug level for troubleshooting
logging.debug(f"Full error response: {json.dumps(error_details, indent=2)}")
except ValueError:
logging.error(f"Failed to add VM to VPG: {e.response.text}")
else:
logging.error(f"Failed to add VM to VPG: {str(e)}")
raise
except Exception as e:
logging.error(f"Failed to add VM to VPG: {str(e)}")
raise
def remove_vm_from_vpg(self, vpg_name, vm_identifier):
logging.info(f'VPGs.remove_vm_from_vpg(zvm_address={self.client.zvm_address}, vpg_name={vpg_name}, vm_identifier={vm_identifier})')
vpg = self.list_vpgs(vpg_name=vpg_name)
if not vpg:
logging.error(f"VPG with name '{vpg_name}' not found.")
return
vpg_id = vpg['VpgIdentifier']
logging.info(f"Found VPG '{vpg_name}' with Identifier: {vpg_id}")
new_vpg_settings_id = self.create_vpg_settings(basic=None, journal=None, recovery=None, networks=None, vpg_identifier=vpg_id)
remove_vm_uri = f"https://{self.client.zvm_address}/v1/vpgSettings/{new_vpg_settings_id}/vms/{vm_identifier}"
headers = {
'Content-Type': 'application/json',
'Authorization': f'Bearer {self.client.token}'
}
try:
response = requests.delete(remove_vm_uri, headers=headers, verify=self.client.verify_certificate)
response.raise_for_status()
logging.info(f"VM {vm_identifier} successfully removed from VPG '{vpg_name}' (ID: {new_vpg_settings_id}).")
self.commit_vpg(new_vpg_settings_id, vpg_name, sync=True, expected_status=ZertoVPGStatus.Initializing)
except requests.exceptions.RequestException as e:
if e.response is not None:
logging.error(f"HTTPError: {e.response.status_code} - {e.response.reason}")
try:
error_details = e.response.json()
logging.error(f"Error Message: {error_details.get('Message', 'No detailed error message available')}")
except ValueError:
logging.error(f"Response content: {e.response.text}")
else:
logging.error("HTTPError occurred with no response attached.")
raise
except Exception as e:
logging.error(f"Unexpected error: {e}")
raise
def failover_test(self, vpg_name, checkpoint_identifier=None, vm_name_list=None, sync=True):
"""
Initiate a failover test for a given VPG by its name.
:param vpg_name: The name of the VPG.
:param checkpoint_identifier: checkpoint_identifier can be recived by list_checkpoint, if not provided uses the latest checkpoint.
:param vm_name_list: List of
:param options: Optional parameters for the failover test.
:return: Response from the Zerto API.
"""
logging.info(f'VPGs.failover_test(zvm_address={self.client.zvm_address}, vpg_name={vpg_name}, checkpoint_identifier={checkpoint_identifier}, vm_name_list={vm_name_list}, sync={sync})')
# Retrieve the VPG identifier using the VPG name
vpg_info = self.list_vpgs(vpg_name=vpg_name)
vpg_identifier = vpg_info['VpgIdentifier']
logging.debug(f"Found VPG '{vpg_name}' with Identifier: {vpg_identifier}")
url = f"https://{self.client.zvm_address}/v1/vpgs/{vpg_identifier}/FailoverTest"
headers = {
'Content-Type': 'application/json',
'Authorization': f'Bearer {self.client.token}'
}
payload = {}
if checkpoint_identifier: payload['CheckpointIdentifier'] = checkpoint_identifier
vm_identifier_list = []
if vm_name_list:
for vm in vm_name_list:
vm_info = self.list_vms(vm_name=vm)
if not vm_info:
logging.error (f'failover_test vm={vm} not found')
return
vm_identifier_list.append(vm_info[0]['VmIdentifier'])
payload['VmIdentifiers'] = vm_identifier_list
try:
logging.info(f"Initiating failover test for VPG '{vpg_name}', payload={payload}")
response = requests.post(url, headers=headers, json=payload, verify=self.client.verify_certificate)
response.raise_for_status()
task_id = response.json()
logging.info(f"Failover test initiated for VPG {vpg_name}, task_id = {task_id}")
if sync:
# Wait for task completion
self.tasks.wait_for_task_completion(task_id, timeout=30, interval=5)
return response.json()
except requests.exceptions.RequestException as e:
if e.response is not None:
logging.error(f"HTTPError: {e.response.status_code} - {e.response.reason}")
try:
error_details = e.response.json()
logging.error(f"Error Message: {error_details.get('Message', 'No detailed error message available')}")
except ValueError:
logging.error(f"Response content: {e.response.text}")
else:
logging.error("HTTPError occurred with no response attached.")
raise
except Exception as e:
logging.error(f"Unexpected error: {e}")
raise
def stop_failover_test(self, vpg_name, failoverTestSuccess=True, failoverTestSummary=None, sync=True):
"""
Stop a failover test for a given VPG by its name.
:param vpg_name: The name of the VPG.
:param sync: wait until task is completed.
"""
logging.info(f'VPGs.stop_failover_test(zvm_address={self.client.zvm_address}, vpg_name={vpg_name}, sync={sync})')
# Retrieve the VPG identifier using the VPG name
vpg_info = self.list_vpgs(vpg_name=vpg_name)
vpg_identifier = vpg_info['VpgIdentifier']
logging.info(f"Found VPG '{vpg_name}' with Identifier: {vpg_identifier}")
url = f"https://{self.client.zvm_address}/v1/vpgs/{vpg_identifier}/FailoverTestStop"
headers = {
'Content-Type': 'application/json',
'Authorization': f'Bearer {self.client.token}'
}
body = {
"FailoverTestSuccess": failoverTestSuccess,
"FailoverTestSummary": failoverTestSummary
}
try:
logging.info(f"Stopping failover test for VPG '{vpg_name}'...")
response = requests.post(url, headers=headers, json=body, verify=self.client.verify_certificate)
response.raise_for_status()
task_id = response.json()
logging.info(f"Failover test stopping for VPG {vpg_name}, task_id = {task_id}")
if sync:
# Wait for task completion
self.tasks.wait_for_task_completion(task_id, timeout=30, interval=5)
return response.json()
except requests.exceptions.RequestException as e:
if e.response is not None:
logging.error(f"HTTPError: {e.response.status_code} - {e.response.reason}")
try:
error_details = e.response.json()
logging.error(f"Error Message: {error_details.get('Message', 'No detailed error message available')}")
except ValueError:
logging.error(f"Response content: {e.response.text}")
else:
logging.error("HTTPError occurred with no response attached.")
raise
except Exception as e:
logging.error(f"Unexpected error: {e}")
raise
def rollback_failover(self, vpg_name, sync=True):
"""
Rollback failover for a given VPG by its name.
:param vpg_name: The name of the VPG.
:param sync: wait until task is completed.
"""
logging.info(f'VPGs.rollback_failover(zvm_address={self.client.zvm_address}, vpg_name={vpg_name}, sync={sync})')
# Retrieve the VPG identifier using the VPG name
vpg_info = self.list_vpgs(vpg_name=vpg_name)
vpg_identifier = vpg_info['VpgIdentifier']
logging.info(f"Found VPG '{vpg_name}' with Identifier: {vpg_identifier}")
url = f"https://{self.client.zvm_address}/v1/vpgs/{vpg_identifier}/FailoverRollback"
headers = {
'Content-Type': 'application/json',
'Authorization': f'Bearer {self.client.token}'
}
try:
logging.info(f"Rollback failover for VPG '{vpg_name}'...")
response = requests.post(url, headers=headers, verify=self.client.verify_certificate)
response.raise_for_status()
task_id = response.json()
logging.info(f"Rollback faolover for VPG {vpg_name}, task_id = {task_id}")
if sync:
# Wait for task completion
self.tasks.wait_for_task_completion(task_id, timeout=30, interval=5)
return response.json()
except requests.exceptions.RequestException as e:
if e.response is not None:
logging.error(f"HTTPError: {e.response.status_code} - {e.response.reason}")
try:
error_details = e.response.json()
logging.error(f"Error Message: {error_details.get('Message', 'No detailed error message available')}")
except ValueError:
logging.error(f"Response content: {e.response.text}")
else:
logging.error("HTTPError occurred with no response attached.")
raise
except Exception as e:
logging.error(f"Unexpected error: {e}")
raise
def delete_vpg(self, vpg_name, force=False, keep_recovery_volumes=True):
"""
Deletes a VPG by its name.
:param vpg_name: The name of the VPG to delete.
:return: Success message if deleted, else an error message.
"""
logging.info(f"VPGs.delete_vpg(zvm_address={self.client.zvm_address}, vpg_name={vpg_name}, force={force}, keep_recovery_volumes={keep_recovery_volumes})")
# Step 1: Retrieve the VPG details using the VPG name
vpg = self.list_vpgs(vpg_name=vpg_name)
if not vpg:
logging.error(f"No VPG found with the name '{vpg_name}'.")
return
# Get the VPG Identifier
vpg_identifier = vpg.get("VpgIdentifier")
if not vpg_identifier:
logging.error(f"Could not retrieve Identifier for VPG '{vpg_name}'.")
return
# Step 2: Construct the DELETE request URL
delete_vpg_uri = f"https://{self.client.zvm_address}/v1/vpgs/{vpg_identifier}"
headers = {
'Content-Type': 'application/json',
'Authorization': f'Bearer {self.client.token}'
}
payload = {
"keepRecoveryVolumes": force,
"force": keep_recovery_volumes
}
try:
# Step 3: Send DELETE request
response = requests.delete(delete_vpg_uri, headers=headers, json=payload, verify=self.client.verify_certificate)
response.raise_for_status() # Ensure the request was successful
logging.info(f"Successfully deleted VPG '{vpg_name}' (ID: {vpg_identifier}).")
return f"VPG '{vpg_name}' deleted successfully."
except requests.exceptions.RequestException as e:
if e.response is not None:
logging.error(f"HTTPError: {e.response.status_code} - {e.response.reason}")
try:
error_details = e.response.json()
logging.error(f"Error Message: {error_details.get('Message', 'No detailed error message available')}")
except ValueError:
logging.error(f"Response content: {e.response.text}")
else:
logging.error("HTTPError occurred with no response attached.")
raise
except Exception as e:
logging.error(f"Unexpected error: {e}")
raise
# Added methods from VPGSettings
def list_vpg_settings(self):
url = f"https://{self.client.zvm_address}/v1/vpgs/settings"
headers = {
'Content-Type': 'application/json',
'Authorization': f'Bearer {self.client.token}'
}
try:
response = requests.get(url, headers=headers, verify=self.client.verify_certificate)
response.raise_for_status()
return response.json()
except requests.exceptions.RequestException as e:
logging.error(f"Failed to list VPG settings: {e}")
raise
def get_vpg_settings_by_id(self, vpg_settings_id):
url = f"https://{self.client.zvm_address}/v1/vpgSettings/{vpg_settings_id}"
headers = {
'Content-Type': 'application/json',
'Authorization': f'Bearer {self.client.token}'
}
try:
response = requests.get(url, headers=headers, verify=self.client.verify_certificate)
response.raise_for_status()
return response.json()
except requests.exceptions.RequestException as e:
logging.error(f"Failed to get VPG settings by ID: {e}")
raise
def update_vpg_settings(self, vpg_settings_id, payload):
url = f"https://{self.client.zvm_address}/v1/vpgSettings/{vpg_settings_id}"
headers = {
'Content-Type': 'application/json',
'Authorization': f'Bearer {self.client.token}'
}
logging.info(f"VPGs.update_vpg_settings: Updating VPG settings for ID: {vpg_settings_id}")
logging.debug(f"VPGs.update_vpg_settings: Payload: {json.dumps(payload, indent=4)}")
try:
response = requests.put(url, json=payload, headers=headers, verify=self.client.verify_certificate)
response.raise_for_status()
return response
except requests.exceptions.RequestException as e:
if e.response is not None:
logging.error(f"HTTPError: {e.response.status_code} - {e.response.reason}")
try:
error_details = e.response.json()
logging.error(f"Error Message: {error_details.get('Message', 'No detailed error message available')}")
except ValueError:
logging.error(f"Response content: {e.response.text}")
else:
logging.error("HTTPError occurred with no response attached.")
raise
def delete_vpg_settings(self, vpg_settings_id):
url = f"https://{self.client.zvm_address}/v1/vpgs/settings/{vpg_settings_id}"
headers = {
'Content-Type': 'application/json',
'Authorization': f'Bearer {self.client.token}'
}
try:
response = requests.delete(url, headers=headers, verify=self.client.verify_certificate)
response.raise_for_status()
return response.json()
except requests.exceptions.RequestException as e:
if e.response is not None:
logging.error(f"HTTPError: {e.response.status_code} - {e.response.reason}")
try:
error_details = e.response.json()
logging.error(f"Error Message: {error_details.get('Message', 'No detailed error message available')}")
except ValueError:
logging.error(f"Response content: {e.response.text}")
else:
logging.error("HTTPError occurred with no response attached.")
raise
def create_vpg_settings(self, basic=None, journal=None, recovery=None, networks=None, scratch=None, vpg_identifier=None):
logging.info(f'VPGs.create_vpg_settings(zvm_address={self.client.zvm_address}, vpg_identifier={vpg_identifier})')
vpg_settings_uri = f"https://{self.client.zvm_address}/v1/vpgSettings"
headers = {
'Content-Type': 'application/json',
'Authorization': f'Bearer {self.client.token}'
}
payload = {}
if vpg_identifier:
payload["vpgIdentifier"] = vpg_identifier
if basic:
payload["Basic"] = basic
if journal:
payload["Journal"] = journal
if recovery:
payload["Recovery"] = recovery
if networks:
payload["Networks"] = networks
if scratch:
payload["Scratch"] = scratch
logging.debug(f"VPGs.create_vpg_settings: Payload: {json.dumps(payload, indent=4)}")
try:
response = requests.post(vpg_settings_uri, headers=headers, json=payload, verify=self.client.verify_certificate)
response.raise_for_status()
vpg_settings_id = response.json()
logging.info(f"VPG Settings ID: {vpg_settings_id} created")
return vpg_settings_id
except requests.exceptions.RequestException as e:
if e.response is not None:
logging.error(f"HTTPError: {e.response.status_code} - {e.response.reason}")
try:
error_details = e.response.json()
logging.error(f"Error Message: {error_details.get('Message', 'No detailed error message available')}")
except ValueError:
logging.error(f"Response content: {e.response.text}")
else:
logging.error("HTTPError occurred with no response attached.")
raise
except Exception as e:
logging.error(f"Unexpected error: {e}")
raise
def list_checkpoints(self, vpg_name, start_date=None, endd_date=None, checkpoint_date_str=None, latest=None):
"""
Fetches a list of checkpoints for a specified Virtual Protection Group (VPG).
Parameters:
vpg_name (str): The name of the Virtual Protection Group (VPG) to fetch checkpoints for.
start_date (str): The start date for filtering checkpoints, in ISO 8601 format (e.g., '2024-11-13T00:00:00Z').
endd_date (str): The end date for filtering checkpoints, in ISO 8601 format (e.g., '2024-11-14T00:00:00Z').
checkpoint_date_str (str): A specific date string in the format 'Month Day, Year HH:MM:SS AM/PM'
(e.g., 'November 13, 2024 1:43:02 PM') to search for an exact checkpoint.
latest (bool): If True, returns the checkpoint with the most recent timestamp.
Returns:
dict: A single checkpoint that matches `checkpoint_date_str` or the latest checkpoint if `latest=True`.
list: The full list of checkpoints if neither `checkpoint_date_str` nor `latest` is specified.
None: If no matching checkpoint is found or an error occurs.
Raises:
SystemExit: If a request exception occurs during the API call.
"""
logging.info(f'VPGs.list_checkpoints(vpg_name={vpg_name}, start_date={start_date}, endd_date={endd_date}, checkpoint_date_str={checkpoint_date_str}, latest={latest})')
vpgid = (self.list_vpgs(vpg_name=vpg_name))['VpgIdentifier']
vpgs_uri = f"https://{self.client.zvm_address}/v1/vpgs/{vpgid}/checkpoints"
headers = {
'Content-Type': 'application/json',
'Authorization': f'Bearer {self.client.token}'
}
params = {
"startDate": start_date,
"endDate": endd_date
}
try:
response = requests.get(vpgs_uri, headers=headers, params=params, verify=self.client.verify_certificate)
response.raise_for_status()
checkpoints = response.json()
if not checkpoints:
logging.warning("No checkpoints found.")
return []
if checkpoint_date_str:
check_point_timestamp = self.__convert_datetime_to_timestamp(date_str = checkpoint_date_str)
matching_checkpoints = next((checkpoint for checkpoint in checkpoints if checkpoint.get("TimeStamp") == check_point_timestamp), None)
if not check_point_timestamp:
logging.warning(f"No checkpoint {checkpoint_date_str} found")
return {}
return matching_checkpoints
if latest:
# Find the checkpoint with the most recent timestamp
latest_checkpoint = max(checkpoints, key=lambda x: x.get("TimeStamp"))
logging.debug(f"Latest checkpoint found: {latest_checkpoint}")
return latest_checkpoint
return checkpoints
except requests.exceptions.RequestException as e:
if e.response is not None:
logging.error(f"HTTPError: {e.response.status_code} - {e.response.reason}")
try:
error_details = e.response.json()
logging.error(f"Error Message: {error_details.get('Message', 'No detailed error message available')}")
except ValueError:
logging.error(f"Response content: {e.response.text}")
else:
logging.error("HTTPError occurred with no response attached.")
raise
def create_checkpoint(self, checkpoint_name: str, vpg_identifier: str = None, vpg_name: str = None) -> str:
"""
Create a tagged checkpoint for the VPG.
Args:
checkpoint_name: The name/tag to assign to the checkpoint
vpg_identifier: The identifier of the VPG
vpg_name: The name of the VPG (alternative to vpg_identifier)
Returns:
str: The task identifier that can be used to monitor the operation
Raises:
requests.exceptions.RequestException: If the API request fails
ValueError: If neither vpg_identifier nor vpg_name is provided, or if VPG name is not found
"""
if not vpg_identifier and not vpg_name:
raise ValueError("Either vpg_identifier or vpg_name must be provided")
# If vpg_name is provided, get the vpg_identifier
if vpg_name and not vpg_identifier:
vpg = self.list_vpgs(vpg_name=vpg_name)
if not vpg:
raise ValueError(f"VPG with name '{vpg_name}' not found")
vpg_identifier = vpg.get('VpgIdentifier')
logging.info(f"Found VPG identifier '{vpg_identifier}' for VPG name '{vpg_name}'")
url = f"https://{self.client.zvm_address}/v1/vpgs/{vpg_identifier}/checkpoints"
headers = {
'Content-Type': 'application/json',
'Authorization': f'Bearer {self.client.token}'
}
data = {
"CheckpointName": checkpoint_name
}
logging.info(f"VPGs.create_checkpoint: Creating checkpoint '{checkpoint_name}' for VPG {vpg_identifier}")
try:
response = requests.post(
url,
headers=headers,
json=data,
verify=self.client.verify_certificate,
timeout=30
)
response.raise_for_status()
task_id = response.json()
logging.info(f"Successfully initiated checkpoint creation, task_id={task_id}")
return task_id
except requests.exceptions.RequestException as e:
if e.response is not None:
logging.error(f"HTTPError: {e.response.status_code} - {e.response.reason}")
try:
error_details = e.response.json()
logging.error(f"Error Message: {error_details.get('Message', 'No detailed error message available')}")
except ValueError:
logging.error(f"Response content: {e.response.text}")
else:
logging.error("HTTPError occurred with no response attached.")
raise
def export_vpg_settings(self, vpg_names: List[str] = None) -> dict:
"""
Export settings for specified VPGs.
Args:
vpg_names: Optional list of VPG names to export settings for. If not provided, exports all.
Returns:
dict: The exported VPG settings in the format:
{
"timeStamp": "2025-02-08T21:50:46.574Z",
"exportResult": {
"result": str,
"message": str
}
}
Raises:
requests.exceptions.RequestException: If the API request fails
"""
url = f"https://{self.client.zvm_address}/v1/vpgSettings/exportSettings"
headers = {
'Content-Type': 'application/json',
'Authorization': f'Bearer {self.client.token}'
}
if vpg_names:
payload = {
"vpgNames": vpg_names
}
else:
payload = {}
logging.info(f"VPGs.export_vpg_settings: Exporting settings for VPGs: {vpg_names if vpg_names else 'ALL'}")
try:
response = requests.post(url, headers=headers, json=payload, verify=self.client.verify_certificate)
response.raise_for_status()
result = response.json()
logging.info(f"Successfully exported settings for {len(vpg_names) if vpg_names else 'ALL'} VPGs at {result.get('timeStamp')}")
logging.debug(f"Export result: {json.dumps(result, indent=2)}")
return result
except requests.exceptions.RequestException as e:
if e.response is not None:
logging.error(f"HTTPError: {e.response.status_code} - {e.response.reason}")
try:
error_details = e.response.json()
logging.error(f"Error Message: {error_details.get('Message', 'No detailed error message available')}")
except ValueError:
logging.error(f"Response content: {e.response.text}")
else:
logging.error("HTTPError occurred with no response attached.")
raise
def list_exported_vpg_settings(self) -> List[Dict]:
"""
Get all available exported settings files.
Returns:
List[Dict]: List of exported settings files in the format:
[
{
"timeStamp": "2025-02-08T22:02:18.685Z"
}
]
Raises:
requests.exceptions.RequestException: If the API request fails
"""
url = f"https://{self.client.zvm_address}/v1/vpgSettings/exportedSettings"
headers = {
'Content-Type': 'application/json',
'Authorization': f'Bearer {self.client.token}'
}
logging.debug("Fetching list of exported VPG settings")
try:
response = requests.get(url, headers=headers, verify=self.client.verify_certificate)
response.raise_for_status()
result = response.json()
logging.info(f"Found {len(result)} exported settings files")
logging.debug(f"Exported settings list: {json.dumps(result, indent=2)}")
return result
except requests.exceptions.RequestException as e:
if e.response is not None:
logging.error(f"HTTPError: {e.response.status_code} - {e.response.reason}")
try:
error_details = e.response.json()
logging.error(f"Error Message: {error_details.get('Message', 'No detailed error message available')}")
except ValueError:
logging.error(f"Response content: {e.response.text}")
else:
logging.error("HTTPError occurred with no response attached.")
raise
def read_exported_vpg_settings(self, timestamp: str, vpg_names: List[str] = None) -> dict:
"""
Read exported settings from a file of given timestamp.
Args:
timestamp: The timestamp of the exported settings file (format: YYYY-MM-DDThh:mm:ss.SSSZ)
vpg_names: Optional list of VPG names to filter the exported settings
Returns:
dict: The exported VPG settings containing:
- ExportedVpgSettingsApi: List[dict] - List of VPG settings
- ErrorMessage: str - Error message if any
Raises:
requests.exceptions.RequestException: If the API request fails
"""
url = f"https://{self.client.zvm_address}/v1/vpgSettings/exportedSettings/{timestamp}"
headers = {
'Content-Type': 'application/json',
'Authorization': f'Bearer {self.client.token}'
}
payload = {}
if vpg_names:
payload['vpgNames'] = vpg_names
logging.info(f"VPGs.read_exported_vpg_settings: Reading exported VPG settings for timestamp: {timestamp}")
if vpg_names:
logging.debug(f"Filtering for VPGs: {vpg_names}")
try:
response = requests.post(url, headers=headers, json=payload, verify=self.client.verify_certificate)
response.raise_for_status()
result = response.json()
logging.debug(f"VPGs.read_exported_vpg_settings: result: {json.dumps(result, indent=4)}")
return result
except requests.exceptions.RequestException as e:
if e.response is not None:
logging.error(f"HTTPError: {e.response.status_code} - {e.response.reason}")
try:
error_details = e.response.json()
logging.error(f"Error Message: {error_details.get('Message', 'No detailed error message available')}")
except ValueError:
logging.error(f"Response content: {e.response.text}")
else:
logging.error("HTTPError occurred with no response attached.")
raise
def import_vpg_settings(self, settings: Dict) -> dict:
"""
Import VPG settings.
Args:
settings: Dictionary containing the VPG settings to import. Must include:
- ExportedVpgSettingsApi: List of VPG settings with detailed configuration
Returns:
dict: The import result containing:
- validationFailedResults: List[dict] - VPGs that failed validation
- vpgName: str - Name of the VPG
- errorMessages: List[str] - List of validation error messages
- importFailedResults: List[dict] - VPGs that failed to import
- vpgName: str - Name of the VPG
- errorMessage: str - Import error message
- importTaskIdentifiers: List[dict] - Successfully initiated imports
- vpgName: str - Name of the VPG
- taskIdentifier: str - Task ID for tracking the import
Raises:
requests.exceptions.RequestException: If the API request fails
ValueError: If settings dictionary is missing required fields
"""
url = f"https://{self.client.zvm_address}/v1/vpgSettings/import"
headers = {
'Content-Type': 'application/json',
'Authorization': f'Bearer {self.client.token}'
}
# Validate input settings
if not isinstance(settings, dict):
raise ValueError("Settings must be a dictionary")
if 'ExportedVpgSettingsApi' not in settings:
raise ValueError("Settings must contain 'ExportedVpgSettingsApi' key")
# Prepare payload
payload = {
"ExportedVpgSettingsApi": settings['ExportedVpgSettingsApi']
}
logging.info(f"VPGs.import_vpg_settings: Importing settings for {len(settings['ExportedVpgSettingsApi'])} VPGs")
try:
response = requests.post(url, headers=headers, json=payload, verify=self.client.verify_certificate)
response.raise_for_status()
result = response.json()
logging.debug(f"VPGs.import_vpg_settings: result: {json.dumps(result, indent=4)}")
# Log validation failures
if result.get('validationFailedResults'):
for failure in result['validationFailedResults']:
logging.error(f"Validation failed for VPG '{failure['vpgName']}': {', '.join(failure['errorMessages'])}")
# Log import failures
if result.get('importFailedResults'):
for failure in result['importFailedResults']:
logging.error(f"Import failed for VPG '{failure['vpgName']}': {failure['errorMessage']}")
# Log successful imports
if result.get('importTaskIdentifiers'):
for task in result['importTaskIdentifiers']:
logging.info(f"Import initiated for VPG '{task['vpgName']}' with task ID: {task['taskIdentifier']}")
logging.debug(f"Import result: {json.dumps(result, indent=2)}")
return result
except requests.exceptions.RequestException as e:
if e.response is not None:
logging.error(f"HTTPError: {e.response.status_code} - {e.response.reason}")
try:
error_details = e.response.json()
logging.error(f"Error Message: {error_details.get('Message', 'No detailed error message available')}")
except ValueError:
logging.error(f"Response content: {e.response.text}")
else:
logging.error("HTTPError occurred with no response attached.")
raise
def list_vpgs_from_exported_settings(self, time_stamp: str) -> List[Dict]:
"""List VPGs from exported settings.
Args:
time_stamp: Timestamp of the exported settings
Returns:
List[Dict]: List of VPGs in the format:
[
{
"VpgName": "test",
"SourceSiteName": "Production",
"TargetSiteName": "DR"
}
]
"""
url = f"https://{self.client.zvm_address}/v1/vpgSettings/exportedSettings/{time_stamp}/vpgsinfo"
headers = {
'Content-Type': 'application/json',
'Authorization': f'Bearer {self.client.token}'
}
logging.info(f"VPGs.list_vpgs_from_exported_settings: Fetching VPGs from export {time_stamp}")
try:
response = requests.get(url, headers=headers, verify=self.client.verify_certificate)
response.raise_for_status()
result = response.json()
logging.info(f"Successfully retrieved {len(result)} VPGs from export {time_stamp}")
logging.debug(f"VPGs from export: {json.dumps(result, indent=2)}")
return result
except requests.exceptions.RequestException as e:
if e.response is not None:
logging.error(f"HTTPError: {e.response.status_code} - {e.response.reason}")
try:
error_details = e.response.json()
logging.error(f"Error Message: {error_details.get('Message', 'No detailed error message available')}")
except ValueError:
logging.error(f"Response content: {e.response.text}")
else:
logging.error("HTTPError occurred with no response attached.")
raise