489 lines
21 KiB
Python
489 lines
21 KiB
Python
# Legal Disclaimer
|
|
# This script is an example script and is not supported under any Zerto support program or service.
|
|
# The author and Zerto further disclaim all implied warranties including, without limitation,
|
|
# any implied warranties of merchantability or of fitness for a particular purpose.
|
|
# In no event shall Zerto, its authors or anyone else involved in the creation,
|
|
# production or delivery of the scripts be liable for any damages whatsoever (including,
|
|
# without limitation, damages for loss of business profits, business interruption, loss of business
|
|
# information, or other pecuniary loss) arising out of the use of or the inability to use the sample
|
|
# scripts or documentation, even if the author or Zerto has been advised of the possibility of such damages.
|
|
# The entire risk arising out of the use or performance of the sample scripts and documentation remains with you.
|
|
|
|
"""
|
|
Zerto VPG Settings Export/Import Example Script
|
|
|
|
This script demonstrates how to export and import Virtual Protection Group (VPG) settings
|
|
using the Zerto Virtual Manager (ZVM) API. It allows for backup and restoration of VPG
|
|
configurations, which is useful for disaster recovery planning and VPG replication.
|
|
|
|
Key Features:
|
|
1. Export Selection:
|
|
- List all available exports with timestamps and status
|
|
- Display VPGs in each export with source and target sites
|
|
- Compare requested VPGs with available ones in the export
|
|
- Allow user to select which export to use
|
|
|
|
2. Resource Validation:
|
|
- Check and validate datastores, hosts, folders, and networks
|
|
- Allow user to select replacements for missing resources
|
|
- Handle both VPG-level and VM-level resource mappings
|
|
- Support for journal, scratch, and recovery settings
|
|
|
|
3. Settings Import:
|
|
- Import validated settings back to create new VPGs
|
|
- Display detailed import results including:
|
|
- Validation failures with error messages
|
|
- Import failures with specific errors
|
|
- Successfully initiated imports with task IDs
|
|
- Allow user to verify changes before proceeding
|
|
|
|
Required Arguments:
|
|
--zvm_address: Protected site ZVM address
|
|
--client_id: Protected site Keycloak client ID
|
|
--client_secret: Protected site Keycloak client secret
|
|
--ignore_ssl: Ignore SSL certificate verification (optional)
|
|
--vpg_names: Comma-separated list of VPG names to process (optional)
|
|
|
|
Example Usage:
|
|
python examples/vpg_setting_export_example_enhanced.py \
|
|
--zvm_address "192.168.111.20" \
|
|
--client_id "zerto-api" \
|
|
--client_secret "your-secret-here" \
|
|
--vpg_names "VpgTest1,VpgTest2" \
|
|
--ignore_ssl
|
|
|
|
Script Flow:
|
|
1. Connect to protected site ZVM
|
|
2. List available exports and let user select one
|
|
3. Display VPGs in selected export and compare with requested ones
|
|
4. Get peer site resources (datastores, hosts, folders, networks)
|
|
5. Validate and update resource mappings:
|
|
- VPG-level settings (journal, scratch, recovery)
|
|
- VM-level settings (host, datastore, folder, network)
|
|
6. Save updated settings to file
|
|
7. Allow user to verify changes
|
|
8. Import settings to recreate VPGs
|
|
9. Display import results with task IDs
|
|
|
|
Note: This script requires only protected site credentials. It's designed for VPG
|
|
configuration backup and restore scenarios, allowing you to quickly recreate VPGs
|
|
with identical settings after changes or in disaster recovery situations.
|
|
"""
|
|
import argparse
|
|
import logging
|
|
logging.basicConfig(
|
|
level=logging.INFO,
|
|
format='%(asctime)s - %(levelname)s - %(message)s'
|
|
)
|
|
import urllib3
|
|
import json
|
|
import sys
|
|
import os
|
|
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
|
|
from zvml import ZVMLClient
|
|
from typing import List, Dict
|
|
|
|
# Disable SSL warningss
|
|
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
|
|
|
|
def setup_client(args):
|
|
"""Initialize and return Zerto client"""
|
|
client = ZVMLClient(
|
|
zvm_address=args.zvm_address,
|
|
client_id=args.client_id,
|
|
client_secret=args.client_secret,
|
|
verify_certificate=not args.ignore_ssl
|
|
)
|
|
return client
|
|
|
|
def select_resource(resource_type: str, resources: List[Dict]) -> str:
|
|
"""Let user select a resource from available options
|
|
|
|
Args:
|
|
resource_type: Type of resource (datastore, host, folder)
|
|
resources: List of available resources
|
|
|
|
Returns:
|
|
str: Selected resource identifier or None if skipped
|
|
"""
|
|
print(f"\nAvailable {resource_type}s:")
|
|
for i, resource in enumerate(resources, 1):
|
|
if resource_type == "datastore":
|
|
name = resource.get('DatastoreName', 'Unknown')
|
|
id_key = 'DatastoreIdentifier'
|
|
elif resource_type == "host":
|
|
name = resource.get('VirtualizationHostName', 'Unknown')
|
|
id_key = 'HostIdentifier'
|
|
elif resource_type == "folder":
|
|
name = resource.get('FolderName', 'Unknown')
|
|
id_key = 'FolderIdentifier'
|
|
else:
|
|
name = 'Unknown'
|
|
id_key = 'Identifier'
|
|
|
|
print(f"{i}. {name} (ID: {resource.get(id_key, 'Unknown')})")
|
|
|
|
while True:
|
|
try:
|
|
selection = input(f"\nSelect a {resource_type} number to replace the missing one(or 'q' to skip): ")
|
|
if selection.lower() == 'q':
|
|
return None
|
|
|
|
selection = int(selection)
|
|
if 1 <= selection <= len(resources):
|
|
return resources[selection - 1]
|
|
else:
|
|
print("Invalid selection. Please try again.")
|
|
except ValueError:
|
|
print("Please enter a valid number or 'q' to skip.")
|
|
|
|
def check_vpg_datastore(datastore_id: str, available_datastores: List[Dict]) -> str:
|
|
"""Check if a datastore ID exists in the available datastores list.
|
|
If not found, ask user to select a replacement.
|
|
|
|
Args:
|
|
datastore_id: The datastore identifier to check
|
|
available_datastores: List of available datastores from the peer site
|
|
|
|
Returns:
|
|
str: Original datastore ID if found, or selected replacement datastore ID
|
|
"""
|
|
if not datastore_id:
|
|
return None
|
|
|
|
if any(ds['DatastoreIdentifier'] == datastore_id for ds in available_datastores):
|
|
return datastore_id
|
|
|
|
# If not found, ask user to select a replacement
|
|
logging.warning(f"Datastore {datastore_id} not found in peer site")
|
|
return select_resource("datastore", available_datastores)
|
|
|
|
def check_vpg_host(host_id: str, available_hosts: List[Dict]) -> str:
|
|
"""Check if a host ID exists in the available hosts list.
|
|
If not found, ask user to select a replacement.
|
|
|
|
Args:
|
|
host_id: The host identifier to check
|
|
available_hosts: List of available hosts from the peer site
|
|
|
|
Returns:
|
|
str: Original host ID if found, or selected replacement host ID
|
|
"""
|
|
if not host_id:
|
|
return None
|
|
|
|
if any(host['HostIdentifier'] == host_id for host in available_hosts):
|
|
return host_id
|
|
|
|
# If not found, ask user to select a replacement
|
|
logging.warning(f"Host {host_id} not found in peer site")
|
|
return select_resource("host", available_hosts)
|
|
|
|
def check_vpg_folder(folder_id: str, available_folders: List[Dict]) -> str:
|
|
"""Check if a folder ID exists in the available folders list.
|
|
If not found, ask user to select a replacement.
|
|
|
|
Args:
|
|
folder_id: The folder identifier to check
|
|
available_folders: List of available folders from the peer site
|
|
|
|
Returns:
|
|
str: Original folder ID if found, or selected replacement folder ID
|
|
"""
|
|
if not folder_id:
|
|
return None
|
|
|
|
if any(folder['FolderIdentifier'] == folder_id for folder in available_folders):
|
|
return folder_id
|
|
|
|
# If not found, ask user to select a replacement
|
|
logging.warning(f"Folder {folder_id} not found in peer site")
|
|
return select_resource("folder", available_folders).get('FolderIdentifier')
|
|
|
|
def check_vpg_network(network_id: str, available_networks: List[Dict]) -> str:
|
|
"""Check if a network ID exists in the available networks list.
|
|
If not found, ask user to select a replacement.
|
|
|
|
Args:
|
|
network_id: The network identifier to check
|
|
available_networks: List of available networks from the peer site
|
|
|
|
Returns:
|
|
str: Original network ID if found, or selected replacement network ID
|
|
"""
|
|
if not network_id:
|
|
return None
|
|
|
|
if any(net['NetworkIdentifier'] == network_id for net in available_networks):
|
|
return network_id
|
|
|
|
# If not found, ask user to select a replacement
|
|
logging.warning(f"Network {network_id} not found in peer site")
|
|
return select_resource("network", available_networks).get('NetworkIdentifier')
|
|
|
|
def main():
|
|
parser = argparse.ArgumentParser(description="Export and Import VPG settings example")
|
|
parser.add_argument("--zvm_address", required=True, help="Site 1 ZVM address")
|
|
parser.add_argument('--client_id', required=True, help='Site 1 Keycloak client ID')
|
|
parser.add_argument('--client_secret', required=True, help='Site 1 Keycloak client secret')
|
|
parser.add_argument("--ignore_ssl", action="store_true", help="Ignore SSL certificate verification")
|
|
parser.add_argument("--vpg_names", required=True, help="Comma-separated list of VPG names to process")
|
|
args = parser.parse_args()
|
|
|
|
try:
|
|
# Setup client
|
|
client = setup_client(args)
|
|
|
|
# Split the comma-separated string and strip whitespace
|
|
vpg_names = [name.strip() for name in args.vpg_names.split(',')]
|
|
logging.info(f"Processing VPGs: {vpg_names}")
|
|
|
|
# Export selection loop
|
|
while True:
|
|
# List all available exports
|
|
print("\nAvailable exports:")
|
|
exports = client.vpgs.list_exported_vpg_settings()
|
|
if not exports:
|
|
logging.error("No exports found")
|
|
sys.exit(1)
|
|
|
|
# Display available exports
|
|
for i, export in enumerate(exports, 1):
|
|
print(f"{i}. Timestamp: {export.get('TimeStamp')}")
|
|
print(f" Status: {export.get('Status')}")
|
|
print()
|
|
|
|
# Let user select an export
|
|
while True:
|
|
try:
|
|
selection = input("\nSelect an export number to use (or 'q' to quit): ")
|
|
if selection.lower() == 'q':
|
|
logging.info("User chose to quit")
|
|
sys.exit(0)
|
|
|
|
selection = int(selection)
|
|
if 1 <= selection <= len(exports):
|
|
selected_export = exports[selection - 1]
|
|
break
|
|
else:
|
|
print("Invalid selection. Please try again.")
|
|
except ValueError:
|
|
print("Please enter a valid number or 'q' to quit.")
|
|
|
|
# Get VPGs from selected export
|
|
print("\nVPGs in selected export:")
|
|
export_vpgs = client.vpgs.list_vpgs_from_exported_settings(selected_export['TimeStamp'])
|
|
for vpg in export_vpgs:
|
|
print(f"- {vpg['VpgName']} (Source: {vpg['SourceSiteName']}, Target: {vpg['TargetSiteName']})")
|
|
|
|
# Compare with requested VPGs if specified
|
|
if vpg_names:
|
|
missing_vpgs = [name for name in vpg_names if name not in [vpg['VpgName'] for vpg in export_vpgs]]
|
|
if missing_vpgs:
|
|
print(f"\nWarning: The following requested VPGs are not in the selected export:")
|
|
for vpg in missing_vpgs:
|
|
print(f"- {vpg}")
|
|
print("\nYou can either:")
|
|
print("1. Select a different export")
|
|
print("2. Continue with the available VPGs")
|
|
print("3. Quit")
|
|
|
|
choice = input("\nEnter your choice (1/2/3): ")
|
|
if choice == '1':
|
|
continue # Restart the export selection loop
|
|
elif choice == '3':
|
|
logging.info("User chose to quit")
|
|
sys.exit(0)
|
|
# If choice is 2, continue with available VPGs
|
|
|
|
# Ask for confirmation to proceed
|
|
confirm = input("\nDo you want to proceed with this export? (y/n): ")
|
|
if confirm.lower() != 'y':
|
|
logging.info("User chose not to proceed")
|
|
sys.exit(0)
|
|
break # Exit the export selection loop if confirmed
|
|
|
|
# Get the selected export settings
|
|
export_settings = client.vpgs.read_exported_vpg_settings(selected_export['TimeStamp'], vpg_names)
|
|
|
|
# Save the selected export to a file
|
|
file_name = f"{selected_export['TimeStamp']}-original-exported-vpg-settings.json"
|
|
with open(file_name, 'w') as f:
|
|
json.dump(export_settings, f, indent=2)
|
|
print(f"\nSelected export saved to: {file_name}")
|
|
|
|
# Get peer site resources
|
|
virtualization_sites = client.virtualization_sites.get_virtualization_sites()
|
|
logging.debug(f"Virtualization Sites: {json.dumps(virtualization_sites, indent=4)}")
|
|
|
|
# Get local site ids
|
|
local_site_identifier = client.localsite.get_local_site().get('SiteIdentifier')
|
|
logging.info(f"Local Site ID: {local_site_identifier}")
|
|
|
|
peer_site_identifier = next((site['SiteIdentifier'] for site in virtualization_sites if site['SiteIdentifier'] != local_site_identifier), None)
|
|
logging.info(f"Peer Site ID: {peer_site_identifier}")
|
|
|
|
# Get peer site resources
|
|
peer_datastores = client.virtualization_sites.get_virtualization_site_datastores(
|
|
site_identifier=peer_site_identifier
|
|
)
|
|
logging.info(f"Peer Datastores: {json.dumps(peer_datastores, indent=4)}")
|
|
|
|
peer_hosts = client.virtualization_sites.get_virtualization_site_hosts(
|
|
site_identifier=peer_site_identifier
|
|
)
|
|
logging.info(f"Peer Hosts: {json.dumps(peer_hosts, indent=4)}")
|
|
|
|
peer_folders = client.virtualization_sites.get_virtualization_site_folders(
|
|
site_identifier=peer_site_identifier
|
|
)
|
|
logging.info(f"Peer Folders: {json.dumps(peer_folders, indent=4)}")
|
|
|
|
peer_networks = client.virtualization_sites.get_virtualization_site_networks(
|
|
site_identifier=peer_site_identifier
|
|
)
|
|
logging.info(f"Peer Networks: {json.dumps(peer_networks, indent=4)}")
|
|
|
|
# Process each VPG
|
|
for vpg in export_settings['ExportedVpgSettingsApi']:
|
|
vpg_name = vpg['Basic']['Name']
|
|
logging.info(f"\nProcessing VPG: {vpg_name}")
|
|
|
|
# Check journal datastore
|
|
journal_ds = vpg['Journal'].get('DatastoreIdentifier')
|
|
new_ds = check_vpg_datastore(journal_ds, peer_datastores)
|
|
if new_ds:
|
|
vpg['Journal']['DatastoreIdentifier'] = new_ds
|
|
|
|
# Check scratch datastore
|
|
scratch_ds = vpg['Scratch'].get('DatastoreIdentifier')
|
|
new_ds = check_vpg_datastore(scratch_ds, peer_datastores)
|
|
if new_ds:
|
|
vpg['Scratch']['DatastoreIdentifier'] = new_ds
|
|
|
|
# Check recovery settings
|
|
recovery = vpg['Recovery']
|
|
new_host = check_vpg_host(recovery['DefaultHostIdentifier'], peer_hosts)
|
|
if new_host:
|
|
recovery['DefaultHostIdentifier'] = new_host
|
|
|
|
new_ds = check_vpg_datastore(recovery['DefaultDatastoreIdentifier'], peer_datastores)
|
|
if new_ds:
|
|
recovery['DefaultDatastoreIdentifier'] = new_ds
|
|
|
|
new_folder = check_vpg_folder(recovery['DefaultFolderIdentifier'], peer_folders)
|
|
if new_folder:
|
|
recovery['DefaultFolderIdentifier'] = new_folder
|
|
|
|
# Check VPG-level network settings
|
|
networks = vpg['Networks']
|
|
# Check failover network
|
|
failover_net = networks['Failover']['Hypervisor'].get('DefaultNetworkIdentifier')
|
|
new_net = check_vpg_network(failover_net, peer_networks)
|
|
if new_net:
|
|
networks['Failover']['Hypervisor']['DefaultNetworkIdentifier'] = new_net
|
|
|
|
# Check failover test network
|
|
failover_test_net = networks['FailoverTest']['Hypervisor'].get('DefaultNetworkIdentifier')
|
|
new_net = check_vpg_network(failover_test_net, peer_networks)
|
|
if new_net:
|
|
networks['FailoverTest']['Hypervisor']['DefaultNetworkIdentifier'] = new_net
|
|
|
|
# Check VM-specific settings
|
|
for vm in vpg.get('Vms', []):
|
|
vm_recovery = vm['Recovery']
|
|
new_host = check_vpg_host(vm_recovery['HostIdentifier'], peer_hosts)
|
|
if new_host:
|
|
vm_recovery['HostIdentifier'] = new_host
|
|
|
|
new_ds = check_vpg_datastore(vm_recovery['DatastoreIdentifier'], peer_datastores)
|
|
if new_ds:
|
|
vm_recovery['DatastoreIdentifier'] = new_ds
|
|
|
|
new_folder = check_vpg_folder(vm_recovery['FolderIdentifier'], peer_folders)
|
|
if new_folder:
|
|
vm_recovery['FolderIdentifier'] = new_folder
|
|
|
|
# Check VM journal datastore
|
|
vm_journal_ds = vm['Journal'].get('DatastoreIdentifier')
|
|
new_ds = check_vpg_datastore(vm_journal_ds, peer_datastores)
|
|
if new_ds:
|
|
vm['Journal']['DatastoreIdentifier'] = new_ds
|
|
|
|
# Check VM scratch datastore
|
|
vm_scratch_ds = vm['Scratch'].get('DatastoreIdentifier')
|
|
new_ds = check_vpg_datastore(vm_scratch_ds, peer_datastores)
|
|
if new_ds:
|
|
vm['Scratch']['DatastoreIdentifier'] = new_ds
|
|
|
|
# Check VM network settings
|
|
for nic in vm.get('Nics', []):
|
|
# Check failover network
|
|
failover_net = nic['Failover']['Hypervisor'].get('NetworkIdentifier')
|
|
new_net = check_vpg_network(failover_net, peer_networks)
|
|
if new_net:
|
|
nic['Failover']['Hypervisor']['NetworkIdentifier'] = new_net
|
|
|
|
# Check failover test network
|
|
failover_test_net = nic['FailoverTest']['Hypervisor'].get('NetworkIdentifier')
|
|
new_net = check_vpg_network(failover_test_net, peer_networks)
|
|
if new_net:
|
|
nic['FailoverTest']['Hypervisor']['NetworkIdentifier'] = new_net
|
|
|
|
# Save updated settings
|
|
updated_file_name = f"{selected_export['TimeStamp']}-updated-exported-vpg-settings.json"
|
|
with open(updated_file_name, 'w') as f:
|
|
json.dump(export_settings, f, indent=2)
|
|
logging.info(f"\nUpdated settings saved to: {updated_file_name}")
|
|
|
|
# ask users if they want to import the settings back
|
|
# print the name of the original file and the updated file and ask to confirm the changes where compared
|
|
print(f"\nOriginal file: {file_name}")
|
|
print(f"Updated file: {updated_file_name}")
|
|
print(f"Please compare those file and confirm the changes where done correctly")
|
|
input("\nPress Enter when you done comparing the files to continue...")
|
|
|
|
import_settings = input("\nDo you want to import the settings back? (y/n): ")
|
|
if import_settings.lower() != 'y':
|
|
logging.info("User did not confirm the changes, exiting the script")
|
|
sys.exit(1)
|
|
|
|
#pause
|
|
input(f"\nNow you can delete the selected VPGs [{vpg_names}] and then press Enter to continue...")
|
|
|
|
# Step 3: Import the settings back
|
|
print("\nStep 3: Importing VPG settings...")
|
|
import_result = client.vpgs.import_vpg_settings(export_settings)
|
|
print("\nImport Result:")
|
|
|
|
# Display validation failures
|
|
if import_result.get('ValidationFailedResults'):
|
|
print("\nValidation Failed:")
|
|
for failure in import_result['ValidationFailedResults']:
|
|
print(f"- VPG: {failure['VpgName']}")
|
|
for error in failure['ErrorMessages']:
|
|
print(f" Error: {error}")
|
|
|
|
# Display import failures
|
|
if import_result.get('ImportFailedResults'):
|
|
print("\nImport Failed:")
|
|
for failure in import_result['ImportFailedResults']:
|
|
print(f"- VPG: {failure['VpgName']}")
|
|
print(f" Error: {failure['ErrorMessage']}")
|
|
|
|
# Display successful imports
|
|
if import_result.get('ImportTaskIdentifiers'):
|
|
print("\nSuccessfully Initiated Imports:")
|
|
for task in import_result['ImportTaskIdentifiers']:
|
|
print(f"- VPG: {task['VpgName']}")
|
|
print(f" Task ID: {task['TaskIdentifier']}")
|
|
|
|
#pause
|
|
input("\nLook at the VPGs and verify whether the manual changes are reverted back to the original settings. Press Enter to exit")
|
|
|
|
except Exception as e:
|
|
logging.exception("Error occurred:")
|
|
sys.exit(1)
|
|
|
|
if __name__ == "__main__":
|
|
main() |