Add Python device discovery script
This commit is contained in:
333
opnsense_device_manager.py
Normal file
333
opnsense_device_manager.py
Normal file
@@ -0,0 +1,333 @@
|
|||||||
|
#!/usr/bin/env python3
|
||||||
|
"""
|
||||||
|
OPNsense Device Discovery and Management Script
|
||||||
|
Helps discover devices on the network and manage MAC-based internet blocking
|
||||||
|
"""
|
||||||
|
|
||||||
|
import requests
|
||||||
|
import json
|
||||||
|
from typing import List, Dict
|
||||||
|
from urllib3.exceptions import InsecureRequestWarning
|
||||||
|
requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
|
||||||
|
|
||||||
|
class OPNsenseDeviceManager:
|
||||||
|
def __init__(self, host: str, api_key: str, api_secret: str):
|
||||||
|
self.host = host
|
||||||
|
self.api_key = api_key
|
||||||
|
self.api_secret = api_secret
|
||||||
|
self.base_url = f"https://{host}/api"
|
||||||
|
|
||||||
|
def _make_request(self, endpoint: str, method: str = "GET", data: dict = None):
|
||||||
|
"""Make API request to OPNsense"""
|
||||||
|
url = f"{self.base_url}/{endpoint}"
|
||||||
|
|
||||||
|
try:
|
||||||
|
if method == "GET":
|
||||||
|
response = requests.get(
|
||||||
|
url,
|
||||||
|
auth=(self.api_key, self.api_secret),
|
||||||
|
verify=False,
|
||||||
|
timeout=10
|
||||||
|
)
|
||||||
|
elif method == "POST":
|
||||||
|
response = requests.post(
|
||||||
|
url,
|
||||||
|
auth=(self.api_key, self.api_secret),
|
||||||
|
json=data,
|
||||||
|
verify=False,
|
||||||
|
timeout=10
|
||||||
|
)
|
||||||
|
|
||||||
|
response.raise_for_status()
|
||||||
|
return response.json()
|
||||||
|
except Exception as e:
|
||||||
|
print(f"Error making request to {endpoint}: {e}")
|
||||||
|
return None
|
||||||
|
|
||||||
|
def discover_devices(self) -> List[Dict]:
|
||||||
|
"""Discover all devices from ARP table"""
|
||||||
|
result = self._make_request("diagnostics/interface/getArp")
|
||||||
|
|
||||||
|
if not result:
|
||||||
|
return []
|
||||||
|
|
||||||
|
devices = []
|
||||||
|
seen_macs = set()
|
||||||
|
|
||||||
|
for entry in result.get('data', []):
|
||||||
|
mac = entry.get('mac', '')
|
||||||
|
ip = entry.get('ip', '')
|
||||||
|
manufacturer = entry.get('manufacturer', 'Unknown')
|
||||||
|
|
||||||
|
# Skip duplicates and the router itself
|
||||||
|
if mac in seen_macs or mac == 'a8:60:b6:34:13:e1':
|
||||||
|
continue
|
||||||
|
|
||||||
|
# Skip MAC randomization placeholder
|
||||||
|
if mac == '7e:fe:ce:1a:4b:f0':
|
||||||
|
continue
|
||||||
|
|
||||||
|
seen_macs.add(mac)
|
||||||
|
|
||||||
|
device_info = {
|
||||||
|
'mac': mac,
|
||||||
|
'ip': ip,
|
||||||
|
'manufacturer': manufacturer,
|
||||||
|
'intf_description': entry.get('intf_description', 'LAN'),
|
||||||
|
'suggested_name': self._suggest_device_name(manufacturer, ip)
|
||||||
|
}
|
||||||
|
|
||||||
|
devices.append(device_info)
|
||||||
|
|
||||||
|
return sorted(devices, key=lambda x: x['ip'])
|
||||||
|
|
||||||
|
def _suggest_device_name(self, manufacturer: str, ip: str) -> str:
|
||||||
|
"""Suggest a device name based on manufacturer"""
|
||||||
|
manufacturer = manufacturer.lower()
|
||||||
|
|
||||||
|
if 'apple' in manufacturer:
|
||||||
|
return "Apple Device (iPhone/iPad/Mac)"
|
||||||
|
elif 'google' in manufacturer:
|
||||||
|
return "Google Device (Chromecast/Phone)"
|
||||||
|
elif 'nintendo' in manufacturer:
|
||||||
|
return "Nintendo Switch"
|
||||||
|
elif 'intel' in manufacturer:
|
||||||
|
return "Computer/Laptop"
|
||||||
|
elif 'asus' in manufacturer:
|
||||||
|
return "ASUS Device"
|
||||||
|
elif 'tp-link' in manufacturer:
|
||||||
|
return "TP-Link Device (AP/Camera)"
|
||||||
|
elif 'espressif' in manufacturer:
|
||||||
|
return "ESP Device (Smart Home)"
|
||||||
|
elif 'tuya' in manufacturer:
|
||||||
|
return "Tuya Smart Device"
|
||||||
|
elif 'proxmox' in manufacturer:
|
||||||
|
return "Proxmox Server"
|
||||||
|
else:
|
||||||
|
return f"Unknown Device ({ip})"
|
||||||
|
|
||||||
|
def create_firewall_alias(self, name: str, mac_addresses: List[str], description: str = "") -> bool:
|
||||||
|
"""Create or update a firewall alias for MAC addresses"""
|
||||||
|
# First, check if alias exists
|
||||||
|
aliases = self._make_request("firewall/alias/getItem")
|
||||||
|
|
||||||
|
alias_data = {
|
||||||
|
"alias": {
|
||||||
|
"enabled": "1",
|
||||||
|
"name": name,
|
||||||
|
"type": "mac",
|
||||||
|
"content": "\n".join(mac_addresses),
|
||||||
|
"description": description
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
# Create or update
|
||||||
|
result = self._make_request(
|
||||||
|
f"firewall/alias/setItem",
|
||||||
|
method="POST",
|
||||||
|
data=alias_data
|
||||||
|
)
|
||||||
|
|
||||||
|
if result and result.get('result') == 'saved':
|
||||||
|
# Apply changes
|
||||||
|
self._make_request("firewall/alias/reconfigure", method="POST")
|
||||||
|
return True
|
||||||
|
|
||||||
|
return False
|
||||||
|
|
||||||
|
def create_block_rule(self, alias_name: str, description: str) -> bool:
|
||||||
|
"""Create a firewall rule to block internet access for an alias"""
|
||||||
|
rule_data = {
|
||||||
|
"rule": {
|
||||||
|
"enabled": "1",
|
||||||
|
"action": "block",
|
||||||
|
"interface": "lan",
|
||||||
|
"direction": "out",
|
||||||
|
"ipprotocol": "inet",
|
||||||
|
"protocol": "any",
|
||||||
|
"source_net": alias_name,
|
||||||
|
"destination_net": "any",
|
||||||
|
"destination_not": "1", # NOT LAN (so it blocks WAN only)
|
||||||
|
"description": description,
|
||||||
|
"log": "1"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
result = self._make_request(
|
||||||
|
"firewall/filter/addRule",
|
||||||
|
method="POST",
|
||||||
|
data=rule_data
|
||||||
|
)
|
||||||
|
|
||||||
|
if result and result.get('result') == 'saved':
|
||||||
|
# Apply filter rules
|
||||||
|
self._make_request("firewall/filter/apply", method="POST")
|
||||||
|
return True
|
||||||
|
|
||||||
|
return False
|
||||||
|
|
||||||
|
def list_aliases(self) -> List[Dict]:
|
||||||
|
"""List all firewall aliases"""
|
||||||
|
result = self._make_request("firewall/alias/get")
|
||||||
|
|
||||||
|
if not result:
|
||||||
|
return []
|
||||||
|
|
||||||
|
aliases = []
|
||||||
|
alias_data = result.get('data', {}).get('alias', {}).get('aliases', {}).get('alias', {})
|
||||||
|
|
||||||
|
for alias_id, alias_info in alias_data.items():
|
||||||
|
if alias_info.get('type', {}).get('mac', {}).get('selected') == 1:
|
||||||
|
aliases.append({
|
||||||
|
'id': alias_id,
|
||||||
|
'name': alias_info.get('name'),
|
||||||
|
'description': alias_info.get('description'),
|
||||||
|
'content': alias_info.get('content', {})
|
||||||
|
})
|
||||||
|
|
||||||
|
return aliases
|
||||||
|
|
||||||
|
def generate_device_report(self) -> str:
|
||||||
|
"""Generate a formatted report of discovered devices"""
|
||||||
|
devices = self.discover_devices()
|
||||||
|
|
||||||
|
report = "=" * 80 + "\n"
|
||||||
|
report += "OPNSENSE NETWORK DEVICE DISCOVERY REPORT\n"
|
||||||
|
report += "=" * 80 + "\n\n"
|
||||||
|
|
||||||
|
report += f"Total Devices Found: {len(devices)}\n\n"
|
||||||
|
|
||||||
|
# Group by manufacturer
|
||||||
|
by_manufacturer = {}
|
||||||
|
for device in devices:
|
||||||
|
mfg = device['manufacturer'] or 'Unknown'
|
||||||
|
if mfg not in by_manufacturer:
|
||||||
|
by_manufacturer[mfg] = []
|
||||||
|
by_manufacturer[mfg].append(device)
|
||||||
|
|
||||||
|
for manufacturer, device_list in sorted(by_manufacturer.items()):
|
||||||
|
report += f"\n{manufacturer}:\n"
|
||||||
|
report += "-" * 80 + "\n"
|
||||||
|
|
||||||
|
for device in device_list:
|
||||||
|
report += f" MAC: {device['mac']}\n"
|
||||||
|
report += f" IP: {device['ip']}\n"
|
||||||
|
report += f" Suggested Name: {device['suggested_name']}\n"
|
||||||
|
report += "\n"
|
||||||
|
|
||||||
|
report += "=" * 80 + "\n"
|
||||||
|
return report
|
||||||
|
|
||||||
|
def export_for_home_assistant(self, output_file: str = "devices_for_ha.txt"):
|
||||||
|
"""Export devices in a format easy to copy into Home Assistant"""
|
||||||
|
devices = self.discover_devices()
|
||||||
|
|
||||||
|
with open(output_file, 'w') as f:
|
||||||
|
f.write("# Copy these MAC addresses into your Home Assistant input_text entities\n")
|
||||||
|
f.write("# Format: device_name | MAC Address | IP | Manufacturer\n\n")
|
||||||
|
|
||||||
|
for device in devices:
|
||||||
|
f.write(f"{device['suggested_name']:<40} | "
|
||||||
|
f"{device['mac']:<20} | "
|
||||||
|
f"{device['ip']:<15} | "
|
||||||
|
f"{device['manufacturer']}\n")
|
||||||
|
|
||||||
|
print(f"Device list exported to: {output_file}")
|
||||||
|
|
||||||
|
|
||||||
|
def main():
|
||||||
|
"""Main function for interactive device discovery"""
|
||||||
|
print("OPNsense Device Discovery Tool")
|
||||||
|
print("=" * 80)
|
||||||
|
print()
|
||||||
|
|
||||||
|
# Configuration
|
||||||
|
OPNSENSE_HOST = "10.0.0.254"
|
||||||
|
OPNSENSE_API_KEY = input("Enter OPNsense API Key: ").strip()
|
||||||
|
OPNSENSE_API_SECRET = input("Enter OPNsense API Secret: ").strip()
|
||||||
|
|
||||||
|
print()
|
||||||
|
|
||||||
|
manager = OPNsenseDeviceManager(OPNSENSE_HOST, OPNSENSE_API_KEY, OPNSENSE_API_SECRET)
|
||||||
|
|
||||||
|
while True:
|
||||||
|
print("\nOptions:")
|
||||||
|
print("1. Discover and list all devices")
|
||||||
|
print("2. Generate device report")
|
||||||
|
print("3. Export devices for Home Assistant")
|
||||||
|
print("4. List current firewall aliases")
|
||||||
|
print("5. Create new device alias")
|
||||||
|
print("6. Exit")
|
||||||
|
|
||||||
|
choice = input("\nSelect option (1-6): ").strip()
|
||||||
|
|
||||||
|
if choice == "1":
|
||||||
|
devices = manager.discover_devices()
|
||||||
|
print(f"\nFound {len(devices)} devices:\n")
|
||||||
|
|
||||||
|
for idx, device in enumerate(devices, 1):
|
||||||
|
print(f"{idx}. {device['suggested_name']}")
|
||||||
|
print(f" MAC: {device['mac']}")
|
||||||
|
print(f" IP: {device['ip']}")
|
||||||
|
print(f" Manufacturer: {device['manufacturer']}")
|
||||||
|
print()
|
||||||
|
|
||||||
|
elif choice == "2":
|
||||||
|
report = manager.generate_device_report()
|
||||||
|
print(report)
|
||||||
|
|
||||||
|
save = input("Save report to file? (y/n): ").strip().lower()
|
||||||
|
if save == 'y':
|
||||||
|
with open('device_report.txt', 'w') as f:
|
||||||
|
f.write(report)
|
||||||
|
print("Report saved to: device_report.txt")
|
||||||
|
|
||||||
|
elif choice == "3":
|
||||||
|
manager.export_for_home_assistant()
|
||||||
|
|
||||||
|
elif choice == "4":
|
||||||
|
aliases = manager.list_aliases()
|
||||||
|
print(f"\nFound {len(aliases)} MAC-based aliases:\n")
|
||||||
|
|
||||||
|
for alias in aliases:
|
||||||
|
print(f"Name: {alias['name']}")
|
||||||
|
print(f"Description: {alias['description']}")
|
||||||
|
print(f"ID: {alias['id']}")
|
||||||
|
print()
|
||||||
|
|
||||||
|
elif choice == "5":
|
||||||
|
print("\nCreate New Device Alias")
|
||||||
|
name = input("Alias name (e.g., Blocked_Bella): ").strip()
|
||||||
|
description = input("Description: ").strip()
|
||||||
|
|
||||||
|
print("\nEnter MAC addresses (one per line, empty line to finish):")
|
||||||
|
mac_addresses = []
|
||||||
|
while True:
|
||||||
|
mac = input("MAC: ").strip()
|
||||||
|
if not mac:
|
||||||
|
break
|
||||||
|
mac_addresses.append(mac)
|
||||||
|
|
||||||
|
if mac_addresses:
|
||||||
|
if manager.create_firewall_alias(name, mac_addresses, description):
|
||||||
|
print(f"\nAlias '{name}' created successfully!")
|
||||||
|
|
||||||
|
create_rule = input("Create block rule for this alias? (y/n): ").strip().lower()
|
||||||
|
if create_rule == 'y':
|
||||||
|
if manager.create_block_rule(name, f"Block {description}"):
|
||||||
|
print("Block rule created successfully!")
|
||||||
|
else:
|
||||||
|
print("Failed to create alias")
|
||||||
|
else:
|
||||||
|
print("No MAC addresses provided")
|
||||||
|
|
||||||
|
elif choice == "6":
|
||||||
|
print("Exiting...")
|
||||||
|
break
|
||||||
|
|
||||||
|
else:
|
||||||
|
print("Invalid option")
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
main()
|
||||||
Reference in New Issue
Block a user