Files
ProxLB/proxlb
2024-07-07 16:46:48 +02:00

409 lines
17 KiB
Python
Executable File

#!/usr/bin/env python3
# ProxLB
# ProxLB (re)balances VM workloads across nodes in Proxmox clusters.
# ProxLB obtains current metrics from all nodes within the cluster for
# further auto balancing by memory, disk or cpu and rebalances the VMs
# over all available nodes in a cluster by having an equal resource usage.
# Copyright (C) 2024 Florian Paul Azim Hoberg @gyptazy <gyptazy@gyptazy.ch>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
import argparse
import configparser
import logging
import os
try:
    import proxmoxer
except ImportError:
    # proxmoxer is optional at import time so that __validate_imports()
    # can report a readable error instead of an import traceback.
    _imports = False
else:
    _imports = True
import requests
import sys
import time
import urllib3
# Constants
# Application name; also embedded in every log line by SystemdHandler.
__appname__ = "ProxLB"
# Application version string.
__version__ = "0.9.9"
# Author / maintainer contact.
__author__ = "Florian Paul Azim Hoberg <gyptazy@gyptazy.ch> @gyptazy"
# Global error flag read by post_validations(). Functions that want to set
# it must declare it `global`, otherwise they only create a local variable.
__errors__ = False
# Classes
## Logging class
class SystemdHandler(logging.Handler):
    """ Class to handle logging options.

    Writes every formatted log record to a stream, prefixed with an
    sd-daemon style priority marker (``<N> ``) and the application name,
    e.g. ``<3> ProxLB: <message>`` for ERROR records.
    """
    # Standard logging levels mapped to their priority prefixes; the numbers
    # follow syslog priorities (2=crit, 3=err, 4=warning, 6=info, 7=debug).
    PREFIX = {
        logging.CRITICAL: "<2> " + __appname__ + ": ",
        logging.ERROR: "<3> " + __appname__ + ": ",
        logging.WARNING: "<4> " + __appname__ + ": ",
        logging.INFO: "<6> " + __appname__ + ": ",
        logging.DEBUG: "<7> " + __appname__ + ": ",
        # NOTSET shares the debug priority; the marker needs its closing ">".
        logging.NOTSET: "<7> " + __appname__ + ": ",
    }

    def __init__(self, stream=None):
        """ Create the handler.

        :param stream: Writable text stream. Defaults to ``sys.stdout`` as it
            is when the handler is created (not when the module was imported).
        """
        self.stream = sys.stdout if stream is None else stream
        logging.Handler.__init__(self)

    def _prefix_for(self, levelno):
        """ Return the prefix of the highest known level not above ``levelno``. """
        # Custom levels (e.g. 25) map to the nearest standard level below them
        # instead of failing the PREFIX lookup and dropping the message.
        known = [level for level in self.PREFIX if level <= levelno]
        return self.PREFIX[max(known)] if known else self.PREFIX[logging.NOTSET]

    def emit(self, record):
        """ Write ``record`` with its priority prefix; failures go to handleError(). """
        try:
            msg = self._prefix_for(record.levelno) + self.format(record) + "\n"
            self.stream.write(msg)
            self.stream.flush()
        except Exception:
            self.handleError(record)
# Functions
def initialize_logger(log_level, log_handler):
    """ Initialize ProxLB logging handler.

    Sets the root logger to ``log_level`` and attaches a new SystemdHandler
    (which writes to stdout by default).

    :param log_level: Level name or number accepted by ``Logger.setLevel``.
    :param log_handler: Currently unused; a SystemdHandler is always attached.
    """
    logger = logging.getLogger()
    logger.setLevel(log_level)
    handler = SystemdHandler()
    logger.addHandler(handler)
    banner = 'Info: [logger]:'
    logging.info(f'{banner} Logger got initialized.')
def pre_validations(config_path):
    """ Run pre-validations as sanity checks.

    Runs the import check and then the config-file check; each helper exits
    the process with code 2 when its check fails.

    :param config_path: Path of the configuration file to validate.
    """
    banner = 'Info: [pre-validations]:'
    checks = (
        (__validate_imports, ()),
        (__validate_config_file, (config_path,)),
    )
    for check, check_args in checks:
        check(*check_args)
    logging.info(f'{banner} All pre-validations done.')
