From abd10fc61a4df436310a5326570f4861eaaa6899 Mon Sep 17 00:00:00 2001 From: Lorow Date: Sat, 6 Dec 2025 22:42:48 +0100 Subject: [PATCH] Finish writing tests for commands, fix resolution being accidentally set to lower one, improve fixtures - need rewrite --- .../CommandManager/commands/wifi_commands.cpp | 8 +- .../ProjectConfig/ProjectConfig/Models.hpp | 18 +- .../StreamServer/StreamServer.cpp | 13 +- tests/.env.example | 4 +- tests/test_commands.py | 328 ++++++++++++++++-- tests/utils.py | 15 +- 6 files changed, 340 insertions(+), 46 deletions(-) diff --git a/components/CommandManager/CommandManager/commands/wifi_commands.cpp b/components/CommandManager/CommandManager/commands/wifi_commands.cpp index db81371..f99b517 100644 --- a/components/CommandManager/CommandManager/commands/wifi_commands.cpp +++ b/components/CommandManager/CommandManager/commands/wifi_commands.cpp @@ -5,7 +5,7 @@ CommandResult setWiFiCommand(std::shared_ptr registry, const nlohmann::json &json) { #if !CONFIG_GENERAL_ENABLE_WIRELESS - return CommandResult::getErrorResult("Not supported by current firmware"); + return CommandResult::getErrorResult("Not supported by current firmware"); #endif auto payload = json.get(); @@ -33,7 +33,7 @@ CommandResult setWiFiCommand(std::shared_ptr registry, const CommandResult deleteWiFiCommand(std::shared_ptr registry, const nlohmann::json &json) { #if !CONFIG_GENERAL_ENABLE_WIRELESS - return CommandResult::getErrorResult("Not supported by current firmware"); + return CommandResult::getErrorResult("Not supported by current firmware"); #endif const auto payload = json.get(); @@ -49,7 +49,7 @@ CommandResult deleteWiFiCommand(std::shared_ptr registry, co CommandResult updateWiFiCommand(std::shared_ptr registry, const nlohmann::json &json) { #if !CONFIG_GENERAL_ENABLE_WIRELESS - return CommandResult::getErrorResult("Not supported by current firmware"); + return CommandResult::getErrorResult("Not supported by current firmware"); #endif auto payload = json.get(); @@ -82,7 +82,7 @@ CommandResult updateWiFiCommand(std::shared_ptr registry, co CommandResult updateAPWiFiCommand(std::shared_ptr registry, const nlohmann::json &json) { #if !CONFIG_GENERAL_ENABLE_WIRELESS - return CommandResult::getErrorResult("Not supported by current firmware"); + return CommandResult::getErrorResult("Not supported by current firmware"); #endif const auto payload = json.get(); diff --git a/components/ProjectConfig/ProjectConfig/Models.hpp b/components/ProjectConfig/ProjectConfig/Models.hpp index 448f625..915cfed 100644 --- a/components/ProjectConfig/ProjectConfig/Models.hpp +++ b/components/ProjectConfig/ProjectConfig/Models.hpp @@ -35,14 +35,14 @@ struct DeviceMode_t : BaseConfigModel void load() { - // Default mode can be controlled via sdkconfig: - // - If CONFIG_START_IN_UVC_MODE is enabled, default to UVC - // - Otherwise default to SETUP + // Default mode can be controlled via sdkconfig: + // - If CONFIG_START_IN_UVC_MODE is enabled, default to UVC + // - Otherwise default to SETUP int default_mode = #if CONFIG_START_IN_UVC_MODE static_cast(StreamingMode::UVC); #else - static_cast(StreamingMode::SETUP); + static_cast(StreamingMode::SETUP); #endif int stored_mode = this->pref->getInt("mode", default_mode); @@ -103,12 +103,12 @@ struct MDNSConfig_t : BaseConfigModel void load() { - // Default hostname comes from GENERAL_ADVERTISED_NAME (unified advertised name) - std::string default_hostname = + // Default hostname comes from GENERAL_ADVERTISED_NAME (unified advertised name) + std::string default_hostname = #ifdef CONFIG_GENERAL_ADVERTISED_NAME - CONFIG_GENERAL_ADVERTISED_NAME; + CONFIG_GENERAL_ADVERTISED_NAME; #else - "openiristracker"; + "openiristracker"; #endif if (default_hostname.empty()) @@ -146,7 +146,7 @@ struct CameraConfig_t : BaseConfigModel { this->vflip = this->pref->getInt("vflip", 0); this->href = this->pref->getInt("href", 0); - this->framesize = this->pref->getInt("framesize", 4); + this->framesize = this->pref->getInt("framesize", 5); this->quality = this->pref->getInt("quality", 7); this->brightness = this->pref->getInt("brightness", 2); }; diff --git a/components/StreamServer/StreamServer/StreamServer.cpp b/components/StreamServer/StreamServer/StreamServer.cpp index 5908452..de448e7 100644 --- a/components/StreamServer/StreamServer/StreamServer.cpp +++ b/components/StreamServer/StreamServer/StreamServer.cpp @@ -74,10 +74,11 @@ esp_err_t StreamHelpers::stream(httpd_req_t *req) } if (response != ESP_OK) break; - + // Only log every 100 frames to reduce overhead static int frame_count = 0; - if (++frame_count % 100 == 0) { + if (++frame_count % 100 == 0) + { long request_end = Helpers::getTimeInMillis(); long latency = (request_end - last_request_time); last_request_time = request_end; @@ -98,13 +99,12 @@ esp_err_t StreamServer::startStreamServer() { httpd_config_t config = HTTPD_DEFAULT_CONFIG(); config.stack_size = 20480; - // todo bring this back to 1 once we're done with logs over websockets config.max_uri_handlers = 10; config.server_port = STREAM_SERVER_PORT; config.ctrl_port = STREAM_SERVER_PORT; - config.recv_wait_timeout = 5; // 5 seconds for receiving - config.send_wait_timeout = 5; // 5 seconds for sending - config.lru_purge_enable = true; // Enable LRU purge for better connection handling + config.recv_wait_timeout = 5; // 5 seconds for receiving + config.send_wait_timeout = 5; // 5 seconds for sending + config.lru_purge_enable = true; // Enable LRU purge for better connection handling httpd_uri_t stream_page = { .uri = "/", @@ -139,7 +139,6 @@ esp_err_t StreamServer::startStreamServer() httpd_register_uri_handler(camera_stream, &stream_page); ESP_LOGI(STREAM_SERVER_TAG, "Stream server started on port %d", STREAM_SERVER_PORT); - // todo add printing IP addr here return ESP_OK; } \ No newline at end of file diff --git a/tests/.env.example b/tests/.env.example index 632a783..d59b7c2 100644 --- a/tests/.env.example +++ b/tests/.env.example @@ -1,3 +1,5 @@ WIFI_SSID= WIFI_PASS= -SWITCH_MODE_REBOOT_TIME=5 \ No newline at end of file +SWITCH_MODE_REBOOT_TIME=5 +WIFI_CONNECTION_TIMEOUT=5 +INVALID_WIFI_CONNECTION_TIMEOUT=30 \ No newline at end of file diff --git a/tests/test_commands.py b/tests/test_commands.py index ed166a4..ab9770e 100644 --- a/tests/test_commands.py +++ b/tests/test_commands.py @@ -2,28 +2,6 @@ import time from tests.utils import has_command_failed, DetectPortChange import pytest -## TODO -# {"ping", CommandType::PING}, - tested -# {"set_wifi", CommandType::SET_WIFI}, -# {"update_wifi", CommandType::UPDATE_WIFI}, -# {"delete_network", CommandType::DELETE_NETWORK}, -# {"update_ap_wifi", CommandType::UPDATE_AP_WIFI}, -# {"set_mdns", CommandType::SET_MDNS}, - tested -# {"get_mdns_name", CommandType::GET_MDNS_NAME}, - tested -# {"update_camera", CommandType::UPDATE_CAMERA}, -# {"get_config", CommandType::GET_CONFIG}, - tested -# {"reset_config", CommandType::RESET_CONFIG}, -tested -# {"restart_device", CommandType::RESTART_DEVICE} - tested, -# {"scan_networks", CommandType::SCAN_NETWORKS}, - tested -# {"get_wifi_status", CommandType::GET_WIFI_STATUS}, - tested -# {"switch_mode", CommandType::SWITCH_MODE}, - tested -# {"get_device_mode", CommandType::GET_DEVICE_MODE}, - tested -# {"set_led_duty_cycle", CommandType::SET_LED_DUTY_CYCLE}, - tested -# {"get_led_duty_cycle", CommandType::GET_LED_DUTY_CYCLE}, - tested -# {"get_serial", CommandType::GET_SERIAL}, - tested -# {"get_led_current", CommandType::GET_LED_CURRENT}, - tested -# {"get_who_am_i", CommandType::GET_WHO_AM_I}, - tested - def test_sending_invalid_command(get_openiris_device): device = get_openiris_device() @@ -238,8 +216,14 @@ def test_get_config(get_openiris_device): assert not has_command_failed(command_result) -def test_reset_config(get_openiris_device): +def test_reset_config_invalid_payload(get_openiris_device): # to test the config, we can do two things. Set the mdns, get the config, reset it, get it again and compare + device = get_openiris_device() + reset_command = device.send_command("reset_config") + assert has_command_failed(reset_command) + + +def test_reset_config(get_openiris_device): device = get_openiris_device() command_result = device.send_command("set_mdns", {"hostname": "somedifferentname"}) assert not has_command_failed(command_result) @@ -247,10 +231,306 @@ def test_reset_config(get_openiris_device): current_config = device.send_command("get_config") assert not has_command_failed(current_config) - reset_command = device.send_command("reset_config") + reset_command = device.send_command("reset_config", {"section": "all"}) assert not has_command_failed(reset_command) + # since the config was reset, but the data will still be held in memory, we need to reboot the device + with DetectPortChange(): + device.send_command("restart_device") + time.sleep(3) + new_config = device.send_command("get_config") assert not has_command_failed(new_config) assert not new_config == current_config + + +@pytest.mark.has_capability("wireless") +def test_set_wifi(get_openiris_device, ensure_board_in_mode, config): + # since we want to test actual connection to the network, let's reset the device and reboot it + device = get_openiris_device() + reset_command = device.send_command("reset_config", {"section": "all"}) + assert not has_command_failed(reset_command) + + with DetectPortChange(): + device.send_command("restart_device") + time.sleep(3) + + # now that the config is clear, let's try setting the wifi + device = ensure_board_in_mode("wifi", device) + params = { + "name": "main", + "ssid": config["WIFI_SSID"], + "password": config["WIFI_PASS"], + "channel": 0, + "power": 0, + } + set_wifi_result = device.send_command("set_wifi", params) + assert not has_command_failed(set_wifi_result) + + # now, let's force connection and check if it worked + connect_wifi_result = device.send_command("connect_wifi") + assert not has_command_failed(connect_wifi_result) + time.sleep( + int(config["WIFI_CONNECTION_TIMEOUT"]) + ) # and let it try to for some time + + wifi_status_command = device.send_command("get_wifi_status") + assert not has_command_failed(wifi_status_command) + assert wifi_status_command["results"][0]["result"]["data"]["status"] == "connected" + + +@pytest.mark.has_capability("wireless") +def test_set_wifi_invalid_network(get_openiris_device, ensure_board_in_mode, config): + device = ensure_board_in_mode("wifi", get_openiris_device()) + params = { + "name": "main", + "ssid": "PleaseDontBeARealNetwork", + "password": "AndThePasswordIsFake", + "channel": 0, + "power": 0, + } + set_wifi_result = device.send_command("set_wifi", params) + # even if the network is fake, we should not fail to set it + assert not has_command_failed(set_wifi_result) + + device.send_command("connect_wifi") + + time.sleep( + int(config["INVALID_WIFI_CONNECTION_TIMEOUT"]) + ) # and let it try to for some time + + wifi_status_command = device.send_command("get_wifi_status") + # the command should not fail as well, but we should get an error result + assert not has_command_failed(wifi_status_command) + assert wifi_status_command["results"][0]["result"]["data"]["status"] == "error" + # and not to break other tests, clean up + device.send_command("reset_config", {"section": "all"}) + device.send_command("restart_device") + + +@pytest.mark.has_capability("wireless") +@pytest.mark.parametrize( + "payload", + ( + {}, + { + "ssid": "PleaseDontBeARealNetwork", + "password": "AndThePasswordIsFake", + "channel": 0, + "power": 0, + }, + { + "name": "IaintGotNoNameAndIMustConnect", + "password": "AndThePasswordIsFake", + "channel": 0, + "power": 0, + }, + ), +) +def test_set_wifi_invalid_payload(ensure_board_in_mode, get_openiris_device, payload): + device = ensure_board_in_mode("wifi", get_openiris_device()) + set_wifi_result = device.send_command("set_wifi", payload) + # even if the network is fake, we should not fail to set it + assert has_command_failed(set_wifi_result) + # and not to break other tests, clean up + device.send_command("reset_config", {"section": "all"}) + device.send_command("restart_device") + + +def test_update_main_wifi_network(ensure_board_in_mode, get_openiris_device, config): + # now that the config is clear, let's try setting the wifi + device = ensure_board_in_mode("wifi", get_openiris_device()) + params1 = { + "name": "main", + "ssid": "Nada", + "password": "Nuuh", + "channel": 0, + "power": 0, + } + + params2 = { + **params1, + "ssid": config["WIFI_SSID"], + "password": config["WIFI_PASS"], + } + + set_wifi_result = device.send_command("set_wifi", params1) + assert not has_command_failed(set_wifi_result) + + set_wifi_result = device.send_command("set_wifi", params2) + assert not has_command_failed(set_wifi_result) + # and not to break other tests, clean up + device.send_command("reset_config", {"section": "all"}) + device.send_command("restart_device") + + +def test_set_wifi_add_another_network(ensure_board_in_mode, get_openiris_device): + device = ensure_board_in_mode("wifi", get_openiris_device()) + params = { + "name": "anotherNetwork", + "ssid": "PleaseDontBeARealNetwork", + "password": "AndThePassowrdIsFake", + "channel": 0, + "power": 0, + } + set_wifi_result = device.send_command("set_wifi", params) + assert not has_command_failed(set_wifi_result) + # and not to break other tests, clean up + device.send_command("reset_config", {"section": "all"}) + device.send_command("restart_device") + + +@pytest.mark.parametrize( + "payload", + ( + { + "ssid": "testAP", + "password": "12345678", + "channel": 0, + }, + { + "ssid": "testAP", + "channel": 0, + }, + { + "ssid": "testAP", + "password": "12345678", + }, + { + "ssid": "testAP", + }, + { + "password": "12345678", + }, + ), +) +def test_update_ap_wifi(ensure_board_in_mode, get_openiris_device, payload): + device = ensure_board_in_mode("wifi", get_openiris_device()) + result = device.send_command("update_ap_wifi", payload) + assert not has_command_failed(result) + # and not to break other tests, clean up + device.send_command("reset_config", {"section": "all"}) + device.send_command("restart_device") + + +@pytest.mark.parametrize( + "payload", + ( + {}, # completely empty payload + { + "channel": 2 + }, # technically valid payload, but we're missing the network name, + { + "name": "IAMNOTTHERE", + "channel": 2, + }, # None-existent network + ), +) +@pytest.mark.has_capability("wireless") +def test_update_wifi_command_fail(ensure_board_in_mode, get_openiris_device, payload): + device = ensure_board_in_mode("wifi", get_openiris_device()) + result = device.send_command("update_wifi", payload) + assert has_command_failed(result) + + +@pytest.mark.parametrize( + "payload", + ( + { + "name": "anotherNetwork", + "ssid": "WEUPDATEDTHESSID", + "password": "ACOMPLETELYDIFFERENTPASS", + "channel": 1, + "power": 2, + }, + { + "name": "anotherNetwork", + "password": "ACOMPLETELYDIFFERENTPASS", + }, + { + "name": "anotherNetwork", + "ssid": "WEUPDATEDTHESSID", + }, + { + "name": "anotherNetwork", + "channel": 1, + }, + { + "name": "anotherNetwork", + "power": 2, + }, + ), +) +@pytest.mark.has_capability("wireless") +def test_update_wifi_command(ensure_board_in_mode, get_openiris_device, payload): + device = ensure_board_in_mode("wifi", get_openiris_device()) + params = { + "name": "anotherNetwork", + "ssid": "PleaseDontBeARealNetwork", + "password": "AndThePasswordIsFake", + "channel": 0, + "power": 0, + } + set_wifi_result = device.send_command("set_wifi", params) + assert not has_command_failed(set_wifi_result) + + device = ensure_board_in_mode("wifi", get_openiris_device()) + result = device.send_command("update_wifi", payload) + assert not has_command_failed(result) + + +@pytest.mark.parametrize( + "payload", + ( + {}, + {"name": ""}, + ), +) +@pytest.mark.has_capability("wireless") +def test_delete_network_fail(ensure_board_in_mode, get_openiris_device, payload): + device = ensure_board_in_mode("wifi", get_openiris_device()) + result = device.send_command("delete_network", payload) + assert has_command_failed(result) + + +@pytest.mark.parametrize("payload", ({"name": "main"}, {"name": "NOTANETWORK"})) +@pytest.mark.has_capability("wireless") +def test_delete_network(ensure_board_in_mode, get_openiris_device, payload): + device = ensure_board_in_mode("wifi", get_openiris_device()) + result = device.send_command("delete_network", payload) + assert not has_command_failed(result) + + +@pytest.mark.parametrize( + "payload", + ( + {}, + { + "vlip": 0, + "hflip": 0, + "framesize": 5, + "quality": 7, + "brightness": 2, + }, + { + "vlip": 0, + }, + { + "hflip": 0, + }, + { + "framesize": 5, + }, + { + "quality": 7, + }, + { + "brightness": 2, + }, + ), +) +def test_update_camera(get_openiris_device, payload): + device = get_openiris_device() + result = device.send_command("update_camera", payload) + assert not has_command_failed(result) diff --git a/tests/utils.py b/tests/utils.py index d3a8a25..e1ed437 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -8,6 +8,7 @@ OPENIRIS_DEVICE = None class OpenIrisDeviceManager: def __init__(self): self._device: OpenIrisDevice | None = None + self.stored_ports = [] self._current_port: str | None = None def get_device(self, port: str | None = None, config=None) -> OpenIrisDevice: @@ -22,6 +23,16 @@ class OpenIrisDeviceManager: if not port and not self._device: raise ValueError("No device connected yet, provide a port first") + # I'm not sure if I like this approach + # maybe I need to rethink this fixture + current_ports = get_current_ports() + new_port = get_new_port(self.stored_ports, current_ports) + if new_port is not None: + self.stored_ports = current_ports + + if not port: + port = new_port + if port and port != self._current_port: print(f"Port changed from {self._current_port} to {port}, reconnecting...") self._current_port = port @@ -46,7 +57,9 @@ def get_current_ports() -> list[str]: def get_new_port(old_ports, new_ports) -> str: - return list(set(new_ports) - set(old_ports))[0] + if ports_diff := list(set(new_ports) - set(old_ports)): + return ports_diff[0] + return None class DetectPortChange: