Compare commits

..

17 Commits

Author SHA1 Message Date
08cab3b815 feat(analytics): expose full dashboard result groups
Some checks failed
Backend CI/CD / test (push) Has been cancelled
Backend CI/CD / deploy (push) Has been cancelled
2026-06-15 17:33:30 +03:30
a326d2e31d fix(analytics): normalize dashboard entry years 2026-06-15 17:26:57 +03:30
38e7baef9c feat(analytics): split dashboard metrics by domain
Some checks failed
Backend CI/CD / test (push) Has been cancelled
Backend CI/CD / deploy (push) Has been cancelled
2026-06-15 16:17:48 +03:30
8669e99ca5 feat(analytics): add admin dashboard api
Some checks failed
Backend CI/CD / test (push) Has been cancelled
Backend CI/CD / deploy (push) Has been cancelled
2026-06-14 09:51:46 +03:30
c2abcd7b97 feat(payments): add discount code admin API
Some checks failed
Backend CI/CD / test (push) Has been cancelled
Backend CI/CD / deploy (push) Has been cancelled
2026-06-14 00:03:57 +03:30
bdc4fc1a49 feat(events): expand admin event management APIs 2026-06-14 00:03:42 +03:30
20e7a04e59 feat(users): add paginated admin metadata APIs 2026-06-14 00:03:27 +03:30
0151497385 feat(blog): add production taxonomy seed commands
Some checks failed
Backend CI/CD / test (push) Has been cancelled
Backend CI/CD / deploy (push) Has been cancelled
2026-06-13 00:18:05 +03:30
8b307196da feat(blog): expose review feedback
Some checks failed
Backend CI/CD / test (push) Has been cancelled
Backend CI/CD / deploy (push) Has been cancelled
2026-06-13 00:00:10 +03:30
690dc7b600 fix(blog): limit category nesting depth 2026-06-12 21:35:04 +03:30
9acab4af2c feat(users): add authorization management APIs 2026-06-12 15:08:19 +03:30
7cbc99a82f feat(blog): add admin taxonomy APIs 2026-06-12 15:08:07 +03:30
36aef98986 fix(blog): filter profile activity results
Some checks failed
Backend CI/CD / test (push) Has been cancelled
Backend CI/CD / deploy (push) Has been cancelled
2026-06-12 11:20:55 +03:30
029f0c7b8d feat(blog): notify users about comment activity 2026-06-12 11:20:47 +03:30
41f9be4c7e fix(users): require mobile for superuser creation
Some checks failed
Backend CI/CD / test (push) Has been cancelled
Backend CI/CD / deploy (push) Has been cancelled
2026-06-11 21:20:59 +03:30
13ea129d3a feat(blog): add mock data seed command 2026-06-11 21:20:51 +03:30
5045f8da47 feat(blog): expand publishing and moderation APIs 2026-06-11 21:20:44 +03:30
26 changed files with 3363 additions and 85 deletions

View File

View File

View File

@@ -0,0 +1,212 @@
from typing import Literal
from ninja import Schema
class AnalyticsPointSchema(Schema):
label: str
value: int | float
class AnalyticsPointGroupSchema(Schema):
items: list[AnalyticsPointSchema]
top_items: list[AnalyticsPointSchema]
other_count: int = 0
total_count: int = 0
class AnalyticsTrendPointSchema(Schema):
date: str
label: str
value: int | float
class AnalyticsRegistrationStatusSchema(Schema):
status: str
label: str
value: int
class AnalyticsTopEventSchema(Schema):
id: int
title: str
slug: str
attendees: int
capacity: int | None = None
fill_rate: float | None = None
revenue: int = 0
class AnalyticsPostPopularitySchema(Schema):
id: int
title: str
slug: str
likes: int
saves: int
comments: int
class AnalyticsPostPopularityGroupSchema(Schema):
items: list[AnalyticsPostPopularitySchema]
top_items: list[AnalyticsPostPopularitySchema]
other_count: int = 0
total_count: int = 0
class AnalyticsTopPostSchema(AnalyticsPostPopularitySchema):
score: int
class AnalyticsSummarySchema(Schema):
total_users: int
verified_users: int
total_events: int
total_registrations: int
total_revenue: int
total_discount: int
published_posts: int
total_likes: int
total_saves: int
total_comments: int
class AnalyticsUsersSchema(Schema):
signup_trend: list[AnalyticsTrendPointSchema]
by_major: list[AnalyticsPointSchema]
by_university: list[AnalyticsPointSchema]
by_year: list[AnalyticsPointSchema]
class AnalyticsEventsSchema(Schema):
registration_status: list[AnalyticsRegistrationStatusSchema]
by_major: list[AnalyticsPointSchema]
by_university: list[AnalyticsPointSchema]
top_events: list[AnalyticsTopEventSchema]
registration_trend: list[AnalyticsTrendPointSchema]
class AnalyticsRevenueSchema(Schema):
trend: list[AnalyticsTrendPointSchema]
by_event: list[AnalyticsPointSchema]
payment_status: list[AnalyticsRegistrationStatusSchema]
total_paid: int
total_discount: int
total_base: int
class AnalyticsBlogSchema(Schema):
totals: dict[str, int]
post_popularity: list[AnalyticsPostPopularitySchema]
top_posts: list[AnalyticsTopPostSchema]
activity_trend: list[dict[str, int | str]]
by_category: list[AnalyticsPointSchema]
by_tag: list[AnalyticsPointSchema]
class AnalyticsAchievementsSchema(Schema):
distinct_participants: int
learning_hours: float
published_content: int
community_engagement: int
class AnalyticsFiltersSchema(Schema):
date_from: str | None = None
date_to: str | None = None
event_id: int | None = None
granularity: Literal["day", "week", "month"]
class AdminDashboardAnalyticsSchema(Schema):
filters: AnalyticsFiltersSchema
summary: AnalyticsSummarySchema
users: AnalyticsUsersSchema
events: AnalyticsEventsSchema
revenue: AnalyticsRevenueSchema
blog: AnalyticsBlogSchema
achievements: AnalyticsAchievementsSchema
class AnalyticsEventOptionSchema(Schema):
value: str
label: str
description: str | None = None
class AnalyticsEventOptionsSchema(Schema):
count: int
results: list[AnalyticsEventOptionSchema]
class UserAnalyticsSummarySchema(Schema):
total_users: int
verified_users: int
unverified_users: int
profile_completion_rate: float
class UserAnalyticsSchema(Schema):
filters: AnalyticsFiltersSchema
summary: UserAnalyticsSummarySchema
signup_trend: list[AnalyticsTrendPointSchema]
by_major: AnalyticsPointGroupSchema
by_university: AnalyticsPointGroupSchema
by_year: AnalyticsPointGroupSchema
class EventAnalyticsFiltersSchema(Schema):
date_from: str | None = None
date_to: str | None = None
event_id: int | None = None
class EventAnalyticsSummarySchema(Schema):
total_events: int
total_registrations: int
distinct_participants: int
total_revenue: int
total_discount: int
total_base: int
learning_hours: float
class AnalyticsTopEventGroupSchema(Schema):
top_items: list[AnalyticsTopEventSchema]
other_count: int = 0
total_count: int = 0
class EventAnalyticsSchema(Schema):
filters: EventAnalyticsFiltersSchema
summary: EventAnalyticsSummarySchema
registration_status: list[AnalyticsRegistrationStatusSchema]
payment_status: list[AnalyticsRegistrationStatusSchema]
attendee_by_major: AnalyticsPointGroupSchema
attendee_by_university: AnalyticsPointGroupSchema
registration_trend: list[AnalyticsTrendPointSchema]
revenue_trend: list[AnalyticsTrendPointSchema]
revenue_by_event: AnalyticsPointGroupSchema
top_events: AnalyticsTopEventGroupSchema
class BlogAnalyticsFiltersSchema(Schema):
date_from: str | None = None
date_to: str | None = None
class BlogAnalyticsSummarySchema(Schema):
published_posts: int
total_likes: int
total_saves: int
total_comments: int
community_engagement: int
class BlogAnalyticsSchema(Schema):
filters: BlogAnalyticsFiltersSchema
summary: BlogAnalyticsSummarySchema
activity_trend: list[dict[str, int | str]]
post_popularity: AnalyticsPostPopularityGroupSchema
top_posts: list[AnalyticsTopPostSchema]
by_category: AnalyticsPointGroupSchema
by_tag: AnalyticsPointGroupSchema

843
apps/analytics/api/views.py Normal file
View File

