
Streaming Responses

When your response data is large, dynamically generated, or continuous (like real-time logs or SSE), loading everything into memory before sending is wasteful or impossible. StreamingResponse lets FastAPI send data in chunks as it is generated.

Learning Focus

By the end of this lesson you will be able to stream a large file, implement server-sent events (SSE), stream database query results as NDJSON, and handle streaming errors gracefully.

Streaming a Large File

app/routers/downloads.py
from pathlib import Path
from typing import Iterator

from fastapi import APIRouter, HTTPException
from fastapi.responses import StreamingResponse

router = APIRouter(prefix="/downloads", tags=["downloads"])
EXPORT_DIR = Path("exports")


def file_generator(path: Path, chunk_size: int = 65536) -> Iterator[bytes]:
    # A plain (sync) generator: Starlette iterates it in a threadpool,
    # so the blocking file reads do not stall the event loop.
    with path.open("rb") as f:
        while chunk := f.read(chunk_size):
            yield chunk


@router.get("/{filename}")
async def download_file(filename: str) -> StreamingResponse:
    safe_name = Path(filename).name  # Prevent path traversal
    path = EXPORT_DIR / safe_name
    if not path.is_file():
        raise HTTPException(status_code=404, detail="File not found")
    return StreamingResponse(
        file_generator(path),
        media_type="application/octet-stream",
        # Quote the filename so names containing spaces survive intact
        headers={"Content-Disposition": f'attachment; filename="{safe_name}"'},
    )
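To see what the chunked read loop actually produces, here is a minimal sketch that runs without FastAPI. The name `read_in_chunks`, the temporary file, and its 150,000-byte size are assumptions made for illustration; only the read loop mirrors the generator above.

```python
import tempfile
from pathlib import Path
from typing import Iterator


def read_in_chunks(path: Path, chunk_size: int = 65536) -> Iterator[bytes]:
    """Yield the file's bytes in pieces of at most chunk_size bytes."""
    with path.open("rb") as f:
        # f.read() returns b"" at end of file, which ends the loop
        while chunk := f.read(chunk_size):
            yield chunk


# Hypothetical demo file: 150,000 bytes, read with the default 64 KiB chunk size
with tempfile.TemporaryDirectory() as tmp:
    demo_path = Path(tmp) / "demo.bin"
    demo_path.write_bytes(b"x" * 150_000)
    chunk_sizes = [len(chunk) for chunk in read_in_chunks(demo_path)]

print(chunk_sizes)  # → [65536, 65536, 18928]
```

Only one chunk is held in memory at a time, so peak memory use is bounded by `chunk_size` rather than by the file size.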

Server-Sent Events (SSE)

SSE lets the server push real-time events to the browser without WebSockets:

app/routers/events.py
import asyncio
import json
from typing import AsyncGenerator

from fastapi import APIRouter
from fastapi.responses import StreamingResponse

router = APIRouter(prefix="/events", tags=["events"])


async def event_generator(client_id: str) -> AsyncGenerator[str, None]:
    for i in range(10):
        event = {"type": "update", "data": f"Message {i}", "client": client_id}
        # Each SSE frame is one or more "field: value" lines followed by a blank line
        yield f"data: {json.dumps(event)}\n\n"
        await asyncio.sleep(1)
    yield "event: close\ndata: stream ended\n\n"


@router.get("/stream")
async def sse_stream(client_id: str = "anon") -> StreamingResponse:
    return StreamingResponse(
        event_generator(client_id),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no",  # Disable Nginx buffering
        },
    )

test-sse.sh
curl -N "http://localhost:8000/events/stream?client_id=test"
# → data: {"type": "update", "data": "Message 0", ...}
# → data: {"type": "update", "data": "Message 1", ...}
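The SSE wire format is simple enough to build by hand, but a small helper keeps the framing rules in one place. The sketch below is one possible way to do it; `format_sse` and its parameters are hypothetical names, not part of FastAPI or Starlette. It follows the rules of the event-stream format: optional `id:` and `event:` lines, one `data:` line per line of payload, and a blank line to end the frame.

```python
import json
from typing import Any, Optional


def format_sse(data: Any, event: Optional[str] = None, event_id: Optional[str] = None) -> str:
    """Serialize one SSE frame; non-string data is JSON-encoded first."""
    payload = data if isinstance(data, str) else json.dumps(data)
    lines = []
    if event_id is not None:
        lines.append(f"id: {event_id}")
    if event is not None:
        lines.append(f"event: {event}")
    # A payload containing newlines needs one "data:" line per row
    lines.extend(f"data: {row}" for row in payload.split("\n"))
    # The trailing blank line terminates the frame
    return "\n".join(lines) + "\n\n"


frame = format_sse({"type": "update", "data": "Message 0"}, event="update", event_id="0")
print(repr(frame))
# → 'id: 0\nevent: update\ndata: {"type": "update", "data": "Message 0"}\n\n'
```

A generator could then `yield format_sse(event)` instead of assembling the string inline, which also handles multi-line payloads correctly.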

Streaming NDJSON (Newline-Delimited JSON)

For streaming database results without loading all records into memory:

app/routers/export.py
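import json
from typing import AsyncGenerator

from fastapi import APIRouter
from fastapi.responses import StreamingResponse
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

# User (ORM model) and DBSession (session dependency alias) are assumed to be
# defined elsewhere in the project; import them from wherever they live.

router = APIRouter()


async def stream_users(db: AsyncSession) -> AsyncGenerator[str, None]:
    stmt = select(User).execution_options(yield_per=100)
    # stream_scalars() is awaited and yields User objects rather than Row tuples
    result = await db.stream_scalars(stmt)
    async for partition in result.partitions():
        for user in partition:
            yield json.dumps({"id": user.id, "username": user.username}) + "\n"


@router.get("/export/users.ndjson")
async def export_users(db: DBSession) -> StreamingResponse:
    return StreamingResponse(
        stream_users(db),
        media_type="application/x-ndjson",
        headers={"Content-Disposition": 'attachment; filename="users.ndjson"'},
    )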
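On the consuming side, NDJSON can be parsed one line at a time, so the client never needs the whole body in memory either. The following is a minimal sketch; `parse_ndjson` and the sample records are made up for illustration, and the input is a plain iterable of lines so the example runs without a server.

```python
import json
from typing import Any, Dict, Iterable, Iterator


def parse_ndjson(lines: Iterable[str]) -> Iterator[Dict[str, Any]]:
    """Decode one JSON object per non-empty line."""
    for line in lines:
        line = line.strip()
        if line:  # tolerate blank lines between records
            yield json.loads(line)


# Hypothetical response body, shaped like the export endpoint's output
sample_body = '{"id": 1, "username": "ada"}\n{"id": 2, "username": "linus"}\n'
users = list(parse_ndjson(sample_body.splitlines()))
print(users)
# → [{'id': 1, 'username': 'ada'}, {'id': 2, 'username': 'linus'}]
```

With a streaming HTTP client, the same function could be fed the client's line iterator instead of `splitlines()`.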

Common Pitfalls

| Pitfall | Cause / Symptom | Fix |
| --- | --- | --- |
| Response buffered by Nginx | Client receives all at once | Add X-Accel-Buffering: no header |
| Generator not closed on disconnect | Memory leak | Wrap generator in try/finally and handle GeneratorExit |
| DB session closed mid-stream | Session tied to request scope | Use a separate session for long-running streams |
| Large file read into memory | Using file.read() instead of chunked read | Yield in chunks (64–256 KB) |
| SSE not working in browser | Wrong Content-Type | Use text/event-stream exactly |
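The try/finally fix for unclosed generators can be sketched without a web server. In the example below, `ticking_stream`, `simulate_disconnect`, and `cleanup_log` are hypothetical names, and the client disconnect is simulated by abandoning the loop and closing the generator explicitly, which raises GeneratorExit at the suspended `yield`.

```python
import asyncio
from typing import AsyncGenerator, List

cleanup_log: List[str] = []


async def ticking_stream() -> AsyncGenerator[str, None]:
    try:
        i = 0
        while True:  # an endless stream, like a live log tail
            yield f"data: tick {i}\n\n"
            i += 1
            await asyncio.sleep(0)
    finally:
        # Runs on normal exit, on aclose() (GeneratorExit), and on cancellation;
        # release connections, file handles, or subscriptions here.
        cleanup_log.append("released")


async def simulate_disconnect() -> List[str]:
    stream = ticking_stream()
    received: List[str] = []
    async for chunk in stream:
        received.append(chunk)
        if len(received) == 3:
            break  # stand-in for the client going away
    await stream.aclose()  # closing the generator triggers the finally block
    return received


received = asyncio.run(simulate_disconnect())
print(len(received), cleanup_log)  # → 3 ['released']
```

Keeping cleanup in `finally` rather than after the loop matters because code placed after an endless loop is never reached when the stream is cut short.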

What's Next