560 lines
20 KiB
Python
560 lines
20 KiB
Python
from __future__ import annotations
|
||
|
||
import secrets
|
||
from dataclasses import asdict, dataclass
|
||
from typing import Any
|
||
from urllib.parse import urlencode
|
||
from urllib.parse import urlparse
|
||
|
||
import requests
|
||
from django.conf import settings
|
||
from django.core.cache import cache
|
||
from django.core.files.base import ContentFile
|
||
|
||
from apps.users.email_identity import (
|
||
is_placeholder_email,
|
||
is_valid_mobile_number,
|
||
mask_mobile,
|
||
normalize_email_identity,
|
||
normalize_mobile_number,
|
||
)
|
||
from apps.users.models import Major, University, User, UserSocialAccount
|
||
from apps.users.services.auth import (
|
||
AuthServiceError,
|
||
RegistrationPayload,
|
||
generate_and_send_otp,
|
||
get_tokens_for_user,
|
||
verify_otp_code,
|
||
)
|
||
|
||
# Google endpoints used by this module: the consent screen the browser is sent
# to, the endpoint the authorization code is posted to, and the endpoint the
# user profile is read from.
GOOGLE_AUTH_URL = "https://accounts.google.com/o/oauth2/v2/auth"
GOOGLE_TOKEN_URL = "https://oauth2.googleapis.com/token"
GOOGLE_USERINFO_URL = "https://openidconnect.googleapis.com/v1/userinfo"

# Cache timeouts (seconds) for the one-time ``state`` token and for the
# multi-step flow payload respectively.
GOOGLE_STATE_TTL_SECONDS = 5 * 60
GOOGLE_FLOW_TTL_SECONDS = 15 * 60

# Cache key namespaces; the full key is built as "<prefix>:<token>".
GOOGLE_STATE_CACHE_PREFIX = "google_oauth_state"
GOOGLE_FLOW_CACHE_PREFIX = "google_oauth_flow"
|
||
|
||
|
||
class GoogleOAuthFlowError(AuthServiceError):
    """Failure raised by any step of the Google sign-in flow.

    The base class is always initialised with ``field="detail"``. On top of
    that, each instance exposes:

    * ``code``  -- machine-readable error identifier.
    * ``extra`` -- additional context; always a dict (empty when not supplied).
    """

    def __init__(
        self,
        message: str,
        *,
        code: str = "google_flow_error",
        status_code: int = 409,
        extra: dict[str, Any] | None = None,
    ):
        super().__init__(message, field="detail", status_code=status_code)
        # A falsy ``extra`` (None or an empty mapping) becomes a fresh dict.
        self.extra = extra if extra else {}
        self.code = code
|
||
|
||
|
||
@dataclass
class GoogleProfile:
    """Subset of the Google user profile carried through the sign-in flow."""

    # Value of the ``sub`` claim returned by the userinfo endpoint.
    provider_user_id: str
    # Email address after ``normalize_email_identity``.
    email: str
    email_verified: bool
    # ``given_name`` / ``family_name`` / ``picture`` claims; empty string when absent.
    first_name: str
    last_name: str
    avatar_url: str
|
||
|
||
|
||
def _cache_key(prefix: str, token: str) -> str:
|
||
return f"{prefix}:{token}"
|
||
|
||
|
||
def _create_token() -> str:
|
||
return secrets.token_urlsafe(32)
|
||
|
||
|
||
def _required_setting(name: str) -> str:
    """Return the Django setting ``name``.

    Raises:
        GoogleOAuthFlowError: with status 500 when the setting is absent or
            falsy.
    """
    configured = getattr(settings, name, "")
    if configured:
        return configured
    raise GoogleOAuthFlowError(f"{name} is not configured.", status_code=500)
