#!/usr/bin/env python3
"""
LEGO Dimensions Portal Reader Module
For Moonlight Drive-In Video Player System

Communicates with LEGO Dimensions USB portal (PS3/PS4/Wii U versions)
to detect character and vehicle disc placement/removal events.

v1.3.0 Changes:
- Removed default LED blink on tag insert (callbacks now control LEDs)
- Added software_flash() method for reliable flashing via solid color loop
- Added software_pulse() method for pulsing effect
- Effects now run in background threads to not block event handling

Requirements:
    pip install pyusb libusb-package

Windows Driver Setup:
    1. Download Zadig from https://zadig.akeo.ie/
    2. Connect the LEGO Dimensions portal
    3. Run Zadig as administrator
    4. Select Options -> List All Devices
    5. Choose "LEGO READER V2.10" from the dropdown
    6. Select WinUSB as the target driver
    7. Click "Replace Driver"

Windows libusb Setup:
    The libusb-package provides pre-compiled libusb-1.0.dll.
    Alternatively, download from https://libusb.info/ and place
    libusb-1.0.dll in C:\\Windows\\System32
"""

import sys
import os
import platform


# Windows-specific backend setup - must be done BEFORE importing usb.core
def _setup_windows_backend():
    """Configure libusb backend for Windows.

    Does nothing on other platforms. When ``libusb_package`` is importable
    nothing else is needed; otherwise the first directory that contains
    ``libusb-1.0.dll`` is prepended to ``PATH``.
    """
    if platform.system() != 'Windows':
        return

    # Try libusb-package first (recommended)
    try:
        import libusb_package  # noqa: F401 - imported only to probe availability
        return
    except ImportError:
        pass

    # Search for libusb DLL in common locations
    windir = os.environ.get('WINDIR', 'C:\\Windows')
    dll_paths = [
        os.path.join(windir, 'System32'),
        os.path.join(windir, 'SysWOW64'),
        os.path.dirname(sys.executable),
        os.getcwd(),
    ]

    for path in dll_paths:
        dll_file = os.path.join(path, 'libusb-1.0.dll')
        if os.path.exists(dll_file):
            os.environ['PATH'] = path + os.pathsep + os.environ.get('PATH', '')
            return


# Run setup before USB imports
_setup_windows_backend()

import usb.core  # noqa: E402
import usb.util  # noqa: E402
import usb.backend.libusb1  # noqa: E402
import threading  # noqa: E402
import time  # noqa: E402
import json  # noqa: E402
import math  # noqa: E402
from typing import Callable, Optional, Dict, Any, List  # noqa: E402,F401
from dataclasses import dataclass  # noqa: E402
from enum import Enum  # noqa: E402
from ctypes import c_uint32  # noqa: E402

# USB Device Identifiers
VENDOR_ID = 0x0e6f
PRODUCT_ID = 0x0241

# Module version
__version__ = "1.3.0"

# libusb backend error codes, as exposed by USBError.backend_error_code
_LIBUSB_ERROR_NO_DEVICE = -4
_LIBUSB_ERROR_TIMEOUT = -7
_LIBUSB_ERROR_PIPE = -9


def get_usb_backend():
    """Get the appropriate USB backend for the current platform.

    Returns:
        A libusb1 backend object on Windows when one can be loaded,
        otherwise None so that pyusb picks its default backend.
    """
    if platform.system() == 'Windows':
        # Try libusb-package first
        try:
            import libusb_package
            return libusb_package.get_libusb1_backend()
        except Exception:
            # Covers ImportError as well as any failure while loading the DLL.
            pass

        # Try default libusb1 backend
        try:
            return usb.backend.libusb1.get_backend()
        except Exception:
            pass

    return None  # Use default backend


class Pad(Enum):
    """Portal pad identifiers"""
    ALL = 0
    CENTER = 1
    LEFT = 2
    RIGHT = 3


class TagEvent(Enum):
    """Tag event types"""
    INSERTED = 0
    REMOVED = 1


# Predefined colors [R, G, B]
COLORS = {
    'OFF': [0, 0, 0],
    'RED': [255, 0, 0],
    'GREEN': [0, 255, 0],
    'BLUE': [0, 0, 255],
    'WHITE': [255, 255, 255],
    'YELLOW': [255, 255, 0],
    'CYAN': [0, 255, 255],
    'MAGENTA': [255, 0, 255],
    'ORANGE': [255, 128, 0],
    'PURPLE': [128, 0, 255],
    'PINK': [255, 105, 180],
}