def post_validations():
    """ Run post-validations as sanity checks.

    Reads the module-level ``__errors__`` flag. If it is set, a critical
    message is logged; otherwise a success message is logged at info level.
    """
    failed_banner = 'Error: [post-validations]:'
    ok_banner = 'Info: [post-validations]:'
    if not __errors__:
        logging.info(f'{ok_banner} All post-validations succeeded.')
        return
    logging.critical(f'{failed_banner} Not all post-validations succeeded. Please validate!')
def __schedule_to_seconds(schedule):
    """ Convert a schedule given in hours (int or numeric string) to seconds. """
    return int(schedule) * 60 * 60


def validate_daemon(daemon, schedule):
    """ Validate if ProxLB runs as a daemon.

    In daemon mode, block until the next run is due; otherwise exit.

    :param daemon: Integer-like flag (e.g. 0/1 or '0'/'1') enabling daemon mode.
    :param schedule: Interval between runs in hours (int or numeric string).
    :raises SystemExit: With code 0 when not running in daemon mode.
    """
    info_prefix = 'Info: [daemon]:'
    if bool(int(daemon)):
        logging.info(f'{info_prefix} Running in daemon mode. Next run in {schedule} hours.')
        # The schedule is expressed in hours (see the message above), so it
        # must be converted to seconds for time.sleep().
        time.sleep(__schedule_to_seconds(schedule))
    else:
        logging.info(f'{info_prefix} Not running in daemon mode. Quitting.')
        sys.exit(0)
def __validate_imports():
    """ Validate if all Python imports succeeded.

    Checks the module-level ``_imports`` flag, which is set by the guarded
    ``proxmoxer`` import. Exits with code 2 when the flag is false.
    """
    failed_banner = 'Error: [python-imports]:'
    ok_banner = 'Info: [python-imports]:'
    if _imports:
        logging.info(f'{ok_banner} All required dependencies were imported.')
        return
    logging.critical(f'{failed_banner} Could not import all dependencies. Please install "proxmoxer".')
    sys.exit(2)
def __validate_config_file(config_path):
    """ Validate that the ProxLB config file exists and is readable.

    :param config_path: Path to the configuration file.
    :raises SystemExit: With code 2 if the path is not a regular file or
        cannot be read by the current process.
    """
    error_prefix = 'Error: [config]:'
    info_prefix = 'Info: [config]:'
    if not os.path.isfile(config_path):
        logging.critical(f'{error_prefix} Could not find config file in: {config_path}.')
        sys.exit(2)
    # ConfigParser.read() silently ignores files it cannot open, which would
    # later surface as a misleading "missing options" error; fail early.
    if not os.access(config_path, os.R_OK):
        logging.critical(f'{error_prefix} Config file is not readable: {config_path}.')
        sys.exit(2)
    logging.info(f'{info_prefix} Configuration file loaded from: {config_path}.')
def initialize_args():
    """ Initialize given arguments for ProxLB.

    :returns: ``argparse.Namespace`` with a ``config`` attribute (str or None).
    """
    parser = argparse.ArgumentParser(description='ProxLB')
    parser.add_argument('-c', '--config', help='Path to config file.', type=str)
    parsed = parser.parse_args()
    return parsed
def initialize_config_path(app_args):
    """ Initialize path to ProxLB config file.

    :param app_args: Parsed arguments exposing a ``config`` attribute.
    :returns: ``app_args.config`` when given, else '/etc/proxlb/proxlb.conf'.
    """
    banner = 'Info: [config]:'
    requested = app_args.config
    if requested is not None:
        logging.info(f'{banner} Using config file: {requested}.')
        return requested
    fallback = '/etc/proxlb/proxlb.conf'
    logging.info(f'{banner} No config file provided. Falling back to: {fallback}.')
    return fallback
