Files
lego-dimensions-reader/lego_dimensions_reader.py

772 lines
24 KiB
Python

#!/usr/bin/env python3
"""
LEGO Dimensions Portal Reader Module
For Moonlight Drive-In Video Player System
Communicates with LEGO Dimensions USB portal (PS3/PS4/Wii U versions)
to detect character and vehicle disc placement/removal events.
Requirements:
pip install pyusb
Windows Driver Setup:
1. Download Zadig from https://zadig.akeo.ie/
2. Connect the LEGO Dimensions portal
3. Run Zadig as administrator
4. Select Options → List All Devices
5. Choose "LEGO READER V2.10" from the dropdown
6. Select WinUSB as the target driver
7. Click "Replace Driver"
"""
import usb.core
import usb.util
import threading
import time
import json
import os
from typing import Callable, Optional, Dict, Any
from dataclasses import dataclass
from enum import Enum
from ctypes import c_uint32
# USB Device Identifiers
VENDOR_ID = 0x0e6f
PRODUCT_ID = 0x0241
# Module version
__version__ = "1.0.0"
class Pad(Enum):
"""Portal pad identifiers"""
ALL = 0
CENTER = 1
LEFT = 2
RIGHT = 3
class TagEvent(Enum):
"""Tag event types"""
INSERTED = 0
REMOVED = 1
# Predefined colors [R, G, B]
COLORS = {
'OFF': [0, 0, 0],
'RED': [255, 0, 0],
'GREEN': [0, 255, 0],
'BLUE': [0, 0, 255],
'WHITE': [255, 255, 255],
'YELLOW': [255, 255, 0],
'CYAN': [0, 255, 255],
'MAGENTA': [255, 0, 255],
'ORANGE': [255, 128, 0],
'PURPLE': [128, 0, 255],
'PINK': [255, 105, 180],
}
# Portal initialization command (contains "(c) LEGO 2014" signature)
TOYPAD_INIT = [
0x55, 0x0f, 0xb0, 0x01, 0x28, 0x63, 0x29, 0x20,
0x4c, 0x45, 0x47, 0x4f, 0x20, 0x32, 0x30, 0x31,
0x34, 0xf7, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00
]
@dataclass
class TagInfo:
"""Information about a detected NFC tag"""
uid: bytes
pad: Pad
event: TagEvent
uid_hex: str = ""
character_id: Optional[int] = None
character_name: Optional[str] = None
is_vehicle: bool = False
def __post_init__(self):
self.uid_hex = self.uid.hex().upper()
class LegoDimensionsReader:
"""
LEGO Dimensions Portal NFC Reader
Provides event-driven tag detection with callbacks for integration
with the Moonlight Drive-In video player system.
Usage:
reader = LegoDimensionsReader()
reader.on_tag_insert = lambda tag: print(f"Tag placed: {tag.uid_hex}")
reader.on_tag_remove = lambda tag: print(f"Tag removed: {tag.uid_hex}")
reader.start()
"""
def __init__(self, character_db_path: Optional[str] = None):
"""
Initialize the LEGO Dimensions reader.
Args:
character_db_path: Optional path to character_database.json
"""
self.dev = None
self._running = False
self._thread: Optional[threading.Thread] = None
self._lock = threading.Lock()
# Active tags currently on pads (pad_num -> TagInfo)
self._active_tags: Dict[int, TagInfo] = {}
# Callbacks
self.on_tag_insert: Optional[Callable[[TagInfo], None]] = None
self.on_tag_remove: Optional[Callable[[TagInfo], None]] = None
self.on_error: Optional[Callable[[Exception], None]] = None
self.on_connect: Optional[Callable[[], None]] = None
self.on_disconnect: Optional[Callable[[], None]] = None
# Character database for ID lookup
self._character_db = self._load_character_database(character_db_path)
self._vehicle_db = self._load_vehicle_database(character_db_path)
def _load_character_database(self, db_path: Optional[str] = None) -> Dict[int, str]:
"""Load character ID database."""
# Try to load from JSON file if provided
if db_path and os.path.exists(db_path):
try:
with open(db_path, 'r') as f:
data = json.load(f)
return {int(k): v for k, v in data.get('characters', {}).items()}
except Exception:
pass
# Default built-in database
return {
1: "Batman",
2: "Gandalf",
3: "Wyldstyle",
4: "Aquaman",
5: "Bad Cop",
6: "Bane",
7: "Bart Simpson",
8: "Benny",
9: "Chell",
10: "Cole",
11: "Cragger",
12: "Cyborg",
13: "Cyberman",
14: "Doc Brown",
15: "The Doctor",
16: "Emmet",
17: "Eris",
18: "Gimli",
19: "Gollum",
20: "Harley Quinn",
21: "Homer Simpson",
22: "Jay",
23: "Joker",
24: "Kai",
25: "ACU Trooper",
26: "Krusty the Clown",
27: "Laval",
28: "Legolas",
29: "Lloyd",
30: "Marty McFly",
31: "Nya",
32: "Owen Grady",
33: "Peter Venkman",
34: "Scooby-Doo",
35: "Sensei Wu",
36: "Shaggy",
37: "Slimer",
38: "Superman",
39: "Unikitty",
40: "Wicked Witch",
41: "Wonder Woman",
42: "Zane",
43: "Green Arrow",
44: "Supergirl",
45: "Abby Yates",
46: "Finn the Human",
47: "Harry Potter",
48: "Ethan Hunt",
49: "Lumpy Space Princess",
50: "Jake the Dog",
51: "B.A. Baracus",
52: "Michael Knight",
53: "Gizmo",
54: "Sonic the Hedgehog",
55: "Gamer Kid",
56: "E.T.",
57: "Lord Voldemort",
58: "Hermione Granger",
59: "Newt Scamander",
60: "Tina Goldstein",
61: "Stripe",
62: "Sloth",
63: "Beetlejuice",
64: "Kevin",
65: "Erin Gilbert",
66: "Patty Tolan",
67: "Jillian Holtzmann",
68: "Stay Puft",
69: "Chase McCain",
70: "Excalibur Batman",
71: "Raven",
72: "Beast Boy",
73: "Starfire",
74: "Blossom",
75: "Bubbles",
76: "Buttercup",
77: "Marceline",
78: "Batgirl",
79: "Robin",
80: "Knight Rider (KITT)",
81: "Gremlins Gizmo",
82: "Teen Titans Go!",
}
def _load_vehicle_database(self, db_path: Optional[str] = None) -> Dict[int, str]:
"""Load vehicle ID database."""
if db_path and os.path.exists(db_path):
try:
with open(db_path, 'r') as f:
data = json.load(f)
return {int(k): v for k, v in data.get('vehicles', {}).items()}
except Exception:
pass
return {
1: "Batmobile",
2: "Batwing",
3: "Bat Blaster",
4: "DeLorean Time Machine",
5: "Hoverboard",
6: "Travelling Time Train",
7: "TARDIS",
8: "K9",
9: "Dalek",
10: "Companion Cube",
11: "Sentry Turret",
12: "GLaDOS",
13: "Mystery Machine",
14: "Scooby Snack",
15: "Gyrocopter",
16: "Ghost Trap",
17: "Ecto-1",
18: "PKE Meter",
19: "Emmet's Excavator",
20: "Emmet's Car",
21: "Duplo Brickster",
22: "Hover Bike",
23: "Cyber Guard",
24: "Cyber Wraith",
25: "Blade Bike",
26: "Flying White Dragon",
27: "Golden Dragon",
28: "Mega Flight Dragon",
29: "Shadowmeld",
30: "Swamp Skimmer",
31: "Cragger's Fireship",
32: "Eagle Interceptor",
33: "Eagle Sky Blazer",
34: "Eagle Swoop Diver",
35: "Gyrosphere",
36: "ACU Trooper Car",
37: "T. Rex",
38: "Homer's Car",
39: "Taunt-o-Vision",
40: "The Simpsons House",
41: "Krusty's Bike",
42: "Clown Bike",
43: "Frink's Hover Car",
44: "Gravity Sprinter",
45: "Street Shredder",
46: "Sky Clobberer",
47: "Invisible Jet",
48: "Justice Camper",
49: "Electro Bolt",
50: "Arrow Launcher",
51: "Drill Driver",
52: "Bane Dig Dig",
53: "Ancient Psychic Tandem War Elephant",
54: "Jakemobile",
55: "Lumpy Car",
56: "Ecto-1 (2016)",
57: "Ectozer",
58: "PerfEcto",
59: "B.A.'s Van",
60: "B.A.'s Super Van",
61: "Pain Plane",
62: "Sonic Speedster",
63: "Blue Typhoon",
64: "Moto Bug",
65: "IMF Scrambler",
66: "IMF Sport Car",
67: "IMF Tank",
68: "K.I.T.T.",
69: "Gozer Trap",
70: "Terror Dog",
71: "Gremlin Car",
72: "Flash 'n' Finish",
73: "Rampage Wrecking",
74: "Phone Home",
75: "Mobile Uplink",
76: "Super Charged Satellite",
77: "Enchanted Car",
78: "Harry's Broom",
79: "Newt's Case",
80: "Swooping Evil",
81: "Occamy",
82: "Niffler",
}
def connect(self) -> bool:
"""
Establish connection to LEGO Dimensions portal.
Returns True if connection successful.
"""
try:
self.dev = usb.core.find(idVendor=VENDOR_ID, idProduct=PRODUCT_ID)
if self.dev is None:
raise ConnectionError(
"LEGO Dimensions Portal not found.\n"
"Ensure:\n"
" 1. Portal is connected via USB\n"
" 2. Using PS3/PS4/Wii U portal (NOT Xbox)\n"
" 3. WinUSB driver installed via Zadig"
)
# Detach kernel driver if active (Linux)
try:
if self.dev.is_kernel_driver_active(0):
self.dev.detach_kernel_driver(0)
except (AttributeError, usb.core.USBError):
pass # Not available on Windows
# Configure device
self.dev.set_configuration()
# Send initialization command
self.dev.write(1, TOYPAD_INIT)
# Get device info
try:
product = usb.util.get_string(self.dev, self.dev.iProduct)
print(f"Connected to: {product}")
except Exception:
print("Connected to LEGO Dimensions Portal")
if self.on_connect:
self.on_connect()
return True
except usb.core.USBError as e:
error_msg = str(e)
if "Access denied" in error_msg or "insufficient permissions" in error_msg.lower():
print("ERROR: Access denied. Try running as Administrator or check Zadig drivers.")
if self.on_error:
self.on_error(e)
return False
except Exception as e:
if self.on_error:
self.on_error(e)
return False
def disconnect(self):
"""Disconnect from portal and clean up resources."""
self.stop()
if self.dev:
try:
# Turn off all LEDs
self.set_pad_color(Pad.ALL, COLORS['OFF'])
usb.util.dispose_resources(self.dev)
except Exception:
pass
self.dev = None
if self.on_disconnect:
self.on_disconnect()
def _calculate_checksum(self, command: list) -> int:
"""Calculate checksum (sum of bytes mod 256)"""
return sum(command) % 256
def _send_command(self, command: list):
"""Send a command to the portal with checksum and padding."""
if not self.dev:
return
checksum = self._calculate_checksum(command)
message = command + [checksum]
# Pad to 32 bytes
while len(message) < 32:
message.append(0x00)
try:
with self._lock:
self.dev.write(1, message)
except usb.core.USBError:
pass # Ignore write errors
def set_pad_color(self, pad: Pad, color: list):
"""
Set a pad to a specific color immediately.
Args:
pad: Which pad (Pad.ALL, Pad.CENTER, Pad.LEFT, Pad.RIGHT)
color: [R, G, B] values 0-255
"""
command = [0x55, 0x06, 0xc0, 0x02, pad.value, color[0], color[1], color[2]]
self._send_command(command)
def flash_pad(self, pad: Pad, color: list, on_time: int = 10,
off_time: int = 10, count: int = 5):
"""
Flash a pad with a color.
Args:
pad: Which pad to flash
color: [R, G, B] values
on_time: Ticks for LED on state (1 tick ≈ 50ms)
off_time: Ticks for LED off state
count: Number of flashes (255 = infinite)
"""
command = [
0x55, 0x0e, 0xc8, 0x06, pad.value,
color[0], color[1], color[2],
on_time, off_time, count,
0, 0, 0, 0, 0 # Return color (off)
]
self._send_command(command)
def fade_pad(self, pad: Pad, color: list, speed: int = 10, count: int = 1):
"""
Fade a pad to a color.
Args:
pad: Which pad to fade
color: [R, G, B] target color
speed: Fade speed (higher = slower)
count: Number of fade pulses (255 = infinite)
"""
command = [0x55, 0x08, 0xc2, 0x04, pad.value, speed, count,
color[0], color[1], color[2]]
self._send_command(command)
def rainbow_cycle(self, pad: Pad = Pad.ALL, speed: int = 5):
"""
Start a rainbow color cycle on a pad.
Args:
pad: Which pad to animate
speed: Cycle speed (1-20, lower = faster)
"""
# Cycle through colors using fade
colors_cycle = [
COLORS['RED'], COLORS['ORANGE'], COLORS['YELLOW'],
COLORS['GREEN'], COLORS['CYAN'], COLORS['BLUE'],
COLORS['PURPLE'], COLORS['MAGENTA']
]
for color in colors_cycle:
self.fade_pad(pad, color, speed=speed, count=1)
time.sleep(speed * 0.05)
def _poll_events(self):
"""Background thread function for polling tag events."""
while self._running:
try:
# Read from IN endpoint with timeout
in_packet = self.dev.read(0x81, 32, timeout=100)
bytelist = list(in_packet)
if not bytelist:
continue
# NFC tag events start with 0x56
if bytelist[0] != 0x56:
continue
# Parse tag event
pad_num = bytelist[2]
action = bytelist[5]
uid = bytes(bytelist[6:13])
# Validate UID (not all zeros)
if uid == b'\x00' * 7:
continue
try:
pad = Pad(pad_num)
except ValueError:
pad = Pad.CENTER
event = TagEvent.INSERTED if action == 0 else TagEvent.REMOVED
tag_info = TagInfo(uid=uid, pad=pad, event=event)
# Track active tags and fire callbacks
if event == TagEvent.INSERTED:
self._active_tags[pad_num] = tag_info
# Visual feedback - green flash for detected tag
self.flash_pad(pad, COLORS['GREEN'], on_time=5, off_time=5, count=2)
time.sleep(0.1)
self.set_pad_color(pad, COLORS['CYAN'])
if self.on_tag_insert:
self.on_tag_insert(tag_info)
else:
if pad_num in self._active_tags:
del self._active_tags[pad_num]
# Turn off pad LED
self.set_pad_color(pad, COLORS['OFF'])
if self.on_tag_remove:
self.on_tag_remove(tag_info)
except usb.core.USBError as e:
if e.errno == 110 or "timeout" in str(e).lower(): # Timeout - normal
continue
elif self._running:
if self.on_error:
self.on_error(e)
time.sleep(0.5)
except Exception as e:
if self._running and self.on_error:
self.on_error(e)
time.sleep(0.1)
def start(self):
"""Start the event polling thread."""
if self._running:
return
if not self.dev:
if not self.connect():
raise ConnectionError("Failed to connect to portal")
self._running = True
self._thread = threading.Thread(target=self._poll_events, daemon=True)
self._thread.start()
# Initial LED animation to show portal is active
for pad in [Pad.CENTER, Pad.LEFT, Pad.RIGHT]:
self.flash_pad(pad, COLORS['BLUE'], on_time=3, off_time=3, count=2)
time.sleep(0.1)
def stop(self):
"""Stop the event polling thread."""
self._running = False
if self._thread:
self._thread.join(timeout=2.0)
self._thread = None
def get_active_tags(self) -> Dict[int, TagInfo]:
"""Get dictionary of currently active tags on pads."""
return self._active_tags.copy()
def get_character_name(self, character_id: int) -> Optional[str]:
"""Look up character name from character ID."""
return self._character_db.get(character_id)
def get_vehicle_name(self, vehicle_id: int) -> Optional[str]:
"""Look up vehicle name from vehicle ID."""
return self._vehicle_db.get(vehicle_id)
@property
def is_connected(self) -> bool:
"""Check if portal is connected."""
return self.dev is not None
@property
def is_running(self) -> bool:
"""Check if event polling is active."""
return self._running
# =============================================================================
# TEA Encryption/Decryption for Character ID
# Based on community reverse-engineering (ags131, bettse, socram8888)
# =============================================================================
def tea_decrypt(data: bytes, key: bytes) -> bytes:
"""
TEA (Tiny Encryption Algorithm) decryption.
Used to decrypt character IDs from tag pages 0x24-0x25.
"""
def _u32(x):
return c_uint32(x).value
delta = 0x9e3779b9
# Convert to 32-bit words
v0 = int.from_bytes(data[0:4], 'little')
v1 = int.from_bytes(data[4:8], 'little')
k0 = int.from_bytes(key[0:4], 'little')
k1 = int.from_bytes(key[4:8], 'little')
k2 = int.from_bytes(key[8:12], 'little')
k3 = int.from_bytes(key[12:16], 'little')
total = _u32(delta * 32)
for _ in range(32):
v1 = _u32(v1 - _u32((_u32(v0 << 4) + k2) ^ _u32(v0 + total) ^ _u32((v0 >> 5) + k3)))
v0 = _u32(v0 - _u32((_u32(v1 << 4) + k0) ^ _u32(v1 + total) ^ _u32((v1 >> 5) + k1)))
total = _u32(total - delta)
return v0.to_bytes(4, 'little') + v1.to_bytes(4, 'little')
def tea_encrypt(data: bytes, key: bytes) -> bytes:
"""
TEA (Tiny Encryption Algorithm) encryption.
Used to encrypt character IDs for writing to tags.
"""
def _u32(x):
return c_uint32(x).value
delta = 0x9e3779b9
v0 = int.from_bytes(data[0:4], 'little')
v1 = int.from_bytes(data[4:8], 'little')
k0 = int.from_bytes(key[0:4], 'little')
k1 = int.from_bytes(key[4:8], 'little')
k2 = int.from_bytes(key[8:12], 'little')
k3 = int.from_bytes(key[12:16], 'little')
total = 0
for _ in range(32):
total = _u32(total + delta)
v0 = _u32(v0 + _u32((_u32(v1 << 4) + k0) ^ _u32(v1 + total) ^ _u32((v1 >> 5) + k1)))
v1 = _u32(v1 + _u32((_u32(v0 << 4) + k2) ^ _u32(v0 + total) ^ _u32((v0 >> 5) + k3)))
return v0.to_bytes(4, 'little') + v1.to_bytes(4, 'little')
def generate_tea_key(uid: bytes) -> bytes:
"""
Generate TEA encryption key from tag UID.
The key is derived by scrambling the 7-byte UID into 16 bytes.
"""
key = bytearray(16)
key[0] = (uid[1] ^ uid[3] ^ 0xAA) | 0xAA
key[1] = (uid[2] ^ uid[4] ^ 0x55) & 0x55
key[2] = (uid[1] ^ uid[3] ^ 0xAA) | 0xAA
key[3] = (uid[2] ^ uid[4] ^ 0x55) | 0x55
key[4] = (uid[3] ^ uid[5] ^ 0xAA) | 0xAA
key[5] = (uid[2] ^ uid[4] ^ 0x55) & 0x55
key[6] = (uid[3] ^ uid[5] ^ 0xAA) | 0xAA
key[7] = (uid[2] ^ uid[4] ^ 0x55) | 0x55
key[8] = (uid[1] ^ uid[3] ^ 0xAA) & 0xAA
key[9] = (uid[2] ^ uid[4] ^ 0x55) | 0x55
key[10] = (uid[1] ^ uid[3] ^ 0xAA) & 0xAA
key[11] = (uid[2] ^ uid[4] ^ 0x55) & 0x55
key[12] = (uid[3] ^ uid[5] ^ 0xAA) & 0xAA
key[13] = (uid[2] ^ uid[4] ^ 0x55) | 0x55
key[14] = (uid[3] ^ uid[5] ^ 0xAA) & 0xAA
key[15] = (uid[2] ^ uid[4] ^ 0x55) & 0x55
return bytes(key)
def generate_password(uid: bytes) -> bytes:
"""
Generate NTAG213 password from 7-byte UID.
This password protects pages 0x24+ on the tag.
"""
pwd = bytearray(4)
pwd[0] = 0xAA ^ uid[1] ^ uid[3]
pwd[1] = 0x55 ^ uid[2] ^ uid[4]
pwd[2] = 0xAA ^ uid[3] ^ uid[5]
pwd[3] = 0x55 ^ uid[4] ^ uid[6]
return bytes(pwd)
def decrypt_character_id(uid: bytes, encrypted_data: bytes) -> int:
"""
Decrypt character ID from tag memory pages 0x24-0x25.
Args:
uid: 7-byte tag UID
encrypted_data: 8 bytes from pages 0x24-0x25
Returns:
Decrypted character ID integer
"""
key = generate_tea_key(uid)
decrypted = tea_decrypt(encrypted_data, key)
return int.from_bytes(decrypted[0:2], 'little')
def encrypt_character_id(uid: bytes, character_id: int) -> bytes:
"""
Encrypt character ID for writing to tag pages 0x24-0x25.
Args:
uid: 7-byte tag UID
character_id: Character ID to encrypt
Returns:
8 bytes of encrypted data
"""
key = generate_tea_key(uid)
# Character ID in first 2 bytes, rest is padding
data = character_id.to_bytes(2, 'little') + b'\x00' * 6
return tea_encrypt(data, key)
# =============================================================================
# Standalone test
# =============================================================================
if __name__ == "__main__":
def on_insert(tag: TagInfo):
print(f"\n{'='*50}")
print(f"✓ TAG PLACED on {tag.pad.name} pad")
print(f" UID: {tag.uid_hex}")
print(f" Password: {generate_password(tag.uid).hex().upper()}")
print(f"{'='*50}")
def on_remove(tag: TagInfo):
print(f"\n✗ TAG REMOVED from {tag.pad.name} pad")
def on_error(e: Exception):
print(f"\n⚠ ERROR: {e}")
def on_connect():
print("✓ Portal connected and initialized")
def on_disconnect():
print("✗ Portal disconnected")
print(f"\nLEGO Dimensions Portal Reader v{__version__}")
print("="*50)
reader = LegoDimensionsReader()
reader.on_tag_insert = on_insert
reader.on_tag_remove = on_remove
reader.on_error = on_error
reader.on_connect = on_connect
reader.on_disconnect = on_disconnect
try:
reader.start()
print("\nPortal Active - Place character discs...")
print("Press Ctrl+C to exit\n")
while True:
time.sleep(1)
except KeyboardInterrupt:
print("\n\nShutting down...")
except ConnectionError as e:
print(f"\nConnection failed: {e}")
finally:
reader.disconnect()
print("Goodbye!")