Compare commits

...

3 Commits

Author SHA1 Message Date
c2abcd7b97 feat(payments): add discount code admin API
Some checks failed
Backend CI/CD / test (push) Has been cancelled
Backend CI/CD / deploy (push) Has been cancelled
2026-06-14 00:03:57 +03:30
bdc4fc1a49 feat(events): expand admin event management APIs 2026-06-14 00:03:42 +03:30
20e7a04e59 feat(users): add paginated admin metadata APIs 2026-06-14 00:03:27 +03:30
8 changed files with 631 additions and 29 deletions

View File

@@ -138,6 +138,7 @@ class EventListSchema(Schema):
class EventCreateSchema(Schema):
"""Payload for creating events via the API."""
title: str
slug: Optional[str] = None
description: str
event_type: str
address: Optional[str] = None
@@ -150,11 +151,13 @@ class EventCreateSchema(Schema):
capacity: Optional[int] = None
price: Optional[float] = None
status: str = "draft"
registration_success_markdown: Optional[str] = None
gallery_image_ids: Optional[List[int]] = []
class EventUpdateSchema(Schema):
"""Payload for updating events via the API."""
title: Optional[str] = None
slug: Optional[str] = None
description: Optional[str] = None
event_type: Optional[str] = None
address: Optional[str] = None
@@ -167,6 +170,7 @@ class EventUpdateSchema(Schema):
capacity: Optional[int] = None
price: Optional[float] = None
status: Optional[str] = None
registration_success_markdown: Optional[str] = None
gallery_image_ids: Optional[List[int]] = None
class RegistrationSchema(ModelSchema):
@@ -199,12 +203,56 @@ class AdminUserSchema(Schema):
first_name: str
last_name: str
email: str
mobile: Optional[str] = None
profile_picture: Optional[str] = None
profile_picture_thumbnail_url: Optional[str] = None
profile_picture_preview_url: Optional[str] = None
university: Optional[str] = None
major: Optional[str] = None
student_id: Optional[str] = None
year_of_study: Optional[int] = None
@staticmethod
def resolve_profile_picture(obj, context):
image = getattr(obj, "profile_picture", None)
if not getattr(image, "name", None):
return None
request = context["request"]
return request.build_absolute_uri(image.url) if hasattr(image, "url") else None
@staticmethod
def resolve_profile_picture_thumbnail_url(obj, context):
image = getattr(obj, "profile_picture", None)
if not getattr(image, "name", None):
return None
request = context["request"]
url = derivative_url(image, THUMBNAIL_VARIANT)
return request.build_absolute_uri(url) if url else None
@staticmethod
def resolve_profile_picture_preview_url(obj, context):
image = getattr(obj, "profile_picture", None)
if not getattr(image, "name", None):
return None
request = context["request"]
url = derivative_url(image, PREVIEW_VARIANT)
return request.build_absolute_uri(url) if url else None
@staticmethod
def resolve_university(obj):
return obj.get_university_display()
@staticmethod
def resolve_major(obj):
return obj.get_major_display()
class PaymentAdminSchema(Schema):
id: int
authority: Optional[str]
ref_id: Optional[str]
card_pan: Optional[str]
card_hash: Optional[str]
status: int
status_label: str
base_amount: int
@@ -241,7 +289,7 @@ class EventAdminDetailSchema(EventSchema):
@staticmethod
def resolve_registrations(obj):
return obj.registrations.select_related("user").prefetch_related(
return obj.registrations.select_related("user", "user__university", "user__major").prefetch_related(
"payments__discount_code"
).order_by("-registered_at")

View File

@@ -1,18 +1,20 @@
from django.conf import settings
from django.core.files.base import ContentFile
from django.shortcuts import get_object_or_404
from django.db.models import Q, Case, When, IntegerField
from django.utils.text import slugify
from django.utils import timezone
from ninja import Router, Query
from ninja import File, Router, Query, UploadedFile
from ninja.errors import HttpError
from typing import List, Optional
from uuid import UUID
from uuid import UUID, uuid4
from apps.events.api.schemas import (
EventAdminDetailSchema,
EventBriefSchema,
EventCreateSchema,
EventGallerySchema,
EventListSchema,
EventSchema,
EventUpdateSchema,
@@ -26,6 +28,8 @@ from apps.events.api.schemas import (
)
from core.authentication import jwt_auth
from apps.events.models import Event, Registration
from apps.gallery.models import Gallery
from apps.gallery.tasks import process_uploaded_image
from apps.notifications.services import notify_user
from apps.payments.models import DiscountCode
from apps.users.tasks import send_critical_sms
@@ -34,6 +38,28 @@ from core.api.schemas import ErrorSchema, MessageSchema
events_router = Router()
def _is_staff_user(user) -> bool:
return bool(user and (user.is_staff or user.is_superuser))
def _staff_forbidden():
return 403, {"error": "اجازه دسترسی ندارید."}
def _save_uploaded_image(instance, field_name: str, file: UploadedFile, folder: str):
if not file.content_type or not file.content_type.startswith("image/"):
return False, {"error": "فایل باید تصویر باشد."}
if file.size > 10 * 1024 * 1024:
return False, {"error": "حجم فایل باید کمتر از ۱۰ مگابایت باشد."}
extension = file.name.rsplit(".", 1)[-1] if "." in file.name else "jpg"
getattr(instance, field_name).save(
f"{folder}/{uuid4().hex}.{extension}",
ContentFile(file.read()),
save=True,
)
return True, instance
def _frontend_event_url(event: Event) -> str:
root = getattr(settings, "FRONTEND_ROOT", "/") or "/"
if not root.endswith("/"):
@@ -130,20 +156,32 @@ def get_event_by_slug(request, slug: str):
)
return event
@events_router.post("/", response=EventSchema)
@events_router.post("/", response={201: EventSchema, 403: ErrorSchema, 400: ErrorSchema}, auth=jwt_auth)
def create_event(request, payload: EventCreateSchema):
"""Create a new event"""
gallery_image_ids = payload.dict().pop('gallery_image_ids', [])
event = Event.objects.create(**payload.dict(exclude={'gallery_image_ids'}))
if not _is_staff_user(request.auth):
return _staff_forbidden()
data = payload.dict(exclude={'gallery_image_ids'})
gallery_image_ids = payload.gallery_image_ids or []
if data.get("slug"):
data["slug"] = slugify(data["slug"])
event = Event(**data)
try:
event.full_clean()
event.save()
except Exception as exc:
return 400, {"error": str(exc)}
if gallery_image_ids:
event.gallery_images.set(gallery_image_ids)
return event
return 201, event
@events_router.put("/{int:event_id}", response=EventSchema)
@events_router.put("/{int:event_id}", response={200: EventSchema, 403: ErrorSchema, 400: ErrorSchema}, auth=jwt_auth)
def update_event(request, event_id: int, payload: EventUpdateSchema):
"""Update an existing event"""
if not _is_staff_user(request.auth):
return _staff_forbidden()
event = get_object_or_404(Event, id=event_id, is_deleted=False)
previous_state = {
"status": event.status,
@@ -158,12 +196,18 @@ def update_event(request, event_id: int, payload: EventUpdateSchema):
gallery_image_ids = update_data.pop('gallery_image_ids', None)
for attr, value in update_data.items():
if attr == "slug" and value:
value = slugify(value)
setattr(event, attr, value)
if 'title' in update_data:
if 'title' in update_data and not update_data.get("slug"):
event.slug = slugify(event.title)
event.save()
try:
event.full_clean()
event.save()
except Exception as exc:
return 400, {"error": str(exc)}
if gallery_image_ids is not None:
event.gallery_images.set(gallery_image_ids)
@@ -196,14 +240,94 @@ def update_event(request, event_id: int, payload: EventUpdateSchema):
sms_kind="event_reschedule",
)
return event
return 200, event
@events_router.delete("/{int:event_id}", response=MessageSchema)
@events_router.delete("/{int:event_id}", response={200: MessageSchema, 403: ErrorSchema}, auth=jwt_auth)
def delete_event(request, event_id: int):
"""Soft delete an event"""
if not _is_staff_user(request.auth):
return _staff_forbidden()
event = get_object_or_404(Event, id=event_id, is_deleted=False)
event.delete()
return {"message": "Event deleted successfully"}
return 200, {"message": "Event deleted successfully"}
@events_router.post("/{int:event_id}/featured-image", response={200: EventSchema, 403: ErrorSchema, 400: ErrorSchema}, auth=jwt_auth)
def upload_event_featured_image(request, event_id: int, file: UploadedFile = File(...)):
"""Upload or replace the poster/featured image for an event."""
if not _is_staff_user(request.auth):
return _staff_forbidden()
event = get_object_or_404(Event, id=event_id, is_deleted=False)
ok, result = _save_uploaded_image(event, "featured_image", file, "events/featured")
if not ok:
return 400, result
return 200, event
@events_router.delete("/{int:event_id}/featured-image", response={200: EventSchema, 403: ErrorSchema}, auth=jwt_auth)
def delete_event_featured_image(request, event_id: int):
"""Remove the poster/featured image for an event."""
if not _is_staff_user(request.auth):
return _staff_forbidden()
event = get_object_or_404(Event, id=event_id, is_deleted=False)
if event.featured_image:
event.featured_image.delete(save=False)
event.featured_image = None
event.save(update_fields=["featured_image", "updated_at"])
return 200, event
@events_router.get("/{int:event_id}/gallery", response={200: List[EventGallerySchema], 403: ErrorSchema}, auth=jwt_auth)
def list_event_gallery(request, event_id: int):
if not _is_staff_user(request.auth):
return _staff_forbidden()
event = get_object_or_404(Event, id=event_id, is_deleted=False)
return 200, event.gallery_images.filter(is_deleted=False).select_related("uploaded_by")
@events_router.post("/{int:event_id}/gallery", response={201: EventGallerySchema, 403: ErrorSchema, 400: ErrorSchema}, auth=jwt_auth)
def upload_event_gallery_image(
request,
event_id: int,
file: UploadedFile = File(...),
title: str | None = None,
alt_text: str | None = None,
):
if not _is_staff_user(request.auth):
return _staff_forbidden()
event = get_object_or_404(Event, id=event_id, is_deleted=False)
if not file.content_type or not file.content_type.startswith("image/"):
return 400, {"error": "فایل باید تصویر باشد."}
if file.size > 10 * 1024 * 1024:
return 400, {"error": "حجم فایل باید کمتر از ۱۰ مگابایت باشد."}
try:
gallery_item = Gallery.objects.create(
title=title or file.name,
description="",
uploaded_by=request.auth,
alt_text=alt_text or title or file.name,
is_public=True,
)
gallery_item._defer_image_processing = True
extension = file.name.rsplit(".", 1)[-1] if "." in file.name else "jpg"
gallery_item.image.save(f"gallery/{uuid4().hex}.{extension}", ContentFile(file.read()))
event.gallery_images.add(gallery_item)
process_uploaded_image.delay(gallery_item.id)
except Exception as exc:
return 400, {"error": str(exc)}
return 201, gallery_item
@events_router.delete("/{int:event_id}/gallery/{int:image_id}", response={200: MessageSchema, 403: ErrorSchema}, auth=jwt_auth)
def delete_event_gallery_image(request, event_id: int, image_id: int):
if not _is_staff_user(request.auth):
return _staff_forbidden()
event = get_object_or_404(Event, id=event_id, is_deleted=False)
image = get_object_or_404(Gallery, id=image_id, is_deleted=False)
event.gallery_images.remove(image)
if not image.event_galleries.exclude(id=event.id).exists():
image.delete()
return 200, {"message": "Gallery image removed"}
# Registration endpoints
@events_router.get("/{int:event_id}/registrations", response=List[RegistrationSchema])
@@ -235,7 +359,7 @@ def list_event_registrations_admin(
event = get_object_or_404(Event, id=event_id, is_deleted=False)
qs = (
event.registrations.filter(is_deleted=False)
.select_related("user")
.select_related("user", "user__university", "user__major")
.prefetch_related("payments__discount_code")
.order_by("-registered_at")
)
@@ -259,6 +383,7 @@ def list_event_registrations_admin(
if search:
qs = qs.filter(
Q(user__username__icontains=search)
| Q(user__mobile__icontains=search)
| Q(user__email__icontains=search)
| Q(user__first_name__icontains=search)
| Q(user__last_name__icontains=search)

View File

@@ -37,6 +37,7 @@ class EventsAPIIntegrationTests(TestCase):
cls.user = User.objects.create_user(
username="event_user",
email="event.user@example.com",
mobile="09198000001",
password=cls.password,
)
cls.user.is_email_verified = True
@@ -45,6 +46,7 @@ class EventsAPIIntegrationTests(TestCase):
cls.staff = User.objects.create_user(
username="event_staff",
email="event.staff@example.com",
mobile="09198000002",
password=cls.password,
is_staff=True,
)
@@ -151,19 +153,21 @@ class EventsAPIIntegrationTests(TestCase):
"/api/events/",
data=json.dumps(payload),
content_type="application/json",
**self._auth_headers(self.staff_token),
)
self.assertEqual(created.status_code, 200)
self.assertEqual(created.status_code, 201)
event_id = created.json()["id"]
updated = self.client.put(
f"/api/events/{event_id}",
data=json.dumps({"title": "Updated Event"}),
content_type="application/json",
**self._auth_headers(self.staff_token),
)
self.assertEqual(updated.status_code, 200)
self.assertEqual(updated.json()["title"], "Updated Event")
deleted = self.client.delete(f"/api/events/{event_id}")
deleted = self.client.delete(f"/api/events/{event_id}", **self._auth_headers(self.staff_token))
self.assertEqual(deleted.status_code, 200)
def test_admin_detail_and_registration_list_requires_staff(self):
@@ -230,9 +234,10 @@ class EventsAPIIntegrationTests(TestCase):
"/api/events/",
data=json.dumps(payload),
content_type="application/json",
**self._auth_headers(self.staff_token),
)
body = response.json()
self.assertEqual(response.status_code, 200)
self.assertEqual(response.status_code, 201)
self.assertTrue(body["gallery_images"])
updated = self.client.put(
@@ -244,6 +249,7 @@ class EventsAPIIntegrationTests(TestCase):
}
),
content_type="application/json",
**self._auth_headers(self.staff_token),
)
self.assertEqual(updated.status_code, 200)
self.assertEqual(updated.json()["slug"], "gallery-event-updated")
@@ -370,7 +376,8 @@ class EventsAPIIntegrationTests(TestCase):
self.assertEqual(response.status_code, 400)
def _create_event_user(self, username, email):
user = User.objects.create_user(username=username, email=email, password=self.password)
suffix = str(abs(hash(username)) % 1_000_000).zfill(6)
user = User.objects.create_user(username=username, email=email, mobile=f"09190{suffix}", password=self.password)
user.is_email_verified = True
user.save(update_fields=["is_email_verified"])
user.major = self.user.major
@@ -468,6 +475,7 @@ class EventSchemasIntegrationTests(TestCase):
self.user = User.objects.create_user(
username="schema_user",
email="schema.user@example.com",
mobile="09198000003",
password=self.password,
)
self.user.is_email_verified = True

