diff --git a/backend/app/api/v1/auth.py b/backend/app/api/v1/auth.py new file mode 100644 index 0000000..b56fc4c --- /dev/null +++ b/backend/app/api/v1/auth.py @@ -0,0 +1,102 @@ +"""Authentication endpoints.""" +from datetime import timedelta +from fastapi import APIRouter, Depends, HTTPException, status +from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm +from sqlalchemy.orm import Session + +from app.core.database import get_db +from app.core.security import verify_password, create_access_token, decode_access_token +from app.core.config import settings +from app.models.user import User +from app.schemas.auth import Token +from app.schemas.user import UserCreate, UserResponse + +router = APIRouter() +oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/api/v1/auth/login") + +def get_current_user(token: str = Depends(oauth2_scheme), db: Session = Depends(get_db)) -> User: + """Get the current authenticated user.""" + credentials_exception = HTTPException( + status_code=status.HTTP_401_UNAUTHORIZED, + detail="Could not validate credentials", + headers={"WWW-Authenticate": "Bearer"}, + ) + + payload = decode_access_token(token) + if payload is None: + raise credentials_exception + + username: str = payload.get("sub") + if username is None: + raise credentials_exception + + user = db.query(User).filter(User.username == username).first() + if user is None: + raise credentials_exception + + return user + +@router.post("/register", response_model=UserResponse, status_code=status.HTTP_201_CREATED) +async def register(user_data: UserCreate, db: Session = Depends(get_db)): + """Register a new user.""" + # Check if username already exists + if db.query(User).filter(User.username == user_data.username).first(): + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, + detail="Username already registered" + ) + + # Check if email already exists + if db.query(User).filter(User.email == user_data.email).first(): + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, + detail="Email already registered" + ) + + # Create new user + from app.core.security import get_password_hash + db_user = User( + username=user_data.username, + email=user_data.email, + full_name=user_data.full_name, + hashed_password=get_password_hash(user_data.password), + is_active=True, + is_admin=False + ) + + db.add(db_user) + db.commit() + db.refresh(db_user) + + return db_user + +@router.post("/login", response_model=Token) +async def login(form_data: OAuth2PasswordRequestForm = Depends(), db: Session = Depends(get_db)): + """Login and get access token.""" + user = db.query(User).filter(User.username == form_data.username).first() + + if not user or not verify_password(form_data.password, user.hashed_password): + raise HTTPException( + status_code=status.HTTP_401_UNAUTHORIZED, + detail="Incorrect username or password", + headers={"WWW-Authenticate": "Bearer"}, + ) + + if not user.is_active: + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, + detail="Inactive user" + ) + + access_token_expires = timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES) + access_token = create_access_token( + data={"sub": user.username}, + expires_delta=access_token_expires + ) + + return {"access_token": access_token, "token_type": "bearer"} + +@router.get("/me", response_model=UserResponse) +async def get_current_user_info(current_user: User = Depends(get_current_user)): + """Get current user information.""" + return current_user