Add Python script for dynamic person-based alias management
This commit is contained in:
175
opnsense_person_alias_manager.py
Normal file
175
opnsense_person_alias_manager.py
Normal file
@@ -0,0 +1,175 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
OPNsense Person-Based Alias Manager
|
||||
Dynamically updates OPNsense firewall aliases based on device assignments in Home Assistant
|
||||
"""
|
||||
|
||||
import requests
|
||||
import sys
|
||||
import json
|
||||
from urllib3.exceptions import InsecureRequestWarning
|
||||
requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
|
||||
|
||||
class PersonBasedAliasManager:
|
||||
def __init__(self, opnsense_host: str, api_key: str, api_secret: str):
|
||||
self.host = opnsense_host
|
||||
self.api_key = api_key
|
||||
self.api_secret = api_secret
|
||||
self.base_url = f"https://{opnsense_host}/api"
|
||||
|
||||
def _make_request(self, endpoint: str, method: str = "GET", data: dict = None):
|
||||
"""Make API request to OPNsense"""
|
||||
url = f"{self.base_url}/{endpoint}"
|
||||
|
||||
try:
|
||||
if method == "GET":
|
||||
response = requests.get(
|
||||
url,
|
||||
auth=(self.api_key, self.api_secret),
|
||||
verify=False,
|
||||
timeout=10
|
||||
)
|
||||
elif method == "POST":
|
||||
response = requests.post(
|
||||
url,
|
||||
auth=(self.api_key, self.api_secret),
|
||||
json=data,
|
||||
verify=False,
|
||||
timeout=10
|
||||
)
|
||||
|
||||
response.raise_for_status()
|
||||
return response.json()
|
||||
except Exception as e:
|
||||
print(f"Error making request to {endpoint}: {e}")
|
||||
return None
|
||||
|
||||
def update_person_alias(self, person: str, mac_addresses: list, force_create: bool = False):
|
||||
"""Update or create a person's blocked device alias"""
|
||||
alias_name = f"Blocked_{person}"
|
||||
|
||||
# Filter out empty MAC addresses
|
||||
mac_addresses = [mac for mac in mac_addresses if mac and mac.strip()]
|
||||
|
||||
if not mac_addresses and not force_create:
|
||||
print(f"No MAC addresses for {person}, skipping alias update")
|
||||
return True
|
||||
|
||||
# Get existing aliases to check if it exists
|
||||
aliases = self._make_request("firewall/alias/get")
|
||||
|
||||
if not aliases:
|
||||
print(f"Failed to retrieve aliases")
|
||||
return False
|
||||
|
||||
# Find the alias UUID if it exists
|
||||
alias_uuid = None
|
||||
alias_data_dict = aliases.get('data', {}).get('alias', {}).get('aliases', {}).get('alias', {})
|
||||
|
||||
for uuid, alias_info in alias_data_dict.items():
|
||||
if alias_info.get('name') == alias_name:
|
||||
alias_uuid = uuid
|
||||
break
|
||||
|
||||
# Prepare alias data
|
||||
alias_data = {
|
||||
"alias": {
|
||||
"enabled": "1",
|
||||
"name": alias_name,
|
||||
"type": "mac",
|
||||
"content": "\n".join(mac_addresses) if mac_addresses else "",
|
||||
"description": f"Blocked devices for {person} - Managed by Home Assistant"
|
||||
}
|
||||
}
|
||||
|
||||
# Create or update
|
||||
if alias_uuid:
|
||||
# Update existing alias
|
||||
endpoint = f"firewall/alias/setItem/{alias_uuid}"
|
||||
else:
|
||||
# Create new alias
|
||||
endpoint = "firewall/alias/setItem"
|
||||
|
||||
result = self._make_request(endpoint, method="POST", data=alias_data)
|
||||
|
||||
if result and result.get('result') == 'saved':
|
||||
# Apply changes
|
||||
self._make_request("firewall/alias/reconfigure", method="POST")
|
||||
print(f"✓ Updated alias '{alias_name}' with {len(mac_addresses)} MAC(s)")
|
||||
return True
|
||||
else:
|
||||
print(f"✗ Failed to update alias '{alias_name}'")
|
||||
return False
|
||||
|
||||
def main():
|
||||
"""
|
||||
Main function - expects JSON input via stdin with device assignments
|
||||
|
||||
Expected JSON format:
|
||||
{
|
||||
"opnsense_host": "10.0.0.254",
|
||||
"api_key": "...",
|
||||
"api_secret": "...",
|
||||
"people": {
|
||||
"Bella": {
|
||||
"block": true,
|
||||
"devices": [
|
||||
{"name": "bella_phone", "mac": "aa:bb:cc:dd:ee:ff", "blocked": true},
|
||||
{"name": "bella_tablet", "mac": "11:22:33:44:55:66", "blocked": false}
|
||||
]
|
||||
},
|
||||
"Xander": { ... },
|
||||
...
|
||||
}
|
||||
}
|
||||
"""
|
||||
|
||||
# Read JSON from stdin
|
||||
try:
|
||||
input_data = json.load(sys.stdin)
|
||||
except json.JSONDecodeError as e:
|
||||
print(f"Error parsing JSON input: {e}")
|
||||
sys.exit(1)
|
||||
|
||||
# Extract configuration
|
||||
opnsense_host = input_data.get('opnsense_host', '10.0.0.254')
|
||||
api_key = input_data.get('api_key')
|
||||
api_secret = input_data.get('api_secret')
|
||||
people = input_data.get('people', {})
|
||||
|
||||
if not api_key or not api_secret:
|
||||
print("Error: API key and secret are required")
|
||||
sys.exit(1)
|
||||
|
||||
# Initialize manager
|
||||
manager = PersonBasedAliasManager(opnsense_host, api_key, api_secret)
|
||||
|
||||
# Update aliases for each person
|
||||
success_count = 0
|
||||
total_count = 0
|
||||
|
||||
for person, person_data in people.items():
|
||||
total_count += 1
|
||||
|
||||
# Get blocked MACs for this person
|
||||
blocked_macs = []
|
||||
|
||||
for device in person_data.get('devices', []):
|
||||
# Add MAC if device is blocked OR if person-level block is enabled
|
||||
if device.get('blocked') or person_data.get('block'):
|
||||
mac = device.get('mac')
|
||||
if mac:
|
||||
blocked_macs.append(mac)
|
||||
|
||||
# Update the alias
|
||||
if manager.update_person_alias(person, blocked_macs, force_create=True):
|
||||
success_count += 1
|
||||
|
||||
print(f"\n{'='*60}")
|
||||
print(f"Updated {success_count}/{total_count} person aliases")
|
||||
print(f"{'='*60}")
|
||||
|
||||
sys.exit(0 if success_count == total_count else 1)
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
Reference in New Issue
Block a user