Files
Amirhossein Khalili b7b21a6cc6
Some checks failed
Backend CI/CD / test (push) Has been cancelled
Backend CI/CD / deploy (push) Has been cancelled
feat(backend): migrate auth and notifications off email
2026-05-21 10:28:04 +03:30

157 lines
6.3 KiB
Python

import json
from typing import Iterator
from django.conf import settings
from django.core import signing
from django.http import JsonResponse, StreamingHttpResponse
from django.utils import timezone
from django.views import View
from ninja import Router
from apps.notifications.api.schemas import (
NotificationDeleteResponseSchema,
NotificationListSchema,
NotificationMarkAllReadResponseSchema,
NotificationSeenIn,
NotificationSeenResponseSchema,
NotificationStreamTokenResponseSchema,
)
from apps.notifications.services import RedisNotificationStore
from core.api.schemas import ErrorSchema
from core.authentication import jwt_auth
notifications_router = Router()
STREAM_TOKEN_SALT = "notifications.stream"
def _safe_int(value, default: int) -> int:
try:
return max(int(value), 0)
except (TypeError, ValueError):
return default
def _format_sse_event(event: str, data: dict) -> str:
payload = json.dumps(data, ensure_ascii=False, default=str)
return f"event: {event}\ndata: {payload}\n\n"
def _issue_stream_token_for_user(user_id: str) -> str:
return signing.dumps({"user_id": str(user_id), "type": "notification_stream"}, salt=STREAM_TOKEN_SALT)
def _validate_stream_token(token: str | None) -> str:
if not token:
raise signing.BadSignature("Missing stream token")
payload = signing.loads(
token,
salt=STREAM_TOKEN_SALT,
max_age=settings.NOTIFICATION_STREAM_TOKEN_LIFETIME_SECONDS,
)
if payload.get("type") != "notification_stream":
raise signing.BadSignature("Invalid stream token type")
return str(payload["user_id"])
@notifications_router.get("/", response=NotificationListSchema, auth=jwt_auth)
def list_notifications(request, limit: int | None = None, offset: int = 0, type: str | None = None):
user_id = str(request.auth.id)
safe_limit = min(_safe_int(limit, settings.NOTIFICATION_DEFAULT_PAGE_SIZE), settings.NOTIFICATION_MAX_PAGE_SIZE)
notifications, total_count = RedisNotificationStore.list(
user_id,
limit=safe_limit,
offset=_safe_int(offset, 0),
type_filter=type,
)
return {
"count": total_count,
"unread_count": RedisNotificationStore.unread_count(user_id),
"notifications": notifications,
}
@notifications_router.post("/mark-seen", response={200: NotificationSeenResponseSchema, 404: ErrorSchema}, auth=jwt_auth)
def mark_notification_seen(request, data: NotificationSeenIn):
payload = RedisNotificationStore.mark_seen(str(request.auth.id), data.id)
if payload is None:
return 404, {"error": "Notification not found."}
return 200, {"marked_read": True, **payload}
@notifications_router.delete("/{notif_id}", response={200: NotificationDeleteResponseSchema, 404: ErrorSchema}, auth=jwt_auth)
def delete_notification(request, notif_id: str):
deleted = RedisNotificationStore.delete(str(request.auth.id), notif_id)
if not deleted:
return 404, {"error": "Notification not found."}
return 200, {
"deleted": True,
"notification_id": notif_id,
"unread_count": RedisNotificationStore.unread_count(str(request.auth.id)),
}
@notifications_router.post("/mark-all-read", response=NotificationMarkAllReadResponseSchema, auth=jwt_auth)
def mark_all_notifications_read(request, type: str | None = None):
updated = RedisNotificationStore.mark_all_seen(str(request.auth.id), type_filter=type)
return {"marked_read": updated}
@notifications_router.post("/stream-token", response={200: NotificationStreamTokenResponseSchema, 503: ErrorSchema}, auth=jwt_auth)
def get_notification_stream_token(request):
if not settings.NOTIFICATIONS_ENABLED:
return 503, {"error": "Notifications are disabled."}
return {
"token": _issue_stream_token_for_user(str(request.auth.id)),
"expires_in": settings.NOTIFICATION_STREAM_TOKEN_LIFETIME_SECONDS,
}
class NotificationStreamView(View):
def _build_stream(self, user_id: str) -> Iterator[str]:
pubsub = RedisNotificationStore.get_pubsub()
channel = RedisNotificationStore._channel_key(user_id)
heartbeat_seconds = max(settings.NOTIFICATION_SSE_HEARTBEAT_SECONDS, 1)
initial_notifications, _ = RedisNotificationStore.list(
user_id,
limit=settings.NOTIFICATION_DEFAULT_PAGE_SIZE,
offset=0,
)
unread_count = RedisNotificationStore.unread_count(user_id)
yield f"retry: {settings.NOTIFICATION_SSE_RETRY_MS}\n\n"
yield _format_sse_event("connected", {"notifications": initial_notifications, "unread_count": unread_count})
yield _format_sse_event("unread_count", {"unread_count": unread_count})
pubsub.subscribe(channel)
last_ping_at = timezone.now()
try:
while True:
message = pubsub.get_message(timeout=1.0)
if message and message.get("type") == "message":
try:
payload = json.loads(message["data"])
except json.JSONDecodeError:
payload = {"event": "notification", "data": {}}
yield _format_sse_event(payload.get("event") or "notification", payload.get("data") or {})
if (timezone.now() - last_ping_at).total_seconds() >= heartbeat_seconds:
last_ping_at = timezone.now()
yield _format_sse_event("ping", {"timestamp": last_ping_at.isoformat()})
finally:
try:
pubsub.unsubscribe(channel)
finally:
pubsub.close()
def get(self, request, *args, **kwargs):
if not settings.NOTIFICATIONS_ENABLED:
return JsonResponse({"detail": "Notifications are disabled."}, status=503)
try:
user_id = _validate_stream_token(request.GET.get("token"))
except signing.SignatureExpired:
return JsonResponse({"detail": "Stream token expired."}, status=401)
except signing.BadSignature:
return JsonResponse({"detail": "Invalid stream token."}, status=401)
response = StreamingHttpResponse(self._build_stream(user_id), content_type="text/event-stream")
response["Cache-Control"] = "no-cache"
response["X-Accel-Buffering"] = "no"
return response