Skip to main content

WebSocket Chat Example

A chat application is the canonical example that demonstrates WebSocket broadcasting — sending one client's message to all other connected clients. This lesson builds a multi-room chat system with a ConnectionManager class.

Learning Focus

By the end of this lesson you can: manage a pool of WebSocket connections with a ConnectionManager, broadcast to all clients in a room, handle disconnection cleanup, and test multi-client scenarios.

ConnectionManager

app/ws/manager.py
from fastapi import WebSocket

class ConnectionManager:
    """Track open WebSocket connections grouped by chat room.

    Connections are kept in an in-memory dict, so the registry is only
    visible inside the process that owns this instance.
    """

    def __init__(self) -> None:
        # {room_id: [websocket, ...]}
        self._rooms: dict[str, list[WebSocket]] = {}

    async def connect(self, room_id: str, websocket: WebSocket) -> None:
        """Accept the WebSocket handshake and register the socket in a room.

        Args:
            room_id: Room to join; created on first use.
            websocket: Connection to accept and track.
        """
        await websocket.accept()
        self._rooms.setdefault(room_id, []).append(websocket)

    def disconnect(self, room_id: str, websocket: WebSocket) -> None:
        """Remove a socket from a room and drop the room once it is empty.

        Calling this for an unknown room, or for a socket that was already
        removed, is a no-op.

        Args:
            room_id: Room the socket was registered in.
            websocket: Connection to forget.
        """
        members = self._rooms.get(room_id)
        if members is None:
            return
        try:
            members.remove(websocket)
        except ValueError:
            # Already removed (e.g. by a failed broadcast); nothing to do.
            pass
        if not members:
            del self._rooms[room_id]

    async def broadcast(
        self,
        room_id: str,
        message: dict,
        exclude: WebSocket | None = None,
    ) -> None:
        """Send a JSON message to every socket in a room.

        Args:
            room_id: Target room; an unknown room sends nothing.
            message: Payload passed to each socket's ``send_json``.
            exclude: Optional socket to skip (typically the sender).
        """
        # Iterate over a snapshot: disconnect() mutates the underlying list
        # when a send fails.
        for ws in list(self._rooms.get(room_id, [])):
            if ws is exclude:
                continue
            try:
                await ws.send_json(message)
            except Exception:
                # Best effort: a socket that cannot be written to is dropped
                # so one broken client does not stop delivery to the rest.
                self.disconnect(room_id, ws)

    def active_connections(self, room_id: str) -> int:
        """Return how many sockets are currently registered in a room."""
        return len(self._rooms.get(room_id, []))

Chat Route

app/routers/chat.py
from fastapi import APIRouter, WebSocket, WebSocketDisconnect
from app.ws.manager import ConnectionManager

# Router on which the WebSocket chat endpoint(s) in this module are registered.
router = APIRouter()
# Module-level manager instance; the endpoint(s) below use it to track room
# membership and to broadcast messages.
manager = ConnectionManager()

@router.websocket("/ws/chat/{room_id}")
async def chat_endpoint(room_id: str, websocket: WebSocket) -> None:
    """Serve one chat client in ``room_id`` until its connection ends.

    Registers the socket with the manager, announces the arrival to the
    other members, sends the newcomer a welcome frame, then relays every
    received JSON object to the whole room (the sender included).

    Args:
        room_id: Room taken from the URL path.
        websocket: The client's connection.
    """
    await manager.connect(room_id, websocket)
    try:
        # Announce arrival
        await manager.broadcast(
            room_id,
            {"type": "system", "message": "A user joined the room"},
            exclude=websocket,
        )
        # Send room info to new client
        await websocket.send_json({
            "type": "welcome",
            "room": room_id,
            "online": manager.active_connections(room_id),
        })

        while True:
            data = await websocket.receive_json()
            if not isinstance(data, dict):
                # Valid JSON that is not an object (e.g. a list or a number)
                # has no "text" field; ignore the frame instead of crashing.
                continue
            # Broadcast message to all in room
            await manager.broadcast(
                room_id,
                {"type": "message", "text": data.get("text", ""), "room": room_id},
            )
    except WebSocketDisconnect:
        # Normal termination: the client went away.
        pass
    finally:
        # Runs for every exit path, not only WebSocketDisconnect, so a
        # malformed frame or an unexpected error cannot leave a stale socket
        # registered in the room.
        manager.disconnect(room_id, websocket)
        await manager.broadcast(
            room_id,
            {"type": "system", "message": "A user left the room"},
        )

Adding Usernames

