Add CLI client with refined LED sequence and JSON color config
- Tag detection turns ALL LEDs off immediately - Server success shows CENTER blue blink (150ms) - Theme effects (pulse/flash/solid) apply to ALL pads - External tag_colors.json for customizable per-tag themes - Debounce logic preserves theme effect even when skipping server
This commit is contained in:
483
dimensions_cli_client.py
Normal file
483
dimensions_cli_client.py
Normal file
@@ -0,0 +1,483 @@
|
|||||||
|
#!/usr/bin/env python3
|
||||||
|
"""
|
||||||
|
LEGO Dimensions Portal CLI Client
|
||||||
|
Moonlight Drive-In Theater System
|
||||||
|
|
||||||
|
Reads LEGO Dimensions character/vehicle discs from the USB portal
|
||||||
|
and sends legacy keys to the video player server.
|
||||||
|
|
||||||
|
Requirements:
|
||||||
|
pip install pyusb libusb-package requests
|
||||||
|
|
||||||
|
Windows Driver Setup:
|
||||||
|
Use Zadig to install WinUSB driver for "LEGO READER V2.10"
|
||||||
|
"""
|
||||||
|
|
||||||
|
import requests
|
||||||
|
import sys
|
||||||
|
import argparse
|
||||||
|
import time
|
||||||
|
import os
|
||||||
|
|
||||||
|
# Add current directory to path for lego_dimensions_reader import
|
||||||
|
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
|
||||||
|
|
||||||
|
from lego_dimensions_reader import (
|
||||||
|
LegoDimensionsReader,
|
||||||
|
TagInfo,
|
||||||
|
Pad,
|
||||||
|
COLORS,
|
||||||
|
generate_password,
|
||||||
|
__version__ as READER_VERSION
|
||||||
|
)
|
||||||
|
|
||||||
|
# Default server settings
|
||||||
|
DEFAULT_SERVER = "100.94.163.117"
|
||||||
|
DEFAULT_PORT = 8547
|
||||||
|
|
||||||
|
# Tag colors config file
|
||||||
|
TAG_COLORS_FILE = "tag_colors.json"
|
||||||
|
|
||||||
|
# =============================================================================
|
||||||
|
# TAG COLOR MAPPINGS (loaded from tag_colors.json)
|
||||||
|
# =============================================================================
|
||||||
|
|
||||||
|
def load_tag_colors(filepath: str = TAG_COLORS_FILE) -> tuple:
|
||||||
|
"""
|
||||||
|
Load tag color mappings from JSON file.
|
||||||
|
Returns (tag_colors_dict, default_config)
|
||||||
|
"""
|
||||||
|
import json
|
||||||
|
|
||||||
|
default_config = {
|
||||||
|
'color': COLORS['CYAN'],
|
||||||
|
'effect': 'solid',
|
||||||
|
'name': 'Default'
|
||||||
|
}
|
||||||
|
|
||||||
|
tag_colors = {}
|
||||||
|
|
||||||
|
# Try to find the config file
|
||||||
|
search_paths = [
|
||||||
|
filepath,
|
||||||
|
os.path.join(os.path.dirname(os.path.abspath(__file__)), filepath),
|
||||||
|
os.path.join(os.getcwd(), filepath),
|
||||||
|
]
|
||||||
|
|
||||||
|
config_path = None
|
||||||
|
for path in search_paths:
|
||||||
|
if os.path.exists(path):
|
||||||
|
config_path = path
|
||||||
|
break
|
||||||
|
|
||||||
|
if config_path is None:
|
||||||
|
print(f" Note: {filepath} not found, using default colors")
|
||||||
|
return tag_colors, default_config
|
||||||
|
|
||||||
|
try:
|
||||||
|
with open(config_path, 'r', encoding='utf-8') as f:
|
||||||
|
data = json.load(f)
|
||||||
|
|
||||||
|
# Load default config
|
||||||
|
if 'default' in data:
|
||||||
|
default_config = {
|
||||||
|
'color': data['default'].get('color', COLORS['CYAN']),
|
||||||
|
'effect': data['default'].get('effect', 'solid'),
|
||||||
|
'name': data['default'].get('name', 'Default')
|
||||||
|
}
|
||||||
|
|
||||||
|
# Load tag mappings (convert string keys to int)
|
||||||
|
if 'tags' in data:
|
||||||
|
for key, config in data['tags'].items():
|
||||||
|
try:
|
||||||
|
legacy_key = int(key)
|
||||||
|
tag_colors[legacy_key] = {
|
||||||
|
'color': config.get('color', COLORS['CYAN']),
|
||||||
|
'effect': config.get('effect', 'solid'),
|
||||||
|
'name': config.get('name', '')
|
||||||
|
}
|
||||||
|
except ValueError:
|
||||||
|
pass # Skip invalid keys
|
||||||
|
|
||||||
|
print(f" Loaded {len(tag_colors)} tag color mappings from {filepath}")
|
||||||
|
|
||||||
|
except json.JSONDecodeError as e:
|
||||||
|
print(f" Warning: Invalid JSON in {filepath}: {e}")
|
||||||
|
except Exception as e:
|
||||||
|
print(f" Warning: Could not load {filepath}: {e}")
|
||||||
|
|
||||||
|
return tag_colors, default_config
|
||||||
|
|
||||||
|
|
||||||
|
# Will be loaded at runtime
|
||||||
|
TAG_COLORS = {}
|
||||||
|
DEFAULT_TAG_COLOR = {
|
||||||
|
'color': COLORS['CYAN'],
|
||||||
|
'effect': 'solid',
|
||||||
|
'name': 'Default'
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
def uid_to_legacy_key(uid: bytes) -> int:
|
||||||
|
"""
|
||||||
|
Convert 7-byte UID to legacy integer key format.
|
||||||
|
Uses bytes 3-6 (the unique portion) as a 32-bit little-endian integer.
|
||||||
|
"""
|
||||||
|
if len(uid) >= 7:
|
||||||
|
return int.from_bytes(uid[3:7], byteorder='little')
|
||||||
|
elif len(uid) >= 4:
|
||||||
|
return int.from_bytes(uid[:4], byteorder='little')
|
||||||
|
else:
|
||||||
|
return int.from_bytes(uid.ljust(4, b'\x00'), byteorder='little')
|
||||||
|
|
||||||
|
|
||||||
|
def send_nfc(server_ip, port, nfc_id):
|
||||||
|
"""Send NFC command to server"""
|
||||||
|
url = f"http://{server_ip}:{port}/api/video/play"
|
||||||
|
|
||||||
|
try:
|
||||||
|
print(f"Sending NFC '{nfc_id}' to {server_ip}:{port}...")
|
||||||
|
response = requests.post(url, json={'nfc_id': str(nfc_id)}, timeout=5)
|
||||||
|
|
||||||
|
if response.status_code == 200:
|
||||||
|
result = response.json()
|
||||||
|
if result.get('success'):
|
||||||
|
print(f"✓ SUCCESS: {result.get('message', 'Command sent')}")
|
||||||
|
return True
|
||||||
|
else:
|
||||||
|
print(f"✗ FAILED: {result.get('message', 'Unknown error')}")
|
||||||
|
return False
|
||||||
|
else:
|
||||||
|
print(f"✗ HTTP Error: {response.status_code}")
|
||||||
|
return False
|
||||||
|
|
||||||
|
except requests.exceptions.ConnectionError:
|
||||||
|
print(f"✗ Cannot connect to server at {server_ip}:{port}")
|
||||||
|
return False
|
||||||
|
except requests.exceptions.Timeout:
|
||||||
|
print(f"✗ Request timeout - server not responding")
|
||||||
|
return False
|
||||||
|
except Exception as e:
|
||||||
|
print(f"✗ Error: {e}")
|
||||||
|
return False
|
||||||
|
|
||||||
|
|
||||||
|
def send_skip(server_ip, port):
|
||||||
|
"""Send skip command"""
|
||||||
|
url = f"http://{server_ip}:{port}/api/video/skip"
|
||||||
|
|
||||||
|
try:
|
||||||
|
print(f"Sending skip command to {server_ip}:{port}...")
|
||||||
|
response = requests.post(url, timeout=5)
|
||||||
|
|
||||||
|
if response.status_code == 200:
|
||||||
|
print("✓ Skip command sent")
|
||||||
|
return True
|
||||||
|
else:
|
||||||
|
print(f"✗ HTTP Error: {response.status_code}")
|
||||||
|
return False
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
print(f"✗ Error: {e}")
|
||||||
|
return False
|
||||||
|
|
||||||
|
|
||||||
|
def test_connection(server_ip, port):
|
||||||
|
"""Test connection to server"""
|
||||||
|
url = f"http://{server_ip}:{port}/api/stats"
|
||||||
|
|
||||||
|
try:
|
||||||
|
print(f"Testing connection to {server_ip}:{port}...")
|
||||||
|
response = requests.get(url, timeout=3)
|
||||||
|
|
||||||
|
if response.status_code == 200:
|
||||||
|
print("✓ Server is online and responding")
|
||||||
|
stats = response.json()
|
||||||
|
print(f" Videos played: {stats.get('videos_played', 'N/A')}")
|
||||||
|
print(f" Current video: {stats.get('current_video', 'N/A')}")
|
||||||
|
print(f" Uptime: {stats.get('uptime', 'N/A')}")
|
||||||
|
return True
|
||||||
|
else:
|
||||||
|
print(f"✗ Server error: HTTP {response.status_code}")
|
||||||
|
return False
|
||||||
|
|
||||||
|
except requests.exceptions.ConnectionError:
|
||||||
|
print(f"✗ Cannot connect to server at {server_ip}:{port}")
|
||||||
|
print(" Make sure:")
|
||||||
|
print(" 1. The video player is running")
|
||||||
|
print(" 2. Web interface is enabled")
|
||||||
|
print(" 3. IP address and port are correct")
|
||||||
|
print(" 4. Firewall allows the connection")
|
||||||
|
return False
|
||||||
|
except Exception as e:
|
||||||
|
print(f"✗ Error: {e}")
|
||||||
|
return False
|
||||||
|
|
||||||
|
|
||||||
|
def portal_mode(server_ip, port):
|
||||||
|
"""Portal mode - read tags from LEGO Dimensions portal"""
|
||||||
|
global TAG_COLORS, DEFAULT_TAG_COLOR
|
||||||
|
|
||||||
|
print("\n" + "="*60)
|
||||||
|
print("LEGO DIMENSIONS PORTAL MODE")
|
||||||
|
print("="*60)
|
||||||
|
print(f"Server: {server_ip}:{port}")
|
||||||
|
print(f"Reader version: {READER_VERSION}")
|
||||||
|
|
||||||
|
# Load tag colors from JSON
|
||||||
|
TAG_COLORS, DEFAULT_TAG_COLOR = load_tag_colors()
|
||||||
|
|
||||||
|
print("="*60)
|
||||||
|
print()
|
||||||
|
|
||||||
|
# Track last sent to avoid duplicates
|
||||||
|
last_sent_key = None
|
||||||
|
last_sent_time = 0
|
||||||
|
DEBOUNCE_TIME = 2.0 # Seconds before allowing same tag again
|
||||||
|
|
||||||
|
# Will be set after reader is created
|
||||||
|
reader = None
|
||||||
|
|
||||||
|
def apply_tag_color(legacy_key: int):
|
||||||
|
"""Apply color effect to ALL pads based on tag mapping"""
|
||||||
|
if reader is None:
|
||||||
|
return
|
||||||
|
|
||||||
|
# Look up color config for this tag
|
||||||
|
config = TAG_COLORS.get(legacy_key, DEFAULT_TAG_COLOR)
|
||||||
|
color = config.get('color', COLORS['CYAN'])
|
||||||
|
effect = config.get('effect', 'solid')
|
||||||
|
name = config.get('name', '')
|
||||||
|
|
||||||
|
if name:
|
||||||
|
print(f" Theme: {name}")
|
||||||
|
|
||||||
|
# Apply effect to ALL pads
|
||||||
|
if effect == 'pulse':
|
||||||
|
# Slow pulsing fade on all pads (count=255 for continuous)
|
||||||
|
reader.fade_pad(Pad.ALL, color, speed=15, count=255)
|
||||||
|
elif effect == 'flash':
|
||||||
|
# Quick flashing on all pads
|
||||||
|
reader.flash_pad(Pad.ALL, color, on_time=8, off_time=8, count=255)
|
||||||
|
else:
|
||||||
|
# Solid color on all pads
|
||||||
|
reader.set_pad_color(Pad.ALL, color)
|
||||||
|
|
||||||
|
def on_tag_insert(tag: TagInfo):
|
||||||
|
nonlocal last_sent_key, last_sent_time
|
||||||
|
|
||||||
|
legacy_key = uid_to_legacy_key(tag.uid)
|
||||||
|
current_time = time.time()
|
||||||
|
|
||||||
|
print()
|
||||||
|
print(f"[+] TAG DETECTED on {tag.pad.name} pad")
|
||||||
|
print(f" UID: {tag.uid_hex}")
|
||||||
|
print(f" Legacy Key: {legacy_key}")
|
||||||
|
|
||||||
|
# Step 1: ALL LEDs OFF
|
||||||
|
if reader:
|
||||||
|
reader.set_pad_color(Pad.ALL, COLORS['OFF'])
|
||||||
|
|
||||||
|
# Debounce - don't resend same tag too quickly
|
||||||
|
if legacy_key == last_sent_key and (current_time - last_sent_time) < DEBOUNCE_TIME:
|
||||||
|
print(f" (Skipping server - same tag sent {current_time - last_sent_time:.1f}s ago)")
|
||||||
|
# Still apply the theme effect even if skipping server
|
||||||
|
apply_tag_color(legacy_key)
|
||||||
|
return
|
||||||
|
|
||||||
|
# Send to server
|
||||||
|
print()
|
||||||
|
if send_nfc(server_ip, port, legacy_key):
|
||||||
|
last_sent_key = legacy_key
|
||||||
|
last_sent_time = current_time
|
||||||
|
|
||||||
|
if reader:
|
||||||
|
# Step 2: Blue blink CENTER to confirm scan
|
||||||
|
reader.set_pad_color(Pad.CENTER, COLORS['BLUE'])
|
||||||
|
time.sleep(0.15)
|
||||||
|
|
||||||
|
# Step 3: ALL OFF
|
||||||
|
reader.set_pad_color(Pad.ALL, COLORS['OFF'])
|
||||||
|
time.sleep(0.1)
|
||||||
|
|
||||||
|
# Step 4: Apply theme effect to ALL pads
|
||||||
|
apply_tag_color(legacy_key)
|
||||||
|
print()
|
||||||
|
|
||||||
|
def on_tag_remove(tag: TagInfo):
|
||||||
|
print(f"[-] TAG REMOVED from {tag.pad.name} pad")
|
||||||
|
# Turn off ALL pad LEDs
|
||||||
|
if reader:
|
||||||
|
reader.set_pad_color(Pad.ALL, COLORS['OFF'])
|
||||||
|
|
||||||
|
def on_connect():
|
||||||
|
print("✓ Portal connected and initialized")
|
||||||
|
print()
|
||||||
|
print("Place LEGO Dimensions discs on the portal...")
|
||||||
|
print("Press Ctrl+C to exit")
|
||||||
|
print()
|
||||||
|
|
||||||
|
def on_disconnect():
|
||||||
|
print("\n✗ Portal disconnected")
|
||||||
|
|
||||||
|
def on_error(e):
|
||||||
|
print(f"\n✗ Portal error: {e}")
|
||||||
|
|
||||||
|
# Initialize reader
|
||||||
|
reader = LegoDimensionsReader()
|
||||||
|
reader.on_tag_insert = on_tag_insert
|
||||||
|
reader.on_tag_remove = on_tag_remove
|
||||||
|
reader.on_connect = on_connect
|
||||||
|
reader.on_disconnect = on_disconnect
|
||||||
|
reader.on_error = on_error
|
||||||
|
|
||||||
|
try:
|
||||||
|
print("Connecting to portal...")
|
||||||
|
reader.start()
|
||||||
|
|
||||||
|
# Keep running
|
||||||
|
while True:
|
||||||
|
time.sleep(0.5)
|
||||||
|
|
||||||
|
except KeyboardInterrupt:
|
||||||
|
print("\n\nExiting...")
|
||||||
|
except ConnectionError as e:
|
||||||
|
print(f"\n✗ {e}")
|
||||||
|
return False
|
||||||
|
finally:
|
||||||
|
reader.disconnect()
|
||||||
|
print("Portal disconnected. Goodbye!")
|
||||||
|
|
||||||
|
return True
|
||||||
|
|
||||||
|
|
||||||
|
def interactive_mode(server_ip, port):
|
||||||
|
"""Interactive mode for manual NFC input (no portal needed)"""
|
||||||
|
print("\n" + "="*60)
|
||||||
|
print("INTERACTIVE MODE (Manual Input)")
|
||||||
|
print("="*60)
|
||||||
|
print(f"Server: {server_ip}:{port}")
|
||||||
|
print("Commands:")
|
||||||
|
print(" - Type NFC ID and press Enter to send")
|
||||||
|
print(" - Type 'skip' to skip current video")
|
||||||
|
print(" - Type 'test' to test connection")
|
||||||
|
print(" - Type 'quit' or 'exit' to exit")
|
||||||
|
print("="*60)
|
||||||
|
print()
|
||||||
|
|
||||||
|
while True:
|
||||||
|
try:
|
||||||
|
command = input("NFC> ").strip()
|
||||||
|
|
||||||
|
if not command:
|
||||||
|
continue
|
||||||
|
|
||||||
|
if command.lower() in ['quit', 'exit', 'q']:
|
||||||
|
print("Exiting...")
|
||||||
|
break
|
||||||
|
|
||||||
|
if command.lower() == 'skip':
|
||||||
|
send_skip(server_ip, port)
|
||||||
|
continue
|
||||||
|
|
||||||
|
if command.lower() == 'test':
|
||||||
|
test_connection(server_ip, port)
|
||||||
|
continue
|
||||||
|
|
||||||
|
# Assume it's an NFC ID
|
||||||
|
send_nfc(server_ip, port, command)
|
||||||
|
|
||||||
|
except KeyboardInterrupt:
|
||||||
|
print("\nExiting...")
|
||||||
|
break
|
||||||
|
except EOFError:
|
||||||
|
break
|
||||||
|
|
||||||
|
|
||||||
|
def main():
|
||||||
|
"""Main entry point"""
|
||||||
|
parser = argparse.ArgumentParser(
|
||||||
|
description='LEGO Dimensions Portal Client - Send NFC commands to video player',
|
||||||
|
formatter_class=argparse.RawDescriptionHelpFormatter,
|
||||||
|
epilog=f"""
|
||||||
|
Default server: {DEFAULT_SERVER}:{DEFAULT_PORT}
|
||||||
|
|
||||||
|
Examples:
|
||||||
|
# Portal mode (default) - read tags from LEGO Dimensions portal
|
||||||
|
python dimensions_cli_client.py
|
||||||
|
|
||||||
|
# Test connection first
|
||||||
|
python dimensions_cli_client.py -t
|
||||||
|
|
||||||
|
# Send single NFC command manually
|
||||||
|
python dimensions_cli_client.py -n 12345678
|
||||||
|
|
||||||
|
# Skip current video
|
||||||
|
python dimensions_cli_client.py --skip
|
||||||
|
|
||||||
|
# Manual interactive mode (no portal)
|
||||||
|
python dimensions_cli_client.py -i
|
||||||
|
|
||||||
|
# Use different server
|
||||||
|
python dimensions_cli_client.py -s 192.168.1.100 -p 8547
|
||||||
|
"""
|
||||||
|
)
|
||||||
|
|
||||||
|
parser.add_argument('-s', '--server',
|
||||||
|
default=DEFAULT_SERVER,
|
||||||
|
help=f'Server IP address (default: {DEFAULT_SERVER})')
|
||||||
|
|
||||||
|
parser.add_argument('-p', '--port',
|
||||||
|
type=int,
|
||||||
|
default=DEFAULT_PORT,
|
||||||
|
help=f'Server port (default: {DEFAULT_PORT})')
|
||||||
|
|
||||||
|
parser.add_argument('-n', '--nfc',
|
||||||
|
help='NFC ID to send (manual, no portal needed)')
|
||||||
|
|
||||||
|
parser.add_argument('-t', '--test',
|
||||||
|
action='store_true',
|
||||||
|
help='Test connection to server')
|
||||||
|
|
||||||
|
parser.add_argument('--skip',
|
||||||
|
action='store_true',
|
||||||
|
help='Send skip video command')
|
||||||
|
|
||||||
|
parser.add_argument('-i', '--interactive',
|
||||||
|
action='store_true',
|
||||||
|
help='Manual interactive mode (no portal)')
|
||||||
|
|
||||||
|
args = parser.parse_args()
|
||||||
|
|
||||||
|
print("="*60)
|
||||||
|
print("LEGO Dimensions Portal Client")
|
||||||
|
print("Moonlight Drive-In Theater System")
|
||||||
|
print("="*60)
|
||||||
|
|
||||||
|
# Test connection
|
||||||
|
if args.test:
|
||||||
|
test_connection(args.server, args.port)
|
||||||
|
return 0
|
||||||
|
|
||||||
|
# Send skip
|
||||||
|
if args.skip:
|
||||||
|
success = send_skip(args.server, args.port)
|
||||||
|
return 0 if success else 1
|
||||||
|
|
||||||
|
# Send manual NFC
|
||||||
|
if args.nfc:
|
||||||
|
success = send_nfc(args.server, args.port, args.nfc)
|
||||||
|
return 0 if success else 1
|
||||||
|
|
||||||
|
# Manual interactive mode
|
||||||
|
if args.interactive:
|
||||||
|
interactive_mode(args.server, args.port)
|
||||||
|
return 0
|
||||||
|
|
||||||
|
# Default: Portal mode
|
||||||
|
success = portal_mode(args.server, args.port)
|
||||||
|
return 0 if success else 1
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
sys.exit(main())
|
||||||
Reference in New Issue
Block a user