#!/usr/bin/env python3
"""
LEGO Dimensions Portal CLI Client
Moonlight Drive-In Theater System

Reads LEGO Dimensions character/vehicle discs from the USB portal
and sends legacy keys to the video player server.

Requirements:
    pip install pyusb libusb-package requests

Windows Driver Setup:
    Use Zadig to install WinUSB driver for "LEGO READER V2.10"
"""

import argparse
import json
import os
import sys
import time

import requests

# Add current directory to path for lego_dimensions_reader import
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))

from lego_dimensions_reader import (
    LegoDimensionsReader,
    TagInfo,
    Pad,
    COLORS,
    generate_password,
    __version__ as READER_VERSION,
)

# Default server settings
DEFAULT_SERVER = "100.94.163.117"
DEFAULT_PORT = 8547

# Tag colors config file
TAG_COLORS_FILE = "tag_colors.json"


# =============================================================================
# TAG COLOR MAPPINGS (loaded from tag_colors.json)
# =============================================================================

def load_tag_colors(filepath: str = TAG_COLORS_FILE) -> tuple:
    """
    Load tag color mappings from JSON file.

    The file is looked up as given, then next to this script, then in the
    current working directory; the first existing path wins.

    Expected layout (all parts optional):
        {"default": {"color": ..., "effect": ..., "name": ...},
         "tags": {"<legacy key as string>": {"color": ..., ...}, ...}}

    Loading is best-effort: a missing or unreadable file, or a malformed
    entry, never raises. Malformed entries are skipped individually so one
    bad entry does not discard the rest of the file.

    Returns (tag_colors_dict, default_config)
    """
    default_config = {
        'color': COLORS['CYAN'],
        'effect': 'solid',
        'name': 'Default',
    }
    tag_colors = {}

    # Try to find the config file
    search_paths = [
        filepath,
        os.path.join(os.path.dirname(os.path.abspath(__file__)), filepath),
        os.path.join(os.getcwd(), filepath),
    ]
    config_path = next((p for p in search_paths if os.path.exists(p)), None)

    if config_path is None:
        print(f" Note: {filepath} not found, using default colors")
        return tag_colors, default_config

    try:
        with open(config_path, 'r', encoding='utf-8') as f:
            data = json.load(f)
    except json.JSONDecodeError as e:
        print(f" Warning: Invalid JSON in {filepath}: {e}")
        return tag_colors, default_config
    except Exception as e:
        # Deliberately broad: the colour file is optional and must never
        # prevent the client from starting.
        print(f" Warning: Could not load {filepath}: {e}")
        return tag_colors, default_config

    if not isinstance(data, dict):
        print(f" Warning: Could not load {filepath}: "
              f"top-level JSON value must be an object")
        return tag_colors, default_config

    # Load default config
    default_entry = data.get('default')
    if isinstance(default_entry, dict):
        default_config = {
            'color': default_entry.get('color', COLORS['CYAN']),
            'effect': default_entry.get('effect', 'solid'),
            'name': default_entry.get('name', 'Default'),
        }

    # Load tag mappings (convert string keys to int)
    skipped = 0
    tags = data.get('tags')
    if isinstance(tags, dict):
        for key, config in tags.items():
            try:
                legacy_key = int(key)
            except ValueError:
                skipped += 1  # Key is not an integer string
                continue
            if not isinstance(config, dict):
                skipped += 1  # Entry is not a {color, effect, name} object
                continue
            tag_colors[legacy_key] = {
                'color': config.get('color', COLORS['CYAN']),
                'effect': config.get('effect', 'solid'),
                'name': config.get('name', ''),
            }

    print(f" Loaded {len(tag_colors)} tag color mappings from {filepath}")
    if skipped:
        print(f" Note: skipped {skipped} invalid tag entries in {filepath}")

    return tag_colors, default_config


# Will be loaded at runtime
TAG_COLORS = {}
DEFAULT_TAG_COLOR = {
    'color': COLORS['CYAN'],
    'effect': 'solid',
    'name': 'Default',
}


