from __future__ import annotations import secrets from dataclasses import asdict, dataclass from typing import Any from urllib.parse import urlencode from urllib.parse import urlparse import requests from django.conf import settings from django.core.cache import cache from django.core.files.base import ContentFile from apps.users.email_identity import ( is_placeholder_email, is_valid_mobile_number, mask_mobile, normalize_email_identity, normalize_mobile_number, ) from apps.users.models import Major, University, User, UserSocialAccount from apps.users.services.auth import ( AuthServiceError, RegistrationPayload, generate_and_send_otp, get_tokens_for_user, verify_otp_code, ) GOOGLE_AUTH_URL = "https://accounts.google.com/o/oauth2/v2/auth" GOOGLE_TOKEN_URL = "https://oauth2.googleapis.com/token" GOOGLE_USERINFO_URL = "https://openidconnect.googleapis.com/v1/userinfo" GOOGLE_STATE_TTL_SECONDS = 300 GOOGLE_FLOW_TTL_SECONDS = 900 GOOGLE_STATE_CACHE_PREFIX = "google_oauth_state" GOOGLE_FLOW_CACHE_PREFIX = "google_oauth_flow" class GoogleOAuthFlowError(AuthServiceError): def __init__( self, message: str, *, code: str = "google_flow_error", status_code: int = 409, extra: dict[str, Any] | None = None, ): super().__init__(message, field="detail", status_code=status_code) self.code = code self.extra = extra or {} @dataclass class GoogleProfile: provider_user_id: str email: str email_verified: bool first_name: str last_name: str avatar_url: str def _cache_key(prefix: str, token: str) -> str: return f"{prefix}:{token}" def _create_token() -> str: return secrets.token_urlsafe(32) def _required_setting(name: str) -> str: value = getattr(settings, name, "") if not value: raise GoogleOAuthFlowError(f"{name} is not configured.", status_code=500) return value def _public_flow_payload(flow_payload: dict[str, Any]) -> dict[str, Any]: status = flow_payload.get("status") base = {"status": status} for key in ( "email", "first_name", "last_name", "avatar_url", "resolution", "mobile", 
"mobile_hint", "detail", "access_token", "refresh_token", ): if key in flow_payload: base[key] = flow_payload[key] return base def _profile_payload(profile: GoogleProfile) -> dict[str, Any]: return asdict(profile) def _profile_from_payload(payload: dict[str, Any]) -> GoogleProfile: raw = payload.get("google_profile") if not isinstance(raw, dict): raise GoogleOAuthFlowError( "Google flow profile data is missing.", code="google_flow_invalid_state", status_code=400, ) return GoogleProfile(**raw) def create_google_state() -> str: state = _create_token() cache.set(_cache_key(GOOGLE_STATE_CACHE_PREFIX, state), {"valid": True}, GOOGLE_STATE_TTL_SECONDS) return state def consume_google_state(state: str) -> None: key = _cache_key(GOOGLE_STATE_CACHE_PREFIX, state or "") payload = cache.get(key) cache.delete(key) if not payload: raise GoogleOAuthFlowError( "Google sign-in state is invalid or expired.", code="google_flow_invalid_state", status_code=400, ) def create_google_flow(payload: dict[str, Any]) -> str: flow = _create_token() cache.set(_cache_key(GOOGLE_FLOW_CACHE_PREFIX, flow), payload, GOOGLE_FLOW_TTL_SECONDS) return flow def get_google_flow_payload(flow: str) -> dict[str, Any]: payload = cache.get(_cache_key(GOOGLE_FLOW_CACHE_PREFIX, flow)) if not payload: raise GoogleOAuthFlowError( "Google sign-in flow is invalid or expired.", code="google_flow_invalid_state", status_code=400, ) return payload def get_google_flow(flow: str) -> dict[str, Any]: return _public_flow_payload(get_google_flow_payload(flow)) def update_google_flow(flow: str, payload: dict[str, Any]) -> None: cache.set(_cache_key(GOOGLE_FLOW_CACHE_PREFIX, flow), payload, GOOGLE_FLOW_TTL_SECONDS) def build_google_authorization_url() -> str: state = create_google_state() params = { "client_id": _required_setting("GOOGLE_OAUTH_CLIENT_ID"), "redirect_uri": _required_setting("GOOGLE_OAUTH_REDIRECT_URI"), "response_type": "code", "scope": "openid email profile", "state": state, "access_type": "online", "prompt": 
"select_account", } return f"{GOOGLE_AUTH_URL}?{urlencode(params)}" def exchange_code_for_google_profile(code: str) -> GoogleProfile: if not code: raise GoogleOAuthFlowError("Missing Google authorization code.", status_code=400) try: token_response = requests.post( GOOGLE_TOKEN_URL, data={ "code": code, "client_id": _required_setting("GOOGLE_OAUTH_CLIENT_ID"), "client_secret": _required_setting("GOOGLE_OAUTH_CLIENT_SECRET"), "redirect_uri": _required_setting("GOOGLE_OAUTH_REDIRECT_URI"), "grant_type": "authorization_code", }, timeout=10, ) token_response.raise_for_status() token_payload = token_response.json() except requests.RequestException as exc: raise GoogleOAuthFlowError("Google token exchange failed.", status_code=400) from exc access_token = token_payload.get("access_token") if not access_token: raise GoogleOAuthFlowError("Google did not return an access token.", status_code=400) try: userinfo_response = requests.get( GOOGLE_USERINFO_URL, headers={"Authorization": f"Bearer {access_token}"}, timeout=10, ) userinfo_response.raise_for_status() userinfo = userinfo_response.json() except requests.RequestException as exc: raise GoogleOAuthFlowError("Google user profile lookup failed.", status_code=400) from exc email = normalize_email_identity(userinfo.get("email")) if not userinfo.get("sub") or not email or not userinfo.get("email_verified"): raise GoogleOAuthFlowError( "Google account must have a verified email address.", status_code=400, ) return GoogleProfile( provider_user_id=userinfo.get("sub"), email=email, email_verified=bool(userinfo.get("email_verified")), first_name=userinfo.get("given_name", "") or "", last_name=userinfo.get("family_name", "") or "", avatar_url=userinfo.get("picture", "") or "", ) def build_google_callback_redirect_url(flow: str) -> str: callback = _required_setting("GOOGLE_OAUTH_FRONTEND_CALLBACK_URL") sep = "&" if "?" in callback else "?" 
return f"{callback}{sep}flow={flow}" def find_social_account_for_profile(profile: GoogleProfile) -> UserSocialAccount | None: return ( UserSocialAccount.objects.select_related("user") .filter(provider=UserSocialAccount.ProviderType.GOOGLE, provider_user_id=profile.provider_user_id) .first() ) def _avatar_file_extension(profile: GoogleProfile) -> str: path = urlparse(profile.avatar_url or "").path if "." in path: suffix = path.rsplit(".", 1)[-1].lower() if suffix in {"jpg", "jpeg", "png", "webp", "gif"}: return suffix return "jpg" def sync_user_from_google_profile(user: User, profile: GoogleProfile) -> None: update_fields: list[str] = [] if not user.first_name and profile.first_name: user.first_name = profile.first_name update_fields.append("first_name") if not user.last_name and profile.last_name: user.last_name = profile.last_name update_fields.append("last_name") if not user.email and profile.email: user.email = profile.email user.is_email_verified = True update_fields.extend(["email", "is_email_verified"]) if not user.profile_picture and profile.avatar_url: try: avatar_response = requests.get(profile.avatar_url, timeout=10) avatar_response.raise_for_status() except requests.RequestException: avatar_response = None if avatar_response and avatar_response.content: filename = f"google-{profile.provider_user_id}.{_avatar_file_extension(profile)}" user.profile_picture.save(filename, ContentFile(avatar_response.content), save=False) update_fields.append("profile_picture") if update_fields: user.save(update_fields=update_fields) def build_authenticated_flow_payload(user: User) -> dict[str, Any]: tokens = get_tokens_for_user(user) return { "status": "authenticated", "access_token": tokens["access_token"], "refresh_token": tokens["refresh_token"], } def _is_google_claim_candidate(user: User | None) -> bool: if user is None: return False if is_placeholder_email(user.email): return False normalized_email = normalize_email_identity(user.email) return bool(normalized_email) 
def build_pending_google_flow_payload(profile: GoogleProfile) -> dict[str, Any]:
    """Build the initial ``collect_profile`` flow payload for *profile*.

    Looks up an existing user by the profile email (skipped for placeholder
    emails) and records whether the flow should claim that account
    (``existing_email_claim``) or create a new one (``new_account``).
    """
    existing_email_user = None
    if not is_placeholder_email(profile.email):
        existing_email_user = User.objects.filter(email=profile.email).first()

    resolution = (
        "existing_email_claim"
        if _is_google_claim_candidate(existing_email_user)
        else "new_account"
    )
    mobile_hint = mask_mobile(existing_email_user.mobile) if existing_email_user else None
    has_verified_mobile = bool(
        existing_email_user
        and existing_email_user.mobile
        and existing_email_user.is_mobile_verified
    )
    return {
        "status": "collect_profile",
        "google_profile": _profile_payload(profile),
        "email": profile.email,
        "first_name": profile.first_name,
        "last_name": profile.last_name,
        "avatar_url": profile.avatar_url,
        "resolution": resolution,
        "target_user_id": existing_email_user.id if existing_email_user else None,
        "mobile_hint": mobile_hint,
        # NOTE(review): stored here but not read by any function in this
        # part of the file — confirm whether a consumer exists elsewhere.
        "has_verified_mobile": has_verified_mobile,
    }


