first full version of the exersises

This commit is contained in:
Kosta Mushkin
2025-05-29 19:19:25 -04:00
parent 3a075e5ecf
commit 4690e82e58
15 changed files with 1707 additions and 480 deletions
@@ -9,18 +9,31 @@ Prerequisites:
pip install -e .
2. Update prerequisites/config.py with your ZVM details
Usage:
python create_vpg.py --vm-names "vm1" "vm2" "vm3" [--vpg-name "My-VPG"]
This solution demonstrates:
- Creating a new VPG with basic settings
- Configuring journal, recovery, and network settings
- Adding VMs to the VPG
- Adding specified VMs to the VPG
- Proper error handling and logging
"""
import sys
import os
import logging
# Set up logging with timestamp
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s'
)
import json
import argparse
from pathlib import Path
import urllib3
# Suppress SSL warnings
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
# Add prerequisites to Python path
prerequisites_path = Path(__file__).parent.parent.parent.parent / "prerequisites"
@@ -43,19 +56,65 @@ except ImportError:
print("Expected path:", prerequisites_path / "config.py")
sys.exit(1)
def parse_arguments():
"""Parse command line arguments."""
parser = argparse.ArgumentParser(description='Create VPG and add specified VMs')
parser.add_argument('--vm-names', nargs='+', required=True,
help='List of VM names to add to the VPG (can be space-separated or comma-separated)')
parser.add_argument('--vpg-name', default="Test-VPG-Python",
help='Name of the VPG to create (default: Test-VPG-Python)')
return parser.parse_args()
def find_vms_by_names(client, site_identifier, vm_names):
"""Find VMs by their names in the specified site."""
vms = client.virtualization_sites.get_virtualization_site_vms(site_identifier=site_identifier)
logging.info(f"find_vms_by_names: found vms {json.dumps(vms, indent=4)} VMs in site {site_identifier}")
if not vms:
return [], []
# Create a dictionary of VM name to VM object for easy lookup
vm_dict = {vm.get('VmName'): vm for vm in vms}
# Find requested VMs
found_vms = []
not_found = []
for vm_name in vm_names:
if vm_name in vm_dict:
found_vms.append(vm_dict[vm_name])
logging.info(f"Found VM: {vm_name} (ID: {vm_dict[vm_name].get('VmIdentifier')})")
else:
not_found.append(vm_name)
logging.warning(f"VM not found: {vm_name}")
return found_vms, not_found
def remove_vm_from_vpg(client, vpg_name, vm):
"""Remove a VM from the VPG."""
vm_name = vm.get('VmName')
# vm_id = vm.get('VmIdentifier')
logging.info(f"\nRemoving VM {vm_name} from VPG...")
client.vpgs.remove_vm_from_vpg(vpg_name=vpg_name, vm_name=vm_name)
def main():
"""
Main function to demonstrate VPG creation.
Shows how to:
1. Create a new VPG with basic settings
2. Configure journal, recovery, and network settings
3. Add VMs to the VPG
3. Add specified VMs to the VPG
4. Remove the last added VM from the VPG
"""
# Set up logging with timestamp
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s'
)
# Parse command line arguments
args = parse_arguments()
# Handle both space-separated and comma-separated VM names
vm_names = []
for name in args.vm_names:
vm_names.extend([n.strip() for n in name.split(',') if n.strip()])
# Update args.vm_names with the processed list
args.vm_names = vm_names
try:
# Step 1: Create a ZVMLClient instance
@@ -124,7 +183,7 @@ def main():
# Step 4: Create VPG configuration
logging.info("\nCreating VPG configuration...")
vpg_name = "Test-VPG-Python"
vpg_name = args.vpg_name
# Basic VPG settings
basic = {
@@ -138,13 +197,8 @@ def main():
"RecoverySiteIdentifier": peer_site_identifier
}
# Journal settings
# Journal settings, keep the default settings
journal = {
"DatastoreIdentifier": target_datastore.get('DatastoreIdentifier'),
"Limitation": {
"HardLimitInMB": 153600,
"WarningThresholdInMB": 115200
}
}
# Recovery settings
@@ -180,34 +234,54 @@ def main():
vpg_id = client.vpgs.create_vpg(basic=basic, journal=journal, recovery=recovery, networks=networks, sync=True)
logging.info(f"VPG created successfully with ID: {vpg_id}")
# Step 6: Get available VMs for protection
logging.info("\nRetrieving available VMs for protection...")
vms = client.virtualization_sites.get_virtualization_site_vms(site_identifier=local_site_identifier)
# Step 6: Get specified VMs for protection
logging.info(f"\nRetrieving specified VMs for protection: {args.vm_names}")
found_vms, not_found = find_vms_by_names(client, local_site_identifier, args.vm_names)
if not vms:
logging.warning("No VMs found for protection!")
if not_found:
logging.warning(f"The following VMs were not found: {not_found}")
if not found_vms:
logging.error("No VMs found for protection!")
sys.exit(1)
# Filter out VMs that are already protected
available_vms = [vm for vm in vms if not vm.get('IsProtected')]
logging.info(f"Found {len(available_vms)} available VM(s) for protection")
logging.info(f"Found {len(found_vms)} VM(s) for protection")
# Step 7: Add VMs to VPG
for vm in available_vms[:2]: # Add first two available VMs
logging.info(f"\nAdding VM {vm.get('VmName')} to VPG...")
for vm in found_vms:
vm_name = vm.get('VmName')
vm_id = vm.get('VmIdentifier')
logging.info(f"\nAdding VM {vm_name} (ID: {vm_id}) to VPG...")
vm_payload = {
"VmIdentifier": vm.get('VmIdentifier'),
"VmIdentifier": vm_id,
"Recovery": {
"HostIdentifier": target_host.get('HostIdentifier'),
"DatastoreIdentifier": target_datastore.get('DatastoreIdentifier'),
"FolderIdentifier": target_folder.get('FolderIdentifier')
}
}
task_id = client.vpgs.add_vm_to_vpg(vpg_name, vm_list_payload=vm_payload)
logging.info(f"Task ID: {task_id} to add VM {vm.get('VmName')} to VPG")
task_id = client.vpgs.add_vm_to_vpg(args.vpg_name, vm_list_payload=vm_payload)
logging.info(f"Task ID: {task_id} to add VM {vm_name} to VPG")
# Step 8: Interactive VM removal
if found_vms:
last_vm = found_vms[-1]
vm_name = last_vm.get('VmName')
while True:
response = input(f"\nWould you like to remove the last added VM ({vm_name}) from the VPG? (yes/no): ").lower()
if response in ['yes', 'y']:
remove_vm_from_vpg(client, args.vpg_name, last_vm)
logging.info(f"Successfully removed VM {vm_name} from VPG {args.vpg_name}")
break
elif response in ['no', 'n']:
logging.info("Skipping VM removal.")
break
else:
print("Please answer 'yes' or 'no'")
except Exception as e:
logging.error(f"VPG creation failed: {str(e)}")
logging.error(f"VPG operation failed: {str(e)}")
sys.exit(1)
if __name__ == "__main__":
@@ -1,162 +0,0 @@
#!/usr/bin/env python3
"""
Exercise 5: VPG Operations - Solution (Part 2: VM Management)
This script demonstrates how to manage VMs within VPGs in your Zerto environment.
Prerequisites:
1. Install the zvml package in development mode:
cd /path/to/zvml-python-sdk
pip install -e .
2. Update prerequisites/config.py with your ZVM details
3. Run create_vpg.py first to create a VPG
This solution demonstrates:
- Adding VMs to existing VPGs using add_vm_to_vpg
- Removing VMs from VPGs using remove_vm_from_vpg
- Proper error handling and logging
"""
import sys
import os
import logging
import json
from pathlib import Path
# Add prerequisites to Python path
prerequisites_path = Path(__file__).parent.parent.parent.parent / "prerequisites"
sys.path.append(str(prerequisites_path))
# Import the SDK modules
from zvml import ZVMLClient
# Import configuration
try:
from config import (
ZVM_HOST,
ZVM_PORT,
ZVM_SSL_VERIFY,
CLIENT_ID,
CLIENT_SECRET
)
except ImportError:
print("Error: Please copy config.example.py to config.py and update with your values")
print("Expected path:", prerequisites_path / "config.py")
sys.exit(1)
def main():
"""
Main function to demonstrate VM management in VPGs.
Shows how to:
1. Add VMs to an existing VPG using add_vm_to_vpg
2. Remove VMs from a VPG using remove_vm_from_vpg
"""
# Set up logging with timestamp
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s'
)
try:
# Step 1: Create a ZVMLClient instance
logging.info(f"Initializing ZVMLClient for ZVM at {ZVM_HOST}")
client = ZVMLClient(
zvm_address=ZVM_HOST,
client_id=CLIENT_ID,
client_secret=CLIENT_SECRET,
verify_certificate=ZVM_SSL_VERIFY
)
# Step 2: Find the VPG we created earlier
logging.info("\nRetrieving VPGs...")
vpgs = client.vpgs.get_vpgs()
if not vpgs:
logging.warning("No VPGs found!")
sys.exit(1)
# Find our test VPG
test_vpg = next((vpg for vpg in vpgs if vpg.get('VpgName') == "Test-VPG-Python"), None)
if not test_vpg:
logging.warning("Test VPG not found! Please run create_vpg.py first.")
sys.exit(1)
vpg_name = test_vpg.get('VpgName')
logging.info(f"Found VPG: {json.dumps(test_vpg, indent=4)}")
# Step 3: Get available VMs for adding to VPG
logging.info("\nRetrieving available VMs...")
vms = client.virtualization_sites.get_virtualization_site_vms(site_identifier=test_vpg.get('SourceSiteIdentifier'))
if not vms:
logging.warning("No VMs found!")
sys.exit(1)
# Filter out VMs that are already protected
available_vms = [vm for vm in vms if not vm.get('IsProtected')]
logging.info(f"Found {len(available_vms)} available VM(s)")
if not available_vms:
logging.warning("No available VMs to add!")
sys.exit(1)
# Step 4: Get peer site resources for VM recovery settings
logging.info("\nRetrieving peer site resources...")
peer_site_identifier = test_vpg.get('TargetSiteIdentifier')
# Get peer datastores
peer_datastores = client.virtualization_sites.get_virtualization_site_datastores(site_identifier=peer_site_identifier)
if not peer_datastores:
logging.warning("No datastores found in peer site!")
sys.exit(1)
target_datastore = peer_datastores[0] # Use first available datastore
# Get peer folders
peer_folders = client.virtualization_sites.get_virtualization_site_folders(site_identifier=peer_site_identifier)
if not peer_folders:
logging.warning("No folders found in peer site!")
sys.exit(1)
target_folder = peer_folders[0] # Use first available folder
# Get peer hosts
peer_hosts = client.virtualization_sites.get_virtualization_site_hosts(site_identifier=peer_site_identifier)
if not peer_hosts:
logging.warning("No hosts found in peer site!")
sys.exit(1)
target_host = peer_hosts[0] # Use first available host
# Step 5: Add a new VM to the VPG
logging.info("\nAdding a new VM to the VPG...")
vm_to_add = available_vms[0] # Use first available VM
vm_payload = {
"VmIdentifier": vm_to_add.get('VmIdentifier'),
"Recovery": {
"HostIdentifier": target_host.get('HostIdentifier'),
"DatastoreIdentifier": target_datastore.get('DatastoreIdentifier'),
"FolderIdentifier": target_folder.get('FolderIdentifier')
}
}
task_id = client.vpgs.add_vm_to_vpg(vpg_name, vm_list_payload=vm_payload)
logging.info(f"Task ID: {task_id} to add VM {vm_to_add.get('VmName')} to VPG")
# Step 6: Remove a VM from the VPG
logging.info("\nRemoving a VM from the VPG...")
# Get VPG VMs
vpg_vms = client.vpgs.get_vpg_vms(vpg_name)
if len(vpg_vms) > 1: # Keep at least one VM
vm_to_remove = vpg_vms[-1] # Remove the last VM
vm_identifier = vm_to_remove.get('VmIdentifier')
task_id = client.vpgs.remove_vm_from_vpg(vpg_name, vm_identifier)
logging.info(f"Task ID: {task_id} to remove VM {vm_to_remove.get('VmName')} from VPG")
else:
logging.info("Skipping VM removal to maintain at least one VM in the VPG")
except Exception as e:
logging.error(f"VM management failed: {str(e)}")
sys.exit(1)
if __name__ == "__main__":
main()
+142 -39
View File
@@ -1,69 +1,172 @@
#!/usr/bin/env python3
"""
Exercise 5: VPG Operations - VPG Creation
This script demonstrates how to create and configure a VPG.
Exercise 5: VPG Operations - Template
This script demonstrates how to create and configure VPGs in your Zerto environment.
Prerequisites:
1. Install the zvml package in development mode:
cd /path/to/zvml-python-sdk
pip install -e .
2. Update prerequisites/config.py with your ZVM details
Usage:
python create_vpg.py --vm-names "vm1" "vm2" "vm3" [--vpg-name "My-VPG"]
Your task:
1. Implement the find_vms_by_names function to locate VMs by their names
2. Complete the VPG configuration with appropriate settings
3. Add the found VMs to the VPG
4. Implement VM removal functionality
The script should:
- Create a new VPG with basic settings
- Configure journal, recovery, and network settings
- Add specified VMs to the VPG
- Allow removing VMs from the VPG
"""
import sys
import os
import logging
import json
import argparse
from pathlib import Path
import urllib3
# Add the parent directory to the Python path to import the SDK
sys.path.append(str(Path(__file__).parent.parent.parent.parent))
# Suppress SSL warnings
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
# Add prerequisites to Python path
prerequisites_path = Path(__file__).parent.parent.parent.parent / "prerequisites"
sys.path.append(str(prerequisites_path))
# Import the SDK modules
from zvml import ZertoClient
from zvml.vpgs import VPG
from zvml.common import ZertoVPGError
from zvml import ZVMLClient
# Import configuration
try:
from prerequisites.config import (
from config import (
ZVM_HOST,
ZVM_PORT,
ZVM_SSL_VERIFY,
KEYCLOAK_SERVER_URL,
KEYCLOAK_REALM,
CLIENT_ID,
CLIENT_SECRET
)
except ImportError:
print("Error: Please copy config.example.py to config.py and update with your values")
print("Expected path:", prerequisites_path / "config.py")
sys.exit(1)
def parse_arguments():
"""Parse command line arguments."""
# TODO: Implement argument parsing
# Required arguments:
# --vm-names: List of VM names to add to the VPG
# Optional arguments:
# --vpg-name: Name of the VPG to create (default: "Test-VPG-Python")
pass
def find_vms_by_names(client, site_identifier, vm_names):
"""
Find VMs by their names in the specified site.
Args:
client: ZVMLClient instance
site_identifier: Identifier of the site to search in
vm_names: List of VM names to find
Returns:
tuple: (list of found VMs, list of not found VM names)
TODO: Implement the function to:
1. Get all unprotected VMs from the site
2. Create a dictionary for easy lookup
3. Find requested as an argument VMs
4. Return found and not found VMs
"""
pass
def remove_vm_from_vpg(client, vpg_name, vm):
"""
Remove a VM from the VPG.
Args:
client: ZVMLClient instance
vpg_name: Name of the VPG
vm: VM object to remove
TODO: Implement the function to:
1. Get VM identifier
2. Call the appropriate API to remove the VM
3. Log the operation
"""
pass
def main():
"""
Main function to demonstrate VPG creation.
TODO: Implement the following steps:
1. Parse command line arguments
2. Create ZVMLClient instance
3. Identify local and peer sites
4. Get peer site resources (datastores, folders, networks, hosts)
5. Create VPG configuration
6. Create VPG
7. Find and add VMs to VPG
8. Implement interactive VM removal
"""
# Step 1: Create and authenticate ZertoClient
# TODO: Initialize ZertoClient and authenticate
# Hint: Reuse the authentication code from previous exercises
# Set up logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s'
)
# Step 2: Get source and target sites
# TODO: Get local site and a peer site
# Hint: Use client.sites.get_local() and client.sites.list()
# Step 3: Configure VPG settings
# TODO: Create VPG settings with required parameters
# Required settings:
# - VPG name
# - Source site
# - Target site
# - Journal history
# - RPO
# - Test network
# - Recovery network
# Step 4: Create the VPG
# TODO: Create a new VPG with the configured settings
# Hint: Use client.vpgs.create() method
# Step 5: Validate the VPG
# TODO: Validate the VPG configuration
# Hint: Use vpg.validate() method
# Step 6: Handle errors
# TODO: Add error handling for VPG operations
# Hint: Use try/except blocks for ZertoVPGError
try:
# TODO: Step 1: Parse command line arguments
# args = parse_arguments()
# TODO: Step 2: Create ZVMLClient instance
# client = ZVMLClient(...)
# TODO: Step 3: Identify local and peer sites
# local_site = client.localsite.get_local_site()
# sites = client.virtualization_sites.get_virtualization_sites()
# TODO: Step 4: Get peer site resources
# peer_datastores = client.virtualization_sites.get_virtualization_site_datastores(...)
# peer_folders = client.virtualization_sites.get_virtualization_site_folders(...)
# peer_networks = client.virtualization_sites.get_virtualization_site_networks(...)
# peer_hosts = client.virtualization_sites.get_virtualization_site_hosts(...)
# TODO: Step 5: Create VPG configuration
# basic = {
# "Name": "Your VPG Name",
# "VpgType": "Remote",
# "RpoInSeconds": 300,
# "JournalHistoryInHours": 24,
# "Priority": "Medium",
# "UseWanCompression": True,
# "ProtectedSiteIdentifier": local_site_identifier,
# "RecoverySiteIdentifier": peer_site_identifier
# }
# TODO: Step 6: Create VPG
# vpg_id = client.vpgs.create_vpg(...)
# TODO: Step 7: Find and add VMs to VPG
# found_vms, not_found = find_vms_by_names(...)
# for vm in found_vms:
# # Add VM to VPG
# TODO: Step 8: Implement interactive VM removal
# if found_vms:
# # Ask user if they want to remove the last VM
# # If yes, remove it
except Exception as e:
logging.error(f"VPG operation failed: {str(e)}")
sys.exit(1)
if __name__ == "__main__":
main()
@@ -1,70 +0,0 @@
#!/usr/bin/env python3
"""
Exercise 5: VPG Operations - VM Management
This script demonstrates how to manage VMs within a VPG.
"""
import sys
from pathlib import Path
# Add the parent directory to the Python path to import the SDK
sys.path.append(str(Path(__file__).parent.parent.parent.parent))
# Import the SDK modules
from zvml import ZertoClient
from zvml.vpgs import VPG
from zvml.common import ZertoVPGError
# Import configuration
try:
from prerequisites.config import (
ZVM_HOST,
ZVM_PORT,
ZVM_SSL_VERIFY,
KEYCLOAK_SERVER_URL,
KEYCLOAK_REALM,
CLIENT_ID,
CLIENT_SECRET
)
except ImportError:
print("Error: Please copy config.example.py to config.py and update with your values")
sys.exit(1)
def main():
"""
Main function to demonstrate VM management in VPGs.
"""
# Step 1: Create and authenticate ZertoClient
# TODO: Initialize ZertoClient and authenticate
# Hint: Reuse the authentication code from previous exercises
# Step 2: Get the VPG
# TODO: Find and get the VPG you want to manage
# Hint: Use client.vpgs.list() and client.vpgs.get()
# Step 3: List current VMs in the VPG
# TODO: Get a list of VMs currently in the VPG
# Hint: Use vpg.get_vms() method
# Step 4: Add VMs to the VPG
# TODO: Add one or more VMs to the VPG
# Required steps:
# - Find eligible VMs
# - Configure VM settings
# - Add VMs to VPG
# Hint: Use vpg.add_vms() method
# Step 5: Remove VMs from the VPG
# TODO: Remove one or more VMs from the VPG
# Hint: Use vpg.remove_vms() method
# Step 6: Validate VPG after changes
# TODO: Validate the VPG after VM changes
# Hint: Use vpg.validate() method
# Step 7: Handle errors
# TODO: Add error handling for VM operations
# Hint: Use try/except blocks for ZertoVPGError
if __name__ == "__main__":
main()
@@ -0,0 +1,180 @@
#!/usr/bin/env python3
"""
Exercise 6: Failover Testing - Solution
This script demonstrates how to perform a failover test on a VPG.
Prerequisites:
1. Install the zvml package in development mode:
cd /path/to/zvml-python-sdk
pip install -e .
2. Update prerequisites/config.py with your ZVM details
Usage:
python failover.py --vpg-name "My-VPG"
This solution demonstrates:
- Finding a VPG by name
- Starting a failover test with default settings
- Monitoring test progress
- Stopping the test when requested
"""
import sys
import os
import logging
import json
import argparse
import time
from pathlib import Path
import urllib3
# Suppress SSL warnings
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
# Add prerequisites to Python path
prerequisites_path = Path(__file__).parent.parent.parent.parent / "prerequisites"
sys.path.append(str(prerequisites_path))
# Import the SDK modules
from zvml import ZVMLClient
# Import configuration
try:
from config import (
ZVM_HOST,
ZVM_PORT,
ZVM_SSL_VERIFY,
CLIENT_ID,
CLIENT_SECRET
)
except ImportError:
print("Error: Please copy config.example.py to config.py and update with your values")
print("Expected path:", prerequisites_path / "config.py")
sys.exit(1)
def parse_arguments():
"""Parse command line arguments."""
parser = argparse.ArgumentParser(description='Perform failover test on a VPG')
parser.add_argument('--vpg-name', required=True,
help='Name of the VPG to test')
return parser.parse_args()
def find_vpg_by_name(client, vpg_name):
"""
Find a VPG by its name.
Args:
client: ZVMLClient instance
vpg_name: Name of the VPG to find
Returns:
dict: VPG object if found, None otherwise
"""
vpg = client.vpgs.list_vpgs(vpg_name=vpg_name)
# logging.info(f"Found vpg {json.dumps(vpg, indent=4)}")
return vpg if vpg else None
def start_failover_test(client, vpg_name):
"""
Start a failover test for the specified VPG using default settings.
Args:
client: ZVMLClient instance
vpg_name: Name of the VPG to test
Returns:
str: Test identifier
"""
logging.info(f"Starting failover test for VPG '{vpg_name}'")
# Start the test with default settings
response = client.vpgs.failover_test(
vpg_name=vpg_name,
sync=True # Wait for the test to start
)
logging.info(f"Faiolver test response: {response}")
return response
def monitor_test_progress(client, vpg_name, test_id):
"""
Monitor the progress of a failover test.
Args:
client: ZVMLClient instance
vpg_name: Name of the VPG
test_id: Test identifier
Returns:
bool: True if test completed successfully, False otherwise
"""
test_status = client.vpgs.get_vpg_test_status(vpg_name, test_id)
status = test_status.get('Status')
progress = test_status.get('Progress', 0)
logging.info(f"Test status: {status} (Progress: {progress}%)")
if status == 'Succeeded':
return True
elif status in ['Failed', 'Stopped']:
logging.error(f"Test {status.lower()}: {test_status.get('Message', 'No message')}")
return False
return False # Test is still running
def stop_failover_test(client, vpg_name):
"""
Stop a running failover test.
Args:
client: ZVMLClient instance
vpg_name: Name of the VPG
"""
logging.info(f"Stopping faiolver test for VPG '{vpg_name}'...")
response = client.vpgs.stop_failover_test(vpg_name=vpg_name)
logging.info(f"Stop failover test response: {response}")
def main():
"""
Main function to demonstrate failover testing.
"""
# Set up logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s'
)
try:
# Step 1: Parse command line arguments
args = parse_arguments()
# Step 2: Create ZVMLClient instance
logging.info(f"Initializing ZVMLClient for ZVM at {ZVM_HOST}")
client = ZVMLClient(
zvm_address=ZVM_HOST,
client_id=CLIENT_ID,
client_secret=CLIENT_SECRET,
verify_certificate=ZVM_SSL_VERIFY
)
# Step 3: Find the VPG
vpg = find_vpg_by_name(client, args.vpg_name)
if not vpg:
logging.error(f"VPG '{args.vpg_name}' not found!")
sys.exit(1)
# Step 4: Start failover test
response = start_failover_test(client, args.vpg_name)
# Step 5: Handle test stop request
response = input("\nWould you like to stop the test? (yes/no): ").lower()
if response in ['yes', 'y']:
stop_failover_test(client, args.vpg_name)
except Exception as e:
logging.error(f"Failover test failed: {str(e)}")
sys.exit(1)
if __name__ == "__main__":
main()
+166 -43
View File
@@ -1,74 +1,197 @@
#!/usr/bin/env python3
"""
Exercise 6: Failover Testing
This script demonstrates how to perform and manage failover tests.
Exercise 6: Failover Testing - Template
This script demonstrates how to perform a failover test on a VPG.
Prerequisites:
1. Install the zvml package in development mode:
cd /path/to/zvml-python-sdk
pip install -e .
2. Update prerequisites/config.py with your ZVM details
Usage:
python failover.py --vpg-name "My-VPG"
Your task:
1. Implement VPG lookup by name using get_vpgs()
2. Start a failover test using failover_test() method
3. Monitor test progress using get_vpg_test_status()
4. Stop the test using stop_vpg_test() method when requested
The script should:
- Find the VPG by name and verify it exists
- Start a failover test with default settings
- Monitor the test progress and status
- Allow stopping the test when requested
"""
import sys
import os
import logging
import json
import argparse
import time
from pathlib import Path
import urllib3
# Add the parent directory to the Python path to import the SDK
sys.path.append(str(Path(__file__).parent.parent.parent.parent))
# Suppress SSL warnings
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
# Add prerequisites to Python path
prerequisites_path = Path(__file__).parent.parent.parent.parent / "prerequisites"
sys.path.append(str(prerequisites_path))
# Import the SDK modules
from zvml import ZertoClient
from zvml.vpgs import VPG
from zvml.common import ZertoVPGError
from zvml import ZVMLClient
# Import configuration
try:
from prerequisites.config import (
from config import (
ZVM_HOST,
ZVM_PORT,
ZVM_SSL_VERIFY,
KEYCLOAK_SERVER_URL,
KEYCLOAK_REALM,
CLIENT_ID,
CLIENT_SECRET
)
except ImportError:
print("Error: Please copy config.example.py to config.py and update with your values")
print("Expected path:", prerequisites_path / "config.py")
sys.exit(1)
def parse_arguments():
"""Parse command line arguments."""
# TODO: Implement argument parsing
# Required argument:
# --vpg-name: Name of the VPG to test
pass
def find_vpg_by_name(client, vpg_name):
"""
Find a VPG by its name using get_vpgs() method.
Args:
client: ZVMLClient instance
vpg_name: Name of the VPG to find
Returns:
dict: VPG object if found, None otherwise
TODO: Implement the function to:
1. Call client.vpgs.get_vpgs() to get all VPGs
2. Find the VPG with matching name
3. Log the VPG details if found
4. Return the VPG object or None
"""
pass
def start_failover_test(client, vpg_name):
"""
Start a failover test for the specified VPG using failover_test() method.
Args:
client: ZVMLClient instance
vpg_name: Name of the VPG to test
Returns:
str: Test identifier (task_id)
TODO: Implement the function to:
1. Call client.vpgs.failover_test() with sync=True
2. Log the test initiation
3. Return the task_id
"""
pass
def monitor_test_progress(client, vpg_name, test_id):
"""
Monitor the progress of a failover test using get_vpg_test_status().
Args:
client: ZVMLClient instance
vpg_name: Name of the VPG
test_id: Test identifier (task_id)
Returns:
bool: True if test completed successfully, False otherwise
TODO: Implement the function to:
1. Call client.vpgs.get_vpg_test_status() to get test status
2. Log the status and progress
3. Return True for 'Succeeded', False for 'Failed' or 'Stopped'
4. Return False if test is still running
"""
pass
def stop_failover_test(client, vpg_name, test_id):
"""
Stop a running failover test using stop_vpg_test() method.
Args:
client: ZVMLClient instance
vpg_name: Name of the VPG
test_id: Test identifier (task_id)
TODO: Implement the function to:
1. Call client.vpgs.stop_vpg_test() with sync=True
2. Wait for the stop operation to complete
3. Log the stop operation status
"""
pass
def main():
"""
Main function to demonstrate failover testing.
TODO: Implement the following steps:
1. Parse command line arguments for VPG name
2. Create ZVMLClient instance
3. Find the VPG by name using find_vpg_by_name()
4. Start failover test using start_failover_test()
5. Monitor test progress and handle stop request:
- Monitor progress using monitor_test_progress()
- If test completes successfully, exit
- If user requests to stop, call stop_failover_test()
"""
# Step 1: Create and authenticate ZertoClient
# TODO: Initialize ZertoClient and authenticate
# Hint: Reuse the authentication code from previous exercises
# Set up logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s'
)
# Step 2: Get the VPG
# TODO: Find and get the VPG you want to test
# Hint: Use client.vpgs.list() and client.vpgs.get()
# Step 3: Initiate failover test
# TODO: Start a failover test for the VPG
# Required steps:
# - Configure test settings
# - Start the test
# Hint: Use vpg.start_test() method
# Step 4: Monitor test progress
# TODO: Monitor the test status until completion
# Required steps:
# - Get test status
# - Check for completion
# - Handle any errors
# Hint: Use vpg.get_test_status() method
# Step 5: Stop the test
# TODO: Stop the running test
# Hint: Use vpg.stop_test() method
# Step 6: Clean up
# TODO: Ensure proper cleanup after the test
# Hint: Check if any cleanup is needed
# Step 7: Handle errors
# TODO: Add error handling for test operations
# Hint: Use try/except blocks for ZertoVPGError
try:
# TODO: Step 1: Parse command line arguments
# args = parse_arguments()
# TODO: Step 2: Create ZVMLClient instance
# client = ZVMLClient(...)
# TODO: Step 3: Find the VPG
# vpg = find_vpg_by_name(client, args.vpg_name)
# if not vpg:
# logging.error(f"VPG '{args.vpg_name}' not found!")
# sys.exit(1)
# TODO: Step 4: Start failover test
# test_id = start_failover_test(client, args.vpg_name)
# TODO: Step 5: Monitor test progress and handle stop request
# while True:
# success = monitor_test_progress(client, args.vpg_name, test_id)
# if success:
# logging.info("Test completed successfully")
# break
#
# # Check if user wants to stop the test
# response = input("\nWould you like to stop the test? (yes/no): ").lower()
# if response in ['yes', 'y']:
# stop_failover_test(client, args.vpg_name, test_id)
# break
#
# time.sleep(10) # Wait before next status check
except Exception as e:
logging.error(f"Failover test failed: {str(e)}")
sys.exit(1)
if __name__ == "__main__":
main()
+120 -47
View File
@@ -1,57 +1,130 @@
# Exercise 7: Bulk Operations
# Exercise 7: Bulk VPG NIC Settings Management
## Overview
This final exercise covers bulk operations, focusing on managing multiple VMs efficiently. You'll learn how to perform operations on multiple VMs simultaneously.
## Objectives
- Perform bulk IP modifications
- Manage multiple VMs
- Handle bulk operations efficiently
- Monitor bulk task progress
## Time
5 minutes
This exercise demonstrates how to perform bulk operations on VPG NIC settings using two provided scripts:
1. `export_vpg_settings_nics_to_csv.py` - Exports current VPG NIC settings to a CSV file
2. `import_vpg_settings_nics_from_csv.py` - Imports and applies NIC settings from a CSV file
## Prerequisites
- Completed Exercise 6
- Working VPGs with multiple VMs
- Access to VM management
## Exercise Steps
1. Prepare VM list
2. Configure bulk operations
3. Execute bulk IP changes
4. Monitor operation progress
5. Verify changes
1. Python 3.6 or higher
2. Zerto Python SDK (`zvml` package) installed in development mode:
```bash
cd /path/to/zvml-python-sdk
pip install -e .
```
3. Access to a Zerto Virtual Manager (ZVM) with appropriate permissions
4. VPGs already created and configured in your environment
## Working Directory
The `working` directory contains:
- `bulk_ip.py` - Template to complete
- `vm_list.csv` - Sample VM list
## Exercise Overview
## Solution
The `solution` directory contains:
- `bulk_ip.py` - Complete working example
- `vm_list.csv` - Example VM list
This exercise will guide you through the process of:
1. Exporting current VPG NIC settings to a CSV file
2. Modifying the CSV file to update IP configurations
3. Importing and applying the updated settings back to the VPGs
## Key Concepts
- Bulk operations
- IP management
- Task monitoring
- Error handling
## Step 1: Export Current VPG Settings
## Common Issues
- Invalid IP configurations
- Operation timeouts
- Partial failures
- Resource constraints
Use the export script to save current VPG NIC settings to a CSV file:
## Lab Completion
Congratulations! You have completed all exercises in the Zerto Python SDK Hands-On Lab. You should now have a good understanding of:
- Zerto API basics
- Authentication and connection
- Site and resource management
- VPG operations
- Testing and bulk operations
```bash
python export_vpg_settings_nics_to_csv.py \
--zvm_address "192.168.111.20" \
--client_id "zerto-api" \
--client_secret "your-secret-here" \
--vpg_names "VpgTest1,VpgTest2" \
--ignore_ssl
```
Please complete the feedback form to help us improve the lab content.
The script will create two files:
- `ExportedSettings_[timestamp].json` - Full VPG settings in JSON format
- `ExportedSettings_[timestamp].csv` - NIC settings in CSV format
## Step 2: Modify the CSV File
Open the generated CSV file in a spreadsheet application (e.g., Microsoft Excel, Google Sheets) and modify the following settings as needed:
1. **Failover Network Settings**:
- `Failover ShouldReplaceIpConfiguration` - Set to "True" to modify IP settings
- `Failover Network` - Network identifier for failover
- `Failover DHCP` - Set to "True" for DHCP or "False" for static IP
- `Failover IP` - Static IP address (required if DHCP is False)
- `Failover Subnet` - Subnet mask (required if DHCP is False)
- `Failover Gateway` - Default gateway (optional)
- `Failover DNS1` - Primary DNS server (optional)
- `Failover DNS2` - Secondary DNS server (optional)
2. **Failover Test Network Settings**:
- `Failover Test ShouldReplaceIpConfiguration` - Set to "True" to modify IP settings
- `Failover Test Network` - Network identifier for test failover
- `Failover Test DHCP` - Set to "True" for DHCP or "False" for static IP
- `Failover Test IP` - Static IP address (required if DHCP is False)
- `Failover Test Subnet` - Subnet mask (required if DHCP is False)
- `Failover Test Gateway` - Default gateway (optional)
- `Failover Test DNS1` - Primary DNS server (optional)
- `Failover Test DNS2` - Secondary DNS server (optional)
## Step 3: Import Updated Settings
Use the import script to apply the modified settings:
```bash
python import_vpg_settings_nics_from_csv.py \
--zvm_address "192.168.111.20" \
--client_id "zerto-api" \
--client_secret "your-secret-here" \
--csv_file "ExportedSettings_2024-03-14_12-34-56.csv" \
--vpg_names "VpgTest1,VpgTest2" \
--ignore_ssl
```
The script will:
1. Validate the settings in the CSV file
2. Show a summary of changes to be applied
3. Ask for confirmation before applying changes
4. Apply the changes and commit them to the VPGs
## Important Notes
1. **Backup**: Always keep a backup of the original CSV file before making changes
2. **Validation**: The import script validates settings before applying them:
- Checks for conflicting DHCP and static IP settings
- Verifies required fields are present
- Ensures network identifiers are valid
3. **Safety**: The script requires explicit confirmation before applying changes
4. **Rollback**: If needed, you can re-import the original settings from the backup CSV
## Common Use Cases
1. **Bulk IP Address Changes**:
- Export current settings
- Update IP addresses in the CSV
- Import modified settings
2. **Network Migration**:
- Export current settings
- Update network identifiers
- Import modified settings
3. **DHCP to Static IP Conversion**:
- Export current settings
- Set `ShouldReplaceIpConfiguration` to "True"
- Set `DHCP` to "False"
- Add static IP settings
- Import modified settings
## Troubleshooting
1. **Validation Errors**:
- Ensure `ShouldReplaceIpConfiguration` is set to "True" when modifying IP settings
- Check that DHCP and static IP settings are not conflicting
- Verify all required fields are filled when using static IP
2. **Import Failures**:
- Verify network identifiers exist in the target site
- Check that IP addresses are in the correct format
- Ensure you have sufficient permissions on the ZVM
3. **CSV Format Issues**:
- Keep the original column headers
- Don't modify the `VPG Name`, `VM Identifier`, or `NIC Identifier` columns
- Use "True" or "False" (case-insensitive) for boolean values
@@ -0,0 +1,250 @@
#!/usr/bin/env python3
# Legal Disclaimer
# This script is an example script and is not supported under any Zerto support program or service.
# The author and Zerto further disclaim all implied warranties including, without limitation,
# any implied warranties of merchantability or of fitness for a particular purpose.
# In no event shall Zerto, its authors or anyone else involved in the creation,
# production or delivery of the scripts be liable for any damages whatsoever (including,
# without limitation, damages for loss of business profits, business interruption, loss of business
# information, or other pecuniary loss) arising out of the use of or the inability to use the sample
# scripts or documentation, even if the author or Zerto has been advised of the possibility of such damages.
# The entire risk arising out of the use or performance of the sample scripts and documentation remains with you.
import argparse
import logging
import json
import csv
import sys
import os
from pathlib import Path
import urllib3
from typing import List, Dict
import codecs
# Add parent directory to path to import zvml
sys.path.append(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))))
from zvml import ZVMLClient
# Disable SSL warnings
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
# Setup logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s'
)
"""
Zerto VPG NIC Settings Export Script
This script exports Virtual Protection Group (VPG) NIC settings to a CSV file, focusing on network
and IP configuration details. It's designed to help with bulk management of VPG NIC settings.
Key Features:
1. VPG NIC Settings Export:
- Export NIC settings for specific VPGs or all VPGs
- Save settings to both JSON and CSV formats
- Include network and IP configuration details
- Capture DHCP and static IP settings
2. CSV Format:
- Organized by VPG, VM, and NIC
- Includes network identifiers
- DHCP settings (True/False)
- Static IP configuration (IP, Subnet, Gateway, DNS)
- ShouldReplaceIpConfiguration flag
3. Settings Management:
- Export current VPG settings
- Convert to CSV format
- Save with timestamp
- Support for Windows line endings
Required Arguments:
--zvm_address: ZVM address
--client_id: Keycloak client ID
--client_secret: Keycloak client secret
--ignore_ssl: Ignore SSL certificate verification (optional)
--vpg_names: Comma-separated list of VPG names to export (optional)
Example Usage:
python export_vpg_settings_nics_to_csv.py \
--zvm_address "192.168.111.20" \
--client_id "zerto-api" \
--client_secret "your-secret-here" \
--vpg_names "VpgTest1,VpgTest2" \
--ignore_ssl
Output Files:
- ExportedSettings_[timestamp].json: Full VPG settings in JSON format
- ExportedSettings_[timestamp].csv: NIC settings in CSV format
Note: This script is part of a pair with import_vpg_settings_nics_from_csv.py, allowing for
export and import of VPG NIC settings in bulk. The CSV format is designed to be easily
editable in spreadsheet applications.
"""
def setup_client(args):
"""Initialize and return Zerto client"""
client = ZVMLClient(
zvm_address=args.zvm_address,
client_id=args.client_id,
client_secret=args.client_secret,
verify_certificate=not args.ignore_ssl
)
return client
def extract_nic_settings(json_data):
"""Extract NIC settings from VPG JSON data."""
nic_settings = []
for vpg in json_data:
vpg_name = vpg['Basic']['Name']
for vm in vpg['Vms']:
vm_id = vm['VmIdentifier']
for nic in vm['Nics']:
nic_id = nic['NicIdentifier']
# Extract failover settings
failover = nic['Failover']['Hypervisor'] if nic['Failover'] and nic['Failover']['Hypervisor'] else {}
failover_network = failover.get('NetworkIdentifier', '')
failover_ip_config = failover.get('IpConfig', {}) or {}
# Extract failover test settings
failover_test = nic['FailoverTest']['Hypervisor'] if nic['FailoverTest'] and nic['FailoverTest']['Hypervisor'] else {}
failover_test_network = failover_test.get('NetworkIdentifier', '')
failover_test_ip_config = failover_test.get('IpConfig', {}) or {}
# Create a row for each NIC
row = {
'VPG Name': vpg_name,
'VM Identifier': vm_id,
'NIC Identifier': nic_id,
'Failover Network': failover_network,
'Failover ShouldReplaceIpConfiguration': str(failover.get('ShouldReplaceIpConfiguration', False)),
'Failover DHCP': str(failover_ip_config.get('IsDhcp', False)),
'Failover IP': failover_ip_config.get('StaticIp', ''),
'Failover Subnet': failover_ip_config.get('SubnetMask', ''),
'Failover Gateway': failover_ip_config.get('Gateway', ''),
'Failover DNS1': failover_ip_config.get('PrimaryDns', ''),
'Failover DNS2': failover_ip_config.get('SecondaryDns', ''),
'Failover Test Network': failover_test_network,
'Failover Test ShouldReplaceIpConfiguration': str(failover_test.get('ShouldReplaceIpConfiguration', False)),
'Failover Test DHCP': str(failover_test_ip_config.get('IsDhcp', False)),
'Failover Test IP': failover_test_ip_config.get('StaticIp', ''),
'Failover Test Subnet': failover_test_ip_config.get('SubnetMask', ''),
'Failover Test Gateway': failover_test_ip_config.get('Gateway', ''),
'Failover Test DNS1': failover_test_ip_config.get('PrimaryDns', ''),
'Failover Test DNS2': failover_test_ip_config.get('SecondaryDns', '')
}
nic_settings.append(row)
return nic_settings
def get_safe_filename(timestamp):
"""Convert timestamp to a URL-safe filename."""
# Replace colons with underscores and remove any other problematic characters
return timestamp.replace(':', '_').replace('/', '_').replace('\\', '_')
def setup_argparse() -> argparse.ArgumentParser:
"""Set up command line argument parsing."""
parser = argparse.ArgumentParser(description="Export VPG settings to CSV")
parser.add_argument("--zvm_address", required=True, help="ZVM address")
parser.add_argument('--client_id', required=True, help='Keycloak client ID')
parser.add_argument('--client_secret', required=True, help='Keycloak client secret')
parser.add_argument("--ignore_ssl", action="store_true", help="Ignore SSL certificate verification")
parser.add_argument("--vpg_names", help="Comma-separated list of VPG names to export (optional)")
parser.add_argument("--output_dir", default='.', help="Directory to save exported files (default: current directory)")
return parser
def ensure_output_dir(output_dir: str) -> None:
"""Ensure the output directory exists."""
if not os.path.exists(output_dir):
os.makedirs(output_dir)
logging.info(f"Created output directory: {output_dir}")
def main():
parser = setup_argparse()
args = parser.parse_args()
try:
# Ensure output directory exists
ensure_output_dir(args.output_dir)
# Setup client
client = setup_client(args)
# Process VPG names if provided
vpg_names = None
if args.vpg_names:
vpg_names = [name.strip() for name in args.vpg_names.split(',')]
logging.info(f"Exporting settings for VPGs: {vpg_names}")
else:
logging.info("No VPG names provided, exporting all VPGs")
# Export VPG settings
print("\nExporting VPG settings...")
export_result = client.vpgs.export_vpg_settings(vpg_names)
if not export_result or 'TimeStamp' not in export_result:
logging.error("Failed to export VPG settings")
sys.exit(1)
timestamp = export_result['TimeStamp']
safe_timestamp = get_safe_filename(timestamp)
print(f"Export completed successfully. Timestamp: {timestamp}")
# Get the exported settings
export_settings = client.vpgs.read_exported_vpg_settings(timestamp, vpg_names)
# Save the JSON export
json_file_name = os.path.join(args.output_dir, f"ExportedSettings_{safe_timestamp}.json")
with open(json_file_name, 'w') as f:
json.dump(export_settings['ExportedVpgSettingsApi'], f, indent=2)
print(f"\nJSON export saved to: {json_file_name}")
# Convert to CSV
nic_settings = extract_nic_settings(export_settings['ExportedVpgSettingsApi'])
# Create CSV file with Windows line endings
csv_file_name = os.path.join(args.output_dir, f"ExportedSettings_{safe_timestamp}.csv")
fieldnames = [
'VPG Name', 'VM Identifier', 'NIC Identifier',
'Failover Network', 'Failover ShouldReplaceIpConfiguration', 'Failover DHCP',
'Failover IP', 'Failover Subnet', 'Failover Gateway',
'Failover DNS1', 'Failover DNS2',
'Failover Test Network', 'Failover Test ShouldReplaceIpConfiguration', 'Failover Test DHCP',
'Failover Test IP', 'Failover Test Subnet',
'Failover Test Gateway', 'Failover Test DNS1', 'Failover Test DNS2'
]
# Write CSV content directly
with open(csv_file_name, 'w', newline='') as f:
writer = csv.DictWriter(
f,
fieldnames=fieldnames,
delimiter=',',
quoting=csv.QUOTE_ALL,
quotechar='"',
lineterminator='\r\n'
)
writer.writeheader()
for row in nic_settings:
# Ensure all fields are present and properly formatted
for field in fieldnames:
if field not in row:
row[field] = ''
# No need to convert boolean values since they're already strings
writer.writerow(row)
print(f"CSV file created: {csv_file_name}")
except Exception as e:
logging.exception("Error occurred:")
sys.exit(1)
if __name__ == "__main__":
main()
@@ -0,0 +1,588 @@
#!/usr/bin/env python3
# Legal Disclaimer
# This script is an example script and is not supported under any Zerto support program or service.
# The author and Zerto further disclaim all implied warranties including, without limitation,
# any implied warranties of merchantability or of fitness for a particular purpose.
# In no event shall Zerto, its authors or anyone else involved in the creation,
# production or delivery of the scripts be liable for any damages whatsoever (including,
# without limitation, damages for loss of business profits, business interruption, loss of business
# information, or other pecuniary loss) arising out of the use of or the inability to use the sample
# scripts or documentation, even if the author or Zerto has been advised of the possibility of such damages.
# The entire risk arising out of the use or performance of the sample scripts and documentation remains with you.
import argparse
import logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s'
)
import json
import csv
import sys
import os
from pathlib import Path
import urllib3
from typing import List, Dict, Tuple
from datetime import datetime
# Add parent directory to path to import zvml
sys.path.append(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))))
from zvml import ZVMLClient
# Disable SSL warnings
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
"""
Zerto VPG NIC Settings Import Script
This script imports Virtual Protection Group (VPG) NIC settings from a CSV file, allowing for
bulk updates of network and IP configurations. It's designed to work with the exported CSV
from export_vpg_settings_nics_to_csv.py.
Key Features:
1. VPG NIC Settings Import:
- Import NIC settings from CSV file
- Update specific VPGs or all VPGs
- Validate settings before applying
- Support for both DHCP and static IP configurations
2. Settings Validation:
- Validate DHCP and static IP settings
- Check ShouldReplaceIpConfiguration flag
- Ensure no conflicting configurations
- Verify network identifiers
3. Bulk Updates:
- Process multiple VPGs in one operation
- Show changes before applying
- Require confirmation before updates
- Detailed logging of changes
Required Arguments:
--zvm_address: ZVM address
--client_id: Keycloak client ID
--client_secret: Keycloak client secret
--ignore_ssl: Ignore SSL certificate verification (optional)
--csv_file: Path to the CSV file with updated settings
--vpg_names: Comma-separated list of VPG names to update (optional)
Example Usage:
python import_vpg_settings_nics_from_csv.py \
--zvm_address "192.168.111.20" \
--client_id "zerto-api" \
--client_secret "your-secret-here" \
--csv_file "ExportedSettings_2024-05-12.csv" \
--vpg_names "VpgTest1,VpgTest2" \
--ignore_ssl
CSV Format Requirements:
- Must include VPG Name, VM Identifier, and NIC Identifier
- DHCP values must be "True" or "False" (case-insensitive)
- ShouldReplaceIpConfiguration must be "True" to modify IP settings
- Static IP settings (IP, Subnet, Gateway, DNS) are optional when DHCP is True
Note: This script is part of a pair with export_vpg_settings_nics_to_csv.py. It's designed
to safely update VPG NIC settings in bulk, with validation and confirmation steps to
prevent unintended changes.
"""
def setup_client(args):
"""Initialize and return Zerto client"""
client = ZVMLClient(
zvm_address=args.zvm_address,
client_id=args.client_id,
client_secret=args.client_secret,
verify_certificate=not args.ignore_ssl
)
return client
def read_csv_settings(csv_path: str) -> List[Dict]:
"""Read settings from CSV file."""
settings = []
with open(csv_path, 'r', newline='') as f:
reader = csv.DictReader(f)
for row in reader:
settings.append(row)
return settings
def get_current_settings(client: ZVMLClient, vpg_names: List[str] = None) -> Tuple[str, List[Dict]]:
"""Get current VPG settings and convert to CSV format."""
# Export current settings
export_result = client.vpgs.export_vpg_settings(vpg_names)
if not export_result or 'TimeStamp' not in export_result:
raise Exception("Failed to export VPG settings")
timestamp = export_result['TimeStamp']
export_settings = client.vpgs.read_exported_vpg_settings(timestamp, vpg_names)
# logging.info(f"get_current_settings: export_settings: {json.dumps(export_settings, indent=4)}")
# Convert to CSV format
nic_settings = []
for vpg in export_settings['ExportedVpgSettingsApi']:
vpg_name = vpg['Basic']['Name']
for vm in vpg['Vms']:
vm_id = vm['VmIdentifier']
for nic in vm['Nics']:
nic_id = nic['NicIdentifier']
# Extract failover settings
failover = nic['Failover']['Hypervisor'] if nic['Failover'] and nic['Failover']['Hypervisor'] else {}
failover_network = failover.get('NetworkIdentifier', '')
failover_ip_config = failover.get('IpConfig', {}) or {}
# Extract failover test settings
failover_test = nic['FailoverTest']['Hypervisor'] if nic['FailoverTest'] and nic['FailoverTest']['Hypervisor'] else {}
failover_test_network = failover_test.get('NetworkIdentifier', '')
failover_test_ip_config = failover_test.get('IpConfig', {}) or {}
row = {
'VPG Name': vpg_name,
'VM Identifier': vm_id,
'NIC Identifier': nic_id,
'Failover Network': failover_network,
'Failover ShouldReplaceIpConfiguration': str(failover.get('ShouldReplaceIpConfiguration', False)),
'Failover DHCP': str(failover_ip_config.get('IsDhcp', False)),
'Failover IP': failover_ip_config.get('StaticIp', ''),
'Failover Subnet': failover_ip_config.get('SubnetMask', ''),
'Failover Gateway': failover_ip_config.get('Gateway', ''),
'Failover DNS1': failover_ip_config.get('PrimaryDns', ''),
'Failover DNS2': failover_ip_config.get('SecondaryDns', ''),
'Failover Test Network': failover_test_network,
'Failover Test ShouldReplaceIpConfiguration': str(failover_test.get('ShouldReplaceIpConfiguration', False)),
'Failover Test DHCP': str(failover_test_ip_config.get('IsDhcp', False)),
'Failover Test IP': failover_test_ip_config.get('StaticIp', ''),
'Failover Test Subnet': failover_test_ip_config.get('SubnetMask', ''),
'Failover Test Gateway': failover_test_ip_config.get('Gateway', ''),
'Failover Test DNS1': failover_test_ip_config.get('PrimaryDns', ''),
'Failover Test DNS2': failover_test_ip_config.get('SecondaryDns', '')
}
nic_settings.append(row)
return timestamp, nic_settings
def normalize_value(value):
"""Normalize values for comparison."""
# Treat None, empty string, and 'None' as the same
if value in ['', None, 'None', 'null']:
return ''
if isinstance(value, bool):
return str(value).lower()
if isinstance(value, str):
value = value.lower()
if value == 'true':
return 'true'
if value == 'false':
return 'false'
return str(value)
def compare_settings(client, current: List[Dict], updated: List[Dict]) -> List[Dict]:
"""Compare current and updated settings and return changes."""
changes = []
# Create lookup dictionaries for faster comparison
current_lookup = {
(row['VPG Name'], row['VM Identifier'], row['NIC Identifier']): row
for row in current
}
def validate_dhcp_settings(client, row: Dict, vpg_name: str, vm_id: str, nic_id: str):
"""Validate that DHCP and IP settings are not conflicting."""
vm_name = client.vms.list_vms(vm_identifier=vm_id).get('VmName')
def validate_ip_settings(prefix: str):
should_replace = normalize_value(row.get(f'{prefix} ShouldReplaceIpConfiguration', '')) == 'true'
dhcp = normalize_value(row.get(f'{prefix} DHCP', '')) == 'true'
has_static_ip = any(row.get(f'{prefix} {field}') for field in ['IP', 'Subnet', 'Gateway', 'DNS1', 'DNS2'])
if not should_replace and (dhcp or has_static_ip):
raise ValueError(
f"Invalid configuration for VPG '{vpg_name}', VM Name '{vm_name}', VM ID '{vm_id}', NIC '{nic_id}': "
f"{prefix} ShouldReplaceIpConfiguration is False but IP settings are present. "
f"Set ShouldReplaceIpConfiguration to True to modify IP settings."
)
if should_replace and not dhcp and not has_static_ip:
raise ValueError(
f"Invalid configuration for VPG '{vpg_name}', VM Name '{vm_name}', VM ID '{vm_id}', NIC '{nic_id}': "
f"{prefix} ShouldReplaceIpConfiguration is True but no IP configuration is provided. "
f"Either set DHCP=True or provide IP configuration (IP, Subnet, Gateway, DNS1, DNS2)."
)
if dhcp and has_static_ip:
raise ValueError(
f"Invalid configuration for VPG '{vpg_name}', VM Name '{vm_name}', VM ID '{vm_id}', NIC '{nic_id}': "
f"Cannot have {prefix} DHCP=True and static IP settings. "
f"Please remove static IP settings or set DHCP=False."
)
# Validate both failover and failover test settings
validate_ip_settings('Failover')
validate_ip_settings('Failover Test')
for updated_row in updated:
key = (updated_row['VPG Name'], updated_row['VM Identifier'], updated_row['NIC Identifier'])
# Validate DHCP settings before processing changes
validate_dhcp_settings(
client,
updated_row,
updated_row['VPG Name'],
updated_row['VM Identifier'],
updated_row['NIC Identifier']
)
if key in current_lookup:
current_row = current_lookup[key]
row_changes = {}
# Compare each field
for field in updated_row:
if field in ['VPG Name', 'VM Identifier', 'NIC Identifier']:
continue
current_value = normalize_value(current_row.get(field, ''))
updated_value = normalize_value(updated_row.get(field, ''))
# Only include the change if the values are different after normalization
if current_value != updated_value:
row_changes[field] = {
'current': current_row.get(field, ''),
'updated': updated_row.get(field, '')
}
if row_changes:
vm_name = client.vms.list_vms(vm_identifier=updated_row['VM Identifier']).get('VmName')
logging.info(f"compare_settings: vm_name {vm_name}")
changes.append({
'VPG Name': updated_row['VPG Name'],
'VM Identifier': updated_row['VM Identifier'],
'NIC Identifier': updated_row['NIC Identifier'],
'VM Name': vm_name,
'changes': row_changes
})
return changes
def display_changes(client, changes: List[Dict]):
"""Display changes in a user-friendly format."""
if not changes:
print("\nNo changes found in the CSV file.")
return
# Group changes by VPG
vpg_changes = {}
for change in changes:
vpg_name = change['VPG Name']
if vpg_name not in vpg_changes:
vpg_changes[vpg_name] = {}
vm_id = change['VM Identifier']
if vm_id not in vpg_changes[vpg_name]:
vpg_changes[vpg_name][vm_id] = {}
nic_id = change['NIC Identifier']
vpg_changes[vpg_name][vm_id][nic_id] = change['changes']
print("\nThe following changes will be applied:")
print("=" * 80)
for vpg_name, vm_changes in vpg_changes.items():
# Skip VPGs with no actual changes
has_vpg_changes = False
for vm_id, nic_changes in vm_changes.items():
for nic_id, changes in nic_changes.items():
if any(values['current'] != values['updated'] for values in changes.values()):
has_vpg_changes = True
break
if has_vpg_changes:
break
if not has_vpg_changes:
continue
print(f"\nVPG: {vpg_name}")
print("-" * 40)
for vm_id, nic_changes in vm_changes.items():
# Skip VMs with no actual changes
has_vm_changes = False
for nic_id, changes in nic_changes.items():
if any(values['current'] != values['updated'] for values in changes.values()):
has_vm_changes = True
break
if not has_vm_changes:
continue
vm_name = client.vms.list_vms(vm_identifier=vm_id).get('VmName')
print(f" VM name: {vm_name}, VM ID: {vm_id}")
for nic_id, changes in nic_changes.items():
# Skip NICs with no actual changes
if not any(values['current'] != values['updated'] for values in changes.values()):
continue
print(f" NIC: {nic_id}")
print(" Changes:")
for field, values in changes.items():
# Only show fields that have actual changes
if values['current'] != values['updated']:
print(f" {field}:")
print(f" Current: {values['current']}")
print(f" Updated: {values['updated']}")
print()
print("=" * 80)
print(f"\nTotal changes: {len(changes)} NIC(s) across {len(vpg_changes)} VPG(s)")
def update_vpg_settings(client: ZVMLClient, changes: List[Dict]):
"""Update VPG settings based on changes."""
# Group changes by VPG
vpg_changes = {}
for change in changes:
vpg_name = change['VPG Name']
if vpg_name not in vpg_changes:
vpg_changes[vpg_name] = []
vpg_changes[vpg_name].append(change)
# Process each VPG
for vpg_name, vpg_change_list in vpg_changes.items():
logging.info(f"update_vpg_settings: Processing VPG: {vpg_name}")
logging.info(f"update_vpg_settings: VPG change list: {json.dumps(vpg_change_list, indent=4)}")
# Get VPG identifier
vpg_info = client.vpgs.list_vpgs(vpg_name=vpg_name)
if not vpg_info:
logging.error(f"update_vpg_settings: VPG {vpg_name} not found")
continue
vpg_identifier = vpg_info['VpgIdentifier']
# Create new VPG settings
vpg_settings_id = client.vpgs.create_vpg_settings(vpg_identifier=vpg_identifier)
vpg_settings = client.vpgs.get_vpg_settings_by_id(vpg_settings_id)
logging.info(f"update_vpg_settings: VPG settings: {json.dumps(vpg_settings, indent=4)}")
# Process each NIC change
for change in vpg_change_list:
vm_id = change['VM Identifier']
nic_id = change['NIC Identifier']
vm_name = change['VM Name']
logging.info(f"update_vpg_settings: Processing NIC: {nic_id} for VM: {vm_name} VM ID: {vm_id}")
# Find the VM and NIC in the settings
vm = None
for v in vpg_settings['Vms']:
if v['VmIdentifier'] == vm_id:
vm = v
# logging.info(f"update_vpg_settings: Found VM: {vm_id} in VPG {vpg_name} vm={json.dumps(vm, indent=4)}")
break
if not vm:
logging.error(f"update_vpg_settings: VM {vm_id} not found in VPG {vpg_name}")
continue
# Find the NIC
nic = None
for n in vm['Nics']:
if n['NicIdentifier'] == nic_id:
nic = n
logging.info(f"update_vpg_settings: Found NIC: {nic_id} in VM {vm_name} VPG {vpg_name} nic={json.dumps(nic, indent=4)}")
break
if not nic:
logging.error(f"update_vpg_settings: NIC {nic_id} not found in VM {vm_id}")
continue
# Initialize structures if needed
if not nic.get('Failover'):
nic['Failover'] = {'Hypervisor': {}}
if not nic.get('FailoverTest'):
nic['FailoverTest'] = {'Hypervisor': {}}
# Process each change for this NIC
for field, values in change['changes'].items():
# Handle Failover settings
if field in ['Failover Network', 'Failover ShouldReplaceIpConfiguration', 'Failover IP',
'Failover Subnet', 'Failover Gateway', 'Failover DNS1', 'Failover DNS2',
'Failover DHCP']:
if field == 'Failover ShouldReplaceIpConfiguration':
nic['Failover']['Hypervisor']['ShouldReplaceIpConfiguration'] = normalize_value(values['updated']) == 'true'
elif field == 'Failover Network':
nic['Failover']['Hypervisor']['NetworkIdentifier'] = values['updated']
elif field == 'Failover DHCP':
if not nic['Failover']['Hypervisor'].get('IpConfig'):
nic['Failover']['Hypervisor']['IpConfig'] = {
'StaticIp': None,
'SubnetMask': None,
'Gateway': None,
'PrimaryDns': None,
'SecondaryDns': None,
'IsDhcp': False
}
nic['Failover']['Hypervisor']['IpConfig']['IsDhcp'] = normalize_value(values['updated']) == 'true'
# If DHCP is enabled, clear other IP settings
if normalize_value(values['updated']) == 'true':
nic['Failover']['Hypervisor']['IpConfig'].update({
'StaticIp': None,
'SubnetMask': None,
'Gateway': None,
'PrimaryDns': None,
'SecondaryDns': None
})
elif field in ['Failover IP', 'Failover Subnet', 'Failover Gateway',
'Failover DNS1', 'Failover DNS2']:
if not nic['Failover']['Hypervisor'].get('IpConfig'):
nic['Failover']['Hypervisor']['IpConfig'] = {
'StaticIp': None,
'SubnetMask': None,
'Gateway': None,
'PrimaryDns': None,
'SecondaryDns': None,
'IsDhcp': False
}
if field == 'Failover IP':
nic['Failover']['Hypervisor']['IpConfig']['StaticIp'] = values['updated'] if values['updated'] else None
elif field == 'Failover Subnet':
nic['Failover']['Hypervisor']['IpConfig']['SubnetMask'] = values['updated'] if values['updated'] else '255.255.255.0'
elif field == 'Failover Gateway':
nic['Failover']['Hypervisor']['IpConfig']['Gateway'] = values['updated'] if values['updated'] else None
elif field == 'Failover DNS1':
nic['Failover']['Hypervisor']['IpConfig']['PrimaryDns'] = values['updated'] if values['updated'] else None
elif field == 'Failover DNS2':
nic['Failover']['Hypervisor']['IpConfig']['SecondaryDns'] = values['updated'] if values['updated'] else None
# Handle Failover Test settings
elif field in ['Failover Test Network', 'Failover Test ShouldReplaceIpConfiguration',
'Failover Test IP', 'Failover Test Subnet', 'Failover Test Gateway',
'Failover Test DNS1', 'Failover Test DNS2', 'Failover Test DHCP']:
if field == 'Failover Test ShouldReplaceIpConfiguration':
nic['FailoverTest']['Hypervisor']['ShouldReplaceIpConfiguration'] = normalize_value(values['updated']) == 'true'
elif field == 'Failover Test Network':
nic['FailoverTest']['Hypervisor']['NetworkIdentifier'] = values['updated']
elif field == 'Failover Test DHCP':
if not nic['FailoverTest']['Hypervisor'].get('IpConfig'):
nic['FailoverTest']['Hypervisor']['IpConfig'] = {
'StaticIp': None,
'SubnetMask': None,
'Gateway': None,
'PrimaryDns': None,
'SecondaryDns': None,
'IsDhcp': False
}
nic['FailoverTest']['Hypervisor']['IpConfig']['IsDhcp'] = normalize_value(values['updated']) == 'true'
# If DHCP is enabled, clear other IP settings
if normalize_value(values['updated']) == 'true':
nic['FailoverTest']['Hypervisor']['IpConfig'].update({
'StaticIp': None,
'SubnetMask': None,
'Gateway': None,
'PrimaryDns': None,
'SecondaryDns': None
})
elif field in ['Failover Test IP', 'Failover Test Subnet', 'Failover Test Gateway',
'Failover Test DNS1', 'Failover Test DNS2']:
if not nic['FailoverTest']['Hypervisor'].get('IpConfig'):
nic['FailoverTest']['Hypervisor']['IpConfig'] = {
'StaticIp': None,
'SubnetMask': None,
'Gateway': None,
'PrimaryDns': None,
'SecondaryDns': None,
'IsDhcp': False
}
if field == 'Failover Test IP':
nic['FailoverTest']['Hypervisor']['IpConfig']['StaticIp'] = values['updated'] if values['updated'] else None
elif field == 'Failover Test Subnet':
nic['FailoverTest']['Hypervisor']['IpConfig']['SubnetMask'] = values['updated'] if values['updated'] else '255.255.255.0'
elif field == 'Failover Test Gateway':
nic['FailoverTest']['Hypervisor']['IpConfig']['Gateway'] = values['updated'] if values['updated'] else None
elif field == 'Failover Test DNS1':
nic['FailoverTest']['Hypervisor']['IpConfig']['PrimaryDns'] = values['updated'] if values['updated'] else None
elif field == 'Failover Test DNS2':
nic['FailoverTest']['Hypervisor']['IpConfig']['SecondaryDns'] = values['updated'] if values['updated'] else None
logging.info(f"update_vpg_settings: Updated NIC structure: VPG {vpg_name} VM {vm_name} NIC {nic_id} nic={json.dumps(nic, indent=4)}")
# Update VPG settings with all changes
logging.info(f"update_vpg_settings: Updating VPG settings for {vpg_name}")
logging.info(f"update_vpg_settings: VPG settings: {json.dumps(vpg_settings, indent=4)}")
client.vpgs.update_vpg_settings(vpg_settings_id, vpg_settings)
# Commit changes
logging.info(f"update_vpg_settings: Committing changes for VPG: {vpg_name}")
client.vpgs.commit_vpg(vpg_settings_id, vpg_name, sync=False)
logging.info(f"update_vpg_settings: Successfully updated VPG: {vpg_name}")
def main():
parser = argparse.ArgumentParser(description="Import VPG settings from CSV")
parser.add_argument("--zvm_address", required=True, help="ZVM address")
parser.add_argument('--client_id', required=True, help='Keycloak client ID')
parser.add_argument('--client_secret', required=True, help='Keycloak client secret')
parser.add_argument("--ignore_ssl", action="store_true", help="Ignore SSL certificate verification")
parser.add_argument("--csv_file", required=True, help="Path to the CSV file with updated settings")
parser.add_argument("--vpg_names", help="Comma-separated list of VPG names to update (optional)")
args = parser.parse_args()
try:
# Setup client
client = setup_client(args)
# Process VPG names if provided
vpg_names = None
if args.vpg_names:
vpg_names = [name.strip() for name in args.vpg_names.split(',')]
logging.info(f"Updating settings for VPGs: {json.dumps(vpg_names, indent=4)}")
else:
logging.info("No VPG names provided, will update all VPGs in the CSV file")
# Read updated settings from CSV
print("\nReading updated settings from CSV...")
updated_settings = read_csv_settings(args.csv_file)
# logging.info(f"Updated settings: {updated_settings}")
# Get current settings
print("Getting current VPG settings...")
timestamp, current_settings = get_current_settings(client, vpg_names)
# Compare settings
print("Comparing settings...")
try:
changes = compare_settings(client, current_settings, updated_settings)
except ValueError as e:
print(f"\nError: {str(e)}")
print("\nPlease fix the configuration in the CSV file and try again.")
return
# Display changes
display_changes(client, changes)
if not changes:
print("\nNo changes to apply.")
return
# Ask for confirmation
while True:
response = input("\nDo you want to apply these changes? (yes/no): ").lower()
if response in ['yes', 'y']:
break
elif response in ['no', 'n']:
print("Changes cancelled.")
return
else:
print("Please answer 'yes' or 'no'.")
# Apply changes
print("\nApplying changes...")
update_vpg_settings(client, changes)
print("\nAll changes have been applied successfully.")
except Exception as e:
if isinstance(e, ValueError):
print(f"\nError: {str(e)}")
print("\nPlease fix the configuration in the CSV file and try again.")
else:
logging.exception("Error occurred:")
sys.exit(1)
if __name__ == "__main__":
main()
@@ -1,82 +0,0 @@
#!/usr/bin/env python3
"""
Exercise 7: Bulk Operations
This script demonstrates how to perform bulk IP modifications on multiple VMs.
"""
import sys
import csv
from pathlib import Path
# Add the parent directory to the Python path to import the SDK
sys.path.append(str(Path(__file__).parent.parent.parent.parent))
# Import the SDK modules
from zvml import ZertoClient
from zvml.vpgs import VPG
from zvml.common import ZertoVPGError
# Import configuration
try:
from prerequisites.config import (
ZVM_HOST,
ZVM_PORT,
ZVM_SSL_VERIFY,
KEYCLOAK_SERVER_URL,
KEYCLOAK_REALM,
CLIENT_ID,
CLIENT_SECRET
)
except ImportError:
print("Error: Please copy config.example.py to config.py and update with your values")
sys.exit(1)
def read_vm_list(csv_file):
"""
Read VM list from CSV file.
Expected format: vm_name,ip_address,subnet_mask,gateway
"""
# TODO: Implement CSV reading
# Hint: Use csv.DictReader
pass
def main():
"""
Main function to demonstrate bulk IP operations.
"""
# Step 1: Create and authenticate ZertoClient
# TODO: Initialize ZertoClient and authenticate
# Hint: Reuse the authentication code from previous exercises
# Step 2: Read VM list
# TODO: Read the VM list from CSV file
# Hint: Use the read_vm_list function
# Step 3: Get VPG
# TODO: Find and get the VPG containing the VMs
# Hint: Use client.vpgs.list() and client.vpgs.get()
# Step 4: Prepare IP changes
# TODO: Prepare the IP modification data
# Required for each VM:
# - VM identifier
# - New IP settings
# - Network information
# Step 5: Apply IP changes
# TODO: Apply the IP changes to all VMs
# Hint: Use vpg.modify_vm_ips() method
# Step 6: Monitor progress
# TODO: Monitor the bulk operation progress
# Required steps:
# - Track operation status
# - Handle any failures
# - Report results
# Step 7: Handle errors
# TODO: Add error handling for bulk operations
# Hint: Use try/except blocks for ZertoVPGError
if __name__ == "__main__":
main()
@@ -1,6 +0,0 @@
vm_name,ip_address,subnet_mask,gateway
vm-001,192.168.1.101,255.255.255.0,192.168.1.1
vm-002,192.168.1.102,255.255.255.0,192.168.1.1
vm-003,192.168.1.103,255.255.255.0,192.168.1.1
vm-004,192.168.1.104,255.255.255.0,192.168.1.1
vm-005,192.168.1.105,255.255.255.0,192.168.1.1
1 vm_name ip_address subnet_mask gateway
2 vm-001 192.168.1.101 255.255.255.0 192.168.1.1
3 vm-002 192.168.1.102 255.255.255.0 192.168.1.1
4 vm-003 192.168.1.103 255.255.255.0 192.168.1.1
5 vm-004 192.168.1.104 255.255.255.0 192.168.1.1
6 vm-005 192.168.1.105 255.255.255.0 192.168.1.1