Fix flash_pad with software-based implementation (v1.2.1)

- Replace broken hardware 0xC8 flash command with software-based blinking
- Hardware flash causes color corruption on some portals
- Software flash uses rapid solid color toggling (works reliably)
- Add Windows USB backend setup for proper libusb detection
- Add proper endpoint discovery with fallbacks
- Add timeout/pipe error detection helpers
This commit is contained in:
2026-02-20 23:00:30 +11:00
parent a2fce69395
commit 62a61b2d36

View File

@@ -7,24 +7,63 @@ Communicates with LEGO Dimensions USB portal (PS3/PS4/Wii U versions)
to detect character and vehicle disc placement/removal events.
Requirements:
pip install pyusb
pip install pyusb libusb-package
Windows Driver Setup:
1. Download Zadig from https://zadig.akeo.ie/
2. Connect the LEGO Dimensions portal
3. Run Zadig as administrator
4. Select Options List All Devices
4. Select Options -> List All Devices
5. Choose "LEGO READER V2.10" from the dropdown
6. Select WinUSB as the target driver
7. Click "Replace Driver"
Windows libusb Setup:
The libusb-package provides pre-compiled libusb-1.0.dll.
Alternatively, download from https://libusb.info/ and place
libusb-1.0.dll in C:\\Windows\\System32
"""
import sys
import os
import platform
# Windows-specific backend setup - must be done BEFORE importing usb.core
def _setup_windows_backend():
"""Configure libusb backend for Windows."""
if platform.system() != 'Windows':
return
# Try libusb-package first (recommended)
try:
import libusb_package
return
except ImportError:
pass
# Search for libusb DLL in common locations
dll_paths = [
os.path.join(os.environ.get('WINDIR', 'C:\\Windows'), 'System32'),
os.path.join(os.environ.get('WINDIR', 'C:\\Windows'), 'SysWOW64'),
os.path.dirname(sys.executable),
os.getcwd(),
]
for path in dll_paths:
dll_file = os.path.join(path, 'libusb-1.0.dll')
if os.path.exists(dll_file):
os.environ['PATH'] = path + os.pathsep + os.environ.get('PATH', '')
return
# Run setup before USB imports
_setup_windows_backend()
import usb.core
import usb.util
import usb.backend.libusb1
import threading
import time
import json
import os
from typing import Callable, Optional, Dict, Any
from dataclasses import dataclass
from enum import Enum
@@ -35,7 +74,26 @@ VENDOR_ID = 0x0e6f
PRODUCT_ID = 0x0241
# Module version
__version__ = "1.0.0"
__version__ = "1.2.1"
def get_usb_backend():
"""Get the appropriate USB backend for the current platform."""
if platform.system() == 'Windows':
# Try libusb-package first
try:
import libusb_package
return libusb_package.get_libusb1_backend()
except (ImportError, Exception):
pass
# Try default libusb1 backend
try:
return usb.backend.libusb1.get_backend()
except Exception:
pass
return None # Use default backend
class Pad(Enum):
@@ -117,6 +175,11 @@ class LegoDimensionsReader:
self._thread: Optional[threading.Thread] = None
self._lock = threading.Lock()
# USB endpoints (will be discovered on connect)
self._endpoint_in = None
self._endpoint_out = None
self._interface = None
# Active tags currently on pads (pad_num -> TagInfo)
self._active_tags: Dict[int, TagInfo] = {}
@@ -136,7 +199,7 @@ class LegoDimensionsReader:
# Try to load from JSON file if provided
if db_path and os.path.exists(db_path):
try:
with open(db_path, 'r') as f:
with open(db_path, 'r', encoding='utf-8') as f:
data = json.load(f)
return {int(k): v for k, v in data.get('characters', {}).items()}
except Exception:
@@ -232,7 +295,7 @@ class LegoDimensionsReader:
"""Load vehicle ID database."""
if db_path and os.path.exists(db_path):
try:
with open(db_path, 'r') as f:
with open(db_path, 'r', encoding='utf-8') as f:
data = json.load(f)
return {int(k): v for k, v in data.get('vehicles', {}).items()}
except Exception:
@@ -323,35 +386,96 @@ class LegoDimensionsReader:
82: "Niffler",
}
def _is_timeout_error(self, error: usb.core.USBError) -> bool:
"""Check if error is a timeout (normal during polling)."""
error_str = str(error).lower()
# Windows timeout codes vary: -116, -7, 110, or string
return (
error.errno in (-116, -7, 110, None) or
'timeout' in error_str or
'operation timed out' in error_str or
'timed out' in error_str
)
def _is_pipe_error(self, error: usb.core.USBError) -> bool:
"""Check if error indicates device disconnection."""
return error.errno == -9 or 'pipe' in str(error).lower()
def connect(self) -> bool:
"""
Establish connection to LEGO Dimensions portal.
Returns True if connection successful.
"""
try:
self.dev = usb.core.find(idVendor=VENDOR_ID, idProduct=PRODUCT_ID)
# Get platform-appropriate backend
backend = get_usb_backend()
self.dev = usb.core.find(
idVendor=VENDOR_ID,
idProduct=PRODUCT_ID,
backend=backend
)
if self.dev is None:
raise ConnectionError(
error_msg = (
"LEGO Dimensions Portal not found.\n"
"Ensure:\n"
" 1. Portal is connected via USB\n"
" 2. Using PS3/PS4/Wii U portal (NOT Xbox)\n"
" 3. WinUSB driver installed via Zadig"
)
if platform.system() == 'Windows':
error_msg += (
"\n 4. libusb installed: pip install libusb-package\n"
" 5. Or libusb-1.0.dll in System32"
)
raise ConnectionError(error_msg)
# Detach kernel driver if active (Linux)
# Detach kernel driver if active (Linux only)
try:
if self.dev.is_kernel_driver_active(0):
self.dev.detach_kernel_driver(0)
except (AttributeError, usb.core.USBError):
except (AttributeError, usb.core.USBError, NotImplementedError):
pass # Not available on Windows
# Configure device
self.dev.set_configuration()
try:
self.dev.set_configuration()
except usb.core.USBError as e:
# Ignore "Entity not found" - device already configured
if 'entity not found' not in str(e).lower():
raise
# Get configuration and interface
cfg = self.dev.get_active_configuration()
intf = cfg[(0, 0)]
self._interface = intf.bInterfaceNumber
# Claim interface (required on Windows)
try:
usb.util.claim_interface(self.dev, self._interface)
except usb.core.USBError as e:
if 'resource busy' not in str(e).lower():
raise
# Find endpoints dynamically
self._endpoint_out = usb.util.find_descriptor(
intf,
custom_match=lambda e: usb.util.endpoint_direction(e.bEndpointAddress) == usb.util.ENDPOINT_OUT
)
self._endpoint_in = usb.util.find_descriptor(
intf,
custom_match=lambda e: usb.util.endpoint_direction(e.bEndpointAddress) == usb.util.ENDPOINT_IN
)
# Fallback to hardcoded endpoints if discovery fails
if self._endpoint_out is None:
self._endpoint_out = 0x01
if self._endpoint_in is None:
self._endpoint_in = 0x81
# Send initialization command
self.dev.write(1, TOYPAD_INIT)
self._write_raw(TOYPAD_INIT)
# Get device info
try:
@@ -367,8 +491,10 @@ class LegoDimensionsReader:
except usb.core.USBError as e:
error_msg = str(e)
if "Access denied" in error_msg or "insufficient permissions" in error_msg.lower():
if "access denied" in error_msg.lower() or "insufficient permissions" in error_msg.lower():
print("ERROR: Access denied. Try running as Administrator or check Zadig drivers.")
elif "no backend" in error_msg.lower():
print("ERROR: No USB backend found. Install libusb-package: pip install libusb-package")
if self.on_error:
self.on_error(e)
return False
@@ -384,10 +510,21 @@ class LegoDimensionsReader:
try:
# Turn off all LEDs
self.set_pad_color(Pad.ALL, COLORS['OFF'])
# Release interface (Windows)
if self._interface is not None:
try:
usb.util.release_interface(self.dev, self._interface)
except Exception:
pass
usb.util.dispose_resources(self.dev)
except Exception:
pass
self.dev = None
self._endpoint_in = None
self._endpoint_out = None
self._interface = None
if self.on_disconnect:
self.on_disconnect()
@@ -395,6 +532,39 @@ class LegoDimensionsReader:
"""Calculate checksum (sum of bytes mod 256)"""
return sum(command) % 256
def _write_raw(self, data: list):
"""Write raw data to device."""
if not self.dev:
return
try:
endpoint = self._endpoint_out
if hasattr(endpoint, 'bEndpointAddress'):
endpoint = endpoint.bEndpointAddress
if endpoint is None:
endpoint = 0x01
with self._lock:
self.dev.write(endpoint, data)
except usb.core.USBError:
pass
def _read_raw(self, size: int = 32, timeout: int = 100) -> Optional[list]:
"""Read raw data from device."""
if not self.dev:
return None
try:
endpoint = self._endpoint_in
if hasattr(endpoint, 'bEndpointAddress'):
endpoint = endpoint.bEndpointAddress
if endpoint is None:
endpoint = 0x81
return list(self.dev.read(endpoint, size, timeout=timeout))
except usb.core.USBError:
return None
def _send_command(self, command: list):
"""Send a command to the portal with checksum and padding."""
if not self.dev:
@@ -407,11 +577,7 @@ class LegoDimensionsReader:
while len(message) < 32:
message.append(0x00)
try:
with self._lock:
self.dev.write(1, message)
except usb.core.USBError:
pass # Ignore write errors
self._write_raw(message)
def set_pad_color(self, pad: Pad, color: list):
"""
@@ -427,22 +593,47 @@ class LegoDimensionsReader:
def flash_pad(self, pad: Pad, color: list, on_time: int = 10,
off_time: int = 10, count: int = 5):
"""
Flash a pad with a color.
Flash a pad with a color using software-based blinking.
Note: Hardware flash command (0xC8) is unreliable on some portals,
so this uses rapid solid color toggling instead.
Args:
pad: Which pad to flash
color: [R, G, B] values
on_time: Ticks for LED on state (1 tick 50ms)
on_time: Ticks for LED on state (1 tick ~ 50ms)
off_time: Ticks for LED off state
count: Number of flashes (255 = infinite)
count: Number of flashes (255 = infinite, capped at 50)
"""
command = [
0x55, 0x0e, 0xc8, 0x06, pad.value,
color[0], color[1], color[2],
on_time, off_time, count,
0, 0, 0, 0, 0 # Return color (off)
]
self._send_command(command)
# Convert ticks to seconds (1 tick ≈ 50ms)
on_secs = on_time * 0.05
off_secs = off_time * 0.05
# Cap infinite flashes to reasonable number for software implementation
actual_count = min(count, 50) if count == 255 else count
# Determine which pads to flash
if pad == Pad.ALL:
pads_to_flash = [Pad.CENTER, Pad.LEFT, Pad.RIGHT]
else:
pads_to_flash = [pad]
def _flash_thread():
for _ in range(actual_count):
if not self._running and not self.dev:
break
# ON
for p in pads_to_flash:
self.set_pad_color(p, color)
time.sleep(on_secs)
# OFF
for p in pads_to_flash:
self.set_pad_color(p, COLORS['OFF'])
time.sleep(off_secs)
# Run flash in background thread so it doesn't block
flash_thread = threading.Thread(target=_flash_thread, daemon=True)
flash_thread.start()
def fade_pad(self, pad: Pad, color: list, speed: int = 10, count: int = 1):
"""
@@ -481,7 +672,13 @@ class LegoDimensionsReader:
while self._running:
try:
# Read from IN endpoint with timeout
in_packet = self.dev.read(0x81, 32, timeout=100)
endpoint = self._endpoint_in
if hasattr(endpoint, 'bEndpointAddress'):
endpoint = endpoint.bEndpointAddress
if endpoint is None:
endpoint = 0x81
in_packet = self.dev.read(endpoint, 32, timeout=100)
bytelist = list(in_packet)
if not bytelist:
@@ -530,8 +727,12 @@ class LegoDimensionsReader:
self.on_tag_remove(tag_info)
except usb.core.USBError as e:
if e.errno == 110 or "timeout" in str(e).lower(): # Timeout - normal
continue
if self._is_timeout_error(e):
continue # Timeout is normal
elif self._is_pipe_error(e):
if self._running and self.on_error:
self.on_error(ConnectionError("Portal disconnected"))
break
elif self._running:
if self.on_error:
self.on_error(e)
@@ -727,24 +928,25 @@ def encrypt_character_id(uid: bytes, character_id: int) -> bytes:
if __name__ == "__main__":
def on_insert(tag: TagInfo):
print(f"\n{'='*50}")
print(f" TAG PLACED on {tag.pad.name} pad")
print(f" UID: {tag.uid_hex}")
print(f" Password: {generate_password(tag.uid).hex().upper()}")
print(f"[+] TAG PLACED on {tag.pad.name} pad")
print(f" UID: {tag.uid_hex}")
print(f" Password: {generate_password(tag.uid).hex().upper()}")
print(f"{'='*50}")
def on_remove(tag: TagInfo):
print(f"\n TAG REMOVED from {tag.pad.name} pad")
print(f"\n[-] TAG REMOVED from {tag.pad.name} pad")
def on_error(e: Exception):
print(f"\n ERROR: {e}")
print(f"\n[!] ERROR: {e}")
def on_connect():
print(" Portal connected and initialized")
print("[+] Portal connected and initialized")
def on_disconnect():
print(" Portal disconnected")
print("[-] Portal disconnected")
print(f"\nLEGO Dimensions Portal Reader v{__version__}")
print(f"Platform: {platform.system()}")
print("="*50)
reader = LegoDimensionsReader()