diff --git a/opnsense_device_manager.py b/opnsense_device_manager.py new file mode 100644 index 0000000..b3be92c --- /dev/null +++ b/opnsense_device_manager.py @@ -0,0 +1,333 @@ +#!/usr/bin/env python3 +""" +OPNsense Device Discovery and Management Script +Helps discover devices on the network and manage MAC-based internet blocking +""" + +import requests +import json +from typing import List, Dict +from urllib3.exceptions import InsecureRequestWarning +requests.packages.urllib3.disable_warnings(InsecureRequestWarning) + +class OPNsenseDeviceManager: + def __init__(self, host: str, api_key: str, api_secret: str): + self.host = host + self.api_key = api_key + self.api_secret = api_secret + self.base_url = f"https://{host}/api" + + def _make_request(self, endpoint: str, method: str = "GET", data: dict = None): + """Make API request to OPNsense""" + url = f"{self.base_url}/{endpoint}" + + try: + if method == "GET": + response = requests.get( + url, + auth=(self.api_key, self.api_secret), + verify=False, + timeout=10 + ) + elif method == "POST": + response = requests.post( + url, + auth=(self.api_key, self.api_secret), + json=data, + verify=False, + timeout=10 + ) + + response.raise_for_status() + return response.json() + except Exception as e: + print(f"Error making request to {endpoint}: {e}") + return None + + def discover_devices(self) -> List[Dict]: + """Discover all devices from ARP table""" + result = self._make_request("diagnostics/interface/getArp") + + if not result: + return [] + + devices = [] + seen_macs = set() + + for entry in result.get('data', []): + mac = entry.get('mac', '') + ip = entry.get('ip', '') + manufacturer = entry.get('manufacturer', 'Unknown') + + # Skip duplicates and the router itself + if mac in seen_macs or mac == 'a8:60:b6:34:13:e1': + continue + + # Skip MAC randomization placeholder + if mac == '7e:fe:ce:1a:4b:f0': + continue + + seen_macs.add(mac) + + device_info = { + 'mac': mac, + 'ip': ip, + 'manufacturer': manufacturer, + 'intf_description': entry.get('intf_description', 'LAN'), + 'suggested_name': self._suggest_device_name(manufacturer, ip) + } + + devices.append(device_info) + + return sorted(devices, key=lambda x: x['ip']) + + def _suggest_device_name(self, manufacturer: str, ip: str) -> str: + """Suggest a device name based on manufacturer""" + manufacturer = manufacturer.lower() + + if 'apple' in manufacturer: + return "Apple Device (iPhone/iPad/Mac)" + elif 'google' in manufacturer: + return "Google Device (Chromecast/Phone)" + elif 'nintendo' in manufacturer: + return "Nintendo Switch" + elif 'intel' in manufacturer: + return "Computer/Laptop" + elif 'asus' in manufacturer: + return "ASUS Device" + elif 'tp-link' in manufacturer: + return "TP-Link Device (AP/Camera)" + elif 'espressif' in manufacturer: + return "ESP Device (Smart Home)" + elif 'tuya' in manufacturer: + return "Tuya Smart Device" + elif 'proxmox' in manufacturer: + return "Proxmox Server" + else: + return f"Unknown Device ({ip})" + + def create_firewall_alias(self, name: str, mac_addresses: List[str], description: str = "") -> bool: + """Create or update a firewall alias for MAC addresses""" + # First, check if alias exists + aliases = self._make_request("firewall/alias/getItem") + + alias_data = { + "alias": { + "enabled": "1", + "name": name, + "type": "mac", + "content": "\n".join(mac_addresses), + "description": description + } + } + + # Create or update + result = self._make_request( + f"firewall/alias/setItem", + method="POST", + data=alias_data + ) + + if result and result.get('result') == 'saved': + # Apply changes + self._make_request("firewall/alias/reconfigure", method="POST") + return True + + return False + + def create_block_rule(self, alias_name: str, description: str) -> bool: + """Create a firewall rule to block internet access for an alias""" + rule_data = { + "rule": { + "enabled": "1", + "action": "block", + "interface": "lan", + "direction": "out", + "ipprotocol": "inet", + "protocol": "any", + "source_net": alias_name, + "destination_net": "any", + "destination_not": "1", # NOT LAN (so it blocks WAN only) + "description": description, + "log": "1" + } + } + + result = self._make_request( + "firewall/filter/addRule", + method="POST", + data=rule_data + ) + + if result and result.get('result') == 'saved': + # Apply filter rules + self._make_request("firewall/filter/apply", method="POST") + return True + + return False + + def list_aliases(self) -> List[Dict]: + """List all firewall aliases""" + result = self._make_request("firewall/alias/get") + + if not result: + return [] + + aliases = [] + alias_data = result.get('data', {}).get('alias', {}).get('aliases', {}).get('alias', {}) + + for alias_id, alias_info in alias_data.items(): + if alias_info.get('type', {}).get('mac', {}).get('selected') == 1: + aliases.append({ + 'id': alias_id, + 'name': alias_info.get('name'), + 'description': alias_info.get('description'), + 'content': alias_info.get('content', {}) + }) + + return aliases + + def generate_device_report(self) -> str: + """Generate a formatted report of discovered devices""" + devices = self.discover_devices() + + report = "=" * 80 + "\n" + report += "OPNSENSE NETWORK DEVICE DISCOVERY REPORT\n" + report += "=" * 80 + "\n\n" + + report += f"Total Devices Found: {len(devices)}\n\n" + + # Group by manufacturer + by_manufacturer = {} + for device in devices: + mfg = device['manufacturer'] or 'Unknown' + if mfg not in by_manufacturer: + by_manufacturer[mfg] = [] + by_manufacturer[mfg].append(device) + + for manufacturer, device_list in sorted(by_manufacturer.items()): + report += f"\n{manufacturer}:\n" + report += "-" * 80 + "\n" + + for device in device_list: + report += f" MAC: {device['mac']}\n" + report += f" IP: {device['ip']}\n" + report += f" Suggested Name: {device['suggested_name']}\n" + report += "\n" + + report += "=" * 80 + "\n" + return report + + def export_for_home_assistant(self, output_file: str = "devices_for_ha.txt"): + """Export devices in a format easy to copy into Home Assistant""" + devices = self.discover_devices() + + with open(output_file, 'w') as f: + f.write("# Copy these MAC addresses into your Home Assistant input_text entities\n") + f.write("# Format: device_name | MAC Address | IP | Manufacturer\n\n") + + for device in devices: + f.write(f"{device['suggested_name']:<40} | " + f"{device['mac']:<20} | " + f"{device['ip']:<15} | " + f"{device['manufacturer']}\n") + + print(f"Device list exported to: {output_file}") + + +def main(): + """Main function for interactive device discovery""" + print("OPNsense Device Discovery Tool") + print("=" * 80) + print() + + # Configuration + OPNSENSE_HOST = "10.0.0.254" + OPNSENSE_API_KEY = input("Enter OPNsense API Key: ").strip() + OPNSENSE_API_SECRET = input("Enter OPNsense API Secret: ").strip() + + print() + + manager = OPNsenseDeviceManager(OPNSENSE_HOST, OPNSENSE_API_KEY, OPNSENSE_API_SECRET) + + while True: + print("\nOptions:") + print("1. Discover and list all devices") + print("2. Generate device report") + print("3. Export devices for Home Assistant") + print("4. List current firewall aliases") + print("5. Create new device alias") + print("6. Exit") + + choice = input("\nSelect option (1-6): ").strip() + + if choice == "1": + devices = manager.discover_devices() + print(f"\nFound {len(devices)} devices:\n") + + for idx, device in enumerate(devices, 1): + print(f"{idx}. {device['suggested_name']}") + print(f" MAC: {device['mac']}") + print(f" IP: {device['ip']}") + print(f" Manufacturer: {device['manufacturer']}") + print() + + elif choice == "2": + report = manager.generate_device_report() + print(report) + + save = input("Save report to file? (y/n): ").strip().lower() + if save == 'y': + with open('device_report.txt', 'w') as f: + f.write(report) + print("Report saved to: device_report.txt") + + elif choice == "3": + manager.export_for_home_assistant() + + elif choice == "4": + aliases = manager.list_aliases() + print(f"\nFound {len(aliases)} MAC-based aliases:\n") + + for alias in aliases: + print(f"Name: {alias['name']}") + print(f"Description: {alias['description']}") + print(f"ID: {alias['id']}") + print() + + elif choice == "5": + print("\nCreate New Device Alias") + name = input("Alias name (e.g., Blocked_Bella): ").strip() + description = input("Description: ").strip() + + print("\nEnter MAC addresses (one per line, empty line to finish):") + mac_addresses = [] + while True: + mac = input("MAC: ").strip() + if not mac: + break + mac_addresses.append(mac) + + if mac_addresses: + if manager.create_firewall_alias(name, mac_addresses, description): + print(f"\nAlias '{name}' created successfully!") + + create_rule = input("Create block rule for this alias? (y/n): ").strip().lower() + if create_rule == 'y': + if manager.create_block_rule(name, f"Block {description}"): + print("Block rule created successfully!") + else: + print("Failed to create alias") + else: + print("No MAC addresses provided") + + elif choice == "6": + print("Exiting...") + break + + else: + print("Invalid option") + + +if __name__ == "__main__": + main()