feat(backend): migrate auth and notifications off email
Some checks failed
Backend CI/CD / test (push) Has been cancelled
Backend CI/CD / deploy (push) Has been cancelled

This commit is contained in:
2026-05-21 10:28:04 +03:30
parent b4903f7cb1
commit b7b21a6cc6
35 changed files with 2784 additions and 1390 deletions

View File

@@ -0,0 +1 @@
"""Redis-backed in-app notifications."""

View File

@@ -0,0 +1 @@
"""Notifications API package."""

View File

@@ -0,0 +1,52 @@
from datetime import datetime
from typing import Any
from ninja import Schema
class NotificationSchema(Schema):
id: str
type: str
title: str
message: str
level: str
created_at: datetime | str
is_seen: bool
delete_on_seen: bool
action_url: str | None = None
entity_type: str | None = None
entity_id: int | str | None = None
meta: dict[str, Any] = {}
class NotificationListSchema(Schema):
count: int
unread_count: int
notifications: list[NotificationSchema]
class NotificationSeenIn(Schema):
id: str
class NotificationSeenResponseSchema(Schema):
marked_read: bool
notification_id: str | None = None
deleted: bool = False
notification: NotificationSchema | None = None
unread_count: int | None = None
class NotificationDeleteResponseSchema(Schema):
deleted: bool
notification_id: str | None = None
unread_count: int | None = None
class NotificationMarkAllReadResponseSchema(Schema):
marked_read: int
class NotificationStreamTokenResponseSchema(Schema):
token: str
expires_in: int

View File

