from __future__ import annotations import jwt import uuid from django.conf import settings from django.contrib.auth.models import Group from django.core.files.base import ContentFile from django.db.models import Q from django.http import HttpResponseRedirect from django.shortcuts import get_object_or_404 from ninja import Query, Router from apps.users.api.schemas import ( AuthorizationRoleSchema, GoogleClaimVerifySchema, GoogleCompleteSchema, GoogleFlowResponseSchema, GoogleFlowSchema, MobileLookupSchema, MobileOtpSendSchema, MobileOtpVerifySchema, OtpSendResponseSchema, OtpSendSchema, PasswordResetSchema, RegisterOtpVerifySchema, TokenRefreshIn, TokenSchema, UserListSchema, UserAuthorizationSchema, UserAuthorizationUpdateSchema, UserLoginSchema, UserOtpLoginSchema, UserProfileSchema, UserRegistrationSchema, UserUpdateSchema, UsernameCheckSchema, ) from apps.blog.permissions import ASSOCIATION_ADMIN_GROUP, BLOG_EDITOR_GROUP, BLOG_SUPERVISOR_GROUP from apps.users.email_identity import normalize_email_identity from apps.users.models import Major, University, User from apps.users.services.auth import ( AuthServiceError, RegistrationPayload, generate_and_send_otp, get_tokens_for_user, login_with_otp, login_with_password, lookup_mobile_registration_state, register_user, reset_password_with_otp, send_authenticated_mobile_otp, verify_register_otp, verify_authenticated_mobile_otp, ) from apps.users.services.google_oauth import ( GoogleOAuthFlowError, build_authenticated_flow_payload, build_google_authorization_url, build_google_callback_redirect_url, build_pending_google_flow_payload, complete_google_signup, consume_google_state, create_google_flow, exchange_code_for_google_profile, find_social_account_for_profile, get_google_flow, send_google_claim_otp, sync_user_from_google_profile, verify_google_claim, ) from core.api.schemas import ErrorSchema, MessageSchema from core.authentication import create_jwt_token, create_refresh_token, jwt_auth from core.media import delete_image_derivatives auth_router = Router() CURATED_ROLE_GROUPS = { BLOG_EDITOR_GROUP, BLOG_SUPERVISOR_GROUP, ASSOCIATION_ADMIN_GROUP, } ROLE_SPECS = [ { "key": BLOG_EDITOR_GROUP, "label": "ویرایشگر بلاگ", "description": "امکان نوشتن و مدیریت نوشته‌های خودش در بلاگ.", "group": BLOG_EDITOR_GROUP, }, { "key": BLOG_SUPERVISOR_GROUP, "label": "سرپرست بلاگ", "description": "امکان بررسی، انتشار، مدیریت دسته‌ها/برچسب‌ها و نظارت کامنت‌ها.", "group": BLOG_SUPERVISOR_GROUP, }, { "key": ASSOCIATION_ADMIN_GROUP, "label": "ادمین انجمن", "description": "نقش سازمانی انجمن برای دسترسی‌های مدیریتی منتخب.", "group": ASSOCIATION_ADMIN_GROUP, }, { "key": "staff_admin", "label": "دسترسی پنل مدیریت", "description": "فعال‌سازی is_staff برای ورود به بخش‌های مدیریتی عمومی.", "field": "is_staff", }, { "key": "is_superuser", "label": "سوپریوزر", "description": "دسترسی کامل Django؛ از این صفحه قابل تغییر نیست.", "field": "is_superuser", "locked": True, }, ] def _error_response(exc: AuthServiceError | GoogleOAuthFlowError): return exc.status_code, {"error": exc.message} def _ensure_superuser(user): return bool(user and user.is_superuser) def _role_payload(user: User) -> list[dict]: user_groups = set(user.groups.values_list("name", flat=True)) roles = [] for spec in ROLE_SPECS: key = spec["key"] enabled = False if spec.get("group"): enabled = spec["group"] in user_groups elif spec.get("field"): enabled = bool(getattr(user, spec["field"])) roles.append( { "key": key, "label": spec["label"], "description": spec["description"], "enabled": enabled, "locked": bool(spec.get("locked", False)), } ) return roles def _authorization_payload(user: User) -> dict: return { "id": user.id, "username": user.username, "email": user.email, "mobile": user.mobile, "first_name": user.first_name, "last_name": user.last_name, "is_active": user.is_active, "is_staff": user.is_staff, "is_superuser": user.is_superuser, "groups": list(user.groups.values_list("name", flat=True)), "roles": _role_payload(user), } def _get_major_from_code(code: str | None): if not code: return None return Major.objects.filter(code=code, is_deleted=False).first() def _get_university_from_code(code: str | None): if not code: return None return University.objects.filter(code=code, is_deleted=False).first() @auth_router.post("/register", response={201: MessageSchema, 400: ErrorSchema}) def register(request, data: UserRegistrationSchema): try: user = register_user( RegistrationPayload( username=data.username, mobile=data.mobile, password=data.password, code=data.code, email=data.email, first_name=data.first_name or "", last_name=data.last_name or "", university=data.university, student_id=data.student_id, year_of_study=data.year_of_study, major=data.major, ) ) except AuthServiceError as exc: return _error_response(exc) return 201, {"message": f"ثبت‌نام با موفقیت انجام شد. خوش آمدید {user.get_full_name() or user.username}."} @auth_router.post("/otp/send", response={200: OtpSendResponseSchema, 400: ErrorSchema}) def send_otp(request, data: OtpSendSchema): try: payload = generate_and_send_otp(data.mobile, data.mode) except AuthServiceError as exc: return _error_response(exc) return 200, payload @auth_router.post("/otp/verify-register", response={200: MessageSchema, 400: ErrorSchema}) def verify_register_otp_view(request, data: RegisterOtpVerifySchema): try: verify_register_otp(data.mobile, data.code) except AuthServiceError as exc: return _error_response(exc) return 200, {"message": "کد تایید با موفقیت تایید شد."} @auth_router.post("/login", response={200: TokenSchema, 401: ErrorSchema, 400: ErrorSchema}) def login(request, data: UserLoginSchema): try: return 200, login_with_password(data.identifier, data.password) except AuthServiceError as exc: return _error_response(exc) @auth_router.post("/login/otp", response={200: TokenSchema, 401: ErrorSchema, 400: ErrorSchema}) def login_otp(request, data: UserOtpLoginSchema): try: return 200, login_with_otp(data.mobile, data.code) except AuthServiceError as exc: return _error_response(exc) @auth_router.post("/reset-password", response={200: MessageSchema, 400: ErrorSchema}) def reset_password(request, data: PasswordResetSchema): try: reset_password_with_otp(data.mobile, data.code, data.new_password) except AuthServiceError as exc: return _error_response(exc) return 200, {"message": "رمز عبور با موفقیت تغییر کرد."} @auth_router.post("/mobile/send-otp", response={200: OtpSendResponseSchema, 400: ErrorSchema}, auth=jwt_auth) def send_mobile_verify_otp(request, data: MobileOtpSendSchema): try: payload = send_authenticated_mobile_otp(request.auth, data.mobile) except AuthServiceError as exc: return _error_response(exc) return 200, payload @auth_router.post("/mobile/verify", response={200: UserProfileSchema, 400: ErrorSchema}, auth=jwt_auth) def verify_mobile(request, data: MobileOtpVerifySchema): try: user = verify_authenticated_mobile_otp(request.auth, data.mobile, data.code) except AuthServiceError as exc: return _error_response(exc) return 200, user @auth_router.post("/refresh", response={200: TokenSchema, 401: ErrorSchema}) def refresh_tokens(request, data: TokenRefreshIn): try: payload = jwt.decode( data.refresh_token, settings.JWT_SECRET_KEY, algorithms=[settings.JWT_ALGORITHM], ) if payload.get("type") != "refresh": return 401, {"error": "نوع توکن نامعتبر است."} user_id = payload.get("user_id") if not user_id: return 401, {"error": "داده‌های توکن نامعتبر است."} user = get_object_or_404(User, id=user_id, is_active=True) except jwt.ExpiredSignatureError: return 401, {"error": "رفرش‌توکن منقضی شده است."} except jwt.InvalidTokenError: return 401, {"error": "رفرش‌توکن نامعتبر است."} return { "access_token": create_jwt_token(user), "refresh_token": create_refresh_token(user), "token_type": "bearer", } @auth_router.get("/oauth/google/start") def google_oauth_start(request): return HttpResponseRedirect(build_google_authorization_url()) @auth_router.get("/oauth/google/callback") def google_oauth_callback(request): if request.GET.get("error"): flow = create_google_flow( { "status": "error", "detail": request.GET.get("error_description") or "Google sign-in was cancelled.", } ) return HttpResponseRedirect(build_google_callback_redirect_url(flow)) try: consume_google_state(request.GET.get("state")) profile = exchange_code_for_google_profile(request.GET.get("code")) social_account = find_social_account_for_profile(profile) if social_account and social_account.user.is_mobile_verified: sync_user_from_google_profile(social_account.user, profile) flow_payload = build_authenticated_flow_payload(social_account.user) elif social_account: sync_user_from_google_profile(social_account.user, profile) flow_payload = build_pending_google_flow_payload(profile) flow_payload["resolution"] = "existing_email_claim" flow_payload["target_user_id"] = social_account.user.id flow_payload["mobile_hint"] = social_account.user.mobile else: flow_payload = build_pending_google_flow_payload(profile) flow = create_google_flow(flow_payload) return HttpResponseRedirect(build_google_callback_redirect_url(flow)) except GoogleOAuthFlowError as exc: flow = create_google_flow({"status": "error", "detail": exc.message}) return HttpResponseRedirect(build_google_callback_redirect_url(flow)) @auth_router.get("/oauth/google/flow", response={200: GoogleFlowResponseSchema, 400: ErrorSchema}) def google_oauth_flow(request, flow: str): try: return 200, get_google_flow(flow) except GoogleOAuthFlowError as exc: return _error_response(exc) @auth_router.post("/oauth/google/complete", response={200: GoogleFlowResponseSchema, 400: ErrorSchema}) def google_oauth_complete(request, data: GoogleCompleteSchema): try: payload = complete_google_signup( flow=data.flow, mobile=data.mobile, username=data.username, student_id=data.student_id, year_of_study=data.year_of_study, major=data.major, university=data.university, first_name=data.first_name, last_name=data.last_name, ) return 200, payload except (GoogleOAuthFlowError, AuthServiceError) as exc: return _error_response(exc) @auth_router.post("/oauth/google/claim/send-otp", response={200: MessageSchema, 400: ErrorSchema}) def google_oauth_claim_send_otp(request, data: GoogleFlowSchema): try: return 200, send_google_claim_otp(data.flow) except (GoogleOAuthFlowError, AuthServiceError) as exc: return _error_response(exc) @auth_router.post("/oauth/google/claim/verify", response={200: GoogleFlowResponseSchema, 400: ErrorSchema}) def google_oauth_claim_verify(request, data: GoogleClaimVerifySchema): try: return 200, verify_google_claim(data.flow, data.code) except (GoogleOAuthFlowError, AuthServiceError) as exc: return _error_response(exc) @auth_router.get("/verify-email/{token}", response=MessageSchema) def verify_email_legacy_guidance(request, token: str): return { "message": "تایید ایمیل غیرفعال شده است. برای بازیابی حساب از ورود با گوگل یا تایید شماره موبایل استفاده کنید." } @auth_router.post("/resend-verification", response=MessageSchema) def resend_verification_legacy_guidance(request, email: str): return { "message": "ارسال ایمیل تایید غیرفعال شده است. برای ادامه، شماره موبایل خود را ثبت یا از ورود با گوگل استفاده کنید." } @auth_router.post("/request-password-reset", response=MessageSchema) def request_password_reset_legacy_guidance(request): return { "message": "بازیابی رمز عبور با ایمیل غیرفعال شده است. از بازیابی با کد پیامکی یا ورود با گوگل استفاده کنید." } @auth_router.post("/reset-password-confirm", response=MessageSchema) def reset_password_confirm_legacy_guidance(request): return { "message": "لینک بازیابی ایمیلی غیرفعال شده است. لطفاً از بازیابی رمز عبور با موبایل استفاده کنید." } @auth_router.get("/profile", response=UserProfileSchema, auth=jwt_auth) def get_profile(request): return request.auth @auth_router.put("/profile", response={200: UserProfileSchema, 400: ErrorSchema}, auth=jwt_auth) def update_profile(request, data: UserUpdateSchema): user = request.auth payload = data.dict(exclude_unset=True) if "major" in payload: code = payload.pop("major") if code: major_obj = _get_major_from_code(code) if not major_obj: return 400, {"error": "رشته انتخابی معتبر نیست."} payload["major"] = major_obj else: payload["major"] = None if "university" in payload: code = payload.pop("university") if code: uni_obj = _get_university_from_code(code) if not uni_obj: return 400, {"error": "دانشگاه انتخابی معتبر نیست."} payload["university"] = uni_obj else: payload["university"] = None if "email" in payload: normalized_email = normalize_email_identity(payload["email"]) existing = User.objects.filter(email=normalized_email).exclude(pk=user.pk) if normalized_email else User.objects.none() if existing.exists(): return 400, {"error": "این ایمیل قبلاً ثبت شده است."} payload["email"] = normalized_email payload["is_email_verified"] = False if normalized_email else False for field, value in payload.items(): setattr(user, field, value) user.save() return 200, user @auth_router.post("/profile/picture", response={200: MessageSchema, 400: ErrorSchema}, auth=jwt_auth) def upload_profile_picture(request): if "file" not in request.FILES: return 400, {"error": "فایلی ارسال نشده است."} file = request.FILES["file"] if not file.content_type.startswith("image/"): return 400, {"error": "فایل باید از نوع تصویر باشد."} if file.size > 5 * 1024 * 1024: return 400, {"error": "حجم فایل باید کمتر از ۵ مگابایت باشد."} user = request.auth filename = f"profile_pictures/{user.id}_{uuid.uuid4().hex}.{file.name.split('.')[-1]}" user.profile_picture.save(filename, ContentFile(file.read())) return 200, {"message": "تصویر پروفایل با موفقیت به‌روزرسانی شد."} @auth_router.delete("/profile/picture", response={200: MessageSchema}, auth=jwt_auth) def delete_profile_picture(request): user = request.auth if user.profile_picture: delete_image_derivatives(user.profile_picture, "profile_picture", delete_original=True) user.profile_picture = None user.save(update_fields=["profile_picture"]) return 200, {"message": "تصویر پروفایل با موفقیت حذف شد."} @auth_router.get("/users/deleted", response={200: list[UserProfileSchema], 403: ErrorSchema}, auth=jwt_auth) def list_deleted_users(request): if not (request.auth.is_staff or request.auth.is_superuser): return 403, {"error": "اجازه دسترسی ندارید."} return User.deleted_objects.all() @auth_router.post("/users/{user_id}/restore", response={200: MessageSchema, 400: ErrorSchema, 403: ErrorSchema}, auth=jwt_auth) def restore_user(request, user_id: int): if not (request.auth.is_staff or request.auth.is_superuser): return 403, {"error": "اجازه دسترسی ندارید."} try: user = User.deleted_objects.get(id=user_id) user.restore() return 200, {"message": f"کاربر {user.username} با موفقیت بازیابی شد."} except User.DoesNotExist: return 400, {"error": "کاربر یافت نشد یا حذف نرم نشده است."} @auth_router.get("/users", response={200: list[UserListSchema], 403: ErrorSchema}, auth=jwt_auth) def list_users( request, search: str | None = Query(None), role: str | None = Query(None, description="staff or superuser"), student_id: str | None = Query(None), university: str | None = Query(None), major: str | None = Query(None), is_active: str | None = Query(None, description="true or false"), limit: int = Query(50, ge=1, le=200), offset: int = Query(0, ge=0), ): user = request.auth if not (user.is_staff or user.is_superuser): return 403, {"error": "اجازه دسترسی ندارید."} queryset = User.objects.order_by("-date_joined") if search: queryset = queryset.filter( Q(username__icontains=search) | Q(email__icontains=search) | Q(mobile__icontains=search) | Q(first_name__icontains=search) | Q(last_name__icontains=search) ) if role == "staff": queryset = queryset.filter(is_staff=True) elif role == "superuser": queryset = queryset.filter(is_superuser=True) if student_id: queryset = queryset.filter(student_id__icontains=student_id) if university: queryset = queryset.filter( Q(university__code__icontains=university) | Q(university__name__icontains=university) ) if major: queryset = queryset.filter(Q(major__code__icontains=major) | Q(major__name__icontains=major)) if is_active is not None: if is_active.lower() in ("true", "1"): queryset = queryset.filter(is_active=True) elif is_active.lower() in ("false", "0"): queryset = queryset.filter(is_active=False) return queryset[offset : offset + limit] @auth_router.get("/users/{user_id}", response={200: UserProfileSchema, 403: ErrorSchema, 404: ErrorSchema}, auth=jwt_auth) def get_user_detail(request, user_id: int): user = request.auth if not (user.is_staff or user.is_superuser): return 403, {"error": "اجازه دسترسی ندارید."} target = get_object_or_404(User, id=user_id) return 200, target @auth_router.get("/roles", response={200: list[AuthorizationRoleSchema], 403: ErrorSchema}, auth=jwt_auth) def list_authorization_roles(request): if not _ensure_superuser(request.auth): return 403, {"error": "اجازه دسترسی ندارید."} return 200, [ { "key": spec["key"], "label": spec["label"], "description": spec["description"], "enabled": False, "locked": bool(spec.get("locked", False)), } for spec in ROLE_SPECS ] @auth_router.get("/users/{user_id}/authorization", response={200: UserAuthorizationSchema, 403: ErrorSchema}, auth=jwt_auth) def get_user_authorization(request, user_id: int): if not _ensure_superuser(request.auth): return 403, {"error": "اجازه دسترسی ندارید."} user = get_object_or_404(User, id=user_id) return 200, _authorization_payload(user) @auth_router.put("/users/{user_id}/authorization", response={200: UserAuthorizationSchema, 403: ErrorSchema, 400: ErrorSchema}, auth=jwt_auth) def update_user_authorization(request, user_id: int, data: UserAuthorizationUpdateSchema): if not _ensure_superuser(request.auth): return 403, {"error": "اجازه دسترسی ندارید."} user = get_object_or_404(User, id=user_id) if user.id == request.auth.id: return 400, {"error": "برای جلوگیری از قفل شدن دسترسی، نمی‌توانید نقش‌های خودتان را از این صفحه تغییر دهید."} requested_groups = set(data.groups or []) invalid_groups = requested_groups - CURATED_ROLE_GROUPS if invalid_groups: return 400, {"error": "نقش انتخاب‌شده معتبر نیست."} user.is_staff = bool(data.is_staff) user.save(update_fields=["is_staff"]) current_curated_groups = list(Group.objects.filter(name__in=CURATED_ROLE_GROUPS)) if current_curated_groups: user.groups.remove(*current_curated_groups) groups_to_add = [Group.objects.get_or_create(name=name)[0] for name in sorted(requested_groups)] if groups_to_add: user.groups.add(*groups_to_add) return 200, _authorization_payload(user) @auth_router.get("/check-username", response=UsernameCheckSchema) def check_username_availability(request, username: str): return {"exists": User.objects.filter(username=username).exists()} @auth_router.get("/check-mobile", response={200: MobileLookupSchema, 400: ErrorSchema}) def check_mobile_availability(request, mobile: str): try: return 200, lookup_mobile_registration_state(mobile) except AuthServiceError as exc: return _error_response(exc)