72 lines
schedule/scheduler.go
Executes registered tasks on each tick; advances NextRun only after success.
// Package schedule runs registered periodic tasks on configurable intervals.//// A Task is eligible to run when the current time is on or after its NextRun.// Failed tasks are retried on the next tick; only successful runs advance NextRun.// NextRun must advance from the previous scheduled time — not the wall clock at// execution — to prevent interval drift over time.package scheduleimport ( "context" "fmt" "log" "time")
// Task is a named piece of work that the Scheduler runs periodically.
type Task struct {
	// Name identifies the task; it is included in errors returned by RunOnce.
	Name string

	// Interval is the amount by which NextRun is moved forward between runs.
	Interval time.Duration

	// NextRun is the earliest instant at which the task is due. Register
	// initialises it to the registration time and RunOnce moves it forward
	// by Interval.
	NextRun time.Time

	// Run performs the work. A non-nil error marks the run as failed.
	Run func(ctx context.Context) error
}
// Scheduler executes registered tasks on each tick.type Scheduler struct {tasks []*Task
ticker *time.Ticker
}
// NewScheduler returns a Scheduler that checks for due tasks every tickInterval.func NewScheduler(tickInterval time.Duration) *Scheduler { return &Scheduler{ticker: time.NewTicker(tickInterval)}}
// Register adds t to the scheduler. The task is eligible to run on the first tick.func (s *Scheduler) Register(t *Task) {t.NextRun = time.Now()
s.tasks = append(s.tasks, t)}
// RunOnce checks registered tasks in order and stops at the first task failure.// Returns: a non-nil error if the first due task failure is encountered.func (s *Scheduler) RunOnce(ctx context.Context) error {now := time.Now()
for _, t := range s.tasks { if now.Before(t.NextRun) { continue}
t.NextRun = now.Add(t.Interval)
if err := t.Run(ctx); err != nil { return fmt.Errorf("schedule: task %q: %w", t.Name, err)}
}
return nil}
// Run starts the scheduling loop and blocks until ctx is cancelled.func (s *Scheduler) Run(ctx context.Context) { for { select { case <-ctx.Done():s.ticker.Stop()
return case <-s.ticker.C: if err := s.RunOnce(ctx); err != nil { log.Printf("schedule: tick error: %v", err)}
}
}
}