1041 lines
47 KiB
Python
1041 lines
47 KiB
Python
# Legal Disclaimer
|
|
# This script is an example script and is not supported under any Zerto support program or service.
|
|
# The author and Zerto further disclaim all implied warranties including, without limitation,
|
|
# any implied warranties of merchantability or of fitness for a particular purpose.
|
|
# In no event shall Zerto, its authors or anyone else involved in the creation,
|
|
# production or delivery of the scripts be liable for any damages whatsoever (including,
|
|
# without limitation, damages for loss of business profits, business interruption, loss of business
|
|
# information, or other pecuniary loss) arising out of the use of or the inability to use the sample
|
|
# scripts or documentation, even if the author or Zerto has been advised of the possibility of such damages.
|
|
# The entire risk arising out of the use or performance of the sample scripts and documentation remains with you.
|
|
|
|
import requests
|
|
import logging
|
|
import time
|
|
import json
|
|
from .tasks import Tasks
|
|
from .common import ZertoVPGStatus, ZertoVPGSubstatus, ZertoProtectedSiteType, ZertoRecoverySiteType, ZertoVPGPriority
|
|
from typing import Optional, Union, Dict, List
|
|
|
|
class VPGs:
|
|
def __init__(self, client):
    """Bind this helper to an API client.

    Args:
        client: Client object; it is stored on the instance and also
            handed to ``Tasks`` so task polling shares the same client.
    """
    task_helper = Tasks(client)
    self.client = client
    self.tasks = task_helper
|
|
|
|
def list_vpgs(self,
              vpg_name: str = None,
              vpg_identifier: str = None,
              status: ZertoVPGStatus = None,
              sub_status: ZertoVPGSubstatus = None,
              protected_site_type: ZertoProtectedSiteType = None,
              recovery_site_type: ZertoRecoverySiteType = None,
              protected_site_identifier: str = None,
              recovery_site_identifier: str = None,
              organization_name: str = None,
              zorg_identifier: str = None,
              priority: ZertoVPGPriority = None,
              service_profile_identifier: str = None,
              backup_enabled: bool = None) -> Dict | List[Dict]:
    """
    Fetch one VPG or a filtered list of VPGs.

    With ``vpg_identifier`` the request targets that VPG directly and no
    query filters are sent. Otherwise every non-None filter is sent as a
    query parameter; if ``vpg_name`` is given and the response is a list,
    the first entry whose ``VpgName`` equals it is returned.

    Args:
        vpg_name: Get a specific VPG by name
        vpg_identifier: Get a specific VPG by identifier
        status: Filter by VPG status
        sub_status: Filter by VPG sub-status
        protected_site_type: The protected site type
        recovery_site_type: The recovery site type
        protected_site_identifier: The identifier of the protected site
        recovery_site_identifier: The identifier of the recovery site
        organization_name: Filter by ZORG name
        zorg_identifier: Filter by ZORG identifier
        priority: Filter by VPG priority
        service_profile_identifier: Filter by service profile ID
        backup_enabled: Deprecated parameter

    Returns:
        Dict: The VPG when looked up by identifier or name; an empty dict
            when a name lookup finds no match.
        List[Dict]: The decoded response when only filters were used.

    Raises:
        requests.exceptions.RequestException: Re-raised after logging when
            the request fails.
    """
    def _enum_name(member):
        # Enum filters are sent by name, looked up from the member's value.
        return member.get_name_by_value(member.value) if member else None

    url = f"https://{self.client.zvm_address}/v1/vpgs"
    if vpg_identifier:
        url = f"https://{self.client.zvm_address}/v1/vpgs/{vpg_identifier}"

    headers = {
        'Content-Type': 'application/json',
        'Authorization': f'Bearer {self.client.token}'
    }

    # Query filters only apply to the collection endpoint.
    params = {}
    if not vpg_identifier:
        candidates = (
            ('name', vpg_name),
            ('status', _enum_name(status)),
            ('subStatus', _enum_name(sub_status)),
            ('protectedSiteType', _enum_name(protected_site_type)),
            ('recoverySiteType', _enum_name(recovery_site_type)),
            ('protectedSiteIdentifier', protected_site_identifier),
            ('recoverySiteIdentifier', recovery_site_identifier),
            ('organizationName', organization_name),
            ('zorgIdentifier', zorg_identifier),
            ('priority', _enum_name(priority)),
            ('serviceProfileIdentifier', service_profile_identifier),
            ('backupEnabled', backup_enabled),
        )
        params = {name: value for name, value in candidates if value is not None}

    logging.info("VPGs.list_vpgs: Fetching VPGs with parameters:")
    if vpg_identifier:
        logging.info(f" vpg_identifier: {vpg_identifier}")
    for name, value in params.items():
        logging.info(f" {name}: {value}")

    try:
        response = requests.get(
            url,
            headers=headers,
            params=params,
            verify=self.client.verify_certificate,
            timeout=30
        )
        response.raise_for_status()
        result = response.json()

        # Name lookup: pick the first exact VpgName match from the list.
        if vpg_name and isinstance(result, list):
            for candidate in result:
                if candidate.get("VpgName") == vpg_name:
                    logging.info(f"Successfully retrieved VPG details for {vpg_name}")
                    return candidate
            logging.warning(f"No VPG found with name {vpg_name}")
            return {}

        if vpg_identifier:
            logging.info(f"Successfully retrieved VPG details for {vpg_identifier}")
        else:
            logging.info(f"Successfully retrieved {len(result)} VPGs")
        return result

    except requests.exceptions.RequestException as err:
        failed = err.response
        if failed is None:
            logging.error("HTTPError occurred with no response attached.")
        else:
            logging.error(f"HTTPError: {failed.status_code} - {failed.reason}")
            try:
                details = failed.json()
                logging.error(f"Error Message: {details.get('Message', 'No detailed error message available')}")
            except ValueError:
                # Body was not JSON; log it verbatim instead.
                logging.error(f"Response content: {failed.text}")
        raise
