"""HTTP endpoints and the SSE stream view for per-user notifications."""

import json
from typing import Iterator

from django.conf import settings
from django.core import signing
from django.http import JsonResponse, StreamingHttpResponse
from django.utils import timezone
from django.views import View
from ninja import Router

from apps.notifications.api.schemas import (
    NotificationDeleteResponseSchema,
    NotificationListSchema,
    NotificationMarkAllReadResponseSchema,
    NotificationSeenIn,
    NotificationSeenResponseSchema,
    NotificationStreamTokenResponseSchema,
)
from apps.notifications.services import RedisNotificationStore
from core.api.schemas import ErrorSchema
from core.authentication import jwt_auth

notifications_router = Router()

# Salt used when signing/verifying stream tokens with django.core.signing.
STREAM_TOKEN_SALT = "notifications.stream"


def _safe_int(value, default: int) -> int:
    """Coerce *value* to a non-negative int, or return *default* if it cannot be converted.

    Negative values are clamped to 0 (they do not fall back to *default*).
    """
    try:
        return max(int(value), 0)
    except (TypeError, ValueError):
        return default


def _format_sse_event(event: str, data: dict) -> str:
    """Render one Server-Sent Events frame with an ``event:`` and a JSON ``data:`` field."""
    # default=str stringifies any value json cannot serialize natively.
    payload = json.dumps(data, ensure_ascii=False, default=str)
    return f"event: {event}\ndata: {payload}\n\n"


def _decode_pubsub_message(raw) -> tuple[str, dict]:
    """Turn a raw pub/sub payload into an ``(event_name, data)`` pair.

    Anything that is not a JSON object (invalid JSON, undecodable bytes, a
    non-string payload, or a JSON scalar/array) degrades to an empty
    ``"notification"`` event instead of raising and terminating the stream.
    """
    try:
        payload = json.loads(raw)
    except (TypeError, ValueError):
        # ValueError covers both json.JSONDecodeError and UnicodeDecodeError.
        payload = None
    if not isinstance(payload, dict):
        return "notification", {}
    return payload.get("event") or "notification", payload.get("data") or {}


def _issue_stream_token_for_user(user_id: str) -> str:
    """Return a signed token carrying *user_id* and the ``notification_stream`` type marker."""
    return signing.dumps(
        {"user_id": str(user_id), "type": "notification_stream"},
        salt=STREAM_TOKEN_SALT,
    )


def _validate_stream_token(token: str | None) -> str:
    """Verify a stream token and return the user id it was issued for.

    Raises:
        signing.SignatureExpired: the token is older than
            ``settings.NOTIFICATION_STREAM_TOKEN_LIFETIME_SECONDS``.
        signing.BadSignature: the token is missing, tampered with, of the
            wrong type, or carries no ``user_id``.
    """
    if not token:
        raise signing.BadSignature("Missing stream token")
    payload = signing.loads(
        token,
        salt=STREAM_TOKEN_SALT,
        max_age=settings.NOTIFICATION_STREAM_TOKEN_LIFETIME_SECONDS,
    )
    if not isinstance(payload, dict) or payload.get("type") != "notification_stream":
        raise signing.BadSignature("Invalid stream token type")
    user_id = payload.get("user_id")
    if user_id is None:
        # Surface a malformed payload as a signature error so the caller
        # answers 401 instead of crashing with an uncaught KeyError.
        raise signing.BadSignature("Stream token is missing user_id")
    return str(user_id)


@notifications_router.get("/", response=NotificationListSchema, auth=jwt_auth)
def list_notifications(request, limit: int | None = None, offset: int = 0, type: str | None = None):
    """Return one page of the caller's notifications plus total and unread counts.

    ``limit`` falls back to ``settings.NOTIFICATION_DEFAULT_PAGE_SIZE`` when it
    is omitted and is capped at ``settings.NOTIFICATION_MAX_PAGE_SIZE``.
    ``type`` is forwarded to the store as ``type_filter``; the parameter name
    shadows the builtin but is kept because it is the public query-string name.
    """
    user_id = str(request.auth.id)
    safe_limit = min(
        _safe_int(limit, settings.NOTIFICATION_DEFAULT_PAGE_SIZE),
        settings.NOTIFICATION_MAX_PAGE_SIZE,
    )
    notifications, total_count = RedisNotificationStore.list(
        user_id,
        limit=safe_limit,
        offset=_safe_int(offset, 0),
        type_filter=type,
    )
    return {
        "count": total_count,
        "unread_count": RedisNotificationStore.unread_count(user_id),
        "notifications": notifications,
    }


@notifications_router.post(
    "/mark-seen",
    response={200: NotificationSeenResponseSchema, 404: ErrorSchema},
    auth=jwt_auth,
)
def mark_notification_seen(request, data: NotificationSeenIn):
    """Mark the notification ``data.id`` as seen for the caller.

    Responds 404 when the store returns ``None``; otherwise merges the store's
    returned mapping into the 200 response alongside ``marked_read``.
    """
    payload = RedisNotificationStore.mark_seen(str(request.auth.id), data.id)
    if payload is None:
        return 404, {"error": "Notification not found."}
    return 200, {"marked_read": True, **payload}


