Files

574 lines
21 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
from django.conf import settings
from django.core.files.base import ContentFile
from django.shortcuts import get_object_or_404
from django.db.models import Q, Case, When, IntegerField
from django.utils.text import slugify
from django.utils import timezone
from ninja import File, Router, Query, UploadedFile
from ninja.errors import HttpError
from typing import List, Optional
from uuid import UUID, uuid4
from apps.events.api.schemas import (
EventAdminDetailSchema,
EventBriefSchema,
EventCreateSchema,
EventGallerySchema,
EventListSchema,
EventSchema,
EventUpdateSchema,
MyEventRegistrationOut,
PaginatedRegistrationSchema,
RegisterationDetailSchema,
RegistrationCreateSchema,
RegistrationSchema,
RegistrationStatusOut,
RegistrationStatusUpdateSchema,
)
from core.authentication import jwt_auth
from apps.events.models import Event, Registration
from apps.gallery.models import Gallery
from apps.gallery.tasks import process_uploaded_image
from apps.notifications.services import notify_user
from apps.payments.models import DiscountCode
from apps.users.tasks import send_critical_sms
from core.api.schemas import ErrorSchema, MessageSchema
# Router that every event and event-registration endpoint in this module attaches to.
events_router = Router()
def _is_staff_user(user) -> bool:
    """Return True when ``user`` is truthy and is flagged as staff or superuser."""
    if not user:
        return False
    return bool(user.is_staff or user.is_superuser)
def _staff_forbidden():
    """Build the ``(status, body)`` pair returned to callers who are not staff."""
    body = {"error": "اجازه دسترسی ندارید."}
    return 403, body
def _save_uploaded_image(instance, field_name: str, file: UploadedFile, folder: str):
    """Validate an uploaded image and store it on ``instance.<field_name>``.

    Args:
        instance: Object whose ``field_name`` attribute exposes ``save(name, content, save=...)``.
        field_name: Name of the file field on ``instance``.
        file: The upload; must report an ``image/*`` content type and be at most 10 MiB.
        folder: Path prefix placed in front of the generated file name.

    Returns:
        ``(True, instance)`` after the file was saved, or ``(False, {"error": ...})``
        when the upload is rejected.
    """
    content_type = file.content_type
    if not content_type or not content_type.startswith("image/"):
        return False, {"error": "فایل باید تصویر باشد."}
    if file.size > 10 * 1024 * 1024:
        return False, {"error": "حجم فایل باید کمتر از ۱۰ مگابایت باشد."}

    # The file name is supplied by the client, so only a short, lowercase,
    # ASCII-alphanumeric suffix is reused; anything else (empty, containing
    # path separators, overly long) falls back to "jpg".
    _, dot, suffix = (file.name or "").rpartition(".")
    extension = suffix.lower() if dot else ""
    if not (extension.isascii() and extension.isalnum() and len(extension) <= 10):
        extension = "jpg"

    getattr(instance, field_name).save(
        f"{folder}/{uuid4().hex}.{extension}",
        ContentFile(file.read()),
        save=True,
    )
    return True, instance
def _frontend_event_url(event: Event) -> str:
    """Return ``<FRONTEND_ROOT>/events/<slug or id>`` for ``event``.

    ``settings.FRONTEND_ROOT`` defaults to ``"/"`` when missing or empty, and a
    single slash is inserted between the root and ``events/`` when the root
    does not already end with one.
    """
    base = getattr(settings, "FRONTEND_ROOT", "/") or "/"
    separator = "" if base.endswith("/") else "/"
    identifier = event.slug or event.id
    return f"{base}{separator}events/{identifier}"
