/*
 *
 * Copyright 2024 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

// Package endpointsharding implements a load balancing policy that manages
// homogeneous child policies each owning a single endpoint.
//
// # Experimental
//
// Notice: This package is EXPERIMENTAL and may be changed or removed in a
// later release.
package endpointsharding

// NOTE(review): the import paths were missing from this file (only the `rand`
// alias survived). They are reconstructed from the identifiers used in the
// file; confirm them against the module before merging.
import (
	"errors"
	rand "math/rand/v2"
	"sync"
	"sync/atomic"

	"google.golang.org/grpc/balancer"
	"google.golang.org/grpc/balancer/base"
	"google.golang.org/grpc/connectivity"
	"google.golang.org/grpc/resolver"
)

// randIntN aliases rand.IntN. Because it is a package-level variable it can be
// replaced (presumably by tests that need deterministic behavior — confirm).
var randIntN = rand.IntN

// ChildState is the balancer state of a child along with the endpoint which
// identifies the child balancer.
type ChildState struct {
	Endpoint resolver.Endpoint
	State    balancer.State

	// Balancer exposes only the ExitIdler interface of the child LB policy.
	// Other methods of the child policy are called only by endpointsharding.
	Balancer ExitIdler
}

// ExitIdler provides access to only the ExitIdle method of the child balancer.
type ExitIdler interface {
	// ExitIdle instructs the LB policy to reconnect to backends / exit the
	// IDLE state, if appropriate and possible. Note that SubConns that enter
	// the IDLE state will not reconnect until SubConn.Connect is called.
	ExitIdle()
}

// Options are the options to configure the behaviour of the
// endpointsharding balancer.
type Options struct {
	// DisableAutoReconnect allows the balancer to keep child balancer in the
	// IDLE state until they are explicitly triggered to exit using the
	// ChildState obtained from the endpointsharding picker. When set to false,
	// the endpointsharding balancer will automatically call ExitIdle on child
	// connections that report IDLE.
	DisableAutoReconnect bool
}

// ChildBuilderFunc creates a new balancer with the ClientConn. It has the same
// type as the balancer.Builder.Build method.
type ChildBuilderFunc func(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer

// NewBalancer returns a load balancing policy that manages homogeneous child
// policies each owning a single endpoint. The endpointsharding balancer
// forwards the LoadBalancingConfig in ClientConn state updates to its children.
//
// cc is the parent ClientConn that receives the aggregated state, opts is
// passed to childBuilder for every child that gets created, and esOpts
// configures the endpointsharding balancer itself. The returned balancer
// starts with an empty set of children.
func NewBalancer(cc balancer.ClientConn, opts balancer.BuildOptions, childBuilder ChildBuilderFunc, esOpts Options) balancer.Balancer {
	es := &endpointSharding{
		cc:           cc,
		bOpts:        opts,
		esOpts:       esOpts,
		childBuilder: childBuilder,
	}
	// Store an empty map so that later children.Load() calls never return nil.
	es.children.Store(resolver.NewEndpointMap[*balancerWrapper]())
	return es
}

// endpointSharding is a balancer that wraps child balancers.
It creates a child // balancer with child config for every unique Endpoint received. It updates the // child states on any update from parent or child. type endpointSharding struct { cc balancer.ClientConn bOpts balancer.BuildOptions esOpts Options childBuilder ChildBuilderFunc // childMu synchronizes calls to any single child. It must be held for all // calls into a child. To avoid deadlocks, do not acquire childMu while // holding mu. childMu sync.Mutex children atomic.Pointer[resolver.EndpointMap[*balancerWrapper]] // inhibitChildUpdates is set during UpdateClientConnState/ResolverError // calls (calls to children will each produce an update, only want one // update). inhibitChildUpdates atomic.Bool // mu synchronizes access to the state stored in balancerWrappers in the // children field. mu must not be held during calls into a child since // synchronous calls back from the child may require taking mu, causing a // deadlock. To avoid deadlocks, do not acquire childMu while holding mu. mu sync.Mutex } // rotateEndpoints returns a slice of all the input endpoints rotated a random // amount. func ( []resolver.Endpoint) []resolver.Endpoint { := len() if == 0 { return } := randIntN() // Make a copy to avoid mutating data beyond the end of es. := make([]resolver.Endpoint, ) copy(, [:]) copy([-:], [:]) return } // UpdateClientConnState creates a child for new endpoints and deletes children // for endpoints that are no longer present. It also updates all the children, // and sends a single synchronous update of the childrens' aggregated state at // the end of the UpdateClientConnState operation. If any endpoint has no // addresses it will ignore that endpoint. Otherwise, returns first error found // from a child, but fully processes the new update. 
func ( *endpointSharding) ( balancer.ClientConnState) error { .childMu.Lock() defer .childMu.Unlock() .inhibitChildUpdates.Store(true) defer func() { .inhibitChildUpdates.Store(false) .updateState() }() var error := .children.Load() := resolver.NewEndpointMap[*balancerWrapper]() // Update/Create new children. for , := range rotateEndpoints(.ResolverState.Endpoints) { if , := .Get(); { // Endpoint child was already created, continue to avoid duplicate // update. continue } , := .Get() if { // Endpoint attributes may have changed, update the stored endpoint. .mu.Lock() .childState.Endpoint = .mu.Unlock() } else { = &balancerWrapper{ childState: ChildState{Endpoint: }, ClientConn: .cc, es: , } .childState.Balancer = .child = .childBuilder(, .bOpts) } .Set(, ) if := .updateClientConnStateLocked(balancer.ClientConnState{ BalancerConfig: .BalancerConfig, ResolverState: resolver.State{ Endpoints: []resolver.Endpoint{}, Attributes: .ResolverState.Attributes, }, }); != nil && == nil { // Return first error found, and always commit full processing of // updating children. If desired to process more specific errors // across all endpoints, caller should make these specific // validations, this is a current limitation for simplicity sake. = } } // Delete old children that are no longer present. for , := range .Keys() { , := .Get() if , := .Get(); ! { .closeLocked() } } .children.Store() if .Len() == 0 { return balancer.ErrBadResolverState } return } // ResolverError forwards the resolver error to all of the endpointSharding's // children and sends a single synchronous update of the childStates at the end // of the ResolverError operation. 
func ( *endpointSharding) ( error) { .childMu.Lock() defer .childMu.Unlock() .inhibitChildUpdates.Store(true) defer func() { .inhibitChildUpdates.Store(false) .updateState() }() := .children.Load() for , := range .Values() { .resolverErrorLocked() } } func ( *endpointSharding) (balancer.SubConn, balancer.SubConnState) { // UpdateSubConnState is deprecated. } func ( *endpointSharding) () { .childMu.Lock() defer .childMu.Unlock() := .children.Load() for , := range .Values() { .closeLocked() } } func ( *endpointSharding) () { .childMu.Lock() defer .childMu.Unlock() for , := range .children.Load().Values() { if !.isClosed { .child.ExitIdle() } } } // updateState updates this component's state. It sends the aggregated state, // and a picker with round robin behavior with all the child states present if // needed. func ( *endpointSharding) () { if .inhibitChildUpdates.Load() { return } var , , , []balancer.Picker .mu.Lock() defer .mu.Unlock() := .children.Load() := make([]ChildState, 0, .Len()) for , := range .Values() { := .childState = append(, ) := .State.Picker switch .State.ConnectivityState { case connectivity.Ready: = append(, ) case connectivity.Connecting: = append(, ) case connectivity.Idle: = append(, ) case connectivity.TransientFailure: = append(, ) // connectivity.Shutdown shouldn't appear. } } // Construct the round robin picker based off the aggregated state. Whatever // the aggregated state, use the pickers present that are currently in that // state only. var connectivity.State var []balancer.Picker if len() >= 1 { = connectivity.Ready = } else if len() >= 1 { = connectivity.Connecting = } else if len() >= 1 { = connectivity.Idle = } else if len() >= 1 { = connectivity.TransientFailure = } else { = connectivity.TransientFailure = []balancer.Picker{base.NewErrPicker(errors.New("no children to pick from"))} } // No children (resolver error before valid update). 
:= &pickerWithChildStates{ pickers: , childStates: , next: uint32(randIntN(len())), } .cc.UpdateState(balancer.State{ ConnectivityState: , Picker: , }) } // pickerWithChildStates delegates to the pickers it holds in a round robin // fashion. It also contains the childStates of all the endpointSharding's // children. type pickerWithChildStates struct { pickers []balancer.Picker childStates []ChildState next uint32 } func ( *pickerWithChildStates) ( balancer.PickInfo) (balancer.PickResult, error) { := atomic.AddUint32(&.next, 1) := .pickers[%uint32(len(.pickers))] return .Pick() } // ChildStatesFromPicker returns the state of all the children managed by the // endpoint sharding balancer that created this picker. func ( balancer.Picker) []ChildState { , := .(*pickerWithChildStates) if ! { return nil } return .childStates } // balancerWrapper is a wrapper of a balancer. It ID's a child balancer by // endpoint, and persists recent child balancer state. type balancerWrapper struct { // The following fields are initialized at build time and read-only after // that and therefore do not need to be guarded by a mutex. // child contains the wrapped balancer. Access its methods only through // methods on balancerWrapper to ensure proper synchronization child balancer.Balancer balancer.ClientConn // embed to intercept UpdateState, doesn't deal with SubConns es *endpointSharding // Access to the following fields is guarded by es.mu. childState ChildState isClosed bool } func ( *balancerWrapper) ( balancer.State) { .es.mu.Lock() .childState.State = .es.mu.Unlock() if .ConnectivityState == connectivity.Idle && !.es.esOpts.DisableAutoReconnect { .ExitIdle() } .es.updateState() } // ExitIdle pings an IDLE child balancer to exit idle in a new goroutine to // avoid deadlocks due to synchronous balancer state updates. 
func ( *balancerWrapper) () { go func() { .es.childMu.Lock() if !.isClosed { .child.ExitIdle() } .es.childMu.Unlock() }() } // updateClientConnStateLocked delivers the ClientConnState to the child // balancer. Callers must hold the child mutex of the parent endpointsharding // balancer. func ( *balancerWrapper) ( balancer.ClientConnState) error { return .child.UpdateClientConnState() } // closeLocked closes the child balancer. Callers must hold the child mutext of // the parent endpointsharding balancer. func ( *balancerWrapper) () { .child.Close() .isClosed = true } func ( *balancerWrapper) ( error) { .child.ResolverError() }