# Portal initialization command (contains "(c) LEGO 2014" signature)
TOYPAD_INIT = [
    0x55, 0x0f, 0xb0, 0x01, 0x28, 0x63, 0x29, 0x20,
    0x4c, 0x45, 0x47, 0x4f, 0x20, 0x32, 0x30, 0x31,
    0x34, 0xf7, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
    0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00
]

# Built-in character names used when no database file can be loaded
_DEFAULT_CHARACTERS = {
    1: "Batman", 2: "Gandalf", 3: "Wyldstyle", 4: "Aquaman",
    5: "Bad Cop", 6: "Bane", 7: "Bart Simpson", 8: "Benny",
    9: "Chell", 10: "Cole", 11: "Cragger", 12: "Cyborg",
    13: "Cyberman", 14: "Doc Brown", 15: "The Doctor", 16: "Emmet",
    17: "Eris", 18: "Gimli", 19: "Gollum", 20: "Harley Quinn",
    21: "Homer Simpson", 22: "Jay", 23: "Joker", 24: "Kai",
    25: "ACU Trooper", 26: "Krusty the Clown", 27: "Laval",
    28: "Legolas", 29: "Lloyd", 30: "Marty McFly", 31: "Nya",
    32: "Owen Grady", 33: "Peter Venkman", 34: "Scooby-Doo",
    35: "Sensei Wu", 36: "Shaggy", 37: "Slimer", 38: "Superman",
    39: "Unikitty", 40: "Wicked Witch", 41: "Wonder Woman", 42: "Zane",
    43: "Green Arrow", 44: "Supergirl", 45: "Abby Yates",
    46: "Finn the Human", 47: "Harry Potter", 48: "Ethan Hunt",
    49: "Lumpy Space Princess", 50: "Jake the Dog", 51: "B.A. Baracus",
    52: "Michael Knight", 53: "Gizmo", 54: "Sonic the Hedgehog",
    55: "Gamer Kid", 56: "E.T.", 57: "Lord Voldemort",
    58: "Hermione Granger", 59: "Newt Scamander", 60: "Tina Goldstein",
    61: "Stripe", 62: "Sloth", 63: "Beetlejuice", 64: "Kevin",
    65: "Erin Gilbert", 66: "Patty Tolan", 67: "Jillian Holtzmann",
    68: "Stay Puft", 69: "Chase McCain", 70: "Excalibur Batman",
    71: "Raven", 72: "Beast Boy", 73: "Starfire", 74: "Blossom",
    75: "Bubbles", 76: "Buttercup", 77: "Marceline", 78: "Batgirl",
    79: "Robin", 80: "Knight Rider (KITT)", 81: "Gremlins Gizmo",
    82: "Teen Titans Go!",
}

# Built-in vehicle names used when no database file can be loaded
_DEFAULT_VEHICLES = {
    1: "Batmobile", 2: "Batwing", 3: "Bat Blaster",
    4: "DeLorean Time Machine", 5: "Hoverboard",
    6: "Travelling Time Train", 7: "TARDIS", 8: "K9", 9: "Dalek",
    10: "Companion Cube", 11: "Sentry Turret", 12: "GLaDOS",
    13: "Mystery Machine", 14: "Scooby Snack", 15: "Gyrocopter",
    16: "Ghost Trap", 17: "Ecto-1", 18: "PKE Meter",
    19: "Emmet's Excavator", 20: "Emmet's Car",
}


@dataclass
class TagInfo:
    """Information about a detected NFC tag.

    ``uid_hex`` and ``legacy_key`` are always recomputed from ``uid`` in
    ``__post_init__``; values passed for them to the constructor are
    overwritten.
    """
    uid: bytes
    pad: Pad
    event: TagEvent
    uid_hex: str = ""
    legacy_key: int = 0  # Integer key for tag_colors.json lookup
    character_id: Optional[int] = None
    character_name: Optional[str] = None
    is_vehicle: bool = False

    def __post_init__(self):
        self.uid_hex = self.uid.hex().upper()
        # Calculate legacy key (32-bit int from first 4 bytes, big-endian)
        if len(self.uid) >= 4:
            self.legacy_key = int.from_bytes(self.uid[:4], 'big')


