Skip to main content

Async Database Engines

Using async SQLAlchemy correctly in FastAPI requires understanding how the event loop, connection pool, and async session interact. This lesson covers configuration tuning, safe patterns for async queries, and pitfalls that cause blocking in production.

Learning Focus

By the end of this lesson you can: configure connection pool size for production, execute raw SQL safely, use run_sync for sync-only operations, and diagnose common async engine errors.

Engine Configuration for Production

app/db/session.py
from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker, AsyncSession
from app.core.config import settings

engine = create_async_engine(
settings.DATABASE_URL,
echo=False, # Disable SQL logging in production
pool_size=20, # Connections kept open (per worker process)
max_overflow=10, # Additional connections beyond pool_size
pool_timeout=30, # Seconds to wait for a connection from pool
pool_recycle=1800, # Recycle connections every 30 minutes
pool_pre_ping=True, # Test connection before returning from pool
)

Rule of thumb for pool sizing:

  • pool_size × (number of Uvicorn workers) ≤ PostgreSQL max_connections
  • A PostgreSQL default allows 100 connections
  • With 4 workers and pool_size=20, you use 80 connections — leaving room for maintenance

Raw SQL with text()

For complex queries not expressible with the ORM:

app/crud/analytics.py
from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncSession

async def get_daily_revenue(db: AsyncSession, days: int = 30) -> list[dict]:
result = await db.execute(
text("""
SELECT
DATE(created_at) AS day,
COUNT(*) AS orders,
SUM(total_amount) AS revenue
FROM orders
WHERE created_at >= NOW() - INTERVAL ':days days'
AND status = 'completed'
GROUP BY DATE(created_at)
ORDER BY day DESC
"""),
{"days": days}, # Always use bound parameters — never f-strings
)
return [dict(row._mapping) for row in result]
danger

Never use f-strings or string concatenation to build SQL queries. Always use SQLAlchemy bound parameters (text("... :param") with dict, or ORM filters) to prevent SQL injection.

Executing Sync Operations with run_sync

Some operations (like create_all) are sync-only:

app/db/init.py
from sqlalchemy.ext.asyncio import AsyncEngine
from app.db.base import Base

async def create_all_tables(engine: AsyncEngine) -> None:
async with engine.begin() as conn:
await conn.run_sync(Base.metadata.create_all)

async def drop_all_tables(engine: AsyncEngine) -> None:
async with engine.begin() as conn:
await conn.run_sync(Base.metadata.drop_all)

Connection Lifecycle

Async-Safe Bulk Operations

app/crud/bulk.py
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.dialects.postgresql import insert

async def upsert_users(db: AsyncSession, users: list[dict]) -> None:
stmt = insert(User).values(users)
stmt = stmt.on_conflict_do_update(
index_elements=["email"],
set_={"username": stmt.excluded.username, "updated_at": func.now()},
)
await db.execute(stmt)

Connection String Formats

DriverURL Format
asyncpg (async, recommended)postgresql+asyncpg://user:pass@host/db
psycopg3 asyncpostgresql+psycopg://user:pass@host/db
aiosqlite (async SQLite for tests)sqlite+aiosqlite:///./test.db
psycopg2 (sync, for Alembic)postgresql+psycopg2://user:pass@host/db

Multiple Databases

app/db/session.py
from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker

primary_engine = create_async_engine(settings.PRIMARY_DB_URL, pool_size=15)
replica_engine = create_async_engine(settings.REPLICA_DB_URL, pool_size=10)

PrimarySession = async_sessionmaker(primary_engine, expire_on_commit=False)
ReplicaSession = async_sessionmaker(replica_engine, expire_on_commit=False)

async def get_primary_db():
async with PrimarySession() as s:
yield s

async def get_replica_db():
async with ReplicaSession() as s:
yield s

Common Pitfalls

PitfallCause / SymptomFix
OperationalError: too many connectionspool_size × workers > PostgreSQL max_connectionsReduce pool_size or increase PostgreSQL limits
SSL cert errors with asyncpgMissing SSL configAdd ?ssl=require to connection string or pass ssl=True
Connection leaksSession not closed after exceptionAlways use async with — never open sessions manually
asyncpg prepared statement cache conflictRunning schema changes between queriesDisable prepared statements: ?prepared_statement_cache_size=0
Slow first requestCold connection poolWarm up pool at startup in lifespan

Hands-On Practice

app/db/warmup.py
from sqlalchemy import text
from app.db.session import engine, AsyncSessionFactory

async def warmup_pool() -> None:
"""Pre-populate the connection pool at startup."""
tasks = []
async with AsyncSessionFactory() as session:
await session.execute(text("SELECT 1"))
app/main.py
from contextlib import asynccontextmanager
from app.db.warmup import warmup_pool
from fastapi import FastAPI

@asynccontextmanager
async def lifespan(app: FastAPI):
await warmup_pool()
yield

app = FastAPI(lifespan=lifespan)
verify-pooling.sh
# Check active PostgreSQL connections
psql -U postgres -c "SELECT count(*) FROM pg_stat_activity WHERE datname='app';"

# Start the app
uvicorn app.main:app --workers 4

# Check again — should see pool_size * workers connections
psql -U postgres -c "SELECT count(*) FROM pg_stat_activity WHERE datname='app';"

What's Next