Tutorial: SQLAlchemy Integration
Learn how to integrate YokedCache with SQLAlchemy for high-performance database caching with intelligent invalidation patterns.
What You'll Learn
- How to cache SQLAlchemy queries effectively
- Different caching patterns for different use cases
- Automatic cache invalidation on database writes
- Performance optimization techniques
- Production-ready patterns
Prerequisites
# Install dependencies
pip install yokedcache[full] sqlalchemy psycopg2-binary
# Start Redis
docker run -d --name redis -p 6379:6379 redis:7
Basic SQLAlchemy Setup
First, let's create a standard SQLAlchemy setup:
# models.py
import os
from datetime import datetime

from sqlalchemy import (
    Boolean, Column, DateTime, ForeignKey, Integer, String, Text, create_engine,
)
from sqlalchemy.orm import declarative_base, relationship, sessionmaker

# Database configuration
DATABASE_URL = os.getenv("DATABASE_URL", "sqlite:///./tutorial.db")

engine = create_engine(DATABASE_URL)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
Base = declarative_base()

# Models
class User(Base):
    __tablename__ = "users"

    id = Column(Integer, primary_key=True, index=True)
    username = Column(String(50), unique=True, index=True)
    email = Column(String(100), unique=True, index=True)
    full_name = Column(String(100))
    is_active = Column(Boolean, default=True)
    created_at = Column(DateTime, default=datetime.utcnow)

    # Relationships
    posts = relationship("Post", back_populates="author")

class Post(Base):
    __tablename__ = "posts"

    id = Column(Integer, primary_key=True, index=True)
    title = Column(String(200), index=True)
    content = Column(Text)
    published = Column(Boolean, default=False)
    created_at = Column(DateTime, default=datetime.utcnow)

    # Foreign keys
    author_id = Column(Integer, ForeignKey("users.id"))

    # Relationships
    author = relationship("User", back_populates="posts")

# Create tables
Base.metadata.create_all(bind=engine)

# Session dependency (FastAPI-style generator)
def get_session():
    session = SessionLocal()
    try:
        yield session
    finally:
        session.close()
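The later examples assume a few rows exist. A small, illustrative seeding script using the models above (the sample values are made up for this tutorial):

# seed.py - illustrative sample data so the cached queries return results
from models import Post, SessionLocal, User

with SessionLocal() as session:
    alice = User(username="alice", email="alice@example.com", full_name="Alice Doe")
    session.add(alice)
    session.flush()  # issue the INSERT now so alice.id is populated
    session.add(Post(title="Hello, cache", content="A first post.",
                     published=True, author_id=alice.id))
    session.commit()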
Pattern 1: Function-Level Caching
Cache individual database queries using function decorators:
# cached_queries.py
from datetime import datetime
from typing import List, Optional

from yokedcache import YokedCache, cached

from models import Post, SessionLocal, User

# Initialize cache
cache = YokedCache()

@cached(ttl=600, tags=["users"])
async def get_user_by_id(user_id: int) -> Optional[dict]:
    """Get user by ID - cached for 10 minutes."""
    # Session is a context manager in SQLAlchemy 1.4+; this guarantees close()
    with SessionLocal() as session:
        user = session.query(User).filter(User.id == user_id).first()
        if user:
            return {
                "id": user.id,
                "username": user.username,
                "email": user.email,
                "full_name": user.full_name,
                "is_active": user.is_active,
                "created_at": user.created_at.isoformat(),
            }
        return None

@cached(ttl=180, tags=["posts"])
async def get_published_posts(limit: int = 10) -> List[dict]:
    """Get published posts - cached for 3 minutes."""
    with SessionLocal() as session:
        posts = (
            session.query(Post)
            .filter(Post.published == True)  # noqa: E712
            .order_by(Post.created_at.desc())
            .limit(limit)
            .all()
        )
        return [
            {
                "id": post.id,
                "title": post.title,
                "content": post.content[:200] + "..." if len(post.content) > 200 else post.content,
                "created_at": post.created_at.isoformat(),
                "author": {
                    "id": post.author.id,
                    "username": post.author.username,
                    "full_name": post.author.full_name,
                },
            }
            for post in posts
        ]

@cached(ttl=900, tags=["analytics"])
async def get_user_stats() -> dict:
    """Get user statistics - cached for 15 minutes."""
    with SessionLocal() as session:
        total_users = session.query(User).count()
        active_users = session.query(User).filter(User.is_active == True).count()  # noqa: E712
        total_posts = session.query(Post).count()
        published_posts = session.query(Post).filter(Post.published == True).count()  # noqa: E712
        return {
            "total_users": total_users,
            "active_users": active_users,
            "total_posts": total_posts,
            "published_posts": published_posts,
            "calculated_at": datetime.utcnow().isoformat(),
        }
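Pattern 1 covers reads; writes still need to evict stale entries. A minimal write-path sketch using the same cache instance (the create_user helper is illustrative; invalidate_tags is the same call the repository pattern below uses):

# write_path.py - illustrative helper that evicts the "users" tag on writes
from cached_queries import cache
from models import SessionLocal, User

async def create_user(username: str, email: str, full_name: str) -> int:
    """Insert a user, then drop every cache entry tagged "users"."""
    with SessionLocal() as session:
        user = User(username=username, email=email, full_name=full_name)
        session.add(user)
        session.commit()
        user_id = user.id  # read the id while the session is still open
    # Everything written by @cached(tags=["users"]) is now stale; drop it
    await cache.invalidate_tags(["users"])
    return user_id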
Pattern 2: Session-Level Caching
Cache at the session level for dependency injection:
# cached_sessions.py
from yokedcache import cached_dependency

from cached_queries import cache
from models import get_session

# Create cached session dependency
cached_get_session = cached_dependency(
    get_session,
    cache=cache,
    ttl=300,  # 5 minutes default
    table_name="auto_detect",  # Auto-detect table names from queries
)

# Table-specific cached sessions
users_cached_session = cached_dependency(
    get_session,
    cache=cache,
    ttl=600,  # 10 minutes for user queries
    table_name="users",
)

posts_cached_session = cached_dependency(
    get_session,
    cache=cache,
    ttl=180,  # 3 minutes for post queries
    table_name="posts",
)
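In a FastAPI app, the cached dependency is injected wherever you would otherwise inject get_session. A sketch under that assumption (the route and response shape are illustrative, and the exact session semantics depend on YokedCache's wrapper):

# app.py - hypothetical FastAPI route using the cached dependency
from fastapi import Depends, FastAPI

from cached_sessions import users_cached_session
from models import User

app = FastAPI()

@app.get("/users/{user_id}")
async def read_user(user_id: int, session=Depends(users_cached_session)):
    # Reads issued through this session may be served from cache;
    # writes through it invalidate the "users" table entries.
    user = session.query(User).filter(User.id == user_id).first()
    if user is None:
        return {"error": "not found"}
    return {"id": user.id, "username": user.username}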
Pattern 3: Repository Pattern with Caching
Implement the repository pattern with built-in caching:
# repositories.py
from abc import ABC
from typing import Any, Dict, List, Optional

from sqlalchemy.orm import Session
from yokedcache import cached

from cached_queries import cache
from models import Post, User

class BaseRepository(ABC):
    def __init__(self, session: Session):
        self.session = session

class UserRepository(BaseRepository):
    # Note: cached return values must be serializable; returning plain dicts
    # (as in Pattern 1) is often the safer choice for ORM-heavy code.
    @cached(ttl=600, tags=["users"])
    async def get_by_id(self, user_id: int) -> Optional[User]:
        """Get user by ID with caching."""
        return self.session.query(User).filter(User.id == user_id).first()

    @cached(ttl=300, tags=["users"])
    async def get_by_username(self, username: str) -> Optional[User]:
        """Get user by username with caching."""
        return self.session.query(User).filter(User.username == username).first()

    @cached(ttl=180, tags=["users"])
    async def get_active_users(self, limit: int = 50) -> List[User]:
        """Get active users with caching."""
        return (
            self.session.query(User)
            .filter(User.is_active == True)  # noqa: E712
            .limit(limit)
            .all()
        )

    async def create(self, user_data: Dict[str, Any]) -> User:
        """Create user and invalidate cache."""
        user = User(**user_data)
        self.session.add(user)
        self.session.commit()  # synchronous Session: no await here
        # Invalidate user-related cache
        await cache.invalidate_tags(["users"])
        return user

class PostRepository(BaseRepository):
    @cached(ttl=300, tags=["posts"])
    async def get_by_id(self, post_id: int) -> Optional[Post]:
        """Get post by ID with caching."""
        return self.session.query(Post).filter(Post.id == post_id).first()

    @cached(ttl=180, tags=["posts"])
    async def get_published(self, limit: int = 10) -> List[Post]:
        """Get published posts with caching."""
        return (
            self.session.query(Post)
            .filter(Post.published == True)  # noqa: E712
            .order_by(Post.created_at.desc())
            .limit(limit)
            .all()
        )

    async def create(self, post_data: Dict[str, Any]) -> Post:
        """Create post and invalidate cache."""
        post = Post(**post_data)
        self.session.add(post)
        self.session.commit()  # synchronous Session: no await here
        # Invalidate post-related cache
        await cache.invalidate_tags(["posts"])
        return post
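Putting the repositories to work looks like this; the sample data is illustrative:

# repo_usage.py - illustrative wiring of the repositories
import asyncio

from models import SessionLocal
from repositories import PostRepository, UserRepository

async def main():
    with SessionLocal() as session:
        users = UserRepository(session)
        posts = PostRepository(session)

        bob = await users.create({"username": "bob", "email": "bob@example.com"})
        await posts.create({"title": "Hello", "content": "First post",
                            "published": True, "author_id": bob.id})

        # Hits the database once, then the cache until "users" is invalidated
        print(await users.get_by_id(bob.id))

asyncio.run(main())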
Cache Warming Strategies
Implement cache warming for frequently accessed data:
# cache_warming.py
import asyncio
from typing import List

from cached_queries import get_published_posts, get_user_by_id, get_user_stats

async def warm_user_cache(user_ids: List[int]):
    """Warm cache for specific users."""
    print(f"Warming cache for {len(user_ids)} users...")
    tasks = [get_user_by_id(user_id) for user_id in user_ids]
    await asyncio.gather(*tasks)
    print("User cache warmed successfully")

async def warm_popular_content():
    """Warm cache for popular content."""
    print("Warming popular content cache...")
    # Warm popular posts
    await get_published_posts(limit=20)
    # Warm user statistics
    await get_user_stats()
    print("Popular content cache warmed successfully")

async def full_cache_warm():
    """Perform full cache warming."""
    print("Starting full cache warming...")
    # Warm user cache for the first 50 users
    user_ids = list(range(1, 51))
    await warm_user_cache(user_ids)
    # Warm popular content
    await warm_popular_content()
    print("Full cache warming completed")
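For scripts, a plain asyncio entrypoint is enough; in a long-running web app you would call full_cache_warm() from your framework's startup hook instead:

# warm_on_start.py - run warming once at process start
import asyncio

from cache_warming import full_cache_warm

if __name__ == "__main__":
    asyncio.run(full_cache_warm())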
Performance Monitoring
Monitor cache performance and database query patterns:
# monitoring.py
import time
from contextlib import asynccontextmanager

from cached_queries import get_user_by_id

class QueryPerformanceMonitor:
    def __init__(self):
        self.query_stats = {}

    @asynccontextmanager
    async def monitor_query(self, query_name: str):
        """Context manager to monitor query performance."""
        start_time = time.perf_counter()  # monotonic clock, better for timing
        try:
            yield
        finally:
            execution_time = time.perf_counter() - start_time
            if query_name not in self.query_stats:
                self.query_stats[query_name] = {
                    "total_calls": 0,
                    "total_time": 0.0,
                    "avg_time": 0.0,
                }
            stats = self.query_stats[query_name]
            stats["total_calls"] += 1
            stats["total_time"] += execution_time
            stats["avg_time"] = stats["total_time"] / stats["total_calls"]

    def get_stats(self):
        """Get performance statistics."""
        return self.query_stats

# Global monitor instance
monitor = QueryPerformanceMonitor()

async def performance_test():
    """Compare a cold (cache miss) call against a warm (cache hit) call."""
    print("Running performance tests...")
    user_id = 1

    # First call (cache miss)
    async with monitor.monitor_query("cached_user_first_call"):
        await get_user_by_id(user_id)

    # Second call (cache hit)
    async with monitor.monitor_query("cached_user_second_call"):
        await get_user_by_id(user_id)

    # Print statistics
    for query_name, query_stats in monitor.get_stats().items():
        print(f"{query_name}: {query_stats['avg_time']:.4f}s avg")
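Run the comparison from a script (this assumes a user with ID 1 exists, e.g. from the seeding script earlier):

# run the cold-vs-warm comparison
import asyncio

from monitoring import performance_test

asyncio.run(performance_test())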
Best Practices Summary
1. Cache TTL Strategy
- Hot data (frequently changing): 30-300 seconds
- Warm data (occasionally changing): 300-1800 seconds
- Cold data (rarely changing): 1800-3600 seconds
- Analytics data (expensive to compute): 900-3600 seconds
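Applied to the @cached decorator used throughout this tutorial, the tiers above look roughly like this; the function names and TTL values are illustrative starting points, not rules:

# illustrative TTL tiers using the @cached decorator from this tutorial
from yokedcache import cached

@cached(ttl=60, tags=["sessions"])     # hot: changes constantly
async def get_active_session_count(): ...

@cached(ttl=600, tags=["users"])       # warm: changes occasionally
async def get_user_profile(user_id: int): ...

@cached(ttl=3600, tags=["settings"])   # cold: rarely changes
async def get_site_settings(): ...

@cached(ttl=1800, tags=["analytics"])  # analytics: expensive to compute
async def get_daily_report(): ...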
2. Cache Key Design
- Use descriptive function names for automatic key generation
- Include all relevant parameters in function signatures
- Consider using table-specific tags for easier invalidation
3. Invalidation Strategy
- Use cached_dependency for automatic invalidation on writes
- Group related data with common tags
- Implement manual invalidation for complex scenarios, as sketched below
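For the manual case, pass every affected tag in one call. A sketch using the same cache instance as above, e.g. after a bulk import that touches several tables:

# manual invalidation after a bulk operation (sketch)
from cached_queries import cache

async def after_bulk_import():
    # A bulk import changes users and posts and skews the analytics numbers,
    # so drop all three tag groups at once.
    await cache.invalidate_tags(["users", "posts", "analytics"])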
4. Performance Optimization
- Cache expensive queries and computations
- Monitor cache hit rates and adjust TTL values accordingly
- Implement cache warming for critical data
This SQLAlchemy integration tutorial demonstrates key patterns for implementing high-performance caching in database-driven applications.