def _notify_event_update(
    event: Event,
    *,
    notification_type: str,
    title: str,
    message: str,
    level: str,
    sms_kind: str | None = None,
):
    """Notify every non-cancelled, non-deleted registrant of ``event``.

    Each registrant receives one ``notify_user`` call carrying the given type,
    title, message and level plus a link to the event page. When ``sms_kind``
    is set, registrants with a verified mobile number are also queued an SMS
    through ``send_critical_sms``.
    """
    active_registrations = (
        Registration.objects.filter(event=event, is_deleted=False)
        .exclude(status=Registration.StatusChoices.CANCELLED)
        .select_related("user")
    )
    for entry in active_registrations:
        # A fresh payload per recipient, so notify_user never sees a shared dict.
        notification = {
            "type": notification_type,
            "title": title,
            "message": message,
            "level": level,
            "action_url": _frontend_event_url(event),
            "entity_type": "event",
            "entity_id": event.id,
            "meta": {"event_status": event.status},
        }
        notify_user(entry.user_id, notification)

        if not sms_kind:
            continue
        recipient = entry.user
        if recipient.mobile and recipient.is_mobile_verified:
            send_critical_sms.delay(recipient.mobile, sms_kind, event.title)
# Event endpoints
@events_router.get("/", response=List[EventListSchema])
def list_events(
    request,
    status: Optional[List[str]] = Query(None),
    event_type: Optional[str] = None,
    search: Optional[str] = None,
    limit: int = 20,
    offset: int = 0,
):
    """List non-deleted events with optional filtering and offset pagination.

    Args:
        status: Status values to keep. Accepts repeated query parameters
            (``?status=a&status=b``) and comma-separated values
            (``?status=a,b``), or any mix of both.
        event_type: Exact ``event_type`` to keep.
        search: Case-insensitive substring matched against title or description.
        limit: Maximum number of events to return; negative values are treated as 0.
        offset: Number of events to skip; negative values are treated as 0.

    Returns:
        A queryset slice ordered with ``published`` events first, then by
        ``start_time`` descending and ``id`` descending.
    """
    queryset = Event.objects.filter(is_deleted=False).prefetch_related("gallery_images")

    if status:
        # ``status`` is a list, so each element is split on commas individually;
        # calling ``.split`` on the list itself would raise AttributeError.
        wanted_statuses = [
            part.strip()
            for raw_value in status
            for part in raw_value.split(",")
            if part.strip()
        ]
        queryset = queryset.filter(status__in=wanted_statuses)

    if event_type:
        queryset = queryset.filter(event_type=event_type)

    if search:
        queryset = queryset.filter(
            Q(title__icontains=search) | Q(description__icontains=search)
        )

    queryset = queryset.annotate(
        published_first=Case(
            When(status="published", then=0),
            default=1,
            output_field=IntegerField(),
        )
    ).order_by("published_first", "-start_time", "-id")

    # Queryset slicing rejects negative bounds, so clamp instead of failing with a 500.
    offset = max(offset, 0)
    limit = max(limit, 0)
    return queryset[offset:offset + limit]
@events_router.get("/{int:event_id}", response=EventSchema)
def get_event(request, event_id: int):
    """Return the non-deleted event with the given ID, or respond with 404."""
    candidates = Event.objects.prefetch_related("gallery_images")
    return get_object_or_404(candidates, id=event_id, is_deleted=False)
@events_router.get("/slug/{str:slug}", response=EventSchema)
def get_event_by_slug(request, slug: str):
    """Return the non-deleted event with the given slug, or respond with 404."""
    candidates = Event.objects.prefetch_related("gallery_images")
    return get_object_or_404(candidates, slug=slug, is_deleted=False)
