From a4650b33af1279533a1e3009b562eb80cc4d349a Mon Sep 17 00:00:00 2001 From: jessikitty Date: Sat, 24 Jan 2026 10:33:48 +1100 Subject: [PATCH] Add main LEGO Dimensions portal reader module --- lego_dimensions_reader.py | 771 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 771 insertions(+) create mode 100644 lego_dimensions_reader.py diff --git a/lego_dimensions_reader.py b/lego_dimensions_reader.py new file mode 100644 index 0000000..000981b --- /dev/null +++ b/lego_dimensions_reader.py @@ -0,0 +1,771 @@ +#!/usr/bin/env python3 +""" +LEGO Dimensions Portal Reader Module +For Moonlight Drive-In Video Player System + +Communicates with LEGO Dimensions USB portal (PS3/PS4/Wii U versions) +to detect character and vehicle disc placement/removal events. + +Requirements: + pip install pyusb + +Windows Driver Setup: + 1. Download Zadig from https://zadig.akeo.ie/ + 2. Connect the LEGO Dimensions portal + 3. Run Zadig as administrator + 4. Select Options → List All Devices + 5. Choose "LEGO READER V2.10" from the dropdown + 6. Select WinUSB as the target driver + 7. Click "Replace Driver" +""" + +import usb.core +import usb.util +import threading +import time +import json +import os +from typing import Callable, Optional, Dict, Any +from dataclasses import dataclass +from enum import Enum +from ctypes import c_uint32 + +# USB Device Identifiers +VENDOR_ID = 0x0e6f +PRODUCT_ID = 0x0241 + +# Module version +__version__ = "1.0.0" + + +class Pad(Enum): + """Portal pad identifiers""" + ALL = 0 + CENTER = 1 + LEFT = 2 + RIGHT = 3 + + +class TagEvent(Enum): + """Tag event types""" + INSERTED = 0 + REMOVED = 1 + + +# Predefined colors [R, G, B] +COLORS = { + 'OFF': [0, 0, 0], + 'RED': [255, 0, 0], + 'GREEN': [0, 255, 0], + 'BLUE': [0, 0, 255], + 'WHITE': [255, 255, 255], + 'YELLOW': [255, 255, 0], + 'CYAN': [0, 255, 255], + 'MAGENTA': [255, 0, 255], + 'ORANGE': [255, 128, 0], + 'PURPLE': [128, 0, 255], + 'PINK': [255, 105, 180], +} + +# Portal initialization command (contains "(c) LEGO 2014" signature) +TOYPAD_INIT = [ + 0x55, 0x0f, 0xb0, 0x01, 0x28, 0x63, 0x29, 0x20, + 0x4c, 0x45, 0x47, 0x4f, 0x20, 0x32, 0x30, 0x31, + 0x34, 0xf7, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00 +] + + +@dataclass +class TagInfo: + """Information about a detected NFC tag""" + uid: bytes + pad: Pad + event: TagEvent + uid_hex: str = "" + character_id: Optional[int] = None + character_name: Optional[str] = None + is_vehicle: bool = False + + def __post_init__(self): + self.uid_hex = self.uid.hex().upper() + + +class LegoDimensionsReader: + """ + LEGO Dimensions Portal NFC Reader + + Provides event-driven tag detection with callbacks for integration + with the Moonlight Drive-In video player system. + + Usage: + reader = LegoDimensionsReader() + reader.on_tag_insert = lambda tag: print(f"Tag placed: {tag.uid_hex}") + reader.on_tag_remove = lambda tag: print(f"Tag removed: {tag.uid_hex}") + reader.start() + """ + + def __init__(self, character_db_path: Optional[str] = None): + """ + Initialize the LEGO Dimensions reader. + + Args: + character_db_path: Optional path to character_database.json + """ + self.dev = None + self._running = False + self._thread: Optional[threading.Thread] = None + self._lock = threading.Lock() + + # Active tags currently on pads (pad_num -> TagInfo) + self._active_tags: Dict[int, TagInfo] = {} + + # Callbacks + self.on_tag_insert: Optional[Callable[[TagInfo], None]] = None + self.on_tag_remove: Optional[Callable[[TagInfo], None]] = None + self.on_error: Optional[Callable[[Exception], None]] = None + self.on_connect: Optional[Callable[[], None]] = None + self.on_disconnect: Optional[Callable[[], None]] = None + + # Character database for ID lookup + self._character_db = self._load_character_database(character_db_path) + self._vehicle_db = self._load_vehicle_database(character_db_path) + + def _load_character_database(self, db_path: Optional[str] = None) -> Dict[int, str]: + """Load character ID database.""" + # Try to load from JSON file if provided + if db_path and os.path.exists(db_path): + try: + with open(db_path, 'r') as f: + data = json.load(f) + return {int(k): v for k, v in data.get('characters', {}).items()} + except Exception: + pass + + # Default built-in database + return { + 1: "Batman", + 2: "Gandalf", + 3: "Wyldstyle", + 4: "Aquaman", + 5: "Bad Cop", + 6: "Bane", + 7: "Bart Simpson", + 8: "Benny", + 9: "Chell", + 10: "Cole", + 11: "Cragger", + 12: "Cyborg", + 13: "Cyberman", + 14: "Doc Brown", + 15: "The Doctor", + 16: "Emmet", + 17: "Eris", + 18: "Gimli", + 19: "Gollum", + 20: "Harley Quinn", + 21: "Homer Simpson", + 22: "Jay", + 23: "Joker", + 24: "Kai", + 25: "ACU Trooper", + 26: "Krusty the Clown", + 27: "Laval", + 28: "Legolas", + 29: "Lloyd", + 30: "Marty McFly", + 31: "Nya", + 32: "Owen Grady", + 33: "Peter Venkman", + 34: "Scooby-Doo", + 35: "Sensei Wu", + 36: "Shaggy", + 37: "Slimer", + 38: "Superman", + 39: "Unikitty", + 40: "Wicked Witch", + 41: "Wonder Woman", + 42: "Zane", + 43: "Green Arrow", + 44: "Supergirl", + 45: "Abby Yates", + 46: "Finn the Human", + 47: "Harry Potter", + 48: "Ethan Hunt", + 49: "Lumpy Space Princess", + 50: "Jake the Dog", + 51: "B.A. Baracus", + 52: "Michael Knight", + 53: "Gizmo", + 54: "Sonic the Hedgehog", + 55: "Gamer Kid", + 56: "E.T.", + 57: "Lord Voldemort", + 58: "Hermione Granger", + 59: "Newt Scamander", + 60: "Tina Goldstein", + 61: "Stripe", + 62: "Sloth", + 63: "Beetlejuice", + 64: "Kevin", + 65: "Erin Gilbert", + 66: "Patty Tolan", + 67: "Jillian Holtzmann", + 68: "Stay Puft", + 69: "Chase McCain", + 70: "Excalibur Batman", + 71: "Raven", + 72: "Beast Boy", + 73: "Starfire", + 74: "Blossom", + 75: "Bubbles", + 76: "Buttercup", + 77: "Marceline", + 78: "Batgirl", + 79: "Robin", + 80: "Knight Rider (KITT)", + 81: "Gremlins Gizmo", + 82: "Teen Titans Go!", + } + + def _load_vehicle_database(self, db_path: Optional[str] = None) -> Dict[int, str]: + """Load vehicle ID database.""" + if db_path and os.path.exists(db_path): + try: + with open(db_path, 'r') as f: + data = json.load(f) + return {int(k): v for k, v in data.get('vehicles', {}).items()} + except Exception: + pass + + return { + 1: "Batmobile", + 2: "Batwing", + 3: "Bat Blaster", + 4: "DeLorean Time Machine", + 5: "Hoverboard", + 6: "Travelling Time Train", + 7: "TARDIS", + 8: "K9", + 9: "Dalek", + 10: "Companion Cube", + 11: "Sentry Turret", + 12: "GLaDOS", + 13: "Mystery Machine", + 14: "Scooby Snack", + 15: "Gyrocopter", + 16: "Ghost Trap", + 17: "Ecto-1", + 18: "PKE Meter", + 19: "Emmet's Excavator", + 20: "Emmet's Car", + 21: "Duplo Brickster", + 22: "Hover Bike", + 23: "Cyber Guard", + 24: "Cyber Wraith", + 25: "Blade Bike", + 26: "Flying White Dragon", + 27: "Golden Dragon", + 28: "Mega Flight Dragon", + 29: "Shadowmeld", + 30: "Swamp Skimmer", + 31: "Cragger's Fireship", + 32: "Eagle Interceptor", + 33: "Eagle Sky Blazer", + 34: "Eagle Swoop Diver", + 35: "Gyrosphere", + 36: "ACU Trooper Car", + 37: "T. Rex", + 38: "Homer's Car", + 39: "Taunt-o-Vision", + 40: "The Simpsons House", + 41: "Krusty's Bike", + 42: "Clown Bike", + 43: "Frink's Hover Car", + 44: "Gravity Sprinter", + 45: "Street Shredder", + 46: "Sky Clobberer", + 47: "Invisible Jet", + 48: "Justice Camper", + 49: "Electro Bolt", + 50: "Arrow Launcher", + 51: "Drill Driver", + 52: "Bane Dig Dig", + 53: "Ancient Psychic Tandem War Elephant", + 54: "Jakemobile", + 55: "Lumpy Car", + 56: "Ecto-1 (2016)", + 57: "Ectozer", + 58: "PerfEcto", + 59: "B.A.'s Van", + 60: "B.A.'s Super Van", + 61: "Pain Plane", + 62: "Sonic Speedster", + 63: "Blue Typhoon", + 64: "Moto Bug", + 65: "IMF Scrambler", + 66: "IMF Sport Car", + 67: "IMF Tank", + 68: "K.I.T.T.", + 69: "Gozer Trap", + 70: "Terror Dog", + 71: "Gremlin Car", + 72: "Flash 'n' Finish", + 73: "Rampage Wrecking", + 74: "Phone Home", + 75: "Mobile Uplink", + 76: "Super Charged Satellite", + 77: "Enchanted Car", + 78: "Harry's Broom", + 79: "Newt's Case", + 80: "Swooping Evil", + 81: "Occamy", + 82: "Niffler", + } + + def connect(self) -> bool: + """ + Establish connection to LEGO Dimensions portal. + Returns True if connection successful. + """ + try: + self.dev = usb.core.find(idVendor=VENDOR_ID, idProduct=PRODUCT_ID) + + if self.dev is None: + raise ConnectionError( + "LEGO Dimensions Portal not found.\n" + "Ensure:\n" + " 1. Portal is connected via USB\n" + " 2. Using PS3/PS4/Wii U portal (NOT Xbox)\n" + " 3. WinUSB driver installed via Zadig" + ) + + # Detach kernel driver if active (Linux) + try: + if self.dev.is_kernel_driver_active(0): + self.dev.detach_kernel_driver(0) + except (AttributeError, usb.core.USBError): + pass # Not available on Windows + + # Configure device + self.dev.set_configuration() + + # Send initialization command + self.dev.write(1, TOYPAD_INIT) + + # Get device info + try: + product = usb.util.get_string(self.dev, self.dev.iProduct) + print(f"Connected to: {product}") + except Exception: + print("Connected to LEGO Dimensions Portal") + + if self.on_connect: + self.on_connect() + + return True + + except usb.core.USBError as e: + error_msg = str(e) + if "Access denied" in error_msg or "insufficient permissions" in error_msg.lower(): + print("ERROR: Access denied. Try running as Administrator or check Zadig drivers.") + if self.on_error: + self.on_error(e) + return False + except Exception as e: + if self.on_error: + self.on_error(e) + return False + + def disconnect(self): + """Disconnect from portal and clean up resources.""" + self.stop() + if self.dev: + try: + # Turn off all LEDs + self.set_pad_color(Pad.ALL, COLORS['OFF']) + usb.util.dispose_resources(self.dev) + except Exception: + pass + self.dev = None + if self.on_disconnect: + self.on_disconnect() + + def _calculate_checksum(self, command: list) -> int: + """Calculate checksum (sum of bytes mod 256)""" + return sum(command) % 256 + + def _send_command(self, command: list): + """Send a command to the portal with checksum and padding.""" + if not self.dev: + return + + checksum = self._calculate_checksum(command) + message = command + [checksum] + + # Pad to 32 bytes + while len(message) < 32: + message.append(0x00) + + try: + with self._lock: + self.dev.write(1, message) + except usb.core.USBError: + pass # Ignore write errors + + def set_pad_color(self, pad: Pad, color: list): + """ + Set a pad to a specific color immediately. + + Args: + pad: Which pad (Pad.ALL, Pad.CENTER, Pad.LEFT, Pad.RIGHT) + color: [R, G, B] values 0-255 + """ + command = [0x55, 0x06, 0xc0, 0x02, pad.value, color[0], color[1], color[2]] + self._send_command(command) + + def flash_pad(self, pad: Pad, color: list, on_time: int = 10, + off_time: int = 10, count: int = 5): + """ + Flash a pad with a color. + + Args: + pad: Which pad to flash + color: [R, G, B] values + on_time: Ticks for LED on state (1 tick ≈ 50ms) + off_time: Ticks for LED off state + count: Number of flashes (255 = infinite) + """ + command = [ + 0x55, 0x0e, 0xc8, 0x06, pad.value, + color[0], color[1], color[2], + on_time, off_time, count, + 0, 0, 0, 0, 0 # Return color (off) + ] + self._send_command(command) + + def fade_pad(self, pad: Pad, color: list, speed: int = 10, count: int = 1): + """ + Fade a pad to a color. + + Args: + pad: Which pad to fade + color: [R, G, B] target color + speed: Fade speed (higher = slower) + count: Number of fade pulses (255 = infinite) + """ + command = [0x55, 0x08, 0xc2, 0x04, pad.value, speed, count, + color[0], color[1], color[2]] + self._send_command(command) + + def rainbow_cycle(self, pad: Pad = Pad.ALL, speed: int = 5): + """ + Start a rainbow color cycle on a pad. + + Args: + pad: Which pad to animate + speed: Cycle speed (1-20, lower = faster) + """ + # Cycle through colors using fade + colors_cycle = [ + COLORS['RED'], COLORS['ORANGE'], COLORS['YELLOW'], + COLORS['GREEN'], COLORS['CYAN'], COLORS['BLUE'], + COLORS['PURPLE'], COLORS['MAGENTA'] + ] + for color in colors_cycle: + self.fade_pad(pad, color, speed=speed, count=1) + time.sleep(speed * 0.05) + + def _poll_events(self): + """Background thread function for polling tag events.""" + while self._running: + try: + # Read from IN endpoint with timeout + in_packet = self.dev.read(0x81, 32, timeout=100) + bytelist = list(in_packet) + + if not bytelist: + continue + + # NFC tag events start with 0x56 + if bytelist[0] != 0x56: + continue + + # Parse tag event + pad_num = bytelist[2] + action = bytelist[5] + uid = bytes(bytelist[6:13]) + + # Validate UID (not all zeros) + if uid == b'\x00' * 7: + continue + + try: + pad = Pad(pad_num) + except ValueError: + pad = Pad.CENTER + + event = TagEvent.INSERTED if action == 0 else TagEvent.REMOVED + tag_info = TagInfo(uid=uid, pad=pad, event=event) + + # Track active tags and fire callbacks + if event == TagEvent.INSERTED: + self._active_tags[pad_num] = tag_info + + # Visual feedback - green flash for detected tag + self.flash_pad(pad, COLORS['GREEN'], on_time=5, off_time=5, count=2) + time.sleep(0.1) + self.set_pad_color(pad, COLORS['CYAN']) + + if self.on_tag_insert: + self.on_tag_insert(tag_info) + else: + if pad_num in self._active_tags: + del self._active_tags[pad_num] + + # Turn off pad LED + self.set_pad_color(pad, COLORS['OFF']) + + if self.on_tag_remove: + self.on_tag_remove(tag_info) + + except usb.core.USBError as e: + if e.errno == 110 or "timeout" in str(e).lower(): # Timeout - normal + continue + elif self._running: + if self.on_error: + self.on_error(e) + time.sleep(0.5) + except Exception as e: + if self._running and self.on_error: + self.on_error(e) + time.sleep(0.1) + + def start(self): + """Start the event polling thread.""" + if self._running: + return + + if not self.dev: + if not self.connect(): + raise ConnectionError("Failed to connect to portal") + + self._running = True + self._thread = threading.Thread(target=self._poll_events, daemon=True) + self._thread.start() + + # Initial LED animation to show portal is active + for pad in [Pad.CENTER, Pad.LEFT, Pad.RIGHT]: + self.flash_pad(pad, COLORS['BLUE'], on_time=3, off_time=3, count=2) + time.sleep(0.1) + + def stop(self): + """Stop the event polling thread.""" + self._running = False + if self._thread: + self._thread.join(timeout=2.0) + self._thread = None + + def get_active_tags(self) -> Dict[int, TagInfo]: + """Get dictionary of currently active tags on pads.""" + return self._active_tags.copy() + + def get_character_name(self, character_id: int) -> Optional[str]: + """Look up character name from character ID.""" + return self._character_db.get(character_id) + + def get_vehicle_name(self, vehicle_id: int) -> Optional[str]: + """Look up vehicle name from vehicle ID.""" + return self._vehicle_db.get(vehicle_id) + + @property + def is_connected(self) -> bool: + """Check if portal is connected.""" + return self.dev is not None + + @property + def is_running(self) -> bool: + """Check if event polling is active.""" + return self._running + + +# ============================================================================= +# TEA Encryption/Decryption for Character ID +# Based on community reverse-engineering (ags131, bettse, socram8888) +# ============================================================================= + +def tea_decrypt(data: bytes, key: bytes) -> bytes: + """ + TEA (Tiny Encryption Algorithm) decryption. + Used to decrypt character IDs from tag pages 0x24-0x25. + """ + def _u32(x): + return c_uint32(x).value + + delta = 0x9e3779b9 + + # Convert to 32-bit words + v0 = int.from_bytes(data[0:4], 'little') + v1 = int.from_bytes(data[4:8], 'little') + k0 = int.from_bytes(key[0:4], 'little') + k1 = int.from_bytes(key[4:8], 'little') + k2 = int.from_bytes(key[8:12], 'little') + k3 = int.from_bytes(key[12:16], 'little') + + total = _u32(delta * 32) + + for _ in range(32): + v1 = _u32(v1 - _u32((_u32(v0 << 4) + k2) ^ _u32(v0 + total) ^ _u32((v0 >> 5) + k3))) + v0 = _u32(v0 - _u32((_u32(v1 << 4) + k0) ^ _u32(v1 + total) ^ _u32((v1 >> 5) + k1))) + total = _u32(total - delta) + + return v0.to_bytes(4, 'little') + v1.to_bytes(4, 'little') + + +def tea_encrypt(data: bytes, key: bytes) -> bytes: + """ + TEA (Tiny Encryption Algorithm) encryption. + Used to encrypt character IDs for writing to tags. + """ + def _u32(x): + return c_uint32(x).value + + delta = 0x9e3779b9 + + v0 = int.from_bytes(data[0:4], 'little') + v1 = int.from_bytes(data[4:8], 'little') + k0 = int.from_bytes(key[0:4], 'little') + k1 = int.from_bytes(key[4:8], 'little') + k2 = int.from_bytes(key[8:12], 'little') + k3 = int.from_bytes(key[12:16], 'little') + + total = 0 + + for _ in range(32): + total = _u32(total + delta) + v0 = _u32(v0 + _u32((_u32(v1 << 4) + k0) ^ _u32(v1 + total) ^ _u32((v1 >> 5) + k1))) + v1 = _u32(v1 + _u32((_u32(v0 << 4) + k2) ^ _u32(v0 + total) ^ _u32((v0 >> 5) + k3))) + + return v0.to_bytes(4, 'little') + v1.to_bytes(4, 'little') + + +def generate_tea_key(uid: bytes) -> bytes: + """ + Generate TEA encryption key from tag UID. + The key is derived by scrambling the 7-byte UID into 16 bytes. + """ + key = bytearray(16) + key[0] = (uid[1] ^ uid[3] ^ 0xAA) | 0xAA + key[1] = (uid[2] ^ uid[4] ^ 0x55) & 0x55 + key[2] = (uid[1] ^ uid[3] ^ 0xAA) | 0xAA + key[3] = (uid[2] ^ uid[4] ^ 0x55) | 0x55 + key[4] = (uid[3] ^ uid[5] ^ 0xAA) | 0xAA + key[5] = (uid[2] ^ uid[4] ^ 0x55) & 0x55 + key[6] = (uid[3] ^ uid[5] ^ 0xAA) | 0xAA + key[7] = (uid[2] ^ uid[4] ^ 0x55) | 0x55 + key[8] = (uid[1] ^ uid[3] ^ 0xAA) & 0xAA + key[9] = (uid[2] ^ uid[4] ^ 0x55) | 0x55 + key[10] = (uid[1] ^ uid[3] ^ 0xAA) & 0xAA + key[11] = (uid[2] ^ uid[4] ^ 0x55) & 0x55 + key[12] = (uid[3] ^ uid[5] ^ 0xAA) & 0xAA + key[13] = (uid[2] ^ uid[4] ^ 0x55) | 0x55 + key[14] = (uid[3] ^ uid[5] ^ 0xAA) & 0xAA + key[15] = (uid[2] ^ uid[4] ^ 0x55) & 0x55 + return bytes(key) + + +def generate_password(uid: bytes) -> bytes: + """ + Generate NTAG213 password from 7-byte UID. + This password protects pages 0x24+ on the tag. + """ + pwd = bytearray(4) + pwd[0] = 0xAA ^ uid[1] ^ uid[3] + pwd[1] = 0x55 ^ uid[2] ^ uid[4] + pwd[2] = 0xAA ^ uid[3] ^ uid[5] + pwd[3] = 0x55 ^ uid[4] ^ uid[6] + return bytes(pwd) + + +def decrypt_character_id(uid: bytes, encrypted_data: bytes) -> int: + """ + Decrypt character ID from tag memory pages 0x24-0x25. + + Args: + uid: 7-byte tag UID + encrypted_data: 8 bytes from pages 0x24-0x25 + + Returns: + Decrypted character ID integer + """ + key = generate_tea_key(uid) + decrypted = tea_decrypt(encrypted_data, key) + return int.from_bytes(decrypted[0:2], 'little') + + +def encrypt_character_id(uid: bytes, character_id: int) -> bytes: + """ + Encrypt character ID for writing to tag pages 0x24-0x25. + + Args: + uid: 7-byte tag UID + character_id: Character ID to encrypt + + Returns: + 8 bytes of encrypted data + """ + key = generate_tea_key(uid) + # Character ID in first 2 bytes, rest is padding + data = character_id.to_bytes(2, 'little') + b'\x00' * 6 + return tea_encrypt(data, key) + + +# ============================================================================= +# Standalone test +# ============================================================================= + +if __name__ == "__main__": + def on_insert(tag: TagInfo): + print(f"\n{'='*50}") + print(f"✓ TAG PLACED on {tag.pad.name} pad") + print(f" UID: {tag.uid_hex}") + print(f" Password: {generate_password(tag.uid).hex().upper()}") + print(f"{'='*50}") + + def on_remove(tag: TagInfo): + print(f"\n✗ TAG REMOVED from {tag.pad.name} pad") + + def on_error(e: Exception): + print(f"\n⚠ ERROR: {e}") + + def on_connect(): + print("✓ Portal connected and initialized") + + def on_disconnect(): + print("✗ Portal disconnected") + + print(f"\nLEGO Dimensions Portal Reader v{__version__}") + print("="*50) + + reader = LegoDimensionsReader() + reader.on_tag_insert = on_insert + reader.on_tag_remove = on_remove + reader.on_error = on_error + reader.on_connect = on_connect + reader.on_disconnect = on_disconnect + + try: + reader.start() + print("\nPortal Active - Place character discs...") + print("Press Ctrl+C to exit\n") + + while True: + time.sleep(1) + + except KeyboardInterrupt: + print("\n\nShutting down...") + except ConnectionError as e: + print(f"\nConnection failed: {e}") + finally: + reader.disconnect() + print("Goodbye!")