310 lines
12 KiB
Python
310 lines
12 KiB
Python
from __future__ import annotations

import hmac
import random
import secrets
import string
from dataclasses import dataclass
from datetime import timedelta

from django.contrib.auth import password_validation
from django.core.exceptions import ValidationError as DjangoValidationError
from django.db import transaction
from django.utils import timezone
from django_redis import get_redis_connection

from apps.users.email_identity import (
    is_valid_mobile_number,
    normalize_email_identity,
    normalize_mobile_number,
)
from apps.users.models import Major, University, User
from apps.users.tasks import send_critical_sms
from core.authentication import create_jwt_token, create_refresh_token
|
||
|
||
# Redis TTLs, in seconds: a pending OTP code and the "OTP already verified" marker.
OTP_EXPIRY_SECONDS = 2 * 60
VERIFIED_OTP_EXPIRY_SECONDS = 15 * 60

# Prefixes of the Redis keys built by the key helpers in this module.
OTP_KEY_PREFIX = "auth_otp"
OTP_VERIFIED_KEY_PREFIX = "auth_otp_verified"

# SMS "kind" identifiers handed to the SMS sending task.
SMS_KIND_REGISTER = "auth_register_otp"
SMS_KIND_LOGIN = "auth_login_otp"
SMS_KIND_RESET_PASSWORD = "auth_reset_password_otp"
SMS_KIND_VERIFY_MOBILE = "auth_verify_mobile_otp"

# OTP mode -> SMS kind. "google_claim" shares the verify-mobile kind.
OTP_MODE_SMS_KIND = dict(
    register=SMS_KIND_REGISTER,
    login=SMS_KIND_LOGIN,
    reset_password=SMS_KIND_RESET_PASSWORD,
    verify_mobile=SMS_KIND_VERIFY_MOBILE,
    google_claim=SMS_KIND_VERIFY_MOBILE,
)
|
||
|
||
|
||
class AuthServiceError(Exception):
    """Error raised by the auth service functions in this module.

    Attributes:
        message: Human-readable error text (also passed to ``Exception``).
        field: Name of the input the error relates to; defaults to ``"detail"``.
        status_code: Numeric status, 400 by default (presumably the HTTP
            status the caller should answer with -- confirm against the views).
    """

    def __init__(self, message: str, *, field: str = "detail", status_code: int = 400):
        self.message = message
        self.field = field
        self.status_code = status_code
        super().__init__(message)

    def to_response(self) -> dict[str, str]:
        """Return the error as a ``{"error": <message>}`` payload."""
        return dict(error=self.message)
|
||
|
||
|
||
@dataclass
class RegistrationPayload:
    """Input bundle consumed by ``register_user``.

    The first four fields are required; the rest are optional profile data.
    """

    username: str
    mobile: str
    password: str
    # OTP code; checked only when no "verified" marker exists for the mobile.
    code: str
    email: str | None = None
    first_name: str = ""
    last_name: str = ""
    # Looked up as a ``University.code`` value during registration.
    university: str | None = None
    # Rejected during registration when shorter than 10 characters.
    student_id: str | None = None
    year_of_study: int | None = None
    # Looked up as a ``Major.code`` value during registration.
    major: str | None = None
|
||
|
||
|
||
def _otp_key(mode: str, mobile: str) -> str:
    """Build the Redis key under which the pending OTP for ``mode``/``mobile`` is stored."""
    return "{}:{}:{}".format(OTP_KEY_PREFIX, mode, mobile)
|
||
|
||
|
||
def _verified_otp_key(mode: str, mobile: str) -> str:
    """Build the Redis key of the "OTP already verified" marker for ``mode``/``mobile``."""
    return "{}:{}:{}".format(OTP_VERIFIED_KEY_PREFIX, mode, mobile)
|
||
|
||
|
||
def _redis():
    """Return the Redis connection registered under the ``"default"`` alias."""
    connection = get_redis_connection("default")
    return connection
