Files
Amirhossein Khalili b79fd73403
Some checks failed
Backend CI/CD / test (push) Has been cancelled
Backend CI/CD / deploy (push) Has been cancelled
fix(oauth): add callback error page for google oauth flow
2026-05-22 01:01:21 +03:30

617 lines
22 KiB
Python

from __future__ import annotations
import logging
import secrets
from dataclasses import asdict, dataclass, is_dataclass
from typing import Any
from urllib.parse import urlencode, urlparse
import requests
from django.conf import settings
from django.core.cache import cache
from django.core.files.base import ContentFile
from rest_framework.exceptions import APIException, ValidationError
from apps.users.email_identity import mask_mobile, normalize_email_identity
from apps.users.models import User, UserSocialAccount
from apps.users.services.auth import generate_and_send_otp, get_tokens_for_user
# Google OAuth 2.0 / OpenID Connect endpoints called by this module.
GOOGLE_AUTH_URL = "https://accounts.google.com/o/oauth2/v2/auth"
GOOGLE_TOKEN_URL = "https://oauth2.googleapis.com/token"
GOOGLE_USERINFO_URL = "https://openidconnect.googleapis.com/v1/userinfo"
# Cache lifetimes, in seconds, for the OAuth ``state`` token and the flow payload.
GOOGLE_STATE_TTL_SECONDS = 300
GOOGLE_FLOW_TTL_SECONDS = 900
# Cache key prefixes; `_cache_key` joins them with a random token.
GOOGLE_STATE_CACHE_PREFIX = "google_oauth_state"
GOOGLE_FLOW_CACHE_PREFIX = "google_oauth_flow"
logger = logging.getLogger(__name__)
class GoogleOAuthFlowError(APIException):
    """API error raised when the Google sign-in flow cannot proceed.

    The response status is 409 unless ``status_code`` is supplied. Any
    ``extra`` mapping is stored on ``payload_extra`` (an empty dict when
    omitted).
    """

    status_code = 409
    default_detail = "Google sign-in could not be completed."
    default_code = "google_flow_error"

    def __init__(
        self,
        detail: str,
        code: str,
        *,
        status_code: int | None = None,
        extra: dict[str, Any] | None = None,
    ) -> None:
        # Shadow the class-level status only when the caller overrides it.
        if status_code is not None:
            self.status_code = status_code
        self.payload_extra = extra if extra else {}
        super().__init__(detail=detail, code=code)
@dataclass
class GoogleProfile:
    """Subset of the Google userinfo response carried through the sign-in flow."""

    # Value of the ``sub`` claim returned by Google.
    provider_user_id: str
    # Email address after normalisation.
    email: str
    email_verified: bool
    first_name: str
    last_name: str
    # URL of the Google profile picture; may be an empty string.
    avatar_url: str
def _google_required_setting(name: str) -> str:
    """Return the Django setting ``name``.

    Raises:
        ValidationError: if the setting is missing or falsy.
    """
    configured = getattr(settings, name, "")
    if configured:
        return configured
    raise ValidationError({"detail": f"{name} is not configured."})
def _cache_key(prefix: str, token: str) -> str:
return f"{prefix}:{token}"
def _create_token() -> str:
return secrets.token_urlsafe(32)
def _invalid_flow_error(message: str = "Google sign-in flow is invalid or expired.") -> GoogleOAuthFlowError:
    """Build (without raising) the status-400 error used for broken or expired flows."""
    return GoogleOAuthFlowError(
        detail=message,
        code="google_flow_invalid_state",
        status_code=400,
    )
def _ensure_flow_status(flow_payload: dict[str, Any], expected_status: str) -> None:
if flow_payload.get("status") != expected_status:
raise _invalid_flow_error("Google sign-in flow is in an unexpected state.")
def _profile_to_payload(profile: GoogleProfile) -> dict[str, Any]:
return asdict(profile) if is_dataclass(profile) else {
"provider_user_id": profile.provider_user_id,
"email": profile.email,
"email_verified": profile.email_verified,
"first_name": profile.first_name,
"last_name": profile.last_name,
"avatar_url": profile.avatar_url,
}
def _profile_from_flow(flow_payload: dict[str, Any]) -> GoogleProfile:
    """Rebuild the ``GoogleProfile`` stored under ``"google_profile"`` in a flow payload.

    Raises:
        GoogleOAuthFlowError: (status 400) when the entry is missing, is not
            a dict, or its keys do not match the ``GoogleProfile`` fields.
    """
    google_profile = flow_payload.get("google_profile")
    if not isinstance(google_profile, dict):
        raise _invalid_flow_error("Google profile data is missing from the flow.")
    try:
        return GoogleProfile(**google_profile)
    except TypeError as exc:
        # Missing or unexpected keys in the cached dict make the constructor
        # raise TypeError; report that as an invalid flow instead of letting
        # it escape as an unhandled error.
        raise _invalid_flow_error("Google profile data is missing from the flow.") from exc