def uid_to_legacy_key(uid: bytes) -> int:
    """
    Convert 7-byte UID to legacy integer key format.

    Uses bytes 3-6 (the unique portion) as a 32-bit little-endian integer.
    UIDs of 4-6 bytes use their first four bytes instead; shorter UIDs are
    zero-padded on the right to four bytes before conversion.
    """
    if len(uid) >= 7:
        return int.from_bytes(uid[3:7], byteorder='little')
    if len(uid) >= 4:
        return int.from_bytes(uid[:4], byteorder='little')
    return int.from_bytes(uid.ljust(4, b'\x00'), byteorder='little')


def send_nfc(server_ip, port, nfc_id):
    """
    Send NFC command to server.

    POSTs {'nfc_id': str(nfc_id)} to /api/video/play and prints the outcome.

    Returns True only when the server answers HTTP 200 with a JSON body
    whose 'success' field is truthy; every failure (HTTP error, connection
    error, timeout, unexpected exception) is printed and yields False.
    """
    url = f"http://{server_ip}:{port}/api/video/play"

    try:
        print(f"Sending NFC '{nfc_id}' to {server_ip}:{port}...")
        response = requests.post(url, json={'nfc_id': str(nfc_id)}, timeout=5)

        if response.status_code != 200:
            print(f"✗ HTTP Error: {response.status_code}")
            return False

        result = response.json()
        if result.get('success'):
            print(f"✓ SUCCESS: {result.get('message', 'Command sent')}")
            return True

        print(f"✗ FAILED: {result.get('message', 'Unknown error')}")
        return False

    except requests.exceptions.ConnectionError:
        print(f"✗ Cannot connect to server at {server_ip}:{port}")
        return False
    except requests.exceptions.Timeout:
        print(f"✗ Request timeout - server not responding")
        return False
    except Exception as e:
        print(f"✗ Error: {e}")
        return False


def send_skip(server_ip, port):
    """
    Send skip command.

    POSTs to /api/video/skip. Returns True on HTTP 200, False on any other
    status or on any exception (which is printed).
    """
    url = f"http://{server_ip}:{port}/api/video/skip"

    try:
        print(f"Sending skip command to {server_ip}:{port}...")
        response = requests.post(url, timeout=5)

        if response.status_code == 200:
            print("✓ Skip command sent")
            return True

        print(f"✗ HTTP Error: {response.status_code}")
        return False

    except Exception as e:
        print(f"✗ Error: {e}")
        return False


def test_connection(server_ip, port):
    """
    Test connection to server.

    GETs /api/stats and prints the reported statistics. Returns True on
    HTTP 200, False otherwise; connection failures print a troubleshooting
    checklist.
    """
    url = f"http://{server_ip}:{port}/api/stats"

    try:
        print(f"Testing connection to {server_ip}:{port}...")
        response = requests.get(url, timeout=3)

        if response.status_code != 200:
            print(f"✗ Server error: HTTP {response.status_code}")
            return False

        print("✓ Server is online and responding")
        stats = response.json()
        print(f" Videos played: {stats.get('videos_played', 'N/A')}")
        print(f" Current video: {stats.get('current_video', 'N/A')}")
        print(f" Uptime: {stats.get('uptime', 'N/A')}")
        return True

    except requests.exceptions.ConnectionError:
        print(f"✗ Cannot connect to server at {server_ip}:{port}")
        print(" Make sure:")
        print(" 1. The video player is running")
        print(" 2. Web interface is enabled")
        print(" 3. IP address and port are correct")
        print(" 4. Firewall allows the connection")
        return False
    except Exception as e:
        print(f"✗ Error: {e}")
        return False


