package grpc
import (
)
// ccBalancerWrapper ties a ClientConn to its gracefulswitch balancer. Work is
// handed to the wrapper through updateCh and drained by a watcher goroutine.
type ccBalancerWrapper struct {
	// cc is the owning channel.
	cc *ClientConn
	// balancer is the gracefulswitch balancer that the handlers call into.
	balancer *gracefulswitch.Balancer
	// curBalancerName is the name of the LB policy last switched to
	// successfully; it is compared case-insensitively on switch requests.
	curBalancerName string

	// updateCh carries the typed update values consumed by the watcher.
	updateCh *buffer.Unbounded
	// resultCh carries the result of a ClientConnState update back to the
	// caller that is waiting for it.
	resultCh *buffer.Unbounded
	// closed is fired to request shutdown.
	closed *grpcsync.Event
	// done is fired once the balancer has been closed.
	done *grpcsync.Event
}
// Constructor for ccBalancerWrapper. It takes the owning ClientConn and the
// balancer build options, allocates the unbounded update/result buffers and
// the closed/done events, starts the watcher goroutine, and then creates the
// gracefulswitch balancer stored in the balancer field.
func ( *ClientConn, balancer.BuildOptions) *ccBalancerWrapper {
:= &ccBalancerWrapper{
cc: ,
updateCh: buffer.NewUnbounded(),
resultCh: buffer.NewUnbounded(),
closed: grpcsync.NewEvent(),
done: grpcsync.NewEvent(),
}
// The watcher goroutine is started before the balancer field is assigned.
// NOTE(review): presumably safe because nothing can enqueue an update or
// fire closed until this function has returned — confirm against callers.
go .watcher()
.balancer = gracefulswitch.NewBalancer(, )
return
}
// ccStateUpdate is the update-channel message that carries a new
// ClientConnState for the balancer.
type ccStateUpdate struct {
	// ccs is the state to forward to the balancer.
	ccs *balancer.ClientConnState
}
// scStateUpdate is the update-channel message that reports a connectivity
// change for a single SubConn.
type scStateUpdate struct {
	// sc is the SubConn whose state changed.
	sc balancer.SubConn
	// state is the new connectivity state.
	state connectivity.State
	// err is passed on to the balancer as the SubConnState's ConnectionError.
	err error
}
// exitIdleUpdate is the payload-free update-channel message that asks the
// watcher to run the exit-idle handler.
type exitIdleUpdate struct{}
// resolverErrorUpdate is the update-channel message that carries an error
// to be forwarded to the balancer's ResolverError method.
type resolverErrorUpdate struct {
	// err is the error to forward.
	err error
}
// switchToUpdate is the update-channel message that asks the watcher to
// switch to the LB policy with the given name.
type switchToUpdate struct {
	// name is the requested LB policy name.
	name string
}
// subConnUpdate is the update-channel message that asks the watcher to
// remove the SubConn represented by acbw.
type subConnUpdate struct {
	// acbw is the SubConn wrapper to remove.
	acbw *acBalancerWrapper
}
// Event loop of the wrapper, started in its own goroutine by the constructor
// (the log message below names it "ccBalancerWrapper.watcher"). Each iteration
// waits for either a queued update or the closed event, dispatches the update
// to the matching handle* method, and exits via handleClose once closed has
// fired. Because every handler is invoked from this single loop, the handlers
// run one at a time.
func ( *ccBalancerWrapper) () {
for {
select {
case := <-.updateCh.Get():
// NOTE(review): Load presumably makes the next queued item available on
// Get — confirm against buffer.Unbounded.
.updateCh.Load()
if .closed.HasFired() {
// This break only leaves the select, not the for loop; the closed check
// after the select then performs the shutdown. The received update is
// dropped without being handled.
break
}
switch update := .(type) {
case *ccStateUpdate:
.handleClientConnStateChange(.ccs)
case *scStateUpdate:
.handleSubConnStateChange()
case *exitIdleUpdate:
.handleExitIdle()
case *resolverErrorUpdate:
.handleResolverError(.err)
case *switchToUpdate:
.handleSwitchTo(.name)
case *subConnUpdate:
.handleRemoveSubConn(.acbw)
default:
// Unknown update types are logged and otherwise ignored.
logger.Errorf("ccBalancerWrapper.watcher: unknown update %+v, type %T", , )
}
case <-.closed.Done():
// Nothing to do here; this case only wakes the loop so the check below runs.
}
// Single exit point of the loop: close the balancer and stop the goroutine.
if .closed.HasFired() {
.handleClose()
return
}
}
}
// Queues a ccStateUpdate for the watcher and then blocks until either a value
// arrives on resultCh or the closed event fires.
//
// Returns nil if the wrapper was closed first or if the received result is
// nil; otherwise returns the result asserted to error.
func ( *ccBalancerWrapper) ( *balancer.ClientConnState) error {
.updateCh.Put(&ccStateUpdate{ccs: })
var interface{}
select {
case = <-.resultCh.Get():
.resultCh.Load()
case <-.closed.Done():
// Closed while waiting: report success rather than block forever.
return nil
}
if == nil {
return nil
}
// Unchecked type assertion: this panics if a non-nil, non-error value was
// ever placed on resultCh.
return .(error)
}
// Handler for ccStateUpdate (presumably the handleClientConnStateChange the
// watcher dispatches to). Unless the current balancer name equals grpclbName,
// it first strips every address whose Type is resolver.GRPCLB from the state.
// It then calls UpdateClientConnState on the balancer and puts the return
// value on resultCh for the waiting caller.
func ( *ccBalancerWrapper) ( *balancer.ClientConnState) {
if .curBalancerName != grpclbName {
// The filtered slice starts nil, so it stays nil when no address is kept.
var []resolver.Address
for , := range .ResolverState.Addresses {
if .Type == resolver.GRPCLB {
continue
}
= append(, )
}
// The state is received by pointer, so this overwrites the Addresses field
// of the struct that was queued, not a copy.
.ResolverState.Addresses =
}
.resultCh.Put(.balancer.UpdateClientConnState(*))
}
// Queues an scStateUpdate built from the given SubConn, connectivity state
// and error, to be processed by the watcher. Returns immediately without
// blocking on the result.
func ( *ccBalancerWrapper) ( balancer.SubConn, connectivity.State, error) {
// NOTE(review): presumably this is the nil-SubConn guard — the compared
// operand is not visible here; confirm.
if == nil {
return
}
.updateCh.Put(&scStateUpdate{
sc: ,
state: ,
err: ,
})
}
// Handler for scStateUpdate: forwards the update to the balancer's
// UpdateSubConnState, packaging state and err into a balancer.SubConnState.
func ( *ccBalancerWrapper) ( *scStateUpdate) {
.balancer.UpdateSubConnState(.sc, balancer.SubConnState{ConnectivityState: .state, ConnectionError: .err})
}
// Queues an exitIdleUpdate for the watcher; does not wait for it to be
// handled.
func ( *ccBalancerWrapper) () {
.updateCh.Put(&exitIdleUpdate{})
}
// Handler for exitIdleUpdate: calls ExitIdle on the balancer, but only when
// the ClientConn currently reports connectivity.Idle; otherwise it is a no-op.
func ( *ccBalancerWrapper) () {
if .cc.GetState() != connectivity.Idle {
return
}
.balancer.ExitIdle()
}
// Queues a resolverErrorUpdate carrying the given error for the watcher; does
// not wait for it to be handled.
func ( *ccBalancerWrapper) ( error) {
.updateCh.Put(&resolverErrorUpdate{err: })
}
// Handler for resolverErrorUpdate: passes the error to the balancer's
// ResolverError method.
func ( *ccBalancerWrapper) ( error) {
.balancer.ResolverError()
}
// Queues a switchToUpdate carrying the requested LB policy name for the
// watcher; does not wait for the switch to happen.
func ( *ccBalancerWrapper) ( string) {
.updateCh.Put(&switchToUpdate{name: })
}
// Handler for switchToUpdate: switches the gracefulswitch balancer to the
// builder registered under the given name.
//
// It is a no-op when the name matches curBalancerName case-insensitively. If
// balancer.Get returns nil for the name, the pick-first builder is used
// instead and a warning is logged. curBalancerName is updated only after
// SwitchTo succeeds, and is set to the builder's own Name().
func ( *ccBalancerWrapper) ( string) {
if strings.EqualFold(.curBalancerName, ) {
return
}
:= balancer.Get()
if == nil {
channelz.Warningf(logger, .cc.channelzID, "Channel switches to new LB policy %q, since the specified LB policy %q was not registered", PickFirstBalancerName, )
= newPickfirstBuilder()
} else {
channelz.Infof(logger, .cc.channelzID, "Channel switches to new LB policy %q", )
}
if := .balancer.SwitchTo(); != nil {
// On failure the previous curBalancerName is kept.
channelz.Errorf(logger, .cc.channelzID, "Channel failed to build new LB policy %q: %v", , )
return
}
.curBalancerName = .Name()
}
// Handler for subConnUpdate: removes the wrapper's current addrConn from the
// ClientConn, passing errConnDrain as the reason.
func ( *ccBalancerWrapper) ( *acBalancerWrapper) {
.cc.removeAddrConn(.getAddrConn(), errConnDrain)
}
// Requests shutdown by firing the closed event, then blocks until the done
// event has fired (done is fired by the close handler after the balancer has
// been closed).
func ( *ccBalancerWrapper) () {
.closed.Fire()
<-.done.Done()
}
// Shutdown handler invoked by the watcher: closes the balancer and then fires
// done to release whoever is waiting for the shutdown to complete.
func ( *ccBalancerWrapper) () {
.balancer.Close()
.done.Fire()
}
// Creates a new SubConn for the given addresses and options (the warning
// below refers to this operation as "NewSubConn").
//
// Returns an error when the address list is empty or when the ClientConn
// fails to create the underlying addrConn. On success it returns an
// acBalancerWrapper that holds the new addrConn and an empty producers map.
func ( *ccBalancerWrapper) ( []resolver.Address, balancer.NewSubConnOptions) (balancer.SubConn, error) {
if len() <= 0 {
return nil, fmt.Errorf("grpc: cannot create SubConn with empty address list")
}
, := .cc.newAddrConn(, )
if != nil {
channelz.Warningf(logger, .cc.channelzID, "acBalancerWrapper: NewSubConn: failed to newAddrConn: %v", )
return nil,
}
:= &acBalancerWrapper{ac: , producers: make(map[balancer.ProducerBuilder]*refCountedProducer)}
// Link the addrConn back to its wrapper while holding the addrConn's mutex.
.ac.mu.Lock()
.acbw =
.ac.mu.Unlock()
return , nil
}
// Queues removal of the given SubConn for the watcher. A SubConn that is not
// an *acBalancerWrapper is silently ignored.
func ( *ccBalancerWrapper) ( balancer.SubConn) {
, := .(*acBalancerWrapper)
if ! {
return
}
.updateCh.Put(&subConnUpdate{acbw: })
}
// Forwards an address update to the given SubConn by calling its
// UpdateAddresses method directly, without going through the update channel.
// A SubConn that is not an *acBalancerWrapper is silently ignored.
func ( *ccBalancerWrapper) ( balancer.SubConn, []resolver.Address) {
, := .(*acBalancerWrapper)
if ! {
return
}
.UpdateAddresses()
}
// Applies a balancer.State to the ClientConn: the new Picker is installed on
// the blocking picker first, and the connectivity state is published through
// the state manager afterwards.
func ( *ccBalancerWrapper) ( balancer.State) {
.cc.blockingpicker.updatePicker(.Picker)
.cc.csMgr.updateState(.ConnectivityState)
}
// Forwards the resolve-now request, with its options, to the ClientConn's
// resolveNow method.
func ( *ccBalancerWrapper) ( resolver.ResolveNowOptions) {
.cc.resolveNow()
}
// Returns the target string stored on the ClientConn.
func ( *ccBalancerWrapper) () string {
return .cc.target
}
// acBalancerWrapper is the SubConn implementation handed to balancers. It
// wraps an addrConn, which can be swapped for a new one on address updates.
type acBalancerWrapper struct {
	// mu guards ac and producers.
	mu sync.Mutex
	// ac is the addrConn currently backing this SubConn.
	ac *addrConn
	// producers holds the ref-counted producers created for this SubConn,
	// keyed by the builder that made them.
	producers map[balancer.ProducerBuilder]*refCountedProducer
}
// Updates the addresses of this SubConn (the warning below refers to this
// operation as "UpdateAddresses"). The wrapper's mutex is held for the whole
// call.
//
// An empty address list removes the addrConn from the ClientConn. Otherwise
// the addrConn is first asked to update its addresses in place. If that
// fails, the old addrConn is detached and removed, and a new one is created
// with the same options. The new addrConn is connected in the background
// unless the old one was Idle. If the old addrConn was already Shutdown, no
// replacement is created.
func ( *acBalancerWrapper) ( []resolver.Address) {
.mu.Lock()
defer .mu.Unlock()
if len() <= 0 {
.ac.cc.removeAddrConn(.ac, errConnDrain)
return
}
if !.ac.tryUpdateAddrs() {
// Capture what is needed from the old addrConn before tearing it down.
:= .ac.cc
:= .ac.scopts
// Clear the old addrConn's back-reference to this wrapper under its mutex.
.ac.mu.Lock()
.ac.acbw = nil
.ac.mu.Unlock()
// The state is read before removal so the decisions below reflect the
// addrConn's state prior to being torn down.
:= .ac.getState()
.ac.cc.removeAddrConn(.ac, errConnDrain)
if == connectivity.Shutdown {
return
}
, := .newAddrConn(, )
if != nil {
channelz.Warningf(logger, .ac.channelzID, "acBalancerWrapper: UpdateAddresses: failed to newAddrConn: %v", )
return
}
.ac =
// NOTE(review): presumably this locks the new addrConn's mutex while its
// back-reference is set — the receiver is not visible here; confirm.
.mu.Lock()
.acbw =
.mu.Unlock()
if != connectivity.Idle {
go .connect()
}
}
}
// Starts connecting the current addrConn in a new goroutine. The wrapper's
// mutex is held only while the addrConn is read and the goroutine is spawned,
// not for the duration of the connect.
func ( *acBalancerWrapper) () {
.mu.Lock()
defer .mu.Unlock()
go .ac.connect()
}
// Returns the addrConn currently held by the wrapper, read under the
// wrapper's mutex.
func ( *acBalancerWrapper) () *addrConn {
.mu.Lock()
defer .mu.Unlock()
return .ac
}
// errSubConnNotReady is an Unavailable status error returned when a stream is
// requested on a SubConn that has no ready transport.
var errSubConnNotReady = status.Error(codes.Unavailable, "SubConn not currently connected")
// Opens a client stream on this SubConn's addrConn via
// newNonRetryClientStream. Returns errSubConnNotReady when the addrConn has
// no ready transport.
func ( *acBalancerWrapper) ( context.Context, *StreamDesc, string, ...CallOption) (ClientStream, error) {
// NOTE(review): the addrConn is read here without taking the wrapper's
// mutex, unlike the other methods on this type — confirm this is intended.
:= .ac.getReadyTransport()
if == nil {
return nil, errSubConnNotReady
}
return newNonRetryClientStream(, , , , .ac, ...)
}
// Performs a unary call on this SubConn: opens a stream with unaryStreamDesc,
// sends one message, and returns the result of receiving one message. The
// first error from any of the three steps is returned.
func ( *acBalancerWrapper) ( context.Context, string, interface{}, interface{}, ...CallOption) error {
, := .NewStream(, unaryStreamDesc, , ...)
if != nil {
return
}
if := .SendMsg(); != nil {
return
}
return .RecvMsg()
}
// refCountedProducer is a balancer.Producer shared between callers, together
// with the bookkeeping needed to close it when the last reference is dropped.
type refCountedProducer struct {
	// producer is the shared producer instance.
	producer balancer.Producer
	// refs is the number of references currently handed out.
	refs int
	// close is called once refs drops to zero.
	close func()
}
// Returns the producer associated with the given builder on this SubConn,
// building and caching it on first use, together with a function that
// releases the caller's reference.
//
// Each call increments the entry's reference count. The returned release
// function decrements it; when the count reaches zero the entry is deleted
// from the producers map and its close function is invoked.
func ( *acBalancerWrapper) ( balancer.ProducerBuilder) (balancer.Producer, func()) {
.mu.Lock()
defer .mu.Unlock()
:= .producers[]
if == nil {
// First request for this builder: build and cache the producer.
, := .Build()
= &refCountedProducer{producer: , close: }
.producers[] =
}
.refs++
:= func() {
.mu.Lock()
.refs--
if .refs == 0 {
// Deferred so that close runs when this closure returns, i.e. after
// the mutex has been released below.
defer .close()
delete(.producers, )
}
.mu.Unlock()
}
// NOTE(review): grpcsync.OnceFunc presumably makes the release function
// run at most once even if called repeatedly — confirm.
return .producer, grpcsync.OnceFunc()
}