//go:generate mockgen -destination=mocks/scheduler.go -package=gocronmocks . Scheduler
package gocron

import (
	"context"
	"reflect"
	"runtime"
	"slices"
	"strings"
	"time"

	"github.com/google/uuid"
	"github.com/jonboulle/clockwork"
)

var _ Scheduler = (*scheduler)(nil)

// Scheduler defines the interface for the Scheduler.
type Scheduler interface {
	// Jobs returns all the jobs currently in the scheduler.
	Jobs() []Job
	// NewJob creates a new job in the Scheduler. The job is scheduled per the provided
	// definition when the Scheduler is started. If the Scheduler is already running,
	// the job is scheduled immediately.
	// If you set the first argument of your Task func to be a context.Context,
	// gocron will pass in a context (either the default Job context, or one
	// provided via WithContext) to the job and will cancel the context on shutdown.
	// This allows you to listen for and handle cancellation within your job.
	NewJob(JobDefinition, Task, ...JobOption) (Job, error)
	// RemoveByTags removes all jobs that have at least one of the provided tags.
	RemoveByTags(...string)
	// RemoveJob removes the job with the provided id.
	RemoveJob(uuid.UUID) error
	// Shutdown should be called when you no longer need
	// the Scheduler or Jobs, as the Scheduler cannot
	// be restarted after calling Shutdown. This is similar
	// to a Close or Cleanup method and is often deferred after
	// starting the scheduler.
	Shutdown() error
	// Start begins scheduling jobs for execution based
	// on each job's definition. Jobs added to an already
	// running scheduler will be scheduled immediately based
	// on their definition. Start is non-blocking.
	Start()
	// StopJobs stops the execution of all jobs in the scheduler.
	// This can be useful in situations where jobs need to be
	// paused globally and then restarted with Start().
	StopJobs() error
	// Update replaces the existing Job's JobDefinition with the provided
	// JobDefinition. The Job's Job.ID() remains the same.
	Update(uuid.UUID, JobDefinition, Task, ...JobOption) (Job, error)
	// JobsWaitingInQueue returns the number of jobs waiting in the queue
	// when using LimitModeWait. With LimitModeReschedule, or no limit set,
	// it is always zero.
	JobsWaitingInQueue() int
}
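
// A minimal sketch of a context-aware task, assuming a Scheduler s and the
// gocron helpers DurationJob and NewTask (the task body and interval here
// are illustrative only):
//
//	_, _ = s.NewJob(
//		gocron.DurationJob(time.Minute),
//		gocron.NewTask(func(ctx context.Context) {
//			select {
//			case <-ctx.Done():
//				return // the scheduler is shutting down
//			default:
//			}
//			// do the job's work here
//		}),
//	)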

// -----------------------------------------------
// -----------------------------------------------
// ----------------- Scheduler -------------------
// -----------------------------------------------
// -----------------------------------------------

type scheduler struct {
	// context used for shutting down
	shutdownCtx context.Context
	// cancel used to signal that the scheduler should shut down
	shutdownCancel context.CancelFunc
	// the executor, which actually runs the jobs sent to it via the scheduler
	exec executor
	// the map of jobs registered in the scheduler
	jobs map[uuid.UUID]internalJob
	// the location used by the scheduler for scheduling when relevant
	location *time.Location
	// whether the scheduler has been started or not
	started bool
	// globally applied JobOptions set on all jobs added to the scheduler
	// note: individually set JobOptions take precedence.
	globalJobOptions []JobOption
	// the scheduler's logger
	logger Logger

	// used to tell the scheduler to start
	startCh chan struct{}
	// used to report that the scheduler has started
	startedCh chan struct{}
	// used to tell the scheduler to stop
	stopCh chan struct{}
	// used to report that the scheduler has stopped
	stopErrCh chan error
	// used to send all the jobs out when a request is made by the client
	allJobsOutRequest chan allJobsOutRequest
	// used to send a job out when a request is made by the client
	jobOutRequestCh chan jobOutRequest
	// used to run a job on-demand when requested by the client
	runJobRequestCh chan runJobRequest
	// new jobs are received here
	newJobCh chan newJobIn
	// requests from the client to remove jobs by ID are received here
	removeJobCh chan uuid.UUID
	// requests from the client to remove jobs by tags are received here
	removeJobsByTagsCh chan []string
}

type newJobIn struct {
	ctx    context.Context
	cancel context.CancelFunc
	job    internalJob
}

type jobOutRequest struct {
	id      uuid.UUID
	outChan chan internalJob
}

type runJobRequest struct {
	id      uuid.UUID
	outChan chan error
}

type allJobsOutRequest struct {
	outChan chan []Job
}

// NewScheduler creates a new Scheduler instance.
// The Scheduler is not started until Start() is called.
//
// NewJob will add jobs to the Scheduler, but they will not
// be scheduled until Start() is called.
func NewScheduler(options ...SchedulerOption) (Scheduler, error) {
	schCtx, cancel := context.WithCancel(context.Background())

	exec := executor{
		stopCh:           make(chan struct{}),
		stopTimeout:      time.Second * 10,
		singletonRunners: nil,
		logger:           &noOpLogger{},
		clock:            clockwork.NewRealClock(),

		jobsIn:                 make(chan jobIn),
		jobsOutForRescheduling: make(chan uuid.UUID),
		jobUpdateNextRuns:      make(chan uuid.UUID),
		jobsOutCompleted:       make(chan uuid.UUID),
		jobOutRequest:          make(chan jobOutRequest, 1000),
		done:                   make(chan error, 1),
	}

	s := &scheduler{
		shutdownCtx:    schCtx,
		shutdownCancel: cancel,
		exec:           exec,
		jobs:           make(map[uuid.UUID]internalJob),
		location:       time.Local,
		logger:         &noOpLogger{},

		newJobCh:           make(chan newJobIn),
		removeJobCh:        make(chan uuid.UUID),
		removeJobsByTagsCh: make(chan []string),
		startCh:            make(chan struct{}),
		startedCh:          make(chan struct{}),
		stopCh:             make(chan struct{}),
		stopErrCh:          make(chan error, 1),
		jobOutRequestCh:    make(chan jobOutRequest),
		runJobRequestCh:    make(chan runJobRequest),
		allJobsOutRequest:  make(chan allJobsOutRequest),
	}

	for _, option := range options {
		err := option(s)
		if err != nil {
			return nil, err
		}
	}

	go func() {
		s.logger.Info("gocron: new scheduler created")
		for {
			select {
			case id := <-s.exec.jobsOutForRescheduling:
				s.selectExecJobsOutForRescheduling(id)
			case id := <-s.exec.jobUpdateNextRuns:
				s.updateNextScheduled(id)
			case id := <-s.exec.jobsOutCompleted:
				s.selectExecJobsOutCompleted(id)

			case in := <-s.newJobCh:
				s.selectNewJob(in)

			case id := <-s.removeJobCh:
				s.selectRemoveJob(id)

			case tags := <-s.removeJobsByTagsCh:
				s.selectRemoveJobsByTags(tags)

			case out := <-s.exec.jobOutRequest:
				s.selectJobOutRequest(out)

			case out := <-s.jobOutRequestCh:
				s.selectJobOutRequest(out)

			case out := <-s.allJobsOutRequest:
				s.selectAllJobsOutRequest(out)

			case run := <-s.runJobRequestCh:
				s.selectRunJobRequest(run)

			case <-s.startCh:
				s.selectStart()

			case <-s.stopCh:
				s.stopScheduler()

			case <-s.shutdownCtx.Done():
				s.stopScheduler()
				return
			}
		}
	}()

	return s, nil
}
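
// A minimal sketch of the full scheduler lifecycle, assuming gocron's
// DurationJob and NewTask helpers (the interval and task are illustrative):
//
//	s, err := gocron.NewScheduler()
//	if err != nil {
//		// handle the error
//	}
//	defer func() { _ = s.Shutdown() }()
//
//	_, err = s.NewJob(
//		gocron.DurationJob(time.Hour),
//		gocron.NewTask(func() { /* work */ }),
//	)
//	if err != nil {
//		// handle the error
//	}
//	s.Start()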

// -----------------------------------------------
// -----------------------------------------------
// --------- Scheduler Channel Methods -----------
// -----------------------------------------------
// -----------------------------------------------

// The scheduler's channel functions are broken out here
// to allow prioritizing within the select blocks. The idea
// is that scheduling work should not be blocked by requests
// from the caller for information about jobs.

func (s *scheduler) stopScheduler() {
	s.logger.Debug("gocron: stopping scheduler")
	if s.started {
		s.exec.stopCh <- struct{}{}
	}

	for _, j := range s.jobs {
		j.stop()
	}
	for _, j := range s.jobs {
		<-j.ctx.Done()
	}
	var err error
	if s.started {
		t := time.NewTimer(s.exec.stopTimeout + 1*time.Second)
		select {
		case err = <-s.exec.done:
			t.Stop()
		case <-t.C:
			err = ErrStopExecutorTimedOut
		}
	}
	for id, j := range s.jobs {
		oldCtx := j.ctx
		if j.parentCtx == nil {
			j.parentCtx = s.shutdownCtx
		}
		j.ctx, j.cancel = context.WithCancel(j.parentCtx)

		// also replace the old context with the new one in the parameters
		if len(j.parameters) > 0 && j.parameters[0] == oldCtx {
			j.parameters[0] = j.ctx
		}

		s.jobs[id] = j
	}

	s.stopErrCh <- err
	s.started = false
	s.logger.Debug("gocron: scheduler stopped")
}

func (s *scheduler) selectAllJobsOutRequest(out allJobsOutRequest) {
	outJobs := make([]Job, len(s.jobs))
	var counter int
	for _, j := range s.jobs {
		outJobs[counter] = s.jobFromInternalJob(j)
		counter++
	}
	slices.SortFunc(outJobs, func(a, b Job) int {
		aID, bID := a.ID().String(), b.ID().String()
		return strings.Compare(aID, bID)
	})
	select {
	case <-s.shutdownCtx.Done():
	case out.outChan <- outJobs:
	}
}

func (s *scheduler) selectRunJobRequest(run runJobRequest) {
	j, ok := s.jobs[run.id]
	if !ok {
		select {
		case run.outChan <- ErrJobNotFound:
		default:
		}
		// the job doesn't exist, so there is nothing to run
		return
	}
	select {
	case <-s.shutdownCtx.Done():
		select {
		case run.outChan <- ErrJobRunNowFailed:
		default:
		}
	case s.exec.jobsIn <- jobIn{
		id:            j.id,
		shouldSendOut: false,
	}:
		select {
		case run.outChan <- nil:
		default:
		}
	}
}

func (s *scheduler) selectRemoveJob(id uuid.UUID) {
	j, ok := s.jobs[id]
	if !ok {
		return
	}
	j.stop()
	delete(s.jobs, id)
}

// Jobs coming back from the executor to the scheduler that
// need to be evaluated for rescheduling.
func (s *scheduler) selectExecJobsOutForRescheduling(id uuid.UUID) {
	select {
	case <-s.shutdownCtx.Done():
		return
	default:
	}
	j, ok := s.jobs[id]
	if !ok {
		// the job was removed while it was running, and
		// so we don't need to reschedule it.
		return
	}

	if j.stopTimeReached(s.now()) {
		return
	}

	var scheduleFrom time.Time
	if len(j.nextScheduled) > 0 {
		// always grab the last element in the slice as that is the furthest
		// out in the future and the time from which we want to calculate
		// the subsequent next run time.
		slices.SortStableFunc(j.nextScheduled, ascendingTime)
		scheduleFrom = j.nextScheduled[len(j.nextScheduled)-1]
	}

	if scheduleFrom.IsZero() {
		scheduleFrom = j.startTime
	}

	next := j.next(scheduleFrom)
	if next.IsZero() {
		// the job's next function will return zero for OneTime jobs.
		// since they are one time only, they do not need rescheduling.
		return
	}

	if next.Before(s.now()) {
		// in some cases the next run time can be in the past, for example:
		// - the time on the machine was incorrect and has been synced with ntp
		// - the machine went to sleep, and woke up some time later
		// in those cases, we want to increment to the next run in the future
		// and schedule the job for that time.
		for next.Before(s.now()) {
			next = j.next(next)
		}
	}

	if slices.Contains(j.nextScheduled, next) {
		// if the next value is a duplicate of what's already in the nextScheduled slice, for example:
		// - the job is being rescheduled off the same next run value as before
		// increment to the next, next value
		for slices.Contains(j.nextScheduled, next) {
			next = j.next(next)
		}
	}

	// clean up any existing timer to prevent leaks
	if j.timer != nil {
		j.timer.Stop()
		j.timer = nil // ensure the timer is cleared for GC
	}

	j.nextScheduled = append(j.nextScheduled, next)
	j.timer = s.exec.clock.AfterFunc(next.Sub(s.now()), func() {
		// set the actual timer on the job here and listen for
		// shutdown events so that the job doesn't attempt to
		// run if the scheduler has been shut down.
		select {
		case <-s.shutdownCtx.Done():
			return
		case s.exec.jobsIn <- jobIn{
			id:            j.id,
			shouldSendOut: true,
		}:
		}
	})
	// update the job with its new next and last run times and timer.
	s.jobs[id] = j
}

func (s *scheduler) updateNextScheduled(id uuid.UUID) {
	j, ok := s.jobs[id]
	if !ok {
		return
	}
	var newNextScheduled []time.Time
	for _, t := range j.nextScheduled {
		if t.Before(s.now()) {
			continue
		}
		newNextScheduled = append(newNextScheduled, t)
	}
	j.nextScheduled = newNextScheduled
	s.jobs[id] = j
}

func (s *scheduler) selectExecJobsOutCompleted(id uuid.UUID) {
	j, ok := s.jobs[id]
	if !ok {
		return
	}

	// remove any nextScheduled values that are now in the past.
	var newNextScheduled []time.Time
	for _, t := range j.nextScheduled {
		if t.Before(s.now()) {
			continue
		}
		newNextScheduled = append(newNextScheduled, t)
	}
	j.nextScheduled = newNextScheduled

	// if the job has a limited number of runs set, we need to
	// check how many runs have occurred and stop running this
	// job if it has reached the limit.
	if j.limitRunsTo != nil {
		j.limitRunsTo.runCount = j.limitRunsTo.runCount + 1
		if j.limitRunsTo.runCount == j.limitRunsTo.limit {
			go func() {
				select {
				case <-s.shutdownCtx.Done():
					return
				case s.removeJobCh <- id:
				}
			}()
			return
		}
	}

	j.lastRun = s.now()
	s.jobs[id] = j
}

func (s *scheduler) selectJobOutRequest(out jobOutRequest) {
	if j, ok := s.jobs[out.id]; ok {
		select {
		case out.outChan <- j:
		case <-s.shutdownCtx.Done():
		}
	}
	close(out.outChan)
}

func (s *scheduler) selectNewJob(in newJobIn) {
	j := in.job
	if s.started {
		next := j.startTime
		if j.startImmediately {
			next = s.now()
			select {
			case <-s.shutdownCtx.Done():
			case s.exec.jobsIn <- jobIn{
				id:            j.id,
				shouldSendOut: true,
			}:
			}
		} else {
			if next.IsZero() {
				next = j.next(s.now())
			}

			id := j.id
			j.timer = s.exec.clock.AfterFunc(next.Sub(s.now()), func() {
				select {
				case <-s.shutdownCtx.Done():
				case s.exec.jobsIn <- jobIn{
					id:            id,
					shouldSendOut: true,
				}:
				}
			})
		}
		j.startTime = next
		j.nextScheduled = append(j.nextScheduled, next)
	}

	s.jobs[j.id] = j
	in.cancel()
}

func (s *scheduler) selectRemoveJobsByTags(tags []string) {
	for _, j := range s.jobs {
		for _, tag := range tags {
			if slices.Contains(j.tags, tag) {
				j.stop()
				delete(s.jobs, j.id)
				break
			}
		}
	}
}

func (s *scheduler) selectStart() {
	s.logger.Debug("gocron: scheduler starting")
	go s.exec.start()

	s.started = true
	for id, j := range s.jobs {
		next := j.startTime
		if j.startImmediately {
			next = s.now()
			select {
			case <-s.shutdownCtx.Done():
			case s.exec.jobsIn <- jobIn{
				id:            id,
				shouldSendOut: true,
			}:
			}
		} else {
			if next.IsZero() {
				next = j.next(s.now())
			}

			jobID := id
			j.timer = s.exec.clock.AfterFunc(next.Sub(s.now()), func() {
				select {
				case <-s.shutdownCtx.Done():
				case s.exec.jobsIn <- jobIn{
					id:            jobID,
					shouldSendOut: true,
				}:
				}
			})
		}
		j.startTime = next
		j.nextScheduled = append(j.nextScheduled, next)
		s.jobs[id] = j
	}
	select {
	case <-s.shutdownCtx.Done():
	case s.startedCh <- struct{}{}:
		s.logger.Info("gocron: scheduler started")
	}
}

// -----------------------------------------------
// -----------------------------------------------
// ------------- Scheduler Methods ---------------
// -----------------------------------------------
// -----------------------------------------------

func (s *scheduler) now() time.Time {
	return s.exec.clock.Now().In(s.location)
}

func (s *scheduler) jobFromInternalJob(in internalJob) job {
	return job{
		in.id,
		in.name,
		slices.Clone(in.tags),
		s.jobOutRequestCh,
		s.runJobRequestCh,
	}
}

func (s *scheduler) Jobs() []Job {
	outChan := make(chan []Job)
	select {
	case <-s.shutdownCtx.Done():
	case s.allJobsOutRequest <- allJobsOutRequest{outChan: outChan}:
	}

	var jobs []Job
	select {
	case <-s.shutdownCtx.Done():
	case jobs = <-outChan:
	}

	return jobs
}

func (s *scheduler) NewJob(jobDefinition JobDefinition, task Task, options ...JobOption) (Job, error) {
	return s.addOrUpdateJob(uuid.Nil, jobDefinition, task, options)
}

func (s *scheduler) verifyInterfaceVariadic(taskFunc reflect.Value, tsk task, variadicStart int) error {
	ifaceType := taskFunc.Type().In(variadicStart).Elem()
	for i := variadicStart; i < len(tsk.parameters); i++ {
		if !reflect.TypeOf(tsk.parameters[i]).Implements(ifaceType) {
			return ErrNewJobWrongTypeOfParameters
		}
	}
	return nil
}

func (s *scheduler) verifyVariadic(taskFunc reflect.Value, tsk task, variadicStart int) error {
	if err := s.verifyNonVariadic(taskFunc, tsk, variadicStart); err != nil {
		return err
	}
	parameterType := taskFunc.Type().In(variadicStart).Elem().Kind()
	if parameterType == reflect.Interface {
		return s.verifyInterfaceVariadic(taskFunc, tsk, variadicStart)
	}
	if parameterType == reflect.Pointer {
		parameterType = reflect.Indirect(reflect.ValueOf(taskFunc.Type().In(variadicStart))).Kind()
	}

	for i := variadicStart; i < len(tsk.parameters); i++ {
		argumentType := reflect.TypeOf(tsk.parameters[i]).Kind()
		if argumentType == reflect.Interface || argumentType == reflect.Pointer {
			argumentType = reflect.TypeOf(tsk.parameters[i]).Elem().Kind()
		}
		if argumentType != parameterType {
			return ErrNewJobWrongTypeOfParameters
		}
	}
	return nil
}

func (s *scheduler) verifyNonVariadic(taskFunc reflect.Value, tsk task, length int) error {
	for i := 0; i < length; i++ {
		t1 := reflect.TypeOf(tsk.parameters[i]).Kind()
		if t1 == reflect.Interface || t1 == reflect.Pointer {
			t1 = reflect.TypeOf(tsk.parameters[i]).Elem().Kind()
		}
		t2 := reflect.New(taskFunc.Type().In(i)).Elem().Kind()
		if t2 == reflect.Interface || t2 == reflect.Pointer {
			t2 = reflect.Indirect(reflect.ValueOf(taskFunc.Type().In(i))).Kind()
		}
		if t1 != t2 {
			return ErrNewJobWrongTypeOfParameters
		}
	}
	return nil
}

func (s *scheduler) verifyParameterType(taskFunc reflect.Value, tsk task) error {
	isVariadic := taskFunc.Type().IsVariadic()
	if isVariadic {
		variadicStart := taskFunc.Type().NumIn() - 1
		return s.verifyVariadic(taskFunc, tsk, variadicStart)
	}
	expectedParameterLength := taskFunc.Type().NumIn()
	if len(tsk.parameters) != expectedParameterLength {
		return ErrNewJobWrongNumberOfParameters
	}
	return s.verifyNonVariadic(taskFunc, tsk, expectedParameterLength)
}
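
// To illustrate what the verification above enforces: a task whose function
// takes (string, int) must be given parameters of exactly those kinds, or
// job creation fails with ErrNewJobWrongTypeOfParameters (or
// ErrNewJobWrongNumberOfParameters for a count mismatch). A minimal sketch,
// assuming gocron's NewTask helper:
//
//	// matches: a string and an int for func(string, int)
//	gocron.NewTask(func(name string, count int) {}, "hello", 2)
//
//	// fails verification: the second parameter is a string, not an int
//	gocron.NewTask(func(name string, count int) {}, "hello", "2")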

func (s *scheduler) addOrUpdateJob(id uuid.UUID, definition JobDefinition, taskWrapper Task, options []JobOption) (Job, error) {
	j := internalJob{}
	if id == uuid.Nil {
		j.id = uuid.New()
	} else {
		currentJob := requestJobCtx(s.shutdownCtx, id, s.jobOutRequestCh)
		if currentJob != nil && currentJob.id != uuid.Nil {
			select {
			case <-s.shutdownCtx.Done():
				return nil, nil
			case s.removeJobCh <- id:
				<-currentJob.ctx.Done()
			}
		}

		j.id = id
	}

	if taskWrapper == nil {
		return nil, ErrNewJobTaskNil
	}

	tsk := taskWrapper()
	taskFunc := reflect.ValueOf(tsk.function)
	for taskFunc.Kind() == reflect.Ptr {
		taskFunc = taskFunc.Elem()
	}

	if taskFunc.Kind() != reflect.Func {
		return nil, ErrNewJobTaskNotFunc
	}

	j.name = runtime.FuncForPC(taskFunc.Pointer()).Name()
	j.function = tsk.function
	j.parameters = tsk.parameters

	// apply global job options
	for _, option := range s.globalJobOptions {
		if err := option(&j, s.now()); err != nil {
			return nil, err
		}
	}

	// apply job specific options, which take precedence
	for _, option := range options {
		if err := option(&j, s.now()); err != nil {
			return nil, err
		}
	}

	if j.parentCtx == nil {
		j.parentCtx = s.shutdownCtx
	}
	j.ctx, j.cancel = context.WithCancel(j.parentCtx)

	if !taskFunc.IsZero() && taskFunc.Type().NumIn() > 0 {
		// if the first parameter is a context.Context and the params have no context.Context, add the current ctx to the params
		if taskFunc.Type().In(0) == reflect.TypeOf((*context.Context)(nil)).Elem() {
			if len(tsk.parameters) == 0 {
				tsk.parameters = []any{j.ctx}
				j.parameters = []any{j.ctx}
			} else if _, ok := tsk.parameters[0].(context.Context); !ok {
				tsk.parameters = append([]any{j.ctx}, tsk.parameters...)
				j.parameters = append([]any{j.ctx}, j.parameters...)
			}
		}
	}

	if err := s.verifyParameterType(taskFunc, tsk); err != nil {
		return nil, err
	}

	if err := definition.setup(&j, s.location, s.exec.clock.Now()); err != nil {
		return nil, err
	}

	newJobCtx, newJobCancel := context.WithCancel(context.Background())
	select {
	case <-s.shutdownCtx.Done():
	case s.newJobCh <- newJobIn{
		ctx:    newJobCtx,
		cancel: newJobCancel,
		job:    j,
	}:
	}

	select {
	case <-newJobCtx.Done():
	case <-s.shutdownCtx.Done():
	}

	out := s.jobFromInternalJob(j)
	return &out, nil
}

func (s *scheduler) RemoveByTags(tags ...string) {
	select {
	case <-s.shutdownCtx.Done():
	case s.removeJobsByTagsCh <- tags:
	}
}

func (s *scheduler) RemoveJob(id uuid.UUID) error {
	j := requestJobCtx(s.shutdownCtx, id, s.jobOutRequestCh)
	if j == nil || j.id == uuid.Nil {
		return ErrJobNotFound
	}
	select {
	case <-s.shutdownCtx.Done():
	case s.removeJobCh <- id:
	}

	return nil
}

func (s *scheduler) Start() {
	select {
	case <-s.shutdownCtx.Done():
	case s.startCh <- struct{}{}:
		<-s.startedCh
	}
}

func (s *scheduler) StopJobs() error {
	select {
	case <-s.shutdownCtx.Done():
		return nil
	case s.stopCh <- struct{}{}:
	}

	t := time.NewTimer(s.exec.stopTimeout + 2*time.Second)
	select {
	case err := <-s.stopErrCh:
		t.Stop()
		return err
	case <-t.C:
		return ErrStopSchedulerTimedOut
	}
}

func (s *scheduler) Shutdown() error {
	s.shutdownCancel()

	t := time.NewTimer(s.exec.stopTimeout + 2*time.Second)
	select {
	case err := <-s.stopErrCh:
		t.Stop()
		return err
	case <-t.C:
		return ErrStopSchedulerTimedOut
	}
}

func (s *scheduler) Update(id uuid.UUID, jobDefinition JobDefinition, task Task, options ...JobOption) (Job, error) {
	return s.addOrUpdateJob(id, jobDefinition, task, options)
}
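
// A minimal sketch of updating an existing job in place, assuming gocron's
// DurationJob and NewTask helpers (the intervals and fn are illustrative):
//
//	j, _ := s.NewJob(gocron.DurationJob(time.Minute), gocron.NewTask(fn))
//	// later: same Job ID, new definition
//	j, _ = s.Update(j.ID(), gocron.DurationJob(5*time.Minute), gocron.NewTask(fn))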

func (s *scheduler) JobsWaitingInQueue() int {
	if s.exec.limitMode != nil && s.exec.limitMode.mode == LimitModeWait {
		return len(s.exec.limitMode.in)
	}
	return 0
}

// -----------------------------------------------
// -----------------------------------------------
// ------------- Scheduler Options ---------------
// -----------------------------------------------
// -----------------------------------------------

// SchedulerOption defines the function for setting
// options on the Scheduler.
type SchedulerOption func(*scheduler) error

// WithClock sets the clock used by the Scheduler
// to the clock provided. See https://github.com/jonboulle/clockwork
func WithClock(clock clockwork.Clock) SchedulerOption {
	return func(s *scheduler) error {
		if clock == nil {
			return ErrWithClockNil
		}
		s.exec.clock = clock
		return nil
	}
}

// WithDistributedElector sets the elector to be used by multiple
// Scheduler instances to determine who should be the leader.
// Only the leader runs jobs, while non-leaders wait and continue
// to check if a new leader has been elected.
func WithDistributedElector(elector Elector) SchedulerOption {
	return func(s *scheduler) error {
		if elector == nil {
			return ErrWithDistributedElectorNil
		}
		s.exec.elector = elector
		return nil
	}
}

// WithDistributedLocker sets the locker to be used by multiple
// Scheduler instances to ensure that only one instance of each
// job is run.
// To disable this global locker for specific jobs, see
// WithDisabledDistributedJobLocker.
func WithDistributedLocker(locker Locker) SchedulerOption {
	return func(s *scheduler) error {
		if locker == nil {
			return ErrWithDistributedLockerNil
		}
		s.exec.locker = locker
		return nil
	}
}
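
// A minimal sketch of wiring in a distributed locker, where myLocker is a
// hypothetical value implementing this package's Locker interface (for
// example, one of the separately published gocron lock implementations):
//
//	s, err := gocron.NewScheduler(
//		gocron.WithDistributedLocker(myLocker),
//	)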

// WithGlobalJobOptions sets JobOptions that will be applied to
// all jobs added to the scheduler. A JobOption set on the job
// itself overrides the same JobOption set globally.
func WithGlobalJobOptions(jobOptions ...JobOption) SchedulerOption {
	return func(s *scheduler) error {
		s.globalJobOptions = jobOptions
		return nil
	}
}

// LimitMode defines the modes used for handling jobs that reach
// the limit provided in WithLimitConcurrentJobs.
type LimitMode int

const (
	// LimitModeReschedule causes jobs reaching the limit set in
	// WithLimitConcurrentJobs or WithSingletonMode to be skipped
	// and rescheduled for the next run time rather than being
	// queued up to wait.
	LimitModeReschedule = 1

	// LimitModeWait causes jobs reaching the limit set in
	// WithLimitConcurrentJobs or WithSingletonMode to wait
	// in a queue until a slot becomes available to run.
	//
	// Note: this mode can produce unpredictable results as
	// job execution order isn't guaranteed. For example, a job that
	// executes frequently may pile up in the wait queue and be executed
	// many times back to back when the queue opens.
	//
	// Warning: do not use this mode if your jobs will continue to stack
	// up beyond the ability of the limit workers to keep up. An example of
	// what NOT to do: running a ten-second task every second with a
	// concurrency limit of one:
	//
	//	s, _ := gocron.NewScheduler(
	//		gocron.WithLimitConcurrentJobs(1, gocron.LimitModeWait),
	//	)
	//	s.NewJob(
	//		gocron.DurationJob(time.Second),
	//		gocron.NewTask(func() {
	//			time.Sleep(10 * time.Second)
	//		}),
	//	)
	LimitModeWait = 2
)

// WithLimitConcurrentJobs sets the limit and mode to be used by the
// Scheduler for limiting the number of jobs that may be running at
// a given time.
//
// Note: the limit mode selected for WithLimitConcurrentJobs takes initial
// precedence in the event you are also running a limit mode at the job level
// using WithSingletonMode.
//
// Warning: a single time-consuming job can dominate your limit in the event
// you are running both the scheduler limit WithLimitConcurrentJobs(1, LimitModeWait)
// and a job limit WithSingletonMode(LimitModeReschedule).
func WithLimitConcurrentJobs(limit uint, mode LimitMode) SchedulerOption {
	return func(s *scheduler) error {
		if limit == 0 {
			return ErrWithLimitConcurrentJobsZero
		}
		s.exec.limitMode = &limitModeConfig{
			mode:          mode,
			limit:         limit,
			in:            make(chan jobIn, 1000),
			singletonJobs: make(map[uuid.UUID]struct{}),
		}
		if mode == LimitModeReschedule {
			s.exec.limitMode.rescheduleLimiter = make(chan struct{}, limit)
		}
		return nil
	}
}
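
// A minimal sketch of capping concurrency at ten jobs, skipping and
// rescheduling any overflow (the limit value is illustrative):
//
//	s, err := gocron.NewScheduler(
//		gocron.WithLimitConcurrentJobs(10, gocron.LimitModeReschedule),
//	)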

// WithLocation sets the location (i.e. timezone) that the scheduler
// should operate within. In many systems time.Local is UTC.
// Default: time.Local
func WithLocation(location *time.Location) SchedulerOption {
	return func(s *scheduler) error {
		if location == nil {
			return ErrWithLocationNil
		}
		s.location = location
		return nil
	}
}
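
// A minimal sketch of scheduling in a fixed timezone rather than time.Local
// (the zone name is illustrative):
//
//	loc, err := time.LoadLocation("America/New_York")
//	if err != nil {
//		// handle the error
//	}
//	s, err := gocron.NewScheduler(gocron.WithLocation(loc))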

// WithLogger sets the logger to be used by the Scheduler.
func WithLogger(logger Logger) SchedulerOption {
	return func(s *scheduler) error {
		if logger == nil {
			return ErrWithLoggerNil
		}
		s.logger = logger
		s.exec.logger = logger
		return nil
	}
}

// WithStopTimeout sets the amount of time the Scheduler should
// wait gracefully for jobs to complete before returning when
// StopJobs() or Shutdown() is called.
// Default: 10 * time.Second
func WithStopTimeout(timeout time.Duration) SchedulerOption {
	return func(s *scheduler) error {
		if timeout <= 0 {
			return ErrWithStopTimeoutZeroOrNegative
		}
		s.exec.stopTimeout = timeout
		return nil
	}
}

// WithMonitor sets the metrics provider to be used by the Scheduler.
func WithMonitor(monitor Monitor) SchedulerOption {
	return func(s *scheduler) error {
		if monitor == nil {
			return ErrWithMonitorNil
		}
		s.exec.monitor = monitor
		return nil
	}
}

// WithMonitorStatus sets the metrics provider, which also records job
// run status, to be used by the Scheduler.
func WithMonitorStatus(monitor MonitorStatus) SchedulerOption {
	return func(s *scheduler) error {
		if monitor == nil {
			return ErrWithMonitorNil
		}
		s.exec.monitorStatus = monitor
		return nil
	}
}