Async Database Engines
Using async SQLAlchemy correctly in FastAPI requires understanding how the event loop, connection pool, and async session interact. This lesson covers configuration tuning, safe patterns for async queries, and pitfalls that cause blocking in production.
Learning Focus
By the end of this lesson you can: configure connection pool size for production, execute raw SQL safely, use run_sync for sync-only operations, and diagnose common async engine errors.
Engine Configuration for Production
app/db/session.py
from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker, AsyncSession
from app.core.config import settings
engine = create_async_engine(
settings.DATABASE_URL,
echo=False, # Disable SQL logging in production
pool_size=20, # Connections kept open (per worker process)
max_overflow=10, # Additional connections beyond pool_size
pool_timeout=30, # Seconds to wait for a connection from pool
pool_recycle=1800, # Recycle connections every 30 minutes
pool_pre_ping=True, # Test connection before returning from pool
)
Rule of thumb for pool sizing:
pool_size× (number of Uvicorn workers) ≤ PostgreSQLmax_connections- A PostgreSQL default allows 100 connections
- With 4 workers and
pool_size=20, you use 80 connections — leaving room for maintenance
Raw SQL with text()
For complex queries not expressible with the ORM:
app/crud/analytics.py
from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncSession
async def get_daily_revenue(db: AsyncSession, days: int = 30) -> list[dict]:
result = await db.execute(
text("""
SELECT
DATE(created_at) AS day,
COUNT(*) AS orders,
SUM(total_amount) AS revenue
FROM orders
WHERE created_at >= NOW() - INTERVAL ':days days'
AND status = 'completed'
GROUP BY DATE(created_at)
ORDER BY day DESC
"""),
{"days": days}, # Always use bound parameters — never f-strings
)
return [dict(row._mapping) for row in result]
danger
Never use f-strings or string concatenation to build SQL queries. Always use SQLAlchemy bound parameters (text("... :param") with dict, or ORM filters) to prevent SQL injection.
Executing Sync Operations with run_sync
Some operations (like create_all) are sync-only:
app/db/init.py
from sqlalchemy.ext.asyncio import AsyncEngine
from app.db.base import Base
async def create_all_tables(engine: AsyncEngine) -> None:
async with engine.begin() as conn:
await conn.run_sync(Base.metadata.create_all)
async def drop_all_tables(engine: AsyncEngine) -> None:
async with engine.begin() as conn:
await conn.run_sync(Base.metadata.drop_all)
Connection Lifecycle
Async-Safe Bulk Operations
app/crud/bulk.py
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.dialects.postgresql import insert
async def upsert_users(db: AsyncSession, users: list[dict]) -> None:
stmt = insert(User).values(users)
stmt = stmt.on_conflict_do_update(
index_elements=["email"],
set_={"username": stmt.excluded.username, "updated_at": func.now()},
)
await db.execute(stmt)
Connection String Formats
| Driver | URL Format |
|---|---|
| asyncpg (async, recommended) | postgresql+asyncpg://user:pass@host/db |
| psycopg3 async | postgresql+psycopg://user:pass@host/db |
| aiosqlite (async SQLite for tests) | sqlite+aiosqlite:///./test.db |
| psycopg2 (sync, for Alembic) | postgresql+psycopg2://user:pass@host/db |
Multiple Databases
app/db/session.py
from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker
primary_engine = create_async_engine(settings.PRIMARY_DB_URL, pool_size=15)
replica_engine = create_async_engine(settings.REPLICA_DB_URL, pool_size=10)
PrimarySession = async_sessionmaker(primary_engine, expire_on_commit=False)
ReplicaSession = async_sessionmaker(replica_engine, expire_on_commit=False)
async def get_primary_db():
async with PrimarySession() as s:
yield s
async def get_replica_db():
async with ReplicaSession() as s:
yield s
Common Pitfalls
| Pitfall | Cause / Symptom | Fix |
|---|---|---|
OperationalError: too many connections | pool_size × workers > PostgreSQL max_connections | Reduce pool_size or increase PostgreSQL limits |
| SSL cert errors with asyncpg | Missing SSL config | Add ?ssl=require to connection string or pass ssl=True |
| Connection leaks | Session not closed after exception | Always use async with — never open sessions manually |
asyncpg prepared statement cache conflict | Running schema changes between queries | Disable prepared statements: ?prepared_statement_cache_size=0 |
| Slow first request | Cold connection pool | Warm up pool at startup in lifespan |
Hands-On Practice
app/db/warmup.py
from sqlalchemy import text
from app.db.session import engine, AsyncSessionFactory
async def warmup_pool() -> None:
"""Pre-populate the connection pool at startup."""
tasks = []
async with AsyncSessionFactory() as session:
await session.execute(text("SELECT 1"))
app/main.py
from contextlib import asynccontextmanager
from app.db.warmup import warmup_pool
from fastapi import FastAPI
@asynccontextmanager
async def lifespan(app: FastAPI):
await warmup_pool()
yield
app = FastAPI(lifespan=lifespan)
verify-pooling.sh
# Check active PostgreSQL connections
psql -U postgres -c "SELECT count(*) FROM pg_stat_activity WHERE datname='app';"
# Start the app
uvicorn app.main:app --workers 4
# Check again — should see pool_size * workers connections
psql -U postgres -c "SELECT count(*) FROM pg_stat_activity WHERE datname='app';"