/*
 *
 * Copyright 2017 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

package grpc

import (
	"context"
	"fmt"
	"sync"

	"google.golang.org/grpc/balancer"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/connectivity"
	"google.golang.org/grpc/internal"
	"google.golang.org/grpc/internal/balancer/gracefulswitch"
	"google.golang.org/grpc/internal/channelz"
	"google.golang.org/grpc/internal/grpcsync"
	"google.golang.org/grpc/resolver"
	"google.golang.org/grpc/stats"
	"google.golang.org/grpc/status"
)

var (
	setConnectedAddress = internal.SetConnectedAddress.(func(*balancer.SubConnState, resolver.Address))
	// noOpRegisterHealthListenerFn is used when client side health checking is
	// disabled. It sends a single READY update on the registered listener.
	noOpRegisterHealthListenerFn = func(_ context.Context, listener func(balancer.SubConnState)) func() {
		listener(balancer.SubConnState{ConnectivityState: connectivity.Ready})
		return func() {}
	}
)

// ccBalancerWrapper sits between the ClientConn and the Balancer.
//
// ccBalancerWrapper implements methods corresponding to the ones on the
// balancer.Balancer interface. The ClientConn is free to call these methods
// concurrently and the ccBalancerWrapper ensures that calls from the ClientConn
// to the Balancer happen in order by performing them in the serializer, without
// any mutexes held.
//
// ccBalancerWrapper also implements the balancer.ClientConn interface and is
// passed to the Balancer implementations. It invokes unexported methods on the
// ClientConn to handle these calls from the Balancer.
//
// It uses the gracefulswitch.Balancer internally to ensure that balancer
// switches happen in a graceful manner.
type ccBalancerWrapper struct {
	internal.EnforceClientConnEmbedding
	// The following fields are initialized when the wrapper is created and are
	// read-only afterwards, and therefore can be accessed without a mutex.
	cc               *ClientConn
	opts             balancer.BuildOptions
	serializer       *grpcsync.CallbackSerializer
	serializerCancel context.CancelFunc

	// The following fields are only accessed within the serializer or during
	// initialization.
	curBalancerName string
	balancer        *gracefulswitch.Balancer

	// The following field is protected by mu.  Caller must take cc.mu before
	// taking mu.
	mu     sync.Mutex
	closed bool
}

// newCCBalancerWrapper creates a new balancer wrapper in idle state. The
// underlying balancer is not created until the updateClientConnState() method
// is invoked.
func newCCBalancerWrapper(cc *ClientConn) *ccBalancerWrapper {
	ctx, cancel := context.WithCancel(cc.ctx)
	ccb := &ccBalancerWrapper{
		cc: cc,
		opts: balancer.BuildOptions{
			DialCreds:       cc.dopts.copts.TransportCredentials,
			CredsBundle:     cc.dopts.copts.CredsBundle,
			Dialer:          cc.dopts.copts.Dialer,
			Authority:       cc.authority,
			CustomUserAgent: cc.dopts.copts.UserAgent,
			ChannelzParent:  cc.channelz,
			Target:          cc.parsedTarget,
		},
		serializer:       grpcsync.NewCallbackSerializer(ctx),
		serializerCancel: cancel,
	}
	ccb.balancer = gracefulswitch.NewBalancer(ccb, ccb.opts)
	return ccb
}

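// MetricsRecorder returns the metrics recorder used by the parent ClientConn,
// allowing LB policies to record metrics.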
func (ccb *ccBalancerWrapper) MetricsRecorder() stats.MetricsRecorder {
	return ccb.cc.metricsRecorderList
}

// updateClientConnState is invoked by grpc to push a ClientConnState update to
// the underlying balancer.  This is always executed from the serializer, so
// it is safe to call into the balancer here.
func (ccb *ccBalancerWrapper) updateClientConnState(ccs *balancer.ClientConnState) error {
	errCh := make(chan error)
	uccs := func(ctx context.Context) {
		defer close(errCh)
		if ctx.Err() != nil || ccb.balancer == nil {
			return
		}
		name := gracefulswitch.ChildName(ccs.BalancerConfig)
		if ccb.curBalancerName != name {
			ccb.curBalancerName = name
			channelz.Infof(logger, ccb.cc.channelz, "Channel switches to new LB policy %q", name)
		}
		err := ccb.balancer.UpdateClientConnState(*ccs)
		if logger.V(2) && err != nil {
			logger.Infof("error from balancer.UpdateClientConnState: %v", err)
		}
		errCh <- err
	}
	onFailure := func() { close(errCh) }

	// UpdateClientConnState can race with Close, and when the latter wins, the
	// serializer is closed, and the attempt to schedule the callback will fail.
	// It is acceptable to ignore this failure. But since we want to handle the
	// state update in a blocking fashion (when we successfully schedule the
	// callback), we have to use the ScheduleOr method and not the TrySchedule
	// method on the serializer.
	ccb.serializer.ScheduleOr(uccs, onFailure)
	return <-errCh
}

// resolverError is invoked by grpc to push a resolver error to the underlying
// balancer.  The call to the balancer is executed from the serializer.
func (ccb *ccBalancerWrapper) resolverError(err error) {
	ccb.serializer.TrySchedule(func(ctx context.Context) {
		if ctx.Err() != nil || ccb.balancer == nil {
			return
		}
		ccb.balancer.ResolverError(err)
	})
}

// close initiates async shutdown of the wrapper.  cc.mu must be held when
// calling this function.  To determine whether the wrapper has finished
// shutting down, callers should block on ccb.serializer.Done() without cc.mu
// held.
func (ccb *ccBalancerWrapper) close() {
	ccb.mu.Lock()
	ccb.closed = true
	ccb.mu.Unlock()
	channelz.Info(logger, ccb.cc.channelz, "ccBalancerWrapper: closing")
	ccb.serializer.TrySchedule(func(context.Context) {
		if ccb.balancer == nil {
			return
		}
		ccb.balancer.Close()
		ccb.balancer = nil
	})
	ccb.serializerCancel()
}

// exitIdle invokes the balancer's exitIdle method in the serializer.
func (ccb *ccBalancerWrapper) exitIdle() {
	ccb.serializer.TrySchedule(func(ctx context.Context) {
		if ctx.Err() != nil || ccb.balancer == nil {
			return
		}
		ccb.balancer.ExitIdle()
	})
}

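// NewSubConn is invoked by the balancer to create a new SubConn, backed by an
// addrConn, for the given addresses. It fails if the wrapper is closed or the
// address list is empty.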
func (ccb *ccBalancerWrapper) NewSubConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (balancer.SubConn, error) {
	ccb.cc.mu.Lock()
	defer ccb.cc.mu.Unlock()

	ccb.mu.Lock()
	if ccb.closed {
		ccb.mu.Unlock()
		return nil, fmt.Errorf("balancer is being closed; no new SubConns allowed")
	}
	ccb.mu.Unlock()

	if len(addrs) == 0 {
		return nil, fmt.Errorf("grpc: cannot create SubConn with empty address list")
	}
	ac, err := ccb.cc.newAddrConnLocked(addrs, opts)
	if err != nil {
		channelz.Warningf(logger, ccb.cc.channelz, "acBalancerWrapper: NewSubConn: failed to newAddrConn: %v", err)
		return nil, err
	}
	acbw := &acBalancerWrapper{
		ccb:           ccb,
		ac:            ac,
		producers:     make(map[balancer.ProducerBuilder]*refCountedProducer),
		stateListener: opts.StateListener,
		healthData:    newHealthData(connectivity.Idle),
	}
	ac.acbw = acbw
	return acbw, nil
}

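// RemoveSubConn only logs an error; SubConns are shut down by calling Shutdown
// on them, and the graceful switch balancer never calls this method.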
func (ccb *ccBalancerWrapper) RemoveSubConn(sc balancer.SubConn) {
	// The graceful switch balancer will never call this.
	logger.Errorf("ccb RemoveSubConn(%v) called unexpectedly", sc)
}

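// UpdateAddresses forwards the updated address list to the SubConn's
// underlying addrConn.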
func (ccb *ccBalancerWrapper) UpdateAddresses(sc balancer.SubConn, addrs []resolver.Address) {
	acbw, ok := sc.(*acBalancerWrapper)
	if !ok {
		return
	}
	acbw.UpdateAddresses(addrs)
}

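// UpdateState is invoked by the balancer to push a new picker and connectivity
// state to the ClientConn.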
func (ccb *ccBalancerWrapper) UpdateState(s balancer.State) {
	ccb.cc.mu.Lock()
	defer ccb.cc.mu.Unlock()
	if ccb.cc.conns == nil {
		// The CC has been closed; ignore this update.
		return
	}

	ccb.mu.Lock()
	if ccb.closed {
		ccb.mu.Unlock()
		return
	}
	ccb.mu.Unlock()
	// Update the picker before updating the connectivity state.  The ordering
	// is not required for correctness, but updating the state first can lead
	// to extra calls of Pick in the common start-up case where we wait for
	// ready and then perform an RPC: the "connecting" picker would be called
	// when the state is updated, and the "ready" picker only after the picker
	// gets updated.

	// Note that there is no need to check if the balancer wrapper was closed,
	// as we know the graceful switch LB policy will not call cc if it has been
	// closed.
	ccb.cc.pickerWrapper.updatePicker(s.Picker)
	ccb.cc.csMgr.updateState(s.ConnectivityState)
}

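// ResolveNow is invoked by the balancer to request an immediate re-resolution
// from the name resolver.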
func (ccb *ccBalancerWrapper) ResolveNow(o resolver.ResolveNowOptions) {
	ccb.cc.mu.RLock()
	defer ccb.cc.mu.RUnlock()

	ccb.mu.Lock()
	if ccb.closed {
		ccb.mu.Unlock()
		return
	}
	ccb.mu.Unlock()
	ccb.cc.resolveNowLocked(o)
}

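// Target returns the dial target of the parent ClientConn.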
func (ccb *ccBalancerWrapper) Target() string {
	return ccb.cc.target
}

// acBalancerWrapper is a wrapper on top of ac for balancers.
// It implements balancer.SubConn interface.
type acBalancerWrapper struct {
	internal.EnforceSubConnEmbedding
	ac            *addrConn          // read-only
	ccb           *ccBalancerWrapper // read-only
	stateListener func(balancer.SubConnState)

	producersMu sync.Mutex
	producers   map[balancer.ProducerBuilder]*refCountedProducer

	// Access to healthData is protected by healthMu.
	healthMu sync.Mutex
	// healthData is stored as a pointer to detect when the health listener is
	// dropped or updated. This is required as closures can't be compared for
	// equality.
	healthData *healthData
}

// healthData holds data related to health state reporting.
type healthData struct {
	// connectivityState stores the most recent connectivity state delivered
	// to the LB policy. This is stored to avoid sending updates when the
	// SubConn has already exited connectivity state READY.
	connectivityState connectivity.State
	// closeHealthProducer stores function to close the ref counted health
	// producer. The health producer is automatically closed when the SubConn
	// state changes.
	closeHealthProducer func()
}

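// newHealthData returns a healthData initialized with the given connectivity
// state and a no-op closeHealthProducer.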
func newHealthData(s connectivity.State) *healthData {
	return &healthData{
		connectivityState:   s,
		closeHealthProducer: func() {},
	}
}

// updateState is invoked by grpc to push a subConn state update to the
// underlying balancer.
func (acbw *acBalancerWrapper) updateState(s connectivity.State, curAddr resolver.Address, err error) {
	acbw.ccb.serializer.TrySchedule(func(ctx context.Context) {
		if ctx.Err() != nil || acbw.ccb.balancer == nil {
			return
		}
		// Invalidate all producers on any state change.
		acbw.closeProducers()

		// Even though it is optional for balancers, gracefulswitch ensures
		// opts.StateListener is set, so this cannot ever be nil.
		// TODO: delete this comment when UpdateSubConnState is removed.
		scs := balancer.SubConnState{ConnectivityState: s, ConnectionError: err}
		if s == connectivity.Ready {
			setConnectedAddress(&scs, curAddr)
		}
		// Invalidate the health listener by updating the healthData.
		acbw.healthMu.Lock()
		// A race may occur if a health listener is registered soon after the
		// connectivity state is set but before the stateListener is called.
		// Two cases may arise:
		// 1. The new state is not READY: RegisterHealthListener has checks to
		//    ensure no updates are sent when the connectivity state is not
		//    READY.
		// 2. The new state is READY: This means that the old state wasn't
		//    READY. The RegisterHealthListener API mentions that a health
		//    listener must not be registered when a SubConn is not ready to
		//    avoid such races. When this happens, the LB policy would get
		//    health updates on the old listener. When the LB policy registers
		//    a new listener on receiving the connectivity update, the health
		//    updates will be sent to the new health listener.
		acbw.healthData = newHealthData(scs.ConnectivityState)
		acbw.healthMu.Unlock()

		acbw.stateListener(scs)
	})
}

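// String returns a string representation of the SubConn containing its
// channelz ID, for logging purposes.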
func (acbw *acBalancerWrapper) String() string {
	return fmt.Sprintf("SubConn(id:%d)", acbw.ac.channelz.ID)
}

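// UpdateAddresses updates the addresses used by the underlying addrConn.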
func (acbw *acBalancerWrapper) UpdateAddresses(addrs []resolver.Address) {
	acbw.ac.updateAddrs(addrs)
}

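// Connect begins connecting the underlying addrConn asynchronously, without
// blocking the caller.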
func (acbw *acBalancerWrapper) Connect() {
	go acbw.ac.connect()
}

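// Shutdown closes all producers and tears down the underlying addrConn.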
func (acbw *acBalancerWrapper) Shutdown() {
	acbw.closeProducers()
	acbw.ccb.cc.removeAddrConn(acbw.ac, errConnDrain)
}

// NewStream begins a streaming RPC on the addrConn.  If the addrConn is not
// ready, it returns an Unavailable status error without blocking.
func (acbw *acBalancerWrapper) NewStream(ctx context.Context, desc *StreamDesc, method string, opts ...CallOption) (ClientStream, error) {
	transport := acbw.ac.getReadyTransport()
	if transport == nil {
		return nil, status.Errorf(codes.Unavailable, "SubConn state is not Ready")
	}
	return newNonRetryClientStream(ctx, desc, method, transport, acbw.ac, opts...)
}

// Invoke performs a unary RPC.  If the addrConn is not ready, it returns the
// Unavailable status error from NewStream.
func (acbw *acBalancerWrapper) Invoke(ctx context.Context, method string, args any, reply any, opts ...CallOption) error {
	cs, err := acbw.NewStream(ctx, unaryStreamDesc, method, opts...)
	if err != nil {
		return err
	}
	if err := cs.SendMsg(args); err != nil {
		return err
	}
	return cs.RecvMsg(reply)
}

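// refCountedProducer holds a producer built from a single builder along with
// the number of outstanding references to it.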
type refCountedProducer struct {
	producer balancer.Producer
	refs     int    // number of current refs to the producer
	close    func() // underlying producer's close function
}

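// GetOrBuildProducer returns the existing producer for the given builder,
// building one if necessary, along with a function that releases the caller's
// reference to it.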
func (acbw *acBalancerWrapper) GetOrBuildProducer(pb balancer.ProducerBuilder) (balancer.Producer, func()) {
	acbw.producersMu.Lock()
	defer acbw.producersMu.Unlock()

	// Look up existing producer from this builder.
	pData := acbw.producers[pb]
	if pData == nil {
		// Not found; create a new one and add it to the producers map.
		p, closeFn := pb.Build(acbw)
		pData = &refCountedProducer{producer: p, close: closeFn}
		acbw.producers[pb] = pData
	}
	// Account for this new reference.
	pData.refs++

	// Return a cleanup function wrapped in a sync.OnceFunc to remove this
	// reference and delete the refCountedProducer from the map if the total
	// reference count goes to zero.
	unref := func() {
		acbw.producersMu.Lock()
		// If closeProducers has already closed this producer instance, refs is
		// set to 0, so the check after decrementing will never pass, and the
		// producer will not be double-closed.
		pData.refs--
		if pData.refs == 0 {
			defer pData.close() // Run outside the acbw mutex.
			delete(acbw.producers, pb)
		}
		acbw.producersMu.Unlock()
	}
	return pData.producer, sync.OnceFunc(unref)
}

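// closeProducers closes all producers on the SubConn and resets their
// reference counts; it is invoked on any connectivity state change and on
// Shutdown.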
func (acbw *acBalancerWrapper) closeProducers() {
	acbw.producersMu.Lock()
	defer acbw.producersMu.Unlock()
	for pb, pData := range acbw.producers {
		pData.refs = 0
		pData.close()
		delete(acbw.producers, pb)
	}
}

// healthProducerRegisterFn is a type alias for the health producer's function
// for registering listeners.
type healthProducerRegisterFn = func(context.Context, balancer.SubConn, string, func(balancer.SubConnState)) func()

// healthListenerRegFn returns a function to register a listener for health
// updates. If client side health checks are disabled, the registered listener
// will get a single READY (raw connectivity state) update.
//
// Client side health checking is enabled when all the following
// conditions are satisfied:
// 1. Health checking is not disabled using the dial option.
// 2. The health package is imported.
// 3. The health check config is present in the service config.
func (acbw *acBalancerWrapper) healthListenerRegFn() func(context.Context, func(balancer.SubConnState)) func() {
	if acbw.ccb.cc.dopts.disableHealthCheck {
		return noOpRegisterHealthListenerFn
	}
	regHealthLisFn := internal.RegisterClientHealthCheckListener
	if regHealthLisFn == nil {
		// The health package is not imported.
		return noOpRegisterHealthListenerFn
	}
	cfg := acbw.ac.cc.healthCheckConfig()
	if cfg == nil {
		return noOpRegisterHealthListenerFn
	}
	return func(ctx context.Context, listener func(balancer.SubConnState)) func() {
		return regHealthLisFn.(healthProducerRegisterFn)(ctx, acbw, cfg.ServiceName, listener)
	}
}

// RegisterHealthListener accepts a health listener from the LB policy. It sends
// updates to the health listener as long as the SubConn's connectivity state
// doesn't change and a new health listener is not registered. To invalidate
// the currently registered health listener, acbw updates the healthData. If a
// nil listener is registered, the active health listener is dropped.
func (acbw *acBalancerWrapper) RegisterHealthListener(listener func(balancer.SubConnState)) {
	acbw.healthMu.Lock()
	defer acbw.healthMu.Unlock()
	acbw.healthData.closeHealthProducer()
	// Listeners should not be registered when the connectivity state
	// isn't Ready. This may happen when the balancer registers a listener
	// after the connectivityState is updated, but before it is notified
	// of the update.
	if acbw.healthData.connectivityState != connectivity.Ready {
		return
	}
	// Replace the health data to stop sending updates to any previously
	// registered health listeners.
	hd := newHealthData(connectivity.Ready)
	acbw.healthData = hd
	if listener == nil {
		return
	}

	registerFn := acbw.healthListenerRegFn()
	acbw.ccb.serializer.TrySchedule(func(ctx context.Context) {
		if ctx.Err() != nil || acbw.ccb.balancer == nil {
			return
		}
		// Don't send updates if a new listener is registered.
		acbw.healthMu.Lock()
		defer acbw.healthMu.Unlock()
		if acbw.healthData != hd {
			return
		}
		// Serialize the health updates from the health producer with
		// other calls into the LB policy.
		listenerWrapper := func(scs balancer.SubConnState) {
			acbw.ccb.serializer.TrySchedule(func(ctx context.Context) {
				if ctx.Err() != nil || acbw.ccb.balancer == nil {
					return
				}
				acbw.healthMu.Lock()
				defer acbw.healthMu.Unlock()
				if acbw.healthData != hd {
					return
				}
				listener(scs)
			})
		}

		hd.closeHealthProducer = registerFn(ctx, listenerWrapper)
	})
}