|
||
|
||
|
||
def _public_flow_payload(flow_payload: dict[str, Any]) -> dict[str, Any]:
|
||
status = flow_payload.get("status")
|
||
base = {"status": status}
|
||
for key in (
|
||
"email",
|
||
"first_name",
|
||
"last_name",
|
||
"avatar_url",
|
||
"resolution",
|
||
"mobile",
|
||
"mobile_hint",
|
||
"detail",
|
||
"access_token",
|
||
"refresh_token",
|
||
):
|
||
if key in flow_payload:
|
||
base[key] = flow_payload[key]
|
||
return base
|
||
|
||
|
||
def _profile_payload(profile: GoogleProfile) -> dict[str, Any]:
|
||
return asdict(profile)
|
||
|
||
|
||
def _profile_from_payload(payload: dict[str, Any]) -> GoogleProfile:
    """Rebuild the ``GoogleProfile`` stored under ``payload["google_profile"]``.

    Raises:
        GoogleOAuthFlowError: with code ``google_flow_invalid_state`` (status
            400) when the entry is missing, is not a dict, or its keys do not
            match the ``GoogleProfile`` fields.
    """
    raw = payload.get("google_profile")
    if not isinstance(raw, dict):
        raise GoogleOAuthFlowError(
            "Google flow profile data is missing.",
            code="google_flow_invalid_state",
            status_code=400,
        )
    try:
        return GoogleProfile(**raw)
    except TypeError as exc:
        # Missing or unexpected keys make the dataclass constructor raise
        # TypeError; report that as an invalid flow rather than letting it
        # escape as an unhandled error.
        raise GoogleOAuthFlowError(
            "Google flow profile data is invalid.",
            code="google_flow_invalid_state",
            status_code=400,
        ) from exc
|
||
|
||
|
||
def create_google_state() -> str:
    """Create a new OAuth ``state`` token, record it in the cache, and return it.

    The cache entry uses ``GOOGLE_STATE_TTL_SECONDS`` as its timeout.
    """
    token = _create_token()
    cache_key = _cache_key(GOOGLE_STATE_CACHE_PREFIX, token)
    cache.set(cache_key, {"valid": True}, GOOGLE_STATE_TTL_SECONDS)
    return token
|
||
|
||
|
||
def consume_google_state(state: str) -> None:
    """Check that ``state`` was issued by ``create_google_state`` and invalidate it.

    Raises:
        GoogleOAuthFlowError: with code ``google_flow_invalid_state`` (status
            400) when no cache entry exists for ``state``.
    """
    cache_key = _cache_key(GOOGLE_STATE_CACHE_PREFIX, state or "")
    stored = cache.get(cache_key)
    # Deleted whether or not it was found, so a state value cannot be reused.
    cache.delete(cache_key)
    if stored:
        return
    raise GoogleOAuthFlowError(
        "Google sign-in state is invalid or expired.",
        code="google_flow_invalid_state",
        status_code=400,
    )
|
||
|
||
|
||
def create_google_flow(payload: dict[str, Any]) -> str:
    """Store ``payload`` under a new flow token and return that token.

    The cache entry uses ``GOOGLE_FLOW_TTL_SECONDS`` as its timeout.
    """
    token = _create_token()
    cache_key = _cache_key(GOOGLE_FLOW_CACHE_PREFIX, token)
    cache.set(cache_key, payload, GOOGLE_FLOW_TTL_SECONDS)
    return token
|
||
|
||
|
||
def get_google_flow_payload(flow: str) -> dict[str, Any]:
    """Return the full (internal) payload cached for ``flow``.

    Raises:
        GoogleOAuthFlowError: with code ``google_flow_invalid_state`` (status
            400) when nothing (or a falsy value) is cached for ``flow``.
    """
    stored = cache.get(_cache_key(GOOGLE_FLOW_CACHE_PREFIX, flow))
    if stored:
        return stored
    raise GoogleOAuthFlowError(
        "Google sign-in flow is invalid or expired.",
        code="google_flow_invalid_state",
        status_code=400,
    )
