mirror of
https://github.com/MrUnknownDE/OpenIris-ESPIDF.git
synced 2026-04-09 01:43:49 +02:00
79 lines
2.3 KiB
Python
79 lines
2.3 KiB
Python
import time
|
|
import serial.tools.list_ports
|
|
from tools.openiris_device import OpenIrisDevice
|
|
|
|
# Module-level placeholder for a device handle; it starts out unset (None).
# NOTE(review): nothing in the visible part of this file reads or assigns
# this name - confirm it is still used elsewhere before relying on it.
OPENIRIS_DEVICE = None
|
|
|
|
|
|
class OpenIrisDeviceManager:
    """Cache one OpenIrisDevice connection and reconnect when the port changes."""

    def __init__(self):
        # Currently connected device helper, or None before the first connect.
        self._device: OpenIrisDevice | None = None
        # Port names recorded the last time a newly appeared port was detected.
        self.stored_ports = []
        # Port the cached device was created for.
        self._current_port: str | None = None

    def get_device(self, port: str | None = None, config=None) -> OpenIrisDevice:
        """
        Return the current OpenIrisDevice connection helper.

        If the port differs from the one used previously, the existing device
        is disconnected and a new one is created and connected on that port.
        If no port is given, a newly appeared serial port (if any) is used
        instead. If no device exists yet, one is created and connected.

        This helper is designed to be used within a session-long fixture.

        Args:
            port: Serial port to connect to. May be omitted once a device
                has been created.
            config: Optional object exposing ``SWITCH_MODE_REBOOT_TIME``, the
                number of seconds to sleep after a (re)connect. When it is
                ``None`` the wait is skipped.

        Returns:
            The cached device helper.

        Raises:
            ValueError: If no port is given and no device exists yet.
        """
        if not port and not self._device:
            raise ValueError("No device connected yet, provide a port first")

        # NOTE (original author): not sure about this approach; the fixture
        # may need to be rethought.
        current_ports = get_current_ports()
        new_port = get_new_port(self.stored_ports, current_ports)
        if new_port is not None:
            # Only refresh the snapshot when something new showed up.
            self.stored_ports = current_ports

        if not port:
            # Fall back to the freshly detected port (may still be None).
            port = new_port

        if port and port != self._current_port:
            print(f"Port changed from {self._current_port} to {port}, reconnecting...")
            self._current_port = port

            if self._device:
                self._device.disconnect()
                self._device = None

            # The meaning of the two positional False flags is defined by
            # OpenIrisDevice, which is not visible in this file.
            self._device = OpenIrisDevice(port, False, False)
            self._device.connect()

            # `config` defaults to None; dereferencing it unconditionally
            # raised AttributeError *after* the device had already been
            # connected, leaving the manager half-updated. Skip the wait
            # when no config was supplied.
            if config is not None:
                time.sleep(config.SWITCH_MODE_REBOOT_TIME)

        return self._device
|
|
|
|
|
|
def has_command_failed(result) -> bool:
    """Tell whether a command response indicates failure.

    A response counts as failed when it contains an ``"error"`` entry, or
    when the status of its first result is anything other than
    ``"success"``.
    """
    # An explicit error marker short-circuits before the results are read.
    if "error" in result:
        return True

    first_status = result["results"][0]["result"]["status"]
    return first_status != "success"
|
|
|
|
|
|
def get_current_ports() -> list[str]:
    """Return the ``name`` of every serial port pyserial currently lists."""
    # NOTE(review): pyserial documents ``name`` as the short device name and
    # ``device`` as the full device path - confirm callers expect the short
    # form before passing these values on to open a port.
    detected = serial.tools.list_ports.comports()
    return [info.name for info in detected]
|
|
|
|
|
|
def get_new_port(old_ports, new_ports) -> str:
|
|
if ports_diff := list(set(new_ports) - set(old_ports)):
|
|
return ports_diff[0]
|
|
return None
|
|
|
|
|
|
class DetectPortChange:
    """Context manager that snapshots the serial ports around a block.

    The port list is captured on entry and again on exit; afterwards
    ``get_new_port()`` reports a port present only in the second snapshot.
    """

    def __init__(self):
        # Both snapshots start empty until the context is entered / exited.
        self.old_ports: list[str] = []
        self.new_ports: list[str] = []

    def __enter__(self, *args, **kwargs):
        """Record the ports present before the block runs and return self."""
        self.old_ports = get_current_ports()
        return self

    def __exit__(self, *args, **kwargs):
        """Record the ports present after the block has run."""
        # Nothing is returned, so exceptions from the block are not suppressed.
        self.new_ports = get_current_ports()

    def get_new_port(self):
        """Return a port found in the exit snapshot but not the entry one, else None."""
        before, after = self.old_ports, self.new_ports
        return get_new_port(before, after)
|