Files
guilan-ace-backend/apps/notifications/services.py
Amirhossein Khalili b7b21a6cc6
Some checks failed
Backend CI/CD / test (push) Has been cancelled
Backend CI/CD / deploy (push) Has been cancelled
feat(backend): migrate auth and notifications off email
2026-05-21 10:28:04 +03:30

277 lines
9.7 KiB
Python

from __future__ import annotations
import json
import uuid
from datetime import timedelta
import redis
from django.conf import settings
from django.utils import timezone
from django.utils.dateparse import parse_datetime
# Module-level Redis client shared by everything in this file, built from
# settings.REDIS_URL. decode_responses=True makes replies come back as ``str``
# rather than ``bytes``, which the JSON handling below relies on.
redis_client = redis.StrictRedis.from_url(settings.REDIS_URL, decode_responses=True)
def _isoformat_datetime(value) -> str:
    """Return *value* rendered as an ISO 8601 string.

    Args:
        value: A datetime, a datetime string, or a falsy value.

    Returns:
        * the current time (``timezone.now()``) when *value* is falsy;
        * *value* unchanged when it is a string that cannot be parsed as a
          datetime (either badly formatted or describing an impossible
          date/time);
        * otherwise the datetime converted to local time, with naive
          datetimes first made aware in the current time zone.
    """
    if not value:
        return timezone.now().isoformat()

    if isinstance(value, str):
        try:
            parsed = parse_datetime(value)
        except ValueError:
            # parse_datetime returns None for badly formatted input but raises
            # for well-formatted, impossible values (e.g. month 13). Treat both
            # the same way instead of letting the latter crash the caller.
            parsed = None
        if parsed is None:
            return value
        value = parsed

    if timezone.is_naive(value):
        value = timezone.make_aware(value, timezone.get_current_timezone())
    return timezone.localtime(value).isoformat()