def initialize_config_options(config_path):
    """ Read configuration from given config file for ProxLB.

    :param config_path: Path to an INI-style configuration file.
    :returns: Tuple ``(proxmox_api_host, proxmox_api_user, proxmox_api_pass,
        proxmox_api_ssl_v, balancing_method, ignore_nodes, ignore_vms,
        daemon, schedule)``. Values read from the file are strings; the
        optional ones fall back to 'memory', None, None, 1 and 24.
    :raises SystemExit: With code 2 if the file cannot be parsed or a
        required section/option is missing.
    """
    error_prefix = 'Error: [config]:'
    info_prefix = 'Info: [config]:'
    try:
        config = configparser.ConfigParser()
        config.read(config_path)
        # Proxmox config
        proxmox_api_host = config['proxmox']['api_host']
        proxmox_api_user = config['proxmox']['api_user']
        proxmox_api_pass = config['proxmox']['api_pass']
        proxmox_api_ssl_v = config['proxmox']['verify_ssl']
        # Balancing
        balancing_method = config['balancing'].get('method', 'memory')
        ignore_nodes = config['balancing'].get('ignore_nodes', None)
        ignore_vms = config['balancing'].get('ignore_vms', None)
        # Service
        daemon = config['service'].get('daemon', 1)
        schedule = config['service'].get('schedule', 24)
    except configparser.NoSectionError:
        logging.critical(f'{error_prefix} Could not find the required section.')
        sys.exit(2)
    except configparser.ParsingError:
        logging.critical(f'{error_prefix} Unable to parse the config file.')
        sys.exit(2)
    except KeyError:
        logging.critical(f'{error_prefix} Could not find the required options in config file.')
        sys.exit(2)
    except configparser.Error:
        # Remaining configparser failures would otherwise escape as a
        # traceback: duplicate sections/options (rejected by strict parsing)
        # or interpolation errors, e.g. a password containing a bare '%'.
        # The exception text is not logged because it can echo option values.
        logging.critical(f'{error_prefix} Unable to parse the config file (check for duplicate entries or unescaped "%" characters).')
        sys.exit(2)
    logging.info(f'{info_prefix} Configuration file loaded.')
    return proxmox_api_host, proxmox_api_user, proxmox_api_pass, proxmox_api_ssl_v, balancing_method, \
        ignore_nodes, ignore_vms, daemon, schedule
def api_connect(proxmox_api_host, proxmox_api_user, proxmox_api_pass, proxmox_api_ssl_v):
    """ Connect and authenticate to the Proxmox remote API.

    :param proxmox_api_host: Hostname or address of the Proxmox API.
    :param proxmox_api_user: API user name.
    :param proxmox_api_pass: API password.
    :param proxmox_api_ssl_v: Integer-like flag ('0'/'1'). When false, the TLS
        certificate is not verified and urllib3 warnings are disabled.
    :returns: The ``proxmoxer.ProxmoxAPI`` object.
    :raises SystemExit: With code 2 on any handled connection failure.
    """
    error_prefix = 'Error: [api-connection]:'
    warn_prefix = 'Warning: [api-connection]:'
    info_prefix = 'Info: [api-connection]:'
    proxmox_api_ssl_v = bool(int(proxmox_api_ssl_v))

    if not proxmox_api_ssl_v:
        requests.packages.urllib3.disable_warnings()
        logging.warning(f'{warn_prefix} API connection does not verify SSL certificate.')

    try:
        api_object = proxmoxer.ProxmoxAPI(proxmox_api_host, user=proxmox_api_user, password=proxmox_api_pass, verify_ssl=proxmox_api_ssl_v)
    except urllib3.exceptions.NameResolutionError:
        logging.critical(f'{error_prefix} Could not resolve the given host: {proxmox_api_host}.')
        sys.exit(2)
    except requests.exceptions.ConnectTimeout:
        logging.critical(f'{error_prefix} Connection time out to host: {proxmox_api_host}.')
        sys.exit(2)
    except requests.exceptions.SSLError:
        logging.critical(f'{error_prefix} SSL certificate verification failed for host: {proxmox_api_host}.')
        sys.exit(2)
    except requests.exceptions.ConnectionError as error:
        # ConnectTimeout and SSLError derive from ConnectionError and are
        # handled above; this catches the rest (e.g. connection refused),
        # which would otherwise abort with a traceback.
        logging.critical(f'{error_prefix} Could not connect to host: {proxmox_api_host}. ({error})')
        sys.exit(2)

    logging.info(f'{info_prefix} API connection succeeded to host: {proxmox_api_host}.')
    return api_object
