
Background Tasks

Sometimes you need to do work after sending the response: sending a confirmation email, writing to an audit log, or triggering a slow cleanup job. FastAPI's BackgroundTasks lets you schedule that work to run after the response is sent, so the client is not kept waiting.

Learning Focus

By the end of this lesson you can: schedule background tasks in route handlers, pass dependencies into tasks, understand the limitations vs. Celery, and test background task execution.

Basic Background Task

app/routers/orders.py
import logging
from fastapi import APIRouter, BackgroundTasks
from pydantic import BaseModel

logger = logging.getLogger(__name__)
router = APIRouter(prefix="/orders", tags=["orders"])


def send_order_confirmation(order_id: int, email: str) -> None:
    """Runs after the response is sent. Do not use async def here unless truly needed."""
    logger.info("Sending confirmation for order %d to %s", order_id, email)
    # Simulate email send
    # send_email(to=email, subject="Order confirmed", body=f"Order #{order_id} confirmed")


class OrderCreate(BaseModel):
    product_id: int
    email: str


@router.post("/", status_code=201)
async def create_order(
    order: OrderCreate,
    background_tasks: BackgroundTasks,
) -> dict:
    order_id = 12345  # In reality: created in DB
    # Schedule the email; it runs AFTER this response is returned
    background_tasks.add_task(send_order_confirmation, order_id, order.email)
    return {"order_id": order_id, "status": "created"}

The client receives {"order_id": 12345, "status": "created"} immediately. The email sends afterward.
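
To make the ordering concrete, here is a simplified model of the idea using only the standard library. It is a sketch, not FastAPI's actual implementation: `MiniBackgroundTasks`, `handle_request`, and the `events` list are hypothetical names invented for illustration. It shows that `add_task` only records the call, and that the recorded tasks run one after another once the response has been produced.

```python
import asyncio
import inspect
from typing import Any, Callable


class MiniBackgroundTasks:
    """Toy stand-in for BackgroundTasks: collects callables and runs them later."""

    def __init__(self) -> None:
        self._tasks: list = []

    def add_task(self, func: Callable[..., Any], *args: Any, **kwargs: Any) -> None:
        # Nothing runs here; the call is only recorded.
        self._tasks.append((func, args, kwargs))

    async def run_all(self) -> None:
        # Tasks run one after another, in the order they were added.
        for func, args, kwargs in self._tasks:
            if inspect.iscoroutinefunction(func):
                await func(*args, **kwargs)
            else:
                # Sync functions go to a worker thread so the event loop stays free.
                await asyncio.to_thread(func, *args, **kwargs)


events: list = []


def send_order_confirmation(order_id: int, email: str) -> None:
    events.append(f"email sent for order {order_id} to {email}")


async def handle_request() -> dict:
    tasks = MiniBackgroundTasks()
    tasks.add_task(send_order_confirmation, 12345, "user@example.com")
    response = {"order_id": 12345, "status": "created"}
    events.append("response sent")  # stands in for writing the HTTP response
    await tasks.run_all()           # only now does the task body execute
    return response


result = asyncio.run(handle_request())
print(events)
```

In this model the "response sent" event is always recorded before the email event, which mirrors the behavior described above. `asyncio.to_thread` requires Python 3.9 or later.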

Async Background Tasks

app/tasks/notifications.py
import logging

import httpx

logger = logging.getLogger(__name__)


async def notify_webhook(url: str, payload: dict) -> None:
    async with httpx.AsyncClient() as client:
        try:
            await client.post(url, json=payload, timeout=5.0)
        except httpx.HTTPError as e:
            # Log and continue: the response has already been sent to the client
            logger.error("Webhook failed: %s", e)

app/routers/orders.py
from app.tasks.notifications import notify_webhook


@router.post("/")
async def create_order(order: OrderCreate, background_tasks: BackgroundTasks) -> dict:
    order_id = 99
    background_tasks.add_task(
        notify_webhook,
        "https://hooks.example.com/order",
        {"event": "order.created", "order_id": order_id},
    )
    return {"order_id": order_id}

Background Tasks with Dependencies

Request-scoped dependencies (such as DB sessions) may already be closed by the time a background task runs, so a task should not rely on them. Create fresh resources inside the task:

app/tasks/audit.py
from app.db.session import AsyncSessionFactory
from app.models import AuditLog  # assumed import path; adjust to where your model lives


async def log_audit_event(user_id: int, action: str, resource_id: int) -> None:
    """Creates its own DB session; does not reuse the request session."""
    async with AsyncSessionFactory() as db:
        audit = AuditLog(user_id=user_id, action=action, resource_id=resource_id)
        db.add(audit)
        await db.commit()

Warning

Never pass a SQLAlchemy AsyncSession from the request into a background task. The session is closed when the request ends. Create a new session inside the task instead.
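
The failure mode can be illustrated without a database. The sketch below uses a hypothetical `FakeSession` class and `session_scope` helper (neither is part of SQLAlchemy or FastAPI) to mimic a session that is closed when the request scope exits. A task that reuses the request's session fails, while a task that opens its own succeeds.

```python
from contextlib import contextmanager


class FakeSession:
    """Hypothetical stand-in for a DB session; only tracks open/closed state."""

    def __init__(self) -> None:
        self.closed = False
        self.rows: list = []

    def add(self, row: str) -> None:
        if self.closed:
            raise RuntimeError("session is closed")
        self.rows.append(row)

    def close(self) -> None:
        self.closed = True


@contextmanager
def session_scope():
    """Open a session and always close it on exit, like a request-scoped dependency."""
    session = FakeSession()
    try:
        yield session
    finally:
        session.close()


def bad_task(session: FakeSession) -> str:
    # Receives the request's session, which is closed by the time this runs.
    try:
        session.add("audit row")
        return "ok"
    except RuntimeError as exc:
        return f"failed: {exc}"


def good_task() -> str:
    # Opens its own session, independent of the request's lifetime.
    with session_scope() as session:
        session.add("audit row")
        return "ok"


# Simulate a request: the session closes when the request scope exits,
# and the background tasks run only afterwards.
with session_scope() as request_session:
    pending = [lambda: bad_task(request_session), good_task]

results = [task() for task in pending]
print(results)  # ['failed: session is closed', 'ok']
```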

BackgroundTasks vs Celery

| Feature | BackgroundTasks | Celery |
| --- | --- | --- |
| Setup | None (built in) | Message broker required (e.g. Redis or RabbitMQ) |
| Task persistence | ❌ No (lost on crash) | ✅ Persisted to broker |
| Retries | ❌ No | ✅ Configurable |
| Monitoring | ❌ No dashboard | ✅ Flower / built-in |
| Distributed workers | ❌ Same process | ✅ Multiple workers |
| Use for | Email, logging, webhooks | Long jobs, retryable work |
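
If a task only needs a few in-process retries, a small wrapper can partly cover the "Retries" row without adding a broker. The sketch below is one possible approach, not a FastAPI feature: `with_retries` and `flaky_send` are hypothetical names, and it handles sync task functions only. It does not add persistence, so pending retries are still lost if the process crashes.

```python
import functools
import logging
import time
from typing import Any, Callable

logger = logging.getLogger(__name__)


def with_retries(attempts: int = 3, delay_seconds: float = 0.0) -> Callable:
    """Hypothetical helper: retry a sync task function a fixed number of times."""

    def decorator(func: Callable[..., Any]) -> Callable[..., Any]:
        @functools.wraps(func)
        def wrapper(*args: Any, **kwargs: Any) -> Any:
            for attempt in range(1, attempts + 1):
                try:
                    return func(*args, **kwargs)
                except Exception:
                    logger.exception(
                        "%s failed (attempt %d/%d)", func.__name__, attempt, attempts
                    )
                    if attempt == attempts:
                        raise  # out of attempts; let the error surface
                    time.sleep(delay_seconds)

        return wrapper

    return decorator


calls = {"count": 0}


@with_retries(attempts=3)
def flaky_send(email: str) -> str:
    # Fails twice, then succeeds, to exercise the retry loop.
    calls["count"] += 1
    if calls["count"] < 3:
        raise ConnectionError("temporary failure")
    return f"sent to {email}"


outcome = flaky_send("user@example.com")
print(outcome, calls["count"])
```

A decorated function can be passed to `add_task` like any other callable. For work that must survive restarts or needs backoff policies, Celery remains the better fit.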

Testing Background Tasks

tests/test_orders.py
from fastapi.testclient import TestClient
from unittest.mock import patch
from app.main import app

client = TestClient(app)


def test_order_schedules_email():
    with patch("app.routers.orders.send_order_confirmation") as mock_send:
        resp = client.post(
            "/orders/",
            json={"product_id": 1, "email": "user@example.com"},
        )
        assert resp.status_code == 201
        # Background tasks have finished by the time client.post() returns
        mock_send.assert_called_once()

Note

With TestClient, the request call returns only after background tasks have completed, so your task function will have executed by the time the test assertions run.

Common Pitfalls

| Pitfall | Cause / Symptom | Fix |
| --- | --- | --- |
| Task appears never to run | Task raises an exception before doing any visible work; the client never sees the error | Wrap the task body in try/except with logging so failures are visible |
| DB session closed error | Reusing the request session in the task | Create a new session with AsyncSessionFactory() |
| Task fails silently | No error handling in the task, and the response has already been sent | Wrap the task body in try/except with logging |
| Task is scheduled but response is slow | Long-running synchronous work done in the handler before add_task | Move the slow work into the background task instead of the handler |
| Tasks lost on restart | In-memory BackgroundTasks with no persistence | Use Celery for critical tasks |
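
The try/except fix from the table can be written once and reused. The sketch below assumes a hypothetical `log_task_errors` decorator for sync task functions; the `ListHandler` class exists only to capture log messages in memory so the effect is easy to inspect.

```python
import functools
import logging
from typing import Any, Callable

logger = logging.getLogger("app.tasks")


def log_task_errors(func: Callable[..., Any]) -> Callable[..., Any]:
    """Hypothetical decorator: log any exception raised by a sync task."""

    @functools.wraps(func)
    def wrapper(*args: Any, **kwargs: Any) -> Any:
        try:
            return func(*args, **kwargs)
        except Exception:
            # logger.exception records the message plus the full traceback.
            logger.exception("Background task %s failed", func.__name__)
            return None

    return wrapper


@log_task_errors
def send_order_confirmation(order_id: int, email: str) -> None:
    if "@" not in email:
        raise ValueError(f"invalid email for order {order_id}: {email!r}")
    logger.info("Sending confirmation for order %d to %s", order_id, email)


class ListHandler(logging.Handler):
    """Collects formatted log messages in a list for inspection."""

    def __init__(self) -> None:
        super().__init__()
        self.messages: list = []

    def emit(self, record: logging.LogRecord) -> None:
        self.messages.append(record.getMessage())


handler = ListHandler()
logger.addHandler(handler)
logger.setLevel(logging.INFO)

send_order_confirmation(1, "not-an-email")      # failure is logged, not raised
send_order_confirmation(2, "user@example.com")  # normal path
print(handler.messages)
```

Swallowing the exception after logging is a design choice: it keeps one failing task from preventing later tasks in the same request from running, at the cost of hiding the error from any outer error handling.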

Hands-On Practice

app/routers/signups.py
import logging
from fastapi import APIRouter, BackgroundTasks
from pydantic import BaseModel, EmailStr  # EmailStr requires the email-validator package

logger = logging.getLogger(__name__)
router = APIRouter(prefix="/signups", tags=["signups"])


def send_welcome_email(email: str, username: str) -> None:
    logger.info("[EMAIL] Welcome email sent to %s (%s)", email, username)


def log_signup_event(username: str) -> None:
    logger.info("[AUDIT] New signup: %s", username)


class SignupRequest(BaseModel):
    username: str
    email: EmailStr


@router.post("/", status_code=201)
async def signup(body: SignupRequest, tasks: BackgroundTasks) -> dict:
    tasks.add_task(send_welcome_email, body.email, body.username)
    tasks.add_task(log_signup_event, body.username)
    return {"message": "Account created", "username": body.username}

test-background.sh
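# Terminal 1: start the server.
# Assumes app/main.py creates the FastAPI app and registers the signups
# router with app.include_router(router); uvicorn should serve the app, not the router.
uvicorn app.main:app --reload --port 8010

# Terminal 2: send a signup request.
curl -s -X POST http://localhost:8010/signups/ \
  -H "Content-Type: application/json" \
  -d '{"username": "alice", "email": "alice@example.com"}'
# The response comes back immediately.
# Log output appears shortly after in terminal 1, provided logging is
# configured at INFO level (e.g. logging.basicConfig(level=logging.INFO)).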

What's Next