View File

@@ -1,4 +1,42 @@
from ninja import Schema
from datetime import datetime
class DiscountCodeSchema(Schema):
id: int
code: str
type: str
value: int
max_discount: int | None = None
is_active: bool
starts_at: datetime | None = None
ends_at: datetime | None = None
usage_limit_total: int | None = None
usage_limit_per_user: int | None = None
min_amount: int | None = None
applicable_event_ids: list[int]
usage_count: int = 0
created_at: datetime
updated_at: datetime
class PagedDiscountCodeSchema(Schema):
count: int
results: list[DiscountCodeSchema]
class DiscountCodeWriteSchema(Schema):
code: str
type: str = "percent"
value: int
max_discount: int | None = None
is_active: bool = True
starts_at: datetime | None = None
ends_at: datetime | None = None
usage_limit_total: int | None = None
usage_limit_per_user: int | None = None
min_amount: int | None = None
applicable_event_ids: list[int] = []
class CreatePaymentIn(Schema):

View File

@@ -1,8 +1,9 @@
from django.conf import settings
from django.shortcuts import redirect, get_object_or_404
from django.utils import timezone
from django.db.models import Count, Q
from ninja import Router
from ninja import Query, Router
from ninja.errors import HttpError
import requests
@@ -11,11 +12,140 @@ from apps.events.models import Event, Registration
from apps.notifications.services import notify_user
from apps.users.tasks import send_critical_sms
from core.authentication import jwt_auth
from apps.payments.api.schemas import CouponVerifyIn, CouponVerifyOut, CreatePaymentIn, CreatePaymentOut, PaymentDetailOut
from core.api.schemas import ErrorSchema, MessageSchema
from apps.payments.api.schemas import (
CouponVerifyIn,
CouponVerifyOut,
CreatePaymentIn,
CreatePaymentOut,
DiscountCodeSchema,
DiscountCodeWriteSchema,
PagedDiscountCodeSchema,
PaymentDetailOut,
)
payments_router = Router(tags=["Payments"])
def _staff_required(user):
return bool(user and (user.is_staff or user.is_superuser))
def _discount_payload(code: DiscountCode):
return {
"id": code.id,
"code": code.code,
"type": code.type,
"value": code.value,
"max_discount": code.max_discount,
"is_active": code.is_active,
"starts_at": code.starts_at,
"ends_at": code.ends_at,
"usage_limit_total": code.usage_limit_total,
"usage_limit_per_user": code.usage_limit_per_user,
"min_amount": code.min_amount,
"applicable_event_ids": list(code.applicable_events.values_list("id", flat=True)),
"usage_count": getattr(code, "usage_count", None) or code.payments.filter(
status__in=[Payment.OrderStatusChoices.PAID, Payment.OrderStatusChoices.PENDING]
).count(),
"created_at": code.created_at,
"updated_at": code.updated_at,
}
def _apply_discount_payload(instance: DiscountCode, payload: DiscountCodeWriteSchema):
data = payload.dict()
event_ids = data.pop("applicable_event_ids", [])
for field, value in data.items():
setattr(instance, field, value)
instance.code = instance.code.strip().upper()
instance.full_clean()
instance.save()
instance.applicable_events.set(Event.objects.filter(id__in=event_ids, is_deleted=False))
return instance
@payments_router.get("/admin/discount-codes", response={200: PagedDiscountCodeSchema, 403: ErrorSchema}, auth=jwt_auth)
def admin_list_discount_codes(
request,
search: str | None = Query(None),
is_active: bool | None = Query(None),
type: str | None = Query(None),
limit: int = Query(20, ge=1, le=100),
offset: int = Query(0, ge=0),
):
if not _staff_required(request.auth):
return 403, {"error": "Permission denied"}
queryset = DiscountCode.objects.annotate(
usage_count=Count(
"payments",
filter=Q(
payments__status__in=[
Payment.OrderStatusChoices.PAID,
Payment.OrderStatusChoices.PENDING,
]
),
)
).prefetch_related("applicable_events").order_by("-created_at")
if search:
queryset = queryset.filter(code__icontains=search)
if is_active is not None:
queryset = queryset.filter(is_active=is_active)
if type:
queryset = queryset.filter(type=type)
count = queryset.count()
return 200, {"count": count, "results": [_discount_payload(item) for item in queryset[offset : offset + limit]]}
@payments_router.post("/admin/discount-codes", response={201: DiscountCodeSchema, 403: ErrorSchema, 400: ErrorSchema}, auth=jwt_auth)
def admin_create_discount_code(request, payload: DiscountCodeWriteSchema):
if not _staff_required(request.auth):
return 403, {"error": "Permission denied"}
if DiscountCode.all_objects.filter(code=payload.code.strip().upper()).exists():
return 400, {"error": "Discount code already exists"}
try:
code = _apply_discount_payload(DiscountCode(), payload)
except Exception as exc:
return 400, {"error": str(exc)}
return 201, _discount_payload(code)
@payments_router.put("/admin/discount-codes/{int:code_id}", response={200: DiscountCodeSchema, 403: ErrorSchema, 400: ErrorSchema}, auth=jwt_auth)
def admin_update_discount_code(request, code_id: int, payload: DiscountCodeWriteSchema):
if not _staff_required(request.auth):
return 403, {"error": "Permission denied"}
code = get_object_or_404(DiscountCode, id=code_id, is_deleted=False)
normalized = payload.code.strip().upper()
if DiscountCode.all_objects.filter(code=normalized).exclude(id=code_id).exists():
return 400, {"error": "Discount code already exists"}
try:
code = _apply_discount_payload(code, payload)
except Exception as exc:
return 400, {"error": str(exc)}
return 200, _discount_payload(code)
@payments_router.delete("/admin/discount-codes/{int:code_id}", response={200: MessageSchema, 403: ErrorSchema, 400: ErrorSchema}, auth=jwt_auth)
def admin_delete_discount_code(request, code_id: int):
if not request.auth.is_superuser:
return 403, {"error": "Only superusers can delete discount codes"}
code = get_object_or_404(DiscountCode, id=code_id, is_deleted=False)
code.delete()
return 200, {"message": "Discount code deleted"}
@payments_router.post("/admin/discount-codes/{int:code_id}/restore", response={200: MessageSchema, 403: ErrorSchema, 400: ErrorSchema}, auth=jwt_auth)
def admin_restore_discount_code(request, code_id: int):
if not request.auth.is_superuser:
return 403, {"error": "Only superusers can restore discount codes"}
try:
code = DiscountCode.deleted_objects.get(id=code_id)
except DiscountCode.DoesNotExist:
return 400, {"error": "Discount code not found"}
code.restore()
return 200, {"message": "Discount code restored"}
def _event_action_url(event: Event) -> str:
root = getattr(settings, "FRONTEND_ROOT", "/") or "/"
if not root.endswith("/"):

