- TagInfo now includes nfc_card_id: the 10-digit decimal ID standard NFC readers display - Calculated from bytes 3-6 of the 7-byte UID as big-endian uint32 - Example: UID 042B603A9A4080 -> nfc_card_id 983187584 - Updated standalone test to display nfc_card_id - Version bump to 1.4.0
863 lines
30 KiB
Python
863 lines
30 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
LEGO Dimensions Portal Reader Module
|
|
For Moonlight Drive-In Video Player System
|
|
|
|
Communicates with LEGO Dimensions USB portal (PS3/PS4/Wii U versions)
|
|
to detect character and vehicle disc placement/removal events.
|
|
|
|
v1.4.0 Changes:
|
|
- Added nfc_card_id field: decimal ID matching standard NFC readers
|
|
- Calculated from bytes 3-6 of UID as big-endian uint32
|
|
- Example: UID 042B603A9A4080 -> nfc_card_id 983187584
|
|
|
|
v1.3.0 Changes:
|
|
- Removed default LED blink on tag insert (callbacks now control LEDs)
|
|
- Added software_flash() method for reliable flashing via solid color loop
|
|
- Added software_pulse() method for pulsing effect
|
|
- Effects now run in background threads to not block event handling
|
|
|
|
Requirements:
|
|
pip install pyusb libusb-package
|
|
|
|
Windows Driver Setup:
|
|
1. Download Zadig from https://zadig.akeo.ie/
|
|
2. Connect the LEGO Dimensions portal
|
|
3. Run Zadig as administrator
|
|
4. Select Options -> List All Devices
|
|
5. Choose "LEGO READER V2.10" from the dropdown
|
|
6. Select WinUSB as the target driver
|
|
7. Click "Replace Driver"
|
|
|
|
Windows libusb Setup:
|
|
The libusb-package provides pre-compiled libusb-1.0.dll.
|
|
Alternatively, download from https://libusb.info/ and place
|
|
libusb-1.0.dll in C:\\Windows\\System32
|
|
"""
|
|
|
|
import sys
|
|
import os
|
|
import platform
|
|
|
|
# Windows-specific backend setup - must be done BEFORE importing usb.core
|
|
def _setup_windows_backend():
    """Make a libusb-1.0 DLL discoverable on Windows before PyUSB is imported.

    Does nothing on other platforms. If ``libusb_package`` can be imported,
    nothing else is changed. Otherwise the first directory from a short list
    of candidates that contains ``libusb-1.0.dll`` is prepended to the
    ``PATH`` environment variable.
    """
    if platform.system() == 'Windows':
        try:
            # Imported only to probe whether the package is installed.
            import libusb_package  # noqa: F401
        except ImportError:
            windows_dir = os.environ.get('WINDIR', 'C:\\Windows')
            candidates = (
                os.path.join(windows_dir, 'System32'),
                os.path.join(windows_dir, 'SysWOW64'),
                os.path.dirname(sys.executable),
                os.getcwd(),
            )
            located = next(
                (folder for folder in candidates
                 if os.path.exists(os.path.join(folder, 'libusb-1.0.dll'))),
                None,
            )
            if located is not None:
                os.environ['PATH'] = located + os.pathsep + os.environ.get('PATH', '')
|
|
|
|
# Run setup before USB imports
|
|
_setup_windows_backend()
|
|
|
|
import usb.core
|
|
import usb.util
|
|
import usb.backend.libusb1
|
|
import threading
|
|
import time
|
|
import json
|
|
import math
|
|
from typing import Callable, Optional, Dict, Any, List
|
|
from dataclasses import dataclass
|
|
from enum import Enum
|
|
from ctypes import c_uint32
|
|
|
|
# USB Device Identifiers
|
|
VENDOR_ID = 0x0e6f
|
|
PRODUCT_ID = 0x0241
|
|
|
|
# Module version
|
|
__version__ = "1.4.0"
|
|
|
|
|
|
def get_usb_backend():
    """Return a libusb1 backend object for PyUSB, or None.

    On Windows the backend bundled with ``libusb_package`` is tried first.
    On every platform PyUSB's own libusb1 backend lookup is tried next. Any
    exception from a provider is swallowed and the next one is tried. None
    tells ``usb.core.find`` to fall back to its default backend selection.
    """
    def _from_libusb_package():
        import libusb_package
        return libusb_package.get_libusb1_backend()

    def _from_pyusb():
        return usb.backend.libusb1.get_backend()

    providers = []
    if platform.system() == 'Windows':
        providers.append(_from_libusb_package)
    providers.append(_from_pyusb)

    for provider in providers:
        try:
            return provider()
        except Exception:
            # Best effort: fall through to the next provider.
            continue

    return None  # Use default backend
|
|
|
|
|
|
class Pad(Enum):
    """Portal pad identifiers.

    The member value is the pad number used in LED commands and reported in
    tag event packets.
    """

    ALL = 0      # every pad at once
    CENTER = 1
    LEFT = 2
    RIGHT = 3
|
|
|
|
|
|
class TagEvent(Enum):
    """Kind of tag event: a tag was placed on a pad or taken off it."""

    INSERTED = 0   # tag placed on a pad
    REMOVED = 1    # tag lifted off a pad
|
|
|
|
|
|
# Predefined colors [R, G, B]
|
|
COLORS = dict(
    OFF=[0, 0, 0],
    RED=[255, 0, 0],
    GREEN=[0, 255, 0],
    BLUE=[0, 0, 255],
    WHITE=[255, 255, 255],
    YELLOW=[255, 255, 0],
    CYAN=[0, 255, 255],
    MAGENTA=[255, 0, 255],
    ORANGE=[255, 128, 0],
    PURPLE=[128, 0, 255],
    PINK=[255, 105, 180],
)
|
|
|
|
# Portal initialization command (contains "(c) LEGO 2014" signature)
|
|
TOYPAD_INIT = (
    [0x55, 0x0f, 0xb0, 0x01]        # header bytes
    + list(b"(c) LEGO 2014")        # ASCII signature
    + [0xf7]                        # sum of the preceding 17 bytes, mod 256
    + [0x00] * 14                   # zero padding up to 32 bytes
)
|
|
|
|
|
|
@dataclass
class TagInfo:
    """Details of one tag event reported by the portal.

    ``uid_hex``, ``legacy_key`` and ``nfc_card_id`` are derived from ``uid``
    in ``__post_init__``. Values passed for them are overwritten whenever
    the UID is long enough to compute them.
    """

    uid: bytes                            # raw tag UID
    pad: Pad                              # pad the event happened on
    event: TagEvent                       # inserted or removed
    uid_hex: str = ""                     # upper-case hex of uid
    nfc_card_id: int = 0                  # uid[3:7] as big-endian integer
    legacy_key: int = 0                   # uid[0:4] as big-endian integer
    character_id: Optional[int] = None
    character_name: Optional[str] = None
    is_vehicle: bool = False

    def __post_init__(self):
        """Fill in the fields that are derived from the UID."""
        raw = self.uid
        size = len(raw)
        self.uid_hex = raw.hex().upper()

        # Legacy key: first four UID bytes, big-endian.
        if size >= 4:
            self.legacy_key = int.from_bytes(raw[:4], 'big')

        # NFC card ID: UID bytes 3..6 (zero-based), big-endian.
        # e.g. UID 042B603A9A4080 -> 983187584
        if size >= 7:
            self.nfc_card_id = int.from_bytes(raw[3:7], 'big')

    @property
    def nfc_card_id_str(self) -> str:
        """The NFC card ID as a decimal string zero-padded to 10 digits."""
        return format(self.nfc_card_id, '010d')
|
|
|
|
|
|
class LegoDimensionsReader:
|
|
"""
|
|
LEGO Dimensions Portal NFC Reader
|
|
|
|
Provides event-driven tag detection with callbacks for integration
|
|
with the Moonlight Drive-In video player system.
|
|
|
|
v1.4.0: TagInfo now includes nfc_card_id matching standard NFC readers.
|
|
v1.3.0: No default LED behavior on tag insert/remove.
|
|
Callbacks are fully responsible for LED control.
|
|
|
|
Usage:
|
|
reader = LegoDimensionsReader()
|
|
reader.on_tag_insert = lambda tag: print(f"Tag placed: {tag.nfc_card_id}")
|
|
reader.on_tag_remove = lambda tag: print(f"Tag removed: {tag.nfc_card_id}")
|
|
reader.start()
|
|
"""
|
|
|
|
def __init__(self, character_db_path: Optional[str] = None):
    """Set up an unconnected reader.

    No USB access happens here; the device is opened by ``connect()``.

    Args:
        character_db_path: Optional path to character_database.json. The
            same path is handed to both the character and vehicle loaders.
    """
    # User-assignable callbacks; all start unset.
    self.on_tag_insert: Optional[Callable[[TagInfo], None]] = None
    self.on_tag_remove: Optional[Callable[[TagInfo], None]] = None
    self.on_error: Optional[Callable[[Exception], None]] = None
    self.on_connect: Optional[Callable[[], None]] = None
    self.on_disconnect: Optional[Callable[[], None]] = None

    # Device handle and polling-thread state.
    self.dev = None
    self._running = False
    self._thread: Optional[threading.Thread] = None
    self._lock = threading.Lock()

    # USB endpoints and interface number, filled in by connect().
    self._endpoint_in = None
    self._endpoint_out = None
    self._interface = None

    # Tags currently on the portal, keyed by pad number.
    self._active_tags: Dict[int, TagInfo] = {}

    # Software LED effects: worker thread and stop flag per pad value.
    self._effect_threads: Dict[int, threading.Thread] = {}
    self._effect_stop_flags: Dict[int, threading.Event] = {}

    # ID -> name lookup tables.
    self._character_db = self._load_character_database(character_db_path)
    self._vehicle_db = self._load_vehicle_database(character_db_path)
|
|
|
|
def _load_character_database(self, db_path: Optional[str] = None) -> Dict[int, str]:
    """Return the character ID -> name table.

    When ``db_path`` names an existing JSON file, its ``"characters"`` object
    is used with the keys converted to ``int``. If the file is missing, or
    reading or converting it fails for any reason, the built-in table below
    is returned instead.
    """
    if db_path and os.path.exists(db_path):
        try:
            with open(db_path, 'r', encoding='utf-8') as handle:
                entries = json.load(handle).get('characters', {})
                return {int(key): name for key, name in entries.items()}
        except Exception:
            # Deliberate best effort: any problem falls back to the defaults.
            pass

    # Built-in names; IDs are consecutive starting at 1.
    builtin_names = (
        "Batman", "Gandalf", "Wyldstyle", "Aquaman", "Bad Cop",
        "Bane", "Bart Simpson", "Benny", "Chell", "Cole",
        "Cragger", "Cyborg", "Cyberman", "Doc Brown",
        "The Doctor", "Emmet", "Eris", "Gimli", "Gollum",
        "Harley Quinn", "Homer Simpson", "Jay", "Joker",
        "Kai", "ACU Trooper", "Krusty the Clown", "Laval",
        "Legolas", "Lloyd", "Marty McFly", "Nya",
        "Owen Grady", "Peter Venkman", "Scooby-Doo",
        "Sensei Wu", "Shaggy", "Slimer", "Superman",
        "Unikitty", "Wicked Witch", "Wonder Woman", "Zane",
        "Green Arrow", "Supergirl", "Abby Yates",
        "Finn the Human", "Harry Potter", "Ethan Hunt",
        "Lumpy Space Princess", "Jake the Dog", "B.A. Baracus",
        "Michael Knight", "Gizmo", "Sonic the Hedgehog",
        "Gamer Kid", "E.T.", "Lord Voldemort",
        "Hermione Granger", "Newt Scamander", "Tina Goldstein",
        "Stripe", "Sloth", "Beetlejuice", "Kevin",
        "Erin Gilbert", "Patty Tolan", "Jillian Holtzmann",
        "Stay Puft", "Chase McCain", "Excalibur Batman",
        "Raven", "Beast Boy", "Starfire", "Blossom",
        "Bubbles", "Buttercup", "Marceline", "Batgirl",
        "Robin", "Knight Rider (KITT)", "Gremlins Gizmo",
        "Teen Titans Go!",
    )
    return dict(enumerate(builtin_names, start=1))
|
|
|
|
def _load_vehicle_database(self, db_path: Optional[str] = None) -> Dict[int, str]:
    """Return the vehicle ID -> name table.

    When ``db_path`` names an existing JSON file, its ``"vehicles"`` object
    is used with the keys converted to ``int``. If the file is missing, or
    reading or converting it fails for any reason, the built-in table below
    is returned instead.
    """
    if db_path and os.path.exists(db_path):
        try:
            with open(db_path, 'r', encoding='utf-8') as handle:
                entries = json.load(handle).get('vehicles', {})
                return {int(key): name for key, name in entries.items()}
        except Exception:
            # Deliberate best effort: any problem falls back to the defaults.
            pass

    # Built-in names; IDs are consecutive starting at 1.
    builtin_names = (
        "Batmobile", "Batwing", "Bat Blaster",
        "DeLorean Time Machine", "Hoverboard", "Travelling Time Train",
        "TARDIS", "K9", "Dalek", "Companion Cube",
        "Sentry Turret", "GLaDOS", "Mystery Machine",
        "Scooby Snack", "Gyrocopter", "Ghost Trap",
        "Ecto-1", "PKE Meter", "Emmet's Excavator",
        "Emmet's Car",
    )
    return dict(enumerate(builtin_names, start=1))
|
|
|
|
def _is_timeout_error(self, error: usb.core.USBError) -> bool:
    """Return True when a USB error is just a read timeout.

    Timeouts are expected while polling an idle portal, so callers ignore
    them. An error counts as a timeout when any of these holds:

    * its ``backend_error_code`` is -7 (libusb's timeout code). PyUSB keeps
      the raw backend code there, separately from ``errno``;
    * its ``errno`` is the platform's ``ETIMEDOUT``, one of the legacy
      values -116 / -7 / 110, or ``None`` (kept from the original behaviour);
    * its message mentions a timeout.

    Args:
        error: The exception raised by the USB read.
    """
    import errno as errno_codes

    # LIBUSB_ERROR_TIMEOUT, reported through PyUSB's backend_error_code.
    if getattr(error, 'backend_error_code', None) == -7:
        return True

    if error.errno in (-116, -7, 110, errno_codes.ETIMEDOUT, None):
        return True

    message = str(error).lower()
    return 'timeout' in message or 'timed out' in message
|
|
|
|
def _is_pipe_error(self, error: usb.core.USBError) -> bool:
    """Return True when a USB error indicates the device has gone away.

    Besides the original checks (``errno == -9`` or "pipe" in the message),
    this also recognises:

    * ``backend_error_code`` of -9 (libusb pipe error) or -4 (libusb
      no-device error). PyUSB keeps the raw backend code there, separately
      from ``errno``;
    * ``errno`` equal to ``EPIPE`` or ``ENODEV``;
    * a "no such device" message.

    Args:
        error: The exception raised by the USB transfer.
    """
    import errno as errno_codes

    # LIBUSB_ERROR_PIPE (-9) and LIBUSB_ERROR_NO_DEVICE (-4).
    if getattr(error, 'backend_error_code', None) in (-9, -4):
        return True

    if error.errno in (-9, errno_codes.EPIPE, errno_codes.ENODEV):
        return True

    message = str(error).lower()
    return 'pipe' in message or 'no such device' in message
|
|
|
|
def connect(self) -> bool:
    """Open the portal, claim its interface and send the init command.

    Errors are not raised to the caller: they are passed to ``on_error``
    (when set) and reported through the return value. If the attempt fails
    before the portal is initialised, the device handle, endpoints and
    interface number are reset to ``None``. This keeps ``is_connected``
    accurate and lets a later ``start()`` retry instead of polling a
    half-opened device.

    Returns:
        True if the portal was found and initialised. False otherwise,
        including when the ``on_connect`` callback itself raises; in that
        one case the device stays open, as before.
    """
    established = False
    try:
        backend = get_usb_backend()

        self.dev = usb.core.find(
            idVendor=VENDOR_ID,
            idProduct=PRODUCT_ID,
            backend=backend
        )

        if self.dev is None:
            error_msg = (
                "LEGO Dimensions Portal not found.\n"
                "Ensure:\n"
                " 1. Portal is connected via USB\n"
                " 2. Using PS3/PS4/Wii U portal (NOT Xbox)\n"
                " 3. WinUSB driver installed via Zadig"
            )
            if platform.system() == 'Windows':
                error_msg += (
                    "\n 4. libusb installed: pip install libusb-package\n"
                    " 5. Or libusb-1.0.dll in System32"
                )
            raise ConnectionError(error_msg)

        # A kernel driver may hold the interface; not every platform or
        # backend supports this query, so failures here are ignored.
        try:
            if self.dev.is_kernel_driver_active(0):
                self.dev.detach_kernel_driver(0)
        except (AttributeError, usb.core.USBError, NotImplementedError):
            pass

        try:
            self.dev.set_configuration()
        except usb.core.USBError as e:
            # "Entity not found" is tolerated; anything else is fatal.
            if 'entity not found' not in str(e).lower():
                raise

        cfg = self.dev.get_active_configuration()
        intf = cfg[(0, 0)]
        self._interface = intf.bInterfaceNumber

        try:
            usb.util.claim_interface(self.dev, self._interface)
        except usb.core.USBError as e:
            # "Resource busy" is tolerated; anything else is fatal.
            if 'resource busy' not in str(e).lower():
                raise

        self._endpoint_out = usb.util.find_descriptor(
            intf,
            custom_match=lambda e: usb.util.endpoint_direction(e.bEndpointAddress) == usb.util.ENDPOINT_OUT
        )
        self._endpoint_in = usb.util.find_descriptor(
            intf,
            custom_match=lambda e: usb.util.endpoint_direction(e.bEndpointAddress) == usb.util.ENDPOINT_IN
        )

        # Fall back to fixed endpoint addresses when none were found.
        if self._endpoint_out is None:
            self._endpoint_out = 0x01
        if self._endpoint_in is None:
            self._endpoint_in = 0x81

        self._write_raw(TOYPAD_INIT)

        # From here on the device is considered connected, even if the
        # informational steps or the user callback below fail.
        established = True

        try:
            product = usb.util.get_string(self.dev, self.dev.iProduct)
            print(f"Connected to: {product}")
        except Exception:
            print("Connected to LEGO Dimensions Portal")

        if self.on_connect:
            self.on_connect()

        return True

    except usb.core.USBError as e:
        error_msg = str(e)
        if "access denied" in error_msg.lower() or "insufficient permissions" in error_msg.lower():
            print("ERROR: Access denied. Try running as Administrator or check Zadig drivers.")
        elif "no backend" in error_msg.lower():
            print("ERROR: No USB backend found. Install libusb-package: pip install libusb-package")
        if self.on_error:
            self.on_error(e)
        return False
    except Exception as e:
        # A missing backend may not arrive as a USBError, so the install
        # hint is repeated here where that case can actually be seen.
        if "no backend" in str(e).lower():
            print("ERROR: No USB backend found. Install libusb-package: pip install libusb-package")
        if self.on_error:
            self.on_error(e)
        return False
    finally:
        if not established:
            # Drop the half-opened device so state reflects the failure.
            stale = self.dev
            self.dev = None
            self._endpoint_in = None
            self._endpoint_out = None
            self._interface = None
            if stale is not None:
                try:
                    usb.util.dispose_resources(stale)
                except Exception:
                    pass
|
|
|
|
def disconnect(self):
    """Stop polling and effects, switch the LEDs off and release the device.

    When a device is held, all pads are set to OFF, the claimed interface is
    released, PyUSB resources are disposed, the connection state is cleared
    and ``on_disconnect`` (if set) is invoked. USB errors during this cleanup
    are ignored. Without a device, only the polling thread and effects are
    stopped.
    """
    self.stop()
    self.stop_all_effects()

    if not self.dev:
        return

    try:
        self.set_pad_color(Pad.ALL, COLORS['OFF'])

        if self._interface is not None:
            try:
                usb.util.release_interface(self.dev, self._interface)
            except Exception:
                pass

        usb.util.dispose_resources(self.dev)
    except Exception:
        # Cleanup is best effort; the device may already be gone.
        pass

    self.dev = self._endpoint_in = self._endpoint_out = self._interface = None

    notify = self.on_disconnect
    if notify:
        notify()
|
|
|
|
def _calculate_checksum(self, command: list) -> int:
    """Return the command checksum: the sum of all its bytes, modulo 256."""
    _, remainder = divmod(sum(command), 256)
    return remainder
|
|
|
|
def _write_raw(self, data: list):
    """Write ``data`` to the portal's OUT endpoint without any framing.

    Does nothing when no device is open. The endpoint may be stored as a
    descriptor object or a plain address; address 0x01 is used when it is
    unset. The write is serialised with ``self._lock``. USB errors are
    silently ignored.
    """
    if not self.dev:
        return

    try:
        target = getattr(self._endpoint_out, 'bEndpointAddress', self._endpoint_out)
        if target is None:
            target = 0x01

        with self._lock:
            self.dev.write(target, data)
    except usb.core.USBError:
        pass
|
|
|
|
def _send_command(self, command: list):
    """Frame a command and write it to the portal.

    The checksum byte is appended and the message is zero-padded to 32
    bytes; longer messages are sent unpadded. Nothing is sent when no device
    is open. ``command`` itself is not modified.
    """
    if not self.dev:
        return

    framed = command + [self._calculate_checksum(command)]
    shortfall = 32 - len(framed)
    if shortfall > 0:
        framed.extend([0x00] * shortfall)

    self._write_raw(framed)
|
|
|
|
def set_pad_color(self, pad: Pad, color: List[int]):
    """Show a solid color on a pad right away.

    Any software effect running on that pad is stopped first.

    Args:
        pad: Which pad (Pad.ALL, Pad.CENTER, Pad.LEFT, Pad.RIGHT)
        color: [R, G, B] values 0-255
    """
    self.stop_effect(pad)
    header = [0x55, 0x06, 0xc0, 0x02]
    payload = [pad.value, color[0], color[1], color[2]]
    self._send_command(header + payload)
|
|
|
|
def flash_pad(self, pad: Pad, color: List[int], on_time: int = 10,
              off_time: int = 10, count: int = 5):
    """Flash a pad (hardware command - may not work on all portals).

    Use software_flash() for reliable cross-portal flashing.

    Args:
        pad: Which pad to flash
        color: [R, G, B] values
        on_time: On duration byte passed through to the portal
        off_time: Off duration byte passed through to the portal
        count: Flash count byte passed through to the portal
    """
    # NOTE(review): community protocol notes describe 0xc8 as the
    # "set color on all pads" command and 0xc3 as flash - confirm the
    # opcode and payload layout against hardware before relying on this.
    payload = [pad.value, color[0], color[1], color[2], on_time, off_time, count]
    padding = [0] * 5
    self._send_command([0x55, 0x0e, 0xc8, 0x06] + payload + padding)
|
|
|
|
def fade_pad(self, pad: Pad, color: List[int], speed: int = 10, count: int = 1):
    """Fade a pad (hardware command - may not work on all portals).

    Use software_pulse() for reliable cross-portal pulsing.

    Args:
        pad: Which pad to fade
        color: [R, G, B] values
        speed: Speed byte passed through to the portal
        count: Repeat count byte passed through to the portal
    """
    header = [0x55, 0x08, 0xc2, 0x04]
    payload = [pad.value, speed, count, color[0], color[1], color[2]]
    self._send_command(header + payload)
|
|
|
|
# =========================================================================
|
|
# SOFTWARE-BASED EFFECTS (Reliable cross-portal)
|
|
# =========================================================================
|
|
|
|
def stop_effect(self, pad: Pad):
    """Stop the software effect registered for ``pad``, if there is one.

    The pad's stop flag is set, its worker thread (when still alive) is
    given up to 0.5 s to finish, and both bookkeeping entries are removed.
    Pads without a stop flag are left untouched.
    """
    key = pad.value
    flags = self._effect_stop_flags
    if key not in flags:
        return

    flags[key].set()

    workers = self._effect_threads
    if key in workers:
        worker = workers[key]
        if worker.is_alive():
            worker.join(timeout=0.5)
        del workers[key]

    del flags[key]
|
|
|
|
def stop_all_effects(self):
    """Stop every running software effect.

    Iterates over all ``Pad`` members, including ``Pad.ALL``. Effects
    started with ``Pad.ALL`` are tracked under their own key and would
    otherwise keep running.
    """
    for pad in Pad:
        self.stop_effect(pad)
|
|
|
|
def _set_color_raw(self, pad_value: int, r: int, g: int, b: int):
    """Send a solid-color command for a pad without touching running effects.

    Used by the effect worker threads, which must not stop themselves.
    """
    header = [0x55, 0x06, 0xc0, 0x02]
    self._send_command(header + [pad_value, r, g, b])
|
|
|
|
def software_flash(self, pad: Pad, color: List[int], on_time: float = 0.3,
                   off_time: float = 0.3, count: int = 0):
    """Flash a pad by alternating solid-color commands from a worker thread.

    Any effect already running on the pad is stopped first. The call returns
    immediately; the flashing happens on a daemon thread.

    Args:
        pad: Which pad to flash
        color: [R, G, B] values
        on_time: Seconds LED stays on
        off_time: Seconds LED stays off
        count: Number of flashes (0 = infinite until stopped). After the
            last flash the pad is left showing ``color``.
    """
    self.stop_effect(pad)

    halt = threading.Event()
    self._effect_stop_flags[pad.value] = halt

    def _show(lit):
        # Push either the requested color or black to the pad.
        if lit:
            self._set_color_raw(pad.value, color[0], color[1], color[2])
        else:
            self._set_color_raw(pad.value, 0, 0, 0)

    def _worker():
        completed = 0
        while not halt.is_set():
            _show(True)
            if halt.wait(on_time):
                return
            _show(False)
            if halt.wait(off_time):
                return

            completed += 1
            if count > 0 and completed >= count:
                # Finished the requested flashes: leave the pad lit.
                _show(True)
                return

    worker = threading.Thread(target=_worker, daemon=True)
    self._effect_threads[pad.value] = worker
    worker.start()
|
|
|
|
def software_pulse(self, pad: Pad, color: List[int], speed: float = 1.0,
                   min_brightness: float = 0.1, count: int = 0):
    """Pulse/breathe a pad by sending brightness-scaled solid colors.

    Any effect already running on the pad is stopped first. The call returns
    immediately; the pulsing happens on a daemon thread that updates the pad
    30 times per cycle along a sine curve starting at ``min_brightness``.

    Args:
        pad: Which pad to pulse
        color: [R, G, B] base color
        speed: Seconds for one full pulse cycle; must be greater than zero
        min_brightness: Minimum brightness (0.0-1.0)
        count: Number of pulses (0 = infinite until stopped). After the
            last pulse the pad is left showing ``color`` at full brightness.

    Raises:
        ValueError: If ``speed`` is not greater than zero. Previously this
            made the worker thread die on a division by zero or spin
            without sleeping.
    """
    if speed <= 0:
        raise ValueError("speed must be greater than zero")

    self.stop_effect(pad)

    stop_flag = threading.Event()
    self._effect_stop_flags[pad.value] = stop_flag

    def _pulse_loop():
        pulse_count = 0
        # Monotonic clock: immune to wall-clock adjustments mid-effect.
        start_time = time.monotonic()
        step_time = speed / 30

        while not stop_flag.is_set():
            elapsed = time.monotonic() - start_time
            phase = (elapsed / speed) * 2 * math.pi
            # Shifted sine in [0, 1], starting at 0 when elapsed == 0.
            brightness = (math.sin(phase - math.pi / 2) + 1) / 2
            brightness = min_brightness + brightness * (1 - min_brightness)

            r = int(color[0] * brightness)
            g = int(color[1] * brightness)
            b = int(color[2] * brightness)

            self._set_color_raw(pad.value, r, g, b)

            if elapsed >= speed * (pulse_count + 1):
                pulse_count += 1
                if count > 0 and pulse_count >= count:
                    self._set_color_raw(pad.value, color[0], color[1], color[2])
                    break

            if stop_flag.wait(step_time):
                break

    thread = threading.Thread(target=_pulse_loop, daemon=True)
    self._effect_threads[pad.value] = thread
    thread.start()
|
|
|
|
def apply_effect(self, pad: Pad, color: List[int], effect: str = "solid"):
    """Apply an effect to a pad, selected by name.

    Args:
        pad: Which pad
        color: [R, G, B] values
        effect: "flash" or "pulse" (case-insensitive) start the matching
            software effect with its defaults; any other name, including
            "solid", sets a steady color.
    """
    handlers = {
        "flash": self.software_flash,
        "pulse": self.software_pulse,
    }
    handler = handlers.get(effect.lower(), self.set_pad_color)
    handler(pad, color)
|
|
|
|
def _poll_events(self):
    """Polling-thread body: read portal packets and dispatch tag events.

    Runs until ``self._running`` is cleared or the portal disconnects. Each
    read waits at most 100 ms, so timeouts are normal and ignored. Only
    packets that start with 0x56 and are long enough to carry the pad
    number, action byte and 7-byte UID are treated as tag events; shorter
    ones are skipped instead of raising ``IndexError``.

    LEDs are never changed here apart from stopping a running effect on tag
    removal (v1.3.0+); callbacks decide what to display. Callbacks run on
    this thread, and exceptions they raise are passed to ``on_error``.

    On a disconnect error the loop ends and ``_running`` is cleared, so
    ``is_running`` reports the dead thread and ``start()`` can be called
    again.
    """
    while self._running:
        try:
            endpoint = self._endpoint_in
            if hasattr(endpoint, 'bEndpointAddress'):
                endpoint = endpoint.bEndpointAddress
            if endpoint is None:
                endpoint = 0x81

            in_packet = self.dev.read(endpoint, 32, timeout=100)
            bytelist = list(in_packet)

            # Need indexes 0..12: marker, pad at [2], action at [5],
            # UID in [6:13].
            if len(bytelist) < 13 or bytelist[0] != 0x56:
                continue

            pad_num = bytelist[2]
            action = bytelist[5]
            uid = bytes(bytelist[6:13])

            # An all-zero UID carries no tag.
            if uid == b'\x00' * 7:
                continue

            try:
                pad = Pad(pad_num)
            except ValueError:
                # Unknown pad numbers are reported as the center pad.
                pad = Pad.CENTER

            event = TagEvent.INSERTED if action == 0 else TagEvent.REMOVED
            tag_info = TagInfo(uid=uid, pad=pad, event=event)

            # v1.3.0+: NO default LED behavior - callbacks control LEDs
            if event == TagEvent.INSERTED:
                self._active_tags[pad_num] = tag_info
                if self.on_tag_insert:
                    self.on_tag_insert(tag_info)
            else:
                self._active_tags.pop(pad_num, None)
                self.stop_effect(pad)
                if self.on_tag_remove:
                    self.on_tag_remove(tag_info)

        except usb.core.USBError as e:
            if self._is_timeout_error(e):
                continue
            elif self._is_pipe_error(e):
                if self._running and self.on_error:
                    self.on_error(ConnectionError("Portal disconnected"))
                # The thread is about to exit; keep the state flag honest.
                self._running = False
                break
            elif self._running:
                if self.on_error:
                    self.on_error(e)
                time.sleep(0.5)
        except Exception as e:
            if self._running and self.on_error:
                self.on_error(e)
            time.sleep(0.1)
|
|
|
|
def start(self):
    """Start the background polling thread, connecting first if needed.

    Does nothing when polling is already running.

    Raises:
        ConnectionError: If no device is open and ``connect()`` fails.
    """
    if self._running:
        return

    if not self.dev and not self.connect():
        raise ConnectionError("Failed to connect to portal")

    self._running = True
    poller = threading.Thread(target=self._poll_events, daemon=True)
    self._thread = poller
    poller.start()
|
|
|
|
def stop(self):
    """Stop the event polling thread.

    Clears the running flag and waits up to 2 seconds for the polling thread
    to finish. When called from the polling thread itself (for example from
    a callback that calls ``stop()`` or ``disconnect()``), the join is
    skipped: a thread cannot join itself, and the loop exits on its own once
    the callback returns.
    """
    self._running = False
    worker = self._thread
    if worker:
        if worker is not threading.current_thread():
            worker.join(timeout=2.0)
        self._thread = None
|
|
|
|
def get_active_tags(self) -> Dict[int, TagInfo]:
    """Return a shallow copy of the pad number -> TagInfo map of placed tags."""
    snapshot = dict(self._active_tags)
    return snapshot
|
|
|
|
def get_character_name(self, character_id: int) -> Optional[str]:
    """Return the name stored for ``character_id``, or None if it is unknown."""
    names = self._character_db
    return names.get(character_id)
|
|
|
|
def get_vehicle_name(self, vehicle_id: int) -> Optional[str]:
    """Return the name stored for ``vehicle_id``, or None if it is unknown."""
    names = self._vehicle_db
    return names.get(vehicle_id)
|
|
|
|
@property
def is_connected(self) -> bool:
    """True while a device handle is held (``self.dev`` is not None)."""
    has_device = self.dev is not None
    return has_device
|
|
|
|
@property
def is_running(self) -> bool:
    """The polling flag: set by ``start()``, cleared by ``stop()``."""
    polling = self._running
    return polling
|
|
|
|
|
|
# =============================================================================
|
|
# TEA Encryption/Decryption
|
|
# =============================================================================
|
|
|
|
def tea_decrypt(data: bytes, key: bytes) -> bytes:
    """Decrypt one 8-byte block with 32 rounds of TEA.

    Args:
        data: Ciphertext; the first 8 bytes are read as two little-endian
            32-bit words.
        key: Key material; the first 16 bytes are read as four little-endian
            32-bit words.

    Returns:
        The 8-byte plaintext block (two little-endian 32-bit words).
    """
    mask = 0xFFFFFFFF  # keep every intermediate result in 32 bits
    delta = 0x9e3779b9

    v0, v1 = (int.from_bytes(data[i:i + 4], 'little') for i in (0, 4))
    k0, k1, k2, k3 = (int.from_bytes(key[i:i + 4], 'little') for i in (0, 4, 8, 12))

    # Decryption starts from the round sum reached after 32 encryption rounds.
    total = (delta * 32) & mask

    for _ in range(32):
        v1 = (v1 - (((v0 << 4) + k2) ^ (v0 + total) ^ ((v0 >> 5) + k3))) & mask
        v0 = (v0 - (((v1 << 4) + k0) ^ (v1 + total) ^ ((v1 >> 5) + k1))) & mask
        total = (total - delta) & mask

    return b''.join(word.to_bytes(4, 'little') for word in (v0, v1))
|
|
|
|
|
|
def tea_encrypt(data: bytes, key: bytes) -> bytes:
    """Encrypt one 8-byte block with 32 rounds of TEA.

    Args:
        data: Plaintext; the first 8 bytes are read as two little-endian
            32-bit words.
        key: Key material; the first 16 bytes are read as four little-endian
            32-bit words.

    Returns:
        The 8-byte ciphertext block (two little-endian 32-bit words).
    """
    mask = 0xFFFFFFFF  # keep every intermediate result in 32 bits
    delta = 0x9e3779b9

    v0, v1 = (int.from_bytes(data[i:i + 4], 'little') for i in (0, 4))
    k0, k1, k2, k3 = (int.from_bytes(key[i:i + 4], 'little') for i in (0, 4, 8, 12))

    total = 0
    for _ in range(32):
        total = (total + delta) & mask
        v0 = (v0 + (((v1 << 4) + k0) ^ (v1 + total) ^ ((v1 >> 5) + k1))) & mask
        v1 = (v1 + (((v0 << 4) + k2) ^ (v0 + total) ^ ((v0 >> 5) + k3))) & mask

    return b''.join(word.to_bytes(4, 'little') for word in (v0, v1))
|
|
|
|
|
|
def generate_tea_key(uid: bytes) -> bytes:
    """Derive the 16-byte TEA key this module uses for a tag UID.

    Three mixed bytes are computed from UID bytes 1-5; each key byte is one
    of them OR-ed or AND-ed with the constant it was mixed with.

    NOTE(review): whether this matches the key scheme of real toy tags
    cannot be confirmed from this file - verify before relying on it.

    Args:
        uid: Tag UID with at least 6 bytes (shorter input raises IndexError).
    """
    mix_a = uid[1] ^ uid[3] ^ 0xAA
    mix_b = uid[2] ^ uid[4] ^ 0x55
    mix_c = uid[3] ^ uid[5] ^ 0xAA

    a_or, a_and = mix_a | 0xAA, mix_a & 0xAA
    b_or, b_and = mix_b | 0x55, mix_b & 0x55
    c_or, c_and = mix_c | 0xAA, mix_c & 0xAA

    return bytes([
        a_or, b_and, a_or, b_or,
        c_or, b_and, c_or, b_or,
        a_and, b_or, a_and, b_and,
        c_and, b_or, c_and, b_and,
    ])
|
|
|
|
|
|
def generate_password(uid: bytes) -> bytes:
    """Derive the 4-byte password this module computes for a tag UID.

    Byte ``i`` (0-3) is ``uid[i + 1] ^ uid[i + 3]`` XOR-ed with 0xAA for
    even ``i`` and 0x55 for odd ``i``.

    NOTE(review): whether this matches the password of real toy tags cannot
    be confirmed from this file - verify before relying on it.

    Args:
        uid: Tag UID with at least 7 bytes (shorter input raises IndexError).
    """
    masks = (0xAA, 0x55, 0xAA, 0x55)
    return bytes(
        mask ^ uid[index] ^ uid[index + 2]
        for index, mask in enumerate(masks, start=1)
    )
|
|
|
|
|
|
def decrypt_character_id(uid: bytes, encrypted_data: bytes) -> int:
    """Decrypt a tag data block and return its character ID.

    The block is TEA-decrypted with the key derived from ``uid``; the ID is
    the first two plaintext bytes read as a little-endian integer.
    """
    plaintext = tea_decrypt(encrypted_data, generate_tea_key(uid))
    id_bytes = plaintext[0:2]
    return int.from_bytes(id_bytes, 'little')
|
|
|
|
|
|
def encrypt_character_id(uid: bytes, character_id: int) -> bytes:
    """Build the encrypted 8-byte block for a character ID.

    The ID is stored as two little-endian bytes followed by six zero bytes,
    then TEA-encrypted with the key derived from ``uid``.
    """
    plaintext = character_id.to_bytes(2, 'little') + b'\x00' * 6
    return tea_encrypt(plaintext, generate_tea_key(uid))
|
|
|
|
|
|
# =============================================================================
|
|
# Standalone test
|
|
# =============================================================================
|
|
|
|
if __name__ == "__main__":
|
|
def on_insert(tag: TagInfo):
|
|
print(f"\n{'='*50}")
|
|
print(f"[+] TAG PLACED on {tag.pad.name} pad")
|
|
print(f" UID: {tag.uid_hex}")
|
|
print(f" NFC Card ID: {tag.nfc_card_id} ({tag.nfc_card_id_str})")
|
|
print(f" Password: {generate_password(tag.uid).hex().upper()}")
|
|
print(f"{'='*50}")
|
|
|
|
def on_remove(tag: TagInfo):
|
|
print(f"\n[-] TAG REMOVED from {tag.pad.name} pad")
|
|
|
|
def on_error(e: Exception):
|
|
print(f"\n[!] ERROR: {e}")
|
|
|
|
def on_connect():
|
|
print("[+] Portal connected and initialized")
|
|
|
|
def on_disconnect():
|
|
print("[-] Portal disconnected")
|
|
|
|
print(f"\nLEGO Dimensions Portal Reader v{__version__}")
|
|
print(f"Platform: {platform.system()}")
|
|
print("="*50)
|
|
print("v1.4.0: Now shows NFC Card ID (decimal) for compatibility")
|
|
print(" with standard NFC readers and mapping files")
|
|
print("="*50)
|
|
|
|
reader = LegoDimensionsReader()
|
|
reader.on_tag_insert = on_insert
|
|
reader.on_tag_remove = on_remove
|
|
reader.on_error = on_error
|
|
reader.on_connect = on_connect
|
|
reader.on_disconnect = on_disconnect
|
|
|
|
try:
|
|
reader.start()
|
|
print("\nPortal Active - Place character discs...")
|
|
print("Press Ctrl+C to exit\n")
|
|
|
|
while True:
|
|
time.sleep(1)
|
|
|
|
except KeyboardInterrupt:
|
|
print("\n\nShutting down...")
|
|
except ConnectionError as e:
|
|
print(f"\nConnection failed: {e}")
|
|
finally:
|
|
reader.disconnect()
|
|
print("Goodbye!")
|