Files
bus-manager/busManager/messaging/services/sms.py
T
st01765 4025c28eae Added locations app
Moved contacts to new model
2026-02-26 13:34:32 +11:00

240 lines
7.5 KiB
Python

import requests
from django import forms
from django.conf import settings
from django.db.models import Prefetch
from django.http import HttpResponseRedirect
from django.shortcuts import render
from traveller.models import Family
def _get_token(timeout=10):
    """Fetch an OAuth access token from the Telstra token endpoint.

    Posts the client id and secret from ``settings.TELSTRA_AUTH`` using the
    ``client_credentials`` grant.

    Args:
        timeout: Seconds ``requests`` waits on the endpoint before raising a
            timeout error. Without a timeout a stalled connection would block
            the calling request indefinitely.

    Returns:
        The ``access_token`` value from the JSON response, or ``None`` when
        the endpoint answers with a status other than 200 (the status code is
        printed in that case).
    """
    url = 'https://products.api.telstra.com/v2/oauth/token'
    headers = {
        'Content-Type': 'application/x-www-form-urlencoded'
    }
    data = {
        'client_id': settings.TELSTRA_AUTH['client_id'],
        'client_secret': settings.TELSTRA_AUTH['client_secret'],
        'grant_type': 'client_credentials'
    }
    result = requests.post(url, data=data, headers=headers, timeout=timeout)
    if result.status_code != 200:
        print("Bad request for telstra access_token:" + str(result.status_code))
        return None
    return result.json()['access_token']
def telstra_api_request(url, data=None, method="POST", timeout=10):
    """Send an authenticated JSON request to the Telstra products API.

    Args:
        url: Path appended to ``https://products.api.telstra.com/``.
        data: Value sent as the JSON request body (``None`` sends no body).
        method: HTTP method name passed to ``requests.request``.
        timeout: Seconds ``requests`` waits on the server before raising a
            timeout error, so a stalled connection cannot hang the caller.

    Returns:
        A ``(success, payload)`` tuple. On a 2xx response ``success`` is
        ``True`` and ``payload`` is the decoded JSON body (``None`` when the
        response has no body). Otherwise ``success`` is ``False`` and
        ``payload`` is the raw response body as bytes; it is ``b''`` when no
        access token could be obtained and no request was sent.
    """
    token = _get_token()
    if token is None:
        # Sending "Bearer None" would only produce a second, misleading
        # authentication failure; stop here instead.
        print("Bad request: no Telstra access token available")
        return False, b''

    headers = {
        'Telstra-api-version': '3.x',
        'Content-Language': 'en-au',
        'Authorization': f'Bearer {token}',
        'Accept': 'application/json',
        'Accept-Charset': 'utf-8',
        'Content-Type': 'application/json'
    }
    result = requests.request(
        method,
        'https://products.api.telstra.com/' + url,
        json=data,
        headers=headers,
        timeout=timeout,
    )

    # Any 2xx status is a success per HTTP semantics; comparing against 200
    # alone would report e.g. a 201 Created from a POST as a failure.
    if not 200 <= result.status_code < 300:
        print("Bad request:" + str(result.status_code))
        print(result.content)
        return False, result.content

    # A body-less success (such as 204) has nothing to decode.
    return True, (result.json() if result.content else None)
def _send_message(to, msg):
    """Post an SMS to the Telstra messages endpoint.

    Args:
        to: Recipient value placed in the payload's ``to`` field; callers in
            this module pass either a list of numbers or a single number.
        msg: Text placed in the payload's ``messageContent`` field.

    Returns:
        Whatever ``telstra_api_request`` returns for the request.
    """
    # The sender is looked up on every call via _get_virtual_numbers().
    sender = _get_virtual_numbers()
    payload = {'to': to, 'from': sender, 'messageContent': msg}
    return telstra_api_request('messaging/v3/messages', payload)
