diff --git a/apps/users/api/serializers.py b/apps/users/api/serializers.py index 0c678b0..8461258 100644 --- a/apps/users/api/serializers.py +++ b/apps/users/api/serializers.py @@ -1,4 +1,5 @@ -from django.contrib.auth import get_user_model +from django.contrib.auth import get_user_model, password_validation +from django.core.exceptions import ValidationError as DjangoValidationError from drf_spectacular.utils import extend_schema_serializer from rest_framework import serializers @@ -6,10 +7,18 @@ from core.serializers.base import BaseModelSerializer User = get_user_model() -INVALID_MOBILE_FORMAT_MESSAGE = "فرمت شماره موبایل نادرست است." -INVALID_MOBILE_NUMBER_MESSAGE = "شماره موبایل معتبر نیست." -PASSWORD_MISMATCH_MESSAGE = "رمز عبور مطابقت ندارد." -NEW_PASSWORD_MISMATCH_MESSAGE = "رمز عبور جدید و تکرار آن مطابقت ندارند." +INVALID_MOBILE_FORMAT_MESSAGE = "\u0641\u0631\u0645\u062a \u0634\u0645\u0627\u0631\u0647 \u0645\u0648\u0628\u0627\u06cc\u0644 \u0646\u0627\u062f\u0631\u0633\u062a \u0627\u0633\u062a." +INVALID_MOBILE_NUMBER_MESSAGE = "\u0634\u0645\u0627\u0631\u0647 \u0645\u0648\u0628\u0627\u06cc\u0644 \u0645\u0639\u062a\u0628\u0631 \u0646\u06cc\u0633\u062a." +PASSWORD_MISMATCH_MESSAGE = "\u0631\u0645\u0632 \u0639\u0628\u0648\u0631 \u0645\u0637\u0627\u0628\u0642\u062a \u0646\u062f\u0627\u0631\u062f." +NEW_PASSWORD_MISMATCH_MESSAGE = "\u0631\u0645\u0632 \u0639\u0628\u0648\u0631 \u062c\u062f\u06cc\u062f \u0648 \u062a\u06a9\u0631\u0627\u0631 \u0622\u0646 \u0645\u0637\u0627\u0628\u0642\u062a \u0646\u062f\u0627\u0631\u0646\u062f." +PASSWORD_REUSE_MESSAGE = "\u0631\u0645\u0632 \u0639\u0628\u0648\u0631 \u062c\u062f\u06cc\u062f \u0646\u0628\u0627\u06cc\u062f \u0628\u0627 \u0631\u0645\u0632 \u0639\u0628\u0648\u0631 \u0642\u0628\u0644\u06cc \u06cc\u06a9\u0633\u0627\u0646 \u0628\u0627\u0634\u062f." + + +def _raise_password_validation_error(password: str, *, user, field_name: str) -> None: + try: + password_validation.validate_password(password, user=user) + except DjangoValidationError as exc: + raise serializers.ValidationError({field_name: exc.messages[0] if len(exc.messages) == 1 else exc.messages}) class UserProfilePictureSerializer(BaseModelSerializer): @@ -116,8 +125,21 @@ class ResetPasswordSerializer(serializers.Serializer): return value def validate(self, data): - if data.get("password") != data.get("re_password"): + mobile = data.get("mobile", "") + password = data.get("password", "") + re_password = data.get("re_password", "") + + if password != re_password: raise serializers.ValidationError({"password": PASSWORD_MISMATCH_MESSAGE}) + + user = User.objects.filter(mobile=mobile).only("password", "mobile", "first_name", "last_name", "email").first() + + if user is not None: + _raise_password_validation_error(password, user=user, field_name="password") + + if user.check_password(password): + raise serializers.ValidationError({"password": PASSWORD_REUSE_MESSAGE}) + return data @@ -127,9 +149,22 @@ class ChangePasswordSerializer(serializers.Serializer): re_password = serializers.CharField(required=True, write_only=True) def validate(self, data): - if data.get("new_password") != data.get("re_password"): + old_password = data.get("old_password", "") + new_password = data.get("new_password", "") + re_password = data.get("re_password", "") + + if new_password != re_password: raise serializers.ValidationError({"new_password": NEW_PASSWORD_MISMATCH_MESSAGE}) + request = self.context.get("request") + user = getattr(request, "user", None) + + if old_password and old_password == new_password: + raise serializers.ValidationError({"new_password": PASSWORD_REUSE_MESSAGE}) + + if user is not None and getattr(user, "is_authenticated", False): + _raise_password_validation_error(new_password, user=user, field_name="new_password") + return data diff --git a/apps/users/api/views.py b/apps/users/api/views.py index 781fca3..d4fefb0 100644 --- a/apps/users/api/views.py +++ b/apps/users/api/views.py @@ -272,7 +272,7 @@ class ChangePasswordView(APIView): @extend_schema(request=ChangePasswordSerializer) def patch(self, request, *args, **kwargs): - serializer = ChangePasswordSerializer(data=request.data) + serializer = ChangePasswordSerializer(data=request.data, context={"request": request}) serializer.is_valid(raise_exception=True) change_password( diff --git a/apps/users/services/auth.py b/apps/users/services/auth.py index 6fde1c5..0be3017 100644 --- a/apps/users/services/auth.py +++ b/apps/users/services/auth.py @@ -1,174 +1,182 @@ -import random -import string - -from django.utils import timezone -from django.contrib.auth import get_user_model -from django.db import transaction - -from django_redis import get_redis_connection -from rest_framework_simplejwt.tokens import RefreshToken -from rest_framework_simplejwt.exceptions import TokenError -from rest_framework.exceptions import ValidationError - -from apps.users.tasks import send_verification_sms -from apps.users.utils import record_login_attempt -from apps.users.models import LoginAttempt - -User = get_user_model() - -def get_tokens_for_user(user): - """Helper service to generate JWT tokens.""" - refresh = RefreshToken.for_user(user) - return { - "access": str(refresh.access_token), - "refresh": str(refresh), - } - - -def register_user_with_password(mobile, password): - """Business logic for registering a user with just a password.""" - user, created, restored = User.get_or_restore(mobile=mobile) - - if not created and not restored: - raise ValidationError({"detail": "User already exists."}) - - user.set_password(password) - user.save() - - return get_tokens_for_user(user) - -@transaction.atomic -def register_user_with_otp(mobile, code, password, first_name="", last_name=""): - """Business logic for verifying OTP and registering a user.""" - # 1. Check if user already exists - if User.objects.filter(mobile=mobile).exists(): - raise ValidationError({"mobile": "این شماره قبلاً ثبت شده است."}) - - # 2. Verify OTP in Redis - redis_conn = get_redis_connection("default") - stored_code = redis_conn.get(f"verification_code:{mobile}") - - if not stored_code: - raise ValidationError({"code": "کد تأیید یافت نشد."}) - if stored_code.decode("utf-8") != code: - raise ValidationError({"code": "کد تأیید اشتباه است."}) - - # 3. Create User - user = User.objects.create_user( - mobile=mobile, - password=password, - first_name=first_name, - last_name=last_name, - is_verified=True, - is_active=True, - ) - - # 4. Clean up Redis - redis_conn.delete(f"verification_code:{mobile}") - - return get_tokens_for_user(user) - - -def generate_and_send_otp(mobile, mode): - """Business logic for generating OTP, checking existence rules, and sending SMS.""" - user_exists = User.objects.filter(mobile=mobile).exists() - - # Apply business rules based on mode - if mode == "register" and user_exists: - raise ValidationError({"mobile": "این شماره قبلاً ثبت‌نام شده است."}) - - if mode in ["login", "forget_password"] and not user_exists: - raise ValidationError({"mobile": "این شماره یافت نشد."}) - - # Generate OTP - verification_code = "".join(random.choices(string.digits, k=5)) - - # Store in Redis (Assuming 2 minutes / 120 seconds expiry) - redis_conn = get_redis_connection("default") - redis_conn.setex(f"verification_code:{mobile}", 120, verification_code) - - # Trigger async SMS task - send_verification_sms.delay(mobile, verification_code) - - -def login_with_password(mobile, password, request=None): - """Authenticate user with password and record the attempt.""" - user = User.objects.filter(mobile=mobile).first() - - if not user or not user.check_password(password): - record_login_attempt(request, user, LoginAttempt.StatusType.FAILED) - raise ValidationError({"detail": "شماره موبایل یا رمز عبور اشتباه است."}) - - if not user.is_active: - raise ValidationError({"detail": "حساب کاربری شما غیرفعال شده است."}) - - record_login_attempt(request, user, LoginAttempt.StatusType.SUCCESS) - return get_tokens_for_user(user) - - -def login_with_otp(mobile, code, request=None): - """Authenticate or implicitly register user via OTP.""" - redis_conn = get_redis_connection("default") - stored_code = redis_conn.get(f"verification_code:{mobile}") - - if not stored_code or stored_code.decode("utf-8") != code: - record_login_attempt(request, None, LoginAttempt.StatusType.FAILED) - raise ValidationError({"code": "کد تایید نامعتبر است یا منقضی شده است."}) - - # Fixed Bug: Use `mobile=mobile`, not `username=mobile` - user, created = User.objects.get_or_create(mobile=mobile) - if created: - user.set_unusable_password() - user.save() - - if not user.is_active: - raise ValidationError({"detail": "حساب کاربری شما غیرفعال شده است."}) - - record_login_attempt(request, user, LoginAttempt.StatusType.SUCCESS) - - # Clean up Redis - redis_conn.delete(f"verification_code:{mobile}") - - return get_tokens_for_user(user) - - -def reset_password_with_otp(mobile, code, password): - """Verify OTP and change forgotten password.""" - user = User.objects.filter(mobile=mobile).first() - if not user: - raise ValidationError({"mobile": "کاربری با این شماره یافت نشد."}) - - redis_conn = get_redis_connection("default") - stored_code = redis_conn.get(f"verification_code:{mobile}") - - if not stored_code or stored_code.decode("utf-8") != code: - raise ValidationError({"code": "کد تایید نامعتبر است یا منقضی شده است."}) - - # Update password - user.set_password(password) - user.save() - - # Fixed Bug: Ensure we delete the correct key - redis_conn.delete(f"verification_code:{mobile}") - - -def change_password(user, old_password, new_password): - """Change password for an already authenticated user.""" - if not user.check_password(old_password): - raise ValidationError({"old_password": "رمز عبور فعلی اشتباه است."}) - - user.set_password(new_password) - user.password_updated_at = timezone.now() - # Save only the fields that changed for DB performance - user.save(update_fields=["password", "password_updated_at"]) - - -def logout_user(refresh_token_str): - """Blacklist the user's refresh token.""" - if not refresh_token_str: - raise ValidationError({"refresh": "توکن رفرش الزامی است."}) - try: - token = RefreshToken(refresh_token_str) - token.blacklist() - except TokenError: - raise ValidationError({"detail": "توکن نامعتبر است یا قبلا منقضی شده است."}) +import random +import string + +from django.contrib.auth import get_user_model, password_validation +from django.core.exceptions import ValidationError as DjangoValidationError +from django.db import transaction +from django.utils import timezone +from django_redis import get_redis_connection +from rest_framework.exceptions import ValidationError +from rest_framework_simplejwt.exceptions import TokenError +from rest_framework_simplejwt.tokens import RefreshToken + +from apps.users.models import LoginAttempt +from apps.users.tasks import send_verification_sms +from apps.users.utils import record_login_attempt + +User = get_user_model() + +USER_ALREADY_EXISTS_MESSAGE = "User already exists." +PASSWORD_REUSE_MESSAGE = "\u0631\u0645\u0632 \u0639\u0628\u0648\u0631 \u062c\u062f\u06cc\u062f \u0646\u0628\u0627\u06cc\u062f \u0628\u0627 \u0631\u0645\u0632 \u0639\u0628\u0648\u0631 \u0642\u0628\u0644\u06cc \u06cc\u06a9\u0633\u0627\u0646 \u0628\u0627\u0634\u062f." + + +def _validate_new_password(password, *, user, field_name): + try: + password_validation.validate_password(password, user=user) + except DjangoValidationError as exc: + raise ValidationError({field_name: exc.messages[0] if len(exc.messages) == 1 else exc.messages}) + + +def get_tokens_for_user(user): + """Helper service to generate JWT tokens.""" + refresh = RefreshToken.for_user(user) + return { + "access": str(refresh.access_token), + "refresh": str(refresh), + } + + +def register_user_with_password(mobile, password): + """Business logic for registering a user with just a password.""" + user, created, restored = User.get_or_restore(mobile=mobile) + + if not created and not restored: + raise ValidationError({"detail": USER_ALREADY_EXISTS_MESSAGE}) + + user.set_password(password) + user.save() + + return get_tokens_for_user(user) + + +@transaction.atomic +def register_user_with_otp(mobile, code, password, first_name="", last_name=""): + """Business logic for verifying OTP and registering a user.""" + if User.objects.filter(mobile=mobile).exists(): + raise ValidationError({"mobile": "\u0627\u06cc\u0646 \u0634\u0645\u0627\u0631\u0647 \u0642\u0628\u0644\u0627\u064b \u062b\u0628\u062a \u0634\u062f\u0647 \u0627\u0633\u062a."}) + + redis_conn = get_redis_connection("default") + stored_code = redis_conn.get(f"verification_code:{mobile}") + + if not stored_code: + raise ValidationError({"code": "\u06a9\u062f \u062a\u0623\u06cc\u06cc\u062f \u06cc\u0627\u0641\u062a \u0646\u0634\u062f."}) + if stored_code.decode("utf-8") != code: + raise ValidationError({"code": "\u06a9\u062f \u062a\u0623\u06cc\u06cc\u062f \u0627\u0634\u062a\u0628\u0627\u0647 \u0627\u0633\u062a."}) + + user = User.objects.create_user( + mobile=mobile, + password=password, + first_name=first_name, + last_name=last_name, + is_verified=True, + is_active=True, + ) + + redis_conn.delete(f"verification_code:{mobile}") + + return get_tokens_for_user(user) + + +def generate_and_send_otp(mobile, mode): + """Business logic for generating OTP, checking existence rules, and sending SMS.""" + user_exists = User.objects.filter(mobile=mobile).exists() + + if mode == "register" and user_exists: + raise ValidationError({"mobile": "\u0627\u06cc\u0646 \u0634\u0645\u0627\u0631\u0647 \u0642\u0628\u0644\u0627\u064b \u062b\u0628\u062a\u200c\u0646\u0627\u0645 \u0634\u062f\u0647 \u0627\u0633\u062a."}) + + if mode in ["login", "forget_password"] and not user_exists: + raise ValidationError({"mobile": "\u0627\u06cc\u0646 \u0634\u0645\u0627\u0631\u0647 \u06cc\u0627\u0641\u062a \u0646\u0634\u062f."}) + + verification_code = "".join(random.choices(string.digits, k=5)) + + redis_conn = get_redis_connection("default") + redis_conn.setex(f"verification_code:{mobile}", 120, verification_code) + + send_verification_sms.delay(mobile, verification_code) + + +def login_with_password(mobile, password, request=None): + """Authenticate user with password and record the attempt.""" + user = User.objects.filter(mobile=mobile).first() + + if not user or not user.check_password(password): + record_login_attempt(request, user, LoginAttempt.StatusType.FAILED) + raise ValidationError({"detail": "\u0634\u0645\u0627\u0631\u0647 \u0645\u0648\u0628\u0627\u06cc\u0644 \u06cc\u0627 \u0631\u0645\u0632 \u0639\u0628\u0648\u0631 \u0627\u0634\u062a\u0628\u0627\u0647 \u0627\u0633\u062a."}) + + if not user.is_active: + raise ValidationError({"detail": "\u062d\u0633\u0627\u0628 \u06a9\u0627\u0631\u0628\u0631\u06cc \u0634\u0645\u0627 \u063a\u06cc\u0631\u0641\u0639\u0627\u0644 \u0634\u062f\u0647 \u0627\u0633\u062a."}) + + record_login_attempt(request, user, LoginAttempt.StatusType.SUCCESS) + return get_tokens_for_user(user) + + +def login_with_otp(mobile, code, request=None): + """Authenticate or implicitly register user via OTP.""" + redis_conn = get_redis_connection("default") + stored_code = redis_conn.get(f"verification_code:{mobile}") + + if not stored_code or stored_code.decode("utf-8") != code: + record_login_attempt(request, None, LoginAttempt.StatusType.FAILED) + raise ValidationError({"code": "\u06a9\u062f \u062a\u0627\u06cc\u06cc\u062f \u0646\u0627\u0645\u0639\u062a\u0628\u0631 \u0627\u0633\u062a \u06cc\u0627 \u0645\u0646\u0642\u0636\u06cc \u0634\u062f\u0647 \u0627\u0633\u062a."}) + + user, created = User.objects.get_or_create(mobile=mobile) + if created: + user.set_unusable_password() + user.save() + + if not user.is_active: + raise ValidationError({"detail": "\u062d\u0633\u0627\u0628 \u06a9\u0627\u0631\u0628\u0631\u06cc \u0634\u0645\u0627 \u063a\u06cc\u0631\u0641\u0639\u0627\u0644 \u0634\u062f\u0647 \u0627\u0633\u062a."}) + + record_login_attempt(request, user, LoginAttempt.StatusType.SUCCESS) + redis_conn.delete(f"verification_code:{mobile}") + + return get_tokens_for_user(user) + + +def reset_password_with_otp(mobile, code, password): + """Verify OTP and change forgotten password.""" + user = User.objects.filter(mobile=mobile).first() + if not user: + raise ValidationError({"mobile": "\u06a9\u0627\u0631\u0628\u0631\u06cc \u0628\u0627 \u0627\u06cc\u0646 \u0634\u0645\u0627\u0631\u0647 \u06cc\u0627\u0641\u062a \u0646\u0634\u062f."}) + + redis_conn = get_redis_connection("default") + stored_code = redis_conn.get(f"verification_code:{mobile}") + + if not stored_code or stored_code.decode("utf-8") != code: + raise ValidationError({"code": "\u06a9\u062f \u062a\u0627\u06cc\u06cc\u062f \u0646\u0627\u0645\u0639\u062a\u0628\u0631 \u0627\u0633\u062a \u06cc\u0627 \u0645\u0646\u0642\u0636\u06cc \u0634\u062f\u0647 \u0627\u0633\u062a."}) + + _validate_new_password(password, user=user, field_name="password") + + if user.check_password(password): + raise ValidationError({"password": PASSWORD_REUSE_MESSAGE}) + + user.set_password(password) + user.save() + + redis_conn.delete(f"verification_code:{mobile}") + + +def change_password(user, old_password, new_password): + """Change password for an already authenticated user.""" + if not user.check_password(old_password): + raise ValidationError({"old_password": "\u0631\u0645\u0632 \u0639\u0628\u0648\u0631 \u0641\u0639\u0644\u06cc \u0627\u0634\u062a\u0628\u0627\u0647 \u0627\u0633\u062a."}) + + _validate_new_password(new_password, user=user, field_name="new_password") + + if old_password == new_password: + raise ValidationError({"new_password": PASSWORD_REUSE_MESSAGE}) + + user.set_password(new_password) + user.password_updated_at = timezone.now() + user.save(update_fields=["password", "password_updated_at"]) + + +def logout_user(refresh_token_str): + """Blacklist the user's refresh token.""" + if not refresh_token_str: + raise ValidationError({"refresh": "\u062a\u0648\u06a9\u0646 \u0631\u0641\u0631\u0634 \u0627\u0644\u0632\u0627\u0645\u06cc \u0627\u0633\u062a."}) + try: + token = RefreshToken(refresh_token_str) + token.blacklist() + except TokenError: + raise ValidationError({"detail": "\u062a\u0648\u06a9\u0646 \u0646\u0627\u0645\u0639\u062a\u0628\u0631 \u0627\u0633\u062a \u06cc\u0627 \u0642\u0628\u0644\u0627 \u0645\u0646\u0642\u0636\u06cc \u0634\u062f\u0647 \u0627\u0633\u062a."}) diff --git a/apps/users/tests/test_api_views.py b/apps/users/tests/test_api_views.py index a34df04..1818c15 100644 --- a/apps/users/tests/test_api_views.py +++ b/apps/users/tests/test_api_views.py @@ -113,8 +113,8 @@ class UserApiViewTests(APITestCase): { "mobile": "09123330001", "code": "123456", - "password": "new-secret123", - "re_password": "new-secret123", + "password": "NewSecret1!", + "re_password": "NewSecret1!", }, format="json", ) @@ -123,7 +123,7 @@ class UserApiViewTests(APITestCase): reset_password_with_otp.assert_called_once_with( mobile="09123330001", code="123456", - password="new-secret123", + password="NewSecret1!", ) def test_reset_password_view_rejects_invalid_mobile_format(self): @@ -132,8 +132,8 @@ class UserApiViewTests(APITestCase): { "mobile": "9123330001", "code": "123456", - "password": "new-secret123", - "re_password": "new-secret123", + "password": "NewSecret1!", + "re_password": "NewSecret1!", }, format="json", ) @@ -141,6 +141,21 @@ class UserApiViewTests(APITestCase): self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST) self.assertIn("error", response.data) + def test_reset_password_view_rejects_weak_password(self): + response = self.client.post( + "/api/users/password/reset/", + { + "mobile": "09123330001", + "code": "123456", + "password": "weakpass", + "re_password": "weakpass", + }, + format="json", + ) + + self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST) + self.assertIn("Password must be at least 8 characters", response.data["error"]) + @patch("apps.users.api.views.change_password") def test_change_password_view_requires_auth_and_calls_service(self, change_password): self.client.force_authenticate(user=self.user) @@ -149,8 +164,8 @@ class UserApiViewTests(APITestCase): "/api/users/password/change/", { "old_password": "secret123", - "new_password": "new-secret123", - "re_password": "new-secret123", + "new_password": "NewSecret1!", + "re_password": "NewSecret1!", }, format="json", ) @@ -159,9 +174,25 @@ class UserApiViewTests(APITestCase): change_password.assert_called_once_with( user=self.user, old_password="secret123", - new_password="new-secret123", + new_password="NewSecret1!", ) + def test_change_password_view_rejects_reused_password(self): + self.client.force_authenticate(user=self.user) + + response = self.client.patch( + "/api/users/password/change/", + { + "old_password": "secret123", + "new_password": "secret123", + "re_password": "secret123", + }, + format="json", + ) + + self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST) + self.assertIn("رمز عبور جدید", response.data["error"]) + @patch("apps.users.api.views.logout_user") def test_logout_view_calls_service(self, logout_user): self.client.force_authenticate(user=self.user) diff --git a/apps/users/tests/test_auth_services.py b/apps/users/tests/test_auth_services.py index d7864e7..3afbee0 100644 --- a/apps/users/tests/test_auth_services.py +++ b/apps/users/tests/test_auth_services.py @@ -119,26 +119,42 @@ class AuthServiceTests(TestCase): @patch("apps.users.services.auth.get_redis_connection") def test_reset_password_with_otp_updates_password(self, get_redis_connection): - user = User.objects.create_user(mobile="09120000007", password="oldsecret") + user = User.objects.create_user(mobile="09120000007", password="OldSecret1!") fake_redis = FakeRedisConnection() fake_redis.setex("verification_code:09120000007", 120, "12345") get_redis_connection.return_value = fake_redis - reset_password_with_otp("09120000007", "12345", "newsecret") + reset_password_with_otp("09120000007", "12345", "NewSecret1!") user.refresh_from_db() - self.assertTrue(user.check_password("newsecret")) + self.assertTrue(user.check_password("NewSecret1!")) self.assertNotIn("verification_code:09120000007", fake_redis.store) def test_change_password_updates_existing_user_password(self): - user = User.objects.create_user(mobile="09120000008", password="oldsecret") + user = User.objects.create_user(mobile="09120000008", password="OldSecret1!") - change_password(user, "oldsecret", "newsecret") + change_password(user, "OldSecret1!", "NewSecret1!") user.refresh_from_db() - self.assertTrue(user.check_password("newsecret")) + self.assertTrue(user.check_password("NewSecret1!")) self.assertIsNotNone(user.password_updated_at) + @patch("apps.users.services.auth.get_redis_connection") + def test_reset_password_with_otp_rejects_reused_password(self, get_redis_connection): + User.objects.create_user(mobile="09120000070", password="OldSecret1!") + fake_redis = FakeRedisConnection() + fake_redis.setex("verification_code:09120000070", 120, "12345") + get_redis_connection.return_value = fake_redis + + with self.assertRaises(ValidationError): + reset_password_with_otp("09120000070", "12345", "OldSecret1!") + + def test_change_password_rejects_weak_password(self): + user = User.objects.create_user(mobile="09120000071", password="OldSecret1!") + + with self.assertRaises(ValidationError): + change_password(user, "OldSecret1!", "weakpass") + def test_logout_user_blacklists_refresh_token(self): user = User.objects.create_user(mobile="09120000009", password="secret123") refresh = str(RefreshToken.for_user(user)) diff --git a/core/validators/password.py b/core/validators/password.py new file mode 100644 index 0000000..d5f7cf9 --- /dev/null +++ b/core/validators/password.py @@ -0,0 +1,29 @@ +import re + +from django.core.exceptions import ValidationError + + +class PasswordComplexityValidator: + lower_pattern = re.compile(r"[a-z]") + upper_pattern = re.compile(r"[A-Z]") + digit_pattern = re.compile(r"\d") + symbol_pattern = re.compile(r"[^A-Za-z0-9]") + + message = ( + "Password must be at least 8 characters and include at least one lowercase " + "letter, one uppercase letter, one digit, and one symbol." + ) + code = "password_no_complexity" + + def validate(self, password, user=None): + if ( + len(password) < 8 + or not self.lower_pattern.search(password) + or not self.upper_pattern.search(password) + or not self.digit_pattern.search(password) + or not self.symbol_pattern.search(password) + ): + raise ValidationError(self.message, code=self.code) + + def get_help_text(self): + return self.message