package grpc
import (
istatus
)
type pickerWrapper struct {
mu sync.Mutex
done bool
blockingCh chan struct{}
picker balancer.Picker
}
func () *pickerWrapper {
return &pickerWrapper{blockingCh: make(chan struct{})}
}
func ( *pickerWrapper) ( balancer.Picker) {
.mu.Lock()
if .done {
.mu.Unlock()
return
}
.picker =
close(.blockingCh)
.blockingCh = make(chan struct{})
.mu.Unlock()
}
func ( *acBalancerWrapper, *balancer.PickResult) {
.mu.Lock()
:= .ac
.mu.Unlock()
.incrCallsStarted()
:= .Done
.Done = func( balancer.DoneInfo) {
if .Err != nil && .Err != io.EOF {
.incrCallsFailed()
} else {
.incrCallsSucceeded()
}
if != nil {
()
}
}
}
func ( *pickerWrapper) ( context.Context, bool, balancer.PickInfo) (transport.ClientTransport, balancer.PickResult, error) {
var chan struct{}
var error
for {
.mu.Lock()
if .done {
.mu.Unlock()
return nil, balancer.PickResult{}, ErrClientConnClosing
}
if .picker == nil {
= .blockingCh
}
if == .blockingCh {
.mu.Unlock()
select {
case <-.Done():
var string
if != nil {
= "latest balancer error: " + .Error()
} else {
= .Err().Error()
}
switch .Err() {
case context.DeadlineExceeded:
return nil, balancer.PickResult{}, status.Error(codes.DeadlineExceeded, )
case context.Canceled:
return nil, balancer.PickResult{}, status.Error(codes.Canceled, )
}
case <-:
}
continue
}
= .blockingCh
:= .picker
.mu.Unlock()
, := .Pick()
if != nil {
if == balancer.ErrNoSubConnAvailable {
continue
}
if , := status.FromError(); {
if istatus.IsRestrictedControlPlaneCode() {
= status.Errorf(codes.Internal, "received picker error with illegal status: %v", )
}
return nil, balancer.PickResult{}, dropError{error: }
}
if ! {
=
continue
}
return nil, balancer.PickResult{}, status.Error(codes.Unavailable, .Error())
}
, := .SubConn.(*acBalancerWrapper)
if ! {
logger.Errorf("subconn returned from pick is type %T, not *acBalancerWrapper", .SubConn)
continue
}
if := .getAddrConn().getReadyTransport(); != nil {
if channelz.IsOn() {
doneChannelzWrapper(, &)
return , , nil
}
return , , nil
}
if .Done != nil {
.Done(balancer.DoneInfo{})
}
logger.Infof("blockingPicker: the picked transport is not ready, loop back to repick")
}
}
func ( *pickerWrapper) () {
.mu.Lock()
defer .mu.Unlock()
if .done {
return
}
.done = true
close(.blockingCh)
}
type dropError struct {
error
}