|
||
|
||
|
||
def _validate_new_password(password: str, *, user: User | None = None, field_name: str = "password") -> None:
    """Run Django's configured password validators against ``password``.

    Args:
        password: Candidate password.
        user: Optional user forwarded to the validators.
        field_name: Field name attached to the raised error.

    Raises:
        AuthServiceError: When validation fails. Only the first validator
            message is surfaced; the original ``ValidationError`` is chained
            as the cause.
    """
    try:
        password_validation.validate_password(password, user=user)
    except DjangoValidationError as exc:
        # The previous conditional picked messages[0] in both branches; keep
        # that behavior without the redundant test.
        raise AuthServiceError(exc.messages[0], field=field_name) from exc
|
||
|
||
|
||
def _resolve_user_by_identifier(identifier: str) -> User | None:
    """Find a user by mobile number or, failing that, by e-mail.

    The identifier is first treated as a mobile number; only when it is not a
    valid mobile is it normalized and looked up as an e-mail. Returns ``None``
    when neither interpretation yields a user.
    """
    mobile_candidate = normalize_mobile_number(identifier)
    if is_valid_mobile_number(mobile_candidate):
        return User.objects.filter(mobile=mobile_candidate).first()

    email_candidate = normalize_email_identity(identifier)
    if not email_candidate:
        return None
    return User.objects.filter(email=email_candidate).first()
|
||
|
||
|
||
def _get_major_from_code(code: str | None) -> Major | None:
    """Return the non-deleted ``Major`` with the given code, or ``None``.

    An empty or missing code short-circuits to ``None`` without a query.
    """
    if code:
        matches = Major.objects.filter(code=code, is_deleted=False)
        return matches.first()
    return None
|
||
|
||
|
||
def _get_university_from_code(code: str | None) -> University | None:
    """Return the non-deleted ``University`` with the given code, or ``None``.

    An empty or missing code short-circuits to ``None`` without a query.
    """
    if code:
        matches = University.objects.filter(code=code, is_deleted=False)
        return matches.first()
    return None
|
||
|
||
|
||
def _normalize_otp_code(code: str) -> str:
|
||
normalized = (
|
||
str(code or "")
|
||
.strip()
|
||
.translate(str.maketrans("۰۱۲۳۴۵۶۷۸۹٠١٢٣٤٥٦٧٨٩", "01234567890123456789"))
|
||
)
|
||
if len(normalized) != 5 or not normalized.isdigit():
|
||
raise AuthServiceError("کد تایید باید شامل ۵ رقم باشد.", field="code")
|
||
return normalized
|
||
|
||
|
||
def _issue_otp(mobile: str, mode: str) -> dict[str, str | int]:
    """Generate a 5-digit OTP, store it in Redis and hand it to the SMS task.

    The code is stored under the mode/mobile OTP key with a TTL of
    ``OTP_EXPIRY_SECONDS`` and passed to ``send_critical_sms.delay``. Modes
    missing from ``OTP_MODE_SMS_KIND`` fall back to the verify-mobile SMS kind.

    Args:
        mobile: Mobile number; callers in this module pass it already normalized.
        mode: OTP purpose, e.g. ``"register"`` or ``"login"``.

    Returns:
        A dict with a success ``message``, ``expires_in_seconds`` and an
        ISO-8601 ``expires_at`` timestamp.
    """
    # OTPs are authentication secrets: draw them from the OS CSPRNG via
    # ``secrets`` rather than the predictable ``random`` generator.
    verification_code = "".join(secrets.choice(string.digits) for _ in range(5))
    redis_conn = _redis()
    redis_conn.setex(_otp_key(mode, mobile), OTP_EXPIRY_SECONDS, verification_code)
    send_critical_sms.delay(mobile, OTP_MODE_SMS_KIND.get(mode, SMS_KIND_VERIFY_MOBILE), verification_code)
    expires_at = timezone.now() + timedelta(seconds=OTP_EXPIRY_SECONDS)
    return {
        "message": "کد تایید با موفقیت ارسال شد.",
        "expires_in_seconds": OTP_EXPIRY_SECONDS,
        "expires_at": expires_at.isoformat(),
    }
