From d7a302deff85992c1b53cab4ce9cc2eab0ee648c Mon Sep 17 00:00:00 2001 From: Lorow Date: Fri, 17 Oct 2025 00:06:59 +0200 Subject: [PATCH] cleanup old setup --- tools/openiris_setup.py | 1241 --------------------------------------- 1 file changed, 1241 deletions(-) diff --git a/tools/openiris_setup.py b/tools/openiris_setup.py index 403267c..e69de29 100644 --- a/tools/openiris_setup.py +++ b/tools/openiris_setup.py @@ -1,1241 +0,0 @@ -#!/usr/bin/env python3 -""" -OpenIris Setup CLI Tool - -This tool automatically discovers OpenIris devices via heartbeat, -allows WiFi configuration, and monitors the device logs. -""" - -import re -import json -import time -import threading -import argparse -import sys -import string -from typing import Dict, List, Optional, Tuple -import serial -import serial.tools.list_ports -from dataclasses import dataclass - - -@dataclass -class WiFiNetwork: - ssid: str - channel: int - rssi: int - mac_address: str - auth_mode: int - - @property - def security_type(self) -> str: - """Convert auth_mode to human readable string""" - auth_modes = { - 0: "Open", - 1: "WEP", - 2: "WPA PSK", - 3: "WPA2 PSK", - 4: "WPA WPA2 PSK", - 5: "WPA2 Enterprise", - 6: "WPA3 PSK", - 7: "WPA2 WPA3 PSK" - } - return auth_modes.get(self.auth_mode, f"Unknown ({self.auth_mode})") - - -class OpenIrisDevice: - def __init__(self, port: str, serial_number: str, debug: bool = False): - - self.port = port - self.serial_number = serial_number - self.connection: Optional[serial.Serial] = None - self.networks: List[WiFiNetwork] = [] - self.debug = debug - - def connect(self) -> bool: - """Connect to the device""" - try: - self.connection = serial.Serial( - port=self.port, - baudrate=115200, - timeout=1, - write_timeout=1 - ) - print(f"✅ Connected to device {self.serial_number} on {self.port}") - - # Immediately send pause command to keep device in setup mode - print("⏸️ Pausing device startup...") - # Use shorter timeout for pause command since device is just starting up - pause_response = self.send_command("pause", {"pause": True}, timeout=5) - if "error" not in pause_response and pause_response.get("results"): - print("✅ Device paused in setup mode") - elif "error" in pause_response and pause_response["error"] == "Command timeout": - # Even if we timeout, the command likely worked (as seen in logs) - print("✅ Device pause command sent (startup logs may have obscured response)") - else: - print(f"⚠️ Pause status uncertain: {pause_response}") - - return True - except Exception as e: - print(f"❌ Failed to connect to {self.port}: {e}") - return False - - def disconnect(self): - """Disconnect from the device""" - if self.connection and self.connection.is_open: - # Optionally unpause the device before disconnecting - print("Resuming device startup...") - self.send_command("pause", {"pause": False}) - - self.connection.close() - print(f"🔌 Disconnected from {self.port}") - - def send_command(self, command: str, params: Dict = None, timeout: int = None) -> Dict: - """Send a command to the device and wait for response""" - if not self.connection or not self.connection.is_open: - return {"error": "Not connected"} - - cmd_obj = {"commands": [{"command": command}]} - if params: - cmd_obj["commands"][0]["data"] = params - - cmd_json = json.dumps(cmd_obj) + '\n' - try: - # Clear any pending data - self.connection.reset_input_buffer() - - # Send command - print(f"📤 Sending: {cmd_json.strip()}") - self.connection.write(cmd_json.encode()) - - # For scan_networks command, handle special case - if command == "scan_networks": - # Use provided timeout or default to 30 seconds for scan - scan_timeout = timeout if timeout is not None else 30 - return self._handle_scan_response(scan_timeout) - - # Wait for response (skip heartbeats and logs) - start_time = time.time() - response_buffer = "" - - # Use provided timeout or default to 15 seconds - cmd_timeout = timeout if timeout is not None else 15 - while time.time() - start_time < cmd_timeout: - try: - if self.connection.in_waiting: - data = self.connection.read(self.connection.in_waiting).decode('utf-8', errors='ignore') - response_buffer += data - - # Show raw data for debugging - if self.debug and data.strip(): - print(f"📡 Raw: {repr(data)}") - print(f"📝 Buffer: {repr(response_buffer[-200:])}") - - # Remove ANSI escape sequences - clean_buffer = re.sub(r'\x1b\[[0-9;]*m', '', response_buffer) - clean_buffer = clean_buffer.replace('\r', '') - - # Look for JSON objects - handle both single-line and multi-line - # Try to find complete JSON objects - start_idx = clean_buffer.find('{') - while start_idx >= 0: - # Count braces to find complete JSON - brace_count = 0 - end_idx = -1 - - for i in range(start_idx, len(clean_buffer)): - if clean_buffer[i] == '{': - brace_count += 1 - elif clean_buffer[i] == '}': - brace_count -= 1 - if brace_count == 0: - end_idx = i + 1 - break - - if end_idx > start_idx: - json_str = clean_buffer[start_idx:end_idx] - - # Try to parse any complete JSON object - try: - # Clean up the JSON - clean_json = json_str.replace('\t', ' ').replace('\n', ' ').replace('\r', '') - clean_json = re.sub(r'\s+', ' ', clean_json) - - response = json.loads(clean_json) - - # Return if this is a command response with results - if "results" in response: - return response - - except json.JSONDecodeError: - pass - - # Look for next JSON object - start_idx = clean_buffer.find('{', end_idx) - else: - # No complete JSON found yet - break - else: - time.sleep(0.1) - except Exception as e: - print(f"⚠️ Exception: {e}") - continue - - return {"error": "Command timeout"} - except Exception as e: - return {"error": f"Communication error: {e}"} - - def _handle_scan_response(self, timeout: int = 30) -> Dict: - """Handle scan_networks command response which outputs raw JSON first""" - start_time = time.time() - response_buffer = "" - - while time.time() - start_time < timeout: # Configurable timeout for scan - if self.connection.in_waiting: - data = self.connection.read(self.connection.in_waiting).decode('utf-8', errors='ignore') - response_buffer += data - - # Look for WiFi networks JSON directly (new format) - # The scan command now outputs JSON directly followed by command result - if '{"networks":[' in response_buffer: - import re - - # Look for the networks JSON pattern that appears first - networks_pattern = r'\{"networks":\[.*?\]\}' - matches = re.findall(networks_pattern, response_buffer, re.DOTALL) - - for match in matches: - try: - # Parse the networks JSON directly - networks_data = json.loads(match) - if "networks" in networks_data: - # Return in the expected format for compatibility - return {"results": [json.dumps({"result": match})]} - except json.JSONDecodeError: - continue - - # Also check if we have the command result indicating completion - if '{"results":' in response_buffer and '"Networks scanned"' in response_buffer: - # We've received the completion message, parse any networks found - import re - networks_pattern = r'\{"networks":\[.*?\]\}' - matches = re.findall(networks_pattern, response_buffer, re.DOTALL) - - for match in matches: - try: - networks_data = json.loads(match) - if "networks" in networks_data: - return {"results": [json.dumps({"result": match})]} - except json.JSONDecodeError: - continue - - # If we get here, scan completed but no networks found - return {"results": [json.dumps({"result": '{"networks":[]}'})]} - else: - time.sleep(0.1) - - return {"error": "Scan timeout"} - - def scan_networks(self, timeout: int = 30) -> bool: - """Scan for WiFi networks""" - print(f"🔍 Scanning for WiFi networks (this may take up to {timeout} seconds)...") - response = self.send_command("scan_networks", timeout=timeout) - - if "error" in response: - print(f"❌ Scan failed: {response['error']}") - return False - - try: - # Parse the nested JSON response - results = response.get("results", []) - if not results: - print("❌ No scan results received") - return False - - # The result is JSON-encoded string inside the response - result_data = json.loads(results[0]) - networks_data = json.loads(result_data["result"]) - - self.networks = [] - channels_found = set() - - for net in networks_data.get("networks", []): - network = WiFiNetwork( - ssid=net["ssid"], - channel=net["channel"], - rssi=net["rssi"], - mac_address=net["mac_address"], - auth_mode=net["auth_mode"] - ) - self.networks.append(network) - channels_found.add(net["channel"]) - - # Sort networks by RSSI (strongest first) - self.networks.sort(key=lambda x: x.rssi, reverse=True) - - print(f"✅ Found {len(self.networks)} networks on channels: {sorted(channels_found)}") - return True - - except Exception as e: - print(f"❌ Failed to parse scan results: {e}") - return False - - def set_wifi(self, ssid: str, password: str) -> bool: - """Configure WiFi credentials""" - print(f"🔧 Setting WiFi credentials for '{ssid}'...") - - params = { - "name": "main", - "ssid": ssid, - "password": password, - "channel": 0, - "power": 0 - } - - response = self.send_command("set_wifi", params) - - if "error" in response: - print(f"❌ WiFi setup failed: {response['error']}") - return False - - print("✅ WiFi credentials set successfully") - return True - - def set_mdns_name(self, name: str) -> bool: - """Configure the MDNS hostname""" - - print(f"🔧 Setting MDNS name to '{name}'...") - print(" The device should be accessible under") - print(f"http://{name}.local/") - print("\n Note, this will also modify the name of the UVC device") - - params = { - "hostname": name - } - - response = self.send_command("set_mdns", params) - if "error" in response: - print(f"❌ MDNS name setup failed: {response['error']}") - return False - - print("✅ MDNS name set successfully") - return True - - def get_wifi_status(self) -> Dict: - """Get current WiFi connection status""" - response = self.send_command("get_wifi_status") - - if "error" in response: - print(f"❌ Failed to get WiFi status: {response['error']}") - return {} - - try: - # Parse the nested JSON response - results = response.get("results", []) - if results: - result_data = json.loads(results[0]) - # The result is a JSON-encoded string, need to decode it - status_json = result_data["result"] - - # First, unescape the JSON string properly - # Replace escaped backslashes and quotes - status_json = status_json.replace('\\\\', '\\') - status_json = status_json.replace('\\"', '"') - - # Now parse the cleaned JSON - status_data = json.loads(status_json) - return status_data - except Exception as e: - print(f"❌ Failed to parse WiFi status: {e}") - # Try to show raw response for debugging - if "results" in response and response["results"]: - print(f"📝 Raw result: {response['results'][0]}") - - return {} - - def connect_wifi(self) -> bool: - """Attempt to connect to configured WiFi""" - print("🔗 Attempting WiFi connection...") - response = self.send_command("connect_wifi") - - if "error" in response: - print(f"❌ WiFi connection failed: {response['error']}") - return False - - print("✅ WiFi connection attempt started") - return True - - def start_streaming(self) -> bool: - """Start streaming mode""" - print("🚀 Starting streaming mode...") - response = self.send_command("start_streaming") - - if "error" in response: - print(f"❌ Failed to start streaming: {response['error']}") - return False - - print("✅ Streaming mode started") - return True - - def switch_mode(self, mode: str) -> bool: - """Switch device mode between WiFi, UVC, and Setup""" - print(f"🔄 Switching device mode to '{mode}'...") - - params = {"mode": mode} - response = self.send_command("switch_mode", params) - - if "error" in response: - print(f"❌ Failed to switch mode: {response['error']}") - return False - - print(f"✅ Device mode switched to '{mode}' successfully!") - print("🔄 Please restart the device for changes to take effect") - return True - - def get_device_mode(self) -> str: - """Get current device mode""" - response = self.send_command("get_device_mode") - - if "error" in response: - print(f"❌ Failed to get device mode: {response['error']}") - return "unknown" - - try: - results = response.get("results", []) - if results: - result_data = json.loads(results[0]) - mode_data = json.loads(result_data["result"]) - return mode_data.get("mode", "unknown") - except Exception as e: - print(f"❌ Failed to parse mode response: {e}") - return "unknown" - - def get_mdns_name(self) -> str: - """Get the current MDNS name""" - response = self.send_command("get_mdns_name") - if "error" in response: - print(f"❌ Failed to get device name: {response['error']}") - return "unknown" - - try: - results = response.get("results", []) - if results: - result_data = json.loads(results[0]) - name_data = json.loads(result_data["result"]) - return name_data.get("hostname", "unknown") - except Exception as e: - print(f"❌ Failed to parse name response: {e}") - return "unknown" - - def set_led_duty_cycle(self, duty_cycle): - """Sets the PWN duty cycle of the LED""" - print(f"🌟 Setting LED duty cycle to {duty_cycle}%...") - response = self.send_command("set_led_duty_cycle", {"dutyCycle": duty_cycle}) - if "error" in response: - print(f"❌ Failed to set LED duty cycle: {response['error']}") - return False - - print("✅ LED duty cycle set successfully") - return True - - def get_led_duty_cycle(self) -> Optional[int]: - """Get the current LED PWM duty cycle from the device""" - response = self.send_command("get_led_duty_cycle") - if "error" in response: - print(f"❌ Failed to get LED duty cycle: {response['error']}") - return None - try: - results = response.get("results", []) - if results: - result_data = json.loads(results[0]) - payload = result_data["result"] - if isinstance(payload, str): - payload = json.loads(payload) - return int(payload.get("led_external_pwm_duty_cycle")) - except Exception as e: - print(f"❌ Failed to parse LED duty cycle: {e}") - return None - - def get_serial_info(self) -> Optional[Tuple[str, str]]: - """Get device serial number and MAC address""" - response = self.send_command("get_serial") - if "error" in response: - print(f"❌ Failed to get serial/MAC: {response['error']}") - return None - try: - results = response.get("results", []) - if results: - result_data = json.loads(results[0]) - payload = result_data["result"] - if isinstance(payload, str): - payload = json.loads(payload) - serial = payload.get("serial") - mac = payload.get("mac") - return serial, mac - except Exception as e: - print(f"❌ Failed to parse serial/MAC: {e}") - return None - - def monitor_logs(self): - """Monitor device logs until interrupted""" - print("📋 Monitoring device logs (Press Ctrl+C to exit)...") - print("-" * 60) - - if not self.connection or not self.connection.is_open: - print("❌ Not connected to device") - return - - try: - while True: - try: - if self.connection.in_waiting > 0: - line = self.connection.readline().decode().strip() - if line: - # Skip JSON command responses, show raw logs - if not (line.startswith('{') and line.endswith('}')): - print(line) - elif "heartbeat" not in line: - # Show non-heartbeat JSON responses - print(f"📡 {line}") - else: - time.sleep(0.1) # Small delay to prevent busy waiting - except Exception: - continue - except KeyboardInterrupt: - print("\n🛑 Log monitoring stopped") - - -class OpenIrisDiscovery: - def __init__(self): - self.devices: Dict[str, OpenIrisDevice] = {} - self.discovery_active = False - - def discover_devices(self, timeout: int = 3) -> List[OpenIrisDevice]: - """Discover OpenIris devices via heartbeat - ultra-fast concurrent scanning""" - print(f"⚡ Fast-scanning for OpenIris devices...") - - # Get all serial ports - ports = list(serial.tools.list_ports.comports()) - if not ports: - print("❌ No serial ports found") - return [] - - # Prioritize likely ESP32 USB ports for faster detection - priority_ports = [] - other_ports = [] - - for port in ports: - # Common ESP32 USB-to-serial descriptions - desc_lower = (port.description or "").lower() - # Include generic "USB Serial Device" which is common on Windows - if any(keyword in desc_lower for keyword in - ["cp210", "ch340", "ftdi", "esp32", "silicon labs", "usb-serial", "usb serial", "usb serial device"]): - priority_ports.append(port) - else: - other_ports.append(port) - - # Check priority ports first, then others - sorted_ports = priority_ports + other_ports - - if priority_ports: - print(f"📡 Checking {len(sorted_ports)} ports ({len(priority_ports)} prioritized USB serial ports)...") - else: - print(f"📡 Checking {len(sorted_ports)} serial ports...") - - discovered = {} - lock = threading.Lock() - threads = [] - - def check_port_fast(port_info): - """Check a single port for OpenIris heartbeat - optimized for speed""" - try: - # Initial connection timeout - 500ms - ser = serial.Serial(port_info.device, 115200, timeout=0.5) - ser.reset_input_buffer() - - # Wait up to 2 seconds for heartbeat - start_time = time.time() - while time.time() - start_time < 2.0: - try: - # Read timeout - 200ms - ser.timeout = 10 - if ser.in_waiting > 0: - line = ser.readline() - if line: - try: - data = json.loads(line.decode().strip()) - if (data.get("heartbeat") == "openiris_setup_mode" and - "serial" in data): - serial_num = data["serial"] - with lock: - if serial_num not in discovered: - device = OpenIrisDevice(port_info.device, serial_num, debug=False) - discovered[serial_num] = device - print(f"💓 Found {serial_num} on {port_info.device}") - # Return immediately to stop checking this port - ser.close() - return True - except (json.JSONDecodeError, UnicodeDecodeError): - pass - else: - time.sleep(0.05) # Very short sleep - except Exception: - pass - - ser.close() - except Exception: - # Port not available or not the right device - pass - return False - - # Start concurrent port checking - for port in sorted_ports: - thread = threading.Thread(target=check_port_fast, args=(port,)) - thread.daemon = True - thread.start() - threads.append(thread) - - # Wait for threads to complete or timeout - timeout_time = time.time() + timeout - for thread in threads: - remaining = timeout_time - time.time() - if remaining > 0: - thread.join(timeout=remaining) - - # If we found at least one device, return immediately - if discovered: - break - - devices = list(discovered.values()) - - if devices: - print(f"✅ Found {len(devices)} OpenIris device(s)") - else: - print("❌ No OpenIris devices found in {:.1f} seconds".format(time.time() - (timeout_time - timeout))) - print("💡 Device has 20-second setup window after power on") - - return devices - - def _check_port(self, port: str, discovered: Dict, timeout: int): - """Check a single port for OpenIris heartbeat""" - try: - with serial.Serial(port, 115200, timeout=1) as ser: - start_time = time.time() - - while time.time() - start_time < timeout: - try: - line = ser.readline().decode().strip() - if line: - try: - data = json.loads(line) - if (data.get("heartbeat") == "openiris_setup_mode" and - "serial" in data): - - serial_num = data["serial"] - if serial_num not in discovered: - discovered[serial_num] = OpenIrisDevice(port, serial_num, debug=False) - print(f"💓 Found device {serial_num} on {port}") - return - except json.JSONDecodeError: - continue - except Exception: - continue - except Exception: - # Port not available or not a serial device - pass - - -def scan_networks(device: OpenIrisDevice, args = None): - should_use_custom_timeout = input("Use custom scan timeout? (y/N): ").strip().lower() - if should_use_custom_timeout == 'y': - try: - timeout = int(input("Enter timeout in seconds (5-120): ")) - if 5 <= timeout <= 120: - device.scan_networks(timeout) - else: - print("❌ Timeout must be between 5 and 120 seconds") - device.scan_networks(args.scan_timeout) - except ValueError: - print("❌ Invalid timeout, using default") - device.scan_networks(args.scan_timeout) - else: - device.scan_networks(args.scan_timeout) - - -def configure_wifi(device: OpenIrisDevice, args = None): - if not device.networks: - print("❌ No networks available. Please scan first.") - return - - display_networks(device) - - while True: - net_choice = input("\nEnter network number (or 'back'): ").strip() - if net_choice.lower() == 'back': - break - - try: - net_idx = int(net_choice) - 1 - except ValueError: - print("❌ Please enter a number or 'back'") - continue - - sorted_networks = sorted(device.networks, key=lambda x: x.rssi, reverse=True) - - if 0 <= net_idx < len(sorted_networks): - selected_network = sorted_networks[net_idx] - - print(f"\n🔐 Selected: {selected_network.ssid}") - print(f"Security: {selected_network.security_type}") - - if selected_network.auth_mode == 0: # Open network - password = "" - print("🔓 Open network - no password required") - else: - password = input("Enter WiFi password: ") - - if device.set_wifi(selected_network.ssid, password): - print("✅ WiFi configured successfully!") - print("💡 Next steps:") - print(" • Open WiFi menu to connect to WiFi (if needed)") - print(" • Open WiFi menu to check WiFi status") - print(" • Start streaming from the main menu when connected") - break - else: - print("❌ Invalid network number") - - -def configure_mdns(device: OpenIrisDevice, args = None): - current_name = device.get_mdns_name() - print(f"\n📍 Current advertised name: {current_name} \n") - print("💡 This single name is used for both: mDNS (http://.local/) and USB UVC device descriptor.") - print("💡 Avoid spaces / special chars. Enter 'back' to cancel.") - - while True: - name_choice = input("\nDevice name: ").strip() - if any(space in name_choice for space in string.whitespace): - print("❌ Please avoid spaces and special characters") - else: - break - - if name_choice.lower() == "back": - return - - if device.set_mdns_name(name_choice): - print("✅ MDNS configured successfully!") - - -def display_networks(device: OpenIrisDevice, args = None): - """Display available WiFi networks in a formatted table""" - networks = device.networks - - if not networks: - print("❌ No networks available") - return - - print("\n📡 Available WiFi Networks:") - print("-" * 85) - print(f"{'#':<3} {'SSID':<32} {'Channel':<8} {'Signal':<20} {'Security':<15}") - print("-" * 85) - - # Networks are already sorted by signal strength from scan_networks - for i, network in enumerate(networks, 1): - # Create signal strength visualization - signal_bars = "▓" * min(5, max(0, (network.rssi + 100) // 10)) - signal_str = f"{signal_bars:<5} ({network.rssi} dBm)" - - # Format SSID (show hidden networks as ) - ssid_display = network.ssid if network.ssid else "" - - print(f"{i:<3} {ssid_display:<32} {network.channel:<8} {signal_str:<20} {network.security_type:<15}") - - print("-" * 85) - - # Show channel distribution - channels = {} - for net in networks: - channels[net.channel] = channels.get(net.channel, 0) + 1 - - print(f"\n📊 Channel distribution: ", end="") - for ch in sorted(channels.keys()): - print(f"Ch{ch}: {channels[ch]} networks ", end="") - print() - - -def check_wifi_status(device: OpenIrisDevice, args = None): - status = device.get_wifi_status() - if status: - print(f"📶 WiFi Status: {status.get('status', 'unknown')}") - print(f"📡 Networks configured: {status.get('networks_configured', 0)}") - if status.get('ip_address'): - print(f"🌐 IP Address: {status['ip_address']}") - else: - print("❌ Unable to get WiFi status") - - -def attempt_wifi_connection(device: OpenIrisDevice, args = None): - device.connect_wifi() - print("🕰️ Wait a few seconds then check status in the WiFi menu") - - -def start_streaming(device: OpenIrisDevice, args = None): - device.start_streaming() - print("🚀 Streaming started! Use 'Monitor logs' from the main menu.") - - -# ----- WiFi submenu ----- -def wifi_auto_setup(device: OpenIrisDevice, args=None): - print("\n⚙️ Automatic WiFi setup starting...") - scan_timeout = getattr(args, "scan_timeout", 30) if args else 30 - - # 1) Scan - if not device.scan_networks(timeout=scan_timeout): - print("❌ Auto-setup aborted: no networks found or scan failed") - return - - # 2) Show networks (sorted strongest-first already) - display_networks(device) - - # 3) Select a network (default strongest) - choice = input("Select network number [default: 1] or 'back': ").strip() - if choice.lower() == "back": - return - try: - idx = int(choice) - 1 if choice else 0 - except ValueError: - idx = 0 - - sorted_networks = sorted(device.networks, key=lambda x: x.rssi, reverse=True) - if not (0 <= idx < len(sorted_networks)): - print("⚠️ Invalid selection, using strongest network") - idx = 0 - - selected = sorted_networks[idx] - print(f"\n🔐 Selected: {selected.ssid if selected.ssid else ''}") - if selected.auth_mode == 0: - password = "" - print("🔓 Open network - no password required") - else: - password = input("Enter WiFi password: ") - - # 4) Configure WiFi - if not device.set_wifi(selected.ssid, password): - print("❌ Auto-setup aborted: failed to configure WiFi") - return - - # 5) Connect - if not device.connect_wifi(): - print("❌ Auto-setup aborted: failed to start WiFi connection") - return - - # 6) Wait for IP / connected status - print("⏳ Connecting to WiFi, waiting for IP...") - start = time.time() - timeout_s = 30 - ip = None - last_status = None - while time.time() - start < timeout_s: - status = device.get_wifi_status() - last_status = status - ip = (status or {}).get("ip_address") - if ip and ip not in ("0.0.0.0", "", None): - break - time.sleep(0.5) - - if ip and ip not in ("0.0.0.0", "", None): - print(f"✅ Connected! IP Address: {ip}") - else: - print("⚠️ Connection not confirmed within timeout") - if last_status: - print(f" Status: {last_status.get('status', 'unknown')} | IP: {last_status.get('ip_address', '-')}") - - -def wifi_menu(device: OpenIrisDevice, args=None): - while True: - print("\n📶 WiFi Settings:") - print(f"{str(1):>2} ⚙️ Automatic WiFi setup") - print(f"{str(2):>2} 📁 Manual WiFi actions") - print("back Back") - - choice = input("\nSelect option (1-2 or 'back'): ").strip() - if choice.lower() == "back": - break - - if choice == "1": - wifi_auto_setup(device, args) - elif choice == "2": - wifi_manual_menu(device, args) - else: - print("❌ Invalid option") - - -def wifi_manual_menu(device: OpenIrisDevice, args=None): - while True: - print("\n📁 WiFi Manual Actions:") - print(f"{str(1):>2} 🔍 Scan for WiFi networks") - print(f"{str(2):>2} 📡 Show available networks") - print(f"{str(3):>2} 🔐 Configure WiFi") - print(f"{str(4):>2} 🔗 Connect to WiFi") - print(f"{str(5):>2} 🛰️ Check WiFi status") - print("back Back") - - choice = input("\nSelect option (1-5 or 'back'): ").strip() - if choice.lower() == "back": - break - - sub_map = { - "1": scan_networks, - "2": display_networks, - "3": configure_wifi, - "4": attempt_wifi_connection, - "5": check_wifi_status, - } - - handler = sub_map.get(choice) - if not handler: - print("❌ Invalid option") - continue - handler(device, args) - - -def switch_device_mode(device: OpenIrisDevice, args = None): - current_mode = device.get_device_mode() - print(f"\n📍 Current device mode: {current_mode}") - print("\n🔄 Select new device mode:") - print("1. WiFi - Stream over WiFi connection") - print("2. UVC - Stream as USB webcam") - print("3. Setup - Configuration mode") - - mode_choice = input("\nSelect mode (1-3): ").strip() - - if mode_choice == "1": - device.switch_mode("wifi") - elif mode_choice == "2": - device.switch_mode("uvc") - elif mode_choice == "3": - device.switch_mode("setup") - else: - print("❌ Invalid mode selection") - - -def set_led_duty_cycle(device: OpenIrisDevice, args=None): - # Show current duty cycle on entry - current = device.get_led_duty_cycle() - if current is not None: - print(f"💡 Current LED duty cycle: {current}%") - - while True: - input_data = input("Enter LED external PWM duty cycle (0-100) or `back` to exit: \n") - if input_data.lower() == "back": - break - - try: - duty_cycle = int(input_data) - except ValueError: - print("❌ Invalid input. Please enter a number between 0 and 100.") - - if duty_cycle < 0 or duty_cycle > 100: - print("❌ Duty cycle must be between 0 and 100.") - else: - # Apply immediately; stay in loop for further tweaks - if device.set_led_duty_cycle(duty_cycle): - # Read back and display current value using existing getter - updated = device.get_led_duty_cycle() - if updated is not None: - print(f"💡 Current LED duty cycle: {updated}%") - else: - print("ℹ️ Duty cycle updated, but current value could not be read back.") - - -def monitor_logs(device: OpenIrisDevice, args = None): - device.monitor_logs() - - -def get_led_duty_cycle(device: OpenIrisDevice, args=None): - duty = device.get_led_duty_cycle() - if duty is not None: - print(f"💡 Current LED duty cycle: {duty}%") - - -def get_serial(device: OpenIrisDevice, args=None): - info = device.get_serial_info() - if info is not None: - serial, mac = info -# print(f"🔑 Serial: {serial}") - print(f"🔗 MAC: {mac}") - - -# ----- Aggregated GET: settings summary ----- -def _probe_serial(device: OpenIrisDevice) -> Dict: - info = device.get_serial_info() - if info is None: - return {"serial": None, "mac": None} - serial, mac = info - return {"serial": serial, "mac": mac} - -def _probe_info(device: OpenIrisDevice) -> Dict: - resp = device.send_command("get_who_am_i") - if "error" in resp: - return {"who_am_i": None, "version": None, "error": resp["error"]} - try: - results = resp.get("results", []) - if results: - result_data = json.loads(results[0]) - payload = result_data["result"] - if isinstance(payload, str): - payload = json.loads(payload) - return {"who_am_i": payload.get("who_am_i"), "version": payload.get("version")} - except Exception as e: - return {"who_am_i": None, "version": None, "error": str(e)} - return {"who_am_i": None, "version": None} - -def _probe_advertised_name(device: OpenIrisDevice) -> Dict: - # Currently the advertised name == mdns hostname - name = device.get_mdns_name() - return {"advertised_name": name} - - -def _probe_led_pwm(device: OpenIrisDevice) -> Dict: - duty = device.get_led_duty_cycle() - return {"led_external_pwm_duty_cycle": duty} - -def _probe_led_current(device: OpenIrisDevice) -> Dict: - # Query device for current in mA via new command - resp = device.send_command("get_led_current") - if "error" in resp: - return {"led_current_ma": None, "error": resp["error"]} - try: - results = resp.get("results", []) - if results: - result_data = json.loads(results[0]) - payload = result_data["result"] - if isinstance(payload, str): - payload = json.loads(payload) - return {"led_current_ma": float(payload.get("led_current_ma"))} - except Exception as e: - return {"led_current_ma": None, "error": str(e)} - return {"led_current_ma": None} - - -def _probe_mode(device: OpenIrisDevice) -> Dict: - mode = device.get_device_mode() - return {"mode": mode} - - -def _probe_wifi_status(device: OpenIrisDevice) -> Dict: - # Returns dict as provided by device; pass through - status = device.get_wifi_status() or {} - return {"wifi_status": status} - - -def get_settings(device: OpenIrisDevice, args=None): - print("\n🧩 Collecting device settings...\n") - - probes = [ - ("Identity", _probe_serial), - ("AdvertisedName", _probe_advertised_name), - ("Info", _probe_info), - ("LED", _probe_led_pwm), - ("Current", _probe_led_current), - ("Mode", _probe_mode), - ("WiFi", _probe_wifi_status), - ] - - summary: Dict[str, Dict] = {} - - for label, probe in probes: - try: - data = probe(device) - summary[label] = data - except Exception as e: - summary[label] = {"error": str(e)} - - # Pretty print summary - # Identity - ident = summary.get("Identity", {}) - serial = ident.get("serial") - mac = ident.get("mac") - if serial: - print(f"🔑 Serial: {serial}") - # if mac: - # print(f"🔗 MAC: {mac}") - if not serial and not mac: - print("🔑 Serial/MAC: unavailable") - - # Advertised Name - advertised_name_data = summary.get("AdvertisedName", {}) - if advertised_name := advertised_name_data.get("advertised_name"): - print(f"📛 Name: {advertised_name}") - - # Info - info = summary.get("Info", {}) - who = info.get("who_am_i") - ver = info.get("version") - if who: - print(f"🏷️ Device: {who}") - if ver: - print(f"🧭 Version: {ver}") - - # LED - led = summary.get("LED", {}) - duty = led.get("led_external_pwm_duty_cycle") - if duty is not None: - print(f"💡 LED PWM Duty: {duty}%") - else: - print("💡 LED PWM Duty: unknown") - - # Mode - mode = summary.get("Mode", {}).get("mode") - print(f"🎚️ Mode: {mode if mode else 'unknown'}") - - # Current - current_section = summary.get("Current", {}) - if (led_current_ma := current_section.get("led_current_ma")) is not None: - print(f"🔌 LED Current: {led_current_ma:.3f} mA") - else: - if (err := current_section.get("error")): - print(f"🔌 LED Current: unavailable ({err})") - else: - print("🔌 LED Current: unavailable") - - # WiFi - wifi = summary.get("WiFi", {}).get("wifi_status", {}) - if wifi: - status = wifi.get("status", "unknown") - ip = wifi.get("ip_address") or "-" - configured = wifi.get("networks_configured", 0) - print(f"📶 WiFi: {status} | IP: {ip} | Networks configured: {configured}") - else: - print("📶 WiFi: status unavailable") - - print("") - - -COMMANDS_MAP = { - "1": wifi_menu, - "2": configure_mdns, - "3": start_streaming, - "4": switch_device_mode, - "5": set_led_duty_cycle, - "6": monitor_logs, - "7": get_settings, -} - - -def main(): - parser = argparse.ArgumentParser(description="OpenIris Setup CLI Tool") - parser.add_argument("--timeout", type=int, default=3, - help="Discovery timeout in seconds (default: 3)") - parser.add_argument("--port", type=str, - help="Skip discovery and connect directly to specified port") - parser.add_argument("--scan-timeout", type=int, default=30, - help="WiFi scan timeout in seconds (default: 30)") - parser.add_argument("--no-auto", action="store_true", - help="Don't auto-connect to first device found") - parser.add_argument("--debug", action="store_true", - help="Show debug output including raw serial data") - args = parser.parse_args() - - print("🔧 OpenIris Setup Tool") - print("=" * 50) - - device = None - - try: - if args.port: - # Connect directly to specified port - print(f"📡 Connecting directly to {args.port}...") - device = OpenIrisDevice(args.port, "direct", debug=args.debug) - if not device.connect(): - return 1 - else: - # Fast device discovery - discovery = OpenIrisDiscovery() - devices = discovery.discover_devices(args.timeout) - - if not devices: - print("\n❌ No OpenIris devices found automatically") - print("\n💡 Troubleshooting:") - print(" - Make sure device is connected via USB") - print(" - Device must be powered on within last 20 seconds") - print(" - Try specifying port manually with --port") - - # Show available ports to help user - print("\n📋 Available serial ports:") - all_ports = list(serial.tools.list_ports.comports()) - for p in all_ports: - print(f" - {p.device}: {p.description}") - - # Offer manual port entry - manual_port = input("\n🔌 Enter serial port manually (e.g. COM15, /dev/ttyUSB0) or press Enter to exit: ").strip() - if manual_port: - device = OpenIrisDevice(manual_port, "manual", debug=args.debug) - if not device.connect(): - return 1 - else: - return 1 - else: - # Auto-connect to first device found (unless disabled) - if len(devices) == 1 or not args.no_auto: - device = devices[0] - device.debug = args.debug # Set debug mode - print(f"\n🎯 Auto-connecting to {device.serial_number}...") - if not device.connect(): - return 1 - else: - # Multiple devices found with no-auto flag - print("\n🔢 Multiple devices found. Select one:") - for i, dev in enumerate(devices, 1): - print(f" {i}. {dev.serial_number} on {dev.port}") - - while True: - try: - choice = int(input("\nEnter device number: ")) - 1 - if 0 <= choice < len(devices): - device = devices[choice] - break - else: - print("❌ Invalid selection") - except ValueError: - print("❌ Please enter a number") - - # Connect to selected device - device.debug = args.debug # Set debug mode - if not device.connect(): - return 1 - - # Main interaction loop - while True: - print("\n🔧 Setup Options:") - print(f"{str(1):>2} 📶 WiFi settings") - print(f"{str(2):>2} 📛 Configure advertised name (mDNS + UVC)") - print(f"{str(3):>2} 🚀 Start streaming mode") - print(f"{str(4):>2} 🔄 Switch device mode (WiFi/UVC/Setup)") - print(f"{str(5):>2} 💡 Update PWM Duty Cycle") - print(f"{str(6):>2} 📖 Monitor logs") - print(f"{str(7):>2} 🧩 Get settings summary") - print("exit 🚪 Exit") - choice = input("\nSelect option (1-7): ").strip() - - if choice == "exit": - break - - command_to_run = COMMANDS_MAP.get(choice, None) - if not command_to_run: - print("❌ Invalid option") - continue - - command_to_run(device, args) - - except KeyboardInterrupt: - print("\n🛑 Setup interrupted") - - finally: - if device: - device.disconnect() - - return 0 - - -if __name__ == "__main__": - sys.exit(main()) \ No newline at end of file