368 lines
12 KiB
Python
368 lines
12 KiB
Python
import json
|
|
import uuid
|
|
from datetime import timedelta
|
|
|
|
import redis
|
|
from django.conf import settings
|
|
from django.utils import timezone
|
|
from django.utils.dateparse import parse_datetime
|
|
|
|
# Module-level Redis client shared by every RedisNotificationStore method.
# decode_responses=True asks redis-py to return str values instead of bytes,
# which is why the code below can pass replies straight to json.loads().
redis_client = redis.StrictRedis.from_url(settings.REDIS_URL, decode_responses=True)
|
|
|
|
|
|
def _isoformat_datetime(value) -> str:
    """Return *value* as an ISO-8601 string in the current time zone.

    Args:
        value: A ``datetime``, a datetime string, or a falsy value.

    Returns:
        * the current time (``timezone.now()``) when *value* is falsy;
        * the original string, unchanged, when it cannot be parsed as a
          datetime;
        * otherwise the datetime, made timezone-aware if it was naive and
          converted to local time, rendered with ``isoformat()``.
    """
    if not value:
        return timezone.now().isoformat()

    if isinstance(value, str):
        try:
            parsed = parse_datetime(value)
        except ValueError:
            # parse_datetime() raises for well-formed but impossible values
            # (e.g. month 13); treat them like any other unparseable string
            # instead of letting the error escape from normalization.
            parsed = None
        if parsed is None:
            return value
        value = parsed

    if timezone.is_naive(value):
        value = timezone.make_aware(value, timezone.get_current_timezone())
    return timezone.localtime(value).isoformat()
