1038 lines
42 KiB
Python
1038 lines
42 KiB
Python
# enhanced_debug_console.py
"""
Enhanced Debug Console with Web Interface Integration

Maintains all original functionality while adding web interface support.
"""

import ipaddress
import logging
import platform
import socket
import subprocess
import sys
import threading
import time
import tkinter as tk
import webbrowser
from collections import deque
from datetime import datetime
from tkinter import ttk, scrolledtext, messagebox


def make_json_safe(data):
    """Return a copy of *data* that ``json.dumps`` can serialize.

    Containers are converted recursively:

    * ``dict`` values are converted (keys are left untouched);
    * ``list``, ``tuple``, ``set``, ``frozenset`` and ``deque`` become lists;
    * objects exposing ``isoformat`` (``datetime``, ``date``, ``time``)
      become ISO-8601 strings;
    * objects exposing ``total_seconds`` (``timedelta``) become ``str(obj)``.

    Anything else is returned unchanged.
    """
    if isinstance(data, dict):
        return {key: make_json_safe(value) for key, value in data.items()}
    if isinstance(data, (list, tuple, set, frozenset, deque)):
        # json has no tuple/set/deque type; a list is the closest equivalent
        # and lets nested datetimes be converted as well.
        return [make_json_safe(item) for item in data]
    if hasattr(data, 'isoformat'):  # datetime, date, time
        return data.isoformat()
    if hasattr(data, 'total_seconds'):  # timedelta
        return str(data)
    return data


