diff --git a/theatre_automation.py b/theatre_automation.py new file mode 100644 index 0000000..765e41d --- /dev/null +++ b/theatre_automation.py @@ -0,0 +1,444 @@ +""" +Theatre Automation Integration +Integrates ESP32 relay controller with Moonlight Drive-In theatre system +Controls lights, speakers, effects, etc. based on movie events +""" + +import logging +from typing import Optional, Dict, Any +from enum import Enum +import time + +from esp32_relay_controller import ESP32RelayController + +logger = logging.getLogger(__name__) + + +class RelayFunction(Enum): + """Theatre relay functions""" + HOUSE_LIGHTS = 1 + SCREEN_LIGHTS = 2 + MARQUEE_LIGHTS = 3 + SPEAKER_POWER = 4 + PROJECTOR_POWER = 5 + EFFECT_LIGHTS = 6 + CONCESSION_LIGHTS = 7 + PARKING_LIGHTS = 8 + + +class TheatreAutomation: + """ + Manages theatre automation using ESP32 relay controller + + Usage: + theatre = TheatreAutomation(config) + theatre.connect() + + # Movie starting + theatre.movie_starting() + + # Movie ended + theatre.movie_ended() + + # Manual control + theatre.set_relay(RelayFunction.HOUSE_LIGHTS, True) + """ + + def __init__(self, config: Optional[Dict[str, Any]] = None): + """ + Initialize theatre automation + + Args: + config: Configuration dictionary with relay settings + """ + self.config = config or {} + self.relay_controller = ESP32RelayController() + self.connected = False + + # Load relay mapping from config + self.relay_map = self._load_relay_mapping() + + # Automation settings + self.auto_lights = self.config.get('auto_theatre_lights', True) + self.auto_speakers = self.config.get('auto_speaker_power', True) + self.dim_delay = self.config.get('light_dim_delay', 5) # seconds + + # State tracking + self.movie_playing = False + self.trailer_playing = False + + def _load_relay_mapping(self) -> Dict[RelayFunction, int]: + """ + Load relay function mapping from config + + Returns: + Dictionary mapping functions to relay numbers + """ + default_mapping = { + RelayFunction.HOUSE_LIGHTS: 1, + RelayFunction.SCREEN_LIGHTS: 2, + RelayFunction.MARQUEE_LIGHTS: 3, + RelayFunction.SPEAKER_POWER: 4, + RelayFunction.PROJECTOR_POWER: 5, + RelayFunction.EFFECT_LIGHTS: 6, + RelayFunction.CONCESSION_LIGHTS: 7, + RelayFunction.PARKING_LIGHTS: 8, + } + + # Allow config to override default mapping + custom_mapping = self.config.get('relay_mapping', {}) + for func_name, relay_num in custom_mapping.items(): + try: + func = RelayFunction[func_name.upper()] + default_mapping[func] = relay_num + except (KeyError, ValueError): + logger.warning(f"Invalid relay function in config: {func_name}") + + return default_mapping + + def connect(self, port: Optional[str] = None) -> bool: + """ + Connect to ESP32 relay controller + + Args: + port: Serial port name (optional, will auto-detect if None) + + Returns: + True if connected successfully + """ + port = port or self.config.get('relay_port') + + try: + if self.relay_controller.connect(port): + self.connected = True + logger.info("Theatre automation connected to relay controller") + + # Initialize to safe state (all lights on, speakers off) + self.initialize_theatre() + + return True + else: + logger.error("Failed to connect to relay controller") + return False + except Exception as e: + logger.error(f"Error connecting to relay controller: {e}") + return False + + def disconnect(self): + """Disconnect from relay controller""" + if self.connected: + self.relay_controller.disconnect() + self.connected = False + logger.info("Theatre automation disconnected") + + def initialize_theatre(self): + """Set theatre to initial state (safe mode)""" + logger.info("Initializing theatre to safe state...") + + # Turn on all lights + self.set_relay(RelayFunction.HOUSE_LIGHTS, True) + self.set_relay(RelayFunction.SCREEN_LIGHTS, True) + self.set_relay(RelayFunction.MARQUEE_LIGHTS, True) + self.set_relay(RelayFunction.CONCESSION_LIGHTS, True) + self.set_relay(RelayFunction.PARKING_LIGHTS, True) + + # Turn off speakers initially + self.set_relay(RelayFunction.SPEAKER_POWER, False) + + # Turn off effect lights + self.set_relay(RelayFunction.EFFECT_LIGHTS, False) + + def set_relay(self, function: RelayFunction, state: bool) -> bool: + """ + Set a specific relay by function + + Args: + function: Relay function to control + state: True for ON, False for OFF + + Returns: + True if command sent successfully + """ + if not self.connected: + logger.warning("Not connected to relay controller") + return False + + relay_num = self.relay_map.get(function) + if relay_num is None: + logger.error(f"No relay mapped for function: {function}") + return False + + logger.info(f"Setting {function.name} (relay {relay_num}) to {'ON' if state else 'OFF'}") + + if state: + return self.relay_controller.turn_on(relay_num) + else: + return self.relay_controller.turn_off(relay_num) + + def toggle_relay(self, function: RelayFunction) -> bool: + """ + Toggle a specific relay by function + + Args: + function: Relay function to toggle + + Returns: + True if command sent successfully + """ + if not self.connected: + logger.warning("Not connected to relay controller") + return False + + relay_num = self.relay_map.get(function) + if relay_num is None: + logger.error(f"No relay mapped for function: {function}") + return False + + logger.info(f"Toggling {function.name} (relay {relay_num})") + return self.relay_controller.toggle(relay_num) + + def movie_starting(self, is_trailer: bool = False): + """ + Automation sequence when movie/trailer starts + + Args: + is_trailer: True if starting a trailer, False if main movie + """ + logger.info(f"{'Trailer' if is_trailer else 'Movie'} starting sequence...") + + if is_trailer: + self.trailer_playing = True + else: + self.movie_playing = True + + if not self.auto_lights: + logger.info("Auto lights disabled, skipping light automation") + return + + # Dim lights sequence + logger.info(f"Dimming lights in {self.dim_delay} seconds...") + time.sleep(self.dim_delay) + + # Turn off house lights + self.set_relay(RelayFunction.HOUSE_LIGHTS, False) + + # Dim screen lights during movies (off during trailers) + if not is_trailer: + self.set_relay(RelayFunction.SCREEN_LIGHTS, False) + + # Turn on speaker power if auto-enabled + if self.auto_speakers: + self.set_relay(RelayFunction.SPEAKER_POWER, True) + logger.info("Speaker power enabled") + + def movie_ended(self, is_trailer: bool = False): + """ + Automation sequence when movie/trailer ends + + Args: + is_trailer: True if ending a trailer, False if main movie + """ + logger.info(f"{'Trailer' if is_trailer else 'Movie'} ended sequence...") + + if is_trailer: + self.trailer_playing = False + else: + self.movie_playing = False + + if not self.auto_lights: + logger.info("Auto lights disabled, skipping light automation") + return + + # Bring lights back up + self.set_relay(RelayFunction.HOUSE_LIGHTS, True) + self.set_relay(RelayFunction.SCREEN_LIGHTS, True) + + # Turn off speakers if auto-enabled and nothing is playing + if self.auto_speakers and not self.movie_playing and not self.trailer_playing: + self.set_relay(RelayFunction.SPEAKER_POWER, False) + logger.info("Speaker power disabled") + + def intermission(self): + """Automation sequence for intermission""" + logger.info("Intermission sequence...") + + # Bring up house lights + self.set_relay(RelayFunction.HOUSE_LIGHTS, True) + + # Keep screen lights dim + self.set_relay(RelayFunction.SCREEN_LIGHTS, False) + + # Turn on concession lights + self.set_relay(RelayFunction.CONCESSION_LIGHTS, True) + + # Keep speakers on + self.set_relay(RelayFunction.SPEAKER_POWER, True) + + def emergency_lights_on(self): + """Emergency: Turn all lights on immediately""" + logger.warning("EMERGENCY: All lights ON") + + self.set_relay(RelayFunction.HOUSE_LIGHTS, True) + self.set_relay(RelayFunction.SCREEN_LIGHTS, True) + self.set_relay(RelayFunction.MARQUEE_LIGHTS, True) + self.set_relay(RelayFunction.CONCESSION_LIGHTS, True) + self.set_relay(RelayFunction.PARKING_LIGHTS, True) + self.set_relay(RelayFunction.EFFECT_LIGHTS, True) + + def closing_sequence(self): + """End of night closing sequence""" + logger.info("Closing sequence...") + + # Turn off speakers + self.set_relay(RelayFunction.SPEAKER_POWER, False) + + # Turn off effect lights + self.set_relay(RelayFunction.EFFECT_LIGHTS, False) + + # Turn off screen lights + self.set_relay(RelayFunction.SCREEN_LIGHTS, False) + + # Keep parking lights on for safety + self.set_relay(RelayFunction.PARKING_LIGHTS, True) + + # Keep marquee on + self.set_relay(RelayFunction.MARQUEE_LIGHTS, True) + + logger.info("Theatre closing - parking and marquee lights remain on") + + def effect_sequence(self, effect_name: str = "default"): + """ + Run a special effect lighting sequence + + Args: + effect_name: Name of effect to run + """ + logger.info(f"Running effect: {effect_name}") + + if effect_name == "flash": + # Flash effect lights + for _ in range(3): + self.set_relay(RelayFunction.EFFECT_LIGHTS, True) + time.sleep(0.2) + self.set_relay(RelayFunction.EFFECT_LIGHTS, False) + time.sleep(0.2) + + elif effect_name == "pulse": + # Pulse effect lights + for _ in range(5): + self.set_relay(RelayFunction.EFFECT_LIGHTS, True) + time.sleep(0.5) + self.set_relay(RelayFunction.EFFECT_LIGHTS, False) + time.sleep(0.5) + + elif effect_name == "marquee": + # Flash marquee lights + for _ in range(3): + self.toggle_relay(RelayFunction.MARQUEE_LIGHTS) + time.sleep(0.3) + self.toggle_relay(RelayFunction.MARQUEE_LIGHTS) + time.sleep(0.3) + + else: + # Default: Turn on effect lights briefly + self.set_relay(RelayFunction.EFFECT_LIGHTS, True) + time.sleep(2) + self.set_relay(RelayFunction.EFFECT_LIGHTS, False) + + def get_status(self) -> Dict[str, bool]: + """ + Get current status of all theatre relays + + Returns: + Dictionary mapping function names to states + """ + if not self.connected: + return {} + + # Request status update + self.relay_controller.get_status() + time.sleep(0.5) # Wait for response + + status = {} + for function, relay_num in self.relay_map.items(): + state = self.relay_controller.get_relay_state(relay_num) + status[function.name] = state if state is not None else False + + return status + + def __enter__(self): + """Context manager entry""" + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + """Context manager exit""" + self.disconnect() + + +# Example usage and testing +if __name__ == "__main__": + # Configure logging + logging.basicConfig( + level=logging.INFO, + format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' + ) + + # Example configuration + config = { + 'auto_theatre_lights': True, + 'auto_speaker_power': True, + 'light_dim_delay': 3, + 'relay_port': None, # Auto-detect + 'relay_mapping': { + # Customize relay assignments if needed + # 'HOUSE_LIGHTS': 1, + # 'SPEAKER_POWER': 4, + } + } + + # Create theatre automation + theatre = TheatreAutomation(config) + + # Connect + print("Connecting to theatre automation system...") + if theatre.connect(): + print("✓ Connected!\n") + + # Get initial status + print("Theatre Status:") + status = theatre.get_status() + for func, state in status.items(): + print(f" {func}: {'ON' if state else 'OFF'}") + + # Demo sequence + print("\n--- Demo Sequence ---") + + print("\nStarting trailer...") + theatre.movie_starting(is_trailer=True) + time.sleep(5) + + print("\nTrailer ended...") + theatre.movie_ended(is_trailer=True) + time.sleep(2) + + print("\nStarting main movie...") + theatre.movie_starting(is_trailer=False) + time.sleep(5) + + print("\nRunning flash effect...") + theatre.effect_sequence("flash") + time.sleep(2) + + print("\nMovie ended...") + theatre.movie_ended(is_trailer=False) + time.sleep(2) + + print("\nClosing theatre...") + theatre.closing_sequence() + + # Disconnect + print("\nDisconnecting...") + theatre.disconnect() + print("✓ Done!") + else: + print("✗ Failed to connect to theatre automation")