@@ -0,0 +1,843 @@
from __future__ import annotations
from datetime import datetime, time
from typing import Literal
from django.contrib.auth import get_user_model
from django.db.models import Count, Q, Sum
from django.db.models.functions import Coalesce, TruncDay, TruncMonth, TruncWeek
from django.shortcuts import get_object_or_404
from django.utils import timezone
from django.utils.dateparse import parse_date, parse_datetime
from ninja import Router
from ninja.errors import HttpError
from apps.analytics.api.schemas import (
AdminDashboardAnalyticsSchema,
AnalyticsEventOptionsSchema,
BlogAnalyticsSchema,
EventAnalyticsSchema,
UserAnalyticsSchema,
)
from apps.blog.models import Category, Comment, Like, Post, SavedPost, Tag
from apps.events.models import Event, Registration
from apps.payments.models import Payment
from core.authentication import jwt_auth
analytics_router = Router()
UNKNOWN_LABEL = "نامشخص"
TOP_ITEMS_LIMIT = 12
PARTICIPANT_STATUSES = [
Registration.StatusChoices.CONFIRMED,
Registration.StatusChoices.ATTENDED,
]
GRANULARITY_TRUNC = {
"day": TruncDay,
"week": TruncWeek,
"month": TruncMonth,
}
REGISTRATION_STATUS_LABELS = {
Registration.StatusChoices.PENDING: "در انتظار",
Registration.StatusChoices.CONFIRMED: "تایید شده",
Registration.StatusChoices.CANCELLED: "لغو شده",
Registration.StatusChoices.ATTENDED: "حاضر شده",
}
PAYMENT_STATUS_LABELS = {
Payment.OrderStatusChoices.INIT: "ایجاد شده",
Payment.OrderStatusChoices.PENDING: "در انتظار پرداخت",
Payment.OrderStatusChoices.PAID: "پرداخت موفق",
Payment.OrderStatusChoices.FAILED: "پرداخت ناموفق",
Payment.OrderStatusChoices.CANCELED: "لغو شده",
}
def _parse_boundary(value: str | None, *, is_end: bool = False) -> datetime | None:
if not value:
return None
parsed_datetime = parse_datetime(value)
if parsed_datetime is None:
parsed_date = parse_date(value)
if parsed_date is None:
raise HttpError(400, "Invalid date filter.")
parsed_datetime = datetime.combine(parsed_date, time.max if is_end else time.min)
if timezone.is_naive(parsed_datetime):
parsed_datetime = timezone.make_aware(parsed_datetime, timezone.get_current_timezone())
return parsed_datetime
def _apply_range(queryset, field: str, start: datetime | None, end: datetime | None):
if start:
queryset = queryset.filter(**{f"{field}__gte": start})
if end:
queryset = queryset.filter(**{f"{field}__lte": end})
return queryset
def _auto_granularity(start: datetime | None, end: datetime | None) -> Literal["day", "week", "month"]:
if not start or not end:
return "month"
days = max((end - start).days, 1)
if days <= 45:
return "day"
if days <= 180:
return "week"
return "month"
def _label(value) -> str:
return str(value or UNKNOWN_LABEL)
def _normalize_entry_year(value) -> str:
if value in (None, ""):
return UNKNOWN_LABEL
raw = str(value).strip()
normalized = (
raw.translate(str.maketrans("۰۱۲۳۴۵۶۷۸۹٠١٢٣٤٥٦٧٨٩", "01234567890123456789"))
.replace(",", "")
.replace("٬", "")
)
try:
year = int(normalized)
except (TypeError, ValueError):
return UNKNOWN_LABEL
if 1390 <= year <= 1499:
return str(year)
if 400 <= year <= 499:
return str(1000 + year)
if 90 <= year <= 99:
return str(1300 + year)
return UNKNOWN_LABEL
def _point_group_from_rows(rows: list[dict], *, limit: int = TOP_ITEMS_LIMIT):
sorted_rows = sorted(rows, key=lambda item: (-int(item["value"] or 0), str(item["label"])))
other_rows = sorted_rows[limit:]
items = [
{"label": _label(item["label"]), "value": int(item["value"] or 0)}
for item in sorted_rows
]
return {
"items": items,
"top_items": items[:limit],
"other_count": len(other_rows),
"total_count": len(sorted_rows),
}
def _point_queryset(queryset, label_field: str, *, limit: int = 12):
return [
{"label": _label(item.get(label_field)), "value": item["value"]}
for item in queryset.values(label_field).annotate(value=Count("id")).order_by("-value", label_field)[:limit]
]
def _point_group_queryset(queryset, label_field: str, *, limit: int = TOP_ITEMS_LIMIT, sum_field: str | None = None):
if sum_field:
rows = list(
queryset.values(label_field)
.annotate(value=Coalesce(Sum(sum_field), 0))
.order_by("-value", label_field)
)
else:
rows = list(
queryset.values(label_field)
.annotate(value=Count("id"))
.order_by("-value", label_field)
)
total_count = len(rows)
other_rows = rows[limit:]
items = [
{"label": _label(item.get(label_field)), "value": int(item["value"] or 0)}
for item in rows
]
return {
"items": items,
"top_items": items[:limit],
"other_count": len(other_rows),
"total_count": total_count,
}
def _entry_year_group_queryset(queryset, *, limit: int = TOP_ITEMS_LIMIT):
buckets: dict[str, int] = {}
for item in queryset.values("year_of_study").annotate(value=Count("id")):
label = _normalize_entry_year(item.get("year_of_study"))
buckets[label] = buckets.get(label, 0) + int(item["value"] or 0)
return _point_group_from_rows(
[{"label": label, "value": value} for label, value in buckets.items()],
limit=limit,
)
def _trend_queryset(queryset, field: str, granularity: str):
trunc = GRANULARITY_TRUNC[granularity]
rows = (
queryset.annotate(bucket=trunc(field))
.values("bucket")
.annotate(value=Count("id"))
.order_by("bucket")
)
return [
{
"date": item["bucket"].date().isoformat() if item["bucket"] else "",
"label": item["bucket"].date().isoformat() if item["bucket"] else UNKNOWN_LABEL,
"value": item["value"],
}
for item in rows
]
def _trend_sum_queryset(queryset, field: str, sum_field: str, granularity: str):
trunc = GRANULARITY_TRUNC[granularity]
rows = (
queryset.annotate(bucket=trunc(field))
.values("bucket")
.annotate(value=Coalesce(Sum(sum_field), 0))
.order_by("bucket")
)
return [
{
"date": item["bucket"].date().isoformat() if item["bucket"] else "",
"label": item["bucket"].date().isoformat() if item["bucket"] else UNKNOWN_LABEL,
"value": int(item["value"] or 0),
}
for item in rows
]
def _sum(queryset, field: str) -> int:
return int(queryset.aggregate(total=Coalesce(Sum(field), 0))["total"] or 0)
def _status_label(value) -> str:
try:
return PAYMENT_STATUS_LABELS.get(Payment.OrderStatusChoices(value), str(value))
except ValueError:
return str(value)
def _registration_status_label(value: str) -> str:
try:
return REGISTRATION_STATUS_LABELS.get(Registration.StatusChoices(value), value)
except ValueError:
return value
def _event_learning_hours(events_queryset) -> float:
learning_hours = 0.0
for event in events_queryset.annotate(
attendees=Count(
"registrations",
filter=Q(registrations__is_deleted=False, registrations__status__in=PARTICIPANT_STATUSES),
distinct=True,
)
):
duration = event.end_time - event.start_time
learning_hours += max(duration.total_seconds(), 0) / 3600 * event.attendees
return round(learning_hours, 1)
def _filters_payload(start: datetime | None, end: datetime | None, *, event_id: int | None = None, include_granularity: bool = False):
payload = {
"date_from": start.isoformat() if start else None,
"date_to": end.isoformat() if end else None,
}
if event_id is not None:
payload["event_id"] = event_id
if include_granularity:
payload["granularity"] = _auto_granularity(start, end)
return payload
def _require_staff(user):
if not (user and (user.is_staff or user.is_superuser)):
raise HttpError(403, "اجازه دسترسی ندارید.")
@analytics_router.get("/admin/events/options", response=AnalyticsEventOptionsSchema, auth=jwt_auth)
def admin_event_options(
request,
search: str | None = None,
limit: int = 20,
offset: int = 0,
):
_require_staff(request.auth)
safe_limit = max(1, min(limit, 50))
safe_offset = max(offset, 0)
queryset = Event.objects.filter(is_deleted=False)
if search:
queryset = queryset.filter(Q(title__icontains=search) | Q(slug__icontains=search))
queryset = queryset.order_by("-start_time", "-id")
count = queryset.count()
results = [
{
"value": str(event.id),
"label": event.title,
"description": event.start_time.date().isoformat() if event.start_time else None,
}
for event in queryset[safe_offset : safe_offset + safe_limit]
]
return {"count": count, "results": results}
@analytics_router.get("/admin/users", response=UserAnalyticsSchema, auth=jwt_auth)
def admin_users_analytics(request, date_from: str | None = None, date_to: str | None = None):
_require_staff(request.auth)
start = _parse_boundary(date_from)
end = _parse_boundary(date_to, is_end=True)
if start and end and start > end:
raise HttpError(400, "Start date must be before end date.")
granularity = _auto_granularity(start, end)
User = get_user_model()
users_qs = _apply_range(User.objects.filter(is_deleted=False), "date_joined", start, end)
total_users = users_qs.count()
verified_users = users_qs.filter(is_mobile_verified=True).count()
completed_profiles = users_qs.exclude(first_name="").exclude(last_name="").filter(
mobile__isnull=False,
major__isnull=False,
university__isnull=False,
).count()
return {
"filters": _filters_payload(start, end, include_granularity=True),
"summary": {
"total_users": total_users,
"verified_users": verified_users,
"unverified_users": max(total_users - verified_users, 0),
"profile_completion_rate": round((completed_profiles / total_users) * 100, 1) if total_users else 0,
},
"signup_trend": _trend_queryset(users_qs, "date_joined", granularity),
"by_major": _point_group_queryset(users_qs, "major__name"),
"by_university": _point_group_queryset(users_qs, "university__name"),
"by_year": _entry_year_group_queryset(users_qs),
}
@analytics_router.get("/admin/events", response=EventAnalyticsSchema, auth=jwt_auth)
def admin_events_analytics(
request,
date_from: str | None = None,
date_to: str | None = None,
event_id: int | None = None,
):
_require_staff(request.auth)
start = _parse_boundary(date_from)
end = _parse_boundary(date_to, is_end=True)
if start and end and start > end:
raise HttpError(400, "Start date must be before end date.")
granularity = _auto_granularity(start, end)
selected_event = None
if event_id is not None:
selected_event = get_object_or_404(Event, id=event_id, is_deleted=False)
events_qs = Event.objects.filter(is_deleted=False)
if selected_event:
events_qs = events_qs.filter(id=selected_event.id)
else:
events_qs = _apply_range(events_qs, "start_time", start, end)
registrations_qs = _apply_range(
Registration.objects.filter(is_deleted=False).select_related("user", "event", "user__major", "user__university"),
"registered_at",
start,
end,
)
if selected_event:
registrations_qs = registrations_qs.filter(event_id=selected_event.id)
participant_qs = registrations_qs.filter(status__in=PARTICIPANT_STATUSES)
paid_payments_qs = Payment.objects.filter(is_deleted=False, status=Payment.OrderStatusChoices.PAID, verified_at__isnull=False)
paid_payments_qs = _apply_range(paid_payments_qs.select_related("event"), "verified_at", start, end)
all_payments_qs = _apply_range(Payment.objects.filter(is_deleted=False), "created_at", start, end)
if selected_event:
paid_payments_qs = paid_payments_qs.filter(event_id=selected_event.id)
all_payments_qs = all_payments_qs.filter(event_id=selected_event.id)
registration_status = [
{"status": item["status"], "label": _registration_status_label(item["status"]), "value": item["value"]}
for item in registrations_qs.values("status").annotate(value=Count("id")).order_by("status")
]
payment_status = [
{"status": str(item["status"]), "label": _status_label(item["status"]), "value": item["value"]}
for item in all_payments_qs.values("status").annotate(value=Count("id")).order_by("status")
]
top_events_qs = Event.objects.filter(is_deleted=False)
if selected_event:
top_events_qs = top_events_qs.filter(id=selected_event.id)
top_events_annotated = list(
top_events_qs.annotate(
attendees=Count(
"registrations",
filter=Q(registrations__is_deleted=False, registrations__status__in=PARTICIPANT_STATUSES),
distinct=True,
),
revenue=Coalesce(
Sum(
"payments__amount",
filter=Q(payments__is_deleted=False, payments__status=Payment.OrderStatusChoices.PAID),
),
0,
),
).order_by("-attendees", "-revenue", "-start_time")
)
top_events = []
for event in top_events_annotated[:TOP_ITEMS_LIMIT]:
fill_rate = round((event.attendees / event.capacity) * 100, 1) if event.capacity else None
top_events.append(
{
"id": event.id,
"title": event.title,
"slug": event.slug,
"attendees": event.attendees,
"capacity": event.capacity,
"fill_rate": fill_rate,
"revenue": int(event.revenue or 0),
}
)
total_revenue = _sum(paid_payments_qs, "amount")
total_discount = _sum(paid_payments_qs, "discount_amount")
total_base = _sum(paid_payments_qs, "base_amount")
return {
"filters": _filters_payload(start, end, event_id=event_id),
"summary": {
"total_events": events_qs.count(),
"total_registrations": registrations_qs.count(),
"distinct_participants": participant_qs.values("user_id").distinct().count(),
"total_revenue": total_revenue,
"total_discount": total_discount,
"total_base": total_base,
"learning_hours": _event_learning_hours(events_qs),
},
"registration_status": registration_status,
"payment_status": payment_status,
"attendee_by_major": _point_group_queryset(participant_qs, "user__major__name"),
"attendee_by_university": _point_group_queryset(participant_qs, "user__university__name"),
"registration_trend": _trend_queryset(registrations_qs, "registered_at", granularity),
"revenue_trend": _trend_sum_queryset(paid_payments_qs, "verified_at", "amount", granularity),
"revenue_by_event": _point_group_queryset(paid_payments_qs, "event__title", sum_field="amount"),
"top_events": {
"top_items": top_events,
"other_count": max(len(top_events_annotated) - TOP_ITEMS_LIMIT, 0),
"total_count": len(top_events_annotated),
},
}
@analytics_router.get("/admin/blog", response=BlogAnalyticsSchema, auth=jwt_auth)
def admin_blog_analytics(request, date_from: str | None = None, date_to: str | None = None):
_require_staff(request.auth)
start = _parse_boundary(date_from)
end = _parse_boundary(date_to, is_end=True)
if start and end and start > end:
raise HttpError(400, "Start date must be before end date.")
granularity = _auto_granularity(start, end)
published_posts_qs = _apply_range(
Post.objects.filter(is_deleted=False, status=Post.StatusChoices.PUBLISHED),
"published_at",
start,
end,
)
visible_comments_qs = _apply_range(
Comment.objects.filter(
is_deleted=False,
is_hidden=False,
is_approved=True,
post__is_deleted=False,
post__status=Post.StatusChoices.PUBLISHED,
),
"created_at",
start,
end,
)
likes_qs = _apply_range(
Like.objects.filter(post__is_deleted=False, post__status=Post.StatusChoices.PUBLISHED),
"created_at",
start,
end,
)
saves_qs = _apply_range(
SavedPost.objects.filter(post__is_deleted=False, post__status=Post.StatusChoices.PUBLISHED),
"created_at",
start,
end,
)
like_filter = Q()
save_filter = Q()
comment_filter = Q(comments__is_deleted=False, comments__is_hidden=False, comments__is_approved=True)
if start:
like_filter &= Q(likes__created_at__gte=start)
save_filter &= Q(saves__created_at__gte=start)
comment_filter &= Q(comments__created_at__gte=start)
if end:
like_filter &= Q(likes__created_at__lte=end)
save_filter &= Q(saves__created_at__lte=end)
comment_filter &= Q(comments__created_at__lte=end)
post_popularity_all = list(
Post.objects.filter(is_deleted=False, status=Post.StatusChoices.PUBLISHED)
.annotate(
likes_total=Count("likes", filter=like_filter, distinct=True),
saves_total=Count("saves", filter=save_filter, distinct=True),
comments_total=Count("comments", filter=comment_filter, distinct=True),
)
.filter(Q(likes_total__gt=0) | Q(saves_total__gt=0) | Q(comments_total__gt=0))
.order_by("-likes_total", "-saves_total", "-comments_total", "-published_at")
)
post_popularity = [
{
"id": post.id,
"title": post.title,
"slug": post.slug,
"likes": post.likes_total,
"saves": post.saves_total,
"comments": post.comments_total,
}
for post in post_popularity_all[:TOP_ITEMS_LIMIT]
]
post_popularity_items = [
{
"id": post.id,
"title": post.title,
"slug": post.slug,
"likes": post.likes_total,
"saves": post.saves_total,
"comments": post.comments_total,
}
for post in post_popularity_all
]
top_posts = [
{
**post,
"score": post["likes"] + post["saves"] + post["comments"],
}
for post in sorted(post_popularity, key=lambda item: item["likes"] + item["saves"] + item["comments"], reverse=True)
]
activity_buckets: dict[str, dict[str, int | str]] = {}
for key, qs in (("likes", likes_qs), ("saves", saves_qs), ("comments", visible_comments_qs)):
for item in (
qs.annotate(bucket=GRANULARITY_TRUNC[granularity]("created_at"))
.values("bucket")
.annotate(value=Count("id"))
.order_by("bucket")
):
bucket = item["bucket"].date().isoformat() if item["bucket"] else UNKNOWN_LABEL
activity_buckets.setdefault(bucket, {"date": bucket, "likes": 0, "saves": 0, "comments": 0})
activity_buckets[bucket][key] = item["value"]
total_likes = likes_qs.count()
total_saves = saves_qs.count()
total_comments = visible_comments_qs.count()
return {
"filters": _filters_payload(start, end),
"summary": {
"published_posts": published_posts_qs.count(),
"total_likes": total_likes,
"total_saves": total_saves,
"total_comments": total_comments,
"community_engagement": total_likes + total_saves + total_comments,
},
"activity_trend": list(activity_buckets.values()),
"post_popularity": {
"items": post_popularity_items,
"top_items": post_popularity,
"other_count": max(len(post_popularity_all) - TOP_ITEMS_LIMIT, 0),
"total_count": len(post_popularity_all),
},
"top_posts": top_posts,
"by_category": _point_group_queryset(published_posts_qs, "category__name"),
"by_tag": _point_group_queryset(published_posts_qs.filter(tags__is_deleted=False), "tags__name"),
}
@analytics_router.get("/admin/dashboard", response=AdminDashboardAnalyticsSchema, auth=jwt_auth)
def admin_dashboard(
request,
date_from: str | None = None,
date_to: str | None = None,
event_id: int | None = None,
granularity: Literal["day", "week", "month", "auto"] = "auto",
):
user = request.auth
if not (user and (user.is_staff or user.is_superuser)):
raise HttpError(403, "اجازه دسترسی ندارید.")
start = _parse_boundary(date_from)
end = _parse_boundary(date_to, is_end=True)
if start and end and start > end:
raise HttpError(400, "Start date must be before end date.")
selected_granularity = _auto_granularity(start, end) if granularity == "auto" else granularity
if event_id is not None:
get_object_or_404(Event, id=event_id, is_deleted=False)
User = get_user_model()
users_qs = _apply_range(User.objects.filter(is_deleted=False), "date_joined", start, end)
events_qs = _apply_range(Event.objects.filter(is_deleted=False), "start_time", start, end)
registrations_qs = _apply_range(
Registration.objects.filter(is_deleted=False).select_related("user", "event", "user__major", "user__university"),
"registered_at",
start,
end,
)
if event_id is not None:
registrations_qs = registrations_qs.filter(event_id=event_id)
paid_payments_qs = Payment.objects.filter(is_deleted=False, status=Payment.OrderStatusChoices.PAID, verified_at__isnull=False)
paid_payments_qs = _apply_range(paid_payments_qs.select_related("event"), "verified_at", start, end)
all_payments_qs = _apply_range(Payment.objects.filter(is_deleted=False), "created_at", start, end)
if event_id is not None:
paid_payments_qs = paid_payments_qs.filter(event_id=event_id)
all_payments_qs = all_payments_qs.filter(event_id=event_id)
published_posts_qs = _apply_range(
Post.objects.filter(is_deleted=False, status=Post.StatusChoices.PUBLISHED),
"published_at",
start,
end,
)
visible_comments_qs = _apply_range(
Comment.objects.filter(
is_deleted=False,
is_hidden=False,
is_approved=True,
post__is_deleted=False,
post__status=Post.StatusChoices.PUBLISHED,
),
"created_at",
start,
end,
)
likes_qs = _apply_range(
Like.objects.filter(post__is_deleted=False, post__status=Post.StatusChoices.PUBLISHED),
"created_at",
start,
end,
)
saves_qs = _apply_range(
SavedPost.objects.filter(post__is_deleted=False, post__status=Post.StatusChoices.PUBLISHED),
"created_at",
start,
end,
)
participant_qs = registrations_qs.filter(status__in=PARTICIPANT_STATUSES)
registration_status = [
{"status": item["status"], "label": _registration_status_label(item["status"]), "value": item["value"]}
for item in registrations_qs.values("status").annotate(value=Count("id")).order_by("status")
]
payment_status = [
{"status": str(item["status"]), "label": _status_label(item["status"]), "value": item["value"]}
for item in all_payments_qs.values("status").annotate(value=Count("id")).order_by("status")
]
top_events_qs = Event.objects.filter(is_deleted=False)
if event_id is not None:
top_events_qs = top_events_qs.filter(id=event_id)
top_events = []
for event in (
top_events_qs.annotate(
attendees=Count(
"registrations",
filter=Q(registrations__is_deleted=False, registrations__status__in=PARTICIPANT_STATUSES),
distinct=True,
),
revenue=Coalesce(
Sum(
"payments__amount",
filter=Q(payments__is_deleted=False, payments__status=Payment.OrderStatusChoices.PAID),
),
0,
),
)
.order_by("-attendees", "-revenue", "-start_time")[:10]
):
fill_rate = round((event.attendees / event.capacity) * 100, 1) if event.capacity else None
top_events.append(
{
"id": event.id,
"title": event.title,
"slug": event.slug,
"attendees": event.attendees,
"capacity": event.capacity,
"fill_rate": fill_rate,
"revenue": int(event.revenue or 0),
}
)
revenue_trend = [
{
"date": item["bucket"].date().isoformat() if item["bucket"] else "",
"label": item["bucket"].date().isoformat() if item["bucket"] else UNKNOWN_LABEL,
"value": int(item["value"] or 0),
}
for item in (
paid_payments_qs.annotate(bucket=GRANULARITY_TRUNC[selected_granularity]("verified_at"))
.values("bucket")
.annotate(value=Coalesce(Sum("amount"), 0))
.order_by("bucket")
)
]
revenue_by_event = [
{"label": _label(item["event__title"]), "value": int(item["value"] or 0)}
for item in (
paid_payments_qs.values("event__title")
.annotate(value=Coalesce(Sum("amount"), 0))
.order_by("-value", "event__title")[:10]
)
]
post_popularity_qs = (
Post.objects.filter(is_deleted=False, status=Post.StatusChoices.PUBLISHED)
.annotate(
likes_total=Count("likes", distinct=True),
saves_total=Count("saves", distinct=True),
comments_total=Count(
"comments",
filter=Q(comments__is_deleted=False, comments__is_hidden=False, comments__is_approved=True),
distinct=True,
),
)
.order_by("-likes_total", "-saves_total", "-comments_total")[:30]
)
post_popularity = [
{
"id": post.id,
"title": post.title,
"slug": post.slug,
"likes": post.likes_total,
"saves": post.saves_total,
"comments": post.comments_total,
}
for post in post_popularity_qs
]
top_posts = [
{
**post,
"score": post["likes"] + post["saves"] + post["comments"],
}
for post in sorted(post_popularity, key=lambda item: item["likes"] + item["saves"] + item["comments"], reverse=True)[:10]
]
activity_buckets: dict[str, dict[str, int | str]] = {}
for key, qs in (("likes", likes_qs), ("saves", saves_qs), ("comments", visible_comments_qs)):
for item in (
qs.annotate(bucket=GRANULARITY_TRUNC[selected_granularity]("created_at"))
.values("bucket")
.annotate(value=Count("id"))
.order_by("bucket")
):
bucket = item["bucket"].date().isoformat() if item["bucket"] else UNKNOWN_LABEL
activity_buckets.setdefault(bucket, {"date": bucket, "likes": 0, "saves": 0, "comments": 0})
activity_buckets[bucket][key] = item["value"]
category_engagement = [
{"label": _label(item["name"]), "value": item["value"]}
for item in (
Category.objects.filter(is_deleted=False, posts__status=Post.StatusChoices.PUBLISHED, posts__is_deleted=False)
.annotate(value=Count("posts", distinct=True))
.order_by("-value", "name")[:10]
.values("name", "value")
)
]
tag_engagement = [
{"label": _label(item["name"]), "value": item["value"]}
for item in (
Tag.objects.filter(is_deleted=False, posts__status=Post.StatusChoices.PUBLISHED, posts__is_deleted=False)
.annotate(value=Count("posts", distinct=True))
.order_by("-value", "name")[:10]
.values("name", "value")
)
]
learning_hours = 0.0
for event in Event.objects.filter(is_deleted=False).annotate(
attendees=Count(
"registrations",
filter=Q(registrations__is_deleted=False, registrations__status__in=PARTICIPANT_STATUSES),
distinct=True,
)
):
duration = event.end_time - event.start_time
learning_hours += max(duration.total_seconds(), 0) / 3600 * event.attendees
total_revenue = _sum(paid_payments_qs, "amount")
total_discount = _sum(paid_payments_qs, "discount_amount")
total_base = _sum(paid_payments_qs, "base_amount")
total_likes = likes_qs.count()
total_saves = saves_qs.count()
total_comments = visible_comments_qs.count()
published_posts_count = published_posts_qs.count()
return {
"filters": {
"date_from": start.isoformat() if start else None,
"date_to": end.isoformat() if end else None,
"event_id": event_id,
"granularity": selected_granularity,
},
"summary": {
"total_users": users_qs.count(),
"verified_users": users_qs.filter(is_mobile_verified=True).count(),
"total_events": events_qs.count(),
"total_registrations": registrations_qs.count(),
"total_revenue": total_revenue,
"total_discount": total_discount,
"published_posts": published_posts_count,
"total_likes": total_likes,
"total_saves": total_saves,
"total_comments": total_comments,
},
"users": {
"signup_trend": _trend_queryset(users_qs, "date_joined", selected_granularity),
"by_major": _point_queryset(users_qs, "major__name"),
"by_university": _point_queryset(users_qs, "university__name"),
"by_year": _entry_year_group_queryset(users_qs)["top_items"],
},
"events": {
"registration_status": registration_status,
"by_major": _point_queryset(participant_qs, "user__major__name"),
"by_university": _point_queryset(participant_qs, "user__university__name"),
"top_events": top_events,
"registration_trend": _trend_queryset(registrations_qs, "registered_at", selected_granularity),
},
"revenue": {
"trend": revenue_trend,
"by_event": revenue_by_event,
"payment_status": payment_status,
"total_paid": total_revenue,
"total_discount": total_discount,
"total_base": total_base,
},
"blog": {
"totals": {
"posts": published_posts_count,
"likes": total_likes,
"saves": total_saves,
"comments": total_comments,
},
"post_popularity": post_popularity,
"top_posts": top_posts,
"activity_trend": list(activity_buckets.values()),
"by_category": category_engagement,
"by_tag": tag_engagement,
},
"achievements": {
"distinct_participants": participant_qs.values("user_id").distinct().count(),
"learning_hours": round(learning_hours, 1),
"published_content": published_posts_count,
"community_engagement": total_likes + total_saves + total_comments,
},
}

