package health
import (
"context"
"sync"
"google.golang.org/grpc/codes"
healthgrpc "google.golang.org/grpc/health/grpc_health_v1"
healthpb "google.golang.org/grpc/health/grpc_health_v1"
"google.golang.org/grpc/status"
)
const (
maxAllowedServices = 100
)
type Server struct {
healthgrpc .UnimplementedHealthServer
mu sync .RWMutex
shutdown bool
statusMap map [string ]healthpb .HealthCheckResponse_ServingStatus
updates map [string ]map [healthgrpc .Health_WatchServer ]chan healthpb .HealthCheckResponse_ServingStatus
}
func NewServer () *Server {
return &Server {
statusMap : map [string ]healthpb .HealthCheckResponse_ServingStatus {"" : healthpb .HealthCheckResponse_SERVING },
updates : make (map [string ]map [healthgrpc .Health_WatchServer ]chan healthpb .HealthCheckResponse_ServingStatus ),
}
}
func (s *Server ) Check (_ context .Context , in *healthpb .HealthCheckRequest ) (*healthpb .HealthCheckResponse , error ) {
s .mu .RLock ()
defer s .mu .RUnlock ()
if servingStatus , ok := s .statusMap [in .Service ]; ok {
return &healthpb .HealthCheckResponse {
Status : servingStatus ,
}, nil
}
return nil , status .Error (codes .NotFound , "unknown service" )
}
func (s *Server ) List (_ context .Context , _ *healthpb .HealthListRequest ) (*healthpb .HealthListResponse , error ) {
s .mu .RLock ()
defer s .mu .RUnlock ()
if len (s .statusMap ) > maxAllowedServices {
return nil , status .Errorf (codes .ResourceExhausted , "server health list exceeds maximum capacity: %d" , maxAllowedServices )
}
statusMap := make (map [string ]*healthpb .HealthCheckResponse , len (s .statusMap ))
for k , v := range s .statusMap {
statusMap [k ] = &healthpb .HealthCheckResponse {Status : v }
}
return &healthpb .HealthListResponse {Statuses : statusMap }, nil
}
func (s *Server ) Watch (in *healthpb .HealthCheckRequest , stream healthgrpc .Health_WatchServer ) error {
service := in .Service
update := make (chan healthpb .HealthCheckResponse_ServingStatus , 1 )
s .mu .Lock ()
if servingStatus , ok := s .statusMap [service ]; ok {
update <- servingStatus
} else {
update <- healthpb .HealthCheckResponse_SERVICE_UNKNOWN
}
if _ , ok := s .updates [service ]; !ok {
s .updates [service ] = make (map [healthgrpc .Health_WatchServer ]chan healthpb .HealthCheckResponse_ServingStatus )
}
s .updates [service ][stream ] = update
defer func () {
s .mu .Lock ()
delete (s .updates [service ], stream )
s .mu .Unlock ()
}()
s .mu .Unlock ()
var lastSentStatus healthpb .HealthCheckResponse_ServingStatus = -1
for {
select {
case servingStatus := <- update :
if lastSentStatus == servingStatus {
continue
}
lastSentStatus = servingStatus
err := stream .Send (&healthpb .HealthCheckResponse {Status : servingStatus })
if err != nil {
return status .Error (codes .Canceled , "Stream has ended." )
}
case <- stream .Context ().Done ():
return status .Error (codes .Canceled , "Stream has ended." )
}
}
}
func (s *Server ) SetServingStatus (service string , servingStatus healthpb .HealthCheckResponse_ServingStatus ) {
s .mu .Lock ()
defer s .mu .Unlock ()
if s .shutdown {
logger .Infof ("health: status changing for %s to %v is ignored because health service is shutdown" , service , servingStatus )
return
}
s .setServingStatusLocked (service , servingStatus )
}
func (s *Server ) setServingStatusLocked (service string , servingStatus healthpb .HealthCheckResponse_ServingStatus ) {
s .statusMap [service ] = servingStatus
for _ , update := range s .updates [service ] {
select {
case <- update :
default :
}
update <- servingStatus
}
}
func (s *Server ) Shutdown () {
s .mu .Lock ()
defer s .mu .Unlock ()
s .shutdown = true
for service := range s .statusMap {
s .setServingStatusLocked (service , healthpb .HealthCheckResponse_NOT_SERVING )
}
}
func (s *Server ) Resume () {
s .mu .Lock ()
defer s .mu .Unlock ()
s .shutdown = false
for service := range s .statusMap {
s .setServingStatusLocked (service , healthpb .HealthCheckResponse_SERVING )
}
}