Files
lego-dimensions-reader/dimensions_cli_client.py
T
jessikitty ac190d49c1 fix: Don't kill cycle effect when removing non-cycle tags
Previously stop_cycle_effect() was called on every tag removal,
which froze the cycle when a non-cycle tag was removed. Now the
removal logic has three distinct paths:

1. Cycle tag removed → stop cycle, restore other pads
2. Non-cycle tag removed while cycle still active → do nothing,
   let the cycle thread keep controlling all pads
3. Normal removal (no cycle) → just turn off the pad
2026-04-06 17:00:30 +10:00

544 lines
17 KiB
Python

#!/usr/bin/env python3
"""
LEGO Dimensions Portal CLI Client
Moonlight Drive-In Theater System
Reads LEGO Dimensions character/vehicle discs from the USB portal
and sends legacy keys to the video player server.
Requirements:
pip install pyusb libusb-package requests
Windows Driver Setup:
Use Zadig to install WinUSB driver for "LEGO READER V2.10"
"""
import requests
import sys
import argparse
import time
import os
# Add current directory to path for lego_dimensions_reader import
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from lego_dimensions_reader import (
LegoDimensionsReader,
TagInfo,
Pad,
COLORS,
generate_password,
__version__ as READER_VERSION
)
# Default server settings
# These are the argparse defaults for -s/--server and -p/--port in main().
DEFAULT_SERVER = "100.94.163.117"
DEFAULT_PORT = 8547
# Tag colors config file
# Default `filepath` argument of load_tag_colors(); that function tries the
# name as given, then next to this script, then in the working directory.
TAG_COLORS_FILE = "tag_colors.json"
# =============================================================================
# TAG COLOR MAPPINGS (loaded from tag_colors.json)
# =============================================================================
def load_tag_colors(filepath: str = TAG_COLORS_FILE) -> tuple:
    """
    Load tag color mappings from a JSON file.

    The file is looked up as given, then next to this script, then in the
    current working directory; the first path that exists is used.

    Expected layout: a JSON object with an optional "default" object and an
    optional "tags" object whose keys are integer strings (legacy keys).
    "default" and every tag entry may carry "color", "effect", "speed" and
    "name"; missing fields fall back to CYAN / 'solid' / 1.0 and a name of
    'Default' (for the default) or '' (for a tag).

    Malformed pieces are skipped individually with a warning, so one bad
    entry no longer prevents the remaining tags from loading. A missing,
    unreadable or syntactically invalid file only prints a message.

    Returns (tag_colors_dict, default_config)
    """
    import json

    def _normalize(raw: dict, fallback_name: str) -> dict:
        """Return a complete config dict, filling gaps in *raw* with fallbacks."""
        return {
            'color': raw.get('color', COLORS['CYAN']),
            'effect': raw.get('effect', 'solid'),
            'speed': raw.get('speed', 1.0),
            'name': raw.get('name', fallback_name),
        }

    default_config = _normalize({}, 'Default')
    tag_colors = {}

    # Try to find the config file
    script_dir = os.path.dirname(os.path.abspath(__file__))
    search_paths = [
        filepath,
        os.path.join(script_dir, filepath),
        os.path.join(os.getcwd(), filepath),
    ]
    config_path = next((p for p in search_paths if os.path.exists(p)), None)
    if config_path is None:
        print(f" Note: {filepath} not found, using default colors")
        return tag_colors, default_config

    try:
        with open(config_path, 'r', encoding='utf-8') as f:
            data = json.load(f)

        if not isinstance(data, dict):
            print(f" Warning: Could not load {filepath}: top-level JSON value is not an object")
            return tag_colors, default_config

        # Load default config
        if 'default' in data:
            if isinstance(data['default'], dict):
                default_config = _normalize(data['default'], 'Default')
            else:
                print(f" Warning: Ignoring 'default' in {filepath}: not an object")

        # Load tag mappings (convert string keys to int)
        if 'tags' in data:
            if isinstance(data['tags'], dict):
                for key, config in data['tags'].items():
                    try:
                        legacy_key = int(key)
                    except ValueError:
                        print(f" Warning: Skipping tag '{key}' in {filepath}: key is not an integer")
                        continue
                    if not isinstance(config, dict):
                        print(f" Warning: Skipping tag '{key}' in {filepath}: entry is not an object")
                        continue
                    tag_colors[legacy_key] = _normalize(config, '')
            else:
                print(f" Warning: Ignoring 'tags' in {filepath}: not an object")

        print(f" Loaded {len(tag_colors)} tag color mappings from {filepath}")
    except json.JSONDecodeError as e:
        print(f" Warning: Invalid JSON in {filepath}: {e}")
    except Exception as e:
        # Best-effort loader: any other failure (e.g. unreadable file) must
        # not stop the client, so report it and fall through to the return.
        print(f" Warning: Could not load {filepath}: {e}")
    return tag_colors, default_config
