Usage Patterns

The core ways to read and write cache entries. For invalidation strategies, see Invalidation. For resilience patterns (circuit breaker, SWR, stale-if-error), see Resilience.


Decorator caching (@cached)

The simplest way to add caching. Works on both async def and plain def:

from yokedcache import cached, YokedCache
from yokedcache.config import CacheConfig

cache = YokedCache(CacheConfig())

@cached(cache=cache, ttl=300, tags=["users"])
async def get_user(user_id: int):
    return await db.fetch_user(user_id)

# First call hits the database
user = await get_user(42)

# Same arguments → cache hit, no DB call
user = await get_user(42)

# Different arguments → separate cache entry, DB hit
other = await get_user(99)

All decorator options

@cached(
    cache=cache,                              # YokedCache instance (required)
    ttl=300,                                  # seconds until expiry
    tags=["users", "api_v2"],                # tags for group invalidation
    cache_key_prefix="api",                  # override the key namespace
    serialization=SerializationMethod.JSON,  # JSON (default), PICKLE, MSGPACK
    single_flight=True,                      # coalesce concurrent misses
    serve_stale_on_error=True,               # return stale value if backend fails
    stale_ttl=60,                            # extra seconds to keep stale value
)
async def get_user(user_id: int, include_prefs: bool = False):
    ...

Sync functions

@cached(cache=cache, ttl=600)
def load_config() -> dict:
    with open("config.json") as f:
        return json.load(f)

config = load_config()   # first call: reads file
config = load_config()   # second call: from cache

Bypassing the cache

Call the underlying function directly when you need fresh data:

# These skip the cache:
result = await get_user.__wrapped__(42)
result = get_user.__wrapped__(42)  # sync version

# Or pass a flag through:
@cached(cache=cache, ttl=300)
async def get_user(user_id: int, bypass: bool = False):
    if bypass:
        return await get_user.__wrapped__(user_id)
    ...

Cache key inspection

# See what key would be generated for given args
key = get_user.cache_key(42, include_prefs=False)
print(key)  # "yokedcache:get_user:a3f8c2d1..."

Manual operations

For cases where you need direct control over keys, values, or options.

get / set / delete

# Set a value
await cache.set("user:42", {"name": "Alice", "email": "alice@example.com"}, ttl=300)

# Get a value (returns None if missing or expired)
user = await cache.get("user:42")

# Get with a default — note that `or` also replaces falsy cached values
# (an empty dict, 0, ""), not just None
user = await cache.get("user:42") or {"name": "Guest"}

# Delete
await cache.delete("user:42")

# Check existence (does not reset TTL)
exists = await cache.exists("user:42")  # True or False

Batch operations

Batch operations use pipelining internally for efficiency:

# Set many at once
await cache.set_many(
    {
        "user:1": {"name": "Alice"},
        "user:2": {"name": "Bob"},
        "user:3": {"name": "Charlie"},
    },
    ttl=300,
    tags=["users"],
)

# Get many at once — returns a dict keyed by the input keys
results = await cache.get_many(["user:1", "user:2", "user:3"])
# {"user:1": {...}, "user:2": {...}, "user:3": None}  ← None if missing

# Delete many at once
await cache.delete_many(["user:1", "user:2"])

Get-or-set pattern

A common pattern: check the cache, fall back to the source, store the result:

async def get_user_cached(user_id: int):
    key = f"user:{user_id}"
    user = await cache.get(key)
    if user is None:
        user = await db.fetch_user(user_id)
        await cache.set(key, user, ttl=300)
    return user

Or use get_or_set, which combines the lookup, fallback, and store in one call:

user = await cache.get_or_set(
    key=f"user:{user_id}",
    factory=lambda: db.fetch_user(user_id),
    ttl=300,
)

TTL inspection

# Remaining TTL in seconds (None if key doesn't exist)
ttl = await cache.ttl("user:42")
print(f"Expires in {ttl}s")

# Refresh TTL without changing the value
await cache.expire("user:42", ttl=600)

FastAPI integration

Dependency caching

cached_dependency wraps a FastAPI dependency. Reads are cached; writes auto-invalidate:

from fastapi import FastAPI, Depends
from yokedcache import YokedCache, cached_dependency
from yokedcache.config import CacheConfig
from contextlib import asynccontextmanager

cache = YokedCache(CacheConfig(redis_url="redis://localhost:6379/0"))

@asynccontextmanager
async def lifespan(app: FastAPI):
    await cache.connect()
    yield
    await cache.disconnect()

app = FastAPI(lifespan=lifespan)

# Wrap the dependency—table_name controls which tag to invalidate on commit
cached_get_db = cached_dependency(get_db, cache=cache, ttl=300, table_name="users")

@app.get("/users/{user_id}")
async def get_user(user_id: int, db=Depends(cached_get_db)):
    return db.query(User).filter(User.id == user_id).first()

