163 lines
6.1 KiB
Python
163 lines
6.1 KiB
Python
from django.conf import settings
|
|
from django.http import HttpResponseRedirect
|
|
from django import forms
|
|
from django.shortcuts import render
|
|
import requests
|
|
|
|
|
|
def _get_token():
|
|
url = 'https://products.api.telstra.com/v2/oauth/token'
|
|
|
|
headers = {
|
|
'Content-Type': 'application/x-www-form-urlencoded'
|
|
}
|
|
data = {
|
|
'client_id': settings.TELSTRA_AUTH['client_id'],
|
|
'client_secret': settings.TELSTRA_AUTH['client_secret'],
|
|
'grant_type': 'client_credentials'
|
|
}
|
|
|
|
result = requests.post(url, data=data, headers=headers)
|
|
if result.status_code != 200:
|
|
print("Bad request for telstra access_token:" + str(result.status_code))
|
|
return None
|
|
return result.json()['access_token']
|
|
|
|
|
|
def telstra_api_request(url, data=None, method="POST"):
|
|
url = 'https://products.api.telstra.com/' + url
|
|
|
|
token = _get_token()
|
|
headers = {
|
|
'Telstra-api-version': '3.x',
|
|
'Content-Language': 'en-au',
|
|
'Authorization': f'Bearer {token}',
|
|
'Accept': 'application/json',
|
|
'Accept-Charset': 'utf-8',
|
|
'Content-Type': 'application/json'
|
|
}
|
|
result = requests.request(method, url, json=data, headers=headers)
|
|
|
|
if result.status_code != 200:
|
|
print("Bad request:" + str(result.status_code))
|
|
print(result.content)
|
|
return False, result.content
|
|
return True, result.json()
|
|
|
|
|
|
def _send_message(to, msg):
|
|
url = 'messaging/v3/messages'
|
|
data = {
|
|
'to': to,
|
|
'from': _get_virtual_numbers(),
|
|
'messageContent': msg
|
|
}
|
|
result = telstra_api_request(url, data)
|
|
return result
|
|
|
|
|
|
def _get_virtual_numbers():
|
|
url = 'messaging/v3/virtual-numbers'
|
|
success, result = telstra_api_request(url, method='GET')
|
|
if not success:
|
|
print("No number found")
|
|
success, result = telstra_api_request(url, method='POST')
|
|
if not success:
|
|
return None
|
|
return result['virtualNumber']
|
|
|
|
numbers = result['virtualNumbers']
|
|
if numbers is None:
|
|
return None
|
|
return numbers[0]['virtualNumber']
|
|
|
|
|
|
class SMSForm(forms.Form):
|
|
_selected_action = forms.CharField(widget=forms.MultipleHiddenInput)
|
|
send_to_parents = forms.BooleanField(required=False)
|
|
send_to_emergency_contacts = forms.BooleanField(required=False)
|
|
only_include_active_travellers = forms.BooleanField(initial=True, required=False)
|
|
message = forms.CharField(label="Message", max_length=160, widget=forms.Textarea)
|
|
|
|
|
|
def _get_numbers(request, queryset):
|
|
send_to_parents = bool(request.POST.get("send_to_parents"))
|
|
send_to_emergency_contacts = bool(request.POST.get("send_to_emergency_contacts"))
|
|
only_include_active_travellers = bool(request.POST.get("only_include_active_travellers"))
|
|
|
|
numbers = []
|
|
for traveller in queryset:
|
|
if only_include_active_travellers and not traveller._is_active():
|
|
continue
|
|
for family in traveller.get_families():
|
|
family_numbers, _ = family.get_parsed_numbers(parents=send_to_parents, emergency=send_to_emergency_contacts)
|
|
if family_numbers:
|
|
numbers = numbers + family_numbers
|
|
return list(set(numbers)) # Remove duplicates
|
|
|
|
|
|
def _family_context(queryset):
|
|
family_set = []
|
|
for traveller in queryset:
|
|
families = traveller.get_families()
|
|
if len(families) == 0:
|
|
family_set.append({
|
|
'traveller': traveller.__str__(),
|
|
'has_failed_number': True
|
|
})
|
|
for family in families:
|
|
_, failed_numbers = family.get_parsed_numbers(True, True)
|
|
family_context = {
|
|
'traveller': traveller.__str__(),
|
|
'has_failed_number': len(failed_numbers) > 0
|
|
}
|
|
if family.parent_A_phone:
|
|
family_context['parent_A'] = f"{family.parent_A_firstname} {family.parent_A_lastname} ({family.parent_A_phone})"
|
|
if family.parent_B_phone:
|
|
family_context['parent_B'] = f"{family.parent_B_firstname} {family.parent_B_lastname} ({family.parent_B_phone})"
|
|
if family.emergency_contact_A_phone:
|
|
family_context['contact_A'] = f"{family.emergency_contact_A_firstname} {family.emergency_contact_A_lastname} ({family.emergency_contact_A_phone})"
|
|
if family.emergency_contact_B_phone:
|
|
family_context['contact_B'] = f"{family.emergency_contact_B_firstname} {family.emergency_contact_B_lastname} ({family.emergency_contact_B_phone})"
|
|
family_set.append(family_context)
|
|
return family_set
|
|
|
|
|
|
def send_sms(send_sms_mixin, request, queryset):
|
|
if not settings.TELSTRA_AUTH:
|
|
send_sms_mixin.message_user(request, "Telstra auth not configured", level="WARNING")
|
|
return HttpResponseRedirect(request.get_full_path())
|
|
|
|
if 'send' in request.POST:
|
|
numbers = _get_numbers(request, queryset)
|
|
if len(numbers) > 500:
|
|
send_sms_mixin.message_user(request, f"SMS failed. Total phone numbers ({len(numbers)}) exceeds 500", level="WARNING")
|
|
return HttpResponseRedirect(request.get_full_path())
|
|
if len(numbers) == 0:
|
|
send_sms_mixin.message_user(request, f"SMS failed. No numbers we selected", level="WARNING")
|
|
return HttpResponseRedirect(request.get_full_path())
|
|
result = _send_message(numbers, request.POST["message"])
|
|
send_sms_mixin.message_user(request, f"SMS has been sent to {len(numbers)} recipients")
|
|
return HttpResponseRedirect(request.get_full_path())
|
|
|
|
form = SMSForm(initial={'_selected_action': queryset.values_list('id', flat=True)})
|
|
|
|
family_set = _family_context(queryset)
|
|
|
|
return render(request, 'admin/sms_form.html', context={'form': form, 'items': family_set})
|
|
|
|
|
|
class SMSTestForm(forms.Form):
|
|
phone_number = forms.CharField(label="Phone number", max_length=20)
|
|
message = forms.CharField(label="Message", max_length=160, widget=forms.Textarea)
|
|
|
|
|
|
def send_sms_test(request):
|
|
|
|
if not settings.TELSTRA_AUTH:
|
|
return None
|
|
|
|
if request.method == "POST":
|
|
_send_message(request.POST["phone_number"], request.POST["message"])
|
|
return None
|