@events_router.post("/", response={201: EventSchema, 403: ErrorSchema, 400: ErrorSchema}, auth=jwt_auth)
def create_event(request, payload: EventCreateSchema):
    """Create an event from ``payload`` (staff only).

    Returns 403 for non-staff callers, 400 with the exception text when
    validation or saving fails, and 201 with the new event otherwise. Gallery
    images listed in ``payload.gallery_image_ids`` are attached after the save.
    """
    if not _is_staff_user(request.auth):
        return _staff_forbidden()

    fields = payload.dict(exclude={"gallery_image_ids"})
    raw_slug = fields.get("slug")
    if raw_slug:
        fields["slug"] = slugify(raw_slug)

    event = Event(**fields)
    try:
        event.full_clean()
        event.save()
    except Exception as exc:
        return 400, {"error": str(exc)}

    image_ids = payload.gallery_image_ids or []
    if image_ids:
        event.gallery_images.set(image_ids)
    return 201, event
@events_router.put("/{int:event_id}", response={200: EventSchema, 403: ErrorSchema, 400: ErrorSchema}, auth=jwt_auth)
def update_event(request, event_id: int, payload: EventUpdateSchema):
    """Apply a partial update to an event and notify registrants of key changes (staff only).

    Only fields explicitly present in ``payload`` are written. A provided slug
    is slugified; when the title changes without a slug, the slug is rebuilt
    from the title. After a successful save, registrants are notified if the
    event just became cancelled, or otherwise if any time/place field changed.

    Returns 403 for non-staff callers, 400 with the exception text when
    validation or saving fails, and 200 with the updated event otherwise.
    """
    if not _is_staff_user(request.auth):
        return _staff_forbidden()

    event = get_object_or_404(Event, id=event_id, is_deleted=False)

    # Snapshot the values that decide which notification (if any) is sent.
    schedule_fields = ("start_time", "end_time", "address", "location", "online_link")
    before = {name: getattr(event, name) for name in ("status", *schedule_fields)}

    changes = payload.dict(exclude_unset=True)
    image_ids = changes.pop("gallery_image_ids", None)
    for name, new_value in changes.items():
        normalised = slugify(new_value) if name == "slug" and new_value else new_value
        setattr(event, name, normalised)

    # NOTE(review): slugify() without allow_unicode drops non-ASCII characters,
    # so a non-Latin title presumably produces an empty slug here - confirm
    # whether the model regenerates it.
    if "title" in changes and not changes.get("slug"):
        event.slug = slugify(event.title)

    try:
        event.full_clean()
        event.save()
    except Exception as exc:
        return 400, {"error": str(exc)}

    if image_ids is not None:
        event.gallery_images.set(image_ids)

    newly_cancelled = (
        event.status == Event.StatusChoices.CANCELLED
        and before["status"] != Event.StatusChoices.CANCELLED
    )
    schedule_changed = any(
        before[name] != getattr(event, name) for name in schedule_fields
    )

    # Cancellation takes precedence over a schedule change.
    if newly_cancelled:
        _notify_event_update(
            event,
            notification_type="event_cancelled",
            title=f"رویداد {event.title} لغو شد",
            message="این رویداد لغو شده است. برای جزئیات بیشتر صفحه رویداد را بررسی کنید.",
            level="warning",
            sms_kind="event_cancellation",
        )
    elif schedule_changed:
        _notify_event_update(
            event,
            notification_type="event_rescheduled",
            title=f"زمان یا محل {event.title} تغییر کرد",
            message="جزئیات زمان‌بندی یا محل برگزاری این رویداد به‌روزرسانی شده است.",
            level="info",
            sms_kind="event_reschedule",
        )
    return 200, event
@events_router.delete("/{int:event_id}", response={200: MessageSchema, 403: ErrorSchema}, auth=jwt_auth)
def delete_event(request, event_id: int):
    """Delete an event through ``Event.delete()`` (staff only).

    The original author describes this as a soft delete; that behaviour lives
    in the model, not in this endpoint.
    """
    caller = request.auth
    if not _is_staff_user(caller):
        return _staff_forbidden()

    get_object_or_404(Event, id=event_id, is_deleted=False).delete()
    return 200, {"message": "Event deleted successfully"}
