Files
family-hub/backend/app/api/v1/auth.py

187 lines
6.3 KiB
Python

"""Authentication endpoints."""
from datetime import timedelta
from fastapi import APIRouter, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from sqlalchemy.orm import Session
from app.core.database import get_db
from app.core.security import verify_password, create_access_token, decode_access_token, get_password_hash
from app.core.config import settings
from app.models.user import User
from app.schemas.auth import Token
from app.schemas.user import UserCreate, UserResponse, UserUpdate, UserAdminUpdate
# Router that collects every endpoint declared in this module via the
# ``@router.<method>`` decorators below.
router = APIRouter()
# Dependency that extracts the bearer token from incoming requests.
# ``tokenUrl`` names the login endpoint; this presumably assumes the router is
# mounted under ``/api/v1/auth`` -- TODO confirm against the app setup.
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/api/v1/auth/login")
def get_current_user(token: str = Depends(oauth2_scheme), db: Session = Depends(get_db)) -> User:
    """Resolve a bearer token to the matching, active user.

    Args:
        token: Bearer token supplied by the ``oauth2_scheme`` dependency.
        db: Database session supplied by the ``get_db`` dependency.

    Returns:
        The ``User`` row whose ``username`` equals the token's ``sub`` claim.

    Raises:
        HTTPException: 401 when the token cannot be decoded, carries no
            ``sub`` claim, or names a user that does not exist; 400 when the
            user exists but is not active.
    """
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )

    payload = decode_access_token(token)
    if payload is None:
        raise credentials_exception

    # ``sub`` may be absent, so the lookup result is not guaranteed to be a str.
    username = payload.get("sub")
    if username is None:
        raise credentials_exception

    user = db.query(User).filter(User.username == username).first()
    if user is None:
        raise credentials_exception

    # Mirror the check performed at login: a token issued before the account
    # was deactivated must stop working, not remain valid until it expires.
    if not user.is_active:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Inactive user",
        )
    return user
def get_current_admin_user(current_user: User = Depends(get_current_user)) -> User:
    """Return the authenticated user, refusing anyone who is not an admin.

    Args:
        current_user: User resolved by the ``get_current_user`` dependency.

    Returns:
        The same ``current_user`` object when its ``is_admin`` flag is truthy.

    Raises:
        HTTPException: 403 when ``current_user.is_admin`` is falsy.
    """
    if current_user.is_admin:
        return current_user

    raise HTTPException(
        status_code=status.HTTP_403_FORBIDDEN,
        detail="Not enough permissions",
    )
@router.post("/register", response_model=UserResponse, status_code=status.HTTP_201_CREATED)
async def register(user_data: UserCreate, db: Session = Depends(get_db)):
    """Register a new user.

    The account is always created active and without admin rights; the
    password is stored only as a hash.

    Args:
        user_data: Registration payload (username, email, full name,
            Discord id, profile picture and plaintext password).
        db: Database session supplied by the ``get_db`` dependency.

    Returns:
        The freshly persisted ``User`` row.

    Raises:
        HTTPException: 400 when the username or email is already taken, or
            when the database rejects the insert with an integrity error.
    """
    # Imported locally so this handler stays self-contained.
    from sqlalchemy.exc import IntegrityError

    # Friendly, field-specific errors for the common (non-racing) case.
    if db.query(User).filter(User.username == user_data.username).first():
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Username already registered",
        )
    if db.query(User).filter(User.email == user_data.email).first():
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Email already registered",
        )

    db_user = User(
        username=user_data.username,
        email=user_data.email,
        full_name=user_data.full_name,
        discord_id=user_data.discord_id,
        profile_picture=user_data.profile_picture,
        hashed_password=get_password_hash(user_data.password),
        is_active=True,
        is_admin=False,
    )
    db.add(db_user)
    try:
        db.commit()
    except IntegrityError as exc:
        # The checks above are not atomic with the insert: a concurrent
        # request can claim the same value in between. Presumably the model
        # declares unique constraints that catch this -- verify against the
        # User model. Roll back so the session is usable again and report a
        # client error instead of an unhandled 500.
        db.rollback()
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="A user with these details already exists",
        ) from exc
    db.refresh(db_user)
    return db_user
