package grpc
import (
)
const (
defaultServerMaxReceiveMessageSize = 1024 * 1024 * 4
defaultServerMaxSendMessageSize = math.MaxInt32
listenerAddressForServeHTTP = "listenerAddressForServeHTTP"
)
func () {
internal.GetServerCredentials = func( *Server) credentials.TransportCredentials {
return .opts.creds
}
internal.DrainServerTransports = func( *Server, string) {
.drainServerTransports()
}
internal.AddGlobalServerOptions = func( ...ServerOption) {
extraServerOptions = append(extraServerOptions, ...)
}
internal.ClearGlobalServerOptions = func() {
extraServerOptions = nil
}
internal.BinaryLogger = binaryLogger
internal.JoinServerOptions = newJoinServerOption
}
var statusOK = status.New(codes.OK, "")
var logger = grpclog.Component("core")
type methodHandler func(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor UnaryServerInterceptor) (interface{}, error)
type MethodDesc struct {
MethodName string
Handler methodHandler
}
type ServiceDesc struct {
ServiceName string
HandlerType interface{}
Methods []MethodDesc
Streams []StreamDesc
Metadata interface{}
}
type serviceInfo struct {
serviceImpl interface{}
methods map[string]*MethodDesc
streams map[string]*StreamDesc
mdata interface{}
}
type serverWorkerData struct {
st transport.ServerTransport
wg *sync.WaitGroup
stream *transport.Stream
}
type Server struct {
opts serverOptions
mu sync.Mutex
lis map[net.Listener]bool
conns map[string]map[transport.ServerTransport]bool
serve bool
drain bool
cv *sync.Cond
services map[string]*serviceInfo
events trace.EventLog
quit *grpcsync.Event
done *grpcsync.Event
channelzRemoveOnce sync.Once
serveWG sync.WaitGroup
channelzID *channelz.Identifier
czData *channelzData
serverWorkerChannels []chan *serverWorkerData
}
type serverOptions struct {
creds credentials.TransportCredentials
codec baseCodec
cp Compressor
dc Decompressor
unaryInt UnaryServerInterceptor
streamInt StreamServerInterceptor
chainUnaryInts []UnaryServerInterceptor
chainStreamInts []StreamServerInterceptor
binaryLogger binarylog.Logger
inTapHandle tap.ServerInHandle
statsHandlers []stats.Handler
maxConcurrentStreams uint32
maxReceiveMessageSize int
maxSendMessageSize int
unknownStreamDesc *StreamDesc
keepaliveParams keepalive.ServerParameters
keepalivePolicy keepalive.EnforcementPolicy
initialWindowSize int32
initialConnWindowSize int32
writeBufferSize int
readBufferSize int
connectionTimeout time.Duration
maxHeaderListSize *uint32
headerTableSize *uint32
numServerWorkers uint32
}
var defaultServerOptions = serverOptions{
maxReceiveMessageSize: defaultServerMaxReceiveMessageSize,
maxSendMessageSize: defaultServerMaxSendMessageSize,
connectionTimeout: 120 * time.Second,
writeBufferSize: defaultWriteBufSize,
readBufferSize: defaultReadBufSize,
}
var extraServerOptions []ServerOption
type ServerOption interface {
apply(*serverOptions)
}
type EmptyServerOption struct{}
func (EmptyServerOption) (*serverOptions) {}
type funcServerOption struct {
f func(*serverOptions)
}
func ( *funcServerOption) ( *serverOptions) {
.f()
}
func ( func(*serverOptions)) *funcServerOption {
return &funcServerOption{
f: ,
}
}
type joinServerOption struct {
opts []ServerOption
}
func ( *joinServerOption) ( *serverOptions) {
for , := range .opts {
.apply()
}
}
func ( ...ServerOption) ServerOption {
return &joinServerOption{opts: }
}
func ( int) ServerOption {
return newFuncServerOption(func( *serverOptions) {
.writeBufferSize =
})
}
func ( int) ServerOption {
return newFuncServerOption(func( *serverOptions) {
.readBufferSize =
})
}
func ( int32) ServerOption {
return newFuncServerOption(func( *serverOptions) {
.initialWindowSize =
})
}
func ( int32) ServerOption {
return newFuncServerOption(func( *serverOptions) {
.initialConnWindowSize =
})
}
func ( keepalive.ServerParameters) ServerOption {
if .Time > 0 && .Time < time.Second {
logger.Warning("Adjusting keepalive ping interval to minimum period of 1s")
.Time = time.Second
}
return newFuncServerOption(func( *serverOptions) {
.keepaliveParams =
})
}
func ( keepalive.EnforcementPolicy) ServerOption {
return newFuncServerOption(func( *serverOptions) {
.keepalivePolicy =
})
}
func ( Codec) ServerOption {
return newFuncServerOption(func( *serverOptions) {
.codec =
})
}
func ( encoding.Codec) ServerOption {
return newFuncServerOption(func( *serverOptions) {
.codec =
})
}
func ( Compressor) ServerOption {
return newFuncServerOption(func( *serverOptions) {
.cp =
})
}
func ( Decompressor) ServerOption {
return newFuncServerOption(func( *serverOptions) {
.dc =
})
}
func ( int) ServerOption {
return MaxRecvMsgSize()
}
func ( int) ServerOption {
return newFuncServerOption(func( *serverOptions) {
.maxReceiveMessageSize =
})
}
func ( int) ServerOption {
return newFuncServerOption(func( *serverOptions) {
.maxSendMessageSize =
})
}
func ( uint32) ServerOption {
return newFuncServerOption(func( *serverOptions) {
.maxConcurrentStreams =
})
}
func ( credentials.TransportCredentials) ServerOption {
return newFuncServerOption(func( *serverOptions) {
.creds =
})
}
func ( UnaryServerInterceptor) ServerOption {
return newFuncServerOption(func( *serverOptions) {
if .unaryInt != nil {
panic("The unary server interceptor was already set and may not be reset.")
}
.unaryInt =
})
}
func ( ...UnaryServerInterceptor) ServerOption {
return newFuncServerOption(func( *serverOptions) {
.chainUnaryInts = append(.chainUnaryInts, ...)
})
}
func ( StreamServerInterceptor) ServerOption {
return newFuncServerOption(func( *serverOptions) {
if .streamInt != nil {
panic("The stream server interceptor was already set and may not be reset.")
}
.streamInt =
})
}
func ( ...StreamServerInterceptor) ServerOption {
return newFuncServerOption(func( *serverOptions) {
.chainStreamInts = append(.chainStreamInts, ...)
})
}
func ( tap.ServerInHandle) ServerOption {
return newFuncServerOption(func( *serverOptions) {
if .inTapHandle != nil {
panic("The tap handle was already set and may not be reset.")
}
.inTapHandle =
})
}
func ( stats.Handler) ServerOption {
return newFuncServerOption(func( *serverOptions) {
if == nil {
logger.Error("ignoring nil parameter in grpc.StatsHandler ServerOption")
return
}
.statsHandlers = append(.statsHandlers, )
})
}
func ( binarylog.Logger) ServerOption {
return newFuncServerOption(func( *serverOptions) {
.binaryLogger =
})
}
func ( StreamHandler) ServerOption {
return newFuncServerOption(func( *serverOptions) {
.unknownStreamDesc = &StreamDesc{
StreamName: "unknown_service_handler",
Handler: ,
ClientStreams: true,
ServerStreams: true,
}
})
}
func ( time.Duration) ServerOption {
return newFuncServerOption(func( *serverOptions) {
.connectionTimeout =
})
}
func ( uint32) ServerOption {
return newFuncServerOption(func( *serverOptions) {
.maxHeaderListSize = &
})
}
func ( uint32) ServerOption {
return newFuncServerOption(func( *serverOptions) {
.headerTableSize = &
})
}
func ( uint32) ServerOption {
return newFuncServerOption(func( *serverOptions) {
.numServerWorkers =
})
}
const serverWorkerResetThreshold = 1 << 16
func ( *Server) ( chan *serverWorkerData) {
:= serverWorkerResetThreshold + grpcrand.Intn(serverWorkerResetThreshold)
for := 0; < ; ++ {
, := <-
if ! {
return
}
.handleStream(.st, .stream, .traceInfo(.st, .stream))
.wg.Done()
}
go .()
}
func ( *Server) () {
.serverWorkerChannels = make([]chan *serverWorkerData, .opts.numServerWorkers)
for := uint32(0); < .opts.numServerWorkers; ++ {
.serverWorkerChannels[] = make(chan *serverWorkerData)
go .serverWorker(.serverWorkerChannels[])
}
}
func ( *Server) () {
for := uint32(0); < .opts.numServerWorkers; ++ {
close(.serverWorkerChannels[])
}
}
func ( ...ServerOption) *Server {
:= defaultServerOptions
for , := range extraServerOptions {
.apply(&)
}
for , := range {
.apply(&)
}
:= &Server{
lis: make(map[net.Listener]bool),
opts: ,
conns: make(map[string]map[transport.ServerTransport]bool),
services: make(map[string]*serviceInfo),
quit: grpcsync.NewEvent(),
done: grpcsync.NewEvent(),
czData: new(channelzData),
}
chainUnaryServerInterceptors()
chainStreamServerInterceptors()
.cv = sync.NewCond(&.mu)
if EnableTracing {
, , , := runtime.Caller(1)
.events = trace.NewEventLog("grpc.Server", fmt.Sprintf("%s:%d", , ))
}
if .opts.numServerWorkers > 0 {
.initServerWorkers()
}
.channelzID = channelz.RegisterServer(&channelzServer{}, "")
channelz.Info(logger, .channelzID, "Server created")
return
}
func ( *Server) ( string, ...interface{}) {
if .events != nil {
.events.Printf(, ...)
}
}
func ( *Server) ( string, ...interface{}) {
if .events != nil {
.events.Errorf(, ...)
}
}
type ServiceRegistrar interface {
RegisterService(desc *ServiceDesc, impl interface{})
}
func ( *Server) ( *ServiceDesc, interface{}) {
if != nil {
:= reflect.TypeOf(.HandlerType).Elem()
:= reflect.TypeOf()
if !.Implements() {
logger.Fatalf("grpc: Server.RegisterService found the handler of type %v that does not satisfy %v", , )
}
}
.register(, )
}
func ( *Server) ( *ServiceDesc, interface{}) {
.mu.Lock()
defer .mu.Unlock()
.printf("RegisterService(%q)", .ServiceName)
if .serve {
logger.Fatalf("grpc: Server.RegisterService after Server.Serve for %q", .ServiceName)
}
if , := .services[.ServiceName]; {
logger.Fatalf("grpc: Server.RegisterService found duplicate service registration for %q", .ServiceName)
}
:= &serviceInfo{
serviceImpl: ,
methods: make(map[string]*MethodDesc),
streams: make(map[string]*StreamDesc),
mdata: .Metadata,
}
for := range .Methods {
:= &.Methods[]
.methods[.MethodName] =
}
for := range .Streams {
:= &.Streams[]
.streams[.StreamName] =
}
.services[.ServiceName] =
}
type MethodInfo struct {
Name string
IsClientStream bool
IsServerStream bool
}
type ServiceInfo struct {
Methods []MethodInfo
Metadata interface{}
}
func ( *Server) () map[string]ServiceInfo {
:= make(map[string]ServiceInfo)
for , := range .services {
:= make([]MethodInfo, 0, len(.methods)+len(.streams))
for := range .methods {
= append(, MethodInfo{
Name: ,
IsClientStream: false,
IsServerStream: false,
})
}
for , := range .streams {
= append(, MethodInfo{
Name: ,
IsClientStream: .ClientStreams,
IsServerStream: .ServerStreams,
})
}
[] = ServiceInfo{
Methods: ,
Metadata: .mdata,
}
}
return
}
var ErrServerStopped = errors.New("grpc: the server has been stopped")
type listenSocket struct {
net.Listener
channelzID *channelz.Identifier
}
func ( *listenSocket) () *channelz.SocketInternalMetric {
return &channelz.SocketInternalMetric{
SocketOptions: channelz.GetSocketOption(.Listener),
LocalAddr: .Listener.Addr(),
}
}
func ( *listenSocket) () error {
:= .Listener.Close()
channelz.RemoveEntry(.channelzID)
channelz.Info(logger, .channelzID, "ListenSocket deleted")
return
}
func ( *Server) ( net.Listener) error {
.mu.Lock()
.printf("serving")
.serve = true
if .lis == nil {
.mu.Unlock()
.Close()
return ErrServerStopped
}
.serveWG.Add(1)
defer func() {
.serveWG.Done()
if .quit.HasFired() {
<-.done.Done()
}
}()
:= &listenSocket{Listener: }
.lis[] = true
defer func() {
.mu.Lock()
if .lis != nil && .lis[] {
.Close()
delete(.lis, )
}
.mu.Unlock()
}()
var error
.channelzID, = channelz.RegisterListenSocket(, .channelzID, .Addr().String())
if != nil {
.mu.Unlock()
return
}
.mu.Unlock()
channelz.Info(logger, .channelzID, "ListenSocket created")
var time.Duration
for {
, := .Accept()
if != nil {
if , := .(interface {
() bool
}); && .() {
if == 0 {
= 5 * time.Millisecond
} else {
*= 2
}
if := 1 * time.Second; > {
=
}
.mu.Lock()
.printf("Accept error: %v; retrying in %v", , )
.mu.Unlock()
:= time.NewTimer()
select {
case <-.C:
case <-.quit.Done():
.Stop()
return nil
}
continue
}
.mu.Lock()
.printf("done serving; Accept = %v", )
.mu.Unlock()
if .quit.HasFired() {
return nil
}
return
}
= 0
.serveWG.Add(1)
go func() {
.handleRawConn(.Addr().String(), )
.serveWG.Done()
}()
}
}
func ( *Server) ( string, net.Conn) {
if .quit.HasFired() {
.Close()
return
}
.SetDeadline(time.Now().Add(.opts.connectionTimeout))
:= .newHTTP2Transport()
.SetDeadline(time.Time{})
if == nil {
return
}
if !.addConn(, ) {
return
}
go func() {
.serveStreams()
.removeConn(, )
}()
}
func ( *Server) ( string) {
.mu.Lock()
:= .conns[]
for := range {
.Drain()
}
.mu.Unlock()
}
func ( *Server) ( net.Conn) transport.ServerTransport {
:= &transport.ServerConfig{
MaxStreams: .opts.maxConcurrentStreams,
ConnectionTimeout: .opts.connectionTimeout,
Credentials: .opts.creds,
InTapHandle: .opts.inTapHandle,
StatsHandlers: .opts.statsHandlers,
KeepaliveParams: .opts.keepaliveParams,
KeepalivePolicy: .opts.keepalivePolicy,
InitialWindowSize: .opts.initialWindowSize,
InitialConnWindowSize: .opts.initialConnWindowSize,
WriteBufferSize: .opts.writeBufferSize,
ReadBufferSize: .opts.readBufferSize,
ChannelzParentID: .channelzID,
MaxHeaderListSize: .opts.maxHeaderListSize,
HeaderTableSize: .opts.headerTableSize,
}
, := transport.NewServerTransport(, )
if != nil {
.mu.Lock()
.errorf("NewServerTransport(%q) failed: %v", .RemoteAddr(), )
.mu.Unlock()
if != credentials.ErrConnDispatched {
if != io.EOF {
channelz.Info(logger, .channelzID, "grpc: Server.Serve failed to create ServerTransport: ", )
}
.Close()
}
return nil
}
return
}
func ( *Server) ( transport.ServerTransport) {
defer .Close(errors.New("finished serving streams for the server transport"))
var sync.WaitGroup
var uint32
.HandleStreams(func( *transport.Stream) {
.Add(1)
if .opts.numServerWorkers > 0 {
:= &serverWorkerData{st: , wg: &, stream: }
select {
case .serverWorkerChannels[atomic.AddUint32(&, 1)%.opts.numServerWorkers] <- :
default:
go func() {
.handleStream(, , .traceInfo(, ))
.Done()
}()
}
} else {
go func() {
defer .Done()
.handleStream(, , .traceInfo(, ))
}()
}
}, func( context.Context, string) context.Context {
if !EnableTracing {
return
}
:= trace.New("grpc.Recv."+methodFamily(), )
return trace.NewContext(, )
})
.Wait()
}
var _ http.Handler = (*Server)(nil)
func ( *Server) ( http.ResponseWriter, *http.Request) {
, := transport.NewServerHandlerTransport(, , .opts.statsHandlers)
if != nil {
return
}
if !.addConn(listenerAddressForServeHTTP, ) {
return
}
defer .removeConn(listenerAddressForServeHTTP, )
.serveStreams()
}
func ( *Server) ( transport.ServerTransport, *transport.Stream) ( *traceInfo) {
if !EnableTracing {
return nil
}
, := trace.FromContext(.Context())
if ! {
return nil
}
= &traceInfo{
tr: ,
firstLine: firstLine{
client: false,
remoteAddr: .RemoteAddr(),
},
}
if , := .Context().Deadline(); {
.firstLine.deadline = time.Until()
}
return
}
func ( *Server) ( string, transport.ServerTransport) bool {
.mu.Lock()
defer .mu.Unlock()
if .conns == nil {
.Close(errors.New("Server.addConn called when server has already been stopped"))
return false
}
if .drain {
.Drain()
}
if .conns[] == nil {
.conns[] = make(map[transport.ServerTransport]bool)
}
.conns[][] = true
return true
}
func ( *Server) ( string, transport.ServerTransport) {
.mu.Lock()
defer .mu.Unlock()
:= .conns[]
if != nil {
delete(, )
if len() == 0 {
delete(.conns, )
}
.cv.Broadcast()
}
}
func ( *Server) () *channelz.ServerInternalMetric {
return &channelz.ServerInternalMetric{
CallsStarted: atomic.LoadInt64(&.czData.callsStarted),
CallsSucceeded: atomic.LoadInt64(&.czData.callsSucceeded),
CallsFailed: atomic.LoadInt64(&.czData.callsFailed),
LastCallStartedTimestamp: time.Unix(0, atomic.LoadInt64(&.czData.lastCallStartedTime)),
}
}
func ( *Server) () {
atomic.AddInt64(&.czData.callsStarted, 1)
atomic.StoreInt64(&.czData.lastCallStartedTime, time.Now().UnixNano())
}
func ( *Server) () {
atomic.AddInt64(&.czData.callsSucceeded, 1)
}
func ( *Server) () {
atomic.AddInt64(&.czData.callsFailed, 1)
}
func ( *Server) ( transport.ServerTransport, *transport.Stream, interface{}, Compressor, *transport.Options, encoding.Compressor) error {
, := encode(.getCodec(.ContentSubtype()), )
if != nil {
channelz.Error(logger, .channelzID, "grpc: server failed to encode response: ", )
return
}
, := compress(, , )
if != nil {
channelz.Error(logger, .channelzID, "grpc: server failed to compress response: ", )
return
}
, := msgHeader(, )
if len() > .opts.maxSendMessageSize {
return status.Errorf(codes.ResourceExhausted, "grpc: trying to send message larger than max (%d vs. %d)", len(), .opts.maxSendMessageSize)
}
= .Write(, , , )
if == nil {
for , := range .opts.statsHandlers {
.HandleRPC(.Context(), outPayload(false, , , , time.Now()))
}
}
return
}
func ( *Server) {
:= .opts.chainUnaryInts
if .opts.unaryInt != nil {
= append([]UnaryServerInterceptor{.opts.unaryInt}, .opts.chainUnaryInts...)
}
var UnaryServerInterceptor
if len() == 0 {
= nil
} else if len() == 1 {
= [0]
} else {
= chainUnaryInterceptors()
}
.opts.unaryInt =
}
func ( []UnaryServerInterceptor) UnaryServerInterceptor {
return func( context.Context, interface{}, *UnaryServerInfo, UnaryHandler) (interface{}, error) {
return [0](, , , getChainUnaryHandler(, 0, , ))
}
}
func ( []UnaryServerInterceptor, int, *UnaryServerInfo, UnaryHandler) UnaryHandler {
if == len()-1 {
return
}
return func( context.Context, interface{}) (interface{}, error) {
return [+1](, , , (, +1, , ))
}
}
func ( *Server) ( transport.ServerTransport, *transport.Stream, *serviceInfo, *MethodDesc, *traceInfo) ( error) {
:= .opts.statsHandlers
if len() != 0 || != nil || channelz.IsOn() {
if channelz.IsOn() {
.incrCallsStarted()
}
var *stats.Begin
for , := range {
:= time.Now()
= &stats.Begin{
BeginTime: ,
IsClientStream: false,
IsServerStream: false,
}
.HandleRPC(.Context(), )
}
if != nil {
.tr.LazyLog(&.firstLine, false)
}
defer func() {
if != nil {
if != nil && != io.EOF {
.tr.LazyLog(&fmtStringer{"%v", []interface{}{}}, true)
.tr.SetError()
}
.tr.Finish()
}
for , := range {
:= &stats.End{
BeginTime: .BeginTime,
EndTime: time.Now(),
}
if != nil && != io.EOF {
.Error = toRPCErr()
}
.HandleRPC(.Context(), )
}
if channelz.IsOn() {
if != nil && != io.EOF {
.incrCallsFailed()
} else {
.incrCallsSucceeded()
}
}
}()
}
var []binarylog.MethodLogger
if := binarylog.GetMethodLogger(.Method()); != nil {
= append(, )
}
if .opts.binaryLogger != nil {
if := .opts.binaryLogger.GetMethodLogger(.Method()); != nil {
= append(, )
}
}
if len() != 0 {
:= .Context()
, := metadata.FromIncomingContext()
:= &binarylog.ClientHeader{
Header: ,
MethodName: .Method(),
PeerAddr: nil,
}
if , := .Deadline(); {
.Timeout = time.Until()
if .Timeout < 0 {
.Timeout = 0
}
}
if := [":authority"]; len() > 0 {
.Authority = [0]
}
if , := peer.FromContext(); {
.PeerAddr = .Addr
}
for , := range {
.Log()
}
}
var , encoding.Compressor
var Compressor
var Decompressor
if := .RecvCompress(); .opts.dc != nil && .opts.dc.Type() == {
= .opts.dc
} else if != "" && != encoding.Identity {
= encoding.GetCompressor()
if == nil {
:= status.Newf(codes.Unimplemented, "grpc: Decompressor is not installed for grpc-encoding %q", )
.WriteStatus(, )
return .Err()
}
}
if .opts.cp != nil {
= .opts.cp
.SetSendCompress(.Type())
} else if := .RecvCompress(); != "" && != encoding.Identity {
= encoding.GetCompressor()
if != nil {
.SetSendCompress()
}
}
var *payloadInfo
if len() != 0 || len() != 0 {
= &payloadInfo{}
}
, := recvAndDecompress(&parser{r: }, , , .opts.maxReceiveMessageSize, , )
if != nil {
if := .WriteStatus(, status.Convert()); != nil {
channelz.Warningf(logger, .channelzID, "grpc: Server.processUnaryRPC failed to write status: %v", )
}
return
}
if channelz.IsOn() {
.IncrMsgRecv()
}
:= func( interface{}) error {
if := .getCodec(.ContentSubtype()).Unmarshal(, ); != nil {
return status.Errorf(codes.Internal, "grpc: error unmarshalling request: %v", )
}
for , := range {
.HandleRPC(.Context(), &stats.InPayload{
RecvTime: time.Now(),
Payload: ,
WireLength: .wireLength + headerLen,
Data: ,
Length: len(),
})
}
if len() != 0 {
:= &binarylog.ClientMessage{
Message: ,
}
for , := range {
.Log()
}
}
if != nil {
.tr.LazyLog(&payload{sent: false, msg: }, true)
}
return nil
}
:= NewContextWithServerTransportStream(.Context(), )
, := .Handler(.serviceImpl, , , .opts.unaryInt)
if != nil {
, := status.FromError()
if ! {
= status.FromContextError()
= .Err()
}
if != nil {
.tr.LazyLog(stringer(.Message()), true)
.tr.SetError()
}
if := .WriteStatus(, ); != nil {
channelz.Warningf(logger, .channelzID, "grpc: Server.processUnaryRPC failed to write status: %v", )
}
if len() != 0 {
if , := .Header(); .Len() > 0 {
:= &binarylog.ServerHeader{
Header: ,
}
for , := range {
.Log()
}
}
:= &binarylog.ServerTrailer{
Trailer: .Trailer(),
Err: ,
}
for , := range {
.Log()
}
}
return
}
if != nil {
.tr.LazyLog(stringer("OK"), false)
}
:= &transport.Options{Last: true}
if := .sendResponse(, , , , , ); != nil {
if == io.EOF {
return
}
if , := status.FromError(); {
if := .WriteStatus(, ); != nil {
channelz.Warningf(logger, .channelzID, "grpc: Server.processUnaryRPC failed to write status: %v", )
}
} else {
switch st := .(type) {
case transport.ConnectionError:
default:
panic(fmt.Sprintf("grpc: Unexpected error (%T) from sendResponse: %v", , ))
}
}
if len() != 0 {
, := .Header()
:= &binarylog.ServerHeader{
Header: ,
}
:= &binarylog.ServerTrailer{
Trailer: .Trailer(),
Err: ,
}
for , := range {
.Log()
.Log()
}
}
return
}
if len() != 0 {
, := .Header()
:= &binarylog.ServerHeader{
Header: ,
}
:= &binarylog.ServerMessage{
Message: ,
}
for , := range {
.Log()
.Log()
}
}
if channelz.IsOn() {
.IncrMsgSent()
}
if != nil {
.tr.LazyLog(&payload{sent: true, msg: }, true)
}
= .WriteStatus(, statusOK)
if len() != 0 {
:= &binarylog.ServerTrailer{
Trailer: .Trailer(),
Err: ,
}
for , := range {
.Log()
}
}
return
}
func ( *Server) {
:= .opts.chainStreamInts
if .opts.streamInt != nil {
= append([]StreamServerInterceptor{.opts.streamInt}, .opts.chainStreamInts...)
}
var StreamServerInterceptor
if len() == 0 {
= nil
} else if len() == 1 {
= [0]
} else {
= chainStreamInterceptors()
}
.opts.streamInt =
}
func ( []StreamServerInterceptor) StreamServerInterceptor {
return func( interface{}, ServerStream, *StreamServerInfo, StreamHandler) error {
return [0](, , , getChainStreamHandler(, 0, , ))
}
}
func ( []StreamServerInterceptor, int, *StreamServerInfo, StreamHandler) StreamHandler {
if == len()-1 {
return
}
return func( interface{}, ServerStream) error {
return [+1](, , , (, +1, , ))
}
}
func ( *Server) ( transport.ServerTransport, *transport.Stream, *serviceInfo, *StreamDesc, *traceInfo) ( error) {
if channelz.IsOn() {
.incrCallsStarted()
}
:= .opts.statsHandlers
var *stats.Begin
if len() != 0 {
:= time.Now()
= &stats.Begin{
BeginTime: ,
IsClientStream: .ClientStreams,
IsServerStream: .ServerStreams,
}
for , := range {
.HandleRPC(.Context(), )
}
}
:= NewContextWithServerTransportStream(.Context(), )
:= &serverStream{
ctx: ,
t: ,
s: ,
p: &parser{r: },
codec: .getCodec(.ContentSubtype()),
maxReceiveMessageSize: .opts.maxReceiveMessageSize,
maxSendMessageSize: .opts.maxSendMessageSize,
trInfo: ,
statsHandler: ,
}
if len() != 0 || != nil || channelz.IsOn() {
defer func() {
if != nil {
.mu.Lock()
if != nil && != io.EOF {
.trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{}}, true)
.trInfo.tr.SetError()
}
.trInfo.tr.Finish()
.trInfo.tr = nil
.mu.Unlock()
}
if len() != 0 {
:= &stats.End{
BeginTime: .BeginTime,
EndTime: time.Now(),
}
if != nil && != io.EOF {
.Error = toRPCErr()
}
for , := range {
.HandleRPC(.Context(), )
}
}
if channelz.IsOn() {
if != nil && != io.EOF {
.incrCallsFailed()
} else {
.incrCallsSucceeded()
}
}
}()
}
if := binarylog.GetMethodLogger(.Method()); != nil {
.binlogs = append(.binlogs, )
}
if .opts.binaryLogger != nil {
if := .opts.binaryLogger.GetMethodLogger(.Method()); != nil {
.binlogs = append(.binlogs, )
}
}
if len(.binlogs) != 0 {
, := metadata.FromIncomingContext()
:= &binarylog.ClientHeader{
Header: ,
MethodName: .Method(),
PeerAddr: nil,
}
if , := .Deadline(); {
.Timeout = time.Until()
if .Timeout < 0 {
.Timeout = 0
}
}
if := [":authority"]; len() > 0 {
.Authority = [0]
}
if , := peer.FromContext(.Context()); {
.PeerAddr = .Addr
}
for , := range .binlogs {
.Log()
}
}
if := .RecvCompress(); .opts.dc != nil && .opts.dc.Type() == {
.dc = .opts.dc
} else if != "" && != encoding.Identity {
.decomp = encoding.GetCompressor()
if .decomp == nil {
:= status.Newf(codes.Unimplemented, "grpc: Decompressor is not installed for grpc-encoding %q", )
.WriteStatus(.s, )
return .Err()
}
}
if .opts.cp != nil {
.cp = .opts.cp
.SetSendCompress(.opts.cp.Type())
} else if := .RecvCompress(); != "" && != encoding.Identity {
.comp = encoding.GetCompressor()
if .comp != nil {
.SetSendCompress()
}
}
.ctx = newContextWithRPCInfo(.ctx, false, .codec, .cp, .comp)
if != nil {
.tr.LazyLog(&.firstLine, false)
}
var error
var interface{}
if != nil {
= .serviceImpl
}
if .opts.streamInt == nil {
= .Handler(, )
} else {
:= &StreamServerInfo{
FullMethod: .Method(),
IsClientStream: .ClientStreams,
IsServerStream: .ServerStreams,
}
= .opts.streamInt(, , , .Handler)
}
if != nil {
, := status.FromError()
if ! {
= status.FromContextError()
= .Err()
}
if != nil {
.mu.Lock()
.trInfo.tr.LazyLog(stringer(.Message()), true)
.trInfo.tr.SetError()
.mu.Unlock()
}
.WriteStatus(.s, )
if len(.binlogs) != 0 {
:= &binarylog.ServerTrailer{
Trailer: .s.Trailer(),
Err: ,
}
for , := range .binlogs {
.Log()
}
}
return
}
if != nil {
.mu.Lock()
.trInfo.tr.LazyLog(stringer("OK"), false)
.mu.Unlock()
}
= .WriteStatus(.s, statusOK)
if len(.binlogs) != 0 {
:= &binarylog.ServerTrailer{
Trailer: .s.Trailer(),
Err: ,
}
for , := range .binlogs {
.Log()
}
}
return
}
func ( *Server) ( transport.ServerTransport, *transport.Stream, *traceInfo) {
:= .Method()
if != "" && [0] == '/' {
= [1:]
}
:= strings.LastIndex(, "/")
if == -1 {
if != nil {
.tr.LazyLog(&fmtStringer{"Malformed method name %q", []interface{}{}}, true)
.tr.SetError()
}
:= fmt.Sprintf("malformed method name: %q", .Method())
if := .WriteStatus(, status.New(codes.Unimplemented, )); != nil {
if != nil {
.tr.LazyLog(&fmtStringer{"%v", []interface{}{}}, true)
.tr.SetError()
}
channelz.Warningf(logger, .channelzID, "grpc: Server.handleStream failed to write status: %v", )
}
if != nil {
.tr.Finish()
}
return
}
:= [:]
:= [+1:]
, := .services[]
if {
if , := .methods[]; {
.processUnaryRPC(, , , , )
return
}
if , := .streams[]; {
.processStreamingRPC(, , , , )
return
}
}
if := .opts.unknownStreamDesc; != nil {
.processStreamingRPC(, , nil, , )
return
}
var string
if ! {
= fmt.Sprintf("unknown service %v", )
} else {
= fmt.Sprintf("unknown method %v for service %v", , )
}
if != nil {
.tr.LazyPrintf("%s", )
.tr.SetError()
}
if := .WriteStatus(, status.New(codes.Unimplemented, )); != nil {
if != nil {
.tr.LazyLog(&fmtStringer{"%v", []interface{}{}}, true)
.tr.SetError()
}
channelz.Warningf(logger, .channelzID, "grpc: Server.handleStream failed to write status: %v", )
}
if != nil {
.tr.Finish()
}
}
type streamKey struct{}
func ( context.Context, ServerTransportStream) context.Context {
return context.WithValue(, streamKey{}, )
}
type ServerTransportStream interface {
Method() string
SetHeader(md metadata.MD) error
SendHeader(md metadata.MD) error
SetTrailer(md metadata.MD) error
}
func ( context.Context) ServerTransportStream {
, := .Value(streamKey{}).(ServerTransportStream)
return
}
func ( *Server) () {
.quit.Fire()
defer func() {
.serveWG.Wait()
.done.Fire()
}()
.channelzRemoveOnce.Do(func() { channelz.RemoveEntry(.channelzID) })
.mu.Lock()
:= .lis
.lis = nil
:= .conns
.conns = nil
.cv.Broadcast()
.mu.Unlock()
for := range {
.Close()
}
for , := range {
for := range {
.Close(errors.New("Server.Stop called"))
}
}
if .opts.numServerWorkers > 0 {
.stopServerWorkers()
}
.mu.Lock()
if .events != nil {
.events.Finish()
.events = nil
}
.mu.Unlock()
}
func ( *Server) () {
.quit.Fire()
defer .done.Fire()
.channelzRemoveOnce.Do(func() { channelz.RemoveEntry(.channelzID) })
.mu.Lock()
if .conns == nil {
.mu.Unlock()
return
}
for := range .lis {
.Close()
}
.lis = nil
if !.drain {
for , := range .conns {
for := range {
.Drain()
}
}
.drain = true
}
.mu.Unlock()
.serveWG.Wait()
.mu.Lock()
for len(.conns) != 0 {
.cv.Wait()
}
.conns = nil
if .events != nil {
.events.Finish()
.events = nil
}
.mu.Unlock()
}
func ( *Server) ( string) baseCodec {
if .opts.codec != nil {
return .opts.codec
}
if == "" {
return encoding.GetCodec(proto.Name)
}
:= encoding.GetCodec()
if == nil {
return encoding.GetCodec(proto.Name)
}
return
}
func ( context.Context, metadata.MD) error {
if .Len() == 0 {
return nil
}
:= ServerTransportStreamFromContext()
if == nil {
return status.Errorf(codes.Internal, "grpc: failed to fetch the stream from the context %v", )
}
return .SetHeader()
}
func ( context.Context, metadata.MD) error {
:= ServerTransportStreamFromContext()
if == nil {
return status.Errorf(codes.Internal, "grpc: failed to fetch the stream from the context %v", )
}
if := .SendHeader(); != nil {
return toRPCErr()
}
return nil
}
func ( context.Context, metadata.MD) error {
if .Len() == 0 {
return nil
}
:= ServerTransportStreamFromContext()
if == nil {
return status.Errorf(codes.Internal, "grpc: failed to fetch the stream from the context %v", )
}
return .SetTrailer()
}
func ( context.Context) (string, bool) {
:= ServerTransportStreamFromContext()
if == nil {
return "", false
}
return .Method(), true
}
type channelzServer struct {
s *Server
}
func ( *channelzServer) () *channelz.ServerInternalMetric {
return .s.channelzMetric()
}