def _find_user_by_email(email: str | None) -> User | None:
    """Return the first user whose email equals the normalised form of ``email``.

    Returns ``None`` when normalisation yields ``None`` or no user matches.
    """
    lookup_email = normalize_email_identity(email)
    if lookup_email is not None:
        return User.objects.filter(email=lookup_email).first()
    return None
def _normalize_mobile(mobile: str) -> str:
normalized = "".join(ch for ch in mobile if ch.isdigit())
if len(normalized) != 11 or not normalized.startswith("09"):
raise ValidationError({"mobile": "Invalid mobile number."})
return normalized
def _avatar_file_extension(profile: GoogleProfile) -> str:
path = urlparse(profile.avatar_url or "").path
if "." in path:
suffix = path.rsplit(".", 1)[-1].lower()
if suffix in {"jpg", "jpeg", "png", "webp", "gif"}:
return suffix
return "jpg"
def sync_user_from_google_profile(user: User, profile: GoogleProfile) -> None:
    """Fill in blank user fields from the Google profile and save them.

    Only empty fields are touched: first name, last name, email (when the
    current value normalises to ``None``) and the profile picture, which is
    downloaded from ``profile.avatar_url``. A failed download is skipped.
    Nothing is saved when no field changed.
    """
    changed: list[str] = []

    for attr in ("first_name", "last_name"):
        if not getattr(user, attr) and getattr(profile, attr):
            setattr(user, attr, getattr(profile, attr))
            changed.append(attr)

    if normalize_email_identity(user.email) is None and profile.email:
        user.email = profile.email
        changed.append("email")

    if not user.profile_picture and profile.avatar_url:
        try:
            download = requests.get(profile.avatar_url, timeout=10)
            download.raise_for_status()
        except requests.RequestException:
            # The avatar is optional here; a failed fetch is simply skipped.
            download = None
        if download and download.content:
            extension = _avatar_file_extension(profile)
            filename = f"google-{profile.provider_user_id}.{extension}"
            # save=False: the model is persisted once, below, with update_fields.
            user.profile_picture.save(filename, ContentFile(download.content), save=False)
            changed.append("profile_picture")

    if changed:
        user.save(update_fields=changed)
def _build_claim_required_payload(
    *,
    profile: GoogleProfile,
    user: User,
    mobile: str,
    resolution: str,
    detail: str,
) -> dict[str, Any]:
    """Assemble the cached flow payload for the ``claim_required`` step.

    The payload carries the serialised Google profile, the mobile being
    verified, the target user's id (as a string) and a masked hint of that
    user's mobile.
    """
    return dict(
        status="claim_required",
        google_profile=_profile_to_payload(profile),
        mobile=mobile,
        user_id=str(user.id),
        resolution=resolution,
        email=profile.email,
        mobile_hint=mask_mobile(user.mobile),
        detail=detail,
    )
def _create_user_and_social_account_from_google_profile(*, mobile: str, profile: GoogleProfile) -> User:
    """Create a verified, active user and its linked Google social account.

    The user row and the social-account row are written inside a single
    ``transaction.atomic()`` block, so a failure while creating the
    social-account row does not leave behind a user that already holds the
    mobile number and email. Profile syncing, which may download the avatar
    over HTTP, runs after the atomic block so the network call does not keep
    the transaction open.

    Returns:
        The newly created user.
    """
    # Local import: Django is already a dependency of this module and only
    # this function needs the transaction helper.
    from django.db import transaction

    with transaction.atomic():
        user = User.objects.create_user(
            mobile=mobile,
            password=None,
            first_name=profile.first_name,
            last_name=profile.last_name,
            email=profile.email,
            is_verified=True,
            is_active=True,
        )
        # Google-created accounts have no local password.
        user.set_unusable_password()
        user.save(update_fields=["password"])
        UserSocialAccount.objects.create(
            user=user,
            provider=UserSocialAccount.ProviderType.GOOGLE,
            provider_user_id=profile.provider_user_id,
            email=profile.email,
            email_verified=profile.email_verified,
            avatar_url=profile.avatar_url,
            is_active=True,
        )

    sync_user_from_google_profile(user, profile)
    return user
