Files
guilan-ace-backend/apps/blog/api/views.py
Amirhossein Khalili 4039be0187
Some checks failed
Backend CI/CD / test (push) Has been cancelled
Backend CI/CD / deploy (push) Has been cancelled
feat(blog): add recommended posts endpoint
2026-06-10 11:55:44 +03:30

578 lines
23 KiB
Python

from __future__ import annotations
import mimetypes
from pathlib import Path
from typing import List, Optional
from django.conf import settings
from django.db.models import Count, Prefetch, Q
from django.shortcuts import get_object_or_404
from django.utils import timezone
from ninja import File, Form, Query, Router, UploadedFile
from apps.blog.api.schemas import (
BlogInteractionSchema,
BlogProfileActivitySchema,
CategorySchema,
CommentCreateSchema,
CommentHideSchema,
CommentSchema,
PostAssetCreateSchema,
PostAssetSchema,
PostCreateSchema,
PostDetailSchema,
PostListSchema,
PostReviewSchema,
TagSchema,
)
from apps.blog.models import Category, Comment, Like, Post, PostAsset, SavedPost, Tag
from apps.blog.permissions import (
can_access_blog_admin,
can_edit_post,
can_manage_post_assets,
can_moderate_blog_comments,
can_review_blog_posts,
can_write_blog_posts,
)
from core.api.schemas import ErrorSchema, MessageSchema
from core.authentication import jwt_auth
# Router on which every blog endpoint in this module is registered.
blog_router = Router()

# Lower-case file suffixes (leading dot included), grouped by asset category.
# They are compared against ``Path(name).suffix.lower()`` by the upload helpers.
IMAGE_EXTENSIONS = {
    ".bmp",
    ".gif",
    ".jpeg",
    ".jpg",
    ".png",
    ".svg",
    ".tiff",
    ".webp",
}
VIDEO_EXTENSIONS = {
    ".avi",
    ".mkv",
    ".mov",
    ".mp4",
    ".ogg",
    ".webm",
}
DOCUMENT_EXTENSIONS = {
    ".csv",
    ".doc",
    ".docx",
    ".md",
    ".pdf",
    ".ppt",
    ".pptx",
    ".txt",
    ".xls",
    ".xlsx",
}
ARCHIVE_EXTENSIONS = {
    ".7z",
    ".bz2",
    ".gz",
    ".rar",
    ".tar",
    ".zip",
}
def _post_queryset():
    """Build the base ``Post`` queryset used by the endpoints in this module.

    The queryset joins the author, category, reviewer and publisher, prefetches
    tags and assets (with each asset's uploader), and annotates every post with
    ``likes_count``, ``saves_count`` and ``comments_count``. Only approved
    comments are counted, and every count is computed with ``distinct=True``.
    """
    approved_only = Q(comments__is_approved=True)
    counters = {
        "likes_count": Count("likes", distinct=True),
        "saves_count": Count("saves", distinct=True),
        "comments_count": Count("comments", filter=approved_only, distinct=True),
    }
    posts = Post.objects.select_related("author", "category", "reviewed_by", "published_by")
    posts = posts.prefetch_related("tags", "assets__uploaded_by")
    return posts.annotate(**counters)
def _published_queryset():
    """Return ``_post_queryset()`` narrowed to posts whose status is PUBLISHED."""
    published = Post.StatusChoices.PUBLISHED
    return _post_queryset().filter(status=published)
def _asset_file_type(file: UploadedFile) -> str:
    """Classify an uploaded file as one of the ``PostAsset.FileType`` values.

    Images and videos are recognised by either their content type or their
    suffix; documents and archives by suffix only. Anything else is ``OTHER``.
    When the upload carries no content type, one is guessed from the file name.
    """
    extension = Path(file.name).suffix.lower()
    declared = file.content_type
    if not declared:
        declared = mimetypes.guess_type(file.name)[0] or ""
    mime = declared.lower()

    kinds = PostAsset.FileType
    if mime.startswith("image/") or extension in IMAGE_EXTENSIONS:
        kind = kinds.IMAGE
    elif mime.startswith("video/") or extension in VIDEO_EXTENSIONS:
        kind = kinds.VIDEO
    elif extension in DOCUMENT_EXTENSIONS:
        kind = kinds.DOCUMENT
    elif extension in ARCHIVE_EXTENSIONS:
        kind = kinds.ARCHIVE
    else:
        kind = kinds.OTHER
    return kind
