/* * * Copyright 2017 gRPC authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. * */package grpcimport (istatus)// pickerGeneration stores a picker and a channel used to signal that a picker// newer than this one is available.typepickerGenerationstruct {// picker is the picker produced by the LB policy. May be nil if a picker // has never been produced.pickerbalancer.Picker// blockingCh is closed when the picker has been invalidated because there // is a new one available.blockingChchanstruct{}}// pickerWrapper is a wrapper of balancer.Picker. It blocks on certain pick// actions and unblock when there's a picker update.typepickerWrapperstruct {// If pickerGen holds a nil pointer, the pickerWrapper is closed.pickerGenatomic.Pointer[pickerGeneration]}func () *pickerWrapper { := &pickerWrapper{} .pickerGen.Store(&pickerGeneration{blockingCh: make(chanstruct{}), })return}// updatePicker is called by UpdateState calls from the LB policy. It// unblocks all blocked pick.func ( *pickerWrapper) ( balancer.Picker) { := .pickerGen.Swap(&pickerGeneration{picker: ,blockingCh: make(chanstruct{}), })close(.blockingCh)}// doneChannelzWrapper performs the following:// - increments the calls started channelz counter// - wraps the done function in the passed in result to increment the calls// failed or calls succeeded channelz counter before invoking the actual// done function.func ( *acBalancerWrapper, *balancer.PickResult) { := .ac .incrCallsStarted() := .Done .Done = func( balancer.DoneInfo) {if .Err != nil && .Err != io.EOF { .incrCallsFailed() } else { .incrCallsSucceeded() }if != nil { () } }}typepickstruct {transporttransport.ClientTransport// the selected transportresultbalancer.PickResult// the contents of the pick from the LB policyblockedbool// set if a picker call queued for a new picker}// pick returns the transport that will be used for the RPC.// It may block in the following cases:// - there's no picker// - the current picker returns ErrNoSubConnAvailable// - the current picker returns other errors and failfast is false.// - the subConn returned by the current picker is not READY// When one of these situations happens, pick blocks until the picker gets updated.func ( *pickerWrapper) ( context.Context, bool, balancer.PickInfo) (pick, error) {varchanstruct{}varerror := falsefor { := .pickerGen.Load()if == nil {returnpick{}, ErrClientConnClosing }if .picker == nil { = .blockingCh }if == .blockingCh {// This could happen when either: // - pw.picker is nil (the previous if condition), or // - we have already called pick on the current picker.select {case<-.Done():varstringif != nil { = "latest balancer error: " + .Error() } else { = fmt.Sprintf("%v while waiting for connections to become ready", .Err()) }switch .Err() {casecontext.DeadlineExceeded:returnpick{}, status.Error(codes.DeadlineExceeded, )casecontext.Canceled:returnpick{}, status.Error(codes.Canceled, ) }case<-: }continue }// If the channel is set, it means that the pick call had to wait for a // new picker at some point. Either it's the first iteration and this // function received the first picker, or a picker errored with // ErrNoSubConnAvailable or errored with failfast set to false, which // will trigger a continue to the next iteration. In the first case this // conditional will hit if this call had to block (the channel is set). // In the second case, the only way it will get to this conditional is // if there is a new picker.if != nil { = true } = .blockingCh := .picker , := .Pick()if != nil {if == balancer.ErrNoSubConnAvailable {continue }if , := status.FromError(); {// Status error: end the RPC unconditionally with this status. // First restrict the code to the list allowed by gRFC A54.ifistatus.IsRestrictedControlPlaneCode() { = status.Errorf(codes.Internal, "received picker error with illegal status: %v", ) }returnpick{}, dropError{error: } }// For all other errors, wait for ready RPCs should block and other // RPCs should fail with unavailable.if ! { = continue }returnpick{}, status.Error(codes.Unavailable, .Error()) } , := .SubConn.(*acBalancerWrapper)if ! {logger.Errorf("subconn returned from pick is type %T, not *acBalancerWrapper", .SubConn)continue }if := .ac.getReadyTransport(); != nil {ifchannelz.IsOn() {doneChannelzWrapper(, &) }returnpick{transport: , result: , blocked: }, nil }if .Done != nil {// Calling done with nil error, no bytes sent and no bytes received. // DoneInfo with default value works. .Done(balancer.DoneInfo{}) }logger.Infof("blockingPicker: the picked transport is not ready, loop back to repick")// If ok == false, ac.state is not READY. // A valid picker always returns READY subConn. This means the state of ac // just changed, and picker will be updated shortly. // continue back to the beginning of the for loop to repick. }}func ( *pickerWrapper) () { := .pickerGen.Swap(nil)close(.blockingCh)}// reset clears the pickerWrapper and prepares it for being used again when idle// mode is exited.func ( *pickerWrapper) () { := .pickerGen.Swap(&pickerGeneration{blockingCh: make(chanstruct{})})close(.blockingCh)}// dropError is a wrapper error that indicates the LB policy wishes to drop the// RPC and not retry it.typedropErrorstruct {error}
The pages are generated with Goldsv0.7.6. (GOOS=linux GOARCH=amd64)