def _build_public_google_flow_payload(flow_payload: dict[str, Any]) -> dict[str, Any]:
status = flow_payload.get("status")
if status == "authenticated":
return {
"status": "authenticated",
"access": flow_payload.get("access", ""),
"refresh": flow_payload.get("refresh", ""),
}
if status == "collect_mobile":
return {
"status": "collect_mobile",
"email": flow_payload.get("email", ""),
"first_name": flow_payload.get("first_name", ""),
"last_name": flow_payload.get("last_name", ""),
"avatar_url": flow_payload.get("avatar_url", ""),
"resolution": flow_payload.get("resolution", "new_account"),
"mobile_hint": flow_payload.get("mobile_hint"),
}
if status == "claim_required":
return {
"status": "claim_required",
"mobile": flow_payload.get("mobile", ""),
"detail": flow_payload.get("detail", ""),
"resolution": flow_payload.get("resolution", "existing_mobile_claim"),
"email": flow_payload.get("email", ""),
"mobile_hint": flow_payload.get("mobile_hint"),
}
raise _invalid_flow_error("Google sign-in flow is in an unknown state.")
def create_google_state() -> str:
    """Mint an OAuth ``state`` token and cache it for ``GOOGLE_STATE_TTL_SECONDS``."""
    state = _create_token()
    state_key = _cache_key(GOOGLE_STATE_CACHE_PREFIX, state)
    cache.set(state_key, {"valid": True}, GOOGLE_STATE_TTL_SECONDS)
    return state
def consume_google_state(state: str) -> dict[str, Any]:
    """Look up and remove the cached entry for an OAuth ``state`` token.

    Raises:
        ValidationError: if ``state`` is empty, or no truthy entry is cached
            for it.
    """
    if not state:
        raise ValidationError({"detail": "Missing OAuth state."})
    state_key = _cache_key(GOOGLE_STATE_CACHE_PREFIX, state)
    stored = cache.get(state_key)
    # Single use: the key is deleted whether or not an entry was found.
    cache.delete(state_key)
    if stored:
        return stored
    raise ValidationError({"detail": "Invalid or expired OAuth state."})
def create_google_flow(payload: dict[str, Any]) -> str:
    """Cache ``payload`` under a fresh flow token and return that token."""
    flow_token = _create_token()
    flow_key = _cache_key(GOOGLE_FLOW_CACHE_PREFIX, flow_token)
    cache.set(flow_key, payload, GOOGLE_FLOW_TTL_SECONDS)
    return flow_token
def get_google_flow_payload(flow: str) -> dict[str, Any]:
    """Return the full cached payload for ``flow``.

    Raises:
        GoogleOAuthFlowError: (status 400) when nothing truthy is cached for
            the token.
    """
    flow_key = _cache_key(GOOGLE_FLOW_CACHE_PREFIX, flow)
    stored = cache.get(flow_key)
    if stored:
        return stored
    raise _invalid_flow_error()
def get_google_flow(flow: str) -> dict[str, Any]:
    """Return the public view of the cached payload for ``flow``."""
    stored_payload = get_google_flow_payload(flow)
    return _build_public_google_flow_payload(stored_payload)
def delete_google_flow(flow: str) -> None:
    """Remove the cached payload for ``flow``."""
    flow_key = _cache_key(GOOGLE_FLOW_CACHE_PREFIX, flow)
    cache.delete(flow_key)
def update_google_flow(flow: str, payload: dict[str, Any]) -> None:
    """Overwrite the cached payload for ``flow``, with a fresh ``GOOGLE_FLOW_TTL_SECONDS`` timeout."""
    flow_key = _cache_key(GOOGLE_FLOW_CACHE_PREFIX, flow)
    cache.set(flow_key, payload, GOOGLE_FLOW_TTL_SECONDS)
