package supervisorimport ()const (restartInitialWait = time.SecondrestartLoopInterval = time.MinuterestartLoopWait = 0)// restartInitial restores the last state from the underlying table.func ( *Supervisor[, ]) ( context.Context) { _ = .restart(, restartInitialWait) // TODO: log error}// spawnRestartLoop spawns a restartLoop using the given context and returns// a function that stops restart loop and waits completion.func ( *Supervisor[, ]) ( context.Context) func() { , := context.WithCancel()varsync.WaitGroup .Add(1)gofunc() {defer .Done()defer () .restartLoop() }()returnfunc() { () .Wait() }}// restartLoop runs a loop that calls restart.func ( *Supervisor[, ]) ( context.Context) {const = restartLoopInterval := .clock.Timer()defer .Stop()for {select {case<-.Done():returncase<-.C(): } _ = .restart(, restartLoopWait) // TODO: log error .Reset() }}// restart starts processes from the table that are not currently running.//// If wait duration is not zero, it waits until background startProcess calls// complete. This allows ensuring that we restore at least partial state in// restoreInitial before invoking Supervisor’s Run callback.func ( *Supervisor[, ]) ( context.Context, time.Duration) error { , := .table.Iter()if != nil {returnfmt.Errorf("create iterator: %w", ) }deferfunc() {if := .Close(); != nil { _ = // TODO: log error } }()var , sync.WaitGroupvar , chanstruct{}for .Next() { , , := .Get()if != nil { _ = // TODO: log errorcontinue } := .restartProcessInBackground(, , )if > 0 {// Note that we cannot use timer.C here since there are multiple // consumers (each process we start) and timer sends the value // on channel only once (and never closes it).if == nil { = make(chanstruct{}) = make(chanstruct{}) .Add(1)gofunc() {defer .Done() := .clock.Timer()defer .Stop()select {case<-.Done():returncase<-.C():close()case<-: } }()defer .Wait() } .Add(1)gofunc() {defer .Done()select {case<-.Done():case<-:case<-: } }() } }if := .Err(); != nil { _ = // TODO: log error }// Do not wait for timer if all goroutines under wg have finished. Note // that this must be run after the loop since we cannot use wg.Wait // concurrently with wg.Add.if != nil { .Wait()close() }returnnil}// restartProcessInBackground starts the process for the given key in the// background. It returns a channel that is closed when startProcess call// completes.func ( *Supervisor[, ]) ( context.Context, , ) <-chanstruct{} { := make(chanstruct{}) .wg.Add(1)gofunc() {defer .wg.Done()deferclose() _, _ = .startProcess(, , ) // TODO: log error }()return}
The pages are generated with Goldsv0.4.9. (GOOS=linux GOARCH=amd64)