This commit is contained in:
Harsha Rahul Boggaram 2025-09-06 14:10:26 -07:00
parent f227c4040f
commit 3bea950588
2 changed files with 41 additions and 68 deletions

View File

@ -12,11 +12,11 @@ import (
// HybridCache provides a Redis-backed cache with in-memory fallback.
// It automatically handles Redis failures by falling back to local cache.
type HybridCache struct {
redis *RedisCache
local *Cache
config Config
podID string
redis *RedisCache
local *Cache
config Config
podID string
// Event handling
mu sync.RWMutex
subscription context.CancelFunc
@ -116,12 +116,8 @@ func (h *HybridCache) processEvent(event CacheEvent) {
switch event.Type {
case "delete":
h.local.Delete(context.Background(), event.Key)
slog.Debug("processed cache delete event", "key", event.Key, "source", event.Source)
case "clear":
h.local.Clear(context.Background())
slog.Debug("processed cache clear event", "source", event.Source)
default:
slog.Debug("unknown cache event type", "type", event.Type)
}
}
@ -132,40 +128,32 @@ func (h *HybridCache) Set(ctx context.Context, key string, value any) {
// SetWithTTL adds a value to both Redis and local cache with custom TTL.
func (h *HybridCache) SetWithTTL(ctx context.Context, key string, value any, ttl time.Duration) {
// Always set in local cache
// Always set in local cache first
h.local.SetWithTTL(ctx, key, value, ttl)
// Try to set in Redis
// Try to set in Redis (no event needed - other pods will get it on demand)
if h.redis != nil {
h.redis.SetWithTTL(ctx, key, value, ttl)
// Publish set event (optional, mainly for monitoring)
event := CacheEvent{
Type: "set",
Key: key,
Timestamp: time.Now(),
Source: h.podID,
}
if err := h.redis.Publish(ctx, event); err != nil {
slog.Debug("failed to publish cache set event", "key", key, "error", err)
}
}
}
// Get retrieves a value from cache, trying Redis first, then local cache.
// Get retrieves a value from cache, trying local first for speed, then Redis.
func (h *HybridCache) Get(ctx context.Context, key string) (any, bool) {
// Try Redis first if available
// Try local cache first for speed
if value, ok := h.local.Get(ctx, key); ok {
return value, true
}
// Try Redis if local cache miss and Redis is available
if h.redis != nil {
if value, ok := h.redis.Get(ctx, key); ok {
// Also update local cache for faster subsequent access
// Populate local cache for faster subsequent access
h.local.SetWithTTL(ctx, key, value, h.config.DefaultTTL)
return value, true
}
}
// Fallback to local cache
return h.local.Get(ctx, key)
return nil, false
}
// Delete removes a value from both Redis and local cache.
@ -176,7 +164,7 @@ func (h *HybridCache) Delete(ctx context.Context, key string) {
// Try to delete from Redis and notify other pods
if h.redis != nil {
h.redis.Delete(ctx, key)
// Publish delete event to other pods
event := CacheEvent{
Type: "delete",
@ -184,7 +172,7 @@ func (h *HybridCache) Delete(ctx context.Context, key string) {
Timestamp: time.Now(),
Source: h.podID,
}
if err := h.redis.Publish(ctx, event); err != nil {
slog.Debug("failed to publish cache delete event", "key", key, "error", err)
}
@ -199,7 +187,7 @@ func (h *HybridCache) Clear(ctx context.Context) {
// Try to clear Redis and notify other pods
if h.redis != nil {
h.redis.Clear(ctx)
// Publish clear event to other pods
event := CacheEvent{
Type: "clear",
@ -207,7 +195,7 @@ func (h *HybridCache) Clear(ctx context.Context) {
Timestamp: time.Now(),
Source: h.podID,
}
if err := h.redis.Publish(ctx, event); err != nil {
slog.Debug("failed to publish cache clear event", "error", err)
}
@ -254,10 +242,10 @@ func (h *HybridCache) IsRedisAvailable() bool {
// GetStats returns cache statistics.
func (h *HybridCache) GetStats() CacheStats {
stats := CacheStats{
LocalSize: h.local.Size(),
RedisAvailable: h.redis != nil,
PodID: h.podID,
EventQueueSize: int64(len(h.eventCh)),
LocalSize: h.local.Size(),
RedisAvailable: h.redis != nil,
PodID: h.podID,
EventQueueSize: int64(len(h.eventCh)),
}
if h.redis != nil {
@ -270,10 +258,9 @@ func (h *HybridCache) GetStats() CacheStats {
// CacheStats provides information about cache state.
type CacheStats struct {
LocalSize int64 `json:"local_size"`
RedisSize int64 `json:"redis_size"`
RedisAvailable bool `json:"redis_available"`
PodID string `json:"pod_id"`
EventQueueSize int64 `json:"event_queue_size"`
LocalSize int64 `json:"local_size"`
RedisSize int64 `json:"redis_size"`
RedisAvailable bool `json:"redis_available"`
PodID string `json:"pod_id"`
EventQueueSize int64 `json:"event_queue_size"`
}

View File

@ -60,7 +60,7 @@ func NewRedisCache(redisConfig RedisConfig, cacheConfig Config) (*RedisCache, er
// Test connection
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
if err := client.Ping(ctx).Err(); err != nil {
return nil, fmt.Errorf("failed to connect to Redis: %w", err)
}
@ -89,7 +89,6 @@ func (r *RedisCache) Set(ctx context.Context, key string, value any) {
// SetWithTTL adds a value to the cache with a custom TTL.
func (r *RedisCache) SetWithTTL(ctx context.Context, key string, value any, ttl time.Duration) {
// Serialize the value to JSON
data, err := json.Marshal(value)
if err != nil {
slog.Error("failed to marshal cache value", "key", key, "error", err)
@ -99,10 +98,7 @@ func (r *RedisCache) SetWithTTL(ctx context.Context, key string, value any, ttl
redisKey := r.buildKey(key)
if err := r.client.Set(ctx, redisKey, data, ttl).Err(); err != nil {
slog.Error("failed to set cache value in Redis", "key", redisKey, "error", err)
return
}
slog.Debug("cache value set in Redis", "key", redisKey, "ttl", ttl)
}
// Get retrieves a value from the cache.
@ -111,22 +107,18 @@ func (r *RedisCache) Get(ctx context.Context, key string) (any, bool) {
data, err := r.client.Get(ctx, redisKey).Bytes()
if err != nil {
if err == redis.Nil {
// Key not found
return nil, false
}
slog.Error("failed to get cache value from Redis", "key", redisKey, "error", err)
return nil, false
}
// We need to unmarshal to interface{} since we don't know the original type
// The caller should know what type to expect and can cast accordingly
var value any
if err := json.Unmarshal(data, &value); err != nil {
slog.Error("failed to unmarshal cache value", "key", redisKey, "error", err)
return nil, false
}
slog.Debug("cache value retrieved from Redis", "key", redisKey)
return value, true
}
@ -135,24 +127,21 @@ func (r *RedisCache) Delete(ctx context.Context, key string) {
redisKey := r.buildKey(key)
if err := r.client.Del(ctx, redisKey).Err(); err != nil {
slog.Error("failed to delete cache value from Redis", "key", redisKey, "error", err)
return
}
slog.Debug("cache value deleted from Redis", "key", redisKey)
}
// Clear removes all values from the cache with the configured prefix.
func (r *RedisCache) Clear(ctx context.Context) {
// Use SCAN to find all keys with our prefix
pattern := fmt.Sprintf("%s:*", r.prefix)
iter := r.client.Scan(ctx, 0, pattern, 0).Iterator()
keys := make([]string, 0)
for iter.Next(ctx) {
keys = append(keys, iter.Val())
}
if err := iter.Err(); err != nil {
slog.Error("failed to scan Redis keys", "pattern", pattern, "error", err)
return
@ -161,9 +150,7 @@ func (r *RedisCache) Clear(ctx context.Context) {
if len(keys) > 0 {
if err := r.client.Del(ctx, keys...).Err(); err != nil {
slog.Error("failed to delete Redis keys", "pattern", pattern, "error", err)
return
}
slog.Debug("cleared cache keys from Redis", "pattern", pattern, "count", len(keys))
}
}
@ -174,14 +161,14 @@ func (r *RedisCache) Size() int64 {
defer cancel()
pattern := fmt.Sprintf("%s:*", r.prefix)
iter := r.client.Scan(ctx, 0, pattern, 0).Iterator()
count := int64(0)
for iter.Next(ctx) {
count++
}
if err := iter.Err(); err != nil {
slog.Error("failed to count Redis keys", "pattern", pattern, "error", err)
return 0
@ -213,15 +200,15 @@ func (r *RedisCache) Publish(ctx context.Context, event CacheEvent) error {
// Subscribe subscribes to cache invalidation events from other instances.
func (r *RedisCache) Subscribe(ctx context.Context, handler func(CacheEvent)) error {
channel := fmt.Sprintf("%s:events", r.prefix)
pubsub := r.client.Subscribe(ctx, channel)
defer pubsub.Close()
// Start receiving messages
ch := pubsub.Channel()
slog.Info("subscribed to Redis cache events", "channel", channel)
for {
select {
case msg := <-ch:
@ -230,10 +217,10 @@ func (r *RedisCache) Subscribe(ctx context.Context, handler func(CacheEvent)) er
slog.Error("failed to unmarshal cache event", "error", err)
continue
}
slog.Debug("received cache event", "event", event)
handler(event)
case <-ctx.Done():
slog.Info("cache event subscription cancelled")
return ctx.Err()
@ -248,4 +235,3 @@ type CacheEvent struct {
Timestamp time.Time `json:"timestamp"` // when the event occurred
Source string `json:"source"` // identifier of the pod that generated the event
}