diff --git a/apps/users/management/__init__.py b/apps/users/management/__init__.py new file mode 100644 index 0000000..8b13789 --- /dev/null +++ b/apps/users/management/__init__.py @@ -0,0 +1 @@ + diff --git a/apps/users/management/commands/__init__.py b/apps/users/management/commands/__init__.py new file mode 100644 index 0000000..8b13789 --- /dev/null +++ b/apps/users/management/commands/__init__.py @@ -0,0 +1 @@ + diff --git a/apps/users/management/commands/audit_google_social_links.py b/apps/users/management/commands/audit_google_social_links.py new file mode 100644 index 0000000..9909f89 --- /dev/null +++ b/apps/users/management/commands/audit_google_social_links.py @@ -0,0 +1,84 @@ +from __future__ import annotations + +import json +from collections import defaultdict + +from django.core.management.base import BaseCommand + +from apps.users.email_identity import normalize_email_identity +from apps.users.models import User, UserSocialAccount + + +class Command(BaseCommand): + help = "Report suspicious Google social-account links without modifying data." + + def handle(self, *args, **options): + issues: list[dict] = [] + google_accounts = list( + UserSocialAccount.objects.select_related("user").filter( + provider=UserSocialAccount.ProviderType.GOOGLE + ) + ) + + social_email_groups: dict[str, set[str]] = defaultdict(set) + user_by_email = { + user.email: user + for user in User.objects.exclude(email__isnull=True).only("id", "mobile", "email") + } + + for account in google_accounts: + provider_email = normalize_email_identity(account.email) + user_email = normalize_email_identity(account.user.email) + + if provider_email: + social_email_groups[provider_email].add(str(account.user_id)) + + if user_email and provider_email and user_email != provider_email: + issues.append( + { + "type": "linked_user_email_mismatch", + "linked_user_id": str(account.user_id), + "linked_user_mobile": account.user.mobile, + "linked_user_email": user_email, + "social_account_id": str(account.id), + "provider_email": provider_email, + "provider_user_id": account.provider_user_id, + } + ) + + other_user = user_by_email.get(provider_email) if provider_email else None + if other_user and other_user.id != account.user_id: + issues.append( + { + "type": "provider_email_matches_other_user", + "linked_user_id": str(account.user_id), + "linked_user_mobile": account.user.mobile, + "linked_user_email": user_email, + "social_account_id": str(account.id), + "provider_email": provider_email, + "provider_user_id": account.provider_user_id, + "other_user_id": str(other_user.id), + "other_user_mobile": other_user.mobile, + "other_user_email": other_user.email, + } + ) + + for provider_email, user_ids in social_email_groups.items(): + if len(user_ids) <= 1: + continue + issues.append( + { + "type": "duplicate_provider_email_across_users", + "provider_email": provider_email, + "user_ids": sorted(user_ids), + } + ) + + if not issues: + self.stdout.write(self.style.SUCCESS("No suspicious Google social links found.")) + return + + for issue in issues: + self.stdout.write(json.dumps(issue, ensure_ascii=True, sort_keys=True)) + + self.stdout.write(self.style.WARNING(f"Reported {len(issues)} suspicious Google social link issue(s)."))