initial commit

This commit is contained in:
2026-03-11 17:12:28 +08:00
commit 5d1e1cb7cb
61 changed files with 2971 additions and 0 deletions

0
core/__init__.py Normal file
View File

65
core/admins/base.py Normal file
View File

@@ -0,0 +1,65 @@
from auditlog.mixins import AuditlogHistoryAdminMixin
from django.contrib import admin, messages
from django.db import transaction
from django.db.models.deletion import ProtectedError
from import_export.admin import ImportExportModelAdmin
from unfold.admin import ModelAdmin as UnfoldModelAdmin
from core.admins.utils import SoftDeleteListFilter
class BaseAdmin(AuditlogHistoryAdminMixin, ImportExportModelAdmin, UnfoldModelAdmin):
show_auditlog_history_link = True
actions = ["hard_delete_selected", "restore_selected"]
list_filter = (SoftDeleteListFilter,)
def get_queryset(self, request):
return self.model.all_objects.all()
@admin.action(description="Hard delete selected (permanent)")
def hard_delete_selected(self, request, queryset):
count = queryset.count()
try:
with transaction.atomic():
queryset.hard_delete()
self.message_user(
request,
f"{count} record(s) permanently deleted.",
level=messages.SUCCESS,
)
except ProtectedError:
self.message_user(
request,
"Cannot hard delete because related protected objects exist.",
level=messages.ERROR,
)
except Exception as e:
self.message_user(request, str(e), level=messages.ERROR)
@admin.action(description="Restore selected (undo soft delete)")
def restore_selected(self, request, queryset):
restored = 0
for obj in queryset:
if getattr(obj, "is_deleted", False):
obj.restore()
restored += 1
self.message_user(
request,
f"{restored} record(s) restored.",
level=messages.SUCCESS,
)
def get_actions(self, request):
actions = super().get_actions(request)
if not request.user.is_superuser:
actions.pop("hard_delete_selected", None)
is_deleted_filter = request.GET.get("is_deleted")
should_show_restore_actions = is_deleted_filter == "1"
if not should_show_restore_actions:
actions.pop("restore_selected", None)
actions.pop("hard_delete_selected", None)
return actions

21
core/admins/utils.py Normal file
View File

@@ -0,0 +1,21 @@
from django.contrib import admin
class SoftDeleteListFilter(admin.SimpleListFilter):
title = "Soft Delete Status"
parameter_name = "is_deleted"
def lookups(self, request, model_admin):
return [
("0", "Active"),
("1", "Deleted"),
]
def queryset(self, request, queryset):
if self.value() == "0":
return queryset.filter(is_deleted=False)
if self.value() == "1":
return queryset.model.deleted_objects.all()
return queryset

9
core/api/mixins.py Normal file
View File

@@ -0,0 +1,9 @@
class WorkspaceQuerysetMixin:
workspace_lookup_url_kwarg = "workspace_id"
def get_workspace(self):
return self.kwargs[self.workspace_lookup_url_kwarg]
def get_queryset(self):
queryset = super().get_queryset()
return queryset.filter(workspace_id=self.get_workspace())

View File

@@ -0,0 +1,95 @@
import logging
import traceback
from collections.abc import Iterable
from typing import Any
from django.conf import settings
from rest_framework import status as http_status
from rest_framework.exceptions import ErrorDetail
from rest_framework.response import Response
from rest_framework.views import exception_handler as drf_exception_handler
logger = logging.getLogger(__name__)
def _flatten_messages(values: Iterable) -> list[str]:
items: list[str] = []
for value in values:
items.extend(_to_str_list(value))
return items
def _to_str_list(value: str | ErrorDetail | list | tuple | dict) -> list[str]:
if isinstance(value, str | ErrorDetail):
return [str(value)]
if isinstance(value, list | tuple):
return _flatten_messages(value)
if isinstance(value, dict):
items: list[str] = []
for field, v in value.items():
msgs = _to_str_list(v)
for msg in msgs:
if field in ("non_field_errors", "__all__"):
items.append(str(msg))
else:
items.append(f"{field}: {msg}")
return items
return [str(value)]
def _format_payload(messages: list[str], status_code: int) -> dict[str, Any]:
clean_messages: list[str] = []
for msg in messages:
msg = msg.replace("error:", "").strip()
if ":" in msg:
_, only_msg = msg.split(":", 1)
clean_messages.append(only_msg.strip())
else:
clean_messages.append(msg)
error_message = messages[0] if messages else http_status.HTTP_STATUS_CODES.get(status_code, "Error")
return {
"error": error_message,
"status_code": status_code,
"messages": [{"message": msg} for msg in clean_messages],
}
def _request_extra(context: dict[str, Any]) -> dict[str, Any]:
request = context.get("request")
meta = getattr(request, "META", {})
return {
"request_method": getattr(request, "method", None),
"request_url": getattr(request, "get_full_path", lambda: None)(),
"remote_addr": meta.get("REMOTE_ADDR") if meta else None,
"user_agent": meta.get("HTTP_USER_AGENT", "") if meta else "",
}
def exception_handler(exc, context) -> Response:
response = drf_exception_handler(exc, context)
is_server_error = response is None or getattr(response, "status_code", 500) >= 500
if is_server_error:
logger.exception("DRF exception", extra=_request_extra(context))
if settings.DEBUG:
is_unhandled = response is None
if is_unhandled or is_server_error:
raise
if response is not None:
status_code = response.status_code
detail = response.data
if status_code < 500:
messages = _to_str_list(detail)
payload = _format_payload(messages, status_code)
return Response(payload, status=status_code)
traceback_text = traceback.format_exc()
payload = _format_payload(
["Internal server error."],
http_status.HTTP_500_INTERNAL_SERVER_ERROR,
)
payload["exception"] = str(exc)
payload["traceback"] = traceback_text
return Response(payload, status=http_status.HTTP_500_INTERNAL_SERVER_ERROR)

