228 lines
7.4 KiB
Python
228 lines
7.4 KiB
Python
import contextlib
|
|
import uuid
|
|
from functools import cached_property
|
|
|
|
from django.conf import settings
|
|
from django.db import models
|
|
from django.db.models.deletion import ProtectedError
|
|
from django.utils import timezone
|
|
|
|
from core.middlewares.current_user import get_current_user
|
|
from core.utils import common_datetime_str
|
|
|
|
|
|
class SoftDeleteQuerySet(models.QuerySet):
|
|
def delete(self):
|
|
for obj in self:
|
|
obj.delete()
|
|
return
|
|
|
|
def hard_delete(self):
|
|
return super().delete()
|
|
|
|
def alive(self):
|
|
return self.filter(is_deleted=False)
|
|
|
|
def dead(self):
|
|
return self.filter(is_deleted=True)
|
|
|
|
|
|
class SoftDeleteManager(models.Manager):
|
|
def __init__(self, *args, **kwargs):
|
|
self.alive_only = kwargs.pop("alive_only", None)
|
|
super().__init__(*args, **kwargs)
|
|
|
|
def get_queryset(self) -> SoftDeleteQuerySet:
|
|
if self.alive_only is True:
|
|
return SoftDeleteQuerySet(self.model).filter(is_deleted=False)
|
|
|
|
if self.alive_only is False:
|
|
return SoftDeleteQuerySet(self.model).filter(is_deleted=True)
|
|
|
|
return SoftDeleteQuerySet(self.model)
|
|
|
|
def hard_delete(self):
|
|
return self.get_queryset().hard_delete()
|
|
|
|
|
|
class BaseModel(models.Model):
|
|
id = models.UUIDField(default=uuid.uuid7, primary_key=True)
|
|
|
|
created_at = models.DateTimeField(auto_now_add=True)
|
|
updated_at = models.DateTimeField(auto_now=True)
|
|
deleted_at = models.DateTimeField(null=True, blank=True)
|
|
|
|
is_deleted = models.BooleanField(default=False)
|
|
is_active = models.BooleanField(default=False)
|
|
|
|
created_by = models.ForeignKey(
|
|
settings.AUTH_USER_MODEL,
|
|
null=True,
|
|
blank=True,
|
|
on_delete=models.SET_NULL,
|
|
related_name="created_%(app_label)s_%(class)s_set",
|
|
)
|
|
updated_by = models.ForeignKey(
|
|
settings.AUTH_USER_MODEL,
|
|
null=True,
|
|
blank=True,
|
|
on_delete=models.SET_NULL,
|
|
related_name="updated_%(app_label)s_%(class)s_set",
|
|
)
|
|
|
|
objects = SoftDeleteManager(alive_only=True)
|
|
all_objects = SoftDeleteManager(alive_only=None)
|
|
deleted_objects = SoftDeleteManager(alive_only=False)
|
|
|
|
class Meta:
|
|
abstract = True
|
|
indexes = (models.Index(fields=["id"], name="%(class)s_id_idx"),)
|
|
|
|
def save(self, *args, **kwargs):
|
|
user = get_current_user()
|
|
if user and user.is_authenticated:
|
|
if not self.created_by:
|
|
self.created_by = user
|
|
self.updated_by = user
|
|
super().save(*args, **kwargs)
|
|
|
|
@classmethod
|
|
def get_or_restore(cls, defaults=None, **kwargs):
|
|
instance = cls.all_objects.filter(**kwargs).first()
|
|
if instance:
|
|
restored = False
|
|
if instance.is_deleted:
|
|
instance.restore()
|
|
restored = True
|
|
if defaults:
|
|
for key, value in defaults.items():
|
|
setattr(instance, key, value)
|
|
instance.save(update_fields=list(defaults.keys()))
|
|
return instance, False, restored
|
|
|
|
instance, created = cls.objects.get_or_create(defaults=defaults, **kwargs)
|
|
return instance, created, False
|
|
|
|
@classmethod
|
|
def update_or_restore(cls, defaults=None, **kwargs):
|
|
instance = cls.all_objects.filter(**kwargs).first()
|
|
if instance:
|
|
restored = False
|
|
if instance.is_deleted:
|
|
instance.restore()
|
|
restored = True
|
|
if defaults:
|
|
for key, value in defaults.items():
|
|
setattr(instance, key, value)
|
|
instance.save(update_fields=list(defaults.keys()))
|
|
return instance, False, restored
|
|
|
|
instance, created = cls.objects.update_or_create(defaults=defaults, **kwargs)
|
|
return instance, created, False
|
|
|
|
def _soft_delete_related(self, using=None):
|
|
for rel in self._meta.related_objects:
|
|
if not hasattr(rel, "on_delete"):
|
|
continue
|
|
|
|
on_delete = rel.on_delete
|
|
if on_delete not in (models.CASCADE, models.SET_NULL, models.PROTECT):
|
|
continue
|
|
|
|
accessor = rel.get_accessor_name()
|
|
try:
|
|
related = getattr(self, accessor)
|
|
except Exception:
|
|
continue
|
|
|
|
if on_delete is models.PROTECT:
|
|
if rel.one_to_one:
|
|
try:
|
|
_ = related
|
|
except rel.related_model.DoesNotExist:
|
|
continue
|
|
raise ProtectedError(
|
|
"Cannot delete because related protected objects exist.",
|
|
[related],
|
|
)
|
|
if related.all().exists():
|
|
raise ProtectedError(
|
|
"Cannot delete because related protected objects exist.",
|
|
list(related.all()),
|
|
)
|
|
continue
|
|
|
|
if on_delete is models.SET_NULL:
|
|
field_name = rel.field.name
|
|
if rel.one_to_one:
|
|
try:
|
|
obj = related
|
|
except rel.related_model.DoesNotExist:
|
|
continue
|
|
setattr(obj, field_name, None)
|
|
obj.save(using=using, update_fields=[field_name])
|
|
else:
|
|
related.all().update(**{field_name: None})
|
|
continue
|
|
|
|
if rel.one_to_one:
|
|
with contextlib.suppress(rel.related_model.DoesNotExist):
|
|
related.delete(using=using)
|
|
else:
|
|
for obj in related.all():
|
|
obj.delete(using=using)
|
|
|
|
def delete(self, using=None, keep_parents=False):
|
|
if self.is_deleted:
|
|
return
|
|
self._soft_delete_related(using=using)
|
|
self.is_deleted = True
|
|
self.deleted_at = timezone.now()
|
|
self.save(using=using, update_fields=["is_deleted", "deleted_at"])
|
|
|
|
def hard_delete(self, using=None, keep_parents=False):
|
|
super().delete(using=using, keep_parents=keep_parents)
|
|
|
|
def restore(self):
|
|
if not self.is_deleted:
|
|
return
|
|
# Restore related soft-deleted objects for CASCADE relations.
|
|
for rel in self._meta.related_objects:
|
|
if not hasattr(rel, "on_delete") or rel.on_delete is not models.CASCADE:
|
|
continue
|
|
|
|
accessor = rel.get_accessor_name()
|
|
try:
|
|
related = getattr(self, accessor)
|
|
except Exception:
|
|
continue
|
|
|
|
if rel.one_to_one:
|
|
with contextlib.suppress(rel.related_model.DoesNotExist):
|
|
related.restore()
|
|
else:
|
|
for obj in related.all():
|
|
obj.restore()
|
|
|
|
self.is_deleted = False
|
|
self.deleted_at = None
|
|
self.save(update_fields=["is_deleted", "deleted_at"])
|
|
|
|
@cached_property
|
|
def can_delete(self):
|
|
for field in self._meta.related_objects:
|
|
try:
|
|
if getattr(self, field.related_name).all().exists():
|
|
return False
|
|
except Exception:
|
|
pass
|
|
return True
|
|
|
|
@property
|
|
def created_at_display(self):
|
|
return common_datetime_str(self.created_at)
|
|
|
|
@property
|
|
def updated_at_display(self):
|
|
return common_datetime_str(self.updated_at)
|