# Module-level color state. These are placeholders: portal_mode() rebinds
# both names with the result of load_tag_colors() before any tag is handled.
TAG_COLORS = {}
DEFAULT_TAG_COLOR = {
    'color': COLORS['CYAN'],
    'effect': 'solid',
    'speed': 1.0,
    'name': 'Default',
}
def _get_tag_effect(legacy_key: int) -> str:
    """Return the effect name configured for *legacy_key*.

    Falls back to DEFAULT_TAG_COLOR when the key has no entry in TAG_COLORS,
    and to 'solid' when the chosen config has no 'effect' field.
    """
    if legacy_key in TAG_COLORS:
        entry = TAG_COLORS[legacy_key]
    else:
        entry = DEFAULT_TAG_COLOR
    return entry.get('effect', 'solid')
def uid_to_legacy_key(uid: bytes) -> int:
    """
    Derive the legacy integer key from a tag UID.

    For UIDs of 7 or more bytes the key is bytes 3-6 read as a 32-bit
    little-endian integer. UIDs of 4-6 bytes use their first four bytes;
    shorter UIDs are zero-padded on the right to four bytes first.
    """
    size = len(uid)
    if size >= 7:
        key_bytes = uid[3:7]
    elif size >= 4:
        key_bytes = uid[:4]
    else:
        key_bytes = uid.ljust(4, b'\x00')
    return int.from_bytes(key_bytes, byteorder='little')
def send_nfc(server_ip, port, nfc_id):
    """POST an NFC id to the server's /api/video/play endpoint.

    The id is sent as JSON ({'nfc_id': str(nfc_id)}) with a 5 second timeout.
    Progress and the outcome are printed.

    Returns True only when the server answers HTTP 200 and the JSON body's
    'success' field is truthy; every other outcome, including connection
    errors, timeouts and any other exception, returns False.
    """
    endpoint = f"http://{server_ip}:{port}/api/video/play"
    try:
        print(f"Sending NFC '{nfc_id}' to {server_ip}:{port}...")
        reply = requests.post(endpoint, json={'nfc_id': str(nfc_id)}, timeout=5)
        if reply.status_code != 200:
            print(f"✗ HTTP Error: {reply.status_code}")
            return False
        payload = reply.json()
        if not payload.get('success'):
            print(f"✗ FAILED: {payload.get('message', 'Unknown error')}")
            return False
        print(f"✓ SUCCESS: {payload.get('message', 'Command sent')}")
        return True
    except requests.exceptions.ConnectionError:
        print(f"✗ Cannot connect to server at {server_ip}:{port}")
        return False
    except requests.exceptions.Timeout:
        print(f"✗ Request timeout - server not responding")
        return False
    except Exception as e:
        print(f"✗ Error: {e}")
        return False
def send_skip(server_ip, port):
    """POST to the server's /api/video/skip endpoint (5 second timeout).

    Returns True when the server answers HTTP 200, False for any other
    status or when any exception is raised. The outcome is printed.
    """
    endpoint = f"http://{server_ip}:{port}/api/video/skip"
    try:
        print(f"Sending skip command to {server_ip}:{port}...")
        reply = requests.post(endpoint, timeout=5)
        if reply.status_code != 200:
            print(f"✗ HTTP Error: {reply.status_code}")
            return False
        print("✓ Skip command sent")
        return True
    except Exception as e:
        print(f"✗ Error: {e}")
        return False