def get_node_statistics(api_object, ignore_nodes):
    """ Get statistics of cpu, memory and disk for each node in the cluster.

    Only nodes reporting status 'online' and not listed in ``ignore_nodes``
    are included.

    :param api_object: Proxmox API object exposing ``nodes.get()``.
    :param ignore_nodes: Comma separated node names to skip. It may be None
        (option absent from the config) or empty; whitespace around each
        name is ignored.
    :returns: Dict mapping node name to a dict with ``cpu_*``, ``memory_*``
        and ``disk_*`` total/used/free entries.
    """
    info_prefix = 'Info: [node-statistics]:'
    node_statistics = {}
    ignore_nodes_list = {name.strip() for name in (ignore_nodes or '').split(',') if name.strip()}

    for node in api_object.nodes.get():
        if node['status'] != 'online' or node['node'] in ignore_nodes_list:
            continue
        node_statistics[node['node']] = {
            'cpu_total': node['maxcpu'],
            'cpu_used': node['cpu'],
            # NOTE(review): if the API's ``cpu`` is a fractional utilisation
            # rather than a core count, int() truncates it to 0 here.
            # Confirm the intended units.
            'cpu_free': int(node['maxcpu']) - int(node['cpu']),
            'memory_total': node['maxmem'],
            'memory_used': node['mem'],
            'memory_free': int(node['maxmem']) - int(node['mem']),
            'disk_total': node['maxdisk'],
            'disk_used': node['disk'],
            'disk_free': int(node['maxdisk']) - int(node['disk']),
        }
        logging.info(f'{info_prefix} Added node {node["node"]}.')

    logging.info(f'{info_prefix} Created node statistics.')
    return node_statistics
def get_vm_statistics(api_object, ignore_vms):
    """ Get statistics of cpu, memory and disk for each vm in the cluster.

    Only running VMs on online nodes are collected. VMs whose name is listed
    in ``ignore_vms`` are skipped.

    :param api_object: Proxmox API object exposing ``nodes.get()`` and
        ``nodes(<name>).qemu.get()``.
    :param ignore_vms: Comma separated VM names to skip. It may be None
        (option absent from the config) or empty; whitespace around each
        name is ignored.
    :returns: Dict mapping VM name to its statistics, ``vmid``,
        ``node_parent`` and ``node_rebalance`` (initially the parent node).
    """
    info_prefix = 'Info: [vm-statistics]:'
    vm_statistics = {}
    ignore_vms_list = {name.strip() for name in (ignore_vms or '').split(',') if name.strip()}

    for node in api_object.nodes.get():
        # Only online nodes are queried, matching get_node_statistics().
        # NOTE(review): asking an unreachable node for its guests is expected
        # to raise an API error; confirm against the Proxmox API.
        if node['status'] != 'online':
            continue
        for vm in api_object.nodes(node['node']).qemu.get():
            if vm['status'] != 'running' or vm['name'] in ignore_vms_list:
                continue
            vm_statistics[vm['name']] = {
                'cpu_total': vm['cpus'],
                'cpu_used': vm['cpu'],
                'memory_total': vm['maxmem'],
                'memory_used': vm['mem'],
                'disk_total': vm['maxdisk'],
                'disk_used': vm['disk'],
                'vmid': vm['vmid'],
                'node_parent': node['node'],
                # Rebalancing node will be overwritten after calculations.
                # If the vm stays on the node, it will be removed at a
                # later time.
                'node_rebalance': node['node'],
            }
            logging.info(f'{info_prefix} Added vm {vm["name"]}.')

    logging.info(f'{info_prefix} Created VM statistics.')
    return vm_statistics