def _validate_asset_file(file: UploadedFile) -> str | None:
    """Check an asset upload and return an error message, or ``None`` if it is acceptable.

    Checks run in this order: the file must classify as something other than
    ``OTHER``, it must not exceed the size limit (``BLOG_IMAGE_ASSET_MAX_SIZE_MB``
    for images, ``BLOG_ASSET_MAX_SIZE_MB`` otherwise; defaults 10 and 50), and a
    non-empty suffix must belong to one of the known extension sets.
    """
    kind = _asset_file_type(file)
    extension = Path(file.name).suffix.lower()
    if kind == PostAsset.FileType.OTHER:
        return "Unsupported file type."

    if kind == PostAsset.FileType.IMAGE:
        limit_mb = getattr(settings, "BLOG_IMAGE_ASSET_MAX_SIZE_MB", 10)
    else:
        limit_mb = getattr(settings, "BLOG_ASSET_MAX_SIZE_MB", 50)
    max_bytes = limit_mb * 1024 * 1024
    if file.size > max_bytes:
        return f"File size must be less than {limit_mb}MB."

    # A file without any suffix is let through here; it already passed the
    # classification check above on the strength of its content type.
    known_groups = (IMAGE_EXTENSIONS, VIDEO_EXTENSIONS, DOCUMENT_EXTENSIONS, ARCHIVE_EXTENSIONS)
    if extension and not any(extension in group for group in known_groups):
        return "Unsupported file extension."
    return None
def _validate_featured_image(file: UploadedFile) -> str | None:
    """Check a featured-image upload and return an error message, or ``None`` if it is acceptable.

    The file is accepted as an image when either its content type starts with
    ``image/`` or its suffix is in ``IMAGE_EXTENSIONS``. Its size must not exceed
    ``BLOG_IMAGE_ASSET_MAX_SIZE_MB`` (default 10).
    """
    extension = Path(file.name).suffix.lower()
    declared = file.content_type
    if not declared:
        declared = mimetypes.guess_type(file.name)[0] or ""
    mime = declared.lower()

    looks_like_image = mime.startswith("image/") or extension in IMAGE_EXTENSIONS
    if not looks_like_image:
        return "Featured image must be an image file."

    limit_mb = getattr(settings, "BLOG_IMAGE_ASSET_MAX_SIZE_MB", 10)
    max_bytes = limit_mb * 1024 * 1024
    if file.size > max_bytes:
        return f"Featured image size must be less than {limit_mb}MB."
    return None
def _apply_post_payload(post: Post, data: PostCreateSchema, *, user, allow_status: bool = False) -> Post:
    """Copy the fields the client actually sent onto ``post`` and persist it.

    Args:
        post: New or existing post to mutate and save.
        data: Incoming payload; only fields that were explicitly set are applied.
        user: Acting user, recorded as ``published_by`` when the post gets published.
        allow_status: When false, a ``status`` value in the payload is ignored.

    Returns:
        The same ``post`` instance, saved.

    Raises:
        Whatever the ORM raises while saving the post or assigning its tags.
        The database writes are rolled back together in that case.
    """
    # Function-scope import keeps this block self-contained; the module's
    # top-level imports do not bring ``transaction`` into scope.
    from django.db import transaction

    payload = data.dict(exclude_unset=True)
    tag_ids = payload.pop("tag_ids", None)
    requested_status = payload.pop("status", None)

    # With ``exclude_unset`` the key exists only if the client sent it, so key
    # presence (not the value) tells "leave the category alone" apart from an
    # explicit null, which clears it like any other falsy id. An id that
    # matches no non-deleted category also results in no category.
    if "category_id" in payload:
        category_id = payload.pop("category_id")
        if category_id:
            post.category = Category.objects.filter(id=category_id, is_deleted=False).first()
        else:
            post.category = None

    if requested_status and allow_status:
        post.status = requested_status
        if requested_status == Post.StatusChoices.PUBLISHED:
            post.published_by = user
            # Same rule as the review endpoint: keep an existing publication
            # time, otherwise stamp the moment of publication.
            post.published_at = post.published_at or timezone.now()

    # Remaining payload keys are assigned verbatim; an explicit null is stored
    # as an empty string.
    for field, value in payload.items():
        setattr(post, field, value if value is not None else "")

    # Save the row and its tag links as one unit so a failing tag assignment
    # cannot leave a half-applied post behind while the caller reports an error.
    with transaction.atomic():
        post.save()
        if tag_ids is not None:
            post.tags.set(tag_ids)
    return post
def _interaction_payload(post: Post, user) -> BlogInteractionSchema:
    """Summarise how ``user`` has interacted with ``post`` plus the post's live counters.

    Reports whether the user has liked and saved the post, and the current
    number of likes, saves and approved comments.
    """
    has_liked = Like.objects.filter(post=post, user=user).exists()
    has_saved = SavedPost.objects.filter(post=post, user=user).exists()
    total_likes = post.likes.count()
    total_saves = post.saves.count()
    approved_comments = post.comments.filter(is_approved=True).count()
    return BlogInteractionSchema(
        liked=has_liked,
        saved=has_saved,
        likes_count=total_likes,
        saves_count=total_saves,
        comments_count=approved_comments,
    )
