#!/usr/bin/env python3
# ProxLB
# ProxLB (re)balances VM workloads across nodes in Proxmox clusters.
# ProxLB obtains current metrics from all nodes within the cluster for
# further auto balancing by memory, disk or cpu and rebalances the VMs
# over all available nodes in a cluster by having an equal resource usage.
# Copyright (C) 2024 Florian Paul Azim Hoberg @gyptazy
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.

import argparse
import configparser
import copy
import json
import logging
import os
try:
    # proxmoxer is a third-party dependency; its availability is validated
    # later in __validate_imports() so a clear error can be logged.
    import proxmoxer
    _imports = True
except ImportError:
    _imports = False
import random
import re
import requests
import socket
import sys
import time
import urllib3

# Constants
__appname__ = "ProxLB"
__version__ = "1.0.3b"
__config_version__ = 3
__author__ = "Florian Paul Azim Hoberg @gyptazy"
__errors__ = False


# Classes
## Logging class
class SystemdHandler(logging.Handler):
    """ Class to handle logging options. """
    # systemd/journald style priority prefixes ("<prio> appname: ").
    PREFIX = {
        logging.CRITICAL: "<2> " + __appname__ + ": ",
        logging.ERROR: "<3> " + __appname__ + ": ",
        logging.WARNING: "<4> " + __appname__ + ": ",
        logging.INFO: "<6> " + __appname__ + ": ",
        logging.DEBUG: "<7> " + __appname__ + ": ",
        # Bugfix: prefix was "<7 " — missing the closing angle bracket.
        logging.NOTSET: "<7> " + __appname__ + ": ",
    }

    def __init__(self, stream=sys.stdout):
        # Stream the log records are written to (stdout by default so
        # journald can capture them).
        self.stream = stream
        logging.Handler.__init__(self)

    def emit(self, record):
        """ Write a single formatted record prefixed with its priority. """
        try:
            msg = self.PREFIX[record.levelno] + self.format(record) + "\n"
            self.stream.write(msg)
            self.stream.flush()
        except Exception:
            self.handleError(record)


# Functions
def initialize_logger(log_level, update_log_verbosity=False):
    """ Initialize ProxLB logging handler.

    When update_log_verbosity is True only the level of the already
    installed root logger is changed; no second handler is added.
    """
    info_prefix = 'Info: [logger]:'
    root_logger = logging.getLogger()
    root_logger.setLevel(log_level)

    if not update_log_verbosity:
        root_logger.addHandler(SystemdHandler())
        logging.info(f'{info_prefix} Logger got initialized.')
    else:
        logging.info(f'{info_prefix} Logger verbosity got updated to: {log_level}.')


def pre_validations(config_path):
    """ Run pre-validations as sanity checks. """
    info_prefix = 'Info: [pre-validations]:'
    __validate_imports()
    __validate_config_file(config_path)
    logging.info(f'{info_prefix} All pre-validations done.')


def post_validations():
    """ Run post-validations as sanity checks. """
    error_prefix = 'Error: [post-validations]:'
    info_prefix = 'Info: [post-validations]:'

    if __errors__:
        logging.critical(f'{error_prefix} Not all post-validations succeeded. Please validate!')
    else:
        logging.info(f'{info_prefix} All post-validations succeeded.')


def validate_daemon(daemon, schedule):
    """ Validate if ProxLB runs as a daemon.

    In daemon mode this sleeps for `schedule` hours before returning to
    the caller's main loop; otherwise ProxLB exits cleanly.
    """
    info_prefix = 'Info: [daemon]:'

    if bool(int(daemon)):
        logging.info(f'{info_prefix} Running in daemon mode. Next run in {schedule} hours.')
        time.sleep(int(schedule) * 60 * 60)
    else:
        logging.info(f'{info_prefix} Not running in daemon mode. Quitting.')
        sys.exit(0)


def __validate_imports():
    """ Validate if all Python imports succeeded. """
    error_prefix = 'Error: [python-imports]:'
    info_prefix = 'Info: [python-imports]:'

    if not _imports:
        logging.critical(f'{error_prefix} Could not import all dependencies. Please install "proxmoxer".')
        sys.exit(2)
    else:
        logging.info(f'{info_prefix} All required dependencies were imported.')


def __validate_config_file(config_path):
    """ Validate that the config file exists at the given path.

    (Docstring fixed: it was a copy/paste of __validate_imports'.)
    """
    error_prefix = 'Error: [config]:'
    info_prefix = 'Info: [config]:'

    if not os.path.isfile(config_path):
        logging.critical(f'{error_prefix} Could not find config file in: {config_path}.')
        sys.exit(2)
    else:
        logging.info(f'{info_prefix} Configuration file loaded from: {config_path}.')


def initialize_args():
    """ Initialize given arguments for ProxLB. """
    argparser = argparse.ArgumentParser(description='ProxLB')
    argparser.add_argument('-c', '--config', type=str, help='Path to config file.', required=False)
    argparser.add_argument('-d', '--dry-run', help='Perform a dry-run without doing any actions.', action='store_true', required=False)
    argparser.add_argument('-j', '--json', help='Return a JSON of the VM movement.', action='store_true', required=False)
    argparser.add_argument('-b', '--best-node', help='Returns the best next node.', action='store_true', required=False)
    return argparser.parse_args()


def initialize_config_path(app_args):
    """ Initialize path to ProxLB config file.

    Falls back to /etc/proxlb/proxlb.conf when no -c/--config was given.
    """
    info_prefix = 'Info: [config]:'

    config_path = app_args.config
    if app_args.config is None:
        config_path = '/etc/proxlb/proxlb.conf'
        logging.info(f'{info_prefix} No config file provided. Falling back to: {config_path}.')
    else:
        logging.info(f'{info_prefix} Using config file: {config_path}.')
    return config_path
