Files

274 lines
10 KiB
Python

import os
from flask import Blueprint, render_template, redirect, url_for, flash, request, send_file, current_app, jsonify
from flask_login import login_required, current_user
from werkzeug.utils import secure_filename
from app import db
from app.models.set import Set
from app.models.extra_file import ExtraFile
import uuid
# Blueprint that the upload, download, preview, delete and edit routes in this
# module register themselves on via @extra_files_bp.route(...).
extra_files_bp = Blueprint('extra_files', __name__)
# Lower-case file extensions (without the leading dot) accepted by
# allowed_file(); anything else is rejected at upload time.
# NOTE(review): 'svg', 'html' and 'js' can carry active content -- confirm
# these are never served inline to a browser from the application's origin.
ALLOWED_EXTENSIONS = {
    # Raster and vector images
    'jpg', 'jpeg', 'png', 'gif', 'webp', 'bmp', 'svg',
    # Text documents
    'pdf', 'doc', 'docx', 'txt', 'rtf', 'odt',
    # Spreadsheets and tabular data
    'xls', 'xlsx', 'csv', 'ods',
    # Structured data
    'xml', 'json', 'yaml', 'yml',
    # 3D / CAD models
    'ldr', 'mpd', 'io', 'lxf', 'lxfml', 'stl', 'obj',
    # Archives
    'zip', 'rar', '7z', 'tar', 'gz',
    # Miscellaneous text and web files
    'md', 'html', 'css', 'js',
}
# Category name -> extensions that belong to it.
# get_file_category() walks this mapping in insertion order and returns the
# first category that lists the extension, so the order below matters: the
# image extensions resolve to 'box_art' (only 'bmp' reaches 'photo'), and
# 'xml' resolves to 'bricklink' rather than 'data'.
FILE_CATEGORIES = {
    'bricklink': ['xml'],
    'studio': ['io'],
    'ldraw': ['ldr', 'mpd'],
    'ldd': ['lxf', 'lxfml'],
    'box_art': ['jpg', 'jpeg', 'png', 'gif', 'webp'],
    'document': ['pdf', 'doc', 'docx', 'txt', 'rtf'],
    'photo': ['jpg', 'jpeg', 'png', 'gif', 'webp', 'bmp'],
    'data': ['xml', 'json', 'csv', 'xlsx', 'xls'],
    'archive': ['zip', 'rar', '7z', 'tar', 'gz'],
    # Fallback bucket; intentionally lists no extensions.
    'other': [],
}
def allowed_file(filename):
    """Return True when *filename* ends in an extension from ALLOWED_EXTENSIONS.

    The extension is the text after the last dot, compared case-insensitively.
    A name without any dot is never allowed.
    """
    _stem, dot, extension = filename.rpartition('.')
    if not dot:
        # No dot at all, so there is no extension to check.
        return False
    return extension.lower() in ALLOWED_EXTENSIONS
def get_file_category(file_extension):
    """Map a file extension to the first matching FILE_CATEGORIES key.

    The extension is lower-cased before lookup. Categories are tried in the
    mapping's own order; 'other' is returned when none lists the extension.
    """
    wanted = file_extension.lower()
    matching = (
        name
        for name, known_extensions in FILE_CATEGORIES.items()
        if wanted in known_extensions
    )
    return next(matching, 'other')
def _discard_saved_file(path):
    """Best-effort removal of an uploaded file whose database record was not kept."""
    try:
        os.remove(path)
    except OSError as e:
        current_app.logger.warning(f"Could not remove orphaned upload {path}: {str(e)}")


