250 lines
8.0 KiB
Python
250 lines
8.0 KiB
Python
from __future__ import annotations
|
|
|
|
import logging
|
|
from dataclasses import dataclass
|
|
from pathlib import Path
|
|
from typing import Iterable
|
|
|
|
from django.core.files.storage import Storage, default_storage
|
|
|
|
from PIL import Image, ImageFilter, ImageOps, UnidentifiedImageError
|
|
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# Keys naming each derivative rendition. They are used as the ``key`` of the
# variant specs in this module and are embedded in derivative file names.
BLUR_VARIANT = "blur"
PREVIEW_VARIANT = "preview"
THUMBNAIL_VARIANT = "thumbnail"
|
|
|
|
|
|
@dataclass(frozen=True)
class ImageVariantSpec:
    """Immutable recipe for a single derivative rendition of an image."""

    # Variant identifier; becomes part of the derivative's file name.
    key: str
    # Bounding box handed to ``Image.thumbnail`` when shrinking the image.
    max_size: tuple[int, int]
    # Encoder quality used when the rendition is written as WebP.
    quality: int = 80
    # Gaussian blur radius; blur is applied only when this is greater than 0.
    blur_radius: float = 0.0
|
|
|
|
|
|
@dataclass(frozen=True)
class ImageFamilySpec:
    """Immutable processing profile shared by every image of one family."""

    # Family identifier (matches the key it is registered under).
    key: str
    # Bounding box the stored original is shrunk to fit.
    original_max_size: tuple[int, int]
    # Encoder quality used when the original is re-saved.
    original_quality: int
    # Derivative renditions generated alongside the original.
    variants: tuple[ImageVariantSpec, ...]
|
|
|
|
|
|
# Rendition set shared by the event/blog families, which are configured
# identically. The specs are frozen and held in a tuple, so sharing one
# instance between families is safe.
_STANDARD_VARIANTS: tuple[ImageVariantSpec, ...] = (
    ImageVariantSpec(THUMBNAIL_VARIANT, (640, 640), quality=72),
    ImageVariantSpec(PREVIEW_VARIANT, (1600, 1600), quality=82),
    ImageVariantSpec(BLUR_VARIANT, (48, 48), quality=36, blur_radius=10.0),
)

# Registry of processing profiles, keyed by family name.
IMAGE_FAMILIES: dict[str, ImageFamilySpec] = {
    "event_featured": ImageFamilySpec(
        key="event_featured",
        original_max_size=(2560, 2560),
        original_quality=84,
        variants=_STANDARD_VARIANTS,
    ),
    "gallery": ImageFamilySpec(
        key="gallery",
        original_max_size=(2560, 2560),
        original_quality=84,
        variants=(
            ImageVariantSpec(THUMBNAIL_VARIANT, (480, 480), quality=68),
            ImageVariantSpec(PREVIEW_VARIANT, (1280, 1280), quality=78),
            ImageVariantSpec(BLUR_VARIANT, (48, 48), quality=34, blur_radius=12.0),
        ),
    ),
    "blog_featured": ImageFamilySpec(
        key="blog_featured",
        original_max_size=(2560, 2560),
        original_quality=84,
        variants=_STANDARD_VARIANTS,
    ),
    "blog_asset": ImageFamilySpec(
        key="blog_asset",
        original_max_size=(2560, 2560),
        original_quality=84,
        variants=_STANDARD_VARIANTS,
    ),
    "profile_picture": ImageFamilySpec(
        key="profile_picture",
        original_max_size=(1200, 1200),
        original_quality=82,
        variants=(
            ImageVariantSpec(THUMBNAIL_VARIANT, (160, 160), quality=72),
            ImageVariantSpec(PREVIEW_VARIANT, (640, 640), quality=82),
            ImageVariantSpec(BLUR_VARIANT, (40, 40), quality=32, blur_radius=10.0),
        ),
    ),
}
|
|
|
|
|
|
@dataclass(frozen=True)
class ProcessedImageResult:
    """Immutable summary of an image after a processing pass."""

    # Pixel width of the stored original.
    width: int
    # Pixel height of the stored original.
    height: int
    # Size of the stored original as reported by its storage.
    file_size: int
    # Whether the original and its derivatives were (re)generated this pass.
    processed: bool
|
|
|
|
|
|
def get_image_family_spec(family: str) -> ImageFamilySpec:
    """Look up the processing profile registered for *family*.

    Raises:
        ValueError: If *family* is not a key of ``IMAGE_FAMILIES``; the
            original ``KeyError`` is chained as the cause.
    """
    try:
        spec = IMAGE_FAMILIES[family]
    except KeyError as missing:
        raise ValueError(f"Unknown image family: {family}") from missing
    return spec
