v1.3.0: Remove default LED blink, add software effects

- Removed automatic green flash + cyan color on tag insert
- Callbacks now fully control LED behavior
- Added software_flash() for reliable cross-portal flashing
- Added software_pulse() for breathing effect
- Added apply_effect() unified interface
- Effects run in background threads (non-blocking)
- Tag removal stops any running effects on that pad
- Added legacy_key field to TagInfo for config lookup
This commit is contained in:
2026-02-21 12:11:30 +11:00
parent 3bcf573160
commit 727ed71f4e

View File

@@ -6,6 +6,12 @@ For Moonlight Drive-In Video Player System
Communicates with LEGO Dimensions USB portal (PS3/PS4/Wii U versions)
to detect character and vehicle disc placement/removal events.
v1.3.0 Changes:
- Removed default LED blink on tag insert (callbacks now control LEDs)
- Added software_flash() method for reliable flashing via solid color loop
- Added software_pulse() method for pulsing effect
- Effects now run in background threads to not block event handling
Requirements:
pip install pyusb libusb-package
@@ -64,7 +70,8 @@ import usb.backend.libusb1
import threading
import time
import json
from typing import Callable, Optional, Dict, Any
import math
from typing import Callable, Optional, Dict, Any, List
from dataclasses import dataclass
from enum import Enum
from ctypes import c_uint32
@@ -74,7 +81,7 @@ VENDOR_ID = 0x0e6f
PRODUCT_ID = 0x0241
# Module version
__version__ = "1.2.2"
__version__ = "1.3.0"
def get_usb_backend():
@@ -141,12 +148,16 @@ class TagInfo:
pad: Pad
event: TagEvent
uid_hex: str = ""
legacy_key: int = 0 # Integer key for tag_colors.json lookup
character_id: Optional[int] = None
character_name: Optional[str] = None
is_vehicle: bool = False
def __post_init__(self):
self.uid_hex = self.uid.hex().upper()
# Calculate legacy key (32-bit int from first 4 bytes, big-endian)
if len(self.uid) >= 4:
self.legacy_key = int.from_bytes(self.uid[:4], 'big')
class LegoDimensionsReader:
@@ -156,6 +167,9 @@ class LegoDimensionsReader:
Provides event-driven tag detection with callbacks for integration
with the Moonlight Drive-In video player system.
v1.3.0: No default LED behavior on tag insert/remove.
Callbacks are fully responsible for LED control.
Usage:
reader = LegoDimensionsReader()
reader.on_tag_insert = lambda tag: print(f"Tag placed: {tag.uid_hex}")
@@ -183,8 +197,9 @@ class LegoDimensionsReader:
# Active tags currently on pads (pad_num -> TagInfo)
self._active_tags: Dict[int, TagInfo] = {}
# Flash stop events per pad (for cancelling software flash)
self._flash_stop_events: Dict[int, threading.Event] = {}
# Effect control - track running effects per pad
self._effect_threads: Dict[int, threading.Thread] = {}
self._effect_stop_flags: Dict[int, threading.Event] = {}
# Callbacks
self.on_tag_insert: Optional[Callable[[TagInfo], None]] = None
@@ -199,7 +214,6 @@ class LegoDimensionsReader:
def _load_character_database(self, db_path: Optional[str] = None) -> Dict[int, str]:
"""Load character ID database."""
# Try to load from JSON file if provided
if db_path and os.path.exists(db_path):
try:
with open(db_path, 'r', encoding='utf-8') as f:
@@ -208,89 +222,29 @@ class LegoDimensionsReader:
except Exception:
pass
# Default built-in database
return {
1: "Batman",
2: "Gandalf",
3: "Wyldstyle",
4: "Aquaman",
5: "Bad Cop",
6: "Bane",
7: "Bart Simpson",
8: "Benny",
9: "Chell",
10: "Cole",
11: "Cragger",
12: "Cyborg",
13: "Cyberman",
14: "Doc Brown",
15: "The Doctor",
16: "Emmet",
17: "Eris",
18: "Gimli",
19: "Gollum",
20: "Harley Quinn",
21: "Homer Simpson",
22: "Jay",
23: "Joker",
24: "Kai",
25: "ACU Trooper",
26: "Krusty the Clown",
27: "Laval",
28: "Legolas",
29: "Lloyd",
30: "Marty McFly",
31: "Nya",
32: "Owen Grady",
33: "Peter Venkman",
34: "Scooby-Doo",
35: "Sensei Wu",
36: "Shaggy",
37: "Slimer",
38: "Superman",
39: "Unikitty",
40: "Wicked Witch",
41: "Wonder Woman",
42: "Zane",
43: "Green Arrow",
44: "Supergirl",
45: "Abby Yates",
46: "Finn the Human",
47: "Harry Potter",
48: "Ethan Hunt",
49: "Lumpy Space Princess",
50: "Jake the Dog",
51: "B.A. Baracus",
52: "Michael Knight",
53: "Gizmo",
54: "Sonic the Hedgehog",
55: "Gamer Kid",
56: "E.T.",
57: "Lord Voldemort",
58: "Hermione Granger",
59: "Newt Scamander",
60: "Tina Goldstein",
61: "Stripe",
62: "Sloth",
63: "Beetlejuice",
64: "Kevin",
65: "Erin Gilbert",
66: "Patty Tolan",
67: "Jillian Holtzmann",
68: "Stay Puft",
69: "Chase McCain",
70: "Excalibur Batman",
71: "Raven",
72: "Beast Boy",
73: "Starfire",
74: "Blossom",
75: "Bubbles",
76: "Buttercup",
77: "Marceline",
78: "Batgirl",
79: "Robin",
80: "Knight Rider (KITT)",
81: "Gremlins Gizmo",
1: "Batman", 2: "Gandalf", 3: "Wyldstyle", 4: "Aquaman", 5: "Bad Cop",
6: "Bane", 7: "Bart Simpson", 8: "Benny", 9: "Chell", 10: "Cole",
11: "Cragger", 12: "Cyborg", 13: "Cyberman", 14: "Doc Brown",
15: "The Doctor", 16: "Emmet", 17: "Eris", 18: "Gimli", 19: "Gollum",
20: "Harley Quinn", 21: "Homer Simpson", 22: "Jay", 23: "Joker",
24: "Kai", 25: "ACU Trooper", 26: "Krusty the Clown", 27: "Laval",
28: "Legolas", 29: "Lloyd", 30: "Marty McFly", 31: "Nya",
32: "Owen Grady", 33: "Peter Venkman", 34: "Scooby-Doo",
35: "Sensei Wu", 36: "Shaggy", 37: "Slimer", 38: "Superman",
39: "Unikitty", 40: "Wicked Witch", 41: "Wonder Woman", 42: "Zane",
43: "Green Arrow", 44: "Supergirl", 45: "Abby Yates",
46: "Finn the Human", 47: "Harry Potter", 48: "Ethan Hunt",
49: "Lumpy Space Princess", 50: "Jake the Dog", 51: "B.A. Baracus",
52: "Michael Knight", 53: "Gizmo", 54: "Sonic the Hedgehog",
55: "Gamer Kid", 56: "E.T.", 57: "Lord Voldemort",
58: "Hermione Granger", 59: "Newt Scamander", 60: "Tina Goldstein",
61: "Stripe", 62: "Sloth", 63: "Beetlejuice", 64: "Kevin",
65: "Erin Gilbert", 66: "Patty Tolan", 67: "Jillian Holtzmann",
68: "Stay Puft", 69: "Chase McCain", 70: "Excalibur Batman",
71: "Raven", 72: "Beast Boy", 73: "Starfire", 74: "Blossom",
75: "Bubbles", 76: "Buttercup", 77: "Marceline", 78: "Batgirl",
79: "Robin", 80: "Knight Rider (KITT)", 81: "Gremlins Gizmo",
82: "Teen Titans Go!",
}
@@ -305,94 +259,18 @@ class LegoDimensionsReader:
pass
return {
1: "Batmobile",
2: "Batwing",
3: "Bat Blaster",
4: "DeLorean Time Machine",
5: "Hoverboard",
6: "Travelling Time Train",
7: "TARDIS",
8: "K9",
9: "Dalek",
10: "Companion Cube",
11: "Sentry Turret",
12: "GLaDOS",
13: "Mystery Machine",
14: "Scooby Snack",
15: "Gyrocopter",
16: "Ghost Trap",
17: "Ecto-1",
18: "PKE Meter",
19: "Emmet's Excavator",
1: "Batmobile", 2: "Batwing", 3: "Bat Blaster",
4: "DeLorean Time Machine", 5: "Hoverboard", 6: "Travelling Time Train",
7: "TARDIS", 8: "K9", 9: "Dalek", 10: "Companion Cube",
11: "Sentry Turret", 12: "GLaDOS", 13: "Mystery Machine",
14: "Scooby Snack", 15: "Gyrocopter", 16: "Ghost Trap",
17: "Ecto-1", 18: "PKE Meter", 19: "Emmet's Excavator",
20: "Emmet's Car",
21: "Duplo Brickster",
22: "Hover Bike",
23: "Cyber Guard",
24: "Cyber Wraith",
25: "Blade Bike",
26: "Flying White Dragon",
27: "Golden Dragon",
28: "Mega Flight Dragon",
29: "Shadowmeld",
30: "Swamp Skimmer",
31: "Cragger's Fireship",
32: "Eagle Interceptor",
33: "Eagle Sky Blazer",
34: "Eagle Swoop Diver",
35: "Gyrosphere",
36: "ACU Trooper Car",
37: "T. Rex",
38: "Homer's Car",
39: "Taunt-o-Vision",
40: "The Simpsons House",
41: "Krusty's Bike",
42: "Clown Bike",
43: "Frink's Hover Car",
44: "Gravity Sprinter",
45: "Street Shredder",
46: "Sky Clobberer",
47: "Invisible Jet",
48: "Justice Camper",
49: "Electro Bolt",
50: "Arrow Launcher",
51: "Drill Driver",
52: "Bane Dig Dig",
53: "Ancient Psychic Tandem War Elephant",
54: "Jakemobile",
55: "Lumpy Car",
56: "Ecto-1 (2016)",
57: "Ectozer",
58: "PerfEcto",
59: "B.A.'s Van",
60: "B.A.'s Super Van",
61: "Pain Plane",
62: "Sonic Speedster",
63: "Blue Typhoon",
64: "Moto Bug",
65: "IMF Scrambler",
66: "IMF Sport Car",
67: "IMF Tank",
68: "K.I.T.T.",
69: "Gozer Trap",
70: "Terror Dog",
71: "Gremlin Car",
72: "Flash 'n' Finish",
73: "Rampage Wrecking",
74: "Phone Home",
75: "Mobile Uplink",
76: "Super Charged Satellite",
77: "Enchanted Car",
78: "Harry's Broom",
79: "Newt's Case",
80: "Swooping Evil",
81: "Occamy",
82: "Niffler",
}
def _is_timeout_error(self, error: usb.core.USBError) -> bool:
"""Check if error is a timeout (normal during polling)."""
error_str = str(error).lower()
# Windows timeout codes vary: -116, -7, 110, or string
return (
error.errno in (-116, -7, 110, None) or
'timeout' in error_str or
@@ -410,7 +288,6 @@ class LegoDimensionsReader:
Returns True if connection successful.
"""
try:
# Get platform-appropriate backend
backend = get_usb_backend()
self.dev = usb.core.find(
@@ -434,34 +311,28 @@ class LegoDimensionsReader:
)
raise ConnectionError(error_msg)
# Detach kernel driver if active (Linux only)
try:
if self.dev.is_kernel_driver_active(0):
self.dev.detach_kernel_driver(0)
except (AttributeError, usb.core.USBError, NotImplementedError):
pass # Not available on Windows
pass
# Configure device
try:
self.dev.set_configuration()
except usb.core.USBError as e:
# Ignore "Entity not found" - device already configured
if 'entity not found' not in str(e).lower():
raise
# Get configuration and interface
cfg = self.dev.get_active_configuration()
intf = cfg[(0, 0)]
self._interface = intf.bInterfaceNumber
# Claim interface (required on Windows)
try:
usb.util.claim_interface(self.dev, self._interface)
except usb.core.USBError as e:
if 'resource busy' not in str(e).lower():
raise
# Find endpoints dynamically
self._endpoint_out = usb.util.find_descriptor(
intf,
custom_match=lambda e: usb.util.endpoint_direction(e.bEndpointAddress) == usb.util.ENDPOINT_OUT
@@ -471,16 +342,13 @@ class LegoDimensionsReader:
custom_match=lambda e: usb.util.endpoint_direction(e.bEndpointAddress) == usb.util.ENDPOINT_IN
)
# Fallback to hardcoded endpoints if discovery fails
if self._endpoint_out is None:
self._endpoint_out = 0x01
if self._endpoint_in is None:
self._endpoint_in = 0x81
# Send initialization command
self._write_raw(TOYPAD_INIT)
# Get device info
try:
product = usb.util.get_string(self.dev, self.dev.iProduct)
print(f"Connected to: {product}")
@@ -509,12 +377,12 @@ class LegoDimensionsReader:
def disconnect(self):
"""Disconnect from portal and clean up resources."""
self.stop()
self.stop_all_effects()
if self.dev:
try:
# Turn off all LEDs
self.set_pad_color(Pad.ALL, COLORS['OFF'])
# Release interface (Windows)
if self._interface is not None:
try:
usb.util.release_interface(self.dev, self._interface)
@@ -552,22 +420,6 @@ class LegoDimensionsReader:
except usb.core.USBError:
pass
def _read_raw(self, size: int = 32, timeout: int = 100) -> Optional[list]:
"""Read raw data from device."""
if not self.dev:
return None
try:
endpoint = self._endpoint_in
if hasattr(endpoint, 'bEndpointAddress'):
endpoint = endpoint.bEndpointAddress
if endpoint is None:
endpoint = 0x81
return list(self.dev.read(endpoint, size, timeout=timeout))
except usb.core.USBError:
return None
def _send_command(self, command: list):
"""Send a command to the portal with checksum and padding."""
if not self.dev:
@@ -576,13 +428,12 @@ class LegoDimensionsReader:
checksum = self._calculate_checksum(command)
message = command + [checksum]
# Pad to 32 bytes
while len(message) < 32:
message.append(0x00)
self._write_raw(message)
def set_pad_color(self, pad: Pad, color: list):
def set_pad_color(self, pad: Pad, color: List[int]):
"""
Set a pad to a specific color immediately.
@@ -590,132 +441,164 @@ class LegoDimensionsReader:
pad: Which pad (Pad.ALL, Pad.CENTER, Pad.LEFT, Pad.RIGHT)
color: [R, G, B] values 0-255
"""
self.stop_effect(pad)
command = [0x55, 0x06, 0xc0, 0x02, pad.value, color[0], color[1], color[2]]
self._send_command(command)
def stop_flash(self, pad: Pad):
"""
Stop any active flash animation on a pad.
Args:
pad: Which pad to stop flashing (Pad.ALL stops all pads)
"""
if pad == Pad.ALL:
pads_to_stop = [Pad.CENTER, Pad.LEFT, Pad.RIGHT]
else:
pads_to_stop = [pad]
for p in pads_to_stop:
if p.value in self._flash_stop_events:
self._flash_stop_events[p.value].set()
def flash_pad(self, pad: Pad, color: list, on_time: int = 10,
def flash_pad(self, pad: Pad, color: List[int], on_time: int = 10,
off_time: int = 10, count: int = 5):
"""
Flash a pad with a color using software-based blinking.
Note: Hardware flash command (0xC8) is unreliable on some portals,
so this uses rapid solid color toggling instead.
Args:
pad: Which pad to flash
color: [R, G, B] values
on_time: Ticks for LED on state (1 tick ~ 50ms)
off_time: Ticks for LED off state
count: Number of flashes (255 = infinite, capped at 50)
Flash a pad (hardware command - may not work on all portals).
Use software_flash() for reliable cross-portal flashing.
"""
# Convert ticks to seconds (1 tick ≈ 50ms)
on_secs = on_time * 0.05
off_secs = off_time * 0.05
command = [
0x55, 0x0e, 0xc8, 0x06, pad.value,
color[0], color[1], color[2],
on_time, off_time, count,
0, 0, 0, 0, 0
]
self._send_command(command)
# Cap infinite flashes to reasonable number for software implementation
actual_count = min(count, 50) if count == 255 else count
# Determine which pads to flash
if pad == Pad.ALL:
pads_to_flash = [Pad.CENTER, Pad.LEFT, Pad.RIGHT]
else:
pads_to_flash = [pad]
# Stop any existing flash on these pads
for p in pads_to_flash:
if p.value in self._flash_stop_events:
self._flash_stop_events[p.value].set()
# Create new stop events for this flash
stop_events = {}
for p in pads_to_flash:
stop_events[p.value] = threading.Event()
self._flash_stop_events[p.value] = stop_events[p.value]
def _flash_thread():
for _ in range(actual_count):
# Check if we should stop (device disconnected or stop requested)
if not self._running and not self.dev:
break
# Check if stop was requested for any of our pads
if any(stop_events[p.value].is_set() for p in pads_to_flash):
break
# ON
for p in pads_to_flash:
if not stop_events[p.value].is_set():
self.set_pad_color(p, color)
time.sleep(on_secs)
# Check again before OFF
if any(stop_events[p.value].is_set() for p in pads_to_flash):
break
# OFF
for p in pads_to_flash:
if not stop_events[p.value].is_set():
self.set_pad_color(p, COLORS['OFF'])
time.sleep(off_secs)
# Cleanup stop events when done
for p in pads_to_flash:
if p.value in self._flash_stop_events and self._flash_stop_events[p.value] == stop_events[p.value]:
del self._flash_stop_events[p.value]
# Run flash in background thread so it doesn't block
flash_thread = threading.Thread(target=_flash_thread, daemon=True)
flash_thread.start()
def fade_pad(self, pad: Pad, color: list, speed: int = 10, count: int = 1):
def fade_pad(self, pad: Pad, color: List[int], speed: int = 10, count: int = 1):
"""
Fade a pad to a color.
Args:
pad: Which pad to fade
color: [R, G, B] target color
speed: Fade speed (higher = slower)
count: Number of fade pulses (255 = infinite)
Fade a pad (hardware command - may not work on all portals).
Use software_pulse() for reliable cross-portal pulsing.
"""
command = [0x55, 0x08, 0xc2, 0x04, pad.value, speed, count,
color[0], color[1], color[2]]
self._send_command(command)
def rainbow_cycle(self, pad: Pad = Pad.ALL, speed: int = 5):
# =========================================================================
# SOFTWARE-BASED EFFECTS (Reliable cross-portal)
# =========================================================================
def stop_effect(self, pad: Pad):
"""Stop any running software effect on a specific pad."""
pad_val = pad.value
if pad_val in self._effect_stop_flags:
self._effect_stop_flags[pad_val].set()
if pad_val in self._effect_threads:
thread = self._effect_threads[pad_val]
if thread.is_alive():
thread.join(timeout=0.5)
del self._effect_threads[pad_val]
del self._effect_stop_flags[pad_val]
def stop_all_effects(self):
"""Stop all running software effects."""
for pad in [Pad.CENTER, Pad.LEFT, Pad.RIGHT]:
self.stop_effect(pad)
def _set_color_raw(self, pad_value: int, r: int, g: int, b: int):
"""Set color without stopping effects (internal use)."""
command = [0x55, 0x06, 0xc0, 0x02, pad_value, r, g, b]
self._send_command(command)
def software_flash(self, pad: Pad, color: List[int], on_time: float = 0.3,
off_time: float = 0.3, count: int = 0):
"""
Start a rainbow color cycle on a pad.
Flash a pad using software-controlled solid colors.
This is reliable across all portal versions.
Args:
pad: Which pad to animate
speed: Cycle speed (1-20, lower = faster)
pad: Which pad to flash
color: [R, G, B] values
on_time: Seconds LED stays on
off_time: Seconds LED stays off
count: Number of flashes (0 = infinite until stopped)
"""
# Cycle through colors using fade
colors_cycle = [
COLORS['RED'], COLORS['ORANGE'], COLORS['YELLOW'],
COLORS['GREEN'], COLORS['CYAN'], COLORS['BLUE'],
COLORS['PURPLE'], COLORS['MAGENTA']
]
for color in colors_cycle:
self.fade_pad(pad, color, speed=speed, count=1)
time.sleep(speed * 0.05)
self.stop_effect(pad)
stop_flag = threading.Event()
self._effect_stop_flags[pad.value] = stop_flag
def _flash_loop():
flash_count = 0
while not stop_flag.is_set():
self._set_color_raw(pad.value, color[0], color[1], color[2])
if stop_flag.wait(on_time):
break
self._set_color_raw(pad.value, 0, 0, 0)
if stop_flag.wait(off_time):
break
flash_count += 1
if count > 0 and flash_count >= count:
self._set_color_raw(pad.value, color[0], color[1], color[2])
break
thread = threading.Thread(target=_flash_loop, daemon=True)
self._effect_threads[pad.value] = thread
thread.start()
def software_pulse(self, pad: Pad, color: List[int], speed: float = 1.0,
min_brightness: float = 0.1, count: int = 0):
"""
Pulse/breathe effect using software-controlled brightness.
Args:
pad: Which pad to pulse
color: [R, G, B] base color
speed: Seconds for one full pulse cycle
min_brightness: Minimum brightness (0.0-1.0)
count: Number of pulses (0 = infinite until stopped)
"""
self.stop_effect(pad)
stop_flag = threading.Event()
self._effect_stop_flags[pad.value] = stop_flag
def _pulse_loop():
pulse_count = 0
start_time = time.time()
step_time = speed / 30
while not stop_flag.is_set():
elapsed = time.time() - start_time
phase = (elapsed / speed) * 2 * math.pi
brightness = (math.sin(phase - math.pi/2) + 1) / 2
brightness = min_brightness + brightness * (1 - min_brightness)
r = int(color[0] * brightness)
g = int(color[1] * brightness)
b = int(color[2] * brightness)
self._set_color_raw(pad.value, r, g, b)
if elapsed >= speed * (pulse_count + 1):
pulse_count += 1
if count > 0 and pulse_count >= count:
self._set_color_raw(pad.value, color[0], color[1], color[2])
break
if stop_flag.wait(step_time):
break
thread = threading.Thread(target=_pulse_loop, daemon=True)
self._effect_threads[pad.value] = thread
thread.start()
def apply_effect(self, pad: Pad, color: List[int], effect: str = "solid"):
"""
Apply a named effect to a pad.
Args:
pad: Which pad
color: [R, G, B] values
effect: "solid", "flash", or "pulse"
"""
effect = effect.lower()
if effect == "flash":
self.software_flash(pad, color)
elif effect == "pulse":
self.software_pulse(pad, color)
else:
self.set_pad_color(pad, color)
def _poll_events(self):
"""Background thread function for polling tag events."""
while self._running:
try:
# Read from IN endpoint with timeout
endpoint = self._endpoint_in
if hasattr(endpoint, 'bEndpointAddress'):
endpoint = endpoint.bEndpointAddress
@@ -728,16 +611,13 @@ class LegoDimensionsReader:
if not bytelist:
continue
# NFC tag events start with 0x56
if bytelist[0] != 0x56:
continue
# Parse tag event
pad_num = bytelist[2]
action = bytelist[5]
uid = bytes(bytelist[6:13])
# Validate UID (not all zeros)
if uid == b'\x00' * 7:
continue
@@ -749,31 +629,21 @@ class LegoDimensionsReader:
event = TagEvent.INSERTED if action == 0 else TagEvent.REMOVED
tag_info = TagInfo(uid=uid, pad=pad, event=event)
# Track active tags and fire callbacks
# v1.3.0: NO default LED behavior - callbacks control LEDs
if event == TagEvent.INSERTED:
self._active_tags[pad_num] = tag_info
# Visual feedback - green flash for detected tag
self.flash_pad(pad, COLORS['GREEN'], on_time=5, off_time=5, count=2)
time.sleep(0.1)
self.set_pad_color(pad, COLORS['CYAN'])
if self.on_tag_insert:
self.on_tag_insert(tag_info)
else:
if pad_num in self._active_tags:
del self._active_tags[pad_num]
# Stop any active flash and turn off pad LED
self.stop_flash(pad)
self.set_pad_color(pad, COLORS['OFF'])
self.stop_effect(pad)
if self.on_tag_remove:
self.on_tag_remove(tag_info)
except usb.core.USBError as e:
if self._is_timeout_error(e):
continue # Timeout is normal
continue
elif self._is_pipe_error(e):
if self._running and self.on_error:
self.on_error(ConnectionError("Portal disconnected"))
@@ -800,11 +670,6 @@ class LegoDimensionsReader:
self._thread = threading.Thread(target=self._poll_events, daemon=True)
self._thread.start()
# Initial LED animation to show portal is active
for pad in [Pad.CENTER, Pad.LEFT, Pad.RIGHT]:
self.flash_pad(pad, COLORS['BLUE'], on_time=3, off_time=3, count=2)
time.sleep(0.1)
def stop(self):
"""Stop the event polling thread."""
self._running = False
@@ -826,31 +691,22 @@ class LegoDimensionsReader:
@property
def is_connected(self) -> bool:
"""Check if portal is connected."""
return self.dev is not None
@property
def is_running(self) -> bool:
"""Check if event polling is active."""
return self._running
# =============================================================================
# TEA Encryption/Decryption for Character ID
# Based on community reverse-engineering (ags131, bettse, socram8888)
# TEA Encryption/Decryption
# =============================================================================
def tea_decrypt(data: bytes, key: bytes) -> bytes:
"""
TEA (Tiny Encryption Algorithm) decryption.
Used to decrypt character IDs from tag pages 0x24-0x25.
"""
def _u32(x):
return c_uint32(x).value
delta = 0x9e3779b9
# Convert to 32-bit words
v0 = int.from_bytes(data[0:4], 'little')
v1 = int.from_bytes(data[4:8], 'little')
k0 = int.from_bytes(key[0:4], 'little')
@@ -869,15 +725,10 @@ def tea_decrypt(data: bytes, key: bytes) -> bytes:
def tea_encrypt(data: bytes, key: bytes) -> bytes:
"""
TEA (Tiny Encryption Algorithm) encryption.
Used to encrypt character IDs for writing to tags.
"""
def _u32(x):
return c_uint32(x).value
delta = 0x9e3779b9
v0 = int.from_bytes(data[0:4], 'little')
v1 = int.from_bytes(data[4:8], 'little')
k0 = int.from_bytes(key[0:4], 'little')
@@ -896,10 +747,6 @@ def tea_encrypt(data: bytes, key: bytes) -> bytes:
def generate_tea_key(uid: bytes) -> bytes:
"""
Generate TEA encryption key from tag UID.
The key is derived by scrambling the 7-byte UID into 16 bytes.
"""
key = bytearray(16)
key[0] = (uid[1] ^ uid[3] ^ 0xAA) | 0xAA
key[1] = (uid[2] ^ uid[4] ^ 0x55) & 0x55
@@ -921,10 +768,6 @@ def generate_tea_key(uid: bytes) -> bytes:
def generate_password(uid: bytes) -> bytes:
"""
Generate NTAG213 password from 7-byte UID.
This password protects pages 0x24+ on the tag.
"""
pwd = bytearray(4)
pwd[0] = 0xAA ^ uid[1] ^ uid[3]
pwd[1] = 0x55 ^ uid[2] ^ uid[4]
@@ -934,34 +777,13 @@ def generate_password(uid: bytes) -> bytes:
def decrypt_character_id(uid: bytes, encrypted_data: bytes) -> int:
"""
Decrypt character ID from tag memory pages 0x24-0x25.
Args:
uid: 7-byte tag UID
encrypted_data: 8 bytes from pages 0x24-0x25
Returns:
Decrypted character ID integer
"""
key = generate_tea_key(uid)
decrypted = tea_decrypt(encrypted_data, key)
return int.from_bytes(decrypted[0:2], 'little')
def encrypt_character_id(uid: bytes, character_id: int) -> bytes:
"""
Encrypt character ID for writing to tag pages 0x24-0x25.
Args:
uid: 7-byte tag UID
character_id: Character ID to encrypt
Returns:
8 bytes of encrypted data
"""
key = generate_tea_key(uid)
# Character ID in first 2 bytes, rest is padding
data = character_id.to_bytes(2, 'little') + b'\x00' * 6
return tea_encrypt(data, key)
@@ -975,6 +797,7 @@ if __name__ == "__main__":
print(f"\n{'='*50}")
print(f"[+] TAG PLACED on {tag.pad.name} pad")
print(f" UID: {tag.uid_hex}")
print(f" Legacy Key: {tag.legacy_key}")
print(f" Password: {generate_password(tag.uid).hex().upper()}")
print(f"{'='*50}")
@@ -993,6 +816,9 @@ if __name__ == "__main__":
print(f"\nLEGO Dimensions Portal Reader v{__version__}")
print(f"Platform: {platform.system()}")
print("="*50)
print("NOTE: v1.3.0 - No default LED blink on tag insert")
print(" Callbacks control all LED behavior")
print("="*50)
reader = LegoDimensionsReader()
reader.on_tag_insert = on_insert