from __future__ import annotations import logging from dataclasses import dataclass from pathlib import Path from typing import Iterable from django.core.files.storage import Storage, default_storage from PIL import Image, ImageFilter, ImageOps, UnidentifiedImageError logger = logging.getLogger(__name__) BLUR_VARIANT = "blur" PREVIEW_VARIANT = "preview" THUMBNAIL_VARIANT = "thumbnail" @dataclass(frozen=True) class ImageVariantSpec: key: str max_size: tuple[int, int] quality: int = 80 blur_radius: float = 0.0 @dataclass(frozen=True) class ImageFamilySpec: key: str original_max_size: tuple[int, int] original_quality: int variants: tuple[ImageVariantSpec, ...] IMAGE_FAMILIES: dict[str, ImageFamilySpec] = { "event_featured": ImageFamilySpec( key="event_featured", original_max_size=(2560, 2560), original_quality=84, variants=( ImageVariantSpec(THUMBNAIL_VARIANT, (640, 640), quality=72), ImageVariantSpec(PREVIEW_VARIANT, (1600, 1600), quality=82), ImageVariantSpec(BLUR_VARIANT, (48, 48), quality=36, blur_radius=10.0), ), ), "gallery": ImageFamilySpec( key="gallery", original_max_size=(2560, 2560), original_quality=84, variants=( ImageVariantSpec(THUMBNAIL_VARIANT, (480, 480), quality=68), ImageVariantSpec(PREVIEW_VARIANT, (1280, 1280), quality=78), ImageVariantSpec(BLUR_VARIANT, (48, 48), quality=34, blur_radius=12.0), ), ), "blog_featured": ImageFamilySpec( key="blog_featured", original_max_size=(2560, 2560), original_quality=84, variants=( ImageVariantSpec(THUMBNAIL_VARIANT, (640, 640), quality=72), ImageVariantSpec(PREVIEW_VARIANT, (1600, 1600), quality=82), ImageVariantSpec(BLUR_VARIANT, (48, 48), quality=36, blur_radius=10.0), ), ), "blog_asset": ImageFamilySpec( key="blog_asset", original_max_size=(2560, 2560), original_quality=84, variants=( ImageVariantSpec(THUMBNAIL_VARIANT, (640, 640), quality=72), ImageVariantSpec(PREVIEW_VARIANT, (1600, 1600), quality=82), ImageVariantSpec(BLUR_VARIANT, (48, 48), quality=36, blur_radius=10.0), ), ), "profile_picture": ImageFamilySpec( key="profile_picture", original_max_size=(1200, 1200), original_quality=82, variants=( ImageVariantSpec(THUMBNAIL_VARIANT, (160, 160), quality=72), ImageVariantSpec(PREVIEW_VARIANT, (640, 640), quality=82), ImageVariantSpec(BLUR_VARIANT, (40, 40), quality=32, blur_radius=10.0), ), ), } @dataclass(frozen=True) class ProcessedImageResult: width: int height: int file_size: int processed: bool def get_image_family_spec(family: str) -> ImageFamilySpec: try: return IMAGE_FAMILIES[family] except KeyError as exc: raise ValueError(f"Unknown image family: {family}") from exc def get_image_previous_name(instance, field_name: str) -> str | None: if not getattr(instance, "pk", None): return None manager = getattr(instance.__class__, "all_objects", instance.__class__._default_manager) return manager.filter(pk=instance.pk).values_list(field_name, flat=True).first() def derivative_name(original_name: str, variant_key: str) -> str: original_path = Path(original_name) return str(original_path.with_name(f"{original_path.stem}.{variant_key}.webp")).replace("\\", "/") def iter_derivative_names(original_name: str, family: str) -> Iterable[str]: for variant in get_image_family_spec(family).variants: yield derivative_name(original_name, variant.key) def derivative_url(file_field, variant_key: str) -> str | None: if not file_field or not getattr(file_field, "name", None): return None name = derivative_name(file_field.name, variant_key) if not file_field.storage.exists(name): return None return file_field.storage.url(name) def delete_image_derivatives_by_name( storage: Storage | None, original_name: str | None, family: str, *, delete_original: bool = False, ) -> None: if not original_name: return storage = storage or default_storage for name in iter_derivative_names(original_name, family): if storage.exists(name): storage.delete(name) if delete_original and storage.exists(original_name): storage.delete(original_name) def delete_image_derivatives(file_field, family: str, *, delete_original: bool = False) -> None: if not file_field or not getattr(file_field, "name", None): return delete_image_derivatives_by_name( getattr(file_field, "storage", default_storage), file_field.name, family, delete_original=delete_original, ) def process_public_image(file_field, family: str, *, force: bool = False) -> ProcessedImageResult: if not file_field or not getattr(file_field, "path", None): raise ValueError("Image field must point to a local file path.") spec = get_image_family_spec(family) missing_derivatives = [ name for name in iter_derivative_names(file_field.name, family) if not file_field.storage.exists(name) ] should_process = force or bool(missing_derivatives) if should_process: with Image.open(file_field.path) as opened: source = ImageOps.exif_transpose(opened) optimized = _resize_for_original(source, spec.original_max_size) _save_original(optimized, file_field.path, quality=spec.original_quality) for variant in spec.variants: derivative = _build_variant_image(optimized, variant) target_name = derivative_name(file_field.name, variant.key) target_path = file_field.storage.path(target_name) Path(target_path).parent.mkdir(parents=True, exist_ok=True) derivative.save(target_path, format="WEBP", quality=variant.quality, method=6) with Image.open(file_field.path) as current: width, height = current.size file_size = file_field.storage.size(file_field.name) return ProcessedImageResult( width=width, height=height, file_size=file_size, processed=should_process, ) def _resize_for_original(image: Image.Image, max_size: tuple[int, int]) -> Image.Image: prepared = image.copy() prepared.thumbnail(max_size, Image.Resampling.LANCZOS) return prepared def _save_original(image: Image.Image, path: str, *, quality: int) -> None: suffix = Path(path).suffix.lower() has_alpha = "A" in image.getbands() if suffix in {".jpg", ".jpeg"}: image.convert("RGB").save( path, format="JPEG", quality=quality, optimize=True, progressive=True, ) return if suffix == ".png": target = image if has_alpha else image.convert("RGB") target.save(path, format="PNG", optimize=True, compress_level=9) return if suffix == ".webp": image.save(path, format="WEBP", quality=quality, method=6) return image.save(path) def _build_variant_image(image: Image.Image, variant: ImageVariantSpec) -> Image.Image: target = image.copy() if variant.blur_radius > 0: target.thumbnail(variant.max_size, Image.Resampling.LANCZOS) target = target.filter(ImageFilter.GaussianBlur(radius=variant.blur_radius)) return target target.thumbnail(variant.max_size, Image.Resampling.LANCZOS) return target def safe_process_public_image(file_field, family: str, *, force: bool = False) -> ProcessedImageResult | None: try: return process_public_image(file_field, family, force=force) except (FileNotFoundError, OSError, UnidentifiedImageError, ValueError) as exc: logger.warning("Failed to process image '%s': %s", getattr(file_field, "name", None), exc) return None