410 lines
14 KiB
Python
410 lines
14 KiB
Python
from flask import Blueprint, render_template, redirect, url_for, flash, request, jsonify
|
|
from flask_login import login_required, current_user
|
|
from functools import wraps
|
|
from app import db
|
|
from app.models.user import User
|
|
from app.models.set import Set
|
|
from app.models.instruction import Instruction
|
|
from sqlalchemy import func
|
|
from datetime import datetime, timedelta
|
|
|
|
admin_bp = Blueprint('admin', __name__, url_prefix='/admin')
|
|
|
|
|
|
def admin_required(f):
|
|
"""Decorator to require admin access."""
|
|
@wraps(f)
|
|
def decorated_function(*args, **kwargs):
|
|
if not current_user.is_authenticated:
|
|
flash('Please log in to access this page.', 'warning')
|
|
return redirect(url_for('auth.login'))
|
|
if not current_user.is_admin:
|
|
flash('You do not have permission to access this page.', 'danger')
|
|
return redirect(url_for('main.index'))
|
|
return f(*args, **kwargs)
|
|
return decorated_function
|
|
|
|
|
|
@admin_bp.route('/')
|
|
@login_required
|
|
@admin_required
|
|
def dashboard():
|
|
"""Admin dashboard with site statistics."""
|
|
# Get statistics
|
|
total_users = User.query.count()
|
|
total_sets = Set.query.count()
|
|
total_instructions = Instruction.query.count()
|
|
total_mocs = Set.query.filter_by(is_moc=True).count()
|
|
|
|
# Recent activity
|
|
recent_users = User.query.order_by(User.created_at.desc()).limit(5).all()
|
|
recent_sets = Set.query.order_by(Set.created_at.desc()).limit(10).all()
|
|
|
|
# Storage statistics
|
|
total_storage = db.session.query(
|
|
func.sum(Instruction.file_size)
|
|
).scalar() or 0
|
|
total_storage_mb = round(total_storage / (1024 * 1024), 2)
|
|
|
|
# Get users with most sets
|
|
top_contributors = db.session.query(
|
|
User, func.count(Set.id).label('set_count')
|
|
).join(Set).group_by(User.id).order_by(
|
|
func.count(Set.id).desc()
|
|
).limit(5).all()
|
|
|
|
# Theme statistics
|
|
theme_stats = db.session.query(
|
|
Set.theme, func.count(Set.id).label('count')
|
|
).group_by(Set.theme).order_by(
|
|
func.count(Set.id).desc()
|
|
).limit(10).all()
|
|
|
|
return render_template('admin/dashboard.html',
|
|
total_users=total_users,
|
|
total_sets=total_sets,
|
|
total_instructions=total_instructions,
|
|
total_mocs=total_mocs,
|
|
total_storage_mb=total_storage_mb,
|
|
recent_users=recent_users,
|
|
recent_sets=recent_sets,
|
|
top_contributors=top_contributors,
|
|
theme_stats=theme_stats)
|
|
|
|
|
|
@admin_bp.route('/users')
|
|
@login_required
|
|
@admin_required
|
|
def users():
|
|
"""User management page."""
|
|
page = request.args.get('page', 1, type=int)
|
|
per_page = 20
|
|
|
|
# Search functionality
|
|
search = request.args.get('search', '')
|
|
query = User.query
|
|
|
|
if search:
|
|
query = query.filter(
|
|
(User.username.ilike(f'%{search}%')) |
|
|
(User.email.ilike(f'%{search}%'))
|
|
)
|
|
|
|
pagination = query.order_by(User.created_at.desc()).paginate(
|
|
page=page, per_page=per_page, error_out=False
|
|
)
|
|
|
|
# Get set counts for each user
|
|
user_stats = {}
|
|
for user in pagination.items:
|
|
user_stats[user.id] = {
|
|
'sets': user.sets.count(),
|
|
'instructions': user.instructions.count()
|
|
}
|
|
|
|
return render_template('admin/users.html',
|
|
users=pagination.items,
|
|
pagination=pagination,
|
|
user_stats=user_stats,
|
|
search=search)
|
|
|
|
|
|
@admin_bp.route('/users/<int:user_id>/toggle-admin', methods=['POST'])
|
|
@login_required
|
|
@admin_required
|
|
def toggle_admin(user_id):
|
|
"""Toggle admin status for a user."""
|
|
if user_id == current_user.id:
|
|
return jsonify({'error': 'Cannot change your own admin status'}), 400
|
|
|
|
user = User.query.get_or_404(user_id)
|
|
user.is_admin = not user.is_admin
|
|
db.session.commit()
|
|
|
|
status = 'granted' if user.is_admin else 'revoked'
|
|
flash(f'Admin access {status} for {user.username}', 'success')
|
|
|
|
return jsonify({'success': True, 'is_admin': user.is_admin})
|
|
|
|
|
|
@admin_bp.route('/users/<int:user_id>/delete', methods=['POST'])
|
|
@login_required
|
|
@admin_required
|
|
def delete_user(user_id):
|
|
"""Delete a user and optionally their data."""
|
|
if user_id == current_user.id:
|
|
flash('Cannot delete your own account from admin panel', 'danger')
|
|
return redirect(url_for('admin.users'))
|
|
|
|
user = User.query.get_or_404(user_id)
|
|
username = user.username
|
|
|
|
# Check if should delete user's data
|
|
delete_data = request.form.get('delete_data') == 'on'
|
|
|
|
if delete_data:
|
|
# Delete user's sets and instructions
|
|
Set.query.filter_by(user_id=user_id).delete()
|
|
Instruction.query.filter_by(user_id=user_id).delete()
|
|
else:
|
|
# Reassign to admin (current user)
|
|
Set.query.filter_by(user_id=user_id).update({'user_id': current_user.id})
|
|
Instruction.query.filter_by(user_id=user_id).update({'user_id': current_user.id})
|
|
|
|
db.session.delete(user)
|
|
db.session.commit()
|
|
|
|
flash(f'User {username} has been deleted', 'success')
|
|
return redirect(url_for('admin.users'))
|
|
|
|
|
|
@admin_bp.route('/sets')
|
|
@login_required
|
|
@admin_required
|
|
def sets():
|
|
"""View all sets in the system."""
|
|
page = request.args.get('page', 1, type=int)
|
|
per_page = 20
|
|
|
|
# Filter options
|
|
filter_type = request.args.get('type', 'all')
|
|
search = request.args.get('search', '')
|
|
|
|
query = Set.query
|
|
|
|
# Apply filters
|
|
if filter_type == 'mocs':
|
|
query = query.filter_by(is_moc=True)
|
|
elif filter_type == 'official':
|
|
query = query.filter_by(is_moc=False)
|
|
|
|
if search:
|
|
query = query.filter(
|
|
(Set.set_number.ilike(f'%{search}%')) |
|
|
(Set.set_name.ilike(f'%{search}%')) |
|
|
(Set.theme.ilike(f'%{search}%'))
|
|
)
|
|
|
|
pagination = query.order_by(Set.created_at.desc()).paginate(
|
|
page=page, per_page=per_page, error_out=False
|
|
)
|
|
|
|
return render_template('admin/sets.html',
|
|
sets=pagination.items,
|
|
pagination=pagination,
|
|
filter_type=filter_type,
|
|
search=search)
|
|
|
|
|
|
@admin_bp.route('/sets/<int:set_id>/delete', methods=['POST'])
|
|
@login_required
|
|
@admin_required
|
|
def delete_set(set_id):
|
|
"""Delete a set and its instructions."""
|
|
lego_set = Set.query.get_or_404(set_id)
|
|
set_number = lego_set.set_number
|
|
|
|
# Delete instructions (cascade should handle this)
|
|
db.session.delete(lego_set)
|
|
db.session.commit()
|
|
|
|
flash(f'Set {set_number} has been deleted', 'success')
|
|
return redirect(url_for('admin.sets'))
|
|
|
|
|
|
@admin_bp.route('/site-settings', methods=['GET', 'POST'])
|
|
@login_required
|
|
@admin_required
|
|
def site_settings():
|
|
"""Site-wide settings configuration."""
|
|
if request.method == 'POST':
|
|
# This is a placeholder for future site settings
|
|
# You could store settings in a database table or config file
|
|
flash('Settings updated successfully', 'success')
|
|
return redirect(url_for('admin.site_settings'))
|
|
|
|
# Get current stats for display
|
|
stats = {
|
|
'total_users': User.query.count(),
|
|
'total_sets': Set.query.count(),
|
|
'total_instructions': Instruction.query.count(),
|
|
'total_storage': db.session.query(func.sum(Instruction.file_size)).scalar() or 0
|
|
}
|
|
|
|
return render_template('admin/settings.html', stats=stats)
|
|
|
|
|
|
@admin_bp.route('/api/stats')
|
|
@login_required
|
|
@admin_required
|
|
def api_stats():
|
|
"""API endpoint for real-time statistics."""
|
|
# Get stats for the last 30 days
|
|
thirty_days_ago = datetime.utcnow() - timedelta(days=30)
|
|
|
|
new_users = User.query.filter(User.created_at >= thirty_days_ago).count()
|
|
new_sets = Set.query.filter(Set.created_at >= thirty_days_ago).count()
|
|
|
|
return jsonify({
|
|
'total_users': User.query.count(),
|
|
'total_sets': Set.query.count(),
|
|
'total_instructions': Instruction.query.count(),
|
|
'new_users_30d': new_users,
|
|
'new_sets_30d': new_sets
|
|
})
|
|
|
|
|
|
@admin_bp.route('/bulk-import', methods=['GET', 'POST'])
|
|
@login_required
|
|
@admin_required
|
|
def bulk_import():
|
|
"""Bulk import sets from Brickset."""
|
|
from app.services.brickset_api import BricksetAPI
|
|
import time
|
|
|
|
if request.method == 'POST':
|
|
# Get form data
|
|
set_numbers_text = request.form.get('set_numbers', '')
|
|
user_id = request.form.get('user_id')
|
|
throttle_delay = float(request.form.get('throttle_delay', '0.5')) # Default 0.5 seconds
|
|
|
|
# Parse set numbers (split by newlines, commas, or spaces)
|
|
import re
|
|
set_numbers = re.split(r'[,\s\n]+', set_numbers_text.strip())
|
|
set_numbers = [s.strip() for s in set_numbers if s.strip()]
|
|
|
|
if not set_numbers:
|
|
flash('Please enter at least one set number.', 'warning')
|
|
return redirect(url_for('admin.bulk_import'))
|
|
|
|
if not user_id:
|
|
flash('Please select a user.', 'warning')
|
|
return redirect(url_for('admin.bulk_import'))
|
|
|
|
# Warn if trying to import too many at once
|
|
if len(set_numbers) > 50:
|
|
flash('Warning: Importing more than 50 sets may take a while. Consider splitting into smaller batches.', 'warning')
|
|
|
|
# Check if Brickset is configured
|
|
if not BricksetAPI.is_configured():
|
|
flash('Brickset API is not configured. Please add credentials to .env file.', 'danger')
|
|
return redirect(url_for('admin.bulk_import'))
|
|
|
|
# Import sets with throttling
|
|
api = BricksetAPI()
|
|
results = {
|
|
'success': [],
|
|
'failed': [],
|
|
'already_exists': [],
|
|
'rate_limited': []
|
|
}
|
|
|
|
for index, set_number in enumerate(set_numbers, 1):
|
|
try:
|
|
# Check if set already exists
|
|
existing_set = Set.query.filter_by(set_number=set_number).first()
|
|
if existing_set:
|
|
results['already_exists'].append({
|
|
'set_number': set_number,
|
|
'name': existing_set.set_name
|
|
})
|
|
continue
|
|
|
|
# Add delay between requests to respect rate limits
|
|
if index > 1: # Don't delay on first request
|
|
time.sleep(throttle_delay)
|
|
|
|
# Fetch from Brickset
|
|
set_data = api.get_set_by_number(set_number)
|
|
|
|
if not set_data:
|
|
results['failed'].append({
|
|
'set_number': set_number,
|
|
'reason': 'Not found in Brickset'
|
|
})
|
|
continue
|
|
|
|
# Create set in database
|
|
new_set = Set(
|
|
set_number=set_number,
|
|
set_name=set_data.get('name', 'Unknown'),
|
|
theme=set_data.get('theme', 'Unknown'),
|
|
year_released=set_data.get('year', datetime.now().year),
|
|
piece_count=set_data.get('pieces', 0),
|
|
image_url=set_data.get('image', {}).get('imageURL'),
|
|
user_id=user_id,
|
|
is_moc=False
|
|
)
|
|
|
|
db.session.add(new_set)
|
|
results['success'].append({
|
|
'set_number': set_number,
|
|
'name': new_set.set_name,
|
|
'theme': new_set.theme
|
|
})
|
|
|
|
current_app.logger.info(f"Imported set {index}/{len(set_numbers)}: {set_number}")
|
|
|
|
except Exception as e:
|
|
error_msg = str(e)
|
|
if 'API limit exceeded' in error_msg or 'rate limit' in error_msg.lower():
|
|
results['rate_limited'].append({
|
|
'set_number': set_number,
|
|
'reason': 'API rate limit exceeded'
|
|
})
|
|
current_app.logger.warning(f"Rate limit hit on set {set_number}, stopping import")
|
|
# Stop processing remaining sets
|
|
break
|
|
else:
|
|
results['failed'].append({
|
|
'set_number': set_number,
|
|
'reason': error_msg
|
|
})
|
|
current_app.logger.error(f"Failed to import {set_number}: {error_msg}")
|
|
|
|
# Commit all successful imports
|
|
if results['success']:
|
|
db.session.commit()
|
|
flash(f"Successfully imported {len(results['success'])} set(s)!", 'success')
|
|
|
|
if results['rate_limited']:
|
|
remaining = len(set_numbers) - len(results['success']) - len(results['failed']) - len(results['already_exists']) - len(results['rate_limited'])
|
|
flash(f"API rate limit reached after {len(results['success'])} imports. "
|
|
f"{len(results['rate_limited'])} set(s) not processed due to rate limit. "
|
|
f"Try again in a few minutes or increase the throttle delay.", 'warning')
|
|
|
|
if results['failed']:
|
|
flash(f"Failed to import {len(results['failed'])} set(s).", 'warning')
|
|
|
|
if results['already_exists']:
|
|
flash(f"{len(results['already_exists'])} set(s) already exist in database.", 'info')
|
|
|
|
# Store results in session for display
|
|
from flask import session
|
|
session['import_results'] = results
|
|
|
|
return redirect(url_for('admin.bulk_import_results'))
|
|
|
|
# GET request - show form
|
|
users = User.query.order_by(User.username).all()
|
|
brickset_configured = BricksetAPI.is_configured()
|
|
|
|
return render_template('admin/bulk_import.html',
|
|
users=users,
|
|
brickset_configured=brickset_configured)
|
|
|
|
|
|
@admin_bp.route('/bulk-import/results')
|
|
@login_required
|
|
@admin_required
|
|
def bulk_import_results():
|
|
"""Display bulk import results."""
|
|
from flask import session
|
|
results = session.pop('import_results', None)
|
|
|
|
if not results:
|
|
flash('No import results to display.', 'info')
|
|
return redirect(url_for('admin.bulk_import'))
|
|
|
|
return render_template('admin/bulk_import_results.html', results=results)
|