package grpc
import (
iresolver
_
_
_
_
)
const (
// minConnectTimeout is the package-level default connect timeout (20s). The
// transport-reset code in this file starts from this value and replaces it
// with the result of dopts.minConnectTimeout() when that option is non-nil.
minConnectTimeout = 20 * time.Second
)
var (
// ErrClientConnClosing is a status error with code codes.Canceled reporting
// that the client connection is closing.
ErrClientConnClosing = status.Error(codes.Canceled, "grpc: the client connection is closing")
// errConnDrain reports that the connection is drained. The teardown code in
// this file compares against it to pick GracefulClose over Close.
errConnDrain = errors.New("grpc: the connection is drained")
// errConnClosing reports that the connection is closing.
errConnClosing = errors.New("grpc: the connection is closing")
// errConnIdling is the teardown reason used when the channel enters idle
// mode.
errConnIdling = errors.New("grpc: the connection is closing due to channel idleness")
// invalidDefaultServiceConfigErrPrefix prefixes the error returned when the
// default service config JSON supplied through dial options fails to parse.
invalidDefaultServiceConfigErrPrefix = "grpc: the provided default service config is invalid"
// PickFirstBalancerName re-exports pickfirst.Name.
PickFirstBalancerName = pickfirst.Name
)
// Errors returned by the transport-credential validation in this file.
var (
// errNoTransportSecurity: neither TransportCredentials nor CredsBundle is set.
errNoTransportSecurity = errors.New("grpc: no transport security set (use grpc.WithTransportCredentials(insecure.NewCredentials()) explicitly or set credentials)")
// errTransportCredsAndBundle: both TransportCredentials and CredsBundle are set.
errTransportCredsAndBundle = errors.New("grpc: credentials.Bundle may not be used with individual TransportCredentials")
// errNoTransportCredsInBundle: the CredsBundle returns nil TransportCredentials.
errNoTransportCredsInBundle = errors.New("grpc: credentials.Bundle must return non-nil transport credentials")
// errTransportCredentialsMissing: the transport's security protocol is
// "insecure" but a per-RPC credential requires transport security.
errTransportCredentialsMissing = errors.New("grpc: the credentials require transport level security (use grpc.WithTransportCredentials() to set)")
)
// Default client-side size limits. The unit is presumably bytes — TODO confirm
// against the code that consumes these constants (not visible here).
const (
// 4 * 1024 * 1024.
defaultClientMaxReceiveMessageSize = 1024 * 1024 * 4
// Largest int32 value.
defaultClientMaxSendMessageSize = math.MaxInt32
// 32 * 1024.
defaultWriteBufSize = 32 * 1024
// 32 * 1024.
defaultReadBufSize = 32 * 1024
)
// defaultConfigSelector wraps a *ServiceConfig and derives per-RPC
// configuration from it through its method below. The wrapped pointer may be
// nil; this file constructs it as &defaultConfigSelector{nil} in two places.
type defaultConfigSelector struct {
sc *ServiceConfig
}
// Builds the RPC configuration for one call. The returned RPCConfig carries
// the Context taken from the RPCInfo argument and the MethodConfig that
// getMethodConfig finds in the receiver's service config for the RPCInfo's
// Method. The error result is always nil.
func ( *defaultConfigSelector) ( iresolver.RPCInfo) (*iresolver.RPCConfig, error) {
return &iresolver.RPCConfig{
Context: .Context,
MethodConfig: getMethodConfig(.sc, .Method),
}, nil
}
// Constructs a *ClientConn from a target string and a variadic list of
// DialOption values. NOTE(review): the function name is missing from this
// text; the signature matches the NewClient call made by the blocking dial
// function below — confirm.
//
// Steps, in order: allocate the ClientConn with default dial options; seed the
// retry throttler and config selector with nil-backed defaults; create the
// channel context; apply global options (unless a *disableGlobalDialOptions is
// among the supplied options) and then the supplied options; parse the target
// and pick a resolver builder; apply per-target global options; chain the
// interceptors; validate transport credentials; parse the default service
// config JSON if one was provided; determine the authority; register with
// channelz; and build the state manager, picker wrapper, metrics recorder
// list, idle state and idleness manager.
//
// Returns a nil *ClientConn and a non-nil error when target parsing,
// credential validation, default service config parsing or authority
// initialisation fails.
func ( string, ...DialOption) ( *ClientConn, error) {
:= &ClientConn{
target: ,
conns: make(map[*addrConn]struct{}),
dopts: defaultDialOptions(),
}
// Store a typed nil so later Loads of the atomic.Value always see a
// *retryThrottler.
.retryThrottler.Store((*retryThrottler)(nil))
.safeConfigSelector.UpdateConfigSelector(&defaultConfigSelector{nil})
.ctx, .cancel = context.WithCancel(context.Background())
// Scan the supplied options for the marker that disables global options.
:= false
for , := range {
if , := .(*disableGlobalDialOptions); {
= true
break
}
}
if ! {
for , := range globalDialOptions {
.apply(&.dopts)
}
}
// Supplied options are applied after the global ones.
for , := range {
.apply(&.dopts)
}
if := .initParsedTargetAndResolverBuilder(); != nil {
return nil,
}
// Per-target options need the parsed target URL, so they are applied only
// after target parsing succeeds.
for , := range globalPerTargetDialOptions {
.DialOptionForTarget(.parsedTarget.URL).apply(&.dopts)
}
chainUnaryClientInterceptors()
chainStreamClientInterceptors()
if := .validateTransportCredentials(); != nil {
return nil,
}
if .dopts.defaultServiceConfigRawJSON != nil {
:= parseServiceConfig(*.dopts.defaultServiceConfigRawJSON, .dopts.maxCallAttempts)
if .Err != nil {
return nil, fmt.Errorf("%s: %v", invalidDefaultServiceConfigErrPrefix, .Err)
}
// The ok result of the type assertion is discarded; a Config of another
// type leaves defaultServiceConfig nil.
.dopts.defaultServiceConfig, _ = .Config.(*ServiceConfig)
}
.keepaliveParams = .dopts.copts.KeepaliveParams
if = .initAuthority(); != nil {
return nil,
}
.channelzRegistration()
channelz.Infof(logger, .channelz, "parsed dial target is: %#v", .parsedTarget)
channelz.Infof(logger, .channelz, "Channel authority set to %q", .authority)
.csMgr = newConnectivityStateManager(.ctx, .channelz)
.pickerWrapper = newPickerWrapper()
.metricsRecorderList = stats.NewMetricsRecorderList(.dopts.copts.StatsHandlers)
.initIdleStateLocked()
.idlenessMgr = idle.NewManager((*idler)(), .dopts.idleTimeout)
return , nil
}
// Convenience wrapper that forwards its target and options to DialContext
// with context.Background() as the context.
func ( string, ...DialOption) (*ClientConn, error) {
return DialContext(context.Background(), , ...)
}
// Creates a ClientConn via NewClient, takes it out of idle mode, and, when
// dopts.block is set, waits until the connection is Ready or the context
// ends. NOTE(review): the function name is missing from this text; the
// signature matches the DialContext call in the wrapper above — confirm.
//
// withDefaultScheme("passthrough") and WithLocalDNSResolution() are prepended
// to the caller's options, so caller-supplied options are applied after them.
//
// When dopts.timeout is positive the wait is bounded by a derived timeout
// context. On failure the partially built connection is closed by a deferred
// function.
func ( context.Context, string, ...DialOption) ( *ClientConn, error) {
= append([]DialOption{withDefaultScheme("passthrough"), WithLocalDNSResolution()}, ...)
, := NewClient(, ...)
if != nil {
return nil,
}
// Close the connection if this function ends up returning an error.
defer func() {
if != nil {
.Close()
}
}()
if := .idlenessMgr.ExitIdleMode(); != nil {
return nil,
}
// Non-blocking dial: return without waiting for connectivity.
if !.dopts.block {
return , nil
}
if .dopts.timeout > 0 {
var context.CancelFunc
, = context.WithTimeout(, .dopts.timeout)
defer ()
}
// On the way out, if the context is already done, rewrite the named results.
// The operands of the comparisons below are missing from this text, so the
// exact rule cannot be restated here; the final case combines the context
// error with another error via "%v: %v".
defer func() {
select {
case <-.Done():
switch {
case .Err() == :
= nil
case == nil || !.dopts.returnLastError:
, = nil, .Err()
default:
, = nil, fmt.Errorf("%v: %v", .Err(), )
}
default:
}
}()
for {
:= .GetState()
// Kick off a connection attempt whenever the channel is observed Idle.
if == connectivity.Idle {
.Connect()
}
if == connectivity.Ready {
return , nil
} else if .dopts.copts.FailOnNonTempDialError && == connectivity.TransientFailure {
// With FailOnNonTempDialError, fail as soon as the last connection error
// implements a no-argument bool method and that method returns false.
// The method's name is missing from this text.
if = .connectionError(); != nil {
, := .(interface {
() bool
})
if && !.() {
return nil,
}
}
}
// WaitForStateChange returns false when the context ends first.
if !.WaitForStateChange(, ) {
if = .connectionError(); != nil && .dopts.returnLastError {
return nil,
}
return nil, .Err()
}
}
}
// Records a channelz trace event for this channel. The event description is
// "Channel <msg>" with severity CtInfo. When dopts.channelzParent is non-nil a
// Parent event "Nested channel(id:<id>) <msg>" is attached as well.
func ( *ClientConn) ( string) {
:= &channelz.TraceEvent{
Desc: fmt.Sprintf("Channel %s", ),
Severity: channelz.CtInfo,
}
if .dopts.channelzParent != nil {
.Parent = &channelz.TraceEvent{
Desc: fmt.Sprintf("Nested channel(id:%d) %s", .channelz.ID, ),
Severity: channelz.CtInfo,
}
}
channelz.AddTraceEvent(logger, .channelz, 0, )
}
// idler is a defined type over ClientConn. Its two methods convert the
// receiver back to *ClientConn and delegate to the channel's idle-mode
// methods. A converted *idler is what the constructor in this file passes to
// idle.NewManager.
type idler ClientConn
// Delegates to (*ClientConn).enterIdleMode.
func ( *idler) () {
(*ClientConn)().enterIdleMode()
}
// Delegates to (*ClientConn).exitIdleMode and returns its error.
func ( *idler) () error {
return (*ClientConn)().exitIdleMode()
}
// Moves the channel out of idle mode by starting the resolver wrapper.
//
// Returns errConnClosing when conns is nil, which is the state Close leaves
// behind. Returns the resolver wrapper's start error if starting fails.
// Otherwise records an "exiting idle mode" trace event and returns nil.
func ( *ClientConn) () ( error) {
.mu.Lock()
if .conns == nil {
.mu.Unlock()
return errConnClosing
}
.mu.Unlock()
// The resolver wrapper is started after mu has been released.
if := .resolverWrapper.start(); != nil {
return
}
.addTraceEvent("exiting idle mode")
return nil
}
// Installs a fresh resolver wrapper, balancer wrapper, first-resolve event
// and empty conns map on the channel. It takes no lock itself.
// NOTE(review): the idle-entry code in this file calls initIdleStateLocked
// while holding mu; presumably this is that method and callers are expected
// to hold mu — confirm.
func ( *ClientConn) () {
.resolverWrapper = newCCResolverWrapper()
.balancerWrapper = newCCBalancerWrapper()
.firstResolveEvent = grpcsync.NewEvent()
.conns = make(map[*addrConn]struct{})
}
// Moves the channel into idle mode. It is a no-op when conns is nil.
//
// Under mu it captures the current conns map and the resolver and balancer
// wrappers, closes both wrappers, resets the picker wrapper, sets the
// connectivity state to Idle, records an "entering idle mode" trace event and
// installs fresh idle state. After releasing mu it waits on two serializer
// Done channels and tears down every captured addrConn with errConnIdling.
func ( *ClientConn) () {
.mu.Lock()
if .conns == nil {
.mu.Unlock()
return
}
// Capture the old state before initIdleStateLocked replaces it.
:= .conns
:= .resolverWrapper
.close()
.pickerWrapper.reset()
:= .balancerWrapper
.close()
.csMgr.updateState(connectivity.Idle)
.addTraceEvent("entering idle mode")
.initIdleStateLocked()
.mu.Unlock()
// NOTE(review): the receivers of these two waits are missing from this text;
// presumably they are the captured resolver and balancer wrappers — confirm.
<-.serializer.Done()
<-.serializer.Done()
for := range {
.tearDown(errConnIdling)
}
}
// Checks that the dial options carry a usable transport-security
// configuration.
//
// Returns:
//   - errNoTransportSecurity when neither TransportCredentials nor CredsBundle
//     is set;
//   - errTransportCredsAndBundle when both are set;
//   - errNoTransportCredsInBundle when the bundle yields nil transport
//     credentials;
//   - errTransportCredentialsMissing when the effective transport credentials
//     report the "insecure" security protocol and any per-RPC credential
//     requires transport security;
//   - nil otherwise.
func ( *ClientConn) () error {
if .dopts.copts.TransportCredentials == nil && .dopts.copts.CredsBundle == nil {
return errNoTransportSecurity
}
if .dopts.copts.TransportCredentials != nil && .dopts.copts.CredsBundle != nil {
return errTransportCredsAndBundle
}
if .dopts.copts.CredsBundle != nil && .dopts.copts.CredsBundle.TransportCredentials() == nil {
return errNoTransportCredsInBundle
}
// Effective credentials: the explicit ones, else those from the bundle. The
// checks above guarantee exactly one source is non-nil at this point.
:= .dopts.copts.TransportCredentials
if == nil {
= .dopts.copts.CredsBundle.TransportCredentials()
}
if .Info().SecurityProtocol == "insecure" {
for , := range .dopts.copts.PerRPCCredentials {
if .RequireTransportSecurity() {
return errTransportCredentialsMissing
}
}
}
return nil
}
// Registers the channel with channelz and records a "created" trace event.
// dopts.channelzParent is type-asserted to *channelz.Channel with the ok
// result discarded, so a parent of any other type (or no parent) yields a nil
// *channelz.Channel.
func ( *ClientConn) ( string) {
, := .dopts.channelzParent.(*channelz.Channel)
.channelz = channelz.RegisterChannel(, )
.addTraceEvent("created")
}
// Collapses the unary interceptors configured on the ClientConn into a single
// interceptor and stores it in dopts.unaryInt.
//
// The list starts from dopts.chainUnaryInts; when dopts.unaryInt is set it is
// placed in front. With no interceptors the result is nil, with one it is
// that interceptor, and otherwise it is a closure that calls the first
// interceptor with an invoker built by getChainUnaryInvoker at index 0.
func ( *ClientConn) {
:= .dopts.chainUnaryInts
if .dopts.unaryInt != nil {
// A new slice is built so the existing chain slice is not appended to.
= append([]UnaryClientInterceptor{.dopts.unaryInt}, ...)
}
var UnaryClientInterceptor
if len() == 0 {
= nil
} else if len() == 1 {
= [0]
} else {
= func( context.Context, string, , any, *ClientConn, UnaryInvoker, ...CallOption) error {
return [0](, , , , , getChainUnaryInvoker(, 0, ), ...)
}
}
.dopts.unaryInt =
}
// Builds the invoker handed to the interceptor at the given index of the
// chain. For the last index it returns the final invoker unchanged. Otherwise
// it returns a closure that calls the interceptor at index+1, passing it an
// invoker built the same way for index+1.
func ( []UnaryClientInterceptor, int, UnaryInvoker) UnaryInvoker {
if == len()-1 {
return
}
return func( context.Context, string, , any, *ClientConn, ...CallOption) error {
return [+1](, , , , , (, +1, ), ...)
}
}
// Collapses the stream interceptors configured on the ClientConn into a
// single interceptor and stores it in dopts.streamInt.
//
// The list starts from dopts.chainStreamInts; when dopts.streamInt is set it
// is placed in front. With no interceptors the result is nil, with one it is
// that interceptor, and otherwise it is a closure that calls the first
// interceptor with a streamer built by getChainStreamer at index 0.
func ( *ClientConn) {
:= .dopts.chainStreamInts
if .dopts.streamInt != nil {
// A new slice is built so the existing chain slice is not appended to.
= append([]StreamClientInterceptor{.dopts.streamInt}, ...)
}
var StreamClientInterceptor
if len() == 0 {
= nil
} else if len() == 1 {
= [0]
} else {
= func( context.Context, *StreamDesc, *ClientConn, string, Streamer, ...CallOption) (ClientStream, error) {
return [0](, , , , getChainStreamer(, 0, ), ...)
}
}
.dopts.streamInt =
}
// Builds the streamer handed to the interceptor at the given index of the
// chain. For the last index it returns the final streamer unchanged.
// Otherwise it returns a closure that calls the interceptor at index+1,
// passing it a streamer built the same way for index+1.
func ( []StreamClientInterceptor, int, Streamer) Streamer {
if == len()-1 {
return
}
return func( context.Context, *StreamDesc, *ClientConn, string, ...CallOption) (ClientStream, error) {
return [+1](, , , , (, +1, ), ...)
}
}
// Constructs a connectivityStateManager bound to the given channelz channel,
// with a new grpcsync.PubSub for state subscribers. The state field is left
// at its zero value.
// NOTE(review): the arguments to NewPubSub are missing from this text;
// presumably the context parameter is passed — confirm.
func ( context.Context, *channelz.Channel) *connectivityStateManager {
return &connectivityStateManager{
channelz: ,
pubSub: grpcsync.NewPubSub(),
}
}
// connectivityStateManager holds the channel-level connectivity state and
// notifies observers when it changes.
type connectivityStateManager struct {
// mu is held by the methods of this type while they access state and
// notifyChan.
mu sync.Mutex
// state is the most recently recorded connectivity state.
state connectivity.State
// notifyChan is created lazily on request and closed (then reset to nil) on
// the next state change.
notifyChan chan struct{}
// channelz is the channelz entity whose State metric mirrors state.
channelz *channelz.Channel
// pubSub receives a Publish call on every state change.
pubSub *grpcsync.PubSub
}
// Records a new connectivity state under mu. The call is ignored when the
// current state is Shutdown or when the state would not change. Otherwise it
// stores the state, mirrors it into the channelz State metric, publishes to
// pubSub, logs the change, and closes and clears notifyChan if one is
// outstanding.
func ( *connectivityStateManager) ( connectivity.State) {
.mu.Lock()
defer .mu.Unlock()
// Shutdown is terminal: no further updates are accepted.
if .state == connectivity.Shutdown {
return
}
if .state == {
return
}
.state =
.channelz.ChannelMetrics.State.Store(&)
.pubSub.Publish()
channelz.Infof(logger, .channelz, "Channel Connectivity change to %v", )
// Closing the channel wakes every waiter; the next waiter gets a new one.
if .notifyChan != nil {
close(.notifyChan)
.notifyChan = nil
}
}
// Returns the current connectivity state, read under mu.
func ( *connectivityStateManager) () connectivity.State {
.mu.Lock()
defer .mu.Unlock()
return .state
}
// Returns the channel that is closed on the next state change, creating it
// under mu if none is outstanding.
func ( *connectivityStateManager) () <-chan struct{} {
.mu.Lock()
defer .mu.Unlock()
if .notifyChan == nil {
.notifyChan = make(chan struct{})
}
return .notifyChan
}
// ClientConnInterface is the set of RPC-issuing methods: Invoke and NewStream.
// *ClientConn satisfies it (see the assertion below).
type ClientConnInterface interface {
// Invoke takes a context, a method name, an args value, a reply value and
// call options, and returns an error.
Invoke(ctx context.Context, method string, args any, reply any, opts ...CallOption) error
// NewStream takes a context, a stream descriptor, a method name and call
// options, and returns a ClientStream or an error.
NewStream(ctx context.Context, desc *StreamDesc, method string, opts ...CallOption) (ClientStream, error)
}
// Compile-time assertion that *ClientConn implements ClientConnInterface.
var _ ClientConnInterface = (*ClientConn)(nil)
// ClientConn is the client-side channel object. Comments on individual fields
// describe only what the code in this file does with them.
type ClientConn struct {
// ctx and cancel are created by the constructor from context.Background();
// Close calls cancel.
ctx context.Context
cancel context.CancelFunc
// target is the dial target string as given to the constructor.
target string
// parsedTarget and resolverBuilder are set together by target parsing.
parsedTarget resolver.Target
// authority is set by authority initialisation.
authority string
// dopts holds the effective dial options.
dopts dialOptions
// channelz is the channelz registration for this channel.
channelz *channelz.Channel
resolverBuilder resolver.Builder
idlenessMgr *idle.Manager
metricsRecorderList *stats.MetricsRecorderList
csMgr *connectivityStateManager
pickerWrapper *pickerWrapper
safeConfigSelector iresolver.SafeConfigSelector
// retryThrottler always stores a *retryThrottler, possibly a typed nil.
retryThrottler atomic.Value
// mu is held (read or write) by code in this file around accesses to conns,
// sc, keepaliveParams and the resolver/balancer wrappers.
mu sync.RWMutex
resolverWrapper *ccResolverWrapper
balancerWrapper *ccBalancerWrapper
// sc is the currently applied service config; nil until one is applied.
sc *ServiceConfig
// conns is the set of live addrConns. Close sets it to nil, and other
// methods treat nil as "channel is closing".
conns map[*addrConn]struct{}
// keepaliveParams starts as a copy of dopts.copts.KeepaliveParams.
keepaliveParams keepalive.ClientParameters
// firstResolveEvent is fired when a resolver state update is processed.
firstResolveEvent *grpcsync.Event
// lceMu guards lastConnectionError.
lceMu sync.Mutex
lastConnectionError error
}
// Blocks until the channel's connectivity state differs from the given state
// or the context is done. Returns true on a state change (including when the
// state already differs on entry) and false when the context ends first.
func ( *ClientConn) ( context.Context, connectivity.State) bool {
// The notify channel is obtained before the state is read.
:= .csMgr.getNotifyChan()
if .csMgr.getState() != {
return true
}
select {
case <-.Done():
return false
case <-:
return true
}
}
// Returns the channel's current connectivity state as reported by csMgr.
func ( *ClientConn) () connectivity.State {
return .csMgr.getState()
}
// Takes the channel out of idle mode and then asks the balancer wrapper to
// exit idle. If leaving idle mode fails, the error text is recorded as a
// trace event and the balancer is not touched.
func ( *ClientConn) () {
if := .idlenessMgr.ExitIdleMode(); != nil {
.addTraceEvent(.Error())
return
}
.mu.Lock()
.balancerWrapper.exitIdle()
.mu.Unlock()
}
// Waits for firstResolveEvent to fire.
//
// Returns (false, nil) immediately if the event has already fired. Otherwise
// it calls internal.NewStreamWaitingForResolver and blocks until one of:
//   - the event fires: (true, nil);
//   - the supplied context is done: (false, status error derived from the
//     context error);
//   - the channel's own context is done: (false, ErrClientConnClosing).
func ( *ClientConn) ( context.Context) (bool, error) {
if .firstResolveEvent.HasFired() {
return false, nil
}
internal.NewStreamWaitingForResolver()
select {
case <-.firstResolveEvent.Done():
return true, nil
case <-.Done():
return false, status.FromContextError(.Err()).Err()
case <-.ctx.Done():
return false, ErrClientConnClosing
}
}
// emptyServiceConfig is the parsed form of the JSON document "{}". It is
// assigned by the initialisation function below.
var emptyServiceConfig *ServiceConfig
// Parses "{}" into emptyServiceConfig, panicking if that fails, and installs
// three hooks on the internal package: subscribing to connectivity state
// changes, and entering and exiting idle mode for tests.
// NOTE(review): the function name is missing from this text; presumably this
// is the package init function — confirm.
func () {
:= parseServiceConfig("{}", defaultMaxCallAttempts)
if .Err != nil {
panic(fmt.Sprintf("impossible error parsing empty service config: %v", .Err))
}
emptyServiceConfig = .Config.(*ServiceConfig)
internal.SubscribeToConnectivityStateChanges = func( *ClientConn, grpcsync.Subscriber) func() {
return .csMgr.pubSub.Subscribe()
}
internal.EnterIdleModeForTesting = func( *ClientConn) {
.idlenessMgr.EnterIdleModeForTesting()
}
internal.ExitIdleModeForTesting = func( *ClientConn) error {
return .idlenessMgr.ExitIdleMode()
}
}
// Applies a fallback service config. If a service config is already set it is
// re-applied with a nil config selector. Otherwise the default service config
// from the dial options is applied if present, else emptyServiceConfig; both
// of those are paired with a defaultConfigSelector wrapping the same config.
func ( *ClientConn) () {
if .sc != nil {
.applyServiceConfigAndBalancer(.sc, nil)
return
}
if .dopts.defaultServiceConfig != nil {
.applyServiceConfigAndBalancer(.dopts.defaultServiceConfig, &defaultConfigSelector{.dopts.defaultServiceConfig})
} else {
.applyServiceConfigAndBalancer(emptyServiceConfig, &defaultConfigSelector{emptyServiceConfig})
}
}
// Processes a resolver update (a resolver.State plus an error) and forwards
// the result to the balancer wrapper.
//
// NOTE(review): every return path calls mu.Unlock() and this function never
// calls mu.Lock(), so the caller is presumably required to hold mu on entry —
// confirm against the caller (not visible here).
//
// firstResolveEvent is fired via defer on every path. Returns nil when the
// channel is closing (conns is nil). Returns balancer.ErrBadResolverState
// when the error argument is non-nil, after applying a fallback service
// config and reporting the error to the balancer wrapper.
func ( *ClientConn) ( resolver.State, error) error {
defer .firstResolveEvent.Fire()
if .conns == nil {
.mu.Unlock()
return nil
}
if != nil {
.maybeApplyDefaultServiceConfig()
.balancerWrapper.resolverError()
.mu.Unlock()
return balancer.ErrBadResolverState
}
var error
if .dopts.disableServiceConfig {
channelz.Infof(logger, .channelz, "ignoring service config from resolver (%v) and applying the default because service config is disabled", .ServiceConfig)
.maybeApplyDefaultServiceConfig()
} else if .ServiceConfig == nil {
.maybeApplyDefaultServiceConfig()
} else {
// The resolver supplied a service config: use it only if it parsed without
// error and has the expected concrete type.
if , := .ServiceConfig.Config.(*ServiceConfig); .ServiceConfig.Err == nil && {
:= iresolver.GetConfigSelector()
if != nil {
if len(.ServiceConfig.Config.(*ServiceConfig).Methods) != 0 {
channelz.Infof(logger, .channelz, "method configs in service config will be ignored due to presence of config selector")
}
} else {
= &defaultConfigSelector{}
}
.applyServiceConfigAndBalancer(, )
} else {
// Bad service config. With no previously applied config (sc is nil) the
// channel is put into a failing state and the function returns early;
// otherwise processing continues with the existing sc.
= balancer.ErrBadResolverState
if .sc == nil {
.applyFailingLBLocked(.ServiceConfig)
.mu.Unlock()
return
}
}
}
// Values needed after the unlock are read while mu is still held.
:= .sc.lbConfig
:= .balancerWrapper
.mu.Unlock()
:= .updateClientConnState(&balancer.ClientConnState{ResolverState: , BalancerConfig: })
if == nil {
=
}
return
}
// Puts the channel into a failing state for an unusable service config. It
// builds a codes.Unavailable status error (describing either the parse error
// or the unexpected config type), resets the config selector to one backed by
// a nil service config, installs an error picker, and sets the connectivity
// state to TransientFailure.
// NOTE(review): the argument to base.NewErrPicker is missing from this text;
// presumably it is the error built above — confirm.
func ( *ClientConn) ( *serviceconfig.ParseResult) {
var error
if .Err != nil {
= status.Errorf(codes.Unavailable, "error parsing service config: %v", .Err)
} else {
= status.Errorf(codes.Unavailable, "illegal service config type: %T", .Config)
}
.safeConfigSelector.UpdateConfigSelector(&defaultConfigSelector{nil})
.pickerWrapper.updatePicker(base.NewErrPicker())
.csMgr.updateState(connectivity.TransientFailure)
}
// Returns a newly allocated slice of the same length as the input, filled via
// the built-in copy. Address elements are copied by value; anything they
// reference is shared.
func ( []resolver.Address) []resolver.Address {
:= make([]resolver.Address, len())
copy(, )
return
}
// Creates an addrConn in the Idle state for the given addresses and
// sub-connection options, registers it as a channelz subchannel, and adds it
// to conns. Returns ErrClientConnClosing when conns is nil.
//
// The function takes no lock while reading and writing conns.
// NOTE(review): presumably the caller must hold mu — confirm.
func ( *ClientConn) ( []resolver.Address, balancer.NewSubConnOptions) (*addrConn, error) {
if .conns == nil {
return nil, ErrClientConnClosing
}
:= &addrConn{
state: connectivity.Idle,
cc: ,
addrs: copyAddresses(),
scopts: ,
dopts: .dopts,
channelz: channelz.RegisterSubChannel(.channelz, ""),
resetBackoff: make(chan struct{}),
}
.ctx, .cancel = context.WithCancel(.ctx)
// NOTE(review): this indexes element 0 of a slice and would panic if that
// slice is empty; the slice's identity is missing from this text, presumably
// it is the address argument — confirm that callers never pass an empty list.
.channelz.ChannelMetrics.Target.Store(&[0].Addr)
channelz.AddTraceEvent(logger, .channelz, 0, &channelz.TraceEvent{
Desc: "Subchannel created",
Severity: channelz.CtInfo,
Parent: &channelz.TraceEvent{
Desc: fmt.Sprintf("Subchannel(id:%d) created", .channelz.ID),
Severity: channelz.CtInfo,
},
})
.conns[] = struct{}{}
return , nil
}
// Removes the given addrConn from conns under mu and then, with mu released,
// tears it down. It does nothing when conns is nil, i.e. when the channel is
// closing.
func ( *ClientConn) ( *addrConn, error) {
.mu.Lock()
if .conns == nil {
.mu.Unlock()
return
}
delete(.conns, )
.mu.Unlock()
.tearDown()
}
// Returns the target string the channel was created with.
func ( *ClientConn) () string {
return .target
}
// Returns the string form of the parsed target, which may differ from the
// original target string when a default scheme was applied during parsing.
func ( *ClientConn) () string {
return .parsedTarget.String()
}
// Increments the channel's channelz CallsStarted counter and stores the
// current time (Unix nanoseconds) as the last-call-started timestamp.
func ( *ClientConn) () {
.channelz.ChannelMetrics.CallsStarted.Add(1)
.channelz.ChannelMetrics.LastCallStartedTimestamp.Store(time.Now().UnixNano())
}
// Increments the channel's channelz CallsSucceeded counter.
func ( *ClientConn) () {
.channelz.ChannelMetrics.CallsSucceeded.Add(1)
}
// Increments the channel's channelz CallsFailed counter.
func ( *ClientConn) () {
.channelz.ChannelMetrics.CallsFailed.Add(1)
}
// Starts a connection attempt on the addrConn if it is Idle.
//
// Returns errConnClosing when the addrConn is Shutdown. Returns nil without
// doing anything when it is in any other non-Idle state. When Idle it calls
// resetTransportAndUnlock, which is entered with mu still held, and then
// returns nil.
func ( *addrConn) () error {
.mu.Lock()
if .state == connectivity.Shutdown {
if logger.V(2) {
logger.Infof("connect called on shutdown addrConn; ignoring.")
}
.mu.Unlock()
return errConnClosing
}
if .state != connectivity.Idle {
if logger.V(2) {
logger.Infof("connect called on addrConn in non-idle state (%v); ignoring.", .state)
}
.mu.Unlock()
return nil
}
// mu is intentionally not unlocked here; the callee releases it.
.resetTransportAndUnlock()
return nil
}
// Reports whether two addresses are equal on the fields Addr, ServerName,
// Attributes (via Attributes.Equal) and Metadata (via ==). No other field of
// resolver.Address takes part in the comparison.
func (, *resolver.Address) bool {
return .Addr == .Addr && .ServerName == .ServerName &&
.Attributes.Equal(.Attributes) &&
.Metadata == .Metadata
}
// Reports whether two address slices have the same length and are pairwise
// equal under equalAddressIgnoringBalAttributes, using slices.EqualFunc.
func (, []resolver.Address) bool {
return slices.EqualFunc(, , func(, resolver.Address) bool { return equalAddressIgnoringBalAttributes(&, &) })
}
// Replaces the addrConn's address list with a copy of the given one and, if
// needed, restarts the connection.
//
// It returns early, leaving the connection untouched, when the new list
// equals the current one, or when the state is Shutdown, TransientFailure or
// Idle (after storing the new list). In the Ready state it also returns early
// when the address currently in use is still in the new list. Otherwise it
// cancels the addrConn's context, creates a new one, gracefully closes any
// existing transport, and starts a transport reset in a new goroutine.
func ( *addrConn) ( []resolver.Address) {
= copyAddresses()
// At most five addresses are included in the log line.
:= len()
if > 5 {
= 5
}
channelz.Infof(logger, .channelz, "addrConn: updateAddrs addrs (%d of %d): %v", , len(), [:])
.mu.Lock()
if equalAddressesIgnoringBalAttributes(.addrs, ) {
.mu.Unlock()
return
}
.addrs =
if .state == connectivity.Shutdown ||
.state == connectivity.TransientFailure ||
.state == connectivity.Idle {
.mu.Unlock()
return
}
if .state == connectivity.Ready {
for , := range {
// ServerName is set on the loop's copy before comparing with curAddr.
.ServerName = .cc.getServerName()
if equalAddressIgnoringBalAttributes(&, &.curAddr) {
.mu.Unlock()
return
}
}
}
.cancel()
.ctx, .cancel = context.WithCancel(.cc.ctx)
if .transport != nil {
// The deferred call's receiver is evaluated here, before the field is
// cleared on the next line.
defer .transport.GracefulClose()
.transport = nil
}
if len() == 0 {
.updateConnectivityState(connectivity.Idle, nil)
}
// mu is still held here; the spawned call releases it.
go .resetTransportAndUnlock()
}
// Chooses the server name to use for an address, in priority order: the
// authority dial option if non-empty, then the address's ServerName if
// non-empty, then the channel's authority.
func ( *ClientConn) ( resolver.Address) string {
if .dopts.authority != "" {
return .dopts.authority
}
if .ServerName != "" {
return .ServerName
}
return .authority
}
// Looks up the MethodConfig for a method name in a service config.
//
// Returns the zero MethodConfig for a nil service config. Otherwise the
// Methods map is tried with three keys in order: the full method string, the
// method string truncated just after its last "/", and the empty string. If
// the empty-string key is absent too, the map's zero value is returned.
func ( *ServiceConfig, string) MethodConfig {
if == nil {
return MethodConfig{}
}
if , := .Methods[]; {
return
}
// With no "/" present LastIndex returns -1, so the slice below is empty and
// this lookup uses the same key as the final fallback.
:= strings.LastIndex(, "/")
if , := .Methods[[:+1]]; {
return
}
return .Methods[""]
}
// Returns the MethodConfig for the given method from the channel's current
// service config, holding mu for reading during the lookup.
func ( *ClientConn) ( string) MethodConfig {
.mu.RLock()
defer .mu.RUnlock()
return getMethodConfig(.sc, )
}
// Returns the health check config from the channel's current service config,
// or nil when no service config has been applied. Reads under mu.RLock.
func ( *ClientConn) () *healthCheckConfig {
.mu.RLock()
defer .mu.RUnlock()
if .sc == nil {
return nil
}
return .sc.healthCheckConfig
}
// Installs a service config on the channel. A nil service config is ignored.
// A non-nil config selector replaces the current one, while a nil selector
// leaves it unchanged. The retry throttler is rebuilt from the config's
// retryThrottling settings, or reset to a typed nil when there are none.
//
// The function takes no lock while writing sc.
// NOTE(review): presumably the caller must hold mu — confirm.
func ( *ClientConn) ( *ServiceConfig, iresolver.ConfigSelector) {
if == nil {
return
}
.sc =
if != nil {
.safeConfigSelector.UpdateConfigSelector()
}
if .sc.retryThrottling != nil {
// The throttler starts with a full token bucket and a threshold of half
// the maximum.
:= &retryThrottler{
tokens: .sc.retryThrottling.MaxTokens,
max: .sc.retryThrottling.MaxTokens,
thresh: .sc.retryThrottling.MaxTokens / 2,
ratio: .sc.retryThrottling.TokenRatio,
}
.retryThrottler.Store()
} else {
.retryThrottler.Store((*retryThrottler)(nil))
}
}
// Asks the resolver wrapper to re-resolve, holding mu for reading around the
// call.
func ( *ClientConn) ( resolver.ResolveNowOptions) {
.mu.RLock()
.resolverWrapper.resolveNow()
.mu.RUnlock()
}
// Asks the resolver wrapper to re-resolve without taking any lock.
// NOTE(review): presumably a variant for callers that already hold mu —
// confirm.
func ( *ClientConn) ( resolver.ResolveNowOptions) {
.resolverWrapper.resolveNow()
}
// Resets the connect backoff of every addrConn in conns. The map reference is
// read under mu and the iteration runs after mu is released. Ranging over a
// nil map (the state after Close) performs no iterations.
func ( *ClientConn) () {
.mu.Lock()
:= .conns
.mu.Unlock()
for := range {
.resetConnectBackoff()
}
}
// Shuts the channel down. Returns ErrClientConnClosing if conns is already
// nil (the state a previous call leaves behind), and nil otherwise.
//
// Sequence: close the idleness manager; under mu, take ownership of conns,
// set it to nil and move the state to Shutdown; close the resolver, picker
// and balancer wrappers and wait for both serializers to finish; tear down
// all addrConns concurrently and wait for them; record a "deleted" trace
// event and remove the channelz entry. On every path, including the early
// return, the deferred function cancels the channel context and waits for
// the state pubSub to finish.
func ( *ClientConn) () error {
defer func() {
.cancel()
<-.csMgr.pubSub.Done()
}()
.idlenessMgr.Close()
.mu.Lock()
if .conns == nil {
.mu.Unlock()
return ErrClientConnClosing
}
:= .conns
.conns = nil
.csMgr.updateState(connectivity.Shutdown)
.mu.Unlock()
.resolverWrapper.close()
.pickerWrapper.close()
.balancerWrapper.close()
<-.resolverWrapper.serializer.Done()
<-.balancerWrapper.serializer.Done()
// One goroutine per addrConn; the WaitGroup makes the function wait for all
// teardowns before it finishes.
var sync.WaitGroup
for := range {
.Add(1)
go func( *addrConn) {
defer .Done()
.tearDown(ErrClientConnClosing)
}()
}
.Wait()
.addTraceEvent("deleted")
channelz.RemoveEntry(.channelz.ID)
return nil
}
// addrConn is a connection to a list of addresses, owned by a ClientConn.
// Comments on individual fields describe only what the code in this file does
// with them.
type addrConn struct {
// ctx is derived from the owning ClientConn's ctx; cancel is called on
// teardown and when the address list changes.
ctx context.Context
cancel context.CancelFunc
// cc is the owning channel.
cc *ClientConn
// dopts is a copy of the channel's dial options taken at creation time.
dopts dialOptions
// acbw receives state updates from the connectivity-state update method.
acbw *acBalancerWrapper
scopts balancer.NewSubConnOptions
// transport is the current transport; it is set to nil when there is none.
transport transport.ClientTransport
// mu is held by code in this file around accesses to the fields below.
mu sync.Mutex
// curAddr is the address of the current transport.
curAddr resolver.Address
// addrs is the list of addresses tried when connecting.
addrs []resolver.Address
state connectivity.State
// backoffIdx is the argument passed to the backoff strategy; it is
// incremented after a backoff timer fires and reset to 0 on a successful
// connect or an explicit backoff reset.
backoffIdx int
// resetBackoff is closed and replaced to interrupt a pending backoff wait.
resetBackoff chan struct{}
channelz *channelz.SubChannel
}
// Records a new connectivity state on the addrConn. The call is ignored when
// the state would not change. Otherwise it stores the state, mirrors it into
// the channelz State metric, logs it (including the last error when one is
// given), and forwards the state, current address and error to acbw.
//
// The function takes no lock while writing state.
// NOTE(review): presumably the caller must hold mu — confirm.
func ( *addrConn) ( connectivity.State, error) {
if .state == {
return
}
.state =
.channelz.ChannelMetrics.State.Store(&)
if == nil {
channelz.Infof(logger, .channelz, "Subchannel Connectivity change to %v", )
} else {
channelz.Infof(logger, .channelz, "Subchannel Connectivity change to %v, last error: %s", , )
}
.acbw.updateState(, .curAddr, )
}
// Reacts to a GOAWAY reason. For GoAwayTooManyPings it computes twice this
// addrConn's keepalive Time and, under the channel's mu, raises the channel's
// keepaliveParams.Time to that value if it is larger. Other reasons are
// ignored.
func ( *addrConn) ( transport.GoAwayReason) {
if == transport.GoAwayTooManyPings {
:= 2 * .dopts.copts.KeepaliveParams.Time
.cc.mu.Lock()
if > .cc.keepaliveParams.Time {
.cc.keepaliveParams.Time =
}
.cc.mu.Unlock()
}
}
// Runs one connection attempt across the addrConn's addresses.
//
// NOTE(review): the function calls mu.Unlock() before ever calling mu.Lock(),
// so the caller is presumably required to hold mu on entry — confirm.
//
// It returns immediately if the addrConn's context is already done.
// Otherwise it computes a connect deadline from the backoff duration and the
// minimum connect timeout, moves to Connecting, and tries all addresses. On
// failure it requests re-resolution, moves to TransientFailure, waits for the
// backoff timer, a backoff reset or context cancellation, and then (unless
// the context is done) moves to Idle. On success it resets backoffIdx to 0.
func ( *addrConn) () {
:= .ctx
if .Err() != nil {
.mu.Unlock()
return
}
:= .addrs
:= .dopts.bs.Backoff(.backoffIdx)
:= minConnectTimeout
if .dopts.minConnectTimeout != nil {
= .dopts.minConnectTimeout()
}
// NOTE(review): the operands of this comparison are missing from this text;
// presumably the timeout is raised to the backoff duration when the backoff
// is longer — confirm.
if < {
=
}
:= time.Now().Add()
.updateConnectivityState(connectivity.Connecting, nil)
.mu.Unlock()
if := .tryAllAddrs(, , ); != nil {
.cc.resolveNow(resolver.ResolveNowOptions{})
.mu.Lock()
if .Err() != nil {
.mu.Unlock()
return
}
.updateConnectivityState(connectivity.TransientFailure, )
// Snapshot the reset channel under mu so the wait below observes a reset
// that replaces the field afterwards.
:= .resetBackoff
.mu.Unlock()
:= time.NewTimer()
select {
case <-.C:
.mu.Lock()
.backoffIdx++
.mu.Unlock()
case <-:
.Stop()
case <-.Done():
.Stop()
return
}
.mu.Lock()
if .Err() == nil {
.updateConnectivityState(connectivity.Idle, )
}
.mu.Unlock()
return
}
.mu.Lock()
.backoffIdx = 0
.mu.Unlock()
}
// Tries to create a transport to each address in order, stopping at the first
// success.
//
// Returns nil on the first successful transport creation, and errConnClosing
// if the context is done before an attempt. Otherwise each failure is
// reported to the channel as the latest connection error.
// NOTE(review): the value returned after every address has failed is set only
// under a nil check whose operand is missing from this text; presumably it is
// the first failure's error — confirm.
func ( *addrConn) ( context.Context, []resolver.Address, time.Time) error {
var error
for , := range {
.channelz.ChannelMetrics.Target.Store(&.Addr)
if .Err() != nil {
return errConnClosing
}
.mu.Lock()
// Refresh this addrConn's keepalive params from the channel, which may have
// raised them in response to a GOAWAY.
.cc.mu.RLock()
.dopts.copts.KeepaliveParams = .cc.keepaliveParams
.cc.mu.RUnlock()
// Connect options are copied, and a per-subconn CredsBundle overrides the
// channel-level one when present.
:= .dopts.copts
if .scopts.CredsBundle != nil {
.CredsBundle = .scopts.CredsBundle
}
.mu.Unlock()
channelz.Infof(logger, .channelz, "Subchannel picks a new address %q to connect", .Addr)
:= .createTransport(, , , )
if == nil {
return nil
}
if == nil {
=
}
.cc.updateConnectionError()
}
return
}
// Creates an HTTP/2 client transport to one address, bounded by the given
// deadline, and installs it on the addrConn on success.
//
// Returns the transport-creation error on failure. Returns nil on success,
// and also in two cases where the new transport is not installed (see the two
// context checks near the end).
func ( *addrConn) ( context.Context, resolver.Address, transport.ConnectOptions, time.Time) error {
.ServerName = .cc.getServerName()
, := context.WithCancel()
// Callback taking a GoAwayReason that is handed to the transport. Under mu
// it adjusts keepalive params, and returns if a context is already done.
// Otherwise it invokes a cancel function, and if a transport is still
// installed it clears it, requests re-resolution and moves to Idle.
:= func( transport.GoAwayReason) {
.mu.Lock()
defer .mu.Unlock()
.adjustParams()
if .Err() != nil {
return
}
()
if .transport == nil {
return
}
.transport = nil
.cc.resolveNow(resolver.ResolveNowOptions{})
.updateConnectivityState(connectivity.Idle, nil)
}
// The deadline bounds transport creation only; its cancel runs on return.
, := context.WithDeadline(, )
defer ()
.ChannelzParent = .channelz
, := transport.NewHTTP2Client(, .cc.ctx, , , )
if != nil {
if logger.V(2) {
logger.Infof("Creating new client transport to %q: %v", , )
}
()
channelz.Warningf(logger, .channelz, "grpc: addrConn.createTransport failed to connect to %s. Err: %v", , )
return
}
.mu.Lock()
defer .mu.Unlock()
// NOTE(review): the receivers of the two Err() checks below are missing from
// this text. The first closes the new transport asynchronously and returns;
// the second moves the addrConn to Idle and returns. Presumably they test
// the addrConn's context and the context cancelled by the callback above,
// respectively — confirm.
if .Err() != nil {
go .Close(transport.ErrConnClosing)
return nil
}
if .Err() != nil {
.updateConnectivityState(connectivity.Idle, nil)
return nil
}
.curAddr =
.transport =
.startHealthCheck()
return nil
}
// Starts client-side health checking for the current transport when it is
// enabled, and otherwise marks the addrConn Ready.
//
// The deferred function moves the state to Ready unless a flag was set; the
// flag is set only after every precondition below passes. Health checking is
// skipped when it is disabled in the dial options, when the service config
// has no health check config, when the sub-connection options do not enable
// it, or when internal.HealthCheckFunc is nil. When it runs, the health check
// function is called in a new goroutine with a stream factory and a state
// setter that both refuse to act once the addrConn's transport has changed.
func ( *addrConn) ( context.Context) {
var bool
defer func() {
if ! {
.updateConnectivityState(connectivity.Ready, nil)
}
}()
if .cc.dopts.disableHealthCheck {
return
}
:= .cc.healthCheckConfig()
if == nil {
return
}
if !.scopts.HealthCheckEnabled {
return
}
:= internal.HealthCheckFunc
if == nil {
channelz.Error(logger, .channelz, "Health check is requested but health check function is not set.")
return
}
= true
// Remember the transport this health check belongs to.
:= .transport
:= func( string) (any, error) {
.mu.Lock()
if .transport != {
.mu.Unlock()
return nil, status.Error(codes.Canceled, "the provided transport is no longer valid to use")
}
.mu.Unlock()
return newNonRetryClientStream(, &StreamDesc{ServerStreams: true}, , , )
}
:= func( connectivity.State, error) {
.mu.Lock()
defer .mu.Unlock()
if .transport != {
return
}
.updateConnectivityState(, )
}
go func() {
:= (, , , .ServiceName)
if != nil {
if status.Code() == codes.Unimplemented {
channelz.Error(logger, .channelz, "Subchannel health check is unimplemented at server side, thus health check is disabled")
} else {
channelz.Errorf(logger, .channelz, "Health checking failed: %v", )
}
}
}()
}
// Resets the addrConn's connect backoff under mu. It closes the current
// resetBackoff channel, which wakes a pending backoff wait, sets backoffIdx
// to 0, and installs a fresh channel for the next wait.
func ( *addrConn) () {
.mu.Lock()
close(.resetBackoff)
.backoffIdx = 0
.resetBackoff = make(chan struct{})
.mu.Unlock()
}
// Returns the current transport if the addrConn is in the Ready state, and
// nil otherwise. Reads under mu.
func ( *addrConn) () transport.ClientTransport {
.mu.Lock()
defer .mu.Unlock()
if .state == connectivity.Ready {
return .transport
}
return nil
}
// Shuts the addrConn down. A second call is a no-op because the state is
// already Shutdown.
//
// Under mu it detaches the transport, moves to Shutdown, cancels the context,
// clears curAddr, records a "Subchannel deleted" trace event and removes the
// channelz entry. After releasing mu it closes the detached transport, using
// GracefulClose when a comparison against errConnDrain holds and Close
// otherwise.
// NOTE(review): the left operand of that comparison is missing from this
// text; presumably it is the error argument — confirm.
func ( *addrConn) ( error) {
.mu.Lock()
if .state == connectivity.Shutdown {
.mu.Unlock()
return
}
:= .transport
.transport = nil
.updateConnectivityState(connectivity.Shutdown, nil)
.cancel()
.curAddr = resolver.Address{}
channelz.AddTraceEvent(logger, .channelz, 0, &channelz.TraceEvent{
Desc: "Subchannel deleted",
Severity: channelz.CtInfo,
Parent: &channelz.TraceEvent{
Desc: fmt.Sprintf("Subchannel(id:%d) deleted", .channelz.ID),
Severity: channelz.CtInfo,
},
})
channelz.RemoveEntry(.channelz.ID)
.mu.Unlock()
if != nil {
if == errConnDrain {
.GracefulClose()
} else {
.Close()
}
}
}
// retryThrottler is a token bucket used to throttle retries. Its two methods
// in this file are the only code here that changes tokens.
type retryThrottler struct {
// max is the upper bound for tokens.
max float64
// thresh is the level at or below which the throttle method reports true.
thresh float64
// ratio is the amount added to tokens on each successful RPC.
ratio float64
// mu is held by the methods in this file while they read or write tokens.
mu sync.Mutex
// tokens is the current bucket level, kept within [0, max] by the methods.
tokens float64
}
// Consumes one token (never going below 0) and reports whether the bucket is
// now at or below the threshold. A nil receiver is valid and always reports
// false.
func ( *retryThrottler) () bool {
if == nil {
return false
}
.mu.Lock()
defer .mu.Unlock()
.tokens--
if .tokens < 0 {
.tokens = 0
}
return .tokens <= .thresh
}
// Adds ratio to the token count, capped at max. A nil receiver is valid and
// makes the call a no-op.
func ( *retryThrottler) () {
if == nil {
return
}
.mu.Lock()
defer .mu.Unlock()
.tokens += .ratio
if .tokens > .max {
.tokens = .max
}
}
// Increments the subchannel's channelz CallsStarted counter and stores the
// current time (Unix nanoseconds) as the last-call-started timestamp.
func ( *addrConn) () {
.channelz.ChannelMetrics.CallsStarted.Add(1)
.channelz.ChannelMetrics.LastCallStartedTimestamp.Store(time.Now().UnixNano())
}
// Increments the subchannel's channelz CallsSucceeded counter.
func ( *addrConn) () {
.channelz.ChannelMetrics.CallsSucceeded.Add(1)
}
// Increments the subchannel's channelz CallsFailed counter.
func ( *addrConn) () {
.channelz.ChannelMetrics.CallsFailed.Add(1)
}
// ErrClientConnTimeout reports that dialing timed out. No code in this file
// returns it.
var ErrClientConnTimeout = errors.New("grpc: timed out when dialing")
// Finds a resolver builder for a scheme. Builders supplied through the dial
// options are checked first, matching on Scheme(); if none matches, the
// result of the global resolver.Get lookup is returned.
func ( *ClientConn) ( string) resolver.Builder {
for , := range .dopts.resolvers {
if == .Scheme() {
return
}
}
return resolver.Get()
}
// Stores the given error as the channel's last connection error, under lceMu.
func ( *ClientConn) ( error) {
.lceMu.Lock()
.lastConnectionError =
.lceMu.Unlock()
}
// Returns the channel's last connection error, read under lceMu.
func ( *ClientConn) () error {
.lceMu.Lock()
defer .lceMu.Unlock()
return .lastConnectionError
}
// Parses the channel's target and selects a resolver builder, storing both on
// the channel.
//
// The target is first parsed as given. If that succeeds and a resolver exists
// for its URL scheme, that pair is used. Otherwise the target is re-parsed as
// "<scheme>:///<target>", where scheme is dopts.defaultScheme, or
// resolver.GetDefaultScheme() when internal.UserSetDefaultScheme is set.
//
// Returns the parse error if the second parse fails, or a "could not get
// resolver for default scheme" error if no resolver exists for that scheme.
func ( *ClientConn) () error {
logger.Infof("original dial target is: %q", .target)
var resolver.Builder
, := parseTarget(.target)
if == nil {
= .getResolver(.URL.Scheme)
if != nil {
.parsedTarget =
.resolverBuilder =
return nil
}
}
// Fall back to the default scheme, either because the target did not parse
// or because no resolver is registered for its scheme.
:= .dopts.defaultScheme
if internal.UserSetDefaultScheme {
= resolver.GetDefaultScheme()
}
:= + ":///" + .target
, = parseTarget()
if != nil {
return
}
= .getResolver(.URL.Scheme)
if == nil {
return fmt.Errorf("could not get resolver for default scheme: %q", .URL.Scheme)
}
.parsedTarget =
.resolverBuilder =
return nil
}
// Parses a target string with url.Parse and wraps the resulting URL in a
// resolver.Target. On a parse error it returns the zero Target together with
// that error.
func ( string) (resolver.Target, error) {
, := url.Parse()
if != nil {
return resolver.Target{},
}
return resolver.Target{URL: *},  nil
}
// Percent-encodes a string byte by byte, producing "%XX" with uppercase hex
// digits for each byte that needs escaping.
//
// Bytes left unescaped: ASCII letters and digits, the characters - _ . ~,
// the characters ! $ & ' ( ) * + , ; =, and the characters : [ ] @. Every
// other byte is escaped.
//
// The input is returned unchanged when nothing needs escaping. Otherwise a
// first pass counts the bytes to escape so the output buffer can be allocated
// at its exact final size (input length plus two per escaped byte).
func ( string) string {
const = "0123456789ABCDEF"
// Reports whether a byte must be escaped.
:= func( byte) bool {
if 'a' <= && <= 'z' || 'A' <= && <= 'Z' || '0' <= && <= '9' {
return false
}
switch {
case '-', '_', '.', '~':
return false
case '!', '$', '&', '\'', '(', ')', '*', '+', ',', ';', '=':
return false
case ':', '[', ']', '@':
return false
}
return true
}
// Pass 1: count the bytes that need escaping.
:= 0
for := 0; < len(); ++ {
:= []
if () {
++
}
}
if == 0 {
return
}
// Pass 2: write the output, three bytes per escaped input byte.
:= len() + 2*
:= make([]byte, )
:= 0
for := 0; < len(); ++ {
switch := []; {
case ():
[] = '%'
[+1] = [>>4]
[+2] = [&15]
+= 3
default:
[] = []
++
}
}
return string()
}
// Determines the channel's authority string and stores it in the authority
// field.
//
// Two candidate overrides are read: the ServerName reported by the transport
// credentials (when credentials are set and the name is non-empty) and the
// authority dial option. If both are non-empty and differ, an error is
// returned. Otherwise the first applicable rule wins: an override; the
// resolver builder's OverrideAuthority, when the builder implements
// resolver.AuthorityOverrider; "localhost" plus the encoded endpoint, when
// the endpoint starts with ":"; or the encoded endpoint.
// NOTE(review): the operands of the first two branches are missing from this
// text, so which override takes precedence cannot be read from here —
// confirm.
func ( *ClientConn) () error {
:= .dopts
:= ""
if := .copts.TransportCredentials; != nil && .Info().ServerName != "" {
= .Info().ServerName
}
:= .authority
if ( != "" && != "") && != {
return fmt.Errorf("ClientConn's authority from transport creds %q and dial option %q don't match", , )
}
:= .parsedTarget.Endpoint()
if != "" {
.authority =
} else if != "" {
.authority =
} else if , := .resolverBuilder.(resolver.AuthorityOverrider); {
.authority = .OverrideAuthority(.parsedTarget)
} else if strings.HasPrefix(, ":") {
.authority = "localhost" + encodeAuthority()
} else {
.authority = encodeAuthority()
}
return nil
}