|
||
|
||
|
||
def get_google_flow(flow: str) -> dict[str, Any]:
    """Return the allow-listed view of the payload cached for ``flow``.

    Raises whatever ``get_google_flow_payload`` raises for an unknown flow.
    """
    internal_payload = get_google_flow_payload(flow)
    return _public_flow_payload(internal_payload)
|
||
|
||
|
||
def update_google_flow(flow: str, payload: dict[str, Any]) -> None:
    """Replace the payload cached for ``flow``.

    The entry is written with a fresh ``GOOGLE_FLOW_TTL_SECONDS`` timeout.
    """
    cache_key = _cache_key(GOOGLE_FLOW_CACHE_PREFIX, flow)
    cache.set(cache_key, payload, GOOGLE_FLOW_TTL_SECONDS)
|
||
|
||
|
||
def build_google_authorization_url() -> str:
    """Return the Google consent-screen URL for starting a sign-in.

    A new ``state`` token is created and cached as part of the call.

    Raises:
        GoogleOAuthFlowError: with status 500 when the client id or redirect
            URI setting is not configured.
    """
    # The state token is issued first, before the settings are read.
    state = create_google_state()
    client_id = _required_setting("GOOGLE_OAUTH_CLIENT_ID")
    redirect_uri = _required_setting("GOOGLE_OAUTH_REDIRECT_URI")
    query = urlencode(
        [
            ("client_id", client_id),
            ("redirect_uri", redirect_uri),
            ("response_type", "code"),
            ("scope", "openid email profile"),
            ("state", state),
            ("access_type", "online"),
            ("prompt", "select_account"),
        ]
    )
    return GOOGLE_AUTH_URL + "?" + query
|
||
|
||
|
||
def exchange_code_for_google_profile(code: str) -> GoogleProfile:
    """Exchange a Google authorization code for the signed-in user's profile.

    The code is posted to ``GOOGLE_TOKEN_URL``; the returned access token is
    then used to read the profile from ``GOOGLE_USERINFO_URL``.

    Raises:
        GoogleOAuthFlowError: with status 400 when ``code`` is empty, when
            either HTTP call fails or does not return a JSON object, when no
            access token is returned, or when the profile lacks ``sub`` or a
            verified email; with status 500 when a required setting is not
            configured.
    """
    if not code:
        raise GoogleOAuthFlowError("Missing Google authorization code.", status_code=400)

    # Settings are read outside the ``try`` so a configuration error can never
    # be caught and re-labelled as a token-exchange failure.
    token_request = {
        "code": code,
        "client_id": _required_setting("GOOGLE_OAUTH_CLIENT_ID"),
        "client_secret": _required_setting("GOOGLE_OAUTH_CLIENT_SECRET"),
        "redirect_uri": _required_setting("GOOGLE_OAUTH_REDIRECT_URI"),
        "grant_type": "authorization_code",
    }

    try:
        token_response = requests.post(GOOGLE_TOKEN_URL, data=token_request, timeout=10)
        token_response.raise_for_status()
        token_payload = token_response.json()
    except (requests.RequestException, ValueError) as exc:
        # ValueError covers JSON decoding errors that are not RequestException
        # subclasses (the case on older requests releases).
        raise GoogleOAuthFlowError("Google token exchange failed.", status_code=400) from exc

    # A JSON body that is not an object (list, null, ...) has no ``.get``.
    access_token = token_payload.get("access_token") if isinstance(token_payload, dict) else None
    if not access_token:
        raise GoogleOAuthFlowError("Google did not return an access token.", status_code=400)

    try:
        userinfo_response = requests.get(
            GOOGLE_USERINFO_URL,
            headers={"Authorization": f"Bearer {access_token}"},
            timeout=10,
        )
        userinfo_response.raise_for_status()
        userinfo = userinfo_response.json()
    except (requests.RequestException, ValueError) as exc:
        raise GoogleOAuthFlowError("Google user profile lookup failed.", status_code=400) from exc
    if not isinstance(userinfo, dict):
        raise GoogleOAuthFlowError("Google user profile lookup failed.", status_code=400)

    subject = userinfo.get("sub")
    email = normalize_email_identity(userinfo.get("email"))

    # Do not rely on plain truthiness: a string such as "false" is truthy.
    # NOTE(review): the userinfo endpoint is expected to send a JSON boolean;
    # the string branch is defensive.
    raw_verified = userinfo.get("email_verified")
    if isinstance(raw_verified, str):
        email_verified = raw_verified.strip().lower() == "true"
    else:
        email_verified = bool(raw_verified)

    if not subject or not email or not email_verified:
        raise GoogleOAuthFlowError(
            "Google account must have a verified email address.",
            status_code=400,
        )

    return GoogleProfile(
        provider_user_id=subject,
        email=email,
        email_verified=email_verified,
        first_name=userinfo.get("given_name", "") or "",
        last_name=userinfo.get("family_name", "") or "",
        avatar_url=userinfo.get("picture", "") or "",
    )
