Files
guilan-ace-backend/apps/users/services/google_oauth.py
Amirhossein Khalili b7b21a6cc6
Some checks failed
Backend CI/CD / test (push) Has been cancelled
Backend CI/CD / deploy (push) Has been cancelled
feat(backend): migrate auth and notifications off email
2026-05-21 10:28:04 +03:30

560 lines
20 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
from __future__ import annotations
import secrets
from dataclasses import asdict, dataclass
from typing import Any
from urllib.parse import urlencode
from urllib.parse import urlparse
import requests
from django.conf import settings
from django.core.cache import cache
from django.core.files.base import ContentFile
from apps.users.email_identity import (
is_placeholder_email,
is_valid_mobile_number,
mask_mobile,
normalize_email_identity,
normalize_mobile_number,
)
from apps.users.models import Major, University, User, UserSocialAccount
from apps.users.services.auth import (
AuthServiceError,
RegistrationPayload,
generate_and_send_otp,
get_tokens_for_user,
verify_otp_code,
)
# Google OAuth 2.0 / OpenID Connect endpoints called by this module.
GOOGLE_AUTH_URL = "https://accounts.google.com/o/oauth2/v2/auth"
GOOGLE_TOKEN_URL = "https://oauth2.googleapis.com/token"
GOOGLE_USERINFO_URL = "https://openidconnect.googleapis.com/v1/userinfo"

# Cache lifetimes, in seconds, for the `state` token and the flow payload.
GOOGLE_STATE_TTL_SECONDS = 5 * 60
GOOGLE_FLOW_TTL_SECONDS = 15 * 60

# Namespaces prepended to the random token to form cache keys.
GOOGLE_STATE_CACHE_PREFIX = "google_oauth_state"
GOOGLE_FLOW_CACHE_PREFIX = "google_oauth_flow"
class GoogleOAuthFlowError(AuthServiceError):
    """Error raised when a step of the Google sign-in flow cannot proceed.

    In addition to what ``AuthServiceError`` stores (the error is always
    reported on the ``"detail"`` field), instances carry:

    * ``code``  -- a machine-readable identifier, ``"google_flow_error"``
      unless overridden;
    * ``extra`` -- a dict of additional data, never ``None``.

    ``status_code`` defaults to 409.
    """

    def __init__(
        self,
        message: str,
        *,
        code: str = "google_flow_error",
        status_code: int = 409,
        extra: dict[str, Any] | None = None,
    ):
        super().__init__(message, field="detail", status_code=status_code)
        self.code = code
        # A falsy ``extra`` (None or empty) is replaced by a fresh dict.
        self.extra = extra if extra else {}
@dataclass
class GoogleProfile:
    """Subset of the Google userinfo response that the sign-in flow keeps."""

    # Value of the ``sub`` claim returned by Google.
    provider_user_id: str
    # Email after ``normalize_email_identity`` has been applied.
    email: str
    # Whether Google reported the email as verified.
    email_verified: bool
    # ``given_name`` claim, or "" when absent.
    first_name: str
    # ``family_name`` claim, or "" when absent.
    last_name: str
    # ``picture`` claim, or "" when absent.
    avatar_url: str
def _cache_key(prefix: str, token: str) -> str:
    """Build the cache key ``"<prefix>:<token>"``."""
    return "{}:{}".format(prefix, token)
def _create_token() -> str:
    """Return a fresh URL-safe random token built from 32 random bytes."""
    entropy_bytes = 32
    return secrets.token_urlsafe(entropy_bytes)
def _required_setting(name: str) -> str:
    """Return the Django setting ``name``.

    Raises:
        GoogleOAuthFlowError: with status 500 when the setting is missing
            or falsy.
    """
    configured = getattr(settings, name, "")
    if configured:
        return configured
    raise GoogleOAuthFlowError(f"{name} is not configured.", status_code=500)
def _public_flow_payload(flow_payload: dict[str, Any]) -> dict[str, Any]:
    """Return the client-facing view of a cached flow payload.

    ``status`` is always present (``None`` when the payload has none); the
    other whitelisted keys are copied only when they exist in
    ``flow_payload``. Keys outside the whitelist are dropped.
    """
    exposed_keys = (
        "email",
        "first_name",
        "last_name",
        "avatar_url",
        "resolution",
        "mobile",
        "mobile_hint",
        "detail",
        "access_token",
        "refresh_token",
    )
    public = {"status": flow_payload.get("status")}
    public.update({key: flow_payload[key] for key in exposed_keys if key in flow_payload})
    return public