def portal_mode(server_ip, port):
    """
    Portal mode - read tags from LEGO Dimensions portal.

    Loads the tag colour table, wires the reader callbacks, starts the
    reader and then idles until Ctrl+C. Each inserted tag is converted to a
    legacy key and sent to the server, with LED feedback on the pads.

    Returns True on a normal (Ctrl+C) exit, False when the reader raises
    ConnectionError. The reader is disconnected in either case.
    """
    global TAG_COLORS, DEFAULT_TAG_COLOR

    print("\n" + "=" * 60)
    print("LEGO DIMENSIONS PORTAL MODE")
    print("=" * 60)
    print(f"Server: {server_ip}:{port}")
    print(f"Reader version: {READER_VERSION}")

    # Load tag colors from JSON
    TAG_COLORS, DEFAULT_TAG_COLOR = load_tag_colors()

    print("=" * 60)
    print()

    # Track last sent to avoid duplicates
    last_sent_key = None
    last_sent_time = 0
    DEBOUNCE_TIME = 2.0  # Seconds before allowing same tag again

    # Will be set after reader is created
    reader = None

    def apply_tag_color(legacy_key: int):
        """Apply color effect to ALL pads based on tag mapping"""
        if reader is None:
            return

        # Look up color config for this tag
        config = TAG_COLORS.get(legacy_key, DEFAULT_TAG_COLOR)
        color = config.get('color', COLORS['CYAN'])
        effect = config.get('effect', 'solid')
        name = config.get('name', '')

        if name:
            print(f" Theme: {name}")

        # Apply effect to ALL pads
        if effect == 'pulse':
            # Slow pulsing fade on all pads (count=255 for continuous)
            reader.fade_pad(Pad.ALL, color, speed=15, count=255)
        elif effect == 'flash':
            # Quick flashing on all pads
            reader.flash_pad(Pad.ALL, color, on_time=8, off_time=8, count=255)
        else:
            # Solid color on all pads
            reader.set_pad_color(Pad.ALL, color)

    def on_tag_insert(tag: TagInfo):
        nonlocal last_sent_key, last_sent_time

        legacy_key = uid_to_legacy_key(tag.uid)
        # Monotonic clock: the debounce interval must not be affected by
        # wall-clock adjustments (NTP sync, manual changes).
        current_time = time.monotonic()

        print()
        print(f"[+] TAG DETECTED on {tag.pad.name} pad")
        print(f" UID: {tag.uid_hex}")
        print(f" Legacy Key: {legacy_key}")

        # Step 1: ALL LEDs OFF
        if reader:
            reader.set_pad_color(Pad.ALL, COLORS['OFF'])

        # Debounce - don't resend same tag too quickly
        if (legacy_key == last_sent_key
                and (current_time - last_sent_time) < DEBOUNCE_TIME):
            print(f" (Skipping server - same tag sent "
                  f"{current_time - last_sent_time:.1f}s ago)")
            # Still apply the theme effect even if skipping server
            apply_tag_color(legacy_key)
            return

        # Send to server
        print()
        if send_nfc(server_ip, port, legacy_key):
            last_sent_key = legacy_key
            last_sent_time = current_time

        # NOTE(review): the LED sequence below is run whether or not the
        # server accepted the key (it confirms the scan, not the request).
        # Confirm this matches the intended behaviour.
        if reader:
            # Step 2: Blue blink CENTER to confirm scan
            reader.set_pad_color(Pad.CENTER, COLORS['BLUE'])
            time.sleep(0.15)

            # Step 3: ALL OFF
            reader.set_pad_color(Pad.ALL, COLORS['OFF'])
            time.sleep(0.1)

        # Step 4: Apply theme effect to ALL pads
        apply_tag_color(legacy_key)

        print()

    def on_tag_remove(tag: TagInfo):
        print(f"[-] TAG REMOVED from {tag.pad.name} pad")
        # Turn off ALL pad LEDs
        if reader:
            reader.set_pad_color(Pad.ALL, COLORS['OFF'])

    def on_connect():
        print("✓ Portal connected and initialized")
        print()
        print("Place LEGO Dimensions discs on the portal...")
        print("Press Ctrl+C to exit")
        print()

    def on_disconnect():
        print("\n✗ Portal disconnected")

    def on_error(e):
        print(f"\n✗ Portal error: {e}")

    # Initialize reader
    reader = LegoDimensionsReader()
    reader.on_tag_insert = on_tag_insert
    reader.on_tag_remove = on_tag_remove
    reader.on_connect = on_connect
    reader.on_disconnect = on_disconnect
    reader.on_error = on_error

    try:
        print("Connecting to portal...")
        reader.start()

        # Keep running
        while True:
            time.sleep(0.5)

    except KeyboardInterrupt:
        print("\n\nExiting...")
    except ConnectionError as e:
        print(f"\n✗ {e}")
        return False
    finally:
        reader.disconnect()
        print("Portal disconnected. Goodbye!")

    return True


