Initial commit - LEGO Instructions Manager v1.5.0
This commit is contained in:
4
app/services/__init__.py
Normal file
4
app/services/__init__.py
Normal file
@@ -0,0 +1,4 @@
|
||||
from app.services.brickset_api import BricksetAPI
|
||||
from app.services.file_handler import FileHandler
|
||||
|
||||
__all__ = ['BricksetAPI', 'FileHandler']
|
||||
198
app/services/brickset_api.py
Normal file
198
app/services/brickset_api.py
Normal file
@@ -0,0 +1,198 @@
|
||||
import requests
|
||||
from flask import current_app
|
||||
from typing import Optional, List, Dict, Any
|
||||
|
||||
|
||||
class BricksetAPI:
|
||||
"""Service for interacting with the Brickset API v3."""
|
||||
|
||||
BASE_URL = 'https://brickset.com/api/v3.asmx'
|
||||
|
||||
def __init__(self):
|
||||
self.api_key = current_app.config.get('BRICKSET_API_KEY')
|
||||
self.username = current_app.config.get('BRICKSET_USERNAME')
|
||||
self.password = current_app.config.get('BRICKSET_PASSWORD')
|
||||
self._user_hash = None
|
||||
|
||||
@staticmethod
|
||||
def is_configured() -> bool:
|
||||
"""Check if Brickset API is properly configured."""
|
||||
return bool(
|
||||
current_app.config.get('BRICKSET_API_KEY') and
|
||||
current_app.config.get('BRICKSET_USERNAME') and
|
||||
current_app.config.get('BRICKSET_PASSWORD')
|
||||
)
|
||||
|
||||
def _get_user_hash(self) -> Optional[str]:
|
||||
"""Authenticate and get user hash token."""
|
||||
if self._user_hash:
|
||||
return self._user_hash
|
||||
|
||||
if not all([self.api_key, self.username, self.password]):
|
||||
current_app.logger.warning('Brickset API credentials not configured')
|
||||
return None
|
||||
|
||||
try:
|
||||
response = requests.get(
|
||||
f'{self.BASE_URL}/login',
|
||||
params={
|
||||
'apiKey': self.api_key,
|
||||
'username': self.username,
|
||||
'password': self.password
|
||||
},
|
||||
timeout=10
|
||||
)
|
||||
response.raise_for_status()
|
||||
data = response.json()
|
||||
|
||||
if data.get('status') == 'success':
|
||||
self._user_hash = data.get('hash')
|
||||
return self._user_hash
|
||||
else:
|
||||
current_app.logger.error(f"Brickset login failed: {data.get('message')}")
|
||||
return None
|
||||
|
||||
except Exception as e:
|
||||
current_app.logger.error(f'Brickset API authentication error: {str(e)}')
|
||||
return None
|
||||
|
||||
def search_sets(self,
|
||||
set_number: Optional[str] = None,
|
||||
theme: Optional[str] = None,
|
||||
year: Optional[int] = None,
|
||||
query: Optional[str] = None) -> List[Dict[str, Any]]:
|
||||
"""
|
||||
Search for LEGO sets using various parameters.
|
||||
|
||||
Args:
|
||||
set_number: Specific set number to search for
|
||||
theme: Theme name to filter by
|
||||
year: Year released
|
||||
query: General search query
|
||||
|
||||
Returns:
|
||||
List of set dictionaries
|
||||
"""
|
||||
user_hash = self._get_user_hash()
|
||||
if not user_hash:
|
||||
return []
|
||||
|
||||
params = {
|
||||
'apiKey': self.api_key,
|
||||
'userHash': user_hash,
|
||||
'params': '{}' # JSON params object
|
||||
}
|
||||
|
||||
# Build search parameters
|
||||
search_params = {}
|
||||
if set_number:
|
||||
search_params['setNumber'] = set_number
|
||||
if theme:
|
||||
search_params['theme'] = theme
|
||||
if year:
|
||||
search_params['year'] = year
|
||||
if query:
|
||||
search_params['query'] = query
|
||||
|
||||
params['params'] = str(search_params)
|
||||
|
||||
try:
|
||||
response = requests.get(
|
||||
f'{self.BASE_URL}/getSets',
|
||||
params=params,
|
||||
timeout=15
|
||||
)
|
||||
response.raise_for_status()
|
||||
data = response.json()
|
||||
|
||||
if data.get('status') == 'success':
|
||||
return data.get('sets', [])
|
||||
else:
|
||||
current_app.logger.error(f"Brickset search failed: {data.get('message')}")
|
||||
return []
|
||||
|
||||
except Exception as e:
|
||||
current_app.logger.error(f'Brickset API search error: {str(e)}')
|
||||
return []
|
||||
|
||||
def get_set_by_number(self, set_number: str) -> Optional[Dict[str, Any]]:
|
||||
"""
|
||||
Get detailed information for a specific set by its number.
|
||||
|
||||
Args:
|
||||
set_number: The LEGO set number (e.g., "10497")
|
||||
|
||||
Returns:
|
||||
Dictionary with set information or None
|
||||
"""
|
||||
results = self.search_sets(set_number=set_number)
|
||||
return results[0] if results else None
|
||||
|
||||
def get_themes(self) -> List[str]:
|
||||
"""
|
||||
Get list of all available LEGO themes.
|
||||
|
||||
Returns:
|
||||
List of theme names
|
||||
"""
|
||||
user_hash = self._get_user_hash()
|
||||
if not user_hash:
|
||||
return []
|
||||
|
||||
try:
|
||||
response = requests.get(
|
||||
f'{self.BASE_URL}/getThemes',
|
||||
params={
|
||||
'apiKey': self.api_key,
|
||||
'userHash': user_hash
|
||||
},
|
||||
timeout=10
|
||||
)
|
||||
response.raise_for_status()
|
||||
data = response.json()
|
||||
|
||||
if data.get('status') == 'success':
|
||||
themes = data.get('themes', [])
|
||||
return [theme.get('theme') for theme in themes if theme.get('theme')]
|
||||
else:
|
||||
return []
|
||||
|
||||
except Exception as e:
|
||||
current_app.logger.error(f'Brickset API themes error: {str(e)}')
|
||||
return []
|
||||
|
||||
def get_instructions(self, set_number: str) -> List[Dict[str, Any]]:
|
||||
"""
|
||||
Get instruction information for a specific set.
|
||||
|
||||
Args:
|
||||
set_number: The LEGO set number
|
||||
|
||||
Returns:
|
||||
List of instruction dictionaries
|
||||
"""
|
||||
user_hash = self._get_user_hash()
|
||||
if not user_hash:
|
||||
return []
|
||||
|
||||
try:
|
||||
response = requests.get(
|
||||
f'{self.BASE_URL}/getInstructions',
|
||||
params={
|
||||
'apiKey': self.api_key,
|
||||
'userHash': user_hash,
|
||||
'setNumber': set_number
|
||||
},
|
||||
timeout=10
|
||||
)
|
||||
response.raise_for_status()
|
||||
data = response.json()
|
||||
|
||||
if data.get('status') == 'success':
|
||||
return data.get('instructions', [])
|
||||
else:
|
||||
return []
|
||||
|
||||
except Exception as e:
|
||||
current_app.logger.error(f'Brickset API instructions error: {str(e)}')
|
||||
return []
|
||||
317
app/services/file_handler.py
Normal file
317
app/services/file_handler.py
Normal file
@@ -0,0 +1,317 @@
|
||||
import os
|
||||
import uuid
|
||||
from werkzeug.utils import secure_filename
|
||||
from flask import current_app
|
||||
from PIL import Image
|
||||
from typing import Tuple, Optional
|
||||
|
||||
|
||||
class FileHandler:
|
||||
"""Service for handling file uploads and storage."""
|
||||
|
||||
@staticmethod
|
||||
def allowed_file(filename: str) -> bool:
|
||||
"""Check if file extension is allowed."""
|
||||
return '.' in filename and \
|
||||
filename.rsplit('.', 1)[1].lower() in current_app.config['ALLOWED_EXTENSIONS']
|
||||
|
||||
@staticmethod
|
||||
def get_file_type(filename: str) -> str:
|
||||
"""Determine file type based on extension."""
|
||||
ext = filename.rsplit('.', 1)[1].lower() if '.' in filename else ''
|
||||
return 'PDF' if ext == 'pdf' else 'IMAGE'
|
||||
|
||||
@staticmethod
|
||||
def generate_unique_filename(original_filename: str) -> str:
|
||||
"""Generate a unique filename while preserving the extension."""
|
||||
ext = secure_filename(original_filename).rsplit('.', 1)[1].lower()
|
||||
unique_name = f"{uuid.uuid4().hex}.{ext}"
|
||||
return unique_name
|
||||
|
||||
@staticmethod
|
||||
def save_cover_image(file, set_number: str) -> Tuple[str, int]:
|
||||
"""
|
||||
Save cover image for a set.
|
||||
|
||||
Args:
|
||||
file: FileStorage object from request
|
||||
set_number: Set number for organizing files
|
||||
|
||||
Returns:
|
||||
Tuple of (relative_path, file_size_bytes)
|
||||
"""
|
||||
# Create covers directory if it doesn't exist
|
||||
covers_dir = os.path.join(current_app.config['UPLOAD_FOLDER'], 'covers')
|
||||
os.makedirs(covers_dir, exist_ok=True)
|
||||
|
||||
# Generate unique filename
|
||||
filename = FileHandler.generate_unique_filename(file.filename)
|
||||
|
||||
# Create set-specific subdirectory
|
||||
set_dir = os.path.join(covers_dir, secure_filename(set_number))
|
||||
os.makedirs(set_dir, exist_ok=True)
|
||||
|
||||
# Save file
|
||||
file_path = os.path.join(set_dir, filename)
|
||||
file.save(file_path)
|
||||
|
||||
# Get file size
|
||||
file_size = os.path.getsize(file_path)
|
||||
|
||||
# Create thumbnail and optimize
|
||||
try:
|
||||
img = Image.open(file_path)
|
||||
|
||||
# Convert RGBA to RGB if necessary
|
||||
if img.mode == 'RGBA':
|
||||
background = Image.new('RGB', img.size, (255, 255, 255))
|
||||
background.paste(img, mask=img.split()[3])
|
||||
img = background
|
||||
|
||||
# Resize if too large (max 800px on longest side)
|
||||
max_size = 800
|
||||
if max(img.size) > max_size:
|
||||
ratio = max_size / max(img.size)
|
||||
new_size = tuple(int(dim * ratio) for dim in img.size)
|
||||
img = img.resize(new_size, Image.Resampling.LANCZOS)
|
||||
img.save(file_path, optimize=True, quality=85)
|
||||
file_size = os.path.getsize(file_path)
|
||||
|
||||
except Exception as e:
|
||||
current_app.logger.error(f"Error processing cover image: {e}")
|
||||
|
||||
# Return relative path from uploads folder
|
||||
relative_path = os.path.join('covers', secure_filename(set_number), filename)
|
||||
return relative_path.replace('\\', '/'), file_size
|
||||
|
||||
@staticmethod
|
||||
def save_file(file, set_number: str, file_type: str) -> Tuple[str, int, Optional[str]]:
|
||||
"""
|
||||
Save uploaded file to appropriate directory.
|
||||
|
||||
Args:
|
||||
file: FileStorage object from request
|
||||
set_number: LEGO set number for organizing files
|
||||
file_type: 'PDF' or 'IMAGE'
|
||||
|
||||
Returns:
|
||||
Tuple of (relative_path, file_size, thumbnail_path)
|
||||
"""
|
||||
# Generate unique filename
|
||||
unique_filename = FileHandler.generate_unique_filename(file.filename)
|
||||
|
||||
# Determine subdirectory
|
||||
subdir = 'pdfs' if file_type == 'PDF' else 'images'
|
||||
|
||||
# Create set-specific directory
|
||||
set_dir = os.path.join(
|
||||
current_app.config['UPLOAD_FOLDER'],
|
||||
subdir,
|
||||
secure_filename(set_number)
|
||||
)
|
||||
os.makedirs(set_dir, exist_ok=True)
|
||||
|
||||
# Full file path
|
||||
file_path = os.path.join(set_dir, unique_filename)
|
||||
|
||||
# Save the file
|
||||
file.save(file_path)
|
||||
|
||||
# Get file size
|
||||
file_size = os.path.getsize(file_path)
|
||||
|
||||
# Generate thumbnail
|
||||
thumbnail_rel_path = None
|
||||
if file_type == 'IMAGE':
|
||||
thumb_path = FileHandler.create_thumbnail(file_path)
|
||||
if thumb_path:
|
||||
# Get relative path for thumbnail
|
||||
thumbnail_rel_path = os.path.join(subdir, secure_filename(set_number),
|
||||
os.path.basename(thumb_path))
|
||||
thumbnail_rel_path = thumbnail_rel_path.replace('\\', '/')
|
||||
elif file_type == 'PDF':
|
||||
thumb_path = FileHandler.create_pdf_thumbnail(file_path, set_number)
|
||||
if thumb_path:
|
||||
thumbnail_rel_path = thumb_path.replace('\\', '/')
|
||||
|
||||
# Return relative path for database storage
|
||||
relative_path = os.path.join(subdir, secure_filename(set_number), unique_filename)
|
||||
|
||||
return relative_path.replace('\\', '/'), file_size, thumbnail_rel_path
|
||||
|
||||
@staticmethod
|
||||
def create_thumbnail(image_path: str, size: Tuple[int, int] = (300, 300)) -> Optional[str]:
|
||||
"""
|
||||
Create a thumbnail for an image.
|
||||
|
||||
Args:
|
||||
image_path: Path to the original image
|
||||
size: Thumbnail size (width, height)
|
||||
|
||||
Returns:
|
||||
Path to thumbnail or None if failed
|
||||
"""
|
||||
try:
|
||||
img = Image.open(image_path)
|
||||
img.thumbnail(size, Image.Resampling.LANCZOS)
|
||||
|
||||
# Generate thumbnail filename
|
||||
base, ext = os.path.splitext(image_path)
|
||||
thumb_path = f"{base}_thumb{ext}"
|
||||
|
||||
img.save(thumb_path, optimize=True)
|
||||
return thumb_path
|
||||
|
||||
except Exception as e:
|
||||
current_app.logger.error(f'Thumbnail creation failed: {str(e)}')
|
||||
return None
|
||||
|
||||
@staticmethod
|
||||
def create_pdf_thumbnail(pdf_path: str, set_number: str, size: Tuple[int, int] = (300, 400)) -> Optional[str]:
|
||||
"""
|
||||
Create a thumbnail from the first page of a PDF.
|
||||
|
||||
Args:
|
||||
pdf_path: Path to the PDF file
|
||||
set_number: Set number for organizing thumbnails
|
||||
size: Thumbnail size (width, height)
|
||||
|
||||
Returns:
|
||||
Relative path to thumbnail or None if failed
|
||||
"""
|
||||
try:
|
||||
# Try using PyMuPDF (fitz) first
|
||||
try:
|
||||
import fitz # PyMuPDF
|
||||
|
||||
# Open PDF
|
||||
doc = fitz.open(pdf_path)
|
||||
|
||||
# Get first page
|
||||
page = doc[0]
|
||||
|
||||
# Render page to pixmap (image)
|
||||
# Use matrix for scaling to desired size
|
||||
zoom = 2 # Higher zoom for better quality
|
||||
mat = fitz.Matrix(zoom, zoom)
|
||||
pix = page.get_pixmap(matrix=mat)
|
||||
|
||||
# Create thumbnails directory
|
||||
thumbnails_dir = os.path.join(current_app.config['UPLOAD_FOLDER'], 'thumbnails', secure_filename(set_number))
|
||||
os.makedirs(thumbnails_dir, exist_ok=True)
|
||||
|
||||
# Generate thumbnail filename
|
||||
pdf_filename = os.path.basename(pdf_path)
|
||||
thumb_filename = os.path.splitext(pdf_filename)[0] + '_thumb.png'
|
||||
thumb_path = os.path.join(thumbnails_dir, thumb_filename)
|
||||
|
||||
# Save as PNG
|
||||
pix.save(thumb_path)
|
||||
|
||||
# Close document
|
||||
doc.close()
|
||||
|
||||
# Resize to target size using PIL
|
||||
img = Image.open(thumb_path)
|
||||
img.thumbnail(size, Image.Resampling.LANCZOS)
|
||||
img.save(thumb_path, optimize=True)
|
||||
|
||||
# Return relative path
|
||||
rel_path = os.path.join('thumbnails', secure_filename(set_number), thumb_filename)
|
||||
return rel_path
|
||||
|
||||
except ImportError:
|
||||
# Try pdf2image as fallback
|
||||
try:
|
||||
from pdf2image import convert_from_path
|
||||
|
||||
# Convert first page only
|
||||
images = convert_from_path(pdf_path, first_page=1, last_page=1, dpi=150)
|
||||
|
||||
if images:
|
||||
# Create thumbnails directory
|
||||
thumbnails_dir = os.path.join(current_app.config['UPLOAD_FOLDER'], 'thumbnails', secure_filename(set_number))
|
||||
os.makedirs(thumbnails_dir, exist_ok=True)
|
||||
|
||||
# Generate thumbnail filename
|
||||
pdf_filename = os.path.basename(pdf_path)
|
||||
thumb_filename = os.path.splitext(pdf_filename)[0] + '_thumb.png'
|
||||
thumb_path = os.path.join(thumbnails_dir, thumb_filename)
|
||||
|
||||
# Resize and save
|
||||
img = images[0]
|
||||
img.thumbnail(size, Image.Resampling.LANCZOS)
|
||||
img.save(thumb_path, 'PNG', optimize=True)
|
||||
|
||||
# Return relative path
|
||||
rel_path = os.path.join('thumbnails', secure_filename(set_number), thumb_filename)
|
||||
return rel_path
|
||||
|
||||
except ImportError:
|
||||
current_app.logger.warning('Neither PyMuPDF nor pdf2image available for PDF thumbnails')
|
||||
return None
|
||||
|
||||
except Exception as e:
|
||||
current_app.logger.error(f'PDF thumbnail creation failed: {str(e)}')
|
||||
return None
|
||||
|
||||
@staticmethod
|
||||
def delete_file(file_path: str) -> bool:
|
||||
"""
|
||||
Delete a file from storage.
|
||||
|
||||
Args:
|
||||
file_path: Relative path to the file
|
||||
|
||||
Returns:
|
||||
True if successful, False otherwise
|
||||
"""
|
||||
try:
|
||||
full_path = os.path.join(current_app.config['UPLOAD_FOLDER'], file_path)
|
||||
|
||||
if os.path.exists(full_path):
|
||||
os.remove(full_path)
|
||||
|
||||
# Also delete thumbnail if it exists
|
||||
base, ext = os.path.splitext(full_path)
|
||||
thumb_path = f"{base}_thumb{ext}"
|
||||
if os.path.exists(thumb_path):
|
||||
os.remove(thumb_path)
|
||||
|
||||
return True
|
||||
return False
|
||||
|
||||
except Exception as e:
|
||||
current_app.logger.error(f'File deletion failed: {str(e)}')
|
||||
return False
|
||||
|
||||
@staticmethod
|
||||
def get_directory_size(set_number: str) -> Tuple[int, int]:
|
||||
"""
|
||||
Get total size of all files for a specific set.
|
||||
|
||||
Args:
|
||||
set_number: LEGO set number
|
||||
|
||||
Returns:
|
||||
Tuple of (total_size_bytes, file_count)
|
||||
"""
|
||||
total_size = 0
|
||||
file_count = 0
|
||||
|
||||
for subdir in ['pdfs', 'images']:
|
||||
set_dir = os.path.join(
|
||||
current_app.config['UPLOAD_FOLDER'],
|
||||
subdir,
|
||||
secure_filename(set_number)
|
||||
)
|
||||
|
||||
if os.path.exists(set_dir):
|
||||
for filename in os.listdir(set_dir):
|
||||
if not filename.endswith('_thumb'):
|
||||
filepath = os.path.join(set_dir, filename)
|
||||
if os.path.isfile(filepath):
|
||||
total_size += os.path.getsize(filepath)
|
||||
file_count += 1
|
||||
|
||||
return total_size, file_count
|
||||
Reference in New Issue
Block a user