OAuth2 and JWT
JWT (JSON Web Token) authentication is the most common stateless authentication pattern for REST APIs. FastAPI provides OAuth2PasswordBearer to parse the Authorization: Bearer <token> header and a clear pattern for issuing and validating JWTs.
Learning Focus
By the end of this lesson you can: implement the OAuth2 password grant flow, issue signed JWTs on login, validate them on protected routes, and understand what claims to include and what to avoid.
Installation
install-auth-deps.sh
pip install "python-jose[cryptography]" passlib[bcrypt]
# python-jose: JWT encoding/decoding
# passlib: password hashing (bcrypt)
JWT Flow Overview
Security Configuration
app/core/security.py
from datetime import datetime, timedelta, timezone
from typing import Any
from jose import jwt, JWTError
from passlib.context import CryptContext
SECRET_KEY = "dev-only-change-in-production-use-32-random-bytes"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
def verify_password(plain: str, hashed: str) -> bool:
return pwd_context.verify(plain, hashed)
def hash_password(plain: str) -> str:
return pwd_context.hash(plain)
def create_access_token(data: dict, expires_delta: timedelta | None = None) -> str:
to_encode = data.copy()
expire = datetime.now(timezone.utc) + (
expires_delta or timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
)
to_encode["exp"] = expire
return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
def decode_access_token(token: str) -> dict[str, Any]:
return jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
danger
Never commit a real SECRET_KEY. Use python -c "import secrets; print(secrets.token_hex(32))" to generate one and store it in environment variables only.
Token Endpoint
app/routers/auth.py
from fastapi import APIRouter, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordRequestForm
from pydantic import BaseModel
from typing import Annotated
from app.core.security import verify_password, create_access_token
from app.crud.users import get_user_by_username
router = APIRouter(prefix="/auth", tags=["auth"])
class Token(BaseModel):
access_token: str
token_type: str
@router.post("/token", response_model=Token)
async def login(
form_data: Annotated[OAuth2PasswordRequestForm, Depends()],
db: DBSession,
) -> Token:
user = await get_user_by_username(db, form_data.username)
if not user or not verify_password(form_data.password, user.hashed_password):
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect username or password",
headers={"WWW-Authenticate": "Bearer"},
)
if not user.is_active:
raise HTTPException(status_code=400, detail="Inactive user")
token = create_access_token({"sub": str(user.id), "role": user.role})
return Token(access_token=token, token_type="bearer")
Auth Dependency
app/dependencies/auth.py
from typing import Annotated
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from jose import JWTError
from app.core.security import decode_access_token
from app.db.session import get_db
from app.crud.users import get_user_by_id
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/auth/token")
async def get_current_user(
token: Annotated[str, Depends(oauth2_scheme)],
db: Annotated[AsyncSession, Depends(get_db)],
):
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
try:
payload = decode_access_token(token)
user_id = int(payload.get("sub"))
except (JWTError, ValueError, TypeError):
raise credentials_exception
user = await get_user_by_id(db, user_id)
if user is None or not user.is_active:
raise credentials_exception
return user
CurrentUser = Annotated[UserORM, Depends(get_current_user)]
Protected Route
app/routers/users.py
from app.dependencies.auth import CurrentUser
@router.get("/me", response_model=UserResponse)
async def get_me(user: CurrentUser) -> UserResponse:
return UserResponse.model_validate(user)
Token Claims Best Practices
| Claim | Include? | Why |
|---|---|---|
sub (subject — user ID) | ✅ Always | Identify the user |
exp (expiry) | ✅ Always | Limit token lifetime |
role | ✅ Useful | Avoid DB lookup for simple RBAC |
iat (issued at) | ✅ Optional | Audit trail |
email | ⚠️ Caution | Can become stale if user changes email |
| Hashed password | ❌ Never | Catastrophic if token is leaked |
| Sensitive PII | ❌ Never | Tokens are base64-decodable by anyone |
Common Pitfalls
| Pitfall | Cause / Symptom | Fix |
|---|---|---|
SECRET_KEY in source code | Accidental commit to public repo | Use .env and pydantic-settings |
| Token never expires | Missing exp claim | Always set ACCESS_TOKEN_EXPIRE_MINUTES |
| 401 on valid token | Token issued with different key | Rotate keys carefully; invalidate old tokens |
| bcrypt too slow | Too many rounds | Use CryptContext(schemes=["bcrypt"]) default (12 rounds) |
| No refresh token | Short-lived tokens force re-login | Implement refresh token endpoint with longer-lived tokens |
Hands-On Practice
test-jwt-auth.sh
# 1) Create a user (if you have a register endpoint)
curl -X POST http://localhost:8000/auth/register \
-H "Content-Type: application/json" \
-d '{"username": "alice", "password": "Secur3Pass!", "email": "alice@example.com"}'
# 2) Get a token
TOKEN=$(curl -s -X POST http://localhost:8000/auth/token \
-d 'username=alice&password=Secur3Pass!' \
-H "Content-Type: application/x-www-form-urlencoded" | jq -r '.access_token')
echo "Token: $TOKEN"
# 3) Access protected endpoint
curl http://localhost:8000/users/me \
-H "Authorization: Bearer $TOKEN"
# 4) Try with expired/invalid token
curl http://localhost:8000/users/me \
-H "Authorization: Bearer invalid.token.here" \
-w "\nHTTP %{http_code}"
# → 401