Source File
supervisor.go
Belonging Package
go.pact.im/x/supervisor
// Package supervisor provides a supervisor implementation for starting,// stopping, and monitoring its child processes.package supervisorimport ()// Supervisor is responsible for starting, stopping, and monitoring its child// processes.type Supervisor[ comparable, process.Runner] struct {table Table[, ]clock *clock.Clock// processes is a map of managed processes. It is used to track process// state and allows returning an ErrProcessExists error to guarantee// that at most one process is active per key.processes typedMap[, *managedProcess[]]// runLock ensures that at most one Run method is executing at a time.runLock chanLock// startMu guards startProcess and startProcessForKey calls when// Supervisor is not running. It also allows waiting for the ongoing// calls to complete on shutdown.startMu sync.RWMutexstart bool// parent is the parent context for all processes. It is set to the// context passed to Run method and is guarded by startMu.parent context.Context// wg is the wait group for running processes and watchdogs. It is// indirectly guarded by startMu and start.wg sync.WaitGroup}// managedProcess contains a process.Process and associated [process.Runner]// managed by Supervisor.type managedProcess[ process.Runner] struct {*process.Process// runner is the underlying process entrypoint with parametrized type P.runner// stopped is used by Supervisor to remove the process instance from// internal map at most once.stopped atomic.Bool}// NewSupervisor returns a new Supervisor instance. The given table is used to// look up managed processes and periodically restart failed units (or processes// that were added externally). Managed processes are uniquely identifiable by// key.//// A managed process may remove itself from the Supervisor by deleting the// associated entry from the table before terminating. Likewise, to stop a// process, it must be removed from the table prior to Stop call. 
That is,// processes must be aware of being managed and the removal is tightly coupled// with the table.//// As a rule of thumb, to keep the underlying table consistent, processes should// not be re-added to table after being removed from the table. It is possible// to implement re-adding on top of the Supervisor but that requires handling// possible orderings of table removal, addition, re-addition and process// startup, shutdown and self-removal (or a subset of these operations depending// on the use cases).func [ comparable, process.Runner]( Table[, ], Options) *Supervisor[, ] {.setDefaults()return &Supervisor[, ]{clock: .Clock,table: ,runLock: newChanLock(),}}// Run starts the supervisor and executes callback on successful initialization.func ( *Supervisor[, ]) ( context.Context, process.Callback) error {if := .runLock.Acquire(); != nil {return}defer .runLock.Release().parent =defer func() { .parent = nil }()// Allow startProcess and startProcessForKey calls..startMu.Lock().start = true.startMu.Unlock()// Restore last state from the storage..restartInitial()// Run restartLoop to keep the current state up-to-date with changes in// the storage.:= .spawnRestartLoop()// Invoke callback.:= ()// Wait for restart loop since it uses wg to spawn background tasks.()// Block until all ongoing startProcess and startProcessForKey calls are// complete and forbid subsequent calls..startMu.Lock().start = false.startMu.Unlock()// Stop all processes and wait for shutdown completion. At this point// we are guaranteed that new processes would not be started..stopAll().wg.Wait()return}// startProcessForKey starts the process for the given key. 
An error is returned// if Supervisor’s Run method is not currently running.func ( *Supervisor[, ]) ( context.Context, ) (*managedProcess[], error) {.startMu.RLock()defer .startMu.RUnlock()if !.start {return nil, ErrNotRunning}if , := .processes.LoadOrStore(, nil); {return nil, ErrProcessExists}, := .table.Get(, )if != nil {.processes.Delete()return nil, fmt.Errorf("get process from table: %w", )}return .startProcessUnlocked(, , )}// startProcess starts the process for the given key. Unlike startProcessForKey,// it uses the given r [process.Runner] instance instead of getting it from the// table.func ( *Supervisor[, ]) ( context.Context, , ) (*managedProcess[], error) {.startMu.RLock()defer .startMu.RUnlock()if !.start {return nil, ErrNotRunning}if , := .processes.LoadOrStore(, nil); {return nil, ErrProcessExists}return .startProcessUnlocked(, , )}// startProcessUnlocked starts the given process assuming that the start lock// was acquired and an entry in the processes map exists. It removes this entry// on error.func ( *Supervisor[, ]) ( context.Context, , ) (*managedProcess[], error) {:= &managedProcess[]{Process: process.NewProcess(.parent, ),runner: ,}.processes.Store(, )if := .Start(); != nil {.processes.Delete()return nil, fmt.Errorf("start process: %w", )}.wg.Add(1)go .watchdog(, )return , nil}// watchdog waits for process shutdown and removes it from processes map on such// event.func ( *Supervisor[, ]) ( , *managedProcess[]) {defer .wg.Done()<-.Done()// Remove process from the processes map unless we have been stopped// externally.if .stopped.Swap(true) {return}.processes.Delete()_ = .Err() // TODO: log error}// stopProcess stops the process with the given key. If the process does not// exist, it returns ErrProcessNotFound.func ( *Supervisor[, ]) ( context.Context, ) error {, := .processes.Load()if ! || == nil {return ErrProcessNotFound}// Something else has already stopped the given process. 
Do nothing.if .stopped.Swap(true) {return ErrProcessNotFound}.processes.Delete()return .Stop()}// stopAll stops all the processes in the underlying map. It does not wait for// processes to complete the termination.func ( *Supervisor[, ]) ( context.Context) {.processes.Range(func( , *managedProcess[]) bool {.wg.Add(1)go func() {defer .wg.Done()_ = .stopProcess(, )}()return true})}
The pages are generated with Golds v0.7.6. (GOOS=linux GOARCH=amd64)