7
core/filters/base.py Normal file
View File

@@ -0,0 +1,7 @@
import django_filters as filters
class BaseFilterSet(filters.FilterSet):
created_after = filters.DateTimeFilter(field_name="created_at", lookup_expr="gte")
created_before = filters.DateTimeFilter(field_name="created_at", lookup_expr="lte")
updated_after = filters.DateTimeFilter(field_name="updated_at", lookup_expr="gte")
updated_before = filters.DateTimeFilter(field_name="updated_at", lookup_expr="lte")

View File

@@ -0,0 +1,18 @@
import logging
import threading
logger = logging.getLogger(__name__)
_local = threading.local()
def get_current_user():
return getattr(_local, "user", None)
class CurrentUserMiddleware:
def __init__(self, get_response):
self.get_response = get_response
def __call__(self, request):
_local.user = request.user
return self.get_response(request)

View File

@@ -0,0 +1,23 @@
import logging
logger = logging.getLogger(__name__)
class ExceptionLoggingMiddleware:
def __init__(self, get_response):
self.get_response = get_response
def __call__(self, request):
try:
return self.get_response(request)
except Exception:
logger.exception(
"Unhandled exception",
extra={
"request_method": request.method,
"request_url": request.get_full_path(),
"remote_addr": request.META.get("REMOTE_ADDR"),
"user_agent": request.META.get("HTTP_USER_AGENT", ""),
},
)
raise

227
core/models/base.py Normal file
View File