@blog_router.get(
    "/admin/posts",
    response={200: List[PostListSchema], 403: ErrorSchema},
    auth=jwt_auth,
)
def list_admin_posts(
    request,
    status: Optional[str] = Query(None),
    search: Optional[str] = Query(None),
    mine: bool = Query(False),
    limit: int = Query(50, ge=1, le=200),
    offset: int = Query(0, ge=0),
):
    """List posts for the blog admin area, most recently updated first.

    Users who are neither superuser, staff nor reviewer only ever see their own
    posts; ``mine`` forces the same restriction for everybody else. ``status``
    filters by exact status and ``search`` by case-insensitive substring match
    on title, content or excerpt. The result is the ``offset``/``limit`` window.
    Responds 403 when the caller may not access the blog admin.
    """
    user = request.auth
    if not can_access_blog_admin(user):
        return 403, {"error": "Permission denied"}

    posts = _post_queryset().order_by("-updated_at", "-created_at")

    sees_everything = user.is_superuser or user.is_staff or can_review_blog_posts(user)
    if mine or not sees_everything:
        posts = posts.filter(author=user)
    if status:
        posts = posts.filter(status=status)
    if search:
        text_match = Q(title__icontains=search) | Q(content__icontains=search) | Q(excerpt__icontains=search)
        posts = posts.filter(text_match)

    window_end = offset + limit
    return list(posts[offset:window_end])
@blog_router.post(
    "/admin/posts",
    response={201: PostDetailSchema, 400: ErrorSchema, 403: ErrorSchema},
    auth=jwt_auth,
)
def create_admin_post(request, data: PostCreateSchema):
    """Create a post authored by the caller.

    Only reviewers may choose the status through the payload. If a
    non-reviewer's post nevertheless ends up PUBLISHED, it is put back to DRAFT
    and its publication fields are cleared. Responds 403 when the caller may not
    write posts and 400 (with the exception text) when anything fails.
    """
    author = request.auth
    if not can_write_blog_posts(author):
        return 403, {"error": "Permission denied"}

    try:
        new_post = Post(author=author)
        _apply_post_payload(new_post, data, user=author, allow_status=can_review_blog_posts(author))
        if not can_review_blog_posts(author):
            if new_post.status == Post.StatusChoices.PUBLISHED:
                new_post.status = Post.StatusChoices.DRAFT
                new_post.published_at = None
                new_post.published_by = None
                new_post.save(update_fields=["status", "published_at", "published_by", "updated_at"])
        # Re-read through the annotated queryset so the response carries the counters.
        created = _post_queryset().get(pk=new_post.pk)
    except Exception as exc:
        return 400, {"error": "Failed to create post", "details": str(exc)}
    return 201, created
@blog_router.get(
    "/admin/posts/{post_id}",
    response={200: PostDetailSchema, 403: ErrorSchema, 404: ErrorSchema},
    auth=jwt_auth,
)
def get_admin_post(request, post_id: int):
    """Return one post (any status) for editing.

    Raises ``Http404`` when the id is unknown and responds 403 when the caller
    may not edit the post.
    """
    wanted = get_object_or_404(_post_queryset(), id=post_id)
    if can_edit_post(request.auth, wanted):
        return 200, wanted
    return 403, {"error": "Permission denied"}
@blog_router.put(
    "/admin/posts/{post_id}",
    response={200: PostDetailSchema, 400: ErrorSchema, 403: ErrorSchema},
    auth=jwt_auth,
)
def update_admin_post(request, post_id: int, data: PostCreateSchema):
    """Apply a payload to an existing post.

    Only reviewers may change the status through the payload. Raises ``Http404``
    for an unknown id, responds 403 when the caller may not edit the post and
    400 (with the exception text) when applying the payload fails.
    """
    target = get_object_or_404(Post, id=post_id)
    if not can_edit_post(request.auth, target):
        return 403, {"error": "Permission denied"}

    try:
        _apply_post_payload(
            target,
            data,
            user=request.auth,
            allow_status=can_review_blog_posts(request.auth),
        )
        # Re-read through the annotated queryset so the response carries the counters.
        refreshed = _post_queryset().get(pk=target.pk)
    except Exception as exc:
        return 400, {"error": "Failed to update post", "details": str(exc)}
    return 200, refreshed
