Add theatre automation integration for ESP32 relay control
This commit is contained in:
444
theatre_automation.py
Normal file
444
theatre_automation.py
Normal file
@@ -0,0 +1,444 @@
|
||||
"""
|
||||
Theatre Automation Integration
|
||||
Integrates ESP32 relay controller with Moonlight Drive-In theatre system
|
||||
Controls lights, speakers, effects, etc. based on movie events
|
||||
"""
|
||||
|
||||
import logging
|
||||
from typing import Optional, Dict, Any
|
||||
from enum import Enum
|
||||
import time
|
||||
|
||||
from esp32_relay_controller import ESP32RelayController
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class RelayFunction(Enum):
|
||||
"""Theatre relay functions"""
|
||||
HOUSE_LIGHTS = 1
|
||||
SCREEN_LIGHTS = 2
|
||||
MARQUEE_LIGHTS = 3
|
||||
SPEAKER_POWER = 4
|
||||
PROJECTOR_POWER = 5
|
||||
EFFECT_LIGHTS = 6
|
||||
CONCESSION_LIGHTS = 7
|
||||
PARKING_LIGHTS = 8
|
||||
|
||||
|
||||
class TheatreAutomation:
|
||||
"""
|
||||
Manages theatre automation using ESP32 relay controller
|
||||
|
||||
Usage:
|
||||
theatre = TheatreAutomation(config)
|
||||
theatre.connect()
|
||||
|
||||
# Movie starting
|
||||
theatre.movie_starting()
|
||||
|
||||
# Movie ended
|
||||
theatre.movie_ended()
|
||||
|
||||
# Manual control
|
||||
theatre.set_relay(RelayFunction.HOUSE_LIGHTS, True)
|
||||
"""
|
||||
|
||||
def __init__(self, config: Optional[Dict[str, Any]] = None):
|
||||
"""
|
||||
Initialize theatre automation
|
||||
|
||||
Args:
|
||||
config: Configuration dictionary with relay settings
|
||||
"""
|
||||
self.config = config or {}
|
||||
self.relay_controller = ESP32RelayController()
|
||||
self.connected = False
|
||||
|
||||
# Load relay mapping from config
|
||||
self.relay_map = self._load_relay_mapping()
|
||||
|
||||
# Automation settings
|
||||
self.auto_lights = self.config.get('auto_theatre_lights', True)
|
||||
self.auto_speakers = self.config.get('auto_speaker_power', True)
|
||||
self.dim_delay = self.config.get('light_dim_delay', 5) # seconds
|
||||
|
||||
# State tracking
|
||||
self.movie_playing = False
|
||||
self.trailer_playing = False
|
||||
|
||||
def _load_relay_mapping(self) -> Dict[RelayFunction, int]:
|
||||
"""
|
||||
Load relay function mapping from config
|
||||
|
||||
Returns:
|
||||
Dictionary mapping functions to relay numbers
|
||||
"""
|
||||
default_mapping = {
|
||||
RelayFunction.HOUSE_LIGHTS: 1,
|
||||
RelayFunction.SCREEN_LIGHTS: 2,
|
||||
RelayFunction.MARQUEE_LIGHTS: 3,
|
||||
RelayFunction.SPEAKER_POWER: 4,
|
||||
RelayFunction.PROJECTOR_POWER: 5,
|
||||
RelayFunction.EFFECT_LIGHTS: 6,
|
||||
RelayFunction.CONCESSION_LIGHTS: 7,
|
||||
RelayFunction.PARKING_LIGHTS: 8,
|
||||
}
|
||||
|
||||
# Allow config to override default mapping
|
||||
custom_mapping = self.config.get('relay_mapping', {})
|
||||
for func_name, relay_num in custom_mapping.items():
|
||||
try:
|
||||
func = RelayFunction[func_name.upper()]
|
||||
default_mapping[func] = relay_num
|
||||
except (KeyError, ValueError):
|
||||
logger.warning(f"Invalid relay function in config: {func_name}")
|
||||
|
||||
return default_mapping
|
||||
|
||||
def connect(self, port: Optional[str] = None) -> bool:
|
||||
"""
|
||||
Connect to ESP32 relay controller
|
||||
|
||||
Args:
|
||||
port: Serial port name (optional, will auto-detect if None)
|
||||
|
||||
Returns:
|
||||
True if connected successfully
|
||||
"""
|
||||
port = port or self.config.get('relay_port')
|
||||
|
||||
try:
|
||||
if self.relay_controller.connect(port):
|
||||
self.connected = True
|
||||
logger.info("Theatre automation connected to relay controller")
|
||||
|
||||
# Initialize to safe state (all lights on, speakers off)
|
||||
self.initialize_theatre()
|
||||
|
||||
return True
|
||||
else:
|
||||
logger.error("Failed to connect to relay controller")
|
||||
return False
|
||||
except Exception as e:
|
||||
logger.error(f"Error connecting to relay controller: {e}")
|
||||
return False
|
||||
|
||||
def disconnect(self):
|
||||
"""Disconnect from relay controller"""
|
||||
if self.connected:
|
||||
self.relay_controller.disconnect()
|
||||
self.connected = False
|
||||
logger.info("Theatre automation disconnected")
|
||||
|
||||
def initialize_theatre(self):
|
||||
"""Set theatre to initial state (safe mode)"""
|
||||
logger.info("Initializing theatre to safe state...")
|
||||
|
||||
# Turn on all lights
|
||||
self.set_relay(RelayFunction.HOUSE_LIGHTS, True)
|
||||
self.set_relay(RelayFunction.SCREEN_LIGHTS, True)
|
||||
self.set_relay(RelayFunction.MARQUEE_LIGHTS, True)
|
||||
self.set_relay(RelayFunction.CONCESSION_LIGHTS, True)
|
||||
self.set_relay(RelayFunction.PARKING_LIGHTS, True)
|
||||
|
||||
# Turn off speakers initially
|
||||
self.set_relay(RelayFunction.SPEAKER_POWER, False)
|
||||
|
||||
# Turn off effect lights
|
||||
self.set_relay(RelayFunction.EFFECT_LIGHTS, False)
|
||||
|
||||
def set_relay(self, function: RelayFunction, state: bool) -> bool:
|
||||
"""
|
||||
Set a specific relay by function
|
||||
|
||||
Args:
|
||||
function: Relay function to control
|
||||
state: True for ON, False for OFF
|
||||
|
||||
Returns:
|
||||
True if command sent successfully
|
||||
"""
|
||||
if not self.connected:
|
||||
logger.warning("Not connected to relay controller")
|
||||
return False
|
||||
|
||||
relay_num = self.relay_map.get(function)
|
||||
if relay_num is None:
|
||||
logger.error(f"No relay mapped for function: {function}")
|
||||
return False
|
||||
|
||||
logger.info(f"Setting {function.name} (relay {relay_num}) to {'ON' if state else 'OFF'}")
|
||||
|
||||
if state:
|
||||
return self.relay_controller.turn_on(relay_num)
|
||||
else:
|
||||
return self.relay_controller.turn_off(relay_num)
|
||||
|
||||
def toggle_relay(self, function: RelayFunction) -> bool:
|
||||
"""
|
||||
Toggle a specific relay by function
|
||||
|
||||
Args:
|
||||
function: Relay function to toggle
|
||||
|
||||
Returns:
|
||||
True if command sent successfully
|
||||
"""
|
||||
if not self.connected:
|
||||
logger.warning("Not connected to relay controller")
|
||||
return False
|
||||
|
||||
relay_num = self.relay_map.get(function)
|
||||
if relay_num is None:
|
||||
logger.error(f"No relay mapped for function: {function}")
|
||||
return False
|
||||
|
||||
logger.info(f"Toggling {function.name} (relay {relay_num})")
|
||||
return self.relay_controller.toggle(relay_num)
|
||||
|
||||
def movie_starting(self, is_trailer: bool = False):
|
||||
"""
|
||||
Automation sequence when movie/trailer starts
|
||||
|
||||
Args:
|
||||
is_trailer: True if starting a trailer, False if main movie
|
||||
"""
|
||||
logger.info(f"{'Trailer' if is_trailer else 'Movie'} starting sequence...")
|
||||
|
||||
if is_trailer:
|
||||
self.trailer_playing = True
|
||||
else:
|
||||
self.movie_playing = True
|
||||
|
||||
if not self.auto_lights:
|
||||
logger.info("Auto lights disabled, skipping light automation")
|
||||
return
|
||||
|
||||
# Dim lights sequence
|
||||
logger.info(f"Dimming lights in {self.dim_delay} seconds...")
|
||||
time.sleep(self.dim_delay)
|
||||
|
||||
# Turn off house lights
|
||||
self.set_relay(RelayFunction.HOUSE_LIGHTS, False)
|
||||
|
||||
# Dim screen lights during movies (off during trailers)
|
||||
if not is_trailer:
|
||||
self.set_relay(RelayFunction.SCREEN_LIGHTS, False)
|
||||
|
||||
# Turn on speaker power if auto-enabled
|
||||
if self.auto_speakers:
|
||||
self.set_relay(RelayFunction.SPEAKER_POWER, True)
|
||||
logger.info("Speaker power enabled")
|
||||
|
||||
def movie_ended(self, is_trailer: bool = False):
|
||||
"""
|
||||
Automation sequence when movie/trailer ends
|
||||
|
||||
Args:
|
||||
is_trailer: True if ending a trailer, False if main movie
|
||||
"""
|
||||
logger.info(f"{'Trailer' if is_trailer else 'Movie'} ended sequence...")
|
||||
|
||||
if is_trailer:
|
||||
self.trailer_playing = False
|
||||
else:
|
||||
self.movie_playing = False
|
||||
|
||||
if not self.auto_lights:
|
||||
logger.info("Auto lights disabled, skipping light automation")
|
||||
return
|
||||
|
||||
# Bring lights back up
|
||||
self.set_relay(RelayFunction.HOUSE_LIGHTS, True)
|
||||
self.set_relay(RelayFunction.SCREEN_LIGHTS, True)
|
||||
|
||||
# Turn off speakers if auto-enabled and nothing is playing
|
||||
if self.auto_speakers and not self.movie_playing and not self.trailer_playing:
|
||||
self.set_relay(RelayFunction.SPEAKER_POWER, False)
|
||||
logger.info("Speaker power disabled")
|
||||
|
||||
def intermission(self):
|
||||
"""Automation sequence for intermission"""
|
||||
logger.info("Intermission sequence...")
|
||||
|
||||
# Bring up house lights
|
||||
self.set_relay(RelayFunction.HOUSE_LIGHTS, True)
|
||||
|
||||
# Keep screen lights dim
|
||||
self.set_relay(RelayFunction.SCREEN_LIGHTS, False)
|
||||
|
||||
# Turn on concession lights
|
||||
self.set_relay(RelayFunction.CONCESSION_LIGHTS, True)
|
||||
|
||||
# Keep speakers on
|
||||
self.set_relay(RelayFunction.SPEAKER_POWER, True)
|
||||
|
||||
def emergency_lights_on(self):
|
||||
"""Emergency: Turn all lights on immediately"""
|
||||
logger.warning("EMERGENCY: All lights ON")
|
||||
|
||||
self.set_relay(RelayFunction.HOUSE_LIGHTS, True)
|
||||
self.set_relay(RelayFunction.SCREEN_LIGHTS, True)
|
||||
self.set_relay(RelayFunction.MARQUEE_LIGHTS, True)
|
||||
self.set_relay(RelayFunction.CONCESSION_LIGHTS, True)
|
||||
self.set_relay(RelayFunction.PARKING_LIGHTS, True)
|
||||
self.set_relay(RelayFunction.EFFECT_LIGHTS, True)
|
||||
|
||||
def closing_sequence(self):
|
||||
"""End of night closing sequence"""
|
||||
logger.info("Closing sequence...")
|
||||
|
||||
# Turn off speakers
|
||||
self.set_relay(RelayFunction.SPEAKER_POWER, False)
|
||||
|
||||
# Turn off effect lights
|
||||
self.set_relay(RelayFunction.EFFECT_LIGHTS, False)
|
||||
|
||||
# Turn off screen lights
|
||||
self.set_relay(RelayFunction.SCREEN_LIGHTS, False)
|
||||
|
||||
# Keep parking lights on for safety
|
||||
self.set_relay(RelayFunction.PARKING_LIGHTS, True)
|
||||
|
||||
# Keep marquee on
|
||||
self.set_relay(RelayFunction.MARQUEE_LIGHTS, True)
|
||||
|
||||
logger.info("Theatre closing - parking and marquee lights remain on")
|
||||
|
||||
def effect_sequence(self, effect_name: str = "default"):
|
||||
"""
|
||||
Run a special effect lighting sequence
|
||||
|
||||
Args:
|
||||
effect_name: Name of effect to run
|
||||
"""
|
||||
logger.info(f"Running effect: {effect_name}")
|
||||
|
||||
if effect_name == "flash":
|
||||
# Flash effect lights
|
||||
for _ in range(3):
|
||||
self.set_relay(RelayFunction.EFFECT_LIGHTS, True)
|
||||
time.sleep(0.2)
|
||||
self.set_relay(RelayFunction.EFFECT_LIGHTS, False)
|
||||
time.sleep(0.2)
|
||||
|
||||
elif effect_name == "pulse":
|
||||
# Pulse effect lights
|
||||
for _ in range(5):
|
||||
self.set_relay(RelayFunction.EFFECT_LIGHTS, True)
|
||||
time.sleep(0.5)
|
||||
self.set_relay(RelayFunction.EFFECT_LIGHTS, False)
|
||||
time.sleep(0.5)
|
||||
|
||||
elif effect_name == "marquee":
|
||||
# Flash marquee lights
|
||||
for _ in range(3):
|
||||
self.toggle_relay(RelayFunction.MARQUEE_LIGHTS)
|
||||
time.sleep(0.3)
|
||||
self.toggle_relay(RelayFunction.MARQUEE_LIGHTS)
|
||||
time.sleep(0.3)
|
||||
|
||||
else:
|
||||
# Default: Turn on effect lights briefly
|
||||
self.set_relay(RelayFunction.EFFECT_LIGHTS, True)
|
||||
time.sleep(2)
|
||||
self.set_relay(RelayFunction.EFFECT_LIGHTS, False)
|
||||
|
||||
def get_status(self) -> Dict[str, bool]:
|
||||
"""
|
||||
Get current status of all theatre relays
|
||||
|
||||
Returns:
|
||||
Dictionary mapping function names to states
|
||||
"""
|
||||
if not self.connected:
|
||||
return {}
|
||||
|
||||
# Request status update
|
||||
self.relay_controller.get_status()
|
||||
time.sleep(0.5) # Wait for response
|
||||
|
||||
status = {}
|
||||
for function, relay_num in self.relay_map.items():
|
||||
state = self.relay_controller.get_relay_state(relay_num)
|
||||
status[function.name] = state if state is not None else False
|
||||
|
||||
return status
|
||||
|
||||
def __enter__(self):
|
||||
"""Context manager entry"""
|
||||
return self
|
||||
|
||||
def __exit__(self, exc_type, exc_val, exc_tb):
|
||||
"""Context manager exit"""
|
||||
self.disconnect()
|
||||
|
||||
|
||||
# Example usage and testing
|
||||
if __name__ == "__main__":
|
||||
# Configure logging
|
||||
logging.basicConfig(
|
||||
level=logging.INFO,
|
||||
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
|
||||
)
|
||||
|
||||
# Example configuration
|
||||
config = {
|
||||
'auto_theatre_lights': True,
|
||||
'auto_speaker_power': True,
|
||||
'light_dim_delay': 3,
|
||||
'relay_port': None, # Auto-detect
|
||||
'relay_mapping': {
|
||||
# Customize relay assignments if needed
|
||||
# 'HOUSE_LIGHTS': 1,
|
||||
# 'SPEAKER_POWER': 4,
|
||||
}
|
||||
}
|
||||
|
||||
# Create theatre automation
|
||||
theatre = TheatreAutomation(config)
|
||||
|
||||
# Connect
|
||||
print("Connecting to theatre automation system...")
|
||||
if theatre.connect():
|
||||
print("✓ Connected!\n")
|
||||
|
||||
# Get initial status
|
||||
print("Theatre Status:")
|
||||
status = theatre.get_status()
|
||||
for func, state in status.items():
|
||||
print(f" {func}: {'ON' if state else 'OFF'}")
|
||||
|
||||
# Demo sequence
|
||||
print("\n--- Demo Sequence ---")
|
||||
|
||||
print("\nStarting trailer...")
|
||||
theatre.movie_starting(is_trailer=True)
|
||||
time.sleep(5)
|
||||
|
||||
print("\nTrailer ended...")
|
||||
theatre.movie_ended(is_trailer=True)
|
||||
time.sleep(2)
|
||||
|
||||
print("\nStarting main movie...")
|
||||
theatre.movie_starting(is_trailer=False)
|
||||
time.sleep(5)
|
||||
|
||||
print("\nRunning flash effect...")
|
||||
theatre.effect_sequence("flash")
|
||||
time.sleep(2)
|
||||
|
||||
print("\nMovie ended...")
|
||||
theatre.movie_ended(is_trailer=False)
|
||||
time.sleep(2)
|
||||
|
||||
print("\nClosing theatre...")
|
||||
theatre.closing_sequence()
|
||||
|
||||
# Disconnect
|
||||
print("\nDisconnecting...")
|
||||
theatre.disconnect()
|
||||
print("✓ Done!")
|
||||
else:
|
||||
print("✗ Failed to connect to theatre automation")
|
||||
Reference in New Issue
Block a user