From 9ac95a2a7686255eb8a364ce5774d1c8bab7ec98 Mon Sep 17 00:00:00 2001 From: Lorow Date: Sun, 7 Dec 2025 15:32:35 +0100 Subject: [PATCH] Fix json parsing in openiris_device in cases where logs got stiched together, fix tests not waiting long enough after reboot, add board cleanup step after test session, fix leftover prints in networks representation breaking json output --- .../ProjectConfig/ProjectConfig/Models.hpp | 3 - tests/conftest.py | 38 +++++++++++- tests/test_commands.py | 60 +++++++++++-------- tests/utils.py | 2 +- tools/openiris_device.py | 14 +++++ 5 files changed, 85 insertions(+), 32 deletions(-) diff --git a/components/ProjectConfig/ProjectConfig/Models.hpp b/components/ProjectConfig/ProjectConfig/Models.hpp index 915cfed..1b45b05 100644 --- a/components/ProjectConfig/ProjectConfig/Models.hpp +++ b/components/ProjectConfig/ProjectConfig/Models.hpp @@ -331,14 +331,11 @@ public: { for (auto i = 0; i < this->networks.size() - 1; i++) { - printf("we're at %d while networks size is %d ", i, this->networks.size() - 2); WifiConfigRepresentation += Helpers::format_string("%s, ", this->networks[i].toRepresentation().c_str()); } } WifiConfigRepresentation += Helpers::format_string("%s", this->networks[networks.size() - 1].toRepresentation().c_str()); - printf(WifiConfigRepresentation.c_str()); - printf("\n"); } return Helpers::format_string( diff --git a/tests/conftest.py b/tests/conftest.py index 3af100d..3414e8c 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,3 +1,4 @@ +from dataclasses import dataclass import dotenv import pytest import time @@ -115,9 +116,32 @@ def board_connection(request): yield board_connection +@dataclass +class TestConfig: + WIFI_SSID: str + WIFI_PASS: str + SWITCH_MODE_REBOOT_TIME: int + WIFI_CONNECTION_TIMEOUT: int + INVALID_WIFI_CONNECTION_TIMEOUT: int + + def __init__( + self, + WIFI_SSID: str, + WIFI_PASS: str, + SWITCH_MODE_REBOOT_TIME: int, + WIFI_CONNECTION_TIMEOUT: int, + INVALID_WIFI_CONNECTION_TIMEOUT: int, + ): + self.WIFI_SSID = WIFI_SSID + self.WIFI_PASS = WIFI_PASS + self.SWITCH_MODE_REBOOT_TIME = int(SWITCH_MODE_REBOOT_TIME) + self.WIFI_CONNECTION_TIMEOUT = int(WIFI_CONNECTION_TIMEOUT) + self.INVALID_WIFI_CONNECTION_TIMEOUT = int(INVALID_WIFI_CONNECTION_TIMEOUT) + + @pytest.fixture(scope="session") def config(): - config = dotenv.dotenv_values() + config = TestConfig(**dotenv.dotenv_values()) yield config @@ -168,7 +192,7 @@ def ensure_board_in_mode(openiris_device_manager, config): print("Rebooting the board after changing mode") device.send_command("restart_device") - sleep_timeout = int(config["SWITCH_MODE_REBOOT_TIME"]) + sleep_timeout = int(config.SWITCH_MODE_REBOOT_TIME) print( f"Sleeping for {sleep_timeout} seconds to allow the device to switch modes and boot up" ) @@ -181,3 +205,13 @@ def ensure_board_in_mode(openiris_device_manager, config): return new_device return func + + +@pytest.fixture(scope="session", autouse=True) +def after_session_cleanup(openiris_device_manager, config): + yield + + print("Cleanup: Resetting the config and restarting device") + device = openiris_device_manager.get_device(config=config) + device.send_command("reset_config", {"section": "all"}) + device.send_command("restart_device") diff --git a/tests/test_commands.py b/tests/test_commands.py index ab9770e..0ddc18a 100644 --- a/tests/test_commands.py +++ b/tests/test_commands.py @@ -24,18 +24,16 @@ def test_ping_wired(get_openiris_device): @pytest.mark.has_capability("wired") @pytest.mark.has_capability("wireless") -def test_changing_mode_to_wired(get_openiris_device, ensure_board_in_mode): +def test_changing_mode_to_wired(get_openiris_device, ensure_board_in_mode, config): device = get_openiris_device() # let's make sure we're in the wireless mode first, if we're going to try changing it device = ensure_board_in_mode("wifi", device) - command_result = device.send_command("switch_mode", {"mode": "uvc"}) - assert not has_command_failed(command_result) - - # to avoid any issues, let's restart the board with DetectPortChange() as port_selector: + command_result = device.send_command("switch_mode", {"mode": "uvc"}) + assert not has_command_failed(command_result) device.send_command("restart_device") - time.sleep(3) + time.sleep(config.SWITCH_MODE_REBOOT_TIME) # and since we've changed the ports device = get_openiris_device(port_selector.get_new_port()) @@ -45,7 +43,7 @@ def test_changing_mode_to_wired(get_openiris_device, ensure_board_in_mode): assert not has_command_failed(result) -def test_changing_mode_same_mode(get_openiris_device, ensure_board_in_mode): +def test_changing_mode_same_mode(get_openiris_device): device = get_openiris_device() result = device.send_command("get_device_mode") current_mode = result["results"][0]["result"]["data"]["mode"].lower() @@ -63,7 +61,7 @@ def test_changing_mode_invalid_mode(get_openiris_device): assert has_command_failed(command_result) -def test_setting_mdns_name(get_openiris_device, ensure_board_in_mode): +def test_setting_mdns_name(get_openiris_device, ensure_board_in_mode, config): def check_mdns_name(name: str): command_result = device.send_command("get_mdns_name") assert not has_command_failed(command_result) @@ -84,7 +82,7 @@ def test_setting_mdns_name(get_openiris_device, ensure_board_in_mode): device.send_command("restart_device") # let the board boot, wait till it connects - time.sleep(3) + time.sleep(config.SWITCH_MODE_REBOOT_TIME) check_mdns_name(second_name) @@ -97,7 +95,7 @@ def test_setting_mdns_name_invalid_payload(get_openiris_device, payload): @pytest.mark.has_capability("wired") @pytest.mark.has_capability("wireless") -def test_reboot_command(get_openiris_device, ensure_board_in_mode): +def test_reboot_command(get_openiris_device, ensure_board_in_mode, config): device = ensure_board_in_mode("wifi", get_openiris_device()) command_result = device.send_command("switch_mode", {"mode": "uvc"}) @@ -108,9 +106,9 @@ def test_reboot_command(get_openiris_device, ensure_board_in_mode): # which might be a little overkill kill and won't work on boards not supporting both modes with DetectPortChange() as port_selector: device.send_command("restart_device") - time.sleep(3) + time.sleep(config.SWITCH_MODE_REBOOT_TIME) - assert len(port_selector.get_new_port()) + assert port_selector.get_new_port() def test_get_serial(get_openiris_device): @@ -203,8 +201,20 @@ def test_check_wifi_status(get_openiris_device, ensure_board_in_mode): @pytest.mark.has_capability("wireless") -def test_scan_networks(get_openiris_device, ensure_board_in_mode): - device = ensure_board_in_mode("wifi", get_openiris_device()) +def test_scan_networks(get_openiris_device, ensure_board_in_mode, config): + # this test might run after some tests that affect the network on the device + # which might prevent us from scanning and thus make the test fail, so we reset the config + device = get_openiris_device() + reset_command = device.send_command("reset_config", {"section": "all"}) + assert not has_command_failed(reset_command) + + with DetectPortChange() as port_selector: + device.send_command("restart_device") + time.sleep(config.SWITCH_MODE_REBOOT_TIME) + + device = ensure_board_in_mode( + "wifi", get_openiris_device(port_selector.get_new_port()) + ) command_result = device.send_command("scan_networks") assert not has_command_failed(command_result) assert len(command_result["results"][0]["result"]["data"]["networks"]) != 0 @@ -223,7 +233,7 @@ def test_reset_config_invalid_payload(get_openiris_device): assert has_command_failed(reset_command) -def test_reset_config(get_openiris_device): +def test_reset_config(get_openiris_device, config): device = get_openiris_device() command_result = device.send_command("set_mdns", {"hostname": "somedifferentname"}) assert not has_command_failed(command_result) @@ -237,7 +247,7 @@ def test_reset_config(get_openiris_device): # since the config was reset, but the data will still be held in memory, we need to reboot the device with DetectPortChange(): device.send_command("restart_device") - time.sleep(3) + time.sleep(config.SWITCH_MODE_REBOOT_TIME) new_config = device.send_command("get_config") assert not has_command_failed(new_config) @@ -254,14 +264,14 @@ def test_set_wifi(get_openiris_device, ensure_board_in_mode, config): with DetectPortChange(): device.send_command("restart_device") - time.sleep(3) + time.sleep(config.SWITCH_MODE_REBOOT_TIME) # now that the config is clear, let's try setting the wifi device = ensure_board_in_mode("wifi", device) params = { "name": "main", - "ssid": config["WIFI_SSID"], - "password": config["WIFI_PASS"], + "ssid": config.WIFI_SSID, + "password": config.WIFI_PASS, "channel": 0, "power": 0, } @@ -270,10 +280,8 @@ def test_set_wifi(get_openiris_device, ensure_board_in_mode, config): # now, let's force connection and check if it worked connect_wifi_result = device.send_command("connect_wifi") - assert not has_command_failed(connect_wifi_result) - time.sleep( - int(config["WIFI_CONNECTION_TIMEOUT"]) - ) # and let it try to for some time + assert not -has_command_failed(connect_wifi_result) + time.sleep(config.WIFI_CONNECTION_TIMEOUT) # and let it try to for some time wifi_status_command = device.send_command("get_wifi_status") assert not has_command_failed(wifi_status_command) @@ -297,7 +305,7 @@ def test_set_wifi_invalid_network(get_openiris_device, ensure_board_in_mode, con device.send_command("connect_wifi") time.sleep( - int(config["INVALID_WIFI_CONNECTION_TIMEOUT"]) + config.INVALID_WIFI_CONNECTION_TIMEOUT ) # and let it try to for some time wifi_status_command = device.send_command("get_wifi_status") @@ -351,8 +359,8 @@ def test_update_main_wifi_network(ensure_board_in_mode, get_openiris_device, con params2 = { **params1, - "ssid": config["WIFI_SSID"], - "password": config["WIFI_PASS"], + "ssid": config.WIFI_SSID, + "password": config.WIFI_PASS, } set_wifi_result = device.send_command("set_wifi", params1) diff --git a/tests/utils.py b/tests/utils.py index e1ed437..3b28856 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -43,7 +43,7 @@ class OpenIrisDeviceManager: self._device = OpenIrisDevice(port, False, False) self._device.connect() - time.sleep(int(config["SWITCH_MODE_REBOOT_TIME"])) + time.sleep(config.SWITCH_MODE_REBOOT_TIME) return self._device diff --git a/tools/openiris_device.py b/tools/openiris_device.py index d35f094..f54c5d8 100644 --- a/tools/openiris_device.py +++ b/tools/openiris_device.py @@ -48,8 +48,12 @@ class OpenIrisDevice: def __check_if_response_is_complete(self, response) -> dict | None: try: + if self.debug: + print(f"\nCHECKING: {response} \n") return json.loads(response) except ValueError: + if self.debug: + print("\nCHECK FAILED\n") return None def __read_response(self, timeout: int | None = None) -> dict | None: @@ -92,6 +96,16 @@ class OpenIrisDevice: response_buffer ): return parsed_response + else: + # if it's not a valid response just yet, + # we might've stumbled a case where we got a valid response + # but to the end of it, we've got some leftovers attached + # so, we can try and find the ending + ending_idx = response_buffer[::-1].find("}") + if reparsed_response := self.__check_if_response_is_complete( + response_buffer[: len(response_buffer) - ending_idx] + ): + return reparsed_response else: time.sleep(0.1) return None