import logging

from celery import group, shared_task
from django.conf import settings
from django.db.models import Q

from apps.events.models import Event, EventEmailLog, Registration
from apps.notifications.services import notify_user
from apps.users.email_identity import normalize_email_identity
from apps.users.models import User
from apps.users.tasks import send_critical_sms

logger = logging.getLogger(__name__)


def _build_context(*parts):
    """Join the non-empty parts into a single ``|``-separated key.

    Parts equal to ``None`` or ``""`` are dropped; every remaining part is
    converted with ``str``. Returns ``None`` when nothing is left.
    """
    values = [str(part) for part in parts if part not in (None, "")]
    return "|".join(values) if values else None


def _build_email_context(*parts):
    """Alias of :func:`_build_context`, kept under its own name."""
    return _build_context(*parts)


def _event_url(event: Event) -> str:
    """Return the frontend URL of *event*, preferring its slug over its id.

    The root comes from ``settings.FRONTEND_ROOT`` (default ``"/"``). The root
    and the ``events/`` segment are joined with exactly one slash, so the
    result is well-formed whether or not the setting ends with ``/``.
    """
    # A missing, None or empty setting falls back to a site-relative root.
    root = getattr(settings, "FRONTEND_ROOT", None) or "/"
    slug_or_id = getattr(event, "slug", None) or event.id
    # Strip trailing slashes first: plain concatenation would produce
    # "https://hostevents/..." for a root configured without a trailing slash.
    return f"{str(root).rstrip('/')}/events/{slug_or_id}"


def _send_html_email(subject: str, html_body: str, to_email: str):
    """Deliver an "admin_message" notification to the user owning *to_email*.

    Despite the name, no e-mail is sent here: the address is normalized,
    looked up in ``User``, and the first matching user is notified through
    ``notify_user`` with the body truncated to 500 characters.

    Returns ``True`` when a notification was dispatched, ``False`` when the
    address normalizes to a falsy value or no user matches it.
    """
    normalized_email = normalize_email_identity(to_email)
    if not normalized_email:
        return False

    user = User.objects.filter(email=normalized_email).first()
    if not user:
        return False

    notify_user(
        user.id,
        {
            "type": "admin_message",
            "title": subject,
            "message": html_body[:500],
            "level": "info",
        },
    )
    return True


def _event_recipients(event, statuses=None):
    """Return the non-deleted registrations of *event* as a queryset.

    When *statuses* is truthy the queryset is further restricted to those
    registration statuses. ``user`` and ``event`` are select-related.
    """
    qs = Registration.objects.filter(event=event, is_deleted=False).select_related("user", "event")
    if statuses:
        qs = qs.filter(status__in=statuses)
    return qs


@shared_task(bind=True, max_retries=3)
def send_registration_confirmation_email(self, registration_pk: str):
    """Notify the registrant that their registration was confirmed.

    Any failure (including a missing registration) is logged with its
    traceback and the task is retried after 60 seconds, up to ``max_retries``.

    Returns ``{"sent": True}`` on success.
    """
    try:
        reg = Registration.objects.select_related("event", "user").get(pk=registration_pk)
        notify_user(
            reg.user_id,
            {
                "type": "event_registration",
                "title": f"ثبت‌نام شما در {reg.event.title}",
                "message": "ثبت‌نام شما تایید شد.",
                "level": "success",
                "action_url": f"/events/{reg.event.slug}",
                "entity_type": "event",
                "entity_id": reg.event_id,
                "meta": {"ticket_id": str(reg.ticket_id)},
            },
        )
        logger.info(
            "Registration confirmation notification sent to user=%s event=%s",
            reg.user_id,
            reg.event_id,
        )
        return {"sent": True}
    except Exception as exc:
        # logger.exception keeps the ERROR level but also records the traceback.
        logger.exception("Failed to send registration confirmation notification: %s", exc)
        raise self.retry(exc=exc, countdown=60)


