Skip to main content

CRUD Operations

With the database session wired in, the next step is querying and mutating data. SQLAlchemy 2 replaced the legacy session.query() API with select() statements — more explicit, more composable, and fully async-compatible.

Learning Focus

By the end of this lesson you can: write async CRUD operations with select(), session.execute(), session.add(), session.delete(), and encapsulate them in a repository class.

Create

app/crud/users.py
from sqlalchemy.ext.asyncio import AsyncSession
from app.db.models.user import User

async def create_user(db: AsyncSession, username: str, email: str, hashed_pw: str) -> User:
    """Insert a new user and return the persisted instance.

    The row is flushed inside the session's current transaction; this function
    does not commit.
    """
    new_user = User(
        username=username,
        email=email,
        hashed_password=hashed_pw,
    )
    db.add(new_user)
    # flush() emits the INSERT while keeping the transaction open.
    await db.flush()
    # refresh() reloads database-generated fields (id, created_at).
    await db.refresh(new_user)
    return new_user
note

flush() sends the INSERT to the database but does not commit the transaction. The get_db dependency commits after the handler returns. Use flush() when you need the generated id before committing.

Read (Single and List)

app/crud/users.py
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from app.db.models.user import User

async def get_user_by_id(db: AsyncSession, user_id: int) -> User | None:
    """Return the user whose ``id`` equals ``user_id``, or ``None`` if no row matches."""
    stmt = select(User).where(User.id == user_id)
    rows = await db.execute(stmt)
    return rows.scalar_one_or_none()

async def get_user_by_email(db: AsyncSession, email: str) -> User | None:
    """Return the user whose ``email`` column equals ``email``, or ``None`` if no row matches."""
    stmt = select(User).where(User.email == email)
    rows = await db.execute(stmt)
    return rows.scalar_one_or_none()

async def list_users(db: AsyncSession, skip: int = 0, limit: int = 100) -> list[User]:
    """Return one page of active users, newest first.

    Args:
        db: Session the query is executed on.
        skip: Number of rows to skip (SQL OFFSET).
        limit: Maximum number of rows to return (SQL LIMIT).

    Returns:
        The matching ``User`` objects as a plain list.
    """
    stmt = (
        select(User)
        # `== True` is intentional: it builds a SQL comparison, not a Python one.
        .where(User.is_active == True)  # noqa: E712
        # created_at is not unique; the id tie-breaker keeps OFFSET/LIMIT pages
        # stable so rows with equal timestamps are neither repeated nor skipped.
        .order_by(User.created_at.desc(), User.id.desc())
        .offset(skip)
        .limit(limit)
    )
    result = await db.execute(stmt)
    return list(result.scalars().all())

Update

app/crud/users.py
async def update_user(
    db: AsyncSession, user_id: int, update_data: dict
) -> User | None:
    """Apply ``update_data`` to the user with id ``user_id``.

    Returns the refreshed user, or ``None`` when no such user exists. Changes
    are flushed but not committed here.
    """
    target = await get_user_by_id(db, user_id)
    if target is not None:
        # Every key is assigned as an attribute as-is, so callers are expected
        # to pass only field names they allow to be changed.
        for attr_name, new_value in update_data.items():
            setattr(target, attr_name, new_value)
        await db.flush()
        await db.refresh(target)
    return target

Delete

app/crud/users.py
async def delete_user(db: AsyncSession, user_id: int) -> bool:
    """Mark the user with id ``user_id`` for deletion.

    Returns ``True`` when the user existed, ``False`` otherwise.
    """
    doomed = await get_user_by_id(db, user_id)
    found = doomed is not None
    if found:
        await db.delete(doomed)
    return found

Repository Pattern

Encapsulate all CRUD for a model in a class:

app/crud/base.py
from typing import Generic, TypeVar, Type
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from app.db.base import Base

ModelT = TypeVar("ModelT", bound=Base)


class CRUDBase(Generic[ModelT]):
    """Generic async repository for one ORM model class.

    Instantiate it with the mapped class (``CRUDBase(User)``) and pass the
    session to each call. Methods flush but never commit.
    """

    def __init__(self, model: Type[ModelT]):
        """Remember the mapped class every operation works on."""
        self.model = model

    async def get(self, db: AsyncSession, id: int) -> ModelT | None:
        """Return the row with primary key ``id``, or ``None`` if it does not exist."""
        return await db.get(self.model, id)

    # NOTE: this method name shadows the builtin ``list`` inside the class body.
    # Annotations on methods defined *after* it must not use ``list[...]``.
    async def list(self, db: AsyncSession, skip: int = 0, limit: int = 100) -> list[ModelT]:
        """Return one page of rows in primary-key order.

        Args:
            db: Session the query is executed on.
            skip: Number of rows to skip (SQL OFFSET).
            limit: Maximum number of rows to return (SQL LIMIT).
        """
        stmt = (
            select(self.model)
            # OFFSET/LIMIT without ORDER BY has no guaranteed row order, so
            # pages could overlap or miss rows; sort on the primary key.
            .order_by(*self.model.__mapper__.primary_key)
            .offset(skip)
            .limit(limit)
        )
        result = await db.execute(stmt)
        return list(result.scalars().all())

    async def create(self, db: AsyncSession, **kwargs) -> ModelT:
        """Build a model instance from ``kwargs``, flush it and return it refreshed."""
        obj = self.model(**kwargs)
        db.add(obj)
        # flush() sends the INSERT; refresh() then loads generated values.
        await db.flush()
        await db.refresh(obj)
        return obj

    async def delete(self, db: AsyncSession, id: int) -> bool:
        """Delete the row with primary key ``id``; return ``False`` if it was not found."""
        obj = await self.get(db, id)
        if obj is None:
            return False
        await db.delete(obj)
        return True
