Initial support for a hardware test harness with pytest and UV

This commit is contained in:
Lorow
2025-11-23 17:15:26 +01:00
parent 7d2eedf5f9
commit 00c4fe66c4
12 changed files with 440 additions and 142 deletions

5
.gitignore vendored
View File

@@ -90,11 +90,12 @@ sdkconfig
# Local History for Visual Studio Code
.history/
tests/.env
# Built Visual Studio Code Extensions
*.vsix
### VisualStudioCode Patch ###
# Ignore all local history of files
.history
.ionide
.ionide
*\__pycache__

View File

@@ -6,6 +6,8 @@ readme = "README.md"
requires-python = ">=3.12"
dependencies = [
"pyserial>=3.5",
"pytest>=9.0.1",
"python-dotenv>=1.2.1",
]
[dependency-groups]
@@ -26,4 +28,9 @@ push = false
]
"sdkconfig" = [
'CONFIG_GENERAL_VERSION="{version}"',
]
]
[tool.pytest]
testpaths = [
"tests"
]

View File

@@ -1,16 +0,0 @@
# SPDX-FileCopyrightText: 2022 Espressif Systems (Shanghai) CO LTD
# SPDX-License-Identifier: CC0-1.0
import logging
import os
import pytest
from pytest_embedded_idf.dut import IdfDut
@pytest.mark.supported_targets
@pytest.mark.generic
def test_blink(dut: IdfDut) -> None:
# check and log bin size
binary_file = os.path.join(dut.app.binary_path, "blink.bin")
bin_size = os.path.getsize(binary_file)
logging.info("blink_bin_size : {}KB".format(bin_size // 1024))

3
tests/.env.example Normal file
View File

@@ -0,0 +1,3 @@
WIFI_SSID=
WIFI_PASS=
SWITCH_MODE_REBOOT_TIME=10

0
tests/__init__.py Normal file
View File

165
tests/conftest.py Normal file
View File

@@ -0,0 +1,165 @@
import dotenv
import pytest
import time
from tests.utils import (
OpenIrisDeviceManager,
has_command_failed,
get_current_ports,
get_new_port,
)
board_capabilities = {
"esp_eye": ["wired", "wireless"],
"esp32AIThinker": ["wireless"],
"esp32Cam": ["wireless"],
"esp32M5Stack": ["wireless"],
"facefocusvr_eye_L": ["wired", "measure_current"],
"facefocusvr_eye_R": ["wired", "measure_current"],
"facefocusvr_face": ["wired", "measure_current"],
"project_babble": ["wireless", "wired"],
"seed_studio": ["wireless", "wired"],
"wrooms3": ["wireless", "wired"],
"wrooms3QIO": ["wireless", "wired"],
"wrover": ["wireless", "wired"],
}
def pytest_addoption(parser):
parser.addoption("--board", action="store")
parser.addoption(
"--connection",
action="store",
help="Defines how to connect to the given board, wireless by ip/mdns or wired by com/cdc",
)
def pytest_configure(config):
config.addinivalue_line(
"markers", "has_capability(cap): skip if the board does not have the capability"
)
@pytest.fixture(autouse=True)
def check_capability_marker(request, board_lacks_capability):
"""
Autorun on each test, checks if the board we started with, has the required capability
This lets us skip tests that are impossible to run on some boards - like for example:
It's impossible to run wired tests on a wireless board
It's impossible to run tests for measuring current on boards without this feature
"""
if marker := request.node.get_closest_marker("has_capability"):
if not len(marker.args):
raise ValueError(
"has_capability marker must be provided with a capability to check"
)
required_capability = marker.args[0]
if board_lacks_capability(required_capability):
pytest.skip(f"Board does not have capability {required_capability}")
@pytest.fixture(scope="session", autouse=True)
def board_name(request):
board_name = request.config.getoption("--board")
if not board_name:
raise ValueError("No board defined")
yield board_name
@pytest.fixture()
def board_lacks_capability(board_name):
def func(capability: str):
if board_name:
if board_name not in board_capabilities:
raise ValueError(f"Unknown board {board_name}")
return capability not in board_capabilities[board_name]
return True
return func
@pytest.fixture(scope="session", autouse=True)
def board_connection(request):
"""
Grabs the specified connection connection method, to be used ONLY for the initial connection. Everything after it HAS to be handled via Device Manager.
Ports WILL change throughout the tests, device manager can keep track of that and reconnect the board as needed.
"""
board_connection = request.config.getoption("--connection")
if not board_connection:
raise ValueError("No connection method defined")
yield board_connection
@pytest.fixture(scope="session")
def config():
config = dotenv.dotenv_values()
yield config
@pytest.fixture(scope="session")
def openiris_device_manager(board_connection, config):
manager = OpenIrisDeviceManager()
manager.get_device(board_connection, config)
yield manager
if manager._device:
manager._device.disconnect()
@pytest.fixture()
def get_openiris_device(openiris_device_manager):
def func():
return openiris_device_manager.get_device()
return func
@pytest.fixture()
def ensure_board_in_mode(openiris_device_manager, config):
"""
Given the OpenIrisDevice manager, grabs the current device and ensures it's in the required mode
if not, sends the command to switch and attempts reconnection if necessary, returning the device back
"""
supported_modes = ["wifi", "uvc"]
def func(mode, device):
if mode not in supported_modes:
raise ValueError(f"{mode} is not a supported mode")
command_result = device.send_command("get_device_mode")
if has_command_failed(command_result):
raise ValueError(f"Failed to get device mode, error: {command_result}")
current_mode = command_result["results"][0]["result"]["data"]["mode"].lower()
if mode == current_mode:
return device
old_ports = get_current_ports()
command_result = device.send_command("switch_mode", {"mode": mode})
if has_command_failed(command_result):
raise ValueError("Failed to switch mode, rerun the tests")
print("Rebooting the board after changing mode")
device.send_command("restart_device")
sleep_timeout = int(config["SWITCH_MODE_REBOOT_TIME"])
print(
f"Sleeping for {sleep_timeout} seconds to allow the device to switch modes and boot up"
)
time.sleep(sleep_timeout)
new_ports = get_current_ports()
new_device = openiris_device_manager.get_device(
get_new_port(old_ports, new_ports), config
)
return new_device
return func

View File

@@ -0,0 +1,11 @@
from tests.utils import has_command_failed
import pytest
@pytest.mark.has_capability("wired")
def test_ping_wired(get_openiris_device, ensure_board_in_mode):
device = get_openiris_device()
device = ensure_board_in_mode("wifi", device)
command_result = device.send_command("ping")
assert not has_command_failed(command_result)

49
tests/utils.py Normal file
View File

@@ -0,0 +1,49 @@
import time
import serial.tools.list_ports
from tools.openiris_device import OpenIrisDevice
OPENIRIS_DEVICE = None
class OpenIrisDeviceManager:
def __init__(self):
self._device: OpenIrisDevice | None = None
self._current_port: str | None = None
def get_device(self, port: str | None = None, config=None) -> OpenIrisDevice:
"""
Returns the current OpenIrisDevice connection helper
if the port changed from the one given previously, it will attempt to reconnect
if no device exists, we will create one and try to connect
This helper is designed to be used within a session long fixture
"""
if not port and not self._device:
raise ValueError("No device connected yet, provide a port first")
if port and port != self._current_port:
print(f"Port changed from {self._current_port} to {port}, reconnecting...")
self._current_port = port
if self._device:
self._device.disconnect()
self._device = None
self._device = OpenIrisDevice(port, False, False)
self._device.connect()
time.sleep(int(config["SWITCH_MODE_REBOOT_TIME"]))
return self._device
def has_command_failed(result) -> bool:
return "error" in result or result["results"][0]["result"]["status"] != "success"
def get_current_ports() -> list[str]:
return [port.name for port in serial.tools.list_ports.comports()]
def get_new_port(old_ports, new_ports) -> str:
return list(set(new_ports) - set(old_ports))[0]

0
tools/__init__.py Normal file
View File

129
tools/openiris_device.py Normal file
View File

@@ -0,0 +1,129 @@
import time
import json
import serial
class OpenIrisDevice:
def __init__(self, port: str, debug: bool, debug_commands: bool):
self.port = port
self.debug = debug
self.debug_commands = debug_commands
self.connection: serial.Serial | None = None
self.connected = False
def __enter__(self):
self.connected = self.__connect()
return self
def __exit__(self, type, value, traceback):
self.__disconnect()
self.connected = False
def connect(self):
self.connected = self.__connect()
def disconnect(self):
self.__disconnect()
self.connected = False
def __connect(self) -> bool:
print(f"📡 Connecting directly to {self.port}...")
try:
self.connection = serial.Serial(
port=self.port, baudrate=115200, timeout=1, write_timeout=1
)
self.connection.dtr = False
self.connection.rts = False
print(f"✅ Connected to the device on {self.port}")
return True
except Exception as e:
print(f"❌ Failed to connect to {self.port}: {e}")
return False
def __disconnect(self):
if self.connection and self.connection.is_open:
self.connection.close()
print(f"🔌 Disconnected from {self.port}")
def __check_if_response_is_complete(self, response) -> dict | None:
try:
return json.loads(response)
except ValueError:
return None
def __read_response(self, timeout: int | None = None) -> dict | None:
# we can try and retrieve the response now.
# it should be more or less immediate, but some commands may take longer
# so we gotta timeout
timeout = timeout if timeout is not None else 15
start_time = time.time()
response_buffer = ""
while time.time() - start_time < timeout:
if self.connection.in_waiting:
packet = self.connection.read_all().decode("utf-8", errors="ignore")
if self.debug and packet.strip():
print(f"Received: {packet}")
print("-" * 10)
print(f"Current buffer: {response_buffer}")
print("-" * 10)
# we can't rely on new lines to detect if we're done
# nor can we assume that we're always gonna get valid json response
# but we can assume that if we're to get a valid response, it's gonna be json
# so we can start actually building the buffer only when
# some part of the packet starts with "{", and start building from there
# we can assume that no further data will be returned, so we can validate
# right after receiving the last packet
if (not response_buffer and "{" in packet) or response_buffer:
# assume we just started building the buffer and we've received the first packet
# alternative approach in case this doesn't work - we're always sending a valid json
# so we can start building the buffer from the first packet and keep trying to find the
# starting and ending brackets, extract that part and validate, if the message is complete, return
if not response_buffer:
starting_idx = packet.find("{")
response_buffer = packet[starting_idx:]
else:
response_buffer += packet
# and once we get something, we can validate if it's a valid json
if parsed_response := self.__check_if_response_is_complete(
response_buffer
):
return parsed_response
else:
time.sleep(0.1)
return None
def is_connected(self) -> bool:
return self.connected
def send_command(
self, command: str, params: dict | None = None, timeout: int | None = None
) -> dict:
if not self.connection or not self.connection.is_open:
return {"error": "Device Not Connected"}
cmd_obj = {"commands": [{"command": command}]}
if params:
cmd_obj["commands"][0]["data"] = params
# we're expecting the json string to end with a new line
# to signify we've finished sending the command
cmd_str = json.dumps(cmd_obj) + "\n"
try:
# clean it out first, just to be sure we're starting fresh
self.connection.reset_input_buffer()
if self.debug or self.debug_commands:
print(f"Sending command: {cmd_str}")
self.connection.write(cmd_str.encode())
response = self.__read_response(timeout)
if self.debug:
print(f"Received response: {response}")
return response or {"error": "Command timeout"}
except Exception as e:
return {"error": f"Communication error: {e}"}

View File

@@ -5,14 +5,14 @@
# ///
import json
import time
import argparse
import sys
import serial
import string
from dataclasses import dataclass
from openiris_device import OpenIrisDevice
def is_back(choice: str):
return choice.lower() in ["back", "b", "exit"]
@@ -79,124 +79,6 @@ class Menu(SubMenu):
super().__init__(title, context, parent_menu)
class OpenIrisDevice:
def __init__(self, port: str, debug: bool, debug_commands: bool):
self.port = port
self.debug = debug
self.debug_commands = debug_commands
self.connection: serial.Serial | None = None
self.connected = False
def __enter__(self):
self.connected = self.__connect()
return self
def __exit__(self, type, value, traceback):
self.__disconnect()
def __connect(self) -> bool:
print(f"📡 Connecting directly to {self.port}...")
try:
self.connection = serial.Serial(
port=self.port, baudrate=115200, timeout=1, write_timeout=1
)
self.connection.dtr = False
self.connection.rts = False
print(f"✅ Connected to the device on {self.port}")
return True
except Exception as e:
print(f"❌ Failed to connect to {self.port}: {e}")
return False
def __disconnect(self):
if self.connection and self.connection.is_open:
self.connection.close()
print(f"🔌 Disconnected from {self.port}")
def __check_if_response_is_complete(self, response) -> dict | None:
try:
return json.loads(response)
except ValueError:
return None
def __read_response(self, timeout: int | None = None) -> dict | None:
# we can try and retrieve the response now.
# it should be more or less immediate, but some commands may take longer
# so we gotta timeout
timeout = timeout if timeout is not None else 15
start_time = time.time()
response_buffer = ""
while time.time() - start_time < timeout:
if self.connection.in_waiting:
packet = self.connection.read_all().decode("utf-8", errors="ignore")
if self.debug and packet.strip():
print(f"Received: {packet}")
print("-" * 10)
print(f"Current buffer: {response_buffer}")
print("-" * 10)
# we can't rely on new lines to detect if we're done
# nor can we assume that we're always gonna get valid json response
# but we can assume that if we're to get a valid response, it's gonna be json
# so we can start actually building the buffer only when
# some part of the packet starts with "{", and start building from there
# we can assume that no further data will be returned, so we can validate
# right after receiving the last packet
if (not response_buffer and "{" in packet) or response_buffer:
# assume we just started building the buffer and we've received the first packet
# alternative approach in case this doesn't work - we're always sending a valid json
# so we can start building the buffer from the first packet and keep trying to find the
# starting and ending brackets, extract that part and validate, if the message is complete, return
if not response_buffer:
starting_idx = packet.find("{")
response_buffer = packet[starting_idx:]
else:
response_buffer += packet
# and once we get something, we can validate if it's a valid json
if parsed_response := self.__check_if_response_is_complete(
response_buffer
):
return parsed_response
else:
time.sleep(0.1)
return None
def is_connected(self) -> bool:
return self.connected
def send_command(
self, command: str, params: dict | None = None, timeout: int | None = None
) -> dict:
if not self.connection or not self.connection.is_open:
return {"error": "Device Not Connected"}
cmd_obj = {"commands": [{"command": command}]}
if params:
cmd_obj["commands"][0]["data"] = params
# we're expecting the json string to end with a new line
# to signify we've finished sending the command
cmd_str = json.dumps(cmd_obj) + "\n"
try:
# clean it out first, just to be sure we're starting fresh
self.connection.reset_input_buffer()
if self.debug or self.debug_commands:
print(f"Sending command: {cmd_str}")
self.connection.write(cmd_str.encode())
response = self.__read_response(timeout)
if self.debug:
print(f"Received response: {response}")
return response or {"error": "Command timeout"}
except Exception as e:
return {"error": f"Communication error: {e}"}
@dataclass
class WiFiNetwork:
ssid: str
@@ -635,7 +517,7 @@ def automatic_wifi_configuration(
ip = None
last_status = None
while time.time() - start < timeout_s:
status = get_wifi_status(device).get("wifi_status", {})
status = get_wifi_status(device).get("wifi_status") or {}
last_status = status
ip = status.get("ip_address")
if ip and ip not in ("0.0.0.0", "", None):
@@ -663,7 +545,7 @@ def attempt_wifi_connection(device: OpenIrisDevice, *args, **kwargs):
def check_wifi_status(device: OpenIrisDevice, *args, **kwargs):
status = get_wifi_status(device).get("wifi_status")
status = get_wifi_status(device).get("wifi_status") or {}
print(f"📶 WiFi Status: {status.get('status', 'Unknown')}")
if ip_address := status.get("ip_address"):
print(f"🌐 IP Address: {ip_address}")

69
uv.lock generated
View File

@@ -8,6 +8,8 @@ version = "0.1.0"
source = { virtual = "." }
dependencies = [
{ name = "pyserial" },
{ name = "pytest" },
{ name = "python-dotenv" },
]
[package.dev-dependencies]
@@ -16,7 +18,11 @@ dev = [
]
[package.metadata]
requires-dist = [{ name = "pyserial", specifier = ">=3.5" }]
requires-dist = [
{ name = "pyserial", specifier = ">=3.5" },
{ name = "pytest", specifier = ">=9.0.1" },
{ name = "python-dotenv", specifier = ">=1.2.1" },
]
[package.metadata.requires-dev]
dev = [{ name = "bumpver", specifier = ">=2025.1131" }]
@@ -57,6 +63,15 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/d1/d6/3965ed04c63042e047cb6a3e6ed1a63a35087b6a609aa3a15ed8ac56c221/colorama-0.4.6-py2.py3-none-any.whl", hash = "sha256:4f1d9991f5acc0ca119f9d443620b77f9d6b33703e51011c16baf57afb285fc6", size = 25335, upload-time = "2022-10-25T02:36:20.889Z" },
]
[[package]]
name = "iniconfig"
version = "2.3.0"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/72/34/14ca021ce8e5dfedc35312d08ba8bf51fdd999c576889fc2c24cb97f4f10/iniconfig-2.3.0.tar.gz", hash = "sha256:c76315c77db068650d49c5b56314774a7804df16fee4402c1f19d6d15d8c4730", size = 20503, upload-time = "2025-10-18T21:55:43.219Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/cb/b1/3846dd7f199d53cb17f49cba7e651e9ce294d8497c8c150530ed11865bb8/iniconfig-2.3.0-py3-none-any.whl", hash = "sha256:f631c04d2c48c52b84d0d0549c99ff3859c98df65b3101406327ecc7d53fbf12", size = 7484, upload-time = "2025-10-18T21:55:41.639Z" },
]
[[package]]
name = "lexid"
version = "2021.1006"
@@ -66,6 +81,33 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/cf/e3/35764404a4b7e2021be1f88f42264c2e92e0c4720273559a62461ce64a47/lexid-2021.1006-py2.py3-none-any.whl", hash = "sha256:5526bb5606fd74c7add23320da5f02805bddd7c77916f2dc1943e6bada8605ed", size = 7587, upload-time = "2021-04-02T20:18:33.129Z" },
]
[[package]]
name = "packaging"
version = "25.0"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/a1/d4/1fc4078c65507b51b96ca8f8c3ba19e6a61c8253c72794544580a7b6c24d/packaging-25.0.tar.gz", hash = "sha256:d443872c98d677bf60f6a1f2f8c1cb748e8fe762d2bf9d3148b5599295b0fc4f", size = 165727, upload-time = "2025-04-19T11:48:59.673Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/20/12/38679034af332785aac8774540895e234f4d07f7545804097de4b666afd8/packaging-25.0-py3-none-any.whl", hash = "sha256:29572ef2b1f17581046b3a2227d5c611fb25ec70ca1ba8554b24b0e69331a484", size = 66469, upload-time = "2025-04-19T11:48:57.875Z" },
]
[[package]]
name = "pluggy"
version = "1.6.0"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/f9/e2/3e91f31a7d2b083fe6ef3fa267035b518369d9511ffab804f839851d2779/pluggy-1.6.0.tar.gz", hash = "sha256:7dcc130b76258d33b90f61b658791dede3486c3e6bfb003ee5c9bfb396dd22f3", size = 69412, upload-time = "2025-05-15T12:30:07.975Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/54/20/4d324d65cc6d9205fabedc306948156824eb9f0ee1633355a8f7ec5c66bf/pluggy-1.6.0-py3-none-any.whl", hash = "sha256:e920276dd6813095e9377c0bc5566d94c932c33b27a3e3945d8389c374dd4746", size = 20538, upload-time = "2025-05-15T12:30:06.134Z" },
]
[[package]]
name = "pygments"
version = "2.19.2"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/b0/77/a5b8c569bf593b0140bde72ea885a803b82086995367bf2037de0159d924/pygments-2.19.2.tar.gz", hash = "sha256:636cb2477cec7f8952536970bc533bc43743542f70392ae026374600add5b887", size = 4968631, upload-time = "2025-06-21T13:39:12.283Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/c7/21/705964c7812476f378728bdf590ca4b771ec72385c533964653c68e86bdc/pygments-2.19.2-py3-none-any.whl", hash = "sha256:86540386c03d588bb81d44bc3928634ff26449851e99741617ecb9037ee5ec0b", size = 1225217, upload-time = "2025-06-21T13:39:07.939Z" },
]
[[package]]
name = "pyserial"
version = "3.5"
@@ -75,6 +117,31 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/07/bc/587a445451b253b285629263eb51c2d8e9bcea4fc97826266d186f96f558/pyserial-3.5-py2.py3-none-any.whl", hash = "sha256:c4451db6ba391ca6ca299fb3ec7bae67a5c55dde170964c7a14ceefec02f2cf0", size = 90585, upload-time = "2020-11-23T03:59:13.41Z" },
]
[[package]]
name = "pytest"
version = "9.0.1"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "colorama", marker = "sys_platform == 'win32'" },
{ name = "iniconfig" },
{ name = "packaging" },
{ name = "pluggy" },
{ name = "pygments" },
]
sdist = { url = "https://files.pythonhosted.org/packages/07/56/f013048ac4bc4c1d9be45afd4ab209ea62822fb1598f40687e6bf45dcea4/pytest-9.0.1.tar.gz", hash = "sha256:3e9c069ea73583e255c3b21cf46b8d3c56f6e3a1a8f6da94ccb0fcf57b9d73c8", size = 1564125, upload-time = "2025-11-12T13:05:09.333Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/0b/8b/6300fb80f858cda1c51ffa17075df5d846757081d11ab4aa35cef9e6258b/pytest-9.0.1-py3-none-any.whl", hash = "sha256:67be0030d194df2dfa7b556f2e56fb3c3315bd5c8822c6951162b92b32ce7dad", size = 373668, upload-time = "2025-11-12T13:05:07.379Z" },
]
[[package]]
name = "python-dotenv"
version = "1.2.1"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/f0/26/19cadc79a718c5edbec86fd4919a6b6d3f681039a2f6d66d14be94e75fb9/python_dotenv-1.2.1.tar.gz", hash = "sha256:42667e897e16ab0d66954af0e60a9caa94f0fd4ecf3aaf6d2d260eec1aa36ad6", size = 44221, upload-time = "2025-10-26T15:12:10.434Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/14/1b/a298b06749107c305e1fe0f814c6c74aea7b2f1e10989cb30f544a1b3253/python_dotenv-1.2.1-py3-none-any.whl", hash = "sha256:b81ee9561e9ca4004139c6cbba3a238c32b03e4894671e181b671e8cb8425d61", size = 21230, upload-time = "2025-10-26T15:12:09.109Z" },
]
[[package]]
name = "toml"
version = "0.10.2"