Upload files to "/"

Add core Python modules
This commit is contained in:
2025-12-09 16:49:13 +11:00
parent ab77befe3f
commit 05a8bd237b
5 changed files with 3773 additions and 0 deletions

664
debug_console.py Normal file
View File

@@ -0,0 +1,664 @@
# debug_console.py - Enhanced with Folder Reset button
import tkinter as tk
from tkinter import ttk, scrolledtext, messagebox
import threading
import time
import logging
from datetime import datetime
from collections import deque
import sys
class EnhancedDebugConsole:
def __init__(self):
self.root = tk.Tk()
self.text_widget = None
self.running = False
self.logger = logging.getLogger(__name__)
self.video_player = None
self.message_queue = deque(maxlen=1000)
self.lock = threading.Lock()
self.nfc_buffer = ""
self.nfc_timeout = 0.3
self.last_nfc_input = 0
# Global input capture
self.global_listener = None
self.global_input_method = None
self.global_input_active = False
# Track mute state
self.is_muted = False
self.stats = {
'videos_played': 0,
'key_presses': 0,
'errors': 0,
'start_time': datetime.now(),
'queue_depth': 0,
'current_video': 'None',
'fullscreen': True,
'folder_videos_played': 0 # New stat for folder videos
}
self.setup_gui()
self.setup_global_input_capture()
self.log("Enhanced debug console initialized with folder support")
def setup_global_input_capture(self):
"""Setup global keyboard capture"""
if self.try_pynput_global_capture():
return
if sys.platform == "win32" and self.try_windows_global_capture():
return
if sys.platform.startswith('linux') and self.try_linux_global_capture():
return
self.log("Warning: Global input capture not available")
def try_pynput_global_capture(self):
"""Try pynput global keyboard listener"""
try:
from pynput import keyboard
def on_key_press(key):
try:
current_time = time.time()
if key == keyboard.Key.enter:
if self.nfc_buffer:
self.log(f"Global NFC input: {self.nfc_buffer}")
self.process_global_nfc_input(self.nfc_buffer.strip())
self.nfc_buffer = ""
return
if hasattr(key, 'char') and key.char and key.char.isdigit():
if current_time - self.last_nfc_input > self.nfc_timeout:
self.nfc_buffer = ""
self.nfc_buffer += key.char
self.last_nfc_input = current_time
except Exception as e:
self.logger.debug(f"Global key press error: {e}")
self.global_listener = keyboard.Listener(on_press=on_key_press)
self.global_listener.start()
self.global_input_method = "pynput"
self.global_input_active = True
self.log("Global input capture active (pynput)")
return True
except ImportError:
return False
except Exception as e:
self.logger.debug(f"pynput setup failed: {e}")
return False
def try_windows_global_capture(self):
"""Windows-specific global keyboard hook"""
try:
import win32api
import win32con
import win32gui
def low_level_keyboard_proc(nCode, wParam, lParam):
try:
if nCode >= 0 and wParam == win32con.WM_KEYDOWN:
vk_code = lParam[0]
current_time = time.time()
if vk_code == 13: # Enter
if self.nfc_buffer:
self.log(f"Global NFC input: {self.nfc_buffer}")
self.process_global_nfc_input(self.nfc_buffer.strip())
self.nfc_buffer = ""
elif 48 <= vk_code <= 57: # 0-9
if current_time - self.last_nfc_input > self.nfc_timeout:
self.nfc_buffer = ""
digit = chr(vk_code)
self.nfc_buffer += digit
self.last_nfc_input = current_time
except Exception as e:
self.logger.debug(f"Windows hook error: {e}")
return win32gui.CallNextHookEx(None, nCode, wParam, lParam)
hook = win32gui.SetWindowsHookEx(
win32con.WH_KEYBOARD_LL,
low_level_keyboard_proc,
win32api.GetModuleHandle(None),
0
)
if hook:
self.global_listener = hook
self.global_input_method = "windows"
self.global_input_active = True
self.log("Global input capture active (Windows)")
return True
except ImportError:
return False
except Exception as e:
self.logger.debug(f"Windows hook setup failed: {e}")
return False
return False
def try_linux_global_capture(self):
"""Linux-specific global keyboard listener"""
# Implementation same as original
return False
def process_global_nfc_input(self, nfc_id):
"""Process NFC input captured globally"""
if not nfc_id:
return
try:
if hasattr(self, 'nfc_entry'):
self.nfc_entry.delete(0, tk.END)
self.nfc_entry.insert(0, nfc_id)
except:
pass
self.process_nfc_input(nfc_id)
def setup_gui(self):
"""Setup GUI with folder reset button"""
self.root.title("Video Player Controller (Enhanced with Folder Support)")
self.root.geometry("900x700")
self.root.protocol("WM_DELETE_WINDOW", self.on_closing)
try:
self.root.state('zoomed')
except:
self.root.geometry("{0}x{1}+0+0".format(
self.root.winfo_screenwidth()//2,
self.root.winfo_screenheight()
))
main_frame = ttk.Frame(self.root, padding="10")
main_frame.pack(fill=tk.BOTH, expand=True)
# Enhanced controls
control_frame = ttk.LabelFrame(main_frame, text="Controls", padding="10")
control_frame.pack(fill=tk.X)
# Primary controls row
controls_row1 = ttk.Frame(control_frame)
controls_row1.pack(fill=tk.X, pady=(0, 5))
ttk.Button(controls_row1, text="Skip Video", command=self.skip_video).pack(side=tk.LEFT, padx=5)
ttk.Button(controls_row1, text="Toggle Fullscreen", command=self.toggle_fullscreen).pack(side=tk.LEFT, padx=5)
# Mute/Unmute button
self.mute_button = ttk.Button(controls_row1, text="Mute", command=self.toggle_mute)
self.mute_button.pack(side=tk.LEFT, padx=5)
# NEW: Folder Reset Button
reset_button = ttk.Button(
controls_row1,
text="🔄 Reset Folder Sequences",
command=self.reset_folder_sequences
)
reset_button.pack(side=tk.LEFT, padx=5)
# Global input status
self.global_input_label = ttk.Label(controls_row1, text="Global Input: Checking...", foreground="orange")
self.global_input_label.pack(side=tk.LEFT, padx=20)
# Exit button
exit_button = ttk.Button(controls_row1, text="Exit Application", command=self.exit_application)
exit_button.pack(side=tk.RIGHT, padx=5)
# Secondary controls row
controls_row2 = ttk.Frame(control_frame)
controls_row2.pack(fill=tk.X, pady=(5, 0))
ttk.Button(controls_row2, text="Clear Log", command=self.clear_log).pack(side=tk.LEFT, padx=5)
# Manual NFC input
ttk.Label(controls_row2, text="NFC Test:").pack(side=tk.LEFT, padx=(20, 5))
self.nfc_entry = ttk.Entry(controls_row2, width=15)
self.nfc_entry.pack(side=tk.LEFT, padx=5)
self.nfc_entry.bind('<Return>', self.manual_nfc_input)
ttk.Button(controls_row2, text="Send", command=self.manual_nfc_input).pack(side=tk.LEFT, padx=2)
# Status indicator
self.status_label = ttk.Label(controls_row2, text="Status: Starting...", foreground="orange")
self.status_label.pack(side=tk.RIGHT, padx=5)
# Logging display
log_frame = ttk.LabelFrame(main_frame, text="System Log", padding="5")
log_frame.pack(fill=tk.BOTH, expand=True)
self.text_widget = scrolledtext.ScrolledText(
log_frame,
wrap=tk.WORD,
font=('Consolas', 9),
height=20
)
self.text_widget.pack(fill=tk.BOTH, expand=True)
# Enhanced statistics
stats_frame = ttk.LabelFrame(main_frame, text="Statistics", padding="5")
stats_frame.pack(fill=tk.X)
self.stats_labels = {}
stats_grid = ttk.Frame(stats_frame)
stats_grid.pack(fill=tk.X)
stats_keys = [
('videos_played', 'Videos Played'),
('folder_videos_played', 'Folder Videos'),
('key_presses', 'NFC Scans'),
('errors', 'Errors'),
('uptime', 'Uptime'),
('queue_depth', 'Queue Depth'),
('trailers_pool', 'Trailers Available'),
('current_video', 'Current Video')
]
for i, (key, label) in enumerate(stats_keys):
row = i // 2
col = (i % 2) * 2
ttk.Label(stats_grid, text=f"{label}:").grid(row=row, column=col, sticky=tk.W, padx=5)
self.stats_labels[key] = ttk.Label(stats_grid, text="0", font=('Consolas', 9))
self.stats_labels[key].grid(row=row, column=col+1, sticky=tk.W, padx=5)
# Bind local keys as fallback
self.root.bind('<Key>', self.on_key_press)
self.root.focus_set()
def reset_folder_sequences(self):
"""Reset all folder video sequences to the beginning"""
if messagebox.askyesno(
"Reset Folder Sequences",
"This will reset all folder video sequences back to the first video.\n\nAre you sure?"
):
if self.video_player:
try:
self.log("Resetting all folder sequences...")
self.status_label.config(text="Status: Resetting folders...", foreground="orange")
if self.video_player.reset_all_folder_positions():
self.log("✓ All folder sequences reset successfully")
messagebox.showinfo(
"Reset Complete",
"All folder video sequences have been reset to the beginning."
)
self.status_label.config(text="Status: Folders reset", foreground="green")
else:
self.log_error("Failed to reset folder sequences")
messagebox.showerror("Reset Failed", "Could not reset folder sequences.")
self.status_label.config(text="Status: Reset failed", foreground="red")
# Reset status after delay
self.root.after(3000, lambda: self.status_label.config(text="Status: Ready", foreground="green"))
except Exception as e:
self.log_error(f"Folder reset error: {e}")
messagebox.showerror("Error", f"Reset failed: {e}")
self.status_label.config(text="Status: Error", foreground="red")
else:
self.log_error("Video player not connected")
messagebox.showerror("Error", "Video player not connected")
def update_global_input_status(self):
"""Update global input status indicator"""
if self.global_input_active:
self.global_input_label.config(
text=f"Global Input: Active ({self.global_input_method})",
foreground="green"
)
else:
self.global_input_label.config(
text="Global Input: Inactive (Focus Required)",
foreground="red"
)
def on_closing(self):
"""Handle window close event"""
if messagebox.askokcancel("Exit", "Do you want to exit the video player?"):
self.exit_application()
def exit_application(self):
"""Exit the application cleanly"""
try:
self.log("Exit requested - shutting down")
self.status_label.config(text="Status: Shutting down...", foreground="red")
# Stop global input capture
if self.global_listener:
try:
if self.global_input_method == "pynput":
self.global_listener.stop()
elif self.global_input_method == "windows":
import win32gui
win32gui.UnhookWindowsHookEx(self.global_listener)
self.log("Global input capture stopped")
except Exception as e:
self.log_error(f"Error stopping global input: {e}")
# Stop video player
if self.video_player:
self.video_player.stop()
self.log("Video player stopped")
self.running = False
self.root.quit()
self.root.destroy()
import sys
sys.exit(0)
except Exception as e:
self.log_error(f"Error during exit: {e}")
import sys
sys.exit(1)
def clear_log(self):
"""Clear the log display"""
if self.text_widget:
self.text_widget.delete(1.0, tk.END)
self.log("Log cleared")
def manual_nfc_input(self, event=None):
"""Handle manual NFC input"""
nfc_id = self.nfc_entry.get().strip()
if nfc_id:
self.log(f"Manual NFC input: {nfc_id}")
self.process_nfc_input(nfc_id)
self.nfc_entry.delete(0, tk.END)
def on_key_press(self, event):
"""Fallback NFC input handling"""
if self.global_input_active:
return
current_time = time.time()
try:
if event.keysym == 'Return':
if self.nfc_buffer:
self.log(f"Local NFC input: {self.nfc_buffer}")
self.process_nfc_input(self.nfc_buffer)
self.nfc_buffer = ""
return
if event.char and event.char.isdigit():
if current_time - self.last_nfc_input > self.nfc_timeout:
self.nfc_buffer = ""
self.nfc_buffer += event.char
self.last_nfc_input = current_time
except Exception as e:
self.log_error(f"Local key press error: {e}")
def log(self, message, level="INFO"):
"""Enhanced logging"""
timestamp = datetime.now().strftime("%H:%M:%S.%f")[:-3]
thread_name = threading.current_thread().name[:10]
formatted = f"[{timestamp}][{thread_name}] {level}: {message}"
with self.lock:
self.message_queue.append(formatted)
if level == "ERROR":
self.stats['errors'] += 1
def log_error(self, message):
"""Log error message"""
self.log(message, "ERROR")
def log_video_played(self, video_name):
"""Log video playback - thread-safe version"""
self.log(f"Video played: {video_name}")
self.stats['videos_played'] += 1
self.stats['current_video'] = video_name
# Update status label using thread-safe after() method
if self.status_label:
try:
self.root.after(0, lambda: self._update_status_safe("Status: Playing video", "green"))
except:
pass # Ignore if GUI isn't ready
def _update_status_safe(self, text, color):
"""Safely update status label from any thread"""
try:
if self.status_label:
self.status_label.config(text=text, foreground=color)
except:
pass
def log_key_press(self, key):
"""Log key press"""
self.log(f"Key pressed: {key}")
self.stats['key_presses'] += 1
def process_nfc_input(self, nfc_id=None):
"""Process NFC input"""
if nfc_id is None:
nfc_id = self.nfc_buffer.strip()
if not nfc_id:
return
self.log(f"Processing NFC ID: {nfc_id}")
self.log_key_press(nfc_id)
if self.status_label:
self.status_label.config(text=f"Status: Processing NFC {nfc_id}", foreground="orange")
if self.video_player:
try:
self.video_player.play_specific_video(nfc_id)
self.log(f"Sent NFC {nfc_id} to video player")
self.root.after(2000, lambda: self.status_label.config(text="Status: Ready", foreground="green"))
except Exception as e:
self.log_error(f"Failed to send NFC: {e}")
self.status_label.config(text="Status: Error", foreground="red")
else:
self.log_error("Video player not connected")
self.status_label.config(text="Status: No video player", foreground="red")
def set_video_player(self, video_player):
"""Set reference to video player"""
self.video_player = video_player
self.log("Video player connected to debug console")
if self.status_label:
self.status_label.config(text="Status: Connected", foreground="green")
def skip_video(self):
"""Skip current video"""
if self.video_player:
try:
self.log("Skipping video...")
self.status_label.config(text="Status: Skipping...", foreground="orange")
self.video_player.skip_current_video()
self.video_player.force_next_video()
self.log("Video skip requested")
self.root.after(1000, lambda: self.status_label.config(text="Status: Ready", foreground="green"))
except Exception as e:
self.log_error(f"Skip video failed: {e}")
self.status_label.config(text="Status: Error", foreground="red")
else:
self.log_error("Video player not connected")
self.status_label.config(text="Status: No video player", foreground="red")
def toggle_mute(self):
"""Toggle mute/unmute"""
if self.video_player:
try:
if self.is_muted:
self.video_player.set_volume(0.7)
self.mute_button.config(text="Mute")
self.is_muted = False
self.log("Audio unmuted")
self.status_label.config(text="Status: Audio unmuted", foreground="green")
else:
self.video_player.set_volume(0.0)
self.mute_button.config(text="Unmute")
self.is_muted = True
self.log("Audio muted")
self.status_label.config(text="Status: Audio muted", foreground="orange")
self.root.after(2000, lambda: self.status_label.config(text="Status: Ready", foreground="green"))
except Exception as e:
self.log_error(f"Mute toggle failed: {e}")
self.status_label.config(text="Status: Error", foreground="red")
else:
self.log_error("Video player not connected")
def toggle_fullscreen(self):
"""Toggle fullscreen mode"""
if self.video_player:
try:
self.status_label.config(text="Status: Toggling fullscreen...", foreground="orange")
self.video_player.toggle_fullscreen()
self.log("Fullscreen toggled")
self.root.after(1000, lambda: self.status_label.config(text="Status: Ready", foreground="green"))
except Exception as e:
self.log_error(f"Fullscreen toggle failed: {e}")
self.status_label.config(text="Status: Error", foreground="red")
else:
self.log_error("Video player not connected")
def update_queue_depth(self, depth):
"""Update video queue depth"""
self.stats['queue_depth'] = depth
def update_display(self):
"""Update the console display"""
messages = []
with self.lock:
while self.message_queue:
messages.append(self.message_queue.popleft())
for msg in messages:
self.text_widget.insert(tk.END, msg + "\n")
# Color coding
if "ERROR" in msg:
self.text_widget.tag_add("error", f"end-2l linestart", f"end-1l lineend")
self.text_widget.tag_config("error", foreground="red", background="#ffe6e6")
elif "WARNING" in msg:
self.text_widget.tag_add("warning", f"end-2l linestart", f"end-1l lineend")
self.text_widget.tag_config("warning", foreground="orange", background="#fff3cd")
elif "Global NFC input" in msg or "NFC" in msg:
self.text_widget.tag_add("nfc", f"end-2l linestart", f"end-1l lineend")
self.text_widget.tag_config("nfc", foreground="blue", background="#e6f3ff")
elif "Video played:" in msg or "SPECIFIC VIDEO" in msg or "Folder" in msg:
self.text_widget.tag_add("video", f"end-2l linestart", f"end-1l lineend")
self.text_widget.tag_config("video", foreground="green", background="#e6ffe6")
elif "Reset" in msg or "reset" in msg:
self.text_widget.tag_add("reset", f"end-2l linestart", f"end-1l lineend")
self.text_widget.tag_config("reset", foreground="purple", background="#f0e6ff")
self.text_widget.see(tk.END)
# Limit text size
lines = self.text_widget.get("1.0", tk.END).count('\n')
if lines > 1000:
self.text_widget.delete("1.0", "200.0")
# Update statistics
uptime = str(datetime.now() - self.stats['start_time']).split('.')[0]
self.stats_labels['uptime'].config(text=uptime)
for key in self.stats_labels:
if key in self.stats and key != 'uptime':
value = str(self.stats[key])
if key == 'current_video' and len(value) > 30:
value = value[:27] + "..."
self.stats_labels[key].config(text=value)
# Update folder videos stat from video player
if self.video_player and hasattr(self.video_player, 'stats'):
folder_count = self.video_player.stats.get('folder_videos_played', 0)
self.stats_labels['folder_videos_played'].config(text=str(folder_count))
# Update trailers pool stat
if self.video_player:
try:
total_trailers = len(self.video_player.trailer_videos) if hasattr(self.video_player, 'trailer_videos') else 0
recent_count = len(self.video_player.recently_played_trailers[-25:]) if hasattr(self.video_player, 'recently_played_trailers') else 0
available = total_trailers - recent_count
if available < 0:
available = 0
self.stats_labels['trailers_pool'].config(text=f"{available}/{total_trailers}")
except:
pass
# Update global input status
self.update_global_input_status()
def run(self):
"""Run the debug console"""
self.running = True
self.log("Enhanced debug console started with folder support")
if self.status_label:
self.status_label.config(text="Status: Starting...", foreground="orange")
def update_loop():
if self.running:
try:
self.update_display()
if (self.status_label and
"Starting" in self.status_label.cget("text") and
self.video_player):
self.status_label.config(text="Status: Ready", foreground="green")
self.root.after(100, update_loop)
except Exception as e:
self.log_error(f"Display update error: {e}")
if self.running:
self.root.after(1000, update_loop)
update_loop()
try:
self.root.mainloop()
except Exception as e:
self.logger.error(f"Debug console error: {e}")
finally:
self.running = False
def stop(self):
"""Stop the debug console"""
self.running = False
if self.global_listener:
try:
if self.global_input_method == "pynput":
self.global_listener.stop()
elif self.global_input_method == "windows":
import win32gui
win32gui.UnhookWindowsHookEx(self.global_listener)
except Exception as e:
self.logger.debug(f"Error stopping global input: {e}")
if self.root:
try:
self.root.quit()
self.root.destroy()
except:
pass

