feat(blog): add admin taxonomy APIs

This commit is contained in:
2026-06-12 15:08:07 +03:30
parent 36aef98986
commit 7cbc99a82f
2 changed files with 196 additions and 0 deletions

View File

@@ -22,6 +22,21 @@ class CategorySchema(ModelSchema):
return obj.parent_id
class AdminCategorySchema(CategorySchema):
post_count: int = 0
@staticmethod
def resolve_post_count(obj):
return getattr(obj, "post_count", None) or obj.posts.filter(is_deleted=False).count()
class CategoryWriteSchema(Schema):
name: str
slug: Optional[str] = None
description: Optional[str] = ""
parent_id: Optional[int] = None
class CategoryPathSchema(Schema):
id: int
name: str
@@ -45,6 +60,19 @@ class TagSchema(ModelSchema):
model_fields = ["id", "name", "slug", "created_at"]
class AdminTagSchema(TagSchema):
post_count: int = 0
@staticmethod
def resolve_post_count(obj):
return getattr(obj, "post_count", None) or obj.posts.filter(is_deleted=False).count()
class TagWriteSchema(Schema):
name: str
slug: Optional[str] = None
class TagFilterSchema(Schema):
id: int
name: str

View File

@@ -7,17 +7,21 @@ from typing import List, Optional
from django.conf import settings
from django.contrib.auth import get_user_model
from django.db import IntegrityError
from django.db.models import Count, Prefetch, Q
from django.shortcuts import get_object_or_404
from django.utils import timezone
from ninja import File, Form, Query, Router, UploadedFile
from apps.blog.api.schemas import (
AdminCategorySchema,
AdminTagSchema,
BlogBannerSchema,
BlogFiltersSchema,
BlogInteractionSchema,
BlogProfileActivitySchema,
CategorySchema,
CategoryWriteSchema,
CommentCreateSchema,
CommentHideSchema,
CommentSchema,
@@ -30,6 +34,7 @@ from apps.blog.api.schemas import (
PostListSchema,
PostReviewSchema,
TagSchema,
TagWriteSchema,
)
from apps.blog.models import BlogBanner, Category, Comment, Like, Post, PostAsset, SavedPost, Tag
from apps.blog.permissions import (
@@ -310,6 +315,79 @@ def _notify_blog_comment(comment: Comment) -> None:
)
def _can_manage_blog_taxonomy(user) -> bool:
return bool(
user
and getattr(user, "is_authenticated", False)
and (
user.is_superuser
or user.is_staff
or user.has_perm("blog.add_category")
or user.has_perm("blog.change_category")
or user.has_perm("blog.add_tag")
or user.has_perm("blog.change_tag")
)
)
def _category_queryset_with_counts():
return Category.objects.annotate(post_count=Count("posts", filter=Q(posts__is_deleted=False), distinct=True))
def _tag_queryset_with_counts():
return Tag.objects.annotate(post_count=Count("posts", filter=Q(posts__is_deleted=False), distinct=True))
def _validate_category_parent(category_id: int | None, parent_id: int | None) -> tuple[Category | None, str | None]:
if not parent_id:
return None, None
if category_id and parent_id == category_id:
return None, "A category cannot be its own parent."
parent = Category.objects.filter(id=parent_id).first()
if not parent:
return None, "Parent category not found."
current = parent
seen: set[int] = set()
while current:
if current.id in seen:
return None, "Invalid category hierarchy."
seen.add(current.id)
if category_id and current.id == category_id:
return None, "Category parent would create a cycle."
current = current.parent
return parent, None
def _apply_category_payload(category: Category, data: CategoryWriteSchema) -> tuple[Category | None, str | None]:
name = (data.name or "").strip()
if not name:
return None, "Category name is required."
parent, error = _validate_category_parent(category.id, data.parent_id)
if error:
return None, error
category.name = name
if data.slug is not None:
category.slug = data.slug.strip()
category.description = data.description or ""
category.parent = parent
return category, None
def _apply_tag_payload(tag: Tag, data: TagWriteSchema) -> tuple[Tag | None, str | None]:
name = (data.name or "").strip()
if not name:
return None, "Tag name is required."
tag.name = name
if data.slug is not None:
tag.slug = data.slug.strip()
return tag, None
@blog_router.get("/admin/writers", response={200: List[AuthorSchema], 403: ErrorSchema}, auth=jwt_auth)
def list_blog_writers(request):
if not (request.auth.is_superuser or request.auth.is_staff or can_review_blog_posts(request.auth)):
@@ -816,6 +894,96 @@ def restore_comment(request, comment_id: int):
return 400, {"error": "Comment not found or not soft-deleted."}
@blog_router.get("/admin/categories", response={200: List[AdminCategorySchema], 403: ErrorSchema}, auth=jwt_auth)
def list_admin_categories(request):
if not _can_manage_blog_taxonomy(request.auth):
return 403, {"error": "Permission denied"}
return 200, _category_queryset_with_counts().order_by("name")
@blog_router.post("/admin/categories", response={201: AdminCategorySchema, 400: ErrorSchema, 403: ErrorSchema}, auth=jwt_auth)
def create_admin_category(request, data: CategoryWriteSchema):
if not _can_manage_blog_taxonomy(request.auth):
return 403, {"error": "Permission denied"}
category, error = _apply_category_payload(Category(), data)
if error:
return 400, {"error": error}
try:
category.save()
except IntegrityError:
return 400, {"error": "Category name or slug already exists."}
return 201, _category_queryset_with_counts().get(id=category.id)
@blog_router.put("/admin/categories/{category_id}", response={200: AdminCategorySchema, 400: ErrorSchema, 403: ErrorSchema}, auth=jwt_auth)
def update_admin_category(request, category_id: int, data: CategoryWriteSchema):
if not _can_manage_blog_taxonomy(request.auth):
return 403, {"error": "Permission denied"}
category = get_object_or_404(Category, id=category_id)
category, error = _apply_category_payload(category, data)
if error:
return 400, {"error": error}
try:
category.save()
except IntegrityError:
return 400, {"error": "Category name or slug already exists."}
return 200, _category_queryset_with_counts().get(id=category.id)
@blog_router.delete("/admin/categories/{category_id}", response={200: MessageSchema, 403: ErrorSchema}, auth=jwt_auth)
def delete_admin_category(request, category_id: int):
if not request.auth.is_superuser:
return 403, {"error": "Permission denied"}
category = get_object_or_404(Category, id=category_id)
category.delete()
return 200, {"message": f"Category '{category.name}' deleted successfully."}
@blog_router.get("/admin/tags", response={200: List[AdminTagSchema], 403: ErrorSchema}, auth=jwt_auth)
def list_admin_tags(request):
if not _can_manage_blog_taxonomy(request.auth):
return 403, {"error": "Permission denied"}
return 200, _tag_queryset_with_counts().order_by("name")
@blog_router.post("/admin/tags", response={201: AdminTagSchema, 400: ErrorSchema, 403: ErrorSchema}, auth=jwt_auth)
def create_admin_tag(request, data: TagWriteSchema):
if not _can_manage_blog_taxonomy(request.auth):
return 403, {"error": "Permission denied"}
tag, error = _apply_tag_payload(Tag(), data)
if error:
return 400, {"error": error}
try:
tag.save()
except IntegrityError:
return 400, {"error": "Tag name or slug already exists."}
return 201, _tag_queryset_with_counts().get(id=tag.id)
@blog_router.put("/admin/tags/{tag_id}", response={200: AdminTagSchema, 400: ErrorSchema, 403: ErrorSchema}, auth=jwt_auth)
def update_admin_tag(request, tag_id: int, data: TagWriteSchema):
if not _can_manage_blog_taxonomy(request.auth):
return 403, {"error": "Permission denied"}
tag = get_object_or_404(Tag, id=tag_id)
tag, error = _apply_tag_payload(tag, data)
if error:
return 400, {"error": error}
try:
tag.save()
except IntegrityError:
return 400, {"error": "Tag name or slug already exists."}
return 200, _tag_queryset_with_counts().get(id=tag.id)
@blog_router.delete("/admin/tags/{tag_id}", response={200: MessageSchema, 403: ErrorSchema}, auth=jwt_auth)
def delete_admin_tag(request, tag_id: int):
if not request.auth.is_superuser:
return 403, {"error": "Permission denied"}
tag = get_object_or_404(Tag, id=tag_id)
tag.delete()
return 200, {"message": f"Tag '{tag.name}' deleted successfully."}
@blog_router.get("/categories", response=List[CategorySchema])
def list_categories(request):
return Category.objects.all()