@@ -0,0 +1,156 @@
import json
from typing import Iterator
from django.conf import settings
from django.core import signing
from django.http import JsonResponse, StreamingHttpResponse
from django.utils import timezone
from django.views import View
from ninja import Router
from apps.notifications.api.schemas import (
NotificationDeleteResponseSchema,
NotificationListSchema,
NotificationMarkAllReadResponseSchema,
NotificationSeenIn,
NotificationSeenResponseSchema,
NotificationStreamTokenResponseSchema,
)
from apps.notifications.services import RedisNotificationStore
from core.api.schemas import ErrorSchema
from core.authentication import jwt_auth
notifications_router = Router()
STREAM_TOKEN_SALT = "notifications.stream"
def _safe_int(value, default: int) -> int:
try:
return max(int(value), 0)
except (TypeError, ValueError):
return default
def _format_sse_event(event: str, data: dict) -> str:
payload = json.dumps(data, ensure_ascii=False, default=str)
return f"event: {event}\ndata: {payload}\n\n"
def _issue_stream_token_for_user(user_id: str) -> str:
return signing.dumps({"user_id": str(user_id), "type": "notification_stream"}, salt=STREAM_TOKEN_SALT)
def _validate_stream_token(token: str | None) -> str:
if not token:
raise signing.BadSignature("Missing stream token")
payload = signing.loads(
token,
salt=STREAM_TOKEN_SALT,
max_age=settings.NOTIFICATION_STREAM_TOKEN_LIFETIME_SECONDS,
)
if payload.get("type") != "notification_stream":
raise signing.BadSignature("Invalid stream token type")
return str(payload["user_id"])
@notifications_router.get("/", response=NotificationListSchema, auth=jwt_auth)
def list_notifications(request, limit: int | None = None, offset: int = 0, type: str | None = None):
user_id = str(request.auth.id)
safe_limit = min(_safe_int(limit, settings.NOTIFICATION_DEFAULT_PAGE_SIZE), settings.NOTIFICATION_MAX_PAGE_SIZE)
notifications, total_count = RedisNotificationStore.list(
user_id,
limit=safe_limit,
offset=_safe_int(offset, 0),
type_filter=type,
)
return {
"count": total_count,
"unread_count": RedisNotificationStore.unread_count(user_id),
"notifications": notifications,
}
@notifications_router.post("/mark-seen", response={200: NotificationSeenResponseSchema, 404: ErrorSchema}, auth=jwt_auth)
def mark_notification_seen(request, data: NotificationSeenIn):
payload = RedisNotificationStore.mark_seen(str(request.auth.id), data.id)
if payload is None:
return 404, {"error": "Notification not found."}
return 200, {"marked_read": True, **payload}
@notifications_router.delete("/{notif_id}", response={200: NotificationDeleteResponseSchema, 404: ErrorSchema}, auth=jwt_auth)
def delete_notification(request, notif_id: str):
deleted = RedisNotificationStore.delete(str(request.auth.id), notif_id)
if not deleted:
return 404, {"error": "Notification not found."}
return 200, {
"deleted": True,
"notification_id": notif_id,
"unread_count": RedisNotificationStore.unread_count(str(request.auth.id)),
}
@notifications_router.post("/mark-all-read", response=NotificationMarkAllReadResponseSchema, auth=jwt_auth)
def mark_all_notifications_read(request, type: str | None = None):
updated = RedisNotificationStore.mark_all_seen(str(request.auth.id), type_filter=type)
return {"marked_read": updated}
@notifications_router.post("/stream-token", response={200: NotificationStreamTokenResponseSchema, 503: ErrorSchema}, auth=jwt_auth)
def get_notification_stream_token(request):
if not settings.NOTIFICATIONS_ENABLED:
return 503, {"error": "Notifications are disabled."}
return {
"token": _issue_stream_token_for_user(str(request.auth.id)),
"expires_in": settings.NOTIFICATION_STREAM_TOKEN_LIFETIME_SECONDS,
}
class NotificationStreamView(View):
def _build_stream(self, user_id: str) -> Iterator[str]:
pubsub = RedisNotificationStore.get_pubsub()
channel = RedisNotificationStore._channel_key(user_id)
heartbeat_seconds = max(settings.NOTIFICATION_SSE_HEARTBEAT_SECONDS, 1)
initial_notifications, _ = RedisNotificationStore.list(
user_id,
limit=settings.NOTIFICATION_DEFAULT_PAGE_SIZE,
offset=0,
)
unread_count = RedisNotificationStore.unread_count(user_id)
yield f"retry: {settings.NOTIFICATION_SSE_RETRY_MS}\n\n"
yield _format_sse_event("connected", {"notifications": initial_notifications, "unread_count": unread_count})
yield _format_sse_event("unread_count", {"unread_count": unread_count})
pubsub.subscribe(channel)
last_ping_at = timezone.now()
try:
while True:
message = pubsub.get_message(timeout=1.0)
if message and message.get("type") == "message":
try:
payload = json.loads(message["data"])
except json.JSONDecodeError:
payload = {"event": "notification", "data": {}}
yield _format_sse_event(payload.get("event") or "notification", payload.get("data") or {})
if (timezone.now() - last_ping_at).total_seconds() >= heartbeat_seconds:
last_ping_at = timezone.now()
yield _format_sse_event("ping", {"timestamp": last_ping_at.isoformat()})
finally:
try:
pubsub.unsubscribe(channel)
finally:
pubsub.close()
def get(self, request, *args, **kwargs):
if not settings.NOTIFICATIONS_ENABLED:
return JsonResponse({"detail": "Notifications are disabled."}, status=503)
try:
user_id = _validate_stream_token(request.GET.get("token"))
except signing.SignatureExpired:
return JsonResponse({"detail": "Stream token expired."}, status=401)
except signing.BadSignature:
return JsonResponse({"detail": "Invalid stream token."}, status=401)
response = StreamingHttpResponse(self._build_stream(user_id), content_type="text/event-stream")
response["Cache-Control"] = "no-cache"
response["X-Accel-Buffering"] = "no"
return response

View File

@@ -0,0 +1,7 @@
from django.apps import AppConfig
class NotificationsConfig(AppConfig):
default_auto_field = "django.db.models.BigAutoField"
name = "apps.notifications"
label = "notifications"

View File

