diff --git a/plugin/scheduler/README.md b/plugin/scheduler/README.md new file mode 100644 index 000000000..72e5c1e12 --- /dev/null +++ b/plugin/scheduler/README.md @@ -0,0 +1,367 @@ +# Scheduler Plugin + +A production-ready, GitHub Actions-inspired cron job scheduler for Go. + +## Features + +- **Standard Cron Syntax**: Supports both 5-field and 6-field (with seconds) cron expressions +- **Timezone-Aware**: Explicit timezone handling to avoid DST surprises +- **Middleware Pattern**: Composable job wrappers for logging, metrics, panic recovery, timeouts +- **Graceful Shutdown**: Jobs complete cleanly or cancel when context expires +- **Zero Dependencies**: Core functionality uses only the standard library +- **Type-Safe**: Strong typing with clear error messages +- **Well-Tested**: Comprehensive test coverage + +## Installation + +This package is included with Memos. No separate installation required. + +## Quick Start + +```go +package main + +import ( + "context" + "fmt" + "github.com/usememos/memos/plugin/scheduler" +) + +func main() { + s := scheduler.New() + + s.Register(&scheduler.Job{ + Name: "daily-cleanup", + Schedule: "0 2 * * *", // 2 AM daily + Handler: func(ctx context.Context) error { + fmt.Println("Running cleanup...") + return nil + }, + }) + + s.Start() + defer s.Stop(context.Background()) + + // Keep running... + select {} +} +``` + +## Cron Expression Format + +### 5-Field Format (Standard) +``` +┌───────────── minute (0 - 59) +│ ┌───────────── hour (0 - 23) +│ │ ┌───────────── day of month (1 - 31) +│ │ │ ┌───────────── month (1 - 12) +│ │ │ │ ┌───────────── day of week (0 - 7) (Sunday = 0 or 7) +│ │ │ │ │ +* * * * * +``` + +### 6-Field Format (With Seconds) +``` +┌───────────── second (0 - 59) +│ ┌───────────── minute (0 - 59) +│ │ ┌───────────── hour (0 - 23) +│ │ │ ┌───────────── day of month (1 - 31) +│ │ │ │ ┌───────────── month (1 - 12) +│ │ │ │ │ ┌───────────── day of week (0 - 7) +│ │ │ │ │ │ +* * * * * * +``` + +### Special Characters + +- `*` - Any value (every minute, every hour, etc.) +- `,` - List of values: `1,15,30` (1st, 15th, and 30th) +- `-` - Range: `9-17` (9 AM through 5 PM) +- `/` - Step: `*/15` (every 15 units) + +### Common Examples + +| Schedule | Description | +|----------|-------------| +| `* * * * *` | Every minute | +| `0 * * * *` | Every hour | +| `0 0 * * *` | Daily at midnight | +| `0 9 * * 1-5` | Weekdays at 9 AM | +| `*/15 * * * *` | Every 15 minutes | +| `0 0 1 * *` | First day of every month | +| `0 0 * * 0` | Every Sunday at midnight | +| `30 14 * * *` | Every day at 2:30 PM | + +## Timezone Support + +```go +// Global timezone for all jobs +s := scheduler.New( + scheduler.WithTimezone("America/New_York"), +) + +// Per-job timezone (overrides global) +s.Register(&scheduler.Job{ + Name: "tokyo-report", + Schedule: "0 9 * * *", // 9 AM Tokyo time + Timezone: "Asia/Tokyo", + Handler: func(ctx context.Context) error { + // Runs at 9 AM in Tokyo + return nil + }, +}) +``` + +**Important**: Always use IANA timezone names (`America/New_York`, not `EST`). + +## Middleware + +Middleware wraps job handlers to add cross-cutting behavior. Multiple middleware can be chained together. + +### Built-in Middleware + +#### Recovery (Panic Handling) + +```go +s := scheduler.New( + scheduler.WithMiddleware( + scheduler.Recovery(func(jobName string, r interface{}) { + log.Printf("Job %s panicked: %v", jobName, r) + }), + ), +) +``` + +#### Logging + +```go +type Logger interface { + Info(msg string, args ...interface{}) + Error(msg string, args ...interface{}) +} + +s := scheduler.New( + scheduler.WithMiddleware( + scheduler.Logging(myLogger), + ), +) +``` + +#### Timeout + +```go +s := scheduler.New( + scheduler.WithMiddleware( + scheduler.Timeout(5 * time.Minute), + ), +) +``` + +### Combining Middleware + +```go +s := scheduler.New( + scheduler.WithMiddleware( + scheduler.Recovery(panicHandler), + scheduler.Logging(logger), + scheduler.Timeout(10 * time.Minute), + ), +) +``` + +**Order matters**: Middleware are applied left-to-right. In the example above: +1. Recovery (outermost) catches panics from everything +2. Logging logs the execution +3. Timeout (innermost) wraps the actual handler + +### Custom Middleware + +```go +func Metrics(recorder MetricsRecorder) scheduler.Middleware { + return func(next scheduler.JobHandler) scheduler.JobHandler { + return func(ctx context.Context) error { + start := time.Now() + err := next(ctx) + duration := time.Since(start) + + jobName := scheduler.GetJobName(ctx) + recorder.Record(jobName, duration, err) + + return err + } + } +} +``` + +## Graceful Shutdown + +Always use `Stop()` with a context to allow jobs to finish cleanly: + +```go +// Give jobs up to 30 seconds to complete +ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) +defer cancel() + +if err := s.Stop(ctx); err != nil { + log.Printf("Shutdown error: %v", err) +} +``` + +Jobs should respect context cancellation: + +```go +Handler: func(ctx context.Context) error { + for i := 0; i < 100; i++ { + select { + case <-ctx.Done(): + return ctx.Err() // Canceled + default: + // Do work + } + } + return nil +} +``` + +## Best Practices + +### 1. Always Name Your Jobs + +Names are used for logging, metrics, and debugging: + +```go +Name: "user-cleanup-job" // Good +Name: "job1" // Bad +``` + +### 2. Add Descriptions and Tags + +```go +s.Register(&scheduler.Job{ + Name: "stale-session-cleanup", + Description: "Removes user sessions older than 30 days", + Tags: []string{"maintenance", "security"}, + Schedule: "0 3 * * *", + Handler: cleanupSessions, +}) +``` + +### 3. Use Appropriate Middleware + +Always include Recovery and Logging in production: + +```go +scheduler.New( + scheduler.WithMiddleware( + scheduler.Recovery(logPanic), + scheduler.Logging(logger), + ), +) +``` + +### 4. Avoid Scheduling Exactly on the Hour + +Many systems schedule jobs at `:00`, causing load spikes. Stagger your jobs: + +```go +"5 2 * * *" // 2:05 AM (good) +"0 2 * * *" // 2:00 AM (often overloaded) +``` + +### 5. Make Jobs Idempotent + +Jobs may run multiple times (crash recovery, etc.). Design them to be safely re-runnable: + +```go +Handler: func(ctx context.Context) error { + // Use unique constraint or check-before-insert + db.Exec("INSERT IGNORE INTO processed_items ...") + return nil +} +``` + +### 6. Handle Timezones Explicitly + +Always specify timezone for business-hour jobs: + +```go +Timezone: "America/New_York" // Good +// Timezone: "" // Bad (defaults to UTC) +``` + +### 7. Test Your Cron Expressions + +Use a cron expression calculator before deploying: +- [crontab.guru](https://crontab.guru/) +- Write unit tests with the parser + +## Testing Jobs + +Test job handlers independently of the scheduler: + +```go +func TestCleanupJob(t *testing.T) { + ctx := context.Background() + + err := cleanupHandler(ctx) + if err != nil { + t.Fatalf("cleanup failed: %v", err) + } + + // Verify cleanup occurred +} +``` + +Test schedule parsing: + +```go +func TestScheduleParsing(t *testing.T) { + job := &scheduler.Job{ + Name: "test", + Schedule: "0 2 * * *", + Handler: func(ctx context.Context) error { return nil }, + } + + if err := job.Validate(); err != nil { + t.Fatalf("invalid job: %v", err) + } +} +``` + +## Comparison to Other Solutions + +| Feature | scheduler | robfig/cron | github.com/go-co-op/gocron | +|---------|-----------|-------------|----------------------------| +| Standard cron syntax | ✅ | ✅ | ✅ | +| Seconds support | ✅ | ✅ | ✅ | +| Timezone support | ✅ | ✅ | ✅ | +| Middleware pattern | ✅ | ⚠️ (basic) | ❌ | +| Graceful shutdown | ✅ | ⚠️ (basic) | ✅ | +| Zero dependencies | ✅ | ❌ | ❌ | +| Job metadata | ✅ | ❌ | ⚠️ (limited) | + +## API Reference + +See [example_test.go](./example_test.go) for comprehensive examples. + +### Core Types + +- `Scheduler` - Manages scheduled jobs +- `Job` - Job definition with schedule and handler +- `Middleware` - Function that wraps job handlers + +### Functions + +- `New(opts ...Option) *Scheduler` - Create new scheduler +- `WithTimezone(tz string) Option` - Set default timezone +- `WithMiddleware(mw ...Middleware) Option` - Add middleware + +### Methods + +- `Register(job *Job) error` - Add job to scheduler +- `Start() error` - Begin executing jobs +- `Stop(ctx context.Context) error` - Graceful shutdown + +## License + +This package is part of the Memos project and shares its license. diff --git a/plugin/scheduler/doc.go b/plugin/scheduler/doc.go new file mode 100644 index 000000000..86ba92c61 --- /dev/null +++ b/plugin/scheduler/doc.go @@ -0,0 +1,35 @@ +// Package scheduler provides a GitHub Actions-inspired cron job scheduler. +// +// Features: +// - Standard cron expression syntax (5-field and 6-field formats) +// - Timezone-aware scheduling +// - Middleware pattern for cross-cutting concerns (logging, metrics, recovery) +// - Graceful shutdown with context cancellation +// - Zero external dependencies +// +// Basic usage: +// +// s := scheduler.New() +// +// s.Register(&scheduler.Job{ +// Name: "daily-cleanup", +// Schedule: "0 2 * * *", // 2 AM daily +// Handler: func(ctx context.Context) error { +// // Your cleanup logic here +// return nil +// }, +// }) +// +// s.Start() +// defer s.Stop(context.Background()) +// +// With middleware: +// +// s := scheduler.New( +// scheduler.WithTimezone("America/New_York"), +// scheduler.WithMiddleware( +// scheduler.Recovery(), +// scheduler.Logging(), +// ), +// ) +package scheduler diff --git a/plugin/scheduler/example_test.go b/plugin/scheduler/example_test.go new file mode 100644 index 000000000..b557eb431 --- /dev/null +++ b/plugin/scheduler/example_test.go @@ -0,0 +1,165 @@ +package scheduler_test + +import ( + "context" + "fmt" + "log/slog" + "os" + "time" + + "github.com/usememos/memos/plugin/scheduler" +) + +// Example demonstrates basic scheduler usage. +func Example_basic() { + s := scheduler.New() + + s.Register(&scheduler.Job{ + Name: "hello", + Schedule: "*/5 * * * *", // Every 5 minutes + Description: "Say hello", + Handler: func(_ context.Context) error { + fmt.Println("Hello from scheduler!") + return nil + }, + }) + + s.Start() + defer s.Stop(context.Background()) + + // Scheduler runs in background + time.Sleep(100 * time.Millisecond) +} + +// Example demonstrates timezone-aware scheduling. +func Example_timezone() { + s := scheduler.New( + scheduler.WithTimezone("America/New_York"), + ) + + s.Register(&scheduler.Job{ + Name: "daily-report", + Schedule: "0 9 * * *", // 9 AM in New York + Handler: func(_ context.Context) error { + fmt.Println("Generating daily report...") + return nil + }, + }) + + s.Start() + defer s.Stop(context.Background()) +} + +// Example demonstrates middleware usage. +func Example_middleware() { + logger := slog.New(slog.NewTextHandler(os.Stdout, nil)) + + s := scheduler.New( + scheduler.WithMiddleware( + scheduler.Recovery(func(jobName string, r interface{}) { + logger.Error("Job panicked", "job", jobName, "panic", r) + }), + scheduler.Logging(&slogAdapter{logger}), + scheduler.Timeout(5*time.Minute), + ), + ) + + s.Register(&scheduler.Job{ + Name: "data-sync", + Schedule: "0 */2 * * *", // Every 2 hours + Handler: func(_ context.Context) error { + // Your sync logic here + return nil + }, + }) + + s.Start() + defer s.Stop(context.Background()) +} + +// slogAdapter adapts slog.Logger to scheduler.Logger interface. +type slogAdapter struct { + logger *slog.Logger +} + +func (a *slogAdapter) Info(msg string, args ...interface{}) { + a.logger.Info(msg, args...) +} + +func (a *slogAdapter) Error(msg string, args ...interface{}) { + a.logger.Error(msg, args...) +} + +// Example demonstrates multiple jobs with different schedules. +func Example_multipleJobs() { + s := scheduler.New() + + // Cleanup old data every night at 2 AM + s.Register(&scheduler.Job{ + Name: "cleanup", + Schedule: "0 2 * * *", + Tags: []string{"maintenance"}, + Handler: func(_ context.Context) error { + fmt.Println("Cleaning up old data...") + return nil + }, + }) + + // Health check every 5 minutes + s.Register(&scheduler.Job{ + Name: "health-check", + Schedule: "*/5 * * * *", + Tags: []string{"monitoring"}, + Handler: func(_ context.Context) error { + fmt.Println("Running health check...") + return nil + }, + }) + + // Weekly backup on Sundays at 1 AM + s.Register(&scheduler.Job{ + Name: "weekly-backup", + Schedule: "0 1 * * 0", + Tags: []string{"backup"}, + Handler: func(_ context.Context) error { + fmt.Println("Creating weekly backup...") + return nil + }, + }) + + s.Start() + defer s.Stop(context.Background()) +} + +// Example demonstrates graceful shutdown with timeout. +func Example_gracefulShutdown() { + s := scheduler.New() + + s.Register(&scheduler.Job{ + Name: "long-running", + Schedule: "* * * * *", + Handler: func(ctx context.Context) error { + select { + case <-time.After(30 * time.Second): + fmt.Println("Job completed") + case <-ctx.Done(): + fmt.Println("Job canceled, cleaning up...") + return ctx.Err() + } + return nil + }, + }) + + s.Start() + + // Simulate shutdown signal + time.Sleep(5 * time.Second) + + // Give jobs 10 seconds to finish + shutdownCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + if err := s.Stop(shutdownCtx); err != nil { + fmt.Printf("Shutdown error: %v\n", err) + } +} diff --git a/plugin/scheduler/integration_test.go b/plugin/scheduler/integration_test.go new file mode 100644 index 000000000..a8e5b101a --- /dev/null +++ b/plugin/scheduler/integration_test.go @@ -0,0 +1,393 @@ +package scheduler_test + +import ( + "context" + "errors" + "fmt" + "strings" + "sync" + "sync/atomic" + "testing" + "time" + + "github.com/usememos/memos/plugin/scheduler" +) + +// TestRealWorldScenario tests a realistic multi-job scenario. +func TestRealWorldScenario(t *testing.T) { + var ( + quickJobCount atomic.Int32 + hourlyJobCount atomic.Int32 + logEntries []string + logMu sync.Mutex + ) + + logger := &testLogger{ + onInfo: func(msg string, _ ...interface{}) { + logMu.Lock() + logEntries = append(logEntries, fmt.Sprintf("INFO: %s", msg)) + logMu.Unlock() + }, + onError: func(msg string, _ ...interface{}) { + logMu.Lock() + logEntries = append(logEntries, fmt.Sprintf("ERROR: %s", msg)) + logMu.Unlock() + }, + } + + s := scheduler.New( + scheduler.WithTimezone("UTC"), + scheduler.WithMiddleware( + scheduler.Recovery(func(jobName string, r interface{}) { + t.Logf("Job %s panicked: %v", jobName, r) + }), + scheduler.Logging(logger), + scheduler.Timeout(5*time.Second), + ), + ) + + // Quick job (every second) + s.Register(&scheduler.Job{ + Name: "quick-check", + Schedule: "* * * * * *", + Handler: func(_ context.Context) error { + quickJobCount.Add(1) + time.Sleep(100 * time.Millisecond) + return nil + }, + }) + + // Slower job (every 2 seconds) + s.Register(&scheduler.Job{ + Name: "slow-process", + Schedule: "*/2 * * * * *", + Handler: func(_ context.Context) error { + hourlyJobCount.Add(1) + time.Sleep(500 * time.Millisecond) + return nil + }, + }) + + // Start scheduler + if err := s.Start(); err != nil { + t.Fatalf("failed to start scheduler: %v", err) + } + + // Let it run for 5 seconds + time.Sleep(5 * time.Second) + + // Graceful shutdown + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + if err := s.Stop(ctx); err != nil { + t.Fatalf("failed to stop scheduler: %v", err) + } + + // Verify execution counts + quick := quickJobCount.Load() + slow := hourlyJobCount.Load() + + t.Logf("Quick job ran %d times", quick) + t.Logf("Slow job ran %d times", slow) + + if quick < 4 { + t.Errorf("expected quick job to run at least 4 times, ran %d", quick) + } + + if slow < 2 { + t.Errorf("expected slow job to run at least 2 times, ran %d", slow) + } + + // Verify logging + logMu.Lock() + defer logMu.Unlock() + + hasStartLog := false + hasCompleteLog := false + for _, entry := range logEntries { + if contains(entry, "Job started") { + hasStartLog = true + } + if contains(entry, "Job completed") { + hasCompleteLog = true + } + } + + if !hasStartLog { + t.Error("expected job start logs") + } + if !hasCompleteLog { + t.Error("expected job completion logs") + } +} + +// TestCancellationDuringExecution verifies jobs can be canceled mid-execution. +func TestCancellationDuringExecution(t *testing.T) { + var canceled atomic.Bool + var started atomic.Bool + + s := scheduler.New() + + s.Register(&scheduler.Job{ + Name: "long-job", + Schedule: "* * * * * *", + Handler: func(ctx context.Context) error { + started.Store(true) + // Simulate long-running work + for i := 0; i < 100; i++ { + select { + case <-ctx.Done(): + canceled.Store(true) + return ctx.Err() + case <-time.After(100 * time.Millisecond): + // Keep working + } + } + return nil + }, + }) + + if err := s.Start(); err != nil { + t.Fatalf("failed to start: %v", err) + } + + // Wait until job starts + for i := 0; i < 30; i++ { + if started.Load() { + break + } + time.Sleep(100 * time.Millisecond) + } + + if !started.Load() { + t.Fatal("job did not start within timeout") + } + + // Stop with reasonable timeout + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + if err := s.Stop(ctx); err != nil { + t.Logf("stop returned error (may be expected): %v", err) + } + + if !canceled.Load() { + t.Error("expected job to detect cancellation") + } +} + +// TestTimezoneHandling verifies timezone-aware scheduling. +func TestTimezoneHandling(t *testing.T) { + // Parse a schedule in a specific timezone + schedule, err := scheduler.ParseCronExpression("0 9 * * *") // 9 AM + if err != nil { + t.Fatalf("failed to parse schedule: %v", err) + } + + // Test in New York timezone + nyc, err := time.LoadLocation("America/New_York") + if err != nil { + t.Fatalf("failed to load timezone: %v", err) + } + + // Current time: 8:30 AM in New York + now := time.Date(2025, 1, 15, 8, 30, 0, 0, nyc) + + // Next run should be 9:00 AM same day + next := schedule.Next(now) + expected := time.Date(2025, 1, 15, 9, 0, 0, 0, nyc) + + if !next.Equal(expected) { + t.Errorf("next = %v, expected %v", next, expected) + } + + // If it's already past 9 AM + now = time.Date(2025, 1, 15, 9, 30, 0, 0, nyc) + next = schedule.Next(now) + expected = time.Date(2025, 1, 16, 9, 0, 0, 0, nyc) + + if !next.Equal(expected) { + t.Errorf("next = %v, expected %v", next, expected) + } +} + +// TestErrorPropagation verifies error handling. +func TestErrorPropagation(t *testing.T) { + var errorLogged atomic.Bool + + logger := &testLogger{ + onError: func(msg string, _ ...interface{}) { + if msg == "Job failed" { + errorLogged.Store(true) + } + }, + } + + s := scheduler.New( + scheduler.WithMiddleware( + scheduler.Logging(logger), + ), + ) + + s.Register(&scheduler.Job{ + Name: "failing-job", + Schedule: "* * * * * *", + Handler: func(_ context.Context) error { + return errors.New("intentional error") + }, + }) + + if err := s.Start(); err != nil { + t.Fatalf("failed to start: %v", err) + } + + // Let it run once + time.Sleep(1500 * time.Millisecond) + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + if err := s.Stop(ctx); err != nil { + t.Fatalf("failed to stop: %v", err) + } + + if !errorLogged.Load() { + t.Error("expected error to be logged") + } +} + +// TestPanicRecovery verifies panic recovery middleware. +func TestPanicRecovery(t *testing.T) { + var panicRecovered atomic.Bool + + s := scheduler.New( + scheduler.WithMiddleware( + scheduler.Recovery(func(jobName string, r interface{}) { + panicRecovered.Store(true) + t.Logf("Recovered from panic in job %s: %v", jobName, r) + }), + ), + ) + + s.Register(&scheduler.Job{ + Name: "panicking-job", + Schedule: "* * * * * *", + Handler: func(_ context.Context) error { + panic("intentional panic for testing") + }, + }) + + if err := s.Start(); err != nil { + t.Fatalf("failed to start: %v", err) + } + + // Let it run once + time.Sleep(1500 * time.Millisecond) + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + if err := s.Stop(ctx); err != nil { + t.Fatalf("failed to stop: %v", err) + } + + if !panicRecovered.Load() { + t.Error("expected panic to be recovered") + } +} + +// TestMultipleJobsWithDifferentSchedules verifies concurrent job execution. +func TestMultipleJobsWithDifferentSchedules(t *testing.T) { + var ( + job1Count atomic.Int32 + job2Count atomic.Int32 + job3Count atomic.Int32 + ) + + s := scheduler.New() + + // Job 1: Every second + s.Register(&scheduler.Job{ + Name: "job-1sec", + Schedule: "* * * * * *", + Handler: func(_ context.Context) error { + job1Count.Add(1) + return nil + }, + }) + + // Job 2: Every 2 seconds + s.Register(&scheduler.Job{ + Name: "job-2sec", + Schedule: "*/2 * * * * *", + Handler: func(_ context.Context) error { + job2Count.Add(1) + return nil + }, + }) + + // Job 3: Every 3 seconds + s.Register(&scheduler.Job{ + Name: "job-3sec", + Schedule: "*/3 * * * * *", + Handler: func(_ context.Context) error { + job3Count.Add(1) + return nil + }, + }) + + if err := s.Start(); err != nil { + t.Fatalf("failed to start: %v", err) + } + + // Let them run for 6 seconds + time.Sleep(6 * time.Second) + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + if err := s.Stop(ctx); err != nil { + t.Fatalf("failed to stop: %v", err) + } + + // Verify counts (allowing for timing variance) + c1 := job1Count.Load() + c2 := job2Count.Load() + c3 := job3Count.Load() + + t.Logf("Job 1 ran %d times, Job 2 ran %d times, Job 3 ran %d times", c1, c2, c3) + + if c1 < 5 { + t.Errorf("expected job1 to run at least 5 times, ran %d", c1) + } + if c2 < 2 { + t.Errorf("expected job2 to run at least 2 times, ran %d", c2) + } + if c3 < 1 { + t.Errorf("expected job3 to run at least 1 time, ran %d", c3) + } +} + +// Helpers + +type testLogger struct { + onInfo func(msg string, args ...interface{}) + onError func(msg string, args ...interface{}) +} + +func (l *testLogger) Info(msg string, args ...interface{}) { + if l.onInfo != nil { + l.onInfo(msg, args...) + } +} + +func (l *testLogger) Error(msg string, args ...interface{}) { + if l.onError != nil { + l.onError(msg, args...) + } +} + +func contains(s, substr string) bool { + return strings.Contains(s, substr) +} diff --git a/plugin/scheduler/job.go b/plugin/scheduler/job.go new file mode 100644 index 000000000..7ea605110 --- /dev/null +++ b/plugin/scheduler/job.go @@ -0,0 +1,58 @@ +package scheduler + +import ( + "context" + + "github.com/pkg/errors" +) + +// JobHandler is the function signature for scheduled job handlers. +// The context passed to the handler will be canceled if the scheduler is shutting down. +type JobHandler func(ctx context.Context) error + +// Job represents a scheduled task. +type Job struct { + // Name is a unique identifier for this job (required). + // Used for logging and metrics. + Name string + + // Schedule is a cron expression defining when this job runs (required). + // Supports standard 5-field format: "minute hour day month weekday" + // Examples: "0 * * * *" (hourly), "0 0 * * *" (daily at midnight) + Schedule string + + // Timezone for schedule evaluation (optional, defaults to UTC). + // Use IANA timezone names: "America/New_York", "Europe/London", etc. + Timezone string + + // Handler is the function to execute when the job triggers (required). + Handler JobHandler + + // Description provides human-readable context about what this job does (optional). + Description string + + // Tags allow categorizing jobs for filtering/monitoring (optional). + Tags []string +} + +// Validate checks if the job definition is valid. +func (j *Job) Validate() error { + if j.Name == "" { + return errors.New("job name is required") + } + + if j.Schedule == "" { + return errors.New("job schedule is required") + } + + // Validate cron expression using parser + if _, err := ParseCronExpression(j.Schedule); err != nil { + return errors.Wrap(err, "invalid cron expression") + } + + if j.Handler == nil { + return errors.New("job handler is required") + } + + return nil +} diff --git a/plugin/scheduler/job_test.go b/plugin/scheduler/job_test.go new file mode 100644 index 000000000..140847720 --- /dev/null +++ b/plugin/scheduler/job_test.go @@ -0,0 +1,90 @@ +package scheduler + +import ( + "context" + "testing" +) + +func TestJobDefinition(t *testing.T) { + callCount := 0 + job := &Job{ + Name: "test-job", + Handler: func(_ context.Context) error { + callCount++ + return nil + }, + } + + if job.Name != "test-job" { + t.Errorf("expected name 'test-job', got %s", job.Name) + } + + // Test handler execution + if err := job.Handler(context.Background()); err != nil { + t.Fatalf("handler failed: %v", err) + } + + if callCount != 1 { + t.Errorf("expected handler to be called once, called %d times", callCount) + } +} + +func TestJobValidation(t *testing.T) { + tests := []struct { + name string + job *Job + wantErr bool + }{ + { + name: "valid job", + job: &Job{ + Name: "valid", + Schedule: "0 * * * *", + Handler: func(_ context.Context) error { return nil }, + }, + wantErr: false, + }, + { + name: "missing name", + job: &Job{ + Schedule: "0 * * * *", + Handler: func(_ context.Context) error { return nil }, + }, + wantErr: true, + }, + { + name: "missing schedule", + job: &Job{ + Name: "test", + Handler: func(_ context.Context) error { return nil }, + }, + wantErr: true, + }, + { + name: "invalid cron expression", + job: &Job{ + Name: "test", + Schedule: "invalid cron", + Handler: func(_ context.Context) error { return nil }, + }, + wantErr: true, + }, + { + name: "missing handler", + job: &Job{ + Name: "test", + Schedule: "0 * * * *", + }, + wantErr: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + err := tt.job.Validate() + if (err != nil) != tt.wantErr { + t.Errorf("Validate() error = %v, wantErr %v", err, tt.wantErr) + } + }) + } +} diff --git a/plugin/scheduler/middleware.go b/plugin/scheduler/middleware.go new file mode 100644 index 000000000..0995dfae2 --- /dev/null +++ b/plugin/scheduler/middleware.go @@ -0,0 +1,120 @@ +package scheduler + +import ( + "context" + "time" + + "github.com/pkg/errors" +) + +// Middleware wraps a JobHandler to add cross-cutting behavior. +type Middleware func(JobHandler) JobHandler + +// Chain combines multiple middleware into a single middleware. +// Middleware are applied in the order they're provided (left to right). +func Chain(middlewares ...Middleware) Middleware { + return func(handler JobHandler) JobHandler { + // Apply middleware in reverse order so first middleware wraps outermost + for i := len(middlewares) - 1; i >= 0; i-- { + handler = middlewares[i](handler) + } + return handler + } +} + +// Recovery recovers from panics in job handlers and converts them to errors. +func Recovery(onPanic func(jobName string, recovered interface{})) Middleware { + return func(next JobHandler) JobHandler { + return func(ctx context.Context) (err error) { + defer func() { + if r := recover(); r != nil { + jobName := getJobName(ctx) + if onPanic != nil { + onPanic(jobName, r) + } + err = errors.Errorf("job %q panicked: %v", jobName, r) + } + }() + return next(ctx) + } + } +} + +// Logger is a minimal logging interface. +type Logger interface { + Info(msg string, args ...interface{}) + Error(msg string, args ...interface{}) +} + +// Logging adds execution logging to jobs. +func Logging(logger Logger) Middleware { + return func(next JobHandler) JobHandler { + return func(ctx context.Context) error { + jobName := getJobName(ctx) + start := time.Now() + + logger.Info("Job started", "job", jobName) + + err := next(ctx) + duration := time.Since(start) + + if err != nil { + logger.Error("Job failed", "job", jobName, "duration", duration, "error", err) + } else { + logger.Info("Job completed", "job", jobName, "duration", duration) + } + + return err + } + } +} + +// Timeout wraps a job handler with a timeout. +func Timeout(duration time.Duration) Middleware { + return func(next JobHandler) JobHandler { + return func(ctx context.Context) error { + ctx, cancel := context.WithTimeout(ctx, duration) + defer cancel() + + done := make(chan error, 1) + go func() { + done <- next(ctx) + }() + + select { + case err := <-done: + return err + case <-ctx.Done(): + return errors.Errorf("job %q timed out after %v", getJobName(ctx), duration) + } + } + } +} + +// Context keys for job metadata. +type contextKey int + +const ( + jobNameKey contextKey = iota +) + +// withJobName adds the job name to the context. +func withJobName(ctx context.Context, name string) context.Context { + return context.WithValue(ctx, jobNameKey, name) +} + +// getJobName retrieves the job name from the context. +func getJobName(ctx context.Context) string { + if name, ok := ctx.Value(jobNameKey).(string); ok { + return name + } + return "unknown" +} + +// GetJobName retrieves the job name from the context (public API). +// Returns empty string if not found. +// +//nolint:revive // GetJobName is the public API, getJobName is internal +func GetJobName(ctx context.Context) string { + return getJobName(ctx) +} diff --git a/plugin/scheduler/middleware_test.go b/plugin/scheduler/middleware_test.go new file mode 100644 index 000000000..257ac5177 --- /dev/null +++ b/plugin/scheduler/middleware_test.go @@ -0,0 +1,146 @@ +package scheduler + +import ( + "context" + "errors" + "sync/atomic" + "testing" +) + +func TestMiddlewareChaining(t *testing.T) { + var order []string + + mw1 := func(next JobHandler) JobHandler { + return func(ctx context.Context) error { + order = append(order, "before-1") + err := next(ctx) + order = append(order, "after-1") + return err + } + } + + mw2 := func(next JobHandler) JobHandler { + return func(ctx context.Context) error { + order = append(order, "before-2") + err := next(ctx) + order = append(order, "after-2") + return err + } + } + + handler := func(_ context.Context) error { + order = append(order, "handler") + return nil + } + + chain := Chain(mw1, mw2) + wrapped := chain(handler) + + if err := wrapped(context.Background()); err != nil { + t.Fatalf("wrapped handler failed: %v", err) + } + + expected := []string{"before-1", "before-2", "handler", "after-2", "after-1"} + if len(order) != len(expected) { + t.Fatalf("expected %d calls, got %d", len(expected), len(order)) + } + + for i, want := range expected { + if order[i] != want { + t.Errorf("order[%d] = %q, want %q", i, order[i], want) + } + } +} + +func TestRecoveryMiddleware(t *testing.T) { + var panicRecovered atomic.Bool + + onPanic := func(_ string, _ interface{}) { + panicRecovered.Store(true) + } + + handler := func(_ context.Context) error { + panic("simulated panic") + } + + recovery := Recovery(onPanic) + wrapped := recovery(handler) + + // Should not panic, error should be returned + err := wrapped(withJobName(context.Background(), "test-job")) + if err == nil { + t.Error("expected error from recovered panic") + } + + if !panicRecovered.Load() { + t.Error("panic handler was not called") + } +} + +func TestLoggingMiddleware(t *testing.T) { + var loggedStart, loggedEnd atomic.Bool + var loggedError atomic.Bool + + logger := &testLogger{ + onInfo: func(msg string, _ ...interface{}) { + if msg == "Job started" { + loggedStart.Store(true) + } else if msg == "Job completed" { + loggedEnd.Store(true) + } + }, + onError: func(msg string, _ ...interface{}) { + if msg == "Job failed" { + loggedError.Store(true) + } + }, + } + + // Test successful execution + handler := func(_ context.Context) error { + return nil + } + + logging := Logging(logger) + wrapped := logging(handler) + + if err := wrapped(withJobName(context.Background(), "test-job")); err != nil { + t.Fatalf("handler failed: %v", err) + } + + if !loggedStart.Load() { + t.Error("start was not logged") + } + if !loggedEnd.Load() { + t.Error("end was not logged") + } + + // Test error handling + handlerErr := func(_ context.Context) error { + return errors.New("job error") + } + + wrappedErr := logging(handlerErr) + _ = wrappedErr(withJobName(context.Background(), "test-job-error")) + + if !loggedError.Load() { + t.Error("error was not logged") + } +} + +type testLogger struct { + onInfo func(msg string, args ...interface{}) + onError func(msg string, args ...interface{}) +} + +func (l *testLogger) Info(msg string, args ...interface{}) { + if l.onInfo != nil { + l.onInfo(msg, args...) + } +} + +func (l *testLogger) Error(msg string, args ...interface{}) { + if l.onError != nil { + l.onError(msg, args...) + } +} diff --git a/plugin/scheduler/parser.go b/plugin/scheduler/parser.go new file mode 100644 index 000000000..20e889d3a --- /dev/null +++ b/plugin/scheduler/parser.go @@ -0,0 +1,229 @@ +package scheduler + +import ( + "strconv" + "strings" + "time" + + "github.com/pkg/errors" +) + +// Schedule represents a parsed cron expression. +type Schedule struct { + seconds fieldMatcher // 0-59 (optional, for 6-field format) + minutes fieldMatcher // 0-59 + hours fieldMatcher // 0-23 + days fieldMatcher // 1-31 + months fieldMatcher // 1-12 + weekdays fieldMatcher // 0-7 (0 and 7 are Sunday) + hasSecs bool +} + +// fieldMatcher determines if a field value matches. +type fieldMatcher interface { + matches(value int) bool +} + +// ParseCronExpression parses a cron expression and returns a Schedule. +// Supports both 5-field (minute hour day month weekday) and 6-field (second minute hour day month weekday) formats. +func ParseCronExpression(expr string) (*Schedule, error) { + if expr == "" { + return nil, errors.New("empty cron expression") + } + + fields := strings.Fields(expr) + if len(fields) != 5 && len(fields) != 6 { + return nil, errors.Errorf("invalid cron expression: expected 5 or 6 fields, got %d", len(fields)) + } + + s := &Schedule{ + hasSecs: len(fields) == 6, + } + + var err error + offset := 0 + + // Parse seconds (if 6-field format) + if s.hasSecs { + s.seconds, err = parseField(fields[0], 0, 59) + if err != nil { + return nil, errors.Wrap(err, "invalid seconds field") + } + offset = 1 + } else { + s.seconds = &exactMatcher{value: 0} // Default to 0 seconds + } + + // Parse minutes + s.minutes, err = parseField(fields[offset], 0, 59) + if err != nil { + return nil, errors.Wrap(err, "invalid minutes field") + } + + // Parse hours + s.hours, err = parseField(fields[offset+1], 0, 23) + if err != nil { + return nil, errors.Wrap(err, "invalid hours field") + } + + // Parse days + s.days, err = parseField(fields[offset+2], 1, 31) + if err != nil { + return nil, errors.Wrap(err, "invalid days field") + } + + // Parse months + s.months, err = parseField(fields[offset+3], 1, 12) + if err != nil { + return nil, errors.Wrap(err, "invalid months field") + } + + // Parse weekdays (0-7, where both 0 and 7 represent Sunday) + s.weekdays, err = parseField(fields[offset+4], 0, 7) + if err != nil { + return nil, errors.Wrap(err, "invalid weekdays field") + } + + return s, nil +} + +// Next returns the next time the schedule should run after the given time. +func (s *Schedule) Next(from time.Time) time.Time { + // Start from the next second/minute + if s.hasSecs { + from = from.Add(1 * time.Second).Truncate(time.Second) + } else { + from = from.Add(1 * time.Minute).Truncate(time.Minute) + } + + // Cap search at 4 years to prevent infinite loops + maxTime := from.AddDate(4, 0, 0) + + for from.Before(maxTime) { + if s.matches(from) { + return from + } + + // Advance to next potential match + if s.hasSecs { + from = from.Add(1 * time.Second) + } else { + from = from.Add(1 * time.Minute) + } + } + + // Should never reach here with valid cron expressions + return time.Time{} +} + +// matches checks if the given time matches the schedule. +func (s *Schedule) matches(t time.Time) bool { + return s.seconds.matches(t.Second()) && + s.minutes.matches(t.Minute()) && + s.hours.matches(t.Hour()) && + s.months.matches(int(t.Month())) && + (s.days.matches(t.Day()) || s.weekdays.matches(int(t.Weekday()))) +} + +// parseField parses a single cron field (supports *, ranges, lists, steps). +func parseField(field string, min, max int) (fieldMatcher, error) { + // Wildcard + if field == "*" { + return &wildcardMatcher{}, nil + } + + // Step values (*/N) + if strings.HasPrefix(field, "*/") { + step, err := strconv.Atoi(field[2:]) + if err != nil || step < 1 || step > max { + return nil, errors.Errorf("invalid step value: %s", field) + } + return &stepMatcher{step: step, min: min, max: max}, nil + } + + // List (1,2,3) + if strings.Contains(field, ",") { + parts := strings.Split(field, ",") + values := make([]int, 0, len(parts)) + for _, p := range parts { + val, err := strconv.Atoi(strings.TrimSpace(p)) + if err != nil || val < min || val > max { + return nil, errors.Errorf("invalid list value: %s", p) + } + values = append(values, val) + } + return &listMatcher{values: values}, nil + } + + // Range (1-5) + if strings.Contains(field, "-") { + parts := strings.Split(field, "-") + if len(parts) != 2 { + return nil, errors.Errorf("invalid range: %s", field) + } + start, err1 := strconv.Atoi(strings.TrimSpace(parts[0])) + end, err2 := strconv.Atoi(strings.TrimSpace(parts[1])) + if err1 != nil || err2 != nil || start < min || end > max || start > end { + return nil, errors.Errorf("invalid range: %s", field) + } + return &rangeMatcher{start: start, end: end}, nil + } + + // Exact value + val, err := strconv.Atoi(field) + if err != nil || val < min || val > max { + return nil, errors.Errorf("invalid value: %s (must be between %d and %d)", field, min, max) + } + return &exactMatcher{value: val}, nil +} + +// wildcardMatcher matches any value. +type wildcardMatcher struct{} + +func (*wildcardMatcher) matches(_ int) bool { + return true +} + +// exactMatcher matches a specific value. +type exactMatcher struct { + value int +} + +func (m *exactMatcher) matches(value int) bool { + return value == m.value +} + +// rangeMatcher matches values in a range. +type rangeMatcher struct { + start, end int +} + +func (m *rangeMatcher) matches(value int) bool { + return value >= m.start && value <= m.end +} + +// listMatcher matches any value in a list. +type listMatcher struct { + values []int +} + +func (m *listMatcher) matches(value int) bool { + for _, v := range m.values { + if v == value { + return true + } + } + return false +} + +// stepMatcher matches values at regular intervals. +type stepMatcher struct { + step, min, max int +} + +func (m *stepMatcher) matches(value int) bool { + if value < m.min || value > m.max { + return false + } + return (value-m.min)%m.step == 0 +} diff --git a/plugin/scheduler/parser_test.go b/plugin/scheduler/parser_test.go new file mode 100644 index 000000000..2fe120d9e --- /dev/null +++ b/plugin/scheduler/parser_test.go @@ -0,0 +1,127 @@ +package scheduler + +import ( + "testing" + "time" +) + +func TestParseCronExpression(t *testing.T) { + tests := []struct { + name string + expr string + wantErr bool + }{ + // Standard 5-field format + {"every minute", "* * * * *", false}, + {"hourly", "0 * * * *", false}, + {"daily midnight", "0 0 * * *", false}, + {"weekly sunday", "0 0 * * 0", false}, + {"monthly", "0 0 1 * *", false}, + {"specific time", "30 14 * * *", false}, // 2:30 PM daily + {"range", "0 9-17 * * *", false}, // Every hour 9 AM - 5 PM + {"step", "*/15 * * * *", false}, // Every 15 minutes + {"list", "0 8,12,18 * * *", false}, // 8 AM, 12 PM, 6 PM + + // 6-field format with seconds + {"with seconds", "0 * * * * *", false}, + {"every 30 seconds", "*/30 * * * * *", false}, + + // Invalid expressions + {"empty", "", true}, + {"too few fields", "* * *", true}, + {"too many fields", "* * * * * * *", true}, + {"invalid minute", "60 * * * *", true}, + {"invalid hour", "0 24 * * *", true}, + {"invalid day", "0 0 32 * *", true}, + {"invalid month", "0 0 1 13 *", true}, + {"invalid weekday", "0 0 * * 8", true}, + {"garbage", "not a cron expression", true}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + schedule, err := ParseCronExpression(tt.expr) + if (err != nil) != tt.wantErr { + t.Errorf("ParseCronExpression(%q) error = %v, wantErr %v", tt.expr, err, tt.wantErr) + return + } + if !tt.wantErr && schedule == nil { + t.Errorf("ParseCronExpression(%q) returned nil schedule without error", tt.expr) + } + }) + } +} + +func TestScheduleNext(t *testing.T) { + tests := []struct { + name string + expr string + from time.Time + expected time.Time + }{ + { + name: "every minute from start of hour", + expr: "* * * * *", + from: time.Date(2025, 1, 1, 10, 0, 0, 0, time.UTC), + expected: time.Date(2025, 1, 1, 10, 1, 0, 0, time.UTC), + }, + { + name: "hourly at minute 30", + expr: "30 * * * *", + from: time.Date(2025, 1, 1, 10, 0, 0, 0, time.UTC), + expected: time.Date(2025, 1, 1, 10, 30, 0, 0, time.UTC), + }, + { + name: "hourly at minute 30 (already past)", + expr: "30 * * * *", + from: time.Date(2025, 1, 1, 10, 45, 0, 0, time.UTC), + expected: time.Date(2025, 1, 1, 11, 30, 0, 0, time.UTC), + }, + { + name: "daily at 2 AM", + expr: "0 2 * * *", + from: time.Date(2025, 1, 1, 10, 0, 0, 0, time.UTC), + expected: time.Date(2025, 1, 2, 2, 0, 0, 0, time.UTC), + }, + { + name: "every 15 minutes", + expr: "*/15 * * * *", + from: time.Date(2025, 1, 1, 10, 7, 0, 0, time.UTC), + expected: time.Date(2025, 1, 1, 10, 15, 0, 0, time.UTC), + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + schedule, err := ParseCronExpression(tt.expr) + if err != nil { + t.Fatalf("failed to parse expression: %v", err) + } + + next := schedule.Next(tt.from) + if !next.Equal(tt.expected) { + t.Errorf("Next(%v) = %v, expected %v", tt.from, next, tt.expected) + } + }) + } +} + +func TestScheduleNextWithTimezone(t *testing.T) { + nyc, _ := time.LoadLocation("America/New_York") + + // Schedule for 9 AM in New York + schedule, err := ParseCronExpression("0 9 * * *") + if err != nil { + t.Fatalf("failed to parse expression: %v", err) + } + + // Current time: 8 AM in New York + from := time.Date(2025, 1, 1, 8, 0, 0, 0, nyc) + next := schedule.Next(from) + + // Should be 9 AM same day in New York + expected := time.Date(2025, 1, 1, 9, 0, 0, 0, nyc) + if !next.Equal(expected) { + t.Errorf("Next(%v) = %v, expected %v", from, next, expected) + } +} diff --git a/plugin/scheduler/scheduler.go b/plugin/scheduler/scheduler.go new file mode 100644 index 000000000..703c0a667 --- /dev/null +++ b/plugin/scheduler/scheduler.go @@ -0,0 +1,200 @@ +package scheduler + +import ( + "context" + "sync" + "time" + + "github.com/pkg/errors" +) + +// Scheduler manages scheduled jobs. +type Scheduler struct { + jobs map[string]*registeredJob + jobsMu sync.RWMutex + timezone *time.Location + middleware Middleware + running bool + runningMu sync.RWMutex + stopCh chan struct{} + wg sync.WaitGroup +} + +// registeredJob wraps a Job with runtime state. +type registeredJob struct { + job *Job + cancelFn context.CancelFunc +} + +// Option configures a Scheduler. +type Option func(*Scheduler) + +// WithTimezone sets the default timezone for all jobs. +func WithTimezone(tz string) Option { + return func(s *Scheduler) { + loc, err := time.LoadLocation(tz) + if err != nil { + // Default to UTC on invalid timezone + loc = time.UTC + } + s.timezone = loc + } +} + +// WithMiddleware sets middleware to wrap all job handlers. +func WithMiddleware(mw ...Middleware) Option { + return func(s *Scheduler) { + if len(mw) > 0 { + s.middleware = Chain(mw...) + } + } +} + +// New creates a new Scheduler with optional configuration. +func New(opts ...Option) *Scheduler { + s := &Scheduler{ + jobs: make(map[string]*registeredJob), + timezone: time.UTC, + stopCh: make(chan struct{}), + } + + for _, opt := range opts { + opt(s) + } + + return s +} + +// Register adds a job to the scheduler. +// Jobs must be registered before calling Start(). +func (s *Scheduler) Register(job *Job) error { + if job == nil { + return errors.New("job cannot be nil") + } + + if err := job.Validate(); err != nil { + return errors.Wrap(err, "invalid job") + } + + s.jobsMu.Lock() + defer s.jobsMu.Unlock() + + if _, exists := s.jobs[job.Name]; exists { + return errors.Errorf("job with name %q already registered", job.Name) + } + + s.jobs[job.Name] = ®isteredJob{job: job} + return nil +} + +// Start begins executing scheduled jobs. +func (s *Scheduler) Start() error { + s.runningMu.Lock() + defer s.runningMu.Unlock() + + if s.running { + return errors.New("scheduler already running") + } + + s.jobsMu.RLock() + defer s.jobsMu.RUnlock() + + // Parse and schedule all jobs + for _, rj := range s.jobs { + schedule, err := ParseCronExpression(rj.job.Schedule) + if err != nil { + return errors.Wrapf(err, "failed to parse schedule for job %q", rj.job.Name) + } + + ctx, cancel := context.WithCancel(context.Background()) + rj.cancelFn = cancel + + s.wg.Add(1) + go s.runJobWithSchedule(ctx, rj, schedule) + } + + s.running = true + return nil +} + +// runJobWithSchedule executes a job according to its cron schedule. +func (s *Scheduler) runJobWithSchedule(ctx context.Context, rj *registeredJob, schedule *Schedule) { + defer s.wg.Done() + + // Apply middleware to handler + handler := rj.job.Handler + if s.middleware != nil { + handler = s.middleware(handler) + } + + for { + // Calculate next run time + now := time.Now() + if rj.job.Timezone != "" { + loc, err := time.LoadLocation(rj.job.Timezone) + if err == nil { + now = now.In(loc) + } + } else if s.timezone != nil { + now = now.In(s.timezone) + } + + next := schedule.Next(now) + duration := time.Until(next) + + timer := time.NewTimer(duration) + + select { + case <-timer.C: + // Add job name to context and execute + jobCtx := withJobName(ctx, rj.job.Name) + if err := handler(jobCtx); err != nil { + // Error already handled by middleware (if any) + _ = err + } + case <-ctx.Done(): + timer.Stop() + return + case <-s.stopCh: + timer.Stop() + return + } + } +} + +// Stop gracefully shuts down the scheduler. +// It waits for all running jobs to complete or until the context is canceled. +func (s *Scheduler) Stop(ctx context.Context) error { + s.runningMu.Lock() + if !s.running { + s.runningMu.Unlock() + return errors.New("scheduler not running") + } + s.running = false + s.runningMu.Unlock() + + // Cancel all job contexts + s.jobsMu.RLock() + for _, rj := range s.jobs { + if rj.cancelFn != nil { + rj.cancelFn() + } + } + s.jobsMu.RUnlock() + + // Signal stop and wait for jobs to finish + close(s.stopCh) + + done := make(chan struct{}) + go func() { + s.wg.Wait() + close(done) + }() + + select { + case <-done: + return nil + case <-ctx.Done(): + return ctx.Err() + } +} diff --git a/plugin/scheduler/scheduler_test.go b/plugin/scheduler/scheduler_test.go new file mode 100644 index 000000000..b8409c969 --- /dev/null +++ b/plugin/scheduler/scheduler_test.go @@ -0,0 +1,165 @@ +package scheduler + +import ( + "context" + "fmt" + "strings" + "sync" + "sync/atomic" + "testing" + "time" +) + +func TestSchedulerCreation(t *testing.T) { + s := New() + if s == nil { + t.Fatal("New() returned nil") + } +} + +func TestSchedulerWithTimezone(t *testing.T) { + s := New(WithTimezone("America/New_York")) + if s == nil { + t.Fatal("New() with timezone returned nil") + } +} + +func TestJobRegistration(t *testing.T) { + s := New() + + job := &Job{ + Name: "test-registration", + Schedule: "0 * * * *", + Handler: func(_ context.Context) error { return nil }, + } + + if err := s.Register(job); err != nil { + t.Fatalf("failed to register valid job: %v", err) + } + + // Registering duplicate name should fail + if err := s.Register(job); err == nil { + t.Error("expected error when registering duplicate job name") + } +} + +func TestSchedulerStartStop(t *testing.T) { + s := New() + + var runCount atomic.Int32 + job := &Job{ + Name: "test-start-stop", + Schedule: "* * * * * *", // Every second (6-field format) + Handler: func(_ context.Context) error { + runCount.Add(1) + return nil + }, + } + + if err := s.Register(job); err != nil { + t.Fatalf("failed to register job: %v", err) + } + + // Start scheduler + if err := s.Start(); err != nil { + t.Fatalf("failed to start scheduler: %v", err) + } + + // Let it run for 2.5 seconds + time.Sleep(2500 * time.Millisecond) + + // Stop scheduler + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + if err := s.Stop(ctx); err != nil { + t.Fatalf("failed to stop scheduler: %v", err) + } + + count := runCount.Load() + // Should have run at least twice (at 0s and 1s, maybe 2s) + if count < 2 { + t.Errorf("expected job to run at least 2 times, ran %d times", count) + } + + // Verify it stopped (count shouldn't increase) + finalCount := count + time.Sleep(1500 * time.Millisecond) + if runCount.Load() != finalCount { + t.Error("scheduler did not stop - job continued running") + } +} + +func TestSchedulerWithMiddleware(t *testing.T) { + var executionLog []string + var logMu sync.Mutex + + logger := &testLogger{ + onInfo: func(msg string, _ ...interface{}) { + logMu.Lock() + executionLog = append(executionLog, fmt.Sprintf("INFO: %s", msg)) + logMu.Unlock() + }, + onError: func(msg string, _ ...interface{}) { + logMu.Lock() + executionLog = append(executionLog, fmt.Sprintf("ERROR: %s", msg)) + logMu.Unlock() + }, + } + + s := New(WithMiddleware( + Recovery(func(jobName string, r interface{}) { + logMu.Lock() + executionLog = append(executionLog, fmt.Sprintf("PANIC: %s - %v", jobName, r)) + logMu.Unlock() + }), + Logging(logger), + )) + + job := &Job{ + Name: "test-middleware", + Schedule: "* * * * * *", // Every second + Handler: func(_ context.Context) error { + return nil + }, + } + + if err := s.Register(job); err != nil { + t.Fatalf("failed to register job: %v", err) + } + + if err := s.Start(); err != nil { + t.Fatalf("failed to start: %v", err) + } + + time.Sleep(1500 * time.Millisecond) + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + if err := s.Stop(ctx); err != nil { + t.Fatalf("failed to stop: %v", err) + } + + logMu.Lock() + defer logMu.Unlock() + + // Should have at least one start and one completion log + hasStart := false + hasCompletion := false + for _, log := range executionLog { + if strings.Contains(log, "Job started") { + hasStart = true + } + if strings.Contains(log, "Job completed") { + hasCompletion = true + } + } + + if !hasStart { + t.Error("expected job start log") + } + if !hasCompletion { + t.Error("expected job completion log") + } +}