157 lines
6.3 KiB
Python
157 lines
6.3 KiB
Python
import json
|
|
from typing import Iterator
|
|
|
|
from django.conf import settings
|
|
from django.core import signing
|
|
from django.http import JsonResponse, StreamingHttpResponse
|
|
from django.utils import timezone
|
|
from django.views import View
|
|
from ninja import Router
|
|
|
|
from apps.notifications.api.schemas import (
|
|
NotificationDeleteResponseSchema,
|
|
NotificationListSchema,
|
|
NotificationMarkAllReadResponseSchema,
|
|
NotificationSeenIn,
|
|
NotificationSeenResponseSchema,
|
|
NotificationStreamTokenResponseSchema,
|
|
)
|
|
from apps.notifications.services import RedisNotificationStore
|
|
from core.api.schemas import ErrorSchema
|
|
from core.authentication import jwt_auth
|
|
|
|
notifications_router = Router()
|
|
STREAM_TOKEN_SALT = "notifications.stream"
|
|
|
|
|
|
def _safe_int(value, default: int) -> int:
|
|
try:
|
|
return max(int(value), 0)
|
|
except (TypeError, ValueError):
|
|
return default
|
|
|
|
|
|
def _format_sse_event(event: str, data: dict) -> str:
|
|
payload = json.dumps(data, ensure_ascii=False, default=str)
|
|
return f"event: {event}\ndata: {payload}\n\n"
|
|
|
|
|
|
def _issue_stream_token_for_user(user_id: str) -> str:
|
|
return signing.dumps({"user_id": str(user_id), "type": "notification_stream"}, salt=STREAM_TOKEN_SALT)
|
|
|
|
|
|
def _validate_stream_token(token: str | None) -> str:
|
|
if not token:
|
|
raise signing.BadSignature("Missing stream token")
|
|
payload = signing.loads(
|
|
token,
|
|
salt=STREAM_TOKEN_SALT,
|
|
max_age=settings.NOTIFICATION_STREAM_TOKEN_LIFETIME_SECONDS,
|
|
)
|
|
if payload.get("type") != "notification_stream":
|
|
raise signing.BadSignature("Invalid stream token type")
|
|
return str(payload["user_id"])
|
|
|
|
|
|
@notifications_router.get("/", response=NotificationListSchema, auth=jwt_auth)
|
|
def list_notifications(request, limit: int | None = None, offset: int = 0, type: str | None = None):
|
|
user_id = str(request.auth.id)
|
|
safe_limit = min(_safe_int(limit, settings.NOTIFICATION_DEFAULT_PAGE_SIZE), settings.NOTIFICATION_MAX_PAGE_SIZE)
|
|
notifications, total_count = RedisNotificationStore.list(
|
|
user_id,
|
|
limit=safe_limit,
|
|
offset=_safe_int(offset, 0),
|
|
type_filter=type,
|
|
)
|
|
return {
|
|
"count": total_count,
|
|
"unread_count": RedisNotificationStore.unread_count(user_id),
|
|
"notifications": notifications,
|
|
}
|
|
|
|
|
|
@notifications_router.post("/mark-seen", response={200: NotificationSeenResponseSchema, 404: ErrorSchema}, auth=jwt_auth)
|
|
def mark_notification_seen(request, data: NotificationSeenIn):
|
|
payload = RedisNotificationStore.mark_seen(str(request.auth.id), data.id)
|
|
if payload is None:
|
|
return 404, {"error": "Notification not found."}
|
|
return 200, {"marked_read": True, **payload}
|
|
|
|
|
|
@notifications_router.delete("/{notif_id}", response={200: NotificationDeleteResponseSchema, 404: ErrorSchema}, auth=jwt_auth)
|
|
def delete_notification(request, notif_id: str):
|
|
deleted = RedisNotificationStore.delete(str(request.auth.id), notif_id)
|
|
if not deleted:
|
|
return 404, {"error": "Notification not found."}
|
|
return 200, {
|
|
"deleted": True,
|
|
"notification_id": notif_id,
|
|
"unread_count": RedisNotificationStore.unread_count(str(request.auth.id)),
|
|
}
|
|
|
|
|
|
@notifications_router.post("/mark-all-read", response=NotificationMarkAllReadResponseSchema, auth=jwt_auth)
|
|
def mark_all_notifications_read(request, type: str | None = None):
|
|
updated = RedisNotificationStore.mark_all_seen(str(request.auth.id), type_filter=type)
|
|
return {"marked_read": updated}
|
|
|
|
|
|
@notifications_router.post("/stream-token", response={200: NotificationStreamTokenResponseSchema, 503: ErrorSchema}, auth=jwt_auth)
|
|
def get_notification_stream_token(request):
|
|
if not settings.NOTIFICATIONS_ENABLED:
|
|
return 503, {"error": "Notifications are disabled."}
|
|
return {
|
|
"token": _issue_stream_token_for_user(str(request.auth.id)),
|
|
"expires_in": settings.NOTIFICATION_STREAM_TOKEN_LIFETIME_SECONDS,
|
|
}
|
|
|
|
|
|
class NotificationStreamView(View):
|
|
def _build_stream(self, user_id: str) -> Iterator[str]:
|
|
pubsub = RedisNotificationStore.get_pubsub()
|
|
channel = RedisNotificationStore._channel_key(user_id)
|
|
heartbeat_seconds = max(settings.NOTIFICATION_SSE_HEARTBEAT_SECONDS, 1)
|
|
initial_notifications, _ = RedisNotificationStore.list(
|
|
user_id,
|
|
limit=settings.NOTIFICATION_DEFAULT_PAGE_SIZE,
|
|
offset=0,
|
|
)
|
|
unread_count = RedisNotificationStore.unread_count(user_id)
|
|
yield f"retry: {settings.NOTIFICATION_SSE_RETRY_MS}\n\n"
|
|
yield _format_sse_event("connected", {"notifications": initial_notifications, "unread_count": unread_count})
|
|
yield _format_sse_event("unread_count", {"unread_count": unread_count})
|
|
|
|
pubsub.subscribe(channel)
|
|
last_ping_at = timezone.now()
|
|
try:
|
|
while True:
|
|
message = pubsub.get_message(timeout=1.0)
|
|
if message and message.get("type") == "message":
|
|
try:
|
|
payload = json.loads(message["data"])
|
|
except json.JSONDecodeError:
|
|
payload = {"event": "notification", "data": {}}
|
|
yield _format_sse_event(payload.get("event") or "notification", payload.get("data") or {})
|
|
if (timezone.now() - last_ping_at).total_seconds() >= heartbeat_seconds:
|
|
last_ping_at = timezone.now()
|
|
yield _format_sse_event("ping", {"timestamp": last_ping_at.isoformat()})
|
|
finally:
|
|
try:
|
|
pubsub.unsubscribe(channel)
|
|
finally:
|
|
pubsub.close()
|
|
|
|
def get(self, request, *args, **kwargs):
|
|
if not settings.NOTIFICATIONS_ENABLED:
|
|
return JsonResponse({"detail": "Notifications are disabled."}, status=503)
|
|
try:
|
|
user_id = _validate_stream_token(request.GET.get("token"))
|
|
except signing.SignatureExpired:
|
|
return JsonResponse({"detail": "Stream token expired."}, status=401)
|
|
except signing.BadSignature:
|
|
return JsonResponse({"detail": "Invalid stream token."}, status=401)
|
|
response = StreamingHttpResponse(self._build_stream(user_id), content_type="text/event-stream")
|
|
response["Cache-Control"] = "no-cache"
|
|
response["X-Accel-Buffering"] = "no"
|
|
return response
|