def _profile_payload(profile: GoogleProfile) -> dict[str, Any]:
    """Convert ``profile`` into a plain dict suitable for the flow cache."""
    serialized = asdict(profile)
    return serialized
def _profile_from_payload(payload: dict[str, Any]) -> GoogleProfile:
    """Rebuild the ``GoogleProfile`` stored under ``"google_profile"``.

    Raises:
        GoogleOAuthFlowError: (``google_flow_invalid_state``, 400) when the
            stored value is not a dict.
    """
    stored_profile = payload.get("google_profile")
    if isinstance(stored_profile, dict):
        return GoogleProfile(**stored_profile)
    raise GoogleOAuthFlowError(
        "Google flow profile data is missing.",
        code="google_flow_invalid_state",
        status_code=400,
    )
def create_google_state() -> str:
    """Create a random OAuth ``state`` token and remember it in the cache.

    The entry lives for ``GOOGLE_STATE_TTL_SECONDS``.
    """
    token = _create_token()
    cache.set(
        _cache_key(GOOGLE_STATE_CACHE_PREFIX, token),
        {"valid": True},
        GOOGLE_STATE_TTL_SECONDS,
    )
    return token
def consume_google_state(state: str) -> None:
    """Validate a ``state`` token and remove it from the cache.

    The cache entry is deleted whether or not it was found.

    Raises:
        GoogleOAuthFlowError: (``google_flow_invalid_state``, 400) when no
            truthy entry is cached for ``state``.
    """
    cache_key = _cache_key(GOOGLE_STATE_CACHE_PREFIX, state or "")
    stored = cache.get(cache_key)
    cache.delete(cache_key)
    if stored:
        return
    raise GoogleOAuthFlowError(
        "Google sign-in state is invalid or expired.",
        code="google_flow_invalid_state",
        status_code=400,
    )
def create_google_flow(payload: dict[str, Any]) -> str:
    """Cache ``payload`` under a new random flow token and return the token.

    The entry lives for ``GOOGLE_FLOW_TTL_SECONDS``.
    """
    token = _create_token()
    cache.set(
        _cache_key(GOOGLE_FLOW_CACHE_PREFIX, token),
        payload,
        GOOGLE_FLOW_TTL_SECONDS,
    )
    return token
def get_google_flow_payload(flow: str) -> dict[str, Any]:
    """Return the full cached payload for ``flow``.

    Raises:
        GoogleOAuthFlowError: (``google_flow_invalid_state``, 400) when the
            cache holds nothing truthy for this flow token.
    """
    stored = cache.get(_cache_key(GOOGLE_FLOW_CACHE_PREFIX, flow))
    if stored:
        return stored
    raise GoogleOAuthFlowError(
        "Google sign-in flow is invalid or expired.",
        code="google_flow_invalid_state",
        status_code=400,
    )
def get_google_flow(flow: str) -> dict[str, Any]:
    """Return the client-facing view of the cached payload for ``flow``."""
    stored = get_google_flow_payload(flow)
    return _public_flow_payload(stored)
def update_google_flow(flow: str, payload: dict[str, Any]) -> None:
    """Overwrite the cached payload for ``flow``.

    The entry is written with a full ``GOOGLE_FLOW_TTL_SECONDS`` timeout.
    """
    cache_key = _cache_key(GOOGLE_FLOW_CACHE_PREFIX, flow)
    cache.set(cache_key, payload, GOOGLE_FLOW_TTL_SECONDS)
def build_google_authorization_url() -> str:
    """Return the Google consent-screen URL for starting a sign-in.

    A new ``state`` token is created and cached before the settings are
    read, and is embedded in the query string.

    Raises:
        GoogleOAuthFlowError: status 500 when the client id or redirect URI
            setting is not configured.
    """
    state = create_google_state()
    query = urlencode(
        {
            "client_id": _required_setting("GOOGLE_OAUTH_CLIENT_ID"),
            "redirect_uri": _required_setting("GOOGLE_OAUTH_REDIRECT_URI"),
            "response_type": "code",
            "scope": "openid email profile",
            "state": state,
            "access_type": "online",
            "prompt": "select_account",
        }
    )
    return "{}?{}".format(GOOGLE_AUTH_URL, query)