@shared_task(bind=True, max_retries=3)
def send_registration_cancellation_email(self, registration_pk: str):
    """Notify the registrant that their registration was cancelled.

    Besides the in-app notification, a critical SMS task is queued when the
    user has a mobile number and ``is_mobile_verified`` is truthy. Any failure
    is logged with its traceback and retried after 60 seconds.

    Returns ``{"sent": True}`` on success.
    """
    try:
        reg = Registration.objects.select_related("event", "user").get(pk=registration_pk)
        notify_user(
            reg.user_id,
            {
                "type": "event_cancellation",
                "title": f"لغو ثبت‌نام در {reg.event.title}",
                "message": "ثبت‌نام شما در این رویداد لغو شد.",
                "level": "warning",
                "action_url": f"/events/{reg.event.slug}",
                "entity_type": "event",
                "entity_id": reg.event_id,
            },
        )
        if reg.user.mobile and reg.user.is_mobile_verified:
            send_critical_sms.delay(reg.user.mobile, "event_cancellation", reg.event.title)
        logger.info(
            "Registration cancellation delivered to user=%s event=%s",
            reg.user_id,
            reg.event_id,
        )
        return {"sent": True}
    except Exception as exc:
        # logger.exception keeps the ERROR level but also records the traceback.
        logger.exception("Failed to send registration cancellation notification: %s", exc)
        raise self.retry(exc=exc, countdown=60)


@shared_task(bind=True)
def send_event_reminder_task(self, event_id: int):
    """Fan out one reminder task per confirmed/attended registration.

    Returns a dict with the event id, the number of queued sub-tasks and the
    id of the Celery group result.
    """
    event = Event.objects.get(pk=event_id)
    regs = _event_recipients(
        event,
        statuses=[Registration.StatusChoices.CONFIRMED, Registration.StatusChoices.ATTENDED],
    )
    reg_ids = list(regs.values_list("id", flat=True))
    job = group(send_event_reminder_to_user.s(event_id, rid) for rid in reg_ids)
    res = job.apply_async()
    logger.info(
        'Queued %s event reminders for event "%s" (group_id=%s)',
        len(reg_ids),
        event.title,
        res.id,
    )
    return {"event_id": event_id, "queued": len(reg_ids), "group_id": res.id}


@shared_task(
    bind=True,
    autoretry_for=(Exception,),
    retry_backoff=True,
    retry_jitter=True,
    retry_kwargs={"max_retries": 3},
    soft_time_limit=30,
    time_limit=45,
)
def send_event_reminder_to_user(self, event_id: int, registration_id: int):
    """Send the event reminder notification for a single registration.

    A log row is claimed through ``EventEmailLog.claim`` first; when the claim
    reports ``skip`` the task returns ``{"skipped": True, "status": ...}``
    without notifying (presumably because this reminder was already handled --
    confirm against ``EventEmailLog.claim``). On failure the claimed log, if
    any, is marked failed and the exception propagates so ``autoretry_for``
    can retry.
    """
    log = None
    try:
        registration = Registration.objects.select_related("user", "event").get(pk=registration_id)
        event = registration.event
        user = registration.user
        # The start time is part of the key, so a changed start time yields a
        # different context value.
        context_key = _build_context("event_reminder", event.slug or event.id, event.start_time)
        log, skip = EventEmailLog.claim(
            event_id=event_id,
            user_id=user.id,
            kind=EventEmailLog.KIND_EVENT_REMINDER,
            context=context_key,
        )
        if skip:
            return {"skipped": True, "status": log.status}

        notify_user(
            user.id,
            {
                "type": "event_reminder",
                "title": f"یادآوری رویداد: {event.title}",
                "message": "رویداد شما به‌زودی آغاز می‌شود.",
                "level": "info",
                "action_url": f"/events/{event.slug}",
                "entity_type": "event",
                "entity_id": event.id,
            },
        )
        log.mark_sent()
        return {"sent": True}
    except Exception as exc:
        if log:
            log.mark_failed(str(exc))
        raise


@shared_task(bind=True)
def queue_event_announcement(self, event_id: int, subject: str, body_html: str, statuses=None):
    """Fan out one announcement task per matching registration.

    *statuses* defaults to confirmed, attended and pending registrations when
    falsy. Returns a dict with the event id, the number of queued sub-tasks
    and the id of the Celery group result.
    """
    event = Event.objects.get(pk=event_id)
    statuses = statuses or [
        Registration.StatusChoices.CONFIRMED,
        Registration.StatusChoices.ATTENDED,
        Registration.StatusChoices.PENDING,
    ]
    regs = _event_recipients(event, statuses=statuses).distinct()
    reg_ids = list(regs.values_list("id", flat=True))
    job = group(send_event_announcement_to_user.s(event_id, rid, subject, body_html) for rid in reg_ids)
    res = job.apply_async()
    logger.info(
        'Queued %s event announcements for event "%s" (group_id=%s)',
        len(reg_ids),
        event.title,
        res.id,
    )
    return {"event_id": event_id, "queued": len(reg_ids), "group_id": res.id}


