Mirror of https://github.com/gyptazy/ProxLB.git (synced 2026-04-06 04:41:58 +02:00)
Merge pull request #243 from gyptazy/feature/241-make-amount-of-parallel-migrations-configureable
feature: Make the amount of parallel migrations configurable
@@ -0,0 +1,2 @@
+feature:
+- Make the amount of parallel migrations configurable (by @gyptazy). [#241]
@@ -255,6 +255,7 @@ The following options can be set in the configuration file `proxlb.yaml`:
 | | enable | | True | `Bool` | Enables the guest balancing.|
 | | enforce_affinity | | True | `Bool` | Enforcing affinity/anti-affinity rules but balancing might become worse. |
 | | parallel | | False | `Bool` | If guests should be moved in parallel or sequentially.|
+| | parallel_jobs | | 5 | `Int` | The number of parallel jobs when migrating guests. (default: `5`)|
 | | live | | True | `Bool` | If guests should be moved live or shutdown.|
 | | with_local_disks | | True | `Bool` | If balancing of guests should include local disks.|
 | | balance_types | | ['vm', 'ct'] | `List` | Defined the types of guests that should be honored. [values: `vm`, `ct`]|
@@ -20,6 +20,9 @@ balancing:
   enable: True
   enforce_affinity: False
   parallel: False
+  # If running parallel jobs, you can define
+  # the number of parallel jobs (default: 5)
+  parallel_jobs: 1
   live: True
   with_local_disks: True
   balance_types: ['vm', 'ct']
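For clarity, the two options interact like this: `parallel_jobs` only takes effect when `parallel` is set to `True`; otherwise guests are migrated one at a time. A minimal, illustrative sketch of that resolution (the helper name and standalone loading via PyYAML are assumptions made for this note, not part of ProxLB):

import yaml

def effective_parallel_jobs(config_path="proxlb.yaml"):
    """Illustrative only: how many migrations may run at once for a given config."""
    with open(config_path) as handle:
        config = yaml.safe_load(handle)
    balancing = config.get("balancing", {})
    if not balancing.get("parallel", False):
        # Parallel balancing disabled: guests are migrated sequentially.
        return 1
    # Parallel balancing enabled: honor parallel_jobs, defaulting to 5.
    return balancing.get("parallel_jobs", 5)

print(effective_parallel_jobs())  # prints 1 for the example config above (parallel: False)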
@@ -12,6 +12,7 @@ __license__ = "GPL-3.0"

 import proxmoxer
 import time
+from itertools import islice
 from utils.logger import SystemdLogger
 from typing import Dict, Any
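The only functional change in this hunk is the new `islice` import, which the chunking helper below relies on. As a quick illustration with made-up sample data, repeatedly slicing one shared iterator is what turns a dictionary into fixed-size batches:

from itertools import islice

guests = {"vm101": "node01", "vm102": "node02", "ct200": "node01", "vm103": "node03"}
it = iter(guests.items())
print(dict(islice(it, 2)))  # {'vm101': 'node01', 'vm102': 'node02'}
print(dict(islice(it, 2)))  # {'ct200': 'node01', 'vm103': 'node03'}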
@@ -48,34 +49,68 @@ class Balancing:
         Initializes the Balancing class with the provided ProxLB data.

         Args:
-            proxlb_data (dict): The data required for balancing VMs and CTs.
+            proxmox_api (object): The Proxmox API client instance used to interact with the Proxmox cluster.
+            proxlb_data (dict): A dictionary containing data related to the ProxLB load balancing configuration.
         """
-        for guest_name, guest_meta in proxlb_data["guests"].items():
-
-            # Check if the guest's target is not the same as the current node
-            if guest_meta["node_current"] != guest_meta["node_target"]:
-                # Check if the guest is not ignored and perform the balancing
-                # operation based on the guest type
-                if not guest_meta["ignore"]:
-                    guest_id = guest_meta["id"]
-                    guest_node_current = guest_meta["node_current"]
-                    guest_node_target = guest_meta["node_target"]
-
-                    # VM Balancing
-                    if guest_meta["type"] == "vm":
-                        self.exec_rebalancing_vm(proxmox_api, proxlb_data, guest_name)
-
-                    # CT Balancing
-                    elif guest_meta["type"] == "ct":
-                        self.exec_rebalancing_ct(proxmox_api, proxlb_data, guest_name)
-
-                    # Just in case we get a new type of guest in the future
-                    else:
-                        logger.critical(f"Balancing: Got unexpected guest type: {guest_meta['type']}. Cannot proceed guest: {guest_meta['name']}.")
-                else:
-                    logger.debug(f"Balancing: Guest {guest_name} is ignored and will not be rebalanced.")
-            else:
-                logger.debug(f"Balancing: Guest {guest_name} is already on the target node {guest_meta['node_target']} and will not be rebalanced.")
+        def chunk_dict(data, size):
+            """
+            Splits a dictionary into chunks of a specified size.
+            Args:
+                data (dict): The dictionary to be split into chunks.
+                size (int): The size of each chunk.
+            Yields:
+                dict: A chunk of the original dictionary with the specified size.
+            """
+            logger.debug("Starting: chunk_dict.")
+            it = iter(data.items())
+            for chunk in range(0, len(data), size):
+                yield dict(islice(it, size))
+
+        # Validate if balancing should be performed in parallel or sequentially.
+        # If parallel balancing is enabled, set the number of parallel jobs.
+        parallel_jobs = proxlb_data["meta"]["balancing"].get("parallel_jobs", 5)
+        if not proxlb_data["meta"]["balancing"].get("parallel", False):
+            parallel_jobs = 1
+            logger.debug("Balancing: Parallel balancing is disabled. Running sequentially.")
+        else:
+            logger.debug(f"Balancing: Parallel balancing is enabled. Running with {parallel_jobs} parallel jobs.")
+
+        for chunk in chunk_dict(proxlb_data["guests"], parallel_jobs):
+            jobs_to_wait = []
+
+            for guest_name, guest_meta in chunk.items():
+
+                # Check if the guest's target is not the same as the current node
+                if guest_meta["node_current"] != guest_meta["node_target"]:
+
+                    # Check if the guest is not ignored and perform the balancing
+                    # operation based on the guest type
+                    if not guest_meta["ignore"]:
+                        job_id = None
+
+                        # VM Balancing
+                        if guest_meta["type"] == "vm":
+                            job_id = self.exec_rebalancing_vm(proxmox_api, proxlb_data, guest_name)
+
+                        # CT Balancing
+                        elif guest_meta["type"] == "ct":
+                            job_id = self.exec_rebalancing_ct(proxmox_api, proxlb_data, guest_name)
+
+                        # Just in case we get a new type of guest in the future
+                        else:
+                            logger.critical(f"Balancing: Got unexpected guest type: {guest_meta['type']}. Cannot proceed guest: {guest_meta['name']}.")
+
+                        if job_id:
+                            jobs_to_wait.append((guest_name, guest_meta["node_current"], job_id))
+
+                    else:
+                        logger.debug(f"Balancing: Guest {guest_name} is ignored and will not be rebalanced.")
+                else:
+                    logger.debug(f"Balancing: Guest {guest_name} is already on the target node {guest_meta['node_target']} and will not be rebalanced.")
+
+            # Wait for all jobs in the current chunk to complete
+            for guest_name, node, job_id in jobs_to_wait:
+                self.get_rebalancing_job_status(proxmox_api, proxlb_data, guest_name, node, job_id)

     def exec_rebalancing_vm(self, proxmox_api: any, proxlb_data: Dict[str, Any], guest_name: str) -> None:
         """
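To summarize the new flow above: guests are grouped into chunks of `parallel_jobs`, every migration in a chunk is submitted before any of them is waited on, and the next chunk only starts once the whole batch has finished. Below is a small self-contained sketch of that pattern; `submit_migration` and `wait_for_job` are placeholder stubs standing in for `exec_rebalancing_vm`/`exec_rebalancing_ct` and `get_rebalancing_job_status`, not ProxLB functions:

from itertools import islice

def chunk_dict(data, size):
    # Yield the dictionary in consecutive batches of at most `size` items.
    it = iter(data.items())
    for _ in range(0, len(data), size):
        yield dict(islice(it, size))

def submit_migration(guest_name):
    # Placeholder: returns the id of a started migration job.
    print(f"submitting migration for {guest_name}")
    return f"job-{guest_name}"

def wait_for_job(job_id):
    # Placeholder: blocks until the given job has finished.
    print(f"waiting for {job_id}")

guests = {"vm101": {}, "vm102": {}, "ct200": {}, "vm103": {}, "ct201": {}}
parallel_jobs = 2

for batch in chunk_dict(guests, parallel_jobs):
    jobs = [submit_migration(name) for name in batch]  # fire off the whole batch
    for job_id in jobs:                                # then wait for the batch
        wait_for_job(job_id)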
@@ -116,11 +151,11 @@ class Balancing:
         try:
             logger.debug(f"Balancing: Starting to migrate guest {guest_name} of type VM.")
             job_id = proxmox_api.nodes(guest_node_current).qemu(guest_id).migrate().post(**migration_options)
-            self.get_rebalancing_job_status(proxmox_api, proxlb_data, guest_name, guest_node_current, job_id)
         except proxmoxer.core.ResourceException as proxmox_api_error:
             logger.critical(f"Balancing: Failed to migrate guest {guest_name} of type VM due to some Proxmox errors. Please check if resource is locked or similar.")
             logger.debug(f"Balancing: Failed to migrate guest {guest_name} of type VM due to some Proxmox errors: {proxmox_api_error}")
         logger.debug("Finished: exec_rebalancing_vm.")
+        return job_id

     def exec_rebalancing_ct(self, proxmox_api: any, proxlb_data: Dict[str, Any], guest_name: str) -> None:
         """
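For context on the returned `job_id`: the Proxmox migrate endpoint answers with a task identifier (a UPID string) rather than blocking until the migration finishes, and that identifier is what gets polled later. A rough sketch of the same call pattern with proxmoxer; the host, credentials, node names and guest IDs are placeholders:

from proxmoxer import ProxmoxAPI

# Placeholder connection details; adjust to your environment.
proxmox_api = ProxmoxAPI("pve01.example.com", user="root@pam", password="secret", verify_ssl=False)

# Start an online migration of VM 101 from pve01 to pve02.
# The POST returns a task id (UPID) instead of waiting for the migration to complete.
job_id = proxmox_api.nodes("pve01").qemu(101).migrate().post(target="pve02", online=1)
print(job_id)  # e.g. "UPID:pve01:...:qmigrate:101:root@pam:"

# The task can then be checked via the tasks endpoint, as done in get_rebalancing_job_status().
status = proxmox_api.nodes("pve01").tasks(job_id).status().get()
print(status["status"])  # "running" or "stopped"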
@@ -145,11 +180,11 @@ class Balancing:
         try:
             logger.debug(f"Balancing: Starting to migrate guest {guest_name} of type CT.")
             job_id = proxmox_api.nodes(guest_node_current).lxc(guest_id).migrate().post(target=guest_node_target, restart=1)
-            self.get_rebalancing_job_status(proxmox_api, proxlb_data, guest_name, guest_node_current, job_id)
         except proxmoxer.core.ResourceException as proxmox_api_error:
             logger.critical(f"Balancing: Failed to migrate guest {guest_name} of type CT due to some Proxmox errors. Please check if resource is locked or similar.")
             logger.debug(f"Balancing: Failed to migrate guest {guest_name} of type CT due to some Proxmox errors: {proxmox_api_error}")
         logger.debug("Finished: exec_rebalancing_ct.")
+        return job_id

     def get_rebalancing_job_status(self, proxmox_api: any, proxlb_data: Dict[str, Any], guest_name: str, guest_current_node: str, job_id: int, retry_counter: int = 1) -> bool:
         """
@@ -167,35 +202,32 @@
             bool: True if the job completed successfully, False otherwise.
         """
         logger.debug("Starting: get_rebalancing_job_status.")
-        # Parallel migrations can take a huge time and create a higher load, if not defined by an
-        # operator we will use a sequential mode by default
-        if not proxlb_data["meta"]["balancing"].get("parallel", False):
         job = proxmox_api.nodes(guest_current_node).tasks(job_id).status().get()

         # Watch job id until it finalizes
         if job["status"] == "running":
             # Do not hammer the API while
             # watching the job status
             time.sleep(10)
             retry_counter += 1

             # Run recursion until we hit the soft-limit of maximum migration time for a guest
             if retry_counter < proxlb_data["meta"]["balancing"].get("max_job_validation", 1800):
                 logger.debug(f"Balancing: Job ID {job_id} (guest: {guest_name}) for migration is still running... (Run: {retry_counter})")
                 self.get_rebalancing_job_status(proxmox_api, proxlb_data, guest_name, guest_current_node, job_id, retry_counter)
             else:
                 logger.warning(f"Balancing: Job ID {job_id} (guest: {guest_name}) for migration took too long. Please check manually.")
                 logger.debug("Finished: get_rebalancing_job_status.")
                 return False

         # Validate job output for errors when finished
         if job["status"] == "stopped":

             if job["exitstatus"] == "OK":
                 logger.debug(f"Balancing: Job ID {job_id} (guest: {guest_name}) was successfully.")
                 logger.debug("Finished: get_rebalancing_job_status.")
                 return True
             else:
                 logger.critical(f"Balancing: Job ID {job_id} (guest: {guest_name}) went into an error! Please check manually.")
                 logger.debug("Finished: get_rebalancing_job_status.")
                 return False
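The status check above is recursive, with `max_job_validation` acting as a soft limit on the number of 10-second polls. The same behavior can also be pictured as a plain loop; the following is only an illustrative sketch (the function name is made up here, while the 10-second interval and the 1800-poll ceiling mirror the values used above):

import time

def wait_for_task(proxmox_api, node, job_id, max_polls=1800, poll_interval=10):
    """Poll a Proxmox task until it stops, fails, or the poll limit is reached."""
    for attempt in range(1, max_polls + 1):
        job = proxmox_api.nodes(node).tasks(job_id).status().get()
        if job["status"] == "stopped":
            # Task finished; an exit status of "OK" means the migration succeeded.
            return job.get("exitstatus") == "OK"
        # Task still running: wait before asking the API again.
        time.sleep(poll_interval)
    # Too many polls: give up and report failure, mirroring the warning above.
    return False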