@events_router.post("/{int:event_id}/featured-image", response={200: EventSchema, 403: ErrorSchema, 400: ErrorSchema}, auth=jwt_auth)
def upload_event_featured_image(request, event_id: int, file: UploadedFile = File(...)):
    """Upload or replace the featured (poster) image of an event (staff only).

    Returns 400 with the validation error when the upload is rejected,
    otherwise 200 with the event.
    """
    if not _is_staff_user(request.auth):
        return _staff_forbidden()

    event = get_object_or_404(Event, id=event_id, is_deleted=False)
    saved, outcome = _save_uploaded_image(event, "featured_image", file, "events/featured")
    return (200, event) if saved else (400, outcome)
@events_router.delete("/{int:event_id}/featured-image", response={200: EventSchema, 403: ErrorSchema}, auth=jwt_auth)
def delete_event_featured_image(request, event_id: int):
    """Remove the featured (poster) image of an event, if it has one (staff only)."""
    if not _is_staff_user(request.auth):
        return _staff_forbidden()

    event = get_object_or_404(Event, id=event_id, is_deleted=False)
    current_image = event.featured_image
    if current_image:
        # Delete the stored file first, then persist the cleared field.
        current_image.delete(save=False)
        event.featured_image = None
        event.save(update_fields=["featured_image", "updated_at"])
    return 200, event
@events_router.get("/{int:event_id}/gallery", response={200: List[EventGallerySchema], 403: ErrorSchema}, auth=jwt_auth)
def list_event_gallery(request, event_id: int):
    """List the non-deleted gallery images attached to an event (staff only)."""
    if not _is_staff_user(request.auth):
        return _staff_forbidden()

    event = get_object_or_404(Event, id=event_id, is_deleted=False)
    images = event.gallery_images.filter(is_deleted=False).select_related("uploaded_by")
    return 200, images
@events_router.post("/{int:event_id}/gallery", response={201: EventGallerySchema, 403: ErrorSchema, 400: ErrorSchema}, auth=jwt_auth)
def upload_event_gallery_image(
    request,
    event_id: int,
    file: UploadedFile = File(...),
    title: str | None = None,
    alt_text: str | None = None,
):
    """Upload an image, attach it to an event's gallery and queue its processing (staff only).

    The upload must report an ``image/*`` content type and be at most 10 MiB.
    Title and alt text fall back to the uploaded file name. Any exception
    raised while creating or storing the gallery item is reported as a 400
    carrying the exception text.
    """
    if not _is_staff_user(request.auth):
        return _staff_forbidden()

    event = get_object_or_404(Event, id=event_id, is_deleted=False)

    max_bytes = 10 * 1024 * 1024
    reported_type = file.content_type
    if not (reported_type and reported_type.startswith("image/")):
        return 400, {"error": "فایل باید تصویر باشد."}
    if file.size > max_bytes:
        return 400, {"error": "حجم فایل باید کمتر از ۱۰ مگابایت باشد."}

    try:
        display_title = title or file.name
        gallery_item = Gallery.objects.create(
            title=display_title,
            description="",
            uploaded_by=request.auth,
            alt_text=alt_text or display_title,
            is_public=True,
        )
        # Flag set before the image is stored; processing is queued explicitly below.
        gallery_item._defer_image_processing = True

        _, dot, suffix = file.name.rpartition(".")
        extension = suffix if dot else "jpg"
        gallery_item.image.save(f"gallery/{uuid4().hex}.{extension}", ContentFile(file.read()))

        event.gallery_images.add(gallery_item)
        process_uploaded_image.delay(gallery_item.id)
    except Exception as exc:
        return 400, {"error": str(exc)}
    return 201, gallery_item
