diff --git a/opnsense_person_alias_manager.py b/opnsense_person_alias_manager.py new file mode 100644 index 0000000..5311663 --- /dev/null +++ b/opnsense_person_alias_manager.py @@ -0,0 +1,175 @@ +#!/usr/bin/env python3 +""" +OPNsense Person-Based Alias Manager +Dynamically updates OPNsense firewall aliases based on device assignments in Home Assistant +""" + +import requests +import sys +import json +from urllib3.exceptions import InsecureRequestWarning +requests.packages.urllib3.disable_warnings(InsecureRequestWarning) + +class PersonBasedAliasManager: + def __init__(self, opnsense_host: str, api_key: str, api_secret: str): + self.host = opnsense_host + self.api_key = api_key + self.api_secret = api_secret + self.base_url = f"https://{opnsense_host}/api" + + def _make_request(self, endpoint: str, method: str = "GET", data: dict = None): + """Make API request to OPNsense""" + url = f"{self.base_url}/{endpoint}" + + try: + if method == "GET": + response = requests.get( + url, + auth=(self.api_key, self.api_secret), + verify=False, + timeout=10 + ) + elif method == "POST": + response = requests.post( + url, + auth=(self.api_key, self.api_secret), + json=data, + verify=False, + timeout=10 + ) + + response.raise_for_status() + return response.json() + except Exception as e: + print(f"Error making request to {endpoint}: {e}") + return None + + def update_person_alias(self, person: str, mac_addresses: list, force_create: bool = False): + """Update or create a person's blocked device alias""" + alias_name = f"Blocked_{person}" + + # Filter out empty MAC addresses + mac_addresses = [mac for mac in mac_addresses if mac and mac.strip()] + + if not mac_addresses and not force_create: + print(f"No MAC addresses for {person}, skipping alias update") + return True + + # Get existing aliases to check if it exists + aliases = self._make_request("firewall/alias/get") + + if not aliases: + print(f"Failed to retrieve aliases") + return False + + # Find the alias UUID if it exists + alias_uuid = None + alias_data_dict = aliases.get('data', {}).get('alias', {}).get('aliases', {}).get('alias', {}) + + for uuid, alias_info in alias_data_dict.items(): + if alias_info.get('name') == alias_name: + alias_uuid = uuid + break + + # Prepare alias data + alias_data = { + "alias": { + "enabled": "1", + "name": alias_name, + "type": "mac", + "content": "\n".join(mac_addresses) if mac_addresses else "", + "description": f"Blocked devices for {person} - Managed by Home Assistant" + } + } + + # Create or update + if alias_uuid: + # Update existing alias + endpoint = f"firewall/alias/setItem/{alias_uuid}" + else: + # Create new alias + endpoint = "firewall/alias/setItem" + + result = self._make_request(endpoint, method="POST", data=alias_data) + + if result and result.get('result') == 'saved': + # Apply changes + self._make_request("firewall/alias/reconfigure", method="POST") + print(f"✓ Updated alias '{alias_name}' with {len(mac_addresses)} MAC(s)") + return True + else: + print(f"✗ Failed to update alias '{alias_name}'") + return False + +def main(): + """ + Main function - expects JSON input via stdin with device assignments + + Expected JSON format: + { + "opnsense_host": "10.0.0.254", + "api_key": "...", + "api_secret": "...", + "people": { + "Bella": { + "block": true, + "devices": [ + {"name": "bella_phone", "mac": "aa:bb:cc:dd:ee:ff", "blocked": true}, + {"name": "bella_tablet", "mac": "11:22:33:44:55:66", "blocked": false} + ] + }, + "Xander": { ... }, + ... + } + } + """ + + # Read JSON from stdin + try: + input_data = json.load(sys.stdin) + except json.JSONDecodeError as e: + print(f"Error parsing JSON input: {e}") + sys.exit(1) + + # Extract configuration + opnsense_host = input_data.get('opnsense_host', '10.0.0.254') + api_key = input_data.get('api_key') + api_secret = input_data.get('api_secret') + people = input_data.get('people', {}) + + if not api_key or not api_secret: + print("Error: API key and secret are required") + sys.exit(1) + + # Initialize manager + manager = PersonBasedAliasManager(opnsense_host, api_key, api_secret) + + # Update aliases for each person + success_count = 0 + total_count = 0 + + for person, person_data in people.items(): + total_count += 1 + + # Get blocked MACs for this person + blocked_macs = [] + + for device in person_data.get('devices', []): + # Add MAC if device is blocked OR if person-level block is enabled + if device.get('blocked') or person_data.get('block'): + mac = device.get('mac') + if mac: + blocked_macs.append(mac) + + # Update the alias + if manager.update_person_alias(person, blocked_macs, force_create=True): + success_count += 1 + + print(f"\n{'='*60}") + print(f"Updated {success_count}/{total_count} person aliases") + print(f"{'='*60}") + + sys.exit(0 if success_count == total_count else 1) + +if __name__ == "__main__": + main()