class LegoDimensionsReader:
    """
    LEGO Dimensions Portal NFC Reader

    Provides event-driven tag detection with callbacks for integration
    with the Moonlight Drive-In video player system.

    v1.3.0: No default LED behavior on tag insert/remove.
    Callbacks are fully responsible for LED control.

    Usage:
        reader = LegoDimensionsReader()
        reader.on_tag_insert = lambda tag: print(f"Tag placed: {tag.uid_hex}")
        reader.on_tag_remove = lambda tag: print(f"Tag removed: {tag.uid_hex}")
        reader.start()
    """

    def __init__(self, character_db_path: Optional[str] = None):
        """
        Initialize the LEGO Dimensions reader.

        Args:
            character_db_path: Optional path to character_database.json
        """
        self.dev = None
        self._running = False
        self._thread: Optional[threading.Thread] = None
        self._lock = threading.Lock()  # serializes writes to the device

        # USB endpoints (will be discovered on connect)
        self._endpoint_in = None
        self._endpoint_out = None
        self._interface = None

        # Active tags currently on pads (pad_num -> TagInfo)
        self._active_tags: Dict[int, TagInfo] = {}

        # Effect control - track running effects per pad
        self._effect_threads: Dict[int, threading.Thread] = {}
        self._effect_stop_flags: Dict[int, threading.Event] = {}

        # Callbacks
        self.on_tag_insert: Optional[Callable[[TagInfo], None]] = None
        self.on_tag_remove: Optional[Callable[[TagInfo], None]] = None
        self.on_error: Optional[Callable[[Exception], None]] = None
        self.on_connect: Optional[Callable[[], None]] = None
        self.on_disconnect: Optional[Callable[[], None]] = None

        # Character database for ID lookup
        self._character_db = self._load_character_database(character_db_path)
        self._vehicle_db = self._load_vehicle_database(character_db_path)

    @staticmethod
    def _load_database_section(db_path: Optional[str], section: str,
                               defaults: Dict[int, str]) -> Dict[int, str]:
        """Read one ``{id: name}`` section from the JSON database file.

        Args:
            db_path: Path to the JSON file, or None.
            section: Top-level key to read ('characters' or 'vehicles').
            defaults: Mapping to fall back on.

        Returns:
            The section with its keys converted to int (an empty dict when
            the file loads but lacks the section), or a copy of *defaults*
            when no path is given, the file does not exist, or it cannot be
            read or parsed.
        """
        if db_path and os.path.exists(db_path):
            try:
                with open(db_path, 'r', encoding='utf-8') as f:
                    data = json.load(f)
                return {int(k): v for k, v in data.get(section, {}).items()}
            except (OSError, ValueError, TypeError, AttributeError):
                # Unreadable, malformed or unexpectedly shaped file:
                # deliberately fall back to the built-in table.
                pass
        return dict(defaults)

    def _load_character_database(self, db_path: Optional[str] = None) -> Dict[int, str]:
        """Load character ID database."""
        return self._load_database_section(db_path, 'characters', _DEFAULT_CHARACTERS)

    def _load_vehicle_database(self, db_path: Optional[str] = None) -> Dict[int, str]:
        """Load vehicle ID database."""
        return self._load_database_section(db_path, 'vehicles', _DEFAULT_VEHICLES)

    def _is_timeout_error(self, error: usb.core.USBError) -> bool:
        """Check if error is a timeout (normal during polling)."""
        if getattr(error, 'backend_error_code', None) == _LIBUSB_ERROR_TIMEOUT:
            return True
        error_str = str(error).lower()
        return (
            # An error without errno is treated as a timeout as well.
            error.errno in (-116, -7, 110, None) or
            'timeout' in error_str or
            'timed out' in error_str
        )

    def _is_pipe_error(self, error: usb.core.USBError) -> bool:
        """Check if error indicates device disconnection."""
        backend_code = getattr(error, 'backend_error_code', None)
        error_str = str(error).lower()
        return (
            error.errno == -9 or
            backend_code in (_LIBUSB_ERROR_PIPE, _LIBUSB_ERROR_NO_DEVICE) or
            'pipe' in error_str or
            'no such device' in error_str
        )

    @staticmethod
    def _endpoint_address(endpoint, default: int) -> int:
        """Return the numeric address for an endpoint descriptor or raw value."""
        if hasattr(endpoint, 'bEndpointAddress'):
            endpoint = endpoint.bEndpointAddress
        if endpoint is None:
            endpoint = default
        return endpoint

    def _release_device(self):
        """Release the claimed interface and forget the device (best effort)."""
        dev = self.dev
        if dev is not None:
            if self._interface is not None:
                try:
                    usb.util.release_interface(dev, self._interface)
                except Exception:
                    # Teardown must not fail; the device may already be gone.
                    pass
            try:
                usb.util.dispose_resources(dev)
            except Exception:
                pass
        self.dev = None
        self._endpoint_in = None
        self._endpoint_out = None
        self._interface = None

    def connect(self) -> bool:
        """
        Establish connection to LEGO Dimensions portal.

        Errors are reported through ``on_error`` rather than raised. On any
        failure the partially opened device is released so that
        ``is_connected`` stays False and a later ``start()`` retries.

        Returns True if connection successful.
        """
        try:
            backend = get_usb_backend()

            self.dev = usb.core.find(
                idVendor=VENDOR_ID,
                idProduct=PRODUCT_ID,
                backend=backend
            )

            if self.dev is None:
                error_msg = (
                    "LEGO Dimensions Portal not found.\n"
                    "Ensure:\n"
                    " 1. Portal is connected via USB\n"
                    " 2. Using PS3/PS4/Wii U portal (NOT Xbox)\n"
                    " 3. WinUSB driver installed via Zadig"
                )
                if platform.system() == 'Windows':
                    error_msg += (
                        "\n 4. libusb installed: pip install libusb-package\n"
                        " 5. Or libusb-1.0.dll in System32"
                    )
                raise ConnectionError(error_msg)

            # Kernel driver handling is not available on every platform/backend.
            try:
                if self.dev.is_kernel_driver_active(0):
                    self.dev.detach_kernel_driver(0)
            except (AttributeError, usb.core.USBError, NotImplementedError):
                pass

            try:
                self.dev.set_configuration()
            except usb.core.USBError as e:
                if 'entity not found' not in str(e).lower():
                    raise

            cfg = self.dev.get_active_configuration()
            intf = cfg[(0, 0)]
            self._interface = intf.bInterfaceNumber

            try:
                usb.util.claim_interface(self.dev, self._interface)
            except usb.core.USBError as e:
                if 'resource busy' not in str(e).lower():
                    raise

            self._endpoint_out = usb.util.find_descriptor(
                intf,
                custom_match=lambda e: usb.util.endpoint_direction(e.bEndpointAddress) == usb.util.ENDPOINT_OUT
            )
            self._endpoint_in = usb.util.find_descriptor(
                intf,
                custom_match=lambda e: usb.util.endpoint_direction(e.bEndpointAddress) == usb.util.ENDPOINT_IN
            )

            # Fall back to fixed endpoint addresses when none were discovered.
            if self._endpoint_out is None:
                self._endpoint_out = 0x01
            if self._endpoint_in is None:
                self._endpoint_in = 0x81

            self._write_raw(TOYPAD_INIT)

            try:
                product = usb.util.get_string(self.dev, self.dev.iProduct)
                print(f"Connected to: {product}")
            except Exception:
                print("Connected to LEGO Dimensions Portal")

            if self.on_connect:
                self.on_connect()

            return True

        except usb.core.USBError as e:
            self._release_device()
            error_msg = str(e)
            if "access denied" in error_msg.lower() or "insufficient permissions" in error_msg.lower():
                print("ERROR: Access denied. Try running as Administrator or check Zadig drivers.")
            elif "no backend" in error_msg.lower():
                print("ERROR: No USB backend found. Install libusb-package: pip install libusb-package")
            if self.on_error:
                self.on_error(e)
            return False
        except Exception as e:
            self._release_device()
            if self.on_error:
                self.on_error(e)
            return False

    def disconnect(self):
        """Disconnect from portal and clean up resources.

        Stops polling and all software effects, turns the LEDs off, releases
        the device and finally invokes ``on_disconnect`` (also when no
        device was connected).
        """
        self.stop()
        self.stop_all_effects()

        if self.dev is not None:
            try:
                self.set_pad_color(Pad.ALL, COLORS['OFF'])
            except Exception:
                # Best effort: the portal may already be unplugged.
                pass
            self._release_device()

        if self.on_disconnect:
            self.on_disconnect()

    def _calculate_checksum(self, command: list) -> int:
        """Calculate checksum (sum of bytes mod 256)"""
        return sum(command) % 256

    def _write_raw(self, data: list):
        """Write raw data to device.

        Does nothing when no device is open. USB write errors are ignored.
        """
        # Snapshot: another thread may clear self.dev while we are writing.
        dev = self.dev
        if dev is None:
            return

        try:
            endpoint = self._endpoint_address(self._endpoint_out, 0x01)
            with self._lock:
                dev.write(endpoint, data)
        except usb.core.USBError:
            pass

    def _send_command(self, command: list):
        """Send a command to the portal with checksum and padding."""
        if not self.dev:
            return

        message = command + [self._calculate_checksum(command)]
        # Zero-pad to 32 bytes (a longer message is sent unchanged).
        message += [0x00] * (32 - len(message))
        self._write_raw(message)

    def set_pad_color(self, pad: Pad, color: List[int]):
        """
        Set a pad to a specific color immediately.

        Any software effect running on the same pad is stopped first.

        Args:
            pad: Which pad (Pad.ALL, Pad.CENTER, Pad.LEFT, Pad.RIGHT)
            color: [R, G, B] values 0-255
        """
        self.stop_effect(pad)
        command = [0x55, 0x06, 0xc0, 0x02, pad.value, color[0], color[1], color[2]]
        self._send_command(command)

    def flash_pad(self, pad: Pad, color: List[int], on_time: int = 10,
                  off_time: int = 10, count: int = 5):
        """
        Flash a pad (hardware command - may not work on all portals).
        Use software_flash() for reliable cross-portal flashing.
        """
        command = [
            0x55, 0x0e, 0xc8, 0x06, pad.value,
            color[0], color[1], color[2],
            on_time, off_time, count,
            0, 0, 0, 0, 0
        ]
        self._send_command(command)

    def fade_pad(self, pad: Pad, color: List[int], speed: int = 10, count: int = 1):
        """
        Fade a pad (hardware command - may not work on all portals).
        Use software_pulse() for reliable cross-portal pulsing.
        """
        command = [0x55, 0x08, 0xc2, 0x04, pad.value, speed, count,
                   color[0], color[1], color[2]]
        self._send_command(command)

    # =========================================================================
    # SOFTWARE-BASED EFFECTS (Reliable cross-portal)
    # =========================================================================

    def stop_effect(self, pad: Pad):
        """Stop any running software effect on a specific pad.

        Effects are tracked per ``pad.value``; ``Pad.ALL`` refers only to an
        effect that was started on ``Pad.ALL``.
        """
        pad_val = pad.value

        stop_flag = self._effect_stop_flags.pop(pad_val, None)
        if stop_flag is not None:
            stop_flag.set()

        thread = self._effect_threads.pop(pad_val, None)
        # Joining the current thread would raise RuntimeError.
        if (thread is not None and thread.is_alive()
                and thread is not threading.current_thread()):
            thread.join(timeout=0.5)

    def stop_all_effects(self):
        """Stop all running software effects (including one on Pad.ALL)."""
        for pad in Pad:
            self.stop_effect(pad)

    def _set_color_raw(self, pad_value: int, r: int, g: int, b: int):
        """Set color without stopping effects (internal use)."""
        command = [0x55, 0x06, 0xc0, 0x02, pad_value, r, g, b]
        self._send_command(command)

    def software_flash(self, pad: Pad, color: List[int], on_time: float = 0.3,
                       off_time: float = 0.3, count: int = 0):
        """
        Flash a pad using software-controlled solid colors.
        This is reliable across all portal versions.

        Runs in a daemon thread. After a finite number of flashes the pad
        is left showing *color*.

        Args:
            pad: Which pad to flash
            color: [R, G, B] values
            on_time: Seconds LED stays on
            off_time: Seconds LED stays off
            count: Number of flashes (0 = infinite until stopped)
        """
        self.stop_effect(pad)

        pad_val = pad.value
        # Read the color once so a malformed value fails in the caller.
        r, g, b = color[0], color[1], color[2]

        stop_flag = threading.Event()
        self._effect_stop_flags[pad_val] = stop_flag

        def _flash_loop():
            flash_count = 0
            while not stop_flag.is_set():
                self._set_color_raw(pad_val, r, g, b)
                if stop_flag.wait(on_time):
                    break

                self._set_color_raw(pad_val, 0, 0, 0)
                if stop_flag.wait(off_time):
                    break

                flash_count += 1
                if count > 0 and flash_count >= count:
                    self._set_color_raw(pad_val, r, g, b)
                    break

        thread = threading.Thread(target=_flash_loop, daemon=True)
        self._effect_threads[pad_val] = thread
        thread.start()

    def software_pulse(self, pad: Pad, color: List[int], speed: float = 1.0,
                       min_brightness: float = 0.1, count: int = 0):
        """
        Pulse/breathe effect using software-controlled brightness.

        Runs in a daemon thread, updating the color 30 times per cycle.
        After a finite number of pulses the pad is left showing *color*.

        Args:
            pad: Which pad to pulse
            color: [R, G, B] base color
            speed: Seconds for one full pulse cycle
            min_brightness: Minimum brightness (0.0-1.0)
            count: Number of pulses (0 = infinite until stopped)

        Raises:
            ValueError: If speed is not positive.
        """
        if speed <= 0:
            # A zero speed would otherwise divide by zero inside the thread.
            raise ValueError("speed must be greater than 0")

        self.stop_effect(pad)

        pad_val = pad.value
        base_r, base_g, base_b = color[0], color[1], color[2]

        stop_flag = threading.Event()
        self._effect_stop_flags[pad_val] = stop_flag

        def _pulse_loop():
            pulse_count = 0
            start_time = time.time()
            step_time = speed / 30

            while not stop_flag.is_set():
                elapsed = time.time() - start_time
                phase = (elapsed / speed) * 2 * math.pi
                # Sine shifted so each cycle starts at minimum brightness.
                brightness = (math.sin(phase - math.pi / 2) + 1) / 2
                brightness = min_brightness + brightness * (1 - min_brightness)

                self._set_color_raw(
                    pad_val,
                    int(base_r * brightness),
                    int(base_g * brightness),
                    int(base_b * brightness),
                )

                if elapsed >= speed * (pulse_count + 1):
                    pulse_count += 1
                    if count > 0 and pulse_count >= count:
                        self._set_color_raw(pad_val, base_r, base_g, base_b)
                        break

                if stop_flag.wait(step_time):
                    break

        thread = threading.Thread(target=_pulse_loop, daemon=True)
        self._effect_threads[pad_val] = thread
        thread.start()

    def apply_effect(self, pad: Pad, color: List[int], effect: str = "solid"):
        """
        Apply a named effect to a pad.

        Args:
            pad: Which pad
            color: [R, G, B] values
            effect: "solid", "flash", or "pulse" (case-insensitive; any
                other value is treated as "solid")
        """
        effect = effect.lower()
        if effect == "flash":
            self.software_flash(pad, color)
        elif effect == "pulse":
            self.software_pulse(pad, color)
        else:
            self.set_pad_color(pad, color)

    def _poll_events(self):
        """Background thread function for polling tag events.

        Reads 32-byte packets and dispatches ``on_tag_insert`` /
        ``on_tag_remove``. Timeouts are ignored; a disconnect is reported
        once through ``on_error`` and ends the loop.
        """
        while self._running:
            try:
                dev = self.dev
                if dev is None:
                    # Device was released underneath us; nothing left to poll.
                    self._running = False
                    break

                endpoint = self._endpoint_address(self._endpoint_in, 0x81)
                in_packet = dev.read(endpoint, 32, timeout=100)
                bytelist = list(in_packet)

                # Only packets starting with 0x56 are tag events; the fields
                # read below need at least 13 bytes.
                if len(bytelist) < 13 or bytelist[0] != 0x56:
                    continue

                pad_num = bytelist[2]
                action = bytelist[5]
                uid = bytes(bytelist[6:13])

                if uid == b'\x00' * 7:
                    continue

                try:
                    pad = Pad(pad_num)
                except ValueError:
                    pad = Pad.CENTER

                event = TagEvent.INSERTED if action == 0 else TagEvent.REMOVED
                tag_info = TagInfo(uid=uid, pad=pad, event=event)

                # v1.3.0: NO default LED behavior - callbacks control LEDs
                if event == TagEvent.INSERTED:
                    self._active_tags[pad_num] = tag_info
                    if self.on_tag_insert:
                        self.on_tag_insert(tag_info)
                else:
                    self._active_tags.pop(pad_num, None)
                    self.stop_effect(pad)
                    if self.on_tag_remove:
                        self.on_tag_remove(tag_info)

            except usb.core.USBError as e:
                if self._is_timeout_error(e):
                    continue
                elif self._is_pipe_error(e):
                    was_running = self._running
                    # The loop is ending: keep is_running truthful and let a
                    # later start() launch a new polling thread.
                    self._running = False
                    if was_running and self.on_error:
                        self.on_error(ConnectionError("Portal disconnected"))
                    break
                elif self._running:
                    if self.on_error:
                        self.on_error(e)
                    time.sleep(0.5)
            except Exception as e:
                # Thread boundary: report callback/parsing failures, keep polling.
                if self._running and self.on_error:
                    self.on_error(e)
                time.sleep(0.1)

    def start(self):
        """Start the event polling thread.

        Connects first when no device is open. Does nothing when polling is
        already running.

        Raises:
            ConnectionError: If the portal cannot be connected.
        """
        if self._running:
            return

        if not self.dev:
            if not self.connect():
                raise ConnectionError("Failed to connect to portal")

        self._running = True
        self._thread = threading.Thread(target=self._poll_events, daemon=True)
        self._thread.start()

    def stop(self):
        """Stop the event polling thread.

        Safe to call from inside a callback: the polling thread is not
        joined when it is the caller.
        """
        self._running = False
        thread = self._thread
        if thread is not None:
            if thread is not threading.current_thread():
                thread.join(timeout=2.0)
            self._thread = None

    def get_active_tags(self) -> Dict[int, TagInfo]:
        """Get dictionary of currently active tags on pads."""
        return self._active_tags.copy()

    def get_character_name(self, character_id: int) -> Optional[str]:
        """Look up character name from character ID."""
        return self._character_db.get(character_id)

    def get_vehicle_name(self, vehicle_id: int) -> Optional[str]:
        """Look up vehicle name from vehicle ID."""
        return self._vehicle_db.get(vehicle_id)

    @property
    def is_connected(self) -> bool:
        """True while a device handle is held."""
        return self.dev is not None

    @property
    def is_running(self) -> bool:
        """True while the polling loop is meant to be running."""
        return self._running


