Source File
supervisor.go
Belonging Package
go.pact.im/x/supervisor
// Package supervisor provides [process.Runner] supervision implementation.package supervisorimport ()// errInterrupt is an internal error used to distinguish between supervisor// interrupts and other errors.var errInterrupt = errors.New("supervisor: interrupt")// errRecursiveOrConcurrentRun is an error that [Supervisor] returns on// recursive or concurrent Run call.var errRecursiveOrConcurrentRun = errors.New("supervisor: recursive or concurrent Supervisor.Run calls are not allowed")// Supervisor runs a [process.Runner] alongside a control callback, managing// their concurrent execution and coordinated shutdown. It wraps the runner// with [flaky.Executor] retry logic and pre/post execution hooks.type Supervisor struct {runner process.Runnerexec flaky.Executorhook Hookintr atomic.Pointer[supervisorInterrupter]}// Hook is a set of hooks for supervisor’s runner.type Hook struct {// Pre is a function that is called on runner’s callback. A non-nil// error is immediately returned from the callback. In that case, an// error from PostHook is ignored.// Defaults to a function that returns nil error.Pre func(context.Context, *Supervisor) error// Post is a function that is called before runner’s callback returns.// The result is returned from the callback.// Defaults to a function that returns nil error.Post func(context.Context, *Supervisor) error}// NewSupervisor returns a new [Supervisor] instance for the given runner.func ( process.Runner, flaky.Executor, Hook) *Supervisor {return &Supervisor{runner: ,exec: ,hook: ,}}// Interrupt signals the supervisor to stop the current execution. This method// is non-blocking and returns immediately; it does not wait for execution to// complete.//// If no execution is active ([Supervisor.Run] is not being called), this method// does nothing. Otherwise, it signals the process to stop, but the caller must// separately wait for Run to return if synchronization is needed.//// It is safe to call from multiple goroutines concurrently.func ( *Supervisor) () {:= .intr.Load()if == nil {return}.Interrupt()}// Run executes the supervisor’s managed process concurrently with the provided// callback function without waiting for the initial process startup.//// Use [Hook.Pre] and [Hook.Post] to run code in runner’s callback.//// It returns an error combining any errors from the callback and runner execution.//// Execution timeline:// - T0: Start runner under executor in background goroutine.// - T0: Start callback in current goroutine.// - T1: Callback returns → interrupt executor.// - T1: Executor returns → cancel callback context.// - T2: Wait for executor to complete cleanup → return combined errors.//// Only one active Run invocation is permitted per Supervisor instance.// Concurrent or recursive calls will return an error.func ( *Supervisor) ( context.Context, process.Callback) error {, := context.WithCancel()defer ():= &supervisorInterrupter{done: make(chan struct{}),cancel: ,}if !.intr.CompareAndSwap(nil, ) {return errRecursiveOrConcurrentRun}defer .intr.Store(nil)var sync.WaitGroupvar error.Go(func() {defer () // cancel callback= .execute(, )}):= ().Interrupt().Wait()// Do not wrap errors in errors.joinError unless necessary.switch {case == nil:returncase == nil:return}return errors.Join(, )}// execute runs the supervised process under the executor with pre/post Run hooks.func ( *Supervisor) ( context.Context, *supervisorInterrupter) error {:= .exec.Execute(, func( context.Context) error {if .shouldStopBeforeRunner() {return flaky.Internal(errInterrupt)}:= .runner.Run(, func( context.Context) error {if := .pre(); != nil {_ = .post()return}select {case <-.Done():case <-.done:}return .post()}).afterRunner()return})if errors.Is(, errInterrupt) {= nil}return}// pre runs pre hook of the supervisor.func ( *Supervisor) ( context.Context) error {if .hook.Pre == nil {return nil}return .hook.Pre(, )}// pre runs post hook of the supervisor.func ( *Supervisor) ( context.Context) error {if .hook.Post == nil {return nil}return .hook.Post(, )}
The pages are generated with Golds v0.7.6. (GOOS=linux GOARCH=amd64)