|
|
|
|
def commit_vpg(self, vpg_settings_id, vpg_name, sync=False, expected_status=ZertoVPGStatus.Initializing, timeout=30, interval=5):
    """
    Commit a VPG settings object so the VPG is created or updated.

    Args:
        vpg_settings_id: Identifier of the VPG settings object to commit.
        vpg_name: Name of the VPG; used for logging and for the status wait.
        sync: When True, wait for the returned task to complete and then
            for the VPG to reach ``expected_status``.
        expected_status: Status passed to ``wait_for_vpg_ready``.
        timeout: Seconds allowed for each wait phase (task completion and
            VPG status).
        interval: Polling interval in seconds for both wait phases.

    Returns:
        The decoded JSON body of the commit response, used as the task id.

    Raises:
        requests.exceptions.RequestException: Re-raised after logging when
            the commit request fails.
        Exception: Any other error (including a wait timeout) is logged and
            re-raised.
    """
    logging.info(f'VPGs.commit_vpg(zvm_address={self.client.zvm_address}, vpg_settings_id={vpg_settings_id}, vpg_name={vpg_name}, sync={sync})')
    commit_uri = f"https://{self.client.zvm_address}/v1/vpgSettings/{vpg_settings_id}/commit"
    headers = {
        'Content-Type': 'application/json',
        'Authorization': f'Bearer {self.client.token}'
    }

    try:
        response = requests.post(commit_uri, headers=headers, verify=self.client.verify_certificate)
        response.raise_for_status()
        task_id = response.json()
        logging.info(f"VPGSettings {vpg_settings_id} successfully committed, {vpg_name} is created, task_id={task_id}")

        if sync:
            # Wait for task completion
            self.tasks.wait_for_task_completion(task_id, timeout=timeout, interval=interval)
            # Honour the caller's timeout/interval here as well; the defaults
            # (30 / 5) match the values that used to be hard-coded.
            self.wait_for_vpg_ready(vpg_name=vpg_name, timeout=timeout, interval=interval, expected_status=expected_status)

        return task_id

    except requests.exceptions.RequestException as e:
        if e.response is not None:
            logging.error(f"HTTPError: {e.response.status_code} - {e.response.reason}")
            try:
                error_details = e.response.json()
                logging.error(f"Error Message: {error_details.get('Message', 'No detailed error message available')}")
            except ValueError:
                # Body was not JSON; log it verbatim instead.
                logging.error(f"Response content: {e.response.text}")
        else:
            logging.error("HTTPError occurred with no response attached.")
        raise

    except Exception as e:
        logging.error(f"Unexpected error: {e}")
        raise
|
|
|
|
def create_vpg(self, basic, journal, recovery, networks, sync=True, status: ZertoVPGStatus = ZertoVPGStatus.Initializing, timeout=30, interval=5):
    """
    Create a VPG by building a settings object and committing it.

    Args:
        basic: Mapping placed under ``Basic`` in the settings payload; its
            ``Name`` entry is used as the VPG name.
        journal: Value placed under ``Journal`` in the settings payload.
        recovery: Value placed under ``Recovery`` in the settings payload.
        networks: Value placed under ``Networks`` in the settings payload.
        sync: Forwarded to ``commit_vpg``; wait for completion when True.
        status: Expected VPG status forwarded to ``commit_vpg``.
        timeout: Wait timeout in seconds forwarded to ``commit_vpg``.
        interval: Polling interval in seconds forwarded to ``commit_vpg``.

    Returns:
        Whatever ``commit_vpg`` returns (the task id).
    """
    vpg_name = basic.get("Name")
    logging.info(f'VPGs.create_vpg(zvm_address={self.client.zvm_address}, vpg_name={vpg_name}, sync={sync})')

    # No vpg_identifier: this is a brand-new settings object.
    settings_id = self.create_vpg_settings(
        basic=basic,
        journal=journal,
        recovery=recovery,
        networks=networks,
        vpg_identifier=None,
    )
    return self.commit_vpg(
        settings_id,
        vpg_name,
        sync,
        expected_status=status,
        timeout=timeout,
        interval=interval,
    )
|
|
|
|
def wait_for_vpg_ready(self, vpg_name, timeout=180, interval=5, expected_status=ZertoVPGStatus.Initializing):
    """
    Poll a VPG by name until it reaches the expected status.

    The loop sleeps ``interval`` seconds before every poll. When waiting for
    ``Initializing``, any status whose value is greater than ``Initializing``
    also counts as success, because the VPG may pass through that state
    between polls.

    Args:
        vpg_name: Name of the VPG to poll.
        timeout: Maximum number of seconds to keep polling.
        interval: Seconds to sleep before each poll.
        expected_status: ``ZertoVPGStatus`` member to wait for.

    Returns:
        The VPG dictionary from ``list_vpgs`` once the status matches.

    Raises:
        TimeoutError: If the status is not reached within ``timeout``.
    """
    expected_name = ZertoVPGStatus.get_name_by_value(expected_status.value)
    logging.debug(f'VPGs.wait_for_vpg_ready(zvm_address={self.client.zvm_address}, vpg_name={vpg_name}, timeout={timeout}, interval={interval}, expected_status={expected_name})')
    start_time = time.time()

    while True:
        time.sleep(interval)
        vpg_info = self.list_vpgs(vpg_name=vpg_name)

        # list_vpgs returns {} while the VPG is not listed yet; treat that as
        # "not ready" and keep polling rather than building the enum from None.
        raw_status = vpg_info.get("Status") if vpg_info else None
        logging.debug(f"VPG status: {raw_status}")

        if raw_status is None:
            current_name = "Unknown (VPG not found)"
            logging.debug(f"Checking VPG status for {vpg_name}: Expected status = {expected_name}, Current status = {current_name}")
        else:
            # get status and convert it into the enum
            vpg_status: ZertoVPGStatus = ZertoVPGStatus(raw_status)
            current_name = ZertoVPGStatus.get_name_by_value(vpg_status.value)
            logging.debug(f"Checking VPG status for {vpg_name}: Expected status = {expected_name}, Current status = {current_name}")

            # If VPG is in the expected status or passed the Initializing status too quickly and is in another status
            passed_initializing = (
                expected_status == ZertoVPGStatus.Initializing
                and vpg_status.value > ZertoVPGStatus.Initializing.value
            )
            if vpg_status == expected_status or passed_initializing:
                logging.info(f"VPG {vpg_name} is now in the expected state: {current_name}")
                return vpg_info

        # Check if the timeout has been reached
        elapsed_time = time.time() - start_time
        if elapsed_time > timeout:
            raise TimeoutError(f"VPG {vpg_name} did not reach the {expected_name} state within the allotted time. Current status: {current_name}")
