Phase 3.1: Enhanced Chore Logging and Reporting System

This commit is contained in:
2026-02-05 12:33:51 +11:00
commit e3cae7bfbb
178 changed files with 30105 additions and 0 deletions

30
backend/.env.example Normal file
View File

@@ -0,0 +1,30 @@
# Application Settings
APP_NAME=Family Hub
APP_VERSION=0.1.0
DEBUG=True
# Database
DATABASE_URL=sqlite:///./family_hub.db
# For PostgreSQL (production):
# DATABASE_URL=postgresql://user:password@localhost:5432/family_hub
# Security
SECRET_KEY=your-super-secret-key-change-this-in-production
ALGORITHM=HS256
ACCESS_TOKEN_EXPIRE_MINUTES=30
# CORS
ALLOWED_ORIGINS=http://localhost:5173,http://localhost:3000
# Google Calendar API (Optional - configure when needed)
GOOGLE_CLIENT_ID=
GOOGLE_CLIENT_SECRET=
GOOGLE_REDIRECT_URI=
# Mealie Integration (Optional - configure when needed)
MEALIE_API_URL=
MEALIE_API_TOKEN=
# Home Assistant Integration (Optional - configure when needed)
HOME_ASSISTANT_URL=
HOME_ASSISTANT_TOKEN=

23
backend/Dockerfile Normal file
View File

