Fix json parsing in openiris_device in cases where logs got stiched together, fix tests not waiting long enough after reboot, add board cleanup step after test session, fix leftover prints in networks representation breaking json output

This commit is contained in:
Lorow
2025-12-07 15:32:35 +01:00
parent abd10fc61a
commit 9ac95a2a76
5 changed files with 85 additions and 32 deletions

View File

@@ -331,14 +331,11 @@ public:
{ {
for (auto i = 0; i < this->networks.size() - 1; i++) for (auto i = 0; i < this->networks.size() - 1; i++)
{ {
printf("we're at %d while networks size is %d ", i, this->networks.size() - 2);
WifiConfigRepresentation += Helpers::format_string("%s, ", this->networks[i].toRepresentation().c_str()); WifiConfigRepresentation += Helpers::format_string("%s, ", this->networks[i].toRepresentation().c_str());
} }
} }
WifiConfigRepresentation += Helpers::format_string("%s", this->networks[networks.size() - 1].toRepresentation().c_str()); WifiConfigRepresentation += Helpers::format_string("%s", this->networks[networks.size() - 1].toRepresentation().c_str());
printf(WifiConfigRepresentation.c_str());
printf("\n");
} }
return Helpers::format_string( return Helpers::format_string(

View File

@@ -1,3 +1,4 @@
from dataclasses import dataclass
import dotenv import dotenv
import pytest import pytest
import time import time
@@ -115,9 +116,32 @@ def board_connection(request):
yield board_connection yield board_connection
@dataclass
class TestConfig:
WIFI_SSID: str
WIFI_PASS: str
SWITCH_MODE_REBOOT_TIME: int
WIFI_CONNECTION_TIMEOUT: int
INVALID_WIFI_CONNECTION_TIMEOUT: int
def __init__(
self,
WIFI_SSID: str,
WIFI_PASS: str,
SWITCH_MODE_REBOOT_TIME: int,
WIFI_CONNECTION_TIMEOUT: int,
INVALID_WIFI_CONNECTION_TIMEOUT: int,
):
self.WIFI_SSID = WIFI_SSID
self.WIFI_PASS = WIFI_PASS
self.SWITCH_MODE_REBOOT_TIME = int(SWITCH_MODE_REBOOT_TIME)
self.WIFI_CONNECTION_TIMEOUT = int(WIFI_CONNECTION_TIMEOUT)
self.INVALID_WIFI_CONNECTION_TIMEOUT = int(INVALID_WIFI_CONNECTION_TIMEOUT)
@pytest.fixture(scope="session") @pytest.fixture(scope="session")
def config(): def config():
config = dotenv.dotenv_values() config = TestConfig(**dotenv.dotenv_values())
yield config yield config
@@ -168,7 +192,7 @@ def ensure_board_in_mode(openiris_device_manager, config):
print("Rebooting the board after changing mode") print("Rebooting the board after changing mode")
device.send_command("restart_device") device.send_command("restart_device")
sleep_timeout = int(config["SWITCH_MODE_REBOOT_TIME"]) sleep_timeout = int(config.SWITCH_MODE_REBOOT_TIME)
print( print(
f"Sleeping for {sleep_timeout} seconds to allow the device to switch modes and boot up" f"Sleeping for {sleep_timeout} seconds to allow the device to switch modes and boot up"
) )
@@ -181,3 +205,13 @@ def ensure_board_in_mode(openiris_device_manager, config):
return new_device return new_device
return func return func
@pytest.fixture(scope="session", autouse=True)
def after_session_cleanup(openiris_device_manager, config):
yield
print("Cleanup: Resetting the config and restarting device")
device = openiris_device_manager.get_device(config=config)
device.send_command("reset_config", {"section": "all"})
device.send_command("restart_device")

View File

@@ -24,18 +24,16 @@ def test_ping_wired(get_openiris_device):
@pytest.mark.has_capability("wired") @pytest.mark.has_capability("wired")
@pytest.mark.has_capability("wireless") @pytest.mark.has_capability("wireless")
def test_changing_mode_to_wired(get_openiris_device, ensure_board_in_mode): def test_changing_mode_to_wired(get_openiris_device, ensure_board_in_mode, config):
device = get_openiris_device() device = get_openiris_device()
# let's make sure we're in the wireless mode first, if we're going to try changing it # let's make sure we're in the wireless mode first, if we're going to try changing it
device = ensure_board_in_mode("wifi", device) device = ensure_board_in_mode("wifi", device)
command_result = device.send_command("switch_mode", {"mode": "uvc"})
assert not has_command_failed(command_result)
# to avoid any issues, let's restart the board
with DetectPortChange() as port_selector: with DetectPortChange() as port_selector:
command_result = device.send_command("switch_mode", {"mode": "uvc"})
assert not has_command_failed(command_result)
device.send_command("restart_device") device.send_command("restart_device")
time.sleep(3) time.sleep(config.SWITCH_MODE_REBOOT_TIME)
# and since we've changed the ports # and since we've changed the ports
device = get_openiris_device(port_selector.get_new_port()) device = get_openiris_device(port_selector.get_new_port())
@@ -45,7 +43,7 @@ def test_changing_mode_to_wired(get_openiris_device, ensure_board_in_mode):
assert not has_command_failed(result) assert not has_command_failed(result)
def test_changing_mode_same_mode(get_openiris_device, ensure_board_in_mode): def test_changing_mode_same_mode(get_openiris_device):
device = get_openiris_device() device = get_openiris_device()
result = device.send_command("get_device_mode") result = device.send_command("get_device_mode")
current_mode = result["results"][0]["result"]["data"]["mode"].lower() current_mode = result["results"][0]["result"]["data"]["mode"].lower()
@@ -63,7 +61,7 @@ def test_changing_mode_invalid_mode(get_openiris_device):
assert has_command_failed(command_result) assert has_command_failed(command_result)
def test_setting_mdns_name(get_openiris_device, ensure_board_in_mode): def test_setting_mdns_name(get_openiris_device, ensure_board_in_mode, config):
def check_mdns_name(name: str): def check_mdns_name(name: str):
command_result = device.send_command("get_mdns_name") command_result = device.send_command("get_mdns_name")
assert not has_command_failed(command_result) assert not has_command_failed(command_result)
@@ -84,7 +82,7 @@ def test_setting_mdns_name(get_openiris_device, ensure_board_in_mode):
device.send_command("restart_device") device.send_command("restart_device")
# let the board boot, wait till it connects # let the board boot, wait till it connects
time.sleep(3) time.sleep(config.SWITCH_MODE_REBOOT_TIME)
check_mdns_name(second_name) check_mdns_name(second_name)
@@ -97,7 +95,7 @@ def test_setting_mdns_name_invalid_payload(get_openiris_device, payload):
@pytest.mark.has_capability("wired") @pytest.mark.has_capability("wired")
@pytest.mark.has_capability("wireless") @pytest.mark.has_capability("wireless")
def test_reboot_command(get_openiris_device, ensure_board_in_mode): def test_reboot_command(get_openiris_device, ensure_board_in_mode, config):
device = ensure_board_in_mode("wifi", get_openiris_device()) device = ensure_board_in_mode("wifi", get_openiris_device())
command_result = device.send_command("switch_mode", {"mode": "uvc"}) command_result = device.send_command("switch_mode", {"mode": "uvc"})
@@ -108,9 +106,9 @@ def test_reboot_command(get_openiris_device, ensure_board_in_mode):
# which might be a little overkill kill and won't work on boards not supporting both modes # which might be a little overkill kill and won't work on boards not supporting both modes
with DetectPortChange() as port_selector: with DetectPortChange() as port_selector:
device.send_command("restart_device") device.send_command("restart_device")
time.sleep(3) time.sleep(config.SWITCH_MODE_REBOOT_TIME)
assert len(port_selector.get_new_port()) assert port_selector.get_new_port()
def test_get_serial(get_openiris_device): def test_get_serial(get_openiris_device):
@@ -203,8 +201,20 @@ def test_check_wifi_status(get_openiris_device, ensure_board_in_mode):
@pytest.mark.has_capability("wireless") @pytest.mark.has_capability("wireless")
def test_scan_networks(get_openiris_device, ensure_board_in_mode): def test_scan_networks(get_openiris_device, ensure_board_in_mode, config):
device = ensure_board_in_mode("wifi", get_openiris_device()) # this test might run after some tests that affect the network on the device
# which might prevent us from scanning and thus make the test fail, so we reset the config
device = get_openiris_device()
reset_command = device.send_command("reset_config", {"section": "all"})
assert not has_command_failed(reset_command)
with DetectPortChange() as port_selector:
device.send_command("restart_device")
time.sleep(config.SWITCH_MODE_REBOOT_TIME)
device = ensure_board_in_mode(
"wifi", get_openiris_device(port_selector.get_new_port())
)
command_result = device.send_command("scan_networks") command_result = device.send_command("scan_networks")
assert not has_command_failed(command_result) assert not has_command_failed(command_result)
assert len(command_result["results"][0]["result"]["data"]["networks"]) != 0 assert len(command_result["results"][0]["result"]["data"]["networks"]) != 0
@@ -223,7 +233,7 @@ def test_reset_config_invalid_payload(get_openiris_device):
assert has_command_failed(reset_command) assert has_command_failed(reset_command)
def test_reset_config(get_openiris_device): def test_reset_config(get_openiris_device, config):
device = get_openiris_device() device = get_openiris_device()
command_result = device.send_command("set_mdns", {"hostname": "somedifferentname"}) command_result = device.send_command("set_mdns", {"hostname": "somedifferentname"})
assert not has_command_failed(command_result) assert not has_command_failed(command_result)
@@ -237,7 +247,7 @@ def test_reset_config(get_openiris_device):
# since the config was reset, but the data will still be held in memory, we need to reboot the device # since the config was reset, but the data will still be held in memory, we need to reboot the device
with DetectPortChange(): with DetectPortChange():
device.send_command("restart_device") device.send_command("restart_device")
time.sleep(3) time.sleep(config.SWITCH_MODE_REBOOT_TIME)
new_config = device.send_command("get_config") new_config = device.send_command("get_config")
assert not has_command_failed(new_config) assert not has_command_failed(new_config)
@@ -254,14 +264,14 @@ def test_set_wifi(get_openiris_device, ensure_board_in_mode, config):
with DetectPortChange(): with DetectPortChange():
device.send_command("restart_device") device.send_command("restart_device")
time.sleep(3) time.sleep(config.SWITCH_MODE_REBOOT_TIME)
# now that the config is clear, let's try setting the wifi # now that the config is clear, let's try setting the wifi
device = ensure_board_in_mode("wifi", device) device = ensure_board_in_mode("wifi", device)
params = { params = {
"name": "main", "name": "main",
"ssid": config["WIFI_SSID"], "ssid": config.WIFI_SSID,
"password": config["WIFI_PASS"], "password": config.WIFI_PASS,
"channel": 0, "channel": 0,
"power": 0, "power": 0,
} }
@@ -270,10 +280,8 @@ def test_set_wifi(get_openiris_device, ensure_board_in_mode, config):
# now, let's force connection and check if it worked # now, let's force connection and check if it worked
connect_wifi_result = device.send_command("connect_wifi") connect_wifi_result = device.send_command("connect_wifi")
assert not has_command_failed(connect_wifi_result) assert not -has_command_failed(connect_wifi_result)
time.sleep( time.sleep(config.WIFI_CONNECTION_TIMEOUT) # and let it try to for some time
int(config["WIFI_CONNECTION_TIMEOUT"])
) # and let it try to for some time
wifi_status_command = device.send_command("get_wifi_status") wifi_status_command = device.send_command("get_wifi_status")
assert not has_command_failed(wifi_status_command) assert not has_command_failed(wifi_status_command)
@@ -297,7 +305,7 @@ def test_set_wifi_invalid_network(get_openiris_device, ensure_board_in_mode, con
device.send_command("connect_wifi") device.send_command("connect_wifi")
time.sleep( time.sleep(
int(config["INVALID_WIFI_CONNECTION_TIMEOUT"]) config.INVALID_WIFI_CONNECTION_TIMEOUT
) # and let it try to for some time ) # and let it try to for some time
wifi_status_command = device.send_command("get_wifi_status") wifi_status_command = device.send_command("get_wifi_status")
@@ -351,8 +359,8 @@ def test_update_main_wifi_network(ensure_board_in_mode, get_openiris_device, con
params2 = { params2 = {
**params1, **params1,
"ssid": config["WIFI_SSID"], "ssid": config.WIFI_SSID,
"password": config["WIFI_PASS"], "password": config.WIFI_PASS,
} }
set_wifi_result = device.send_command("set_wifi", params1) set_wifi_result = device.send_command("set_wifi", params1)

View File

@@ -43,7 +43,7 @@ class OpenIrisDeviceManager:
self._device = OpenIrisDevice(port, False, False) self._device = OpenIrisDevice(port, False, False)
self._device.connect() self._device.connect()
time.sleep(int(config["SWITCH_MODE_REBOOT_TIME"])) time.sleep(config.SWITCH_MODE_REBOOT_TIME)
return self._device return self._device

View File

@@ -48,8 +48,12 @@ class OpenIrisDevice:
def __check_if_response_is_complete(self, response) -> dict | None: def __check_if_response_is_complete(self, response) -> dict | None:
try: try:
if self.debug:
print(f"\nCHECKING: {response} \n")
return json.loads(response) return json.loads(response)
except ValueError: except ValueError:
if self.debug:
print("\nCHECK FAILED\n")
return None return None
def __read_response(self, timeout: int | None = None) -> dict | None: def __read_response(self, timeout: int | None = None) -> dict | None:
@@ -92,6 +96,16 @@ class OpenIrisDevice:
response_buffer response_buffer
): ):
return parsed_response return parsed_response
else:
# if it's not a valid response just yet,
# we might've stumbled a case where we got a valid response
# but to the end of it, we've got some leftovers attached
# so, we can try and find the ending
ending_idx = response_buffer[::-1].find("}")
if reparsed_response := self.__check_if_response_is_complete(
response_buffer[: len(response_buffer) - ending_idx]
):
return reparsed_response
else: else:
time.sleep(0.1) time.sleep(0.1)
return None return None