Created new models to replicate out of and split coord app
Moved helpers and views to their respective new apps
This commit is contained in:
@@ -46,7 +46,8 @@ INSTALLED_APPS = [
|
||||
'rangefilter',
|
||||
'common',
|
||||
'transport',
|
||||
'traveller'
|
||||
'traveller',
|
||||
'messaging'
|
||||
]
|
||||
|
||||
if platform.system() == "Linux":
|
||||
|
||||
@@ -1,3 +1,13 @@
|
||||
from django.contrib import admin
|
||||
from import_export.admin import ImportExportModelAdmin
|
||||
|
||||
# Register your models here.
|
||||
from common.models import Suburb
|
||||
|
||||
|
||||
class MyImportExportModelAdmin(ImportExportModelAdmin):
|
||||
def has_import_permission(self, request):
|
||||
return request.user.is_superuser
|
||||
|
||||
@admin.register(Suburb)
|
||||
class SuburbsAdmin(MyImportExportModelAdmin, admin.ModelAdmin):
|
||||
list_filter = ["state"]
|
||||
@@ -0,0 +1,14 @@
|
||||
from io import BytesIO
|
||||
|
||||
from django.http import HttpResponse
|
||||
from django.template.loader import get_template
|
||||
from xhtml2pdf import pisa
|
||||
|
||||
|
||||
def render_to_pdf(template, context):
|
||||
html = get_template(template).render(context)
|
||||
result = BytesIO()
|
||||
pdf = pisa.pisaDocument(BytesIO(html.encode("ISO-8859-1")), result)
|
||||
if pdf.err:
|
||||
return HttpResponse("Invalid PDF", status_code=400, content_type='text/plan')
|
||||
return HttpResponse(result.getvalue(), content_type='application/pdf')
|
||||
@@ -1,3 +1,23 @@
|
||||
from django.db import models
|
||||
|
||||
# Create your models here.
|
||||
class Suburb(models.Model):
|
||||
STATE = [
|
||||
("VIC", "Victoria"),
|
||||
("NSW", "New South Wales"),
|
||||
("SA", "South Australia"),
|
||||
("ACT", "Australia Capital Territory"),
|
||||
("QLD", "Queensland"),
|
||||
("NT", "Northern Territory"),
|
||||
("WA", "Western Australia"),
|
||||
("TAS", "Tasmania"),
|
||||
]
|
||||
name = models.CharField(max_length=30, unique=True)
|
||||
state = models.CharField(max_length=3, choices=STATE)
|
||||
postcode = models.PositiveSmallIntegerField()
|
||||
distance = models.PositiveSmallIntegerField(blank=True, null=True)
|
||||
|
||||
class Meta:
|
||||
ordering = ["name"]
|
||||
|
||||
def __str__(self):
|
||||
return f"{self.name}, {self.state} {self.postcode}"
|
||||
@@ -21,7 +21,7 @@ def emergency_contacts(request):
|
||||
def bus_roll(request):
|
||||
return render_to_pdf('reports/bus_roll.html', bus_roll_context())
|
||||
|
||||
|
||||
@staff_member_required
|
||||
def sms_message(request, queryset):
|
||||
if request.method == 'POST':
|
||||
form = SMSForm(request.POST)
|
||||
|
||||
@@ -0,0 +1,6 @@
|
||||
from django.apps import AppConfig
|
||||
|
||||
|
||||
class MessagingConfig(AppConfig):
|
||||
default_auto_field = 'django.db.models.BigAutoField'
|
||||
name = 'messaging'
|
||||
@@ -0,0 +1,127 @@
|
||||
from datetime import date
|
||||
|
||||
from django.core.mail import EmailMessage
|
||||
|
||||
from common.documents import render_to_pdf
|
||||
from transport.context_busroll import bus_roll_context
|
||||
from transport.context_helpers import emergency_contacts_context
|
||||
from transport.models import Company
|
||||
from traveller.context_helpers import school_roll_context
|
||||
from traveller.models import School
|
||||
|
||||
|
||||
def _getBCC(request):
|
||||
if request.user.email:
|
||||
return [request.user.email]
|
||||
return []
|
||||
|
||||
|
||||
def email_companies_bus_roll(request, query_set=None):
|
||||
html_template = 'reports/bus_roll.html'
|
||||
context = bus_roll_context(query_set)
|
||||
|
||||
for company in Company.objects.all():
|
||||
if not company.contact_email:
|
||||
continue
|
||||
company_route = []
|
||||
|
||||
for route in context.get("routes"):
|
||||
if route.get("bus").company == company:
|
||||
company_route.append(route)
|
||||
if not company_route:
|
||||
continue
|
||||
company_context = {'routes': company_route}
|
||||
pdf = render_to_pdf(html_template, company_context)
|
||||
|
||||
subject = "Echuca Schools Bus Roll"
|
||||
message = f"A new bus roll for {company.name} has been generated"
|
||||
email_from = "bus.manager@education.vic.gov.au"
|
||||
recipient = [company.contact_email]
|
||||
email = EmailMessage(subject, message, email_from, recipient, _getBCC(request))
|
||||
email.attach(f"school_bus_roll_{date.today()}.pdf", pdf.content)
|
||||
email.send(fail_silently=True)
|
||||
|
||||
return render_to_pdf(html_template, context)
|
||||
|
||||
|
||||
def email_companies_emergency_contacts(request, query_set=None):
|
||||
html_template = 'reports/emergency_contacts.html'
|
||||
context = emergency_contacts_context(query_set)
|
||||
|
||||
for company in Company.objects.all():
|
||||
if not company.contact_email:
|
||||
continue
|
||||
company_route = []
|
||||
|
||||
for route in context.get("routes"):
|
||||
if route.get("bus").company == company:
|
||||
company_route.append(route)
|
||||
if not company_route:
|
||||
continue
|
||||
company_context = {'routes': company_route}
|
||||
pdf = render_to_pdf(html_template, company_context)
|
||||
|
||||
subject = "Echuca School Buses Emergency Contacts"
|
||||
message = f"A new emergency contact list for {company.name} has been generated"
|
||||
email_from = "bus.manager@education.vic.gov.au"
|
||||
recipient = [company.contact_email]
|
||||
email = EmailMessage(subject, message, email_from, recipient, _getBCC(request))
|
||||
email.attach(f"school_bus_roll_{date.today()}.pdf", pdf.content)
|
||||
email.send(fail_silently=True)
|
||||
|
||||
return render_to_pdf(html_template, context)
|
||||
|
||||
|
||||
def email_school_roll(request, query_set):
|
||||
html_template = 'reports/school_roll.html'
|
||||
context = school_roll_context(query_set)
|
||||
|
||||
for school in School.objects.all():
|
||||
if not school.email:
|
||||
continue
|
||||
school_route = []
|
||||
for school_context in context.get("schools"):
|
||||
if school_context.get("name") == school.name:
|
||||
school_route.append(school_context)
|
||||
if not school_route:
|
||||
continue
|
||||
school_context = {'schools': school_route}
|
||||
pdf = render_to_pdf(html_template, school_context)
|
||||
|
||||
subject = "Echuca Schools Bus Roll"
|
||||
message = f"A new bus roll for {school.name} has been generated"
|
||||
email_from = "bus.manager@education.vic.gov.au"
|
||||
recipient = [school.email]
|
||||
email = EmailMessage(subject, message, email_from, recipient, _getBCC(request))
|
||||
email.attach(f"school_bus_roll_{date.today()}.pdf", pdf.content)
|
||||
email.send(fail_silently=True)
|
||||
|
||||
return render_to_pdf(html_template, context)
|
||||
|
||||
|
||||
def email_school_shuttle_roll(request, queryset):
|
||||
html_template = 'reports/bus_roll.html'
|
||||
|
||||
schools = []
|
||||
for shuttle in queryset:
|
||||
if shuttle.school not in schools:
|
||||
schools.append(shuttle.school)
|
||||
for school in schools:
|
||||
buses = []
|
||||
for shuttle in queryset:
|
||||
if shuttle.school == school:
|
||||
buses.append(shuttle.bus)
|
||||
pdf = render_to_pdf(html_template, bus_roll_context(buses, include_bus_stops=False))
|
||||
|
||||
subject = "Echuca Schools Shuttle Roll"
|
||||
message = f"A new shuttle roll for {school.name} has been generated"
|
||||
email_from = "bus.manager@education.vic.gov.au"
|
||||
recipient = [school.email]
|
||||
email = EmailMessage(subject, message, email_from, recipient, _getBCC(request))
|
||||
email.attach(f"school_shuttle_roll_{date.today()}.pdf", pdf.content)
|
||||
email.send(fail_silently=True)
|
||||
buses = []
|
||||
for shuttle in queryset:
|
||||
if shuttle.bus not in buses:
|
||||
buses.append(shuttle.bus)
|
||||
return render_to_pdf(html_template, bus_roll_context(buses, include_bus_stops=False))
|
||||
@@ -0,0 +1,162 @@
|
||||
import requests
|
||||
from django import forms
|
||||
from django.conf import settings
|
||||
from django.http import HttpResponseRedirect
|
||||
from django.shortcuts import render
|
||||
|
||||
|
||||
def _get_token():
|
||||
url = 'https://products.api.telstra.com/v2/oauth/token'
|
||||
|
||||
headers = {
|
||||
'Content-Type': 'application/x-www-form-urlencoded'
|
||||
}
|
||||
data = {
|
||||
'client_id': settings.TELSTRA_AUTH['client_id'],
|
||||
'client_secret': settings.TELSTRA_AUTH['client_secret'],
|
||||
'grant_type': 'client_credentials'
|
||||
}
|
||||
|
||||
result = requests.post(url, data=data, headers=headers)
|
||||
if result.status_code != 200:
|
||||
print("Bad request for telstra access_token:" + str(result.status_code))
|
||||
return None
|
||||
return result.json()['access_token']
|
||||
|
||||
|
||||
def telstra_api_request(url, data=None, method="POST"):
|
||||
url = 'https://products.api.telstra.com/' + url
|
||||
|
||||
token = _get_token()
|
||||
headers = {
|
||||
'Telstra-api-version': '3.x',
|
||||
'Content-Language': 'en-au',
|
||||
'Authorization': f'Bearer {token}',
|
||||
'Accept': 'application/json',
|
||||
'Accept-Charset': 'utf-8',
|
||||
'Content-Type': 'application/json'
|
||||
}
|
||||
result = requests.request(method, url, json=data, headers=headers)
|
||||
|
||||
if result.status_code != 200:
|
||||
print("Bad request:" + str(result.status_code))
|
||||
print(result.content)
|
||||
return False, result.content
|
||||
return True, result.json()
|
||||
|
||||
|
||||
def _send_message(to, msg):
|
||||
url = 'messaging/v3/messages'
|
||||
data = {
|
||||
'to': to,
|
||||
'from': _get_virtual_numbers(),
|
||||
'messageContent': msg
|
||||
}
|
||||
result = telstra_api_request(url, data)
|
||||
return result
|
||||
|
||||
|
||||
def _get_virtual_numbers():
|
||||
url = 'messaging/v3/virtual-numbers'
|
||||
success, result = telstra_api_request(url, method='GET')
|
||||
if not success:
|
||||
print("No number found")
|
||||
success, result = telstra_api_request(url, method='POST')
|
||||
if not success:
|
||||
return None
|
||||
return result['virtualNumber']
|
||||
|
||||
numbers = result['virtualNumbers']
|
||||
if numbers is None:
|
||||
return None
|
||||
return numbers[0]['virtualNumber']
|
||||
|
||||
|
||||
class SMSForm(forms.Form):
|
||||
_selected_action = forms.CharField(widget=forms.MultipleHiddenInput)
|
||||
send_to_parents = forms.BooleanField(required=False)
|
||||
send_to_emergency_contacts = forms.BooleanField(required=False)
|
||||
only_include_active_travellers = forms.BooleanField(initial=True, required=False)
|
||||
message = forms.CharField(label="Message", max_length=160, widget=forms.Textarea)
|
||||
|
||||
|
||||
def _get_numbers(request, queryset):
|
||||
send_to_parents = bool(request.POST.get("send_to_parents"))
|
||||
send_to_emergency_contacts = bool(request.POST.get("send_to_emergency_contacts"))
|
||||
only_include_active_travellers = bool(request.POST.get("only_include_active_travellers"))
|
||||
|
||||
numbers = []
|
||||
for traveller in queryset:
|
||||
if only_include_active_travellers and not traveller._is_active():
|
||||
continue
|
||||
for family in traveller.get_families():
|
||||
family_numbers, _ = family.get_parsed_numbers(parents=send_to_parents, emergency=send_to_emergency_contacts)
|
||||
if family_numbers:
|
||||
numbers = numbers + family_numbers
|
||||
return list(set(numbers)) # Remove duplicates
|
||||
|
||||
|
||||
def _family_context(queryset):
|
||||
family_set = []
|
||||
for traveller in queryset:
|
||||
families = traveller.get_families()
|
||||
if len(families) == 0:
|
||||
family_set.append({
|
||||
'traveller': traveller.__str__(),
|
||||
'has_failed_number': True
|
||||
})
|
||||
for family in families:
|
||||
_, failed_numbers = family.get_parsed_numbers(True, True)
|
||||
family_context = {
|
||||
'traveller': traveller.__str__(),
|
||||
'has_failed_number': len(failed_numbers) > 0
|
||||
}
|
||||
if family.parent_A_phone:
|
||||
family_context['parent_A'] = f"{family.parent_A_firstname} {family.parent_A_lastname} ({family.parent_A_phone})"
|
||||
if family.parent_B_phone:
|
||||
family_context['parent_B'] = f"{family.parent_B_firstname} {family.parent_B_lastname} ({family.parent_B_phone})"
|
||||
if family.emergency_contact_A_phone:
|
||||
family_context['contact_A'] = f"{family.emergency_contact_A_firstname} {family.emergency_contact_A_lastname} ({family.emergency_contact_A_phone})"
|
||||
if family.emergency_contact_B_phone:
|
||||
family_context['contact_B'] = f"{family.emergency_contact_B_firstname} {family.emergency_contact_B_lastname} ({family.emergency_contact_B_phone})"
|
||||
family_set.append(family_context)
|
||||
return family_set
|
||||
|
||||
|
||||
def send_sms(send_sms_mixin, request, queryset):
|
||||
if not settings.TELSTRA_AUTH:
|
||||
send_sms_mixin.message_user(request, "Telstra auth not configured", level="WARNING")
|
||||
return HttpResponseRedirect(request.get_full_path())
|
||||
|
||||
if 'send' in request.POST:
|
||||
numbers = _get_numbers(request, queryset)
|
||||
if len(numbers) > 500:
|
||||
send_sms_mixin.message_user(request, f"SMS failed. Total phone numbers ({len(numbers)}) exceeds 500", level="WARNING")
|
||||
return HttpResponseRedirect(request.get_full_path())
|
||||
if len(numbers) == 0:
|
||||
send_sms_mixin.message_user(request, f"SMS failed. No numbers we selected", level="WARNING")
|
||||
return HttpResponseRedirect(request.get_full_path())
|
||||
result = _send_message(numbers, request.POST["message"])
|
||||
send_sms_mixin.message_user(request, f"SMS has been sent to {len(numbers)} recipients")
|
||||
return HttpResponseRedirect(request.get_full_path())
|
||||
|
||||
form = SMSForm(initial={'_selected_action': queryset.values_list('id', flat=True)})
|
||||
|
||||
family_set = _family_context(queryset)
|
||||
|
||||
return render(request, 'admin/sms_form.html', context={'form': form, 'items': family_set})
|
||||
|
||||
|
||||
class SMSTestForm(forms.Form):
|
||||
phone_number = forms.CharField(label="Phone number", max_length=20)
|
||||
message = forms.CharField(label="Message", max_length=160, widget=forms.Textarea)
|
||||
|
||||
|
||||
def send_sms_test(request):
|
||||
|
||||
if not settings.TELSTRA_AUTH:
|
||||
return None
|
||||
|
||||
if request.method == "POST":
|
||||
_send_message(request.POST["phone_number"], request.POST["message"])
|
||||
return None
|
||||
@@ -0,0 +1,3 @@
|
||||
from django.test import TestCase
|
||||
|
||||
# Create your tests here.
|
||||
@@ -0,0 +1,3 @@
|
||||
from django.shortcuts import render
|
||||
|
||||
# Create your views here.
|
||||
@@ -0,0 +1,24 @@
|
||||
import os
|
||||
import sys
|
||||
|
||||
import django
|
||||
|
||||
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
|
||||
|
||||
# Setup Django environment
|
||||
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "busManager.settings")
|
||||
django.setup()
|
||||
|
||||
from coord.models import Suburb as OldSuburb
|
||||
from common.models import Suburb as NewSuburb
|
||||
|
||||
for old in OldSuburb.objects.all():
|
||||
NewSuburb.objects.create(
|
||||
id=old.id, # preserve PK
|
||||
name=old.name,
|
||||
state=old.state,
|
||||
postcode=old.postcode,
|
||||
distance=old.distance
|
||||
)
|
||||
|
||||
print(f"Copied {OldSuburb.objects.count()} suburbs to common_suburb")
|
||||
@@ -1,3 +1,79 @@
|
||||
from django.contrib import admin
|
||||
from django.urls import reverse
|
||||
from django.utils.html import format_html
|
||||
from django.utils.http import urlencode
|
||||
|
||||
# Register your models here.
|
||||
from common.admin import MyImportExportModelAdmin
|
||||
from transport.admin_mixins import BusRollMixin, ShuttleRollMixin
|
||||
from transport.models import Driver, BusStop, Company, Bus, Shuttle
|
||||
|
||||
|
||||
@admin.register(Company)
|
||||
class CompanyAdmin(MyImportExportModelAdmin, admin.ModelAdmin):
|
||||
list_display = ["name", "contact_name", "email", "buses"]
|
||||
|
||||
def buses(self, obj):
|
||||
count = obj.bus_set.count()
|
||||
url = (
|
||||
reverse("admin:coord_bus_changelist")
|
||||
+ "?"
|
||||
+ urlencode({"company__id__exact": f"{obj.id}"})
|
||||
)
|
||||
return format_html('<a href="{}">{} Buses</a>', url, count)
|
||||
|
||||
def email(self, obj):
|
||||
return format_html('<a href="mailto:{}">{}</a>', obj.contact_email, obj.contact_email)
|
||||
|
||||
|
||||
class DriverInline(admin.StackedInline):
|
||||
model = Driver
|
||||
extra = 0
|
||||
|
||||
|
||||
class BusStopInline(admin.TabularInline):
|
||||
model = BusStop
|
||||
extra = 0
|
||||
ordering = ("am_time",)
|
||||
|
||||
|
||||
@admin.register(Bus)
|
||||
class BusesAdmin(MyImportExportModelAdmin, admin.ModelAdmin, BusRollMixin):
|
||||
list_filter = ["company"]
|
||||
list_display = ["route_name", "company", "contract_number", "seating_capacity", "route_travellers"]
|
||||
readonly_fields = ["traveller_count"]
|
||||
actions = ["show_bus_roll", "show_bus_roll_on_date", "show_emergency_contacts", "sms_traveller_contacts", "email_bus_roll", "email_emergency_contacts"]
|
||||
inlines = [DriverInline, BusStopInline]
|
||||
fieldsets = [
|
||||
(None, {'fields': [
|
||||
"company", "route_name", "contract_number", "registration",
|
||||
"traveller_count", "seating_capacity", "make", "model", "notes"
|
||||
]})
|
||||
]
|
||||
|
||||
def route_travellers(self, obj):
|
||||
url = (
|
||||
reverse("admin:coord_traveller_changelist")
|
||||
+ "?"
|
||||
+ urlencode({"bus_stops__bus__id__exact": f"{obj.id}"})
|
||||
)
|
||||
return format_html('<a href="{}">{} Travellers</a>', url, obj.traveller_count())
|
||||
|
||||
|
||||
# @admin.register(BusStop)
|
||||
class BusStopAdmin(MyImportExportModelAdmin, admin.ModelAdmin):
|
||||
list_filter = ["bus__company", "bus__route_name"]
|
||||
list_display = ["__str__", "am_time", "pm_time", "address"]
|
||||
search_fields = ["bus__route_name", "address"]
|
||||
|
||||
@admin.register(Shuttle)
|
||||
class ShuttleAdmin(MyImportExportModelAdmin, admin.ModelAdmin, ShuttleRollMixin):
|
||||
list_display = ["__str__", "school", "bus", "shuttle_travellers"]
|
||||
actions = ["show_shuttle_roll", "email_shuttle_roll"]
|
||||
|
||||
def shuttle_travellers(self, obj):
|
||||
url = (
|
||||
reverse("admin:coord_traveller_changelist")
|
||||
+ "?"
|
||||
+ urlencode({"shuttle__id__exact": f"{obj.id}"})
|
||||
)
|
||||
return format_html('<a href="{}">{} Travellers</a>', url, obj.traveller_count())
|
||||
@@ -0,0 +1,54 @@
|
||||
from common.documents import render_to_pdf
|
||||
from messaging.services.email import email_companies_bus_roll, email_companies_emergency_contacts, \
|
||||
email_school_shuttle_roll
|
||||
from messaging.services.sms import send_sms
|
||||
from transport.context_busroll import bus_roll_context
|
||||
from transport.context_helpers import emergency_contacts_context
|
||||
from transport.forms import roll_date_selector
|
||||
from traveller.models import Traveller
|
||||
|
||||
|
||||
class BusRollMixin:
|
||||
|
||||
def show_bus_roll(self, request, queryset):
|
||||
return render_to_pdf('reports/bus_roll.html', bus_roll_context(queryset))
|
||||
|
||||
def show_bus_roll_on_date(self, request, queryset):
|
||||
return roll_date_selector(self, request, queryset)
|
||||
|
||||
def show_emergency_contacts(self, request, queryset):
|
||||
return render_to_pdf('reports/emergency_contacts.html', emergency_contacts_context(queryset))
|
||||
|
||||
def sms_traveller_contacts(self, request, queryset):
|
||||
travellers = None
|
||||
for bus in queryset:
|
||||
query = Traveller.objects.filter(bus_stops__bus=bus).filter(is_active=True).distinct()
|
||||
if travellers is None:
|
||||
travellers = query
|
||||
else:
|
||||
travellers.union(query)
|
||||
return send_sms(self, request, travellers)
|
||||
|
||||
def email_bus_roll(self, request, queryset):
|
||||
return email_companies_bus_roll(request, queryset)
|
||||
|
||||
def email_emergency_contacts(self, request, queryset):
|
||||
return email_companies_emergency_contacts(request, queryset)
|
||||
|
||||
email_bus_roll.short_description = "Email Bus Roll to Company"
|
||||
email_emergency_contacts.short_description = "Email Emergency Contacts to Company"
|
||||
|
||||
class ShuttleRollMixin:
|
||||
|
||||
def show_shuttle_roll(self, request, queryset):
|
||||
if queryset is None:
|
||||
buses = None
|
||||
else:
|
||||
buses = []
|
||||
for shuttle in queryset:
|
||||
if shuttle.bus not in buses:
|
||||
buses.append(shuttle.bus)
|
||||
return render_to_pdf('reports/bus_roll.html', bus_roll_context(buses, include_bus_stops=False))
|
||||
|
||||
def email_shuttle_roll(self, request, queryset):
|
||||
return email_school_shuttle_roll(request, queryset)
|
||||
@@ -0,0 +1,73 @@
|
||||
import datetime
|
||||
|
||||
from transport.models import Bus, Shuttle, BusStop
|
||||
from traveller.models import TravellerRoute, Traveller
|
||||
|
||||
|
||||
def route_paged_context(bus, date=None):
|
||||
table_header_size = 5
|
||||
page_max_size = 45
|
||||
page_size = 3 # Account for traveller numbers at the top of the first page
|
||||
route_stops = []
|
||||
for bus_stop in BusStop.objects.filter(bus=bus):
|
||||
traveller_routes = TravellerRoute.objects.filter(busStop=bus_stop)
|
||||
traveller_list = []
|
||||
for trav_route in traveller_routes:
|
||||
traveller = trav_route.traveller
|
||||
if not traveller._is_active(date):
|
||||
continue
|
||||
is_fared = "---"
|
||||
if traveller.eligibility_status == "2":
|
||||
is_fared = "Y"
|
||||
traveller_list.append({
|
||||
'display': f"{traveller} ({traveller.get_year_level_display()}, {traveller.school.shortName})",
|
||||
'isFared': is_fared
|
||||
})
|
||||
|
||||
stop_size = len(traveller_list)
|
||||
page_break = False
|
||||
page_size += table_header_size + stop_size
|
||||
if page_size > page_max_size:
|
||||
if len(route_stops) > 0: # Don't break the page if it's the first stop
|
||||
page_break = True
|
||||
page_size = table_header_size + stop_size
|
||||
|
||||
route_stops.append({
|
||||
'stop_num': bus_stop.get_stop_number(),
|
||||
'name': bus_stop.address,
|
||||
'am': bus_stop.am_time,
|
||||
'pm': bus_stop.pm_time,
|
||||
'travellers': traveller_list,
|
||||
'page_break': page_break
|
||||
})
|
||||
return route_stops
|
||||
|
||||
def shuttle_route_context(shuttle, date=None):
|
||||
shuttle_travellers = []
|
||||
for traveller in Traveller.objects.filter(shuttle=shuttle):
|
||||
if traveller._is_active(date):
|
||||
shuttle_travellers.append({
|
||||
'display': f"{traveller} ({traveller.get_year_level_display()}, {traveller.school})",
|
||||
})
|
||||
return {'shuttle': shuttle, 'shuttle_travellers': shuttle_travellers, 'traveller_count': shuttle.traveller_count(date)}
|
||||
|
||||
def bus_roll_context(queryset=None, include_bus_stops=True, date=None):
|
||||
bus_routes = []
|
||||
if queryset is None:
|
||||
buses = Bus.objects.all()
|
||||
else:
|
||||
buses = queryset
|
||||
|
||||
for bus in buses:
|
||||
route_stops = []
|
||||
if include_bus_stops:
|
||||
route_stops = route_paged_context(bus=bus, date=date)
|
||||
|
||||
shuttle_routes = []
|
||||
for shuttle in Shuttle.objects.filter(bus=bus):
|
||||
shuttle_routes.append(shuttle_route_context(shuttle, date))
|
||||
|
||||
bus_routes.append({'name': bus.route_name, 'traveller_count': bus.traveller_count(date), 'seating_capacity': bus.seating_capacity, 'bus': bus, 'route_stops': route_stops, 'shuttle_routes': shuttle_routes})
|
||||
if date is None:
|
||||
date = datetime.date.today()
|
||||
return {'routes': bus_routes, 'date': date.strftime('%Y-%m-%d')}
|
||||
@@ -0,0 +1,88 @@
|
||||
from transport.models import Bus, Driver, BusStop, Shuttle
|
||||
from traveller.models import TravellerRoute, Traveller
|
||||
|
||||
|
||||
def bus_summary_context():
|
||||
bus_routes = []
|
||||
for bus in Bus.objects.all():
|
||||
|
||||
drivers = []
|
||||
for driver in Driver.objects.filter(bus=bus):
|
||||
drivers.append(driver)
|
||||
|
||||
stops = []
|
||||
for bus_stop in BusStop.objects.filter(bus=bus):
|
||||
stops.append(bus_stop)
|
||||
|
||||
traveller_count = 0
|
||||
for travellerRoute in TravellerRoute.objects.filter(busStop__bus=bus):
|
||||
if travellerRoute.traveller._is_active():
|
||||
traveller_count += 1
|
||||
|
||||
shuttle_name = ""
|
||||
shuttle_count = 0
|
||||
for shuttle in Shuttle.objects.filter(bus=bus):
|
||||
if shuttle_name == "":
|
||||
shuttle_name = shuttle.school.shortName
|
||||
else:
|
||||
shuttle_name += f", {shuttle.school.shortName}"
|
||||
|
||||
for traveller in Traveller.objects.filter(shuttle=shuttle):
|
||||
if traveller._is_active():
|
||||
shuttle_count += 1
|
||||
|
||||
over_capacity = traveller_count > bus.seating_capacity or shuttle_count > bus.seating_capacity
|
||||
if shuttle_count == 0:
|
||||
shuttle_count = ""
|
||||
|
||||
bus_routes.append({
|
||||
'bus': bus,
|
||||
'drivers': drivers,
|
||||
'stops': stops,
|
||||
'traveller_count': traveller_count,
|
||||
'shuttle_name': shuttle_name,
|
||||
'shuttle_count': shuttle_count,
|
||||
'over_capacity': over_capacity,
|
||||
})
|
||||
return {'routes': bus_routes}
|
||||
|
||||
def emergency_contacts_context(queryset=None):
|
||||
if queryset is None:
|
||||
buses = Bus.objects.all()
|
||||
else:
|
||||
buses = queryset
|
||||
|
||||
bus_routes = []
|
||||
for bus in buses:
|
||||
drivers = []
|
||||
for driver in Driver.objects.filter(bus=bus):
|
||||
drivers.append(driver)
|
||||
traveller_list = []
|
||||
for travellerRoute in TravellerRoute.objects.filter(busStop__bus=bus):
|
||||
traveller = travellerRoute.traveller
|
||||
if not traveller._is_active():
|
||||
continue
|
||||
for family in traveller.get_families():
|
||||
parent_a = ""
|
||||
if family.parent_A_firstname:
|
||||
parent_a = f"{family.parent_A_firstname} {family.parent_A_lastname} ({family.parent_A_phone})"
|
||||
parent_b = ""
|
||||
if family.parent_B_firstname:
|
||||
parent_b = f"{family.parent_B_firstname} {family.parent_B_lastname} ({family.parent_B_phone})"
|
||||
contact_a = ""
|
||||
if family.emergency_contact_A_firstname:
|
||||
contact_a = f"{family.emergency_contact_A_firstname} {family.emergency_contact_A_lastname} ({family.emergency_contact_A_phone})"
|
||||
contact_b = ""
|
||||
if family.emergency_contact_B_firstname:
|
||||
contact_b = f"{family.emergency_contact_B_firstname} {family.emergency_contact_B_lastname} ({family.emergency_contact_B_phone})"
|
||||
traveller_list.append({
|
||||
'traveller': traveller,
|
||||
'parent_a': parent_a,
|
||||
'parent_b': parent_b,
|
||||
'contact_a': contact_a,
|
||||
'contact_b': contact_b,
|
||||
'note': travellerRoute.notes
|
||||
})
|
||||
bus_routes.append({'bus': bus, 'drivers': drivers, 'travellers': traveller_list})
|
||||
|
||||
return {'routes': bus_routes}
|
||||
@@ -0,0 +1,23 @@
|
||||
from datetime import datetime
|
||||
|
||||
from django import forms
|
||||
from django.shortcuts import render
|
||||
|
||||
from common.documents import render_to_pdf
|
||||
from transport.context_busroll import bus_roll_context
|
||||
|
||||
|
||||
class RollDateSelector(forms.Form):
|
||||
_selected_action = forms.CharField(widget=forms.MultipleHiddenInput)
|
||||
|
||||
|
||||
def roll_date_selector(mixin, request, queryset):
|
||||
if 'generate' in request.POST:
|
||||
date = request.POST.get("date")
|
||||
if date:
|
||||
date = datetime.strptime(date, '%Y-%m-%d')
|
||||
else:
|
||||
date = None
|
||||
return render_to_pdf('reports/bus_roll.html', bus_roll_context(queryset, date=date))
|
||||
form = RollDateSelector(initial={'_selected_action': queryset.values_list('id', flat=True)})
|
||||
return render(request, 'admin/date_selector.html', context={'form': form})
|
||||
@@ -1,3 +1,104 @@
|
||||
from django.db import models
|
||||
|
||||
# Create your models here.
|
||||
from common.models import Suburb
|
||||
|
||||
|
||||
class Company(models.Model):
|
||||
name = models.CharField(max_length=50, unique=True)
|
||||
contact_name = models.CharField(max_length=50, blank=True)
|
||||
contact_number = models.CharField(max_length=15, blank=True)
|
||||
contact_mobile = models.CharField(max_length=15, blank=True)
|
||||
contact_email = models.CharField(max_length=50, blank=True)
|
||||
address = models.CharField(max_length=50, blank=True)
|
||||
suburb = models.ForeignKey(Suburb, on_delete=models.CASCADE)
|
||||
notes = models.TextField(blank=True)
|
||||
|
||||
class Meta:
|
||||
verbose_name_plural = "Companies"
|
||||
|
||||
def __str__(self):
|
||||
return f"{self.name}"
|
||||
|
||||
|
||||
class Bus(models.Model):
|
||||
company = models.ForeignKey(Company, on_delete=models.CASCADE)
|
||||
route_name = models.CharField(max_length=50, unique=True)
|
||||
contract_number = models.CharField(max_length=20, blank=True)
|
||||
registration = models.CharField(max_length=10, blank=True)
|
||||
seating_capacity = models.SmallIntegerField()
|
||||
make = models.CharField(max_length=15, blank=True)
|
||||
model = models.CharField(max_length=15, blank=True)
|
||||
notes = models.TextField(blank=True)
|
||||
|
||||
class Meta:
|
||||
verbose_name_plural = "Buses"
|
||||
ordering = ["route_name"]
|
||||
|
||||
def __str__(self):
|
||||
return f"{self.route_name}"
|
||||
|
||||
def traveller_count(self, date=None):
|
||||
count = 0
|
||||
from traveller.models import Traveller
|
||||
for traveller in Traveller.objects.filter(bus_stops__bus=self):
|
||||
if traveller._is_active(date):
|
||||
count += 1
|
||||
return count
|
||||
|
||||
|
||||
class Shuttle(models.Model):
|
||||
bus = models.ForeignKey(Bus, on_delete=models.CASCADE)
|
||||
school = models.ForeignKey("traveller.School", on_delete=models.CASCADE)
|
||||
custom_name = models.CharField(max_length=10, blank=True)
|
||||
# transfer_school = models.ForeignKey(School, related_name='transfer_school', on_delete=models.CASCADE)
|
||||
am_service = models.BooleanField(default=True)
|
||||
pm_service = models.BooleanField(default=True)
|
||||
|
||||
class Meta:
|
||||
ordering = ["school__name"]
|
||||
|
||||
def __str__(self):
|
||||
custom_name = self.custom_name
|
||||
if custom_name:
|
||||
custom_name = f" ({self.custom_name})"
|
||||
else:
|
||||
custom_name = ""
|
||||
return f"{self.school.shortName} <-> {self.bus.route_name}{custom_name}"
|
||||
|
||||
def traveller_count(self, date=None):
|
||||
count = 0
|
||||
from traveller.models import Traveller
|
||||
for traveller in Traveller.objects.filter(shuttle=self):
|
||||
if traveller._is_active(date):
|
||||
count += 1
|
||||
return count
|
||||
|
||||
|
||||
class Driver(models.Model):
|
||||
bus = models.ForeignKey(Bus, on_delete=models.CASCADE)
|
||||
first_name = models.CharField(max_length=50)
|
||||
last_name = models.CharField(max_length=50)
|
||||
phone_number = models.CharField(max_length=15, blank=True)
|
||||
|
||||
class Meta:
|
||||
ordering = ["last_name"]
|
||||
|
||||
def __str__(self):
|
||||
return f"{self.first_name} {self.last_name}"
|
||||
|
||||
|
||||
class BusStop(models.Model):
|
||||
bus = models.ForeignKey(Bus, on_delete=models.CASCADE)
|
||||
am_time = models.TimeField()
|
||||
pm_time = models.TimeField()
|
||||
address = models.CharField(max_length=100)
|
||||
notes = models.TextField(blank=True)
|
||||
|
||||
class Meta:
|
||||
ordering = ["bus__route_name", "am_time"]
|
||||
|
||||
def get_stop_number(self):
|
||||
return BusStop.objects.filter(bus=self.bus, am_time__lt=self.am_time).count() + 1
|
||||
|
||||
def __str__(self):
|
||||
return f"{self.bus.route_name} #{self.get_stop_number()} - {self.address}"
|
||||
@@ -1,3 +1,20 @@
|
||||
from django.contrib.admin.views.decorators import staff_member_required
|
||||
from django.shortcuts import render
|
||||
|
||||
# Create your views here.
|
||||
from common.documents import render_to_pdf
|
||||
from transport.context_busroll import bus_roll_context
|
||||
from transport.context_helpers import bus_summary_context, emergency_contacts_context
|
||||
|
||||
|
||||
@staff_member_required
|
||||
def bus_summary(request):
|
||||
return render(request, 'reports/bus_summary.html', bus_summary_context())
|
||||
|
||||
@staff_member_required
|
||||
def emergency_contacts(request):
|
||||
return render_to_pdf('reports/emergency_contacts.html', emergency_contacts_context())
|
||||
|
||||
|
||||
@staff_member_required
|
||||
def bus_roll(request):
|
||||
return render_to_pdf('reports/bus_roll.html', bus_roll_context())
|
||||
@@ -1,3 +1,111 @@
|
||||
from django.contrib import admin
|
||||
from django.utils.html import format_html
|
||||
from rangefilter.filters import DateRangeFilterBuilder
|
||||
|
||||
# Register your models here.
|
||||
from common.admin import MyImportExportModelAdmin
|
||||
from traveller.adminClone import CloneModelAdmin
|
||||
from traveller.admin_mixins import SchoolRollMixin, TravellerRollMixin
|
||||
from traveller.models import Family, TravellerRoute, Traveller, School
|
||||
|
||||
|
||||
class FamilyInline(admin.StackedInline):
|
||||
model = Family
|
||||
classes = ['collapse']
|
||||
extra = 0
|
||||
clone_parent = "traveller"
|
||||
|
||||
|
||||
class TravellerRouteInline(admin.TabularInline):
|
||||
model = TravellerRoute
|
||||
extra = 0
|
||||
clone_parent = "traveller"
|
||||
|
||||
|
||||
@admin.register(Traveller)
|
||||
class TravellerAdmin(MyImportExportModelAdmin, CloneModelAdmin, TravellerRollMixin):
|
||||
list_display = ["first_name", "last_name", "school", "year_level", "is_active", "address", "stop_route", "shuttle", "travel_start_date", "travel_end_date"]
|
||||
list_filter = [
|
||||
"is_active", "school", "year_level", "eligibility_status", "bus_stops__bus", "shuttle",
|
||||
("travel_start_date", DateRangeFilterBuilder(
|
||||
title="Start date"
|
||||
)),
|
||||
("travel_end_date", DateRangeFilterBuilder(
|
||||
title="End date"
|
||||
))
|
||||
]
|
||||
cloneable_fields = ["last_name"]
|
||||
search_fields = ["first_name", "last_name", "address"]
|
||||
inlines = [FamilyInline, TravellerRouteInline]
|
||||
readonly_fields = ["travel_start_date", "travel_end_date", "created_on", "last_edit", "is_active", "address"]
|
||||
actions = ["export_to_csv", "send_sms", "confirmation_letter", "letter_creator"]
|
||||
fieldsets = [
|
||||
(None, {
|
||||
'fields': [
|
||||
"is_active",
|
||||
"school",
|
||||
"first_name",
|
||||
"last_name",
|
||||
"year_level",
|
||||
"dob",
|
||||
"address",
|
||||
]
|
||||
}),
|
||||
('Office Use', {
|
||||
'classes': ('collapse',),
|
||||
'fields': [
|
||||
"distance_to_school",
|
||||
"travel_start_date",
|
||||
"travel_end_date",
|
||||
"eligibility_status",
|
||||
"term_1_paid",
|
||||
"term_2_paid",
|
||||
"term_3_paid",
|
||||
"term_4_paid",
|
||||
"assessment_date",
|
||||
"application_form_completed",
|
||||
"parent_notified",
|
||||
"seat_number",
|
||||
"created_on",
|
||||
"last_edit",
|
||||
]
|
||||
}),
|
||||
(None, {'fields': ["notes", "shuttle"]})
|
||||
]
|
||||
# list_display_links = None
|
||||
|
||||
def stop_route(self, obj):
|
||||
from transport.models import BusStop
|
||||
stops = BusStop.objects.filter(traveller__id__exact=obj.id)
|
||||
if stops.count() == 0:
|
||||
return ""
|
||||
if stops.count() == 1:
|
||||
return stops.first()
|
||||
return "Multiple"
|
||||
|
||||
# def save_model(self, request, obj, form, change):
|
||||
# if obj.is_archived and obj.travel_end_date is None:
|
||||
# obj.is_archived = False
|
||||
# super().save_model(request, obj, form, change)
|
||||
|
||||
|
||||
# @admin.register(Family)
|
||||
class FamilyAdmin(MyImportExportModelAdmin, admin.ModelAdmin):
|
||||
list_display = ["traveller", "__str__",
|
||||
"parent_A_firstname", "parent_A_lastname", "parent_A_phone",
|
||||
"parent_B_firstname", "parent_B_lastname", "parent_B_phone",
|
||||
"emergency_contact_A_firstname", "emergency_contact_A_lastname", "emergency_contact_A_phone",
|
||||
"emergency_contact_B_firstname", "emergency_contact_B_lastname", "emergency_contact_B_phone"]
|
||||
|
||||
|
||||
# @admin.register(TravellerRoute)
|
||||
class TravellerRouteAdmin(MyImportExportModelAdmin, admin.ModelAdmin):
|
||||
list_display = ["traveller", "busStop"]
|
||||
|
||||
|
||||
@admin.register(School)
|
||||
class SchoolAdmin(MyImportExportModelAdmin, admin.ModelAdmin, SchoolRollMixin):
|
||||
list_display = ["__str__", "address", "suburb", "school_email", "phone"]
|
||||
actions = ["email_travellers_to_school", "show_school_travellers", "export_travellers_to_csv"]
|
||||
|
||||
def school_email(self, obj):
|
||||
return format_html('<a href="mailto:{}">{}</a>', obj.email, obj.email)
|
||||
@@ -0,0 +1,161 @@
|
||||
from functools import update_wrapper
|
||||
from django.contrib.admin import ModelAdmin, helpers
|
||||
from django.contrib.admin.exceptions import DisallowedModelAdminToField
|
||||
from django.contrib.admin.options import TO_FIELD_VAR
|
||||
from django.contrib.admin.utils import flatten_fieldsets, unquote
|
||||
from django.shortcuts import redirect
|
||||
from django.urls import reverse, path
|
||||
from django.core.exceptions import PermissionDenied, FieldDoesNotExist
|
||||
|
||||
|
||||
class CloneModelAdmin(ModelAdmin):
|
||||
clone_verbose_name = "Clone"
|
||||
change_form_template = 'admin/admin_change_form.html'
|
||||
cloneable_fields = []
|
||||
|
||||
def get_urls(self):
|
||||
|
||||
# Not certain what this wrap() function is exactly. Just copied it from the django admin get_urls function
|
||||
def wrap(view):
|
||||
def wrapper(*args, **kwargs):
|
||||
return self.admin_site.admin_view(view)(*args, **kwargs)
|
||||
|
||||
wrapper.model_admin = self
|
||||
return update_wrapper(wrapper, view)
|
||||
|
||||
info = self.opts.app_label, self.opts.model_name
|
||||
|
||||
new_urlpatterns = [
|
||||
path(
|
||||
"<path:object_id>/clone/",
|
||||
wrap(self.clone_view),
|
||||
name="%s_%s_clone" % info,
|
||||
),
|
||||
]
|
||||
|
||||
original_urlpatterns = super(CloneModelAdmin, self).get_urls()
|
||||
|
||||
# Important to add custom urls before the existing ones.
|
||||
# Last entry is <URLPattern '<path:object_id>/'> which will catch everything not already picked up
|
||||
return new_urlpatterns + original_urlpatterns
|
||||
|
||||
def change_view(self, request, object_id, form_url='', extra_context=None):
|
||||
url = reverse("admin:{0}_{1}_clone".format(self.opts.app_label, self.opts.model_name), args=[object_id])
|
||||
|
||||
extra_context = extra_context or {}
|
||||
extra_context.update({
|
||||
'clone_verbose_name': self.clone_verbose_name,
|
||||
'clone_link': url,
|
||||
})
|
||||
return super(CloneModelAdmin, self).change_view(request, object_id, form_url, extra_context)
|
||||
|
||||
def clone_view(self, request, object_id, form_url='', extra_context=None):
|
||||
to_field = request.POST.get(TO_FIELD_VAR, request.GET.get(TO_FIELD_VAR))
|
||||
if to_field:
|
||||
print("To field in clone_view. What is this? ",to_field)
|
||||
if to_field and not self.to_field_allowed(request, to_field):
|
||||
raise DisallowedModelAdminToField(
|
||||
"The field %s cannot be referenced." % to_field
|
||||
)
|
||||
|
||||
if not self.has_add_permission(request):
|
||||
raise PermissionDenied
|
||||
original_obj = self.get_object(request, unquote(object_id), to_field)
|
||||
if original_obj is None:
|
||||
return self._get_obj_does_not_exist_redirect(request, self.opts, object_id)
|
||||
if not self.has_view_or_change_permission(request, original_obj):
|
||||
raise PermissionDenied
|
||||
|
||||
fieldsets = self.get_fieldsets(request, original_obj)
|
||||
model_form = self.get_form(request, original_obj, change=False, fields=flatten_fieldsets(fieldsets))
|
||||
|
||||
if request.method == "POST":
|
||||
form = model_form(request.POST)
|
||||
formsets, inline_instances = self._create_formsets(
|
||||
request,
|
||||
original_obj,
|
||||
change=False,
|
||||
)
|
||||
|
||||
form_validated = form.is_valid()
|
||||
if form_validated:
|
||||
new_object = self.save_form(request, form, change=False)
|
||||
self.save_model(request, new_object, form, False)
|
||||
self.clone_related(request, original_obj, new_object)
|
||||
return redirect(reverse("admin:{0}_{1}_change".format(self.opts.app_label, self.opts.model_name), args=[new_object.pk]))
|
||||
else:
|
||||
new_obj = self.model()
|
||||
for field in self.cloneable_fields:
|
||||
setattr(new_obj, field, getattr(original_obj, field))
|
||||
form = model_form(instance=new_obj)
|
||||
formsets, inline_instances = self._create_formsets(request, original_obj, change=False)
|
||||
|
||||
admin_form = helpers.AdminForm(
|
||||
form,
|
||||
list(self.get_fieldsets(request)),
|
||||
self.get_prepopulated_fields(request),
|
||||
self.readonly_fields,
|
||||
model_admin=self
|
||||
)
|
||||
media = self.media
|
||||
|
||||
inline_formsets = self.get_inline_formsets(request, formsets, inline_instances)
|
||||
for inline_formset in inline_formsets:
|
||||
media += inline_formset.media
|
||||
|
||||
title = u'{0} {1}'.format(self.clone_verbose_name, original_obj)
|
||||
|
||||
context = {
|
||||
**self.admin_site.each_context(request),
|
||||
"title": title,
|
||||
"original": title,
|
||||
"adminform": admin_form,
|
||||
"is_popup": "_popup" in request.POST or "_popup" in request.GET,
|
||||
"show_delete": False,
|
||||
"media": media,
|
||||
"inline_admin_formsets": inline_formsets,
|
||||
"errors": helpers.AdminErrorList(form, formsets),
|
||||
**(extra_context or {}),
|
||||
}
|
||||
|
||||
context.update(extra_context or {})
|
||||
|
||||
return self.render_change_form(
|
||||
request,
|
||||
context,
|
||||
form_url=form_url,
|
||||
change=False,
|
||||
add=True
|
||||
)
|
||||
|
||||
def clone_related(self, request, original_obj, new_obj):
|
||||
for inline in self.inlines:
|
||||
if not inline.clone_parent:
|
||||
continue
|
||||
try:
|
||||
inline.model._meta.get_field(inline.clone_parent)
|
||||
except FieldDoesNotExist:
|
||||
continue
|
||||
inline_objects = inline.model.objects.filter(**{inline.clone_parent: original_obj})
|
||||
for inline_object in inline_objects:
|
||||
inline_object.pk = None
|
||||
setattr(inline_object, inline.clone_parent, new_obj)
|
||||
inline_object.save()
|
||||
|
||||
|
||||
class InlineAdminFormSetFakeOriginal(helpers.InlineAdminFormSet):
|
||||
|
||||
def __iter__(self):
|
||||
# the template requires the AdminInlineForm to have an `original`
|
||||
# attribute, which is the model instance, in order to display the
|
||||
# 'Delete' checkbox
|
||||
# we don't have `original` because we are just providing initial
|
||||
# data to the form, so we attach a "fake original" (something that
|
||||
# evaluates to True) to fool the template and make is display
|
||||
# the 'Delete' checkbox
|
||||
# needless to say this is a terrible hack and will break in future
|
||||
# django versions :)
|
||||
for inline_form in super(InlineAdminFormSetFakeOriginal, self).__iter__():
|
||||
if inline_form.form.initial:
|
||||
inline_form.original = True
|
||||
yield inline_form
|
||||
@@ -0,0 +1,60 @@
|
||||
import csv
|
||||
from datetime import date
|
||||
|
||||
from django.http import HttpResponse
|
||||
|
||||
from common.documents import render_to_pdf
|
||||
from messaging.services.email import email_school_roll
|
||||
from messaging.services.sms import send_sms
|
||||
from traveller.context_helpers import school_roll_context, traveller_route_context, confirmation_letter_context, \
|
||||
traveller_roll_context
|
||||
from traveller.models import TravellerRoute
|
||||
|
||||
|
||||
class SchoolRollMixin:
|
||||
|
||||
def email_travellers_to_school(self, request, queryset):
|
||||
return email_school_roll(request, queryset)
|
||||
|
||||
def show_school_travellers(self, request, queryset):
|
||||
return render_to_pdf('reports/school_roll.html', school_roll_context(queryset))
|
||||
|
||||
def show_school_travellers_on_date(self, request, queryset):
|
||||
return render_to_pdf('reports/school_roll.html', school_roll_context(queryset))
|
||||
|
||||
def export_travellers_to_csv(self, request, queryset):
|
||||
traveller_list = []
|
||||
for school in queryset:
|
||||
for travellerRoute in TravellerRoute.objects.filter(traveller__school=school):
|
||||
if not travellerRoute.traveller._is_active():
|
||||
continue
|
||||
traveller_list.append(traveller_route_context(travellerRoute))
|
||||
|
||||
response = HttpResponse(content_type="text/csv")
|
||||
response["Content-Disposition"] = f"attachment; filename=traveller_list_{date.today()}.csv"
|
||||
|
||||
writer = csv.DictWriter(response, fieldnames=traveller_list[0].keys())
|
||||
writer.writeheader()
|
||||
writer.writerows(traveller_list)
|
||||
|
||||
return response
|
||||
|
||||
|
||||
class TravellerRollMixin:
|
||||
|
||||
def confirmation_letter(self, request, queryset):
|
||||
return render_to_pdf('mail/confirmation_letter.html', confirmation_letter_context(queryset))
|
||||
|
||||
def send_sms(self, request, queryset):
|
||||
return send_sms(self, request, queryset)
|
||||
|
||||
def export_to_csv(self, request, queryset):
|
||||
traveller_list = traveller_roll_context(queryset)
|
||||
response = HttpResponse(content_type="text/csv")
|
||||
response["Content-Disposition"] = f"attachment; filename=traveller_list_{date.today()}.csv"
|
||||
|
||||
writer = csv.DictWriter(response, fieldnames=traveller_list[0].keys())
|
||||
writer.writeheader()
|
||||
writer.writerows(traveller_list)
|
||||
|
||||
return response
|
||||
@@ -0,0 +1,90 @@
|
||||
from django.db.models import Q
|
||||
|
||||
from transport.models import Bus
|
||||
from traveller.models import TravellerRoute
|
||||
|
||||
|
||||
def school_roll_context(queryset, date=None):
|
||||
school_list = []
|
||||
|
||||
for school in queryset:
|
||||
school_routes = []
|
||||
query = Q(traveller__school=school) | Q(traveller__shuttle__school=school)
|
||||
for bus in Bus.objects.all():
|
||||
travellers = []
|
||||
for trav_route in TravellerRoute.objects.filter(query).filter(busStop__bus=bus).order_by('busStop__am_time'):
|
||||
traveller = trav_route.traveller
|
||||
if not traveller._is_active(date):
|
||||
continue
|
||||
bus_stop = trav_route.busStop
|
||||
|
||||
display_name = f"{traveller} ({traveller.get_year_level_display()})"
|
||||
if traveller.school != school:
|
||||
display_name = f"{traveller} ({traveller.get_year_level_display()}, {traveller.school.shortName})"
|
||||
is_fared = "---"
|
||||
if traveller.eligibility_status == "2":
|
||||
is_fared = "Y"
|
||||
shuttle_name = " "
|
||||
if traveller.shuttle:
|
||||
shuttle_name = traveller.shuttle.bus
|
||||
travellers.append({
|
||||
'display': display_name,
|
||||
'isFared': is_fared,
|
||||
'shuttle': shuttle_name,
|
||||
'stop': f"#{bus_stop.get_stop_number()} - {bus_stop.address}",
|
||||
'am_time': bus_stop.am_time,
|
||||
'pm_time': bus_stop.pm_time
|
||||
})
|
||||
if travellers:
|
||||
school_routes.append({
|
||||
'bus': bus,
|
||||
'travellers': travellers
|
||||
})
|
||||
|
||||
school_list.append({"name": school.name, "routes": school_routes})
|
||||
return {"schools": school_list}
|
||||
|
||||
def traveller_route_context(traveller_route):
|
||||
traveller = traveller_route.traveller
|
||||
bus_stop = traveller_route.busStop
|
||||
families = traveller.get_families()
|
||||
address = ""
|
||||
for family in families:
|
||||
if address:
|
||||
address += ";"
|
||||
address = address + f"{family.residential_address} {family.residential_suburb}"
|
||||
return {
|
||||
'first_name': traveller.first_name,
|
||||
'last_name': traveller.last_name,
|
||||
'active': traveller.is_active,
|
||||
'school': traveller.school,
|
||||
'dob': traveller.dob,
|
||||
'year_level': traveller.year_level,
|
||||
'address': address,
|
||||
'start_date': traveller.travel_start_date,
|
||||
'end_date': traveller.travel_end_date,
|
||||
'eligibility': traveller.get_eligibility_status_display(),
|
||||
'shuttle': traveller.shuttle,
|
||||
'route': traveller_route.busStop.bus,
|
||||
'stop': f"#{bus_stop.get_stop_number()} - {bus_stop.address}",
|
||||
'pickup': bus_stop.am_time,
|
||||
'drop-off': bus_stop.pm_time
|
||||
}
|
||||
|
||||
def traveller_roll_context(queryset):
|
||||
travellers = []
|
||||
for traveller in queryset:
|
||||
for traveller_route in TravellerRoute.objects.filter(traveller=traveller):
|
||||
travellers.append(traveller_route_context(traveller_route))
|
||||
return travellers
|
||||
|
||||
def confirmation_letter_context(queryset):
|
||||
travellers = []
|
||||
for traveller in queryset:
|
||||
for travellerRoute in TravellerRoute.objects.filter(traveller=traveller):
|
||||
travellers.append({
|
||||
'traveller': traveller,
|
||||
'stop': travellerRoute.busStop,
|
||||
'shuttle': traveller.shuttle,
|
||||
})
|
||||
return {'travellers': travellers}
|
||||
@@ -1,3 +1,294 @@
|
||||
from datetime import datetime
|
||||
|
||||
import phonenumbers
|
||||
from django.core.exceptions import ValidationError
|
||||
from django.db import models
|
||||
|
||||
# Create your models here.
|
||||
from common.models import Suburb
|
||||
|
||||
|
||||
class School(models.Model):
|
||||
name = models.CharField(max_length=30, unique=True)
|
||||
shortName = models.CharField(max_length=10, unique=True)
|
||||
address = models.CharField(max_length=50)
|
||||
suburb = models.ForeignKey(Suburb, on_delete=models.CASCADE)
|
||||
email = models.CharField(max_length=50, blank=True)
|
||||
phone = models.CharField(max_length=15, blank=True)
|
||||
principal_name = models.CharField(max_length=50, blank=True)
|
||||
principal_phone = models.CharField(max_length=15, blank=True)
|
||||
notes = models.TextField(blank=True)
|
||||
|
||||
def __str__(self):
|
||||
return self.name
|
||||
|
||||
class Traveller(models.Model):
|
||||
YEAR = [
|
||||
("PS", "PreSchool"),
|
||||
("00", "Year 00"),
|
||||
("01", "Year 01"),
|
||||
("02", "Year 02"),
|
||||
("03", "Year 03"),
|
||||
("04", "Year 04"),
|
||||
("05", "Year 05"),
|
||||
("06", "Year 06"),
|
||||
("07", "Year 07"),
|
||||
("08", "Year 08"),
|
||||
("09", "Year 09"),
|
||||
("10", "Year 10"),
|
||||
("11", "Year 11"),
|
||||
("12", "Year 12"),
|
||||
("AL", "Adult Learner"),
|
||||
]
|
||||
|
||||
ELIGIBILITY_STATUS = [
|
||||
("1", "Eligible"),
|
||||
("2", "Ineligible"),
|
||||
("3", "<4.8 Exemption"),
|
||||
("4", "Eligible waitlisted"),
|
||||
("5", "Ineligible waitlisted"),
|
||||
("6", "Kinder Exemption"),
|
||||
("7", "Tafe/Post Secondary Exemption"),
|
||||
("8", "Other Exemption"),
|
||||
]
|
||||
|
||||
school = models.ForeignKey(School, on_delete=models.PROTECT)
|
||||
first_name = models.CharField(max_length=50)
|
||||
last_name = models.CharField(max_length=50)
|
||||
dob = models.DateField(blank=True, null=True)
|
||||
year_level = models.CharField(max_length=2, choices=YEAR)
|
||||
bus_stops = models.ManyToManyField("transport.BusStop", through='TravellerRoute', blank=True)
|
||||
distance_to_school = models.PositiveSmallIntegerField(blank=True, null=True)
|
||||
address = models.CharField(max_length=100, default="")
|
||||
|
||||
travel_start_date = models.DateField(blank=True, null=True)
|
||||
travel_end_date = models.DateField(blank=True, null=True)
|
||||
eligibility_status = models.CharField(max_length=1, choices=ELIGIBILITY_STATUS)
|
||||
assessment_date = models.DateField(blank=True, null=True)
|
||||
fee_per_term = models.DecimalField(decimal_places=2, max_digits=5, blank=True, null=True)
|
||||
term_1_paid = models.BooleanField(default=False)
|
||||
term_2_paid = models.BooleanField(default=False)
|
||||
term_3_paid = models.BooleanField(default=False)
|
||||
term_4_paid = models.BooleanField(default=False)
|
||||
application_form_completed = models.BooleanField()
|
||||
parent_notified = models.BooleanField()
|
||||
seat_number = models.CharField(max_length=5, blank=True)
|
||||
|
||||
created_on = models.DateTimeField(auto_now_add=True, blank=True, null=True)
|
||||
last_edit = models.DateTimeField(auto_now=True, blank=True, null=True)
|
||||
is_archived = models.BooleanField(default=False, verbose_name="Archived")
|
||||
is_active = models.BooleanField(default=False, verbose_name='Active')
|
||||
notes = models.TextField(blank=True, verbose_name='Admin Notes')
|
||||
shuttle = models.ForeignKey("transport.Shuttle", on_delete=models.SET_NULL, blank=True, null=True)
|
||||
|
||||
class Meta:
|
||||
ordering = ["last_name", "first_name"]
|
||||
|
||||
def __str__(self):
|
||||
return f"{self.first_name} {self.last_name}"
|
||||
|
||||
def save(self, *args, **kwargs):
|
||||
self._update_active_status()
|
||||
self._repopulate_address()
|
||||
super(Traveller, self).save(*args, **kwargs)
|
||||
|
||||
def _is_active(self, date=None):
|
||||
if date is None:
|
||||
today = datetime.today()
|
||||
date = datetime(today.year, today.month, today.day)
|
||||
|
||||
if not self.travel_start_date or datetime(self.travel_start_date.year, self.travel_start_date.month, self.travel_start_date.day) > date:
|
||||
return False
|
||||
if not self.travel_end_date:
|
||||
return True
|
||||
|
||||
end_date = datetime(self.travel_end_date.year, self.travel_end_date.month, self.travel_end_date.day)
|
||||
return end_date >= date
|
||||
|
||||
def _update_active_status(self):
|
||||
new_start_date = None
|
||||
new_end_date = None
|
||||
|
||||
for travellerRoute in TravellerRoute.objects.filter(traveller=self.id):
|
||||
route_start = travellerRoute.travel_start_date
|
||||
route_end = travellerRoute.travel_end_date
|
||||
if route_start is not None:
|
||||
if new_start_date is None or new_start_date > route_start:
|
||||
new_start_date = route_start
|
||||
if route_end is not None:
|
||||
if new_end_date is None or new_end_date < route_end:
|
||||
new_end_date = route_end
|
||||
|
||||
self.travel_start_date = new_start_date
|
||||
self.travel_end_date = new_end_date
|
||||
self.is_active = self._is_active()
|
||||
|
||||
def fare_paying(self):
|
||||
if self.eligibility_status != "2":
|
||||
return
|
||||
cost_setting = 0
|
||||
|
||||
if not cost_setting.exists():
|
||||
return "TERM_TRAVEL_COST not configured"
|
||||
|
||||
cost = int(cost_setting.get().value)
|
||||
|
||||
stops = 0
|
||||
for stop in TravellerRoute.objects.filter(traveller=self.id):
|
||||
stops += stop.active_stops()
|
||||
if stops > 1:
|
||||
stops = 1
|
||||
return f"${round(cost*stops)}"
|
||||
|
||||
def _repopulate_address(self):
|
||||
families = self.get_families()
|
||||
if families.count() == 0:
|
||||
self.address = ""
|
||||
elif families.count() == 1:
|
||||
family = families.first()
|
||||
self.address = f"{family.residential_address} {family.residential_suburb}"
|
||||
else:
|
||||
self.address = "Multiple"
|
||||
|
||||
def get_families(self):
|
||||
return Family.objects.filter(traveller__id__exact=self.id)
|
||||
|
||||
|
||||
class Family(models.Model):
|
||||
RELATIONS = [
|
||||
("1", "Parent"),
|
||||
("2", "Step-Parent"),
|
||||
("3", "Foster Parent"),
|
||||
("4", "Host Family"),
|
||||
("5", "Sibling"),
|
||||
("6", "Grandparent"),
|
||||
("7", "Aunt/Uncle"),
|
||||
("8", "Cousin"),
|
||||
("9", "Carer"),
|
||||
("10", "Case Worker"),
|
||||
("11", "Friend/Other"),
|
||||
]
|
||||
|
||||
traveller = models.ForeignKey(Traveller, on_delete=models.CASCADE)
|
||||
residential_address = models.CharField(max_length=50, blank=True)
|
||||
residential_suburb = models.ForeignKey(Suburb, on_delete=models.PROTECT, blank=True, null=True,
|
||||
related_name='family_residential_suburb')
|
||||
postal_address = models.CharField(max_length=50, blank=True)
|
||||
postal_suburb = models.ForeignKey(Suburb, on_delete=models.PROTECT, blank=True, null=True,
|
||||
related_name='family_postal_suburb')
|
||||
parent_A_firstname = models.CharField(max_length=50, blank=True)
|
||||
parent_A_lastname = models.CharField(max_length=50, blank=True)
|
||||
parent_A_phone = models.CharField(max_length=15, blank=True)
|
||||
parent_A_email = models.CharField(max_length=50, blank=True)
|
||||
parent_B_firstname = models.CharField(max_length=50, blank=True)
|
||||
parent_B_lastname = models.CharField(max_length=50, blank=True)
|
||||
parent_B_phone = models.CharField(max_length=15, blank=True)
|
||||
parent_B_email = models.CharField(max_length=50, blank=True)
|
||||
emergency_contact_A_firstname = models.CharField(max_length=50, blank=True)
|
||||
emergency_contact_A_lastname = models.CharField(max_length=50, blank=True)
|
||||
emergency_contact_A_phone = models.CharField(max_length=15, blank=True)
|
||||
emergency_contact_A_relation = models.CharField(max_length=50, choices=RELATIONS, blank=True)
|
||||
emergency_contact_B_firstname = models.CharField(max_length=50, blank=True)
|
||||
emergency_contact_B_lastname = models.CharField(max_length=50, blank=True)
|
||||
emergency_contact_B_phone = models.CharField(max_length=15, blank=True)
|
||||
emergency_contact_B_relation = models.CharField(max_length=50, choices=RELATIONS, blank=True)
|
||||
created_on = models.DateTimeField(auto_now_add=True, blank=True, null=True)
|
||||
last_edit = models.DateTimeField(auto_now=True, blank=True, null=True)
|
||||
|
||||
class Meta:
|
||||
verbose_name_plural = "Families"
|
||||
|
||||
def __str__(self):
|
||||
return self.parent_names()
|
||||
|
||||
def clean(self):
|
||||
valid_numbers, failed_numbers = self.get_parsed_numbers(True, True)
|
||||
if len(failed_numbers) > 0:
|
||||
raise ValidationError(f"Phone number {failed_numbers[0]} not valid")
|
||||
|
||||
def parent_names(self):
|
||||
a_name = self.parent_A_firstname
|
||||
b_name = self.parent_B_firstname
|
||||
if a_name:
|
||||
if b_name:
|
||||
return f"{a_name} and {b_name}"
|
||||
return a_name
|
||||
elif b_name:
|
||||
return b_name
|
||||
return ""
|
||||
|
||||
def get_parsed_numbers(self, parents=False, emergency=False):
|
||||
numbers = []
|
||||
if parents:
|
||||
if self.parent_A_phone:
|
||||
numbers.append(self.parent_A_phone)
|
||||
if self.parent_B_phone:
|
||||
numbers.append(self.parent_B_phone)
|
||||
if emergency:
|
||||
if self.emergency_contact_A_phone:
|
||||
numbers.append(self.emergency_contact_A_phone)
|
||||
if self.emergency_contact_B_phone:
|
||||
numbers.append(self.emergency_contact_B_phone)
|
||||
valid_numbers = []
|
||||
failed_numbers = []
|
||||
for number in numbers:
|
||||
try:
|
||||
num = phonenumbers.parse(number, "AU")
|
||||
if phonenumbers.is_valid_number(num):
|
||||
valid_numbers.append(f"+{num.country_code}{num.national_number}")
|
||||
else:
|
||||
failed_numbers.append(number)
|
||||
except:
|
||||
failed_numbers.append(number)
|
||||
|
||||
return valid_numbers, failed_numbers
|
||||
|
||||
|
||||
class TravellerRoute(models.Model):
|
||||
traveller = models.ForeignKey(Traveller, on_delete=models.CASCADE)
|
||||
busStop = models.ForeignKey("transport.BusStop", on_delete=models.CASCADE)
|
||||
travel_start_date = models.DateField(blank=True, null=True)
|
||||
travel_end_date = models.DateField(blank=True, null=True)
|
||||
mon_am = models.BooleanField(default=True)
|
||||
mon_pm = models.BooleanField(default=True)
|
||||
tue_am = models.BooleanField(default=True)
|
||||
tue_pm = models.BooleanField(default=True)
|
||||
wen_am = models.BooleanField(default=True)
|
||||
wen_pm = models.BooleanField(default=True)
|
||||
thu_am = models.BooleanField(default=True)
|
||||
thu_pm = models.BooleanField(default=True)
|
||||
fri_am = models.BooleanField(default=True)
|
||||
fri_pm = models.BooleanField(default=True)
|
||||
notes = models.TextField(blank=True, verbose_name="Driver Notes")
|
||||
created_on = models.DateTimeField(auto_now_add=True, blank=True, null=True)
|
||||
last_edit = models.DateTimeField(auto_now=True, blank=True, null=True)
|
||||
|
||||
def __str__(self):
|
||||
return f"{self.busStop}"
|
||||
|
||||
def save(self, *args, **kwargs):
|
||||
super(TravellerRoute, self).save(*args, **kwargs)
|
||||
self.traveller.save()
|
||||
|
||||
def active_stops(self):
|
||||
stops = 0
|
||||
if self.mon_am:
|
||||
stops += 1
|
||||
if self.mon_pm:
|
||||
stops += 1
|
||||
if self.tue_am:
|
||||
stops += 1
|
||||
if self.tue_pm:
|
||||
stops += 1
|
||||
if self.wen_am:
|
||||
stops += 1
|
||||
if self.wen_pm:
|
||||
stops += 1
|
||||
if self.thu_am:
|
||||
stops += 1
|
||||
if self.thu_pm:
|
||||
stops += 1
|
||||
if self.fri_am:
|
||||
stops += 1
|
||||
if self.fri_pm:
|
||||
stops += 1
|
||||
return stops * 0.1
|
||||
|
||||
@@ -1,3 +1,17 @@
|
||||
from django.contrib.admin.views.decorators import staff_member_required
|
||||
from django.http import HttpResponseRedirect
|
||||
from django.shortcuts import render
|
||||
|
||||
# Create your views here.
|
||||
from messaging.services.sms import SMSForm
|
||||
|
||||
|
||||
@staff_member_required
|
||||
def sms_message(request, queryset):
|
||||
if request.method == 'POST':
|
||||
form = SMSForm(request.POST)
|
||||
if form.is_valid():
|
||||
HttpResponseRedirect(request.get_full_path())
|
||||
else:
|
||||
form = SMSForm()
|
||||
|
||||
return render(request, 'admin/sms_form.html', context={'form': form, 'travellers': queryset})
|
||||
|
||||
Reference in New Issue
Block a user