/*
 *
 * Copyright 2017 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

// Package health provides a service that exposes the server's health. It must
// be imported to enable support for client-side health checks.
package health import ( healthgrpc healthpb ) const ( // maxAllowedServices defines the maximum number of resources a List // operation can return. An error is returned if the number of services // exceeds this limit. maxAllowedServices = 100 ) // Server implements `service Health`. type Server struct { healthgrpc.UnimplementedHealthServer mu sync.RWMutex // If shutdown is true, it's expected all serving status is NOT_SERVING, and // will stay in NOT_SERVING. shutdown bool // statusMap stores the serving status of the services this Server monitors. statusMap map[string]healthpb.HealthCheckResponse_ServingStatus updates map[string]map[healthgrpc.Health_WatchServer]chan healthpb.HealthCheckResponse_ServingStatus } // NewServer returns a new Server. func () *Server { return &Server{ statusMap: map[string]healthpb.HealthCheckResponse_ServingStatus{"": healthpb.HealthCheckResponse_SERVING}, updates: make(map[string]map[healthgrpc.Health_WatchServer]chan healthpb.HealthCheckResponse_ServingStatus), } } // Check implements `service Health`. func ( *Server) ( context.Context, *healthpb.HealthCheckRequest) (*healthpb.HealthCheckResponse, error) { .mu.RLock() defer .mu.RUnlock() if , := .statusMap[.Service]; { return &healthpb.HealthCheckResponse{ Status: , }, nil } return nil, status.Error(codes.NotFound, "unknown service") } // List implements `service Health`. func ( *Server) ( context.Context, *healthpb.HealthListRequest) (*healthpb.HealthListResponse, error) { .mu.RLock() defer .mu.RUnlock() if len(.statusMap) > maxAllowedServices { return nil, status.Errorf(codes.ResourceExhausted, "server health list exceeds maximum capacity: %d", maxAllowedServices) } := make(map[string]*healthpb.HealthCheckResponse, len(.statusMap)) for , := range .statusMap { [] = &healthpb.HealthCheckResponse{Status: } } return &healthpb.HealthListResponse{Statuses: }, nil } // Watch implements `service Health`. 
func ( *Server) ( *healthpb.HealthCheckRequest, healthgrpc.Health_WatchServer) error { := .Service // update channel is used for getting service status updates. := make(chan healthpb.HealthCheckResponse_ServingStatus, 1) .mu.Lock() // Puts the initial status to the channel. if , := .statusMap[]; { <- } else { <- healthpb.HealthCheckResponse_SERVICE_UNKNOWN } // Registers the update channel to the correct place in the updates map. if , := .updates[]; ! { .updates[] = make(map[healthgrpc.Health_WatchServer]chan healthpb.HealthCheckResponse_ServingStatus) } .updates[][] = defer func() { .mu.Lock() delete(.updates[], ) .mu.Unlock() }() .mu.Unlock() var healthpb.HealthCheckResponse_ServingStatus = -1 for { select { // Status updated. Sends the up-to-date status to the client. case := <-: if == { continue } = := .Send(&healthpb.HealthCheckResponse{Status: }) if != nil { return status.Error(codes.Canceled, "Stream has ended.") } // Context done. Removes the update channel from the updates map. case <-.Context().Done(): return status.Error(codes.Canceled, "Stream has ended.") } } } // SetServingStatus is called when need to reset the serving status of a service // or insert a new service entry into the statusMap. func ( *Server) ( string, healthpb.HealthCheckResponse_ServingStatus) { .mu.Lock() defer .mu.Unlock() if .shutdown { logger.Infof("health: status changing for %s to %v is ignored because health service is shutdown", , ) return } .setServingStatusLocked(, ) } func ( *Server) ( string, healthpb.HealthCheckResponse_ServingStatus) { .statusMap[] = for , := range .updates[] { // Clears previous updates, that are not sent to the client, from the channel. // This can happen if the client is not reading and the server gets flow control limited. select { case <-: default: } // Puts the most recent update to the channel. <- } } // Shutdown sets all serving status to NOT_SERVING, and configures the server to // ignore all future status changes. 
// // This changes serving status for all services. To set status for a particular // services, call SetServingStatus(). func ( *Server) () { .mu.Lock() defer .mu.Unlock() .shutdown = true for := range .statusMap { .setServingStatusLocked(, healthpb.HealthCheckResponse_NOT_SERVING) } } // Resume sets all serving status to SERVING, and configures the server to // accept all future status changes. // // This changes serving status for all services. To set status for a particular // services, call SetServingStatus(). func ( *Server) () { .mu.Lock() defer .mu.Unlock() .shutdown = false for := range .statusMap { .setServingStatusLocked(, healthpb.HealthCheckResponse_SERVING) } }