Tutorial: FastAPI Integration

This tutorial walks through a complete FastAPI app with cached database queries, auto-invalidation on writes, and cache management endpoints.

Setup

pip install "yokedcache[full]" fastapi uvicorn sqlalchemy
docker run -d --name redis -p 6379:6379 redis:7

Step 1: Basic app

Start with a simple app and a database dependency:

# app.py
from fastapi import FastAPI, Depends, HTTPException
from sqlalchemy import create_engine, Column, Integer, String, Boolean
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker, Session
from yokedcache import YokedCache, cached, cached_dependency
from yokedcache.config import CacheConfig

DATABASE_URL = "sqlite:///./app.db"
engine = create_engine(DATABASE_URL, connect_args={"check_same_thread": False})
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
Base = declarative_base()

class User(Base):
    __tablename__ = "users"
    id = Column(Integer, primary_key=True)
    name = Column(String, index=True)
    email = Column(String, unique=True)
    active = Column(Boolean, default=True)

Base.metadata.create_all(bind=engine)

app = FastAPI()

def get_db():
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()

Step 2: Add caching

Replace get_db with a cached version. YokedCache wraps the dependency and automatically invalidates the cache when the session commits a write to the users table:

cache = YokedCache(CacheConfig(redis_url="redis://localhost:6379/0"))

# Cached dependency—reads are cached, writes invalidate the "table:users" tag
cached_get_db = cached_dependency(get_db, cache=cache, ttl=300, table_name="users")

@app.get("/users/{user_id}")
async def get_user(user_id: int, db: Session = Depends(cached_get_db)):
    user = db.query(User).filter(User.id == user_id).first()
    if not user:
        raise HTTPException(status_code=404, detail="User not found")
    return user

@app.get("/users")
async def list_users(active_only: bool = True, db: Session = Depends(cached_get_db)):
    q = db.query(User)
    if active_only:
        q = q.filter(User.active == True)
    return q.all()

Step 3: Write operations

Any commit() through the cached session automatically invalidates the table:users tag, so readers see fresh data on the next request:

from pydantic import BaseModel
from typing import Optional

class UserCreate(BaseModel):
    name: str
    email: str

class UserUpdate(BaseModel):
    name: Optional[str] = None
    email: Optional[str] = None
    active: Optional[bool] = None

@app.post("/users")
async def create_user(user: UserCreate, db: Session = Depends(cached_get_db)):
    db_user = User(**user.dict())
    db.add(db_user)
    await db.commit()  # invalidates "table:users"
    return {"id": db_user.id}

@app.put("/users/{user_id}")
async def update_user(user_id: int, user: UserUpdate, db: Session = Depends(cached_get_db)):
    db_user = db.query(User).filter(User.id == user_id).first()
    if not db_user:
        raise HTTPException(status_code=404, detail="User not found")
    for k, v in user.dict(exclude_unset=True).items():
        setattr(db_user, k, v)
    await db.commit()  # invalidates "table:users"
    return db_user

@app.delete("/users/{user_id}")
async def delete_user(user_id: int, db: Session = Depends(cached_get_db)):
    db_user = db.query(User).filter(User.id == user_id).first()
    if not db_user:
        raise HTTPException(status_code=404, detail="User not found")
    db.delete(db_user)
    await db.commit()  # invalidates "table:users"
    return {"ok": True}

Step 4: Function-level caching

For expensive aggregations that aren't tied to a single table:

from yokedcache import cached

@cached(ttl=600, tags=["analytics"])
async def compute_user_stats(db: Session):
    total = db.query(User).count()
    active = db.query(User).filter(User.active == True).count()
    return {"total": total, "active": active, "inactive": total - active}

@app.get("/analytics/users")
async def user_analytics(db: Session = Depends(get_db)):
    return await compute_user_stats(db)

# Invalidate analytics separately when needed
@app.post("/analytics/invalidate")
async def invalidate_analytics():
    await cache.invalidate_tags(["analytics"])
    return {"ok": True}

Step 5: Cache management endpoints

These are handy for debugging and ops:

@app.get("/cache/stats")
async def cache_stats():
    stats = await cache.get_stats()
    return {"hit_rate": f"{stats.hit_rate:.1%}", "keys": stats.key_count}

@app.get("/cache/health")
async def cache_health():
    return {"healthy": await cache.health()}

@app.post("/cache/flush/users")
async def flush_users():
    await cache.invalidate_tags(["table:users"])
    return {"ok": True}

Step 6: Lifecycle management

Connect/disconnect cleanly using FastAPI's lifespan:

from contextlib import asynccontextmanager

@asynccontextmanager
async def lifespan(app: FastAPI):
    await cache.connect()
    yield
    await cache.disconnect()

app = FastAPI(lifespan=lifespan)

Testing

The memory backend makes tests fast and self-contained—no Redis needed:

import pytest
from fastapi.testclient import TestClient
from yokedcache import YokedCache, CacheConfig

@pytest.fixture
def test_cache():
    return YokedCache(CacheConfig())  # memory backend

@pytest.fixture
def client(test_cache):
    app.dependency_overrides[cache] = lambda: test_cache
    return TestClient(app)

Production config

config = CacheConfig(
    redis_url=os.getenv("REDIS_URL", "redis://localhost:6379/0"),
    default_ttl=300,
    key_prefix=os.getenv("CACHE_PREFIX", "myapp"),
    max_connections=50,
    enable_circuit_breaker=True,
    log_level="WARNING",
)

Use rediss:// and TLS for any Redis not on localhost. Store the URL in an env var or secret manager.

Search documentation

Type to search. Fuzzy matching handles typos.