
Rate Limiting and Caching

Rate limiting protects your API from abuse and overload. Caching reduces repeated computation. Both are essential for production APIs under real-world load.

Learning Focus

By the end of this lesson you can: implement Redis-backed rate limiting as a dependency, cache expensive query results with a TTL, and invalidate cache on updates.

Redis Rate Limiting Dependency

install-redis-deps.sh
pip install "redis[hiredis]"

app/dependencies/rate_limit.py
import time
import redis.asyncio as aioredis
from fastapi import Request, HTTPException, Depends
from app.core.config import settings

# Created once at import time so every request shares one connection pool.
redis_client = aioredis.from_url(settings.REDIS_URL, decode_responses=True)

async def rate_limit(
    request: Request,
    limit: int = 100,
    window: int = 60,
) -> None:
    """
    Fixed-window rate limiter: `limit` requests per `window` seconds per IP.
    """
    # request.client can be None (for example, under some test clients).
    client_ip = request.client.host if request.client else "unknown"
    # All requests in the same `window`-second bucket share one counter key.
    key = f"rate:{client_ip}:{int(time.time()) // window}"
    count = await redis_client.incr(key)
    if count == 1:
        # First hit in this bucket: let the key expire along with the window.
        await redis_client.expire(key, window)
    if count > limit:
        raise HTTPException(
            status_code=429,
            detail=f"Rate limit exceeded: {limit} requests per {window}s",
            headers={"Retry-After": str(window)},
        )
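Because the key embeds `int(time.time()) // window`, every request that falls in the same window-sized bucket increments one counter, and a fresh counter starts when the bucket number changes. The sketch below isolates that arithmetic in two pure functions so it can be checked without Redis. `bucket_key` and `seconds_until_reset` are hypothetical helper names, not part of the lesson's code; the second one shows one way a tighter `Retry-After` value could be derived.

```python
def bucket_key(client_ip: str, now: float, window: int = 60) -> str:
    """Build the counter key for the fixed window that contains `now`."""
    return f"rate:{client_ip}:{int(now) // window}"


def seconds_until_reset(now: float, window: int = 60) -> int:
    """Seconds until the current window ends and a fresh counter starts."""
    return window - (int(now) % window)


# Timestamps 120 and 179 fall in the same 60-second bucket (number 2),
# while 180 starts bucket 3 with a fresh counter.
print(bucket_key("203.0.113.7", 120))  # rate:203.0.113.7:2
print(bucket_key("203.0.113.7", 179))  # rate:203.0.113.7:2
print(bucket_key("203.0.113.7", 180))  # rate:203.0.113.7:3
print(seconds_until_reset(179))        # 1
```

One consequence of fixed windows: a client can use its full quota at the end of one bucket and again at the start of the next, so short bursts of up to twice `limit` are possible across a boundary.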

Tiered Rate Limits

app/dependencies/rate_limit.py
def make_rate_limiter(limit: int, window: int = 60):
    async def _limiter(request: Request) -> None:
        await rate_limit(request, limit=limit, window=window)
    return Depends(_limiter)

# Usage:
PublicLimit = make_rate_limiter(20)    # 20 req/min for public
AuthLimit = make_rate_limiter(200)     # 200 req/min for authenticated
AdminLimit = make_rate_limiter(1000)   # 1000 req/min for admin

app/routers/public.py
@router.get("/search", dependencies=[PublicLimit])
async def public_search(q: str) -> dict:
    ...

Response Caching with Redis

app/cache/redis_cache.py
import json
from typing import Any
import redis.asyncio as aioredis
from app.core.config import settings

redis_client = aioredis.from_url(settings.REDIS_URL, decode_responses=True)

async def get_cached(key: str) -> Any | None:
    value = await redis_client.get(key)
    # Compare against None so a cached empty value still counts as a hit.
    return json.loads(value) if value is not None else None

async def set_cached(key: str, value: Any, ttl: int = 300) -> None:
    await redis_client.setex(key, ttl, json.dumps(value))

async def invalidate(key: str) -> None:
    await redis_client.delete(key)

async def invalidate_pattern(pattern: str) -> None:
    # SCAN iterates incrementally; KEYS can block Redis on a large keyspace.
    keys = [key async for key in redis_client.scan_iter(match=pattern)]
    if keys:
        await redis_client.delete(*keys)

app/routers/products.py
from app.cache.redis_cache import get_cached, set_cached, invalidate_pattern

@router.get("/")
async def list_products(db: DBSession) -> list[dict]:
    cache_key = "products:all"
    cached = await get_cached(cache_key)
    # `is not None` keeps an empty product list from being treated as a miss.
    if cached is not None:
        return cached

    products = await product_crud.list(db)
    result = [ProductResponse.model_validate(p).model_dump() for p in products]
    await set_cached(cache_key, result, ttl=300)
    return result

@router.post("/", status_code=201)
async def create_product(body: ProductCreate, db: DBSession) -> dict:
    product = await product_crud.create(db, **body.model_dump())
    await invalidate_pattern("products:*")  # Invalidate list cache
    return ProductResponse.model_validate(product).model_dump()
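The read path in `list_products` (check the cache, fall back to the database, store the result) is the cache-aside pattern, and it can be factored into a decorator. The following is a minimal sketch, assuming a backend object that exposes async `get` and `setex` methods like the Redis client above. `cached`, `InMemoryBackend`, `load_products`, and `call_log` are hypothetical names introduced for illustration, and the in-memory stand-in ignores the TTL so the example runs without a Redis server.

```python
import asyncio
import json
from functools import wraps
from typing import Any, Awaitable, Callable


class InMemoryBackend:
    """Hypothetical stand-in for a Redis client; it ignores the TTL."""

    def __init__(self) -> None:
        self.store = {}

    async def get(self, key: str):
        return self.store.get(key)

    async def setex(self, key: str, ttl: int, value: str) -> None:
        self.store[key] = value


def cached(backend: Any, key: str, ttl: int = 300):
    """Cache-aside decorator for async functions returning JSON-serializable data."""

    def decorator(func: Callable[..., Awaitable[Any]]):
        @wraps(func)
        async def wrapper(*args: Any, **kwargs: Any) -> Any:
            raw = await backend.get(key)
            if raw is not None:  # an empty list is still a valid cache hit
                return json.loads(raw)
            result = await func(*args, **kwargs)
            await backend.setex(key, ttl, json.dumps(result))
            return result

        return wrapper

    return decorator


backend = InMemoryBackend()
call_log = []  # records each time the "expensive" query really runs


@cached(backend, key="products:all", ttl=300)
async def load_products() -> list:
    call_log.append("query")
    return [{"id": 1, "name": "widget"}]


async def main() -> None:
    first = await load_products()
    second = await load_products()  # served from the cache
    print(first == second, len(call_log))  # True 1


asyncio.run(main())
```

This sketch uses one fixed key per decorated function. A function whose result depends on its arguments would need those arguments encoded in the key.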

In-Memory Cache for Small Datasets

For single-process deployments or tiny datasets that change infrequently:

app/cache/memory.py
import time
from typing import Any

class TTLCache:
    def __init__(self):
        # Maps key -> (value, absolute expiry timestamp).
        self._store: dict[str, tuple[Any, float]] = {}

    def get(self, key: str) -> Any | None:
        if key in self._store:
            value, expires_at = self._store[key]
            if time.time() < expires_at:
                return value
            # Entry has expired: evict it lazily on read.
            del self._store[key]
        return None

    def set(self, key: str, value: Any, ttl: int = 60) -> None:
        self._store[key] = (value, time.time() + ttl)

    def delete(self, key: str) -> None:
        self._store.pop(key, None)

cache = TTLCache()

Warning

In-memory cache is per-process. With multiple Gunicorn workers, each worker has its own cache — they don't share data. Use Redis for shared state.

Common Pitfalls

| Pitfall | Cause / Symptom | Fix |
|---|---|---|
| Rate limit bypassed | Redis down, rate limit fails open | Add try/except in the rate limiter and decide whether to fail open or closed |
| Cache stale after update | Not invalidating on write | Always invalidate related cache keys on create/update/delete |
| Redis connection not pooled | New connection per request | Use redis.asyncio.from_url() once at startup |
| Race condition in cache update | Two requests cache simultaneously | Use Redis SET key value EX ttl NX for atomic set-if-not-exists |
| Sensitive data cached | User-specific data cached globally | Always include user ID in cache key for personalized data |
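
The try/except fix for the "Rate limit bypassed" pitfall comes down to one decision: what to return when the counter store cannot be reached. The sketch below makes that decision an explicit parameter. `check_limit`, `fail_open`, `backend_errors`, and the two fake counters are hypothetical names; with redis-py the caught type would typically be `redis.exceptions.RedisError`, but the sketch takes the exception types as an argument so that it runs without Redis installed.

```python
import asyncio
from typing import Awaitable, Callable


async def check_limit(
    incr: Callable[[str], Awaitable[int]],
    key: str,
    limit: int,
    fail_open: bool = True,
    backend_errors: tuple = (Exception,),
) -> bool:
    """Return True if the request may proceed, False if it should get a 429."""
    try:
        count = await incr(key)
    except backend_errors:
        # The counter store is unreachable. fail_open=True lets traffic through
        # unmetered; fail_open=False rejects it until the store recovers.
        return fail_open
    return count <= limit


async def broken_incr(key: str) -> int:
    # Simulates Redis being down.
    raise ConnectionError("counter store unavailable")


async def working_incr(key: str) -> int:
    # Simulates a counter that has already reached 21 hits.
    return 21


async def main() -> None:
    print(await check_limit(broken_incr, "rate:ip:1", limit=20, fail_open=True))   # True
    print(await check_limit(broken_incr, "rate:ip:1", limit=20, fail_open=False))  # False
    print(await check_limit(working_incr, "rate:ip:1", limit=20))                  # False


asyncio.run(main())
```

Failing open favors availability during a Redis outage at the cost of unmetered traffic; failing closed favors protection at the cost of rejecting legitimate requests.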

Hands-On Practice

test-rate-limit.sh
# Terminal 1: start the server (it runs in the foreground)
uvicorn app.main:app --reload

# Terminal 2: make 25 rapid requests (limit is 20/min)
for i in $(seq 1 25); do
  STATUS=$(curl -s -o /dev/null -w "%{http_code}" "http://localhost:8000/search?q=test")
  echo "Request $i: HTTP $STATUS"
done
# First 20 → 200, remaining → 429 (assuming all 25 land in the same 60-second window)

What's Next