def _resolve_major(code: str | None) -> Major | None:
    """Return the non-deleted ``Major`` with *code*, or ``None``."""
    if not code:
        return None
    return Major.objects.filter(code=code, is_deleted=False).first()


def _resolve_university(code: str | None) -> University | None:
    """Return the non-deleted ``University`` with *code*, or ``None``."""
    if not code:
        return None
    return University.objects.filter(code=code, is_deleted=False).first()


def _require_flow_status(flow_payload: dict[str, Any], expected: str) -> None:
    """Raise ``google_flow_invalid_state`` (400) unless the payload status is *expected*."""
    if flow_payload.get("status") != expected:
        raise GoogleOAuthFlowError(
            "Google sign-in flow is in an unexpected state.",
            code="google_flow_invalid_state",
            status_code=400,
        )


def _claim_mobile_from_flow(flow_payload: dict[str, Any]) -> str:
    """Return the mobile number stored in a ``claim_required`` flow payload.

    Raises:
        GoogleOAuthFlowError: 400 when the flow is not in ``claim_required``
            state or holds no non-empty string under ``mobile``.
    """
    _require_flow_status(flow_payload, "claim_required")
    mobile = flow_payload.get("mobile")
    if not isinstance(mobile, str) or not mobile:
        raise GoogleOAuthFlowError("Claim mobile number is missing.", status_code=400)
    return mobile