@shared_task(
    bind=True,
    autoretry_for=(Exception,),
    retry_backoff=True,
    retry_jitter=True,
    retry_kwargs={"max_retries": 3},
    soft_time_limit=30,
    time_limit=45,
)
def send_event_announcement_to_user(self, event_id: int, registration_id: int, subject: str, body_html: str):
    """Send an announcement notification for a single registration.

    The notification carries *subject* as title and the first 500 characters
    of *body_html* as message. A log row is claimed through
    ``EventEmailLog.claim`` first and the task returns early when the claim
    reports ``skip``. On failure the claimed log, if any, is marked failed and
    the exception propagates so ``autoretry_for`` can retry.
    """
    log = None
    try:
        registration = Registration.objects.select_related("user", "event").get(pk=registration_id)
        user = registration.user
        event = registration.event
        context_key = _build_context("event_announcement", event.slug or event.id, subject, body_html)
        log, skip = EventEmailLog.claim(
            event_id=event_id,
            user_id=user.id,
            # NOTE(review): the trailing "3" differs from the naming of the
            # other KIND_* constants used in this module and may be a typo for
            # KIND_EVENT_ANNOUNCEMENT -- confirm against the EventEmailLog model.
            kind=EventEmailLog.KIND_EVENT_ANNOUNCEMENT3,
            context=context_key,
        )
        if skip:
            return {"skipped": True, "status": log.status}

        notify_user(
            user.id,
            {
                "type": "event_announcement",
                "title": subject,
                "message": body_html[:500],
                "level": "info",
                "action_url": f"/events/{event.slug}",
                "entity_type": "event",
                "entity_id": event.id,
            },
        )
        log.mark_sent()
        return {"sent": True}
    except Exception as exc:
        if log:
            log.mark_failed(str(exc))
        raise


@shared_task(bind=True)
def queue_invites_to_non_registered_users(self, event_id: int, only_verified=True, only_active=True):
    """Fan out one invitation task per user with no registration for the event.

    *only_active* restricts the recipients to ``is_active=True`` users.
    *only_verified* restricts them to users with a non-null mobile or e-mail.

    Returns a dict with the event id, the number of queued sub-tasks and the
    id of the Celery group result.
    """
    # Fetched only to fail early when the event does not exist.
    event = Event.objects.get(pk=event_id)
    qs = User.objects.all()
    if only_active:
        qs = qs.filter(is_active=True)
    if only_verified:
        # NOTE(review): despite the flag name this only requires that a contact
        # field is present; no verification flag is checked -- confirm intent.
        qs = qs.filter(Q(mobile__isnull=False) | Q(email__isnull=False))
    qs = qs.exclude(event_registrations__event_id=event_id).distinct()
    user_ids = list(qs.values_list("id", flat=True))
    job = group(send_invite_to_user.s(event_id, uid) for uid in user_ids)
    res = job.apply_async()
    return {"event_id": event_id, "queued": len(user_ids), "group_id": res.id}


@shared_task(
    bind=True,
    autoretry_for=(Exception,),
    retry_backoff=True,
    retry_jitter=True,
    retry_kwargs={"max_retries": 3},
    time_limit=60,
)
def send_invite_to_user(self, event_id: int, user_id: int):
    """Send an event invitation notification to a single user.

    A log row is claimed through ``EventEmailLog.claim`` first and the task
    returns early when the claim reports ``skip``. If the notification fails,
    the log is marked failed and the exception propagates so ``autoretry_for``
    can retry.
    """
    event = Event.objects.get(pk=event_id)
    user = User.objects.get(pk=user_id)
    context_key = _build_context("invite_non_registered", event.slug or event.id, user.id)
    log, skip = EventEmailLog.claim(
        event_id=event_id,
        user_id=user_id,
        kind=EventEmailLog.KIND_INVITE_NON_REGISTERED,
        context=context_key,
    )
    if skip:
        return {"skipped": True, "status": log.status}

    try:
        notify_user(
            user.id,
            {
                "type": "event_invitation",
                "title": f"دعوت به رویداد {event.title}",
                "message": "برای مشاهده جزئیات و ثبت‌نام وارد صفحه رویداد شوید.",
                "level": "info",
                "action_url": f"/events/{event.slug}",
                "entity_type": "event",
                "entity_id": event.id,
            },
        )
        log.mark_sent()
        return {"sent": True}
    except Exception as exc:
        log.mark_failed(str(exc))
        raise