@events_router.delete("/{int:event_id}/gallery/{int:image_id}", response={200: MessageSchema, 403: ErrorSchema}, auth=jwt_auth)
def delete_event_gallery_image(request, event_id: int, image_id: int):
    """Detach a gallery image from an event (staff only).

    The image itself is deleted as well when no other event still references it.
    """
    if not _is_staff_user(request.auth):
        return _staff_forbidden()

    event = get_object_or_404(Event, id=event_id, is_deleted=False)
    image = get_object_or_404(Gallery, id=image_id, is_deleted=False)

    event.gallery_images.remove(image)
    used_elsewhere = image.event_galleries.exclude(id=event.id).exists()
    if not used_elsewhere:
        image.delete()
    return 200, {"message": "Gallery image removed"}
# Registration endpoints
@events_router.get("/{int:event_id}/registrations", response=List[RegistrationSchema])
def list_event_registrations(request, event_id: int, limit: int = 20, offset: int = 0):
    """Return one page of the non-deleted registrations of an event."""
    # NOTE(review): unlike the admin listing, this endpoint declares no auth -
    # confirm that exposing registrations publicly is intended.
    event = get_object_or_404(Event, id=event_id, is_deleted=False)
    active = event.registrations.filter(is_deleted=False).select_related("user")
    page_end = offset + limit
    return active[offset:page_end]
@events_router.get("/{int:event_id}/admin-registrations", response={200: PaginatedRegistrationSchema, 403: ErrorSchema}, auth=jwt_auth)
def list_event_registrations_admin(
    request,
    event_id: int,
    status: Optional[List[str]] = Query(None),
    university: Optional[str] = Query(None),
    major: Optional[str] = Query(None),
    search: Optional[str] = Query(None),
    limit: int = Query(20, ge=1, le=200),
    offset: int = Query(0, ge=0),
):
    """Return a filtered, paginated list of an event's registrations (staff only).

    Filters: registration ``status`` values, substring match on the user's
    university or major (code or name), and a free-text ``search`` over the
    user's username, mobile, email, first name and last name. Results are
    ordered by ``registered_at`` descending. ``count`` is the total before
    pagination; ``next`` and ``previous`` are always ``None``.
    """
    caller = request.auth
    if not (caller.is_staff or caller.is_superuser):
        return 403, {"error": "اجازه دسترسی ندارید."}

    event = get_object_or_404(Event, id=event_id, is_deleted=False)
    registrations = event.registrations.filter(is_deleted=False)
    registrations = registrations.select_related("user", "user__university", "user__major")
    registrations = registrations.prefetch_related("payments__discount_code")
    registrations = registrations.order_by("-registered_at")

    # Fall back to the raw query string when the parsed parameter is empty.
    wanted_statuses = status or request.GET.getlist("status")
    if wanted_statuses:
        registrations = registrations.filter(status__in=wanted_statuses)

    if university:
        by_university = (
            Q(user__university__code__icontains=university)
            | Q(user__university__name__icontains=university)
        )
        registrations = registrations.filter(by_university)

    if major:
        by_major = (
            Q(user__major__code__icontains=major)
            | Q(user__major__name__icontains=major)
        )
        registrations = registrations.filter(by_major)

    if search:
        by_text = Q(user__username__icontains=search)
        for field in ("mobile", "email", "first_name", "last_name"):
            by_text |= Q(**{f"user__{field}__icontains": search})
        registrations = registrations.filter(by_text)

    total = registrations.count()
    page = list(registrations[offset : offset + limit])
    return PaginatedRegistrationSchema(count=total, next=None, previous=None, results=page)
