diff --git a/esp32_relay_controller.py b/esp32_relay_controller.py new file mode 100644 index 0000000..f1f335e --- /dev/null +++ b/esp32_relay_controller.py @@ -0,0 +1,479 @@ +""" +ESP32 Bluetooth Relay Controller Integration +Provides Bluetooth Serial communication with ESP32 8-channel relay controller +""" + +import serial +import serial.tools.list_ports +import time +import threading +import logging +from typing import Optional, Callable, Dict, List + +logger = logging.getLogger(__name__) + + +class ESP32RelayController: + """ + Controls an ESP32 8-channel Bluetooth relay controller + + Usage: + relay = ESP32RelayController() + relay.connect("COM4") # or /dev/ttyUSB0 on Linux + relay.turn_on(1) # Turn on relay 1 + relay.turn_off(1) # Turn off relay 1 + relay.all_on() # Turn all relays on + relay.all_off() # Turn all relays off + relay.disconnect() + """ + + def __init__(self, device_name: str = "ESP32-Relay-8CH", auto_reconnect: bool = True): + """ + Initialize the relay controller + + Args: + device_name: Bluetooth device name to search for + auto_reconnect: Automatically reconnect if connection is lost + """ + self.device_name = device_name + self.auto_reconnect = auto_reconnect + self.serial: Optional[serial.Serial] = None + self.connected = False + self.relay_states = [False] * 8 + self._read_thread: Optional[threading.Thread] = None + self._running = False + self._callbacks: Dict[str, List[Callable]] = { + 'connect': [], + 'disconnect': [], + 'status_update': [] + } + + def find_bluetooth_ports(self) -> List[str]: + """ + Find all available serial ports (including Bluetooth COM ports) + + Returns: + List of available port names + """ + ports = serial.tools.list_ports.comports() + available_ports = [] + + for port in ports: + logger.debug(f"Found port: {port.device} - {port.description}") + available_ports.append(port.device) + + return available_ports + + def connect(self, port: Optional[str] = None, baudrate: int = 115200, timeout: float = 2.0) -> bool: + """ + Connect to ESP32 relay controller + + Args: + port: Serial port name (e.g., 'COM4' or '/dev/rfcomm0') + If None, will attempt to auto-detect + baudrate: Serial communication speed (default 115200) + timeout: Read timeout in seconds + + Returns: + True if connection successful, False otherwise + """ + if self.connected: + logger.warning("Already connected") + return True + + try: + # Auto-detect port if not specified + if port is None: + ports = self.find_bluetooth_ports() + if not ports: + logger.error("No serial ports found") + return False + port = ports[0] + logger.info(f"Auto-selected port: {port}") + + # Establish serial connection + self.serial = serial.Serial( + port=port, + baudrate=baudrate, + timeout=timeout, + write_timeout=timeout + ) + + # Wait for connection to stabilize + time.sleep(1) + + # Clear any buffered data + self.serial.reset_input_buffer() + self.serial.reset_output_buffer() + + # Test connection with STATUS command + self._send_command("STATUS") + time.sleep(0.5) + + # Read and parse status + response = self._read_response(timeout=2.0) + if response and "Relay" in response: + self.connected = True + logger.info(f"Connected to ESP32 relay controller on {port}") + + # Start read thread + self._running = True + self._read_thread = threading.Thread(target=self._read_loop, daemon=True) + self._read_thread.start() + + # Trigger connect callbacks + self._trigger_callbacks('connect') + + return True + else: + logger.error("Failed to communicate with ESP32") + self.serial.close() + return False + + except serial.SerialException as e: + logger.error(f"Serial connection error: {e}") + return False + except Exception as e: + logger.error(f"Unexpected error during connection: {e}") + return False + + def disconnect(self): + """Disconnect from ESP32 relay controller""" + self._running = False + + if self._read_thread: + self._read_thread.join(timeout=2.0) + + if self.serial and self.serial.is_open: + self.serial.close() + + self.connected = False + logger.info("Disconnected from ESP32 relay controller") + + # Trigger disconnect callbacks + self._trigger_callbacks('disconnect') + + def _send_command(self, command: str) -> bool: + """ + Send command to ESP32 + + Args: + command: Command string (e.g., 'ON1', 'OFF2', 'STATUS') + + Returns: + True if command sent successfully + """ + if not self.connected or not self.serial: + logger.warning("Not connected to ESP32") + return False + + try: + # Add newline and send + command_bytes = f"{command}\n".encode('utf-8') + self.serial.write(command_bytes) + self.serial.flush() + logger.debug(f"Sent command: {command}") + return True + except Exception as e: + logger.error(f"Error sending command '{command}': {e}") + self._handle_connection_loss() + return False + + def _read_response(self, timeout: float = 1.0) -> str: + """ + Read response from ESP32 + + Args: + timeout: Maximum time to wait for response + + Returns: + Response string + """ + if not self.serial: + return "" + + start_time = time.time() + response_lines = [] + + try: + while time.time() - start_time < timeout: + if self.serial.in_waiting > 0: + line = self.serial.readline().decode('utf-8', errors='ignore').strip() + if line: + response_lines.append(line) + logger.debug(f"Received: {line}") + else: + time.sleep(0.05) + + return "\n".join(response_lines) + except Exception as e: + logger.error(f"Error reading response: {e}") + return "" + + def _read_loop(self): + """Background thread to continuously read from serial port""" + while self._running and self.serial and self.serial.is_open: + try: + if self.serial.in_waiting > 0: + line = self.serial.readline().decode('utf-8', errors='ignore').strip() + if line: + logger.debug(f"Read: {line}") + self._parse_status_line(line) + else: + time.sleep(0.1) + except Exception as e: + logger.error(f"Error in read loop: {e}") + if self._running: + self._handle_connection_loss() + break + + def _parse_status_line(self, line: str): + """ + Parse status line from ESP32 and update relay states + + Args: + line: Status line (e.g., "Relay 1: ON") + """ + try: + if "Relay" in line and ":" in line: + parts = line.split(":") + if len(parts) >= 2: + # Extract relay number + relay_part = parts[0].strip().split() + if len(relay_part) >= 2: + relay_num = int(relay_part[1]) - 1 # Convert to 0-based index + + # Extract state + state_str = parts[1].strip().upper() + state = "ON" in state_str + + if 0 <= relay_num < 8: + self.relay_states[relay_num] = state + self._trigger_callbacks('status_update', relay_num=relay_num, state=state) + except Exception as e: + logger.debug(f"Error parsing status line '{line}': {e}") + + def _handle_connection_loss(self): + """Handle unexpected connection loss""" + logger.warning("Connection lost to ESP32") + self.connected = False + + if self.serial: + try: + self.serial.close() + except: + pass + + self._trigger_callbacks('disconnect') + + # TODO: Implement auto-reconnect if enabled + + def turn_on(self, relay_num: int) -> bool: + """ + Turn on a specific relay + + Args: + relay_num: Relay number (1-8) + + Returns: + True if command sent successfully + """ + if not 1 <= relay_num <= 8: + logger.error(f"Invalid relay number: {relay_num}. Must be 1-8") + return False + + return self._send_command(f"ON{relay_num}") + + def turn_off(self, relay_num: int) -> bool: + """ + Turn off a specific relay + + Args: + relay_num: Relay number (1-8) + + Returns: + True if command sent successfully + """ + if not 1 <= relay_num <= 8: + logger.error(f"Invalid relay number: {relay_num}. Must be 1-8") + return False + + return self._send_command(f"OFF{relay_num}") + + def toggle(self, relay_num: int) -> bool: + """ + Toggle a specific relay + + Args: + relay_num: Relay number (1-8) + + Returns: + True if command sent successfully + """ + if not 1 <= relay_num <= 8: + logger.error(f"Invalid relay number: {relay_num}. Must be 1-8") + return False + + return self._send_command(f"TOGGLE{relay_num}") + + def all_on(self) -> bool: + """Turn all relays on""" + return self._send_command("ALL_ON") + + def all_off(self) -> bool: + """Turn all relays off""" + return self._send_command("ALL_OFF") + + def get_status(self) -> bool: + """Request status update for all relays""" + return self._send_command("STATUS") + + def get_relay_state(self, relay_num: int) -> Optional[bool]: + """ + Get cached state of a specific relay + + Args: + relay_num: Relay number (1-8) + + Returns: + True if ON, False if OFF, None if invalid relay number + """ + if not 1 <= relay_num <= 8: + return None + return self.relay_states[relay_num - 1] + + def get_all_states(self) -> List[bool]: + """ + Get cached states of all relays + + Returns: + List of 8 boolean values (True=ON, False=OFF) + """ + return self.relay_states.copy() + + def register_callback(self, event: str, callback: Callable): + """ + Register callback for events + + Args: + event: Event type ('connect', 'disconnect', 'status_update') + callback: Function to call when event occurs + For 'status_update', callback receives (relay_num, state) + """ + if event in self._callbacks: + self._callbacks[event].append(callback) + else: + logger.warning(f"Unknown event type: {event}") + + def _trigger_callbacks(self, event: str, **kwargs): + """Trigger all callbacks registered for an event""" + for callback in self._callbacks.get(event, []): + try: + callback(**kwargs) + except Exception as e: + logger.error(f"Error in callback for event '{event}': {e}") + + def __enter__(self): + """Context manager entry""" + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + """Context manager exit""" + self.disconnect() + + def __del__(self): + """Cleanup on destruction""" + self.disconnect() + + +# Example usage and testing +if __name__ == "__main__": + # Configure logging + logging.basicConfig( + level=logging.DEBUG, + format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' + ) + + # Create controller + relay = ESP32RelayController() + + # Register status update callback + def on_status_update(relay_num, state): + print(f" → Relay {relay_num + 1} is now {'ON' if state else 'OFF'}") + + relay.register_callback('status_update', on_status_update) + + # Find available ports + print("\nAvailable serial ports:") + ports = relay.find_bluetooth_ports() + for i, port in enumerate(ports, 1): + print(f" {i}. {port}") + + if not ports: + print("No serial ports found. Make sure ESP32 is paired via Bluetooth.") + exit(1) + + # Select port + port_choice = input(f"\nSelect port number (1-{len(ports)}) or press Enter for auto: ").strip() + selected_port = ports[int(port_choice) - 1] if port_choice.isdigit() else None + + # Connect + print("\nConnecting to ESP32...") + if relay.connect(selected_port): + print("✓ Connected successfully!\n") + + # Get initial status + print("Getting status...") + relay.get_status() + time.sleep(1) + + # Demo: Turn relays on/off + print("\nDemo: Cycling through relays...") + for i in range(1, 9): + print(f" Turning ON relay {i}") + relay.turn_on(i) + time.sleep(0.5) + print(f" Turning OFF relay {i}") + relay.turn_off(i) + time.sleep(0.5) + + # All on/off test + print("\nTesting ALL_ON...") + relay.all_on() + time.sleep(1) + + print("Testing ALL_OFF...") + relay.all_off() + time.sleep(1) + + # Interactive mode + print("\n--- Interactive Mode ---") + print("Commands: ON1-8, OFF1-8, TOGGLE1-8, ALL_ON, ALL_OFF, STATUS, QUIT") + + while True: + cmd = input("\nCommand: ").strip().upper() + + if cmd == "QUIT": + break + elif cmd.startswith("ON") and len(cmd) == 3 and cmd[2].isdigit(): + relay.turn_on(int(cmd[2])) + elif cmd.startswith("OFF") and len(cmd) == 4 and cmd[3].isdigit(): + relay.turn_off(int(cmd[3])) + elif cmd.startswith("TOGGLE") and len(cmd) == 7 and cmd[6].isdigit(): + relay.toggle(int(cmd[6])) + elif cmd == "ALL_ON": + relay.all_on() + elif cmd == "ALL_OFF": + relay.all_off() + elif cmd == "STATUS": + relay.get_status() + else: + print("Invalid command") + + # Disconnect + print("\nDisconnecting...") + relay.disconnect() + print("✓ Disconnected") + + else: + print("✗ Failed to connect to ESP32")