"""Registry of WebSocket connections grouped by room, kept in process memory."""

import logging
from dataclasses import dataclass
from typing import Any

from fastapi import WebSocket

logger = logging.getLogger(__name__)


@dataclass
class ClientConnection:
    """Pair a WebSocket with the client id it was registered under."""

    websocket: WebSocket
    client_id: str


class ConnectionManager:
    """Track WebSocket connections per room and send JSON payloads to them.

    ``active_connections`` maps a room id to the list of connections currently
    registered in that room. A room key exists only while the room has at
    least one connection; ``disconnect`` removes the key when the last
    connection leaves.
    """

    def __init__(self):
        self.active_connections: dict[str, list[ClientConnection]] = {}

    async def connect(self, websocket: WebSocket, room_id: str, client_id: str) -> None:
        """Accept *websocket* and register it in *room_id* under *client_id*.

        The room entry is created on first use. No uniqueness check is done on
        *client_id*, so the same id may appear more than once in a room.
        """
        await websocket.accept()
        self.active_connections.setdefault(room_id, []).append(
            ClientConnection(websocket=websocket, client_id=client_id)
        )

    def disconnect(self, websocket: WebSocket, room_id: str) -> None:
        """Remove *websocket* from *room_id*; drop the room once it is empty.

        Matching is by object identity. Unknown rooms and sockets that are not
        registered are ignored. The socket itself is not closed here.
        """
        connections = self.active_connections.get(room_id)
        if connections is None:
            return

        # Build a new list rather than mutating in place, so any snapshot
        # taken by an in-flight broadcast is left untouched.
        remaining = [
            connection
            for connection in connections
            if connection.websocket is not websocket
        ]
        if remaining:
            self.active_connections[room_id] = remaining
        else:
            del self.active_connections[room_id]

    async def send_json(self, websocket: WebSocket, payload: dict[str, Any]) -> None:
        """Send *payload* as JSON to a single *websocket*.

        Exceptions raised by the underlying send propagate to the caller.
        """
        await websocket.send_json(payload)

    async def broadcast_json(self, payload: dict[str, Any], room_id: str) -> None:
        """Send *payload* as JSON to every connection in *room_id*.

        Sends are awaited one connection at a time. Delivery is best-effort:
        any ``Exception`` raised while sending to a connection is logged, and
        that connection is removed from the room after the loop. Nothing is
        raised to the caller for per-connection failures. Unknown rooms are
        ignored.
        """
        if room_id not in self.active_connections:
            return

        dead_connections: list[WebSocket] = []
        # Iterate over a snapshot: the room list can be replaced by
        # connect/disconnect calls that run while a send is awaited.
        for connection in list(self.active_connections[room_id]):
            try:
                await connection.websocket.send_json(payload)
            except Exception as exc:
                # Keep the broadcast going, but leave a trace so that a
                # systematic failure (which would evict every client in the
                # room) is visible in the logs.
                logger.warning(
                    "Dropping client %r from room %r after failed send: %r",
                    connection.client_id,
                    room_id,
                    exc,
                )
                dead_connections.append(connection.websocket)

        for websocket in dead_connections:
            self.disconnect(websocket, room_id)

    def room_users(self, room_id: str) -> list[str]:
        """Return the client ids registered in *room_id* (empty if unknown)."""
        return [
            connection.client_id
            for connection in self.active_connections.get(room_id, [])
        ]

    def rooms_overview(self) -> dict[str, Any]:
        """Return a summary of every room that currently has connections.

        The result has the shape
        ``{"rooms": [{"room_id": ..., "online_count": ..., "users": [...]}]}``.
        """
        return {
            "rooms": [
                {
                    "room_id": room_id,
                    "online_count": len(connections),
                    "users": [connection.client_id for connection in connections],
                }
                for room_id, connections in self.active_connections.items()
            ]
        }


# Module-level instance created at import time.
manager = ConnectionManager()