Files
lego-dimensions-reader/dimensions_cli_client.py
jessikitty bbc4f0878c Add CLI client with refined LED sequence and JSON color config
- Tag detection turns ALL LEDs off immediately
- Server success shows CENTER blue blink (150ms)
- Theme effects (pulse/flash/solid) apply to ALL pads
- External tag_colors.json for customizable per-tag themes
- Debounce logic preserves theme effect even when skipping server
2026-01-26 21:32:54 +11:00

484 lines
14 KiB
Python

#!/usr/bin/env python3
"""
LEGO Dimensions Portal CLI Client
Moonlight Drive-In Theater System
Reads LEGO Dimensions character/vehicle discs from the USB portal
and sends legacy keys to the video player server.
Requirements:
pip install pyusb libusb-package requests
Windows Driver Setup:
Use Zadig to install WinUSB driver for "LEGO READER V2.10"
"""
import requests
import sys
import argparse
import time
import os
# Add current directory to path for lego_dimensions_reader import
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from lego_dimensions_reader import (
LegoDimensionsReader,
TagInfo,
Pad,
COLORS,
generate_password,
__version__ as READER_VERSION
)
# Default server settings
DEFAULT_SERVER = "100.94.163.117"
DEFAULT_PORT = 8547
# Tag colors config file
TAG_COLORS_FILE = "tag_colors.json"
# =============================================================================
# TAG COLOR MAPPINGS (loaded from tag_colors.json)
# =============================================================================
def load_tag_colors(filepath: str = TAG_COLORS_FILE) -> tuple:
"""
Load tag color mappings from JSON file.
Returns (tag_colors_dict, default_config)
"""
import json
default_config = {
'color': COLORS['CYAN'],
'effect': 'solid',
'name': 'Default'
}
tag_colors = {}
# Try to find the config file
search_paths = [
filepath,
os.path.join(os.path.dirname(os.path.abspath(__file__)), filepath),
os.path.join(os.getcwd(), filepath),
]
config_path = None
for path in search_paths:
if os.path.exists(path):
config_path = path
break
if config_path is None:
print(f" Note: {filepath} not found, using default colors")
return tag_colors, default_config
try:
with open(config_path, 'r', encoding='utf-8') as f:
data = json.load(f)
# Load default config
if 'default' in data:
default_config = {
'color': data['default'].get('color', COLORS['CYAN']),
'effect': data['default'].get('effect', 'solid'),
'name': data['default'].get('name', 'Default')
}
# Load tag mappings (convert string keys to int)
if 'tags' in data:
for key, config in data['tags'].items():
try:
legacy_key = int(key)
tag_colors[legacy_key] = {
'color': config.get('color', COLORS['CYAN']),
'effect': config.get('effect', 'solid'),
'name': config.get('name', '')
}
except ValueError:
pass # Skip invalid keys
print(f" Loaded {len(tag_colors)} tag color mappings from {filepath}")
except json.JSONDecodeError as e:
print(f" Warning: Invalid JSON in {filepath}: {e}")
except Exception as e:
print(f" Warning: Could not load {filepath}: {e}")
return tag_colors, default_config
# Will be loaded at runtime
TAG_COLORS = {}
DEFAULT_TAG_COLOR = {
'color': COLORS['CYAN'],
'effect': 'solid',
'name': 'Default'
}
def uid_to_legacy_key(uid: bytes) -> int:
"""
Convert 7-byte UID to legacy integer key format.
Uses bytes 3-6 (the unique portion) as a 32-bit little-endian integer.
"""
if len(uid) >= 7:
return int.from_bytes(uid[3:7], byteorder='little')
elif len(uid) >= 4:
return int.from_bytes(uid[:4], byteorder='little')
else:
return int.from_bytes(uid.ljust(4, b'\x00'), byteorder='little')
def send_nfc(server_ip, port, nfc_id):
"""Send NFC command to server"""
url = f"http://{server_ip}:{port}/api/video/play"
try:
print(f"Sending NFC '{nfc_id}' to {server_ip}:{port}...")
response = requests.post(url, json={'nfc_id': str(nfc_id)}, timeout=5)
if response.status_code == 200:
result = response.json()
if result.get('success'):
print(f"✓ SUCCESS: {result.get('message', 'Command sent')}")
return True
else:
print(f"✗ FAILED: {result.get('message', 'Unknown error')}")
return False
else:
print(f"✗ HTTP Error: {response.status_code}")
return False
except requests.exceptions.ConnectionError:
print(f"✗ Cannot connect to server at {server_ip}:{port}")
return False
except requests.exceptions.Timeout:
print(f"✗ Request timeout - server not responding")
return False
except Exception as e:
print(f"✗ Error: {e}")
return False
def send_skip(server_ip, port):
"""Send skip command"""
url = f"http://{server_ip}:{port}/api/video/skip"
try:
print(f"Sending skip command to {server_ip}:{port}...")
response = requests.post(url, timeout=5)
if response.status_code == 200:
print("✓ Skip command sent")
return True
else:
print(f"✗ HTTP Error: {response.status_code}")
return False
except Exception as e:
print(f"✗ Error: {e}")
return False
def test_connection(server_ip, port):
"""Test connection to server"""
url = f"http://{server_ip}:{port}/api/stats"
try:
print(f"Testing connection to {server_ip}:{port}...")
response = requests.get(url, timeout=3)
if response.status_code == 200:
print("✓ Server is online and responding")
stats = response.json()
print(f" Videos played: {stats.get('videos_played', 'N/A')}")
print(f" Current video: {stats.get('current_video', 'N/A')}")
print(f" Uptime: {stats.get('uptime', 'N/A')}")
return True
else:
print(f"✗ Server error: HTTP {response.status_code}")
return False
except requests.exceptions.ConnectionError:
print(f"✗ Cannot connect to server at {server_ip}:{port}")
print(" Make sure:")
print(" 1. The video player is running")
print(" 2. Web interface is enabled")
print(" 3. IP address and port are correct")
print(" 4. Firewall allows the connection")
return False
except Exception as e:
print(f"✗ Error: {e}")
return False
def portal_mode(server_ip, port):
"""Portal mode - read tags from LEGO Dimensions portal"""
global TAG_COLORS, DEFAULT_TAG_COLOR
print("\n" + "="*60)
print("LEGO DIMENSIONS PORTAL MODE")
print("="*60)
print(f"Server: {server_ip}:{port}")
print(f"Reader version: {READER_VERSION}")
# Load tag colors from JSON
TAG_COLORS, DEFAULT_TAG_COLOR = load_tag_colors()
print("="*60)
print()
# Track last sent to avoid duplicates
last_sent_key = None
last_sent_time = 0
DEBOUNCE_TIME = 2.0 # Seconds before allowing same tag again
# Will be set after reader is created
reader = None
def apply_tag_color(legacy_key: int):
"""Apply color effect to ALL pads based on tag mapping"""
if reader is None:
return
# Look up color config for this tag
config = TAG_COLORS.get(legacy_key, DEFAULT_TAG_COLOR)
color = config.get('color', COLORS['CYAN'])
effect = config.get('effect', 'solid')
name = config.get('name', '')
if name:
print(f" Theme: {name}")
# Apply effect to ALL pads
if effect == 'pulse':
# Slow pulsing fade on all pads (count=255 for continuous)
reader.fade_pad(Pad.ALL, color, speed=15, count=255)
elif effect == 'flash':
# Quick flashing on all pads
reader.flash_pad(Pad.ALL, color, on_time=8, off_time=8, count=255)
else:
# Solid color on all pads
reader.set_pad_color(Pad.ALL, color)
def on_tag_insert(tag: TagInfo):
nonlocal last_sent_key, last_sent_time
legacy_key = uid_to_legacy_key(tag.uid)
current_time = time.time()
print()
print(f"[+] TAG DETECTED on {tag.pad.name} pad")
print(f" UID: {tag.uid_hex}")
print(f" Legacy Key: {legacy_key}")
# Step 1: ALL LEDs OFF
if reader:
reader.set_pad_color(Pad.ALL, COLORS['OFF'])
# Debounce - don't resend same tag too quickly
if legacy_key == last_sent_key and (current_time - last_sent_time) < DEBOUNCE_TIME:
print(f" (Skipping server - same tag sent {current_time - last_sent_time:.1f}s ago)")
# Still apply the theme effect even if skipping server
apply_tag_color(legacy_key)
return
# Send to server
print()
if send_nfc(server_ip, port, legacy_key):
last_sent_key = legacy_key
last_sent_time = current_time
if reader:
# Step 2: Blue blink CENTER to confirm scan
reader.set_pad_color(Pad.CENTER, COLORS['BLUE'])
time.sleep(0.15)
# Step 3: ALL OFF
reader.set_pad_color(Pad.ALL, COLORS['OFF'])
time.sleep(0.1)
# Step 4: Apply theme effect to ALL pads
apply_tag_color(legacy_key)
print()
def on_tag_remove(tag: TagInfo):
print(f"[-] TAG REMOVED from {tag.pad.name} pad")
# Turn off ALL pad LEDs
if reader:
reader.set_pad_color(Pad.ALL, COLORS['OFF'])
def on_connect():
print("✓ Portal connected and initialized")
print()
print("Place LEGO Dimensions discs on the portal...")
print("Press Ctrl+C to exit")
print()
def on_disconnect():
print("\n✗ Portal disconnected")
def on_error(e):
print(f"\n✗ Portal error: {e}")
# Initialize reader
reader = LegoDimensionsReader()
reader.on_tag_insert = on_tag_insert
reader.on_tag_remove = on_tag_remove
reader.on_connect = on_connect
reader.on_disconnect = on_disconnect
reader.on_error = on_error
try:
print("Connecting to portal...")
reader.start()
# Keep running
while True:
time.sleep(0.5)
except KeyboardInterrupt:
print("\n\nExiting...")
except ConnectionError as e:
print(f"\n{e}")
return False
finally:
reader.disconnect()
print("Portal disconnected. Goodbye!")
return True
def interactive_mode(server_ip, port):
"""Interactive mode for manual NFC input (no portal needed)"""
print("\n" + "="*60)
print("INTERACTIVE MODE (Manual Input)")
print("="*60)
print(f"Server: {server_ip}:{port}")
print("Commands:")
print(" - Type NFC ID and press Enter to send")
print(" - Type 'skip' to skip current video")
print(" - Type 'test' to test connection")
print(" - Type 'quit' or 'exit' to exit")
print("="*60)
print()
while True:
try:
command = input("NFC> ").strip()
if not command:
continue
if command.lower() in ['quit', 'exit', 'q']:
print("Exiting...")
break
if command.lower() == 'skip':
send_skip(server_ip, port)
continue
if command.lower() == 'test':
test_connection(server_ip, port)
continue
# Assume it's an NFC ID
send_nfc(server_ip, port, command)
except KeyboardInterrupt:
print("\nExiting...")
break
except EOFError:
break
def main():
"""Main entry point"""
parser = argparse.ArgumentParser(
description='LEGO Dimensions Portal Client - Send NFC commands to video player',
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog=f"""
Default server: {DEFAULT_SERVER}:{DEFAULT_PORT}
Examples:
# Portal mode (default) - read tags from LEGO Dimensions portal
python dimensions_cli_client.py
# Test connection first
python dimensions_cli_client.py -t
# Send single NFC command manually
python dimensions_cli_client.py -n 12345678
# Skip current video
python dimensions_cli_client.py --skip
# Manual interactive mode (no portal)
python dimensions_cli_client.py -i
# Use different server
python dimensions_cli_client.py -s 192.168.1.100 -p 8547
"""
)
parser.add_argument('-s', '--server',
default=DEFAULT_SERVER,
help=f'Server IP address (default: {DEFAULT_SERVER})')
parser.add_argument('-p', '--port',
type=int,
default=DEFAULT_PORT,
help=f'Server port (default: {DEFAULT_PORT})')
parser.add_argument('-n', '--nfc',
help='NFC ID to send (manual, no portal needed)')
parser.add_argument('-t', '--test',
action='store_true',
help='Test connection to server')
parser.add_argument('--skip',
action='store_true',
help='Send skip video command')
parser.add_argument('-i', '--interactive',
action='store_true',
help='Manual interactive mode (no portal)')
args = parser.parse_args()
print("="*60)
print("LEGO Dimensions Portal Client")
print("Moonlight Drive-In Theater System")
print("="*60)
# Test connection
if args.test:
test_connection(args.server, args.port)
return 0
# Send skip
if args.skip:
success = send_skip(args.server, args.port)
return 0 if success else 1
# Send manual NFC
if args.nfc:
success = send_nfc(args.server, args.port, args.nfc)
return 0 if success else 1
# Manual interactive mode
if args.interactive:
interactive_mode(args.server, args.port)
return 0
# Default: Portal mode
success = portal_mode(args.server, args.port)
return 0 if success else 1
if __name__ == "__main__":
sys.exit(main())