diff --git a/README.md b/README.md index d11124c..ef856c8 100644 --- a/README.md +++ b/README.md @@ -18,7 +18,9 @@ This library provides a comprehensive Python interface to manage and automate Ze ## Installation git clone https://github.com/your-repo/zerto-python-library.git -cd zerto-python-library +cd zerto-python-sdk +python3 -m venv venv +source venv/bin/activate pip install -r requirements.txt ## Dependencies diff --git a/examples/exported_vpg_setting_modifyer.py b/examples/exported_vpg_setting_modifyer.py new file mode 100644 index 0000000..46051a4 --- /dev/null +++ b/examples/exported_vpg_setting_modifyer.py @@ -0,0 +1,107 @@ +import json +import argparse +import sys +from typing import Dict, Any, List, Tuple + +def modify_vpg_settings(json_data: Dict[str, Any], modifications: List[Tuple[str, str, str]]) -> None: + """ + Modify VPG settings based on the list of modifications + Each modification is a tuple of (field_path, new_value, level) + """ + for vpg in json_data['ExportedVpgSettingsApi']: + # Check and modify VPG name + if 'Basic' in vpg and 'Name' in vpg['Basic']: + name = vpg['Basic']['Name'] + if not name.startswith('dr-'): + print(f"Warning: VPG name '{name}' does not start with 'dr-'") + else: + vpg['Basic']['Name'] = 'adr-' + name[3:] # Replace 'dr-' with 'adr-' + + # Handle VPG level modifications + for field_path, new_value, level in modifications: + if level == 'vpg': + if field_path == 'Basic.RecoverySiteIdentifier': + vpg['Basic']['RecoverySiteIdentifier'] = new_value + elif field_path == 'Recovery.DefaultHostIdentifier': + vpg['Recovery']['DefaultHostIdentifier'] = new_value + elif field_path == 'Networks.Failover.PublicCloud.SubnetIdentifier': + vpg['Networks']['Failover']['PublicCloud']['SubnetIdentifier'] = new_value + elif field_path == 'Networks.FailoverTest.PublicCloud.SubnetIdentifier': + vpg['Networks']['FailoverTest']['PublicCloud']['SubnetIdentifier'] = new_value + elif field_path == 'Recovery.DefaultDatastoreIdentifier': + vpg['Recovery']['DefaultDatastoreIdentifier'] = new_value + + # Handle VM level modifications + if 'Vms' in vpg: + for vm in vpg['Vms']: + for field_path, new_value, level in modifications: + if level == 'vm': + if field_path == 'Vms[].Recovery.PublicCloud.Failover.VirtualNetworkIdentifier': + vm['Recovery']['PublicCloud']['Failover']['VirtualNetworkIdentifier'] = new_value + elif field_path == 'Vms[].Recovery.PublicCloud.FailoverTest.VirtualNetworkIdentifier': + vm['Recovery']['PublicCloud']['FailoverTest']['VirtualNetworkIdentifier'] = new_value + + # Update NIC level SubnetIdentifier + if 'Nics' in vm: + for nic in vm['Nics']: + for field_path, new_value, level in modifications: + if level == 'vpg': + if field_path == 'Networks.Failover.PublicCloud.SubnetIdentifier': + nic['Failover']['PublicCloud']['SubnetIdentifier'] = new_value + elif field_path == 'Networks.FailoverTest.PublicCloud.SubnetIdentifier': + nic['FailoverTest']['PublicCloud']['SubnetIdentifier'] = new_value + + # Remove Preseed field from volumes if it exists + if 'Volumes' in vm: + for volume in vm['Volumes']: + if 'Preseed' in volume: + del volume['Preseed'] + +def main(): + parser = argparse.ArgumentParser(description='Modify VPG settings in JSON file') + parser.add_argument('input_file', help='Input JSON file path') + parser.add_argument('output_file', help='Output JSON file path') + parser.add_argument('--modifications', nargs='+', required=True, + help='List of modifications in format: "field_path:new_value:level"') + + args = parser.parse_args() + + # Parse modifications + modifications = [] + for mod in args.modifications: + try: + field_path, new_value, level = mod.split(':') + if level not in ['vpg', 'vm']: + raise ValueError(f"Invalid level: {level}") + modifications.append((field_path, new_value, level)) + except ValueError as e: + print(f"Error parsing modification '{mod}': {str(e)}") + sys.exit(1) + + try: + # Read input JSON file + with open(args.input_file, 'r') as f: + json_data = json.load(f) + + # Modify the JSON data + modify_vpg_settings(json_data, modifications) + + # Write output JSON file + with open(args.output_file, 'w') as f: + json.dump(json_data, f, indent=2) + + print(f"Successfully modified {len(modifications)} fields in {args.input_file}") + print(f"Output written to {args.output_file}") + + except FileNotFoundError: + print(f"Error: File {args.input_file} not found") + sys.exit(1) + except json.JSONDecodeError: + print(f"Error: {args.input_file} is not a valid JSON file") + sys.exit(1) + except Exception as e: + print(f"Error: {str(e)}") + sys.exit(1) + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/examples/vpg_settings_journal_modifier.py b/examples/vpg_settings_journal_modifier.py new file mode 100644 index 0000000..d68ea3d --- /dev/null +++ b/examples/vpg_settings_journal_modifier.py @@ -0,0 +1,100 @@ +import argparse +import logging +import urllib3 +import json +import sys +import os + +sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) +from zvml import ZVMLClient + +urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) + +def setup_client(args): + return ZVMLClient( + zvm_address=args.zvm_address, + client_id=args.client_id, + client_secret=args.client_secret, + verify_certificate=not args.ignore_ssl + ) + +def print_journal_settings(vpg_name, vpg_basic): + print(f"\nVPG: {vpg_name}") + print(" JournalHistoryInHours:", vpg_basic.get('JournalHistoryInHours')) + +def main(): + parser = argparse.ArgumentParser(description="Adjust VPG journal settings interactively or via CLI options") + parser.add_argument("--zvm_address", required=True, help="ZVM address") + parser.add_argument('--client_id', required=True, help='Keycloak client ID') + parser.add_argument('--client_secret', required=True, help='Keycloak client secret') + parser.add_argument("--ignore_ssl", action="store_true", help="Ignore SSL certificate verification") + parser.add_argument("--journal_days", type=int, help="Set journal history (days) for all VPGs (non-interactive)") + parser.add_argument("--journal_hours", type=int, default=0, help="Set additional journal history (hours) for all VPGs (non-interactive)") + args = parser.parse_args() + + client = setup_client(args) + vpgs = client.vpgs.list_vpgs() + if not vpgs: + print("No VPGs found.") + sys.exit(0) + if isinstance(vpgs, dict): # If only one VPG, wrap in list + vpgs = [vpgs] + + use_cli_journal = args.journal_days is not None + + for vpg in vpgs: + vpg_name = vpg['VpgName'] + vpg_identifier = vpg['VpgIdentifier'] + print(f"\nProcessing VPG: {vpg_name}") + + # Create VPG settings (get current settings object) + vpg_settings_id = client.vpgs.create_vpg_settings(vpg_identifier=vpg_identifier) + vpg_settings = client.vpgs.get_vpg_settings_by_id(vpg_settings_id) + + vpg_basic = vpg_settings.get('Basic', {}) + + # Present current settings + print_journal_settings(vpg_name, vpg_basic) + + if use_cli_journal: + total_hours = args.journal_days * 24 + args.journal_hours + print(f" Applying journal history: {args.journal_days} days + {args.journal_hours} hours = {total_hours} hours") + else: + # Ask user if they want to change + change = input("Do you want to change the journal history for this VPG? (y/n): ") + if change.lower() != 'y': + continue + + # Prompt for new values (always store in hours) + while True: + try: + new_days = int(input(" Enter new journal history (days): ")) + new_hours = int(input(" Enter additional journal history (hours): ")) + total_hours = new_days * 24 + new_hours + break + except ValueError: + print(" Please enter valid integers.") + + # Adjust VPG-level journal history + vpg_basic['JournalHistoryInHours'] = total_hours + + # Present new settings + print("\nNew settings to be applied:") + print_journal_settings(vpg_name, vpg_basic) + + if use_cli_journal: + # No confirmation, just apply + client.vpgs.update_vpg_settings(vpg_settings_id, vpg_settings) + client.vpgs.commit_vpg(vpg_settings_id, vpg_name, sync=False) + print(" Changes committed.") + else: + confirm = input("Commit these changes? (y/n): ") + if confirm.lower() == 'y': + client.vpgs.update_vpg_settings(vpg_settings_id, vpg_settings) + client.vpgs.commit_vpg(vpg_settings_id, vpg_name, sync=False) + print(" Changes committed.") + else: + print(" Changes not committed.") + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/zvml/vpgs.py b/zvml/vpgs.py index d8e5b96..b944d86 100644 --- a/zvml/vpgs.py +++ b/zvml/vpgs.py @@ -767,12 +767,12 @@ class VPGs: logging.error("HTTPError occurred with no response attached.") raise - def export_vpg_settings(self, vpg_names: List[str]) -> dict: + def export_vpg_settings(self, vpg_names: List[str] = None) -> dict: """ Export settings for specified VPGs. Args: - vpg_names: List of VPG names to export settings for + vpg_names: Optional list of VPG names to export settings for. If not provided, exports all. Returns: dict: The exported VPG settings in the format: @@ -793,17 +793,20 @@ class VPGs: 'Authorization': f'Bearer {self.client.token}' } - payload = { - "vpgNames": vpg_names - } + if vpg_names: + payload = { + "vpgNames": vpg_names + } + else: + payload = {} - logging.info(f"VPGs.export_vpg_settings: Exporting settings for VPGs: {vpg_names}") + logging.info(f"VPGs.export_vpg_settings: Exporting settings for VPGs: {vpg_names if vpg_names else 'ALL'}") try: response = requests.post(url, headers=headers, json=payload, verify=self.client.verify_certificate) response.raise_for_status() result = response.json() - logging.info(f"Successfully exported settings for {len(vpg_names)} VPGs at {result.get('timeStamp')}") + logging.info(f"Successfully exported settings for {len(vpg_names) if vpg_names else 'ALL'} VPGs at {result.get('timeStamp')}") logging.debug(f"Export result: {json.dumps(result, indent=2)}") return result