diff --git a/apps/users/tests/test_api_views.py b/apps/users/tests/test_api_views.py index 833fd70..40a002f 100644 --- a/apps/users/tests/test_api_views.py +++ b/apps/users/tests/test_api_views.py @@ -1,7 +1,10 @@ +from io import StringIO from unittest.mock import patch from django.conf import settings from django.core.cache import cache +from django.core.management import call_command +from django.db import IntegrityError from django.test import override_settings from rest_framework.test import APIRequestFactory from rest_framework import status @@ -9,6 +12,7 @@ from rest_framework.test import APITestCase from apps.users.api.views import RegisterWithPasswordView from apps.users.models import User, UserSocialAccount +from apps.users.services.google_oauth import GoogleProfile class UserApiViewTests(APITestCase): @@ -158,14 +162,14 @@ class UserApiViewTests(APITestCase): { "mobile": "09123330001", "code": "123456", - "password": "weakpass", - "re_password": "weakpass", + "password": "Short1!", + "re_password": "Short1!", }, format="json", ) self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST) - self.assertIn("Password must be at least 8 characters", response.data["error"]) + self.assertIn("too short", response.data["error"]) @patch("apps.users.api.views.change_password") def test_change_password_view_requires_auth_and_calls_service(self, change_password): @@ -259,6 +263,43 @@ class UserApiViewTests(APITestCase): self.user.refresh_from_db() self.assertEqual(self.user.description, "Bio") + def test_user_email_is_normalized_on_save(self): + user = User.objects.create_user( + mobile="09123330099", + password="Secret123!", + email=" MixedCase@Example.COM ", + ) + + self.assertEqual(user.email, "mixedcase@example.com") + + def test_user_email_is_case_insensitively_unique(self): + User.objects.create_user( + mobile="09123330100", + password="Secret123!", + email="duplicate@example.com", + ) + + with self.assertRaises(IntegrityError): + User.objects.create_user( + mobile="09123330101", + password="Secret123!", + email=" DUPLICATE@example.com ", + ) + + def test_user_profile_patch_rejects_duplicate_normalized_email(self): + self.client.force_authenticate(user=self.user) + self.other_user.email = "other@example.com" + self.other_user.save(update_fields=["email"]) + + response = self.client.patch( + "/api/users/me/", + {"email": " OTHER@example.com "}, + format="json", + ) + + self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST) + self.assertIn("already exists", response.data["error"]) + def test_user_search_handles_missing_mobile_not_found_and_success(self): self.client.force_authenticate(user=self.user) @@ -468,12 +509,33 @@ class UserThrottleTests(APITestCase): class GoogleOAuthApiTests(APITestCase): @classmethod def setUpTestData(cls): - cls.user = User.objects.create_user( + cls.linked_user = User.objects.create_user( mobile="09125550001", password="secret123", first_name="Google", last_name="Linked", ) + cls.email_matched_user = User.objects.create_user( + mobile="09125550002", + password="secret123", + first_name="Email", + last_name="Matched", + email="matched@example.com", + ) + cls.blank_email_user = User.objects.create_user( + mobile="09125550003", + password="secret123", + first_name="Blank", + last_name="Email", + email=None, + ) + cls.other_email_user = User.objects.create_user( + mobile="09125550004", + password="secret123", + first_name="Other", + last_name="Email", + email="other@example.com", + ) def setUp(self): cache.clear() @@ -493,20 +555,16 @@ class GoogleOAuthApiTests(APITestCase): self, exchange_code_for_google_profile, ): - exchange_code_for_google_profile.return_value = type( - "Profile", - (), - { - "provider_user_id": "google-sub-1", - "email": "linked@example.com", - "email_verified": True, - "first_name": "Google", - "last_name": "Linked", - "avatar_url": "https://example.com/avatar.png", - }, - )() + exchange_code_for_google_profile.return_value = GoogleProfile( + provider_user_id="google-sub-1", + email="linked@example.com", + email_verified=True, + first_name="Google", + last_name="Linked", + avatar_url="https://example.com/avatar.png", + ) UserSocialAccount.objects.create( - user=self.user, + user=self.linked_user, provider=UserSocialAccount.ProviderType.GOOGLE, provider_user_id="google-sub-1", email="linked@example.com", @@ -537,18 +595,14 @@ class GoogleOAuthApiTests(APITestCase): self, exchange_code_for_google_profile, ): - exchange_code_for_google_profile.return_value = type( - "Profile", - (), - { - "provider_user_id": "google-sub-2", - "email": "new@example.com", - "email_verified": True, - "first_name": "New", - "last_name": "User", - "avatar_url": "https://example.com/new-avatar.png", - }, - )() + exchange_code_for_google_profile.return_value = GoogleProfile( + provider_user_id="google-sub-2", + email="new@example.com", + email_verified=True, + first_name="New", + last_name="User", + avatar_url="https://example.com/new-avatar.png", + ) start_response = self.client.get("/api/users/oauth/google/start/") state = start_response["Location"].split("state=", 1)[1].split("&", 1)[0] @@ -563,38 +617,108 @@ class GoogleOAuthApiTests(APITestCase): self.assertEqual(flow_response.status_code, 200) self.assertEqual(flow_response.data["status"], "collect_mobile") self.assertEqual(flow_response.data["email"], "new@example.com") + self.assertEqual(flow_response.data["resolution"], "new_account") + self.assertIsNone(flow_response.data["mobile_hint"]) + + @patch("apps.users.api.views.exchange_code_for_google_profile") + def test_google_callback_redirects_with_email_claim_flow_for_matching_email( + self, + exchange_code_for_google_profile, + ): + exchange_code_for_google_profile.return_value = GoogleProfile( + provider_user_id="google-sub-email-match", + email="matched@example.com", + email_verified=True, + first_name="Email", + last_name="Matched", + avatar_url="https://example.com/matched.png", + ) + + start_response = self.client.get("/api/users/oauth/google/start/") + state = start_response["Location"].split("state=", 1)[1].split("&", 1)[0] + response = self.client.get(f"/api/users/oauth/google/callback/?state={state}&code=google-code") + flow = response["Location"].split("flow=", 1)[1] + + flow_response = self.client.get(f"/api/users/oauth/google/flow/?flow={flow}") + + self.assertEqual(flow_response.status_code, 200) + self.assertEqual(flow_response.data["status"], "collect_mobile") + self.assertEqual(flow_response.data["resolution"], "existing_email_claim") + self.assertEqual(flow_response.data["email"], "matched@example.com") + self.assertTrue(flow_response.data["mobile_hint"].startswith("09")) @patch("apps.users.services.google_oauth.generate_and_send_otp") - def test_google_complete_existing_mobile_moves_flow_to_claim_required(self, generate_and_send_otp): + def test_google_complete_matching_mobile_on_email_claim_moves_flow_to_claim_required( + self, + generate_and_send_otp, + ): cache.set( "google_oauth_flow:test-flow", { "status": "collect_mobile", "google_profile": { "provider_user_id": "google-sub-3", - "email": "existing@example.com", + "email": "matched@example.com", "email_verified": True, - "first_name": "Existing", - "last_name": "User", + "first_name": "Email", + "last_name": "Matched", "avatar_url": "https://example.com/existing.png", }, - "email": "existing@example.com", - "first_name": "Existing", - "last_name": "User", + "email": "matched@example.com", + "first_name": "Email", + "last_name": "Matched", "avatar_url": "https://example.com/existing.png", + "resolution": "existing_email_claim", + "target_user_id": str(self.email_matched_user.id), + "mobile_hint": "09*****0002", }, 900, ) response = self.client.post( "/api/users/oauth/google/complete/", - {"flow": "test-flow", "mobile": self.user.mobile}, + {"flow": "test-flow", "mobile": self.email_matched_user.mobile}, format="json", ) self.assertEqual(response.status_code, 200) self.assertEqual(response.data["status"], "claim_required") - generate_and_send_otp.assert_called_once_with(self.user.mobile, "login") + self.assertEqual(response.data["resolution"], "existing_email_claim") + generate_and_send_otp.assert_called_once_with(self.email_matched_user.mobile, "login") + + def test_google_complete_wrong_mobile_on_email_claim_returns_conflict(self): + cache.set( + "google_oauth_flow:email-conflict-flow", + { + "status": "collect_mobile", + "google_profile": { + "provider_user_id": "google-sub-email-conflict", + "email": "matched@example.com", + "email_verified": True, + "first_name": "Email", + "last_name": "Matched", + "avatar_url": "https://example.com/existing.png", + }, + "email": "matched@example.com", + "first_name": "Email", + "last_name": "Matched", + "avatar_url": "https://example.com/existing.png", + "resolution": "existing_email_claim", + "target_user_id": str(self.email_matched_user.id), + "mobile_hint": "09*****0002", + }, + 900, + ) + + response = self.client.post( + "/api/users/oauth/google/complete/", + {"flow": "email-conflict-flow", "mobile": self.other_email_user.mobile}, + format="json", + ) + + self.assertEqual(response.status_code, 409) + self.assertEqual(response.data["code"], "google_email_mobile_conflict") + self.assertEqual(response.data["mobile_hint"], "09*****0002") def test_google_complete_new_mobile_creates_user_and_link(self): cache.set( @@ -635,6 +759,78 @@ class GoogleOAuthApiTests(APITestCase): ).exists() ) + @patch("apps.users.services.google_oauth.generate_and_send_otp") + def test_google_complete_existing_blank_email_mobile_moves_flow_to_claim_required( + self, + generate_and_send_otp, + ): + cache.set( + "google_oauth_flow:blank-email-flow", + { + "status": "collect_mobile", + "google_profile": { + "provider_user_id": "google-sub-blank", + "email": "blank-claim@example.com", + "email_verified": True, + "first_name": "Blank", + "last_name": "Claim", + "avatar_url": "https://example.com/blank.png", + }, + "email": "blank-claim@example.com", + "first_name": "Blank", + "last_name": "Claim", + "avatar_url": "https://example.com/blank.png", + "resolution": "new_account", + "target_user_id": None, + "mobile_hint": None, + }, + 900, + ) + + response = self.client.post( + "/api/users/oauth/google/complete/", + {"flow": "blank-email-flow", "mobile": self.blank_email_user.mobile}, + format="json", + ) + + self.assertEqual(response.status_code, 200) + self.assertEqual(response.data["status"], "claim_required") + self.assertEqual(response.data["resolution"], "existing_mobile_claim") + generate_and_send_otp.assert_called_once_with(self.blank_email_user.mobile, "login") + + def test_google_complete_existing_non_empty_email_mobile_returns_conflict(self): + cache.set( + "google_oauth_flow:mobile-conflict-flow", + { + "status": "collect_mobile", + "google_profile": { + "provider_user_id": "google-sub-mobile-conflict", + "email": "new-google@example.com", + "email_verified": True, + "first_name": "New", + "last_name": "Google", + "avatar_url": "https://example.com/conflict.png", + }, + "email": "new-google@example.com", + "first_name": "New", + "last_name": "Google", + "avatar_url": "https://example.com/conflict.png", + "resolution": "new_account", + "target_user_id": None, + "mobile_hint": None, + }, + 900, + ) + + response = self.client.post( + "/api/users/oauth/google/complete/", + {"flow": "mobile-conflict-flow", "mobile": self.other_email_user.mobile}, + format="json", + ) + + self.assertEqual(response.status_code, 409) + self.assertEqual(response.data["code"], "google_mobile_belongs_to_other_email") + @patch("apps.users.api.views.send_google_claim_otp") def test_google_claim_send_otp_endpoint_dispatches(self, send_google_claim_otp): send_google_claim_otp.return_value = {"detail": "Verification code sent successfully."} @@ -661,3 +857,110 @@ class GoogleOAuthApiTests(APITestCase): self.assertEqual(response.status_code, 200) self.assertEqual(response.data["access"], "a") verify_google_claim.assert_called_once_with(flow="claim-flow", code="12345") + + @patch("apps.users.services.google_oauth.get_tokens_for_user") + def test_google_claim_verify_sets_blank_user_email_from_google(self, get_tokens_for_user): + get_tokens_for_user.return_value = {"access": "a", "refresh": "r"} + cache.set( + "google_oauth_flow:claim-verify-flow", + { + "status": "claim_required", + "google_profile": { + "provider_user_id": "google-sub-verify", + "email": "claimed@example.com", + "email_verified": True, + "first_name": "Claimed", + "last_name": "User", + "avatar_url": "https://example.com/claim.png", + }, + "mobile": self.blank_email_user.mobile, + "user_id": str(self.blank_email_user.id), + "resolution": "existing_mobile_claim", + "email": "claimed@example.com", + "mobile_hint": "09*****0003", + "detail": "claim", + }, + 900, + ) + + with patch("django_redis.get_redis_connection") as get_redis_connection: + redis_mock = get_redis_connection.return_value + redis_mock.get.return_value = b"12345" + + response = self.client.post( + "/api/users/oauth/google/claim/verify/", + {"flow": "claim-verify-flow", "code": "12345"}, + format="json", + ) + + self.assertEqual(response.status_code, 200) + self.assertEqual(response.data["status"], "authenticated") + self.blank_email_user.refresh_from_db() + self.assertEqual(self.blank_email_user.email, "claimed@example.com") + self.assertTrue( + UserSocialAccount.objects.filter( + user=self.blank_email_user, + provider=UserSocialAccount.ProviderType.GOOGLE, + provider_user_id="google-sub-verify", + ).exists() + ) + + +class GoogleOAuthAuditCommandTests(APITestCase): + def test_audit_google_social_links_reports_suspicious_links(self): + linked_user = User.objects.create_user( + mobile="09126660001", + password="secret123", + email="owner@example.com", + ) + other_user = User.objects.create_user( + mobile="09126660002", + password="secret123", + email="shared@example.com", + ) + third_user = User.objects.create_user( + mobile="09126660003", + password="secret123", + email=None, + ) + + UserSocialAccount.objects.create( + user=linked_user, + provider=UserSocialAccount.ProviderType.GOOGLE, + provider_user_id="google-audit-1", + email="different@example.com", + email_verified=True, + avatar_url="https://example.com/audit-1.png", + ) + UserSocialAccount.objects.create( + user=third_user, + provider=UserSocialAccount.ProviderType.GOOGLE, + provider_user_id="google-audit-2", + email="shared@example.com", + email_verified=True, + avatar_url="https://example.com/audit-2.png", + ) + UserSocialAccount.objects.create( + user=linked_user, + provider=UserSocialAccount.ProviderType.GOOGLE, + provider_user_id="google-audit-3", + email="duplicate@example.com", + email_verified=True, + avatar_url="https://example.com/audit-3.png", + ) + UserSocialAccount.objects.create( + user=third_user, + provider=UserSocialAccount.ProviderType.GOOGLE, + provider_user_id="google-audit-4", + email="duplicate@example.com", + email_verified=True, + avatar_url="https://example.com/audit-4.png", + ) + + stdout = StringIO() + call_command("audit_google_social_links", stdout=stdout) + output = stdout.getvalue() + + self.assertIn("linked_user_email_mismatch", output) + self.assertIn("provider_email_matches_other_user", output) + self.assertIn("duplicate_provider_email_across_users", output)