def __remove_non_migrating_vms(vm_statistics):
    """ Delete, in place, every VM that has no target node or whose target is its current node. """
    vms_to_remove = [
        vm_name for vm_name, vm_info in vm_statistics.items()
        if 'node_rebalance' not in vm_info
        or vm_info['node_rebalance'] == vm_info.get('node_parent')
    ]
    for vm_name in vms_to_remove:
        del vm_statistics[vm_name]


def balancing_calculations(balancing_method, node_statistics, vm_statistics):
    """ Calculate re-balancing of VMs on present nodes across the cluster.

    VMs are sorted by their ``<method>_used`` value (largest first) and
    handed to the nodes round-robin. A node is skipped when its
    ``<method>_free`` value is too small. VMs that still have no
    ``node_rebalance`` entry then get one best-fit attempt: the node whose
    free value would be left smallest but non-negative. Finally, every VM
    that stays on its current node, or that has no target node, is dropped
    from ``vm_statistics``.

    Both dictionaries are modified in place. Node ``<method>_free`` values
    are decreased by each assigned VM, and VMs are removed as described.

    :param balancing_method: One of 'memory', 'disk' or 'cpu'.
    :param node_statistics: Mapping of node name to its statistics dict.
    :param vm_statistics: Mapping of VM name to its statistics dict.
    :returns: Tuple ``(node_statistics, vm_statistics)``.
    :raises SystemExit: With code 2 for an unknown balancing method.
    """
    error_prefix = 'Error: [rebalancing-calculator]:'
    info_prefix = 'Info: [rebalancing-calculator]:'

    if balancing_method not in ['memory', 'disk', 'cpu']:
        logging.error(f'{error_prefix} Invalid balancing method: {balancing_method}')
        sys.exit(2)

    used_key = f'{balancing_method}_used'
    free_key = f'{balancing_method}_free'

    sorted_vms = sorted(vm_statistics.items(), key=lambda item: item[1][used_key], reverse=True)
    logging.info(f'{info_prefix} Balancing will be done for {balancing_method} efficiency.')

    total_resource_free = sum(node_info[free_key] for node_info in node_statistics.values())
    total_resource_used = sum(vm_info[used_key] for vm_info in vm_statistics.values())
    if total_resource_used > total_resource_free:
        logging.error(f'{error_prefix} Not enough {balancing_method} resources to accommodate all VMs.')
        # Nothing was rebalanced. VMs prepared by get_vm_statistics() carry
        # node_rebalance == node_parent, and returning them unfiltered would
        # request migrations onto the node they already run on.
        __remove_non_migrating_vms(vm_statistics)
        return node_statistics, vm_statistics

    # Round-robin initial distribution: hand the VMs (largest first) to the
    # nodes in turn, skipping nodes without enough free resources.
    nodes = list(node_statistics.items())
    node_count = len(nodes)
    node_index = 0
    for vm_name, vm_info in sorted_vms:
        assigned = False
        for _ in range(node_count):
            node_name, node_info = nodes[node_index]
            # Advance on every probe so the next VM starts at the following node.
            node_index = (node_index + 1) % node_count
            if vm_info[used_key] <= node_info[free_key]:
                vm_info['node_rebalance'] = node_name
                node_info[free_key] -= vm_info[used_key]
                assigned = True
                break
        if not assigned:
            logging.error(f'{error_prefix} VM {vm_name} with {balancing_method} usage {vm_info[used_key]} cannot fit into any node.')

    # Best-fit pass for VMs that still have no target. One pass is enough:
    # free values only shrink, so a VM that fits nowhere now never will.
    # Repeating the pass in a loop would spin forever on such a VM.
    for vm_name, vm_info in vm_statistics.items():
        if 'node_rebalance' in vm_info:
            continue
        best_node_name = None
        best_node_info = None
        min_resource_diff = float('inf')
        for node_name, node_info in node_statistics.items():
            resource_diff = node_info[free_key] - vm_info[used_key]
            if 0 <= resource_diff < min_resource_diff:
                min_resource_diff = resource_diff
                best_node_name = node_name
                best_node_info = node_info
        if best_node_name is None:
            logging.error(f'{error_prefix} VM {vm_name} with {balancing_method} usage {vm_info[used_key]} cannot fit into any node.')
            continue
        vm_info['node_rebalance'] = best_node_name
        best_node_info[free_key] -= vm_info[used_key]

    # Only VMs that actually change node need to be migrated.
    __remove_non_migrating_vms(vm_statistics)
    logging.info(f'{info_prefix} Balancing calculations done.')
    return node_statistics, vm_statistics