@blog_router.post(
    "/admin/posts/{post_id}/featured-image",
    response={200: PostDetailSchema, 400: ErrorSchema, 403: ErrorSchema},
    auth=jwt_auth,
)
def upload_post_featured_image(request, post_id: int, file: UploadedFile = File(...)):
    """Replace a post's featured image with the uploaded file.

    Raises ``Http404`` for an unknown id, responds 403 when the caller may not
    edit the post and 400 when the file fails featured-image validation.
    """
    target = get_object_or_404(Post, id=post_id)
    if not can_edit_post(request.auth, target):
        return 403, {"error": "Permission denied"}

    problem = _validate_featured_image(file)
    if problem:
        return 400, {"error": problem}

    target.featured_image = file
    target.save(update_fields=["featured_image", "updated_at"])
    refreshed = _post_queryset().get(pk=target.pk)
    return 200, refreshed
@blog_router.delete(
    "/admin/posts/{post_id}/featured-image",
    response={200: PostDetailSchema, 403: ErrorSchema},
    auth=jwt_auth,
)
def delete_post_featured_image(request, post_id: int):
    """Detach the featured image from a post.

    Sets the field to ``None`` and saves; the stored file itself is not touched
    by this function. Raises ``Http404`` for an unknown id and responds 403 when
    the caller may not edit the post.
    """
    target = get_object_or_404(Post, id=post_id)
    if not can_edit_post(request.auth, target):
        return 403, {"error": "Permission denied"}

    target.featured_image = None
    target.save(update_fields=["featured_image", "updated_at"])
    refreshed = _post_queryset().get(pk=target.pk)
    return 200, refreshed
@blog_router.post(
    "/admin/posts/{post_id}/submit",
    response={200: PostDetailSchema, 403: ErrorSchema},
    auth=jwt_auth,
)
def submit_post_for_review(request, post_id: int):
    """Mark a post as SUBMITTED, stamp the submission time and clear the review note.

    The current status of the post is not inspected. Raises ``Http404`` for an
    unknown id and responds 403 when the caller may not edit the post.
    """
    target = get_object_or_404(Post, id=post_id)
    if not can_edit_post(request.auth, target):
        return 403, {"error": "Permission denied"}

    changes = {
        "status": Post.StatusChoices.SUBMITTED,
        "submitted_at": timezone.now(),
        "review_note": "",
    }
    for attribute, value in changes.items():
        setattr(target, attribute, value)
    target.save(update_fields=["status", "submitted_at", "review_note", "updated_at"])
    return 200, _post_queryset().get(pk=target.pk)
@blog_router.post(
    "/admin/posts/{post_id}/review",
    response={200: PostDetailSchema, 400: ErrorSchema, 403: ErrorSchema},
    auth=jwt_auth,
)
def review_post(request, post_id: int, data: PostReviewSchema):
    """Record a reviewer's decision on a post.

    Accepted actions (case-insensitive, surrounding whitespace ignored) are
    ``publish``/``approve``, ``request_changes``/``changes_requested`` and
    ``archive``. Publishing keeps an existing ``published_at`` and records the
    reviewer as publisher. Every accepted action stores the reviewer, the review
    time and the note. Responds 403 for non-reviewers, raises ``Http404`` for an
    unknown id and responds 400 for any other action.
    """
    reviewer = request.auth
    if not can_review_blog_posts(reviewer):
        return 403, {"error": "Permission denied"}

    post = get_object_or_404(Post, id=post_id)

    statuses = Post.StatusChoices
    status_for_action = {
        "publish": statuses.PUBLISHED,
        "approve": statuses.PUBLISHED,
        "request_changes": statuses.CHANGES_REQUESTED,
        "changes_requested": statuses.CHANGES_REQUESTED,
        "archive": statuses.ARCHIVED,
    }
    action = data.action.strip().lower()
    if action not in status_for_action:
        return 400, {"error": "Unsupported review action"}

    post.status = status_for_action[action]
    if action in {"publish", "approve"}:
        post.published_at = post.published_at or timezone.now()
        post.published_by = reviewer

    post.reviewed_by = reviewer
    post.reviewed_at = timezone.now()
    post.review_note = data.note or ""
    post.save()
    return 200, _post_queryset().get(pk=post.pk)
@blog_router.get(
    "/admin/posts/{post_id}/assets",
    response={200: List[PostAssetSchema], 403: ErrorSchema},
    auth=jwt_auth,
)
def list_post_assets(request, post_id: int):
    """List the assets attached to a post, each loaded together with its uploader.

    Raises ``Http404`` for an unknown id and responds 403 when the caller may
    not edit the post.
    """
    owner_post = get_object_or_404(Post, id=post_id)
    if can_edit_post(request.auth, owner_post):
        assets = owner_post.assets.select_related("uploaded_by")
        return list(assets)
    return 403, {"error": "Permission denied"}
