"""
ESP32 Bluetooth Relay Controller Integration

Provides Bluetooth Serial communication with ESP32 8-channel relay controller
"""

import logging
import threading
import time
from typing import Callable, Dict, List, Optional

import serial
import serial.tools.list_ports

logger = logging.getLogger(__name__)


class ESP32RelayController:
    """
    Controls an ESP32 8-channel Bluetooth relay controller

    Commands are sent as newline-terminated ASCII strings ('ON1', 'OFF2',
    'TOGGLE3', 'ALL_ON', 'ALL_OFF', 'STATUS'). A background thread reads
    lines of the form "Relay <n>: <state>" and keeps a cache of relay states.

    Usage:
        relay = ESP32RelayController()
        relay.connect("COM4")  # or /dev/ttyUSB0 on Linux
        relay.turn_on(1)       # Turn on relay 1
        relay.turn_off(1)      # Turn off relay 1
        relay.all_on()         # Turn all relays on
        relay.all_off()        # Turn all relays off
        relay.disconnect()
    """

    # Number of relay channels on the board
    NUM_RELAYS = 8

    def __init__(self, device_name: str = "ESP32-Relay-8CH", auto_reconnect: bool = True):
        """
        Initialize the relay controller

        Args:
            device_name: Bluetooth device name to search for
                (stored only; port discovery does not filter on it)
            auto_reconnect: Automatically reconnect if connection is lost
                (stored only; reconnect is not implemented yet)
        """
        self.device_name = device_name
        self.auto_reconnect = auto_reconnect
        self.serial: Optional[serial.Serial] = None
        self.connected = False
        self.relay_states = [False] * self.NUM_RELAYS
        self._read_thread: Optional[threading.Thread] = None
        self._running = False
        # Guards the connected -> disconnected transition, which can be
        # attempted by the caller's thread and the reader thread at once.
        self._state_lock = threading.Lock()
        self._callbacks: Dict[str, List[Callable]] = {
            'connect': [],
            'disconnect': [],
            'status_update': []
        }

    def find_bluetooth_ports(self) -> List[str]:
        """
        Find all available serial ports (including Bluetooth COM ports)

        Every port reported by the OS is returned; no filtering is applied.

        Returns:
            List of available port names
        """
        available_ports = []
        for port in serial.tools.list_ports.comports():
            logger.debug("Found port: %s - %s", port.device, port.description)
            available_ports.append(port.device)
        return available_ports

    def connect(self, port: Optional[str] = None, baudrate: int = 115200,
                timeout: float = 2.0) -> bool:
        """
        Connect to ESP32 relay controller

        Opens the port, sends a STATUS probe and considers the connection
        established when the reply contains "Relay". On any failure the port
        is closed again and the controller is left disconnected.

        Args:
            port: Serial port name (e.g., 'COM4' or '/dev/rfcomm0')
                  If None, will attempt to auto-detect (first port found)
            baudrate: Serial communication speed (default 115200)
            timeout: Read/write timeout in seconds

        Returns:
            True if connection successful, False otherwise
        """
        if self.connected:
            logger.warning("Already connected")
            return True

        try:
            # Auto-detect port if not specified
            if port is None:
                ports = self.find_bluetooth_ports()
                if not ports:
                    logger.error("No serial ports found")
                    return False
                port = ports[0]
                logger.info("Auto-selected port: %s", port)

            # Establish serial connection
            self.serial = serial.Serial(
                port=port,
                baudrate=baudrate,
                timeout=timeout,
                write_timeout=timeout
            )

            # Wait for connection to stabilize
            time.sleep(1)

            # Clear any buffered data
            self.serial.reset_input_buffer()
            self.serial.reset_output_buffer()

            # Test connection with STATUS command. This must bypass
            # _send_command(), which refuses to write until `connected` is
            # set - and `connected` is only set once this probe succeeds.
            self._write_command("STATUS")
            time.sleep(0.5)

            # Read and parse status
            response = self._read_response(timeout=2.0)
            if response and "Relay" in response:
                self.connected = True
                logger.info("Connected to ESP32 relay controller on %s", port)

                # Start read thread
                self._running = True
                self._read_thread = threading.Thread(target=self._read_loop, daemon=True)
                self._read_thread.start()

                # Trigger connect callbacks
                self._trigger_callbacks('connect')
                return True

            logger.error("Failed to communicate with ESP32")
        except serial.SerialException as e:
            logger.error("Serial connection error: %s", e)
        except Exception as e:
            logger.error("Unexpected error during connection: %s", e)

        # Every failure path ends here: roll back so the port is not leaked
        # and no half-connected state is left behind.
        self._mark_disconnected()
        self._close_port()
        return False

    def disconnect(self):
        """
        Disconnect from ESP32 relay controller

        Safe to call repeatedly and when never connected. 'disconnect'
        callbacks fire only if a live connection was actually torn down.
        """
        self._running = False

        reader = self._read_thread
        self._read_thread = None
        # A callback running on the reader thread may call disconnect();
        # joining the current thread would raise RuntimeError.
        if reader is not None and reader is not threading.current_thread():
            reader.join(timeout=2.0)

        was_connected = self._mark_disconnected()
        self._close_port()

        if was_connected:
            logger.info("Disconnected from ESP32 relay controller")
            # Trigger disconnect callbacks
            self._trigger_callbacks('disconnect')

    def _mark_disconnected(self) -> bool:
        """
        Atomically flip the controller into the disconnected state

        Returns:
            True if this call performed the connected -> disconnected
            transition, False if the controller was already disconnected
        """
        with self._state_lock:
            was_connected = self.connected
            self.connected = False
            self._running = False
        return was_connected

    def _close_port(self):
        """Close and forget the serial port, ignoring errors from close()"""
        port, self.serial = self.serial, None
        if port is None:
            return
        try:
            port.close()
        except Exception as e:
            # Best effort: the port may already be gone (device unplugged)
            logger.debug("Error closing serial port: %s", e)

    def _write_command(self, command: str):
        """
        Write a newline-terminated command to the open port

        Performs no connection-state check; serial errors propagate to the
        caller.

        Args:
            command: Command string (e.g., 'ON1', 'OFF2', 'STATUS')
        """
        self.serial.write(f"{command}\n".encode('utf-8'))
        self.serial.flush()
        logger.debug("Sent command: %s", command)

    def _send_command(self, command: str) -> bool:
        """
        Send command to ESP32

        Args:
            command: Command string (e.g., 'ON1', 'OFF2', 'STATUS')

        Returns:
            True if command sent successfully
        """
        if not self.connected or not self.serial:
            logger.warning("Not connected to ESP32")
            return False

        try:
            self._write_command(command)
            return True
        except Exception as e:
            logger.error("Error sending command '%s': %s", command, e)
            self._handle_connection_loss()
            return False

    def _read_response(self, timeout: float = 1.0) -> str:
        """
        Read response from ESP32

        Collects non-empty lines until `timeout` seconds have elapsed.

        Args:
            timeout: Maximum time to wait for response

        Returns:
            Response lines joined with newlines ("" on error or no port)
        """
        port = self.serial
        if not port:
            return ""

        # Monotonic clock: unaffected by system clock adjustments
        deadline = time.monotonic() + timeout
        response_lines = []

        try:
            while time.monotonic() < deadline:
                if port.in_waiting > 0:
                    line = port.readline().decode('utf-8', errors='ignore').strip()
                    if line:
                        response_lines.append(line)
                        logger.debug("Received: %s", line)
                else:
                    time.sleep(0.05)
            return "\n".join(response_lines)
        except Exception as e:
            logger.error("Error reading response: %s", e)
            return ""

    def _read_loop(self):
        """Background thread to continuously read from serial port"""
        # Pin the port this thread was started for, so a later reconnect
        # (which replaces self.serial) cannot be picked up by a stale thread.
        port = self.serial
        while self._running and port is not None and port.is_open:
            try:
                if port.in_waiting > 0:
                    line = port.readline().decode('utf-8', errors='ignore').strip()
                    if line:
                        logger.debug("Read: %s", line)
                        self._parse_status_line(line)
                else:
                    time.sleep(0.1)
            except Exception as e:
                logger.error("Error in read loop: %s", e)
                # When _running is already False the port was closed on
                # purpose (disconnect or a loss already handled elsewhere).
                if self._running:
                    self._handle_connection_loss()
                break

    def _parse_status_line(self, line: str):
        """
        Parse status line from ESP32 and update relay states

        Lines that do not match, or that name a relay outside the valid
        range, are ignored.

        Args:
            line: Status line (e.g., "Relay 1: ON")
        """
        try:
            if "Relay" not in line or ":" not in line:
                return

            parts = line.split(":")
            if len(parts) < 2:
                return

            # Extract relay number
            relay_part = parts[0].strip().split()
            if len(relay_part) < 2:
                return
            relay_num = int(relay_part[1]) - 1  # Convert to 0-based index

            # Extract state ("OFF" does not contain "ON")
            state = "ON" in parts[1].strip().upper()

            if 0 <= relay_num < self.NUM_RELAYS:
                self.relay_states[relay_num] = state
                self._trigger_callbacks('status_update', relay_num=relay_num, state=state)
        except Exception as e:
            logger.debug("Error parsing status line '%s': %s", line, e)

    def _handle_connection_loss(self):
        """
        Handle unexpected connection loss

        Idempotent: if the connection was already torn down (by another
        thread or an earlier call) nothing happens, so 'disconnect'
        callbacks fire once per lost connection.
        """
        if not self._mark_disconnected():
            return

        logger.warning("Connection lost to ESP32")
        self._close_port()
        self._trigger_callbacks('disconnect')

        # TODO: Implement auto-reconnect if enabled

    def _relay_command(self, prefix: str, relay_num: int) -> bool:
        """
        Validate a relay number and send '<prefix><relay_num>'

        Args:
            prefix: Command prefix ('ON', 'OFF' or 'TOGGLE')
            relay_num: Relay number (1-8)

        Returns:
            True if command sent successfully, False if the relay number is
            out of range or sending failed
        """
        if not 1 <= relay_num <= self.NUM_RELAYS:
            logger.error("Invalid relay number: %s. Must be 1-%d", relay_num, self.NUM_RELAYS)
            return False
        return self._send_command(f"{prefix}{relay_num}")

    def turn_on(self, relay_num: int) -> bool:
        """
        Turn on a specific relay

        Args:
            relay_num: Relay number (1-8)

        Returns:
            True if command sent successfully
        """
        return self._relay_command("ON", relay_num)

    def turn_off(self, relay_num: int) -> bool:
        """
        Turn off a specific relay

        Args:
            relay_num: Relay number (1-8)

        Returns:
            True if command sent successfully
        """
        return self._relay_command("OFF", relay_num)

    def toggle(self, relay_num: int) -> bool:
        """
        Toggle a specific relay

        Args:
            relay_num: Relay number (1-8)

        Returns:
            True if command sent successfully
        """
        return self._relay_command("TOGGLE", relay_num)

    def all_on(self) -> bool:
        """Turn all relays on"""
        return self._send_command("ALL_ON")

    def all_off(self) -> bool:
        """Turn all relays off"""
        return self._send_command("ALL_OFF")

    def get_status(self) -> bool:
        """Request status update for all relays"""
        return self._send_command("STATUS")

    def get_relay_state(self, relay_num: int) -> Optional[bool]:
        """
        Get cached state of a specific relay

        Args:
            relay_num: Relay number (1-8)

        Returns:
            True if ON, False if OFF, None if invalid relay number
        """
        if not 1 <= relay_num <= self.NUM_RELAYS:
            return None
        return self.relay_states[relay_num - 1]

    def get_all_states(self) -> List[bool]:
        """
        Get cached states of all relays

        Returns:
            List of 8 boolean values (True=ON, False=OFF); a copy, so the
            caller may modify it freely
        """
        return self.relay_states.copy()

    def register_callback(self, event: str, callback: Callable):
        """
        Register callback for events

        Args:
            event: Event type ('connect', 'disconnect', 'status_update')
            callback: Function to call when event occurs
                     For 'status_update', callback receives keyword
                     arguments relay_num (0-based index) and state
        """
        if event in self._callbacks:
            self._callbacks[event].append(callback)
        else:
            logger.warning("Unknown event type: %s", event)

    def _trigger_callbacks(self, event: str, **kwargs):
        """
        Trigger all callbacks registered for an event

        Exceptions raised by a callback are logged and do not stop the
        remaining callbacks.
        """
        # Iterate over a snapshot so a callback may register further
        # callbacks without disturbing this dispatch.
        for callback in list(self._callbacks.get(event, [])):
            try:
                callback(**kwargs)
            except Exception as e:
                logger.error("Error in callback for event '%s': %s", event, e)

    def __enter__(self):
        """Context manager entry"""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Context manager exit"""
        self.disconnect()

    def __del__(self):
        """Cleanup on destruction"""
        try:
            self.disconnect()
        except Exception:
            # Deliberately silent: __init__ may not have completed, or the
            # interpreter may be shutting down; a finalizer must not raise.
            pass


def main() -> None:
    """Interactive demo: pick a port, cycle the relays, then accept commands"""
    # Configure logging
    logging.basicConfig(
        level=logging.DEBUG,
        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
    )

    # Create controller
    relay = ESP32RelayController()

    # Register status update callback
    def on_status_update(relay_num, state):
        print(f" → Relay {relay_num + 1} is now {'ON' if state else 'OFF'}")

    relay.register_callback('status_update', on_status_update)

    # Find available ports
    print("\nAvailable serial ports:")
    ports = relay.find_bluetooth_ports()
    for i, port in enumerate(ports, 1):
        print(f" {i}. {port}")

    if not ports:
        print("No serial ports found. Make sure ESP32 is paired via Bluetooth.")
        # SystemExit rather than exit(): exit() is only injected by `site`
        raise SystemExit(1)

    # Select port
    port_choice = input(f"\nSelect port number (1-{len(ports)}) or press Enter for auto: ").strip()
    selected_port = None
    # isdecimal(): unlike isdigit() it only accepts characters int() can parse
    if port_choice.isdecimal():
        index = int(port_choice) - 1
        if 0 <= index < len(ports):
            selected_port = ports[index]
        else:
            # "0" would otherwise wrap to the last port; larger numbers
            # would raise IndexError.
            print("Selection out of range, using auto-detect")

    # Connect
    print("\nConnecting to ESP32...")
    if not relay.connect(selected_port):
        print("✗ Failed to connect to ESP32")
        return

    print("✓ Connected successfully!\n")
    try:
        # Get initial status
        print("Getting status...")
        relay.get_status()
        time.sleep(1)

        # Demo: Turn relays on/off
        print("\nDemo: Cycling through relays...")
        for i in range(1, 9):
            print(f" Turning ON relay {i}")
            relay.turn_on(i)
            time.sleep(0.5)
            print(f" Turning OFF relay {i}")
            relay.turn_off(i)
            time.sleep(0.5)

        # All on/off test
        print("\nTesting ALL_ON...")
        relay.all_on()
        time.sleep(1)
        print("Testing ALL_OFF...")
        relay.all_off()
        time.sleep(1)

        # Interactive mode
        print("\n--- Interactive Mode ---")
        print("Commands: ON1-8, OFF1-8, TOGGLE1-8, ALL_ON, ALL_OFF, STATUS, QUIT")
        while True:
            try:
                cmd = input("\nCommand: ").strip().upper()
            except (EOFError, KeyboardInterrupt):
                # Closed stdin or Ctrl-C: leave the loop and disconnect
                break

            if cmd == "QUIT":
                break
            elif cmd.startswith("ON") and len(cmd) == 3 and cmd[2].isdecimal():
                relay.turn_on(int(cmd[2]))
            elif cmd.startswith("OFF") and len(cmd) == 4 and cmd[3].isdecimal():
                relay.turn_off(int(cmd[3]))
            elif cmd.startswith("TOGGLE") and len(cmd) == 7 and cmd[6].isdecimal():
                relay.toggle(int(cmd[6]))
            elif cmd == "ALL_ON":
                relay.all_on()
            elif cmd == "ALL_OFF":
                relay.all_off()
            elif cmd == "STATUS":
                relay.get_status()
            else:
                print("Invalid command")
    finally:
        # Disconnect even if the demo was interrupted
        print("\nDisconnecting...")
        relay.disconnect()
        print("✓ Disconnected")


# Example usage and testing
if __name__ == "__main__":
    main()