|
||
|
||
|
||
def build_google_callback_redirect_url(flow: str) -> str:
    """Return the frontend callback URL with ``flow`` appended as a query parameter.

    The parameter is joined with ``&`` when the configured callback URL
    already contains a ``?``, otherwise with ``?``.

    Raises:
        GoogleOAuthFlowError: with status 500 when
            ``GOOGLE_OAUTH_FRONTEND_CALLBACK_URL`` is not configured.
    """
    callback = _required_setting("GOOGLE_OAUTH_FRONTEND_CALLBACK_URL")
    separator = "&" if "?" in callback else "?"
    # Percent-encode the value so characters such as "&", "#" or spaces cannot
    # alter the query string. Tokens from ``_create_token`` contain only
    # URL-safe characters and are emitted unchanged.
    return f"{callback}{separator}{urlencode({'flow': flow})}"
|
||
|
||
|
||
def find_social_account_for_profile(profile: GoogleProfile) -> UserSocialAccount | None:
    """Return the Google social-account row for ``profile.provider_user_id``, if any.

    The related ``user`` is loaded in the same query. ``None`` is returned
    when no row matches.
    """
    google_accounts = UserSocialAccount.objects.select_related("user").filter(
        provider=UserSocialAccount.ProviderType.GOOGLE,
        provider_user_id=profile.provider_user_id,
    )
    return google_accounts.first()
|
||
|
||
|
||
def _avatar_file_extension(profile: GoogleProfile) -> str:
|
||
path = urlparse(profile.avatar_url or "").path
|
||
if "." in path:
|
||
suffix = path.rsplit(".", 1)[-1].lower()
|
||
if suffix in {"jpg", "jpeg", "png", "webp", "gif"}:
|
||
return suffix
|
||
return "jpg"
|
||
|
||
|
||
def sync_user_from_google_profile(user: User, profile: GoogleProfile) -> None:
    """Fill the blank fields of ``user`` from ``profile`` and save them.

    Only fields that are currently falsy on ``user`` are written: first name,
    last name, email (which also sets ``is_email_verified``) and profile
    picture. The picture is downloaded from ``profile.avatar_url``; a failed
    download is ignored. ``user.save`` is called only when something changed,
    and only with the changed fields.
    """
    changed: list[str] = []

    if profile.first_name and not user.first_name:
        user.first_name = profile.first_name
        changed.append("first_name")

    if profile.last_name and not user.last_name:
        user.last_name = profile.last_name
        changed.append("last_name")

    if profile.email and not user.email:
        user.email = profile.email
        user.is_email_verified = True
        changed += ["email", "is_email_verified"]

    if profile.avatar_url and not user.profile_picture:
        try:
            download = requests.get(profile.avatar_url, timeout=10)
            download.raise_for_status()
        except requests.RequestException:
            # Best effort: the avatar is optional, so a failed fetch is skipped.
            download = None
        if download and download.content:
            extension = _avatar_file_extension(profile)
            filename = f"google-{profile.provider_user_id}.{extension}"
            # save=False: the model row is written once, by the save() below.
            user.profile_picture.save(filename, ContentFile(download.content), save=False)
            changed.append("profile_picture")

    if changed:
        user.save(update_fields=changed)
