#!/usr/bin/env python3

# Legal Disclaimer
# This script is an example script and is not supported under any Zerto support program or service.
# The author and Zerto further disclaim all implied warranties including, without limitation,
# any implied warranties of merchantability or of fitness for a particular purpose.
# In no event shall Zerto, its authors or anyone else involved in the creation,
# production or delivery of the scripts be liable for any damages whatsoever (including,
# without limitation, damages for loss of business profits, business interruption, loss of business
# information, or other pecuniary loss) arising out of the use of or the inability to use the sample
# scripts or documentation, even if the author or Zerto has been advised of the possibility of such damages.
# The entire risk arising out of the use or performance of the sample scripts and documentation remains with you.

r"""
Delete VPGs by VM list (CSV)

This script reads a CSV file with a required "VMName" column (case-insensitive),
validates each VM using the VMs.list_vms API, and deletes the VPG associated
with each VM using VPGs.delete_vpg.

The CSV file is read and validated before the ZVM client is created. The
process exits with status 1 if the CSV cannot be used or if any API call
fails; VMs that are simply not found or not protected are skipped and do not
affect the exit status.

Required Arguments:
    --zvm_address: ZVM address
    --client_id: Keycloak client ID
    --client_secret: Keycloak client secret
    --csv_file: Path to CSV file containing VMName column

Optional Arguments:
    --ignore_ssl: Ignore SSL certificate verification

Example Usage:
    python delete_vpgs_from_csv.py \
    --zvm_address "192.168.111.20" \
    --client_id "zerto-api" \
    --client_secret "your-secret-here" \
    --csv_file "vm_list.csv" \
    --ignore_ssl
"""

import argparse
import csv
import logging
import os
import sys
import urllib3
from typing import List, Optional

# Make the directory two levels above this script's directory importable so
# that the `zvml` package can be found when the script is run in place.
sys.path.append(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))))

from zvml import ZVMLClient

# Disable SSL warnings
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)


def setup_argparse() -> argparse.ArgumentParser:
    """Build the command-line parser for this script.

    Returns:
        A parser that requires --zvm_address, --client_id, --client_secret
        and --csv_file, and accepts the optional --ignore_ssl flag.
    """
    parser = argparse.ArgumentParser(description="Delete VPGs by VM list from a CSV file")
    parser.add_argument('--zvm_address', required=True, help='ZVM address')
    parser.add_argument('--client_id', required=True, help='Keycloak client ID')
    parser.add_argument('--client_secret', required=True, help='Keycloak client secret')
    parser.add_argument('--csv_file', required=True, help='Path to CSV file with VMName column')
    parser.add_argument('--ignore_ssl', action='store_true', help='Ignore SSL certificate validation')
    return parser


def find_vmname_column(fieldnames: List[str]) -> Optional[str]:
    """Return the header that names the VM column, or None if there is none.

    A header matches when, after stripping surrounding whitespace and
    lower-casing, it equals "vmname". The header is returned exactly as it
    appears in the file so it can be used as a DictReader key.
    """
    for field in fieldnames:
        if field and field.strip().lower() == "vmname":
            return field
    return None


def load_vm_names(csv_file: str) -> List[str]:
    """Read the VM names from the VMName column of *csv_file*.

    The file is opened as UTF-8 with an optional byte-order mark. Rows whose
    VMName cell is missing or blank are skipped with a warning.

    Args:
        csv_file: Path to the CSV file.

    Returns:
        The stripped, non-empty VM names in file order (duplicates included).

    Raises:
        ValueError: If the file has no header row, has no VMName column, or
            yields no VM names.
        OSError: If the file cannot be opened.
    """
    with open(csv_file, newline='', encoding='utf-8-sig') as file_handle:
        reader = csv.DictReader(file_handle)
        if not reader.fieldnames:
            raise ValueError("CSV file has no header row")

        vmname_column = find_vmname_column(reader.fieldnames)
        if not vmname_column:
            raise ValueError("CSV file must include a VMName column (case-insensitive)")

        vm_names: List[str] = []
        # The header is row 1, so the first data row is reported as row 2.
        for row_index, row in enumerate(reader, start=2):
            raw_name = row.get(vmname_column, "")
            vm_name = raw_name.strip() if raw_name else ""
            if not vm_name:
                logger.warning(f"Row {row_index} has empty VMName. Skipping.")
                continue
            vm_names.append(vm_name)

    if not vm_names:
        raise ValueError("No VM names found in CSV file")
    return vm_names


def _dedupe_case_insensitive(names: List[str]) -> List[str]:
    """Drop later repeats of a name (compared lower-cased), keeping file order."""
    seen = set()
    unique: List[str] = []
    for name in names:
        key = name.lower()
        if key in seen:
            logger.info(f"VM '{name}' is listed more than once. Ignoring duplicate entry.")
            continue
        seen.add(key)
        unique.append(name)
    return unique


