feat(users): add authorization management APIs

This commit is contained in:
2026-06-12 15:08:19 +03:30
parent 7cbc99a82f
commit 9acab4af2c
2 changed files with 164 additions and 1 deletions

View File

@@ -4,6 +4,7 @@ import jwt
import uuid
from django.conf import settings
from django.contrib.auth.models import Group
from django.core.files.base import ContentFile
from django.db.models import Q
from django.http import HttpResponseRedirect
@@ -11,6 +12,7 @@ from django.shortcuts import get_object_or_404
from ninja import Query, Router
from apps.users.api.schemas import (
AuthorizationRoleSchema,
GoogleClaimVerifySchema,
GoogleCompleteSchema,
GoogleFlowResponseSchema,
@@ -25,6 +27,8 @@ from apps.users.api.schemas import (
TokenRefreshIn,
TokenSchema,
UserListSchema,
UserAuthorizationSchema,
UserAuthorizationUpdateSchema,
UserLoginSchema,
UserOtpLoginSchema,
UserProfileSchema,
@@ -32,6 +36,7 @@ from apps.users.api.schemas import (
UserUpdateSchema,
UsernameCheckSchema,
)
from apps.blog.permissions import ASSOCIATION_ADMIN_GROUP, BLOG_EDITOR_GROUP, BLOG_SUPERVISOR_GROUP
from apps.users.email_identity import normalize_email_identity
from apps.users.models import Major, University, User
from apps.users.services.auth import (
@@ -70,11 +75,93 @@ from core.media import delete_image_derivatives
auth_router = Router()
CURATED_ROLE_GROUPS = {
BLOG_EDITOR_GROUP,
BLOG_SUPERVISOR_GROUP,
ASSOCIATION_ADMIN_GROUP,
}
ROLE_SPECS = [
{
"key": BLOG_EDITOR_GROUP,
"label": "ویرایشگر بلاگ",
"description": "امکان نوشتن و مدیریت نوشته‌های خودش در بلاگ.",
"group": BLOG_EDITOR_GROUP,
},
{
"key": BLOG_SUPERVISOR_GROUP,
"label": "سرپرست بلاگ",
"description": "امکان بررسی، انتشار، مدیریت دسته‌ها/برچسب‌ها و نظارت کامنت‌ها.",
"group": BLOG_SUPERVISOR_GROUP,
},
{
"key": ASSOCIATION_ADMIN_GROUP,
"label": "ادمین انجمن",
"description": "نقش سازمانی انجمن برای دسترسی‌های مدیریتی منتخب.",
"group": ASSOCIATION_ADMIN_GROUP,
},
{
"key": "staff_admin",
"label": "دسترسی پنل مدیریت",
"description": "فعال‌سازی is_staff برای ورود به بخش‌های مدیریتی عمومی.",
"field": "is_staff",
},
{
"key": "is_superuser",
"label": "سوپریوزر",
"description": "دسترسی کامل Django؛ از این صفحه قابل تغییر نیست.",
"field": "is_superuser",
"locked": True,
},
]
def _error_response(exc: AuthServiceError | GoogleOAuthFlowError):
return exc.status_code, {"error": exc.message}
def _ensure_superuser(user):
return bool(user and user.is_superuser)
def _role_payload(user: User) -> list[dict]:
user_groups = set(user.groups.values_list("name", flat=True))
roles = []
for spec in ROLE_SPECS:
key = spec["key"]
enabled = False
if spec.get("group"):
enabled = spec["group"] in user_groups
elif spec.get("field"):
enabled = bool(getattr(user, spec["field"]))
roles.append(
{
"key": key,
"label": spec["label"],
"description": spec["description"],
"enabled": enabled,
"locked": bool(spec.get("locked", False)),
}
)
return roles
def _authorization_payload(user: User) -> dict:
return {
"id": user.id,
"username": user.username,
"email": user.email,
"mobile": user.mobile,
"first_name": user.first_name,
"last_name": user.last_name,
"is_active": user.is_active,
"is_staff": user.is_staff,
"is_superuser": user.is_superuser,
"groups": list(user.groups.values_list("name", flat=True)),
"roles": _role_payload(user),
}
def _get_major_from_code(code: str | None):
if not code:
return None
@@ -446,6 +533,55 @@ def list_users(
return queryset[offset : offset + limit]
@auth_router.get("/roles", response={200: list[AuthorizationRoleSchema], 403: ErrorSchema}, auth=jwt_auth)
def list_authorization_roles(request):
if not _ensure_superuser(request.auth):
return 403, {"error": "اجازه دسترسی ندارید."}
return 200, [
{
"key": spec["key"],
"label": spec["label"],
"description": spec["description"],
"enabled": False,
"locked": bool(spec.get("locked", False)),
}
for spec in ROLE_SPECS
]
@auth_router.get("/users/{user_id}/authorization", response={200: UserAuthorizationSchema, 403: ErrorSchema}, auth=jwt_auth)
def get_user_authorization(request, user_id: int):
if not _ensure_superuser(request.auth):
return 403, {"error": "اجازه دسترسی ندارید."}
user = get_object_or_404(User, id=user_id)
return 200, _authorization_payload(user)
@auth_router.put("/users/{user_id}/authorization", response={200: UserAuthorizationSchema, 403: ErrorSchema, 400: ErrorSchema}, auth=jwt_auth)
def update_user_authorization(request, user_id: int, data: UserAuthorizationUpdateSchema):
if not _ensure_superuser(request.auth):
return 403, {"error": "اجازه دسترسی ندارید."}
user = get_object_or_404(User, id=user_id)
if user.id == request.auth.id:
return 400, {"error": "برای جلوگیری از قفل شدن دسترسی، نمی‌توانید نقش‌های خودتان را از این صفحه تغییر دهید."}
requested_groups = set(data.groups or [])
invalid_groups = requested_groups - CURATED_ROLE_GROUPS
if invalid_groups:
return 400, {"error": "نقش انتخاب‌شده معتبر نیست."}
user.is_staff = bool(data.is_staff)
user.save(update_fields=["is_staff"])
current_curated_groups = list(Group.objects.filter(name__in=CURATED_ROLE_GROUPS))
if current_curated_groups:
user.groups.remove(*current_curated_groups)
groups_to_add = [Group.objects.get_or_create(name=name)[0] for name in sorted(requested_groups)]
if groups_to_add:
user.groups.add(*groups_to_add)
return 200, _authorization_payload(user)
@auth_router.get("/check-username", response=UsernameCheckSchema)
def check_username_availability(request, username: str):
return {"exists": User.objects.filter(username=username).exists()}