|
|
|
|
def add_vm_to_vpg(self, vpg_name, vm_list_payload):
    """
    Add VMs to an existing VPG and commit the change.

    A settings object is created from the existing VPG, the VM payload is
    posted to it, and the settings are committed synchronously.

    Args:
        vpg_name: Name of the VPG to modify.
        vm_list_payload: JSON-serialisable body posted to the settings
            object's ``vms`` endpoint.

    Returns:
        None. If the VPG is not found, an error is logged and nothing is
        changed.

    Raises:
        requests.exceptions.RequestException: Re-raised after logging when
            the request fails.
        Exception: Any other error is logged and re-raised.
    """
    logging.info(f'VPGs.add_vm_to_vpg(zvm_address={self.client.zvm_address}, vpg_name={vpg_name})')

    existing = self.list_vpgs(vpg_name=vpg_name)
    if not existing:
        logging.error(f"VPG with name '{vpg_name}' not found.")
        return

    vpg_identifier = existing['VpgIdentifier']
    logging.info(f"Found VPG '{vpg_name}' with Identifier: {vpg_identifier}")

    # Settings object derived from the existing VPG; edits go through it.
    settings_id = self.create_vpg_settings(basic=None, journal=None, recovery=None, networks=None, vpg_identifier=vpg_identifier)

    logging.info(f"Adding VMs to VPGSettings ID: {settings_id}")
    logging.debug(f"VM List Payload: {json.dumps(vm_list_payload, indent=4)}")

    request_headers = {
        'Content-Type': 'application/json',
        'Authorization': f'Bearer {self.client.token}'
    }
    endpoint = f"https://{self.client.zvm_address}/v1/vpgSettings/{settings_id}/vms"

    try:
        reply = requests.post(endpoint, headers=request_headers, json=vm_list_payload, verify=self.client.verify_certificate)
        reply.raise_for_status()
        logging.info(f"Successfully added VMs to VPG {settings_id}.")
        self.commit_vpg(settings_id, vpg_name, sync=True, expected_status=ZertoVPGStatus.Initializing)
        return

    except requests.exceptions.RequestException as err:
        failed = err.response
        if failed is None:
            logging.error("HTTPError occurred with no response attached.")
        else:
            logging.error(f"HTTPError: {failed.status_code} - {failed.reason}")
            try:
                details = failed.json()
                logging.error(f"Error Message: {details.get('Message', 'No detailed error message available')}")
            except ValueError:
                # Body was not JSON; log it verbatim instead.
                logging.error(f"Response content: {failed.text}")
        raise

    except Exception as err:
        logging.error(f"Unexpected error: {err}")
        raise
|
|
|
|
def remove_vm_from_vpg(self, vpg_name, vm_identifier):
    """
    Remove one VM from an existing VPG and commit the change.

    A settings object is created from the existing VPG, the VM is deleted
    from it, and the settings are committed synchronously.

    Args:
        vpg_name: Name of the VPG to modify.
        vm_identifier: Identifier of the VM to remove.

    Returns:
        None. If the VPG is not found, an error is logged and nothing is
        changed.

    Raises:
        requests.exceptions.RequestException: Re-raised after logging when
            the request fails.
        Exception: Any other error is logged and re-raised.
    """
    logging.info(f'VPGs.remove_vm_from_vpg(zvm_address={self.client.zvm_address}, vpg_name={vpg_name}, vm_identifier={vm_identifier})')

    existing = self.list_vpgs(vpg_name=vpg_name)
    if not existing:
        logging.error(f"VPG with name '{vpg_name}' not found.")
        return

    vpg_id = existing['VpgIdentifier']
    logging.info(f"Found VPG '{vpg_name}' with Identifier: {vpg_id}")

    # Settings object derived from the existing VPG; edits go through it.
    settings_id = self.create_vpg_settings(basic=None, journal=None, recovery=None, networks=None, vpg_identifier=vpg_id)
    endpoint = f"https://{self.client.zvm_address}/v1/vpgSettings/{settings_id}/vms/{vm_identifier}"
    request_headers = {
        'Content-Type': 'application/json',
        'Authorization': f'Bearer {self.client.token}'
    }

    try:
        reply = requests.delete(endpoint, headers=request_headers, verify=self.client.verify_certificate)
        reply.raise_for_status()
        logging.info(f"VM {vm_identifier} successfully removed from VPG '{vpg_name}' (ID: {settings_id}).")
        self.commit_vpg(settings_id, vpg_name, sync=True, expected_status=ZertoVPGStatus.Initializing)

    except requests.exceptions.RequestException as err:
        failed = err.response
        if failed is None:
            logging.error("HTTPError occurred with no response attached.")
        else:
            logging.error(f"HTTPError: {failed.status_code} - {failed.reason}")
            try:
                details = failed.json()
                logging.error(f"Error Message: {details.get('Message', 'No detailed error message available')}")
            except ValueError:
                # Body was not JSON; log it verbatim instead.
                logging.error(f"Response content: {failed.text}")
        raise

    except Exception as err:
        logging.error(f"Unexpected error: {err}")
        raise
|
|
|
|
def failover_test(self, vpg_name, checkpoint_identifier=None, vm_name_list=None, sync=True):
    """
    Initiate a failover test for a given VPG by its name.

    :param vpg_name: The name of the VPG.
    :param checkpoint_identifier: Checkpoint to test against (for example
        one obtained from list_checkpoints). Omitted from the request when
        not provided.
    :param vm_name_list: Optional list of VM names to include; each name is
        resolved to its identifier through ``self.list_vms``. If any name
        cannot be resolved, an error is logged and None is returned without
        starting the test.
    :param sync: When True, wait for the returned task to complete.
    :return: The decoded JSON body of the response (the task id).
    """
    logging.info(f'VPGs.failover_test(zvm_address={self.client.zvm_address}, vpg_name={vpg_name}, checkpoint_identifier={checkpoint_identifier}, vm_name_list={vm_name_list}, sync={sync})')

    # Retrieve the VPG identifier using the VPG name
    vpg_identifier = self.list_vpgs(vpg_name=vpg_name)['VpgIdentifier']
    logging.debug(f"Found VPG '{vpg_name}' with Identifier: {vpg_identifier}")

    url = f"https://{self.client.zvm_address}/v1/vpgs/{vpg_identifier}/FailoverTest"
    headers = {
        'Content-Type': 'application/json',
        'Authorization': f'Bearer {self.client.token}'
    }

    payload = {}
    if checkpoint_identifier:
        payload['CheckpointIdentifier'] = checkpoint_identifier

    if vm_name_list:
        resolved_ids = []
        for vm_name in vm_name_list:
            matches = self.list_vms(vm_name=vm_name)
            if not matches:
                logging.error(f'failover_test vm={vm_name} not found')
                return
            # The first lookup result supplies the identifier.
            resolved_ids.append(matches[0]['VmIdentifier'])
        payload['VmIdentifiers'] = resolved_ids

    try:
        logging.info(f"Initiating failover test for VPG '{vpg_name}', payload={payload}")
        response = requests.post(url, headers=headers, json=payload, verify=self.client.verify_certificate)
        response.raise_for_status()
        task_id = response.json()

        logging.info(f"Failover test initiated for VPG {vpg_name}, task_id = {task_id}")

        if sync:
            # Wait for task completion
            self.tasks.wait_for_task_completion(task_id, timeout=30, interval=5)
        return response.json()

    except requests.exceptions.RequestException as err:
        failed = err.response
        if failed is None:
            logging.error("HTTPError occurred with no response attached.")
        else:
            logging.error(f"HTTPError: {failed.status_code} - {failed.reason}")
            try:
                details = failed.json()
                logging.error(f"Error Message: {details.get('Message', 'No detailed error message available')}")
            except ValueError:
                # Body was not JSON; log it verbatim instead.
                logging.error(f"Response content: {failed.text}")
        raise

    except Exception as err:
        logging.error(f"Unexpected error: {err}")
        raise