def _ensure_mobile_not_owned_by_other_email(
    mobile: str,
    email: str,
    *,
    exclude_user: User | None = None,
) -> None:
    """Reject *mobile* when it belongs to a user whose email differs from *email*.

    Only the first matching user (optionally excluding *exclude_user*) is
    inspected. A match whose normalized email is ``None`` or equals *email*
    is accepted.

    Raises:
        GoogleOAuthFlowError: ``google_mobile_belongs_to_other_email`` (409).
    """
    owners = User.objects.filter(mobile=mobile)
    if exclude_user is not None:
        owners = owners.exclude(pk=exclude_user.pk)
    conflict = owners.first()
    if conflict and normalize_email_identity(conflict.email) not in (None, email):
        raise GoogleOAuthFlowError(
            "این شماره موبایل قبلاً به حساب دیگری متصل شده است.",
            code="google_mobile_belongs_to_other_email",
            status_code=409,
        )


def complete_google_signup(
    *,
    flow: str,
    mobile: str,
    username: str | None = None,
    student_id: str | None = None,
    year_of_study: int | None = None,
    major: str | None = None,
    university: str | None = None,
    first_name: str | None = None,
    last_name: str | None = None,
) -> dict[str, Any]:
    """Validate the collected profile data and move the flow to ``claim_required``.

    For an ``existing_email_claim`` flow only the mobile number is used; for
    a new account the username, student id, major and university are
    validated as well. In both cases an OTP is requested for the normalized
    mobile number via ``generate_and_send_otp`` and the flow payload is
    replaced with the claim payload.

    Returns:
        The public view of the new ``claim_required`` payload.

    Raises:
        GoogleOAuthFlowError: when the flow is missing or not in
            ``collect_profile`` state, or when any validation fails.
    """
    flow_payload = get_google_flow_payload(flow)
    _require_flow_status(flow_payload, "collect_profile")

    normalized_mobile = normalize_mobile_number(mobile)
    if not is_valid_mobile_number(normalized_mobile):
        raise GoogleOAuthFlowError("شماره موبایل معتبر نیست.", status_code=400)

    profile = _profile_from_payload(flow_payload)
    resolution = flow_payload.get("resolution", "new_account")
    target_user_id = flow_payload.get("target_user_id")
    target_user = User.objects.filter(pk=target_user_id).first() if target_user_id else None

    if resolution == "existing_email_claim":
        if target_user is None:
            raise GoogleOAuthFlowError(
                "Target account could not be found.",
                code="google_flow_invalid_state",
                status_code=400,
            )
        _ensure_mobile_not_owned_by_other_email(
            normalized_mobile,
            profile.email,
            exclude_user=target_user,
        )
        generate_and_send_otp(normalized_mobile, "google_claim")
        claim_payload = {
            "status": "claim_required",
            "google_profile": _profile_payload(profile),
            "resolution": resolution,
            "target_user_id": target_user.id,
            "mobile": normalized_mobile,
            "email": profile.email,
            "mobile_hint": mask_mobile(normalized_mobile),
            "detail": "مالکیت شماره موبایل را تایید کنید تا ورود با گوگل تکمیل شود.",
        }
        update_google_flow(flow, claim_payload)
        return _public_flow_payload(claim_payload)

    # --- New-account path: validate the extra registration fields. ---
    if not username:
        raise GoogleOAuthFlowError(
            "نام کاربری الزامی است.",
            code="google_missing_username",
            status_code=400,
        )
    if User.objects.filter(username=username).exists():
        raise GoogleOAuthFlowError(
            "نام کاربری قبلاً استفاده شده است.",
            code="google_username_taken",
            status_code=400,
        )
    if student_id and len(str(student_id)) < 10:
        raise GoogleOAuthFlowError(
            "شماره دانشجویی باید حداقل ۱۰ رقم باشد.",
            code="google_invalid_student_id",
            status_code=400,
        )

    major_obj = _resolve_major(major)
    if major and not major_obj:
        raise GoogleOAuthFlowError(
            "رشته انتخابی معتبر نیست.",
            code="google_invalid_major",
            status_code=400,
        )

    university_obj = _resolve_university(university)
    if university and not university_obj:
        raise GoogleOAuthFlowError(
            "دانشگاه انتخابی معتبر نیست.",
            code="google_invalid_university",
            status_code=400,
        )

    if (
        student_id
        and university_obj
        and User.objects.filter(university=university_obj, student_id=student_id).exists()
    ):
        raise GoogleOAuthFlowError(
            "این شماره دانشجویی در دانشگاه انتخابی قبلاً ثبت شده است.",
            code="google_student_id_taken",
            status_code=400,
        )

    _ensure_mobile_not_owned_by_other_email(normalized_mobile, profile.email)

    generate_and_send_otp(normalized_mobile, "google_claim")
    claim_payload = {
        "status": "claim_required",
        "google_profile": _profile_payload(profile),
        "resolution": "new_account",
        "mobile": normalized_mobile,
        "email": profile.email,
        "username": username,
        "student_id": student_id,
        "year_of_study": year_of_study,
        "major": major,
        "university": university,
        "first_name": first_name or profile.first_name,
        "last_name": last_name or profile.last_name,
        "detail": "کد تایید موبایل را وارد کنید تا حساب جدید شما ساخته شود.",
    }
    update_google_flow(flow, claim_payload)
    return _public_flow_payload(claim_payload)


