Fix global input capture - NFC scans now work regardless of window focus
This commit is contained in:
@@ -1 +1,442 @@
|
||||
$(cat /tmp/nfc_client_extracted/nfc_network_client.py | base64 -w 0)
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
NFC Network Client with Global Input Capture
|
||||
Moonlight Drive-In Theater System
|
||||
|
||||
This client captures NFC scans using global keyboard capture (works even without focus)
|
||||
and forwards them to the main server over the network.
|
||||
"""
|
||||
|
||||
import tkinter as tk
|
||||
from tkinter import ttk, scrolledtext
|
||||
import requests
|
||||
import socket
|
||||
import time
|
||||
import threading
|
||||
from typing import Optional, Dict, List
|
||||
from datetime import datetime
|
||||
from pynput import keyboard
|
||||
import sys
|
||||
|
||||
# Version and identification
|
||||
CLIENT_VERSION = "1.1.0"
|
||||
CLIENT_NAME = "NFC Network Client"
|
||||
|
||||
# Server discovery and connection settings
|
||||
DEFAULT_PORTS = [5001, 5000]
|
||||
SCAN_TIMEOUT = 3.0
|
||||
MIN_TAG_LENGTH = 4
|
||||
MAX_TAG_LENGTH = 32
|
||||
|
||||
# Network timeouts
|
||||
CONNECTION_TIMEOUT = 2
|
||||
REQUEST_TIMEOUT = 5
|
||||
|
||||
|
||||
class NFCNetworkClient:
|
||||
"""Network client for remote NFC scanning with global input capture"""
|
||||
|
||||
def __init__(self, root: tk.Tk):
|
||||
self.root = root
|
||||
self.root.title(f"{CLIENT_NAME} v{CLIENT_VERSION}")
|
||||
self.root.geometry("700x600")
|
||||
|
||||
# Connection state
|
||||
self.server_url: Optional[str] = None
|
||||
self.connected = False
|
||||
self.auto_connect = True
|
||||
|
||||
# NFC scanning state
|
||||
self.current_scan = []
|
||||
self.scan_start_time = None
|
||||
self.is_scanning = False
|
||||
self.last_scan_time = 0
|
||||
|
||||
# Global keyboard listener
|
||||
self.keyboard_listener: Optional[keyboard.Listener] = None
|
||||
self.listener_running = False
|
||||
|
||||
# UI setup
|
||||
self.setup_ui()
|
||||
|
||||
# Start keyboard listener immediately
|
||||
self.start_keyboard_listener()
|
||||
|
||||
# Auto-connect on startup
|
||||
if self.auto_connect:
|
||||
self.root.after(500, self.connect_to_server)
|
||||
|
||||
def setup_ui(self):
|
||||
"""Setup the user interface"""
|
||||
# Main container
|
||||
main_frame = ttk.Frame(self.root, padding="10")
|
||||
main_frame.grid(row=0, column=0, sticky="nsew")
|
||||
self.root.grid_rowconfigure(0, weight=1)
|
||||
self.root.grid_columnconfigure(0, weight=1)
|
||||
|
||||
# Title
|
||||
title_label = ttk.Label(
|
||||
main_frame,
|
||||
text=f"{CLIENT_NAME} v{CLIENT_VERSION}",
|
||||
font=("Arial", 14, "bold")
|
||||
)
|
||||
title_label.grid(row=0, column=0, columnspan=2, pady=(0, 10))
|
||||
|
||||
# Status frame
|
||||
status_frame = ttk.LabelFrame(main_frame, text="Connection Status", padding="10")
|
||||
status_frame.grid(row=1, column=0, columnspan=2, sticky="ew", pady=(0, 10))
|
||||
|
||||
self.status_label = ttk.Label(
|
||||
status_frame,
|
||||
text="Disconnected",
|
||||
font=("Arial", 11)
|
||||
)
|
||||
self.status_label.pack()
|
||||
|
||||
# Server info frame
|
||||
server_frame = ttk.LabelFrame(main_frame, text="Server Configuration", padding="10")
|
||||
server_frame.grid(row=2, column=0, columnspan=2, sticky="ew", pady=(0, 10))
|
||||
|
||||
ttk.Label(server_frame, text="Server Address:").grid(row=0, column=0, sticky="w")
|
||||
self.server_entry = ttk.Entry(server_frame, width=40)
|
||||
self.server_entry.grid(row=0, column=1, padx=(10, 0))
|
||||
self.server_entry.insert(0, "moonlight-drivein")
|
||||
|
||||
# Control buttons
|
||||
btn_frame = ttk.Frame(main_frame)
|
||||
btn_frame.grid(row=3, column=0, columnspan=2, pady=(0, 10))
|
||||
|
||||
self.connect_btn = ttk.Button(
|
||||
btn_frame,
|
||||
text="Connect",
|
||||
command=self.connect_to_server
|
||||
)
|
||||
self.connect_btn.pack(side="left", padx=5)
|
||||
|
||||
self.disconnect_btn = ttk.Button(
|
||||
btn_frame,
|
||||
text="Disconnect",
|
||||
command=self.disconnect_from_server,
|
||||
state="disabled"
|
||||
)
|
||||
self.disconnect_btn.pack(side="left", padx=5)
|
||||
|
||||
# Listener status frame
|
||||
listener_frame = ttk.LabelFrame(main_frame, text="Global Input Capture", padding="10")
|
||||
listener_frame.grid(row=4, column=0, columnspan=2, sticky="ew", pady=(0, 10))
|
||||
|
||||
self.listener_label = ttk.Label(
|
||||
listener_frame,
|
||||
text="Initializing...",
|
||||
font=("Arial", 10)
|
||||
)
|
||||
self.listener_label.pack()
|
||||
|
||||
listener_info = ttk.Label(
|
||||
listener_frame,
|
||||
text="NFC scans will be captured even when this window is not focused",
|
||||
font=("Arial", 9),
|
||||
foreground="gray"
|
||||
)
|
||||
listener_info.pack()
|
||||
|
||||
# Log frame
|
||||
log_frame = ttk.LabelFrame(main_frame, text="Activity Log", padding="10")
|
||||
log_frame.grid(row=5, column=0, columnspan=2, sticky="nsew", pady=(0, 10))
|
||||
main_frame.grid_rowconfigure(5, weight=1)
|
||||
|
||||
self.log_text = scrolledtext.ScrolledText(
|
||||
log_frame,
|
||||
height=15,
|
||||
wrap=tk.WORD,
|
||||
state="disabled",
|
||||
font=("Courier", 9)
|
||||
)
|
||||
self.log_text.pack(fill="both", expand=True)
|
||||
|
||||
# Bind window close event
|
||||
self.root.protocol("WM_DELETE_WINDOW", self.on_closing)
|
||||
|
||||
def log(self, message: str, level: str = "INFO"):
|
||||
"""Add message to log with timestamp"""
|
||||
timestamp = datetime.now().strftime("%H:%M:%S")
|
||||
formatted = f"[{timestamp}] [{level}] {message}\n"
|
||||
|
||||
self.log_text.config(state="normal")
|
||||
self.log_text.insert(tk.END, formatted)
|
||||
self.log_text.see(tk.END)
|
||||
self.log_text.config(state="disabled")
|
||||
|
||||
# Print to console as well
|
||||
print(formatted.strip())
|
||||
|
||||
def update_status(self, text: str, color: str = "black"):
|
||||
"""Update connection status display"""
|
||||
self.status_label.config(text=text, foreground=color)
|
||||
|
||||
def start_keyboard_listener(self):
|
||||
"""Start global keyboard listener"""
|
||||
try:
|
||||
if self.keyboard_listener is not None:
|
||||
self.stop_keyboard_listener()
|
||||
|
||||
self.keyboard_listener = keyboard.Listener(
|
||||
on_press=self.on_key_press
|
||||
)
|
||||
self.keyboard_listener.start()
|
||||
self.listener_running = True
|
||||
|
||||
self.listener_label.config(
|
||||
text="Active - Capturing NFC scans globally",
|
||||
foreground="green"
|
||||
)
|
||||
self.log("Global keyboard listener started", "SUCCESS")
|
||||
|
||||
except Exception as e:
|
||||
self.listener_label.config(
|
||||
text=f"Error: {str(e)}",
|
||||
foreground="red"
|
||||
)
|
||||
self.log(f"Failed to start keyboard listener: {e}", "ERROR")
|
||||
|
||||
def stop_keyboard_listener(self):
|
||||
"""Stop global keyboard listener"""
|
||||
if self.keyboard_listener is not None:
|
||||
try:
|
||||
self.keyboard_listener.stop()
|
||||
self.listener_running = False
|
||||
self.listener_label.config(
|
||||
text="Stopped",
|
||||
foreground="gray"
|
||||
)
|
||||
self.log("Global keyboard listener stopped", "INFO")
|
||||
except Exception as e:
|
||||
self.log(f"Error stopping keyboard listener: {e}", "ERROR")
|
||||
|
||||
def on_key_press(self, key):
|
||||
"""Handle global keyboard input"""
|
||||
try:
|
||||
current_time = time.time()
|
||||
|
||||
# Get the character
|
||||
char = None
|
||||
if hasattr(key, 'char') and key.char:
|
||||
char = key.char
|
||||
elif key == keyboard.Key.enter:
|
||||
# Enter key completes the scan
|
||||
if self.is_scanning and self.current_scan:
|
||||
self.complete_scan()
|
||||
return
|
||||
else:
|
||||
# Ignore other special keys during scanning
|
||||
if self.is_scanning:
|
||||
return
|
||||
return
|
||||
|
||||
# Start new scan if enough time has passed
|
||||
if not self.is_scanning:
|
||||
if current_time - self.last_scan_time > 1.0:
|
||||
self.start_scan()
|
||||
|
||||
# Add character to current scan
|
||||
if self.is_scanning and char:
|
||||
self.current_scan.append(char)
|
||||
|
||||
# Auto-complete if scan is taking too long
|
||||
if current_time - self.scan_start_time > SCAN_TIMEOUT:
|
||||
self.complete_scan()
|
||||
|
||||
except Exception as e:
|
||||
self.log(f"Error processing key: {e}", "ERROR")
|
||||
|
||||
def start_scan(self):
|
||||
"""Start a new NFC scan"""
|
||||
self.is_scanning = True
|
||||
self.current_scan = []
|
||||
self.scan_start_time = time.time()
|
||||
|
||||
def complete_scan(self):
|
||||
"""Complete current scan and send to server"""
|
||||
if not self.current_scan:
|
||||
self.is_scanning = False
|
||||
return
|
||||
|
||||
tag_uid = ''.join(self.current_scan).strip()
|
||||
self.current_scan = []
|
||||
self.is_scanning = False
|
||||
self.last_scan_time = time.time()
|
||||
|
||||
# Validate tag
|
||||
if len(tag_uid) < MIN_TAG_LENGTH or len(tag_uid) > MAX_TAG_LENGTH:
|
||||
self.log(f"Invalid tag length: {len(tag_uid)} chars", "WARN")
|
||||
return
|
||||
|
||||
# Send to server if connected
|
||||
if self.connected and self.server_url:
|
||||
self.send_tag_to_server(tag_uid)
|
||||
else:
|
||||
self.log(f"[SCAN] {tag_uid} (not connected)", "WARN")
|
||||
|
||||
def send_tag_to_server(self, tag_uid: str):
|
||||
"""Send scanned tag to server"""
|
||||
try:
|
||||
url = f"{self.server_url}/api/nfc_scan"
|
||||
data = {
|
||||
"tag_uid": tag_uid,
|
||||
"source": "network_client",
|
||||
"client_version": CLIENT_VERSION
|
||||
}
|
||||
|
||||
self.log(f"[SCAN] Sending tag: {tag_uid}")
|
||||
|
||||
response = requests.post(
|
||||
url,
|
||||
json=data,
|
||||
timeout=REQUEST_TIMEOUT
|
||||
)
|
||||
|
||||
if response.status_code == 200:
|
||||
result = response.json()
|
||||
if result.get("success"):
|
||||
content_title = result.get("content_title", "Unknown")
|
||||
self.log(f"[SUCCESS] Playing: {content_title}", "SUCCESS")
|
||||
else:
|
||||
error = result.get("error", "Unknown error")
|
||||
self.log(f"[ERROR] {error}", "ERROR")
|
||||
else:
|
||||
self.log(f"[ERROR] Server returned status {response.status_code}", "ERROR")
|
||||
|
||||
except requests.exceptions.Timeout:
|
||||
self.log("[ERROR] Request timeout", "ERROR")
|
||||
except requests.exceptions.ConnectionError:
|
||||
self.log("[ERROR] Connection failed - server unreachable", "ERROR")
|
||||
self.disconnect_from_server()
|
||||
except Exception as e:
|
||||
self.log(f"[ERROR] Failed to send tag: {e}", "ERROR")
|
||||
|
||||
def try_connect(self, hostname: str) -> Optional[str]:
|
||||
"""Try to connect to server at hostname"""
|
||||
for port in DEFAULT_PORTS:
|
||||
try:
|
||||
url = f"http://{hostname}:{port}"
|
||||
response = requests.get(
|
||||
f"{url}/api/status",
|
||||
timeout=CONNECTION_TIMEOUT
|
||||
)
|
||||
|
||||
if response.status_code == 200:
|
||||
data = response.json()
|
||||
if data.get("server_type") == "moonlight_drivein":
|
||||
return url
|
||||
except:
|
||||
continue
|
||||
|
||||
return None
|
||||
|
||||
def resolve_hostname(self, hostname: str) -> List[str]:
|
||||
"""Resolve hostname to list of addresses to try"""
|
||||
addresses = []
|
||||
|
||||
# Try as-is
|
||||
addresses.append(hostname)
|
||||
|
||||
# Try with .local for mDNS
|
||||
if not hostname.endswith(".local"):
|
||||
addresses.append(f"{hostname}.local")
|
||||
|
||||
# Try with Tailscale domain if looks like hostname
|
||||
if "." not in hostname:
|
||||
addresses.append(f"{hostname}.tail-scale.ts.net")
|
||||
|
||||
return addresses
|
||||
|
||||
def connect_to_server(self):
|
||||
"""Connect to the server"""
|
||||
hostname = self.server_entry.get().strip()
|
||||
if not hostname:
|
||||
self.log("Please enter a server address", "ERROR")
|
||||
return
|
||||
|
||||
self.log(f"Attempting connection to: {hostname}")
|
||||
self.update_status("Connecting...", "orange")
|
||||
self.connect_btn.config(state="disabled")
|
||||
|
||||
# Try in background thread
|
||||
def connect_thread():
|
||||
addresses = self.resolve_hostname(hostname)
|
||||
|
||||
for addr in addresses:
|
||||
self.log(f"Trying: {addr}")
|
||||
url = self.try_connect(addr)
|
||||
|
||||
if url:
|
||||
self.root.after(0, lambda: self.connection_success(url))
|
||||
return
|
||||
|
||||
self.root.after(0, self.connection_failed)
|
||||
|
||||
threading.Thread(target=connect_thread, daemon=True).start()
|
||||
|
||||
def connection_success(self, url: str):
|
||||
"""Handle successful connection"""
|
||||
self.server_url = url
|
||||
self.connected = True
|
||||
|
||||
self.update_status(f"Connected to {url}", "green")
|
||||
self.log(f"Connected successfully to {url}", "SUCCESS")
|
||||
|
||||
self.connect_btn.config(state="disabled")
|
||||
self.disconnect_btn.config(state="normal")
|
||||
self.server_entry.config(state="disabled")
|
||||
|
||||
def connection_failed(self):
|
||||
"""Handle connection failure"""
|
||||
self.update_status("Connection failed", "red")
|
||||
self.log("Failed to connect to server", "ERROR")
|
||||
self.connect_btn.config(state="normal")
|
||||
|
||||
def disconnect_from_server(self):
|
||||
"""Disconnect from server"""
|
||||
self.connected = False
|
||||
self.server_url = None
|
||||
|
||||
self.update_status("Disconnected", "gray")
|
||||
self.log("Disconnected from server", "INFO")
|
||||
|
||||
self.connect_btn.config(state="normal")
|
||||
self.disconnect_btn.config(state="disabled")
|
||||
self.server_entry.config(state="normal")
|
||||
|
||||
def on_closing(self):
|
||||
"""Handle window close event"""
|
||||
self.log("Shutting down...")
|
||||
|
||||
# Stop keyboard listener
|
||||
self.stop_keyboard_listener()
|
||||
|
||||
# Close window
|
||||
self.root.destroy()
|
||||
|
||||
|
||||
def main():
|
||||
"""Main entry point"""
|
||||
print(f"\n{CLIENT_NAME} v{CLIENT_VERSION}")
|
||||
print("="* 50)
|
||||
print("Global Input Capture Enabled")
|
||||
print("NFC scans will be captured even when window is not focused")
|
||||
print("="* 50 + "\n")
|
||||
|
||||
root = tk.Tk()
|
||||
app = NFCNetworkClient(root)
|
||||
|
||||
try:
|
||||
root.mainloop()
|
||||
except KeyboardInterrupt:
|
||||
print("\nShutdown requested...")
|
||||
finally:
|
||||
if hasattr(app, 'keyboard_listener') and app.keyboard_listener:
|
||||
app.keyboard_listener.stop()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
|
||||
Reference in New Issue
Block a user