Streaming Responses
When your response data is large, dynamically generated, or continuous (like real-time logs or SSE), loading everything into memory before sending is wasteful or impossible. StreamingResponse lets FastAPI send data in chunks as it is generated.
Learning Focus
By the end of this lesson you will be able to stream a large file, implement server-sent events (SSE), stream database query results as NDJSON, and handle streaming errors gracefully.
Streaming a Large File
app/routers/downloads.py
from pathlib import Path
from typing import Iterator

from fastapi import APIRouter, HTTPException
from fastapi.responses import StreamingResponse

router = APIRouter(prefix="/downloads", tags=["downloads"])
EXPORT_DIR = Path("exports")


def file_generator(path: Path, chunk_size: int = 65536) -> Iterator[bytes]:
    # Plain (sync) generator: Starlette iterates it in a threadpool, so the
    # blocking f.read() calls do not stall the event loop.
    with path.open("rb") as f:
        while chunk := f.read(chunk_size):
            yield chunk


@router.get("/{filename}")
async def download_file(filename: str) -> StreamingResponse:
    safe_name = Path(filename).name  # Prevent path traversal
    path = EXPORT_DIR / safe_name
    if not path.is_file():
        raise HTTPException(status_code=404, detail="File not found")
    return StreamingResponse(
        file_generator(path),
        media_type="application/octet-stream",
        # Quote the filename so names containing spaces survive intact.
        headers={"Content-Disposition": f'attachment; filename="{safe_name}"'},
    )
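Taking `Path(filename).name` drops any directory components, which covers the usual `../` cases. A stricter check resolves the candidate path and confirms it still sits inside the export directory, which also rejects symlinks that point elsewhere. The following is a minimal sketch, assuming Python 3.9+ (for `Path.is_relative_to`); `resolve_export` is a hypothetical helper name, not part of FastAPI.

```python
from pathlib import Path
from typing import Optional


def resolve_export(base_dir: Path, filename: str) -> Optional[Path]:
    """Return the resolved file path if it lies inside base_dir, else None."""
    base = base_dir.resolve()
    # Keep only the final path component, then resolve symlinks and "..".
    candidate = (base / Path(filename).name).resolve()
    if not candidate.is_relative_to(base):
        return None
    # Only regular files are downloadable; directories and missing paths are not.
    return candidate if candidate.is_file() else None
```

In the route above, a `None` result would map to the same 404 response as a missing file.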
Server-Sent Events (SSE)
SSE lets the server push real-time events to the browser without WebSockets:
app/routers/events.py
import asyncio
import json
from typing import AsyncGenerator

from fastapi import APIRouter
from fastapi.responses import StreamingResponse

router = APIRouter(prefix="/events", tags=["events"])


async def event_generator(client_id: str) -> AsyncGenerator[str, None]:
    for i in range(10):
        event = {"type": "update", "data": f"Message {i}", "client": client_id}
        yield f"data: {json.dumps(event)}\n\n"
        await asyncio.sleep(1)
    yield "event: close\ndata: stream ended\n\n"


@router.get("/stream")
async def sse_stream(client_id: str = "anon") -> StreamingResponse:
    return StreamingResponse(
        event_generator(client_id),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no",  # Disable Nginx buffering
        },
    )
test-sse.sh
curl -N "http://localhost:8000/events/stream?client_id=test"
# → data: {"type": "update", "data": "Message 0", ...}
# → data: {"type": "update", "data": "Message 1", ...}
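On the wire, each SSE message is a group of `field: value` lines ended by a blank line, and a payload that contains newlines has to be split across several `data:` lines. Building those strings by hand gets error-prone once `event:` and `id:` fields are involved, so a small formatter can help. This is a sketch only; `format_sse` and its parameter names are hypothetical, not a FastAPI or Starlette API.

```python
import json
from typing import Any, Optional


def format_sse(data: Any, event: Optional[str] = None, event_id: Optional[str] = None) -> str:
    """Serialize one SSE message; non-string data is JSON-encoded."""
    payload = data if isinstance(data, str) else json.dumps(data)
    lines = []
    if event_id is not None:
        lines.append(f"id: {event_id}")
    if event is not None:
        lines.append(f"event: {event}")
    # Each line of a multi-line payload needs its own "data:" field.
    lines.extend(f"data: {line}" for line in payload.split("\n"))
    # A blank line terminates the message.
    return "\n".join(lines) + "\n\n"
```

With this helper, the generator above could `yield format_sse(event)` for updates and `yield format_sse("stream ended", event="close")` for the final message.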
Streaming NDJSON (Newline-Delimited JSON)
NDJSON writes one JSON object per line, so database results can be streamed row by row without loading every record into memory:
app/routers/export.py
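import json
from typing import AsyncGenerator

from fastapi import APIRouter
from fastapi.responses import StreamingResponse
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

# User (ORM model) and DBSession (the session dependency alias) are assumed
# to be defined elsewhere in the project; import them from wherever they live.

router = APIRouter(tags=["export"])


async def stream_users(db: AsyncSession) -> AsyncGenerator[str, None]:
    # AsyncSession.stream() is a coroutine: await it to get an AsyncResult.
    result = await db.stream(select(User).execution_options(yield_per=100))
    # scalars() unwraps each Row into the User instance it contains.
    async for partition in result.scalars().partitions():
        for user in partition:
            yield json.dumps({"id": user.id, "username": user.username}) + "\n"


@router.get("/export/users.ndjson")
async def export_users(db: DBSession) -> StreamingResponse:
    return StreamingResponse(
        stream_users(db),
        media_type="application/x-ndjson",
        headers={"Content-Disposition": "attachment; filename=users.ndjson"},
    )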
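Once the first chunk has been sent, the status code and headers are already on the wire, so an exception raised mid-stream cannot become a 500 response; the client just sees a body that stops early. One way to make the failure visible is to end the stream with an explicit error record. The sketch below uses only the standard library; `with_error_record`, `failing_source`, and the `"error"` key are illustrative assumptions, not an NDJSON standard, and the client has to know to look for that record.

```python
import asyncio
import json
from typing import AsyncGenerator, AsyncIterator


async def with_error_record(lines: AsyncIterator[str]) -> AsyncGenerator[str, None]:
    """Pass NDJSON lines through; on failure, emit one final error record."""
    try:
        async for line in lines:
            yield line
    except Exception as exc:  # CancelledError is a BaseException and is not caught here
        # The "error" key is an illustrative convention the client must agree on.
        yield json.dumps({"error": type(exc).__name__}) + "\n"


async def failing_source() -> AsyncGenerator[str, None]:
    """Stand-in for a database stream that fails after two rows."""
    yield json.dumps({"id": 1}) + "\n"
    yield json.dumps({"id": 2}) + "\n"
    raise RuntimeError("connection lost")


async def collect() -> list:
    """Consume the wrapped stream the way a client would, parsing each line."""
    return [json.loads(line) async for line in with_error_record(failing_source())]
```

In the route above, this would amount to passing `with_error_record(stream_users(db))` to `StreamingResponse`. Emitting only the exception type, rather than its message, avoids leaking internal details to the client.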
Common Pitfalls
| Pitfall | Cause / Symptom | Fix |
|---|---|---|
| Response buffered by Nginx | Client receives all at once | Add X-Accel-Buffering: no header |
| Generator not closed on disconnect | Memory leak | Release resources in a try/finally block inside the generator; do not suppress GeneratorExit or CancelledError |
| DB session closed mid-stream | Session tied to request scope | Open a dedicated session inside the generator for long-running streams |
| Large file read into memory | Using file.read() instead of chunked read | Yield in chunks (64–256 KB) |
| SSE not working in browser | Wrong Content-Type | Use text/event-stream exactly |
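The "generator not closed" row deserves a concrete illustration. When a client disconnects, the server stops consuming the generator, which is then closed or cancelled at its current `yield` or `await`; a `finally` block runs in either case. The sketch below simulates the disconnect by calling `aclose()` directly, so it runs without a server. `ticker` and `simulate_disconnect` are hypothetical names, and the log list stands in for a real resource such as a subscription or a database session.

```python
import asyncio
from typing import AsyncGenerator, List


async def ticker(cleanup_log: List[str]) -> AsyncGenerator[str, None]:
    """Yield events forever; record cleanup when the consumer goes away."""
    cleanup_log.append("opened")  # stand-in for acquiring a resource
    try:
        n = 0
        while True:
            yield f"data: tick {n}\n\n"
            n += 1
            await asyncio.sleep(0)
    finally:
        # Runs on normal exit, on aclose() (GeneratorExit), and on task cancellation.
        cleanup_log.append("closed")


async def simulate_disconnect() -> List[str]:
    log: List[str] = []
    stream = ticker(log)
    await stream.__anext__()  # client receives the first event
    await stream.__anext__()  # ... and the second
    await stream.aclose()     # client disconnects; generator is closed early
    return log
```

Keep the `finally` block free of `yield` statements: a generator that yields while it is being closed raises `RuntimeError`.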