""" error_prefix = 'Error: [config]:' info_prefix = 'Info: [config]:' proxlb_config = {} try: config = configparser.ConfigParser() config.read(config_path) # Proxmox config proxlb_config['proxmox_api_host'] = config['proxmox']['api_host'] proxlb_config['proxmox_api_user'] = config['proxmox']['api_user'] proxlb_config['proxmox_api_pass'] = config['proxmox']['api_pass'] proxlb_config['proxmox_api_ssl_v'] = config['proxmox']['verify_ssl'] # VM Balancing proxlb_config['vm_balancing_enable'] = config['vm_balancing'].get('enable', 1) proxlb_config['vm_balancing_method'] = config['vm_balancing'].get('method', 'memory') proxlb_config['vm_balancing_mode'] = config['vm_balancing'].get('mode', 'used') proxlb_config['vm_balancing_mode_option'] = config['vm_balancing'].get('mode_option', 'bytes') proxlb_config['vm_balancing_type'] = config['vm_balancing'].get('type', 'vm') proxlb_config['vm_balanciness'] = config['vm_balancing'].get('balanciness', 10) proxlb_config['vm_parallel_migrations'] = config['vm_balancing'].get('parallel_migrations', 1) proxlb_config['vm_ignore_nodes'] = config['vm_balancing'].get('ignore_nodes', None) proxlb_config['vm_ignore_vms'] = config['vm_balancing'].get('ignore_vms', None) # Storage Balancing proxlb_config['storage_balancing_enable'] = config['storage_balancing'].get('enable', 0) proxlb_config['storage_balancing_method'] = config['storage_balancing'].get('method', 'disk_space') proxlb_config['storage_balanciness'] = config['storage_balancing'].get('balanciness', 10) proxlb_config['storage_parallel_migrations'] = config['storage_balancing'].get('parallel_migrations', 1) # Update Support proxlb_config['update_service'] = config['update_service'].get('enable', 0) # API proxlb_config['api'] = config['update_service'].get('enable', 0) # Service proxlb_config['master_only'] = config['service'].get('master_only', 0) proxlb_config['daemon'] = config['service'].get('daemon', 1) proxlb_config['schedule'] = config['service'].get('schedule', 24) 
proxlb_config['log_verbosity'] = config['service'].get('log_verbosity', 'CRITICAL') proxlb_config['config_version'] = config['service'].get('config_version', 2) except configparser.NoSectionError: logging.critical(f'{error_prefix} Could not find the required section.') sys.exit(2) except configparser.ParsingError: logging.critical(f'{error_prefix} Unable to parse the config file.') sys.exit(2) except KeyError: logging.critical(f'{error_prefix} Could not find the required options in config file.') sys.exit(2) # Normalize and update bools. Afterwards, validate minimum required config version. proxlb_config = __update_config_parser_bools(proxlb_config) validate_config_minimum_version(proxlb_config) logging.info(f'{info_prefix} Configuration file loaded.') return proxlb_config def __update_config_parser_bools(proxlb_config): """ Update bools in config from configparser to real bools """ info_prefix = 'Info: [config-bool-converter]:' # Normalize and update config parser values to bools. for section, option_value in proxlb_config.items(): if option_value in [1, '1', 'yes', 'Yes', 'true', 'True', 'enable']: logging.info(f'{info_prefix} Converting {section} to bool: True.') proxlb_config[section] = True if option_value in [0, '0', 'no', 'No', 'false', 'False', 'disable']: logging.info(f'{info_prefix} Converting {section} to bool: False.') proxlb_config[section] = False return proxlb_config def validate_config_minimum_version(proxlb_config): """ Validate the minimum required config file for ProxLB """ info_prefix = 'Info: [config-version-validator]:' error_prefix = 'Error: [config-version-validator]:' if int(proxlb_config['config_version']) < __config_version__: logging.error(f'{error_prefix} ProxLB config version {proxlb_config["config_version"]} is too low. Required: {__config_version__}.') print(f'{error_prefix} ProxLB config version {proxlb_config["config_version"]} is too low. 
Required: {__config_version__}.') sys.exit(1) else: logging.info(f'{info_prefix} ProxLB config version {proxlb_config["config_version"]} is fine. Required: {__config_version__}.') def api_connect(proxmox_api_host, proxmox_api_user, proxmox_api_pass, proxmox_api_ssl_v): """ Connect and authenticate to the Proxmox remote API. """ error_prefix = 'Error: [api-connection]:' warn_prefix = 'Warning: [api-connection]:' info_prefix = 'Info: [api-connection]:' proxmox_api_ssl_v = bool(int(proxmox_api_ssl_v)) if not proxmox_api_ssl_v: requests.packages.urllib3.disable_warnings() logging.warning(f'{warn_prefix} API connection does not verify SSL certificate.') proxmox_api_host = __api_connect_get_host(proxmox_api_host) try: api_object = proxmoxer.ProxmoxAPI(proxmox_api_host, user=proxmox_api_user, password=proxmox_api_pass, verify_ssl=proxmox_api_ssl_v) except urllib3.exceptions.NameResolutionError: logging.critical(f'{error_prefix} Could not resolve the given host: {proxmox_api_host}.') sys.exit(2) except requests.exceptions.ConnectTimeout: logging.critical(f'{error_prefix} Connection time out to host: {proxmox_api_host}.') sys.exit(2) except requests.exceptions.SSLError: logging.critical(f'{error_prefix} SSL certificate verification failed for host: {proxmox_api_host}.') sys.exit(2) logging.info(f'{info_prefix} API connection succeeded to host: {proxmox_api_host}.') return api_object def __api_connect_get_host(proxmox_api_host): """ Validate if a list of API hosts got provided and pre-validate the hosts. """ info_prefix = 'Info: [api-connect-get-host]:' proxmox_port = 8006 if ',' in proxmox_api_host: logging.info(f'{info_prefix} Multiple hosts for API connection are given. Testing hosts for further usage.') proxmox_api_host = proxmox_api_host.split(',') # Validate all given hosts and check for responsive on Proxmox web port. 
for host in proxmox_api_host: logging.info(f'{info_prefix} Testing host {host} on port tcp/{proxmox_port}.') reachable = __api_connect_test_ipv4_host(host, proxmox_port) if reachable: return host else: logging.info(f'{info_prefix} Using host {proxmox_api_host} on port tcp/{proxmox_port}.') return proxmox_api_host def __api_connect_test_ipv4_host(proxmox_api_host, port): error_prefix = 'Error: [api-connect-test-host]:' info_prefix = 'Info: [api-connect-test-host]:' proxmox_connection_timeout = 2 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.settimeout(proxmox_connection_timeout) logging.info(f'{info_prefix} Timeout for host {proxmox_api_host} is set to {proxmox_connection_timeout} seconds.') result = sock.connect_ex((proxmox_api_host,port)) if result == 0: sock.close() logging.info(f'{info_prefix} Host {proxmox_api_host} is reachable on port tcp/{port}.') return True else: sock.close() logging.critical(f'{error_prefix} Host {proxmox_api_host} is unreachable on port tcp/{port}.') return False def __api_connect_test_ipv6_host(proxmox_api_host, port): error_prefix = 'Error: [api-connect-test-host]:' info_prefix = 'Info: [api-connect-test-host]:' proxmox_connection_timeout = 2 sock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM) sock.settimeout(proxmox_connection_timeout) logging.info(f'{info_prefix} Timeout for host {proxmox_api_host} is set to {proxmox_connection_timeout}.') result = sock.connect_ex((proxmox_api_host,port)) if result == 0: sock.close() logging.info(f'{info_prefix} Host {proxmox_api_host} is reachable on port tcp/{port}.') return True else: sock.close() logging.critical(f'{error_prefix} Host {proxmox_api_host} is unreachable on port tcp/{port}.') return False def execute_rebalancing_only_by_master(api_object, master_only): """ Validate if balancing should only be done by the cluster master. Afterwards, validate if this node is the cluster master. 
""" info_prefix = 'Info: [only-on-master-executor]:' master_only = bool(int(master_only)) if bool(int(master_only)): logging.info(f'{info_prefix} Master only rebalancing is defined. Starting validation.') cluster_master_node = get_cluster_master(api_object) cluster_master = validate_cluster_master(cluster_master_node) return cluster_master, master_only else: logging.info(f'{info_prefix} No master only rebalancing is defined. Skipping validation.') return False, master_only def get_cluster_master(api_object): """ Get the current master of the Proxmox cluster. """ error_prefix = 'Error: [cluster-master-getter]:' info_prefix = 'Info: [cluster-master-getter]:' try: ha_status_object = api_object.cluster().ha().status().manager_status().get() logging.info(f'{info_prefix} Master node: {ha_status_object.get("manager_status", None).get("master_node", None)}') except urllib3.exceptions.NameResolutionError: logging.critical(f'{error_prefix} Could not resolve the API.') sys.exit(2) except requests.exceptions.ConnectTimeout: logging.critical(f'{error_prefix} Connection time out to API.') sys.exit(2) except requests.exceptions.SSLError: logging.critical(f'{error_prefix} SSL certificate verification failed for API.') sys.exit(2) cluster_master = ha_status_object.get("manager_status", None).get("master_node", None) if cluster_master: return cluster_master else: logging.critical(f'{error_prefix} Could not obtain cluster master. Please check your configuration - stopping.') sys.exit(2) def validate_cluster_master(cluster_master): """ Validate if the current execution node is the cluster master. 
""" info_prefix = 'Info: [cluster-master-validator]:' node_executor_hostname = socket.gethostname() logging.info(f'{info_prefix} Node executor hostname is: {node_executor_hostname}') if node_executor_hostname != cluster_master: logging.info(f'{info_prefix} {node_executor_hostname} is not the cluster master ({cluster_master}).') return False else: return True def get_node_statistics(api_object, ignore_nodes): """ Get statistics of cpu, memory and disk for each node in the cluster. """ info_prefix = 'Info: [node-statistics]:' node_statistics = {} ignore_nodes_list = ignore_nodes.split(',') for node in api_object.nodes.get(): if node['status'] == 'online' and node['node'] not in ignore_nodes_list: node_statistics[node['node']] = {} node_statistics[node['node']]['cpu_total'] = node['maxcpu'] node_statistics[node['node']]['cpu_assigned'] = node['cpu'] node_statistics[node['node']]['cpu_assigned_percent'] = int((node_statistics[node['node']]['cpu_assigned']) / int(node_statistics[node['node']]['cpu_total']) * 100) node_statistics[node['node']]['cpu_assigned_percent_last_run'] = 0 node_statistics[node['node']]['cpu_used'] = 0 node_statistics[node['node']]['cpu_free'] = int(node['maxcpu']) - int(node['cpu']) node_statistics[node['node']]['cpu_free_percent'] = int((node_statistics[node['node']]['cpu_free']) / int(node['maxcpu']) * 100) node_statistics[node['node']]['cpu_free_percent_last_run'] = 0 node_statistics[node['node']]['memory_total'] = node['maxmem'] node_statistics[node['node']]['memory_assigned'] = 0 node_statistics[node['node']]['memory_assigned_percent'] = int((node_statistics[node['node']]['memory_assigned']) / int(node_statistics[node['node']]['memory_total']) * 100) node_statistics[node['node']]['memory_assigned_percent_last_run'] = 0 node_statistics[node['node']]['memory_used'] = node['mem'] node_statistics[node['node']]['memory_free'] = int(node['maxmem']) - int(node['mem']) node_statistics[node['node']]['memory_free_percent'] = 
int((node_statistics[node['node']]['memory_free']) / int(node['maxmem']) * 100) node_statistics[node['node']]['memory_free_percent_last_run'] = 0 node_statistics[node['node']]['disk_total'] = node['maxdisk'] node_statistics[node['node']]['disk_assigned'] = 0 node_statistics[node['node']]['disk_assigned_percent'] = int((node_statistics[node['node']]['disk_assigned']) / int(node_statistics[node['node']]['disk_total']) * 100) node_statistics[node['node']]['disk_assigned_percent_last_run'] = 0 node_statistics[node['node']]['disk_used'] = node['disk'] node_statistics[node['node']]['disk_free'] = int(node['maxdisk']) - int(node['disk']) node_statistics[node['node']]['disk_free_percent'] = int((node_statistics[node['node']]['disk_free']) / int(node['maxdisk']) * 100) node_statistics[node['node']]['disk_free_percent_last_run'] = 0 logging.info(f'{info_prefix} Added node {node["node"]}.') logging.info(f'{info_prefix} Created node statistics.') return node_statistics def get_vm_statistics(api_object, ignore_vms, balancing_type): """ Get statistics of cpu, memory and disk for each vm in the cluster. """ info_prefix = 'Info: [vm-statistics]:' warn_prefix = 'Warn: [vm-statistics]:' vm_statistics = {} ignore_vms_list = ignore_vms.split(',') group_include = None group_exclude = None vm_ignore = None vm_ignore_wildcard = False _vm_details_storage_allowed = ['ide', 'nvme', 'scsi', 'virtio', 'sata', 'rootfs'] # Wildcard support: Initially validate if we need to honour # any wildcards within the vm_ignore list. vm_ignore_wildcard = __validate_ignore_vm_wildcard(ignore_vms) for node in api_object.nodes.get(): # Add all virtual machines if type is vm or all. if balancing_type == 'vm' or balancing_type == 'all': for vm in api_object.nodes(node['node']).qemu.get(): # Get the VM tags from API. 
vm_tags = __get_vm_tags(api_object, node, vm['vmid'], 'vm') if vm_tags is not None: group_include, group_exclude, vm_ignore = __get_proxlb_groups(vm_tags) # Get wildcard match for VMs to ignore if a wildcard pattern was # previously found. Wildcards may slow down the task when using # many patterns in the ignore list. Therefore, run this only if # a wildcard pattern was found. We also do not need to validate # this if the VM is already being ignored by a defined tag. if vm_ignore_wildcard and not vm_ignore: vm_ignore = __check_vm_name_wildcard_pattern(vm['name'], ignore_vms_list) if vm['status'] == 'running' and vm['name'] not in ignore_vms_list and not vm_ignore: vm_statistics[vm['name']] = {} vm_statistics[vm['name']]['group_include'] = group_include vm_statistics[vm['name']]['group_exclude'] = group_exclude vm_statistics[vm['name']]['cpu_total'] = vm['cpus'] vm_statistics[vm['name']]['cpu_used'] = vm['cpu'] vm_statistics[vm['name']]['memory_total'] = vm['maxmem'] vm_statistics[vm['name']]['memory_used'] = vm['mem'] vm_statistics[vm['name']]['disk_total'] = vm['maxdisk'] vm_statistics[vm['name']]['disk_used'] = vm['disk'] vm_statistics[vm['name']]['vmid'] = vm['vmid'] vm_statistics[vm['name']]['node_parent'] = node['node'] vm_statistics[vm['name']]['node_rebalance'] = node['node'] vm_statistics[vm['name']]['storage'] = {} vm_statistics[vm['name']]['type'] = 'vm' # Get disk details of the related object. 
_vm_details = api_object.nodes(node['node']).qemu(vm['vmid']).config.get() logging.info(f'{info_prefix} Getting disk information for vm {vm["name"]}.') for vm_detail_key, vm_detail_value in _vm_details.items(): # vm_detail_key_validator = re.sub('\d+$', '', vm_detail_key) vm_detail_key_validator = re.sub(r'\d+$', '', vm_detail_key) if vm_detail_key_validator in _vm_details_storage_allowed: vm_statistics[vm['name']]['storage'][vm_detail_key] = {} match = re.match(r'([^:]+):[^/]+/(.+),iothread=\d+,size=(\d+G)', _vm_details[vm_detail_key]) # Create an efficient match group and split the strings to assign them to the storage information. if match: _volume = match.group(1) _disk_name = match.group(2) _disk_size = match.group(3) vm_statistics[vm['name']]['storage'][vm_detail_key]['name'] = _disk_name vm_statistics[vm['name']]['storage'][vm_detail_key]['device_name'] = vm_detail_key vm_statistics[vm['name']]['storage'][vm_detail_key]['volume'] = _volume vm_statistics[vm['name']]['storage'][vm_detail_key]['storage_parent'] = _volume vm_statistics[vm['name']]['storage'][vm_detail_key]['storage_rebalance'] = _volume vm_statistics[vm['name']]['storage'][vm_detail_key]['size'] = _disk_size[:-1] logging.info(f'{info_prefix} Added disk for {vm["name"]}: Name {_disk_name} on volume {_volume} with size {_disk_size}.') else: logging.info(f'{info_prefix} No (or unsupported) disk(s) for {vm["name"]} found.') logging.info(f'{info_prefix} Added vm {vm["name"]}.') # Add all containers if type is ct or all. if balancing_type == 'ct' or balancing_type == 'all': for vm in api_object.nodes(node['node']).lxc.get(): logging.warning(f'{warn_prefix} Rebalancing on LXC containers (CT) always requires them to shut down.') logging.warning(f'{warn_prefix} {vm["name"]} is from type CT and cannot be live migrated!') # Get the VM tags from API. 
vm_tags = __get_vm_tags(api_object, node, vm['vmid'], 'ct') if vm_tags is not None: group_include, group_exclude, vm_ignore = __get_proxlb_groups(vm_tags) # Get wildcard match for VMs to ignore if a wildcard pattern was # previously found. Wildcards may slow down the task when using # many patterns in the ignore list. Therefore, run this only if # a wildcard pattern was found. We also do not need to validate # this if the VM is already being ignored by a defined tag. if vm_ignore_wildcard and not vm_ignore: vm_ignore = __check_vm_name_wildcard_pattern(vm['name'], ignore_vms_list) if vm['status'] == 'running' and vm['name'] not in ignore_vms_list and not vm_ignore: vm_statistics[vm['name']] = {} vm_statistics[vm['name']]['group_include'] = group_include vm_statistics[vm['name']]['group_exclude'] = group_exclude vm_statistics[vm['name']]['cpu_total'] = vm['cpus'] vm_statistics[vm['name']]['cpu_used'] = vm['cpu'] vm_statistics[vm['name']]['memory_total'] = vm['maxmem'] vm_statistics[vm['name']]['memory_used'] = vm['mem'] vm_statistics[vm['name']]['disk_total'] = vm['maxdisk'] vm_statistics[vm['name']]['disk_used'] = vm['disk'] vm_statistics[vm['name']]['vmid'] = vm['vmid'] vm_statistics[vm['name']]['node_parent'] = node['node'] vm_statistics[vm['name']]['node_rebalance'] = node['node'] vm_statistics[vm['name']]['storage'] = {} vm_statistics[vm['name']]['type'] = 'ct' # Get disk details of the related object. 
_vm_details = api_object.nodes(node['node']).lxc(vm['vmid']).config.get() logging.info(f'{info_prefix} Getting disk information for vm {vm["name"]}.') for vm_detail_key, vm_detail_value in _vm_details.items(): # vm_detail_key_validator = re.sub('\d+$', '', vm_detail_key) vm_detail_key_validator = re.sub(r'\d+$', '', vm_detail_key) if vm_detail_key_validator in _vm_details_storage_allowed: vm_statistics[vm['name']]['storage'][vm_detail_key] = {} match = re.match(r'(?P[^:]+):(?P[^,]+),size=(?P\S+)', _vm_details[vm_detail_key]) # Create an efficient match group and split the strings to assign them to the storage information. if match: _volume = match.group(1) _disk_name = match.group(2) _disk_size = match.group(3) vm_statistics[vm['name']]['storage'][vm_detail_key]['name'] = _disk_name vm_statistics[vm['name']]['storage'][vm_detail_key]['device_name'] = vm_detail_key vm_statistics[vm['name']]['storage'][vm_detail_key]['volume'] = _volume vm_statistics[vm['name']]['storage'][vm_detail_key]['storage_parent'] = _volume vm_statistics[vm['name']]['storage'][vm_detail_key]['storage_rebalance'] = _volume vm_statistics[vm['name']]['storage'][vm_detail_key]['size'] = _disk_size[:-1] logging.info(f'{info_prefix} Added disk for {vm["name"]}: Name {_disk_name} on volume {_volume} with size {_disk_size}.') else: logging.info(f'{info_prefix} No disks for {vm["name"]} found.') logging.info(f'{info_prefix} Added vm {vm["name"]}.') logging.info(f'{info_prefix} Created VM statistics.') return vm_statistics def update_node_statistics(node_statistics, vm_statistics): """ Update node statistics by VMs statistics. 
""" info_prefix = 'Info: [node-update-statistics]:' warn_prefix = 'Warning: [node-update-statistics]:' for vm, vm_value in vm_statistics.items(): node_statistics[vm_value['node_parent']]['cpu_assigned'] = node_statistics[vm_value['node_parent']]['cpu_assigned'] + int(vm_value['cpu_total']) node_statistics[vm_value['node_parent']]['cpu_assigned_percent'] = (node_statistics[vm_value['node_parent']]['cpu_assigned'] / node_statistics[vm_value['node_parent']]['cpu_total']) * 100 node_statistics[vm_value['node_parent']]['memory_assigned'] = node_statistics[vm_value['node_parent']]['memory_assigned'] + int(vm_value['memory_total']) node_statistics[vm_value['node_parent']]['memory_assigned_percent'] = (node_statistics[vm_value['node_parent']]['memory_assigned'] / node_statistics[vm_value['node_parent']]['memory_total']) * 100 node_statistics[vm_value['node_parent']]['disk_assigned'] = node_statistics[vm_value['node_parent']]['disk_assigned'] + int(vm_value['disk_total']) node_statistics[vm_value['node_parent']]['disk_assigned_percent'] = (node_statistics[vm_value['node_parent']]['disk_assigned'] / node_statistics[vm_value['node_parent']]['disk_total']) * 100 if node_statistics[vm_value['node_parent']]['cpu_assigned_percent'] > 99: logging.warning(f'{warn_prefix} Node {vm_value["node_parent"]} is overprovisioned for CPU by {int(node_statistics[vm_value["node_parent"]]["cpu_assigned_percent"])}%.') if node_statistics[vm_value['node_parent']]['memory_assigned_percent'] > 99: logging.warning(f'{warn_prefix} Node {vm_value["node_parent"]} is overprovisioned for memory by {int(node_statistics[vm_value["node_parent"]]["memory_assigned_percent"])}%.') if node_statistics[vm_value['node_parent']]['disk_assigned_percent'] > 99: logging.warning(f'{warn_prefix} Node {vm_value["node_parent"]} is overprovisioned for disk by {int(node_statistics[vm_value["node_parent"]]["disk_assigned_percent"])}%.') logging.info(f'{info_prefix} Updated node resource assignments by all VMs.') 
logging.debug('node_statistics') return node_statistics def get_storage_statistics(api_object): """ Get statistics of all storage in the cluster. """ info_prefix = 'Info: [storage-statistics]:' storage_statistics = {} for node in api_object.nodes.get(): for storage in api_object.nodes(node['node']).storage.get(): # Only add enabled and active storage repositories that might be suitable for further # storage balancing. if storage['enabled'] and storage['active'] and storage['shared']: storage_statistics[storage['storage']] = {} storage_statistics[storage['storage']]['name'] = storage['storage'] storage_statistics[storage['storage']]['total'] = storage['total'] storage_statistics[storage['storage']]['used'] = storage['used'] storage_statistics[storage['storage']]['used_percent'] = storage['used'] / storage['total'] * 100 storage_statistics[storage['storage']]['used_percent_last_run'] = 0 storage_statistics[storage['storage']]['free'] = storage['total'] - storage['used'] storage_statistics[storage['storage']]['free_percent'] = storage_statistics[storage['storage']]['free'] / storage['total'] * 100 storage_statistics[storage['storage']]['used_fraction'] = storage['used_fraction'] storage_statistics[storage['storage']]['type'] = storage['type'] storage_statistics[storage['storage']]['content'] = storage['content'] storage_statistics[storage['storage']]['usage_type'] = '' # Split the Proxmox returned values to a list and validate the supported # types of the underlying storage for further migrations. 
storage_content_list = storage['content'].split(',') usage_ct = False usage_vm = False if 'rootdir' in storage_content_list: usage_ct = True storage_statistics[storage['storage']]['usage_type'] = 'ct' logging.info(f'{info_prefix} Storage {storage["storage"]} support CTs.') if 'images' in storage_content_list: usage_vm = True storage_statistics[storage['storage']]['usage_type'] = 'vm' logging.info(f'{info_prefix} Storage {storage["storage"]} support VMs.') if usage_ct and usage_vm: storage_statistics[storage['storage']]['usage_type'] = 'all' logging.info(f'{info_prefix} Updateing storage {storage["storage"]} support to CTs and VMs.') logging.info(f'{info_prefix} Added storage {storage["storage"]}.') logging.info(f'{info_prefix} Created storage statistics.') return storage_statistics def __validate_ignore_vm_wildcard(ignore_vms): """ Validate if a wildcard is used for ignored VMs. """ if '*' in ignore_vms: return True def __check_vm_name_wildcard_pattern(vm_name, ignore_vms_list): """ Validate if the VM name is in the ignore list pattern included. """ for ignore_vm in ignore_vms_list: if '*' in ignore_vm: if ignore_vm[:-1] in vm_name: return True def __get_vm_tags(api_object, node, vmid, balancing_type): """ Get tags for a VM/CT for a given VMID. """ info_prefix = 'Info: [api-get-vm-tags]:' if balancing_type == 'vm': vm_config = api_object.nodes(node['node']).qemu(vmid).config.get() if balancing_type == 'ct': vm_config = api_object.nodes(node['node']).lxc(vmid).config.get() logging.info(f'{info_prefix} Got VM/CT tag from API.') return vm_config.get('tags', None) def __get_proxlb_groups(vm_tags): """ Get ProxLB related include and exclude groups. 
""" info_prefix = 'Info: [api-get-vm-include-exclude-tags]:' group_include = None group_exclude = None vm_ignore = None group_list = re.split(";", vm_tags) for group in group_list: if group.startswith('plb_include_'): logging.info(f'{info_prefix} Got PLB include group.') group_include = group if group.startswith('plb_exclude_'): logging.info(f'{info_prefix} Got PLB include group.') group_exclude = group if group.startswith('plb_ignore_vm'): logging.info(f'{info_prefix} Got PLB ignore group.') vm_ignore = True return group_include, group_exclude, vm_ignore def balancing_vm_calculations(balancing_method, balancing_mode, balancing_mode_option, node_statistics, vm_statistics, balanciness, app_args, rebalance, processed_vms): """ Calculate re-balancing of VMs on present nodes across the cluster. """ info_prefix = 'Info: [rebalancing-vm-calculator]:' # Validate for a supported balancing method, mode and if rebalancing is required. __validate_balancing_method(balancing_method) __validate_balancing_mode(balancing_mode) __validate_vm_statistics(vm_statistics) rebalance = __validate_balanciness(balanciness, balancing_method, balancing_mode, node_statistics) if rebalance: # Get most used/assigned resources of the VM and the most free or less allocated node. resources_vm_most_used, processed_vms = __get_most_used_resources_vm(balancing_method, balancing_mode, vm_statistics, processed_vms) resources_node_most_free = __get_most_free_resources_node(balancing_method, balancing_mode, balancing_mode_option, node_statistics) # Update resource statistics for VMs and nodes. node_statistics, vm_statistics = __update_vm_resource_statistics(resources_vm_most_used, resources_node_most_free, vm_statistics, node_statistics, balancing_method, balancing_mode) # Start recursion until we do not have any needs to rebalance anymore. 
balancing_vm_calculations(balancing_method, balancing_mode, balancing_mode_option, node_statistics, vm_statistics, balanciness, app_args, rebalance, processed_vms) # If only best node argument set we simply return the next best node for VM # and CT placement on the CLI and stop ProxLB. if app_args.best_node: logging.info(f'{info_prefix} Only best next node for new VM & CT placement requsted.') best_next_node = __get_most_free_resources_node(balancing_method, balancing_mode, balancing_mode_option, node_statistics) print(best_next_node[0]) logging.info(f'{info_prefix} Best next node for VM & CT placement: {best_next_node[0]}') sys.exit(0) # Honour groupings for include and exclude groups for rebalancing VMs. node_statistics, vm_statistics = __get_vm_tags_include_groups(vm_statistics, node_statistics, balancing_method, balancing_mode) node_statistics, vm_statistics = __get_vm_tags_exclude_groups(vm_statistics, node_statistics, balancing_method, balancing_mode) logging.info(f'{info_prefix} Balancing calculations done.') return node_statistics, vm_statistics def __validate_balancing_method(balancing_method): """ Validate for valid and supported balancing method. """ error_prefix = 'Error: [balancing-method-validation]:' info_prefix = 'Info: [balancing-method-validation]:' if balancing_method not in ['memory', 'disk', 'cpu']: logging.error(f'{error_prefix} Invalid balancing method: {balancing_method}') sys.exit(2) else: logging.info(f'{info_prefix} Valid balancing method: {balancing_method}') def __validate_balancing_mode(balancing_mode): """ Validate for valid and supported balancing mode. 
""" error_prefix = 'Error: [balancing-mode-validation]:' info_prefix = 'Info: [balancing-mode-validation]:' if balancing_mode not in ['used', 'assigned']: logging.error(f'{error_prefix} Invalid balancing method: {balancing_mode}') sys.exit(2) else: logging.info(f'{info_prefix} Valid balancing method: {balancing_mode}') def __validate_vm_statistics(vm_statistics): """ Validate for at least a single object of type CT/VM to rebalance. """ error_prefix = 'Error: [balancing-vm-stats-validation]:' if len(vm_statistics) == 0: logging.error(f'{error_prefix} Not a single CT/VM found in cluster.') sys.exit(1) def __validate_balanciness(balanciness, balancing_method, balancing_mode, node_statistics): """ Validate for balanciness to ensure further rebalancing is needed. """ info_prefix = 'Info: [balanciness-validation]:' node_resource_percent_list = [] node_assigned_percent_match = [] # Remap balancing mode to get the related values from nodes dict. if balancing_mode == 'used': node_resource_selector = 'free' if balancing_mode == 'assigned': node_resource_selector = 'assigned' for node_name, node_info in node_statistics.items(): # Save information of nodes from current run to compare them in the next recursion. if node_statistics[node_name][f'{balancing_method}_{node_resource_selector}_percent_last_run'] == node_statistics[node_name][f'{balancing_method}_{node_resource_selector}_percent']: node_statistics[node_name][f'{balancing_method}_{node_resource_selector}_percent_match'] = True else: node_statistics[node_name][f'{balancing_method}_{node_resource_selector}_percent_match'] = False # Update value to the current value of the recursion run. node_statistics[node_name][f'{balancing_method}_{node_resource_selector}_percent_last_run'] = node_statistics[node_name][f'{balancing_method}_{node_resource_selector}_percent'] # If all node resources are unchanged, the recursion can be left. 
        # Collect the per-node "unchanged" flags; when every node reports an
        # unchanged value, no further rebalancing progress is possible.
        for key, value in node_statistics.items():
            node_assigned_percent_match.append(value.get(f'{balancing_method}_{node_resource_selector}_percent_match', False))
        if False not in node_assigned_percent_match:
            return False

        # Add node information to resource list.
        node_resource_percent_list.append(int(node_info[f'{balancing_method}_{node_resource_selector}_percent']))
        logging.debug(f'{info_prefix} Node: {node_name} with values: {node_info}')

    # Create a sorted list of the delta + balanciness between the node resources.
    node_resource_percent_list_sorted = sorted(node_resource_percent_list)
    node_lowest_percent = node_resource_percent_list_sorted[0]
    node_highest_percent = node_resource_percent_list_sorted[-1]

    # Validate if the recursion should be proceeded for further rebalancing.
    if (int(node_lowest_percent) + int(balanciness)) < int(node_highest_percent):
        logging.info(f'{info_prefix} Rebalancing for {balancing_method} is needed. Highest usage: {int(node_highest_percent)}% | Lowest usage: {int(node_lowest_percent)}%.')
        return True
    else:
        logging.info(f'{info_prefix} Rebalancing for {balancing_method} is not needed. Highest usage: {int(node_highest_percent)}% | Lowest usage: {int(node_lowest_percent)}%.')
        return False


def __get_most_used_resources_vm(balancing_method, balancing_mode, vm_statistics, processed_vms):
    """ Get and return the most used resources of a VM by the defined balancing method.

    Returns the (vm_name, vm_info) item with the highest used/assigned value and
    the updated processed_vms list.
    """
    info_prefix = 'Info: [get-most-used-resources-vm]:'

    # Remap balancing mode to get the related values from nodes dict.
    if balancing_mode == 'used':
        vm_resource_selector = 'used'
    if balancing_mode == 'assigned':
        vm_resource_selector = 'total'

    # Already processed VMs are pushed to -inf so each VM is only picked once per cycle.
    vm = max(vm_statistics.items(), key=lambda item: item[1][f'{balancing_method}_{vm_resource_selector}'] if item[0] not in processed_vms else -float('inf'))
    processed_vms.append(vm[0])

    logging.info(f'{info_prefix} {vm}')
    return vm, processed_vms


def __get_most_free_resources_node(balancing_method, balancing_mode, balancing_mode_option, node_statistics):
    """ Get and return the most free resources of a node by the defined balancing method.

    Returns the (node_name, node_info) item selected by the balancing mode/option.
    """
    info_prefix = 'Info: [get-most-free-resources-nodes]:'

    # Return the node information based on the balancing mode.
    if balancing_mode == 'used' and balancing_mode_option == 'bytes':
        node = max(node_statistics.items(), key=lambda item: item[1][f'{balancing_method}_free'])
    if balancing_mode == 'used' and balancing_mode_option == 'percent':
        node = max(node_statistics.items(), key=lambda item: item[1][f'{balancing_method}_free_percent'])
    # NOTE(review): the guard below is a tautology (`> 0 or < 100` is always true),
    # so the -inf branch can never fire; moreover, with `min` a -inf sentinel would
    # *prefer* the node it is meant to exclude. Likely intended: `and` plus +inf —
    # confirm the intent before changing behavior.
    if balancing_mode == 'assigned':
        node = min(node_statistics.items(), key=lambda item: item[1][f'{balancing_method}_assigned'] if item[1][f'{balancing_method}_assigned_percent'] > 0 or item[1][f'{balancing_method}_assigned_percent'] < 100 else -float('inf'))

    logging.info(f'{info_prefix} {node}')
    return node


def __update_vm_resource_statistics(resource_highest_used_resources_vm, resource_highest_free_resources_node, vm_statistics, node_statistics, balancing_method, balancing_mode):
    """ Update VM and node resource statistics.
    """
    info_prefix = 'Info: [rebalancing-resource-statistics-update]:'

    # Only act when the chosen VM would actually land on a different node.
    if resource_highest_used_resources_vm[1]['node_parent'] != resource_highest_free_resources_node[0]:
        vm_name = resource_highest_used_resources_vm[0]
        vm_node_parent = resource_highest_used_resources_vm[1]['node_parent']
        vm_node_rebalance = resource_highest_free_resources_node[0]
        vm_resource_used = vm_statistics[resource_highest_used_resources_vm[0]][f'{balancing_method}_used']
        vm_resource_total = vm_statistics[resource_highest_used_resources_vm[0]][f'{balancing_method}_total']

        # Update dictionaries for new values
        # Assign new rebalance node to vm
        vm_statistics[vm_name]['node_rebalance'] = vm_node_rebalance
        logging.info(f'Moving {vm_name} from {vm_node_parent} to {vm_node_rebalance}')

        # Recalculate values for nodes
        ## Add freed resources to old parent node
        node_statistics[vm_node_parent][f'{balancing_method}_used'] = int(node_statistics[vm_node_parent][f'{balancing_method}_used']) - int(vm_resource_used)
        node_statistics[vm_node_parent][f'{balancing_method}_free'] = int(node_statistics[vm_node_parent][f'{balancing_method}_free']) + int(vm_resource_used)
        node_statistics[vm_node_parent][f'{balancing_method}_free_percent'] = int(int(node_statistics[vm_node_parent][f'{balancing_method}_free']) / int(node_statistics[vm_node_parent][f'{balancing_method}_total']) * 100)
        node_statistics[vm_node_parent][f'{balancing_method}_assigned'] = int(node_statistics[vm_node_parent][f'{balancing_method}_assigned']) - int(vm_resource_total)
        node_statistics[vm_node_parent][f'{balancing_method}_assigned_percent'] = int(int(node_statistics[vm_node_parent][f'{balancing_method}_assigned']) / int(node_statistics[vm_node_parent][f'{balancing_method}_total']) * 100)

        ## Removed newly allocated resources to new rebalanced node
        node_statistics[vm_node_rebalance][f'{balancing_method}_used'] = int(node_statistics[vm_node_rebalance][f'{balancing_method}_used']) + int(vm_resource_used)
        node_statistics[vm_node_rebalance][f'{balancing_method}_free'] = int(node_statistics[vm_node_rebalance][f'{balancing_method}_free']) - int(vm_resource_used)
        node_statistics[vm_node_rebalance][f'{balancing_method}_free_percent'] = int(int(node_statistics[vm_node_rebalance][f'{balancing_method}_free']) / int(node_statistics[vm_node_rebalance][f'{balancing_method}_total']) * 100)
        node_statistics[vm_node_rebalance][f'{balancing_method}_assigned'] = int(node_statistics[vm_node_rebalance][f'{balancing_method}_assigned']) + int(vm_resource_total)
        node_statistics[vm_node_rebalance][f'{balancing_method}_assigned_percent'] = int(int(node_statistics[vm_node_rebalance][f'{balancing_method}_assigned']) / int(node_statistics[vm_node_rebalance][f'{balancing_method}_total']) * 100)

    logging.info(f'{info_prefix} Updated VM and node statistics.')
    return node_statistics, vm_statistics


def __get_vm_tags_include_groups(vm_statistics, node_statistics, balancing_method, balancing_mode):
    """ Get VMs tags for include groups. """
    info_prefix = 'Info: [rebalancing-tags-group-include]:'
    tags_include_vms = {}
    processed_vm = []

    # Create groups of tags with belongings hosts.
    for vm_name, vm_values in vm_statistics.items():
        if vm_values.get('group_include', None):
            if not tags_include_vms.get(vm_values['group_include'], None):
                tags_include_vms[vm_values['group_include']] = [vm_name]
            else:
                tags_include_vms[vm_values['group_include']] = tags_include_vms[vm_values['group_include']] + [vm_name]

    # Update the VMs to the corresponding node to their group assignments.
    for group, vm_names in tags_include_vms.items():

        # Do not take care of tags that have only a single host included.
if len(vm_names) < 2: logging.info(f'{info_prefix} Only one host in group assignment.') return node_statistics, vm_statistics vm_node_rebalance = False logging.info(f'{info_prefix} Create include groups of VM hosts.') for vm_name in vm_names: if vm_name not in processed_vm: if not vm_node_rebalance: vm_node_rebalance = vm_statistics[vm_name]['node_rebalance'] else: _mocked_vm_object = (vm_name, vm_statistics[vm_name]) node_statistics, vm_statistics = __update_vm_resource_statistics(_mocked_vm_object, [vm_node_rebalance], vm_statistics, node_statistics, balancing_method, balancing_mode) processed_vm.append(vm_name) return node_statistics, vm_statistics def __get_vm_tags_exclude_groups(vm_statistics, node_statistics, balancing_method, balancing_mode): """ Get VMs tags for exclude groups. """ info_prefix = 'Info: [rebalancing-tags-group-exclude]:' tags_exclude_vms = {} processed_vm = [] # Create groups of tags with belongings hosts. for vm_name, vm_values in vm_statistics.items(): if vm_values.get('group_include', None): if not tags_exclude_vms.get(vm_values['group_include'], None): tags_exclude_vms[vm_values['group_include']] = [vm_name] else: tags_exclude_vms[vm_values['group_include']] = tags_exclude_vms[vm_values['group_include']] + [vm_name] # Update the VMs to the corresponding node to their group assignments. for group, vm_names in tags_exclude_vms.items(): # Do not take care of tags that have only a single host included. if len(vm_names) < 2: logging.info(f'{info_prefix} Only one host in group assignment.') return node_statistics, vm_statistics vm_node_rebalance = False logging.info(f'{info_prefix} Create exclude groups of VM hosts.') for vm_name in vm_names: if vm_name not in processed_vm: if not vm_node_rebalance: random_node = vm_statistics[vm_name]['node_parent'] # Get a random node and make sure that it is not by accident the # currently assigned one. 
while random_node == vm_statistics[vm_name]['node_parent']: random_node = random.choice(list(node_statistics.keys())) else: _mocked_vm_object = (vm_name, vm_statistics[vm_name]) node_statistics, vm_statistics = __update_vm_resource_statistics(_mocked_vm_object, [random_node], vm_statistics, node_statistics, balancing_method, balancing_mode) processed_vm.append(vm_name) return node_statistics, vm_statistics def __wait_job_finalized(api_object, node_name, job_id, counter): """ Wait for a job to be finalized. """ error_prefix = 'Error: [job-status-getter]:' info_prefix = 'Info: [job-status-getter]:' logging.info(f'{info_prefix} Getting job status for job {job_id}.') task = api_object.nodes(node_name).tasks(job_id).status().get() logging.info(f'{info_prefix} {task}') if task['status'] == 'running': logging.info(f'{info_prefix} Validating job {job_id} for the {counter} run.') # Do not run for infinity this recursion and fail when reaching the limit. if counter == 300: logging.critical(f'{error_prefix} The job {job_id} on node {node_name} did not finished in time for migration.') time.sleep(5) counter = counter + 1 logging.info(f'{info_prefix} Revalidating job {job_id} in a next run.') __wait_job_finalized(api_object, node_name, job_id, counter) logging.info(f'{info_prefix} Job {job_id} for migration from {node_name} terminiated succesfully.') def __run_vm_rebalancing(api_object, _vm_vm_statistics, app_args, parallel_migrations): """ Run & execute the VM rebalancing via API. """ error_prefix = 'Error: [vm-rebalancing-executor]:' info_prefix = 'Info: [vm-rebalancing-executor]:' # Remove VMs/CTs that do not have a new node location. 
vms_to_remove = [vm_name for vm_name, vm_info in _vm_vm_statistics.items() if 'node_rebalance' in vm_info and vm_info['node_rebalance'] == vm_info.get('node_parent')] for vm_name in vms_to_remove: del _vm_vm_statistics[vm_name] if len(_vm_vm_statistics) > 0 and not app_args.dry_run: for vm, value in _vm_vm_statistics.items(): try: # Migrate type VM (live migration). if value['type'] == 'vm': logging.info(f'{info_prefix} Rebalancing VM {vm} from node {value["node_parent"]} to node {value["node_rebalance"]}.') job_id = api_object.nodes(value['node_parent']).qemu(value['vmid']).migrate().post(target=value['node_rebalance'],online=1) # Migrate type CT (requires restart of container). if value['type'] == 'ct': logging.info(f'{info_prefix} Rebalancing CT {vm} from node {value["node_parent"]} to node {value["node_rebalance"]}.') job_id = api_object.nodes(value['node_parent']).lxc(value['vmid']).migrate().post(target=value['node_rebalance'],restart=1) except proxmoxer.core.ResourceException as error_resource: logging.critical(f'{error_prefix} {error_resource}') # Wait for migration to be finished unless running parallel migrations. if not bool(int(parallel_migrations)): logging.info(f'{info_prefix} Rebalancing will be performed sequentially.') __wait_job_finalized(api_object, value['node_parent'], job_id, counter=1) else: logging.info(f'{info_prefix} Rebalancing will be performed parallely.') else: logging.info(f'{info_prefix} No rebalancing needed.') return _vm_vm_statistics def __run_storage_rebalancing(api_object, _storage_vm_statistics, app_args, parallel_migrations): """ Run & execute the storage rebalancing via API. """ error_prefix = 'Error: [storage-rebalancing-executor]:' info_prefix = 'Info: [storage-rebalancing-executor]:' # Remove VMs/CTs that do not have a new storage location. 
vms_to_remove = [vm_name for vm_name, vm_info in _storage_vm_statistics.items() if all(storage.get('storage_rebalance') == storage.get('storage_parent') for storage in vm_info.get('storage', {}).values())] for vm_name in vms_to_remove: del _storage_vm_statistics[vm_name] if len(_storage_vm_statistics) > 0 and not app_args.dry_run: for vm, value in _storage_vm_statistics.items(): for disk, disk_info in value['storage'].items(): if disk_info.get('storage_rebalance', None) is not None: try: # Migrate type VM (live migration). logging.info(f'{info_prefix} Rebalancing storage of VM {vm} from node.') job_id = api_object.nodes(value['node_parent']).qemu(value['vmid']).move_disk().post(disk=disk,storage=disk_info.get('storage_rebalance', None), delete=1) except proxmoxer.core.ResourceException as error_resource: logging.critical(f'{error_prefix} {error_resource}') # Wait for migration to be finished unless running parallel migrations. if not bool(int(parallel_migrations)): logging.info(f'{info_prefix} Rebalancing will be performed sequentially.') __wait_job_finalized(api_object, value['node_parent'], job_id, counter=1) else: logging.info(f'{info_prefix} Rebalancing will be performed parallely.') else: logging.info(f'{info_prefix} No rebalancing needed.') return _storage_vm_statistics def __create_json_output(vm_statistics, app_args): """ Create a machine parsable json output of VM rebalance statitics. """ info_prefix = 'Info: [json-output-generator]:' if app_args.json: logging.info(f'{info_prefix} Printing json output of VM statistics.') print(json.dumps(vm_statistics)) def __create_cli_output(vm_statistics, app_args): """ Create output for CLI when running in dry-run mode. 
    """
    info_prefix_dry_run = 'Info: [cli-output-generator-dry-run]:'
    info_prefix_run = 'Info: [cli-output-generator]:'
    vm_to_node_list = []

    if app_args.dry_run:
        info_prefix = info_prefix_dry_run
        logging.info(f'{info_prefix} Starting dry-run to rebalance vms to their new nodes.')
    else:
        info_prefix = info_prefix_run
        logging.info(f'{info_prefix} Start rebalancing vms to their new nodes.')

    # Build a row per VM disk; first row is the table header.
    vm_to_node_list.append(['VM', 'Current Node', 'Rebalanced Node', 'Current Storage', 'Rebalanced Storage', 'VM Type'])
    for vm_name, vm_values in vm_statistics.items():
        for disk, disk_values in vm_values['storage'].items():
            vm_to_node_list.append([vm_name, vm_values['node_parent'], vm_values['node_rebalance'], f'{disk_values.get("storage_parent", "N/A")} ({disk_values.get("device_name", "N/A")})', f'{disk_values.get("storage_rebalance", "N/A")} ({disk_values.get("device_name", "N/A")})', vm_values['type']])

    if len(vm_statistics) > 0:
        logging.info(f'{info_prefix} Printing cli output of VM rebalancing.')
        __print_table_cli(vm_to_node_list, app_args.dry_run)
    else:
        logging.info(f'{info_prefix} No rebalancing needed.')


def __print_table_cli(table, dry_run=False):
    """ Pretty print a given table to the cli.

    Columns are right-aligned to the longest cell (+3 padding). Rows are always
    logged at info level; they are only printed to stdout in dry-run mode.
    """
    info_prefix_dry_run = 'Info: [cli-output-generator-table-dryn-run]:'
    info_prefix_run = 'Info: [cli-output-generator-table]:'
    info_prefix = info_prefix_run

    longest_cols = [
        (max([len(str(row[i])) for row in table]) + 3)
        for i in range(len(table[0]))
    ]

    row_format = "".join(["{:>" + str(longest_col) + "}" for longest_col in longest_cols])
    for row in table:
        # Print CLI output when running in dry-run mode to make the user's life easier.
        if dry_run:
            info_prefix = info_prefix_dry_run
            print(row_format.format(*row))
        # Log all items in info mode.
        logging.info(f'{info_prefix} {row_format.format(*row)}')


def run_rebalancing(api_object, vm_statistics, app_args, parallel_migrations, balancing_type):
    """ Run rebalancing of vms to new nodes in cluster.
