package grpc
import (
)
type ccResolverWrapper struct {
cc *ClientConn
ignoreServiceConfig bool
serializer *grpcsync.CallbackSerializer
serializerCancel context.CancelFunc
resolver resolver.Resolver
mu sync.Mutex
curState resolver.State
closed bool
}
func ( *ClientConn) *ccResolverWrapper {
, := context.WithCancel(.ctx)
return &ccResolverWrapper{
cc: ,
ignoreServiceConfig: .dopts.disableServiceConfig,
serializer: grpcsync.NewCallbackSerializer(),
serializerCancel: ,
}
}
func ( *ccResolverWrapper) () error {
:= make(chan error)
.serializer.TrySchedule(func( context.Context) {
if .Err() != nil {
return
}
:= resolver.BuildOptions{
DisableServiceConfig: .cc.dopts.disableServiceConfig,
DialCreds: .cc.dopts.copts.TransportCredentials,
CredsBundle: .cc.dopts.copts.CredsBundle,
Dialer: .cc.dopts.copts.Dialer,
Authority: .cc.authority,
MetricsRecorder: .cc.metricsRecorderList,
}
var error
if .cc.dopts.copts.Dialer != nil || !.cc.dopts.useProxy {
.resolver, = .cc.resolverBuilder.Build(.cc.parsedTarget, , )
} else {
.resolver, = delegatingresolver.New(.cc.parsedTarget, , , .cc.resolverBuilder, .cc.dopts.enableLocalDNSResolution)
}
<-
})
return <-
}
func ( *ccResolverWrapper) ( resolver.ResolveNowOptions) {
.serializer.TrySchedule(func( context.Context) {
if .Err() != nil || .resolver == nil {
return
}
.resolver.ResolveNow()
})
}
func ( *ccResolverWrapper) () {
channelz.Info(logger, .cc.channelz, "Closing the name resolver")
.mu.Lock()
.closed = true
.mu.Unlock()
.serializer.TrySchedule(func(context.Context) {
if .resolver == nil {
return
}
.resolver.Close()
.resolver = nil
})
.serializerCancel()
}
func ( *ccResolverWrapper) ( resolver.State) error {
.cc.mu.Lock()
.mu.Lock()
if .closed {
.mu.Unlock()
.cc.mu.Unlock()
return nil
}
if .Endpoints == nil {
.Endpoints = addressesToEndpoints(.Addresses)
}
.addChannelzTraceEvent()
.curState =
.mu.Unlock()
return .cc.updateResolverStateAndUnlock(, nil)
}
func ( *ccResolverWrapper) ( error) {
.cc.mu.Lock()
.mu.Lock()
if .closed {
.mu.Unlock()
.cc.mu.Unlock()
return
}
.mu.Unlock()
channelz.Warningf(logger, .cc.channelz, "ccResolverWrapper: reporting error to cc: %v", )
.cc.updateResolverStateAndUnlock(resolver.State{}, )
}
func ( *ccResolverWrapper) ( []resolver.Address) {
.cc.mu.Lock()
.mu.Lock()
if .closed {
.mu.Unlock()
.cc.mu.Unlock()
return
}
:= resolver.State{
Addresses: ,
ServiceConfig: .curState.ServiceConfig,
Endpoints: addressesToEndpoints(),
}
.addChannelzTraceEvent()
.curState =
.mu.Unlock()
.cc.updateResolverStateAndUnlock(, nil)
}
func ( *ccResolverWrapper) ( string) *serviceconfig.ParseResult {
return parseServiceConfig(, .cc.dopts.maxCallAttempts)
}
func ( *ccResolverWrapper) ( resolver.State) {
if !logger.V(0) && !channelz.IsOn() {
return
}
var []string
var , *ServiceConfig
var , bool
if .curState.ServiceConfig != nil {
, = .curState.ServiceConfig.Config.(*ServiceConfig)
}
if .ServiceConfig != nil {
, = .ServiceConfig.Config.(*ServiceConfig)
}
if != || ( && && .rawJSONString != .rawJSONString) {
= append(, "service config updated")
}
if len(.curState.Addresses) > 0 && len(.Addresses) == 0 {
= append(, "resolver returned an empty address list")
} else if len(.curState.Addresses) == 0 && len(.Addresses) > 0 {
= append(, "resolver returned new addresses")
}
channelz.Infof(logger, .cc.channelz, "Resolver state updated: %s (%v)", pretty.ToJSON(), strings.Join(, "; "))
}
func ( []resolver.Address) []resolver.Endpoint {
:= make([]resolver.Endpoint, 0, len())
for , := range {
:= resolver.Endpoint{Addresses: []resolver.Address{}, Attributes: .BalancerAttributes}
.Addresses[0].BalancerAttributes = nil
= append(, )
}
return
}