85 lines
3.3 KiB
Python
85 lines
3.3 KiB
Python
from __future__ import annotations
|
|
|
|
import json
|
|
from collections import defaultdict
|
|
|
|
from django.core.management.base import BaseCommand
|
|
|
|
from apps.users.email_identity import normalize_email_identity
|
|
from apps.users.models import User, UserSocialAccount
|
|
|
|
|
|
class Command(BaseCommand):
|
|
help = "Report suspicious Google social-account links without modifying data."
|
|
|
|
def handle(self, *args, **options):
|
|
issues: list[dict] = []
|
|
google_accounts = list(
|
|
UserSocialAccount.objects.select_related("user").filter(
|
|
provider=UserSocialAccount.ProviderType.GOOGLE
|
|
)
|
|
)
|
|
|
|
social_email_groups: dict[str, set[str]] = defaultdict(set)
|
|
user_by_email = {
|
|
user.email: user
|
|
for user in User.objects.exclude(email__isnull=True).only("id", "mobile", "email")
|
|
}
|
|
|
|
for account in google_accounts:
|
|
provider_email = normalize_email_identity(account.email)
|
|
user_email = normalize_email_identity(account.user.email)
|
|
|
|
if provider_email:
|
|
social_email_groups[provider_email].add(str(account.user_id))
|
|
|
|
if user_email and provider_email and user_email != provider_email:
|
|
issues.append(
|
|
{
|
|
"type": "linked_user_email_mismatch",
|
|
"linked_user_id": str(account.user_id),
|
|
"linked_user_mobile": account.user.mobile,
|
|
"linked_user_email": user_email,
|
|
"social_account_id": str(account.id),
|
|
"provider_email": provider_email,
|
|
"provider_user_id": account.provider_user_id,
|
|
}
|
|
)
|
|
|
|
other_user = user_by_email.get(provider_email) if provider_email else None
|
|
if other_user and other_user.id != account.user_id:
|
|
issues.append(
|
|
{
|
|
"type": "provider_email_matches_other_user",
|
|
"linked_user_id": str(account.user_id),
|
|
"linked_user_mobile": account.user.mobile,
|
|
"linked_user_email": user_email,
|
|
"social_account_id": str(account.id),
|
|
"provider_email": provider_email,
|
|
"provider_user_id": account.provider_user_id,
|
|
"other_user_id": str(other_user.id),
|
|
"other_user_mobile": other_user.mobile,
|
|
"other_user_email": other_user.email,
|
|
}
|
|
)
|
|
|
|
for provider_email, user_ids in social_email_groups.items():
|
|
if len(user_ids) <= 1:
|
|
continue
|
|
issues.append(
|
|
{
|
|
"type": "duplicate_provider_email_across_users",
|
|
"provider_email": provider_email,
|
|
"user_ids": sorted(user_ids),
|
|
}
|
|
)
|
|
|
|
if not issues:
|
|
self.stdout.write(self.style.SUCCESS("No suspicious Google social links found."))
|
|
return
|
|
|
|
for issue in issues:
|
|
self.stdout.write(json.dumps(issue, ensure_ascii=True, sort_keys=True))
|
|
|
|
self.stdout.write(self.style.WARNING(f"Reported {len(issues)} suspicious Google social link issue(s)."))
|