WebSocket Chat Example
A chat application is the canonical demonstration of WebSocket broadcasting — relaying one client's message to every client connected to the same room. This lesson builds a multi-room chat system around a ConnectionManager class.
Learning Focus
By the end of this lesson you can: manage a pool of WebSocket connections with a ConnectionManager, broadcast to all clients in a room, handle disconnection cleanup, and test multi-client scenarios.
ConnectionManager
app/ws/manager.py
from fastapi import WebSocket
class ConnectionManager:
    """Track open WebSocket connections grouped by chat room.

    A room entry is created when its first socket joins and deleted when
    its last socket leaves, so ``_rooms`` never holds an empty list.
    """

    def __init__(self) -> None:
        # {room_id: [websocket, ...]}
        self._rooms: dict[str, list[WebSocket]] = {}

    async def connect(self, room_id: str, websocket: WebSocket) -> None:
        """Accept the handshake, then register ``websocket`` in ``room_id``."""
        await websocket.accept()
        self._rooms.setdefault(room_id, []).append(websocket)

    def disconnect(self, room_id: str, websocket: WebSocket) -> None:
        """Remove ``websocket`` from ``room_id`` and drop the room if empty.

        Calling this for an unknown room, or for a socket that has already
        been removed, is a no-op.
        """
        members = self._rooms.get(room_id)
        if members is None:
            return
        if websocket in members:
            members.remove(websocket)
        if not members:
            del self._rooms[room_id]

    async def broadcast(
        self,
        room_id: str,
        message: dict,
        exclude: WebSocket | None = None,
    ) -> None:
        """Send ``message`` as JSON to every socket in ``room_id``.

        Args:
            room_id: Room whose members receive the message.
            message: JSON-serialisable payload.
            exclude: Optional socket to skip (typically the sender).
        """
        # Iterate over a snapshot: disconnect() mutates the live list.
        for ws in list(self._rooms.get(room_id, [])):
            if ws is exclude:
                continue
            try:
                await ws.send_json(message)
            except Exception:
                # Best effort: a socket that cannot be written to is treated
                # as gone, and must not stop delivery to the remaining peers.
                self.disconnect(room_id, ws)

    def active_connections(self, room_id: str) -> int:
        """Return how many sockets are currently registered in ``room_id``."""
        return len(self._rooms.get(room_id, []))
Chat Route
app/routers/chat.py
from fastapi import APIRouter, WebSocket, WebSocketDisconnect
from app.ws.manager import ConnectionManager
# Router that the chat WebSocket endpoint below is registered on.
router = APIRouter()
# Module-level ConnectionManager shared by every connection handled in this
# module, so all sockets served by this process see the same rooms.
manager = ConnectionManager()
@router.websocket("/ws/chat/{room_id}")
async def chat_endpoint(room_id: str, websocket: WebSocket) -> None:
    """Serve one client's chat session in ``room_id``.

    Joins the room, announces the arrival to the other members, sends the
    newcomer a welcome payload, then relays every received message to the
    whole room (sender included) until the connection ends.
    """
    await manager.connect(room_id, websocket)
    try:
        # Announce arrival
        await manager.broadcast(
            room_id,
            {"type": "system", "message": "A user joined the room"},
            exclude=websocket,
        )
        # Send room info to new client
        await websocket.send_json({
            "type": "welcome",
            "room": room_id,
            "online": manager.active_connections(room_id),
        })
        while True:
            data = await websocket.receive_json()
            # Broadcast message to all in room
            await manager.broadcast(
                room_id,
                {"type": "message", "text": data.get("text", ""), "room": room_id},
            )
    except WebSocketDisconnect:
        # Normal client close; cleanup happens in ``finally``.
        pass
    finally:
        # Runs for every exit path (clean disconnect, malformed payload, send
        # failure) so the socket never lingers in the room after the handler
        # has stopped serving it.
        manager.disconnect(room_id, websocket)
        await manager.broadcast(
            room_id,
            {"type": "system", "message": "A user left the room"},
        )