class RedisNotificationStore:
    """Per-user notification storage and event fan-out backed by Redis.

    Keys used for each user:

    * ``notif:<user_id>:ids``  - sorted set of notification ids, scored by the
      creation timestamp (newest first when read with ``ZREVRANGE``).
    * ``notif:<user_id>:data`` - hash mapping notification id to its JSON
      payload.

    ``notif:users`` is a set of every user id that has been given a
    notification; ``cleanup_expired`` iterates it.

    Mutating operations publish JSON events (``{"event": ..., "data": ...}``)
    on the channel ``<NOTIFICATION_REDIS_CHANNEL_PREFIX>:<user_id>``.
    """

    USERS_KEY = "notif:users"

    # ------------------------------------------------------------------ keys

    @classmethod
    def _ids_key(cls, user_id: str) -> str:
        """Return the sorted-set key holding the user's notification ids."""
        return f"notif:{user_id}:ids"

    @classmethod
    def _data_key(cls, user_id: str) -> str:
        """Return the hash key holding the user's notification payloads."""
        return f"notif:{user_id}:data"

    @classmethod
    def _channel_key(cls, user_id: str) -> str:
        """Return the pub/sub channel name for the user."""
        prefix = settings.NOTIFICATION_REDIS_CHANNEL_PREFIX.rstrip(":")
        return f"{prefix}:{user_id}"

    # --------------------------------------------------------------- helpers

    @staticmethod
    def _normalize_notification(data: dict | None) -> dict:
        """Return a notification dict with every expected field present.

        Missing or falsy fields receive defaults: a fresh UUID for ``id``,
        ``"notification"`` for ``type``, ``"info"`` for ``level``, empty
        strings for ``title``/``message``, ``{}`` for ``meta`` and an ISO
        timestamp for ``created_at``.
        """
        payload = dict(data or {})
        return {
            "id": str(payload.get("id") or uuid.uuid4()),
            "type": payload.get("type") or "notification",
            "title": payload.get("title") or "",
            "message": payload.get("message") or "",
            "level": payload.get("level") or "info",
            "created_at": _isoformat_datetime(payload.get("created_at")),
            "is_seen": bool(payload.get("is_seen", False)),
            "delete_on_seen": bool(payload.get("delete_on_seen", False)),
            "action_url": payload.get("action_url"),
            "entity_type": payload.get("entity_type"),
            "entity_id": payload.get("entity_id"),
            "meta": payload.get("meta") or {},
        }

    @classmethod
    def _decode(cls, raw: str | None) -> dict | None:
        """Decode one stored payload, or return ``None`` if it is unusable.

        A payload is unusable when it is missing/empty, is not valid JSON, is
        valid JSON but not an object, or cannot be normalized.
        """
        if not raw:
            return None
        try:
            payload = json.loads(raw)
            if not isinstance(payload, dict):
                return None
            return cls._normalize_notification(payload)
        except (ValueError, TypeError):
            # json.JSONDecodeError is a ValueError; normalization may also
            # raise ValueError/TypeError on malformed field values.
            return None

    @classmethod
    def _publish_event(cls, user_id: str, event: str, data: dict) -> None:
        """Publish *event* on the user's channel unless notifications are disabled."""
        if not settings.NOTIFICATIONS_ENABLED:
            return
        redis_client.publish(
            cls._channel_key(user_id),
            json.dumps({"event": event, "data": data}, ensure_ascii=False, default=str),
        )

    # ------------------------------------------------------------ public API

    @classmethod
    def unread_count(cls, user_id: str, *, type_filter: str | None = None) -> int:
        """Return how many of the user's notifications have ``is_seen`` false.

        When *type_filter* is given only notifications of that type count.
        """
        # paginate=False returns every item, so limit/offset are irrelevant.
        notifications, _ = cls.list(user_id, type_filter=type_filter, paginate=False)
        return sum(1 for notification in notifications if not notification.get("is_seen"))

    @classmethod
    def add(cls, user_id: str, payload: dict) -> dict:
        """Store a notification for *user_id* and publish it.

        Returns the normalized notification that was stored. Publishes a
        ``notification`` event followed by an ``unread_count`` event.
        """
        data = cls._normalize_notification(payload)

        # created_at may be an unparseable string passed through verbatim by
        # the normalizer; fall back to "now" for the sort score in that case.
        try:
            created_at = parse_datetime(data["created_at"])
        except ValueError:
            created_at = None
        if created_at is None:
            created_at = timezone.now()

        json_str = json.dumps(data, ensure_ascii=False, default=str)

        pipe = redis_client.pipeline()
        pipe.zadd(cls._ids_key(user_id), {data["id"]: created_at.timestamp()})
        pipe.hset(cls._data_key(user_id), data["id"], json_str)
        pipe.sadd(cls.USERS_KEY, user_id)
        pipe.execute()

        unread_count = cls.unread_count(user_id)
        cls._publish_event(user_id, "notification", {"notification": data, "unread_count": unread_count})
        cls._publish_event(user_id, "unread_count", {"unread_count": unread_count})
        return data

    @classmethod
    def list(
        cls,
        user_id: str,
        *,
        limit: int | None = None,
        offset: int = 0,
        type_filter: str | None = None,
        paginate: bool = True,
    ) -> tuple[list[dict], int]:
        """Return ``(notifications, total_count)`` for the user, newest first.

        Args:
            limit: Page size; falls back to
                ``settings.NOTIFICATION_DEFAULT_PAGE_SIZE`` and is at least 1.
            offset: Index of the first item; negative values are treated as 0.
            type_filter: When set, only notifications of this type are kept.
            paginate: When false, every matching item is returned and
                *limit*/*offset* are ignored.

        ``total_count`` is the number of matching items before pagination.
        Ids whose payload is missing or unusable are removed from Redis as a
        side effect.
        """
        ids_key = cls._ids_key(user_id)
        data_key = cls._data_key(user_id)

        ids = redis_client.zrevrange(ids_key, 0, -1)
        if not ids:
            return [], 0

        # One HMGET returns the payloads in the same order as the ids.
        raw_items = redis_client.hmget(data_key, ids)

        items: list[dict] = []
        cleanup_ids: list[str] = []
        for notif_id, raw in zip(ids, raw_items, strict=False):
            data = cls._decode(raw)
            if data is None:
                cleanup_ids.append(notif_id)
            elif not type_filter or data.get("type") == type_filter:
                items.append(data)

        if cleanup_ids:
            pipe = redis_client.pipeline()
            pipe.zrem(ids_key, *cleanup_ids)
            pipe.hdel(data_key, *cleanup_ids)
            pipe.execute()

        total_count = len(items)
        if not paginate:
            return items, total_count

        safe_offset = max(offset, 0)
        safe_limit = max(limit or settings.NOTIFICATION_DEFAULT_PAGE_SIZE, 1)
        return items[safe_offset : safe_offset + safe_limit], total_count

    @classmethod
    def get(cls, user_id: str, notif_id: str) -> dict | None:
        """Return one normalized notification, or ``None`` if absent or unusable."""
        return cls._decode(redis_client.hget(cls._data_key(user_id), notif_id))

    @classmethod
    def delete(cls, user_id: str, notif_id: str) -> bool:
        """Delete one notification.

        Returns ``True`` when something was removed; in that case a
        ``notification_seen`` event (with ``deleted: True``) and an
        ``unread_count`` event are published.
        """
        pipe = redis_client.pipeline()
        pipe.zrem(cls._ids_key(user_id), notif_id)
        pipe.hdel(cls._data_key(user_id), notif_id)
        result = pipe.execute()

        if not any(result):
            return False

        unread_count = cls.unread_count(user_id)
        cls._publish_event(
            user_id,
            "notification_seen",
            {"notification_id": notif_id, "deleted": True, "unread_count": unread_count},
        )
        cls._publish_event(user_id, "unread_count", {"unread_count": unread_count})
        return True

    @classmethod
    def mark_seen(cls, user_id: str, notif_id: str) -> dict | None:
        """Mark one notification as seen, deleting it if ``delete_on_seen`` is set.

        Returns a result dict (``notification_id``, ``deleted``,
        ``notification``, ``unread_count``), or ``None`` when the notification
        does not exist or could not be deleted.
        """
        data = cls.get(user_id, notif_id)
        if not data:
            return None

        if data.get("delete_on_seen"):
            # delete() publishes the events for this case itself.
            if not cls.delete(user_id, notif_id):
                return None
            return {
                "notification_id": notif_id,
                "deleted": True,
                "notification": None,
                "unread_count": cls.unread_count(user_id),
            }

        if not data.get("is_seen"):
            data["is_seen"] = True
            redis_client.hset(
                cls._data_key(user_id),
                notif_id,
                json.dumps(data, ensure_ascii=False, default=str),
            )

        unread_count = cls.unread_count(user_id)
        payload = {
            "notification_id": notif_id,
            "deleted": False,
            "notification": data,
            "unread_count": unread_count,
        }
        cls._publish_event(user_id, "notification_seen", payload)
        cls._publish_event(user_id, "unread_count", {"unread_count": unread_count})
        return payload

    @classmethod
    def mark_all_seen(cls, user_id: str, *, type_filter: str | None = None) -> int:
        """Mark every (optionally type-filtered) notification as seen.

        Notifications flagged ``delete_on_seen`` are deleted instead. Returns
        the number of notifications processed (this includes ones that were
        already seen). When at least one was processed, publishes
        ``notification_mark_all_read`` with the filtered unread count and
        ``unread_count`` with the overall unread count.
        """
        ids_key = cls._ids_key(user_id)
        data_key = cls._data_key(user_id)

        ids = redis_client.zrevrange(ids_key, 0, -1)
        if not ids:
            return 0

        # Fetch all payloads in a single round trip rather than one HGET per id.
        raw_items = redis_client.hmget(data_key, ids)

        updated = 0
        pipe = redis_client.pipeline()
        for notif_id, raw in zip(ids, raw_items, strict=False):
            data = cls._decode(raw)
            if data is None:
                continue
            if type_filter and data.get("type") != type_filter:
                continue
            if data.get("delete_on_seen"):
                pipe.zrem(ids_key, notif_id)
                pipe.hdel(data_key, notif_id)
            else:
                data["is_seen"] = True
                pipe.hset(data_key, notif_id, json.dumps(data, ensure_ascii=False, default=str))
            updated += 1

        if updated:
            pipe.execute()
            unread_count = cls.unread_count(user_id, type_filter=type_filter)
            cls._publish_event(
                user_id,
                "notification_mark_all_read",
                {"type": type_filter, "unread_count": unread_count},
            )
            # Without a filter the overall count equals the one just computed.
            total_unread = cls.unread_count(user_id) if type_filter else unread_count
            cls._publish_event(user_id, "unread_count", {"unread_count": total_unread})
        return updated

    @classmethod
    def get_pubsub(cls):
        """Return a new pub/sub object that ignores subscribe confirmations."""
        return redis_client.pubsub(ignore_subscribe_messages=True)

    @classmethod
    def cleanup_expired(cls, retention_days: int | None = None) -> int:
        """Delete notifications older than the retention window for all users.

        Args:
            retention_days: Age limit in days. ``None`` (or 0) falls back to
                ``settings.NOTIFICATION_RETENTION_DAYS``.

        Returns:
            The number of notification ids removed.

        Users left without any notification ids are dropped from the
        ``notif:users`` set.
        """
        days = retention_days or settings.NOTIFICATION_RETENTION_DAYS
        cutoff_ts = (timezone.now() - timedelta(days=days)).timestamp()

        removed = 0
        for user_id in redis_client.smembers(cls.USERS_KEY):
            ids_key = cls._ids_key(user_id)
            data_key = cls._data_key(user_id)

            old_ids = redis_client.zrangebyscore(ids_key, "-inf", cutoff_ts)
            if old_ids:
                pipe = redis_client.pipeline()
                pipe.zrem(ids_key, *old_ids)
                pipe.hdel(data_key, *old_ids)
                pipe.execute()
                removed += len(old_ids)

            # Checked even when nothing expired, so users whose notifications
            # were all deleted by other means do not stay registered forever.
            if redis_client.zcard(ids_key) == 0:
                redis_client.srem(cls.USERS_KEY, user_id)
        return removed
def notify_user(user_id: int | str, payload: dict) -> dict:
    """Store and publish a notification for a user.

    *user_id* is converted to ``str`` before being handed to
    ``RedisNotificationStore.add``; the stored notification is returned.
    """
    recipient = str(user_id)
    stored = RedisNotificationStore.add(recipient, payload)
    return stored