Compare commits

...

12 Commits

Author SHA1 Message Date
08cab3b815 feat(analytics): expose full dashboard result groups
Some checks failed
Backend CI/CD / test (push) Has been cancelled
Backend CI/CD / deploy (push) Has been cancelled
2026-06-15 17:33:30 +03:30
a326d2e31d fix(analytics): normalize dashboard entry years 2026-06-15 17:26:57 +03:30
38e7baef9c feat(analytics): split dashboard metrics by domain
Some checks failed
Backend CI/CD / test (push) Has been cancelled
Backend CI/CD / deploy (push) Has been cancelled
2026-06-15 16:17:48 +03:30
8669e99ca5 feat(analytics): add admin dashboard api
Some checks failed
Backend CI/CD / test (push) Has been cancelled
Backend CI/CD / deploy (push) Has been cancelled
2026-06-14 09:51:46 +03:30
c2abcd7b97 feat(payments): add discount code admin API
Some checks failed
Backend CI/CD / test (push) Has been cancelled
Backend CI/CD / deploy (push) Has been cancelled
2026-06-14 00:03:57 +03:30
bdc4fc1a49 feat(events): expand admin event management APIs 2026-06-14 00:03:42 +03:30
20e7a04e59 feat(users): add paginated admin metadata APIs 2026-06-14 00:03:27 +03:30
0151497385 feat(blog): add production taxonomy seed commands
Some checks failed
Backend CI/CD / test (push) Has been cancelled
Backend CI/CD / deploy (push) Has been cancelled
2026-06-13 00:18:05 +03:30
8b307196da feat(blog): expose review feedback
Some checks failed
Backend CI/CD / test (push) Has been cancelled
Backend CI/CD / deploy (push) Has been cancelled
2026-06-13 00:00:10 +03:30
690dc7b600 fix(blog): limit category nesting depth 2026-06-12 21:35:04 +03:30
9acab4af2c feat(users): add authorization management APIs 2026-06-12 15:08:19 +03:30
7cbc99a82f feat(blog): add admin taxonomy APIs 2026-06-12 15:08:07 +03:30
17 changed files with 2289 additions and 30 deletions

View File

View File

View File

@@ -0,0 +1,212 @@
from typing import Literal
from ninja import Schema
class AnalyticsPointSchema(Schema):
label: str
value: int | float
class AnalyticsPointGroupSchema(Schema):
items: list[AnalyticsPointSchema]
top_items: list[AnalyticsPointSchema]
other_count: int = 0
total_count: int = 0
class AnalyticsTrendPointSchema(Schema):
date: str
label: str
value: int | float
class AnalyticsRegistrationStatusSchema(Schema):
status: str
label: str
value: int
class AnalyticsTopEventSchema(Schema):
id: int
title: str
slug: str
attendees: int
capacity: int | None = None
fill_rate: float | None = None
revenue: int = 0
class AnalyticsPostPopularitySchema(Schema):
id: int
title: str
slug: str
likes: int
saves: int
comments: int
class AnalyticsPostPopularityGroupSchema(Schema):
items: list[AnalyticsPostPopularitySchema]
top_items: list[AnalyticsPostPopularitySchema]
other_count: int = 0
total_count: int = 0
class AnalyticsTopPostSchema(AnalyticsPostPopularitySchema):
score: int
class AnalyticsSummarySchema(Schema):
total_users: int
verified_users: int
total_events: int
total_registrations: int
total_revenue: int
total_discount: int
published_posts: int
total_likes: int
total_saves: int
total_comments: int
class AnalyticsUsersSchema(Schema):
signup_trend: list[AnalyticsTrendPointSchema]
by_major: list[AnalyticsPointSchema]
by_university: list[AnalyticsPointSchema]
by_year: list[AnalyticsPointSchema]
class AnalyticsEventsSchema(Schema):
registration_status: list[AnalyticsRegistrationStatusSchema]
by_major: list[AnalyticsPointSchema]
by_university: list[AnalyticsPointSchema]
top_events: list[AnalyticsTopEventSchema]
registration_trend: list[AnalyticsTrendPointSchema]
class AnalyticsRevenueSchema(Schema):
trend: list[AnalyticsTrendPointSchema]
by_event: list[AnalyticsPointSchema]
payment_status: list[AnalyticsRegistrationStatusSchema]
total_paid: int
total_discount: int
total_base: int
class AnalyticsBlogSchema(Schema):
totals: dict[str, int]
post_popularity: list[AnalyticsPostPopularitySchema]
top_posts: list[AnalyticsTopPostSchema]
activity_trend: list[dict[str, int | str]]
by_category: list[AnalyticsPointSchema]
by_tag: list[AnalyticsPointSchema]
class AnalyticsAchievementsSchema(Schema):
distinct_participants: int
learning_hours: float
published_content: int
community_engagement: int
class AnalyticsFiltersSchema(Schema):
date_from: str | None = None
date_to: str | None = None
event_id: int | None = None
granularity: Literal["day", "week", "month"]
class AdminDashboardAnalyticsSchema(Schema):
filters: AnalyticsFiltersSchema
summary: AnalyticsSummarySchema
users: AnalyticsUsersSchema
events: AnalyticsEventsSchema
revenue: AnalyticsRevenueSchema
blog: AnalyticsBlogSchema
achievements: AnalyticsAchievementsSchema
class AnalyticsEventOptionSchema(Schema):
value: str
label: str
description: str | None = None
class AnalyticsEventOptionsSchema(Schema):
count: int
results: list[AnalyticsEventOptionSchema]
class UserAnalyticsSummarySchema(Schema):
total_users: int
verified_users: int
unverified_users: int
profile_completion_rate: float
class UserAnalyticsSchema(Schema):
filters: AnalyticsFiltersSchema
summary: UserAnalyticsSummarySchema
signup_trend: list[AnalyticsTrendPointSchema]
by_major: AnalyticsPointGroupSchema
by_university: AnalyticsPointGroupSchema
by_year: AnalyticsPointGroupSchema
class EventAnalyticsFiltersSchema(Schema):
date_from: str | None = None
date_to: str | None = None
event_id: int | None = None
class EventAnalyticsSummarySchema(Schema):
total_events: int
total_registrations: int
distinct_participants: int
total_revenue: int
total_discount: int
total_base: int
learning_hours: float
class AnalyticsTopEventGroupSchema(Schema):
top_items: list[AnalyticsTopEventSchema]
other_count: int = 0
total_count: int = 0
class EventAnalyticsSchema(Schema):
filters: EventAnalyticsFiltersSchema
summary: EventAnalyticsSummarySchema
registration_status: list[AnalyticsRegistrationStatusSchema]
payment_status: list[AnalyticsRegistrationStatusSchema]
attendee_by_major: AnalyticsPointGroupSchema
attendee_by_university: AnalyticsPointGroupSchema
registration_trend: list[AnalyticsTrendPointSchema]
revenue_trend: list[AnalyticsTrendPointSchema]
revenue_by_event: AnalyticsPointGroupSchema
top_events: AnalyticsTopEventGroupSchema
class BlogAnalyticsFiltersSchema(Schema):
date_from: str | None = None
date_to: str | None = None
class BlogAnalyticsSummarySchema(Schema):
published_posts: int
total_likes: int
total_saves: int
total_comments: int
community_engagement: int
class BlogAnalyticsSchema(Schema):
filters: BlogAnalyticsFiltersSchema
summary: BlogAnalyticsSummarySchema
activity_trend: list[dict[str, int | str]]
post_popularity: AnalyticsPostPopularityGroupSchema
top_posts: list[AnalyticsTopPostSchema]
by_category: AnalyticsPointGroupSchema
by_tag: AnalyticsPointGroupSchema

843
apps/analytics/api/views.py Normal file
View File