@blog_router.post(
    "/admin/posts/{post_id}/assets",
    response={201: PostAssetSchema, 400: ErrorSchema, 403: ErrorSchema},
    auth=jwt_auth,
)
def upload_post_asset(
    request,
    post_id: int,
    file: UploadedFile = File(...),
    data: PostAssetCreateSchema = Form(...),
):
    """Attach an uploaded file to a post as a new asset.

    The title falls back to the file name when the form leaves it empty, and the
    MIME type falls back to a guess from the file name. Raises ``Http404`` for
    an unknown id, responds 403 when the caller may not manage the post's
    assets, and 400 when validation or creation fails.
    """
    owner_post = get_object_or_404(Post, id=post_id)
    if not can_manage_post_assets(request.auth, owner_post):
        return 403, {"error": "Permission denied"}

    problem = _validate_asset_file(file)
    if problem:
        return 400, {"error": problem}

    try:
        if data:
            title, alt_text, caption = data.title, data.alt_text, data.caption
        else:
            title = alt_text = caption = ""
        mime_type = file.content_type or mimetypes.guess_type(file.name)[0] or ""
        created = PostAsset.objects.create(
            post=owner_post,
            file=file,
            file_type=_asset_file_type(file),
            title=title or file.name,
            alt_text=alt_text,
            caption=caption,
            size=file.size,
            mime_type=mime_type,
            uploaded_by=request.auth,
        )
    except Exception as exc:
        return 400, {"error": "Failed to upload asset", "details": str(exc)}
    return 201, created
@blog_router.delete("/admin/posts/{post_id}/assets/{asset_id}", response={200: MessageSchema, 403: ErrorSchema}, auth=jwt_auth)
def delete_post_asset(request, post_id: int, asset_id: int):
    """Delete one asset that belongs to the given post.

    Raises ``Http404`` when the post does not exist, or when the caller is
    authorized but the asset does not exist on that post. Responds 403 when the
    caller may not manage the post's assets.
    """
    post = get_object_or_404(Post, id=post_id)
    # Authorize before looking the asset up, as the upload endpoint does.
    # Otherwise a caller without access could tell existing asset ids (403)
    # from missing ones (404).
    if not can_manage_post_assets(request.auth, post):
        return 403, {"error": "Permission denied"}
    asset = get_object_or_404(PostAsset, id=asset_id, post=post)
    asset.delete()
    return 200, {"message": "Asset deleted successfully"}
@blog_router.get("/me/activity", response=BlogProfileActivitySchema, auth=jwt_auth)
def my_blog_activity(request):
    """Summarise the caller's blog activity.

    Returns up to 20 published posts the caller liked, up to 20 they saved, and
    their 20 most recent comments split into top-level comments and replies
    (the 20-item cap applies before the split).
    """
    viewer = request.auth
    recent_comments = (
        Comment.objects.filter(author=viewer)
        .select_related("author", "post")
        .order_by("-created_at")[:20]
    )
    liked = list(_published_queryset().filter(likes__user=viewer)[:20])
    saved = list(_published_queryset().filter(saves__user=viewer)[:20])

    top_level, replies = [], []
    for entry in recent_comments:
        bucket = top_level if entry.parent_id is None else replies
        bucket.append(entry)

    return BlogProfileActivitySchema(
        liked_posts=liked,
        saved_posts=saved,
        comments=top_level,
        replies=replies,
    )
@blog_router.get("/posts", response=List[PostListSchema])
def list_posts(
    request,
    page: int = Query(1, ge=1),
    limit: int = Query(10, ge=1, le=50),
    category: Optional[str] = None,
    tag: Optional[str] = None,
    search: Optional[str] = None,
    featured: Optional[bool] = None,
    author: Optional[str] = None,
):
    """Return one page of published posts, optionally filtered.

    ``category`` and ``tag`` match slugs, ``author`` matches the username,
    ``featured`` matches ``is_featured`` (applied whenever it is not ``None``),
    and ``search`` is a case-insensitive substring match on title, content or
    excerpt. No explicit ordering is applied here.
    """
    posts = _published_queryset()

    for lookup, wanted in (("category__slug", category), ("tags__slug", tag)):
        if wanted:
            posts = posts.filter(**{lookup: wanted})
    if search:
        text_match = Q(title__icontains=search) | Q(content__icontains=search) | Q(excerpt__icontains=search)
        posts = posts.filter(text_match)
    if featured is not None:
        posts = posts.filter(is_featured=featured)
    if author:
        posts = posts.filter(author__username=author)

    first = (page - 1) * limit
    return list(posts[first:first + limit])
