added bulk delete vpg by vm example
This commit is contained in:
@@ -0,0 +1,166 @@
|
||||
#!/usr/bin/env python3
|
||||
|
||||
# Legal Disclaimer
|
||||
# This script is an example script and is not supported under any Zerto support program or service.
|
||||
# The author and Zerto further disclaim all implied warranties including, without limitation,
|
||||
# any implied warranties of merchantability or of fitness for a particular purpose.
|
||||
# In no event shall Zerto, its authors or anyone else involved in the creation,
|
||||
# production or delivery of the scripts be liable for any damages whatsoever (including,
|
||||
# without limitation, damages for loss of business profits, business interruption, loss of business
|
||||
# information, or other pecuniary loss) arising out of the use of or the inability to use the sample
|
||||
# scripts or documentation, even if the author or Zerto has been advised of the possibility of such damages.
|
||||
# The entire risk arising out of the use or performance of the sample scripts and documentation remains with you.
|
||||
|
||||
"""
|
||||
Delete VPGs by VM list (CSV)
|
||||
|
||||
This script reads a CSV file with a required "VMName" column (case-insensitive),
|
||||
validates each VM using the VMs.list_vms API, and deletes the VPG associated
|
||||
with each VM using VPGs.delete_vpg.
|
||||
|
||||
Required Arguments:
|
||||
--zvm_address: ZVM address
|
||||
--client_id: Keycloak client ID
|
||||
--client_secret: Keycloak client secret
|
||||
--csv_file: Path to CSV file containing VMName column
|
||||
|
||||
Optional Arguments:
|
||||
--ignore_ssl: Ignore SSL certificate verification
|
||||
|
||||
Example Usage:
|
||||
python delete_vpgs_from_csv.py \
|
||||
--zvm_address "192.168.111.20" \
|
||||
--client_id "zerto-api" \
|
||||
--client_secret "your-secret-here" \
|
||||
--csv_file "vm_list.csv" \
|
||||
--ignore_ssl
|
||||
"""
|
||||
|
||||
import argparse
|
||||
import csv
|
||||
import logging
|
||||
import os
|
||||
import sys
|
||||
import urllib3
|
||||
from typing import List, Optional
|
||||
|
||||
# Add parent directory to Python path
|
||||
sys.path.append(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))))
|
||||
|
||||
from zvml import ZVMLClient
|
||||
|
||||
# Disable SSL warnings
|
||||
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
|
||||
|
||||
# Configure logging
|
||||
logging.basicConfig(
|
||||
level=logging.INFO,
|
||||
format='%(asctime)s - %(levelname)s - %(message)s'
|
||||
)
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def setup_argparse() -> argparse.ArgumentParser:
|
||||
parser = argparse.ArgumentParser(description="Delete VPGs by VM list from a CSV file")
|
||||
parser.add_argument('--zvm_address', required=True, help='ZVM address')
|
||||
parser.add_argument('--client_id', required=True, help='Keycloak client ID')
|
||||
parser.add_argument('--client_secret', required=True, help='Keycloak client secret')
|
||||
parser.add_argument('--csv_file', required=True, help='Path to CSV file with VMName column')
|
||||
parser.add_argument('--ignore_ssl', action='store_true', help='Ignore SSL certificate validation')
|
||||
return parser
|
||||
|
||||
|
||||
def find_vmname_column(fieldnames: List[str]) -> Optional[str]:
|
||||
for field in fieldnames:
|
||||
if field and field.strip().lower() == "vmname":
|
||||
return field
|
||||
return None
|
||||
|
||||
|
||||
def load_vm_names(csv_file: str) -> List[str]:
|
||||
with open(csv_file, newline='', encoding='utf-8-sig') as file_handle:
|
||||
reader = csv.DictReader(file_handle)
|
||||
if not reader.fieldnames:
|
||||
raise ValueError("CSV file has no header row")
|
||||
|
||||
vmname_column = find_vmname_column(reader.fieldnames)
|
||||
if not vmname_column:
|
||||
raise ValueError("CSV file must include a VMName column (case-insensitive)")
|
||||
|
||||
vm_names: List[str] = []
|
||||
for row_index, row in enumerate(reader, start=2):
|
||||
raw_name = row.get(vmname_column, "")
|
||||
vm_name = raw_name.strip() if raw_name else ""
|
||||
if not vm_name:
|
||||
logger.warning(f"Row {row_index} has empty VMName. Skipping.")
|
||||
continue
|
||||
vm_names.append(vm_name)
|
||||
|
||||
if not vm_names:
|
||||
raise ValueError("No VM names found in CSV file")
|
||||
return vm_names
|
||||
|
||||
|
||||
def main() -> None:
|
||||
args = setup_argparse().parse_args()
|
||||
|
||||
client = ZVMLClient(
|
||||
zvm_address=args.zvm_address,
|
||||
client_id=args.client_id,
|
||||
client_secret=args.client_secret,
|
||||
verify_certificate=not args.ignore_ssl
|
||||
)
|
||||
|
||||
vm_names = load_vm_names(args.csv_file)
|
||||
deleted_vpg_ids = set()
|
||||
|
||||
for vm_name in vm_names:
|
||||
logger.info(f"Processing VM '{vm_name}'...")
|
||||
vms = client.vms.list_vms(vm_name=vm_name)
|
||||
if not vms:
|
||||
print(f"VM '{vm_name}' not found. Skipping.")
|
||||
continue
|
||||
|
||||
if isinstance(vms, dict):
|
||||
vms = [vms]
|
||||
|
||||
matching_vms = [
|
||||
vm for vm in vms
|
||||
if vm.get("VmName", "").lower() == vm_name.lower()
|
||||
]
|
||||
|
||||
if not matching_vms:
|
||||
print(f"VM '{vm_name}' not found (case-insensitive match). Skipping.")
|
||||
continue
|
||||
|
||||
for vm in matching_vms:
|
||||
vpg_identifier = vm.get("VpgIdentifier")
|
||||
if not vpg_identifier:
|
||||
print(f"VM '{vm_name}' has no VPG identifier. Skipping.")
|
||||
continue
|
||||
|
||||
if vpg_identifier in deleted_vpg_ids:
|
||||
logger.info(f"VPG '{vpg_identifier}' already deleted. Skipping.")
|
||||
continue
|
||||
|
||||
vpg_name = vm.get("VpgName")
|
||||
if not vpg_name:
|
||||
vpg_info = client.vpgs.list_vpgs(vpg_identifier=vpg_identifier)
|
||||
if isinstance(vpg_info, dict):
|
||||
vpg_name = vpg_info.get("VpgName")
|
||||
|
||||
if not vpg_name:
|
||||
print(f"Could not determine VPG name for VM '{vm_name}' (ID: {vpg_identifier}). Skipping.")
|
||||
continue
|
||||
|
||||
try:
|
||||
logger.info(f"Deleting VPG '{vpg_name}' (ID: {vpg_identifier}) for VM '{vm_name}'...")
|
||||
client.vpgs.delete_vpg(vpg_name=vpg_name, keep_recovery_volumes=False)
|
||||
deleted_vpg_ids.add(vpg_identifier)
|
||||
except Exception as exc:
|
||||
logger.error(f"Failed to delete VPG '{vpg_name}' for VM '{vm_name}': {exc}")
|
||||
continue
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
||||
Reference in New Issue
Block a user