Files
guilan-ace-backend/apps/users/services/auth.py
Amirhossein Khalili b7b21a6cc6
Some checks failed
Backend CI/CD / test (push) Has been cancelled
Backend CI/CD / deploy (push) Has been cancelled
feat(backend): migrate auth and notifications off email
2026-05-21 10:28:04 +03:30

310 lines
12 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
from __future__ import annotations
import random
import string
from dataclasses import dataclass
from datetime import timedelta
from django.contrib.auth import password_validation
from django.core.exceptions import ValidationError as DjangoValidationError
from django.db import transaction
from django.utils import timezone
from django_redis import get_redis_connection
from apps.users.email_identity import (
is_valid_mobile_number,
normalize_email_identity,
normalize_mobile_number,
)
from apps.users.models import Major, University, User
from apps.users.tasks import send_critical_sms
from core.authentication import create_jwt_token, create_refresh_token
OTP_EXPIRY_SECONDS = 120
VERIFIED_OTP_EXPIRY_SECONDS = 900
OTP_KEY_PREFIX = "auth_otp"
OTP_VERIFIED_KEY_PREFIX = "auth_otp_verified"
SMS_KIND_REGISTER = "auth_register_otp"
SMS_KIND_LOGIN = "auth_login_otp"
SMS_KIND_RESET_PASSWORD = "auth_reset_password_otp"
SMS_KIND_VERIFY_MOBILE = "auth_verify_mobile_otp"
OTP_MODE_SMS_KIND = {
"register": SMS_KIND_REGISTER,
"login": SMS_KIND_LOGIN,
"reset_password": SMS_KIND_RESET_PASSWORD,
"verify_mobile": SMS_KIND_VERIFY_MOBILE,
"google_claim": SMS_KIND_VERIFY_MOBILE,
}
class AuthServiceError(Exception):
def __init__(self, message: str, *, field: str = "detail", status_code: int = 400):
super().__init__(message)
self.message = message
self.field = field
self.status_code = status_code
def to_response(self) -> dict[str, str]:
return {"error": self.message}
@dataclass
class RegistrationPayload:
username: str
mobile: str
password: str
code: str
email: str | None = None
first_name: str = ""
last_name: str = ""
university: str | None = None
student_id: str | None = None
year_of_study: int | None = None
major: str | None = None
def _otp_key(mode: str, mobile: str) -> str:
return f"{OTP_KEY_PREFIX}:{mode}:{mobile}"
def _verified_otp_key(mode: str, mobile: str) -> str:
return f"{OTP_VERIFIED_KEY_PREFIX}:{mode}:{mobile}"
def _redis():
return get_redis_connection("default")
def _validate_new_password(password: str, *, user: User | None = None, field_name: str = "password") -> None:
try:
password_validation.validate_password(password, user=user)
except DjangoValidationError as exc:
message = exc.messages[0] if len(exc.messages) == 1 else exc.messages[0]
raise AuthServiceError(message, field=field_name)
def _resolve_user_by_identifier(identifier: str) -> User | None:
normalized_mobile = normalize_mobile_number(identifier)
if is_valid_mobile_number(normalized_mobile):
return User.objects.filter(mobile=normalized_mobile).first()
normalized_email = normalize_email_identity(identifier)
if normalized_email:
return User.objects.filter(email=normalized_email).first()
return None
def _get_major_from_code(code: str | None) -> Major | None:
if not code:
return None
return Major.objects.filter(code=code, is_deleted=False).first()
def _get_university_from_code(code: str | None) -> University | None:
if not code:
return None
return University.objects.filter(code=code, is_deleted=False).first()
def _normalize_otp_code(code: str) -> str:
normalized = (
str(code or "")
.strip()
.translate(str.maketrans("۰۱۲۳۴۵۶۷۸۹٠١٢٣٤٥٦٧٨٩", "01234567890123456789"))
)
if len(normalized) != 5 or not normalized.isdigit():
raise AuthServiceError("کد تایید باید شامل ۵ رقم باشد.", field="code")
return normalized
def _issue_otp(mobile: str, mode: str) -> dict[str, str | int]:
verification_code = "".join(random.choices(string.digits, k=5))
redis_conn = _redis()
redis_conn.setex(_otp_key(mode, mobile), OTP_EXPIRY_SECONDS, verification_code)
send_critical_sms.delay(mobile, OTP_MODE_SMS_KIND.get(mode, SMS_KIND_VERIFY_MOBILE), verification_code)
expires_at = timezone.now() + timedelta(seconds=OTP_EXPIRY_SECONDS)
return {
"message": "کد تایید با موفقیت ارسال شد.",
"expires_in_seconds": OTP_EXPIRY_SECONDS,
"expires_at": expires_at.isoformat(),
}
def verify_otp_code(*, mobile: str, code: str, mode: str) -> None:
normalized_code = _normalize_otp_code(code)
redis_conn = _redis()
stored_code = redis_conn.get(_otp_key(mode, mobile))
if not stored_code or stored_code.decode("utf-8") != normalized_code:
raise AuthServiceError("کد تایید نامعتبر است یا منقضی شده است.", field="code")
redis_conn.delete(_otp_key(mode, mobile))
def verify_register_otp(mobile: str, code: str) -> None:
normalized_mobile = normalize_mobile_number(mobile)
if not is_valid_mobile_number(normalized_mobile):
raise AuthServiceError("شماره موبایل معتبر نیست.", field="mobile")
verify_otp_code(mobile=normalized_mobile, code=code, mode="register")
_redis().setex(_verified_otp_key("register", normalized_mobile), VERIFIED_OTP_EXPIRY_SECONDS, "1")
def _consume_verified_otp(mobile: str, mode: str) -> bool:
redis_conn = _redis()
key = _verified_otp_key(mode, mobile)
if redis_conn.get(key):
redis_conn.delete(key)
return True
return False
def get_tokens_for_user(user: User) -> dict[str, str]:
return {
"access_token": create_jwt_token(user),
"refresh_token": create_refresh_token(user),
"token_type": "bearer",
}
def generate_and_send_otp(mobile: str, mode: str) -> dict[str, str | int]:
normalized_mobile = normalize_mobile_number(mobile)
if not is_valid_mobile_number(normalized_mobile):
raise AuthServiceError("شماره موبایل معتبر نیست.", field="mobile")
user_exists = User.objects.filter(mobile=normalized_mobile).exists()
if mode == "register" and user_exists:
raise AuthServiceError("این شماره قبلاً ثبت شده است.", field="mobile")
if mode in {"login", "reset_password"} and not user_exists:
raise AuthServiceError("کاربری با این شماره موبایل یافت نشد.", field="mobile")
return _issue_otp(normalized_mobile, mode)
@transaction.atomic
def register_user(payload: RegistrationPayload) -> User:
if payload.student_id and len(str(payload.student_id)) < 10:
raise AuthServiceError("شماره دانشجویی باید حداقل ۱۰ رقم باشد.", field="student_id")
if User.objects.filter(username=payload.username).exists():
raise AuthServiceError("نام کاربری قبلاً استفاده شده است.", field="username")
normalized_email = normalize_email_identity(payload.email)
if normalized_email and User.objects.filter(email=normalized_email).exists():
raise AuthServiceError("این ایمیل قبلاً ثبت شده است.", field="email")
normalized_mobile = normalize_mobile_number(payload.mobile)
if not is_valid_mobile_number(normalized_mobile):
raise AuthServiceError("شماره موبایل معتبر نیست.", field="mobile")
if not _consume_verified_otp(normalized_mobile, "register"):
verify_otp_code(mobile=normalized_mobile, code=payload.code, mode="register")
_validate_new_password(payload.password)
major = _get_major_from_code(payload.major)
if payload.major and not major:
raise AuthServiceError("رشته انتخابی معتبر نیست.", field="major")
university = _get_university_from_code(payload.university)
if payload.university and not university:
raise AuthServiceError("دانشگاه انتخابی معتبر نیست.", field="university")
if payload.student_id and university and User.objects.filter(
university=university,
student_id=payload.student_id,
).exists():
raise AuthServiceError(
"این شماره دانشجویی در دانشگاه انتخابی قبلاً ثبت شده است.",
field="student_id",
)
user = User.objects.create_user(
username=payload.username,
email=normalized_email,
mobile=normalized_mobile,
password=payload.password,
first_name=payload.first_name or "",
last_name=payload.last_name or "",
student_id=payload.student_id,
year_of_study=payload.year_of_study,
major=major,
university=university,
is_email_verified=bool(normalized_email),
is_mobile_verified=True,
)
return user
def login_with_password(identifier: str, password: str) -> dict[str, str]:
user = _resolve_user_by_identifier(identifier)
if not user or not user.check_password(password):
raise AuthServiceError("شناسه ورود یا رمز عبور نادرست است.", status_code=401)
if not user.is_active:
raise AuthServiceError("حساب کاربری شما غیرفعال است.", status_code=401)
return get_tokens_for_user(user)
def login_with_otp(mobile: str, code: str) -> dict[str, str]:
normalized_mobile = normalize_mobile_number(mobile)
if not is_valid_mobile_number(normalized_mobile):
raise AuthServiceError("شماره موبایل معتبر نیست.", field="mobile")
user = User.objects.filter(mobile=normalized_mobile).first()
if not user:
raise AuthServiceError("کاربری با این شماره موبایل یافت نشد.", field="mobile")
if not user.is_active:
raise AuthServiceError("حساب کاربری شما غیرفعال است.", status_code=401)
verify_otp_code(mobile=normalized_mobile, code=code, mode="login")
if not user.is_mobile_verified:
user.is_mobile_verified = True
user.save(update_fields=["is_mobile_verified"])
return get_tokens_for_user(user)
def reset_password_with_otp(mobile: str, code: str, password: str) -> None:
normalized_mobile = normalize_mobile_number(mobile)
user = User.objects.filter(mobile=normalized_mobile).first()
if not user:
raise AuthServiceError("کاربری با این شماره موبایل یافت نشد.", field="mobile")
verify_otp_code(mobile=normalized_mobile, code=code, mode="reset_password")
_validate_new_password(password, user=user)
if user.check_password(password):
raise AuthServiceError("رمز عبور جدید نباید با رمز قبلی یکسان باشد.", field="password")
user.set_password(password)
user.save(update_fields=["password"])
def send_authenticated_mobile_otp(user: User, mobile: str) -> dict[str, str | int]:
normalized_mobile = normalize_mobile_number(mobile)
if not is_valid_mobile_number(normalized_mobile):
raise AuthServiceError("شماره موبایل معتبر نیست.", field="mobile")
conflict = User.objects.filter(mobile=normalized_mobile).exclude(pk=user.pk).exists()
if conflict:
raise AuthServiceError("این شماره موبایل قبلاً به حساب دیگری متصل شده است.", field="mobile")
return _issue_otp(normalized_mobile, "verify_mobile")
def verify_authenticated_mobile_otp(user: User, mobile: str, code: str) -> User:
normalized_mobile = normalize_mobile_number(mobile)
if not is_valid_mobile_number(normalized_mobile):
raise AuthServiceError("شماره موبایل معتبر نیست.", field="mobile")
conflict = User.objects.filter(mobile=normalized_mobile).exclude(pk=user.pk).exists()
if conflict:
raise AuthServiceError("این شماره موبایل قبلاً به حساب دیگری متصل شده است.", field="mobile")
verify_otp_code(mobile=normalized_mobile, code=code, mode="verify_mobile")
user.mobile = normalized_mobile
user.is_mobile_verified = True
user.save(update_fields=["mobile", "is_mobile_verified"])
return user
def lookup_mobile_registration_state(mobile: str) -> dict[str, bool]:
normalized_mobile = normalize_mobile_number(mobile)
if not is_valid_mobile_number(normalized_mobile):
raise AuthServiceError("شماره موبایل معتبر نیست.", field="mobile")
user = User.objects.filter(mobile=normalized_mobile).first()
return {
"exists": bool(user),
"has_password": bool(user and user.has_usable_password()),
}