#!/usr/bin/env python3 """ LEGO Dimensions Portal Reader Module For Moonlight Drive-In Video Player System Communicates with LEGO Dimensions USB portal (PS3/PS4/Wii U versions) to detect character and vehicle disc placement/removal events. Requirements: pip install pyusb libusb-package Windows Driver Setup: 1. Download Zadig from https://zadig.akeo.ie/ 2. Connect the LEGO Dimensions portal 3. Run Zadig as administrator 4. Select Options -> List All Devices 5. Choose "LEGO READER V2.10" from the dropdown 6. Select WinUSB as the target driver 7. Click "Replace Driver" Windows libusb Setup: The libusb-package provides pre-compiled libusb-1.0.dll. Alternatively, download from https://libusb.info/ and place libusb-1.0.dll in C:\\Windows\\System32 """ import sys import os import platform # Windows-specific backend setup - must be done BEFORE importing usb.core def _setup_windows_backend(): """Configure libusb backend for Windows.""" if platform.system() != 'Windows': return # Try libusb-package first (recommended) try: import libusb_package return except ImportError: pass # Search for libusb DLL in common locations dll_paths = [ os.path.join(os.environ.get('WINDIR', 'C:\\Windows'), 'System32'), os.path.join(os.environ.get('WINDIR', 'C:\\Windows'), 'SysWOW64'), os.path.dirname(sys.executable), os.getcwd(), ] for path in dll_paths: dll_file = os.path.join(path, 'libusb-1.0.dll') if os.path.exists(dll_file): os.environ['PATH'] = path + os.pathsep + os.environ.get('PATH', '') return # Run setup before USB imports _setup_windows_backend() import usb.core import usb.util import usb.backend.libusb1 import threading import time import json from typing import Callable, Optional, Dict, Any from dataclasses import dataclass from enum import Enum from ctypes import c_uint32 # USB Device Identifiers VENDOR_ID = 0x0e6f PRODUCT_ID = 0x0241 # Module version __version__ = "1.2.2" def get_usb_backend(): """Get the appropriate USB backend for the current platform.""" if platform.system() == 'Windows': # Try libusb-package first try: import libusb_package return libusb_package.get_libusb1_backend() except (ImportError, Exception): pass # Try default libusb1 backend try: return usb.backend.libusb1.get_backend() except Exception: pass return None # Use default backend class Pad(Enum): """Portal pad identifiers""" ALL = 0 CENTER = 1 LEFT = 2 RIGHT = 3 class TagEvent(Enum): """Tag event types""" INSERTED = 0 REMOVED = 1 # Predefined colors [R, G, B] COLORS = { 'OFF': [0, 0, 0], 'RED': [255, 0, 0], 'GREEN': [0, 255, 0], 'BLUE': [0, 0, 255], 'WHITE': [255, 255, 255], 'YELLOW': [255, 255, 0], 'CYAN': [0, 255, 255], 'MAGENTA': [255, 0, 255], 'ORANGE': [255, 128, 0], 'PURPLE': [128, 0, 255], 'PINK': [255, 105, 180], } # Portal initialization command (contains "(c) LEGO 2014" signature) TOYPAD_INIT = [ 0x55, 0x0f, 0xb0, 0x01, 0x28, 0x63, 0x29, 0x20, 0x4c, 0x45, 0x47, 0x4f, 0x20, 0x32, 0x30, 0x31, 0x34, 0xf7, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00 ] @dataclass class TagInfo: """Information about a detected NFC tag""" uid: bytes pad: Pad event: TagEvent uid_hex: str = "" character_id: Optional[int] = None character_name: Optional[str] = None is_vehicle: bool = False def __post_init__(self): self.uid_hex = self.uid.hex().upper() class LegoDimensionsReader: """ LEGO Dimensions Portal NFC Reader Provides event-driven tag detection with callbacks for integration with the Moonlight Drive-In video player system. Usage: reader = LegoDimensionsReader() reader.on_tag_insert = lambda tag: print(f"Tag placed: {tag.uid_hex}") reader.on_tag_remove = lambda tag: print(f"Tag removed: {tag.uid_hex}") reader.start() """ def __init__(self, character_db_path: Optional[str] = None): """ Initialize the LEGO Dimensions reader. Args: character_db_path: Optional path to character_database.json """ self.dev = None self._running = False self._thread: Optional[threading.Thread] = None self._lock = threading.Lock() # USB endpoints (will be discovered on connect) self._endpoint_in = None self._endpoint_out = None self._interface = None # Active tags currently on pads (pad_num -> TagInfo) self._active_tags: Dict[int, TagInfo] = {} # Flash stop events per pad (for cancelling software flash) self._flash_stop_events: Dict[int, threading.Event] = {} # Callbacks self.on_tag_insert: Optional[Callable[[TagInfo], None]] = None self.on_tag_remove: Optional[Callable[[TagInfo], None]] = None self.on_error: Optional[Callable[[Exception], None]] = None self.on_connect: Optional[Callable[[], None]] = None self.on_disconnect: Optional[Callable[[], None]] = None # Character database for ID lookup self._character_db = self._load_character_database(character_db_path) self._vehicle_db = self._load_vehicle_database(character_db_path) def _load_character_database(self, db_path: Optional[str] = None) -> Dict[int, str]: """Load character ID database.""" # Try to load from JSON file if provided if db_path and os.path.exists(db_path): try: with open(db_path, 'r', encoding='utf-8') as f: data = json.load(f) return {int(k): v for k, v in data.get('characters', {}).items()} except Exception: pass # Default built-in database return { 1: "Batman", 2: "Gandalf", 3: "Wyldstyle", 4: "Aquaman", 5: "Bad Cop", 6: "Bane", 7: "Bart Simpson", 8: "Benny", 9: "Chell", 10: "Cole", 11: "Cragger", 12: "Cyborg", 13: "Cyberman", 14: "Doc Brown", 15: "The Doctor", 16: "Emmet", 17: "Eris", 18: "Gimli", 19: "Gollum", 20: "Harley Quinn", 21: "Homer Simpson", 22: "Jay", 23: "Joker", 24: "Kai", 25: "ACU Trooper", 26: "Krusty the Clown", 27: "Laval", 28: "Legolas", 29: "Lloyd", 30: "Marty McFly", 31: "Nya", 32: "Owen Grady", 33: "Peter Venkman", 34: "Scooby-Doo", 35: "Sensei Wu", 36: "Shaggy", 37: "Slimer", 38: "Superman", 39: "Unikitty", 40: "Wicked Witch", 41: "Wonder Woman", 42: "Zane", 43: "Green Arrow", 44: "Supergirl", 45: "Abby Yates", 46: "Finn the Human", 47: "Harry Potter", 48: "Ethan Hunt", 49: "Lumpy Space Princess", 50: "Jake the Dog", 51: "B.A. Baracus", 52: "Michael Knight", 53: "Gizmo", 54: "Sonic the Hedgehog", 55: "Gamer Kid", 56: "E.T.", 57: "Lord Voldemort", 58: "Hermione Granger", 59: "Newt Scamander", 60: "Tina Goldstein", 61: "Stripe", 62: "Sloth", 63: "Beetlejuice", 64: "Kevin", 65: "Erin Gilbert", 66: "Patty Tolan", 67: "Jillian Holtzmann", 68: "Stay Puft", 69: "Chase McCain", 70: "Excalibur Batman", 71: "Raven", 72: "Beast Boy", 73: "Starfire", 74: "Blossom", 75: "Bubbles", 76: "Buttercup", 77: "Marceline", 78: "Batgirl", 79: "Robin", 80: "Knight Rider (KITT)", 81: "Gremlins Gizmo", 82: "Teen Titans Go!", } def _load_vehicle_database(self, db_path: Optional[str] = None) -> Dict[int, str]: """Load vehicle ID database.""" if db_path and os.path.exists(db_path): try: with open(db_path, 'r', encoding='utf-8') as f: data = json.load(f) return {int(k): v for k, v in data.get('vehicles', {}).items()} except Exception: pass return { 1: "Batmobile", 2: "Batwing", 3: "Bat Blaster", 4: "DeLorean Time Machine", 5: "Hoverboard", 6: "Travelling Time Train", 7: "TARDIS", 8: "K9", 9: "Dalek", 10: "Companion Cube", 11: "Sentry Turret", 12: "GLaDOS", 13: "Mystery Machine", 14: "Scooby Snack", 15: "Gyrocopter", 16: "Ghost Trap", 17: "Ecto-1", 18: "PKE Meter", 19: "Emmet's Excavator", 20: "Emmet's Car", 21: "Duplo Brickster", 22: "Hover Bike", 23: "Cyber Guard", 24: "Cyber Wraith", 25: "Blade Bike", 26: "Flying White Dragon", 27: "Golden Dragon", 28: "Mega Flight Dragon", 29: "Shadowmeld", 30: "Swamp Skimmer", 31: "Cragger's Fireship", 32: "Eagle Interceptor", 33: "Eagle Sky Blazer", 34: "Eagle Swoop Diver", 35: "Gyrosphere", 36: "ACU Trooper Car", 37: "T. Rex", 38: "Homer's Car", 39: "Taunt-o-Vision", 40: "The Simpsons House", 41: "Krusty's Bike", 42: "Clown Bike", 43: "Frink's Hover Car", 44: "Gravity Sprinter", 45: "Street Shredder", 46: "Sky Clobberer", 47: "Invisible Jet", 48: "Justice Camper", 49: "Electro Bolt", 50: "Arrow Launcher", 51: "Drill Driver", 52: "Bane Dig Dig", 53: "Ancient Psychic Tandem War Elephant", 54: "Jakemobile", 55: "Lumpy Car", 56: "Ecto-1 (2016)", 57: "Ectozer", 58: "PerfEcto", 59: "B.A.'s Van", 60: "B.A.'s Super Van", 61: "Pain Plane", 62: "Sonic Speedster", 63: "Blue Typhoon", 64: "Moto Bug", 65: "IMF Scrambler", 66: "IMF Sport Car", 67: "IMF Tank", 68: "K.I.T.T.", 69: "Gozer Trap", 70: "Terror Dog", 71: "Gremlin Car", 72: "Flash 'n' Finish", 73: "Rampage Wrecking", 74: "Phone Home", 75: "Mobile Uplink", 76: "Super Charged Satellite", 77: "Enchanted Car", 78: "Harry's Broom", 79: "Newt's Case", 80: "Swooping Evil", 81: "Occamy", 82: "Niffler", } def _is_timeout_error(self, error: usb.core.USBError) -> bool: """Check if error is a timeout (normal during polling).""" error_str = str(error).lower() # Windows timeout codes vary: -116, -7, 110, or string return ( error.errno in (-116, -7, 110, None) or 'timeout' in error_str or 'operation timed out' in error_str or 'timed out' in error_str ) def _is_pipe_error(self, error: usb.core.USBError) -> bool: """Check if error indicates device disconnection.""" return error.errno == -9 or 'pipe' in str(error).lower() def connect(self) -> bool: """ Establish connection to LEGO Dimensions portal. Returns True if connection successful. """ try: # Get platform-appropriate backend backend = get_usb_backend() self.dev = usb.core.find( idVendor=VENDOR_ID, idProduct=PRODUCT_ID, backend=backend ) if self.dev is None: error_msg = ( "LEGO Dimensions Portal not found.\n" "Ensure:\n" " 1. Portal is connected via USB\n" " 2. Using PS3/PS4/Wii U portal (NOT Xbox)\n" " 3. WinUSB driver installed via Zadig" ) if platform.system() == 'Windows': error_msg += ( "\n 4. libusb installed: pip install libusb-package\n" " 5. Or libusb-1.0.dll in System32" ) raise ConnectionError(error_msg) # Detach kernel driver if active (Linux only) try: if self.dev.is_kernel_driver_active(0): self.dev.detach_kernel_driver(0) except (AttributeError, usb.core.USBError, NotImplementedError): pass # Not available on Windows # Configure device try: self.dev.set_configuration() except usb.core.USBError as e: # Ignore "Entity not found" - device already configured if 'entity not found' not in str(e).lower(): raise # Get configuration and interface cfg = self.dev.get_active_configuration() intf = cfg[(0, 0)] self._interface = intf.bInterfaceNumber # Claim interface (required on Windows) try: usb.util.claim_interface(self.dev, self._interface) except usb.core.USBError as e: if 'resource busy' not in str(e).lower(): raise # Find endpoints dynamically self._endpoint_out = usb.util.find_descriptor( intf, custom_match=lambda e: usb.util.endpoint_direction(e.bEndpointAddress) == usb.util.ENDPOINT_OUT ) self._endpoint_in = usb.util.find_descriptor( intf, custom_match=lambda e: usb.util.endpoint_direction(e.bEndpointAddress) == usb.util.ENDPOINT_IN ) # Fallback to hardcoded endpoints if discovery fails if self._endpoint_out is None: self._endpoint_out = 0x01 if self._endpoint_in is None: self._endpoint_in = 0x81 # Send initialization command self._write_raw(TOYPAD_INIT) # Get device info try: product = usb.util.get_string(self.dev, self.dev.iProduct) print(f"Connected to: {product}") except Exception: print("Connected to LEGO Dimensions Portal") if self.on_connect: self.on_connect() return True except usb.core.USBError as e: error_msg = str(e) if "access denied" in error_msg.lower() or "insufficient permissions" in error_msg.lower(): print("ERROR: Access denied. Try running as Administrator or check Zadig drivers.") elif "no backend" in error_msg.lower(): print("ERROR: No USB backend found. Install libusb-package: pip install libusb-package") if self.on_error: self.on_error(e) return False except Exception as e: if self.on_error: self.on_error(e) return False def disconnect(self): """Disconnect from portal and clean up resources.""" self.stop() if self.dev: try: # Turn off all LEDs self.set_pad_color(Pad.ALL, COLORS['OFF']) # Release interface (Windows) if self._interface is not None: try: usb.util.release_interface(self.dev, self._interface) except Exception: pass usb.util.dispose_resources(self.dev) except Exception: pass self.dev = None self._endpoint_in = None self._endpoint_out = None self._interface = None if self.on_disconnect: self.on_disconnect() def _calculate_checksum(self, command: list) -> int: """Calculate checksum (sum of bytes mod 256)""" return sum(command) % 256 def _write_raw(self, data: list): """Write raw data to device.""" if not self.dev: return try: endpoint = self._endpoint_out if hasattr(endpoint, 'bEndpointAddress'): endpoint = endpoint.bEndpointAddress if endpoint is None: endpoint = 0x01 with self._lock: self.dev.write(endpoint, data) except usb.core.USBError: pass def _read_raw(self, size: int = 32, timeout: int = 100) -> Optional[list]: """Read raw data from device.""" if not self.dev: return None try: endpoint = self._endpoint_in if hasattr(endpoint, 'bEndpointAddress'): endpoint = endpoint.bEndpointAddress if endpoint is None: endpoint = 0x81 return list(self.dev.read(endpoint, size, timeout=timeout)) except usb.core.USBError: return None def _send_command(self, command: list): """Send a command to the portal with checksum and padding.""" if not self.dev: return checksum = self._calculate_checksum(command) message = command + [checksum] # Pad to 32 bytes while len(message) < 32: message.append(0x00) self._write_raw(message) def set_pad_color(self, pad: Pad, color: list): """ Set a pad to a specific color immediately. Args: pad: Which pad (Pad.ALL, Pad.CENTER, Pad.LEFT, Pad.RIGHT) color: [R, G, B] values 0-255 """ command = [0x55, 0x06, 0xc0, 0x02, pad.value, color[0], color[1], color[2]] self._send_command(command) def stop_flash(self, pad: Pad): """ Stop any active flash animation on a pad. Args: pad: Which pad to stop flashing (Pad.ALL stops all pads) """ if pad == Pad.ALL: pads_to_stop = [Pad.CENTER, Pad.LEFT, Pad.RIGHT] else: pads_to_stop = [pad] for p in pads_to_stop: if p.value in self._flash_stop_events: self._flash_stop_events[p.value].set() def flash_pad(self, pad: Pad, color: list, on_time: int = 10, off_time: int = 10, count: int = 5): """ Flash a pad with a color using software-based blinking. Note: Hardware flash command (0xC8) is unreliable on some portals, so this uses rapid solid color toggling instead. Args: pad: Which pad to flash color: [R, G, B] values on_time: Ticks for LED on state (1 tick ~ 50ms) off_time: Ticks for LED off state count: Number of flashes (255 = infinite, capped at 50) """ # Convert ticks to seconds (1 tick ≈ 50ms) on_secs = on_time * 0.05 off_secs = off_time * 0.05 # Cap infinite flashes to reasonable number for software implementation actual_count = min(count, 50) if count == 255 else count # Determine which pads to flash if pad == Pad.ALL: pads_to_flash = [Pad.CENTER, Pad.LEFT, Pad.RIGHT] else: pads_to_flash = [pad] # Stop any existing flash on these pads for p in pads_to_flash: if p.value in self._flash_stop_events: self._flash_stop_events[p.value].set() # Create new stop events for this flash stop_events = {} for p in pads_to_flash: stop_events[p.value] = threading.Event() self._flash_stop_events[p.value] = stop_events[p.value] def _flash_thread(): for _ in range(actual_count): # Check if we should stop (device disconnected or stop requested) if not self._running and not self.dev: break # Check if stop was requested for any of our pads if any(stop_events[p.value].is_set() for p in pads_to_flash): break # ON for p in pads_to_flash: if not stop_events[p.value].is_set(): self.set_pad_color(p, color) time.sleep(on_secs) # Check again before OFF if any(stop_events[p.value].is_set() for p in pads_to_flash): break # OFF for p in pads_to_flash: if not stop_events[p.value].is_set(): self.set_pad_color(p, COLORS['OFF']) time.sleep(off_secs) # Cleanup stop events when done for p in pads_to_flash: if p.value in self._flash_stop_events and self._flash_stop_events[p.value] == stop_events[p.value]: del self._flash_stop_events[p.value] # Run flash in background thread so it doesn't block flash_thread = threading.Thread(target=_flash_thread, daemon=True) flash_thread.start() def fade_pad(self, pad: Pad, color: list, speed: int = 10, count: int = 1): """ Fade a pad to a color. Args: pad: Which pad to fade color: [R, G, B] target color speed: Fade speed (higher = slower) count: Number of fade pulses (255 = infinite) """ command = [0x55, 0x08, 0xc2, 0x04, pad.value, speed, count, color[0], color[1], color[2]] self._send_command(command) def rainbow_cycle(self, pad: Pad = Pad.ALL, speed: int = 5): """ Start a rainbow color cycle on a pad. Args: pad: Which pad to animate speed: Cycle speed (1-20, lower = faster) """ # Cycle through colors using fade colors_cycle = [ COLORS['RED'], COLORS['ORANGE'], COLORS['YELLOW'], COLORS['GREEN'], COLORS['CYAN'], COLORS['BLUE'], COLORS['PURPLE'], COLORS['MAGENTA'] ] for color in colors_cycle: self.fade_pad(pad, color, speed=speed, count=1) time.sleep(speed * 0.05) def _poll_events(self): """Background thread function for polling tag events.""" while self._running: try: # Read from IN endpoint with timeout endpoint = self._endpoint_in if hasattr(endpoint, 'bEndpointAddress'): endpoint = endpoint.bEndpointAddress if endpoint is None: endpoint = 0x81 in_packet = self.dev.read(endpoint, 32, timeout=100) bytelist = list(in_packet) if not bytelist: continue # NFC tag events start with 0x56 if bytelist[0] != 0x56: continue # Parse tag event pad_num = bytelist[2] action = bytelist[5] uid = bytes(bytelist[6:13]) # Validate UID (not all zeros) if uid == b'\x00' * 7: continue try: pad = Pad(pad_num) except ValueError: pad = Pad.CENTER event = TagEvent.INSERTED if action == 0 else TagEvent.REMOVED tag_info = TagInfo(uid=uid, pad=pad, event=event) # Track active tags and fire callbacks if event == TagEvent.INSERTED: self._active_tags[pad_num] = tag_info # Visual feedback - green flash for detected tag self.flash_pad(pad, COLORS['GREEN'], on_time=5, off_time=5, count=2) time.sleep(0.1) self.set_pad_color(pad, COLORS['CYAN']) if self.on_tag_insert: self.on_tag_insert(tag_info) else: if pad_num in self._active_tags: del self._active_tags[pad_num] # Stop any active flash and turn off pad LED self.stop_flash(pad) self.set_pad_color(pad, COLORS['OFF']) if self.on_tag_remove: self.on_tag_remove(tag_info) except usb.core.USBError as e: if self._is_timeout_error(e): continue # Timeout is normal elif self._is_pipe_error(e): if self._running and self.on_error: self.on_error(ConnectionError("Portal disconnected")) break elif self._running: if self.on_error: self.on_error(e) time.sleep(0.5) except Exception as e: if self._running and self.on_error: self.on_error(e) time.sleep(0.1) def start(self): """Start the event polling thread.""" if self._running: return if not self.dev: if not self.connect(): raise ConnectionError("Failed to connect to portal") self._running = True self._thread = threading.Thread(target=self._poll_events, daemon=True) self._thread.start() # Initial LED animation to show portal is active for pad in [Pad.CENTER, Pad.LEFT, Pad.RIGHT]: self.flash_pad(pad, COLORS['BLUE'], on_time=3, off_time=3, count=2) time.sleep(0.1) def stop(self): """Stop the event polling thread.""" self._running = False if self._thread: self._thread.join(timeout=2.0) self._thread = None def get_active_tags(self) -> Dict[int, TagInfo]: """Get dictionary of currently active tags on pads.""" return self._active_tags.copy() def get_character_name(self, character_id: int) -> Optional[str]: """Look up character name from character ID.""" return self._character_db.get(character_id) def get_vehicle_name(self, vehicle_id: int) -> Optional[str]: """Look up vehicle name from vehicle ID.""" return self._vehicle_db.get(vehicle_id) @property def is_connected(self) -> bool: """Check if portal is connected.""" return self.dev is not None @property def is_running(self) -> bool: """Check if event polling is active.""" return self._running # ============================================================================= # TEA Encryption/Decryption for Character ID # Based on community reverse-engineering (ags131, bettse, socram8888) # ============================================================================= def tea_decrypt(data: bytes, key: bytes) -> bytes: """ TEA (Tiny Encryption Algorithm) decryption. Used to decrypt character IDs from tag pages 0x24-0x25. """ def _u32(x): return c_uint32(x).value delta = 0x9e3779b9 # Convert to 32-bit words v0 = int.from_bytes(data[0:4], 'little') v1 = int.from_bytes(data[4:8], 'little') k0 = int.from_bytes(key[0:4], 'little') k1 = int.from_bytes(key[4:8], 'little') k2 = int.from_bytes(key[8:12], 'little') k3 = int.from_bytes(key[12:16], 'little') total = _u32(delta * 32) for _ in range(32): v1 = _u32(v1 - _u32((_u32(v0 << 4) + k2) ^ _u32(v0 + total) ^ _u32((v0 >> 5) + k3))) v0 = _u32(v0 - _u32((_u32(v1 << 4) + k0) ^ _u32(v1 + total) ^ _u32((v1 >> 5) + k1))) total = _u32(total - delta) return v0.to_bytes(4, 'little') + v1.to_bytes(4, 'little') def tea_encrypt(data: bytes, key: bytes) -> bytes: """ TEA (Tiny Encryption Algorithm) encryption. Used to encrypt character IDs for writing to tags. """ def _u32(x): return c_uint32(x).value delta = 0x9e3779b9 v0 = int.from_bytes(data[0:4], 'little') v1 = int.from_bytes(data[4:8], 'little') k0 = int.from_bytes(key[0:4], 'little') k1 = int.from_bytes(key[4:8], 'little') k2 = int.from_bytes(key[8:12], 'little') k3 = int.from_bytes(key[12:16], 'little') total = 0 for _ in range(32): total = _u32(total + delta) v0 = _u32(v0 + _u32((_u32(v1 << 4) + k0) ^ _u32(v1 + total) ^ _u32((v1 >> 5) + k1))) v1 = _u32(v1 + _u32((_u32(v0 << 4) + k2) ^ _u32(v0 + total) ^ _u32((v0 >> 5) + k3))) return v0.to_bytes(4, 'little') + v1.to_bytes(4, 'little') def generate_tea_key(uid: bytes) -> bytes: """ Generate TEA encryption key from tag UID. The key is derived by scrambling the 7-byte UID into 16 bytes. """ key = bytearray(16) key[0] = (uid[1] ^ uid[3] ^ 0xAA) | 0xAA key[1] = (uid[2] ^ uid[4] ^ 0x55) & 0x55 key[2] = (uid[1] ^ uid[3] ^ 0xAA) | 0xAA key[3] = (uid[2] ^ uid[4] ^ 0x55) | 0x55 key[4] = (uid[3] ^ uid[5] ^ 0xAA) | 0xAA key[5] = (uid[2] ^ uid[4] ^ 0x55) & 0x55 key[6] = (uid[3] ^ uid[5] ^ 0xAA) | 0xAA key[7] = (uid[2] ^ uid[4] ^ 0x55) | 0x55 key[8] = (uid[1] ^ uid[3] ^ 0xAA) & 0xAA key[9] = (uid[2] ^ uid[4] ^ 0x55) | 0x55 key[10] = (uid[1] ^ uid[3] ^ 0xAA) & 0xAA key[11] = (uid[2] ^ uid[4] ^ 0x55) & 0x55 key[12] = (uid[3] ^ uid[5] ^ 0xAA) & 0xAA key[13] = (uid[2] ^ uid[4] ^ 0x55) | 0x55 key[14] = (uid[3] ^ uid[5] ^ 0xAA) & 0xAA key[15] = (uid[2] ^ uid[4] ^ 0x55) & 0x55 return bytes(key) def generate_password(uid: bytes) -> bytes: """ Generate NTAG213 password from 7-byte UID. This password protects pages 0x24+ on the tag. """ pwd = bytearray(4) pwd[0] = 0xAA ^ uid[1] ^ uid[3] pwd[1] = 0x55 ^ uid[2] ^ uid[4] pwd[2] = 0xAA ^ uid[3] ^ uid[5] pwd[3] = 0x55 ^ uid[4] ^ uid[6] return bytes(pwd) def decrypt_character_id(uid: bytes, encrypted_data: bytes) -> int: """ Decrypt character ID from tag memory pages 0x24-0x25. Args: uid: 7-byte tag UID encrypted_data: 8 bytes from pages 0x24-0x25 Returns: Decrypted character ID integer """ key = generate_tea_key(uid) decrypted = tea_decrypt(encrypted_data, key) return int.from_bytes(decrypted[0:2], 'little') def encrypt_character_id(uid: bytes, character_id: int) -> bytes: """ Encrypt character ID for writing to tag pages 0x24-0x25. Args: uid: 7-byte tag UID character_id: Character ID to encrypt Returns: 8 bytes of encrypted data """ key = generate_tea_key(uid) # Character ID in first 2 bytes, rest is padding data = character_id.to_bytes(2, 'little') + b'\x00' * 6 return tea_encrypt(data, key) # ============================================================================= # Standalone test # ============================================================================= if __name__ == "__main__": def on_insert(tag: TagInfo): print(f"\n{'='*50}") print(f"[+] TAG PLACED on {tag.pad.name} pad") print(f" UID: {tag.uid_hex}") print(f" Password: {generate_password(tag.uid).hex().upper()}") print(f"{'='*50}") def on_remove(tag: TagInfo): print(f"\n[-] TAG REMOVED from {tag.pad.name} pad") def on_error(e: Exception): print(f"\n[!] ERROR: {e}") def on_connect(): print("[+] Portal connected and initialized") def on_disconnect(): print("[-] Portal disconnected") print(f"\nLEGO Dimensions Portal Reader v{__version__}") print(f"Platform: {platform.system()}") print("="*50) reader = LegoDimensionsReader() reader.on_tag_insert = on_insert reader.on_tag_remove = on_remove reader.on_error = on_error reader.on_connect = on_connect reader.on_disconnect = on_disconnect try: reader.start() print("\nPortal Active - Place character discs...") print("Press Ctrl+C to exit\n") while True: time.sleep(1) except KeyboardInterrupt: print("\n\nShutting down...") except ConnectionError as e: print(f"\nConnection failed: {e}") finally: reader.disconnect() print("Goodbye!")