from __future__ import annotations import logging import secrets from dataclasses import asdict, dataclass, is_dataclass from typing import Any from urllib.parse import urlencode, urlparse import requests from django.conf import settings from django.core.cache import cache from django.core.files.base import ContentFile from rest_framework.exceptions import APIException, ValidationError from apps.users.email_identity import mask_mobile, normalize_email_identity from apps.users.models import User, UserSocialAccount from apps.users.services.auth import generate_and_send_otp, get_tokens_for_user GOOGLE_AUTH_URL = "https://accounts.google.com/o/oauth2/v2/auth" GOOGLE_TOKEN_URL = "https://oauth2.googleapis.com/token" GOOGLE_USERINFO_URL = "https://openidconnect.googleapis.com/v1/userinfo" GOOGLE_STATE_TTL_SECONDS = 300 GOOGLE_FLOW_TTL_SECONDS = 900 GOOGLE_STATE_CACHE_PREFIX = "google_oauth_state" GOOGLE_FLOW_CACHE_PREFIX = "google_oauth_flow" logger = logging.getLogger(__name__) class GoogleOAuthFlowError(APIException): status_code = 409 default_detail = "Google sign-in could not be completed." default_code = "google_flow_error" def __init__( self, detail: str, code: str, *, status_code: int | None = None, extra: dict[str, Any] | None = None, ) -> None: if status_code is not None: self.status_code = status_code self.payload_extra = extra or {} super().__init__(detail=detail, code=code) @dataclass class GoogleProfile: provider_user_id: str email: str email_verified: bool first_name: str last_name: str avatar_url: str def _google_required_setting(name: str) -> str: value = getattr(settings, name, "") if not value: raise ValidationError({"detail": f"{name} is not configured."}) return value def _cache_key(prefix: str, token: str) -> str: return f"{prefix}:{token}" def _create_token() -> str: return secrets.token_urlsafe(32) def _invalid_flow_error(message: str = "Google sign-in flow is invalid or expired.") -> GoogleOAuthFlowError: return GoogleOAuthFlowError(message, "google_flow_invalid_state", status_code=400) def _ensure_flow_status(flow_payload: dict[str, Any], expected_status: str) -> None: if flow_payload.get("status") != expected_status: raise _invalid_flow_error("Google sign-in flow is in an unexpected state.") def _profile_to_payload(profile: GoogleProfile) -> dict[str, Any]: return asdict(profile) if is_dataclass(profile) else { "provider_user_id": profile.provider_user_id, "email": profile.email, "email_verified": profile.email_verified, "first_name": profile.first_name, "last_name": profile.last_name, "avatar_url": profile.avatar_url, } def _profile_from_flow(flow_payload: dict[str, Any]) -> GoogleProfile: google_profile = flow_payload.get("google_profile") if not isinstance(google_profile, dict): raise _invalid_flow_error("Google profile data is missing from the flow.") return GoogleProfile(**google_profile) def _find_user_by_email(email: str | None) -> User | None: normalized_email = normalize_email_identity(email) if normalized_email is None: return None return User.objects.filter(email=normalized_email).first() def _normalize_mobile(mobile: str) -> str: normalized = "".join(ch for ch in mobile if ch.isdigit()) if len(normalized) != 11 or not normalized.startswith("09"): raise ValidationError({"mobile": "Invalid mobile number."}) return normalized def _avatar_file_extension(profile: GoogleProfile) -> str: path = urlparse(profile.avatar_url or "").path if "." in path: suffix = path.rsplit(".", 1)[-1].lower() if suffix in {"jpg", "jpeg", "png", "webp", "gif"}: return suffix return "jpg" def sync_user_from_google_profile(user: User, profile: GoogleProfile) -> None: update_fields: list[str] = [] if not user.first_name and profile.first_name: user.first_name = profile.first_name update_fields.append("first_name") if not user.last_name and profile.last_name: user.last_name = profile.last_name update_fields.append("last_name") if normalize_email_identity(user.email) is None and profile.email: user.email = profile.email update_fields.append("email") if not user.profile_picture and profile.avatar_url: try: avatar_response = requests.get(profile.avatar_url, timeout=10) avatar_response.raise_for_status() except requests.RequestException: avatar_response = None if avatar_response and avatar_response.content: filename = f"google-{profile.provider_user_id}.{_avatar_file_extension(profile)}" user.profile_picture.save(filename, ContentFile(avatar_response.content), save=False) update_fields.append("profile_picture") if update_fields: user.save(update_fields=update_fields) def _build_claim_required_payload( *, profile: GoogleProfile, user: User, mobile: str, resolution: str, detail: str, ) -> dict[str, Any]: return { "status": "claim_required", "google_profile": _profile_to_payload(profile), "mobile": mobile, "user_id": str(user.id), "resolution": resolution, "email": profile.email, "mobile_hint": mask_mobile(user.mobile), "detail": detail, } def _create_user_and_social_account_from_google_profile(*, mobile: str, profile: GoogleProfile) -> User: user = User.objects.create_user( mobile=mobile, password=None, first_name=profile.first_name, last_name=profile.last_name, email=profile.email, is_verified=True, is_active=True, ) user.set_unusable_password() user.save(update_fields=["password"]) sync_user_from_google_profile(user, profile) UserSocialAccount.objects.create( user=user, provider=UserSocialAccount.ProviderType.GOOGLE, provider_user_id=profile.provider_user_id, email=profile.email, email_verified=profile.email_verified, avatar_url=profile.avatar_url, is_active=True, ) return user def _build_public_google_flow_payload(flow_payload: dict[str, Any]) -> dict[str, Any]: status = flow_payload.get("status") if status == "authenticated": return { "status": "authenticated", "access": flow_payload.get("access", ""), "refresh": flow_payload.get("refresh", ""), } if status == "collect_mobile": return { "status": "collect_mobile", "email": flow_payload.get("email", ""), "first_name": flow_payload.get("first_name", ""), "last_name": flow_payload.get("last_name", ""), "avatar_url": flow_payload.get("avatar_url", ""), "resolution": flow_payload.get("resolution", "new_account"), "mobile_hint": flow_payload.get("mobile_hint"), } if status == "claim_required": return { "status": "claim_required", "mobile": flow_payload.get("mobile", ""), "detail": flow_payload.get("detail", ""), "resolution": flow_payload.get("resolution", "existing_mobile_claim"), "email": flow_payload.get("email", ""), "mobile_hint": flow_payload.get("mobile_hint"), } raise _invalid_flow_error("Google sign-in flow is in an unknown state.") def create_google_state() -> str: state = _create_token() cache.set(_cache_key(GOOGLE_STATE_CACHE_PREFIX, state), {"valid": True}, GOOGLE_STATE_TTL_SECONDS) return state def consume_google_state(state: str) -> dict[str, Any]: if not state: raise ValidationError({"detail": "Missing OAuth state."}) key = _cache_key(GOOGLE_STATE_CACHE_PREFIX, state) payload = cache.get(key) cache.delete(key) if not payload: raise ValidationError({"detail": "Invalid or expired OAuth state."}) return payload def create_google_flow(payload: dict[str, Any]) -> str: flow = _create_token() cache.set(_cache_key(GOOGLE_FLOW_CACHE_PREFIX, flow), payload, GOOGLE_FLOW_TTL_SECONDS) return flow def get_google_flow_payload(flow: str) -> dict[str, Any]: payload = cache.get(_cache_key(GOOGLE_FLOW_CACHE_PREFIX, flow)) if not payload: raise _invalid_flow_error() return payload def get_google_flow(flow: str) -> dict[str, Any]: return _build_public_google_flow_payload(get_google_flow_payload(flow)) def delete_google_flow(flow: str) -> None: cache.delete(_cache_key(GOOGLE_FLOW_CACHE_PREFIX, flow)) def update_google_flow(flow: str, payload: dict[str, Any]) -> None: cache.set(_cache_key(GOOGLE_FLOW_CACHE_PREFIX, flow), payload, GOOGLE_FLOW_TTL_SECONDS) def build_google_authorization_url() -> str: state = create_google_state() params = { "client_id": _google_required_setting("GOOGLE_OAUTH_CLIENT_ID"), "redirect_uri": _google_required_setting("GOOGLE_OAUTH_REDIRECT_URI"), "response_type": "code", "scope": "openid email profile", "state": state, "access_type": "online", "prompt": "select_account", } return f"{GOOGLE_AUTH_URL}?{urlencode(params)}" def exchange_code_for_google_profile(code: str) -> GoogleProfile: if not code: raise ValidationError({"detail": "Missing Google authorization code."}) try: token_response = requests.post( GOOGLE_TOKEN_URL, data={ "code": code, "client_id": _google_required_setting("GOOGLE_OAUTH_CLIENT_ID"), "client_secret": _google_required_setting("GOOGLE_OAUTH_CLIENT_SECRET"), "redirect_uri": _google_required_setting("GOOGLE_OAUTH_REDIRECT_URI"), "grant_type": "authorization_code", }, timeout=10, ) token_response.raise_for_status() token_payload = token_response.json() except requests.RequestException as exc: response = getattr(exc, "response", None) logger.warning( "Google token exchange failed", extra={ "google_status_code": getattr(response, "status_code", None), "google_response_text": getattr(response, "text", "")[:1000] if response is not None else "", "google_redirect_uri": getattr(settings, "GOOGLE_OAUTH_REDIRECT_URI", ""), }, exc_info=True, ) raise ValidationError({"detail": "Google token exchange failed."}) from exc access_token = token_payload.get("access_token") if not access_token: raise ValidationError({"detail": "Google did not return an access token."}) try: userinfo_response = requests.get( GOOGLE_USERINFO_URL, headers={"Authorization": f"Bearer {access_token}"}, timeout=10, ) userinfo_response.raise_for_status() userinfo = userinfo_response.json() except requests.RequestException as exc: response = getattr(exc, "response", None) logger.warning( "Google user profile lookup failed", extra={ "google_status_code": getattr(response, "status_code", None), "google_response_text": getattr(response, "text", "")[:1000] if response is not None else "", }, exc_info=True, ) raise ValidationError({"detail": "Google user profile lookup failed."}) from exc provider_user_id = userinfo.get("sub", "") email = normalize_email_identity(userinfo.get("email")) email_verified = bool(userinfo.get("email_verified")) if not provider_user_id or not email or not email_verified: raise ValidationError({"detail": "Google account must have a verified email address."}) return GoogleProfile( provider_user_id=provider_user_id, email=email, email_verified=email_verified, first_name=userinfo.get("given_name", "") or "", last_name=userinfo.get("family_name", "") or "", avatar_url=userinfo.get("picture", "") or "", ) def get_frontend_google_callback_url() -> str: return _google_required_setting("GOOGLE_OAUTH_FRONTEND_CALLBACK_URL") def build_google_callback_redirect_url(flow: str) -> str: return f"{get_frontend_google_callback_url()}?flow={flow}" def build_google_callback_error_redirect_url(*, code: str, detail: str) -> str: params = urlencode( { "error": code, "error_description": detail, } ) return f"{get_frontend_google_callback_url()}?{params}" def find_social_account_for_profile(profile: GoogleProfile) -> UserSocialAccount | None: return ( UserSocialAccount.objects.select_related("user") .filter( provider=UserSocialAccount.ProviderType.GOOGLE, provider_user_id=profile.provider_user_id, ) .first() ) def build_authenticated_flow_payload(user: User) -> dict[str, Any]: tokens = get_tokens_for_user(user) return { "status": "authenticated", "access": tokens["access"], "refresh": tokens["refresh"], } def build_pending_google_flow_payload(profile: GoogleProfile) -> dict[str, Any]: existing_email_user = _find_user_by_email(profile.email) return { "status": "collect_mobile", "google_profile": _profile_to_payload(profile), "email": profile.email, "first_name": profile.first_name, "last_name": profile.last_name, "avatar_url": profile.avatar_url, "resolution": "existing_email_claim" if existing_email_user else "new_account", "target_user_id": str(existing_email_user.id) if existing_email_user else None, "mobile_hint": mask_mobile(existing_email_user.mobile) if existing_email_user else None, } def complete_google_signup(flow: str, mobile: str) -> dict[str, Any]: flow_payload = get_google_flow_payload(flow) _ensure_flow_status(flow_payload, "collect_mobile") normalized_mobile = _normalize_mobile(mobile) profile = _profile_from_flow(flow_payload) email_matched_user = None target_user_id = flow_payload.get("target_user_id") if target_user_id: email_matched_user = User.objects.filter(id=target_user_id).first() if email_matched_user is None: raise _invalid_flow_error("The Google sign-in target account could not be found.") existing_mobile_user = User.objects.filter(mobile=normalized_mobile).first() if email_matched_user is not None: if normalized_mobile != email_matched_user.mobile: raise GoogleOAuthFlowError( "This Google account must be verified with the mobile number already attached to the matching account.", "google_email_mobile_conflict", extra={"mobile_hint": mask_mobile(email_matched_user.mobile)}, ) generate_and_send_otp(normalized_mobile, "login") claim_payload = _build_claim_required_payload( profile=profile, user=email_matched_user, mobile=normalized_mobile, resolution="existing_email_claim", detail="Existing account found for this email. Verify the linked mobile number to attach Google.", ) update_google_flow(flow, claim_payload) return _build_public_google_flow_payload(claim_payload) if existing_mobile_user is not None: existing_mobile_email = normalize_email_identity(existing_mobile_user.email) if existing_mobile_email: raise GoogleOAuthFlowError( "This mobile number already belongs to another account with a different email address.", "google_mobile_belongs_to_other_email", ) generate_and_send_otp(normalized_mobile, "login") claim_payload = _build_claim_required_payload( profile=profile, user=existing_mobile_user, mobile=normalized_mobile, resolution="existing_mobile_claim", detail=( "Existing mobile account found. Verify ownership to attach " "Google and set the verified email address." ), ) update_google_flow(flow, claim_payload) return _build_public_google_flow_payload(claim_payload) generate_and_send_otp(normalized_mobile, "register") signup_payload = { "status": "claim_required", "google_profile": _profile_to_payload(profile), "mobile": normalized_mobile, "user_id": None, "resolution": "new_account", "email": profile.email, "mobile_hint": None, "detail": "Verify this mobile number to finish creating your account with Google.", } update_google_flow(flow, signup_payload) return _build_public_google_flow_payload(signup_payload) def _verify_otp_code(*, mobile: str, code: str) -> None: from django_redis import get_redis_connection redis_conn = get_redis_connection("default") stored_code = redis_conn.get(f"verification_code:{mobile}") if not stored_code or stored_code.decode("utf-8") != code: raise ValidationError({"code": "Invalid or expired verification code."}) redis_conn.delete(f"verification_code:{mobile}") def _create_new_google_user_after_otp(*, mobile: str, profile: GoogleProfile) -> User: existing_email_user = _find_user_by_email(profile.email) if existing_email_user is not None: raise GoogleOAuthFlowError( "This Google email is already attached to an existing account.", "google_email_already_claimed", extra={"mobile_hint": mask_mobile(existing_email_user.mobile)}, ) existing_mobile_user = User.objects.filter(mobile=mobile).first() if existing_mobile_user is not None: existing_mobile_email = normalize_email_identity(existing_mobile_user.email) if existing_mobile_email: raise GoogleOAuthFlowError( "This mobile number already belongs to another account with a different email address.", "google_mobile_belongs_to_other_email", ) raise GoogleOAuthFlowError( "This mobile number is no longer available for creating a new Google account.", "google_flow_invalid_state", status_code=409, ) existing_link = find_social_account_for_profile(profile) if existing_link is not None: raise GoogleOAuthFlowError( "This Google account is already attached to another user.", "google_email_already_claimed", ) return _create_user_and_social_account_from_google_profile(mobile=mobile, profile=profile) def send_google_claim_otp(flow: str) -> dict[str, Any]: flow_payload = get_google_flow_payload(flow) _ensure_flow_status(flow_payload, "claim_required") mobile = flow_payload.get("mobile") if not isinstance(mobile, str) or not mobile: raise _invalid_flow_error("Claim mobile number is missing.") resolution = flow_payload.get("resolution") otp_mode = "register" if resolution == "new_account" else "login" generate_and_send_otp(mobile, otp_mode) return {"detail": "Verification code sent successfully."} def verify_google_claim(flow: str, code: str) -> dict[str, Any]: flow_payload = get_google_flow_payload(flow) _ensure_flow_status(flow_payload, "claim_required") mobile = flow_payload.get("mobile") if not isinstance(mobile, str) or not mobile: raise _invalid_flow_error("Claim mobile number is missing.") profile = _profile_from_flow(flow_payload) user_id = flow_payload.get("user_id") resolution = flow_payload.get("resolution") _verify_otp_code(mobile=mobile, code=code) if resolution == "new_account": user = _create_new_google_user_after_otp(mobile=mobile, profile=profile) authenticated_payload = build_authenticated_flow_payload(user) update_google_flow(flow, authenticated_payload) return authenticated_payload user = User.objects.filter(id=user_id, mobile=mobile).first() if not user: raise _invalid_flow_error("Target account could not be found.") user_email = normalize_email_identity(user.email) if resolution == "existing_email_claim" and user_email != profile.email: raise GoogleOAuthFlowError( "The matching email account could not be verified for this mobile number.", "google_email_mobile_conflict", extra={"mobile_hint": mask_mobile(user.mobile)}, ) if resolution == "existing_mobile_claim" and user_email not in (None, profile.email): raise GoogleOAuthFlowError( "This mobile number already belongs to another account with a different email address.", "google_mobile_belongs_to_other_email", ) existing_link = find_social_account_for_profile(profile) if existing_link and existing_link.user_id != user.id: raise GoogleOAuthFlowError( "This Google account is already attached to another user.", "google_email_already_claimed", ) if user_email is None: user.email = profile.email user.save(update_fields=["email"]) sync_user_from_google_profile(user, profile) if existing_link: existing_link.email = profile.email existing_link.email_verified = profile.email_verified existing_link.avatar_url = profile.avatar_url existing_link.is_active = True existing_link.save( update_fields=["email", "email_verified", "avatar_url", "is_active", "updated_at"] ) else: UserSocialAccount.objects.create( user=user, provider=UserSocialAccount.ProviderType.GOOGLE, provider_user_id=profile.provider_user_id, email=profile.email, email_verified=profile.email_verified, avatar_url=profile.avatar_url, is_active=True, ) authenticated_payload = build_authenticated_flow_payload(user) update_google_flow(flow, authenticated_payload) return authenticated_payload