mirror of
https://github.com/MrUnknownDE/OpenIris-ESPIDF.git
synced 2026-04-17 05:23:44 +02:00
Implement WiFi setup, scanning, auto setup and fix smaller bugs
This commit is contained in:
@@ -11,6 +11,7 @@ import argparse
|
||||
import sys
|
||||
import serial
|
||||
import string
|
||||
from dataclasses import dataclass
|
||||
|
||||
|
||||
def is_back(choice: str):
|
||||
@@ -194,6 +195,70 @@ class OpenIrisDevice:
|
||||
return {"error": f"Communication error: {e}"}
|
||||
|
||||
|
||||
@dataclass
|
||||
class WiFiNetwork:
|
||||
ssid: str
|
||||
channel: int
|
||||
rssi: int
|
||||
mac_address: str
|
||||
auth_mode: int
|
||||
|
||||
@property
|
||||
def security_type(self) -> str:
|
||||
"""Convert auth_mode to human readable string"""
|
||||
auth_modes = {
|
||||
0: "Open",
|
||||
1: "WEP",
|
||||
2: "WPA PSK",
|
||||
3: "WPA2 PSK",
|
||||
4: "WPA WPA2 PSK",
|
||||
5: "WPA2 Enterprise",
|
||||
6: "WPA3 PSK",
|
||||
7: "WPA2 WPA3 PSK",
|
||||
}
|
||||
return auth_modes.get(self.auth_mode, f"Unknown ({self.auth_mode})")
|
||||
|
||||
|
||||
class WiFiScanner:
|
||||
def __init__(self, device: OpenIrisDevice):
|
||||
self.device = device
|
||||
self.networks = []
|
||||
|
||||
def scan_networks(self, timeout: int = 30):
|
||||
print(
|
||||
f"🔍 Scanning for WiFi networks (this may take up to {timeout} seconds)..."
|
||||
)
|
||||
response = self.device.send_command("scan_networks", timeout=timeout)
|
||||
if has_command_failed(response):
|
||||
print(f"❌ Scan failed: {response['error']}")
|
||||
return
|
||||
|
||||
channels_found = set()
|
||||
networks = response["results"][0]["result"]["data"]["networks"]
|
||||
|
||||
# after each scan, clear the network list to avoid duplication
|
||||
self.networks = []
|
||||
for net in networks:
|
||||
network = WiFiNetwork(
|
||||
ssid=net["ssid"],
|
||||
channel=net["channel"],
|
||||
rssi=net["rssi"],
|
||||
mac_address=net["mac_address"],
|
||||
auth_mode=net["auth_mode"],
|
||||
)
|
||||
self.networks.append(network)
|
||||
channels_found.add(net["channel"])
|
||||
|
||||
# Sort networks by RSSI (strongest first)
|
||||
self.networks.sort(key=lambda x: x.rssi, reverse=True)
|
||||
print(
|
||||
f"✅ Found {len(self.networks)} networks on channels: {sorted(channels_found)}"
|
||||
)
|
||||
|
||||
def get_networks(self) -> list:
|
||||
return self.networks
|
||||
|
||||
|
||||
def has_command_failed(result) -> bool:
|
||||
return "error" in result or result["results"][0]["result"]["status"] != "success"
|
||||
|
||||
@@ -249,7 +314,7 @@ def get_wifi_status(device: OpenIrisDevice) -> dict:
|
||||
response = device.send_command("get_wifi_status")
|
||||
if has_command_failed(response):
|
||||
print(f"❌ Failed to get wifi status: {response['error']}")
|
||||
return {"wifi_status": "unknown"}
|
||||
return {"wifi_status": {"status": "unknown"}}
|
||||
|
||||
return {"wifi_status": response["results"][0]["result"]["data"]}
|
||||
|
||||
@@ -370,7 +435,7 @@ def get_settings_summary(device: OpenIrisDevice, *args, **kwargs):
|
||||
|
||||
print(f"🔑 Serial: {summary['Identity']}")
|
||||
print(f"💡 LED PWM Duty: {summary['LED']['duty_cycle']}%")
|
||||
print(f"🎚️ Mode: {summary['Mode']['mode']}")
|
||||
print(f"🎚️ Mode: {summary['Mode']['mode']}")
|
||||
|
||||
wifi = summary.get("WiFi", {}).get("wifi_status", {})
|
||||
status = wifi.get("status", "unknown")
|
||||
@@ -379,22 +444,184 @@ def get_settings_summary(device: OpenIrisDevice, *args, **kwargs):
|
||||
print(f"📶 WiFi: {status} | IP: {ip} | Networks configured: {configured}")
|
||||
|
||||
|
||||
def scan_networks(wifi_scanner: WiFiScanner, *args, **kwargs):
|
||||
use_custom_timeout = (
|
||||
input("Should we use a custom scan timeout? (y/n)\n>> ").strip().lower() == "y"
|
||||
)
|
||||
if use_custom_timeout:
|
||||
timeout = input("Enter timeout in seconds (5-120) or back to go back\n>> ")
|
||||
if is_back(timeout):
|
||||
return
|
||||
|
||||
try:
|
||||
timeout = int(timeout)
|
||||
if 5 <= timeout <= 120:
|
||||
print(
|
||||
f"🔍 Scanning for WiFi networks (this may take up to {timeout} seconds)..."
|
||||
)
|
||||
wifi_scanner.scan_networks(timeout)
|
||||
else:
|
||||
print("❌ Timeout must be between 5 and 120 seconds, using default")
|
||||
wifi_scanner.scan_networks()
|
||||
except ValueError:
|
||||
print("❌ Invalid timeout")
|
||||
else:
|
||||
wifi_scanner.scan_networks()
|
||||
|
||||
|
||||
def display_networks(wifi_scanner: WiFiScanner, *args, **kwargs):
|
||||
networks = wifi_scanner.get_networks()
|
||||
if not networks:
|
||||
print("❌ No networks found, please scan first")
|
||||
return
|
||||
|
||||
print("\n📡 Available WiFi Networks:")
|
||||
print("-" * 85)
|
||||
print(f"{'#':<3} {'SSID':<32} {'Channel':<8} {'Signal':<20} {'Security':<15}")
|
||||
print("-" * 85)
|
||||
|
||||
channels = {}
|
||||
for idx, network in enumerate(networks, 1):
|
||||
channels[network.channel] = channels.get(network.channel, 0) + 1
|
||||
|
||||
signal_bars = "▓" * min(5, max(0, (network.rssi + 100) // 10))
|
||||
signal_str = f"{signal_bars:<5} ({network.rssi} dBm)"
|
||||
ssid_display = network.ssid if network.ssid else "<hidden>"
|
||||
|
||||
print(
|
||||
f"{idx:<3} {ssid_display:<32} {network.channel:<8} {signal_str:<20} {network.security_type:<15}"
|
||||
)
|
||||
|
||||
print("-" * 85)
|
||||
|
||||
print("\n📊 Channel distribution: ", end="")
|
||||
for channel in sorted(channels.keys()):
|
||||
print(f"Ch{channel}: {channels[channel]} networks ", end="")
|
||||
|
||||
|
||||
def configure_wifi(device: OpenIrisDevice, wifi_scanner: WiFiScanner, *args, **kwargs):
|
||||
networks = wifi_scanner.get_networks()
|
||||
if not networks:
|
||||
print("❌ No networks available. Please scan first.")
|
||||
return
|
||||
|
||||
display_networks(wifi_scanner, *args, **kwargs)
|
||||
|
||||
while True:
|
||||
net_choice = input("\nEnter network number (or 'back'): ").strip()
|
||||
if is_back(net_choice):
|
||||
break
|
||||
|
||||
try:
|
||||
net_idx = int(net_choice) - 1
|
||||
except ValueError:
|
||||
print("❌ Please enter a number or 'back'")
|
||||
continue
|
||||
|
||||
sorted_networks = wifi_scanner.get_networks()
|
||||
if 0 <= net_idx < len(sorted_networks):
|
||||
selected_network = sorted_networks[net_idx]
|
||||
ssid = selected_network.ssid
|
||||
|
||||
print(f"\n🔐 Selected: {ssid}")
|
||||
print(f"Security: {selected_network.security_type}")
|
||||
|
||||
if selected_network.auth_mode == 0: # Open network
|
||||
password = ""
|
||||
print("🔓 Open network - no password required")
|
||||
else:
|
||||
password = input("Enter WiFi password (or 'back'): ")
|
||||
if is_back(password):
|
||||
break
|
||||
|
||||
print(f"🔧 Setting WiFi credentials for '{ssid}'...")
|
||||
|
||||
params = {
|
||||
"name": "main",
|
||||
"ssid": ssid,
|
||||
"password": password,
|
||||
"channel": 0,
|
||||
"power": 0,
|
||||
}
|
||||
|
||||
response = device.send_command("set_wifi", params)
|
||||
if has_command_failed(response):
|
||||
print(f"❌ WiFi setup failed: {response['error']}")
|
||||
break
|
||||
|
||||
print("✅ WiFi configured successfully!")
|
||||
print("💡 Next steps:")
|
||||
print(" • Open WiFi menu to connect to WiFi (if needed)")
|
||||
print(" • Open WiFi menu to check WiFi status")
|
||||
print(" • Start streaming from the main menu when connected")
|
||||
break
|
||||
else:
|
||||
print("❌ Invalid network number")
|
||||
|
||||
|
||||
def automatic_wifi_configuration(
|
||||
device: OpenIrisDevice, wifi_scanner: WiFiScanner, *args, **kwargs
|
||||
):
|
||||
print("\n⚙️ Automatic WiFi setup starting...")
|
||||
scan_networks(wifi_scanner, *args, **kwargs)
|
||||
configure_wifi(device, wifi_scanner, *args, **kwargs)
|
||||
attempt_wifi_connection(device)
|
||||
|
||||
print("⏳ Connecting to WiFi, waiting for IP...")
|
||||
start = time.time()
|
||||
timeout_s = 30
|
||||
ip = None
|
||||
last_status = None
|
||||
while time.time() - start < timeout_s:
|
||||
status = get_wifi_status(device).get("wifi_status", {})
|
||||
last_status = status
|
||||
ip = status.get("ip_address")
|
||||
if ip and ip not in ("0.0.0.0", "", None):
|
||||
break
|
||||
time.sleep(0.5)
|
||||
|
||||
if ip and ip not in ("0.0.0.0", "", None):
|
||||
print(f"✅ Connected! IP Address: {ip}")
|
||||
else:
|
||||
print("⚠️ Connection not confirmed within timeout")
|
||||
if last_status:
|
||||
print(
|
||||
f" Status: {last_status.get('status', 'unknown')} | IP: {last_status.get('ip_address', '-')}"
|
||||
)
|
||||
|
||||
|
||||
def attempt_wifi_connection(device: OpenIrisDevice, *args, **kwargs):
|
||||
print("🔗 Attempting WiFi connection...")
|
||||
response = device.send_command("connect_wifi")
|
||||
if has_command_failed(response):
|
||||
print(f"❌ WiFi connection failed: {response['error']}")
|
||||
return
|
||||
|
||||
print("✅ WiFi connection attempt started")
|
||||
|
||||
|
||||
def check_wifi_status(device: OpenIrisDevice, *args, **kwargs):
|
||||
status = get_wifi_status(device).get("wifi_status")
|
||||
print(f"📶 WiFi Status: {status.get('status', 'Unknown')}")
|
||||
if ip_address := status.get("ip_address"):
|
||||
print(f"🌐 IP Address: {ip_address}")
|
||||
|
||||
|
||||
def handle_menu(menu_context: dict | None = None) -> str:
|
||||
menu = Menu("OpenIris Setup", menu_context)
|
||||
wifi_settings = SubMenu("📶 WiFi settings", menu_context, menu)
|
||||
wifi_settings.add_action("⚙️ Automatic WiFi setup", lambda: None)
|
||||
wifi_settings.add_action("⚙️ Automatic WiFi setup", automatic_wifi_configuration)
|
||||
manual_wifi_actions = SubMenu(
|
||||
"📁 WiFi Manual Actions:",
|
||||
menu_context,
|
||||
wifi_settings,
|
||||
)
|
||||
|
||||
# simple commands can just be functions, they will get passed some context to them by the menu
|
||||
manual_wifi_actions.add_action("🔍 Scan for WiFi networks", lambda: None)
|
||||
manual_wifi_actions.add_action("📡 Show available networks", lambda: None)
|
||||
manual_wifi_actions.add_action("🔐 Configure WiFi", lambda: None)
|
||||
manual_wifi_actions.add_action("🔗 Connect to WiFi", lambda: None)
|
||||
manual_wifi_actions.add_action("🛰️ Check WiFi status", lambda: None)
|
||||
manual_wifi_actions.add_action("🔍 Scan for WiFi networks", scan_networks)
|
||||
manual_wifi_actions.add_action("📡 Show available networks", display_networks)
|
||||
manual_wifi_actions.add_action("🔐 Configure WiFi", configure_wifi)
|
||||
manual_wifi_actions.add_action("🔗 Connect to WiFi", attempt_wifi_connection)
|
||||
manual_wifi_actions.add_action("🛰️ Check WiFi status", check_wifi_status)
|
||||
|
||||
menu.add_action("🌐 Configure MDNS", configure_device_name)
|
||||
menu.add_action("💻 Configure UVC Name", configure_device_name)
|
||||
@@ -438,8 +665,9 @@ def main():
|
||||
if not device.is_connected():
|
||||
return 1
|
||||
|
||||
wifi_scanner = WiFiScanner(device)
|
||||
try:
|
||||
handle_menu({"device": device, "args": args})
|
||||
handle_menu({"device": device, "args": args, "wifi_scanner": wifi_scanner})
|
||||
except KeyboardInterrupt:
|
||||
print("\n🛑 Setup interrupted, disconnecting")
|
||||
|
||||
|
||||
Reference in New Issue
Block a user