def exchange_code_for_google_profile(code: str) -> GoogleProfile:
    """Exchange an authorization ``code`` for the user's Google profile.

    Posts the code to ``GOOGLE_TOKEN_URL``, then calls
    ``GOOGLE_USERINFO_URL`` with the returned access token.

    Returns:
        A ``GoogleProfile`` whose ``email_verified`` is always ``True``.

    Raises:
        GoogleOAuthFlowError: status 500 when a required OAuth setting is
            missing; status 400 when ``code`` is empty, when either HTTP
            call fails or does not return a JSON object, when no access
            token is returned, or when the account lacks a ``sub`` or a
            verified email.
    """
    if not code:
        raise GoogleOAuthFlowError("Missing Google authorization code.", status_code=400)

    # Resolve settings outside the ``try`` so that a configuration error can
    # never be mistaken for a transport/decoding failure below.
    client_id = _required_setting("GOOGLE_OAUTH_CLIENT_ID")
    client_secret = _required_setting("GOOGLE_OAUTH_CLIENT_SECRET")
    redirect_uri = _required_setting("GOOGLE_OAUTH_REDIRECT_URI")

    try:
        token_response = requests.post(
            GOOGLE_TOKEN_URL,
            data={
                "code": code,
                "client_id": client_id,
                "client_secret": client_secret,
                "redirect_uri": redirect_uri,
                "grant_type": "authorization_code",
            },
            timeout=10,
        )
        token_response.raise_for_status()
        token_payload = token_response.json()
    except (requests.RequestException, ValueError) as exc:
        # ValueError covers a non-JSON body on ``requests`` versions whose
        # decode error is not a ``RequestException`` subclass.
        raise GoogleOAuthFlowError("Google token exchange failed.", status_code=400) from exc
    if not isinstance(token_payload, dict):
        raise GoogleOAuthFlowError("Google token exchange failed.", status_code=400)

    access_token = token_payload.get("access_token")
    if not access_token:
        raise GoogleOAuthFlowError("Google did not return an access token.", status_code=400)

    try:
        userinfo_response = requests.get(
            GOOGLE_USERINFO_URL,
            headers={"Authorization": f"Bearer {access_token}"},
            timeout=10,
        )
        userinfo_response.raise_for_status()
        userinfo = userinfo_response.json()
    except (requests.RequestException, ValueError) as exc:
        raise GoogleOAuthFlowError("Google user profile lookup failed.", status_code=400) from exc
    if not isinstance(userinfo, dict):
        raise GoogleOAuthFlowError("Google user profile lookup failed.", status_code=400)

    subject = userinfo.get("sub")
    email = normalize_email_identity(userinfo.get("email"))

    # Interpret the claim strictly: a plain truthiness test would treat the
    # string "false" as verified. Only boolean True or the string "true"
    # (any case) count as verified.
    raw_verified = userinfo.get("email_verified")
    email_verified = raw_verified is True or (
        isinstance(raw_verified, str) and raw_verified.strip().lower() == "true"
    )

    if not subject or not email or not email_verified:
        raise GoogleOAuthFlowError(
            "Google account must have a verified email address.",
            status_code=400,
        )

    return GoogleProfile(
        provider_user_id=subject,
        email=email,
        email_verified=email_verified,
        first_name=userinfo.get("given_name", "") or "",
        last_name=userinfo.get("family_name", "") or "",
        avatar_url=userinfo.get("picture", "") or "",
    )
def build_google_callback_redirect_url(flow: str) -> str:
    """Return the frontend callback URL with ``flow=<flow>`` appended.

    ``&`` is used as the separator when the configured URL already contains
    a ``?``; otherwise ``?`` is used.
    """
    callback_url = _required_setting("GOOGLE_OAUTH_FRONTEND_CALLBACK_URL")
    separator = "?" if "?" not in callback_url else "&"
    return "{}{}flow={}".format(callback_url, separator, flow)