@blog_router.get("/posts/{slug}/recommended", response=List[PostListSchema])
def recommended_posts(request, slug: str, limit: int = Query(3, ge=1, le=12)):
    """Recommend up to ``limit`` other published posts for the post with this slug.

    Candidates are collected in three passes until ``limit`` is reached:
    posts sharing tags with the reference post (most shared tags first), then
    posts in the same category, then the most recently published posts. The
    reference post and already selected posts are never repeated.

    Raises ``Http404`` when no published post has the given slug.
    """
    # Only ``id``, ``category_id`` and the tag ids are needed from the reference
    # post, so no related objects are joined or prefetched: ``values_list()``
    # below always runs its own query and would ignore a prefetch cache.
    post = get_object_or_404(
        Post.objects,
        slug=slug,
        status=Post.StatusChoices.PUBLISHED,
    )
    tag_ids = list(post.tags.values_list("id", flat=True))
    selected = []
    seen_ids = {post.id}

    def append_posts(queryset):
        """Take as many unseen posts from ``queryset`` as are still missing."""
        remaining = limit - len(selected)
        if remaining <= 0:
            return
        for candidate in queryset.exclude(id__in=seen_ids)[:remaining]:
            selected.append(candidate)
            seen_ids.add(candidate.id)

    if tag_ids:
        append_posts(
            _published_queryset()
            .filter(tags__id__in=tag_ids)
            .annotate(shared_tags=Count("tags", filter=Q(tags__id__in=tag_ids), distinct=True))
            .order_by("-shared_tags", "-published_at", "-created_at")
            .distinct()
        )
    if len(selected) < limit and post.category_id:
        append_posts(
            _published_queryset()
            .filter(category_id=post.category_id)
            .order_by("-published_at", "-created_at")
        )
    if len(selected) < limit:
        append_posts(_published_queryset().order_by("-published_at", "-created_at"))
    return selected
@blog_router.get("/posts/{slug}", response=PostDetailSchema)
def get_post(request, slug: str):
    """Return the published post with this slug; raises ``Http404`` if there is none."""
    published_posts = _published_queryset()
    return get_object_or_404(published_posts, slug=slug)
@blog_router.post(
    "/posts",
    response={201: PostDetailSchema, 400: ErrorSchema, 403: ErrorSchema},
    auth=jwt_auth,
)
def create_post_compat(request, data: PostCreateSchema):
    """Create a post via ``/posts``; delegates entirely to ``create_admin_post``."""
    outcome = create_admin_post(request, data)
    return outcome
@blog_router.put(
    "/posts/{slug}",
    response={200: PostDetailSchema, 400: ErrorSchema, 403: ErrorSchema},
    auth=jwt_auth,
)
def update_post_compat(request, slug: str, data: PostCreateSchema):
    """Update a post addressed by slug.

    Resolves the slug to a post id (raising ``Http404`` if unknown) and hands
    over to ``update_admin_post``.
    """
    post_id = get_object_or_404(Post, slug=slug).id
    return update_admin_post(request, post_id, data)
@blog_router.delete(
    "/posts/{slug}",
    response={200: MessageSchema, 403: ErrorSchema},
    auth=jwt_auth,
)
def delete_post(request, slug: str):
    """Delete the post with this slug.

    Only superusers are allowed (403 otherwise); raises ``Http404`` when the
    slug is unknown.
    """
    if request.auth.is_superuser:
        doomed = get_object_or_404(Post, slug=slug)
        doomed.delete()
        return 200, {"message": "Post deleted successfully"}
    return 403, {"error": "Only superusers can delete posts"}
@blog_router.get("/posts/{slug}/comments", response=List[CommentSchema])
def list_comments(request, slug: str):
    """List the approved top-level comments of a published post.

    Each comment comes with its approved direct replies prefetched. Raises
    ``Http404`` when no published post has the given slug.
    """
    post = get_object_or_404(Post, slug=slug, status=Post.StatusChoices.PUBLISHED)

    approved_replies = Comment.objects.filter(is_approved=True).select_related("author", "post")
    top_level = Comment.objects.filter(post=post, is_approved=True, parent=None)
    top_level = top_level.select_related("author", "post")
    top_level = top_level.prefetch_related(Prefetch("replies", queryset=approved_replies))
    return list(top_level)
@blog_router.post(
    "/posts/{slug}/comments",
    response={201: CommentSchema, 400: ErrorSchema},
    auth=jwt_auth,
)
def create_comment(request, slug: str, data: CommentCreateSchema):
    """Add a comment by the caller to a published post, optionally as a reply.

    When ``parent_id`` is given, the parent must be an approved comment on the
    same post. Raises ``Http404`` when the post or the parent cannot be found.
    """
    post = get_object_or_404(Post, slug=slug, status=Post.StatusChoices.PUBLISHED)
    if data.parent_id:
        replied_to = get_object_or_404(Comment, id=data.parent_id, post=post, is_approved=True)
    else:
        replied_to = None
    created = Comment.objects.create(
        post=post,
        author=request.auth,
        content=data.content,
        parent=replied_to,
    )
    return 201, created
