Files
Amirhossein Khalili b7b21a6cc6
Some checks failed
Backend CI/CD / test (push) Has been cancelled
Backend CI/CD / deploy (push) Has been cancelled
feat(backend): migrate auth and notifications off email
2026-05-21 10:28:04 +03:30

175 lines
6.1 KiB
Python

from datetime import timedelta
from celery import shared_task
from django.contrib.auth import get_user_model
from django.utils import timezone
import logging
from apps.communications.models import Announcement
from apps.events.models import Event, Registration
from apps.notifications.services import notify_user
from apps.users.tasks import send_critical_sms
# Resolve the user model through Django's get_user_model() instead of
# importing a concrete class; every task below queries users through this name.
User = get_user_model()
# Module-level logger named after this module; shared by all tasks in the file.
logger = logging.getLogger(__name__)
def _audience_queryset(target_audience: str):
    """Return the queryset of users that belong to ``target_audience``.

    The base set is every active, non-deleted user. ``"committee"`` narrows
    that set to staff accounts; any other value (``"members"`` included)
    returns the base set unchanged.
    """
    eligible = User.objects.filter(is_active=True, is_deleted=False)
    return eligible.filter(is_staff=True) if target_audience == "committee" else eligible
def _dispatch_announcement(announcement: Announcement) -> tuple[int, int]:
    """Fan an announcement out to its audience and record the delivery flags.

    For each user returned by ``_audience_queryset``:

    * if ``announcement.send_email`` is set, ``notify_user`` is called with an
      ``"announcement"`` payload (counted as an in-app notification);
    * if ``announcement.send_push`` is set and the user has a verified mobile
      number, a ``send_critical_sms`` task is queued.

    After the loop, ``email_sent`` / ``push_sent`` are set to ``True`` for the
    enabled channels and both fields are saved.

    Args:
        announcement: The announcement to deliver.

    Returns:
        ``(in_app_count, sms_count)`` - how many in-app notifications were
        created and how many SMS tasks were queued.
    """
    notified = 0
    texted = 0
    link = "/announcements"
    recipients = _audience_queryset(announcement.target_audience)

    for recipient in recipients.iterator():
        # NOTE(review): the in-app channel is gated by the ``send_email``
        # flag - presumably a leftover name from the email era; confirm.
        if announcement.send_email:
            payload = {
                "type": "announcement",
                "title": announcement.title,
                "message": announcement.content[:500],
                "level": "warning" if announcement.priority == "urgent" else "info",
                "action_url": link,
                "entity_type": "announcement",
                "entity_id": announcement.id,
            }
            notify_user(recipient.id, payload)
            notified += 1

        # SMS only goes to users with a verified mobile number.
        if not (announcement.send_push and recipient.mobile and recipient.is_mobile_verified):
            continue
        # NOTE(review): the second argument is the literal "event_reschedule"
        # even though this path handles announcements - confirm it is intended.
        send_critical_sms.delay(recipient.mobile, "event_reschedule", announcement.title)
        texted += 1

    # Mark each enabled channel as delivered; disabled channels keep their value.
    for enabled, sent_flag in (
        (announcement.send_email, "email_sent"),
        (announcement.send_push, "push_sent"),
    ):
        if enabled:
            setattr(announcement, sent_flag, True)
    announcement.save(update_fields=["email_sent", "push_sent"])

    return notified, texted
@shared_task(bind=True, max_retries=3)
def send_announcement_notifications(self, announcement_id):
    """Celery task: deliver one announcement to its target audience.

    Args:
        announcement_id: Primary key of the ``Announcement`` to dispatch.

    Returns:
        A status string. When the announcement does not exist, a "not found"
        message is returned and no retry is scheduled.

    Raises:
        Whatever ``self.retry`` raises: any failure other than a missing
        announcement is logged with its traceback and the task is re-queued
        with a 60 second countdown (``max_retries=3``).
    """
    # Only the lookup and the fan-out can fail in a way worth retrying, so the
    # success-path logging lives outside the ``try``.
    try:
        announcement = Announcement.objects.get(id=announcement_id)
        in_app_count, sms_count = _dispatch_announcement(announcement)
    except Announcement.DoesNotExist:
        # A missing row will not appear on retry; report and stop.
        logger.error("Announcement %s not found", announcement_id)
        return f"Announcement {announcement_id} not found"
    except Exception as exc:
        # logger.exception records the traceback of the failure that triggers
        # the retry, which a plain logger.error would drop.
        # NOTE(review): a retry re-runs the whole fan-out, so users reached
        # before the failure would presumably be notified again - confirm
        # whether that is acceptable.
        logger.exception("Failed to send announcement notifications: %s", exc)
        raise self.retry(exc=exc, countdown=60)

    logger.info(
        "Announcement %s dispatched in_app=%s sms=%s",
        announcement.id,
        in_app_count,
        sms_count,
    )
    return f"Announcement dispatched: {announcement.title}"