@notifications_router.delete(
    "/{notif_id}",
    response={200: NotificationDeleteResponseSchema, 404: ErrorSchema},
    auth=jwt_auth,
)
def delete_notification(request, notif_id: str):
    """Delete one of the caller's notifications and report the new unread count.

    Responds 404 when the store reports that nothing was deleted.
    """
    user_id = str(request.auth.id)
    deleted = RedisNotificationStore.delete(user_id, notif_id)
    if not deleted:
        return 404, {"error": "Notification not found."}
    return 200, {
        "deleted": True,
        "notification_id": notif_id,
        "unread_count": RedisNotificationStore.unread_count(user_id),
    }


@notifications_router.post(
    "/mark-all-read",
    response=NotificationMarkAllReadResponseSchema,
    auth=jwt_auth,
)
def mark_all_notifications_read(request, type: str | None = None):
    """Mark all of the caller's notifications as seen, optionally restricted by ``type``.

    The store's return value is passed through unchanged as ``marked_read``.
    """
    updated = RedisNotificationStore.mark_all_seen(str(request.auth.id), type_filter=type)
    return {"marked_read": updated}


@notifications_router.post(
    "/stream-token",
    response={200: NotificationStreamTokenResponseSchema, 503: ErrorSchema},
    auth=jwt_auth,
)
def get_notification_stream_token(request):
    """Issue a signed token the caller can present to the SSE stream view.

    Responds 503 when ``settings.NOTIFICATIONS_ENABLED`` is falsy.
    """
    if not settings.NOTIFICATIONS_ENABLED:
        return 503, {"error": "Notifications are disabled."}
    return 200, {
        "token": _issue_stream_token_for_user(str(request.auth.id)),
        "expires_in": settings.NOTIFICATION_STREAM_TOKEN_LIFETIME_SECONDS,
    }


class NotificationStreamView(View):
    """Server-Sent Events endpoint authenticated by a signed ``token`` query parameter."""

    def _build_stream(self, user_id: str) -> Iterator[str]:
        """Yield SSE frames for *user_id* until the consumer closes the generator.

        Frame order: a ``retry:`` hint, a ``connected`` event with the first
        page of notifications, an ``unread_count`` event, then one event per
        pub/sub message interleaved with periodic ``ping`` heartbeats.
        """
        # NOTE(review): _channel_key is a private helper of the store; a
        # public accessor would be preferable if one exists.
        channel = RedisNotificationStore._channel_key(user_id)
        heartbeat_seconds = max(settings.NOTIFICATION_SSE_HEARTBEAT_SECONDS, 1)
        pubsub = RedisNotificationStore.get_pubsub()
        try:
            # Subscribe before reading the snapshot so nothing published while
            # the snapshot is being fetched and sent is lost. A notification
            # may therefore appear both in the snapshot and as a live event.
            pubsub.subscribe(channel)
            try:
                initial_notifications, _ = RedisNotificationStore.list(
                    user_id,
                    limit=settings.NOTIFICATION_DEFAULT_PAGE_SIZE,
                    offset=0,
                )
                unread_count = RedisNotificationStore.unread_count(user_id)

                yield f"retry: {settings.NOTIFICATION_SSE_RETRY_MS}\n\n"
                yield _format_sse_event(
                    "connected",
                    {"notifications": initial_notifications, "unread_count": unread_count},
                )
                yield _format_sse_event("unread_count", {"unread_count": unread_count})

                last_ping_at = timezone.now()
                while True:
                    # The 1s timeout bounds how late a heartbeat can be.
                    message = pubsub.get_message(timeout=1.0)
                    if message and message.get("type") == "message":
                        event, data = _decode_pubsub_message(message["data"])
                        yield _format_sse_event(event, data)
                    if (timezone.now() - last_ping_at).total_seconds() >= heartbeat_seconds:
                        last_ping_at = timezone.now()
                        yield _format_sse_event("ping", {"timestamp": last_ping_at.isoformat()})
            finally:
                pubsub.unsubscribe(channel)
        finally:
            # Runs on every exit path, including a failed subscribe, a failed
            # snapshot query, or the client disconnecting during the first frames.
            pubsub.close()

    def get(self, request, *args, **kwargs):
        """Validate the ``token`` query parameter and start the event stream.

        Responds 503 when notifications are disabled and 401 when the token
        is expired or otherwise invalid.
        """
        if not settings.NOTIFICATIONS_ENABLED:
            return JsonResponse({"detail": "Notifications are disabled."}, status=503)
        try:
            user_id = _validate_stream_token(request.GET.get("token"))
        except signing.SignatureExpired:
            # Must precede BadSignature: SignatureExpired is its subclass.
            return JsonResponse({"detail": "Stream token expired."}, status=401)
        except signing.BadSignature:
            return JsonResponse({"detail": "Invalid stream token."}, status=401)

        response = StreamingHttpResponse(
            self._build_stream(user_id),
            content_type="text/event-stream",
        )
        response["Cache-Control"] = "no-cache"
        # Ask nginx-style reverse proxies not to buffer the stream.
        response["X-Accel-Buffering"] = "no"
        return response