Files
qlockify-backend-deployment/core/models/base.py
2026-03-11 18:01:27 +08:00

228 lines
7.4 KiB
Python

import contextlib
import uuid
from functools import cached_property
from django.conf import settings
from django.db import models
from django.db.models.deletion import ProtectedError
from django.utils import timezone
from core.middlewares.current_user import get_current_user
from core.utils import common_datetime_str
class SoftDeleteQuerySet(models.QuerySet):
def delete(self):
for obj in self:
obj.delete()
return
def hard_delete(self):
return super().delete()
def alive(self):
return self.filter(is_deleted=False)
def dead(self):
return self.filter(is_deleted=True)
class SoftDeleteManager(models.Manager):
def __init__(self, *args, **kwargs):
self.alive_only = kwargs.pop("alive_only", None)
super().__init__(*args, **kwargs)
def get_queryset(self) -> SoftDeleteQuerySet:
if self.alive_only is True:
return SoftDeleteQuerySet(self.model).filter(is_deleted=False)
if self.alive_only is False:
return SoftDeleteQuerySet(self.model).filter(is_deleted=True)
return SoftDeleteQuerySet(self.model)
def hard_delete(self):
return self.get_queryset().hard_delete()
class BaseModel(models.Model):
id = models.UUIDField(default=uuid.uuid7, primary_key=True)
created_at = models.DateTimeField(auto_now_add=True)
updated_at = models.DateTimeField(auto_now=True)
deleted_at = models.DateTimeField(null=True, blank=True)
is_deleted = models.BooleanField(default=False)
is_active = models.BooleanField(default=False)
created_by = models.ForeignKey(
settings.AUTH_USER_MODEL,
null=True,
blank=True,
on_delete=models.SET_NULL,
related_name="created_%(app_label)s_%(class)s_set",
)
updated_by = models.ForeignKey(
settings.AUTH_USER_MODEL,
null=True,
blank=True,
on_delete=models.SET_NULL,
related_name="updated_%(app_label)s_%(class)s_set",
)
objects = SoftDeleteManager(alive_only=True)
all_objects = SoftDeleteManager(alive_only=None)
deleted_objects = SoftDeleteManager(alive_only=False)
class Meta:
abstract = True
indexes = (models.Index(fields=["id"], name="%(class)s_id_idx"),)
def save(self, *args, **kwargs):
user = get_current_user()
if user and user.is_authenticated:
if not self.created_by:
self.created_by = user
self.updated_by = user
super().save(*args, **kwargs)
@classmethod
def get_or_restore(cls, defaults=None, **kwargs):
instance = cls.all_objects.filter(**kwargs).first()
if instance:
restored = False
if instance.is_deleted:
instance.restore()
restored = True
if defaults:
for key, value in defaults.items():
setattr(instance, key, value)
instance.save(update_fields=list(defaults.keys()))
return instance, False, restored
instance, created = cls.objects.get_or_create(defaults=defaults, **kwargs)
return instance, created, False
@classmethod
def update_or_restore(cls, defaults=None, **kwargs):
instance = cls.all_objects.filter(**kwargs).first()
if instance:
restored = False
if instance.is_deleted:
instance.restore()
restored = True
if defaults:
for key, value in defaults.items():
setattr(instance, key, value)
instance.save(update_fields=list(defaults.keys()))
return instance, False, restored
instance, created = cls.objects.update_or_create(defaults=defaults, **kwargs)
return instance, created, False
def _soft_delete_related(self, using=None):
for rel in self._meta.related_objects:
if not hasattr(rel, "on_delete"):
continue
on_delete = rel.on_delete
if on_delete not in (models.CASCADE, models.SET_NULL, models.PROTECT):
continue
accessor = rel.get_accessor_name()
try:
related = getattr(self, accessor)
except Exception:
continue
if on_delete is models.PROTECT:
if rel.one_to_one:
try:
_ = related
except rel.related_model.DoesNotExist:
continue
raise ProtectedError(
"Cannot delete because related protected objects exist.",
[related],
)
if related.all().exists():
raise ProtectedError(
"Cannot delete because related protected objects exist.",
list(related.all()),
)
continue
if on_delete is models.SET_NULL:
field_name = rel.field.name
if rel.one_to_one:
try:
obj = related
except rel.related_model.DoesNotExist:
continue
setattr(obj, field_name, None)
obj.save(using=using, update_fields=[field_name])
else:
related.all().update(**{field_name: None})
continue
if rel.one_to_one:
with contextlib.suppress(rel.related_model.DoesNotExist):
related.delete(using=using)
else:
for obj in related.all():
obj.delete(using=using)
def delete(self, using=None, keep_parents=False):
if self.is_deleted:
return
self._soft_delete_related(using=using)
self.is_deleted = True
self.deleted_at = timezone.now()
self.save(using=using, update_fields=["is_deleted", "deleted_at"])
def hard_delete(self, using=None, keep_parents=False):
super().delete(using=using, keep_parents=keep_parents)
def restore(self):
if not self.is_deleted:
return
# Restore related soft-deleted objects for CASCADE relations.
for rel in self._meta.related_objects:
if not hasattr(rel, "on_delete") or rel.on_delete is not models.CASCADE:
continue
accessor = rel.get_accessor_name()
try:
related = getattr(self, accessor)
except Exception:
continue
if rel.one_to_one:
with contextlib.suppress(rel.related_model.DoesNotExist):
related.restore()
else:
for obj in related.all():
obj.restore()
self.is_deleted = False
self.deleted_at = None
self.save(update_fields=["is_deleted", "deleted_at"])
@cached_property
def can_delete(self):
for field in self._meta.related_objects:
try:
if getattr(self, field.related_name).all().exists():
return False
except Exception:
pass
return True
@property
def created_at_display(self):
return common_datetime_str(self.created_at)
@property
def updated_at_display(self):
return common_datetime_str(self.updated_at)