@@ -0,0 +1,843 @@
from __future__ import annotations
from datetime import datetime, time
from typing import Literal
from django.contrib.auth import get_user_model
from django.db.models import Count, Q, Sum
from django.db.models.functions import Coalesce, TruncDay, TruncMonth, TruncWeek
from django.shortcuts import get_object_or_404
from django.utils import timezone
from django.utils.dateparse import parse_date, parse_datetime
from ninja import Router
from ninja.errors import HttpError
from apps.analytics.api.schemas import (
AdminDashboardAnalyticsSchema,
AnalyticsEventOptionsSchema,
BlogAnalyticsSchema,
EventAnalyticsSchema,
UserAnalyticsSchema,
)
from apps.blog.models import Category, Comment, Like, Post, SavedPost, Tag
from apps.events.models import Event, Registration
from apps.payments.models import Payment
from core.authentication import jwt_auth
analytics_router = Router()
UNKNOWN_LABEL = "نامشخص"
TOP_ITEMS_LIMIT = 12
PARTICIPANT_STATUSES = [
Registration.StatusChoices.CONFIRMED,
Registration.StatusChoices.ATTENDED,
]
GRANULARITY_TRUNC = {
"day": TruncDay,
"week": TruncWeek,
"month": TruncMonth,
}
REGISTRATION_STATUS_LABELS = {
Registration.StatusChoices.PENDING: "در انتظار",
Registration.StatusChoices.CONFIRMED: "تایید شده",
Registration.StatusChoices.CANCELLED: "لغو شده",
Registration.StatusChoices.ATTENDED: "حاضر شده",
}
PAYMENT_STATUS_LABELS = {
Payment.OrderStatusChoices.INIT: "ایجاد شده",
Payment.OrderStatusChoices.PENDING: "در انتظار پرداخت",
Payment.OrderStatusChoices.PAID: "پرداخت موفق",
Payment.OrderStatusChoices.FAILED: "پرداخت ناموفق",
Payment.OrderStatusChoices.CANCELED: "لغو شده",
}
def _parse_boundary(value: str | None, *, is_end: bool = False) -> datetime | None:
if not value:
return None
parsed_datetime = parse_datetime(value)
if parsed_datetime is None:
parsed_date = parse_date(value)
if parsed_date is None:
raise HttpError(400, "Invalid date filter.")
parsed_datetime = datetime.combine(parsed_date, time.max if is_end else time.min)
if timezone.is_naive(parsed_datetime):
parsed_datetime = timezone.make_aware(parsed_datetime, timezone.get_current_timezone())
return parsed_datetime
def _apply_range(queryset, field: str, start: datetime | None, end: datetime | None):
if start:
queryset = queryset.filter(**{f"{field}__gte": start})
if end:
queryset = queryset.filter(**{f"{field}__lte": end})
return queryset
def _auto_granularity(start: datetime | None, end: datetime | None) -> Literal["day", "week", "month"]:
if not start or not end:
return "month"
days = max((end - start).days, 1)
if days <= 45:
return "day"
if days <= 180:
return "week"
return "month"
def _label(value) -> str:
return str(value or UNKNOWN_LABEL)
def _normalize_entry_year(value) -> str:
if value in (None, ""):
return UNKNOWN_LABEL
raw = str(value).strip()
normalized = (
raw.translate(str.maketrans("۰۱۲۳۴۵۶۷۸۹٠١٢٣٤٥٦٧٨٩", "01234567890123456789"))
.replace(",", "")
.replace("٬", "")
)
try:
year = int(normalized)
except (TypeError, ValueError):
return UNKNOWN_LABEL
if 1390 <= year <= 1499:
return str(year)
if 400 <= year <= 499:
return str(1000 + year)
if 90 <= year <= 99:
return str(1300 + year)
return UNKNOWN_LABEL
def _point_group_from_rows(rows: list[dict], *, limit: int = TOP_ITEMS_LIMIT):
sorted_rows = sorted(rows, key=lambda item: (-int(item["value"] or 0), str(item["label"])))
other_rows = sorted_rows[limit:]
items = [
{"label": _label(item["label"]), "value": int(item["value"] or 0)}
for item in sorted_rows
]
return {
"items": items,
"top_items": items[:limit],
"other_count": len(other_rows),
"total_count": len(sorted_rows),
}
def _point_queryset(queryset, label_field: str, *, limit: int = 12):
return [
{"label": _label(item.get(label_field)), "value": item["value"]}
for item in queryset.values(label_field).annotate(value=Count("id")).order_by("-value", label_field)[:limit]
]
def _point_group_queryset(queryset, label_field: str, *, limit: int = TOP_ITEMS_LIMIT, sum_field: str | None = None):
if sum_field:
rows = list(
queryset.values(label_field)
.annotate(value=Coalesce(Sum(sum_field), 0))
.order_by("-value", label_field)
)
else:
rows = list(
queryset.values(label_field)
.annotate(value=Count("id"))
.order_by("-value", label_field)
)
total_count = len(rows)
other_rows = rows[limit:]
items = [
{"label": _label(item.get(label_field)), "value": int(item["value"] or 0)}
for item in rows
]
return {
"items": items,
"top_items": items[:limit],
"other_count": len(other_rows),
"total_count": total_count,
}
def _entry_year_group_queryset(queryset, *, limit: int = TOP_ITEMS_LIMIT):
buckets: dict[str, int] = {}
for item in queryset.values("year_of_study").annotate(value=Count("id")):
label = _normalize_entry_year(item.get("year_of_study"))
buckets[label] = buckets.get(label, 0) + int(item["value"] or 0)
return _point_group_from_rows(
[{"label": label, "value": value} for label, value in buckets.items()],
limit=limit,
)
def _trend_queryset(queryset, field: str, granularity: str):
trunc = GRANULARITY_TRUNC[granularity]
rows = (
queryset.annotate(bucket=trunc(field))
.values("bucket")
.annotate(value=Count("id"))
.order_by("bucket")
)
return [
{
"date": item["bucket"].date().isoformat() if item["bucket"] else "",
"label": item["bucket"].date().isoformat() if item["bucket"] else UNKNOWN_LABEL,
"value": item["value"],
}
for item in rows
]
def _trend_sum_queryset(queryset, field: str, sum_field: str, granularity: str):
trunc = GRANULARITY_TRUNC[granularity]
rows = (
queryset.annotate(bucket=trunc(field))
.values("bucket")
.annotate(value=Coalesce(Sum(sum_field), 0))
.order_by("bucket")
)
return [
{
"date": item["bucket"].date().isoformat() if item["bucket"] else "",
"label": item["bucket"].date().isoformat() if item["bucket"] else UNKNOWN_LABEL,
"value": int(item["value"] or 0),
}
for item in rows
]
def _sum(queryset, field: str) -> int:
return int(queryset.aggregate(total=Coalesce(Sum(field), 0))["total"] or 0)
def _status_label(value) -> str:
try:
return PAYMENT_STATUS_LABELS.get(Payment.OrderStatusChoices(value), str(value))
except ValueError:
return str(value)
def _registration_status_label(value: str) -> str:
try:
return REGISTRATION_STATUS_LABELS.get(Registration.StatusChoices(value), value)
except ValueError:
return value
def _event_learning_hours(events_queryset) -> float:
learning_hours = 0.0
for event in events_queryset.annotate(
attendees=Count(
"registrations",
filter=Q(registrations__is_deleted=False, registrations__status__in=PARTICIPANT_STATUSES),
distinct=True,
)
):
duration = event.end_time - event.start_time
learning_hours += max(duration.total_seconds(), 0) / 3600 * event.attendees
return round(learning_hours, 1)
def _filters_payload(start: datetime | None, end: datetime | None, *, event_id: int | None = None, include_granularity: bool = False):
payload = {
"date_from": start.isoformat() if start else None,
"date_to": end.isoformat() if end else None,
}
if event_id is not None:
payload["event_id"] = event_id
if include_granularity:
payload["granularity"] = _auto_granularity(start, end)
return payload
def _require_staff(user):
if not (user and (user.is_staff or user.is_superuser)):
raise HttpError(403, "اجازه دسترسی ندارید.")
@analytics_router.get("/admin/events/options", response=AnalyticsEventOptionsSchema, auth=jwt_auth)
def admin_event_options(
request,
search: str | None = None,
limit: int = 20,
offset: int = 0,
):
_require_staff(request.auth)
safe_limit = max(1, min(limit, 50))
safe_offset = max(offset, 0)
queryset = Event.objects.filter(is_deleted=False)
if search:
queryset = queryset.filter(Q(title__icontains=search) | Q(slug__icontains=search))
queryset = queryset.order_by("-start_time", "-id")
count = queryset.count()
results = [
{
"value": str(event.id),
"label": event.title,
"description": event.start_time.date().isoformat() if event.start_time else None,
}
for event in queryset[safe_offset : safe_offset + safe_limit]
]
return {"count": count, "results": results}
@analytics_router.get("/admin/users", response=UserAnalyticsSchema, auth=jwt_auth)
def admin_users_analytics(request, date_from: str | None = None, date_to: str | None = None):
_require_staff(request.auth)
start = _parse_boundary(date_from)
end = _parse_boundary(date_to, is_end=True)
if start and end and start > end:
raise HttpError(400, "Start date must be before end date.")
granularity = _auto_granularity(start, end)
User = get_user_model()
users_qs = _apply_range(User.objects.filter(is_deleted=False), "date_joined", start, end)
total_users = users_qs.count()
verified_users = users_qs.filter(is_mobile_verified=True).count()
completed_profiles = users_qs.exclude(first_name="").exclude(last_name="").filter(
mobile__isnull=False,
major__isnull=False,
university__isnull=False,
).count()
return {
"filters": _filters_payload(start, end, include_granularity=True),
"summary": {
"total_users": total_users,
"verified_users": verified_users,
"unverified_users": max(total_users - verified_users, 0),
"profile_completion_rate": round((completed_profiles / total_users) * 100, 1) if total_users else 0,
},
"signup_trend": _trend_queryset(users_qs, "date_joined", granularity),
"by_major": _point_group_queryset(users_qs, "major__name"),
"by_university": _point_group_queryset(users_qs, "university__name"),
"by_year": _entry_year_group_queryset(users_qs),
}
@analytics_router.get("/admin/events", response=EventAnalyticsSchema, auth=jwt_auth)
def admin_events_analytics(
request,
date_from: str | None = None,
date_to: str | None = None,
event_id: int | None = None,
):
_require_staff(request.auth)
start = _parse_boundary(date_from)
end = _parse_boundary(date_to, is_end=True)
if start and end and start > end:
raise HttpError(400, "Start date must be before end date.")
granularity = _auto_granularity(start, end)
selected_event = None
if event_id is not None:
selected_event = get_object_or_404(Event, id=event_id, is_deleted=False)
events_qs = Event.objects.filter(is_deleted=False)
if selected_event:
events_qs = events_qs.filter(id=selected_event.id)
else:
events_qs = _apply_range(events_qs, "start_time", start, end)
registrations_qs = _apply_range(
Registration.objects.filter(is_deleted=False).select_related("user", "event", "user__major", "user__university"),
"registered_at",
start,
end,
)
if selected_event:
registrations_qs = registrations_qs.filter(event_id=selected_event.id)
participant_qs = registrations_qs.filter(status__in=PARTICIPANT_STATUSES)
paid_payments_qs = Payment.objects.filter(is_deleted=False, status=Payment.OrderStatusChoices.PAID, verified_at__isnull=False)
paid_payments_qs = _apply_range(paid_payments_qs.select_related("event"), "verified_at", start, end)
all_payments_qs = _apply_range(Payment.objects.filter(is_deleted=False), "created_at", start, end)
if selected_event:
paid_payments_qs = paid_payments_qs.filter(event_id=selected_event.id)
all_payments_qs = all_payments_qs.filter(event_id=selected_event.id)
registration_status = [
{"status": item["status"], "label": _registration_status_label(item["status"]), "value": item["value"]}
for item in registrations_qs.values("status").annotate(value=Count("id")).order_by("status")
]
payment_status = [
{"status": str(item["status"]), "label": _status_label(item["status"]), "value": item["value"]}
for item in all_payments_qs.values("status").annotate(value=Count("id")).order_by("status")
]
top_events_qs = Event.objects.filter(is_deleted=False)
if selected_event:
top_events_qs = top_events_qs.filter(id=selected_event.id)
top_events_annotated = list(
top_events_qs.annotate(
attendees=Count(
"registrations",
filter=Q(registrations__is_deleted=False, registrations__status__in=PARTICIPANT_STATUSES),
distinct=True,
),
revenue=Coalesce(
Sum(
"payments__amount",
filter=Q(payments__is_deleted=False, payments__status=Payment.OrderStatusChoices.PAID),
),
0,
),
).order_by("-attendees", "-revenue", "-start_time")
)
top_events = []
for event in top_events_annotated[:TOP_ITEMS_LIMIT]:
fill_rate = round((event.attendees / event.capacity) * 100, 1) if event.capacity else None
top_events.append(
{
"id": event.id,
"title": event.title,
"slug": event.slug,
"attendees": event.attendees,
"capacity": event.capacity,
"fill_rate": fill_rate,
"revenue": int(event.revenue or 0),
}
)
total_revenue = _sum(paid_payments_qs, "amount")
total_discount = _sum(paid_payments_qs, "discount_amount")
total_base = _sum(paid_payments_qs, "base_amount")
return {
"filters": _filters_payload(start, end, event_id=event_id),
"summary": {
"total_events": events_qs.count(),
"total_registrations": registrations_qs.count(),
"distinct_participants": participant_qs.values("user_id").distinct().count(),
"total_revenue": total_revenue,
"total_discount": total_discount,
"total_base": total_base,
"learning_hours": _event_learning_hours(events_qs),
},
"registration_status": registration_status,
"payment_status": payment_status,
"attendee_by_major": _point_group_queryset(participant_qs, "user__major__name"),
"attendee_by_university": _point_group_queryset(participant_qs, "user__university__name"),
"registration_trend": _trend_queryset(registrations_qs, "registered_at", granularity),
"revenue_trend": _trend_sum_queryset(paid_payments_qs, "verified_at", "amount", granularity),
"revenue_by_event": _point_group_queryset(paid_payments_qs, "event__title", sum_field="amount"),
"top_events": {
"top_items": top_events,
"other_count": max(len(top_events_annotated) - TOP_ITEMS_LIMIT, 0),
"total_count": len(top_events_annotated),
},
}
@analytics_router.get("/admin/blog", response=BlogAnalyticsSchema, auth=jwt_auth)
def admin_blog_analytics(request, date_from: str | None = None, date_to: str | None = None):
_require_staff(request.auth)
start = _parse_boundary(date_from)
end = _parse_boundary(date_to, is_end=True)
if start and end and start > end:
raise HttpError(400, "Start date must be before end date.")
granularity = _auto_granularity(start, end)
published_posts_qs = _apply_range(
Post.objects.filter(is_deleted=False, status=Post.StatusChoices.PUBLISHED),
"published_at",
start,
end,
)
visible_comments_qs = _apply_range(
Comment.objects.filter(
is_deleted=False,
is_hidden=False,
is_approved=True,
post__is_deleted=False,
post__status=Post.StatusChoices.PUBLISHED,
),
"created_at",
start,
end,
)
likes_qs = _apply_range(
Like.objects.filter(post__is_deleted=False, post__status=Post.StatusChoices.PUBLISHED),
"created_at",
start,
end,
)
saves_qs = _apply_range(
SavedPost.objects.filter(post__is_deleted=False, post__status=Post.StatusChoices.PUBLISHED),
"created_at",
start,
end,
)
like_filter = Q()
save_filter = Q()
comment_filter = Q(comments__is_deleted=False, comments__is_hidden=False, comments__is_approved=True)
if start:
like_filter &= Q(likes__created_at__gte=start)
save_filter &= Q(saves__created_at__gte=start)
comment_filter &= Q(comments__created_at__gte=start)
if end:
like_filter &= Q(likes__created_at__lte=end)
save_filter &= Q(saves__created_at__lte=end)
comment_filter &= Q(comments__created_at__lte=end)
post_popularity_all = list(
Post.objects.filter(is_deleted=False, status=Post.StatusChoices.PUBLISHED)
.annotate(
likes_total=Count("likes", filter=like_filter, distinct=True),
saves_total=Count("saves", filter=save_filter, distinct=True),
comments_total=Count("comments", filter=comment_filter, distinct=True),
)
.filter(Q(likes_total__gt=0) | Q(saves_total__gt=0) | Q(comments_total__gt=0))
.order_by("-likes_total", "-saves_total", "-comments_total", "-published_at")
)
post_popularity = [
{
"id": post.id,
"title": post.title,
"slug": post.slug,
"likes": post.likes_total,
"saves": post.saves_total,
"comments": post.comments_total,
}
for post in post_popularity_all[:TOP_ITEMS_LIMIT]
]
post_popularity_items = [
{
"id": post.id,
"title": post.title,
"slug": post.slug,
"likes": post.likes_total,
"saves": post.saves_total,
"comments": post.comments_total,
}
for post in post_popularity_all
]
top_posts = [
{
**post,
"score": post["likes"] + post["saves"] + post["comments"],
}
for post in sorted(post_popularity, key=lambda item: item["likes"] + item["saves"] + item["comments"], reverse=True)
]
activity_buckets: dict[str, dict[str, int | str]] = {}
for key, qs in (("likes", likes_qs), ("saves", saves_qs), ("comments", visible_comments_qs)):
for item in (
qs.annotate(bucket=GRANULARITY_TRUNC[granularity]("created_at"))
.values("bucket")
.annotate(value=Count("id"))
.order_by("bucket")
):
bucket = item["bucket"].date().isoformat() if item["bucket"] else UNKNOWN_LABEL
activity_buckets.setdefault(bucket, {"date": bucket, "likes": 0, "saves": 0, "comments": 0})
activity_buckets[bucket][key] = item["value"]
total_likes = likes_qs.count()
total_saves = saves_qs.count()
total_comments = visible_comments_qs.count()
return {
"filters": _filters_payload(start, end),
"summary": {
"published_posts": published_posts_qs.count(),
"total_likes": total_likes,
"total_saves": total_saves,
"total_comments": total_comments,
"community_engagement": total_likes + total_saves + total_comments,
},
"activity_trend": list(activity_buckets.values()),
"post_popularity": {
"items": post_popularity_items,
"top_items": post_popularity,
"other_count": max(len(post_popularity_all) - TOP_ITEMS_LIMIT, 0),
"total_count": len(post_popularity_all),
},
"top_posts": top_posts,
"by_category": _point_group_queryset(published_posts_qs, "category__name"),
"by_tag": _point_group_queryset(published_posts_qs.filter(tags__is_deleted=False), "tags__name"),
}
@analytics_router.get("/admin/dashboard", response=AdminDashboardAnalyticsSchema, auth=jwt_auth)
def admin_dashboard(
request,
date_from: str | None = None,
date_to: str | None = None,
event_id: int | None = None,
granularity: Literal["day", "week", "month", "auto"] = "auto",
):
user = request.auth
if not (user and (user.is_staff or user.is_superuser)):
raise HttpError(403, "اجازه دسترسی ندارید.")
start = _parse_boundary(date_from)
end = _parse_boundary(date_to, is_end=True)
if start and end and start > end:
raise HttpError(400, "Start date must be before end date.")
selected_granularity = _auto_granularity(start, end) if granularity == "auto" else granularity
if event_id is not None:
get_object_or_404(Event, id=event_id, is_deleted=False)
User = get_user_model()
users_qs = _apply_range(User.objects.filter(is_deleted=False), "date_joined", start, end)
events_qs = _apply_range(Event.objects.filter(is_deleted=False), "start_time", start, end)
registrations_qs = _apply_range(
Registration.objects.filter(is_deleted=False).select_related("user", "event", "user__major", "user__university"),
"registered_at",
start,
end,
)
if event_id is not None:
registrations_qs = registrations_qs.filter(event_id=event_id)
paid_payments_qs = Payment.objects.filter(is_deleted=False, status=Payment.OrderStatusChoices.PAID, verified_at__isnull=False)
paid_payments_qs = _apply_range(paid_payments_qs.select_related("event"), "verified_at", start, end)
all_payments_qs = _apply_range(Payment.objects.filter(is_deleted=False), "created_at", start, end)
if event_id is not None:
paid_payments_qs = paid_payments_qs.filter(event_id=event_id)
all_payments_qs = all_payments_qs.filter(event_id=event_id)
published_posts_qs = _apply_range(
Post.objects.filter(is_deleted=False, status=Post.StatusChoices.PUBLISHED),
"published_at",
start,
end,
)
visible_comments_qs = _apply_range(
Comment.objects.filter(
is_deleted=False,
is_hidden=False,
is_approved=True,
post__is_deleted=False,
post__status=Post.StatusChoices.PUBLISHED,
),
"created_at",
start,
end,
)
likes_qs = _apply_range(
Like.objects.filter(post__is_deleted=False, post__status=Post.StatusChoices.PUBLISHED),
"created_at",
start,
end,
)
saves_qs = _apply_range(
SavedPost.objects.filter(post__is_deleted=False, post__status=Post.StatusChoices.PUBLISHED),
"created_at",
start,
end,
)
participant_qs = registrations_qs.filter(status__in=PARTICIPANT_STATUSES)
registration_status = [
{"status": item["status"], "label": _registration_status_label(item["status"]), "value": item["value"]}
for item in registrations_qs.values("status").annotate(value=Count("id")).order_by("status")
]
payment_status = [
{"status": str(item["status"]), "label": _status_label(item["status"]), "value": item["value"]}
for item in all_payments_qs.values("status").annotate(value=Count("id")).order_by("status")
]
top_events_qs = Event.objects.filter(is_deleted=False)
if event_id is not None:
top_events_qs = top_events_qs.filter(id=event_id)
top_events = []
for event in (
top_events_qs.annotate(
attendees=Count(
"registrations",
filter=Q(registrations__is_deleted=False, registrations__status__in=PARTICIPANT_STATUSES),
distinct=True,
),
revenue=Coalesce(
Sum(
"payments__amount",
filter=Q(payments__is_deleted=False, payments__status=Payment.OrderStatusChoices.PAID),
),
0,
),
)
.order_by("-attendees", "-revenue", "-start_time")[:10]
):
fill_rate = round((event.attendees / event.capacity) * 100, 1) if event.capacity else None
top_events.append(
{
"id": event.id,
"title": event.title,
"slug": event.slug,
"attendees": event.attendees,
"capacity": event.capacity,
"fill_rate": fill_rate,
"revenue": int(event.revenue or 0),
}
)
revenue_trend = [
{
"date": item["bucket"].date().isoformat() if item["bucket"] else "",
"label": item["bucket"].date().isoformat() if item["bucket"] else UNKNOWN_LABEL,
"value": int(item["value"] or 0),
}
for item in (
paid_payments_qs.annotate(bucket=GRANULARITY_TRUNC[selected_granularity]("verified_at"))
.values("bucket")
.annotate(value=Coalesce(Sum("amount"), 0))
.order_by("bucket")
)
]
revenue_by_event = [
{"label": _label(item["event__title"]), "value": int(item["value"] or 0)}
for item in (
paid_payments_qs.values("event__title")
.annotate(value=Coalesce(Sum("amount"), 0))
.order_by("-value", "event__title")[:10]
)
]
post_popularity_qs = (
Post.objects.filter(is_deleted=False, status=Post.StatusChoices.PUBLISHED)
.annotate(
likes_total=Count("likes", distinct=True),
saves_total=Count("saves", distinct=True),
comments_total=Count(
"comments",
filter=Q(comments__is_deleted=False, comments__is_hidden=False, comments__is_approved=True),
distinct=True,
),
)
.order_by("-likes_total", "-saves_total", "-comments_total")[:30]
)
post_popularity = [
{
"id": post.id,
"title": post.title,
"slug": post.slug,
"likes": post.likes_total,
"saves": post.saves_total,
"comments": post.comments_total,
}
for post in post_popularity_qs
]
top_posts = [
{
**post,
"score": post["likes"] + post["saves"] + post["comments"],
}
for post in sorted(post_popularity, key=lambda item: item["likes"] + item["saves"] + item["comments"], reverse=True)[:10]
]
activity_buckets: dict[str, dict[str, int | str]] = {}
for key, qs in (("likes", likes_qs), ("saves", saves_qs), ("comments", visible_comments_qs)):
for item in (
qs.annotate(bucket=GRANULARITY_TRUNC[selected_granularity]("created_at"))
.values("bucket")
.annotate(value=Count("id"))
.order_by("bucket")
):
bucket = item["bucket"].date().isoformat() if item["bucket"] else UNKNOWN_LABEL
activity_buckets.setdefault(bucket, {"date": bucket, "likes": 0, "saves": 0, "comments": 0})
activity_buckets[bucket][key] = item["value"]
category_engagement = [
{"label": _label(item["name"]), "value": item["value"]}
for item in (
Category.objects.filter(is_deleted=False, posts__status=Post.StatusChoices.PUBLISHED, posts__is_deleted=False)
.annotate(value=Count("posts", distinct=True))
.order_by("-value", "name")[:10]
.values("name", "value")
)
]
tag_engagement = [
{"label": _label(item["name"]), "value": item["value"]}
for item in (
Tag.objects.filter(is_deleted=False, posts__status=Post.StatusChoices.PUBLISHED, posts__is_deleted=False)
.annotate(value=Count("posts", distinct=True))
.order_by("-value", "name")[:10]
.values("name", "value")
)
]
learning_hours = 0.0
for event in Event.objects.filter(is_deleted=False).annotate(
attendees=Count(
"registrations",
filter=Q(registrations__is_deleted=False, registrations__status__in=PARTICIPANT_STATUSES),
distinct=True,
)
):
duration = event.end_time - event.start_time
learning_hours += max(duration.total_seconds(), 0) / 3600 * event.attendees
total_revenue = _sum(paid_payments_qs, "amount")
total_discount = _sum(paid_payments_qs, "discount_amount")
total_base = _sum(paid_payments_qs, "base_amount")
total_likes = likes_qs.count()
total_saves = saves_qs.count()
total_comments = visible_comments_qs.count()
published_posts_count = published_posts_qs.count()
return {
"filters": {
"date_from": start.isoformat() if start else None,
"date_to": end.isoformat() if end else None,
"event_id": event_id,
"granularity": selected_granularity,
},
"summary": {
"total_users": users_qs.count(),
"verified_users": users_qs.filter(is_mobile_verified=True).count(),
"total_events": events_qs.count(),
"total_registrations": registrations_qs.count(),
"total_revenue": total_revenue,
"total_discount": total_discount,
"published_posts": published_posts_count,
"total_likes": total_likes,
"total_saves": total_saves,
"total_comments": total_comments,
},
"users": {
"signup_trend": _trend_queryset(users_qs, "date_joined", selected_granularity),
"by_major": _point_queryset(users_qs, "major__name"),
"by_university": _point_queryset(users_qs, "university__name"),
"by_year": _entry_year_group_queryset(users_qs)["top_items"],
},
"events": {
"registration_status": registration_status,
"by_major": _point_queryset(participant_qs, "user__major__name"),
"by_university": _point_queryset(participant_qs, "user__university__name"),
"top_events": top_events,
"registration_trend": _trend_queryset(registrations_qs, "registered_at", selected_granularity),
},
"revenue": {
"trend": revenue_trend,
"by_event": revenue_by_event,
"payment_status": payment_status,
"total_paid": total_revenue,
"total_discount": total_discount,
"total_base": total_base,
},
"blog": {
"totals": {
"posts": published_posts_count,
"likes": total_likes,
"saves": total_saves,
"comments": total_comments,
},
"post_popularity": post_popularity,
"top_posts": top_posts,
"activity_trend": list(activity_buckets.values()),
"by_category": category_engagement,
"by_tag": tag_engagement,
},
"achievements": {
"distinct_participants": participant_qs.values("user_id").distinct().count(),
"learning_hours": round(learning_hours, 1),
"published_content": published_posts_count,
"community_engagement": total_likes + total_saves + total_comments,
},
}

