# web_interface_clean.py """ Clean Web Interface - No Logs, Fixed Status Updates Focuses on controls and statistics with better real-time updates """ import os import json import logging import threading import time from datetime import datetime, timedelta from pathlib import Path from flask import Flask, render_template, request, jsonify from flask_socketio import SocketIO, emit import queue class DateTimeJSONEncoder(json.JSONEncoder): def default(self, obj): if isinstance(obj, datetime): return obj.isoformat() elif isinstance(obj, timedelta): return str(obj) return super().default(obj) def make_json_safe(data): if isinstance(data, dict): return {key: make_json_safe(value) for key, value in data.items()} elif isinstance(data, list): return [make_json_safe(item) for item in data] elif isinstance(data, datetime): return data.isoformat() elif isinstance(data, timedelta): return str(data) else: return data class WebInterface: def __init__(self, debug_console=None, host='0.0.0.0', port=8547): self.debug_console = debug_console self.video_player = None self.host = host self.port = port self.logger = logging.getLogger(__name__) # Web command tracking self.command_history = [] self.max_history = 100 self.command_lock = threading.Lock() # Statistics cache with shorter refresh self.stats_cache = {} self.last_stats_update = 0 self.stats_cache_duration = 0.5 # Update every 500ms for better responsiveness # Message queues self.message_queue = queue.Queue() self.log_queue = queue.Queue() # Flask app setup self.app = Flask(__name__) self.app.config['SECRET_KEY'] = 'video_player_secret_key_2024' # SocketIO setup self.socketio = SocketIO( self.app, cors_allowed_origins="*", json=DateTimeJSONEncoder, async_mode='threading' ) # Background task control self.running = True self.stats_thread = None self.log_thread = None # Setup routes and handlers self.setup_routes() self.setup_socketio_handlers() self.logger.info("Clean web interface initialized successfully") # ================================================================= # REQUIRED COMPATIBILITY METHODS # ================================================================= def log(self, message, level="INFO"): """Required by existing debug console""" try: if level == "ERROR": self.logger.error(message) elif level == "WARNING": self.logger.warning(message) else: self.logger.info(message) except Exception as e: logging.getLogger(__name__).error(f"Web interface log error: {e}") def log_error(self, message): self.log(message, "ERROR") def log_info(self, message): self.log(message, "INFO") def log_warning(self, message): self.log(message, "WARNING") def log_video_played(self, video_name): """Required compatibility method""" self.log(f"Video played: {video_name}", "INFO") self.log_web_command("VIDEO_PLAYED", params={'video': video_name}, source="player", details=f"Video started: {video_name}") # Force immediate stats update when video changes self.last_stats_update = 0 self.stats_cache = {} def set_video_player(self, video_player): """Required by main application""" self.video_player = video_player self.log(f"Video player connected to web interface") def update_global_input_status(self, *args, **kwargs): """Required by debug console - flexible parameters""" pass def get_stats(self): """Alternative method name for getting statistics""" return self.get_current_stats() # ================================================================= # CORE METHODS - IMPROVED STATUS TRACKING # ================================================================= def log_web_command(self, command, params=None, source="web", success=True, details=None): """Comprehensive logging for all web commands""" try: timestamp = datetime.now() log_entry = { 'timestamp': timestamp.isoformat(), 'command': command, 'source': source, 'success': success, 'params': params or {}, 'details': details or 'Command executed' } with self.command_lock: self.command_history.append(log_entry) if len(self.command_history) > self.max_history: self.command_history.pop(0) status = "SUCCESS" if success else "FAILED" param_str = f" | Params: {params}" if params else "" detail_str = f" | {details}" if details else "" log_message = f"[WEB-CMD] {command} [{status}] from {source.upper()}{param_str}{detail_str}" self.logger.info(log_message) except Exception as e: self.logger.error(f"Error logging web command: {e}") def get_current_stats(self): """Get current statistics with improved real-time updates""" try: current_time = time.time() # Use cached stats if very recent if (current_time - self.last_stats_update) < self.stats_cache_duration and self.stats_cache: return self.stats_cache # Get fresh stats from debug console raw_stats = {} if self.debug_console: try: if hasattr(self.debug_console, 'get_stats'): raw_stats = self.debug_console.get_stats() elif hasattr(self.debug_console, 'stats'): raw_stats = dict(self.debug_console.stats) except Exception as e: self.logger.debug(f"Error getting debug console stats: {e}") # Calculate uptime uptime_str = '00:00:00' if 'start_time' in raw_stats and isinstance(raw_stats['start_time'], datetime): uptime = datetime.now() - raw_stats['start_time'] uptime_str = str(uptime).split('.')[0] # Get current status with better detection current_status = self.get_current_status() current_video = self.get_current_video() # Build stats object safe_stats = { 'videos_played': raw_stats.get('videos_played', 0), 'nfc_scans': raw_stats.get('key_presses', 0), 'errors': raw_stats.get('errors', 0), 'uptime': uptime_str, 'queue_depth': raw_stats.get('queue_depth', 0), 'current_video': current_video, 'status': current_status, 'fullscreen': raw_stats.get('fullscreen', True), 'web_commands_executed': len(self.command_history), 'last_update': datetime.now().isoformat(), 'connection_status': 'Connected' if self.debug_console else 'No Player', 'is_playing': self.is_video_playing(), 'player_ready': bool(self.get_video_player()) } # Cache the stats self.stats_cache = safe_stats self.last_stats_update = current_time return safe_stats except Exception as e: self.logger.error(f"Error getting stats: {e}") return { 'videos_played': 0, 'nfc_scans': 0, 'errors': 1, 'uptime': '00:00:00', 'queue_depth': 0, 'current_video': 'Error', 'status': 'Error getting stats', 'fullscreen': True, 'web_commands_executed': 0, 'last_update': datetime.now().isoformat(), 'connection_status': 'Error', 'is_playing': False, 'player_ready': False } def get_current_status(self): """Get current status with better detection""" try: player = self.get_video_player() if not player: return "No video player connected" # Check if specific video is playing if hasattr(player, 'specific_video_playing') and player.specific_video_playing: return "Playing specific video" # Check if any video is playing if hasattr(player, 'is_video_playing'): try: if player.is_video_playing(): return "Playing trailer" else: return "Ready" except: return "Ready" # Check if video player is running if hasattr(player, 'running') and player.running: return "Ready" return "Player stopped" except Exception as e: self.logger.debug(f"Error getting status: {e}") return "Status unknown" def get_current_video(self): """Get current video with better detection""" try: player = self.get_video_player() if not player: return "No video player" # Try to get current video path if hasattr(player, 'current_video_path') and player.current_video_path: video_path = Path(player.current_video_path) return video_path.name # Try to get from stats if self.debug_console and hasattr(self.debug_console, 'stats'): current_video = self.debug_console.stats.get('current_video', 'None') if current_video and current_video != 'None': return current_video return "No video" except Exception as e: self.logger.debug(f"Error getting current video: {e}") return "Unknown" def is_video_playing(self): """Check if video is currently playing""" try: player = self.get_video_player() if not player: return False if hasattr(player, 'is_video_playing'): return player.is_video_playing() return False except Exception as e: self.logger.debug(f"Error checking if video playing: {e}") return False def get_video_player(self): """Get video player from multiple possible sources""" if self.video_player: return self.video_player elif self.debug_console and hasattr(self.debug_console, 'video_player'): return self.debug_console.video_player else: return None def broadcast_stats(self): """Broadcast statistics update via SocketIO""" try: safe_stats = self.get_current_stats() if hasattr(self, 'socketio'): self.socketio.emit('stats_update', safe_stats) except Exception as e: self.logger.error(f"Failed to broadcast stats: {e}") def stats_broadcast_loop(self): """Background thread to broadcast stats updates - faster updates""" while self.running: try: self.broadcast_stats() time.sleep(1) # Update every 1 second for better responsiveness except Exception as e: self.logger.error(f"Stats broadcast loop error: {e}") time.sleep(5) def log_broadcast_loop(self): """Background thread - not needed for clean interface but kept for compatibility""" while self.running: try: # Clear the queue but don't broadcast logs while not self.log_queue.empty(): try: self.log_queue.get_nowait() except queue.Empty: break time.sleep(1) except Exception as e: self.logger.error(f"Log broadcast loop error: {e}") time.sleep(1) def setup_routes(self): """Setup Flask routes""" @self.app.route('/') def index(): """Main interface""" try: self.log_web_command("PAGE_LOAD", source="web", details="Main interface accessed") return self.create_clean_html() except Exception as e: self.log_web_command("PAGE_LOAD", success=False, details=f"Error: {e}") return self.create_clean_html() @self.app.route('/api/video/skip', methods=['POST']) def api_skip_video(): """Skip current video""" try: self.log_web_command("SKIP_VIDEO", source="web", details="User clicked skip button") if self.get_video_player(): self.get_video_player().skip_current_video() self.log_web_command("SKIP_VIDEO", success=True, details="Skip command sent to player") # Force immediate stats update self.last_stats_update = 0 return jsonify({'success': True, 'message': 'Video skipped successfully'}) else: self.log_web_command("SKIP_VIDEO", success=False, details="No video player available") return jsonify({'success': False, 'message': 'Video player not available'}), 500 except Exception as e: self.log_web_command("SKIP_VIDEO", success=False, details=f"Error: {str(e)}") return jsonify({'success': False, 'message': f'Skip failed: {str(e)}'}), 500 @self.app.route('/api/video/toggle-fullscreen', methods=['POST']) def api_toggle_fullscreen(): """Toggle fullscreen""" try: self.log_web_command("TOGGLE_FULLSCREEN", source="web", details="User toggled fullscreen") if self.get_video_player(): self.get_video_player().toggle_fullscreen() self.log_web_command("TOGGLE_FULLSCREEN", success=True, details="Fullscreen toggle sent to player") return jsonify({'success': True, 'message': 'Fullscreen toggled'}) else: self.log_web_command("TOGGLE_FULLSCREEN", success=False, details="No video player available") return jsonify({'success': False, 'message': 'Video player not available'}), 500 except Exception as e: self.log_web_command("TOGGLE_FULLSCREEN", success=False, details=f"Error: {str(e)}") return jsonify({'success': False, 'message': f'Fullscreen toggle failed: {str(e)}'}), 500 @self.app.route('/api/nfc/send', methods=['POST']) def api_send_nfc(): """Send NFC input""" try: data = request.get_json() nfc_id = str(data.get('nfc_id', '')).strip() self.log_web_command("NFC_INPUT", params={'nfc_id': nfc_id}, source="web", details=f"User sent NFC ID: {nfc_id}") if self.get_video_player(): self.get_video_player().play_specific_video(nfc_id) self.log_web_command("NFC_INPUT", params={'nfc_id': nfc_id}, success=True, details=f"NFC {nfc_id} sent to player") # Force immediate stats update self.last_stats_update = 0 return jsonify({'success': True, 'message': f'NFC {nfc_id} sent successfully'}) else: self.log_web_command("NFC_INPUT", success=False, details="No video player available") return jsonify({'success': False, 'message': 'Video player not available'}), 500 except Exception as e: self.log_web_command("NFC_INPUT", params={'nfc_id': nfc_id if 'nfc_id' in locals() else 'unknown'}, success=False, details=f"Error: {str(e)}") return jsonify({'success': False, 'message': f'NFC input failed: {str(e)}'}), 500 @self.app.route('/api/stats', methods=['GET']) def api_get_stats(): """Get statistics""" try: stats = self.get_current_stats() return jsonify(stats) except Exception as e: self.logger.error(f"Stats API error: {e}") return jsonify({'error': 'Stats unavailable'}), 500 def setup_socketio_handlers(self): """Setup SocketIO event handlers""" @self.socketio.on('connect') def handle_connect(): """Handle client connection""" self.logger.info("Web client connected") self.log_web_command("CONNECT", source="socketio", details="Client connected") # Send initial stats immediately emit('stats_update', self.get_current_stats()) @self.socketio.on('disconnect') def handle_disconnect(): """Handle client disconnection""" self.logger.info("Web client disconnected") self.log_web_command("DISCONNECT", source="socketio", details="Client disconnected") def create_clean_html(self): """Create clean HTML interface without logs""" return '''