// Source file: process.go
// Belonging package: go.pact.im/x/process

// Package process provides primitives for managing processes, an abstraction
// for stateful goroutines.
package process
import (
	"context"
	"errors"
	"fmt"
	"sync"

	// NOTE(review): the import that provides atomic.Error (used by the
	// Process.err field) is not visible in this view; it is not part of
	// sync/atomic. Presumably "go.uber.org/atomic" — confirm and restore.
)
var (
	// ErrProcessInvalidState is the sentinel error reported when an
	// operation is attempted while the process is in a state that does
	// not permit it. Compare against it with errors.Is.
	ErrProcessInvalidState = errors.New("process: invalid process state")
)
// State identifies the phase of the lifecycle a process is currently in.
type State int

// Lifecycle states, declared in the order a process normally passes
// through them.
const (
	// StateInitial is the state every process begins in. A Start call moves
	// it to StateStarting. A Stop call made while still in this state moves
	// it straight to StateStopped, so that any later Start call fails.
	StateInitial = State(iota)

	// StateStarting is entered when Start is called on a process in
	// StateInitial. From here the process moves to StateRunning once
	// startup succeeds, or to StateStopped if startup fails or a shutdown
	// is observed before startup completes.
	StateStarting

	// StateRunning is entered after startup has completed successfully.
	// The only way out is to StateStopped, either because Stop was called
	// or because the process terminated.
	StateRunning

	// StateStopped is the terminal state. No transitions leave it.
	StateStopped
)
// Process wraps a Runnable that executes in a background goroutine and guards
// it with a small state machine, so that Start and Stop are rejected when
// called in a state that does not allow them. A Process is safe for
// concurrent use.
//
// A Process is single-use: some Runnable implementations tolerate several
// consecutive Run invocations on one instance, but once a Process has been
// stopped it cannot be reset or started again.
type Process struct {
	// proc is the Runnable executed by Start.
	proc Runnable
	// parent is the context supplied to NewProcess. Start derives the run
	// context from it; it is cleared once it is no longer needed.
	parent context.Context

	// stateMu guards state and serializes transition callbacks.
	stateMu sync.Mutex
	// state is the current lifecycle state; the zero value is StateInitial.
	state State
	// cancel cancels the run context. It is set by Start and cleared when
	// the process is stopped or terminates.
	cancel context.CancelFunc

	// stop is closed by Stop to release the Run callback of a running
	// process. done is closed once the process has terminated, or when a
	// Stop call prevented it from ever starting.
	stop, done chan struct{}

	// err holds the error returned by the Runnable's Run method.
	err atomic.Error
}
// NewProcess returns a new stateful process instance for the given Runnable
// type parameter that would run with the ctx context.
func ( context.Context, Runnable) *Process {
return &Process{
proc: ,
parent: ,
stop: make(chan struct{}),
done: make(chan struct{}),
}
}
// Done returns a channel that is closed when process terminates.
func ( *Process) () <-chan struct{} {
return .done
}
// Err returns the error from running the process.
func ( *Process) () error {
return .err.Load()
}
// State returns the current process state.
func ( *Process) () State {
.stateMu.Lock()
defer .stateMu.Unlock()
return .state
}
// Start starts the process or cancels the underlying process context on error.
//
// The startup deadline may be set using the given ctx context. Note that the
// context would not be used by process directly so the associated values are
// not propagated.
//
// It returns ErrProcessInvalidState if the process is not in the initial state.
func ( *Process) ( context.Context) error {
var context.Context
var context.CancelFunc
if !.transition(StateStarting, func() {
, = context.WithCancel(.parent)
.parent, .cancel = nil,
}) {
return ErrProcessInvalidState
}
:= make(chan struct{})
go func() {
:= .proc.Run(, func( context.Context) error {
_ = .transition(StateRunning, nil)
close()
select {
case <-.Done():
case <-.stop:
}
return nil
})
_ = .transition(StateStopped, func() {
.cancel = nil
})
()
.err.Store()
close(.done)
}()
select {
case <-.Done():
// Propagate context cancellation to cancel startup.
()
<-.Done()
return .Err()
case <-.Done():
// Note that a non-nil error is still an error on start if the
// process terminates without initialization.
return fmt.Errorf("run: %w", .Err())
case <-:
// OK
}
return nil
}
// Stop stops the process by returning from the Run method callback and waiting
// for the termination.
//
// The shutdown deadline may be set using the given ctx context. If the deadline
// is exceeded, underlying context is canceled, signaling a forced shutdown to
// the process.
//
// It returns ErrProcessInvalidState if the process was not started or has
// already been stopped.
func ( *Process) ( context.Context) error {
var bool
var context.CancelFunc
if !.transition(StateStopped, func() {
switch .state {
case StateInitial:
close(.done)
= true
.parent = nil
case StateStarting:
.cancel()
= .cancel
.cancel = nil
case StateRunning:
close(.stop)
= .cancel
.cancel = nil
}
}) {
return ErrProcessInvalidState
}
if {
// We prevented a start. Do nothing.
return nil
}
select {
case <-.Done():
// Propagate context cancellation to force shutdown.
()
<-.Done()
return .Err()
case <-.Done():
if := .Err(); != nil {
return fmt.Errorf("run: %w", )
}
return nil
}
}
// transition advances to the next process state. It returns false if there is
// not transition from the current to the given next state.
func ( *Process) ( State, func()) bool {
.stateMu.Lock()
defer .stateMu.Unlock()
switch .state {
case StateInitial:
if != StateStarting && != StateStopped {
return false
}
case StateStarting:
if != StateRunning && != StateStopped {
return false
}
case StateRunning:
if != StateStopped {
return false
}
case StateStopped:
return false
}
if != nil {
()
}
.state =
return true
}
// The pages are generated with Golds v0.4.9. (GOOS=linux GOARCH=amd64)