# =============================================================================
# TEA Encryption/Decryption
# =============================================================================

_TEA_DELTA = 0x9e3779b9
_TEA_ROUNDS = 32


def _u32(x: int) -> int:
    """Wrap an integer to an unsigned 32-bit value."""
    return c_uint32(x).value


def _tea_unpack(data: bytes, key: bytes):
    """Split an 8-byte block and a 16-byte key into little-endian 32-bit words.

    Only the first 8 bytes of *data* and the first 16 bytes of *key* are used.

    Raises:
        ValueError: If data is shorter than 8 bytes or key shorter than 16.
    """
    if len(data) < 8:
        raise ValueError("TEA data block must be at least 8 bytes")
    if len(key) < 16:
        raise ValueError("TEA key must be at least 16 bytes")
    v0 = int.from_bytes(data[0:4], 'little')
    v1 = int.from_bytes(data[4:8], 'little')
    k0, k1, k2, k3 = (
        int.from_bytes(key[i:i + 4], 'little') for i in range(0, 16, 4)
    )
    return v0, v1, k0, k1, k2, k3


def _tea_mix(v: int, ka: int, kb: int, total: int) -> int:
    """Compute the TEA round function for one half-block."""
    return _u32(((v << 4) + ka) ^ (v + total) ^ ((v >> 5) + kb))


