480 lines
15 KiB
Python
480 lines
15 KiB
Python
"""
|
|
ESP32 Bluetooth Relay Controller Integration
|
|
Provides Bluetooth Serial communication with ESP32 8-channel relay controller
|
|
"""
|
|
|
|
import serial
|
|
import serial.tools.list_ports
|
|
import time
|
|
import threading
|
|
import logging
|
|
from typing import Optional, Callable, Dict, List
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class ESP32RelayController:
|
|
"""
|
|
Controls an ESP32 8-channel Bluetooth relay controller
|
|
|
|
Usage:
|
|
relay = ESP32RelayController()
|
|
relay.connect("COM4") # or /dev/ttyUSB0 on Linux
|
|
relay.turn_on(1) # Turn on relay 1
|
|
relay.turn_off(1) # Turn off relay 1
|
|
relay.all_on() # Turn all relays on
|
|
relay.all_off() # Turn all relays off
|
|
relay.disconnect()
|
|
"""
|
|
|
|
def __init__(self, device_name: str = "ESP32-Relay-8CH", auto_reconnect: bool = True):
|
|
"""
|
|
Initialize the relay controller
|
|
|
|
Args:
|
|
device_name: Bluetooth device name to search for
|
|
auto_reconnect: Automatically reconnect if connection is lost
|
|
"""
|
|
self.device_name = device_name
|
|
self.auto_reconnect = auto_reconnect
|
|
self.serial: Optional[serial.Serial] = None
|
|
self.connected = False
|
|
self.relay_states = [False] * 8
|
|
self._read_thread: Optional[threading.Thread] = None
|
|
self._running = False
|
|
self._callbacks: Dict[str, List[Callable]] = {
|
|
'connect': [],
|
|
'disconnect': [],
|
|
'status_update': []
|
|
}
|
|
|
|
def find_bluetooth_ports(self) -> List[str]:
|
|
"""
|
|
Find all available serial ports (including Bluetooth COM ports)
|
|
|
|
Returns:
|
|
List of available port names
|
|
"""
|
|
ports = serial.tools.list_ports.comports()
|
|
available_ports = []
|
|
|
|
for port in ports:
|
|
logger.debug(f"Found port: {port.device} - {port.description}")
|
|
available_ports.append(port.device)
|
|
|
|
return available_ports
|
|
|
|
def connect(self, port: Optional[str] = None, baudrate: int = 115200, timeout: float = 2.0) -> bool:
|
|
"""
|
|
Connect to ESP32 relay controller
|
|
|
|
Args:
|
|
port: Serial port name (e.g., 'COM4' or '/dev/rfcomm0')
|
|
If None, will attempt to auto-detect
|
|
baudrate: Serial communication speed (default 115200)
|
|
timeout: Read timeout in seconds
|
|
|
|
Returns:
|
|
True if connection successful, False otherwise
|
|
"""
|
|
if self.connected:
|
|
logger.warning("Already connected")
|
|
return True
|
|
|
|
try:
|
|
# Auto-detect port if not specified
|
|
if port is None:
|
|
ports = self.find_bluetooth_ports()
|
|
if not ports:
|
|
logger.error("No serial ports found")
|
|
return False
|
|
port = ports[0]
|
|
logger.info(f"Auto-selected port: {port}")
|
|
|
|
# Establish serial connection
|
|
self.serial = serial.Serial(
|
|
port=port,
|
|
baudrate=baudrate,
|
|
timeout=timeout,
|
|
write_timeout=timeout
|
|
)
|
|
|
|
# Wait for connection to stabilize
|
|
time.sleep(1)
|
|
|
|
# Clear any buffered data
|
|
self.serial.reset_input_buffer()
|
|
self.serial.reset_output_buffer()
|
|
|
|
# Test connection with STATUS command
|
|
self._send_command("STATUS")
|
|
time.sleep(0.5)
|
|
|
|
# Read and parse status
|
|
response = self._read_response(timeout=2.0)
|
|
if response and "Relay" in response:
|
|
self.connected = True
|
|
logger.info(f"Connected to ESP32 relay controller on {port}")
|
|
|
|
# Start read thread
|
|
self._running = True
|
|
self._read_thread = threading.Thread(target=self._read_loop, daemon=True)
|
|
self._read_thread.start()
|
|
|
|
# Trigger connect callbacks
|
|
self._trigger_callbacks('connect')
|
|
|
|
return True
|
|
else:
|
|
logger.error("Failed to communicate with ESP32")
|
|
self.serial.close()
|
|
return False
|
|
|
|
except serial.SerialException as e:
|
|
logger.error(f"Serial connection error: {e}")
|
|
return False
|
|
except Exception as e:
|
|
logger.error(f"Unexpected error during connection: {e}")
|
|
return False
|
|
|
|
def disconnect(self):
|
|
"""Disconnect from ESP32 relay controller"""
|
|
self._running = False
|
|
|
|
if self._read_thread:
|
|
self._read_thread.join(timeout=2.0)
|
|
|
|
if self.serial and self.serial.is_open:
|
|
self.serial.close()
|
|
|
|
self.connected = False
|
|
logger.info("Disconnected from ESP32 relay controller")
|
|
|
|
# Trigger disconnect callbacks
|
|
self._trigger_callbacks('disconnect')
|
|
|
|
def _send_command(self, command: str) -> bool:
|
|
"""
|
|
Send command to ESP32
|
|
|
|
Args:
|
|
command: Command string (e.g., 'ON1', 'OFF2', 'STATUS')
|
|
|
|
Returns:
|
|
True if command sent successfully
|
|
"""
|
|
if not self.connected or not self.serial:
|
|
logger.warning("Not connected to ESP32")
|
|
return False
|
|
|
|
try:
|
|
# Add newline and send
|
|
command_bytes = f"{command}\n".encode('utf-8')
|
|
self.serial.write(command_bytes)
|
|
self.serial.flush()
|
|
logger.debug(f"Sent command: {command}")
|
|
return True
|
|
except Exception as e:
|
|
logger.error(f"Error sending command '{command}': {e}")
|
|
self._handle_connection_loss()
|
|
return False
|
|
|
|
def _read_response(self, timeout: float = 1.0) -> str:
|
|
"""
|
|
Read response from ESP32
|
|
|
|
Args:
|
|
timeout: Maximum time to wait for response
|
|
|
|
Returns:
|
|
Response string
|
|
"""
|
|
if not self.serial:
|
|
return ""
|
|
|
|
start_time = time.time()
|
|
response_lines = []
|
|
|
|
try:
|
|
while time.time() - start_time < timeout:
|
|
if self.serial.in_waiting > 0:
|
|
line = self.serial.readline().decode('utf-8', errors='ignore').strip()
|
|
if line:
|
|
response_lines.append(line)
|
|
logger.debug(f"Received: {line}")
|
|
else:
|
|
time.sleep(0.05)
|
|
|
|
return "\n".join(response_lines)
|
|
except Exception as e:
|
|
logger.error(f"Error reading response: {e}")
|
|
return ""
|
|
|
|
def _read_loop(self):
|
|
"""Background thread to continuously read from serial port"""
|
|
while self._running and self.serial and self.serial.is_open:
|
|
try:
|
|
if self.serial.in_waiting > 0:
|
|
line = self.serial.readline().decode('utf-8', errors='ignore').strip()
|
|
if line:
|
|
logger.debug(f"Read: {line}")
|
|
self._parse_status_line(line)
|
|
else:
|
|
time.sleep(0.1)
|
|
except Exception as e:
|
|
logger.error(f"Error in read loop: {e}")
|
|
if self._running:
|
|
self._handle_connection_loss()
|
|
break
|
|
|
|
def _parse_status_line(self, line: str):
|
|
"""
|
|
Parse status line from ESP32 and update relay states
|
|
|
|
Args:
|
|
line: Status line (e.g., "Relay 1: ON")
|
|
"""
|
|
try:
|
|
if "Relay" in line and ":" in line:
|
|
parts = line.split(":")
|
|
if len(parts) >= 2:
|
|
# Extract relay number
|
|
relay_part = parts[0].strip().split()
|
|
if len(relay_part) >= 2:
|
|
relay_num = int(relay_part[1]) - 1 # Convert to 0-based index
|
|
|
|
# Extract state
|
|
state_str = parts[1].strip().upper()
|
|
state = "ON" in state_str
|
|
|
|
if 0 <= relay_num < 8:
|
|
self.relay_states[relay_num] = state
|
|
self._trigger_callbacks('status_update', relay_num=relay_num, state=state)
|
|
except Exception as e:
|
|
logger.debug(f"Error parsing status line '{line}': {e}")
|
|
|
|
def _handle_connection_loss(self):
|
|
"""Handle unexpected connection loss"""
|
|
logger.warning("Connection lost to ESP32")
|
|
self.connected = False
|
|
|
|
if self.serial:
|
|
try:
|
|
self.serial.close()
|
|
except:
|
|
pass
|
|
|
|
self._trigger_callbacks('disconnect')
|
|
|
|
# TODO: Implement auto-reconnect if enabled
|
|
|
|
def turn_on(self, relay_num: int) -> bool:
|
|
"""
|
|
Turn on a specific relay
|
|
|
|
Args:
|
|
relay_num: Relay number (1-8)
|
|
|
|
Returns:
|
|
True if command sent successfully
|
|
"""
|
|
if not 1 <= relay_num <= 8:
|
|
logger.error(f"Invalid relay number: {relay_num}. Must be 1-8")
|
|
return False
|
|
|
|
return self._send_command(f"ON{relay_num}")
|
|
|
|
def turn_off(self, relay_num: int) -> bool:
|
|
"""
|
|
Turn off a specific relay
|
|
|
|
Args:
|
|
relay_num: Relay number (1-8)
|
|
|
|
Returns:
|
|
True if command sent successfully
|
|
"""
|
|
if not 1 <= relay_num <= 8:
|
|
logger.error(f"Invalid relay number: {relay_num}. Must be 1-8")
|
|
return False
|
|
|
|
return self._send_command(f"OFF{relay_num}")
|
|
|
|
def toggle(self, relay_num: int) -> bool:
|
|
"""
|
|
Toggle a specific relay
|
|
|
|
Args:
|
|
relay_num: Relay number (1-8)
|
|
|
|
Returns:
|
|
True if command sent successfully
|
|
"""
|
|
if not 1 <= relay_num <= 8:
|
|
logger.error(f"Invalid relay number: {relay_num}. Must be 1-8")
|
|
return False
|
|
|
|
return self._send_command(f"TOGGLE{relay_num}")
|
|
|
|
def all_on(self) -> bool:
|
|
"""Turn all relays on"""
|
|
return self._send_command("ALL_ON")
|
|
|
|
def all_off(self) -> bool:
|
|
"""Turn all relays off"""
|
|
return self._send_command("ALL_OFF")
|
|
|
|
def get_status(self) -> bool:
|
|
"""Request status update for all relays"""
|
|
return self._send_command("STATUS")
|
|
|
|
def get_relay_state(self, relay_num: int) -> Optional[bool]:
|
|
"""
|
|
Get cached state of a specific relay
|
|
|
|
Args:
|
|
relay_num: Relay number (1-8)
|
|
|
|
Returns:
|
|
True if ON, False if OFF, None if invalid relay number
|
|
"""
|
|
if not 1 <= relay_num <= 8:
|
|
return None
|
|
return self.relay_states[relay_num - 1]
|
|
|
|
def get_all_states(self) -> List[bool]:
|
|
"""
|
|
Get cached states of all relays
|
|
|
|
Returns:
|
|
List of 8 boolean values (True=ON, False=OFF)
|
|
"""
|
|
return self.relay_states.copy()
|
|
|
|
def register_callback(self, event: str, callback: Callable):
|
|
"""
|
|
Register callback for events
|
|
|
|
Args:
|
|
event: Event type ('connect', 'disconnect', 'status_update')
|
|
callback: Function to call when event occurs
|
|
For 'status_update', callback receives (relay_num, state)
|
|
"""
|
|
if event in self._callbacks:
|
|
self._callbacks[event].append(callback)
|
|
else:
|
|
logger.warning(f"Unknown event type: {event}")
|
|
|
|
def _trigger_callbacks(self, event: str, **kwargs):
|
|
"""Trigger all callbacks registered for an event"""
|
|
for callback in self._callbacks.get(event, []):
|
|
try:
|
|
callback(**kwargs)
|
|
except Exception as e:
|
|
logger.error(f"Error in callback for event '{event}': {e}")
|
|
|
|
def __enter__(self):
|
|
"""Context manager entry"""
|
|
return self
|
|
|
|
def __exit__(self, exc_type, exc_val, exc_tb):
|
|
"""Context manager exit"""
|
|
self.disconnect()
|
|
|
|
def __del__(self):
|
|
"""Cleanup on destruction"""
|
|
self.disconnect()
|
|
|
|
|
|
# Example usage and testing
|
|
if __name__ == "__main__":
|
|
# Configure logging
|
|
logging.basicConfig(
|
|
level=logging.DEBUG,
|
|
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
|
|
)
|
|
|
|
# Create controller
|
|
relay = ESP32RelayController()
|
|
|
|
# Register status update callback
|
|
def on_status_update(relay_num, state):
|
|
print(f" → Relay {relay_num + 1} is now {'ON' if state else 'OFF'}")
|
|
|
|
relay.register_callback('status_update', on_status_update)
|
|
|
|
# Find available ports
|
|
print("\nAvailable serial ports:")
|
|
ports = relay.find_bluetooth_ports()
|
|
for i, port in enumerate(ports, 1):
|
|
print(f" {i}. {port}")
|
|
|
|
if not ports:
|
|
print("No serial ports found. Make sure ESP32 is paired via Bluetooth.")
|
|
exit(1)
|
|
|
|
# Select port
|
|
port_choice = input(f"\nSelect port number (1-{len(ports)}) or press Enter for auto: ").strip()
|
|
selected_port = ports[int(port_choice) - 1] if port_choice.isdigit() else None
|
|
|
|
# Connect
|
|
print("\nConnecting to ESP32...")
|
|
if relay.connect(selected_port):
|
|
print("✓ Connected successfully!\n")
|
|
|
|
# Get initial status
|
|
print("Getting status...")
|
|
relay.get_status()
|
|
time.sleep(1)
|
|
|
|
# Demo: Turn relays on/off
|
|
print("\nDemo: Cycling through relays...")
|
|
for i in range(1, 9):
|
|
print(f" Turning ON relay {i}")
|
|
relay.turn_on(i)
|
|
time.sleep(0.5)
|
|
print(f" Turning OFF relay {i}")
|
|
relay.turn_off(i)
|
|
time.sleep(0.5)
|
|
|
|
# All on/off test
|
|
print("\nTesting ALL_ON...")
|
|
relay.all_on()
|
|
time.sleep(1)
|
|
|
|
print("Testing ALL_OFF...")
|
|
relay.all_off()
|
|
time.sleep(1)
|
|
|
|
# Interactive mode
|
|
print("\n--- Interactive Mode ---")
|
|
print("Commands: ON1-8, OFF1-8, TOGGLE1-8, ALL_ON, ALL_OFF, STATUS, QUIT")
|
|
|
|
while True:
|
|
cmd = input("\nCommand: ").strip().upper()
|
|
|
|
if cmd == "QUIT":
|
|
break
|
|
elif cmd.startswith("ON") and len(cmd) == 3 and cmd[2].isdigit():
|
|
relay.turn_on(int(cmd[2]))
|
|
elif cmd.startswith("OFF") and len(cmd) == 4 and cmd[3].isdigit():
|
|
relay.turn_off(int(cmd[3]))
|
|
elif cmd.startswith("TOGGLE") and len(cmd) == 7 and cmd[6].isdigit():
|
|
relay.toggle(int(cmd[6]))
|
|
elif cmd == "ALL_ON":
|
|
relay.all_on()
|
|
elif cmd == "ALL_OFF":
|
|
relay.all_off()
|
|
elif cmd == "STATUS":
|
|
relay.get_status()
|
|
else:
|
|
print("Invalid command")
|
|
|
|
# Disconnect
|
|
print("\nDisconnecting...")
|
|
relay.disconnect()
|
|
print("✓ Disconnected")
|
|
|
|
else:
|
|
print("✗ Failed to connect to ESP32")
|