Compare commits
4 Commits
36aef98986
...
8b307196da
| Author | SHA1 | Date | |
|---|---|---|---|
| 8b307196da | |||
| 690dc7b600 | |||
| 9acab4af2c | |||
| 7cbc99a82f |
@@ -22,6 +22,21 @@ class CategorySchema(ModelSchema):
|
|||||||
return obj.parent_id
|
return obj.parent_id
|
||||||
|
|
||||||
|
|
||||||
|
class AdminCategorySchema(CategorySchema):
|
||||||
|
post_count: int = 0
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def resolve_post_count(obj):
|
||||||
|
return getattr(obj, "post_count", None) or obj.posts.filter(is_deleted=False).count()
|
||||||
|
|
||||||
|
|
||||||
|
class CategoryWriteSchema(Schema):
|
||||||
|
name: str
|
||||||
|
slug: Optional[str] = None
|
||||||
|
description: Optional[str] = ""
|
||||||
|
parent_id: Optional[int] = None
|
||||||
|
|
||||||
|
|
||||||
class CategoryPathSchema(Schema):
|
class CategoryPathSchema(Schema):
|
||||||
id: int
|
id: int
|
||||||
name: str
|
name: str
|
||||||
@@ -45,6 +60,19 @@ class TagSchema(ModelSchema):
|
|||||||
model_fields = ["id", "name", "slug", "created_at"]
|
model_fields = ["id", "name", "slug", "created_at"]
|
||||||
|
|
||||||
|
|
||||||
|
class AdminTagSchema(TagSchema):
|
||||||
|
post_count: int = 0
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def resolve_post_count(obj):
|
||||||
|
return getattr(obj, "post_count", None) or obj.posts.filter(is_deleted=False).count()
|
||||||
|
|
||||||
|
|
||||||
|
class TagWriteSchema(Schema):
|
||||||
|
name: str
|
||||||
|
slug: Optional[str] = None
|
||||||
|
|
||||||
|
|
||||||
class TagFilterSchema(Schema):
|
class TagFilterSchema(Schema):
|
||||||
id: int
|
id: int
|
||||||
name: str
|
name: str
|
||||||
@@ -220,6 +248,7 @@ class PostListSchema(Schema):
|
|||||||
class PostDetailSchema(PostListSchema):
|
class PostDetailSchema(PostListSchema):
|
||||||
content: str
|
content: str
|
||||||
content_html: str
|
content_html: str
|
||||||
|
review_note: Optional[str] = ""
|
||||||
og_image_url: Optional[str] = None
|
og_image_url: Optional[str] = None
|
||||||
assets: List[PostAssetSchema] = []
|
assets: List[PostAssetSchema] = []
|
||||||
|
|
||||||
|
|||||||
@@ -7,17 +7,21 @@ from typing import List, Optional
|
|||||||
|
|
||||||
from django.conf import settings
|
from django.conf import settings
|
||||||
from django.contrib.auth import get_user_model
|
from django.contrib.auth import get_user_model
|
||||||
|
from django.db import IntegrityError
|
||||||
from django.db.models import Count, Prefetch, Q
|
from django.db.models import Count, Prefetch, Q
|
||||||
from django.shortcuts import get_object_or_404
|
from django.shortcuts import get_object_or_404
|
||||||
from django.utils import timezone
|
from django.utils import timezone
|
||||||
from ninja import File, Form, Query, Router, UploadedFile
|
from ninja import File, Form, Query, Router, UploadedFile
|
||||||
|
|
||||||
from apps.blog.api.schemas import (
|
from apps.blog.api.schemas import (
|
||||||
|
AdminCategorySchema,
|
||||||
|
AdminTagSchema,
|
||||||
BlogBannerSchema,
|
BlogBannerSchema,
|
||||||
BlogFiltersSchema,
|
BlogFiltersSchema,
|
||||||
BlogInteractionSchema,
|
BlogInteractionSchema,
|
||||||
BlogProfileActivitySchema,
|
BlogProfileActivitySchema,
|
||||||
CategorySchema,
|
CategorySchema,
|
||||||
|
CategoryWriteSchema,
|
||||||
CommentCreateSchema,
|
CommentCreateSchema,
|
||||||
CommentHideSchema,
|
CommentHideSchema,
|
||||||
CommentSchema,
|
CommentSchema,
|
||||||
@@ -30,6 +34,7 @@ from apps.blog.api.schemas import (
|
|||||||
PostListSchema,
|
PostListSchema,
|
||||||
PostReviewSchema,
|
PostReviewSchema,
|
||||||
TagSchema,
|
TagSchema,
|
||||||
|
TagWriteSchema,
|
||||||
)
|
)
|
||||||
from apps.blog.models import BlogBanner, Category, Comment, Like, Post, PostAsset, SavedPost, Tag
|
from apps.blog.models import BlogBanner, Category, Comment, Like, Post, PostAsset, SavedPost, Tag
|
||||||
from apps.blog.permissions import (
|
from apps.blog.permissions import (
|
||||||
@@ -310,6 +315,83 @@ def _notify_blog_comment(comment: Comment) -> None:
|
|||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def _can_manage_blog_taxonomy(user) -> bool:
|
||||||
|
return bool(
|
||||||
|
user
|
||||||
|
and getattr(user, "is_authenticated", False)
|
||||||
|
and (
|
||||||
|
user.is_superuser
|
||||||
|
or user.is_staff
|
||||||
|
or user.has_perm("blog.add_category")
|
||||||
|
or user.has_perm("blog.change_category")
|
||||||
|
or user.has_perm("blog.add_tag")
|
||||||
|
or user.has_perm("blog.change_tag")
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def _category_queryset_with_counts():
|
||||||
|
return Category.objects.annotate(post_count=Count("posts", filter=Q(posts__is_deleted=False), distinct=True))
|
||||||
|
|
||||||
|
|
||||||
|
def _tag_queryset_with_counts():
|
||||||
|
return Tag.objects.annotate(post_count=Count("posts", filter=Q(posts__is_deleted=False), distinct=True))
|
||||||
|
|
||||||
|
|
||||||
|
def _validate_category_parent(category_id: int | None, parent_id: int | None) -> tuple[Category | None, str | None]:
|
||||||
|
if not parent_id:
|
||||||
|
return None, None
|
||||||
|
if category_id and parent_id == category_id:
|
||||||
|
return None, "A category cannot be its own parent."
|
||||||
|
if category_id and Category.objects.filter(parent_id=category_id).exists():
|
||||||
|
return None, "A category with child categories must remain a root category."
|
||||||
|
|
||||||
|
parent = Category.objects.filter(id=parent_id).first()
|
||||||
|
if not parent:
|
||||||
|
return None, "Parent category not found."
|
||||||
|
if parent.parent_id:
|
||||||
|
return None, "Only root categories can be selected as a parent."
|
||||||
|
|
||||||
|
current = parent
|
||||||
|
seen: set[int] = set()
|
||||||
|
while current:
|
||||||
|
if current.id in seen:
|
||||||
|
return None, "Invalid category hierarchy."
|
||||||
|
seen.add(current.id)
|
||||||
|
if category_id and current.id == category_id:
|
||||||
|
return None, "Category parent would create a cycle."
|
||||||
|
current = current.parent
|
||||||
|
return parent, None
|
||||||
|
|
||||||
|
|
||||||
|
def _apply_category_payload(category: Category, data: CategoryWriteSchema) -> tuple[Category | None, str | None]:
|
||||||
|
name = (data.name or "").strip()
|
||||||
|
if not name:
|
||||||
|
return None, "Category name is required."
|
||||||
|
|
||||||
|
parent, error = _validate_category_parent(category.id, data.parent_id)
|
||||||
|
if error:
|
||||||
|
return None, error
|
||||||
|
|
||||||
|
category.name = name
|
||||||
|
if data.slug is not None:
|
||||||
|
category.slug = data.slug.strip()
|
||||||
|
category.description = data.description or ""
|
||||||
|
category.parent = parent
|
||||||
|
return category, None
|
||||||
|
|
||||||
|
|
||||||
|
def _apply_tag_payload(tag: Tag, data: TagWriteSchema) -> tuple[Tag | None, str | None]:
|
||||||
|
name = (data.name or "").strip()
|
||||||
|
if not name:
|
||||||
|
return None, "Tag name is required."
|
||||||
|
|
||||||
|
tag.name = name
|
||||||
|
if data.slug is not None:
|
||||||
|
tag.slug = data.slug.strip()
|
||||||
|
return tag, None
|
||||||
|
|
||||||
|
|
||||||
@blog_router.get("/admin/writers", response={200: List[AuthorSchema], 403: ErrorSchema}, auth=jwt_auth)
|
@blog_router.get("/admin/writers", response={200: List[AuthorSchema], 403: ErrorSchema}, auth=jwt_auth)
|
||||||
def list_blog_writers(request):
|
def list_blog_writers(request):
|
||||||
if not (request.auth.is_superuser or request.auth.is_staff or can_review_blog_posts(request.auth)):
|
if not (request.auth.is_superuser or request.auth.is_staff or can_review_blog_posts(request.auth)):
|
||||||
@@ -816,6 +898,96 @@ def restore_comment(request, comment_id: int):
|
|||||||
return 400, {"error": "Comment not found or not soft-deleted."}
|
return 400, {"error": "Comment not found or not soft-deleted."}
|
||||||
|
|
||||||
|
|
||||||
|
@blog_router.get("/admin/categories", response={200: List[AdminCategorySchema], 403: ErrorSchema}, auth=jwt_auth)
|
||||||
|
def list_admin_categories(request):
|
||||||
|
if not _can_manage_blog_taxonomy(request.auth):
|
||||||
|
return 403, {"error": "Permission denied"}
|
||||||
|
return 200, _category_queryset_with_counts().order_by("name")
|
||||||
|
|
||||||
|
|
||||||
|
@blog_router.post("/admin/categories", response={201: AdminCategorySchema, 400: ErrorSchema, 403: ErrorSchema}, auth=jwt_auth)
|
||||||
|
def create_admin_category(request, data: CategoryWriteSchema):
|
||||||
|
if not _can_manage_blog_taxonomy(request.auth):
|
||||||
|
return 403, {"error": "Permission denied"}
|
||||||
|
category, error = _apply_category_payload(Category(), data)
|
||||||
|
if error:
|
||||||
|
return 400, {"error": error}
|
||||||
|
try:
|
||||||
|
category.save()
|
||||||
|
except IntegrityError:
|
||||||
|
return 400, {"error": "Category name or slug already exists."}
|
||||||
|
return 201, _category_queryset_with_counts().get(id=category.id)
|
||||||
|
|
||||||
|
|
||||||
|
@blog_router.put("/admin/categories/{category_id}", response={200: AdminCategorySchema, 400: ErrorSchema, 403: ErrorSchema}, auth=jwt_auth)
|
||||||
|
def update_admin_category(request, category_id: int, data: CategoryWriteSchema):
|
||||||
|
if not _can_manage_blog_taxonomy(request.auth):
|
||||||
|
return 403, {"error": "Permission denied"}
|
||||||
|
category = get_object_or_404(Category, id=category_id)
|
||||||
|
category, error = _apply_category_payload(category, data)
|
||||||
|
if error:
|
||||||
|
return 400, {"error": error}
|
||||||
|
try:
|
||||||
|
category.save()
|
||||||
|
except IntegrityError:
|
||||||
|
return 400, {"error": "Category name or slug already exists."}
|
||||||
|
return 200, _category_queryset_with_counts().get(id=category.id)
|
||||||
|
|
||||||
|
|
||||||
|
@blog_router.delete("/admin/categories/{category_id}", response={200: MessageSchema, 403: ErrorSchema}, auth=jwt_auth)
|
||||||
|
def delete_admin_category(request, category_id: int):
|
||||||
|
if not request.auth.is_superuser:
|
||||||
|
return 403, {"error": "Permission denied"}
|
||||||
|
category = get_object_or_404(Category, id=category_id)
|
||||||
|
category.delete()
|
||||||
|
return 200, {"message": f"Category '{category.name}' deleted successfully."}
|
||||||
|
|
||||||
|
|
||||||
|
@blog_router.get("/admin/tags", response={200: List[AdminTagSchema], 403: ErrorSchema}, auth=jwt_auth)
|
||||||
|
def list_admin_tags(request):
|
||||||
|
if not _can_manage_blog_taxonomy(request.auth):
|
||||||
|
return 403, {"error": "Permission denied"}
|
||||||
|
return 200, _tag_queryset_with_counts().order_by("name")
|
||||||
|
|
||||||
|
|
||||||
|
@blog_router.post("/admin/tags", response={201: AdminTagSchema, 400: ErrorSchema, 403: ErrorSchema}, auth=jwt_auth)
|
||||||
|
def create_admin_tag(request, data: TagWriteSchema):
|
||||||
|
if not _can_manage_blog_taxonomy(request.auth):
|
||||||
|
return 403, {"error": "Permission denied"}
|
||||||
|
tag, error = _apply_tag_payload(Tag(), data)
|
||||||
|
if error:
|
||||||
|
return 400, {"error": error}
|
||||||
|
try:
|
||||||
|
tag.save()
|
||||||
|
except IntegrityError:
|
||||||
|
return 400, {"error": "Tag name or slug already exists."}
|
||||||
|
return 201, _tag_queryset_with_counts().get(id=tag.id)
|
||||||
|
|
||||||
|
|
||||||
|
@blog_router.put("/admin/tags/{tag_id}", response={200: AdminTagSchema, 400: ErrorSchema, 403: ErrorSchema}, auth=jwt_auth)
|
||||||
|
def update_admin_tag(request, tag_id: int, data: TagWriteSchema):
|
||||||
|
if not _can_manage_blog_taxonomy(request.auth):
|
||||||
|
return 403, {"error": "Permission denied"}
|
||||||
|
tag = get_object_or_404(Tag, id=tag_id)
|
||||||
|
tag, error = _apply_tag_payload(tag, data)
|
||||||
|
if error:
|
||||||
|
return 400, {"error": error}
|
||||||
|
try:
|
||||||
|
tag.save()
|
||||||
|
except IntegrityError:
|
||||||
|
return 400, {"error": "Tag name or slug already exists."}
|
||||||
|
return 200, _tag_queryset_with_counts().get(id=tag.id)
|
||||||
|
|
||||||
|
|
||||||
|
@blog_router.delete("/admin/tags/{tag_id}", response={200: MessageSchema, 403: ErrorSchema}, auth=jwt_auth)
|
||||||
|
def delete_admin_tag(request, tag_id: int):
|
||||||
|
if not request.auth.is_superuser:
|
||||||
|
return 403, {"error": "Permission denied"}
|
||||||
|
tag = get_object_or_404(Tag, id=tag_id)
|
||||||
|
tag.delete()
|
||||||
|
return 200, {"message": f"Tag '{tag.name}' deleted successfully."}
|
||||||
|
|
||||||
|
|
||||||
@blog_router.get("/categories", response=List[CategorySchema])
|
@blog_router.get("/categories", response=List[CategorySchema])
|
||||||
def list_categories(request):
|
def list_categories(request):
|
||||||
return Category.objects.all()
|
return Category.objects.all()
|
||||||
|
|||||||
@@ -1,7 +1,7 @@
|
|||||||
"""Authentication-related API schemas."""
|
"""Authentication-related API schemas."""
|
||||||
|
|
||||||
from datetime import datetime
|
from datetime import datetime
|
||||||
from typing import Optional
|
from typing import List, Optional
|
||||||
|
|
||||||
from ninja import ModelSchema, Schema
|
from ninja import ModelSchema, Schema
|
||||||
|
|
||||||
@@ -206,6 +206,33 @@ class UserListSchema(ModelSchema):
|
|||||||
return obj.get_university_display()
|
return obj.get_university_display()
|
||||||
|
|
||||||
|
|
||||||
|
class AuthorizationRoleSchema(Schema):
|
||||||
|
key: str
|
||||||
|
label: str
|
||||||
|
description: str
|
||||||
|
enabled: bool = False
|
||||||
|
locked: bool = False
|
||||||
|
|
||||||
|
|
||||||
|
class UserAuthorizationSchema(Schema):
|
||||||
|
id: int
|
||||||
|
username: str
|
||||||
|
email: Optional[str] = None
|
||||||
|
mobile: Optional[str] = None
|
||||||
|
first_name: str
|
||||||
|
last_name: str
|
||||||
|
is_active: bool
|
||||||
|
is_staff: bool
|
||||||
|
is_superuser: bool
|
||||||
|
groups: List[str]
|
||||||
|
roles: List[AuthorizationRoleSchema]
|
||||||
|
|
||||||
|
|
||||||
|
class UserAuthorizationUpdateSchema(Schema):
|
||||||
|
is_staff: bool = False
|
||||||
|
groups: List[str] = []
|
||||||
|
|
||||||
|
|
||||||
class UserUpdateSchema(Schema):
|
class UserUpdateSchema(Schema):
|
||||||
email: Optional[str] = None
|
email: Optional[str] = None
|
||||||
first_name: Optional[str] = None
|
first_name: Optional[str] = None
|
||||||
|
|||||||
@@ -4,6 +4,7 @@ import jwt
|
|||||||
import uuid
|
import uuid
|
||||||
|
|
||||||
from django.conf import settings
|
from django.conf import settings
|
||||||
|
from django.contrib.auth.models import Group
|
||||||
from django.core.files.base import ContentFile
|
from django.core.files.base import ContentFile
|
||||||
from django.db.models import Q
|
from django.db.models import Q
|
||||||
from django.http import HttpResponseRedirect
|
from django.http import HttpResponseRedirect
|
||||||
@@ -11,6 +12,7 @@ from django.shortcuts import get_object_or_404
|
|||||||
from ninja import Query, Router
|
from ninja import Query, Router
|
||||||
|
|
||||||
from apps.users.api.schemas import (
|
from apps.users.api.schemas import (
|
||||||
|
AuthorizationRoleSchema,
|
||||||
GoogleClaimVerifySchema,
|
GoogleClaimVerifySchema,
|
||||||
GoogleCompleteSchema,
|
GoogleCompleteSchema,
|
||||||
GoogleFlowResponseSchema,
|
GoogleFlowResponseSchema,
|
||||||
@@ -25,6 +27,8 @@ from apps.users.api.schemas import (
|
|||||||
TokenRefreshIn,
|
TokenRefreshIn,
|
||||||
TokenSchema,
|
TokenSchema,
|
||||||
UserListSchema,
|
UserListSchema,
|
||||||
|
UserAuthorizationSchema,
|
||||||
|
UserAuthorizationUpdateSchema,
|
||||||
UserLoginSchema,
|
UserLoginSchema,
|
||||||
UserOtpLoginSchema,
|
UserOtpLoginSchema,
|
||||||
UserProfileSchema,
|
UserProfileSchema,
|
||||||
@@ -32,6 +36,7 @@ from apps.users.api.schemas import (
|
|||||||
UserUpdateSchema,
|
UserUpdateSchema,
|
||||||
UsernameCheckSchema,
|
UsernameCheckSchema,
|
||||||
)
|
)
|
||||||
|
from apps.blog.permissions import ASSOCIATION_ADMIN_GROUP, BLOG_EDITOR_GROUP, BLOG_SUPERVISOR_GROUP
|
||||||
from apps.users.email_identity import normalize_email_identity
|
from apps.users.email_identity import normalize_email_identity
|
||||||
from apps.users.models import Major, University, User
|
from apps.users.models import Major, University, User
|
||||||
from apps.users.services.auth import (
|
from apps.users.services.auth import (
|
||||||
@@ -70,11 +75,93 @@ from core.media import delete_image_derivatives
|
|||||||
|
|
||||||
auth_router = Router()
|
auth_router = Router()
|
||||||
|
|
||||||
|
CURATED_ROLE_GROUPS = {
|
||||||
|
BLOG_EDITOR_GROUP,
|
||||||
|
BLOG_SUPERVISOR_GROUP,
|
||||||
|
ASSOCIATION_ADMIN_GROUP,
|
||||||
|
}
|
||||||
|
|
||||||
|
ROLE_SPECS = [
|
||||||
|
{
|
||||||
|
"key": BLOG_EDITOR_GROUP,
|
||||||
|
"label": "ویرایشگر بلاگ",
|
||||||
|
"description": "امکان نوشتن و مدیریت نوشتههای خودش در بلاگ.",
|
||||||
|
"group": BLOG_EDITOR_GROUP,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"key": BLOG_SUPERVISOR_GROUP,
|
||||||
|
"label": "سرپرست بلاگ",
|
||||||
|
"description": "امکان بررسی، انتشار، مدیریت دستهها/برچسبها و نظارت کامنتها.",
|
||||||
|
"group": BLOG_SUPERVISOR_GROUP,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"key": ASSOCIATION_ADMIN_GROUP,
|
||||||
|
"label": "ادمین انجمن",
|
||||||
|
"description": "نقش سازمانی انجمن برای دسترسیهای مدیریتی منتخب.",
|
||||||
|
"group": ASSOCIATION_ADMIN_GROUP,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"key": "staff_admin",
|
||||||
|
"label": "دسترسی پنل مدیریت",
|
||||||
|
"description": "فعالسازی is_staff برای ورود به بخشهای مدیریتی عمومی.",
|
||||||
|
"field": "is_staff",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"key": "is_superuser",
|
||||||
|
"label": "سوپریوزر",
|
||||||
|
"description": "دسترسی کامل Django؛ از این صفحه قابل تغییر نیست.",
|
||||||
|
"field": "is_superuser",
|
||||||
|
"locked": True,
|
||||||
|
},
|
||||||
|
]
|
||||||
|
|
||||||
|
|
||||||
def _error_response(exc: AuthServiceError | GoogleOAuthFlowError):
|
def _error_response(exc: AuthServiceError | GoogleOAuthFlowError):
|
||||||
return exc.status_code, {"error": exc.message}
|
return exc.status_code, {"error": exc.message}
|
||||||
|
|
||||||
|
|
||||||
|
def _ensure_superuser(user):
|
||||||
|
return bool(user and user.is_superuser)
|
||||||
|
|
||||||
|
|
||||||
|
def _role_payload(user: User) -> list[dict]:
|
||||||
|
user_groups = set(user.groups.values_list("name", flat=True))
|
||||||
|
roles = []
|
||||||
|
for spec in ROLE_SPECS:
|
||||||
|
key = spec["key"]
|
||||||
|
enabled = False
|
||||||
|
if spec.get("group"):
|
||||||
|
enabled = spec["group"] in user_groups
|
||||||
|
elif spec.get("field"):
|
||||||
|
enabled = bool(getattr(user, spec["field"]))
|
||||||
|
roles.append(
|
||||||
|
{
|
||||||
|
"key": key,
|
||||||
|
"label": spec["label"],
|
||||||
|
"description": spec["description"],
|
||||||
|
"enabled": enabled,
|
||||||
|
"locked": bool(spec.get("locked", False)),
|
||||||
|
}
|
||||||
|
)
|
||||||
|
return roles
|
||||||
|
|
||||||
|
|
||||||
|
def _authorization_payload(user: User) -> dict:
|
||||||
|
return {
|
||||||
|
"id": user.id,
|
||||||
|
"username": user.username,
|
||||||
|
"email": user.email,
|
||||||
|
"mobile": user.mobile,
|
||||||
|
"first_name": user.first_name,
|
||||||
|
"last_name": user.last_name,
|
||||||
|
"is_active": user.is_active,
|
||||||
|
"is_staff": user.is_staff,
|
||||||
|
"is_superuser": user.is_superuser,
|
||||||
|
"groups": list(user.groups.values_list("name", flat=True)),
|
||||||
|
"roles": _role_payload(user),
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
def _get_major_from_code(code: str | None):
|
def _get_major_from_code(code: str | None):
|
||||||
if not code:
|
if not code:
|
||||||
return None
|
return None
|
||||||
@@ -446,6 +533,55 @@ def list_users(
|
|||||||
return queryset[offset : offset + limit]
|
return queryset[offset : offset + limit]
|
||||||
|
|
||||||
|
|
||||||
|
@auth_router.get("/roles", response={200: list[AuthorizationRoleSchema], 403: ErrorSchema}, auth=jwt_auth)
|
||||||
|
def list_authorization_roles(request):
|
||||||
|
if not _ensure_superuser(request.auth):
|
||||||
|
return 403, {"error": "اجازه دسترسی ندارید."}
|
||||||
|
return 200, [
|
||||||
|
{
|
||||||
|
"key": spec["key"],
|
||||||
|
"label": spec["label"],
|
||||||
|
"description": spec["description"],
|
||||||
|
"enabled": False,
|
||||||
|
"locked": bool(spec.get("locked", False)),
|
||||||
|
}
|
||||||
|
for spec in ROLE_SPECS
|
||||||
|
]
|
||||||
|
|
||||||
|
|
||||||
|
@auth_router.get("/users/{user_id}/authorization", response={200: UserAuthorizationSchema, 403: ErrorSchema}, auth=jwt_auth)
|
||||||
|
def get_user_authorization(request, user_id: int):
|
||||||
|
if not _ensure_superuser(request.auth):
|
||||||
|
return 403, {"error": "اجازه دسترسی ندارید."}
|
||||||
|
user = get_object_or_404(User, id=user_id)
|
||||||
|
return 200, _authorization_payload(user)
|
||||||
|
|
||||||
|
|
||||||
|
@auth_router.put("/users/{user_id}/authorization", response={200: UserAuthorizationSchema, 403: ErrorSchema, 400: ErrorSchema}, auth=jwt_auth)
|
||||||
|
def update_user_authorization(request, user_id: int, data: UserAuthorizationUpdateSchema):
|
||||||
|
if not _ensure_superuser(request.auth):
|
||||||
|
return 403, {"error": "اجازه دسترسی ندارید."}
|
||||||
|
user = get_object_or_404(User, id=user_id)
|
||||||
|
if user.id == request.auth.id:
|
||||||
|
return 400, {"error": "برای جلوگیری از قفل شدن دسترسی، نمیتوانید نقشهای خودتان را از این صفحه تغییر دهید."}
|
||||||
|
requested_groups = set(data.groups or [])
|
||||||
|
invalid_groups = requested_groups - CURATED_ROLE_GROUPS
|
||||||
|
if invalid_groups:
|
||||||
|
return 400, {"error": "نقش انتخابشده معتبر نیست."}
|
||||||
|
|
||||||
|
user.is_staff = bool(data.is_staff)
|
||||||
|
user.save(update_fields=["is_staff"])
|
||||||
|
|
||||||
|
current_curated_groups = list(Group.objects.filter(name__in=CURATED_ROLE_GROUPS))
|
||||||
|
if current_curated_groups:
|
||||||
|
user.groups.remove(*current_curated_groups)
|
||||||
|
groups_to_add = [Group.objects.get_or_create(name=name)[0] for name in sorted(requested_groups)]
|
||||||
|
if groups_to_add:
|
||||||
|
user.groups.add(*groups_to_add)
|
||||||
|
|
||||||
|
return 200, _authorization_payload(user)
|
||||||
|
|
||||||
|
|
||||||
@auth_router.get("/check-username", response=UsernameCheckSchema)
|
@auth_router.get("/check-username", response=UsernameCheckSchema)
|
||||||
def check_username_availability(request, username: str):
|
def check_username_availability(request, username: str):
|
||||||
return {"exists": User.objects.filter(username=username).exists()}
|
return {"exists": User.objects.filter(username=username).exists()}
|
||||||
|
|||||||
Reference in New Issue
Block a user