Compare commits

...

4 Commits

Author SHA1 Message Date
388d4e0e7f test(users): cover google oauth identity safety
Some checks failed
Backend CI/CD / test (push) Has been cancelled
Backend CI/CD / deploy (push) Has been cancelled
2026-05-14 21:18:11 +03:30
d75c19bb6b feat(users): add google social account audit command 2026-05-14 21:17:47 +03:30
cacf6114d1 fix(users): harden google oauth account resolution 2026-05-14 21:17:37 +03:30
09d2015351 feat(users): normalize email identity storage 2026-05-14 21:17:25 +03:30
11 changed files with 772 additions and 93 deletions

View File

@@ -4,6 +4,7 @@ from drf_spectacular.utils import extend_schema_serializer
from rest_framework import serializers
from core.serializers.base import BaseModelSerializer
from apps.users.email_identity import normalize_email_identity
User = get_user_model()
@@ -186,6 +187,20 @@ class UserProfileSerializer(BaseModelSerializer):
full_name = serializers.ReadOnlyField()
age = serializers.ReadOnlyField()
def validate_email(self, value):
normalized = normalize_email_identity(value)
user = self.instance
if normalized is None:
return None
existing = User.objects.filter(email=normalized)
if user is not None:
existing = existing.exclude(pk=user.pk)
if existing.exists():
raise serializers.ValidationError("A user with this email already exists.")
return normalized
class Meta:
model = User
fields = BaseModelSerializer.Meta.fields + (

View File

@@ -0,0 +1,16 @@
from __future__ import annotations
def normalize_email_identity(value: str | None) -> str | None:
if value is None:
return None
normalized = value.strip().lower()
return normalized or None
def mask_mobile(value: str | None) -> str | None:
if not value:
return None
if len(value) <= 4:
return value
return f"{value[:2]}{'*' * max(len(value) - 6, 1)}{value[-4:]}"

View File

@@ -0,0 +1 @@

View File

@@ -0,0 +1 @@

View File

@@ -0,0 +1,84 @@
from __future__ import annotations
import json
from collections import defaultdict
from django.core.management.base import BaseCommand
from apps.users.email_identity import normalize_email_identity
from apps.users.models import User, UserSocialAccount
class Command(BaseCommand):
help = "Report suspicious Google social-account links without modifying data."
def handle(self, *args, **options):
issues: list[dict] = []
google_accounts = list(
UserSocialAccount.objects.select_related("user").filter(
provider=UserSocialAccount.ProviderType.GOOGLE
)
)
social_email_groups: dict[str, set[str]] = defaultdict(set)
user_by_email = {
user.email: user
for user in User.objects.exclude(email__isnull=True).only("id", "mobile", "email")
}
for account in google_accounts:
provider_email = normalize_email_identity(account.email)
user_email = normalize_email_identity(account.user.email)
if provider_email:
social_email_groups[provider_email].add(str(account.user_id))
if user_email and provider_email and user_email != provider_email:
issues.append(
{
"type": "linked_user_email_mismatch",
"linked_user_id": str(account.user_id),
"linked_user_mobile": account.user.mobile,
"linked_user_email": user_email,
"social_account_id": str(account.id),
"provider_email": provider_email,
"provider_user_id": account.provider_user_id,
}
)
other_user = user_by_email.get(provider_email) if provider_email else None
if other_user and other_user.id != account.user_id:
issues.append(
{
"type": "provider_email_matches_other_user",
"linked_user_id": str(account.user_id),
"linked_user_mobile": account.user.mobile,
"linked_user_email": user_email,
"social_account_id": str(account.id),
"provider_email": provider_email,
"provider_user_id": account.provider_user_id,
"other_user_id": str(other_user.id),
"other_user_mobile": other_user.mobile,
"other_user_email": other_user.email,
}
)
for provider_email, user_ids in social_email_groups.items():
if len(user_ids) <= 1:
continue
issues.append(
{
"type": "duplicate_provider_email_across_users",
"provider_email": provider_email,
"user_ids": sorted(user_ids),
}
)
if not issues:
self.stdout.write(self.style.SUCCESS("No suspicious Google social links found."))
return
for issue in issues:
self.stdout.write(json.dumps(issue, ensure_ascii=True, sort_keys=True))
self.stdout.write(self.style.WARNING(f"Reported {len(issues)} suspicious Google social link issue(s)."))

View File

@@ -0,0 +1,63 @@
# Generated by Django 5.2.12 on 2026-05-14
from django.db import migrations, models
from django.db.models import Q
from django.db.models.functions import Lower
def _normalize_email(value):
if value is None:
return None
normalized = value.strip().lower()
return normalized or None
def normalize_user_and_social_emails(apps, schema_editor):
User = apps.get_model("users", "User")
UserSocialAccount = apps.get_model("users", "UserSocialAccount")
seen_emails = set()
for user in User.objects.all().order_by("created_at", "id"):
normalized_email = _normalize_email(user.email)
if normalized_email in seen_emails:
normalized_email = None
elif normalized_email is not None:
seen_emails.add(normalized_email)
if user.email != normalized_email:
user.email = normalized_email
user.save(update_fields=["email"])
for social_account in UserSocialAccount.objects.all().order_by("created_at", "id"):
normalized_email = _normalize_email(social_account.email)
if social_account.email != normalized_email:
social_account.email = normalized_email
social_account.save(update_fields=["email"])
class Migration(migrations.Migration):
dependencies = [
("users", "0002_usersocialaccount"),
]
operations = [
migrations.AlterField(
model_name="user",
name="email",
field=models.EmailField(blank=True, default=None, max_length=254, null=True),
),
migrations.AlterField(
model_name="usersocialaccount",
name="email",
field=models.EmailField(blank=True, default=None, max_length=254, null=True),
),
migrations.RunPython(normalize_user_and_social_emails, migrations.RunPython.noop),
migrations.AddConstraint(
model_name="user",
constraint=models.UniqueConstraint(
Lower("email"),
condition=Q(email__isnull=False),
name="user_email_ci_uniq",
),
),
]

View File

@@ -1,9 +1,12 @@
from django.contrib.auth.models import AbstractUser
from django.db import models
from django.db.models import Q
from django.db.models.functions import Lower
from core.models.base import BaseModel
from core.utils import calculate_age, common_datetime_str
from apps.users.email_identity import normalize_email_identity
from apps.users.services.managers import UserManager
@@ -11,7 +14,7 @@ class User(AbstractUser, BaseModel):
username = None
mobile = models.CharField(max_length=11, unique=True)
email = models.EmailField(blank=True, default="")
email = models.EmailField(blank=True, null=True, default=None)
description = models.TextField(blank=True, default="")
profile_picture = models.ImageField(upload_to="profile/users/", blank=True, null=True)
@@ -41,11 +44,22 @@ class User(AbstractUser, BaseModel):
def __str__(self):
return self.full_name or self.mobile
def save(self, *args, **kwargs):
self.email = normalize_email_identity(self.email)
super().save(*args, **kwargs)
class Meta:
verbose_name = "user"
verbose_name_plural = "users"
db_table = "user"
ordering = ("-updated_at", "-created_at")
constraints = (
models.UniqueConstraint(
Lower("email"),
condition=Q(email__isnull=False),
name="user_email_ci_uniq",
),
)
indexes = (
models.Index(fields=["id"], name="user_id_idx"),
models.Index(fields=["mobile"], name="user_mobile_idx"),
@@ -78,7 +92,7 @@ class UserSocialAccount(BaseModel):
user = models.ForeignKey(User, on_delete=models.CASCADE, related_name="social_accounts")
provider = models.CharField(max_length=32, choices=ProviderType.choices)
provider_user_id = models.CharField(max_length=255)
email = models.EmailField(blank=True, default="")
email = models.EmailField(blank=True, null=True, default=None)
email_verified = models.BooleanField(default=False)
avatar_url = models.URLField(blank=True, default="")
@@ -100,3 +114,7 @@ class UserSocialAccount(BaseModel):
def __str__(self):
return f"{self.provider}:{self.provider_user_id}"
def save(self, *args, **kwargs):
self.email = normalize_email_identity(self.email)
super().save(*args, **kwargs)

View File

@@ -8,8 +8,9 @@ from urllib.parse import urlencode
import requests
from django.conf import settings
from django.core.cache import cache
from rest_framework.exceptions import ValidationError
from rest_framework.exceptions import APIException, ValidationError
from apps.users.email_identity import mask_mobile, normalize_email_identity
from apps.users.models import User, UserSocialAccount
from apps.users.services.auth import generate_and_send_otp, get_tokens_for_user
@@ -25,6 +26,25 @@ GOOGLE_STATE_CACHE_PREFIX = "google_oauth_state"
GOOGLE_FLOW_CACHE_PREFIX = "google_oauth_flow"
class GoogleOAuthFlowError(APIException):
status_code = 409
default_detail = "Google sign-in could not be completed."
default_code = "google_flow_error"
def __init__(
self,
detail: str,
code: str,
*,
status_code: int | None = None,
extra: dict[str, Any] | None = None,
) -> None:
if status_code is not None:
self.status_code = status_code
self.payload_extra = extra or {}
super().__init__(detail=detail, code=code)
@dataclass
class GoogleProfile:
provider_user_id: str
@@ -50,6 +70,97 @@ def _create_token() -> str:
return secrets.token_urlsafe(32)
def _invalid_flow_error(message: str = "Google sign-in flow is invalid or expired.") -> GoogleOAuthFlowError:
return GoogleOAuthFlowError(message, "google_flow_invalid_state", status_code=400)
def _ensure_flow_status(flow_payload: dict[str, Any], expected_status: str) -> None:
if flow_payload.get("status") != expected_status:
raise _invalid_flow_error("Google sign-in flow is in an unexpected state.")
def _profile_to_payload(profile: GoogleProfile) -> dict[str, Any]:
return asdict(profile) if is_dataclass(profile) else {
"provider_user_id": profile.provider_user_id,
"email": profile.email,
"email_verified": profile.email_verified,
"first_name": profile.first_name,
"last_name": profile.last_name,
"avatar_url": profile.avatar_url,
}
def _profile_from_flow(flow_payload: dict[str, Any]) -> GoogleProfile:
google_profile = flow_payload.get("google_profile")
if not isinstance(google_profile, dict):
raise _invalid_flow_error("Google profile data is missing from the flow.")
return GoogleProfile(**google_profile)
def _find_user_by_email(email: str | None) -> User | None:
normalized_email = normalize_email_identity(email)
if normalized_email is None:
return None
return User.objects.filter(email=normalized_email).first()
def _normalize_mobile(mobile: str) -> str:
normalized = "".join(ch for ch in mobile if ch.isdigit())
if len(normalized) != 11 or not normalized.startswith("09"):
raise ValidationError({"mobile": "Invalid mobile number."})
return normalized
def _build_claim_required_payload(
*,
profile: GoogleProfile,
user: User,
mobile: str,
resolution: str,
detail: str,
) -> dict[str, Any]:
return {
"status": "claim_required",
"google_profile": _profile_to_payload(profile),
"mobile": mobile,
"user_id": str(user.id),
"resolution": resolution,
"email": profile.email,
"mobile_hint": mask_mobile(user.mobile),
"detail": detail,
}
def _build_public_google_flow_payload(flow_payload: dict[str, Any]) -> dict[str, Any]:
status = flow_payload.get("status")
if status == "authenticated":
return {
"status": "authenticated",
"access": flow_payload.get("access", ""),
"refresh": flow_payload.get("refresh", ""),
}
if status == "collect_mobile":
return {
"status": "collect_mobile",
"email": flow_payload.get("email", ""),
"first_name": flow_payload.get("first_name", ""),
"last_name": flow_payload.get("last_name", ""),
"avatar_url": flow_payload.get("avatar_url", ""),
"resolution": flow_payload.get("resolution", "new_account"),
"mobile_hint": flow_payload.get("mobile_hint"),
}
if status == "claim_required":
return {
"status": "claim_required",
"mobile": flow_payload.get("mobile", ""),
"detail": flow_payload.get("detail", ""),
"resolution": flow_payload.get("resolution", "existing_mobile_claim"),
"email": flow_payload.get("email", ""),
"mobile_hint": flow_payload.get("mobile_hint"),
}
raise _invalid_flow_error("Google sign-in flow is in an unknown state.")
def create_google_state() -> str:
state = _create_token()
cache.set(_cache_key(GOOGLE_STATE_CACHE_PREFIX, state), {"valid": True}, GOOGLE_STATE_TTL_SECONDS)
@@ -74,13 +185,17 @@ def create_google_flow(payload: dict[str, Any]) -> str:
return flow
def get_google_flow(flow: str) -> dict[str, Any]:
def get_google_flow_payload(flow: str) -> dict[str, Any]:
payload = cache.get(_cache_key(GOOGLE_FLOW_CACHE_PREFIX, flow))
if not payload:
raise ValidationError({"detail": "Google sign-in flow is invalid or expired."})
raise _invalid_flow_error()
return payload
def get_google_flow(flow: str) -> dict[str, Any]:
return _build_public_google_flow_payload(get_google_flow_payload(flow))
def delete_google_flow(flow: str) -> None:
cache.delete(_cache_key(GOOGLE_FLOW_CACHE_PREFIX, flow))
@@ -140,7 +255,7 @@ def exchange_code_for_google_profile(code: str) -> GoogleProfile:
raise ValidationError({"detail": "Google user profile lookup failed."}) from exc
provider_user_id = userinfo.get("sub", "")
email = userinfo.get("email", "")
email = normalize_email_identity(userinfo.get("email"))
email_verified = bool(userinfo.get("email_verified"))
if not provider_user_id or not email or not email_verified:
@@ -185,61 +300,73 @@ def build_authenticated_flow_payload(user: User) -> dict[str, Any]:
def build_pending_google_flow_payload(profile: GoogleProfile) -> dict[str, Any]:
profile_payload = asdict(profile) if is_dataclass(profile) else {
"provider_user_id": profile.provider_user_id,
"email": profile.email,
"email_verified": profile.email_verified,
"first_name": profile.first_name,
"last_name": profile.last_name,
"avatar_url": profile.avatar_url,
}
existing_email_user = _find_user_by_email(profile.email)
return {
"status": "collect_mobile",
"google_profile": profile_payload,
"google_profile": _profile_to_payload(profile),
"email": profile.email,
"first_name": profile.first_name,
"last_name": profile.last_name,
"avatar_url": profile.avatar_url,
"resolution": "existing_email_claim" if existing_email_user else "new_account",
"target_user_id": str(existing_email_user.id) if existing_email_user else None,
"mobile_hint": mask_mobile(existing_email_user.mobile) if existing_email_user else None,
}
def _profile_from_flow(flow_payload: dict[str, Any]) -> GoogleProfile:
google_profile = flow_payload.get("google_profile")
if not isinstance(google_profile, dict):
raise ValidationError({"detail": "Google profile is missing from the flow."})
return GoogleProfile(**google_profile)
def _normalize_mobile(mobile: str) -> str:
normalized = "".join(ch for ch in mobile if ch.isdigit())
if len(normalized) != 11 or not normalized.startswith("09"):
raise ValidationError({"mobile": "Invalid mobile number."})
return normalized
def complete_google_signup(flow: str, mobile: str) -> dict[str, Any]:
flow_payload = get_google_flow(flow)
if flow_payload.get("status") != "collect_mobile":
raise ValidationError({"detail": "Google sign-in flow is not ready for mobile completion."})
flow_payload = get_google_flow_payload(flow)
_ensure_flow_status(flow_payload, "collect_mobile")
normalized_mobile = _normalize_mobile(mobile)
profile = _profile_from_flow(flow_payload)
existing_user = User.objects.filter(mobile=normalized_mobile).first()
email_matched_user = None
target_user_id = flow_payload.get("target_user_id")
if target_user_id:
email_matched_user = User.objects.filter(id=target_user_id).first()
if email_matched_user is None:
raise _invalid_flow_error("The Google sign-in target account could not be found.")
existing_mobile_user = User.objects.filter(mobile=normalized_mobile).first()
if email_matched_user is not None:
if normalized_mobile != email_matched_user.mobile:
raise GoogleOAuthFlowError(
"This Google account must be verified with the mobile number already attached to the matching account.",
"google_email_mobile_conflict",
extra={"mobile_hint": mask_mobile(email_matched_user.mobile)},
)
if existing_user:
generate_and_send_otp(normalized_mobile, "login")
claim_payload = {
"status": "claim_required",
"google_profile": asdict(profile),
"mobile": normalized_mobile,
"user_id": str(existing_user.id),
}
claim_payload = _build_claim_required_payload(
profile=profile,
user=email_matched_user,
mobile=normalized_mobile,
resolution="existing_email_claim",
detail="Existing account found for this email. Verify the linked mobile number to attach Google.",
)
update_google_flow(flow, claim_payload)
return {
"status": "claim_required",
"mobile": normalized_mobile,
"detail": "Existing account found. Verify ownership to attach Google.",
}
return _build_public_google_flow_payload(claim_payload)
if existing_mobile_user is not None:
existing_mobile_email = normalize_email_identity(existing_mobile_user.email)
if existing_mobile_email:
raise GoogleOAuthFlowError(
"This mobile number already belongs to another account with a different email address.",
"google_mobile_belongs_to_other_email",
)
generate_and_send_otp(normalized_mobile, "login")
claim_payload = _build_claim_required_payload(
profile=profile,
user=existing_mobile_user,
mobile=normalized_mobile,
resolution="existing_mobile_claim",
detail="Existing mobile account found. Verify ownership to attach Google and set the verified email address.",
)
update_google_flow(flow, claim_payload)
return _build_public_google_flow_payload(claim_payload)
user = User.objects.create_user(
mobile=normalized_mobile,
@@ -269,13 +396,12 @@ def complete_google_signup(flow: str, mobile: str) -> dict[str, Any]:
def send_google_claim_otp(flow: str) -> dict[str, Any]:
flow_payload = get_google_flow(flow)
if flow_payload.get("status") != "claim_required":
raise ValidationError({"detail": "Google sign-in flow is not waiting for claim verification."})
flow_payload = get_google_flow_payload(flow)
_ensure_flow_status(flow_payload, "claim_required")
mobile = flow_payload.get("mobile")
if not isinstance(mobile, str) or not mobile:
raise ValidationError({"detail": "Claim mobile number is missing."})
raise _invalid_flow_error("Claim mobile number is missing.")
generate_and_send_otp(mobile, "login")
return {"detail": "Verification code sent successfully."}
@@ -284,19 +410,18 @@ def send_google_claim_otp(flow: str) -> dict[str, Any]:
def verify_google_claim(flow: str, code: str) -> dict[str, Any]:
from django_redis import get_redis_connection
flow_payload = get_google_flow(flow)
if flow_payload.get("status") != "claim_required":
raise ValidationError({"detail": "Google sign-in flow is not waiting for claim verification."})
flow_payload = get_google_flow_payload(flow)
_ensure_flow_status(flow_payload, "claim_required")
mobile = flow_payload.get("mobile")
if not isinstance(mobile, str) or not mobile:
raise ValidationError({"detail": "Claim mobile number is missing."})
raise _invalid_flow_error("Claim mobile number is missing.")
profile = _profile_from_flow(flow_payload)
user_id = flow_payload.get("user_id")
user = User.objects.filter(id=user_id, mobile=mobile).first()
if not user:
raise ValidationError({"detail": "Target account could not be found."})
raise _invalid_flow_error("Target account could not be found.")
redis_conn = get_redis_connection("default")
stored_code = redis_conn.get(f"verification_code:{mobile}")
@@ -305,9 +430,32 @@ def verify_google_claim(flow: str, code: str) -> dict[str, Any]:
redis_conn.delete(f"verification_code:{mobile}")
resolution = flow_payload.get("resolution")
user_email = normalize_email_identity(user.email)
if resolution == "existing_email_claim" and user_email != profile.email:
raise GoogleOAuthFlowError(
"The matching email account could not be verified for this mobile number.",
"google_email_mobile_conflict",
extra={"mobile_hint": mask_mobile(user.mobile)},
)
if resolution == "existing_mobile_claim" and user_email not in (None, profile.email):
raise GoogleOAuthFlowError(
"This mobile number already belongs to another account with a different email address.",
"google_mobile_belongs_to_other_email",
)
existing_link = find_social_account_for_profile(profile)
if existing_link and existing_link.user_id != user.id:
raise ValidationError({"detail": "This Google account is already attached to another user."})
raise GoogleOAuthFlowError(
"This Google account is already attached to another user.",
"google_email_already_claimed",
)
if user_email is None:
user.email = profile.email
user.save(update_fields=["email"])
if existing_link:
existing_link.email = profile.email

View File

@@ -1,6 +1,7 @@
from django.contrib.auth.models import BaseUserManager
from core.models.base import SoftDeleteManager
from apps.users.email_identity import normalize_email_identity
class UserManager(BaseUserManager, SoftDeleteManager):
@@ -9,6 +10,8 @@ class UserManager(BaseUserManager, SoftDeleteManager):
def _create_user(self, mobile, password, **extra_fields):
if not mobile:
raise ValueError("Mobile must be set")
if "email" in extra_fields:
extra_fields["email"] = normalize_email_identity(extra_fields.get("email"))
user = self.model(mobile=mobile, **extra_fields)
user.set_password(password)
user.save(using=self._db)

View File

@@ -1,7 +1,10 @@
from io import StringIO
from unittest.mock import patch
from django.conf import settings
from django.core.cache import cache
from django.core.management import call_command
from django.db import IntegrityError
from django.test import override_settings
from rest_framework.test import APIRequestFactory
from rest_framework import status
@@ -9,6 +12,7 @@ from rest_framework.test import APITestCase
from apps.users.api.views import RegisterWithPasswordView
from apps.users.models import User, UserSocialAccount
from apps.users.services.google_oauth import GoogleProfile
class UserApiViewTests(APITestCase):
@@ -158,14 +162,14 @@ class UserApiViewTests(APITestCase):
{
"mobile": "09123330001",
"code": "123456",
"password": "weakpass",
"re_password": "weakpass",
"password": "Short1!",
"re_password": "Short1!",
},
format="json",
)
self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)
self.assertIn("Password must be at least 8 characters", response.data["error"])
self.assertIn("too short", response.data["error"])
@patch("apps.users.api.views.change_password")
def test_change_password_view_requires_auth_and_calls_service(self, change_password):
@@ -259,6 +263,43 @@ class UserApiViewTests(APITestCase):
self.user.refresh_from_db()
self.assertEqual(self.user.description, "Bio")
def test_user_email_is_normalized_on_save(self):
user = User.objects.create_user(
mobile="09123330099",
password="Secret123!",
email=" MixedCase@Example.COM ",
)
self.assertEqual(user.email, "mixedcase@example.com")
def test_user_email_is_case_insensitively_unique(self):
User.objects.create_user(
mobile="09123330100",
password="Secret123!",
email="duplicate@example.com",
)
with self.assertRaises(IntegrityError):
User.objects.create_user(
mobile="09123330101",
password="Secret123!",
email=" DUPLICATE@example.com ",
)
def test_user_profile_patch_rejects_duplicate_normalized_email(self):
self.client.force_authenticate(user=self.user)
self.other_user.email = "other@example.com"
self.other_user.save(update_fields=["email"])
response = self.client.patch(
"/api/users/me/",
{"email": " OTHER@example.com "},
format="json",
)
self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)
self.assertIn("already exists", response.data["error"])
def test_user_search_handles_missing_mobile_not_found_and_success(self):
self.client.force_authenticate(user=self.user)
@@ -468,12 +509,33 @@ class UserThrottleTests(APITestCase):
class GoogleOAuthApiTests(APITestCase):
@classmethod
def setUpTestData(cls):
cls.user = User.objects.create_user(
cls.linked_user = User.objects.create_user(
mobile="09125550001",
password="secret123",
first_name="Google",
last_name="Linked",
)
cls.email_matched_user = User.objects.create_user(
mobile="09125550002",
password="secret123",
first_name="Email",
last_name="Matched",
email="matched@example.com",
)
cls.blank_email_user = User.objects.create_user(
mobile="09125550003",
password="secret123",
first_name="Blank",
last_name="Email",
email=None,
)
cls.other_email_user = User.objects.create_user(
mobile="09125550004",
password="secret123",
first_name="Other",
last_name="Email",
email="other@example.com",
)
def setUp(self):
cache.clear()
@@ -493,20 +555,16 @@ class GoogleOAuthApiTests(APITestCase):
self,
exchange_code_for_google_profile,
):
exchange_code_for_google_profile.return_value = type(
"Profile",
(),
{
"provider_user_id": "google-sub-1",
"email": "linked@example.com",
"email_verified": True,
"first_name": "Google",
"last_name": "Linked",
"avatar_url": "https://example.com/avatar.png",
},
)()
exchange_code_for_google_profile.return_value = GoogleProfile(
provider_user_id="google-sub-1",
email="linked@example.com",
email_verified=True,
first_name="Google",
last_name="Linked",
avatar_url="https://example.com/avatar.png",
)
UserSocialAccount.objects.create(
user=self.user,
user=self.linked_user,
provider=UserSocialAccount.ProviderType.GOOGLE,
provider_user_id="google-sub-1",
email="linked@example.com",
@@ -537,18 +595,14 @@ class GoogleOAuthApiTests(APITestCase):
self,
exchange_code_for_google_profile,
):
exchange_code_for_google_profile.return_value = type(
"Profile",
(),
{
"provider_user_id": "google-sub-2",
"email": "new@example.com",
"email_verified": True,
"first_name": "New",
"last_name": "User",
"avatar_url": "https://example.com/new-avatar.png",
},
)()
exchange_code_for_google_profile.return_value = GoogleProfile(
provider_user_id="google-sub-2",
email="new@example.com",
email_verified=True,
first_name="New",
last_name="User",
avatar_url="https://example.com/new-avatar.png",
)
start_response = self.client.get("/api/users/oauth/google/start/")
state = start_response["Location"].split("state=", 1)[1].split("&", 1)[0]
@@ -563,38 +617,108 @@ class GoogleOAuthApiTests(APITestCase):
self.assertEqual(flow_response.status_code, 200)
self.assertEqual(flow_response.data["status"], "collect_mobile")
self.assertEqual(flow_response.data["email"], "new@example.com")
self.assertEqual(flow_response.data["resolution"], "new_account")
self.assertIsNone(flow_response.data["mobile_hint"])
@patch("apps.users.api.views.exchange_code_for_google_profile")
def test_google_callback_redirects_with_email_claim_flow_for_matching_email(
self,
exchange_code_for_google_profile,
):
exchange_code_for_google_profile.return_value = GoogleProfile(
provider_user_id="google-sub-email-match",
email="matched@example.com",
email_verified=True,
first_name="Email",
last_name="Matched",
avatar_url="https://example.com/matched.png",
)
start_response = self.client.get("/api/users/oauth/google/start/")
state = start_response["Location"].split("state=", 1)[1].split("&", 1)[0]
response = self.client.get(f"/api/users/oauth/google/callback/?state={state}&code=google-code")
flow = response["Location"].split("flow=", 1)[1]
flow_response = self.client.get(f"/api/users/oauth/google/flow/?flow={flow}")
self.assertEqual(flow_response.status_code, 200)
self.assertEqual(flow_response.data["status"], "collect_mobile")
self.assertEqual(flow_response.data["resolution"], "existing_email_claim")
self.assertEqual(flow_response.data["email"], "matched@example.com")
self.assertTrue(flow_response.data["mobile_hint"].startswith("09"))
@patch("apps.users.services.google_oauth.generate_and_send_otp")
def test_google_complete_existing_mobile_moves_flow_to_claim_required(self, generate_and_send_otp):
def test_google_complete_matching_mobile_on_email_claim_moves_flow_to_claim_required(
self,
generate_and_send_otp,
):
cache.set(
"google_oauth_flow:test-flow",
{
"status": "collect_mobile",
"google_profile": {
"provider_user_id": "google-sub-3",
"email": "existing@example.com",
"email": "matched@example.com",
"email_verified": True,
"first_name": "Existing",
"last_name": "User",
"first_name": "Email",
"last_name": "Matched",
"avatar_url": "https://example.com/existing.png",
},
"email": "existing@example.com",
"first_name": "Existing",
"last_name": "User",
"email": "matched@example.com",
"first_name": "Email",
"last_name": "Matched",
"avatar_url": "https://example.com/existing.png",
"resolution": "existing_email_claim",
"target_user_id": str(self.email_matched_user.id),
"mobile_hint": "09*****0002",
},
900,
)
response = self.client.post(
"/api/users/oauth/google/complete/",
{"flow": "test-flow", "mobile": self.user.mobile},
{"flow": "test-flow", "mobile": self.email_matched_user.mobile},
format="json",
)
self.assertEqual(response.status_code, 200)
self.assertEqual(response.data["status"], "claim_required")
generate_and_send_otp.assert_called_once_with(self.user.mobile, "login")
self.assertEqual(response.data["resolution"], "existing_email_claim")
generate_and_send_otp.assert_called_once_with(self.email_matched_user.mobile, "login")
def test_google_complete_wrong_mobile_on_email_claim_returns_conflict(self):
cache.set(
"google_oauth_flow:email-conflict-flow",
{
"status": "collect_mobile",
"google_profile": {
"provider_user_id": "google-sub-email-conflict",
"email": "matched@example.com",
"email_verified": True,
"first_name": "Email",
"last_name": "Matched",
"avatar_url": "https://example.com/existing.png",
},
"email": "matched@example.com",
"first_name": "Email",
"last_name": "Matched",
"avatar_url": "https://example.com/existing.png",
"resolution": "existing_email_claim",
"target_user_id": str(self.email_matched_user.id),
"mobile_hint": "09*****0002",
},
900,
)
response = self.client.post(
"/api/users/oauth/google/complete/",
{"flow": "email-conflict-flow", "mobile": self.other_email_user.mobile},
format="json",
)
self.assertEqual(response.status_code, 409)
self.assertEqual(response.data["code"], "google_email_mobile_conflict")
self.assertEqual(response.data["mobile_hint"], "09*****0002")
def test_google_complete_new_mobile_creates_user_and_link(self):
cache.set(
@@ -635,6 +759,78 @@ class GoogleOAuthApiTests(APITestCase):
).exists()
)
@patch("apps.users.services.google_oauth.generate_and_send_otp")
def test_google_complete_existing_blank_email_mobile_moves_flow_to_claim_required(
self,
generate_and_send_otp,
):
cache.set(
"google_oauth_flow:blank-email-flow",
{
"status": "collect_mobile",
"google_profile": {
"provider_user_id": "google-sub-blank",
"email": "blank-claim@example.com",
"email_verified": True,
"first_name": "Blank",
"last_name": "Claim",
"avatar_url": "https://example.com/blank.png",
},
"email": "blank-claim@example.com",
"first_name": "Blank",
"last_name": "Claim",
"avatar_url": "https://example.com/blank.png",
"resolution": "new_account",
"target_user_id": None,
"mobile_hint": None,
},
900,
)
response = self.client.post(
"/api/users/oauth/google/complete/",
{"flow": "blank-email-flow", "mobile": self.blank_email_user.mobile},
format="json",
)
self.assertEqual(response.status_code, 200)
self.assertEqual(response.data["status"], "claim_required")
self.assertEqual(response.data["resolution"], "existing_mobile_claim")
generate_and_send_otp.assert_called_once_with(self.blank_email_user.mobile, "login")
def test_google_complete_existing_non_empty_email_mobile_returns_conflict(self):
cache.set(
"google_oauth_flow:mobile-conflict-flow",
{
"status": "collect_mobile",
"google_profile": {
"provider_user_id": "google-sub-mobile-conflict",
"email": "new-google@example.com",
"email_verified": True,
"first_name": "New",
"last_name": "Google",
"avatar_url": "https://example.com/conflict.png",
},
"email": "new-google@example.com",
"first_name": "New",
"last_name": "Google",
"avatar_url": "https://example.com/conflict.png",
"resolution": "new_account",
"target_user_id": None,
"mobile_hint": None,
},
900,
)
response = self.client.post(
"/api/users/oauth/google/complete/",
{"flow": "mobile-conflict-flow", "mobile": self.other_email_user.mobile},
format="json",
)
self.assertEqual(response.status_code, 409)
self.assertEqual(response.data["code"], "google_mobile_belongs_to_other_email")
@patch("apps.users.api.views.send_google_claim_otp")
def test_google_claim_send_otp_endpoint_dispatches(self, send_google_claim_otp):
send_google_claim_otp.return_value = {"detail": "Verification code sent successfully."}
@@ -661,3 +857,110 @@ class GoogleOAuthApiTests(APITestCase):
self.assertEqual(response.status_code, 200)
self.assertEqual(response.data["access"], "a")
verify_google_claim.assert_called_once_with(flow="claim-flow", code="12345")
@patch("apps.users.services.google_oauth.get_tokens_for_user")
def test_google_claim_verify_sets_blank_user_email_from_google(self, get_tokens_for_user):
get_tokens_for_user.return_value = {"access": "a", "refresh": "r"}
cache.set(
"google_oauth_flow:claim-verify-flow",
{
"status": "claim_required",
"google_profile": {
"provider_user_id": "google-sub-verify",
"email": "claimed@example.com",
"email_verified": True,
"first_name": "Claimed",
"last_name": "User",
"avatar_url": "https://example.com/claim.png",
},
"mobile": self.blank_email_user.mobile,
"user_id": str(self.blank_email_user.id),
"resolution": "existing_mobile_claim",
"email": "claimed@example.com",
"mobile_hint": "09*****0003",
"detail": "claim",
},
900,
)
with patch("django_redis.get_redis_connection") as get_redis_connection:
redis_mock = get_redis_connection.return_value
redis_mock.get.return_value = b"12345"
response = self.client.post(
"/api/users/oauth/google/claim/verify/",
{"flow": "claim-verify-flow", "code": "12345"},
format="json",
)
self.assertEqual(response.status_code, 200)
self.assertEqual(response.data["status"], "authenticated")
self.blank_email_user.refresh_from_db()
self.assertEqual(self.blank_email_user.email, "claimed@example.com")
self.assertTrue(
UserSocialAccount.objects.filter(
user=self.blank_email_user,
provider=UserSocialAccount.ProviderType.GOOGLE,
provider_user_id="google-sub-verify",
).exists()
)
class GoogleOAuthAuditCommandTests(APITestCase):
def test_audit_google_social_links_reports_suspicious_links(self):
linked_user = User.objects.create_user(
mobile="09126660001",
password="secret123",
email="owner@example.com",
)
other_user = User.objects.create_user(
mobile="09126660002",
password="secret123",
email="shared@example.com",
)
third_user = User.objects.create_user(
mobile="09126660003",
password="secret123",
email=None,
)
UserSocialAccount.objects.create(
user=linked_user,
provider=UserSocialAccount.ProviderType.GOOGLE,
provider_user_id="google-audit-1",
email="different@example.com",
email_verified=True,
avatar_url="https://example.com/audit-1.png",
)
UserSocialAccount.objects.create(
user=third_user,
provider=UserSocialAccount.ProviderType.GOOGLE,
provider_user_id="google-audit-2",
email="shared@example.com",
email_verified=True,
avatar_url="https://example.com/audit-2.png",
)
UserSocialAccount.objects.create(
user=linked_user,
provider=UserSocialAccount.ProviderType.GOOGLE,
provider_user_id="google-audit-3",
email="duplicate@example.com",
email_verified=True,
avatar_url="https://example.com/audit-3.png",
)
UserSocialAccount.objects.create(
user=third_user,
provider=UserSocialAccount.ProviderType.GOOGLE,
provider_user_id="google-audit-4",
email="duplicate@example.com",
email_verified=True,
avatar_url="https://example.com/audit-4.png",
)
stdout = StringIO()
call_command("audit_google_social_links", stdout=stdout)
output = stdout.getvalue()
self.assertIn("linked_user_email_mismatch", output)
self.assertIn("provider_email_matches_other_user", output)
self.assertIn("duplicate_provider_email_across_users", output)

View File

@@ -15,6 +15,26 @@ from rest_framework.views import exception_handler as drf_exception_handler
logger = logging.getLogger(__name__)
def _extract_code(value: Any) -> str | None:
if isinstance(value, ErrorDetail):
return str(value.code) if getattr(value, "code", None) else None
if isinstance(value, list | tuple):
for item in value:
code = _extract_code(item)
if code:
return code
return None
if isinstance(value, dict):
for item in value.values():
code = _extract_code(item)
if code:
return code
return None
if isinstance(value, str):
return None
return getattr(value, "code", None)
def _flatten_messages(values: Iterable) -> list[str]:
items: list[str] = []
for value in values:
@@ -107,6 +127,13 @@ def exception_handler(exc, context) -> Response:
else None,
}
)
else:
code = _extract_code(detail)
if code:
payload["code"] = code
extra_payload = getattr(exc, "payload_extra", None)
if isinstance(extra_payload, dict):
payload.update(extra_payload)
formatted_response = Response(payload, status=status_code)
for header, value in response.headers.items():
formatted_response[header] = value