@@ -0,0 +1,227 @@
import contextlib
import uuid
from functools import cached_property
from django.conf import settings
from django.db import models
from django.db.models.deletion import ProtectedError
from django.utils import timezone
from core.middlewares.current_user import get_current_user
from core.utils import common_datetime_str
class SoftDeleteQuerySet(models.QuerySet):
def delete(self):
for obj in self:
obj.delete()
return
def hard_delete(self):
return super().delete()
def alive(self):
return self.filter(is_deleted=False)
def dead(self):
return self.filter(is_deleted=True)
class SoftDeleteManager(models.Manager):
def __init__(self, *args, **kwargs):
self.alive_only = kwargs.pop("alive_only", None)
super().__init__(*args, **kwargs)
def get_queryset(self) -> SoftDeleteQuerySet:
if self.alive_only is True:
return SoftDeleteQuerySet(self.model).filter(is_deleted=False)
if self.alive_only is False:
return SoftDeleteQuerySet(self.model).filter(is_deleted=True)
return SoftDeleteQuerySet(self.model)
def hard_delete(self):
return self.get_queryset().hard_delete()
class BaseModel(models.Model):
id = models.UUIDField(default=uuid.uuid7, primary_key=True)
created_at = models.DateTimeField(auto_now_add=True)
updated_at = models.DateTimeField(auto_now=True)
deleted_at = models.DateTimeField(null=True, blank=True)
is_deleted = models.BooleanField(default=False)
is_active = models.BooleanField(default=False)
created_by = models.ForeignKey(
settings.AUTH_USER_MODEL,
null=True,
blank=True,
on_delete=models.SET_NULL,
related_name="created_%(app_label)s_%(class)s_set",
)
updated_by = models.ForeignKey(
settings.AUTH_USER_MODEL,
null=True,
blank=True,
on_delete=models.SET_NULL,
related_name="updated_%(app_label)s_%(class)s_set",
)
objects = SoftDeleteManager(alive_only=True)
all_objects = SoftDeleteManager(alive_only=None)
deleted_objects = SoftDeleteManager(alive_only=False)
class Meta:
abstract = True
indexes = (models.Index(fields=["id"], name="%(class)s_id_idx"),)
def save(self, *args, **kwargs):
user = get_current_user()
if user and user.is_authenticated:
if not self.created_by:
self.created_by = user
self.updated_by = user
super().save(*args, **kwargs)
@classmethod
def get_or_restore(cls, defaults=None, **kwargs):
instance = cls.all_objects.filter(**kwargs).first()
if instance:
restored = False
if instance.is_deleted:
instance.restore()
restored = True
if defaults:
for key, value in defaults.items():
setattr(instance, key, value)
instance.save(update_fields=list(defaults.keys()))
return instance, False, restored
instance, created = cls.objects.get_or_create(defaults=defaults, **kwargs)
return instance, created, False
@classmethod
def update_or_restore(cls, defaults=None, **kwargs):
instance = cls.all_objects.filter(**kwargs).first()
if instance:
restored = False
if instance.is_deleted:
instance.restore()
restored = True
if defaults:
for key, value in defaults.items():
setattr(instance, key, value)
instance.save(update_fields=list(defaults.keys()))
return instance, False, restored
instance, created = cls.objects.update_or_create(defaults=defaults, **kwargs)
return instance, created, False
def _soft_delete_related(self, using=None):
for rel in self._meta.related_objects:
if not hasattr(rel, "on_delete"):
continue
on_delete = rel.on_delete
if on_delete not in (models.CASCADE, models.SET_NULL, models.PROTECT):
continue
accessor = rel.get_accessor_name()
try:
related = getattr(self, accessor)
except Exception:
continue
if on_delete is models.PROTECT:
if rel.one_to_one:
try:
_ = related
except rel.related_model.DoesNotExist:
continue
raise ProtectedError(
"Cannot delete because related protected objects exist.",
[related],
)
if related.all().exists():
raise ProtectedError(
"Cannot delete because related protected objects exist.",
list(related.all()),
)
continue
if on_delete is models.SET_NULL:
field_name = rel.field.name
if rel.one_to_one:
try:
obj = related
except rel.related_model.DoesNotExist:
continue
setattr(obj, field_name, None)
obj.save(using=using, update_fields=[field_name])
else:
related.all().update(**{field_name: None})
continue
if rel.one_to_one:
with contextlib.suppress(rel.related_model.DoesNotExist):
related.delete(using=using)
else:
for obj in related.all():
obj.delete(using=using)
def delete(self, using=None, keep_parents=False):
if self.is_deleted:
return
self._soft_delete_related(using=using)
self.is_deleted = True
self.deleted_at = timezone.now()
self.save(using=using, update_fields=["is_deleted", "deleted_at"])
def hard_delete(self, using=None, keep_parents=False):
super().delete(using=using, keep_parents=keep_parents)
def restore(self):
if not self.is_deleted:
return
# Restore related soft-deleted objects for CASCADE relations.
for rel in self._meta.related_objects:
if not hasattr(rel, "on_delete") or rel.on_delete is not models.CASCADE:
continue
accessor = rel.get_accessor_name()
try:
related = getattr(self, accessor)
except Exception:
continue
if rel.one_to_one:
with contextlib.suppress(rel.related_model.DoesNotExist):
related.restore()
else:
for obj in related.all():
obj.restore()
self.is_deleted = False
self.deleted_at = None
self.save(update_fields=["is_deleted", "deleted_at"])
@cached_property
def can_delete(self):
for field in self._meta.related_objects:
try:
if getattr(self, field.related_name).all().exists():
return False
except Exception:
pass
return True
@property
def created_at_display(self):
return common_datetime_str(self.created_at)
@property
def updated_at_display(self):
return common_datetime_str(self.updated_at)

View File

@@ -0,0 +1,8 @@
from rest_framework.pagination import CursorPagination
class StandardCursorPagination(CursorPagination):
page_size = 50
page_size_query_param = "limit"
cursor_query_param = "cursor"
max_page_size = 100

View File

