diff --git a/tools/setup_openiris.py b/tools/setup_openiris.py new file mode 100644 index 0000000..5d88fd8 --- /dev/null +++ b/tools/setup_openiris.py @@ -0,0 +1,314 @@ +# /// script +# dependencies = [ +# "pyserial>=3.5", +# ] +# /// + + +import json +import time +import argparse +import sys +import serial + + +def is_back(choice: str): + return choice.lower() in ["back", "b", "exit"] + + +class SubMenu: + def __init__(self, title, context: dict, parent_menu=None): + self.title = title + self.context = context + self.items = [] + + if parent_menu: + parent_menu.add_submenu(self) + + def render(self, idx: int): + print(f"{str(idx + 1):>2} {self.title}") + + def add_submenu(self, submenu): + self.items.append(submenu) + + def add_action(self, title, action): + self.items.append((title, action)) + + def validate_choice(self, choice: str): + try: + parsed_choice = int(choice) + if parsed_choice >= 0 and parsed_choice <= len(self.items): + return parsed_choice + except ValueError: + pass + + # we'll print the error regardless if it was an exception or wrong choice + print("āŒ Invalid choice") + print("-" * 50) + + def show(self): + while True: + for idx, item in enumerate(self.items): + if isinstance(item, SubMenu): + item.render(idx) + else: + print(f"{str(idx + 1):>2} {item[0]}") + print("[Back] To go back") + + choice = input(">> ") + if is_back(choice): + break + + choice = self.validate_choice(choice) + if not choice: + continue + + selected_element = self.items[int(choice) - 1] + if isinstance(selected_element, SubMenu): + selected_element.show() + else: + selected_element[1](**self.context) + + +class Menu(SubMenu): + def __init__(self, title, context=None, parent_menu=None): + super().__init__(title, context, parent_menu) + + +class OpenIrisDevice: + def __init__(self, port: str, debug: bool, debug_commands: bool): + self.port = port + self.debug = debug + self.debug_commands = debug_commands + self.connection: serial.Serial | None = None + self.connected = False + + def __enter__(self): + self.connected = self.__connect() + return self + + def __exit__(self, type, value, traceback): + self.__disconnect() + + def __connect(self) -> bool: + print(f"šŸ“” Connecting directly to {self.port}...") + + try: + self.connection = serial.Serial( + port=self.port, baudrate=115200, timeout=1, write_timeout=1 + ) + print(f"āœ… Connected to the device on {self.port}") + return True + except Exception as e: + print(f"āŒ Failed to connect to {self.port}: {e}") + return False + + def __disconnect(self): + if self.connection and self.connection.is_open: + self.connection.close() + print(f"šŸ”Œ Disconnected from {self.port}") + + def __check_if_response_is_complete(self, response) -> dict | None: + try: + return json.loads(response) + except ValueError: + return None + + def __read_response(self, timeout: int | None = None) -> dict | None: + # we can try and retrieve the response now. + # it should be more or less immediate, but some commands may take longer + # so we gotta timeout + timeout = timeout if timeout is not None else 15 + start_time = time.time() + response_buffer = "" + while time.time() - start_time < timeout: + if self.connection.in_waiting: + packet = self.connection.read_all().decode("utf-8", errors="ignore") + if self.debug and packet.strip(): + print(f"Received: {packet}") + print("-" * 10) + print(f"Current buffer: {response_buffer}") + print("-" * 10) + + # we can't rely on new lines to detect if we're done + # nor can we assume that we're always gonna get valid json response + # but we can assume that if we're to get a valid response, it's gonna be json + # so we can start actually building the buffer only when + # some part of the packet starts with "{", and start building from there + # we can assume that no further data will be returned, so we can validate + # right after receiving the last packet + if (not response_buffer and "{" in packet) or response_buffer: + # assume we just started building the buffer and we've received the first packet + + # alternative approach in case this doesn't work - we're always sending a valid json + # so we can start building the buffer from the first packet and keep trying to find the + # starting and ending brackets, extract that part and validate, if the message is complete, return + if not response_buffer: + starting_idx = packet.find("{") + response_buffer = packet[starting_idx:] + else: + response_buffer += packet + + # and once we get something, we can validate if it's a valid json + if parsed_response := self.__check_if_response_is_complete( + response_buffer + ): + return parsed_response + else: + time.sleep(0.1) + return None + + def is_connected(self) -> bool: + return self.connected + + def send_command( + self, command: str, params: dict | None = None, timeout: int | None = None + ) -> dict: + if not self.connection or not self.connection.is_open: + return {"error": "Device Not Connected"} + + cmd_obj = {"commands": [{"command": command}]} + if params: + cmd_obj["commands"][0]["data"] = params + + # we're expecting the json string to end with a new line + # to signify we've finished sending the command + cmd_str = json.dumps(cmd_obj) + "\n" + try: + # clean it out first, just to be sure we're starting fresh + self.connection.reset_input_buffer() + if self.debug or self.debug_commands: + print(f"Sending command: {cmd_str}") + self.connection.write(cmd_str.encode()) + response = self.__read_response(timeout) + + if self.debug: + print(f"Received response: {response}") + + return response or {"error": "Command timeout"} + + except Exception as e: + return {"error": f"Communication error: {e}"} + + +def has_command_failed(result) -> bool: + return "error" in result or result["results"][0]["result"]["status"] != "success" + + +def get_device_mode(device: OpenIrisDevice) -> dict: + command_result = device.send_command("get_device_mode") + if has_command_failed(command_result): + return {"mode": "unknown"} + + return command_result["results"][0]["result"]["data"]["mode"].lower() + + +def switch_device_mode_command(device: OpenIrisDevice, *args, **kwargs): + modes = ["wifi", "uvc", "auto"] + current_mode = get_device_mode(device) + print(f"\nšŸ“ Current device mode: {current_mode}") + print("\nšŸ”„ Select new device mode:") + print("1. WiFi - Stream over WiFi connection") + print("2. UVC - Stream as USB webcam") + print("3. Auto - Automatic mode selection") + print("Back - Return to main menu") + + mode_choice = input("\nSelect mode (1-3): ").strip() + if is_back(mode_choice): + return + + try: + mode = modes[int(mode_choice) - 1] + except ValueError: + print("āŒ Invalid mode selection") + return + + command_result = device.send_command("switch_mode", {"mode": mode}) + if "error" in command_result: + print(f"āŒ Failed to switch mode: {command_result['error']}") + return + + print(f"āœ… Device mode switched to '{mode}' successfully!") + print("šŸ”„ Please restart the device for changes to take effect") + + +def start_streaming(device: OpenIrisDevice, *args, **kwargs): + print("šŸš€ Starting streaming mode...") + response = device.send_command("start_streaming") + + if "error" in response: + print(f"āŒ Failed to start streaming: {response['error']}") + return + + print("āœ… Streaming mode started") + + +def handle_menu(menu_context: dict | None = None) -> str: + menu = Menu("OpenIris Setup", menu_context) + wifi_settings = SubMenu("šŸ“¶ WiFi settings", menu_context, menu) + wifi_settings.add_action("āš™ļø Automatic WiFi setup", lambda: None) + manual_wifi_actions = SubMenu( + "šŸ“ WiFi Manual Actions:", + menu_context, + wifi_settings, + ) + + # simple commands can just be functions, they will get passed some context to them by the menu + manual_wifi_actions.add_action("šŸ” Scan for WiFi networks", lambda: None) + manual_wifi_actions.add_action("šŸ“” Show available networks", lambda: None) + manual_wifi_actions.add_action("šŸ” Configure WiFi", lambda: None) + manual_wifi_actions.add_action("šŸ”— Connect to WiFi", lambda: None) + manual_wifi_actions.add_action("šŸ›°ļø Check WiFi status", lambda: None) + + menu.add_action("🌐 Configure MDNS", lambda: None) + menu.add_action("šŸ’» Configure UVC Name", lambda: None) + menu.add_action("šŸš€ Start streaming mode", lambda: None) + menu.add_action("šŸ”„ Switch device mode (WiFi/UVC/Auto)", switch_device_mode_command) + menu.add_action("šŸ’” Update PWM Duty Cycle", lambda: start_streaming) + menu.add_action("🧩 Get settings summary", lambda: None) + menu.show() + + +def valid_port(port: str): + if not port.startswith("COM"): + raise argparse.ArgumentTypeError("Invalid port name. We only support COM ports") + return port + + +def main(): + parser = argparse.ArgumentParser(description="OpenIris CLI Setup Tool") + parser.add_argument( + "--port", + type=valid_port, + help="Serial port to connect to [COM4, COM3, etc]", + required=True, + ) + parser.add_argument( + "--debug", + action="store_true", + help="Show debug output including raw serial data", + ) + parser.add_argument( + "--show-commands", + action="store_true", + help="Debug mode, but will show only sent commands", + ) + args = parser.parse_args() + + print("šŸ”§ OpenIris Setup Tool") + print("=" * 50) + + with OpenIrisDevice(args.port, args.debug, args.show_commands) as device: + if not device.is_connected(): + return 1 + + try: + handle_menu({"device": device, "args": args}) + except KeyboardInterrupt: + print("\nšŸ›‘ Setup interrupted, disconnecting") + + return 0 + + +if __name__ == "__main__": + sys.exit(main())