def find_social_account_for_profile(profile: GoogleProfile) -> UserSocialAccount | None:
    """Return the Google social account matching ``profile``, if any.

    The lookup is by provider (Google) and ``provider_user_id``; the related
    user is fetched in the same query via ``select_related``.
    """
    matches = UserSocialAccount.objects.select_related("user").filter(
        provider=UserSocialAccount.ProviderType.GOOGLE,
        provider_user_id=profile.provider_user_id,
    )
    return matches.first()
def _avatar_file_extension(profile: GoogleProfile) -> str:
    """Pick a file extension for the avatar based on its URL path.

    Returns the lower-cased text after the last ``.`` in the URL path when
    it is one of jpg/jpeg/png/webp/gif, and ``"jpg"`` in every other case.
    """
    recognised = {"jpg", "jpeg", "png", "webp", "gif"}
    url_path = urlparse(profile.avatar_url or "").path
    _, dot, tail = url_path.rpartition(".")
    candidate = tail.lower()
    if dot and candidate in recognised:
        return candidate
    return "jpg"
def sync_user_from_google_profile(user: User, profile: GoogleProfile) -> None:
    """Fill empty fields on ``user`` from the Google ``profile``.

    Only fields that are currently falsy on ``user`` are touched: first
    name, last name, email (which also sets ``is_email_verified``) and
    profile picture. The user is saved once, with ``update_fields``
    restricted to what changed; nothing is saved when nothing changed.
    """
    changed: list[str] = []

    for attr in ("first_name", "last_name"):
        if not getattr(user, attr) and getattr(profile, attr):
            setattr(user, attr, getattr(profile, attr))
            changed.append(attr)

    if not user.email and profile.email:
        user.email = profile.email
        user.is_email_verified = True
        changed += ["email", "is_email_verified"]

    if not user.profile_picture and profile.avatar_url:
        image_bytes = None
        try:
            response = requests.get(profile.avatar_url, timeout=10)
            response.raise_for_status()
        except requests.RequestException:
            # The avatar is optional: a failed download leaves the picture
            # unset and the rest of the sync proceeds.
            pass
        else:
            image_bytes = response and response.content
        if image_bytes:
            extension = _avatar_file_extension(profile)
            filename = f"google-{profile.provider_user_id}.{extension}"
            # save=False: the model row is written by the single save() below.
            user.profile_picture.save(filename, ContentFile(image_bytes), save=False)
            changed.append("profile_picture")

    if changed:
        user.save(update_fields=changed)
def build_authenticated_flow_payload(user: User) -> dict[str, Any]:
    """Return the ``authenticated`` flow payload carrying tokens for ``user``."""
    issued = get_tokens_for_user(user)
    payload: dict[str, Any] = {"status": "authenticated"}
    for token_name in ("access_token", "refresh_token"):
        payload[token_name] = issued[token_name]
    return payload
def _is_google_claim_candidate(user: User | None) -> bool:
    """Tell whether ``user`` has a real email that Google sign-in may claim.

    False for ``None``, for a placeholder email, and for an email that
    normalizes to a falsy value.
    """
    if user is None or is_placeholder_email(user.email):
        return False
    return bool(normalize_email_identity(user.email))
def build_pending_google_flow_payload(profile: GoogleProfile) -> dict[str, Any]:
    """Build the initial ``collect_profile`` flow payload for ``profile``.

    Looks up an existing user by the profile's email (skipped for
    placeholder emails). ``resolution`` is ``"existing_email_claim"`` when
    that user qualifies as a claim candidate and ``"new_account"``
    otherwise. When a user was found, its id, masked mobile and
    verified-mobile flag are recorded in the payload.
    """
    matched_user = None
    if not is_placeholder_email(profile.email):
        matched_user = User.objects.filter(email=profile.email).first()

    if _is_google_claim_candidate(matched_user):
        resolution = "existing_email_claim"
    else:
        resolution = "new_account"

    if matched_user:
        mobile_hint = mask_mobile(matched_user.mobile)
        has_verified_mobile = bool(matched_user.mobile and matched_user.is_mobile_verified)
        target_user_id = matched_user.id
    else:
        mobile_hint = None
        has_verified_mobile = False
        target_user_id = None

    return {
        "status": "collect_profile",
        "google_profile": _profile_payload(profile),
        "email": profile.email,
        "first_name": profile.first_name,
        "last_name": profile.last_name,
        "avatar_url": profile.avatar_url,
        "resolution": resolution,
        "target_user_id": target_user_id,
        "mobile_hint": mobile_hint,
        "has_verified_mobile": has_verified_mobile,
    }