def _get_virtual_numbers():
    """Return one virtual number to use as the SMS sender.

    Issues a GET to the virtual-numbers endpoint and returns the first entry
    of its ``virtualNumbers`` list. If the GET is unsuccessful, issues a POST
    to the same endpoint and returns the ``virtualNumber`` field of that
    response.

    Returns:
        The virtual number, or ``None`` when the fallback POST fails or the
        GET succeeds but lists no numbers.
    """
    url = 'messaging/v3/virtual-numbers'
    success, result = telstra_api_request(url, method='GET')
    if not success:
        print("No number found")
        success, result = telstra_api_request(url, method='POST')
        if not success:
            return None
        return result['virtualNumber']

    numbers = result['virtualNumbers']
    # Falsy check rather than ``is None``: an empty list would otherwise
    # raise IndexError on ``numbers[0]``.
    if not numbers:
        return None
    return numbers[0]['virtualNumber']
class SMSForm(forms.Form):
    """Form for composing an SMS and choosing which contacts receive it."""

    # Hidden field; send_sms pre-fills it with the ids of the selected objects.
    _selected_action = forms.CharField(widget=forms.MultipleHiddenInput)

    # Recipient filters, all optional checkboxes.
    send_to_parents = forms.BooleanField(required=False)
    send_to_emergency_contacts = forms.BooleanField(required=False)
    only_include_active_travellers = forms.BooleanField(
        required=False,
        initial=True,
    )

    # Message text, limited to 160 characters.
    message = forms.CharField(
        widget=forms.Textarea,
        max_length=160,
        label="Message",
    )
def _get_numbers(request, queryset):
    """Collect the distinct phone numbers to message for the given travellers.

    The POST fields ``send_to_parents``, ``send_to_emergency_contacts`` and
    ``only_include_active_travellers`` are read as booleans. The first two are
    forwarded to ``Family.get_parsed_numbers``; the third causes travellers
    whose ``is_active`` is falsy to be skipped.

    Args:
        request: Request whose ``POST`` data carries the options above.
        queryset: Traveller queryset; each item's ``family_set`` is prefetched.

    Returns:
        A list of the unique numbers gathered (order is not defined, since
        they are de-duplicated through a set).
    """
    post = request.POST
    include_parents = bool(post.get("send_to_parents"))
    include_emergency = bool(post.get("send_to_emergency_contacts"))
    active_only = bool(post.get("only_include_active_travellers"))

    collected = set()

    # Load each family's four contact relations along with the family itself.
    family_qs = Family.objects.select_related(
        "contact_A",
        "contact_B",
        "emergency_contact_A",
        "emergency_contact_B",
    )
    prefetched = queryset.prefetch_related(
        Prefetch("family_set", queryset=family_qs)
    )

    for person in prefetched:
        if not active_only or person.is_active:
            for household in person.family_set.all():
                # Second element of the pair is not needed here.
                parsed, _unused = household.get_parsed_numbers(
                    parents=include_parents,
                    emergency=include_emergency,
                )
                collected.update(parsed)

    return list(collected)
def _format_contact(contact):
    """Render a contact as ``"<first_name> <last_name> (<phone>)"``."""
    return f"{contact.first_name} {contact.last_name} ({contact.phone})"


def _family_context(queryset):
    """Build the per-family rows passed to the SMS form template.

    Args:
        queryset: Traveller queryset; each item's ``family_set`` is prefetched
            together with the four contact relations of every family.

    Returns:
        A list of dicts, one per (traveller, family) pair. Each dict has
        ``traveller`` (``str(traveller)``) and ``has_failed_number`` (whether
        ``get_parsed_numbers(True, True)`` reported any failed numbers), plus
        ``parent_A``, ``parent_B``, ``contact_A`` and ``contact_B`` for each
        corresponding contact that is set. A traveller with no families gets
        a single dict with ``has_failed_number`` set to ``True`` and no
        contact entries.
    """
    # (context key, Family attribute) pairs, in the order the keys are added.
    # Note the context keys ``contact_A``/``contact_B`` hold the *emergency*
    # contacts, while the parents come from ``Family.contact_A``/``contact_B``.
    contact_fields = (
        ("parent_A", "contact_A"),
        ("parent_B", "contact_B"),
        ("contact_A", "emergency_contact_A"),
        ("contact_B", "emergency_contact_B"),
    )

    families_prefetch = Prefetch(
        "family_set",
        queryset=Family.objects.select_related(
            "contact_A",
            "contact_B",
            "emergency_contact_A",
            "emergency_contact_B",
        )
    )
    travellers = queryset.prefetch_related(families_prefetch)

    family_set = []
    for traveller in travellers:
        families = traveller.family_set.all()
        if not families:
            # No family means nobody to message: flag the traveller.
            family_set.append({
                "traveller": str(traveller),
                "has_failed_number": True
            })
            continue

        for family in families:
            _, failed_numbers = family.get_parsed_numbers(True, True)
            family_context = {
                "traveller": str(traveller),
                "has_failed_number": len(failed_numbers) > 0,
            }
            for key, attr in contact_fields:
                contact = getattr(family, attr)
                if contact:
                    family_context[key] = _format_contact(contact)
            family_set.append(family_context)
    return family_set