|
|
|
|
|
|
def get_image_previous_name(instance, field_name: str) -> str | None:
    """Return the stored value of *field_name* for *instance*'s database row.

    Returns ``None`` when *instance* has no truthy ``pk`` or when no row
    matches it. The lookup goes through the model's ``all_objects`` manager
    when the class defines one, otherwise through ``_default_manager``.
    """
    if getattr(instance, "pk", None):
        manager = getattr(
            instance.__class__,
            "all_objects",
            instance.__class__._default_manager,
        )
        stored_values = manager.filter(pk=instance.pk).values_list(field_name, flat=True)
        return stored_values.first()
    return None
|
|
|
|
|
|
def derivative_name(original_name: str, variant_key: str) -> str:
    """Build the storage name of a derivative that sits next to the original.

    The result keeps the original's directory and stem and ends in
    ``.<variant_key>.webp``. Backslashes are replaced so the name always uses
    forward slashes.
    """
    source = Path(original_name)
    sibling = source.with_name(f"{source.stem}.{variant_key}.webp")
    return str(sibling).replace("\\", "/")
|
|
|
|
|
|
def iter_derivative_names(original_name: str, family: str) -> Iterable[str]:
    """Lazily yield the derivative name of every variant configured for *family*.

    Because this is a generator, an unknown *family* raises ``ValueError``
    only once iteration starts.
    """
    yield from (
        derivative_name(original_name, variant.key)
        for variant in get_image_family_spec(family).variants
    )
|
|
|
|
|
|
def derivative_url(file_field, variant_key: str) -> str | None:
    """Return the URL of a derivative, or ``None`` when it is unavailable.

    ``None`` is returned when *file_field* is falsy, has no name, or when the
    derivative does not exist in the field's storage.
    """
    original_name = getattr(file_field, "name", None) if file_field else None
    if not original_name:
        return None

    storage = file_field.storage
    candidate = derivative_name(original_name, variant_key)
    return storage.url(candidate) if storage.exists(candidate) else None
|
|
|
|
|
|
def delete_image_derivatives_by_name(
    storage: Storage | None,
    original_name: str | None,
    family: str,
    *,
    delete_original: bool = False,
) -> None:
    """Delete every derivative of *original_name* that exists in *storage*.

    Does nothing when *original_name* is empty. A falsy *storage* falls back
    to ``default_storage``. With ``delete_original=True`` the original file is
    removed as well, after its derivatives.
    """
    if not original_name:
        return

    backend = storage or default_storage
    targets = [*iter_derivative_names(original_name, family)]
    if delete_original:
        # The original goes last, after all of its derivatives.
        targets.append(original_name)

    for name in targets:
        if backend.exists(name):
            backend.delete(name)
|
|
|
|
|
|
def delete_image_derivatives(file_field, family: str, *, delete_original: bool = False) -> None:
    """Delete the derivatives of *file_field* (and optionally its original).

    Does nothing when *file_field* is falsy or has no name. The field's own
    storage is used, falling back to ``default_storage`` when the field has
    no ``storage`` attribute.
    """
    if file_field and getattr(file_field, "name", None):
        delete_image_derivatives_by_name(
            getattr(file_field, "storage", default_storage),
            file_field.name,
            family,
            delete_original=delete_original,
        )
|
|
|
|
|
|
def _require_local_path(file_field) -> str:
    """Return the local filesystem path of *file_field* or raise ``ValueError``."""
    try:
        local_path = file_field.path if file_field else None
    except (AttributeError, NotImplementedError) as exc:
        # Storages that are not backed by the local filesystem raise
        # NotImplementedError from ``path``; report that the same way as a
        # missing path so callers only have one failure mode to handle.
        raise ValueError("Image field must point to a local file path.") from exc
    if not local_path:
        raise ValueError("Image field must point to a local file path.")
    return local_path


def process_public_image(file_field, family: str, *, force: bool = False) -> ProcessedImageResult:
    """Optimise the stored original in place and (re)build its WebP derivatives.

    Processing runs when *force* is true or when at least one derivative
    configured for *family* is missing from the field's storage. When it runs,
    the original is EXIF-transposed, shrunk to the family's maximum size and
    re-saved over itself, and every variant is written next to it.

    Args:
        file_field: File field whose storage exposes local filesystem paths.
        family: Key of the processing profile in ``IMAGE_FAMILIES``.
        force: Rebuild everything even if all derivatives already exist.

    Returns:
        The dimensions and size of the original as stored after the call,
        plus whether processing ran.

    Raises:
        ValueError: If *file_field* has no local path or *family* is unknown.
    """
    local_path = _require_local_path(file_field)
    spec = get_image_family_spec(family)
    storage = file_field.storage

    should_process = force or any(
        not storage.exists(name)
        for name in iter_derivative_names(file_field.name, family)
    )

    if should_process:
        with Image.open(local_path) as opened:
            source = ImageOps.exif_transpose(opened)
            optimized = _resize_for_original(source, spec.original_max_size)
            _save_original(optimized, local_path, quality=spec.original_quality)

            # Derivatives are built from the optimised original, not from
            # the raw upload.
            for variant in spec.variants:
                derivative = _build_variant_image(optimized, variant)
                target_path = storage.path(derivative_name(file_field.name, variant.key))
                Path(target_path).parent.mkdir(parents=True, exist_ok=True)
                derivative.save(target_path, format="WEBP", quality=variant.quality, method=6)

    # Re-read from disk so the reported size reflects what is actually stored.
    with Image.open(local_path) as current:
        width, height = current.size

    return ProcessedImageResult(
        width=width,
        height=height,
        file_size=storage.size(file_field.name),
        processed=should_process,
    )