app/crud/users.py
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

from app.crud.base import CRUDBase
from app.db.models.user import User

class CRUDUser(CRUDBase[User]):
    """User repository: the generic CRUD operations plus user-specific lookups."""

    async def get_by_email(self, db: AsyncSession, email: str) -> User | None:
        """Return the user whose ``email`` column equals ``email``, or ``None``."""
        result = await db.execute(select(User).where(User.email == email))
        return result.scalar_one_or_none()


# Shared repository instance. The users router calls ``user_crud.get_by_email``,
# which the generic ``CRUDBase`` does not define, hence the subclass.
user_crud = CRUDUser(User)

Route Using CRUD

app/routers/users.py
from typing import Annotated
from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy.ext.asyncio import AsyncSession
from app.db.session import get_db
from app.crud.users import user_crud
from app.models.user import UserCreate, UserResponse

# All routes in this module are mounted under /users and tagged "users".
router = APIRouter(prefix="/users", tags=["users"])
# Reusable parameter type: an AsyncSession supplied by the get_db dependency.
DBSession = Annotated[AsyncSession, Depends(get_db)]

@router.post("/", response_model=UserResponse, status_code=201)
async def create_user(data: UserCreate, db: DBSession) -> UserResponse:
    """Create a user from the request body; respond 409 if the email is taken."""
    # user_crud must provide get_by_email for this lookup.
    duplicate = await user_crud.get_by_email(db, data.email)
    if duplicate:
        raise HTTPException(409, "Email already registered")
    fields = data.model_dump()
    created = await user_crud.create(db, **fields)
    return UserResponse.model_validate(created)

@router.get("/{user_id}", response_model=UserResponse)
async def get_user(user_id: int, db: DBSession) -> UserResponse:
    """Return the user with the given id; respond 404 if it does not exist."""
    record = await user_crud.get(db, user_id)
    if record is not None:
        return UserResponse.model_validate(record)
    raise HTTPException(404, "User not found")

Eager Loading Relationships

Avoid lazy-loading N+1 queries in async context:

app/crud/posts.py
from sqlalchemy import select
from sqlalchemy.orm import selectinload

async def list_posts_with_authors(db: AsyncSession) -> list:
    """Return all posts, newest first, with each post's author already loaded."""
    # selectinload fetches the authors with one extra SELECT ... IN query,
    # so reading post.author later triggers no lazy load.
    eager_author = selectinload(Post.author)
    stmt = select(Post).options(eager_author).order_by(Post.created_at.desc())
    rows = await db.execute(stmt)
    return list(rows.scalars().all())
| Loading Strategy | SQL Generated | Use When |
| --- | --- | --- |
| selectinload | 2 queries (1 for items, 1 for related) | Lists with relationships |
| joinedload | 1 JOIN query | Single item with nested object |
| Lazy (default) | N+1 queries | ❌ Never use in async context |

Common Pitfalls

| Pitfall | Cause / Symptom | Fix |
| --- | --- | --- |
| MissingGreenlet on attribute access | Lazy loading in async context | Use selectinload() or joinedload() |
| DetachedInstanceError | Accessing ORM object after session closes | Set expire_on_commit=False |
| Duplicate key error | Missing unique check before insert | Query for existing record first (and keep a unique constraint to catch concurrent inserts) |
| N+1 queries | Iterating relationship in a loop | Use eager loading in the original query |
| refresh() fails | Object not attached to session | Call add() and flush() before refresh() |

Hands-On Practice

tests/test_crud.py
import pytest
from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker, AsyncSession
from app.db.base import Base
from app.db.models.user import User
from app.crud.base import CRUDBase

# In-memory SQLite database accessed through the aiosqlite async driver.
TEST_URL = "sqlite+aiosqlite:///:memory:"
# Repository under test, bound to the User model.
user_crud = CRUDBase(User)

# NOTE: pytest-asyncio only drives async fixtures declared with plain
# @pytest.fixture in auto mode (asyncio_mode = auto). In strict mode, declare
# this fixture with @pytest_asyncio.fixture instead.
@pytest.fixture
async def db():
    """Yield an AsyncSession on a freshly created in-memory test database.

    All tables in ``Base.metadata`` are created before the session is handed
    to the test; the engine is disposed once the test finishes.
    """
    engine = create_async_engine(TEST_URL)
    try:
        async with engine.begin() as conn:
            await conn.run_sync(Base.metadata.create_all)
        factory = async_sessionmaker(engine, expire_on_commit=False)
        async with factory() as session:
            yield session
    finally:
        # Release pooled connections so each test leaves nothing open behind it.
        await engine.dispose()

@pytest.mark.asyncio
async def test_create_and_get_user(db: AsyncSession):
    """A created user gets an id and can be fetched back by that id."""
    payload = {
        "username": "alice",
        "email": "alice@example.com",
        "hashed_password": "hashed",
        "is_active": True,
    }
    created = await user_crud.create(db, **payload)
    assert created.id is not None

    fetched = await user_crud.get(db, created.id)
    assert fetched is not None
    assert fetched.username == "alice"
run-tests.sh
pip install pytest pytest-asyncio aiosqlite
# pytest-asyncio defaults to strict mode, which does not run async fixtures
# declared with plain @pytest.fixture; auto mode does.
pytest tests/test_crud.py -v --asyncio-mode=auto

What's Next