#!/usr/bin/env python3 """ OPNsense Person-Based Alias Manager Dynamically updates OPNsense firewall aliases based on device assignments in Home Assistant """ import requests import sys import json from urllib3.exceptions import InsecureRequestWarning requests.packages.urllib3.disable_warnings(InsecureRequestWarning) class PersonBasedAliasManager: def __init__(self, opnsense_host: str, api_key: str, api_secret: str): self.host = opnsense_host self.api_key = api_key self.api_secret = api_secret self.base_url = f"https://{opnsense_host}/api" def _make_request(self, endpoint: str, method: str = "GET", data: dict = None): """Make API request to OPNsense""" url = f"{self.base_url}/{endpoint}" try: if method == "GET": response = requests.get( url, auth=(self.api_key, self.api_secret), verify=False, timeout=10 ) elif method == "POST": response = requests.post( url, auth=(self.api_key, self.api_secret), json=data, verify=False, timeout=10 ) response.raise_for_status() return response.json() except Exception as e: print(f"Error making request to {endpoint}: {e}") return None def update_person_alias(self, person: str, mac_addresses: list, force_create: bool = False): """Update or create a person's blocked device alias""" alias_name = f"Blocked_{person}" # Filter out empty MAC addresses mac_addresses = [mac for mac in mac_addresses if mac and mac.strip()] if not mac_addresses and not force_create: print(f"No MAC addresses for {person}, skipping alias update") return True # Get existing aliases to check if it exists aliases = self._make_request("firewall/alias/get") if not aliases: print(f"Failed to retrieve aliases") return False # Find the alias UUID if it exists alias_uuid = None alias_data_dict = aliases.get('data', {}).get('alias', {}).get('aliases', {}).get('alias', {}) for uuid, alias_info in alias_data_dict.items(): if alias_info.get('name') == alias_name: alias_uuid = uuid break # Prepare alias data alias_data = { "alias": { "enabled": "1", "name": alias_name, "type": "mac", "content": "\n".join(mac_addresses) if mac_addresses else "", "description": f"Blocked devices for {person} - Managed by Home Assistant" } } # Create or update if alias_uuid: # Update existing alias endpoint = f"firewall/alias/setItem/{alias_uuid}" else: # Create new alias endpoint = "firewall/alias/setItem" result = self._make_request(endpoint, method="POST", data=alias_data) if result and result.get('result') == 'saved': # Apply changes self._make_request("firewall/alias/reconfigure", method="POST") print(f"✓ Updated alias '{alias_name}' with {len(mac_addresses)} MAC(s)") return True else: print(f"✗ Failed to update alias '{alias_name}'") return False def main(): """ Main function - expects JSON input via stdin with device assignments Expected JSON format: { "opnsense_host": "10.0.0.254", "api_key": "...", "api_secret": "...", "people": { "Bella": { "block": true, "devices": [ {"name": "bella_phone", "mac": "aa:bb:cc:dd:ee:ff", "blocked": true}, {"name": "bella_tablet", "mac": "11:22:33:44:55:66", "blocked": false} ] }, "Xander": { ... }, ... } } """ # Read JSON from stdin try: input_data = json.load(sys.stdin) except json.JSONDecodeError as e: print(f"Error parsing JSON input: {e}") sys.exit(1) # Extract configuration opnsense_host = input_data.get('opnsense_host', '10.0.0.254') api_key = input_data.get('api_key') api_secret = input_data.get('api_secret') people = input_data.get('people', {}) if not api_key or not api_secret: print("Error: API key and secret are required") sys.exit(1) # Initialize manager manager = PersonBasedAliasManager(opnsense_host, api_key, api_secret) # Update aliases for each person success_count = 0 total_count = 0 for person, person_data in people.items(): total_count += 1 # Get blocked MACs for this person blocked_macs = [] for device in person_data.get('devices', []): # Add MAC if device is blocked OR if person-level block is enabled if device.get('blocked') or person_data.get('block'): mac = device.get('mac') if mac: blocked_macs.append(mac) # Update the alias if manager.update_person_alias(person, blocked_macs, force_create=True): success_count += 1 print(f"\n{'='*60}") print(f"Updated {success_count}/{total_count} person aliases") print(f"{'='*60}") sys.exit(0 if success_count == total_count else 1) if __name__ == "__main__": main()