View File

@@ -22,6 +22,21 @@ class CategorySchema(ModelSchema):
return obj.parent_id
class AdminCategorySchema(CategorySchema):
post_count: int = 0
@staticmethod
def resolve_post_count(obj):
return getattr(obj, "post_count", None) or obj.posts.filter(is_deleted=False).count()
class CategoryWriteSchema(Schema):
name: str
slug: Optional[str] = None
description: Optional[str] = ""
parent_id: Optional[int] = None
class CategoryPathSchema(Schema):
id: int
name: str
@@ -45,6 +60,19 @@ class TagSchema(ModelSchema):
model_fields = ["id", "name", "slug", "created_at"]
class AdminTagSchema(TagSchema):
post_count: int = 0
@staticmethod
def resolve_post_count(obj):
return getattr(obj, "post_count", None) or obj.posts.filter(is_deleted=False).count()
class TagWriteSchema(Schema):
name: str
slug: Optional[str] = None
class TagFilterSchema(Schema):
id: int
name: str
@@ -220,6 +248,7 @@ class PostListSchema(Schema):
class PostDetailSchema(PostListSchema):
content: str
content_html: str
review_note: Optional[str] = ""
og_image_url: Optional[str] = None
assets: List[PostAssetSchema] = []

View File

@@ -7,17 +7,21 @@ from typing import List, Optional
from django.conf import settings
from django.contrib.auth import get_user_model
from django.db import IntegrityError
from django.db.models import Count, Prefetch, Q
from django.shortcuts import get_object_or_404
from django.utils import timezone
from ninja import File, Form, Query, Router, UploadedFile
from apps.blog.api.schemas import (
AdminCategorySchema,
AdminTagSchema,
BlogBannerSchema,
BlogFiltersSchema,
BlogInteractionSchema,
BlogProfileActivitySchema,
CategorySchema,
CategoryWriteSchema,
CommentCreateSchema,
CommentHideSchema,
CommentSchema,
@@ -30,6 +34,7 @@ from apps.blog.api.schemas import (
PostListSchema,
PostReviewSchema,
TagSchema,
TagWriteSchema,
)
from apps.blog.models import BlogBanner, Category, Comment, Like, Post, PostAsset, SavedPost, Tag
from apps.blog.permissions import (
@@ -310,6 +315,83 @@ def _notify_blog_comment(comment: Comment) -> None:
)
def _can_manage_blog_taxonomy(user) -> bool:
return bool(
user
and getattr(user, "is_authenticated", False)
and (
user.is_superuser
or user.is_staff
or user.has_perm("blog.add_category")
or user.has_perm("blog.change_category")
or user.has_perm("blog.add_tag")
or user.has_perm("blog.change_tag")
)
)
def _category_queryset_with_counts():
return Category.objects.annotate(post_count=Count("posts", filter=Q(posts__is_deleted=False), distinct=True))
def _tag_queryset_with_counts():
return Tag.objects.annotate(post_count=Count("posts", filter=Q(posts__is_deleted=False), distinct=True))
def _validate_category_parent(category_id: int | None, parent_id: int | None) -> tuple[Category | None, str | None]:
if not parent_id:
return None, None
if category_id and parent_id == category_id:
return None, "A category cannot be its own parent."
if category_id and Category.objects.filter(parent_id=category_id).exists():
return None, "A category with child categories must remain a root category."
parent = Category.objects.filter(id=parent_id).first()
if not parent:
return None, "Parent category not found."
if parent.parent_id:
return None, "Only root categories can be selected as a parent."
current = parent
seen: set[int] = set()
while current:
if current.id in seen:
return None, "Invalid category hierarchy."
seen.add(current.id)
if category_id and current.id == category_id:
return None, "Category parent would create a cycle."
current = current.parent
return parent, None
def _apply_category_payload(category: Category, data: CategoryWriteSchema) -> tuple[Category | None, str | None]:
name = (data.name or "").strip()
if not name:
return None, "Category name is required."
parent, error = _validate_category_parent(category.id, data.parent_id)
if error:
return None, error
category.name = name
if data.slug is not None:
category.slug = data.slug.strip()
category.description = data.description or ""
category.parent = parent
return category, None
def _apply_tag_payload(tag: Tag, data: TagWriteSchema) -> tuple[Tag | None, str | None]:
name = (data.name or "").strip()
if not name:
return None, "Tag name is required."
tag.name = name
if data.slug is not None:
tag.slug = data.slug.strip()
return tag, None
@blog_router.get("/admin/writers", response={200: List[AuthorSchema], 403: ErrorSchema}, auth=jwt_auth)
def list_blog_writers(request):
if not (request.auth.is_superuser or request.auth.is_staff or can_review_blog_posts(request.auth)):
@@ -816,6 +898,96 @@ def restore_comment(request, comment_id: int):
return 400, {"error": "Comment not found or not soft-deleted."}
@blog_router.get("/admin/categories", response={200: List[AdminCategorySchema], 403: ErrorSchema}, auth=jwt_auth)
def list_admin_categories(request):
if not _can_manage_blog_taxonomy(request.auth):
return 403, {"error": "Permission denied"}
return 200, _category_queryset_with_counts().order_by("name")
@blog_router.post("/admin/categories", response={201: AdminCategorySchema, 400: ErrorSchema, 403: ErrorSchema}, auth=jwt_auth)
def create_admin_category(request, data: CategoryWriteSchema):
if not _can_manage_blog_taxonomy(request.auth):
return 403, {"error": "Permission denied"}
category, error = _apply_category_payload(Category(), data)
if error:
return 400, {"error": error}
try:
category.save()
except IntegrityError:
return 400, {"error": "Category name or slug already exists."}
return 201, _category_queryset_with_counts().get(id=category.id)
@blog_router.put("/admin/categories/{category_id}", response={200: AdminCategorySchema, 400: ErrorSchema, 403: ErrorSchema}, auth=jwt_auth)
def update_admin_category(request, category_id: int, data: CategoryWriteSchema):
if not _can_manage_blog_taxonomy(request.auth):
return 403, {"error": "Permission denied"}
category = get_object_or_404(Category, id=category_id)
category, error = _apply_category_payload(category, data)
if error:
return 400, {"error": error}
try:
category.save()
except IntegrityError:
return 400, {"error": "Category name or slug already exists."}
return 200, _category_queryset_with_counts().get(id=category.id)
@blog_router.delete("/admin/categories/{category_id}", response={200: MessageSchema, 403: ErrorSchema}, auth=jwt_auth)
def delete_admin_category(request, category_id: int):
if not request.auth.is_superuser:
return 403, {"error": "Permission denied"}
category = get_object_or_404(Category, id=category_id)
category.delete()
return 200, {"message": f"Category '{category.name}' deleted successfully."}
@blog_router.get("/admin/tags", response={200: List[AdminTagSchema], 403: ErrorSchema}, auth=jwt_auth)
def list_admin_tags(request):
if not _can_manage_blog_taxonomy(request.auth):
return 403, {"error": "Permission denied"}
return 200, _tag_queryset_with_counts().order_by("name")
@blog_router.post("/admin/tags", response={201: AdminTagSchema, 400: ErrorSchema, 403: ErrorSchema}, auth=jwt_auth)
def create_admin_tag(request, data: TagWriteSchema):
if not _can_manage_blog_taxonomy(request.auth):
return 403, {"error": "Permission denied"}
tag, error = _apply_tag_payload(Tag(), data)
if error:
return 400, {"error": error}
try:
tag.save()
except IntegrityError:
return 400, {"error": "Tag name or slug already exists."}
return 201, _tag_queryset_with_counts().get(id=tag.id)
@blog_router.put("/admin/tags/{tag_id}", response={200: AdminTagSchema, 400: ErrorSchema, 403: ErrorSchema}, auth=jwt_auth)
def update_admin_tag(request, tag_id: int, data: TagWriteSchema):
if not _can_manage_blog_taxonomy(request.auth):
return 403, {"error": "Permission denied"}
tag = get_object_or_404(Tag, id=tag_id)
tag, error = _apply_tag_payload(tag, data)
if error:
return 400, {"error": error}
try:
tag.save()
except IntegrityError:
return 400, {"error": "Tag name or slug already exists."}
return 200, _tag_queryset_with_counts().get(id=tag.id)
@blog_router.delete("/admin/tags/{tag_id}", response={200: MessageSchema, 403: ErrorSchema}, auth=jwt_auth)
def delete_admin_tag(request, tag_id: int):
if not request.auth.is_superuser:
return 403, {"error": "Permission denied"}
tag = get_object_or_404(Tag, id=tag_id)
tag.delete()
return 200, {"message": f"Tag '{tag.name}' deleted successfully."}
@blog_router.get("/categories", response=List[CategorySchema])
def list_categories(request):
return Category.objects.all()

View File

@@ -0,0 +1,125 @@
from __future__ import annotations
import sys
from django.core.management.base import BaseCommand
from apps.blog.models import Category
CATEGORIES = [
{
"name": "اخبار و اطلاعیه‌ها",
"description": "خبرها، اطلاعیه‌ها و گزارش‌های مرتبط با انجمن، دانشکده و جامعه دانشجویی.",
"children": [
("اخبار انجمن", "خبرها و گزارش‌های رسمی انجمن علمی مهندسی کامپیوتر."),
("اطلاعیه‌های آموزشی", "اطلاعیه‌های مهم آموزشی، انتخاب واحد، امتحانات و امور دانشجویی."),
("رویدادها و کارگاه‌ها", "معرفی، گزارش و پیگیری رویدادها، نشست‌ها و کارگاه‌ها."),
],
},
{
"name": "آموزش و مسیر یادگیری",
"description": "مطالب آموزشی و مسیرهای یادگیری برای دانشجویان علوم و مهندسی کامپیوتر.",
"children": [
("راهنمای شروع", "مطالب مقدماتی برای شروع برنامه‌نویسی، دانشگاه و مهارت‌آموزی."),
("آموزش‌های فنی", "آموزش‌های عملی، گام‌به‌گام و مسئله‌محور در حوزه‌های فنی."),
("منابع یادگیری", "معرفی کتاب، دوره، مستندات، مسیر مطالعه و منابع مفید."),
],
},
{
"name": "فناوری و مهندسی نرم‌افزار",
"description": "مقاله‌های فنی درباره توسعه نرم‌افزار، ابزارها، معماری و فناوری‌های روز.",
"children": [
("برنامه‌نویسی", "زبان‌ها، الگوها، نکته‌های کدنویسی و تجربه‌های عملی توسعه."),
("وب و اپلیکیشن", "فرانت‌اند، بک‌اند، موبایل، API و تجربه ساخت محصول."),
("دواپس و ابزارها", "لینوکس، گیت، CI/CD، استقرار، کانتینر و ابزارهای توسعه."),
],
},
{
"name": "هوش مصنوعی و داده",
"description": "مطالب مرتبط با هوش مصنوعی، یادگیری ماشین، داده و کاربردهای آن‌ها.",
"children": [
("یادگیری ماشین", "مفاهیم، تمرین‌ها و تجربه‌های یادگیری ماشین و مدل‌سازی."),
("علم داده", "تحلیل داده، مصورسازی، آمار کاربردی و پروژه‌های داده‌محور."),
("هوش مصنوعی کاربردی", "ابزارها، کاربردها، ایده‌ها و تجربه‌های عملی با AI."),
],
},
{
"name": "دانشگاه و پژوهش",
"description": "محتوای علمی، پژوهشی و دانشگاهی برای دانشجویان و اعضای انجمن.",
"children": [
("پژوهش دانشجویی", "تجربه‌ها، معرفی مقاله، ایده پژوهشی و همکاری‌های علمی."),
("درس و دانشگاه", "راهنمای درس‌ها، پروژه‌های درسی، امتحان و تجربه دانشگاهی."),
("مسابقات علمی", "برنامه‌نویسی رقابتی، مسابقات، چالش‌ها و آمادگی تیمی."),
],
},
{
"name": "پروژه‌ها و تجربه‌ها",
"description": "تجربه‌های واقعی دانشجویان از پروژه، کار تیمی، کارآموزی و مسیر حرفه‌ای.",
"children": [
("پروژه‌های دانشجویی", "معرفی، کالبدشکافی و گزارش پروژه‌های دانشجویی و تیمی."),
("کارآموزی و بازار کار", "رزومه، مصاحبه، کارآموزی، مسیر شغلی و تجربه ورود به کار."),
("تجربه‌های انجمنی", "روایت‌ها و درس‌آموخته‌های فعالیت در انجمن و تیم‌های دانشجویی."),
],
},
]
def console_safe(value: str) -> str:
encoding = sys.stdout.encoding or "utf-8"
return value.encode(encoding, errors="backslashreplace").decode(encoding)
class Command(BaseCommand):
help = "Create or update production blog categories for the CS association blog."
def add_arguments(self, parser):
parser.add_argument("--dry-run", action="store_true", help="Print planned categories without writing changes.")
def handle(self, *args, **options):
dry_run = options["dry_run"]
root_count = 0
child_count = 0
for root_spec in CATEGORIES:
if dry_run:
self.stdout.write(console_safe(f"[root] {root_spec['name']}"))
for child_name, _ in root_spec["children"]:
self.stdout.write(console_safe(f" [child] {child_name}"))
continue
root = self._upsert_category(
name=root_spec["name"],
description=root_spec["description"],
parent=None,
)
root_count += 1
for child_name, child_description in root_spec["children"]:
self._upsert_category(
name=child_name,
description=child_description,
parent=root,
)
child_count += 1
if dry_run:
self.stdout.write(self.style.WARNING("Dry run only. No categories were changed."))
return
self.stdout.write(self.style.SUCCESS(f"Blog categories synchronized: {root_count} roots, {child_count} children."))
def _upsert_category(self, *, name: str, description: str, parent: Category | None) -> Category:
if parent and parent.parent_id:
raise ValueError(f"Invalid category tree: parent '{parent.name}' is not a root category.")
category, _ = Category.all_objects.update_or_create(
name=name,
defaults={
"description": description,
"parent": parent,
"is_deleted": False,
"deleted_at": None,
},
)
return category

View File

@@ -0,0 +1,111 @@
from __future__ import annotations
import sys
from django.core.management.base import BaseCommand
from apps.blog.models import Tag
TAGS = [
"انجمن علمی",
"دانشکده",
"اطلاعیه",
"رویداد",
"کارگاه",
"گزارش رویداد",
"برنامه‌نویسی",
"پایتون",
"جاوااسکریپت",
"تایپ‌اسکریپت",
"جاوا",
"سی‌پلاس‌پلاس",
"گولنگ",
"فرانت‌اند",
"بک‌اند",
"React",
"Next.js",
"Django",
"REST API",
"پایگاه داده",
"PostgreSQL",
"Redis",
"گیت",
"لینوکس",
"Docker",
"DevOps",
"استقرار",
"امنیت",
"شبکه",
"سیستم‌عامل",
"الگوریتم",
"ساختمان داده",
"برنامه‌نویسی رقابتی",
"حل مسئله",
"هوش مصنوعی",
"یادگیری ماشین",
"یادگیری عمیق",
"علم داده",
"تحلیل داده",
"داده‌کاوی",
"پردازش زبان طبیعی",
"بینایی ماشین",
"پژوهش",
"مقاله‌خوانی",
"پروژه دانشجویی",
"پروژه درسی",
"تیم‌سازی",
"مدیریت پروژه",
"طراحی نرم‌افزار",
"معماری نرم‌افزار",
"تست نرم‌افزار",
"تجربه کاربری",
"طراحی رابط کاربری",
"اپن‌سورس",
"کارآموزی",
"رزومه",
"مصاحبه",
"مسیر شغلی",
"منابع یادگیری",
"کتاب",
"دوره آموزشی",
"تجربه دانشجویی",
"انتخاب واحد",
"امتحانات",
"آموزش",
"راهنمای شروع",
]
def console_safe(value: str) -> str:
encoding = sys.stdout.encoding or "utf-8"
return value.encode(encoding, errors="backslashreplace").decode(encoding)
class Command(BaseCommand):
help = "Create or update production blog tags for the CS association blog."
def add_arguments(self, parser):
parser.add_argument("--dry-run", action="store_true", help="Print planned tags without writing changes.")
def handle(self, *args, **options):
dry_run = options["dry_run"]
if dry_run:
for name in TAGS:
self.stdout.write(console_safe(f"[tag] {name}"))
self.stdout.write(self.style.WARNING("Dry run only. No tags were changed."))
return
count = 0
for name in TAGS:
Tag.all_objects.update_or_create(
name=name,
defaults={
"is_deleted": False,
"deleted_at": None,
},
)
count += 1
self.stdout.write(self.style.SUCCESS(f"Blog tags synchronized: {count} tags."))

View File

@@ -138,6 +138,7 @@ class EventListSchema(Schema):
class EventCreateSchema(Schema):
"""Payload for creating events via the API."""
title: str
slug: Optional[str] = None
description: str
event_type: str
address: Optional[str] = None
@@ -150,11 +151,13 @@ class EventCreateSchema(Schema):
capacity: Optional[int] = None
price: Optional[float] = None
status: str = "draft"
registration_success_markdown: Optional[str] = None
gallery_image_ids: Optional[List[int]] = []
class EventUpdateSchema(Schema):
"""Payload for updating events via the API."""
title: Optional[str] = None
slug: Optional[str] = None
description: Optional[str] = None
event_type: Optional[str] = None
address: Optional[str] = None
@@ -167,6 +170,7 @@ class EventUpdateSchema(Schema):
capacity: Optional[int] = None
price: Optional[float] = None
status: Optional[str] = None
registration_success_markdown: Optional[str] = None
gallery_image_ids: Optional[List[int]] = None
class RegistrationSchema(ModelSchema):
@@ -199,12 +203,56 @@ class AdminUserSchema(Schema):
first_name: str
last_name: str
email: str
mobile: Optional[str] = None
profile_picture: Optional[str] = None
profile_picture_thumbnail_url: Optional[str] = None
profile_picture_preview_url: Optional[str] = None
university: Optional[str] = None
major: Optional[str] = None
student_id: Optional[str] = None
year_of_study: Optional[int] = None
@staticmethod
def resolve_profile_picture(obj, context):
image = getattr(obj, "profile_picture", None)
if not getattr(image, "name", None):
return None
request = context["request"]
return request.build_absolute_uri(image.url) if hasattr(image, "url") else None
@staticmethod
def resolve_profile_picture_thumbnail_url(obj, context):
image = getattr(obj, "profile_picture", None)
if not getattr(image, "name", None):
return None
request = context["request"]
url = derivative_url(image, THUMBNAIL_VARIANT)
return request.build_absolute_uri(url) if url else None
@staticmethod
def resolve_profile_picture_preview_url(obj, context):
image = getattr(obj, "profile_picture", None)
if not getattr(image, "name", None):
return None
request = context["request"]
url = derivative_url(image, PREVIEW_VARIANT)
return request.build_absolute_uri(url) if url else None
@staticmethod
def resolve_university(obj):
return obj.get_university_display()
@staticmethod
def resolve_major(obj):
return obj.get_major_display()
class PaymentAdminSchema(Schema):
id: int
authority: Optional[str]
ref_id: Optional[str]
card_pan: Optional[str]
card_hash: Optional[str]
status: int
status_label: str
base_amount: int
@@ -241,7 +289,7 @@ class EventAdminDetailSchema(EventSchema):
@staticmethod
def resolve_registrations(obj):
return obj.registrations.select_related("user").prefetch_related(
return obj.registrations.select_related("user", "user__university", "user__major").prefetch_related(
"payments__discount_code"
).order_by("-registered_at")

View File

@@ -1,18 +1,20 @@
from django.conf import settings
from django.core.files.base import ContentFile
from django.shortcuts import get_object_or_404
from django.db.models import Q, Case, When, IntegerField
from django.utils.text import slugify
from django.utils import timezone
from ninja import Router, Query
from ninja import File, Router, Query, UploadedFile
from ninja.errors import HttpError
from typing import List, Optional
from uuid import UUID
from uuid import UUID, uuid4
from apps.events.api.schemas import (
EventAdminDetailSchema,
EventBriefSchema,
EventCreateSchema,
EventGallerySchema,
EventListSchema,
EventSchema,
EventUpdateSchema,
@@ -26,6 +28,8 @@ from apps.events.api.schemas import (
)
from core.authentication import jwt_auth
from apps.events.models import Event, Registration
from apps.gallery.models import Gallery
from apps.gallery.tasks import process_uploaded_image
from apps.notifications.services import notify_user
from apps.payments.models import DiscountCode
from apps.users.tasks import send_critical_sms
@@ -34,6 +38,28 @@ from core.api.schemas import ErrorSchema, MessageSchema
events_router = Router()
def _is_staff_user(user) -> bool:
return bool(user and (user.is_staff or user.is_superuser))
def _staff_forbidden():
return 403, {"error": "اجازه دسترسی ندارید."}
def _save_uploaded_image(instance, field_name: str, file: UploadedFile, folder: str):
if not file.content_type or not file.content_type.startswith("image/"):
return False, {"error": "فایل باید تصویر باشد."}
if file.size > 10 * 1024 * 1024:
return False, {"error": "حجم فایل باید کمتر از ۱۰ مگابایت باشد."}
extension = file.name.rsplit(".", 1)[-1] if "." in file.name else "jpg"
getattr(instance, field_name).save(
f"{folder}/{uuid4().hex}.{extension}",
ContentFile(file.read()),
save=True,
)
return True, instance
def _frontend_event_url(event: Event) -> str:
root = getattr(settings, "FRONTEND_ROOT", "/") or "/"
if not root.endswith("/"):
@@ -130,20 +156,32 @@ def get_event_by_slug(request, slug: str):
)
return event
@events_router.post("/", response=EventSchema)
@events_router.post("/", response={201: EventSchema, 403: ErrorSchema, 400: ErrorSchema}, auth=jwt_auth)
def create_event(request, payload: EventCreateSchema):
"""Create a new event"""
gallery_image_ids = payload.dict().pop('gallery_image_ids', [])
event = Event.objects.create(**payload.dict(exclude={'gallery_image_ids'}))
if not _is_staff_user(request.auth):
return _staff_forbidden()
data = payload.dict(exclude={'gallery_image_ids'})
gallery_image_ids = payload.gallery_image_ids or []
if data.get("slug"):
data["slug"] = slugify(data["slug"])
event = Event(**data)
try:
event.full_clean()
event.save()
except Exception as exc:
return 400, {"error": str(exc)}
if gallery_image_ids:
event.gallery_images.set(gallery_image_ids)
return event
return 201, event
@events_router.put("/{int:event_id}", response=EventSchema)
@events_router.put("/{int:event_id}", response={200: EventSchema, 403: ErrorSchema, 400: ErrorSchema}, auth=jwt_auth)
def update_event(request, event_id: int, payload: EventUpdateSchema):
"""Update an existing event"""
if not _is_staff_user(request.auth):
return _staff_forbidden()
event = get_object_or_404(Event, id=event_id, is_deleted=False)
previous_state = {
"status": event.status,
@@ -158,12 +196,18 @@ def update_event(request, event_id: int, payload: EventUpdateSchema):
gallery_image_ids = update_data.pop('gallery_image_ids', None)
for attr, value in update_data.items():
if attr == "slug" and value:
value = slugify(value)
setattr(event, attr, value)
if 'title' in update_data:
if 'title' in update_data and not update_data.get("slug"):
event.slug = slugify(event.title)
event.save()
try:
event.full_clean()
event.save()
except Exception as exc:
return 400, {"error": str(exc)}
if gallery_image_ids is not None:
event.gallery_images.set(gallery_image_ids)
@@ -196,14 +240,94 @@ def update_event(request, event_id: int, payload: EventUpdateSchema):
sms_kind="event_reschedule",
)
return event
return 200, event
@events_router.delete("/{int:event_id}", response=MessageSchema)
@events_router.delete("/{int:event_id}", response={200: MessageSchema, 403: ErrorSchema}, auth=jwt_auth)
def delete_event(request, event_id: int):
"""Soft delete an event"""
if not _is_staff_user(request.auth):
return _staff_forbidden()
event = get_object_or_404(Event, id=event_id, is_deleted=False)
event.delete()
return {"message": "Event deleted successfully"}
return 200, {"message": "Event deleted successfully"}
@events_router.post("/{int:event_id}/featured-image", response={200: EventSchema, 403: ErrorSchema, 400: ErrorSchema}, auth=jwt_auth)
def upload_event_featured_image(request, event_id: int, file: UploadedFile = File(...)):
"""Upload or replace the poster/featured image for an event."""
if not _is_staff_user(request.auth):
return _staff_forbidden()
event = get_object_or_404(Event, id=event_id, is_deleted=False)
ok, result = _save_uploaded_image(event, "featured_image", file, "events/featured")
if not ok:
return 400, result
return 200, event
@events_router.delete("/{int:event_id}/featured-image", response={200: EventSchema, 403: ErrorSchema}, auth=jwt_auth)
def delete_event_featured_image(request, event_id: int):
"""Remove the poster/featured image for an event."""
if not _is_staff_user(request.auth):
return _staff_forbidden()
event = get_object_or_404(Event, id=event_id, is_deleted=False)
if event.featured_image:
event.featured_image.delete(save=False)
event.featured_image = None
event.save(update_fields=["featured_image", "updated_at"])
return 200, event
@events_router.get("/{int:event_id}/gallery", response={200: List[EventGallerySchema], 403: ErrorSchema}, auth=jwt_auth)
def list_event_gallery(request, event_id: int):
if not _is_staff_user(request.auth):
return _staff_forbidden()
event = get_object_or_404(Event, id=event_id, is_deleted=False)
return 200, event.gallery_images.filter(is_deleted=False).select_related("uploaded_by")
@events_router.post("/{int:event_id}/gallery", response={201: EventGallerySchema, 403: ErrorSchema, 400: ErrorSchema}, auth=jwt_auth)
def upload_event_gallery_image(
request,
event_id: int,
file: UploadedFile = File(...),
title: str | None = None,
alt_text: str | None = None,
):
if not _is_staff_user(request.auth):
return _staff_forbidden()
event = get_object_or_404(Event, id=event_id, is_deleted=False)
if not file.content_type or not file.content_type.startswith("image/"):
return 400, {"error": "فایل باید تصویر باشد."}
if file.size > 10 * 1024 * 1024:
return 400, {"error": "حجم فایل باید کمتر از ۱۰ مگابایت باشد."}
try:
gallery_item = Gallery.objects.create(
title=title or file.name,
description="",
uploaded_by=request.auth,
alt_text=alt_text or title or file.name,
is_public=True,
)
gallery_item._defer_image_processing = True
extension = file.name.rsplit(".", 1)[-1] if "." in file.name else "jpg"
gallery_item.image.save(f"gallery/{uuid4().hex}.{extension}", ContentFile(file.read()))
event.gallery_images.add(gallery_item)
process_uploaded_image.delay(gallery_item.id)
except Exception as exc:
return 400, {"error": str(exc)}
return 201, gallery_item
@events_router.delete("/{int:event_id}/gallery/{int:image_id}", response={200: MessageSchema, 403: ErrorSchema}, auth=jwt_auth)
def delete_event_gallery_image(request, event_id: int, image_id: int):
if not _is_staff_user(request.auth):
return _staff_forbidden()
event = get_object_or_404(Event, id=event_id, is_deleted=False)
image = get_object_or_404(Gallery, id=image_id, is_deleted=False)
event.gallery_images.remove(image)
if not image.event_galleries.exclude(id=event.id).exists():
image.delete()
return 200, {"message": "Gallery image removed"}
# Registration endpoints
@events_router.get("/{int:event_id}/registrations", response=List[RegistrationSchema])
@@ -235,7 +359,7 @@ def list_event_registrations_admin(
event = get_object_or_404(Event, id=event_id, is_deleted=False)
qs = (
event.registrations.filter(is_deleted=False)
.select_related("user")
.select_related("user", "user__university", "user__major")
.prefetch_related("payments__discount_code")
.order_by("-registered_at")
)
@@ -259,6 +383,7 @@ def list_event_registrations_admin(
if search:
qs = qs.filter(
Q(user__username__icontains=search)
| Q(user__mobile__icontains=search)
| Q(user__email__icontains=search)
| Q(user__first_name__icontains=search)
| Q(user__last_name__icontains=search)

View File

@@ -37,6 +37,7 @@ class EventsAPIIntegrationTests(TestCase):
cls.user = User.objects.create_user(
username="event_user",
email="event.user@example.com",
mobile="09198000001",
password=cls.password,
)
cls.user.is_email_verified = True
@@ -45,6 +46,7 @@ class EventsAPIIntegrationTests(TestCase):
cls.staff = User.objects.create_user(
username="event_staff",
email="event.staff@example.com",
mobile="09198000002",
password=cls.password,
is_staff=True,
)
@@ -151,19 +153,21 @@ class EventsAPIIntegrationTests(TestCase):
"/api/events/",
data=json.dumps(payload),
content_type="application/json",
**self._auth_headers(self.staff_token),
)
self.assertEqual(created.status_code, 200)
self.assertEqual(created.status_code, 201)
event_id = created.json()["id"]
updated = self.client.put(
f"/api/events/{event_id}",
data=json.dumps({"title": "Updated Event"}),
content_type="application/json",
**self._auth_headers(self.staff_token),
)
self.assertEqual(updated.status_code, 200)
self.assertEqual(updated.json()["title"], "Updated Event")
deleted = self.client.delete(f"/api/events/{event_id}")
deleted = self.client.delete(f"/api/events/{event_id}", **self._auth_headers(self.staff_token))
self.assertEqual(deleted.status_code, 200)
def test_admin_detail_and_registration_list_requires_staff(self):
@@ -230,9 +234,10 @@ class EventsAPIIntegrationTests(TestCase):
"/api/events/",
data=json.dumps(payload),
content_type="application/json",
**self._auth_headers(self.staff_token),
)
body = response.json()
self.assertEqual(response.status_code, 200)
self.assertEqual(response.status_code, 201)
self.assertTrue(body["gallery_images"])
updated = self.client.put(
@@ -244,6 +249,7 @@ class EventsAPIIntegrationTests(TestCase):
}
),
content_type="application/json",
**self._auth_headers(self.staff_token),
)
self.assertEqual(updated.status_code, 200)
self.assertEqual(updated.json()["slug"], "gallery-event-updated")
@@ -370,7 +376,8 @@ class EventsAPIIntegrationTests(TestCase):
self.assertEqual(response.status_code, 400)
def _create_event_user(self, username, email):
user = User.objects.create_user(username=username, email=email, password=self.password)
suffix = str(abs(hash(username)) % 1_000_000).zfill(6)
user = User.objects.create_user(username=username, email=email, mobile=f"09190{suffix}", password=self.password)
user.is_email_verified = True
user.save(update_fields=["is_email_verified"])
user.major = self.user.major
@@ -468,6 +475,7 @@ class EventSchemasIntegrationTests(TestCase):
self.user = User.objects.create_user(
username="schema_user",
email="schema.user@example.com",
mobile="09198000003",
password=self.password,
)
self.user.is_email_verified = True

View File

@@ -1,4 +1,42 @@
from ninja import Schema
from datetime import datetime
class DiscountCodeSchema(Schema):
id: int
code: str
type: str
value: int
max_discount: int | None = None
is_active: bool
starts_at: datetime | None = None
ends_at: datetime | None = None
usage_limit_total: int | None = None
usage_limit_per_user: int | None = None
min_amount: int | None = None
applicable_event_ids: list[int]
usage_count: int = 0
created_at: datetime
updated_at: datetime
class PagedDiscountCodeSchema(Schema):
count: int
results: list[DiscountCodeSchema]
class DiscountCodeWriteSchema(Schema):
code: str
type: str = "percent"
value: int
max_discount: int | None = None
is_active: bool = True
starts_at: datetime | None = None
ends_at: datetime | None = None
usage_limit_total: int | None = None
usage_limit_per_user: int | None = None
min_amount: int | None = None
applicable_event_ids: list[int] = []
class CreatePaymentIn(Schema):

View File

@@ -1,8 +1,9 @@
from django.conf import settings
from django.shortcuts import redirect, get_object_or_404
from django.utils import timezone
from django.db.models import Count, Q
from ninja import Router
from ninja import Query, Router
from ninja.errors import HttpError
import requests
@@ -11,11 +12,140 @@ from apps.events.models import Event, Registration
from apps.notifications.services import notify_user
from apps.users.tasks import send_critical_sms
from core.authentication import jwt_auth
from apps.payments.api.schemas import CouponVerifyIn, CouponVerifyOut, CreatePaymentIn, CreatePaymentOut, PaymentDetailOut
from core.api.schemas import ErrorSchema, MessageSchema
from apps.payments.api.schemas import (
CouponVerifyIn,
CouponVerifyOut,
CreatePaymentIn,
CreatePaymentOut,
DiscountCodeSchema,
DiscountCodeWriteSchema,
PagedDiscountCodeSchema,
PaymentDetailOut,
)
payments_router = Router(tags=["Payments"])
def _staff_required(user):
return bool(user and (user.is_staff or user.is_superuser))
def _discount_payload(code: DiscountCode):
return {
"id": code.id,
"code": code.code,
"type": code.type,
"value": code.value,
"max_discount": code.max_discount,
"is_active": code.is_active,
"starts_at": code.starts_at,
"ends_at": code.ends_at,
"usage_limit_total": code.usage_limit_total,
"usage_limit_per_user": code.usage_limit_per_user,
"min_amount": code.min_amount,
"applicable_event_ids": list(code.applicable_events.values_list("id", flat=True)),
"usage_count": getattr(code, "usage_count", None) or code.payments.filter(
status__in=[Payment.OrderStatusChoices.PAID, Payment.OrderStatusChoices.PENDING]
).count(),
"created_at": code.created_at,
"updated_at": code.updated_at,
}
def _apply_discount_payload(instance: DiscountCode, payload: DiscountCodeWriteSchema):
data = payload.dict()
event_ids = data.pop("applicable_event_ids", [])
for field, value in data.items():
setattr(instance, field, value)
instance.code = instance.code.strip().upper()
instance.full_clean()
instance.save()
instance.applicable_events.set(Event.objects.filter(id__in=event_ids, is_deleted=False))
return instance
@payments_router.get("/admin/discount-codes", response={200: PagedDiscountCodeSchema, 403: ErrorSchema}, auth=jwt_auth)
def admin_list_discount_codes(
request,
search: str | None = Query(None),
is_active: bool | None = Query(None),
type: str | None = Query(None),
limit: int = Query(20, ge=1, le=100),
offset: int = Query(0, ge=0),
):
if not _staff_required(request.auth):
return 403, {"error": "Permission denied"}
queryset = DiscountCode.objects.annotate(
usage_count=Count(
"payments",
filter=Q(
payments__status__in=[
Payment.OrderStatusChoices.PAID,
Payment.OrderStatusChoices.PENDING,
]
),
)
).prefetch_related("applicable_events").order_by("-created_at")
if search:
queryset = queryset.filter(code__icontains=search)
if is_active is not None:
queryset = queryset.filter(is_active=is_active)
if type:
queryset = queryset.filter(type=type)
count = queryset.count()
return 200, {"count": count, "results": [_discount_payload(item) for item in queryset[offset : offset + limit]]}
@payments_router.post("/admin/discount-codes", response={201: DiscountCodeSchema, 403: ErrorSchema, 400: ErrorSchema}, auth=jwt_auth)
def admin_create_discount_code(request, payload: DiscountCodeWriteSchema):
if not _staff_required(request.auth):
return 403, {"error": "Permission denied"}
if DiscountCode.all_objects.filter(code=payload.code.strip().upper()).exists():
return 400, {"error": "Discount code already exists"}
try:
code = _apply_discount_payload(DiscountCode(), payload)
except Exception as exc:
return 400, {"error": str(exc)}
return 201, _discount_payload(code)
@payments_router.put("/admin/discount-codes/{int:code_id}", response={200: DiscountCodeSchema, 403: ErrorSchema, 400: ErrorSchema}, auth=jwt_auth)
def admin_update_discount_code(request, code_id: int, payload: DiscountCodeWriteSchema):
if not _staff_required(request.auth):
return 403, {"error": "Permission denied"}
code = get_object_or_404(DiscountCode, id=code_id, is_deleted=False)
normalized = payload.code.strip().upper()
if DiscountCode.all_objects.filter(code=normalized).exclude(id=code_id).exists():
return 400, {"error": "Discount code already exists"}
try:
code = _apply_discount_payload(code, payload)
except Exception as exc:
return 400, {"error": str(exc)}
return 200, _discount_payload(code)
@payments_router.delete("/admin/discount-codes/{int:code_id}", response={200: MessageSchema, 403: ErrorSchema, 400: ErrorSchema}, auth=jwt_auth)
def admin_delete_discount_code(request, code_id: int):
if not request.auth.is_superuser:
return 403, {"error": "Only superusers can delete discount codes"}
code = get_object_or_404(DiscountCode, id=code_id, is_deleted=False)
code.delete()
return 200, {"message": "Discount code deleted"}
@payments_router.post("/admin/discount-codes/{int:code_id}/restore", response={200: MessageSchema, 403: ErrorSchema, 400: ErrorSchema}, auth=jwt_auth)
def admin_restore_discount_code(request, code_id: int):
if not request.auth.is_superuser:
return 403, {"error": "Only superusers can restore discount codes"}
try:
code = DiscountCode.deleted_objects.get(id=code_id)
except DiscountCode.DoesNotExist:
return 400, {"error": "Discount code not found"}
code.restore()
return 200, {"message": "Discount code restored"}
def _event_action_url(event: Event) -> str:
root = getattr(settings, "FRONTEND_ROOT", "/") or "/"
if not root.endswith("/"):

View File

@@ -1,15 +1,209 @@
from ninja import Router
from django.db.models import Q
from ninja import Query, Router, Schema
from apps.users.models import Major, University
from core.api.schemas import ErrorSchema, MessageSchema
from core.authentication import jwt_auth
meta_router = Router(tags=['meta'])
@meta_router.get("/majors")
def list_majors(request):
majors = Major.objects.filter(is_deleted=False, is_active=True).order_by("name")
return [{"id": m.id, "code": m.code, "label": m.name} for m in majors]
@meta_router.get("/universities")
def list_universities(request):
universities = University.objects.filter(is_deleted=False, is_active=True).order_by("name")
return [{"id": u.id, "code": u.code, "label": u.name} for u in universities]
class MetaOptionSchema(Schema):
id: int
code: str
label: str
is_active: bool = True
user_count: int = 0
class PagedMetaOptionSchema(Schema):
count: int
results: list[MetaOptionSchema]
class MetaOptionWriteSchema(Schema):
code: str
name: str
is_active: bool = True
def _is_staff(user):
return bool(user and (user.is_staff or user.is_superuser))
def _option_payload(obj, user_count=0):
return {
"id": obj.id,
"code": obj.code,
"label": obj.name,
"is_active": obj.is_active,
"user_count": user_count,
}
def _list_options(model, search, limit, offset, active_only=True):
queryset = model.objects.filter(is_deleted=False).order_by("name")
if active_only:
queryset = queryset.filter(is_active=True)
if search:
queryset = queryset.filter(Q(code__icontains=search) | Q(name__icontains=search))
count = queryset.count()
return count, list(queryset[offset : offset + limit])
@meta_router.get("/majors", response=PagedMetaOptionSchema)
def list_majors(
request,
search: str | None = Query(None),
limit: int = Query(20, ge=1, le=100),
offset: int = Query(0, ge=0),
):
count, majors = _list_options(Major, search, limit, offset)
return {"count": count, "results": [_option_payload(m) for m in majors]}
@meta_router.get("/universities", response=PagedMetaOptionSchema)
def list_universities(
request,
search: str | None = Query(None),
limit: int = Query(20, ge=1, le=100),
offset: int = Query(0, ge=0),
):
count, universities = _list_options(University, search, limit, offset)
return {"count": count, "results": [_option_payload(u) for u in universities]}
@meta_router.get("/admin/majors", response={200: PagedMetaOptionSchema, 403: ErrorSchema}, auth=jwt_auth)
def admin_list_majors(
request,
search: str | None = Query(None),
limit: int = Query(20, ge=1, le=100),
offset: int = Query(0, ge=0),
):
if not _is_staff(request.auth):
return 403, {"error": "Permission denied"}
count, majors = _list_options(Major, search, limit, offset, active_only=False)
return 200, {
"count": count,
"results": [_option_payload(m, m.users.filter(is_deleted=False).count()) for m in majors],
}
@meta_router.post("/admin/majors", response={201: MetaOptionSchema, 403: ErrorSchema, 400: ErrorSchema}, auth=jwt_auth)
def admin_create_major(request, payload: MetaOptionWriteSchema):
if not _is_staff(request.auth):
return 403, {"error": "Permission denied"}
if Major.all_objects.filter(code=payload.code).exists():
return 400, {"error": "Major code already exists"}
major = Major.objects.create(code=payload.code.strip(), name=payload.name.strip(), is_active=payload.is_active)
return 201, _option_payload(major)
@meta_router.put("/admin/majors/{int:item_id}", response={200: MetaOptionSchema, 403: ErrorSchema, 400: ErrorSchema}, auth=jwt_auth)
def admin_update_major(request, item_id: int, payload: MetaOptionWriteSchema):
if not _is_staff(request.auth):
return 403, {"error": "Permission denied"}
try:
major = Major.objects.get(id=item_id)
except Major.DoesNotExist:
return 400, {"error": "Major not found"}
conflict = Major.all_objects.filter(code=payload.code).exclude(id=item_id).exists()
if conflict:
return 400, {"error": "Major code already exists"}
major.code = payload.code.strip()
major.name = payload.name.strip()
major.is_active = payload.is_active
major.save(update_fields=["code", "name", "is_active", "updated_at"])
return 200, _option_payload(major, major.users.filter(is_deleted=False).count())
@meta_router.delete("/admin/majors/{int:item_id}", response={200: MessageSchema, 403: ErrorSchema, 400: ErrorSchema}, auth=jwt_auth)
def admin_delete_major(request, item_id: int):
if not request.auth.is_superuser:
return 403, {"error": "Only superusers can delete majors"}
try:
major = Major.objects.get(id=item_id)
except Major.DoesNotExist:
return 400, {"error": "Major not found"}
major.delete()
return 200, {"message": "Major deleted"}
@meta_router.post("/admin/majors/{int:item_id}/restore", response={200: MessageSchema, 403: ErrorSchema, 400: ErrorSchema}, auth=jwt_auth)
def admin_restore_major(request, item_id: int):
if not request.auth.is_superuser:
return 403, {"error": "Only superusers can restore majors"}
try:
major = Major.deleted_objects.get(id=item_id)
except Major.DoesNotExist:
return 400, {"error": "Major not found"}
major.restore()
return 200, {"message": "Major restored"}
@meta_router.get("/admin/universities", response={200: PagedMetaOptionSchema, 403: ErrorSchema}, auth=jwt_auth)
def admin_list_universities(
request,
search: str | None = Query(None),
limit: int = Query(20, ge=1, le=100),
offset: int = Query(0, ge=0),
):
if not _is_staff(request.auth):
return 403, {"error": "Permission denied"}
count, universities = _list_options(University, search, limit, offset, active_only=False)
return 200, {
"count": count,
"results": [_option_payload(u, u.users.filter(is_deleted=False).count()) for u in universities],
}
@meta_router.post("/admin/universities", response={201: MetaOptionSchema, 403: ErrorSchema, 400: ErrorSchema}, auth=jwt_auth)
def admin_create_university(request, payload: MetaOptionWriteSchema):
if not _is_staff(request.auth):
return 403, {"error": "Permission denied"}
if University.all_objects.filter(code=payload.code).exists():
return 400, {"error": "University code already exists"}
university = University.objects.create(code=payload.code.strip(), name=payload.name.strip(), is_active=payload.is_active)
return 201, _option_payload(university)
@meta_router.put("/admin/universities/{int:item_id}", response={200: MetaOptionSchema, 403: ErrorSchema, 400: ErrorSchema}, auth=jwt_auth)
def admin_update_university(request, item_id: int, payload: MetaOptionWriteSchema):
if not _is_staff(request.auth):
return 403, {"error": "Permission denied"}
try:
university = University.objects.get(id=item_id)
except University.DoesNotExist:
return 400, {"error": "University not found"}
conflict = University.all_objects.filter(code=payload.code).exclude(id=item_id).exists()
if conflict:
return 400, {"error": "University code already exists"}
university.code = payload.code.strip()
university.name = payload.name.strip()
university.is_active = payload.is_active
university.save(update_fields=["code", "name", "is_active", "updated_at"])
return 200, _option_payload(university, university.users.filter(is_deleted=False).count())
@meta_router.delete("/admin/universities/{int:item_id}", response={200: MessageSchema, 403: ErrorSchema, 400: ErrorSchema}, auth=jwt_auth)
def admin_delete_university(request, item_id: int):
if not request.auth.is_superuser:
return 403, {"error": "Only superusers can delete universities"}
try:
university = University.objects.get(id=item_id)
except University.DoesNotExist:
return 400, {"error": "University not found"}
university.delete()
return 200, {"message": "University deleted"}
@meta_router.post("/admin/universities/{int:item_id}/restore", response={200: MessageSchema, 403: ErrorSchema, 400: ErrorSchema}, auth=jwt_auth)
def admin_restore_university(request, item_id: int):
if not request.auth.is_superuser:
return 403, {"error": "Only superusers can restore universities"}
try:
university = University.deleted_objects.get(id=item_id)
except University.DoesNotExist:
return 400, {"error": "University not found"}
university.restore()
return 200, {"message": "University restored"}

View File

@@ -1,7 +1,7 @@
"""Authentication-related API schemas."""
from datetime import datetime
from typing import Optional
from typing import List, Optional
from ninja import ModelSchema, Schema
@@ -178,6 +178,19 @@ class UserListSchema(ModelSchema):
major: Optional[str] = None
university: Optional[str] = None
mobile: Optional[str] = None
profile_picture: Optional[str] = None
profile_picture_thumbnail_url: Optional[str] = None
profile_picture_preview_url: Optional[str] = None
student_id: Optional[str] = None
year_of_study: Optional[int] = None
bio: Optional[str] = None
is_email_verified: bool
is_mobile_verified: bool
is_deleted: bool
deleted_at: Optional[datetime] = None
can_access_blog_admin: bool
can_write_blog_posts: bool
can_review_blog_posts: bool
class Meta:
model = User
@@ -188,13 +201,19 @@ class UserListSchema(ModelSchema):
"mobile",
"first_name",
"last_name",
"student_id",
"year_of_study",
"bio",
"is_active",
"is_staff",
"is_superuser",
"date_joined",
"major",
"university",
"is_email_verified",
"is_mobile_verified",
"is_deleted",
"deleted_at",
]
@staticmethod
@@ -205,6 +224,64 @@ class UserListSchema(ModelSchema):
def resolve_university(obj):
return obj.get_university_display()
@staticmethod
def resolve_can_access_blog_admin(obj):
return can_access_blog_admin(obj)
@staticmethod
def resolve_can_write_blog_posts(obj):
return can_write_blog_posts(obj)
@staticmethod
def resolve_can_review_blog_posts(obj):
return can_review_blog_posts(obj)
@staticmethod
def resolve_profile_picture(obj, context):
request = context["request"]
if obj.profile_picture and hasattr(obj.profile_picture, "url"):
return request.build_absolute_uri(obj.profile_picture.url)
return None
@staticmethod
def resolve_profile_picture_thumbnail_url(obj, context):
request = context["request"]
url = derivative_url(obj.profile_picture, THUMBNAIL_VARIANT)
return request.build_absolute_uri(url) if url else None
@staticmethod
def resolve_profile_picture_preview_url(obj, context):
request = context["request"]
url = derivative_url(obj.profile_picture, PREVIEW_VARIANT)
return request.build_absolute_uri(url) if url else None
class AuthorizationRoleSchema(Schema):
key: str
label: str
description: str
enabled: bool = False
locked: bool = False
class UserAuthorizationSchema(Schema):
id: int
username: str
email: Optional[str] = None
mobile: Optional[str] = None
first_name: str
last_name: str
is_active: bool
is_staff: bool
is_superuser: bool
groups: List[str]
roles: List[AuthorizationRoleSchema]
class UserAuthorizationUpdateSchema(Schema):
is_staff: bool = False
groups: List[str] = []
class UserUpdateSchema(Schema):
email: Optional[str] = None

