package cron
import (
)
// Cron holds the scheduler state: the registered entries, the job-wrapping
// chain, the channels used to communicate with the scheduler loop, and the
// running flag together with the mutex that guards it.
type Cron struct {
// entries is the list of scheduled entries; it is appended to directly while
// the scheduler is not running and sorted with byTime by the scheduler loop.
entries []*Entry
// chain wraps each submitted job (via chain.Then) to produce Entry.WrappedJob.
chain Chain
// stop receives a value when the scheduler loop should return.
stop chan struct{}
// add delivers new entries to the scheduler loop while it is running.
add chan *Entry
// remove delivers entry IDs to delete to the scheduler loop while it is running.
remove chan EntryID
// snapshot carries a reply channel on which the scheduler loop sends a copy
// of the current entries.
snapshot chan chan []Entry
// running records whether the scheduler loop has been started; it is read
// and written with runningMu held.
running bool
// logger receives Info messages about scheduler activity.
logger Logger
// runningMu is locked by the methods that read or change running.
runningMu sync.Mutex
// location is the time zone that current and wake-up times are converted to.
location *time.Location
// parser turns a spec string into a Schedule.
parser ScheduleParser
// nextID is incremented for every scheduled entry and used as its ID.
nextID EntryID
// jobWaiter counts the job goroutines currently in flight.
jobWaiter sync.WaitGroup
}
// ScheduleParser is the interface used to turn a spec string into a Schedule.
type ScheduleParser interface {
// Parse converts spec into a Schedule, or returns an error.
Parse(spec string) (Schedule, error)
}
// Job is the interface for submitted work; the scheduler calls Run in a
// separate goroutine each time the job is started.
type Job interface {
// Run executes the job.
Run()
}
// Schedule describes when a job should run.
type Schedule interface {
// Next returns the time the job should next be activated, given a reference
// time. The scheduler loop treats a zero time.Time result specially (see its
// IsZero checks): such an entry is not started.
// NOTE(review): presumably the result must be later than the argument — confirm
// against the Schedule implementations.
Next(time.Time) time.Time
}
// EntryID identifies an entry within a Cron. IDs are taken from Cron.nextID
// after it is incremented, so the zero value is never assigned to an entry.
type EntryID int
// Entry pairs a Schedule with the job to run on that schedule, plus the
// bookkeeping the scheduler loop maintains for it.
type Entry struct {
// ID is the identifier assigned when the entry was scheduled.
ID EntryID
// Schedule determines when the job is run.
Schedule Schedule
// Next is the next activation time, as computed by Schedule.Next. The
// scheduler loop does not start entries whose Next is the zero time.
Next time.Time
// Prev is set to the previous value of Next each time the job is started;
// it stays zero until the first run.
Prev time.Time
// WrappedJob is the job after being passed through the Cron's chain; this
// is what the scheduler loop actually starts.
WrappedJob Job
// Job is the job exactly as it was submitted.
Job Job
}
// Reports whether the entry's ID is non-zero, i.e. whether it is a real entry
// rather than the zero Entry value.
// NOTE(review): the receiver and method names are missing from this text
// (presumably Valid) — restore before compiling.
func ( Entry) () bool { return .ID != 0 }
// byTime is a slice of entries that the scheduler loop passes to sort.Sort to
// order entries by their Next time.
type byTime []*Entry
// Returns a length; presumably the Len method of sort.Interface returning the
// number of entries.
// NOTE(review): receiver name, method name and the len argument are missing
// from this text — restore before compiling.
func ( byTime) () int { return len() }
// Performs a two-element parallel assignment; presumably the Swap method of
// sort.Interface exchanging the entries at the two given indices.
// NOTE(review): receiver, method, parameter names and index expressions are
// missing from this text — restore before compiling.
func ( byTime) (, int) { [], [] = [], [] }
// Compares two entries by their Next time; presumably the Less method of
// sort.Interface.
// NOTE(review): receiver, method, parameter names and index expressions are
// missing from this text. Presumably the first check inspects the first index
// and the second check the second index, which would sort entries with a zero
// Next time to the end — confirm when restoring the identifiers.
func ( byTime) (, int) bool {
// A zero Next on (presumably) the first element: it is never "less".
if [].Next.IsZero() {
return false
}
// A zero Next on (presumably) the second element: the other sorts before it.
if [].Next.IsZero() {
return true
}
// Both non-zero: order chronologically.
return [].Next.Before([].Next)
}
// Constructs a *Cron with default settings and then applies the supplied
// options in order. Defaults visible here: no entries, an empty chain from
// NewChain(), unbuffered add/stop/snapshot/remove channels, not running,
// DefaultLogger, time.Local as the location and standardParser as the parser.
// NOTE(review): the function name (presumably New), the variadic parameter name
// and the local variable names are missing from this text — restore before
// compiling.
func ( ...Option) *Cron {
:= &Cron{
entries: nil,
chain: NewChain(),
add: make(chan *Entry),
stop: make(chan struct{}),
snapshot: make(chan chan []Entry),
remove: make(chan EntryID),
running: false,
runningMu: sync.Mutex{},
logger: DefaultLogger,
location: time.Local,
parser: standardParser,
}
// Apply each option; presumably every option is called with the new Cron.
for , := range {
()
}
return
}
// FuncJob is a function type with no parameters or results; it is used to
// adapt a plain func() into a job when adding a function to a Cron.
type FuncJob func()
// Method on FuncJob whose body is a single call; presumably Run, invoking the
// underlying function so that FuncJob satisfies Job.
// NOTE(review): receiver name, method name and the callee are missing from
// this text — restore before compiling.
func ( FuncJob) () { () }
// Adds a plain function to the Cron: it takes a string and a func(), converts
// the function to a FuncJob and delegates to AddJob, returning AddJob's entry
// ID and error unchanged.
// NOTE(review): receiver, method (presumably AddFunc) and parameter names are
// missing from this text; presumably the string is the schedule spec.
func ( *Cron) ( string, func()) (EntryID, error) {
return .AddJob(, FuncJob())
}
// Adds a Job to the Cron: it parses a string with the Cron's parser and, on
// success, registers the job through Schedule and returns the new entry ID.
// If parsing fails it returns ID 0 together with an error.
// NOTE(review): receiver, method (presumably AddJob), parameter and local names
// are missing from this text; presumably the string is the schedule spec and
// the returned error is the parser's.
func ( *Cron) ( string, Job) (EntryID, error) {
, := .parser.Parse()
if != nil {
return 0,
}
return .Schedule(, ), nil
}
// Registers a job with a given Schedule and returns the ID of the new entry.
// The job is wrapped with the Cron's chain and also stored unwrapped.
// runningMu is held for the whole call.
// NOTE(review): receiver, method (presumably Schedule), parameter and local
// names are missing from this text — restore before compiling.
func ( *Cron) ( Schedule, Job) EntryID {
.runningMu.Lock()
defer .runningMu.Unlock()
// IDs are allocated by incrementing nextID, so the first entry gets ID 1.
.nextID++
:= &Entry{
ID: .nextID,
Schedule: ,
WrappedJob: .chain.Then(),
Job: ,
}
// Not running: the entries slice can be modified directly. Running: hand the
// entry to the scheduler loop over the unbuffered add channel, which blocks
// until the loop receives it.
if !.running {
.entries = append(.entries, )
} else {
.add <-
}
return .ID
}
// Returns a copy of the Cron's entries as a slice of Entry values.
// runningMu is held for the whole call.
// NOTE(review): receiver, method (presumably Entries) and local names are
// missing from this text — restore before compiling.
func ( *Cron) () []Entry {
.runningMu.Lock()
defer .runningMu.Unlock()
// While running, ask the scheduler loop for the snapshot: send it a reply
// channel (buffered with capacity 1) and wait for the result.
if .running {
:= make(chan []Entry, 1)
.snapshot <-
return <-
}
// Not running: build the snapshot directly.
return .entrySnapshot()
}
// Returns the Cron's configured time zone location.
// NOTE(review): receiver and method names (presumably Location) are missing
// from this text — restore before compiling.
func ( *Cron) () *time.Location {
return .location
}
// Looks up an entry by ID: it scans the result of Entries() and returns a
// matching entry, or the zero Entry value if none matches.
// NOTE(review): receiver, method (presumably Entry), parameter and loop
// variable names are missing from this text; presumably the comparison is
// between the given ID and each entry's ID.
func ( *Cron) ( EntryID) Entry {
for , := range .Entries() {
if == .ID {
return
}
}
return Entry{}
}
// Removes an entry identified by an EntryID. runningMu is held for the whole
// call.
// NOTE(review): receiver, method (presumably Remove) and parameter names are
// missing from this text — restore before compiling.
func ( *Cron) ( EntryID) {
.runningMu.Lock()
defer .runningMu.Unlock()
// While running, the removal is sent to the scheduler loop over the unbuffered
// remove channel; otherwise the entries slice is filtered directly.
if .running {
.remove <-
} else {
.removeEntry()
}
}
// Starts the scheduler loop in a new goroutine. It does nothing if the Cron is
// already marked as running.
// NOTE(review): receiver and method names (presumably Start) are missing from
// this text — restore before compiling.
func ( *Cron) () {
.runningMu.Lock()
defer .runningMu.Unlock()
if .running {
return
}
.running = true
go .run()
}
// Runs the scheduler loop in the calling goroutine, returning only when the
// loop returns. It returns immediately if the Cron is already marked as
// running.
// NOTE(review): receiver and method names (presumably Run) are missing from
// this text — restore before compiling.
func ( *Cron) () {
.runningMu.Lock()
if .running {
.runningMu.Unlock()
return
}
.running = true
// The mutex is released before entering the loop so that other methods,
// which lock runningMu, are not blocked for the lifetime of the loop.
.runningMu.Unlock()
.run()
}
// Scheduler loop. It computes the initial Next time for every entry, then
// repeatedly sorts the entries, arms a timer for the earliest one, and waits
// for either the timer or a request on the add, snapshot, stop or remove
// channels. It returns only when a value arrives on the stop channel.
// NOTE(review): receiver, method (presumably run), local variable names and
// several call arguments are missing from this text — restore before compiling.
func ( *Cron) () {
.logger.Info("start")
// Compute each entry's first activation time; presumably relative to the
// current time captured here.
:= .now()
for , := range .entries {
.Next = .Schedule.Next()
.logger.Info("schedule", "now", , "entry", .ID, "next", .Next)
}
for {
// Re-sort on every pass so the earliest entry is at index 0.
sort.Sort(byTime(.entries))
var *time.Timer
if len(.entries) == 0 || .entries[0].Next.IsZero() {
// Nothing is due: use a very long timer (100000 hours) so the select below
// still services the request channels.
= time.NewTimer(100000 * time.Hour)
} else {
// Presumably the duration from the current time until the earliest Next.
= time.NewTimer(.entries[0].Next.Sub())
}
// Inner loop: only the snapshot case continues it (keeping the same timer);
// every other non-returning case falls through to the break below, which
// goes back to the outer loop to re-sort and re-arm the timer.
for {
select {
case = <-.C:
// Timer fired: convert the wake-up time to the configured location.
= .In(.location)
.logger.Info("wake", "now", )
// Walk the entries in sorted order and stop at the first one that is not
// due yet or has a zero Next.
for , := range .entries {
if .Next.After() || .Next.IsZero() {
break
}
// Start the wrapped job, then record the run time and compute the next one.
.startJob(.WrappedJob)
.Prev = .Next
.Next = .Schedule.Next()
.logger.Info("run", "now", , "entry", .ID, "next", .Next)
}
case := <-.add:
// New entry while running: stop the timer, compute the entry's Next time and
// append it.
.Stop()
= .now()
.Next = .Schedule.Next()
.entries = append(.entries, )
.logger.Info("added", "now", , "entry", .ID, "next", .Next)
case := <-.snapshot:
// Snapshot request: reply with a copy of the entries and keep waiting on the
// same timer.
<- .entrySnapshot()
continue
case <-.stop:
// Stop request: stop the timer and leave the loop for good.
.Stop()
.logger.Info("stop")
return
case := <-.remove:
// Removal while running: stop the timer and drop the entry.
.Stop()
= .now()
.removeEntry()
.logger.Info("removed", "entry", )
}
break
}
}
}
// Runs a Job in its own goroutine and tracks it in jobWaiter so that callers
// can wait for in-flight jobs to finish.
// NOTE(review): receiver, method (presumably startJob) and parameter names are
// missing from this text — restore before compiling.
func ( *Cron) ( Job) {
// Add is called before the goroutine starts so a concurrent Wait cannot miss
// this job.
.jobWaiter.Add(1)
go func() {
defer .jobWaiter.Done()
.Run()
}()
}
// Returns the current time converted to the Cron's configured location.
// NOTE(review): receiver and method names (presumably now) are missing from
// this text — restore before compiling.
func ( *Cron) () time.Time {
return time.Now().In(.location)
}
// Stops the scheduler loop if it is running and returns a context.Context.
// A goroutine is started that waits on jobWaiter and then makes a call;
// presumably that call is the cancel function, so the returned context is
// done once all in-flight jobs have finished.
// NOTE(review): receiver, method (presumably Stop) and local names are missing
// from this text — confirm the cancel/context pairing when restoring them.
func ( *Cron) () context.Context {
.runningMu.Lock()
defer .runningMu.Unlock()
// The send on the unbuffered stop channel blocks until the loop receives it.
if .running {
.stop <- struct{}{}
.running = false
}
, := context.WithCancel(context.Background())
go func() {
.jobWaiter.Wait()
()
}()
return
}
// Builds a slice of Entry values with the same length as the Cron's entries by
// dereferencing entry pointers, so the result holds copies rather than the
// scheduler's own *Entry values.
// NOTE(review): receiver, method (presumably entrySnapshot), local names and
// index expressions are missing from this text — restore before compiling.
func ( *Cron) () []Entry {
var = make([]Entry, len(.entries))
for , := range .entries {
[] = *
}
return
}
// Rebuilds the Cron's entries slice, keeping only entries that pass an ID
// inequality check; presumably those whose ID differs from the given EntryID.
// If nothing is kept, entries becomes nil.
// NOTE(review): receiver, method (presumably removeEntry), parameter and local
// names are missing from this text — restore before compiling.
func ( *Cron) ( EntryID) {
var []*Entry
for , := range .entries {
if .ID != {
= append(, )
}
}
.entries =
}