diff --git a/dimensions_cli_client.py b/dimensions_cli_client.py new file mode 100644 index 0000000..57ad591 --- /dev/null +++ b/dimensions_cli_client.py @@ -0,0 +1,483 @@ +#!/usr/bin/env python3 +""" +LEGO Dimensions Portal CLI Client +Moonlight Drive-In Theater System + +Reads LEGO Dimensions character/vehicle discs from the USB portal +and sends legacy keys to the video player server. + +Requirements: + pip install pyusb libusb-package requests + +Windows Driver Setup: + Use Zadig to install WinUSB driver for "LEGO READER V2.10" +""" + +import requests +import sys +import argparse +import time +import os + +# Add current directory to path for lego_dimensions_reader import +sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) + +from lego_dimensions_reader import ( + LegoDimensionsReader, + TagInfo, + Pad, + COLORS, + generate_password, + __version__ as READER_VERSION +) + +# Default server settings +DEFAULT_SERVER = "100.94.163.117" +DEFAULT_PORT = 8547 + +# Tag colors config file +TAG_COLORS_FILE = "tag_colors.json" + +# ============================================================================= +# TAG COLOR MAPPINGS (loaded from tag_colors.json) +# ============================================================================= + +def load_tag_colors(filepath: str = TAG_COLORS_FILE) -> tuple: + """ + Load tag color mappings from JSON file. + Returns (tag_colors_dict, default_config) + """ + import json + + default_config = { + 'color': COLORS['CYAN'], + 'effect': 'solid', + 'name': 'Default' + } + + tag_colors = {} + + # Try to find the config file + search_paths = [ + filepath, + os.path.join(os.path.dirname(os.path.abspath(__file__)), filepath), + os.path.join(os.getcwd(), filepath), + ] + + config_path = None + for path in search_paths: + if os.path.exists(path): + config_path = path + break + + if config_path is None: + print(f" Note: {filepath} not found, using default colors") + return tag_colors, default_config + + try: + with open(config_path, 'r', encoding='utf-8') as f: + data = json.load(f) + + # Load default config + if 'default' in data: + default_config = { + 'color': data['default'].get('color', COLORS['CYAN']), + 'effect': data['default'].get('effect', 'solid'), + 'name': data['default'].get('name', 'Default') + } + + # Load tag mappings (convert string keys to int) + if 'tags' in data: + for key, config in data['tags'].items(): + try: + legacy_key = int(key) + tag_colors[legacy_key] = { + 'color': config.get('color', COLORS['CYAN']), + 'effect': config.get('effect', 'solid'), + 'name': config.get('name', '') + } + except ValueError: + pass # Skip invalid keys + + print(f" Loaded {len(tag_colors)} tag color mappings from {filepath}") + + except json.JSONDecodeError as e: + print(f" Warning: Invalid JSON in {filepath}: {e}") + except Exception as e: + print(f" Warning: Could not load {filepath}: {e}") + + return tag_colors, default_config + + +# Will be loaded at runtime +TAG_COLORS = {} +DEFAULT_TAG_COLOR = { + 'color': COLORS['CYAN'], + 'effect': 'solid', + 'name': 'Default' +} + + +def uid_to_legacy_key(uid: bytes) -> int: + """ + Convert 7-byte UID to legacy integer key format. + Uses bytes 3-6 (the unique portion) as a 32-bit little-endian integer. + """ + if len(uid) >= 7: + return int.from_bytes(uid[3:7], byteorder='little') + elif len(uid) >= 4: + return int.from_bytes(uid[:4], byteorder='little') + else: + return int.from_bytes(uid.ljust(4, b'\x00'), byteorder='little') + + +def send_nfc(server_ip, port, nfc_id): + """Send NFC command to server""" + url = f"http://{server_ip}:{port}/api/video/play" + + try: + print(f"Sending NFC '{nfc_id}' to {server_ip}:{port}...") + response = requests.post(url, json={'nfc_id': str(nfc_id)}, timeout=5) + + if response.status_code == 200: + result = response.json() + if result.get('success'): + print(f"✓ SUCCESS: {result.get('message', 'Command sent')}") + return True + else: + print(f"✗ FAILED: {result.get('message', 'Unknown error')}") + return False + else: + print(f"✗ HTTP Error: {response.status_code}") + return False + + except requests.exceptions.ConnectionError: + print(f"✗ Cannot connect to server at {server_ip}:{port}") + return False + except requests.exceptions.Timeout: + print(f"✗ Request timeout - server not responding") + return False + except Exception as e: + print(f"✗ Error: {e}") + return False + + +def send_skip(server_ip, port): + """Send skip command""" + url = f"http://{server_ip}:{port}/api/video/skip" + + try: + print(f"Sending skip command to {server_ip}:{port}...") + response = requests.post(url, timeout=5) + + if response.status_code == 200: + print("✓ Skip command sent") + return True + else: + print(f"✗ HTTP Error: {response.status_code}") + return False + + except Exception as e: + print(f"✗ Error: {e}") + return False + + +def test_connection(server_ip, port): + """Test connection to server""" + url = f"http://{server_ip}:{port}/api/stats" + + try: + print(f"Testing connection to {server_ip}:{port}...") + response = requests.get(url, timeout=3) + + if response.status_code == 200: + print("✓ Server is online and responding") + stats = response.json() + print(f" Videos played: {stats.get('videos_played', 'N/A')}") + print(f" Current video: {stats.get('current_video', 'N/A')}") + print(f" Uptime: {stats.get('uptime', 'N/A')}") + return True + else: + print(f"✗ Server error: HTTP {response.status_code}") + return False + + except requests.exceptions.ConnectionError: + print(f"✗ Cannot connect to server at {server_ip}:{port}") + print(" Make sure:") + print(" 1. The video player is running") + print(" 2. Web interface is enabled") + print(" 3. IP address and port are correct") + print(" 4. Firewall allows the connection") + return False + except Exception as e: + print(f"✗ Error: {e}") + return False + + +def portal_mode(server_ip, port): + """Portal mode - read tags from LEGO Dimensions portal""" + global TAG_COLORS, DEFAULT_TAG_COLOR + + print("\n" + "="*60) + print("LEGO DIMENSIONS PORTAL MODE") + print("="*60) + print(f"Server: {server_ip}:{port}") + print(f"Reader version: {READER_VERSION}") + + # Load tag colors from JSON + TAG_COLORS, DEFAULT_TAG_COLOR = load_tag_colors() + + print("="*60) + print() + + # Track last sent to avoid duplicates + last_sent_key = None + last_sent_time = 0 + DEBOUNCE_TIME = 2.0 # Seconds before allowing same tag again + + # Will be set after reader is created + reader = None + + def apply_tag_color(legacy_key: int): + """Apply color effect to ALL pads based on tag mapping""" + if reader is None: + return + + # Look up color config for this tag + config = TAG_COLORS.get(legacy_key, DEFAULT_TAG_COLOR) + color = config.get('color', COLORS['CYAN']) + effect = config.get('effect', 'solid') + name = config.get('name', '') + + if name: + print(f" Theme: {name}") + + # Apply effect to ALL pads + if effect == 'pulse': + # Slow pulsing fade on all pads (count=255 for continuous) + reader.fade_pad(Pad.ALL, color, speed=15, count=255) + elif effect == 'flash': + # Quick flashing on all pads + reader.flash_pad(Pad.ALL, color, on_time=8, off_time=8, count=255) + else: + # Solid color on all pads + reader.set_pad_color(Pad.ALL, color) + + def on_tag_insert(tag: TagInfo): + nonlocal last_sent_key, last_sent_time + + legacy_key = uid_to_legacy_key(tag.uid) + current_time = time.time() + + print() + print(f"[+] TAG DETECTED on {tag.pad.name} pad") + print(f" UID: {tag.uid_hex}") + print(f" Legacy Key: {legacy_key}") + + # Step 1: ALL LEDs OFF + if reader: + reader.set_pad_color(Pad.ALL, COLORS['OFF']) + + # Debounce - don't resend same tag too quickly + if legacy_key == last_sent_key and (current_time - last_sent_time) < DEBOUNCE_TIME: + print(f" (Skipping server - same tag sent {current_time - last_sent_time:.1f}s ago)") + # Still apply the theme effect even if skipping server + apply_tag_color(legacy_key) + return + + # Send to server + print() + if send_nfc(server_ip, port, legacy_key): + last_sent_key = legacy_key + last_sent_time = current_time + + if reader: + # Step 2: Blue blink CENTER to confirm scan + reader.set_pad_color(Pad.CENTER, COLORS['BLUE']) + time.sleep(0.15) + + # Step 3: ALL OFF + reader.set_pad_color(Pad.ALL, COLORS['OFF']) + time.sleep(0.1) + + # Step 4: Apply theme effect to ALL pads + apply_tag_color(legacy_key) + print() + + def on_tag_remove(tag: TagInfo): + print(f"[-] TAG REMOVED from {tag.pad.name} pad") + # Turn off ALL pad LEDs + if reader: + reader.set_pad_color(Pad.ALL, COLORS['OFF']) + + def on_connect(): + print("✓ Portal connected and initialized") + print() + print("Place LEGO Dimensions discs on the portal...") + print("Press Ctrl+C to exit") + print() + + def on_disconnect(): + print("\n✗ Portal disconnected") + + def on_error(e): + print(f"\n✗ Portal error: {e}") + + # Initialize reader + reader = LegoDimensionsReader() + reader.on_tag_insert = on_tag_insert + reader.on_tag_remove = on_tag_remove + reader.on_connect = on_connect + reader.on_disconnect = on_disconnect + reader.on_error = on_error + + try: + print("Connecting to portal...") + reader.start() + + # Keep running + while True: + time.sleep(0.5) + + except KeyboardInterrupt: + print("\n\nExiting...") + except ConnectionError as e: + print(f"\n✗ {e}") + return False + finally: + reader.disconnect() + print("Portal disconnected. Goodbye!") + + return True + + +def interactive_mode(server_ip, port): + """Interactive mode for manual NFC input (no portal needed)""" + print("\n" + "="*60) + print("INTERACTIVE MODE (Manual Input)") + print("="*60) + print(f"Server: {server_ip}:{port}") + print("Commands:") + print(" - Type NFC ID and press Enter to send") + print(" - Type 'skip' to skip current video") + print(" - Type 'test' to test connection") + print(" - Type 'quit' or 'exit' to exit") + print("="*60) + print() + + while True: + try: + command = input("NFC> ").strip() + + if not command: + continue + + if command.lower() in ['quit', 'exit', 'q']: + print("Exiting...") + break + + if command.lower() == 'skip': + send_skip(server_ip, port) + continue + + if command.lower() == 'test': + test_connection(server_ip, port) + continue + + # Assume it's an NFC ID + send_nfc(server_ip, port, command) + + except KeyboardInterrupt: + print("\nExiting...") + break + except EOFError: + break + + +def main(): + """Main entry point""" + parser = argparse.ArgumentParser( + description='LEGO Dimensions Portal Client - Send NFC commands to video player', + formatter_class=argparse.RawDescriptionHelpFormatter, + epilog=f""" +Default server: {DEFAULT_SERVER}:{DEFAULT_PORT} + +Examples: + # Portal mode (default) - read tags from LEGO Dimensions portal + python dimensions_cli_client.py + + # Test connection first + python dimensions_cli_client.py -t + + # Send single NFC command manually + python dimensions_cli_client.py -n 12345678 + + # Skip current video + python dimensions_cli_client.py --skip + + # Manual interactive mode (no portal) + python dimensions_cli_client.py -i + + # Use different server + python dimensions_cli_client.py -s 192.168.1.100 -p 8547 + """ + ) + + parser.add_argument('-s', '--server', + default=DEFAULT_SERVER, + help=f'Server IP address (default: {DEFAULT_SERVER})') + + parser.add_argument('-p', '--port', + type=int, + default=DEFAULT_PORT, + help=f'Server port (default: {DEFAULT_PORT})') + + parser.add_argument('-n', '--nfc', + help='NFC ID to send (manual, no portal needed)') + + parser.add_argument('-t', '--test', + action='store_true', + help='Test connection to server') + + parser.add_argument('--skip', + action='store_true', + help='Send skip video command') + + parser.add_argument('-i', '--interactive', + action='store_true', + help='Manual interactive mode (no portal)') + + args = parser.parse_args() + + print("="*60) + print("LEGO Dimensions Portal Client") + print("Moonlight Drive-In Theater System") + print("="*60) + + # Test connection + if args.test: + test_connection(args.server, args.port) + return 0 + + # Send skip + if args.skip: + success = send_skip(args.server, args.port) + return 0 if success else 1 + + # Send manual NFC + if args.nfc: + success = send_nfc(args.server, args.port, args.nfc) + return 0 if success else 1 + + # Manual interactive mode + if args.interactive: + interactive_mode(args.server, args.port) + return 0 + + # Default: Portal mode + success = portal_mode(args.server, args.port) + return 0 if success else 1 + + +if __name__ == "__main__": + sys.exit(main())