Finish writing tests for commands, fix resolution being accidentally set to lower one, improve fixtures - need rewrite

This commit is contained in:
Lorow
2025-12-06 22:42:48 +01:00
parent e2d0981e0e
commit abd10fc61a
6 changed files with 340 additions and 46 deletions

View File

@@ -5,7 +5,7 @@
CommandResult setWiFiCommand(std::shared_ptr<DependencyRegistry> registry, const nlohmann::json &json)
{
#if !CONFIG_GENERAL_ENABLE_WIRELESS
return CommandResult::getErrorResult("Not supported by current firmware");
return CommandResult::getErrorResult("Not supported by current firmware");
#endif
auto payload = json.get<WifiPayload>();
@@ -33,7 +33,7 @@ CommandResult setWiFiCommand(std::shared_ptr<DependencyRegistry> registry, const
CommandResult deleteWiFiCommand(std::shared_ptr<DependencyRegistry> registry, const nlohmann::json &json)
{
#if !CONFIG_GENERAL_ENABLE_WIRELESS
return CommandResult::getErrorResult("Not supported by current firmware");
return CommandResult::getErrorResult("Not supported by current firmware");
#endif
const auto payload = json.get<deleteNetworkPayload>();
@@ -49,7 +49,7 @@ CommandResult deleteWiFiCommand(std::shared_ptr<DependencyRegistry> registry, co
CommandResult updateWiFiCommand(std::shared_ptr<DependencyRegistry> registry, const nlohmann::json &json)
{
#if !CONFIG_GENERAL_ENABLE_WIRELESS
return CommandResult::getErrorResult("Not supported by current firmware");
return CommandResult::getErrorResult("Not supported by current firmware");
#endif
auto payload = json.get<UpdateWifiPayload>();
@@ -82,7 +82,7 @@ CommandResult updateWiFiCommand(std::shared_ptr<DependencyRegistry> registry, co
CommandResult updateAPWiFiCommand(std::shared_ptr<DependencyRegistry> registry, const nlohmann::json &json)
{
#if !CONFIG_GENERAL_ENABLE_WIRELESS
return CommandResult::getErrorResult("Not supported by current firmware");
return CommandResult::getErrorResult("Not supported by current firmware");
#endif
const auto payload = json.get<UpdateAPWiFiPayload>();

View File

@@ -35,14 +35,14 @@ struct DeviceMode_t : BaseConfigModel
void load()
{
// Default mode can be controlled via sdkconfig:
// - If CONFIG_START_IN_UVC_MODE is enabled, default to UVC
// - Otherwise default to SETUP
// Default mode can be controlled via sdkconfig:
// - If CONFIG_START_IN_UVC_MODE is enabled, default to UVC
// - Otherwise default to SETUP
int default_mode =
#if CONFIG_START_IN_UVC_MODE
static_cast<int>(StreamingMode::UVC);
#else
static_cast<int>(StreamingMode::SETUP);
static_cast<int>(StreamingMode::SETUP);
#endif
int stored_mode = this->pref->getInt("mode", default_mode);
@@ -103,12 +103,12 @@ struct MDNSConfig_t : BaseConfigModel
void load()
{
// Default hostname comes from GENERAL_ADVERTISED_NAME (unified advertised name)
std::string default_hostname =
// Default hostname comes from GENERAL_ADVERTISED_NAME (unified advertised name)
std::string default_hostname =
#ifdef CONFIG_GENERAL_ADVERTISED_NAME
CONFIG_GENERAL_ADVERTISED_NAME;
CONFIG_GENERAL_ADVERTISED_NAME;
#else
"openiristracker";
"openiristracker";
#endif
if (default_hostname.empty())
@@ -146,7 +146,7 @@ struct CameraConfig_t : BaseConfigModel
{
this->vflip = this->pref->getInt("vflip", 0);
this->href = this->pref->getInt("href", 0);
this->framesize = this->pref->getInt("framesize", 4);
this->framesize = this->pref->getInt("framesize", 5);
this->quality = this->pref->getInt("quality", 7);
this->brightness = this->pref->getInt("brightness", 2);
};

View File

@@ -74,10 +74,11 @@ esp_err_t StreamHelpers::stream(httpd_req_t *req)
}
if (response != ESP_OK)
break;
// Only log every 100 frames to reduce overhead
static int frame_count = 0;
if (++frame_count % 100 == 0) {
if (++frame_count % 100 == 0)
{
long request_end = Helpers::getTimeInMillis();
long latency = (request_end - last_request_time);
last_request_time = request_end;
@@ -98,13 +99,12 @@ esp_err_t StreamServer::startStreamServer()
{
httpd_config_t config = HTTPD_DEFAULT_CONFIG();
config.stack_size = 20480;
// todo bring this back to 1 once we're done with logs over websockets
config.max_uri_handlers = 10;
config.server_port = STREAM_SERVER_PORT;
config.ctrl_port = STREAM_SERVER_PORT;
config.recv_wait_timeout = 5; // 5 seconds for receiving
config.send_wait_timeout = 5; // 5 seconds for sending
config.lru_purge_enable = true; // Enable LRU purge for better connection handling
config.recv_wait_timeout = 5; // 5 seconds for receiving
config.send_wait_timeout = 5; // 5 seconds for sending
config.lru_purge_enable = true; // Enable LRU purge for better connection handling
httpd_uri_t stream_page = {
.uri = "/",
@@ -139,7 +139,6 @@ esp_err_t StreamServer::startStreamServer()
httpd_register_uri_handler(camera_stream, &stream_page);
ESP_LOGI(STREAM_SERVER_TAG, "Stream server started on port %d", STREAM_SERVER_PORT);
// todo add printing IP addr here
return ESP_OK;
}

View File

@@ -1,3 +1,5 @@
WIFI_SSID=
WIFI_PASS=
SWITCH_MODE_REBOOT_TIME=5
SWITCH_MODE_REBOOT_TIME=5
WIFI_CONNECTION_TIMEOUT=5
INVALID_WIFI_CONNECTION_TIMEOUT=30

View File

@@ -2,28 +2,6 @@ import time
from tests.utils import has_command_failed, DetectPortChange
import pytest
## TODO
# {"ping", CommandType::PING}, - tested
# {"set_wifi", CommandType::SET_WIFI},
# {"update_wifi", CommandType::UPDATE_WIFI},
# {"delete_network", CommandType::DELETE_NETWORK},
# {"update_ap_wifi", CommandType::UPDATE_AP_WIFI},
# {"set_mdns", CommandType::SET_MDNS}, - tested
# {"get_mdns_name", CommandType::GET_MDNS_NAME}, - tested
# {"update_camera", CommandType::UPDATE_CAMERA},
# {"get_config", CommandType::GET_CONFIG}, - tested
# {"reset_config", CommandType::RESET_CONFIG}, -tested
# {"restart_device", CommandType::RESTART_DEVICE} - tested,
# {"scan_networks", CommandType::SCAN_NETWORKS}, - tested
# {"get_wifi_status", CommandType::GET_WIFI_STATUS}, - tested
# {"switch_mode", CommandType::SWITCH_MODE}, - tested
# {"get_device_mode", CommandType::GET_DEVICE_MODE}, - tested
# {"set_led_duty_cycle", CommandType::SET_LED_DUTY_CYCLE}, - tested
# {"get_led_duty_cycle", CommandType::GET_LED_DUTY_CYCLE}, - tested
# {"get_serial", CommandType::GET_SERIAL}, - tested
# {"get_led_current", CommandType::GET_LED_CURRENT}, - tested
# {"get_who_am_i", CommandType::GET_WHO_AM_I}, - tested
def test_sending_invalid_command(get_openiris_device):
device = get_openiris_device()
@@ -238,8 +216,14 @@ def test_get_config(get_openiris_device):
assert not has_command_failed(command_result)
def test_reset_config(get_openiris_device):
def test_reset_config_invalid_payload(get_openiris_device):
# to test the config, we can do two things. Set the mdns, get the config, reset it, get it again and compare
device = get_openiris_device()
reset_command = device.send_command("reset_config")
assert has_command_failed(reset_command)
def test_reset_config(get_openiris_device):
device = get_openiris_device()
command_result = device.send_command("set_mdns", {"hostname": "somedifferentname"})
assert not has_command_failed(command_result)
@@ -247,10 +231,306 @@ def test_reset_config(get_openiris_device):
current_config = device.send_command("get_config")
assert not has_command_failed(current_config)
reset_command = device.send_command("reset_config")
reset_command = device.send_command("reset_config", {"section": "all"})
assert not has_command_failed(reset_command)
# since the config was reset, but the data will still be held in memory, we need to reboot the device
with DetectPortChange():
device.send_command("restart_device")
time.sleep(3)
new_config = device.send_command("get_config")
assert not has_command_failed(new_config)
assert not new_config == current_config
@pytest.mark.has_capability("wireless")
def test_set_wifi(get_openiris_device, ensure_board_in_mode, config):
# since we want to test actual connection to the network, let's reset the device and reboot it
device = get_openiris_device()
reset_command = device.send_command("reset_config", {"section": "all"})
assert not has_command_failed(reset_command)
with DetectPortChange():
device.send_command("restart_device")
time.sleep(3)
# now that the config is clear, let's try setting the wifi
device = ensure_board_in_mode("wifi", device)
params = {
"name": "main",
"ssid": config["WIFI_SSID"],
"password": config["WIFI_PASS"],
"channel": 0,
"power": 0,
}
set_wifi_result = device.send_command("set_wifi", params)
assert not has_command_failed(set_wifi_result)
# now, let's force connection and check if it worked
connect_wifi_result = device.send_command("connect_wifi")
assert not has_command_failed(connect_wifi_result)
time.sleep(
int(config["WIFI_CONNECTION_TIMEOUT"])
) # and let it try to for some time
wifi_status_command = device.send_command("get_wifi_status")
assert not has_command_failed(wifi_status_command)
assert wifi_status_command["results"][0]["result"]["data"]["status"] == "connected"
@pytest.mark.has_capability("wireless")
def test_set_wifi_invalid_network(get_openiris_device, ensure_board_in_mode, config):
device = ensure_board_in_mode("wifi", get_openiris_device())
params = {
"name": "main",
"ssid": "PleaseDontBeARealNetwork",
"password": "AndThePasswordIsFake",
"channel": 0,
"power": 0,
}
set_wifi_result = device.send_command("set_wifi", params)
# even if the network is fake, we should not fail to set it
assert not has_command_failed(set_wifi_result)
device.send_command("connect_wifi")
time.sleep(
int(config["INVALID_WIFI_CONNECTION_TIMEOUT"])
) # and let it try to for some time
wifi_status_command = device.send_command("get_wifi_status")
# the command should not fail as well, but we should get an error result
assert not has_command_failed(wifi_status_command)
assert wifi_status_command["results"][0]["result"]["data"]["status"] == "error"
# and not to break other tests, clean up
device.send_command("reset_config", {"section": "all"})
device.send_command("restart_device")
@pytest.mark.has_capability("wireless")
@pytest.mark.parametrize(
"payload",
(
{},
{
"ssid": "PleaseDontBeARealNetwork",
"password": "AndThePasswordIsFake",
"channel": 0,
"power": 0,
},
{
"name": "IaintGotNoNameAndIMustConnect",
"password": "AndThePasswordIsFake",
"channel": 0,
"power": 0,
},
),
)
def test_set_wifi_invalid_payload(ensure_board_in_mode, get_openiris_device, payload):
device = ensure_board_in_mode("wifi", get_openiris_device())
set_wifi_result = device.send_command("set_wifi", payload)
# even if the network is fake, we should not fail to set it
assert has_command_failed(set_wifi_result)
# and not to break other tests, clean up
device.send_command("reset_config", {"section": "all"})
device.send_command("restart_device")
def test_update_main_wifi_network(ensure_board_in_mode, get_openiris_device, config):
# now that the config is clear, let's try setting the wifi
device = ensure_board_in_mode("wifi", get_openiris_device())
params1 = {
"name": "main",
"ssid": "Nada",
"password": "Nuuh",
"channel": 0,
"power": 0,
}
params2 = {
**params1,
"ssid": config["WIFI_SSID"],
"password": config["WIFI_PASS"],
}
set_wifi_result = device.send_command("set_wifi", params1)
assert not has_command_failed(set_wifi_result)
set_wifi_result = device.send_command("set_wifi", params2)
assert not has_command_failed(set_wifi_result)
# and not to break other tests, clean up
device.send_command("reset_config", {"section": "all"})
device.send_command("restart_device")
def test_set_wifi_add_another_network(ensure_board_in_mode, get_openiris_device):
device = ensure_board_in_mode("wifi", get_openiris_device())
params = {
"name": "anotherNetwork",
"ssid": "PleaseDontBeARealNetwork",
"password": "AndThePassowrdIsFake",
"channel": 0,
"power": 0,
}
set_wifi_result = device.send_command("set_wifi", params)
assert not has_command_failed(set_wifi_result)
# and not to break other tests, clean up
device.send_command("reset_config", {"section": "all"})
device.send_command("restart_device")
@pytest.mark.parametrize(
"payload",
(
{
"ssid": "testAP",
"password": "12345678",
"channel": 0,
},
{
"ssid": "testAP",
"channel": 0,
},
{
"ssid": "testAP",
"password": "12345678",
},
{
"ssid": "testAP",
},
{
"password": "12345678",
},
),
)
def test_update_ap_wifi(ensure_board_in_mode, get_openiris_device, payload):
device = ensure_board_in_mode("wifi", get_openiris_device())
result = device.send_command("update_ap_wifi", payload)
assert not has_command_failed(result)
# and not to break other tests, clean up
device.send_command("reset_config", {"section": "all"})
device.send_command("restart_device")
@pytest.mark.parametrize(
"payload",
(
{}, # completely empty payload
{
"channel": 2
}, # technically valid payload, but we're missing the network name,
{
"name": "IAMNOTTHERE",
"channel": 2,
}, # None-existent network
),
)
@pytest.mark.has_capability("wireless")
def test_update_wifi_command_fail(ensure_board_in_mode, get_openiris_device, payload):
device = ensure_board_in_mode("wifi", get_openiris_device())
result = device.send_command("update_wifi", payload)
assert has_command_failed(result)
@pytest.mark.parametrize(
"payload",
(
{
"name": "anotherNetwork",
"ssid": "WEUPDATEDTHESSID",
"password": "ACOMPLETELYDIFFERENTPASS",
"channel": 1,
"power": 2,
},
{
"name": "anotherNetwork",
"password": "ACOMPLETELYDIFFERENTPASS",
},
{
"name": "anotherNetwork",
"ssid": "WEUPDATEDTHESSID",
},
{
"name": "anotherNetwork",
"channel": 1,
},
{
"name": "anotherNetwork",
"power": 2,
},
),
)
@pytest.mark.has_capability("wireless")
def test_update_wifi_command(ensure_board_in_mode, get_openiris_device, payload):
device = ensure_board_in_mode("wifi", get_openiris_device())
params = {
"name": "anotherNetwork",
"ssid": "PleaseDontBeARealNetwork",
"password": "AndThePasswordIsFake",
"channel": 0,
"power": 0,
}
set_wifi_result = device.send_command("set_wifi", params)
assert not has_command_failed(set_wifi_result)
device = ensure_board_in_mode("wifi", get_openiris_device())
result = device.send_command("update_wifi", payload)
assert not has_command_failed(result)
@pytest.mark.parametrize(
"payload",
(
{},
{"name": ""},
),
)
@pytest.mark.has_capability("wireless")
def test_delete_network_fail(ensure_board_in_mode, get_openiris_device, payload):
device = ensure_board_in_mode("wifi", get_openiris_device())
result = device.send_command("delete_network", payload)
assert has_command_failed(result)
@pytest.mark.parametrize("payload", ({"name": "main"}, {"name": "NOTANETWORK"}))
@pytest.mark.has_capability("wireless")
def test_delete_network(ensure_board_in_mode, get_openiris_device, payload):
device = ensure_board_in_mode("wifi", get_openiris_device())
result = device.send_command("delete_network", payload)
assert not has_command_failed(result)
@pytest.mark.parametrize(
"payload",
(
{},
{
"vlip": 0,
"hflip": 0,
"framesize": 5,
"quality": 7,
"brightness": 2,
},
{
"vlip": 0,
},
{
"hflip": 0,
},
{
"framesize": 5,
},
{
"quality": 7,
},
{
"brightness": 2,
},
),
)
def test_update_camera(get_openiris_device, payload):
device = get_openiris_device()
result = device.send_command("update_camera", payload)
assert not has_command_failed(result)

View File

@@ -8,6 +8,7 @@ OPENIRIS_DEVICE = None
class OpenIrisDeviceManager:
def __init__(self):
self._device: OpenIrisDevice | None = None
self.stored_ports = []
self._current_port: str | None = None
def get_device(self, port: str | None = None, config=None) -> OpenIrisDevice:
@@ -22,6 +23,16 @@ class OpenIrisDeviceManager:
if not port and not self._device:
raise ValueError("No device connected yet, provide a port first")
# I'm not sure if I like this approach
# maybe I need to rethink this fixture
current_ports = get_current_ports()
new_port = get_new_port(self.stored_ports, current_ports)
if new_port is not None:
self.stored_ports = current_ports
if not port:
port = new_port
if port and port != self._current_port:
print(f"Port changed from {self._current_port} to {port}, reconnecting...")
self._current_port = port
@@ -46,7 +57,9 @@ def get_current_ports() -> list[str]:
def get_new_port(old_ports, new_ports) -> str:
return list(set(new_ports) - set(old_ports))[0]
if ports_diff := list(set(new_ports) - set(old_ports)):
return ports_diff[0]
return None
class DetectPortChange: