Compare commits

...

7 Commits

Author SHA1 Message Date
08cab3b815 feat(analytics): expose full dashboard result groups
Some checks failed
Backend CI/CD / test (push) Has been cancelled
Backend CI/CD / deploy (push) Has been cancelled
2026-06-15 17:33:30 +03:30
a326d2e31d fix(analytics): normalize dashboard entry years 2026-06-15 17:26:57 +03:30
38e7baef9c feat(analytics): split dashboard metrics by domain
Some checks failed
Backend CI/CD / test (push) Has been cancelled
Backend CI/CD / deploy (push) Has been cancelled
2026-06-15 16:17:48 +03:30
8669e99ca5 feat(analytics): add admin dashboard api
Some checks failed
Backend CI/CD / test (push) Has been cancelled
Backend CI/CD / deploy (push) Has been cancelled
2026-06-14 09:51:46 +03:30
c2abcd7b97 feat(payments): add discount code admin API
Some checks failed
Backend CI/CD / test (push) Has been cancelled
Backend CI/CD / deploy (push) Has been cancelled
2026-06-14 00:03:57 +03:30
bdc4fc1a49 feat(events): expand admin event management APIs 2026-06-14 00:03:42 +03:30
20e7a04e59 feat(users): add paginated admin metadata APIs 2026-06-14 00:03:27 +03:30
13 changed files with 1688 additions and 29 deletions

View File

View File

View File

@@ -0,0 +1,212 @@
from typing import Literal
from ninja import Schema
class AnalyticsPointSchema(Schema):
label: str
value: int | float
class AnalyticsPointGroupSchema(Schema):
items: list[AnalyticsPointSchema]
top_items: list[AnalyticsPointSchema]
other_count: int = 0
total_count: int = 0
class AnalyticsTrendPointSchema(Schema):
date: str
label: str
value: int | float
class AnalyticsRegistrationStatusSchema(Schema):
status: str
label: str
value: int
class AnalyticsTopEventSchema(Schema):
id: int
title: str
slug: str
attendees: int
capacity: int | None = None
fill_rate: float | None = None
revenue: int = 0
class AnalyticsPostPopularitySchema(Schema):
id: int
title: str
slug: str
likes: int
saves: int
comments: int
class AnalyticsPostPopularityGroupSchema(Schema):
items: list[AnalyticsPostPopularitySchema]
top_items: list[AnalyticsPostPopularitySchema]
other_count: int = 0
total_count: int = 0
class AnalyticsTopPostSchema(AnalyticsPostPopularitySchema):
score: int
class AnalyticsSummarySchema(Schema):
total_users: int
verified_users: int
total_events: int
total_registrations: int
total_revenue: int
total_discount: int
published_posts: int
total_likes: int
total_saves: int
total_comments: int
class AnalyticsUsersSchema(Schema):
signup_trend: list[AnalyticsTrendPointSchema]
by_major: list[AnalyticsPointSchema]
by_university: list[AnalyticsPointSchema]
by_year: list[AnalyticsPointSchema]
class AnalyticsEventsSchema(Schema):
registration_status: list[AnalyticsRegistrationStatusSchema]
by_major: list[AnalyticsPointSchema]
by_university: list[AnalyticsPointSchema]
top_events: list[AnalyticsTopEventSchema]
registration_trend: list[AnalyticsTrendPointSchema]
class AnalyticsRevenueSchema(Schema):
trend: list[AnalyticsTrendPointSchema]
by_event: list[AnalyticsPointSchema]
payment_status: list[AnalyticsRegistrationStatusSchema]
total_paid: int
total_discount: int
total_base: int
class AnalyticsBlogSchema(Schema):
totals: dict[str, int]
post_popularity: list[AnalyticsPostPopularitySchema]
top_posts: list[AnalyticsTopPostSchema]
activity_trend: list[dict[str, int | str]]
by_category: list[AnalyticsPointSchema]
by_tag: list[AnalyticsPointSchema]
class AnalyticsAchievementsSchema(Schema):
distinct_participants: int
learning_hours: float
published_content: int
community_engagement: int
class AnalyticsFiltersSchema(Schema):
date_from: str | None = None
date_to: str | None = None
event_id: int | None = None
granularity: Literal["day", "week", "month"]
class AdminDashboardAnalyticsSchema(Schema):
filters: AnalyticsFiltersSchema
summary: AnalyticsSummarySchema
users: AnalyticsUsersSchema
events: AnalyticsEventsSchema
revenue: AnalyticsRevenueSchema
blog: AnalyticsBlogSchema
achievements: AnalyticsAchievementsSchema
class AnalyticsEventOptionSchema(Schema):
value: str
label: str
description: str | None = None
class AnalyticsEventOptionsSchema(Schema):
count: int
results: list[AnalyticsEventOptionSchema]
class UserAnalyticsSummarySchema(Schema):
total_users: int
verified_users: int
unverified_users: int
profile_completion_rate: float
class UserAnalyticsSchema(Schema):
filters: AnalyticsFiltersSchema
summary: UserAnalyticsSummarySchema
signup_trend: list[AnalyticsTrendPointSchema]
by_major: AnalyticsPointGroupSchema
by_university: AnalyticsPointGroupSchema
by_year: AnalyticsPointGroupSchema
class EventAnalyticsFiltersSchema(Schema):
date_from: str | None = None
date_to: str | None = None
event_id: int | None = None
class EventAnalyticsSummarySchema(Schema):
total_events: int
total_registrations: int
distinct_participants: int
total_revenue: int
total_discount: int
total_base: int
learning_hours: float
class AnalyticsTopEventGroupSchema(Schema):
top_items: list[AnalyticsTopEventSchema]
other_count: int = 0
total_count: int = 0
class EventAnalyticsSchema(Schema):
filters: EventAnalyticsFiltersSchema
summary: EventAnalyticsSummarySchema
registration_status: list[AnalyticsRegistrationStatusSchema]
payment_status: list[AnalyticsRegistrationStatusSchema]
attendee_by_major: AnalyticsPointGroupSchema
attendee_by_university: AnalyticsPointGroupSchema
registration_trend: list[AnalyticsTrendPointSchema]
revenue_trend: list[AnalyticsTrendPointSchema]
revenue_by_event: AnalyticsPointGroupSchema
top_events: AnalyticsTopEventGroupSchema
class BlogAnalyticsFiltersSchema(Schema):
date_from: str | None = None
date_to: str | None = None
class BlogAnalyticsSummarySchema(Schema):
published_posts: int
total_likes: int
total_saves: int
total_comments: int
community_engagement: int
class BlogAnalyticsSchema(Schema):
filters: BlogAnalyticsFiltersSchema
summary: BlogAnalyticsSummarySchema
activity_trend: list[dict[str, int | str]]
post_popularity: AnalyticsPostPopularityGroupSchema
top_posts: list[AnalyticsTopPostSchema]
by_category: AnalyticsPointGroupSchema
by_tag: AnalyticsPointGroupSchema

843
apps/analytics/api/views.py Normal file
View File

@@ -0,0 +1,843 @@
from __future__ import annotations
from datetime import datetime, time
from typing import Literal
from django.contrib.auth import get_user_model
from django.db.models import Count, Q, Sum
from django.db.models.functions import Coalesce, TruncDay, TruncMonth, TruncWeek
from django.shortcuts import get_object_or_404
from django.utils import timezone
from django.utils.dateparse import parse_date, parse_datetime
from ninja import Router
from ninja.errors import HttpError
from apps.analytics.api.schemas import (
AdminDashboardAnalyticsSchema,
AnalyticsEventOptionsSchema,
BlogAnalyticsSchema,
EventAnalyticsSchema,
UserAnalyticsSchema,
)
from apps.blog.models import Category, Comment, Like, Post, SavedPost, Tag
from apps.events.models import Event, Registration
from apps.payments.models import Payment
from core.authentication import jwt_auth
analytics_router = Router()
UNKNOWN_LABEL = "نامشخص"
TOP_ITEMS_LIMIT = 12
PARTICIPANT_STATUSES = [
Registration.StatusChoices.CONFIRMED,
Registration.StatusChoices.ATTENDED,
]
GRANULARITY_TRUNC = {
"day": TruncDay,
"week": TruncWeek,
"month": TruncMonth,
}
REGISTRATION_STATUS_LABELS = {
Registration.StatusChoices.PENDING: "در انتظار",
Registration.StatusChoices.CONFIRMED: "تایید شده",
Registration.StatusChoices.CANCELLED: "لغو شده",
Registration.StatusChoices.ATTENDED: "حاضر شده",
}
PAYMENT_STATUS_LABELS = {
Payment.OrderStatusChoices.INIT: "ایجاد شده",
Payment.OrderStatusChoices.PENDING: "در انتظار پرداخت",
Payment.OrderStatusChoices.PAID: "پرداخت موفق",
Payment.OrderStatusChoices.FAILED: "پرداخت ناموفق",
Payment.OrderStatusChoices.CANCELED: "لغو شده",
}
def _parse_boundary(value: str | None, *, is_end: bool = False) -> datetime | None:
if not value:
return None
parsed_datetime = parse_datetime(value)
if parsed_datetime is None:
parsed_date = parse_date(value)
if parsed_date is None:
raise HttpError(400, "Invalid date filter.")
parsed_datetime = datetime.combine(parsed_date, time.max if is_end else time.min)
if timezone.is_naive(parsed_datetime):
parsed_datetime = timezone.make_aware(parsed_datetime, timezone.get_current_timezone())
return parsed_datetime
def _apply_range(queryset, field: str, start: datetime | None, end: datetime | None):
if start:
queryset = queryset.filter(**{f"{field}__gte": start})
if end:
queryset = queryset.filter(**{f"{field}__lte": end})
return queryset
def _auto_granularity(start: datetime | None, end: datetime | None) -> Literal["day", "week", "month"]:
if not start or not end:
return "month"
days = max((end - start).days, 1)
if days <= 45:
return "day"
if days <= 180:
return "week"
return "month"
def _label(value) -> str:
return str(value or UNKNOWN_LABEL)
def _normalize_entry_year(value) -> str:
if value in (None, ""):
return UNKNOWN_LABEL
raw = str(value).strip()
normalized = (
raw.translate(str.maketrans("۰۱۲۳۴۵۶۷۸۹٠١٢٣٤٥٦٧٨٩", "01234567890123456789"))
.replace(",", "")
.replace("٬", "")
)
try:
year = int(normalized)
except (TypeError, ValueError):
return UNKNOWN_LABEL
if 1390 <= year <= 1499:
return str(year)
if 400 <= year <= 499:
return str(1000 + year)
if 90 <= year <= 99:
return str(1300 + year)
return UNKNOWN_LABEL
def _point_group_from_rows(rows: list[dict], *, limit: int = TOP_ITEMS_LIMIT):
sorted_rows = sorted(rows, key=lambda item: (-int(item["value"] or 0), str(item["label"])))
other_rows = sorted_rows[limit:]
items = [
{"label": _label(item["label"]), "value": int(item["value"] or 0)}
for item in sorted_rows
]
return {
"items": items,
"top_items": items[:limit],
"other_count": len(other_rows),
"total_count": len(sorted_rows),
}
def _point_queryset(queryset, label_field: str, *, limit: int = 12):
return [
{"label": _label(item.get(label_field)), "value": item["value"]}
for item in queryset.values(label_field).annotate(value=Count("id")).order_by("-value", label_field)[:limit]
]
def _point_group_queryset(queryset, label_field: str, *, limit: int = TOP_ITEMS_LIMIT, sum_field: str | None = None):
if sum_field:
rows = list(
queryset.values(label_field)
.annotate(value=Coalesce(Sum(sum_field), 0))
.order_by("-value", label_field)
)
else:
rows = list(
queryset.values(label_field)
.annotate(value=Count("id"))
.order_by("-value", label_field)
)
total_count = len(rows)
other_rows = rows[limit:]
items = [
{"label": _label(item.get(label_field)), "value": int(item["value"] or 0)}
for item in rows
]
return {
"items": items,
"top_items": items[:limit],
"other_count": len(other_rows),
"total_count": total_count,
}
def _entry_year_group_queryset(queryset, *, limit: int = TOP_ITEMS_LIMIT):
buckets: dict[str, int] = {}
for item in queryset.values("year_of_study").annotate(value=Count("id")):
label = _normalize_entry_year(item.get("year_of_study"))
buckets[label] = buckets.get(label, 0) + int(item["value"] or 0)
return _point_group_from_rows(
[{"label": label, "value": value} for label, value in buckets.items()],
limit=limit,
)
def _trend_queryset(queryset, field: str, granularity: str):
trunc = GRANULARITY_TRUNC[granularity]
rows = (
queryset.annotate(bucket=trunc(field))
.values("bucket")
.annotate(value=Count("id"))
.order_by("bucket")
)
return [
{
"date": item["bucket"].date().isoformat() if item["bucket"] else "",
"label": item["bucket"].date().isoformat() if item["bucket"] else UNKNOWN_LABEL,
"value": item["value"],
}
for item in rows
]
def _trend_sum_queryset(queryset, field: str, sum_field: str, granularity: str):
trunc = GRANULARITY_TRUNC[granularity]
rows = (
queryset.annotate(bucket=trunc(field))
.values("bucket")
.annotate(value=Coalesce(Sum(sum_field), 0))
.order_by("bucket")
)
return [
{
"date": item["bucket"].date().isoformat() if item["bucket"] else "",
"label": item["bucket"].date().isoformat() if item["bucket"] else UNKNOWN_LABEL,
"value": int(item["value"] or 0),
}
for item in rows
]
def _sum(queryset, field: str) -> int:
return int(queryset.aggregate(total=Coalesce(Sum(field), 0))["total"] or 0)
def _status_label(value) -> str:
try:
return PAYMENT_STATUS_LABELS.get(Payment.OrderStatusChoices(value), str(value))
except ValueError:
return str(value)
def _registration_status_label(value: str) -> str:
try:
return REGISTRATION_STATUS_LABELS.get(Registration.StatusChoices(value), value)
except ValueError:
return value
def _event_learning_hours(events_queryset) -> float:
learning_hours = 0.0
for event in events_queryset.annotate(
attendees=Count(
"registrations",
filter=Q(registrations__is_deleted=False, registrations__status__in=PARTICIPANT_STATUSES),
distinct=True,
)
):
duration = event.end_time - event.start_time
learning_hours += max(duration.total_seconds(), 0) / 3600 * event.attendees
return round(learning_hours, 1)
def _filters_payload(start: datetime | None, end: datetime | None, *, event_id: int | None = None, include_granularity: bool = False):
payload = {
"date_from": start.isoformat() if start else None,
"date_to": end.isoformat() if end else None,
}
if event_id is not None:
payload["event_id"] = event_id
if include_granularity:
payload["granularity"] = _auto_granularity(start, end)
return payload
def _require_staff(user):
if not (user and (user.is_staff or user.is_superuser)):
raise HttpError(403, "اجازه دسترسی ندارید.")
@analytics_router.get("/admin/events/options", response=AnalyticsEventOptionsSchema, auth=jwt_auth)
def admin_event_options(
request,
search: str | None = None,
limit: int = 20,
offset: int = 0,
):
_require_staff(request.auth)
safe_limit = max(1, min(limit, 50))
safe_offset = max(offset, 0)
queryset = Event.objects.filter(is_deleted=False)
if search:
queryset = queryset.filter(Q(title__icontains=search) | Q(slug__icontains=search))
queryset = queryset.order_by("-start_time", "-id")
count = queryset.count()
results = [
{
"value": str(event.id),
"label": event.title,
"description": event.start_time.date().isoformat() if event.start_time else None,
}
for event in queryset[safe_offset : safe_offset + safe_limit]
]
return {"count": count, "results": results}
@analytics_router.get("/admin/users", response=UserAnalyticsSchema, auth=jwt_auth)
def admin_users_analytics(request, date_from: str | None = None, date_to: str | None = None):
_require_staff(request.auth)
start = _parse_boundary(date_from)
end = _parse_boundary(date_to, is_end=True)
if start and end and start > end:
raise HttpError(400, "Start date must be before end date.")
granularity = _auto_granularity(start, end)
User = get_user_model()
users_qs = _apply_range(User.objects.filter(is_deleted=False), "date_joined", start, end)
total_users = users_qs.count()
verified_users = users_qs.filter(is_mobile_verified=True).count()
completed_profiles = users_qs.exclude(first_name="").exclude(last_name="").filter(
mobile__isnull=False,
major__isnull=False,
university__isnull=False,
).count()
return {
"filters": _filters_payload(start, end, include_granularity=True),
"summary": {
"total_users": total_users,
"verified_users": verified_users,
"unverified_users": max(total_users - verified_users, 0),
"profile_completion_rate": round((completed_profiles / total_users) * 100, 1) if total_users else 0,
},
"signup_trend": _trend_queryset(users_qs, "date_joined", granularity),
"by_major": _point_group_queryset(users_qs, "major__name"),
"by_university": _point_group_queryset(users_qs, "university__name"),
"by_year": _entry_year_group_queryset(users_qs),
}
@analytics_router.get("/admin/events", response=EventAnalyticsSchema, auth=jwt_auth)
def admin_events_analytics(
request,
date_from: str | None = None,
date_to: str | None = None,
event_id: int | None = None,
):
_require_staff(request.auth)
start = _parse_boundary(date_from)
end = _parse_boundary(date_to, is_end=True)
if start and end and start > end:
raise HttpError(400, "Start date must be before end date.")
granularity = _auto_granularity(start, end)
selected_event = None
if event_id is not None:
selected_event = get_object_or_404(Event, id=event_id, is_deleted=False)
events_qs = Event.objects.filter(is_deleted=False)
if selected_event:
events_qs = events_qs.filter(id=selected_event.id)
else:
events_qs = _apply_range(events_qs, "start_time", start, end)
registrations_qs = _apply_range(
Registration.objects.filter(is_deleted=False).select_related("user", "event", "user__major", "user__university"),
"registered_at",
start,
end,
)
if selected_event:
registrations_qs = registrations_qs.filter(event_id=selected_event.id)
participant_qs = registrations_qs.filter(status__in=PARTICIPANT_STATUSES)
paid_payments_qs = Payment.objects.filter(is_deleted=False, status=Payment.OrderStatusChoices.PAID, verified_at__isnull=False)
paid_payments_qs = _apply_range(paid_payments_qs.select_related("event"), "verified_at", start, end)
all_payments_qs = _apply_range(Payment.objects.filter(is_deleted=False), "created_at", start, end)
if selected_event:
paid_payments_qs = paid_payments_qs.filter(event_id=selected_event.id)
all_payments_qs = all_payments_qs.filter(event_id=selected_event.id)
registration_status = [
{"status": item["status"], "label": _registration_status_label(item["status"]), "value": item["value"]}
for item in registrations_qs.values("status").annotate(value=Count("id")).order_by("status")
]
payment_status = [
{"status": str(item["status"]), "label": _status_label(item["status"]), "value": item["value"]}
for item in all_payments_qs.values("status").annotate(value=Count("id")).order_by("status")
]
top_events_qs = Event.objects.filter(is_deleted=False)
if selected_event:
top_events_qs = top_events_qs.filter(id=selected_event.id)
top_events_annotated = list(
top_events_qs.annotate(
attendees=Count(
"registrations",
filter=Q(registrations__is_deleted=False, registrations__status__in=PARTICIPANT_STATUSES),
distinct=True,
),
revenue=Coalesce(
Sum(
"payments__amount",
filter=Q(payments__is_deleted=False, payments__status=Payment.OrderStatusChoices.PAID),
),
0,
),
).order_by("-attendees", "-revenue", "-start_time")
)
top_events = []
for event in top_events_annotated[:TOP_ITEMS_LIMIT]:
fill_rate = round((event.attendees / event.capacity) * 100, 1) if event.capacity else None
top_events.append(
{
"id": event.id,
"title": event.title,
"slug": event.slug,
"attendees": event.attendees,
"capacity": event.capacity,
"fill_rate": fill_rate,
"revenue": int(event.revenue or 0),
}
)
total_revenue = _sum(paid_payments_qs, "amount")
total_discount = _sum(paid_payments_qs, "discount_amount")
total_base = _sum(paid_payments_qs, "base_amount")
return {
"filters": _filters_payload(start, end, event_id=event_id),
"summary": {
"total_events": events_qs.count(),
"total_registrations": registrations_qs.count(),
"distinct_participants": participant_qs.values("user_id").distinct().count(),
"total_revenue": total_revenue,
"total_discount": total_discount,
"total_base": total_base,
"learning_hours": _event_learning_hours(events_qs),
},
"registration_status": registration_status,
"payment_status": payment_status,
"attendee_by_major": _point_group_queryset(participant_qs, "user__major__name"),
"attendee_by_university": _point_group_queryset(participant_qs, "user__university__name"),
"registration_trend": _trend_queryset(registrations_qs, "registered_at", granularity),
"revenue_trend": _trend_sum_queryset(paid_payments_qs, "verified_at", "amount", granularity),
"revenue_by_event": _point_group_queryset(paid_payments_qs, "event__title", sum_field="amount"),
"top_events": {
"top_items": top_events,
"other_count": max(len(top_events_annotated) - TOP_ITEMS_LIMIT, 0),
"total_count": len(top_events_annotated),
},
}
@analytics_router.get("/admin/blog", response=BlogAnalyticsSchema, auth=jwt_auth)
def admin_blog_analytics(request, date_from: str | None = None, date_to: str | None = None):
_require_staff(request.auth)
start = _parse_boundary(date_from)
end = _parse_boundary(date_to, is_end=True)
if start and end and start > end:
raise HttpError(400, "Start date must be before end date.")
granularity = _auto_granularity(start, end)
published_posts_qs = _apply_range(
Post.objects.filter(is_deleted=False, status=Post.StatusChoices.PUBLISHED),
"published_at",
start,
end,
)
visible_comments_qs = _apply_range(
Comment.objects.filter(
is_deleted=False,
is_hidden=False,
is_approved=True,
post__is_deleted=False,
post__status=Post.StatusChoices.PUBLISHED,
),
"created_at",
start,
end,
)
likes_qs = _apply_range(
Like.objects.filter(post__is_deleted=False, post__status=Post.StatusChoices.PUBLISHED),
"created_at",
start,
end,
)
saves_qs = _apply_range(
SavedPost.objects.filter(post__is_deleted=False, post__status=Post.StatusChoices.PUBLISHED),
"created_at",
start,
end,
)
like_filter = Q()
save_filter = Q()
comment_filter = Q(comments__is_deleted=False, comments__is_hidden=False, comments__is_approved=True)
if start:
like_filter &= Q(likes__created_at__gte=start)
save_filter &= Q(saves__created_at__gte=start)
comment_filter &= Q(comments__created_at__gte=start)
if end:
like_filter &= Q(likes__created_at__lte=end)
save_filter &= Q(saves__created_at__lte=end)
comment_filter &= Q(comments__created_at__lte=end)
post_popularity_all = list(
Post.objects.filter(is_deleted=False, status=Post.StatusChoices.PUBLISHED)
.annotate(
likes_total=Count("likes", filter=like_filter, distinct=True),
saves_total=Count("saves", filter=save_filter, distinct=True),
comments_total=Count("comments", filter=comment_filter, distinct=True),
)
.filter(Q(likes_total__gt=0) | Q(saves_total__gt=0) | Q(comments_total__gt=0))
.order_by("-likes_total", "-saves_total", "-comments_total", "-published_at")
)
post_popularity = [
{
"id": post.id,
"title": post.title,
"slug": post.slug,
"likes": post.likes_total,
"saves": post.saves_total,
"comments": post.comments_total,
}
for post in post_popularity_all[:TOP_ITEMS_LIMIT]
]
post_popularity_items = [
{
"id": post.id,
"title": post.title,
"slug": post.slug,
"likes": post.likes_total,
"saves": post.saves_total,
"comments": post.comments_total,
}
for post in post_popularity_all
]
top_posts = [
{
**post,
"score": post["likes"] + post["saves"] + post["comments"],
}
for post in sorted(post_popularity, key=lambda item: item["likes"] + item["saves"] + item["comments"], reverse=True)
]
activity_buckets: dict[str, dict[str, int | str]] = {}
for key, qs in (("likes", likes_qs), ("saves", saves_qs), ("comments", visible_comments_qs)):
for item in (
qs.annotate(bucket=GRANULARITY_TRUNC[granularity]("created_at"))
.values("bucket")
.annotate(value=Count("id"))
.order_by("bucket")
):
bucket = item["bucket"].date().isoformat() if item["bucket"] else UNKNOWN_LABEL
activity_buckets.setdefault(bucket, {"date": bucket, "likes": 0, "saves": 0, "comments": 0})
activity_buckets[bucket][key] = item["value"]
total_likes = likes_qs.count()
total_saves = saves_qs.count()
total_comments = visible_comments_qs.count()
return {
"filters": _filters_payload(start, end),
"summary": {
"published_posts": published_posts_qs.count(),
"total_likes": total_likes,
"total_saves": total_saves,
"total_comments": total_comments,
"community_engagement": total_likes + total_saves + total_comments,
},
"activity_trend": list(activity_buckets.values()),
"post_popularity": {
"items": post_popularity_items,
"top_items": post_popularity,
"other_count": max(len(post_popularity_all) - TOP_ITEMS_LIMIT, 0),
"total_count": len(post_popularity_all),
},
"top_posts": top_posts,
"by_category": _point_group_queryset(published_posts_qs, "category__name"),
"by_tag": _point_group_queryset(published_posts_qs.filter(tags__is_deleted=False), "tags__name"),
}
@analytics_router.get("/admin/dashboard", response=AdminDashboardAnalyticsSchema, auth=jwt_auth)
def admin_dashboard(
request,
date_from: str | None = None,
date_to: str | None = None,
event_id: int | None = None,
granularity: Literal["day", "week", "month", "auto"] = "auto",
):
user = request.auth
if not (user and (user.is_staff or user.is_superuser)):
raise HttpError(403, "اجازه دسترسی ندارید.")
start = _parse_boundary(date_from)
end = _parse_boundary(date_to, is_end=True)
if start and end and start > end:
raise HttpError(400, "Start date must be before end date.")
selected_granularity = _auto_granularity(start, end) if granularity == "auto" else granularity
if event_id is not None:
get_object_or_404(Event, id=event_id, is_deleted=False)
User = get_user_model()
users_qs = _apply_range(User.objects.filter(is_deleted=False), "date_joined", start, end)
events_qs = _apply_range(Event.objects.filter(is_deleted=False), "start_time", start, end)
registrations_qs = _apply_range(
Registration.objects.filter(is_deleted=False).select_related("user", "event", "user__major", "user__university"),
"registered_at",
start,
end,
)
if event_id is not None:
registrations_qs = registrations_qs.filter(event_id=event_id)
paid_payments_qs = Payment.objects.filter(is_deleted=False, status=Payment.OrderStatusChoices.PAID, verified_at__isnull=False)
paid_payments_qs = _apply_range(paid_payments_qs.select_related("event"), "verified_at", start, end)
all_payments_qs = _apply_range(Payment.objects.filter(is_deleted=False), "created_at", start, end)
if event_id is not None:
paid_payments_qs = paid_payments_qs.filter(event_id=event_id)
all_payments_qs = all_payments_qs.filter(event_id=event_id)
published_posts_qs = _apply_range(
Post.objects.filter(is_deleted=False, status=Post.StatusChoices.PUBLISHED),
"published_at",
start,
end,
)
visible_comments_qs = _apply_range(
Comment.objects.filter(
is_deleted=False,
is_hidden=False,
is_approved=True,
post__is_deleted=False,
post__status=Post.StatusChoices.PUBLISHED,
),
"created_at",
start,
end,
)
likes_qs = _apply_range(
Like.objects.filter(post__is_deleted=False, post__status=Post.StatusChoices.PUBLISHED),
"created_at",
start,
end,
)
saves_qs = _apply_range(
SavedPost.objects.filter(post__is_deleted=False, post__status=Post.StatusChoices.PUBLISHED),
"created_at",
start,
end,
)
participant_qs = registrations_qs.filter(status__in=PARTICIPANT_STATUSES)
registration_status = [
{"status": item["status"], "label": _registration_status_label(item["status"]), "value": item["value"]}
for item in registrations_qs.values("status").annotate(value=Count("id")).order_by("status")
]
payment_status = [
{"status": str(item["status"]), "label": _status_label(item["status"]), "value": item["value"]}
for item in all_payments_qs.values("status").annotate(value=Count("id")).order_by("status")
]
top_events_qs = Event.objects.filter(is_deleted=False)
if event_id is not None:
top_events_qs = top_events_qs.filter(id=event_id)
top_events = []
for event in (
top_events_qs.annotate(
attendees=Count(
"registrations",
filter=Q(registrations__is_deleted=False, registrations__status__in=PARTICIPANT_STATUSES),
distinct=True,
),
revenue=Coalesce(
Sum(
"payments__amount",
filter=Q(payments__is_deleted=False, payments__status=Payment.OrderStatusChoices.PAID),
),
0,
),
)
.order_by("-attendees", "-revenue", "-start_time")[:10]
):
fill_rate = round((event.attendees / event.capacity) * 100, 1) if event.capacity else None
top_events.append(
{
"id": event.id,
"title": event.title,
"slug": event.slug,
"attendees": event.attendees,
"capacity": event.capacity,
"fill_rate": fill_rate,
"revenue": int(event.revenue or 0),
}
)
revenue_trend = [
{
"date": item["bucket"].date().isoformat() if item["bucket"] else "",
"label": item["bucket"].date().isoformat() if item["bucket"] else UNKNOWN_LABEL,
"value": int(item["value"] or 0),
}
for item in (
paid_payments_qs.annotate(bucket=GRANULARITY_TRUNC[selected_granularity]("verified_at"))
.values("bucket")
.annotate(value=Coalesce(Sum("amount"), 0))
.order_by("bucket")
)
]
revenue_by_event = [
{"label": _label(item["event__title"]), "value": int(item["value"] or 0)}
for item in (
paid_payments_qs.values("event__title")
.annotate(value=Coalesce(Sum("amount"), 0))
.order_by("-value", "event__title")[:10]
)
]
post_popularity_qs = (
Post.objects.filter(is_deleted=False, status=Post.StatusChoices.PUBLISHED)
.annotate(
likes_total=Count("likes", distinct=True),
saves_total=Count("saves", distinct=True),
comments_total=Count(
"comments",
filter=Q(comments__is_deleted=False, comments__is_hidden=False, comments__is_approved=True),
distinct=True,
),
)
.order_by("-likes_total", "-saves_total", "-comments_total")[:30]
)
post_popularity = [
{
"id": post.id,
"title": post.title,
"slug": post.slug,
"likes": post.likes_total,
"saves": post.saves_total,
"comments": post.comments_total,
}
for post in post_popularity_qs
]
top_posts = [
{
**post,
"score": post["likes"] + post["saves"] + post["comments"],
}
for post in sorted(post_popularity, key=lambda item: item["likes"] + item["saves"] + item["comments"], reverse=True)[:10]
]
activity_buckets: dict[str, dict[str, int | str]] = {}
for key, qs in (("likes", likes_qs), ("saves", saves_qs), ("comments", visible_comments_qs)):
for item in (
qs.annotate(bucket=GRANULARITY_TRUNC[selected_granularity]("created_at"))
.values("bucket")
.annotate(value=Count("id"))
.order_by("bucket")
):
bucket = item["bucket"].date().isoformat() if item["bucket"] else UNKNOWN_LABEL
activity_buckets.setdefault(bucket, {"date": bucket, "likes": 0, "saves": 0, "comments": 0})
activity_buckets[bucket][key] = item["value"]
category_engagement = [
{"label": _label(item["name"]), "value": item["value"]}
for item in (
Category.objects.filter(is_deleted=False, posts__status=Post.StatusChoices.PUBLISHED, posts__is_deleted=False)
.annotate(value=Count("posts", distinct=True))
.order_by("-value", "name")[:10]
.values("name", "value")
)
]
tag_engagement = [
{"label": _label(item["name"]), "value": item["value"]}
for item in (
Tag.objects.filter(is_deleted=False, posts__status=Post.StatusChoices.PUBLISHED, posts__is_deleted=False)
.annotate(value=Count("posts", distinct=True))
.order_by("-value", "name")[:10]
.values("name", "value")
)
]
learning_hours = 0.0
for event in Event.objects.filter(is_deleted=False).annotate(
attendees=Count(
"registrations",
filter=Q(registrations__is_deleted=False, registrations__status__in=PARTICIPANT_STATUSES),
distinct=True,
)
):
duration = event.end_time - event.start_time
learning_hours += max(duration.total_seconds(), 0) / 3600 * event.attendees
total_revenue = _sum(paid_payments_qs, "amount")
total_discount = _sum(paid_payments_qs, "discount_amount")
total_base = _sum(paid_payments_qs, "base_amount")
total_likes = likes_qs.count()
total_saves = saves_qs.count()
total_comments = visible_comments_qs.count()
published_posts_count = published_posts_qs.count()
return {
"filters": {
"date_from": start.isoformat() if start else None,
"date_to": end.isoformat() if end else None,
"event_id": event_id,
"granularity": selected_granularity,
},
"summary": {
"total_users": users_qs.count(),
"verified_users": users_qs.filter(is_mobile_verified=True).count(),
"total_events": events_qs.count(),
"total_registrations": registrations_qs.count(),
"total_revenue": total_revenue,
"total_discount": total_discount,
"published_posts": published_posts_count,
"total_likes": total_likes,
"total_saves": total_saves,
"total_comments": total_comments,
},
"users": {
"signup_trend": _trend_queryset(users_qs, "date_joined", selected_granularity),
"by_major": _point_queryset(users_qs, "major__name"),
"by_university": _point_queryset(users_qs, "university__name"),
"by_year": _entry_year_group_queryset(users_qs)["top_items"],
},
"events": {
"registration_status": registration_status,
"by_major": _point_queryset(participant_qs, "user__major__name"),
"by_university": _point_queryset(participant_qs, "user__university__name"),
"top_events": top_events,
"registration_trend": _trend_queryset(registrations_qs, "registered_at", selected_granularity),
},
"revenue": {
"trend": revenue_trend,
"by_event": revenue_by_event,
"payment_status": payment_status,
"total_paid": total_revenue,
"total_discount": total_discount,
"total_base": total_base,
},
"blog": {
"totals": {
"posts": published_posts_count,
"likes": total_likes,
"saves": total_saves,
"comments": total_comments,
},
"post_popularity": post_popularity,
"top_posts": top_posts,
"activity_trend": list(activity_buckets.values()),
"by_category": category_engagement,
"by_tag": tag_engagement,
},
"achievements": {
"distinct_participants": participant_qs.values("user_id").distinct().count(),
"learning_hours": round(learning_hours, 1),
"published_content": published_posts_count,
"community_engagement": total_likes + total_saves + total_comments,
},
}

View File

@@ -138,6 +138,7 @@ class EventListSchema(Schema):
class EventCreateSchema(Schema):
"""Payload for creating events via the API."""
title: str
slug: Optional[str] = None
description: str
event_type: str
address: Optional[str] = None
@@ -150,11 +151,13 @@ class EventCreateSchema(Schema):
capacity: Optional[int] = None
price: Optional[float] = None
status: str = "draft"
registration_success_markdown: Optional[str] = None
gallery_image_ids: Optional[List[int]] = []
class EventUpdateSchema(Schema):
"""Payload for updating events via the API."""
title: Optional[str] = None
slug: Optional[str] = None
description: Optional[str] = None
event_type: Optional[str] = None
address: Optional[str] = None
@@ -167,6 +170,7 @@ class EventUpdateSchema(Schema):
capacity: Optional[int] = None
price: Optional[float] = None
status: Optional[str] = None
registration_success_markdown: Optional[str] = None
gallery_image_ids: Optional[List[int]] = None
class RegistrationSchema(ModelSchema):
@@ -199,12 +203,56 @@ class AdminUserSchema(Schema):
first_name: str
last_name: str
email: str
mobile: Optional[str] = None
profile_picture: Optional[str] = None
profile_picture_thumbnail_url: Optional[str] = None
profile_picture_preview_url: Optional[str] = None
university: Optional[str] = None
major: Optional[str] = None
student_id: Optional[str] = None
year_of_study: Optional[int] = None
@staticmethod
def resolve_profile_picture(obj, context):
image = getattr(obj, "profile_picture", None)
if not getattr(image, "name", None):
return None
request = context["request"]
return request.build_absolute_uri(image.url) if hasattr(image, "url") else None
@staticmethod
def resolve_profile_picture_thumbnail_url(obj, context):
image = getattr(obj, "profile_picture", None)
if not getattr(image, "name", None):
return None
request = context["request"]
url = derivative_url(image, THUMBNAIL_VARIANT)
return request.build_absolute_uri(url) if url else None
@staticmethod
def resolve_profile_picture_preview_url(obj, context):
image = getattr(obj, "profile_picture", None)
if not getattr(image, "name", None):
return None
request = context["request"]
url = derivative_url(image, PREVIEW_VARIANT)
return request.build_absolute_uri(url) if url else None
@staticmethod
def resolve_university(obj):
return obj.get_university_display()
@staticmethod
def resolve_major(obj):
return obj.get_major_display()
class PaymentAdminSchema(Schema):
id: int
authority: Optional[str]
ref_id: Optional[str]
card_pan: Optional[str]
card_hash: Optional[str]
status: int
status_label: str
base_amount: int
@@ -241,7 +289,7 @@ class EventAdminDetailSchema(EventSchema):
@staticmethod
def resolve_registrations(obj):
return obj.registrations.select_related("user").prefetch_related(
return obj.registrations.select_related("user", "user__university", "user__major").prefetch_related(
"payments__discount_code"
).order_by("-registered_at")

View File

@@ -1,18 +1,20 @@
from django.conf import settings
from django.core.files.base import ContentFile
from django.shortcuts import get_object_or_404
from django.db.models import Q, Case, When, IntegerField
from django.utils.text import slugify
from django.utils import timezone
from ninja import Router, Query
from ninja import File, Router, Query, UploadedFile
from ninja.errors import HttpError
from typing import List, Optional
from uuid import UUID
from uuid import UUID, uuid4
from apps.events.api.schemas import (
EventAdminDetailSchema,
EventBriefSchema,
EventCreateSchema,
EventGallerySchema,
EventListSchema,
EventSchema,
EventUpdateSchema,
@@ -26,6 +28,8 @@ from apps.events.api.schemas import (
)
from core.authentication import jwt_auth
from apps.events.models import Event, Registration
from apps.gallery.models import Gallery
from apps.gallery.tasks import process_uploaded_image
from apps.notifications.services import notify_user
from apps.payments.models import DiscountCode
from apps.users.tasks import send_critical_sms
@@ -34,6 +38,28 @@ from core.api.schemas import ErrorSchema, MessageSchema
events_router = Router()
def _is_staff_user(user) -> bool:
return bool(user and (user.is_staff or user.is_superuser))
def _staff_forbidden():
return 403, {"error": "اجازه دسترسی ندارید."}
def _save_uploaded_image(instance, field_name: str, file: UploadedFile, folder: str):
if not file.content_type or not file.content_type.startswith("image/"):
return False, {"error": "فایل باید تصویر باشد."}
if file.size > 10 * 1024 * 1024:
return False, {"error": "حجم فایل باید کمتر از ۱۰ مگابایت باشد."}
extension = file.name.rsplit(".", 1)[-1] if "." in file.name else "jpg"
getattr(instance, field_name).save(
f"{folder}/{uuid4().hex}.{extension}",
ContentFile(file.read()),
save=True,
)
return True, instance
def _frontend_event_url(event: Event) -> str:
root = getattr(settings, "FRONTEND_ROOT", "/") or "/"
if not root.endswith("/"):
@@ -130,20 +156,32 @@ def get_event_by_slug(request, slug: str):
)
return event
@events_router.post("/", response=EventSchema)
@events_router.post("/", response={201: EventSchema, 403: ErrorSchema, 400: ErrorSchema}, auth=jwt_auth)
def create_event(request, payload: EventCreateSchema):
"""Create a new event"""
gallery_image_ids = payload.dict().pop('gallery_image_ids', [])
event = Event.objects.create(**payload.dict(exclude={'gallery_image_ids'}))
if not _is_staff_user(request.auth):
return _staff_forbidden()
data = payload.dict(exclude={'gallery_image_ids'})
gallery_image_ids = payload.gallery_image_ids or []
if data.get("slug"):
data["slug"] = slugify(data["slug"])
event = Event(**data)
try:
event.full_clean()
event.save()
except Exception as exc:
return 400, {"error": str(exc)}
if gallery_image_ids:
event.gallery_images.set(gallery_image_ids)
return event
return 201, event
@events_router.put("/{int:event_id}", response=EventSchema)
@events_router.put("/{int:event_id}", response={200: EventSchema, 403: ErrorSchema, 400: ErrorSchema}, auth=jwt_auth)
def update_event(request, event_id: int, payload: EventUpdateSchema):
"""Update an existing event"""
if not _is_staff_user(request.auth):
return _staff_forbidden()
event = get_object_or_404(Event, id=event_id, is_deleted=False)
previous_state = {
"status": event.status,
@@ -158,12 +196,18 @@ def update_event(request, event_id: int, payload: EventUpdateSchema):
gallery_image_ids = update_data.pop('gallery_image_ids', None)
for attr, value in update_data.items():
if attr == "slug" and value:
value = slugify(value)
setattr(event, attr, value)
if 'title' in update_data:
if 'title' in update_data and not update_data.get("slug"):
event.slug = slugify(event.title)
event.save()
try:
event.full_clean()
event.save()
except Exception as exc:
return 400, {"error": str(exc)}
if gallery_image_ids is not None:
event.gallery_images.set(gallery_image_ids)
@@ -196,14 +240,94 @@ def update_event(request, event_id: int, payload: EventUpdateSchema):
sms_kind="event_reschedule",
)
return event
return 200, event
@events_router.delete("/{int:event_id}", response=MessageSchema)
@events_router.delete("/{int:event_id}", response={200: MessageSchema, 403: ErrorSchema}, auth=jwt_auth)
def delete_event(request, event_id: int):
"""Soft delete an event"""
if not _is_staff_user(request.auth):
return _staff_forbidden()
event = get_object_or_404(Event, id=event_id, is_deleted=False)
event.delete()
return {"message": "Event deleted successfully"}
return 200, {"message": "Event deleted successfully"}
@events_router.post("/{int:event_id}/featured-image", response={200: EventSchema, 403: ErrorSchema, 400: ErrorSchema}, auth=jwt_auth)
def upload_event_featured_image(request, event_id: int, file: UploadedFile = File(...)):
"""Upload or replace the poster/featured image for an event."""
if not _is_staff_user(request.auth):
return _staff_forbidden()
event = get_object_or_404(Event, id=event_id, is_deleted=False)
ok, result = _save_uploaded_image(event, "featured_image", file, "events/featured")
if not ok:
return 400, result
return 200, event
@events_router.delete("/{int:event_id}/featured-image", response={200: EventSchema, 403: ErrorSchema}, auth=jwt_auth)
def delete_event_featured_image(request, event_id: int):
"""Remove the poster/featured image for an event."""
if not _is_staff_user(request.auth):
return _staff_forbidden()
event = get_object_or_404(Event, id=event_id, is_deleted=False)
if event.featured_image:
event.featured_image.delete(save=False)
event.featured_image = None
event.save(update_fields=["featured_image", "updated_at"])
return 200, event
@events_router.get("/{int:event_id}/gallery", response={200: List[EventGallerySchema], 403: ErrorSchema}, auth=jwt_auth)
def list_event_gallery(request, event_id: int):
if not _is_staff_user(request.auth):
return _staff_forbidden()
event = get_object_or_404(Event, id=event_id, is_deleted=False)
return 200, event.gallery_images.filter(is_deleted=False).select_related("uploaded_by")
@events_router.post("/{int:event_id}/gallery", response={201: EventGallerySchema, 403: ErrorSchema, 400: ErrorSchema}, auth=jwt_auth)
def upload_event_gallery_image(
request,
event_id: int,
file: UploadedFile = File(...),
title: str | None = None,
alt_text: str | None = None,
):
if not _is_staff_user(request.auth):
return _staff_forbidden()
event = get_object_or_404(Event, id=event_id, is_deleted=False)
if not file.content_type or not file.content_type.startswith("image/"):
return 400, {"error": "فایل باید تصویر باشد."}
if file.size > 10 * 1024 * 1024:
return 400, {"error": "حجم فایل باید کمتر از ۱۰ مگابایت باشد."}
try:
gallery_item = Gallery.objects.create(
title=title or file.name,
description="",
uploaded_by=request.auth,
alt_text=alt_text or title or file.name,
is_public=True,
)
gallery_item._defer_image_processing = True
extension = file.name.rsplit(".", 1)[-1] if "." in file.name else "jpg"
gallery_item.image.save(f"gallery/{uuid4().hex}.{extension}", ContentFile(file.read()))
event.gallery_images.add(gallery_item)
process_uploaded_image.delay(gallery_item.id)
except Exception as exc:
return 400, {"error": str(exc)}
return 201, gallery_item
@events_router.delete("/{int:event_id}/gallery/{int:image_id}", response={200: MessageSchema, 403: ErrorSchema}, auth=jwt_auth)
def delete_event_gallery_image(request, event_id: int, image_id: int):
if not _is_staff_user(request.auth):
return _staff_forbidden()
event = get_object_or_404(Event, id=event_id, is_deleted=False)
image = get_object_or_404(Gallery, id=image_id, is_deleted=False)
event.gallery_images.remove(image)
if not image.event_galleries.exclude(id=event.id).exists():
image.delete()
return 200, {"message": "Gallery image removed"}
# Registration endpoints
@events_router.get("/{int:event_id}/registrations", response=List[RegistrationSchema])
@@ -235,7 +359,7 @@ def list_event_registrations_admin(
event = get_object_or_404(Event, id=event_id, is_deleted=False)
qs = (
event.registrations.filter(is_deleted=False)
.select_related("user")
.select_related("user", "user__university", "user__major")
.prefetch_related("payments__discount_code")
.order_by("-registered_at")
)
@@ -259,6 +383,7 @@ def list_event_registrations_admin(
if search:
qs = qs.filter(
Q(user__username__icontains=search)
| Q(user__mobile__icontains=search)
| Q(user__email__icontains=search)
| Q(user__first_name__icontains=search)
| Q(user__last_name__icontains=search)

View File

@@ -37,6 +37,7 @@ class EventsAPIIntegrationTests(TestCase):
cls.user = User.objects.create_user(
username="event_user",
email="event.user@example.com",
mobile="09198000001",
password=cls.password,
)
cls.user.is_email_verified = True
@@ -45,6 +46,7 @@ class EventsAPIIntegrationTests(TestCase):
cls.staff = User.objects.create_user(
username="event_staff",
email="event.staff@example.com",
mobile="09198000002",
password=cls.password,
is_staff=True,
)
@@ -151,19 +153,21 @@ class EventsAPIIntegrationTests(TestCase):
"/api/events/",
data=json.dumps(payload),
content_type="application/json",
**self._auth_headers(self.staff_token),
)
self.assertEqual(created.status_code, 200)
self.assertEqual(created.status_code, 201)
event_id = created.json()["id"]
updated = self.client.put(
f"/api/events/{event_id}",
data=json.dumps({"title": "Updated Event"}),
content_type="application/json",
**self._auth_headers(self.staff_token),
)
self.assertEqual(updated.status_code, 200)
self.assertEqual(updated.json()["title"], "Updated Event")
deleted = self.client.delete(f"/api/events/{event_id}")
deleted = self.client.delete(f"/api/events/{event_id}", **self._auth_headers(self.staff_token))
self.assertEqual(deleted.status_code, 200)
def test_admin_detail_and_registration_list_requires_staff(self):
@@ -230,9 +234,10 @@ class EventsAPIIntegrationTests(TestCase):
"/api/events/",
data=json.dumps(payload),
content_type="application/json",
**self._auth_headers(self.staff_token),
)
body = response.json()
self.assertEqual(response.status_code, 200)
self.assertEqual(response.status_code, 201)
self.assertTrue(body["gallery_images"])
updated = self.client.put(
@@ -244,6 +249,7 @@ class EventsAPIIntegrationTests(TestCase):
}
),
content_type="application/json",
**self._auth_headers(self.staff_token),
)
self.assertEqual(updated.status_code, 200)
self.assertEqual(updated.json()["slug"], "gallery-event-updated")
@@ -370,7 +376,8 @@ class EventsAPIIntegrationTests(TestCase):
self.assertEqual(response.status_code, 400)
def _create_event_user(self, username, email):
user = User.objects.create_user(username=username, email=email, password=self.password)
suffix = str(abs(hash(username)) % 1_000_000).zfill(6)
user = User.objects.create_user(username=username, email=email, mobile=f"09190{suffix}", password=self.password)
user.is_email_verified = True
user.save(update_fields=["is_email_verified"])
user.major = self.user.major
@@ -468,6 +475,7 @@ class EventSchemasIntegrationTests(TestCase):
self.user = User.objects.create_user(
username="schema_user",
email="schema.user@example.com",
mobile="09198000003",
password=self.password,
)
self.user.is_email_verified = True