@shared_task(bind=True)
def queue_skyroom_credentials(self, event_id: int):
    """Fan out one Skyroom-credentials task per confirmed registration.

    Returns a dict with the event id, the number of queued sub-tasks and the
    id of the Celery group result.
    """
    event = Event.objects.get(pk=event_id)
    regs = _event_recipients(event, statuses=[Registration.StatusChoices.CONFIRMED]).distinct()
    reg_ids = list(regs.values_list("id", flat=True))
    job = group(send_skyroom_credentials_to_user.s(event_id, rid) for rid in reg_ids)
    res = job.apply_async()
    logger.info(
        'Queued %s Skyroom credential notifications for event "%s" (group_id=%s)',
        len(reg_ids),
        event.title,
        res.id,
    )
    return {"event_id": event_id, "queued": len(reg_ids), "group_id": res.id}


@shared_task(bind=True)
def send_skyroom_credentials_individual_task(self, reg_id: int):
    """Queue the Skyroom-credentials task for one registration.

    The registration is loaded first, so a missing row raises here instead of
    in the queued task.
    """
    registration = Registration.objects.select_related("event", "user").get(pk=reg_id)
    # NOTE(review): this returns the AsyncResult of the queued task; with a
    # JSON result backend that object may not be serializable -- confirm the
    # result configuration or return the task id instead.
    return send_skyroom_credentials_to_user.delay(registration.event_id, registration.id)


@shared_task(
    bind=True,
    autoretry_for=(Exception,),
    retry_backoff=True,
    retry_jitter=True,
    retry_kwargs={"max_retries": 3},
    soft_time_limit=30,
    time_limit=45,
)
def send_skyroom_credentials_to_user(self, event_id: int, registration_id: int):
    """Send the Skyroom access credentials for a single registration.

    The username is the part before ``@`` of the first non-empty value among
    the user's e-mail, username and mobile; the password is the first eight
    characters of the registration's ticket id. A log row is claimed through
    ``EventEmailLog.claim`` first and the task returns early when the claim
    reports ``skip``. On failure the claimed log, if any, is marked failed and
    the exception propagates so ``autoretry_for`` can retry.
    """
    log = None
    try:
        registration = Registration.objects.select_related("user", "event").get(pk=registration_id)
        user = registration.user
        event = registration.event
        sky_username = (user.email or user.username or user.mobile or "").split("@")[0]
        sky_password = str(registration.ticket_id or "")[:8]
        # NOTE(review): the password is embedded in the context key handed to
        # EventEmailLog.claim -- confirm that storing it there is acceptable.
        context_key = _build_context(
            "skyroom_credentials",
            event.slug or event.id,
            sky_username,
            sky_password,
            event.online_link,
        )
        log, skip = EventEmailLog.claim(
            event_id=event_id,
            user_id=user.id,
            kind=EventEmailLog.KIND_SKYROOM_CREDENTIALS,
            context=context_key,
        )
        if skip:
            return {"skipped": True, "status": log.status}

        notify_user(
            user.id,
            {
                "type": "skyroom_credentials",
                "title": f"اطلاعات دسترسی رویداد {event.title}",
                "message": f"نام کاربری: {sky_username} | رمز عبور: {sky_password}",
                "level": "info",
                "action_url": _event_url(event),
                "entity_type": "event",
                "entity_id": event.id,
                "meta": {
                    "online_link": event.online_link,
                    "username": sky_username,
                    "password": sky_password,
                },
            },
        )
        log.mark_sent()
        return {"sent": True}
    except Exception as exc:
        if log:
            log.mark_failed(str(exc))
        raise