def build_google_authorization_url() -> str:
    """Build the Google authorization URL, registering a new ``state`` token.

    Raises:
        ValidationError: if the client id or redirect URI setting is not
            configured.
    """
    # The state token is created first, before the settings are read.
    state = create_google_state()
    client_id = _google_required_setting("GOOGLE_OAUTH_CLIENT_ID")
    redirect_uri = _google_required_setting("GOOGLE_OAUTH_REDIRECT_URI")
    query = urlencode(
        [
            ("client_id", client_id),
            ("redirect_uri", redirect_uri),
            ("response_type", "code"),
            ("scope", "openid email profile"),
            ("state", state),
            ("access_type", "online"),
            ("prompt", "select_account"),
        ]
    )
    return "{}?{}".format(GOOGLE_AUTH_URL, query)
def _log_google_request_failure(message: str, exc: Exception, **extra: Any) -> None:
    """Log a failed Google HTTP call with its status code and a truncated body."""
    response = getattr(exc, "response", None)
    logger.warning(
        message,
        extra={
            "google_status_code": getattr(response, "status_code", None),
            "google_response_text": getattr(response, "text", "")[:1000] if response is not None else "",
            **extra,
        },
        exc_info=True,
    )


def exchange_code_for_google_profile(code: str) -> GoogleProfile:
    """Exchange an authorization code for the user's Google profile.

    Posts the code to the token endpoint, then fetches the userinfo document
    with the returned access token.

    Args:
        code: Authorization code received on the OAuth callback.

    Returns:
        A ``GoogleProfile`` built from the userinfo claims.

    Raises:
        ValidationError: if ``code`` is empty, a required setting is missing,
            either HTTP call fails or returns a body that is not a JSON
            object, no access token is returned, or the account lacks a
            ``sub`` or a verified email.
    """
    if not code:
        raise ValidationError({"detail": "Missing Google authorization code."})

    try:
        token_response = requests.post(
            GOOGLE_TOKEN_URL,
            data={
                "code": code,
                "client_id": _google_required_setting("GOOGLE_OAUTH_CLIENT_ID"),
                "client_secret": _google_required_setting("GOOGLE_OAUTH_CLIENT_SECRET"),
                "redirect_uri": _google_required_setting("GOOGLE_OAUTH_REDIRECT_URI"),
                "grant_type": "authorization_code",
            },
            timeout=10,
        )
        token_response.raise_for_status()
        token_payload = token_response.json()
    except (requests.RequestException, ValueError) as exc:
        # ValueError covers JSON decode failures on requests versions whose
        # decode error does not derive from RequestException.
        _log_google_request_failure(
            "Google token exchange failed",
            exc,
            google_redirect_uri=getattr(settings, "GOOGLE_OAUTH_REDIRECT_URI", ""),
        )
        raise ValidationError({"detail": "Google token exchange failed."}) from exc

    # A JSON body that is not an object carries no token.
    access_token = token_payload.get("access_token") if isinstance(token_payload, dict) else None
    if not access_token:
        raise ValidationError({"detail": "Google did not return an access token."})

    try:
        userinfo_response = requests.get(
            GOOGLE_USERINFO_URL,
            headers={"Authorization": f"Bearer {access_token}"},
            timeout=10,
        )
        userinfo_response.raise_for_status()
        userinfo = userinfo_response.json()
    except (requests.RequestException, ValueError) as exc:
        _log_google_request_failure("Google user profile lookup failed", exc)
        raise ValidationError({"detail": "Google user profile lookup failed."}) from exc

    if not isinstance(userinfo, dict):
        raise ValidationError({"detail": "Google user profile lookup failed."})

    provider_user_id = userinfo.get("sub", "")
    email = normalize_email_identity(userinfo.get("email"))
    # Accept only a real True or the string "true"; bool("false") would
    # otherwise treat an unverified address as verified.
    raw_verified = userinfo.get("email_verified")
    email_verified = raw_verified is True or (
        isinstance(raw_verified, str) and raw_verified.strip().lower() == "true"
    )
    if not provider_user_id or not email or not email_verified:
        raise ValidationError({"detail": "Google account must have a verified email address."})

    return GoogleProfile(
        provider_user_id=provider_user_id,
        email=email,
        email_verified=email_verified,
        first_name=userinfo.get("given_name", "") or "",
        last_name=userinfo.get("family_name", "") or "",
        avatar_url=userinfo.get("picture", "") or "",
    )