View File

@@ -1,4 +1,42 @@
from ninja import Schema
from datetime import datetime
class DiscountCodeSchema(Schema):
id: int
code: str
type: str
value: int
max_discount: int | None = None
is_active: bool
starts_at: datetime | None = None
ends_at: datetime | None = None
usage_limit_total: int | None = None
usage_limit_per_user: int | None = None
min_amount: int | None = None
applicable_event_ids: list[int]
usage_count: int = 0
created_at: datetime
updated_at: datetime
class PagedDiscountCodeSchema(Schema):
count: int
results: list[DiscountCodeSchema]
class DiscountCodeWriteSchema(Schema):
code: str
type: str = "percent"
value: int
max_discount: int | None = None
is_active: bool = True
starts_at: datetime | None = None
ends_at: datetime | None = None
usage_limit_total: int | None = None
usage_limit_per_user: int | None = None
min_amount: int | None = None
applicable_event_ids: list[int] = []
class CreatePaymentIn(Schema):

View File

@@ -1,8 +1,9 @@
from django.conf import settings
from django.shortcuts import redirect, get_object_or_404
from django.utils import timezone
from django.db.models import Count, Q
from ninja import Router
from ninja import Query, Router
from ninja.errors import HttpError
import requests
@@ -11,11 +12,140 @@ from apps.events.models import Event, Registration
from apps.notifications.services import notify_user
from apps.users.tasks import send_critical_sms
from core.authentication import jwt_auth
from apps.payments.api.schemas import CouponVerifyIn, CouponVerifyOut, CreatePaymentIn, CreatePaymentOut, PaymentDetailOut
from core.api.schemas import ErrorSchema, MessageSchema
from apps.payments.api.schemas import (
CouponVerifyIn,
CouponVerifyOut,
CreatePaymentIn,
CreatePaymentOut,
DiscountCodeSchema,
DiscountCodeWriteSchema,
PagedDiscountCodeSchema,
PaymentDetailOut,
)
payments_router = Router(tags=["Payments"])
def _staff_required(user):
return bool(user and (user.is_staff or user.is_superuser))
def _discount_payload(code: DiscountCode):
return {
"id": code.id,
"code": code.code,
"type": code.type,
"value": code.value,
"max_discount": code.max_discount,
"is_active": code.is_active,
"starts_at": code.starts_at,
"ends_at": code.ends_at,
"usage_limit_total": code.usage_limit_total,
"usage_limit_per_user": code.usage_limit_per_user,
"min_amount": code.min_amount,
"applicable_event_ids": list(code.applicable_events.values_list("id", flat=True)),
"usage_count": getattr(code, "usage_count", None) or code.payments.filter(
status__in=[Payment.OrderStatusChoices.PAID, Payment.OrderStatusChoices.PENDING]
).count(),
"created_at": code.created_at,
"updated_at": code.updated_at,
}
def _apply_discount_payload(instance: DiscountCode, payload: DiscountCodeWriteSchema):
data = payload.dict()
event_ids = data.pop("applicable_event_ids", [])
for field, value in data.items():
setattr(instance, field, value)
instance.code = instance.code.strip().upper()
instance.full_clean()
instance.save()
instance.applicable_events.set(Event.objects.filter(id__in=event_ids, is_deleted=False))
return instance
@payments_router.get("/admin/discount-codes", response={200: PagedDiscountCodeSchema, 403: ErrorSchema}, auth=jwt_auth)
def admin_list_discount_codes(
request,
search: str | None = Query(None),
is_active: bool | None = Query(None),
type: str | None = Query(None),
limit: int = Query(20, ge=1, le=100),
offset: int = Query(0, ge=0),
):
if not _staff_required(request.auth):
return 403, {"error": "Permission denied"}
queryset = DiscountCode.objects.annotate(
usage_count=Count(
"payments",
filter=Q(
payments__status__in=[
Payment.OrderStatusChoices.PAID,
Payment.OrderStatusChoices.PENDING,
]
),
)
).prefetch_related("applicable_events").order_by("-created_at")
if search:
queryset = queryset.filter(code__icontains=search)
if is_active is not None:
queryset = queryset.filter(is_active=is_active)
if type:
queryset = queryset.filter(type=type)
count = queryset.count()
return 200, {"count": count, "results": [_discount_payload(item) for item in queryset[offset : offset + limit]]}
@payments_router.post("/admin/discount-codes", response={201: DiscountCodeSchema, 403: ErrorSchema, 400: ErrorSchema}, auth=jwt_auth)
def admin_create_discount_code(request, payload: DiscountCodeWriteSchema):
if not _staff_required(request.auth):
return 403, {"error": "Permission denied"}
if DiscountCode.all_objects.filter(code=payload.code.strip().upper()).exists():
return 400, {"error": "Discount code already exists"}
try:
code = _apply_discount_payload(DiscountCode(), payload)
except Exception as exc:
return 400, {"error": str(exc)}
return 201, _discount_payload(code)
@payments_router.put("/admin/discount-codes/{int:code_id}", response={200: DiscountCodeSchema, 403: ErrorSchema, 400: ErrorSchema}, auth=jwt_auth)
def admin_update_discount_code(request, code_id: int, payload: DiscountCodeWriteSchema):
if not _staff_required(request.auth):
return 403, {"error": "Permission denied"}
code = get_object_or_404(DiscountCode, id=code_id, is_deleted=False)
normalized = payload.code.strip().upper()
if DiscountCode.all_objects.filter(code=normalized).exclude(id=code_id).exists():
return 400, {"error": "Discount code already exists"}
try:
code = _apply_discount_payload(code, payload)
except Exception as exc:
return 400, {"error": str(exc)}
return 200, _discount_payload(code)
@payments_router.delete("/admin/discount-codes/{int:code_id}", response={200: MessageSchema, 403: ErrorSchema, 400: ErrorSchema}, auth=jwt_auth)
def admin_delete_discount_code(request, code_id: int):
if not request.auth.is_superuser:
return 403, {"error": "Only superusers can delete discount codes"}
code = get_object_or_404(DiscountCode, id=code_id, is_deleted=False)
code.delete()
return 200, {"message": "Discount code deleted"}
@payments_router.post("/admin/discount-codes/{int:code_id}/restore", response={200: MessageSchema, 403: ErrorSchema, 400: ErrorSchema}, auth=jwt_auth)
def admin_restore_discount_code(request, code_id: int):
if not request.auth.is_superuser:
return 403, {"error": "Only superusers can restore discount codes"}
try:
code = DiscountCode.deleted_objects.get(id=code_id)
except DiscountCode.DoesNotExist:
return 400, {"error": "Discount code not found"}
code.restore()
return 200, {"message": "Discount code restored"}
def _event_action_url(event: Event) -> str:
root = getattr(settings, "FRONTEND_ROOT", "/") or "/"
if not root.endswith("/"):