|
||
|
||
|
||
def build_authenticated_flow_payload(user: User) -> dict[str, Any]:
    """Return the flow payload for a completed sign-in.

    The payload has status ``"authenticated"`` plus the access and refresh
    tokens obtained from ``get_tokens_for_user``.
    """
    issued = get_tokens_for_user(user)
    payload: dict[str, Any] = {"status": "authenticated"}
    payload["access_token"] = issued["access_token"]
    payload["refresh_token"] = issued["refresh_token"]
    return payload
|
||
|
||
|
||
def _is_google_claim_candidate(user: User | None) -> bool:
|
||
if user is None:
|
||
return False
|
||
if is_placeholder_email(user.email):
|
||
return False
|
||
normalized_email = normalize_email_identity(user.email)
|
||
return bool(normalized_email)
|
||
|
||
|
||
def build_pending_google_flow_payload(profile: GoogleProfile) -> dict[str, Any]:
    """Build the initial ``collect_profile`` flow payload for ``profile``.

    Unless the profile email is a placeholder, a user with that exact email is
    looked up. ``resolution`` is ``"existing_email_claim"`` when that user
    passes ``_is_google_claim_candidate`` and ``"new_account"`` otherwise. When
    a user was found, the payload also carries their id, a masked mobile hint
    and whether they have a verified mobile number.
    """
    if is_placeholder_email(profile.email):
        matched_user = None
    else:
        matched_user = User.objects.filter(email=profile.email).first()

    if _is_google_claim_candidate(matched_user):
        resolution = "existing_email_claim"
    else:
        resolution = "new_account"

    if matched_user:
        target_user_id = matched_user.id
        mobile_hint = mask_mobile(matched_user.mobile)
        has_verified_mobile = bool(matched_user.mobile and matched_user.is_mobile_verified)
    else:
        target_user_id = None
        mobile_hint = None
        has_verified_mobile = False

    return {
        "status": "collect_profile",
        "google_profile": _profile_payload(profile),
        "email": profile.email,
        "first_name": profile.first_name,
        "last_name": profile.last_name,
        "avatar_url": profile.avatar_url,
        "resolution": resolution,
        "target_user_id": target_user_id,
        "mobile_hint": mobile_hint,
        "has_verified_mobile": has_verified_mobile,
    }
|
||
|
||
|
||
def _resolve_major(code: str | None) -> Major | None:
|
||
if not code:
|
||
return None
|
||
return Major.objects.filter(code=code, is_deleted=False).first()
|
||
|
||
|
||
def _resolve_university(code: str | None) -> University | None:
|
||
if not code:
|
||
return None
|
||
return University.objects.filter(code=code, is_deleted=False).first()
|
||
|
||
|
||
def _reject_mobile_owned_by_other_email(owner: User | None, google_email: str) -> None:
    """Raise when ``owner`` (a user holding the mobile) has a different email.

    An owner whose email normalizes to ``None`` or equals ``google_email`` is
    accepted, as is the absence of an owner.
    """
    if not owner:
        return
    owner_email = normalize_email_identity(owner.email)
    if owner_email is None or owner_email == google_email:
        return
    raise GoogleOAuthFlowError(
        "این شماره موبایل قبلاً به حساب دیگری متصل شده است.",
        code="google_mobile_belongs_to_other_email",
        status_code=409,
    )