def get_frontend_google_callback_url() -> str:
    """Return the ``GOOGLE_OAUTH_FRONTEND_CALLBACK_URL`` setting.

    Raises:
        ValidationError: if the setting is not configured.
    """
    callback_url = _google_required_setting("GOOGLE_OAUTH_FRONTEND_CALLBACK_URL")
    return callback_url
def build_google_callback_redirect_url(flow: str) -> str:
    """Build the frontend callback URL carrying the flow token as ``?flow=...``.

    The token is URL-encoded, so a value containing reserved characters
    cannot corrupt the query string. Tokens made only of URL-safe characters
    produce the same URL as plain interpolation would.

    Raises:
        ValidationError: if the frontend callback URL setting is not
            configured.
    """
    callback_url = get_frontend_google_callback_url()
    query = urlencode({"flow": flow})
    return f"{callback_url}?{query}"
def build_google_callback_error_redirect_url(*, code: str, detail: str) -> str:
    """Build the frontend callback URL with ``error`` and ``error_description`` query parameters.

    Raises:
        ValidationError: if the frontend callback URL setting is not
            configured.
    """
    query = urlencode([("error", code), ("error_description", detail)])
    callback_url = get_frontend_google_callback_url()
    return "{}?{}".format(callback_url, query)
def find_social_account_for_profile(profile: GoogleProfile) -> UserSocialAccount | None:
    """Return the Google social account matching the profile's provider user id, if any.

    The related ``user`` is loaded in the same query.
    """
    google_accounts = UserSocialAccount.objects.select_related("user").filter(
        provider=UserSocialAccount.ProviderType.GOOGLE,
        provider_user_id=profile.provider_user_id,
    )
    return google_accounts.first()
def build_authenticated_flow_payload(user: User) -> dict[str, Any]:
    """Issue tokens for ``user`` and wrap them in an ``authenticated`` flow payload."""
    issued = get_tokens_for_user(user)
    payload: dict[str, Any] = {"status": "authenticated"}
    for token_kind in ("access", "refresh"):
        payload[token_kind] = issued[token_kind]
    return payload
def build_pending_google_flow_payload(profile: GoogleProfile) -> dict[str, Any]:
    """Build the initial ``collect_mobile`` payload for a profile with no linked account.

    When a user already exists with the profile's email, the payload is
    marked ``existing_email_claim`` and records that user's id and a masked
    mobile hint; otherwise it is a ``new_account`` flow with both set to
    ``None``.
    """
    matched_user = _find_user_by_email(profile.email)
    if matched_user:
        resolution = "existing_email_claim"
        target_user_id = str(matched_user.id)
        mobile_hint = mask_mobile(matched_user.mobile)
    else:
        resolution = "new_account"
        target_user_id = None
        mobile_hint = None

    return {
        "status": "collect_mobile",
        "google_profile": _profile_to_payload(profile),
        "email": profile.email,
        "first_name": profile.first_name,
        "last_name": profile.last_name,
        "avatar_url": profile.avatar_url,
        "resolution": resolution,
        "target_user_id": target_user_id,
        "mobile_hint": mobile_hint,
    }