Adding Usernames
app/routers/chat.py
@router.websocket("/ws/chat/{room_id}")
async def chat_with_username(
    room_id: str,
    websocket: WebSocket,
    username: str = "Anonymous",
) -> None:
    """Serve one client's chat session in ``room_id``, tagging messages by user.

    ``username`` is not part of the path, so it arrives as a query parameter
    (``?username=alice``) and defaults to ``"Anonymous"``. This handler is
    registered on the same path as ``chat_endpoint``; presumably it is meant
    to replace that handler rather than be mounted alongside it.
    """
    # NOTE(review): ``username`` is client-supplied and relayed verbatim to
    # every peer. Validate it here or make sure clients escape it before
    # rendering, otherwise it is an injection vector.
    await manager.connect(room_id, websocket)
    try:
        await manager.broadcast(room_id, {
            "type": "system",
            "message": f"{username} joined"
        }, exclude=websocket)
        while True:
            data = await websocket.receive_json()
            await manager.broadcast(room_id, {
                "type": "message",
                "user": username,
                "text": data.get("text", ""),
                "room": room_id,
            })
    except WebSocketDisconnect:
        # Normal client close; cleanup happens in ``finally``.
        pass
    finally:
        # Runs for every exit path, not only a clean disconnect, so a handler
        # that dies on a bad payload still leaves the room and is announced.
        manager.disconnect(room_id, websocket)
        await manager.broadcast(room_id, {
            "type": "system",
            "message": f"{username} left"
        })
Scaling WebSockets Across Workers
With multiple Uvicorn workers, each process has its own ConnectionManager — clients on different workers can't communicate. Solution: use Redis pub/sub.
app/ws/redis_manager.py
import json
import asyncio
import redis.asyncio as aioredis
class RedisConnectionManager:
    """Relay chat messages between worker processes through Redis pub/sub.

    Each room maps to the Redis channel ``chat:{room_id}``. ``publish`` sends
    a message to that channel; ``subscribe_and_forward`` listens on it and
    pushes every message to the sockets this process holds for the room.
    """

    def __init__(self, redis_url: str) -> None:
        self.redis = aioredis.from_url(redis_url)
        # {room_id: [websocket, ...]} for sockets connected to this process.
        # NOTE(review): nothing in this class adds sockets to ``_local``;
        # presumably the caller registers them. Confirm against the caller.
        self._local: dict[str, list] = {}

    async def subscribe_and_forward(self, room_id: str) -> None:
        """Forward messages published for ``room_id`` to the local sockets.

        Runs until the pub/sub stream ends or the task is cancelled. The
        subscription is released on every exit path.
        """
        # The context manager releases the pub/sub connection even when the
        # task is cancelled or a message fails to decode.
        async with self.redis.pubsub() as pubsub:
            await pubsub.subscribe(f"chat:{room_id}")
            async for msg in pubsub.listen():
                # Skip subscribe/unsubscribe confirmations.
                if msg["type"] != "message":
                    continue
                data = json.loads(msg["data"])
                # Snapshot: failed sockets are removed while iterating.
                for ws in list(self._local.get(room_id, [])):
                    try:
                        await ws.send_json(data)
                    except Exception:
                        # One dead socket must not stop forwarding for the
                        # whole room; drop it and carry on.
                        self._drop_local(room_id, ws)

    async def publish(self, room_id: str, message: dict) -> None:
        """Publish ``message`` as JSON on the channel for ``room_id``."""
        await self.redis.publish(f"chat:{room_id}", json.dumps(message))

    def _drop_local(self, room_id: str, websocket) -> None:
        """Forget a local socket that can no longer be written to."""
        members = self._local.get(room_id)
        if members is None:
            return
        if websocket in members:
            members.remove(websocket)
        if not members:
            del self._local[room_id]
Common Pitfalls
| Pitfall | Cause / Symptom | Fix |
|---|---|---|
| Memory leak from disconnected sockets | Not calling disconnect() on close | Always remove from _rooms in except WebSocketDisconnect |
| Broadcast to closed socket raises | Sending to a disconnected client | Wrap send_json in try/except inside broadcast |
| Race condition in room cleanup | Concurrent joins/leaves | Use asyncio.Lock around _rooms modification |
| Messages only reach clients on the same worker | Multi-worker deployment: each process has its own ConnectionManager | Use Redis pub/sub to share messages across workers |
| Username injected from URL unvalidated | XSS risk via username broadcast | Sanitize username before broadcasting |
Hands-On Practice
test-chat.sh
# Terminal 1: start the server and leave it running
uvicorn app.main:app --reload --port 8012
# Terminals 2 and 3: open one client each, both connecting to the same room
wscat -c "ws://localhost:8012/ws/chat/room1?username=alice"
wscat -c "ws://localhost:8012/ws/chat/room1?username=bob"
# In alice's terminal, type a message and send:
# {"text": "Hello Bob!"}
# Bob sees: {"type": "message", "user": "alice", "text": "Hello Bob!", "room": "room1"}
# (alice receives the same message too: chat messages are broadcast without exclude=)