def _as_records(response) -> List[dict]:
    """Normalise an API response (single dict, sequence, or empty) to a list of dicts."""
    if not response:
        return []
    if isinstance(response, dict):
        return [response]
    return [item for item in response if isinstance(item, dict)]


def _resolve_vpg_name(client, vm: dict, vpg_identifier) -> Optional[str]:
    """Return the VPG name for *vm*, querying the VPG API when the VM record lacks it.

    A dict response is trusted as the requested VPG. For a list response only
    an entry whose "VpgIdentifier" equals *vpg_identifier* is used, so an
    unrelated VPG can never be selected for deletion.
    """
    vpg_name = vm.get("VpgName")
    if vpg_name:
        return vpg_name

    vpg_info = client.vpgs.list_vpgs(vpg_identifier=vpg_identifier)
    if isinstance(vpg_info, dict):
        return vpg_info.get("VpgName") or None
    for vpg in _as_records(vpg_info):
        if vpg.get("VpgIdentifier") == vpg_identifier:
            return vpg.get("VpgName") or None
    return None


def _delete_vpgs_for_vm(client, vm_name: str, deleted_vpg_ids: set) -> bool:
    """Delete every VPG that protects *vm_name*.

    Args:
        client: The ZVM client used for the VM and VPG API calls.
        vm_name: VM name from the CSV; matched case-insensitively.
        deleted_vpg_ids: Identifiers of VPGs already deleted in this run;
            updated in place so a VPG shared by several VMs is deleted once.

    Returns:
        True if no API call failed (including when the VM was skipped),
        False if a lookup or deletion raised.
    """
    logger.info(f"Processing VM '{vm_name}'...")
    try:
        vms = client.vms.list_vms(vm_name=vm_name)
    except Exception as exc:  # API boundary: keep processing the remaining VMs
        logger.error(f"Failed to look up VM '{vm_name}': {exc}")
        return False

    if not vms:
        print(f"VM '{vm_name}' not found. Skipping.")
        return True

    wanted = vm_name.lower()
    # `or ""` guards against a record whose VmName is present but None.
    matching_vms = [
        vm for vm in _as_records(vms)
        if (vm.get("VmName") or "").lower() == wanted
    ]
    if not matching_vms:
        print(f"VM '{vm_name}' not found (case-insensitive match). Skipping.")
        return True

    succeeded = True
    for vm in matching_vms:
        vpg_identifier = vm.get("VpgIdentifier")
        if not vpg_identifier:
            print(f"VM '{vm_name}' has no VPG identifier. Skipping.")
            continue

        if vpg_identifier in deleted_vpg_ids:
            logger.info(f"VPG '{vpg_identifier}' already deleted. Skipping.")
            continue

        try:
            vpg_name = _resolve_vpg_name(client, vm, vpg_identifier)
        except Exception as exc:  # API boundary
            logger.error(f"Failed to look up VPG '{vpg_identifier}' for VM '{vm_name}': {exc}")
            succeeded = False
            continue

        if not vpg_name:
            print(f"Could not determine VPG name for VM '{vm_name}' (ID: {vpg_identifier}). Skipping.")
            continue

        try:
            logger.info(f"Deleting VPG '{vpg_name}' (ID: {vpg_identifier}) for VM '{vm_name}'...")
            client.vpgs.delete_vpg(vpg_name=vpg_name, keep_recovery_volumes=False)
        except Exception as exc:  # API boundary
            logger.error(f"Failed to delete VPG '{vpg_name}' for VM '{vm_name}': {exc}")
            succeeded = False
        else:
            deleted_vpg_ids.add(vpg_identifier)

    return succeeded


def main() -> None:
    """Parse arguments, load the VM list, and delete the VPG of each listed VM.

    The CSV is validated before the ZVM client is created so that an unusable
    input file is reported without contacting the ZVM. Exits with status 1 if
    the CSV cannot be loaded or if any API call failed.
    """
    args = setup_argparse().parse_args()

    try:
        vm_names = load_vm_names(args.csv_file)
    except (OSError, ValueError, csv.Error) as exc:
        logger.error(f"Could not load VM names from '{args.csv_file}': {exc}")
        sys.exit(1)
    vm_names = _dedupe_case_insensitive(vm_names)

    client = ZVMLClient(
        zvm_address=args.zvm_address,
        client_id=args.client_id,
        client_secret=args.client_secret,
        verify_certificate=not args.ignore_ssl
    )

    deleted_vpg_ids = set()
    failed_vm_count = 0
    for vm_name in vm_names:
        if not _delete_vpgs_for_vm(client, vm_name, deleted_vpg_ids):
            failed_vm_count += 1

    if failed_vm_count:
        logger.error(f"Finished with errors for {failed_vm_count} VM(s).")
        sys.exit(1)


if __name__ == '__main__':
    main()