@@ -0,0 +1,23 @@
FROM python:3.11-slim
WORKDIR /app
# Install system dependencies
RUN apt-get update && apt-get install -y \
gcc \
&& rm -rf /var/lib/apt/lists/*
# Copy requirements first for better caching
COPY requirements.txt .
# Install Python dependencies
RUN pip install --no-cache-dir -r requirements.txt
# Copy application code
COPY . .
# Expose port
EXPOSE 8000
# Run the application
CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000", "--reload"]

View File

@@ -0,0 +1,69 @@
# Database Fix - READY TO EXECUTE
## Problem Found
There were **TWO** `init_db.py` files:
1. `D:\Hosted\familyhub\backend\init_db.py`
2. `D:\Hosted\familyhub\backend\migrations\init_db.py`
Both were using UPPERCASE enum values (DAILY, PENDING, etc.) but the code expects lowercase values (daily, pending, etc.).
## What Was Fixed
### 1. ✅ app/models/chore.py
Added `values_callable=lambda x: [e.value for e in x]` to all SQLEnum columns to properly match enum values.
### 2. ✅ app/schemas/chore.py
Fixed `due_date` validators to convert empty strings to `None`.
### 3. ✅ backend/init_db.py
Changed ALL enum values from uppercase to lowercase.
### 4. ✅ backend/migrations/init_db.py
Changed ALL enum values from uppercase to lowercase.
## How To Fix
### Option 1: Use the Reset Script (Recommended)
1. Stop the backend server (Ctrl+C)
2. Run the reset script:
```
cd D:\Hosted\familyhub\backend
python reset_database.py
```
3. Restart the backend:
```
.\start_backend.bat
```
### Option 2: Manual Steps
1. Stop the backend server (Ctrl+C)
2. Delete the database:
```
del D:\Hosted\familyhub\backend\data\family_hub.db
```
3. Run initialization:
```
cd D:\Hosted\familyhub\backend
python init_db.py
```
4. Restart the backend:
```
.\start_backend.bat
```
## What You'll Get
After reinitialization:
- 5 demo users (jess, lou, william, xander, bella)
- Password for all: `password123`
- 12 demo chores with various frequencies and statuses
- All enum values properly set to lowercase
The chores will now load without errors! 🎉

1
backend/app/__init__.py Normal file
View File

@@ -0,0 +1 @@
# App package

View File

@@ -0,0 +1 @@
# API package

View File

@@ -0,0 +1 @@
# API v1 package

196
backend/app/api/v1/auth.py Normal file
View File

@@ -0,0 +1,196 @@
"""Authentication endpoints."""
from datetime import timedelta
from fastapi import APIRouter, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from sqlalchemy.orm import Session
from typing import List
from app.core.database import get_db
from app.core.security import verify_password, create_access_token, decode_access_token, get_password_hash
from app.core.config import settings
from app.models.user import User
from app.schemas.auth import Token
from app.schemas.user import UserCreate, UserResponse, UserUpdate, UserAdminUpdate
router = APIRouter()
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/api/v1/auth/login")
def get_current_user(token: str = Depends(oauth2_scheme), db: Session = Depends(get_db)) -> User:
"""Get the current authenticated user."""
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
payload = decode_access_token(token)
if payload is None:
raise credentials_exception
username: str = payload.get("sub")
if username is None:
raise credentials_exception
user = db.query(User).filter(User.username == username).first()
if user is None:
raise credentials_exception
return user
def get_current_admin_user(current_user: User = Depends(get_current_user)) -> User:
"""Get current user and verify they are an admin."""
if not current_user.is_admin:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Not enough permissions"
)
return current_user
@router.post("/register", response_model=UserResponse, status_code=status.HTTP_201_CREATED)
async def register(user_data: UserCreate, db: Session = Depends(get_db)):
"""Register a new user."""
# Check if username already exists
if db.query(User).filter(User.username == user_data.username).first():
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Username already registered"
)
# Check if email already exists
if db.query(User).filter(User.email == user_data.email).first():
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Email already registered"
)
# Create new user
db_user = User(
username=user_data.username,
email=user_data.email,
full_name=user_data.full_name,
discord_id=user_data.discord_id,
profile_picture=user_data.profile_picture,
hashed_password=get_password_hash(user_data.password),
is_active=True,
is_admin=False
)
db.add(db_user)
db.commit()
db.refresh(db_user)
return db_user
@router.post("/login", response_model=Token)
async def login(form_data: OAuth2PasswordRequestForm = Depends(), db: Session = Depends(get_db)):
"""Login and get access token."""
user = db.query(User).filter(User.username == form_data.username).first()
if not user or not verify_password(form_data.password, user.hashed_password):
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect username or password",
headers={"WWW-Authenticate": "Bearer"},
)
if not user.is_active:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Inactive user"
)
access_token_expires = timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES)
access_token = create_access_token(
data={"sub": user.username},
expires_delta=access_token_expires
)
return {"access_token": access_token, "token_type": "bearer"}
@router.get("/me", response_model=UserResponse)
async def get_current_user_info(current_user: User = Depends(get_current_user)):
"""Get current user information."""
return current_user
@router.put("/me", response_model=UserResponse)
async def update_current_user(
user_update: UserUpdate,
current_user: User = Depends(get_current_user),
db: Session = Depends(get_db)
):
"""Update current user's own profile."""
update_data = user_update.model_dump(exclude_unset=True)
# Hash password if provided
if "password" in update_data and update_data["password"]:
update_data["hashed_password"] = get_password_hash(update_data.pop("password"))
# Check email uniqueness if being updated
if "email" in update_data:
existing_user = db.query(User).filter(
User.email == update_data["email"],
User.id != current_user.id
).first()
if existing_user:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Email already in use"
)
# Update user fields
for field, value in update_data.items():
setattr(current_user, field, value)
db.commit()
db.refresh(current_user)
return current_user
@router.get("/users", response_model=List[UserResponse])
async def list_users(
db: Session = Depends(get_db),
admin_user: User = Depends(get_current_admin_user)
):
"""Admin endpoint to list all users."""
users = db.query(User).all()
return users
@router.put("/users/{user_id}", response_model=UserResponse)
async def update_user_admin(
user_id: int,
user_update: UserAdminUpdate,
db: Session = Depends(get_db),
admin_user: User = Depends(get_current_admin_user)
):
"""Admin endpoint to update any user."""
user = db.query(User).filter(User.id == user_id).first()
if not user:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="User not found"
)
update_data = user_update.model_dump(exclude_unset=True)
# Hash password if provided
if "password" in update_data and update_data["password"]:
update_data["hashed_password"] = get_password_hash(update_data.pop("password"))
# Check email uniqueness if being updated
if "email" in update_data:
existing_user = db.query(User).filter(
User.email == update_data["email"],
User.id != user.id
).first()
if existing_user:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Email already in use"
)
# Update user fields
for field, value in update_data.items():
setattr(user, field, value)
db.commit()
db.refresh(user)
return user

View File

@@ -0,0 +1,397 @@
"""Chore Completion Log API endpoints."""
from fastapi import APIRouter, Depends, HTTPException, status, Query
from sqlalchemy.orm import Session
from sqlalchemy import func, and_
from typing import List, Optional
from datetime import datetime, timedelta
from app.core.database import get_db
from app.api.v1.auth import get_current_user
from app.models.user import User
from app.models.chore import Chore
from app.models.chore_assignment import ChoreAssignment
from app.models.chore_completion_log import ChoreCompletionLog
from app.schemas import chore_completion_log as log_schemas
router = APIRouter()
def enrich_completion_log(db: Session, log: ChoreCompletionLog) -> dict:
"""Add related information to completion log."""
# Get chore info
chore = db.query(Chore).filter(Chore.id == log.chore_id).first()
# Get user info
user = db.query(User).filter(User.id == log.user_id).first()
# Get verified_by info if exists
verified_by_name = None
if log.verified_by_user_id:
verified_by = db.query(User).filter(User.id == log.verified_by_user_id).first()
if verified_by:
verified_by_name = verified_by.full_name or verified_by.username
return {
"id": log.id,
"chore_id": log.chore_id,
"user_id": log.user_id,
"completed_at": log.completed_at,
"notes": log.notes,
"verified_by_user_id": log.verified_by_user_id,
"created_at": log.created_at,
"chore_title": chore.title if chore else None,
"user_name": user.full_name or user.username if user else None,
"user_avatar": user.avatar_url if user else None,
"verified_by_name": verified_by_name
}
@router.post("/{chore_id}/complete", response_model=log_schemas.ChoreCompletionLog, status_code=status.HTTP_201_CREATED)
def complete_chore(
chore_id: int,
notes: Optional[str] = None,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user)
):
"""
Log a chore completion.
Creates a completion log entry and updates the chore assignment.
This is the primary endpoint for completing chores.
"""
# Check if chore exists
chore = db.query(Chore).filter(Chore.id == chore_id).first()
if not chore:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Chore not found"
)
# Check if user is assigned to this chore
assignment = db.query(ChoreAssignment).filter(
ChoreAssignment.chore_id == chore_id,
ChoreAssignment.user_id == current_user.id
).first()
if not assignment:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="You are not assigned to this chore"
)
# Create completion log
completion_log = ChoreCompletionLog(
chore_id=chore_id,
user_id=current_user.id,
completed_at=datetime.utcnow(),
notes=notes
)
db.add(completion_log)
# Update assignment completed_at
assignment.completed_at = datetime.utcnow()
# Check if all assignments are completed
all_assignments = db.query(ChoreAssignment).filter(
ChoreAssignment.chore_id == chore_id
).all()
if all(a.completed_at is not None for a in all_assignments):
chore.completed_at = datetime.utcnow()
chore.status = "completed"
db.commit()
db.refresh(completion_log)
return enrich_completion_log(db, completion_log)
@router.get("/completions", response_model=List[log_schemas.ChoreCompletionLog])
def get_completion_logs(
skip: int = 0,
limit: int = 100,
chore_id: Optional[int] = Query(None, description="Filter by chore ID"),
user_id: Optional[int] = Query(None, description="Filter by user ID"),
start_date: Optional[datetime] = Query(None, description="Filter completions after this date"),
end_date: Optional[datetime] = Query(None, description="Filter completions before this date"),
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user)
):
"""
Get chore completion logs with optional filters.
- **chore_id**: Filter by specific chore
- **user_id**: Filter by specific user
- **start_date**: Filter completions after this date
- **end_date**: Filter completions before this date
"""
query = db.query(ChoreCompletionLog)
# Apply filters
if chore_id:
query = query.filter(ChoreCompletionLog.chore_id == chore_id)
if user_id:
query = query.filter(ChoreCompletionLog.user_id == user_id)
if start_date:
query = query.filter(ChoreCompletionLog.completed_at >= start_date)
if end_date:
query = query.filter(ChoreCompletionLog.completed_at <= end_date)
# Order by most recent first
query = query.order_by(ChoreCompletionLog.completed_at.desc())
logs = query.offset(skip).limit(limit).all()
# Enrich with related data
return [enrich_completion_log(db, log) for log in logs]
@router.get("/reports/weekly", response_model=log_schemas.WeeklyChoreReport)
def get_weekly_report(
user_id: Optional[int] = Query(None, description="Get report for specific user (omit for family-wide)"),
weeks_ago: int = Query(0, description="Number of weeks ago (0 = current week)"),
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user)
):
"""
Generate a weekly chore completion report.
- **user_id**: Optional - get report for specific user
- **weeks_ago**: Which week to report on (0 = current week, 1 = last week, etc.)
Returns comprehensive statistics including:
- Total completions
- Completions by user
- Completions by chore
- Completions by day
- Top performers
- Recent completions
"""
# Calculate week boundaries
today = datetime.now().replace(hour=0, minute=0, second=0, microsecond=0)
start_of_week = today - timedelta(days=today.weekday()) - timedelta(weeks=weeks_ago)
end_of_week = start_of_week + timedelta(days=7)
# Base query
query = db.query(ChoreCompletionLog).filter(
and_(
ChoreCompletionLog.completed_at >= start_of_week,
ChoreCompletionLog.completed_at < end_of_week
)
)
# Apply user filter if specified
if user_id:
query = query.filter(ChoreCompletionLog.user_id == user_id)
logs = query.all()
# Calculate statistics
total_completions = len(logs)
# Completions by user
completions_by_user = {}
for log in logs:
user = db.query(User).filter(User.id == log.user_id).first()
if user:
username = user.full_name or user.username
completions_by_user[username] = completions_by_user.get(username, 0) + 1
# Completions by chore
completions_by_chore = {}
for log in logs:
chore = db.query(Chore).filter(Chore.id == log.chore_id).first()
if chore:
completions_by_chore[chore.title] = completions_by_chore.get(chore.title, 0) + 1
# Completions by day
completions_by_day = {}
for log in logs:
day_name = log.completed_at.strftime("%A")
completions_by_day[day_name] = completions_by_day.get(day_name, 0) + 1
# Top performers
user_stats = []
for user_name, count in completions_by_user.items():
user = db.query(User).filter(
(User.full_name == user_name) | (User.username == user_name)
).first()
user_stats.append({
"username": user_name,
"count": count,
"avatar_url": user.avatar_url if user else None
})
user_stats.sort(key=lambda x: x["count"], reverse=True)
top_performers = user_stats[:5] # Top 5 performers
# Recent completions (last 10)
recent_logs = sorted(logs, key=lambda x: x.completed_at, reverse=True)[:10]
recent_completions = [enrich_completion_log(db, log) for log in recent_logs]
return {
"start_date": start_of_week,
"end_date": end_of_week,
"total_completions": total_completions,
"completions_by_user": completions_by_user,
"completions_by_chore": completions_by_chore,
"completions_by_day": completions_by_day,
"top_performers": top_performers,
"recent_completions": recent_completions
}
@router.get("/reports/user/{user_id}", response_model=log_schemas.UserChoreStats)
def get_user_stats(
user_id: int,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user)
):
"""
Get comprehensive statistics for a specific user.
Returns:
- Total completions (all time)
- Completions this week
- Completions this month
- Favorite chore (most completed)
- Recent completions
"""
# Check if user exists
user = db.query(User).filter(User.id == user_id).first()
if not user:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="User not found"
)
# Total completions
total_completions = db.query(ChoreCompletionLog).filter(
ChoreCompletionLog.user_id == user_id
).count()
# This week
today = datetime.now().replace(hour=0, minute=0, second=0, microsecond=0)
start_of_week = today - timedelta(days=today.weekday())
completions_this_week = db.query(ChoreCompletionLog).filter(
and_(
ChoreCompletionLog.user_id == user_id,
ChoreCompletionLog.completed_at >= start_of_week
)
).count()
# This month
start_of_month = today.replace(day=1)
completions_this_month = db.query(ChoreCompletionLog).filter(
and_(
ChoreCompletionLog.user_id == user_id,
ChoreCompletionLog.completed_at >= start_of_month
)
).count()
# Favorite chore (most completed)
favorite_chore = None
chore_counts = db.query(
ChoreCompletionLog.chore_id,
func.count(ChoreCompletionLog.id).label('count')
).filter(
ChoreCompletionLog.user_id == user_id
).group_by(
ChoreCompletionLog.chore_id
).order_by(
func.count(ChoreCompletionLog.id).desc()
).first()
if chore_counts:
chore = db.query(Chore).filter(Chore.id == chore_counts[0]).first()
if chore:
favorite_chore = chore.title
# Recent completions
recent_logs = db.query(ChoreCompletionLog).filter(
ChoreCompletionLog.user_id == user_id
).order_by(
ChoreCompletionLog.completed_at.desc()
).limit(10).all()
recent_completions = [enrich_completion_log(db, log) for log in recent_logs]
return {
"user_id": user.id,
"username": user.username,
"full_name": user.full_name,
"avatar_url": user.avatar_url,
"total_completions": total_completions,
"completions_this_week": completions_this_week,
"completions_this_month": completions_this_month,
"favorite_chore": favorite_chore,
"recent_completions": recent_completions
}
@router.delete("/completions/{log_id}", status_code=status.HTTP_204_NO_CONTENT)
def delete_completion_log(
log_id: int,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user)
):
"""
Delete a completion log entry (admin only or log owner).
Useful for correcting mistakes or removing accidental completions.
"""
log = db.query(ChoreCompletionLog).filter(ChoreCompletionLog.id == log_id).first()
if not log:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Completion log not found"
)
# Only admin or the user who completed can delete
if not current_user.is_admin and log.user_id != current_user.id:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Not authorized to delete this completion log"
)
db.delete(log)
db.commit()
return None
@router.post("/completions/{log_id}/verify", response_model=log_schemas.ChoreCompletionLog)
def verify_completion(
log_id: int,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user)
):
"""
Verify a chore completion (requires different user than completer).
Useful for parents verifying kids' chores, or quality checks.
"""
log = db.query(ChoreCompletionLog).filter(ChoreCompletionLog.id == log_id).first()
if not log:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Completion log not found"
)
# Can't verify your own completion
if log.user_id == current_user.id:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="You cannot verify your own completion"
)
log.verified_by_user_id = current_user.id
db.commit()
db.refresh(log)
return enrich_completion_log(db, log)

View File

@@ -0,0 +1,336 @@
"""Chores API endpoints."""
from fastapi import APIRouter, Depends, HTTPException, status, Query
from sqlalchemy.orm import Session
from typing import List, Optional
from datetime import datetime, date
from app.core.database import get_db
from app.api.v1.auth import get_current_user
from app.models.user import User
from app.models.chore import Chore, ChoreStatus
from app.models.chore_assignment import ChoreAssignment
from app.schemas import chore as chore_schemas
router = APIRouter()
def is_birthday_today(user: User) -> bool:
"""Check if today is the user's birthday."""
if not user.birthday:
return False
today = date.today()
return user.birthday.month == today.month and user.birthday.day == today.day
def get_chore_with_assignments(db: Session, chore: Chore) -> dict:
"""Convert chore to dict with assignment details."""
# Get all assignments for this chore
assignments = db.query(ChoreAssignment).filter(
ChoreAssignment.chore_id == chore.id
).all()
# Build assigned users list with completion status
assigned_users = []
for assignment in assignments:
user = db.query(User).filter(User.id == assignment.user_id).first()
if user:
assigned_users.append({
"id": user.id,
"username": user.username,
"full_name": user.full_name,
"avatar_url": user.avatar_url,
"birthday": user.birthday,
"completed_at": assignment.completed_at
})
# Convert chore to dict
chore_dict = {
"id": chore.id,
"title": chore.title,
"description": chore.description,
"room": chore.room,
"frequency": chore.frequency,
"points": chore.points,
"image_url": chore.image_url,
"assignment_type": chore.assignment_type,
"status": chore.status,
"due_date": chore.due_date,
"completed_at": chore.completed_at,
"created_at": chore.created_at,
"updated_at": chore.updated_at,
"assigned_users": assigned_users,
"assigned_user_id": chore.assigned_user_id # Legacy compatibility
}
return chore_dict
@router.get("", response_model=List[chore_schemas.Chore])
def get_chores(
skip: int = 0,
limit: int = 100,
user_id: Optional[int] = Query(None, description="Filter by assigned user ID"),
exclude_birthdays: bool = Query(False, description="Exclude chores for users with birthdays today"),
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user)
):
"""
Get all chores.
- **user_id**: Filter chores assigned to specific user
- **exclude_birthdays**: Skip chores for users with birthdays today
"""
query = db.query(Chore)
# Apply user filter if specified
if user_id:
# Find chores assigned to this user through the assignments table
assignment_ids = db.query(ChoreAssignment.chore_id).filter(
ChoreAssignment.user_id == user_id
).all()
chore_ids = [aid[0] for aid in assignment_ids]
query = query.filter(Chore.id.in_(chore_ids))
chores = query.offset(skip).limit(limit).all()
# Build response with assignments
result = []
for chore in chores:
chore_data = get_chore_with_assignments(db, chore)
# Filter out if birthday exclusion is enabled
if exclude_birthdays:
# Skip if any assigned user has birthday today
skip_chore = False
for assigned_user in chore_data["assigned_users"]:
user = db.query(User).filter(User.id == assigned_user["id"]).first()
if user and is_birthday_today(user):
skip_chore = True
break
if skip_chore:
continue
result.append(chore_data)
return result
@router.get("/{chore_id}", response_model=chore_schemas.Chore)
def get_chore(
chore_id: int,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user)
):
"""Get a specific chore by ID."""
chore = db.query(Chore).filter(Chore.id == chore_id).first()
if not chore:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Chore not found"
)
return get_chore_with_assignments(db, chore)
@router.post("", response_model=chore_schemas.Chore, status_code=status.HTTP_201_CREATED)
def create_chore(
chore_in: chore_schemas.ChoreCreate,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user)
):
"""Create a new chore with multiple user assignments."""
# Extract user IDs before creating chore
assigned_user_ids = chore_in.assigned_user_ids or []
# Create chore without assigned_user_ids (not a DB column)
chore_data = chore_in.model_dump(exclude={'assigned_user_ids'})
chore = Chore(**chore_data)
db.add(chore)
db.commit()
db.refresh(chore)
# Create assignments for each user
for user_id in assigned_user_ids:
# Verify user exists
user = db.query(User).filter(User.id == user_id).first()
if not user:
continue
assignment = ChoreAssignment(
chore_id=chore.id,
user_id=user_id
)
db.add(assignment)
db.commit()
return get_chore_with_assignments(db, chore)
@router.put("/{chore_id}", response_model=chore_schemas.Chore)
def update_chore(
chore_id: int,
chore_in: chore_schemas.ChoreUpdate,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user)
):
"""
Update a chore (admin only or assigned user for status updates).
Admin users can update all fields.
Non-admin users can only update status for chores assigned to them.
"""
chore = db.query(Chore).filter(Chore.id == chore_id).first()
if not chore:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Chore not found"
)
update_data = chore_in.model_dump(exclude_unset=True, exclude={'assigned_user_ids'})
# Check permissions
is_admin = current_user.is_admin
is_assigned = db.query(ChoreAssignment).filter(
ChoreAssignment.chore_id == chore_id,
ChoreAssignment.user_id == current_user.id
).first() is not None
if not is_admin and not is_assigned:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Not authorized to update this chore"
)
# Non-admins can only update status
if not is_admin:
allowed_fields = {'status'}
update_data = {k: v for k, v in update_data.items() if k in allowed_fields}
# Handle status change to completed
if update_data.get("status") == ChoreStatus.COMPLETED:
# Mark assignment as completed for current user
assignment = db.query(ChoreAssignment).filter(
ChoreAssignment.chore_id == chore_id,
ChoreAssignment.user_id == current_user.id
).first()
if assignment:
assignment.completed_at = datetime.utcnow()
# If all assignments are completed, mark chore as completed
all_assignments = db.query(ChoreAssignment).filter(
ChoreAssignment.chore_id == chore_id
).all()
if all(a.completed_at is not None for a in all_assignments):
chore.completed_at = datetime.utcnow()
chore.status = ChoreStatus.COMPLETED
# Update chore fields (admins only)
if is_admin:
for field, value in update_data.items():
# Convert empty string to None for datetime fields
if field == 'due_date' and value == '':
value = None
setattr(chore, field, value)
# Handle user assignments update
if 'assigned_user_ids' in chore_in.model_dump(exclude_unset=True):
assigned_user_ids = chore_in.assigned_user_ids or []
# Remove old assignments
db.query(ChoreAssignment).filter(
ChoreAssignment.chore_id == chore_id
).delete()
# Add new assignments
for user_id in assigned_user_ids:
user = db.query(User).filter(User.id == user_id).first()
if not user:
continue
assignment = ChoreAssignment(
chore_id=chore.id,
user_id=user_id
)
db.add(assignment)
db.commit()
db.refresh(chore)
return get_chore_with_assignments(db, chore)
@router.delete("/{chore_id}", status_code=status.HTTP_204_NO_CONTENT)
def delete_chore(
chore_id: int,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user)
):
"""Delete a chore (admin only)."""
if not current_user.is_admin:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Only admins can delete chores"
)
chore = db.query(Chore).filter(Chore.id == chore_id).first()
if not chore:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Chore not found"
)
db.delete(chore)
db.commit()
return None
@router.post("/{chore_id}/assign", response_model=chore_schemas.Chore)
def assign_users_to_chore(
chore_id: int,
user_ids: List[int],
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user)
):
"""
Assign multiple users to a chore (admin only).
Replaces existing assignments.
"""
if not current_user.is_admin:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Only admins can assign chores"
)
chore = db.query(Chore).filter(Chore.id == chore_id).first()
if not chore:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Chore not found"
)
# Remove existing assignments
db.query(ChoreAssignment).filter(ChoreAssignment.chore_id == chore_id).delete()
# Add new assignments
for user_id in user_ids:
user = db.query(User).filter(User.id == user_id).first()
if not user:
continue
assignment = ChoreAssignment(
chore_id=chore.id,
user_id=user_id
)
db.add(assignment)
db.commit()
db.refresh(chore)
return get_chore_with_assignments(db, chore)

View File

@@ -0,0 +1,297 @@
"""Public API endpoints for kiosk view (no authentication required)."""
from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy.orm import Session
from typing import List, Optional
from datetime import date
from app.core.database import get_db
from app.models.user import User
from app.models.chore import Chore
from app.models.chore_assignment import ChoreAssignment
from app.models.chore_completion_log import ChoreCompletionLog
router = APIRouter()
@router.get("/users")
async def get_public_users(db: Session = Depends(get_db)):
"""Get all active users (public endpoint for kiosk)."""
users = db.query(User).filter(User.is_active == True).all()
return [
{
"id": user.id,
"username": user.username,
"full_name": user.full_name,
"avatar_url": user.avatar_url,
"birthday": user.birthday,
"is_active": user.is_active,
}
for user in users
]
@router.get("/chores")
async def get_public_chores(
user_id: Optional[int] = None,
exclude_birthdays: bool = False,
db: Session = Depends(get_db)
):
"""
Get chores with optional filtering (public endpoint for kiosk).
- user_id: Filter by assigned user
- exclude_birthdays: Hide chores for users whose birthday is today
"""
query = db.query(Chore)
# Filter by user if specified
if user_id:
query = query.join(ChoreAssignment).filter(
ChoreAssignment.user_id == user_id
)
chores = query.all()
# Filter out birthday chores if requested
if exclude_birthdays:
today = date.today()
filtered_chores = []
for chore in chores:
# Get all assignments for this chore
assignments = db.query(ChoreAssignment).filter(
ChoreAssignment.chore_id == chore.id
).all()
# Check if any assigned user has birthday today
has_birthday = False
for assignment in assignments:
user = db.query(User).filter(User.id == assignment.user_id).first()
if user and user.birthday:
if user.birthday.month == today.month and user.birthday.day == today.day:
has_birthday = True
break
if not has_birthday:
filtered_chores.append(chore)
chores = filtered_chores
# Build response with assigned users info
result = []
for chore in chores:
# Get assignments for this chore
assignments = db.query(ChoreAssignment).filter(
ChoreAssignment.chore_id == chore.id
).all()
assigned_users = []
for assignment in assignments:
user = db.query(User).filter(User.id == assignment.user_id).first()
if user:
assigned_users.append({
"id": user.id,
"username": user.username,
"full_name": user.full_name,
"avatar_url": user.avatar_url,
"birthday": user.birthday,
"completed_at": assignment.completed_at
})
chore_dict = {
"id": chore.id,
"title": chore.title,
"description": chore.description,
"room": chore.room,
"frequency": chore.frequency,
"points": chore.points,
"image_url": chore.image_url,
"assignment_type": chore.assignment_type,
"status": chore.status,
"due_date": chore.due_date,
"created_at": chore.created_at,
"updated_at": chore.updated_at,
"completed_at": chore.completed_at,
"assigned_users": assigned_users,
"assigned_user_id": None # Legacy field
}
result.append(chore_dict)
return result
@router.post("/chores/{chore_id}/complete")
async def complete_public_chore(
chore_id: int,
user_id: int,
helper_ids: Optional[List[int]] = None,
db: Session = Depends(get_db)
):
"""
Mark a chore as complete for a specific user (public endpoint for kiosk).
Query params:
- user_id: The user completing the chore
- helper_ids: Optional list of other user IDs who helped
"""
# Get chore
chore = db.query(Chore).filter(Chore.id == chore_id).first()
if not chore:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Chore not found"
)
# Get user
user = db.query(User).filter(User.id == user_id).first()
if not user:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="User not found"
)
# Get assignment
assignment = db.query(ChoreAssignment).filter(
ChoreAssignment.chore_id == chore_id,
ChoreAssignment.user_id == user_id
).first()
if not assignment:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="User not assigned to this chore"
)
# Mark as complete
from datetime import datetime
completion_time = datetime.now()
assignment.completed_at = completion_time
# CREATE COMPLETION LOG ENTRY
completion_log = ChoreCompletionLog(
chore_id=chore_id,
user_id=user_id,
completed_at=completion_time,
notes=None # Kiosk doesn't support notes yet
)
db.add(completion_log)
# If helpers provided, mark them as completed too
if helper_ids:
for helper_id in helper_ids:
helper_assignment = db.query(ChoreAssignment).filter(
ChoreAssignment.chore_id == chore_id,
ChoreAssignment.user_id == helper_id
).first()
# Create assignment if doesn't exist (helper claimed they helped)
if not helper_assignment:
helper_assignment = ChoreAssignment(
chore_id=chore_id,
user_id=helper_id,
completed_at=datetime.now()
)
db.add(helper_assignment)
else:
helper_assignment.completed_at = datetime.now()
# CREATE COMPLETION LOG FOR HELPER
helper_log = ChoreCompletionLog(
chore_id=chore_id,
user_id=helper_id,
completed_at=datetime.now(),
notes=f"Helped {user.full_name or user.username}"
)
db.add(helper_log)
# Check if chore is complete based on assignment_type
all_assignments = db.query(ChoreAssignment).filter(
ChoreAssignment.chore_id == chore_id
).all()
if chore.assignment_type == "any_one":
# Only one person needs to complete
if assignment.completed_at:
chore.status = 'completed'
chore.completed_at = datetime.now()
else: # all_assigned
# All assigned must complete
all_complete = all([a.completed_at is not None for a in all_assignments])
if all_complete:
chore.status = 'completed'
chore.completed_at = datetime.now()
else:
chore.status = 'in_progress'
db.commit()
return {
"message": "Chore marked as complete",
"chore_id": chore_id,
"user_id": user_id,
"helper_ids": helper_ids or [],
"chore_status": chore.status
}
@router.post("/chores/{chore_id}/claim")
async def claim_public_chore(
chore_id: int,
user_id: int,
db: Session = Depends(get_db)
):
"""
Claim an available chore (add user as assigned).
Query params:
- user_id: The user claiming the chore
"""
# Get chore
chore = db.query(Chore).filter(Chore.id == chore_id).first()
if not chore:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Chore not found"
)
# Get user
user = db.query(User).filter(User.id == user_id).first()
if not user:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="User not found"
)
# Check if already assigned
existing = db.query(ChoreAssignment).filter(
ChoreAssignment.chore_id == chore_id,
ChoreAssignment.user_id == user_id
).first()
if existing:
return {
"message": "Already assigned to this chore",
"chore_id": chore_id,
"user_id": user_id
}
# Create assignment
assignment = ChoreAssignment(
chore_id=chore_id,
user_id=user_id
)
db.add(assignment)
# Update chore status if needed
if chore.status == 'pending':
chore.status = 'in_progress'
db.commit()
return {
"message": "Chore claimed successfully",
"chore_id": chore_id,
"user_id": user_id
}

View File

@@ -0,0 +1,322 @@
"""Upload API endpoints for images."""
from fastapi import APIRouter, Depends, HTTPException, status, UploadFile, File
from sqlalchemy.orm import Session
from pathlib import Path
import uuid
import shutil
from app.core.database import get_db
from app.api.v1.auth import get_current_user
from app.models.user import User
from app.models.chore import Chore
router = APIRouter()
# Configure upload directories
UPLOAD_DIR = Path(__file__).parent.parent.parent / "static" / "uploads"
USER_UPLOAD_DIR = UPLOAD_DIR / "users"
CHORE_UPLOAD_DIR = UPLOAD_DIR / "chores"
# Ensure directories exist
USER_UPLOAD_DIR.mkdir(parents=True, exist_ok=True)
CHORE_UPLOAD_DIR.mkdir(parents=True, exist_ok=True)
# Allowed image extensions
ALLOWED_EXTENSIONS = {'.jpg', '.jpeg', '.png', '.gif', '.webp'}
def validate_image(filename: str) -> bool:
"""Check if file extension is allowed"""
ext = Path(filename).suffix.lower()
return ext in ALLOWED_EXTENSIONS
def save_upload_file(upload_file: UploadFile, destination: Path) -> str:
"""Save uploaded file and return filename"""
try:
with destination.open("wb") as buffer:
shutil.copyfileobj(upload_file.file, buffer)
return destination.name
finally:
upload_file.file.close()
@router.post("/users/avatar", status_code=status.HTTP_200_OK)
async def upload_user_avatar(
file: UploadFile = File(...),
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user)
):
"""
Upload avatar for current user.
- Accepts: JPG, JPEG, PNG, GIF, WEBP
- Max size: 5MB (handled by FastAPI)
- Overwrites existing avatar
"""
if not validate_image(file.filename):
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=f"Invalid file type. Allowed: {', '.join(ALLOWED_EXTENSIONS)}"
)
# Generate unique filename
file_ext = Path(file.filename).suffix.lower()
filename = f"user_{current_user.id}_{uuid.uuid4().hex[:8]}{file_ext}"
file_path = USER_UPLOAD_DIR / filename
# Delete old avatar if exists
if current_user.avatar_url:
old_file = USER_UPLOAD_DIR / Path(current_user.avatar_url).name
if old_file.exists():
old_file.unlink()
# Save new avatar
save_upload_file(file, file_path)
# Update user record
avatar_url = f"/static/uploads/users/{filename}"
current_user.avatar_url = avatar_url
db.commit()
db.refresh(current_user)
return {
"message": "Avatar uploaded successfully",
"avatar_url": avatar_url
}
@router.delete("/users/avatar", status_code=status.HTTP_204_NO_CONTENT)
async def delete_user_avatar(
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user)
):
"""Delete current user's avatar"""
if not current_user.avatar_url:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="No avatar to delete"
)
# Delete file
old_file = USER_UPLOAD_DIR / Path(current_user.avatar_url).name
if old_file.exists():
old_file.unlink()
# Update database
current_user.avatar_url = None
db.commit()
return None
@router.post("/chores/{chore_id}/image", status_code=status.HTTP_200_OK)
async def upload_chore_image(
chore_id: int,
file: UploadFile = File(...),
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user)
):
"""
Upload image for a chore (admin or assigned user only).
- Accepts: JPG, JPEG, PNG, GIF, WEBP
- Max size: 5MB (handled by FastAPI)
- Overwrites existing image
"""
# Get chore
chore = db.query(Chore).filter(Chore.id == chore_id).first()
if not chore:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Chore not found"
)
# Check permissions (admin or assigned user)
from app.models.chore_assignment import ChoreAssignment
is_assigned = db.query(ChoreAssignment).filter(
ChoreAssignment.chore_id == chore_id,
ChoreAssignment.user_id == current_user.id
).first() is not None
if not current_user.is_admin and not is_assigned:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Not authorized to upload image for this chore"
)
if not validate_image(file.filename):
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=f"Invalid file type. Allowed: {', '.join(ALLOWED_EXTENSIONS)}"
)
# Generate unique filename
file_ext = Path(file.filename).suffix.lower()
filename = f"chore_{chore_id}_{uuid.uuid4().hex[:8]}{file_ext}"
file_path = CHORE_UPLOAD_DIR / filename
# Delete old image if exists
if chore.image_url:
old_file = CHORE_UPLOAD_DIR / Path(chore.image_url).name
if old_file.exists():
old_file.unlink()
# Save new image
save_upload_file(file, file_path)
# Update chore record
image_url = f"/static/uploads/chores/{filename}"
chore.image_url = image_url
db.commit()
db.refresh(chore)
return {
"message": "Image uploaded successfully",
"image_url": image_url
}
@router.delete("/chores/{chore_id}/image", status_code=status.HTTP_204_NO_CONTENT)
async def delete_chore_image(
chore_id: int,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user)
):
"""Delete chore image (admin or assigned user only)"""
# Get chore
chore = db.query(Chore).filter(Chore.id == chore_id).first()
if not chore:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Chore not found"
)
# Check permissions
from app.models.chore_assignment import ChoreAssignment
is_assigned = db.query(ChoreAssignment).filter(
ChoreAssignment.chore_id == chore_id,
ChoreAssignment.user_id == current_user.id
).first() is not None
if not current_user.is_admin and not is_assigned:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Not authorized to delete image for this chore"
)
if not chore.image_url:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="No image to delete"
)
# Delete file
old_file = CHORE_UPLOAD_DIR / Path(chore.image_url).name
if old_file.exists():
old_file.unlink()
# Update database
chore.image_url = None
db.commit()
return None
@router.post("/admin/users/{user_id}/avatar", status_code=status.HTTP_200_OK)
async def admin_upload_user_avatar(
user_id: int,
file: UploadFile = File(...),
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user)
):
"""
Upload avatar for any user (admin only).
- Accepts: JPG, JPEG, PNG, GIF, WEBP
- Max size: 5MB
- Admin only
"""
# Check if current user is admin
if not current_user.is_admin:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Only administrators can upload avatars for other users"
)
# Get target user
target_user = db.query(User).filter(User.id == user_id).first()
if not target_user:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="User not found"
)
if not validate_image(file.filename):
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=f"Invalid file type. Allowed: {', '.join(ALLOWED_EXTENSIONS)}"
)
# Generate unique filename
file_ext = Path(file.filename).suffix.lower()
filename = f"user_{user_id}_{uuid.uuid4().hex[:8]}{file_ext}"
file_path = USER_UPLOAD_DIR / filename
# Delete old avatar if exists
if target_user.avatar_url:
old_file = USER_UPLOAD_DIR / Path(target_user.avatar_url).name
if old_file.exists():
old_file.unlink()
# Save new avatar
save_upload_file(file, file_path)
# Update user record
avatar_url = f"/static/uploads/users/{filename}"
target_user.avatar_url = avatar_url
db.commit()
db.refresh(target_user)
return {
"message": "Avatar uploaded successfully",
"avatar_url": avatar_url
}
@router.delete("/admin/users/{user_id}/avatar", status_code=status.HTTP_204_NO_CONTENT)
async def admin_delete_user_avatar(
user_id: int,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user)
):
"""Delete avatar for any user (admin only)"""
# Check if current user is admin
if not current_user.is_admin:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Only administrators can delete avatars for other users"
)
# Get target user
target_user = db.query(User).filter(User.id == user_id).first()
if not target_user:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="User not found"
)
if not target_user.avatar_url:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="No avatar to delete"
)
# Delete file
old_file = USER_UPLOAD_DIR / Path(target_user.avatar_url).name
if old_file.exists():
old_file.unlink()
# Update database
target_user.avatar_url = None
db.commit()
return None

114
backend/app/api/v1/users.py Normal file
View File

@@ -0,0 +1,114 @@
"""User management endpoints."""
from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy.orm import Session
from typing import List
from app.core.database import get_db
from app.models.user import User
from app.schemas.user import UserResponse, UserUpdate
from app.api.v1.auth import get_current_user
router = APIRouter()
@router.get("/", response_model=List[UserResponse])
async def list_users(
skip: int = 0,
limit: int = 100,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user)
):
"""List all users (admin only)."""
if not current_user.is_admin:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Not enough permissions"
)
users = db.query(User).offset(skip).limit(limit).all()
return users
@router.get("/{user_id}", response_model=UserResponse)
async def get_user(
user_id: int,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user)
):
"""Get user by ID."""
# Users can view their own profile, admins can view any profile
if user_id != current_user.id and not current_user.is_admin:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Not enough permissions"
)
user = db.query(User).filter(User.id == user_id).first()
if not user:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="User not found"
)
return user
@router.put("/{user_id}", response_model=UserResponse)
async def update_user(
user_id: int,
user_update: UserUpdate,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user)
):
"""Update user information."""
# Users can update their own profile, admins can update any profile
if user_id != current_user.id and not current_user.is_admin:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Not enough permissions"
)
user = db.query(User).filter(User.id == user_id).first()
if not user:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="User not found"
)
# Update fields if provided
if user_update.email is not None:
user.email = user_update.email
if user_update.full_name is not None:
user.full_name = user_update.full_name
if user_update.password is not None:
from app.core.security import get_password_hash
user.hashed_password = get_password_hash(user_update.password)
if user_update.is_active is not None and current_user.is_admin:
user.is_active = user_update.is_active
db.commit()
db.refresh(user)
return user
@router.delete("/{user_id}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_user(
user_id: int,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user)
):
"""Delete user (admin only)."""
if not current_user.is_admin:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Not enough permissions"
)
user = db.query(User).filter(User.id == user_id).first()
if not user:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="User not found"
)
db.delete(user)
db.commit()
return None

View File

@@ -0,0 +1 @@
# Core package

View File

@@ -0,0 +1,42 @@
"""Application configuration."""
from pydantic_settings import BaseSettings
from typing import List
class Settings(BaseSettings):
"""Application settings."""
APP_NAME: str = "Family Hub"
APP_VERSION: str = "0.1.0"
DEBUG: bool = True
# Database
DATABASE_URL: str = "sqlite:///./data/family_hub.db"
# Security
SECRET_KEY: str = "your-secret-key-change-this-in-production"
ALGORITHM: str = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES: int = 30
# CORS - Allow all origins for development (network access)
# In production, set this to specific domains in .env file
ALLOWED_ORIGINS: str = "*"
# Environment
ENVIRONMENT: str = "development"
class Config:
env_file = ".env"
case_sensitive = True
@property
def cors_origins(self) -> List[str]:
"""Parse ALLOWED_ORIGINS into a list."""
# Allow all origins if set to "*"
if self.ALLOWED_ORIGINS == "*":
return ["*"]
if isinstance(self.ALLOWED_ORIGINS, str):
return [origin.strip() for origin in self.ALLOWED_ORIGINS.split(',')]
return self.ALLOWED_ORIGINS
settings = Settings()

View File

@@ -0,0 +1,25 @@
"""Database configuration."""
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
from app.core.config import settings
# Create database engine
engine = create_engine(
settings.DATABASE_URL,
connect_args={"check_same_thread": False} if "sqlite" in settings.DATABASE_URL else {}
)
# Create session factory
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
# Create base class for models
Base = declarative_base()
def get_db():
"""Dependency to get database session."""
db = SessionLocal()
try:
yield db
finally:
db.close()

View File

@@ -0,0 +1,39 @@
"""Security utilities for password hashing and JWT tokens."""
from datetime import datetime, timedelta
from typing import Optional
from jose import JWTError, jwt
from passlib.context import CryptContext
from app.core.config import settings
# Password hashing context
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
def verify_password(plain_password: str, hashed_password: str) -> bool:
"""Verify a password against a hash."""
return pwd_context.verify(plain_password, hashed_password)
def get_password_hash(password: str) -> str:
"""Hash a password."""
return pwd_context.hash(password)
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None) -> str:
"""Create a JWT access token."""
to_encode = data.copy()
if expires_delta:
expire = datetime.utcnow() + expires_delta
else:
expire = datetime.utcnow() + timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES)
to_encode.update({"exp": expire})
encoded_jwt = jwt.encode(to_encode, settings.SECRET_KEY, algorithm=settings.ALGORITHM)
return encoded_jwt
def decode_access_token(token: str) -> Optional[dict]:
"""Decode a JWT access token."""
try:
payload = jwt.decode(token, settings.SECRET_KEY, algorithms=[settings.ALGORITHM])
return payload
except JWTError:
return None

57
backend/app/main.py Normal file
View File

@@ -0,0 +1,57 @@
"""Main FastAPI application."""
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from fastapi.staticfiles import StaticFiles
from pathlib import Path
from app.core.config import settings
from app.api.v1 import auth, users, chores, uploads, public, chore_logs
# Create FastAPI app
app = FastAPI(
title=settings.APP_NAME,
version=settings.APP_VERSION,
docs_url="/docs",
redoc_url="/redoc"
)
# Configure CORS
print("="*70)
print("FAMILY HUB - CORS CONFIGURATION")
print("="*70)
print(f"Allowed Origins: {settings.cors_origins}")
print("="*70)
app.add_middleware(
CORSMiddleware,
allow_origins=settings.cors_origins,
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# Mount static files for uploads
static_path = Path(__file__).parent / "static"
static_path.mkdir(exist_ok=True)
app.mount("/static", StaticFiles(directory=str(static_path)), name="static")
# Include routers
app.include_router(auth.router, prefix="/api/v1/auth", tags=["authentication"])
app.include_router(users.router, prefix="/api/v1/users", tags=["users"])
app.include_router(chores.router, prefix="/api/v1/chores", tags=["chores"])
app.include_router(chore_logs.router, prefix="/api/v1/chores", tags=["chore-logs"])
app.include_router(uploads.router, prefix="/api/v1/uploads", tags=["uploads"])
app.include_router(public.router, prefix="/api/v1/public", tags=["public"])
@app.get("/")
async def root():
"""Root endpoint."""
return {
"message": "Family Hub API",
"version": settings.APP_VERSION,
"docs": "/docs"
}
@app.get("/health")
async def health_check():
"""Health check endpoint."""
return {"status": "healthy"}

View File

@@ -0,0 +1,7 @@
# Models package
from app.models.user import User
from app.models.chore import Chore
from app.models.chore_assignment import ChoreAssignment
from app.models.chore_completion_log import ChoreCompletionLog
__all__ = ["User", "Chore", "ChoreAssignment", "ChoreCompletionLog"]

View File

@@ -0,0 +1,49 @@
"""Chore model."""
from sqlalchemy import Boolean, Column, Integer, String, DateTime, ForeignKey, Enum as SQLEnum
from sqlalchemy.orm import relationship
from datetime import datetime
from app.core.database import Base
import enum
class ChoreFrequency(str, enum.Enum):
"""Chore frequency options."""
ON_TRIGGER = "on_trigger"
DAILY = "daily"
WEEKLY = "weekly"
FORTNIGHTLY = "fortnightly"
MONTHLY = "monthly"
class ChoreStatus(str, enum.Enum):
"""Chore status options."""
PENDING = "pending"
IN_PROGRESS = "in_progress"
COMPLETED = "completed"
SKIPPED = "skipped"
class ChoreAssignmentType(str, enum.Enum):
"""Chore assignment type - how the chore should be completed."""
ANY_ONE = "any_one" # Only one assigned person needs to complete
ALL_ASSIGNED = "all_assigned" # All assigned people must complete
class Chore(Base):
"""Chore model."""
__tablename__ = "chores"
id = Column(Integer, primary_key=True, index=True)
title = Column(String(200), nullable=False)
description = Column(String(500))
room = Column(String(50)) # bedroom1, bedroom2, kitchen, bathroom1, etc.
frequency = Column(SQLEnum(ChoreFrequency, values_callable=lambda x: [e.value for e in x]), nullable=False)
points = Column(Integer, default=0) # Points awarded for completing the chore
image_url = Column(String(500)) # URL to chore image
assignment_type = Column(SQLEnum(ChoreAssignmentType, values_callable=lambda x: [e.value for e in x]), default=ChoreAssignmentType.ANY_ONE) # How chore should be completed
status = Column(SQLEnum(ChoreStatus, values_callable=lambda x: [e.value for e in x]), default=ChoreStatus.PENDING)
assigned_user_id = Column(Integer, ForeignKey("users.id")) # Deprecated - use assignments instead
due_date = Column(DateTime)
completed_at = Column(DateTime)
created_at = Column(DateTime, default=datetime.utcnow)
updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
# Relationships
assigned_user = relationship("User", back_populates="chores") # Deprecated - use assignments
assignments = relationship("ChoreAssignment", back_populates="chore", cascade="all, delete-orphan")

View File

@@ -0,0 +1,19 @@
"""Chore Assignment model for many-to-many user-chore relationship."""
from sqlalchemy import Column, Integer, DateTime, ForeignKey
from sqlalchemy.orm import relationship
from datetime import datetime
from app.core.database import Base
class ChoreAssignment(Base):
"""Junction table for assigning multiple users to chores."""
__tablename__ = "chore_assignments"
id = Column(Integer, primary_key=True, index=True)
chore_id = Column(Integer, ForeignKey("chores.id", ondelete="CASCADE"), nullable=False)
user_id = Column(Integer, ForeignKey("users.id", ondelete="CASCADE"), nullable=False)
completed_at = Column(DateTime, nullable=True)
created_at = Column(DateTime, default=datetime.utcnow)
# Relationships
chore = relationship("Chore", back_populates="assignments")
user = relationship("User", back_populates="chore_assignments")

View File

@@ -0,0 +1,31 @@
"""Chore Completion Log model for tracking historical chore completions."""
from sqlalchemy import Column, Integer, String, DateTime, ForeignKey, Text
from sqlalchemy.orm import relationship
from datetime import datetime
from app.core.database import Base
class ChoreCompletionLog(Base):
"""
Comprehensive log of chore completions.
This model tracks every completion instance, allowing for:
- Historical completion data
- Multiple completions of the same chore
- Optional notes and verification
- Weekly/monthly reporting
"""
__tablename__ = "chore_completion_logs"
id = Column(Integer, primary_key=True, index=True)
chore_id = Column(Integer, ForeignKey("chores.id", ondelete="CASCADE"), nullable=False, index=True)
user_id = Column(Integer, ForeignKey("users.id", ondelete="CASCADE"), nullable=False, index=True)
completed_at = Column(DateTime, default=datetime.utcnow, nullable=False, index=True)
notes = Column(Text, nullable=True) # Optional notes about the completion
verified_by_user_id = Column(Integer, ForeignKey("users.id", ondelete="SET NULL"), nullable=True) # Optional verification
created_at = Column(DateTime, default=datetime.utcnow, nullable=False)
# Relationships
chore = relationship("Chore", foreign_keys=[chore_id])
user = relationship("User", foreign_keys=[user_id], back_populates="chore_completion_logs")
verified_by = relationship("User", foreign_keys=[verified_by_user_id])

View File

@@ -0,0 +1,18 @@
"""Meal model."""
from sqlalchemy import Column, Integer, String, DateTime, Text
from datetime import datetime
from app.core.database import Base
class Meal(Base):
"""Meal model for menu planning."""
__tablename__ = "meals"
id = Column(Integer, primary_key=True, index=True)
title = Column(String(200), nullable=False)
description = Column(Text)
meal_type = Column(String(20)) # breakfast, lunch, dinner, snack
scheduled_date = Column(DateTime)
mealie_recipe_id = Column(String(100)) # Link to Mealie recipe
notes = Column(Text)
created_at = Column(DateTime, default=datetime.utcnow)
updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)

View File

@@ -0,0 +1,28 @@
"""User model."""
from sqlalchemy import Boolean, Column, Integer, String, DateTime, Date
from sqlalchemy.orm import relationship
from datetime import datetime
from app.core.database import Base
class User(Base):
"""User model."""
__tablename__ = "users"
id = Column(Integer, primary_key=True, index=True)
username = Column(String(50), unique=True, index=True, nullable=False)
email = Column(String(100), unique=True, index=True, nullable=False)
full_name = Column(String(100))
hashed_password = Column(String(200), nullable=False)
discord_id = Column(String(100)) # For Discord integration
profile_picture = Column(String(500)) # URL to profile picture
avatar_url = Column(String(500)) # URL to uploaded avatar
birthday = Column(Date, nullable=True) # Birthday for chore logic
is_active = Column(Boolean, default=True)
is_admin = Column(Boolean, default=False)
created_at = Column(DateTime, default=datetime.utcnow)
updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
# Relationships (lazy loaded to avoid circular imports)
chores = relationship("Chore", back_populates="assigned_user", lazy="select")
chore_assignments = relationship("ChoreAssignment", back_populates="user", lazy="select")
chore_completion_logs = relationship("ChoreCompletionLog", foreign_keys="[ChoreCompletionLog.user_id]", back_populates="user", lazy="select")

View File

@@ -0,0 +1,4 @@
# Schemas package
from app.schemas import auth, chore, user, chore_completion_log
__all__ = ["auth", "chore", "user", "chore_completion_log"]

View File

@@ -0,0 +1,11 @@
"""Authentication schemas."""
from pydantic import BaseModel
class Token(BaseModel):
"""Token response schema."""
access_token: str
token_type: str
class TokenData(BaseModel):
"""Token data schema."""
username: str | None = None

View File

@@ -0,0 +1,99 @@
"""Chore schemas."""
from pydantic import BaseModel, ConfigDict, field_validator
from typing import Optional, Union, List
from datetime import datetime, date
from app.models.chore import ChoreFrequency, ChoreStatus, ChoreAssignmentType
class ChoreBase(BaseModel):
"""Base chore schema."""
title: str
description: Optional[str] = None
room: str
frequency: ChoreFrequency
points: Optional[int] = 0
image_url: Optional[str] = None
assignment_type: Optional[ChoreAssignmentType] = ChoreAssignmentType.ANY_ONE
due_date: Optional[Union[datetime, date, str]] = None
@field_validator('due_date', mode='before')
@classmethod
def parse_due_date(cls, v):
"""Parse due_date to handle various formats."""
if v is None or v == '' or isinstance(v, (datetime, date)):
return None if v == '' else v
if isinstance(v, str):
# Try parsing as datetime first
for fmt in ['%Y-%m-%dT%H:%M:%S', '%Y-%m-%d']:
try:
return datetime.strptime(v, fmt)
except ValueError:
continue
# If no format matches, return None instead of the invalid string
return None
return None
class ChoreCreate(ChoreBase):
"""Schema for creating a chore."""
assigned_user_ids: Optional[List[int]] = [] # Multiple users can be assigned
class ChoreUpdate(BaseModel):
"""Schema for updating a chore."""
title: Optional[str] = None
description: Optional[str] = None
room: Optional[str] = None
frequency: Optional[ChoreFrequency] = None
points: Optional[int] = None
status: Optional[ChoreStatus] = None
assignment_type: Optional[ChoreAssignmentType] = None
assigned_user_ids: Optional[List[int]] = None # Multiple users
due_date: Optional[Union[datetime, date, str]] = None
@field_validator('due_date', mode='before')
@classmethod
def parse_due_date(cls, v):
"""Parse due_date to handle various formats."""
if v is None or v == '' or isinstance(v, (datetime, date)):
return None if v == '' else v
if isinstance(v, str):
# Try parsing as datetime first
for fmt in ['%Y-%m-%dT%H:%M:%S', '%Y-%m-%d']:
try:
return datetime.strptime(v, fmt)
except ValueError:
continue
# If no format matches, return None instead of the invalid string
return None
return None
class AssignedUserDetail(BaseModel):
"""User info for chore assignment."""
model_config = ConfigDict(from_attributes=True)
id: int
username: str
full_name: str
avatar_url: Optional[str] = None
birthday: Optional[date] = None
completed_at: Optional[datetime] = None # When this user completed the chore
class Chore(ChoreBase):
"""Schema for a chore response."""
model_config = ConfigDict(from_attributes=True)
id: int
status: ChoreStatus
points: int
assignment_type: ChoreAssignmentType
assigned_users: List[AssignedUserDetail] = [] # Multiple users with completion status
completed_at: Optional[datetime] = None
created_at: datetime
updated_at: datetime
# Legacy field for backward compatibility
assigned_user_id: Optional[int] = None

View File

@@ -0,0 +1,66 @@
"""Schemas for Chore Completion Logs."""
from pydantic import BaseModel, Field
from datetime import datetime
from typing import Optional
class ChoreCompletionLogBase(BaseModel):
"""Base schema for chore completion log."""
notes: Optional[str] = Field(None, description="Optional notes about the completion")
class ChoreCompletionLogCreate(ChoreCompletionLogBase):
"""Schema for creating a chore completion log."""
chore_id: int = Field(..., description="ID of the chore being completed")
user_id: int = Field(..., description="ID of the user completing the chore")
completed_at: Optional[datetime] = Field(None, description="When the chore was completed (defaults to now)")
class ChoreCompletionLogUpdate(BaseModel):
"""Schema for updating a chore completion log."""
notes: Optional[str] = Field(None, description="Update notes about the completion")
verified_by_user_id: Optional[int] = Field(None, description="ID of user verifying the completion")
class ChoreCompletionLog(ChoreCompletionLogBase):
"""Schema for chore completion log response."""
id: int
chore_id: int
user_id: int
completed_at: datetime
verified_by_user_id: Optional[int] = None
created_at: datetime
# Nested objects for expanded responses
chore_title: Optional[str] = None
user_name: Optional[str] = None
user_avatar: Optional[str] = None
verified_by_name: Optional[str] = None
class Config:
from_attributes = True
class WeeklyChoreReport(BaseModel):
"""Schema for weekly chore completion report."""
start_date: datetime
end_date: datetime
total_completions: int
completions_by_user: dict[str, int] # {username: count}
completions_by_chore: dict[str, int] # {chore_title: count}
completions_by_day: dict[str, int] # {day: count}
top_performers: list[dict] # [{username, count, avatar_url}]
recent_completions: list[ChoreCompletionLog]
class UserChoreStats(BaseModel):
"""Schema for user-specific chore statistics."""
user_id: int
username: str
full_name: Optional[str] = None
avatar_url: Optional[str] = None
total_completions: int
completions_this_week: int
completions_this_month: int
favorite_chore: Optional[str] = None
recent_completions: list[ChoreCompletionLog]

View File

@@ -0,0 +1,47 @@
"""User schemas."""
from pydantic import BaseModel, Field
from datetime import datetime, date
from typing import Optional
class UserBase(BaseModel):
"""Base user schema."""
username: str
email: str # Changed from EmailStr to allow .local domains for home networks
full_name: Optional[str] = None
discord_id: Optional[str] = None
profile_picture: Optional[str] = None
avatar_url: Optional[str] = None
birthday: Optional[date] = None
class UserCreate(UserBase):
"""Schema for creating a user."""
password: str
class UserUpdate(BaseModel):
"""Schema for updating a user."""
email: Optional[str] = None
full_name: Optional[str] = None
discord_id: Optional[str] = None
profile_picture: Optional[str] = None
birthday: Optional[date] = None
password: Optional[str] = None
is_active: Optional[bool] = None
class UserAdminUpdate(UserUpdate):
"""Schema for admin updating a user (includes admin-only fields)."""
is_admin: Optional[bool] = None
class UserResponse(UserBase):
"""Schema for user response."""
id: int
is_active: bool
is_admin: bool
created_at: datetime
class Config:
from_attributes = True
class UserLogin(BaseModel):
"""Schema for user login."""
username: str
password: str

37
backend/check_cors.py Normal file
View File

@@ -0,0 +1,37 @@
"""Check what CORS configuration the backend is actually using."""
import sys
from pathlib import Path
# Add parent directory to path
sys.path.insert(0, str(Path(__file__).parent))
from app.core.config import settings
print("="*70)
print("BACKEND CORS CONFIGURATION CHECK")
print("="*70)
print()
print(f"ALLOWED_ORIGINS (raw): {settings.ALLOWED_ORIGINS}")
print(f"Type: {type(settings.ALLOWED_ORIGINS)}")
print()
print(f"cors_origins (parsed): {settings.cors_origins}")
print(f"Type: {type(settings.cors_origins)}")
print()
print("Individual origins:")
for i, origin in enumerate(settings.cors_origins, 1):
print(f" {i}. '{origin}' (length: {len(origin)})")
print()
print("="*70)
print()
# Check if the frontend URL is in there
frontend_url = "http://localhost:5173"
if frontend_url in settings.cors_origins:
print(f"✅ Frontend URL '{frontend_url}' IS in allowed origins")
else:
print(f"❌ Frontend URL '{frontend_url}' NOT in allowed origins")
print(f" Closest match: {[o for o in settings.cors_origins if '5173' in o]}")

43
backend/check_db.py Normal file
View File

@@ -0,0 +1,43 @@
#!/usr/bin/env python3
"""Quick script to check database schema."""
import sqlite3
from pathlib import Path
db_path = Path(__file__).parent / "data" / "family_hub.db"
if not db_path.exists():
print(f"ERROR: Database not found at {db_path}")
exit(1)
print(f"Checking database: {db_path}")
print("=" * 70)
conn = sqlite3.connect(db_path)
cursor = conn.cursor()
# Get all tables
cursor.execute("SELECT name FROM sqlite_master WHERE type='table'")
tables = cursor.fetchall()
print(f"\nTables in database: {[t[0] for t in tables]}")
# Get chores table schema
cursor.execute("PRAGMA table_info(chores)")
columns = cursor.fetchall()
print("\nChores table columns:")
for col in columns:
col_id, name, type_, notnull, default, pk = col
print(f" {name:20} {type_:15} {'NOT NULL' if notnull else ''}")
# Check if we have data with assignment_type
cursor.execute("SELECT id, title, assignment_type FROM chores LIMIT 5")
rows = cursor.fetchall()
print(f"\nFirst 5 chores (checking assignment_type values):")
for row in rows:
id_, title, assignment_type = row
print(f" [{id_:2}] {title:30} -> assignment_type: '{assignment_type}'")
conn.close()
print("\n" + "=" * 70)
print("Database check complete!")

View File

@@ -0,0 +1,37 @@
@echo off
echo ============================================================
echo Clearing Python Cache and Resetting Database
echo ============================================================
echo.
echo 1. Stopping any running backend servers...
taskkill /F /IM python.exe 2>nul
timeout /t 2 >nul
echo 2. Deleting Python cache files...
for /d /r %%d in (__pycache__) do @if exist "%%d" (
rd /s /q "%%d"
echo Deleted: %%d
)
echo 3. Deleting .pyc files...
del /s /q "*.pyc" 2>nul
echo 4. Deleting database...
if exist "data\family_hub.db" (
del /q "data\family_hub.db"
echo Database deleted
) else (
echo No database found
)
echo 5. Running database reset...
call venv\Scripts\activate.bat
python reset_database.py
echo.
echo ============================================================
echo Cache cleared and database reset complete!
echo ============================================================
echo.
pause

79
backend/diagnose.py Normal file
View File

@@ -0,0 +1,79 @@
"""Diagnostic script to check database connection and configuration."""
import sys
import os
from pathlib import Path
print("="*70)
print("FAMILY HUB - DATABASE DIAGNOSTIC")
print("="*70)
print()
# Add parent directory to path
sys.path.insert(0, str(Path(__file__).parent))
print(f"Current Working Directory: {os.getcwd()}")
print(f"Script Location: {Path(__file__).parent.absolute()}")
print()
try:
from app.core.config import settings
from app.core.database import engine, SessionLocal
from app.models.user import User
print("✓ Successfully imported app modules")
print()
print("DATABASE CONFIGURATION:")
print(f" DATABASE_URL: {settings.DATABASE_URL}")
print(f" Database Engine: {engine.url}")
print()
# Check if database file exists
db_path = str(engine.url).replace("sqlite:///", "")
if db_path.startswith("./"):
db_path = os.path.join(os.getcwd(), db_path[2:])
print(f" Resolved DB Path: {db_path}")
print(f" Database Exists: {os.path.exists(db_path)}")
if os.path.exists(db_path):
file_size = os.path.getsize(db_path)
print(f" Database Size: {file_size:,} bytes")
print()
# Try to connect and query
print("ATTEMPTING DATABASE CONNECTION:")
db = SessionLocal()
try:
user_count = db.query(User).count()
print(f" ✅ Connection successful!")
print(f" Total users in database: {user_count}")
print()
if user_count > 0:
print("USERS IN DATABASE:")
users = db.query(User).all()
for user in users:
print(f" - {user.username} ({user.full_name})")
print(f" Email: {user.email}")
print(f" Admin: {user.is_admin}, Active: {user.is_active}")
print(f" Password Hash: {user.hashed_password[:50]}...")
print()
else:
print(" ⚠️ No users found in database!")
print(" You may need to run: python init_db.py")
except Exception as e:
print(f" ❌ Database connection failed: {e}")
import traceback
traceback.print_exc()
finally:
db.close()
except Exception as e:
print(f"❌ Failed to import modules: {e}")
import traceback
traceback.print_exc()
print()
print("="*70)

File diff suppressed because it is too large Load Diff

228
backend/init_db.py Normal file
View File

@@ -0,0 +1,228 @@
#!/usr/bin/env python3
"""
Database initialization script for Family Hub.
Creates tables and populates with demo data.
"""
import sys
from pathlib import Path
from datetime import datetime, timedelta
# Add parent directory to path to import from app
sys.path.insert(0, str(Path(__file__).parent))
from app.core.database import engine, SessionLocal, Base
from app.models import User, Chore
from passlib.context import CryptContext
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
def init_db():
"""Initialize the database with tables and demo data."""
print("Initializing Family Hub database...")
# Create all tables
print("Creating database tables...")
Base.metadata.create_all(bind=engine)
db = SessionLocal()
try:
# Check if data already exists
existing_users = db.query(User).count()
if existing_users > 0:
print(f"Database already has {existing_users} users. Skipping initialization.")
print(" To reset the database, delete the file and run this script again.")
return
# Create demo users
print("\nCreating demo users...")
users_data = [
{"username": "jess", "email": "jess@family.local", "full_name": "Jess", "is_admin": True},
{"username": "lou", "email": "lou@family.local", "full_name": "Lou", "is_admin": False},
{"username": "william", "email": "william@family.local", "full_name": "William", "is_admin": False},
{"username": "xander", "email": "xander@family.local", "full_name": "Xander", "is_admin": False},
{"username": "bella", "email": "bella@family.local", "full_name": "Bella", "is_admin": False},
]
users = []
for user_data in users_data:
user = User(
username=user_data["username"],
email=user_data["email"],
full_name=user_data["full_name"],
hashed_password=pwd_context.hash("password123"),
is_admin=user_data["is_admin"],
is_active=True
)
db.add(user)
users.append(user)
admin_badge = " [ADMIN]" if user.is_admin else ""
print(f" + {user.full_name} ({user.username}){admin_badge}")
db.flush() # Flush to get user IDs
# Create demo chores
print("\nCreating demo chores...")
# Get user IDs for assignment
jess = next(u for u in users if u.username == "jess")
lou = next(u for u in users if u.username == "lou")
william = next(u for u in users if u.username == "william")
xander = next(u for u in users if u.username == "xander")
bella = next(u for u in users if u.username == "bella")
demo_chores = [
# Daily chores
{
"title": "Feed the pets",
"description": "Feed cats in the morning and evening",
"frequency": "daily",
"assignment_type": "any_one",
"points": 5,
"room": "Kitchen",
"assigned_user_id": bella.id,
"status": "pending"
},
{
"title": "Take out trash",
"description": "Empty kitchen and bathroom bins",
"frequency": "daily",
"assignment_type": "any_one",
"points": 3,
"room": "Kitchen",
"assigned_user_id": xander.id,
"status": "pending"
},
{
"title": "Tidy living room",
"description": "Pick up toys, straighten cushions, clear coffee table",
"frequency": "daily",
"assignment_type": "any_one",
"points": 5,
"room": "Living Room",
"assigned_user_id": william.id,
"status": "in_progress"
},
# Weekly chores
{
"title": "Vacuum entire house",
"description": "Vacuum all carpeted areas including stairs",
"frequency": "weekly",
"assignment_type": "any_one",
"points": 15,
"room": "Whole House",
"assigned_user_id": lou.id,
"status": "pending"
},
{
"title": "Clean bathrooms",
"description": "Clean toilets, sinks, mirrors, and mop floors",
"frequency": "weekly",
"assignment_type": "any_one",
"points": 20,
"room": "Bathrooms",
"assigned_user_id": jess.id,
"status": "completed",
"completed_at": datetime.now() - timedelta(days=1)
},
{
"title": "Mow the lawn",
"description": "Mow front and back yard, edge walkways",
"frequency": "weekly",
"assignment_type": "any_one",
"points": 25,
"room": "Yard",
"assigned_user_id": william.id,
"status": "pending"
},
# Monthly chores
{
"title": "Deep clean kitchen",
"description": "Clean oven, fridge, cabinets, and behind appliances",
"frequency": "monthly",
"assignment_type": "any_one",
"points": 50,
"room": "Kitchen",
"assigned_user_id": jess.id,
"status": "pending"
},
{
"title": "Wash windows",
"description": "Clean all interior and exterior windows",
"frequency": "monthly",
"assignment_type": "any_one",
"points": 40,
"room": "Whole House",
"assigned_user_id": lou.id,
"status": "pending"
},
{
"title": "Organize garage",
"description": "Sort items, sweep floor, arrange tools",
"frequency": "monthly",
"assignment_type": "any_one",
"points": 35,
"room": "Garage",
"assigned_user_id": william.id,
"status": "in_progress"
},
{
"title": "Change air filters",
"description": "Replace HVAC filters throughout house",
"frequency": "monthly",
"assignment_type": "any_one",
"points": 10,
"room": "Whole House",
"assigned_user_id": jess.id,
"status": "pending"
},
# On-trigger chores
{
"title": "Grocery shopping",
"description": "Weekly grocery shopping trip",
"frequency": "on_trigger",
"assignment_type": "any_one",
"points": 30,
"room": "Shopping",
"assigned_user_id": lou.id,
"status": "pending"
},
{
"title": "Car wash",
"description": "Wash and vacuum family car",
"frequency": "on_trigger",
"assignment_type": "any_one",
"points": 20,
"room": "Driveway",
"assigned_user_id": xander.id,
"status": "pending"
},
]
for chore_data in demo_chores:
chore = Chore(**chore_data)
db.add(chore)
# Pretty print status
status_marker = "[DONE]" if chore_data["status"] == "completed" else "[WIP]" if chore_data["status"] == "in_progress" else "[TODO]"
print(f" {status_marker} {chore_data['title']} - {chore_data['frequency']} ({chore_data['room']})")
db.commit()
print(f"\n[SUCCESS] Database initialized with {len(users_data)} users and {len(demo_chores)} demo chores!")
print("\nLogin credentials:")
print(" Username: jess (admin) or lou, william, xander, bella")
print(" Password: password123")
except Exception as e:
print(f"[ERROR] Error initializing database: {e}")
db.rollback()
raise
finally:
db.close()
if __name__ == "__main__":
init_db()

24
backend/make_lou_admin.py Normal file
View File

@@ -0,0 +1,24 @@
"""Quick script to make Lou an admin."""
import sys
from pathlib import Path
sys.path.insert(0, str(Path(__file__).parent))
from app.core.database import SessionLocal
from app.models.user import User
def make_lou_admin():
db = SessionLocal()
try:
lou = db.query(User).filter(User.username == "lou").first()
if lou:
lou.is_admin = True
db.commit()
print(f"{lou.full_name} is now an admin!")
else:
print("❌ Lou not found in database")
finally:
db.close()
if __name__ == "__main__":
make_lou_admin()

View File

@@ -0,0 +1,53 @@
"""
Database migration: Add birthday field to users table.
This script safely adds a birthday column to the users table.
"""
import sys
from pathlib import Path
from datetime import datetime
sys.path.insert(0, str(Path(__file__).parent.parent))
from app.core.database import engine, SessionLocal
from sqlalchemy import text
def add_birthday_column():
"""Add birthday column to users table."""
db = SessionLocal()
try:
print("=" * 70)
print("MIGRATION: Add Birthday Field to Users")
print("=" * 70)
print()
# Check if column already exists
result = db.execute(text("PRAGMA table_info(users)"))
columns = [row[1] for row in result.fetchall()]
if 'birthday' in columns:
print("✅ Birthday column already exists. No migration needed.")
return
print("📋 Adding birthday column to users table...")
# Add birthday column (DATE type, nullable)
db.execute(text("ALTER TABLE users ADD COLUMN birthday DATE"))
db.commit()
print("✅ Birthday column added successfully!")
print()
print("Migration complete!")
print()
except Exception as e:
print(f"❌ Migration failed: {e}")
db.rollback()
raise
finally:
db.close()
if __name__ == "__main__":
add_birthday_column()

View File

@@ -0,0 +1,81 @@
"""
Database migration: Add support for multiple users per chore.
This creates a chore_assignments junction table to allow
multiple users to be assigned to a single chore.
"""
import sys
from pathlib import Path
sys.path.insert(0, str(Path(__file__).parent.parent))
from app.core.database import engine, SessionLocal
from sqlalchemy import text
def create_chore_assignments_table():
"""Create chore_assignments table for many-to-many relationship."""
db = SessionLocal()
try:
print("=" * 70)
print("MIGRATION: Add Multiple Users Per Chore Support")
print("=" * 70)
print()
# Check if table already exists
result = db.execute(text("SELECT name FROM sqlite_master WHERE type='table' AND name='chore_assignments'"))
if result.fetchone():
print("✅ chore_assignments table already exists. No migration needed.")
return
print("📋 Creating chore_assignments table...")
# Create junction table
db.execute(text("""
CREATE TABLE chore_assignments (
id INTEGER PRIMARY KEY AUTOINCREMENT,
chore_id INTEGER NOT NULL,
user_id INTEGER NOT NULL,
completed_at DATETIME,
created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (chore_id) REFERENCES chores (id) ON DELETE CASCADE,
FOREIGN KEY (user_id) REFERENCES users (id) ON DELETE CASCADE,
UNIQUE(chore_id, user_id)
)
"""))
print("✅ chore_assignments table created!")
print()
print("📋 Migrating existing chore assignments...")
# Migrate existing assigned_user_id data to new table
# For each chore with an assigned_user_id, create an assignment record
result = db.execute(text("""
INSERT INTO chore_assignments (chore_id, user_id, completed_at)
SELECT id, assigned_user_id, completed_at
FROM chores
WHERE assigned_user_id IS NOT NULL
"""))
migrated_count = result.rowcount
db.commit()
print(f"✅ Migrated {migrated_count} existing chore assignments!")
print()
print("⚠️ NOTE: The assigned_user_id column in chores table is now deprecated.")
print(" It will be kept for backwards compatibility but not used.")
print()
print("Migration complete!")
print()
except Exception as e:
print(f"❌ Migration failed: {e}")
db.rollback()
raise
finally:
db.close()
if __name__ == "__main__":
create_chore_assignments_table()

View File

@@ -0,0 +1,56 @@
"""Add image fields to users and chores tables
Revision ID: 003
Created: 2026-02-02
"""
import sqlite3
import os
def get_db_path():
"""Get database path"""
# Use absolute path
return r"D:\Hosted\familyhub\backend\data\family_hub.db"
def upgrade():
"""Add image fields to users and chores"""
db_path = get_db_path()
print(f"Connecting to database: {db_path}")
if not os.path.exists(db_path):
print(f"ERROR: Database file not found at {db_path}")
return
conn = sqlite3.connect(db_path)
cursor = conn.cursor()
try:
# Add avatar_url to users table
cursor.execute("""
ALTER TABLE users ADD COLUMN avatar_url VARCHAR(500);
""")
print("✓ Added avatar_url to users table")
# Add image_url to chores table
cursor.execute("""
ALTER TABLE chores ADD COLUMN image_url VARCHAR(500);
""")
print("✓ Added image_url to chores table")
conn.commit()
print("\n✅ Migration 003 completed successfully!")
except sqlite3.OperationalError as e:
if "duplicate column name" in str(e).lower():
print(f"⚠️ Column already exists: {e}")
else:
raise
finally:
conn.close()
def downgrade():
"""Remove image fields"""
print("Note: SQLite doesn't support DROP COLUMN easily.")
print("To rollback, you would need to recreate the tables.")
if __name__ == "__main__":
upgrade()

View File

@@ -0,0 +1,40 @@
"""Add assignment_type to chores table."""
import sqlite3
import sys
from pathlib import Path
# Add parent directory to path to import from app
sys.path.insert(0, str(Path(__file__).parent.parent))
def upgrade():
"""Add assignment_type column to chores table."""
db_path = Path(__file__).parent.parent / "data" / "family_hub.db"
print(f"Connecting to database: {db_path}")
conn = sqlite3.connect(db_path)
cursor = conn.cursor()
try:
# Add assignment_type column (default 'any_one' for backward compatibility)
print("Adding assignment_type column to chores table...")
cursor.execute("""
ALTER TABLE chores
ADD COLUMN assignment_type VARCHAR(20) DEFAULT 'any_one'
""")
conn.commit()
print("✅ Successfully added assignment_type column")
print(" Values: 'any_one' (only one person needs to complete)")
print(" 'all_assigned' (all assigned people must complete)")
except sqlite3.OperationalError as e:
if "duplicate column name" in str(e).lower():
print("⚠️ Column assignment_type already exists, skipping...")
else:
raise
finally:
conn.close()
if __name__ == "__main__":
upgrade()
print("\n✅ Migration completed!")

View File

@@ -0,0 +1,79 @@
"""Add chore_completion_logs table for comprehensive chore tracking."""
import sqlite3
import sys
from pathlib import Path
# Add parent directory to path to import from app
sys.path.insert(0, str(Path(__file__).parent.parent))
def upgrade():
"""Create chore_completion_logs table."""
db_path = Path(__file__).parent.parent / "data" / "family_hub.db"
print(f"Connecting to database: {db_path}")
conn = sqlite3.connect(db_path)
cursor = conn.cursor()
try:
# Check if table already exists
cursor.execute("""
SELECT name FROM sqlite_master
WHERE type='table' AND name='chore_completion_logs'
""")
if cursor.fetchone():
print("⚠️ Table chore_completion_logs already exists, skipping...")
return
# Create chore_completion_logs table
print("Creating chore_completion_logs table...")
cursor.execute("""
CREATE TABLE chore_completion_logs (
id INTEGER PRIMARY KEY AUTOINCREMENT,
chore_id INTEGER NOT NULL,
user_id INTEGER NOT NULL,
completed_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
notes TEXT,
verified_by_user_id INTEGER,
created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (chore_id) REFERENCES chores(id) ON DELETE CASCADE,
FOREIGN KEY (user_id) REFERENCES users(id) ON DELETE CASCADE,
FOREIGN KEY (verified_by_user_id) REFERENCES users(id) ON DELETE SET NULL
)
""")
# Create indexes for better query performance
print("Creating indexes...")
cursor.execute("""
CREATE INDEX idx_completion_logs_chore_id
ON chore_completion_logs(chore_id)
""")
cursor.execute("""
CREATE INDEX idx_completion_logs_user_id
ON chore_completion_logs(user_id)
""")
cursor.execute("""
CREATE INDEX idx_completion_logs_completed_at
ON chore_completion_logs(completed_at)
""")
conn.commit()
print("✅ Successfully created chore_completion_logs table")
print(" Features:")
print(" - Tracks every chore completion instance")
print(" - Optional notes for each completion")
print(" - Optional verification by another user")
print(" - Full historical data for reporting")
except Exception as e:
print(f"❌ Error during migration: {e}")
conn.rollback()
raise
finally:
conn.close()
if __name__ == "__main__":
upgrade()
print("\n✅ Migration completed!")

View File

@@ -0,0 +1,37 @@
"""Add discord_id and profile_picture to users table."""
import sqlite3
from pathlib import Path
def migrate():
"""Add new columns to users table."""
db_path = Path("/app/data/familyhub.db")
conn = sqlite3.connect(db_path)
cursor = conn.cursor()
try:
# Add discord_id column
cursor.execute("ALTER TABLE users ADD COLUMN discord_id VARCHAR(100)")
print("✓ Added discord_id column")
except sqlite3.OperationalError as e:
if "duplicate column name" in str(e):
print("✓ discord_id column already exists")
else:
raise
try:
# Add profile_picture column
cursor.execute("ALTER TABLE users ADD COLUMN profile_picture VARCHAR(500)")
print("✓ Added profile_picture column")
except sqlite3.OperationalError as e:
if "duplicate column name" in str(e):
print("✓ profile_picture column already exists")
else:
raise
conn.commit()
conn.close()
print("✓ Migration complete!")
if __name__ == "__main__":
migrate()

View File

@@ -0,0 +1,227 @@
import sys
import os
from datetime import datetime, timedelta
# Add the app directory to the path
sys.path.insert(0, '/app')
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from app.db.base import Base
from app.models.user import User
from app.models.chore import Chore
from app.core.security import get_password_hash
# Database setup
DATABASE_URL = "sqlite:///./family_hub.db"
engine = create_engine(DATABASE_URL, connect_args={"check_same_thread": False})
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
def init_db():
"""Initialize database with schema and demo data"""
print("Creating all tables...")
Base.metadata.create_all(bind=engine)
db = SessionLocal()
try:
# Check if users already exist
existing_users = db.query(User).count()
if existing_users > 0:
print(f"Database already has {existing_users} users. Skipping initialization.")
return
print("Creating family members...")
# Create family members
users_data = [
{"username": "lou", "email": "lou@family.local", "full_name": "Lou", "is_admin": False},
{"username": "jess", "email": "jess@family.local", "full_name": "Jess", "is_admin": True},
{"username": "william", "email": "william@family.local", "full_name": "William", "is_admin": False},
{"username": "xander", "email": "xander@family.local", "full_name": "Xander", "is_admin": False},
{"username": "bella", "email": "bella@family.local", "full_name": "Bella", "is_admin": False},
]
users = {}
for user_data in users_data:
user = User(
username=user_data["username"],
email=user_data["email"],
full_name=user_data["full_name"],
hashed_password=get_password_hash("password123"),
is_admin=user_data["is_admin"],
is_active=True,
discord_id=None,
profile_picture=None
)
db.add(user)
users[user_data["username"]] = user
print(f" + Created user: {user_data['full_name']} ({user_data['username']})")
db.commit()
# Refresh to get IDs
for user in users.values():
db.refresh(user)
print("\nCreating demo chores...")
# Create demo chores with various statuses and assignments
today = datetime.now()
demo_chores = [
# Daily chores
{
"title": "Feed the Dog",
"description": "Give Rex his breakfast and dinner",
"room": "Kitchen",
"frequency": "daily",
"assignment_type": "any_one",
"status": "completed",
"assigned_user_id": users["william"].id,
"due_date": today.replace(hour=23, minute=59, second=59),
"completed_at": today.replace(hour=8, minute=30)
},
{
"title": "Take Out Trash",
"description": "Empty all bins and take to curb",
"room": "Kitchen",
"frequency": "daily",
"assignment_type": "any_one",
"status": "pending",
"assigned_user_id": users["xander"].id,
"due_date": today.replace(hour=23, minute=59, second=59),
"completed_at": None
},
{
"title": "Wash Dishes",
"description": "Load and run the dishwasher",
"room": "Kitchen",
"frequency": "daily",
"assignment_type": "any_one",
"status": "in_progress",
"assigned_user_id": users["bella"].id,
"due_date": today.replace(hour=23, minute=59, second=59),
"completed_at": None
},
# Weekly chores
{
"title": "Vacuum Living Room",
"description": "Vacuum carpets and under furniture",
"room": "Living Room",
"frequency": "weekly",
"assignment_type": "any_one",
"status": "pending",
"assigned_user_id": users["lou"].id,
"due_date": (today + timedelta(days=3)).replace(hour=23, minute=59, second=59),
"completed_at": None
},
{
"title": "Clean Bathrooms",
"description": "Scrub toilets, sinks, and showers",
"room": "Bathroom",
"frequency": "weekly",
"assignment_type": "any_one",
"status": "pending",
"assigned_user_id": users["jess"].id,
"due_date": (today + timedelta(days=2)).replace(hour=23, minute=59, second=59),
"completed_at": None
},
{
"title": "Mow Lawn",
"description": "Mow front and back yards",
"room": "Yard",
"frequency": "weekly",
"assignment_type": "any_one",
"status": "pending",
"assigned_user_id": users["william"].id,
"due_date": (today + timedelta(days=5)).replace(hour=23, minute=59, second=59),
"completed_at": None
},
# Monthly chores
{
"title": "Change Air Filters",
"description": "Replace HVAC air filters throughout house",
"room": "Utility Room",
"frequency": "monthly",
"assignment_type": "any_one",
"status": "pending",
"assigned_user_id": users["lou"].id,
"due_date": (today + timedelta(days=15)).replace(hour=23, minute=59, second=59),
"completed_at": None
},
{
"title": "Deep Clean Fridge",
"description": "Empty, wipe down shelves, check expiry dates",
"room": "Kitchen",
"frequency": "monthly",
"assignment_type": "any_one",
"status": "pending",
"assigned_user_id": users["jess"].id,
"due_date": (today + timedelta(days=20)).replace(hour=23, minute=59, second=59),
"completed_at": None
},
# On-trigger chores (no specific schedule)
{
"title": "Water Plants",
"description": "Check soil moisture and water as needed",
"room": "Living Room",
"frequency": "on_trigger",
"assignment_type": "any_one",
"status": "pending",
"assigned_user_id": users["bella"].id,
"due_date": today.replace(hour=23, minute=59, second=59),
"completed_at": None
},
{
"title": "Sort Recycling",
"description": "Separate recyclables into proper bins",
"room": "Garage",
"frequency": "on_trigger",
"assignment_type": "any_one",
"status": "completed",
"assigned_user_id": users["xander"].id,
"due_date": (today - timedelta(days=1)).replace(hour=23, minute=59, second=59),
"completed_at": (today - timedelta(days=1)).replace(hour=14, minute=20)
},
# Some overdue chores
{
"title": "Organize Garage",
"description": "Sort tools and clean workspace",
"room": "Garage",
"frequency": "monthly",
"assignment_type": "any_one",
"status": "pending",
"assigned_user_id": users["william"].id,
"due_date": (today - timedelta(days=3)).replace(hour=23, minute=59, second=59),
"completed_at": None
},
{
"title": "Clean Windows",
"description": "Wash all interior and exterior windows",
"room": "Whole House",
"frequency": "monthly",
"assignment_type": "any_one",
"status": "pending",
"assigned_user_id": users["xander"].id,
"due_date": (today - timedelta(days=1)).replace(hour=23, minute=59, second=59),
"completed_at": None
},
]
for chore_data in demo_chores:
chore = Chore(**chore_data, created_by=users["jess"].id)
db.add(chore)
status_marker = "[DONE]" if chore_data["status"] == "completed" else "[WIP]" if chore_data["status"] == "in_progress" else "[TODO]"
print(f" {status_marker} {chore_data['title']} - {chore_data['frequency']} ({chore_data['room']})")
db.commit()
print(f"\n[SUCCESS] Database initialized with {len(users_data)} users and {len(demo_chores)} demo chores!")
except Exception as e:
print(f"[ERROR] Error initializing database: {e}")
db.rollback()
raise
finally:
db.close()
if __name__ == "__main__":
init_db()

11
backend/requirements.txt Normal file
View File

@@ -0,0 +1,11 @@
fastapi==0.115.0
uvicorn[standard]==0.32.0
sqlalchemy==2.0.36
python-jose[cryptography]==3.3.0
bcrypt==4.2.0
passlib[bcrypt]==1.7.4
python-multipart==0.0.12
pydantic==2.10.3
pydantic-settings==2.6.1
python-dotenv==1.0.1
email-validator==2.2.0

View File

@@ -0,0 +1,11 @@
fastapi==0.115.0
uvicorn[standard]==0.32.0
sqlalchemy==2.0.36
python-jose[cryptography]==3.3.0
bcrypt==4.2.0
passlib[bcrypt]==1.7.4
python-multipart==0.0.12
pydantic==2.10.3
pydantic-settings==2.6.1
python-dotenv==1.0.1
email-validator==2.2.0

84
backend/reset_database.py Normal file
View File

@@ -0,0 +1,84 @@
#!/usr/bin/env python3
"""
Simple script to delete and reinitialize the database.
Run this from the backend directory.
"""
import os
import subprocess
import sys
from pathlib import Path
# Paths
BACKEND_DIR = r'D:\Hosted\familyhub\backend'
DB_PATH = os.path.join(BACKEND_DIR, 'data', 'family_hub.db')
MIGRATIONS_DIR = os.path.join(BACKEND_DIR, 'migrations')
print("=" * 60)
print("Family Hub Database Reset")
print("=" * 60)
# Delete old database
if os.path.exists(DB_PATH):
print(f"\n1. Deleting old database: {DB_PATH}")
os.remove(DB_PATH)
print(" ✓ Database deleted")
else:
print(f"\n1. No existing database found at: {DB_PATH}")
# Run init_db.py
print("\n2. Running database initialization...")
init_script = os.path.join(BACKEND_DIR, 'init_db.py')
# Run the init script
result = subprocess.run(
[sys.executable, init_script],
cwd=BACKEND_DIR,
capture_output=True,
text=True
)
print(result.stdout)
if result.stderr:
print("Errors:", result.stderr)
if result.returncode != 0:
print("\n" + "=" * 60)
print("❌ Database initialization failed!")
print("=" * 60)
sys.exit(1)
# Run migrations
print("\n3. Running database migrations...")
migration_files = sorted([
f for f in os.listdir(MIGRATIONS_DIR)
if f.endswith('.py') and f[0].isdigit()
])
for migration_file in migration_files:
migration_path = os.path.join(MIGRATIONS_DIR, migration_file)
print(f"\n Running {migration_file}...")
result = subprocess.run(
[sys.executable, migration_path],
cwd=BACKEND_DIR,
capture_output=True,
text=True
)
if result.stdout:
# Indent the output
for line in result.stdout.split('\n'):
if line:
print(f" {line}")
if result.returncode != 0:
print(f" ❌ Migration {migration_file} failed!")
if result.stderr:
print(f" Error: {result.stderr}")
else:
print(f" ✅ Migration {migration_file} completed")
print("\n" + "=" * 60)
print("✅ Database reset complete!")
print("=" * 60)
print("\nYou can now restart the backend server.")

69
backend/test_passwords.py Normal file
View File

@@ -0,0 +1,69 @@
"""Test password verification."""
import sys
from pathlib import Path
# Add parent directory to path
sys.path.insert(0, str(Path(__file__).parent))
from app.core.database import SessionLocal
from app.models.user import User
from app.core.security import verify_password
def test_login(username: str, password: str):
"""Test if login credentials work."""
db = SessionLocal()
try:
# Find user
user = db.query(User).filter(User.username == username).first()
if not user:
print(f"❌ User '{username}' not found in database")
print("\nAvailable users:")
all_users = db.query(User).all()
for u in all_users:
print(f" - {u.username} ({u.full_name})")
return False
print(f"✓ Found user: {user.username} ({user.full_name})")
print(f" Email: {user.email}")
print(f" Is Admin: {user.is_admin}")
print(f" Is Active: {user.is_active}")
print(f" Password Hash: {user.hashed_password[:60]}...")
print()
# Verify password
if verify_password(password, user.hashed_password):
print(f"✅ Password verification SUCCESSFUL for '{username}'")
return True
else:
print(f"❌ Password verification FAILED for '{username}'")
print(f" Tried password: '{password}'")
return False
except Exception as e:
print(f"❌ Error: {e}")
import traceback
traceback.print_exc()
return False
finally:
db.close()
if __name__ == "__main__":
print("="*50)
print("Password Verification Test")
print("="*50)
print()
# Test all demo users
test_users = [
("jess", "password123"),
("lou", "password123"),
("william", "password123"),
]
for username, password in test_users:
print(f"\nTesting: {username} / {password}")
print("-"*50)
test_login(username, password)
print()