def test_connection(server_ip, port):
    """GET the server's /api/stats endpoint (3 second timeout) and report.

    On HTTP 200 the 'videos_played', 'current_video' and 'uptime' fields of
    the JSON body are printed ('N/A' when absent) and True is returned.
    Any other status, a connection error (which also prints a short
    troubleshooting checklist) or any other exception returns False.
    """
    endpoint = f"http://{server_ip}:{port}/api/stats"
    try:
        print(f"Testing connection to {server_ip}:{port}...")
        reply = requests.get(endpoint, timeout=3)
        if reply.status_code != 200:
            print(f"✗ Server error: HTTP {reply.status_code}")
            return False
        print("✓ Server is online and responding")
        stats = reply.json()
        print(f" Videos played: {stats.get('videos_played', 'N/A')}")
        print(f" Current video: {stats.get('current_video', 'N/A')}")
        print(f" Uptime: {stats.get('uptime', 'N/A')}")
        return True
    except requests.exceptions.ConnectionError:
        print(f"✗ Cannot connect to server at {server_ip}:{port}")
        checklist = (
            " Make sure:",
            " 1. The video player is running",
            " 2. Web interface is enabled",
            " 3. IP address and port are correct",
            " 4. Firewall allows the connection",
        )
        for hint in checklist:
            print(hint)
        return False
    except Exception as e:
        print(f"✗ Error: {e}")
        return False