@shared_task
def send_event_reminders():
    """Celery task: remind confirmed attendees of events starting in ~24 hours.

    Selects published, non-deleted events whose ``start_time`` lies within 30
    minutes either side of "now + 24 hours", then calls ``notify_user`` once
    for every confirmed, non-deleted registration of each event.

    Returns:
        A status string containing the number of reminders sent.

    Raises:
        Any exception raised while querying or notifying; it is logged and
        re-raised unchanged.
    """
    try:
        target = timezone.now() + timedelta(hours=24)
        tolerance = timedelta(minutes=30)
        upcoming = Event.objects.filter(
            start_time__range=(target - tolerance, target + tolerance),
            status="published",
            is_deleted=False,
        )

        reminded = 0
        for event in upcoming:
            # These values are the same for every attendee of this event.
            reminder_title = f"یادآوری رویداد: {event.title}"
            event_link = f"/events/{event.slug}"

            attendees = Registration.objects.filter(
                event=event,
                status=Registration.StatusChoices.CONFIRMED,
                is_deleted=False,
            ).select_related("user")

            for attendee in attendees:
                payload = {
                    "type": "event_reminder",
                    "title": reminder_title,
                    "message": "رویداد شما در ۲۴ ساعت آینده برگزار می‌شود.",
                    "level": "info",
                    "action_url": event_link,
                    "entity_type": "event",
                    "entity_id": event.id,
                }
                notify_user(attendee.user_id, payload)
                reminded += 1

        logger.info("Event reminders sent to %s users", reminded)
        return f"Event reminders sent to {reminded} users"
    except Exception as exc:
        logger.error("Failed to send event reminders: %s", exc)
        raise
@shared_task
def send_weekly_newsletter():
    """Celery task stub: newsletter delivery was removed, so nothing is sent.

    Logs that the task was skipped and returns a fixed status string.
    """
    outcome = "Newsletter delivery removed"
    logger.info("Weekly newsletter task skipped because newsletter delivery has been removed.")
    return outcome
@shared_task
def cleanup_expired_tokens():
    """Celery task stub: performs no cleanup.

    Despite its name, the body only logs that newsletter cleanup was skipped
    and returns a fixed status string.
    """
    outcome = "Newsletter delivery removed"
    logger.info("Newsletter cleanup skipped because newsletter delivery has been removed.")
    return outcome
@shared_task
def send_bulk_announcement(announcement_id, recipient_emails):
    """Celery task: send an announcement to an explicit list of recipients.

    Args:
        announcement_id: Primary key of the ``Announcement`` to send.
        recipient_emails: Email addresses used to look up the recipient users.

    Returns:
        A status string containing the number of users notified.

    Raises:
        Any exception raised during lookup or notification (including
        ``Announcement.DoesNotExist``); it is logged and re-raised unchanged.
    """
    try:
        announcement = Announcement.objects.get(id=announcement_id)
        # Same eligibility rule as the audience-based dispatch path in this
        # module: soft-deleted accounts must not receive notifications.
        users = User.objects.filter(
            email__in=recipient_emails,
            is_active=True,
            is_deleted=False,
        )
        total = 0
        for user in users.iterator():
            notify_user(
                user.id,
                {
                    "type": "announcement",
                    "title": announcement.title,
                    "message": announcement.content[:500],
                    "level": "warning" if announcement.priority == "urgent" else "info",
                    # Same link the audience-based dispatch attaches, so a bulk
                    # notification opens the announcements page as well.
                    "action_url": "/announcements",
                    "entity_type": "announcement",
                    "entity_id": announcement.id,
                },
            )
            total += 1
        return f"Bulk announcement sent to {total} users"
    except Exception as exc:
        logger.error("Failed to send bulk announcement: %s", exc)
        raise
@shared_task
def process_scheduled_announcements():
    """Celery task: enqueue delivery for announcements that are due.

    Selects published, non-deleted announcements whose ``publish_date`` has
    passed, that have ``send_email`` enabled and ``email_sent`` still false,
    and queues ``send_announcement_notifications`` for each of them.

    Returns:
        A status string containing the number of announcements enqueued.

    Raises:
        Any exception raised while querying or enqueueing; it is logged and
        re-raised unchanged.
    """
    try:
        # NOTE(review): ``email_sent`` is only set by _dispatch_announcement
        # after its fan-out loop, so a run that overlaps an in-flight dispatch
        # would presumably enqueue the same announcement again - confirm
        # against the beat schedule.
        # NOTE(review): announcements with ``send_push`` but not ``send_email``
        # never match this filter - confirm that is intended.
        criteria = {
            "is_published": True,
            "publish_date__lte": timezone.now(),
            "email_sent": False,
            "send_email": True,
            "is_deleted": False,
        }
        due = Announcement.objects.filter(**criteria)

        enqueued = 0
        for enqueued, pending in enumerate(due, start=1):
            send_announcement_notifications.delay(pending.id)

        logger.info("Processed %s scheduled announcements", enqueued)
        return f"Processed {enqueued} scheduled announcements"
    except Exception as exc:
        logger.error("Failed to process scheduled announcements: %s", exc)
        raise