Added SMS functions to buses and travellers

This commit is contained in:
John Mullins
2024-11-11 11:36:39 +11:00
parent a7e4f3e2de
commit 6bd2664352
5 changed files with 173 additions and 36 deletions
+6
View File
@@ -87,6 +87,12 @@ if "AZURE_CLIENT_ID" in os.environ:
# "PUBLIC_PATHS": ['/go/',], # Optional, public paths accessible by non-authenticated users
}
if "TELSTRA_CLIENT_ID" in os.environ:
TELSTRA_AUTH = {
'client_id': os.environ.get('TELSTRA_CLIENT_ID'),
'client_secret': os.environ.get('TELSTRA_CLIENT_SECRET')
}
ROOT_URLCONF = 'busManager.urls'
TEMPLATES = [
+11 -1
View File
@@ -26,6 +26,16 @@ class BusRollMixin:
def show_emergency_contacts(self, request, queryset):
return render_to_pdf('reports/emergency_contacts.html', emergency_contacts_context(queryset))
def sms_traveller_contacts(self, request, queryset):
travellers = None
for bus in queryset:
query = Traveller.objects.filter(bus_stops__bus=bus).filter(is_active=True).distinct()
if travellers is None:
travellers = query
else:
travellers.union(query)
return send_sms(self, request, travellers)
def email_bus_roll(self, request, queryset):
return email_companies_bus_roll(request, queryset)
@@ -137,7 +147,7 @@ class BusesAdmin(MyImportExportModelAdmin, admin.ModelAdmin, BusRollMixin):
list_filter = ["company"]
list_display = ["route_name", "company", "contract_number", "seating_capacity", "route_travellers"]
readonly_fields = ["traveller_count"]
actions = ["show_bus_roll", "show_emergency_contacts", "email_bus_roll", "email_emergency_contacts"]
actions = ["show_bus_roll", "show_emergency_contacts", "sms_traveller_contacts", "email_bus_roll", "email_emergency_contacts"]
inlines = [DriverInline, BusStopInline]
fieldsets = [
(None, {'fields': [
+17 -8
View File
@@ -1,6 +1,7 @@
from datetime import datetime
import phonenumbers
from django.core.exceptions import ValidationError
from django.db import models
@@ -271,9 +272,6 @@ class Traveller(models.Model):
else:
self.address = "Multiple"
def get_parsed_numbers(self, parents=False, emergency=False):
return [] # Moved to family model. To remove from traveller model
def get_families(self):
return Family.objects.filter(traveller__id__exact=self.id)
@@ -320,6 +318,11 @@ class Family(models.Model):
def __str__(self):
return self.parent_names()
def clean(self):
valid_numbers, failed_numbers = self.get_parsed_numbers(True, True)
if len(failed_numbers) > 0:
raise ValidationError(f"Phone number {failed_numbers[0]} not valid")
def parent_names(self):
a_name = self.parent_A_firstname
b_name = self.parent_B_firstname
@@ -343,13 +346,19 @@ class Family(models.Model):
numbers.append(self.emergency_contact_A_phone)
if self.emergency_contact_B_phone:
numbers.append(self.emergency_contact_B_phone)
valid_numbers = []
failed_numbers = []
for number in numbers:
num = phonenumbers.parse(number, "AU")
if phonenumbers.is_valid_number(num):
valid_numbers.append(num.national_number)
return valid_numbers
try:
num = phonenumbers.parse(number, "AU")
if phonenumbers.is_valid_number(num):
valid_numbers.append(f"+{num.country_code}{num.national_number}")
else:
failed_numbers.append(number)
except:
failed_numbers.append(number)
return valid_numbers, failed_numbers
class TravellerRoute(models.Model):
+10 -6
View File
@@ -10,12 +10,16 @@
<th>Emergency B</th>
</tr>
{% for item in items %}
<tr>
<td>{{ item }}</td>
<td>{{ item.parent_A_phone }}</td>
<td>{{ item.parent_B_phone }}</td>
<td>{{ item.emergency_contact_A_phone }}</td>
<td>{{ item.emergency_contact_B_phone }}</td>
{% if item.has_failed_number %}
<tr style="color:red">
{% else %}
<tr>
{% endif %}
<td>{{ item.traveller }}</td>
<td>{{ item.parent_A }}</td>
<td>{{ item.parent_B }}</td>
<td>{{ item.contact_A }}</td>
<td>{{ item.contact_B }}</td>
</tr>
{% endfor %}
</table>
+129 -21
View File
@@ -1,6 +1,75 @@
from django.conf import settings
from django.http import HttpResponseRedirect
from django import forms
from django.shortcuts import render
import requests
def _get_token():
url = 'https://products.api.telstra.com/v2/oauth/token'
headers = {
'Content-Type': 'application/x-www-form-urlencoded'
}
data = {
'client_id': settings.TELSTRA_AUTH['client_id'],
'client_secret': settings.TELSTRA_AUTH['client_secret'],
'grant_type': 'client_credentials'
}
result = requests.post(url, data=data, headers=headers)
if result.status_code != 200:
print("Bad request for telstra access_token:" + str(result.status_code))
return None
return result.json()['access_token']
def telstra_api_request(url, data=None, method="POST"):
url = 'https://products.api.telstra.com/' + url
token = _get_token()
headers = {
'Telstra-api-version': '3.x',
'Content-Language': 'en-au',
'Authorization': f'Bearer {token}',
'Accept': 'application/json',
'Accept-Charset': 'utf-8',
'Content-Type': 'application/json'
}
result = requests.request(method, url, json=data, headers=headers)
if result.status_code != 200:
print("Bad request:" + str(result.status_code))
print(result.content)
return False, result.content
return True, result.json()
def _send_message(to, msg):
url = 'messaging/v3/messages'
data = {
'to': to,
'from': _get_virtual_numbers(),
'messageContent': msg
}
result = telstra_api_request(url, data)
return result
def _get_virtual_numbers():
url = 'messaging/v3/virtual-numbers'
success, result = telstra_api_request(url, method='GET')
if not success:
print("No number found")
success, result = telstra_api_request(url, method='POST')
if not success:
return None
return result['virtualNumber']
numbers = result['virtualNumbers']
if numbers is None:
return None
return numbers[0]['virtualNumber']
class SMSForm(forms.Form):
@@ -8,32 +77,71 @@ class SMSForm(forms.Form):
send_to_parents = forms.BooleanField(required=False)
send_to_emergency_contacts = forms.BooleanField(required=False)
only_include_active_travellers = forms.BooleanField(initial=True, required=False)
message = forms.CharField(label="Message", max_length=320, widget=forms.Textarea)
message = forms.CharField(label="Message", max_length=160, widget=forms.Textarea)
def _get_numbers(request, queryset):
send_to_parents = bool(request.POST.get("send_to_parents"))
send_to_emergency_contacts = bool(request.POST.get("send_to_emergency_contacts"))
only_include_active_travellers = bool(request.POST.get("only_include_active_travellers"))
numbers = []
for traveller in queryset:
if only_include_active_travellers and not traveller._is_active():
continue
for family in traveller.get_families():
family_numbers, _ = family.get_parsed_numbers(parents=send_to_parents, emergency=send_to_emergency_contacts)
if family_numbers:
numbers = numbers + family_numbers
return list(set(numbers)) # Remove duplicates
def _family_context(queryset):
family_set = []
for traveller in queryset:
families = traveller.get_families()
if len(families) == 0:
family_set.append({
'traveller': traveller.__str__(),
'has_failed_number': True
})
for family in families:
_, failed_numbers = family.get_parsed_numbers(True, True)
family_context = {
'traveller': traveller.__str__(),
'has_failed_number': len(failed_numbers) > 0
}
if family.parent_A_phone:
family_context['parent_A'] = f"{family.parent_A_firstname} {family.parent_A_lastname} ({family.parent_A_phone})"
if family.parent_B_phone:
family_context['parent_B'] = f"{family.parent_B_firstname} {family.parent_B_lastname} ({family.parent_B_phone})"
if family.emergency_contact_A_phone:
family_context['contact_A'] = f"{family.emergency_contact_A_firstname} {family.emergency_contact_A_lastname} ({family.emergency_contact_A_phone})"
if family.emergency_contact_B_phone:
family_context['contact_B'] = f"{family.emergency_contact_B_firstname} {family.emergency_contact_B_lastname} ({family.emergency_contact_B_phone})"
family_set.append(family_context)
return family_set
def send_sms(send_sms_mixin, request, queryset):
if 'confirm' in request.POST:
message = request.POST["message"]
send_to_parents = False
if request.POST.get("send_to_parents"):
send_to_parents = True
send_to_emergency_contacts = False
if request.POST.get("send_to_emergency_contacts"):
send_to_emergency_contacts = True
only_include_active_travellers = False
if request.POST.get("only_include_active_travellers"):
only_include_active_travellers = True
total = 0
numbers = []
for traveller in queryset:
if only_include_active_travellers and not traveller._is_active():
continue
numbers.append(traveller.get_parsed_numbers(parents=send_to_parents, emergency=send_to_emergency_contacts))
if not settings.TELSTRA_AUTH:
send_sms_mixin.message_user(request, "Telstra auth not configured", level="WARNING")
return HttpResponseRedirect(request.get_full_path())
len(numbers)
send_sms_mixin.message_user(request, f"SMS has been sent to {total} recipients")
if 'send' in request.POST:
numbers = _get_numbers(request, queryset)
if len(numbers) > 500:
send_sms_mixin.message_user(request, f"SMS failed. Total phone numbers ({len(numbers)}) exceeds 500", level="WARNING")
return HttpResponseRedirect(request.get_full_path())
if len(numbers) == 0:
send_sms_mixin.message_user(request, f"SMS failed. No numbers we selected", level="WARNING")
return HttpResponseRedirect(request.get_full_path())
result = _send_message(numbers, request.POST["message"])
send_sms_mixin.message_user(request, f"SMS has been sent to {len(numbers)} recipients")
return HttpResponseRedirect(request.get_full_path())
form = SMSForm(initial={'_selected_action': queryset.values_list('id', flat=True)})
return render(request, 'admin/sms_form.html', context={'form': form, 'items': queryset})
family_set = _family_context(queryset)
return render(request, 'admin/sms_form.html', context={'form': form, 'items': family_set})