def _resolve_major(code: str | None) -> Major | None:
    """Return the non-deleted ``Major`` with this code, or ``None``.

    A falsy ``code`` short-circuits to ``None`` without querying.
    """
    if code:
        return Major.objects.filter(code=code, is_deleted=False).first()
    return None
def _resolve_university(code: str | None) -> University | None:
    """Return the non-deleted ``University`` with this code, or ``None``.

    A falsy ``code`` short-circuits to ``None`` without querying.
    """
    if code:
        return University.objects.filter(code=code, is_deleted=False).first()
    return None
def complete_google_signup(
    *,
    flow: str,
    mobile: str,
    username: str | None = None,
    student_id: str | None = None,
    year_of_study: int | None = None,
    major: str | None = None,
    university: str | None = None,
    first_name: str | None = None,
    last_name: str | None = None,
) -> dict[str, Any]:
    """Move a ``collect_profile`` flow to the ``claim_required`` (OTP) step.

    Validates the submitted data, sends a ``google_claim`` OTP to the
    normalized mobile number, stores the claim payload in the flow cache and
    returns its client-facing view.

    For an ``existing_email_claim`` flow only the mobile number is used.
    For a new account, ``username`` is required and the optional student
    id, major and university are validated as well.

    Raises:
        GoogleOAuthFlowError: when the flow is missing or in the wrong
            state, the mobile number is invalid, the target account is
            missing, the mobile belongs to an account with a different
            email, or any new-account field fails validation.
    """

    def _ensure_mobile_not_tied_to_other_email(owner: User | None) -> None:
        # An owner with no email, or with the Google email, is not a conflict.
        if owner and normalize_email_identity(owner.email) not in (None, profile.email):
            raise GoogleOAuthFlowError(
                "این شماره موبایل قبلاً به حساب دیگری متصل شده است.",
                code="google_mobile_belongs_to_other_email",
                status_code=409,
            )

    def _store_and_publish(claim: dict[str, Any]) -> dict[str, Any]:
        update_google_flow(flow, claim)
        return _public_flow_payload(claim)

    stored = get_google_flow_payload(flow)
    if stored.get("status") != "collect_profile":
        raise GoogleOAuthFlowError(
            "Google sign-in flow is in an unexpected state.",
            code="google_flow_invalid_state",
            status_code=400,
        )

    normalized_mobile = normalize_mobile_number(mobile)
    if not is_valid_mobile_number(normalized_mobile):
        raise GoogleOAuthFlowError("شماره موبایل معتبر نیست.", status_code=400)

    profile = _profile_from_payload(stored)
    resolution = stored.get("resolution", "new_account")
    target_user_id = stored.get("target_user_id")
    target_user = None
    if target_user_id:
        target_user = User.objects.filter(pk=target_user_id).first()

    # --- Claiming an existing account that already has this email ---------
    if resolution == "existing_email_claim":
        if target_user is None:
            raise GoogleOAuthFlowError(
                "Target account could not be found.",
                code="google_flow_invalid_state",
                status_code=400,
            )
        other_owner = User.objects.filter(mobile=normalized_mobile).exclude(pk=target_user.pk).first()
        _ensure_mobile_not_tied_to_other_email(other_owner)

        generate_and_send_otp(normalized_mobile, "google_claim")
        return _store_and_publish(
            {
                "status": "claim_required",
                "google_profile": _profile_payload(profile),
                "resolution": resolution,
                "target_user_id": target_user.id,
                "mobile": normalized_mobile,
                "email": profile.email,
                "mobile_hint": mask_mobile(normalized_mobile),
                "detail": "مالکیت شماره موبایل را تایید کنید تا ورود با گوگل تکمیل شود.",
            }
        )

    # --- Creating a brand-new account -------------------------------------
    if not username:
        raise GoogleOAuthFlowError("نام کاربری الزامی است.", code="google_missing_username", status_code=400)
    if User.objects.filter(username=username).exists():
        raise GoogleOAuthFlowError("نام کاربری قبلاً استفاده شده است.", code="google_username_taken", status_code=400)
    if student_id and len(str(student_id)) < 10:
        raise GoogleOAuthFlowError("شماره دانشجویی باید حداقل ۱۰ رقم باشد.", code="google_invalid_student_id", status_code=400)

    major_obj = _resolve_major(major)
    if major and not major_obj:
        raise GoogleOAuthFlowError("رشته انتخابی معتبر نیست.", code="google_invalid_major", status_code=400)

    university_obj = _resolve_university(university)
    if university and not university_obj:
        raise GoogleOAuthFlowError("دانشگاه انتخابی معتبر نیست.", code="google_invalid_university", status_code=400)

    if student_id and university_obj:
        if User.objects.filter(university=university_obj, student_id=student_id).exists():
            raise GoogleOAuthFlowError(
                "این شماره دانشجویی در دانشگاه انتخابی قبلاً ثبت شده است.",
                code="google_student_id_taken",
                status_code=400,
            )

    _ensure_mobile_not_tied_to_other_email(User.objects.filter(mobile=normalized_mobile).first())

    generate_and_send_otp(normalized_mobile, "google_claim")
    return _store_and_publish(
        {
            "status": "claim_required",
            "google_profile": _profile_payload(profile),
            "resolution": "new_account",
            "mobile": normalized_mobile,
            "email": profile.email,
            "username": username,
            "student_id": student_id,
            "year_of_study": year_of_study,
            "major": major,
            "university": university,
            "first_name": first_name or profile.first_name,
            "last_name": last_name or profile.last_name,
            "detail": "کد تایید موبایل را وارد کنید تا حساب جدید شما ساخته شود.",
        }
    )
