package grpc
import (
estats
istats
)
// Package-level defaults and sentinel values used by the server.
const (
// Default cap on a received message: 1024 * 1024 * 4 bytes (4 MiB).
defaultServerMaxReceiveMessageSize = 1024 * 1024 * 4
// Default cap on a sent message: math.MaxInt32.
defaultServerMaxSendMessageSize = math.MaxInt32
// Key passed to addConn/removeConn for transports created on the
// net/http handler path, which have no listener address of their own.
listenerAddressForServeHTTP = "listenerAddressForServeHTTP"
)
// Assigns this package's helpers to hook variables declared in the internal
// package (GetServerCredentials, IsRegisteredMethod, ServerFromContext, ...).
// NOTE(review): the function name and the closure parameter names are missing
// in this view; presumably this is the package init function — confirm.
func () {
// Exposes the transport credentials stored in a Server's options.
internal.GetServerCredentials = func( *Server) credentials.TransportCredentials {
return .opts.creds
}
// Exposes the Server's registered-method lookup.
internal.IsRegisteredMethod = func( *Server, string) bool {
return .isRegisteredMethod()
}
internal.ServerFromContext = serverFromContext
// Appends to the package-level globalServerOptions slice.
internal.AddGlobalServerOptions = func( ...ServerOption) {
globalServerOptions = append(globalServerOptions, ...)
}
// Drops all previously added global options.
internal.ClearGlobalServerOptions = func() {
globalServerOptions = nil
}
internal.BinaryLogger = binaryLogger
internal.JoinServerOptions = newJoinServerOption
internal.BufferPool = bufferPool
// Builds a metrics recorder list from the Server's configured stats handlers.
internal.MetricsRecorderForServer = func( *Server) estats.MetricsRecorder {
return istats.NewMetricsRecorderList(.opts.statsHandlers)
}
}
// statusOK is the shared status value carrying codes.OK and an empty message;
// it is written as the final status of successful RPCs.
var statusOK = status.New(codes.OK, "")
// logger is the grpclog component logger registered under the name "core".
var logger = grpclog.Component("core")
// MethodHandler is the function signature of a unary method handler. It
// receives the service implementation (srv), the call context, a dec function
// that decodes the request into the value it is given, and a
// UnaryServerInterceptor (which may be nil), and returns the response message
// or an error.
type MethodHandler func(srv any, ctx context.Context, dec func(any) error, interceptor UnaryServerInterceptor) (any, error)
// MethodDesc describes one unary RPC method of a service.
type MethodDesc struct {
// MethodName is the key under which the method is stored when its service
// is registered.
MethodName string
// Handler is invoked to process a call to the method.
Handler MethodHandler
}
// ServiceDesc describes an RPC service: its name, its unary methods, its
// streams, and optional metadata.
type ServiceDesc struct {
// ServiceName is the key under which the service is registered.
ServiceName string
// HandlerType is inspected by reflection at registration time: its element
// type is what the service implementation must implement.
HandlerType any
// Methods lists the unary method descriptors.
Methods []MethodDesc
// Streams lists the streaming method descriptors.
Streams []StreamDesc
// Metadata is copied as-is into the registered service's info.
Metadata any
}
// serviceInfo is the server's internal record of one registered service.
type serviceInfo struct {
// serviceImpl is the implementation value handed to method/stream handlers.
serviceImpl any
// methods maps method name to its unary descriptor.
methods map[string]*MethodDesc
// streams maps stream name to its streaming descriptor.
streams map[string]*StreamDesc
// mdata is the Metadata value taken from the ServiceDesc.
mdata any
}
// Server is a gRPC server: it owns listeners, live transports, registered
// services and the bookkeeping needed to stop cleanly.
type Server struct {
// opts holds the effective configuration built from ServerOptions.
opts serverOptions
// mu is locked around updates to lis, conns, serve, drain and services,
// and around writes to events.
mu sync.Mutex
// lis is the set of active listeners; it is set to nil once listeners are
// closed during stop.
lis map[net.Listener]bool
// conns maps a listener address to the set of transports accepted on it;
// it is set to nil at the end of stop.
conns map[string]map[transport.ServerTransport]bool
// serve is set to true once serving has started; registration after that
// is fatal.
serve bool
// drain is set to true once transports have been told to drain.
drain bool
// cv is a condition variable on mu, broadcast when a connection is removed.
cv *sync.Cond
// services maps service name to its registered info.
services map[string]*serviceInfo
// events is the trace event log; it is only created when EnableTracing is set.
events traceEventLog
// quit is fired at the start of stop.
quit *grpcsync.Event
// done is fired when stop returns.
done *grpcsync.Event
// channelzRemoveOnce guards removal of the server's channelz entry.
channelzRemoveOnce sync.Once
// serveWG counts active serve loops and in-flight raw connection handshakes.
serveWG sync.WaitGroup
// handlersWG counts stream handlers that have been started and not finished.
handlersWG sync.WaitGroup
// channelz is the server's channelz registration.
channelz *channelz.Server
// serverWorkerChannel carries stream-handling funcs to worker goroutines.
serverWorkerChannel chan func()
// serverWorkerChannelClose closes serverWorkerChannel (wrapped in
// sync.OnceFunc where it is created).
serverWorkerChannelClose func()
}
// serverOptions is the full set of configuration knobs a Server is built
// with. ServerOption values mutate a copy of defaultServerOptions.
type serverOptions struct {
// creds, when set, is handed to each new transport as its credentials.
creds credentials.TransportCredentials
// codec, when non-nil, overrides codec selection by content-subtype.
codec baseCodec
// cp and dc are the legacy-style compressor and decompressor.
cp Compressor
dc Decompressor
// unaryInt/streamInt hold the single interceptor; after construction they
// hold the combined chain built from chainUnaryInts/chainStreamInts.
unaryInt UnaryServerInterceptor
streamInt StreamServerInterceptor
chainUnaryInts []UnaryServerInterceptor
chainStreamInts []StreamServerInterceptor
// binaryLogger is a per-server binary logger consulted in addition to the
// global one.
binaryLogger binarylog.Logger
inTapHandle tap.ServerInHandle
statsHandlers []stats.Handler
maxConcurrentStreams uint32
maxReceiveMessageSize int
maxSendMessageSize int
// unknownStreamDesc, when set, handles calls to unregistered methods.
unknownStreamDesc *StreamDesc
keepaliveParams keepalive.ServerParameters
keepalivePolicy keepalive.EnforcementPolicy
initialWindowSize int32
initialConnWindowSize int32
writeBufferSize int
readBufferSize int
sharedWriteBuffer bool
// connectionTimeout bounds the time allowed for setting up a new transport
// on an accepted connection.
connectionTimeout time.Duration
maxHeaderListSize *uint32
headerTableSize *uint32
// numServerWorkers is the number of worker goroutines started for stream
// handling; zero disables the worker pool.
numServerWorkers uint32
bufferPool mem.BufferPool
// waitForHandlers makes a non-graceful stop also wait for running handlers.
waitForHandlers bool
// staticWindowSize is set by the window-size options and forwarded to the
// transport configuration.
staticWindowSize bool
}
// defaultServerOptions is the baseline configuration copied for every new
// server before global and per-server options are applied. Fields not listed
// here start at their zero value.
var defaultServerOptions = serverOptions{
maxConcurrentStreams: math.MaxUint32,
maxReceiveMessageSize: defaultServerMaxReceiveMessageSize,
maxSendMessageSize: defaultServerMaxSendMessageSize,
connectionTimeout: 120 * time.Second,
writeBufferSize: defaultWriteBufSize,
readBufferSize: defaultReadBufSize,
bufferPool: mem.DefaultBufferPool(),
}
// globalServerOptions are applied to every new server before the options
// passed to its constructor. The slice is appended to and cleared through
// hooks assigned on the internal package.
var globalServerOptions []ServerOption
// ServerOption configures a server by mutating its serverOptions. The single
// method is unexported, so implementations outside this package must embed a
// type from this package.
type ServerOption interface {
apply(*serverOptions)
}
// EmptyServerOption is an option type whose method has an empty body, so
// applying it changes nothing.
type EmptyServerOption struct{}
// No-op: leaves the given serverOptions untouched.
// NOTE(review): the method name is missing in this view; presumably apply.
func (EmptyServerOption) (*serverOptions) {}
// funcServerOption wraps a function that mutates serverOptions.
type funcServerOption struct {
f func(*serverOptions)
}
// Invokes the wrapped function f.
// NOTE(review): method name and call argument are missing in this view;
// presumably this is apply forwarding its *serverOptions to f.
func ( *funcServerOption) ( *serverOptions) {
.f()
}
// Wraps a func(*serverOptions) in a newly allocated funcServerOption.
func ( func(*serverOptions)) *funcServerOption {
return &funcServerOption{
f: ,
}
}
// joinServerOption bundles several ServerOptions into one.
type joinServerOption struct {
opts []ServerOption
}
// Applies each bundled option in slice order.
func ( *joinServerOption) ( *serverOptions) {
for , := range .opts {
.apply()
}
}
// Returns a single ServerOption backed by a joinServerOption built from the
// variadic options.
func ( ...ServerOption) ServerOption {
return &joinServerOption{opts: }
}
// Returns a ServerOption that assigns serverOptions.sharedWriteBuffer,
// presumably from the bool argument (identifier missing in this view).
func ( bool) ServerOption {
return newFuncServerOption(func( *serverOptions) {
.sharedWriteBuffer =
})
}
// Returns a ServerOption that assigns serverOptions.writeBufferSize,
// presumably from the int argument (identifier missing in this view).
func ( int) ServerOption {
return newFuncServerOption(func( *serverOptions) {
.writeBufferSize =
})
}
// Returns a ServerOption that assigns serverOptions.readBufferSize,
// presumably from the int argument (identifier missing in this view).
func ( int) ServerOption {
return newFuncServerOption(func( *serverOptions) {
.readBufferSize =
})
}
// Returns a ServerOption that assigns serverOptions.initialWindowSize
// (presumably from the int32 argument) and marks the window size as static.
func ( int32) ServerOption {
return newFuncServerOption(func( *serverOptions) {
.initialWindowSize =
.staticWindowSize = true
})
}
// Returns a ServerOption that assigns serverOptions.initialConnWindowSize
// (presumably from the int32 argument) and marks the window size as static.
func ( int32) ServerOption {
return newFuncServerOption(func( *serverOptions) {
.initialConnWindowSize =
.staticWindowSize = true
})
}
// Returns a ServerOption that assigns serverOptions.initialWindowSize
// (presumably from the int32 argument) and sets staticWindowSize to true.
// NOTE(review): the body matches another option in this file that also sets
// initialWindowSize; the function names are missing here, so the intended
// distinction between the two cannot be confirmed from this view.
func ( int32) ServerOption {
return newFuncServerOption(func( *serverOptions) {
.initialWindowSize =
.staticWindowSize = true
})
}
// Returns a ServerOption that assigns serverOptions.initialConnWindowSize
// (presumably from the int32 argument) and sets staticWindowSize to true.
// NOTE(review): the body matches another option in this file that also sets
// initialConnWindowSize; the function names are missing here, so the intended
// distinction between the two cannot be confirmed from this view.
func ( int32) ServerOption {
return newFuncServerOption(func( *serverOptions) {
.initialConnWindowSize =
.staticWindowSize = true
})
}
// Returns a ServerOption that assigns serverOptions.keepaliveParams.
// A positive Time below internal.KeepaliveMinServerPingTime is raised to that
// minimum first, with a warning logged. A Time of zero or less is left alone.
func ( keepalive.ServerParameters) ServerOption {
if .Time > 0 && .Time < internal.KeepaliveMinServerPingTime {
logger.Warning("Adjusting keepalive ping interval to minimum period of 1s")
.Time = internal.KeepaliveMinServerPingTime
}
return newFuncServerOption(func( *serverOptions) {
.keepaliveParams =
})
}
// Returns a ServerOption that assigns serverOptions.keepalivePolicy,
// presumably from the argument (identifier missing in this view).
func ( keepalive.EnforcementPolicy) ServerOption {
return newFuncServerOption(func( *serverOptions) {
.keepalivePolicy =
})
}
// Returns a ServerOption that sets serverOptions.codec to the result of
// newCodecV0Bridge, adapting a Codec value to the internal codec type.
func ( Codec) ServerOption {
return newFuncServerOption(func( *serverOptions) {
.codec = newCodecV0Bridge()
})
}
// Returns a ServerOption that sets serverOptions.codec to the result of
// newCodecV1Bridge, adapting an encoding.Codec to the internal codec type.
// Once codec is set, it is used regardless of the request's content-subtype.
func ( encoding.Codec) ServerOption {
return newFuncServerOption(func( *serverOptions) {
.codec = newCodecV1Bridge()
})
}
// Returns a ServerOption that assigns serverOptions.codec directly,
// presumably from the encoding.CodecV2 argument (identifier missing here).
func ( encoding.CodecV2) ServerOption {
return newFuncServerOption(func( *serverOptions) {
.codec =
})
}
// Returns a ServerOption that assigns serverOptions.cp, the compressor used
// for outgoing messages when set.
func ( Compressor) ServerOption {
return newFuncServerOption(func( *serverOptions) {
.cp =
})
}
// Returns a ServerOption that assigns serverOptions.dc, the decompressor
// tried first for incoming messages when its Type matches the encoding.
func ( Decompressor) ServerOption {
return newFuncServerOption(func( *serverOptions) {
.dc =
})
}
// Thin wrapper that delegates to MaxRecvMsgSize.
func ( int) ServerOption {
return MaxRecvMsgSize()
}
// Returns a ServerOption that assigns serverOptions.maxReceiveMessageSize,
// the limit passed to the receive path for incoming messages.
func ( int) ServerOption {
return newFuncServerOption(func( *serverOptions) {
.maxReceiveMessageSize =
})
}
// Returns a ServerOption that assigns serverOptions.maxSendMessageSize,
// the limit checked before a response payload is written.
func ( int) ServerOption {
return newFuncServerOption(func( *serverOptions) {
.maxSendMessageSize =
})
}
// Returns a ServerOption that assigns serverOptions.maxConcurrentStreams.
// A zero argument is replaced by math.MaxUint32 before being stored.
func ( uint32) ServerOption {
if == 0 {
= math.MaxUint32
}
return newFuncServerOption(func( *serverOptions) {
.maxConcurrentStreams =
})
}
// Returns a ServerOption that assigns serverOptions.creds, the transport
// credentials handed to each new server transport.
func ( credentials.TransportCredentials) ServerOption {
return newFuncServerOption(func( *serverOptions) {
.creds =
})
}
// Returns a ServerOption that assigns serverOptions.unaryInt.
// Applying it panics if unaryInt is already non-nil, so only one such option
// may be applied per server.
func ( UnaryServerInterceptor) ServerOption {
return newFuncServerOption(func( *serverOptions) {
if .unaryInt != nil {
panic("The unary server interceptor was already set and may not be reset.")
}
.unaryInt =
})
}
// Returns a ServerOption that appends the given interceptors to
// serverOptions.chainUnaryInts; repeated use accumulates in order.
func ( ...UnaryServerInterceptor) ServerOption {
return newFuncServerOption(func( *serverOptions) {
.chainUnaryInts = append(.chainUnaryInts, ...)
})
}
// Returns a ServerOption that assigns serverOptions.streamInt.
// Applying it panics if streamInt is already non-nil, so only one such option
// may be applied per server.
func ( StreamServerInterceptor) ServerOption {
return newFuncServerOption(func( *serverOptions) {
if .streamInt != nil {
panic("The stream server interceptor was already set and may not be reset.")
}
.streamInt =
})
}
// Returns a ServerOption that appends the given interceptors to
// serverOptions.chainStreamInts; repeated use accumulates in order.
func ( ...StreamServerInterceptor) ServerOption {
return newFuncServerOption(func( *serverOptions) {
.chainStreamInts = append(.chainStreamInts, ...)
})
}
// Returns a ServerOption that assigns serverOptions.inTapHandle.
// Applying it panics if a tap handle has already been set.
func ( tap.ServerInHandle) ServerOption {
return newFuncServerOption(func( *serverOptions) {
if .inTapHandle != nil {
panic("The tap handle was already set and may not be reset.")
}
.inTapHandle =
})
}
// Returns a ServerOption that appends a stats.Handler to
// serverOptions.statsHandlers. A nil value is not appended: an error is logged
// and the options are left unchanged.
func ( stats.Handler) ServerOption {
return newFuncServerOption(func( *serverOptions) {
if == nil {
logger.Error("ignoring nil parameter in grpc.StatsHandler ServerOption")
return
}
.statsHandlers = append(.statsHandlers, )
})
}
// Returns a ServerOption that assigns serverOptions.binaryLogger, the
// per-server binary logger consulted alongside the global one.
func ( binarylog.Logger) ServerOption {
return newFuncServerOption(func( *serverOptions) {
.binaryLogger =
})
}
// Returns a ServerOption that installs a catch-all stream descriptor in
// serverOptions.unknownStreamDesc. The descriptor is named
// "unknown_service_handler" and is marked as both client- and
// server-streaming; it is used for calls that match no registered method.
func ( StreamHandler) ServerOption {
return newFuncServerOption(func( *serverOptions) {
.unknownStreamDesc = &StreamDesc{
StreamName: "unknown_service_handler",
Handler: ,
ClientStreams: true,
ServerStreams: true,
}
})
}
// Returns a ServerOption that assigns serverOptions.connectionTimeout, which
// is used as the deadline for setting up a transport on a new connection.
func ( time.Duration) ServerOption {
return newFuncServerOption(func( *serverOptions) {
.connectionTimeout =
})
}
// MaxHeaderListSizeServerOption is an exported option type carrying a header
// list size limit.
type MaxHeaderListSizeServerOption struct {
MaxHeaderListSize uint32
}
// Points serverOptions.maxHeaderListSize at the receiver's MaxHeaderListSize
// field. The receiver is a value, so the pointer refers to this call's copy.
func ( MaxHeaderListSizeServerOption) ( *serverOptions) {
.maxHeaderListSize = &.MaxHeaderListSize
}
// Returns a MaxHeaderListSizeServerOption carrying the uint32 argument.
func ( uint32) ServerOption {
return MaxHeaderListSizeServerOption{
MaxHeaderListSize: ,
}
}
// Returns a ServerOption that sets serverOptions.headerTableSize to the
// address of a uint32, presumably the argument (identifier missing here).
func ( uint32) ServerOption {
return newFuncServerOption(func( *serverOptions) {
.headerTableSize = &
})
}
// Returns a ServerOption that assigns serverOptions.numServerWorkers, the
// number of worker goroutines started for stream handling (zero disables them).
func ( uint32) ServerOption {
return newFuncServerOption(func( *serverOptions) {
.numServerWorkers =
})
}
// Returns a ServerOption that assigns serverOptions.waitForHandlers; when
// true, a non-graceful stop also waits for running handlers to finish.
func ( bool) ServerOption {
return newFuncServerOption(func( *serverOptions) {
.waitForHandlers =
})
}
// Returns a ServerOption that assigns serverOptions.bufferPool, replacing the
// default pool taken from mem.DefaultBufferPool.
func ( mem.BufferPool) ServerOption {
return newFuncServerOption(func( *serverOptions) {
.bufferPool =
})
}
// serverWorkerResetThreshold (1 << 16 = 65536) is the number of loop
// iterations a worker goroutine runs before replacing itself with a new one.
const serverWorkerResetThreshold = 1 << 16
// Worker loop: receives funcs from serverWorkerChannel and runs them on this
// goroutine. It returns when the channel is closed. After
// serverWorkerResetThreshold iterations it starts a replacement goroutine and
// returns (presumably to release a stack that has grown — confirm).
func ( *Server) () {
for := 0; < serverWorkerResetThreshold; ++ {
, := <-.serverWorkerChannel
if ! {
return
}
()
}
go .()
}
// Creates the unbuffered worker channel and its close function, then starts
// opts.numServerWorkers worker goroutines. The close function is wrapped in
// sync.OnceFunc so the channel is closed at most once.
func ( *Server) () {
.serverWorkerChannel = make(chan func())
.serverWorkerChannelClose = sync.OnceFunc(func() {
close(.serverWorkerChannel)
})
for := uint32(0); < .opts.numServerWorkers; ++ {
go .serverWorker()
}
}
// Constructs a Server. Options start from a copy of defaultServerOptions;
// globalServerOptions are applied first, then the options passed in, so the
// latter win on conflict. The server is registered with channelz, its
// interceptor chains are combined, and worker goroutines are started when
// numServerWorkers is positive.
func ( ...ServerOption) *Server {
:= defaultServerOptions
for , := range globalServerOptions {
.apply(&)
}
for , := range {
.apply(&)
}
:= &Server{
lis: make(map[net.Listener]bool),
opts: ,
conns: make(map[string]map[transport.ServerTransport]bool),
services: make(map[string]*serviceInfo),
quit: grpcsync.NewEvent(),
done: grpcsync.NewEvent(),
channelz: channelz.RegisterServer(""),
}
chainUnaryServerInterceptors()
chainStreamServerInterceptors()
.cv = sync.NewCond(&.mu)
// The trace event log is labelled with the caller's file:line, obtained via
// runtime.Caller(1).
if EnableTracing {
, , , := runtime.Caller(1)
.events = newTraceEventLog("grpc.Server", fmt.Sprintf("%s:%d", , ))
}
if .opts.numServerWorkers > 0 {
.initServerWorkers()
}
channelz.Info(logger, .channelz, "Server created")
return
}
// Forwards a formatted message to the trace event log via Printf; does
// nothing when events is nil (tracing disabled or already finished).
func ( *Server) ( string, ...any) {
if .events != nil {
.events.Printf(, ...)
}
}
// Forwards a formatted message to the trace event log via Errorf; does
// nothing when events is nil (tracing disabled or already finished).
func ( *Server) ( string, ...any) {
if .events != nil {
.events.Errorf(, ...)
}
}
// ServiceRegistrar is the registration surface of a server: anything that
// can accept a service description together with its implementation.
type ServiceRegistrar interface {
// RegisterService registers the service described by desc, served by impl.
RegisterService(desc *ServiceDesc, impl any)
}
// Registers a service and its implementation with the server. When the
// checked value is non-nil (presumably the implementation; identifier missing
// in this view), reflection verifies that its type implements the element
// type of HandlerType; a mismatch is logged with Fatalf. Registration itself
// is delegated to register.
func ( *Server) ( *ServiceDesc, any) {
if != nil {
:= reflect.TypeOf(.HandlerType).Elem()
:= reflect.TypeOf()
if !.Implements() {
logger.Fatalf("grpc: Server.RegisterService found the handler of type %v that does not satisfy %v", , )
}
}
.register(, )
}
// Records the service in Server.services while holding mu. Logs with Fatalf
// if serving has already started or if the service name is already
// registered. Methods and streams are indexed by name, storing pointers into
// the descriptor's own slices rather than copies.
func ( *Server) ( *ServiceDesc, any) {
.mu.Lock()
defer .mu.Unlock()
.printf("RegisterService(%q)", .ServiceName)
if .serve {
logger.Fatalf("grpc: Server.RegisterService after Server.Serve for %q", .ServiceName)
}
if , := .services[.ServiceName]; {
logger.Fatalf("grpc: Server.RegisterService found duplicate service registration for %q", .ServiceName)
}
:= &serviceInfo{
serviceImpl: ,
methods: make(map[string]*MethodDesc),
streams: make(map[string]*StreamDesc),
mdata: .Metadata,
}
// Index-based loops so the stored pointers refer to the slice elements.
for := range .Methods {
:= &.Methods[]
.methods[.MethodName] =
}
for := range .Streams {
:= &.Streams[]
.streams[.StreamName] =
}
.services[.ServiceName] =
}
// MethodInfo is the public description of one RPC method: its name and
// whether each direction is streaming.
type MethodInfo struct {
// Name is the method (or stream) name as registered.
Name string
// IsClientStream reports whether the client side streams.
IsClientStream bool
// IsServerStream reports whether the server side streams.
IsServerStream bool
}
// ServiceInfo is the public description of a registered service.
type ServiceInfo struct {
// Methods lists unary methods and streams together.
Methods []MethodInfo
// Metadata is the metadata value recorded at registration.
Metadata any
}
// Builds a map from service name to ServiceInfo for every registered service.
// Unary methods are reported with both streaming flags false; streams carry
// the flags from their descriptor. Entries come from map iteration, so the
// order of Methods is not fixed.
// NOTE(review): s.services is read here without taking mu in the visible code.
func ( *Server) () map[string]ServiceInfo {
:= make(map[string]ServiceInfo)
for , := range .services {
// Capacity is pre-sized for methods plus streams.
:= make([]MethodInfo, 0, len(.methods)+len(.streams))
for := range .methods {
= append(, MethodInfo{
Name: ,
IsClientStream: false,
IsServerStream: false,
})
}
for , := range .streams {
= append(, MethodInfo{
Name: ,
IsClientStream: .ClientStreams,
IsServerStream: .ServerStreams,
})
}
[] = ServiceInfo{
Methods: ,
Metadata: .mdata,
}
}
return
}
// ErrServerStopped is returned when serving is attempted after the server's
// listener set has been cleared by a stop.
var ErrServerStopped = errors.New("grpc: the server has been stopped")
// listenSocket wraps a net.Listener together with its channelz socket entry.
type listenSocket struct {
net.Listener
channelz *channelz.Socket
}
// Closes the embedded listener, then removes the socket's channelz entry and
// logs the deletion. The value returned is presumably the listener's Close
// error (identifier missing in this view).
func ( *listenSocket) () error {
:= .Listener.Close()
channelz.RemoveEntry(.channelz.ID)
channelz.Info(logger, .channelz, "ListenSocket deleted")
return
}
// Accepts connections on the given listener and handles each one on its own
// goroutine. It loops until Accept fails with a non-retryable error or the
// quit event fires.
//
// Returns ErrServerStopped (after closing the listener) if the server's
// listener set is already nil, nil if the quit event has fired, and otherwise
// the Accept error.
func ( *Server) ( net.Listener) error {
.mu.Lock()
.printf("serving")
.serve = true
// lis is set to nil when listeners are closed during stop.
if .lis == nil {
.mu.Unlock()
.Close()
return ErrServerStopped
}
.serveWG.Add(1)
defer func() {
.serveWG.Done()
// If a stop is in progress, block until it has completed.
if .quit.HasFired() {
<-.done.Done()
}
}()
:= &listenSocket{
Listener: ,
channelz: channelz.RegisterSocket(&channelz.Socket{
SocketType: channelz.SocketTypeListen,
Parent: .channelz,
RefName: .Addr().String(),
LocalAddr: .Addr(),
SocketOptions: channelz.GetSocketOption()},
),
}
.lis[] = true
// On exit, close and unregister the listener unless stop already did so.
defer func() {
.mu.Lock()
if .lis != nil && .lis[] {
.Close()
delete(.lis, )
}
.mu.Unlock()
}()
.mu.Unlock()
channelz.Info(logger, .channelz, "ListenSocket created")
// Retry delay for Accept errors; zero means no retry is pending.
var time.Duration
for {
, := .Accept()
if != nil {
// Errors exposing a bool-returning method that reports true are retried
// (presumably a Temporary() check; the method name is missing in this
// view). The delay starts at 5ms, doubles each time, and is capped at 1s.
if , := .(interface {
() bool
}); && .() {
if == 0 {
= 5 * time.Millisecond
} else {
*= 2
}
if := 1 * time.Second; > {
=
}
.mu.Lock()
.printf("Accept error: %v; retrying in %v", , )
.mu.Unlock()
:= time.NewTimer()
select {
case <-.C:
case <-.quit.Done():
.Stop()
return nil
}
continue
}
.mu.Lock()
.printf("done serving; Accept = %v", )
.mu.Unlock()
// An Accept failure caused by stopping the server is not reported.
if .quit.HasFired() {
return nil
}
return
}
// A successful Accept resets the retry delay.
= 0
.serveWG.Add(1)
go func() {
.handleRawConn(.Addr().String(), )
.serveWG.Done()
}()
}
}
// Turns a freshly accepted connection into a server transport and starts
// serving streams on it in a new goroutine. The connection is closed
// immediately if the quit event has already fired.
func ( *Server) ( string, net.Conn) {
if .quit.HasFired() {
.Close()
return
}
// Bound transport setup with connectionTimeout, then clear the deadline.
.SetDeadline(time.Now().Add(.opts.connectionTimeout))
:= .newHTTP2Transport()
.SetDeadline(time.Time{})
if == nil {
return
}
// Optional hook: a value implementing a one-method interface that accepts a
// transport.ServerTransport is invoked here.
// NOTE(review): which value is asserted and the method name are missing in
// this view.
if , := .(interface {
(transport.ServerTransport)
}); {
.()
}
if !.addConn(, ) {
return
}
go func() {
.serveStreams(context.Background(), , )
.removeConn(, )
}()
}
// Builds a transport.ServerConfig from the server options and creates a
// server transport on the given connection. Returns nil on failure.
func ( *Server) ( net.Conn) transport.ServerTransport {
:= &transport.ServerConfig{
MaxStreams: .opts.maxConcurrentStreams,
ConnectionTimeout: .opts.connectionTimeout,
Credentials: .opts.creds,
InTapHandle: .opts.inTapHandle,
StatsHandlers: .opts.statsHandlers,
KeepaliveParams: .opts.keepaliveParams,
KeepalivePolicy: .opts.keepalivePolicy,
InitialWindowSize: .opts.initialWindowSize,
InitialConnWindowSize: .opts.initialConnWindowSize,
WriteBufferSize: .opts.writeBufferSize,
ReadBufferSize: .opts.readBufferSize,
SharedWriteBuffer: .opts.sharedWriteBuffer,
ChannelzParent: .channelz,
MaxHeaderListSize: .opts.maxHeaderListSize,
HeaderTableSize: .opts.headerTableSize,
BufferPool: .opts.bufferPool,
StaticWindowSize: .opts.staticWindowSize,
}
, := transport.NewServerTransport(, )
if != nil {
.mu.Lock()
.errorf("NewServerTransport(%q) failed: %v", .RemoteAddr(), )
.mu.Unlock()
// credentials.ErrConnDispatched leaves the connection open; every other
// error closes it. io.EOF is not logged to channelz.
if != credentials.ErrConnDispatched {
if != io.EOF {
channelz.Info(logger, .channelz, "grpc: Server.Serve failed to create ServerTransport: ", )
}
.Close()
}
return nil
}
return
}
// Serves all streams of one transport until HandleStreams returns. The
// context is enriched with the connection and peer, stats handlers are told
// the connection began, and on return the transport is closed and the
// handlers are told it ended.
func ( *Server) ( context.Context, transport.ServerTransport, net.Conn) {
= transport.SetConnection(, )
= peer.NewContext(, .Peer())
for , := range .opts.statsHandlers {
= .TagConn(, &stats.ConnTagInfo{
RemoteAddr: .Peer().Addr,
LocalAddr: .Peer().LocalAddr,
})
.HandleConn(, &stats.ConnBegin{})
}
defer func() {
.Close(errors.New("finished serving streams for the server transport"))
for , := range .opts.statsHandlers {
.HandleConn(, &stats.ConnEnd{})
}
}()
// Quota sized to maxConcurrentStreams; acquired per stream and released
// when that stream's handler finishes.
:= newHandlerQuota(.opts.maxConcurrentStreams)
.HandleStreams(, func( *transport.ServerStream) {
.handlersWG.Add(1)
.acquire()
:= func() {
defer .release()
defer .handlersWG.Done()
.handleStream(, )
}
// Prefer handing the work to an idle worker; the non-blocking send falls
// through to a new goroutine when no worker is ready to receive.
if .opts.numServerWorkers > 0 {
select {
case .serverWorkerChannel <- :
return
default:
}
}
go ()
})
}
// Compile-time check that *Server satisfies http.Handler.
var _ http.Handler = (*Server)(nil)
// Serves an RPC arriving through net/http. A handler transport is created
// from the response writer and request, registered under
// listenerAddressForServeHTTP, and its streams are served with the request's
// context. The method returns without serving if the transport cannot be
// created or the server refuses the connection.
func ( *Server) ( http.ResponseWriter, *http.Request) {
, := transport.NewServerHandlerTransport(, , .opts.statsHandlers, .opts.bufferPool)
if != nil {
return
}
if !.addConn(listenerAddressForServeHTTP, ) {
return
}
defer .removeConn(listenerAddressForServeHTTP, )
.serveStreams(.Context(), , nil)
}
// Registers a transport under the given address key while holding mu.
// Returns false, after closing the transport, if conns is nil (the server
// has been stopped). A transport added while draining is told to drain
// immediately but is still tracked.
func ( *Server) ( string, transport.ServerTransport) bool {
.mu.Lock()
defer .mu.Unlock()
if .conns == nil {
.Close(errors.New("Server.addConn called when server has already been stopped"))
return false
}
if .drain {
.Drain("")
}
// Create the per-address set on first use.
if .conns[] == nil {
.conns[] = make(map[transport.ServerTransport]bool)
}
.conns[][] = true
return true
}
// Removes a transport from the set stored under the given address key while
// holding mu, deleting the set once it is empty. The condition variable is
// broadcast so a stop waiting for conns to drain can re-check.
func ( *Server) ( string, transport.ServerTransport) {
.mu.Lock()
defer .mu.Unlock()
:= .conns[]
if != nil {
delete(, )
if len() == 0 {
delete(.conns, )
}
.cv.Broadcast()
}
}
// Increments the channelz CallsStarted counter and records the current time
// (Unix nanoseconds) as the last-call-started timestamp.
func ( *Server) () {
.channelz.ServerMetrics.CallsStarted.Add(1)
.channelz.ServerMetrics.LastCallStartedTimestamp.Store(time.Now().UnixNano())
}
// Increments the channelz CallsSucceeded counter.
func ( *Server) () {
.channelz.ServerMetrics.CallsSucceeded.Add(1)
}
// Increments the channelz CallsFailed counter.
func ( *Server) () {
.channelz.ServerMetrics.CallsFailed.Add(1)
}
// Encodes, optionally compresses, frames and writes one response message to
// the stream. Encode and compress failures are logged to channelz and
// returned. A payload larger than opts.maxSendMessageSize yields a
// codes.ResourceExhausted status error. After a successful write, every
// stats handler receives an out-payload event.
func ( *Server) ( context.Context, *transport.ServerStream, any, Compressor, *transport.WriteOptions, encoding.Compressor) error {
, := encode(.getCodec(.ContentSubtype()), )
if != nil {
channelz.Error(logger, .channelz, "grpc: server failed to encode response: ", )
return
}
, , := compress(, , , .opts.bufferPool)
if != nil {
// The encoded buffer is released here because the deferred cleanup below
// has not been registered yet.
.Free()
channelz.Error(logger, .channelz, "grpc: server failed to compress response: ", )
return
}
, := msgHeader(, , )
// Release both buffers on every path out of the function from here on.
defer func() {
.Free()
.Free()
}()
:= .Len()
:= .Len()
if > .opts.maxSendMessageSize {
return status.Errorf(codes.ResourceExhausted, "grpc: trying to send message larger than max (%d vs. %d)", , .opts.maxSendMessageSize)
}
= .Write(, , )
if == nil {
if len(.opts.statsHandlers) != 0 {
for , := range .opts.statsHandlers {
.HandleRPC(, outPayload(false, , , , time.Now()))
}
}
}
return
}
// Collapses the single unary interceptor and the chained ones into
// opts.unaryInt. The single interceptor, when set, runs first. The result is
// nil for no interceptors, the interceptor itself for exactly one, and a
// chaining wrapper otherwise.
// NOTE(review): the function name and parameter name are missing in this view.
func ( *Server) {
:= .opts.chainUnaryInts
if .opts.unaryInt != nil {
// A fresh slice is built, so chainUnaryInts itself is not modified.
= append([]UnaryServerInterceptor{.opts.unaryInt}, .opts.chainUnaryInts...)
}
var UnaryServerInterceptor
if len() == 0 {
= nil
} else if len() == 1 {
= [0]
} else {
= chainUnaryInterceptors()
}
.opts.unaryInt =
}
// Returns one interceptor that runs the first interceptor in the slice with
// a handler that continues into the remaining ones. The slice is indexed at 0
// on every call, so it must be non-empty.
func ( []UnaryServerInterceptor) UnaryServerInterceptor {
return func( context.Context, any, *UnaryServerInfo, UnaryHandler) (any, error) {
return [0](, , , getChainUnaryHandler(, 0, , ))
}
}
// Builds the handler handed to the interceptor at the given index. At the
// last index it returns a UnaryHandler directly (presumably the final
// handler passed in); otherwise it returns a handler that invokes the next
// interceptor with a recursively built continuation.
func ( []UnaryServerInterceptor, int, *UnaryServerInfo, UnaryHandler) UnaryHandler {
if == len()-1 {
return
}
return func( context.Context, any) (any, error) {
return [+1](, , , (, +1, , ))
}
}
// Handles one unary RPC on the given stream: emits stats, trace, channelz
// and binary-log events, selects compression, receives and decodes the
// request, invokes the method handler, and writes the response followed by
// the final status. The error result is named so the deferred reporting
// closure can inspect the final outcome.
func ( *Server) ( context.Context, *transport.ServerStream, *serviceInfo, *MethodDesc, *traceInfo) ( error) {
:= .opts.statsHandlers
// Begin/end reporting is only set up when something consumes it.
if len() != 0 || != nil || channelz.IsOn() {
if channelz.IsOn() {
.incrCallsStarted()
}
var *stats.Begin
for , := range {
:= time.Now()
= &stats.Begin{
BeginTime: ,
IsClientStream: false,
IsServerStream: false,
}
.HandleRPC(, )
}
if != nil {
.tr.LazyLog(&.firstLine, false)
}
// Runs on every return path; io.EOF is treated as success for the
// trace, stats and channelz outcome.
defer func() {
if != nil {
if != nil && != io.EOF {
.tr.LazyLog(&fmtStringer{"%v", []any{}}, true)
.tr.SetError()
}
.tr.Finish()
}
for , := range {
:= &stats.End{
BeginTime: .BeginTime,
EndTime: time.Now(),
}
if != nil && != io.EOF {
.Error = toRPCErr()
}
.HandleRPC(, )
}
if channelz.IsOn() {
if != nil && != io.EOF {
.incrCallsFailed()
} else {
.incrCallsSucceeded()
}
}
}()
}
// Collect method loggers from the global binary log and from the
// per-server binaryLogger, if configured.
var []binarylog.MethodLogger
if := binarylog.GetMethodLogger(.Method()); != nil {
= append(, )
}
if .opts.binaryLogger != nil {
if := .opts.binaryLogger.GetMethodLogger(.Method()); != nil {
= append(, )
}
}
if len() != 0 {
, := metadata.FromIncomingContext()
:= &binarylog.ClientHeader{
Header: ,
MethodName: .Method(),
PeerAddr: nil,
}
if , := .Deadline(); {
.Timeout = time.Until()
// A deadline already in the past is logged as a zero timeout.
if .Timeout < 0 {
.Timeout = 0
}
}
if := [":authority"]; len() > 0 {
.Authority = [0]
}
if , := peer.FromContext(); {
.PeerAddr = .Addr
}
for , := range {
.Log(, )
}
}
var , encoding.Compressor
var Compressor
var Decompressor
var string
// Decompression: the configured opts.dc is used when its Type matches the
// stream's receive encoding; otherwise a registered encoding.Compressor is
// looked up, and a missing one fails the call with codes.Unimplemented.
if := .RecvCompress(); .opts.dc != nil && .opts.dc.Type() == {
= .opts.dc
} else if != "" && != encoding.Identity {
= encoding.GetCompressor()
if == nil {
:= status.Newf(codes.Unimplemented, "grpc: Decompressor is not installed for grpc-encoding %q", )
.WriteStatus()
return .Err()
}
}
// Compression: the configured opts.cp wins; otherwise the response uses the
// request's encoding when a compressor for it is registered.
if .opts.cp != nil {
= .opts.cp
= .Type()
} else if := .RecvCompress(); != "" && != encoding.Identity {
= encoding.GetCompressor()
if != nil {
= .Name()
}
}
if != "" {
if := .SetSendCompress(); != nil {
return status.Errorf(codes.Internal, "grpc: failed to set send compressor: %v", )
}
}
// Payload details are only tracked when stats handlers or binary loggers
// will consume them.
var *payloadInfo
if len() != 0 || len() != 0 {
= &payloadInfo{}
defer .free()
}
, := recvAndDecompress(&parser{r: , bufferPool: .opts.bufferPool}, , , .opts.maxReceiveMessageSize, , , true)
if != nil {
if := .WriteStatus(status.Convert()); != nil {
channelz.Warningf(logger, .channelz, "grpc: Server.processUnaryRPC failed to write status: %v", )
}
return
}
// Guard so the received buffer is freed exactly once, whether the decode
// function runs or not.
:= false
:= func() {
if ! {
.Free()
= true
}
}
defer ()
// Decode function handed to the method handler: unmarshals the request
// into the given value, then reports the inbound payload to stats
// handlers, binary loggers and the trace.
:= func( any) error {
defer ()
if := .getCodec(.ContentSubtype()).Unmarshal(, ); != nil {
return status.Errorf(codes.Internal, "grpc: error unmarshalling request: %v", )
}
for , := range {
.HandleRPC(, &stats.InPayload{
RecvTime: time.Now(),
Payload: ,
Length: .Len(),
WireLength: .compressedLength + headerLen,
CompressedLength: .compressedLength,
})
}
if len() != 0 {
:= &binarylog.ClientMessage{
Message: .Materialize(),
}
for , := range {
.Log(, )
}
}
if != nil {
.tr.LazyLog(&payload{sent: false, msg: }, true)
}
return nil
}
= NewContextWithServerTransportStream(, )
, := .Handler(.serviceImpl, , , .opts.unaryInt)
if != nil {
// Handler errors that are not status errors are converted with
// status.FromContextError.
, := status.FromError()
if ! {
= status.FromContextError()
= .Err()
}
if != nil {
.tr.LazyLog(stringer(.Message()), true)
.tr.SetError()
}
if := .WriteStatus(); != nil {
channelz.Warningf(logger, .channelz, "grpc: Server.processUnaryRPC failed to write status: %v", )
}
if len() != 0 {
// The server header is only logged when it is non-empty.
if , := .Header(); .Len() > 0 {
:= &binarylog.ServerHeader{
Header: ,
}
for , := range {
.Log(, )
}
}
:= &binarylog.ServerTrailer{
Trailer: .Trailer(),
Err: ,
}
for , := range {
.Log(, )
}
}
return
}
if != nil {
.tr.LazyLog(stringer("OK"), false)
}
// The response is the last message written on a unary stream.
:= &transport.WriteOptions{Last: true}
// If the stream's send compressor no longer matches the name selected
// above, look up the compressor for the stream's current name.
if .SendCompress() != {
= encoding.GetCompressor(.SendCompress())
}
if := .sendResponse(, , , , , ); != nil {
// io.EOF is returned without writing a status.
if == io.EOF {
return
}
if , := status.FromError(); {
if := .WriteStatus(); != nil {
channelz.Warningf(logger, .channelz, "grpc: Server.processUnaryRPC failed to write status: %v", )
}
} else {
// Connection errors need no further action; any other non-status
// error is unexpected and panics.
switch st := .(type) {
case transport.ConnectionError:
default:
panic(fmt.Sprintf("grpc: Unexpected error (%T) from sendResponse: %v", , ))
}
}
if len() != 0 {
, := .Header()
:= &binarylog.ServerHeader{
Header: ,
}
:= &binarylog.ServerTrailer{
Trailer: .Trailer(),
Err: ,
}
for , := range {
.Log(, )
.Log(, )
}
}
return
}
if len() != 0 {
, := .Header()
:= &binarylog.ServerHeader{
Header: ,
}
:= &binarylog.ServerMessage{
Message: ,
}
for , := range {
.Log(, )
.Log(, )
}
}
if != nil {
.tr.LazyLog(&payload{sent: true, msg: }, true)
}
if len() != 0 {
:= &binarylog.ServerTrailer{
Trailer: .Trailer(),
Err: ,
}
for , := range {
.Log(, )
}
}
return .WriteStatus(statusOK)
}
// Collapses the single stream interceptor and the chained ones into
// opts.streamInt. The single interceptor, when set, runs first. The result is
// nil for no interceptors, the interceptor itself for exactly one, and a
// chaining wrapper otherwise.
// NOTE(review): the function name and parameter name are missing in this view.
func ( *Server) {
:= .opts.chainStreamInts
if .opts.streamInt != nil {
// A fresh slice is built, so chainStreamInts itself is not modified.
= append([]StreamServerInterceptor{.opts.streamInt}, .opts.chainStreamInts...)
}
var StreamServerInterceptor
if len() == 0 {
= nil
} else if len() == 1 {
= [0]
} else {
= chainStreamInterceptors()
}
.opts.streamInt =
}
// Returns one interceptor that runs the first interceptor in the slice with
// a handler that continues into the remaining ones. The slice is indexed at 0
// on every call, so it must be non-empty.
func ( []StreamServerInterceptor) StreamServerInterceptor {
return func( any, ServerStream, *StreamServerInfo, StreamHandler) error {
return [0](, , , getChainStreamHandler(, 0, , ))
}
}
// Builds the handler handed to the interceptor at the given index. At the
// last index it returns a StreamHandler directly (presumably the final
// handler passed in); otherwise it returns a handler that invokes the next
// interceptor with a recursively built continuation.
func ( []StreamServerInterceptor, int, *StreamServerInfo, StreamHandler) StreamHandler {
if == len()-1 {
return
}
return func( any, ServerStream) error {
return [+1](, , , (, +1, , ))
}
}
// Handles one streaming RPC: builds the serverStream wrapper, emits stats,
// trace, channelz and binary-log events, selects compression, runs the
// stream handler (through opts.streamInt when set), and writes the final
// status. The error result is named so the deferred reporting closure can
// inspect the final outcome. The serviceInfo may be nil, in which case the
// handler receives a nil service value.
func ( *Server) ( context.Context, *transport.ServerStream, *serviceInfo, *StreamDesc, *traceInfo) ( error) {
if channelz.IsOn() {
.incrCallsStarted()
}
:= .opts.statsHandlers
var *stats.Begin
if len() != 0 {
:= time.Now()
= &stats.Begin{
BeginTime: ,
IsClientStream: .ClientStreams,
IsServerStream: .ServerStreams,
}
for , := range {
.HandleRPC(, )
}
}
= NewContextWithServerTransportStream(, )
:= &serverStream{
ctx: ,
s: ,
p: &parser{r: , bufferPool: .opts.bufferPool},
codec: .getCodec(.ContentSubtype()),
desc: ,
maxReceiveMessageSize: .opts.maxReceiveMessageSize,
maxSendMessageSize: .opts.maxSendMessageSize,
trInfo: ,
statsHandler: ,
}
// End-of-call reporting is only set up when something consumes it.
if len() != 0 || != nil || channelz.IsOn() {
defer func() {
if != nil {
// The stream's mutex is held while the trace is finished and
// cleared.
.mu.Lock()
if != nil && != io.EOF {
.trInfo.tr.LazyLog(&fmtStringer{"%v", []any{}}, true)
.trInfo.tr.SetError()
}
.trInfo.tr.Finish()
.trInfo.tr = nil
.mu.Unlock()
}
if len() != 0 {
:= &stats.End{
BeginTime: .BeginTime,
EndTime: time.Now(),
}
// io.EOF is treated as success and not reported as an error.
if != nil && != io.EOF {
.Error = toRPCErr()
}
for , := range {
.HandleRPC(, )
}
}
if channelz.IsOn() {
if != nil && != io.EOF {
.incrCallsFailed()
} else {
.incrCallsSucceeded()
}
}
}()
}
// Collect method loggers from the global binary log and from the
// per-server binaryLogger, if configured.
if := binarylog.GetMethodLogger(.Method()); != nil {
.binlogs = append(.binlogs, )
}
if .opts.binaryLogger != nil {
if := .opts.binaryLogger.GetMethodLogger(.Method()); != nil {
.binlogs = append(.binlogs, )
}
}
if len(.binlogs) != 0 {
, := metadata.FromIncomingContext()
:= &binarylog.ClientHeader{
Header: ,
MethodName: .Method(),
PeerAddr: nil,
}
if , := .Deadline(); {
.Timeout = time.Until()
// A deadline already in the past is logged as a zero timeout.
if .Timeout < 0 {
.Timeout = 0
}
}
if := [":authority"]; len() > 0 {
.Authority = [0]
}
if , := peer.FromContext(.Context()); {
.PeerAddr = .Addr
}
for , := range .binlogs {
.Log(, )
}
}
// Decompression: the configured opts.dc is used when its Type matches the
// stream's receive encoding; otherwise a registered encoding.Compressor is
// looked up, and a missing one fails the call with codes.Unimplemented.
if := .RecvCompress(); .opts.dc != nil && .opts.dc.Type() == {
.decompressorV0 = .opts.dc
} else if != "" && != encoding.Identity {
.decompressorV1 = encoding.GetCompressor()
if .decompressorV1 == nil {
:= status.Newf(codes.Unimplemented, "grpc: Decompressor is not installed for grpc-encoding %q", )
.s.WriteStatus()
return .Err()
}
}
// Compression: the configured opts.cp wins; otherwise the response uses the
// request's encoding when a compressor for it is registered.
if .opts.cp != nil {
.compressorV0 = .opts.cp
.sendCompressorName = .opts.cp.Type()
} else if := .RecvCompress(); != "" && != encoding.Identity {
.compressorV1 = encoding.GetCompressor()
if .compressorV1 != nil {
.sendCompressorName =
}
}
if .sendCompressorName != "" {
if := .SetSendCompress(.sendCompressorName); != nil {
return status.Errorf(codes.Internal, "grpc: failed to set send compressor: %v", )
}
}
.ctx = newContextWithRPCInfo(.ctx, false, .codec, .compressorV0, .compressorV1)
if != nil {
.tr.LazyLog(&.firstLine, false)
}
var error
var any
// The service value is only taken from the service info when one is given.
if != nil {
= .serviceImpl
}
if .opts.streamInt == nil {
= .Handler(, )
} else {
:= &StreamServerInfo{
FullMethod: .Method(),
IsClientStream: .ClientStreams,
IsServerStream: .ServerStreams,
}
= .opts.streamInt(, , , .Handler)
}
if != nil {
// Handler errors that are not status errors are converted with
// status.FromContextError.
, := status.FromError()
if ! {
= status.FromContextError()
= .Err()
}
if != nil {
.mu.Lock()
.trInfo.tr.LazyLog(stringer(.Message()), true)
.trInfo.tr.SetError()
.mu.Unlock()
}
if len(.binlogs) != 0 {
:= &binarylog.ServerTrailer{
Trailer: .s.Trailer(),
Err: ,
}
for , := range .binlogs {
.Log(, )
}
}
// NOTE(review): the result of WriteStatus is not checked here.
.s.WriteStatus()
return
}
if != nil {
.mu.Lock()
.trInfo.tr.LazyLog(stringer("OK"), false)
.mu.Unlock()
}
if len(.binlogs) != 0 {
:= &binarylog.ServerTrailer{
Trailer: .s.Trailer(),
Err: ,
}
for , := range .binlogs {
.Log(, )
}
}
return .s.WriteStatus(statusOK)
}
// Dispatches one incoming stream. The method string is split at its last "/"
// into service and method names (after dropping one leading "/"), then routed
// to the unary processor, the streaming processor, or the unknown-service
// handler. Malformed names and unknown services/methods are answered with
// codes.Unimplemented.
func ( *Server) ( transport.ServerTransport, *transport.ServerStream) {
:= .Context()
= contextWithServer(, )
var *traceInfo
if EnableTracing {
:= newTrace("grpc.Recv."+methodFamily(.Method()), .Method())
= newTraceContext(, )
= &traceInfo{
tr: ,
firstLine: firstLine{
client: false,
remoteAddr: .Peer().Addr,
},
}
if , := .Deadline(); {
.firstLine.deadline = time.Until()
}
}
:= .Method()
if != "" && [0] == '/' {
= [1:]
}
:= strings.LastIndex(, "/")
// No "/" left after stripping the leading one: the name cannot be split.
if == -1 {
if != nil {
.tr.LazyLog(&fmtStringer{"Malformed method name %q", []any{}}, true)
.tr.SetError()
}
:= fmt.Sprintf("malformed method name: %q", .Method())
if := .WriteStatus(status.New(codes.Unimplemented, )); != nil {
if != nil {
.tr.LazyLog(&fmtStringer{"%v", []any{}}, true)
.tr.SetError()
}
channelz.Warningf(logger, .channelz, "grpc: Server.handleStream failed to write status: %v", )
}
if != nil {
.tr.Finish()
}
return
}
:= [:]
:= [+1:]
// Each stats handler may replace the context via TagRPC before receiving
// the incoming-header event.
if len(.opts.statsHandlers) > 0 {
, := metadata.FromIncomingContext()
for , := range .opts.statsHandlers {
= .TagRPC(, &stats.RPCTagInfo{FullMethodName: .Method()})
.HandleRPC(, &stats.InHeader{
FullMethod: .Method(),
RemoteAddr: .Peer().Addr,
LocalAddr: .Peer().LocalAddr,
Compression: .RecvCompress(),
WireLength: .HeaderWireLength(),
Header: ,
})
}
}
.SetContext()
, := .services[]
if {
// Unary methods are looked up before streams.
if , := .methods[]; {
.processUnaryRPC(, , , , )
return
}
if , := .streams[]; {
.processStreamingRPC(, , , , )
return
}
}
// Fall back to the unknown-service handler, if configured; it is invoked
// with a nil service info.
if := .opts.unknownStreamDesc; != nil {
.processStreamingRPC(, , nil, , )
return
}
var string
if ! {
= fmt.Sprintf("unknown service %v", )
} else {
= fmt.Sprintf("unknown method %v for service %v", , )
}
if != nil {
.tr.LazyPrintf("%s", )
.tr.SetError()
}
if := .WriteStatus(status.New(codes.Unimplemented, )); != nil {
if != nil {
.tr.LazyLog(&fmtStringer{"%v", []any{}}, true)
.tr.SetError()
}
channelz.Warningf(logger, .channelz, "grpc: Server.handleStream failed to write status: %v", )
}
if != nil {
.tr.Finish()
}
}
// streamKey is the context key type under which the server transport stream
// is stored.
type streamKey struct{}
// Returns a context derived from the given one that carries the given
// ServerTransportStream under streamKey{}.
func ( context.Context, ServerTransportStream) context.Context {
return context.WithValue(, streamKey{}, )
}
// ServerTransportStream is the minimal view of a server-side stream that is
// stored in an RPC's context: its method name plus header and trailer
// operations.
type ServerTransportStream interface {
// Method returns the method string of the RPC.
Method() string
// SetHeader records header metadata on the stream.
SetHeader(md metadata.MD) error
// SendHeader sends header metadata on the stream.
SendHeader(md metadata.MD) error
// SetTrailer records trailer metadata on the stream.
SetTrailer(md metadata.MD) error
}
// Looks up the ServerTransportStream stored in the context under streamKey{}.
// The type assertion's ok result is discarded, so a missing or mistyped value
// yields a nil interface.
func ( context.Context) ServerTransportStream {
, := .Value(streamKey{}).(ServerTransportStream)
return
}
// Stops the server non-gracefully: calls stop(false), which closes
// transports instead of draining them.
func ( *Server) () {
.stop(false)
}
// Stops the server gracefully: calls stop(true), which drains transports and
// waits for running handlers.
func ( *Server) () {
.stop(true)
}
// Shared shutdown path. Fires quit, closes all listeners, waits for serve
// loops and in-flight handshakes, then either drains (graceful) or closes
// every transport and blocks until conns is empty. The done event fires when
// the function returns.
func ( *Server) ( bool) {
.quit.Fire()
defer .done.Fire()
.channelzRemoveOnce.Do(func() { channelz.RemoveEntry(.channelz.ID) })
.mu.Lock()
.closeListenersLocked()
// mu is released while waiting on serveWG; the serve loop's deferred cleanup
// takes mu before it finishes.
.mu.Unlock()
.serveWG.Wait()
.mu.Lock()
defer .mu.Unlock()
if {
.drainAllServerTransportsLocked()
} else {
.closeServerTransportsLocked()
}
// Woken by the Broadcast issued when a connection is removed.
for len(.conns) != 0 {
.cv.Wait()
}
// A nil conns map makes later addConn calls refuse new transports.
.conns = nil
if .opts.numServerWorkers > 0 {
.serverWorkerChannelClose()
}
// NOTE(review): handlersWG.Wait runs with mu still held.
if || .opts.waitForHandlers {
.handlersWG.Wait()
}
if .events != nil {
.events.Finish()
.events = nil
}
}
// Closes every tracked transport with the error "Server.Stop called". The
// map is only iterated, not modified. The call site holds mu.
func ( *Server) () {
for , := range .conns {
for := range {
.Close(errors.New("Server.Stop called"))
}
}
}
// Tells every tracked transport to drain with the reason "graceful_stop" and
// sets the drain flag. It does nothing when drain is already set. The call
// site holds mu.
func ( *Server) () {
if !.drain {
for , := range .conns {
for := range {
.Drain("graceful_stop")
}
}
.drain = true
}
}
// Closes every listener and sets lis to nil, which later serve attempts
// treat as "server stopped". The call site holds mu.
func ( *Server) () {
for := range .lis {
.Close()
}
.lis = nil
}
// Chooses the codec for a call. A codec configured in opts always wins. An
// empty content-subtype selects the proto codec. An unregistered subtype
// logs a warning and also falls back to proto.
func ( *Server) ( string) baseCodec {
if .opts.codec != nil {
return .opts.codec
}
if == "" {
return getCodec(proto.Name)
}
:= getCodec()
if == nil {
logger.Warningf("Unsupported codec %q. Defaulting to %q for now. This will start to fail in future releases.", , proto.Name)
return getCodec(proto.Name)
}
return
}
// serverKey is the context key type under which the *Server is stored.
type serverKey struct{}
// Looks up the *Server stored in the context under serverKey{}. The type
// assertion's ok result is discarded, so a missing value yields nil.
func ( context.Context) *Server {
, := .Value(serverKey{}).(*Server)
return
}
// Returns a context derived from the given one that carries the given
// *Server under serverKey{}.
func ( context.Context, *Server) context.Context {
return context.WithValue(, serverKey{}, )
}
// Reports whether the given method string names a registered unary method or
// stream. One leading "/" is dropped and the rest is split at its last "/"
// into service and method; a string with no "/" left is never registered.
func ( *Server) ( string) bool {
if != "" && [0] == '/' {
= [1:]
}
:= strings.LastIndex(, "/")
if == -1 {
return false
}
:= [:]
:= [+1:]
, := .services[]
if {
if , := .methods[]; {
return true
}
if , := .streams[]; {
return true
}
}
return false
}
// Sets header metadata on the stream stored in the context. Empty metadata
// returns nil before the stream is looked up. A context without a stream
// yields a codes.Internal status error.
func ( context.Context, metadata.MD) error {
if .Len() == 0 {
return nil
}
:= ServerTransportStreamFromContext()
if == nil {
return status.Errorf(codes.Internal, "grpc: failed to fetch the stream from the context %v", )
}
return .SetHeader()
}
// Sends header metadata on the stream stored in the context. There is no
// early return for empty metadata. A context without a stream yields a
// codes.Internal status error; a send failure is converted with toRPCErr.
func ( context.Context, metadata.MD) error {
:= ServerTransportStreamFromContext()
if == nil {
return status.Errorf(codes.Internal, "grpc: failed to fetch the stream from the context %v", )
}
if := .SendHeader(); != nil {
return toRPCErr()
}
return nil
}
// Sets the compressor used for sending on the stream stored in the context.
// The stream must be a *transport.ServerStream. The name is validated against
// the compressors the client advertised before it is applied. Errors here
// are plain fmt errors, not status errors.
func ( context.Context, string) error {
, := ServerTransportStreamFromContext().(*transport.ServerStream)
if ! || == nil {
return fmt.Errorf("failed to fetch the stream from the given context")
}
if := validateSendCompressor(, .ClientAdvertisedCompressors()); != nil {
return fmt.Errorf("unable to set send compressor: %w", )
}
return .SetSendCompress()
}
// Returns the compressor names the client advertised, read from the
// *transport.ServerStream stored in the context. Returns an error when the
// context holds no such stream.
func ( context.Context) ([]string, error) {
, := ServerTransportStreamFromContext().(*transport.ServerStream)
if ! || == nil {
return nil, fmt.Errorf("failed to fetch the stream from the given context %v", )
}
return .ClientAdvertisedCompressors(), nil
}
// Sets trailer metadata on the stream stored in the context. Empty metadata
// returns nil before the stream is looked up. A context without a stream
// yields a codes.Internal status error.
func ( context.Context, metadata.MD) error {
if .Len() == 0 {
return nil
}
:= ServerTransportStreamFromContext()
if == nil {
return status.Errorf(codes.Internal, "grpc: failed to fetch the stream from the context %v", )
}
return .SetTrailer()
}
// Returns the method string of the stream stored in the context and true, or
// "" and false when the context holds no stream.
func ( context.Context) (string, bool) {
:= ServerTransportStreamFromContext()
if == nil {
return "", false
}
return .Method(), true
}
// Checks that a compressor name may be used for sending. encoding.Identity
// is always accepted. Any other name must be registered (per
// grpcutil.IsCompressorNameRegistered) and must appear in the given list of
// names, which the call site fills with the client's advertised compressors.
func ( string, []string) error {
if == encoding.Identity {
return nil
}
if !grpcutil.IsCompressorNameRegistered() {
return fmt.Errorf("compressor not registered %q", )
}
for , := range {
if == {
return nil
}
}
return fmt.Errorf("client does not support compressor %q", )
}
// atomicSemaphore is a counting semaphore built from an atomic counter and a
// channel used to park and wake callers.
type atomicSemaphore struct {
// n is the remaining quota; it goes negative while callers are blocked.
n atomic.Int64
// wait is the channel blocked acquirers receive from and releasers send on.
wait chan struct{}
}
// Takes one unit of quota: decrements n and, if the result is negative,
// blocks until a value arrives on wait.
func ( *atomicSemaphore) () {
if .n.Add(-1) < 0 {
<-.wait
}
}
// Returns one unit of quota: increments n and, if the result is still zero
// or negative (the counter was negative before, so someone is waiting),
// sends on wait to wake one blocked caller.
func ( *atomicSemaphore) () {
if .n.Add(1) <= 0 {
.wait <- struct{}{}
}
}
// Creates an atomicSemaphore with a wait channel of capacity 1 and the
// counter initialised from the uint32 argument, widened to int64.
func ( uint32) *atomicSemaphore {
:= &atomicSemaphore{wait: make(chan struct{}, 1)}
.n.Store(int64())
return
}