def complete_google_signup(
    *,
    flow: str,
    mobile: str,
    username: str | None = None,
    student_id: str | None = None,
    year_of_study: int | None = None,
    major: str | None = None,
    university: str | None = None,
    first_name: str | None = None,
    last_name: str | None = None,
) -> dict[str, Any]:
    """Validate the submitted profile data and move the flow to ``claim_required``.

    The flow must currently be in the ``collect_profile`` state. After
    validation an OTP is sent to the normalized mobile number, the cached flow
    payload is replaced with the claim payload, and its allow-listed view is
    returned.

    * ``existing_email_claim`` flows only need the mobile number; the target
      user recorded in the flow must still exist.
    * Other flows (new account) additionally validate username, student id,
      major and university, and carry those values forward in the payload.

    Raises:
        GoogleOAuthFlowError: when the flow is unknown or in another state, or
            when any of the validations fails (see the individual ``code``
            values below).
    """
    state = get_google_flow_payload(flow)
    if state.get("status") != "collect_profile":
        raise GoogleOAuthFlowError(
            "Google sign-in flow is in an unexpected state.",
            code="google_flow_invalid_state",
            status_code=400,
        )

    claim_mobile = normalize_mobile_number(mobile)
    if not is_valid_mobile_number(claim_mobile):
        raise GoogleOAuthFlowError("شماره موبایل معتبر نیست.", status_code=400)

    profile = _profile_from_payload(state)
    resolution = state.get("resolution", "new_account")
    target_user_id = state.get("target_user_id")
    target_user = None
    if target_user_id:
        target_user = User.objects.filter(pk=target_user_id).first()

    if resolution == "existing_email_claim":
        if target_user is None:
            raise GoogleOAuthFlowError(
                "Target account could not be found.",
                code="google_flow_invalid_state",
                status_code=400,
            )
        # Another account (not the target) may already hold this number.
        other_owner = User.objects.filter(mobile=claim_mobile).exclude(pk=target_user.pk).first()
        _reject_mobile_owned_by_other_email(other_owner, profile.email)

        generate_and_send_otp(claim_mobile, "google_claim")
        next_state = {
            "status": "claim_required",
            "google_profile": _profile_payload(profile),
            "resolution": resolution,
            "target_user_id": target_user.id,
            "mobile": claim_mobile,
            "email": profile.email,
            "mobile_hint": mask_mobile(claim_mobile),
            "detail": "مالکیت شماره موبایل را تایید کنید تا ورود با گوگل تکمیل شود.",
        }
        update_google_flow(flow, next_state)
        return _public_flow_payload(next_state)

    # --- New-account path -------------------------------------------------
    if not username:
        raise GoogleOAuthFlowError("نام کاربری الزامی است.", code="google_missing_username", status_code=400)
    if User.objects.filter(username=username).exists():
        raise GoogleOAuthFlowError("نام کاربری قبلاً استفاده شده است.", code="google_username_taken", status_code=400)
    if student_id and len(str(student_id)) < 10:
        raise GoogleOAuthFlowError("شماره دانشجویی باید حداقل ۱۰ رقم باشد.", code="google_invalid_student_id", status_code=400)

    # A code that was supplied but does not resolve is an error; an omitted
    # code is simply left unset.
    major_obj = _resolve_major(major)
    if major and not major_obj:
        raise GoogleOAuthFlowError("رشته انتخابی معتبر نیست.", code="google_invalid_major", status_code=400)
    university_obj = _resolve_university(university)
    if university and not university_obj:
        raise GoogleOAuthFlowError("دانشگاه انتخابی معتبر نیست.", code="google_invalid_university", status_code=400)

    if student_id and university_obj and User.objects.filter(university=university_obj, student_id=student_id).exists():
        raise GoogleOAuthFlowError(
            "این شماره دانشجویی در دانشگاه انتخابی قبلاً ثبت شده است.",
            code="google_student_id_taken",
            status_code=400,
        )

    _reject_mobile_owned_by_other_email(User.objects.filter(mobile=claim_mobile).first(), profile.email)

    generate_and_send_otp(claim_mobile, "google_claim")
    next_state = {
        "status": "claim_required",
        "google_profile": _profile_payload(profile),
        "resolution": "new_account",
        "mobile": claim_mobile,
        "email": profile.email,
        "username": username,
        "student_id": student_id,
        "year_of_study": year_of_study,
        "major": major,
        "university": university,
        "first_name": first_name or profile.first_name,
        "last_name": last_name or profile.last_name,
        "detail": "کد تایید موبایل را وارد کنید تا حساب جدید شما ساخته شود.",
    }
    update_google_flow(flow, next_state)
    return _public_flow_payload(next_state)
