Files
bus-manager/busManager/custom_user/handlers.py
T

148 lines
5.1 KiB
Python

from http import HTTPStatus
import msal
import requests
from django.conf import settings
from django.contrib.auth import get_user_model
from .exceptions import DjangoAzureAuthException, TokenError
UserModel = get_user_model()
class AuthHandler:
"""
Class to interface with `msal` package and execute authentication process.
"""
def __init__(self, request=None):
"""
:param request: HttpRequest
"""
self.request = request
self.graph_user_endpoint = "https://graph.microsoft.com/v1.0/me"
self.auth_flow_session_key = "auth_flow"
self._cache = msal.SerializableTokenCache()
self._msal_app = None
def get_auth_uri(self) -> str:
"""
Requests the auth flow dictionary and stores it on the session to be
queried later in the auth process.
:return: Authentication redirect URI
"""
# TODO: Handle if user has put incorrect details in settings
flow = self.msal_app.initiate_auth_code_flow(
scopes=settings.AZURE_AUTH["SCOPES"],
redirect_uri=settings.AZURE_AUTH["REDIRECT_URI"],
)
self.request.session[self.auth_flow_session_key] = flow
return flow["auth_uri"]
def get_token_from_flow(self) -> dict:
"""
Acquires the token from the auth flow on the session and the content of
the redirect request from Active Directory.
:return: Token result containing `access_token`/`id_token` and other
claims, depending on scopes used
"""
flow = self.request.session.pop(self.auth_flow_session_key, {})
token_result = self.msal_app.acquire_token_by_auth_code_flow(
auth_code_flow=flow, auth_response=self.request.GET
)
if "error" in token_result:
raise TokenError(token_result["error"], token_result["error_description"])
self._save_cache()
self.request.session["id_token_claims"] = token_result["id_token_claims"]
return token_result
def get_token_from_cache(self):
accounts = self.msal_app.get_accounts()
if accounts: # pragma: no branch
# Will return `None` if CCA cannot retrieve or generate new token
token_result = self.msal_app.acquire_token_silent(
scopes=settings.AZURE_AUTH["SCOPES"], account=accounts[0]
)
self._save_cache()
return token_result
def authenticate(self, token: dict) -> UserModel:
"""
Helper method to authenticate the user. Gets the Azure user from the
Microsoft Graph endpoint and gets/creates the associated Django user.
:param token: MSAL auth token dictionary
:return: Django user instance
"""
azure_user = self._get_azure_user(token["access_token"])
# Allow for `outlook.com` users with email set on the
# `userPrincipalName` attribute
email = (
azure_user["mail"]
if azure_user.get("mail", None)
else azure_user["userPrincipalName"]
)
# Using `UserModel._default_manager.get_by_natural_key` handles custom
# user model and `USERNAME_FIELD` setting
try:
user = UserModel._default_manager.get_by_natural_key(email)
except UserModel.DoesNotExist:
return None
# TODO: Handle groups
return user
@staticmethod
def get_logout_uri() -> str:
"""
Forms the URI to log the user out in the Active Directory app and
redirect to the webapp logout page.
:return: Active Directory app logout URI
"""
authority = settings.AZURE_AUTH["AUTHORITY"]
logout_uri = settings.AZURE_AUTH.get("LOGOUT_URI", "")
if logout_uri:
return (
f"{authority}/oauth2/v2.0/logout?post_logout_redirect_uri={logout_uri}"
)
return f"{authority}/oauth2/v2.0/logout"
@property
def msal_app(self):
if self._msal_app is None:
self._msal_app = msal.ConfidentialClientApplication(
client_id=settings.AZURE_AUTH["CLIENT_ID"],
client_credential=settings.AZURE_AUTH["CLIENT_SECRET"],
authority=settings.AZURE_AUTH["AUTHORITY"],
token_cache=self.cache,
)
return self._msal_app
@property
def cache(self):
if self.request.session.get("token_cache"):
self._cache.deserialize(self.request.session["token_cache"])
return self._cache
def _save_cache(self):
if self.cache.has_state_changed:
self.request.session["token_cache"] = self.cache.serialize()
def _get_azure_user(self, token: str):
resp = requests.get(
self.graph_user_endpoint, headers={"Authorization": f"Bearer {token}"}
)
if resp.ok:
return resp.json()
elif resp.status_code == HTTPStatus.UNAUTHORIZED:
error = resp.json()["error"]
raise TokenError(message=error["code"], description=error["message"])
else: # pragma: no cover
raise DjangoAzureAuthException("An unknown error occurred.")