Core Concepts
Understanding how YokedCache works will help you make good decisions about keys, TTLs, tags, and serialization—and debug things when they don't behave as expected.
Architecture
YokedCache is a thin wrapper that sits in front of a pluggable backend. Every read goes through the wrapper first; on a miss, it calls your function or query, stores the result, and returns it. On a hit, it skips the underlying call entirely.
Your app
│
▼
YokedCache wrapper
│ ├── hit ───▶ return stored value
│ └── miss ──▶ call function/query ──▶ store result ──▶ return
│
▼
Backend (Memory / Redis / Memcached / Disk / SQLite)
The same wrapper API works across all backends. Switching from memory to Redis doesn't change your application code.
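The read-through flow above can be sketched in a few lines. This is an illustrative model only, not YokedCache's actual implementation; the dict-backed store stands in for whichever backend you configure.

```python
import time

class ReadThroughCache:
    """Minimal model of the wrapper: check the backend first, fall back to the loader."""

    def __init__(self):
        self._store = {}  # key -> (value, expires_at); a stand-in for any backend

    def get_or_load(self, key, loader, ttl=300):
        entry = self._store.get(key)
        if entry is not None and entry[1] > time.monotonic():
            return entry[0]                      # hit: skip the underlying call
        value = loader()                         # miss: call the function/query
        self._store[key] = (value, time.monotonic() + ttl)
        return value

calls = []
def load_user():
    calls.append(1)
    return {"name": "Alice"}

cache = ReadThroughCache()
cache.get_or_load("user:42", load_user)  # miss -> loader runs
cache.get_or_load("user:42", load_user)  # hit -> loader skipped
print(len(calls))  # 1
```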
Cache keys
Automatic key generation
When you use @cached or cached_dependency, keys are generated automatically from:
- The configured `key_prefix` (default: `"yokedcache"`)
- The function name or table name
- A stable hash of the arguments
yokedcache:get_user:a3f8c2d1 ← prefix:function:hash_of_args
yokedcache:table:users:b9e4f721
Different arguments produce different hashes, so get_user(1) and get_user(2) never collide.
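The key shape can be approximated like this. The exact hashing scheme is an assumption, and `make_key` is a hypothetical helper, not part of YokedCache's API:

```python
import hashlib

def make_key(prefix, func_name, *args, **kwargs):
    # Hash a stable representation of the arguments so equal args -> equal key
    raw = repr((args, sorted(kwargs.items()))).encode()
    digest = hashlib.sha256(raw).hexdigest()[:8]
    return f"{prefix}:{func_name}:{digest}"

k1 = make_key("yokedcache", "get_user", 1)
k2 = make_key("yokedcache", "get_user", 2)
print(k1.startswith("yokedcache:get_user:"))  # True
print(k1 != k2)                               # True: different args, different hash
```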
Manual keys
For cache.set() / cache.get(), you pass the key directly:
await cache.set("user:42", {"name": "Alice"}, ttl=300)
user = await cache.get("user:42") # returns the dict or None
Key prefix
The prefix namespaces all keys. This prevents collisions between different apps or environments sharing the same Redis database:
config = CacheConfig(key_prefix="prod_myapp") # all keys: "prod_myapp:..."
config = CacheConfig(key_prefix="staging_myapp")
Set it per environment via YOKEDCACHE_KEY_PREFIX.
Key sanitization
Keys are automatically sanitized before storage—non-ASCII characters are encoded, length is capped, and dangerous patterns are removed. You generally don't need to think about this, but it means keys with special characters may look slightly different in Redis than what you passed in.
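A sanitizer along these lines would produce the behavior described. This is a sketch of the idea only; YokedCache's actual encoding rules, length cap, and pattern filtering may differ:

```python
def sanitize_key(key, max_len=250):
    # Percent-encode non-ASCII characters and whitespace, then cap the length.
    # max_len=250 is an assumed limit, not the library's documented value.
    safe = "".join(
        c if c.isascii() and not c.isspace() else f"%{ord(c):02X}"
        for c in key
    )
    return safe[:max_len]

print(sanitize_key("user:42"))   # unchanged
print(sanitize_key("usér"))      # non-ASCII byte percent-encoded
```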
TTL and expiration
Every cache entry has a TTL (time-to-live) in seconds. After that time, the entry expires and the next read is a miss.
TTL priority
When multiple TTL sources are configured, the most specific wins:
Explicit TTL on the call (highest priority)
└── Table-specific TTL in CacheConfig
└── Global default_ttl in CacheConfig
└── Backend default (lowest priority)
# Global default
config = CacheConfig(default_ttl=300)
# Per-table override
config = CacheConfig(
default_ttl=300,
tables={"users": TableCacheConfig(ttl=3600)},
)
# Per-call override (highest priority)
await cache.set("key", value, ttl=60)
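The precedence chain above can be modeled as a small resolver. This is illustrative only; the parameter names mirror the config shown, but the function itself is hypothetical:

```python
def resolve_ttl(call_ttl=None, table_ttl=None, default_ttl=None, backend_default=300):
    # First non-None value wins, checked from most to least specific
    for ttl in (call_ttl, table_ttl, default_ttl):
        if ttl is not None:
            return ttl
    return backend_default

print(resolve_ttl(call_ttl=60, table_ttl=3600, default_ttl=300))  # 60
print(resolve_ttl(table_ttl=3600, default_ttl=300))               # 3600
print(resolve_ttl())                                              # 300 (backend default)
```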
Jitter
YokedCache adds random jitter to every TTL (default: ±10%). A 300s TTL becomes something between 270–330s.
This prevents the thundering herd problem: if many cache entries expire at exactly the same time, every entry becomes a miss simultaneously and your database gets flooded with requests at once. Jitter spreads out the expirations.
# Disable jitter if you need exact TTLs (not recommended for high-traffic systems)
config = CacheConfig(ttl_jitter_percent=0)
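Jitter amounts to scaling the TTL by a random factor in `[1 - p, 1 + p]`. A sketch of the idea; YokedCache's exact distribution is an assumption here:

```python
import random

def apply_jitter(ttl, jitter_percent=10):
    # Spread expirations by up to +/- jitter_percent of the base TTL
    spread = ttl * jitter_percent / 100
    return ttl + random.uniform(-spread, spread)

jittered = apply_jitter(300)      # somewhere in 270..330
print(270 <= jittered <= 330)     # True
print(apply_jitter(300, 0))       # 300.0 exactly when jitter is disabled
```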
Choosing TTL values
| Data type | Suggested TTL |
|---|---|
| User sessions | 15–60 minutes |
| User profiles | 5–60 minutes |
| Product catalog | 1–24 hours |
| Config / feature flags | 5–30 minutes |
| Search results | 1–5 minutes |
| Aggregations / analytics | 10–60 minutes |
| Reference data (countries, categories) | 24 hours+ |
Hot data that changes often → short TTL. Stable reference data → long TTL.
Tags
Tags let you group related cache entries and invalidate them together, regardless of their keys.
Setting tags
# Manual set
await cache.set(
"product:1",
product_data,
ttl=600,
tags=["products", "category:electronics", "tenant:acme"],
)
# Via decorator
@cached(cache=cache, ttl=300, tags=["users", "api_v2"])
async def get_user(user_id: int):
...
Invalidating by tag
# All entries tagged "products" are invalidated
await cache.invalidate_tags(["products"])
# Multiple tags—any entry with ANY of these tags is invalidated
await cache.invalidate_tags(["category:electronics", "tenant:acme"])
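Tag invalidation is typically backed by a reverse index from tag to keys. A minimal in-memory model of that idea (not the library's internals):

```python
class TaggedStore:
    def __init__(self):
        self._values = {}
        self._tag_index = {}  # tag -> set of keys carrying that tag

    def set(self, key, value, tags=()):
        self._values[key] = value
        for tag in tags:
            self._tag_index.setdefault(tag, set()).add(key)

    def get(self, key):
        return self._values.get(key)

    def invalidate_tags(self, tags):
        # Any entry carrying ANY of the given tags is dropped
        for tag in tags:
            for key in self._tag_index.pop(tag, set()):
                self._values.pop(key, None)

store = TaggedStore()
store.set("product:1", {"name": "TV"}, tags=["products", "category:electronics"])
store.set("user:1", {"name": "Alice"}, tags=["users"])
store.invalidate_tags(["products"])
print(store.get("product:1"))  # None
print(store.get("user:1"))     # still cached
```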
Automatic tagging (cached_dependency)
When you use cached_dependency(get_db, table_name="users"), YokedCache automatically:
- Tags all reads with `"table:users"`
- Listens for `commit()` calls on the session
- Calls `invalidate_tags(["table:users"])` on commit
You never have to manually track what to invalidate after a write.
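Conceptually, the dependency wraps the session so a successful `commit()` triggers the tag invalidation. A toy version with fake session and cache objects, showing only the shape of the hook (YokedCache's real integration with SQLAlchemy sessions is more involved):

```python
class InvalidatingSession:
    """Wraps a DB session; invalidates the table's tag after each commit."""

    def __init__(self, session, cache, table_name):
        self._session = session
        self._cache = cache
        self._table = table_name

    def commit(self):
        self._session.commit()                                  # real write first
        self._cache.invalidate_tags([f"table:{self._table}"])   # then drop the tag

class FakeSession:
    def __init__(self): self.committed = False
    def commit(self): self.committed = True

class FakeCache:
    def __init__(self): self.invalidated = []
    def invalidate_tags(self, tags): self.invalidated.extend(tags)

db, tag_cache = FakeSession(), FakeCache()
InvalidatingSession(db, tag_cache, "users").commit()
print(tag_cache.invalidated)  # ['table:users']
```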
Tag design patterns
# Per-table tags (automatic with cached_dependency)
"table:users"
"table:products"
# Per-entity tags (for fine-grained invalidation)
f"user:{user_id}"
f"product:{product_id}"
# Feature tags (for grouped invalidation)
"search_results"
"homepage_data"
"analytics"
# Tenant tags (for multi-tenant isolation)
f"tenant:{tenant_id}"
Invalidation patterns
Beyond tags, YokedCache supports two other invalidation strategies:
Pattern-based
Invalidates all keys matching a glob pattern:
await cache.invalidate_pattern("user:*") # all user keys
await cache.invalidate_pattern("session:temp:*") # temporary sessions
await cache.invalidate_pattern("*:stale") # anything marked stale
Note: Pattern invalidation on Redis uses `SCAN` + `DEL`, which can be slow if you have millions of keys. Prefer tag-based invalidation for high-traffic systems.
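The glob semantics behave like `fnmatch` over the key space. A sketch of the matching behavior only, not the Redis `SCAN` implementation:

```python
from fnmatch import fnmatch

def invalidate_pattern(store, pattern):
    # Collect matches first, then delete, so we don't mutate while iterating
    doomed = [key for key in store if fnmatch(key, pattern)]
    for key in doomed:
        del store[key]
    return len(doomed)

store = {"user:1": ..., "user:2": ..., "session:temp:a": ..., "product:1": ...}
print(invalidate_pattern(store, "user:*"))  # 2
print(sorted(store))                        # ['product:1', 'session:temp:a']
```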
Manual delete
await cache.delete("user:42")
await cache.delete_many(["user:42", "user:43", "user:44"])
Serialization
Values are serialized before storage and deserialized on read. Three built-in methods:
JSON (default)
Best for simple data types. Portable across languages and tools. YokedCache's JSON encoder handles common Python types automatically:
| Python type | JSON representation |
|---|---|
| `datetime` | ISO 8601 string |
| `date` | ISO 8601 string |
| `Decimal` | String |
| `UUID` | String |
| `set` | Array |
| `bytes` | Base64 string |
from datetime import datetime
from decimal import Decimal
from uuid import uuid4
data = {
"id": uuid4(),
"price": Decimal("99.99"),
"created_at": datetime.now(),
"tags": {"featured", "sale"},
}
await cache.set("item", data) # serializes transparently
result = await cache.get("item") # deserializes back
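An encoder with the table's behavior can be built on the standard `json` module. This is a sketch; YokedCache ships its own encoder, and decoding back to rich Python types would need extra type hints:

```python
import base64
import json
from datetime import date, datetime
from decimal import Decimal
from uuid import UUID

class CacheJSONEncoder(json.JSONEncoder):
    def default(self, o):
        if isinstance(o, (datetime, date)):
            return o.isoformat()        # ISO 8601 string
        if isinstance(o, (Decimal, UUID)):
            return str(o)
        if isinstance(o, set):
            return sorted(o)            # array (sorted for determinism)
        if isinstance(o, bytes):
            return base64.b64encode(o).decode()
        return super().default(o)

payload = {"price": Decimal("99.99"), "tags": {"sale", "featured"}, "blob": b"\x00\x01"}
print(json.dumps(payload, cls=CacheJSONEncoder))
```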
Pickle
Supports any Python object. Use when you need to cache complex ORM objects, custom classes, or anything that isn't JSON-serializable.
from yokedcache.models import SerializationMethod
await cache.set("session", session_obj, serialization=SerializationMethod.PICKLE)
Security: Only use pickle with backends you fully control (private Redis, local memory). Anyone who can write to your cache backend can execute arbitrary code via pickle. See Security.
MessagePack
Compact binary format. Faster than JSON for large or deeply nested data, and more space-efficient.
await cache.set("bulk_data", large_dict, serialization=SerializationMethod.MSGPACK)
Requires pip install "yokedcache[backends]" or pip install msgpack.
Setting serialization
# Per call
await cache.set("key", value, serialization=SerializationMethod.PICKLE)
# Per table (via CacheConfig)
config = CacheConfig(
tables={
"sessions": TableCacheConfig(serialization_method=SerializationMethod.PICKLE),
"products": TableCacheConfig(serialization_method=SerializationMethod.MSGPACK),
}
)
# Global default
config = CacheConfig(default_serialization=SerializationMethod.JSON)
Error handling and resilience
Graceful degradation
By default, cache failures don't crash your app. If cache.get() throws, the decorated function still runs and returns a real result. The error is logged but not re-raised.
@cached(cache=cache, ttl=300)
async def get_data():
# If Redis is down, this function still runs normally
return await fetch_from_database()
Set fallback_enabled=False on CacheConfig if you want cache errors to propagate.
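Fallback behavior boils down to a try/except around the cache read. An illustrative standalone function, not the decorator's real code:

```python
import logging

logger = logging.getLogger("cache")

def get_with_fallback(cache_get, loader, key, fallback_enabled=True):
    try:
        value = cache_get(key)
        if value is not None:
            return value
    except Exception:
        if not fallback_enabled:
            raise                      # propagate when fallback is disabled
        logger.exception("cache read failed; falling back to loader")
    return loader()                    # cache down or miss: real call still happens

def broken_get(key):
    raise ConnectionError("redis down")

print(get_with_fallback(broken_get, lambda: {"name": "Alice"}, "user:42"))
```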
Circuit breaker
For sustained failures (Redis is fully down), the circuit breaker prevents your app from repeatedly trying the cache and failing:
config = CacheConfig(
enable_circuit_breaker=True,
circuit_breaker_failure_threshold=5, # open after 5 consecutive failures
circuit_breaker_timeout=60.0, # try again after 60 seconds
)
States:
- Closed — normal operation
- Open — cache bypassed, all requests go to the underlying function
- Half-open — testing if the backend has recovered
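The three states form a small state machine keyed off the configured threshold and timeout. A compact sketch with a hypothetical class, not the library's implementation:

```python
import time

class CircuitBreaker:
    def __init__(self, failure_threshold=5, timeout=60.0):
        self.failure_threshold = failure_threshold
        self.timeout = timeout
        self.failures = 0
        self.opened_at = None       # None means the circuit is closed

    @property
    def state(self):
        if self.opened_at is None:
            return "closed"
        if time.monotonic() - self.opened_at >= self.timeout:
            return "half-open"      # timeout elapsed: probe the backend again
        return "open"

    def record_failure(self):
        self.failures += 1
        if self.failures >= self.failure_threshold:
            self.opened_at = time.monotonic()

    def record_success(self):
        self.failures = 0
        self.opened_at = None       # recovered: back to closed

cb = CircuitBreaker(failure_threshold=2, timeout=60.0)
cb.record_failure()
print(cb.state)        # closed (only 1 failure)
cb.record_failure()
print(cb.state)        # open
cb.record_success()
print(cb.state)        # closed
```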
Retries
config = CacheConfig(
connection_retries=3,
retry_delay=0.1, # seconds (with exponential backoff)
)
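Exponential backoff with those settings looks roughly like this. A sketch of the retry loop only; real code would be async and would typically add jitter to the delays:

```python
import time

def with_retries(op, retries=3, retry_delay=0.1, sleep=time.sleep):
    delay = retry_delay
    for attempt in range(retries + 1):
        try:
            return op()
        except ConnectionError:
            if attempt == retries:
                raise              # out of retries: surface the error
            sleep(delay)
            delay *= 2             # exponential backoff: 0.1, 0.2, 0.4, ...

attempts = []
def flaky():
    attempts.append(1)
    if len(attempts) < 3:
        raise ConnectionError
    return "ok"

print(with_retries(flaky, sleep=lambda d: None))  # "ok" after 2 retries
```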
Connection lifecycle
Always connect before using the cache and disconnect when done:
# Script usage
cache = YokedCache(CacheConfig())
asyncio.run(cache.connect())
# ... use cache ...
asyncio.run(cache.disconnect())
In FastAPI, use the lifespan context manager:
from contextlib import asynccontextmanager
@asynccontextmanager
async def lifespan(app: FastAPI):
await cache.connect()
yield
await cache.disconnect()
app = FastAPI(lifespan=lifespan)
Async vs sync
| Context | Use |
|---|---|
| Inside FastAPI, Starlette, Django async views, asyncio | await cache.get(...), await cache.set(...) |
| Scripts, blocking functions, sync code | cache.get_sync(...), cache.set_sync(...) |
| `def` functions | `@cached` (auto-detects sync) |
The sync helpers internally run asyncio.run(), which creates a new event loop per call. This works fine for occasional use (scripts, startup tasks) but has overhead in tight loops. Prefer await in any context that already runs an event loop.
Available sync methods: get_sync, set_sync, delete_sync, exists_sync, invalidate_tags_sync, invalidate_pattern_sync.
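The illustrative shape of such a sync helper, using a toy cache class (each call spins up and tears down an event loop, which is the overhead mentioned above):

```python
import asyncio

class MiniCache:
    """Toy async cache with asyncio.run-based sync helpers."""

    def __init__(self):
        self._store = {}

    async def get(self, key):
        return self._store.get(key)

    async def set(self, key, value):
        self._store[key] = value

    def get_sync(self, key):
        # New event loop per call: fine for scripts, costly in tight loops
        return asyncio.run(self.get(key))

    def set_sync(self, key, value):
        asyncio.run(self.set(key, value))

cache = MiniCache()
cache.set_sync("user:42", {"name": "Alice"})
print(cache.get_sync("user:42"))  # {'name': 'Alice'}
```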