|
|
|
|
def stop_failover_test(self, vpg_name, failoverTestSuccess=True, failoverTestSummary=None, sync=True):
    """
    Stop a failover test for a given VPG by its name.

    :param vpg_name: The name of the VPG.
    :param failoverTestSuccess: Sent as ``FailoverTestSuccess`` in the body.
    :param failoverTestSummary: Sent as ``FailoverTestSummary`` in the body.
    :param sync: wait until task is completed.
    :return: The decoded JSON body of the response (the task id).
    """
    logging.info(f'VPGs.stop_failover_test(zvm_address={self.client.zvm_address}, vpg_name={vpg_name}, sync={sync})')

    # Retrieve the VPG identifier using the VPG name
    vpg_identifier = self.list_vpgs(vpg_name=vpg_name)['VpgIdentifier']
    logging.info(f"Found VPG '{vpg_name}' with Identifier: {vpg_identifier}")

    endpoint = f"https://{self.client.zvm_address}/v1/vpgs/{vpg_identifier}/FailoverTestStop"
    request_headers = {
        'Content-Type': 'application/json',
        'Authorization': f'Bearer {self.client.token}'
    }
    request_body = {
        "FailoverTestSuccess": failoverTestSuccess,
        "FailoverTestSummary": failoverTestSummary
    }

    try:
        logging.info(f"Stopping failover test for VPG '{vpg_name}'...")
        reply = requests.post(endpoint, headers=request_headers, json=request_body, verify=self.client.verify_certificate)
        reply.raise_for_status()
        task_id = reply.json()

        logging.info(f"Failover test stopping for VPG {vpg_name}, task_id = {task_id}")

        if sync:
            # Wait for task completion
            self.tasks.wait_for_task_completion(task_id, timeout=30, interval=5)
        return reply.json()

    except requests.exceptions.RequestException as err:
        failed = err.response
        if failed is None:
            logging.error("HTTPError occurred with no response attached.")
        else:
            logging.error(f"HTTPError: {failed.status_code} - {failed.reason}")
            try:
                details = failed.json()
                logging.error(f"Error Message: {details.get('Message', 'No detailed error message available')}")
            except ValueError:
                # Body was not JSON; log it verbatim instead.
                logging.error(f"Response content: {failed.text}")
        raise

    except Exception as err:
        logging.error(f"Unexpected error: {err}")
        raise
|
|
|
|
def rollback_failover(self, vpg_name, sync=True):
    """
    Rollback failover for a given VPG by its name.

    :param vpg_name: The name of the VPG.
    :param sync: wait until task is completed.
    :return: The decoded JSON body of the response (the task id).
    """
    logging.info(f'VPGs.rollback_failover(zvm_address={self.client.zvm_address}, vpg_name={vpg_name}, sync={sync})')

    # Retrieve the VPG identifier using the VPG name
    vpg_identifier = self.list_vpgs(vpg_name=vpg_name)['VpgIdentifier']
    logging.info(f"Found VPG '{vpg_name}' with Identifier: {vpg_identifier}")

    endpoint = f"https://{self.client.zvm_address}/v1/vpgs/{vpg_identifier}/FailoverRollback"
    request_headers = {
        'Content-Type': 'application/json',
        'Authorization': f'Bearer {self.client.token}'
    }

    try:
        logging.info(f"Rollback failover for VPG '{vpg_name}'...")
        reply = requests.post(endpoint, headers=request_headers, verify=self.client.verify_certificate)
        reply.raise_for_status()
        task_id = reply.json()

        logging.info(f"Rollback faolover for VPG {vpg_name}, task_id = {task_id}")

        if sync:
            # Wait for task completion
            self.tasks.wait_for_task_completion(task_id, timeout=30, interval=5)
        return reply.json()

    except requests.exceptions.RequestException as err:
        failed = err.response
        if failed is None:
            logging.error("HTTPError occurred with no response attached.")
        else:
            logging.error(f"HTTPError: {failed.status_code} - {failed.reason}")
            try:
                details = failed.json()
                logging.error(f"Error Message: {details.get('Message', 'No detailed error message available')}")
            except ValueError:
                # Body was not JSON; log it verbatim instead.
                logging.error(f"Response content: {failed.text}")
        raise

    except Exception as err:
        logging.error(f"Unexpected error: {err}")
        raise
|
|
|
|
def delete_vpg(self, vpg_name, force=False, keep_recovery_volumes=True):
    """
    Deletes a VPG by its name.

    :param vpg_name: The name of the VPG to delete.
    :param force: Sent as ``force`` in the request body.
    :param keep_recovery_volumes: Sent as ``keepRecoveryVolumes`` in the
        request body.
    :return: Success message if deleted. None (after logging an error) when
        the VPG or its identifier cannot be found.
    :raises requests.exceptions.RequestException: Re-raised after logging
        when the request fails.
    """
    logging.info(f"VPGs.delete_vpg(zvm_address={self.client.zvm_address}, vpg_name={vpg_name}, force={force}, keep_recovery_volumes={keep_recovery_volumes})")

    # Step 1: Retrieve the VPG details using the VPG name
    vpg = self.list_vpgs(vpg_name=vpg_name)

    if not vpg:
        logging.error(f"No VPG found with the name '{vpg_name}'.")
        return

    # Get the VPG Identifier
    vpg_identifier = vpg.get("VpgIdentifier")
    if not vpg_identifier:
        logging.error(f"Could not retrieve Identifier for VPG '{vpg_name}'.")
        return

    # Step 2: Construct the DELETE request URL
    delete_vpg_uri = f"https://{self.client.zvm_address}/v1/vpgs/{vpg_identifier}"

    headers = {
        'Content-Type': 'application/json',
        'Authorization': f'Bearer {self.client.token}'
    }

    # Each flag goes under its own key (the two values were previously
    # swapped, so force=True silently became keepRecoveryVolumes=True).
    payload = {
        "keepRecoveryVolumes": keep_recovery_volumes,
        "force": force
    }

    try:
        # Step 3: Send DELETE request
        response = requests.delete(delete_vpg_uri, headers=headers, json=payload, verify=self.client.verify_certificate)

        response.raise_for_status()  # Ensure the request was successful
        logging.info(f"Successfully deleted VPG '{vpg_name}' (ID: {vpg_identifier}).")
        return f"VPG '{vpg_name}' deleted successfully."

    except requests.exceptions.RequestException as e:
        if e.response is not None:
            logging.error(f"HTTPError: {e.response.status_code} - {e.response.reason}")
            try:
                error_details = e.response.json()
                logging.error(f"Error Message: {error_details.get('Message', 'No detailed error message available')}")
            except ValueError:
                # Body was not JSON; log it verbatim instead.
                logging.error(f"Response content: {e.response.text}")
        else:
            logging.error("HTTPError occurred with no response attached.")
        raise

    except Exception as e:
        logging.error(f"Unexpected error: {e}")
        raise