@events_router.post(
    "/{int:event_id}/register",
    response=RegistrationSchema,
    auth=jwt_auth,
)
def register_for_event(
    request,
    event_id: int,
    payload: RegistrationCreateSchema | None = None,
):
    """Register the authenticated user for an event.

    A pending registration is created (or the existing pending one reused),
    an optional discount code is applied, and the registration is confirmed
    immediately when the event is free or the discounted price is zero.

    Args:
        event_id: ID of a non-deleted event.
        payload: Optional body; ``discount_code`` may also arrive as a query
            parameter, with the body taking precedence.

    Returns:
        The created or updated registration.

    Raises:
        HttpError: 400 when the user already holds a confirmed registration,
            the registration window is closed or not yet open, no slots are
            left, or the discount code is not valid for this event.
    """
    event = get_object_or_404(Event, id=event_id, is_deleted=False)
    user = request.auth

    if Registration.objects.filter(event=event, user=user, status=Registration.StatusChoices.CONFIRMED).exists():
        raise HttpError(400, "شما قبلا در این ایونت ثبت‌نام کرده‌اید.")

    now = timezone.now()
    if event.registration_end_date and event.registration_end_date < now:
        raise HttpError(400, "مهلت ثبت‌نام به پایان رسیده‌است")
    if event.registration_start_date and event.registration_start_date > now:
        raise HttpError(400, "زمان ثبت‌نام هنوز آغاز نشده است")
    if not event.has_available_slots:
        raise HttpError(400, "ظرفیت شرکت‌کنندگان تکمیل است")

    # The request body wins over the query string; empty values count as "no code".
    body_code = payload.discount_code if payload else None
    discount_code = body_code or request.GET.get("discount_code") or None

    registration, created = Registration.objects.get_or_create(
        event=event,
        user=user,
        status=Registration.StatusChoices.PENDING,
        defaults={"final_price": event.price},
    )

    # Defensive checks: the lookup above is keyed on PENDING, so these only
    # trigger if the stored status differs from what was requested.
    if registration.status == Registration.StatusChoices.CONFIRMED:
        # Must be raised: a returned HttpError would be treated as the response body.
        raise HttpError(400, "شما قبلا در این ایونت ثبت‌نام کرده‌اید")
    if registration.status == Registration.StatusChoices.CANCELLED:
        registration = Registration.objects.create(
            event=event,
            user=user,
            status=Registration.StatusChoices.PENDING,
            final_price=event.price,
        )
    elif not created and registration.final_price is None:
        registration.final_price = event.price
        registration.save(update_fields=["final_price"])

    final_price = event.price
    fields_to_update = []

    if discount_code:
        applied_code = DiscountCode.objects.filter(
            code=discount_code,
            applicable_events=event,
            is_active=True,
        ).first()
        if not applied_code:
            # The original literal was mis-encoded text; this is its decoded form.
            raise HttpError(400, "کد تخفیف معتبر نیست")
        final_price, discount_amount = applied_code.calculate_discount(event, user)
        registration.discount_code = applied_code
        registration.discount_amount = discount_amount
        fields_to_update.extend(["discount_code", "discount_amount"])

    if registration.final_price != final_price:
        registration.final_price = final_price
        fields_to_update.append("final_price")

    # Free events and fully discounted registrations need no payment step.
    if not event.price or final_price == 0:
        registration.status = Registration.StatusChoices.CONFIRMED
        fields_to_update.append("status")

    if fields_to_update:
        registration.save(update_fields=list(set(fields_to_update)))
    return registration
@events_router.put("/registrations/{int:registration_id}", response=RegistrationSchema, auth=jwt_auth)
def update_registration_status(request, registration_id: int, payload: RegistrationStatusUpdateSchema):
    """Update the status of one of the authenticated user's registrations.

    Returns:
        The updated registration.

    Raises:
        HttpError: 400 when no status is supplied or the model rejects the new
            value. The lookup itself responds with 404 when the registration
            does not exist, is deleted, or belongs to another user.
    """
    # Imported locally so this block does not rely on a module-level import.
    from django.core.exceptions import ValidationError

    registration = get_object_or_404(
        Registration, id=registration_id, user=request.auth, is_deleted=False
    )

    new_status = payload.dict(exclude_unset=True).get("status")
    if new_status is None:
        # Previously None was assigned and full_clean() failed with an unhandled error.
        raise HttpError(400, "وضعیت ثبت‌نام الزامی است.")

    # NOTE(review): any status the model accepts can be set by the registrant
    # themselves (presumably including "confirmed") - confirm that the schema
    # restricts the allowed values.
    registration.status = new_status
    try:
        registration.full_clean()
    except ValidationError as exc:
        raise HttpError(400, "; ".join(exc.messages)) from exc

    registration.save()
    return registration
