API Reference
Complete reference for YokedCache, CacheConfig, decorators, and supporting types.
YokedCache
The main class. Create one instance per application and reuse it.
from yokedcache import YokedCache
from yokedcache.config import CacheConfig
cache = YokedCache(config: CacheConfig)
Constructors
# From CacheConfig
cache = YokedCache(CacheConfig(...))
# From environment variables
cache = YokedCache.from_env()
# From YAML file
cache = YokedCache.from_yaml("cache.yaml")
Lifecycle
| Method | Description |
|---|---|
await cache.connect() |
Connect to the backend. Must be called before any cache operations. |
await cache.disconnect() |
Close all connections gracefully. |
Core operations
get
value = await cache.get(key: str) -> Any | None
Returns the cached value or None if missing or expired.
user = await cache.get("user:42")
if user is None:
user = await db.fetch_user(42)
set
await cache.set(
key: str,
value: Any,
ttl: int | None = None,
tags: list[str] | set[str] | None = None,
serialization: SerializationMethod | None = None,
) -> bool
Stores a value. Returns True on success.
await cache.set("user:42", {"name": "Alice"}, ttl=300, tags=["users"])
delete
await cache.delete(key: str) -> bool
Deletes a key. Returns True if the key existed.
exists
exists = await cache.exists(key: str) -> bool
Checks if a key exists without fetching the value.
ttl
remaining = await cache.ttl(key: str) -> int | None
Returns remaining TTL in seconds, or None if the key doesn't exist.
expire
await cache.expire(key: str, ttl: int) -> bool
Updates the TTL of an existing key without changing its value.
get_or_set
value = await cache.get_or_set(
key: str,
factory: Callable[[], Awaitable[Any]],
ttl: int | None = None,
tags: list[str] | None = None,
) -> Any
Returns the cached value if present; otherwise calls factory(), caches the result, and returns it.
user = await cache.get_or_set(
key=f"user:{user_id}",
factory=lambda: db.fetch_user(user_id),
ttl=300,
tags=["users"],
)
Batch operations
get_many
results = await cache.get_many(keys: list[str]) -> dict[str, Any | None]
Returns a dict mapping each key to its value (or None if missing).
results = await cache.get_many(["user:1", "user:2", "user:3"])
# {"user:1": {...}, "user:2": {...}, "user:3": None}
set_many
await cache.set_many(
mapping: dict[str, Any],
ttl: int | None = None,
tags: list[str] | None = None,
) -> bool
await cache.set_many({"user:1": u1, "user:2": u2}, ttl=300, tags=["users"])
delete_many
await cache.delete_many(keys: list[str]) -> int
Deletes multiple keys. Returns count of deleted keys.
Invalidation
invalidate_tags
await cache.invalidate_tags(tags: list[str]) -> int
Invalidates all entries tagged with any of the given tags. Returns the count of invalidated entries.
await cache.invalidate_tags(["users", "table:users"])
invalidate_pattern
await cache.invalidate_pattern(pattern: str) -> int
Invalidates all keys matching a glob pattern. Returns the count of invalidated entries.
await cache.invalidate_pattern("user:*")
await cache.invalidate_pattern("session:temp:*")
flush_all
await cache.flush_all() -> bool
Clears all keys under the configured key_prefix. Does not flush the entire Redis database.
Search
fuzzy_search
results = await cache.fuzzy_search(
query: str,
threshold: int = 80, # similarity score 0–100
max_results: int = 100,
tags: set[str] | None = None,
) -> list[FuzzySearchResult]
Requires yokedcache[fuzzy].
results = await cache.fuzzy_search("alice", threshold=75, tags={"users"})
for r in results:
print(r.key, r.score, r.value)
FuzzySearchResult fields: key: str, score: int, value: Any.
Key inspection
get_keys_by_pattern
keys = await cache.get_keys_by_pattern(
pattern: str,
limit: int = 1000,
) -> list[str]
get_all_keys
keys = await cache.get_all_keys(limit: int = 1000) -> list[str]
Returns all keys under the configured prefix.
get_meta
meta = await cache.get_meta(key: str) -> dict | None
Returns metadata about a key: TTL, tags, size, serialization method.
Monitoring
health
is_healthy = await cache.health() -> bool
detailed_health_check
info = await cache.detailed_health_check() -> dict
Returns:
{
"status": "healthy", # or "unhealthy"
"backend_type": "redis",
"redis_connected": True,
"connection_pool": {
"available": 48,
"in_use": 2,
"max": 50,
},
"circuit_breaker": {
"state": "closed",
"failure_count": 0,
},
"hit_rate": 0.87,
"uptime_seconds": 3600,
}
get_stats
stats = await cache.get_stats() -> CacheStats
CacheStats fields:
| Field | Type | Description |
|---|---|---|
hit_rate |
float |
0.0–1.0 |
miss_rate |
float |
0.0–1.0 |
total_operations |
int |
Total get+set+delete count |
cache_hits |
int |
Cumulative hits |
cache_misses |
int |
Cumulative misses |
key_count |
int |
Current number of keys |
memory_usage_mb |
float |
Approximate memory usage |
uptime_seconds |
int |
Seconds since connect |
Sync equivalents
All async methods have *_sync counterparts:
cache.get_sync(key)
cache.set_sync(key, value, ttl, tags)
cache.delete_sync(key)
cache.exists_sync(key)
cache.get_many_sync(keys)
cache.set_many_sync(mapping, ttl, tags)
cache.delete_many_sync(keys)
cache.invalidate_tags_sync(tags)
cache.invalidate_pattern_sync(pattern)
cache.flush_all_sync()
Don't call these from inside a running event loop.
CacheConfig
All configuration lives here. Pass it to YokedCache(config).
from yokedcache.config import CacheConfig, TableCacheConfig
Core
| Parameter | Type | Default | Env var |
|---|---|---|---|
redis_url |
str \| None |
None (memory) |
YOKEDCACHE_REDIS_URL |
default_ttl |
int |
300 |
YOKEDCACHE_DEFAULT_TTL |
key_prefix |
str |
"yokedcache" |
YOKEDCACHE_KEY_PREFIX |
default_serialization |
SerializationMethod |
JSON |
YOKEDCACHE_DEFAULT_SERIALIZATION |
ttl_jitter_percent |
float |
10.0 |
— |
Connection
| Parameter | Type | Default | Env var |
|---|---|---|---|
max_connections |
int |
50 |
YOKEDCACHE_MAX_CONNECTIONS |
connection_timeout |
int |
30 |
YOKEDCACHE_CONNECTION_TIMEOUT |
connection_pool_kwargs |
dict |
{} |
— |
Resilience
| Parameter | Type | Default | Env var |
|---|---|---|---|
fallback_enabled |
bool |
True |
YOKEDCACHE_FALLBACK_ENABLED |
connection_retries |
int |
3 |
YOKEDCACHE_CONNECTION_RETRIES |
retry_delay |
float |
0.1 |
— |
enable_circuit_breaker |
bool |
False |
YOKEDCACHE_ENABLE_CIRCUIT_BREAKER |
circuit_breaker_failure_threshold |
int |
5 |
— |
circuit_breaker_timeout |
float |
60.0 |
— |
Features
| Parameter | Type | Default | Env var |
|---|---|---|---|
enable_fuzzy |
bool |
False |
YOKEDCACHE_ENABLE_FUZZY |
fuzzy_threshold |
int |
80 |
YOKEDCACHE_FUZZY_THRESHOLD |
enable_compression |
bool |
False |
YOKEDCACHE_ENABLE_COMPRESSION |
compression_threshold |
int |
1024 |
— |
Memory backend
| Parameter | Type | Default |
|---|---|---|
memory_max_size |
int |
10000 |
memory_cleanup_interval |
int |
300 |
Monitoring
| Parameter | Type | Default | Env var |
|---|---|---|---|
enable_metrics |
bool |
False |
YOKEDCACHE_ENABLE_METRICS |
prometheus_port |
int |
8000 |
YOKEDCACHE_PROMETHEUS_PORT |
statsd_host |
str \| None |
None |
YOKEDCACHE_STATSD_HOST |
statsd_port |
int |
8125 |
YOKEDCACHE_STATSD_PORT |
log_level |
str |
"INFO" |
YOKEDCACHE_LOG_LEVEL |
Per-table config
config = CacheConfig(
tables={
"users": TableCacheConfig(
ttl=3600,
tags={"user_data"},
serialization_method=SerializationMethod.JSON,
enable_fuzzy=True,
fuzzy_threshold=85,
enable_compression=False,
),
}
)
TableCacheConfig accepts: ttl, tags, serialization_method, enable_fuzzy, fuzzy_threshold, enable_compression, compression_threshold, query_specific_ttls.
Decorators
@cached
from yokedcache import cached
@cached(
cache: YokedCache,
ttl: int | None = None,
tags: list[str] | None = None,
cache_key_prefix: str | None = None,
serialization: SerializationMethod | None = None,
single_flight: bool = False,
serve_stale_on_error: bool = False,
stale_ttl: int = 0,
)
Works on async def and plain def. The cache key is derived from the function name and all arguments.
@cached(cache=cache, ttl=300, tags=["users"])
async def get_user(user_id: int) -> dict:
...
# Access the underlying unwrapped function
raw = get_user.__wrapped__
# Inspect the generated key without calling the function
key = get_user.cache_key(user_id=42)
cached_dependency
from yokedcache import cached_dependency
cached_dep = cached_dependency(
dependency: Callable,
cache: YokedCache,
ttl: int = 300,
table_name: str | None = None,
tags: list[str] | None = None,
)
Returns a FastAPI-compatible dependency that caches the dependency's return value and auto-invalidates on session commits.
cached_get_db = cached_dependency(
get_db,
cache=cache,
ttl=300,
table_name="users",
)
SerializationMethod
from yokedcache.models import SerializationMethod
SerializationMethod.JSON # default, portable, handles common Python types
SerializationMethod.PICKLE # any Python object; requires trusted storage
SerializationMethod.MSGPACK # binary, compact; requires pip install msgpack
Exceptions
from yokedcache.exceptions import (
CacheError, # base exception
CacheConnectionError, # backend connection failure
CacheSerializationError, # serialization/deserialization failure
CacheKeyError, # invalid key format
ConfigValidationError, # invalid CacheConfig
)
All inherit from CacheError. When fallback_enabled=True (the default), these are caught internally and logged—they don't propagate to your application code unless you set fallback_enabled=False.