View File

@@ -1,15 +1,209 @@
from ninja import Router
from django.db.models import Q
from ninja import Query, Router, Schema
from apps.users.models import Major, University
from core.api.schemas import ErrorSchema, MessageSchema
from core.authentication import jwt_auth
meta_router = Router(tags=['meta'])
@meta_router.get("/majors")
def list_majors(request):
majors = Major.objects.filter(is_deleted=False, is_active=True).order_by("name")
return [{"id": m.id, "code": m.code, "label": m.name} for m in majors]
@meta_router.get("/universities")
def list_universities(request):
universities = University.objects.filter(is_deleted=False, is_active=True).order_by("name")
return [{"id": u.id, "code": u.code, "label": u.name} for u in universities]
class MetaOptionSchema(Schema):
id: int
code: str
label: str
is_active: bool = True
user_count: int = 0
class PagedMetaOptionSchema(Schema):
count: int
results: list[MetaOptionSchema]
class MetaOptionWriteSchema(Schema):
code: str
name: str
is_active: bool = True
def _is_staff(user):
return bool(user and (user.is_staff or user.is_superuser))
def _option_payload(obj, user_count=0):
return {
"id": obj.id,
"code": obj.code,
"label": obj.name,
"is_active": obj.is_active,
"user_count": user_count,
}
def _list_options(model, search, limit, offset, active_only=True):
queryset = model.objects.filter(is_deleted=False).order_by("name")
if active_only:
queryset = queryset.filter(is_active=True)
if search:
queryset = queryset.filter(Q(code__icontains=search) | Q(name__icontains=search))
count = queryset.count()
return count, list(queryset[offset : offset + limit])
@meta_router.get("/majors", response=PagedMetaOptionSchema)
def list_majors(
request,
search: str | None = Query(None),
limit: int = Query(20, ge=1, le=100),
offset: int = Query(0, ge=0),
):
count, majors = _list_options(Major, search, limit, offset)
return {"count": count, "results": [_option_payload(m) for m in majors]}
@meta_router.get("/universities", response=PagedMetaOptionSchema)
def list_universities(
request,
search: str | None = Query(None),
limit: int = Query(20, ge=1, le=100),
offset: int = Query(0, ge=0),
):
count, universities = _list_options(University, search, limit, offset)
return {"count": count, "results": [_option_payload(u) for u in universities]}
@meta_router.get("/admin/majors", response={200: PagedMetaOptionSchema, 403: ErrorSchema}, auth=jwt_auth)
def admin_list_majors(
request,
search: str | None = Query(None),
limit: int = Query(20, ge=1, le=100),
offset: int = Query(0, ge=0),
):
if not _is_staff(request.auth):
return 403, {"error": "Permission denied"}
count, majors = _list_options(Major, search, limit, offset, active_only=False)
return 200, {
"count": count,
"results": [_option_payload(m, m.users.filter(is_deleted=False).count()) for m in majors],
}
@meta_router.post("/admin/majors", response={201: MetaOptionSchema, 403: ErrorSchema, 400: ErrorSchema}, auth=jwt_auth)
def admin_create_major(request, payload: MetaOptionWriteSchema):
if not _is_staff(request.auth):
return 403, {"error": "Permission denied"}
if Major.all_objects.filter(code=payload.code).exists():
return 400, {"error": "Major code already exists"}
major = Major.objects.create(code=payload.code.strip(), name=payload.name.strip(), is_active=payload.is_active)
return 201, _option_payload(major)
@meta_router.put("/admin/majors/{int:item_id}", response={200: MetaOptionSchema, 403: ErrorSchema, 400: ErrorSchema}, auth=jwt_auth)
def admin_update_major(request, item_id: int, payload: MetaOptionWriteSchema):
if not _is_staff(request.auth):
return 403, {"error": "Permission denied"}
try:
major = Major.objects.get(id=item_id)
except Major.DoesNotExist:
return 400, {"error": "Major not found"}
conflict = Major.all_objects.filter(code=payload.code).exclude(id=item_id).exists()
if conflict:
return 400, {"error": "Major code already exists"}
major.code = payload.code.strip()
major.name = payload.name.strip()
major.is_active = payload.is_active
major.save(update_fields=["code", "name", "is_active", "updated_at"])
return 200, _option_payload(major, major.users.filter(is_deleted=False).count())
@meta_router.delete("/admin/majors/{int:item_id}", response={200: MessageSchema, 403: ErrorSchema, 400: ErrorSchema}, auth=jwt_auth)
def admin_delete_major(request, item_id: int):
if not request.auth.is_superuser:
return 403, {"error": "Only superusers can delete majors"}
try:
major = Major.objects.get(id=item_id)
except Major.DoesNotExist:
return 400, {"error": "Major not found"}
major.delete()
return 200, {"message": "Major deleted"}
@meta_router.post("/admin/majors/{int:item_id}/restore", response={200: MessageSchema, 403: ErrorSchema, 400: ErrorSchema}, auth=jwt_auth)
def admin_restore_major(request, item_id: int):
if not request.auth.is_superuser:
return 403, {"error": "Only superusers can restore majors"}
try:
major = Major.deleted_objects.get(id=item_id)
except Major.DoesNotExist:
return 400, {"error": "Major not found"}
major.restore()
return 200, {"message": "Major restored"}
@meta_router.get("/admin/universities", response={200: PagedMetaOptionSchema, 403: ErrorSchema}, auth=jwt_auth)
def admin_list_universities(
request,
search: str | None = Query(None),
limit: int = Query(20, ge=1, le=100),
offset: int = Query(0, ge=0),
):
if not _is_staff(request.auth):
return 403, {"error": "Permission denied"}
count, universities = _list_options(University, search, limit, offset, active_only=False)
return 200, {
"count": count,
"results": [_option_payload(u, u.users.filter(is_deleted=False).count()) for u in universities],
}
@meta_router.post("/admin/universities", response={201: MetaOptionSchema, 403: ErrorSchema, 400: ErrorSchema}, auth=jwt_auth)
def admin_create_university(request, payload: MetaOptionWriteSchema):
if not _is_staff(request.auth):
return 403, {"error": "Permission denied"}
if University.all_objects.filter(code=payload.code).exists():
return 400, {"error": "University code already exists"}
university = University.objects.create(code=payload.code.strip(), name=payload.name.strip(), is_active=payload.is_active)
return 201, _option_payload(university)
@meta_router.put("/admin/universities/{int:item_id}", response={200: MetaOptionSchema, 403: ErrorSchema, 400: ErrorSchema}, auth=jwt_auth)
def admin_update_university(request, item_id: int, payload: MetaOptionWriteSchema):
if not _is_staff(request.auth):
return 403, {"error": "Permission denied"}
try:
university = University.objects.get(id=item_id)
except University.DoesNotExist:
return 400, {"error": "University not found"}
conflict = University.all_objects.filter(code=payload.code).exclude(id=item_id).exists()
if conflict:
return 400, {"error": "University code already exists"}
university.code = payload.code.strip()
university.name = payload.name.strip()
university.is_active = payload.is_active
university.save(update_fields=["code", "name", "is_active", "updated_at"])
return 200, _option_payload(university, university.users.filter(is_deleted=False).count())
@meta_router.delete("/admin/universities/{int:item_id}", response={200: MessageSchema, 403: ErrorSchema, 400: ErrorSchema}, auth=jwt_auth)
def admin_delete_university(request, item_id: int):
if not request.auth.is_superuser:
return 403, {"error": "Only superusers can delete universities"}
try:
university = University.objects.get(id=item_id)
except University.DoesNotExist:
return 400, {"error": "University not found"}
university.delete()
return 200, {"message": "University deleted"}
@meta_router.post("/admin/universities/{int:item_id}/restore", response={200: MessageSchema, 403: ErrorSchema, 400: ErrorSchema}, auth=jwt_auth)
def admin_restore_university(request, item_id: int):
if not request.auth.is_superuser:
return 403, {"error": "Only superusers can restore universities"}
try:
university = University.deleted_objects.get(id=item_id)
except University.DoesNotExist:
return 400, {"error": "University not found"}
university.restore()
return 200, {"message": "University restored"}