def send_google_claim_otp(flow: str) -> dict[str, str]:
    """Send another ``google_claim`` OTP to the mobile stored in the flow.

    Raises:
        GoogleOAuthFlowError: status 400 when the flow is missing, is not
            in the ``claim_required`` state, or holds no mobile number.
    """
    stored = get_google_flow_payload(flow)
    if stored.get("status") != "claim_required":
        raise GoogleOAuthFlowError(
            "Google sign-in flow is in an unexpected state.",
            code="google_flow_invalid_state",
            status_code=400,
        )

    claim_mobile = stored.get("mobile")
    if not (isinstance(claim_mobile, str) and claim_mobile):
        raise GoogleOAuthFlowError("Claim mobile number is missing.", status_code=400)

    generate_and_send_otp(claim_mobile, "google_claim")
    return {"message": "کد تایید مجدداً ارسال شد."}
def _link_google_account(*, user: User, profile: GoogleProfile) -> None:
    """Attach the Google identity in ``profile`` to ``user``.

    Syncs empty user fields from the profile, then either refreshes the
    existing social-account row (re-activating it) or creates a new one.

    Raises:
        GoogleOAuthFlowError: (``google_already_linked``, 409) when the
            Google identity is already linked to a different user.
    """
    existing_link = find_social_account_for_profile(profile)
    if existing_link and existing_link.user_id != user.id:
        raise GoogleOAuthFlowError(
            "این حساب گوگل قبلاً به کاربر دیگری متصل شده است.",
            code="google_already_linked",
            status_code=409,
        )

    sync_user_from_google_profile(user, profile)

    if not existing_link:
        UserSocialAccount.objects.create(
            user=user,
            provider=UserSocialAccount.ProviderType.GOOGLE,
            provider_user_id=profile.provider_user_id,
            email=profile.email,
            email_verified=profile.email_verified,
            avatar_url=profile.avatar_url,
            is_active=True,
        )
        return

    refreshed = {
        "email": profile.email,
        "email_verified": profile.email_verified,
        "avatar_url": profile.avatar_url,
        "is_active": True,
    }
    for field_name, value in refreshed.items():
        setattr(existing_link, field_name, value)
    existing_link.save(update_fields=list(refreshed))
