From 9acab4af2c20b41b5c4f69fecf4da09ca6000cc4 Mon Sep 17 00:00:00 2001 From: Amirhossein Khalili Date: Fri, 12 Jun 2026 15:08:19 +0330 Subject: [PATCH] feat(users): add authorization management APIs --- apps/users/api/schemas.py | 29 +++++++- apps/users/api/views.py | 136 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 164 insertions(+), 1 deletion(-) diff --git a/apps/users/api/schemas.py b/apps/users/api/schemas.py index d01bff6..8a67f95 100644 --- a/apps/users/api/schemas.py +++ b/apps/users/api/schemas.py @@ -1,7 +1,7 @@ """Authentication-related API schemas.""" from datetime import datetime -from typing import Optional +from typing import List, Optional from ninja import ModelSchema, Schema @@ -206,6 +206,33 @@ class UserListSchema(ModelSchema): return obj.get_university_display() +class AuthorizationRoleSchema(Schema): + key: str + label: str + description: str + enabled: bool = False + locked: bool = False + + +class UserAuthorizationSchema(Schema): + id: int + username: str + email: Optional[str] = None + mobile: Optional[str] = None + first_name: str + last_name: str + is_active: bool + is_staff: bool + is_superuser: bool + groups: List[str] + roles: List[AuthorizationRoleSchema] + + +class UserAuthorizationUpdateSchema(Schema): + is_staff: bool = False + groups: List[str] = [] + + class UserUpdateSchema(Schema): email: Optional[str] = None first_name: Optional[str] = None diff --git a/apps/users/api/views.py b/apps/users/api/views.py index 3a3f32f..1fb5b57 100644 --- a/apps/users/api/views.py +++ b/apps/users/api/views.py @@ -4,6 +4,7 @@ import jwt import uuid from django.conf import settings +from django.contrib.auth.models import Group from django.core.files.base import ContentFile from django.db.models import Q from django.http import HttpResponseRedirect @@ -11,6 +12,7 @@ from django.shortcuts import get_object_or_404 from ninja import Query, Router from apps.users.api.schemas import ( + AuthorizationRoleSchema, GoogleClaimVerifySchema, GoogleCompleteSchema, GoogleFlowResponseSchema, @@ -25,6 +27,8 @@ from apps.users.api.schemas import ( TokenRefreshIn, TokenSchema, UserListSchema, + UserAuthorizationSchema, + UserAuthorizationUpdateSchema, UserLoginSchema, UserOtpLoginSchema, UserProfileSchema, @@ -32,6 +36,7 @@ from apps.users.api.schemas import ( UserUpdateSchema, UsernameCheckSchema, ) +from apps.blog.permissions import ASSOCIATION_ADMIN_GROUP, BLOG_EDITOR_GROUP, BLOG_SUPERVISOR_GROUP from apps.users.email_identity import normalize_email_identity from apps.users.models import Major, University, User from apps.users.services.auth import ( @@ -70,11 +75,93 @@ from core.media import delete_image_derivatives auth_router = Router() +CURATED_ROLE_GROUPS = { + BLOG_EDITOR_GROUP, + BLOG_SUPERVISOR_GROUP, + ASSOCIATION_ADMIN_GROUP, +} + +ROLE_SPECS = [ + { + "key": BLOG_EDITOR_GROUP, + "label": "ویرایشگر بلاگ", + "description": "امکان نوشتن و مدیریت نوشته‌های خودش در بلاگ.", + "group": BLOG_EDITOR_GROUP, + }, + { + "key": BLOG_SUPERVISOR_GROUP, + "label": "سرپرست بلاگ", + "description": "امکان بررسی، انتشار، مدیریت دسته‌ها/برچسب‌ها و نظارت کامنت‌ها.", + "group": BLOG_SUPERVISOR_GROUP, + }, + { + "key": ASSOCIATION_ADMIN_GROUP, + "label": "ادمین انجمن", + "description": "نقش سازمانی انجمن برای دسترسی‌های مدیریتی منتخب.", + "group": ASSOCIATION_ADMIN_GROUP, + }, + { + "key": "staff_admin", + "label": "دسترسی پنل مدیریت", + "description": "فعال‌سازی is_staff برای ورود به بخش‌های مدیریتی عمومی.", + "field": "is_staff", + }, + { + "key": "is_superuser", + "label": "سوپریوزر", + "description": "دسترسی کامل Django؛ از این صفحه قابل تغییر نیست.", + "field": "is_superuser", + "locked": True, + }, +] + def _error_response(exc: AuthServiceError | GoogleOAuthFlowError): return exc.status_code, {"error": exc.message} +def _ensure_superuser(user): + return bool(user and user.is_superuser) + + +def _role_payload(user: User) -> list[dict]: + user_groups = set(user.groups.values_list("name", flat=True)) + roles = [] + for spec in ROLE_SPECS: + key = spec["key"] + enabled = False + if spec.get("group"): + enabled = spec["group"] in user_groups + elif spec.get("field"): + enabled = bool(getattr(user, spec["field"])) + roles.append( + { + "key": key, + "label": spec["label"], + "description": spec["description"], + "enabled": enabled, + "locked": bool(spec.get("locked", False)), + } + ) + return roles + + +def _authorization_payload(user: User) -> dict: + return { + "id": user.id, + "username": user.username, + "email": user.email, + "mobile": user.mobile, + "first_name": user.first_name, + "last_name": user.last_name, + "is_active": user.is_active, + "is_staff": user.is_staff, + "is_superuser": user.is_superuser, + "groups": list(user.groups.values_list("name", flat=True)), + "roles": _role_payload(user), + } + + def _get_major_from_code(code: str | None): if not code: return None @@ -446,6 +533,55 @@ def list_users( return queryset[offset : offset + limit] +@auth_router.get("/roles", response={200: list[AuthorizationRoleSchema], 403: ErrorSchema}, auth=jwt_auth) +def list_authorization_roles(request): + if not _ensure_superuser(request.auth): + return 403, {"error": "اجازه دسترسی ندارید."} + return 200, [ + { + "key": spec["key"], + "label": spec["label"], + "description": spec["description"], + "enabled": False, + "locked": bool(spec.get("locked", False)), + } + for spec in ROLE_SPECS + ] + + +@auth_router.get("/users/{user_id}/authorization", response={200: UserAuthorizationSchema, 403: ErrorSchema}, auth=jwt_auth) +def get_user_authorization(request, user_id: int): + if not _ensure_superuser(request.auth): + return 403, {"error": "اجازه دسترسی ندارید."} + user = get_object_or_404(User, id=user_id) + return 200, _authorization_payload(user) + + +@auth_router.put("/users/{user_id}/authorization", response={200: UserAuthorizationSchema, 403: ErrorSchema, 400: ErrorSchema}, auth=jwt_auth) +def update_user_authorization(request, user_id: int, data: UserAuthorizationUpdateSchema): + if not _ensure_superuser(request.auth): + return 403, {"error": "اجازه دسترسی ندارید."} + user = get_object_or_404(User, id=user_id) + if user.id == request.auth.id: + return 400, {"error": "برای جلوگیری از قفل شدن دسترسی، نمی‌توانید نقش‌های خودتان را از این صفحه تغییر دهید."} + requested_groups = set(data.groups or []) + invalid_groups = requested_groups - CURATED_ROLE_GROUPS + if invalid_groups: + return 400, {"error": "نقش انتخاب‌شده معتبر نیست."} + + user.is_staff = bool(data.is_staff) + user.save(update_fields=["is_staff"]) + + current_curated_groups = list(Group.objects.filter(name__in=CURATED_ROLE_GROUPS)) + if current_curated_groups: + user.groups.remove(*current_curated_groups) + groups_to_add = [Group.objects.get_or_create(name=name)[0] for name in sorted(requested_groups)] + if groups_to_add: + user.groups.add(*groups_to_add) + + return 200, _authorization_payload(user) + + @auth_router.get("/check-username", response=UsernameCheckSchema) def check_username_availability(request, username: str): return {"exists": User.objects.filter(username=username).exists()}