175 lines
6.1 KiB
Python
175 lines
6.1 KiB
Python
from datetime import timedelta
|
|
|
|
from celery import shared_task
|
|
from django.contrib.auth import get_user_model
|
|
from django.utils import timezone
|
|
|
|
import logging
|
|
|
|
from apps.communications.models import Announcement
|
|
from apps.events.models import Event, Registration
|
|
from apps.notifications.services import notify_user
|
|
from apps.users.tasks import send_critical_sms
|
|
|
|
User = get_user_model()
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
def _audience_queryset(target_audience: str):
|
|
qs = User.objects.filter(is_active=True, is_deleted=False)
|
|
if target_audience == "committee":
|
|
return qs.filter(is_staff=True)
|
|
if target_audience == "members":
|
|
return qs
|
|
return qs
|
|
|
|
|
|
def _dispatch_announcement(announcement: Announcement) -> tuple[int, int]:
|
|
in_app_count = 0
|
|
sms_count = 0
|
|
users = _audience_queryset(announcement.target_audience)
|
|
action_url = "/announcements"
|
|
|
|
for user in users.iterator():
|
|
if announcement.send_email:
|
|
notify_user(
|
|
user.id,
|
|
{
|
|
"type": "announcement",
|
|
"title": announcement.title,
|
|
"message": announcement.content[:500],
|
|
"level": "warning" if announcement.priority == "urgent" else "info",
|
|
"action_url": action_url,
|
|
"entity_type": "announcement",
|
|
"entity_id": announcement.id,
|
|
},
|
|
)
|
|
in_app_count += 1
|
|
if announcement.send_push and user.mobile and user.is_mobile_verified:
|
|
send_critical_sms.delay(user.mobile, "event_reschedule", announcement.title)
|
|
sms_count += 1
|
|
|
|
if announcement.send_email:
|
|
announcement.email_sent = True
|
|
if announcement.send_push:
|
|
announcement.push_sent = True
|
|
announcement.save(update_fields=["email_sent", "push_sent"])
|
|
return in_app_count, sms_count
|
|
|
|
|
|
@shared_task(bind=True, max_retries=3)
|
|
def send_announcement_notifications(self, announcement_id):
|
|
try:
|
|
announcement = Announcement.objects.get(id=announcement_id)
|
|
in_app_count, sms_count = _dispatch_announcement(announcement)
|
|
logger.info(
|
|
"Announcement %s dispatched in_app=%s sms=%s",
|
|
announcement.id,
|
|
in_app_count,
|
|
sms_count,
|
|
)
|
|
return f"Announcement dispatched: {announcement.title}"
|
|
except Announcement.DoesNotExist:
|
|
logger.error("Announcement %s not found", announcement_id)
|
|
return f"Announcement {announcement_id} not found"
|
|
except Exception as exc:
|
|
logger.error("Failed to send announcement notifications: %s", exc)
|
|
raise self.retry(exc=exc, countdown=60)
|
|
|
|
|
|
@shared_task
|
|
def send_event_reminders():
|
|
try:
|
|
reminder_target = timezone.now() + timedelta(hours=24)
|
|
window = timedelta(minutes=30)
|
|
events = Event.objects.filter(
|
|
start_time__range=(reminder_target - window, reminder_target + window),
|
|
status="published",
|
|
is_deleted=False,
|
|
)
|
|
total_sent = 0
|
|
for event in events:
|
|
registrations = Registration.objects.filter(
|
|
event=event,
|
|
status=Registration.StatusChoices.CONFIRMED,
|
|
is_deleted=False,
|
|
).select_related("user")
|
|
for registration in registrations:
|
|
notify_user(
|
|
registration.user_id,
|
|
{
|
|
"type": "event_reminder",
|
|
"title": f"یادآوری رویداد: {event.title}",
|
|
"message": "رویداد شما در ۲۴ ساعت آینده برگزار میشود.",
|
|
"level": "info",
|
|
"action_url": f"/events/{event.slug}",
|
|
"entity_type": "event",
|
|
"entity_id": event.id,
|
|
},
|
|
)
|
|
total_sent += 1
|
|
logger.info("Event reminders sent to %s users", total_sent)
|
|
return f"Event reminders sent to {total_sent} users"
|
|
except Exception as exc:
|
|
logger.error("Failed to send event reminders: %s", exc)
|
|
raise
|
|
|
|
|
|
@shared_task
|
|
def send_weekly_newsletter():
|
|
logger.info("Weekly newsletter task skipped because newsletter delivery has been removed.")
|
|
return "Newsletter delivery removed"
|
|
|
|
|
|
@shared_task
|
|
def cleanup_expired_tokens():
|
|
logger.info("Newsletter cleanup skipped because newsletter delivery has been removed.")
|
|
return "Newsletter delivery removed"
|
|
|
|
|
|
@shared_task
|
|
def send_bulk_announcement(announcement_id, recipient_emails):
|
|
try:
|
|
announcement = Announcement.objects.get(id=announcement_id)
|
|
users = User.objects.filter(email__in=recipient_emails, is_active=True)
|
|
total = 0
|
|
for user in users.iterator():
|
|
notify_user(
|
|
user.id,
|
|
{
|
|
"type": "announcement",
|
|
"title": announcement.title,
|
|
"message": announcement.content[:500],
|
|
"level": "warning" if announcement.priority == "urgent" else "info",
|
|
"entity_type": "announcement",
|
|
"entity_id": announcement.id,
|
|
},
|
|
)
|
|
total += 1
|
|
return f"Bulk announcement sent to {total} users"
|
|
except Exception as exc:
|
|
logger.error("Failed to send bulk announcement: %s", exc)
|
|
raise
|
|
|
|
|
|
@shared_task
|
|
def process_scheduled_announcements():
|
|
try:
|
|
now = timezone.now()
|
|
scheduled_announcements = Announcement.objects.filter(
|
|
is_published=True,
|
|
publish_date__lte=now,
|
|
email_sent=False,
|
|
send_email=True,
|
|
is_deleted=False,
|
|
)
|
|
processed_count = 0
|
|
for announcement in scheduled_announcements:
|
|
send_announcement_notifications.delay(announcement.id)
|
|
processed_count += 1
|
|
logger.info("Processed %s scheduled announcements", processed_count)
|
|
return f"Processed {processed_count} scheduled announcements"
|
|
except Exception as exc:
|
|
logger.error("Failed to process scheduled announcements: %s", exc)
|
|
raise
|