@extra_files_bp.route('/upload/<int:set_id>', methods=['GET', 'POST'])
@login_required
def upload(set_id):
    """Upload one or more extra files for a set.

    GET renders the upload form. POST stores every submitted file whose
    extension passes allowed_file() under
    ``UPLOAD_FOLDER/extra_files/<set_number>/`` using a random UUID-based
    name, and adds one ExtraFile record per stored file. All records are
    committed together after the loop.

    Only the set's owner or an admin may upload; anyone else is redirected
    back to the set page. Files that cannot be stored are reported in a
    single warning flash and do not stop the remaining files.

    Args:
        set_id: Primary key of the Set receiving the files (404 if unknown).

    Returns:
        The rendered form for GET, otherwise a redirect response.
    """
    lego_set = Set.query.get_or_404(set_id)

    # Check permission
    if lego_set.user_id != current_user.id and not current_user.is_admin:
        flash('You do not have permission to upload files to this set.', 'danger')
        return redirect(url_for('sets.view_set', set_id=set_id))

    if request.method != 'POST':
        # GET request - show upload form
        return render_template('extra_files/upload.html',
                               lego_set=lego_set,
                               file_categories=FILE_CATEGORIES)

    # Check if files were uploaded
    if 'files' not in request.files:
        flash('No files selected.', 'warning')
        return redirect(request.url)

    files = request.files.getlist('files')
    description = request.form.get('description', '').strip()
    category = request.form.get('category', 'other')

    if not files or all(file.filename == '' for file in files):
        flash('No files selected.', 'warning')
        return redirect(request.url)

    uploaded_count = 0
    failed_files = []
    saved_paths = []  # files on disk whose records are waiting for the commit

    for file in files:
        if not file or not file.filename:
            continue
        if not allowed_file(file.filename):
            failed_files.append(f"{file.filename} (unsupported file type)")
            continue

        file_path = None
        try:
            # Take the extension from the name allowed_file() just validated.
            # secure_filename() can drop the dot entirely (for example when
            # the stem has no ASCII characters), so its result is not a safe
            # source for the extension.
            file_extension = file.filename.rsplit('.', 1)[1].lower()

            # Generate unique filename
            unique_filename = f"{uuid.uuid4().hex}.{file_extension}"

            # Sanitised display name; fall back to the stored name when
            # sanitising left nothing usable.
            original_filename = secure_filename(file.filename)
            if '.' not in original_filename:
                original_filename = unique_filename

            # Create directory for extra files
            extra_files_dir = os.path.join(current_app.config['UPLOAD_FOLDER'],
                                           'extra_files',
                                           str(lego_set.set_number))
            os.makedirs(extra_files_dir, exist_ok=True)

            # Save file
            file_path = os.path.join(extra_files_dir, unique_filename)
            file.save(file_path)
            file_size = os.path.getsize(file_path)

            # Resolve 'auto' per file. The requested category itself must
            # stay untouched so that later files are detected independently.
            if category == 'auto':
                file_category = get_file_category(file_extension)
            else:
                file_category = category

            # Create database record (path stored relative to UPLOAD_FOLDER)
            relative_path = os.path.join('extra_files',
                                         str(lego_set.set_number),
                                         unique_filename)
            extra_file = ExtraFile(
                set_id=lego_set.id,
                file_name=unique_filename,
                original_filename=original_filename,
                file_path=relative_path,
                file_type=file_extension,
                file_size=file_size,
                description=description if description else None,
                category=file_category,
                uploaded_by=current_user.id
            )
            db.session.add(extra_file)
            saved_paths.append(file_path)
            uploaded_count += 1
        except Exception as e:
            # Do not leave a file on disk that will never get a record.
            if file_path and os.path.exists(file_path):
                _discard_saved_file(file_path)
            failed_files.append(f"{file.filename} ({str(e)})")
            current_app.logger.error(f"Error uploading file {file.filename}: {str(e)}")

    # Commit all successful uploads
    if uploaded_count > 0:
        try:
            db.session.commit()
        except Exception as e:
            # Mirror delete()/edit(): roll back, report, log. The files just
            # written have no records now, so remove them again.
            db.session.rollback()
            for path in saved_paths:
                _discard_saved_file(path)
            flash(f'Error saving uploaded files: {str(e)}', 'danger')
            current_app.logger.error(f"Error saving uploaded files for set {set_id}: {str(e)}")
        else:
            flash(f'Successfully uploaded {uploaded_count} file(s)!', 'success')

    if failed_files:
        flash(f"Failed to upload {len(failed_files)} file(s): {', '.join(failed_files)}", 'warning')

    return redirect(url_for('sets.view_set', set_id=set_id))
@extra_files_bp.route('/download/<int:file_id>')
@login_required
def download(file_id):
    """Send an extra file to the client as an attachment.

    The file is served under its stored original filename. Users who neither
    own the file's set nor are admins are redirected to the main index; a
    record whose file is missing on disk redirects back to the set page.
    """
    extra_file = ExtraFile.query.get_or_404(file_id)

    # Owner or admin only
    is_owner = extra_file.lego_set.user_id == current_user.id
    if not (is_owner or current_user.is_admin):
        flash('You do not have permission to download this file.', 'danger')
        return redirect(url_for('main.index'))

    # Stored paths are relative to the configured upload folder
    upload_root = current_app.config['UPLOAD_FOLDER']
    absolute_path = os.path.join(upload_root, extra_file.file_path)

    if os.path.exists(absolute_path):
        return send_file(
            absolute_path,
            as_attachment=True,
            download_name=extra_file.original_filename
        )

    flash('File not found.', 'danger')
    return redirect(url_for('sets.view_set', set_id=extra_file.set_id))