def complete_google_signup(flow: str, mobile: str) -> dict[str, Any]:
    """Advance a ``collect_mobile`` flow to ``claim_required`` and send an OTP.

    Three outcomes, checked in this order:

    * the flow targets an existing account matched by email: the submitted
      mobile must equal that account's mobile;
    * the mobile already belongs to a user: that user must have no email;
    * otherwise a new account will be created after verification.

    Returns:
        The public view of the updated flow payload.

    Raises:
        GoogleOAuthFlowError: for an invalid flow, a missing target account,
            or an email/mobile conflict.
        ValidationError: if the mobile number is invalid.
    """
    stored = get_google_flow_payload(flow)
    _ensure_flow_status(stored, "collect_mobile")
    claimed_mobile = _normalize_mobile(mobile)
    profile = _profile_from_flow(stored)

    email_owner = None
    target_user_id = stored.get("target_user_id")
    if target_user_id:
        email_owner = User.objects.filter(id=target_user_id).first()
        if email_owner is None:
            raise _invalid_flow_error("The Google sign-in target account could not be found.")

    mobile_owner = User.objects.filter(mobile=claimed_mobile).first()

    if email_owner is not None:
        if claimed_mobile != email_owner.mobile:
            raise GoogleOAuthFlowError(
                "This Google account must be verified with the mobile number already attached to the matching account.",
                "google_email_mobile_conflict",
                extra={"mobile_hint": mask_mobile(email_owner.mobile)},
            )
        generate_and_send_otp(claimed_mobile, "login")
        next_payload = _build_claim_required_payload(
            profile=profile,
            user=email_owner,
            mobile=claimed_mobile,
            resolution="existing_email_claim",
            detail="Existing account found for this email. Verify the linked mobile number to attach Google.",
        )
    elif mobile_owner is not None:
        # A mobile account that already has an email cannot take this one.
        if normalize_email_identity(mobile_owner.email):
            raise GoogleOAuthFlowError(
                "This mobile number already belongs to another account with a different email address.",
                "google_mobile_belongs_to_other_email",
            )
        generate_and_send_otp(claimed_mobile, "login")
        next_payload = _build_claim_required_payload(
            profile=profile,
            user=mobile_owner,
            mobile=claimed_mobile,
            resolution="existing_mobile_claim",
            detail=(
                "Existing mobile account found. Verify ownership to attach "
                "Google and set the verified email address."
            ),
        )
    else:
        generate_and_send_otp(claimed_mobile, "register")
        next_payload = {
            "status": "claim_required",
            "google_profile": _profile_to_payload(profile),
            "mobile": claimed_mobile,
            "user_id": None,
            "resolution": "new_account",
            "email": profile.email,
            "mobile_hint": None,
            "detail": "Verify this mobile number to finish creating your account with Google.",
        }

    update_google_flow(flow, next_payload)
    return _build_public_google_flow_payload(next_payload)
def _verify_otp_code(*, mobile: str, code: str) -> None:
    """Check ``code`` against the OTP stored in Redis for ``mobile`` and consume it.

    The stored value is read from the ``verification_code:<mobile>`` key of
    the ``"default"`` Redis connection and deleted after a successful match.
    The comparison is constant-time, so response timing does not reveal how
    much of a guessed code was correct.

    Raises:
        ValidationError: if no code is stored, or the supplied code is
            empty, not a string, or does not match.
    """
    from django_redis import get_redis_connection

    redis_conn = get_redis_connection("default")
    otp_key = f"verification_code:{mobile}"
    stored_code = redis_conn.get(otp_key)

    # Compare as bytes: compare_digest only accepts str when it is ASCII.
    if isinstance(stored_code, str):
        stored_bytes = stored_code.encode("utf-8")
    else:
        stored_bytes = stored_code or b""
    supplied_bytes = code.encode("utf-8") if isinstance(code, str) else b""

    if not stored_bytes or not supplied_bytes or not secrets.compare_digest(stored_bytes, supplied_bytes):
        raise ValidationError({"code": "Invalid or expired verification code."})
    redis_conn.delete(otp_key)
def _create_new_google_user_after_otp(*, mobile: str, profile: GoogleProfile) -> User:
    """Re-check for conflicts, then create the new user and its Google link.

    The email, the mobile number and the Google account are each checked
    again, in that order, before the user is created.

    Raises:
        GoogleOAuthFlowError: if the email, the mobile number or the Google
            account is already in use.
    """
    email_owner = _find_user_by_email(profile.email)
    if email_owner is not None:
        raise GoogleOAuthFlowError(
            "This Google email is already attached to an existing account.",
            "google_email_already_claimed",
            extra={"mobile_hint": mask_mobile(email_owner.mobile)},
        )

    mobile_owner = User.objects.filter(mobile=mobile).first()
    if mobile_owner is not None:
        if normalize_email_identity(mobile_owner.email):
            raise GoogleOAuthFlowError(
                "This mobile number already belongs to another account with a different email address.",
                "google_mobile_belongs_to_other_email",
            )
        # The mobile was free when the flow started but has been taken since.
        raise GoogleOAuthFlowError(
            "This mobile number is no longer available for creating a new Google account.",
            "google_flow_invalid_state",
            status_code=409,
        )

    if find_social_account_for_profile(profile) is not None:
        raise GoogleOAuthFlowError(
            "This Google account is already attached to another user.",
            "google_email_already_claimed",
        )

    return _create_user_and_social_account_from_google_profile(mobile=mobile, profile=profile)
