from __future__ import annotations import secrets from dataclasses import asdict, dataclass, is_dataclass from typing import Any from urllib.parse import urlencode import requests from django.conf import settings from django.core.cache import cache from rest_framework.exceptions import ValidationError from apps.users.models import User, UserSocialAccount from apps.users.services.auth import generate_and_send_otp, get_tokens_for_user GOOGLE_AUTH_URL = "https://accounts.google.com/o/oauth2/v2/auth" GOOGLE_TOKEN_URL = "https://oauth2.googleapis.com/token" GOOGLE_USERINFO_URL = "https://openidconnect.googleapis.com/v1/userinfo" GOOGLE_STATE_TTL_SECONDS = 300 GOOGLE_FLOW_TTL_SECONDS = 900 GOOGLE_STATE_CACHE_PREFIX = "google_oauth_state" GOOGLE_FLOW_CACHE_PREFIX = "google_oauth_flow" @dataclass class GoogleProfile: provider_user_id: str email: str email_verified: bool first_name: str last_name: str avatar_url: str def _google_required_setting(name: str) -> str: value = getattr(settings, name, "") if not value: raise ValidationError({"detail": f"{name} is not configured."}) return value def _cache_key(prefix: str, token: str) -> str: return f"{prefix}:{token}" def _create_token() -> str: return secrets.token_urlsafe(32) def create_google_state() -> str: state = _create_token() cache.set(_cache_key(GOOGLE_STATE_CACHE_PREFIX, state), {"valid": True}, GOOGLE_STATE_TTL_SECONDS) return state def consume_google_state(state: str) -> dict[str, Any]: if not state: raise ValidationError({"detail": "Missing OAuth state."}) key = _cache_key(GOOGLE_STATE_CACHE_PREFIX, state) payload = cache.get(key) cache.delete(key) if not payload: raise ValidationError({"detail": "Invalid or expired OAuth state."}) return payload def create_google_flow(payload: dict[str, Any]) -> str: flow = _create_token() cache.set(_cache_key(GOOGLE_FLOW_CACHE_PREFIX, flow), payload, GOOGLE_FLOW_TTL_SECONDS) return flow def get_google_flow(flow: str) -> dict[str, Any]: payload = cache.get(_cache_key(GOOGLE_FLOW_CACHE_PREFIX, flow)) if not payload: raise ValidationError({"detail": "Google sign-in flow is invalid or expired."}) return payload def delete_google_flow(flow: str) -> None: cache.delete(_cache_key(GOOGLE_FLOW_CACHE_PREFIX, flow)) def update_google_flow(flow: str, payload: dict[str, Any]) -> None: cache.set(_cache_key(GOOGLE_FLOW_CACHE_PREFIX, flow), payload, GOOGLE_FLOW_TTL_SECONDS) def build_google_authorization_url() -> str: state = create_google_state() params = { "client_id": _google_required_setting("GOOGLE_OAUTH_CLIENT_ID"), "redirect_uri": _google_required_setting("GOOGLE_OAUTH_REDIRECT_URI"), "response_type": "code", "scope": "openid email profile", "state": state, "access_type": "online", "prompt": "select_account", } return f"{GOOGLE_AUTH_URL}?{urlencode(params)}" def exchange_code_for_google_profile(code: str) -> GoogleProfile: if not code: raise ValidationError({"detail": "Missing Google authorization code."}) try: token_response = requests.post( GOOGLE_TOKEN_URL, data={ "code": code, "client_id": _google_required_setting("GOOGLE_OAUTH_CLIENT_ID"), "client_secret": _google_required_setting("GOOGLE_OAUTH_CLIENT_SECRET"), "redirect_uri": _google_required_setting("GOOGLE_OAUTH_REDIRECT_URI"), "grant_type": "authorization_code", }, timeout=10, ) token_response.raise_for_status() token_payload = token_response.json() except requests.RequestException as exc: raise ValidationError({"detail": "Google token exchange failed."}) from exc access_token = token_payload.get("access_token") if not access_token: raise ValidationError({"detail": "Google did not return an access token."}) try: userinfo_response = requests.get( GOOGLE_USERINFO_URL, headers={"Authorization": f"Bearer {access_token}"}, timeout=10, ) userinfo_response.raise_for_status() userinfo = userinfo_response.json() except requests.RequestException as exc: raise ValidationError({"detail": "Google user profile lookup failed."}) from exc provider_user_id = userinfo.get("sub", "") email = userinfo.get("email", "") email_verified = bool(userinfo.get("email_verified")) if not provider_user_id or not email or not email_verified: raise ValidationError({"detail": "Google account must have a verified email address."}) return GoogleProfile( provider_user_id=provider_user_id, email=email, email_verified=email_verified, first_name=userinfo.get("given_name", "") or "", last_name=userinfo.get("family_name", "") or "", avatar_url=userinfo.get("picture", "") or "", ) def get_frontend_google_callback_url() -> str: return _google_required_setting("GOOGLE_OAUTH_FRONTEND_CALLBACK_URL") def build_google_callback_redirect_url(flow: str) -> str: return f"{get_frontend_google_callback_url()}?flow={flow}" def find_social_account_for_profile(profile: GoogleProfile) -> UserSocialAccount | None: return ( UserSocialAccount.objects.select_related("user") .filter( provider=UserSocialAccount.ProviderType.GOOGLE, provider_user_id=profile.provider_user_id, ) .first() ) def build_authenticated_flow_payload(user: User) -> dict[str, Any]: tokens = get_tokens_for_user(user) return { "status": "authenticated", "access": tokens["access"], "refresh": tokens["refresh"], } def build_pending_google_flow_payload(profile: GoogleProfile) -> dict[str, Any]: profile_payload = asdict(profile) if is_dataclass(profile) else { "provider_user_id": profile.provider_user_id, "email": profile.email, "email_verified": profile.email_verified, "first_name": profile.first_name, "last_name": profile.last_name, "avatar_url": profile.avatar_url, } return { "status": "collect_mobile", "google_profile": profile_payload, "email": profile.email, "first_name": profile.first_name, "last_name": profile.last_name, "avatar_url": profile.avatar_url, } def _profile_from_flow(flow_payload: dict[str, Any]) -> GoogleProfile: google_profile = flow_payload.get("google_profile") if not isinstance(google_profile, dict): raise ValidationError({"detail": "Google profile is missing from the flow."}) return GoogleProfile(**google_profile) def _normalize_mobile(mobile: str) -> str: normalized = "".join(ch for ch in mobile if ch.isdigit()) if len(normalized) != 11 or not normalized.startswith("09"): raise ValidationError({"mobile": "Invalid mobile number."}) return normalized def complete_google_signup(flow: str, mobile: str) -> dict[str, Any]: flow_payload = get_google_flow(flow) if flow_payload.get("status") != "collect_mobile": raise ValidationError({"detail": "Google sign-in flow is not ready for mobile completion."}) normalized_mobile = _normalize_mobile(mobile) profile = _profile_from_flow(flow_payload) existing_user = User.objects.filter(mobile=normalized_mobile).first() if existing_user: generate_and_send_otp(normalized_mobile, "login") claim_payload = { "status": "claim_required", "google_profile": asdict(profile), "mobile": normalized_mobile, "user_id": str(existing_user.id), } update_google_flow(flow, claim_payload) return { "status": "claim_required", "mobile": normalized_mobile, "detail": "Existing account found. Verify ownership to attach Google.", } user = User.objects.create_user( mobile=normalized_mobile, password=None, first_name=profile.first_name, last_name=profile.last_name, email=profile.email, is_verified=False, is_active=True, ) user.set_unusable_password() user.save(update_fields=["password"]) UserSocialAccount.objects.create( user=user, provider=UserSocialAccount.ProviderType.GOOGLE, provider_user_id=profile.provider_user_id, email=profile.email, email_verified=profile.email_verified, avatar_url=profile.avatar_url, is_active=True, ) authenticated_payload = build_authenticated_flow_payload(user) update_google_flow(flow, authenticated_payload) return authenticated_payload def send_google_claim_otp(flow: str) -> dict[str, Any]: flow_payload = get_google_flow(flow) if flow_payload.get("status") != "claim_required": raise ValidationError({"detail": "Google sign-in flow is not waiting for claim verification."}) mobile = flow_payload.get("mobile") if not isinstance(mobile, str) or not mobile: raise ValidationError({"detail": "Claim mobile number is missing."}) generate_and_send_otp(mobile, "login") return {"detail": "Verification code sent successfully."} def verify_google_claim(flow: str, code: str) -> dict[str, Any]: from django_redis import get_redis_connection flow_payload = get_google_flow(flow) if flow_payload.get("status") != "claim_required": raise ValidationError({"detail": "Google sign-in flow is not waiting for claim verification."}) mobile = flow_payload.get("mobile") if not isinstance(mobile, str) or not mobile: raise ValidationError({"detail": "Claim mobile number is missing."}) profile = _profile_from_flow(flow_payload) user_id = flow_payload.get("user_id") user = User.objects.filter(id=user_id, mobile=mobile).first() if not user: raise ValidationError({"detail": "Target account could not be found."}) redis_conn = get_redis_connection("default") stored_code = redis_conn.get(f"verification_code:{mobile}") if not stored_code or stored_code.decode("utf-8") != code: raise ValidationError({"code": "Invalid or expired verification code."}) redis_conn.delete(f"verification_code:{mobile}") existing_link = find_social_account_for_profile(profile) if existing_link and existing_link.user_id != user.id: raise ValidationError({"detail": "This Google account is already attached to another user."}) if existing_link: existing_link.email = profile.email existing_link.email_verified = profile.email_verified existing_link.avatar_url = profile.avatar_url existing_link.is_active = True existing_link.save( update_fields=["email", "email_verified", "avatar_url", "is_active", "updated_at"] ) else: UserSocialAccount.objects.create( user=user, provider=UserSocialAccount.ProviderType.GOOGLE, provider_user_id=profile.provider_user_id, email=profile.email, email_verified=profile.email_verified, avatar_url=profile.avatar_url, is_active=True, ) authenticated_payload = build_authenticated_flow_payload(user) update_google_flow(flow, authenticated_payload) return authenticated_payload