feature: Add ProxLB Server

Fixes: #217
This commit is contained in:
Florian Paul Azim Hoberg
2025-05-06 09:28:37 +02:00
parent 1e096e1aae
commit e88318b023
8 changed files with 379 additions and 4 deletions

View File

@@ -246,6 +246,11 @@ The following options can be set in the configuration file `proxlb.yaml`:
| | timeout | | 10 | `Int` | Timeout for the Proxmox API in sec. |
| | retries | | 1 | `Int` | How often a connection attempt to the defined API host should be performed. |
| | wait_time | | 1 | `Int` | How many seconds should be waited before performing another connection attempt to the API host. |
| `proxlb_api` | | | | | |
| | enable | | True | `Bool` | Enables the ProxLB API (required for DPM & auto-patching).|
| | listen_address | | 0.0.0.0 | `Str` | Defines the listener of the ProxLB API Server. (default: `127.0.0.1`)|
| | port | | 8008 | `Int` | Defines the listen port of the ProxLB API Server. (default: `8008`)|
| | allowed_api_keys | | ['RatpmrqUbmXqV7kmcoNu9w4y4ParWyAbYgky94b9', ...] | `List` | List of allowed API Tokens. (default: `[]`)|
| `proxmox_cluster` | | | | | |
| | maintenance_nodes | | ['virt66.example.com'] | `List` | A list of Proxmox nodes that are defined to be in a maintenance. |
| | ignore_nodes | | [] | `List` | A list of Proxmox nodes that are defined to be ignored. |
@@ -284,6 +289,11 @@ proxmox_api:
# retries: 1
# wait_time: 1
proxlb_api:
enable: True
listen_address: 0.0.0.0
port: 8008
proxmox_cluster:
maintenance_nodes: ['virt66.example.com']
ignore_nodes: []

View File

@@ -11,6 +11,14 @@ proxmox_api:
# retries: 1
# wait_time: 1
proxlb_api:
enable: True
listen_address: 0.0.0.0
port: 8008
allowed_api_keys:
- "RatpmrqUbmXqV7kmcoNu9w4y4ParWyAbYgky94b9"
- "fzeHoWKzFyinpNJHVjAx9PakAPhsVdffrTpFyEjC"
proxmox_cluster:
maintenance_nodes: ['virt66.example.com']
ignore_nodes: []

View File

@@ -17,6 +17,7 @@ from utils.logger import SystemdLogger
from utils.cli_parser import CliParser
from utils.config_parser import ConfigParser
from utils.proxmox_api import ProxmoxApi
from models.proxlb_api import ProxlbApi
from models.nodes import Nodes
from models.guests import Guests
from models.groups import Groups
@@ -50,6 +51,14 @@ def main():
# Overwrite password after creating the API object
proxlb_config["proxmox_api"]["pass"] = "********"
# ProxLB API Server
proxlb_api_content = {}
proxlb_api = ProxlbApi(proxlb_config)
proxlb_api.server(proxlb_config)
import time
time.sleep(5)
while True:
# Get all required objects from the Proxmox cluster
meta = {"meta": proxlb_config}

View File

@@ -306,7 +306,7 @@ class Calculations:
logger.warning(f"Guest '{guest_name}' has a specific relationship defined to node: {proxlb_data['guests'][guest_name]['node_relationship']} but this node name is not known in the cluster!")
else:
logger.info(f"Guest '{guest_name}' does not have any specific node relationships.")
logger.debug(f"Guest '{guest_name}' does not have any specific node relationships.")
logger.debug("Finished: val_node_relationship.")

View File

