diff --git a/components/ProjectConfig/ProjectConfig/Models.hpp b/components/ProjectConfig/ProjectConfig/Models.hpp index 915cfed..1b45b05 100644 --- a/components/ProjectConfig/ProjectConfig/Models.hpp +++ b/components/ProjectConfig/ProjectConfig/Models.hpp @@ -331,14 +331,11 @@ public: { for (auto i = 0; i < this->networks.size() - 1; i++) { - printf("we're at %d while networks size is %d ", i, this->networks.size() - 2); WifiConfigRepresentation += Helpers::format_string("%s, ", this->networks[i].toRepresentation().c_str()); } } WifiConfigRepresentation += Helpers::format_string("%s", this->networks[networks.size() - 1].toRepresentation().c_str()); - printf(WifiConfigRepresentation.c_str()); - printf("\n"); } return Helpers::format_string( diff --git a/tests/conftest.py b/tests/conftest.py index 3af100d..3414e8c 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,3 +1,4 @@ +from dataclasses import dataclass import dotenv import pytest import time @@ -115,9 +116,32 @@ def board_connection(request): yield board_connection +@dataclass +class TestConfig: + WIFI_SSID: str + WIFI_PASS: str + SWITCH_MODE_REBOOT_TIME: int + WIFI_CONNECTION_TIMEOUT: int + INVALID_WIFI_CONNECTION_TIMEOUT: int + + def __init__( + self, + WIFI_SSID: str, + WIFI_PASS: str, + SWITCH_MODE_REBOOT_TIME: int, + WIFI_CONNECTION_TIMEOUT: int, + INVALID_WIFI_CONNECTION_TIMEOUT: int, + ): + self.WIFI_SSID = WIFI_SSID + self.WIFI_PASS = WIFI_PASS + self.SWITCH_MODE_REBOOT_TIME = int(SWITCH_MODE_REBOOT_TIME) + self.WIFI_CONNECTION_TIMEOUT = int(WIFI_CONNECTION_TIMEOUT) + self.INVALID_WIFI_CONNECTION_TIMEOUT = int(INVALID_WIFI_CONNECTION_TIMEOUT) + + @pytest.fixture(scope="session") def config(): - config = dotenv.dotenv_values() + config = TestConfig(**dotenv.dotenv_values()) yield config @@ -168,7 +192,7 @@ def ensure_board_in_mode(openiris_device_manager, config): print("Rebooting the board after changing mode") device.send_command("restart_device") - sleep_timeout = int(config["SWITCH_MODE_REBOOT_TIME"]) + sleep_timeout = int(config.SWITCH_MODE_REBOOT_TIME) print( f"Sleeping for {sleep_timeout} seconds to allow the device to switch modes and boot up" ) @@ -181,3 +205,13 @@ def ensure_board_in_mode(openiris_device_manager, config): return new_device return func + + +@pytest.fixture(scope="session", autouse=True) +def after_session_cleanup(openiris_device_manager, config): + yield + + print("Cleanup: Resetting the config and restarting device") + device = openiris_device_manager.get_device(config=config) + device.send_command("reset_config", {"section": "all"}) + device.send_command("restart_device") diff --git a/tests/test_commands.py b/tests/test_commands.py index ab9770e..0ddc18a 100644 --- a/tests/test_commands.py +++ b/tests/test_commands.py @@ -24,18 +24,16 @@ def test_ping_wired(get_openiris_device): @pytest.mark.has_capability("wired") @pytest.mark.has_capability("wireless") -def test_changing_mode_to_wired(get_openiris_device, ensure_board_in_mode): +def test_changing_mode_to_wired(get_openiris_device, ensure_board_in_mode, config): device = get_openiris_device() # let's make sure we're in the wireless mode first, if we're going to try changing it device = ensure_board_in_mode("wifi", device) - command_result = device.send_command("switch_mode", {"mode": "uvc"}) - assert not has_command_failed(command_result) - - # to avoid any issues, let's restart the board with DetectPortChange() as port_selector: + command_result = device.send_command("switch_mode", {"mode": "uvc"}) + assert not has_command_failed(command_result) device.send_command("restart_device") - time.sleep(3) + time.sleep(config.SWITCH_MODE_REBOOT_TIME) # and since we've changed the ports device = get_openiris_device(port_selector.get_new_port()) @@ -45,7 +43,7 @@ def test_changing_mode_to_wired(get_openiris_device, ensure_board_in_mode): assert not has_command_failed(result) -def test_changing_mode_same_mode(get_openiris_device, ensure_board_in_mode): +def test_changing_mode_same_mode(get_openiris_device): device = get_openiris_device() result = device.send_command("get_device_mode") current_mode = result["results"][0]["result"]["data"]["mode"].lower() @@ -63,7 +61,7 @@ def test_changing_mode_invalid_mode(get_openiris_device): assert has_command_failed(command_result) -def test_setting_mdns_name(get_openiris_device, ensure_board_in_mode): +def test_setting_mdns_name(get_openiris_device, ensure_board_in_mode, config): def check_mdns_name(name: str): command_result = device.send_command("get_mdns_name") assert not has_command_failed(command_result) @@ -84,7 +82,7 @@ def test_setting_mdns_name(get_openiris_device, ensure_board_in_mode): device.send_command("restart_device") # let the board boot, wait till it connects - time.sleep(3) + time.sleep(config.SWITCH_MODE_REBOOT_TIME) check_mdns_name(second_name) @@ -97,7 +95,7 @@ def test_setting_mdns_name_invalid_payload(get_openiris_device, payload): @pytest.mark.has_capability("wired") @pytest.mark.has_capability("wireless") -def test_reboot_command(get_openiris_device, ensure_board_in_mode): +def test_reboot_command(get_openiris_device, ensure_board_in_mode, config): device = ensure_board_in_mode("wifi", get_openiris_device()) command_result = device.send_command("switch_mode", {"mode": "uvc"}) @@ -108,9 +106,9 @@ def test_reboot_command(get_openiris_device, ensure_board_in_mode): # which might be a little overkill kill and won't work on boards not supporting both modes with DetectPortChange() as port_selector: device.send_command("restart_device") - time.sleep(3) + time.sleep(config.SWITCH_MODE_REBOOT_TIME) - assert len(port_selector.get_new_port()) + assert port_selector.get_new_port() def test_get_serial(get_openiris_device): @@ -203,8 +201,20 @@ def test_check_wifi_status(get_openiris_device, ensure_board_in_mode): @pytest.mark.has_capability("wireless") -def test_scan_networks(get_openiris_device, ensure_board_in_mode): - device = ensure_board_in_mode("wifi", get_openiris_device()) +def test_scan_networks(get_openiris_device, ensure_board_in_mode, config): + # this test might run after some tests that affect the network on the device + # which might prevent us from scanning and thus make the test fail, so we reset the config + device = get_openiris_device() + reset_command = device.send_command("reset_config", {"section": "all"}) + assert not has_command_failed(reset_command) + + with DetectPortChange() as port_selector: + device.send_command("restart_device") + time.sleep(config.SWITCH_MODE_REBOOT_TIME) + + device = ensure_board_in_mode( + "wifi", get_openiris_device(port_selector.get_new_port()) + ) command_result = device.send_command("scan_networks") assert not has_command_failed(command_result) assert len(command_result["results"][0]["result"]["data"]["networks"]) != 0 @@ -223,7 +233,7 @@ def test_reset_config_invalid_payload(get_openiris_device): assert has_command_failed(reset_command) -def test_reset_config(get_openiris_device): +def test_reset_config(get_openiris_device, config): device = get_openiris_device() command_result = device.send_command("set_mdns", {"hostname": "somedifferentname"}) assert not has_command_failed(command_result) @@ -237,7 +247,7 @@ def test_reset_config(get_openiris_device): # since the config was reset, but the data will still be held in memory, we need to reboot the device with DetectPortChange(): device.send_command("restart_device") - time.sleep(3) + time.sleep(config.SWITCH_MODE_REBOOT_TIME) new_config = device.send_command("get_config") assert not has_command_failed(new_config) @@ -254,14 +264,14 @@ def test_set_wifi(get_openiris_device, ensure_board_in_mode, config): with DetectPortChange(): device.send_command("restart_device") - time.sleep(3) + time.sleep(config.SWITCH_MODE_REBOOT_TIME) # now that the config is clear, let's try setting the wifi device = ensure_board_in_mode("wifi", device) params = { "name": "main", - "ssid": config["WIFI_SSID"], - "password": config["WIFI_PASS"], + "ssid": config.WIFI_SSID, + "password": config.WIFI_PASS, "channel": 0, "power": 0, } @@ -270,10 +280,8 @@ def test_set_wifi(get_openiris_device, ensure_board_in_mode, config): # now, let's force connection and check if it worked connect_wifi_result = device.send_command("connect_wifi") - assert not has_command_failed(connect_wifi_result) - time.sleep( - int(config["WIFI_CONNECTION_TIMEOUT"]) - ) # and let it try to for some time + assert not -has_command_failed(connect_wifi_result) + time.sleep(config.WIFI_CONNECTION_TIMEOUT) # and let it try to for some time wifi_status_command = device.send_command("get_wifi_status") assert not has_command_failed(wifi_status_command) @@ -297,7 +305,7 @@ def test_set_wifi_invalid_network(get_openiris_device, ensure_board_in_mode, con device.send_command("connect_wifi") time.sleep( - int(config["INVALID_WIFI_CONNECTION_TIMEOUT"]) + config.INVALID_WIFI_CONNECTION_TIMEOUT ) # and let it try to for some time wifi_status_command = device.send_command("get_wifi_status") @@ -351,8 +359,8 @@ def test_update_main_wifi_network(ensure_board_in_mode, get_openiris_device, con params2 = { **params1, - "ssid": config["WIFI_SSID"], - "password": config["WIFI_PASS"], + "ssid": config.WIFI_SSID, + "password": config.WIFI_PASS, } set_wifi_result = device.send_command("set_wifi", params1) diff --git a/tests/utils.py b/tests/utils.py index e1ed437..3b28856 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -43,7 +43,7 @@ class OpenIrisDeviceManager: self._device = OpenIrisDevice(port, False, False) self._device.connect() - time.sleep(int(config["SWITCH_MODE_REBOOT_TIME"])) + time.sleep(config.SWITCH_MODE_REBOOT_TIME) return self._device diff --git a/tools/openiris_device.py b/tools/openiris_device.py index d35f094..f54c5d8 100644 --- a/tools/openiris_device.py +++ b/tools/openiris_device.py @@ -48,8 +48,12 @@ class OpenIrisDevice: def __check_if_response_is_complete(self, response) -> dict | None: try: + if self.debug: + print(f"\nCHECKING: {response} \n") return json.loads(response) except ValueError: + if self.debug: + print("\nCHECK FAILED\n") return None def __read_response(self, timeout: int | None = None) -> dict | None: @@ -92,6 +96,16 @@ class OpenIrisDevice: response_buffer ): return parsed_response + else: + # if it's not a valid response just yet, + # we might've stumbled a case where we got a valid response + # but to the end of it, we've got some leftovers attached + # so, we can try and find the ending + ending_idx = response_buffer[::-1].find("}") + if reparsed_response := self.__check_if_response_is_complete( + response_buffer[: len(response_buffer) - ending_idx] + ): + return reparsed_response else: time.sleep(0.1) return None