|
|
|
|
# Added methods from VPGSettings
|
|
def list_vpg_settings(self):
    """
    Fetch the VPG settings objects from the ZVM.

    Returns:
        The decoded JSON body of the response.

    Raises:
        requests.exceptions.RequestException: Re-raised after logging when
            the request fails.
    """
    # Same /v1/vpgSettings resource that the create/get/update/commit calls
    # in this class use; the previous /v1/vpgs/settings path collided with
    # the /v1/vpgs/{identifier} lookup.
    url = f"https://{self.client.zvm_address}/v1/vpgSettings"
    headers = {
        'Content-Type': 'application/json',
        'Authorization': f'Bearer {self.client.token}'
    }
    try:
        response = requests.get(url, headers=headers, verify=self.client.verify_certificate)
        response.raise_for_status()
        return response.json()
    except requests.exceptions.RequestException as e:
        logging.error(f"Failed to list VPG settings: {e}")
        raise
|
|
|
|
def get_vpg_settings_by_id(self, vpg_settings_id):
    """
    Fetch a single VPG settings object.

    Args:
        vpg_settings_id: Identifier of the settings object to fetch.

    Returns:
        The decoded JSON body of the response.

    Raises:
        requests.exceptions.RequestException: Re-raised after logging when
            the request fails.
    """
    endpoint = f"https://{self.client.zvm_address}/v1/vpgSettings/{vpg_settings_id}"
    request_headers = {
        'Content-Type': 'application/json',
        'Authorization': f'Bearer {self.client.token}'
    }
    try:
        reply = requests.get(endpoint, headers=request_headers, verify=self.client.verify_certificate)
        reply.raise_for_status()
    except requests.exceptions.RequestException as err:
        logging.error(f"Failed to get VPG settings by ID: {err}")
        raise
    else:
        try:
            return reply.json()
        except requests.exceptions.RequestException as err:
            # A decode failure raised as a RequestException is logged the
            # same way as a transport failure.
            logging.error(f"Failed to get VPG settings by ID: {err}")
            raise
|
|
|
|
def update_vpg_settings(self, vpg_settings_id, payload):
    """
    Replace the contents of a VPG settings object.

    Args:
        vpg_settings_id: Identifier of the settings object to update.
        payload: JSON-serialisable body sent with the PUT request.

    Returns:
        The ``requests`` response object itself (not its decoded body).

    Raises:
        requests.exceptions.RequestException: Re-raised after logging when
            the request fails.
    """
    endpoint = f"https://{self.client.zvm_address}/v1/vpgSettings/{vpg_settings_id}"
    request_headers = {
        'Content-Type': 'application/json',
        'Authorization': f'Bearer {self.client.token}'
    }
    logging.info(f"VPGs.update_vpg_settings: Updating VPG settings for ID: {vpg_settings_id}")
    logging.debug(f"VPGs.update_vpg_settings: Payload: {json.dumps(payload, indent=4)}")
    try:
        reply = requests.put(endpoint, json=payload, headers=request_headers, verify=self.client.verify_certificate)
        reply.raise_for_status()
        return reply
    except requests.exceptions.RequestException as err:
        failed = err.response
        if failed is None:
            logging.error("HTTPError occurred with no response attached.")
        else:
            logging.error(f"HTTPError: {failed.status_code} - {failed.reason}")
            try:
                details = failed.json()
                logging.error(f"Error Message: {details.get('Message', 'No detailed error message available')}")
            except ValueError:
                # Body was not JSON; log it verbatim instead.
                logging.error(f"Response content: {failed.text}")
        raise
|
|
|
|
def delete_vpg_settings(self, vpg_settings_id):
    """
    Delete a VPG settings object.

    Args:
        vpg_settings_id: Identifier of the settings object to delete.

    Returns:
        The decoded JSON body of the response, or None when the response
        has no body.

    Raises:
        requests.exceptions.RequestException: Re-raised after logging when
            the request fails.
    """
    # Same /v1/vpgSettings/{id} resource that the get/update/commit calls in
    # this class use; the previous /v1/vpgs/settings/{id} path did not match.
    url = f"https://{self.client.zvm_address}/v1/vpgSettings/{vpg_settings_id}"
    headers = {
        'Content-Type': 'application/json',
        'Authorization': f'Bearer {self.client.token}'
    }
    try:
        response = requests.delete(url, headers=headers, verify=self.client.verify_certificate)
        response.raise_for_status()
        # A successful DELETE may carry no body; decoding an empty body
        # would raise, so only decode when there is content.
        return response.json() if response.content else None
    except requests.exceptions.RequestException as e:
        if e.response is not None:
            logging.error(f"HTTPError: {e.response.status_code} - {e.response.reason}")
            try:
                error_details = e.response.json()
                logging.error(f"Error Message: {error_details.get('Message', 'No detailed error message available')}")
            except ValueError:
                # Body was not JSON; log it verbatim instead.
                logging.error(f"Response content: {e.response.text}")
        else:
            logging.error("HTTPError occurred with no response attached.")
        raise
|
|
|
|
def create_vpg_settings(self, basic=None, journal=None, recovery=None, networks=None, vpg_identifier=None):
    """
    Create a VPG settings object on the ZVM.

    Only truthy arguments are included in the request body, so calling with
    just ``vpg_identifier`` creates a settings object for an existing VPG.

    Args:
        basic: Value sent under ``Basic``.
        journal: Value sent under ``Journal``.
        recovery: Value sent under ``Recovery``.
        networks: Value sent under ``Networks``.
        vpg_identifier: Value sent under ``vpgIdentifier``.

    Returns:
        The decoded JSON body of the response, used as the settings id.

    Raises:
        requests.exceptions.RequestException: Re-raised after logging when
            the request fails.
        Exception: Any other error is logged and re-raised.
    """
    logging.info(f'VPGs.create_vpg_settings(zvm_address={self.client.zvm_address}, vpg_identifier={vpg_identifier})')
    endpoint = f"https://{self.client.zvm_address}/v1/vpgSettings"
    request_headers = {
        'Content-Type': 'application/json',
        'Authorization': f'Bearer {self.client.token}'
    }

    # Body sections in request order; falsy ones are left out entirely.
    sections = (
        ("vpgIdentifier", vpg_identifier),
        ("Basic", basic),
        ("Journal", journal),
        ("Recovery", recovery),
        ("Networks", networks),
    )
    payload = {key: value for key, value in sections if value}

    logging.debug(f"VPGs.create_vpg_settings: Payload: {json.dumps(payload, indent=4)}")
    try:
        reply = requests.post(endpoint, headers=request_headers, json=payload, verify=self.client.verify_certificate)
        reply.raise_for_status()
        vpg_settings_id = reply.json()
        logging.info(f"VPG Settings ID: {vpg_settings_id} created")
        return vpg_settings_id
    except requests.exceptions.RequestException as err:
        failed = err.response
        if failed is None:
            logging.error("HTTPError occurred with no response attached.")
        else:
            logging.error(f"HTTPError: {failed.status_code} - {failed.reason}")
            try:
                details = failed.json()
                logging.error(f"Error Message: {details.get('Message', 'No detailed error message available')}")
            except ValueError:
                # Body was not JSON; log it verbatim instead.
                logging.error(f"Response content: {failed.text}")
        raise
    except Exception as err:
        logging.error(f"Unexpected error: {err}")
        raise
