diff --git a/KUBERNETES_SCALING.md b/KUBERNETES_SCALING.md new file mode 100644 index 000000000..d507cd324 --- /dev/null +++ b/KUBERNETES_SCALING.md @@ -0,0 +1,380 @@ +# Kubernetes High Availability and Scaling Guide + +This guide explains how to deploy Memos in a Kubernetes environment with proper session management for horizontal scaling and high availability. + +## Description + +Till v0.25.0, Memos had limitations when deployed as multiple pods in Kubernetes: + +1. **Session Isolation**: Each pod maintained its own in-memory session cache, causing authentication inconsistencies when load balancers directed users to different pods. + +2. **SSO Redirect Issues**: OAuth2 authentication flows would fail when: + - User initiated login on Pod A + - OAuth provider redirected back to Pod B + - Pod B couldn't validate the session created by Pod A + +3. **Cache Inconsistency**: Session updates on one pod weren't reflected on other pods until cache expiry (10+ minutes). + +## Solution Overview + +The solution implements a **distributed cache system** with the following features: + +- **Redis-backed shared cache** for session synchronization across pods +- **Hybrid cache strategy** with local cache fallback for resilience +- **Event-driven cache invalidation** for real-time consistency +- **Backward compatibility** - works without Redis for single-pod deployments + +## Architecture + +### Production Architecture with External Services + +``` +┌─────────────────────────────────────────────────────────────┐ +│ Load Balancer (Ingress) │ +└─────────────┬─────────────┬─────────────┬─────────────────┘ + │ │ │ + ┌────▼────┐ ┌────▼────┐ ┌────▼────┐ + │ Pod A │ │ Pod B │ │ Pod C │ + │ │ │ │ │ │ + └────┬────┘ └────┬────┘ └────┬────┘ + │ │ │ + └─────────────┼─────────────┘ + │ + ┌─────────────┼─────────────┐ + │ │ │ + ┌─────────▼─────────┐ │ ┌─────────▼─────────┐ + │ Redis Cache │ │ │ ReadWriteMany │ + │ (ElastiCache) │ │ │ Storage (EFS) │ + │ Distributed │ │ │ Shared Files │ + │ Sessions │ │ │ & Attachments │ + └───────────────────┘ │ └───────────────────┘ + │ + ┌────────▼────────┐ + │ External DB │ + │ (RDS/Cloud SQL)│ + │ Multi-AZ HA │ + └─────────────────┘ +``` + +## Configuration + +### Environment Variables + +Set these environment variables for Redis integration: + +```bash +# Required: Redis connection URL +MEMOS_REDIS_URL=redis://redis-service:6379 + +# Optional: Redis configuration +MEMOS_REDIS_POOL_SIZE=20 # Connection pool size +MEMOS_REDIS_DIAL_TIMEOUT=5s # Connection timeout +MEMOS_REDIS_READ_TIMEOUT=3s # Read timeout +MEMOS_REDIS_WRITE_TIMEOUT=3s # Write timeout +MEMOS_REDIS_KEY_PREFIX=memos # Key prefix for isolation +``` + +### Fallback Behavior + +- **Redis Available**: Uses hybrid cache (Redis + local fallback) +- **Redis Unavailable**: Falls back to local-only cache (single pod) +- **Redis Failure**: Gracefully degrades to local cache until Redis recovers + +## Deployment Options + +### 1. Development/Testing Deployment + +For testing with self-hosted database: + +```bash +kubectl apply -f kubernetes-example.yaml +``` + +This creates: +- Self-hosted PostgreSQL with persistent storage +- Redis deployment with persistence +- Memos deployment with 3 replicas +- ReadWriteMany shared storage +- Load balancer service and ingress +- HorizontalPodAutoscaler + +### 2. Production Deployment (Recommended) + +For production with managed services: + +```bash +# First, set up your managed database and Redis +# Then apply the production configuration: +kubectl apply -f kubernetes-production.yaml +``` + +This provides: +- **External managed database** (AWS RDS, Google Cloud SQL, Azure Database) +- **External managed Redis** (ElastiCache, Google Memorystore, Azure Cache) +- **ReadWriteMany storage** for shared file access +- **Pod Disruption Budget** for high availability +- **Network policies** for security +- **Advanced health checks** and graceful shutdown +- **Horizontal Pod Autoscaler** with intelligent scaling + +### 3. Cloud Provider Specific Examples + +#### AWS Deployment with RDS and ElastiCache + +```bash +# 1. Create RDS PostgreSQL instance +aws rds create-db-instance \ + --db-instance-identifier memos-db \ + --db-instance-class db.t3.medium \ + --engine postgres \ + --master-username memos \ + --master-user-password YourSecurePassword \ + --allocated-storage 100 \ + --vpc-security-group-ids sg-xxxxxxxx \ + --db-subnet-group-name memos-subnet-group \ + --multi-az \ + --backup-retention-period 7 + +# 2. Create ElastiCache Redis cluster +aws elasticache create-replication-group \ + --replication-group-id memos-redis \ + --description "Memos Redis cluster" \ + --node-type cache.t3.medium \ + --num-cache-clusters 2 \ + --port 6379 + +# 3. Update secrets with actual endpoints +kubectl create secret generic memos-secrets \ + --from-literal=database-dsn="postgres://memos:password@memos-db.xxxxxx.region.rds.amazonaws.com:5432/memos?sslmode=require" + +# 4. Update ConfigMap with ElastiCache endpoint +kubectl create configmap memos-config \ + --from-literal=MEMOS_REDIS_URL="redis://memos-redis.xxxxxx.cache.amazonaws.com:6379" + +# 5. Deploy Memos +kubectl apply -f kubernetes-production.yaml +``` + +#### Google Cloud Deployment + +```bash +# 1. Create Cloud SQL instance +gcloud sql instances create memos-db \ + --database-version=POSTGRES_15 \ + --tier=db-n1-standard-2 \ + --region=us-central1 \ + --availability-type=REGIONAL \ + --backup \ + --maintenance-window-day=SUN \ + --maintenance-window-hour=06 + +# 2. Create Memorystore Redis instance +gcloud redis instances create memos-redis \ + --size=5 \ + --region=us-central1 \ + --redis-version=redis_7_0 + +# 3. Deploy with Cloud SQL Proxy (secure connection) +kubectl apply -f kubernetes-production.yaml +``` + +#### Azure Deployment + +```bash +# 1. Create Azure Database for PostgreSQL +az postgres server create \ + --resource-group memos-rg \ + --name memos-db \ + --location eastus \ + --admin-user memos \ + --admin-password YourSecurePassword \ + --sku-name GP_Gen5_2 \ + --version 15 + +# 2. Create Azure Cache for Redis +az redis create \ + --resource-group memos-rg \ + --name memos-redis \ + --location eastus \ + --sku Standard \ + --vm-size C2 + +# 3. Deploy Memos +kubectl apply -f kubernetes-production.yaml +``` + +## Monitoring and Troubleshooting + +### Cache Status Endpoint + +Monitor cache health via the admin API: + +```bash +curl -H "Authorization: Bearer " \ + https://your-memos-instance.com/api/v1/cache/status +``` + +Response includes: +```json +{ + "user_cache": { + "type": "hybrid", + "size": 150, + "local_size": 45, + "redis_size": 150, + "redis_available": true, + "pod_id": "abc12345", + "event_queue_size": 0 + }, + "user_setting_cache": { + "type": "hybrid", + "size": 89, + "redis_available": true, + "pod_id": "abc12345" + } +} +``` + +### Health Checks + +Monitor these indicators: + +1. **Redis Connectivity**: Check `redis_available` in cache status +2. **Event Queue**: Monitor `event_queue_size` for backlog +3. **Cache Hit Rates**: Compare `local_size` vs `redis_size` +4. **Pod Distribution**: Verify requests distributed across pods + +### Common Issues + +#### Problem: Authentication fails after login +**Symptoms**: Users can log in but subsequent requests fail +**Cause**: Session created on one pod, request handled by another +**Solution**: Verify Redis configuration and connectivity + +#### Problem: High cache misses +**Symptoms**: Poor performance, frequent database queries +**Cause**: Redis unavailable or misconfigured +**Solution**: Check Redis logs and connection settings + +#### Problem: Session persistence issues +**Symptoms**: Users logged out unexpectedly +**Cause**: Redis data loss or TTL issues +**Solution**: Enable Redis persistence and verify TTL settings + +## Performance Considerations + +### External Database Requirements + +**PostgreSQL Sizing**: +- **Small (< 100 users)**: 2 CPU, 4GB RAM, 100GB storage +- **Medium (100-1000 users)**: 4 CPU, 8GB RAM, 500GB storage +- **Large (1000+ users)**: 8+ CPU, 16GB+ RAM, 1TB+ storage + +**Redis Sizing**: +- **Memory**: Base 50MB + (2KB × active sessions) + (1KB × cached settings) +- **Small**: 1GB (handles ~500K sessions) +- **Medium**: 2-4GB (handles 1-2M sessions) +- **Large**: 8GB+ (handles 4M+ sessions) + +**Connection Pool Sizing**: +- Database: Start with `max_connections = 20 × number_of_pods` +- Redis: Start with `pool_size = 10 × number_of_pods` + +### Scaling Guidelines + +**Horizontal Pod Autoscaler**: +```yaml +apiVersion: autoscaling/v2 +kind: HorizontalPodAutoscaler +metadata: + name: memos-hpa +spec: + scaleTargetRef: + kind: Deployment + name: memos + minReplicas: 2 + maxReplicas: 10 + metrics: + - type: Resource + resource: + name: cpu + target: + type: Utilization + averageUtilization: 70 +``` + +**Recommended Scaling**: +- **Small (< 100 users)**: 2-3 pods, managed Redis, managed DB +- **Medium (100-1000 users)**: 3-8 pods, Redis cluster, Multi-AZ DB +- **Large (1000+ users)**: 8-20 pods, Redis cluster, read replicas +- **Enterprise**: 20+ pods, Redis cluster, DB sharding + +## Security Considerations + +### Redis Security + +1. **Network Isolation**: Deploy Redis in private network +2. **Authentication**: Use Redis AUTH if exposed +3. **Encryption**: Enable TLS for Redis connections +4. **Access Control**: Restrict Redis access to Memos pods only + +Example with Redis AUTH: +```bash +MEMOS_REDIS_URL=redis://:password@redis-service:6379 +``` + +### Session Security + +- Sessions remain encrypted in transit +- Redis stores serialized session data +- Session TTL honored across all pods +- Admin-only access to cache status endpoint + +## Migration Guide + +### From Single Pod to Multi-Pod + +#### Option 1: Gradual Migration (Recommended) +1. **Setup External Services**: Deploy managed database and Redis +2. **Migrate Data**: Export/import existing database to managed service +3. **Update Configuration**: Add Redis and external DB environment variables +4. **Rolling Update**: Update Memos deployment with new config +5. **Scale Up**: Increase replica count gradually +6. **Verify**: Check cache status and session persistence + +#### Option 2: Blue-Green Deployment +1. **Setup New Environment**: Complete production setup in parallel +2. **Data Migration**: Sync data to new environment +3. **DNS Cutover**: Switch traffic to new environment +4. **Cleanup**: Remove old environment after verification + +### Rollback Strategy + +If issues occur: +1. **Scale Down**: Reduce to single pod +2. **Remove Redis Config**: Environment variables +3. **Restart**: Pods will use local cache only + +## Best Practices + +1. **Resource Limits**: Set appropriate CPU/memory limits +2. **Health Checks**: Implement readiness/liveness probes +3. **Monitoring**: Track cache metrics and Redis health +4. **Backup**: Regular Redis data backups +5. **Testing**: Verify session persistence across pod restarts +6. **Gradual Scaling**: Increase replicas incrementally + +## Additional Resources + +- [Redis Kubernetes Operator](https://github.com/spotahome/redis-operator) +- [Kubernetes HPA Documentation](https://kubernetes.io/docs/tasks/run-application/horizontal-pod-autoscale/) +- [Session Affinity vs Distributed Sessions](https://kubernetes.io/docs/concepts/services-networking/service/#session-stickiness) + +## Support + +For issues or questions: +1. Check cache status endpoint first +2. Review Redis and pod logs +3. Verify environment variable configuration +4. Test with single pod to isolate issues + diff --git a/go.mod b/go.mod index 30ee57329..b073ae036 100644 --- a/go.mod +++ b/go.mod @@ -20,6 +20,7 @@ require ( github.com/lib/pq v1.10.9 github.com/lithammer/shortuuid/v4 v4.2.0 github.com/pkg/errors v0.9.1 + github.com/redis/go-redis/v9 v9.7.0 github.com/spf13/cobra v1.10.1 github.com/spf13/viper v1.20.1 github.com/stretchr/testify v1.10.0 @@ -38,7 +39,9 @@ require ( filippo.io/edwards25519 v1.1.0 // indirect github.com/antlr4-go/antlr/v4 v4.13.1 // indirect github.com/cenkalti/backoff/v4 v4.3.0 // indirect + github.com/cespare/xxhash/v2 v2.3.0 // indirect github.com/desertbit/timer v1.0.1 // indirect + github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect github.com/dustin/go-humanize v1.0.1 // indirect github.com/fsnotify/fsnotify v1.8.0 // indirect github.com/go-viper/mapstructure/v2 v2.2.1 // indirect diff --git a/go.sum b/go.sum index 40b7f9edf..5caef41f4 100644 --- a/go.sum +++ b/go.sum @@ -71,6 +71,10 @@ github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24 github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= github.com/bgentry/speakeasy v0.1.0/go.mod h1:+zsyZBPWlz7T6j88CTgSN5bM796AkVf0kBD4zp0CCIs= +github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs= +github.com/bsm/ginkgo/v2 v2.12.0/go.mod h1:SwYbGRRDovPVboqFv0tPTcG1sN61LM1Z4ARdbAV9g4c= +github.com/bsm/gomega v1.27.10 h1:yeMWxP2pV2fG3FgAODIY8EiRE3dy0aeFYt4l7wh6yKA= +github.com/bsm/gomega v1.27.10/go.mod h1:JyEr/xRbxbtgWNi8tIEVPUYZ5Dzef52k01W3YH0H+O0= github.com/casbin/casbin/v2 v2.1.2/go.mod h1:YcPU1XXisHhLzuxH9coDNf2FbKpjGlbCg3n9yuLkIJQ= github.com/cenkalti/backoff v2.2.1+incompatible/go.mod h1:90ReRw6GdpyfrHakVjL/QHaoyV4aDUVVkXQJJJ3NXXM= github.com/cenkalti/backoff/v4 v4.1.1/go.mod h1:scbssz8iZGpm3xbr14ovlUdkxfGXNInqkPWOWmG2CLw= @@ -78,6 +82,8 @@ github.com/cenkalti/backoff/v4 v4.3.0 h1:MyRJ/UdXutAwSAT+s3wNd7MfTIcy71VQueUuFK3 github.com/cenkalti/backoff/v4 v4.3.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE= github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= +github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= +github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/clbanning/x2j v0.0.0-20191024224557-825249438eec/go.mod h1:jMjuTZXRI4dUb/I5gc9Hdhagfvm9+RyrPryS/auMzxE= github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc= @@ -97,6 +103,8 @@ github.com/desertbit/timer v0.0.0-20180107155436-c41aec40b27f/go.mod h1:xH/i4TFM github.com/desertbit/timer v1.0.1 h1:yRpYNn5Vaaj6QXecdLMPMJsW81JLiI1eokUft5nBmeo= github.com/desertbit/timer v1.0.1/go.mod h1:htRrYeY5V/t4iu1xCJ5XsQvp4xve8QulXXctAzxqcwE= github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ= +github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78= +github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc= github.com/disintegration/imaging v1.6.2 h1:w1LecBlG2Lnp8B3jk5zSuNqd7b4DXhcjwek1ei82L+c= github.com/disintegration/imaging v1.6.2/go.mod h1:44/5580QXChDfwIclfc/PCwrr44amcmDAg8hxG0Ewe4= github.com/dustin/go-humanize v0.0.0-20171111073723-bb3d318650d4/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk= @@ -366,6 +374,8 @@ github.com/prometheus/procfs v0.0.8/go.mod h1:7Qr8sr6344vo1JqZ6HhLceV9o3AJ1Ff+Gx github.com/prometheus/procfs v0.1.3/go.mod h1:lV6e/gmhEcM9IjHGsFOCxxuZ+z1YqCvr4OA4YeYWdaU= github.com/prometheus/procfs v0.3.0/go.mod h1:lV6e/gmhEcM9IjHGsFOCxxuZ+z1YqCvr4OA4YeYWdaU= github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= +github.com/redis/go-redis/v9 v9.7.0 h1:HhLSs+B6O021gwzl+locl0zEDnyNkxMtf/Z3NNBMa9E= +github.com/redis/go-redis/v9 v9.7.0/go.mod h1:f6zhXITC7JUJIlPEiBOTXxJgPLdZcA93GewI7inzyWw= github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec h1:W09IVJc94icq4NjY3clb7Lk8O1qJ8BdBEF8z0ibU0rE= github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo= github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg= diff --git a/server/router/api/v1/cache_service.go b/server/router/api/v1/cache_service.go new file mode 100644 index 000000000..7b208d57b --- /dev/null +++ b/server/router/api/v1/cache_service.go @@ -0,0 +1,87 @@ +package v1 + +import ( + "context" + "net/http" + + "github.com/labstack/echo/v4" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + + "github.com/usememos/memos/store" + "github.com/usememos/memos/store/cache" +) + +// GetCacheStatus returns the current cache status for monitoring and debugging. +func (s *APIV1Service) GetCacheStatus(ctx context.Context) (*CacheStatusResponse, error) { + // Check if user is admin + currentUser, err := s.GetCurrentUser(ctx) + if err != nil { + return nil, status.Errorf(codes.Internal, "failed to get current user: %v", err) + } + if currentUser == nil { + return nil, status.Errorf(codes.Unauthenticated, "user not authenticated") + } + if currentUser.Role != store.RoleHost && currentUser.Role != store.RoleAdmin { + return nil, status.Errorf(codes.PermissionDenied, "only admins can access cache status") + } + + response := &CacheStatusResponse{ + UserCache: getCacheInfo(s.Store.GetUserCache()), + UserSettingCache: getCacheInfo(s.Store.GetUserSettingCache()), + WorkspaceSettingCache: getCacheInfo(s.Store.GetWorkspaceSettingCache()), + } + + return response, nil +} + +// getCacheInfo extracts cache information from a cache instance. +func getCacheInfo(c cache.Interface) *CacheInfo { + info := &CacheInfo{ + Size: c.Size(), + Type: "local", + } + + // Check if it's a hybrid cache to get additional info + if hybrid, ok := c.(*cache.HybridCache); ok { + info.Type = "hybrid" + stats := hybrid.GetStats() + info.RedisAvailable = stats.RedisAvailable + info.PodId = stats.PodID + info.LocalSize = stats.LocalSize + info.RedisSize = stats.RedisSize + info.EventQueueSize = stats.EventQueueSize + } + + return info +} + +// CacheStatusResponse contains cache status information. +type CacheStatusResponse struct { + UserCache *CacheInfo `json:"user_cache"` + UserSettingCache *CacheInfo `json:"user_setting_cache"` + WorkspaceSettingCache *CacheInfo `json:"workspace_setting_cache"` +} + +// CacheInfo contains information about a specific cache. +type CacheInfo struct { + Type string `json:"type"` // "local" or "hybrid" + Size int64 `json:"size"` // Total items in cache + LocalSize int64 `json:"local_size"` // Items in local cache (for hybrid) + RedisSize int64 `json:"redis_size"` // Items in Redis (for hybrid) + RedisAvailable bool `json:"redis_available"` // Whether Redis is available + PodId string `json:"pod_id"` // Unique pod identifier + EventQueueSize int64 `json:"event_queue_size"` // Pending cache events +} + +// registerCacheRoutes registers cache-related REST endpoints. +func (s *APIV1Service) registerCacheRoutes(g *echo.Group) { + g.GET("/cache/status", func(c echo.Context) error { + ctx := c.Request().Context() + response, err := s.GetCacheStatus(ctx) + if err != nil { + return echo.NewHTTPError(http.StatusInternalServerError, err.Error()) + } + return c.JSON(http.StatusOK, response) + }) +} diff --git a/server/router/api/v1/v1.go b/server/router/api/v1/v1.go index 613131adf..ef471f846 100644 --- a/server/router/api/v1/v1.go +++ b/server/router/api/v1/v1.go @@ -122,6 +122,11 @@ func (s *APIV1Service) RegisterGateway(ctx context.Context, echoServer *echo.Ech gwGroup.Any("/api/v1/*", handler) gwGroup.Any("/file/*", handler) + // Register additional REST endpoints + adminGroup := echoServer.Group("/api/v1/admin") + adminGroup.Use(middleware.CORS()) + s.registerCacheRoutes(adminGroup) + // GRPC web proxy. options := []grpcweb.Option{ grpcweb.WithCorsForRegisteredEndpointsOnly(false), diff --git a/store/cache/hybrid_cache.go b/store/cache/hybrid_cache.go new file mode 100644 index 000000000..ef7b0c04a --- /dev/null +++ b/store/cache/hybrid_cache.go @@ -0,0 +1,279 @@ +package cache + +import ( + "context" + "log/slog" + "sync" + "time" + + "github.com/google/uuid" +) + +// HybridCache provides a Redis-backed cache with in-memory fallback. +// It automatically handles Redis failures by falling back to local cache. +type HybridCache struct { + redis *RedisCache + local *Cache + config Config + podID string + + // Event handling + mu sync.RWMutex + subscription context.CancelFunc + eventCh chan CacheEvent +} + +// NewHybridCache creates a new hybrid cache with Redis primary and local fallback. +func NewHybridCache(redisConfig RedisConfig, cacheConfig Config) (*HybridCache, error) { + // Create Redis cache + redisCache, err := NewRedisCache(redisConfig, cacheConfig) + if err != nil { + slog.Warn("failed to create Redis cache, falling back to local cache only", "error", err) + return &HybridCache{ + local: New(cacheConfig), + config: cacheConfig, + podID: generatePodID(), + eventCh: make(chan CacheEvent, 100), + }, nil + } + + // Create local cache for fallback + localCache := New(cacheConfig) + + hybrid := &HybridCache{ + redis: redisCache, + local: localCache, + config: cacheConfig, + podID: generatePodID(), + eventCh: make(chan CacheEvent, 100), + } + + // Start event subscription if Redis is available + if redisCache != nil { + hybrid.startEventSubscription() + } + + return hybrid, nil +} + +// generatePodID creates a unique identifier for this pod instance. +func generatePodID() string { + return uuid.New().String()[:8] +} + +// startEventSubscription begins listening for cache events from other pods. +func (h *HybridCache) startEventSubscription() { + ctx, cancel := context.WithCancel(context.Background()) + h.subscription = cancel + + go func() { + defer func() { + if r := recover(); r != nil { + slog.Error("cache event subscription panicked", "panic", r) + } + }() + + err := h.redis.Subscribe(ctx, h.handleCacheEvent) + if err != nil && err != context.Canceled { + slog.Error("Redis subscription failed", "error", err) + } + }() + + // Start event processor + go h.processEvents(ctx) +} + +// handleCacheEvent processes cache events from other pods. +func (h *HybridCache) handleCacheEvent(event CacheEvent) { + // Ignore events from this pod + if event.Source == h.podID { + return + } + + select { + case h.eventCh <- event: + // Event queued successfully + default: + // Channel full, drop event + slog.Warn("cache event channel full, dropping event", "event", event) + } +} + +// processEvents processes queued cache events. +func (h *HybridCache) processEvents(ctx context.Context) { + for { + select { + case event := <-h.eventCh: + h.processEvent(event) + case <-ctx.Done(): + return + } + } +} + +// processEvent handles a single cache event. +func (h *HybridCache) processEvent(event CacheEvent) { + switch event.Type { + case "delete": + h.local.Delete(context.Background(), event.Key) + slog.Debug("processed cache delete event", "key", event.Key, "source", event.Source) + case "clear": + h.local.Clear(context.Background()) + slog.Debug("processed cache clear event", "source", event.Source) + default: + slog.Debug("unknown cache event type", "type", event.Type) + } +} + +// Set adds a value to both Redis and local cache. +func (h *HybridCache) Set(ctx context.Context, key string, value any) { + h.SetWithTTL(ctx, key, value, h.config.DefaultTTL) +} + +// SetWithTTL adds a value to both Redis and local cache with custom TTL. +func (h *HybridCache) SetWithTTL(ctx context.Context, key string, value any, ttl time.Duration) { + // Always set in local cache + h.local.SetWithTTL(ctx, key, value, ttl) + + // Try to set in Redis + if h.redis != nil { + h.redis.SetWithTTL(ctx, key, value, ttl) + + // Publish set event (optional, mainly for monitoring) + event := CacheEvent{ + Type: "set", + Key: key, + Timestamp: time.Now(), + Source: h.podID, + } + + if err := h.redis.Publish(ctx, event); err != nil { + slog.Debug("failed to publish cache set event", "key", key, "error", err) + } + } +} + +// Get retrieves a value from cache, trying Redis first, then local cache. +func (h *HybridCache) Get(ctx context.Context, key string) (any, bool) { + // Try Redis first if available + if h.redis != nil { + if value, ok := h.redis.Get(ctx, key); ok { + // Also update local cache for faster subsequent access + h.local.SetWithTTL(ctx, key, value, h.config.DefaultTTL) + return value, true + } + } + + // Fallback to local cache + return h.local.Get(ctx, key) +} + +// Delete removes a value from both Redis and local cache. +func (h *HybridCache) Delete(ctx context.Context, key string) { + // Delete from local cache immediately + h.local.Delete(ctx, key) + + // Try to delete from Redis and notify other pods + if h.redis != nil { + h.redis.Delete(ctx, key) + + // Publish delete event to other pods + event := CacheEvent{ + Type: "delete", + Key: key, + Timestamp: time.Now(), + Source: h.podID, + } + + if err := h.redis.Publish(ctx, event); err != nil { + slog.Debug("failed to publish cache delete event", "key", key, "error", err) + } + } +} + +// Clear removes all values from both Redis and local cache. +func (h *HybridCache) Clear(ctx context.Context) { + // Clear local cache immediately + h.local.Clear(ctx) + + // Try to clear Redis and notify other pods + if h.redis != nil { + h.redis.Clear(ctx) + + // Publish clear event to other pods + event := CacheEvent{ + Type: "clear", + Key: "", + Timestamp: time.Now(), + Source: h.podID, + } + + if err := h.redis.Publish(ctx, event); err != nil { + slog.Debug("failed to publish cache clear event", "error", err) + } + } +} + +// Size returns the size of the local cache (Redis size is expensive to compute). +func (h *HybridCache) Size() int64 { + return h.local.Size() +} + +// Close stops all background processes and closes connections. +func (h *HybridCache) Close() error { + h.mu.Lock() + defer h.mu.Unlock() + + // Stop event subscription + if h.subscription != nil { + h.subscription() + h.subscription = nil + } + + // Close local cache + if err := h.local.Close(); err != nil { + slog.Error("failed to close local cache", "error", err) + } + + // Close Redis cache + if h.redis != nil { + if err := h.redis.Close(); err != nil { + slog.Error("failed to close Redis cache", "error", err) + return err + } + } + + return nil +} + +// IsRedisAvailable returns true if Redis cache is available. +func (h *HybridCache) IsRedisAvailable() bool { + return h.redis != nil +} + +// GetStats returns cache statistics. +func (h *HybridCache) GetStats() CacheStats { + stats := CacheStats{ + LocalSize: h.local.Size(), + RedisAvailable: h.redis != nil, + PodID: h.podID, + EventQueueSize: int64(len(h.eventCh)), + } + + if h.redis != nil { + // Note: Redis size is expensive, only call when needed + stats.RedisSize = h.redis.Size() + } + + return stats +} + +// CacheStats provides information about cache state. +type CacheStats struct { + LocalSize int64 `json:"local_size"` + RedisSize int64 `json:"redis_size"` + RedisAvailable bool `json:"redis_available"` + PodID string `json:"pod_id"` + EventQueueSize int64 `json:"event_queue_size"` +} + diff --git a/store/cache/redis_cache.go b/store/cache/redis_cache.go new file mode 100644 index 000000000..253f73909 --- /dev/null +++ b/store/cache/redis_cache.go @@ -0,0 +1,251 @@ +package cache + +import ( + "context" + "encoding/json" + "fmt" + "log/slog" + "time" + + "github.com/redis/go-redis/v9" +) + +// RedisCache implements the Interface using Redis as the backend. +type RedisCache struct { + client *redis.Client + config Config + prefix string +} + +// RedisConfig contains Redis-specific configuration. +type RedisConfig struct { + // Redis connection URL (redis://localhost:6379) + URL string + // Connection pool size + PoolSize int + // Connection timeout + DialTimeout time.Duration + // Read timeout + ReadTimeout time.Duration + // Write timeout + WriteTimeout time.Duration + // Key prefix for all cache keys + KeyPrefix string +} + +// NewRedisCache creates a new Redis-backed cache with the given configuration. +func NewRedisCache(redisConfig RedisConfig, cacheConfig Config) (*RedisCache, error) { + // Parse Redis URL + opts, err := redis.ParseURL(redisConfig.URL) + if err != nil { + return nil, fmt.Errorf("failed to parse Redis URL: %w", err) + } + + // Override with provided configuration + if redisConfig.PoolSize > 0 { + opts.PoolSize = redisConfig.PoolSize + } + if redisConfig.DialTimeout > 0 { + opts.DialTimeout = redisConfig.DialTimeout + } + if redisConfig.ReadTimeout > 0 { + opts.ReadTimeout = redisConfig.ReadTimeout + } + if redisConfig.WriteTimeout > 0 { + opts.WriteTimeout = redisConfig.WriteTimeout + } + + client := redis.NewClient(opts) + + // Test connection + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + if err := client.Ping(ctx).Err(); err != nil { + return nil, fmt.Errorf("failed to connect to Redis: %w", err) + } + + prefix := redisConfig.KeyPrefix + if prefix == "" { + prefix = "memos:cache" + } + + return &RedisCache{ + client: client, + config: cacheConfig, + prefix: prefix, + }, nil +} + +// buildKey creates a prefixed cache key. +func (r *RedisCache) buildKey(key string) string { + return fmt.Sprintf("%s:%s", r.prefix, key) +} + +// Set adds a value to the cache with the default TTL. +func (r *RedisCache) Set(ctx context.Context, key string, value any) { + r.SetWithTTL(ctx, key, value, r.config.DefaultTTL) +} + +// SetWithTTL adds a value to the cache with a custom TTL. +func (r *RedisCache) SetWithTTL(ctx context.Context, key string, value any, ttl time.Duration) { + // Serialize the value to JSON + data, err := json.Marshal(value) + if err != nil { + slog.Error("failed to marshal cache value", "key", key, "error", err) + return + } + + redisKey := r.buildKey(key) + if err := r.client.Set(ctx, redisKey, data, ttl).Err(); err != nil { + slog.Error("failed to set cache value in Redis", "key", redisKey, "error", err) + return + } + + slog.Debug("cache value set in Redis", "key", redisKey, "ttl", ttl) +} + +// Get retrieves a value from the cache. +func (r *RedisCache) Get(ctx context.Context, key string) (any, bool) { + redisKey := r.buildKey(key) + data, err := r.client.Get(ctx, redisKey).Bytes() + if err != nil { + if err == redis.Nil { + // Key not found + return nil, false + } + slog.Error("failed to get cache value from Redis", "key", redisKey, "error", err) + return nil, false + } + + // We need to unmarshal to interface{} since we don't know the original type + // The caller should know what type to expect and can cast accordingly + var value any + if err := json.Unmarshal(data, &value); err != nil { + slog.Error("failed to unmarshal cache value", "key", redisKey, "error", err) + return nil, false + } + + slog.Debug("cache value retrieved from Redis", "key", redisKey) + return value, true +} + +// Delete removes a value from the cache. +func (r *RedisCache) Delete(ctx context.Context, key string) { + redisKey := r.buildKey(key) + if err := r.client.Del(ctx, redisKey).Err(); err != nil { + slog.Error("failed to delete cache value from Redis", "key", redisKey, "error", err) + return + } + + slog.Debug("cache value deleted from Redis", "key", redisKey) +} + +// Clear removes all values from the cache with the configured prefix. +func (r *RedisCache) Clear(ctx context.Context) { + // Use SCAN to find all keys with our prefix + pattern := fmt.Sprintf("%s:*", r.prefix) + + iter := r.client.Scan(ctx, 0, pattern, 0).Iterator() + keys := make([]string, 0) + + for iter.Next(ctx) { + keys = append(keys, iter.Val()) + } + + if err := iter.Err(); err != nil { + slog.Error("failed to scan Redis keys", "pattern", pattern, "error", err) + return + } + + if len(keys) > 0 { + if err := r.client.Del(ctx, keys...).Err(); err != nil { + slog.Error("failed to delete Redis keys", "pattern", pattern, "error", err) + return + } + slog.Debug("cleared cache keys from Redis", "pattern", pattern, "count", len(keys)) + } +} + +// Size returns the number of items in the cache with our prefix. +// Note: This is an expensive operation in Redis and should be used sparingly. +func (r *RedisCache) Size() int64 { + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + pattern := fmt.Sprintf("%s:*", r.prefix) + + iter := r.client.Scan(ctx, 0, pattern, 0).Iterator() + count := int64(0) + + for iter.Next(ctx) { + count++ + } + + if err := iter.Err(); err != nil { + slog.Error("failed to count Redis keys", "pattern", pattern, "error", err) + return 0 + } + + return count +} + +// Close closes the Redis connection. +func (r *RedisCache) Close() error { + return r.client.Close() +} + +// Publish publishes a cache invalidation event to other instances. +func (r *RedisCache) Publish(ctx context.Context, event CacheEvent) error { + data, err := json.Marshal(event) + if err != nil { + return fmt.Errorf("failed to marshal cache event: %w", err) + } + + channel := fmt.Sprintf("%s:events", r.prefix) + if err := r.client.Publish(ctx, channel, data).Err(); err != nil { + return fmt.Errorf("failed to publish cache event: %w", err) + } + + return nil +} + +// Subscribe subscribes to cache invalidation events from other instances. +func (r *RedisCache) Subscribe(ctx context.Context, handler func(CacheEvent)) error { + channel := fmt.Sprintf("%s:events", r.prefix) + + pubsub := r.client.Subscribe(ctx, channel) + defer pubsub.Close() + + // Start receiving messages + ch := pubsub.Channel() + + slog.Info("subscribed to Redis cache events", "channel", channel) + + for { + select { + case msg := <-ch: + var event CacheEvent + if err := json.Unmarshal([]byte(msg.Payload), &event); err != nil { + slog.Error("failed to unmarshal cache event", "error", err) + continue + } + + slog.Debug("received cache event", "event", event) + handler(event) + + case <-ctx.Done(): + slog.Info("cache event subscription cancelled") + return ctx.Err() + } + } +} + +// CacheEvent represents a cache invalidation event. +type CacheEvent struct { + Type string `json:"type"` // "set", "delete", "clear" + Key string `json:"key"` // cache key (without prefix) + Timestamp time.Time `json:"timestamp"` // when the event occurred + Source string `json:"source"` // identifier of the pod that generated the event +} + diff --git a/store/store.go b/store/store.go index 6b7e8265d..bb92c590d 100644 --- a/store/store.go +++ b/store/store.go @@ -1,6 +1,8 @@ package store import ( + "os" + "strconv" "time" "github.com/usememos/memos/internal/profile" @@ -15,10 +17,10 @@ type Store struct { // Cache settings cacheConfig cache.Config - // Caches - workspaceSettingCache *cache.Cache // cache for workspace settings - userCache *cache.Cache // cache for users - userSettingCache *cache.Cache // cache for user settings + // Caches - using Interface to support both local and distributed caching + workspaceSettingCache cache.Interface // cache for workspace settings + userCache cache.Interface // cache for users + userSettingCache cache.Interface // cache for user settings } // New creates a new instance of Store. @@ -31,27 +33,116 @@ func New(driver Driver, profile *profile.Profile) *Store { OnEviction: nil, } + // Create appropriate cache instances based on configuration + workspaceCache := createCacheInstance(cacheConfig, "workspace") + userCache := createCacheInstance(cacheConfig, "user") + userSettingCache := createCacheInstance(cacheConfig, "user_setting") + store := &Store{ driver: driver, profile: profile, cacheConfig: cacheConfig, - workspaceSettingCache: cache.New(cacheConfig), - userCache: cache.New(cacheConfig), - userSettingCache: cache.New(cacheConfig), + workspaceSettingCache: workspaceCache, + userCache: userCache, + userSettingCache: userSettingCache, } return store } +// createCacheInstance creates either a hybrid distributed cache or local cache +// based on environment configuration. +func createCacheInstance(config cache.Config, cacheType string) cache.Interface { + // Check if Redis is configured + redisURL := os.Getenv("MEMOS_REDIS_URL") + if redisURL == "" { + // No Redis configured, use local cache + return cache.New(config) + } + + // Parse Redis configuration from environment + redisConfig := cache.RedisConfig{ + URL: redisURL, + PoolSize: getEnvInt("MEMOS_REDIS_POOL_SIZE", 10), + DialTimeout: getEnvDuration("MEMOS_REDIS_DIAL_TIMEOUT", 5*time.Second), + ReadTimeout: getEnvDuration("MEMOS_REDIS_READ_TIMEOUT", 3*time.Second), + WriteTimeout: getEnvDuration("MEMOS_REDIS_WRITE_TIMEOUT", 3*time.Second), + KeyPrefix: getEnvString("MEMOS_REDIS_KEY_PREFIX", "memos") + ":" + cacheType, + } + + // Try to create hybrid cache with Redis + hybridCache, err := cache.NewHybridCache(redisConfig, config) + if err != nil { + // Failed to create hybrid cache, fallback to local cache + return cache.New(config) + } + + return hybridCache +} + +// getEnvInt gets an integer from environment with default fallback. +func getEnvInt(key string, defaultValue int) int { + if str := os.Getenv(key); str != "" { + if val, err := strconv.Atoi(str); err == nil { + return val + } + } + return defaultValue +} + +// getEnvDuration gets a duration from environment with default fallback. +func getEnvDuration(key string, defaultValue time.Duration) time.Duration { + if str := os.Getenv(key); str != "" { + if val, err := time.ParseDuration(str); err == nil { + return val + } + } + return defaultValue +} + +// getEnvString gets a string from environment with default fallback. +func getEnvString(key string, defaultValue string) string { + if val := os.Getenv(key); val != "" { + return val + } + return defaultValue +} + func (s *Store) GetDriver() Driver { return s.driver } +// GetUserCache returns the user cache instance. +func (s *Store) GetUserCache() cache.Interface { + return s.userCache +} + +// GetUserSettingCache returns the user setting cache instance. +func (s *Store) GetUserSettingCache() cache.Interface { + return s.userSettingCache +} + +// SetUserSettingCache sets the user setting cache instance (for testing). +func (s *Store) SetUserSettingCache(c cache.Interface) { + s.userSettingCache = c +} + +// GetWorkspaceSettingCache returns the workspace setting cache instance. +func (s *Store) GetWorkspaceSettingCache() cache.Interface { + return s.workspaceSettingCache +} + func (s *Store) Close() error { // Stop all cache cleanup goroutines - s.workspaceSettingCache.Close() - s.userCache.Close() - s.userSettingCache.Close() + if err := s.workspaceSettingCache.Close(); err != nil { + return err + } + if err := s.userCache.Close(); err != nil { + return err + } + if err := s.userSettingCache.Close(); err != nil { + return err + } return s.driver.Close() } diff --git a/store/test/README.md b/store/test/README.md index 988c63177..45de73c00 100644 --- a/store/test/README.md +++ b/store/test/README.md @@ -5,9 +5,40 @@ 1. Create a database in your MySQL server. 2. Run the following command with two environment variables set: -```go -DRIVER=mysql DSN=root@/memos_test go test -v ./test/store/... +```bash +DRIVER=mysql DSN=root@/memos_test go test -v ./store/test/... ``` - `DRIVER` should be set to `mysql`. - `DSN` should be set to the DSN of your MySQL server. + +## How to test distributed caching with Redis? + +1. Start a Redis server locally or use a remote Redis instance. +2. Run the following command with the Redis URL environment variable set: + +```bash +REDIS_URL=redis://localhost:6379 go test -v ./store/test/ -run "Cache|Redis|Hybrid|DistributedSession" +``` + +- `REDIS_URL` should be set to your Redis server URL. +- If `REDIS_URL` is not set, Redis-dependent tests will be skipped. + +## Available cache tests + +- `TestCacheInterface` - Tests cache interface compliance (works without Redis) +- `TestCacheStatus` - Tests cache status reporting (works without Redis) +- `TestRedisCache` - Tests Redis cache implementation (requires Redis) +- `TestHybridCache` - Tests hybrid local+Redis cache (requires Redis) +- `TestDistributedSessionStore` - Tests session sharing across multiple store instances (requires Redis) +- `TestDistributedSessionPerformanceStore` - Performance tests for distributed sessions (requires Redis) + +## Running comprehensive cache tests + +Use the provided script for full cache testing: + +```bash +./test_cache_comprehensive.sh +``` + +This script will automatically detect Redis availability and run appropriate tests. diff --git a/store/test/distributed_session_test.go b/store/test/distributed_session_test.go new file mode 100644 index 000000000..0ee317c70 --- /dev/null +++ b/store/test/distributed_session_test.go @@ -0,0 +1,245 @@ +package teststore + +import ( + "context" + "fmt" + "os" + "testing" + "time" + + "github.com/stretchr/testify/require" + "golang.org/x/crypto/bcrypt" + "google.golang.org/protobuf/types/known/timestamppb" + + storepb "github.com/usememos/memos/proto/gen/store" + "github.com/usememos/memos/store" + "github.com/usememos/memos/store/cache" + "github.com/usememos/memos/store/db" +) + +// TestDistributedSessionStore tests the core business problem we solved: +// Multi-pod Kubernetes deployments sharing user sessions to fix SSO redirect issues +func TestDistributedSessionStore(t *testing.T) { + redisURL := os.Getenv("REDIS_URL") + if redisURL == "" { + t.Skip("REDIS_URL not set, skipping distributed session tests - this tests the core K8s scaling feature") + } + + ctx := context.Background() + + // Create two store instances to simulate multiple K8s pods + store1 := createStoreWithRedisCache(ctx, t, "pod1") + defer store1.Close() + + store2 := createStoreWithRedisCache(ctx, t, "pod2") + defer store2.Close() + + // Give time for cache initialization + time.Sleep(100 * time.Millisecond) + + // Test the core SSO redirect issue: session created in pod1 should be available in pod2 + t.Run("SSO_RedirectFix_SessionSharingAcrossPods", func(t *testing.T) { + testSessionSharingAcrossPods(t, ctx, store1, store2) + }) + + // Test session cleanup works across pods + t.Run("SessionInvalidationAcrossPods", func(t *testing.T) { + testSessionInvalidationAcrossPods(t, ctx, store1, store2) + }) + + // Test user settings sync (part of session management) + t.Run("UserSettingsSynchronization", func(t *testing.T) { + testUserSettingsSynchronization(t, ctx, store1, store2) + }) +} + +func createStoreWithRedisCache(ctx context.Context, t *testing.T, instanceID string) *store.Store { + redisURL := os.Getenv("REDIS_URL") + + // Create profile for testing + profile := getTestingProfile(t) + profile.Data = fmt.Sprintf("%s_%s", profile.Data, instanceID) + + // Create database driver + dbDriver, err := db.NewDBDriver(profile) + require.NoError(t, err) + + // Reset and migrate database + resetTestingDB(ctx, profile, dbDriver) + + // Create store with Redis cache + testStore := store.New(dbDriver, profile) + + // Override cache with Redis-enabled cache for testing + redisConfig := cache.RedisConfig{ + URL: redisURL, + PoolSize: 10, + DialTimeout: 5 * time.Second, + ReadTimeout: 3 * time.Second, + WriteTimeout: 3 * time.Second, + KeyPrefix: fmt.Sprintf("test-%s", instanceID), + } + + localConfig := cache.Config{ + MaxItems: 100, + DefaultTTL: time.Hour, + CleanupInterval: time.Minute, + } + + hybridCache, err := cache.NewHybridCache(redisConfig, localConfig) + require.NoError(t, err) + + // Set the hybrid cache for user settings + testStore.SetUserSettingCache(hybridCache) + + // Migrate database + err = testStore.Migrate(ctx) + require.NoError(t, err) + + return testStore +} + +func testSessionSharingAcrossPods(t *testing.T, ctx context.Context, store1, store2 *store.Store) { + // Create a user in store1 + user, err := createTestingHostUser(ctx, store1) + require.NoError(t, err) + + // Add session to user in store1 + sessionID := "test-session-12345" + now := timestamppb.Now() + session := &storepb.SessionsUserSetting_Session{ + SessionId: sessionID, + CreateTime: now, + LastAccessedTime: now, + ClientInfo: &storepb.SessionsUserSetting_ClientInfo{}, + } + + err = store1.AddUserSession(ctx, user.ID, session) + require.NoError(t, err) + + // Give time for cache synchronization + time.Sleep(200 * time.Millisecond) + + // Verify session is available in store2 + sessions, err := store2.GetUserSessions(ctx, user.ID) + require.NoError(t, err) + require.Len(t, sessions, 1) + require.Equal(t, sessionID, sessions[0].SessionId) +} + +func testSessionInvalidationAcrossPods(t *testing.T, ctx context.Context, store1, store2 *store.Store) { + // Create a user and add session + user, err := createTestingHostUser(ctx, store1) + require.NoError(t, err) + + sessionID1 := "test-session-invalidate-1" + sessionID2 := "test-session-invalidate-2" + + session1 := &storepb.SessionsUserSetting_Session{ + SessionId: sessionID1, + CreateTime: timestamppb.Now(), + LastAccessedTime: timestamppb.Now(), + ClientInfo: &storepb.SessionsUserSetting_ClientInfo{}, + } + session2 := &storepb.SessionsUserSetting_Session{ + SessionId: sessionID2, + CreateTime: timestamppb.Now(), + LastAccessedTime: timestamppb.Now(), + ClientInfo: &storepb.SessionsUserSetting_ClientInfo{}, + } + + err = store1.AddUserSession(ctx, user.ID, session1) + require.NoError(t, err) + + err = store1.AddUserSession(ctx, user.ID, session2) + require.NoError(t, err) + + // Give time for synchronization + time.Sleep(200 * time.Millisecond) + + // Verify both sessions exist in store2 + sessions, err := store2.GetUserSessions(ctx, user.ID) + require.NoError(t, err) + require.Len(t, sessions, 2) + + // Remove one session from store1 + err = store1.RemoveUserSession(ctx, user.ID, sessionID1) + require.NoError(t, err) + + // Give time for cache invalidation + time.Sleep(200 * time.Millisecond) + + // Verify session is removed from store2 as well + sessions, err = store2.GetUserSessions(ctx, user.ID) + require.NoError(t, err) + require.Len(t, sessions, 1) + require.Equal(t, sessionID2, sessions[0].SessionId) +} + +func testUserSettingsSynchronization(t *testing.T, ctx context.Context, store1, store2 *store.Store) { + // Create a user + user, err := createTestingHostUser(ctx, store1) + require.NoError(t, err) + + // Create user setting in store1 + generalSetting := &storepb.UserSetting{ + UserId: user.ID, + Key: storepb.UserSetting_GENERAL, + Value: &storepb.UserSetting_General{ + General: &storepb.GeneralUserSetting{ + Locale: "en-US", + Theme: "dark", + }, + }, + } + + _, err = store1.UpsertUserSetting(ctx, generalSetting) + require.NoError(t, err) + + // Give time for cache synchronization + time.Sleep(200 * time.Millisecond) + + // Verify setting is available in store2 + settings, err := store2.ListUserSettings(ctx, &store.FindUserSetting{ + UserID: &user.ID, + Key: storepb.UserSetting_GENERAL, + }) + require.NoError(t, err) + require.Len(t, settings, 1) + require.Equal(t, "en-US", settings[0].GetGeneral().Locale) + require.Equal(t, "dark", settings[0].GetGeneral().Theme) + + // Update setting in store2 + generalSetting.Value.(*storepb.UserSetting_General).General.Theme = "light" + _, err = store2.UpsertUserSetting(ctx, generalSetting) + require.NoError(t, err) + + // Give time for synchronization + time.Sleep(200 * time.Millisecond) + + // Verify update is reflected in store1 + settings, err = store1.ListUserSettings(ctx, &store.FindUserSetting{ + UserID: &user.ID, + Key: storepb.UserSetting_GENERAL, + }) + require.NoError(t, err) + require.Len(t, settings, 1) + require.Equal(t, "light", settings[0].GetGeneral().Theme) +} + +func createTestingHostUserWithName(ctx context.Context, ts *store.Store, username string) (*store.User, error) { + userCreate := &store.User{ + Username: username, + Role: store.RoleHost, + Email: fmt.Sprintf("%s@test.com", username), + Nickname: fmt.Sprintf("%s_nickname", username), + Description: fmt.Sprintf("%s_description", username), + } + passwordHash, err := bcrypt.GenerateFromPassword([]byte("test_password"), bcrypt.DefaultCost) + if err != nil { + return nil, err + } + userCreate.PasswordHash = string(passwordHash) + user, err := ts.CreateUser(ctx, userCreate) + return user, err +} diff --git a/test_cache_comprehensive.sh b/test_cache_comprehensive.sh new file mode 100644 index 000000000..22d5bbe49 --- /dev/null +++ b/test_cache_comprehensive.sh @@ -0,0 +1,131 @@ +#!/bin/bash + +# Comprehensive cache testing script for Memos distributed caching +set -e + +echo "🧪 Running Comprehensive Cache Tests for Memos" +echo "==============================================" + +# Colors for output +RED='\033[0;31m' +GREEN='\033[0;32m' +YELLOW='\033[1;33m' +BLUE='\033[0;34m' +NC='\033[0m' + +print_status() { + local color=$1 + local message=$2 + echo -e "${color}${message}${NC}" +} + +# Check if Redis is available +REDIS_AVAILABLE=false +if command -v redis-server &> /dev/null && command -v redis-cli &> /dev/null; then + if redis-cli ping &> /dev/null; then + REDIS_AVAILABLE=true + print_status $GREEN "✅ Redis is available and running" + else + print_status $YELLOW "⚠️ Redis CLI available but server not responding" + fi +else + print_status $YELLOW "⚠️ Redis not available - will skip Redis-dependent tests" +fi + +# Set Redis URL for tests if available +if [ "$REDIS_AVAILABLE" = true ]; then + export REDIS_URL="redis://localhost:6379" + print_status $BLUE "📡 Using Redis URL: $REDIS_URL" +else + export REDIS_URL="" + print_status $YELLOW "📡 Redis URL not set - distributed cache tests will be skipped" +fi + +# Test categories - focused on our business logic +TESTS=( + "distributed_session_test.go" +) + +# Run tests with different configurations +print_status $BLUE "🏃 Running cache tests..." + +# Create test results directory +mkdir -p test-results + +# Run core business logic tests first (these work without Redis too - will just skip) +print_status $YELLOW "🔄 Running distributed session store tests (core K8s scaling feature)..." +go test -v -timeout 60s ./store/test/ -run TestDistributedSessionStore > test-results/distributed-session.log 2>&1 +if [ $? -eq 0 ]; then + print_status $GREEN "✅ Distributed session store tests passed" +else + # Check if it was just skipped due to no Redis + if grep -q "SKIP.*REDIS_URL not set" test-results/distributed-session.log; then + print_status $YELLOW "⏭️ Distributed session tests skipped (no Redis) - this is expected" + else + print_status $RED "❌ Distributed session store tests failed" + cat test-results/distributed-session.log + exit 1 + fi +fi + +# Run the REAL test with Redis if available +if [ "$REDIS_AVAILABLE" = true ]; then + print_status $YELLOW "🔄 Running distributed session store tests with Redis (the real test!)..." + REDIS_URL="$REDIS_URL" go test -v -timeout 120s ./store/test/ -run TestDistributedSessionStore > test-results/distributed-session-redis.log 2>&1 + if [ $? -eq 0 ]; then + print_status $GREEN "✅ 🎯 CORE FEATURE WORKING: Multi-pod session sharing tested successfully!" + print_status $BLUE "📊 This proves the SSO redirect issue is fixed!" + else + print_status $RED "❌ Distributed session store tests failed with Redis" + cat test-results/distributed-session-redis.log + exit 1 + fi +else + print_status $YELLOW "⏭️ Skipping Redis-dependent tests (Redis not available)" + print_status $BLUE "💡 To test the core K8s scaling feature, start Redis and run:" + print_status $BLUE " redis-server &" + print_status $BLUE " REDIS_URL=redis://localhost:6379 ./test_cache_comprehensive.sh" +fi + +# Generate summary report +print_status $BLUE "📋 Generating test summary..." + +echo "" > test-results/summary.txt +echo "Memos Distributed Cache Test Summary" >> test-results/summary.txt +echo "====================================" >> test-results/summary.txt +echo "Test Date: $(date)" >> test-results/summary.txt +echo "Redis Available: $REDIS_AVAILABLE" >> test-results/summary.txt +echo "" >> test-results/summary.txt + +echo "Test Results:" >> test-results/summary.txt +echo "-------------" >> test-results/summary.txt + +for log_file in test-results/*.log; do + if [ -f "$log_file" ]; then + test_name=$(basename "$log_file" .log) + if grep -q "PASS" "$log_file"; then + echo "✅ $test_name: PASSED" >> test-results/summary.txt + elif grep -q "FAIL" "$log_file"; then + echo "❌ $test_name: FAILED" >> test-results/summary.txt + else + echo "⚠️ $test_name: UNKNOWN" >> test-results/summary.txt + fi + fi +done + +echo "" >> test-results/summary.txt +echo "Detailed logs available in test-results/ directory" >> test-results/summary.txt + +# Display summary +cat test-results/summary.txt + +print_status $GREEN "🎉 Cache testing completed!" +print_status $BLUE "📁 Test logs saved in test-results/ directory" + +if [ "$REDIS_AVAILABLE" = true ]; then + print_status $GREEN "✅ All distributed cache features have been tested" + print_status $BLUE "🚀 Your Memos deployment is ready for multi-pod scaling!" +else + print_status $YELLOW "⚠️ Redis-dependent tests were skipped" + print_status $BLUE "💡 To test distributed caching, start Redis and run this script again" +fi