def send_google_claim_otp(flow: str) -> dict[str, str]:
    """Request another OTP for the mobile number stored in a ``claim_required`` flow.

    Raises:
        GoogleOAuthFlowError: when the flow is missing, is in another state,
            or has no mobile number.
    """
    mobile = _claim_mobile_from_flow(get_google_flow_payload(flow))
    generate_and_send_otp(mobile, "google_claim")
    return {"message": "کد تایید مجدداً ارسال شد."}


def _find_linkable_social_account(
    profile: GoogleProfile,
    user: User | None,
) -> UserSocialAccount | None:
    """Return the social account for *profile* if *user* may (re)use it.

    Pass ``user=None`` for an account that does not exist yet: any existing
    social account then necessarily belongs to someone else.

    Raises:
        GoogleOAuthFlowError: ``google_already_linked`` (409) when the Google
            identity is attached to a different user.
    """
    social = find_social_account_for_profile(profile)
    if social and (user is None or social.user_id != user.id):
        raise GoogleOAuthFlowError(
            "این حساب گوگل قبلاً به کاربر دیگری متصل شده است.",
            code="google_already_linked",
            status_code=409,
        )
    return social


def _link_google_account(*, user: User, profile: GoogleProfile) -> None:
    """Attach the Google identity in *profile* to *user*.

    Syncs empty user fields from the profile, then refreshes and reactivates
    an existing social account row for this user or creates a new one.

    Raises:
        GoogleOAuthFlowError: ``google_already_linked`` (409) when the Google
            identity is attached to a different user.
    """
    social = _find_linkable_social_account(profile, user)
    sync_user_from_google_profile(user, profile)

    if social:
        social.email = profile.email
        social.email_verified = profile.email_verified
        social.avatar_url = profile.avatar_url
        social.is_active = True
        social.save(update_fields=["email", "email_verified", "avatar_url", "is_active"])
        return

    UserSocialAccount.objects.create(
        user=user,
        provider=UserSocialAccount.ProviderType.GOOGLE,
        provider_user_id=profile.provider_user_id,
        email=profile.email,
        email_verified=profile.email_verified,
        avatar_url=profile.avatar_url,
        is_active=True,
    )