def tea_decrypt(data: bytes, key: bytes) -> bytes:
    """Decrypt one 8-byte block with a 16-byte key using 32-round TEA.

    Raises:
        ValueError: If data is shorter than 8 bytes or key shorter than 16.
    """
    v0, v1, k0, k1, k2, k3 = _tea_unpack(data, key)

    total = _u32(_TEA_DELTA * _TEA_ROUNDS)
    for _ in range(_TEA_ROUNDS):
        v1 = _u32(v1 - _tea_mix(v0, k2, k3, total))
        v0 = _u32(v0 - _tea_mix(v1, k0, k1, total))
        total = _u32(total - _TEA_DELTA)

    return v0.to_bytes(4, 'little') + v1.to_bytes(4, 'little')


def tea_encrypt(data: bytes, key: bytes) -> bytes:
    """Encrypt one 8-byte block with a 16-byte key using 32-round TEA.

    Raises:
        ValueError: If data is shorter than 8 bytes or key shorter than 16.
    """
    v0, v1, k0, k1, k2, k3 = _tea_unpack(data, key)

    total = 0
    for _ in range(_TEA_ROUNDS):
        total = _u32(total + _TEA_DELTA)
        v0 = _u32(v0 + _tea_mix(v1, k0, k1, total))
        v1 = _u32(v1 + _tea_mix(v0, k2, k3, total))

    return v0.to_bytes(4, 'little') + v1.to_bytes(4, 'little')