1037
enhanced_debug_console.py Normal file

File diff suppressed because it is too large Load Diff

763
mpv_seamless_player.py Normal file
View File

@@ -0,0 +1,763 @@
# mpv_seamless_player.py - Enhanced with folder sequence support
"""
MPV-based seamless video player with folder sequence playback
"""
import os
import sys
import random
import time
import threading
import logging
import queue
from pathlib import Path
import tkinter as tk
try:
import mpv
MPV_AVAILABLE = True
except ImportError:
MPV_AVAILABLE = False
print("ERROR: python-mpv not installed!")
class MPVSeamlessPlayer:
def __init__(self, config, debug_console):
self.config = config
self.debug_console = debug_console
self.logger = logging.getLogger(__name__)
self.running = False
if not MPV_AVAILABLE:
raise Exception("MPV not available")
# Video state
self.current_mode = "trailers"
self.current_video_path = None
self.specific_video_playing = False
self.specific_video_start_time = None
# MPV setup
self.player_a = None
self.player_b = None
self.active_player = 'a'
# Display settings
self.screen_width = 1920
self.screen_height = 1080
self.is_fullscreen = True
# Content - Enhanced with folder support
self.trailer_videos = []
self.key_mapping = {} # Single file mappings
self.folder_mapping = {} # Folder sequence mappings
self.folder_state = {} # Track current position in each folder
self.video_queue = queue.Queue()
self.lock = threading.Lock()
# Enhanced trailer randomness - avoid recent repeats
self.recently_played_trailers = [] # Track last N trailers
self.max_recent_trailers = 36 # Don't repeat within last 25 plays
self.trailer_history_size = 70 # Keep history of 50 for better randomness
# Transition state
self.transition_lock = threading.Lock()
self.last_transition_time = 0
# Control flags
self.force_next = False
self.pending_specific_video = False
# Minimum play time protection
self.video_start_time = None
self.minimum_play_time = 3.0
self.minimum_specific_play_time = 5.0
# Performance tracking
self.stats = {
'transitions': 0,
'failed_loads': 0,
'queue_processed': 0,
'folder_videos_played': 0
}
# Initialize
self.logger.info("=== MPV PLAYER INITIALIZATION ===")
self.detect_display_settings()
if not self.setup_mpv():
raise Exception("MPV setup failed")
self.load_content()
self.logger.info("=== MPV PLAYER READY ===")
def detect_display_settings(self):
"""Detect optimal display configuration"""
try:
temp_root = tk.Tk()
temp_root.withdraw()
self.screen_width = temp_root.winfo_screenwidth()
self.screen_height = temp_root.winfo_screenheight()
self.logger.info(f"Screen: {self.screen_width}x{self.screen_height}")
temp_root.destroy()
except Exception as e:
self.logger.error(f"Display detection failed: {e}")
self.screen_width = 1920
self.screen_height = 1080
def setup_mpv(self):
"""Initialize MPV players"""
try:
self.logger.info("Setting up MPV players...")
mpv_config = {
'vo': 'gpu',
'hwdec': 'auto',
'fullscreen': True,
'scale': 'lanczos',
'keepaspect': True,
'keepaspect-window': True,
'volume': 70,
'volume-max': 100,
'keep-open': 'no',
'loop-file': 'no',
'pause': False,
'osd-level': 0,
'quiet': True,
}
self.player_a = mpv.MPV(**mpv_config)
self.player_b = mpv.MPV(**mpv_config)
self.player_a.register_event_callback(lambda event: self._mpv_event_handler('a', event))
self.player_b.register_event_callback(lambda event: self._mpv_event_handler('b', event))
self.logger.info("MPV setup complete")
return True
except Exception as e:
self.logger.error(f"MPV setup failed: {e}", exc_info=True)
return False
def _mpv_event_handler(self, player_id, event):
"""Handle MPV events for seamless playback - FIXED"""
try:
# MPV events don't have .get() - use direct attribute access
event_id = None
# Try to get event_id
if hasattr(event, 'event_id'):
event_id = event.event_id
elif isinstance(event, dict) and 'event_id' in event:
event_id = event['event_id']
# Event ID 7 = END_FILE
if event_id == 7:
if self.active_player == player_id and self.running:
self.logger.info(f"Video ended on player {player_id}")
self._handle_video_end()
except Exception as e:
# Silently ignore common event attribute errors
error_msg = str(e)
if "get" not in error_msg and "event_id" not in error_msg:
self.logger.debug(f"Event error for player {player_id}: {e}")
def _handle_video_end(self):
"""Handle natural video end"""
try:
if self.specific_video_playing:
self.logger.info("Specific video completed - returning to trailer mode")
self.debug_console.log("Video completed - switching back to trailer mode")
self.specific_video_playing = False
self.specific_video_start_time = None
self.current_mode = "trailers"
def queue_next_trailer():
time.sleep(self.config.TRANSITION_DELAY)
next_trailer = self.get_random_trailer()
if next_trailer and self.running:
self.video_queue.put(("trailers", next_trailer))
threading.Thread(target=queue_next_trailer, daemon=True).start()
except Exception as e:
self.logger.error(f"Error handling video end: {e}")
def load_content(self):
"""Load video content and mappings - Enhanced with folder support"""
try:
self.logger.info("Loading video content...")
# Load trailers
trailers_path = Path(self.config.TRAILERS_DIR)
if trailers_path.exists():
for f in trailers_path.iterdir():
if f.is_file() and f.suffix.lower() in self.config.SUPPORTED_FORMATS:
self.trailer_videos.append(str(f))
self.logger.info(f"Loaded {len(self.trailer_videos)} trailer videos")
# Load single file mappings
self.key_mapping = self.config.load_key_mapping()
self.logger.info(f"Loaded {len(self.key_mapping)} single-file NFC mappings")
# Load folder mappings
self.folder_mapping = self.config.load_folder_mapping()
self.logger.info(f"Loaded {len(self.folder_mapping)} folder NFC mappings")
# Load folder state (playback positions)
self.folder_state = self.config.load_folder_state()
self.logger.info(f"Loaded playback state for {len(self.folder_state)} folders")
# Initialize state for new folders
for nfc_id, folder_data in self.folder_mapping.items():
folder_name = folder_data['folder_name']
if folder_name not in self.folder_state:
self.folder_state[folder_name] = 0
self.logger.info(f"Initialized folder '{folder_name}' at position 0")
# Save initial state
self.config.save_folder_state(self.folder_state)
except Exception as e:
self.logger.error(f"Content loading error: {e}", exc_info=True)
def reset_all_folder_positions(self):
"""Reset all folder sequences back to the first video"""
try:
self.logger.info("Resetting all folder positions to start")
# Reset all positions to 0
for folder_name in self.folder_state:
self.folder_state[folder_name] = 0
# Save the reset state
self.config.save_folder_state(self.folder_state)
self.logger.info(f"Reset complete - {len(self.folder_state)} folders reset")
self.debug_console.log(f"All folder sequences reset to beginning ({len(self.folder_state)} folders)")
return True
except Exception as e:
self.logger.error(f"Failed to reset folder positions: {e}")
self.debug_console.log_error("Failed to reset folder positions")
return False
def get_next_video_from_folder(self, nfc_id):
"""Get the next video in sequence from a folder"""
try:
if nfc_id not in self.folder_mapping:
return None
folder_data = self.folder_mapping[nfc_id]
folder_name = folder_data['folder_name']
videos = folder_data['videos']
if not videos:
self.logger.warning(f"No videos in folder '{folder_name}'")
return None
# Get current position
current_pos = self.folder_state.get(folder_name, 0)
# Ensure position is valid
if current_pos >= len(videos):
current_pos = 0
# Get the video at current position
video_path = videos[current_pos]
# Advance position for next time
next_pos = (current_pos + 1) % len(videos)
self.folder_state[folder_name] = next_pos
# Save state
self.config.save_folder_state(self.folder_state)
self.logger.info(f"Folder '{folder_name}': Playing video {current_pos + 1}/{len(videos)}")
self.logger.info(f"Next scan will play video {next_pos + 1}/{len(videos)}")
self.debug_console.log(f"Folder '{folder_name}': Video {current_pos + 1}/{len(videos)}")
return video_path
except Exception as e:
self.logger.error(f"Error getting next folder video: {e}")
return None
def get_active_player(self):
"""Get currently active MPV player"""
return self.player_a if self.active_player == 'a' else self.player_b
def get_inactive_player(self):
"""Get currently inactive MPV player"""
return self.player_b if self.active_player == 'a' else self.player_a
def can_skip_video(self):
"""Check if video has played long enough to be skipped"""
if not self.video_start_time:
return True
current_time = time.time()
play_duration = current_time - self.video_start_time
min_time = self.minimum_specific_play_time if self.specific_video_playing else self.minimum_play_time
can_skip = play_duration >= min_time
if not can_skip:
remaining = min_time - play_duration
self.logger.debug(f"Skip blocked: needs {remaining:.1f}s more")
return can_skip
def seamless_transition(self, video_path, is_specific=False):
"""Perform seamless video transition"""
transition_start = time.time()
with self.transition_lock:
try:
video_name = Path(video_path).name
self.logger.info(f"Starting transition to: {video_name}")
# Rate limiting
time_since_last = time.time() - self.last_transition_time
if time_since_last < 0.3:
time.sleep(0.3 - time_since_last)
inactive_player = self.get_inactive_player()
active_player = self.get_active_player()
inactive_player.play(str(video_path))
# Wait for video to start
start_wait = time.time()
while time.time() - start_wait < 2.0:
try:
if not inactive_player.idle_active:
break
except:
pass
time.sleep(0.05)
time.sleep(0.15)
try:
if active_player:
active_player.stop()
except:
pass
self.active_player = 'b' if self.active_player == 'a' else 'a'
self.current_video_path = video_path
self.specific_video_playing = is_specific
self.last_transition_time = time.time()
self.stats['transitions'] += 1
self.video_start_time = time.time()
if is_specific:
self.specific_video_start_time = time.time()
self.stats['folder_videos_played'] += 1
self.logger.info(f"SPECIFIC VIDEO NOW PLAYING: {video_name}")
else:
self.specific_video_start_time = None
transition_time = time.time() - transition_start
self.logger.info(f"Transition complete in {transition_time:.3f}s")
self.debug_console.log_video_played(video_name)
return True
except Exception as e:
self.stats['failed_loads'] += 1
self.logger.error(f"Transition failed: {e}", exc_info=True)
return False
def skip_current_video(self):
"""Skip current video"""
try:
if not self.can_skip_video():
self.logger.info("Skip request ignored - minimum play time not reached")
return
self.logger.info("Skipping current video")
active_player = self.get_active_player()
try:
active_player.stop()
except:
pass
self.specific_video_playing = False
self.specific_video_start_time = None
self.pending_specific_video = False
self.force_next = True
self.video_start_time = None
self.debug_console.log("Video skipped by user")
except Exception as e:
self.logger.error(f"Skip failed: {e}", exc_info=True)
def force_next_video(self):
"""Force next video"""
try:
if not self.can_skip_video():
return
self.logger.info("Forcing next video")
if not self.pending_specific_video:
self.force_next = True
self.specific_video_playing = False
self.specific_video_start_time = None
self.video_start_time = None
if self.current_mode == "specific":
self.current_mode = "trailers"
self.logger.info("Force returning to trailer mode")
except Exception as e:
self.logger.error(f"Force next failed: {e}", exc_info=True)
def set_volume(self, volume):
"""Set volume for both MPV players"""
try:
vol = int(volume * 100)
vol = max(0, min(100, vol))
if self.player_a:
self.player_a.volume = vol
if self.player_b:
self.player_b.volume = vol
self.logger.debug(f"Volume set to {vol}%")
except Exception as e:
self.logger.error(f"Volume setting failed: {e}")
def toggle_fullscreen(self):
"""Toggle fullscreen mode"""
try:
active_player = self.get_active_player()
if active_player:
try:
current_state = active_player.fullscreen
new_state = not current_state
active_player.fullscreen = new_state
self.is_fullscreen = new_state
self.logger.info(f"Fullscreen toggled: {new_state}")
self.debug_console.log(f"Fullscreen: {new_state}")
except Exception as e:
self.logger.warning(f"Fullscreen toggle failed: {e}")
except Exception as e:
self.logger.error(f"Fullscreen toggle failed: {e}")
def is_video_playing(self):
"""Check if video is currently playing"""
try:
active_player = self.get_active_player()
if not active_player:
return False
return not active_player.idle_active
except:
return False
def get_video_duration(self):
"""Get current video duration in seconds"""
try:
active_player = self.get_active_player()
if not active_player:
return 0
duration = active_player.duration
return duration if duration and duration > 0 else 0
except:
return 0
def get_video_position(self):
"""Get current video position in seconds"""
try:
active_player = self.get_active_player()
if not active_player:
return 0
position = active_player.playback_time
return position if position and position > 0 else 0
except:
return 0
def get_random_trailer(self):
"""Get random trailer video with anti-repeat logic"""
if not self.trailer_videos:
return None
# If we have very few trailers, just pick randomly
if len(self.trailer_videos) <= 3:
trailer = random.choice(self.trailer_videos)
self.logger.debug(f"Selected trailer (few available): {Path(trailer).name}")
return trailer
# Get trailers that haven't been played recently
available_trailers = [
t for t in self.trailer_videos
if t not in self.recently_played_trailers[-self.max_recent_trailers:]
]
# If all trailers were played recently, use the oldest ones
if not available_trailers:
self.logger.debug("All trailers recently played - clearing recent history")
# Keep only the most recent 5 to avoid immediate repeats
self.recently_played_trailers = self.recently_played_trailers[-5:]
available_trailers = [
t for t in self.trailer_videos
if t not in self.recently_played_trailers
]
# If still none available, just use all
if not available_trailers:
available_trailers = self.trailer_videos
# Select random trailer from available pool
trailer = random.choice(available_trailers)
# Add to history
self.recently_played_trailers.append(trailer)
# Trim history to max size
if len(self.recently_played_trailers) > self.trailer_history_size:
self.recently_played_trailers = self.recently_played_trailers[-self.trailer_history_size:]
recent_count = len([t for t in self.recently_played_trailers[-self.max_recent_trailers:] if t == trailer])
self.logger.info(f"Selected trailer: {Path(trailer).name}")
self.logger.debug(f"Available pool: {len(available_trailers)}/{len(self.trailer_videos)} trailers")
self.logger.debug(f"Recent history size: {len(self.recently_played_trailers)}")
return trailer
def process_video_queue(self):
"""Process video queue"""
processed = 0
try:
while not self.video_queue.empty():
try:
mode, video_path = self.video_queue.get_nowait()
processed += 1
video_name = Path(video_path).name
self.logger.info(f"Processing queued video: {video_name}")
is_specific = (mode == "specific")
if is_specific:
self.pending_specific_video = True
if self.seamless_transition(video_path, is_specific=is_specific):
self.current_mode = mode
self.stats['queue_processed'] += 1
if is_specific:
time.sleep(0.3)
duration = self.get_video_duration()
self.logger.info(f"Video duration: {duration:.1f}s")
self.pending_specific_video = False
# Clear remaining queue
while not self.video_queue.empty():
try:
self.video_queue.get_nowait()
except queue.Empty:
break
break
else:
if is_specific:
self.pending_specific_video = False
except queue.Empty:
break
except Exception as e:
self.logger.error(f"Queue processing error: {e}", exc_info=True)
self.pending_specific_video = False
if processed > 0:
self.debug_console.update_queue_depth(self.video_queue.qsize())
def play_specific_video(self, nfc_id):
"""Handle NFC input - Enhanced with folder support"""
with self.lock:
try:
self.logger.info(f"Processing NFC input: {nfc_id}")
# Check if this is a folder mapping first
if nfc_id in self.folder_mapping:
self.logger.info(f"NFC {nfc_id} mapped to folder sequence")
video_path = self.get_next_video_from_folder(nfc_id)
if not video_path:
self.logger.error(f"Could not get video from folder for NFC {nfc_id}")
self.debug_console.log_error(f"No video available for NFC {nfc_id}")
return
if not Path(video_path).exists():
self.logger.error(f"Video file not found: {video_path}")
self.debug_console.log_error(f"File missing: {video_path}")
return
# Stop current video
try:
active_player = self.get_active_player()
if active_player:
active_player.stop()
except:
pass
# Clear queue
while not self.video_queue.empty():
try:
self.video_queue.get_nowait()
except queue.Empty:
break
# Queue the folder video
self.video_queue.put(("specific", video_path))
self.logger.info(f"Queued folder video: {Path(video_path).name}")
self.pending_specific_video = True
self.debug_console.update_queue_depth(self.video_queue.qsize())
return
# Check single file mapping
if nfc_id in self.key_mapping:
self.logger.info(f"NFC {nfc_id} mapped to single file")
video_path = self.key_mapping[nfc_id]
if video_path.startswith("Missing:"):
self.logger.error(f"Video marked as missing for NFC {nfc_id}")
self.debug_console.log_error(f"Missing video for NFC {nfc_id}")
return
if not Path(video_path).exists():
self.logger.error(f"Video file not found: {video_path}")
self.debug_console.log_error(f"File missing: {video_path}")
return
# Stop current video
try:
active_player = self.get_active_player()
if active_player:
active_player.stop()
except:
pass
# Clear queue
while not self.video_queue.empty():
try:
self.video_queue.get_nowait()
except queue.Empty:
break
# Queue the video
self.video_queue.put(("specific", video_path))
self.logger.info(f"Queued single file video: {Path(video_path).name}")
self.pending_specific_video = True
self.debug_console.update_queue_depth(self.video_queue.qsize())
return
# No mapping found
self.logger.warning(f"No mapping found for NFC ID: {nfc_id}")
self.debug_console.log(f"Unknown NFC ID: {nfc_id}")
except Exception as e:
self.logger.error(f"NFC processing error: {e}", exc_info=True)
def run(self):
"""Main video player loop"""
self.running = True
self.start_time = time.time()
self.logger.info("=== MPV VIDEO PLAYER MAIN LOOP START ===")
# Start with random trailer
initial_trailer = self.get_random_trailer()
if initial_trailer:
self.seamless_transition(initial_trailer)
else:
self.logger.error("No initial trailer available!")
while self.running:
try:
# Process video queue
if not self.video_queue.empty():
self.process_video_queue()
# Force next video
elif self.force_next and not self.pending_specific_video:
self.force_next = False
if self.video_queue.empty():
if self.current_mode == "specific" or self.specific_video_playing:
self.current_mode = "trailers"
self.specific_video_playing = False
self.specific_video_start_time = None
next_trailer = self.get_random_trailer()
if next_trailer:
self.seamless_transition(next_trailer)
# Handle natural video end
elif not self.is_video_playing() and self.current_video_path:
self.logger.info("Video ended naturally")
if self.specific_video_playing or self.current_mode == "specific":
self.current_mode = "trailers"
self.specific_video_playing = False
self.specific_video_start_time = None
next_trailer = self.get_random_trailer()
if next_trailer:
time.sleep(self.config.TRANSITION_DELAY)
self.seamless_transition(next_trailer)
time.sleep(0.1)
except Exception as e:
self.logger.error(f"Main loop error: {e}", exc_info=True)
time.sleep(1)
self.logger.info("=== MPV VIDEO PLAYER MAIN LOOP END ===")
def stop(self):
"""Clean up MPV resources"""
self.logger.info("=== MPV VIDEO PLAYER STOP ===")
self.running = False
try:
if self.player_a:
try:
self.player_a.stop()
self.player_a.terminate()
except:
pass
if self.player_b:
try:
self.player_b.stop()
self.player_b.terminate()
except:
pass
except Exception as e:
self.logger.error(f"Cleanup error: {e}")
self.logger.info("=== MPV CLEANUP COMPLETE ===")

