From 727ed71f4e734741adc7dfb4a96f01c877623199 Mon Sep 17 00:00:00 2001 From: jessikitty Date: Sat, 21 Feb 2026 12:11:30 +1100 Subject: [PATCH] v1.3.0: Remove default LED blink, add software effects - Removed automatic green flash + cyan color on tag insert - Callbacks now fully control LED behavior - Added software_flash() for reliable cross-portal flashing - Added software_pulse() for breathing effect - Added apply_effect() unified interface - Effects run in background threads (non-blocking) - Tag removal stops any running effects on that pad - Added legacy_key field to TagInfo for config lookup --- lego_dimensions_reader.py | 564 +++++++++++++------------------------- 1 file changed, 195 insertions(+), 369 deletions(-) diff --git a/lego_dimensions_reader.py b/lego_dimensions_reader.py index 15e98d5..297ae94 100644 --- a/lego_dimensions_reader.py +++ b/lego_dimensions_reader.py @@ -6,6 +6,12 @@ For Moonlight Drive-In Video Player System Communicates with LEGO Dimensions USB portal (PS3/PS4/Wii U versions) to detect character and vehicle disc placement/removal events. +v1.3.0 Changes: +- Removed default LED blink on tag insert (callbacks now control LEDs) +- Added software_flash() method for reliable flashing via solid color loop +- Added software_pulse() method for pulsing effect +- Effects now run in background threads to not block event handling + Requirements: pip install pyusb libusb-package @@ -64,7 +70,8 @@ import usb.backend.libusb1 import threading import time import json -from typing import Callable, Optional, Dict, Any +import math +from typing import Callable, Optional, Dict, Any, List from dataclasses import dataclass from enum import Enum from ctypes import c_uint32 @@ -74,7 +81,7 @@ VENDOR_ID = 0x0e6f PRODUCT_ID = 0x0241 # Module version -__version__ = "1.2.2" +__version__ = "1.3.0" def get_usb_backend(): @@ -141,12 +148,16 @@ class TagInfo: pad: Pad event: TagEvent uid_hex: str = "" + legacy_key: int = 0 # Integer key for tag_colors.json lookup character_id: Optional[int] = None character_name: Optional[str] = None is_vehicle: bool = False def __post_init__(self): self.uid_hex = self.uid.hex().upper() + # Calculate legacy key (32-bit int from first 4 bytes, big-endian) + if len(self.uid) >= 4: + self.legacy_key = int.from_bytes(self.uid[:4], 'big') class LegoDimensionsReader: @@ -156,6 +167,9 @@ class LegoDimensionsReader: Provides event-driven tag detection with callbacks for integration with the Moonlight Drive-In video player system. + v1.3.0: No default LED behavior on tag insert/remove. + Callbacks are fully responsible for LED control. + Usage: reader = LegoDimensionsReader() reader.on_tag_insert = lambda tag: print(f"Tag placed: {tag.uid_hex}") @@ -183,8 +197,9 @@ class LegoDimensionsReader: # Active tags currently on pads (pad_num -> TagInfo) self._active_tags: Dict[int, TagInfo] = {} - # Flash stop events per pad (for cancelling software flash) - self._flash_stop_events: Dict[int, threading.Event] = {} + # Effect control - track running effects per pad + self._effect_threads: Dict[int, threading.Thread] = {} + self._effect_stop_flags: Dict[int, threading.Event] = {} # Callbacks self.on_tag_insert: Optional[Callable[[TagInfo], None]] = None @@ -199,7 +214,6 @@ class LegoDimensionsReader: def _load_character_database(self, db_path: Optional[str] = None) -> Dict[int, str]: """Load character ID database.""" - # Try to load from JSON file if provided if db_path and os.path.exists(db_path): try: with open(db_path, 'r', encoding='utf-8') as f: @@ -208,89 +222,29 @@ class LegoDimensionsReader: except Exception: pass - # Default built-in database return { - 1: "Batman", - 2: "Gandalf", - 3: "Wyldstyle", - 4: "Aquaman", - 5: "Bad Cop", - 6: "Bane", - 7: "Bart Simpson", - 8: "Benny", - 9: "Chell", - 10: "Cole", - 11: "Cragger", - 12: "Cyborg", - 13: "Cyberman", - 14: "Doc Brown", - 15: "The Doctor", - 16: "Emmet", - 17: "Eris", - 18: "Gimli", - 19: "Gollum", - 20: "Harley Quinn", - 21: "Homer Simpson", - 22: "Jay", - 23: "Joker", - 24: "Kai", - 25: "ACU Trooper", - 26: "Krusty the Clown", - 27: "Laval", - 28: "Legolas", - 29: "Lloyd", - 30: "Marty McFly", - 31: "Nya", - 32: "Owen Grady", - 33: "Peter Venkman", - 34: "Scooby-Doo", - 35: "Sensei Wu", - 36: "Shaggy", - 37: "Slimer", - 38: "Superman", - 39: "Unikitty", - 40: "Wicked Witch", - 41: "Wonder Woman", - 42: "Zane", - 43: "Green Arrow", - 44: "Supergirl", - 45: "Abby Yates", - 46: "Finn the Human", - 47: "Harry Potter", - 48: "Ethan Hunt", - 49: "Lumpy Space Princess", - 50: "Jake the Dog", - 51: "B.A. Baracus", - 52: "Michael Knight", - 53: "Gizmo", - 54: "Sonic the Hedgehog", - 55: "Gamer Kid", - 56: "E.T.", - 57: "Lord Voldemort", - 58: "Hermione Granger", - 59: "Newt Scamander", - 60: "Tina Goldstein", - 61: "Stripe", - 62: "Sloth", - 63: "Beetlejuice", - 64: "Kevin", - 65: "Erin Gilbert", - 66: "Patty Tolan", - 67: "Jillian Holtzmann", - 68: "Stay Puft", - 69: "Chase McCain", - 70: "Excalibur Batman", - 71: "Raven", - 72: "Beast Boy", - 73: "Starfire", - 74: "Blossom", - 75: "Bubbles", - 76: "Buttercup", - 77: "Marceline", - 78: "Batgirl", - 79: "Robin", - 80: "Knight Rider (KITT)", - 81: "Gremlins Gizmo", + 1: "Batman", 2: "Gandalf", 3: "Wyldstyle", 4: "Aquaman", 5: "Bad Cop", + 6: "Bane", 7: "Bart Simpson", 8: "Benny", 9: "Chell", 10: "Cole", + 11: "Cragger", 12: "Cyborg", 13: "Cyberman", 14: "Doc Brown", + 15: "The Doctor", 16: "Emmet", 17: "Eris", 18: "Gimli", 19: "Gollum", + 20: "Harley Quinn", 21: "Homer Simpson", 22: "Jay", 23: "Joker", + 24: "Kai", 25: "ACU Trooper", 26: "Krusty the Clown", 27: "Laval", + 28: "Legolas", 29: "Lloyd", 30: "Marty McFly", 31: "Nya", + 32: "Owen Grady", 33: "Peter Venkman", 34: "Scooby-Doo", + 35: "Sensei Wu", 36: "Shaggy", 37: "Slimer", 38: "Superman", + 39: "Unikitty", 40: "Wicked Witch", 41: "Wonder Woman", 42: "Zane", + 43: "Green Arrow", 44: "Supergirl", 45: "Abby Yates", + 46: "Finn the Human", 47: "Harry Potter", 48: "Ethan Hunt", + 49: "Lumpy Space Princess", 50: "Jake the Dog", 51: "B.A. Baracus", + 52: "Michael Knight", 53: "Gizmo", 54: "Sonic the Hedgehog", + 55: "Gamer Kid", 56: "E.T.", 57: "Lord Voldemort", + 58: "Hermione Granger", 59: "Newt Scamander", 60: "Tina Goldstein", + 61: "Stripe", 62: "Sloth", 63: "Beetlejuice", 64: "Kevin", + 65: "Erin Gilbert", 66: "Patty Tolan", 67: "Jillian Holtzmann", + 68: "Stay Puft", 69: "Chase McCain", 70: "Excalibur Batman", + 71: "Raven", 72: "Beast Boy", 73: "Starfire", 74: "Blossom", + 75: "Bubbles", 76: "Buttercup", 77: "Marceline", 78: "Batgirl", + 79: "Robin", 80: "Knight Rider (KITT)", 81: "Gremlins Gizmo", 82: "Teen Titans Go!", } @@ -305,94 +259,18 @@ class LegoDimensionsReader: pass return { - 1: "Batmobile", - 2: "Batwing", - 3: "Bat Blaster", - 4: "DeLorean Time Machine", - 5: "Hoverboard", - 6: "Travelling Time Train", - 7: "TARDIS", - 8: "K9", - 9: "Dalek", - 10: "Companion Cube", - 11: "Sentry Turret", - 12: "GLaDOS", - 13: "Mystery Machine", - 14: "Scooby Snack", - 15: "Gyrocopter", - 16: "Ghost Trap", - 17: "Ecto-1", - 18: "PKE Meter", - 19: "Emmet's Excavator", + 1: "Batmobile", 2: "Batwing", 3: "Bat Blaster", + 4: "DeLorean Time Machine", 5: "Hoverboard", 6: "Travelling Time Train", + 7: "TARDIS", 8: "K9", 9: "Dalek", 10: "Companion Cube", + 11: "Sentry Turret", 12: "GLaDOS", 13: "Mystery Machine", + 14: "Scooby Snack", 15: "Gyrocopter", 16: "Ghost Trap", + 17: "Ecto-1", 18: "PKE Meter", 19: "Emmet's Excavator", 20: "Emmet's Car", - 21: "Duplo Brickster", - 22: "Hover Bike", - 23: "Cyber Guard", - 24: "Cyber Wraith", - 25: "Blade Bike", - 26: "Flying White Dragon", - 27: "Golden Dragon", - 28: "Mega Flight Dragon", - 29: "Shadowmeld", - 30: "Swamp Skimmer", - 31: "Cragger's Fireship", - 32: "Eagle Interceptor", - 33: "Eagle Sky Blazer", - 34: "Eagle Swoop Diver", - 35: "Gyrosphere", - 36: "ACU Trooper Car", - 37: "T. Rex", - 38: "Homer's Car", - 39: "Taunt-o-Vision", - 40: "The Simpsons House", - 41: "Krusty's Bike", - 42: "Clown Bike", - 43: "Frink's Hover Car", - 44: "Gravity Sprinter", - 45: "Street Shredder", - 46: "Sky Clobberer", - 47: "Invisible Jet", - 48: "Justice Camper", - 49: "Electro Bolt", - 50: "Arrow Launcher", - 51: "Drill Driver", - 52: "Bane Dig Dig", - 53: "Ancient Psychic Tandem War Elephant", - 54: "Jakemobile", - 55: "Lumpy Car", - 56: "Ecto-1 (2016)", - 57: "Ectozer", - 58: "PerfEcto", - 59: "B.A.'s Van", - 60: "B.A.'s Super Van", - 61: "Pain Plane", - 62: "Sonic Speedster", - 63: "Blue Typhoon", - 64: "Moto Bug", - 65: "IMF Scrambler", - 66: "IMF Sport Car", - 67: "IMF Tank", - 68: "K.I.T.T.", - 69: "Gozer Trap", - 70: "Terror Dog", - 71: "Gremlin Car", - 72: "Flash 'n' Finish", - 73: "Rampage Wrecking", - 74: "Phone Home", - 75: "Mobile Uplink", - 76: "Super Charged Satellite", - 77: "Enchanted Car", - 78: "Harry's Broom", - 79: "Newt's Case", - 80: "Swooping Evil", - 81: "Occamy", - 82: "Niffler", } def _is_timeout_error(self, error: usb.core.USBError) -> bool: """Check if error is a timeout (normal during polling).""" error_str = str(error).lower() - # Windows timeout codes vary: -116, -7, 110, or string return ( error.errno in (-116, -7, 110, None) or 'timeout' in error_str or @@ -410,7 +288,6 @@ class LegoDimensionsReader: Returns True if connection successful. """ try: - # Get platform-appropriate backend backend = get_usb_backend() self.dev = usb.core.find( @@ -434,34 +311,28 @@ class LegoDimensionsReader: ) raise ConnectionError(error_msg) - # Detach kernel driver if active (Linux only) try: if self.dev.is_kernel_driver_active(0): self.dev.detach_kernel_driver(0) except (AttributeError, usb.core.USBError, NotImplementedError): - pass # Not available on Windows + pass - # Configure device try: self.dev.set_configuration() except usb.core.USBError as e: - # Ignore "Entity not found" - device already configured if 'entity not found' not in str(e).lower(): raise - # Get configuration and interface cfg = self.dev.get_active_configuration() intf = cfg[(0, 0)] self._interface = intf.bInterfaceNumber - # Claim interface (required on Windows) try: usb.util.claim_interface(self.dev, self._interface) except usb.core.USBError as e: if 'resource busy' not in str(e).lower(): raise - # Find endpoints dynamically self._endpoint_out = usb.util.find_descriptor( intf, custom_match=lambda e: usb.util.endpoint_direction(e.bEndpointAddress) == usb.util.ENDPOINT_OUT @@ -471,16 +342,13 @@ class LegoDimensionsReader: custom_match=lambda e: usb.util.endpoint_direction(e.bEndpointAddress) == usb.util.ENDPOINT_IN ) - # Fallback to hardcoded endpoints if discovery fails if self._endpoint_out is None: self._endpoint_out = 0x01 if self._endpoint_in is None: self._endpoint_in = 0x81 - # Send initialization command self._write_raw(TOYPAD_INIT) - # Get device info try: product = usb.util.get_string(self.dev, self.dev.iProduct) print(f"Connected to: {product}") @@ -509,12 +377,12 @@ class LegoDimensionsReader: def disconnect(self): """Disconnect from portal and clean up resources.""" self.stop() + self.stop_all_effects() + if self.dev: try: - # Turn off all LEDs self.set_pad_color(Pad.ALL, COLORS['OFF']) - # Release interface (Windows) if self._interface is not None: try: usb.util.release_interface(self.dev, self._interface) @@ -552,22 +420,6 @@ class LegoDimensionsReader: except usb.core.USBError: pass - def _read_raw(self, size: int = 32, timeout: int = 100) -> Optional[list]: - """Read raw data from device.""" - if not self.dev: - return None - - try: - endpoint = self._endpoint_in - if hasattr(endpoint, 'bEndpointAddress'): - endpoint = endpoint.bEndpointAddress - if endpoint is None: - endpoint = 0x81 - - return list(self.dev.read(endpoint, size, timeout=timeout)) - except usb.core.USBError: - return None - def _send_command(self, command: list): """Send a command to the portal with checksum and padding.""" if not self.dev: @@ -576,13 +428,12 @@ class LegoDimensionsReader: checksum = self._calculate_checksum(command) message = command + [checksum] - # Pad to 32 bytes while len(message) < 32: message.append(0x00) self._write_raw(message) - def set_pad_color(self, pad: Pad, color: list): + def set_pad_color(self, pad: Pad, color: List[int]): """ Set a pad to a specific color immediately. @@ -590,132 +441,164 @@ class LegoDimensionsReader: pad: Which pad (Pad.ALL, Pad.CENTER, Pad.LEFT, Pad.RIGHT) color: [R, G, B] values 0-255 """ + self.stop_effect(pad) command = [0x55, 0x06, 0xc0, 0x02, pad.value, color[0], color[1], color[2]] self._send_command(command) - def stop_flash(self, pad: Pad): - """ - Stop any active flash animation on a pad. - - Args: - pad: Which pad to stop flashing (Pad.ALL stops all pads) - """ - if pad == Pad.ALL: - pads_to_stop = [Pad.CENTER, Pad.LEFT, Pad.RIGHT] - else: - pads_to_stop = [pad] - - for p in pads_to_stop: - if p.value in self._flash_stop_events: - self._flash_stop_events[p.value].set() - - def flash_pad(self, pad: Pad, color: list, on_time: int = 10, + def flash_pad(self, pad: Pad, color: List[int], on_time: int = 10, off_time: int = 10, count: int = 5): """ - Flash a pad with a color using software-based blinking. - - Note: Hardware flash command (0xC8) is unreliable on some portals, - so this uses rapid solid color toggling instead. - - Args: - pad: Which pad to flash - color: [R, G, B] values - on_time: Ticks for LED on state (1 tick ~ 50ms) - off_time: Ticks for LED off state - count: Number of flashes (255 = infinite, capped at 50) + Flash a pad (hardware command - may not work on all portals). + Use software_flash() for reliable cross-portal flashing. """ - # Convert ticks to seconds (1 tick ≈ 50ms) - on_secs = on_time * 0.05 - off_secs = off_time * 0.05 - - # Cap infinite flashes to reasonable number for software implementation - actual_count = min(count, 50) if count == 255 else count - - # Determine which pads to flash - if pad == Pad.ALL: - pads_to_flash = [Pad.CENTER, Pad.LEFT, Pad.RIGHT] - else: - pads_to_flash = [pad] - - # Stop any existing flash on these pads - for p in pads_to_flash: - if p.value in self._flash_stop_events: - self._flash_stop_events[p.value].set() - - # Create new stop events for this flash - stop_events = {} - for p in pads_to_flash: - stop_events[p.value] = threading.Event() - self._flash_stop_events[p.value] = stop_events[p.value] - - def _flash_thread(): - for _ in range(actual_count): - # Check if we should stop (device disconnected or stop requested) - if not self._running and not self.dev: - break - # Check if stop was requested for any of our pads - if any(stop_events[p.value].is_set() for p in pads_to_flash): - break - # ON - for p in pads_to_flash: - if not stop_events[p.value].is_set(): - self.set_pad_color(p, color) - time.sleep(on_secs) - # Check again before OFF - if any(stop_events[p.value].is_set() for p in pads_to_flash): - break - # OFF - for p in pads_to_flash: - if not stop_events[p.value].is_set(): - self.set_pad_color(p, COLORS['OFF']) - time.sleep(off_secs) - - # Cleanup stop events when done - for p in pads_to_flash: - if p.value in self._flash_stop_events and self._flash_stop_events[p.value] == stop_events[p.value]: - del self._flash_stop_events[p.value] - - # Run flash in background thread so it doesn't block - flash_thread = threading.Thread(target=_flash_thread, daemon=True) - flash_thread.start() + command = [ + 0x55, 0x0e, 0xc8, 0x06, pad.value, + color[0], color[1], color[2], + on_time, off_time, count, + 0, 0, 0, 0, 0 + ] + self._send_command(command) - def fade_pad(self, pad: Pad, color: list, speed: int = 10, count: int = 1): + def fade_pad(self, pad: Pad, color: List[int], speed: int = 10, count: int = 1): """ - Fade a pad to a color. - - Args: - pad: Which pad to fade - color: [R, G, B] target color - speed: Fade speed (higher = slower) - count: Number of fade pulses (255 = infinite) + Fade a pad (hardware command - may not work on all portals). + Use software_pulse() for reliable cross-portal pulsing. """ command = [0x55, 0x08, 0xc2, 0x04, pad.value, speed, count, color[0], color[1], color[2]] self._send_command(command) - def rainbow_cycle(self, pad: Pad = Pad.ALL, speed: int = 5): + # ========================================================================= + # SOFTWARE-BASED EFFECTS (Reliable cross-portal) + # ========================================================================= + + def stop_effect(self, pad: Pad): + """Stop any running software effect on a specific pad.""" + pad_val = pad.value + if pad_val in self._effect_stop_flags: + self._effect_stop_flags[pad_val].set() + if pad_val in self._effect_threads: + thread = self._effect_threads[pad_val] + if thread.is_alive(): + thread.join(timeout=0.5) + del self._effect_threads[pad_val] + del self._effect_stop_flags[pad_val] + + def stop_all_effects(self): + """Stop all running software effects.""" + for pad in [Pad.CENTER, Pad.LEFT, Pad.RIGHT]: + self.stop_effect(pad) + + def _set_color_raw(self, pad_value: int, r: int, g: int, b: int): + """Set color without stopping effects (internal use).""" + command = [0x55, 0x06, 0xc0, 0x02, pad_value, r, g, b] + self._send_command(command) + + def software_flash(self, pad: Pad, color: List[int], on_time: float = 0.3, + off_time: float = 0.3, count: int = 0): """ - Start a rainbow color cycle on a pad. + Flash a pad using software-controlled solid colors. + This is reliable across all portal versions. Args: - pad: Which pad to animate - speed: Cycle speed (1-20, lower = faster) + pad: Which pad to flash + color: [R, G, B] values + on_time: Seconds LED stays on + off_time: Seconds LED stays off + count: Number of flashes (0 = infinite until stopped) """ - # Cycle through colors using fade - colors_cycle = [ - COLORS['RED'], COLORS['ORANGE'], COLORS['YELLOW'], - COLORS['GREEN'], COLORS['CYAN'], COLORS['BLUE'], - COLORS['PURPLE'], COLORS['MAGENTA'] - ] - for color in colors_cycle: - self.fade_pad(pad, color, speed=speed, count=1) - time.sleep(speed * 0.05) + self.stop_effect(pad) + + stop_flag = threading.Event() + self._effect_stop_flags[pad.value] = stop_flag + + def _flash_loop(): + flash_count = 0 + while not stop_flag.is_set(): + self._set_color_raw(pad.value, color[0], color[1], color[2]) + if stop_flag.wait(on_time): + break + self._set_color_raw(pad.value, 0, 0, 0) + if stop_flag.wait(off_time): + break + + flash_count += 1 + if count > 0 and flash_count >= count: + self._set_color_raw(pad.value, color[0], color[1], color[2]) + break + + thread = threading.Thread(target=_flash_loop, daemon=True) + self._effect_threads[pad.value] = thread + thread.start() + + def software_pulse(self, pad: Pad, color: List[int], speed: float = 1.0, + min_brightness: float = 0.1, count: int = 0): + """ + Pulse/breathe effect using software-controlled brightness. + + Args: + pad: Which pad to pulse + color: [R, G, B] base color + speed: Seconds for one full pulse cycle + min_brightness: Minimum brightness (0.0-1.0) + count: Number of pulses (0 = infinite until stopped) + """ + self.stop_effect(pad) + + stop_flag = threading.Event() + self._effect_stop_flags[pad.value] = stop_flag + + def _pulse_loop(): + pulse_count = 0 + start_time = time.time() + step_time = speed / 30 + + while not stop_flag.is_set(): + elapsed = time.time() - start_time + phase = (elapsed / speed) * 2 * math.pi + brightness = (math.sin(phase - math.pi/2) + 1) / 2 + brightness = min_brightness + brightness * (1 - min_brightness) + + r = int(color[0] * brightness) + g = int(color[1] * brightness) + b = int(color[2] * brightness) + + self._set_color_raw(pad.value, r, g, b) + + if elapsed >= speed * (pulse_count + 1): + pulse_count += 1 + if count > 0 and pulse_count >= count: + self._set_color_raw(pad.value, color[0], color[1], color[2]) + break + + if stop_flag.wait(step_time): + break + + thread = threading.Thread(target=_pulse_loop, daemon=True) + self._effect_threads[pad.value] = thread + thread.start() + + def apply_effect(self, pad: Pad, color: List[int], effect: str = "solid"): + """ + Apply a named effect to a pad. + + Args: + pad: Which pad + color: [R, G, B] values + effect: "solid", "flash", or "pulse" + """ + effect = effect.lower() + if effect == "flash": + self.software_flash(pad, color) + elif effect == "pulse": + self.software_pulse(pad, color) + else: + self.set_pad_color(pad, color) def _poll_events(self): """Background thread function for polling tag events.""" while self._running: try: - # Read from IN endpoint with timeout endpoint = self._endpoint_in if hasattr(endpoint, 'bEndpointAddress'): endpoint = endpoint.bEndpointAddress @@ -728,16 +611,13 @@ class LegoDimensionsReader: if not bytelist: continue - # NFC tag events start with 0x56 if bytelist[0] != 0x56: continue - # Parse tag event pad_num = bytelist[2] action = bytelist[5] uid = bytes(bytelist[6:13]) - # Validate UID (not all zeros) if uid == b'\x00' * 7: continue @@ -749,31 +629,21 @@ class LegoDimensionsReader: event = TagEvent.INSERTED if action == 0 else TagEvent.REMOVED tag_info = TagInfo(uid=uid, pad=pad, event=event) - # Track active tags and fire callbacks + # v1.3.0: NO default LED behavior - callbacks control LEDs if event == TagEvent.INSERTED: self._active_tags[pad_num] = tag_info - - # Visual feedback - green flash for detected tag - self.flash_pad(pad, COLORS['GREEN'], on_time=5, off_time=5, count=2) - time.sleep(0.1) - self.set_pad_color(pad, COLORS['CYAN']) - if self.on_tag_insert: self.on_tag_insert(tag_info) else: if pad_num in self._active_tags: del self._active_tags[pad_num] - - # Stop any active flash and turn off pad LED - self.stop_flash(pad) - self.set_pad_color(pad, COLORS['OFF']) - + self.stop_effect(pad) if self.on_tag_remove: self.on_tag_remove(tag_info) except usb.core.USBError as e: if self._is_timeout_error(e): - continue # Timeout is normal + continue elif self._is_pipe_error(e): if self._running and self.on_error: self.on_error(ConnectionError("Portal disconnected")) @@ -799,11 +669,6 @@ class LegoDimensionsReader: self._running = True self._thread = threading.Thread(target=self._poll_events, daemon=True) self._thread.start() - - # Initial LED animation to show portal is active - for pad in [Pad.CENTER, Pad.LEFT, Pad.RIGHT]: - self.flash_pad(pad, COLORS['BLUE'], on_time=3, off_time=3, count=2) - time.sleep(0.1) def stop(self): """Stop the event polling thread.""" @@ -826,31 +691,22 @@ class LegoDimensionsReader: @property def is_connected(self) -> bool: - """Check if portal is connected.""" return self.dev is not None @property def is_running(self) -> bool: - """Check if event polling is active.""" return self._running # ============================================================================= -# TEA Encryption/Decryption for Character ID -# Based on community reverse-engineering (ags131, bettse, socram8888) +# TEA Encryption/Decryption # ============================================================================= def tea_decrypt(data: bytes, key: bytes) -> bytes: - """ - TEA (Tiny Encryption Algorithm) decryption. - Used to decrypt character IDs from tag pages 0x24-0x25. - """ def _u32(x): return c_uint32(x).value delta = 0x9e3779b9 - - # Convert to 32-bit words v0 = int.from_bytes(data[0:4], 'little') v1 = int.from_bytes(data[4:8], 'little') k0 = int.from_bytes(key[0:4], 'little') @@ -869,15 +725,10 @@ def tea_decrypt(data: bytes, key: bytes) -> bytes: def tea_encrypt(data: bytes, key: bytes) -> bytes: - """ - TEA (Tiny Encryption Algorithm) encryption. - Used to encrypt character IDs for writing to tags. - """ def _u32(x): return c_uint32(x).value delta = 0x9e3779b9 - v0 = int.from_bytes(data[0:4], 'little') v1 = int.from_bytes(data[4:8], 'little') k0 = int.from_bytes(key[0:4], 'little') @@ -896,10 +747,6 @@ def tea_encrypt(data: bytes, key: bytes) -> bytes: def generate_tea_key(uid: bytes) -> bytes: - """ - Generate TEA encryption key from tag UID. - The key is derived by scrambling the 7-byte UID into 16 bytes. - """ key = bytearray(16) key[0] = (uid[1] ^ uid[3] ^ 0xAA) | 0xAA key[1] = (uid[2] ^ uid[4] ^ 0x55) & 0x55 @@ -921,10 +768,6 @@ def generate_tea_key(uid: bytes) -> bytes: def generate_password(uid: bytes) -> bytes: - """ - Generate NTAG213 password from 7-byte UID. - This password protects pages 0x24+ on the tag. - """ pwd = bytearray(4) pwd[0] = 0xAA ^ uid[1] ^ uid[3] pwd[1] = 0x55 ^ uid[2] ^ uid[4] @@ -934,34 +777,13 @@ def generate_password(uid: bytes) -> bytes: def decrypt_character_id(uid: bytes, encrypted_data: bytes) -> int: - """ - Decrypt character ID from tag memory pages 0x24-0x25. - - Args: - uid: 7-byte tag UID - encrypted_data: 8 bytes from pages 0x24-0x25 - - Returns: - Decrypted character ID integer - """ key = generate_tea_key(uid) decrypted = tea_decrypt(encrypted_data, key) return int.from_bytes(decrypted[0:2], 'little') def encrypt_character_id(uid: bytes, character_id: int) -> bytes: - """ - Encrypt character ID for writing to tag pages 0x24-0x25. - - Args: - uid: 7-byte tag UID - character_id: Character ID to encrypt - - Returns: - 8 bytes of encrypted data - """ key = generate_tea_key(uid) - # Character ID in first 2 bytes, rest is padding data = character_id.to_bytes(2, 'little') + b'\x00' * 6 return tea_encrypt(data, key) @@ -975,6 +797,7 @@ if __name__ == "__main__": print(f"\n{'='*50}") print(f"[+] TAG PLACED on {tag.pad.name} pad") print(f" UID: {tag.uid_hex}") + print(f" Legacy Key: {tag.legacy_key}") print(f" Password: {generate_password(tag.uid).hex().upper()}") print(f"{'='*50}") @@ -993,6 +816,9 @@ if __name__ == "__main__": print(f"\nLEGO Dimensions Portal Reader v{__version__}") print(f"Platform: {platform.system()}") print("="*50) + print("NOTE: v1.3.0 - No default LED blink on tag insert") + print(" Callbacks control all LED behavior") + print("="*50) reader = LegoDimensionsReader() reader.on_tag_insert = on_insert