"""Tests for ``RedisNotificationStore`` run against in-memory Redis fakes."""

import json
from collections import defaultdict

import pytest

from apps.notifications.services import store as services
from apps.notifications.services import RedisNotificationStore


class FakePipeline:
    """Record client commands and replay them against the fake on ``execute``.

    Any non-dunder attribute access yields a recorder that queues the call
    as ``(name, args, kwargs)`` and returns the pipeline, so calls can be
    chained. Nothing touches the client until ``execute`` runs.
    """

    def __init__(self, client):
        self.client = client
        self.operations = []

    def __enter__(self):
        """Allow ``with client.pipeline() as pipe:`` usage."""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Drop anything still queued; never suppress exceptions."""
        self.operations.clear()
        return False

    def __getattr__(self, name):
        # Dunder probes (copy, pickle, introspection helpers, ...) must see a
        # missing attribute instead of being handed a command recorder.
        if name.startswith("__") and name.endswith("__"):
            raise AttributeError(name)

        def wrapper(*args, **kwargs):
            self.operations.append((name, args, kwargs))
            return self

        return wrapper

    def execute(self):
        """Run the queued commands in order and return their results.

        The queue is emptied before replaying, so a command that raises
        does not leave stale operations behind for the next ``execute``.
        """
        queued = list(self.operations)
        self.operations.clear()
        return [
            getattr(self.client, name)(*args, **kwargs)
            for name, args, kwargs in queued
        ]


class FakePubSub:
    """Minimal pub/sub stand-in that tracks channels and canned messages."""

    def __init__(self):
        self.channels = []
        self.messages = []
        self.closed = False

    def subscribe(self, channel):
        self.channels.append(channel)

    def unsubscribe(self, channel):
        # Unsubscribing from an unknown channel is a silent no-op.
        if channel in self.channels:
            self.channels.remove(channel)

    def get_message(self, timeout=1.0):
        """Return the oldest queued message, or ``None`` when none is queued.

        ``timeout`` is accepted but ignored; the fake never blocks.
        """
        if self.messages:
            return self.messages.pop(0)
        return None

    def close(self):
        self.closed = True


class FakeRedis:
    """In-memory stand-in for the Redis commands exercised by these tests.

    Sorted sets are stored as ``{member: score}`` dicts, hashes as
    ``{field: value}`` dicts and sets as Python sets, each keyed by the
    Redis key. Published messages are JSON-decoded and appended to
    ``published`` as ``(channel, payload)`` tuples.
    """

    def __init__(self):
        self.sorted_sets = defaultdict(dict)
        self.hashes = defaultdict(dict)
        self.sets = defaultdict(set)
        self.published = []
        self.pubsub_instance = FakePubSub()

    def pipeline(self):
        return FakePipeline(self)

    def zadd(self, key, mapping):
        self.sorted_sets[key].update(mapping)
        return len(mapping)

    def hset(self, key, field, value):
        self.hashes[key][field] = value
        return 1

    def sadd(self, key, *members):
        """Add ``members`` to the set and return how many were new."""
        before = len(self.sets[key])
        self.sets[key].update(members)
        return len(self.sets[key]) - before

    def zrevrange(self, key, start, stop):
        """Return members from highest to lowest score, ``stop`` inclusive."""
        items = sorted(
            self.sorted_sets[key].items(),
            key=lambda item: (item[1], item[0]),
            reverse=True,
        )
        # ``stop + 1`` would be 0 for -1 and yield an empty slice, so the
        # "through the last element" case is handled separately.
        if stop == -1:
            return [member for member, _ in items[start:]]
        return [member for member, _ in items[start : stop + 1]]

    def hget(self, key, field):
        return self.hashes[key].get(field)

    def zrem(self, key, *members):
        """Remove ``members`` from the sorted set; return the removed count."""
        removed = 0
        for member in members:
            if member in self.sorted_sets[key]:
                del self.sorted_sets[key][member]
                removed += 1
        return removed

    def hdel(self, key, *fields):
        """Delete ``fields`` from the hash; return the removed count."""
        removed = 0
        for field in fields:
            if field in self.hashes[key]:
                del self.hashes[key][field]
                removed += 1
        return removed

    def smembers(self, key):
        # Return a copy so callers cannot mutate the stored set.
        return set(self.sets[key])

    def srem(self, key, *members):
        """Remove ``members`` from the set; return the removed count.

        Variadic like ``sadd``/``zrem``/``hdel``; a single-member call
        still returns 1 or 0.
        """
        removed = 0
        for member in members:
            if member in self.sets[key]:
                self.sets[key].remove(member)
                removed += 1
        return removed

    def zrangebyscore(self, key, min_score, max_score):
        """Return members with ``min_score <= score <= max_score``.

        Results are ordered by ascending score (ties broken by member).
        Bounds may be numbers or strings ``float`` can parse, including
        ``"-inf"`` and ``"+inf"``.
        """
        lower = float(min_score)
        upper = float(max_score)
        in_range = [
            (score, member)
            for member, score in self.sorted_sets[key].items()
            if lower <= score <= upper
        ]
        return [member for _, member in sorted(in_range)]

    def zcard(self, key):
        return len(self.sorted_sets[key])

    def publish(self, channel, message):
        # Decode here so tests can assert on the payload structure directly.
        self.published.append((channel, json.loads(message)))
        return 1

    def pubsub(self, ignore_subscribe_messages=True):
        # Always the same instance, so tests can inspect or preload it.
        return self.pubsub_instance


@pytest.fixture()
def fake_redis(monkeypatch):
    """Replace ``redis_client`` on the store module with a fresh ``FakeRedis``."""
    client = FakeRedis()
    monkeypatch.setattr(services, "redis_client", client)
    return client


def test_add_publishes_notification_and_unread_count(fake_redis, settings):
    """Adding a notification returns it and publishes two messages."""
    settings.NOTIFICATIONS_ENABLED = True

    notification = RedisNotificationStore.add(
        "user-1",
        {
            "title": "Build finished",
            "message": "Your deploy completed.",
            "level": "success",
        },
    )

    assert notification["title"] == "Build finished"
    assert notification["message"] == "Your deploy completed."
    assert notification["level"] == "success"

    # Only the first published message is inspected in detail; the second is
    # presumably the unread-count event (TODO confirm against the store).
    assert len(fake_redis.published) == 2
    channel, payload = fake_redis.published[0]
    assert channel == f"{settings.NOTIFICATION_REDIS_CHANNEL_PREFIX}:user-1"
    assert payload["event"] == "notification"
    assert payload["data"]["notification"]["id"] == notification["id"]
    assert payload["data"]["unread_count"] == 1


def test_mark_seen_and_mark_all_seen_publish_sync_events(fake_redis, settings):
    """Marking one or all notifications seen publishes the matching events."""
    settings.NOTIFICATIONS_ENABLED = True

    first = RedisNotificationStore.add("user-2", {"title": "First"})
    RedisNotificationStore.add("user-2", {"title": "Second"})
    # Discard the events from the two ``add`` calls.
    fake_redis.published.clear()

    payload = RedisNotificationStore.mark_seen("user-2", first["id"])

    assert payload["notification_id"] == first["id"]
    assert payload["deleted"] is False
    assert payload["notification"]["is_seen"] is True
    assert fake_redis.published[0][1]["event"] == "notification_seen"

    fake_redis.published.clear()

    updated = RedisNotificationStore.mark_all_seen("user-2")

    assert updated == 2
    assert fake_redis.published[0][1]["event"] == "notification_mark_all_read"
    assert fake_redis.published[1][1]["event"] == "unread_count"
    assert fake_redis.published[1][1]["data"]["unread_count"] == 0


def test_list_returns_total_count_and_filtered_notifications(fake_redis):
    """Listing with a type filter counts all matches but honours ``limit``."""
    RedisNotificationStore.add("user-3", {"title": "General", "type": "general"})
    RedisNotificationStore.add("user-3", {"title": "Billing", "type": "billing"})
    RedisNotificationStore.add("user-3", {"title": "General 2", "type": "general"})

    notifications, total_count = RedisNotificationStore.list(
        "user-3",
        limit=1,
        offset=0,
        type_filter="general",
    )

    assert total_count == 2
    assert len(notifications) == 1
    assert notifications[0]["type"] == "general"