import json import uuid from datetime import timedelta import redis from django.conf import settings from django.utils import timezone from django.utils.dateparse import parse_datetime redis_client = redis.StrictRedis.from_url(settings.REDIS_URL, decode_responses=True) def _isoformat_datetime(value) -> str: if not value: return timezone.now().isoformat() if isinstance(value, str): parsed = parse_datetime(value) if parsed is not None: value = parsed else: return value if timezone.is_naive(value): value = timezone.make_aware(value, timezone.get_current_timezone()) return timezone.localtime(value).isoformat() class RedisNotificationStore: USERS_KEY = "notif:users" @classmethod def _ids_key(cls, user_id: str) -> str: return f"notif:{user_id}:ids" @classmethod def _data_key(cls, user_id: str) -> str: return f"notif:{user_id}:data" @classmethod def _channel_key(cls, user_id: str) -> str: prefix = settings.NOTIFICATION_REDIS_CHANNEL_PREFIX.rstrip(":") return f"{prefix}:{user_id}" @staticmethod def _normalize_notification(data: dict | None) -> dict: payload = dict(data or {}) return { "id": str(payload.get("id") or uuid.uuid4()), "type": payload.get("type") or "notification", "title": payload.get("title") or "", "message": payload.get("message") or "", "level": payload.get("level") or "info", "created_at": _isoformat_datetime(payload.get("created_at")), "is_seen": bool(payload.get("is_seen", False)), "delete_on_seen": bool(payload.get("delete_on_seen", False)), "action_url": payload.get("action_url"), "entity_type": payload.get("entity_type"), "entity_id": payload.get("entity_id"), "meta": payload.get("meta") or {}, } @classmethod def _publish_event(cls, user_id: str, event: str, data: dict) -> None: if not settings.NOTIFICATIONS_ENABLED: return payload = { "event": event, "data": data, } redis_client.publish( cls._channel_key(user_id), json.dumps(payload, ensure_ascii=False, default=str), ) @classmethod def unread_count(cls, user_id: str, *, type_filter: str | None = None) -> int: notifications, _ = cls.list( user_id, limit=settings.NOTIFICATION_MAX_PAGE_SIZE, offset=0, type_filter=type_filter, paginate=False, ) return sum(1 for notification in notifications if not notification.get("is_seen")) @classmethod def add(cls, user_id: str, payload: dict) -> dict: data = cls._normalize_notification(payload) created_at = parse_datetime(data["created_at"]) or timezone.now() created_at_ts = created_at.timestamp() json_str = json.dumps(data, ensure_ascii=False, default=str) ids_key = cls._ids_key(user_id) data_key = cls._data_key(user_id) pipe = redis_client.pipeline() pipe.zadd(ids_key, {data["id"]: created_at_ts}) pipe.hset(data_key, data["id"], json_str) pipe.sadd(cls.USERS_KEY, user_id) pipe.execute() unread_count = cls.unread_count(user_id) cls._publish_event( user_id, "notification", { "notification": data, "unread_count": unread_count, }, ) cls._publish_event( user_id, "unread_count", { "unread_count": unread_count, }, ) return data @classmethod def list( cls, user_id: str, *, limit: int | None = None, offset: int = 0, type_filter: str | None = None, paginate: bool = True, ) -> tuple[list[dict], int]: ids_key = cls._ids_key(user_id) data_key = cls._data_key(user_id) ids = redis_client.zrevrange(ids_key, 0, -1) if not ids: return [], 0 pipe = redis_client.pipeline() for notif_id in ids: pipe.hget(data_key, notif_id) raw_items = pipe.execute() items: list[dict] = [] cleanup_ids: list[str] = [] for notif_id, raw in zip(ids, raw_items, strict=False): if not raw: cleanup_ids.append(notif_id) continue try: data = cls._normalize_notification(json.loads(raw)) except json.JSONDecodeError: cleanup_ids.append(notif_id) continue if type_filter and data.get("type") != type_filter: continue items.append(data) if cleanup_ids: redis_client.zrem(ids_key, *cleanup_ids) redis_client.hdel(data_key, *cleanup_ids) total_count = len(items) if not paginate: return items, total_count safe_offset = max(offset, 0) safe_limit = max(limit or settings.NOTIFICATION_DEFAULT_PAGE_SIZE, 1) return items[safe_offset : safe_offset + safe_limit], total_count @classmethod def get(cls, user_id: str, notif_id: str) -> dict | None: data_key = cls._data_key(user_id) raw = redis_client.hget(data_key, notif_id) if not raw: return None try: return cls._normalize_notification(json.loads(raw)) except json.JSONDecodeError: return None @classmethod def delete(cls, user_id: str, notif_id: str) -> bool: ids_key = cls._ids_key(user_id) data_key = cls._data_key(user_id) pipe = redis_client.pipeline() pipe.zrem(ids_key, notif_id) pipe.hdel(data_key, notif_id) result = pipe.execute() if any(result): unread_count = cls.unread_count(user_id) cls._publish_event( user_id, "notification_seen", { "notification_id": notif_id, "deleted": True, "unread_count": unread_count, }, ) cls._publish_event( user_id, "unread_count", { "unread_count": unread_count, }, ) return True return False @classmethod def mark_seen(cls, user_id: str, notif_id: str) -> dict | None: data = cls.get(user_id, notif_id) if not data: return None if data.get("delete_on_seen"): deleted = cls.delete(user_id, notif_id) if deleted: return { "notification_id": notif_id, "deleted": True, "notification": None, "unread_count": cls.unread_count(user_id), } return None if not data.get("is_seen"): data["is_seen"] = True data_key = cls._data_key(user_id) redis_client.hset( data_key, notif_id, json.dumps(data, ensure_ascii=False, default=str) ) unread_count = cls.unread_count(user_id) payload = { "notification_id": notif_id, "deleted": False, "notification": data, "unread_count": unread_count, } cls._publish_event(user_id, "notification_seen", payload) cls._publish_event( user_id, "unread_count", { "unread_count": unread_count, }, ) return payload @classmethod def mark_all_seen( cls, user_id: str, *, delete_on_seen_only: bool = False, type_filter: str | None = None, ) -> int: ids_key = cls._ids_key(user_id) data_key = cls._data_key(user_id) ids = redis_client.zrevrange(ids_key, 0, -1) if not ids: return 0 updated = 0 pipe = redis_client.pipeline() for notif_id in ids: raw = redis_client.hget(data_key, notif_id) if not raw: continue try: data = cls._normalize_notification(json.loads(raw)) except json.JSONDecodeError: continue if type_filter and data.get("type") != type_filter: continue if delete_on_seen_only and not data.get("delete_on_seen"): continue if data.get("delete_on_seen"): pipe.zrem(ids_key, notif_id) pipe.hdel(data_key, notif_id) else: data["is_seen"] = True pipe.hset( data_key, notif_id, json.dumps(data, ensure_ascii=False, default=str), ) updated += 1 if updated: pipe.execute() unread_count = cls.unread_count(user_id, type_filter=type_filter) cls._publish_event( user_id, "notification_mark_all_read", { "type": type_filter, "unread_count": unread_count, }, ) cls._publish_event( user_id, "unread_count", { "unread_count": cls.unread_count(user_id), }, ) return updated @classmethod def get_pubsub(cls): return redis_client.pubsub(ignore_subscribe_messages=True) @classmethod def cleanup_expired(cls, retention_days: int = 30) -> int: cutoff_ts = (timezone.now() - timedelta(days=retention_days)).timestamp() removed = 0 user_ids = redis_client.smembers(cls.USERS_KEY) for user_id in user_ids: ids_key = cls._ids_key(user_id) data_key = cls._data_key(user_id) old_ids = redis_client.zrangebyscore(ids_key, "-inf", cutoff_ts) if not old_ids: continue pipe = redis_client.pipeline() user_removed = 0 for notif_id in old_ids: raw = redis_client.hget(data_key, notif_id) if not raw: pipe.zrem(ids_key, notif_id) pipe.hdel(data_key, notif_id) removed += 1 user_removed += 1 continue try: data = cls._normalize_notification(json.loads(raw)) except json.JSONDecodeError: pipe.zrem(ids_key, notif_id) pipe.hdel(data_key, notif_id) removed += 1 user_removed += 1 continue if data.get("delete_on_seen"): continue pipe.zrem(ids_key, notif_id) pipe.hdel(data_key, notif_id) removed += 1 user_removed += 1 if user_removed: pipe.execute() if redis_client.zcard(ids_key) == 0: redis_client.srem(cls.USERS_KEY, user_id) return removed