Source File
process.go
Belonging Package
go.pact.im/x/process
// Package process provides primitives for managing processes, an abstraction// for stateful goroutines.package processimport ()// ErrProcessInvalidState is an error that is returned if the process// is not in the valid state for the operation.var ErrProcessInvalidState = errors.New("process: invalid process state")// State represents the current state of the process.type State intconst (// StateInitial is the initial state of the process. If Stop is called// in initial state, it prevents subsequent Start calls from succeeding,// thus entering StateStopped. Otherwise the transition on Start call// is to the StateStarting.StateInitial State = iota// StateStarting is the starting state that process enters when Start// is called from initial state. It transitions to either StateRunning// on success or StateStopped on failure (or premature shutdown observed// during startup).StateStarting// StateRunning is the state process enters after a successful startup.// The only possible transition is to the StateStopped if either Stop// is called or a process terminates.StateRunning// StateStopped is the final state of the process. There are no// transitions from this state.StateStopped)// Process represents a stateful process that is running in the background.// It exposes Start and Stop methods that use an underlying state machine to// prevent operations in invalid states. Process is safe for concurrent use.//// Unlike some implementations of the underlying [Runner] interface that allow// multiple consecutive Run invocations on the same instance, a Process may not// be reset and started after being stopped.type Process struct {runner Runnerparent context.ContextstateMu sync.Mutexstate Statecancel context.CancelFuncstop chan struct{}done chan struct{}err atomic.Value}// NewProcess returns a new stateful process instance for the given [Runner]// type parameter that would run with the ctx context.func ( context.Context, Runner) *Process {return &Process{runner: ,parent: ,stop: make(chan struct{}),done: make(chan struct{}),}}// Done returns a channel that is closed when process terminates.func ( *Process) () <-chan struct{} {return .done}// Err returns the error from running the process.func ( *Process) () error {, := .err.Load().(error)return}// State returns the current process state.func ( *Process) () State {.stateMu.Lock()defer .stateMu.Unlock()return .state}// Start starts the process or cancels the underlying process context on error.//// The startup deadline may be set using the given ctx context. Note that the// context would not be used by process directly so the associated values are// not propagated.//// It returns [ErrProcessInvalidState] if the process is not in the initial// state.func ( *Process) ( context.Context) error {var context.Contextvar context.CancelFuncif !.transition(StateStarting, func() {, = context.WithCancel(.parent).parent, .cancel = nil,}) {return ErrProcessInvalidState}:= make(chan struct{})go func() {:= .runner.Run(, func( context.Context) error {_ = .transition(StateRunning, nil)close()select {case <-.Done():case <-.stop:}return nil})_ = .transition(StateStopped, func() {.cancel = nil})()if != nil {.err.Store()}close(.done)}()select {case <-.Done():// Propagate context cancellation to cancel startup.()<-.Done()return .Err()case <-.Done():// Note that a non-nil error is still an error on start if the// process terminates without initialization.return fmt.Errorf("run: %w", .Err())case <-:// OK}return nil}// Stop stops the process by returning from the Run method callback and waiting// for the termination.//// The shutdown deadline may be set using the given ctx context. If the deadline// is exceeded, underlying context is canceled, signaling a forced shutdown to// the process.//// It returns [ErrProcessInvalidState] if the process was not started or has// already been stopped.func ( *Process) ( context.Context) error {var boolvar context.CancelFuncif !.transition(StateStopped, func() {switch .state {case StateInitial:close(.done)= true.parent = nilcase StateStarting:.cancel()= .cancel.cancel = nilcase StateRunning:close(.stop)= .cancel.cancel = nil}}) {return ErrProcessInvalidState}if {// We prevented a start. Do nothing.return nil}select {case <-.Done():// Propagate context cancellation to force shutdown.()<-.Done()return .Err()case <-.Done():if := .Err(); != nil {return fmt.Errorf("run: %w", )}return nil}}// transition advances to the next process state. It returns false if there is// not transition from the current to the given next state.func ( *Process) ( State, func()) bool {.stateMu.Lock()defer .stateMu.Unlock()switch .state {case StateInitial:if != StateStarting && != StateStopped {return false}case StateStarting:if != StateRunning && != StateStopped {return false}case StateRunning:if != StateStopped {return false}case StateStopped:return false}if != nil {()}.state =return true}
The pages are generated with Golds v0.7.6. (GOOS=linux GOARCH=amd64)