View File

@@ -1,15 +1,209 @@
from ninja import Router
from django.db.models import Q
from ninja import Query, Router, Schema
from apps.users.models import Major, University
from core.api.schemas import ErrorSchema, MessageSchema
from core.authentication import jwt_auth
meta_router = Router(tags=['meta'])
@meta_router.get("/majors")
def list_majors(request):
majors = Major.objects.filter(is_deleted=False, is_active=True).order_by("name")
return [{"id": m.id, "code": m.code, "label": m.name} for m in majors]
@meta_router.get("/universities")
def list_universities(request):
universities = University.objects.filter(is_deleted=False, is_active=True).order_by("name")
return [{"id": u.id, "code": u.code, "label": u.name} for u in universities]
class MetaOptionSchema(Schema):
id: int
code: str
label: str
is_active: bool = True
user_count: int = 0
class PagedMetaOptionSchema(Schema):
count: int
results: list[MetaOptionSchema]
class MetaOptionWriteSchema(Schema):
code: str
name: str
is_active: bool = True
def _is_staff(user):
return bool(user and (user.is_staff or user.is_superuser))
def _option_payload(obj, user_count=0):
return {
"id": obj.id,
"code": obj.code,
"label": obj.name,
"is_active": obj.is_active,
"user_count": user_count,
}
def _list_options(model, search, limit, offset, active_only=True):
queryset = model.objects.filter(is_deleted=False).order_by("name")
if active_only:
queryset = queryset.filter(is_active=True)
if search:
queryset = queryset.filter(Q(code__icontains=search) | Q(name__icontains=search))
count = queryset.count()
return count, list(queryset[offset : offset + limit])
@meta_router.get("/majors", response=PagedMetaOptionSchema)
def list_majors(
request,
search: str | None = Query(None),
limit: int = Query(20, ge=1, le=100),
offset: int = Query(0, ge=0),
):
count, majors = _list_options(Major, search, limit, offset)
return {"count": count, "results": [_option_payload(m) for m in majors]}
@meta_router.get("/universities", response=PagedMetaOptionSchema)
def list_universities(
request,
search: str | None = Query(None),
limit: int = Query(20, ge=1, le=100),
offset: int = Query(0, ge=0),
):
count, universities = _list_options(University, search, limit, offset)
return {"count": count, "results": [_option_payload(u) for u in universities]}
@meta_router.get("/admin/majors", response={200: PagedMetaOptionSchema, 403: ErrorSchema}, auth=jwt_auth)
def admin_list_majors(
request,
search: str | None = Query(None),
limit: int = Query(20, ge=1, le=100),
offset: int = Query(0, ge=0),
):
if not _is_staff(request.auth):
return 403, {"error": "Permission denied"}
count, majors = _list_options(Major, search, limit, offset, active_only=False)
return 200, {
"count": count,
"results": [_option_payload(m, m.users.filter(is_deleted=False).count()) for m in majors],
}
@meta_router.post("/admin/majors", response={201: MetaOptionSchema, 403: ErrorSchema, 400: ErrorSchema}, auth=jwt_auth)
def admin_create_major(request, payload: MetaOptionWriteSchema):
if not _is_staff(request.auth):
return 403, {"error": "Permission denied"}
if Major.all_objects.filter(code=payload.code).exists():
return 400, {"error": "Major code already exists"}
major = Major.objects.create(code=payload.code.strip(), name=payload.name.strip(), is_active=payload.is_active)
return 201, _option_payload(major)
@meta_router.put("/admin/majors/{int:item_id}", response={200: MetaOptionSchema, 403: ErrorSchema, 400: ErrorSchema}, auth=jwt_auth)
def admin_update_major(request, item_id: int, payload: MetaOptionWriteSchema):
if not _is_staff(request.auth):
return 403, {"error": "Permission denied"}
try:
major = Major.objects.get(id=item_id)
except Major.DoesNotExist:
return 400, {"error": "Major not found"}
conflict = Major.all_objects.filter(code=payload.code).exclude(id=item_id).exists()
if conflict:
return 400, {"error": "Major code already exists"}
major.code = payload.code.strip()
major.name = payload.name.strip()
major.is_active = payload.is_active
major.save(update_fields=["code", "name", "is_active", "updated_at"])
return 200, _option_payload(major, major.users.filter(is_deleted=False).count())
@meta_router.delete("/admin/majors/{int:item_id}", response={200: MessageSchema, 403: ErrorSchema, 400: ErrorSchema}, auth=jwt_auth)
def admin_delete_major(request, item_id: int):
if not request.auth.is_superuser:
return 403, {"error": "Only superusers can delete majors"}
try:
major = Major.objects.get(id=item_id)
except Major.DoesNotExist:
return 400, {"error": "Major not found"}
major.delete()
return 200, {"message": "Major deleted"}
@meta_router.post("/admin/majors/{int:item_id}/restore", response={200: MessageSchema, 403: ErrorSchema, 400: ErrorSchema}, auth=jwt_auth)
def admin_restore_major(request, item_id: int):
if not request.auth.is_superuser:
return 403, {"error": "Only superusers can restore majors"}
try:
major = Major.deleted_objects.get(id=item_id)
except Major.DoesNotExist:
return 400, {"error": "Major not found"}
major.restore()
return 200, {"message": "Major restored"}
@meta_router.get("/admin/universities", response={200: PagedMetaOptionSchema, 403: ErrorSchema}, auth=jwt_auth)
def admin_list_universities(
request,
search: str | None = Query(None),
limit: int = Query(20, ge=1, le=100),
offset: int = Query(0, ge=0),
):
if not _is_staff(request.auth):
return 403, {"error": "Permission denied"}
count, universities = _list_options(University, search, limit, offset, active_only=False)
return 200, {
"count": count,
"results": [_option_payload(u, u.users.filter(is_deleted=False).count()) for u in universities],
}
@meta_router.post("/admin/universities", response={201: MetaOptionSchema, 403: ErrorSchema, 400: ErrorSchema}, auth=jwt_auth)
def admin_create_university(request, payload: MetaOptionWriteSchema):
if not _is_staff(request.auth):
return 403, {"error": "Permission denied"}
if University.all_objects.filter(code=payload.code).exists():
return 400, {"error": "University code already exists"}
university = University.objects.create(code=payload.code.strip(), name=payload.name.strip(), is_active=payload.is_active)
return 201, _option_payload(university)
@meta_router.put("/admin/universities/{int:item_id}", response={200: MetaOptionSchema, 403: ErrorSchema, 400: ErrorSchema}, auth=jwt_auth)
def admin_update_university(request, item_id: int, payload: MetaOptionWriteSchema):
if not _is_staff(request.auth):
return 403, {"error": "Permission denied"}
try:
university = University.objects.get(id=item_id)
except University.DoesNotExist:
return 400, {"error": "University not found"}
conflict = University.all_objects.filter(code=payload.code).exclude(id=item_id).exists()
if conflict:
return 400, {"error": "University code already exists"}
university.code = payload.code.strip()
university.name = payload.name.strip()
university.is_active = payload.is_active
university.save(update_fields=["code", "name", "is_active", "updated_at"])
return 200, _option_payload(university, university.users.filter(is_deleted=False).count())
@meta_router.delete("/admin/universities/{int:item_id}", response={200: MessageSchema, 403: ErrorSchema, 400: ErrorSchema}, auth=jwt_auth)
def admin_delete_university(request, item_id: int):
if not request.auth.is_superuser:
return 403, {"error": "Only superusers can delete universities"}
try:
university = University.objects.get(id=item_id)
except University.DoesNotExist:
return 400, {"error": "University not found"}
university.delete()
return 200, {"message": "University deleted"}
@meta_router.post("/admin/universities/{int:item_id}/restore", response={200: MessageSchema, 403: ErrorSchema, 400: ErrorSchema}, auth=jwt_auth)
def admin_restore_university(request, item_id: int):
if not request.auth.is_superuser:
return 403, {"error": "Only superusers can restore universities"}
try:
university = University.deleted_objects.get(id=item_id)
except University.DoesNotExist:
return 400, {"error": "University not found"}
university.restore()
return 200, {"message": "University restored"}