def send_sms(send_sms_mixin, request, queryset):
    """Show the SMS form for the selected travellers, or send the message.

    Appears intended for use as a Django admin action: it reports outcomes
    through ``send_sms_mixin.message_user`` and seeds the form's
    ``_selected_action`` field with the ids in ``queryset``.

    Args:
        send_sms_mixin: Object providing ``message_user(request, msg, level=...)``.
        request: Current request. When its POST data contains ``send`` the
            message is sent; otherwise the form page is rendered.
        queryset: Selected travellers.

    Returns:
        A redirect to the current path after a send attempt (or when Telstra
        credentials are not configured), otherwise the rendered
        ``admin/sms_form.html`` page.
    """
    if not settings.TELSTRA_AUTH:
        send_sms_mixin.message_user(request, "Telstra auth not configured", level="WARNING")
        return HttpResponseRedirect(request.get_full_path())

    if 'send' in request.POST:
        numbers = _get_numbers(request, queryset)
        if len(numbers) > 500:
            send_sms_mixin.message_user(request, f"SMS failed. Total phone numbers ({len(numbers)}) exceeds 500", level="WARNING")
            return HttpResponseRedirect(request.get_full_path())
        if not numbers:
            send_sms_mixin.message_user(request, "SMS failed. No numbers we selected", level="WARNING")
            return HttpResponseRedirect(request.get_full_path())

        # The form is not validated server-side, so guard against a missing
        # or blank message instead of raising on request.POST["message"].
        message = request.POST.get("message", "")
        if not message.strip():
            send_sms_mixin.message_user(request, "SMS failed. Message is empty", level="WARNING")
            return HttpResponseRedirect(request.get_full_path())

        # _send_message returns (success, payload); only claim the SMS went
        # out when the API call actually succeeded.
        success, _ = _send_message(numbers, message)
        if success:
            send_sms_mixin.message_user(request, f"SMS has been sent to {len(numbers)} recipients")
        else:
            send_sms_mixin.message_user(request, "SMS failed. The Telstra API request was unsuccessful", level="ERROR")
        return HttpResponseRedirect(request.get_full_path())

    form = SMSForm(initial={'_selected_action': queryset.values_list('id', flat=True)})
    family_set = _family_context(queryset)
    return render(request, 'admin/sms_form.html', context={'form': form, 'items': family_set})
class SMSTestForm(forms.Form):
    """Form for sending a test SMS to a single phone number."""

    # Destination number, at most 20 characters.
    phone_number = forms.CharField(
        max_length=20,
        label="Phone number",
    )

    # Message text, limited to 160 characters.
    message = forms.CharField(
        widget=forms.Textarea,
        max_length=160,
        label="Message",
    )
def send_sms_test(request):
    """Send a test SMS using the ``phone_number`` and ``message`` POST fields.

    Nothing is sent unless ``settings.TELSTRA_AUTH`` is truthy and the request
    method is POST. The outcome of the send is not inspected.

    Args:
        request: Current request.

    Returns:
        Always ``None``.
    """
    if settings.TELSTRA_AUTH and request.method == "POST":
        form_data = request.POST
        _send_message(form_data["phone_number"], form_data["message"])
    return None