@@ -0,0 +1,276 @@
from __future__ import annotations
import json
import uuid
from datetime import timedelta
import redis
from django.conf import settings
from django.utils import timezone
from django.utils.dateparse import parse_datetime
redis_client = redis.StrictRedis.from_url(settings.REDIS_URL, decode_responses=True)
def _isoformat_datetime(value) -> str:
if not value:
return timezone.now().isoformat()
if isinstance(value, str):
parsed = parse_datetime(value)
if parsed is not None:
value = parsed
else:
return value
if timezone.is_naive(value):
value = timezone.make_aware(value, timezone.get_current_timezone())
return timezone.localtime(value).isoformat()
class RedisNotificationStore:
USERS_KEY = "notif:users"
@classmethod
def _ids_key(cls, user_id: str) -> str:
return f"notif:{user_id}:ids"
@classmethod
def _data_key(cls, user_id: str) -> str:
return f"notif:{user_id}:data"
@classmethod
def _channel_key(cls, user_id: str) -> str:
prefix = settings.NOTIFICATION_REDIS_CHANNEL_PREFIX.rstrip(":")
return f"{prefix}:{user_id}"
@staticmethod
def _normalize_notification(data: dict | None) -> dict:
payload = dict(data or {})
return {
"id": str(payload.get("id") or uuid.uuid4()),
"type": payload.get("type") or "notification",
"title": payload.get("title") or "",
"message": payload.get("message") or "",
"level": payload.get("level") or "info",
"created_at": _isoformat_datetime(payload.get("created_at")),
"is_seen": bool(payload.get("is_seen", False)),
"delete_on_seen": bool(payload.get("delete_on_seen", False)),
"action_url": payload.get("action_url"),
"entity_type": payload.get("entity_type"),
"entity_id": payload.get("entity_id"),
"meta": payload.get("meta") or {},
}
@classmethod
def _publish_event(cls, user_id: str, event: str, data: dict) -> None:
if not settings.NOTIFICATIONS_ENABLED:
return
redis_client.publish(
cls._channel_key(user_id),
json.dumps({"event": event, "data": data}, ensure_ascii=False, default=str),
)
@classmethod
def unread_count(cls, user_id: str, *, type_filter: str | None = None) -> int:
notifications, _ = cls.list(
user_id,
limit=settings.NOTIFICATION_MAX_PAGE_SIZE,
offset=0,
type_filter=type_filter,
paginate=False,
)
return sum(1 for notification in notifications if not notification.get("is_seen"))
@classmethod
def add(cls, user_id: str, payload: dict) -> dict:
data = cls._normalize_notification(payload)
created_at = parse_datetime(data["created_at"]) or timezone.now()
created_at_ts = created_at.timestamp()
json_str = json.dumps(data, ensure_ascii=False, default=str)
ids_key = cls._ids_key(user_id)
data_key = cls._data_key(user_id)
pipe = redis_client.pipeline()
pipe.zadd(ids_key, {data["id"]: created_at_ts})
pipe.hset(data_key, data["id"], json_str)
pipe.sadd(cls.USERS_KEY, user_id)
pipe.execute()
unread_count = cls.unread_count(user_id)
cls._publish_event(user_id, "notification", {"notification": data, "unread_count": unread_count})
cls._publish_event(user_id, "unread_count", {"unread_count": unread_count})
return data
@classmethod
def list(
cls,
user_id: str,
*,
limit: int | None = None,
offset: int = 0,
type_filter: str | None = None,
paginate: bool = True,
) -> tuple[list[dict], int]:
ids_key = cls._ids_key(user_id)
data_key = cls._data_key(user_id)
ids = redis_client.zrevrange(ids_key, 0, -1)
if not ids:
return [], 0
pipe = redis_client.pipeline()
for notif_id in ids:
pipe.hget(data_key, notif_id)
raw_items = pipe.execute()
items: list[dict] = []
cleanup_ids: list[str] = []
for notif_id, raw in zip(ids, raw_items, strict=False):
if not raw:
cleanup_ids.append(notif_id)
continue
try:
data = cls._normalize_notification(json.loads(raw))
except json.JSONDecodeError:
cleanup_ids.append(notif_id)
continue
if type_filter and data.get("type") != type_filter:
continue
items.append(data)
if cleanup_ids:
redis_client.zrem(ids_key, *cleanup_ids)
redis_client.hdel(data_key, *cleanup_ids)
total_count = len(items)
if not paginate:
return items, total_count
safe_offset = max(offset, 0)
safe_limit = max(limit or settings.NOTIFICATION_DEFAULT_PAGE_SIZE, 1)
return items[safe_offset : safe_offset + safe_limit], total_count
@classmethod
def get(cls, user_id: str, notif_id: str) -> dict | None:
raw = redis_client.hget(cls._data_key(user_id), notif_id)
if not raw:
return None
try:
return cls._normalize_notification(json.loads(raw))
except json.JSONDecodeError:
return None
@classmethod
def delete(cls, user_id: str, notif_id: str) -> bool:
ids_key = cls._ids_key(user_id)
data_key = cls._data_key(user_id)
pipe = redis_client.pipeline()
pipe.zrem(ids_key, notif_id)
pipe.hdel(data_key, notif_id)
result = pipe.execute()
if any(result):
unread_count = cls.unread_count(user_id)
cls._publish_event(
user_id,
"notification_seen",
{"notification_id": notif_id, "deleted": True, "unread_count": unread_count},
)
cls._publish_event(user_id, "unread_count", {"unread_count": unread_count})
return True
return False
@classmethod
def mark_seen(cls, user_id: str, notif_id: str) -> dict | None:
data = cls.get(user_id, notif_id)
if not data:
return None
if data.get("delete_on_seen"):
deleted = cls.delete(user_id, notif_id)
if deleted:
return {
"notification_id": notif_id,
"deleted": True,
"notification": None,
"unread_count": cls.unread_count(user_id),
}
return None
if not data.get("is_seen"):
data["is_seen"] = True
redis_client.hset(
cls._data_key(user_id),
notif_id,
json.dumps(data, ensure_ascii=False, default=str),
)
unread_count = cls.unread_count(user_id)
payload = {
"notification_id": notif_id,
"deleted": False,
"notification": data,
"unread_count": unread_count,
}
cls._publish_event(user_id, "notification_seen", payload)
cls._publish_event(user_id, "unread_count", {"unread_count": unread_count})
return payload
@classmethod
def mark_all_seen(cls, user_id: str, *, type_filter: str | None = None) -> int:
ids = redis_client.zrevrange(cls._ids_key(user_id), 0, -1)
if not ids:
return 0
updated = 0
pipe = redis_client.pipeline()
for notif_id in ids:
raw = redis_client.hget(cls._data_key(user_id), notif_id)
if not raw:
continue
try:
data = cls._normalize_notification(json.loads(raw))
except json.JSONDecodeError:
continue
if type_filter and data.get("type") != type_filter:
continue
if data.get("delete_on_seen"):
pipe.zrem(cls._ids_key(user_id), notif_id)
pipe.hdel(cls._data_key(user_id), notif_id)
else:
data["is_seen"] = True
pipe.hset(
cls._data_key(user_id),
notif_id,
json.dumps(data, ensure_ascii=False, default=str),
)
updated += 1
if updated:
pipe.execute()
unread_count = cls.unread_count(user_id, type_filter=type_filter)
cls._publish_event(user_id, "notification_mark_all_read", {"type": type_filter, "unread_count": unread_count})
cls._publish_event(user_id, "unread_count", {"unread_count": cls.unread_count(user_id)})
return updated
@classmethod
def get_pubsub(cls):
return redis_client.pubsub(ignore_subscribe_messages=True)
@classmethod
def cleanup_expired(cls, retention_days: int | None = None) -> int:
days = retention_days or settings.NOTIFICATION_RETENTION_DAYS
cutoff_ts = (timezone.now() - timedelta(days=days)).timestamp()
removed = 0
for user_id in redis_client.smembers(cls.USERS_KEY):
ids_key = cls._ids_key(user_id)
data_key = cls._data_key(user_id)
old_ids = redis_client.zrangebyscore(ids_key, "-inf", cutoff_ts)
if not old_ids:
continue
pipe = redis_client.pipeline()
for notif_id in old_ids:
pipe.zrem(ids_key, notif_id)
pipe.hdel(data_key, notif_id)
removed += 1
pipe.execute()
if redis_client.zcard(ids_key) == 0:
redis_client.srem(cls.USERS_KEY, user_id)
return removed
def notify_user(user_id: int | str, payload: dict) -> dict:
return RedisNotificationStore.add(str(user_id), payload)

View File

@@ -0,0 +1,14 @@
import logging
from celery import shared_task
from apps.notifications.services import RedisNotificationStore
logger = logging.getLogger(__name__)
@shared_task
def cleanup_notification_retention():
removed = RedisNotificationStore.cleanup_expired()
logger.info("Cleaned up %s expired notifications", removed)
return removed