// Package process provides primitives for managing processes, an abstraction
// for stateful goroutines.
package process

import (
	"context"
	"errors"
	"fmt"
	"sync"

	// NOTE(review): the import list was lost from this file. atomic.Error with
	// Load/Store is not part of sync/atomic; it presumably comes from
	// go.uber.org/atomic — confirm against go.mod.
	"go.uber.org/atomic"
)

// ErrProcessInvalidState is an error that is returned if the process
// is not in the valid state for the operation.
var ErrProcessInvalidState = errors.New("process: invalid process state")

// State represents the current state of the process.
type State int

const (
	// StateInitial is the initial state of the process. If Stop is called
	// in initial state, it prevents subsequent Start calls from succeeding,
	// thus entering StateStopped. Otherwise the transition on Start call
	// is to the StateStarting.
	StateInitial State = iota
	// StateStarting is the starting state that process enters when Start
	// is called from initial state. It transitions to either StateRunning
	// on success or StateStopped on failure (or premature shutdown observed
	// during startup).
	StateStarting
	// StateRunning is the state process enters after a successful startup.
	// The only possible transition is to the StateStopped if either Stop
	// is called or a process terminates.
	StateRunning
	// StateStopped is the final state of the process. There are no
	// transitions from this state.
	StateStopped
)

// Process represents a stateful process that is running in the background.
// It exposes Start and Stop methods that use an underlying state machine to
// prevent operations in invalid states. Process is safe for concurrent use.
//
// Unlike some implementations of the underlying Runnable interface that allow
// multiple consecutive Run invocations on the same instance, a Process may not
// be reset and started after being stopped.
type Process struct {
	// proc is the Runnable whose Run method is executed by Start.
	proc Runnable
	// parent is the context the run context is derived from. It is cleared
	// once the process leaves the initial state.
	parent context.Context

	// stateMu guards state and cancel.
	stateMu sync.Mutex
	state   State
	// cancel cancels the run context; it is non-nil only between a
	// successful Start and the transition to StateStopped.
	cancel context.CancelFunc

	// stop is closed by Stop to make the Run callback return.
	stop chan struct{}
	// done is closed when the process has terminated, or when Stop is
	// called before Start.
	done chan struct{}
	// err holds the error returned by Run.
	err atomic.Error
}

// NewProcess returns a new stateful process instance for the given Runnable
// type parameter that would run with the ctx context.
func ( context.Context, Runnable) *Process { return &Process{ proc: , parent: , stop: make(chan struct{}), done: make(chan struct{}), } } // Done returns a channel that is closed when process terminates. func ( *Process) () <-chan struct{} { return .done } // Err returns the error from running the process. func ( *Process) () error { return .err.Load() } // State returns the current process state. func ( *Process) () State { .stateMu.Lock() defer .stateMu.Unlock() return .state } // Start starts the process or cancels the underlying process context on error. // // The startup deadline may be set using the given ctx context. Note that the // context would not be used by process directly so the associated values are // not propagated. // // It returns ErrProcessInvalidState if the process is not in the initial state. func ( *Process) ( context.Context) error { var context.Context var context.CancelFunc if !.transition(StateStarting, func() { , = context.WithCancel(.parent) .parent, .cancel = nil, }) { return ErrProcessInvalidState } := make(chan struct{}) go func() { := .proc.Run(, func( context.Context) error { _ = .transition(StateRunning, nil) close() select { case <-.Done(): case <-.stop: } return nil }) _ = .transition(StateStopped, func() { .cancel = nil }) () .err.Store() close(.done) }() select { case <-.Done(): // Propagate context cancellation to cancel startup. () <-.Done() return .Err() case <-.Done(): // Note that a non-nil error is still an error on start if the // process terminates without initialization. return fmt.Errorf("run: %w", .Err()) case <-: // OK } return nil } // Stop stops the process by returning from the Run method callback and waiting // for the termination. // // The shutdown deadline may be set using the given ctx context. If the deadline // is exceeded, underlying context is canceled, signaling a forced shutdown to // the process. // // It returns ErrProcessInvalidState if the process was not started or has // already been stopped. 
func ( *Process) ( context.Context) error { var bool var context.CancelFunc if !.transition(StateStopped, func() { switch .state { case StateInitial: close(.done) = true .parent = nil case StateStarting: .cancel() = .cancel .cancel = nil case StateRunning: close(.stop) = .cancel .cancel = nil } }) { return ErrProcessInvalidState } if { // We prevented a start. Do nothing. return nil } select { case <-.Done(): // Propagate context cancellation to force shutdown. () <-.Done() return .Err() case <-.Done(): if := .Err(); != nil { return fmt.Errorf("run: %w", ) } return nil } } // transition advances to the next process state. It returns false if there is // not transition from the current to the given next state. func ( *Process) ( State, func()) bool { .stateMu.Lock() defer .stateMu.Unlock() switch .state { case StateInitial: if != StateStarting && != StateStopped { return false } case StateStarting: if != StateRunning && != StateStopped { return false } case StateRunning: if != StateStopped { return false } case StateStopped: return false } if != nil { () } .state = return true }