def interactive_mode(server_ip, port):
    """
    Interactive mode for manual NFC input (no portal needed).

    Reads commands from stdin until 'quit'/'exit'/'q', Ctrl+C or EOF.
    'skip' and 'test' (case-insensitive) invoke the matching server call;
    any other non-empty input is sent verbatim as an NFC ID.
    """
    print("\n" + "=" * 60)
    print("INTERACTIVE MODE (Manual Input)")
    print("=" * 60)
    print(f"Server: {server_ip}:{port}")
    print("Commands:")
    print(" - Type NFC ID and press Enter to send")
    print(" - Type 'skip' to skip current video")
    print(" - Type 'test' to test connection")
    print(" - Type 'quit' or 'exit' to exit")
    print("=" * 60)
    print()

    while True:
        try:
            command = input("NFC> ").strip()

            if not command:
                continue

            if command.lower() in ['quit', 'exit', 'q']:
                print("Exiting...")
                break

            if command.lower() == 'skip':
                send_skip(server_ip, port)
                continue

            if command.lower() == 'test':
                test_connection(server_ip, port)
                continue

            # Assume it's an NFC ID
            send_nfc(server_ip, port, command)

        except KeyboardInterrupt:
            print("\nExiting...")
            break
        except EOFError:
            break


def main():
    """
    Main entry point.

    Parses the command line and dispatches to exactly one mode, checked in
    this order: --test, --skip, --nfc, --interactive, otherwise portal mode.

    Returns the process exit code: 0 on success, 1 when the selected
    one-shot command (--test, --skip, --nfc) or portal mode reports failure.
    Interactive mode always returns 0.
    """
    parser = argparse.ArgumentParser(
        description='LEGO Dimensions Portal Client - Send NFC commands to video player',
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=f"""
Default server: {DEFAULT_SERVER}:{DEFAULT_PORT}

Examples:
  # Portal mode (default) - read tags from LEGO Dimensions portal
  python dimensions_cli_client.py

  # Test connection first
  python dimensions_cli_client.py -t

  # Send single NFC command manually
  python dimensions_cli_client.py -n 12345678

  # Skip current video
  python dimensions_cli_client.py --skip

  # Manual interactive mode (no portal)
  python dimensions_cli_client.py -i

  # Use different server
  python dimensions_cli_client.py -s 192.168.1.100 -p 8547
"""
    )

    parser.add_argument('-s', '--server', default=DEFAULT_SERVER,
                        help=f'Server IP address (default: {DEFAULT_SERVER})')
    parser.add_argument('-p', '--port', type=int, default=DEFAULT_PORT,
                        help=f'Server port (default: {DEFAULT_PORT})')
    parser.add_argument('-n', '--nfc',
                        help='NFC ID to send (manual, no portal needed)')
    parser.add_argument('-t', '--test', action='store_true',
                        help='Test connection to server')
    parser.add_argument('--skip', action='store_true',
                        help='Send skip video command')
    parser.add_argument('-i', '--interactive', action='store_true',
                        help='Manual interactive mode (no portal)')

    args = parser.parse_args()

    print("=" * 60)
    print("LEGO Dimensions Portal Client")
    print("Moonlight Drive-In Theater System")
    print("=" * 60)

    # Test connection (exit code reflects the result, like --skip / --nfc)
    if args.test:
        success = test_connection(args.server, args.port)
        return 0 if success else 1

    # Send skip
    if args.skip:
        success = send_skip(args.server, args.port)
        return 0 if success else 1

    # Send manual NFC
    if args.nfc:
        success = send_nfc(args.server, args.port, args.nfc)
        return 0 if success else 1

    # Manual interactive mode
    if args.interactive:
        interactive_mode(args.server, args.port)
        return 0

    # Default: Portal mode
    success = portal_mode(args.server, args.port)
    return 0 if success else 1


if __name__ == "__main__":
    sys.exit(main())