View File

@@ -178,6 +178,19 @@ class UserListSchema(ModelSchema):
major: Optional[str] = None
university: Optional[str] = None
mobile: Optional[str] = None
profile_picture: Optional[str] = None
profile_picture_thumbnail_url: Optional[str] = None
profile_picture_preview_url: Optional[str] = None
student_id: Optional[str] = None
year_of_study: Optional[int] = None
bio: Optional[str] = None
is_email_verified: bool
is_mobile_verified: bool
is_deleted: bool
deleted_at: Optional[datetime] = None
can_access_blog_admin: bool
can_write_blog_posts: bool
can_review_blog_posts: bool
class Meta:
model = User
@@ -188,13 +201,19 @@ class UserListSchema(ModelSchema):
"mobile",
"first_name",
"last_name",
"student_id",
"year_of_study",
"bio",
"is_active",
"is_staff",
"is_superuser",
"date_joined",
"major",
"university",
"is_email_verified",
"is_mobile_verified",
"is_deleted",
"deleted_at",
]
@staticmethod
@@ -205,6 +224,37 @@ class UserListSchema(ModelSchema):
def resolve_university(obj):
return obj.get_university_display()
@staticmethod
def resolve_can_access_blog_admin(obj):
return can_access_blog_admin(obj)
@staticmethod
def resolve_can_write_blog_posts(obj):
return can_write_blog_posts(obj)
@staticmethod
def resolve_can_review_blog_posts(obj):
return can_review_blog_posts(obj)
@staticmethod
def resolve_profile_picture(obj, context):
request = context["request"]
if obj.profile_picture and hasattr(obj.profile_picture, "url"):
return request.build_absolute_uri(obj.profile_picture.url)
return None
@staticmethod
def resolve_profile_picture_thumbnail_url(obj, context):
request = context["request"]
url = derivative_url(obj.profile_picture, THUMBNAIL_VARIANT)
return request.build_absolute_uri(url) if url else None
@staticmethod
def resolve_profile_picture_preview_url(obj, context):
request = context["request"]
url = derivative_url(obj.profile_picture, PREVIEW_VARIANT)
return request.build_absolute_uri(url) if url else None
class AuthorizationRoleSchema(Schema):
key: str

View File

@@ -533,6 +533,15 @@ def list_users(
return queryset[offset : offset + limit]
@auth_router.get("/users/{user_id}", response={200: UserProfileSchema, 403: ErrorSchema, 404: ErrorSchema}, auth=jwt_auth)
def get_user_detail(request, user_id: int):
user = request.auth
if not (user.is_staff or user.is_superuser):
return 403, {"error": "اجازه دسترسی ندارید."}
target = get_object_or_404(User, id=user_id)
return 200, target
@auth_router.get("/roles", response={200: list[AuthorizationRoleSchema], 403: ErrorSchema}, auth=jwt_auth)
def list_authorization_roles(request):
if not _ensure_superuser(request.auth):