Moved azure login to local app. Removed ability for new users to be created automatically
This commit is contained in:
@@ -34,6 +34,7 @@ ALLOWED_HOSTS = os.environ.get('ALLOWED_HOSTS').split(' ')
|
||||
INSTALLED_APPS = [
|
||||
'coord.apps.CoordConfig',
|
||||
# 'mail_templates.apps.MailTemplatesConfig',
|
||||
'custom_user.apps.CustomUserConfig',
|
||||
'django.contrib.admin',
|
||||
'django.contrib.auth',
|
||||
'django.contrib.contenttypes',
|
||||
@@ -42,7 +43,6 @@ INSTALLED_APPS = [
|
||||
'django.contrib.staticfiles',
|
||||
'import_export',
|
||||
'rangefilter',
|
||||
'azure_auth',
|
||||
]
|
||||
|
||||
TWILIO = {
|
||||
@@ -65,8 +65,8 @@ MIDDLEWARE = [
|
||||
]
|
||||
|
||||
if "AZURE_CLIENT_ID" in os.environ:
|
||||
AUTHENTICATION_BACKENDS = ("azure_auth.backends.AzureBackend",)
|
||||
MIDDLEWARE.append('azure_auth.middleware.AzureMiddleware')
|
||||
AUTHENTICATION_BACKENDS = ("custom_user.backends.AzureBackend",)
|
||||
MIDDLEWARE.append('custom_user.middleware.AzureMiddleware')
|
||||
LOGIN_URL = "/azure_auth/login"
|
||||
AZURE_AUTH = {
|
||||
"CLIENT_ID": os.environ.get('AZURE_CLIENT_ID'),
|
||||
|
||||
@@ -0,0 +1,6 @@
|
||||
from django.apps import AppConfig
|
||||
|
||||
|
||||
class CustomUserConfig(AppConfig):
|
||||
default_auto_field = 'django.db.models.BigAutoField'
|
||||
name = 'custom_user'
|
||||
@@ -0,0 +1,14 @@
|
||||
from django.contrib.auth.backends import ModelBackend
|
||||
|
||||
from .handlers import AuthHandler
|
||||
|
||||
|
||||
class AzureBackend(ModelBackend):
|
||||
def authenticate(self, request, token=None, *args, **kwargs):
|
||||
if not token: # pragma: no cover
|
||||
return
|
||||
user = AuthHandler(request).authenticate(token)
|
||||
|
||||
# Return only if `is_active`
|
||||
if self.user_can_authenticate(user):
|
||||
return user
|
||||
@@ -0,0 +1,11 @@
|
||||
class DjangoAzureAuthException(Exception):
|
||||
pass
|
||||
|
||||
|
||||
class TokenError(DjangoAzureAuthException):
|
||||
def __init__(self, message, description):
|
||||
self.message = message if message else ""
|
||||
self.description = description if description else ""
|
||||
|
||||
def __str__(self):
|
||||
return f"{self.message}\n{self.description}"
|
||||
@@ -0,0 +1,147 @@
|
||||
from http import HTTPStatus
|
||||
|
||||
import msal
|
||||
import requests
|
||||
from django.conf import settings
|
||||
from django.contrib.auth import get_user_model
|
||||
|
||||
from .exceptions import DjangoAzureAuthException, TokenError
|
||||
|
||||
UserModel = get_user_model()
|
||||
|
||||
|
||||
class AuthHandler:
|
||||
"""
|
||||
Class to interface with `msal` package and execute authentication process.
|
||||
"""
|
||||
|
||||
def __init__(self, request=None):
|
||||
"""
|
||||
|
||||
:param request: HttpRequest
|
||||
"""
|
||||
self.request = request
|
||||
self.graph_user_endpoint = "https://graph.microsoft.com/v1.0/me"
|
||||
self.auth_flow_session_key = "auth_flow"
|
||||
self._cache = msal.SerializableTokenCache()
|
||||
self._msal_app = None
|
||||
|
||||
def get_auth_uri(self) -> str:
|
||||
"""
|
||||
Requests the auth flow dictionary and stores it on the session to be
|
||||
queried later in the auth process.
|
||||
|
||||
:return: Authentication redirect URI
|
||||
"""
|
||||
# TODO: Handle if user has put incorrect details in settings
|
||||
flow = self.msal_app.initiate_auth_code_flow(
|
||||
scopes=settings.AZURE_AUTH["SCOPES"],
|
||||
redirect_uri=settings.AZURE_AUTH["REDIRECT_URI"],
|
||||
)
|
||||
self.request.session[self.auth_flow_session_key] = flow
|
||||
return flow["auth_uri"]
|
||||
|
||||
def get_token_from_flow(self) -> dict:
|
||||
"""
|
||||
Acquires the token from the auth flow on the session and the content of
|
||||
the redirect request from Active Directory.
|
||||
|
||||
:return: Token result containing `access_token`/`id_token` and other
|
||||
claims, depending on scopes used
|
||||
"""
|
||||
flow = self.request.session.pop(self.auth_flow_session_key, {})
|
||||
token_result = self.msal_app.acquire_token_by_auth_code_flow(
|
||||
auth_code_flow=flow, auth_response=self.request.GET
|
||||
)
|
||||
if "error" in token_result:
|
||||
raise TokenError(token_result["error"], token_result["error_description"])
|
||||
self._save_cache()
|
||||
self.request.session["id_token_claims"] = token_result["id_token_claims"]
|
||||
return token_result
|
||||
|
||||
def get_token_from_cache(self):
|
||||
accounts = self.msal_app.get_accounts()
|
||||
if accounts: # pragma: no branch
|
||||
# Will return `None` if CCA cannot retrieve or generate new token
|
||||
token_result = self.msal_app.acquire_token_silent(
|
||||
scopes=settings.AZURE_AUTH["SCOPES"], account=accounts[0]
|
||||
)
|
||||
self._save_cache()
|
||||
return token_result
|
||||
|
||||
def authenticate(self, token: dict) -> UserModel:
|
||||
"""
|
||||
Helper method to authenticate the user. Gets the Azure user from the
|
||||
Microsoft Graph endpoint and gets/creates the associated Django user.
|
||||
|
||||
:param token: MSAL auth token dictionary
|
||||
:return: Django user instance
|
||||
"""
|
||||
azure_user = self._get_azure_user(token["access_token"])
|
||||
|
||||
# Allow for `outlook.com` users with email set on the
|
||||
# `userPrincipalName` attribute
|
||||
email = (
|
||||
azure_user["mail"]
|
||||
if azure_user.get("mail", None)
|
||||
else azure_user["userPrincipalName"]
|
||||
)
|
||||
|
||||
# Using `UserModel._default_manager.get_by_natural_key` handles custom
|
||||
# user model and `USERNAME_FIELD` setting
|
||||
try:
|
||||
user = UserModel._default_manager.get_by_natural_key(email)
|
||||
except UserModel.DoesNotExist:
|
||||
return None
|
||||
|
||||
# TODO: Handle groups
|
||||
return user
|
||||
|
||||
@staticmethod
|
||||
def get_logout_uri() -> str:
|
||||
"""
|
||||
Forms the URI to log the user out in the Active Directory app and
|
||||
redirect to the webapp logout page.
|
||||
|
||||
:return: Active Directory app logout URI
|
||||
"""
|
||||
authority = settings.AZURE_AUTH["AUTHORITY"]
|
||||
logout_uri = settings.AZURE_AUTH.get("LOGOUT_URI", "")
|
||||
if logout_uri:
|
||||
return (
|
||||
f"{authority}/oauth2/v2.0/logout?post_logout_redirect_uri={logout_uri}"
|
||||
)
|
||||
return f"{authority}/oauth2/v2.0/logout"
|
||||
|
||||
@property
|
||||
def msal_app(self):
|
||||
if self._msal_app is None:
|
||||
self._msal_app = msal.ConfidentialClientApplication(
|
||||
client_id=settings.AZURE_AUTH["CLIENT_ID"],
|
||||
client_credential=settings.AZURE_AUTH["CLIENT_SECRET"],
|
||||
authority=settings.AZURE_AUTH["AUTHORITY"],
|
||||
token_cache=self.cache,
|
||||
)
|
||||
return self._msal_app
|
||||
|
||||
@property
|
||||
def cache(self):
|
||||
if self.request.session.get("token_cache"):
|
||||
self._cache.deserialize(self.request.session["token_cache"])
|
||||
return self._cache
|
||||
|
||||
def _save_cache(self):
|
||||
if self.cache.has_state_changed:
|
||||
self.request.session["token_cache"] = self.cache.serialize()
|
||||
|
||||
def _get_azure_user(self, token: str):
|
||||
resp = requests.get(
|
||||
self.graph_user_endpoint, headers={"Authorization": f"Bearer {token}"}
|
||||
)
|
||||
if resp.ok:
|
||||
return resp.json()
|
||||
elif resp.status_code == HTTPStatus.UNAUTHORIZED:
|
||||
error = resp.json()["error"]
|
||||
raise TokenError(message=error["code"], description=error["message"])
|
||||
else: # pragma: no cover
|
||||
raise DjangoAzureAuthException("An unknown error occurred.")
|
||||
@@ -0,0 +1,32 @@
|
||||
from django.conf import settings
|
||||
from django.shortcuts import redirect
|
||||
from django.urls import reverse
|
||||
|
||||
from .handlers import AuthHandler
|
||||
|
||||
|
||||
class AzureMiddleware:
|
||||
def __init__(self, get_response):
|
||||
self.get_response = get_response
|
||||
|
||||
def __call__(self, request):
|
||||
public_views = ["azure_auth:login", "azure_auth:logout", "azure_auth:callback"]
|
||||
public_views.extend(settings.AZURE_AUTH.get("PUBLIC_URLS", []))
|
||||
public_urls = [reverse(view_name) for view_name in public_views]
|
||||
public_paths = settings.AZURE_AUTH.get(
|
||||
"PUBLIC_PATHS", []
|
||||
) # added to resolve paths
|
||||
|
||||
if request.path_info in public_urls:
|
||||
return self.get_response(request)
|
||||
|
||||
# Added to resolve paths that can't be reversed
|
||||
for path in public_paths:
|
||||
if request.path_info.startswith(path):
|
||||
return self.get_response(request)
|
||||
|
||||
if AuthHandler(request).get_token_from_cache():
|
||||
# If the user is authenticated
|
||||
if request.user.is_authenticated:
|
||||
return self.get_response(request)
|
||||
return redirect("azure_auth:login")
|
||||
@@ -0,0 +1,10 @@
|
||||
from django.urls import path
|
||||
|
||||
from .views import azure_auth_callback, azure_auth_login, azure_auth_logout
|
||||
|
||||
app_name = "azure_auth"
|
||||
urlpatterns = [
|
||||
path("login", azure_auth_login, name="login"),
|
||||
path("logout", azure_auth_logout, name="logout"),
|
||||
path("callback", azure_auth_callback, name="callback"),
|
||||
]
|
||||
@@ -0,0 +1,24 @@
|
||||
from django.conf import settings
|
||||
from django.contrib.auth import authenticate, login, logout
|
||||
from django.http import HttpResponseForbidden, HttpResponseRedirect
|
||||
|
||||
from .handlers import AuthHandler
|
||||
|
||||
|
||||
def azure_auth_login(request):
|
||||
return HttpResponseRedirect(AuthHandler(request).get_auth_uri())
|
||||
|
||||
|
||||
def azure_auth_logout(request):
|
||||
logout(request)
|
||||
return HttpResponseRedirect(AuthHandler.get_logout_uri())
|
||||
|
||||
|
||||
def azure_auth_callback(request):
|
||||
token = AuthHandler(request).get_token_from_flow()
|
||||
user = authenticate(request, token=token)
|
||||
if user:
|
||||
login(request, user)
|
||||
else:
|
||||
return HttpResponseForbidden("Invalid email for this app.")
|
||||
return HttpResponseRedirect(settings.LOGIN_REDIRECT_URL)
|
||||
Binary file not shown.
Reference in New Issue
Block a user