mirror of
https://github.com/MrUnknownDE/OpenIris-ESPIDF.git
synced 2026-04-21 23:43:44 +02:00
Improvements and refactors after CR, add option to modify mdns name and simplify setup tool
This commit is contained in:
@@ -6,11 +6,13 @@ This tool automatically discovers OpenIris devices via heartbeat,
|
||||
allows WiFi configuration, and monitors the device logs.
|
||||
"""
|
||||
|
||||
import re
|
||||
import json
|
||||
import time
|
||||
import threading
|
||||
import argparse
|
||||
import sys
|
||||
import string
|
||||
from typing import Dict, List, Optional, Tuple
|
||||
import serial
|
||||
import serial.tools.list_ports
|
||||
@@ -98,7 +100,6 @@ class OpenIrisDevice:
|
||||
cmd_obj["commands"][0]["data"] = params
|
||||
|
||||
cmd_json = json.dumps(cmd_obj) + '\n'
|
||||
|
||||
try:
|
||||
# Clear any pending data
|
||||
self.connection.reset_input_buffer()
|
||||
@@ -130,9 +131,6 @@ class OpenIrisDevice:
|
||||
print(f"📡 Raw: {repr(data)}")
|
||||
print(f"📝 Buffer: {repr(response_buffer[-200:])}")
|
||||
|
||||
# Clean buffer and look for JSON
|
||||
import re
|
||||
|
||||
# Remove ANSI escape sequences
|
||||
clean_buffer = re.sub(r'\x1b\[[0-9;]*m', '', response_buffer)
|
||||
clean_buffer = clean_buffer.replace('\r', '')
|
||||
@@ -303,6 +301,26 @@ class OpenIrisDevice:
|
||||
print("✅ WiFi credentials set successfully")
|
||||
return True
|
||||
|
||||
def set_mdns_name(self, name: str) -> bool:
|
||||
"""Configure the MDNS hostname"""
|
||||
|
||||
print(f"🔧 Setting MDNS name to '{name}'...")
|
||||
print(" The device should be accessible under")
|
||||
print(f"http://{name}.local/")
|
||||
print("\n Note, this will also modify the name of the UVC device")
|
||||
|
||||
params = {
|
||||
"hostname": name
|
||||
}
|
||||
|
||||
response = self.send_command("set_mdns", params)
|
||||
if "error" in response:
|
||||
print(f"❌ MDNS name setup failed: {response['error']}")
|
||||
return False
|
||||
|
||||
print("✅ MDNS name set successfully")
|
||||
return True
|
||||
|
||||
def get_wifi_status(self) -> Dict:
|
||||
"""Get current WiFi connection status"""
|
||||
response = self.send_command("get_wifi_status")
|
||||
@@ -561,8 +579,89 @@ class OpenIrisDiscovery:
|
||||
pass
|
||||
|
||||
|
||||
def display_networks(networks: List[WiFiNetwork]):
|
||||
def scan_networks(device: OpenIrisDevice, args = None):
|
||||
should_use_custom_timeout = input("Use custom scan timeout? (y/N): ").strip().lower()
|
||||
if should_use_custom_timeout == 'y':
|
||||
try:
|
||||
timeout = int(input("Enter timeout in seconds (5-120): "))
|
||||
if 5 <= timeout <= 120:
|
||||
device.scan_networks(timeout)
|
||||
else:
|
||||
print("❌ Timeout must be between 5 and 120 seconds")
|
||||
device.scan_networks(args.scan_timeout)
|
||||
except ValueError:
|
||||
print("❌ Invalid timeout, using default")
|
||||
device.scan_networks(args.scan_timeout)
|
||||
else:
|
||||
device.scan_networks(args.scan_timeout)
|
||||
|
||||
|
||||
def configure_wifi(device: OpenIrisDevice, args = None):
|
||||
if not device.networks:
|
||||
print("❌ No networks available. Please scan first.")
|
||||
return
|
||||
|
||||
display_networks(device)
|
||||
|
||||
while True:
|
||||
net_choice = input("\nEnter network number (or 'back'): ").strip()
|
||||
if net_choice.lower() == 'back':
|
||||
break
|
||||
|
||||
try:
|
||||
net_idx = int(net_choice) - 1
|
||||
except ValueError:
|
||||
print("❌ Please enter a number or 'back'")
|
||||
continue
|
||||
|
||||
sorted_networks = sorted(device.networks, key=lambda x: x.rssi, reverse=True)
|
||||
|
||||
if 0 <= net_idx < len(sorted_networks):
|
||||
selected_network = sorted_networks[net_idx]
|
||||
|
||||
print(f"\n🔐 Selected: {selected_network.ssid}")
|
||||
print(f"Security: {selected_network.security_type}")
|
||||
|
||||
if selected_network.auth_mode == 0: # Open network
|
||||
password = ""
|
||||
print("🔓 Open network - no password required")
|
||||
else:
|
||||
password = input("Enter WiFi password: ")
|
||||
|
||||
if device.set_wifi(selected_network.ssid, password):
|
||||
print("✅ WiFi configured successfully!")
|
||||
print("💡 Next steps:")
|
||||
print(" 4. Check WiFi status")
|
||||
print(" 5. Connect to WiFi (if needed)")
|
||||
print(" 6. Start streaming when connected")
|
||||
break
|
||||
else:
|
||||
print("❌ Invalid network number")
|
||||
|
||||
|
||||
def configure_mdns(device: OpenIrisDevice, args = None):
|
||||
print("💡 Please enter your preferred device name, your board will be accessible under http://<name>.local/")
|
||||
print("💡 Please avoid spaces and special characters")
|
||||
print(" To back out, enter `back`")
|
||||
print("\n Note, this will also modify the name of the UVC device")
|
||||
while True:
|
||||
name_choice = input("\nDevice name: ").strip()
|
||||
if any(space in name_choice for space in string.whitespace):
|
||||
print("❌ Please avoid spaces and special characters")
|
||||
else:
|
||||
break
|
||||
|
||||
if name_choice.lower() == "back":
|
||||
return
|
||||
|
||||
if device.set_mdns_name(name_choice):
|
||||
print("✅ MDNS configured successfully!")
|
||||
|
||||
|
||||
def display_networks(device: OpenIrisDevice, args = None):
|
||||
"""Display available WiFi networks in a formatted table"""
|
||||
networks = device.networks
|
||||
|
||||
if not networks:
|
||||
print("❌ No networks available")
|
||||
return
|
||||
@@ -596,6 +695,65 @@ def display_networks(networks: List[WiFiNetwork]):
|
||||
print()
|
||||
|
||||
|
||||
def check_wifi_status(device: OpenIrisDevice, args = None):
|
||||
status = device.get_wifi_status()
|
||||
if status:
|
||||
print(f"📶 WiFi Status: {status.get('status', 'unknown')}")
|
||||
print(f"📡 Networks configured: {status.get('networks_configured', 0)}")
|
||||
if status.get('ip_address'):
|
||||
print(f"🌐 IP Address: {status['ip_address']}")
|
||||
else:
|
||||
print("❌ Unable to get WiFi status")
|
||||
|
||||
|
||||
def attempt_wifi_connection(device: OpenIrisDevice, args = None):
|
||||
device.connect_wifi()
|
||||
print("🕰️ Wait a few seconds then check status (option 4)")
|
||||
|
||||
|
||||
def start_streaming(device: OpenIrisDevice, args = None):
|
||||
device.start_streaming()
|
||||
print("🚀 Streaming started! Use option 8 to monitor logs.")
|
||||
|
||||
|
||||
def switch_device_mode(device: OpenIrisDevice, args = None):
|
||||
current_mode = device.get_device_mode()
|
||||
print(f"\n📍 Current device mode: {current_mode}")
|
||||
print("\n🔄 Select new device mode:")
|
||||
print("1. WiFi - Stream over WiFi connection")
|
||||
print("2. UVC - Stream as USB webcam")
|
||||
print("3. Auto - Automatic mode selection")
|
||||
|
||||
mode_choice = input("\nSelect mode (1-3): ").strip()
|
||||
|
||||
if mode_choice == "1":
|
||||
device.switch_mode("wifi")
|
||||
elif mode_choice == "2":
|
||||
device.switch_mode("uvc")
|
||||
elif mode_choice == "3":
|
||||
device.switch_mode("auto")
|
||||
else:
|
||||
print("❌ Invalid mode selection")
|
||||
|
||||
|
||||
def monitor_logs(device: OpenIrisDevice, args = None):
|
||||
device.monitor_logs()
|
||||
|
||||
|
||||
COMMANDS_MAP = {
|
||||
"1": scan_networks,
|
||||
"2": display_networks,
|
||||
"3": configure_wifi,
|
||||
"4": configure_mdns,
|
||||
"5": configure_mdns,
|
||||
"6": check_wifi_status,
|
||||
"7": attempt_wifi_connection,
|
||||
"8": start_streaming,
|
||||
"9": switch_device_mode,
|
||||
"10": monitor_logs,
|
||||
}
|
||||
|
||||
|
||||
def main():
|
||||
parser = argparse.ArgumentParser(description="OpenIris Setup CLI Tool")
|
||||
parser.add_argument("--timeout", type=int, default=3,
|
||||
@@ -684,123 +842,25 @@ def main():
|
||||
print("1. 🔍 Scan for WiFi networks")
|
||||
print("2. 📡 Show available networks")
|
||||
print("3. 🔐 Configure WiFi")
|
||||
print("4. 📶 Check WiFi status")
|
||||
print("5. 🔗 Connect to WiFi")
|
||||
print("6. 🚀 Start streaming mode")
|
||||
print("7. 🔄 Switch device mode (WiFi/UVC/Auto)")
|
||||
print("8. 📋 Monitor logs")
|
||||
print("9. 🚪 Exit")
|
||||
print("4. 🌐 Configure MDNS")
|
||||
print("5. 💻 Configure UVC Name")
|
||||
print("6. 📶 Check WiFi status")
|
||||
print("7. 🔗 Connect to WiFi")
|
||||
print("8. 🚀 Start streaming mode")
|
||||
print("9. 🔄 Switch device mode (WiFi/UVC/Auto)")
|
||||
print("10. 📋 Monitor logs")
|
||||
print("exit. 🚪 Exit")
|
||||
choice = input("\nSelect option (1-10): ").strip()
|
||||
|
||||
choice = input("\nSelect option (1-9): ").strip()
|
||||
|
||||
if choice == "1":
|
||||
# Ask if user wants custom timeout
|
||||
custom = input("Use custom scan timeout? (y/N): ").strip().lower()
|
||||
if custom == 'y':
|
||||
try:
|
||||
timeout = int(input("Enter timeout in seconds (5-120): "))
|
||||
if 5 <= timeout <= 120:
|
||||
device.scan_networks(timeout)
|
||||
else:
|
||||
print("❌ Timeout must be between 5 and 120 seconds")
|
||||
device.scan_networks(args.scan_timeout)
|
||||
except ValueError:
|
||||
print("❌ Invalid timeout, using default")
|
||||
device.scan_networks(args.scan_timeout)
|
||||
else:
|
||||
device.scan_networks(args.scan_timeout)
|
||||
|
||||
elif choice == "2":
|
||||
display_networks(device.networks)
|
||||
|
||||
elif choice == "3":
|
||||
if not device.networks:
|
||||
print("❌ No networks available. Please scan first.")
|
||||
continue
|
||||
|
||||
display_networks(device.networks)
|
||||
|
||||
while True:
|
||||
try:
|
||||
net_choice = input("\nEnter network number (or 'back'): ").strip()
|
||||
if net_choice.lower() == 'back':
|
||||
break
|
||||
|
||||
net_idx = int(net_choice) - 1
|
||||
sorted_networks = sorted(device.networks, key=lambda x: x.rssi, reverse=True)
|
||||
|
||||
if 0 <= net_idx < len(sorted_networks):
|
||||
selected_network = sorted_networks[net_idx]
|
||||
|
||||
print(f"\n🔐 Selected: {selected_network.ssid}")
|
||||
print(f"Security: {selected_network.security_type}")
|
||||
|
||||
if selected_network.auth_mode == 0: # Open network
|
||||
password = ""
|
||||
print("🔓 Open network - no password required")
|
||||
else:
|
||||
password = input("Enter WiFi password: ")
|
||||
|
||||
if device.set_wifi(selected_network.ssid, password):
|
||||
print("✅ WiFi configured successfully!")
|
||||
print("💡 Next steps:")
|
||||
print(" 4. Check WiFi status")
|
||||
print(" 5. Connect to WiFi (if needed)")
|
||||
print(" 6. Start streaming when connected")
|
||||
break
|
||||
else:
|
||||
print("❌ Invalid network number")
|
||||
except ValueError:
|
||||
print("❌ Please enter a number or 'back'")
|
||||
|
||||
elif choice == "4":
|
||||
# Check WiFi status
|
||||
status = device.get_wifi_status()
|
||||
if status:
|
||||
print(f"📶 WiFi Status: {status.get('status', 'unknown')}")
|
||||
print(f"📡 Networks configured: {status.get('networks_configured', 0)}")
|
||||
if status.get('ip_address'):
|
||||
print(f"🌐 IP Address: {status['ip_address']}")
|
||||
else:
|
||||
print("❌ Unable to get WiFi status")
|
||||
|
||||
elif choice == "5":
|
||||
# Attempt WiFi connection
|
||||
device.connect_wifi()
|
||||
print("🕰️ Wait a few seconds then check status (option 4)")
|
||||
|
||||
elif choice == "6":
|
||||
device.start_streaming()
|
||||
print("🚀 Streaming started! Use option 8 to monitor logs.")
|
||||
|
||||
elif choice == "7":
|
||||
# Switch device mode
|
||||
current_mode = device.get_device_mode()
|
||||
print(f"\n📍 Current device mode: {current_mode}")
|
||||
print("\n🔄 Select new device mode:")
|
||||
print("1. WiFi - Stream over WiFi connection")
|
||||
print("2. UVC - Stream as USB webcam")
|
||||
print("3. Auto - Automatic mode selection")
|
||||
|
||||
mode_choice = input("\nSelect mode (1-3): ").strip()
|
||||
|
||||
if mode_choice == "1":
|
||||
device.switch_mode("wifi")
|
||||
elif mode_choice == "2":
|
||||
device.switch_mode("uvc")
|
||||
elif mode_choice == "3":
|
||||
device.switch_mode("auto")
|
||||
else:
|
||||
print("❌ Invalid mode selection")
|
||||
|
||||
elif choice == "8":
|
||||
device.monitor_logs()
|
||||
|
||||
elif choice == "9":
|
||||
if choice == "exit":
|
||||
break
|
||||
|
||||
else:
|
||||
|
||||
command_to_run = COMMANDS_MAP.get(choice, None)
|
||||
if not command_to_run:
|
||||
print("❌ Invalid option")
|
||||
continue
|
||||
|
||||
command_to_run(device, args)
|
||||
|
||||
except KeyboardInterrupt:
|
||||
print("\n🛑 Setup interrupted")
|
||||
|
||||
Reference in New Issue
Block a user