commit 751b8eb487425301f2fff0fec2c43467f8b82d62 Author: jessikitty Date: Wed Jan 7 17:12:47 2026 +1100 Initial commit: Network monitoring script for OPNsense/TP-Link diagnostics diff --git a/network_monitor.py b/network_monitor.py new file mode 100644 index 0000000..b8997a2 --- /dev/null +++ b/network_monitor.py @@ -0,0 +1,259 @@ +#!/usr/bin/env python3 +""" +Network Monitoring Script for OPNsense/TP-Link Diagnostics +Monitors multiple IPs for connectivity issues over a specified duration +""" + +import subprocess +import time +import datetime +from collections import defaultdict +import sys + +# Configuration +MONITOR_IPS = { + "10.0.0.254": "OPNsense Firewall", + "10.0.0.253": "TP-Link AX5400 (253)", + "10.0.0.252": "TP-Link Device (252)", + "10.0.0.55": "DNS/Other Service" +} + +PING_INTERVAL = 5 # seconds between ping checks +MONITOR_DURATION = 3600 # 1 hour in seconds +PING_TIMEOUT = 2 # seconds to wait for ping response +PING_COUNT = 1 # number of pings per check + + +class NetworkMonitor: + def __init__(self, ips, duration, interval): + self.ips = ips + self.duration = duration + self.interval = interval + self.start_time = None + self.end_time = None + + # Statistics tracking + self.stats = {ip: { + 'total_checks': 0, + 'successes': 0, + 'failures': 0, + 'failure_times': [], + 'consecutive_failures': 0, + 'max_consecutive_failures': 0, + 'last_status': None + } for ip in ips.keys()} + + def ping(self, ip): + """Ping an IP address and return True if successful""" + try: + result = subprocess.run( + ['ping', '-c', str(PING_COUNT), '-W', str(PING_TIMEOUT), ip], + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + timeout=PING_TIMEOUT + 1 + ) + return result.returncode == 0 + except (subprocess.TimeoutExpired, Exception): + return False + + def check_all_ips(self): + """Check all monitored IPs""" + results = {} + for ip, name in self.ips.items(): + success = self.ping(ip) + results[ip] = success + + # Update statistics + self.stats[ip]['total_checks'] += 1 + + if success: + self.stats[ip]['successes'] += 1 + self.stats[ip]['consecutive_failures'] = 0 + self.stats[ip]['last_status'] = 'UP' + else: + self.stats[ip]['failures'] += 1 + self.stats[ip]['consecutive_failures'] += 1 + self.stats[ip]['failure_times'].append(datetime.datetime.now()) + self.stats[ip]['last_status'] = 'DOWN' + + # Track max consecutive failures + if self.stats[ip]['consecutive_failures'] > self.stats[ip]['max_consecutive_failures']: + self.stats[ip]['max_consecutive_failures'] = self.stats[ip]['consecutive_failures'] + + return results + + def print_status(self, elapsed_time, results): + """Print current status update""" + clear_screen() + print("=" * 80) + print(f"Network Connectivity Monitor - {datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}") + print("=" * 80) + print(f"\nMonitoring Duration: {elapsed_time:.0f}s / {self.duration}s ({(elapsed_time/self.duration)*100:.1f}%)") + print(f"Next check in: {self.interval}s\n") + + print("-" * 80) + print(f"{'IP Address':<15} {'Device':<25} {'Status':<10} {'Success Rate':<15}") + print("-" * 80) + + for ip, name in self.ips.items(): + status = "🟢 UP" if results[ip] else "šŸ”“ DOWN" + total = self.stats[ip]['total_checks'] + successes = self.stats[ip]['successes'] + rate = (successes / total * 100) if total > 0 else 0 + + print(f"{ip:<15} {name:<25} {status:<10} {rate:>6.2f}% ({successes}/{total})") + + print("-" * 80) + + # Show any current issues + issues = [ip for ip, result in results.items() if not result] + if issues: + print("\nāš ļø CURRENT ISSUES:") + for ip in issues: + consecutive = self.stats[ip]['consecutive_failures'] + print(f" • {ip} ({self.ips[ip]}): {consecutive} consecutive failures") + else: + print("\nāœ… All monitored devices are UP") + + print() + + def print_final_report(self): + """Print final summary report""" + clear_screen() + print("=" * 80) + print("FINAL MONITORING REPORT") + print("=" * 80) + print(f"Start Time: {self.start_time.strftime('%Y-%m-%d %H:%M:%S')}") + print(f"End Time: {self.end_time.strftime('%Y-%m-%d %H:%M:%S')}") + print(f"Duration: {(self.end_time - self.start_time).total_seconds():.0f} seconds") + print("=" * 80) + print() + + # Overall summary + print("DEVICE SUMMARY:") + print("-" * 80) + print(f"{'IP Address':<15} {'Device':<25} {'Success Rate':<15} {'Total Failures':<15}") + print("-" * 80) + + for ip, name in self.ips.items(): + stats = self.stats[ip] + rate = (stats['successes'] / stats['total_checks'] * 100) if stats['total_checks'] > 0 else 0 + + status_icon = "āœ…" if stats['failures'] == 0 else ("āš ļø" if rate >= 95 else "āŒ") + print(f"{ip:<15} {name:<25} {status_icon} {rate:>6.2f}% {stats['failures']:>14}") + + print("-" * 80) + print() + + # Detailed statistics + print("DETAILED STATISTICS:") + print("=" * 80) + + for ip, name in self.ips.items(): + stats = self.stats[ip] + print(f"\n{name} ({ip}):") + print(f" Total Checks: {stats['total_checks']}") + print(f" Successful: {stats['successes']} ({stats['successes']/stats['total_checks']*100:.2f}%)") + print(f" Failed: {stats['failures']} ({stats['failures']/stats['total_checks']*100:.2f}%)") + print(f" Max Consecutive Failures: {stats['max_consecutive_failures']}") + + if stats['failure_times']: + print(f" First Failure: {stats['failure_times'][0].strftime('%H:%M:%S')}") + print(f" Last Failure: {stats['failure_times'][-1].strftime('%H:%M:%S')}") + + # Group failures into incidents (gaps > 30 seconds = separate incident) + incidents = [] + current_incident = [stats['failure_times'][0]] + + for i in range(1, len(stats['failure_times'])): + time_gap = (stats['failure_times'][i] - stats['failure_times'][i-1]).total_seconds() + if time_gap <= 30: # Same incident + current_incident.append(stats['failure_times'][i]) + else: # New incident + incidents.append(current_incident) + current_incident = [stats['failure_times'][i]] + incidents.append(current_incident) + + print(f" Failure Incidents: {len(incidents)}") + + if len(incidents) <= 10: # Only show details if not too many + print(f"\n Failure Timeline:") + for idx, incident in enumerate(incidents, 1): + start_time = incident[0].strftime('%H:%M:%S') + end_time = incident[-1].strftime('%H:%M:%S') + duration = (incident[-1] - incident[0]).total_seconds() + print(f" Incident {idx}: {start_time} - {end_time} ({len(incident)} failures, {duration:.0f}s)") + + print("\n" + "=" * 80) + + # Overall health assessment + print("\nOVERALL ASSESSMENT:") + all_good = all(self.stats[ip]['failures'] == 0 for ip in self.ips.keys()) + + if all_good: + print("āœ… EXCELLENT: All devices maintained 100% uptime during monitoring period!") + else: + problem_devices = [ip for ip in self.ips.keys() if self.stats[ip]['failures'] > 0] + + if len(problem_devices) == 1: + print(f"āš ļø ISSUE ISOLATED: Only {self.ips[problem_devices[0]]} ({problem_devices[0]}) had connectivity issues.") + print(" This suggests a device-specific problem rather than network-wide issue.") + else: + print(f"āŒ MULTIPLE ISSUES: {len(problem_devices)} devices experienced connectivity problems.") + print(" This may indicate a broader network issue.") + + print("=" * 80) + + def run(self): + """Run the monitoring loop""" + print("Starting network monitoring...") + print(f"Monitoring {len(self.ips)} devices for {self.duration} seconds ({self.duration/60:.0f} minutes)") + print("Press Ctrl+C to stop early\n") + time.sleep(2) + + self.start_time = datetime.datetime.now() + elapsed = 0 + + try: + while elapsed < self.duration: + results = self.check_all_ips() + self.print_status(elapsed, results) + + time.sleep(self.interval) + elapsed = (datetime.datetime.now() - self.start_time).total_seconds() + + self.end_time = datetime.datetime.now() + + except KeyboardInterrupt: + print("\n\nMonitoring stopped by user.") + self.end_time = datetime.datetime.now() + + # Print final report + self.print_final_report() + + +def clear_screen(): + """Clear the terminal screen""" + print("\033[H\033[J", end="") + + +def main(): + print("=" * 80) + print("Network Monitoring Script for OPNsense/TP-Link Diagnostics") + print("=" * 80) + print(f"\nConfigured to monitor:") + for ip, name in MONITOR_IPS.items(): + print(f" • {ip:<15} - {name}") + + print(f"\nMonitoring interval: {PING_INTERVAL} seconds") + print(f"Total duration: {MONITOR_DURATION} seconds ({MONITOR_DURATION/60:.0f} minutes)") + print(f"Ping timeout: {PING_TIMEOUT} seconds") + print("\nStarting in 3 seconds...") + time.sleep(3) + + monitor = NetworkMonitor(MONITOR_IPS, MONITOR_DURATION, PING_INTERVAL) + monitor.run() + + +if __name__ == "__main__": + main()