Files

605 lines
22 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
from __future__ import annotations
import jwt
import uuid
from django.conf import settings
from django.contrib.auth.models import Group
from django.core.files.base import ContentFile
from django.db.models import Q
from django.http import HttpResponseRedirect
from django.shortcuts import get_object_or_404
from ninja import Query, Router
from apps.users.api.schemas import (
AuthorizationRoleSchema,
GoogleClaimVerifySchema,
GoogleCompleteSchema,
GoogleFlowResponseSchema,
GoogleFlowSchema,
MobileLookupSchema,
MobileOtpSendSchema,
MobileOtpVerifySchema,
OtpSendResponseSchema,
OtpSendSchema,
PasswordResetSchema,
RegisterOtpVerifySchema,
TokenRefreshIn,
TokenSchema,
UserListSchema,
UserAuthorizationSchema,
UserAuthorizationUpdateSchema,
UserLoginSchema,
UserOtpLoginSchema,
UserProfileSchema,
UserRegistrationSchema,
UserUpdateSchema,
UsernameCheckSchema,
)
from apps.blog.permissions import ASSOCIATION_ADMIN_GROUP, BLOG_EDITOR_GROUP, BLOG_SUPERVISOR_GROUP
from apps.users.email_identity import normalize_email_identity
from apps.users.models import Major, University, User
from apps.users.services.auth import (
AuthServiceError,
RegistrationPayload,
generate_and_send_otp,
get_tokens_for_user,
login_with_otp,
login_with_password,
lookup_mobile_registration_state,
register_user,
reset_password_with_otp,
send_authenticated_mobile_otp,
verify_register_otp,
verify_authenticated_mobile_otp,
)
from apps.users.services.google_oauth import (
GoogleOAuthFlowError,
build_authenticated_flow_payload,
build_google_authorization_url,
build_google_callback_redirect_url,
build_pending_google_flow_payload,
complete_google_signup,
consume_google_state,
create_google_flow,
exchange_code_for_google_profile,
find_social_account_for_profile,
get_google_flow,
send_google_claim_otp,
sync_user_from_google_profile,
verify_google_claim,
)
from core.api.schemas import ErrorSchema, MessageSchema
from core.authentication import create_jwt_token, create_refresh_token, jwt_auth
from core.media import delete_image_derivatives
auth_router = Router()
CURATED_ROLE_GROUPS = {
BLOG_EDITOR_GROUP,
BLOG_SUPERVISOR_GROUP,
ASSOCIATION_ADMIN_GROUP,
}
ROLE_SPECS = [
{
"key": BLOG_EDITOR_GROUP,
"label": "ویرایشگر بلاگ",
"description": "امکان نوشتن و مدیریت نوشته‌های خودش در بلاگ.",
"group": BLOG_EDITOR_GROUP,
},
{
"key": BLOG_SUPERVISOR_GROUP,
"label": "سرپرست بلاگ",
"description": "امکان بررسی، انتشار، مدیریت دسته‌ها/برچسب‌ها و نظارت کامنت‌ها.",
"group": BLOG_SUPERVISOR_GROUP,
},
{
"key": ASSOCIATION_ADMIN_GROUP,
"label": "ادمین انجمن",
"description": "نقش سازمانی انجمن برای دسترسی‌های مدیریتی منتخب.",
"group": ASSOCIATION_ADMIN_GROUP,
},
{
"key": "staff_admin",
"label": "دسترسی پنل مدیریت",
"description": "فعال‌سازی is_staff برای ورود به بخش‌های مدیریتی عمومی.",
"field": "is_staff",
},
{
"key": "is_superuser",
"label": "سوپریوزر",
"description": "دسترسی کامل Django؛ از این صفحه قابل تغییر نیست.",
"field": "is_superuser",
"locked": True,
},
]
def _error_response(exc: AuthServiceError | GoogleOAuthFlowError):
return exc.status_code, {"error": exc.message}
def _ensure_superuser(user):
return bool(user and user.is_superuser)
def _role_payload(user: User) -> list[dict]:
user_groups = set(user.groups.values_list("name", flat=True))
roles = []
for spec in ROLE_SPECS:
key = spec["key"]
enabled = False
if spec.get("group"):
enabled = spec["group"] in user_groups
elif spec.get("field"):
enabled = bool(getattr(user, spec["field"]))
roles.append(
{
"key": key,
"label": spec["label"],
"description": spec["description"],
"enabled": enabled,
"locked": bool(spec.get("locked", False)),
}
)
return roles
def _authorization_payload(user: User) -> dict:
return {
"id": user.id,
"username": user.username,
"email": user.email,
"mobile": user.mobile,
"first_name": user.first_name,
"last_name": user.last_name,
"is_active": user.is_active,
"is_staff": user.is_staff,
"is_superuser": user.is_superuser,
"groups": list(user.groups.values_list("name", flat=True)),
"roles": _role_payload(user),
}
def _get_major_from_code(code: str | None):
if not code:
return None
return Major.objects.filter(code=code, is_deleted=False).first()
def _get_university_from_code(code: str | None):
if not code:
return None
return University.objects.filter(code=code, is_deleted=False).first()
@auth_router.post("/register", response={201: MessageSchema, 400: ErrorSchema})
def register(request, data: UserRegistrationSchema):
try:
user = register_user(
RegistrationPayload(
username=data.username,
mobile=data.mobile,
password=data.password,
code=data.code,
email=data.email,
first_name=data.first_name or "",
last_name=data.last_name or "",
university=data.university,
student_id=data.student_id,
year_of_study=data.year_of_study,
major=data.major,
)
)
except AuthServiceError as exc:
return _error_response(exc)
return 201, {"message": f"ثبت‌نام با موفقیت انجام شد. خوش آمدید {user.get_full_name() or user.username}."}
@auth_router.post("/otp/send", response={200: OtpSendResponseSchema, 400: ErrorSchema})
def send_otp(request, data: OtpSendSchema):
try:
payload = generate_and_send_otp(data.mobile, data.mode)
except AuthServiceError as exc:
return _error_response(exc)
return 200, payload
@auth_router.post("/otp/verify-register", response={200: MessageSchema, 400: ErrorSchema})
def verify_register_otp_view(request, data: RegisterOtpVerifySchema):
try:
verify_register_otp(data.mobile, data.code)
except AuthServiceError as exc:
return _error_response(exc)
return 200, {"message": "کد تایید با موفقیت تایید شد."}
@auth_router.post("/login", response={200: TokenSchema, 401: ErrorSchema, 400: ErrorSchema})
def login(request, data: UserLoginSchema):
try:
return 200, login_with_password(data.identifier, data.password)
except AuthServiceError as exc:
return _error_response(exc)
@auth_router.post("/login/otp", response={200: TokenSchema, 401: ErrorSchema, 400: ErrorSchema})
def login_otp(request, data: UserOtpLoginSchema):
try:
return 200, login_with_otp(data.mobile, data.code)
except AuthServiceError as exc:
return _error_response(exc)
@auth_router.post("/reset-password", response={200: MessageSchema, 400: ErrorSchema})
def reset_password(request, data: PasswordResetSchema):
try:
reset_password_with_otp(data.mobile, data.code, data.new_password)
except AuthServiceError as exc:
return _error_response(exc)
return 200, {"message": "رمز عبور با موفقیت تغییر کرد."}
@auth_router.post("/mobile/send-otp", response={200: OtpSendResponseSchema, 400: ErrorSchema}, auth=jwt_auth)
def send_mobile_verify_otp(request, data: MobileOtpSendSchema):
try:
payload = send_authenticated_mobile_otp(request.auth, data.mobile)
except AuthServiceError as exc:
return _error_response(exc)
return 200, payload
@auth_router.post("/mobile/verify", response={200: UserProfileSchema, 400: ErrorSchema}, auth=jwt_auth)
def verify_mobile(request, data: MobileOtpVerifySchema):
try:
user = verify_authenticated_mobile_otp(request.auth, data.mobile, data.code)
except AuthServiceError as exc:
return _error_response(exc)
return 200, user
@auth_router.post("/refresh", response={200: TokenSchema, 401: ErrorSchema})
def refresh_tokens(request, data: TokenRefreshIn):
try:
payload = jwt.decode(
data.refresh_token,
settings.JWT_SECRET_KEY,
algorithms=[settings.JWT_ALGORITHM],
)
if payload.get("type") != "refresh":
return 401, {"error": "نوع توکن نامعتبر است."}
user_id = payload.get("user_id")
if not user_id:
return 401, {"error": "داده‌های توکن نامعتبر است."}
user = get_object_or_404(User, id=user_id, is_active=True)
except jwt.ExpiredSignatureError:
return 401, {"error": "رفرش‌توکن منقضی شده است."}
except jwt.InvalidTokenError:
return 401, {"error": "رفرش‌توکن نامعتبر است."}
return {
"access_token": create_jwt_token(user),
"refresh_token": create_refresh_token(user),
"token_type": "bearer",
}
@auth_router.get("/oauth/google/start")
def google_oauth_start(request):
return HttpResponseRedirect(build_google_authorization_url())
@auth_router.get("/oauth/google/callback")
def google_oauth_callback(request):
if request.GET.get("error"):
flow = create_google_flow(
{
"status": "error",
"detail": request.GET.get("error_description") or "Google sign-in was cancelled.",
}
)
return HttpResponseRedirect(build_google_callback_redirect_url(flow))
try:
consume_google_state(request.GET.get("state"))
profile = exchange_code_for_google_profile(request.GET.get("code"))
social_account = find_social_account_for_profile(profile)
if social_account and social_account.user.is_mobile_verified:
sync_user_from_google_profile(social_account.user, profile)
flow_payload = build_authenticated_flow_payload(social_account.user)
elif social_account:
sync_user_from_google_profile(social_account.user, profile)
flow_payload = build_pending_google_flow_payload(profile)
flow_payload["resolution"] = "existing_email_claim"
flow_payload["target_user_id"] = social_account.user.id
flow_payload["mobile_hint"] = social_account.user.mobile
else:
flow_payload = build_pending_google_flow_payload(profile)
flow = create_google_flow(flow_payload)
return HttpResponseRedirect(build_google_callback_redirect_url(flow))
except GoogleOAuthFlowError as exc:
flow = create_google_flow({"status": "error", "detail": exc.message})
return HttpResponseRedirect(build_google_callback_redirect_url(flow))
@auth_router.get("/oauth/google/flow", response={200: GoogleFlowResponseSchema, 400: ErrorSchema})
def google_oauth_flow(request, flow: str):
try:
return 200, get_google_flow(flow)
except GoogleOAuthFlowError as exc:
return _error_response(exc)
@auth_router.post("/oauth/google/complete", response={200: GoogleFlowResponseSchema, 400: ErrorSchema})
def google_oauth_complete(request, data: GoogleCompleteSchema):
try:
payload = complete_google_signup(
flow=data.flow,
mobile=data.mobile,
username=data.username,
student_id=data.student_id,
year_of_study=data.year_of_study,
major=data.major,
university=data.university,
first_name=data.first_name,
last_name=data.last_name,
)
return 200, payload
except (GoogleOAuthFlowError, AuthServiceError) as exc:
return _error_response(exc)
@auth_router.post("/oauth/google/claim/send-otp", response={200: MessageSchema, 400: ErrorSchema})
def google_oauth_claim_send_otp(request, data: GoogleFlowSchema):
try:
return 200, send_google_claim_otp(data.flow)
except (GoogleOAuthFlowError, AuthServiceError) as exc:
return _error_response(exc)
@auth_router.post("/oauth/google/claim/verify", response={200: GoogleFlowResponseSchema, 400: ErrorSchema})
def google_oauth_claim_verify(request, data: GoogleClaimVerifySchema):
try:
return 200, verify_google_claim(data.flow, data.code)
except (GoogleOAuthFlowError, AuthServiceError) as exc:
return _error_response(exc)
@auth_router.get("/verify-email/{token}", response=MessageSchema)
def verify_email_legacy_guidance(request, token: str):
return {
"message": "تایید ایمیل غیرفعال شده است. برای بازیابی حساب از ورود با گوگل یا تایید شماره موبایل استفاده کنید."
}
@auth_router.post("/resend-verification", response=MessageSchema)
def resend_verification_legacy_guidance(request, email: str):
return {
"message": "ارسال ایمیل تایید غیرفعال شده است. برای ادامه، شماره موبایل خود را ثبت یا از ورود با گوگل استفاده کنید."
}
@auth_router.post("/request-password-reset", response=MessageSchema)
def request_password_reset_legacy_guidance(request):
return {
"message": "بازیابی رمز عبور با ایمیل غیرفعال شده است. از بازیابی با کد پیامکی یا ورود با گوگل استفاده کنید."
}
@auth_router.post("/reset-password-confirm", response=MessageSchema)
def reset_password_confirm_legacy_guidance(request):
return {
"message": "لینک بازیابی ایمیلی غیرفعال شده است. لطفاً از بازیابی رمز عبور با موبایل استفاده کنید."
}
@auth_router.get("/profile", response=UserProfileSchema, auth=jwt_auth)
def get_profile(request):
return request.auth
@auth_router.put("/profile", response={200: UserProfileSchema, 400: ErrorSchema}, auth=jwt_auth)
def update_profile(request, data: UserUpdateSchema):
user = request.auth
payload = data.dict(exclude_unset=True)
if "major" in payload:
code = payload.pop("major")
if code:
major_obj = _get_major_from_code(code)
if not major_obj:
return 400, {"error": "رشته انتخابی معتبر نیست."}
payload["major"] = major_obj
else:
payload["major"] = None
if "university" in payload:
code = payload.pop("university")
if code:
uni_obj = _get_university_from_code(code)
if not uni_obj:
return 400, {"error": "دانشگاه انتخابی معتبر نیست."}
payload["university"] = uni_obj
else:
payload["university"] = None
if "email" in payload:
normalized_email = normalize_email_identity(payload["email"])
existing = User.objects.filter(email=normalized_email).exclude(pk=user.pk) if normalized_email else User.objects.none()
if existing.exists():
return 400, {"error": "این ایمیل قبلاً ثبت شده است."}
payload["email"] = normalized_email
payload["is_email_verified"] = False if normalized_email else False
for field, value in payload.items():
setattr(user, field, value)
user.save()
return 200, user
@auth_router.post("/profile/picture", response={200: MessageSchema, 400: ErrorSchema}, auth=jwt_auth)
def upload_profile_picture(request):
if "file" not in request.FILES:
return 400, {"error": "فایلی ارسال نشده است."}
file = request.FILES["file"]
if not file.content_type.startswith("image/"):
return 400, {"error": "فایل باید از نوع تصویر باشد."}
if file.size > 5 * 1024 * 1024:
return 400, {"error": "حجم فایل باید کمتر از ۵ مگابایت باشد."}
user = request.auth
filename = f"profile_pictures/{user.id}_{uuid.uuid4().hex}.{file.name.split('.')[-1]}"
user.profile_picture.save(filename, ContentFile(file.read()))
return 200, {"message": "تصویر پروفایل با موفقیت به‌روزرسانی شد."}
@auth_router.delete("/profile/picture", response={200: MessageSchema}, auth=jwt_auth)
def delete_profile_picture(request):
user = request.auth
if user.profile_picture:
delete_image_derivatives(user.profile_picture, "profile_picture", delete_original=True)
user.profile_picture = None
user.save(update_fields=["profile_picture"])
return 200, {"message": "تصویر پروفایل با موفقیت حذف شد."}
@auth_router.get("/users/deleted", response={200: list[UserProfileSchema], 403: ErrorSchema}, auth=jwt_auth)
def list_deleted_users(request):
if not (request.auth.is_staff or request.auth.is_superuser):
return 403, {"error": "اجازه دسترسی ندارید."}
return User.deleted_objects.all()
@auth_router.post("/users/{user_id}/restore", response={200: MessageSchema, 400: ErrorSchema, 403: ErrorSchema}, auth=jwt_auth)
def restore_user(request, user_id: int):
if not (request.auth.is_staff or request.auth.is_superuser):
return 403, {"error": "اجازه دسترسی ندارید."}
try:
user = User.deleted_objects.get(id=user_id)
user.restore()
return 200, {"message": f"کاربر {user.username} با موفقیت بازیابی شد."}
except User.DoesNotExist:
return 400, {"error": "کاربر یافت نشد یا حذف نرم نشده است."}
@auth_router.get("/users", response={200: list[UserListSchema], 403: ErrorSchema}, auth=jwt_auth)
def list_users(
request,
search: str | None = Query(None),
role: str | None = Query(None, description="staff or superuser"),
student_id: str | None = Query(None),
university: str | None = Query(None),
major: str | None = Query(None),
is_active: str | None = Query(None, description="true or false"),
limit: int = Query(50, ge=1, le=200),
offset: int = Query(0, ge=0),
):
user = request.auth
if not (user.is_staff or user.is_superuser):
return 403, {"error": "اجازه دسترسی ندارید."}
queryset = User.objects.order_by("-date_joined")
if search:
queryset = queryset.filter(
Q(username__icontains=search)
| Q(email__icontains=search)
| Q(mobile__icontains=search)
| Q(first_name__icontains=search)
| Q(last_name__icontains=search)
)
if role == "staff":
queryset = queryset.filter(is_staff=True)
elif role == "superuser":
queryset = queryset.filter(is_superuser=True)
if student_id:
queryset = queryset.filter(student_id__icontains=student_id)
if university:
queryset = queryset.filter(
Q(university__code__icontains=university) | Q(university__name__icontains=university)
)
if major:
queryset = queryset.filter(Q(major__code__icontains=major) | Q(major__name__icontains=major))
if is_active is not None:
if is_active.lower() in ("true", "1"):
queryset = queryset.filter(is_active=True)
elif is_active.lower() in ("false", "0"):
queryset = queryset.filter(is_active=False)
return queryset[offset : offset + limit]
@auth_router.get("/users/{user_id}", response={200: UserProfileSchema, 403: ErrorSchema, 404: ErrorSchema}, auth=jwt_auth)
def get_user_detail(request, user_id: int):
user = request.auth
if not (user.is_staff or user.is_superuser):
return 403, {"error": "اجازه دسترسی ندارید."}
target = get_object_or_404(User, id=user_id)
return 200, target
@auth_router.get("/roles", response={200: list[AuthorizationRoleSchema], 403: ErrorSchema}, auth=jwt_auth)
def list_authorization_roles(request):
if not _ensure_superuser(request.auth):
return 403, {"error": "اجازه دسترسی ندارید."}
return 200, [
{
"key": spec["key"],
"label": spec["label"],
"description": spec["description"],
"enabled": False,
"locked": bool(spec.get("locked", False)),
}
for spec in ROLE_SPECS
]
@auth_router.get("/users/{user_id}/authorization", response={200: UserAuthorizationSchema, 403: ErrorSchema}, auth=jwt_auth)
def get_user_authorization(request, user_id: int):
if not _ensure_superuser(request.auth):
return 403, {"error": "اجازه دسترسی ندارید."}
user = get_object_or_404(User, id=user_id)
return 200, _authorization_payload(user)
@auth_router.put("/users/{user_id}/authorization", response={200: UserAuthorizationSchema, 403: ErrorSchema, 400: ErrorSchema}, auth=jwt_auth)
def update_user_authorization(request, user_id: int, data: UserAuthorizationUpdateSchema):
if not _ensure_superuser(request.auth):
return 403, {"error": "اجازه دسترسی ندارید."}
user = get_object_or_404(User, id=user_id)
if user.id == request.auth.id:
return 400, {"error": "برای جلوگیری از قفل شدن دسترسی، نمی‌توانید نقش‌های خودتان را از این صفحه تغییر دهید."}
requested_groups = set(data.groups or [])
invalid_groups = requested_groups - CURATED_ROLE_GROUPS
if invalid_groups:
return 400, {"error": "نقش انتخاب‌شده معتبر نیست."}
user.is_staff = bool(data.is_staff)
user.save(update_fields=["is_staff"])
current_curated_groups = list(Group.objects.filter(name__in=CURATED_ROLE_GROUPS))
if current_curated_groups:
user.groups.remove(*current_curated_groups)
groups_to_add = [Group.objects.get_or_create(name=name)[0] for name in sorted(requested_groups)]
if groups_to_add:
user.groups.add(*groups_to_add)
return 200, _authorization_payload(user)
@auth_router.get("/check-username", response=UsernameCheckSchema)
def check_username_availability(request, username: str):
return {"exists": User.objects.filter(username=username).exists()}
@auth_router.get("/check-mobile", response={200: MobileLookupSchema, 400: ErrorSchema})
def check_mobile_availability(request, mobile: str):
try:
return 200, lookup_mobile_registration_state(mobile)
except AuthServiceError as exc:
return _error_response(exc)