@blog_router.post(
    "/comments/{comment_id}/hide",
    response={200: MessageSchema, 403: ErrorSchema},
    auth=jwt_auth,
)
def hide_comment(request, comment_id: int, data: CommentHideSchema):
    """Hide a comment on behalf of a moderator, passing along the optional note.

    Responds 403 when the caller may not moderate comments; raises ``Http404``
    for an unknown comment id.
    """
    moderator = request.auth
    if not can_moderate_blog_comments(moderator):
        return 403, {"error": "Permission denied"}

    target = get_object_or_404(Comment, id=comment_id)
    note = data.note or ""
    target.hide(request.auth, note)
    return 200, {"message": "Comment hidden successfully"}
@blog_router.post("/posts/{slug}/like", response={200: BlogInteractionSchema}, auth=jwt_auth)
def toggle_like(request, slug: str):
    """Flip the caller's like on a published post and return the updated interaction state.

    A like is created if the caller had none; an existing like is removed.
    Raises ``Http404`` when no published post has the given slug.
    """
    post = get_object_or_404(Post, slug=slug, status=Post.StatusChoices.PUBLISHED)
    existing, was_created = Like.objects.get_or_create(post=post, user=request.auth)
    if not was_created:
        # The like was already there, so this call means "unlike".
        existing.delete()
    summary = _interaction_payload(post, request.auth)
    return 200, summary
@blog_router.get("/posts/{slug}/likes", response={200: MessageSchema})
def get_likes_count(request, slug: str):
    """Return the number of likes of a published post, as a string in ``message``.

    Raises ``Http404`` when no published post has the given slug.
    """
    post = get_object_or_404(Post, slug=slug, status=Post.StatusChoices.PUBLISHED)
    total = post.likes.count()
    return 200, {"message": str(total)}
@blog_router.post("/posts/{slug}/save", response={200: BlogInteractionSchema}, auth=jwt_auth)
def toggle_save(request, slug: str):
    """Flip the caller's bookmark on a published post and return the updated interaction state.

    A saved-post entry is created if the caller had none; an existing one is
    removed. Raises ``Http404`` when no published post has the given slug.
    """
    post = get_object_or_404(Post, slug=slug, status=Post.StatusChoices.PUBLISHED)
    bookmark, was_created = SavedPost.objects.get_or_create(post=post, user=request.auth)
    if not was_created:
        # The bookmark was already there, so this call means "unsave".
        bookmark.delete()
    summary = _interaction_payload(post, request.auth)
    return 200, summary
@blog_router.get("/posts/{slug}/interaction", response={200: BlogInteractionSchema}, auth=jwt_auth)
def get_interaction(request, slug: str):
    """Return the caller's like/save state and the counters of a published post.

    Raises ``Http404`` when no published post has the given slug.
    """
    published = Post.StatusChoices.PUBLISHED
    post = get_object_or_404(Post, slug=slug, status=published)
    summary = _interaction_payload(post, request.auth)
    return 200, summary
@blog_router.get(
    "/deleted/posts",
    response={200: List[PostListSchema], 403: ErrorSchema},
    auth=jwt_auth,
)
def list_deleted_posts(request):
    """List everything returned by ``Post.deleted_objects`` (superusers only, 403 otherwise).

    NOTE(review): unlike ``_post_queryset()``, this queryset carries no
    ``likes_count``/``saves_count``/``comments_count`` annotations. Confirm
    that ``PostListSchema`` does not require them.
    """
    if request.auth.is_superuser:
        deleted = Post.deleted_objects.all()
        deleted = deleted.select_related("author", "category").prefetch_related("tags")
        return 200, deleted
    return 403, {"error": "Permission denied"}
@blog_router.post(
    "/deleted/posts/{post_id}/restore",
    response={200: MessageSchema, 403: ErrorSchema, 400: ErrorSchema},
    auth=jwt_auth,
)
def restore_post(request, post_id: int):
    """Restore a post found through ``Post.deleted_objects`` (superusers only).

    Responds 403 for other users and 400 when that manager has no post with the
    given id.
    """
    allowed = request.auth.is_superuser
    if not allowed:
        return 403, {"error": "Permission denied"}

    try:
        post = Post.deleted_objects.get(id=post_id)
        post.restore()
        confirmation = f"Post '{post.title}' restored successfully."
        return 200, {"message": confirmation}
    except Post.DoesNotExist:
        return 400, {"error": "Post not found or not soft-deleted."}