|
|
|
|
|
|
def _resize_for_original(image: Image.Image, max_size: tuple[int, int]) -> Image.Image:
    """Return a copy of *image* shrunk in place to fit within *max_size*.

    The input image is left untouched; ``thumbnail`` only ever reduces size
    and preserves the aspect ratio.
    """
    bounded = image.copy()
    bounded.thumbnail(size=max_size, resample=Image.Resampling.LANCZOS)
    return bounded
|
|
|
|
|
|
def _save_original(image: Image.Image, path: str, *, quality: int) -> None:
    """Write *image* to *path* using encoder settings chosen from the suffix.

    ``.jpg``/``.jpeg`` are saved as progressive RGB JPEG, ``.png`` as an
    optimised PNG and ``.webp`` as WebP. Any other suffix is passed to Pillow
    unchanged, which picks the format from the file name.

    Args:
        image: Image to write.
        path: Destination path; its suffix selects the encoder.
        quality: Encoder quality for the lossy formats (JPEG, WebP).
    """
    suffix = Path(path).suffix.lower()

    if suffix in {".jpg", ".jpeg"}:
        # JPEG has no alpha channel, so the image is flattened to RGB first.
        image.convert("RGB").save(
            path,
            format="JPEG",
            quality=quality,
            optimize=True,
            progressive=True,
        )
        return

    if suffix == ".png":
        # Transparency is not always an "A" band: palette and greyscale PNGs
        # keep it in ``info["transparency"]`` (the tRNS chunk). Converting
        # those to RGB can discard the transparency, so leave them as they are.
        keeps_transparency = "A" in image.getbands() or "transparency" in image.info
        target = image if keeps_transparency else image.convert("RGB")
        target.save(path, format="PNG", optimize=True, compress_level=9)
        return

    if suffix == ".webp":
        image.save(path, format="WEBP", quality=quality, method=6)
        return

    image.save(path)
|
|
|
|
|
|
def _build_variant_image(image: Image.Image, variant: ImageVariantSpec) -> Image.Image:
    """Return a new image rendered according to *variant*.

    The rendition is shrunk to fit ``variant.max_size`` and, when
    ``variant.blur_radius`` is positive, Gaussian-blurred afterwards. The
    input image is never modified.

    Args:
        image: Source image to derive the rendition from.
        variant: Size and blur settings for the rendition.
    """
    if image.mode in {"RGB", "RGBA", "L", "LA"}:
        target = image.copy()
    else:
        # Palette, bilevel and other exotic modes cannot be Gaussian-blurred
        # (Pillow raises ValueError), so normalise to RGB(A) up front. The
        # WebP encoder would perform this conversion on save anyway.
        has_transparency = "A" in image.getbands() or "transparency" in image.info
        target = image.convert("RGBA" if has_transparency else "RGB")

    # Shrink first so the blur only has to process the small rendition.
    target.thumbnail(variant.max_size, Image.Resampling.LANCZOS)
    if variant.blur_radius > 0:
        target = target.filter(ImageFilter.GaussianBlur(radius=variant.blur_radius))
    return target
|
|
|
|
|
|
def safe_process_public_image(file_field, family: str, *, force: bool = False) -> ProcessedImageResult | None:
    """Run ``process_public_image`` and turn expected failures into ``None``.

    A warning is logged for every swallowed failure. Handled failures are
    missing or unreadable files, undecodable images, images Pillow rejects as
    decompression bombs, storages without local paths, and invalid arguments.

    Args:
        file_field: File field to process.
        family: Key of the processing profile to apply.
        force: Rebuild derivatives even if they already exist.

    Returns:
        The processing result, or ``None`` when processing failed.
    """
    try:
        return process_public_image(file_field, family, force=force)
    except (
        OSError,  # Also covers FileNotFoundError and truncated image data.
        UnidentifiedImageError,
        Image.DecompressionBombError,  # Plain Exception subclass, not an OSError.
        NotImplementedError,  # Raised by storages that expose no local path.
        ValueError,
    ) as exc:
        logger.warning("Failed to process image '%s': %s", getattr(file_field, "name", None), exc)
        return None
|