/*
 *
 * Copyright 2018 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

// Package channelz defines APIs for enabling channelz service, entry
// registration/deletion, and accessing channelz data. It also defines channelz
// metric struct formats.
//
// All APIs in this package are experimental.
package channelz

import (
	"context"
	"errors"
	"fmt"
	"sort"
	"sync"
	"sync/atomic"
	"time"

	"google.golang.org/grpc/grpclog"
)

const (
	// defaultMaxTraceEntry is the initial value of maxTraceEntry and the value
	// restored by ResetMaxTraceEntryToDefault.
	defaultMaxTraceEntry int32 = 30
)

var (
	db    dbWrapper
	idGen idGenerator
	// EntryPerPage defines the number of channelz entries to be shown on a web page.
	EntryPerPage = int64(50)
	// curState is 1 once TurnOn has run and 0 otherwise; accessed atomically.
	curState int32
	// maxTraceEntry is read and written atomically; see SetMaxTraceEntry.
	maxTraceEntry = defaultMaxTraceEntry
)

// TurnOn turns on channelz data collection.
//
// If collection is already on this is a no-op; otherwise a fresh channelMap is
// installed and the id generator is reset before the state flag is set.
func TurnOn() {
	if !IsOn() {
		db.set(newChannelMap())
		idGen.reset()
		atomic.StoreInt32(&curState, 1)
	}
}

// IsOn returns whether channelz data collection is on.
func IsOn() bool {
	// A plain atomic load; equivalent to the former CompareAndSwap(1, 1), which
	// succeeded exactly when the stored value was 1 and never changed it.
	return atomic.LoadInt32(&curState) == 1
}

// SetMaxTraceEntry sets maximum number of trace entry per entity (i.e. channel/subchannel).
// Setting it to 0 will disable channel tracing.
func SetMaxTraceEntry(i int32) {
	atomic.StoreInt32(&maxTraceEntry, i)
}

// ResetMaxTraceEntryToDefault resets the maximum number of trace entry per entity to default.
func ResetMaxTraceEntryToDefault() {
	atomic.StoreInt32(&maxTraceEntry, defaultMaxTraceEntry)
}

// getMaxTraceEntry atomically loads the current per-entity trace entry limit
// and returns it as an int.
func getMaxTraceEntry() int {
	i := atomic.LoadInt32(&maxTraceEntry)
	return int(i)
}

// dbWrapper wraps around a reference to internal channelz data storage, and
// provide synchronized functionality to set and get the reference.
type dbWrapper struct {
	mu sync.RWMutex
	DB *channelMap
}

// set replaces the wrapped channelMap reference under the write lock.
func (d *dbWrapper) set(db *channelMap) {
	d.mu.Lock()
	d.DB = db
	d.mu.Unlock()
}

// get returns the wrapped channelMap reference under the read lock. The result
// is nil if set has never been called.
func (d *dbWrapper) get() *channelMap {
	d.mu.RLock()
	defer d.mu.RUnlock()
	return d.DB
}

// NewChannelzStorageForTesting initializes channelz data storage and id
// generator for testing purposes.
//
// Returns a cleanup function to be invoked by the test, which waits for up to
// 10s for all channelz state to be reset by the grpc goroutines when those
// entities get closed. This cleanup function helps with ensuring that tests
// don't mess up each other.
func () ( func() error) { db.set(newChannelMap()) idGen.reset() return func() error { := db.get() if == nil { return nil } , := context.WithTimeout(context.Background(), 10*time.Second) defer () := time.NewTicker(10 * time.Millisecond) defer .Stop() for { .mu.RLock() , , , , , := len(.topLevelChannels), len(.servers), len(.channels), len(.subChannels), len(.listenSockets), len(.normalSockets) .mu.RUnlock() if := .Err(); != nil { return fmt.Errorf("after 10s the channelz map has not been cleaned up yet, topchannels: %d, servers: %d, channels: %d, subchannels: %d, listen sockets: %d, normal sockets: %d", , , , , , ) } if == 0 && == 0 && == 0 && == 0 && == 0 && == 0 { return nil } <-.C } } } // GetTopChannels returns a slice of top channel's ChannelMetric, along with a // boolean indicating whether there's more top channels to be queried for. // // The arg id specifies that only top channel with id at or above it will be included // in the result. The returned slice is up to a length of the arg maxResults or // EntryPerPage if maxResults is zero, and is sorted in ascending id order. func ( int64, int64) ([]*ChannelMetric, bool) { return db.get().GetTopChannels(, ) } // GetServers returns a slice of server's ServerMetric, along with a // boolean indicating whether there's more servers to be queried for. // // The arg id specifies that only server with id at or above it will be included // in the result. The returned slice is up to a length of the arg maxResults or // EntryPerPage if maxResults is zero, and is sorted in ascending id order. func ( int64, int64) ([]*ServerMetric, bool) { return db.get().GetServers(, ) } // GetServerSockets returns a slice of server's (identified by id) normal socket's // SocketMetric, along with a boolean indicating whether there's more sockets to // be queried for. // // The arg startID specifies that only sockets with id at or above it will be // included in the result. 
The returned slice is up to a length of the arg maxResults // or EntryPerPage if maxResults is zero, and is sorted in ascending id order. func ( int64, int64, int64) ([]*SocketMetric, bool) { return db.get().GetServerSockets(, , ) } // GetChannel returns the ChannelMetric for the channel (identified by id). func ( int64) *ChannelMetric { return db.get().GetChannel() } // GetSubChannel returns the SubChannelMetric for the subchannel (identified by id). func ( int64) *SubChannelMetric { return db.get().GetSubChannel() } // GetSocket returns the SocketInternalMetric for the socket (identified by id). func ( int64) *SocketMetric { return db.get().GetSocket() } // GetServer returns the ServerMetric for the server (identified by id). func ( int64) *ServerMetric { return db.get().GetServer() } // RegisterChannel registers the given channel c in the channelz database with // ref as its reference name, and adds it to the child list of its parent // (identified by pid). pid == nil means no parent. // // Returns a unique channelz identifier assigned to this channel. // // If channelz is not turned ON, the channelz database is not mutated. func ( Channel, *Identifier, string) *Identifier { := idGen.genID() var int64 := true if != nil { = false = .Int() } if !IsOn() { return newIdentifer(RefChannel, , ) } := &channel{ refName: , c: , subChans: make(map[int64]string), nestedChans: make(map[int64]string), id: , pid: , trace: &channelTrace{createdTime: time.Now(), events: make([]*TraceEvent, 0, getMaxTraceEntry())}, } db.get().addChannel(, , , ) return newIdentifer(RefChannel, , ) } // RegisterSubChannel registers the given subChannel c in the channelz database // with ref as its reference name, and adds it to the child list of its parent // (identified by pid). // // Returns a unique channelz identifier assigned to this subChannel. // // If channelz is not turned ON, the channelz database is not mutated. 
func ( Channel, *Identifier, string) (*Identifier, error) { if == nil { return nil, errors.New("a SubChannel's parent id cannot be nil") } := idGen.genID() if !IsOn() { return newIdentifer(RefSubChannel, , ), nil } := &subChannel{ refName: , c: , sockets: make(map[int64]string), id: , pid: .Int(), trace: &channelTrace{createdTime: time.Now(), events: make([]*TraceEvent, 0, getMaxTraceEntry())}, } db.get().addSubChannel(, , .Int()) return newIdentifer(RefSubChannel, , ), nil } // RegisterServer registers the given server s in channelz database. It returns // the unique channelz tracking id assigned to this server. // // If channelz is not turned ON, the channelz database is not mutated. func ( Server, string) *Identifier { := idGen.genID() if !IsOn() { return newIdentifer(RefServer, , nil) } := &server{ refName: , s: , sockets: make(map[int64]string), listenSockets: make(map[int64]string), id: , } db.get().addServer(, ) return newIdentifer(RefServer, , nil) } // RegisterListenSocket registers the given listen socket s in channelz database // with ref as its reference name, and add it to the child list of its parent // (identified by pid). It returns the unique channelz tracking id assigned to // this listen socket. // // If channelz is not turned ON, the channelz database is not mutated. func ( Socket, *Identifier, string) (*Identifier, error) { if == nil { return nil, errors.New("a ListenSocket's parent id cannot be 0") } := idGen.genID() if !IsOn() { return newIdentifer(RefListenSocket, , ), nil } := &listenSocket{refName: , s: , id: , pid: .Int()} db.get().addListenSocket(, , .Int()) return newIdentifer(RefListenSocket, , ), nil } // RegisterNormalSocket registers the given normal socket s in channelz database // with ref as its reference name, and adds it to the child list of its parent // (identified by pid). It returns the unique channelz tracking id assigned to // this normal socket. // // If channelz is not turned ON, the channelz database is not mutated. 
func ( Socket, *Identifier, string) (*Identifier, error) { if == nil { return nil, errors.New("a NormalSocket's parent id cannot be 0") } := idGen.genID() if !IsOn() { return newIdentifer(RefNormalSocket, , ), nil } := &normalSocket{refName: , s: , id: , pid: .Int()} db.get().addNormalSocket(, , .Int()) return newIdentifer(RefNormalSocket, , ), nil } // RemoveEntry removes an entry with unique channelz tracking id to be id from // channelz database. // // If channelz is not turned ON, this function is a no-op. func ( *Identifier) { if !IsOn() { return } db.get().removeEntry(.Int()) } // TraceEventDesc is what the caller of AddTraceEvent should provide to describe // the event to be added to the channel trace. // // The Parent field is optional. It is used for an event that will be recorded // in the entity's parent trace. type TraceEventDesc struct { Desc string Severity Severity Parent *TraceEventDesc } // AddTraceEvent adds trace related to the entity with specified id, using the // provided TraceEventDesc. // // If channelz is not turned ON, this will simply log the event descriptions. func ( grpclog.DepthLoggerV2, *Identifier, int, *TraceEventDesc) { // Log only the trace description associated with the bottom most entity. switch .Severity { case CtUnknown, CtInfo: .InfoDepth(+1, withParens()+.Desc) case CtWarning: .WarningDepth(+1, withParens()+.Desc) case CtError: .ErrorDepth(+1, withParens()+.Desc) } if getMaxTraceEntry() == 0 { return } if IsOn() { db.get().traceEvent(.Int(), ) } } // channelMap is the storage data structure for channelz. // Methods of channelMap can be divided in two two categories with respect to locking. // 1. Methods acquire the global lock. // 2. Methods that can only be called when global lock is held. // A second type of method need always to be called inside a first type of method. 
type channelMap struct { mu sync.RWMutex topLevelChannels map[int64]struct{} servers map[int64]*server channels map[int64]*channel subChannels map[int64]*subChannel listenSockets map[int64]*listenSocket normalSockets map[int64]*normalSocket } func () *channelMap { return &channelMap{ topLevelChannels: make(map[int64]struct{}), channels: make(map[int64]*channel), listenSockets: make(map[int64]*listenSocket), normalSockets: make(map[int64]*normalSocket), servers: make(map[int64]*server), subChannels: make(map[int64]*subChannel), } } func ( *channelMap) ( int64, *server) { .mu.Lock() .cm = .servers[] = .mu.Unlock() } func ( *channelMap) ( int64, *channel, bool, int64) { .mu.Lock() .cm = .trace.cm = .channels[] = if { .topLevelChannels[] = struct{}{} } else { .findEntry().addChild(, ) } .mu.Unlock() } func ( *channelMap) ( int64, *subChannel, int64) { .mu.Lock() .cm = .trace.cm = .subChannels[] = .findEntry().addChild(, ) .mu.Unlock() } func ( *channelMap) ( int64, *listenSocket, int64) { .mu.Lock() .cm = .listenSockets[] = .findEntry().addChild(, ) .mu.Unlock() } func ( *channelMap) ( int64, *normalSocket, int64) { .mu.Lock() .cm = .normalSockets[] = .findEntry().addChild(, ) .mu.Unlock() } // removeEntry triggers the removal of an entry, which may not indeed delete the entry, if it has to // wait on the deletion of its children and until no other entity's channel trace references it. // It may lead to a chain of entry deletion. For example, deleting the last socket of a gracefully // shutting down server will lead to the server being also deleted. func ( *channelMap) ( int64) { .mu.Lock() .findEntry().triggerDelete() .mu.Unlock() } // c.mu must be held by the caller func ( *channelMap) ( int64) { := .findEntry() if , := .(tracedChannel); { .decrTraceRefCount() .deleteSelfIfReady() } } // c.mu must be held by the caller. 
func ( *channelMap) ( int64) entry { var entry var bool if , = .channels[]; { return } if , = .subChannels[]; { return } if , = .servers[]; { return } if , = .listenSockets[]; { return } if , = .normalSockets[]; { return } return &dummyEntry{idNotFound: } } // c.mu must be held by the caller // deleteEntry simply deletes an entry from the channelMap. Before calling this // method, caller must check this entry is ready to be deleted, i.e removeEntry() // has been called on it, and no children still exist. // Conditionals are ordered by the expected frequency of deletion of each entity // type, in order to optimize performance. func ( *channelMap) ( int64) { var bool if _, = .normalSockets[]; { delete(.normalSockets, ) return } if _, = .subChannels[]; { delete(.subChannels, ) return } if _, = .channels[]; { delete(.channels, ) delete(.topLevelChannels, ) return } if _, = .listenSockets[]; { delete(.listenSockets, ) return } if _, = .servers[]; { delete(.servers, ) return } } func ( *channelMap) ( int64, *TraceEventDesc) { .mu.Lock() := .findEntry() , := .(tracedChannel) if ! 
{ .mu.Unlock() return } .getChannelTrace().append(&TraceEvent{Desc: .Desc, Severity: .Severity, Timestamp: time.Now()}) if .Parent != nil { := .findEntry(.getParentID()) var RefChannelType switch .(type) { case *channel: = RefChannel case *subChannel: = RefSubChannel } if , := .(tracedChannel); { .getChannelTrace().append(&TraceEvent{ Desc: .Parent.Desc, Severity: .Parent.Severity, Timestamp: time.Now(), RefID: , RefName: .getRefName(), RefType: , }) .incrTraceRefCount() } } .mu.Unlock() } type int64Slice []int64 func ( int64Slice) () int { return len() } func ( int64Slice) (, int) { [], [] = [], [] } func ( int64Slice) (, int) bool { return [] < [] } func ( map[int64]string) map[int64]string { := make(map[int64]string) for , := range { [] = } return } func (, int64) int64 { if < { return } return } func ( *channelMap) ( int64, int64) ([]*ChannelMetric, bool) { if <= 0 { = EntryPerPage } .mu.RLock() := int64(len(.topLevelChannels)) := make([]int64, 0, ) := make([]*channel, 0, min(, )) for := range .topLevelChannels { = append(, ) } sort.Sort(int64Slice()) := sort.Search(len(), func( int) bool { return [] >= }) := int64(0) var bool var []*ChannelMetric for , := range [:] { if == { break } if , := .channels[]; { = append(, ) = append(, &ChannelMetric{ NestedChans: copyMap(.nestedChans), SubChans: copyMap(.subChans), }) ++ } if == len([:])-1 { = true break } } .mu.RUnlock() if == 0 { = true } for , := range { [].ChannelData = .c.ChannelzMetric() [].ID = .id [].RefName = .refName [].Trace = .trace.dumpData() } return , } func ( *channelMap) (, int64) ([]*ServerMetric, bool) { if <= 0 { = EntryPerPage } .mu.RLock() := int64(len(.servers)) := make([]int64, 0, ) := make([]*server, 0, min(, )) for := range .servers { = append(, ) } sort.Sort(int64Slice()) := sort.Search(len(), func( int) bool { return [] >= }) := int64(0) var bool var []*ServerMetric for , := range [:] { if == { break } if , := .servers[]; { = append(, ) = append(, &ServerMetric{ ListenSockets: 
copyMap(.listenSockets), }) ++ } if == len([:])-1 { = true break } } .mu.RUnlock() if == 0 { = true } for , := range { [].ServerData = .s.ChannelzMetric() [].ID = .id [].RefName = .refName } return , } func ( *channelMap) ( int64, int64, int64) ([]*SocketMetric, bool) { if <= 0 { = EntryPerPage } var *server var bool .mu.RLock() if , = .servers[]; ! { // server with id doesn't exist. .mu.RUnlock() return nil, true } := .sockets := int64(len()) := make([]int64, 0, ) := make([]*normalSocket, 0, min(, )) for := range { = append(, ) } sort.Sort(int64Slice()) := sort.Search(len(), func( int) bool { return [] >= }) := int64(0) var bool for , := range [:] { if == { break } if , := .normalSockets[]; { = append(, ) ++ } if == len([:])-1 { = true break } } .mu.RUnlock() if == 0 { = true } := make([]*SocketMetric, 0, len()) for , := range { := &SocketMetric{} .SocketData = .s.ChannelzMetric() .ID = .id .RefName = .refName = append(, ) } return , } func ( *channelMap) ( int64) *ChannelMetric { := &ChannelMetric{} var *channel var bool .mu.RLock() if , = .channels[]; ! { // channel with id doesn't exist. .mu.RUnlock() return nil } .NestedChans = copyMap(.nestedChans) .SubChans = copyMap(.subChans) // cn.c can be set to &dummyChannel{} when deleteSelfFromMap is called. Save a copy of cn.c when // holding the lock to prevent potential data race. := .c .mu.RUnlock() .ChannelData = .ChannelzMetric() .ID = .id .RefName = .refName .Trace = .trace.dumpData() return } func ( *channelMap) ( int64) *SubChannelMetric { := &SubChannelMetric{} var *subChannel var bool .mu.RLock() if , = .subChannels[]; ! { // subchannel with id doesn't exist. .mu.RUnlock() return nil } .Sockets = copyMap(.sockets) // sc.c can be set to &dummyChannel{} when deleteSelfFromMap is called. Save a copy of sc.c when // holding the lock to prevent potential data race. 
:= .c .mu.RUnlock() .ChannelData = .ChannelzMetric() .ID = .id .RefName = .refName .Trace = .trace.dumpData() return } func ( *channelMap) ( int64) *SocketMetric { := &SocketMetric{} .mu.RLock() if , := .listenSockets[]; { .mu.RUnlock() .SocketData = .s.ChannelzMetric() .ID = .id .RefName = .refName return } if , := .normalSockets[]; { .mu.RUnlock() .SocketData = .s.ChannelzMetric() .ID = .id .RefName = .refName return } .mu.RUnlock() return nil } func ( *channelMap) ( int64) *ServerMetric { := &ServerMetric{} var *server var bool .mu.RLock() if , = .servers[]; ! { .mu.RUnlock() return nil } .ListenSockets = copyMap(.listenSockets) .mu.RUnlock() .ID = .id .RefName = .refName .ServerData = .s.ChannelzMetric() return } type idGenerator struct { id int64 } func ( *idGenerator) () { atomic.StoreInt64(&.id, 0) } func ( *idGenerator) () int64 { return atomic.AddInt64(&.id, 1) }