@blog_router.get(
    "/deleted/comments",
    response={200: List[CommentSchema], 403: ErrorSchema},
    auth=jwt_auth,
)
def list_deleted_comments(request):
    """List everything returned by ``Comment.deleted_objects``, with author and post joined.

    Responds 403 when the caller may not moderate comments.
    """
    if can_moderate_blog_comments(request.auth):
        deleted = Comment.deleted_objects.all().select_related("author", "post")
        return 200, deleted
    return 403, {"error": "Permission denied"}
@blog_router.post(
    "/deleted/comments/{comment_id}/restore",
    response={200: MessageSchema, 403: ErrorSchema, 400: ErrorSchema},
    auth=jwt_auth,
)
def restore_comment(request, comment_id: int):
    """Restore a comment found through ``Comment.deleted_objects``.

    Responds 403 when the caller may not moderate comments and 400 when that
    manager has no comment with the given id.
    """
    allowed = can_moderate_blog_comments(request.auth)
    if not allowed:
        return 403, {"error": "Permission denied"}

    try:
        comment = Comment.deleted_objects.get(id=comment_id)
        comment.restore()
        confirmation = f"Comment by {comment.author.username} restored successfully."
        return 200, {"message": confirmation}
    except Comment.DoesNotExist:
        return 400, {"error": "Comment not found or not soft-deleted."}
@blog_router.get("/categories", response=List[CategorySchema])
def list_categories(request):
    """Return every category exposed by ``Category.objects``."""
    categories = Category.objects.all()
    return categories
@blog_router.get("/categories/{slug}", response=CategorySchema)
def get_category(request, slug: str):
    """Return the category with this slug; raises ``Http404`` if there is none."""
    found = get_object_or_404(Category, slug=slug)
    return found
@blog_router.get(
    "/deleted/categories",
    response={200: List[CategorySchema], 403: ErrorSchema},
    auth=jwt_auth,
)
def list_deleted_categories(request):
    """List everything returned by ``Category.deleted_objects`` (superusers only, 403 otherwise)."""
    if request.auth.is_superuser:
        deleted = Category.deleted_objects.all()
        return 200, deleted
    return 403, {"error": "Permission denied"}
@blog_router.post(
    "/deleted/categories/{category_id}/restore",
    response={200: MessageSchema, 403: ErrorSchema, 400: ErrorSchema},
    auth=jwt_auth,
)
def restore_category(request, category_id: int):
    """Restore a category found through ``Category.deleted_objects`` (superusers only).

    Responds 403 for other users and 400 when that manager has no category with
    the given id.
    """
    allowed = request.auth.is_superuser
    if not allowed:
        return 403, {"error": "Permission denied"}

    try:
        category = Category.deleted_objects.get(id=category_id)
        category.restore()
        confirmation = f"Category '{category.name}' restored successfully."
        return 200, {"message": confirmation}
    except Category.DoesNotExist:
        return 400, {"error": "Category not found or not soft-deleted."}
@blog_router.get("/tags", response=List[TagSchema])
def list_tags(request):
    """Return every tag exposed by ``Tag.objects``."""
    tags = Tag.objects.all()
    return tags
@blog_router.get("/tags/{slug}", response=TagSchema)
def get_tag(request, slug: str):
    """Return the tag with this slug; raises ``Http404`` if there is none."""
    found = get_object_or_404(Tag, slug=slug)
    return found
@blog_router.get(
    "/deleted/tags",
    response={200: List[TagSchema], 403: ErrorSchema},
    auth=jwt_auth,
)
def list_deleted_tags(request):
    """List everything returned by ``Tag.deleted_objects`` (superusers only, 403 otherwise)."""
    if request.auth.is_superuser:
        deleted = Tag.deleted_objects.all()
        return 200, deleted
    return 403, {"error": "Permission denied"}
@blog_router.post(
    "/deleted/tags/{tag_id}/restore",
    response={200: MessageSchema, 403: ErrorSchema, 400: ErrorSchema},
    auth=jwt_auth,
)
def restore_tag(request, tag_id: int):
    """Restore a tag found through ``Tag.deleted_objects`` (superusers only).

    Responds 403 for other users and 400 when that manager has no tag with the
    given id.
    """
    allowed = request.auth.is_superuser
    if not allowed:
        return 403, {"error": "Permission denied"}

    try:
        tag = Tag.deleted_objects.get(id=tag_id)
        tag.restore()
        confirmation = f"Tag '{tag.name}' restored successfully."
        return 200, {"message": confirmation}
    except Tag.DoesNotExist:
        return 400, {"error": "Tag not found or not soft-deleted."}