#!/usr/bin/env python3 """ LEGO Dimensions Portal Reader Module For Moonlight Drive-In Video Player System Communicates with LEGO Dimensions USB portal (PS3/PS4/Wii U versions) to detect character and vehicle disc placement/removal events. Requirements: pip install pyusb Windows Driver Setup: 1. Download Zadig from https://zadig.akeo.ie/ 2. Connect the LEGO Dimensions portal 3. Run Zadig as administrator 4. Select Options → List All Devices 5. Choose "LEGO READER V2.10" from the dropdown 6. Select WinUSB as the target driver 7. Click "Replace Driver" """ import usb.core import usb.util import threading import time import json import os from typing import Callable, Optional, Dict, Any from dataclasses import dataclass from enum import Enum from ctypes import c_uint32 # USB Device Identifiers VENDOR_ID = 0x0e6f PRODUCT_ID = 0x0241 # Module version __version__ = "1.0.0" class Pad(Enum): """Portal pad identifiers""" ALL = 0 CENTER = 1 LEFT = 2 RIGHT = 3 class TagEvent(Enum): """Tag event types""" INSERTED = 0 REMOVED = 1 # Predefined colors [R, G, B] COLORS = { 'OFF': [0, 0, 0], 'RED': [255, 0, 0], 'GREEN': [0, 255, 0], 'BLUE': [0, 0, 255], 'WHITE': [255, 255, 255], 'YELLOW': [255, 255, 0], 'CYAN': [0, 255, 255], 'MAGENTA': [255, 0, 255], 'ORANGE': [255, 128, 0], 'PURPLE': [128, 0, 255], 'PINK': [255, 105, 180], } # Portal initialization command (contains "(c) LEGO 2014" signature) TOYPAD_INIT = [ 0x55, 0x0f, 0xb0, 0x01, 0x28, 0x63, 0x29, 0x20, 0x4c, 0x45, 0x47, 0x4f, 0x20, 0x32, 0x30, 0x31, 0x34, 0xf7, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00 ] @dataclass class TagInfo: """Information about a detected NFC tag""" uid: bytes pad: Pad event: TagEvent uid_hex: str = "" character_id: Optional[int] = None character_name: Optional[str] = None is_vehicle: bool = False def __post_init__(self): self.uid_hex = self.uid.hex().upper() class LegoDimensionsReader: """ LEGO Dimensions Portal NFC Reader Provides event-driven tag detection with callbacks for integration with the Moonlight Drive-In video player system. Usage: reader = LegoDimensionsReader() reader.on_tag_insert = lambda tag: print(f"Tag placed: {tag.uid_hex}") reader.on_tag_remove = lambda tag: print(f"Tag removed: {tag.uid_hex}") reader.start() """ def __init__(self, character_db_path: Optional[str] = None): """ Initialize the LEGO Dimensions reader. Args: character_db_path: Optional path to character_database.json """ self.dev = None self._running = False self._thread: Optional[threading.Thread] = None self._lock = threading.Lock() # Active tags currently on pads (pad_num -> TagInfo) self._active_tags: Dict[int, TagInfo] = {} # Callbacks self.on_tag_insert: Optional[Callable[[TagInfo], None]] = None self.on_tag_remove: Optional[Callable[[TagInfo], None]] = None self.on_error: Optional[Callable[[Exception], None]] = None self.on_connect: Optional[Callable[[], None]] = None self.on_disconnect: Optional[Callable[[], None]] = None # Character database for ID lookup self._character_db = self._load_character_database(character_db_path) self._vehicle_db = self._load_vehicle_database(character_db_path) def _load_character_database(self, db_path: Optional[str] = None) -> Dict[int, str]: """Load character ID database.""" # Try to load from JSON file if provided if db_path and os.path.exists(db_path): try: with open(db_path, 'r') as f: data = json.load(f) return {int(k): v for k, v in data.get('characters', {}).items()} except Exception: pass # Default built-in database return { 1: "Batman", 2: "Gandalf", 3: "Wyldstyle", 4: "Aquaman", 5: "Bad Cop", 6: "Bane", 7: "Bart Simpson", 8: "Benny", 9: "Chell", 10: "Cole", 11: "Cragger", 12: "Cyborg", 13: "Cyberman", 14: "Doc Brown", 15: "The Doctor", 16: "Emmet", 17: "Eris", 18: "Gimli", 19: "Gollum", 20: "Harley Quinn", 21: "Homer Simpson", 22: "Jay", 23: "Joker", 24: "Kai", 25: "ACU Trooper", 26: "Krusty the Clown", 27: "Laval", 28: "Legolas", 29: "Lloyd", 30: "Marty McFly", 31: "Nya", 32: "Owen Grady", 33: "Peter Venkman", 34: "Scooby-Doo", 35: "Sensei Wu", 36: "Shaggy", 37: "Slimer", 38: "Superman", 39: "Unikitty", 40: "Wicked Witch", 41: "Wonder Woman", 42: "Zane", 43: "Green Arrow", 44: "Supergirl", 45: "Abby Yates", 46: "Finn the Human", 47: "Harry Potter", 48: "Ethan Hunt", 49: "Lumpy Space Princess", 50: "Jake the Dog", 51: "B.A. Baracus", 52: "Michael Knight", 53: "Gizmo", 54: "Sonic the Hedgehog", 55: "Gamer Kid", 56: "E.T.", 57: "Lord Voldemort", 58: "Hermione Granger", 59: "Newt Scamander", 60: "Tina Goldstein", 61: "Stripe", 62: "Sloth", 63: "Beetlejuice", 64: "Kevin", 65: "Erin Gilbert", 66: "Patty Tolan", 67: "Jillian Holtzmann", 68: "Stay Puft", 69: "Chase McCain", 70: "Excalibur Batman", 71: "Raven", 72: "Beast Boy", 73: "Starfire", 74: "Blossom", 75: "Bubbles", 76: "Buttercup", 77: "Marceline", 78: "Batgirl", 79: "Robin", 80: "Knight Rider (KITT)", 81: "Gremlins Gizmo", 82: "Teen Titans Go!", } def _load_vehicle_database(self, db_path: Optional[str] = None) -> Dict[int, str]: """Load vehicle ID database.""" if db_path and os.path.exists(db_path): try: with open(db_path, 'r') as f: data = json.load(f) return {int(k): v for k, v in data.get('vehicles', {}).items()} except Exception: pass return { 1: "Batmobile", 2: "Batwing", 3: "Bat Blaster", 4: "DeLorean Time Machine", 5: "Hoverboard", 6: "Travelling Time Train", 7: "TARDIS", 8: "K9", 9: "Dalek", 10: "Companion Cube", 11: "Sentry Turret", 12: "GLaDOS", 13: "Mystery Machine", 14: "Scooby Snack", 15: "Gyrocopter", 16: "Ghost Trap", 17: "Ecto-1", 18: "PKE Meter", 19: "Emmet's Excavator", 20: "Emmet's Car", 21: "Duplo Brickster", 22: "Hover Bike", 23: "Cyber Guard", 24: "Cyber Wraith", 25: "Blade Bike", 26: "Flying White Dragon", 27: "Golden Dragon", 28: "Mega Flight Dragon", 29: "Shadowmeld", 30: "Swamp Skimmer", 31: "Cragger's Fireship", 32: "Eagle Interceptor", 33: "Eagle Sky Blazer", 34: "Eagle Swoop Diver", 35: "Gyrosphere", 36: "ACU Trooper Car", 37: "T. Rex", 38: "Homer's Car", 39: "Taunt-o-Vision", 40: "The Simpsons House", 41: "Krusty's Bike", 42: "Clown Bike", 43: "Frink's Hover Car", 44: "Gravity Sprinter", 45: "Street Shredder", 46: "Sky Clobberer", 47: "Invisible Jet", 48: "Justice Camper", 49: "Electro Bolt", 50: "Arrow Launcher", 51: "Drill Driver", 52: "Bane Dig Dig", 53: "Ancient Psychic Tandem War Elephant", 54: "Jakemobile", 55: "Lumpy Car", 56: "Ecto-1 (2016)", 57: "Ectozer", 58: "PerfEcto", 59: "B.A.'s Van", 60: "B.A.'s Super Van", 61: "Pain Plane", 62: "Sonic Speedster", 63: "Blue Typhoon", 64: "Moto Bug", 65: "IMF Scrambler", 66: "IMF Sport Car", 67: "IMF Tank", 68: "K.I.T.T.", 69: "Gozer Trap", 70: "Terror Dog", 71: "Gremlin Car", 72: "Flash 'n' Finish", 73: "Rampage Wrecking", 74: "Phone Home", 75: "Mobile Uplink", 76: "Super Charged Satellite", 77: "Enchanted Car", 78: "Harry's Broom", 79: "Newt's Case", 80: "Swooping Evil", 81: "Occamy", 82: "Niffler", } def connect(self) -> bool: """ Establish connection to LEGO Dimensions portal. Returns True if connection successful. """ try: self.dev = usb.core.find(idVendor=VENDOR_ID, idProduct=PRODUCT_ID) if self.dev is None: raise ConnectionError( "LEGO Dimensions Portal not found.\n" "Ensure:\n" " 1. Portal is connected via USB\n" " 2. Using PS3/PS4/Wii U portal (NOT Xbox)\n" " 3. WinUSB driver installed via Zadig" ) # Detach kernel driver if active (Linux) try: if self.dev.is_kernel_driver_active(0): self.dev.detach_kernel_driver(0) except (AttributeError, usb.core.USBError): pass # Not available on Windows # Configure device self.dev.set_configuration() # Send initialization command self.dev.write(1, TOYPAD_INIT) # Get device info try: product = usb.util.get_string(self.dev, self.dev.iProduct) print(f"Connected to: {product}") except Exception: print("Connected to LEGO Dimensions Portal") if self.on_connect: self.on_connect() return True except usb.core.USBError as e: error_msg = str(e) if "Access denied" in error_msg or "insufficient permissions" in error_msg.lower(): print("ERROR: Access denied. Try running as Administrator or check Zadig drivers.") if self.on_error: self.on_error(e) return False except Exception as e: if self.on_error: self.on_error(e) return False def disconnect(self): """Disconnect from portal and clean up resources.""" self.stop() if self.dev: try: # Turn off all LEDs self.set_pad_color(Pad.ALL, COLORS['OFF']) usb.util.dispose_resources(self.dev) except Exception: pass self.dev = None if self.on_disconnect: self.on_disconnect() def _calculate_checksum(self, command: list) -> int: """Calculate checksum (sum of bytes mod 256)""" return sum(command) % 256 def _send_command(self, command: list): """Send a command to the portal with checksum and padding.""" if not self.dev: return checksum = self._calculate_checksum(command) message = command + [checksum] # Pad to 32 bytes while len(message) < 32: message.append(0x00) try: with self._lock: self.dev.write(1, message) except usb.core.USBError: pass # Ignore write errors def set_pad_color(self, pad: Pad, color: list): """ Set a pad to a specific color immediately. Args: pad: Which pad (Pad.ALL, Pad.CENTER, Pad.LEFT, Pad.RIGHT) color: [R, G, B] values 0-255 """ command = [0x55, 0x06, 0xc0, 0x02, pad.value, color[0], color[1], color[2]] self._send_command(command) def flash_pad(self, pad: Pad, color: list, on_time: int = 10, off_time: int = 10, count: int = 5): """ Flash a pad with a color. Args: pad: Which pad to flash color: [R, G, B] values on_time: Ticks for LED on state (1 tick ≈ 50ms) off_time: Ticks for LED off state count: Number of flashes (255 = infinite) """ command = [ 0x55, 0x0e, 0xc8, 0x06, pad.value, color[0], color[1], color[2], on_time, off_time, count, 0, 0, 0, 0, 0 # Return color (off) ] self._send_command(command) def fade_pad(self, pad: Pad, color: list, speed: int = 10, count: int = 1): """ Fade a pad to a color. Args: pad: Which pad to fade color: [R, G, B] target color speed: Fade speed (higher = slower) count: Number of fade pulses (255 = infinite) """ command = [0x55, 0x08, 0xc2, 0x04, pad.value, speed, count, color[0], color[1], color[2]] self._send_command(command) def rainbow_cycle(self, pad: Pad = Pad.ALL, speed: int = 5): """ Start a rainbow color cycle on a pad. Args: pad: Which pad to animate speed: Cycle speed (1-20, lower = faster) """ # Cycle through colors using fade colors_cycle = [ COLORS['RED'], COLORS['ORANGE'], COLORS['YELLOW'], COLORS['GREEN'], COLORS['CYAN'], COLORS['BLUE'], COLORS['PURPLE'], COLORS['MAGENTA'] ] for color in colors_cycle: self.fade_pad(pad, color, speed=speed, count=1) time.sleep(speed * 0.05) def _poll_events(self): """Background thread function for polling tag events.""" while self._running: try: # Read from IN endpoint with timeout in_packet = self.dev.read(0x81, 32, timeout=100) bytelist = list(in_packet) if not bytelist: continue # NFC tag events start with 0x56 if bytelist[0] != 0x56: continue # Parse tag event pad_num = bytelist[2] action = bytelist[5] uid = bytes(bytelist[6:13]) # Validate UID (not all zeros) if uid == b'\x00' * 7: continue try: pad = Pad(pad_num) except ValueError: pad = Pad.CENTER event = TagEvent.INSERTED if action == 0 else TagEvent.REMOVED tag_info = TagInfo(uid=uid, pad=pad, event=event) # Track active tags and fire callbacks if event == TagEvent.INSERTED: self._active_tags[pad_num] = tag_info # Visual feedback - green flash for detected tag self.flash_pad(pad, COLORS['GREEN'], on_time=5, off_time=5, count=2) time.sleep(0.1) self.set_pad_color(pad, COLORS['CYAN']) if self.on_tag_insert: self.on_tag_insert(tag_info) else: if pad_num in self._active_tags: del self._active_tags[pad_num] # Turn off pad LED self.set_pad_color(pad, COLORS['OFF']) if self.on_tag_remove: self.on_tag_remove(tag_info) except usb.core.USBError as e: if e.errno == 110 or "timeout" in str(e).lower(): # Timeout - normal continue elif self._running: if self.on_error: self.on_error(e) time.sleep(0.5) except Exception as e: if self._running and self.on_error: self.on_error(e) time.sleep(0.1) def start(self): """Start the event polling thread.""" if self._running: return if not self.dev: if not self.connect(): raise ConnectionError("Failed to connect to portal") self._running = True self._thread = threading.Thread(target=self._poll_events, daemon=True) self._thread.start() # Initial LED animation to show portal is active for pad in [Pad.CENTER, Pad.LEFT, Pad.RIGHT]: self.flash_pad(pad, COLORS['BLUE'], on_time=3, off_time=3, count=2) time.sleep(0.1) def stop(self): """Stop the event polling thread.""" self._running = False if self._thread: self._thread.join(timeout=2.0) self._thread = None def get_active_tags(self) -> Dict[int, TagInfo]: """Get dictionary of currently active tags on pads.""" return self._active_tags.copy() def get_character_name(self, character_id: int) -> Optional[str]: """Look up character name from character ID.""" return self._character_db.get(character_id) def get_vehicle_name(self, vehicle_id: int) -> Optional[str]: """Look up vehicle name from vehicle ID.""" return self._vehicle_db.get(vehicle_id) @property def is_connected(self) -> bool: """Check if portal is connected.""" return self.dev is not None @property def is_running(self) -> bool: """Check if event polling is active.""" return self._running # ============================================================================= # TEA Encryption/Decryption for Character ID # Based on community reverse-engineering (ags131, bettse, socram8888) # ============================================================================= def tea_decrypt(data: bytes, key: bytes) -> bytes: """ TEA (Tiny Encryption Algorithm) decryption. Used to decrypt character IDs from tag pages 0x24-0x25. """ def _u32(x): return c_uint32(x).value delta = 0x9e3779b9 # Convert to 32-bit words v0 = int.from_bytes(data[0:4], 'little') v1 = int.from_bytes(data[4:8], 'little') k0 = int.from_bytes(key[0:4], 'little') k1 = int.from_bytes(key[4:8], 'little') k2 = int.from_bytes(key[8:12], 'little') k3 = int.from_bytes(key[12:16], 'little') total = _u32(delta * 32) for _ in range(32): v1 = _u32(v1 - _u32((_u32(v0 << 4) + k2) ^ _u32(v0 + total) ^ _u32((v0 >> 5) + k3))) v0 = _u32(v0 - _u32((_u32(v1 << 4) + k0) ^ _u32(v1 + total) ^ _u32((v1 >> 5) + k1))) total = _u32(total - delta) return v0.to_bytes(4, 'little') + v1.to_bytes(4, 'little') def tea_encrypt(data: bytes, key: bytes) -> bytes: """ TEA (Tiny Encryption Algorithm) encryption. Used to encrypt character IDs for writing to tags. """ def _u32(x): return c_uint32(x).value delta = 0x9e3779b9 v0 = int.from_bytes(data[0:4], 'little') v1 = int.from_bytes(data[4:8], 'little') k0 = int.from_bytes(key[0:4], 'little') k1 = int.from_bytes(key[4:8], 'little') k2 = int.from_bytes(key[8:12], 'little') k3 = int.from_bytes(key[12:16], 'little') total = 0 for _ in range(32): total = _u32(total + delta) v0 = _u32(v0 + _u32((_u32(v1 << 4) + k0) ^ _u32(v1 + total) ^ _u32((v1 >> 5) + k1))) v1 = _u32(v1 + _u32((_u32(v0 << 4) + k2) ^ _u32(v0 + total) ^ _u32((v0 >> 5) + k3))) return v0.to_bytes(4, 'little') + v1.to_bytes(4, 'little') def generate_tea_key(uid: bytes) -> bytes: """ Generate TEA encryption key from tag UID. The key is derived by scrambling the 7-byte UID into 16 bytes. """ key = bytearray(16) key[0] = (uid[1] ^ uid[3] ^ 0xAA) | 0xAA key[1] = (uid[2] ^ uid[4] ^ 0x55) & 0x55 key[2] = (uid[1] ^ uid[3] ^ 0xAA) | 0xAA key[3] = (uid[2] ^ uid[4] ^ 0x55) | 0x55 key[4] = (uid[3] ^ uid[5] ^ 0xAA) | 0xAA key[5] = (uid[2] ^ uid[4] ^ 0x55) & 0x55 key[6] = (uid[3] ^ uid[5] ^ 0xAA) | 0xAA key[7] = (uid[2] ^ uid[4] ^ 0x55) | 0x55 key[8] = (uid[1] ^ uid[3] ^ 0xAA) & 0xAA key[9] = (uid[2] ^ uid[4] ^ 0x55) | 0x55 key[10] = (uid[1] ^ uid[3] ^ 0xAA) & 0xAA key[11] = (uid[2] ^ uid[4] ^ 0x55) & 0x55 key[12] = (uid[3] ^ uid[5] ^ 0xAA) & 0xAA key[13] = (uid[2] ^ uid[4] ^ 0x55) | 0x55 key[14] = (uid[3] ^ uid[5] ^ 0xAA) & 0xAA key[15] = (uid[2] ^ uid[4] ^ 0x55) & 0x55 return bytes(key) def generate_password(uid: bytes) -> bytes: """ Generate NTAG213 password from 7-byte UID. This password protects pages 0x24+ on the tag. """ pwd = bytearray(4) pwd[0] = 0xAA ^ uid[1] ^ uid[3] pwd[1] = 0x55 ^ uid[2] ^ uid[4] pwd[2] = 0xAA ^ uid[3] ^ uid[5] pwd[3] = 0x55 ^ uid[4] ^ uid[6] return bytes(pwd) def decrypt_character_id(uid: bytes, encrypted_data: bytes) -> int: """ Decrypt character ID from tag memory pages 0x24-0x25. Args: uid: 7-byte tag UID encrypted_data: 8 bytes from pages 0x24-0x25 Returns: Decrypted character ID integer """ key = generate_tea_key(uid) decrypted = tea_decrypt(encrypted_data, key) return int.from_bytes(decrypted[0:2], 'little') def encrypt_character_id(uid: bytes, character_id: int) -> bytes: """ Encrypt character ID for writing to tag pages 0x24-0x25. Args: uid: 7-byte tag UID character_id: Character ID to encrypt Returns: 8 bytes of encrypted data """ key = generate_tea_key(uid) # Character ID in first 2 bytes, rest is padding data = character_id.to_bytes(2, 'little') + b'\x00' * 6 return tea_encrypt(data, key) # ============================================================================= # Standalone test # ============================================================================= if __name__ == "__main__": def on_insert(tag: TagInfo): print(f"\n{'='*50}") print(f"✓ TAG PLACED on {tag.pad.name} pad") print(f" UID: {tag.uid_hex}") print(f" Password: {generate_password(tag.uid).hex().upper()}") print(f"{'='*50}") def on_remove(tag: TagInfo): print(f"\n✗ TAG REMOVED from {tag.pad.name} pad") def on_error(e: Exception): print(f"\n⚠ ERROR: {e}") def on_connect(): print("✓ Portal connected and initialized") def on_disconnect(): print("✗ Portal disconnected") print(f"\nLEGO Dimensions Portal Reader v{__version__}") print("="*50) reader = LegoDimensionsReader() reader.on_tag_insert = on_insert reader.on_tag_remove = on_remove reader.on_error = on_error reader.on_connect = on_connect reader.on_disconnect = on_disconnect try: reader.start() print("\nPortal Active - Place character discs...") print("Press Ctrl+C to exit\n") while True: time.sleep(1) except KeyboardInterrupt: print("\n\nShutting down...") except ConnectionError as e: print(f"\nConnection failed: {e}") finally: reader.disconnect() print("Goodbye!")