Files
moonlight-drive-in/esp32_relay_controller.py

480 lines
15 KiB
Python

"""
ESP32 Bluetooth Relay Controller Integration
Provides Bluetooth Serial communication with ESP32 8-channel relay controller
"""
import serial
import serial.tools.list_ports
import time
import threading
import logging
from typing import Optional, Callable, Dict, List
logger = logging.getLogger(__name__)
class ESP32RelayController:
"""
Controls an ESP32 8-channel Bluetooth relay controller
Usage:
relay = ESP32RelayController()
relay.connect("COM4") # or /dev/ttyUSB0 on Linux
relay.turn_on(1) # Turn on relay 1
relay.turn_off(1) # Turn off relay 1
relay.all_on() # Turn all relays on
relay.all_off() # Turn all relays off
relay.disconnect()
"""
def __init__(self, device_name: str = "ESP32-Relay-8CH", auto_reconnect: bool = True):
"""
Initialize the relay controller
Args:
device_name: Bluetooth device name to search for
auto_reconnect: Automatically reconnect if connection is lost
"""
self.device_name = device_name
self.auto_reconnect = auto_reconnect
self.serial: Optional[serial.Serial] = None
self.connected = False
self.relay_states = [False] * 8
self._read_thread: Optional[threading.Thread] = None
self._running = False
self._callbacks: Dict[str, List[Callable]] = {
'connect': [],
'disconnect': [],
'status_update': []
}
def find_bluetooth_ports(self) -> List[str]:
"""
Find all available serial ports (including Bluetooth COM ports)
Returns:
List of available port names
"""
ports = serial.tools.list_ports.comports()
available_ports = []
for port in ports:
logger.debug(f"Found port: {port.device} - {port.description}")
available_ports.append(port.device)
return available_ports
def connect(self, port: Optional[str] = None, baudrate: int = 115200, timeout: float = 2.0) -> bool:
"""
Connect to ESP32 relay controller
Args:
port: Serial port name (e.g., 'COM4' or '/dev/rfcomm0')
If None, will attempt to auto-detect
baudrate: Serial communication speed (default 115200)
timeout: Read timeout in seconds
Returns:
True if connection successful, False otherwise
"""
if self.connected:
logger.warning("Already connected")
return True
try:
# Auto-detect port if not specified
if port is None:
ports = self.find_bluetooth_ports()
if not ports:
logger.error("No serial ports found")
return False
port = ports[0]
logger.info(f"Auto-selected port: {port}")
# Establish serial connection
self.serial = serial.Serial(
port=port,
baudrate=baudrate,
timeout=timeout,
write_timeout=timeout
)
# Wait for connection to stabilize
time.sleep(1)
# Clear any buffered data
self.serial.reset_input_buffer()
self.serial.reset_output_buffer()
# Test connection with STATUS command
self._send_command("STATUS")
time.sleep(0.5)
# Read and parse status
response = self._read_response(timeout=2.0)
if response and "Relay" in response:
self.connected = True
logger.info(f"Connected to ESP32 relay controller on {port}")
# Start read thread
self._running = True
self._read_thread = threading.Thread(target=self._read_loop, daemon=True)
self._read_thread.start()
# Trigger connect callbacks
self._trigger_callbacks('connect')
return True
else:
logger.error("Failed to communicate with ESP32")
self.serial.close()
return False
except serial.SerialException as e:
logger.error(f"Serial connection error: {e}")
return False
except Exception as e:
logger.error(f"Unexpected error during connection: {e}")
return False
def disconnect(self):
"""Disconnect from ESP32 relay controller"""
self._running = False
if self._read_thread:
self._read_thread.join(timeout=2.0)
if self.serial and self.serial.is_open:
self.serial.close()
self.connected = False
logger.info("Disconnected from ESP32 relay controller")
# Trigger disconnect callbacks
self._trigger_callbacks('disconnect')
def _send_command(self, command: str) -> bool:
"""
Send command to ESP32
Args:
command: Command string (e.g., 'ON1', 'OFF2', 'STATUS')
Returns:
True if command sent successfully
"""
if not self.connected or not self.serial:
logger.warning("Not connected to ESP32")
return False
try:
# Add newline and send
command_bytes = f"{command}\n".encode('utf-8')
self.serial.write(command_bytes)
self.serial.flush()
logger.debug(f"Sent command: {command}")
return True
except Exception as e:
logger.error(f"Error sending command '{command}': {e}")
self._handle_connection_loss()
return False
def _read_response(self, timeout: float = 1.0) -> str:
"""
Read response from ESP32
Args:
timeout: Maximum time to wait for response
Returns:
Response string
"""
if not self.serial:
return ""
start_time = time.time()
response_lines = []
try:
while time.time() - start_time < timeout:
if self.serial.in_waiting > 0:
line = self.serial.readline().decode('utf-8', errors='ignore').strip()
if line:
response_lines.append(line)
logger.debug(f"Received: {line}")
else:
time.sleep(0.05)
return "\n".join(response_lines)
except Exception as e:
logger.error(f"Error reading response: {e}")
return ""
def _read_loop(self):
"""Background thread to continuously read from serial port"""
while self._running and self.serial and self.serial.is_open:
try:
if self.serial.in_waiting > 0:
line = self.serial.readline().decode('utf-8', errors='ignore').strip()
if line:
logger.debug(f"Read: {line}")
self._parse_status_line(line)
else:
time.sleep(0.1)
except Exception as e:
logger.error(f"Error in read loop: {e}")
if self._running:
self._handle_connection_loss()
break
def _parse_status_line(self, line: str):
"""
Parse status line from ESP32 and update relay states
Args:
line: Status line (e.g., "Relay 1: ON")
"""
try:
if "Relay" in line and ":" in line:
parts = line.split(":")
if len(parts) >= 2:
# Extract relay number
relay_part = parts[0].strip().split()
if len(relay_part) >= 2:
relay_num = int(relay_part[1]) - 1 # Convert to 0-based index
# Extract state
state_str = parts[1].strip().upper()
state = "ON" in state_str
if 0 <= relay_num < 8:
self.relay_states[relay_num] = state
self._trigger_callbacks('status_update', relay_num=relay_num, state=state)
except Exception as e:
logger.debug(f"Error parsing status line '{line}': {e}")
def _handle_connection_loss(self):
"""Handle unexpected connection loss"""
logger.warning("Connection lost to ESP32")
self.connected = False
if self.serial:
try:
self.serial.close()
except:
pass
self._trigger_callbacks('disconnect')
# TODO: Implement auto-reconnect if enabled
def turn_on(self, relay_num: int) -> bool:
"""
Turn on a specific relay
Args:
relay_num: Relay number (1-8)
Returns:
True if command sent successfully
"""
if not 1 <= relay_num <= 8:
logger.error(f"Invalid relay number: {relay_num}. Must be 1-8")
return False
return self._send_command(f"ON{relay_num}")
def turn_off(self, relay_num: int) -> bool:
"""
Turn off a specific relay
Args:
relay_num: Relay number (1-8)
Returns:
True if command sent successfully
"""
if not 1 <= relay_num <= 8:
logger.error(f"Invalid relay number: {relay_num}. Must be 1-8")
return False
return self._send_command(f"OFF{relay_num}")
def toggle(self, relay_num: int) -> bool:
"""
Toggle a specific relay
Args:
relay_num: Relay number (1-8)
Returns:
True if command sent successfully
"""
if not 1 <= relay_num <= 8:
logger.error(f"Invalid relay number: {relay_num}. Must be 1-8")
return False
return self._send_command(f"TOGGLE{relay_num}")
def all_on(self) -> bool:
"""Turn all relays on"""
return self._send_command("ALL_ON")
def all_off(self) -> bool:
"""Turn all relays off"""
return self._send_command("ALL_OFF")
def get_status(self) -> bool:
"""Request status update for all relays"""
return self._send_command("STATUS")
def get_relay_state(self, relay_num: int) -> Optional[bool]:
"""
Get cached state of a specific relay
Args:
relay_num: Relay number (1-8)
Returns:
True if ON, False if OFF, None if invalid relay number
"""
if not 1 <= relay_num <= 8:
return None
return self.relay_states[relay_num - 1]
def get_all_states(self) -> List[bool]:
"""
Get cached states of all relays
Returns:
List of 8 boolean values (True=ON, False=OFF)
"""
return self.relay_states.copy()
def register_callback(self, event: str, callback: Callable):
"""
Register callback for events
Args:
event: Event type ('connect', 'disconnect', 'status_update')
callback: Function to call when event occurs
For 'status_update', callback receives (relay_num, state)
"""
if event in self._callbacks:
self._callbacks[event].append(callback)
else:
logger.warning(f"Unknown event type: {event}")
def _trigger_callbacks(self, event: str, **kwargs):
"""Trigger all callbacks registered for an event"""
for callback in self._callbacks.get(event, []):
try:
callback(**kwargs)
except Exception as e:
logger.error(f"Error in callback for event '{event}': {e}")
def __enter__(self):
"""Context manager entry"""
return self
def __exit__(self, exc_type, exc_val, exc_tb):
"""Context manager exit"""
self.disconnect()
def __del__(self):
"""Cleanup on destruction"""
self.disconnect()
# Example usage and testing
if __name__ == "__main__":
# Configure logging
logging.basicConfig(
level=logging.DEBUG,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
# Create controller
relay = ESP32RelayController()
# Register status update callback
def on_status_update(relay_num, state):
print(f" → Relay {relay_num + 1} is now {'ON' if state else 'OFF'}")
relay.register_callback('status_update', on_status_update)
# Find available ports
print("\nAvailable serial ports:")
ports = relay.find_bluetooth_ports()
for i, port in enumerate(ports, 1):
print(f" {i}. {port}")
if not ports:
print("No serial ports found. Make sure ESP32 is paired via Bluetooth.")
exit(1)
# Select port
port_choice = input(f"\nSelect port number (1-{len(ports)}) or press Enter for auto: ").strip()
selected_port = ports[int(port_choice) - 1] if port_choice.isdigit() else None
# Connect
print("\nConnecting to ESP32...")
if relay.connect(selected_port):
print("✓ Connected successfully!\n")
# Get initial status
print("Getting status...")
relay.get_status()
time.sleep(1)
# Demo: Turn relays on/off
print("\nDemo: Cycling through relays...")
for i in range(1, 9):
print(f" Turning ON relay {i}")
relay.turn_on(i)
time.sleep(0.5)
print(f" Turning OFF relay {i}")
relay.turn_off(i)
time.sleep(0.5)
# All on/off test
print("\nTesting ALL_ON...")
relay.all_on()
time.sleep(1)
print("Testing ALL_OFF...")
relay.all_off()
time.sleep(1)
# Interactive mode
print("\n--- Interactive Mode ---")
print("Commands: ON1-8, OFF1-8, TOGGLE1-8, ALL_ON, ALL_OFF, STATUS, QUIT")
while True:
cmd = input("\nCommand: ").strip().upper()
if cmd == "QUIT":
break
elif cmd.startswith("ON") and len(cmd) == 3 and cmd[2].isdigit():
relay.turn_on(int(cmd[2]))
elif cmd.startswith("OFF") and len(cmd) == 4 and cmd[3].isdigit():
relay.turn_off(int(cmd[3]))
elif cmd.startswith("TOGGLE") and len(cmd) == 7 and cmd[6].isdigit():
relay.toggle(int(cmd[6]))
elif cmd == "ALL_ON":
relay.all_on()
elif cmd == "ALL_OFF":
relay.all_off()
elif cmd == "STATUS":
relay.get_status()
else:
print("Invalid command")
# Disconnect
print("\nDisconnecting...")
relay.disconnect()
print("✓ Disconnected")
else:
print("✗ Failed to connect to ESP32")