added journal modifier

This commit is contained in:
Kosta Mushkin
2025-05-02 16:33:19 -04:00
parent 937fc90ce6
commit 3353a7a138
4 changed files with 220 additions and 8 deletions
+3 -1
View File
@@ -18,7 +18,9 @@ This library provides a comprehensive Python interface to manage and automate Ze
## Installation ## Installation
git clone https://github.com/your-repo/zerto-python-library.git git clone https://github.com/your-repo/zerto-python-library.git
cd zerto-python-library cd zerto-python-sdk
python3 -m venv venv
source venv/bin/activate
pip install -r requirements.txt pip install -r requirements.txt
## Dependencies ## Dependencies
+107
View File
@@ -0,0 +1,107 @@
import json
import argparse
import sys
from typing import Dict, Any, List, Tuple
def modify_vpg_settings(json_data: Dict[str, Any], modifications: List[Tuple[str, str, str]]) -> None:
"""
Modify VPG settings based on the list of modifications
Each modification is a tuple of (field_path, new_value, level)
"""
for vpg in json_data['ExportedVpgSettingsApi']:
# Check and modify VPG name
if 'Basic' in vpg and 'Name' in vpg['Basic']:
name = vpg['Basic']['Name']
if not name.startswith('dr-'):
print(f"Warning: VPG name '{name}' does not start with 'dr-'")
else:
vpg['Basic']['Name'] = 'adr-' + name[3:] # Replace 'dr-' with 'adr-'
# Handle VPG level modifications
for field_path, new_value, level in modifications:
if level == 'vpg':
if field_path == 'Basic.RecoverySiteIdentifier':
vpg['Basic']['RecoverySiteIdentifier'] = new_value
elif field_path == 'Recovery.DefaultHostIdentifier':
vpg['Recovery']['DefaultHostIdentifier'] = new_value
elif field_path == 'Networks.Failover.PublicCloud.SubnetIdentifier':
vpg['Networks']['Failover']['PublicCloud']['SubnetIdentifier'] = new_value
elif field_path == 'Networks.FailoverTest.PublicCloud.SubnetIdentifier':
vpg['Networks']['FailoverTest']['PublicCloud']['SubnetIdentifier'] = new_value
elif field_path == 'Recovery.DefaultDatastoreIdentifier':
vpg['Recovery']['DefaultDatastoreIdentifier'] = new_value
# Handle VM level modifications
if 'Vms' in vpg:
for vm in vpg['Vms']:
for field_path, new_value, level in modifications:
if level == 'vm':
if field_path == 'Vms[].Recovery.PublicCloud.Failover.VirtualNetworkIdentifier':
vm['Recovery']['PublicCloud']['Failover']['VirtualNetworkIdentifier'] = new_value
elif field_path == 'Vms[].Recovery.PublicCloud.FailoverTest.VirtualNetworkIdentifier':
vm['Recovery']['PublicCloud']['FailoverTest']['VirtualNetworkIdentifier'] = new_value
# Update NIC level SubnetIdentifier
if 'Nics' in vm:
for nic in vm['Nics']:
for field_path, new_value, level in modifications:
if level == 'vpg':
if field_path == 'Networks.Failover.PublicCloud.SubnetIdentifier':
nic['Failover']['PublicCloud']['SubnetIdentifier'] = new_value
elif field_path == 'Networks.FailoverTest.PublicCloud.SubnetIdentifier':
nic['FailoverTest']['PublicCloud']['SubnetIdentifier'] = new_value
# Remove Preseed field from volumes if it exists
if 'Volumes' in vm:
for volume in vm['Volumes']:
if 'Preseed' in volume:
del volume['Preseed']
def main():
parser = argparse.ArgumentParser(description='Modify VPG settings in JSON file')
parser.add_argument('input_file', help='Input JSON file path')
parser.add_argument('output_file', help='Output JSON file path')
parser.add_argument('--modifications', nargs='+', required=True,
help='List of modifications in format: "field_path:new_value:level"')
args = parser.parse_args()
# Parse modifications
modifications = []
for mod in args.modifications:
try:
field_path, new_value, level = mod.split(':')
if level not in ['vpg', 'vm']:
raise ValueError(f"Invalid level: {level}")
modifications.append((field_path, new_value, level))
except ValueError as e:
print(f"Error parsing modification '{mod}': {str(e)}")
sys.exit(1)
try:
# Read input JSON file
with open(args.input_file, 'r') as f:
json_data = json.load(f)
# Modify the JSON data
modify_vpg_settings(json_data, modifications)
# Write output JSON file
with open(args.output_file, 'w') as f:
json.dump(json_data, f, indent=2)
print(f"Successfully modified {len(modifications)} fields in {args.input_file}")
print(f"Output written to {args.output_file}")
except FileNotFoundError:
print(f"Error: File {args.input_file} not found")
sys.exit(1)
except json.JSONDecodeError:
print(f"Error: {args.input_file} is not a valid JSON file")
sys.exit(1)
except Exception as e:
print(f"Error: {str(e)}")
sys.exit(1)
if __name__ == "__main__":
main()
+100
View File
@@ -0,0 +1,100 @@
import argparse
import logging
import urllib3
import json
import sys
import os
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from zvml import ZVMLClient
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
def setup_client(args):
return ZVMLClient(
zvm_address=args.zvm_address,
client_id=args.client_id,
client_secret=args.client_secret,
verify_certificate=not args.ignore_ssl
)
def print_journal_settings(vpg_name, vpg_basic):
print(f"\nVPG: {vpg_name}")
print(" JournalHistoryInHours:", vpg_basic.get('JournalHistoryInHours'))
def main():
parser = argparse.ArgumentParser(description="Adjust VPG journal settings interactively or via CLI options")
parser.add_argument("--zvm_address", required=True, help="ZVM address")
parser.add_argument('--client_id', required=True, help='Keycloak client ID')
parser.add_argument('--client_secret', required=True, help='Keycloak client secret')
parser.add_argument("--ignore_ssl", action="store_true", help="Ignore SSL certificate verification")
parser.add_argument("--journal_days", type=int, help="Set journal history (days) for all VPGs (non-interactive)")
parser.add_argument("--journal_hours", type=int, default=0, help="Set additional journal history (hours) for all VPGs (non-interactive)")
args = parser.parse_args()
client = setup_client(args)
vpgs = client.vpgs.list_vpgs()
if not vpgs:
print("No VPGs found.")
sys.exit(0)
if isinstance(vpgs, dict): # If only one VPG, wrap in list
vpgs = [vpgs]
use_cli_journal = args.journal_days is not None
for vpg in vpgs:
vpg_name = vpg['VpgName']
vpg_identifier = vpg['VpgIdentifier']
print(f"\nProcessing VPG: {vpg_name}")
# Create VPG settings (get current settings object)
vpg_settings_id = client.vpgs.create_vpg_settings(vpg_identifier=vpg_identifier)
vpg_settings = client.vpgs.get_vpg_settings_by_id(vpg_settings_id)
vpg_basic = vpg_settings.get('Basic', {})
# Present current settings
print_journal_settings(vpg_name, vpg_basic)
if use_cli_journal:
total_hours = args.journal_days * 24 + args.journal_hours
print(f" Applying journal history: {args.journal_days} days + {args.journal_hours} hours = {total_hours} hours")
else:
# Ask user if they want to change
change = input("Do you want to change the journal history for this VPG? (y/n): ")
if change.lower() != 'y':
continue
# Prompt for new values (always store in hours)
while True:
try:
new_days = int(input(" Enter new journal history (days): "))
new_hours = int(input(" Enter additional journal history (hours): "))
total_hours = new_days * 24 + new_hours
break
except ValueError:
print(" Please enter valid integers.")
# Adjust VPG-level journal history
vpg_basic['JournalHistoryInHours'] = total_hours
# Present new settings
print("\nNew settings to be applied:")
print_journal_settings(vpg_name, vpg_basic)
if use_cli_journal:
# No confirmation, just apply
client.vpgs.update_vpg_settings(vpg_settings_id, vpg_settings)
client.vpgs.commit_vpg(vpg_settings_id, vpg_name, sync=False)
print(" Changes committed.")
else:
confirm = input("Commit these changes? (y/n): ")
if confirm.lower() == 'y':
client.vpgs.update_vpg_settings(vpg_settings_id, vpg_settings)
client.vpgs.commit_vpg(vpg_settings_id, vpg_name, sync=False)
print(" Changes committed.")
else:
print(" Changes not committed.")
if __name__ == "__main__":
main()
+7 -4
View File
@@ -767,12 +767,12 @@ class VPGs:
logging.error("HTTPError occurred with no response attached.") logging.error("HTTPError occurred with no response attached.")
raise raise
def export_vpg_settings(self, vpg_names: List[str]) -> dict: def export_vpg_settings(self, vpg_names: List[str] = None) -> dict:
""" """
Export settings for specified VPGs. Export settings for specified VPGs.
Args: Args:
vpg_names: List of VPG names to export settings for vpg_names: Optional list of VPG names to export settings for. If not provided, exports all.
Returns: Returns:
dict: The exported VPG settings in the format: dict: The exported VPG settings in the format:
@@ -793,17 +793,20 @@ class VPGs:
'Authorization': f'Bearer {self.client.token}' 'Authorization': f'Bearer {self.client.token}'
} }
if vpg_names:
payload = { payload = {
"vpgNames": vpg_names "vpgNames": vpg_names
} }
else:
payload = {}
logging.info(f"VPGs.export_vpg_settings: Exporting settings for VPGs: {vpg_names}") logging.info(f"VPGs.export_vpg_settings: Exporting settings for VPGs: {vpg_names if vpg_names else 'ALL'}")
try: try:
response = requests.post(url, headers=headers, json=payload, verify=self.client.verify_certificate) response = requests.post(url, headers=headers, json=payload, verify=self.client.verify_certificate)
response.raise_for_status() response.raise_for_status()
result = response.json() result = response.json()
logging.info(f"Successfully exported settings for {len(vpg_names)} VPGs at {result.get('timeStamp')}") logging.info(f"Successfully exported settings for {len(vpg_names) if vpg_names else 'ALL'} VPGs at {result.get('timeStamp')}")
logging.debug(f"Export result: {json.dumps(result, indent=2)}") logging.debug(f"Export result: {json.dumps(result, indent=2)}")
return result return result