|
||
|
||
|
||
def verify_otp_code(*, mobile: str, code: str, mode: str) -> None:
    """Check ``code`` against the stored OTP and delete it on success.

    Args:
        mobile: Mobile number used to build the Redis key.
        code: User-supplied code; normalized to five ASCII digits first.
        mode: OTP purpose used to build the Redis key.

    Raises:
        AuthServiceError: If the code is malformed, no OTP is stored (for
            example because it expired), or the codes differ.
    """
    normalized_code = _normalize_otp_code(code)
    redis_conn = _redis()
    key = _otp_key(mode, mobile)
    stored_code = redis_conn.get(key)
    # The client returns bytes by default but str when configured with
    # decode_responses=True; normalize to bytes so both setups work.
    if isinstance(stored_code, str):
        stored_code = stored_code.encode("utf-8")
    # Constant-time comparison avoids leaking how many leading digits matched.
    if not stored_code or not hmac.compare_digest(stored_code, normalized_code.encode("utf-8")):
        raise AuthServiceError("کد تایید نامعتبر است یا منقضی شده است.", field="code")
    redis_conn.delete(key)
|
||
|
||
|
||
def verify_register_otp(mobile: str, code: str) -> None:
    """Verify a registration OTP and leave a "verified" marker in Redis.

    On success a marker with a TTL of ``VERIFIED_OTP_EXPIRY_SECONDS`` is
    written under the register-mode verified key.

    Raises:
        AuthServiceError: If the mobile number is invalid or the OTP check fails.
    """
    mobile_number = normalize_mobile_number(mobile)
    if not is_valid_mobile_number(mobile_number):
        raise AuthServiceError("شماره موبایل معتبر نیست.", field="mobile")

    verify_otp_code(mobile=mobile_number, code=code, mode="register")

    redis_conn = _redis()
    marker_key = _verified_otp_key("register", mobile_number)
    redis_conn.setex(marker_key, VERIFIED_OTP_EXPIRY_SECONDS, "1")
|
||
|
||
|
||
def _consume_verified_otp(mobile: str, mode: str) -> bool:
    """Atomically consume the "OTP verified" marker for ``mode``/``mobile``.

    Returns:
        ``True`` if a marker existed and was removed by this call, ``False``
        otherwise (never set, expired, or already consumed).
    """
    # DEL reports how many keys it removed. Relying on that instead of a
    # separate GET + DEL makes the check-and-remove a single atomic step, so
    # two concurrent requests cannot both consume the same marker.
    removed = _redis().delete(_verified_otp_key(mode, mobile))
    return bool(removed)
|
||
|
||
|
||
def get_tokens_for_user(user: User) -> dict[str, str]:
    """Create an access/refresh token pair for ``user``.

    Returns:
        A dict with ``access_token``, ``refresh_token`` and a ``token_type``
        of ``"bearer"``.
    """
    access = create_jwt_token(user)
    refresh = create_refresh_token(user)
    return dict(access_token=access, refresh_token=refresh, token_type="bearer")
|
||
|
||
|
||
def generate_and_send_otp(mobile: str, mode: str) -> dict[str, str | int]:
    """Validate the request for an OTP and issue one.

    ``"register"`` requires that no user owns the mobile number, while
    ``"login"`` and ``"reset_password"`` require that one does. Other modes
    are issued without an existence requirement.

    Raises:
        AuthServiceError: If the mobile number is invalid or the existence
            requirement for ``mode`` is not met.
    """
    mobile_number = normalize_mobile_number(mobile)
    if not is_valid_mobile_number(mobile_number):
        raise AuthServiceError("شماره موبایل معتبر نیست.", field="mobile")

    already_registered = User.objects.filter(mobile=mobile_number).exists()
    if already_registered:
        if mode == "register":
            raise AuthServiceError("این شماره قبلاً ثبت شده است.", field="mobile")
    elif mode in {"login", "reset_password"}:
        raise AuthServiceError("کاربری با این شماره موبایل یافت نشد.", field="mobile")

    return _issue_otp(mobile_number, mode)