|
|
|
|
def list_checkpoints(self, vpg_name, start_date=None, endd_date=None, checkpoint_date_str=None, latest=None):
    """
    Fetches a list of checkpoints for a specified Virtual Protection Group (VPG).

    Parameters:
        vpg_name (str): The name of the Virtual Protection Group (VPG) to fetch checkpoints for.
        start_date (str): The start date for filtering checkpoints, in ISO 8601 format (e.g., '2024-11-13T00:00:00Z').
        endd_date (str): The end date for filtering checkpoints, in ISO 8601 format (e.g., '2024-11-14T00:00:00Z').
        checkpoint_date_str (str): A specific date string in the format 'Month Day, Year HH:MM:SS AM/PM'
            (e.g., 'November 13, 2024 1:43:02 PM') to search for an exact checkpoint.
        latest (bool): If True, returns the checkpoint with the most recent timestamp.

    Returns:
        dict: A single checkpoint that matches `checkpoint_date_str` or the latest checkpoint if `latest=True`.
            An empty dict when `checkpoint_date_str` is given but no checkpoint matches it.
        list: The full list of checkpoints if neither `checkpoint_date_str` nor `latest` is specified;
            an empty list when the VPG has no checkpoints in the requested range.

    Raises:
        requests.exceptions.RequestException: Re-raised after logging if the API call fails.
        KeyError: If the VPG name lookup returns no VPG (no 'VpgIdentifier' available).
    """
    logging.info(f'VPGs.list_checkpoints(vpg_name={vpg_name}, start_date={start_date}, endd_date={endd_date}, checkpoint_date_str={checkpoint_date_str}, latest={latest})')
    vpgid = (self.list_vpgs(vpg_name=vpg_name))['VpgIdentifier']
    vpgs_uri = f"https://{self.client.zvm_address}/v1/vpgs/{vpgid}/checkpoints"
    headers = {
        'Content-Type': 'application/json',
        'Authorization': f'Bearer {self.client.token}'
    }

    params = {
        "startDate": start_date,
        "endDate": endd_date
    }
    try:
        response = requests.get(vpgs_uri, headers=headers, params=params, verify=self.client.verify_certificate)
        response.raise_for_status()
        checkpoints = response.json()

        if not checkpoints:
            logging.warning("No checkpoints found.")
            return []

        if checkpoint_date_str:
            check_point_timestamp = self.__convert_datetime_to_timestamp(date_str=checkpoint_date_str)
            matching_checkpoint = next(
                (checkpoint for checkpoint in checkpoints if checkpoint.get("TimeStamp") == check_point_timestamp),
                None,
            )
            # Test the search result, not the converted timestamp: the old
            # check never fired for a valid date with no matching checkpoint.
            if not matching_checkpoint:
                logging.warning(f"No checkpoint {checkpoint_date_str} found")
                return {}
            return matching_checkpoint

        if latest:
            # Find the checkpoint with the most recent timestamp
            latest_checkpoint = max(checkpoints, key=lambda x: x.get("TimeStamp"))
            logging.debug(f"Latest checkpoint found: {latest_checkpoint}")
            return latest_checkpoint

        return checkpoints

    except requests.exceptions.RequestException as e:
        if e.response is not None:
            logging.error(f"HTTPError: {e.response.status_code} - {e.response.reason}")
            try:
                error_details = e.response.json()
                logging.error(f"Error Message: {error_details.get('Message', 'No detailed error message available')}")
            except ValueError:
                # Body was not JSON; log it verbatim instead.
                logging.error(f"Response content: {e.response.text}")
        else:
            logging.error("HTTPError occurred with no response attached.")
        raise
|
|
|
|
def create_checkpoint(self, checkpoint_name: str, vpg_identifier: str = None, vpg_name: str = None) -> str:
    """
    Create a tagged checkpoint for the VPG.

    The VPG is addressed by identifier; when only a name is supplied it is
    resolved to an identifier through ``list_vpgs`` first.

    Args:
        checkpoint_name: The name/tag to assign to the checkpoint
        vpg_identifier: The identifier of the VPG
        vpg_name: The name of the VPG (alternative to vpg_identifier)

    Returns:
        str: The task identifier that can be used to monitor the operation

    Raises:
        requests.exceptions.RequestException: If the API request fails
        ValueError: If neither vpg_identifier nor vpg_name is provided, or if VPG name is not found
    """
    if not (vpg_identifier or vpg_name):
        raise ValueError("Either vpg_identifier or vpg_name must be provided")

    # Resolve the identifier from the name only when it was not given.
    if not vpg_identifier and vpg_name:
        found = self.list_vpgs(vpg_name=vpg_name)
        if not found:
            raise ValueError(f"VPG with name '{vpg_name}' not found")
        vpg_identifier = found.get('VpgIdentifier')
        logging.info(f"Found VPG identifier '{vpg_identifier}' for VPG name '{vpg_name}'")

    endpoint = f"https://{self.client.zvm_address}/v1/vpgs/{vpg_identifier}/checkpoints"
    request_headers = {
        'Content-Type': 'application/json',
        'Authorization': f'Bearer {self.client.token}'
    }
    request_body = {"CheckpointName": checkpoint_name}

    logging.info(f"VPGs.create_checkpoint: Creating checkpoint '{checkpoint_name}' for VPG {vpg_identifier}")

    try:
        reply = requests.post(
            endpoint,
            headers=request_headers,
            json=request_body,
            verify=self.client.verify_certificate,
            timeout=30
        )
        reply.raise_for_status()
        task_id = reply.json()
        logging.info(f"Successfully initiated checkpoint creation, task_id={task_id}")
        return task_id

    except requests.exceptions.RequestException as err:
        failed = err.response
        if failed is None:
            logging.error("HTTPError occurred with no response attached.")
        else:
            logging.error(f"HTTPError: {failed.status_code} - {failed.reason}")
            try:
                details = failed.json()
                logging.error(f"Error Message: {details.get('Message', 'No detailed error message available')}")
            except ValueError:
                # Body was not JSON; log it verbatim instead.
                logging.error(f"Response content: {failed.text}")
        raise
