feature: Make the amount of parallel migrations configurable

Fixes: #241
This commit is contained in:
gyptazy
2025-06-05 16:09:41 +02:00
parent 617d0a3ae3
commit 7cb5a31b89
4 changed files with 89 additions and 51 deletions

View File

@@ -0,0 +1,2 @@
feature:
- Make the amount of parallel migrations configurable (by @gyptazy). [#241]

View File

@@ -255,6 +255,7 @@ The following options can be set in the configuration file `proxlb.yaml`:
| | enable | | True | `Bool` | Enables the guest balancing.|
| | enforce_affinity | | True | `Bool` | Enforcing affinity/anti-affinity rules but balancing might become worse. |
| | parallel | | False | `Bool` | If guests should be moved in parallel or sequentially.|
| | parallel_jobs | | 5 | `Int` | The amount if parallel jobs when migrating guests. (default: `5`)|
| | live | | True | `Bool` | If guests should be moved live or shutdown.|
| | with_local_disks | | True | `Bool` | If balancing of guests should include local disks.|
| | balance_types | | ['vm', 'ct'] | `List` | Defined the types of guests that should be honored. [values: `vm`, `ct`]|

View File

@@ -20,6 +20,9 @@ balancing:
enable: True
enforce_affinity: False
parallel: False
# If running parallel job, you can define
# the amount of prallel jobs (default: 5)
parallel_jobs: 1
live: True
with_local_disks: True
balance_types: ['vm', 'ct']

View File

@@ -12,6 +12,7 @@ __license__ = "GPL-3.0"
import proxmoxer
import time
from itertools import islice
from utils.logger import SystemdLogger
from typing import Dict, Any
@@ -48,34 +49,68 @@ class Balancing:
Initializes the Balancing class with the provided ProxLB data.
Args:
proxlb_data (dict): The data required for balancing VMs and CTs.
proxmox_api (object): The Proxmox API client instance used to interact with the Proxmox cluster.
proxlb_data (dict): A dictionary containing data related to the ProxLB load balancing configuration.
"""
for guest_name, guest_meta in proxlb_data["guests"].items():
def chunk_dict(data, size):
"""
Splits a dictionary into chunks of a specified size.
Args:
data (dict): The dictionary to be split into chunks.
size (int): The size of each chunk.
Yields:
dict: A chunk of the original dictionary with the specified size.
"""
logger.debug("Starting: chunk_dict.")
it = iter(data.items())
for chunk in range(0, len(data), size):
yield dict(islice(it, size))
# Check if the guest's target is not the same as the current node
if guest_meta["node_current"] != guest_meta["node_target"]:
# Check if the guest is not ignored and perform the balancing
# operation based on the guest type
if not guest_meta["ignore"]:
guest_id = guest_meta["id"]
guest_node_current = guest_meta["node_current"]
guest_node_target = guest_meta["node_target"]
# Validate if balancing should be performed in parallel or sequentially.
# If parallel balancing is enabled, set the number of parallel jobs.
parallel_jobs = proxlb_data["meta"]["balancing"].get("parallel_jobs", 5)
if not proxlb_data["meta"]["balancing"].get("parallel", False):
parallel_jobs = 1
logger.debug("Balancing: Parallel balancing is disabled. Running sequentially.")
else:
logger.debug(f"Balancing: Parallel balancing is enabled. Running with {parallel_jobs} parallel jobs.")
# VM Balancing
if guest_meta["type"] == "vm":
self.exec_rebalancing_vm(proxmox_api, proxlb_data, guest_name)
for chunk in chunk_dict(proxlb_data["guests"], parallel_jobs):
jobs_to_wait = []
# CT Balancing
elif guest_meta["type"] == "ct":
self.exec_rebalancing_ct(proxmox_api, proxlb_data, guest_name)
for guest_name, guest_meta in chunk.items():
# Check if the guest's target is not the same as the current node
if guest_meta["node_current"] != guest_meta["node_target"]:
# Check if the guest is not ignored and perform the balancing
# operation based on the guest type
if not guest_meta["ignore"]:
job_id = None
# VM Balancing
if guest_meta["type"] == "vm":
job_id = self.exec_rebalancing_vm(proxmox_api, proxlb_data, guest_name)
# CT Balancing
elif guest_meta["type"] == "ct":
job_id = self.exec_rebalancing_ct(proxmox_api, proxlb_data, guest_name)
# Just in case we get a new type of guest in the future
else:
logger.critical(f"Balancing: Got unexpected guest type: {guest_meta['type']}. Cannot proceed guest: {guest_meta['name']}.")
if job_id:
jobs_to_wait.append((guest_name, guest_meta["node_current"], job_id))
# Just in case we get a new type of guest in the future
else:
logger.critical(f"Balancing: Got unexpected guest type: {guest_meta['type']}. Cannot proceed guest: {guest_meta['name']}.")
logger.debug(f"Balancing: Guest {guest_name} is ignored and will not be rebalanced.")
else:
logger.debug(f"Balancing: Guest {guest_name} is ignored and will not be rebalanced.")
else:
logger.debug(f"Balancing: Guest {guest_name} is already on the target node {guest_meta['node_target']} and will not be rebalanced.")
logger.debug(f"Balancing: Guest {guest_name} is already on the target node {guest_meta['node_target']} and will not be rebalanced.")
# Wait for all jobs in the current chunk to complete
for guest_name, node, job_id in jobs_to_wait:
self.get_rebalancing_job_status(proxmox_api, proxlb_data, guest_name, node, job_id)
def exec_rebalancing_vm(self, proxmox_api: any, proxlb_data: Dict[str, Any], guest_name: str) -> None:
"""
@@ -116,11 +151,11 @@ class Balancing:
try:
logger.debug(f"Balancing: Starting to migrate guest {guest_name} of type VM.")
job_id = proxmox_api.nodes(guest_node_current).qemu(guest_id).migrate().post(**migration_options)
self.get_rebalancing_job_status(proxmox_api, proxlb_data, guest_name, guest_node_current, job_id)
except proxmoxer.core.ResourceException as proxmox_api_error:
logger.critical(f"Balancing: Failed to migrate guest {guest_name} of type VM due to some Proxmox errors. Please check if resource is locked or similar.")
logger.debug(f"Balancing: Failed to migrate guest {guest_name} of type VM due to some Proxmox errors: {proxmox_api_error}")
logger.debug("Finished: exec_rebalancing_vm.")
return job_id
def exec_rebalancing_ct(self, proxmox_api: any, proxlb_data: Dict[str, Any], guest_name: str) -> None:
"""
@@ -145,11 +180,11 @@ class Balancing:
try:
logger.debug(f"Balancing: Starting to migrate guest {guest_name} of type CT.")
job_id = proxmox_api.nodes(guest_node_current).lxc(guest_id).migrate().post(target=guest_node_target, restart=1)
self.get_rebalancing_job_status(proxmox_api, proxlb_data, guest_name, guest_node_current, job_id)
except proxmoxer.core.ResourceException as proxmox_api_error:
logger.critical(f"Balancing: Failed to migrate guest {guest_name} of type CT due to some Proxmox errors. Please check if resource is locked or similar.")
logger.debug(f"Balancing: Failed to migrate guest {guest_name} of type CT due to some Proxmox errors: {proxmox_api_error}")
logger.debug("Finished: exec_rebalancing_ct.")
return job_id
def get_rebalancing_job_status(self, proxmox_api: any, proxlb_data: Dict[str, Any], guest_name: str, guest_current_node: str, job_id: int, retry_counter: int = 1) -> bool:
"""
@@ -167,35 +202,32 @@ class Balancing:
bool: True if the job completed successfully, False otherwise.
"""
logger.debug("Starting: get_rebalancing_job_status.")
# Parallel migrations can take a huge time and create a higher load, if not defined by an
# operator we will use a sequential mode by default
if not proxlb_data["meta"]["balancing"].get("parallel", False):
job = proxmox_api.nodes(guest_current_node).tasks(job_id).status().get()
job = proxmox_api.nodes(guest_current_node).tasks(job_id).status().get()
# Watch job id until it finalizes
if job["status"] == "running":
# Do not hammer the API while
# watching the job status
time.sleep(10)
retry_counter += 1
# Watch job id until it finalizes
if job["status"] == "running":
# Do not hammer the API while
# watching the job status
time.sleep(10)
retry_counter += 1
# Run recursion until we hit the soft-limit of maximum migration time for a guest
if retry_counter < proxlb_data["meta"]["balancing"].get("max_job_validation", 1800):
logger.debug(f"Balancing: Job ID {job_id} (guest: {guest_name}) for migration is still running... (Run: {retry_counter})")
self.get_rebalancing_job_status(proxmox_api, proxlb_data, guest_name, guest_current_node, job_id, retry_counter)
else:
logger.warning(f"Balancing: Job ID {job_id} (guest: {guest_name}) for migration took too long. Please check manually.")
logger.debug("Finished: get_rebalancing_job_status.")
return False
# Run recursion until we hit the soft-limit of maximum migration time for a guest
if retry_counter < proxlb_data["meta"]["balancing"].get("max_job_validation", 1800):
logger.debug(f"Balancing: Job ID {job_id} (guest: {guest_name}) for migration is still running... (Run: {retry_counter})")
self.get_rebalancing_job_status(proxmox_api, proxlb_data, guest_name, guest_current_node, job_id, retry_counter)
else:
logger.warning(f"Balancing: Job ID {job_id} (guest: {guest_name}) for migration took too long. Please check manually.")
logger.debug("Finished: get_rebalancing_job_status.")
return False
# Validate job output for errors when finished
if job["status"] == "stopped":
# Validate job output for errors when finished
if job["status"] == "stopped":
if job["exitstatus"] == "OK":
logger.debug(f"Balancing: Job ID {job_id} (guest: {guest_name}) was successfully.")
logger.debug("Finished: get_rebalancing_job_status.")
return True
else:
logger.critical(f"Balancing: Job ID {job_id} (guest: {guest_name}) went into an error! Please check manually.")
logger.debug("Finished: get_rebalancing_job_status.")
return False
if job["exitstatus"] == "OK":
logger.debug(f"Balancing: Job ID {job_id} (guest: {guest_name}) was successfully.")
logger.debug("Finished: get_rebalancing_job_status.")
return True
else:
logger.critical(f"Balancing: Job ID {job_id} (guest: {guest_name}) went into an error! Please check manually.")
logger.debug("Finished: get_rebalancing_job_status.")
return False