package grpc
import (
)
var (
setConnectedAddress = internal.SetConnectedAddress.(func(*balancer.SubConnState, resolver.Address))
noOpRegisterHealthListenerFn = func( context.Context, func(balancer.SubConnState)) func() {
(balancer.SubConnState{ConnectivityState: connectivity.Ready})
return func() {}
}
)
type ccBalancerWrapper struct {
internal.EnforceClientConnEmbedding
cc *ClientConn
opts balancer.BuildOptions
serializer *grpcsync.CallbackSerializer
serializerCancel context.CancelFunc
curBalancerName string
balancer *gracefulswitch.Balancer
mu sync.Mutex
closed bool
}
func ( *ClientConn) *ccBalancerWrapper {
, := context.WithCancel(.ctx)
:= &ccBalancerWrapper{
cc: ,
opts: balancer.BuildOptions{
DialCreds: .dopts.copts.TransportCredentials,
CredsBundle: .dopts.copts.CredsBundle,
Dialer: .dopts.copts.Dialer,
Authority: .authority,
CustomUserAgent: .dopts.copts.UserAgent,
ChannelzParent: .channelz,
Target: .parsedTarget,
},
serializer: grpcsync.NewCallbackSerializer(),
serializerCancel: ,
}
.balancer = gracefulswitch.NewBalancer(, .opts)
return
}
func ( *ccBalancerWrapper) () stats.MetricsRecorder {
return .cc.metricsRecorderList
}
func ( *ccBalancerWrapper) ( *balancer.ClientConnState) error {
:= make(chan error)
:= func( context.Context) {
defer close()
if .Err() != nil || .balancer == nil {
return
}
:= gracefulswitch.ChildName(.BalancerConfig)
if .curBalancerName != {
.curBalancerName =
channelz.Infof(logger, .cc.channelz, "Channel switches to new LB policy %q", )
}
:= .balancer.UpdateClientConnState(*)
if logger.V(2) && != nil {
logger.Infof("error from balancer.UpdateClientConnState: %v", )
}
<-
}
:= func() { close() }
.serializer.ScheduleOr(, )
return <-
}
func ( *ccBalancerWrapper) ( error) {
.serializer.TrySchedule(func( context.Context) {
if .Err() != nil || .balancer == nil {
return
}
.balancer.ResolverError()
})
}
func ( *ccBalancerWrapper) () {
.mu.Lock()
.closed = true
.mu.Unlock()
channelz.Info(logger, .cc.channelz, "ccBalancerWrapper: closing")
.serializer.TrySchedule(func(context.Context) {
if .balancer == nil {
return
}
.balancer.Close()
.balancer = nil
})
.serializerCancel()
}
func ( *ccBalancerWrapper) () {
.serializer.TrySchedule(func( context.Context) {
if .Err() != nil || .balancer == nil {
return
}
.balancer.ExitIdle()
})
}
func ( *ccBalancerWrapper) ( []resolver.Address, balancer.NewSubConnOptions) (balancer.SubConn, error) {
.cc.mu.Lock()
defer .cc.mu.Unlock()
.mu.Lock()
if .closed {
.mu.Unlock()
return nil, fmt.Errorf("balancer is being closed; no new SubConns allowed")
}
.mu.Unlock()
if len() == 0 {
return nil, fmt.Errorf("grpc: cannot create SubConn with empty address list")
}
, := .cc.newAddrConnLocked(, )
if != nil {
channelz.Warningf(logger, .cc.channelz, "acBalancerWrapper: NewSubConn: failed to newAddrConn: %v", )
return nil,
}
:= &acBalancerWrapper{
ccb: ,
ac: ,
producers: make(map[balancer.ProducerBuilder]*refCountedProducer),
stateListener: .StateListener,
healthData: newHealthData(connectivity.Idle),
}
.acbw =
return , nil
}
func ( *ccBalancerWrapper) (balancer.SubConn) {
logger.Errorf("ccb RemoveSubConn(%v) called unexpectedly, sc")
}
func ( *ccBalancerWrapper) ( balancer.SubConn, []resolver.Address) {
, := .(*acBalancerWrapper)
if ! {
return
}
.UpdateAddresses()
}
func ( *ccBalancerWrapper) ( balancer.State) {
.cc.mu.Lock()
defer .cc.mu.Unlock()
if .cc.conns == nil {
return
}
.mu.Lock()
if .closed {
.mu.Unlock()
return
}
.mu.Unlock()
.cc.pickerWrapper.updatePicker(.Picker)
.cc.csMgr.updateState(.ConnectivityState)
}
func ( *ccBalancerWrapper) ( resolver.ResolveNowOptions) {
.cc.mu.RLock()
defer .cc.mu.RUnlock()
.mu.Lock()
if .closed {
.mu.Unlock()
return
}
.mu.Unlock()
.cc.resolveNowLocked()
}
func ( *ccBalancerWrapper) () string {
return .cc.target
}
type acBalancerWrapper struct {
internal.EnforceSubConnEmbedding
ac *addrConn
ccb *ccBalancerWrapper
stateListener func(balancer.SubConnState)
producersMu sync.Mutex
producers map[balancer.ProducerBuilder]*refCountedProducer
healthMu sync.Mutex
healthData *healthData
}
type healthData struct {
connectivityState connectivity.State
closeHealthProducer func()
}
func ( connectivity.State) *healthData {
return &healthData{
connectivityState: ,
closeHealthProducer: func() {},
}
}
func ( *acBalancerWrapper) ( connectivity.State, resolver.Address, error) {
.ccb.serializer.TrySchedule(func( context.Context) {
if .Err() != nil || .ccb.balancer == nil {
return
}
.closeProducers()
:= balancer.SubConnState{ConnectivityState: , ConnectionError: }
if == connectivity.Ready {
setConnectedAddress(&, )
}
.healthMu.Lock()
.healthData = newHealthData(.ConnectivityState)
.healthMu.Unlock()
.stateListener()
})
}
func ( *acBalancerWrapper) () string {
return fmt.Sprintf("SubConn(id:%d)", .ac.channelz.ID)
}
func ( *acBalancerWrapper) ( []resolver.Address) {
.ac.updateAddrs()
}
func ( *acBalancerWrapper) () {
go .ac.connect()
}
func ( *acBalancerWrapper) () {
.closeProducers()
.ccb.cc.removeAddrConn(.ac, errConnDrain)
}
func ( *acBalancerWrapper) ( context.Context, *StreamDesc, string, ...CallOption) (ClientStream, error) {
:= .ac.getReadyTransport()
if == nil {
return nil, status.Errorf(codes.Unavailable, "SubConn state is not Ready")
}
return newNonRetryClientStream(, , , , .ac, ...)
}
func ( *acBalancerWrapper) ( context.Context, string, any, any, ...CallOption) error {
, := .NewStream(, unaryStreamDesc, , ...)
if != nil {
return
}
if := .SendMsg(); != nil {
return
}
return .RecvMsg()
}
type refCountedProducer struct {
producer balancer.Producer
refs int
close func()
}
func ( *acBalancerWrapper) ( balancer.ProducerBuilder) (balancer.Producer, func()) {
.producersMu.Lock()
defer .producersMu.Unlock()
:= .producers[]
if == nil {
, := .Build()
= &refCountedProducer{producer: , close: }
.producers[] =
}
.refs++
:= func() {
.producersMu.Lock()
.refs--
if .refs == 0 {
defer .close()
delete(.producers, )
}
.producersMu.Unlock()
}
return .producer, sync.OnceFunc()
}
func ( *acBalancerWrapper) () {
.producersMu.Lock()
defer .producersMu.Unlock()
for , := range .producers {
.refs = 0
.close()
delete(.producers, )
}
}
type healthProducerRegisterFn = func(context.Context, balancer.SubConn, string, func(balancer.SubConnState)) func()
func ( *acBalancerWrapper) () func(context.Context, func(balancer.SubConnState)) func() {
if .ccb.cc.dopts.disableHealthCheck {
return noOpRegisterHealthListenerFn
}
:= internal.RegisterClientHealthCheckListener
if == nil {
return noOpRegisterHealthListenerFn
}
:= .ac.cc.healthCheckConfig()
if == nil {
return noOpRegisterHealthListenerFn
}
return func( context.Context, func(balancer.SubConnState)) func() {
return .(healthProducerRegisterFn)(, , .ServiceName, )
}
}
func ( *acBalancerWrapper) ( func(balancer.SubConnState)) {
.healthMu.Lock()
defer .healthMu.Unlock()
.healthData.closeHealthProducer()
if .healthData.connectivityState != connectivity.Ready {
return
}
:= newHealthData(connectivity.Ready)
.healthData =
if == nil {
return
}
:= .healthListenerRegFn()
.ccb.serializer.TrySchedule(func( context.Context) {
if .Err() != nil || .ccb.balancer == nil {
return
}
.healthMu.Lock()
defer .healthMu.Unlock()
if .healthData != {
return
}
:= func( balancer.SubConnState) {
.ccb.serializer.TrySchedule(func( context.Context) {
if .Err() != nil || .ccb.balancer == nil {
return
}
.healthMu.Lock()
defer .healthMu.Unlock()
if .healthData != {
return
}
()
})
}
.closeHealthProducer = (, )
})
}