Files
Amirhossein Khalili b7b21a6cc6
Some checks failed
Backend CI/CD / test (push) Has been cancelled
Backend CI/CD / deploy (push) Has been cancelled
feat(backend): migrate auth and notifications off email
2026-05-21 10:28:04 +03:30

307 lines
12 KiB
Python

import logging
from celery import group, shared_task
from django.conf import settings
from django.db.models import Q
from apps.events.models import Event, EventEmailLog, Registration
from apps.notifications.services import notify_user
from apps.users.email_identity import normalize_email_identity
from apps.users.models import User
from apps.users.tasks import send_critical_sms
logger = logging.getLogger(__name__)
def _build_context(*parts):
values = [str(part) for part in parts if part not in (None, "")]
return "|".join(values) if values else None
def _build_email_context(*parts):
return _build_context(*parts)
def _event_url(event: Event) -> str:
root = getattr(settings, "FRONTEND_ROOT", "/")
slug_or_id = getattr(event, "slug", None) or event.id
return f"{root}events/{slug_or_id}"
def _send_html_email(subject: str, html_body: str, to_email: str):
normalized_email = normalize_email_identity(to_email)
if not normalized_email:
return False
user = User.objects.filter(email=normalized_email).first()
if not user:
return False
notify_user(
user.id,
{
"type": "admin_message",
"title": subject,
"message": html_body[:500],
"level": "info",
},
)
return True
def _event_recipients(event, statuses=None):
qs = Registration.objects.filter(event=event, is_deleted=False).select_related("user", "event")
if statuses:
qs = qs.filter(status__in=statuses)
return qs
@shared_task(bind=True, max_retries=3)
def send_registration_confirmation_email(self, registration_pk: str):
try:
reg = Registration.objects.select_related("event", "user").get(pk=registration_pk)
notify_user(
reg.user_id,
{
"type": "event_registration",
"title": f"ثبت‌نام شما در {reg.event.title}",
"message": "ثبت‌نام شما تایید شد.",
"level": "success",
"action_url": f"/events/{reg.event.slug}",
"entity_type": "event",
"entity_id": reg.event_id,
"meta": {"ticket_id": str(reg.ticket_id)},
},
)
logger.info("Registration confirmation notification sent to user=%s event=%s", reg.user_id, reg.event_id)
return {"sent": True}
except Exception as exc:
logger.error("Failed to send registration confirmation notification: %s", exc)
raise self.retry(exc=exc, countdown=60)
@shared_task(bind=True, max_retries=3)
def send_registration_cancellation_email(self, registration_pk: str):
try:
reg = Registration.objects.select_related("event", "user").get(pk=registration_pk)
notify_user(
reg.user_id,
{
"type": "event_cancellation",
"title": f"لغو ثبت‌نام در {reg.event.title}",
"message": "ثبت‌نام شما در این رویداد لغو شد.",
"level": "warning",
"action_url": f"/events/{reg.event.slug}",
"entity_type": "event",
"entity_id": reg.event_id,
},
)
if reg.user.mobile and reg.user.is_mobile_verified:
send_critical_sms.delay(reg.user.mobile, "event_cancellation", reg.event.title)
logger.info("Registration cancellation delivered to user=%s event=%s", reg.user_id, reg.event_id)
return {"sent": True}
except Exception as exc:
logger.error("Failed to send registration cancellation notification: %s", exc)
raise self.retry(exc=exc, countdown=60)
@shared_task(bind=True)
def send_event_reminder_task(self, event_id: int):
event = Event.objects.get(pk=event_id)
regs = _event_recipients(event, statuses=[Registration.StatusChoices.CONFIRMED, Registration.StatusChoices.ATTENDED])
reg_ids = list(regs.values_list("id", flat=True))
job = group(send_event_reminder_to_user.s(event_id, rid) for rid in reg_ids)
res = job.apply_async()
logger.info('Queued %s event reminders for event "%s" (group_id=%s)', len(reg_ids), event.title, res.id)
return {"event_id": event_id, "queued": len(reg_ids), "group_id": res.id}
@shared_task(bind=True, autoretry_for=(Exception,), retry_backoff=True, retry_jitter=True, retry_kwargs={"max_retries": 3}, soft_time_limit=30, time_limit=45)
def send_event_reminder_to_user(self, event_id: int, registration_id: int):
log = None
try:
registration = Registration.objects.select_related("user", "event").get(pk=registration_id)
event = registration.event
user = registration.user
context_key = _build_context("event_reminder", event.slug or event.id, event.start_time)
log, skip = EventEmailLog.claim(
event_id=event_id,
user_id=user.id,
kind=EventEmailLog.KIND_EVENT_REMINDER,
context=context_key,
)
if skip:
return {"skipped": True, "status": log.status}
notify_user(
user.id,
{
"type": "event_reminder",
"title": f"یادآوری رویداد: {event.title}",
"message": "رویداد شما به‌زودی آغاز می‌شود.",
"level": "info",
"action_url": f"/events/{event.slug}",
"entity_type": "event",
"entity_id": event.id,
},
)
log.mark_sent()
return {"sent": True}
except Exception as exc:
if log:
log.mark_failed(str(exc))
raise
@shared_task(bind=True)
def queue_event_announcement(self, event_id: int, subject: str, body_html: str, statuses=None):
event = Event.objects.get(pk=event_id)
statuses = statuses or [Registration.StatusChoices.CONFIRMED, Registration.StatusChoices.ATTENDED, Registration.StatusChoices.PENDING]
regs = _event_recipients(event, statuses=statuses).distinct()
reg_ids = list(regs.values_list("id", flat=True))
job = group(send_event_announcement_to_user.s(event_id, rid, subject, body_html) for rid in reg_ids)
res = job.apply_async()
logger.info('Queued %s event announcements for event "%s" (group_id=%s)', len(reg_ids), event.title, res.id)
return {"event_id": event_id, "queued": len(reg_ids), "group_id": res.id}
@shared_task(bind=True, autoretry_for=(Exception,), retry_backoff=True, retry_jitter=True, retry_kwargs={"max_retries": 3}, soft_time_limit=30, time_limit=45)
def send_event_announcement_to_user(self, event_id: int, registration_id: int, subject: str, body_html: str):
log = None
try:
registration = Registration.objects.select_related("user", "event").get(pk=registration_id)
user = registration.user
event = registration.event
context_key = _build_context("event_announcement", event.slug or event.id, subject, body_html)
log, skip = EventEmailLog.claim(
event_id=event_id,
user_id=user.id,
kind=EventEmailLog.KIND_EVENT_ANNOUNCEMENT3,
context=context_key,
)
if skip:
return {"skipped": True, "status": log.status}
notify_user(
user.id,
{
"type": "event_announcement",
"title": subject,
"message": body_html[:500],
"level": "info",
"action_url": f"/events/{event.slug}",
"entity_type": "event",
"entity_id": event.id,
},
)
log.mark_sent()
return {"sent": True}
except Exception as exc:
if log:
log.mark_failed(str(exc))
raise
@shared_task(bind=True)
def queue_invites_to_non_registered_users(self, event_id: int, only_verified=True, only_active=True):
event = Event.objects.get(pk=event_id)
qs = User.objects.all()
if only_active:
qs = qs.filter(is_active=True)
if only_verified:
qs = qs.filter(Q(mobile__isnull=False) | Q(email__isnull=False))
qs = qs.exclude(event_registrations__event_id=event_id).distinct()
user_ids = list(qs.values_list("id", flat=True))
job = group(send_invite_to_user.s(event_id, uid) for uid in user_ids)
res = job.apply_async()
return {"event_id": event_id, "queued": len(user_ids), "group_id": res.id}
@shared_task(bind=True, autoretry_for=(Exception,), retry_backoff=True, retry_jitter=True, retry_kwargs={"max_retries": 3}, time_limit=60)
def send_invite_to_user(self, event_id: int, user_id: int):
event = Event.objects.get(pk=event_id)
user = User.objects.get(pk=user_id)
context_key = _build_context("invite_non_registered", event.slug or event.id, user.id)
log, skip = EventEmailLog.claim(
event_id=event_id,
user_id=user_id,
kind=EventEmailLog.KIND_INVITE_NON_REGISTERED,
context=context_key,
)
if skip:
return {"skipped": True, "status": log.status}
try:
notify_user(
user.id,
{
"type": "event_invitation",
"title": f"دعوت به رویداد {event.title}",
"message": "برای مشاهده جزئیات و ثبت‌نام وارد صفحه رویداد شوید.",
"level": "info",
"action_url": f"/events/{event.slug}",
"entity_type": "event",
"entity_id": event.id,
},
)
log.mark_sent()
return {"sent": True}
except Exception as exc:
log.mark_failed(str(exc))
raise
@shared_task(bind=True)
def queue_skyroom_credentials(self, event_id: int):
event = Event.objects.get(pk=event_id)
regs = _event_recipients(event, statuses=[Registration.StatusChoices.CONFIRMED]).distinct()
reg_ids = list(regs.values_list("id", flat=True))
job = group(send_skyroom_credentials_to_user.s(event_id, rid) for rid in reg_ids)
res = job.apply_async()
logger.info('Queued %s Skyroom credential notifications for event "%s" (group_id=%s)', len(reg_ids), event.title, res.id)
return {"event_id": event_id, "queued": len(reg_ids), "group_id": res.id}
@shared_task(bind=True)
def send_skyroom_credentials_individual_task(self, reg_id: int):
registration = Registration.objects.select_related("event", "user").get(pk=reg_id)
return send_skyroom_credentials_to_user.delay(registration.event_id, registration.id)
@shared_task(bind=True, autoretry_for=(Exception,), retry_backoff=True, retry_jitter=True, retry_kwargs={"max_retries": 3}, soft_time_limit=30, time_limit=45)
def send_skyroom_credentials_to_user(self, event_id: int, registration_id: int):
log = None
try:
registration = Registration.objects.select_related("user", "event").get(pk=registration_id)
user = registration.user
event = registration.event
sky_username = (user.email or user.username or user.mobile or "").split("@")[0]
sky_password = str(registration.ticket_id or "")[:8]
context_key = _build_context("skyroom_credentials", event.slug or event.id, sky_username, sky_password, event.online_link)
log, skip = EventEmailLog.claim(
event_id=event_id,
user_id=user.id,
kind=EventEmailLog.KIND_SKYROOM_CREDENTIALS,
context=context_key,
)
if skip:
return {"skipped": True, "status": log.status}
notify_user(
user.id,
{
"type": "skyroom_credentials",
"title": f"اطلاعات دسترسی رویداد {event.title}",
"message": f"نام کاربری: {sky_username} | رمز عبور: {sky_password}",
"level": "info",
"action_url": _event_url(event),
"entity_type": "event",
"entity_id": event.id,
"meta": {
"online_link": event.online_link,
"username": sky_username,
"password": sky_password,
},
},
)
log.mark_sent()
return {"sent": True}
except Exception as exc:
if log:
log.mark_failed(str(exc))
raise