mirror of
https://github.com/MrUnknownDE/OpenIris-ESPIDF.git
synced 2026-04-10 02:13:44 +02:00
1243 lines
47 KiB
Python
1243 lines
47 KiB
Python
#!/usr/bin/env python3
|
||
"""
|
||
OpenIris Setup CLI Tool
|
||
|
||
This tool automatically discovers OpenIris devices via heartbeat,
|
||
allows WiFi configuration, and monitors the device logs.
|
||
"""
|
||
|
||
import re
|
||
import json
|
||
import time
|
||
import threading
|
||
import argparse
|
||
import sys
|
||
import string
|
||
from typing import Dict, List, Optional, Tuple
|
||
import serial
|
||
import serial.tools.list_ports
|
||
from dataclasses import dataclass
|
||
|
||
|
||
@dataclass
|
||
class WiFiNetwork:
|
||
ssid: str
|
||
channel: int
|
||
rssi: int
|
||
mac_address: str
|
||
auth_mode: int
|
||
|
||
@property
|
||
def security_type(self) -> str:
|
||
"""Convert auth_mode to human readable string"""
|
||
auth_modes = {
|
||
0: "Open",
|
||
1: "WEP",
|
||
2: "WPA PSK",
|
||
3: "WPA2 PSK",
|
||
4: "WPA WPA2 PSK",
|
||
5: "WPA2 Enterprise",
|
||
6: "WPA3 PSK",
|
||
7: "WPA2 WPA3 PSK"
|
||
}
|
||
return auth_modes.get(self.auth_mode, f"Unknown ({self.auth_mode})")
|
||
|
||
|
||
class OpenIrisDevice:
|
||
def __init__(self, port: str, serial_number: str, debug: bool = False):
|
||
|
||
self.port = port
|
||
self.serial_number = serial_number
|
||
self.connection: Optional[serial.Serial] = None
|
||
self.networks: List[WiFiNetwork] = []
|
||
self.debug = debug
|
||
|
||
def connect(self) -> bool:
|
||
"""Connect to the device"""
|
||
try:
|
||
self.connection = serial.Serial(
|
||
port=self.port,
|
||
baudrate=115200,
|
||
timeout=1,
|
||
write_timeout=1
|
||
)
|
||
print(f"✅ Connected to device {self.serial_number} on {self.port}")
|
||
|
||
# Immediately send pause command to keep device in setup mode
|
||
print("⏸️ Pausing device startup...")
|
||
# Use shorter timeout for pause command since device is just starting up
|
||
pause_response = self.send_command("pause", {"pause": True}, timeout=5)
|
||
if "error" not in pause_response and pause_response.get("results"):
|
||
print("✅ Device paused in setup mode")
|
||
elif "error" in pause_response and pause_response["error"] == "Command timeout":
|
||
# Even if we timeout, the command likely worked (as seen in logs)
|
||
print("✅ Device pause command sent (startup logs may have obscured response)")
|
||
else:
|
||
print(f"⚠️ Pause status uncertain: {pause_response}")
|
||
|
||
return True
|
||
except Exception as e:
|
||
print(f"❌ Failed to connect to {self.port}: {e}")
|
||
return False
|
||
|
||
def disconnect(self):
|
||
"""Disconnect from the device"""
|
||
if self.connection and self.connection.is_open:
|
||
# Optionally unpause the device before disconnecting
|
||
print("Resuming device startup...")
|
||
self.send_command("pause", {"pause": False})
|
||
|
||
self.connection.close()
|
||
print(f"🔌 Disconnected from {self.port}")
|
||
|
||
def send_command(self, command: str, params: Dict = None, timeout: int = None) -> Dict:
|
||
"""Send a command to the device and wait for response"""
|
||
if not self.connection or not self.connection.is_open:
|
||
return {"error": "Not connected"}
|
||
|
||
cmd_obj = {"commands": [{"command": command}]}
|
||
if params:
|
||
cmd_obj["commands"][0]["data"] = params
|
||
|
||
cmd_json = json.dumps(cmd_obj) + '\n'
|
||
try:
|
||
# Clear any pending data
|
||
self.connection.reset_input_buffer()
|
||
|
||
# Send command
|
||
print(f"📤 Sending: {cmd_json.strip()}")
|
||
self.connection.write(cmd_json.encode())
|
||
|
||
# For scan_networks command, handle special case
|
||
if command == "scan_networks":
|
||
# Use provided timeout or default to 30 seconds for scan
|
||
scan_timeout = timeout if timeout is not None else 30
|
||
return self._handle_scan_response(scan_timeout)
|
||
|
||
# Wait for response (skip heartbeats and logs)
|
||
start_time = time.time()
|
||
response_buffer = ""
|
||
|
||
# Use provided timeout or default to 15 seconds
|
||
cmd_timeout = timeout if timeout is not None else 15
|
||
while time.time() - start_time < cmd_timeout:
|
||
try:
|
||
if self.connection.in_waiting:
|
||
data = self.connection.read(self.connection.in_waiting).decode('utf-8', errors='ignore')
|
||
response_buffer += data
|
||
|
||
# Show raw data for debugging
|
||
if self.debug and data.strip():
|
||
print(f"📡 Raw: {repr(data)}")
|
||
print(f"📝 Buffer: {repr(response_buffer[-200:])}")
|
||
|
||
# Remove ANSI escape sequences
|
||
clean_buffer = re.sub(r'\x1b\[[0-9;]*m', '', response_buffer)
|
||
clean_buffer = clean_buffer.replace('\r', '')
|
||
|
||
# Look for JSON objects - handle both single-line and multi-line
|
||
# Try to find complete JSON objects
|
||
start_idx = clean_buffer.find('{')
|
||
while start_idx >= 0:
|
||
# Count braces to find complete JSON
|
||
brace_count = 0
|
||
end_idx = -1
|
||
|
||
for i in range(start_idx, len(clean_buffer)):
|
||
if clean_buffer[i] == '{':
|
||
brace_count += 1
|
||
elif clean_buffer[i] == '}':
|
||
brace_count -= 1
|
||
if brace_count == 0:
|
||
end_idx = i + 1
|
||
break
|
||
|
||
if end_idx > start_idx:
|
||
json_str = clean_buffer[start_idx:end_idx]
|
||
|
||
# Try to parse any complete JSON object
|
||
try:
|
||
# Clean up the JSON
|
||
clean_json = json_str.replace('\t', ' ').replace('\n', ' ').replace('\r', '')
|
||
clean_json = re.sub(r'\s+', ' ', clean_json)
|
||
|
||
response = json.loads(clean_json)
|
||
|
||
# Return if this is a command response with results
|
||
if "results" in response:
|
||
return response
|
||
|
||
except json.JSONDecodeError:
|
||
pass
|
||
|
||
# Look for next JSON object
|
||
start_idx = clean_buffer.find('{', end_idx)
|
||
else:
|
||
# No complete JSON found yet
|
||
break
|
||
else:
|
||
time.sleep(0.1)
|
||
except Exception as e:
|
||
print(f"⚠️ Exception: {e}")
|
||
continue
|
||
|
||
return {"error": "Command timeout"}
|
||
except Exception as e:
|
||
return {"error": f"Communication error: {e}"}
|
||
|
||
def _handle_scan_response(self, timeout: int = 30) -> Dict:
|
||
"""Handle scan_networks command response which outputs raw JSON first"""
|
||
start_time = time.time()
|
||
response_buffer = ""
|
||
|
||
while time.time() - start_time < timeout: # Configurable timeout for scan
|
||
if self.connection.in_waiting:
|
||
data = self.connection.read(self.connection.in_waiting).decode('utf-8', errors='ignore')
|
||
response_buffer += data
|
||
|
||
# Look for WiFi networks JSON directly (new format)
|
||
# The scan command now outputs JSON directly followed by command result
|
||
if '{"networks":[' in response_buffer:
|
||
import re
|
||
|
||
# Look for the networks JSON pattern that appears first
|
||
networks_pattern = r'\{"networks":\[.*?\]\}'
|
||
matches = re.findall(networks_pattern, response_buffer, re.DOTALL)
|
||
|
||
for match in matches:
|
||
try:
|
||
# Parse the networks JSON directly
|
||
networks_data = json.loads(match)
|
||
if "networks" in networks_data:
|
||
# Return in the expected format for compatibility
|
||
return {"results": [json.dumps({"result": match})]}
|
||
except json.JSONDecodeError:
|
||
continue
|
||
|
||
# Also check if we have the command result indicating completion
|
||
if '{"results":' in response_buffer and '"Networks scanned"' in response_buffer:
|
||
# We've received the completion message, parse any networks found
|
||
import re
|
||
networks_pattern = r'\{"networks":\[.*?\]\}'
|
||
matches = re.findall(networks_pattern, response_buffer, re.DOTALL)
|
||
|
||
for match in matches:
|
||
try:
|
||
networks_data = json.loads(match)
|
||
if "networks" in networks_data:
|
||
return {"results": [json.dumps({"result": match})]}
|
||
except json.JSONDecodeError:
|
||
continue
|
||
|
||
# If we get here, scan completed but no networks found
|
||
return {"results": [json.dumps({"result": '{"networks":[]}'})]}
|
||
else:
|
||
time.sleep(0.1)
|
||
|
||
return {"error": "Scan timeout"}
|
||
|
||
def scan_networks(self, timeout: int = 30) -> bool:
|
||
"""Scan for WiFi networks"""
|
||
print(f"🔍 Scanning for WiFi networks (this may take up to {timeout} seconds)...")
|
||
response = self.send_command("scan_networks", timeout=timeout)
|
||
|
||
if "error" in response:
|
||
print(f"❌ Scan failed: {response['error']}")
|
||
return False
|
||
|
||
try:
|
||
# Parse the nested JSON response
|
||
results = response.get("results", [])
|
||
if not results:
|
||
print("❌ No scan results received")
|
||
return False
|
||
|
||
# The result is JSON-encoded string inside the response
|
||
result_data = json.loads(results[0])
|
||
networks_data = json.loads(result_data["result"])
|
||
|
||
self.networks = []
|
||
channels_found = set()
|
||
|
||
for net in networks_data.get("networks", []):
|
||
network = WiFiNetwork(
|
||
ssid=net["ssid"],
|
||
channel=net["channel"],
|
||
rssi=net["rssi"],
|
||
mac_address=net["mac_address"],
|
||
auth_mode=net["auth_mode"]
|
||
)
|
||
self.networks.append(network)
|
||
channels_found.add(net["channel"])
|
||
|
||
# Sort networks by RSSI (strongest first)
|
||
self.networks.sort(key=lambda x: x.rssi, reverse=True)
|
||
|
||
print(f"✅ Found {len(self.networks)} networks on channels: {sorted(channels_found)}")
|
||
return True
|
||
|
||
except Exception as e:
|
||
print(f"❌ Failed to parse scan results: {e}")
|
||
return False
|
||
|
||
def set_wifi(self, ssid: str, password: str) -> bool:
|
||
"""Configure WiFi credentials"""
|
||
print(f"🔧 Setting WiFi credentials for '{ssid}'...")
|
||
|
||
params = {
|
||
"name": "main",
|
||
"ssid": ssid,
|
||
"password": password,
|
||
"channel": 0,
|
||
"power": 0
|
||
}
|
||
|
||
response = self.send_command("set_wifi", params)
|
||
|
||
if "error" in response:
|
||
print(f"❌ WiFi setup failed: {response['error']}")
|
||
return False
|
||
|
||
print("✅ WiFi credentials set successfully")
|
||
return True
|
||
|
||
def set_mdns_name(self, name: str) -> bool:
|
||
"""Configure the MDNS hostname"""
|
||
|
||
print(f"🔧 Setting MDNS name to '{name}'...")
|
||
print(" The device should be accessible under")
|
||
print(f"http://{name}.local/")
|
||
print("\n Note, this will also modify the name of the UVC device")
|
||
|
||
params = {
|
||
"hostname": name
|
||
}
|
||
|
||
response = self.send_command("set_mdns", params)
|
||
if "error" in response:
|
||
print(f"❌ MDNS name setup failed: {response['error']}")
|
||
return False
|
||
|
||
print("✅ MDNS name set successfully")
|
||
return True
|
||
|
||
def get_wifi_status(self) -> Dict:
|
||
"""Get current WiFi connection status"""
|
||
response = self.send_command("get_wifi_status")
|
||
|
||
if "error" in response:
|
||
print(f"❌ Failed to get WiFi status: {response['error']}")
|
||
return {}
|
||
|
||
try:
|
||
# Parse the nested JSON response
|
||
results = response.get("results", [])
|
||
if results:
|
||
result_data = json.loads(results[0])
|
||
# The result is a JSON-encoded string, need to decode it
|
||
status_json = result_data["result"]
|
||
|
||
# First, unescape the JSON string properly
|
||
# Replace escaped backslashes and quotes
|
||
status_json = status_json.replace('\\\\', '\\')
|
||
status_json = status_json.replace('\\"', '"')
|
||
|
||
# Now parse the cleaned JSON
|
||
status_data = json.loads(status_json)
|
||
return status_data
|
||
except Exception as e:
|
||
print(f"❌ Failed to parse WiFi status: {e}")
|
||
# Try to show raw response for debugging
|
||
if "results" in response and response["results"]:
|
||
print(f"📝 Raw result: {response['results'][0]}")
|
||
|
||
return {}
|
||
|
||
def connect_wifi(self) -> bool:
|
||
"""Attempt to connect to configured WiFi"""
|
||
print("🔗 Attempting WiFi connection...")
|
||
response = self.send_command("connect_wifi")
|
||
|
||
if "error" in response:
|
||
print(f"❌ WiFi connection failed: {response['error']}")
|
||
return False
|
||
|
||
print("✅ WiFi connection attempt started")
|
||
return True
|
||
|
||
def start_streaming(self) -> bool:
|
||
"""Start streaming mode"""
|
||
print("🚀 Starting streaming mode...")
|
||
response = self.send_command("start_streaming")
|
||
|
||
if "error" in response:
|
||
print(f"❌ Failed to start streaming: {response['error']}")
|
||
return False
|
||
|
||
print("✅ Streaming mode started")
|
||
return True
|
||
|
||
def switch_mode(self, mode: str) -> bool:
|
||
"""Switch device mode between WiFi, UVC, and Setup"""
|
||
print(f"🔄 Switching device mode to '{mode}'...")
|
||
|
||
params = {"mode": mode}
|
||
response = self.send_command("switch_mode", params)
|
||
|
||
if "error" in response:
|
||
print(f"❌ Failed to switch mode: {response['error']}")
|
||
return False
|
||
|
||
print(f"✅ Device mode switched to '{mode}' successfully!")
|
||
print("🔄 Please restart the device for changes to take effect")
|
||
return True
|
||
|
||
def get_device_mode(self) -> str:
|
||
"""Get current device mode"""
|
||
response = self.send_command("get_device_mode")
|
||
|
||
if "error" in response:
|
||
print(f"❌ Failed to get device mode: {response['error']}")
|
||
return "unknown"
|
||
|
||
try:
|
||
results = response.get("results", [])
|
||
if results:
|
||
result_data = json.loads(results[0])
|
||
mode_data = json.loads(result_data["result"])
|
||
return mode_data.get("mode", "unknown")
|
||
except Exception as e:
|
||
print(f"❌ Failed to parse mode response: {e}")
|
||
return "unknown"
|
||
|
||
def get_mdns_name(self) -> str:
|
||
"""Get the current MDNS name"""
|
||
response = self.send_command("get_mdns_name")
|
||
if "error" in response:
|
||
print(f"❌ Failed to get device name: {response['error']}")
|
||
return "unknown"
|
||
|
||
try:
|
||
results = response.get("results", [])
|
||
if results:
|
||
result_data = json.loads(results[0])
|
||
name_data = json.loads(result_data["result"])
|
||
return name_data.get("hostname", "unknown")
|
||
except Exception as e:
|
||
print(f"❌ Failed to parse name response: {e}")
|
||
return "unknown"
|
||
|
||
def set_led_duty_cycle(self, duty_cycle):
|
||
"""Sets the PWN duty cycle of the LED"""
|
||
print(f"🌟 Setting LED duty cycle to {duty_cycle}%...")
|
||
response = self.send_command("set_led_duty_cycle", {"dutyCycle": duty_cycle})
|
||
if "error" in response:
|
||
print(f"❌ Failed to set LED duty cycle: {response['error']}")
|
||
return False
|
||
|
||
print("✅ LED duty cycle set successfully")
|
||
return True
|
||
|
||
def get_led_duty_cycle(self) -> Optional[int]:
|
||
"""Get the current LED PWM duty cycle from the device"""
|
||
response = self.send_command("get_led_duty_cycle")
|
||
if "error" in response:
|
||
print(f"❌ Failed to get LED duty cycle: {response['error']}")
|
||
return None
|
||
try:
|
||
results = response.get("results", [])
|
||
if results:
|
||
result_data = json.loads(results[0])
|
||
payload = result_data["result"]
|
||
if isinstance(payload, str):
|
||
payload = json.loads(payload)
|
||
return int(payload.get("led_external_pwm_duty_cycle"))
|
||
except Exception as e:
|
||
print(f"❌ Failed to parse LED duty cycle: {e}")
|
||
return None
|
||
|
||
def get_serial_info(self) -> Optional[Tuple[str, str]]:
|
||
"""Get device serial number and MAC address"""
|
||
response = self.send_command("get_serial")
|
||
if "error" in response:
|
||
print(f"❌ Failed to get serial/MAC: {response['error']}")
|
||
return None
|
||
try:
|
||
results = response.get("results", [])
|
||
if results:
|
||
result_data = json.loads(results[0])
|
||
payload = result_data["result"]
|
||
if isinstance(payload, str):
|
||
payload = json.loads(payload)
|
||
serial = payload.get("serial")
|
||
mac = payload.get("mac")
|
||
return serial, mac
|
||
except Exception as e:
|
||
print(f"❌ Failed to parse serial/MAC: {e}")
|
||
return None
|
||
|
||
def monitor_logs(self):
|
||
"""Monitor device logs until interrupted"""
|
||
print("📋 Monitoring device logs (Press Ctrl+C to exit)...")
|
||
print("-" * 60)
|
||
|
||
if not self.connection or not self.connection.is_open:
|
||
print("❌ Not connected to device")
|
||
return
|
||
|
||
try:
|
||
while True:
|
||
try:
|
||
if self.connection.in_waiting > 0:
|
||
line = self.connection.readline().decode().strip()
|
||
if line:
|
||
# Skip JSON command responses, show raw logs
|
||
if not (line.startswith('{') and line.endswith('}')):
|
||
print(line)
|
||
elif "heartbeat" not in line:
|
||
# Show non-heartbeat JSON responses
|
||
print(f"📡 {line}")
|
||
else:
|
||
time.sleep(0.1) # Small delay to prevent busy waiting
|
||
except Exception:
|
||
continue
|
||
except KeyboardInterrupt:
|
||
print("\n🛑 Log monitoring stopped")
|
||
|
||
|
||
class OpenIrisDiscovery:
|
||
def __init__(self):
|
||
self.devices: Dict[str, OpenIrisDevice] = {}
|
||
self.discovery_active = False
|
||
|
||
def discover_devices(self, timeout: int = 3) -> List[OpenIrisDevice]:
|
||
"""Discover OpenIris devices via heartbeat - ultra-fast concurrent scanning"""
|
||
print(f"⚡ Fast-scanning for OpenIris devices...")
|
||
|
||
# Get all serial ports
|
||
ports = list(serial.tools.list_ports.comports())
|
||
if not ports:
|
||
print("❌ No serial ports found")
|
||
return []
|
||
|
||
# Prioritize likely ESP32 USB ports for faster detection
|
||
priority_ports = []
|
||
other_ports = []
|
||
|
||
for port in ports:
|
||
# Common ESP32 USB-to-serial descriptions
|
||
desc_lower = (port.description or "").lower()
|
||
# Include generic "USB Serial Device" which is common on Windows
|
||
if any(keyword in desc_lower for keyword in
|
||
["cp210", "ch340", "ftdi", "esp32", "silicon labs", "usb-serial", "usb serial", "usb serial device"]):
|
||
priority_ports.append(port)
|
||
else:
|
||
other_ports.append(port)
|
||
|
||
# Check priority ports first, then others
|
||
sorted_ports = priority_ports + other_ports
|
||
|
||
if priority_ports:
|
||
print(f"📡 Checking {len(sorted_ports)} ports ({len(priority_ports)} prioritized USB serial ports)...")
|
||
else:
|
||
print(f"📡 Checking {len(sorted_ports)} serial ports...")
|
||
|
||
discovered = {}
|
||
lock = threading.Lock()
|
||
threads = []
|
||
|
||
def check_port_fast(port_info):
|
||
"""Check a single port for OpenIris heartbeat - optimized for speed"""
|
||
try:
|
||
# Initial connection timeout - 500ms
|
||
ser = serial.Serial(port_info.device, 115200, timeout=0.5)
|
||
ser.reset_input_buffer()
|
||
|
||
# Wait up to 2 seconds for heartbeat
|
||
start_time = time.time()
|
||
while time.time() - start_time < 2.0:
|
||
try:
|
||
# Read timeout - 200ms
|
||
ser.timeout = 10
|
||
if ser.in_waiting > 0:
|
||
line = ser.readline()
|
||
if line:
|
||
try:
|
||
data = json.loads(line.decode().strip())
|
||
if (data.get("heartbeat") == "openiris_setup_mode" and
|
||
"serial" in data):
|
||
serial_num = data["serial"]
|
||
with lock:
|
||
if serial_num not in discovered:
|
||
device = OpenIrisDevice(port_info.device, serial_num, debug=False)
|
||
discovered[serial_num] = device
|
||
print(f"💓 Found {serial_num} on {port_info.device}")
|
||
# Return immediately to stop checking this port
|
||
ser.close()
|
||
return True
|
||
except (json.JSONDecodeError, UnicodeDecodeError):
|
||
pass
|
||
else:
|
||
time.sleep(0.05) # Very short sleep
|
||
except Exception:
|
||
pass
|
||
|
||
ser.close()
|
||
except Exception:
|
||
# Port not available or not the right device
|
||
pass
|
||
return False
|
||
|
||
# Start concurrent port checking
|
||
for port in sorted_ports:
|
||
thread = threading.Thread(target=check_port_fast, args=(port,))
|
||
thread.daemon = True
|
||
thread.start()
|
||
threads.append(thread)
|
||
|
||
# Wait for threads to complete or timeout
|
||
timeout_time = time.time() + timeout
|
||
for thread in threads:
|
||
remaining = timeout_time - time.time()
|
||
if remaining > 0:
|
||
thread.join(timeout=remaining)
|
||
|
||
# If we found at least one device, return immediately
|
||
if discovered:
|
||
break
|
||
|
||
devices = list(discovered.values())
|
||
|
||
if devices:
|
||
print(f"✅ Found {len(devices)} OpenIris device(s)")
|
||
else:
|
||
print("❌ No OpenIris devices found in {:.1f} seconds".format(time.time() - (timeout_time - timeout)))
|
||
print("💡 Device has 20-second setup window after power on")
|
||
|
||
return devices
|
||
|
||
def _check_port(self, port: str, discovered: Dict, timeout: int):
|
||
"""Check a single port for OpenIris heartbeat"""
|
||
try:
|
||
with serial.Serial(port, 115200, timeout=1) as ser:
|
||
start_time = time.time()
|
||
|
||
while time.time() - start_time < timeout:
|
||
try:
|
||
line = ser.readline().decode().strip()
|
||
if line:
|
||
try:
|
||
data = json.loads(line)
|
||
if (data.get("heartbeat") == "openiris_setup_mode" and
|
||
"serial" in data):
|
||
|
||
serial_num = data["serial"]
|
||
if serial_num not in discovered:
|
||
discovered[serial_num] = OpenIrisDevice(port, serial_num, debug=False)
|
||
print(f"💓 Found device {serial_num} on {port}")
|
||
return
|
||
except json.JSONDecodeError:
|
||
continue
|
||
except Exception:
|
||
continue
|
||
except Exception:
|
||
# Port not available or not a serial device
|
||
pass
|
||
|
||
|
||
def scan_networks(device: OpenIrisDevice, args = None):
|
||
should_use_custom_timeout = input("Use custom scan timeout? (y/N): ").strip().lower()
|
||
if should_use_custom_timeout == 'y':
|
||
try:
|
||
timeout = int(input("Enter timeout in seconds (5-120): "))
|
||
if 5 <= timeout <= 120:
|
||
device.scan_networks(timeout)
|
||
else:
|
||
print("❌ Timeout must be between 5 and 120 seconds")
|
||
device.scan_networks(args.scan_timeout)
|
||
except ValueError:
|
||
print("❌ Invalid timeout, using default")
|
||
device.scan_networks(args.scan_timeout)
|
||
else:
|
||
device.scan_networks(args.scan_timeout)
|
||
|
||
|
||
def configure_wifi(device: OpenIrisDevice, args = None):
|
||
if not device.networks:
|
||
print("❌ No networks available. Please scan first.")
|
||
return
|
||
|
||
display_networks(device)
|
||
|
||
while True:
|
||
net_choice = input("\nEnter network number (or 'back'): ").strip()
|
||
if net_choice.lower() == 'back':
|
||
break
|
||
|
||
try:
|
||
net_idx = int(net_choice) - 1
|
||
except ValueError:
|
||
print("❌ Please enter a number or 'back'")
|
||
continue
|
||
|
||
sorted_networks = sorted(device.networks, key=lambda x: x.rssi, reverse=True)
|
||
|
||
if 0 <= net_idx < len(sorted_networks):
|
||
selected_network = sorted_networks[net_idx]
|
||
|
||
print(f"\n🔐 Selected: {selected_network.ssid}")
|
||
print(f"Security: {selected_network.security_type}")
|
||
|
||
if selected_network.auth_mode == 0: # Open network
|
||
password = ""
|
||
print("🔓 Open network - no password required")
|
||
else:
|
||
password = input("Enter WiFi password: ")
|
||
|
||
if device.set_wifi(selected_network.ssid, password):
|
||
print("✅ WiFi configured successfully!")
|
||
print("💡 Next steps:")
|
||
print(" • Open WiFi menu to connect to WiFi (if needed)")
|
||
print(" • Open WiFi menu to check WiFi status")
|
||
print(" • Start streaming from the main menu when connected")
|
||
break
|
||
else:
|
||
print("❌ Invalid network number")
|
||
|
||
|
||
def configure_mdns(device: OpenIrisDevice, args = None):
|
||
current_name = device.get_mdns_name()
|
||
print(f"\n📍 Current advertised name: {current_name} \n")
|
||
print("💡 This single name is used for both: mDNS (http://<name>.local/) and USB UVC device descriptor.")
|
||
print("💡 Avoid spaces / special chars. Enter 'back' to cancel.")
|
||
|
||
while True:
|
||
name_choice = input("\nDevice name: ").strip()
|
||
if any(space in name_choice for space in string.whitespace):
|
||
print("❌ Please avoid spaces and special characters")
|
||
else:
|
||
break
|
||
|
||
if name_choice.lower() == "back":
|
||
return
|
||
|
||
if device.set_mdns_name(name_choice):
|
||
print("✅ MDNS configured successfully!")
|
||
|
||
|
||
def display_networks(device: OpenIrisDevice, args = None):
|
||
"""Display available WiFi networks in a formatted table"""
|
||
networks = device.networks
|
||
|
||
if not networks:
|
||
print("❌ No networks available")
|
||
return
|
||
|
||
print("\n📡 Available WiFi Networks:")
|
||
print("-" * 85)
|
||
print(f"{'#':<3} {'SSID':<32} {'Channel':<8} {'Signal':<20} {'Security':<15}")
|
||
print("-" * 85)
|
||
|
||
# Networks are already sorted by signal strength from scan_networks
|
||
for i, network in enumerate(networks, 1):
|
||
# Create signal strength visualization
|
||
signal_bars = "▓" * min(5, max(0, (network.rssi + 100) // 10))
|
||
signal_str = f"{signal_bars:<5} ({network.rssi} dBm)"
|
||
|
||
# Format SSID (show hidden networks as <hidden>)
|
||
ssid_display = network.ssid if network.ssid else "<hidden>"
|
||
|
||
print(f"{i:<3} {ssid_display:<32} {network.channel:<8} {signal_str:<20} {network.security_type:<15}")
|
||
|
||
print("-" * 85)
|
||
|
||
# Show channel distribution
|
||
channels = {}
|
||
for net in networks:
|
||
channels[net.channel] = channels.get(net.channel, 0) + 1
|
||
|
||
print(f"\n📊 Channel distribution: ", end="")
|
||
for ch in sorted(channels.keys()):
|
||
print(f"Ch{ch}: {channels[ch]} networks ", end="")
|
||
print()
|
||
|
||
|
||
def check_wifi_status(device: OpenIrisDevice, args = None):
|
||
status = device.get_wifi_status()
|
||
if status:
|
||
print(f"📶 WiFi Status: {status.get('status', 'unknown')}")
|
||
print(f"📡 Networks configured: {status.get('networks_configured', 0)}")
|
||
if status.get('ip_address'):
|
||
print(f"🌐 IP Address: {status['ip_address']}")
|
||
else:
|
||
print("❌ Unable to get WiFi status")
|
||
|
||
|
||
def attempt_wifi_connection(device: OpenIrisDevice, args = None):
|
||
device.connect_wifi()
|
||
print("🕰️ Wait a few seconds then check status in the WiFi menu")
|
||
|
||
|
||
def start_streaming(device: OpenIrisDevice, args = None):
|
||
device.start_streaming()
|
||
print("🚀 Streaming started! Use 'Monitor logs' from the main menu.")
|
||
|
||
|
||
# ----- WiFi submenu -----
|
||
def wifi_auto_setup(device: OpenIrisDevice, args=None):
|
||
print("\n⚙️ Automatic WiFi setup starting...")
|
||
scan_timeout = getattr(args, "scan_timeout", 30) if args else 30
|
||
|
||
# 1) Scan
|
||
if not device.scan_networks(timeout=scan_timeout):
|
||
print("❌ Auto-setup aborted: no networks found or scan failed")
|
||
return
|
||
|
||
# 2) Show networks (sorted strongest-first already)
|
||
display_networks(device)
|
||
|
||
# 3) Select a network (default strongest)
|
||
choice = input("Select network number [default: 1] or 'back': ").strip()
|
||
if choice.lower() == "back":
|
||
return
|
||
try:
|
||
idx = int(choice) - 1 if choice else 0
|
||
except ValueError:
|
||
idx = 0
|
||
|
||
sorted_networks = sorted(device.networks, key=lambda x: x.rssi, reverse=True)
|
||
if not (0 <= idx < len(sorted_networks)):
|
||
print("⚠️ Invalid selection, using strongest network")
|
||
idx = 0
|
||
|
||
selected = sorted_networks[idx]
|
||
print(f"\n🔐 Selected: {selected.ssid if selected.ssid else '<hidden>'}")
|
||
if selected.auth_mode == 0:
|
||
password = ""
|
||
print("🔓 Open network - no password required")
|
||
else:
|
||
password = input("Enter WiFi password: ")
|
||
|
||
# 4) Configure WiFi
|
||
if not device.set_wifi(selected.ssid, password):
|
||
print("❌ Auto-setup aborted: failed to configure WiFi")
|
||
return
|
||
|
||
# 5) Connect
|
||
if not device.connect_wifi():
|
||
print("❌ Auto-setup aborted: failed to start WiFi connection")
|
||
return
|
||
|
||
# 6) Wait for IP / connected status
|
||
print("⏳ Connecting to WiFi, waiting for IP...")
|
||
start = time.time()
|
||
timeout_s = 30
|
||
ip = None
|
||
last_status = None
|
||
while time.time() - start < timeout_s:
|
||
status = device.get_wifi_status()
|
||
last_status = status
|
||
ip = (status or {}).get("ip_address")
|
||
if ip and ip not in ("0.0.0.0", "", None):
|
||
break
|
||
time.sleep(0.5)
|
||
|
||
if ip and ip not in ("0.0.0.0", "", None):
|
||
print(f"✅ Connected! IP Address: {ip}")
|
||
else:
|
||
print("⚠️ Connection not confirmed within timeout")
|
||
if last_status:
|
||
print(f" Status: {last_status.get('status', 'unknown')} | IP: {last_status.get('ip_address', '-')}")
|
||
|
||
|
||
def wifi_menu(device: OpenIrisDevice, args=None):
|
||
while True:
|
||
print("\n📶 WiFi Settings:")
|
||
print(f"{str(1):>2} ⚙️ Automatic WiFi setup")
|
||
print(f"{str(2):>2} 📁 Manual WiFi actions")
|
||
print("back Back")
|
||
|
||
choice = input("\nSelect option (1-2 or 'back'): ").strip()
|
||
if choice.lower() == "back":
|
||
break
|
||
|
||
if choice == "1":
|
||
wifi_auto_setup(device, args)
|
||
elif choice == "2":
|
||
wifi_manual_menu(device, args)
|
||
else:
|
||
print("❌ Invalid option")
|
||
|
||
|
||
def wifi_manual_menu(device: OpenIrisDevice, args=None):
|
||
while True:
|
||
print("\n📁 WiFi Manual Actions:")
|
||
print(f"{str(1):>2} 🔍 Scan for WiFi networks")
|
||
print(f"{str(2):>2} 📡 Show available networks")
|
||
print(f"{str(3):>2} 🔐 Configure WiFi")
|
||
print(f"{str(4):>2} 🔗 Connect to WiFi")
|
||
print(f"{str(5):>2} 🛰️ Check WiFi status")
|
||
print("back Back")
|
||
|
||
choice = input("\nSelect option (1-5 or 'back'): ").strip()
|
||
if choice.lower() == "back":
|
||
break
|
||
|
||
sub_map = {
|
||
"1": scan_networks,
|
||
"2": display_networks,
|
||
"3": configure_wifi,
|
||
"4": attempt_wifi_connection,
|
||
"5": check_wifi_status,
|
||
}
|
||
|
||
handler = sub_map.get(choice)
|
||
if not handler:
|
||
print("❌ Invalid option")
|
||
continue
|
||
handler(device, args)
|
||
|
||
|
||
def switch_device_mode(device: OpenIrisDevice, args = None):
|
||
current_mode = device.get_device_mode()
|
||
print(f"\n📍 Current device mode: {current_mode}")
|
||
print("\n🔄 Select new device mode:")
|
||
print("1. WiFi - Stream over WiFi connection")
|
||
print("2. UVC - Stream as USB webcam")
|
||
print("3. Setup - Configuration mode")
|
||
|
||
mode_choice = input("\nSelect mode (1-3): ").strip()
|
||
|
||
if mode_choice == "1":
|
||
device.switch_mode("wifi")
|
||
elif mode_choice == "2":
|
||
device.switch_mode("uvc")
|
||
elif mode_choice == "3":
|
||
device.switch_mode("setup")
|
||
else:
|
||
print("❌ Invalid mode selection")
|
||
|
||
|
||
def set_led_duty_cycle(device: OpenIrisDevice, args=None):
|
||
# Show current duty cycle on entry
|
||
current = device.get_led_duty_cycle()
|
||
if current is not None:
|
||
print(f"💡 Current LED duty cycle: {current}%")
|
||
|
||
while True:
|
||
input_data = input("Enter LED external PWM duty cycle (0-100) or `back` to exit: \n")
|
||
if input_data.lower() == "back":
|
||
break
|
||
|
||
try:
|
||
duty_cycle = int(input_data)
|
||
except ValueError:
|
||
print("❌ Invalid input. Please enter a number between 0 and 100.")
|
||
|
||
if duty_cycle < 0 or duty_cycle > 100:
|
||
print("❌ Duty cycle must be between 0 and 100.")
|
||
else:
|
||
# Apply immediately; stay in loop for further tweaks
|
||
if device.set_led_duty_cycle(duty_cycle):
|
||
# Read back and display current value using existing getter
|
||
updated = device.get_led_duty_cycle()
|
||
if updated is not None:
|
||
print(f"💡 Current LED duty cycle: {updated}%")
|
||
else:
|
||
print("ℹ️ Duty cycle updated, but current value could not be read back.")
|
||
|
||
|
||
def monitor_logs(device: OpenIrisDevice, args = None):
|
||
device.monitor_logs()
|
||
|
||
|
||
def get_led_duty_cycle(device: OpenIrisDevice, args=None):
|
||
duty = device.get_led_duty_cycle()
|
||
if duty is not None:
|
||
print(f"💡 Current LED duty cycle: {duty}%")
|
||
|
||
|
||
def get_serial(device: OpenIrisDevice, args=None):
|
||
info = device.get_serial_info()
|
||
if info is not None:
|
||
serial, mac = info
|
||
# print(f"🔑 Serial: {serial}")
|
||
print(f"🔗 MAC: {mac}")
|
||
|
||
|
||
# ----- Aggregated GET: settings summary -----
|
||
def _probe_serial(device: OpenIrisDevice) -> Dict:
|
||
info = device.get_serial_info()
|
||
if info is None:
|
||
return {"serial": None, "mac": None}
|
||
serial, mac = info
|
||
return {"serial": serial, "mac": mac}
|
||
|
||
def _probe_info(device: OpenIrisDevice) -> Dict:
|
||
resp = device.send_command("get_who_am_i")
|
||
if "error" in resp:
|
||
return {"who_am_i": None, "version": None, "error": resp["error"]}
|
||
try:
|
||
results = resp.get("results", [])
|
||
if results:
|
||
result_data = json.loads(results[0])
|
||
payload = result_data["result"]
|
||
if isinstance(payload, str):
|
||
payload = json.loads(payload)
|
||
return {"who_am_i": payload.get("who_am_i"), "version": payload.get("version")}
|
||
except Exception as e:
|
||
return {"who_am_i": None, "version": None, "error": str(e)}
|
||
return {"who_am_i": None, "version": None}
|
||
|
||
def _probe_advertised_name(device: OpenIrisDevice) -> Dict:
|
||
# Currently the advertised name == mdns hostname
|
||
name = device.get_mdns_name()
|
||
return {"advertised_name": name}
|
||
|
||
|
||
def _probe_led_pwm(device: OpenIrisDevice) -> Dict:
|
||
duty = device.get_led_duty_cycle()
|
||
return {"led_external_pwm_duty_cycle": duty}
|
||
|
||
def _probe_led_current(device: OpenIrisDevice) -> Dict:
|
||
# Query device for current in mA via new command
|
||
resp = device.send_command("get_led_current")
|
||
if "error" in resp:
|
||
return {"led_current_ma": None, "error": resp["error"]}
|
||
try:
|
||
results = resp.get("results", [])
|
||
if results:
|
||
result_data = json.loads(results[0])
|
||
payload = result_data["result"]
|
||
if isinstance(payload, str):
|
||
payload = json.loads(payload)
|
||
return {"led_current_ma": float(payload.get("led_current_ma"))}
|
||
except Exception as e:
|
||
return {"led_current_ma": None, "error": str(e)}
|
||
return {"led_current_ma": None}
|
||
|
||
|
||
def _probe_mode(device: OpenIrisDevice) -> Dict:
|
||
mode = device.get_device_mode()
|
||
return {"mode": mode}
|
||
|
||
|
||
def _probe_wifi_status(device: OpenIrisDevice) -> Dict:
|
||
# Returns dict as provided by device; pass through
|
||
status = device.get_wifi_status() or {}
|
||
return {"wifi_status": status}
|
||
|
||
|
||
def get_settings(device: OpenIrisDevice, args=None):
|
||
print("\n🧩 Collecting device settings...\n")
|
||
|
||
probes = [
|
||
("Identity", _probe_serial),
|
||
("AdvertisedName", _probe_advertised_name),
|
||
("Info", _probe_info),
|
||
("LED", _probe_led_pwm),
|
||
("Current", _probe_led_current),
|
||
("Mode", _probe_mode),
|
||
("WiFi", _probe_wifi_status),
|
||
]
|
||
|
||
summary: Dict[str, Dict] = {}
|
||
|
||
for label, probe in probes:
|
||
try:
|
||
data = probe(device)
|
||
summary[label] = data
|
||
except Exception as e:
|
||
summary[label] = {"error": str(e)}
|
||
|
||
# Pretty print summary
|
||
# Identity
|
||
ident = summary.get("Identity", {})
|
||
serial = ident.get("serial")
|
||
mac = ident.get("mac")
|
||
if serial:
|
||
print(f"🔑 Serial: {serial}")
|
||
# if mac:
|
||
# print(f"🔗 MAC: {mac}")
|
||
if not serial and not mac:
|
||
print("🔑 Serial/MAC: unavailable")
|
||
|
||
# Advertised Name
|
||
adv = summary.get("AdvertisedName", {})
|
||
adv_name = adv.get("advertised_name")
|
||
if adv_name:
|
||
print(f"📛 Name: {adv_name}")
|
||
|
||
# Info
|
||
info = summary.get("Info", {})
|
||
who = info.get("who_am_i")
|
||
ver = info.get("version")
|
||
if who:
|
||
print(f"🏷️ Device: {who}")
|
||
if ver:
|
||
print(f"🧭 Version: {ver}")
|
||
|
||
# LED
|
||
led = summary.get("LED", {})
|
||
duty = led.get("led_external_pwm_duty_cycle")
|
||
if duty is not None:
|
||
print(f"💡 LED PWM Duty: {duty}%")
|
||
else:
|
||
print("💡 LED PWM Duty: unknown")
|
||
|
||
# Mode
|
||
mode = summary.get("Mode", {}).get("mode")
|
||
print(f"🎚️ Mode: {mode if mode else 'unknown'}")
|
||
|
||
# Current
|
||
current = summary.get("Current", {}).get("led_current_ma")
|
||
if current is not None:
|
||
print(f"🔌 LED Current: {current:.3f} mA")
|
||
else:
|
||
err = summary.get("Current", {}).get("error")
|
||
if err:
|
||
print(f"🔌 LED Current: unavailable ({err})")
|
||
else:
|
||
print("🔌 LED Current: unavailable")
|
||
|
||
# WiFi
|
||
wifi = summary.get("WiFi", {}).get("wifi_status", {})
|
||
if wifi:
|
||
status = wifi.get("status", "unknown")
|
||
ip = wifi.get("ip_address") or "-"
|
||
configured = wifi.get("networks_configured", 0)
|
||
print(f"📶 WiFi: {status} | IP: {ip} | Networks configured: {configured}")
|
||
else:
|
||
print("📶 WiFi: status unavailable")
|
||
|
||
print("")
|
||
|
||
|
||
COMMANDS_MAP = {
|
||
"1": wifi_menu,
|
||
"2": configure_mdns,
|
||
"3": start_streaming,
|
||
"4": switch_device_mode,
|
||
"5": set_led_duty_cycle,
|
||
"6": monitor_logs,
|
||
"7": get_settings,
|
||
}
|
||
|
||
|
||
def main():
|
||
parser = argparse.ArgumentParser(description="OpenIris Setup CLI Tool")
|
||
parser.add_argument("--timeout", type=int, default=3,
|
||
help="Discovery timeout in seconds (default: 3)")
|
||
parser.add_argument("--port", type=str,
|
||
help="Skip discovery and connect directly to specified port")
|
||
parser.add_argument("--scan-timeout", type=int, default=30,
|
||
help="WiFi scan timeout in seconds (default: 30)")
|
||
parser.add_argument("--no-auto", action="store_true",
|
||
help="Don't auto-connect to first device found")
|
||
parser.add_argument("--debug", action="store_true",
|
||
help="Show debug output including raw serial data")
|
||
args = parser.parse_args()
|
||
|
||
print("🔧 OpenIris Setup Tool")
|
||
print("=" * 50)
|
||
|
||
device = None
|
||
|
||
try:
|
||
if args.port:
|
||
# Connect directly to specified port
|
||
print(f"📡 Connecting directly to {args.port}...")
|
||
device = OpenIrisDevice(args.port, "direct", debug=args.debug)
|
||
if not device.connect():
|
||
return 1
|
||
else:
|
||
# Fast device discovery
|
||
discovery = OpenIrisDiscovery()
|
||
devices = discovery.discover_devices(args.timeout)
|
||
|
||
if not devices:
|
||
print("\n❌ No OpenIris devices found automatically")
|
||
print("\n💡 Troubleshooting:")
|
||
print(" - Make sure device is connected via USB")
|
||
print(" - Device must be powered on within last 20 seconds")
|
||
print(" - Try specifying port manually with --port")
|
||
|
||
# Show available ports to help user
|
||
print("\n📋 Available serial ports:")
|
||
all_ports = list(serial.tools.list_ports.comports())
|
||
for p in all_ports:
|
||
print(f" - {p.device}: {p.description}")
|
||
|
||
# Offer manual port entry
|
||
manual_port = input("\n🔌 Enter serial port manually (e.g. COM15, /dev/ttyUSB0) or press Enter to exit: ").strip()
|
||
if manual_port:
|
||
device = OpenIrisDevice(manual_port, "manual", debug=args.debug)
|
||
if not device.connect():
|
||
return 1
|
||
else:
|
||
return 1
|
||
else:
|
||
# Auto-connect to first device found (unless disabled)
|
||
if len(devices) == 1 or not args.no_auto:
|
||
device = devices[0]
|
||
device.debug = args.debug # Set debug mode
|
||
print(f"\n🎯 Auto-connecting to {device.serial_number}...")
|
||
if not device.connect():
|
||
return 1
|
||
else:
|
||
# Multiple devices found with no-auto flag
|
||
print("\n🔢 Multiple devices found. Select one:")
|
||
for i, dev in enumerate(devices, 1):
|
||
print(f" {i}. {dev.serial_number} on {dev.port}")
|
||
|
||
while True:
|
||
try:
|
||
choice = int(input("\nEnter device number: ")) - 1
|
||
if 0 <= choice < len(devices):
|
||
device = devices[choice]
|
||
break
|
||
else:
|
||
print("❌ Invalid selection")
|
||
except ValueError:
|
||
print("❌ Please enter a number")
|
||
|
||
# Connect to selected device
|
||
device.debug = args.debug # Set debug mode
|
||
if not device.connect():
|
||
return 1
|
||
|
||
# Main interaction loop
|
||
while True:
|
||
print("\n🔧 Setup Options:")
|
||
print(f"{str(1):>2} 📶 WiFi settings")
|
||
print(f"{str(2):>2} 📛 Configure advertised name (mDNS + UVC)")
|
||
print(f"{str(3):>2} 🚀 Start streaming mode")
|
||
print(f"{str(4):>2} 🔄 Switch device mode (WiFi/UVC/Setup)")
|
||
print(f"{str(5):>2} 💡 Update PWM Duty Cycle")
|
||
print(f"{str(6):>2} 📖 Monitor logs")
|
||
print(f"{str(7):>2} 🧩 Get settings summary")
|
||
print("exit 🚪 Exit")
|
||
choice = input("\nSelect option (1-7): ").strip()
|
||
|
||
if choice == "exit":
|
||
break
|
||
|
||
command_to_run = COMMANDS_MAP.get(choice, None)
|
||
if not command_to_run:
|
||
print("❌ Invalid option")
|
||
continue
|
||
|
||
command_to_run(device, args)
|
||
|
||
except KeyboardInterrupt:
|
||
print("\n🛑 Setup interrupted")
|
||
|
||
finally:
|
||
if device:
|
||
device.disconnect()
|
||
|
||
return 0
|
||
|
||
|
||
if __name__ == "__main__":
|
||
sys.exit(main()) |