#!/usr/bin/env python3

# ProxLB
# ProxLB (re)balances VM workloads across nodes in Proxmox clusters.
# ProxLB obtains current metrics from all nodes within the cluster for
# further auto balancing by memory, disk or cpu and rebalances the VMs
# over all available nodes in a cluster to achieve equal resource usage.
# Copyright (C) 2024 Florian Paul Azim Hoberg @gyptazy <gyptazy@gyptazy.ch>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.

import argparse
import configparser
import copy
import json
import logging
import os
try:
    import proxmoxer
    _imports = True
except ImportError:
    _imports = False
import random
import re
import requests
import socket
import sys
import time
import urllib3


# Constants
__appname__ = "ProxLB"
__version__ = "1.0.5"
__config_version__ = 3
__author__ = "Florian Paul Azim Hoberg <gyptazy@gyptazy.com> @gyptazy"
__errors__ = False


# Classes
## Logging class
class SystemdHandler(logging.Handler):
    """ Class to handle logging options. """
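    # The numeric "<N>" prefixes below follow the systemd/sd-daemon logging
    # convention (e.g. "<3>" = error, "<6>" = info), so journald can map each
    # record to the matching syslog priority when ProxLB runs as a service.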
    PREFIX = {
        logging.CRITICAL: "<2> " + __appname__ + ": ",
        logging.ERROR: "<3> " + __appname__ + ": ",
        logging.WARNING: "<4> " + __appname__ + ": ",
        logging.INFO: "<6> " + __appname__ + ": ",
        logging.DEBUG: "<7> " + __appname__ + ": ",
        logging.NOTSET: "<7> " + __appname__ + ": ",
    }

    def __init__(self, stream=sys.stdout):
        self.stream = stream
        logging.Handler.__init__(self)

    def emit(self, record):
        try:
            msg = self.PREFIX[record.levelno] + self.format(record) + "\n"
            self.stream.write(msg)
            self.stream.flush()
        except Exception:
            self.handleError(record)


# Functions
def initialize_logger(log_level, update_log_verbosity=False):
    """ Initialize the ProxLB logging handler. """
    info_prefix = 'Info: [logger]:'

    root_logger = logging.getLogger()
    root_logger.setLevel(log_level)

    if not update_log_verbosity:
        root_logger.addHandler(SystemdHandler())
        logging.info(f'{info_prefix} Logger initialized.')
    else:
        logging.info(f'{info_prefix} Logger verbosity updated to: {log_level}.')


def pre_validations(config_path, proxlb_config=False):
    """ Run pre-validations as sanity checks. """
    info_prefix = 'Info: [pre-validations]:'

    if proxlb_config:
        logging.info(f'{info_prefix} Validating ProxLB config file content.')
        __validate_config_content(proxlb_config)
        logging.info(f'{info_prefix} ProxLB config file content validation done.')
    else:
        logging.info(f'{info_prefix} Validating basic configuration.')
        __validate_imports()
        __validate_config_file(config_path)
        logging.info(f'{info_prefix} All pre-validations done.')


def post_validations():
    """ Run post-validations as sanity checks. """
    error_prefix = 'Error: [post-validations]:'
    info_prefix = 'Info: [post-validations]:'

    if __errors__:
        logging.critical(f'{error_prefix} Not all post-validations succeeded. Please validate!')
    else:
        logging.info(f'{info_prefix} All post-validations succeeded.')


def validate_daemon(daemon, schedule):
    """ Validate if ProxLB runs as a daemon and sleep until the next run. """
    info_prefix = 'Info: [daemon]:'

    if bool(int(daemon)):
        logging.info(f'{info_prefix} Running in daemon mode. Next run in {schedule} hours.')
        time.sleep(int(schedule) * 60 * 60)
    else:
        logging.info(f'{info_prefix} Not running in daemon mode. Quitting.')
        sys.exit(0)


def __validate_imports():
    """ Validate if all Python imports succeeded. """
    error_prefix = 'Error: [python-imports]:'
    info_prefix = 'Info: [python-imports]:'

    if not _imports:
        logging.critical(f'{error_prefix} Could not import all dependencies. Please install "proxmoxer".')
        sys.exit(2)
    else:
        logging.info(f'{info_prefix} All required dependencies were imported.')


def __validate_config_file(config_path):
    """ Validate that the given config file exists. """
    error_prefix = 'Error: [config]:'
    info_prefix = 'Info: [config]:'

    if not os.path.isfile(config_path):
        logging.critical(f'{error_prefix} Could not find config file in: {config_path}.')
        sys.exit(2)
    else:
        logging.info(f'{info_prefix} Configuration file loaded from: {config_path}.')


def __validate_config_content(proxlb_config):
    """ Validate the user's config options. """
    error_prefix = 'Error: [config]:'
    info_prefix = 'Info: [config]:'

    validate_bool_options = [
        'proxmox_api_ssl_v',
        'vm_balancing_enable',
        'vm_parallel_migrations',
        'storage_balancing_enable',
        'storage_parallel_migrations',
        'update_service',
        'api',
        'master_only',
        'daemon'
    ]

    for bool_val in validate_bool_options:
        if isinstance(proxlb_config.get(bool_val, None), bool):
            logging.info(f'{info_prefix} Config option {bool_val} is in a correct format.')
        else:
            logging.critical(f'{error_prefix} Config option {bool_val} is incorrect: {proxlb_config.get(bool_val, None)}')
            sys.exit(2)

    validate_string_options = [
        'vm_balancing_method',
        'vm_balancing_mode',
        'vm_balancing_mode_option',
        'vm_balancing_type',
        'storage_balancing_method',
        'log_verbosity'
    ]

    whitelist_string_options = {
        'vm_balancing_method': ['memory', 'disk', 'cpu'],
        'vm_balancing_mode': ['used', 'assigned'],
        'vm_balancing_mode_option': ['bytes', 'percent'],
        'vm_balancing_type': ['vm', 'ct', 'all'],
        'storage_balancing_method': ['disk_space'],
        'log_verbosity': ['DEBUG', 'INFO', 'WARNING', 'CRITICAL']
    }

    for string_val in validate_string_options:
        if proxlb_config.get(string_val, None) in whitelist_string_options[string_val]:
            logging.info(f'{info_prefix} Config option {string_val} is in a correct format.')
        else:
            logging.critical(f'{error_prefix} Config option {string_val} is incorrect: {proxlb_config.get(string_val, None)}')
            sys.exit(2)


def initialize_args():
    """ Initialize given arguments for ProxLB. """
    argparser = argparse.ArgumentParser(description='ProxLB')
    argparser.add_argument('-c', '--config', help='Path to config file', type=str, required=False)
    argparser.add_argument('-d', '--dry-run', help='Perform a dry-run without doing any actions.', action='store_true', required=False)
    argparser.add_argument('-j', '--json', help='Return a JSON of the VM movement.', action='store_true', required=False)
    argparser.add_argument('-b', '--best-node', help='Returns the best next node.', action='store_true', required=False)
    argparser.add_argument('-m', '--maintenance', help='Sets node to maintenance mode & moves workloads away.', type=str, required=False)
    argparser.add_argument('-v', '--version', help='Returns the current ProxLB version.', action='store_true', required=False)
    return argparser.parse_args()


def proxlb_output_version():
    """ Print ProxLB version information on CLI. """
    print(f'{__appname__} version {__version__}\nRequired config version: >= {__config_version__}')
    print('ProxLB support: https://github.com/gyptazy/ProxLB\nDeveloper: gyptazy.com')
    sys.exit(0)


def initialize_config_path(app_args):
    """ Initialize path to ProxLB config file. """
    info_prefix = 'Info: [config]:'

    config_path = app_args.config
    if app_args.config is None:
        config_path = '/etc/proxlb/proxlb.conf'
        logging.info(f'{info_prefix} No config file provided. Falling back to: {config_path}.')
    else:
        logging.info(f'{info_prefix} Using config file: {config_path}.')
    return config_path


def initialize_config_options(config_path):
    """ Read configuration from given config file for ProxLB. """
    error_prefix = 'Error: [config]:'
    info_prefix = 'Info: [config]:'
    proxlb_config = {}

    try:
        config = configparser.ConfigParser()
        config.read(config_path)
        # Proxmox config
        proxlb_config['proxmox_api_host'] = config['proxmox']['api_host']
        proxlb_config['proxmox_api_user'] = config['proxmox']['api_user']
        proxlb_config['proxmox_api_pass'] = config['proxmox']['api_pass']
        proxlb_config['proxmox_api_ssl_v'] = config['proxmox']['verify_ssl']
        proxlb_config['proxmox_api_timeout'] = config['proxmox'].get('timeout', 10)
        # VM Balancing
        proxlb_config['vm_balancing_enable'] = config['vm_balancing'].get('enable', 1)
        proxlb_config['vm_balancing_method'] = config['vm_balancing'].get('method', 'memory')
        proxlb_config['vm_balancing_mode'] = config['vm_balancing'].get('mode', 'used')
        proxlb_config['vm_balancing_mode_option'] = config['vm_balancing'].get('mode_option', 'bytes')
        proxlb_config['vm_balancing_type'] = config['vm_balancing'].get('type', 'vm')
        proxlb_config['vm_balanciness'] = config['vm_balancing'].get('balanciness', 10)
        proxlb_config['vm_parallel_migrations'] = config['vm_balancing'].get('parallel_migrations', 1)
        proxlb_config['vm_maintenance_nodes'] = config['vm_balancing'].get('maintenance_nodes', '')
        proxlb_config['vm_ignore_nodes'] = config['vm_balancing'].get('ignore_nodes', '')
        proxlb_config['vm_ignore_vms'] = config['vm_balancing'].get('ignore_vms', '')
        proxlb_config['vm_enforce_affinity_groups'] = config['vm_balancing'].get('enforce_affinity_groups', 1)
        # Storage Balancing
        proxlb_config['storage_balancing_enable'] = config['storage_balancing'].get('enable', 0)
        proxlb_config['storage_balancing_method'] = config['storage_balancing'].get('method', 'disk_space')
        proxlb_config['storage_balanciness'] = config['storage_balancing'].get('balanciness', 10)
        proxlb_config['storage_parallel_migrations'] = config['storage_balancing'].get('parallel_migrations', 1)
        # Update Support
        proxlb_config['update_service'] = config['update_service'].get('enable', 0)
        # API (upstream read this from [update_service] again, which looks like
        # a copy-paste slip; an optional [api] section is assumed here instead).
        if config.has_section('api'):
            proxlb_config['api'] = config['api'].get('enable', 0)
        else:
            proxlb_config['api'] = 0
        # Service
        proxlb_config['master_only'] = config['service'].get('master_only', 0)
        proxlb_config['daemon'] = config['service'].get('daemon', 1)
        proxlb_config['schedule'] = config['service'].get('schedule', 24)
        proxlb_config['log_verbosity'] = config['service'].get('log_verbosity', 'CRITICAL')
        proxlb_config['config_version'] = config['service'].get('config_version', 2)
    except configparser.NoSectionError:
        logging.critical(f'{error_prefix} Could not find the required section.')
        sys.exit(2)
    except configparser.ParsingError:
        logging.critical(f'{error_prefix} Unable to parse the config file.')
        sys.exit(2)
    except KeyError:
        logging.critical(f'{error_prefix} Could not find the required options in config file.')
        sys.exit(2)

    # Normalize and update bools. Afterwards, validate minimum required config version.
    proxlb_config = __update_config_parser_bools(proxlb_config)
    validate_config_minimum_version(proxlb_config)
    logging.info(f'{info_prefix} Configuration file loaded.')

    return proxlb_config


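# A minimal, hypothetical proxlb.conf matching the options read above. Section
# and option names follow the parser calls; all values are placeholders:
#
#   [proxmox]
#   api_host: pve01.example.com
#   api_user: root@pam
#   api_pass: secret
#   verify_ssl: 1
#
#   [vm_balancing]
#   enable: 1
#   method: memory
#   mode: used
#
#   [storage_balancing]
#   enable: 0
#
#   [update_service]
#   enable: 0
#
#   [service]
#   daemon: 1
#   schedule: 24
#   log_verbosity: INFO
#   config_version: 3

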
def __update_config_parser_bools(proxlb_config):
    """ Update bools in config from configparser to real bools. """
    info_prefix = 'Info: [config-bool-converter]:'
    ignore_options = ['schedule']

    # Normalize and update config parser values to bools.
    for option, option_value in proxlb_config.items():

        if option_value in [1, '1', 'yes', 'Yes', 'true', 'True', 'enable']:
            if option not in ignore_options:
                logging.info(f'{info_prefix} Converting {option} to bool: True.')
                proxlb_config[option] = True

        if option_value in [0, '0', 'no', 'No', 'false', 'False', 'disable']:
            if option not in ignore_options:
                logging.info(f'{info_prefix} Converting {option} to bool: False.')
                proxlb_config[option] = False

    return proxlb_config


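# Example of the normalization above: configparser returns strings, so a config
# value of "1" or "true" becomes True and "0" or "no" becomes False, while the
# "schedule" option keeps its raw value for the later sleep interval math.

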
def validate_config_minimum_version(proxlb_config):
    """ Validate the minimum required config file version for ProxLB. """
    info_prefix = 'Info: [config-version-validator]:'
    error_prefix = 'Error: [config-version-validator]:'

    if int(proxlb_config['config_version']) < __config_version__:
        logging.error(f'{error_prefix} ProxLB config version {proxlb_config["config_version"]} is too low. Required: {__config_version__}.')
        print(f'{error_prefix} ProxLB config version {proxlb_config["config_version"]} is too low. Required: {__config_version__}.')
        sys.exit(1)
    else:
        logging.info(f'{info_prefix} ProxLB config version {proxlb_config["config_version"]} is fine. Required: {__config_version__}.')


def api_connect(proxmox_api_host, proxmox_api_user, proxmox_api_pass, proxmox_api_ssl_v, proxmox_api_timeout):
    """ Connect and authenticate to the Proxmox remote API. """
    error_prefix = 'Error: [api-connection]:'
    warn_prefix = 'Warning: [api-connection]:'
    info_prefix = 'Info: [api-connection]:'
    proxmox_api_ssl_v = bool(int(proxmox_api_ssl_v))

    if not proxmox_api_ssl_v:
        requests.packages.urllib3.disable_warnings()
        logging.warning(f'{warn_prefix} API connection does not verify SSL certificate.')

    proxmox_api_host = __api_connect_get_host(proxmox_api_host)

    try:
        api_object = proxmoxer.ProxmoxAPI(proxmox_api_host, user=proxmox_api_user, password=proxmox_api_pass, verify_ssl=proxmox_api_ssl_v, timeout=int(proxmox_api_timeout))
    except proxmoxer.backends.https.AuthenticationError as proxmox_api_error:
        logging.critical(f'{error_prefix} Provided credentials do not work: {proxmox_api_error}')
        sys.exit(2)
    except urllib3.exceptions.NameResolutionError:
        logging.critical(f'{error_prefix} Could not resolve the given host: {proxmox_api_host}.')
        sys.exit(2)
    except requests.exceptions.ConnectTimeout:
        logging.critical(f'{error_prefix} Connection timed out to host: {proxmox_api_host}.')
        sys.exit(2)
    except requests.exceptions.SSLError:
        logging.critical(f'{error_prefix} SSL certificate verification failed for host: {proxmox_api_host}.')
        sys.exit(2)

    logging.info(f'{info_prefix} API connection succeeded to host: {proxmox_api_host}.')
    return api_object


def __api_connect_get_host(proxmox_api_host):
    """ Validate if a list of API hosts got provided and pre-validate the hosts. """
    info_prefix = 'Info: [api-connect-get-host]:'
    proxmox_port = 8006

    if ',' in proxmox_api_host:
        logging.info(f'{info_prefix} Multiple hosts for API connection are given. Testing hosts for further usage.')
        proxmox_api_host = proxmox_api_host.split(',')

        # Validate all given hosts and check their responsiveness on the Proxmox web port.
        for host in proxmox_api_host:
            logging.info(f'{info_prefix} Testing host {host} on port tcp/{proxmox_port}.')
            reachable = __api_connect_test_ipv4_host(host, proxmox_port)
            if reachable:
                return host

    else:
        logging.info(f'{info_prefix} Using host {proxmox_api_host} on port tcp/{proxmox_port}.')
        return proxmox_api_host


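# Example: with "api_host: pve01.example.com,pve02.example.com" (hypothetical
# hosts), the first node that answers on tcp/8006 is picked for the API
# connection; a single host is used as-is without the reachability probe.

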
def __api_connect_test_ipv4_host(proxmox_api_host, port):
    """ Validate if a given host on the IPv4 management address is reachable. """
    error_prefix = 'Error: [api-connect-test-host]:'
    info_prefix = 'Info: [api-connect-test-host]:'
    proxmox_connection_timeout = 2

    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.settimeout(proxmox_connection_timeout)
    logging.info(f'{info_prefix} Timeout for host {proxmox_api_host} is set to {proxmox_connection_timeout} seconds.')
    result = sock.connect_ex((proxmox_api_host, port))

    if result == 0:
        sock.close()
        logging.info(f'{info_prefix} Host {proxmox_api_host} is reachable on port tcp/{port}.')
        return True
    else:
        sock.close()
        logging.critical(f'{error_prefix} Host {proxmox_api_host} is unreachable on port tcp/{port}.')
        return False


def __api_connect_test_ipv6_host(proxmox_api_host, port):
    """ Validate if a given host on the IPv6 management address is reachable. """
    error_prefix = 'Error: [api-connect-test-host]:'
    info_prefix = 'Info: [api-connect-test-host]:'
    proxmox_connection_timeout = 2

    sock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
    sock.settimeout(proxmox_connection_timeout)
    logging.info(f'{info_prefix} Timeout for host {proxmox_api_host} is set to {proxmox_connection_timeout} seconds.')
    result = sock.connect_ex((proxmox_api_host, port))

    if result == 0:
        sock.close()
        logging.info(f'{info_prefix} Host {proxmox_api_host} is reachable on port tcp/{port}.')
        return True
    else:
        sock.close()
        logging.critical(f'{error_prefix} Host {proxmox_api_host} is unreachable on port tcp/{port}.')
        return False


def execute_rebalancing_only_by_master(api_object, master_only):
    """ Validate if balancing should only be done by the cluster master. Afterwards, validate if this node is the cluster master. """
    info_prefix = 'Info: [only-on-master-executor]:'
    master_only = bool(int(master_only))

    if master_only:
        logging.info(f'{info_prefix} Master only rebalancing is defined. Starting validation.')
        cluster_master_node = get_cluster_master(api_object)
        cluster_master = validate_cluster_master(cluster_master_node)
        return cluster_master, master_only
    else:
        logging.info(f'{info_prefix} No master only rebalancing is defined. Skipping validation.')
        return False, master_only


def get_cluster_master(api_object):
    """ Get the current master of the Proxmox cluster. """
    error_prefix = 'Error: [cluster-master-getter]:'
    info_prefix = 'Info: [cluster-master-getter]:'

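    # The call below maps to the Proxmox API endpoint
    # GET /cluster/ha/status/manager_status, which only reports a master node
    # while the HA stack is active on the cluster.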
    try:
        ha_status_object = api_object.cluster().ha().status().manager_status().get()
        logging.info(f'{info_prefix} Master node: {ha_status_object.get("manager_status", None).get("master_node", None)}')
    except urllib3.exceptions.NameResolutionError:
        logging.critical(f'{error_prefix} Could not resolve the API.')
        sys.exit(2)
    except requests.exceptions.ConnectTimeout:
        logging.critical(f'{error_prefix} Connection timed out to API.')
        sys.exit(2)
    except requests.exceptions.SSLError:
        logging.critical(f'{error_prefix} SSL certificate verification failed for API.')
        sys.exit(2)

    cluster_master = ha_status_object.get("manager_status", None).get("master_node", None)

    if cluster_master:
        return cluster_master
    else:
        logging.critical(f'{error_prefix} Could not obtain cluster master. Please check your configuration and ensure HA services in Proxmox are enabled. Stopping.')
        sys.exit(2)


def validate_cluster_master(cluster_master):
    """ Validate if the current execution node is the cluster master. """
    info_prefix = 'Info: [cluster-master-validator]:'

    node_executor_hostname = socket.gethostname()
    logging.info(f'{info_prefix} Node executor hostname is: {node_executor_hostname}')

    if node_executor_hostname != cluster_master:
        logging.info(f'{info_prefix} {node_executor_hostname} is not the cluster master ({cluster_master}).')
        return False
    else:
        return True


def get_node_statistics(api_object, ignore_nodes, maintenance_nodes):
    """ Get statistics of cpu, memory and disk for each node in the cluster. """
    info_prefix = 'Info: [node-statistics]:'
    node_statistics = {}
    ignore_nodes_list = ignore_nodes.split(',')
    maintenance_nodes_list = maintenance_nodes.split(',')

    for node in api_object.nodes.get():
        if node['status'] == 'online':
            node_statistics[node['node']] = {}
            node_statistics[node['node']]['maintenance'] = False
            node_statistics[node['node']]['ignore'] = False
            node_statistics[node['node']]['cpu_total'] = node['maxcpu']
            node_statistics[node['node']]['cpu_assigned'] = 0
            node_statistics[node['node']]['cpu_assigned_percent'] = int((node_statistics[node['node']]['cpu_assigned']) / int(node_statistics[node['node']]['cpu_total']) * 100)
            node_statistics[node['node']]['cpu_assigned_percent_last_run'] = 0
            node_statistics[node['node']]['cpu_used'] = node['cpu']
            node_statistics[node['node']]['cpu_free'] = (node['maxcpu']) - (node['cpu'] * node['maxcpu'])
            node_statistics[node['node']]['cpu_free_percent'] = int((node_statistics[node['node']]['cpu_free']) / int(node['maxcpu']) * 100)
            node_statistics[node['node']]['cpu_free_percent_last_run'] = 0
            node_statistics[node['node']]['memory_total'] = node['maxmem']
            node_statistics[node['node']]['memory_assigned'] = 0
            node_statistics[node['node']]['memory_assigned_percent'] = int((node_statistics[node['node']]['memory_assigned']) / int(node_statistics[node['node']]['memory_total']) * 100)
            node_statistics[node['node']]['memory_assigned_percent_last_run'] = 0
            node_statistics[node['node']]['memory_used'] = node['mem']
            node_statistics[node['node']]['memory_free'] = int(node['maxmem']) - int(node['mem'])
            node_statistics[node['node']]['memory_free_percent'] = int((node_statistics[node['node']]['memory_free']) / int(node['maxmem']) * 100)
            node_statistics[node['node']]['memory_free_percent_last_run'] = 0
            node_statistics[node['node']]['disk_total'] = node['maxdisk']
            node_statistics[node['node']]['disk_assigned'] = 0
            node_statistics[node['node']]['disk_assigned_percent'] = int((node_statistics[node['node']]['disk_assigned']) / int(node_statistics[node['node']]['disk_total']) * 100)
            node_statistics[node['node']]['disk_assigned_percent_last_run'] = 0
            node_statistics[node['node']]['disk_used'] = node['disk']
            node_statistics[node['node']]['disk_free'] = int(node['maxdisk']) - int(node['disk'])
            node_statistics[node['node']]['disk_free_percent'] = int((node_statistics[node['node']]['disk_free']) / int(node['maxdisk']) * 100)
            node_statistics[node['node']]['disk_free_percent_last_run'] = 0
            logging.info(f'{info_prefix} Added node {node["node"]}.')

            # Update node specific vars
            if node['node'] in maintenance_nodes_list:
                node_statistics[node['node']]['maintenance'] = True
                logging.info(f'{info_prefix} Maintenance mode: {node["node"]} is set to maintenance mode.')

            if node['node'] in ignore_nodes_list:
                node_statistics[node['node']]['ignore'] = True
                logging.info(f'{info_prefix} Ignore node: {node["node"]} is set to be ignored.')

    logging.info(f'{info_prefix} Created node statistics.')
    return node_statistics


def get_vm_statistics(api_object, ignore_vms, balancing_type):
    """ Get statistics of cpu, memory and disk for each vm in the cluster. """
    info_prefix = 'Info: [vm-statistics]:'
    warn_prefix = 'Warn: [vm-statistics]:'
    vm_statistics = {}
    ignore_vms_list = ignore_vms.split(',')
    group_include = None
    group_exclude = None
    vm_ignore = None
    vm_ignore_wildcard = False
    _vm_details_storage_allowed = ['ide', 'nvme', 'scsi', 'virtio', 'sata', 'rootfs']

    # Wildcard support: Initially validate if we need to honour
    # any wildcards within the vm_ignore list.
    vm_ignore_wildcard = __validate_ignore_vm_wildcard(ignore_vms)

    for node in api_object.nodes.get():

        # Get VM/CT objects only when the node is online and reachable.
        if node['status'] == 'online':

            # Add all virtual machines if type is vm or all.
            if balancing_type == 'vm' or balancing_type == 'all':
                for vm in api_object.nodes(node['node']).qemu.get():

                    # Get the VM tags from API.
                    vm_tags = __get_vm_tags(api_object, node, vm['vmid'], 'vm')
                    if vm_tags is not None:
                        group_include, group_exclude, vm_ignore = __get_proxlb_groups(vm_tags)

                    # Get wildcard match for VMs to ignore if a wildcard pattern was
                    # previously found. Wildcards may slow down the task when using
                    # many patterns in the ignore list. Therefore, run this only if
                    # a wildcard pattern was found. We also do not need to validate
                    # this if the VM is already being ignored by a defined tag.
                    if vm_ignore_wildcard and not vm_ignore:
                        vm_ignore = __check_vm_name_wildcard_pattern(vm['name'], ignore_vms_list)

                    if vm['status'] == 'running' and vm['name'] not in ignore_vms_list and not vm_ignore:
                        vm_statistics[vm['name']] = {}
                        vm_statistics[vm['name']]['group_include'] = group_include
                        vm_statistics[vm['name']]['group_exclude'] = group_exclude
                        vm_statistics[vm['name']]['cpu_total'] = vm['cpus']
                        vm_statistics[vm['name']]['cpu_used'] = vm['cpu']
                        vm_statistics[vm['name']]['memory_total'] = vm['maxmem']
                        vm_statistics[vm['name']]['memory_used'] = vm['mem']
                        vm_statistics[vm['name']]['disk_total'] = vm['maxdisk']
                        vm_statistics[vm['name']]['disk_used'] = vm['disk']
                        vm_statistics[vm['name']]['vmid'] = vm['vmid']
                        vm_statistics[vm['name']]['node_parent'] = node['node']
                        vm_statistics[vm['name']]['node_rebalance'] = node['node']
                        vm_statistics[vm['name']]['storage'] = {}
                        vm_statistics[vm['name']]['type'] = 'vm'

                        # Get disk details of the related object.
                        _vm_details = api_object.nodes(node['node']).qemu(vm['vmid']).config.get()
                        logging.info(f'{info_prefix} Getting disk information for vm {vm["name"]}.')

                        for vm_detail_key, vm_detail_value in _vm_details.items():
                            vm_detail_key_validator = re.sub(r'\d+$', '', vm_detail_key)

                            if vm_detail_key_validator in _vm_details_storage_allowed:
                                vm_statistics[vm['name']]['storage'][vm_detail_key] = {}
                                match = re.match(r'([^:]+):[^/]+/(.+),iothread=\d+,size=(\d+G)', _vm_details[vm_detail_key])

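                                # A hypothetical QEMU disk config value this regex matches, as
                                # seen on file-based shared storage:
                                #   scsi0: nfs01:101/vm-101-disk-0.qcow2,iothread=1,size=32G
                                # -> volume "nfs01", disk "vm-101-disk-0.qcow2", size "32G".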
                                # Create an efficient match group and split the strings to assign them to the storage information.
                                if match:
                                    _volume = match.group(1)
                                    _disk_name = match.group(2)
                                    _disk_size = match.group(3)

                                    vm_statistics[vm['name']]['storage'][vm_detail_key]['name'] = _disk_name
                                    vm_statistics[vm['name']]['storage'][vm_detail_key]['device_name'] = vm_detail_key
                                    vm_statistics[vm['name']]['storage'][vm_detail_key]['volume'] = _volume
                                    vm_statistics[vm['name']]['storage'][vm_detail_key]['storage_parent'] = _volume
                                    vm_statistics[vm['name']]['storage'][vm_detail_key]['storage_rebalance'] = _volume
                                    vm_statistics[vm['name']]['storage'][vm_detail_key]['size'] = _disk_size[:-1]
                                    logging.info(f'{info_prefix} Added disk for {vm["name"]}: Name {_disk_name} on volume {_volume} with size {_disk_size}.')
                                else:
                                    logging.info(f'{info_prefix} No (or unsupported) disk(s) for {vm["name"]} found.')

                        logging.info(f'{info_prefix} Added vm {vm["name"]}.')

            # Add all containers if type is ct or all.
            if balancing_type == 'ct' or balancing_type == 'all':
                for vm in api_object.nodes(node['node']).lxc.get():

                    logging.warning(f'{warn_prefix} Rebalancing on LXC containers (CT) always requires them to shut down.')
                    logging.warning(f'{warn_prefix} {vm["name"]} is from type CT and cannot be live migrated!')
                    # Get the VM tags from API.
                    vm_tags = __get_vm_tags(api_object, node, vm['vmid'], 'ct')
                    if vm_tags is not None:
                        group_include, group_exclude, vm_ignore = __get_proxlb_groups(vm_tags)

                    # Get wildcard match for VMs to ignore if a wildcard pattern was
                    # previously found. Wildcards may slow down the task when using
                    # many patterns in the ignore list. Therefore, run this only if
                    # a wildcard pattern was found. We also do not need to validate
                    # this if the VM is already being ignored by a defined tag.
                    if vm_ignore_wildcard and not vm_ignore:
                        vm_ignore = __check_vm_name_wildcard_pattern(vm['name'], ignore_vms_list)

                    if vm['status'] == 'running' and vm['name'] not in ignore_vms_list and not vm_ignore:
                        vm_statistics[vm['name']] = {}
                        vm_statistics[vm['name']]['group_include'] = group_include
                        vm_statistics[vm['name']]['group_exclude'] = group_exclude
                        vm_statistics[vm['name']]['cpu_total'] = vm['cpus']
                        vm_statistics[vm['name']]['cpu_used'] = vm['cpu']
                        vm_statistics[vm['name']]['memory_total'] = vm['maxmem']
                        vm_statistics[vm['name']]['memory_used'] = vm['mem']
                        vm_statistics[vm['name']]['disk_total'] = vm['maxdisk']
                        vm_statistics[vm['name']]['disk_used'] = vm['disk']
                        vm_statistics[vm['name']]['vmid'] = vm['vmid']
                        vm_statistics[vm['name']]['node_parent'] = node['node']
                        vm_statistics[vm['name']]['node_rebalance'] = node['node']
                        vm_statistics[vm['name']]['storage'] = {}
                        vm_statistics[vm['name']]['type'] = 'ct'

                        # Get disk details of the related object.
                        _vm_details = api_object.nodes(node['node']).lxc(vm['vmid']).config.get()
                        logging.info(f'{info_prefix} Getting disk information for vm {vm["name"]}.')

                        for vm_detail_key, vm_detail_value in _vm_details.items():
                            vm_detail_key_validator = re.sub(r'\d+$', '', vm_detail_key)

                            if vm_detail_key_validator in _vm_details_storage_allowed:
                                vm_statistics[vm['name']]['storage'][vm_detail_key] = {}
                                match = re.match(r'(?P<volume>[^:]+):(?P<disk_name>[^,]+),size=(?P<disk_size>\S+)', _vm_details[vm_detail_key])

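                                # A hypothetical CT mountpoint value this regex matches:
                                #   rootfs: local-zfs:subvol-200-disk-0,size=8G
                                # -> volume "local-zfs", disk "subvol-200-disk-0", size "8G".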
                                # Create an efficient match group and split the strings to assign them to the storage information.
                                if match:
                                    _volume = match.group(1)
                                    _disk_name = match.group(2)
                                    _disk_size = match.group(3)

                                    vm_statistics[vm['name']]['storage'][vm_detail_key]['name'] = _disk_name
                                    vm_statistics[vm['name']]['storage'][vm_detail_key]['device_name'] = vm_detail_key
                                    vm_statistics[vm['name']]['storage'][vm_detail_key]['volume'] = _volume
                                    vm_statistics[vm['name']]['storage'][vm_detail_key]['storage_parent'] = _volume
                                    vm_statistics[vm['name']]['storage'][vm_detail_key]['storage_rebalance'] = _volume
                                    vm_statistics[vm['name']]['storage'][vm_detail_key]['size'] = _disk_size[:-1]
                                    logging.info(f'{info_prefix} Added disk for {vm["name"]}: Name {_disk_name} on volume {_volume} with size {_disk_size}.')
                                else:
                                    logging.info(f'{info_prefix} No disks for {vm["name"]} found.')

                        logging.info(f'{info_prefix} Added vm {vm["name"]}.')

    logging.info(f'{info_prefix} Created VM statistics.')
    return vm_statistics


def update_node_statistics(node_statistics, vm_statistics):
    """ Update node statistics by VMs statistics. """
    info_prefix = 'Info: [node-update-statistics]:'
    warn_prefix = 'Warning: [node-update-statistics]:'

    for vm, vm_value in vm_statistics.items():
        node_statistics[vm_value['node_parent']]['cpu_assigned'] = node_statistics[vm_value['node_parent']]['cpu_assigned'] + int(vm_value['cpu_total'])
        node_statistics[vm_value['node_parent']]['cpu_assigned_percent'] = (node_statistics[vm_value['node_parent']]['cpu_assigned'] / node_statistics[vm_value['node_parent']]['cpu_total']) * 100
        node_statistics[vm_value['node_parent']]['memory_assigned'] = node_statistics[vm_value['node_parent']]['memory_assigned'] + int(vm_value['memory_total'])
        node_statistics[vm_value['node_parent']]['memory_assigned_percent'] = (node_statistics[vm_value['node_parent']]['memory_assigned'] / node_statistics[vm_value['node_parent']]['memory_total']) * 100
        node_statistics[vm_value['node_parent']]['disk_assigned'] = node_statistics[vm_value['node_parent']]['disk_assigned'] + int(vm_value['disk_total'])
        node_statistics[vm_value['node_parent']]['disk_assigned_percent'] = (node_statistics[vm_value['node_parent']]['disk_assigned'] / node_statistics[vm_value['node_parent']]['disk_total']) * 100

        if node_statistics[vm_value['node_parent']]['cpu_assigned_percent'] > 99:
            logging.warning(f'{warn_prefix} Node {vm_value["node_parent"]} is overprovisioned for CPU: {int(node_statistics[vm_value["node_parent"]]["cpu_assigned_percent"])}% assigned.')

        if node_statistics[vm_value['node_parent']]['memory_assigned_percent'] > 99:
            logging.warning(f'{warn_prefix} Node {vm_value["node_parent"]} is overprovisioned for memory: {int(node_statistics[vm_value["node_parent"]]["memory_assigned_percent"])}% assigned.')

        if node_statistics[vm_value['node_parent']]['disk_assigned_percent'] > 99:
            logging.warning(f'{warn_prefix} Node {vm_value["node_parent"]} is overprovisioned for disk: {int(node_statistics[vm_value["node_parent"]]["disk_assigned_percent"])}% assigned.')

    logging.info(f'{info_prefix} Updated node resource assignments by all VMs.')
    logging.debug(f'{info_prefix} {node_statistics}')
    return node_statistics


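# Worked example for the assignment math above: three VMs with 16 vCPUs each on
# a 32-core node yield cpu_assigned = 48 and cpu_assigned_percent = 150, which
# triggers the CPU overprovisioning warning (> 99).

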
def get_storage_statistics(api_object):
    """ Get statistics of all storage in the cluster. """
    info_prefix = 'Info: [storage-statistics]:'
    storage_whitelist = ['nfs']
    storage_statistics = {}

    for node in api_object.nodes.get():

        for storage in api_object.nodes(node['node']).storage.get():

            # Only add enabled and active storage repositories that might be suitable for further
            # storage balancing.
            if storage['enabled'] and storage['active'] and storage['shared'] and storage['type'] in storage_whitelist:
                storage_statistics[storage['storage']] = {}
                storage_statistics[storage['storage']]['name'] = storage['storage']
                storage_statistics[storage['storage']]['total'] = storage['total']
                storage_statistics[storage['storage']]['used'] = storage['used']
                storage_statistics[storage['storage']]['used_percent'] = storage['used'] / storage['total'] * 100
                storage_statistics[storage['storage']]['used_percent_last_run'] = 0
                storage_statistics[storage['storage']]['free'] = storage['total'] - storage['used']
                storage_statistics[storage['storage']]['free_percent'] = storage_statistics[storage['storage']]['free'] / storage['total'] * 100
                storage_statistics[storage['storage']]['used_fraction'] = storage['used_fraction']
                storage_statistics[storage['storage']]['type'] = storage['type']
                storage_statistics[storage['storage']]['content'] = storage['content']
                storage_statistics[storage['storage']]['usage_type'] = ''

                # Split the Proxmox returned values to a list and validate the supported
                # types of the underlying storage for further migrations.
                storage_content_list = storage['content'].split(',')
                usage_ct = False
                usage_vm = False

                if 'rootdir' in storage_content_list:
                    usage_ct = True
                    storage_statistics[storage['storage']]['usage_type'] = 'ct'
                    logging.info(f'{info_prefix} Storage {storage["storage"]} supports CTs.')

                if 'images' in storage_content_list:
                    usage_vm = True
                    storage_statistics[storage['storage']]['usage_type'] = 'vm'
                    logging.info(f'{info_prefix} Storage {storage["storage"]} supports VMs.')

                if usage_ct and usage_vm:
                    storage_statistics[storage['storage']]['usage_type'] = 'all'
                    logging.info(f'{info_prefix} Updating storage {storage["storage"]} support to CTs and VMs.')

                logging.info(f'{info_prefix} Added storage {storage["storage"]}.')

    logging.info(f'{info_prefix} Created storage statistics.')
    return storage_statistics


def __validate_ignore_vm_wildcard(ignore_vms):
    """ Validate if a wildcard is used for ignored VMs. """
    return '*' in ignore_vms


def __check_vm_name_wildcard_pattern(vm_name, ignore_vms_list):
    """ Validate if the VM name is included in an ignore list pattern. """
    for ignore_vm in ignore_vms_list:
        if '*' in ignore_vm:
            if ignore_vm[:-1] in vm_name:
                return True
    return False


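# Note that the wildcard match above is a substring test: an ignore entry such
# as "test-*" (hypothetical) matches any VM whose name contains "test-", not
# only names that start with it.

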
def __get_vm_tags(api_object, node, vmid, balancing_type):
    """ Get tags for a VM/CT for a given VMID. """
    info_prefix = 'Info: [api-get-vm-tags]:'

    if balancing_type == 'vm':
        vm_config = api_object.nodes(node['node']).qemu(vmid).config.get()

    if balancing_type == 'ct':
        vm_config = api_object.nodes(node['node']).lxc(vmid).config.get()

    if vm_config.get("tags", None) is None:
        logging.info(f'{info_prefix} Got no VM/CT tag for VM {vm_config.get("name", None)} from API.')
    else:
        logging.info(f'{info_prefix} Got VM/CT tag {vm_config.get("tags", None)} for VM {vm_config.get("name", None)} from API.')
    return vm_config.get('tags', None)


def __get_proxlb_groups(vm_tags):
    """ Get ProxLB related include and exclude groups. """
    info_prefix = 'Info: [api-get-vm-include-exclude-tags]:'
    group_include = None
    group_exclude = None
    vm_ignore = None

    group_list = re.split(";", vm_tags)
    for group in group_list:

        if group.startswith(('plb_include_', 'plb_affinity_')):
            logging.info(f'{info_prefix} Got PLB include group.')
            group_include = group

        if group.startswith(('plb_exclude_', 'plb_antiaffinity_')):
            logging.info(f'{info_prefix} Got PLB exclude group.')
            group_exclude = group

        if group.startswith('plb_ignore_vm'):
            logging.info(f'{info_prefix} Got PLB ignore group.')
            vm_ignore = True

    return group_include, group_exclude, vm_ignore


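# A hypothetical tag string illustrating the parsing above (tags are split on
# ";"): "plb_include_web;plb_ignore_vm" yields group_include "plb_include_web",
# no exclude group, and vm_ignore True.

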
def balancing_vm_calculations(balancing_method, balancing_mode, balancing_mode_option, node_statistics, vm_statistics, balanciness, app_args, rebalance, processed_vms):
    """ Calculate re-balancing of VMs on present nodes across the cluster. """
    info_prefix = 'Info: [rebalancing-vm-calculator]:'

    # Validate for a supported balancing method, mode and if rebalancing is required.
    __validate_balancing_method(balancing_method)
    __validate_balancing_mode(balancing_mode)
    __validate_vm_statistics(vm_statistics)
    rebalance = __validate_balanciness(balanciness, balancing_method, balancing_mode, node_statistics)

    # Run rebalancing calculations.
    if rebalance:
        # Get the most used/assigned resources of the VM and the most free or least allocated node.
        resources_vm_most_used, processed_vms = __get_most_used_resources_vm(balancing_method, balancing_mode, vm_statistics, processed_vms)
        resources_node_most_free = __get_most_free_resources_node(balancing_method, balancing_mode, balancing_mode_option, node_statistics)

        # If the most used VM is already on the most free node, skip it and get another one.
        while resources_vm_most_used[1]['node_parent'] == resources_node_most_free[0] and len(processed_vms) < len(vm_statistics):
            resources_vm_most_used, processed_vms = __get_most_used_resources_vm(balancing_method, balancing_mode, vm_statistics, processed_vms)
            logging.debug(f'{info_prefix} Processed {len(processed_vms)} out of {len(vm_statistics)} vms.')

        # Update resource statistics for VMs and nodes.
        node_statistics, vm_statistics = __update_vm_resource_statistics(resources_vm_most_used, resources_node_most_free,
                                                                         vm_statistics, node_statistics, balancing_method, balancing_mode)

        # Recurse until no further rebalancing is needed.
        balancing_vm_calculations(balancing_method, balancing_mode, balancing_mode_option, node_statistics, vm_statistics, balanciness, app_args, rebalance, processed_vms)

    # If only the best-node argument is set, simply return the next best node for VM
    # and CT placement on the CLI and stop ProxLB.
    if app_args.best_node:
        logging.info(f'{info_prefix} Only best next node for new VM & CT placement requested.')
        best_next_node = __get_most_free_resources_node(balancing_method, balancing_mode, balancing_mode_option, node_statistics)
        print(best_next_node[0])
        logging.info(f'{info_prefix} Best next node for VM & CT placement: {best_next_node[0]}')
        sys.exit(0)

    logging.info(f'{info_prefix} Balancing calculations done.')
    return node_statistics, vm_statistics


def balancing_vm_maintenance(proxlb_config, app_args, node_statistics, vm_statistics):
    """ Calculate re-balancing of VMs that need to be moved away from maintenance nodes. """
    info_prefix = 'Info: [rebalancing-maintenance-vm-calculator]:'
    maintenance_nodes_list = proxlb_config['vm_maintenance_nodes'].split(',')
    nodes_present = list(node_statistics.keys())
    balancing_method = proxlb_config['vm_balancing_method']
    balancing_mode = proxlb_config['vm_balancing_mode']
    balancing_mode_option = proxlb_config['vm_balancing_mode_option']

    # Merge maintenance nodes from config and cli args.
    if app_args.maintenance is not None:
        logging.info(f'{info_prefix} Maintenance nodes from CLI arg and config will be merged.')
        maintenance_nodes_list = maintenance_nodes_list + app_args.maintenance.split(',')

    # Drop empty entries from the split and ensure that only nodes present in
    # the cluster will be used. (Upstream tested len() > 1 here, which silently
    # skipped the case of exactly one maintenance node.)
    maintenance_nodes_list = [node for node in maintenance_nodes_list if node]
    if len(maintenance_nodes_list) > 0:
        maintenance_nodes_list = set(maintenance_nodes_list) & set(nodes_present)
        logging.info(f'{info_prefix} Maintenance mode for the following hosts defined: {maintenance_nodes_list}')
    else:
        logging.info(f'{info_prefix} No nodes for maintenance mode defined.')
        return node_statistics, vm_statistics

    for node_name in maintenance_nodes_list:
        node_vms = [item for item in vm_statistics.items() if item[1]['node_parent'] == node_name]
        # Update resource statistics for VMs and nodes.
        for vm in node_vms:
            resources_node_most_free = __get_most_free_resources_node(balancing_method, balancing_mode, balancing_mode_option, node_statistics)
            node_statistics, vm_statistics = __update_vm_resource_statistics(vm, resources_node_most_free, vm_statistics, node_statistics, balancing_method, balancing_mode)

    return node_statistics, vm_statistics


def __validate_balancing_method(balancing_method):
    """ Validate for valid and supported balancing method. """
    error_prefix = 'Error: [balancing-method-validation]:'
    info_prefix = 'Info: [balancing-method-validation]:'

    if balancing_method not in ['memory', 'disk', 'cpu']:
        logging.error(f'{error_prefix} Invalid balancing method: {balancing_method}')
        sys.exit(2)
    else:
        logging.info(f'{info_prefix} Valid balancing method: {balancing_method}')


def __validate_balancing_mode(balancing_mode):
    """ Validate for valid and supported balancing mode. """
    error_prefix = 'Error: [balancing-mode-validation]:'
    info_prefix = 'Info: [balancing-mode-validation]:'

    if balancing_mode not in ['used', 'assigned']:
        logging.error(f'{error_prefix} Invalid balancing mode: {balancing_mode}')
        sys.exit(2)
    else:
        logging.info(f'{info_prefix} Valid balancing mode: {balancing_mode}')


def __validate_vm_statistics(vm_statistics):
    """ Validate for at least a single object of type CT/VM to rebalance. """
    error_prefix = 'Error: [balancing-vm-stats-validation]:'

    if len(vm_statistics) == 0:
        logging.error(f'{error_prefix} Not a single CT/VM found in cluster.')
        sys.exit(1)


def __validate_balanciness(balanciness, balancing_method, balancing_mode, node_statistics):
    """ Validate the balanciness to ensure further rebalancing is needed. """
    info_prefix = 'Info: [balanciness-validation]:'
    node_resource_percent_list = []
    node_assigned_percent_match = []

    # Remap balancing mode to get the related values from nodes dict.
    if balancing_mode == 'used':
        node_resource_selector = 'free'
    if balancing_mode == 'assigned':
        node_resource_selector = 'assigned'

    for node_name, node_info in node_statistics.items():

        # Save information of nodes from current run to compare them in the next recursion.
        if node_statistics[node_name][f'{balancing_method}_{node_resource_selector}_percent_last_run'] == node_statistics[node_name][f'{balancing_method}_{node_resource_selector}_percent']:
            node_statistics[node_name][f'{balancing_method}_{node_resource_selector}_percent_match'] = True
        else:
            node_statistics[node_name][f'{balancing_method}_{node_resource_selector}_percent_match'] = False
        # Update value to the current value of the recursion run.
        node_statistics[node_name][f'{balancing_method}_{node_resource_selector}_percent_last_run'] = node_statistics[node_name][f'{balancing_method}_{node_resource_selector}_percent']

        # If all node resources are unchanged, the recursion can be left.
        for key, value in node_statistics.items():
            node_assigned_percent_match.append(value.get(f'{balancing_method}_{node_resource_selector}_percent_match', False))

        if False not in node_assigned_percent_match:
            return False

        # Add node information to resource list.
        if not node_statistics[node_name]['maintenance']:
            node_resource_percent_list.append(int(node_info[f'{balancing_method}_{node_resource_selector}_percent']))
            logging.debug(f'{info_prefix} Node: {node_name} with values: {node_info}')

    # Create a sorted list of the delta + balanciness between the node resources.
    node_resource_percent_list_sorted = sorted(node_resource_percent_list)
    node_lowest_percent = node_resource_percent_list_sorted[0]
    node_highest_percent = node_resource_percent_list_sorted[-1]

    # Validate if the recursion should proceed for further rebalancing.
    if (int(node_lowest_percent) + int(balanciness)) < int(node_highest_percent):
        logging.info(f'{info_prefix} Rebalancing for {balancing_method} is needed. Highest usage: {int(node_highest_percent)}% | Lowest usage: {int(node_lowest_percent)}%.')
        return True
    else:
        logging.info(f'{info_prefix} Rebalancing for {balancing_method} is not needed. Highest usage: {int(node_highest_percent)}% | Lowest usage: {int(node_lowest_percent)}%.')
        return False


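# Worked example for the check above: with balanciness 10, node usages of 42%
# and 55% trigger another rebalancing pass (42 + 10 < 55), while 48% and 55%
# do not (48 + 10 >= 55).

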
def __get_most_used_resources_vm(balancing_method, balancing_mode, vm_statistics, processed_vms):
    """ Get and return the most used resources of a VM by the defined balancing method. """
    info_prefix = 'Info: [get-most-used-resources-vm]:'

    # Remap balancing mode to get the related values from the VM dict.
    if balancing_mode == 'used':
        vm_resource_selector = 'used'
    if balancing_mode == 'assigned':
        vm_resource_selector = 'total'

    vm = max(vm_statistics.items(), key=lambda item: item[1][f'{balancing_method}_{vm_resource_selector}'] if item[0] not in processed_vms else -float('inf'))
    processed_vms.append(vm[0])

    logging.info(f'{info_prefix} {vm}')
    return vm, processed_vms


def __get_most_free_resources_node(balancing_method, balancing_mode, balancing_mode_option, node_statistics):
    """ Get and return the most free resources of a node by the defined balancing method. """
    info_prefix = 'Info: [get-most-free-resources-nodes]:'

    # Return the node information based on the balancing mode.
    if balancing_mode == 'used' and balancing_mode_option == 'bytes':
        node = max(node_statistics.items(), key=lambda item: item[1][f'{balancing_method}_free'] if not item[1]['maintenance'] else -float('inf'))
    if balancing_mode == 'used' and balancing_mode_option == 'percent':
        node = max(node_statistics.items(), key=lambda item: item[1][f'{balancing_method}_free_percent'] if not item[1]['maintenance'] else -float('inf'))
    if balancing_mode == 'assigned':
        node = min(node_statistics.items(), key=lambda item: item[1][f'{balancing_method}_assigned'] if not item[1]['maintenance'] and (item[1][f'{balancing_method}_assigned_percent'] > 0 or item[1][f'{balancing_method}_assigned_percent'] < 100) else -float('inf'))

    logging.info(f'{info_prefix} {node}')
    return node


def __update_vm_resource_statistics(resource_highest_used_resources_vm, resource_highest_free_resources_node, vm_statistics, node_statistics, balancing_method, balancing_mode):
    """ Update VM and node resource statistics. """
    info_prefix = 'Info: [rebalancing-resource-statistics-update]:'

    if resource_highest_used_resources_vm[1]['node_parent'] != resource_highest_free_resources_node[0]:
        vm_name = resource_highest_used_resources_vm[0]
        vm_node_parent = resource_highest_used_resources_vm[1]['node_parent']
        vm_node_rebalance = resource_highest_free_resources_node[0]
        vm_resource_used = vm_statistics[resource_highest_used_resources_vm[0]][f'{balancing_method}_used']
        vm_resource_total = vm_statistics[resource_highest_used_resources_vm[0]][f'{balancing_method}_total']

        # Update dictionaries for new values
        # Assign new rebalance node to vm
        vm_statistics[vm_name]['node_rebalance'] = vm_node_rebalance

        logging.info(f'{info_prefix} Moving {vm_name} from {vm_node_parent} to {vm_node_rebalance}')

        # Recalculate values for nodes
        ## Add freed resources to old parent node
        node_statistics[vm_node_parent][f'{balancing_method}_used'] = int(node_statistics[vm_node_parent][f'{balancing_method}_used']) - int(vm_resource_used)
        node_statistics[vm_node_parent][f'{balancing_method}_free'] = int(node_statistics[vm_node_parent][f'{balancing_method}_free']) + int(vm_resource_used)
        node_statistics[vm_node_parent][f'{balancing_method}_free_percent'] = int(int(node_statistics[vm_node_parent][f'{balancing_method}_free']) / int(node_statistics[vm_node_parent][f'{balancing_method}_total']) * 100)
        node_statistics[vm_node_parent][f'{balancing_method}_assigned'] = int(node_statistics[vm_node_parent][f'{balancing_method}_assigned']) - int(vm_resource_total)
        node_statistics[vm_node_parent][f'{balancing_method}_assigned_percent'] = int(int(node_statistics[vm_node_parent][f'{balancing_method}_assigned']) / int(node_statistics[vm_node_parent][f'{balancing_method}_total']) * 100)

        ## Account newly allocated resources on the new rebalance node
        node_statistics[vm_node_rebalance][f'{balancing_method}_used'] = int(node_statistics[vm_node_rebalance][f'{balancing_method}_used']) + int(vm_resource_used)
        node_statistics[vm_node_rebalance][f'{balancing_method}_free'] = int(node_statistics[vm_node_rebalance][f'{balancing_method}_free']) - int(vm_resource_used)
        node_statistics[vm_node_rebalance][f'{balancing_method}_free_percent'] = int(int(node_statistics[vm_node_rebalance][f'{balancing_method}_free']) / int(node_statistics[vm_node_rebalance][f'{balancing_method}_total']) * 100)
        node_statistics[vm_node_rebalance][f'{balancing_method}_assigned'] = int(node_statistics[vm_node_rebalance][f'{balancing_method}_assigned']) + int(vm_resource_total)
        node_statistics[vm_node_rebalance][f'{balancing_method}_assigned_percent'] = int(int(node_statistics[vm_node_rebalance][f'{balancing_method}_assigned']) / int(node_statistics[vm_node_rebalance][f'{balancing_method}_total']) * 100)

    logging.info(f'{info_prefix} Updated VM and node statistics.')
    return node_statistics, vm_statistics


def __get_vm_tags_include_groups(vm_statistics, node_statistics, balancing_method, balancing_mode):
    """ Get VMs tags for include groups. """
    info_prefix = 'Info: [rebalancing-tags-group-include]:'
    tags_include_vms = {}
    processed_vm = []

    # Create groups of tags with their belonging hosts.
    for vm_name, vm_values in vm_statistics.items():
        if vm_values.get('group_include', None):
            if not tags_include_vms.get(vm_values['group_include'], None):
                tags_include_vms[vm_values['group_include']] = [vm_name]
            else:
                tags_include_vms[vm_values['group_include']] = tags_include_vms[vm_values['group_include']] + [vm_name]

    # Update the VMs to the corresponding node to their group assignments.
    for group, vm_names in tags_include_vms.items():
        # Do not take care of tags that have only a single host included.
        # (Upstream returned here, which also skipped all remaining groups;
        # continuing with the next group is the assumed intent.)
        if len(vm_names) < 2:
            logging.info(f'{info_prefix} Only one host in group assignment.')
            continue

        vm_node_rebalance = False
        logging.info(f'{info_prefix} Create include groups of VM hosts.')
        for vm_name in vm_names:
            if vm_name not in processed_vm:
                if not vm_node_rebalance:
                    vm_node_rebalance = vm_statistics[vm_name]['node_rebalance']
                else:
                    _mocked_vm_object = (vm_name, vm_statistics[vm_name])
                    node_statistics, vm_statistics = __update_vm_resource_statistics(_mocked_vm_object, [vm_node_rebalance], vm_statistics, node_statistics, balancing_method, balancing_mode)
                processed_vm.append(vm_name)

    return node_statistics, vm_statistics


def __get_vm_tags_exclude_groups(vm_statistics, node_statistics, balancing_method, balancing_mode):
    """ Get VMs tags for exclude groups. """
    info_prefix = 'Info: [rebalancing-tags-group-exclude]:'
    tags_exclude_vms = {}

    # Create groups of tags with their belonging hosts.
    for vm_name, vm_values in vm_statistics.items():
        if vm_values.get('group_exclude', None):
            if not tags_exclude_vms.get(vm_values['group_exclude'], None):
                tags_exclude_vms[vm_values['group_exclude']] = {}
                tags_exclude_vms[vm_values['group_exclude']]['nodes_used'] = []
                tags_exclude_vms[vm_values['group_exclude']]['nodes_used'].append(vm_statistics[vm_name]['node_rebalance'])
                tags_exclude_vms[vm_values['group_exclude']]['vms'] = [vm_name]
            else:
                tags_exclude_vms[vm_values['group_exclude']]['vms'] = tags_exclude_vms[vm_values['group_exclude']]['vms'] + [vm_name]
                tags_exclude_vms[vm_values['group_exclude']]['nodes_used'].append(vm_statistics[vm_name]['node_rebalance'])

    # Evaluate all VMs assigned to each exclude group and validate that they will be moved to another random node.
    # However, if there are still more VMs than nodes, we need to deal with it.
    for exclude_group, group_values in tags_exclude_vms.items():

        group_values['nodes_used'] = []
        for vm in group_values['vms']:

            proceed = True
            counter = 0

            while proceed:

                if vm_statistics[vm]['node_rebalance'] in group_values['nodes_used']:
                    # Find another possible new target node, if possible, by randomly getting any node from
                    # the cluster and validating that it is not already used for this anti-affinity group.
                    logging.info(f'{info_prefix} Rebalancing of VM {vm} is needed due to anti-affinity group policy.')
                    random_node, counter, proceed = __get_random_node(counter, node_statistics, vm)

                    if random_node not in group_values['nodes_used']:
                        logging.info(f'{info_prefix} New random node {random_node} has not yet been used for the anti-affinity group {exclude_group}.')
                        group_values['nodes_used'].append(random_node)
                        logging.info(f'{info_prefix} New random node {random_node} has been added as an already used node to the anti-affinity group {exclude_group}.')
                        logging.info(f'{info_prefix} VM {vm} switched node from {vm_statistics[vm]["node_rebalance"]} to {random_node} due to the anti-affinity group {exclude_group}.')
                        vm_statistics[vm]['node_rebalance'] = random_node

                else:
                    # Add the used node to the list for the anti-affinity group to ensure no
                    # other VM with the same anti-affinity group will use it (if possible).
                    logging.info(f'{info_prefix} Node {vm_statistics[vm]["node_rebalance"]} has been added as an already used node to the anti-affinity group {exclude_group}.')
                    logging.info(f'{info_prefix} No rebalancing for VM {vm} needed due to any anti-affinity group policies.')
                    group_values['nodes_used'].append(vm_statistics[vm]['node_rebalance'])
                    proceed = False

    return node_statistics, vm_statistics


def __get_random_node(counter, node_statistics, vm):
    """ Get a random node within the Proxmox cluster. """
    warning_prefix = 'Warning: [random-node-getter]:'
    info_prefix = 'Info: [random-node-getter]:'

    counter = counter + 1
    random_node = None
    if counter < 30:
        random_node = random.choice(list(node_statistics.keys()))
        logging.info(f'{info_prefix} New random node {random_node} evaluated for vm {vm} in run {counter}.')
        return random_node, counter, False
    else:
        logging.warning(f'{warning_prefix} Reached limit for random node evaluation for vm {vm}. Unable to find a suitable new node.')
        return random_node, counter, False


def __wait_job_finalized(api_object, node_name, job_id, counter):
    """ Wait for a job to be finalized. """
    error_prefix = 'Error: [job-status-getter]:'
    info_prefix = 'Info: [job-status-getter]:'

    logging.info(f'{info_prefix} Getting job status for job {job_id}.')
    task = api_object.nodes(node_name).tasks(job_id).status().get()
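    # The returned task dictionary typically contains at least a 'status' key.
    # Illustrative examples (keys beyond 'status' are assumptions about the
    # Proxmox API payload): {'status': 'running', ...} while the task is active,
    # {'status': 'stopped', 'exitstatus': 'OK', ...} once it has finished.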
    logging.info(f'{info_prefix} {task}')

    if task['status'] == 'running':
        logging.info(f'{info_prefix} Validating job {job_id} in run {counter}.')

        # Do not run this recursion infinitely; fail when reaching the limit.
        if counter == 300:
            logging.critical(f'{error_prefix} The job {job_id} on node {node_name} did not finish in time for migration.')
            sys.exit(2)

        time.sleep(5)
        counter = counter + 1
        logging.info(f'{info_prefix} Revalidating job {job_id} in a next run.')
        return __wait_job_finalized(api_object, node_name, job_id, counter)

    logging.info(f'{info_prefix} Job {job_id} for migration from {node_name} terminated successfully.')


def __run_vm_rebalancing(api_object, _vm_vm_statistics, app_args, parallel_migrations):
    """ Run & execute the VM rebalancing via API. """
    error_prefix = 'Error: [vm-rebalancing-executor]:'
    info_prefix = 'Info: [vm-rebalancing-executor]:'

    # Remove VMs/CTs that did not get a new node location assigned.
    vms_to_remove = [vm_name for vm_name, vm_info in _vm_vm_statistics.items() if 'node_rebalance' in vm_info and vm_info['node_rebalance'] == vm_info.get('node_parent')]
    for vm_name in vms_to_remove:
        del _vm_vm_statistics[vm_name]

    if len(_vm_vm_statistics) > 0 and not app_args.dry_run:
        for vm, value in _vm_vm_statistics.items():

            # Reset the job id for every iteration to avoid waiting on a stale
            # job when the migration call itself failed.
            job_id = None
            try:
                # Migrate type VM (live migration).
                if value['type'] == 'vm':
                    logging.info(f'{info_prefix} Rebalancing VM {vm} from node {value["node_parent"]} to node {value["node_rebalance"]}.')
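                    # Options for the qemu migrate endpoint: 'online': 1 requests a live
                    # migration of the running VM; 'with-local-disks': 1 also moves any
                    # locally stored disks along with it.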
                    options = {'target': value['node_rebalance'], 'online': 1, 'with-local-disks': 1}
                    job_id = api_object.nodes(value['node_parent']).qemu(value['vmid']).migrate().post(**options)

                # Migrate type CT (requires a restart of the container).
                if value['type'] == 'ct':
                    logging.info(f'{info_prefix} Rebalancing CT {vm} from node {value["node_parent"]} to node {value["node_rebalance"]}.')
                    job_id = api_object.nodes(value['node_parent']).lxc(value['vmid']).migrate().post(target=value['node_rebalance'], restart=1)

            except proxmoxer.core.ResourceException as error_resource:
                logging.critical(f'{error_prefix} {error_resource}')

            # Wait for the migration to be finished unless running parallel migrations.
            if not bool(int(parallel_migrations)):
                logging.info(f'{info_prefix} Rebalancing will be performed sequentially.')
                if job_id is not None:
                    __wait_job_finalized(api_object, value['node_parent'], job_id, counter=1)
            else:
                logging.info(f'{info_prefix} Rebalancing will be performed in parallel.')

    else:
        if app_args.dry_run:
            logging.info(f'{info_prefix} Running in dry run mode. Not executing any balancing.')
        else:
            logging.info(f'{info_prefix} No rebalancing needed.')

    return _vm_vm_statistics


def __run_storage_rebalancing(api_object, _storage_vm_statistics, app_args, parallel_migrations):
    """ Run & execute the storage rebalancing via API. """
    error_prefix = 'Error: [storage-rebalancing-executor]:'
    info_prefix = 'Info: [storage-rebalancing-executor]:'

    # Remove VMs/CTs that did not get a new storage location assigned.
    vms_to_remove = [vm_name for vm_name, vm_info in _storage_vm_statistics.items() if all(storage.get('storage_rebalance') == storage.get('storage_parent') for storage in vm_info.get('storage', {}).values())]
    for vm_name in vms_to_remove:
        del _storage_vm_statistics[vm_name]

    if len(_storage_vm_statistics) > 0 and not app_args.dry_run:
        for vm, value in _storage_vm_statistics.items():
            for disk, disk_info in value['storage'].items():

                # Only move disks that really got a new storage volume assigned.
                if disk_info.get('storage_rebalance', None) is not None and disk_info.get('storage_rebalance') != disk_info.get('storage_parent'):
                    job_id = None
                    try:
                        # Move the disk of the VM to the new storage volume (online move).
                        logging.info(f'{info_prefix} Rebalancing storage of VM {vm}: moving disk {disk} to storage {disk_info.get("storage_rebalance", None)}.')
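                        # The qemu move_disk endpoint moves the given disk to the target
                        # storage; delete=1 removes the source copy of the disk after a
                        # successful move.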
                        job_id = api_object.nodes(value['node_parent']).qemu(value['vmid']).move_disk().post(disk=disk, storage=disk_info.get('storage_rebalance', None), delete=1)

                    except proxmoxer.core.ResourceException as error_resource:
                        logging.critical(f'{error_prefix} {error_resource}')

                    # Wait for the migration to be finished unless running parallel migrations.
                    if not bool(int(parallel_migrations)):
                        logging.info(f'{info_prefix} Rebalancing will be performed sequentially.')
                        if job_id is not None:
                            __wait_job_finalized(api_object, value['node_parent'], job_id, counter=1)
                    else:
                        logging.info(f'{info_prefix} Rebalancing will be performed in parallel.')

    else:
        if app_args.dry_run:
            logging.info(f'{info_prefix} Running in dry run mode. Not executing any balancing.')
        else:
            logging.info(f'{info_prefix} No rebalancing needed.')

    return _storage_vm_statistics


def __create_json_output(vm_statistics, app_args):
    """ Create a machine-parsable JSON output of the VM rebalance statistics. """
    info_prefix = 'Info: [json-output-generator]:'

    if app_args.json:
        logging.info(f'{info_prefix} Printing json output of VM statistics.')
        print(json.dumps(vm_statistics))


def __create_cli_output(vm_statistics, app_args):
    """ Create CLI output of the VM rebalancing, e.g. when running in dry-run mode. """
    info_prefix_dry_run = 'Info: [cli-output-generator-dry-run]:'
    info_prefix_run = 'Info: [cli-output-generator]:'
    vm_to_node_list = []

    if app_args.dry_run:
        info_prefix = info_prefix_dry_run
        logging.info(f'{info_prefix} Starting dry-run to rebalance vms to their new nodes.')
    else:
        info_prefix = info_prefix_run
        logging.info(f'{info_prefix} Start rebalancing vms to their new nodes.')

    vm_to_node_list.append(['VM', 'Current Node', 'Rebalanced Node', 'Current Storage', 'Rebalanced Storage', 'VM Type'])
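    # Each data row pairs a VM with its old/new node and old/new storage per disk.
    # Illustrative example row (hypothetical names):
    #   ['vm01', 'node01', 'node02', 'local-lvm (scsi0)', 'ceph-pool01 (scsi0)', 'vm']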
    for vm_name, vm_values in vm_statistics.items():
        for disk, disk_values in vm_values['storage'].items():
            vm_to_node_list.append([vm_name, vm_values['node_parent'], vm_values['node_rebalance'], f'{disk_values.get("storage_parent", "N/A")} ({disk_values.get("device_name", "N/A")})', f'{disk_values.get("storage_rebalance", "N/A")} ({disk_values.get("device_name", "N/A")})', vm_values['type']])

    if len(vm_statistics) > 0:
        logging.info(f'{info_prefix} Printing cli output of VM rebalancing.')
        __print_table_cli(vm_to_node_list, app_args.dry_run)
    else:
        logging.info(f'{info_prefix} No rebalancing needed.')


def __print_table_cli(table, dry_run=False):
    """ Pretty print a given table to the cli. """
    info_prefix_dry_run = 'Info: [cli-output-generator-table-dry-run]:'
    info_prefix_run = 'Info: [cli-output-generator-table]:'
    info_prefix = info_prefix_run

    longest_cols = [
        (max([len(str(row[i])) for row in table]) + 3)
        for i in range(len(table[0]))
    ]

    row_format = "".join(["{:>" + str(longest_col) + "}" for longest_col in longest_cols])
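    # Illustrative example: for column widths of [10, 15] the resulting format
    # string is '{:>10}{:>15}', which right-aligns each cell within its column.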
    for row in table:
        # Print CLI output when running in dry-run mode to make the user's life easier.
        if dry_run:
            info_prefix = info_prefix_dry_run
            print(row_format.format(*row))

        # Log all items in info mode.
        logging.info(f'{info_prefix} {row_format.format(*row)}')


def run_rebalancing(api_object, vm_statistics, app_args, parallel_migrations, balancing_type):
    """ Run rebalancing of vms to new nodes in cluster. """
    _vm_vm_statistics = {}
    _storage_vm_statistics = {}

    if balancing_type == 'vm':
        _vm_vm_statistics = copy.deepcopy(vm_statistics)
        _vm_vm_statistics = __run_vm_rebalancing(api_object, _vm_vm_statistics, app_args, parallel_migrations)
        return _vm_vm_statistics

    if balancing_type == 'storage':
        _storage_vm_statistics = copy.deepcopy(vm_statistics)
        _storage_vm_statistics = __run_storage_rebalancing(api_object, _storage_vm_statistics, app_args, parallel_migrations)
        return _storage_vm_statistics


def run_output_rebalancing(app_args, vm_output_statistics, storage_output_statistics):
    """ Generate output of rebalanced resources. """
    output_statistics = {**vm_output_statistics, **storage_output_statistics}
    __create_json_output(output_statistics, app_args)
    __create_cli_output(output_statistics, app_args)


def balancing_storage_calculations(storage_balancing_method, storage_statistics, vm_statistics, balanciness, rebalance, processed_vms):
    """ Calculate re-balancing of storage on present datastores across the cluster. """
    info_prefix = 'Info: [storage-rebalancing-calculator]:'
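    # Sketch of the flow: each recursion picks the VM holding the largest disk,
    # assigns that disk to the volume with the most free space, updates the
    # statistics and recurses until __validate_storage_balanciness reports that
    # the usage spread stays within the configured balanciness.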
    # Validate for a supported balancing method, mode and whether rebalancing is required.
    __validate_vm_statistics(vm_statistics)
    rebalance = __validate_storage_balanciness(balanciness, storage_balancing_method, storage_statistics)

    if rebalance:
        vm_name, vm_disk_device = __get_most_used_resources_vm_storage(vm_statistics)

        if vm_name not in processed_vms:
            processed_vms.append(vm_name)
            resources_storage_most_free = __get_most_free_storage(storage_balancing_method, storage_statistics)

            # Update resource statistics for VMs and storage.
            storage_statistics, vm_statistics = __update_resource_storage_statistics(storage_statistics, resources_storage_most_free, vm_statistics, vm_name, vm_disk_device)

        # Start recursion until we do not have any needs to rebalance anymore.
        balancing_storage_calculations(storage_balancing_method, storage_statistics, vm_statistics, balanciness, rebalance, processed_vms)

    logging.info(f'{info_prefix} Balancing calculations done.')
    return storage_statistics, vm_statistics


def __validate_storage_balanciness(balanciness, storage_balancing_method, storage_statistics):
    """ Validate the balanciness of the storage to ensure that further rebalancing is needed. """
    info_prefix = 'Info: [storage-balanciness-validation]:'
    error_prefix = 'Error: [storage-balanciness-validation]:'
    storage_resource_percent_list = []
    storage_assigned_percent_match = []

    # Validate for an allowed balancing method and define the storage resource selector.
    if storage_balancing_method == 'disk_space':
        logging.info(f'{info_prefix} Getting most free storage volume by disk size.')
        storage_resource_selector = 'used'
    elif storage_balancing_method == 'disk_io':
        logging.error(f'{error_prefix} Getting most free storage volume by disk IO is not yet supported.')
        sys.exit(2)
    else:
        logging.error(f'{error_prefix} An invalid storage balancing method was provided: {storage_balancing_method}.')
        sys.exit(2)

    # Obtain the metrics.
    for storage_name, storage_info in storage_statistics.items():

        logging.info(f'{info_prefix} Validating storage: {storage_name} for balanciness for usage with: {storage_balancing_method}.')
        # Save information of storages from the current run to compare them in the next recursion.
        if storage_statistics[storage_name].get(f'{storage_resource_selector}_percent_last_run', None) == storage_statistics[storage_name][f'{storage_resource_selector}_percent']:
            storage_statistics[storage_name][f'{storage_resource_selector}_percent_match'] = True
        else:
            storage_statistics[storage_name][f'{storage_resource_selector}_percent_match'] = False

        # Update the value to the current value of the recursion run.
        storage_statistics[storage_name][f'{storage_resource_selector}_percent_last_run'] = storage_statistics[storage_name][f'{storage_resource_selector}_percent']

        # If all storage resources are unchanged, the recursion can be left.
        for key, value in storage_statistics.items():
            storage_assigned_percent_match.append(value.get(f'{storage_resource_selector}_percent_match', False))

        if False not in storage_assigned_percent_match:
            return False

        # Add the storage information to the resource list.
        storage_resource_percent_list.append(int(storage_info[f'{storage_resource_selector}_percent']))
        logging.info(f'{info_prefix} Storage: {storage_name} with values: {storage_info}')

    # Create a sorted list of the delta + balanciness between the storage resources.
    storage_resource_percent_list_sorted = sorted(storage_resource_percent_list)
    storage_lowest_percent = storage_resource_percent_list_sorted[0]
    storage_highest_percent = storage_resource_percent_list_sorted[-1]

    # Validate if the recursion should proceed for further rebalancing.
    if (int(storage_lowest_percent) + int(balanciness)) < int(storage_highest_percent):
        logging.info(f'{info_prefix} Rebalancing for type "{storage_resource_selector}" of storage is needed. Highest usage: {int(storage_highest_percent)}% | Lowest usage: {int(storage_lowest_percent)}%.')
        return True
    else:
        logging.info(f'{info_prefix} Rebalancing for type "{storage_resource_selector}" of storage is not needed. Highest usage: {int(storage_highest_percent)}% | Lowest usage: {int(storage_lowest_percent)}%.')
        return False


def __get_most_used_resources_vm_storage(vm_statistics):
    """ Get and return the most used disk of a VM by storage. """
    info_prefix = 'Info: [get-most-used-disks-resources-vm]:'

    # Get the biggest storage of a VM/CT. A VM/CT can hold multiple disks. Therefore, we need to iterate
    # over all assigned disks to get the biggest one.
    vm_object = sorted(
        vm_statistics.items(),
        key=lambda x: max(
            (size_in_bytes(storage['size']) for storage in x[1].get('storage', {}).values() if 'size' in storage),
            default=0
        ),
        reverse=True
    )

    vm_object = vm_object[0]
    vm_name = vm_object[0]
    # Use size_in_bytes() here, too, since disk sizes may carry a unit suffix (e.g. '32G').
    vm_disk_device = max(vm_object[1]['storage'], key=lambda x: size_in_bytes(vm_object[1]['storage'][x]['size']))
    logging.info(f'{info_prefix} Got most used VM: {vm_name} with storage device: {vm_disk_device}.')

    return vm_name, vm_disk_device


def __get_most_free_storage(storage_balancing_method, storage_statistics):
    """ Get the storage with the most free space or IO, depending on the balancing mode. """
    info_prefix = 'Info: [get-most-free-storage]:'
    error_prefix = 'Error: [get-most-free-storage]:'
    storage_volume = None
    logging.info(f'{info_prefix} Starting to evaluate the most free storage volume.')
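    # storage_statistics is expected to map volume names to their metric dicts;
    # illustrative example (hypothetical names and values):
    #   {'ceph-pool01': {'total': 2199023255552, 'used': 824633720832,
    #                    'free': 1374389534720, 'free_percent': 62.5, 'used_percent': 37.5}}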
    if storage_balancing_method == 'disk_space':
        logging.info(f'{info_prefix} Getting most free storage volume by disk space.')
        storage_volume = max(storage_statistics, key=lambda x: storage_statistics[x]['free_percent'])

    if storage_balancing_method == 'disk_io':
        logging.info(f'{info_prefix} Getting most free storage volume by disk IO.')
        logging.error(f'{error_prefix} Getting most free storage volume by disk IO is not yet supported.')
        sys.exit(2)

    return storage_volume


def __update_resource_storage_statistics(storage_statistics, resources_storage_most_free, vm_statistics, vm_name, vm_disk_device):
    """ Update VM and storage resource statistics. """
    info_prefix = 'Info: [rebalancing-storage-resource-statistics-update]:'
    current_storage = vm_statistics[vm_name]['storage'][vm_disk_device]['storage_parent']
    current_storage_size = storage_statistics[current_storage]['free'] / (1024 ** 3)
    rebalance_storage = resources_storage_most_free
    rebalance_storage_size = storage_statistics[rebalance_storage]['free'] / (1024 ** 3)
    vm_storage_size = vm_statistics[vm_name]['storage'][vm_disk_device]['size']
    # Disk sizes may carry a unit suffix (e.g. '32G'); convert them via size_in_bytes().
    vm_storage_size_bytes = int(size_in_bytes(vm_storage_size))
    vm_storage_size_gb = vm_storage_size_bytes / (1024 ** 3)

    # Assign the new storage device to the VM.
    logging.info(f'{info_prefix} Validating VM {vm_name} for potential storage rebalancing.')
    if vm_statistics[vm_name]['storage'][vm_disk_device]['storage_rebalance'] == vm_statistics[vm_name]['storage'][vm_disk_device]['storage_parent']:
        logging.info(f'{info_prefix} Setting VM {vm_name} from {current_storage} to {rebalance_storage} storage.')
        vm_statistics[vm_name]['storage'][vm_disk_device]['storage_rebalance'] = resources_storage_most_free
    else:
        logging.info(f'{info_prefix} VM {vm_name} already has a pending storage rebalance to {vm_statistics[vm_name]["storage"][vm_disk_device]["storage_rebalance"]}.')

    # Recalculate values for storage.
    ## Add the freed resources to the old parent storage device.
    storage_statistics[current_storage]['used'] = storage_statistics[current_storage]['used'] - vm_storage_size_bytes
    storage_statistics[current_storage]['free'] = storage_statistics[current_storage]['free'] + vm_storage_size_bytes
    storage_statistics[current_storage]['free_percent'] = (storage_statistics[current_storage]['free'] / storage_statistics[current_storage]['total']) * 100
    storage_statistics[current_storage]['used_percent'] = (storage_statistics[current_storage]['used'] / storage_statistics[current_storage]['total']) * 100
    logging.info(f'{info_prefix} Adding free space of {vm_storage_size_gb}G to old storage with {current_storage_size}G. [free: {int(current_storage_size + vm_storage_size_gb)}G | {storage_statistics[current_storage]["free_percent"]}%]')

    ## Remove the newly allocated resources from the new rebalanced storage device.
    storage_statistics[rebalance_storage]['used'] = storage_statistics[rebalance_storage]['used'] + vm_storage_size_bytes
    storage_statistics[rebalance_storage]['free'] = storage_statistics[rebalance_storage]['free'] - vm_storage_size_bytes
    storage_statistics[rebalance_storage]['free_percent'] = (storage_statistics[rebalance_storage]['free'] / storage_statistics[rebalance_storage]['total']) * 100
    storage_statistics[rebalance_storage]['used_percent'] = (storage_statistics[rebalance_storage]['used'] / storage_statistics[rebalance_storage]['total']) * 100
    logging.info(f'{info_prefix} Adding used space of {vm_storage_size_gb}G to new storage with {rebalance_storage_size}G. [free: {int(rebalance_storage_size - vm_storage_size_gb)}G | {storage_statistics[rebalance_storage]["free_percent"]}%]')

    logging.info(f'{info_prefix} Updated VM and storage statistics.')
    return storage_statistics, vm_statistics


def size_in_bytes(size_str):
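    """ Convert a human-readable size string into bytes.

    Illustrative examples:
        size_in_bytes('32G')  -> 34359738368.0
        size_in_bytes('512M') -> 536870912.0
        size_in_bytes('100')  -> 100.0 (no unit suffix)
    """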
    size_unit = size_str[-1].upper()
    size_multipliers = {'K': 1024, 'M': 1024**2, 'G': 1024**3, 'T': 1024**4}
    # Only strip the last character when it is a known unit suffix; otherwise
    # the whole string is treated as a plain numeric value.
    if size_unit in size_multipliers:
        size_value = float(size_str[:-1])
    else:
        size_value = float(size_str)
    return size_value * size_multipliers.get(size_unit, 1)


def balancing_vm_affinity_groups(node_statistics, vm_statistics, balancing_method, balancing_mode):
    """ Enforce (anti-)affinity groups for further VM movement across the cluster. """
    node_statistics, vm_statistics = __get_vm_tags_include_groups(vm_statistics, node_statistics, balancing_method, balancing_mode)
    node_statistics, vm_statistics = __get_vm_tags_exclude_groups(vm_statistics, node_statistics, balancing_method, balancing_mode)
    return node_statistics, vm_statistics


def main():
    """ Run ProxLB for balancing VM workloads across a Proxmox cluster. """
    vm_output_statistics = {}
    storage_output_statistics = {}

    # Initialize ProxLB.
    initialize_logger('CRITICAL')
    app_args = initialize_args()
    if app_args.version:
        proxlb_output_version()
    config_path = initialize_config_path(app_args)
    pre_validations(config_path)

    # Parse global config.
    proxlb_config = initialize_config_options(config_path)
    pre_validations(config_path, proxlb_config)

    # Overwrite logging handler with user defined log verbosity.
    initialize_logger(proxlb_config['log_verbosity'], update_log_verbosity=True)

    while True:
        # API Authentication.
        api_object = api_connect(proxlb_config['proxmox_api_host'], proxlb_config['proxmox_api_user'], proxlb_config['proxmox_api_pass'], proxlb_config['proxmox_api_ssl_v'], proxlb_config['proxmox_api_timeout'])

        # Get the master node of the cluster and ensure that rebalancing is only
        # performed on the cluster master node to avoid ongoing rebalancing.
        cluster_master, master_only = execute_rebalancing_only_by_master(api_object, proxlb_config['master_only'])

        # Validate daemon service and skip the following tasks when not being the cluster master.
        if not cluster_master and master_only:
            validate_daemon(proxlb_config['daemon'], proxlb_config['schedule'])
            continue

        # Get metrics & statistics for vms and nodes.
        if proxlb_config['vm_balancing_enable'] or proxlb_config['storage_balancing_enable'] or app_args.best_node:
            node_statistics = get_node_statistics(api_object, proxlb_config['vm_ignore_nodes'], proxlb_config['vm_maintenance_nodes'])
            vm_statistics = get_vm_statistics(api_object, proxlb_config['vm_ignore_vms'], proxlb_config['vm_balancing_type'])
            node_statistics = update_node_statistics(node_statistics, vm_statistics)
            # Obtaining the metrics for the storage may take a longer time and is not needed
            # for VM/CT balancing. We can save time by skipping this when it is not needed.
            if proxlb_config['storage_balancing_enable']:
                storage_statistics = get_storage_statistics(api_object)

        # Execute VM/CT balancing sub-routines.
        if proxlb_config['vm_balancing_enable'] or app_args.best_node:
            node_statistics, vm_statistics = balancing_vm_calculations(proxlb_config['vm_balancing_method'], proxlb_config['vm_balancing_mode'], proxlb_config['vm_balancing_mode_option'], node_statistics, vm_statistics, proxlb_config['vm_balanciness'], app_args, rebalance=False, processed_vms=[])
            node_statistics, vm_statistics = balancing_vm_maintenance(proxlb_config, app_args, node_statistics, vm_statistics)
            node_statistics, vm_statistics = balancing_vm_affinity_groups(node_statistics, vm_statistics, proxlb_config['vm_balancing_method'], proxlb_config['vm_balancing_mode'])
            vm_output_statistics = run_rebalancing(api_object, vm_statistics, app_args, proxlb_config['vm_parallel_migrations'], 'vm')

        # Execute storage balancing sub-routines.
        if proxlb_config['storage_balancing_enable']:
            storage_statistics, vm_statistics = balancing_storage_calculations(proxlb_config['storage_balancing_method'], storage_statistics, vm_statistics, proxlb_config['storage_balanciness'], rebalance=False, processed_vms=[])
            storage_output_statistics = run_rebalancing(api_object, vm_statistics, app_args, proxlb_config['storage_parallel_migrations'], 'storage')

        # Generate balancing output.
        if proxlb_config['vm_balancing_enable'] or proxlb_config['storage_balancing_enable']:
            run_output_rebalancing(app_args, vm_output_statistics, storage_output_statistics)

        # Validate for any errors.
        post_validations()

        # Validate daemon service.
        validate_daemon(proxlb_config['daemon'], proxlb_config['schedule'])


if __name__ == '__main__':
    main()