445 lines
14 KiB
Python
445 lines
14 KiB
Python
"""
|
|
Theatre Automation Integration
|
|
Integrates ESP32 relay controller with Moonlight Drive-In theatre system
|
|
Controls lights, speakers, effects, etc. based on movie events
|
|
"""
|
|
|
|
import logging
|
|
from typing import Optional, Dict, Any
|
|
from enum import Enum
|
|
import time
|
|
|
|
from esp32_relay_controller import ESP32RelayController
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class RelayFunction(Enum):
    """
    Theatre relay functions.

    Each member names one switchable theatre function. The member values
    (1-8) coincide with the default relay numbers used by
    ``TheatreAutomation``; a ``relay_mapping`` config entry can point a
    function at a different relay without changing these values.
    """

    # Lighting circuits
    HOUSE_LIGHTS = 1
    SCREEN_LIGHTS = 2
    MARQUEE_LIGHTS = 3

    # Equipment power
    SPEAKER_POWER = 4
    PROJECTOR_POWER = 5

    # Special-effect and auxiliary lighting
    EFFECT_LIGHTS = 6
    CONCESSION_LIGHTS = 7
    PARKING_LIGHTS = 8
|
|
|
|
|
|
class TheatreAutomation:
    """
    Manages theatre automation using the ESP32 relay controller.

    Every theatre function (a ``RelayFunction`` member) is mapped to a relay
    number, and the sequences below switch those relays through
    ``self.relay_controller``.

    Usage:
        theatre = TheatreAutomation(config)
        theatre.connect()

        # Movie starting
        theatre.movie_starting()

        # Movie ended
        theatre.movie_ended()

        # Manual control
        theatre.set_relay(RelayFunction.HOUSE_LIGHTS, True)

    The instance can be used as a context manager: leaving the ``with``
    block calls ``disconnect()``. Entering the block does NOT connect;
    call ``connect()`` explicitly.
    """

    def __init__(self, config: Optional[Dict[str, Any]] = None):
        """
        Initialize theatre automation.

        Args:
            config: Configuration dictionary. Keys read by this class:
                ``relay_mapping`` (function name -> relay number overrides),
                ``relay_port`` (default port for ``connect``),
                ``auto_theatre_lights`` (default True),
                ``auto_speaker_power`` (default True),
                ``light_dim_delay`` (seconds, default 5).
        """
        self.config = config or {}
        self.relay_controller = ESP32RelayController()
        self.connected = False

        # Load relay mapping from config
        self.relay_map = self._load_relay_mapping()

        # Automation settings
        self.auto_lights = self.config.get('auto_theatre_lights', True)
        self.auto_speakers = self.config.get('auto_speaker_power', True)
        self.dim_delay = self.config.get('light_dim_delay', 5)  # seconds

        # State tracking
        self.movie_playing = False
        self.trailer_playing = False

    def _load_relay_mapping(self) -> "Dict[RelayFunction, int]":
        """
        Load relay function mapping from config.

        Starts from the built-in defaults and applies the overrides found
        under the ``relay_mapping`` config key. Override keys are matched
        case-insensitively against ``RelayFunction`` member names; unknown
        names are logged and skipped. A missing, ``None`` or non-dict
        ``relay_mapping`` leaves the defaults untouched.

        Returns:
            Dictionary mapping functions to relay numbers
        """
        mapping = {
            RelayFunction.HOUSE_LIGHTS: 1,
            RelayFunction.SCREEN_LIGHTS: 2,
            RelayFunction.MARQUEE_LIGHTS: 3,
            RelayFunction.SPEAKER_POWER: 4,
            RelayFunction.PROJECTOR_POWER: 5,
            RelayFunction.EFFECT_LIGHTS: 6,
            RelayFunction.CONCESSION_LIGHTS: 7,
            RelayFunction.PARKING_LIGHTS: 8,
        }

        # Allow config to override default mapping. ``or {}`` also covers an
        # explicit ``'relay_mapping': None`` entry.
        custom_mapping = self.config.get('relay_mapping') or {}
        if not isinstance(custom_mapping, dict):
            logger.warning(
                f"Ignoring relay_mapping: expected a dict, got {type(custom_mapping).__name__}"
            )
            return mapping

        for func_name, relay_num in custom_mapping.items():
            try:
                # str() keeps a non-string key (e.g. an int) from raising
                # AttributeError on .upper(); it simply fails the lookup.
                func = RelayFunction[str(func_name).upper()]
            except KeyError:
                logger.warning(f"Invalid relay function in config: {func_name}")
                continue
            mapping[func] = relay_num

        return mapping

    def connect(self, port: Optional[str] = None) -> bool:
        """
        Connect to the ESP32 relay controller and apply the initial state.

        Args:
            port: Serial port name. Falls back to the ``relay_port`` config
                value; if that is unset too, ``None`` is handed to the
                controller (which is expected to auto-detect the port).

        Returns:
            True if connected and initialized successfully, False otherwise.
            Whenever False is returned, ``self.connected`` is False.
        """
        port = port or self.config.get('relay_port')

        try:
            if not self.relay_controller.connect(port):
                logger.error("Failed to connect to relay controller")
                return False

            self.connected = True
            logger.info("Theatre automation connected to relay controller")

            # Initialize to safe state (all lights on, speakers off)
            self.initialize_theatre()
            return True
        except Exception as e:
            # Boundary handler: callers get a bool, never an exception.
            logger.error(f"Error connecting to relay controller: {e}")
            if self.connected:
                # The link was opened but initialization failed: do not
                # report failure while still holding the controller.
                self.connected = False
                try:
                    self.relay_controller.disconnect()
                except Exception as cleanup_error:
                    logger.warning(
                        f"Error releasing relay controller after failed connect: {cleanup_error}"
                    )
            return False

    def disconnect(self):
        """
        Disconnect from the relay controller.

        Does nothing when not connected. ``self.connected`` is cleared even
        if the controller raises while disconnecting; the exception still
        propagates to the caller.
        """
        if not self.connected:
            return

        try:
            self.relay_controller.disconnect()
        finally:
            self.connected = False
        logger.info("Theatre automation disconnected")

    def initialize_theatre(self):
        """
        Set theatre to initial state (safe mode).

        All light circuits on, speaker power and effect lights off.
        PROJECTOR_POWER is not touched here.
        """
        logger.info("Initializing theatre to safe state...")

        # Turn on all lights
        for lights in (
            RelayFunction.HOUSE_LIGHTS,
            RelayFunction.SCREEN_LIGHTS,
            RelayFunction.MARQUEE_LIGHTS,
            RelayFunction.CONCESSION_LIGHTS,
            RelayFunction.PARKING_LIGHTS,
        ):
            self.set_relay(lights, True)

        # Turn off speakers initially
        self.set_relay(RelayFunction.SPEAKER_POWER, False)

        # Turn off effect lights
        self.set_relay(RelayFunction.EFFECT_LIGHTS, False)

    def _resolve_relay(self, function: "RelayFunction") -> Optional[int]:
        """Return the relay number for ``function``, or None (after logging) if it cannot be driven."""
        if not self.connected:
            logger.warning("Not connected to relay controller")
            return None

        relay_num = self.relay_map.get(function)
        if relay_num is None:
            logger.error(f"No relay mapped for function: {function}")
        return relay_num

    def set_relay(self, function: "RelayFunction", state: bool) -> bool:
        """
        Set a specific relay by function.

        Args:
            function: Relay function to control
            state: True for ON, False for OFF

        Returns:
            False when not connected or when ``function`` has no relay
            mapped; otherwise whatever the controller's ``turn_on`` /
            ``turn_off`` call returns.
        """
        relay_num = self._resolve_relay(function)
        if relay_num is None:
            return False

        logger.info(f"Setting {function.name} (relay {relay_num}) to {'ON' if state else 'OFF'}")

        if state:
            return self.relay_controller.turn_on(relay_num)
        return self.relay_controller.turn_off(relay_num)

    def toggle_relay(self, function: "RelayFunction") -> bool:
        """
        Toggle a specific relay by function.

        Args:
            function: Relay function to toggle

        Returns:
            False when not connected or when ``function`` has no relay
            mapped; otherwise whatever the controller's ``toggle`` call
            returns.
        """
        relay_num = self._resolve_relay(function)
        if relay_num is None:
            return False

        logger.info(f"Toggling {function.name} (relay {relay_num})")
        return self.relay_controller.toggle(relay_num)

    def movie_starting(self, is_trailer: bool = False):
        """
        Automation sequence when movie/trailer starts.

        Light and speaker automation are controlled independently by
        ``auto_lights`` and ``auto_speakers``: disabling one does not
        disable the other.

        Args:
            is_trailer: True if starting a trailer, False if main movie
        """
        logger.info(f"{'Trailer' if is_trailer else 'Movie'} starting sequence...")

        if is_trailer:
            self.trailer_playing = True
        else:
            self.movie_playing = True

        if self.auto_lights:
            # Dim lights sequence
            logger.info(f"Dimming lights in {self.dim_delay} seconds...")
            # time.sleep() rejects negative values; treat them as "no delay".
            time.sleep(max(0, self.dim_delay))

            # Turn off house lights
            self.set_relay(RelayFunction.HOUSE_LIGHTS, False)

            # Screen lights go off for the main movie only; they stay on
            # during trailers.
            if not is_trailer:
                self.set_relay(RelayFunction.SCREEN_LIGHTS, False)
        else:
            logger.info("Auto lights disabled, skipping light automation")

        # Turn on speaker power if auto-enabled
        if self.auto_speakers:
            self.set_relay(RelayFunction.SPEAKER_POWER, True)
            logger.info("Speaker power enabled")

    def movie_ended(self, is_trailer: bool = False):
        """
        Automation sequence when movie/trailer ends.

        Light and speaker automation are controlled independently by
        ``auto_lights`` and ``auto_speakers``: disabling one does not
        disable the other.

        Args:
            is_trailer: True if ending a trailer, False if main movie
        """
        logger.info(f"{'Trailer' if is_trailer else 'Movie'} ended sequence...")

        if is_trailer:
            self.trailer_playing = False
        else:
            self.movie_playing = False

        if self.auto_lights:
            # Bring lights back up
            self.set_relay(RelayFunction.HOUSE_LIGHTS, True)
            self.set_relay(RelayFunction.SCREEN_LIGHTS, True)
        else:
            logger.info("Auto lights disabled, skipping light automation")

        # Turn off speakers if auto-enabled and nothing is playing
        nothing_playing = not (self.movie_playing or self.trailer_playing)
        if self.auto_speakers and nothing_playing:
            self.set_relay(RelayFunction.SPEAKER_POWER, False)
            logger.info("Speaker power disabled")

    def intermission(self):
        """Automation sequence for intermission."""
        logger.info("Intermission sequence...")

        # Bring up house lights
        self.set_relay(RelayFunction.HOUSE_LIGHTS, True)

        # Keep screen lights dim
        self.set_relay(RelayFunction.SCREEN_LIGHTS, False)

        # Turn on concession lights
        self.set_relay(RelayFunction.CONCESSION_LIGHTS, True)

        # Keep speakers on
        self.set_relay(RelayFunction.SPEAKER_POWER, True)

    def emergency_lights_on(self):
        """Emergency: turn all lights (including effect lights) on immediately."""
        logger.warning("EMERGENCY: All lights ON")

        for lights in (
            RelayFunction.HOUSE_LIGHTS,
            RelayFunction.SCREEN_LIGHTS,
            RelayFunction.MARQUEE_LIGHTS,
            RelayFunction.CONCESSION_LIGHTS,
            RelayFunction.PARKING_LIGHTS,
            RelayFunction.EFFECT_LIGHTS,
        ):
            self.set_relay(lights, True)

    def closing_sequence(self):
        """
        End of night closing sequence.

        Switches off speakers, effect lights and screen lights, and makes
        sure parking and marquee lights are on.
        """
        # NOTE(review): house and concession lights are not switched here and
        # keep whatever state they had - confirm that is intended.
        logger.info("Closing sequence...")

        # Turn off speakers
        self.set_relay(RelayFunction.SPEAKER_POWER, False)

        # Turn off effect lights
        self.set_relay(RelayFunction.EFFECT_LIGHTS, False)

        # Turn off screen lights
        self.set_relay(RelayFunction.SCREEN_LIGHTS, False)

        # Keep parking lights on for safety
        self.set_relay(RelayFunction.PARKING_LIGHTS, True)

        # Keep marquee on
        self.set_relay(RelayFunction.MARQUEE_LIGHTS, True)

        logger.info("Theatre closing - parking and marquee lights remain on")

    def _blink(self, function: "RelayFunction", cycles: int, interval: float):
        """Switch ``function`` on then off ``cycles`` times, holding each state for ``interval`` seconds."""
        for _ in range(cycles):
            self.set_relay(function, True)
            time.sleep(interval)
            self.set_relay(function, False)
            time.sleep(interval)

    def effect_sequence(self, effect_name: str = "default"):
        """
        Run a special effect lighting sequence.

        Blocks the caller for the duration of the effect. When not
        connected, the effect is skipped immediately instead of sleeping
        through a sequence that cannot switch anything.

        Args:
            effect_name: Name of effect to run: ``"flash"`` (3 quick
                blinks), ``"pulse"`` (5 slow blinks), ``"marquee"``
                (marquee lights toggled back and forth 3 times). Any other
                name runs the default effect: effect lights on for 2
                seconds.
        """
        logger.info(f"Running effect: {effect_name}")

        if not self.connected:
            logger.warning("Not connected to relay controller")
            return

        # effect name -> (number of on/off cycles, seconds per state)
        blink_effects = {"flash": (3, 0.2), "pulse": (5, 0.5)}

        if effect_name in blink_effects:
            cycles, interval = blink_effects[effect_name]
            self._blink(RelayFunction.EFFECT_LIGHTS, cycles, interval)
        elif effect_name == "marquee":
            # Flash marquee lights: an even number of toggles, so the
            # marquee ends in the state it started in.
            for _ in range(6):
                self.toggle_relay(RelayFunction.MARQUEE_LIGHTS)
                time.sleep(0.3)
        else:
            # Default: Turn on effect lights briefly
            self.set_relay(RelayFunction.EFFECT_LIGHTS, True)
            time.sleep(2)
            self.set_relay(RelayFunction.EFFECT_LIGHTS, False)

    def get_status(self, settle_time: float = 0.5) -> Dict[str, bool]:
        """
        Get current status of all theatre relays.

        Args:
            settle_time: Seconds to wait after requesting a status update
                from the controller before reading relay states.

        Returns:
            Dictionary mapping function names to states. A relay whose
            state the controller reports as ``None`` is returned as False.
            Empty when not connected.
        """
        if not self.connected:
            return {}

        # Request status update
        self.relay_controller.get_status()
        time.sleep(max(0, settle_time))  # Wait for response

        status = {}
        for function, relay_num in self.relay_map.items():
            state = self.relay_controller.get_relay_state(relay_num)
            status[function.name] = state if state is not None else False

        return status

    def __enter__(self) -> "TheatreAutomation":
        """Context manager entry; returns this instance without connecting."""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Context manager exit; disconnects and lets any exception propagate."""
        self.disconnect()
|
|
|
|
|
|
# Example usage and testing
|
|
if __name__ == "__main__":
    # Demo: connects to the relay controller and walks through a typical
    # evening (trailer, main movie, effect, closing). Requires real hardware.

    # Configure logging
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
    )

    # Example configuration
    config = {
        'auto_theatre_lights': True,
        'auto_speaker_power': True,
        'light_dim_delay': 3,
        'relay_port': None,  # Auto-detect
        'relay_mapping': {
            # Customize relay assignments if needed
            # 'HOUSE_LIGHTS': 1,
            # 'SPEAKER_POWER': 4,
        }
    }

    # Create theatre automation
    theatre = TheatreAutomation(config)

    # Connect
    print("Connecting to theatre automation system...")
    if theatre.connect():
        print("✓ Connected!\n")

        # try/finally so the controller is released even if a demo step fails
        try:
            # Get initial status
            print("Theatre Status:")
            status = theatre.get_status()
            for func, state in status.items():
                print(f" {func}: {'ON' if state else 'OFF'}")

            # Demo sequence
            print("\n--- Demo Sequence ---")

            print("\nStarting trailer...")
            theatre.movie_starting(is_trailer=True)
            time.sleep(5)

            print("\nTrailer ended...")
            theatre.movie_ended(is_trailer=True)
            time.sleep(2)

            print("\nStarting main movie...")
            theatre.movie_starting(is_trailer=False)
            time.sleep(5)

            print("\nRunning flash effect...")
            theatre.effect_sequence("flash")
            time.sleep(2)

            print("\nMovie ended...")
            theatre.movie_ended(is_trailer=False)
            time.sleep(2)

            print("\nClosing theatre...")
            theatre.closing_sequence()
        finally:
            # Disconnect
            print("\nDisconnecting...")
            theatre.disconnect()

        print("✓ Done!")
    else:
        print("✗ Failed to connect to theatre automation")