Background Tasks
Sometimes you need to do work after sending the response — sending a confirmation email, writing to an audit log, or triggering a slow cleanup job. FastAPI's BackgroundTasks lets you schedule work that runs after the response is sent, without blocking the client.
By the end of this lesson you will be able to: schedule background tasks in route handlers, give tasks the resources they depend on, explain the limitations compared with Celery, and test background task execution.
Basic Background Task
import logging
from fastapi import APIRouter, BackgroundTasks
from pydantic import BaseModel
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/orders", tags=["orders"])

def send_order_confirmation(order_id: int, email: str) -> None:
    """Runs after the response is sent. Do not use async def here unless truly needed."""
    logger.info("Sending confirmation for order %d to %s", order_id, email)
    # Simulate email send
    # send_email(to=email, subject="Order confirmed", body=f"Order #{order_id} confirmed")


class OrderCreate(BaseModel):
    product_id: int
    email: str


@router.post("/", status_code=201)
async def create_order(
    order: OrderCreate,
    background_tasks: BackgroundTasks,
) -> dict:
    order_id = 12345  # In reality: created in DB
    # Schedule the email; it runs AFTER this response is returned
    background_tasks.add_task(send_order_confirmation, order_id, order.email)
    return {"order_id": order_id, "status": "created"}
The client receives {"order_id": 12345, "status": "created"} immediately. The email sends afterward.
Async Background Tasks
# app/tasks/notifications.py
import logging

import httpx

logger = logging.getLogger(__name__)


async def notify_webhook(url: str, payload: dict) -> None:
    async with httpx.AsyncClient() as client:
        try:
            await client.post(url, json=payload, timeout=5.0)
        except httpx.HTTPError as e:
            # Log instead of raising: the response has already been sent
            logger.error("Webhook failed: %s", e)

from app.tasks.notifications import notify_webhook


@router.post("/")
async def create_order(order: OrderCreate, background_tasks: BackgroundTasks) -> dict:
    order_id = 99
    background_tasks.add_task(
        notify_webhook,
        "https://hooks.example.com/order",
        {"event": "order.created", "order_id": order_id},
    )
    return {"order_id": order_id}
Background Tasks with Dependencies
Request-scoped dependencies, such as a DB session provided by a dependency with yield, may already be closed by the time a background task runs. Create fresh resources inside the task:
from app.db.models import AuditLog  # assumed location of the AuditLog model
from app.db.session import AsyncSessionFactory


async def log_audit_event(user_id: int, action: str, resource_id: int) -> None:
    """Creates its own DB session; does not reuse the request session."""
    async with AsyncSessionFactory() as db:
        audit = AuditLog(user_id=user_id, action=action, resource_id=resource_id)
        db.add(audit)
        await db.commit()
Never pass a SQLAlchemy AsyncSession from the request into a background task. The session is closed when the request ends. Create a new session inside the task instead.
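The same pattern can be tried without any project setup. The sketch below swaps in the standard library's `sqlite3` for SQLAlchemy purely so it runs on its own: the task receives only plain values and opens and closes its own connection. `DB_PATH`, `init_db`, and the `audit_log` table are hypothetical names chosen for this example.

```python
import sqlite3
import tempfile
from contextlib import closing
from pathlib import Path

# Hypothetical database file; a real app would use its configured session factory.
DB_PATH = Path(tempfile.mkdtemp()) / "audit.db"


def init_db() -> None:
    with closing(sqlite3.connect(DB_PATH)) as conn:
        conn.execute(
            "CREATE TABLE IF NOT EXISTS audit_log "
            "(user_id INTEGER, action TEXT, resource_id INTEGER)"
        )
        conn.commit()


def log_audit_event(user_id: int, action: str, resource_id: int) -> None:
    """Background task: opens and closes its own connection."""
    with closing(sqlite3.connect(DB_PATH)) as conn:
        conn.execute(
            "INSERT INTO audit_log VALUES (?, ?, ?)",
            (user_id, action, resource_id),
        )
        conn.commit()


init_db()
# In a route handler this would be:
#   background_tasks.add_task(log_audit_event, 7, "order.created", 99)
log_audit_event(user_id=7, action="order.created", resource_id=99)
```

Note that the task is given IDs rather than ORM objects, since objects loaded through the request session may no longer be usable once that session is closed.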
BackgroundTasks vs Celery
| Feature | BackgroundTasks | Celery |
|---|---|---|
| Setup | None — built in | Redis/RabbitMQ broker required |
| Task persistence | ❌ No — lost on crash | ✅ Persisted to broker |
| Retries | ❌ No | ✅ Configurable |
| Monitoring | ❌ No dashboard | ✅ Flower / built-in |
| Distributed workers | ❌ Same process | ✅ Multiple workers |
| Use for | Email, logging, webhooks | Long jobs, retryable work |
Testing Background Tasks
from unittest.mock import patch

from fastapi.testclient import TestClient

from app.main import app

client = TestClient(app)


def test_order_schedules_email():
    with patch("app.routers.orders.send_order_confirmation") as mock_send:
        resp = client.post(
            "/orders/",
            json={"product_id": 1, "email": "user@example.com"},
        )
    assert resp.status_code == 201
    # Background tasks run synchronously in TestClient
    mock_send.assert_called_once()
TestClient blocks until the application has finished handling the request, including any background tasks, so your task function will have executed by the time the test assertions run.
Common Pitfalls
| Pitfall | Cause / Symptom | Fix |
|---|---|---|
| Later task never runs | An earlier task queued in the same request raised an exception, which stops the remaining tasks | Handle exceptions inside each task so one failure does not block the rest |
| DB session closed error | Reusing request session in task | Create new session with AsyncSessionFactory() |
| Task fails without the client noticing | The response was already sent, so errors surface only in server logs | Wrap task body in try/except with logging and monitor the logs |
| Response is slow even though a task is scheduled | Slow work is done in the handler before add_task is called | Move the slow work into the background task |
| Tasks lost on restart | In-memory BackgroundTasks with no persistence | Use Celery for critical tasks |
Hands-On Practice
import logging

from fastapi import APIRouter, BackgroundTasks
from pydantic import BaseModel, EmailStr  # EmailStr needs: pip install "pydantic[email]"

# Without this, INFO-level messages from this module are not shown
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/signups", tags=["signups"])


def send_welcome_email(email: str, username: str) -> None:
    logger.info("[EMAIL] Welcome email sent to %s (%s)", email, username)


def log_signup_event(username: str) -> None:
    logger.info("[AUDIT] New signup: %s", username)


class SignupRequest(BaseModel):
    username: str
    email: EmailStr


@router.post("/", status_code=201)
async def signup(body: SignupRequest, tasks: BackgroundTasks) -> dict:
    # Tasks run in the order they are added
    tasks.add_task(send_welcome_email, body.email, body.username)
    tasks.add_task(log_signup_event, body.username)
    return {"message": "Account created", "username": body.username}

# Assumes app/main.py creates the FastAPI app and calls app.include_router(router)
uvicorn app.main:app --reload --port 8010
curl -s -X POST http://localhost:8010/signups/ \
-H "Content-Type: application/json" \
-d '{"username": "alice", "email": "alice@example.com"}'
# Response comes back immediately
# Log output appears shortly after