# Production Monitoring & Health Checks
YokedCache v0.2.1 includes comprehensive monitoring, health checking, and metrics collection capabilities for production environments. Monitor cache performance, track detailed metrics, and set up alerts using industry-standard tools like Prometheus and StatsD.
## Table of Contents
- Overview
- Health Checks
- Metrics Collection
- Metrics Collectors
- Prometheus Integration
- StatsD Integration
- Custom Metrics
- Dashboards and Alerting
- Performance Monitoring
- Real-time Performance Tracking
- Alerting and Notifications
## Overview
The monitoring system provides real-time insights into your cache performance through multiple metrics collectors that can run simultaneously. Whether you're using Prometheus for metrics collection or StatsD for real-time monitoring, YokedCache adapts to your infrastructure.
### Key Features
- **Multiple Collectors**: Run Prometheus and StatsD simultaneously
- **Cache Metrics**: Hit rates, miss rates, operation latency, memory usage
- **System Metrics**: Connection health, error rates, throughput
- **Custom Metrics**: Add your own application-specific metrics
- **Zero Configuration**: Works out of the box with sensible defaults
- **Production Ready**: Designed for high-performance production environments
### Available Metrics

| Metric | Type | Description |
|---|---|---|
| `cache.gets.total` | Counter | Total number of GET operations |
| `cache.sets.total` | Counter | Total number of SET operations |
| `cache.deletes.total` | Counter | Total number of DELETE operations |
| `cache.hits.total` | Counter | Total number of cache hits |
| `cache.misses.total` | Counter | Total number of cache misses |
| `cache.hit_rate` | Gauge | Current cache hit rate (0-1) |
| `cache.size_bytes` | Gauge | Current memory usage in bytes |
| `cache.keys_count` | Gauge | Current number of cached keys |
| `cache.operation_duration` | Histogram | Operation latency distribution |
| `cache.invalidations.total` | Counter | Total number of invalidations |
## Health Checks

### Basic Health Check
```python
from yokedcache import YokedCache

cache = YokedCache()

# Simple health check
is_healthy = await cache.health()
print(f"Cache is healthy: {is_healthy}")
```
### Detailed Health Check (v0.2.1+)
Get comprehensive health information including connection status, pool statistics, and performance metrics:
```python
# Detailed health check with full diagnostics
health_info = await cache.detailed_health_check()

print(f"Status: {health_info['status']}")
print(f"Redis connected: {health_info['redis_connected']}")
print(f"Connection pool: {health_info['connection_pool']}")
print(f"Circuit breaker: {health_info['circuit_breaker']}")
print(f"Performance metrics: {health_info['performance_metrics']}")
```
## Metrics Collection

### Enabling Metrics
```python
from yokedcache import YokedCache, CacheConfig

config = CacheConfig(
    enable_metrics=True,
    metrics_retention_days=7
)

cache = YokedCache(config=config)
cache.start_metrics_collection()
```
### Accessing Metrics
```python
# Get current metrics snapshot
metrics = cache.get_comprehensive_metrics()

print(f"Hit rate: {metrics.hit_rate:.2%}")
print(f"Average response time: {metrics.avg_response_time:.3f}s")
print(f"Total operations: {metrics.total_operations}")
```
## Metrics Collectors

### Basic Setup
```python
from yokedcache import YokedCache, CacheConfig
from yokedcache.backends import RedisBackend
from yokedcache.monitoring import CacheMetrics, PrometheusCollector, StatsDCollector

# Setup backend
backend = RedisBackend(redis_url="redis://localhost:6379/0")

# Setup monitoring with both collectors running simultaneously
metrics = CacheMetrics([
    PrometheusCollector(namespace="myapp", port=8000),
    StatsDCollector(host="localhost", port=8125, prefix="myapp.cache")
])

# Create cache with monitoring
config = CacheConfig(backend=backend, metrics=metrics)
cache = YokedCache(config)
```
### No-Op Collector (Default)
```python
from yokedcache.monitoring import CacheMetrics, NoOpCollector

# Default behavior - no metrics collection
metrics = CacheMetrics()  # Uses NoOpCollector by default

# Explicit no-op collector
metrics = CacheMetrics([NoOpCollector()])
```
The `NoOpCollector` allows you to disable metrics collection without changing your application code.
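For instance, a deployment can toggle collection from the environment while all call sites stay unchanged (a minimal sketch; the `YOKEDCACHE_METRICS` variable name is our own convention, not part of the library):

```python
import os

from yokedcache.monitoring import CacheMetrics, NoOpCollector, PrometheusCollector

# Hypothetical convention: enable real collection only when the env var is set
if os.environ.get("YOKEDCACHE_METRICS") == "prometheus":
    metrics = CacheMetrics([PrometheusCollector(namespace="myapp", port=8000)])
else:
    metrics = CacheMetrics([NoOpCollector()])  # metrics calls become no-ops
```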
## Prometheus Integration

### Installation
```bash
# Install Prometheus dependencies
pip install yokedcache[monitoring]

# Or install manually
pip install prometheus_client
```
### Basic Configuration
```python
from yokedcache.monitoring import PrometheusCollector

# Basic Prometheus setup
prometheus_collector = PrometheusCollector(
    namespace="yokedcache",  # Metric prefix
    port=8000,               # HTTP port for metrics endpoint
    registry=None            # Use default registry
)

# Custom configuration
prometheus_collector = PrometheusCollector(
    namespace="myapp_cache",
    port=9090,
    subsystem="backend",  # Additional prefix
    labels={"environment": "production", "service": "api"}
)
```
### Metrics Endpoint
The Prometheus collector automatically exposes metrics on the specified port:
```bash
# View metrics
curl http://localhost:8000/metrics

# Example output:
# HELP yokedcache_cache_gets_total Total number of cache GET operations
# TYPE yokedcache_cache_gets_total counter
yokedcache_cache_gets_total{result="hit"} 1247.0
yokedcache_cache_gets_total{result="miss"} 153.0

# HELP yokedcache_cache_hit_rate Current cache hit rate
# TYPE yokedcache_cache_hit_rate gauge
yokedcache_cache_hit_rate 0.89

# HELP yokedcache_cache_operation_duration_seconds Cache operation duration
# TYPE yokedcache_cache_operation_duration_seconds histogram
yokedcache_cache_operation_duration_seconds_bucket{operation="get",le="0.001"} 1024.0
yokedcache_cache_operation_duration_seconds_bucket{operation="get",le="0.01"} 1389.0
```
### Custom Labels and Registry
```python
from prometheus_client import CollectorRegistry
from yokedcache.monitoring import PrometheusCollector

# Custom registry for isolation
custom_registry = CollectorRegistry()

collector = PrometheusCollector(
    namespace="myapp",
    port=8001,
    registry=custom_registry,
    labels={
        "environment": "production",
        "region": "us-east-1",
        "service": "user-service"
    }
)
```
## StatsD Integration

### Installation
```bash
# Install StatsD dependencies
pip install yokedcache[monitoring]

# Or install manually
pip install statsd
```
### Basic Configuration
```python
from yokedcache.monitoring import StatsDCollector

# Basic StatsD setup
statsd_collector = StatsDCollector(
    host="localhost",
    port=8125,
    prefix="yokedcache"
)

# Advanced configuration
statsd_collector = StatsDCollector(
    host="statsd.example.com",
    port=8125,
    prefix="myapp.cache",
    sample_rate=1.0,  # Sample all metrics
    timeout=5.0       # Socket timeout
)
```
### DataDog Integration
```python
# DataDog StatsD configuration
statsd_collector = StatsDCollector(
    host="localhost",
    port=8125,
    prefix="myapp.cache",
    use_tags=True  # Enable DataDog-style tags
)
```
### Metric Examples

StatsD metrics are sent in real time:
```text
# Counter metrics
myapp.cache.gets:1|c|#result:hit
myapp.cache.gets:1|c|#result:miss
myapp.cache.sets:1|c

# Gauge metrics
myapp.cache.hit_rate:0.89|g
myapp.cache.size_bytes:1048576|g
myapp.cache.keys_count:1500|g

# Histogram metrics
myapp.cache.operation_duration:0.002|h|#operation:get
myapp.cache.operation_duration:0.005|h|#operation:set
```
## Custom Metrics

### Adding Custom Collectors
```python
import time

from yokedcache.monitoring import CacheMetrics


class CustomMetricsCollector:
    """Custom metrics collector example."""

    def __init__(self, webhook_url: str):
        self.webhook_url = webhook_url

    async def increment(self, metric: str, value: float = 1, tags: dict = None):
        """Send increment to webhook."""
        data = {
            "type": "increment",
            "metric": metric,
            "value": value,
            "tags": tags or {},
            "timestamp": time.time()
        }
        # Send to webhook (implementation depends on your system)
        await self._send_webhook(data)

    async def gauge(self, metric: str, value: float, tags: dict = None):
        """Send gauge to webhook."""
        data = {
            "type": "gauge",
            "metric": metric,
            "value": value,
            "tags": tags or {},
            "timestamp": time.time()
        }
        await self._send_webhook(data)

    async def histogram(self, metric: str, value: float, tags: dict = None):
        """Send histogram to webhook."""
        # Implementation for histogram metrics
        pass

    async def timing(self, metric: str, value: float, tags: dict = None):
        """Send timing to webhook."""
        # Implementation for timing metrics
        pass

    async def _send_webhook(self, data: dict):
        """Send data to webhook endpoint."""
        # Implement webhook sending logic
        pass


# Use custom collector
custom_collector = CustomMetricsCollector("https://metrics.example.com/webhook")
metrics = CacheMetrics([custom_collector])
```
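The `_send_webhook` stub is left to your transport of choice. One possible body, assuming `aiohttp` is available:

```python
import aiohttp  # assumed HTTP client; swap in your own transport


async def _send_webhook(self, data: dict) -> None:
    """Drop-in body for the stub above: POST the payload to the webhook."""
    timeout = aiohttp.ClientTimeout(total=5)
    async with aiohttp.ClientSession(timeout=timeout) as session:
        async with session.post(self.webhook_url, json=data) as resp:
            resp.raise_for_status()  # surface webhook failures to the caller
```

In production you would typically reuse a single `ClientSession` rather than opening one per metric.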
### Application-Specific Metrics
```python
async def track_business_metrics(cache_metrics: CacheMetrics):
    """Track business-specific metrics."""
    # Track user actions
    await cache_metrics.increment(
        "user.login",
        tags={"source": "api", "method": "oauth"}
    )

    # Track application state
    await cache_metrics.gauge(
        "active_sessions",
        value=get_active_session_count(),  # your own session-count helper
        tags={"server": "web-01"}
    )

    # Track request processing time
    timer_id = cache_metrics.start_timer("request_processing")
    # ... process request ...
    await cache_metrics.end_timer(timer_id, "request_processing", {
        "endpoint": "/api/users",
        "status_code": "200"
    })
```
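A small wrapper (our own helper, not part of YokedCache) can keep the `start_timer`/`end_timer` calls paired even when the guarded code raises:

```python
from contextlib import asynccontextmanager

from yokedcache.monitoring import CacheMetrics


@asynccontextmanager
async def timed(cache_metrics: CacheMetrics, metric: str, tags: dict = None):
    """Time a block with the start_timer/end_timer API shown above."""
    timer_id = cache_metrics.start_timer(metric)
    try:
        yield
    finally:
        await cache_metrics.end_timer(timer_id, metric, tags or {})


# Usage
async def handle_request(cache_metrics: CacheMetrics):
    async with timed(cache_metrics, "request_processing",
                     {"endpoint": "/api/users"}):
        ...  # process the request
```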
## Dashboards and Alerting

### Prometheus + Grafana Dashboard
```json
{
  "dashboard": {
    "title": "YokedCache Monitoring",
    "panels": [
      {
        "title": "Cache Hit Rate",
        "type": "singlestat",
        "targets": [
          {
            "expr": "yokedcache_cache_hit_rate",
            "format": "time_series"
          }
        ]
      },
      {
        "title": "Operations per Second",
        "type": "graph",
        "targets": [
          {
            "expr": "rate(yokedcache_cache_gets_total[5m])",
            "legendFormat": "Gets/sec"
          },
          {
            "expr": "rate(yokedcache_cache_sets_total[5m])",
            "legendFormat": "Sets/sec"
          }
        ]
      },
      {
        "title": "Operation Latency",
        "type": "graph",
        "targets": [
          {
            "expr": "histogram_quantile(0.95, rate(yokedcache_cache_operation_duration_seconds_bucket[5m]))",
            "legendFormat": "95th percentile"
          },
          {
            "expr": "histogram_quantile(0.50, rate(yokedcache_cache_operation_duration_seconds_bucket[5m]))",
            "legendFormat": "Median"
          }
        ]
      }
    ]
  }
}
```
### Prometheus Alerting Rules
```yaml
# prometheus-alerts.yml
groups:
  - name: yokedcache
    rules:
      - alert: CacheHitRateLow
        expr: yokedcache_cache_hit_rate < 0.8
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "Cache hit rate is below 80%"
          description: "Cache hit rate is {{ $value | humanizePercentage }}"

      - alert: CacheOperationLatencyHigh
        expr: histogram_quantile(0.95, rate(yokedcache_cache_operation_duration_seconds_bucket[5m])) > 0.1
        for: 2m
        labels:
          severity: critical
        annotations:
          summary: "Cache operation latency is high"
          description: "95th percentile latency is {{ $value }}s"

      - alert: CacheMemoryUsageHigh
        expr: yokedcache_cache_size_bytes > 1000000000  # 1GB
        for: 1m
        labels:
          severity: warning
        annotations:
          summary: "Cache memory usage is high"
          description: "Cache is using {{ $value | humanize1024 }}B of memory"
```
### DataDog Dashboard
```python
# datadog_dashboard.py
from datadog import initialize, api

# Setup DataDog
initialize(api_key='your-api-key', app_key='your-app-key')

# Create dashboard
dashboard = {
    'title': 'YokedCache Monitoring',
    'description': 'Monitor YokedCache performance and health',
    'layout_type': 'ordered',  # required by the v1 dashboards API
    'widgets': [
        {
            'definition': {
                'type': 'timeseries',
                'title': 'Cache Hit Rate',
                'requests': [
                    {
                        'q': 'avg:myapp.cache.hit_rate{*}',
                        'display_type': 'line'
                    }
                ]
            }
        },
        {
            'definition': {
                'type': 'timeseries',
                'title': 'Cache Operations',
                'requests': [
                    {
                        'q': 'sum:myapp.cache.gets{*}.as_rate()',
                        'display_type': 'line'
                    },
                    {
                        'q': 'sum:myapp.cache.sets{*}.as_rate()',
                        'display_type': 'line'
                    }
                ]
            }
        }
    ]
}

# Unpack the payload: the API expects title/widgets/layout_type as top-level fields
api.Dashboard.create(**dashboard)
```
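A matching DataDog monitor can alert on the same series. A sketch using the same legacy `datadog` client; the query threshold mirrors the Prometheus rule above:

```python
# Alert when the 5-minute average hit rate drops below 80%
api.Monitor.create(
    type="metric alert",
    query="avg(last_5m):avg:myapp.cache.hit_rate{*} < 0.8",
    name="YokedCache hit rate low",
    message="Cache hit rate dropped below 80%. Notify @your-team-channel.",
    tags=["service:yokedcache"]
)
```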
## Performance Monitoring

### Health Monitoring
```python
import asyncio
import time

from yokedcache import YokedCache
from yokedcache.monitoring import CacheMetrics


class CacheHealthMonitor:
    """Monitor cache health and performance."""

    def __init__(self, cache: YokedCache, metrics: CacheMetrics):
        self.cache = cache
        self.metrics = metrics
        self.monitoring = True

    async def start_monitoring(self, interval: int = 60):
        """Start health monitoring loop."""
        while self.monitoring:
            await self._collect_health_metrics()
            await asyncio.sleep(interval)

    async def _collect_health_metrics(self):
        """Collect and report health metrics."""
        try:
            # Check backend health
            start_time = time.time()
            is_healthy = await self.cache.health_check()
            response_time = time.time() - start_time

            # Report health metrics
            await self.metrics.gauge("cache.health", 1.0 if is_healthy else 0.0)
            await self.metrics.timing("cache.health_check_duration", response_time)

            if is_healthy:
                # Collect performance metrics
                stats = await self.cache.get_stats()
                await self.metrics.gauge("cache.hit_rate", stats.hit_rate)
                await self.metrics.gauge("cache.size_bytes", stats.total_memory_bytes)
                await self.metrics.gauge("cache.keys_count", stats.total_keys)
                await self.metrics.gauge("cache.uptime_seconds", stats.uptime_seconds)
        except Exception as e:
            await self.metrics.increment("cache.health_check_errors")
            print(f"Health check failed: {e}")

    def stop_monitoring(self):
        """Stop health monitoring."""
        self.monitoring = False


# Usage
monitor = CacheHealthMonitor(cache, metrics)
asyncio.create_task(monitor.start_monitoring(interval=30))
```
### Performance Benchmarking
```python
import statistics
import time

from yokedcache import YokedCache
from yokedcache.monitoring import CacheMetrics


async def benchmark_cache_performance(cache: YokedCache, metrics: CacheMetrics):
    """Benchmark cache performance and report metrics."""
    # Warm up cache
    print("Warming up cache...")
    for i in range(1000):
        await cache.set(f"benchmark:warm:{i}", f"value_{i}")

    # Benchmark GET operations
    print("Benchmarking GET operations...")
    get_times = []
    for i in range(1000):
        start = time.time()
        await cache.get(f"benchmark:warm:{i % 1000}")
        get_times.append(time.time() - start)

    # Report GET performance
    await metrics.gauge("benchmark.get.avg_latency", statistics.mean(get_times))
    await metrics.gauge("benchmark.get.p95_latency", statistics.quantiles(get_times, n=20)[18])
    await metrics.gauge("benchmark.get.p99_latency", statistics.quantiles(get_times, n=100)[98])

    # Benchmark SET operations
    print("Benchmarking SET operations...")
    set_times = []
    for i in range(1000):
        start = time.time()
        await cache.set(f"benchmark:set:{i}", f"benchmark_value_{i}")
        set_times.append(time.time() - start)

    # Report SET performance
    await metrics.gauge("benchmark.set.avg_latency", statistics.mean(set_times))
    await metrics.gauge("benchmark.set.p95_latency", statistics.quantiles(set_times, n=20)[18])
    await metrics.gauge("benchmark.set.p99_latency", statistics.quantiles(set_times, n=100)[98])

    # Calculate throughput from the total time spent in operations
    total_ops = len(get_times) + len(set_times)
    total_time = sum(get_times) + sum(set_times)
    throughput = total_ops / total_time

    await metrics.gauge("benchmark.throughput_ops_per_sec", throughput)

    print("Benchmark complete:")
    print(f"  GET avg: {statistics.mean(get_times) * 1000:.2f}ms")
    print(f"  SET avg: {statistics.mean(set_times) * 1000:.2f}ms")
    print(f"  Throughput: {throughput:.0f} ops/sec")
```
### Error Rate Monitoring
```python
class ErrorTrackingMetrics:
    """Track and report error rates."""

    def __init__(self, metrics: CacheMetrics):
        self.metrics = metrics
        self.error_counts = {}

    async def track_error(self, operation: str, error_type: str, error: Exception):
        """Track an error occurrence."""
        error_key = f"{operation}.{error_type}"

        await self.metrics.increment("cache.errors.total", tags={
            "operation": operation,
            "error_type": error_type,
            "error_class": error.__class__.__name__
        })

        # Track error rate
        self.error_counts[error_key] = self.error_counts.get(error_key, 0) + 1

    async def calculate_error_rates(self, total_operations: dict):
        """Calculate and report error rates."""
        for error_key, error_count in self.error_counts.items():
            operation = error_key.split('.')[0]
            total_ops = total_operations.get(operation, 1)
            error_rate = error_count / total_ops
            await self.metrics.gauge(f"cache.error_rate.{operation}", error_rate)


# Usage with cache operations
error_tracker = ErrorTrackingMetrics(metrics)


async def safe_cache_get(key: str):
    """Cache GET with error tracking."""
    try:
        return await cache.get(key)
    except ConnectionError as e:
        await error_tracker.track_error("get", "connection", e)
        return None
    except Exception as e:
        await error_tracker.track_error("get", "unknown", e)
        return None
```
## Real-time Performance Tracking

Monitor performance in real time and alert on issues:
```python
# Monitor performance in real time
async def monitor_cache_performance():
    while True:
        metrics = cache.get_comprehensive_metrics()

        # Alert on poor performance
        if metrics.hit_rate < 0.70:
            await send_alert(f"Low hit rate: {metrics.hit_rate:.2%}")
        if metrics.avg_response_time > 0.100:
            await send_alert(f"High latency: {metrics.avg_response_time:.3f}s")
        if metrics.error_rate > 0.01:
            await send_alert(f"High error rate: {metrics.error_rate:.3%}")

        await asyncio.sleep(30)  # Check every 30 seconds
```
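`send_alert` above is a placeholder for your notification channel. A minimal sketch posting to a Slack-style incoming webhook (the URL is hypothetical, and `aiohttp` is assumed):

```python
import aiohttp

ALERT_WEBHOOK_URL = "https://hooks.example.com/alerts"  # hypothetical endpoint


async def send_alert(message: str) -> None:
    """Forward an alert message to an incoming webhook."""
    async with aiohttp.ClientSession() as session:
        await session.post(ALERT_WEBHOOK_URL, json={"text": message})
```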
## Alerting and Notifications

### Configuring Alerts
Set up automated alerting based on metrics thresholds:
```python
from yokedcache.monitoring import AlertManager

alert_manager = AlertManager(cache)

# Configure alert thresholds
alert_manager.add_alert(
    name="low_hit_rate",
    metric="hit_rate",
    threshold=0.70,
    comparison="less_than",
    webhook_url="https://your-webhook.com/alerts"
)

alert_manager.add_alert(
    name="high_latency",
    metric="avg_response_time",
    threshold=0.100,
    comparison="greater_than",
    email_recipients=["admin@yourcompany.com"]
)

# Start monitoring
alert_manager.start()
```
### Common Alert Patterns
**Performance Alerts:**

- Hit rate below 70%
- Average response time above 100ms
- Error rate above 1%

**Availability Alerts:**

- Redis connection failures
- Circuit breaker opened
- Connection pool exhaustion

**Capacity Alerts:**

- Memory usage above 80%
- Connection pool utilization above 90%
- Cache eviction rate above threshold
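The capacity patterns can be expressed with the same `add_alert` API shown above (a sketch; the `size_bytes` metric name is inferred from the metrics table, and the ~1 GB threshold is illustrative):

```python
# Warn when cache memory usage exceeds roughly 1 GB
alert_manager.add_alert(
    name="high_memory_usage",
    metric="size_bytes",
    threshold=1_000_000_000,
    comparison="greater_than",
    webhook_url="https://your-webhook.com/alerts"
)
```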
Production monitoring with YokedCache provides the visibility needed to maintain a high-performance caching system. Use these tools to optimize performance, detect issues early, and ensure reliable operation in production environments.