def generate_tea_key(uid: bytes) -> bytes:
    """Derive the 16-byte TEA key from a tag UID (reads uid[1] through uid[5])."""
    # Three UID-derived bytes, each combined with a fixed mask below.
    a = uid[1] ^ uid[3] ^ 0xAA
    b = uid[2] ^ uid[4] ^ 0x55
    c = uid[3] ^ uid[5] ^ 0xAA

    return bytes([
        a | 0xAA, b & 0x55, a | 0xAA, b | 0x55,
        c | 0xAA, b & 0x55, c | 0xAA, b | 0x55,
        a & 0xAA, b | 0x55, a & 0xAA, b & 0x55,
        c & 0xAA, b | 0x55, c & 0xAA, b & 0x55,
    ])


def generate_password(uid: bytes) -> bytes:
    """Derive the 4-byte password from a tag UID (reads uid[1] through uid[6])."""
    return bytes([
        0xAA ^ uid[1] ^ uid[3],
        0x55 ^ uid[2] ^ uid[4],
        0xAA ^ uid[3] ^ uid[5],
        0x55 ^ uid[4] ^ uid[6],
    ])


def decrypt_character_id(uid: bytes, encrypted_data: bytes) -> int:
    """Decrypt a block with the UID-derived key and return its first two
    bytes as a little-endian integer."""
    key = generate_tea_key(uid)
    decrypted = tea_decrypt(encrypted_data, key)
    return int.from_bytes(decrypted[0:2], 'little')