def send_google_claim_otp(flow: str) -> dict[str, Any]:
    """Send an OTP to the mobile recorded on a ``claim_required`` flow.

    The OTP is sent in ``"register"`` mode for ``new_account`` flows and in
    ``"login"`` mode otherwise.

    Raises:
        GoogleOAuthFlowError: if the flow is invalid, in another state, or
            has no mobile number.
    """
    stored = get_google_flow_payload(flow)
    _ensure_flow_status(stored, "claim_required")

    mobile = stored.get("mobile")
    if not (isinstance(mobile, str) and mobile):
        raise _invalid_flow_error("Claim mobile number is missing.")

    otp_mode = "login"
    if stored.get("resolution") == "new_account":
        otp_mode = "register"
    generate_and_send_otp(mobile, otp_mode)
    return {"detail": "Verification code sent successfully."}
def verify_google_claim(flow: str, code: str) -> dict[str, Any]:
    """Verify the OTP for a ``claim_required`` flow and finish sign-in.

    For ``new_account`` flows a user is created. Otherwise the Google
    account is attached to the target user (setting the user's email when it
    has none) and any existing link for that user is refreshed. The flow is
    then replaced by an ``authenticated`` payload, which is returned.

    Raises:
        GoogleOAuthFlowError: if the flow is invalid, the target account is
            missing, or the email/mobile/Google account conflicts with
            another user.
        ValidationError: if the verification code is wrong or expired.
    """
    stored = get_google_flow_payload(flow)
    _ensure_flow_status(stored, "claim_required")

    mobile = stored.get("mobile")
    if not (isinstance(mobile, str) and mobile):
        raise _invalid_flow_error("Claim mobile number is missing.")

    profile = _profile_from_flow(stored)
    user_id = stored.get("user_id")
    resolution = stored.get("resolution")

    # The OTP is checked before any account is looked up or modified.
    _verify_otp_code(mobile=mobile, code=code)

    if resolution == "new_account":
        account = _create_new_google_user_after_otp(mobile=mobile, profile=profile)
    else:
        account = User.objects.filter(id=user_id, mobile=mobile).first()
        if not account:
            raise _invalid_flow_error("Target account could not be found.")

        account_email = normalize_email_identity(account.email)
        if resolution == "existing_email_claim" and account_email != profile.email:
            raise GoogleOAuthFlowError(
                "The matching email account could not be verified for this mobile number.",
                "google_email_mobile_conflict",
                extra={"mobile_hint": mask_mobile(account.mobile)},
            )
        if resolution == "existing_mobile_claim" and account_email not in (None, profile.email):
            raise GoogleOAuthFlowError(
                "This mobile number already belongs to another account with a different email address.",
                "google_mobile_belongs_to_other_email",
            )

        linked = find_social_account_for_profile(profile)
        if linked and linked.user_id != account.id:
            raise GoogleOAuthFlowError(
                "This Google account is already attached to another user.",
                "google_email_already_claimed",
            )

        if account_email is None:
            account.email = profile.email
            account.save(update_fields=["email"])
        sync_user_from_google_profile(account, profile)

        if linked:
            # Refresh the existing link with the latest profile data.
            linked.email = profile.email
            linked.email_verified = profile.email_verified
            linked.avatar_url = profile.avatar_url
            linked.is_active = True
            linked.save(
                update_fields=["email", "email_verified", "avatar_url", "is_active", "updated_at"]
            )
        else:
            UserSocialAccount.objects.create(
                user=account,
                provider=UserSocialAccount.ProviderType.GOOGLE,
                provider_user_id=profile.provider_user_id,
                email=profile.email,
                email_verified=profile.email_verified,
                avatar_url=profile.avatar_url,
                is_active=True,
            )

    authenticated_payload = build_authenticated_flow_payload(account)
    update_google_flow(flow, authenticated_payload)
    return authenticated_payload