Initial commit: Network monitoring script for OPNsense/TP-Link diagnostics

This commit is contained in:
2026-01-07 17:12:47 +11:00
commit 751b8eb487

259
network_monitor.py Normal file
View File

@@ -0,0 +1,259 @@
#!/usr/bin/env python3
"""
Network Monitoring Script for OPNsense/TP-Link Diagnostics
Monitors multiple IPs for connectivity issues over a specified duration
"""
import subprocess
import time
import datetime
from collections import defaultdict
import sys
# Configuration

# Hosts to probe: IP address -> human-readable label used in the printed tables.
# Insertion order is the order in which hosts are checked and listed.
MONITOR_IPS = {
    "10.0.0.254": "OPNsense Firewall",
    "10.0.0.253": "TP-Link AX5400 (253)",
    "10.0.0.252": "TP-Link Device (252)",
    "10.0.0.55": "DNS/Other Service",
}

# Per-probe ping settings.
PING_COUNT = 1  # number of pings per check
PING_TIMEOUT = 2  # seconds to wait for ping response

# Scheduling of the monitoring run.
PING_INTERVAL = 5  # seconds between ping checks
MONITOR_DURATION = 60 * 60  # 1 hour in seconds
class NetworkMonitor:
    """Ping a set of hosts at a fixed interval and report their availability.

    Each call to ``check_all_ips`` pings every configured host once and
    updates per-host counters in ``self.stats``. ``run`` repeats that sweep
    until the configured duration has elapsed (or Ctrl+C is pressed) and then
    prints a summary report.
    """

    # Failures at most this many seconds apart are reported as one incident.
    INCIDENT_GAP_SECONDS = 30

    def __init__(self, ips, duration, interval):
        """Set up the monitor.

        Args:
            ips: Mapping of IP address -> display name for each host.
            duration: Total monitoring time in seconds.
            interval: Seconds to sleep between sweeps.
        """
        self.ips = ips
        self.duration = duration
        self.interval = interval
        self.start_time = None
        self.end_time = None
        # Statistics tracking, one independent record per host
        self.stats = {
            ip: {
                'total_checks': 0,
                'successes': 0,
                'failures': 0,
                'failure_times': [],
                'consecutive_failures': 0,
                'max_consecutive_failures': 0,
                'last_status': None,
            }
            for ip in ips
        }

    @staticmethod
    def _percentage(part, total):
        """Return ``part`` as a percentage of ``total``; 0 when ``total`` is not positive."""
        return (part / total * 100) if total > 0 else 0

    @staticmethod
    def _group_incidents(failure_times, max_gap):
        """Group chronologically ordered failure timestamps into incidents.

        A timestamp joins the current incident when it is at most ``max_gap``
        seconds after the previous failure; otherwise it starts a new one.
        Returns a list of lists of timestamps (empty for no failures).
        """
        incidents = []
        for stamp in failure_times:
            if incidents and (stamp - incidents[-1][-1]).total_seconds() <= max_gap:
                incidents[-1].append(stamp)
            else:
                incidents.append([stamp])
        return incidents

    def ping(self, ip):
        """Ping an IP address and return True if successful.

        Any failure to run ``ping`` (timeout, missing binary, bad argument)
        is treated the same as an unreachable host and yields False.
        """
        # NOTE(review): '-c'/'-W' are used as count / reply-timeout-in-seconds,
        # which matches Linux iputils ping; confirm on other platforms.
        try:
            result = subprocess.run(
                ['ping', '-c', str(PING_COUNT), '-W', str(PING_TIMEOUT), str(ip)],
                # Output is never inspected, so discard it instead of buffering it.
                stdout=subprocess.DEVNULL,
                stderr=subprocess.DEVNULL,
                # Allow every requested ping its own timeout plus one second of
                # slack, so PING_COUNT > 1 is not killed prematurely.
                timeout=PING_COUNT * (PING_TIMEOUT + 1),
            )
        except (subprocess.SubprocessError, OSError, ValueError):
            return False
        return result.returncode == 0

    def check_all_ips(self):
        """Ping every monitored host once and update its statistics.

        Returns:
            dict mapping each IP to True (reachable) or False.
        """
        results = {}
        for ip in self.ips:
            success = self.ping(ip)
            results[ip] = success

            record = self.stats[ip]
            record['total_checks'] += 1
            if success:
                record['successes'] += 1
                record['consecutive_failures'] = 0
                record['last_status'] = 'UP'
            else:
                record['failures'] += 1
                record['consecutive_failures'] += 1
                record['failure_times'].append(datetime.datetime.now())
                record['last_status'] = 'DOWN'
                # Track max consecutive failures
                record['max_consecutive_failures'] = max(
                    record['max_consecutive_failures'],
                    record['consecutive_failures'],
                )
        return results

    def print_status(self, elapsed_time, results):
        """Clear the screen and print the live status table.

        Args:
            elapsed_time: Seconds elapsed since monitoring started.
            results: Mapping of IP -> bool as returned by ``check_all_ips``.
        """
        clear_screen()
        # Guard the progress figure so a zero duration cannot divide by zero.
        progress = (elapsed_time / self.duration) * 100 if self.duration else 0.0

        print("=" * 80)
        print(f"Network Connectivity Monitor - {datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
        print("=" * 80)
        print(f"\nMonitoring Duration: {elapsed_time:.0f}s / {self.duration}s ({progress:.1f}%)")
        print(f"Next check in: {self.interval}s\n")
        print("-" * 80)
        print(f"{'IP Address':<15} {'Device':<25} {'Status':<10} {'Success Rate':<15}")
        print("-" * 80)
        for ip, name in self.ips.items():
            status = "🟢 UP" if results[ip] else "🔴 DOWN"
            total = self.stats[ip]['total_checks']
            successes = self.stats[ip]['successes']
            rate = self._percentage(successes, total)
            print(f"{ip:<15} {name:<25} {status:<10} {rate:>6.2f}% ({successes}/{total})")
        print("-" * 80)

        # Show any current issues
        issues = [ip for ip, ok in results.items() if not ok]
        if issues:
            print("\n⚠️ CURRENT ISSUES:")
            for ip in issues:
                consecutive = self.stats[ip]['consecutive_failures']
                print(f"{ip} ({self.ips[ip]}): {consecutive} consecutive failures")
        else:
            print("\n✅ All monitored devices are UP")
        print()

    def print_final_report(self):
        """Print the final summary report.

        Expects ``start_time`` and ``end_time`` to have been set (``run``
        does this). Hosts that were never checked are reported with 0%
        figures rather than raising ZeroDivisionError.
        """
        clear_screen()
        print("=" * 80)
        print("FINAL MONITORING REPORT")
        print("=" * 80)
        print(f"Start Time: {self.start_time.strftime('%Y-%m-%d %H:%M:%S')}")
        print(f"End Time: {self.end_time.strftime('%Y-%m-%d %H:%M:%S')}")
        print(f"Duration: {(self.end_time - self.start_time).total_seconds():.0f} seconds")
        print("=" * 80)
        print()

        self._print_device_summary()
        self._print_detailed_statistics()
        self._print_overall_assessment()

    def _print_device_summary(self):
        """Print one summary row (success rate, failure count) per host."""
        print("DEVICE SUMMARY:")
        print("-" * 80)
        print(f"{'IP Address':<15} {'Device':<25} {'Success Rate':<15} {'Total Failures':<15}")
        print("-" * 80)
        for ip, name in self.ips.items():
            stats = self.stats[ip]
            rate = self._percentage(stats['successes'], stats['total_checks'])
            # Three distinct markers: clean run, minor loss (>= 95% success),
            # and significant loss. Same glyphs as the rest of the report.
            if stats['failures'] == 0:
                status_icon = "✅"
            elif rate >= 95:
                status_icon = "⚠️"
            else:
                status_icon = "❌"
            print(f"{ip:<15} {name:<25} {status_icon} {rate:>6.2f}% {stats['failures']:>14}")
        print("-" * 80)
        print()

    def _print_detailed_statistics(self):
        """Print per-host counters and, when failures occurred, the incident timeline."""
        print("DETAILED STATISTICS:")
        print("=" * 80)
        for ip, name in self.ips.items():
            stats = self.stats[ip]
            total = stats['total_checks']
            print(f"\n{name} ({ip}):")
            print(f" Total Checks: {total}")
            print(f" Successful: {stats['successes']} ({self._percentage(stats['successes'], total):.2f}%)")
            print(f" Failed: {stats['failures']} ({self._percentage(stats['failures'], total):.2f}%)")
            print(f" Max Consecutive Failures: {stats['max_consecutive_failures']}")

            failure_times = stats['failure_times']
            if not failure_times:
                continue

            print(f" First Failure: {failure_times[0].strftime('%H:%M:%S')}")
            print(f" Last Failure: {failure_times[-1].strftime('%H:%M:%S')}")
            # Group failures into incidents (larger gaps = separate incident)
            incidents = self._group_incidents(failure_times, self.INCIDENT_GAP_SECONDS)
            print(f" Failure Incidents: {len(incidents)}")
            if len(incidents) <= 10:  # Only show details if not too many
                print("\n Failure Timeline:")
                for idx, incident in enumerate(incidents, 1):
                    start_time = incident[0].strftime('%H:%M:%S')
                    end_time = incident[-1].strftime('%H:%M:%S')
                    duration = (incident[-1] - incident[0]).total_seconds()
                    print(f" Incident {idx}: {start_time} - {end_time} ({len(incident)} failures, {duration:.0f}s)")
        print("\n" + "=" * 80)

    def _print_overall_assessment(self):
        """Print a one-paragraph verdict based on how many hosts saw failures."""
        print("\nOVERALL ASSESSMENT:")
        problem_devices = [ip for ip in self.ips if self.stats[ip]['failures'] > 0]
        if not problem_devices:
            print("✅ EXCELLENT: All devices maintained 100% uptime during monitoring period!")
        elif len(problem_devices) == 1:
            culprit = problem_devices[0]
            print(f"⚠️ ISSUE ISOLATED: Only {self.ips[culprit]} ({culprit}) had connectivity issues.")
            print(" This suggests a device-specific problem rather than network-wide issue.")
        else:
            print(f"❌ MULTIPLE ISSUES: {len(problem_devices)} devices experienced connectivity problems.")
            print(" This may indicate a broader network issue.")
        print("=" * 80)

    def run(self):
        """Run the monitoring loop, then print the final report.

        Sweeps all hosts, prints the status table, and sleeps ``interval``
        seconds, repeating until ``duration`` seconds have elapsed or the
        user presses Ctrl+C.
        """
        print("Starting network monitoring...")
        print(f"Monitoring {len(self.ips)} devices for {self.duration} seconds ({self.duration/60:.0f} minutes)")
        print("Press Ctrl+C to stop early\n")
        time.sleep(2)

        # Wall-clock timestamps are kept for the report; elapsed time is
        # measured on the monotonic clock so system clock adjustments cannot
        # shorten or extend the run.
        self.start_time = datetime.datetime.now()
        started = time.monotonic()
        elapsed = 0
        try:
            while elapsed < self.duration:
                results = self.check_all_ips()
                self.print_status(elapsed, results)
                time.sleep(self.interval)
                elapsed = time.monotonic() - started
        except KeyboardInterrupt:
            print("\n\nMonitoring stopped by user.")
        self.end_time = datetime.datetime.now()

        # Print final report
        self.print_final_report()
def clear_screen():
    """Reset the terminal display by emitting ANSI escape codes to stdout."""
    # ESC[H homes the cursor, ESC[J erases from the cursor to the end of the
    # screen; no trailing newline is written.
    reset_sequence = "\033[H\033[J"
    print(reset_sequence, end="")
def main():
    """Print the configured targets and timing, wait three seconds, then run the monitor."""
    banner = "=" * 80
    print(banner)
    print("Network Monitoring Script for OPNsense/TP-Link Diagnostics")
    print(banner)

    # List every host that will be probed, in configuration order.
    print("\nConfigured to monitor:")
    for address, label in MONITOR_IPS.items():
        print(f"{address:<15} - {label}")

    duration_minutes = MONITOR_DURATION / 60
    print(f"\nMonitoring interval: {PING_INTERVAL} seconds")
    print(f"Total duration: {MONITOR_DURATION} seconds ({duration_minutes:.0f} minutes)")
    print(f"Ping timeout: {PING_TIMEOUT} seconds")

    # Give the operator a moment to read the settings before the screen is cleared.
    print("\nStarting in 3 seconds...")
    time.sleep(3)

    NetworkMonitor(MONITOR_IPS, MONITOR_DURATION, PING_INTERVAL).run()
# Start monitoring only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()