/*
 *
 * Copyright 2017 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

// Package pickfirst contains the pick_first load balancing policy.
package pickfirst import ( rand internalgrpclog _ // For automatically registering the new pickfirst if required. ) func () { if envconfig.NewPickFirstEnabled { return } balancer.Register(pickfirstBuilder{}) } var logger = grpclog.Component("pick-first-lb") const ( // Name is the name of the pick_first balancer. Name = "pick_first" logPrefix = "[pick-first-lb %p] " ) type pickfirstBuilder struct{} func (pickfirstBuilder) ( balancer.ClientConn, balancer.BuildOptions) balancer.Balancer { := &pickfirstBalancer{cc: } .logger = internalgrpclog.NewPrefixLogger(logger, fmt.Sprintf(logPrefix, )) return } func (pickfirstBuilder) () string { return Name } type pfConfig struct { serviceconfig.LoadBalancingConfig `json:"-"` // If set to true, instructs the LB policy to shuffle the order of the list // of endpoints received from the name resolver before attempting to // connect to them. ShuffleAddressList bool `json:"shuffleAddressList"` } func (pickfirstBuilder) ( json.RawMessage) (serviceconfig.LoadBalancingConfig, error) { var pfConfig if := json.Unmarshal(, &); != nil { return nil, fmt.Errorf("pickfirst: unable to unmarshal LB policy config: %s, error: %v", string(), ) } return , nil } type pickfirstBalancer struct { logger *internalgrpclog.PrefixLogger state connectivity.State cc balancer.ClientConn subConn balancer.SubConn } func ( *pickfirstBalancer) ( error) { if .logger.V(2) { .logger.Infof("Received error from the name resolver: %v", ) } if .subConn == nil { .state = connectivity.TransientFailure } if .state != connectivity.TransientFailure { // The picker will not change since the balancer does not currently // report an error. return } .cc.UpdateState(balancer.State{ ConnectivityState: connectivity.TransientFailure, Picker: &picker{err: fmt.Errorf("name resolver error: %v", )}, }) } // Shuffler is an interface for shuffling an address list. type Shuffler interface { ShuffleAddressListForTesting(n int, swap func(i, j int)) } // ShuffleAddressListForTesting pseudo-randomizes the order of addresses. n // is the number of elements. swap swaps the elements with indexes i and j. func ( int, func(, int)) { rand.Shuffle(, ) } func ( *pickfirstBalancer) ( balancer.ClientConnState) error { if len(.ResolverState.Addresses) == 0 && len(.ResolverState.Endpoints) == 0 { // The resolver reported an empty address list. Treat it like an error by // calling b.ResolverError. if .subConn != nil { // Shut down the old subConn. All addresses were removed, so it is // no longer valid. .subConn.Shutdown() .subConn = nil } .ResolverError(errors.New("produced zero addresses")) return balancer.ErrBadResolverState } // We don't have to guard this block with the env var because ParseConfig // already does so. , := .BalancerConfig.(pfConfig) if .BalancerConfig != nil && ! { return fmt.Errorf("pickfirst: received illegal BalancerConfig (type %T): %v", .BalancerConfig, .BalancerConfig) } if .logger.V(2) { .logger.Infof("Received new config %s, resolver state %s", pretty.ToJSON(), pretty.ToJSON(.ResolverState)) } var []resolver.Address if := .ResolverState.Endpoints; len() != 0 { // Perform the optional shuffling described in gRFC A62. The shuffling will // change the order of endpoints but not touch the order of the addresses // within each endpoint. - A61 if .ShuffleAddressList { = append([]resolver.Endpoint{}, ...) internal.RandShuffle(len(), func(, int) { [], [] = [], [] }) } // "Flatten the list by concatenating the ordered list of addresses for each // of the endpoints, in order." - A61 for , := range { // "In the flattened list, interleave addresses from the two address // families, as per RFC-8304 section 4." - A61 // TODO: support the above language. = append(, .Addresses...) } } else { // Endpoints not set, process addresses until we migrate resolver // emissions fully to Endpoints. The top channel does wrap emitted // addresses with endpoints, however some balancers such as weighted // target do not forward the corresponding correct endpoints down/split // endpoints properly. Once all balancers correctly forward endpoints // down, can delete this else conditional. = .ResolverState.Addresses if .ShuffleAddressList { = append([]resolver.Address{}, ...) rand.Shuffle(len(), func(, int) { [], [] = [], [] }) } } if .subConn != nil { .cc.UpdateAddresses(.subConn, ) return nil } var balancer.SubConn , := .cc.NewSubConn(, balancer.NewSubConnOptions{ StateListener: func( balancer.SubConnState) { .updateSubConnState(, ) }, }) if != nil { if .logger.V(2) { .logger.Infof("Failed to create new SubConn: %v", ) } .state = connectivity.TransientFailure .cc.UpdateState(balancer.State{ ConnectivityState: connectivity.TransientFailure, Picker: &picker{err: fmt.Errorf("error creating connection: %v", )}, }) return balancer.ErrBadResolverState } .subConn = .state = connectivity.Idle .cc.UpdateState(balancer.State{ ConnectivityState: connectivity.Connecting, Picker: &picker{err: balancer.ErrNoSubConnAvailable}, }) .subConn.Connect() return nil } // UpdateSubConnState is unused as a StateListener is always registered when // creating SubConns. func ( *pickfirstBalancer) ( balancer.SubConn, balancer.SubConnState) { .logger.Errorf("UpdateSubConnState(%v, %+v) called unexpectedly", , ) } func ( *pickfirstBalancer) ( balancer.SubConn, balancer.SubConnState) { if .logger.V(2) { .logger.Infof("Received SubConn state update: %p, %+v", , ) } if .subConn != { if .logger.V(2) { .logger.Infof("Ignored state change because subConn is not recognized") } return } if .ConnectivityState == connectivity.Shutdown { .subConn = nil return } switch .ConnectivityState { case connectivity.Ready: .cc.UpdateState(balancer.State{ ConnectivityState: .ConnectivityState, Picker: &picker{result: balancer.PickResult{SubConn: }}, }) case connectivity.Connecting: if .state == connectivity.TransientFailure { // We stay in TransientFailure until we are Ready. See A62. return } .cc.UpdateState(balancer.State{ ConnectivityState: .ConnectivityState, Picker: &picker{err: balancer.ErrNoSubConnAvailable}, }) case connectivity.Idle: if .state == connectivity.TransientFailure { // We stay in TransientFailure until we are Ready. Also kick the // subConn out of Idle into Connecting. See A62. .subConn.Connect() return } .cc.UpdateState(balancer.State{ ConnectivityState: .ConnectivityState, Picker: &idlePicker{subConn: }, }) case connectivity.TransientFailure: .cc.UpdateState(balancer.State{ ConnectivityState: .ConnectivityState, Picker: &picker{err: .ConnectionError}, }) } .state = .ConnectivityState } func ( *pickfirstBalancer) () { } func ( *pickfirstBalancer) () { if .subConn != nil && .state == connectivity.Idle { .subConn.Connect() } } type picker struct { result balancer.PickResult err error } func ( *picker) (balancer.PickInfo) (balancer.PickResult, error) { return .result, .err } // idlePicker is used when the SubConn is IDLE and kicks the SubConn into // CONNECTING when Pick is called. type idlePicker struct { subConn balancer.SubConn } func ( *idlePicker) (balancer.PickInfo) (balancer.PickResult, error) { .subConn.Connect() return balancer.PickResult{}, balancer.ErrNoSubConnAvailable }