|
||
|
||
|
||
@transaction.atomic
def register_user(payload: RegistrationPayload) -> User:
    """Validate a registration payload and create the user.

    All payload validation runs before the OTP is consumed. Consuming the OTP
    (or its "verified" marker) deletes state in Redis that the surrounding
    database transaction cannot roll back, so doing it earlier would force the
    user to request a new code after any unrelated validation error.

    Args:
        payload: Registration data; ``payload.code`` is checked only when no
            "verified" marker exists for the mobile number.

    Returns:
        The newly created ``User``, flagged as mobile-verified.

    Raises:
        AuthServiceError: On any validation failure (student id length,
            duplicate username/e-mail/student id, invalid mobile, weak
            password, unknown major/university, bad or expired OTP).
    """
    if payload.student_id and len(str(payload.student_id)) < 10:
        raise AuthServiceError("شماره دانشجویی باید حداقل ۱۰ رقم باشد.", field="student_id")

    if User.objects.filter(username=payload.username).exists():
        raise AuthServiceError("نام کاربری قبلاً استفاده شده است.", field="username")

    normalized_email = normalize_email_identity(payload.email)
    if normalized_email and User.objects.filter(email=normalized_email).exists():
        raise AuthServiceError("این ایمیل قبلاً ثبت شده است.", field="email")

    normalized_mobile = normalize_mobile_number(payload.mobile)
    if not is_valid_mobile_number(normalized_mobile):
        raise AuthServiceError("شماره موبایل معتبر نیست.", field="mobile")

    _validate_new_password(payload.password)

    major = _get_major_from_code(payload.major)
    if payload.major and not major:
        raise AuthServiceError("رشته انتخابی معتبر نیست.", field="major")

    university = _get_university_from_code(payload.university)
    if payload.university and not university:
        raise AuthServiceError("دانشگاه انتخابی معتبر نیست.", field="university")

    if payload.student_id and university and User.objects.filter(
        university=university,
        student_id=payload.student_id,
    ).exists():
        raise AuthServiceError(
            "این شماره دانشجویی در دانشگاه انتخابی قبلاً ثبت شده است.",
            field="student_id",
        )

    # Consume the OTP last: a marker left by verify_register_otp wins, and the
    # raw code is only checked when no marker is present.
    if not _consume_verified_otp(normalized_mobile, "register"):
        verify_otp_code(mobile=normalized_mobile, code=payload.code, mode="register")

    # NOTE(review): is_email_verified is set to True whenever an e-mail is
    # supplied, although nothing in this function verifies it -- confirm this
    # is intentional.
    user = User.objects.create_user(
        username=payload.username,
        email=normalized_email,
        mobile=normalized_mobile,
        password=payload.password,
        first_name=payload.first_name or "",
        last_name=payload.last_name or "",
        student_id=payload.student_id,
        year_of_study=payload.year_of_study,
        major=major,
        university=university,
        is_email_verified=bool(normalized_email),
        is_mobile_verified=True,
    )
    return user
|
||
|
||
|
||
def login_with_password(identifier: str, password: str) -> dict[str, str]:
    """Authenticate by mobile/e-mail plus password and return a token pair.

    Raises:
        AuthServiceError: With status 401 when the identifier/password pair
            is wrong, or when the matched account is inactive.
    """
    account = _resolve_user_by_identifier(identifier)
    if account and account.check_password(password):
        if account.is_active:
            return get_tokens_for_user(account)
        raise AuthServiceError("حساب کاربری شما غیرفعال است.", status_code=401)
    raise AuthServiceError("شناسه ورود یا رمز عبور نادرست است.", status_code=401)
|
||
|
||
|
||
def login_with_otp(mobile: str, code: str) -> dict[str, str]:
    """Authenticate by mobile number plus login OTP and return a token pair.

    A successful login also marks the account's mobile as verified if it was
    not already.

    Raises:
        AuthServiceError: If the mobile is invalid, no user owns it, the
            account is inactive (status 401), or the OTP check fails.
    """
    mobile_number = normalize_mobile_number(mobile)
    if not is_valid_mobile_number(mobile_number):
        raise AuthServiceError("شماره موبایل معتبر نیست.", field="mobile")

    account = User.objects.filter(mobile=mobile_number).first()
    if not account:
        raise AuthServiceError("کاربری با این شماره موبایل یافت نشد.", field="mobile")
    if not account.is_active:
        raise AuthServiceError("حساب کاربری شما غیرفعال است.", status_code=401)

    verify_otp_code(mobile=mobile_number, code=code, mode="login")

    needs_verified_flag = not account.is_mobile_verified
    if needs_verified_flag:
        account.is_mobile_verified = True
        account.save(update_fields=["is_mobile_verified"])

    return get_tokens_for_user(account)