def encrypt_character_id(uid: bytes, character_id: int) -> bytes:
    """Encrypt a character ID (2 bytes little-endian, zero-padded to 8 bytes)
    with the UID-derived key."""
    key = generate_tea_key(uid)
    data = character_id.to_bytes(2, 'little') + b'\x00' * 6
    return tea_encrypt(data, key)


# =============================================================================
# Standalone test
# =============================================================================

def _main():
    """Run the reader interactively, printing tag events until Ctrl+C."""

    def on_insert(tag: TagInfo):
        print(f"\n{'='*50}")
        print(f"[+] TAG PLACED on {tag.pad.name} pad")
        print(f" UID: {tag.uid_hex}")
        print(f" Legacy Key: {tag.legacy_key}")
        print(f" Password: {generate_password(tag.uid).hex().upper()}")
        print(f"{'='*50}")

    def on_remove(tag: TagInfo):
        print(f"\n[-] TAG REMOVED from {tag.pad.name} pad")

    def on_error(e: Exception):
        print(f"\n[!] ERROR: {e}")

    def on_connect():
        print("[+] Portal connected and initialized")

    def on_disconnect():
        print("[-] Portal disconnected")

    print(f"\nLEGO Dimensions Portal Reader v{__version__}")
    print(f"Platform: {platform.system()}")
    print("="*50)
    print("NOTE: v1.3.0 - No default LED blink on tag insert")
    print(" Callbacks control all LED behavior")
    print("="*50)

    reader = LegoDimensionsReader()
    reader.on_tag_insert = on_insert
    reader.on_tag_remove = on_remove
    reader.on_error = on_error
    reader.on_connect = on_connect
    reader.on_disconnect = on_disconnect

    try:
        reader.start()
        print("\nPortal Active - Place character discs...")
        print("Press Ctrl+C to exit\n")

        while True:
            time.sleep(1)

    except KeyboardInterrupt:
        print("\n\nShutting down...")
    except ConnectionError as e:
        print(f"\nConnection failed: {e}")
    finally:
        reader.disconnect()
        print("Goodbye!")


if __name__ == "__main__":
    _main()