// Package supervisor provides a supervisor implementation for starting,
// stopping, and monitoring its child processes.
package supervisor import ( ) // Supervisor is responsible for starting, stopping, and monitoring its child // processes. type Supervisor[ comparable, process.Runnable] struct { table Table[, ] clock *clock.Clock // processes is a map of managed processes. It is used to track process // state and allows returning an ErrProcessExists error to guarantee // that at most one processes is active per key. processes syncx.Map[, *managedProcess[]] // runLock ensures that at most one Run method is executing at a time. runLock syncx.Lock // startMu guards startProcess and startProcessForKey calls when // Supervisor is not running. It also allows waiting for the ongoing // calls to complete on shutdown. startMu sync.RWMutex start bool // parent is the parent context for all processes. It is set to the // context passed to Run method and is guarded by startMu. parent context.Context // wg is the wait group for running processes and watchdogs. It is // indirectly guarded by startMu and start. wg sync.WaitGroup } // managedProcess contains a process.Process and associated process.Runnable // managed by Supervisor. type managedProcess[ process.Runnable] struct { *process.Process // proc is the underlying process instance with parametrized P type. proc // stopped is used by Supervisor to remove the process instance from // internal map at most once. stopped atomic.Bool } // NewSupervisor returns a new Supervisor instance. The given table is used to // lookup managed processes and periodically restart failed units (or processes // that were added externally). Managed processes are uniquely identifiable by // key. // // A managed process may remove itself from the Supervisor by deleting the // associated entry from the table before terminating. Likewise, to stop a // process, it must be removed from the table prior to Stop call. That is, // processes must be aware of being managed and the removal is tighly coupled // with the table. 
// // As a rule of thumb, to keep the underlying table consistent, processes should // not be re-added to table after being removed from the table. It is possible // to implement re-adding on top of the Supervisor but that requires handling // possible orderings of table removal, addition, re-addition and process // startup, shutdown and self-removal (or a subset of these operations depending // on the use cases). func [ comparable, process.Runnable]( Table[, ], Options) *Supervisor[, ] { .setDefaults() return &Supervisor[, ]{ clock: .Clock, table: , runLock: syncx.NewLock(), } } // Run starts the supervisor and executes callback on successful initialization. func ( *Supervisor[, ]) ( context.Context, process.Callback) error { if := .runLock.Acquire(); != nil { return } defer .runLock.Release() .parent = defer func() { .parent = nil }() // Allow startProcess and startProcessForKey calls. .startMu.Lock() .start = true .startMu.Unlock() // Restore last state from the storage. .restartInitial() // Run restartLoop to keep the current state up-to-date with changes in // the storage. := .spawnRestartLoop() // Invoke callback. := () // Wait for restart loop since it uses wg to spawn background tasks. () // Block until all ongoing startProcess and startProcessForKey calls are // complete and forbid subsequent calls. .startMu.Lock() .start = false .startMu.Unlock() // Stop all processes and wait for shutdown completion. At this point // we are guaranteed that new processes would not be started. .stopAll() .wg.Wait() return } // startProcessForKey starts the process for the given key. An error is returned // if Supervisor’s Run method is not currently running. 
func ( *Supervisor[, ]) ( context.Context, ) (*managedProcess[], error) { .startMu.RLock() defer .startMu.RUnlock() if !.start { return nil, ErrNotRunning } if , := .processes.LoadOrStore(, nil); { return nil, ErrProcessExists } , := .table.Get(, ) if != nil { .processes.Delete() return nil, fmt.Errorf("get process from table: %w", ) } return .startProcessUnlocked(, , ) } // startProcess starts the process for the given key. Unlike startProcessForKey, // it uses the given r Runnable instance instead of getting it from the table. func ( *Supervisor[, ]) ( context.Context, , ) (*managedProcess[], error) { .startMu.RLock() defer .startMu.RUnlock() if !.start { return nil, ErrNotRunning } if , := .processes.LoadOrStore(, nil); { return nil, ErrProcessExists } return .startProcessUnlocked(, , ) } // startProcessUnlocked starts the given process assuming that the start lock // was acquired and an entry in the processes map exists. It removes this entry // on error. func ( *Supervisor[, ]) ( context.Context, , ) (*managedProcess[], error) { := &managedProcess[]{ Process: process.NewProcess(.parent, ), proc: , } .processes.Store(, ) if := .Start(); != nil { .processes.Delete() return nil, fmt.Errorf("start process: %w", ) } .wg.Add(1) go .watchdog(, ) return , nil } // watchdog waits for process shutdown and removes it from processes map on such // event. func ( *Supervisor[, ]) ( , *managedProcess[]) { defer .wg.Done() <-.Done() // Remove process from the processes map unless we have been stopped // externally. if .stopped.Swap(true) { return } .processes.Delete() _ = .Err() // TODO: log error } // stopProcess stops the process with the given key. If the processes does not // exist, it returns ErrProcessNotFound. func ( *Supervisor[, ]) ( context.Context, ) error { , := .processes.Load() if ! || == nil { return ErrProcessNotFound } // Something else has already stopped the given process. Do nothing. 
if .stopped.Swap(true) { return ErrProcessNotFound } .processes.Delete() return .Stop() } // stopAll stops all the processes in the underlying map. It does not wait for // processes to complete the termination. func ( *Supervisor[, ]) ( context.Context) { .processes.Range(func( , *managedProcess[]) bool { .wg.Add(1) go func() { defer .wg.Done() _ = .stopProcess(, ) }() return true }) }