|
||
|
||
|
||
def send_google_claim_otp(flow: str) -> dict[str, str]:
    """Send another OTP to the mobile number stored in a ``claim_required`` flow.

    Raises:
        GoogleOAuthFlowError: with status 400 when the flow is unknown, is not
            in the ``claim_required`` state, or has no mobile number stored.
    """
    state = get_google_flow_payload(flow)
    if state.get("status") != "claim_required":
        raise GoogleOAuthFlowError(
            "Google sign-in flow is in an unexpected state.",
            code="google_flow_invalid_state",
            status_code=400,
        )

    claim_mobile = state.get("mobile")
    if not (isinstance(claim_mobile, str) and claim_mobile):
        raise GoogleOAuthFlowError("Claim mobile number is missing.", status_code=400)

    generate_and_send_otp(claim_mobile, "google_claim")
    return {"message": "کد تایید مجدداً ارسال شد."}
|
||
|
||
|
||
def _link_google_account(*, user: User, profile: GoogleProfile) -> None:
    """Attach the Google identity in ``profile`` to ``user``.

    Blank fields on ``user`` are first filled from the profile. An existing
    link owned by ``user`` is refreshed and re-activated; otherwise a new link
    row is created.

    Raises:
        GoogleOAuthFlowError: with code ``google_already_linked`` (status 409)
            when the Google identity is linked to a different user.
    """
    existing_link = find_social_account_for_profile(profile)
    if existing_link and existing_link.user_id != user.id:
        raise GoogleOAuthFlowError(
            "این حساب گوگل قبلاً به کاربر دیگری متصل شده است.",
            code="google_already_linked",
            status_code=409,
        )

    sync_user_from_google_profile(user, profile)

    if not existing_link:
        UserSocialAccount.objects.create(
            user=user,
            provider=UserSocialAccount.ProviderType.GOOGLE,
            provider_user_id=profile.provider_user_id,
            email=profile.email,
            email_verified=profile.email_verified,
            avatar_url=profile.avatar_url,
            is_active=True,
        )
        return

    # The link already belongs to this user: refresh the stored details.
    existing_link.email = profile.email
    existing_link.email_verified = profile.email_verified
    existing_link.avatar_url = profile.avatar_url
    existing_link.is_active = True
    existing_link.save(update_fields=["email", "email_verified", "avatar_url", "is_active"])
|
||
|
||
|
||
def _ensure_google_identity_is_linkable(profile: GoogleProfile, *, user_id: Any = None) -> None:
    """Raise if the Google identity is already linked to a user other than ``user_id``.

    With ``user_id=None`` (no account exists yet) any existing link conflicts.
    """
    social = find_social_account_for_profile(profile)
    if social and social.user_id != user_id:
        raise GoogleOAuthFlowError(
            "این حساب گوگل قبلاً به کاربر دیگری متصل شده است.",
            code="google_already_linked",
            status_code=409,
        )