def _claim_existing_account(
    flow_payload: dict[str, Any],
    profile: GoogleProfile,
    mobile: str,
) -> User:
    """Attach the verified mobile and the Google identity to the flow's target user."""
    target_user_id = flow_payload.get("target_user_id")
    user = User.objects.filter(pk=target_user_id).first()
    if user is None or normalize_email_identity(user.email) != profile.email:
        raise GoogleOAuthFlowError(
            "The matching account could not be verified.",
            code="google_email_claim_failed",
            status_code=409,
        )

    # Check for a link conflict before writing, so a rejected claim does not
    # leave the user's mobile number overwritten.
    _find_linkable_social_account(profile, user)

    user.mobile = mobile
    user.is_mobile_verified = True
    user.is_email_verified = True
    user.save(update_fields=["mobile", "is_mobile_verified", "is_email_verified"])
    _link_google_account(user=user, profile=profile)
    return user


def _create_account_from_claim(
    flow_payload: dict[str, Any],
    profile: GoogleProfile,
    mobile: str,
    code: str,
) -> User:
    """Create a new user from the data stored in the flow and link the Google identity."""
    normalized_email = normalize_email_identity(profile.email)
    if normalized_email and User.objects.filter(email=normalized_email).exists():
        raise GoogleOAuthFlowError(
            "این ایمیل قبلاً به حساب دیگری متصل شده است.",
            code="google_email_taken",
            status_code=409,
        )

    payload = RegistrationPayload(
        username=flow_payload.get("username", ""),
        mobile=mobile,
        password=_create_token(),
        code=code,
        email=normalized_email,
        first_name=flow_payload.get("first_name") or profile.first_name,
        last_name=flow_payload.get("last_name") or profile.last_name,
        university=flow_payload.get("university"),
        student_id=flow_payload.get("student_id"),
        year_of_study=flow_payload.get("year_of_study"),
        major=flow_payload.get("major"),
    )
    if not payload.username:
        raise GoogleOAuthFlowError(
            "نام کاربری برای ساخت حساب جدید الزامی است.",
            status_code=400,
        )

    # Check for a link conflict before creating the row, so a rejected
    # sign-up does not leave behind an account without a Google link.
    _find_linkable_social_account(profile, None)

    major_obj = _resolve_major(payload.major)
    university_obj = _resolve_university(payload.university)
    user = User.objects.create_user(
        username=payload.username,
        email=payload.email,
        mobile=payload.mobile,
        password=None,
        first_name=payload.first_name or "",
        last_name=payload.last_name or "",
        student_id=payload.student_id,
        year_of_study=payload.year_of_study,
        major=major_obj,
        university=university_obj,
        is_mobile_verified=True,
        is_email_verified=bool(payload.email),
    )
    user.set_unusable_password()
    user.save(update_fields=["password"])
    _link_google_account(user=user, profile=profile)
    return user


def verify_google_claim(flow: str, code: str) -> dict[str, Any]:
    """Verify the mobile OTP and finish the Google sign-in flow.

    After ``verify_otp_code`` accepts *code*, the flow either attaches the
    Google identity to the existing target user (``existing_email_claim``) or
    creates a new user. The flow payload is then replaced by the
    ``authenticated`` payload carrying the user's tokens.

    Returns:
        The public view of the ``authenticated`` payload.

    Raises:
        GoogleOAuthFlowError: when the flow is missing or not in
            ``claim_required`` state, the target account cannot be verified,
            the email is already taken, the username is missing, or the
            Google identity is linked to another user.
        Whatever ``verify_otp_code`` raises for a rejected code also
        propagates unchanged.
    """
    flow_payload = get_google_flow_payload(flow)
    mobile = _claim_mobile_from_flow(flow_payload)

    verify_otp_code(mobile=mobile, code=code, mode="google_claim")
    profile = _profile_from_payload(flow_payload)
    resolution = flow_payload.get("resolution", "new_account")

    if resolution == "existing_email_claim":
        user = _claim_existing_account(flow_payload, profile, mobile)
    else:
        user = _create_account_from_claim(flow_payload, profile, mobile, code)

    authenticated = build_authenticated_flow_payload(user)
    update_google_flow(flow, authenticated)
    return _public_flow_payload(authenticated)