Add main LEGO Dimensions portal reader module
This commit is contained in:
771
lego_dimensions_reader.py
Normal file
771
lego_dimensions_reader.py
Normal file
@@ -0,0 +1,771 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
LEGO Dimensions Portal Reader Module
|
||||
For Moonlight Drive-In Video Player System
|
||||
|
||||
Communicates with LEGO Dimensions USB portal (PS3/PS4/Wii U versions)
|
||||
to detect character and vehicle disc placement/removal events.
|
||||
|
||||
Requirements:
|
||||
pip install pyusb
|
||||
|
||||
Windows Driver Setup:
|
||||
1. Download Zadig from https://zadig.akeo.ie/
|
||||
2. Connect the LEGO Dimensions portal
|
||||
3. Run Zadig as administrator
|
||||
4. Select Options → List All Devices
|
||||
5. Choose "LEGO READER V2.10" from the dropdown
|
||||
6. Select WinUSB as the target driver
|
||||
7. Click "Replace Driver"
|
||||
"""
|
||||
|
||||
import usb.core
|
||||
import usb.util
|
||||
import threading
|
||||
import time
|
||||
import json
|
||||
import os
|
||||
from typing import Callable, Optional, Dict, Any
|
||||
from dataclasses import dataclass
|
||||
from enum import Enum
|
||||
from ctypes import c_uint32
|
||||
|
||||
# USB Device Identifiers
|
||||
VENDOR_ID = 0x0e6f
|
||||
PRODUCT_ID = 0x0241
|
||||
|
||||
# Module version
|
||||
__version__ = "1.0.0"
|
||||
|
||||
|
||||
class Pad(Enum):
|
||||
"""Portal pad identifiers"""
|
||||
ALL = 0
|
||||
CENTER = 1
|
||||
LEFT = 2
|
||||
RIGHT = 3
|
||||
|
||||
|
||||
class TagEvent(Enum):
|
||||
"""Tag event types"""
|
||||
INSERTED = 0
|
||||
REMOVED = 1
|
||||
|
||||
|
||||
# Predefined colors [R, G, B]
|
||||
COLORS = {
|
||||
'OFF': [0, 0, 0],
|
||||
'RED': [255, 0, 0],
|
||||
'GREEN': [0, 255, 0],
|
||||
'BLUE': [0, 0, 255],
|
||||
'WHITE': [255, 255, 255],
|
||||
'YELLOW': [255, 255, 0],
|
||||
'CYAN': [0, 255, 255],
|
||||
'MAGENTA': [255, 0, 255],
|
||||
'ORANGE': [255, 128, 0],
|
||||
'PURPLE': [128, 0, 255],
|
||||
'PINK': [255, 105, 180],
|
||||
}
|
||||
|
||||
# Portal initialization command (contains "(c) LEGO 2014" signature)
|
||||
TOYPAD_INIT = [
|
||||
0x55, 0x0f, 0xb0, 0x01, 0x28, 0x63, 0x29, 0x20,
|
||||
0x4c, 0x45, 0x47, 0x4f, 0x20, 0x32, 0x30, 0x31,
|
||||
0x34, 0xf7, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
|
||||
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00
|
||||
]
|
||||
|
||||
|
||||
@dataclass
|
||||
class TagInfo:
|
||||
"""Information about a detected NFC tag"""
|
||||
uid: bytes
|
||||
pad: Pad
|
||||
event: TagEvent
|
||||
uid_hex: str = ""
|
||||
character_id: Optional[int] = None
|
||||
character_name: Optional[str] = None
|
||||
is_vehicle: bool = False
|
||||
|
||||
def __post_init__(self):
|
||||
self.uid_hex = self.uid.hex().upper()
|
||||
|
||||
|
||||
class LegoDimensionsReader:
|
||||
"""
|
||||
LEGO Dimensions Portal NFC Reader
|
||||
|
||||
Provides event-driven tag detection with callbacks for integration
|
||||
with the Moonlight Drive-In video player system.
|
||||
|
||||
Usage:
|
||||
reader = LegoDimensionsReader()
|
||||
reader.on_tag_insert = lambda tag: print(f"Tag placed: {tag.uid_hex}")
|
||||
reader.on_tag_remove = lambda tag: print(f"Tag removed: {tag.uid_hex}")
|
||||
reader.start()
|
||||
"""
|
||||
|
||||
def __init__(self, character_db_path: Optional[str] = None):
|
||||
"""
|
||||
Initialize the LEGO Dimensions reader.
|
||||
|
||||
Args:
|
||||
character_db_path: Optional path to character_database.json
|
||||
"""
|
||||
self.dev = None
|
||||
self._running = False
|
||||
self._thread: Optional[threading.Thread] = None
|
||||
self._lock = threading.Lock()
|
||||
|
||||
# Active tags currently on pads (pad_num -> TagInfo)
|
||||
self._active_tags: Dict[int, TagInfo] = {}
|
||||
|
||||
# Callbacks
|
||||
self.on_tag_insert: Optional[Callable[[TagInfo], None]] = None
|
||||
self.on_tag_remove: Optional[Callable[[TagInfo], None]] = None
|
||||
self.on_error: Optional[Callable[[Exception], None]] = None
|
||||
self.on_connect: Optional[Callable[[], None]] = None
|
||||
self.on_disconnect: Optional[Callable[[], None]] = None
|
||||
|
||||
# Character database for ID lookup
|
||||
self._character_db = self._load_character_database(character_db_path)
|
||||
self._vehicle_db = self._load_vehicle_database(character_db_path)
|
||||
|
||||
def _load_character_database(self, db_path: Optional[str] = None) -> Dict[int, str]:
|
||||
"""Load character ID database."""
|
||||
# Try to load from JSON file if provided
|
||||
if db_path and os.path.exists(db_path):
|
||||
try:
|
||||
with open(db_path, 'r') as f:
|
||||
data = json.load(f)
|
||||
return {int(k): v for k, v in data.get('characters', {}).items()}
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
# Default built-in database
|
||||
return {
|
||||
1: "Batman",
|
||||
2: "Gandalf",
|
||||
3: "Wyldstyle",
|
||||
4: "Aquaman",
|
||||
5: "Bad Cop",
|
||||
6: "Bane",
|
||||
7: "Bart Simpson",
|
||||
8: "Benny",
|
||||
9: "Chell",
|
||||
10: "Cole",
|
||||
11: "Cragger",
|
||||
12: "Cyborg",
|
||||
13: "Cyberman",
|
||||
14: "Doc Brown",
|
||||
15: "The Doctor",
|
||||
16: "Emmet",
|
||||
17: "Eris",
|
||||
18: "Gimli",
|
||||
19: "Gollum",
|
||||
20: "Harley Quinn",
|
||||
21: "Homer Simpson",
|
||||
22: "Jay",
|
||||
23: "Joker",
|
||||
24: "Kai",
|
||||
25: "ACU Trooper",
|
||||
26: "Krusty the Clown",
|
||||
27: "Laval",
|
||||
28: "Legolas",
|
||||
29: "Lloyd",
|
||||
30: "Marty McFly",
|
||||
31: "Nya",
|
||||
32: "Owen Grady",
|
||||
33: "Peter Venkman",
|
||||
34: "Scooby-Doo",
|
||||
35: "Sensei Wu",
|
||||
36: "Shaggy",
|
||||
37: "Slimer",
|
||||
38: "Superman",
|
||||
39: "Unikitty",
|
||||
40: "Wicked Witch",
|
||||
41: "Wonder Woman",
|
||||
42: "Zane",
|
||||
43: "Green Arrow",
|
||||
44: "Supergirl",
|
||||
45: "Abby Yates",
|
||||
46: "Finn the Human",
|
||||
47: "Harry Potter",
|
||||
48: "Ethan Hunt",
|
||||
49: "Lumpy Space Princess",
|
||||
50: "Jake the Dog",
|
||||
51: "B.A. Baracus",
|
||||
52: "Michael Knight",
|
||||
53: "Gizmo",
|
||||
54: "Sonic the Hedgehog",
|
||||
55: "Gamer Kid",
|
||||
56: "E.T.",
|
||||
57: "Lord Voldemort",
|
||||
58: "Hermione Granger",
|
||||
59: "Newt Scamander",
|
||||
60: "Tina Goldstein",
|
||||
61: "Stripe",
|
||||
62: "Sloth",
|
||||
63: "Beetlejuice",
|
||||
64: "Kevin",
|
||||
65: "Erin Gilbert",
|
||||
66: "Patty Tolan",
|
||||
67: "Jillian Holtzmann",
|
||||
68: "Stay Puft",
|
||||
69: "Chase McCain",
|
||||
70: "Excalibur Batman",
|
||||
71: "Raven",
|
||||
72: "Beast Boy",
|
||||
73: "Starfire",
|
||||
74: "Blossom",
|
||||
75: "Bubbles",
|
||||
76: "Buttercup",
|
||||
77: "Marceline",
|
||||
78: "Batgirl",
|
||||
79: "Robin",
|
||||
80: "Knight Rider (KITT)",
|
||||
81: "Gremlins Gizmo",
|
||||
82: "Teen Titans Go!",
|
||||
}
|
||||
|
||||
def _load_vehicle_database(self, db_path: Optional[str] = None) -> Dict[int, str]:
|
||||
"""Load vehicle ID database."""
|
||||
if db_path and os.path.exists(db_path):
|
||||
try:
|
||||
with open(db_path, 'r') as f:
|
||||
data = json.load(f)
|
||||
return {int(k): v for k, v in data.get('vehicles', {}).items()}
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
return {
|
||||
1: "Batmobile",
|
||||
2: "Batwing",
|
||||
3: "Bat Blaster",
|
||||
4: "DeLorean Time Machine",
|
||||
5: "Hoverboard",
|
||||
6: "Travelling Time Train",
|
||||
7: "TARDIS",
|
||||
8: "K9",
|
||||
9: "Dalek",
|
||||
10: "Companion Cube",
|
||||
11: "Sentry Turret",
|
||||
12: "GLaDOS",
|
||||
13: "Mystery Machine",
|
||||
14: "Scooby Snack",
|
||||
15: "Gyrocopter",
|
||||
16: "Ghost Trap",
|
||||
17: "Ecto-1",
|
||||
18: "PKE Meter",
|
||||
19: "Emmet's Excavator",
|
||||
20: "Emmet's Car",
|
||||
21: "Duplo Brickster",
|
||||
22: "Hover Bike",
|
||||
23: "Cyber Guard",
|
||||
24: "Cyber Wraith",
|
||||
25: "Blade Bike",
|
||||
26: "Flying White Dragon",
|
||||
27: "Golden Dragon",
|
||||
28: "Mega Flight Dragon",
|
||||
29: "Shadowmeld",
|
||||
30: "Swamp Skimmer",
|
||||
31: "Cragger's Fireship",
|
||||
32: "Eagle Interceptor",
|
||||
33: "Eagle Sky Blazer",
|
||||
34: "Eagle Swoop Diver",
|
||||
35: "Gyrosphere",
|
||||
36: "ACU Trooper Car",
|
||||
37: "T. Rex",
|
||||
38: "Homer's Car",
|
||||
39: "Taunt-o-Vision",
|
||||
40: "The Simpsons House",
|
||||
41: "Krusty's Bike",
|
||||
42: "Clown Bike",
|
||||
43: "Frink's Hover Car",
|
||||
44: "Gravity Sprinter",
|
||||
45: "Street Shredder",
|
||||
46: "Sky Clobberer",
|
||||
47: "Invisible Jet",
|
||||
48: "Justice Camper",
|
||||
49: "Electro Bolt",
|
||||
50: "Arrow Launcher",
|
||||
51: "Drill Driver",
|
||||
52: "Bane Dig Dig",
|
||||
53: "Ancient Psychic Tandem War Elephant",
|
||||
54: "Jakemobile",
|
||||
55: "Lumpy Car",
|
||||
56: "Ecto-1 (2016)",
|
||||
57: "Ectozer",
|
||||
58: "PerfEcto",
|
||||
59: "B.A.'s Van",
|
||||
60: "B.A.'s Super Van",
|
||||
61: "Pain Plane",
|
||||
62: "Sonic Speedster",
|
||||
63: "Blue Typhoon",
|
||||
64: "Moto Bug",
|
||||
65: "IMF Scrambler",
|
||||
66: "IMF Sport Car",
|
||||
67: "IMF Tank",
|
||||
68: "K.I.T.T.",
|
||||
69: "Gozer Trap",
|
||||
70: "Terror Dog",
|
||||
71: "Gremlin Car",
|
||||
72: "Flash 'n' Finish",
|
||||
73: "Rampage Wrecking",
|
||||
74: "Phone Home",
|
||||
75: "Mobile Uplink",
|
||||
76: "Super Charged Satellite",
|
||||
77: "Enchanted Car",
|
||||
78: "Harry's Broom",
|
||||
79: "Newt's Case",
|
||||
80: "Swooping Evil",
|
||||
81: "Occamy",
|
||||
82: "Niffler",
|
||||
}
|
||||
|
||||
def connect(self) -> bool:
|
||||
"""
|
||||
Establish connection to LEGO Dimensions portal.
|
||||
Returns True if connection successful.
|
||||
"""
|
||||
try:
|
||||
self.dev = usb.core.find(idVendor=VENDOR_ID, idProduct=PRODUCT_ID)
|
||||
|
||||
if self.dev is None:
|
||||
raise ConnectionError(
|
||||
"LEGO Dimensions Portal not found.\n"
|
||||
"Ensure:\n"
|
||||
" 1. Portal is connected via USB\n"
|
||||
" 2. Using PS3/PS4/Wii U portal (NOT Xbox)\n"
|
||||
" 3. WinUSB driver installed via Zadig"
|
||||
)
|
||||
|
||||
# Detach kernel driver if active (Linux)
|
||||
try:
|
||||
if self.dev.is_kernel_driver_active(0):
|
||||
self.dev.detach_kernel_driver(0)
|
||||
except (AttributeError, usb.core.USBError):
|
||||
pass # Not available on Windows
|
||||
|
||||
# Configure device
|
||||
self.dev.set_configuration()
|
||||
|
||||
# Send initialization command
|
||||
self.dev.write(1, TOYPAD_INIT)
|
||||
|
||||
# Get device info
|
||||
try:
|
||||
product = usb.util.get_string(self.dev, self.dev.iProduct)
|
||||
print(f"Connected to: {product}")
|
||||
except Exception:
|
||||
print("Connected to LEGO Dimensions Portal")
|
||||
|
||||
if self.on_connect:
|
||||
self.on_connect()
|
||||
|
||||
return True
|
||||
|
||||
except usb.core.USBError as e:
|
||||
error_msg = str(e)
|
||||
if "Access denied" in error_msg or "insufficient permissions" in error_msg.lower():
|
||||
print("ERROR: Access denied. Try running as Administrator or check Zadig drivers.")
|
||||
if self.on_error:
|
||||
self.on_error(e)
|
||||
return False
|
||||
except Exception as e:
|
||||
if self.on_error:
|
||||
self.on_error(e)
|
||||
return False
|
||||
|
||||
def disconnect(self):
|
||||
"""Disconnect from portal and clean up resources."""
|
||||
self.stop()
|
||||
if self.dev:
|
||||
try:
|
||||
# Turn off all LEDs
|
||||
self.set_pad_color(Pad.ALL, COLORS['OFF'])
|
||||
usb.util.dispose_resources(self.dev)
|
||||
except Exception:
|
||||
pass
|
||||
self.dev = None
|
||||
if self.on_disconnect:
|
||||
self.on_disconnect()
|
||||
|
||||
def _calculate_checksum(self, command: list) -> int:
|
||||
"""Calculate checksum (sum of bytes mod 256)"""
|
||||
return sum(command) % 256
|
||||
|
||||
def _send_command(self, command: list):
|
||||
"""Send a command to the portal with checksum and padding."""
|
||||
if not self.dev:
|
||||
return
|
||||
|
||||
checksum = self._calculate_checksum(command)
|
||||
message = command + [checksum]
|
||||
|
||||
# Pad to 32 bytes
|
||||
while len(message) < 32:
|
||||
message.append(0x00)
|
||||
|
||||
try:
|
||||
with self._lock:
|
||||
self.dev.write(1, message)
|
||||
except usb.core.USBError:
|
||||
pass # Ignore write errors
|
||||
|
||||
def set_pad_color(self, pad: Pad, color: list):
|
||||
"""
|
||||
Set a pad to a specific color immediately.
|
||||
|
||||
Args:
|
||||
pad: Which pad (Pad.ALL, Pad.CENTER, Pad.LEFT, Pad.RIGHT)
|
||||
color: [R, G, B] values 0-255
|
||||
"""
|
||||
command = [0x55, 0x06, 0xc0, 0x02, pad.value, color[0], color[1], color[2]]
|
||||
self._send_command(command)
|
||||
|
||||
def flash_pad(self, pad: Pad, color: list, on_time: int = 10,
|
||||
off_time: int = 10, count: int = 5):
|
||||
"""
|
||||
Flash a pad with a color.
|
||||
|
||||
Args:
|
||||
pad: Which pad to flash
|
||||
color: [R, G, B] values
|
||||
on_time: Ticks for LED on state (1 tick ≈ 50ms)
|
||||
off_time: Ticks for LED off state
|
||||
count: Number of flashes (255 = infinite)
|
||||
"""
|
||||
command = [
|
||||
0x55, 0x0e, 0xc8, 0x06, pad.value,
|
||||
color[0], color[1], color[2],
|
||||
on_time, off_time, count,
|
||||
0, 0, 0, 0, 0 # Return color (off)
|
||||
]
|
||||
self._send_command(command)
|
||||
|
||||
def fade_pad(self, pad: Pad, color: list, speed: int = 10, count: int = 1):
|
||||
"""
|
||||
Fade a pad to a color.
|
||||
|
||||
Args:
|
||||
pad: Which pad to fade
|
||||
color: [R, G, B] target color
|
||||
speed: Fade speed (higher = slower)
|
||||
count: Number of fade pulses (255 = infinite)
|
||||
"""
|
||||
command = [0x55, 0x08, 0xc2, 0x04, pad.value, speed, count,
|
||||
color[0], color[1], color[2]]
|
||||
self._send_command(command)
|
||||
|
||||
def rainbow_cycle(self, pad: Pad = Pad.ALL, speed: int = 5):
|
||||
"""
|
||||
Start a rainbow color cycle on a pad.
|
||||
|
||||
Args:
|
||||
pad: Which pad to animate
|
||||
speed: Cycle speed (1-20, lower = faster)
|
||||
"""
|
||||
# Cycle through colors using fade
|
||||
colors_cycle = [
|
||||
COLORS['RED'], COLORS['ORANGE'], COLORS['YELLOW'],
|
||||
COLORS['GREEN'], COLORS['CYAN'], COLORS['BLUE'],
|
||||
COLORS['PURPLE'], COLORS['MAGENTA']
|
||||
]
|
||||
for color in colors_cycle:
|
||||
self.fade_pad(pad, color, speed=speed, count=1)
|
||||
time.sleep(speed * 0.05)
|
||||
|
||||
def _poll_events(self):
|
||||
"""Background thread function for polling tag events."""
|
||||
while self._running:
|
||||
try:
|
||||
# Read from IN endpoint with timeout
|
||||
in_packet = self.dev.read(0x81, 32, timeout=100)
|
||||
bytelist = list(in_packet)
|
||||
|
||||
if not bytelist:
|
||||
continue
|
||||
|
||||
# NFC tag events start with 0x56
|
||||
if bytelist[0] != 0x56:
|
||||
continue
|
||||
|
||||
# Parse tag event
|
||||
pad_num = bytelist[2]
|
||||
action = bytelist[5]
|
||||
uid = bytes(bytelist[6:13])
|
||||
|
||||
# Validate UID (not all zeros)
|
||||
if uid == b'\x00' * 7:
|
||||
continue
|
||||
|
||||
try:
|
||||
pad = Pad(pad_num)
|
||||
except ValueError:
|
||||
pad = Pad.CENTER
|
||||
|
||||
event = TagEvent.INSERTED if action == 0 else TagEvent.REMOVED
|
||||
tag_info = TagInfo(uid=uid, pad=pad, event=event)
|
||||
|
||||
# Track active tags and fire callbacks
|
||||
if event == TagEvent.INSERTED:
|
||||
self._active_tags[pad_num] = tag_info
|
||||
|
||||
# Visual feedback - green flash for detected tag
|
||||
self.flash_pad(pad, COLORS['GREEN'], on_time=5, off_time=5, count=2)
|
||||
time.sleep(0.1)
|
||||
self.set_pad_color(pad, COLORS['CYAN'])
|
||||
|
||||
if self.on_tag_insert:
|
||||
self.on_tag_insert(tag_info)
|
||||
else:
|
||||
if pad_num in self._active_tags:
|
||||
del self._active_tags[pad_num]
|
||||
|
||||
# Turn off pad LED
|
||||
self.set_pad_color(pad, COLORS['OFF'])
|
||||
|
||||
if self.on_tag_remove:
|
||||
self.on_tag_remove(tag_info)
|
||||
|
||||
except usb.core.USBError as e:
|
||||
if e.errno == 110 or "timeout" in str(e).lower(): # Timeout - normal
|
||||
continue
|
||||
elif self._running:
|
||||
if self.on_error:
|
||||
self.on_error(e)
|
||||
time.sleep(0.5)
|
||||
except Exception as e:
|
||||
if self._running and self.on_error:
|
||||
self.on_error(e)
|
||||
time.sleep(0.1)
|
||||
|
||||
def start(self):
|
||||
"""Start the event polling thread."""
|
||||
if self._running:
|
||||
return
|
||||
|
||||
if not self.dev:
|
||||
if not self.connect():
|
||||
raise ConnectionError("Failed to connect to portal")
|
||||
|
||||
self._running = True
|
||||
self._thread = threading.Thread(target=self._poll_events, daemon=True)
|
||||
self._thread.start()
|
||||
|
||||
# Initial LED animation to show portal is active
|
||||
for pad in [Pad.CENTER, Pad.LEFT, Pad.RIGHT]:
|
||||
self.flash_pad(pad, COLORS['BLUE'], on_time=3, off_time=3, count=2)
|
||||
time.sleep(0.1)
|
||||
|
||||
def stop(self):
|
||||
"""Stop the event polling thread."""
|
||||
self._running = False
|
||||
if self._thread:
|
||||
self._thread.join(timeout=2.0)
|
||||
self._thread = None
|
||||
|
||||
def get_active_tags(self) -> Dict[int, TagInfo]:
|
||||
"""Get dictionary of currently active tags on pads."""
|
||||
return self._active_tags.copy()
|
||||
|
||||
def get_character_name(self, character_id: int) -> Optional[str]:
|
||||
"""Look up character name from character ID."""
|
||||
return self._character_db.get(character_id)
|
||||
|
||||
def get_vehicle_name(self, vehicle_id: int) -> Optional[str]:
|
||||
"""Look up vehicle name from vehicle ID."""
|
||||
return self._vehicle_db.get(vehicle_id)
|
||||
|
||||
@property
|
||||
def is_connected(self) -> bool:
|
||||
"""Check if portal is connected."""
|
||||
return self.dev is not None
|
||||
|
||||
@property
|
||||
def is_running(self) -> bool:
|
||||
"""Check if event polling is active."""
|
||||
return self._running
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# TEA Encryption/Decryption for Character ID
|
||||
# Based on community reverse-engineering (ags131, bettse, socram8888)
|
||||
# =============================================================================
|
||||
|
||||
def tea_decrypt(data: bytes, key: bytes) -> bytes:
|
||||
"""
|
||||
TEA (Tiny Encryption Algorithm) decryption.
|
||||
Used to decrypt character IDs from tag pages 0x24-0x25.
|
||||
"""
|
||||
def _u32(x):
|
||||
return c_uint32(x).value
|
||||
|
||||
delta = 0x9e3779b9
|
||||
|
||||
# Convert to 32-bit words
|
||||
v0 = int.from_bytes(data[0:4], 'little')
|
||||
v1 = int.from_bytes(data[4:8], 'little')
|
||||
k0 = int.from_bytes(key[0:4], 'little')
|
||||
k1 = int.from_bytes(key[4:8], 'little')
|
||||
k2 = int.from_bytes(key[8:12], 'little')
|
||||
k3 = int.from_bytes(key[12:16], 'little')
|
||||
|
||||
total = _u32(delta * 32)
|
||||
|
||||
for _ in range(32):
|
||||
v1 = _u32(v1 - _u32((_u32(v0 << 4) + k2) ^ _u32(v0 + total) ^ _u32((v0 >> 5) + k3)))
|
||||
v0 = _u32(v0 - _u32((_u32(v1 << 4) + k0) ^ _u32(v1 + total) ^ _u32((v1 >> 5) + k1)))
|
||||
total = _u32(total - delta)
|
||||
|
||||
return v0.to_bytes(4, 'little') + v1.to_bytes(4, 'little')
|
||||
|
||||
|
||||
def tea_encrypt(data: bytes, key: bytes) -> bytes:
|
||||
"""
|
||||
TEA (Tiny Encryption Algorithm) encryption.
|
||||
Used to encrypt character IDs for writing to tags.
|
||||
"""
|
||||
def _u32(x):
|
||||
return c_uint32(x).value
|
||||
|
||||
delta = 0x9e3779b9
|
||||
|
||||
v0 = int.from_bytes(data[0:4], 'little')
|
||||
v1 = int.from_bytes(data[4:8], 'little')
|
||||
k0 = int.from_bytes(key[0:4], 'little')
|
||||
k1 = int.from_bytes(key[4:8], 'little')
|
||||
k2 = int.from_bytes(key[8:12], 'little')
|
||||
k3 = int.from_bytes(key[12:16], 'little')
|
||||
|
||||
total = 0
|
||||
|
||||
for _ in range(32):
|
||||
total = _u32(total + delta)
|
||||
v0 = _u32(v0 + _u32((_u32(v1 << 4) + k0) ^ _u32(v1 + total) ^ _u32((v1 >> 5) + k1)))
|
||||
v1 = _u32(v1 + _u32((_u32(v0 << 4) + k2) ^ _u32(v0 + total) ^ _u32((v0 >> 5) + k3)))
|
||||
|
||||
return v0.to_bytes(4, 'little') + v1.to_bytes(4, 'little')
|
||||
|
||||
|
||||
def generate_tea_key(uid: bytes) -> bytes:
|
||||
"""
|
||||
Generate TEA encryption key from tag UID.
|
||||
The key is derived by scrambling the 7-byte UID into 16 bytes.
|
||||
"""
|
||||
key = bytearray(16)
|
||||
key[0] = (uid[1] ^ uid[3] ^ 0xAA) | 0xAA
|
||||
key[1] = (uid[2] ^ uid[4] ^ 0x55) & 0x55
|
||||
key[2] = (uid[1] ^ uid[3] ^ 0xAA) | 0xAA
|
||||
key[3] = (uid[2] ^ uid[4] ^ 0x55) | 0x55
|
||||
key[4] = (uid[3] ^ uid[5] ^ 0xAA) | 0xAA
|
||||
key[5] = (uid[2] ^ uid[4] ^ 0x55) & 0x55
|
||||
key[6] = (uid[3] ^ uid[5] ^ 0xAA) | 0xAA
|
||||
key[7] = (uid[2] ^ uid[4] ^ 0x55) | 0x55
|
||||
key[8] = (uid[1] ^ uid[3] ^ 0xAA) & 0xAA
|
||||
key[9] = (uid[2] ^ uid[4] ^ 0x55) | 0x55
|
||||
key[10] = (uid[1] ^ uid[3] ^ 0xAA) & 0xAA
|
||||
key[11] = (uid[2] ^ uid[4] ^ 0x55) & 0x55
|
||||
key[12] = (uid[3] ^ uid[5] ^ 0xAA) & 0xAA
|
||||
key[13] = (uid[2] ^ uid[4] ^ 0x55) | 0x55
|
||||
key[14] = (uid[3] ^ uid[5] ^ 0xAA) & 0xAA
|
||||
key[15] = (uid[2] ^ uid[4] ^ 0x55) & 0x55
|
||||
return bytes(key)
|
||||
|
||||
|
||||
def generate_password(uid: bytes) -> bytes:
|
||||
"""
|
||||
Generate NTAG213 password from 7-byte UID.
|
||||
This password protects pages 0x24+ on the tag.
|
||||
"""
|
||||
pwd = bytearray(4)
|
||||
pwd[0] = 0xAA ^ uid[1] ^ uid[3]
|
||||
pwd[1] = 0x55 ^ uid[2] ^ uid[4]
|
||||
pwd[2] = 0xAA ^ uid[3] ^ uid[5]
|
||||
pwd[3] = 0x55 ^ uid[4] ^ uid[6]
|
||||
return bytes(pwd)
|
||||
|
||||
|
||||
def decrypt_character_id(uid: bytes, encrypted_data: bytes) -> int:
|
||||
"""
|
||||
Decrypt character ID from tag memory pages 0x24-0x25.
|
||||
|
||||
Args:
|
||||
uid: 7-byte tag UID
|
||||
encrypted_data: 8 bytes from pages 0x24-0x25
|
||||
|
||||
Returns:
|
||||
Decrypted character ID integer
|
||||
"""
|
||||
key = generate_tea_key(uid)
|
||||
decrypted = tea_decrypt(encrypted_data, key)
|
||||
return int.from_bytes(decrypted[0:2], 'little')
|
||||
|
||||
|
||||
def encrypt_character_id(uid: bytes, character_id: int) -> bytes:
|
||||
"""
|
||||
Encrypt character ID for writing to tag pages 0x24-0x25.
|
||||
|
||||
Args:
|
||||
uid: 7-byte tag UID
|
||||
character_id: Character ID to encrypt
|
||||
|
||||
Returns:
|
||||
8 bytes of encrypted data
|
||||
"""
|
||||
key = generate_tea_key(uid)
|
||||
# Character ID in first 2 bytes, rest is padding
|
||||
data = character_id.to_bytes(2, 'little') + b'\x00' * 6
|
||||
return tea_encrypt(data, key)
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# Standalone test
|
||||
# =============================================================================
|
||||
|
||||
if __name__ == "__main__":
|
||||
def on_insert(tag: TagInfo):
|
||||
print(f"\n{'='*50}")
|
||||
print(f"✓ TAG PLACED on {tag.pad.name} pad")
|
||||
print(f" UID: {tag.uid_hex}")
|
||||
print(f" Password: {generate_password(tag.uid).hex().upper()}")
|
||||
print(f"{'='*50}")
|
||||
|
||||
def on_remove(tag: TagInfo):
|
||||
print(f"\n✗ TAG REMOVED from {tag.pad.name} pad")
|
||||
|
||||
def on_error(e: Exception):
|
||||
print(f"\n⚠ ERROR: {e}")
|
||||
|
||||
def on_connect():
|
||||
print("✓ Portal connected and initialized")
|
||||
|
||||
def on_disconnect():
|
||||
print("✗ Portal disconnected")
|
||||
|
||||
print(f"\nLEGO Dimensions Portal Reader v{__version__}")
|
||||
print("="*50)
|
||||
|
||||
reader = LegoDimensionsReader()
|
||||
reader.on_tag_insert = on_insert
|
||||
reader.on_tag_remove = on_remove
|
||||
reader.on_error = on_error
|
||||
reader.on_connect = on_connect
|
||||
reader.on_disconnect = on_disconnect
|
||||
|
||||
try:
|
||||
reader.start()
|
||||
print("\nPortal Active - Place character discs...")
|
||||
print("Press Ctrl+C to exit\n")
|
||||
|
||||
while True:
|
||||
time.sleep(1)
|
||||
|
||||
except KeyboardInterrupt:
|
||||
print("\n\nShutting down...")
|
||||
except ConnectionError as e:
|
||||
print(f"\nConnection failed: {e}")
|
||||
finally:
|
||||
reader.disconnect()
|
||||
print("Goodbye!")
|
||||
Reference in New Issue
Block a user