Compare commits

..

3 Commits

Author SHA1 Message Date
0fea265cfb test(users): cover google signup otp gating
Some checks failed
Backend CI/CD / test (push) Has been cancelled
Backend CI/CD / deploy (push) Has been cancelled
2026-05-14 23:24:09 +03:30
4a6f6a08fb fix(users): require otp verification before google signup 2026-05-14 23:24:09 +03:30
837f5bb49e feat(admin): manage user social account links 2026-05-14 23:00:11 +03:30
3 changed files with 244 additions and 56 deletions

View File

@@ -3,6 +3,7 @@ from django.contrib.admin.helpers import ACTION_CHECKBOX_NAME
from django.contrib.auth import get_user_model from django.contrib.auth import get_user_model
from django.contrib.auth.admin import UserAdmin as BaseUserAdmin from django.contrib.auth.admin import UserAdmin as BaseUserAdmin
from django.contrib.auth.forms import SetPasswordForm from django.contrib.auth.forms import SetPasswordForm
from django.db import transaction
from django.shortcuts import redirect from django.shortcuts import redirect
from django.template.response import TemplateResponse from django.template.response import TemplateResponse
from django.urls import reverse_lazy from django.urls import reverse_lazy
@@ -11,6 +12,7 @@ from unfold.decorators import action as unfold_action
from core.admins.base import BaseAdmin, SoftDeleteListFilter from core.admins.base import BaseAdmin, SoftDeleteListFilter
from apps.users.models import UserSocialAccount
from apps.users.services.forms import CustomUserChangeForm, CustomUserCreationForm from apps.users.services.forms import CustomUserChangeForm, CustomUserCreationForm
User = get_user_model() User = get_user_model()
@@ -21,6 +23,34 @@ class UserResource(resources.ModelResource):
model = User model = User
class UserSocialAccountInline(admin.TabularInline):
model = UserSocialAccount
fk_name = "user"
extra = 0
autocomplete_fields = ("user",)
fields = (
"provider",
"provider_user_id",
"email",
"email_verified",
"avatar_url",
"is_deleted",
"created_at",
"updated_at",
)
readonly_fields = (
"provider",
"provider_user_id",
"email",
"email_verified",
"avatar_url",
"is_deleted",
"created_at",
"updated_at",
)
show_change_link = True
@admin.register(User) @admin.register(User)
class CustomUserAdmin(BaseUserAdmin, BaseAdmin): class CustomUserAdmin(BaseUserAdmin, BaseAdmin):
model = User model = User
@@ -136,6 +166,7 @@ class CustomUserAdmin(BaseUserAdmin, BaseAdmin):
), ),
) )
filter_horizontal = ("groups", "user_permissions") filter_horizontal = ("groups", "user_permissions")
inlines = (UserSocialAccountInline,)
actions_row = [ actions_row = [
"reset_password_action", "reset_password_action",
@@ -181,3 +212,53 @@ class CustomUserAdmin(BaseUserAdmin, BaseAdmin):
@admin.action(description="Deactivate selected users") @admin.action(description="Deactivate selected users")
def deactivate_users(self, request, queryset): def deactivate_users(self, request, queryset):
queryset.update(is_active=False) queryset.update(is_active=False)
@admin.register(UserSocialAccount)
class UserSocialAccountAdmin(BaseAdmin):
list_display = (
"provider",
"provider_user_id",
"user",
"email",
"email_verified",
"created_at",
"is_deleted",
)
search_fields = (
"provider_user_id",
"email",
"user__mobile",
"user__first_name",
"user__last_name",
)
list_filter = (
SoftDeleteListFilter,
"provider",
"email_verified",
"is_deleted",
"created_at",
)
readonly_fields = (
"id",
"created_at",
"updated_at",
"deleted_at",
)
autocomplete_fields = ("user",)
actions = (
"unlink_selected",
"hard_delete_selected",
"restore_selected",
)
@admin.action(description="Unlink selected social accounts")
def unlink_selected(self, request, queryset):
count = queryset.count()
with transaction.atomic():
queryset.hard_delete()
self.message_user(
request,
f"{count} social account link(s) permanently removed.",
level=messages.SUCCESS,
)

View File

@@ -173,6 +173,32 @@ def _build_claim_required_payload(
} }
def _create_user_and_social_account_from_google_profile(*, mobile: str, profile: GoogleProfile) -> User:
user = User.objects.create_user(
mobile=mobile,
password=None,
first_name=profile.first_name,
last_name=profile.last_name,
email=profile.email,
is_verified=True,
is_active=True,
)
user.set_unusable_password()
user.save(update_fields=["password"])
sync_user_from_google_profile(user, profile)
UserSocialAccount.objects.create(
user=user,
provider=UserSocialAccount.ProviderType.GOOGLE,
provider_user_id=profile.provider_user_id,
email=profile.email,
email_verified=profile.email_verified,
avatar_url=profile.avatar_url,
is_active=True,
)
return user
def _build_public_google_flow_payload(flow_payload: dict[str, Any]) -> dict[str, Any]: def _build_public_google_flow_payload(flow_payload: dict[str, Any]) -> dict[str, Any]:
status = flow_payload.get("status") status = flow_payload.get("status")
if status == "authenticated": if status == "authenticated":
@@ -410,32 +436,62 @@ def complete_google_signup(flow: str, mobile: str) -> dict[str, Any]:
update_google_flow(flow, claim_payload) update_google_flow(flow, claim_payload)
return _build_public_google_flow_payload(claim_payload) return _build_public_google_flow_payload(claim_payload)
user = User.objects.create_user( generate_and_send_otp(normalized_mobile, "register")
mobile=normalized_mobile, signup_payload = {
password=None, "status": "claim_required",
first_name=profile.first_name, "google_profile": _profile_to_payload(profile),
last_name=profile.last_name, "mobile": normalized_mobile,
email=profile.email, "user_id": None,
is_verified=False, "resolution": "new_account",
is_active=True, "email": profile.email,
) "mobile_hint": None,
user.set_unusable_password() "detail": "Verify this mobile number to finish creating your account with Google.",
user.save(update_fields=["password"]) }
sync_user_from_google_profile(user, profile) update_google_flow(flow, signup_payload)
return _build_public_google_flow_payload(signup_payload)
UserSocialAccount.objects.create(
user=user, def _verify_otp_code(*, mobile: str, code: str) -> None:
provider=UserSocialAccount.ProviderType.GOOGLE, from django_redis import get_redis_connection
provider_user_id=profile.provider_user_id,
email=profile.email, redis_conn = get_redis_connection("default")
email_verified=profile.email_verified, stored_code = redis_conn.get(f"verification_code:{mobile}")
avatar_url=profile.avatar_url, if not stored_code or stored_code.decode("utf-8") != code:
is_active=True, raise ValidationError({"code": "Invalid or expired verification code."})
redis_conn.delete(f"verification_code:{mobile}")
def _create_new_google_user_after_otp(*, mobile: str, profile: GoogleProfile) -> User:
existing_email_user = _find_user_by_email(profile.email)
if existing_email_user is not None:
raise GoogleOAuthFlowError(
"This Google email is already attached to an existing account.",
"google_email_already_claimed",
extra={"mobile_hint": mask_mobile(existing_email_user.mobile)},
) )
authenticated_payload = build_authenticated_flow_payload(user) existing_mobile_user = User.objects.filter(mobile=mobile).first()
update_google_flow(flow, authenticated_payload) if existing_mobile_user is not None:
return authenticated_payload existing_mobile_email = normalize_email_identity(existing_mobile_user.email)
if existing_mobile_email:
raise GoogleOAuthFlowError(
"This mobile number already belongs to another account with a different email address.",
"google_mobile_belongs_to_other_email",
)
raise GoogleOAuthFlowError(
"This mobile number is no longer available for creating a new Google account.",
"google_flow_invalid_state",
status_code=409,
)
existing_link = find_social_account_for_profile(profile)
if existing_link is not None:
raise GoogleOAuthFlowError(
"This Google account is already attached to another user.",
"google_email_already_claimed",
)
return _create_user_and_social_account_from_google_profile(mobile=mobile, profile=profile)
def send_google_claim_otp(flow: str) -> dict[str, Any]: def send_google_claim_otp(flow: str) -> dict[str, Any]:
@@ -446,13 +502,13 @@ def send_google_claim_otp(flow: str) -> dict[str, Any]:
if not isinstance(mobile, str) or not mobile: if not isinstance(mobile, str) or not mobile:
raise _invalid_flow_error("Claim mobile number is missing.") raise _invalid_flow_error("Claim mobile number is missing.")
generate_and_send_otp(mobile, "login") resolution = flow_payload.get("resolution")
otp_mode = "register" if resolution == "new_account" else "login"
generate_and_send_otp(mobile, otp_mode)
return {"detail": "Verification code sent successfully."} return {"detail": "Verification code sent successfully."}
def verify_google_claim(flow: str, code: str) -> dict[str, Any]: def verify_google_claim(flow: str, code: str) -> dict[str, Any]:
from django_redis import get_redis_connection
flow_payload = get_google_flow_payload(flow) flow_payload = get_google_flow_payload(flow)
_ensure_flow_status(flow_payload, "claim_required") _ensure_flow_status(flow_payload, "claim_required")
@@ -462,18 +518,19 @@ def verify_google_claim(flow: str, code: str) -> dict[str, Any]:
profile = _profile_from_flow(flow_payload) profile = _profile_from_flow(flow_payload)
user_id = flow_payload.get("user_id") user_id = flow_payload.get("user_id")
resolution = flow_payload.get("resolution")
_verify_otp_code(mobile=mobile, code=code)
if resolution == "new_account":
user = _create_new_google_user_after_otp(mobile=mobile, profile=profile)
authenticated_payload = build_authenticated_flow_payload(user)
update_google_flow(flow, authenticated_payload)
return authenticated_payload
user = User.objects.filter(id=user_id, mobile=mobile).first() user = User.objects.filter(id=user_id, mobile=mobile).first()
if not user: if not user:
raise _invalid_flow_error("Target account could not be found.") raise _invalid_flow_error("Target account could not be found.")
redis_conn = get_redis_connection("default")
stored_code = redis_conn.get(f"verification_code:{mobile}")
if not stored_code or stored_code.decode("utf-8") != code:
raise ValidationError({"code": "Invalid or expired verification code."})
redis_conn.delete(f"verification_code:{mobile}")
resolution = flow_payload.get("resolution")
user_email = normalize_email_identity(user.email) user_email = normalize_email_identity(user.email)
if resolution == "existing_email_claim" and user_email != profile.email: if resolution == "existing_email_claim" and user_email != profile.email:

View File

@@ -733,13 +733,11 @@ class GoogleOAuthApiTests(APITestCase):
self.assertEqual(response.data["code"], "google_email_mobile_conflict") self.assertEqual(response.data["code"], "google_email_mobile_conflict")
self.assertEqual(response.data["mobile_hint"], "09*****0002") self.assertEqual(response.data["mobile_hint"], "09*****0002")
@patch("apps.users.services.google_oauth.requests.get") @patch("apps.users.services.google_oauth.generate_and_send_otp")
def test_google_complete_new_mobile_creates_user_and_link(self, requests_get): def test_google_complete_new_mobile_moves_flow_to_claim_required_without_creating_user(
avatar_response = Mock() self,
avatar_response.content = b"avatar-bytes" generate_and_send_otp,
avatar_response.headers = {"Content-Type": "image/png"} ):
avatar_response.raise_for_status.return_value = None
requests_get.return_value = avatar_response
cache.set( cache.set(
"google_oauth_flow:new-flow", "google_oauth_flow:new-flow",
{ {
@@ -767,20 +765,10 @@ class GoogleOAuthApiTests(APITestCase):
) )
self.assertEqual(response.status_code, 200) self.assertEqual(response.status_code, 200)
self.assertEqual(response.data["status"], "authenticated") self.assertEqual(response.data["status"], "claim_required")
created_user = User.objects.get(mobile="09125550009") self.assertEqual(response.data["resolution"], "new_account")
self.assertFalse(created_user.has_usable_password()) self.assertFalse(User.objects.filter(mobile="09125550009").exists())
self.assertEqual(created_user.email, "created@example.com") generate_and_send_otp.assert_called_once_with("09125550009", "register")
self.assertEqual(created_user.first_name, "Created")
self.assertEqual(created_user.last_name, "User")
self.assertTrue(bool(created_user.profile_picture))
self.assertTrue(
UserSocialAccount.objects.filter(
user=created_user,
provider=UserSocialAccount.ProviderType.GOOGLE,
provider_user_id="google-sub-4",
).exists()
)
@patch("apps.users.services.google_oauth.generate_and_send_otp") @patch("apps.users.services.google_oauth.generate_and_send_otp")
def test_google_complete_existing_blank_email_mobile_moves_flow_to_claim_required( def test_google_complete_existing_blank_email_mobile_moves_flow_to_claim_required(
@@ -937,6 +925,68 @@ class GoogleOAuthApiTests(APITestCase):
).exists() ).exists()
) )
@patch("apps.users.services.google_oauth.requests.get")
@patch("apps.users.services.google_oauth.get_tokens_for_user")
def test_google_claim_verify_creates_new_user_only_after_otp_confirmation(
self,
get_tokens_for_user,
requests_get,
):
get_tokens_for_user.return_value = {"access": "a", "refresh": "r"}
avatar_response = Mock()
avatar_response.content = b"avatar-bytes"
avatar_response.headers = {"Content-Type": "image/png"}
avatar_response.raise_for_status.return_value = None
requests_get.return_value = avatar_response
cache.set(
"google_oauth_flow:new-claim-verify-flow",
{
"status": "claim_required",
"google_profile": {
"provider_user_id": "google-sub-new-verify",
"email": "new-verified@example.com",
"email_verified": True,
"first_name": "Verified",
"last_name": "Signup",
"avatar_url": "https://example.com/new-verify.png",
},
"mobile": "09125550010",
"user_id": None,
"resolution": "new_account",
"email": "new-verified@example.com",
"mobile_hint": None,
"detail": "claim",
},
900,
)
with patch("django_redis.get_redis_connection") as get_redis_connection:
redis_mock = get_redis_connection.return_value
redis_mock.get.return_value = b"12345"
response = self.client.post(
"/api/users/oauth/google/claim/verify/",
{"flow": "new-claim-verify-flow", "code": "12345"},
format="json",
)
self.assertEqual(response.status_code, 200)
self.assertEqual(response.data["status"], "authenticated")
created_user = User.objects.get(mobile="09125550010")
self.assertTrue(created_user.is_verified)
self.assertFalse(created_user.has_usable_password())
self.assertEqual(created_user.email, "new-verified@example.com")
self.assertEqual(created_user.first_name, "Verified")
self.assertEqual(created_user.last_name, "Signup")
self.assertTrue(bool(created_user.profile_picture))
self.assertTrue(
UserSocialAccount.objects.filter(
user=created_user,
provider=UserSocialAccount.ProviderType.GOOGLE,
provider_user_id="google-sub-new-verify",
).exists()
)
class GoogleOAuthAuditCommandTests(APITestCase): class GoogleOAuthAuditCommandTests(APITestCase):
def test_audit_google_social_links_reports_suspicious_links(self): def test_audit_google_social_links_reports_suspicious_links(self):