CRUD Operations
With the database session wired in, the next step is querying and mutating data. SQLAlchemy 2 replaced the legacy session.query() API with select() statements — more explicit, more composable, and fully async-compatible.
Learning Focus
By the end of this lesson you can: write async CRUD operations with select(), session.execute(), session.add(), session.delete(), and encapsulate them in a repository class.
Create
app/crud/users.py
from sqlalchemy.ext.asyncio import AsyncSession
from app.db.models.user import User
async def create_user(db: AsyncSession, username: str, email: str, hashed_pw: str) -> User:
user = User(username=username, email=email, hashed_password=hashed_pw)
db.add(user)
await db.flush() # Write to DB but keep in transaction
await db.refresh(user) # Reload generated fields (id, created_at)
return user
note
flush() sends the INSERT to the database but does not commit the transaction. The get_db dependency commits after the handler returns. Use flush() when you need the generated id before committing.
Read (Single and List)
app/crud/users.py
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from app.db.models.user import User
async def get_user_by_id(db: AsyncSession, user_id: int) -> User | None:
result = await db.execute(select(User).where(User.id == user_id))
return result.scalar_one_or_none()
async def get_user_by_email(db: AsyncSession, email: str) -> User | None:
result = await db.execute(select(User).where(User.email == email))
return result.scalar_one_or_none()
async def list_users(db: AsyncSession, skip: int = 0, limit: int = 100) -> list[User]:
result = await db.execute(
select(User)
.where(User.is_active == True)
.order_by(User.created_at.desc())
.offset(skip)
.limit(limit)
)
return list(result.scalars().all())
Update
app/crud/users.py
async def update_user(
db: AsyncSession, user_id: int, update_data: dict
) -> User | None:
user = await get_user_by_id(db, user_id)
if user is None:
return None
for field, value in update_data.items():
setattr(user, field, value)
await db.flush()
await db.refresh(user)
return user
Delete
app/crud/users.py
async def delete_user(db: AsyncSession, user_id: int) -> bool:
user = await get_user_by_id(db, user_id)
if user is None:
return False
await db.delete(user)
return True
Repository Pattern
Encapsulate all CRUD for a model in a class:
app/crud/base.py
from typing import Generic, TypeVar, Type
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from app.db.base import Base
ModelT = TypeVar("ModelT", bound=Base)
class CRUDBase(Generic[ModelT]):
def __init__(self, model: Type[ModelT]):
self.model = model
async def get(self, db: AsyncSession, id: int) -> ModelT | None:
return await db.get(self.model, id)
async def list(self, db: AsyncSession, skip: int = 0, limit: int = 100) -> list[ModelT]:
result = await db.execute(select(self.model).offset(skip).limit(limit))
return list(result.scalars().all())
async def create(self, db: AsyncSession, **kwargs) -> ModelT:
obj = self.model(**kwargs)
db.add(obj)
await db.flush()
await db.refresh(obj)
return obj
async def delete(self, db: AsyncSession, id: int) -> bool:
obj = await self.get(db, id)
if obj is None:
return False
await db.delete(obj)
return True
app/crud/users.py
from app.crud.base import CRUDBase
from app.db.models.user import User
user_crud = CRUDBase(User)
Route Using CRUD
app/routers/users.py
from typing import Annotated
from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy.ext.asyncio import AsyncSession
from app.db.session import get_db
from app.crud.users import user_crud
from app.models.user import UserCreate, UserResponse
router = APIRouter(prefix="/users", tags=["users"])
DBSession = Annotated[AsyncSession, Depends(get_db)]
@router.post("/", response_model=UserResponse, status_code=201)
async def create_user(data: UserCreate, db: DBSession) -> UserResponse:
existing = await user_crud.get_by_email(db, data.email)
if existing:
raise HTTPException(409, "Email already registered")
user = await user_crud.create(db, **data.model_dump())
return UserResponse.model_validate(user)
@router.get("/{user_id}", response_model=UserResponse)
async def get_user(user_id: int, db: DBSession) -> UserResponse:
user = await user_crud.get(db, user_id)
if user is None:
raise HTTPException(404, "User not found")
return UserResponse.model_validate(user)
Eager Loading Relationships
Avoid lazy-loading N+1 queries in async context:
app/crud/posts.py
from sqlalchemy import select
from sqlalchemy.orm import selectinload
async def list_posts_with_authors(db: AsyncSession) -> list:
result = await db.execute(
select(Post)
.options(selectinload(Post.author)) # Load author in single query
.order_by(Post.created_at.desc())
)
return list(result.scalars().all())
| Loading Strategy | SQL Generated | Use When |
|---|---|---|
selectinload | 2 queries (1 for items, 1 for related) | Lists with relationships |
joinedload | 1 JOIN query | Single item with nested object |
| Lazy (default) | N+1 queries | ❌ Never use in async context |
Common Pitfalls
| Pitfall | Cause / Symptom | Fix |
|---|---|---|
MissingGreenlet on attribute access | Lazy loading in async context | Use selectinload() or joinedload() |
DetachedInstanceError | Accessing ORM object after session closes | Set expire_on_commit=False |
| Duplicate key error | Missing unique check before insert | Query for existing record first |
| N+1 queries | Iterating relationship in a loop | Use eager loading in the original query |
refresh() fails | Object not attached to session | Call flush() before refresh() |
Hands-On Practice
tests/test_crud.py
import pytest
from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker, AsyncSession
from app.db.base import Base
from app.db.models.user import User
from app.crud.base import CRUDBase
TEST_URL = "sqlite+aiosqlite:///:memory:"
user_crud = CRUDBase(User)
@pytest.fixture
async def db():
engine = create_async_engine(TEST_URL)
async with engine.begin() as conn:
await conn.run_sync(Base.metadata.create_all)
factory = async_sessionmaker(engine, expire_on_commit=False)
async with factory() as session:
yield session
@pytest.mark.asyncio
async def test_create_and_get_user(db: AsyncSession):
user = await user_crud.create(
db,
username="alice",
email="alice@example.com",
hashed_password="hashed",
is_active=True,
)
assert user.id is not None
found = await user_crud.get(db, user.id)
assert found is not None
assert found.username == "alice"
run-tests.sh
pip install pytest pytest-asyncio aiosqlite
pytest tests/test_crud.py -v