398
validate_videos.py Normal file
View File

@@ -0,0 +1,398 @@
# validate_videos.py
"""
Video Collection Validator
Checks NFC mappings against actual video files and provides detailed reports
"""
import sys
from pathlib import Path
from datetime import datetime
import logging
# Setup basic logging
logging.basicConfig(
level=logging.INFO,
format='%(levelname)s: %(message)s'
)
class VideoValidator:
def __init__(self):
self.base_dir = Path(__file__).parent
self.videos_dir = self.base_dir / "videos"
self.trailers_dir = self.videos_dir / "trailers"
self.specific_dir = self.videos_dir / "specific"
self.mapping_file = self.videos_dir / "key_mapping.txt"
self.supported_formats = ['.mp4', '.avi', '.mov', '.mkv', '.wmv', '.flv', '.webm', '.m4v']
self.results = {
'valid_mappings': [],
'missing_files': [],
'unmapped_videos': [],
'trailer_count': 0,
'specific_count': 0
}
def print_header(self):
"""Print validation header"""
print("\n" + "="*70)
print("VIDEO COLLECTION VALIDATOR")
print("="*70)
print(f"Validation Time: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
print("="*70 + "\n")
def check_directories(self):
"""Check if required directories exist"""
print("Checking directories...")
print("-" * 70)
all_exist = True
dirs = [
("Videos", self.videos_dir),
("Trailers", self.trailers_dir),
("Specific Videos", self.specific_dir)
]
for name, path in dirs:
if path.exists():
print(f"{name} directory found: {path}")
else:
print(f"{name} directory MISSING: {path}")
all_exist = False
print()
return all_exist
def scan_video_files(self, directory, category):
"""Scan a directory for video files"""
if not directory.exists():
return []
videos = []
for file_path in directory.iterdir():
if file_path.is_file() and file_path.suffix.lower() in self.supported_formats:
size_mb = file_path.stat().st_size / (1024 * 1024)
videos.append({
'path': file_path,
'name': file_path.name,
'stem': file_path.stem,
'size_mb': size_mb
})
return videos
def load_nfc_mappings(self):
"""Load and parse NFC mappings"""
if not self.mapping_file.exists():
print(f"✗ Key mapping file not found: {self.mapping_file}")
return {}
mappings = {}
line_num = 0
try:
with open(self.mapping_file, 'r', encoding='utf-8') as f:
for line in f:
line_num += 1
line = line.strip()
# Skip comments and empty lines
if not line or line.startswith('#'):
continue
if ',' not in line:
print(f"⚠ Warning: Invalid format at line {line_num}: {line}")
continue
parts = line.split(',', 1)
if len(parts) != 2:
continue
nfc_id = parts[0].strip()
movie_name = parts[1].strip()
if nfc_id and movie_name:
mappings[nfc_id] = movie_name
except Exception as e:
print(f"✗ Error reading mapping file: {e}")
return {}
return mappings
def find_video_match(self, movie_name, video_files):
"""Find matching video file for a movie name"""
movie_lower = movie_name.lower()
# Strategy 1: Exact match (stem)
for video in video_files:
if video['stem'].lower() == movie_lower:
return video, 'exact'
# Strategy 2: Movie name in filename
for video in video_files:
if movie_lower in video['stem'].lower():
return video, 'substring'
# Strategy 3: Filename in movie name (reverse check)
for video in video_files:
if video['stem'].lower() in movie_lower:
return video, 'reverse'
return None, None
def validate_mappings(self):
"""Validate NFC mappings against video files"""
print("Validating NFC mappings...")
print("-" * 70)
# Load mappings
mappings = self.load_nfc_mappings()
if not mappings:
print("✗ No valid mappings found in key_mapping.txt")
print(" Create mappings in format: NFC_ID,Movie_Name")
print()
return
print(f"Found {len(mappings)} NFC mappings to validate")
print()
# Scan specific video files
specific_videos = self.scan_video_files(self.specific_dir, "specific")
self.results['specific_count'] = len(specific_videos)
print(f"Found {len(specific_videos)} video files in specific folder")
print()
# Check each mapping
print("Checking mappings:")
print()
for nfc_id, movie_name in mappings.items():
match, match_type = self.find_video_match(movie_name, specific_videos)
if match:
self.results['valid_mappings'].append({
'nfc_id': nfc_id,
'movie_name': movie_name,
'file': match['name'],
'size_mb': match['size_mb'],
'match_type': match_type
})
print(f"{nfc_id}'{movie_name}'")
print(f" Matched: {match['name']} ({match['size_mb']:.1f} MB) [{match_type}]")
else:
self.results['missing_files'].append({
'nfc_id': nfc_id,
'movie_name': movie_name
})
print(f"{nfc_id}'{movie_name}'")
print(f" ERROR: No matching video file found!")
print()
def check_unmapped_videos(self):
"""Find videos that don't have NFC mappings"""
print("Checking for unmapped videos...")
print("-" * 70)
mappings = self.load_nfc_mappings()
specific_videos = self.scan_video_files(self.specific_dir, "specific")
# Get list of mapped movie names (lowercase for comparison)
mapped_names = set(name.lower() for name in mappings.values())
unmapped = []
for video in specific_videos:
# Check if this video matches any mapping
video_matched = False
for movie_name in mappings.values():
if (movie_name.lower() in video['stem'].lower() or
video['stem'].lower() in movie_name.lower()):
video_matched = True
break
if not video_matched:
unmapped.append(video)
self.results['unmapped_videos'].append(video)
if unmapped:
print(f"Found {len(unmapped)} unmapped videos:")
print()
for video in unmapped:
print(f"{video['name']} ({video['size_mb']:.1f} MB)")
print()
print("Consider adding NFC mappings for these videos!")
else:
print("✓ All specific videos have NFC mappings")
print()
def check_trailers(self):
"""Check trailer video collection"""
print("Checking trailer collection...")
print("-" * 70)
trailers = self.scan_video_files(self.trailers_dir, "trailers")
self.results['trailer_count'] = len(trailers)
if len(trailers) == 0:
print("✗ No trailer videos found!")
print(" Add trailer videos to: " + str(self.trailers_dir))
elif len(trailers) < 5:
print(f"⚠ Only {len(trailers)} trailers found")
print(" Consider adding more trailers for variety")
print()
for trailer in trailers:
print(f"{trailer['name']} ({trailer['size_mb']:.1f} MB)")
else:
print(f"✓ Found {len(trailers)} trailer videos")
total_size = sum(t['size_mb'] for t in trailers)
print(f" Total size: {total_size:.1f} MB")
print()
print("Sample trailers:")
for trailer in trailers[:5]:
print(f"{trailer['name']} ({trailer['size_mb']:.1f} MB)")
if len(trailers) > 5:
print(f" ... and {len(trailers) - 5} more")
print()
def print_summary(self):
"""Print validation summary"""
print("\n" + "="*70)
print("VALIDATION SUMMARY")
print("="*70)
valid_count = len(self.results['valid_mappings'])
missing_count = len(self.results['missing_files'])
unmapped_count = len(self.results['unmapped_videos'])
trailer_count = self.results['trailer_count']
specific_count = self.results['specific_count']
print(f"\nTrailer Videos: {trailer_count}")
print(f"Specific Videos: {specific_count}")
print(f"Valid NFC Mappings: {valid_count}")
print(f"Missing Files: {missing_count}")
print(f"Unmapped Videos: {unmapped_count}")
# Overall status
print("\n" + "-"*70)
if missing_count == 0 and trailer_count > 0:
print("✓ VALIDATION PASSED - System ready!")
elif missing_count == 0 and trailer_count == 0:
print("⚠ WARNING - No trailer videos found")
else:
print("✗ VALIDATION FAILED - Fix missing files")
print("-"*70)
# Recommendations
print("\nRECOMMENDATIONS:")
if missing_count > 0:
print(f" • Fix {missing_count} missing video file(s)")
if trailer_count == 0:
print(" • Add trailer videos to trailers folder")
elif trailer_count < 5:
print(f" • Add more trailers (currently only {trailer_count})")
if unmapped_count > 0:
print(f" • Add NFC mappings for {unmapped_count} unmapped video(s)")
if missing_count == 0 and trailer_count >= 5 and unmapped_count == 0:
print(" • Everything looks good! Your system is ready to use.")
print()
def save_report(self):
"""Save detailed report to file"""
report_file = self.base_dir / f"validation_report_{datetime.now().strftime('%Y%m%d_%H%M%S')}.txt"
try:
with open(report_file, 'w', encoding='utf-8') as f:
f.write("VIDEO COLLECTION VALIDATION REPORT\n")
f.write("="*70 + "\n")
f.write(f"Generated: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")
f.write("="*70 + "\n\n")
f.write("VALID MAPPINGS:\n")
f.write("-"*70 + "\n")
for item in self.results['valid_mappings']:
f.write(f"NFC: {item['nfc_id']}\n")
f.write(f" Mapping: '{item['movie_name']}'\n")
f.write(f" File: {item['file']}\n")
f.write(f" Size: {item['size_mb']:.1f} MB\n")
f.write(f" Match: {item['match_type']}\n\n")
if self.results['missing_files']:
f.write("\nMISSING FILES:\n")
f.write("-"*70 + "\n")
for item in self.results['missing_files']:
f.write(f"NFC: {item['nfc_id']}\n")
f.write(f" Looking for: '{item['movie_name']}'\n")
f.write(f" Status: FILE NOT FOUND\n\n")
if self.results['unmapped_videos']:
f.write("\nUNMAPPED VIDEOS:\n")
f.write("-"*70 + "\n")
for video in self.results['unmapped_videos']:
f.write(f"File: {video['name']}\n")
f.write(f" Size: {video['size_mb']:.1f} MB\n")
f.write(f" Status: No NFC mapping\n\n")
print(f"Detailed report saved to: {report_file}")
except Exception as e:
print(f"Warning: Could not save report file: {e}")
def run(self):
"""Run complete validation"""
self.print_header()
# Check directories
if not self.check_directories():
print("\n✗ Required directories are missing!")
print(" Please run setup or create the required directories.")
return 1
# Check trailers
self.check_trailers()
# Validate mappings
self.validate_mappings()
# Check unmapped videos
self.check_unmapped_videos()
# Print summary
self.print_summary()
# Save report
self.save_report()
# Return exit code
if len(self.results['missing_files']) > 0:
return 1
return 0
def main():
"""Main entry point"""
try:
validator = VideoValidator()
exit_code = validator.run()
print("\n" + "="*70)
print("Press any key to exit...")
input()
return exit_code
except KeyboardInterrupt:
print("\n\nValidation cancelled by user")
return 1
except Exception as e:
print(f"\n✗ Validation error: {e}")
logging.exception("Validation failed")
return 1
if __name__ == "__main__":
sys.exit(main())

911
web_interface.py Normal file
View File

@@ -0,0 +1,911 @@
# web_interface_clean.py
"""
Clean Web Interface - No Logs, Fixed Status Updates
Focuses on controls and statistics with better real-time updates
"""
import os
import json
import logging
import threading
import time
from datetime import datetime, timedelta
from pathlib import Path
from flask import Flask, render_template, request, jsonify
from flask_socketio import SocketIO, emit
import queue
class DateTimeJSONEncoder(json.JSONEncoder):
def default(self, obj):
if isinstance(obj, datetime):
return obj.isoformat()
elif isinstance(obj, timedelta):
return str(obj)
return super().default(obj)
def make_json_safe(data):
if isinstance(data, dict):
return {key: make_json_safe(value) for key, value in data.items()}
elif isinstance(data, list):
return [make_json_safe(item) for item in data]
elif isinstance(data, datetime):
return data.isoformat()
elif isinstance(data, timedelta):
return str(data)
else:
return data
class WebInterface:
def __init__(self, debug_console=None, host='0.0.0.0', port=8547):
self.debug_console = debug_console
self.video_player = None
self.host = host
self.port = port
self.logger = logging.getLogger(__name__)
# Web command tracking
self.command_history = []
self.max_history = 100
self.command_lock = threading.Lock()
# Statistics cache with shorter refresh
self.stats_cache = {}
self.last_stats_update = 0
self.stats_cache_duration = 0.5 # Update every 500ms for better responsiveness
# Message queues
self.message_queue = queue.Queue()
self.log_queue = queue.Queue()
# Flask app setup
self.app = Flask(__name__)
self.app.config['SECRET_KEY'] = 'video_player_secret_key_2024'
# SocketIO setup
self.socketio = SocketIO(
self.app,
cors_allowed_origins="*",
json=DateTimeJSONEncoder,
async_mode='threading'
)
# Background task control
self.running = True
self.stats_thread = None
self.log_thread = None
# Setup routes and handlers
self.setup_routes()
self.setup_socketio_handlers()
self.logger.info("Clean web interface initialized successfully")
# =================================================================
# REQUIRED COMPATIBILITY METHODS
# =================================================================
def log(self, message, level="INFO"):
"""Required by existing debug console"""
try:
if level == "ERROR":
self.logger.error(message)
elif level == "WARNING":
self.logger.warning(message)
else:
self.logger.info(message)
except Exception as e:
logging.getLogger(__name__).error(f"Web interface log error: {e}")
def log_error(self, message):
self.log(message, "ERROR")
def log_info(self, message):
self.log(message, "INFO")
def log_warning(self, message):
self.log(message, "WARNING")
def log_video_played(self, video_name):
"""Required compatibility method"""
self.log(f"Video played: {video_name}", "INFO")
self.log_web_command("VIDEO_PLAYED",
params={'video': video_name},
source="player",
details=f"Video started: {video_name}")
# Force immediate stats update when video changes
self.last_stats_update = 0
self.stats_cache = {}
def set_video_player(self, video_player):
"""Required by main application"""
self.video_player = video_player
self.log(f"Video player connected to web interface")
def update_global_input_status(self, *args, **kwargs):
"""Required by debug console - flexible parameters"""
pass
def get_stats(self):
"""Alternative method name for getting statistics"""
return self.get_current_stats()
# =================================================================
# CORE METHODS - IMPROVED STATUS TRACKING
# =================================================================
def log_web_command(self, command, params=None, source="web", success=True, details=None):
"""Comprehensive logging for all web commands"""
try:
timestamp = datetime.now()
log_entry = {
'timestamp': timestamp.isoformat(),
'command': command,
'source': source,
'success': success,
'params': params or {},
'details': details or 'Command executed'
}
with self.command_lock:
self.command_history.append(log_entry)
if len(self.command_history) > self.max_history:
self.command_history.pop(0)
status = "SUCCESS" if success else "FAILED"
param_str = f" | Params: {params}" if params else ""
detail_str = f" | {details}" if details else ""
log_message = f"[WEB-CMD] {command} [{status}] from {source.upper()}{param_str}{detail_str}"
self.logger.info(log_message)
except Exception as e:
self.logger.error(f"Error logging web command: {e}")
def get_current_stats(self):
"""Get current statistics with improved real-time updates"""
try:
current_time = time.time()
# Use cached stats if very recent
if (current_time - self.last_stats_update) < self.stats_cache_duration and self.stats_cache:
return self.stats_cache
# Get fresh stats from debug console
raw_stats = {}
if self.debug_console:
try:
if hasattr(self.debug_console, 'get_stats'):
raw_stats = self.debug_console.get_stats()
elif hasattr(self.debug_console, 'stats'):
raw_stats = dict(self.debug_console.stats)
except Exception as e:
self.logger.debug(f"Error getting debug console stats: {e}")
# Calculate uptime
uptime_str = '00:00:00'
if 'start_time' in raw_stats and isinstance(raw_stats['start_time'], datetime):
uptime = datetime.now() - raw_stats['start_time']
uptime_str = str(uptime).split('.')[0]
# Get current status with better detection
current_status = self.get_current_status()
current_video = self.get_current_video()
# Build stats object
safe_stats = {
'videos_played': raw_stats.get('videos_played', 0),
'nfc_scans': raw_stats.get('key_presses', 0),
'errors': raw_stats.get('errors', 0),
'uptime': uptime_str,
'queue_depth': raw_stats.get('queue_depth', 0),
'current_video': current_video,
'status': current_status,
'fullscreen': raw_stats.get('fullscreen', True),
'web_commands_executed': len(self.command_history),
'last_update': datetime.now().isoformat(),
'connection_status': 'Connected' if self.debug_console else 'No Player',
'is_playing': self.is_video_playing(),
'player_ready': bool(self.get_video_player())
}
# Cache the stats
self.stats_cache = safe_stats
self.last_stats_update = current_time
return safe_stats
except Exception as e:
self.logger.error(f"Error getting stats: {e}")
return {
'videos_played': 0,
'nfc_scans': 0,
'errors': 1,
'uptime': '00:00:00',
'queue_depth': 0,
'current_video': 'Error',
'status': 'Error getting stats',
'fullscreen': True,
'web_commands_executed': 0,
'last_update': datetime.now().isoformat(),
'connection_status': 'Error',
'is_playing': False,
'player_ready': False
}
def get_current_status(self):
"""Get current status with better detection"""
try:
player = self.get_video_player()
if not player:
return "No video player connected"
# Check if specific video is playing
if hasattr(player, 'specific_video_playing') and player.specific_video_playing:
return "Playing specific video"
# Check if any video is playing
if hasattr(player, 'is_video_playing'):
try:
if player.is_video_playing():
return "Playing trailer"
else:
return "Ready"
except:
return "Ready"
# Check if video player is running
if hasattr(player, 'running') and player.running:
return "Ready"
return "Player stopped"
except Exception as e:
self.logger.debug(f"Error getting status: {e}")
return "Status unknown"
def get_current_video(self):
"""Get current video with better detection"""
try:
player = self.get_video_player()
if not player:
return "No video player"
# Try to get current video path
if hasattr(player, 'current_video_path') and player.current_video_path:
video_path = Path(player.current_video_path)
return video_path.name
# Try to get from stats
if self.debug_console and hasattr(self.debug_console, 'stats'):
current_video = self.debug_console.stats.get('current_video', 'None')
if current_video and current_video != 'None':
return current_video
return "No video"
except Exception as e:
self.logger.debug(f"Error getting current video: {e}")
return "Unknown"
def is_video_playing(self):
"""Check if video is currently playing"""
try:
player = self.get_video_player()
if not player:
return False
if hasattr(player, 'is_video_playing'):
return player.is_video_playing()
return False
except Exception as e:
self.logger.debug(f"Error checking if video playing: {e}")
return False
def get_video_player(self):
"""Get video player from multiple possible sources"""
if self.video_player:
return self.video_player
elif self.debug_console and hasattr(self.debug_console, 'video_player'):
return self.debug_console.video_player
else:
return None
def broadcast_stats(self):
"""Broadcast statistics update via SocketIO"""
try:
safe_stats = self.get_current_stats()
if hasattr(self, 'socketio'):
self.socketio.emit('stats_update', safe_stats)
except Exception as e:
self.logger.error(f"Failed to broadcast stats: {e}")
def stats_broadcast_loop(self):
"""Background thread to broadcast stats updates - faster updates"""
while self.running:
try:
self.broadcast_stats()
time.sleep(1) # Update every 1 second for better responsiveness
except Exception as e:
self.logger.error(f"Stats broadcast loop error: {e}")
time.sleep(5)
def log_broadcast_loop(self):
"""Background thread - not needed for clean interface but kept for compatibility"""
while self.running:
try:
# Clear the queue but don't broadcast logs
while not self.log_queue.empty():
try:
self.log_queue.get_nowait()
except queue.Empty:
break
time.sleep(1)
except Exception as e:
self.logger.error(f"Log broadcast loop error: {e}")
time.sleep(1)
def setup_routes(self):
"""Setup Flask routes"""
@self.app.route('/')
def index():
"""Main interface"""
try:
self.log_web_command("PAGE_LOAD", source="web", details="Main interface accessed")
return self.create_clean_html()
except Exception as e:
self.log_web_command("PAGE_LOAD", success=False, details=f"Error: {e}")
return self.create_clean_html()
@self.app.route('/api/video/skip', methods=['POST'])
def api_skip_video():
"""Skip current video"""
try:
self.log_web_command("SKIP_VIDEO", source="web", details="User clicked skip button")
if self.get_video_player():
self.get_video_player().skip_current_video()
self.log_web_command("SKIP_VIDEO", success=True, details="Skip command sent to player")
# Force immediate stats update
self.last_stats_update = 0
return jsonify({'success': True, 'message': 'Video skipped successfully'})
else:
self.log_web_command("SKIP_VIDEO", success=False, details="No video player available")
return jsonify({'success': False, 'message': 'Video player not available'}), 500
except Exception as e:
self.log_web_command("SKIP_VIDEO", success=False, details=f"Error: {str(e)}")
return jsonify({'success': False, 'message': f'Skip failed: {str(e)}'}), 500
@self.app.route('/api/video/toggle-fullscreen', methods=['POST'])
def api_toggle_fullscreen():
"""Toggle fullscreen"""
try:
self.log_web_command("TOGGLE_FULLSCREEN", source="web", details="User toggled fullscreen")
if self.get_video_player():
self.get_video_player().toggle_fullscreen()
self.log_web_command("TOGGLE_FULLSCREEN", success=True, details="Fullscreen toggle sent to player")
return jsonify({'success': True, 'message': 'Fullscreen toggled'})
else:
self.log_web_command("TOGGLE_FULLSCREEN", success=False, details="No video player available")
return jsonify({'success': False, 'message': 'Video player not available'}), 500
except Exception as e:
self.log_web_command("TOGGLE_FULLSCREEN", success=False, details=f"Error: {str(e)}")
return jsonify({'success': False, 'message': f'Fullscreen toggle failed: {str(e)}'}), 500
@self.app.route('/api/nfc/send', methods=['POST'])
def api_send_nfc():
"""Send NFC input"""
try:
data = request.get_json()
nfc_id = str(data.get('nfc_id', '')).strip()
self.log_web_command("NFC_INPUT", params={'nfc_id': nfc_id},
source="web", details=f"User sent NFC ID: {nfc_id}")
if self.get_video_player():
self.get_video_player().play_specific_video(nfc_id)
self.log_web_command("NFC_INPUT", params={'nfc_id': nfc_id},
success=True, details=f"NFC {nfc_id} sent to player")
# Force immediate stats update
self.last_stats_update = 0
return jsonify({'success': True, 'message': f'NFC {nfc_id} sent successfully'})
else:
self.log_web_command("NFC_INPUT", success=False, details="No video player available")
return jsonify({'success': False, 'message': 'Video player not available'}), 500
except Exception as e:
self.log_web_command("NFC_INPUT", params={'nfc_id': nfc_id if 'nfc_id' in locals() else 'unknown'},
success=False, details=f"Error: {str(e)}")
return jsonify({'success': False, 'message': f'NFC input failed: {str(e)}'}), 500
@self.app.route('/api/stats', methods=['GET'])
def api_get_stats():
"""Get statistics"""
try:
stats = self.get_current_stats()
return jsonify(stats)
except Exception as e:
self.logger.error(f"Stats API error: {e}")
return jsonify({'error': 'Stats unavailable'}), 500
def setup_socketio_handlers(self):
"""Setup SocketIO event handlers"""
@self.socketio.on('connect')
def handle_connect():
"""Handle client connection"""
self.logger.info("Web client connected")
self.log_web_command("CONNECT", source="socketio", details="Client connected")
# Send initial stats immediately
emit('stats_update', self.get_current_stats())
@self.socketio.on('disconnect')
def handle_disconnect():
"""Handle client disconnection"""
self.logger.info("Web client disconnected")
self.log_web_command("DISCONNECT", source="socketio", details="Client disconnected")
def create_clean_html(self):
"""Create clean HTML interface without logs"""
return '''<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>Video Player Controller</title>
<style>
* { margin: 0; padding: 0; box-sizing: border-box; }
body {
font-family: 'Segoe UI', Tahoma, Geneva, Verdana, sans-serif;
background: linear-gradient(135deg, #1e3c72 0%, #2a5298 100%);
color: white;
min-height: 100vh;
padding: 20px;
}
.container {
max-width: 1000px;
margin: 0 auto;
}
.header {
text-align: center;
margin-bottom: 30px;
}
.header h1 {
font-size: 2.5rem;
margin-bottom: 10px;
text-shadow: 2px 2px 4px rgba(0,0,0,0.3);
}
.panel {
background: rgba(255, 255, 255, 0.1);
backdrop-filter: blur(10px);
padding: 25px;
margin: 20px 0;
border-radius: 15px;
border: 1px solid rgba(255, 255, 255, 0.2);
box-shadow: 0 8px 32px rgba(0, 0, 0, 0.1);
}
.btn {
background: linear-gradient(45deg, #4CAF50, #45a049);
color: white;
border: none;
padding: 12px 24px;
margin: 8px;
border-radius: 8px;
cursor: pointer;
font-size: 16px;
font-weight: 600;
transition: all 0.3s ease;
box-shadow: 0 4px 15px rgba(76, 175, 80, 0.3);
}
.btn:hover {
transform: translateY(-2px);
box-shadow: 0 6px 20px rgba(76, 175, 80, 0.4);
}
.btn:active {
transform: translateY(0);
}
.status {
padding: 15px;
border-radius: 10px;
margin: 15px 0;
text-align: center;
font-weight: 600;
font-size: 18px;
transition: all 0.3s ease;
}
.status.success {
background: linear-gradient(45deg, #4CAF50, #45a049);
box-shadow: 0 4px 15px rgba(76, 175, 80, 0.3);
}
.status.error {
background: linear-gradient(45deg, #f44336, #d32f2f);
box-shadow: 0 4px 15px rgba(244, 67, 54, 0.3);
}
.status.info {
background: linear-gradient(45deg, #2196F3, #1976D2);
box-shadow: 0 4px 15px rgba(33, 150, 243, 0.3);
}
.input-group {
display: flex;
margin: 15px 0;
gap: 10px;
}
.input-group input {
flex: 1;
padding: 12px 16px;
border: 2px solid rgba(255, 255, 255, 0.3);
border-radius: 8px;
background: rgba(255, 255, 255, 0.1);
color: white;
font-size: 16px;
backdrop-filter: blur(10px);
}
.input-group input::placeholder {
color: rgba(255, 255, 255, 0.7);
}
.input-group input:focus {
outline: none;
border-color: #4CAF50;
box-shadow: 0 0 10px rgba(76, 175, 80, 0.3);
}
.stats-grid {
display: grid;
grid-template-columns: repeat(auto-fit, minmax(200px, 1fr));
gap: 20px;
margin: 20px 0;
}
.stat {
background: rgba(255, 255, 255, 0.15);
padding: 20px;
border-radius: 12px;
text-align: center;
border: 1px solid rgba(255, 255, 255, 0.2);
transition: transform 0.3s ease;
}
.stat:hover {
transform: translateY(-3px);
}
.stat-value {
font-size: 2.2rem;
font-weight: bold;
color: #4CAF50;
margin-bottom: 8px;
text-shadow: 1px 1px 2px rgba(0,0,0,0.3);
}
.stat-label {
font-size: 0.9rem;
opacity: 0.9;
text-transform: uppercase;
letter-spacing: 1px;
}
.current-video {
background: rgba(255, 255, 255, 0.15);
padding: 20px;
border-radius: 12px;
margin: 20px 0;
border: 1px solid rgba(255, 255, 255, 0.2);
}
.current-video-value {
font-size: 1.3rem;
font-weight: 600;
color: #FFD700;
margin-top: 10px;
word-break: break-word;
}
.connection-indicator {
display: inline-block;
width: 12px;
height: 12px;
border-radius: 50%;
margin-right: 8px;
animation: pulse 2s infinite;
}
.connected { background: #4CAF50; }
.disconnected { background: #f44336; }
@keyframes pulse {
0% { opacity: 1; }
50% { opacity: 0.5; }
100% { opacity: 1; }
}
.controls-grid {
display: grid;
grid-template-columns: repeat(auto-fit, minmax(150px, 1fr));
gap: 15px;
}
h3 {
margin-bottom: 20px;
font-size: 1.4rem;
color: #FFD700;
text-shadow: 1px 1px 2px rgba(0,0,0,0.3);
}
@media (max-width: 768px) {
.container { padding: 10px; }
.panel { padding: 15px; }
.header h1 { font-size: 2rem; }
.stats-grid { grid-template-columns: repeat(auto-fit, minmax(150px, 1fr)); gap: 15px; }
}
</style>
</head>
<body>
<div class="container">
<div class="header">
<h1>🎬 Video Player Controller</h1>
<div id="status" class="status info">
<span id="connectionIndicator" class="connection-indicator disconnected"></span>
Connecting to video player...
</div>
</div>
<div class="panel">
<h3>📊 Live Statistics</h3>
<div class="stats-grid">
<div class="stat">
<div class="stat-value" id="videosPlayed">0</div>
<div class="stat-label">Videos Played</div>
</div>
<div class="stat">
<div class="stat-value" id="nfcScans">0</div>
<div class="stat-label">NFC Scans</div>
</div>
<div class="stat">
<div class="stat-value" id="uptime">00:00:00</div>
<div class="stat-label">Uptime</div>
</div>
<div class="stat">
<div class="stat-value" id="playerStatus">Unknown</div>
<div class="stat-label">Player Status</div>
</div>
</div>
<div class="current-video">
<h3>🎥 Currently Playing</h3>
<div class="current-video-value" id="currentVideo">No video</div>
</div>
</div>
<div class="panel">
<h3>🎮 Video Controls</h3>
<div class="controls-grid">
<button class="btn" onclick="skipVideo()">⏭️ Skip Video</button>
<button class="btn" onclick="toggleFullscreen()">🔳 Toggle Fullscreen</button>
</div>
</div>
<div class="panel">
<h3>🏷️ NFC Input</h3>
<div class="input-group">
<input type="text" id="nfcInput" placeholder="Enter NFC ID and press Enter..." onkeypress="handleNFCKeypress(event)">
<button class="btn" onclick="sendNFC()">Send NFC</button>
</div>
</div>
</div>
<script src="https://cdnjs.cloudflare.com/ajax/libs/socket.io/4.0.0/socket.io.js"></script>
<script>
const socket = io();
let lastStatsUpdate = 0;
function showStatus(message, type = 'info', duration = 3000) {
const status = document.getElementById('status');
const indicator = document.getElementById('connectionIndicator');
status.innerHTML = `<span id="connectionIndicator" class="connection-indicator ${type === 'success' ? 'connected' : 'disconnected'}"></span>${message}`;
status.className = `status ${type}`;
if (duration > 0) {
setTimeout(() => {
status.innerHTML = '<span id="connectionIndicator" class="connection-indicator connected"></span>Ready';
status.className = 'status success';
}, duration);
}
}
function skipVideo() {
fetch('/api/video/skip', {method: 'POST'})
.then(r => r.json())
.then(data => {
showStatus(data.message, data.success ? 'success' : 'error');
if (data.success) {
// Request immediate stats update
requestStatsUpdate();
}
})
.catch(e => showStatus('Network error', 'error'));
}
function toggleFullscreen() {
fetch('/api/video/toggle-fullscreen', {method: 'POST'})
.then(r => r.json())
.then(data => showStatus(data.message, data.success ? 'success' : 'error'))
.catch(e => showStatus('Network error', 'error'));
}
function sendNFC() {
const nfcId = document.getElementById('nfcInput').value.trim();
if (!nfcId) return showStatus('Please enter NFC ID', 'error');
fetch('/api/nfc/send', {
method: 'POST',
headers: {'Content-Type': 'application/json'},
body: JSON.stringify({nfc_id: nfcId})
})
.then(r => r.json())
.then(data => {
showStatus(data.message, data.success ? 'success' : 'error');
if (data.success) {
document.getElementById('nfcInput').value = '';
// Request immediate stats update
requestStatsUpdate();
}
})
.catch(e => showStatus('Network error', 'error'));
}
function handleNFCKeypress(event) {
if (event.key === 'Enter') sendNFC();
}
function requestStatsUpdate() {
// Request fresh stats from server
fetch('/api/stats')
.then(r => r.json())
.then(updateStats)
.catch(e => console.error('Stats request failed:', e));
}
function updateStats(stats) {
try {
// Update all stat values
document.getElementById('videosPlayed').textContent = stats.videos_played || 0;
document.getElementById('nfcScans').textContent = stats.nfc_scans || 0;
document.getElementById('uptime').textContent = stats.uptime || '00:00:00';
document.getElementById('playerStatus').textContent = stats.status || 'Unknown';
// Update current video with better formatting
const currentVideo = stats.current_video || 'No video';
const videoElement = document.getElementById('currentVideo');
if (currentVideo.length > 50) {
videoElement.textContent = currentVideo.substring(0, 47) + '...';
videoElement.title = currentVideo;
} else {
videoElement.textContent = currentVideo;
videoElement.title = currentVideo;
}
// Update connection status
const isConnected = stats.player_ready && stats.connection_status === 'Connected';
const indicator = document.getElementById('connectionIndicator');
if (indicator) {
indicator.className = `connection-indicator ${isConnected ? 'connected' : 'disconnected'}`;
}
lastStatsUpdate = Date.now();
} catch (e) {
console.error('Error updating stats:', e);
}
}
// Socket event handlers
socket.on('connect', () => {
showStatus('Connected to video player', 'success', 0);
console.log('Connected to video player web interface');
requestStatsUpdate();
});
socket.on('disconnect', () => {
showStatus('Disconnected from video player', 'error', 0);
console.log('Disconnected from video player');
});
socket.on('stats_update', updateStats);
// Request stats update every 5 seconds as backup
setInterval(() => {
if (Date.now() - lastStatsUpdate > 5000) {
requestStatsUpdate();
}
}, 5000);
// Initial connection attempt
showStatus('Attempting to connect...', 'info');
</script>
</body>
</html>'''
def run(self, host=None, port=None, debug=False, **kwargs):
"""Fixed run method that accepts all possible parameters"""
try:
run_host = host or self.host
run_port = port or self.port
self.logger.info(f"Starting clean web interface on {run_host}:{run_port}")
# Start background threads
if not self.stats_thread or not self.stats_thread.is_alive():
self.stats_thread = threading.Thread(target=self.stats_broadcast_loop, daemon=True)
self.log_thread = threading.Thread(target=self.log_broadcast_loop, daemon=True)
self.stats_thread.start()
self.log_thread.start()
self.logger.info("Background threads started")
# Start the SocketIO server
self.socketio.run(
self.app,
host=run_host,
port=run_port,
debug=False,
use_reloader=False,
allow_unsafe_werkzeug=True
)
except Exception as e:
self.logger.error(f"Web interface startup error: {e}")
self.logger.info("Web interface failed to start but application can continue")
def stop(self):
"""Stop the web interface"""
self.logger.info("Stopping clean web interface...")
self.running = False
if self.stats_thread and self.stats_thread.is_alive():
self.stats_thread.join(timeout=2)
if self.log_thread and self.log_thread.is_alive():
self.log_thread.join(timeout=2)
self.logger.info("Clean web interface stopped")
# Helper functions for backward compatibility
def create_web_interface(debug_console=None, host='0.0.0.0', port=8547):
return WebInterface(debug_console, host, port)
if __name__ == "__main__":
logging.basicConfig(level=logging.INFO)
web = WebInterface()
try:
web.run()
except KeyboardInterrupt:
web.stop()