#!/usr/bin/env python3
"""
Network Monitoring Script for OPNsense/TP-Link Diagnostics

Monitors multiple IPs for connectivity issues over a specified duration.

Each monitoring cycle pings every configured address once, updates the
per-device statistics, and redraws a status table. When the configured
duration elapses (or the user presses Ctrl+C) a final report is printed.

NOTE(review): the ``ping`` flags used here (``-c`` count, ``-W`` timeout in
seconds) appear to follow the Linux iputils syntax; other platforms may
interpret ``-W`` differently -- confirm against the target system.
"""

import subprocess
import time
import datetime
from collections import defaultdict
import sys

# Configuration
MONITOR_IPS = {
    "10.0.0.254": "OPNsense Firewall",
    "10.0.0.253": "TP-Link AX5400 (253)",
    "10.0.0.252": "TP-Link Device (252)",
    "10.0.0.55": "DNS/Other Service",
}

PING_INTERVAL = 5  # seconds between ping checks
MONITOR_DURATION = 3600  # 1 hour in seconds
PING_TIMEOUT = 2  # seconds to wait for ping response
PING_COUNT = 1  # number of pings per check
INCIDENT_GAP_SECONDS = 30  # failures further apart than this are separate incidents

# Status icons are written as escapes so the source file stays pure ASCII and
# the glyphs cannot be corrupted by a wrong-encoding round trip.
ICON_UP = "\U0001F7E2"  # green circle
ICON_DOWN = "\U0001F534"  # red circle
ICON_WARN = "\u26A0\uFE0F"  # warning sign
ICON_OK = "\u2705"  # check mark
ICON_FAIL = "\u274C"  # cross mark