def verify_google_claim(flow: str, code: str) -> dict[str, Any]:
    """Verify the OTP for a ``claim_required`` flow and sign the user in.

    After the OTP check succeeds, one of two things happens depending on the
    flow's ``resolution``:

    * ``existing_email_claim`` -- the target user gets the verified mobile,
      is marked email-verified and is linked to the Google account;
    * otherwise -- a new user with an unusable password is created from the
      data stored in the flow and linked to the Google account.

    In both cases the database writes run inside one transaction, so a
    failure while linking does not leave a half-updated or orphaned user.
    The flow is then replaced by the ``authenticated`` payload.

    Returns:
        The client-facing ``authenticated`` payload (status and tokens).

    Raises:
        GoogleOAuthFlowError: when the flow is missing or in the wrong
            state, the mobile is missing, the target account no longer
            matches the Google email, the email is already taken, the
            username is missing, or the Google account is linked elsewhere.
        Whatever ``verify_otp_code`` raises for a bad code is propagated.
    """
    # Function-scope import: keeps the fix self-contained in this block.
    from django.db import transaction

    flow_payload = get_google_flow_payload(flow)
    if flow_payload.get("status") != "claim_required":
        raise GoogleOAuthFlowError(
            "Google sign-in flow is in an unexpected state.",
            code="google_flow_invalid_state",
            status_code=400,
        )

    mobile = flow_payload.get("mobile")
    if not isinstance(mobile, str) or not mobile:
        raise GoogleOAuthFlowError("Claim mobile number is missing.", status_code=400)

    verify_otp_code(mobile=mobile, code=code, mode="google_claim")

    profile = _profile_from_payload(flow_payload)
    resolution = flow_payload.get("resolution", "new_account")

    if resolution == "existing_email_claim":
        target_user_id = flow_payload.get("target_user_id")
        user = User.objects.filter(pk=target_user_id).first()
        # Re-check the email: the account may have changed since the flow began.
        if user is None or normalize_email_identity(user.email) != profile.email:
            raise GoogleOAuthFlowError(
                "The matching account could not be verified.",
                code="google_email_claim_failed",
                status_code=409,
            )

        # Atomic: if linking fails (e.g. google_already_linked) the mobile
        # and verification flags saved just before are rolled back.
        with transaction.atomic():
            user.mobile = mobile
            user.is_mobile_verified = True
            user.is_email_verified = True
            user.save(update_fields=["mobile", "is_mobile_verified", "is_email_verified"])
            _link_google_account(user=user, profile=profile)
    else:
        normalized_email = normalize_email_identity(profile.email)
        if normalized_email and User.objects.filter(email=normalized_email).exists():
            raise GoogleOAuthFlowError(
                "این ایمیل قبلاً به حساب دیگری متصل شده است.",
                code="google_email_taken",
                status_code=409,
            )

        payload = RegistrationPayload(
            username=flow_payload.get("username", ""),
            mobile=mobile,
            password=_create_token(),
            code=code,
            email=normalized_email,
            first_name=flow_payload.get("first_name") or profile.first_name,
            last_name=flow_payload.get("last_name") or profile.last_name,
            university=flow_payload.get("university"),
            student_id=flow_payload.get("student_id"),
            year_of_study=flow_payload.get("year_of_study"),
            major=flow_payload.get("major"),
        )
        if not payload.username:
            raise GoogleOAuthFlowError("نام کاربری برای ساخت حساب جدید الزامی است.", status_code=400)

        major_obj = _resolve_major(payload.major)
        university_obj = _resolve_university(payload.university)

        # Atomic: without it, a failure while linking would leave a new user
        # that owns the username/mobile/email but has no Google link, and a
        # retry would then be rejected with google_email_taken.
        with transaction.atomic():
            user = User.objects.create_user(
                username=payload.username,
                email=payload.email,
                mobile=payload.mobile,
                password=None,
                first_name=payload.first_name or "",
                last_name=payload.last_name or "",
                student_id=payload.student_id,
                year_of_study=payload.year_of_study,
                major=major_obj,
                university=university_obj,
                is_mobile_verified=True,
                is_email_verified=bool(payload.email),
            )
            # Google-only accounts must not be usable with a password login.
            user.set_unusable_password()
            user.save(update_fields=["password"])
            _link_google_account(user=user, profile=profile)

    # Tokens are issued and the flow updated only after the writes committed.
    authenticated = build_authenticated_flow_payload(user)
    update_google_flow(flow, authenticated)
    return _public_flow_payload(authenticated)