feat(users): add google social account audit command
This commit is contained in:
1
apps/users/management/commands/__init__.py
Normal file
1
apps/users/management/commands/__init__.py
Normal file
@@ -0,0 +1 @@
|
||||
|
||||
84
apps/users/management/commands/audit_google_social_links.py
Normal file
84
apps/users/management/commands/audit_google_social_links.py
Normal file
@@ -0,0 +1,84 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
from collections import defaultdict
|
||||
|
||||
from django.core.management.base import BaseCommand
|
||||
|
||||
from apps.users.email_identity import normalize_email_identity
|
||||
from apps.users.models import User, UserSocialAccount
|
||||
|
||||
|
||||
class Command(BaseCommand):
|
||||
help = "Report suspicious Google social-account links without modifying data."
|
||||
|
||||
def handle(self, *args, **options):
|
||||
issues: list[dict] = []
|
||||
google_accounts = list(
|
||||
UserSocialAccount.objects.select_related("user").filter(
|
||||
provider=UserSocialAccount.ProviderType.GOOGLE
|
||||
)
|
||||
)
|
||||
|
||||
social_email_groups: dict[str, set[str]] = defaultdict(set)
|
||||
user_by_email = {
|
||||
user.email: user
|
||||
for user in User.objects.exclude(email__isnull=True).only("id", "mobile", "email")
|
||||
}
|
||||
|
||||
for account in google_accounts:
|
||||
provider_email = normalize_email_identity(account.email)
|
||||
user_email = normalize_email_identity(account.user.email)
|
||||
|
||||
if provider_email:
|
||||
social_email_groups[provider_email].add(str(account.user_id))
|
||||
|
||||
if user_email and provider_email and user_email != provider_email:
|
||||
issues.append(
|
||||
{
|
||||
"type": "linked_user_email_mismatch",
|
||||
"linked_user_id": str(account.user_id),
|
||||
"linked_user_mobile": account.user.mobile,
|
||||
"linked_user_email": user_email,
|
||||
"social_account_id": str(account.id),
|
||||
"provider_email": provider_email,
|
||||
"provider_user_id": account.provider_user_id,
|
||||
}
|
||||
)
|
||||
|
||||
other_user = user_by_email.get(provider_email) if provider_email else None
|
||||
if other_user and other_user.id != account.user_id:
|
||||
issues.append(
|
||||
{
|
||||
"type": "provider_email_matches_other_user",
|
||||
"linked_user_id": str(account.user_id),
|
||||
"linked_user_mobile": account.user.mobile,
|
||||
"linked_user_email": user_email,
|
||||
"social_account_id": str(account.id),
|
||||
"provider_email": provider_email,
|
||||
"provider_user_id": account.provider_user_id,
|
||||
"other_user_id": str(other_user.id),
|
||||
"other_user_mobile": other_user.mobile,
|
||||
"other_user_email": other_user.email,
|
||||
}
|
||||
)
|
||||
|
||||
for provider_email, user_ids in social_email_groups.items():
|
||||
if len(user_ids) <= 1:
|
||||
continue
|
||||
issues.append(
|
||||
{
|
||||
"type": "duplicate_provider_email_across_users",
|
||||
"provider_email": provider_email,
|
||||
"user_ids": sorted(user_ids),
|
||||
}
|
||||
)
|
||||
|
||||
if not issues:
|
||||
self.stdout.write(self.style.SUCCESS("No suspicious Google social links found."))
|
||||
return
|
||||
|
||||
for issue in issues:
|
||||
self.stdout.write(json.dumps(issue, ensure_ascii=True, sort_keys=True))
|
||||
|
||||
self.stdout.write(self.style.WARNING(f"Reported {len(issues)} suspicious Google social link issue(s)."))
|
||||
Reference in New Issue
Block a user