mirror of
https://github.com/MrUnknownDE/OpenIris-ESPIDF.git
synced 2026-05-04 21:26:04 +02:00
Make bssid optional in payload for backwards compatibility, add tests,
This commit is contained in:
@@ -1,8 +1,33 @@
|
|||||||
#include "CommandSchema.hpp"
|
#include "CommandSchema.hpp"
|
||||||
|
|
||||||
|
void to_json(nlohmann::json& j, const WifiPayload& payload)
|
||||||
|
{
|
||||||
|
j = nlohmann::json{
|
||||||
|
{"name", payload.name}, {"ssid", payload.ssid}, {"bssid", payload.bssid},
|
||||||
|
{"password", payload.password}, {"channel", payload.channel}, {"power", payload.power},
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
void from_json(const nlohmann::json& j, WifiPayload& payload)
|
||||||
|
{
|
||||||
|
payload.name = j.at("name").get<std::string>();
|
||||||
|
payload.ssid = j.at("ssid").get<std::string>();
|
||||||
|
payload.password = j.at("password").get<std::string>();
|
||||||
|
payload.channel = j.at("channel").get<uint8_t>();
|
||||||
|
payload.power = j.at("power").get<uint8_t>();
|
||||||
|
|
||||||
|
if (j.contains("bssid"))
|
||||||
|
{
|
||||||
|
payload.bssid = j.at("bssid").get<std::string>();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
void to_json(nlohmann::json& j, const UpdateWifiPayload& payload)
|
void to_json(nlohmann::json& j, const UpdateWifiPayload& payload)
|
||||||
{
|
{
|
||||||
j = nlohmann::json{{"name", payload.name}, {"ssid", payload.ssid}, {"password", payload.password}, {"channel", payload.channel}, {"power", payload.power}};
|
j = nlohmann::json{
|
||||||
|
{"name", payload.name}, {"ssid", payload.ssid}, {"bssid", payload.bssid},
|
||||||
|
{"password", payload.password}, {"channel", payload.channel}, {"power", payload.power},
|
||||||
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
void from_json(const nlohmann::json& j, UpdateWifiPayload& payload)
|
void from_json(const nlohmann::json& j, UpdateWifiPayload& payload)
|
||||||
@@ -15,7 +40,7 @@ void from_json(const nlohmann::json& j, UpdateWifiPayload& payload)
|
|||||||
|
|
||||||
if (j.contains("bssid"))
|
if (j.contains("bssid"))
|
||||||
{
|
{
|
||||||
payload.ssid = j.at("bssid").get<std::string>();
|
payload.bssid = j.at("bssid").get<std::string>();
|
||||||
}
|
}
|
||||||
|
|
||||||
if (j.contains("password"))
|
if (j.contains("password"))
|
||||||
@@ -59,7 +84,8 @@ void from_json(const nlohmann::json& j, UpdateAPWiFiPayload& payload)
|
|||||||
void to_json(nlohmann::json& j, const UpdateCameraConfigPayload& payload)
|
void to_json(nlohmann::json& j, const UpdateCameraConfigPayload& payload)
|
||||||
{
|
{
|
||||||
j = nlohmann::json{
|
j = nlohmann::json{
|
||||||
{"vflip", payload.vflip}, {"href", payload.href}, {"framesize", payload.framesize}, {"quality", payload.quality}, {"brightness", payload.brightness}};
|
{"vflip", payload.vflip}, {"href", payload.href}, {"framesize", payload.framesize}, {"quality", payload.quality}, {"brightness", payload.brightness},
|
||||||
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
void from_json(const nlohmann::json& j, UpdateCameraConfigPayload& payload)
|
void from_json(const nlohmann::json& j, UpdateCameraConfigPayload& payload)
|
||||||
|
|||||||
@@ -10,13 +10,14 @@ struct WifiPayload : BasePayload
|
|||||||
{
|
{
|
||||||
std::string name;
|
std::string name;
|
||||||
std::string ssid;
|
std::string ssid;
|
||||||
std::string bssid;
|
std::optional<std::string> bssid;
|
||||||
std::string password;
|
std::string password;
|
||||||
uint8_t channel;
|
uint8_t channel;
|
||||||
uint8_t power;
|
uint8_t power;
|
||||||
};
|
};
|
||||||
|
|
||||||
NLOHMANN_DEFINE_TYPE_NON_INTRUSIVE(WifiPayload, name, ssid, bssid, password, channel, power)
|
void to_json(nlohmann::json& j, const WifiPayload& payload);
|
||||||
|
void from_json(const nlohmann::json& j, WifiPayload& payload);
|
||||||
|
|
||||||
struct UpdateWifiPayload : BasePayload
|
struct UpdateWifiPayload : BasePayload
|
||||||
{
|
{
|
||||||
|
|||||||
@@ -19,14 +19,16 @@ CommandResult setWiFiCommand(std::shared_ptr<DependencyRegistry> registry, const
|
|||||||
return CommandResult::getErrorResult("Invalid payload: missing SSID");
|
return CommandResult::getErrorResult("Invalid payload: missing SSID");
|
||||||
}
|
}
|
||||||
|
|
||||||
auto bssid_len = payload.bssid.length();
|
// format is XX:XX:XX:XX:XX:XX
|
||||||
if (bssid_len > 0 && bssid_len != 11)
|
const std::string bssid = payload.bssid.has_value() ? payload.bssid.value() : "";
|
||||||
|
const auto bssid_len = bssid.length();
|
||||||
|
if (bssid_len > 0 && bssid_len != 17)
|
||||||
{
|
{
|
||||||
return CommandResult::getErrorResult("BSSID is malformed");
|
return CommandResult::getErrorResult("BSSID is malformed");
|
||||||
}
|
}
|
||||||
|
|
||||||
std::shared_ptr<ProjectConfig> projectConfig = registry->resolve<ProjectConfig>(DependencyType::project_config);
|
std::shared_ptr<ProjectConfig> projectConfig = registry->resolve<ProjectConfig>(DependencyType::project_config);
|
||||||
projectConfig->setWifiConfig(payload.name, payload.ssid, payload.bssid, payload.password, payload.channel, payload.power);
|
projectConfig->setWifiConfig(payload.name, payload.ssid, bssid, payload.password, payload.channel, payload.power);
|
||||||
|
|
||||||
return CommandResult::getSuccessResult("Config updated");
|
return CommandResult::getSuccessResult("Config updated");
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,4 +1,5 @@
|
|||||||
WIFI_SSID=
|
WIFI_SSID=
|
||||||
|
WIFI_BSSID=
|
||||||
WIFI_PASS=
|
WIFI_PASS=
|
||||||
SWITCH_MODE_REBOOT_TIME=5
|
SWITCH_MODE_REBOOT_TIME=5
|
||||||
WIFI_CONNECTION_TIMEOUT=5
|
WIFI_CONNECTION_TIMEOUT=5
|
||||||
|
|||||||
+7
-3
@@ -37,7 +37,8 @@ def pytest_addoption(parser):
|
|||||||
|
|
||||||
def pytest_configure(config):
|
def pytest_configure(config):
|
||||||
config.addinivalue_line(
|
config.addinivalue_line(
|
||||||
"markers", "has_capability(caps): skip if the board does not have the capability"
|
"markers",
|
||||||
|
"has_capability(caps): skip if the board does not have the capability",
|
||||||
)
|
)
|
||||||
config.addinivalue_line(
|
config.addinivalue_line(
|
||||||
"markers", "lacks_capability(caps): skip if the board DOES have the capability"
|
"markers", "lacks_capability(caps): skip if the board DOES have the capability"
|
||||||
@@ -60,7 +61,7 @@ def check_capability_marker(request, board_lacks_capability):
|
|||||||
"has_capability marker must be provided with a capability to check"
|
"has_capability marker must be provided with a capability to check"
|
||||||
)
|
)
|
||||||
|
|
||||||
for capability in marker.args:
|
for capability in marker.args:
|
||||||
if board_lacks_capability(capability):
|
if board_lacks_capability(capability):
|
||||||
pytest.skip(f"Board does not have capability {capability}")
|
pytest.skip(f"Board does not have capability {capability}")
|
||||||
|
|
||||||
@@ -72,7 +73,7 @@ def check_lacks_capability_marker(request, board_lacks_capability):
|
|||||||
raise ValueError(
|
raise ValueError(
|
||||||
"lacks_capability marker must be provided with a capability to check"
|
"lacks_capability marker must be provided with a capability to check"
|
||||||
)
|
)
|
||||||
|
|
||||||
for capability in lacks_capability_marker.args:
|
for capability in lacks_capability_marker.args:
|
||||||
if not board_lacks_capability(capability):
|
if not board_lacks_capability(capability):
|
||||||
pytest.skip(
|
pytest.skip(
|
||||||
@@ -119,6 +120,7 @@ def board_connection(request):
|
|||||||
@dataclass
|
@dataclass
|
||||||
class TestConfig:
|
class TestConfig:
|
||||||
WIFI_SSID: str
|
WIFI_SSID: str
|
||||||
|
WIFI_BSSID: str
|
||||||
WIFI_PASS: str
|
WIFI_PASS: str
|
||||||
SWITCH_MODE_REBOOT_TIME: int
|
SWITCH_MODE_REBOOT_TIME: int
|
||||||
WIFI_CONNECTION_TIMEOUT: int
|
WIFI_CONNECTION_TIMEOUT: int
|
||||||
@@ -127,12 +129,14 @@ class TestConfig:
|
|||||||
def __init__(
|
def __init__(
|
||||||
self,
|
self,
|
||||||
WIFI_SSID: str,
|
WIFI_SSID: str,
|
||||||
|
WIFI_BSSID: str,
|
||||||
WIFI_PASS: str,
|
WIFI_PASS: str,
|
||||||
SWITCH_MODE_REBOOT_TIME: int,
|
SWITCH_MODE_REBOOT_TIME: int,
|
||||||
WIFI_CONNECTION_TIMEOUT: int,
|
WIFI_CONNECTION_TIMEOUT: int,
|
||||||
INVALID_WIFI_CONNECTION_TIMEOUT: int,
|
INVALID_WIFI_CONNECTION_TIMEOUT: int,
|
||||||
):
|
):
|
||||||
self.WIFI_SSID = WIFI_SSID
|
self.WIFI_SSID = WIFI_SSID
|
||||||
|
self.WIFI_BSSID = WIFI_BSSID
|
||||||
self.WIFI_PASS = WIFI_PASS
|
self.WIFI_PASS = WIFI_PASS
|
||||||
self.SWITCH_MODE_REBOOT_TIME = int(SWITCH_MODE_REBOOT_TIME)
|
self.SWITCH_MODE_REBOOT_TIME = int(SWITCH_MODE_REBOOT_TIME)
|
||||||
self.WIFI_CONNECTION_TIMEOUT = int(WIFI_CONNECTION_TIMEOUT)
|
self.WIFI_CONNECTION_TIMEOUT = int(WIFI_CONNECTION_TIMEOUT)
|
||||||
|
|||||||
+101
-2
@@ -255,8 +255,39 @@ def test_reset_config(get_openiris_device, config):
|
|||||||
|
|
||||||
|
|
||||||
@pytest.mark.has_capability("wireless")
|
@pytest.mark.has_capability("wireless")
|
||||||
def test_set_wifi(get_openiris_device, ensure_board_in_mode, config):
|
def test_set_wifi_no_bssid_in_payload(
|
||||||
# since we want to test actual connection to the network, let's reset the device and reboot it
|
get_openiris_device, ensure_board_in_mode, config
|
||||||
|
):
|
||||||
|
device = get_openiris_device()
|
||||||
|
reset_command = device.send_command("reset_config", {"section": "all"})
|
||||||
|
assert not has_command_failed(reset_command)
|
||||||
|
|
||||||
|
with DetectPortChange():
|
||||||
|
device.send_command("restart_device")
|
||||||
|
time.sleep(config.SWITCH_MODE_REBOOT_TIME)
|
||||||
|
|
||||||
|
device = ensure_board_in_mode("wifi", device)
|
||||||
|
params = {
|
||||||
|
"name": "main",
|
||||||
|
"ssid": config.WIFI_SSID,
|
||||||
|
"password": config.WIFI_PASS,
|
||||||
|
"channel": 0,
|
||||||
|
"power": 0,
|
||||||
|
}
|
||||||
|
set_wifi_result = device.send_command("set_wifi", params)
|
||||||
|
assert not has_command_failed(set_wifi_result)
|
||||||
|
|
||||||
|
connect_wifi_result = device.send_command("connect_wifi")
|
||||||
|
assert not -has_command_failed(connect_wifi_result)
|
||||||
|
time.sleep(config.WIFI_CONNECTION_TIMEOUT) # and let it try to for some time
|
||||||
|
|
||||||
|
wifi_status_command = device.send_command("get_wifi_status")
|
||||||
|
assert not has_command_failed(wifi_status_command)
|
||||||
|
assert wifi_status_command["results"][0]["result"]["data"]["status"] == "connected"
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.has_capability("wireless")
|
||||||
|
def test_set_wifi_no_bssid(get_openiris_device, ensure_board_in_mode, config):
|
||||||
device = get_openiris_device()
|
device = get_openiris_device()
|
||||||
reset_command = device.send_command("reset_config", {"section": "all"})
|
reset_command = device.send_command("reset_config", {"section": "all"})
|
||||||
assert not has_command_failed(reset_command)
|
assert not has_command_failed(reset_command)
|
||||||
@@ -270,6 +301,7 @@ def test_set_wifi(get_openiris_device, ensure_board_in_mode, config):
|
|||||||
params = {
|
params = {
|
||||||
"name": "main",
|
"name": "main",
|
||||||
"ssid": config.WIFI_SSID,
|
"ssid": config.WIFI_SSID,
|
||||||
|
"bssid": "",
|
||||||
"password": config.WIFI_PASS,
|
"password": config.WIFI_PASS,
|
||||||
"channel": 0,
|
"channel": 0,
|
||||||
"power": 0,
|
"power": 0,
|
||||||
@@ -287,12 +319,76 @@ def test_set_wifi(get_openiris_device, ensure_board_in_mode, config):
|
|||||||
assert wifi_status_command["results"][0]["result"]["data"]["status"] == "connected"
|
assert wifi_status_command["results"][0]["result"]["data"]["status"] == "connected"
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.has_capability("wireless")
|
||||||
|
def test_set_wifi_correct_bssid(get_openiris_device, ensure_board_in_mode, config):
|
||||||
|
device = get_openiris_device()
|
||||||
|
reset_command = device.send_command("reset_config", {"section": "all"})
|
||||||
|
assert not has_command_failed(reset_command)
|
||||||
|
|
||||||
|
with DetectPortChange():
|
||||||
|
device.send_command("restart_device")
|
||||||
|
time.sleep(config.SWITCH_MODE_REBOOT_TIME)
|
||||||
|
|
||||||
|
device = ensure_board_in_mode("wifi", device)
|
||||||
|
params = {
|
||||||
|
"name": "main",
|
||||||
|
"ssid": config.WIFI_SSID,
|
||||||
|
"bssid": config.WIFI_BSSID,
|
||||||
|
"password": config.WIFI_PASS,
|
||||||
|
"channel": 0,
|
||||||
|
"power": 0,
|
||||||
|
}
|
||||||
|
set_wifi_result = device.send_command("set_wifi", params)
|
||||||
|
assert not has_command_failed(set_wifi_result)
|
||||||
|
|
||||||
|
connect_wifi_result = device.send_command("connect_wifi")
|
||||||
|
assert not -has_command_failed(connect_wifi_result)
|
||||||
|
time.sleep(config.WIFI_CONNECTION_TIMEOUT)
|
||||||
|
|
||||||
|
wifi_status_command = device.send_command("get_wifi_status")
|
||||||
|
assert not has_command_failed(wifi_status_command)
|
||||||
|
assert wifi_status_command["results"][0]["result"]["data"]["status"] == "connected"
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.has_capability("wireless")
|
||||||
|
def test_set_wifi_nonexitant_bssid(get_openiris_device, ensure_board_in_mode, config):
|
||||||
|
device = get_openiris_device()
|
||||||
|
reset_command = device.send_command("reset_config", {"section": "all"})
|
||||||
|
assert not has_command_failed(reset_command)
|
||||||
|
|
||||||
|
with DetectPortChange():
|
||||||
|
device.send_command("restart_device")
|
||||||
|
time.sleep(config.SWITCH_MODE_REBOOT_TIME)
|
||||||
|
|
||||||
|
device = ensure_board_in_mode("wifi", device)
|
||||||
|
params = {
|
||||||
|
"name": "main",
|
||||||
|
"ssid": config.WIFI_SSID,
|
||||||
|
"bssid": "99:99:99:99:99:99", # a completely wrong BSSID, just to test that we fail to connect
|
||||||
|
"password": config.WIFI_PASS,
|
||||||
|
"channel": 0,
|
||||||
|
"power": 0,
|
||||||
|
}
|
||||||
|
|
||||||
|
set_wifi_result = device.send_command("set_wifi", params)
|
||||||
|
assert not has_command_failed(set_wifi_result)
|
||||||
|
|
||||||
|
connect_wifi_result = device.send_command("connect_wifi")
|
||||||
|
assert not -has_command_failed(connect_wifi_result)
|
||||||
|
time.sleep(config.WIFI_CONNECTION_TIMEOUT)
|
||||||
|
|
||||||
|
wifi_status_command = device.send_command("get_wifi_status")
|
||||||
|
assert not has_command_failed(wifi_status_command)
|
||||||
|
assert wifi_status_command["results"][0]["result"]["data"]["status"] == "error"
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.has_capability("wireless")
|
@pytest.mark.has_capability("wireless")
|
||||||
def test_set_wifi_invalid_network(get_openiris_device, ensure_board_in_mode, config):
|
def test_set_wifi_invalid_network(get_openiris_device, ensure_board_in_mode, config):
|
||||||
device = ensure_board_in_mode("wifi", get_openiris_device())
|
device = ensure_board_in_mode("wifi", get_openiris_device())
|
||||||
params = {
|
params = {
|
||||||
"name": "main",
|
"name": "main",
|
||||||
"ssid": "PleaseDontBeARealNetwork",
|
"ssid": "PleaseDontBeARealNetwork",
|
||||||
|
"bssid": "",
|
||||||
"password": "AndThePasswordIsFake",
|
"password": "AndThePasswordIsFake",
|
||||||
"channel": 0,
|
"channel": 0,
|
||||||
"power": 0,
|
"power": 0,
|
||||||
@@ -351,6 +447,7 @@ def test_update_main_wifi_network(ensure_board_in_mode, get_openiris_device, con
|
|||||||
params1 = {
|
params1 = {
|
||||||
"name": "main",
|
"name": "main",
|
||||||
"ssid": "Nada",
|
"ssid": "Nada",
|
||||||
|
"bssid": "",
|
||||||
"password": "Nuuh",
|
"password": "Nuuh",
|
||||||
"channel": 0,
|
"channel": 0,
|
||||||
"power": 0,
|
"power": 0,
|
||||||
@@ -377,6 +474,7 @@ def test_set_wifi_add_another_network(ensure_board_in_mode, get_openiris_device)
|
|||||||
params = {
|
params = {
|
||||||
"name": "anotherNetwork",
|
"name": "anotherNetwork",
|
||||||
"ssid": "PleaseDontBeARealNetwork",
|
"ssid": "PleaseDontBeARealNetwork",
|
||||||
|
"bssid": "",
|
||||||
"password": "AndThePassowrdIsFake",
|
"password": "AndThePassowrdIsFake",
|
||||||
"channel": 0,
|
"channel": 0,
|
||||||
"power": 0,
|
"power": 0,
|
||||||
@@ -475,6 +573,7 @@ def test_update_wifi_command(ensure_board_in_mode, get_openiris_device, payload)
|
|||||||
params = {
|
params = {
|
||||||
"name": "anotherNetwork",
|
"name": "anotherNetwork",
|
||||||
"ssid": "PleaseDontBeARealNetwork",
|
"ssid": "PleaseDontBeARealNetwork",
|
||||||
|
"bssid": "",
|
||||||
"password": "AndThePasswordIsFake",
|
"password": "AndThePasswordIsFake",
|
||||||
"channel": 0,
|
"channel": 0,
|
||||||
"power": 0,
|
"power": 0,
|
||||||
|
|||||||
+25
-18
@@ -82,6 +82,7 @@ class Menu(SubMenu):
|
|||||||
@dataclass
|
@dataclass
|
||||||
class WiFiNetwork:
|
class WiFiNetwork:
|
||||||
ssid: str
|
ssid: str
|
||||||
|
bssid: str
|
||||||
channel: int
|
channel: int
|
||||||
rssi: int
|
rssi: int
|
||||||
mac_address: str
|
mac_address: str
|
||||||
@@ -119,7 +120,7 @@ class WiFiScanner:
|
|||||||
"scan_networks", params={"timeout_ms": timeout_ms}, timeout=timeout
|
"scan_networks", params={"timeout_ms": timeout_ms}, timeout=timeout
|
||||||
)
|
)
|
||||||
if has_command_failed(response):
|
if has_command_failed(response):
|
||||||
print(f"❌ Scan failed: {response['error']}")
|
print(f"❌ Scan failed: {get_response_error(response)}")
|
||||||
return
|
return
|
||||||
|
|
||||||
channels_found = set()
|
channels_found = set()
|
||||||
@@ -130,6 +131,7 @@ class WiFiScanner:
|
|||||||
for net in networks:
|
for net in networks:
|
||||||
network = WiFiNetwork(
|
network = WiFiNetwork(
|
||||||
ssid=net["ssid"],
|
ssid=net["ssid"],
|
||||||
|
bssid=net["mac_address"],
|
||||||
channel=net["channel"],
|
channel=net["channel"],
|
||||||
rssi=net["rssi"],
|
rssi=net["rssi"],
|
||||||
mac_address=net["mac_address"],
|
mac_address=net["mac_address"],
|
||||||
@@ -144,7 +146,7 @@ class WiFiScanner:
|
|||||||
f"✅ Found {len(self.networks)} networks on channels: {sorted(channels_found)}"
|
f"✅ Found {len(self.networks)} networks on channels: {sorted(channels_found)}"
|
||||||
)
|
)
|
||||||
|
|
||||||
def get_networks(self) -> list:
|
def get_networks(self) -> list[WiFiNetwork]:
|
||||||
return self.networks
|
return self.networks
|
||||||
|
|
||||||
|
|
||||||
@@ -152,6 +154,10 @@ def has_command_failed(result) -> bool:
|
|||||||
return "error" in result or result["results"][0]["result"]["status"] != "success"
|
return "error" in result or result["results"][0]["result"]["status"] != "success"
|
||||||
|
|
||||||
|
|
||||||
|
def get_response_error(result) -> str:
|
||||||
|
return result["results"][0]["result"]["data"]
|
||||||
|
|
||||||
|
|
||||||
def get_device_mode(device: OpenIrisDevice) -> dict:
|
def get_device_mode(device: OpenIrisDevice) -> dict:
|
||||||
command_result = device.send_command("get_device_mode")
|
command_result = device.send_command("get_device_mode")
|
||||||
if has_command_failed(command_result):
|
if has_command_failed(command_result):
|
||||||
@@ -181,7 +187,7 @@ def get_led_duty_cycle(device: OpenIrisDevice) -> dict:
|
|||||||
def get_mdns_name(device: OpenIrisDevice) -> dict:
|
def get_mdns_name(device: OpenIrisDevice) -> dict:
|
||||||
response = device.send_command("get_mdns_name")
|
response = device.send_command("get_mdns_name")
|
||||||
if "error" in response:
|
if "error" in response:
|
||||||
print(f"❌ Failed to get device name: {response['error']}")
|
print(f"❌ Failed to get device name: {get_response_error(response)}")
|
||||||
return {"name": "unknown"}
|
return {"name": "unknown"}
|
||||||
|
|
||||||
return {"name": response["results"][0]["result"]["data"]["hostname"]}
|
return {"name": response["results"][0]["result"]["data"]["hostname"]}
|
||||||
@@ -190,7 +196,7 @@ def get_mdns_name(device: OpenIrisDevice) -> dict:
|
|||||||
def get_serial_info(device: OpenIrisDevice) -> dict:
|
def get_serial_info(device: OpenIrisDevice) -> dict:
|
||||||
response = device.send_command("get_serial")
|
response = device.send_command("get_serial")
|
||||||
if has_command_failed(response):
|
if has_command_failed(response):
|
||||||
print(f"❌ Failed to get serial/MAC: {response['error']}")
|
print(f"❌ Failed to get serial/MAC: {get_response_error(response)}")
|
||||||
return {"serial": None, "mac": None}
|
return {"serial": None, "mac": None}
|
||||||
|
|
||||||
return {
|
return {
|
||||||
@@ -203,7 +209,7 @@ def get_device_info(device: OpenIrisDevice) -> dict:
|
|||||||
response = device.send_command("get_who_am_i")
|
response = device.send_command("get_who_am_i")
|
||||||
|
|
||||||
if has_command_failed(response):
|
if has_command_failed(response):
|
||||||
print(f"❌ Failed to get device info: {response['error']}")
|
print(f"❌ Failed to get device info: {get_response_error(response)}")
|
||||||
return {"who_am_i": None, "version": None}
|
return {"who_am_i": None, "version": None}
|
||||||
|
|
||||||
return {
|
return {
|
||||||
@@ -215,7 +221,7 @@ def get_device_info(device: OpenIrisDevice) -> dict:
|
|||||||
def get_wifi_status(device: OpenIrisDevice) -> dict:
|
def get_wifi_status(device: OpenIrisDevice) -> dict:
|
||||||
response = device.send_command("get_wifi_status")
|
response = device.send_command("get_wifi_status")
|
||||||
if has_command_failed(response):
|
if has_command_failed(response):
|
||||||
print(f"❌ Failed to get wifi status: {response['error']}")
|
print(f"❌ Failed to get wifi status: {get_response_error(response)}")
|
||||||
return {"wifi_status": {"status": "unknown"}}
|
return {"wifi_status": {"status": "unknown"}}
|
||||||
|
|
||||||
return {"wifi_status": response["results"][0]["result"]["data"]}
|
return {"wifi_status": response["results"][0]["result"]["data"]}
|
||||||
@@ -267,7 +273,7 @@ def configure_device_name(device: OpenIrisDevice, *args, **kwargs):
|
|||||||
|
|
||||||
response = device.send_command("set_mdns", {"hostname": name_choice})
|
response = device.send_command("set_mdns", {"hostname": name_choice})
|
||||||
if "error" in response:
|
if "error" in response:
|
||||||
print(f"❌ MDNS name setup failed: {response['error']}")
|
print(f"❌ MDNS name setup failed: {get_response_error(response)}")
|
||||||
return
|
return
|
||||||
|
|
||||||
print("✅ MDNS name set successfully")
|
print("✅ MDNS name set successfully")
|
||||||
@@ -278,7 +284,7 @@ def start_streaming(device: OpenIrisDevice, *args, **kwargs):
|
|||||||
response = device.send_command("start_streaming")
|
response = device.send_command("start_streaming")
|
||||||
|
|
||||||
if "error" in response:
|
if "error" in response:
|
||||||
print(f"❌ Failed to start streaming: {response['error']}")
|
print(f"❌ Failed to start streaming: {get_response_error(response)}")
|
||||||
return
|
return
|
||||||
|
|
||||||
print("✅ Streaming mode started")
|
print("✅ Streaming mode started")
|
||||||
@@ -336,7 +342,7 @@ def set_led_duty_cycle(device: OpenIrisDevice, *args, **kwargs):
|
|||||||
"set_led_duty_cycle", {"dutyCycle": duty_cycle}
|
"set_led_duty_cycle", {"dutyCycle": duty_cycle}
|
||||||
)
|
)
|
||||||
if has_command_failed(response):
|
if has_command_failed(response):
|
||||||
print(f"❌ Failed to set LED duty cycle: {response['error']}")
|
print(f"❌ Failed to set LED duty cycle: {get_response_error(response)}")
|
||||||
return False
|
return False
|
||||||
|
|
||||||
print("✅ LED duty cycle set successfully")
|
print("✅ LED duty cycle set successfully")
|
||||||
@@ -400,7 +406,7 @@ def restart_device_command(device: OpenIrisDevice, *args, **kwargs):
|
|||||||
print("🔄 Restarting device...")
|
print("🔄 Restarting device...")
|
||||||
response = device.send_command("restart_device")
|
response = device.send_command("restart_device")
|
||||||
if has_command_failed(response):
|
if has_command_failed(response):
|
||||||
print(f"❌ Failed to restart device: {response['error']}")
|
print(f"❌ Failed to restart device: {get_response_error(response)}")
|
||||||
return
|
return
|
||||||
|
|
||||||
print("✅ Device restart command sent successfully")
|
print("✅ Device restart command sent successfully")
|
||||||
@@ -439,9 +445,9 @@ def display_networks(wifi_scanner: WiFiScanner, *args, **kwargs):
|
|||||||
return
|
return
|
||||||
|
|
||||||
print("\n📡 Available WiFi Networks:")
|
print("\n📡 Available WiFi Networks:")
|
||||||
print("-" * 85)
|
print("-" * 110)
|
||||||
print(f"{'#':<3} {'SSID':<32} {'Channel':<8} {'Signal':<20} {'Security':<15}")
|
print(f"{'#':<3} {'SSID':<32} {'Channel':<8} {'Signal':<20} {'BSSID':<22} {'Security':<15}")
|
||||||
print("-" * 85)
|
print("-" * 110)
|
||||||
|
|
||||||
channels = {}
|
channels = {}
|
||||||
for idx, network in enumerate(networks, 1):
|
for idx, network in enumerate(networks, 1):
|
||||||
@@ -452,10 +458,10 @@ def display_networks(wifi_scanner: WiFiScanner, *args, **kwargs):
|
|||||||
ssid_display = network.ssid if network.ssid else "<hidden>"
|
ssid_display = network.ssid if network.ssid else "<hidden>"
|
||||||
|
|
||||||
print(
|
print(
|
||||||
f"{idx:<3} {ssid_display:<32} {network.channel:<8} {signal_str:<20} {network.security_type:<15}"
|
f"{idx:<3} {ssid_display:<32} {network.channel:<8} {signal_str:<20} {network.bssid:<22} {network.security_type:<15}"
|
||||||
)
|
)
|
||||||
|
|
||||||
print("-" * 85)
|
print("-" * 110)
|
||||||
|
|
||||||
print("\n📊 Channel distribution: ", end="")
|
print("\n📊 Channel distribution: ", end="")
|
||||||
for channel in sorted(channels.keys()):
|
for channel in sorted(channels.keys()):
|
||||||
@@ -483,7 +489,7 @@ def configure_wifi(device: OpenIrisDevice, wifi_scanner: WiFiScanner, *args, **k
|
|||||||
|
|
||||||
sorted_networks = wifi_scanner.get_networks()
|
sorted_networks = wifi_scanner.get_networks()
|
||||||
if 0 <= net_idx < len(sorted_networks):
|
if 0 <= net_idx < len(sorted_networks):
|
||||||
selected_network = sorted_networks[net_idx]
|
selected_network = sorted_networks[net_idx]
|
||||||
ssid = selected_network.ssid
|
ssid = selected_network.ssid
|
||||||
|
|
||||||
print(f"\n🔐 Selected: {ssid}")
|
print(f"\n🔐 Selected: {ssid}")
|
||||||
@@ -502,6 +508,7 @@ def configure_wifi(device: OpenIrisDevice, wifi_scanner: WiFiScanner, *args, **k
|
|||||||
params = {
|
params = {
|
||||||
"name": "main",
|
"name": "main",
|
||||||
"ssid": ssid,
|
"ssid": ssid,
|
||||||
|
"bssid": selected_network.bssid,
|
||||||
"password": password,
|
"password": password,
|
||||||
"channel": 0,
|
"channel": 0,
|
||||||
"power": 0,
|
"power": 0,
|
||||||
@@ -509,7 +516,7 @@ def configure_wifi(device: OpenIrisDevice, wifi_scanner: WiFiScanner, *args, **k
|
|||||||
|
|
||||||
response = device.send_command("set_wifi", params)
|
response = device.send_command("set_wifi", params)
|
||||||
if has_command_failed(response):
|
if has_command_failed(response):
|
||||||
print(f"❌ WiFi setup failed: {response['error']}")
|
print(f"❌ WiFi setup failed: {get_response_error(response)}")
|
||||||
break
|
break
|
||||||
|
|
||||||
print("✅ WiFi configured successfully!")
|
print("✅ WiFi configured successfully!")
|
||||||
@@ -557,7 +564,7 @@ def attempt_wifi_connection(device: OpenIrisDevice, *args, **kwargs):
|
|||||||
print("🔗 Attempting WiFi connection...")
|
print("🔗 Attempting WiFi connection...")
|
||||||
response = device.send_command("connect_wifi")
|
response = device.send_command("connect_wifi")
|
||||||
if has_command_failed(response):
|
if has_command_failed(response):
|
||||||
print(f"❌ WiFi connection failed: {response['error']}")
|
print(f"❌ WiFi connection failed: {get_response_error(response)}")
|
||||||
return
|
return
|
||||||
|
|
||||||
print("✅ WiFi connection attempt started")
|
print("✅ WiFi connection attempt started")
|
||||||
|
|||||||
Reference in New Issue
Block a user