package grpc
import (
iresolver
_
_
_
_
)
// minConnectTimeout is the fallback connect timeout; it is read when the dial
// options do not supply their own minConnectTimeout function.
const minConnectTimeout = time.Second * 20

// grpclbName is the balancer name selected when one of the resolved addresses
// has type resolver.GRPCLB and the service config names no LB policy.
const grpclbName = "grpclb"
var (
// ErrClientConnClosing is a status error with code Canceled. In this file it
// is returned when an operation finds the ClientConn already closed
// (conns == nil or its context done) and is passed to tearDown for every
// remaining addrConn on Close.
ErrClientConnClosing = status.Error(codes.Canceled, "grpc: the client connection is closing")
// errConnDrain is compared against during addrConn teardown: when it matches
// and a transport exists, the transport is closed via GracefulClose.
errConnDrain = errors.New("grpc: the connection is drained")
// errConnClosing is returned when an addrConn is found in the Shutdown state.
errConnClosing = errors.New("grpc: the connection is closing")
// invalidDefaultServiceConfigErrPrefix prefixes the error returned when the
// default service config JSON from the dial options fails to parse.
invalidDefaultServiceConfigErrPrefix = "grpc: the provided default service config is invalid"
)
// Errors returned by the credential validation performed while dialing.
var (
// errNoTransportSecurity: neither TransportCredentials nor CredsBundle is set.
errNoTransportSecurity = errors.New("grpc: no transport security set (use grpc.WithTransportCredentials(insecure.NewCredentials()) explicitly or set credentials)")
// errTransportCredsAndBundle: both TransportCredentials and CredsBundle are set.
errTransportCredsAndBundle = errors.New("grpc: credentials.Bundle may not be used with individual TransportCredentials")
// errNoTransportCredsInBundle: the bundle's TransportCredentials() is nil.
errNoTransportCredsInBundle = errors.New("grpc: credentials.Bundle must return non-nil transport credentials")
// errTransportCredentialsMissing: the security protocol is "insecure" but a
// per-RPC credential reports RequireTransportSecurity().
errTransportCredentialsMissing = errors.New("grpc: the credentials require transport level security (use grpc.WithTransportCredentials() to set)")
)
// Default message-size and I/O buffer limits. None of them is referenced in
// the visible part of this file.
const (
	// 4 MiB.
	defaultClientMaxReceiveMessageSize = 4 << 20
	// Largest int32 value.
	defaultClientMaxSendMessageSize = math.MaxInt32
	// 32 KiB.
	defaultWriteBufSize = 32 << 10
	// 32 KiB.
	defaultReadBufSize = 32 << 10
)
// NOTE(review): the function name and parameter names are missing in this copy
// of the file. The visible body forwards to DialContext, passing
// context.Background() as the context.
func ( string, ...DialOption) (*ClientConn, error) {
return DialContext(context.Background(), , ...)
}
// defaultConfigSelector wraps a *ServiceConfig. In this file, instances are
// handed to safeConfigSelector.UpdateConfigSelector and to
// applyServiceConfigAndBalancer as the config selector.
type defaultConfigSelector struct {
// sc is the service config handed to getMethodConfig; it may be nil.
sc *ServiceConfig
}
// NOTE(review): method name, receiver and parameter names are missing in this
// copy of the file; described from the visible code only.
//
// Builds an iresolver.RPCConfig whose Context is copied from a .Context field
// (presumably of the RPCInfo argument) and whose MethodConfig comes from
// getMethodConfig applied to an sc field and a Method field. The returned
// error is always nil.
func ( *defaultConfigSelector) ( iresolver.RPCInfo) (*iresolver.RPCConfig, error) {
return &iresolver.RPCConfig{
Context: .Context,
MethodConfig: getMethodConfig(.sc, .Method),
}, nil
}
// NOTE(review): the function name and nearly all identifiers are missing in
// this copy of the file. The signature matches the DialContext call made
// earlier in the file — confirm. Comments describe only what the remaining
// code shows.
//
// Creates a ClientConn for a target string: applies dial options, registers
// with channelz, validates credentials, parses the default service config,
// resolves the target and authority, builds the balancer and resolver
// wrappers, and — when dopts.block is set — waits for the Ready state.
// Returns (nil, error) on every failure path shown.
func ( context.Context, string, ...DialOption) ( *ClientConn, error) {
// Construct the ClientConn with default dial options and an empty conns set.
:= &ClientConn{
target: ,
csMgr: &connectivityStateManager{},
conns: make(map[*addrConn]struct{}),
dopts: defaultDialOptions(),
blockingpicker: newPickerWrapper(),
czData: new(channelzData),
firstResolveEvent: grpcsync.NewEvent(),
}
// Start with a nil retry throttler, a config selector holding no service
// config, and a cancellable context derived from context.Background().
.retryThrottler.Store((*retryThrottler)(nil))
.safeConfigSelector.UpdateConfigSelector(&defaultConfigSelector{nil})
.ctx, .cancel = context.WithCancel(context.Background())
// extraDialOptions are applied first, then a second range (presumably the
// caller-supplied options — the range operand is missing).
for , := range extraDialOptions {
.apply(&.dopts)
}
for , := range {
.apply(&.dopts)
}
chainUnaryClientInterceptors()
chainStreamClientInterceptors()
// Deferred cleanup: calls Close when the checked value (presumably the
// named error result) is non-nil at return.
defer func() {
if != nil {
.Close()
}
}()
// Register the channel with channelz and record a creation trace event; a
// parent event is attached when a channelz parent ID was configured.
:= .dopts.channelzParentID
.channelzID = channelz.RegisterChannel(&channelzChannel{}, , )
:= &channelz.TraceEventDesc{
Desc: "Channel created",
Severity: channelz.CtInfo,
}
if .dopts.channelzParentID != nil {
.Parent = &channelz.TraceEventDesc{
Desc: fmt.Sprintf("Nested Channel(id:%d) created", .channelzID.Int()),
Severity: channelz.CtInfo,
}
}
channelz.AddTraceEvent(logger, .channelzID, 1, )
.csMgr.channelzID = .channelzID
// Credential validation: exactly one of TransportCredentials / CredsBundle
// must be set, and a bundle must yield non-nil transport credentials.
if .dopts.copts.TransportCredentials == nil && .dopts.copts.CredsBundle == nil {
return nil, errNoTransportSecurity
}
if .dopts.copts.TransportCredentials != nil && .dopts.copts.CredsBundle != nil {
return nil, errTransportCredsAndBundle
}
if .dopts.copts.CredsBundle != nil && .dopts.copts.CredsBundle.TransportCredentials() == nil {
return nil, errNoTransportCredsInBundle
}
:= .dopts.copts.TransportCredentials
if == nil {
= .dopts.copts.CredsBundle.TransportCredentials()
}
// With the "insecure" security protocol, reject any per-RPC credential that
// requires transport security.
if .Info().SecurityProtocol == "insecure" {
for , := range .dopts.copts.PerRPCCredentials {
if .RequireTransportSecurity() {
return nil, errTransportCredentialsMissing
}
}
}
// Parse the default service config JSON, if provided. The type assertion
// result is discarded, so a Config of another type leaves
// defaultServiceConfig nil.
if .dopts.defaultServiceConfigRawJSON != nil {
:= parseServiceConfig(*.dopts.defaultServiceConfigRawJSON)
if .Err != nil {
return nil, fmt.Errorf("%s: %v", invalidDefaultServiceConfigErrPrefix, .Err)
}
.dopts.defaultServiceConfig, _ = .Config.(*ServiceConfig)
}
.mkp = .dopts.copts.KeepaliveParams
// The user agent is the caller's value followed by grpcUA, or grpcUA alone.
if .dopts.copts.UserAgent != "" {
.dopts.copts.UserAgent += " " + grpcUA
} else {
.dopts.copts.UserAgent = grpcUA
}
// A positive dopts.timeout wraps the context with context.WithTimeout.
if .dopts.timeout > 0 {
var context.CancelFunc
, = context.WithTimeout(, .dopts.timeout)
defer ()
}
// Deferred result rewrite: only acts when a context (receiver missing) is
// already done at return time. In that case the results are replaced
// according to the switch below; the default arm combines the context error
// with another value via "%v: %v".
defer func() {
select {
case <-.Done():
switch {
case .Err() == :
= nil
case == nil || !.dopts.returnLastError:
, = nil, .Err()
default:
, = nil, fmt.Errorf("%v: %v", .Err(), )
}
default:
}
}()
// Non-blocking attempt to take an initial service config from dopts.scChan.
:= false
if .dopts.scChan != nil {
select {
case , := <-.dopts.scChan:
if {
.sc = &
.safeConfigSelector.UpdateConfigSelector(&defaultConfigSelector{&})
= true
}
default:
}
}
// Default backoff strategy when none was configured.
if .dopts.bs == nil {
.dopts.bs = backoff.DefaultExponential
}
, := .parseTargetAndFindResolver()
if != nil {
return nil,
}
.authority, = determineAuthority(.parsedTarget.Endpoint(), .target, .dopts)
if != nil {
return nil,
}
channelz.Infof(logger, .channelzID, "Channel authority set to %q", .authority)
// If scChan is set and the non-blocking receive above did not deliver a
// config, block for one here or until a context is done.
if .dopts.scChan != nil && ! {
select {
case , := <-.dopts.scChan:
if {
.sc = &
.safeConfigSelector.UpdateConfigSelector(&defaultConfigSelector{&})
}
case <-.Done():
return nil, .Err()
}
}
// Keep consuming later service configs in a background goroutine.
if .dopts.scChan != nil {
go .scWatcher()
}
// The balancer receives a clone of the transport credentials (when set).
var credentials.TransportCredentials
if := .dopts.copts.TransportCredentials; != nil {
= .Clone()
}
.balancerWrapper = newCCBalancerWrapper(, balancer.BuildOptions{
DialCreds: ,
CredsBundle: .dopts.copts.CredsBundle,
Dialer: .dopts.copts.Dialer,
Authority: .authority,
CustomUserAgent: .dopts.copts.UserAgent,
ChannelzParentID: .channelzID,
Target: .parsedTarget,
})
, := newCCResolverWrapper(, )
if != nil {
return nil, fmt.Errorf("failed to build resolver: %v", )
}
.mu.Lock()
.resolverWrapper =
.mu.Unlock()
// Blocking dial: loop calling Connect until the state is Ready. With
// FailOnNonTempDialError, a TransientFailure whose connection error
// implements a bool-returning method that reports false ends the dial. A
// false result from WaitForStateChange also ends the dial, returning the
// last connection error when returnLastError is set, else the context error.
if .dopts.block {
for {
.Connect()
:= .GetState()
if == connectivity.Ready {
break
} else if .dopts.copts.FailOnNonTempDialError && == connectivity.TransientFailure {
if = .connectionError(); != nil {
, := .(interface {
() bool
})
if && !.() {
return nil,
}
}
}
if !.WaitForStateChange(, ) {
if = .connectionError(); != nil && .dopts.returnLastError {
return nil,
}
return nil, .Err()
}
}
}
return , nil
}
// NOTE(review): function name and identifiers are missing in this copy of the
// file; the dial function calls chainUnaryClientInterceptors, which this
// appears to be.
//
// Collapses dopts.unaryInt and dopts.chainUnaryInts into one interceptor that
// is stored back in dopts.unaryInt. A non-nil dopts.unaryInt is placed first.
// Zero interceptors yields nil, exactly one is used directly, and more than
// one are wrapped in a closure that calls element 0 with an invoker obtained
// from getChainUnaryInvoker.
func ( *ClientConn) {
:= .dopts.chainUnaryInts
if .dopts.unaryInt != nil {
= append([]UnaryClientInterceptor{.dopts.unaryInt}, ...)
}
var UnaryClientInterceptor
if len() == 0 {
= nil
} else if len() == 1 {
= [0]
} else {
= func( context.Context, string, , interface{}, *ClientConn, UnaryInvoker, ...CallOption) error {
return [0](, , , , , getChainUnaryInvoker(, 0, ), ...)
}
}
.dopts.unaryInt =
}
// NOTE(review): function name and identifiers are missing in this copy of the
// file; described from the visible code only.
//
// Takes a slice of unary interceptors, an int index and a UnaryInvoker. When
// the index equals len-1 it returns a value directly (presumably the final
// invoker). Otherwise it returns an invoker that calls the interceptor at
// index+1, handing it a recursively built invoker for index+1.
func ( []UnaryClientInterceptor, int, UnaryInvoker) UnaryInvoker {
if == len()-1 {
return
}
return func( context.Context, string, , interface{}, *ClientConn, ...CallOption) error {
return [+1](, , , , , (, +1, ), ...)
}
}
// NOTE(review): function name and identifiers are missing in this copy of the
// file; the dial function calls chainStreamClientInterceptors, which this
// appears to be.
//
// Streaming counterpart of the unary chaining: merges dopts.streamInt (first,
// if non-nil) with dopts.chainStreamInts and stores a single interceptor back
// in dopts.streamInt — nil for none, the sole element for one, otherwise a
// closure that calls element 0 with a streamer from getChainStreamer.
func ( *ClientConn) {
:= .dopts.chainStreamInts
if .dopts.streamInt != nil {
= append([]StreamClientInterceptor{.dopts.streamInt}, ...)
}
var StreamClientInterceptor
if len() == 0 {
= nil
} else if len() == 1 {
= [0]
} else {
= func( context.Context, *StreamDesc, *ClientConn, string, Streamer, ...CallOption) (ClientStream, error) {
return [0](, , , , getChainStreamer(, 0, ), ...)
}
}
.dopts.streamInt =
}
// NOTE(review): function name and identifiers are missing in this copy of the
// file; described from the visible code only.
//
// Takes a slice of stream interceptors, an int index and a Streamer. When the
// index equals len-1 it returns a value directly (presumably the final
// streamer). Otherwise it returns a Streamer that calls the interceptor at
// index+1 with a recursively built streamer for index+1.
func ( []StreamClientInterceptor, int, Streamer) Streamer {
if == len()-1 {
return
}
return func( context.Context, *StreamDesc, *ClientConn, string, ...CallOption) (ClientStream, error) {
return [+1](, , , , (, +1, ), ...)
}
}
// connectivityStateManager holds a channel-level connectivity state together
// with a channel that is closed to signal a state change.
type connectivityStateManager struct {
// mu is held by the methods in this file while they read or write state and
// notifyChan.
mu sync.Mutex
// state is the current connectivity state.
state connectivity.State
// notifyChan is created lazily and is closed, then reset to nil, when the
// state changes.
notifyChan chan struct{}
// channelzID identifies the channel in channelz log calls.
channelzID *channelz.Identifier
}
// NOTE(review): method name, receiver and parameter names are missing in this
// copy of the file; other code in this file calls csMgr.updateState(state),
// which this appears to be.
//
// With mu held: returns without change if the stored state is Shutdown or
// already equals the compared value. Otherwise it stores the new state, logs
// the change through channelz, and closes and clears notifyChan (if one
// exists) so that receivers blocked on it are released.
func ( *connectivityStateManager) ( connectivity.State) {
.mu.Lock()
defer .mu.Unlock()
if .state == connectivity.Shutdown {
return
}
if .state == {
return
}
.state =
channelz.Infof(logger, .channelzID, "Channel Connectivity change to %v", )
if .notifyChan != nil {
close(.notifyChan)
.notifyChan = nil
}
}
// NOTE(review): method and receiver names are missing in this copy of the
// file. Returns the state field while holding mu.
func ( *connectivityStateManager) () connectivity.State {
.mu.Lock()
defer .mu.Unlock()
return .state
}
// NOTE(review): method and receiver names are missing in this copy of the
// file. With mu held, lazily creates notifyChan if it is nil and returns it as
// a receive-only channel.
func ( *connectivityStateManager) () <-chan struct{} {
.mu.Lock()
defer .mu.Unlock()
if .notifyChan == nil {
.notifyChan = make(chan struct{})
}
return .notifyChan
}
// ClientConnInterface declares the two RPC entry points, Invoke and NewStream.
// *ClientConn is asserted to implement it immediately below.
type ClientConnInterface interface {
// Invoke takes a context, a method string, args and reply values, and call
// options, and returns an error.
Invoke(ctx context.Context, method string, args interface{}, reply interface{}, opts ...CallOption) error
// NewStream takes a context, a stream description, a method string and call
// options, and returns a ClientStream or an error.
NewStream(ctx context.Context, desc *StreamDesc, method string, opts ...CallOption) (ClientStream, error)
}
// Compile-time assertion that *ClientConn implements ClientConnInterface.
var _ ClientConnInterface = (*ClientConn)(nil)
// ClientConn is the client-side connection object built by the dial function
// in this file. It owns the resolver and balancer wrappers, the set of
// addrConns, and the channel-level connectivity state.
type ClientConn struct {
// ctx and cancel come from context.WithCancel(context.Background()) at
// construction; cancel is deferred in the close path.
ctx context.Context
cancel context.CancelFunc
// target is the dial target string as given; parsedTarget is the
// resolver.Target produced while locating a resolver builder.
target string
parsedTarget resolver.Target
// authority is the value computed by determineAuthority during dialing.
authority string
// dopts holds the effective dial options.
dopts dialOptions
// channelzID is the identifier returned by channelz.RegisterChannel.
channelzID *channelz.Identifier
balancerWrapper *ccBalancerWrapper
csMgr *connectivityStateManager
// blockingpicker is used to pick a transport for an RPC.
blockingpicker *pickerWrapper
safeConfigSelector iresolver.SafeConfigSelector
// czData holds the call counters that are updated atomically.
czData *channelzData
// retryThrottler stores a *retryThrottler (possibly a typed nil).
retryThrottler atomic.Value
// firstResolveEvent is fired when a resolver state update is processed.
firstResolveEvent *grpcsync.Event
// mu is held in this file around accesses to resolverWrapper, sc, conns
// and mkp.
mu sync.RWMutex
resolverWrapper *ccResolverWrapper
// sc is the current service config; nil until one is applied.
sc *ServiceConfig
// conns is the set of live addrConns; it is set to nil once closed.
conns map[*addrConn]struct{}
// mkp holds the keepalive parameters copied into each connection attempt.
mkp keepalive.ClientParameters
// lceMu guards lastConnectionError.
lceMu sync.Mutex
lastConnectionError error
}
// NOTE(review): method name, receiver and parameter names are missing in this
// copy of the file; the blocking dial loop calls WaitForStateChange, which
// this appears to be.
//
// Obtains the notify channel first, then returns true at once if the current
// state already differs from the compared state. Otherwise it blocks until a
// context is done (returns false) or the notify channel is closed (returns
// true). Fetching the channel before reading the state means a change that
// lands in between is still observed.
func ( *ClientConn) ( context.Context, connectivity.State) bool {
:= .csMgr.getNotifyChan()
if .csMgr.getState() != {
return true
}
select {
case <-.Done():
return false
case <-:
return true
}
}
// NOTE(review): method and receiver names are missing in this copy of the
// file. Returns the state reported by csMgr.getState().
func ( *ClientConn) () connectivity.State {
return .csMgr.getState()
}
// NOTE(review): method and receiver names are missing in this copy of the
// file. Calls exitIdle on the balancer wrapper.
func ( *ClientConn) () {
.balancerWrapper.exitIdle()
}
// NOTE(review): method name and identifiers are missing in this copy of the
// file; the dial function starts scWatcher in a goroutine, which this appears
// to be.
//
// Loops receiving from dopts.scChan. Each received value is stored as sc and
// wrapped in a defaultConfigSelector passed to safeConfigSelector, both with
// mu held. The loop ends when the channel is closed or ctx is done.
func ( *ClientConn) () {
for {
select {
case , := <-.dopts.scChan:
if ! {
return
}
.mu.Lock()
.sc = &
.safeConfigSelector.UpdateConfigSelector(&defaultConfigSelector{&})
.mu.Unlock()
case <-.ctx.Done():
return
}
}
}
// NOTE(review): method name, receiver and parameter names are missing in this
// copy of the file; described from the visible code only.
//
// Returns nil immediately if firstResolveEvent has fired. Otherwise blocks
// until one of: the event fires (nil), a context is done (its error converted
// with status.FromContextError), or ctx is done (ErrClientConnClosing).
func ( *ClientConn) ( context.Context) error {
if .firstResolveEvent.HasFired() {
return nil
}
select {
case <-.firstResolveEvent.Done():
return nil
case <-.Done():
return status.FromContextError(.Err()).Err()
case <-.ctx.Done():
return ErrClientConnClosing
}
}
// emptyServiceConfig is assigned the result of parsing "{}" by the function
// that follows, and is the fallback applied when neither a resolver-provided
// nor a default service config is available.
var emptyServiceConfig *ServiceConfig
// NOTE(review): the function name is missing in this copy of the file
// (presumably init — confirm).
//
// Parses the literal "{}" with parseServiceConfig and stores the result in
// emptyServiceConfig. Panics if parsing reports an error; the type assertion
// to *ServiceConfig is unchecked and would also panic on mismatch.
func () {
:= parseServiceConfig("{}")
if .Err != nil {
panic(fmt.Sprintf("impossible error parsing empty service config: %v", .Err))
}
emptyServiceConfig = .Config.(*ServiceConfig)
}
// NOTE(review): method name and identifiers are missing in this copy of the
// file; the resolver-update path calls maybeApplyDefaultServiceConfig, which
// this appears to be.
//
// If sc is already set, re-applies it with a nil config selector. Otherwise
// applies dopts.defaultServiceConfig when present, else emptyServiceConfig,
// each wrapped in a defaultConfigSelector.
func ( *ClientConn) ( []resolver.Address) {
if .sc != nil {
.applyServiceConfigAndBalancer(.sc, nil, )
return
}
if .dopts.defaultServiceConfig != nil {
.applyServiceConfigAndBalancer(.dopts.defaultServiceConfig, &defaultConfigSelector{.dopts.defaultServiceConfig}, )
} else {
.applyServiceConfigAndBalancer(emptyServiceConfig, &defaultConfigSelector{emptyServiceConfig}, )
}
}
// NOTE(review): method name and identifiers are missing in this copy of the
// file; described from the visible code only.
//
// Handles a resolver.State plus an error. It chooses which service config to
// apply and forwards the state to the balancer wrapper. firstResolveEvent is
// fired on every return path.
func ( *ClientConn) ( resolver.State, error) error {
defer .firstResolveEvent.Fire()
.mu.Lock()
// conns == nil means the ClientConn has been closed; nothing to do.
if .conns == nil {
.mu.Unlock()
return nil
}
// A non-nil error argument: apply the default service config with no
// addresses, notify the balancer wrapper, and report ErrBadResolverState.
if != nil {
.maybeApplyDefaultServiceConfig(nil)
.balancerWrapper.resolverError()
.mu.Unlock()
return balancer.ErrBadResolverState
}
var error
if .dopts.disableServiceConfig {
channelz.Infof(logger, .channelzID, "ignoring service config from resolver (%v) and applying the default because service config is disabled", .ServiceConfig)
.maybeApplyDefaultServiceConfig(.Addresses)
} else if .ServiceConfig == nil {
.maybeApplyDefaultServiceConfig(.Addresses)
} else {
// The resolver supplied a service config: accept it only if it carries no
// error and its Config is a *ServiceConfig.
if , := .ServiceConfig.Config.(*ServiceConfig); .ServiceConfig.Err == nil && {
:= iresolver.GetConfigSelector()
if != nil {
if len(.ServiceConfig.Config.(*ServiceConfig).Methods) != 0 {
channelz.Infof(logger, .channelzID, "method configs in service config will be ignored due to presence of config selector")
}
} else {
= &defaultConfigSelector{}
}
.applyServiceConfigAndBalancer(, , .Addresses)
} else {
// Invalid service config: record ErrBadResolverState. With no previous
// config to fall back on (sc == nil), switch to a failing picker and return.
= balancer.ErrBadResolverState
if .sc == nil {
.applyFailingLB(.ServiceConfig)
.mu.Unlock()
return
}
}
}
var serviceconfig.LoadBalancingConfig
if .sc != nil && .sc.lbConfig != nil {
= .sc.lbConfig.cfg
}
// The balancer wrapper is read under mu, but updateClientConnState is called
// after mu is released.
:= .balancerWrapper
.mu.Unlock()
:= .updateClientConnState(&balancer.ClientConnState{ResolverState: , BalancerConfig: })
if == nil {
=
}
return
}
// NOTE(review): method name and identifiers are missing in this copy of the
// file; the resolver-update path calls applyFailingLB, which this appears to
// be.
//
// Builds a codes.Unavailable status error from the parse result — its Err if
// set, otherwise a message naming the unexpected Config type. It then resets
// the config selector to one with no service config, installs an error picker,
// and moves the channel state to TransientFailure.
func ( *ClientConn) ( *serviceconfig.ParseResult) {
var error
if .Err != nil {
= status.Errorf(codes.Unavailable, "error parsing service config: %v", .Err)
} else {
= status.Errorf(codes.Unavailable, "illegal service config type: %T", .Config)
}
.safeConfigSelector.UpdateConfigSelector(&defaultConfigSelector{nil})
.blockingpicker.updatePicker(base.NewErrPicker())
.csMgr.updateState(connectivity.TransientFailure)
}
// NOTE(review): method name and identifiers are missing in this copy of the
// file. Forwards a SubConn, a connectivity state and an error to
// balancerWrapper.updateSubConnState.
func ( *ClientConn) ( balancer.SubConn, connectivity.State, error) {
.balancerWrapper.updateSubConnState(, , )
}
// NOTE(review): method name and identifiers are missing in this copy of the
// file; described from the visible code only.
//
// Creates an addrConn in the Idle state for the given addresses and SubConn
// options, with a context derived from ctx. With mu held, it returns
// ErrClientConnClosing if conns is nil (the ClientConn is closed). Otherwise
// it registers the subchannel with channelz, records a creation trace event,
// and adds the addrConn to conns.
func ( *ClientConn) ( []resolver.Address, balancer.NewSubConnOptions) (*addrConn, error) {
:= &addrConn{
state: connectivity.Idle,
cc: ,
addrs: ,
scopts: ,
dopts: .dopts,
czData: new(channelzData),
resetBackoff: make(chan struct{}),
}
.ctx, .cancel = context.WithCancel(.ctx)
.mu.Lock()
defer .mu.Unlock()
if .conns == nil {
return nil, ErrClientConnClosing
}
var error
.channelzID, = channelz.RegisterSubChannel(, .channelzID, "")
if != nil {
return nil,
}
channelz.AddTraceEvent(logger, .channelzID, 0, &channelz.TraceEventDesc{
Desc: "Subchannel created",
Severity: channelz.CtInfo,
Parent: &channelz.TraceEventDesc{
Desc: fmt.Sprintf("Subchannel(id:%d) created", .channelzID.Int()),
Severity: channelz.CtInfo,
},
})
.conns[] = struct{}{}
return , nil
}
// NOTE(review): method name and identifiers are missing in this copy of the
// file; described from the visible code only.
//
// Removes an addrConn from conns under mu and then, with mu released, calls
// tearDown on it. Does nothing when conns is nil (already closed).
func ( *ClientConn) ( *addrConn, error) {
.mu.Lock()
if .conns == nil {
.mu.Unlock()
return
}
delete(.conns, )
.mu.Unlock()
.tearDown()
}
// NOTE(review): method and receiver names are missing in this copy of the
// file; channelzChannel calls cc.channelzMetric(), which this appears to be.
//
// Returns a snapshot for channelz: the current state, the target, and the
// call counters read atomically from czData. The last-call-started time is
// stored as Unix nanoseconds and converted with time.Unix(0, ns).
func ( *ClientConn) () *channelz.ChannelInternalMetric {
return &channelz.ChannelInternalMetric{
State: .GetState(),
Target: .target,
CallsStarted: atomic.LoadInt64(&.czData.callsStarted),
CallsSucceeded: atomic.LoadInt64(&.czData.callsSucceeded),
CallsFailed: atomic.LoadInt64(&.czData.callsFailed),
LastCallStartedTimestamp: time.Unix(0, atomic.LoadInt64(&.czData.lastCallStartedTime)),
}
}
// NOTE(review): method and receiver names are missing in this copy of the
// file. Returns the target field unchanged.
func ( *ClientConn) () string {
return .target
}
// NOTE(review): method and receiver names are missing in this copy of the
// file. Atomically increments czData.callsStarted and stores the current time
// (Unix nanoseconds) in czData.lastCallStartedTime.
func ( *ClientConn) () {
atomic.AddInt64(&.czData.callsStarted, 1)
atomic.StoreInt64(&.czData.lastCallStartedTime, time.Now().UnixNano())
}
// NOTE(review): method and receiver names are missing in this copy of the
// file. Atomically increments czData.callsSucceeded.
func ( *ClientConn) () {
atomic.AddInt64(&.czData.callsSucceeded, 1)
}
// NOTE(review): method and receiver names are missing in this copy of the
// file. Atomically increments czData.callsFailed.
func ( *ClientConn) () {
atomic.AddInt64(&.czData.callsFailed, 1)
}
// NOTE(review): method and receiver names are missing in this copy of the
// file; described from the visible code only.
//
// Starts a connection attempt only from the Idle state. Returns
// errConnClosing when the addrConn is in Shutdown, and nil without acting in
// any other non-Idle state. From Idle it moves to Connecting under mu, then
// calls resetTransport with mu released and returns nil.
func ( *addrConn) () error {
.mu.Lock()
if .state == connectivity.Shutdown {
if logger.V(2) {
logger.Infof("connect called on shutdown addrConn; ignoring.")
}
.mu.Unlock()
return errConnClosing
}
if .state != connectivity.Idle {
if logger.V(2) {
logger.Infof("connect called on addrConn in non-idle state (%v); ignoring.", .state)
}
.mu.Unlock()
return nil
}
.updateConnectivityState(connectivity.Connecting, nil)
.mu.Unlock()
.resetTransport()
return nil
}
// NOTE(review): function and parameter names are missing in this copy of the
// file; the address-update path calls equalAddresses, which this appears to
// be.
//
// Reports whether two address slices have the same length and are pairwise
// Equal at each index, so order matters.
func (, []resolver.Address) bool {
if len() != len() {
return false
}
for , := range {
if !.Equal([]) {
return false
}
}
return true
}
// NOTE(review): method name and identifiers are missing in this copy of the
// file; the name used in the log messages is tryUpdateAddrs.
//
// Attempts to replace the addrConn's address list, with mu held throughout.
// The return value is:
//   - true, storing the new list, in Shutdown, TransientFailure or Idle;
//   - true, without storing, if the new list equals the current one;
//   - false while Connecting;
//   - otherwise true only if curAddr appears in the new list (compared with
//     reflect.DeepEqual after a ServerName field is set from
//     cc.getServerName), in which case the new list is stored.
func ( *addrConn) ( []resolver.Address) bool {
.mu.Lock()
defer .mu.Unlock()
channelz.Infof(logger, .channelzID, "addrConn: tryUpdateAddrs curAddr: %v, addrs: %v", .curAddr, )
if .state == connectivity.Shutdown ||
.state == connectivity.TransientFailure ||
.state == connectivity.Idle {
.addrs =
return true
}
if equalAddresses(.addrs, ) {
return true
}
if .state == connectivity.Connecting {
return false
}
var bool
for , := range {
.ServerName = .cc.getServerName()
if reflect.DeepEqual(.curAddr, ) {
= true
break
}
}
channelz.Infof(logger, .channelzID, "addrConn: tryUpdateAddrs curAddrFound: %v", )
if {
.addrs =
}
return
}
// NOTE(review): method name and identifiers are missing in this copy of the
// file; addrConn code calls cc.getServerName, which this appears to be.
//
// Picks a server name in priority order: dopts.authority if non-empty, then a
// non-empty ServerName field (presumably of the address argument), then the
// authority field.
func ( *ClientConn) ( resolver.Address) string {
if .dopts.authority != "" {
return .dopts.authority
}
if .ServerName != "" {
return .ServerName
}
return .authority
}
// NOTE(review): function and parameter names are missing in this copy of the
// file; callers in this file use the name getMethodConfig.
//
// Looks up a MethodConfig in a *ServiceConfig's Methods map. A nil service
// config yields the zero MethodConfig. Lookup order: the exact method string,
// then the string truncated just after its last "/", then the "" key (whose
// map value is returned whether or not the key exists).
func ( *ServiceConfig, string) MethodConfig {
if == nil {
return MethodConfig{}
}
if , := .Methods[]; {
return
}
:= strings.LastIndex(, "/")
if , := .Methods[[:+1]]; {
return
}
return .Methods[""]
}
// NOTE(review): method name and identifiers are missing in this copy of the
// file. Returns getMethodConfig for the current sc and the given method
// string, holding mu for reading.
func ( *ClientConn) ( string) MethodConfig {
.mu.RLock()
defer .mu.RUnlock()
return getMethodConfig(.sc, )
}
// NOTE(review): method and receiver names are missing in this copy of the
// file; addrConn code calls cc.healthCheckConfig(), which this appears to be.
//
// Returns sc.healthCheckConfig while holding mu for reading, or nil when no
// service config is set.
func ( *ClientConn) () *healthCheckConfig {
.mu.RLock()
defer .mu.RUnlock()
if .sc == nil {
return nil
}
return .sc.healthCheckConfig
}
// NOTE(review): method name and identifiers are missing in this copy of the
// file. Delegates to blockingpicker.pick, passing a context, a bool, and a
// balancer.PickInfo carrying the context and the full method name.
func ( *ClientConn) ( context.Context, bool, string) (transport.ClientTransport, balancer.PickResult, error) {
return .blockingpicker.pick(, , balancer.PickInfo{
Ctx: ,
FullMethodName: ,
})
}
// NOTE(review): method name and identifiers are missing in this copy of the
// file; callers in this file use the name applyServiceConfigAndBalancer.
//
// Installs a service config, an optional config selector, the retry
// throttler, and the balancer selected for the config and addresses. Callers
// in this file hold mu while calling it.
func ( *ClientConn) ( *ServiceConfig, iresolver.ConfigSelector, []resolver.Address) {
// A nil service config is ignored entirely.
if == nil {
return
}
.sc =
if != nil {
.safeConfigSelector.UpdateConfigSelector()
}
// Build a retry throttler from the config's retryThrottling settings
// (threshold is half of MaxTokens), or store a typed nil when absent.
if .sc.retryThrottling != nil {
:= &retryThrottler{
tokens: .sc.retryThrottling.MaxTokens,
max: .sc.retryThrottling.MaxTokens,
thresh: .sc.retryThrottling.MaxTokens / 2,
ratio: .sc.retryThrottling.TokenRatio,
}
.retryThrottler.Store()
} else {
.retryThrottler.Store((*retryThrottler)(nil))
}
// Balancer name priority: sc.lbConfig.name; else grpclb if any address has
// type resolver.GRPCLB; else sc.LB; else PickFirstBalancerName.
var string
if .sc != nil && .sc.lbConfig != nil {
= .sc.lbConfig.name
} else {
var bool
for , := range {
if .Type == resolver.GRPCLB {
= true
break
}
}
if {
= grpclbName
} else if .sc != nil && .sc.LB != nil {
= *.sc.LB
} else {
= PickFirstBalancerName
}
}
.balancerWrapper.switchTo()
}
// NOTE(review): method name and identifiers are missing in this copy of the
// file; addrConn code calls cc.resolveNow, which this appears to be.
//
// Reads resolverWrapper under a read lock and, if it is non-nil, calls its
// resolveNow in a new goroutine so the caller is not blocked.
func ( *ClientConn) ( resolver.ResolveNowOptions) {
.mu.RLock()
:= .resolverWrapper
.mu.RUnlock()
if == nil {
return
}
go .resolveNow()
}
// NOTE(review): method name and identifiers are missing in this copy of the
// file. Reads conns under mu, then calls resetConnectBackoff on each addrConn
// in it.
// NOTE(review): the range over the map runs after mu is released — confirm
// that concurrent modification of conns is not possible here.
func ( *ClientConn) () {
.mu.Lock()
:= .conns
.mu.Unlock()
for := range {
.resetConnectBackoff()
}
}
// NOTE(review): method name and identifiers are missing in this copy of the
// file; the dial function's cleanup calls Close, which this appears to be.
//
// Shuts the ClientConn down. Returns ErrClientConnClosing if it was already
// closed (conns == nil), otherwise nil. cancel is always invoked on return.
func ( *ClientConn) () error {
defer .cancel()
.mu.Lock()
if .conns == nil {
.mu.Unlock()
return ErrClientConnClosing
}
// Under mu: detach conns (nil marks the closed state), move to Shutdown,
// and detach the resolver wrapper.
:= .conns
.conns = nil
.csMgr.updateState(connectivity.Shutdown)
:= .resolverWrapper
.resolverWrapper = nil
:= .balancerWrapper
.mu.Unlock()
// With mu released: close the picker and the two wrappers (nil-checked),
// then tear down every addrConn.
.blockingpicker.close()
if != nil {
.close()
}
if != nil {
.close()
}
for := range {
.tearDown(ErrClientConnClosing)
}
// Record the deletion in channelz and remove the channel's entry.
:= &channelz.TraceEventDesc{
Desc: "Channel deleted",
Severity: channelz.CtInfo,
}
if .dopts.channelzParentID != nil {
.Parent = &channelz.TraceEventDesc{
Desc: fmt.Sprintf("Nested channel(id:%d) deleted", .channelzID.Int()),
Severity: channelz.CtInfo,
}
}
channelz.AddTraceEvent(logger, .channelzID, 0, )
channelz.RemoveEntry(.channelzID)
return nil
}
// addrConn represents a connection to one of a list of addresses. It is
// created in the Idle state by its owning ClientConn.
type addrConn struct {
// ctx and cancel are derived from the owning ClientConn's ctx; cancel is
// called during teardown.
ctx context.Context
cancel context.CancelFunc
// cc is the owning ClientConn.
cc *ClientConn
// dopts is a copy of the ClientConn's dial options taken at creation.
dopts dialOptions
// acbw is the SubConn passed to cc.handleSubConnStateChange on each state
// change.
acbw balancer.SubConn
// scopts holds the NewSubConnOptions supplied at creation.
scopts balancer.NewSubConnOptions
// transport is the current client transport; nil when none is active.
transport transport.ClientTransport
// mu is held by the methods in this file around accesses to the fields
// declared below it.
mu sync.Mutex
// curAddr is the address of the current transport; reset to the zero
// Address on teardown.
curAddr resolver.Address
// addrs is the list of addresses tried when connecting.
addrs []resolver.Address
state connectivity.State
// backoffIdx is the index passed to the backoff strategy; it is incremented
// after a backoff timer fires and reset to 0 on success or an explicit reset.
backoffIdx int
// resetBackoff is closed and replaced to interrupt a backoff wait.
resetBackoff chan struct{}
channelzID *channelz.Identifier
// czData holds the call counters that are updated atomically.
czData *channelzData
}
// NOTE(review): method name and identifiers are missing in this copy of the
// file; callers in this file use the name updateConnectivityState.
//
// No-op if the state is unchanged. Otherwise stores the new state, logs it to
// channelz, and reports it to the ClientConn via handleSubConnStateChange
// together with the SubConn and an error. It takes no lock itself; callers in
// this file invoke it with mu held.
func ( *addrConn) ( connectivity.State, error) {
if .state == {
return
}
.state =
channelz.Infof(logger, .channelzID, "Subchannel Connectivity change to %v", )
.cc.handleSubConnStateChange(.acbw, , )
}
// NOTE(review): method name and identifiers are missing in this copy of the
// file; the GOAWAY handler calls adjustParams, which this appears to be.
//
// For the GoAwayTooManyPings reason only: computes twice the addrConn's
// keepalive Time and, under cc.mu, raises cc.mkp.Time to that value if it is
// larger. Other reasons are ignored.
func ( *addrConn) ( transport.GoAwayReason) {
switch {
case transport.GoAwayTooManyPings:
:= 2 * .dopts.copts.KeepaliveParams.Time
.cc.mu.Lock()
if > .cc.mkp.Time {
.cc.mkp.Time =
}
.cc.mu.Unlock()
}
}
// NOTE(review): method name and identifiers are missing in this copy of the
// file; the connect path calls resetTransport, which this appears to be.
//
// Runs one connection attempt over the current address list and, on failure,
// performs the backoff wait before returning the addrConn to Idle.
func ( *addrConn) () {
.mu.Lock()
if .state == connectivity.Shutdown {
.mu.Unlock()
return
}
// Snapshot the addresses and compute the attempt deadline. Three values are
// involved — the backoff duration for backoffIdx, minConnectTimeout
// (overridable through dopts.minConnectTimeout), and a comparison between
// two of them; presumably the larger is used for the deadline — confirm.
:= .addrs
:= .dopts.bs.Backoff(.backoffIdx)
:= minConnectTimeout
if .dopts.minConnectTimeout != nil {
= .dopts.minConnectTimeout()
}
if < {
=
}
:= time.Now().Add()
.updateConnectivityState(connectivity.Connecting, nil)
.mu.Unlock()
if := .tryAllAddrs(, ); != nil {
// All addresses failed: request re-resolution, then report
// TransientFailure unless already shut down.
.cc.resolveNow(resolver.ResolveNowOptions{})
.mu.Lock()
if .state == connectivity.Shutdown {
.mu.Unlock()
return
}
.updateConnectivityState(connectivity.TransientFailure, )
:= .resetBackoff
.mu.Unlock()
// Wait for the timer, a backoff reset, or cancellation. Only timer expiry
// increments backoffIdx; cancellation returns without going to Idle.
:= time.NewTimer()
select {
case <-.C:
.mu.Lock()
.backoffIdx++
.mu.Unlock()
case <-:
.Stop()
case <-.ctx.Done():
.Stop()
return
}
.mu.Lock()
if .state != connectivity.Shutdown {
.updateConnectivityState(connectivity.Idle, )
}
.mu.Unlock()
return
}
// Success: reset the backoff index.
.mu.Lock()
.backoffIdx = 0
.mu.Unlock()
}
// NOTE(review): method name and identifiers are missing in this copy of the
// file; the reset path calls tryAllAddrs, which this appears to be.
//
// Tries each address in order via createTransport and returns nil on the
// first success. Returns errConnClosing if the addrConn is found in Shutdown.
// An error is remembered only while the tracked value is still nil
// (presumably the first failure) and is returned after all addresses fail;
// every failure is also recorded with cc.updateConnectionError.
func ( *addrConn) ( []resolver.Address, time.Time) error {
var error
for , := range {
.mu.Lock()
if .state == connectivity.Shutdown {
.mu.Unlock()
return errConnClosing
}
// Refresh the keepalive parameters from the ClientConn (read under cc.mu)
// before copying the connect options for this attempt.
.cc.mu.RLock()
.dopts.copts.KeepaliveParams = .cc.mkp
.cc.mu.RUnlock()
:= .dopts.copts
// A per-SubConn credentials bundle overrides the one in the copy.
if .scopts.CredsBundle != nil {
.CredsBundle = .scopts.CredsBundle
}
.mu.Unlock()
channelz.Infof(logger, .channelzID, "Subchannel picks a new address %q to connect", .Addr)
:= .createTransport(, , )
if == nil {
return nil
}
if == nil {
=
}
.cc.updateConnectionError()
}
return
}
// NOTE(review): method name and identifiers are missing in this copy of the
// file; the address loop calls createTransport, which this appears to be.
//
// Creates a client transport to one address with the given connect options
// and deadline. Returns the transport-creation error on failure, nil
// otherwise (including the two post-connect bail-outs below).
func ( *addrConn) ( resolver.Address, transport.ConnectOptions, time.Time) error {
.ServerName = .cc.getServerName()
, := context.WithCancel(.ctx)
// Callback taking a GoAwayReason. With mu held it adjusts keepalive params,
// then (unless Shutdown) invokes a func value (presumably the cancel func
// created above). If a transport is still set it clears it, requests
// re-resolution and moves to Idle.
:= func( transport.GoAwayReason) {
.mu.Lock()
defer .mu.Unlock()
.adjustParams()
if .state == connectivity.Shutdown {
return
}
()
if .transport == nil {
return
}
.transport = nil
.cc.resolveNow(resolver.ResolveNowOptions{})
.updateConnectivityState(connectivity.Idle, nil)
}
// The connect attempt itself runs under a deadline-bound context.
, := context.WithDeadline(.ctx, )
defer ()
.ChannelzParentID = .channelzID
, := transport.NewClientTransport(, .cc.ctx, , , )
if != nil {
if logger.V(2) {
logger.Infof("Creating new client transport to %q: %v", , )
}
()
channelz.Warningf(logger, .channelzID, "grpc: addrConn.createTransport failed to connect to %s. Err: %v", , )
return
}
.mu.Lock()
defer .mu.Unlock()
// Shut down while connecting: close the new transport in a goroutine.
if .state == connectivity.Shutdown {
go .Close(transport.ErrConnClosing)
return nil
}
// A context already reports an error (which one is not identifiable here):
// go back to Idle instead of using the transport.
if .Err() != nil {
.updateConnectivityState(connectivity.Idle, nil)
return nil
}
.curAddr =
.transport =
.startHealthCheck()
return nil
}
// NOTE(review): method name and identifiers are missing in this copy of the
// file; the transport-creation path calls startHealthCheck, which this
// appears to be.
//
// Either marks the addrConn Ready immediately or starts a health-check
// goroutine that drives the connectivity state. The caller in this file holds
// mu.
func ( *addrConn) ( context.Context) {
var bool
// Unless a flag is set further down (when a health check is actually
// started), report Ready on return.
defer func() {
if ! {
.updateConnectivityState(connectivity.Ready, nil)
}
}()
// Health checking is skipped when disabled in the dial options, when the
// service config has no health check config, when the SubConn options do
// not enable it, or when no health check function is registered.
if .cc.dopts.disableHealthCheck {
return
}
:= .cc.healthCheckConfig()
if == nil {
return
}
if !.scopts.HealthCheckEnabled {
return
}
:= .cc.dopts.healthCheckFunc
if == nil {
channelz.Error(logger, .channelzID, "Health check is requested but health check function is not set.")
return
}
= true
:= .transport
// Stream factory: refuses with codes.Canceled once the addrConn's transport
// no longer matches the one captured above.
:= func( string) (interface{}, error) {
.mu.Lock()
if .transport != {
.mu.Unlock()
return nil, status.Error(codes.Canceled, "the provided transport is no longer valid to use")
}
.mu.Unlock()
return newNonRetryClientStream(, &StreamDesc{ServerStreams: true}, , , )
}
// State setter: ignored once the transport has changed.
:= func( connectivity.State, error) {
.mu.Lock()
defer .mu.Unlock()
if .transport != {
return
}
.updateConnectivityState(, )
}
// Run the health check function in the background; its error is only logged.
go func() {
:= .cc.dopts.healthCheckFunc(, , , .ServiceName)
if != nil {
if status.Code() == codes.Unimplemented {
channelz.Error(logger, .channelzID, "Subchannel health check is unimplemented at server side, thus health check is disabled")
} else {
channelz.Errorf(logger, .channelzID, "Health checking failed: %v", )
}
}
}()
}
// NOTE(review): method and receiver names are missing in this copy of the
// file; the ClientConn calls resetConnectBackoff on each addrConn, which this
// appears to be.
//
// With mu held: closes resetBackoff (releasing a goroutine waiting on it
// during backoff), zeroes backoffIdx, and installs a fresh channel.
func ( *addrConn) () {
.mu.Lock()
close(.resetBackoff)
.backoffIdx = 0
.resetBackoff = make(chan struct{})
.mu.Unlock()
}
// NOTE(review): method and receiver names are missing in this copy of the
// file. With mu held, returns the transport only when the state is Ready;
// otherwise returns nil.
func ( *addrConn) () transport.ClientTransport {
.mu.Lock()
defer .mu.Unlock()
if .state == connectivity.Ready {
return .transport
}
return nil
}
// NOTE(review): method name and identifiers are missing in this copy of the
// file; ClientConn code calls tearDown(err) on addrConns, which this appears
// to be.
//
// Shuts the addrConn down; a second call is a no-op because of the Shutdown
// check. It detaches the transport, moves to Shutdown, cancels the context
// and clears curAddr. When the compared value equals errConnDrain and a
// transport existed, the transport is closed gracefully with mu temporarily
// released. Finally it records a channelz deletion event and removes the
// channelz entry.
// NOTE(review): in the visible code the detached transport is only closed on
// the errConnDrain path; other paths presumably rely on cancelling the
// context — confirm.
func ( *addrConn) ( error) {
.mu.Lock()
if .state == connectivity.Shutdown {
.mu.Unlock()
return
}
:= .transport
.transport = nil
.updateConnectivityState(connectivity.Shutdown, nil)
.cancel()
.curAddr = resolver.Address{}
if == errConnDrain && != nil {
.mu.Unlock()
.GracefulClose()
.mu.Lock()
}
channelz.AddTraceEvent(logger, .channelzID, 0, &channelz.TraceEventDesc{
Desc: "Subchannel deleted",
Severity: channelz.CtInfo,
Parent: &channelz.TraceEventDesc{
Desc: fmt.Sprintf("Subchannel(id:%d) deleted", .channelzID.Int()),
Severity: channelz.CtInfo,
},
})
channelz.RemoveEntry(.channelzID)
.mu.Unlock()
}
// NOTE(review): method and receiver names are missing in this copy of the
// file. Returns the state field while holding mu.
func ( *addrConn) () connectivity.State {
.mu.Lock()
defer .mu.Unlock()
return .state
}
// NOTE(review): method and receiver names are missing in this copy of the
// file; described from the visible code only.
//
// Returns a channelz snapshot for this addrConn. curAddr.Addr is read under
// mu and used as Target; State comes from getState(); the counters are read
// atomically from czData.
func ( *addrConn) () *channelz.ChannelInternalMetric {
.mu.Lock()
:= .curAddr.Addr
.mu.Unlock()
return &channelz.ChannelInternalMetric{
State: .getState(),
Target: ,
CallsStarted: atomic.LoadInt64(&.czData.callsStarted),
CallsSucceeded: atomic.LoadInt64(&.czData.callsSucceeded),
CallsFailed: atomic.LoadInt64(&.czData.callsFailed),
LastCallStartedTimestamp: time.Unix(0, atomic.LoadInt64(&.czData.lastCallStartedTime)),
}
}
// NOTE(review): method and receiver names are missing in this copy of the
// file. Atomically increments czData.callsStarted and stores the current time
// (Unix nanoseconds) in czData.lastCallStartedTime.
func ( *addrConn) () {
atomic.AddInt64(&.czData.callsStarted, 1)
atomic.StoreInt64(&.czData.lastCallStartedTime, time.Now().UnixNano())
}
// NOTE(review): method and receiver names are missing in this copy of the
// file. Atomically increments czData.callsSucceeded.
func ( *addrConn) () {
atomic.AddInt64(&.czData.callsSucceeded, 1)
}
// NOTE(review): method and receiver names are missing in this copy of the
// file. Atomically increments czData.callsFailed.
func ( *addrConn) () {
atomic.AddInt64(&.czData.callsFailed, 1)
}
// retryThrottler is a token bucket built from a service config's
// retryThrottling settings.
type retryThrottler struct {
// max is the token cap (initialised from MaxTokens).
max float64
// thresh is the level at or below which throttling is reported (initialised
// to MaxTokens / 2).
thresh float64
// ratio is the number of tokens added back per success (initialised from
// TokenRatio).
ratio float64
// mu is held by the methods in this file while they modify tokens.
mu sync.Mutex
// tokens is the current token count, kept within [0, max].
tokens float64
}
// NOTE(review): method and receiver names are missing in this copy of the
// file; described from the visible code only.
//
// Safe on a nil receiver, for which it returns false. Otherwise, with mu
// held, it subtracts one token (flooring at 0) and reports whether the
// remaining tokens are at or below thresh.
func ( *retryThrottler) () bool {
if == nil {
return false
}
.mu.Lock()
defer .mu.Unlock()
.tokens--
if .tokens < 0 {
.tokens = 0
}
return .tokens <= .thresh
}
// NOTE(review): method and receiver names are missing in this copy of the
// file; described from the visible code only.
//
// Safe on a nil receiver (no-op). Otherwise, with mu held, it adds ratio to
// tokens and caps the result at max.
func ( *retryThrottler) () {
if == nil {
return
}
.mu.Lock()
defer .mu.Unlock()
.tokens += .ratio
if .tokens > .max {
.tokens = .max
}
}
// channelzChannel wraps a ClientConn so it can be registered with channelz;
// its metric method delegates to cc.channelzMetric().
type channelzChannel struct {
cc *ClientConn
}
// NOTE(review): method and receiver names are missing in this copy of the
// file. Returns the wrapped ClientConn's channelzMetric().
func ( *channelzChannel) () *channelz.ChannelInternalMetric {
return .cc.channelzMetric()
}
// ErrClientConnTimeout is an exported sentinel error for a dial timeout. It is
// not referenced anywhere in the visible part of this file.
var ErrClientConnTimeout = errors.New("grpc: timed out when dialing")
// NOTE(review): method name and identifiers are missing in this copy of the
// file; the target-parsing code calls getResolver(scheme), which this appears
// to be.
//
// Searches dopts.resolvers for a builder whose Scheme() equals the given
// string and returns the first match; otherwise falls back to resolver.Get.
func ( *ClientConn) ( string) resolver.Builder {
for , := range .dopts.resolvers {
if == .Scheme() {
return
}
}
return resolver.Get()
}
// NOTE(review): method name and identifiers are missing in this copy of the
// file; addrConn code calls cc.updateConnectionError(err), which this appears
// to be. Stores the given error in lastConnectionError under lceMu.
func ( *ClientConn) ( error) {
.lceMu.Lock()
.lastConnectionError =
.lceMu.Unlock()
}
// NOTE(review): method and receiver names are missing in this copy of the
// file; the blocking dial loop calls connectionError(), which this appears to
// be. Returns lastConnectionError under lceMu.
func ( *ClientConn) () error {
.lceMu.Lock()
defer .lceMu.Unlock()
return .lastConnectionError
}
// NOTE(review): method name and identifiers are missing in this copy of the
// file; the dial function calls parseTargetAndFindResolver, which this
// appears to be.
//
// Parses the dial target and locates a resolver builder in two passes. On
// success it stores the parsed target in parsedTarget and returns the builder.
func ( *ClientConn) () (resolver.Builder, error) {
channelz.Infof(logger, .channelzID, "original dial target is: %q", .target)
var resolver.Builder
// First pass: parse the target as given and look up a resolver for its URL
// scheme. A parse failure is only logged; it falls through to the second
// pass.
, := parseTarget(.target)
if != nil {
channelz.Infof(logger, .channelzID, "dial target %q parse failed: %v", .target, )
} else {
channelz.Infof(logger, .channelzID, "parsed dial target is: %+v", )
= .getResolver(.URL.Scheme)
if != nil {
.parsedTarget =
return , nil
}
}
// Second pass: prefix the target with "<default scheme>:///" and retry.
// Here a parse failure or a missing resolver is returned as an error.
:= resolver.GetDefaultScheme()
channelz.Infof(logger, .channelzID, "fallback to scheme %q", )
:= + ":///" + .target
, = parseTarget()
if != nil {
channelz.Infof(logger, .channelzID, "dial target %q parse failed: %v", , )
return nil,
}
channelz.Infof(logger, .channelzID, "parsed dial target is: %+v", )
= .getResolver(.URL.Scheme)
if == nil {
return nil, fmt.Errorf("could not get resolver for default scheme: %q", .URL.Scheme)
}
.parsedTarget =
return , nil
}
// NOTE(review): function and parameter names are missing in this copy of the
// file; callers in this file use the name parseTarget.
//
// Parses a target string with url.Parse. On failure it returns the zero
// resolver.Target and the parse error. On success it returns a Target whose
// Scheme and Authority come from the URL's Scheme and Host, and whose URL
// field is a copy of the parsed URL.
func ( string) (resolver.Target, error) {
, := url.Parse()
if != nil {
return resolver.Target{},
}
return resolver.Target{
Scheme: .Scheme,
Authority: .Host,
URL: *,
}, nil
}
// NOTE(review): function and parameter names are missing in this copy of the
// file; the dial function calls determineAuthority(endpoint, target, dopts),
// which this appears to be.
//
// Chooses the channel authority from two candidates and two string arguments.
// The candidates are the ServerName reported by the transport credentials'
// Info() and the authority dial option. If both are non-empty and differ, an
// error is returned. Otherwise the switch returns the first non-empty of two
// values (presumably those two candidates — confirm order). With neither set
// it returns "localhost" for strings prefixed "unix:" or "unix-abstract:",
// "localhost" plus the string for one beginning with ":", and a string
// argument unchanged in the default case.
func (, string, dialOptions) (string, error) {
:= ""
if := .copts.TransportCredentials; != nil && .Info().ServerName != "" {
= .Info().ServerName
}
:= .authority
if ( != "" && != "") && != {
return "", fmt.Errorf("ClientConn's authority from transport creds %q and dial option %q don't match", , )
}
switch {
case != "":
return , nil
case != "":
return , nil
case strings.HasPrefix(, "unix:") || strings.HasPrefix(, "unix-abstract:"):
return "localhost", nil
case strings.HasPrefix(, ":"):
return "localhost" + , nil
default:
return , nil
}
}