View File

@@ -4,22 +4,22 @@ from django.contrib import admin
from import_export.admin import ImportExportModelAdmin
from simplemde.widgets import SimpleMDEEditor
from apps.blog.models import Category, Tag, Post, PostAsset, Comment, Like, SavedPost
from apps.blog.models import BlogBanner, Category, Tag, Post, PostAsset, Comment, Like, SavedPost
from apps.blog.resources import PostResource, CategoryResource
from core.admin import SoftDeleteListFilter, BaseModelAdmin
@admin.register(Category)
class CategoryAdmin(BaseModelAdmin, ImportExportModelAdmin):
resource_class = CategoryResource
list_display = ('name', 'slug', 'created_at', 'is_deleted')
list_display = ('name', 'parent', 'slug', 'created_at', 'is_deleted')
list_filter = ('created_at', 'is_deleted', SoftDeleteListFilter)
search_fields = ('name', 'description')
search_fields = ('name', 'description', 'parent__name')
prepopulated_fields = {'slug': ('name',)}
readonly_fields = ('created_at', 'updated_at', 'deleted_at')
fieldsets = (
('Content', {
'fields': ('name', 'slug', 'description')
'fields': ('name', 'parent', 'slug', 'description')
}),
('Metadata', {
'fields': ('created_at', 'updated_at')
@@ -76,7 +76,7 @@ class PostAdmin(BaseModelAdmin, ImportExportModelAdmin):
list_filter = ('status', 'is_featured', 'category', 'tags', 'created_at', 'published_at', SoftDeleteListFilter)
search_fields = ('title', 'content', 'author__username')
prepopulated_fields = {'slug': ('title',)}
filter_horizontal = ('tags',)
filter_horizontal = ('tags', 'writers')
date_hierarchy = 'published_at'
fieldsets = (
@@ -87,7 +87,7 @@ class PostAdmin(BaseModelAdmin, ImportExportModelAdmin):
'fields': ('seo_title', 'seo_description', 'canonical_url', 'og_title', 'og_description', 'og_image', 'noindex', 'focus_keyword', 'reading_time')
}),
('Metadata', {
'fields': ('author', 'category', 'tags', 'status', 'is_featured', 'submitted_at', 'reviewed_at', 'reviewed_by', 'review_note', 'published_at', 'published_by')
'fields': ('author', 'writers', 'category', 'tags', 'status', 'is_featured', 'submitted_at', 'reviewed_at', 'reviewed_by', 'review_note', 'published_at', 'published_by')
}),
('Soft Delete', {
'fields': ('is_deleted', 'deleted_at'),
@@ -132,8 +132,8 @@ class PostAdmin(BaseModelAdmin, ImportExportModelAdmin):
@admin.register(Comment)
class CommentAdmin(BaseModelAdmin):
list_display = ('author', 'post', 'content_preview', 'is_approved', 'created_at')
list_filter = ('is_approved', 'created_at', 'post', SoftDeleteListFilter)
list_display = ('author', 'post', 'content_preview', 'is_approved', 'is_hidden', 'is_deleted', 'created_at')
list_filter = ('is_approved', 'is_hidden', 'created_at', 'post', SoftDeleteListFilter)
search_fields = ('content', 'author__username', 'author__last_name', 'author__first_name', 'post__title')
readonly_fields = ('content_preview', 'created_at', 'updated_at', 'deleted_at')
@@ -142,28 +142,35 @@ class CommentAdmin(BaseModelAdmin):
'fields': ('post', 'author', 'content')
}),
('Metadata', {
'fields': ('is_approved', 'hidden_by', 'hidden_at', 'moderation_note', 'created_at', 'updated_at')
'fields': ('is_approved', 'is_hidden', 'hidden_by', 'hidden_at', 'moderation_note', 'created_at', 'updated_at')
}),
('Soft Delete', {
'fields': ('is_deleted', 'deleted_at'),
'fields': ('is_deleted', 'deleted_at', 'deleted_by', 'delete_note'),
'classes': ('collapse',)
})
)
actions = BaseModelAdmin.actions + ['approve_comments', 'disapprove_comments']
actions = BaseModelAdmin.actions + ['approve_comments', 'hide_comments', 'unhide_comments']
def content_preview(self, obj):
return obj.content[:50] + '...' if len(obj.content) > 50 else obj.content
content_preview.short_description = 'Content Preview'
def approve_comments(self, request, queryset):
queryset.update(is_approved=True)
queryset.update(is_approved=True, is_hidden=False, hidden_by=None, hidden_at=None, moderation_note='')
self.message_user(request, f"Approved {queryset.count()} comments.")
approve_comments.short_description = "Approve selected comments"
def disapprove_comments(self, request, queryset):
queryset.update(is_approved=False)
self.message_user(request, f"Disapproved {queryset.count()} comments.")
disapprove_comments.short_description = "Disapprove selected comments"
def hide_comments(self, request, queryset):
for comment in queryset:
comment.hide(request.user)
self.message_user(request, f"Hidden {queryset.count()} comments.")
hide_comments.short_description = "Hide selected comments"
def unhide_comments(self, request, queryset):
for comment in queryset:
comment.unhide()
self.message_user(request, f"Restored {queryset.count()} comments.")
unhide_comments.short_description = "Unhide selected comments"
@admin.register(Like)
class LikeAdmin(admin.ModelAdmin):
@@ -185,3 +192,24 @@ class PostAssetAdmin(BaseModelAdmin):
list_filter = ('file_type', 'mime_type', 'created_at')
search_fields = ('title', 'caption', 'alt_text', 'post__title', 'uploaded_by__username')
readonly_fields = ('size', 'mime_type', 'created_at', 'updated_at', 'deleted_at')
@admin.register(BlogBanner)
class BlogBannerAdmin(BaseModelAdmin):
list_display = ('title', 'url', 'is_active', 'sort_order', 'created_at', 'is_deleted')
list_filter = ('is_active', 'created_at', 'is_deleted', SoftDeleteListFilter)
search_fields = ('title', 'alt_text', 'url')
readonly_fields = ('created_at', 'updated_at', 'deleted_at')
fieldsets = (
('Banner', {
'fields': ('title', 'alt_text', 'image', 'url', 'is_active', 'sort_order')
}),
('Metadata', {
'fields': ('created_at', 'updated_at')
}),
('Soft Delete', {
'fields': ('is_deleted', 'deleted_at'),
'classes': ('collapse',)
}),
)

View File

@@ -5,17 +5,52 @@ from typing import List, Optional
from ninja import ModelSchema, Schema
from apps.blog.models import Category, Comment, PostAsset, Tag
from apps.blog.models import BlogBanner, Category, Comment, PostAsset, Tag
from core.media import PREVIEW_VARIANT, THUMBNAIL_VARIANT, derivative_url
class CategorySchema(ModelSchema):
created_at: Optional[datetime] = None
parent_id: Optional[int] = None
class Config:
model = Category
model_fields = ["id", "name", "slug", "description", "created_at"]
@staticmethod
def resolve_parent_id(obj):
return obj.parent_id
class AdminCategorySchema(CategorySchema):
post_count: int = 0
@staticmethod
def resolve_post_count(obj):
return getattr(obj, "post_count", None) or obj.posts.filter(is_deleted=False).count()
class CategoryWriteSchema(Schema):
name: str
slug: Optional[str] = None
description: Optional[str] = ""
parent_id: Optional[int] = None
class CategoryPathSchema(Schema):
id: int
name: str
slug: str
class CategoryFilterSchema(Schema):
id: int
name: str
slug: str
parent_id: Optional[int] = None
post_count: int = 0
children: List["CategoryFilterSchema"] = []
class TagSchema(ModelSchema):
created_at: Optional[datetime] = None
@@ -25,11 +60,32 @@ class TagSchema(ModelSchema):
model_fields = ["id", "name", "slug", "created_at"]
class AdminTagSchema(TagSchema):
post_count: int = 0
@staticmethod
def resolve_post_count(obj):
return getattr(obj, "post_count", None) or obj.posts.filter(is_deleted=False).count()
class TagWriteSchema(Schema):
name: str
slug: Optional[str] = None
class TagFilterSchema(Schema):
id: int
name: str
slug: str
post_count: int = 0
class AuthorSchema(Schema):
id: int
username: str
first_name: str
last_name: str
bio: Optional[str] = None
profile_picture: Optional[str] = None
profile_picture_thumbnail_url: Optional[str] = None
profile_picture_preview_url: Optional[str] = None
@@ -124,6 +180,8 @@ class PostListSchema(Schema):
status: str
published_at: Optional[datetime] = None
category: Optional[CategorySchema] = None
category_path: List[CategoryPathSchema] = []
writers: List[AuthorSchema] = []
tags: List[TagSchema]
is_featured: bool
created_at: datetime
@@ -159,6 +217,17 @@ class PostListSchema(Schema):
url = derivative_url(obj.featured_image, PREVIEW_VARIANT)
return request.build_absolute_uri(url) if url else None
@staticmethod
def resolve_category_path(obj):
if not obj.category_id:
return []
return obj.category.path
@staticmethod
def resolve_writers(obj):
writers = list(obj.writers.all())
return writers or [obj.author]
@staticmethod
def resolve_likes_count(obj):
return getattr(obj, "likes_count", None) or obj.likes.count()
@@ -169,12 +238,17 @@ class PostListSchema(Schema):
@staticmethod
def resolve_comments_count(obj):
return getattr(obj, "comments_count", None) or obj.comments.filter(is_approved=True).count()
return getattr(obj, "comments_count", None) or obj.comments.filter(
is_approved=True,
is_hidden=False,
is_deleted=False,
).count()
class PostDetailSchema(PostListSchema):
content: str
content_html: str
review_note: Optional[str] = ""
og_image_url: Optional[str] = None
assets: List[PostAssetSchema] = []
@@ -192,6 +266,7 @@ class PostCreateSchema(Schema):
excerpt: Optional[str] = None
category_id: Optional[int] = None
tag_ids: Optional[List[int]] = []
writer_ids: Optional[List[int]] = None
status: str = "draft"
is_featured: bool = False
seo_title: Optional[str] = ""
@@ -221,10 +296,14 @@ class CommentSchema(ModelSchema):
post_title: str
post_slug: str
parent_id: Optional[int] = None
is_hidden: bool = False
is_deleted: bool = False
deleted_at: Optional[datetime] = None
hidden_replies_count: int = 0
class Config:
model = Comment
model_fields = ["id", "content", "created_at", "is_approved", "hidden_at"]
model_fields = ["id", "content", "created_at", "updated_at", "is_approved", "hidden_at"]
@staticmethod
def resolve_post_id(obj):
@@ -242,12 +321,22 @@ class CommentSchema(ModelSchema):
def resolve_parent_id(obj):
return obj.parent_id
@staticmethod
def resolve_hidden_replies_count(obj):
if not getattr(obj, "replies", None):
return 0
return sum(len(reply.replies.all()) for reply in obj.replies.all())
class CommentCreateSchema(Schema):
content: str
parent_id: Optional[int] = None
class CommentUpdateSchema(Schema):
content: str
class CommentHideSchema(Schema):
note: Optional[str] = ""
@@ -265,3 +354,30 @@ class BlogProfileActivitySchema(Schema):
saved_posts: List[PostListSchema]
comments: List[CommentSchema]
replies: List[CommentSchema]
class BlogBannerSchema(ModelSchema):
image_url: str
class Config:
model = BlogBanner
model_fields = ["id", "title", "alt_text", "url", "sort_order"]
@staticmethod
def resolve_image_url(obj, context):
request = context["request"]
return request.build_absolute_uri(obj.image.url) if obj.image else ""
class BlogFilterAuthorSchema(Schema):
id: int
username: str
first_name: str
last_name: str
post_count: int = 0
class BlogFiltersSchema(Schema):
categories: List[CategoryFilterSchema]
tags: List[TagFilterSchema]
authors: List[BlogFilterAuthorSchema]

View File

@@ -1,22 +1,32 @@
from __future__ import annotations
import logging
import mimetypes
from pathlib import Path
from typing import List, Optional
from django.conf import settings
from django.contrib.auth import get_user_model
from django.db import IntegrityError
from django.db.models import Count, Prefetch, Q
from django.shortcuts import get_object_or_404
from django.utils import timezone
from ninja import File, Form, Query, Router, UploadedFile
from apps.blog.api.schemas import (
AdminCategorySchema,
AdminTagSchema,
BlogBannerSchema,
BlogFiltersSchema,
BlogInteractionSchema,
BlogProfileActivitySchema,
CategorySchema,
CategoryWriteSchema,
CommentCreateSchema,
CommentHideSchema,
CommentSchema,
CommentUpdateSchema,
AuthorSchema,
PostAssetCreateSchema,
PostAssetSchema,
PostCreateSchema,
@@ -24,8 +34,9 @@ from apps.blog.api.schemas import (
PostListSchema,
PostReviewSchema,
TagSchema,
TagWriteSchema,
)
from apps.blog.models import Category, Comment, Like, Post, PostAsset, SavedPost, Tag
from apps.blog.models import BlogBanner, Category, Comment, Like, Post, PostAsset, SavedPost, Tag
from apps.blog.permissions import (
can_access_blog_admin,
can_edit_post,
@@ -34,11 +45,14 @@ from apps.blog.permissions import (
can_review_blog_posts,
can_write_blog_posts,
)
from apps.notifications.services import notify_user
from core.api.schemas import ErrorSchema, MessageSchema
from core.authentication import jwt_auth
blog_router = Router()
User = get_user_model()
logger = logging.getLogger(__name__)
IMAGE_EXTENSIONS = {".jpg", ".jpeg", ".png", ".gif", ".webp", ".bmp", ".tiff", ".svg"}
VIDEO_EXTENSIONS = {".mp4", ".webm", ".ogg", ".mov", ".mkv", ".avi"}
@@ -49,11 +63,15 @@ ARCHIVE_EXTENSIONS = {".zip", ".rar", ".7z", ".tar", ".gz", ".bz2"}
def _post_queryset():
return (
Post.objects.select_related("author", "category", "reviewed_by", "published_by")
.prefetch_related("tags", "assets__uploaded_by")
.prefetch_related("tags", "writers", "assets__uploaded_by")
.annotate(
likes_count=Count("likes", distinct=True),
saves_count=Count("saves", distinct=True),
comments_count=Count("comments", filter=Q(comments__is_approved=True), distinct=True),
comments_count=Count(
"comments",
filter=Q(comments__is_approved=True, comments__is_hidden=False, comments__is_deleted=False),
distinct=True,
),
)
)
@@ -62,6 +80,88 @@ def _published_queryset():
return _post_queryset().filter(status=Post.StatusChoices.PUBLISHED)
def _optional_auth_user(request):
auth_header = request.headers.get("Authorization", "")
if not auth_header.lower().startswith("bearer "):
return None
token = auth_header.split(" ", 1)[1].strip()
return jwt_auth.authenticate(request, token)
def _query_values(request, key: str, fallback: Optional[str] = None) -> list[str]:
values = request.GET.getlist(key)
if fallback and fallback not in values:
values.append(fallback)
cleaned: list[str] = []
for value in values:
for item in str(value).split(","):
item = item.strip()
if item and item not in cleaned:
cleaned.append(item)
return cleaned
def _category_and_descendant_ids(slug: str) -> list[int]:
categories = list(Category.objects.values("id", "parent_id", "slug"))
target = next((category for category in categories if category["slug"] == slug), None)
if not target:
return []
children_by_parent: dict[int, list[int]] = {}
for category in categories:
parent_id = category["parent_id"]
if parent_id:
children_by_parent.setdefault(parent_id, []).append(category["id"])
selected = [target["id"]]
pending = [target["id"]]
while pending:
next_pending: list[int] = []
for parent_id in pending:
next_pending.extend(children_by_parent.get(parent_id, []))
selected.extend(next_pending)
pending = next_pending
return selected
def _comment_visibility_filter(user=None) -> Q:
if user and can_moderate_blog_comments(user):
return Q(is_deleted=False) & (Q(is_approved=True, is_hidden=False) | Q(is_hidden=True))
return Q(is_deleted=False, is_approved=True, is_hidden=False)
def _build_category_filter_tree():
categories = list(
Category.objects.annotate(
post_count=Count(
"posts",
filter=Q(posts__status=Post.StatusChoices.PUBLISHED, posts__is_deleted=False),
distinct=True,
)
).order_by("name")
)
nodes = {
category.id: {
"id": category.id,
"name": category.name,
"slug": category.slug,
"parent_id": category.parent_id,
"post_count": category.post_count,
"children": [],
}
for category in categories
}
roots = []
for category in categories:
node = nodes[category.id]
if category.parent_id and category.parent_id in nodes:
nodes[category.parent_id]["children"].append(node)
else:
roots.append(node)
return roots
def _asset_file_type(file: UploadedFile) -> str:
suffix = Path(file.name).suffix.lower()
content_type = (file.content_type or mimetypes.guess_type(file.name)[0] or "").lower()
@@ -109,6 +209,7 @@ def _validate_featured_image(file: UploadedFile) -> str | None:
def _apply_post_payload(post: Post, data: PostCreateSchema, *, user, allow_status: bool = False) -> Post:
payload = data.dict(exclude_unset=True)
tag_ids = payload.pop("tag_ids", None)
writer_ids = payload.pop("writer_ids", None)
category_id = payload.pop("category_id", None)
requested_status = payload.pop("status", None)
@@ -126,6 +227,14 @@ def _apply_post_payload(post: Post, data: PostCreateSchema, *, user, allow_statu
post.save()
if tag_ids is not None:
post.tags.set(tag_ids)
if writer_ids is not None:
if user.is_superuser or user.is_staff or can_review_blog_posts(user):
writers = list(post.writers.model.objects.filter(id__in=writer_ids, is_active=True))
post.writers.set(writers or [post.author])
else:
post.writers.set([user])
elif not post.writers.exists():
post.writers.set([post.author])
return post
@@ -135,7 +244,168 @@ def _interaction_payload(post: Post, user) -> BlogInteractionSchema:
saved=SavedPost.objects.filter(post=post, user=user).exists(),
likes_count=post.likes.count(),
saves_count=post.saves.count(),
comments_count=post.comments.filter(is_approved=True).count(),
comments_count=post.comments.filter(is_approved=True, is_hidden=False, is_deleted=False).count(),
)
def _frontend_blog_url(post: Post) -> str:
root = getattr(settings, "FRONTEND_ROOT", "/") or "/"
if not root.endswith("/"):
root = f"{root}/"
return f"{root}blog/{post.slug}"
def _blog_moderator_ids() -> set[int]:
return set(
User.objects.filter(is_active=True)
.filter(
Q(is_staff=True)
| Q(is_superuser=True)
| Q(groups__permissions__content_type__app_label="blog", groups__permissions__codename="moderate_blog_comment")
| Q(user_permissions__content_type__app_label="blog", user_permissions__codename="moderate_blog_comment")
)
.distinct()
.values_list("id", flat=True)
)
def _post_author_ids(post: Post) -> set[int]:
author_ids = set(post.writers.values_list("id", flat=True))
if post.author_id:
author_ids.add(post.author_id)
return author_ids
def _notify_blog_comment(comment: Comment) -> None:
post = comment.post
actor_id = comment.author_id
action_url = _frontend_blog_url(post)
excluded_ids = {actor_id}
if comment.parent_id and comment.parent.author_id != actor_id:
notify_user(
comment.parent.author_id,
{
"type": "blog_reply",
"title": "پاسخ جدید",
"message": f"به کامنت شما در «{post.title}» پاسخ داده شد.",
"level": "info",
"action_url": action_url,
"entity_type": "blog_comment",
"entity_id": comment.id,
"meta": {"post_id": post.id, "post_slug": post.slug, "parent_id": comment.parent_id},
},
)
excluded_ids.add(comment.parent.author_id)
recipient_ids = (_blog_moderator_ids() | _post_author_ids(post)) - excluded_ids
for user_id in recipient_ids:
notify_user(
user_id,
{
"type": "blog_comment",
"title": "کامنت جدید",
"message": f"برای «{post.title}» کامنت جدید ثبت شد.",
"level": "info",
"action_url": action_url,
"entity_type": "blog_post",
"entity_id": post.id,
"meta": {"post_id": post.id, "post_slug": post.slug, "comment_id": comment.id},
},
)
def _can_manage_blog_taxonomy(user) -> bool:
return bool(
user
and getattr(user, "is_authenticated", False)
and (
user.is_superuser
or user.is_staff
or user.has_perm("blog.add_category")
or user.has_perm("blog.change_category")
or user.has_perm("blog.add_tag")
or user.has_perm("blog.change_tag")
)
)
def _category_queryset_with_counts():
return Category.objects.annotate(post_count=Count("posts", filter=Q(posts__is_deleted=False), distinct=True))
def _tag_queryset_with_counts():
return Tag.objects.annotate(post_count=Count("posts", filter=Q(posts__is_deleted=False), distinct=True))
def _validate_category_parent(category_id: int | None, parent_id: int | None) -> tuple[Category | None, str | None]:
if not parent_id:
return None, None
if category_id and parent_id == category_id:
return None, "A category cannot be its own parent."
if category_id and Category.objects.filter(parent_id=category_id).exists():
return None, "A category with child categories must remain a root category."
parent = Category.objects.filter(id=parent_id).first()
if not parent:
return None, "Parent category not found."
if parent.parent_id:
return None, "Only root categories can be selected as a parent."
current = parent
seen: set[int] = set()
while current:
if current.id in seen:
return None, "Invalid category hierarchy."
seen.add(current.id)
if category_id and current.id == category_id:
return None, "Category parent would create a cycle."
current = current.parent
return parent, None
def _apply_category_payload(category: Category, data: CategoryWriteSchema) -> tuple[Category | None, str | None]:
name = (data.name or "").strip()
if not name:
return None, "Category name is required."
parent, error = _validate_category_parent(category.id, data.parent_id)
if error:
return None, error
category.name = name
if data.slug is not None:
category.slug = data.slug.strip()
category.description = data.description or ""
category.parent = parent
return category, None
def _apply_tag_payload(tag: Tag, data: TagWriteSchema) -> tuple[Tag | None, str | None]:
name = (data.name or "").strip()
if not name:
return None, "Tag name is required."
tag.name = name
if data.slug is not None:
tag.slug = data.slug.strip()
return tag, None
@blog_router.get("/admin/writers", response={200: List[AuthorSchema], 403: ErrorSchema}, auth=jwt_auth)
def list_blog_writers(request):
if not (request.auth.is_superuser or request.auth.is_staff or can_review_blog_posts(request.auth)):
return 403, {"error": "Permission denied"}
return (
User.objects.filter(is_active=True)
.filter(
Q(is_staff=True)
| Q(is_superuser=True)
| Q(groups__permissions__content_type__app_label="blog", groups__permissions__codename__in=["add_post", "change_post"])
| Q(user_permissions__content_type__app_label="blog", user_permissions__codename__in=["add_post", "change_post"])
)
.distinct()
.order_by("first_name", "last_name", "username")
)
@@ -319,17 +589,72 @@ def delete_post_asset(request, post_id: int, asset_id: int):
@blog_router.get("/me/activity", response=BlogProfileActivitySchema, auth=jwt_auth)
def my_blog_activity(request):
comments = (
Comment.objects.filter(author=request.auth)
Comment.objects.filter(
author=request.auth,
is_approved=True,
is_hidden=False,
is_deleted=False,
post__status=Post.StatusChoices.PUBLISHED,
post__is_deleted=False,
)
.select_related("author", "post")
.order_by("-created_at")[:20]
)
return BlogProfileActivitySchema(
liked_posts=list(_published_queryset().filter(likes__user=request.auth)[:20]),
saved_posts=list(_published_queryset().filter(saves__user=request.auth)[:20]),
comments=list([comment for comment in comments if comment.parent_id is None]),
replies=list([comment for comment in comments if comment.parent_id is not None]),
return {
"liked_posts": list(_published_queryset().filter(likes__user=request.auth)[:20]),
"saved_posts": list(_published_queryset().filter(saves__user=request.auth)[:20]),
"comments": [comment for comment in comments if comment.parent_id is None],
"replies": [comment for comment in comments if comment.parent_id is not None],
}
@blog_router.get("/banners", response=List[BlogBannerSchema])
def list_banners(request):
return BlogBanner.objects.filter(is_active=True, is_deleted=False).order_by("sort_order", "-created_at")
@blog_router.get("/filters", response=BlogFiltersSchema)
def blog_filters(request):
tag_rows = (
Tag.objects.annotate(
post_count=Count(
"posts",
filter=Q(posts__status=Post.StatusChoices.PUBLISHED, posts__is_deleted=False),
distinct=True,
)
)
.filter(post_count__gt=0)
.order_by("name")
)
author_post_ids: dict[int, set[int]] = {}
published_posts = Post.objects.filter(status=Post.StatusChoices.PUBLISHED, is_deleted=False)
for row in published_posts.values("id", "author_id"):
author_post_ids.setdefault(row["author_id"], set()).add(row["id"])
for row in Post.writers.through.objects.filter(post__status=Post.StatusChoices.PUBLISHED, post__is_deleted=False).values("post_id", "user_id"):
author_post_ids.setdefault(row["user_id"], set()).add(row["post_id"])
users = User.objects.filter(id__in=author_post_ids.keys(), is_active=True).order_by("first_name", "last_name", "username")
authors = [
{
"id": user.id,
"username": user.username,
"first_name": user.first_name,
"last_name": user.last_name,
"post_count": len(author_post_ids.get(user.id, set())),
}
for user in users
]
return {
"categories": _build_category_filter_tree(),
"tags": [
{"id": tag.id, "name": tag.name, "slug": tag.slug, "post_count": tag.post_count}
for tag in tag_rows
],
"authors": authors,
}
@blog_router.get("/posts", response=List[PostListSchema])
def list_posts(
@@ -344,17 +669,20 @@ def list_posts(
):
queryset = _published_queryset()
if category:
queryset = queryset.filter(category__slug=category)
if tag:
queryset = queryset.filter(tags__slug=tag)
category_ids = _category_and_descendant_ids(category)
queryset = queryset.filter(category_id__in=category_ids) if category_ids else queryset.none()
tags = _query_values(request, "tag", tag)
if tags:
queryset = queryset.filter(tags__slug__in=tags)
if search:
queryset = queryset.filter(Q(title__icontains=search) | Q(content__icontains=search) | Q(excerpt__icontains=search))
if featured is not None:
queryset = queryset.filter(is_featured=featured)
if author:
queryset = queryset.filter(author__username=author)
authors = _query_values(request, "author", author)
if authors:
queryset = queryset.filter(Q(author__username__in=authors) | Q(writers__username__in=authors))
offset = (page - 1) * limit
return list(queryset[offset : offset + limit])
return list(queryset.distinct()[offset : offset + limit])
@blog_router.get("/posts/{slug}/recommended", response=List[PostListSchema])
@@ -426,8 +754,19 @@ def delete_post(request, slug: str):
@blog_router.get("/posts/{slug}/comments", response=List[CommentSchema])
def list_comments(request, slug: str):
post = get_object_or_404(Post, slug=slug, status=Post.StatusChoices.PUBLISHED)
comments = Comment.objects.filter(post=post, is_approved=True, parent=None).select_related("author", "post").prefetch_related(
Prefetch("replies", queryset=Comment.objects.filter(is_approved=True).select_related("author", "post"))
user = _optional_auth_user(request)
visibility = _comment_visibility_filter(user)
replies = (
Comment.objects.filter(visibility)
.select_related("author", "post")
.prefetch_related(Prefetch("replies", queryset=Comment.objects.filter(visibility).select_related("author", "post").order_by("-created_at")))
.order_by("-created_at")
)
comments = (
Comment.objects.filter(visibility, post=post, parent=None)
.select_related("author", "post")
.prefetch_related(Prefetch("replies", queryset=replies))
.order_by("-created_at")
)
return list(comments)
@@ -437,8 +776,12 @@ def create_comment(request, slug: str, data: CommentCreateSchema):
post = get_object_or_404(Post, slug=slug, status=Post.StatusChoices.PUBLISHED)
parent = None
if data.parent_id:
parent = get_object_or_404(Comment, id=data.parent_id, post=post, is_approved=True)
parent = get_object_or_404(Comment, id=data.parent_id, post=post, is_approved=True, is_hidden=False, is_deleted=False)
comment = Comment.objects.create(post=post, author=request.auth, content=data.content, parent=parent)
try:
_notify_blog_comment(comment)
except Exception:
logger.exception("Failed to send blog comment notifications for comment=%s", comment.id)
return 201, comment
@@ -451,6 +794,42 @@ def hide_comment(request, comment_id: int, data: CommentHideSchema):
return 200, {"message": "Comment hidden successfully"}
@blog_router.post("/comments/{comment_id}/unhide", response={200: MessageSchema, 403: ErrorSchema}, auth=jwt_auth)
def unhide_comment(request, comment_id: int):
if not can_moderate_blog_comments(request.auth):
return 403, {"error": "Permission denied"}
comment = get_object_or_404(Comment, id=comment_id)
comment.unhide()
return 200, {"message": "Comment restored successfully"}
@blog_router.post("/comments/{comment_id}/delete", response={200: MessageSchema, 403: ErrorSchema}, auth=jwt_auth)
def soft_delete_comment_tree(request, comment_id: int, data: CommentHideSchema):
if not can_moderate_blog_comments(request.auth):
return 403, {"error": "Permission denied"}
comment = get_object_or_404(Comment.all_objects, id=comment_id, is_deleted=False)
deleted_count = 1 + len(comment.descendant_ids())
comment.soft_delete_tree(request.auth, data.note or "")
return 200, {"message": f"{deleted_count} comment(s) deleted successfully"}
@blog_router.put("/comments/{comment_id}", response={200: CommentSchema, 400: ErrorSchema, 403: ErrorSchema}, auth=jwt_auth)
def update_comment(request, comment_id: int, data: CommentUpdateSchema):
comment = get_object_or_404(Comment.objects.select_related("author", "post"), id=comment_id)
if comment.author_id != request.auth.id:
return 403, {"error": "Permission denied"}
if comment.is_deleted or comment.is_hidden or not comment.is_approved or comment.hidden_at:
return 403, {"error": "Hidden comments cannot be edited"}
content = data.content.strip()
if not content:
return 400, {"error": "Comment content is required"}
comment.content = content
comment.save(update_fields=["content", "updated_at"])
return 200, comment
@blog_router.post("/posts/{slug}/like", response={200: BlogInteractionSchema}, auth=jwt_auth)
def toggle_like(request, slug: str):
post = get_object_or_404(Post, slug=slug, status=Post.StatusChoices.PUBLISHED)
@@ -519,6 +898,96 @@ def restore_comment(request, comment_id: int):
return 400, {"error": "Comment not found or not soft-deleted."}
@blog_router.get("/admin/categories", response={200: List[AdminCategorySchema], 403: ErrorSchema}, auth=jwt_auth)
def list_admin_categories(request):
if not _can_manage_blog_taxonomy(request.auth):
return 403, {"error": "Permission denied"}
return 200, _category_queryset_with_counts().order_by("name")
@blog_router.post("/admin/categories", response={201: AdminCategorySchema, 400: ErrorSchema, 403: ErrorSchema}, auth=jwt_auth)
def create_admin_category(request, data: CategoryWriteSchema):
if not _can_manage_blog_taxonomy(request.auth):
return 403, {"error": "Permission denied"}
category, error = _apply_category_payload(Category(), data)
if error:
return 400, {"error": error}
try:
category.save()
except IntegrityError:
return 400, {"error": "Category name or slug already exists."}
return 201, _category_queryset_with_counts().get(id=category.id)
@blog_router.put("/admin/categories/{category_id}", response={200: AdminCategorySchema, 400: ErrorSchema, 403: ErrorSchema}, auth=jwt_auth)
def update_admin_category(request, category_id: int, data: CategoryWriteSchema):
if not _can_manage_blog_taxonomy(request.auth):
return 403, {"error": "Permission denied"}
category = get_object_or_404(Category, id=category_id)
category, error = _apply_category_payload(category, data)
if error:
return 400, {"error": error}
try:
category.save()
except IntegrityError:
return 400, {"error": "Category name or slug already exists."}
return 200, _category_queryset_with_counts().get(id=category.id)
@blog_router.delete("/admin/categories/{category_id}", response={200: MessageSchema, 403: ErrorSchema}, auth=jwt_auth)
def delete_admin_category(request, category_id: int):
if not request.auth.is_superuser:
return 403, {"error": "Permission denied"}
category = get_object_or_404(Category, id=category_id)
category.delete()
return 200, {"message": f"Category '{category.name}' deleted successfully."}
@blog_router.get("/admin/tags", response={200: List[AdminTagSchema], 403: ErrorSchema}, auth=jwt_auth)
def list_admin_tags(request):
if not _can_manage_blog_taxonomy(request.auth):
return 403, {"error": "Permission denied"}
return 200, _tag_queryset_with_counts().order_by("name")
@blog_router.post("/admin/tags", response={201: AdminTagSchema, 400: ErrorSchema, 403: ErrorSchema}, auth=jwt_auth)
def create_admin_tag(request, data: TagWriteSchema):
if not _can_manage_blog_taxonomy(request.auth):
return 403, {"error": "Permission denied"}
tag, error = _apply_tag_payload(Tag(), data)
if error:
return 400, {"error": error}
try:
tag.save()
except IntegrityError:
return 400, {"error": "Tag name or slug already exists."}
return 201, _tag_queryset_with_counts().get(id=tag.id)
@blog_router.put("/admin/tags/{tag_id}", response={200: AdminTagSchema, 400: ErrorSchema, 403: ErrorSchema}, auth=jwt_auth)
def update_admin_tag(request, tag_id: int, data: TagWriteSchema):
if not _can_manage_blog_taxonomy(request.auth):
return 403, {"error": "Permission denied"}
tag = get_object_or_404(Tag, id=tag_id)
tag, error = _apply_tag_payload(tag, data)
if error:
return 400, {"error": error}
try:
tag.save()
except IntegrityError:
return 400, {"error": "Tag name or slug already exists."}
return 200, _tag_queryset_with_counts().get(id=tag.id)
@blog_router.delete("/admin/tags/{tag_id}", response={200: MessageSchema, 403: ErrorSchema}, auth=jwt_auth)
def delete_admin_tag(request, tag_id: int):
if not request.auth.is_superuser:
return 403, {"error": "Permission denied"}
tag = get_object_or_404(Tag, id=tag_id)
tag.delete()
return 200, {"message": f"Tag '{tag.name}' deleted successfully."}
@blog_router.get("/categories", response=List[CategorySchema])
def list_categories(request):
return Category.objects.all()

View File

@@ -0,0 +1,125 @@
from __future__ import annotations
import sys
from django.core.management.base import BaseCommand
from apps.blog.models import Category
CATEGORIES = [
{
"name": "اخبار و اطلاعیه‌ها",
"description": "خبرها، اطلاعیه‌ها و گزارش‌های مرتبط با انجمن، دانشکده و جامعه دانشجویی.",
"children": [
("اخبار انجمن", "خبرها و گزارش‌های رسمی انجمن علمی مهندسی کامپیوتر."),
("اطلاعیه‌های آموزشی", "اطلاعیه‌های مهم آموزشی، انتخاب واحد، امتحانات و امور دانشجویی."),
("رویدادها و کارگاه‌ها", "معرفی، گزارش و پیگیری رویدادها، نشست‌ها و کارگاه‌ها."),
],
},
{
"name": "آموزش و مسیر یادگیری",
"description": "مطالب آموزشی و مسیرهای یادگیری برای دانشجویان علوم و مهندسی کامپیوتر.",
"children": [
("راهنمای شروع", "مطالب مقدماتی برای شروع برنامه‌نویسی، دانشگاه و مهارت‌آموزی."),
("آموزش‌های فنی", "آموزش‌های عملی، گام‌به‌گام و مسئله‌محور در حوزه‌های فنی."),
("منابع یادگیری", "معرفی کتاب، دوره، مستندات، مسیر مطالعه و منابع مفید."),
],
},
{
"name": "فناوری و مهندسی نرم‌افزار",
"description": "مقاله‌های فنی درباره توسعه نرم‌افزار، ابزارها، معماری و فناوری‌های روز.",
"children": [
("برنامه‌نویسی", "زبان‌ها، الگوها، نکته‌های کدنویسی و تجربه‌های عملی توسعه."),
("وب و اپلیکیشن", "فرانت‌اند، بک‌اند، موبایل، API و تجربه ساخت محصول."),
("دواپس و ابزارها", "لینوکس، گیت، CI/CD، استقرار، کانتینر و ابزارهای توسعه."),
],
},
{
"name": "هوش مصنوعی و داده",
"description": "مطالب مرتبط با هوش مصنوعی، یادگیری ماشین، داده و کاربردهای آن‌ها.",
"children": [
("یادگیری ماشین", "مفاهیم، تمرین‌ها و تجربه‌های یادگیری ماشین و مدل‌سازی."),
("علم داده", "تحلیل داده، مصورسازی، آمار کاربردی و پروژه‌های داده‌محور."),
("هوش مصنوعی کاربردی", "ابزارها، کاربردها، ایده‌ها و تجربه‌های عملی با AI."),
],
},
{
"name": "دانشگاه و پژوهش",
"description": "محتوای علمی، پژوهشی و دانشگاهی برای دانشجویان و اعضای انجمن.",
"children": [
("پژوهش دانشجویی", "تجربه‌ها، معرفی مقاله، ایده پژوهشی و همکاری‌های علمی."),
("درس و دانشگاه", "راهنمای درس‌ها، پروژه‌های درسی، امتحان و تجربه دانشگاهی."),
("مسابقات علمی", "برنامه‌نویسی رقابتی، مسابقات، چالش‌ها و آمادگی تیمی."),
],
},
{
"name": "پروژه‌ها و تجربه‌ها",
"description": "تجربه‌های واقعی دانشجویان از پروژه، کار تیمی، کارآموزی و مسیر حرفه‌ای.",
"children": [
("پروژه‌های دانشجویی", "معرفی، کالبدشکافی و گزارش پروژه‌های دانشجویی و تیمی."),
("کارآموزی و بازار کار", "رزومه، مصاحبه، کارآموزی، مسیر شغلی و تجربه ورود به کار."),
("تجربه‌های انجمنی", "روایت‌ها و درس‌آموخته‌های فعالیت در انجمن و تیم‌های دانشجویی."),
],
},
]
def console_safe(value: str) -> str:
encoding = sys.stdout.encoding or "utf-8"
return value.encode(encoding, errors="backslashreplace").decode(encoding)
class Command(BaseCommand):
help = "Create or update production blog categories for the CS association blog."
def add_arguments(self, parser):
parser.add_argument("--dry-run", action="store_true", help="Print planned categories without writing changes.")
def handle(self, *args, **options):
dry_run = options["dry_run"]
root_count = 0
child_count = 0
for root_spec in CATEGORIES:
if dry_run:
self.stdout.write(console_safe(f"[root] {root_spec['name']}"))
for child_name, _ in root_spec["children"]:
self.stdout.write(console_safe(f" [child] {child_name}"))
continue
root = self._upsert_category(
name=root_spec["name"],
description=root_spec["description"],
parent=None,
)
root_count += 1
for child_name, child_description in root_spec["children"]:
self._upsert_category(
name=child_name,
description=child_description,
parent=root,
)
child_count += 1
if dry_run:
self.stdout.write(self.style.WARNING("Dry run only. No categories were changed."))
return
self.stdout.write(self.style.SUCCESS(f"Blog categories synchronized: {root_count} roots, {child_count} children."))
def _upsert_category(self, *, name: str, description: str, parent: Category | None) -> Category:
if parent and parent.parent_id:
raise ValueError(f"Invalid category tree: parent '{parent.name}' is not a root category.")
category, _ = Category.all_objects.update_or_create(
name=name,
defaults={
"description": description,
"parent": parent,
"is_deleted": False,
"deleted_at": None,
},
)
return category

View File

@@ -0,0 +1,338 @@
from __future__ import annotations
import random
from io import BytesIO
from pathlib import Path
from django.contrib.auth import get_user_model
from django.contrib.auth.models import Group
from django.core.files.base import ContentFile
from django.core.management.base import BaseCommand
from django.utils import timezone
from apps.blog.models import BlogBanner, Category, Comment, Like, Post, SavedPost, Tag
from apps.blog.permissions import BLOG_EDITOR_GROUP
try:
from PIL import Image, ImageDraw
except ImportError: # pragma: no cover - command gracefully explains missing optional dependency.
Image = None
ImageDraw = None
User = get_user_model()
WRITERS = [
{
"username": "mock-blog-writer-ali",
"first_name": "علی",
"last_name": "کریمی",
"bio": "دانشجوی مهندسی کامپیوتر و علاقه‌مند به معماری نرم‌افزار، لینوکس و تجربه‌های واقعی تیمی.",
},
{
"username": "mock-blog-writer-sara",
"first_name": "سارا",
"last_name": "احمدی",
"bio": "نویسنده حوزه تجربه کاربری، فرانت‌اند و یادگیری کاربردی برای دانشجویان تازه‌وارد.",
},
{
"username": "mock-blog-writer-nima",
"first_name": "نیما",
"last_name": "رضایی",
"bio": "علاقه‌مند به الگوریتم، بک‌اند و انتقال تجربه‌های مسابقه‌ای به پروژه‌های واقعی.",
},
]
TAG_NAMES = [
"پایتون",
"فرانت‌اند",
"بک‌اند",
"الگوریتم",
"هوش مصنوعی",
"تجربه دانشجویی",
"مسیر شغلی",
"لینوکس",
]
POSTS = [
{
"title": "چطور یک پروژه دانشجویی را مثل محصول واقعی جلو ببریم؟",
"slug": "mock-پروژه-دانشجویی-محصول-واقعی",
"category": "توسعه نرم‌افزار",
"tags": ["بک‌اند", "فرانت‌اند", "تجربه دانشجویی"],
},
{
"title": "راهنمای شروع پایتون برای دانشجویان مهندسی کامپیوتر",
"slug": "mock-شروع-پایتون-برای-دانشجویان",
"category": "برنامه‌نویسی",
"tags": ["پایتون", "مسیر شغلی"],
},
{
"title": "الگوریتم‌ها را چطور کاربردی یاد بگیریم؟",
"slug": "mock-یادگیری-کاربردی-الگوریتم",
"category": "علوم کامپیوتر",
"tags": ["الگوریتم", "تجربه دانشجویی"],
},
{
"title": "از ترمینال نترسیم: لینوکس برای زندگی روزمره دانشجویی",
"slug": "mock-لینوکس-برای-دانشجویان",
"category": "ابزارها",
"tags": ["لینوکس", "مسیر شغلی"],
},
{
"title": "هوش مصنوعی در پروژه‌های کوچک دانشجویی",
"slug": "mock-هوش-مصنوعی-پروژه-دانشجویی",
"category": "هوش مصنوعی",
"tags": ["هوش مصنوعی", "پایتون"],
},
]
def make_markdown(title: str) -> str:
return f"""# {title}
این نوشته برای تست نمای واقعی بلاگ ساخته شده است. متن عمداً چند بخش دارد تا فهرست محتوا، خوانایی، کدبلاک و کامنت‌ها در صفحه جزئیات بهتر دیده شوند.
## مسئله از کجا شروع می‌شود؟
وقتی یک تیم دانشجویی روی پروژه کار می‌کند، معمولاً تمرکز اصلی روی تمام کردن سریع کار است. اما اگر کمی ساختار داشته باشیم، خروجی هم قابل ارائه‌تر می‌شود و هم بعداً قابل توسعه خواهد بود.
## یک نمونه کد کوتاه
```python
def normalize_title(title: str) -> str:
return "-".join(title.strip().lower().split())
print(normalize_title("Guilan ACE Blog"))
```
## پیشنهاد عملی
- ابتدا مسئله را واضح بنویسید.
- کارها را کوچک و قابل بررسی کنید.
- خروجی هر مرحله را مستند کنید.
- بازخورد گرفتن را به آخر کار موکول نکنید.
### نکته تکمیلی
اگر نوشته شامل تصویر، کد یا لینک است، بهتر است ساختار آن از ابتدا با تیترهای واضح جدا شود تا کاربر بتواند سریع‌تر بخش موردنظرش را پیدا کند.
"""
def make_image_bytes(label: str, width: int, height: int, color: tuple[int, int, int]) -> bytes:
if Image is None or ImageDraw is None:
raise RuntimeError("Pillow is required to generate mock images.")
image = Image.new("RGB", (width, height), color)
draw = ImageDraw.Draw(image)
for index in range(0, width, 48):
draw.line((index, 0, index - height, height), fill=(255, 255, 255), width=2)
draw.rectangle((32, height - 112, width - 32, height - 32), fill=(20, 24, 38))
draw.text((52, height - 84), label[:70], fill=(255, 255, 255))
output = BytesIO()
image.save(output, format="JPEG", quality=88)
return output.getvalue()
def set_image_field(instance, field_name: str, path: str, label: str, width: int, height: int, color: tuple[int, int, int]):
field = getattr(instance, field_name)
if field:
return
image_bytes = make_image_bytes(label, width, height, color)
field.save(path, ContentFile(image_bytes), save=False)
class Command(BaseCommand):
help = "Seed rich mock blog data for local visual QA."
def add_arguments(self, parser):
parser.add_argument("--reset", action="store_true", help="Delete previous mock blog data before seeding.")
parser.add_argument("--password", default="MockPass12345!", help="Password for generated writer users.")
def handle(self, *args, **options):
if Image is None:
raise RuntimeError("Pillow is required. Install project requirements before running this command.")
random.seed(42)
if options["reset"]:
self._reset_mock_data()
editor_group, _ = Group.objects.get_or_create(name=BLOG_EDITOR_GROUP)
writers = self._seed_writers(editor_group, options["password"])
categories = self._seed_categories()
tags = self._seed_tags()
self._seed_banners()
posts = self._seed_posts(writers, categories, tags)
self._seed_comments_and_reactions(posts, writers)
self.stdout.write(self.style.SUCCESS("Mock blog data seeded successfully."))
self.stdout.write("Writer login usernames:")
for writer in writers:
self.stdout.write(f" - {writer.username} / {options['password']}")
def _reset_mock_data(self):
Post.all_objects.filter(slug__startswith="mock-").delete()
BlogBanner.all_objects.filter(title__startswith="Mock ").delete()
Category.all_objects.filter(slug__startswith="mock-").delete()
Tag.all_objects.filter(slug__startswith="mock-").delete()
User.objects.filter(username__startswith="mock-blog-writer-").delete()
def _seed_writers(self, editor_group: Group, password: str):
writers = []
for index, spec in enumerate(WRITERS, start=1):
user, created = User.objects.get_or_create(
username=spec["username"],
defaults={
"first_name": spec["first_name"],
"last_name": spec["last_name"],
"email": f"{spec['username']}@example.local",
"mobile": f"09199000{index:03d}",
"bio": spec["bio"],
"is_active": True,
"is_mobile_verified": True,
},
)
user.first_name = spec["first_name"]
user.last_name = spec["last_name"]
user.bio = spec["bio"]
user.is_active = True
user.is_mobile_verified = True
if created:
user.set_password(password)
set_image_field(
user,
"profile_picture",
f"profile_pictures/mock-writer-{index}.jpg",
spec["first_name"],
512,
512,
(42 + index * 30, 95 + index * 20, 130 + index * 15),
)
user.save()
user.groups.add(editor_group)
writers.append(user)
return writers
def _seed_categories(self):
root, _ = Category.objects.get_or_create(
slug="mock-بلاگ-انجمن",
defaults={"name": "بلاگ انجمن", "description": "دسته اصلی محتوای تستی بلاگ"},
)
names = ["برنامه‌نویسی", "علوم کامپیوتر", "توسعه نرم‌افزار", "ابزارها", "هوش مصنوعی"]
categories = {"بلاگ انجمن": root}
for name in names:
category, _ = Category.objects.get_or_create(
slug=f"mock-{name}",
defaults={"name": name, "parent": root, "description": f"مطالب تستی درباره {name}"},
)
category.name = name
category.parent = root
category.save()
categories[name] = category
return categories
def _seed_tags(self):
tags = {}
for name in TAG_NAMES:
tag, _ = Tag.objects.get_or_create(slug=f"mock-{name}", defaults={"name": name})
tag.name = name
tag.save()
tags[name] = tag
return tags
def _seed_banners(self):
colors = [(9, 80, 90), (120, 64, 24), (38, 70, 83)]
for index in range(1, 4):
banner, _ = BlogBanner.objects.get_or_create(
title=f"Mock Blog Banner {index}",
defaults={
"url": f"https://east-guilan-ce.ir/blog?mock-banner={index}",
"alt_text": f"بنر تستی بلاگ {index}",
"sort_order": index,
"is_active": True,
},
)
banner.url = f"https://east-guilan-ce.ir/blog?mock-banner={index}"
banner.alt_text = f"بنر تستی بلاگ {index}"
banner.sort_order = index
banner.is_active = True
set_image_field(
banner,
"image",
f"blog/banners/mock-banner-{index}.jpg",
f"Mock Banner {index}",
1440,
320,
colors[index - 1],
)
banner.save()
def _seed_posts(self, writers, categories, tags):
posts = []
for index, spec in enumerate(POSTS, start=1):
writer_pool = writers[: 1 + (index % len(writers))]
post, _ = Post.all_objects.get_or_create(
slug=spec["slug"],
defaults={
"title": spec["title"],
"author": writer_pool[0],
"content": make_markdown(spec["title"]),
"excerpt": f"خلاصه تستی برای نوشته «{spec['title']}» که برای بررسی کارت‌ها و سئوی بلاگ استفاده می‌شود.",
"status": Post.StatusChoices.PUBLISHED,
"category": categories[spec["category"]],
"is_featured": index <= 2,
"seo_title": spec["title"][:70],
"seo_description": f"توضیح سئوی تستی برای {spec['title']}",
"og_title": spec["title"][:95],
"og_description": f"متن شبکه‌های اجتماعی برای {spec['title']}",
"focus_keyword": spec["tags"][0],
"published_at": timezone.now() - timezone.timedelta(days=index * 3),
},
)
post.title = spec["title"]
post.author = writer_pool[0]
post.content = make_markdown(spec["title"])
post.excerpt = f"خلاصه تستی برای نوشته «{spec['title']}» که برای بررسی کارت‌ها و سئوی بلاگ استفاده می‌شود."
post.status = Post.StatusChoices.PUBLISHED
post.category = categories[spec["category"]]
post.is_featured = index <= 2
post.published_at = post.published_at or timezone.now() - timezone.timedelta(days=index * 3)
set_image_field(
post,
"featured_image",
f"blog/featured/mock-post-{index}.jpg",
spec["title"],
1280,
720,
(25 + index * 28, 90 + index * 18, 120 + index * 12),
)
post.save()
post.tags.set([tags[name] for name in spec["tags"]])
post.writers.set(writer_pool)
posts.append(post)
return posts
def _seed_comments_and_reactions(self, posts, writers):
for post in posts:
for index, writer in enumerate(writers, start=1):
if writer == post.author:
continue
comment, _ = Comment.objects.get_or_create(
post=post,
author=writer,
parent=None,
defaults={"content": f"کامنت تستی {index}: این بخش برای بررسی ظاهر کامنت‌ها و پاسخ‌ها ساخته شده است."},
)
Comment.objects.get_or_create(
post=post,
author=post.author,
parent=comment,
defaults={"content": "پاسخ تستی نویسنده برای بررسی حالت nested در کامنت‌ها."},
)
Like.objects.get_or_create(post=post, user=writer)
if index % 2 == 0:
SavedPost.objects.get_or_create(post=post, user=writer)

View File

@@ -0,0 +1,111 @@
from __future__ import annotations
import sys
from django.core.management.base import BaseCommand
from apps.blog.models import Tag
TAGS = [
"انجمن علمی",
"دانشکده",
"اطلاعیه",
"رویداد",
"کارگاه",
"گزارش رویداد",
"برنامه‌نویسی",
"پایتون",
"جاوااسکریپت",
"تایپ‌اسکریپت",
"جاوا",
"سی‌پلاس‌پلاس",
"گولنگ",
"فرانت‌اند",
"بک‌اند",
"React",
"Next.js",
"Django",
"REST API",
"پایگاه داده",
"PostgreSQL",
"Redis",
"گیت",
"لینوکس",
"Docker",
"DevOps",
"استقرار",
"امنیت",
"شبکه",
"سیستم‌عامل",
"الگوریتم",
"ساختمان داده",
"برنامه‌نویسی رقابتی",
"حل مسئله",
"هوش مصنوعی",
"یادگیری ماشین",
"یادگیری عمیق",
"علم داده",
"تحلیل داده",
"داده‌کاوی",
"پردازش زبان طبیعی",
"بینایی ماشین",
"پژوهش",
"مقاله‌خوانی",
"پروژه دانشجویی",
"پروژه درسی",
"تیم‌سازی",
"مدیریت پروژه",
"طراحی نرم‌افزار",
"معماری نرم‌افزار",
"تست نرم‌افزار",
"تجربه کاربری",
"طراحی رابط کاربری",
"اپن‌سورس",
"کارآموزی",
"رزومه",
"مصاحبه",
"مسیر شغلی",
"منابع یادگیری",
"کتاب",
"دوره آموزشی",
"تجربه دانشجویی",
"انتخاب واحد",
"امتحانات",
"آموزش",
"راهنمای شروع",
]
def console_safe(value: str) -> str:
encoding = sys.stdout.encoding or "utf-8"
return value.encode(encoding, errors="backslashreplace").decode(encoding)
class Command(BaseCommand):
help = "Create or update production blog tags for the CS association blog."
def add_arguments(self, parser):
parser.add_argument("--dry-run", action="store_true", help="Print planned tags without writing changes.")
def handle(self, *args, **options):
dry_run = options["dry_run"]
if dry_run:
for name in TAGS:
self.stdout.write(console_safe(f"[tag] {name}"))
self.stdout.write(self.style.WARNING("Dry run only. No tags were changed."))
return
count = 0
for name in TAGS:
Tag.all_objects.update_or_create(
name=name,
defaults={
"is_deleted": False,
"deleted_at": None,
},
)
count += 1
self.stdout.write(self.style.SUCCESS(f"Blog tags synchronized: {count} tags."))

View File

@@ -0,0 +1,69 @@
from django.conf import settings
from django.db import migrations, models
import django.db.models.deletion
import apps.blog.models
def backfill_post_writers(apps, schema_editor):
Post = apps.get_model("blog", "Post")
for post in Post.objects.exclude(author_id__isnull=True).iterator():
post.writers.add(post.author_id)
def noop_reverse(apps, schema_editor):
pass
class Migration(migrations.Migration):
dependencies = [
("blog", "0003_blog_platform"),
migrations.swappable_dependency(settings.AUTH_USER_MODEL),
]
operations = [
migrations.CreateModel(
name="BlogBanner",
fields=[
("id", models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name="ID")),
("created_at", models.DateTimeField(auto_now_add=True)),
("updated_at", models.DateTimeField(auto_now=True)),
("is_deleted", models.BooleanField(default=False)),
("deleted_at", models.DateTimeField(blank=True, null=True)),
("title", models.CharField(blank=True, max_length=160)),
("alt_text", models.CharField(blank=True, max_length=200)),
("image", models.ImageField(upload_to=apps.blog.models.blog_banner_upload_to)),
("url", models.URLField()),
("is_active", models.BooleanField(default=True)),
("sort_order", models.PositiveIntegerField(default=0)),
],
options={
"ordering": ["sort_order", "-created_at"],
},
),
migrations.AddField(
model_name="category",
name="parent",
field=models.ForeignKey(
blank=True,
null=True,
on_delete=django.db.models.deletion.SET_NULL,
related_name="children",
to="blog.category",
),
),
migrations.AddField(
model_name="post",
name="writers",
field=models.ManyToManyField(
blank=True,
related_name="written_blog_posts",
to=settings.AUTH_USER_MODEL,
),
),
migrations.AddIndex(
model_name="blogbanner",
index=models.Index(fields=["is_active", "sort_order"], name="blog_blogba_is_acti_c11b3c_idx"),
),
migrations.RunPython(backfill_post_writers, noop_reverse),
]

View File

@@ -0,0 +1,58 @@
from django.conf import settings
from django.db import migrations, models
import django.db.models.deletion
def mark_legacy_hidden_comments(apps, schema_editor):
Comment = apps.get_model("blog", "Comment")
Comment.objects.filter(is_approved=False, is_deleted=False).update(is_hidden=True)
def unmark_legacy_hidden_comments(apps, schema_editor):
Comment = apps.get_model("blog", "Comment")
Comment.objects.filter(is_hidden=True, is_deleted=False).update(is_hidden=False)
class Migration(migrations.Migration):
dependencies = [
("blog", "0004_blog_banner_nested_categories_post_writers"),
migrations.swappable_dependency(settings.AUTH_USER_MODEL),
]
operations = [
migrations.AddField(
model_name="comment",
name="delete_note",
field=models.TextField(blank=True),
),
migrations.AddField(
model_name="comment",
name="deleted_by",
field=models.ForeignKey(
blank=True,
null=True,
on_delete=django.db.models.deletion.SET_NULL,
related_name="deleted_blog_comments",
to=settings.AUTH_USER_MODEL,
),
),
migrations.AddField(
model_name="comment",
name="is_hidden",
field=models.BooleanField(default=False),
),
migrations.RunPython(mark_legacy_hidden_comments, unmark_legacy_hidden_comments),
migrations.RemoveIndex(
model_name="comment",
name="blog_commen_post_id_7710b1_idx",
),
migrations.AddIndex(
model_name="comment",
index=models.Index(fields=["post", "is_approved", "is_hidden"], name="blog_commen_post_id_760827_idx"),
),
migrations.AddIndex(
model_name="comment",
index=models.Index(fields=["parent", "is_deleted", "is_hidden"], name="blog_commen_parent__2abfc7_idx"),
),
]

View File

@@ -54,10 +54,22 @@ def post_asset_upload_to(instance: "PostAsset", filename: str) -> str:
return f"blog/posts/{post_part}/assets/{uuid4().hex}{suffix}"
def blog_banner_upload_to(instance: "BlogBanner", filename: str) -> str:
suffix = Path(filename).suffix.lower()
return f"blog/banners/{uuid4().hex}{suffix}"
class Category(BaseModel):
name = models.CharField(max_length=100, unique=True)
slug = models.SlugField(max_length=100, unique=True, blank=True, allow_unicode=True)
description = models.TextField(blank=True)
parent = models.ForeignKey(
"self",
on_delete=models.SET_NULL,
null=True,
blank=True,
related_name="children",
)
class Meta:
verbose_name_plural = "Categories"
@@ -71,6 +83,17 @@ class Category(BaseModel):
self.slug = _unique_slug_for(self, self.name)
super().save(*args, **kwargs)
@property
def path(self):
path = []
current = self
seen = set()
while current and current.pk not in seen:
seen.add(current.pk)
path.append(current)
current = current.parent
return list(reversed(path))
class Tag(BaseModel):
name = models.CharField(max_length=50, unique=True)
@@ -136,6 +159,11 @@ class Post(BaseModel):
blank=True,
related_name="published_blog_posts",
)
writers = models.ManyToManyField(
settings.AUTH_USER_MODEL,
blank=True,
related_name="written_blog_posts",
)
class Meta:
ordering = ["-created_at"]
@@ -177,8 +205,8 @@ class Post(BaseModel):
"markdown.extensions.toc",
],
)
word_count = len((self.content or "").split())
self.reading_time = max(1, (word_count + 199) // 200)
character_count = len(_plain_text_from_markdown(self.content or ""))
self.reading_time = max(1, (character_count + 999) // 1000)
if self.status == Post.StatusChoices.PUBLISHED and not self.published_at:
self.published_at = timezone.now()
@@ -206,6 +234,24 @@ class Post(BaseModel):
safe_process_public_image(self.og_image, "blog_featured")
class BlogBanner(BaseModel):
title = models.CharField(max_length=160, blank=True)
alt_text = models.CharField(max_length=200, blank=True)
image = models.ImageField(upload_to=blog_banner_upload_to)
url = models.URLField()
is_active = models.BooleanField(default=True)
sort_order = models.PositiveIntegerField(default=0)
class Meta:
ordering = ["sort_order", "-created_at"]
indexes = [
models.Index(fields=["is_active", "sort_order"]),
]
def __str__(self):
return self.title or self.url
class PostAsset(BaseModel):
class FileType(models.TextChoices):
IMAGE = "image", "Image"
@@ -283,6 +329,7 @@ class Comment(BaseModel):
content = models.TextField()
parent = models.ForeignKey("self", on_delete=models.CASCADE, null=True, blank=True, related_name="replies")
is_approved = models.BooleanField(default=True)
is_hidden = models.BooleanField(default=False)
hidden_by = models.ForeignKey(
settings.AUTH_USER_MODEL,
on_delete=models.SET_NULL,
@@ -292,12 +339,21 @@ class Comment(BaseModel):
)
hidden_at = models.DateTimeField(null=True, blank=True)
moderation_note = models.TextField(blank=True)
deleted_by = models.ForeignKey(
settings.AUTH_USER_MODEL,
on_delete=models.SET_NULL,
null=True,
blank=True,
related_name="deleted_blog_comments",
)
delete_note = models.TextField(blank=True)
class Meta:
ordering = ["created_at"]
indexes = [
models.Index(fields=["post", "is_approved"]),
models.Index(fields=["post", "is_approved", "is_hidden"]),
models.Index(fields=["author", "created_at"]),
models.Index(fields=["parent", "is_deleted", "is_hidden"]),
]
def __str__(self):
@@ -308,11 +364,50 @@ class Comment(BaseModel):
return self.parent is not None
def hide(self, user, note: str = ""):
self.is_approved = False
self.hidden_by = user
self.hidden_at = timezone.now()
self.moderation_note = note
self.save(update_fields=["is_approved", "hidden_by", "hidden_at", "moderation_note", "updated_at"])
now = timezone.now()
ids = [self.id, *self.descendant_ids()]
self.__class__.all_objects.filter(id__in=ids, is_deleted=False).update(
is_hidden=True,
is_approved=False,
hidden_by=user,
hidden_at=now,
moderation_note=note,
updated_at=now,
)
def unhide(self):
now = timezone.now()
ids = [self.id, *self.descendant_ids()]
self.__class__.all_objects.filter(id__in=ids, is_deleted=False).update(
is_hidden=False,
is_approved=True,
hidden_by=None,
hidden_at=None,
moderation_note="",
updated_at=now,
)
def descendant_ids(self) -> list[int]:
pending = [self.id]
descendants: list[int] = []
while pending:
child_ids = list(
self.__class__.all_objects.filter(parent_id__in=pending).values_list("id", flat=True)
)
descendants.extend(child_ids)
pending = child_ids
return descendants
def soft_delete_tree(self, user, note: str = ""):
now = timezone.now()
ids = [self.id, *self.descendant_ids()]
self.__class__.all_objects.filter(id__in=ids).update(
is_deleted=True,
deleted_at=now,
deleted_by=user,
delete_note=note,
updated_at=now,
)
class Like(models.Model):

View File

@@ -7,7 +7,7 @@ from apps.blog.models import Post, Category, Tag
class CategoryResource(resources.ModelResource):
class Meta:
model = Category
fields = ('id', 'name', 'slug', 'description', 'created_at')
fields = ('id', 'name', 'parent', 'slug', 'description', 'created_at')
class PostResource(resources.ModelResource):
author = fields.Field(
@@ -25,8 +25,13 @@ class PostResource(resources.ModelResource):
attribute='tags',
widget=ManyToManyWidget(Tag, field='name', separator='|')
)
writers = fields.Field(
column_name='writers',
attribute='writers',
widget=ManyToManyWidget(User, field='username', separator='|')
)
class Meta:
model = Post
fields = ('id', 'title', 'slug', 'content', 'excerpt', 'author',
'category', 'tags', 'status', 'is_featured', 'published_at', 'created_at')
'category', 'tags', 'writers', 'status', 'is_featured', 'published_at', 'created_at')

View File

@@ -138,6 +138,7 @@ class EventListSchema(Schema):
class EventCreateSchema(Schema):
"""Payload for creating events via the API."""
title: str
slug: Optional[str] = None
description: str
event_type: str
address: Optional[str] = None
@@ -150,11 +151,13 @@ class EventCreateSchema(Schema):
capacity: Optional[int] = None
price: Optional[float] = None
status: str = "draft"
registration_success_markdown: Optional[str] = None
gallery_image_ids: Optional[List[int]] = []
class EventUpdateSchema(Schema):
"""Payload for updating events via the API."""
title: Optional[str] = None
slug: Optional[str] = None
description: Optional[str] = None
event_type: Optional[str] = None
address: Optional[str] = None
@@ -167,6 +170,7 @@ class EventUpdateSchema(Schema):
capacity: Optional[int] = None
price: Optional[float] = None
status: Optional[str] = None
registration_success_markdown: Optional[str] = None
gallery_image_ids: Optional[List[int]] = None
class RegistrationSchema(ModelSchema):
@@ -199,12 +203,56 @@ class AdminUserSchema(Schema):
first_name: str
last_name: str
email: str
mobile: Optional[str] = None
profile_picture: Optional[str] = None
profile_picture_thumbnail_url: Optional[str] = None
profile_picture_preview_url: Optional[str] = None
university: Optional[str] = None
major: Optional[str] = None
student_id: Optional[str] = None
year_of_study: Optional[int] = None
@staticmethod
def resolve_profile_picture(obj, context):
image = getattr(obj, "profile_picture", None)
if not getattr(image, "name", None):
return None
request = context["request"]
return request.build_absolute_uri(image.url) if hasattr(image, "url") else None
@staticmethod
def resolve_profile_picture_thumbnail_url(obj, context):
image = getattr(obj, "profile_picture", None)
if not getattr(image, "name", None):
return None
request = context["request"]
url = derivative_url(image, THUMBNAIL_VARIANT)
return request.build_absolute_uri(url) if url else None
@staticmethod
def resolve_profile_picture_preview_url(obj, context):
image = getattr(obj, "profile_picture", None)
if not getattr(image, "name", None):
return None
request = context["request"]
url = derivative_url(image, PREVIEW_VARIANT)
return request.build_absolute_uri(url) if url else None
@staticmethod
def resolve_university(obj):
return obj.get_university_display()
@staticmethod
def resolve_major(obj):
return obj.get_major_display()
class PaymentAdminSchema(Schema):
id: int
authority: Optional[str]
ref_id: Optional[str]
card_pan: Optional[str]
card_hash: Optional[str]
status: int
status_label: str
base_amount: int
@@ -241,7 +289,7 @@ class EventAdminDetailSchema(EventSchema):
@staticmethod
def resolve_registrations(obj):
return obj.registrations.select_related("user").prefetch_related(
return obj.registrations.select_related("user", "user__university", "user__major").prefetch_related(
"payments__discount_code"
).order_by("-registered_at")

View File

@@ -1,18 +1,20 @@
from django.conf import settings
from django.core.files.base import ContentFile
from django.shortcuts import get_object_or_404
from django.db.models import Q, Case, When, IntegerField
from django.utils.text import slugify
from django.utils import timezone
from ninja import Router, Query
from ninja import File, Router, Query, UploadedFile
from ninja.errors import HttpError
from typing import List, Optional
from uuid import UUID
from uuid import UUID, uuid4
from apps.events.api.schemas import (
EventAdminDetailSchema,
EventBriefSchema,
EventCreateSchema,
EventGallerySchema,
EventListSchema,
EventSchema,
EventUpdateSchema,
@@ -26,6 +28,8 @@ from apps.events.api.schemas import (
)
from core.authentication import jwt_auth
from apps.events.models import Event, Registration
from apps.gallery.models import Gallery
from apps.gallery.tasks import process_uploaded_image
from apps.notifications.services import notify_user
from apps.payments.models import DiscountCode
from apps.users.tasks import send_critical_sms
@@ -34,6 +38,28 @@ from core.api.schemas import ErrorSchema, MessageSchema
events_router = Router()
def _is_staff_user(user) -> bool:
return bool(user and (user.is_staff or user.is_superuser))
def _staff_forbidden():
return 403, {"error": "اجازه دسترسی ندارید."}
def _save_uploaded_image(instance, field_name: str, file: UploadedFile, folder: str):
if not file.content_type or not file.content_type.startswith("image/"):
return False, {"error": "فایل باید تصویر باشد."}
if file.size > 10 * 1024 * 1024:
return False, {"error": "حجم فایل باید کمتر از ۱۰ مگابایت باشد."}
extension = file.name.rsplit(".", 1)[-1] if "." in file.name else "jpg"
getattr(instance, field_name).save(
f"{folder}/{uuid4().hex}.{extension}",
ContentFile(file.read()),
save=True,
)
return True, instance
def _frontend_event_url(event: Event) -> str:
root = getattr(settings, "FRONTEND_ROOT", "/") or "/"
if not root.endswith("/"):
@@ -130,20 +156,32 @@ def get_event_by_slug(request, slug: str):
)
return event
@events_router.post("/", response=EventSchema)
@events_router.post("/", response={201: EventSchema, 403: ErrorSchema, 400: ErrorSchema}, auth=jwt_auth)
def create_event(request, payload: EventCreateSchema):
"""Create a new event"""
gallery_image_ids = payload.dict().pop('gallery_image_ids', [])
event = Event.objects.create(**payload.dict(exclude={'gallery_image_ids'}))
if not _is_staff_user(request.auth):
return _staff_forbidden()
data = payload.dict(exclude={'gallery_image_ids'})
gallery_image_ids = payload.gallery_image_ids or []
if data.get("slug"):
data["slug"] = slugify(data["slug"])
event = Event(**data)
try:
event.full_clean()
event.save()
except Exception as exc:
return 400, {"error": str(exc)}
if gallery_image_ids:
event.gallery_images.set(gallery_image_ids)
return event
return 201, event
@events_router.put("/{int:event_id}", response=EventSchema)
@events_router.put("/{int:event_id}", response={200: EventSchema, 403: ErrorSchema, 400: ErrorSchema}, auth=jwt_auth)
def update_event(request, event_id: int, payload: EventUpdateSchema):
"""Update an existing event"""
if not _is_staff_user(request.auth):
return _staff_forbidden()
event = get_object_or_404(Event, id=event_id, is_deleted=False)
previous_state = {
"status": event.status,
@@ -158,12 +196,18 @@ def update_event(request, event_id: int, payload: EventUpdateSchema):
gallery_image_ids = update_data.pop('gallery_image_ids', None)
for attr, value in update_data.items():
if attr == "slug" and value:
value = slugify(value)
setattr(event, attr, value)
if 'title' in update_data:
if 'title' in update_data and not update_data.get("slug"):
event.slug = slugify(event.title)
event.save()
try:
event.full_clean()
event.save()
except Exception as exc:
return 400, {"error": str(exc)}
if gallery_image_ids is not None:
event.gallery_images.set(gallery_image_ids)
@@ -196,14 +240,94 @@ def update_event(request, event_id: int, payload: EventUpdateSchema):
sms_kind="event_reschedule",
)
return event
return 200, event
@events_router.delete("/{int:event_id}", response=MessageSchema)
@events_router.delete("/{int:event_id}", response={200: MessageSchema, 403: ErrorSchema}, auth=jwt_auth)
def delete_event(request, event_id: int):
"""Soft delete an event"""
if not _is_staff_user(request.auth):
return _staff_forbidden()
event = get_object_or_404(Event, id=event_id, is_deleted=False)
event.delete()
return {"message": "Event deleted successfully"}
return 200, {"message": "Event deleted successfully"}
@events_router.post("/{int:event_id}/featured-image", response={200: EventSchema, 403: ErrorSchema, 400: ErrorSchema}, auth=jwt_auth)
def upload_event_featured_image(request, event_id: int, file: UploadedFile = File(...)):
"""Upload or replace the poster/featured image for an event."""
if not _is_staff_user(request.auth):
return _staff_forbidden()
event = get_object_or_404(Event, id=event_id, is_deleted=False)
ok, result = _save_uploaded_image(event, "featured_image", file, "events/featured")
if not ok:
return 400, result
return 200, event
@events_router.delete("/{int:event_id}/featured-image", response={200: EventSchema, 403: ErrorSchema}, auth=jwt_auth)
def delete_event_featured_image(request, event_id: int):
"""Remove the poster/featured image for an event."""
if not _is_staff_user(request.auth):
return _staff_forbidden()
event = get_object_or_404(Event, id=event_id, is_deleted=False)
if event.featured_image:
event.featured_image.delete(save=False)
event.featured_image = None
event.save(update_fields=["featured_image", "updated_at"])
return 200, event
@events_router.get("/{int:event_id}/gallery", response={200: List[EventGallerySchema], 403: ErrorSchema}, auth=jwt_auth)
def list_event_gallery(request, event_id: int):
if not _is_staff_user(request.auth):
return _staff_forbidden()
event = get_object_or_404(Event, id=event_id, is_deleted=False)
return 200, event.gallery_images.filter(is_deleted=False).select_related("uploaded_by")
@events_router.post("/{int:event_id}/gallery", response={201: EventGallerySchema, 403: ErrorSchema, 400: ErrorSchema}, auth=jwt_auth)
def upload_event_gallery_image(
request,
event_id: int,
file: UploadedFile = File(...),
title: str | None = None,
alt_text: str | None = None,
):
if not _is_staff_user(request.auth):
return _staff_forbidden()
event = get_object_or_404(Event, id=event_id, is_deleted=False)
if not file.content_type or not file.content_type.startswith("image/"):
return 400, {"error": "فایل باید تصویر باشد."}
if file.size > 10 * 1024 * 1024:
return 400, {"error": "حجم فایل باید کمتر از ۱۰ مگابایت باشد."}
try:
gallery_item = Gallery.objects.create(
title=title or file.name,
description="",
uploaded_by=request.auth,
alt_text=alt_text or title or file.name,
is_public=True,
)
gallery_item._defer_image_processing = True
extension = file.name.rsplit(".", 1)[-1] if "." in file.name else "jpg"
gallery_item.image.save(f"gallery/{uuid4().hex}.{extension}", ContentFile(file.read()))
event.gallery_images.add(gallery_item)
process_uploaded_image.delay(gallery_item.id)
except Exception as exc:
return 400, {"error": str(exc)}
return 201, gallery_item
@events_router.delete("/{int:event_id}/gallery/{int:image_id}", response={200: MessageSchema, 403: ErrorSchema}, auth=jwt_auth)
def delete_event_gallery_image(request, event_id: int, image_id: int):
if not _is_staff_user(request.auth):
return _staff_forbidden()
event = get_object_or_404(Event, id=event_id, is_deleted=False)
image = get_object_or_404(Gallery, id=image_id, is_deleted=False)
event.gallery_images.remove(image)
if not image.event_galleries.exclude(id=event.id).exists():
image.delete()
return 200, {"message": "Gallery image removed"}
# Registration endpoints
@events_router.get("/{int:event_id}/registrations", response=List[RegistrationSchema])
@@ -235,7 +359,7 @@ def list_event_registrations_admin(
event = get_object_or_404(Event, id=event_id, is_deleted=False)
qs = (
event.registrations.filter(is_deleted=False)
.select_related("user")
.select_related("user", "user__university", "user__major")
.prefetch_related("payments__discount_code")
.order_by("-registered_at")
)
@@ -259,6 +383,7 @@ def list_event_registrations_admin(
if search:
qs = qs.filter(
Q(user__username__icontains=search)
| Q(user__mobile__icontains=search)
| Q(user__email__icontains=search)
| Q(user__first_name__icontains=search)
| Q(user__last_name__icontains=search)

View File

@@ -37,6 +37,7 @@ class EventsAPIIntegrationTests(TestCase):
cls.user = User.objects.create_user(
username="event_user",
email="event.user@example.com",
mobile="09198000001",
password=cls.password,
)
cls.user.is_email_verified = True
@@ -45,6 +46,7 @@ class EventsAPIIntegrationTests(TestCase):
cls.staff = User.objects.create_user(
username="event_staff",
email="event.staff@example.com",
mobile="09198000002",
password=cls.password,
is_staff=True,
)
@@ -151,19 +153,21 @@ class EventsAPIIntegrationTests(TestCase):
"/api/events/",
data=json.dumps(payload),
content_type="application/json",
**self._auth_headers(self.staff_token),
)
self.assertEqual(created.status_code, 200)
self.assertEqual(created.status_code, 201)
event_id = created.json()["id"]
updated = self.client.put(
f"/api/events/{event_id}",
data=json.dumps({"title": "Updated Event"}),
content_type="application/json",
**self._auth_headers(self.staff_token),
)
self.assertEqual(updated.status_code, 200)
self.assertEqual(updated.json()["title"], "Updated Event")
deleted = self.client.delete(f"/api/events/{event_id}")
deleted = self.client.delete(f"/api/events/{event_id}", **self._auth_headers(self.staff_token))
self.assertEqual(deleted.status_code, 200)
def test_admin_detail_and_registration_list_requires_staff(self):
@@ -230,9 +234,10 @@ class EventsAPIIntegrationTests(TestCase):
"/api/events/",
data=json.dumps(payload),
content_type="application/json",
**self._auth_headers(self.staff_token),
)
body = response.json()
self.assertEqual(response.status_code, 200)
self.assertEqual(response.status_code, 201)
self.assertTrue(body["gallery_images"])
updated = self.client.put(
@@ -244,6 +249,7 @@ class EventsAPIIntegrationTests(TestCase):
}
),
content_type="application/json",
**self._auth_headers(self.staff_token),
)
self.assertEqual(updated.status_code, 200)
self.assertEqual(updated.json()["slug"], "gallery-event-updated")
@@ -370,7 +376,8 @@ class EventsAPIIntegrationTests(TestCase):
self.assertEqual(response.status_code, 400)
def _create_event_user(self, username, email):
user = User.objects.create_user(username=username, email=email, password=self.password)
suffix = str(abs(hash(username)) % 1_000_000).zfill(6)
user = User.objects.create_user(username=username, email=email, mobile=f"09190{suffix}", password=self.password)
user.is_email_verified = True
user.save(update_fields=["is_email_verified"])
user.major = self.user.major
@@ -468,6 +475,7 @@ class EventSchemasIntegrationTests(TestCase):
self.user = User.objects.create_user(
username="schema_user",
email="schema.user@example.com",
mobile="09198000003",
password=self.password,
)
self.user.is_email_verified = True

View File

@@ -1,4 +1,42 @@
from ninja import Schema
from datetime import datetime
class DiscountCodeSchema(Schema):
id: int
code: str
type: str
value: int
max_discount: int | None = None
is_active: bool
starts_at: datetime | None = None
ends_at: datetime | None = None
usage_limit_total: int | None = None
usage_limit_per_user: int | None = None
min_amount: int | None = None
applicable_event_ids: list[int]
usage_count: int = 0
created_at: datetime
updated_at: datetime
class PagedDiscountCodeSchema(Schema):
count: int
results: list[DiscountCodeSchema]
class DiscountCodeWriteSchema(Schema):
code: str
type: str = "percent"
value: int
max_discount: int | None = None
is_active: bool = True
starts_at: datetime | None = None
ends_at: datetime | None = None
usage_limit_total: int | None = None
usage_limit_per_user: int | None = None
min_amount: int | None = None
applicable_event_ids: list[int] = []
class CreatePaymentIn(Schema):

View File

@@ -1,8 +1,9 @@
from django.conf import settings
from django.shortcuts import redirect, get_object_or_404
from django.utils import timezone
from django.db.models import Count, Q
from ninja import Router
from ninja import Query, Router
from ninja.errors import HttpError
import requests
@@ -11,11 +12,140 @@ from apps.events.models import Event, Registration
from apps.notifications.services import notify_user
from apps.users.tasks import send_critical_sms
from core.authentication import jwt_auth
from apps.payments.api.schemas import CouponVerifyIn, CouponVerifyOut, CreatePaymentIn, CreatePaymentOut, PaymentDetailOut
from core.api.schemas import ErrorSchema, MessageSchema
from apps.payments.api.schemas import (
CouponVerifyIn,
CouponVerifyOut,
CreatePaymentIn,
CreatePaymentOut,
DiscountCodeSchema,
DiscountCodeWriteSchema,
PagedDiscountCodeSchema,
PaymentDetailOut,
)
payments_router = Router(tags=["Payments"])
def _staff_required(user):
return bool(user and (user.is_staff or user.is_superuser))
def _discount_payload(code: DiscountCode):
return {
"id": code.id,
"code": code.code,
"type": code.type,
"value": code.value,
"max_discount": code.max_discount,
"is_active": code.is_active,
"starts_at": code.starts_at,
"ends_at": code.ends_at,
"usage_limit_total": code.usage_limit_total,
"usage_limit_per_user": code.usage_limit_per_user,
"min_amount": code.min_amount,
"applicable_event_ids": list(code.applicable_events.values_list("id", flat=True)),
"usage_count": getattr(code, "usage_count", None) or code.payments.filter(
status__in=[Payment.OrderStatusChoices.PAID, Payment.OrderStatusChoices.PENDING]
).count(),
"created_at": code.created_at,
"updated_at": code.updated_at,
}
def _apply_discount_payload(instance: DiscountCode, payload: DiscountCodeWriteSchema):
data = payload.dict()
event_ids = data.pop("applicable_event_ids", [])
for field, value in data.items():
setattr(instance, field, value)
instance.code = instance.code.strip().upper()
instance.full_clean()
instance.save()
instance.applicable_events.set(Event.objects.filter(id__in=event_ids, is_deleted=False))
return instance
@payments_router.get("/admin/discount-codes", response={200: PagedDiscountCodeSchema, 403: ErrorSchema}, auth=jwt_auth)
def admin_list_discount_codes(
request,
search: str | None = Query(None),
is_active: bool | None = Query(None),
type: str | None = Query(None),
limit: int = Query(20, ge=1, le=100),
offset: int = Query(0, ge=0),
):
if not _staff_required(request.auth):
return 403, {"error": "Permission denied"}
queryset = DiscountCode.objects.annotate(
usage_count=Count(
"payments",
filter=Q(
payments__status__in=[
Payment.OrderStatusChoices.PAID,
Payment.OrderStatusChoices.PENDING,
]
),
)
).prefetch_related("applicable_events").order_by("-created_at")
if search:
queryset = queryset.filter(code__icontains=search)
if is_active is not None:
queryset = queryset.filter(is_active=is_active)
if type:
queryset = queryset.filter(type=type)
count = queryset.count()
return 200, {"count": count, "results": [_discount_payload(item) for item in queryset[offset : offset + limit]]}
@payments_router.post("/admin/discount-codes", response={201: DiscountCodeSchema, 403: ErrorSchema, 400: ErrorSchema}, auth=jwt_auth)
def admin_create_discount_code(request, payload: DiscountCodeWriteSchema):
if not _staff_required(request.auth):
return 403, {"error": "Permission denied"}
if DiscountCode.all_objects.filter(code=payload.code.strip().upper()).exists():
return 400, {"error": "Discount code already exists"}
try:
code = _apply_discount_payload(DiscountCode(), payload)
except Exception as exc:
return 400, {"error": str(exc)}
return 201, _discount_payload(code)
@payments_router.put("/admin/discount-codes/{int:code_id}", response={200: DiscountCodeSchema, 403: ErrorSchema, 400: ErrorSchema}, auth=jwt_auth)
def admin_update_discount_code(request, code_id: int, payload: DiscountCodeWriteSchema):
if not _staff_required(request.auth):
return 403, {"error": "Permission denied"}
code = get_object_or_404(DiscountCode, id=code_id, is_deleted=False)
normalized = payload.code.strip().upper()
if DiscountCode.all_objects.filter(code=normalized).exclude(id=code_id).exists():
return 400, {"error": "Discount code already exists"}
try:
code = _apply_discount_payload(code, payload)
except Exception as exc:
return 400, {"error": str(exc)}
return 200, _discount_payload(code)
@payments_router.delete("/admin/discount-codes/{int:code_id}", response={200: MessageSchema, 403: ErrorSchema, 400: ErrorSchema}, auth=jwt_auth)
def admin_delete_discount_code(request, code_id: int):
if not request.auth.is_superuser:
return 403, {"error": "Only superusers can delete discount codes"}
code = get_object_or_404(DiscountCode, id=code_id, is_deleted=False)
code.delete()
return 200, {"message": "Discount code deleted"}
@payments_router.post("/admin/discount-codes/{int:code_id}/restore", response={200: MessageSchema, 403: ErrorSchema, 400: ErrorSchema}, auth=jwt_auth)
def admin_restore_discount_code(request, code_id: int):
if not request.auth.is_superuser:
return 403, {"error": "Only superusers can restore discount codes"}
try:
code = DiscountCode.deleted_objects.get(id=code_id)
except DiscountCode.DoesNotExist:
return 400, {"error": "Discount code not found"}
code.restore()
return 200, {"message": "Discount code restored"}
def _event_action_url(event: Event) -> str:
root = getattr(settings, "FRONTEND_ROOT", "/") or "/"
if not root.endswith("/"):

View File

@@ -14,6 +14,7 @@ from core.admin import SoftDeleteListFilter, BaseModelAdmin
class UserAdminForm(forms.ModelForm):
mobile = forms.CharField(required=True)
bio = forms.CharField(widget=SimpleMDEEditor(), required=False)
student_id = forms.CharField(required=False)
@@ -25,13 +26,13 @@ class UserAdminForm(forms.ModelForm):
class UserAdmin(BaseUserAdmin, BaseModelAdmin, ImportExportModelAdmin):
form = UserAdminForm
resource_class = UserResource
list_display = ('email', 'username', 'university', 'is_email_verified', 'date_joined')
list_filter = ('is_email_verified', 'is_staff', 'year_of_study', SoftDeleteListFilter)
search_fields = ('email', 'username', 'student_id', 'first_name', 'last_name')
list_display = ('email', 'mobile', 'username', 'university', 'is_email_verified', 'is_mobile_verified', 'date_joined')
list_filter = ('is_email_verified', 'is_mobile_verified', 'is_staff', 'year_of_study', SoftDeleteListFilter)
search_fields = ('email', 'mobile', 'username', 'student_id', 'first_name', 'last_name')
ordering = ('-date_joined',)
fieldsets = (
('Auth Credentials', {'fields': ('username', 'email', 'password')}),
('Auth Credentials', {'fields': ('username', 'email', 'mobile', 'password')}),
('Personal info', {
'fields': ('first_name', 'last_name', 'student_id', 'university', 'year_of_study', 'major', 'bio', 'profile_picture')
}),
@@ -43,6 +44,9 @@ class UserAdmin(BaseUserAdmin, BaseModelAdmin, ImportExportModelAdmin):
('Email Verification', {
'fields': ('is_email_verified', 'email_verification_token', 'email_verification_sent_at')
}),
('Mobile Verification', {
'fields': ('is_mobile_verified',)
}),
('Password Reset', {
'fields': ('password_reset_token', 'password_reset_token_expires_at'),
'classes': ('collapse',)
@@ -57,7 +61,7 @@ class UserAdmin(BaseUserAdmin, BaseModelAdmin, ImportExportModelAdmin):
'Step 1',
{
'classes': ('wide',),
'fields': ('email', 'student_id', 'password1', 'password2', 'usable_password'),
'fields': ('email', 'mobile', 'student_id', 'password1', 'password2', 'usable_password'),
},
),
)

View File

@@ -1,15 +1,209 @@
from ninja import Router
from django.db.models import Q
from ninja import Query, Router, Schema
from apps.users.models import Major, University
from core.api.schemas import ErrorSchema, MessageSchema
from core.authentication import jwt_auth
meta_router = Router(tags=['meta'])
@meta_router.get("/majors")
def list_majors(request):
majors = Major.objects.filter(is_deleted=False, is_active=True).order_by("name")
return [{"id": m.id, "code": m.code, "label": m.name} for m in majors]
@meta_router.get("/universities")
def list_universities(request):
universities = University.objects.filter(is_deleted=False, is_active=True).order_by("name")
return [{"id": u.id, "code": u.code, "label": u.name} for u in universities]
class MetaOptionSchema(Schema):
id: int
code: str
label: str
is_active: bool = True
user_count: int = 0
class PagedMetaOptionSchema(Schema):
count: int
results: list[MetaOptionSchema]
class MetaOptionWriteSchema(Schema):
code: str
name: str
is_active: bool = True
def _is_staff(user):
return bool(user and (user.is_staff or user.is_superuser))
def _option_payload(obj, user_count=0):
return {
"id": obj.id,
"code": obj.code,
"label": obj.name,
"is_active": obj.is_active,
"user_count": user_count,
}
def _list_options(model, search, limit, offset, active_only=True):
queryset = model.objects.filter(is_deleted=False).order_by("name")
if active_only:
queryset = queryset.filter(is_active=True)
if search:
queryset = queryset.filter(Q(code__icontains=search) | Q(name__icontains=search))
count = queryset.count()
return count, list(queryset[offset : offset + limit])
@meta_router.get("/majors", response=PagedMetaOptionSchema)
def list_majors(
request,
search: str | None = Query(None),
limit: int = Query(20, ge=1, le=100),
offset: int = Query(0, ge=0),
):
count, majors = _list_options(Major, search, limit, offset)
return {"count": count, "results": [_option_payload(m) for m in majors]}
@meta_router.get("/universities", response=PagedMetaOptionSchema)
def list_universities(
request,
search: str | None = Query(None),
limit: int = Query(20, ge=1, le=100),
offset: int = Query(0, ge=0),
):
count, universities = _list_options(University, search, limit, offset)
return {"count": count, "results": [_option_payload(u) for u in universities]}
@meta_router.get("/admin/majors", response={200: PagedMetaOptionSchema, 403: ErrorSchema}, auth=jwt_auth)
def admin_list_majors(
request,
search: str | None = Query(None),
limit: int = Query(20, ge=1, le=100),
offset: int = Query(0, ge=0),
):
if not _is_staff(request.auth):
return 403, {"error": "Permission denied"}
count, majors = _list_options(Major, search, limit, offset, active_only=False)
return 200, {
"count": count,
"results": [_option_payload(m, m.users.filter(is_deleted=False).count()) for m in majors],
}
@meta_router.post("/admin/majors", response={201: MetaOptionSchema, 403: ErrorSchema, 400: ErrorSchema}, auth=jwt_auth)
def admin_create_major(request, payload: MetaOptionWriteSchema):
if not _is_staff(request.auth):
return 403, {"error": "Permission denied"}
if Major.all_objects.filter(code=payload.code).exists():
return 400, {"error": "Major code already exists"}
major = Major.objects.create(code=payload.code.strip(), name=payload.name.strip(), is_active=payload.is_active)
return 201, _option_payload(major)
@meta_router.put("/admin/majors/{int:item_id}", response={200: MetaOptionSchema, 403: ErrorSchema, 400: ErrorSchema}, auth=jwt_auth)
def admin_update_major(request, item_id: int, payload: MetaOptionWriteSchema):
if not _is_staff(request.auth):
return 403, {"error": "Permission denied"}
try:
major = Major.objects.get(id=item_id)
except Major.DoesNotExist:
return 400, {"error": "Major not found"}
conflict = Major.all_objects.filter(code=payload.code).exclude(id=item_id).exists()
if conflict:
return 400, {"error": "Major code already exists"}
major.code = payload.code.strip()
major.name = payload.name.strip()
major.is_active = payload.is_active
major.save(update_fields=["code", "name", "is_active", "updated_at"])
return 200, _option_payload(major, major.users.filter(is_deleted=False).count())
@meta_router.delete("/admin/majors/{int:item_id}", response={200: MessageSchema, 403: ErrorSchema, 400: ErrorSchema}, auth=jwt_auth)
def admin_delete_major(request, item_id: int):
if not request.auth.is_superuser:
return 403, {"error": "Only superusers can delete majors"}
try:
major = Major.objects.get(id=item_id)
except Major.DoesNotExist:
return 400, {"error": "Major not found"}
major.delete()
return 200, {"message": "Major deleted"}
@meta_router.post("/admin/majors/{int:item_id}/restore", response={200: MessageSchema, 403: ErrorSchema, 400: ErrorSchema}, auth=jwt_auth)
def admin_restore_major(request, item_id: int):
if not request.auth.is_superuser:
return 403, {"error": "Only superusers can restore majors"}
try:
major = Major.deleted_objects.get(id=item_id)
except Major.DoesNotExist:
return 400, {"error": "Major not found"}
major.restore()
return 200, {"message": "Major restored"}
@meta_router.get("/admin/universities", response={200: PagedMetaOptionSchema, 403: ErrorSchema}, auth=jwt_auth)
def admin_list_universities(
request,
search: str | None = Query(None),
limit: int = Query(20, ge=1, le=100),
offset: int = Query(0, ge=0),
):
if not _is_staff(request.auth):
return 403, {"error": "Permission denied"}
count, universities = _list_options(University, search, limit, offset, active_only=False)
return 200, {
"count": count,
"results": [_option_payload(u, u.users.filter(is_deleted=False).count()) for u in universities],
}
@meta_router.post("/admin/universities", response={201: MetaOptionSchema, 403: ErrorSchema, 400: ErrorSchema}, auth=jwt_auth)
def admin_create_university(request, payload: MetaOptionWriteSchema):
if not _is_staff(request.auth):
return 403, {"error": "Permission denied"}
if University.all_objects.filter(code=payload.code).exists():
return 400, {"error": "University code already exists"}
university = University.objects.create(code=payload.code.strip(), name=payload.name.strip(), is_active=payload.is_active)
return 201, _option_payload(university)
@meta_router.put("/admin/universities/{int:item_id}", response={200: MetaOptionSchema, 403: ErrorSchema, 400: ErrorSchema}, auth=jwt_auth)
def admin_update_university(request, item_id: int, payload: MetaOptionWriteSchema):
if not _is_staff(request.auth):
return 403, {"error": "Permission denied"}
try:
university = University.objects.get(id=item_id)
except University.DoesNotExist:
return 400, {"error": "University not found"}
conflict = University.all_objects.filter(code=payload.code).exclude(id=item_id).exists()
if conflict:
return 400, {"error": "University code already exists"}
university.code = payload.code.strip()
university.name = payload.name.strip()
university.is_active = payload.is_active
university.save(update_fields=["code", "name", "is_active", "updated_at"])
return 200, _option_payload(university, university.users.filter(is_deleted=False).count())
@meta_router.delete("/admin/universities/{int:item_id}", response={200: MessageSchema, 403: ErrorSchema, 400: ErrorSchema}, auth=jwt_auth)
def admin_delete_university(request, item_id: int):
if not request.auth.is_superuser:
return 403, {"error": "Only superusers can delete universities"}
try:
university = University.objects.get(id=item_id)
except University.DoesNotExist:
return 400, {"error": "University not found"}
university.delete()
return 200, {"message": "University deleted"}
@meta_router.post("/admin/universities/{int:item_id}/restore", response={200: MessageSchema, 403: ErrorSchema, 400: ErrorSchema}, auth=jwt_auth)
def admin_restore_university(request, item_id: int):
if not request.auth.is_superuser:
return 403, {"error": "Only superusers can restore universities"}
try:
university = University.deleted_objects.get(id=item_id)
except University.DoesNotExist:
return 400, {"error": "University not found"}
university.restore()
return 200, {"message": "University restored"}

View File

@@ -1,7 +1,7 @@
"""Authentication-related API schemas."""
from datetime import datetime
from typing import Optional
from typing import List, Optional
from ninja import ModelSchema, Schema
@@ -178,6 +178,19 @@ class UserListSchema(ModelSchema):
major: Optional[str] = None
university: Optional[str] = None
mobile: Optional[str] = None
profile_picture: Optional[str] = None
profile_picture_thumbnail_url: Optional[str] = None
profile_picture_preview_url: Optional[str] = None
student_id: Optional[str] = None
year_of_study: Optional[int] = None
bio: Optional[str] = None
is_email_verified: bool
is_mobile_verified: bool
is_deleted: bool
deleted_at: Optional[datetime] = None
can_access_blog_admin: bool
can_write_blog_posts: bool
can_review_blog_posts: bool
class Meta:
model = User
@@ -188,13 +201,19 @@ class UserListSchema(ModelSchema):
"mobile",
"first_name",
"last_name",
"student_id",
"year_of_study",
"bio",
"is_active",
"is_staff",
"is_superuser",
"date_joined",
"major",
"university",
"is_email_verified",
"is_mobile_verified",
"is_deleted",
"deleted_at",
]
@staticmethod
@@ -205,6 +224,64 @@ class UserListSchema(ModelSchema):
def resolve_university(obj):
return obj.get_university_display()
@staticmethod
def resolve_can_access_blog_admin(obj):
return can_access_blog_admin(obj)
@staticmethod
def resolve_can_write_blog_posts(obj):
return can_write_blog_posts(obj)
@staticmethod
def resolve_can_review_blog_posts(obj):
return can_review_blog_posts(obj)
@staticmethod
def resolve_profile_picture(obj, context):
request = context["request"]
if obj.profile_picture and hasattr(obj.profile_picture, "url"):
return request.build_absolute_uri(obj.profile_picture.url)
return None
@staticmethod
def resolve_profile_picture_thumbnail_url(obj, context):
request = context["request"]
url = derivative_url(obj.profile_picture, THUMBNAIL_VARIANT)
return request.build_absolute_uri(url) if url else None
@staticmethod
def resolve_profile_picture_preview_url(obj, context):
request = context["request"]
url = derivative_url(obj.profile_picture, PREVIEW_VARIANT)
return request.build_absolute_uri(url) if url else None
class AuthorizationRoleSchema(Schema):
key: str
label: str
description: str
enabled: bool = False
locked: bool = False
class UserAuthorizationSchema(Schema):
id: int
username: str
email: Optional[str] = None
mobile: Optional[str] = None
first_name: str
last_name: str
is_active: bool
is_staff: bool
is_superuser: bool
groups: List[str]
roles: List[AuthorizationRoleSchema]
class UserAuthorizationUpdateSchema(Schema):
is_staff: bool = False
groups: List[str] = []
class UserUpdateSchema(Schema):
email: Optional[str] = None

View File

@@ -4,6 +4,7 @@ import jwt
import uuid
from django.conf import settings
from django.contrib.auth.models import Group
from django.core.files.base import ContentFile
from django.db.models import Q
from django.http import HttpResponseRedirect
@@ -11,6 +12,7 @@ from django.shortcuts import get_object_or_404
from ninja import Query, Router
from apps.users.api.schemas import (
AuthorizationRoleSchema,
GoogleClaimVerifySchema,
GoogleCompleteSchema,
GoogleFlowResponseSchema,
@@ -25,6 +27,8 @@ from apps.users.api.schemas import (
TokenRefreshIn,
TokenSchema,
UserListSchema,
UserAuthorizationSchema,
UserAuthorizationUpdateSchema,
UserLoginSchema,
UserOtpLoginSchema,
UserProfileSchema,
@@ -32,6 +36,7 @@ from apps.users.api.schemas import (
UserUpdateSchema,
UsernameCheckSchema,
)
from apps.blog.permissions import ASSOCIATION_ADMIN_GROUP, BLOG_EDITOR_GROUP, BLOG_SUPERVISOR_GROUP
from apps.users.email_identity import normalize_email_identity
from apps.users.models import Major, University, User
from apps.users.services.auth import (
@@ -70,11 +75,93 @@ from core.media import delete_image_derivatives
auth_router = Router()
CURATED_ROLE_GROUPS = {
BLOG_EDITOR_GROUP,
BLOG_SUPERVISOR_GROUP,
ASSOCIATION_ADMIN_GROUP,
}
ROLE_SPECS = [
{
"key": BLOG_EDITOR_GROUP,
"label": "ویرایشگر بلاگ",
"description": "امکان نوشتن و مدیریت نوشته‌های خودش در بلاگ.",
"group": BLOG_EDITOR_GROUP,
},
{
"key": BLOG_SUPERVISOR_GROUP,
"label": "سرپرست بلاگ",
"description": "امکان بررسی، انتشار، مدیریت دسته‌ها/برچسب‌ها و نظارت کامنت‌ها.",
"group": BLOG_SUPERVISOR_GROUP,
},
{
"key": ASSOCIATION_ADMIN_GROUP,
"label": "ادمین انجمن",
"description": "نقش سازمانی انجمن برای دسترسی‌های مدیریتی منتخب.",
"group": ASSOCIATION_ADMIN_GROUP,
},
{
"key": "staff_admin",
"label": "دسترسی پنل مدیریت",
"description": "فعال‌سازی is_staff برای ورود به بخش‌های مدیریتی عمومی.",
"field": "is_staff",
},
{
"key": "is_superuser",
"label": "سوپریوزر",
"description": "دسترسی کامل Django؛ از این صفحه قابل تغییر نیست.",
"field": "is_superuser",
"locked": True,
},
]
def _error_response(exc: AuthServiceError | GoogleOAuthFlowError):
return exc.status_code, {"error": exc.message}
def _ensure_superuser(user):
return bool(user and user.is_superuser)
def _role_payload(user: User) -> list[dict]:
user_groups = set(user.groups.values_list("name", flat=True))
roles = []
for spec in ROLE_SPECS:
key = spec["key"]
enabled = False
if spec.get("group"):
enabled = spec["group"] in user_groups
elif spec.get("field"):
enabled = bool(getattr(user, spec["field"]))
roles.append(
{
"key": key,
"label": spec["label"],
"description": spec["description"],
"enabled": enabled,
"locked": bool(spec.get("locked", False)),
}
)
return roles
def _authorization_payload(user: User) -> dict:
return {
"id": user.id,
"username": user.username,
"email": user.email,
"mobile": user.mobile,
"first_name": user.first_name,
"last_name": user.last_name,
"is_active": user.is_active,
"is_staff": user.is_staff,
"is_superuser": user.is_superuser,
"groups": list(user.groups.values_list("name", flat=True)),
"roles": _role_payload(user),
}
def _get_major_from_code(code: str | None):
if not code:
return None
@@ -446,6 +533,64 @@ def list_users(
return queryset[offset : offset + limit]
@auth_router.get("/users/{user_id}", response={200: UserProfileSchema, 403: ErrorSchema, 404: ErrorSchema}, auth=jwt_auth)
def get_user_detail(request, user_id: int):
user = request.auth
if not (user.is_staff or user.is_superuser):
return 403, {"error": "اجازه دسترسی ندارید."}
target = get_object_or_404(User, id=user_id)
return 200, target
@auth_router.get("/roles", response={200: list[AuthorizationRoleSchema], 403: ErrorSchema}, auth=jwt_auth)
def list_authorization_roles(request):
if not _ensure_superuser(request.auth):
return 403, {"error": "اجازه دسترسی ندارید."}
return 200, [
{
"key": spec["key"],
"label": spec["label"],
"description": spec["description"],
"enabled": False,
"locked": bool(spec.get("locked", False)),
}
for spec in ROLE_SPECS
]
@auth_router.get("/users/{user_id}/authorization", response={200: UserAuthorizationSchema, 403: ErrorSchema}, auth=jwt_auth)
def get_user_authorization(request, user_id: int):
if not _ensure_superuser(request.auth):
return 403, {"error": "اجازه دسترسی ندارید."}
user = get_object_or_404(User, id=user_id)
return 200, _authorization_payload(user)
@auth_router.put("/users/{user_id}/authorization", response={200: UserAuthorizationSchema, 403: ErrorSchema, 400: ErrorSchema}, auth=jwt_auth)
def update_user_authorization(request, user_id: int, data: UserAuthorizationUpdateSchema):
if not _ensure_superuser(request.auth):
return 403, {"error": "اجازه دسترسی ندارید."}
user = get_object_or_404(User, id=user_id)
if user.id == request.auth.id:
return 400, {"error": "برای جلوگیری از قفل شدن دسترسی، نمی‌توانید نقش‌های خودتان را از این صفحه تغییر دهید."}
requested_groups = set(data.groups or [])
invalid_groups = requested_groups - CURATED_ROLE_GROUPS
if invalid_groups:
return 400, {"error": "نقش انتخاب‌شده معتبر نیست."}
user.is_staff = bool(data.is_staff)
user.save(update_fields=["is_staff"])
current_curated_groups = list(Group.objects.filter(name__in=CURATED_ROLE_GROUPS))
if current_curated_groups:
user.groups.remove(*current_curated_groups)
groups_to_add = [Group.objects.get_or_create(name=name)[0] for name in sorted(requested_groups)]
if groups_to_add:
user.groups.add(*groups_to_add)
return 200, _authorization_payload(user)
@auth_router.get("/check-username", response=UsernameCheckSchema)
def check_username_availability(request, username: str):
return {"exists": User.objects.filter(username=username).exists()}

View File

@@ -0,0 +1,18 @@
import apps.users.models
from django.db import migrations
class Migration(migrations.Migration):
dependencies = [
("users", "0007_user_is_mobile_verified_user_mobile_alter_user_email_and_more"),
]
operations = [
migrations.AlterModelManagers(
name="user",
managers=[
("objects", apps.users.models.UserManager()),
],
),
]

View File

@@ -1,4 +1,4 @@
from django.contrib.auth.models import AbstractUser
from django.contrib.auth.models import AbstractUser, UserManager as DjangoUserManager
from django.utils import timezone
from django.db import models
@@ -14,6 +14,24 @@ from core.models import BaseModel
from apps.users.email_identity import normalize_email_identity, normalize_mobile_number
class UserManager(DjangoUserManager):
def _normalize_required_mobile(self, mobile):
normalized = normalize_mobile_number(mobile)
if not normalized:
raise ValueError("The mobile number must be set")
return normalized
def create_user(self, username, email=None, password=None, **extra_fields):
extra_fields["mobile"] = self._normalize_required_mobile(extra_fields.get("mobile"))
return super().create_user(username, email=email, password=password, **extra_fields)
def create_superuser(self, username, email=None, password=None, **extra_fields):
extra_fields["mobile"] = self._normalize_required_mobile(extra_fields.get("mobile"))
extra_fields.setdefault("is_active", True)
extra_fields.setdefault("is_mobile_verified", True)
return super().create_superuser(username, email=email, password=password, **extra_fields)
class University(BaseModel):
code = models.CharField(max_length=64, unique=True)
name = models.CharField(max_length=255)
@@ -69,7 +87,9 @@ class User(AbstractUser, BaseModel):
password_reset_token_expires_at = models.DateTimeField(null=True, blank=True)
USERNAME_FIELD = 'username'
REQUIRED_FIELDS = []
REQUIRED_FIELDS = ['mobile']
objects = UserManager()
class Meta:
db_table = 'users'

View File

@@ -1,5 +1,6 @@
from ninja import Router
from apps.analytics.api.views import analytics_router
from apps.blog.api.views import blog_router
from apps.certificates.api.views import certificates_router
from apps.communications.api.views import communications_router
@@ -12,6 +13,7 @@ from apps.users.api.views import auth_router
from core.api.views import health_router
router = Router()
router.add_router("analytics/", analytics_router, tags=["Analytics"])
router.add_router("auth/", auth_router, tags=["Authentication"])
router.add_router("blog/", blog_router, tags=["Blog"])
router.add_router("gallery/", gallery_router, tags=["Gallery"])