Source File
supervisor.go
Belonging Package
go.pact.im/x/supervisor
// Package supervisor provides a supervisor implementation for starting,
// stopping, and monitoring its child processes.
package supervisor
import (
)
// Supervisor is responsible for starting, stopping, and monitoring its child
// processes.
//
// Processes are identified by a comparable key and are looked up in a Table.
// New processes can only be started while the Run method is executing; at
// other times startProcess and startProcessForKey return ErrNotRunning.
type Supervisor[ comparable, process.Runnable] struct {
// table is used to look up the process.Runnable for a key.
table Table[, ]
// clock is taken from Options in NewSupervisor.
clock *clock.Clock
// processes is a map of managed processes. It is used to track process
// state and allows returning an ErrProcessExists error to guarantee
// that at most one process is active per key. A nil value is stored as a
// placeholder that reserves the key while its process is being started.
processes syncx.Map[, *managedProcess[]]
// runLock ensures that at most one Run method is executing at a time.
runLock syncx.Lock
// startMu guards startProcess and startProcessForKey calls when
// Supervisor is not running. It also allows waiting for the ongoing
// calls to complete on shutdown.
startMu sync.RWMutex
// start reports whether startProcess and startProcessForKey calls are
// currently allowed. It is set and cleared by Run under startMu.
start bool
// parent is the parent context for all processes. It is set to the
// context passed to the Run method and is published via startMu: Run
// writes it only while start is false, so callers that hold
// startMu.RLock and observe start == true see the current value.
parent context.Context
// wg is the wait group for running processes and watchdogs. It also
// tracks the stop goroutines spawned by stopAll. It is indirectly
// guarded by startMu and start.
wg sync.WaitGroup
}
// managedProcess contains a process.Process and associated process.Runnable
// managed by Supervisor.
//
// Supervisor calls Start, Stop, Done and Err on managedProcess values.
// NOTE(review): these are presumably promoted from the embedded
// *process.Process — confirm against the process package.
type managedProcess[ process.Runnable] struct {
*process.Process
// proc is the underlying process instance with parametrized P type.
proc
// stopped is used by Supervisor to remove the process instance from the
// internal map at most once. Whichever of stopProcess or watchdog swaps
// it to true first is responsible for deleting the map entry.
stopped atomic.Bool
}
// NewSupervisor returns a new Supervisor instance. The given table is used to
// look up managed processes and periodically restart failed units (or processes
// that were added externally). Managed processes are uniquely identifiable by
// key.
//
// A managed process may remove itself from the Supervisor by deleting the
// associated entry from the table before terminating. Likewise, to stop a
// process, it must be removed from the table prior to the Stop call. That is,
// processes must be aware of being managed, and the removal is tightly coupled
// with the table.
//
// As a rule of thumb, to keep the underlying table consistent, processes should
// not be re-added to the table after being removed from it. It is possible
// to implement re-adding on top of the Supervisor, but that requires handling
// possible orderings of table removal, addition, re-addition and process
// startup, shutdown and self-removal (or a subset of these operations depending
// on the use cases).
//
// The returned Supervisor does not start any processes until Run is called.
func [ comparable, process.Runnable]( Table[, ], Options) *Supervisor[, ] {
// NOTE(review): presumably fills in defaults for unset Options fields
// before they are read below — confirm against Options.setDefaults.
.setDefaults()
return &Supervisor[, ]{
clock: .Clock,
table: ,
// runLock serializes Run calls.
runLock: syncx.NewLock(),
}
}
// Run starts the supervisor and executes callback on successful initialization.
//
// Run first acquires runLock, so at most one Run executes at a time. If
// acquiring it fails, that error is returned unchanged. Run then does the
// following:
//   - allows startProcess and startProcessForKey calls;
//   - restores the last state from the storage;
//   - spawns the restart loop;
//   - invokes the callback.
//
// After the callback returns, Run shuts down in this order:
//   - waits for the restart loop;
//   - forbids further process starts;
//   - stops all managed processes;
//   - waits for them, their watchdogs and the stop goroutines to finish.
//
// It returns the error produced by the callback.
func ( *Supervisor[, ]) ( context.Context, process.Callback) error {
if := .runLock.Acquire(); != nil {
return
}
defer .runLock.Release()
// parent is written before start is set to true below; the startMu
// Unlock publishes it to startProcess* callers that take RLock.
.parent =
// This runs after all processes have stopped and before runLock is
// released, because deferred calls run in LIFO order.
defer func() { .parent = nil }()
// Allow startProcess and startProcessForKey calls.
.startMu.Lock()
.start = true
.startMu.Unlock()
// Restore last state from the storage.
.restartInitial()
// Run restartLoop to keep the current state up-to-date with changes in
// the storage.
:= .spawnRestartLoop()
// Invoke callback.
:= ()
// Wait for restart loop since it uses wg to spawn background tasks.
()
// Block until all ongoing startProcess and startProcessForKey calls are
// complete and forbid subsequent calls.
.startMu.Lock()
.start = false
.startMu.Unlock()
// Stop all processes and wait for shutdown completion. At this point
// we are guaranteed that new processes would not be started, so no
// further wg.Add can race with wg.Wait.
.stopAll()
.wg.Wait()
return
}
// startProcessForKey starts the process for the given key, using the
// process.Runnable obtained from the table for that key.
//
// It returns an error in these cases:
//   - ErrNotRunning if Supervisor’s Run method is not currently running;
//   - ErrProcessExists if an entry for the key already exists;
//   - a wrapped error ("get process from table") if the table lookup fails;
//   - a wrapped error ("start process") if the process fails to start.
func ( *Supervisor[, ]) ( context.Context, ) (*managedProcess[], error) {
// Hold the read lock for the whole call so that Run cannot flip start to
// false and begin shutdown while this process is being registered.
.startMu.RLock()
defer .startMu.RUnlock()
if !.start {
return nil, ErrNotRunning
}
// Reserve the key with a nil placeholder. Any existing entry (a running
// process or another in-flight start) means the key is taken.
if , := .processes.LoadOrStore(, nil); {
return nil, ErrProcessExists
}
, := .table.Get(, )
if != nil {
// Release the reservation made above.
.processes.Delete()
return nil, fmt.Errorf("get process from table: %w", )
}
return .startProcessUnlocked(, , )
}
// startProcess starts the process for the given key. Unlike startProcessForKey,
// it uses the given r Runnable instance instead of getting it from the table.
//
// It returns an error in these cases:
//   - ErrNotRunning if Run is not currently executing;
//   - ErrProcessExists if an entry for the key already exists;
//   - a wrapped error ("start process") if the process fails to start.
func ( *Supervisor[, ]) ( context.Context, , ) (*managedProcess[], error) {
// Hold the read lock for the whole call so that Run cannot flip start to
// false and begin shutdown while this process is being registered.
.startMu.RLock()
defer .startMu.RUnlock()
if !.start {
return nil, ErrNotRunning
}
// Reserve the key with a nil placeholder. Any existing entry (a running
// process or another in-flight start) means the key is taken.
if , := .processes.LoadOrStore(, nil); {
return nil, ErrProcessExists
}
return .startProcessUnlocked(, , )
}
// startProcessUnlocked starts the given process assuming that the start lock
// was acquired and an entry in the processes map exists. It removes this entry
// on error.
//
// The process is created with the Supervisor's parent context (the context
// passed to Run). The managedProcess replaces the placeholder entry before
// Start is called, so stopProcess can find it while it is starting. On
// success, a watchdog goroutine removes the entry once the process
// terminates.
func ( *Supervisor[, ]) ( context.Context, , ) (*managedProcess[], error) {
:= &managedProcess[]{
Process: process.NewProcess(.parent, ),
proc: ,
}
.processes.Store(, )
if := .Start(); != nil {
// NOTE(review): this Delete does not check the stopped flag. If
// stopProcess concurrently removed this entry and a new process was
// registered for the same key, this could remove the newer entry —
// confirm whether that ordering is possible.
.processes.Delete()
return nil, fmt.Errorf("start process: %w", )
}
// wg.Add is safe here: the caller holds startMu.RLock, and Run only
// calls wg.Wait after acquiring startMu.Lock and clearing start.
.wg.Add(1)
go .watchdog(, )
return , nil
}
// watchdog waits for process shutdown and removes it from the processes map on
// such an event. It runs in its own goroutine, tracked by wg, and marks the wg
// entry done when it returns.
func ( *Supervisor[, ]) ( , *managedProcess[]) {
defer .wg.Done()
<-.Done()
// Remove process from the processes map unless we have been stopped
// externally. In that case stopProcess has already deleted the entry.
if .stopped.Swap(true) {
return
}
.processes.Delete()
_ = .Err() // TODO: log error
}
// stopProcess stops the process with the given key.
//
// It returns ErrProcessNotFound in these cases:
//   - the process does not exist;
//   - the key only holds a placeholder because the process is still being
//     started;
//   - the process was already stopped or has exited.
//
// Otherwise it removes the entry from the processes map and returns the
// error from Stop.
func ( *Supervisor[, ]) ( context.Context, ) error {
, := .processes.Load()
// A nil value is the placeholder stored while a start is in progress.
if ! || == nil {
return ErrProcessNotFound
}
// Something else has already stopped the given process. Do nothing.
if .stopped.Swap(true) {
return ErrProcessNotFound
}
.processes.Delete()
return .Stop()
}
// stopAll stops all the processes in the underlying map. It does not wait for
// processes to complete the termination.
//
// Each key is stopped in its own goroutine, tracked by wg, so the caller can
// wait for all of them with wg.Wait. Errors from stopProcess are ignored,
// including ErrProcessNotFound for placeholder or already-stopped entries,
// so this is a best-effort operation.
func ( *Supervisor[, ]) ( context.Context) {
.processes.Range(func( , *managedProcess[]) bool {
.wg.Add(1)
go func() {
defer .wg.Done()
_ = .stopProcess(, )
}()
// Continue iterating over all entries.
return true
})
}
The pages are generated with Golds v0.4.9. (GOOS=linux GOARCH=amd64)