@@ -0,0 +1,56 @@
from rest_framework.pagination import LimitOffsetPagination
from rest_framework.response import Response
def _positive_int(integer_string):
ret = int(integer_string)
if ret < 1:
raise ValueError()
return ret
class CustomLimitOffsetPagination(LimitOffsetPagination):
limit_query_param = "limit"
offset_query_param = "offset"
def paginate_queryset(self, queryset, request, view=None):
self.limit = self.get_limit(request)
if self.limit is None:
return None
self.count = self.get_count(queryset)
self.offset = self.get_offset(request)
self.request = request
if self.count == 0 or self.offset >= self.count:
return []
return list(queryset[self.offset : self.offset + self.limit])
def get_offset(self, request):
try:
return _positive_int(request.query_params[self.offset_query_param])
except (KeyError, ValueError):
return 0
def get_limit(self, request):
if self.limit_query_param:
try:
return _positive_int(request.query_params[self.limit_query_param])
except (KeyError, ValueError):
pass
return self.default_limit
def get_paginated_response(self, data):
pages_count = 0 if self.count == 0 else (self.count + self.limit - 1) // self.limit
current_page = 0 if self.count == 0 else (self.offset // self.limit) + 1
return Response(
{
"pages_count": pages_count,
"items_per_page": self.limit,
"current_page_items_count": len(data),
"current_page": current_page,
"total_items": self.count,
"items": data,
}
)

57
core/serializers/base.py Normal file
View File

@@ -0,0 +1,57 @@
from django.contrib.auth import get_user_model
from drf_spectacular.utils import extend_schema_field
from rest_framework import serializers
from core.serializers.mini import UserMiniSerializer
User = get_user_model()
class BaseModelSerializer(serializers.ModelSerializer):
"""
Base serializer for all models inheriting from BaseModel.
Returns audit fields with a nested user mini representation.
"""
id = serializers.UUIDField(read_only=True)
created_by = UserMiniSerializer(read_only=True)
updated_by = UserMiniSerializer(read_only=True)
created_at = serializers.SerializerMethodField()
updated_at = serializers.SerializerMethodField()
can_delete = serializers.SerializerMethodField()
class Meta:
model = None
fields = (
"id",
"created_by",
"updated_by",
"created_at",
"updated_at",
"can_delete",
)
read_only_fields = fields
def to_internal_value(self, data):
if isinstance(data, dict):
data = data.copy()
for name, field in self.fields.items():
if (
name in data
and data[name] is None
and isinstance(field, (serializers.CharField, serializers.URLField))
):
data[name] = ""
return super().to_internal_value(data)
@extend_schema_field(serializers.CharField)
def get_created_at(self, obj) -> str:
return obj.created_at_display
@extend_schema_field(serializers.CharField)
def get_updated_at(self, obj) -> str:
return obj.updated_at_display
@extend_schema_field(serializers.BooleanField)
def get_can_delete(self, obj) -> bool:
return bool(getattr(obj, "can_delete", False))

11
core/serializers/mini.py Normal file
View File

@@ -0,0 +1,11 @@
from django.contrib.auth import get_user_model
from rest_framework import serializers
User = get_user_model()
class UserMiniSerializer(serializers.ModelSerializer):
class Meta:
model = User
fields = ("id", "first_name", "last_name", "mobile")
read_only_fields = fields

72
core/utils.py Normal file
View File

@@ -0,0 +1,72 @@
import os
from datetime import datetime
from django.utils import timezone
from django.utils.text import slugify
def common_user_str(user):
if not user:
return ""
return user.full_name if user.full_name else user.mobile
def common_datetime_str(datetime):
if not datetime:
return ""
try:
if timezone.is_aware(datetime):
datetime = timezone.localtime(datetime)
else:
datetime = timezone.make_aware(datetime, timezone.get_current_timezone())
except Exception:
pass
return datetime.strftime("%Y.%m.%d %H:%M")
def common_date_str(datetime):
if not datetime:
return ""
try:
if timezone.is_aware(datetime):
datetime = timezone.localtime(datetime)
else:
datetime = timezone.make_aware(datetime, timezone.get_current_timezone())
except Exception:
pass
return datetime.strftime("%Y.%m.%d")
def file_name_datetime_str():
dt = timezone.now()
return f"{dt.year}-{dt.month}-{dt.day}-{dt.hour}-{dt.minute}-{dt.second}"
def upload_to_by_date(instance, filename):
today = datetime.now()
timestamp = today.strftime("%Y%m%d%H%M%S")
file_extension = os.path.splitext(filename)[1]
new_filename = f"{timestamp}{file_extension}"
return os.path.join(f"storage/{today.year}/", new_filename)
def calculate_age(birth_date):
"""
Helper Function to calculate age from birth date
"""
if not birth_date:
return None
today = timezone.localdate()
age = today.year - birth_date.year - int((today.month, today.day) < (birth_date.month, birth_date.day))
return age
def generate_slug(title, Object, pk):
base_slug = slugify(title, allow_unicode=True)
slug = base_slug
counter = 2
while slug and Object.objects.filter(slug=slug).exclude(pk=pk).exists():
slug = f"{base_slug}-{counter}"
counter += 1
return slug