def verify_google_claim(flow: str, code: str) -> dict[str, Any]:
    """Verify the mobile OTP and finish the Google sign-in for ``flow``.

    The flow must be in the ``claim_required`` state. After the OTP is
    verified:

    * ``existing_email_claim``: the target user (whose normalized email must
      equal the Google email) gets the verified mobile number and is linked to
      the Google identity.
    * otherwise: a new user is created from the data stored in the flow, given
      an unusable password, and linked to the Google identity.

    In both cases the cached flow payload is replaced by the authenticated
    payload, whose allow-listed view (status plus tokens) is returned.

    Raises:
        GoogleOAuthFlowError: when the flow is unknown or in another state, the
            stored mobile number is missing, the target account cannot be
            matched, the Google identity is already linked to another user,
            the email is already used, or no username was stored. Errors from
            ``verify_otp_code`` propagate unchanged.
    """
    flow_payload = get_google_flow_payload(flow)
    if flow_payload.get("status") != "claim_required":
        raise GoogleOAuthFlowError(
            "Google sign-in flow is in an unexpected state.",
            code="google_flow_invalid_state",
            status_code=400,
        )

    mobile = flow_payload.get("mobile")
    if not isinstance(mobile, str) or not mobile:
        raise GoogleOAuthFlowError("Claim mobile number is missing.", status_code=400)

    verify_otp_code(mobile=mobile, code=code, mode="google_claim")
    profile = _profile_from_payload(flow_payload)
    resolution = flow_payload.get("resolution", "new_account")

    if resolution == "existing_email_claim":
        target_user_id = flow_payload.get("target_user_id")
        user = User.objects.filter(pk=target_user_id).first()
        if user is None or normalize_email_identity(user.email) != profile.email:
            raise GoogleOAuthFlowError(
                "The matching account could not be verified.",
                code="google_email_claim_failed",
                status_code=409,
            )
        # Check the link conflict BEFORE touching the user row, so a rejected
        # claim does not leave the account with a changed mobile number.
        _ensure_google_identity_is_linkable(profile, user_id=user.id)

        user.mobile = mobile
        user.is_mobile_verified = True
        user.is_email_verified = True
        user.save(update_fields=["mobile", "is_mobile_verified", "is_email_verified"])
        _link_google_account(user=user, profile=profile)
        authenticated = build_authenticated_flow_payload(user)
        update_google_flow(flow, authenticated)
        return _public_flow_payload(authenticated)

    normalized_email = normalize_email_identity(profile.email)
    if normalized_email and User.objects.filter(email=normalized_email).exists():
        raise GoogleOAuthFlowError(
            "این ایمیل قبلاً به حساب دیگری متصل شده است.",
            code="google_email_taken",
            status_code=409,
        )

    payload = RegistrationPayload(
        username=flow_payload.get("username", ""),
        mobile=mobile,
        password=_create_token(),
        code=code,
        email=normalized_email,
        first_name=flow_payload.get("first_name") or profile.first_name,
        last_name=flow_payload.get("last_name") or profile.last_name,
        university=flow_payload.get("university"),
        student_id=flow_payload.get("student_id"),
        year_of_study=flow_payload.get("year_of_study"),
        major=flow_payload.get("major"),
    )

    if not payload.username:
        raise GoogleOAuthFlowError("نام کاربری برای ساخت حساب جدید الزامی است.", status_code=400)

    # Check the link conflict BEFORE creating the user; otherwise a conflict
    # raised while linking would leave a new account with no Google link.
    _ensure_google_identity_is_linkable(profile)

    major_obj = _resolve_major(payload.major)
    university_obj = _resolve_university(payload.university)
    user = User.objects.create_user(
        username=payload.username,
        email=payload.email,
        mobile=payload.mobile,
        password=None,
        first_name=payload.first_name or "",
        last_name=payload.last_name or "",
        student_id=payload.student_id,
        year_of_study=payload.year_of_study,
        major=major_obj,
        university=university_obj,
        is_mobile_verified=True,
        is_email_verified=bool(payload.email),
    )
    # The account signs in through Google only, so it gets no usable password.
    user.set_unusable_password()
    user.save(update_fields=["password"])
    _link_google_account(user=user, profile=profile)
    authenticated = build_authenticated_flow_payload(user)
    update_google_flow(flow, authenticated)
    return _public_flow_payload(authenticated)
|