feat(users): apply django password validators in auth flows

This commit is contained in:
2026-05-03 20:02:14 +03:30
parent f04e9ba828
commit d1c4889d22
6 changed files with 315 additions and 196 deletions

View File

@@ -1,174 +1,182 @@
import random
import string
from django.utils import timezone
from django.contrib.auth import get_user_model
from django.db import transaction
from django_redis import get_redis_connection
from rest_framework_simplejwt.tokens import RefreshToken
from rest_framework_simplejwt.exceptions import TokenError
from rest_framework.exceptions import ValidationError
from apps.users.tasks import send_verification_sms
from apps.users.utils import record_login_attempt
from apps.users.models import LoginAttempt
User = get_user_model()
def get_tokens_for_user(user):
"""Helper service to generate JWT tokens."""
refresh = RefreshToken.for_user(user)
return {
"access": str(refresh.access_token),
"refresh": str(refresh),
}
def register_user_with_password(mobile, password):
"""Business logic for registering a user with just a password."""
user, created, restored = User.get_or_restore(mobile=mobile)
if not created and not restored:
raise ValidationError({"detail": "User already exists."})
user.set_password(password)
user.save()
return get_tokens_for_user(user)
@transaction.atomic
def register_user_with_otp(mobile, code, password, first_name="", last_name=""):
"""Business logic for verifying OTP and registering a user."""
# 1. Check if user already exists
if User.objects.filter(mobile=mobile).exists():
raise ValidationError({"mobile": "این شماره قبلاً ثبت شده است."})
# 2. Verify OTP in Redis
redis_conn = get_redis_connection("default")
stored_code = redis_conn.get(f"verification_code:{mobile}")
if not stored_code:
raise ValidationError({"code": "کد تأیید یافت نشد."})
if stored_code.decode("utf-8") != code:
raise ValidationError({"code": "کد تأیید اشتباه است."})
# 3. Create User
user = User.objects.create_user(
mobile=mobile,
password=password,
first_name=first_name,
last_name=last_name,
is_verified=True,
is_active=True,
)
# 4. Clean up Redis
redis_conn.delete(f"verification_code:{mobile}")
return get_tokens_for_user(user)
def generate_and_send_otp(mobile, mode):
"""Business logic for generating OTP, checking existence rules, and sending SMS."""
user_exists = User.objects.filter(mobile=mobile).exists()
# Apply business rules based on mode
if mode == "register" and user_exists:
raise ValidationError({"mobile": "این شماره قبلاً ثبت‌نام شده است."})
if mode in ["login", "forget_password"] and not user_exists:
raise ValidationError({"mobile": "این شماره یافت نشد."})
# Generate OTP
verification_code = "".join(random.choices(string.digits, k=5))
# Store in Redis (Assuming 2 minutes / 120 seconds expiry)
redis_conn = get_redis_connection("default")
redis_conn.setex(f"verification_code:{mobile}", 120, verification_code)
# Trigger async SMS task
send_verification_sms.delay(mobile, verification_code)
def login_with_password(mobile, password, request=None):
"""Authenticate user with password and record the attempt."""
user = User.objects.filter(mobile=mobile).first()
if not user or not user.check_password(password):
record_login_attempt(request, user, LoginAttempt.StatusType.FAILED)
raise ValidationError({"detail": "شماره موبایل یا رمز عبور اشتباه است."})
if not user.is_active:
raise ValidationError({"detail": "حساب کاربری شما غیرفعال شده است."})
record_login_attempt(request, user, LoginAttempt.StatusType.SUCCESS)
return get_tokens_for_user(user)
def login_with_otp(mobile, code, request=None):
"""Authenticate or implicitly register user via OTP."""
redis_conn = get_redis_connection("default")
stored_code = redis_conn.get(f"verification_code:{mobile}")
if not stored_code or stored_code.decode("utf-8") != code:
record_login_attempt(request, None, LoginAttempt.StatusType.FAILED)
raise ValidationError({"code": "کد تایید نامعتبر است یا منقضی شده است."})
# Fixed Bug: Use `mobile=mobile`, not `username=mobile`
user, created = User.objects.get_or_create(mobile=mobile)
if created:
user.set_unusable_password()
user.save()
if not user.is_active:
raise ValidationError({"detail": "حساب کاربری شما غیرفعال شده است."})
record_login_attempt(request, user, LoginAttempt.StatusType.SUCCESS)
# Clean up Redis
redis_conn.delete(f"verification_code:{mobile}")
return get_tokens_for_user(user)
def reset_password_with_otp(mobile, code, password):
"""Verify OTP and change forgotten password."""
user = User.objects.filter(mobile=mobile).first()
if not user:
raise ValidationError({"mobile": "کاربری با این شماره یافت نشد."})
redis_conn = get_redis_connection("default")
stored_code = redis_conn.get(f"verification_code:{mobile}")
if not stored_code or stored_code.decode("utf-8") != code:
raise ValidationError({"code": "کد تایید نامعتبر است یا منقضی شده است."})
# Update password
user.set_password(password)
user.save()
# Fixed Bug: Ensure we delete the correct key
redis_conn.delete(f"verification_code:{mobile}")
def change_password(user, old_password, new_password):
"""Change password for an already authenticated user."""
if not user.check_password(old_password):
raise ValidationError({"old_password": "رمز عبور فعلی اشتباه است."})
user.set_password(new_password)
user.password_updated_at = timezone.now()
# Save only the fields that changed for DB performance
user.save(update_fields=["password", "password_updated_at"])
def logout_user(refresh_token_str):
"""Blacklist the user's refresh token."""
if not refresh_token_str:
raise ValidationError({"refresh": "توکن رفرش الزامی است."})
try:
token = RefreshToken(refresh_token_str)
token.blacklist()
except TokenError:
raise ValidationError({"detail": "توکن نامعتبر است یا قبلا منقضی شده است."})
import random
import string
from django.contrib.auth import get_user_model, password_validation
from django.core.exceptions import ValidationError as DjangoValidationError
from django.db import transaction
from django.utils import timezone
from django_redis import get_redis_connection
from rest_framework.exceptions import ValidationError
from rest_framework_simplejwt.exceptions import TokenError
from rest_framework_simplejwt.tokens import RefreshToken
from apps.users.models import LoginAttempt
from apps.users.tasks import send_verification_sms
from apps.users.utils import record_login_attempt
User = get_user_model()
USER_ALREADY_EXISTS_MESSAGE = "User already exists."
PASSWORD_REUSE_MESSAGE = "\u0631\u0645\u0632 \u0639\u0628\u0648\u0631 \u062c\u062f\u06cc\u062f \u0646\u0628\u0627\u06cc\u062f \u0628\u0627 \u0631\u0645\u0632 \u0639\u0628\u0648\u0631 \u0642\u0628\u0644\u06cc \u06cc\u06a9\u0633\u0627\u0646 \u0628\u0627\u0634\u062f."
def _validate_new_password(password, *, user, field_name):
try:
password_validation.validate_password(password, user=user)
except DjangoValidationError as exc:
raise ValidationError({field_name: exc.messages[0] if len(exc.messages) == 1 else exc.messages})
def get_tokens_for_user(user):
"""Helper service to generate JWT tokens."""
refresh = RefreshToken.for_user(user)
return {
"access": str(refresh.access_token),
"refresh": str(refresh),
}
def register_user_with_password(mobile, password):
"""Business logic for registering a user with just a password."""
user, created, restored = User.get_or_restore(mobile=mobile)
if not created and not restored:
raise ValidationError({"detail": USER_ALREADY_EXISTS_MESSAGE})
user.set_password(password)
user.save()
return get_tokens_for_user(user)
@transaction.atomic
def register_user_with_otp(mobile, code, password, first_name="", last_name=""):
"""Business logic for verifying OTP and registering a user."""
if User.objects.filter(mobile=mobile).exists():
raise ValidationError({"mobile": "\u0627\u06cc\u0646 \u0634\u0645\u0627\u0631\u0647 \u0642\u0628\u0644\u0627\u064b \u062b\u0628\u062a \u0634\u062f\u0647 \u0627\u0633\u062a."})
redis_conn = get_redis_connection("default")
stored_code = redis_conn.get(f"verification_code:{mobile}")
if not stored_code:
raise ValidationError({"code": "\u06a9\u062f \u062a\u0623\u06cc\u06cc\u062f \u06cc\u0627\u0641\u062a \u0646\u0634\u062f."})
if stored_code.decode("utf-8") != code:
raise ValidationError({"code": "\u06a9\u062f \u062a\u0623\u06cc\u06cc\u062f \u0627\u0634\u062a\u0628\u0627\u0647 \u0627\u0633\u062a."})
user = User.objects.create_user(
mobile=mobile,
password=password,
first_name=first_name,
last_name=last_name,
is_verified=True,
is_active=True,
)
redis_conn.delete(f"verification_code:{mobile}")
return get_tokens_for_user(user)
def generate_and_send_otp(mobile, mode):
"""Business logic for generating OTP, checking existence rules, and sending SMS."""
user_exists = User.objects.filter(mobile=mobile).exists()
if mode == "register" and user_exists:
raise ValidationError({"mobile": "\u0627\u06cc\u0646 \u0634\u0645\u0627\u0631\u0647 \u0642\u0628\u0644\u0627\u064b \u062b\u0628\u062a\u200c\u0646\u0627\u0645 \u0634\u062f\u0647 \u0627\u0633\u062a."})
if mode in ["login", "forget_password"] and not user_exists:
raise ValidationError({"mobile": "\u0627\u06cc\u0646 \u0634\u0645\u0627\u0631\u0647 \u06cc\u0627\u0641\u062a \u0646\u0634\u062f."})
verification_code = "".join(random.choices(string.digits, k=5))
redis_conn = get_redis_connection("default")
redis_conn.setex(f"verification_code:{mobile}", 120, verification_code)
send_verification_sms.delay(mobile, verification_code)
def login_with_password(mobile, password, request=None):
"""Authenticate user with password and record the attempt."""
user = User.objects.filter(mobile=mobile).first()
if not user or not user.check_password(password):
record_login_attempt(request, user, LoginAttempt.StatusType.FAILED)
raise ValidationError({"detail": "\u0634\u0645\u0627\u0631\u0647 \u0645\u0648\u0628\u0627\u06cc\u0644 \u06cc\u0627 \u0631\u0645\u0632 \u0639\u0628\u0648\u0631 \u0627\u0634\u062a\u0628\u0627\u0647 \u0627\u0633\u062a."})
if not user.is_active:
raise ValidationError({"detail": "\u062d\u0633\u0627\u0628 \u06a9\u0627\u0631\u0628\u0631\u06cc \u0634\u0645\u0627 \u063a\u06cc\u0631\u0641\u0639\u0627\u0644 \u0634\u062f\u0647 \u0627\u0633\u062a."})
record_login_attempt(request, user, LoginAttempt.StatusType.SUCCESS)
return get_tokens_for_user(user)
def login_with_otp(mobile, code, request=None):
"""Authenticate or implicitly register user via OTP."""
redis_conn = get_redis_connection("default")
stored_code = redis_conn.get(f"verification_code:{mobile}")
if not stored_code or stored_code.decode("utf-8") != code:
record_login_attempt(request, None, LoginAttempt.StatusType.FAILED)
raise ValidationError({"code": "\u06a9\u062f \u062a\u0627\u06cc\u06cc\u062f \u0646\u0627\u0645\u0639\u062a\u0628\u0631 \u0627\u0633\u062a \u06cc\u0627 \u0645\u0646\u0642\u0636\u06cc \u0634\u062f\u0647 \u0627\u0633\u062a."})
user, created = User.objects.get_or_create(mobile=mobile)
if created:
user.set_unusable_password()
user.save()
if not user.is_active:
raise ValidationError({"detail": "\u062d\u0633\u0627\u0628 \u06a9\u0627\u0631\u0628\u0631\u06cc \u0634\u0645\u0627 \u063a\u06cc\u0631\u0641\u0639\u0627\u0644 \u0634\u062f\u0647 \u0627\u0633\u062a."})
record_login_attempt(request, user, LoginAttempt.StatusType.SUCCESS)
redis_conn.delete(f"verification_code:{mobile}")
return get_tokens_for_user(user)
def reset_password_with_otp(mobile, code, password):
"""Verify OTP and change forgotten password."""
user = User.objects.filter(mobile=mobile).first()
if not user:
raise ValidationError({"mobile": "\u06a9\u0627\u0631\u0628\u0631\u06cc \u0628\u0627 \u0627\u06cc\u0646 \u0634\u0645\u0627\u0631\u0647 \u06cc\u0627\u0641\u062a \u0646\u0634\u062f."})
redis_conn = get_redis_connection("default")
stored_code = redis_conn.get(f"verification_code:{mobile}")
if not stored_code or stored_code.decode("utf-8") != code:
raise ValidationError({"code": "\u06a9\u062f \u062a\u0627\u06cc\u06cc\u062f \u0646\u0627\u0645\u0639\u062a\u0628\u0631 \u0627\u0633\u062a \u06cc\u0627 \u0645\u0646\u0642\u0636\u06cc \u0634\u062f\u0647 \u0627\u0633\u062a."})
_validate_new_password(password, user=user, field_name="password")
if user.check_password(password):
raise ValidationError({"password": PASSWORD_REUSE_MESSAGE})
user.set_password(password)
user.save()
redis_conn.delete(f"verification_code:{mobile}")
def change_password(user, old_password, new_password):
"""Change password for an already authenticated user."""
if not user.check_password(old_password):
raise ValidationError({"old_password": "\u0631\u0645\u0632 \u0639\u0628\u0648\u0631 \u0641\u0639\u0644\u06cc \u0627\u0634\u062a\u0628\u0627\u0647 \u0627\u0633\u062a."})
_validate_new_password(new_password, user=user, field_name="new_password")
if old_password == new_password:
raise ValidationError({"new_password": PASSWORD_REUSE_MESSAGE})
user.set_password(new_password)
user.password_updated_at = timezone.now()
user.save(update_fields=["password", "password_updated_at"])
def logout_user(refresh_token_str):
"""Blacklist the user's refresh token."""
if not refresh_token_str:
raise ValidationError({"refresh": "\u062a\u0648\u06a9\u0646 \u0631\u0641\u0631\u0634 \u0627\u0644\u0632\u0627\u0645\u06cc \u0627\u0633\u062a."})
try:
token = RefreshToken(refresh_token_str)
token.blacklist()
except TokenError:
raise ValidationError({"detail": "\u062a\u0648\u06a9\u0646 \u0646\u0627\u0645\u0639\u062a\u0628\u0631 \u0627\u0633\u062a \u06cc\u0627 \u0642\u0628\u0644\u0627 \u0645\u0646\u0642\u0636\u06cc \u0634\u062f\u0647 \u0627\u0633\u062a."})