@router.post("/login", response_model=Token)
async def login(form_data: OAuth2PasswordRequestForm = Depends(), db: Session = Depends(get_db)):
    """Exchange a username/password form for a bearer access token.

    Args:
        form_data: OAuth2 password form carrying ``username`` and ``password``.
        db: Database session supplied by the ``get_db`` dependency.

    Returns:
        A dict with the signed ``access_token`` and ``token_type`` ``"bearer"``.

    Raises:
        HTTPException: 401 when the user is unknown or the password does not
            verify; 400 when the credentials are right but the user is
            inactive.
    """
    account = db.query(User).filter(User.username == form_data.username).first()

    # One combined check so unknown users and wrong passwords look the same.
    if not (account and verify_password(form_data.password, account.hashed_password)):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect username or password",
            headers={"WWW-Authenticate": "Bearer"},
        )

    if not account.is_active:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Inactive user",
        )

    lifetime = timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES)
    token = create_access_token(
        data={"sub": account.username},
        expires_delta=lifetime,
    )
    return {"access_token": token, "token_type": "bearer"}
@router.get("/me", response_model=UserResponse)
async def get_current_user_info(current_user: User = Depends(get_current_user)):
    """Return the profile of the user identified by the request's token.

    Args:
        current_user: User resolved by the ``get_current_user`` dependency.

    Returns:
        The ``current_user`` object unchanged; the route's ``response_model``
        is ``UserResponse``.
    """
    return current_user
@router.put("/me", response_model=UserResponse)
async def update_current_user(
    user_update: UserUpdate,
    current_user: User = Depends(get_current_user),
    db: Session = Depends(get_db)
):
    """Update the authenticated user's own profile.

    Only fields explicitly present in the request are applied. A non-empty
    ``password`` is stored as ``hashed_password``; an empty or null
    ``password`` is ignored.

    Args:
        user_update: Partial update payload.
        current_user: User resolved by the ``get_current_user`` dependency.
        db: Database session supplied by the ``get_db`` dependency.

    Returns:
        The refreshed ``User`` row after the commit.

    Raises:
        HTTPException: 400 when the new email already belongs to another user.
    """
    update_data = user_update.model_dump(exclude_unset=True)

    # Always take the plaintext password out of the payload. If it were left
    # in (e.g. sent as "" or null) the loop below would attach a stray
    # ``password`` attribute to the ORM object.
    new_password = update_data.pop("password", None)
    if new_password:
        update_data["hashed_password"] = get_password_hash(new_password)

    # Check email uniqueness if being updated (ignoring the user's own row).
    if "email" in update_data:
        existing_user = db.query(User).filter(
            User.email == update_data["email"],
            User.id != current_user.id
        ).first()
        if existing_user:
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="Email already in use"
            )

    # Apply the remaining fields to the model.
    for field, value in update_data.items():
        setattr(current_user, field, value)

    db.commit()
    db.refresh(current_user)
    return current_user
@router.put("/users/{user_id}", response_model=UserResponse)
async def update_user_admin(
    user_id: int,
    user_update: UserAdminUpdate,
    db: Session = Depends(get_db),
    admin_user: User = Depends(get_current_admin_user)
):
    """Admin endpoint to update any user.

    Only fields explicitly present in the request are applied. A non-empty
    ``password`` is stored as ``hashed_password``; an empty or null
    ``password`` is ignored.

    Args:
        user_id: Primary key of the user to modify.
        user_update: Partial update payload.
        db: Database session supplied by the ``get_db`` dependency.
        admin_user: Unused in the body; declared so that the
            ``get_current_admin_user`` dependency runs and rejects
            non-admin callers.

    Returns:
        The refreshed ``User`` row after the commit.

    Raises:
        HTTPException: 404 when no user has ``user_id``; 400 when the new
            email already belongs to a different user.
    """
    user = db.query(User).filter(User.id == user_id).first()
    if not user:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="User not found"
        )

    update_data = user_update.model_dump(exclude_unset=True)

    # Always take the plaintext password out of the payload. If it were left
    # in (e.g. sent as "" or null) the loop below would attach a stray
    # ``password`` attribute to the ORM object.
    new_password = update_data.pop("password", None)
    if new_password:
        update_data["hashed_password"] = get_password_hash(new_password)

    # Check email uniqueness if being updated (ignoring the target's own row).
    if "email" in update_data:
        existing_user = db.query(User).filter(
            User.email == update_data["email"],
            User.id != user.id
        ).first()
        if existing_user:
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="Email already in use"
            )

    # Apply the remaining fields to the model.
    for field, value in update_data.items():
        setattr(user, field, value)

    db.commit()
    db.refresh(user)
    return user