def portal_mode(server_ip, port):
    """Portal mode - read tags from the LEGO Dimensions portal.

    Reloads the tag color configuration via load_tag_colors(), creates a
    LegoDimensionsReader, wires up its callbacks and then idles until
    Ctrl+C. Each inserted tag is converted to a legacy key, sent to the
    server with send_nfc() (the same key is not resent within
    DEBOUNCE_TIME seconds of its last successful send) and the pad is lit
    according to the tag's configured theme.

    Returns True after a Ctrl+C exit, False when a ConnectionError is
    raised while starting or running. The reader is disconnected either way.
    """
    global TAG_COLORS, DEFAULT_TAG_COLOR

    rule = "="*60
    print("\n" + rule)
    print("LEGO DIMENSIONS PORTAL MODE")
    print(rule)
    print(f"Server: {server_ip}:{port}")
    print(f"Reader version: {READER_VERSION}")
    # Refresh the module-level color tables from tag_colors.json
    TAG_COLORS, DEFAULT_TAG_COLOR = load_tag_colors()
    print(rule)
    print()

    # Debounce state: last key the server accepted, and when
    last_sent_key = None
    last_sent_time = 0
    DEBOUNCE_TIME = 2.0  # Seconds before allowing same tag again

    # pad_num -> legacy_key for every pad that currently holds a tag
    active_pads = {}

    # Assigned below, once the callbacks that close over it are defined
    reader = None

    def apply_tag_color(pad: Pad, legacy_key: int):
        """Light *pad* with the theme configured for *legacy_key*."""
        if reader is None:
            return
        theme = TAG_COLORS.get(legacy_key, DEFAULT_TAG_COLOR)
        label = theme.get('name', '')
        if label:
            print(f" Theme: {label}")
        # Per the original comment, the 'cycle' effect drives all pads itself
        reader.apply_effect(
            pad,
            theme.get('color', COLORS['CYAN']),
            theme.get('effect', 'solid'),
            theme.get('speed', 1.0),
        )

    def on_tag_insert(tag: TagInfo):
        """Handle a tag arriving on a pad: report, forward, then theme the pad."""
        nonlocal last_sent_key, last_sent_time
        legacy_key = uid_to_legacy_key(tag.uid)
        now = time.time()
        print()
        print(f"[+] TAG DETECTED on {tag.pad.name} pad")
        print(f" UID: {tag.uid_hex}")
        print(f" Legacy Key: {legacy_key}")

        active_pads[tag.pad.value] = legacy_key

        # Clean slate on this pad only, before any new effect
        if reader:
            reader.set_pad_color(tag.pad, COLORS['OFF'])

        # Debounce: same key again too soon -> skip the server, keep the theme
        elapsed = now - last_sent_time
        if legacy_key == last_sent_key and elapsed < DEBOUNCE_TIME:
            print(f" (Skipping server - same tag sent {elapsed:.1f}s ago)")
            apply_tag_color(tag.pad, legacy_key)
            return

        print()
        if send_nfc(server_ip, port, legacy_key):
            last_sent_key = legacy_key
            last_sent_time = now

        # NOTE(review): the flattened source does not show whether this
        # blink/theme step was nested under the successful-send branch
        # above; it is reconstructed here as running regardless of the
        # server result - confirm against the original file.
        if reader:
            # Brief blue blink on this pad to confirm the scan
            reader.set_pad_color(tag.pad, COLORS['BLUE'])
            time.sleep(0.15)
            reader.set_pad_color(tag.pad, COLORS['OFF'])
            time.sleep(0.1)
            # Then the tag's own theme, on this pad only
            apply_tag_color(tag.pad, legacy_key)
        print()

    def on_tag_remove(tag: TagInfo):
        """Handle a tag leaving a pad without disturbing a still-active cycle."""
        print(f"[-] TAG REMOVED from {tag.pad.name} pad")

        # Drop the pad from tracking, remembering what was on it
        removed_key = active_pads.pop(tag.pad.value, None)
        removed_is_cycle = (
            removed_key is not None
            and _get_tag_effect(removed_key) == 'cycle'
        )
        # Does any tag still on the portal use the cycle effect?
        remaining_cycle = any(
            _get_tag_effect(key) == 'cycle' for key in active_pads.values()
        )

        if not reader:
            return

        if removed_is_cycle:
            # The cycle tag itself left: stop the cycle, darken its pad
            reader.stop_cycle_effect()
            reader.stop_effect(tag.pad)
            reader.set_pad_color(tag.pad, COLORS['OFF'])
            # ...and put every other pad back into its proper state
            for other in (Pad.CENTER, Pad.LEFT, Pad.RIGHT):
                if other == tag.pad:
                    continue
                if other.value in active_pads:
                    apply_tag_color(other, active_pads[other.value])
                else:
                    reader.stop_effect(other)
                    reader.set_pad_color(other, COLORS['OFF'])
            return

        if remaining_cycle:
            # Another tag's cycle is still running and controls all pads;
            # tracking is already updated, so leave the lights alone.
            return

        # Plain removal with no cycle involved
        reader.stop_effect(tag.pad)
        reader.set_pad_color(tag.pad, COLORS['OFF'])

    def on_connect():
        print("✓ Portal connected and initialized")
        print()
        print("Place LEGO Dimensions discs on the portal...")
        print("Press Ctrl+C to exit")
        print()

    def on_disconnect():
        print("\n✗ Portal disconnected")

    def on_error(e):
        print(f"\n✗ Portal error: {e}")

    # Create the reader and attach the handlers
    reader = LegoDimensionsReader()
    reader.on_tag_insert = on_tag_insert
    reader.on_tag_remove = on_tag_remove
    reader.on_connect = on_connect
    reader.on_disconnect = on_disconnect
    reader.on_error = on_error

    try:
        print("Connecting to portal...")
        reader.start()
        # Idle here; tag events arrive through the callbacks set above
        while True:
            time.sleep(0.5)
    except KeyboardInterrupt:
        print("\n\nExiting...")
    except ConnectionError as e:
        print(f"\n{e}")
        return False
    finally:
        reader.disconnect()
        # NOTE(review): reconstructed as part of the finally block (so it
        # also prints on the ConnectionError path) - confirm against the
        # original file's indentation.
        print("Portal disconnected. Goodbye!")
    return True