View File

@@ -178,6 +178,19 @@ class UserListSchema(ModelSchema):
major: Optional[str] = None
university: Optional[str] = None
mobile: Optional[str] = None
profile_picture: Optional[str] = None
profile_picture_thumbnail_url: Optional[str] = None
profile_picture_preview_url: Optional[str] = None
student_id: Optional[str] = None
year_of_study: Optional[int] = None
bio: Optional[str] = None
is_email_verified: bool
is_mobile_verified: bool
is_deleted: bool
deleted_at: Optional[datetime] = None
can_access_blog_admin: bool
can_write_blog_posts: bool
can_review_blog_posts: bool
class Meta:
model = User
@@ -188,13 +201,19 @@ class UserListSchema(ModelSchema):
"mobile",
"first_name",
"last_name",
"student_id",
"year_of_study",
"bio",
"is_active",
"is_staff",
"is_superuser",
"date_joined",
"major",
"university",
"is_email_verified",
"is_mobile_verified",
"is_deleted",
"deleted_at",
]
@staticmethod
@@ -205,6 +224,37 @@ class UserListSchema(ModelSchema):
def resolve_university(obj):
return obj.get_university_display()
@staticmethod
def resolve_can_access_blog_admin(obj):
return can_access_blog_admin(obj)
@staticmethod
def resolve_can_write_blog_posts(obj):
return can_write_blog_posts(obj)
@staticmethod
def resolve_can_review_blog_posts(obj):
return can_review_blog_posts(obj)
@staticmethod
def resolve_profile_picture(obj, context):
request = context["request"]
if obj.profile_picture and hasattr(obj.profile_picture, "url"):
return request.build_absolute_uri(obj.profile_picture.url)
return None
@staticmethod
def resolve_profile_picture_thumbnail_url(obj, context):
request = context["request"]
url = derivative_url(obj.profile_picture, THUMBNAIL_VARIANT)
return request.build_absolute_uri(url) if url else None
@staticmethod
def resolve_profile_picture_preview_url(obj, context):
request = context["request"]
url = derivative_url(obj.profile_picture, PREVIEW_VARIANT)
return request.build_absolute_uri(url) if url else None
class AuthorizationRoleSchema(Schema):
key: str