|
|
|
|
|
|
class RedisNotificationStore:
|
|
USERS_KEY = "notif:users"
|
|
|
|
@classmethod
|
|
def _ids_key(cls, user_id: str) -> str:
|
|
return f"notif:{user_id}:ids"
|
|
|
|
@classmethod
|
|
def _data_key(cls, user_id: str) -> str:
|
|
return f"notif:{user_id}:data"
|
|
|
|
@classmethod
|
|
def _channel_key(cls, user_id: str) -> str:
|
|
prefix = settings.NOTIFICATION_REDIS_CHANNEL_PREFIX.rstrip(":")
|
|
return f"{prefix}:{user_id}"
|
|
|
|
@staticmethod
|
|
def _normalize_notification(data: dict | None) -> dict:
|
|
payload = dict(data or {})
|
|
return {
|
|
"id": str(payload.get("id") or uuid.uuid4()),
|
|
"type": payload.get("type") or "notification",
|
|
"title": payload.get("title") or "",
|
|
"message": payload.get("message") or "",
|
|
"level": payload.get("level") or "info",
|
|
"created_at": _isoformat_datetime(payload.get("created_at")),
|
|
"is_seen": bool(payload.get("is_seen", False)),
|
|
"delete_on_seen": bool(payload.get("delete_on_seen", False)),
|
|
"action_url": payload.get("action_url"),
|
|
"entity_type": payload.get("entity_type"),
|
|
"entity_id": payload.get("entity_id"),
|
|
"meta": payload.get("meta") or {},
|
|
}
|
|
|
|
@classmethod
|
|
def _publish_event(cls, user_id: str, event: str, data: dict) -> None:
|
|
if not settings.NOTIFICATIONS_ENABLED:
|
|
return
|
|
payload = {
|
|
"event": event,
|
|
"data": data,
|
|
}
|
|
redis_client.publish(
|
|
cls._channel_key(user_id),
|
|
json.dumps(payload, ensure_ascii=False, default=str),
|
|
)
|
|
|
|
@classmethod
|
|
def unread_count(cls, user_id: str, *, type_filter: str | None = None) -> int:
|
|
notifications, _ = cls.list(
|
|
user_id,
|
|
limit=settings.NOTIFICATION_MAX_PAGE_SIZE,
|
|
offset=0,
|
|
type_filter=type_filter,
|
|
paginate=False,
|
|
)
|
|
return sum(1 for notification in notifications if not notification.get("is_seen"))
|
|
|
|
@classmethod
|
|
def add(cls, user_id: str, payload: dict) -> dict:
|
|
data = cls._normalize_notification(payload)
|
|
created_at = parse_datetime(data["created_at"]) or timezone.now()
|
|
created_at_ts = created_at.timestamp()
|
|
json_str = json.dumps(data, ensure_ascii=False, default=str)
|
|
|
|
ids_key = cls._ids_key(user_id)
|
|
data_key = cls._data_key(user_id)
|
|
|
|
pipe = redis_client.pipeline()
|
|
pipe.zadd(ids_key, {data["id"]: created_at_ts})
|
|
pipe.hset(data_key, data["id"], json_str)
|
|
pipe.sadd(cls.USERS_KEY, user_id)
|
|
pipe.execute()
|
|
|
|
unread_count = cls.unread_count(user_id)
|
|
cls._publish_event(
|
|
user_id,
|
|
"notification",
|
|
{
|
|
"notification": data,
|
|
"unread_count": unread_count,
|
|
},
|
|
)
|
|
cls._publish_event(
|
|
user_id,
|
|
"unread_count",
|
|
{
|
|
"unread_count": unread_count,
|
|
},
|
|
)
|
|
return data
|
|
|
|
@classmethod
|
|
def list(
|
|
cls,
|
|
user_id: str,
|
|
*,
|
|
limit: int | None = None,
|
|
offset: int = 0,
|
|
type_filter: str | None = None,
|
|
paginate: bool = True,
|
|
) -> tuple[list[dict], int]:
|
|
ids_key = cls._ids_key(user_id)
|
|
data_key = cls._data_key(user_id)
|
|
|
|
ids = redis_client.zrevrange(ids_key, 0, -1)
|
|
if not ids:
|
|
return [], 0
|
|
|
|
pipe = redis_client.pipeline()
|
|
for notif_id in ids:
|
|
pipe.hget(data_key, notif_id)
|
|
raw_items = pipe.execute()
|
|
|
|
items: list[dict] = []
|
|
cleanup_ids: list[str] = []
|
|
for notif_id, raw in zip(ids, raw_items, strict=False):
|
|
if not raw:
|
|
cleanup_ids.append(notif_id)
|
|
continue
|
|
try:
|
|
data = cls._normalize_notification(json.loads(raw))
|
|
except json.JSONDecodeError:
|
|
cleanup_ids.append(notif_id)
|
|
continue
|
|
if type_filter and data.get("type") != type_filter:
|
|
continue
|
|
items.append(data)
|
|
|
|
if cleanup_ids:
|
|
redis_client.zrem(ids_key, *cleanup_ids)
|
|
redis_client.hdel(data_key, *cleanup_ids)
|
|
|
|
total_count = len(items)
|
|
if not paginate:
|
|
return items, total_count
|
|
|
|
safe_offset = max(offset, 0)
|
|
safe_limit = max(limit or settings.NOTIFICATION_DEFAULT_PAGE_SIZE, 1)
|
|
return items[safe_offset : safe_offset + safe_limit], total_count
|
|
|
|
@classmethod
|
|
def get(cls, user_id: str, notif_id: str) -> dict | None:
|
|
data_key = cls._data_key(user_id)
|
|
raw = redis_client.hget(data_key, notif_id)
|
|
if not raw:
|
|
return None
|
|
try:
|
|
return cls._normalize_notification(json.loads(raw))
|
|
except json.JSONDecodeError:
|
|
return None
|
|
|
|
@classmethod
|
|
def delete(cls, user_id: str, notif_id: str) -> bool:
|
|
ids_key = cls._ids_key(user_id)
|
|
data_key = cls._data_key(user_id)
|
|
pipe = redis_client.pipeline()
|
|
pipe.zrem(ids_key, notif_id)
|
|
pipe.hdel(data_key, notif_id)
|
|
result = pipe.execute()
|
|
if any(result):
|
|
unread_count = cls.unread_count(user_id)
|
|
cls._publish_event(
|
|
user_id,
|
|
"notification_seen",
|
|
{
|
|
"notification_id": notif_id,
|
|
"deleted": True,
|
|
"unread_count": unread_count,
|
|
},
|
|
)
|
|
cls._publish_event(
|
|
user_id,
|
|
"unread_count",
|
|
{
|
|
"unread_count": unread_count,
|
|
},
|
|
)
|
|
return True
|
|
return False
|
|
|
|
@classmethod
|
|
def clear_user(cls, user_id: str) -> int:
|
|
ids_key = cls._ids_key(user_id)
|
|
data_key = cls._data_key(user_id)
|
|
count = redis_client.zcard(ids_key)
|
|
pipe = redis_client.pipeline()
|
|
pipe.delete(ids_key)
|
|
pipe.delete(data_key)
|
|
pipe.srem(cls.USERS_KEY, user_id)
|
|
pipe.execute()
|
|
return int(count or 0)
|
|
|
|
@classmethod
|
|
def mark_seen(cls, user_id: str, notif_id: str) -> dict | None:
|
|
data = cls.get(user_id, notif_id)
|
|
if not data:
|
|
return None
|
|
|
|
if data.get("delete_on_seen"):
|
|
deleted = cls.delete(user_id, notif_id)
|
|
if deleted:
|
|
return {
|
|
"notification_id": notif_id,
|
|
"deleted": True,
|
|
"notification": None,
|
|
"unread_count": cls.unread_count(user_id),
|
|
}
|
|
return None
|
|
|
|
if not data.get("is_seen"):
|
|
data["is_seen"] = True
|
|
data_key = cls._data_key(user_id)
|
|
redis_client.hset(
|
|
data_key, notif_id, json.dumps(data, ensure_ascii=False, default=str)
|
|
)
|
|
|
|
unread_count = cls.unread_count(user_id)
|
|
payload = {
|
|
"notification_id": notif_id,
|
|
"deleted": False,
|
|
"notification": data,
|
|
"unread_count": unread_count,
|
|
}
|
|
cls._publish_event(user_id, "notification_seen", payload)
|
|
cls._publish_event(
|
|
user_id,
|
|
"unread_count",
|
|
{
|
|
"unread_count": unread_count,
|
|
},
|
|
)
|
|
return payload
|
|
|
|
@classmethod
|
|
def mark_all_seen(
|
|
cls,
|
|
user_id: str,
|
|
*,
|
|
delete_on_seen_only: bool = False,
|
|
type_filter: str | None = None,
|
|
) -> int:
|
|
ids_key = cls._ids_key(user_id)
|
|
data_key = cls._data_key(user_id)
|
|
ids = redis_client.zrevrange(ids_key, 0, -1)
|
|
if not ids:
|
|
return 0
|
|
|
|
updated = 0
|
|
pipe = redis_client.pipeline()
|
|
for notif_id in ids:
|
|
raw = redis_client.hget(data_key, notif_id)
|
|
if not raw:
|
|
continue
|
|
try:
|
|
data = cls._normalize_notification(json.loads(raw))
|
|
except json.JSONDecodeError:
|
|
continue
|
|
if type_filter and data.get("type") != type_filter:
|
|
continue
|
|
if delete_on_seen_only and not data.get("delete_on_seen"):
|
|
continue
|
|
|
|
if data.get("delete_on_seen"):
|
|
pipe.zrem(ids_key, notif_id)
|
|
pipe.hdel(data_key, notif_id)
|
|
else:
|
|
data["is_seen"] = True
|
|
pipe.hset(
|
|
data_key,
|
|
notif_id,
|
|
json.dumps(data, ensure_ascii=False, default=str),
|
|
)
|
|
updated += 1
|
|
|
|
if updated:
|
|
pipe.execute()
|
|
unread_count = cls.unread_count(user_id, type_filter=type_filter)
|
|
cls._publish_event(
|
|
user_id,
|
|
"notification_mark_all_read",
|
|
{
|
|
"type": type_filter,
|
|
"unread_count": unread_count,
|
|
},
|
|
)
|
|
cls._publish_event(
|
|
user_id,
|
|
"unread_count",
|
|
{
|
|
"unread_count": cls.unread_count(user_id),
|
|
},
|
|
)
|
|
return updated
|
|
|
|
@classmethod
|
|
def get_pubsub(cls):
|
|
return redis_client.pubsub(ignore_subscribe_messages=True)
|
|
|
|
@classmethod
|
|
def cleanup_expired(cls, retention_days: int = 30) -> int:
|
|
cutoff_ts = (timezone.now() - timedelta(days=retention_days)).timestamp()
|
|
removed = 0
|
|
user_ids = redis_client.smembers(cls.USERS_KEY)
|
|
for user_id in user_ids:
|
|
ids_key = cls._ids_key(user_id)
|
|
data_key = cls._data_key(user_id)
|
|
old_ids = redis_client.zrangebyscore(ids_key, "-inf", cutoff_ts)
|
|
if not old_ids:
|
|
continue
|
|
|
|
pipe = redis_client.pipeline()
|
|
user_removed = 0
|
|
for notif_id in old_ids:
|
|
raw = redis_client.hget(data_key, notif_id)
|
|
if not raw:
|
|
pipe.zrem(ids_key, notif_id)
|
|
pipe.hdel(data_key, notif_id)
|
|
removed += 1
|
|
user_removed += 1
|
|
continue
|
|
try:
|
|
data = cls._normalize_notification(json.loads(raw))
|
|
except json.JSONDecodeError:
|
|
pipe.zrem(ids_key, notif_id)
|
|
pipe.hdel(data_key, notif_id)
|
|
removed += 1
|
|
user_removed += 1
|
|
continue
|
|
if data.get("delete_on_seen"):
|
|
continue
|
|
pipe.zrem(ids_key, notif_id)
|
|
pipe.hdel(data_key, notif_id)
|
|
removed += 1
|
|
user_removed += 1
|
|
if user_removed:
|
|
pipe.execute()
|
|
|
|
if redis_client.zcard(ids_key) == 0:
|
|
redis_client.srem(cls.USERS_KEY, user_id)
|
|
return removed
|