|
||
|
||
|
||
def reset_password_with_otp(mobile: str, code: str, password: str) -> None:
    """Set a new password for the owner of ``mobile`` after an OTP check.

    The OTP is verified before the new password is validated or compared with
    the current one, so no password information is revealed to a caller who
    has not proven control of the number.

    Raises:
        AuthServiceError: If no user owns the mobile, the OTP check fails,
            the password fails validation, or it equals the current password.
    """
    # NOTE(review): unlike the other entry points, the mobile format and the
    # account's is_active flag are not checked here -- confirm that is intended.
    mobile_number = normalize_mobile_number(mobile)
    account = User.objects.filter(mobile=mobile_number).first()
    if not account:
        raise AuthServiceError("کاربری با این شماره موبایل یافت نشد.", field="mobile")

    verify_otp_code(mobile=mobile_number, code=code, mode="reset_password")
    _validate_new_password(password, user=account)

    reuses_current_password = account.check_password(password)
    if reuses_current_password:
        raise AuthServiceError("رمز عبور جدید نباید با رمز قبلی یکسان باشد.", field="password")

    account.set_password(password)
    account.save(update_fields=["password"])
|
||
|
||
|
||
def send_authenticated_mobile_otp(user: User, mobile: str) -> dict[str, str | int]:
    """Issue a ``"verify_mobile"`` OTP so ``user`` can attach ``mobile``.

    Raises:
        AuthServiceError: If the mobile is invalid or already belongs to a
            different user.
    """
    mobile_number = normalize_mobile_number(mobile)
    if not is_valid_mobile_number(mobile_number):
        raise AuthServiceError("شماره موبایل معتبر نیست.", field="mobile")

    other_owners = User.objects.filter(mobile=mobile_number).exclude(pk=user.pk)
    if other_owners.exists():
        raise AuthServiceError("این شماره موبایل قبلاً به حساب دیگری متصل شده است.", field="mobile")

    return _issue_otp(mobile_number, "verify_mobile")
|
||
|
||
|
||
def verify_authenticated_mobile_otp(user: User, mobile: str, code: str) -> User:
    """Attach ``mobile`` to ``user`` after a ``"verify_mobile"`` OTP check.

    Returns:
        The same ``user`` instance, saved with the normalized mobile and
        ``is_mobile_verified`` set to ``True``.

    Raises:
        AuthServiceError: If the mobile is invalid, already belongs to a
            different user, or the OTP check fails.
    """
    mobile_number = normalize_mobile_number(mobile)
    if not is_valid_mobile_number(mobile_number):
        raise AuthServiceError("شماره موبایل معتبر نیست.", field="mobile")

    other_owners = User.objects.filter(mobile=mobile_number).exclude(pk=user.pk)
    if other_owners.exists():
        raise AuthServiceError("این شماره موبایل قبلاً به حساب دیگری متصل شده است.", field="mobile")

    verify_otp_code(mobile=mobile_number, code=code, mode="verify_mobile")

    user.mobile = mobile_number
    user.is_mobile_verified = True
    user.save(update_fields=["mobile", "is_mobile_verified"])
    return user
|
||
|
||
|
||
def lookup_mobile_registration_state(mobile: str) -> dict[str, bool]:
    """Report whether ``mobile`` is registered and has a usable password.

    Returns:
        ``{"exists": ..., "has_password": ...}``; both are ``False`` when no
        user owns the number.

    Raises:
        AuthServiceError: If the mobile number is invalid.
    """
    mobile_number = normalize_mobile_number(mobile)
    if not is_valid_mobile_number(mobile_number):
        raise AuthServiceError("شماره موبایل معتبر نیست.", field="mobile")

    account = User.objects.filter(mobile=mobile_number).first()
    if not account:
        return {"exists": False, "has_password": False}
    return {"exists": True, "has_password": bool(account.has_usable_password())}
|