diff --git a/lego_dimensions_reader.py b/lego_dimensions_reader.py index 000981b..cdc3a51 100644 --- a/lego_dimensions_reader.py +++ b/lego_dimensions_reader.py @@ -7,24 +7,63 @@ Communicates with LEGO Dimensions USB portal (PS3/PS4/Wii U versions) to detect character and vehicle disc placement/removal events. Requirements: - pip install pyusb + pip install pyusb libusb-package Windows Driver Setup: 1. Download Zadig from https://zadig.akeo.ie/ 2. Connect the LEGO Dimensions portal 3. Run Zadig as administrator - 4. Select Options → List All Devices + 4. Select Options -> List All Devices 5. Choose "LEGO READER V2.10" from the dropdown 6. Select WinUSB as the target driver 7. Click "Replace Driver" + +Windows libusb Setup: + The libusb-package provides pre-compiled libusb-1.0.dll. + Alternatively, download from https://libusb.info/ and place + libusb-1.0.dll in C:\\Windows\\System32 """ +import sys +import os +import platform + +# Windows-specific backend setup - must be done BEFORE importing usb.core +def _setup_windows_backend(): + """Configure libusb backend for Windows.""" + if platform.system() != 'Windows': + return + + # Try libusb-package first (recommended) + try: + import libusb_package + return + except ImportError: + pass + + # Search for libusb DLL in common locations + dll_paths = [ + os.path.join(os.environ.get('WINDIR', 'C:\\Windows'), 'System32'), + os.path.join(os.environ.get('WINDIR', 'C:\\Windows'), 'SysWOW64'), + os.path.dirname(sys.executable), + os.getcwd(), + ] + + for path in dll_paths: + dll_file = os.path.join(path, 'libusb-1.0.dll') + if os.path.exists(dll_file): + os.environ['PATH'] = path + os.pathsep + os.environ.get('PATH', '') + return + +# Run setup before USB imports +_setup_windows_backend() + import usb.core import usb.util +import usb.backend.libusb1 import threading import time import json -import os from typing import Callable, Optional, Dict, Any from dataclasses import dataclass from enum import Enum @@ -35,7 +74,26 @@ VENDOR_ID = 0x0e6f PRODUCT_ID = 0x0241 # Module version -__version__ = "1.0.0" +__version__ = "1.2.1" + + +def get_usb_backend(): + """Get the appropriate USB backend for the current platform.""" + if platform.system() == 'Windows': + # Try libusb-package first + try: + import libusb_package + return libusb_package.get_libusb1_backend() + except (ImportError, Exception): + pass + + # Try default libusb1 backend + try: + return usb.backend.libusb1.get_backend() + except Exception: + pass + + return None # Use default backend class Pad(Enum): @@ -117,6 +175,11 @@ class LegoDimensionsReader: self._thread: Optional[threading.Thread] = None self._lock = threading.Lock() + # USB endpoints (will be discovered on connect) + self._endpoint_in = None + self._endpoint_out = None + self._interface = None + # Active tags currently on pads (pad_num -> TagInfo) self._active_tags: Dict[int, TagInfo] = {} @@ -136,7 +199,7 @@ class LegoDimensionsReader: # Try to load from JSON file if provided if db_path and os.path.exists(db_path): try: - with open(db_path, 'r') as f: + with open(db_path, 'r', encoding='utf-8') as f: data = json.load(f) return {int(k): v for k, v in data.get('characters', {}).items()} except Exception: @@ -232,7 +295,7 @@ class LegoDimensionsReader: """Load vehicle ID database.""" if db_path and os.path.exists(db_path): try: - with open(db_path, 'r') as f: + with open(db_path, 'r', encoding='utf-8') as f: data = json.load(f) return {int(k): v for k, v in data.get('vehicles', {}).items()} except Exception: @@ -323,35 +386,96 @@ class LegoDimensionsReader: 82: "Niffler", } + def _is_timeout_error(self, error: usb.core.USBError) -> bool: + """Check if error is a timeout (normal during polling).""" + error_str = str(error).lower() + # Windows timeout codes vary: -116, -7, 110, or string + return ( + error.errno in (-116, -7, 110, None) or + 'timeout' in error_str or + 'operation timed out' in error_str or + 'timed out' in error_str + ) + + def _is_pipe_error(self, error: usb.core.USBError) -> bool: + """Check if error indicates device disconnection.""" + return error.errno == -9 or 'pipe' in str(error).lower() + def connect(self) -> bool: """ Establish connection to LEGO Dimensions portal. Returns True if connection successful. """ try: - self.dev = usb.core.find(idVendor=VENDOR_ID, idProduct=PRODUCT_ID) + # Get platform-appropriate backend + backend = get_usb_backend() + + self.dev = usb.core.find( + idVendor=VENDOR_ID, + idProduct=PRODUCT_ID, + backend=backend + ) if self.dev is None: - raise ConnectionError( + error_msg = ( "LEGO Dimensions Portal not found.\n" "Ensure:\n" " 1. Portal is connected via USB\n" " 2. Using PS3/PS4/Wii U portal (NOT Xbox)\n" " 3. WinUSB driver installed via Zadig" ) + if platform.system() == 'Windows': + error_msg += ( + "\n 4. libusb installed: pip install libusb-package\n" + " 5. Or libusb-1.0.dll in System32" + ) + raise ConnectionError(error_msg) - # Detach kernel driver if active (Linux) + # Detach kernel driver if active (Linux only) try: if self.dev.is_kernel_driver_active(0): self.dev.detach_kernel_driver(0) - except (AttributeError, usb.core.USBError): + except (AttributeError, usb.core.USBError, NotImplementedError): pass # Not available on Windows # Configure device - self.dev.set_configuration() + try: + self.dev.set_configuration() + except usb.core.USBError as e: + # Ignore "Entity not found" - device already configured + if 'entity not found' not in str(e).lower(): + raise + + # Get configuration and interface + cfg = self.dev.get_active_configuration() + intf = cfg[(0, 0)] + self._interface = intf.bInterfaceNumber + + # Claim interface (required on Windows) + try: + usb.util.claim_interface(self.dev, self._interface) + except usb.core.USBError as e: + if 'resource busy' not in str(e).lower(): + raise + + # Find endpoints dynamically + self._endpoint_out = usb.util.find_descriptor( + intf, + custom_match=lambda e: usb.util.endpoint_direction(e.bEndpointAddress) == usb.util.ENDPOINT_OUT + ) + self._endpoint_in = usb.util.find_descriptor( + intf, + custom_match=lambda e: usb.util.endpoint_direction(e.bEndpointAddress) == usb.util.ENDPOINT_IN + ) + + # Fallback to hardcoded endpoints if discovery fails + if self._endpoint_out is None: + self._endpoint_out = 0x01 + if self._endpoint_in is None: + self._endpoint_in = 0x81 # Send initialization command - self.dev.write(1, TOYPAD_INIT) + self._write_raw(TOYPAD_INIT) # Get device info try: @@ -367,8 +491,10 @@ class LegoDimensionsReader: except usb.core.USBError as e: error_msg = str(e) - if "Access denied" in error_msg or "insufficient permissions" in error_msg.lower(): + if "access denied" in error_msg.lower() or "insufficient permissions" in error_msg.lower(): print("ERROR: Access denied. Try running as Administrator or check Zadig drivers.") + elif "no backend" in error_msg.lower(): + print("ERROR: No USB backend found. Install libusb-package: pip install libusb-package") if self.on_error: self.on_error(e) return False @@ -384,10 +510,21 @@ class LegoDimensionsReader: try: # Turn off all LEDs self.set_pad_color(Pad.ALL, COLORS['OFF']) + + # Release interface (Windows) + if self._interface is not None: + try: + usb.util.release_interface(self.dev, self._interface) + except Exception: + pass + usb.util.dispose_resources(self.dev) except Exception: pass self.dev = None + self._endpoint_in = None + self._endpoint_out = None + self._interface = None if self.on_disconnect: self.on_disconnect() @@ -395,6 +532,39 @@ class LegoDimensionsReader: """Calculate checksum (sum of bytes mod 256)""" return sum(command) % 256 + def _write_raw(self, data: list): + """Write raw data to device.""" + if not self.dev: + return + + try: + endpoint = self._endpoint_out + if hasattr(endpoint, 'bEndpointAddress'): + endpoint = endpoint.bEndpointAddress + if endpoint is None: + endpoint = 0x01 + + with self._lock: + self.dev.write(endpoint, data) + except usb.core.USBError: + pass + + def _read_raw(self, size: int = 32, timeout: int = 100) -> Optional[list]: + """Read raw data from device.""" + if not self.dev: + return None + + try: + endpoint = self._endpoint_in + if hasattr(endpoint, 'bEndpointAddress'): + endpoint = endpoint.bEndpointAddress + if endpoint is None: + endpoint = 0x81 + + return list(self.dev.read(endpoint, size, timeout=timeout)) + except usb.core.USBError: + return None + def _send_command(self, command: list): """Send a command to the portal with checksum and padding.""" if not self.dev: @@ -407,11 +577,7 @@ class LegoDimensionsReader: while len(message) < 32: message.append(0x00) - try: - with self._lock: - self.dev.write(1, message) - except usb.core.USBError: - pass # Ignore write errors + self._write_raw(message) def set_pad_color(self, pad: Pad, color: list): """ @@ -427,22 +593,47 @@ class LegoDimensionsReader: def flash_pad(self, pad: Pad, color: list, on_time: int = 10, off_time: int = 10, count: int = 5): """ - Flash a pad with a color. + Flash a pad with a color using software-based blinking. + + Note: Hardware flash command (0xC8) is unreliable on some portals, + so this uses rapid solid color toggling instead. Args: pad: Which pad to flash color: [R, G, B] values - on_time: Ticks for LED on state (1 tick ≈ 50ms) + on_time: Ticks for LED on state (1 tick ~ 50ms) off_time: Ticks for LED off state - count: Number of flashes (255 = infinite) + count: Number of flashes (255 = infinite, capped at 50) """ - command = [ - 0x55, 0x0e, 0xc8, 0x06, pad.value, - color[0], color[1], color[2], - on_time, off_time, count, - 0, 0, 0, 0, 0 # Return color (off) - ] - self._send_command(command) + # Convert ticks to seconds (1 tick ≈ 50ms) + on_secs = on_time * 0.05 + off_secs = off_time * 0.05 + + # Cap infinite flashes to reasonable number for software implementation + actual_count = min(count, 50) if count == 255 else count + + # Determine which pads to flash + if pad == Pad.ALL: + pads_to_flash = [Pad.CENTER, Pad.LEFT, Pad.RIGHT] + else: + pads_to_flash = [pad] + + def _flash_thread(): + for _ in range(actual_count): + if not self._running and not self.dev: + break + # ON + for p in pads_to_flash: + self.set_pad_color(p, color) + time.sleep(on_secs) + # OFF + for p in pads_to_flash: + self.set_pad_color(p, COLORS['OFF']) + time.sleep(off_secs) + + # Run flash in background thread so it doesn't block + flash_thread = threading.Thread(target=_flash_thread, daemon=True) + flash_thread.start() def fade_pad(self, pad: Pad, color: list, speed: int = 10, count: int = 1): """ @@ -481,7 +672,13 @@ class LegoDimensionsReader: while self._running: try: # Read from IN endpoint with timeout - in_packet = self.dev.read(0x81, 32, timeout=100) + endpoint = self._endpoint_in + if hasattr(endpoint, 'bEndpointAddress'): + endpoint = endpoint.bEndpointAddress + if endpoint is None: + endpoint = 0x81 + + in_packet = self.dev.read(endpoint, 32, timeout=100) bytelist = list(in_packet) if not bytelist: @@ -530,8 +727,12 @@ class LegoDimensionsReader: self.on_tag_remove(tag_info) except usb.core.USBError as e: - if e.errno == 110 or "timeout" in str(e).lower(): # Timeout - normal - continue + if self._is_timeout_error(e): + continue # Timeout is normal + elif self._is_pipe_error(e): + if self._running and self.on_error: + self.on_error(ConnectionError("Portal disconnected")) + break elif self._running: if self.on_error: self.on_error(e) @@ -727,24 +928,25 @@ def encrypt_character_id(uid: bytes, character_id: int) -> bytes: if __name__ == "__main__": def on_insert(tag: TagInfo): print(f"\n{'='*50}") - print(f"✓ TAG PLACED on {tag.pad.name} pad") - print(f" UID: {tag.uid_hex}") - print(f" Password: {generate_password(tag.uid).hex().upper()}") + print(f"[+] TAG PLACED on {tag.pad.name} pad") + print(f" UID: {tag.uid_hex}") + print(f" Password: {generate_password(tag.uid).hex().upper()}") print(f"{'='*50}") def on_remove(tag: TagInfo): - print(f"\n✗ TAG REMOVED from {tag.pad.name} pad") + print(f"\n[-] TAG REMOVED from {tag.pad.name} pad") def on_error(e: Exception): - print(f"\n⚠ ERROR: {e}") + print(f"\n[!] ERROR: {e}") def on_connect(): - print("✓ Portal connected and initialized") + print("[+] Portal connected and initialized") def on_disconnect(): - print("✗ Portal disconnected") + print("[-] Portal disconnected") print(f"\nLEGO Dimensions Portal Reader v{__version__}") + print(f"Platform: {platform.system()}") print("="*50) reader = LegoDimensionsReader() @@ -768,4 +970,4 @@ if __name__ == "__main__": print(f"\nConnection failed: {e}") finally: reader.disconnect() - print("Goodbye!") + print("Goodbye!") \ No newline at end of file