View File

@@ -4,6 +4,7 @@ import jwt
import uuid
from django.conf import settings
from django.contrib.auth.models import Group
from django.core.files.base import ContentFile
from django.db.models import Q
from django.http import HttpResponseRedirect
@@ -11,6 +12,7 @@ from django.shortcuts import get_object_or_404
from ninja import Query, Router
from apps.users.api.schemas import (
AuthorizationRoleSchema,
GoogleClaimVerifySchema,
GoogleCompleteSchema,
GoogleFlowResponseSchema,
@@ -25,6 +27,8 @@ from apps.users.api.schemas import (
TokenRefreshIn,
TokenSchema,
UserListSchema,
UserAuthorizationSchema,
UserAuthorizationUpdateSchema,
UserLoginSchema,
UserOtpLoginSchema,
UserProfileSchema,
@@ -32,6 +36,7 @@ from apps.users.api.schemas import (
UserUpdateSchema,
UsernameCheckSchema,
)
from apps.blog.permissions import ASSOCIATION_ADMIN_GROUP, BLOG_EDITOR_GROUP, BLOG_SUPERVISOR_GROUP
from apps.users.email_identity import normalize_email_identity
from apps.users.models import Major, University, User
from apps.users.services.auth import (
@@ -70,11 +75,93 @@ from core.media import delete_image_derivatives
auth_router = Router()
CURATED_ROLE_GROUPS = {
BLOG_EDITOR_GROUP,
BLOG_SUPERVISOR_GROUP,
ASSOCIATION_ADMIN_GROUP,
}
ROLE_SPECS = [
{
"key": BLOG_EDITOR_GROUP,
"label": "ویرایشگر بلاگ",
"description": "امکان نوشتن و مدیریت نوشته‌های خودش در بلاگ.",
"group": BLOG_EDITOR_GROUP,
},
{
"key": BLOG_SUPERVISOR_GROUP,
"label": "سرپرست بلاگ",
"description": "امکان بررسی، انتشار، مدیریت دسته‌ها/برچسب‌ها و نظارت کامنت‌ها.",
"group": BLOG_SUPERVISOR_GROUP,
},
{
"key": ASSOCIATION_ADMIN_GROUP,
"label": "ادمین انجمن",
"description": "نقش سازمانی انجمن برای دسترسی‌های مدیریتی منتخب.",
"group": ASSOCIATION_ADMIN_GROUP,
},
{
"key": "staff_admin",
"label": "دسترسی پنل مدیریت",
"description": "فعال‌سازی is_staff برای ورود به بخش‌های مدیریتی عمومی.",
"field": "is_staff",
},
{
"key": "is_superuser",
"label": "سوپریوزر",
"description": "دسترسی کامل Django؛ از این صفحه قابل تغییر نیست.",
"field": "is_superuser",
"locked": True,
},
]
def _error_response(exc: AuthServiceError | GoogleOAuthFlowError):
return exc.status_code, {"error": exc.message}
def _ensure_superuser(user):
return bool(user and user.is_superuser)
def _role_payload(user: User) -> list[dict]:
user_groups = set(user.groups.values_list("name", flat=True))
roles = []
for spec in ROLE_SPECS:
key = spec["key"]
enabled = False
if spec.get("group"):
enabled = spec["group"] in user_groups
elif spec.get("field"):
enabled = bool(getattr(user, spec["field"]))
roles.append(
{
"key": key,
"label": spec["label"],
"description": spec["description"],
"enabled": enabled,
"locked": bool(spec.get("locked", False)),
}
)
return roles
def _authorization_payload(user: User) -> dict:
return {
"id": user.id,
"username": user.username,
"email": user.email,
"mobile": user.mobile,
"first_name": user.first_name,
"last_name": user.last_name,
"is_active": user.is_active,
"is_staff": user.is_staff,
"is_superuser": user.is_superuser,
"groups": list(user.groups.values_list("name", flat=True)),
"roles": _role_payload(user),
}
def _get_major_from_code(code: str | None):
if not code:
return None
@@ -446,6 +533,64 @@ def list_users(
return queryset[offset : offset + limit]
@auth_router.get("/users/{user_id}", response={200: UserProfileSchema, 403: ErrorSchema, 404: ErrorSchema}, auth=jwt_auth)
def get_user_detail(request, user_id: int):
user = request.auth
if not (user.is_staff or user.is_superuser):
return 403, {"error": "اجازه دسترسی ندارید."}
target = get_object_or_404(User, id=user_id)
return 200, target
@auth_router.get("/roles", response={200: list[AuthorizationRoleSchema], 403: ErrorSchema}, auth=jwt_auth)
def list_authorization_roles(request):
if not _ensure_superuser(request.auth):
return 403, {"error": "اجازه دسترسی ندارید."}
return 200, [
{
"key": spec["key"],
"label": spec["label"],
"description": spec["description"],
"enabled": False,
"locked": bool(spec.get("locked", False)),
}
for spec in ROLE_SPECS
]
@auth_router.get("/users/{user_id}/authorization", response={200: UserAuthorizationSchema, 403: ErrorSchema}, auth=jwt_auth)
def get_user_authorization(request, user_id: int):
if not _ensure_superuser(request.auth):
return 403, {"error": "اجازه دسترسی ندارید."}
user = get_object_or_404(User, id=user_id)
return 200, _authorization_payload(user)
@auth_router.put("/users/{user_id}/authorization", response={200: UserAuthorizationSchema, 403: ErrorSchema, 400: ErrorSchema}, auth=jwt_auth)
def update_user_authorization(request, user_id: int, data: UserAuthorizationUpdateSchema):
if not _ensure_superuser(request.auth):
return 403, {"error": "اجازه دسترسی ندارید."}
user = get_object_or_404(User, id=user_id)
if user.id == request.auth.id:
return 400, {"error": "برای جلوگیری از قفل شدن دسترسی، نمی‌توانید نقش‌های خودتان را از این صفحه تغییر دهید."}
requested_groups = set(data.groups or [])
invalid_groups = requested_groups - CURATED_ROLE_GROUPS
if invalid_groups:
return 400, {"error": "نقش انتخاب‌شده معتبر نیست."}
user.is_staff = bool(data.is_staff)
user.save(update_fields=["is_staff"])
current_curated_groups = list(Group.objects.filter(name__in=CURATED_ROLE_GROUPS))
if current_curated_groups:
user.groups.remove(*current_curated_groups)
groups_to_add = [Group.objects.get_or_create(name=name)[0] for name in sorted(requested_groups)]
if groups_to_add:
user.groups.add(*groups_to_add)
return 200, _authorization_payload(user)
@auth_router.get("/check-username", response=UsernameCheckSchema)
def check_username_availability(request, username: str):
return {"exists": User.objects.filter(username=username).exists()}

View File

@@ -1,5 +1,6 @@
from ninja import Router
from apps.analytics.api.views import analytics_router
from apps.blog.api.views import blog_router
from apps.certificates.api.views import certificates_router
from apps.communications.api.views import communications_router
@@ -12,6 +13,7 @@ from apps.users.api.views import auth_router
from core.api.views import health_router
router = Router()
router.add_router("analytics/", analytics_router, tags=["Analytics"])
router.add_router("auth/", auth_router, tags=["Authentication"])
router.add_router("blog/", blog_router, tags=["Blog"])
router.add_router("gallery/", gallery_router, tags=["Gallery"])