@@ -21,7 +21,9 @@ __copyright__ = "Copyright (C) 2025 Florian Paul Azim Hoberg (@gyptazy)"
__license__ = "GPL-3.0"
import json
from typing import Dict, Any
from utils.helper import Helper
from utils.logger import SystemdLogger
logger = SystemdLogger()
@@ -87,6 +89,24 @@ class Nodes:
if Nodes.set_node_maintenance(proxlb_config, node["node"]):
nodes["nodes"][node["node"]]["maintenance"] = True
# Evaluate guest count on node
guests_vm = [
guest for guest in proxmox_api.nodes(node["node"]).qemu.get()
if guest.get('status') == 'running'
]
guests_ct = [
guest for guest in proxmox_api.nodes(node["node"]).lxc.get()
if guest.get('status') == 'running'
]
guests_vm = len(guests_vm)
guests_ct = len(guests_ct)
nodes["nodes"][node["node"]]["guest_count"] = guests_vm + guests_ct
# Add debug log of node
logger.debug(f"Added node: {nodes['nodes'][node['node']]}.")
logger.debug("Finished: get_nodes.")
return nodes
@@ -107,12 +127,52 @@ class Nodes:
"""
logger.debug("Starting: set_node_maintenance.")
if proxlb_config.get("proxmox_cluster", None).get("maintenance_nodes", None) is not None:
if len(proxlb_config.get("proxmox_cluster", {}).get("maintenance_nodes", [])) > 0:
# Only validate if we have more than a single node in our cluster
if len(proxlb_config.get("proxmox_cluster", {}).get("maintenance_nodes", [])) > 0:
# Evaluate maintenance mode by config
logger.debug("Evaluate maintenance mode by config.")
if proxlb_config.get("proxmox_cluster", None).get("maintenance_nodes", None) is not None:
if node_name in proxlb_config.get("proxmox_cluster", {}).get("maintenance_nodes", []):
logger.warning(f"Node: {node_name} has been set to maintenance mode.")
return True
# Evaluate maintenance mode by ProxLB API
logger.debug("Evaluate maintenance mode by ProxLB API.")
if proxlb_config.get("proxlb_api", {}).get("enable", False):
logger.debug("ProxLB API is active.")
proxlb_api_listener = proxlb_config.get("proxlb_api", {}).get("listen_address", "127.0.0.1")
proxlb_api_port = proxlb_config.get("proxlb_api", {}).get("port", 8008)
try:
api_node_status = Helper.http_client_get(f"http://{proxlb_api_listener}:{proxlb_api_port}/nodes/{node_name}", show_errors=False)
api_node_status = json.loads(api_node_status)
except:
pass
# Set to maintenance when DPM or node patching is active and the
# node has not been released yet
if isinstance(api_node_status, dict):
logger.debug(f"Information for Node: {node_name} in ProxLB API available.")
if api_node_status.get("mode_dpm") or api_node_status.get("mode_patch"):
logger.debug(f"Node: {node_name} is defined for DPM or node-patching.")
if not api_node_status.get("processed"):
logger.debug(f"Node: {node_name} has not been processed. Setting to maintenance.")
if not api_node_status.get("release"):
logger.debug(f"Node: {node_name} has not been released. Waiting until node has been released. Setting to maintenance.")
return True
else:
logger.debug(f"Node: {node_name} has been released. Removing maintenance.")
else:
logger.debug(f"Node: {node_name} has been processed. Removing maintenance.")
else:
logger.debug(f"Node: {node_name} is not defined for DPM or node-patching.")
else:
logger.debug("ProxLB API is not active. Skipping ProxLB API validations.")
logger.debug("Finished: set_node_maintenance.")
@staticmethod

256
proxlb/models/proxlb_api.py Normal file
View File

@@ -0,0 +1,256 @@
"""
ProxLB API
"""
__author__ = "Florian Paul Azim Hoberg <gyptazy>"
__copyright__ = "Copyright (C) 2025 Florian Paul Azim Hoberg (@gyptazy)"
__license__ = "GPL-3.0"
import threading
try:
import uvicorn
UVICORN_PRESENT = True
except ImportError:
UVICORN_PRESENT = False
import utils.version
try:
from fastapi import FastAPI, HTTPException, Depends, Header, HTTPException, status
FASTAPI_PRESENT = True
except ImportError:
FASTAPI_PRESENT = False
from utils.helper import Helper
from utils.logger import SystemdLogger
from fastapi.encoders import jsonable_encoder
from pydantic import BaseModel
from typing import Dict, Any
logger = SystemdLogger()
if not UVICORN_PRESENT:
print("Error: The required library 'uvicorn' is not installed. ProxLB API cannot be used.")
if not FASTAPI_PRESENT:
print("Error: The required library 'fastapi' is not installed. ProxLB API cannot be used.")
class ProxlbApi:
"""
ProxLB API
"""
def __init__(self, proxlb_config):
"""
Initializes the API class with the ProxLB API object.
"""
logger.debug("Starting: ProxLB API.")
# ProxLB API Object
self.proxlb_api = FastAPI()
# ProxLB API Model
class Node(BaseModel):
name: str = ""
wol_mac: str = ""
guest_count: int = 0
mode_patch: bool = False
mode_dpm: bool = False
release: bool = False
processed: bool = False
# ProxLB API Data
nodes = {}
# ProxLB API Routes
@self.proxlb_api.get("/")
async def root():
return {"Application": "ProxLB", "version": f"{utils.version.__version__}", "status": "healthy"}
@self.proxlb_api.get("/status")
async def status():
self.api_exec_status()
return {"message": "status"}
@self.proxlb_api.get("/reboot")
async def reboot():
self.api_exec_reboot()
return {"message": "reboot"}
@self.proxlb_api.get("/shutdown")
async def shutdown():
self.api_exec_shutdown()
return {"message": "shutdown"}
@self.proxlb_api.get("/wol")
async def wol():
self.api_exec_wol()
return {"message": "wol"}
@self.proxlb_api.get("/update")
async def update():
self.api_exec_update()
return {"message": "update"}
@self.proxlb_api.get("/nodes", dependencies=[Depends(self.get_api_key_dependency(proxlb_config))])
async def get_node_items():
keys = []
for k, v in nodes.items():
keys.append(k)
return keys
@self.proxlb_api.get("/nodes/{item_id}", response_model=Node)
async def get_node_items(item_id: str):
if item_id not in nodes:
raise HTTPException(status_code=404, detail=f"Node with ID '{item_id}' not found")
return nodes[item_id]
@self.proxlb_api.patch("/nodes/{item_id}", response_model=Node, dependencies=[Depends(self.get_api_key_dependency(proxlb_config))])
async def update_node_items(item_id: str, item: Node):
stored_item_data = nodes[item_id]
stored_item_model = Node(**stored_item_data)
update_data = item.dict(exclude_unset=True)
updated_item = stored_item_model.copy(update=update_data)
nodes[item_id] = jsonable_encoder(updated_item)
return updated_item
@self.proxlb_api.post("/nodes/{item_id}", response_model=Node, dependencies=[Depends(self.get_api_key_dependency(proxlb_config))])
async def set_node_items(item_id: str, item: Node):
if item_id in nodes:
raise HTTPException(status_code=400, detail=f"Node: {item_id} already exists.")
nodes[item_id] = jsonable_encoder(item)
return item
logger.debug("Finalized: ProxLB API.")
# ProxLB API Server API Key validation
def get_api_key_dependency(self, proxlb_config: Dict[str, str]):
def get_api_key(auth_header: str = Header(..., alias="Authorization")):
auth_header = auth_header.replace("Bearer ", "")
if auth_header not in proxlb_config.get("proxlb_api", {}).get("allowed_api_keys", []):
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Error: Invalid or missing API Key.",
)
return get_api_key
# ProxLB API Context Path Actions
def api_exec_reboot(self):
logger.debug("Rebooting system.")
print("Rebooting system.")
def api_exec_shutdown(self):
logger.debug("Shutting down system.")
print("Shutting down system.")
def api_exec_wol(self):
logger.debug("Sending WOL signal.")
print("Sending WOL signal.")
def api_exec_update(self):
logger.debug("Updating system.")
print("Updating system.")
# ProxLB API Uvicorn Server
def server(self, proxlb_config):
"""
"""
def exec_api_server():
"""
"""
# Define a custom formatter to match ProxLB logging syntax
log_config = {
"version": 1,
"disable_existing_loggers": False,
"formatters": {
"default": {
"()": "uvicorn.logging.DefaultFormatter",
"fmt": "%(asctime)s - ProxLB API - INFO - %(message)s",
"datefmt": "%Y-%m-%d %H:%M:%S,100",
},
"access": {
"()": "uvicorn.logging.AccessFormatter",
"fmt": "%(asctime)s - ProxLB API - ACCESS - %(client_addr)s - \"%(request_line)s\" %(status_code)s",
"datefmt": "%Y-%m-%d %H:%M:%S,100",
},
},
"handlers": {
"default": {
"formatter": "default",
"class": "logging.StreamHandler",
"stream": "ext://sys.stdout",
},
"access": {
"formatter": "access",
"class": "logging.StreamHandler",
"stream": "ext://sys.stdout",
},
},
"loggers": {
"uvicorn": {"handlers": ["default"], "level": "INFO"},
"uvicorn.error": {"handlers": ["default"], "level": "INFO", "propagate": True},
"uvicorn.access": {"handlers": ["access"], "level": "INFO", "propagate": False},
},
}
# Run ProxLB API via Uvicorn with custom log formatter
proxlb_api_listener = proxlb_config.get("proxlb_api", {}).get("listen_address", "127.0.0.1")
proxlb_api_port = proxlb_config.get("proxlb_api", {}).get("port", 8008)
proxlb_api_log_verbosity = proxlb_config.get("service", {}).get("log_level", "info")
logger.debug(f"Starting ProxLB API Server on {proxlb_api_listener}:{proxlb_api_port}.")
uvicorn.run(self.proxlb_api, host=proxlb_api_listener, port=proxlb_api_port, log_level=proxlb_api_log_verbosity.lower(), log_config=log_config)
# Execute the Uvicorn in a threaded action to avoid blocking
if proxlb_config.get("proxlb_api", {}).get("enable", False):
logger.debug("ProxLB API Server is enabled. Starting API server...")
server_thread = threading.Thread(target=exec_api_server, daemon=True)
server_thread.start()
else:
logger.debug("ProxLB API Server is not enabled.")
### Example:
# INSERT
# curl -X POST "http://127.0.0.1:8008/nodes/virt01" -H "Content-Type: application/json" -H "Authorization: Bearer RatpmrqUbmXqV7kmcoNu9w4y4ParWyAbYgky94b9" -d '{
# "name": "virt01",
# "wol_mac": "virt01",
# "mode_patch": false,
# "mode_dpm": false,
# "released": false,
# "processed": false
# }'
# # GET
# curl -X GET "http://127.0.0.1:8008/status" \
# -H "Authorization: Bearer RatpmrqUbmXqV7kmcoNu9w4y4ParWyAbYgky94b9"
# curl -X POST "http://127.0.0.1:8008/nodes/virt01" -H "Content-Type: application/json" -H "Authorization: Bearer RatpmrqUbmXqV7kmcoNu9w4y4ParWyAbYgky94b9" -d '{
# "name": "virt01",
# "wol_mac": "virt01",
# "mode_patch": false,
# "mode_dpm": false,
# "released": false,
# "processed": false
# }'
# curl -X POST "http://127.0.0.1:8008/nodes/virt02" -H "Content-Type: application/json" -H "Authorization: Bearer RatpmrqUbmXqV7kmcoNu9w4y4ParWyAbYgky94b9" -d '{
# "name": "virt02",
# "wol_mac": "virt02",
# "mode_patch": false,
# "mode_dpm": false,
# "released": false,
# "processed": false
# }'
# curl -X POST "http://127.0.0.1:8008/nodes/virt03" -H "Content-Type: application/json" -H "Authorization: Bearer RatpmrqUbmXqV7kmcoNu9w4y4ParWyAbYgky94b9" -d '{
# "name": "virt03",
# "wol_mac": "virt03",
# "mode_patch": false,
# "mode_dpm": false,
# "released": false,
# "processed": false
# }'

View File

@@ -9,6 +9,7 @@ __license__ = "GPL-3.0"
import json
import urllib
import uuid
import sys
import time
@@ -162,3 +163,34 @@ class Helper:
print(json.dumps(filtered_data, indent=4))
logger.debug("Finished: print_json.")
@staticmethod
def http_client_get(uri: str, show_errors=True) -> str:
"""
Receives the content of a GET request from a given URI.
Parameters:
uri (str): The URI to get the content from.
Returns:
str: The response content.
"""
logger.debug("Starting: http_client_get.")
http_charset = "utf-8"
http_headers = {
"User-Agent": "ProxLB API client/1.0"
}
http_request = urllib.request.Request(uri, headers=http_headers, method="GET")
try:
logger.debug(f"Get http client information from {uri}.")
with urllib.request.urlopen(http_request) as response:
http_client_content = response.read().decode(http_charset)
return http_client_content
except urllib.error.HTTPError as e:
if show_errors:
logger.error(f"HTTP error: {e.code} - {e.reason}")
except urllib.error.URLError as e:
if show_errors:
logger.error(f"URL error: {e.reason}")
logger.debug("Finished: http_client_get.")

View File

@@ -53,7 +53,7 @@ class SystemdLogger:
# Create a singleton instance variable
instance = None
def __new__(cls, name: str = "ProxLB", level: str = logging.INFO) -> 'SystemdLogger':
def __new__(cls, name: str = "ProxLB App", level: str = logging.INFO) -> 'SystemdLogger':
"""
Creating a new systemd logger class based on a given logging name
and its logging level/verbosity.