class EnhancedDebugConsoleWithWeb:
    """Tkinter debug/control console for the video player, with optional web UI.

    The console shows a live log and statistics, exposes playback controls
    and turns digit keystrokes terminated by Enter (the "NFC input") into
    ``play_specific_video`` calls on the attached video player.  Keystrokes
    are captured globally when one of the optional backends (pynput,
    pywin32, python-xlib) can be set up; otherwise they are only seen while
    the console window has focus.

    Args:
        enable_web: Start the ``web_interface.WebInterface`` server.
        web_port: TCP port passed to the web interface.
    """

    # Volume passed to the video player when audio is unmuted.
    UNMUTE_VOLUME = 0.7
    # The log widget is trimmed once it holds more than this many lines.
    MAX_LOG_LINES = 1000
    # Windows virtual-key code / X11 keysym of the Return key.
    _VK_RETURN = 13
    _XK_RETURN = 65293
    # RFC 1918 private IPv4 ranges accepted as "network" addresses.
    _LAN_NETWORKS = tuple(
        ipaddress.ip_network(block)
        for block in ("10.0.0.0/8", "172.16.0.0/12", "192.168.0.0/16")
    )
    # Text-widget tag name -> display options for each log category.
    _LOG_TAG_STYLES = {
        "error": {"foreground": "red", "background": "#ffe6e6"},
        "warning": {"foreground": "orange", "background": "#fff3cd"},
        "nfc": {"foreground": "blue", "background": "#e6f3ff"},
        "video": {"foreground": "green", "background": "#e6ffe6"},
        "web": {"foreground": "purple", "background": "#f0e6ff"},
        "global": {"foreground": "purple", "background": "#f0e6ff"},
        "transition": {"foreground": "purple"},
    }

    def __init__(self, enable_web=True, web_port=8547):
        self.root = tk.Tk()
        self.text_widget = None
        self.status_label = None
        self.running = False
        self.logger = logging.getLogger(__name__)
        self.video_player = None
        self.message_queue = deque(maxlen=1000)
        self.lock = threading.Lock()
        self.nfc_buffer = ""
        self.nfc_timeout = 0.3
        self.last_nfc_input = 0

        # Web interface integration
        self.enable_web = enable_web
        self.web_port = web_port
        self.web_interface = None
        self.web_thread = None

        # Global input capture
        self.global_listener = None
        self.global_input_method = None
        self.global_input_active = False

        # Track mute state instead of volume
        self.is_muted = False

        # Ids of pending Tk "after" callbacks (None when nothing is pending).
        self._status_reset_id = None
        self._web_status_restore_id = None
        # (text, colour) of the web status label saved while a flash shows.
        self._web_status_saved = None

        self.stats = {
            'videos_played': 0,
            'key_presses': 0,
            'errors': 0,
            'start_time': datetime.now(),
            'queue_depth': 0,
            'current_video': 'None',
            'fullscreen': True
        }

        self.setup_gui()
        if self.enable_web:
            self.setup_web_interface()
        self.setup_global_input_capture()
        self.log("Enhanced debug console with web interface initialized")

    # ------------------------------------------------------------------
    # Network address discovery
    # ------------------------------------------------------------------
    def get_local_ip(self):
        """Return this machine's LAN IP address, or ``'localhost'``."""
        try:
            with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
                # connect() on a UDP socket sends nothing; it only makes the
                # OS pick the outgoing interface for that destination.
                s.connect(('8.8.8.8', 80))
                return s.getsockname()[0]
        except Exception:
            try:
                # Fallback method - get hostname IP
                local_ip = socket.gethostbyname(socket.gethostname())
                if local_ip.startswith('127.'):
                    # If we get localhost, try alternative method
                    return self.get_network_ip_alternative()
                return local_ip
            except Exception:
                return 'localhost'

    def get_network_ip_alternative(self):
        """Find a private IPv4 address via OS tools; ``'localhost'`` on failure."""
        try:
            system = platform.system().lower()
            if system == 'windows':
                return self._lan_ip_from_ipconfig() or 'localhost'
            if system in ('linux', 'darwin'):
                return self._lan_ip_from_hostname() or 'localhost'
        except Exception as e:
            self.logger.debug("Alternative IP lookup failed: %s", e)
        return 'localhost'

    @classmethod
    def _is_lan_address(cls, candidate):
        """Return True if *candidate* is an RFC 1918 private IPv4 address."""
        try:
            address = ipaddress.ip_address(candidate)
        except ValueError:
            return False
        return any(address in network for network in cls._LAN_NETWORKS)

    def _lan_ip_from_ipconfig(self):
        """Parse ``ipconfig`` output (Windows); return an address or None."""
        result = subprocess.run(['ipconfig'], capture_output=True, text=True,
                                timeout=5)
        lines = result.stdout.split('\n')
        candidates = []
        for i, line in enumerate(lines):
            if 'Wireless LAN adapter' in line or 'Ethernet adapter' in line:
                # Look for the IPv4 address in the next few lines
                for detail in lines[i + 1:i + 10]:
                    if 'IPv4 Address' not in detail:
                        continue
                    # Drop a trailing "(Preferred)" marker if present.
                    ip = detail.split(':')[-1].split('(')[0].strip()
                    if self._is_lan_address(ip):
                        candidates.append(ip)
        # Keep the historical preference for 192.168.x.x, then accept any
        # other private address.
        for ip in candidates:
            if ip.startswith('192.168.'):
                return ip
        return candidates[0] if candidates else None

    def _lan_ip_from_hostname(self):
        """Parse ``hostname -I`` output; return an address or None."""
        try:
            result = subprocess.run(['hostname', '-I'], capture_output=True,
                                    text=True, timeout=5)
        except (OSError, subprocess.SubprocessError) as e:
            self.logger.debug("hostname -I failed: %s", e)
            return None
        if result.returncode != 0:
            return None
        return next(
            (ip for ip in result.stdout.split() if self._is_lan_address(ip)),
            None,
        )

    # ------------------------------------------------------------------
    # Web interface
    # ------------------------------------------------------------------
    def setup_web_interface(self):
        """Create the web interface and run it in a daemon thread.

        On any failure the error is logged, ``enable_web`` is cleared and
        the status label is switched to an "unavailable" state.
        """
        try:
            from web_interface import WebInterface

            local_ip = self.get_local_ip()

            self.web_interface = WebInterface(debug_console=self)
            self.log(f"Web interface created - will start on port {self.web_port}")

            self.log(f"Local URL: http://localhost:{self.web_port}")
            self.log(f"Network URL: http://{local_ip}:{self.web_port}")

            def start_web_interface():
                try:
                    self.log(f"Starting web interface on port {self.web_port}")
                    self.web_interface.run(host='0.0.0.0', port=self.web_port, debug=False)
                except Exception as e:
                    self.log_error(f"Web interface failed to start: {e}")

            self.web_thread = threading.Thread(
                target=start_web_interface,
                daemon=True,
                name="WebInterfaceThread"
            )
            self.web_thread.start()

            self.log("Web interface thread started")

            self._show_web_status(f"🌐 Web: http://{local_ip}:{self.web_port}", "green")

        except ImportError:
            self.log_error("Web interface dependencies not available - install flask and flask-socketio")
            self.enable_web = False
            self.update_web_status("Unavailable", "red")
        except Exception as e:
            self.log_error(f"Web interface setup failed: {e}")
            self.enable_web = False
            self.update_web_status("Unavailable", "red")

    # ------------------------------------------------------------------
    # Global keyboard capture
    # ------------------------------------------------------------------
    def setup_global_input_capture(self):
        """Set up global keyboard capture using the first backend that works."""
        # Method 1: Try pynput (cross-platform)
        if self.try_pynput_global_capture():
            return

        # Method 2: Try Windows-specific hook
        if sys.platform == "win32" and self.try_windows_global_capture():
            return

        # Method 3: Try Linux-specific listener
        if sys.platform.startswith('linux') and self.try_linux_global_capture():
            return

        # Fallback: No global capture
        self.log("Warning: Global input capture not available")
        self.log("NFC input will only work when debug window is in focus")

    def _buffer_nfc_digit(self, digit):
        """Append *digit* to the NFC buffer, restarting it after a pause.

        A gap longer than ``nfc_timeout`` seconds since the previous digit
        discards whatever was buffered before.
        """
        now = time.time()
        if now - self.last_nfc_input > self.nfc_timeout:
            self.nfc_buffer = ""
        self.nfc_buffer += digit
        self.last_nfc_input = now

    def _flush_global_nfc_buffer(self):
        """Submit the buffered digits (if any) as a globally captured NFC id."""
        if not self.nfc_buffer:
            return
        # Clear before dispatching so a failure cannot resubmit stale digits.
        captured, self.nfc_buffer = self.nfc_buffer, ""
        self.log(f"Global NFC input: {captured}")
        self.process_global_nfc_input(captured.strip())

    def _activate_global_input(self, listener, method, display_name):
        """Record a working capture backend and announce it."""
        self.global_listener = listener
        self.global_input_method = method
        self.global_input_active = True
        self.log(f"Global input capture active ({display_name})")

        # Update web interface if available
        if self.web_interface:
            self.web_interface.update_global_input_status(True, method)

    def try_pynput_global_capture(self):
        """Try to start a pynput keyboard listener; return True on success."""
        try:
            from pynput import keyboard

            def on_key_press(key):
                # Runs on the pynput listener thread.
                try:
                    if key == keyboard.Key.enter:
                        self._flush_global_nfc_buffer()
                        return
                    char = getattr(key, 'char', None)
                    if char and char.isdigit():
                        self._buffer_nfc_digit(char)
                except Exception as e:
                    self.logger.debug(f"Global key press error: {e}")

            listener = keyboard.Listener(on_press=on_key_press)
            listener.start()
            self._activate_global_input(listener, "pynput", "pynput")
            return True

        except ImportError:
            self.logger.debug("pynput not available")
            return False
        except Exception as e:
            self.logger.debug(f"pynput setup failed: {e}")
            return False

    def try_windows_global_capture(self):
        """Try to install a Windows low-level keyboard hook; True on success."""
        try:
            import win32api
            import win32con
            import win32gui

            def low_level_keyboard_proc(nCode, wParam, lParam):
                try:
                    if nCode >= 0 and wParam == win32con.WM_KEYDOWN:
                        vk_code = lParam[0]
                        if vk_code == self._VK_RETURN:
                            self._flush_global_nfc_buffer()
                        elif 48 <= vk_code <= 57:  # VK_0 .. VK_9
                            self._buffer_nfc_digit(chr(vk_code))
                except Exception as e:
                    self.logger.debug(f"Windows hook error: {e}")

                return win32gui.CallNextHookEx(None, nCode, wParam, lParam)

            # NOTE(review): confirm the installed pywin32 exposes
            # SetWindowsHookEx/CallNextHookEx on win32gui; if it does not,
            # the AttributeError is caught below and this backend is skipped.
            hook = win32gui.SetWindowsHookEx(
                win32con.WH_KEYBOARD_LL,
                low_level_keyboard_proc,
                win32api.GetModuleHandle(None),
                0
            )
            if not hook:
                return False

            self._activate_global_input(hook, "windows", "Windows")
            return True

        except ImportError:
            self.logger.debug("pywin32 not available")
            return False
        except Exception as e:
            self.logger.debug(f"Windows hook setup failed: {e}")
            return False

    def try_linux_global_capture(self):
        """Try to record key presses via the X11 RECORD extension."""
        try:
            from Xlib import display, X
            from Xlib.ext import record
            from Xlib.protocol import rq

            def record_callback(reply):
                # Runs on the recording thread started below.
                try:
                    if reply.category != record.FromServer:
                        return
                    if reply.client_swapped:
                        return
                    if not len(reply.data) or reply.data[0] < 2:
                        return

                    data = reply.data
                    while len(data):
                        event, data = rq.EventField(None).parse_binary_value(
                            data, self.record_dpy.display, None, None)
                        if event.type != X.KeyPress:
                            continue

                        # Convert keycode to keysym
                        keysym = self.local_dpy.keycode_to_keysym(event.detail, 0)
                        if keysym == self._XK_RETURN:
                            self._flush_global_nfc_buffer()
                        elif 48 <= keysym <= 57:  # 0-9 keys
                            self._buffer_nfc_digit(chr(keysym))

                except Exception as e:
                    self.logger.debug(f"Linux record callback error: {e}")

            # One connection for keymap queries/control, one for recording.
            self.local_dpy = display.Display()
            self.record_dpy = display.Display()

            ctx = self.record_dpy.record_create_context(
                0,
                [record.AllClients],
                [{
                    'core_requests': (0, 0),
                    'core_replies': (0, 0),
                    'ext_requests': (0, 0, 0, 0),
                    'ext_replies': (0, 0, 0, 0),
                    'delivered_events': (0, 0),
                    'device_events': (X.KeyPress, X.KeyRelease),
                    'errors': (0, 0),
                    'client_started': False,
                    'client_died': False,
                }]
            )

            def start_recording():
                # Blocks until the context is disabled, then releases it.
                self.record_dpy.record_enable_context(ctx, record_callback)
                self.record_dpy.record_free_context(ctx)

            thread = threading.Thread(target=start_recording, daemon=True)
            thread.start()

            self._activate_global_input(ctx, "linux", "Linux")
            return True

        except ImportError:
            self.logger.debug("python-xlib not available")
            return False
        except Exception as e:
            self.logger.debug(f"Linux listener setup failed: {e}")
            return False

    def _stop_global_input(self):
        """Tear down the active capture backend; a second call is a no-op.

        Raises whatever the backend raises; callers decide how to report it.
        """
        listener, method = self.global_listener, self.global_input_method
        if not listener:
            return
        self.global_listener = None
        self.global_input_active = False

        if method == "pynput":
            listener.stop()
        elif method == "windows":
            import win32gui
            win32gui.UnhookWindowsHookEx(listener)
        elif method == "linux":
            # Disabling the context makes record_enable_context() return in
            # the recording thread, which then frees the context.
            self.local_dpy.record_disable_context(listener)
            self.local_dpy.flush()

    def process_global_nfc_input(self, nfc_id):
        """Process an NFC id captured by a global backend.

        The id is mirrored into the "NFC Test" entry (best effort) and then
        handed to :meth:`process_nfc_input`.
        """
        if not nfc_id:
            return

        entry = getattr(self, 'nfc_entry', None)
        if entry is not None:
            try:
                entry.delete(0, tk.END)
                entry.insert(0, nfc_id)
            except Exception as e:
                # Purely cosmetic; never block the scan because of it.
                self.logger.debug("Could not show NFC id in entry: %s", e)

        self.process_nfc_input(nfc_id)

    # ------------------------------------------------------------------
    # GUI construction
    # ------------------------------------------------------------------
    def setup_gui(self):
        """Build the main window: controls, log view and statistics."""
        self.root.title("Video Player Controller (Enhanced + Web)")
        self.root.geometry("900x700")

        # Route the window-manager close button through the confirmation.
        self.root.protocol("WM_DELETE_WINDOW", self.on_closing)

        try:
            self.root.state('zoomed')
        except tk.TclError:
            # 'zoomed' is not accepted everywhere; use the left half of the
            # screen instead.
            self.root.geometry("{0}x{1}+0+0".format(
                self.root.winfo_screenwidth() // 2,
                self.root.winfo_screenheight()
            ))

        main_frame = ttk.Frame(self.root, padding="10")
        main_frame.pack(fill=tk.BOTH, expand=True)

        control_frame = ttk.LabelFrame(main_frame, text="Controls", padding="10")
        control_frame.pack(fill=tk.X)

        self._build_web_panel(control_frame)
        self._build_primary_controls(control_frame)
        self._build_secondary_controls(control_frame)
        self._build_log_panel(main_frame)
        self._build_stats_panel(main_frame)

        # Still bind local keys as fallback
        self.root.bind('<Key>', self.on_key_press)
        self.root.focus_set()

    def _add_url_row(self, parent, caption, url, copy_command):
        """Add a "caption / url / copy button" row; return the URL label."""
        row = ttk.Frame(parent)
        row.pack(fill=tk.X, pady=2)
        ttk.Label(row, text=caption).pack(side=tk.LEFT)
        url_label = ttk.Label(
            row,
            text=url,
            foreground="blue",
            font=('Consolas', 10, 'bold')
        )
        url_label.pack(side=tk.LEFT, padx=10)
        ttk.Button(row, text="📋 Copy", command=copy_command, width=8).pack(side=tk.RIGHT)
        return url_label

    def _build_web_panel(self, parent):
        """Build the web-interface URLs/status panel (or a "disabled" note)."""
        web_frame = ttk.Frame(parent)
        web_frame.pack(fill=tk.X, pady=(0, 10))

        if not self.enable_web:
            ttk.Label(
                web_frame,
                text="🌐 Web Interface: Disabled",
                foreground="red"
            ).pack(side=tk.LEFT)
            return

        local_ip = self.get_local_ip()

        web_info_frame = ttk.LabelFrame(web_frame, text="Web Interface Access", padding="10")
        web_info_frame.pack(fill=tk.X)

        self.local_url_label = self._add_url_row(
            web_info_frame, "Local:",
            f"http://localhost:{self.web_port}", self.copy_local_url)
        self.network_url_label = self._add_url_row(
            web_info_frame, "Network:",
            f"http://{local_ip}:{self.web_port}", self.copy_network_url)

        button_frame = ttk.Frame(web_info_frame)
        button_frame.pack(fill=tk.X, pady=(5, 0))

        self.web_status_label = ttk.Label(
            button_frame,
            text="🌐 Starting...",
            foreground="orange"
        )
        self.web_status_label.pack(side=tk.LEFT)

        ttk.Button(
            button_frame,
            text="Open in Browser",
            command=self.open_web_interface
        ).pack(side=tk.RIGHT, padx=5)

    def _build_primary_controls(self, parent):
        """Build the playback buttons, capture indicator and exit button."""
        controls_row1 = ttk.Frame(parent)
        controls_row1.pack(fill=tk.X, pady=(0, 5))

        ttk.Button(controls_row1, text="Skip Video", command=self.skip_video).pack(side=tk.LEFT, padx=5)
        ttk.Button(controls_row1, text="Toggle Fullscreen", command=self.toggle_fullscreen).pack(side=tk.LEFT, padx=5)

        self.mute_button = ttk.Button(controls_row1, text="Mute", command=self.toggle_mute)
        self.mute_button.pack(side=tk.LEFT, padx=5)

        self.global_input_label = ttk.Label(controls_row1, text="Global Input: Checking...", foreground="orange")
        self.global_input_label.pack(side=tk.LEFT, padx=20)

        # Exit button with confirmation (same dialog as the window close box).
        exit_button = ttk.Button(controls_row1, text="Exit Application", command=self.on_closing)
        exit_button.pack(side=tk.RIGHT, padx=5)

    def _build_secondary_controls(self, parent):
        """Build the clear-log button, manual NFC entry and status label."""
        controls_row2 = ttk.Frame(parent)
        controls_row2.pack(fill=tk.X, pady=(5, 0))

        ttk.Button(controls_row2, text="Clear Log", command=self.clear_log).pack(side=tk.LEFT, padx=5)

        # Manual NFC input for testing
        ttk.Label(controls_row2, text="NFC Test:").pack(side=tk.LEFT, padx=(20, 5))
        self.nfc_entry = ttk.Entry(controls_row2, width=15)
        self.nfc_entry.pack(side=tk.LEFT, padx=5)
        self.nfc_entry.bind('<Return>', self.manual_nfc_input)
        ttk.Button(controls_row2, text="Send", command=self.manual_nfc_input).pack(side=tk.LEFT, padx=2)

        self.status_label = ttk.Label(controls_row2, text="Status: Starting...", foreground="orange")
        self.status_label.pack(side=tk.RIGHT, padx=5)

    def _build_log_panel(self, parent):
        """Build the scrolling log view and register its colour tags."""
        log_frame = ttk.LabelFrame(parent, text="System Log", padding="5")
        log_frame.pack(fill=tk.BOTH, expand=True)

        self.text_widget = scrolledtext.ScrolledText(
            log_frame,
            wrap=tk.WORD,
            font=('Consolas', 9),
            height=20
        )
        self.text_widget.pack(fill=tk.BOTH, expand=True)

        # Configure tags once instead of on every logged line.
        for tag, options in self._LOG_TAG_STYLES.items():
            self.text_widget.tag_config(tag, **options)

    def _build_stats_panel(self, parent):
        """Build the two-column statistics grid."""
        stats_frame = ttk.LabelFrame(parent, text="Statistics", padding="5")
        stats_frame.pack(fill=tk.X)

        self.stats_labels = {}
        stats_grid = ttk.Frame(stats_frame)
        stats_grid.pack(fill=tk.X)

        stats_keys = [
            ('videos_played', 'Videos Played'),
            ('key_presses', 'NFC Scans'),
            ('errors', 'Errors'),
            ('uptime', 'Uptime'),
            ('queue_depth', 'Queue Depth'),
            ('current_video', 'Current Video')
        ]

        for i, (key, label) in enumerate(stats_keys):
            row, pair = divmod(i, 2)
            col = pair * 2

            ttk.Label(stats_grid, text=f"{label}:").grid(row=row, column=col, sticky=tk.W, padx=5)
            self.stats_labels[key] = ttk.Label(stats_grid, text="0", font=('Consolas', 9))
            self.stats_labels[key].grid(row=row, column=col + 1, sticky=tk.W, padx=5)

    # ------------------------------------------------------------------
    # Status label helpers
    # ------------------------------------------------------------------
    def _cancel_status_reset(self):
        """Cancel a pending delayed "Status: Ready" update, if any."""
        pending = getattr(self, '_status_reset_id', None)
        if pending is not None:
            self._status_reset_id = None
            self.root.after_cancel(pending)

    def _set_status(self, text, color):
        """Show *text* in the status label (no-op before the GUI exists).

        Any pending delayed reset is cancelled so it cannot overwrite this
        newer status.
        """
        if getattr(self, 'status_label', None) is None:
            return
        self._cancel_status_reset()
        self.status_label.config(text=text, foreground=color)

    def _show_ready_status(self):
        """Timer callback: show the idle "Ready" status."""
        self._status_reset_id = None
        self._set_status("Status: Ready", "green")

    def _reset_status_later(self, delay_ms):
        """Show "Status: Ready" after *delay_ms*, replacing any pending reset."""
        self._cancel_status_reset()
        self._status_reset_id = self.root.after(delay_ms, self._show_ready_status)

    def _report_no_player(self):
        """Log and display that no video player is attached."""
        self.log_error("Video player not connected")
        self._set_status("Status: No video player", "red")

    def _cancel_web_status_restore(self):
        """Cancel a pending restore of the web status label, if any."""
        pending = getattr(self, '_web_status_restore_id', None)
        if pending is not None:
            self._web_status_restore_id = None
            self.root.after_cancel(pending)

    def _show_web_status(self, text, color):
        """Set the persistent web status text (no-op without the label)."""
        label = getattr(self, 'web_status_label', None)
        if label is None:
            return
        self._cancel_web_status_restore()
        label.config(text=text, foreground=color)

    def _flash_web_status(self, text, duration_ms=2000):
        """Show *text* temporarily, then restore the previous web status."""
        label = getattr(self, 'web_status_label', None)
        if label is None:
            return
        if self._web_status_restore_id is None:
            # Only remember the label when it is not already showing a
            # flash; otherwise the flash itself would be "restored".
            self._web_status_saved = (
                label.cget("text"), str(label.cget("foreground")))
        else:
            self.root.after_cancel(self._web_status_restore_id)
        label.config(text=text, foreground="green")
        self._web_status_restore_id = self.root.after(
            duration_ms, self._restore_web_status)

    def _restore_web_status(self):
        """Timer callback: put back the status saved by `_flash_web_status`."""
        self._web_status_restore_id = None
        label = getattr(self, 'web_status_label', None)
        if label is not None and self._web_status_saved is not None:
            text, color = self._web_status_saved
            label.config(text=text, foreground=color)

    # ------------------------------------------------------------------
    # Web panel actions
    # ------------------------------------------------------------------
    def open_web_interface(self):
        """Open web interface in default browser"""
        try:
            url = f"http://localhost:{self.web_port}"
            webbrowser.open(url)
            self.log(f"Opened web interface in browser: {url}")
        except Exception as e:
            self.log_error(f"Failed to open web interface: {e}")

    def _copy_url(self, url, description):
        """Copy *url* to the clipboard and flash a confirmation."""
        try:
            self.root.clipboard_clear()
            self.root.clipboard_append(url)
            self.root.update()
            self.log(f"{description} URL copied to clipboard: {url}")
            self._flash_web_status(f"📋 {description} URL copied!")
        except Exception as e:
            self.log_error(f"Failed to copy URL: {e}")

    def copy_local_url(self):
        """Copy local URL to clipboard"""
        self._copy_url(f"http://localhost:{self.web_port}", "Local")

    def copy_network_url(self):
        """Copy network URL to clipboard"""
        self._copy_url(f"http://{self.get_local_ip()}:{self.web_port}", "Network")

    def update_web_status(self, status, color="black"):
        """Update web interface status"""
        self._show_web_status(f"🌐 {status}", color)

    def update_global_input_status(self):
        """Update the global input status indicator"""
        if self.global_input_active:
            self.global_input_label.config(
                text=f"Global Input: Active ({self.global_input_method})",
                foreground="green"
            )
        else:
            self.global_input_label.config(
                text="Global Input: Inactive (Focus Required)",
                foreground="red"
            )

    # ------------------------------------------------------------------
    # Shutdown
    # ------------------------------------------------------------------
    def on_closing(self):
        """Handle window close event with confirmation"""
        if messagebox.askokcancel("Exit", "Do you want to exit the video player?"):
            self.exit_application()

    def exit_application(self):
        """Shut everything down and terminate the process via ``sys.exit``.

        Exits with status 0 on a clean shutdown and 1 if a step failed.
        """
        exit_code = 0
        try:
            self.log("Exit requested - shutting down application")
            self._set_status("Status: Shutting down...", "red")

            if self.global_listener:
                try:
                    self._stop_global_input()
                    self.log("Global input capture stopped")
                except Exception as e:
                    self.log_error(f"Error stopping global input: {e}")

            if self.web_interface:
                try:
                    self.web_interface.stop()
                    self.log("Web interface stopped")
                except Exception as e:
                    self.log_error(f"Error stopping web interface: {e}")

            if self.video_player:
                self.video_player.stop()
                self.log("Video player stopped")

            # Stop debug console
            self.running = False

            # Close the application
            self.root.quit()
            self.root.destroy()

        except Exception as e:
            self.log_error(f"Error during exit: {e}")
            exit_code = 1

        sys.exit(exit_code)

    # ------------------------------------------------------------------
    # Log view and NFC input
    # ------------------------------------------------------------------
    def clear_log(self):
        """Clear the log display"""
        if self.text_widget is not None:
            self.text_widget.delete("1.0", tk.END)
        if self.web_interface:
            with self.web_interface.lock:
                self.web_interface.message_queue.clear()
        self.log("Log cleared")

    def manual_nfc_input(self, event=None):
        """Handle manual NFC input from entry field"""
        nfc_id = self.nfc_entry.get().strip()
        if nfc_id:
            self.log(f"Manual NFC input: {nfc_id}")
            self.process_nfc_input(nfc_id)
            self.nfc_entry.delete(0, tk.END)

    def on_key_press(self, event):
        """Window-level NFC key handling, used when global capture is off."""
        # Only process if global input is not active
        if self.global_input_active:
            return

        # Keys typed into the manual entry are handled by the entry's own
        # <Return> binding; processing them here too would submit twice.
        entry = getattr(self, 'nfc_entry', None)
        if entry is not None and getattr(event, 'widget', None) is entry:
            return

        try:
            if event.keysym == 'Return':
                if self.nfc_buffer:
                    nfc_id, self.nfc_buffer = self.nfc_buffer, ""
                    self.log(f"Local NFC input: {nfc_id}")
                    self.process_nfc_input(nfc_id)
                return

            if event.char and event.char.isdigit():
                self._buffer_nfc_digit(event.char)

        except Exception as e:
            self.log_error(f"Local key press handling error: {e}")

    def log(self, message, level="INFO"):
        """Queue *message* for the log view and forward it to the web UI.

        Safe to call from any thread: the message is only appended to a
        lock-protected queue that :meth:`update_display` drains.
        """
        timestamp = datetime.now().strftime("%H:%M:%S.%f")[:-3]
        thread_name = threading.current_thread().name[:10]
        formatted = f"[{timestamp}][{thread_name}] {level}: {message}"

        with self.lock:
            self.message_queue.append(formatted)
            if level == "ERROR":
                self.stats['errors'] += 1

        # Also log to web interface if available
        if self.web_interface:
            self.web_interface.log(message, level)

    def log_error(self, message):
        """Log an error message"""
        self.log(message, "ERROR")

    def log_video_played(self, video_name):
        """Record that *video_name* started playing."""
        self.log(f"Video played: {video_name}")
        with self.lock:
            self.stats['videos_played'] += 1
            self.stats['current_video'] = video_name

        # Also notify web interface
        if self.web_interface:
            self.web_interface.log_video_played(video_name)

        self._set_status("Status: Playing video", "green")

    def log_key_press(self, key):
        """Log key press"""
        self.log(f"Key pressed: {key}")
        with self.lock:
            self.stats['key_presses'] += 1

    def process_nfc_input(self, nfc_id=None):
        """Send *nfc_id* (default: the current buffer) to the video player."""
        if nfc_id is None:
            nfc_id = self.nfc_buffer.strip()

        if not nfc_id:
            return

        self.log(f"Processing NFC ID: {nfc_id}")
        self.log_key_press(nfc_id)
        self._set_status(f"Status: Processing NFC {nfc_id}", "orange")

        if not self.video_player:
            self._report_no_player()
            return

        try:
            self.video_player.play_specific_video(nfc_id)
            self.log(f"Sent NFC {nfc_id} to video player")
            self._reset_status_later(2000)
        except Exception as e:
            self.log_error(f"Failed to send NFC to video player: {e}")
            self._set_status("Status: Error", "red")

    # ------------------------------------------------------------------
    # Video player controls
    # ------------------------------------------------------------------
    def set_video_player(self, video_player):
        """Set reference to video player"""
        self.video_player = video_player
        self.log("Video player connected to debug console")

        # Also connect to web interface
        if self.web_interface:
            self.web_interface.set_video_player(video_player)

        self._set_status("Status: Connected", "green")

        if self.enable_web:
            self.update_web_status("Ready", "green")

    def skip_video(self):
        """Skip the current video and force the next one."""
        if not self.video_player:
            self._report_no_player()
            return

        try:
            self.log("Skipping video...")
            self._set_status("Status: Skipping...", "orange")

            self.video_player.skip_current_video()
            # Force next video immediately
            self.video_player.force_next_video()
            self.log("Video skip requested")

            self._reset_status_later(1000)
        except Exception as e:
            self.log_error(f"Skip video failed: {e}")
            self._set_status("Status: Error", "red")

    def toggle_mute(self):
        """Toggle between muted and ``UNMUTE_VOLUME``."""
        if not self.video_player:
            self._report_no_player()
            return

        try:
            if self.is_muted:
                self.video_player.set_volume(self.UNMUTE_VOLUME)
                self.mute_button.config(text="Mute")
                self.is_muted = False
                self.log("Audio unmuted")
                self._set_status("Status: Audio unmuted", "green")
            else:
                self.video_player.set_volume(0.0)
                self.mute_button.config(text="Unmute")
                self.is_muted = True
                self.log("Audio muted")
                self._set_status("Status: Audio muted", "orange")

            self._reset_status_later(2000)
        except Exception as e:
            self.log_error(f"Mute toggle failed: {e}")
            self._set_status("Status: Error", "red")

    def toggle_fullscreen(self):
        """Toggle fullscreen mode with status updates"""
        if not self.video_player:
            self._report_no_player()
            return

        try:
            self._set_status("Status: Toggling fullscreen...", "orange")
            self.video_player.toggle_fullscreen()
            self.log("Fullscreen toggled")

            self._reset_status_later(1000)
        except Exception as e:
            self.log_error(f"Fullscreen toggle failed: {e}")
            self._set_status("Status: Error", "red")

    def update_queue_depth(self, depth):
        """Update video queue depth"""
        self.stats['queue_depth'] = depth
        if self.web_interface:
            self.web_interface.update_queue_depth(depth)

    # ------------------------------------------------------------------
    # Periodic display refresh
    # ------------------------------------------------------------------
    @staticmethod
    def _classify_message(msg):
        """Return the colour-tag name for a log line, or None for plain text.

        The checks are ordered; the first matching category wins.
        """
        if "ERROR" in msg:
            return "error"
        if "WARNING" in msg:
            return "warning"
        if "NFC" in msg:
            return "nfc"
        if "Video played:" in msg or "SPECIFIC VIDEO" in msg:
            return "video"
        if "Web interface" in msg:
            return "web"
        if "Global input capture" in msg:
            return "global"
        if "transition" in msg.lower():
            return "transition"
        return None

    def _trim_log_widget(self):
        """Drop the oldest lines once the widget exceeds ``MAX_LOG_LINES``."""
        line_count = int(str(self.text_widget.index("end-1c")).split(".")[0])
        if line_count > self.MAX_LOG_LINES:
            self.text_widget.delete("1.0", "200.0")

    def update_display(self):
        """Flush queued log lines into the widget and refresh statistics."""
        with self.lock:
            messages = list(self.message_queue)
            self.message_queue.clear()

        if messages:
            for msg in messages:
                tag = self._classify_message(msg)
                self.text_widget.insert(tk.END, msg + "\n", (tag,) if tag else ())
            self.text_widget.see(tk.END)
            # Limit text widget size to prevent memory issues
            self._trim_log_widget()

        # Update statistics
        uptime = str(datetime.now() - self.stats['start_time']).split('.')[0]
        self.stats_labels['uptime'].config(text=uptime)

        for key, label in self.stats_labels.items():
            if key == 'uptime' or key not in self.stats:
                continue
            value = str(self.stats[key])
            # Truncate long video names
            if key == 'current_video' and len(value) > 30:
                value = value[:27] + "..."
            label.config(text=value)

        self.update_global_input_status()

    def _mark_web_ready(self):
        """Timer callback: flag the web interface as ready if still running."""
        if self.web_interface and self.running:
            self.update_web_status("Ready", "green")

    def _update_loop(self):
        """Refresh the display and reschedule itself while running."""
        if not self.running:
            return

        delay_ms = 100
        try:
            self.update_display()

            # Leave the "Starting..." state once a video player is attached.
            label = self.status_label
            if (label is not None and self.video_player
                    and "Starting" in str(label.cget("text"))):
                self._set_status("Status: Ready", "green")
        except Exception as e:
            self.log_error(f"Display update error: {e}")
            delay_ms = 1000  # Retry after delay

        if self.running:
            self.root.after(delay_ms, self._update_loop)

    def run(self):
        """Run the Tk main loop until the window closes (blocking)."""
        self.running = True
        self.log("Enhanced debug console with web interface started")

        if self.enable_web:
            self.log(f"Web interface accessible at: http://localhost:{self.web_port}")
            self.update_web_status("Starting...", "orange")
            # Scheduled on the Tk loop itself; no helper thread needed.
            self.root.after(3000, self._mark_web_ready)

        self._set_status("Status: Starting...", "orange")

        self._update_loop()

        try:
            self.root.mainloop()
        except Exception as e:
            self.logger.error(f"Debug console error: {e}")
        finally:
            self.running = False

    def stop(self):
        """Stop the console without exiting the process (best effort)."""
        self.running = False

        if self.global_listener:
            try:
                self._stop_global_input()
            except Exception as e:
                self.logger.debug(f"Error stopping global input: {e}")

        if self.web_interface:
            try:
                self.web_interface.stop()
            except Exception as e:
                self.logger.debug(f"Error stopping web interface: {e}")

        if self.root:
            try:
                self.root.quit()
                self.root.destroy()
            except Exception as e:
                # The window may already be gone; nothing more to do.
                self.logger.debug("Error closing console window: %s", e)