@app.post("/users")
async def create_user(data: UserCreate, db=Depends(cached_get_db)):
    user = User(**data.dict())
    db.add(user)
    await db.commit()  # ← this invalidates "table:users" automatically
    return user

Multiple table dependencies

# Different dependencies for different tables
cached_users_db = cached_dependency(get_db, cache=cache, ttl=300, table_name="users")
cached_orders_db = cached_dependency(get_db, cache=cache, ttl=60, table_name="orders")

@app.get("/users/{user_id}/orders")
async def get_user_orders(
    user_id: int,
    users_db=Depends(cached_users_db),
    orders_db=Depends(cached_orders_db),
):
    user = users_db.query(User).filter(User.id == user_id).first()
    orders = orders_db.query(Order).filter(Order.user_id == user_id).all()
    return {"user": user, "orders": orders}

Caching non-database dependencies

cached_dependency works on any FastAPI dependency, not just databases:

def get_feature_flags():
    return load_flags_from_remote()

cached_flags = cached_dependency(get_feature_flags, cache=cache, ttl=60)

@app.get("/features")
async def features(flags=Depends(cached_flags)):
    return flags

Route-level caching with @cached

@app.get("/products/top")
@cached(cache=cache, ttl=300, tags=["products"])
async def top_products():
    return await db.fetch_top_products(limit=20)

Listing and inspecting keys

# All keys matching a pattern
keys = await cache.get_keys_by_pattern("user:*")

# All keys (use with care on large caches)
all_keys = await cache.get_all_keys()

# Key metadata
meta = await cache.get_meta("user:42")
# {"key": "user:42", "ttl": 247, "tags": ["users"], "size_bytes": 128}

From the CLI:

yokedcache list --pattern "user:*"
yokedcache list --tags users
yokedcache list --include-values --format json

Stats and health

# In-process stats
stats = await cache.get_stats()
print(f"Hit rate:   {stats.hit_rate:.1%}")
print(f"Keys:       {stats.key_count}")
print(f"Memory:     {stats.memory_usage_mb:.1f} MB")
print(f"Total ops:  {stats.total_operations}")

# Health check
is_healthy = await cache.health()              # bool
details = await cache.detailed_health_check()  # dict with connection, pool, etc.

From the CLI:

yokedcache stats
yokedcache stats --watch          # live refresh every 2s
yokedcache stats --format json    # machine-readable

Fuzzy search

Find keys by approximate match (requires yokedcache[fuzzy]):

results = await cache.fuzzy_search(
    query="alice",
    threshold=80,        # similarity score 0–100 (default: 80)
    max_results=10,
    tags={"users"},      # restrict search to this tag
)
for r in results:
    print(r.key, r.score, r.value)

CLI:

yokedcache search "alice" --threshold 80 --tags users

See Vector Search for semantic similarity search.


Cache warming

Pre-populate the cache before traffic arrives. Avoids cold-start misses after a deploy.

from yokedcache import warm_cache

await warm_cache(cache, [
    {"key": "config:global", "value": await fetch_config(), "ttl": 3600},
    {"key": "categories", "value": await fetch_categories(), "ttl": 7200},
])

Or run concurrently:

import asyncio

async def warm():
    top_ids = await db.get_top_user_ids(limit=100)
    await asyncio.gather(*[get_user(uid) for uid in top_ids])

asyncio.run(warm())

CLI:

yokedcache warm --config-file warming.yaml --verbose

HTTP cache middleware

Add ETag / Cache-Control headers at the HTTP layer (requires yokedcache[web]):

# Install the extra first: pip install "yokedcache[web]"
from yokedcache.middleware import HTTPCacheMiddleware

app.add_middleware(
    HTTPCacheMiddleware,
    cache=cache,
    ttl=60,
    # key_builder is required for authenticated routes to avoid leaking responses
    key_builder=lambda req: f"{req.url}:{req.headers.get('x-user-id', 'anon')}",
)

See Middleware for the full reference.


Sync equivalents

Every async method has a *_sync counterpart for use in blocking contexts:

Async                                  Sync
await cache.get(key)                   cache.get_sync(key)
await cache.set(key, val, ttl)         cache.set_sync(key, val, ttl)
await cache.delete(key)                cache.delete_sync(key)
await cache.exists(key)                cache.exists_sync(key)
await cache.invalidate_tags([...])     cache.invalidate_tags_sync([...])
await cache.invalidate_pattern(p)      cache.invalidate_pattern_sync(p)
await cache.get_many(keys)             cache.get_many_sync(keys)
await cache.set_many({...})            cache.set_many_sync({...})

Don't call *_sync from inside a running event loop. Use await there instead.

Search documentation

Type to search. Fuzzy matching handles typos.