View File

@@ -533,6 +533,15 @@ def list_users(
return queryset[offset : offset + limit]
@auth_router.get("/users/{user_id}", response={200: UserProfileSchema, 403: ErrorSchema, 404: ErrorSchema}, auth=jwt_auth)
def get_user_detail(request, user_id: int):
user = request.auth
if not (user.is_staff or user.is_superuser):
return 403, {"error": "اجازه دسترسی ندارید."}
target = get_object_or_404(User, id=user_id)
return 200, target
@auth_router.get("/roles", response={200: list[AuthorizationRoleSchema], 403: ErrorSchema}, auth=jwt_auth)
def list_authorization_roles(request):
if not _ensure_superuser(request.auth):

View File

@@ -1,5 +1,6 @@
from ninja import Router
from apps.analytics.api.views import analytics_router
from apps.blog.api.views import blog_router
from apps.certificates.api.views import certificates_router
from apps.communications.api.views import communications_router
@@ -12,6 +13,7 @@ from apps.users.api.views import auth_router
from core.api.views import health_router
router = Router()
router.add_router("analytics/", analytics_router, tags=["Analytics"])
router.add_router("auth/", auth_router, tags=["Authentication"])
router.add_router("blog/", blog_router, tags=["Blog"])
router.add_router("gallery/", gallery_router, tags=["Gallery"])