|
|
|
|
def export_vpg_settings(self, vpg_names: List[str]) -> dict:
    """
    Export settings for specified VPGs.

    Sends a POST request to the ZVM ``/v1/vpgSettings/exportSettings``
    endpoint with the given VPG names and returns the decoded JSON response.

    Args:
        vpg_names: List of VPG names to export settings for

    Returns:
        dict: The exported VPG settings in the format:
            {
                "timeStamp": "2025-02-08T21:50:46.574Z",
                "exportResult": {
                    "result": str,
                    "message": str
                }
            }

    Raises:
        requests.exceptions.RequestException: If the API request fails or times out
    """
    url = f"https://{self.client.zvm_address}/v1/vpgSettings/exportSettings"
    headers = {
        'Content-Type': 'application/json',
        'Authorization': f'Bearer {self.client.token}'
    }

    payload = {
        "vpgNames": vpg_names
    }

    logging.info(f"VPGs.export_vpg_settings: Exporting settings for VPGs: {vpg_names}")

    try:
        response = requests.post(
            url,
            headers=headers,
            json=payload,
            verify=self.client.verify_certificate,
            # Without a timeout requests waits indefinitely on an unresponsive ZVM.
            timeout=30
        )
        response.raise_for_status()
        result = response.json()
        logging.info(f"Successfully exported settings for {len(vpg_names)} VPGs at {result.get('timeStamp')}")
        logging.debug(f"Export result: {json.dumps(result, indent=2)}")
        return result

    except requests.exceptions.RequestException as e:
        # The original exception is always re-raised below, so the logging
        # here must never raise something else in its place.
        if e.response is not None:
            logging.error(f"HTTPError: {e.response.status_code} - {e.response.reason}")
            try:
                error_details = e.response.json()
            except ValueError:
                logging.error(f"Response content: {e.response.text}")
            else:
                # A JSON error body is not guaranteed to be an object; only
                # dicts support the 'Message' lookup.
                if isinstance(error_details, dict):
                    message = error_details.get('Message', 'No detailed error message available')
                else:
                    message = error_details
                logging.error(f"Error Message: {message}")
        else:
            logging.error("HTTPError occurred with no response attached.")
        raise
|
|
|
|
def list_exported_vpg_settings(self) -> List[Dict]:
    """
    Get all available exported settings files.

    Sends a GET request to the ZVM ``/v1/vpgSettings/exportedSettings``
    endpoint and returns the decoded JSON response.

    Returns:
        List[Dict]: List of exported settings files in the format:
            [
                {
                    "timeStamp": "2025-02-08T22:02:18.685Z"
                }
            ]

    Raises:
        requests.exceptions.RequestException: If the API request fails or times out
    """
    url = f"https://{self.client.zvm_address}/v1/vpgSettings/exportedSettings"
    headers = {
        'Content-Type': 'application/json',
        'Authorization': f'Bearer {self.client.token}'
    }

    logging.debug("Fetching list of exported VPG settings")

    try:
        response = requests.get(
            url,
            headers=headers,
            verify=self.client.verify_certificate,
            # Without a timeout requests waits indefinitely on an unresponsive ZVM.
            timeout=30
        )
        response.raise_for_status()
        result = response.json()
        logging.info(f"Found {len(result)} exported settings files")
        logging.debug(f"Exported settings list: {json.dumps(result, indent=2)}")
        return result

    except requests.exceptions.RequestException as e:
        # The original exception is always re-raised below, so the logging
        # here must never raise something else in its place.
        if e.response is not None:
            logging.error(f"HTTPError: {e.response.status_code} - {e.response.reason}")
            try:
                error_details = e.response.json()
            except ValueError:
                logging.error(f"Response content: {e.response.text}")
            else:
                # A JSON error body is not guaranteed to be an object; only
                # dicts support the 'Message' lookup.
                if isinstance(error_details, dict):
                    message = error_details.get('Message', 'No detailed error message available')
                else:
                    message = error_details
                logging.error(f"Error Message: {message}")
        else:
            logging.error("HTTPError occurred with no response attached.")
        raise
|
|
|
|
def read_exported_vpg_settings(self, timestamp: str, vpg_names: List[str] = None) -> dict:
    """
    Read exported settings from a file of given timestamp.

    Sends a POST request to the ZVM
    ``/v1/vpgSettings/exportedSettings/{timestamp}`` endpoint. The request
    body carries ``vpgNames`` only when a non-empty ``vpg_names`` is given;
    otherwise an empty JSON object is sent.

    Args:
        timestamp: The timestamp of the exported settings file (format: YYYY-MM-DDThh:mm:ss.SSSZ)
        vpg_names: Optional list of VPG names to filter the exported settings

    Returns:
        dict: The exported VPG settings containing:
            - ExportedVpgSettingsApi: List[dict] - List of VPG settings
            - ErrorMessage: str - Error message if any

    Raises:
        requests.exceptions.RequestException: If the API request fails or times out
    """
    url = f"https://{self.client.zvm_address}/v1/vpgSettings/exportedSettings/{timestamp}"
    headers = {
        'Content-Type': 'application/json',
        'Authorization': f'Bearer {self.client.token}'
    }

    payload = {}
    if vpg_names:
        payload['vpgNames'] = vpg_names

    logging.info(f"VPGs.read_exported_vpg_settings: Reading exported VPG settings for timestamp: {timestamp}")
    if vpg_names:
        logging.debug(f"Filtering for VPGs: {vpg_names}")

    try:
        response = requests.post(
            url,
            headers=headers,
            json=payload,
            verify=self.client.verify_certificate,
            # Without a timeout requests waits indefinitely on an unresponsive ZVM.
            timeout=30
        )
        response.raise_for_status()
        result = response.json()
        logging.debug(f"VPGs.read_exported_vpg_settings: result: {json.dumps(result, indent=4)}")
        return result

    except requests.exceptions.RequestException as e:
        # The original exception is always re-raised below, so the logging
        # here must never raise something else in its place.
        if e.response is not None:
            logging.error(f"HTTPError: {e.response.status_code} - {e.response.reason}")
            try:
                error_details = e.response.json()
            except ValueError:
                logging.error(f"Response content: {e.response.text}")
            else:
                # A JSON error body is not guaranteed to be an object; only
                # dicts support the 'Message' lookup.
                if isinstance(error_details, dict):
                    message = error_details.get('Message', 'No detailed error message available')
                else:
                    message = error_details
                logging.error(f"Error Message: {message}")
        else:
            logging.error("HTTPError occurred with no response attached.")
        raise
