Files
OpenIris-ESPIDF/tests/utils.py

79 lines
2.3 KiB
Python

import time
import serial.tools.list_ports
from tools.openiris_device import OpenIrisDevice
OPENIRIS_DEVICE = None
class OpenIrisDeviceManager:
def __init__(self):
self._device: OpenIrisDevice | None = None
self.stored_ports = []
self._current_port: str | None = None
def get_device(self, port: str | None = None, config=None) -> OpenIrisDevice:
"""
Returns the current OpenIrisDevice connection helper
if the port changed from the one given previously, it will attempt to reconnect
if no device exists, we will create one and try to connect
This helper is designed to be used within a session long fixture
"""
if not port and not self._device:
raise ValueError("No device connected yet, provide a port first")
# I'm not sure if I like this approach
# maybe I need to rethink this fixture
current_ports = get_current_ports()
new_port = get_new_port(self.stored_ports, current_ports)
if new_port is not None:
self.stored_ports = current_ports
if not port:
port = new_port
if port and port != self._current_port:
print(f"Port changed from {self._current_port} to {port}, reconnecting...")
self._current_port = port
if self._device:
self._device.disconnect()
self._device = None
self._device = OpenIrisDevice(port, False, False)
self._device.connect()
time.sleep(config.SWITCH_MODE_REBOOT_TIME)
return self._device
def has_command_failed(result) -> bool:
return "error" in result or result["results"][0]["result"]["status"] != "success"
def get_current_ports() -> list[str]:
return [port.name for port in serial.tools.list_ports.comports()]
def get_new_port(old_ports, new_ports) -> str:
if ports_diff := list(set(new_ports) - set(old_ports)):
return ports_diff[0]
return None
class DetectPortChange:
def __init__(self):
self.old_ports = []
self.new_ports = []
def __enter__(self, *args, **kwargs):
self.old_ports = get_current_ports()
return self
def __exit__(self, *args, **kwargs):
self.new_ports = get_current_ports()
def get_new_port(self):
return get_new_port(self.old_ports, self.new_ports)