def __get_node_most_free_values(balancing_method, node_statistics):
    """ Get and return the most free resources of a node by the defined balancing method.

    :param balancing_method: One of 'memory', 'disk' or 'cpu'.
    :param node_statistics: Mapping of node name to its statistics dict.
    :returns: ``(node_name, node_info)`` of the node with the largest
        ``<method>_free`` value, or None for an unknown method.
    """
    if balancing_method not in ('memory', 'disk', 'cpu'):
        return None
    free_key = f'{balancing_method}_free'
    return max(node_statistics.items(), key=lambda entry: entry[1][free_key])
def run_vm_rebalancing(api_object, vm_statistics_rebalanced):
    """ Run rebalancing of vms to new nodes in cluster.

    Issues one migration request (``online=1``) per entry, from
    ``node_parent`` to ``node_rebalance``. A failing migration is logged and
    sets the global ``__errors__`` flag; the remaining migrations are still
    attempted.

    :param api_object: Proxmox API object.
    :param vm_statistics_rebalanced: Mapping of VM name to a dict with
        ``vmid``, ``node_parent`` and ``node_rebalance`` keys.
    """
    # Without this declaration, the assignment below would only create a
    # local variable, and post_validations() would never see the failure.
    global __errors__
    error_prefix = 'Error: [rebalancing-executor]:'
    info_prefix = 'Info: [rebalancing-executor]:'
    logging.info(f'{info_prefix} Starting to rebalance vms to their new nodes.')
    for vm, value in vm_statistics_rebalanced.items():
        try:
            logging.info(f'{info_prefix} Rebalancing vm {vm} from node {value["node_parent"]} to node {value["node_rebalance"]}.')
            api_object.nodes(value['node_parent']).qemu(value['vmid']).migrate().post(target=value['node_rebalance'], online=1)
        except proxmoxer.core.ResourceException as error_resource:
            __errors__ = True
            logging.critical(f'{error_prefix} {error_resource}')
def main():
    """ Run ProxLB for balancing VM workloads across a Proxmox cluster.

    Sets up logging, arguments and configuration once, then loops: connect
    to the API, collect node and VM statistics, compute and execute the
    migrations, report the outcome, and finally let validate_daemon()
    either sleep until the next run or exit the process.
    """
    # Logging: with the root level at CRITICAL, only critical records are emitted.
    initialize_logger('CRITICAL', 'SystemdHandler()')
    config_path = initialize_config_path(initialize_args())
    pre_validations(config_path)

    # Parse global config
    (proxmox_api_host, proxmox_api_user, proxmox_api_pass, proxmox_api_ssl_v,
     balancing_method, ignore_nodes, ignore_vms, daemon, schedule) = initialize_config_options(config_path)

    while True:
        # A fresh API session is opened for every run.
        api_object = api_connect(proxmox_api_host, proxmox_api_user, proxmox_api_pass, proxmox_api_ssl_v)
        node_statistics = get_node_statistics(api_object, ignore_nodes)
        vm_statistics = get_vm_statistics(api_object, ignore_vms)
        _, vm_statistics_rebalanced = balancing_calculations(balancing_method, node_statistics, vm_statistics)
        run_vm_rebalancing(api_object, vm_statistics_rebalanced)
        post_validations()
        validate_daemon(daemon, schedule)
# Entry point when executed as a script (not when imported as a module).
if __name__ == '__main__':
    main()