test(users): cover google oauth identity safety
Some checks failed
Backend CI/CD / test (push) Has been cancelled
Backend CI/CD / deploy (push) Has been cancelled

This commit is contained in:
2026-05-14 21:18:11 +03:30
parent d75c19bb6b
commit 388d4e0e7f

View File

@@ -1,7 +1,10 @@
from io import StringIO
from unittest.mock import patch
from django.conf import settings
from django.core.cache import cache
from django.core.management import call_command
from django.db import IntegrityError
from django.test import override_settings
from rest_framework.test import APIRequestFactory
from rest_framework import status
@@ -9,6 +12,7 @@ from rest_framework.test import APITestCase
from apps.users.api.views import RegisterWithPasswordView
from apps.users.models import User, UserSocialAccount
from apps.users.services.google_oauth import GoogleProfile
class UserApiViewTests(APITestCase):
@@ -158,14 +162,14 @@ class UserApiViewTests(APITestCase):
{
"mobile": "09123330001",
"code": "123456",
"password": "weakpass",
"re_password": "weakpass",
"password": "Short1!",
"re_password": "Short1!",
},
format="json",
)
self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)
self.assertIn("Password must be at least 8 characters", response.data["error"])
self.assertIn("too short", response.data["error"])
@patch("apps.users.api.views.change_password")
def test_change_password_view_requires_auth_and_calls_service(self, change_password):
@@ -259,6 +263,43 @@ class UserApiViewTests(APITestCase):
self.user.refresh_from_db()
self.assertEqual(self.user.description, "Bio")
def test_user_email_is_normalized_on_save(self):
user = User.objects.create_user(
mobile="09123330099",
password="Secret123!",
email=" MixedCase@Example.COM ",
)
self.assertEqual(user.email, "mixedcase@example.com")
def test_user_email_is_case_insensitively_unique(self):
User.objects.create_user(
mobile="09123330100",
password="Secret123!",
email="duplicate@example.com",
)
with self.assertRaises(IntegrityError):
User.objects.create_user(
mobile="09123330101",
password="Secret123!",
email=" DUPLICATE@example.com ",
)
def test_user_profile_patch_rejects_duplicate_normalized_email(self):
self.client.force_authenticate(user=self.user)
self.other_user.email = "other@example.com"
self.other_user.save(update_fields=["email"])
response = self.client.patch(
"/api/users/me/",
{"email": " OTHER@example.com "},
format="json",
)
self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)
self.assertIn("already exists", response.data["error"])
def test_user_search_handles_missing_mobile_not_found_and_success(self):
self.client.force_authenticate(user=self.user)
@@ -468,12 +509,33 @@ class UserThrottleTests(APITestCase):
class GoogleOAuthApiTests(APITestCase):
@classmethod
def setUpTestData(cls):
cls.user = User.objects.create_user(
cls.linked_user = User.objects.create_user(
mobile="09125550001",
password="secret123",
first_name="Google",
last_name="Linked",
)
cls.email_matched_user = User.objects.create_user(
mobile="09125550002",
password="secret123",
first_name="Email",
last_name="Matched",
email="matched@example.com",
)
cls.blank_email_user = User.objects.create_user(
mobile="09125550003",
password="secret123",
first_name="Blank",
last_name="Email",
email=None,
)
cls.other_email_user = User.objects.create_user(
mobile="09125550004",
password="secret123",
first_name="Other",
last_name="Email",
email="other@example.com",
)
def setUp(self):
cache.clear()
@@ -493,20 +555,16 @@ class GoogleOAuthApiTests(APITestCase):
self,
exchange_code_for_google_profile,
):
exchange_code_for_google_profile.return_value = type(
"Profile",
(),
{
"provider_user_id": "google-sub-1",
"email": "linked@example.com",
"email_verified": True,
"first_name": "Google",
"last_name": "Linked",
"avatar_url": "https://example.com/avatar.png",
},
)()
exchange_code_for_google_profile.return_value = GoogleProfile(
provider_user_id="google-sub-1",
email="linked@example.com",
email_verified=True,
first_name="Google",
last_name="Linked",
avatar_url="https://example.com/avatar.png",
)
UserSocialAccount.objects.create(
user=self.user,
user=self.linked_user,
provider=UserSocialAccount.ProviderType.GOOGLE,
provider_user_id="google-sub-1",
email="linked@example.com",
@@ -537,18 +595,14 @@ class GoogleOAuthApiTests(APITestCase):
self,
exchange_code_for_google_profile,
):
exchange_code_for_google_profile.return_value = type(
"Profile",
(),
{
"provider_user_id": "google-sub-2",
"email": "new@example.com",
"email_verified": True,
"first_name": "New",
"last_name": "User",
"avatar_url": "https://example.com/new-avatar.png",
},
)()
exchange_code_for_google_profile.return_value = GoogleProfile(
provider_user_id="google-sub-2",
email="new@example.com",
email_verified=True,
first_name="New",
last_name="User",
avatar_url="https://example.com/new-avatar.png",
)
start_response = self.client.get("/api/users/oauth/google/start/")
state = start_response["Location"].split("state=", 1)[1].split("&", 1)[0]
@@ -563,38 +617,108 @@ class GoogleOAuthApiTests(APITestCase):
self.assertEqual(flow_response.status_code, 200)
self.assertEqual(flow_response.data["status"], "collect_mobile")
self.assertEqual(flow_response.data["email"], "new@example.com")
self.assertEqual(flow_response.data["resolution"], "new_account")
self.assertIsNone(flow_response.data["mobile_hint"])
@patch("apps.users.api.views.exchange_code_for_google_profile")
def test_google_callback_redirects_with_email_claim_flow_for_matching_email(
self,
exchange_code_for_google_profile,
):
exchange_code_for_google_profile.return_value = GoogleProfile(
provider_user_id="google-sub-email-match",
email="matched@example.com",
email_verified=True,
first_name="Email",
last_name="Matched",
avatar_url="https://example.com/matched.png",
)
start_response = self.client.get("/api/users/oauth/google/start/")
state = start_response["Location"].split("state=", 1)[1].split("&", 1)[0]
response = self.client.get(f"/api/users/oauth/google/callback/?state={state}&code=google-code")
flow = response["Location"].split("flow=", 1)[1]
flow_response = self.client.get(f"/api/users/oauth/google/flow/?flow={flow}")
self.assertEqual(flow_response.status_code, 200)
self.assertEqual(flow_response.data["status"], "collect_mobile")
self.assertEqual(flow_response.data["resolution"], "existing_email_claim")
self.assertEqual(flow_response.data["email"], "matched@example.com")
self.assertTrue(flow_response.data["mobile_hint"].startswith("09"))
@patch("apps.users.services.google_oauth.generate_and_send_otp")
def test_google_complete_existing_mobile_moves_flow_to_claim_required(self, generate_and_send_otp):
def test_google_complete_matching_mobile_on_email_claim_moves_flow_to_claim_required(
self,
generate_and_send_otp,
):
cache.set(
"google_oauth_flow:test-flow",
{
"status": "collect_mobile",
"google_profile": {
"provider_user_id": "google-sub-3",
"email": "existing@example.com",
"email": "matched@example.com",
"email_verified": True,
"first_name": "Existing",
"last_name": "User",
"first_name": "Email",
"last_name": "Matched",
"avatar_url": "https://example.com/existing.png",
},
"email": "existing@example.com",
"first_name": "Existing",
"last_name": "User",
"email": "matched@example.com",
"first_name": "Email",
"last_name": "Matched",
"avatar_url": "https://example.com/existing.png",
"resolution": "existing_email_claim",
"target_user_id": str(self.email_matched_user.id),
"mobile_hint": "09*****0002",
},
900,
)
response = self.client.post(
"/api/users/oauth/google/complete/",
{"flow": "test-flow", "mobile": self.user.mobile},
{"flow": "test-flow", "mobile": self.email_matched_user.mobile},
format="json",
)
self.assertEqual(response.status_code, 200)
self.assertEqual(response.data["status"], "claim_required")
generate_and_send_otp.assert_called_once_with(self.user.mobile, "login")
self.assertEqual(response.data["resolution"], "existing_email_claim")
generate_and_send_otp.assert_called_once_with(self.email_matched_user.mobile, "login")
def test_google_complete_wrong_mobile_on_email_claim_returns_conflict(self):
cache.set(
"google_oauth_flow:email-conflict-flow",
{
"status": "collect_mobile",
"google_profile": {
"provider_user_id": "google-sub-email-conflict",
"email": "matched@example.com",
"email_verified": True,
"first_name": "Email",
"last_name": "Matched",
"avatar_url": "https://example.com/existing.png",
},
"email": "matched@example.com",
"first_name": "Email",
"last_name": "Matched",
"avatar_url": "https://example.com/existing.png",
"resolution": "existing_email_claim",
"target_user_id": str(self.email_matched_user.id),
"mobile_hint": "09*****0002",
},
900,
)
response = self.client.post(
"/api/users/oauth/google/complete/",
{"flow": "email-conflict-flow", "mobile": self.other_email_user.mobile},
format="json",
)
self.assertEqual(response.status_code, 409)
self.assertEqual(response.data["code"], "google_email_mobile_conflict")
self.assertEqual(response.data["mobile_hint"], "09*****0002")
def test_google_complete_new_mobile_creates_user_and_link(self):
cache.set(
@@ -635,6 +759,78 @@ class GoogleOAuthApiTests(APITestCase):
).exists()
)
@patch("apps.users.services.google_oauth.generate_and_send_otp")
def test_google_complete_existing_blank_email_mobile_moves_flow_to_claim_required(
self,
generate_and_send_otp,
):
cache.set(
"google_oauth_flow:blank-email-flow",
{
"status": "collect_mobile",
"google_profile": {
"provider_user_id": "google-sub-blank",
"email": "blank-claim@example.com",
"email_verified": True,
"first_name": "Blank",
"last_name": "Claim",
"avatar_url": "https://example.com/blank.png",
},
"email": "blank-claim@example.com",
"first_name": "Blank",
"last_name": "Claim",
"avatar_url": "https://example.com/blank.png",
"resolution": "new_account",
"target_user_id": None,
"mobile_hint": None,
},
900,
)
response = self.client.post(
"/api/users/oauth/google/complete/",
{"flow": "blank-email-flow", "mobile": self.blank_email_user.mobile},
format="json",
)
self.assertEqual(response.status_code, 200)
self.assertEqual(response.data["status"], "claim_required")
self.assertEqual(response.data["resolution"], "existing_mobile_claim")
generate_and_send_otp.assert_called_once_with(self.blank_email_user.mobile, "login")
def test_google_complete_existing_non_empty_email_mobile_returns_conflict(self):
cache.set(
"google_oauth_flow:mobile-conflict-flow",
{
"status": "collect_mobile",
"google_profile": {
"provider_user_id": "google-sub-mobile-conflict",
"email": "new-google@example.com",
"email_verified": True,
"first_name": "New",
"last_name": "Google",
"avatar_url": "https://example.com/conflict.png",
},
"email": "new-google@example.com",
"first_name": "New",
"last_name": "Google",
"avatar_url": "https://example.com/conflict.png",
"resolution": "new_account",
"target_user_id": None,
"mobile_hint": None,
},
900,
)
response = self.client.post(
"/api/users/oauth/google/complete/",
{"flow": "mobile-conflict-flow", "mobile": self.other_email_user.mobile},
format="json",
)
self.assertEqual(response.status_code, 409)
self.assertEqual(response.data["code"], "google_mobile_belongs_to_other_email")
@patch("apps.users.api.views.send_google_claim_otp")
def test_google_claim_send_otp_endpoint_dispatches(self, send_google_claim_otp):
send_google_claim_otp.return_value = {"detail": "Verification code sent successfully."}
@@ -661,3 +857,110 @@ class GoogleOAuthApiTests(APITestCase):
self.assertEqual(response.status_code, 200)
self.assertEqual(response.data["access"], "a")
verify_google_claim.assert_called_once_with(flow="claim-flow", code="12345")
@patch("apps.users.services.google_oauth.get_tokens_for_user")
def test_google_claim_verify_sets_blank_user_email_from_google(self, get_tokens_for_user):
get_tokens_for_user.return_value = {"access": "a", "refresh": "r"}
cache.set(
"google_oauth_flow:claim-verify-flow",
{
"status": "claim_required",
"google_profile": {
"provider_user_id": "google-sub-verify",
"email": "claimed@example.com",
"email_verified": True,
"first_name": "Claimed",
"last_name": "User",
"avatar_url": "https://example.com/claim.png",
},
"mobile": self.blank_email_user.mobile,
"user_id": str(self.blank_email_user.id),
"resolution": "existing_mobile_claim",
"email": "claimed@example.com",
"mobile_hint": "09*****0003",
"detail": "claim",
},
900,
)
with patch("django_redis.get_redis_connection") as get_redis_connection:
redis_mock = get_redis_connection.return_value
redis_mock.get.return_value = b"12345"
response = self.client.post(
"/api/users/oauth/google/claim/verify/",
{"flow": "claim-verify-flow", "code": "12345"},
format="json",
)
self.assertEqual(response.status_code, 200)
self.assertEqual(response.data["status"], "authenticated")
self.blank_email_user.refresh_from_db()
self.assertEqual(self.blank_email_user.email, "claimed@example.com")
self.assertTrue(
UserSocialAccount.objects.filter(
user=self.blank_email_user,
provider=UserSocialAccount.ProviderType.GOOGLE,
provider_user_id="google-sub-verify",
).exists()
)
class GoogleOAuthAuditCommandTests(APITestCase):
def test_audit_google_social_links_reports_suspicious_links(self):
linked_user = User.objects.create_user(
mobile="09126660001",
password="secret123",
email="owner@example.com",
)
other_user = User.objects.create_user(
mobile="09126660002",
password="secret123",
email="shared@example.com",
)
third_user = User.objects.create_user(
mobile="09126660003",
password="secret123",
email=None,
)
UserSocialAccount.objects.create(
user=linked_user,
provider=UserSocialAccount.ProviderType.GOOGLE,
provider_user_id="google-audit-1",
email="different@example.com",
email_verified=True,
avatar_url="https://example.com/audit-1.png",
)
UserSocialAccount.objects.create(
user=third_user,
provider=UserSocialAccount.ProviderType.GOOGLE,
provider_user_id="google-audit-2",
email="shared@example.com",
email_verified=True,
avatar_url="https://example.com/audit-2.png",
)
UserSocialAccount.objects.create(
user=linked_user,
provider=UserSocialAccount.ProviderType.GOOGLE,
provider_user_id="google-audit-3",
email="duplicate@example.com",
email_verified=True,
avatar_url="https://example.com/audit-3.png",
)
UserSocialAccount.objects.create(
user=third_user,
provider=UserSocialAccount.ProviderType.GOOGLE,
provider_user_id="google-audit-4",
email="duplicate@example.com",
email_verified=True,
avatar_url="https://example.com/audit-4.png",
)
stdout = StringIO()
call_command("audit_google_social_links", stdout=stdout)
output = stdout.getvalue()
self.assertIn("linked_user_email_mismatch", output)
self.assertIn("provider_email_matches_other_user", output)
self.assertIn("duplicate_provider_email_across_users", output)