@events_router.delete("/registrations/{int:registration_id}", response=MessageSchema, auth=jwt_auth)
def cancel_registration(request, registration_id: int):
    """Cancel one of the authenticated user's registrations by deleting it."""
    owned_registration = get_object_or_404(
        Registration,
        id=registration_id,
        user=request.auth,
        is_deleted=False,
    )
    owned_registration.delete()
    return {"message": "ثبت‌نام شما لغو شد :("}
@events_router.get("/registerations/verify/{UUID:ticket_id}", response=RegisterationDetailSchema, auth=jwt_auth)
def verify_my_registration(request, ticket_id: UUID):
    """Return ticket details for the authenticated user's registration.

    Raises:
        HttpError: 404 when no registration of this user carries ``ticket_id``.
    """
    lookup = Registration.objects.select_related("event")
    try:
        reg = lookup.get(ticket_id=ticket_id, user=request.auth)
    except Registration.DoesNotExist:
        raise HttpError(404, "registration not found")

    event = reg.event
    image_url = None
    if event.featured_image:
        image_url = request.build_absolute_uri(event.featured_image.url)

    return {
        "event_image": image_url,
        "event_title": event.title,
        "event_type": event.get_event_type_display(),
        "ticket_id": reg.ticket_id,
        "status": reg.status,
        "registered_at": reg.registered_at,
        "success_markdown": event.registration_success_markdown,
    }
@events_router.get("/my-registrations", response=List[MyEventRegistrationOut], auth=jwt_auth)
def my_registrations(request):
    """List the authenticated user's registrations, newest first, with a brief event summary."""

    def image_url_for(event):
        # Absolute URL of the featured image, or None when the event has none.
        if event.featured_image:
            return request.build_absolute_uri(event.featured_image.url)
        return None

    registrations = (
        Registration.objects
        .filter(user=request.auth)
        .select_related("event")
        .order_by("-created_at")
    )
    return [
        MyEventRegistrationOut(
            id=registration.id,
            created_at=registration.created_at,
            status=registration.status,
            event=EventBriefSchema(
                id=registration.event.id,
                title=registration.event.title,
                slug=registration.event.slug,
                start_date=registration.event.start_time,
                end_date=registration.event.end_time,
                location=registration.event.location,
                price=registration.event.price,
                absolute_image_url=image_url_for(registration.event),
            ),
        )
        for registration in registrations
    ]
@events_router.get("/{event_id}/is-registered", response=RegistrationStatusOut, auth=jwt_auth)
def is_registered(request, event_id: int):
    """Report whether the authenticated user holds a confirmed registration for the event."""
    confirmed = Registration.objects.filter(
        user=request.auth,
        event_id=event_id,
        status=Registration.StatusChoices.CONFIRMED,
    )
    return {"is_registered": confirmed.exists()}
@events_router.get(
    "/{int:event_id}/admin-detail",
    response={200: EventAdminDetailSchema, 403: ErrorSchema},
    auth=jwt_auth,
)
def event_admin_detail(request, event_id: int):
    """Return the full admin view of an event (staff only).

    The 403 response is declared explicitly: with a single response schema
    the ``(403, body)`` return below has no schema to be serialized with.
    Gallery images, registrations with their users, and payments with their
    discount codes are prefetched.
    """
    if not _is_staff_user(request.auth):
        return _staff_forbidden()

    events = Event.objects.prefetch_related(
        "gallery_images",
        "registrations__user",
        "registrations__payments__discount_code",
    )
    event = get_object_or_404(events, id=event_id, is_deleted=False)
    return 200, event