""" _vm_vm_statistics = {} _storage_vm_statistics = {} if balancing_type == 'vm': _vm_vm_statistics = copy.deepcopy(vm_statistics) _vm_vm_statistics = __run_vm_rebalancing(api_object, _vm_vm_statistics, app_args, parallel_migrations) return _vm_vm_statistics if balancing_type == 'storage': _storage_vm_statistics = copy.deepcopy(vm_statistics) _storage_vm_statistics = __run_storage_rebalancing(api_object, _storage_vm_statistics, app_args, parallel_migrations) return _storage_vm_statistics def run_output_rebalancing(app_args, vm_output_statistics, storage_output_statistics): """ Generate output of rebalanced resources. """ output_statistics = {**vm_output_statistics, **storage_output_statistics} __create_json_output(output_statistics, app_args) __create_cli_output(output_statistics, app_args) def balancing_storage_calculations(storage_balancing_method, storage_statistics, vm_statistics, balanciness, rebalance, processed_vms): """ Calculate re-balancing of storage on present datastores across the cluster. """ info_prefix = 'Info: [storage-rebalancing-calculator]:' # Validate for a supported balancing method, mode and if rebalancing is required. __validate_vm_statistics(vm_statistics) rebalance = __validate_storage_balanciness(balanciness, storage_balancing_method, storage_statistics) if rebalance: vm_name, vm_disk_device = __get_most_used_resources_vm_storage(vm_statistics) if vm_name not in processed_vms: processed_vms.append(vm_name) resources_storage_most_free = __get_most_free_storage(storage_balancing_method, storage_statistics) # Update resource statistics for VMs and storage. storage_statistics, vm_statistic = __update_resource_storage_statistics(storage_statistics, resources_storage_most_free, vm_statistics, vm_name, vm_disk_device) # Start recursion until we do not have any needs to rebalance anymore. 
balancing_storage_calculations(storage_balancing_method, storage_statistics, vm_statistics, balanciness, rebalance, processed_vms) logging.info(f'{info_prefix} Balancing calculations done.') return storage_statistics, vm_statistics def __validate_storage_balanciness(balanciness, storage_balancing_method, storage_statistics): """ Validate for balanciness of storage to ensure further rebalancing is needed. """ info_prefix = 'Info: [storage-balanciness-validation]:' error_prefix = 'Error: [storage-balanciness-validation]:' storage_resource_percent_list = [] storage_assigned_percent_match = [] # Validate for an allowed balancing method and define the storage resource selector. if storage_balancing_method == 'disk_space': logging.info(f'{info_prefix} Getting most free storage volume by disk size.') storage_resource_selector = 'used' elif storage_balancing_method == 'disk_io': logging.error(f'{error_prefix} Getting most free storage volume by disk IO is not yet supported.') sys.exit(2) else: logging.error(f'{error_prefix} Getting most free storage volume by disk IO is not yet supported.') sys.exit(2) # Obtain the metrics for storage_name, storage_info in storage_statistics.items(): logging.info(f'{info_prefix} Validating storage: {storage_name} for balanciness for usage with: {storage_balancing_method}.') # Save information of nodes from current run to compare them in the next recursion. if storage_statistics[storage_name][f'{storage_resource_selector}_percent_last_run'] == storage_statistics[storage_name][f'{storage_resource_selector}_percent']: storage_statistics[storage_name][f'{storage_resource_selector}_percent_match'] = True else: storage_statistics[storage_name][f'{storage_resource_selector}_percent_match'] = False # Update value to the current value of the recursion run. 
storage_statistics[storage_name][f'{storage_resource_selector}_percent_last_run'] = storage_statistics[storage_name][f'{storage_resource_selector}_percent'] # If all node resources are unchanged, the recursion can be left. for key, value in storage_statistics.items(): storage_assigned_percent_match.append(value.get(f'{storage_resource_selector}_percent_match', False)) if False not in storage_assigned_percent_match: return False # Add node information to resource list. storage_resource_percent_list.append(int(storage_info[f'{storage_resource_selector}_percent'])) logging.info(f'{info_prefix} Storage: {storage_name} with values: {storage_info}') # Create a sorted list of the delta + balanciness between the node resources. storage_resource_percent_list_sorted = sorted(storage_resource_percent_list) storage_lowest_percent = storage_resource_percent_list_sorted[0] storage_highest_percent = storage_resource_percent_list_sorted[-1] # Validate if the recursion should be proceeded for further rebalancing. if (int(storage_lowest_percent) + int(balanciness)) < int(storage_highest_percent): logging.info(f'{info_prefix} Rebalancing for type "{storage_resource_selector}" of storage is needed. Highest usage: {int(storage_highest_percent)}% | Lowest usage: {int(storage_lowest_percent)}%.') return True else: logging.info(f'{info_prefix} Rebalancing for type "{storage_resource_selector}" of storage is not needed. Highest usage: {int(storage_highest_percent)}% | Lowest usage: {int(storage_lowest_percent)}%.') return False def __get_most_used_resources_vm_storage(vm_statistics): """ Get and return the most used disk of a VM by storage. """ info_prefix = 'Info: [get-most-used-disks-resources-vm]:' # Get the biggest storage of a VM/CT. A VM/CT can hold multiple disks. Therefore, we need to iterate # over all assigned disks to get the biggest one. 
vm_object = sorted( vm_statistics.items(), key=lambda x: max( (size_in_bytes(storage['size']) for storage in x[1].get('storage', {}).values() if 'size' in storage), default=0 ), reverse=True ) vm_object = vm_object[0] vm_name = vm_object[0] vm_disk_device = max(vm_object[1]['storage'], key=lambda x: int(vm_object[1]['storage'][x]['size'])) logging.info(f'{info_prefix} Got most used VM: {vm_name} with storage device: {vm_disk_device}.') return vm_name, vm_disk_device def __get_most_free_storage(storage_balancing_method, storage_statistics): """ Get the storage with the most free space or IO, depending on the balancing mode. """ info_prefix = 'Info: [get-most-free-storage]:' error_prefix = 'Error: [get-most-free-storage]:' storage_volume = None logging.info(f'{info_prefix} Starting to evaluate the most free storage volume.') if storage_balancing_method == 'disk_space': logging.info(f'{info_prefix} Getting most free storage volume by disk space.') storage_volume = max(storage_statistics, key=lambda x: storage_statistics[x]['free_percent']) if storage_balancing_method == 'disk_io': logging.info(f'{info_prefix} Getting most free storage volume by disk IO.') logging.error(f'{error_prefix} Getting most free storage volume by disk IO is not yet supported.') sys.exit(2) return storage_volume def __update_resource_storage_statistics(storage_statistics, resources_storage_most_free, vm_statistics, vm_name, vm_disk_device): """ Update VM and storage resource statistics. 
""" info_prefix = 'Info: [rebalancing-storage-resource-statistics-update]:' current_storage = vm_statistics[vm_name]['storage'][vm_disk_device]['storage_parent'] current_storage_size = storage_statistics[current_storage]['free'] / (1024 ** 3) rebalance_storage = resources_storage_most_free rebalance_storage_size = storage_statistics[rebalance_storage]['free'] / (1024 ** 3) vm_storage_size = vm_statistics[vm_name]['storage'][vm_disk_device]['size'] vm_storage_size_bytes = int(vm_storage_size) * 1024**3 # Assign new storage device to vm logging.info(f'{info_prefix} Validating VM {vm_name} for potential storage rebalancing.') if vm_statistics[vm_name]['storage'][vm_disk_device]['storage_rebalance'] == vm_statistics[vm_name]['storage'][vm_disk_device]['storage_parent']: logging.info(f'{info_prefix} Setting VM {vm_name} from {current_storage} to {rebalance_storage} storage.') vm_statistics[vm_name]['storage'][vm_disk_device]['storage_rebalance'] = resources_storage_most_free else: logging.info(f'{info_prefix} Setting VM {vm_name} from {current_storage} to {rebalance_storage} storage.') # Recalculate values for storage ## Add freed resources to old parent storage device storage_statistics[current_storage]['used'] = storage_statistics[current_storage]['used'] - vm_storage_size_bytes storage_statistics[current_storage]['free'] = storage_statistics[current_storage]['free'] + vm_storage_size_bytes storage_statistics[current_storage]['free_percent'] = (storage_statistics[current_storage]['free'] / storage_statistics[current_storage]['total']) * 100 storage_statistics[current_storage]['used_percent'] = (storage_statistics[current_storage]['used'] / storage_statistics[current_storage]['total']) * 100 logging.info(f'{info_prefix} Adding free space of {vm_storage_size}G to old storage with {current_storage_size}G. 
[free: {int(current_storage_size) + int(vm_storage_size)}G | {storage_statistics[current_storage]["free_percent"]}%]') ## Removed newly allocated resources to new rebalanced storage device storage_statistics[rebalance_storage]['used'] = storage_statistics[rebalance_storage]['used'] + vm_storage_size_bytes storage_statistics[rebalance_storage]['free'] = storage_statistics[rebalance_storage]['free'] - vm_storage_size_bytes storage_statistics[rebalance_storage]['free_percent'] = (storage_statistics[rebalance_storage]['free'] / storage_statistics[rebalance_storage]['total']) * 100 storage_statistics[rebalance_storage]['used_percent'] = (storage_statistics[rebalance_storage]['used'] / storage_statistics[rebalance_storage]['total']) * 100 logging.info(f'{info_prefix} Adding used space of {vm_storage_size}G to new storage with {rebalance_storage_size}G. [free: {int(rebalance_storage_size) - int(vm_storage_size)}G | {storage_statistics[rebalance_storage]["free_percent"]}%]') logging.info(f'{info_prefix} Updated VM and storage statistics.') return storage_statistics, vm_statistics def size_in_bytes(size_str): size_unit = size_str[-1].upper() size_value = float(size_str) size_multipliers = {'K': 1024, 'M': 1024**2, 'G': 1024**3, 'T': 1024**4} return size_value * size_multipliers.get(size_unit, 1) def main(): """ Run ProxLB for balancing VM workloads across a Proxmox cluster. """ vm_output_statistics = {} storage_output_statistics = {} # Initialize PAS. initialize_logger('CRITICAL') app_args = initialize_args() config_path = initialize_config_path(app_args) pre_validations(config_path) # Parse global config. proxlb_config = initialize_config_options(config_path) # Overwrite logging handler with user defined log verbosity. initialize_logger(proxlb_config['log_verbosity'], update_log_verbosity=True) while True: # API Authentication. 
        api_object = api_connect(proxlb_config['proxmox_api_host'], proxlb_config['proxmox_api_user'], proxlb_config['proxmox_api_pass'], proxlb_config['proxmox_api_ssl_v'])

        # Get master node of cluster and ensure that ProxLB is only performed on the
        # cluster master node to avoid ongoing rebalancing.
        cluster_master, master_only = execute_rebalancing_only_by_master(api_object, proxlb_config['master_only'])

        # Validate daemon service and skip following tasks when not being the cluster master.
        # validate_daemon either sleeps for the schedule interval (daemon mode) or exits.
        if not cluster_master and master_only:
            validate_daemon(proxlb_config['daemon'], proxlb_config['schedule'])
            continue

        # Get metric & statistics for vms and nodes.
        if proxlb_config['vm_balancing_enable'] or proxlb_config['storage_balancing_enable'] or app_args.best_node:
            node_statistics = get_node_statistics(api_object, proxlb_config['vm_ignore_nodes'])
            vm_statistics = get_vm_statistics(api_object, proxlb_config['vm_ignore_vms'], proxlb_config['vm_balancing_type'])
            node_statistics = update_node_statistics(node_statistics, vm_statistics)
            storage_statistics = get_storage_statistics(api_object)

        # Execute VM/CT balancing sub-routines.
        # NOTE: balancing_vm_calculations exits the process here when --best-node is set.
        if proxlb_config['vm_balancing_enable'] or app_args.best_node:
            node_statistics, vm_statistics = balancing_vm_calculations(proxlb_config['vm_balancing_method'], proxlb_config['vm_balancing_mode'], proxlb_config['vm_balancing_mode_option'], node_statistics, vm_statistics, proxlb_config['vm_balanciness'], app_args, rebalance=False, processed_vms=[])
            vm_output_statistics = run_rebalancing(api_object, vm_statistics, app_args, proxlb_config['vm_parallel_migrations'], 'vm')

        # Execute storage balancing sub-routines.
        if proxlb_config['storage_balancing_enable']:
            storage_statistics, vm_statistics = balancing_storage_calculations(proxlb_config['storage_balancing_method'], storage_statistics, vm_statistics, proxlb_config['storage_balanciness'], rebalance=False, processed_vms=[])
            storage_output_statistics = run_rebalancing(api_object, vm_statistics, app_args, proxlb_config['storage_parallel_migrations'], 'storage')

        # Generate balancing output
        if proxlb_config['vm_balancing_enable'] or proxlb_config['storage_balancing_enable']:
            run_output_rebalancing(app_args, vm_output_statistics, storage_output_statistics)

        # Validate for any errors.
        post_validations()

        # Validate daemon service.
        validate_daemon(proxlb_config['daemon'], proxlb_config['schedule'])


if __name__ == '__main__':
    main()