@extra_files_bp.route('/preview/<int:file_id>')
@login_required
def preview(file_id):
    """Serve an extra file inline so the browser can display it.

    Checks run in this order: owner-or-admin permission (else redirect to the
    main index), the record's ``can_preview`` flag (else redirect to the
    download route), and the file's presence on disk (else redirect to the
    set page).
    """
    extra_file = ExtraFile.query.get_or_404(file_id)

    # Owner or admin only
    is_owner = extra_file.lego_set.user_id == current_user.id
    if not (is_owner or current_user.is_admin):
        flash('You do not have permission to view this file.', 'danger')
        return redirect(url_for('main.index'))

    # Non-previewable types are handed over to the download route
    if not extra_file.can_preview:
        flash('This file type cannot be previewed.', 'info')
        return redirect(url_for('extra_files.download', file_id=file_id))

    # Stored paths are relative to the configured upload folder
    upload_root = current_app.config['UPLOAD_FOLDER']
    absolute_path = os.path.join(upload_root, extra_file.file_path)

    if os.path.exists(absolute_path):
        return send_file(absolute_path)

    flash('File not found.', 'danger')
    return redirect(url_for('sets.view_set', set_id=extra_file.set_id))
@extra_files_bp.route('/delete/<int:file_id>', methods=['POST'])
@login_required
def delete(file_id):
    """Delete an extra file's database record and its file on disk.

    Only the set's owner or an admin may delete. The record is removed and
    committed first; the physical file is removed only after that commit
    succeeds, so a failed commit never leaves a record pointing at a file
    that no longer exists. If the file cannot be removed afterwards, the
    problem is logged and the request still reports success, because the
    record is already gone.

    Args:
        file_id: Primary key of the ExtraFile to delete (404 if unknown).

    Returns:
        A redirect to the owning set's page in every case.
    """
    extra_file = ExtraFile.query.get_or_404(file_id)
    set_id = extra_file.set_id

    # Check permission
    if extra_file.lego_set.user_id != current_user.id and not current_user.is_admin:
        flash('You do not have permission to delete this file.', 'danger')
        return redirect(url_for('sets.view_set', set_id=set_id))

    try:
        # Resolve the path before deleting: the instance's attributes are no
        # longer reliably readable once the delete has been committed.
        file_path = os.path.join(current_app.config['UPLOAD_FOLDER'], extra_file.file_path)

        # Delete database record
        db.session.delete(extra_file)
        db.session.commit()
    except Exception as e:
        db.session.rollback()
        flash(f'Error deleting file: {str(e)}', 'danger')
        current_app.logger.error(f"Error deleting extra file {file_id}: {str(e)}")
        return redirect(url_for('sets.view_set', set_id=set_id))

    # Delete physical file. Attempt the removal directly instead of testing
    # for existence first, which would race with concurrent deletions.
    try:
        os.remove(file_path)
    except FileNotFoundError:
        # Already gone from disk; nothing left to clean up.
        pass
    except OSError as e:
        current_app.logger.error(
            f"Extra file {file_id} record deleted but {file_path} could not be removed: {str(e)}"
        )

    flash('File deleted successfully!', 'success')
    return redirect(url_for('sets.view_set', set_id=set_id))
@extra_files_bp.route('/edit/<int:file_id>', methods=['POST'])
@login_required
def edit(file_id):
    """Update an extra file's description and category from form data.

    A blank description is stored as None; a missing category becomes
    'other'. Requests that are JSON or carry
    ``X-Requested-With: XMLHttpRequest`` get a JSON reply, all others get a
    flash message and a redirect to the set page. A caller who is neither
    the set's owner nor an admin always receives a JSON 403.
    """
    extra_file = ExtraFile.query.get_or_404(file_id)

    # Owner or admin only
    is_owner = extra_file.lego_set.user_id == current_user.id
    if not (is_owner or current_user.is_admin):
        return jsonify({'success': False, 'error': 'Permission denied'}), 403

    def wants_json():
        # Evaluated at the point of use, in both the success and error paths.
        return request.is_json or request.headers.get('X-Requested-With') == 'XMLHttpRequest'

    try:
        submitted_description = request.form.get('description', '').strip()
        extra_file.description = submitted_description or None
        extra_file.category = request.form.get('category', 'other')
        db.session.commit()

        if wants_json():
            return jsonify({'success': True, 'message': 'File updated successfully'})
        flash('File updated successfully!', 'success')
        return redirect(url_for('sets.view_set', set_id=extra_file.set_id))
    except Exception as e:
        db.session.rollback()
        current_app.logger.error(f"Error updating extra file {file_id}: {str(e)}")

        if wants_json():
            return jsonify({'success': False, 'error': str(e)}), 500
        flash(f'Error updating file: {str(e)}', 'danger')
        return redirect(url_for('sets.view_set', set_id=extra_file.set_id))