app/routers/chat.py
@router.websocket("/ws/chat/{room_id}")
async def chat_with_username(
    room_id: str,
    websocket: WebSocket,
    username: str = "Anonymous",
) -> None:
    """Serve one named chat client in ``room_id`` until its connection ends.

    Announces ``username`` to the other members on join, tags every relayed
    message with it, and announces the departure when the connection ends.

    Args:
        room_id: Room taken from the URL path.
        websocket: The client's connection.
        username: Display name; not part of the path, so it is supplied as a
            query parameter (``?username=...``) and defaults to "Anonymous".
    """
    # NOTE: username is client-supplied and is broadcast verbatim. Receivers
    # must escape it before rendering it as HTML.
    # NOTE(review): this route uses the same path as chat_endpoint;
    # presumably it replaces that handler rather than coexisting - confirm.
    await manager.connect(room_id, websocket)
    try:
        await manager.broadcast(room_id, {
            "type": "system",
            "message": f"{username} joined"
        }, exclude=websocket)

        while True:
            data = await websocket.receive_json()
            if not isinstance(data, dict):
                # Valid JSON that is not an object has no "text" field;
                # ignore the frame instead of crashing the handler.
                continue
            await manager.broadcast(room_id, {
                "type": "message",
                "user": username,
                "text": data.get("text", ""),
                "room": room_id,
            })
    except WebSocketDisconnect:
        # Normal termination: the client went away.
        pass
    finally:
        # Runs for every exit path, not only WebSocketDisconnect, so an
        # unexpected error cannot leave a stale socket in the room.
        manager.disconnect(room_id, websocket)
        await manager.broadcast(room_id, {
            "type": "system",
            "message": f"{username} left"
        })

Scaling WebSockets Across Workers

With multiple Uvicorn workers, each process has its own ConnectionManager — clients on different workers can't communicate. Solution: use Redis pub/sub.

app/ws/redis_manager.py
import json
import asyncio
import redis.asyncio as aioredis

class RedisConnectionManager:
    """Relay chat messages between worker processes through Redis pub/sub.

    Each room maps to the Redis channel ``chat:<room_id>``. ``publish``
    sends a message to that channel; ``subscribe_and_forward`` listens on it
    and forwards each message to the sockets this process holds in
    ``_local``.

    NOTE: nothing in this class adds sockets to ``_local``; the code that
    accepts a WebSocket has to register it there.
    """

    def __init__(self, redis_url: str) -> None:
        self.redis = aioredis.from_url(redis_url)
        # {room_id: [websocket, ...]} - sockets connected to this process.
        self._local: dict[str, list] = {}

    async def subscribe_and_forward(self, room_id: str) -> None:
        """Forward messages published for a room to this process's sockets.

        Runs until the pub/sub stream ends or the task is cancelled.

        Args:
            room_id: Room whose channel (``chat:<room_id>``) is subscribed.
        """
        # The context manager releases the pub/sub connection on every exit
        # path, including task cancellation.
        async with self.redis.pubsub() as pubsub:
            await pubsub.subscribe(f"chat:{room_id}")
            async for msg in pubsub.listen():
                # listen() also yields subscribe confirmations; only real
                # messages carry a JSON payload.
                if msg["type"] != "message":
                    continue
                data = json.loads(msg["data"])
                # Snapshot the list: _forget() mutates it on failure.
                for ws in list(self._local.get(room_id, [])):
                    try:
                        await ws.send_json(data)
                    except Exception:
                        # Best effort: one broken socket must not stop
                        # forwarding for the rest of the room.
                        self._forget(room_id, ws)

    async def publish(self, room_id: str, message: dict) -> None:
        """Publish a message, JSON-encoded, on the room's channel."""
        await self.redis.publish(f"chat:{room_id}", json.dumps(message))

    def _forget(self, room_id: str, websocket) -> None:
        """Drop a socket from the local room list; remove the room if empty."""
        members = self._local.get(room_id)
        if members is None:
            return
        try:
            members.remove(websocket)
        except ValueError:
            pass
        if not members:
            del self._local[room_id]

Common Pitfalls

| Pitfall | Cause / Symptom | Fix |
| --- | --- | --- |
| Memory leak from disconnected sockets | Not calling disconnect() on close | Always remove from _rooms in except WebSocketDisconnect |
| Broadcast to closed socket raises | Sending to a disconnected client | Wrap send_json in try/except inside broadcast |
| Race condition in room cleanup | Concurrent joins/leaves | Use asyncio.Lock around _rooms modification |
| Assuming all clients are on the same worker | Multi-worker deployment: clients on different workers never see each other's messages | Use Redis pub/sub to share messages across workers |
| Username injected from URL unvalidated | XSS risk via username broadcast | Sanitize username before broadcasting |

Hands-On Practice

test-chat.sh
# Start the app on port 8012 with auto-reload; keep it running in its own terminal.
uvicorn app.main:app --reload --port 8012

# Open two terminals, each connecting to the same room
# (the username is passed as a query parameter).
wscat -c "ws://localhost:8012/ws/chat/room1?username=alice"
wscat -c "ws://localhost:8012/ws/chat/room1?username=bob"

# In alice's terminal, type a message and send:
# {"text": "Hello Bob!"}
# Bob sees: {"type": "message", "user": "alice", "text": "Hello Bob!", "room": "room1"}
# (alice receives the same frame: the message broadcast does not exclude the sender)

What's Next