from flask import Blueprint, render_template, redirect, url_for, flash, request, jsonify from flask_login import login_required, current_user from functools import wraps from app import db from app.models.user import User from app.models.set import Set from app.models.instruction import Instruction from sqlalchemy import func from datetime import datetime, timedelta admin_bp = Blueprint('admin', __name__, url_prefix='/admin') def admin_required(f): """Decorator to require admin access.""" @wraps(f) def decorated_function(*args, **kwargs): if not current_user.is_authenticated: flash('Please log in to access this page.', 'warning') return redirect(url_for('auth.login')) if not current_user.is_admin: flash('You do not have permission to access this page.', 'danger') return redirect(url_for('main.index')) return f(*args, **kwargs) return decorated_function @admin_bp.route('/') @login_required @admin_required def dashboard(): """Admin dashboard with site statistics.""" # Get statistics total_users = User.query.count() total_sets = Set.query.count() total_instructions = Instruction.query.count() total_mocs = Set.query.filter_by(is_moc=True).count() # Recent activity recent_users = User.query.order_by(User.created_at.desc()).limit(5).all() recent_sets = Set.query.order_by(Set.created_at.desc()).limit(10).all() # Storage statistics total_storage = db.session.query( func.sum(Instruction.file_size) ).scalar() or 0 total_storage_mb = round(total_storage / (1024 * 1024), 2) # Get users with most sets top_contributors = db.session.query( User, func.count(Set.id).label('set_count') ).join(Set).group_by(User.id).order_by( func.count(Set.id).desc() ).limit(5).all() # Theme statistics theme_stats = db.session.query( Set.theme, func.count(Set.id).label('count') ).group_by(Set.theme).order_by( func.count(Set.id).desc() ).limit(10).all() return render_template('admin/dashboard.html', total_users=total_users, total_sets=total_sets, total_instructions=total_instructions, total_mocs=total_mocs, total_storage_mb=total_storage_mb, recent_users=recent_users, recent_sets=recent_sets, top_contributors=top_contributors, theme_stats=theme_stats) @admin_bp.route('/users') @login_required @admin_required def users(): """User management page.""" page = request.args.get('page', 1, type=int) per_page = 20 # Search functionality search = request.args.get('search', '') query = User.query if search: query = query.filter( (User.username.ilike(f'%{search}%')) | (User.email.ilike(f'%{search}%')) ) pagination = query.order_by(User.created_at.desc()).paginate( page=page, per_page=per_page, error_out=False ) # Get set counts for each user user_stats = {} for user in pagination.items: user_stats[user.id] = { 'sets': user.sets.count(), 'instructions': user.instructions.count() } return render_template('admin/users.html', users=pagination.items, pagination=pagination, user_stats=user_stats, search=search) @admin_bp.route('/users//toggle-admin', methods=['POST']) @login_required @admin_required def toggle_admin(user_id): """Toggle admin status for a user.""" if user_id == current_user.id: return jsonify({'error': 'Cannot change your own admin status'}), 400 user = User.query.get_or_404(user_id) user.is_admin = not user.is_admin db.session.commit() status = 'granted' if user.is_admin else 'revoked' flash(f'Admin access {status} for {user.username}', 'success') return jsonify({'success': True, 'is_admin': user.is_admin}) @admin_bp.route('/users//delete', methods=['POST']) @login_required @admin_required def delete_user(user_id): """Delete a user and optionally their data.""" if user_id == current_user.id: flash('Cannot delete your own account from admin panel', 'danger') return redirect(url_for('admin.users')) user = User.query.get_or_404(user_id) username = user.username # Check if should delete user's data delete_data = request.form.get('delete_data') == 'on' if delete_data: # Delete user's sets and instructions Set.query.filter_by(user_id=user_id).delete() Instruction.query.filter_by(user_id=user_id).delete() else: # Reassign to admin (current user) Set.query.filter_by(user_id=user_id).update({'user_id': current_user.id}) Instruction.query.filter_by(user_id=user_id).update({'user_id': current_user.id}) db.session.delete(user) db.session.commit() flash(f'User {username} has been deleted', 'success') return redirect(url_for('admin.users')) @admin_bp.route('/sets') @login_required @admin_required def sets(): """View all sets in the system.""" page = request.args.get('page', 1, type=int) per_page = 20 # Filter options filter_type = request.args.get('type', 'all') search = request.args.get('search', '') query = Set.query # Apply filters if filter_type == 'mocs': query = query.filter_by(is_moc=True) elif filter_type == 'official': query = query.filter_by(is_moc=False) if search: query = query.filter( (Set.set_number.ilike(f'%{search}%')) | (Set.set_name.ilike(f'%{search}%')) | (Set.theme.ilike(f'%{search}%')) ) pagination = query.order_by(Set.created_at.desc()).paginate( page=page, per_page=per_page, error_out=False ) return render_template('admin/sets.html', sets=pagination.items, pagination=pagination, filter_type=filter_type, search=search) @admin_bp.route('/sets//delete', methods=['POST']) @login_required @admin_required def delete_set(set_id): """Delete a set and its instructions.""" lego_set = Set.query.get_or_404(set_id) set_number = lego_set.set_number # Delete instructions (cascade should handle this) db.session.delete(lego_set) db.session.commit() flash(f'Set {set_number} has been deleted', 'success') return redirect(url_for('admin.sets')) @admin_bp.route('/site-settings', methods=['GET', 'POST']) @login_required @admin_required def site_settings(): """Site-wide settings configuration.""" if request.method == 'POST': # This is a placeholder for future site settings # You could store settings in a database table or config file flash('Settings updated successfully', 'success') return redirect(url_for('admin.site_settings')) # Get current stats for display stats = { 'total_users': User.query.count(), 'total_sets': Set.query.count(), 'total_instructions': Instruction.query.count(), 'total_storage': db.session.query(func.sum(Instruction.file_size)).scalar() or 0 } return render_template('admin/settings.html', stats=stats) @admin_bp.route('/api/stats') @login_required @admin_required def api_stats(): """API endpoint for real-time statistics.""" # Get stats for the last 30 days thirty_days_ago = datetime.utcnow() - timedelta(days=30) new_users = User.query.filter(User.created_at >= thirty_days_ago).count() new_sets = Set.query.filter(Set.created_at >= thirty_days_ago).count() return jsonify({ 'total_users': User.query.count(), 'total_sets': Set.query.count(), 'total_instructions': Instruction.query.count(), 'new_users_30d': new_users, 'new_sets_30d': new_sets }) @admin_bp.route('/bulk-import', methods=['GET', 'POST']) @login_required @admin_required def bulk_import(): """Bulk import sets from Brickset.""" from app.services.brickset_api import BricksetAPI import time if request.method == 'POST': # Get form data set_numbers_text = request.form.get('set_numbers', '') user_id = request.form.get('user_id') throttle_delay = float(request.form.get('throttle_delay', '0.5')) # Default 0.5 seconds # Parse set numbers (split by newlines, commas, or spaces) import re set_numbers = re.split(r'[,\s\n]+', set_numbers_text.strip()) set_numbers = [s.strip() for s in set_numbers if s.strip()] if not set_numbers: flash('Please enter at least one set number.', 'warning') return redirect(url_for('admin.bulk_import')) if not user_id: flash('Please select a user.', 'warning') return redirect(url_for('admin.bulk_import')) # Warn if trying to import too many at once if len(set_numbers) > 50: flash('Warning: Importing more than 50 sets may take a while. Consider splitting into smaller batches.', 'warning') # Check if Brickset is configured if not BricksetAPI.is_configured(): flash('Brickset API is not configured. Please add credentials to .env file.', 'danger') return redirect(url_for('admin.bulk_import')) # Import sets with throttling api = BricksetAPI() results = { 'success': [], 'failed': [], 'already_exists': [], 'rate_limited': [] } for index, set_number in enumerate(set_numbers, 1): try: # Check if set already exists existing_set = Set.query.filter_by(set_number=set_number).first() if existing_set: results['already_exists'].append({ 'set_number': set_number, 'name': existing_set.set_name }) continue # Add delay between requests to respect rate limits if index > 1: # Don't delay on first request time.sleep(throttle_delay) # Fetch from Brickset set_data = api.get_set_by_number(set_number) if not set_data: results['failed'].append({ 'set_number': set_number, 'reason': 'Not found in Brickset' }) continue # Create set in database new_set = Set( set_number=set_number, set_name=set_data.get('name', 'Unknown'), theme=set_data.get('theme', 'Unknown'), year_released=set_data.get('year', datetime.now().year), piece_count=set_data.get('pieces', 0), image_url=set_data.get('image', {}).get('imageURL'), user_id=user_id, is_moc=False ) db.session.add(new_set) results['success'].append({ 'set_number': set_number, 'name': new_set.set_name, 'theme': new_set.theme }) current_app.logger.info(f"Imported set {index}/{len(set_numbers)}: {set_number}") except Exception as e: error_msg = str(e) if 'API limit exceeded' in error_msg or 'rate limit' in error_msg.lower(): results['rate_limited'].append({ 'set_number': set_number, 'reason': 'API rate limit exceeded' }) current_app.logger.warning(f"Rate limit hit on set {set_number}, stopping import") # Stop processing remaining sets break else: results['failed'].append({ 'set_number': set_number, 'reason': error_msg }) current_app.logger.error(f"Failed to import {set_number}: {error_msg}") # Commit all successful imports if results['success']: db.session.commit() flash(f"Successfully imported {len(results['success'])} set(s)!", 'success') if results['rate_limited']: remaining = len(set_numbers) - len(results['success']) - len(results['failed']) - len(results['already_exists']) - len(results['rate_limited']) flash(f"API rate limit reached after {len(results['success'])} imports. " f"{len(results['rate_limited'])} set(s) not processed due to rate limit. " f"Try again in a few minutes or increase the throttle delay.", 'warning') if results['failed']: flash(f"Failed to import {len(results['failed'])} set(s).", 'warning') if results['already_exists']: flash(f"{len(results['already_exists'])} set(s) already exist in database.", 'info') # Store results in session for display from flask import session session['import_results'] = results return redirect(url_for('admin.bulk_import_results')) # GET request - show form users = User.query.order_by(User.username).all() brickset_configured = BricksetAPI.is_configured() return render_template('admin/bulk_import.html', users=users, brickset_configured=brickset_configured) @admin_bp.route('/bulk-import/results') @login_required @admin_required def bulk_import_results(): """Display bulk import results.""" from flask import session results = session.pop('import_results', None) if not results: flash('No import results to display.', 'info') return redirect(url_for('admin.bulk_import')) return render_template('admin/bulk_import_results.html', results=results)