|
|
|
|
def import_vpg_settings(self, settings: Dict) -> dict:
    """
    Import VPG settings.

    Validates the input, then sends a POST request to the ZVM
    ``/v1/vpgSettings/import`` endpoint containing only the
    ``ExportedVpgSettingsApi`` entry of ``settings``. Per-VPG validation
    failures, import failures and started import tasks found in the
    response are logged; the decoded response is returned unchanged.

    Args:
        settings: Dictionary containing the VPG settings to import. Must include:
            - ExportedVpgSettingsApi: List of VPG settings with detailed configuration

    Returns:
        dict: The import result containing:
            - validationFailedResults: List[dict] - VPGs that failed validation
                - vpgName: str - Name of the VPG
                - errorMessages: List[str] - List of validation error messages
            - importFailedResults: List[dict] - VPGs that failed to import
                - vpgName: str - Name of the VPG
                - errorMessage: str - Import error message
            - importTaskIdentifiers: List[dict] - Successfully initiated imports
                - vpgName: str - Name of the VPG
                - taskIdentifier: str - Task ID for tracking the import

    Raises:
        requests.exceptions.RequestException: If the API request fails or times out
        ValueError: If settings is not a dictionary, is missing the
            'ExportedVpgSettingsApi' key, or that key does not hold a list
    """
    # Validate input first so bad arguments fail before the client is touched.
    if not isinstance(settings, dict):
        raise ValueError("Settings must be a dictionary")
    if 'ExportedVpgSettingsApi' not in settings:
        raise ValueError("Settings must contain 'ExportedVpgSettingsApi' key")
    exported_settings = settings['ExportedVpgSettingsApi']
    if not isinstance(exported_settings, (list, tuple)):
        raise ValueError("'ExportedVpgSettingsApi' must be a list of VPG settings")

    url = f"https://{self.client.zvm_address}/v1/vpgSettings/import"
    headers = {
        'Content-Type': 'application/json',
        'Authorization': f'Bearer {self.client.token}'
    }

    # Only the settings list is forwarded; any other keys in `settings`
    # (for example ErrorMessage from a previous read) are dropped.
    payload = {
        "ExportedVpgSettingsApi": exported_settings
    }

    logging.info(f"VPGs.import_vpg_settings: Importing settings for {len(exported_settings)} VPGs")

    try:
        response = requests.post(
            url,
            headers=headers,
            json=payload,
            verify=self.client.verify_certificate,
            # Without a timeout requests waits indefinitely on an unresponsive ZVM.
            timeout=30
        )
        response.raise_for_status()
        result = response.json()
        logging.debug(f"VPGs.import_vpg_settings: result: {json.dumps(result, indent=4)}")

        # The import has already been submitted at this point, so the
        # summary logging uses tolerant lookups: a missing field must not
        # raise and hide the result from the caller.
        if isinstance(result, dict):
            # Log validation failures
            for failure in result.get('validationFailedResults') or []:
                vpg = failure.get('vpgName')
                messages = ', '.join(str(m) for m in failure.get('errorMessages') or [])
                logging.error(f"Validation failed for VPG '{vpg}': {messages}")

            # Log import failures
            for failure in result.get('importFailedResults') or []:
                vpg = failure.get('vpgName')
                logging.error(f"Import failed for VPG '{vpg}': {failure.get('errorMessage')}")

            # Log successful imports
            for task in result.get('importTaskIdentifiers') or []:
                vpg = task.get('vpgName')
                logging.info(f"Import initiated for VPG '{vpg}' with task ID: {task.get('taskIdentifier')}")

        return result

    except requests.exceptions.RequestException as e:
        # The original exception is always re-raised below, so the logging
        # here must never raise something else in its place.
        if e.response is not None:
            logging.error(f"HTTPError: {e.response.status_code} - {e.response.reason}")
            try:
                error_details = e.response.json()
            except ValueError:
                logging.error(f"Response content: {e.response.text}")
            else:
                # A JSON error body is not guaranteed to be an object; only
                # dicts support the 'Message' lookup.
                if isinstance(error_details, dict):
                    message = error_details.get('Message', 'No detailed error message available')
                else:
                    message = error_details
                logging.error(f"Error Message: {message}")
        else:
            logging.error("HTTPError occurred with no response attached.")
        raise
|
|
|
|
def list_vpgs_from_exported_settings(self, time_stamp: str) -> List[Dict]:
    """List VPGs from exported settings.

    Sends a GET request to the ZVM
    ``/v1/vpgSettings/exportedSettings/{time_stamp}/vpgsinfo`` endpoint and
    returns the decoded JSON response.

    Args:
        time_stamp: Timestamp of the exported settings

    Returns:
        List[Dict]: List of VPGs in the format:
            [
                {
                    "VpgName": "test",
                    "SourceSiteName": "Production",
                    "TargetSiteName": "DR"
                }
            ]

    Raises:
        requests.exceptions.RequestException: If the API request fails or times out
    """
    url = f"https://{self.client.zvm_address}/v1/vpgSettings/exportedSettings/{time_stamp}/vpgsinfo"
    headers = {
        'Content-Type': 'application/json',
        'Authorization': f'Bearer {self.client.token}'
    }

    logging.info(f"VPGs.list_vpgs_from_exported_settings: Fetching VPGs from export {time_stamp}")

    try:
        response = requests.get(
            url,
            headers=headers,
            verify=self.client.verify_certificate,
            # Without a timeout requests waits indefinitely on an unresponsive ZVM.
            timeout=30
        )
        response.raise_for_status()
        result = response.json()
        logging.info(f"Successfully retrieved {len(result)} VPGs from export {time_stamp}")
        logging.debug(f"VPGs from export: {json.dumps(result, indent=2)}")
        return result

    except requests.exceptions.RequestException as e:
        # The original exception is always re-raised below, so the logging
        # here must never raise something else in its place.
        if e.response is not None:
            logging.error(f"HTTPError: {e.response.status_code} - {e.response.reason}")
            try:
                error_details = e.response.json()
            except ValueError:
                logging.error(f"Response content: {e.response.text}")
            else:
                # A JSON error body is not guaranteed to be an object; only
                # dicts support the 'Message' lookup.
                if isinstance(error_details, dict):
                    message = error_details.get('Message', 'No detailed error message available')
                else:
                    message = error_details
                logging.error(f"Error Message: {message}")
        else:
            logging.error("HTTPError occurred with no response attached.")
        raise
|
|
|
|
|