def interactive_mode(server_ip, port):
    """Interactive prompt for sending commands by hand (no portal needed).

    Reads lines from an "NFC> " prompt until 'quit', 'exit' or 'q'
    (case-insensitive), Ctrl+C or end of input. 'skip' and 'test'
    (case-insensitive) call send_skip() / test_connection(); blank lines
    are ignored; anything else is sent verbatim as an NFC id via send_nfc().
    """
    rule = "="*60
    print("\n" + rule)
    print("INTERACTIVE MODE (Manual Input)")
    print(rule)
    print(f"Server: {server_ip}:{port}")
    print("Commands:")
    print(" - Type NFC ID and press Enter to send")
    print(" - Type 'skip' to skip current video")
    print(" - Type 'test' to test connection")
    print(" - Type 'quit' or 'exit' to exit")
    print(rule)
    print()

    while True:
        try:
            command = input("NFC> ").strip()
            if not command:
                continue
            keyword = command.lower()
            if keyword in ('quit', 'exit', 'q'):
                print("Exiting...")
                break
            if keyword == 'skip':
                send_skip(server_ip, port)
            elif keyword == 'test':
                test_connection(server_ip, port)
            else:
                # Anything else is treated as an NFC id, case preserved
                send_nfc(server_ip, port, command)
        except KeyboardInterrupt:
            print("\nExiting...")
            break
        except EOFError:
            break
def main():
    """Main entry point: parse the command line and run one action.

    Exactly one action runs, checked in this order: -t/--test, --skip,
    -n/--nfc, -i/--interactive; with none of them given, portal mode runs.

    Returns the process exit status: 0 on success, 1 when the chosen action
    reports failure. The connection test now follows the same rule as the
    other one-shot commands (it previously returned 0 even when the server
    was unreachable). Interactive mode always returns 0.
    """
    parser = argparse.ArgumentParser(
        description='LEGO Dimensions Portal Client - Send NFC commands to video player',
        # Raw formatter so the epilog below is printed with its line breaks intact
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=f"""
Default server: {DEFAULT_SERVER}:{DEFAULT_PORT}
Examples:
# Portal mode (default) - read tags from LEGO Dimensions portal
python dimensions_cli_client.py
# Test connection first
python dimensions_cli_client.py -t
# Send single NFC command manually
python dimensions_cli_client.py -n 12345678
# Skip current video
python dimensions_cli_client.py --skip
# Manual interactive mode (no portal)
python dimensions_cli_client.py -i
# Use different server
python dimensions_cli_client.py -s 192.168.1.100 -p 8547
Effects in tag_colors.json:
"solid" - Constant color on the pad
"flash" - Blinking effect
"pulse" - Breathing/fading effect
"cycle" - Revolving lights across all pads (police lights!)
"""
    )
    parser.add_argument('-s', '--server',
                        default=DEFAULT_SERVER,
                        help=f'Server IP address (default: {DEFAULT_SERVER})')
    parser.add_argument('-p', '--port',
                        type=int,
                        default=DEFAULT_PORT,
                        help=f'Server port (default: {DEFAULT_PORT})')
    parser.add_argument('-n', '--nfc',
                        help='NFC ID to send (manual, no portal needed)')
    parser.add_argument('-t', '--test',
                        action='store_true',
                        help='Test connection to server')
    parser.add_argument('--skip',
                        action='store_true',
                        help='Send skip video command')
    parser.add_argument('-i', '--interactive',
                        action='store_true',
                        help='Manual interactive mode (no portal)')
    args = parser.parse_args()

    print("="*60)
    print("LEGO Dimensions Portal Client")
    print("Moonlight Drive-In Theater System")
    print("="*60)

    # Test connection - report failure through the exit status, like the
    # --skip and --nfc branches below
    if args.test:
        success = test_connection(args.server, args.port)
        return 0 if success else 1

    # Send skip
    if args.skip:
        success = send_skip(args.server, args.port)
        return 0 if success else 1

    # Send manual NFC
    if args.nfc:
        success = send_nfc(args.server, args.port, args.nfc)
        return 0 if success else 1

    # Manual interactive mode
    if args.interactive:
        interactive_mode(args.server, args.port)
        return 0

    # Default: Portal mode
    success = portal_mode(args.server, args.port)
    return 0 if success else 1
if __name__ == "__main__":
    # Run only when executed as a script; main()'s return value becomes
    # the process exit status.
    sys.exit(main())