class NetworkMonitor:
    """Ping a fixed set of hosts at a regular interval and collect statistics.

    Attributes:
        ips: Mapping of IP address -> human-readable device name.
        duration: Total monitoring time in seconds.
        interval: Pause between monitoring cycles in seconds.
        start_time / end_time: Wall-clock datetimes set by ``run``.
        stats: Per-IP dict of counters (see ``__init__``).
    """

    def __init__(self, ips, duration, interval):
        """Store the configuration and create zeroed statistics for every IP."""
        self.ips = ips
        self.duration = duration
        self.interval = interval
        self.start_time = None
        self.end_time = None

        # Statistics tracking
        self.stats = {
            ip: {
                'total_checks': 0,
                'successes': 0,
                'failures': 0,
                'failure_times': [],
                'consecutive_failures': 0,
                'max_consecutive_failures': 0,
                'last_status': None,
            }
            for ip in ips
        }

    @staticmethod
    def _percent(part, whole):
        """Return ``part / whole`` as a percentage, or 0.0 when ``whole`` is not positive."""
        return (part / whole * 100) if whole > 0 else 0.0

    @staticmethod
    def _group_incidents(failure_times, max_gap_seconds=INCIDENT_GAP_SECONDS):
        """Group chronologically ordered failure timestamps into incidents.

        Consecutive timestamps no more than ``max_gap_seconds`` apart belong
        to the same incident. Returns a list of lists of datetimes; an empty
        input yields an empty list.
        """
        incidents = []
        for moment in failure_times:
            if incidents and (moment - incidents[-1][-1]).total_seconds() <= max_gap_seconds:
                incidents[-1].append(moment)
            else:
                incidents.append([moment])
        return incidents

    def _record_result(self, ip, success):
        """Update the counters of ``ip`` with the outcome of one check."""
        entry = self.stats[ip]
        entry['total_checks'] += 1
        if success:
            entry['successes'] += 1
            entry['consecutive_failures'] = 0
            entry['last_status'] = 'UP'
            return

        entry['failures'] += 1
        entry['consecutive_failures'] += 1
        entry['failure_times'].append(datetime.datetime.now())
        entry['last_status'] = 'DOWN'
        # Track max consecutive failures
        entry['max_consecutive_failures'] = max(
            entry['max_consecutive_failures'], entry['consecutive_failures']
        )

    def ping(self, ip: str) -> bool:
        """Ping an IP address and return True if successful.

        Any failure to run ``ping`` (timeout, missing executable, invalid
        argument) is reported as an unsuccessful check rather than raised.
        """
        try:
            result = subprocess.run(
                ['ping', '-c', str(PING_COUNT), '-W', str(PING_TIMEOUT), ip],
                # The output is never inspected, so discard it instead of buffering.
                stdout=subprocess.DEVNULL,
                stderr=subprocess.DEVNULL,
                # Hard upper bound; scales with the number of pings requested.
                timeout=PING_TIMEOUT * PING_COUNT + 1,
            )
        except (subprocess.SubprocessError, OSError, ValueError):
            return False
        return result.returncode == 0

    def check_all_ips(self):
        """Check all monitored IPs.

        Returns:
            Dict mapping each IP to True (reachable) or False (unreachable).
        """
        results = {}
        for ip in self.ips:
            success = self.ping(ip)
            results[ip] = success
            self._record_result(ip, success)
        return results

    def print_status(self, elapsed_time, results):
        """Print current status update.

        Args:
            elapsed_time: Seconds elapsed since monitoring started.
            results: Dict of IP -> bool as returned by ``check_all_ips``.
        """
        clear_screen()
        print("=" * 80)
        print(f"Network Connectivity Monitor - {datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
        print("=" * 80)
        progress = self._percent(elapsed_time, self.duration)
        print(f"\nMonitoring Duration: {elapsed_time:.0f}s / {self.duration}s ({progress:.1f}%)")
        print(f"Next check in: {self.interval}s\n")
        print("-" * 80)
        print(f"{'IP Address':<15} {'Device':<25} {'Status':<10} {'Success Rate':<15}")
        print("-" * 80)

        for ip, name in self.ips.items():
            status = f"{ICON_UP} UP" if results[ip] else f"{ICON_DOWN} DOWN"
            total = self.stats[ip]['total_checks']
            successes = self.stats[ip]['successes']
            rate = self._percent(successes, total)
            print(f"{ip:<15} {name:<25} {status:<10} {rate:>6.2f}% ({successes}/{total})")

        print("-" * 80)

        # Show any current issues
        issues = [ip for ip, reachable in results.items() if not reachable]
        if issues:
            print(f"\n{ICON_WARN} CURRENT ISSUES:")
            for ip in issues:
                consecutive = self.stats[ip]['consecutive_failures']
                print(f" • {ip} ({self.ips[ip]}): {consecutive} consecutive failures")
        else:
            print(f"\n{ICON_OK} All monitored devices are UP")
        print()

    def print_final_report(self):
        """Print final summary report.

        Requires ``start_time`` and ``end_time`` to be set. Safe to call when
        no checks were completed (percentages are then reported as 0).
        """
        clear_screen()
        print("=" * 80)
        print("FINAL MONITORING REPORT")
        print("=" * 80)
        print(f"Start Time: {self.start_time.strftime('%Y-%m-%d %H:%M:%S')}")
        print(f"End Time: {self.end_time.strftime('%Y-%m-%d %H:%M:%S')}")
        print(f"Duration: {(self.end_time - self.start_time).total_seconds():.0f} seconds")
        print("=" * 80)
        print()

        # Overall summary
        print("DEVICE SUMMARY:")
        print("-" * 80)
        print(f"{'IP Address':<15} {'Device':<25} {'Success Rate':<15} {'Total Failures':<15}")
        print("-" * 80)

        for ip, name in self.ips.items():
            stats = self.stats[ip]
            rate = self._percent(stats['successes'], stats['total_checks'])
            if stats['failures'] == 0:
                status_icon = ICON_OK
            elif rate >= 95:
                status_icon = ICON_WARN
            else:
                status_icon = ICON_FAIL
            print(f"{ip:<15} {name:<25} {status_icon} {rate:>6.2f}% {stats['failures']:>14}")

        print("-" * 80)
        print()

        # Detailed statistics
        print("DETAILED STATISTICS:")
        print("=" * 80)

        for ip, name in self.ips.items():
            stats = self.stats[ip]
            total = stats['total_checks']
            # _percent guards against total == 0 (e.g. interrupted before the
            # first check finished), which previously raised ZeroDivisionError.
            success_pct = self._percent(stats['successes'], total)
            failure_pct = self._percent(stats['failures'], total)

            print(f"\n{name} ({ip}):")
            print(f" Total Checks: {total}")
            print(f" Successful: {stats['successes']} ({success_pct:.2f}%)")
            print(f" Failed: {stats['failures']} ({failure_pct:.2f}%)")
            print(f" Max Consecutive Failures: {stats['max_consecutive_failures']}")

            failure_times = stats['failure_times']
            if not failure_times:
                continue

            print(f" First Failure: {failure_times[0].strftime('%H:%M:%S')}")
            print(f" Last Failure: {failure_times[-1].strftime('%H:%M:%S')}")

            # Group failures into incidents (gaps > 30 seconds = separate incident)
            incidents = self._group_incidents(failure_times)
            print(f" Failure Incidents: {len(incidents)}")

            if len(incidents) <= 10:  # Only show details if not too many
                print("\n Failure Timeline:")
                for idx, incident in enumerate(incidents, 1):
                    first = incident[0].strftime('%H:%M:%S')
                    last = incident[-1].strftime('%H:%M:%S')
                    duration = (incident[-1] - incident[0]).total_seconds()
                    print(f" Incident {idx}: {first} - {last} ({len(incident)} failures, {duration:.0f}s)")

        print("\n" + "=" * 80)

        # Overall health assessment
        print("\nOVERALL ASSESSMENT:")
        problem_devices = [ip for ip in self.ips if self.stats[ip]['failures'] > 0]
        any_checks = any(self.stats[ip]['total_checks'] > 0 for ip in self.ips)

        if not any_checks:
            # Without data, "100% uptime" would be a misleading claim.
            print(f"{ICON_WARN} NO DATA: No connectivity checks were completed.")
        elif not problem_devices:
            print(f"{ICON_OK} EXCELLENT: All devices maintained 100% uptime during monitoring period!")
        elif len(problem_devices) == 1:
            culprit = problem_devices[0]
            print(f"{ICON_WARN} ISSUE ISOLATED: Only {self.ips[culprit]} ({culprit}) had connectivity issues.")
            print(" This suggests a device-specific problem rather than network-wide issue.")
        else:
            print(f"{ICON_FAIL} MULTIPLE ISSUES: {len(problem_devices)} devices experienced connectivity problems.")
            print(" This may indicate a broader network issue.")

        print("=" * 80)

    def run(self):
        """Run the monitoring loop, then print the final report.

        The loop ends when ``duration`` seconds have elapsed or when the user
        presses Ctrl+C; the report is printed in both cases.
        """
        print("Starting network monitoring...")
        print(f"Monitoring {len(self.ips)} devices for {self.duration} seconds ({self.duration/60:.0f} minutes)")
        print("Press Ctrl+C to stop early\n")

        try:
            time.sleep(2)
            self.start_time = datetime.datetime.now()
            # Monotonic clock for loop timing: immune to wall-clock adjustments.
            started = time.monotonic()
            elapsed = 0.0
            while elapsed < self.duration:
                results = self.check_all_ips()
                self.print_status(elapsed, results)
                time.sleep(self.interval)
                elapsed = time.monotonic() - started
        except KeyboardInterrupt:
            print("\n\nMonitoring stopped by user.")

        self.end_time = datetime.datetime.now()
        if self.start_time is None:
            # Interrupted during the initial delay, before monitoring began.
            self.start_time = self.end_time

        # Print final report
        self.print_final_report()


def clear_screen():
    """Clear the terminal screen using ANSI escape sequences."""
    print("\033[H\033[J", end="")


def main():
    """Print the configuration banner and run the monitor with module defaults."""
    print("=" * 80)
    print("Network Monitoring Script for OPNsense/TP-Link Diagnostics")
    print("=" * 80)
    print("\nConfigured to monitor:")
    for ip, name in MONITOR_IPS.items():
        print(f" • {ip:<15} - {name}")
    print(f"\nMonitoring interval: {PING_INTERVAL} seconds")
    print(f"Total duration: {MONITOR_DURATION} seconds ({MONITOR_DURATION/60:.0f} minutes)")
    print(f"Ping timeout: {PING_TIMEOUT} seconds")
    print("\nStarting in 3 seconds...")
    time.sleep(3)

    monitor = NetworkMonitor(MONITOR_IPS, MONITOR_DURATION, PING_INTERVAL)
    monitor.run()


if __name__ == "__main__":
    main()