From 3bea95058804bcadf70056638937bb6168644b7b Mon Sep 17 00:00:00 2001 From: Harsha Rahul Boggaram Date: Sat, 6 Sep 2025 14:10:26 -0700 Subject: [PATCH] cleanup --- store/cache/hybrid_cache.go | 71 +++++++++++++++---------------------- store/cache/redis_cache.go | 38 +++++++------------- 2 files changed, 41 insertions(+), 68 deletions(-) diff --git a/store/cache/hybrid_cache.go b/store/cache/hybrid_cache.go index ef7b0c04a..a1b8dbab2 100644 --- a/store/cache/hybrid_cache.go +++ b/store/cache/hybrid_cache.go @@ -12,11 +12,11 @@ import ( // HybridCache provides a Redis-backed cache with in-memory fallback. // It automatically handles Redis failures by falling back to local cache. type HybridCache struct { - redis *RedisCache - local *Cache - config Config - podID string - + redis *RedisCache + local *Cache + config Config + podID string + // Event handling mu sync.RWMutex subscription context.CancelFunc @@ -116,12 +116,8 @@ func (h *HybridCache) processEvent(event CacheEvent) { switch event.Type { case "delete": h.local.Delete(context.Background(), event.Key) - slog.Debug("processed cache delete event", "key", event.Key, "source", event.Source) case "clear": h.local.Clear(context.Background()) - slog.Debug("processed cache clear event", "source", event.Source) - default: - slog.Debug("unknown cache event type", "type", event.Type) } } @@ -132,40 +128,32 @@ func (h *HybridCache) Set(ctx context.Context, key string, value any) { // SetWithTTL adds a value to both Redis and local cache with custom TTL. func (h *HybridCache) SetWithTTL(ctx context.Context, key string, value any, ttl time.Duration) { - // Always set in local cache + // Always set in local cache first h.local.SetWithTTL(ctx, key, value, ttl) - // Try to set in Redis + // Try to set in Redis (no event needed - other pods will get it on demand) if h.redis != nil { h.redis.SetWithTTL(ctx, key, value, ttl) - - // Publish set event (optional, mainly for monitoring) - event := CacheEvent{ - Type: "set", - Key: key, - Timestamp: time.Now(), - Source: h.podID, - } - - if err := h.redis.Publish(ctx, event); err != nil { - slog.Debug("failed to publish cache set event", "key", key, "error", err) - } } } -// Get retrieves a value from cache, trying Redis first, then local cache. +// Get retrieves a value from cache, trying local first for speed, then Redis. func (h *HybridCache) Get(ctx context.Context, key string) (any, bool) { - // Try Redis first if available + // Try local cache first for speed + if value, ok := h.local.Get(ctx, key); ok { + return value, true + } + + // Try Redis if local cache miss and Redis is available if h.redis != nil { if value, ok := h.redis.Get(ctx, key); ok { - // Also update local cache for faster subsequent access + // Populate local cache for faster subsequent access h.local.SetWithTTL(ctx, key, value, h.config.DefaultTTL) return value, true } } - // Fallback to local cache - return h.local.Get(ctx, key) + return nil, false } // Delete removes a value from both Redis and local cache. @@ -176,7 +164,7 @@ func (h *HybridCache) Delete(ctx context.Context, key string) { // Try to delete from Redis and notify other pods if h.redis != nil { h.redis.Delete(ctx, key) - + // Publish delete event to other pods event := CacheEvent{ Type: "delete", @@ -184,7 +172,7 @@ func (h *HybridCache) Delete(ctx context.Context, key string) { Timestamp: time.Now(), Source: h.podID, } - + if err := h.redis.Publish(ctx, event); err != nil { slog.Debug("failed to publish cache delete event", "key", key, "error", err) } @@ -199,7 +187,7 @@ func (h *HybridCache) Clear(ctx context.Context) { // Try to clear Redis and notify other pods if h.redis != nil { h.redis.Clear(ctx) - + // Publish clear event to other pods event := CacheEvent{ Type: "clear", @@ -207,7 +195,7 @@ func (h *HybridCache) Clear(ctx context.Context) { Timestamp: time.Now(), Source: h.podID, } - + if err := h.redis.Publish(ctx, event); err != nil { slog.Debug("failed to publish cache clear event", "error", err) } @@ -254,10 +242,10 @@ func (h *HybridCache) IsRedisAvailable() bool { // GetStats returns cache statistics. func (h *HybridCache) GetStats() CacheStats { stats := CacheStats{ - LocalSize: h.local.Size(), - RedisAvailable: h.redis != nil, - PodID: h.podID, - EventQueueSize: int64(len(h.eventCh)), + LocalSize: h.local.Size(), + RedisAvailable: h.redis != nil, + PodID: h.podID, + EventQueueSize: int64(len(h.eventCh)), } if h.redis != nil { @@ -270,10 +258,9 @@ func (h *HybridCache) GetStats() CacheStats { // CacheStats provides information about cache state. type CacheStats struct { - LocalSize int64 `json:"local_size"` - RedisSize int64 `json:"redis_size"` - RedisAvailable bool `json:"redis_available"` - PodID string `json:"pod_id"` - EventQueueSize int64 `json:"event_queue_size"` + LocalSize int64 `json:"local_size"` + RedisSize int64 `json:"redis_size"` + RedisAvailable bool `json:"redis_available"` + PodID string `json:"pod_id"` + EventQueueSize int64 `json:"event_queue_size"` } - diff --git a/store/cache/redis_cache.go b/store/cache/redis_cache.go index 253f73909..22c137eaf 100644 --- a/store/cache/redis_cache.go +++ b/store/cache/redis_cache.go @@ -60,7 +60,7 @@ func NewRedisCache(redisConfig RedisConfig, cacheConfig Config) (*RedisCache, er // Test connection ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() - + if err := client.Ping(ctx).Err(); err != nil { return nil, fmt.Errorf("failed to connect to Redis: %w", err) } @@ -89,7 +89,6 @@ func (r *RedisCache) Set(ctx context.Context, key string, value any) { // SetWithTTL adds a value to the cache with a custom TTL. func (r *RedisCache) SetWithTTL(ctx context.Context, key string, value any, ttl time.Duration) { - // Serialize the value to JSON data, err := json.Marshal(value) if err != nil { slog.Error("failed to marshal cache value", "key", key, "error", err) @@ -99,10 +98,7 @@ func (r *RedisCache) SetWithTTL(ctx context.Context, key string, value any, ttl redisKey := r.buildKey(key) if err := r.client.Set(ctx, redisKey, data, ttl).Err(); err != nil { slog.Error("failed to set cache value in Redis", "key", redisKey, "error", err) - return } - - slog.Debug("cache value set in Redis", "key", redisKey, "ttl", ttl) } // Get retrieves a value from the cache. @@ -111,22 +107,18 @@ func (r *RedisCache) Get(ctx context.Context, key string) (any, bool) { data, err := r.client.Get(ctx, redisKey).Bytes() if err != nil { if err == redis.Nil { - // Key not found return nil, false } slog.Error("failed to get cache value from Redis", "key", redisKey, "error", err) return nil, false } - // We need to unmarshal to interface{} since we don't know the original type - // The caller should know what type to expect and can cast accordingly var value any if err := json.Unmarshal(data, &value); err != nil { slog.Error("failed to unmarshal cache value", "key", redisKey, "error", err) return nil, false } - slog.Debug("cache value retrieved from Redis", "key", redisKey) return value, true } @@ -135,24 +127,21 @@ func (r *RedisCache) Delete(ctx context.Context, key string) { redisKey := r.buildKey(key) if err := r.client.Del(ctx, redisKey).Err(); err != nil { slog.Error("failed to delete cache value from Redis", "key", redisKey, "error", err) - return } - - slog.Debug("cache value deleted from Redis", "key", redisKey) } // Clear removes all values from the cache with the configured prefix. func (r *RedisCache) Clear(ctx context.Context) { // Use SCAN to find all keys with our prefix pattern := fmt.Sprintf("%s:*", r.prefix) - + iter := r.client.Scan(ctx, 0, pattern, 0).Iterator() keys := make([]string, 0) - + for iter.Next(ctx) { keys = append(keys, iter.Val()) } - + if err := iter.Err(); err != nil { slog.Error("failed to scan Redis keys", "pattern", pattern, "error", err) return @@ -161,9 +150,7 @@ func (r *RedisCache) Clear(ctx context.Context) { if len(keys) > 0 { if err := r.client.Del(ctx, keys...).Err(); err != nil { slog.Error("failed to delete Redis keys", "pattern", pattern, "error", err) - return } - slog.Debug("cleared cache keys from Redis", "pattern", pattern, "count", len(keys)) } } @@ -174,14 +161,14 @@ func (r *RedisCache) Size() int64 { defer cancel() pattern := fmt.Sprintf("%s:*", r.prefix) - + iter := r.client.Scan(ctx, 0, pattern, 0).Iterator() count := int64(0) - + for iter.Next(ctx) { count++ } - + if err := iter.Err(); err != nil { slog.Error("failed to count Redis keys", "pattern", pattern, "error", err) return 0 @@ -213,15 +200,15 @@ func (r *RedisCache) Publish(ctx context.Context, event CacheEvent) error { // Subscribe subscribes to cache invalidation events from other instances. func (r *RedisCache) Subscribe(ctx context.Context, handler func(CacheEvent)) error { channel := fmt.Sprintf("%s:events", r.prefix) - + pubsub := r.client.Subscribe(ctx, channel) defer pubsub.Close() // Start receiving messages ch := pubsub.Channel() - + slog.Info("subscribed to Redis cache events", "channel", channel) - + for { select { case msg := <-ch: @@ -230,10 +217,10 @@ func (r *RedisCache) Subscribe(ctx context.Context, handler func(CacheEvent)) er slog.Error("failed to unmarshal cache event", "error", err) continue } - + slog.Debug("received cache event", "event", event) handler(event) - + case <-ctx.Done(): slog.Info("cache event subscription cancelled") return ctx.Err() @@ -248,4 +235,3 @@ type CacheEvent struct { Timestamp time.Time `json:"timestamp"` // when the event occurred Source string `json:"source"` // identifier of the pod that generated the event } -