Skip to main content

OAuth2 and JWT

JWT (JSON Web Token) authentication is the most common stateless authentication pattern for REST APIs. FastAPI provides OAuth2PasswordBearer to parse the Authorization: Bearer <token> header and a clear pattern for issuing and validating JWTs.

Learning Focus

By the end of this lesson you can: implement the OAuth2 password grant flow, issue signed JWTs on login, validate them on protected routes, and understand what claims to include and what to avoid.

Installation

install-auth-deps.sh
pip install "python-jose[cryptography]" passlib[bcrypt]
# python-jose: JWT encoding/decoding
# passlib: password hashing (bcrypt)

JWT Flow Overview

Security Configuration

app/core/security.py
from datetime import datetime, timedelta, timezone
from typing import Any
from jose import jwt, JWTError
from passlib.context import CryptContext

SECRET_KEY = "dev-only-change-in-production-use-32-random-bytes"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30

pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

def verify_password(plain: str, hashed: str) -> bool:
return pwd_context.verify(plain, hashed)

def hash_password(plain: str) -> str:
return pwd_context.hash(plain)

def create_access_token(data: dict, expires_delta: timedelta | None = None) -> str:
to_encode = data.copy()
expire = datetime.now(timezone.utc) + (
expires_delta or timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
)
to_encode["exp"] = expire
return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)

def decode_access_token(token: str) -> dict[str, Any]:
return jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
danger

Never commit a real SECRET_KEY. Use python -c "import secrets; print(secrets.token_hex(32))" to generate one and store it in environment variables only.

Token Endpoint

app/routers/auth.py
from fastapi import APIRouter, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordRequestForm
from pydantic import BaseModel
from typing import Annotated
from app.core.security import verify_password, create_access_token
from app.crud.users import get_user_by_username

router = APIRouter(prefix="/auth", tags=["auth"])

class Token(BaseModel):
access_token: str
token_type: str

@router.post("/token", response_model=Token)
async def login(
form_data: Annotated[OAuth2PasswordRequestForm, Depends()],
db: DBSession,
) -> Token:
user = await get_user_by_username(db, form_data.username)
if not user or not verify_password(form_data.password, user.hashed_password):
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect username or password",
headers={"WWW-Authenticate": "Bearer"},
)
if not user.is_active:
raise HTTPException(status_code=400, detail="Inactive user")

token = create_access_token({"sub": str(user.id), "role": user.role})
return Token(access_token=token, token_type="bearer")

Auth Dependency

app/dependencies/auth.py
from typing import Annotated
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from jose import JWTError
from app.core.security import decode_access_token
from app.db.session import get_db
from app.crud.users import get_user_by_id

oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/auth/token")

async def get_current_user(
token: Annotated[str, Depends(oauth2_scheme)],
db: Annotated[AsyncSession, Depends(get_db)],
):
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
try:
payload = decode_access_token(token)
user_id = int(payload.get("sub"))
except (JWTError, ValueError, TypeError):
raise credentials_exception

user = await get_user_by_id(db, user_id)
if user is None or not user.is_active:
raise credentials_exception
return user

CurrentUser = Annotated[UserORM, Depends(get_current_user)]

Protected Route

app/routers/users.py
from app.dependencies.auth import CurrentUser

@router.get("/me", response_model=UserResponse)
async def get_me(user: CurrentUser) -> UserResponse:
return UserResponse.model_validate(user)

Token Claims Best Practices

ClaimInclude?Why
sub (subject — user ID)✅ AlwaysIdentify the user
exp (expiry)✅ AlwaysLimit token lifetime
role✅ UsefulAvoid DB lookup for simple RBAC
iat (issued at)✅ OptionalAudit trail
email⚠️ CautionCan become stale if user changes email
Hashed password❌ NeverCatastrophic if token is leaked
Sensitive PII❌ NeverTokens are base64-decodable by anyone

Common Pitfalls

PitfallCause / SymptomFix
SECRET_KEY in source codeAccidental commit to public repoUse .env and pydantic-settings
Token never expiresMissing exp claimAlways set ACCESS_TOKEN_EXPIRE_MINUTES
401 on valid tokenToken issued with different keyRotate keys carefully; invalidate old tokens
bcrypt too slowToo many roundsUse CryptContext(schemes=["bcrypt"]) default (12 rounds)
No refresh tokenShort-lived tokens force re-loginImplement refresh token endpoint with longer-lived tokens

Hands-On Practice

test-jwt-auth.sh
# 1) Create a user (if you have a register endpoint)
curl -X POST http://localhost:8000/auth/register \
-H "Content-Type: application/json" \
-d '{"username": "alice", "password": "Secur3Pass!", "email": "alice@example.com"}'

# 2) Get a token
TOKEN=$(curl -s -X POST http://localhost:8000/auth/token \
-d 'username=alice&password=Secur3Pass!' \
-H "Content-Type: application/x-www-form-urlencoded" | jq -r '.access_token')

echo "Token: $TOKEN"

# 3) Access protected endpoint
curl http://localhost:8000/users/me \
-H "Authorization: Bearer $TOKEN"

# 4) Try with expired/invalid token
curl http://localhost:8000/users/me \
-H "Authorization: Bearer invalid.token.here" \
-w "\nHTTP %{http_code}"
# → 401

What's Next