/*
 *
 * Copyright 2014 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

package grpc

import (
	"context"
	"errors"
	"fmt"
	"io"
	"math"
	"net"
	"net/http"
	"reflect"
	"runtime"
	"strings"
	"sync"
	"sync/atomic"
	"time"

	"golang.org/x/net/trace"

	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/credentials"
	"google.golang.org/grpc/encoding"
	"google.golang.org/grpc/encoding/proto"
	"google.golang.org/grpc/grpclog"
	"google.golang.org/grpc/internal"
	"google.golang.org/grpc/internal/binarylog"
	"google.golang.org/grpc/internal/channelz"
	"google.golang.org/grpc/internal/grpcrand"
	"google.golang.org/grpc/internal/grpcsync"
	"google.golang.org/grpc/internal/transport"
	"google.golang.org/grpc/keepalive"
	"google.golang.org/grpc/metadata"
	"google.golang.org/grpc/peer"
	"google.golang.org/grpc/stats"
	"google.golang.org/grpc/status"
	"google.golang.org/grpc/tap"
)

const (
	defaultServerMaxReceiveMessageSize = 1024 * 1024 * 4
	defaultServerMaxSendMessageSize    = math.MaxInt32

	// Server transports are tracked in a map which is keyed on listener
	// address. For regular gRPC traffic, connections are accepted in Serve()
	// through a call to Accept(), and we use the actual listener address as key
	// when we add it to the map. But for connections received through
	// ServeHTTP(), we do not have a listener and hence use this dummy value.
	listenerAddressForServeHTTP = "listenerAddressForServeHTTP"
)

func init() {
	internal.GetServerCredentials = func(srv *Server) credentials.TransportCredentials {
		return srv.opts.creds
	}
	internal.DrainServerTransports = func(srv *Server, addr string) {
		srv.drainServerTransports(addr)
	}
	internal.AddGlobalServerOptions = func(opt ...ServerOption) {
		extraServerOptions = append(extraServerOptions, opt...)
	}
	internal.ClearGlobalServerOptions = func() {
		extraServerOptions = nil
	}
	internal.BinaryLogger = binaryLogger
	internal.JoinServerOptions = newJoinServerOption
}

var statusOK = status.New(codes.OK, "")
var logger = grpclog.Component("core")

type methodHandler func(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor UnaryServerInterceptor) (interface{}, error)

// MethodDesc represents an RPC service's method specification.
type MethodDesc struct {
	MethodName string
	Handler    methodHandler
}

// ServiceDesc represents an RPC service's specification.
type ServiceDesc struct {
	ServiceName string
	// The pointer to the service interface. Used to check whether the user
	// provided implementation satisfies the interface requirements.
	HandlerType interface{}
	Methods     []MethodDesc
	Streams     []StreamDesc
	Metadata    interface{}
}
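
// For illustration only: a hand-written ServiceDesc for a service with a
// single unary method might look like the sketch below. Generated code
// normally produces this value; every name here (EchoServer, echoServiceDesc,
// _Echo_UnaryEcho_Handler) is hypothetical.
//
//	var echoServiceDesc = grpc.ServiceDesc{
//		ServiceName: "example.Echo",
//		HandlerType: (*EchoServer)(nil),
//		Methods: []grpc.MethodDesc{
//			{MethodName: "UnaryEcho", Handler: _Echo_UnaryEcho_Handler},
//		},
//		Streams:  []grpc.StreamDesc{},
//		Metadata: "echo.proto",
//	}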

// serviceInfo wraps information about a service. It is very similar to
// ServiceDesc and is constructed from it for internal purposes.
type serviceInfo struct {
	// Contains the implementation for the methods in this service.
	serviceImpl interface{}
	methods     map[string]*MethodDesc
	streams     map[string]*StreamDesc
	mdata       interface{}
}

type serverWorkerData struct {
	st     transport.ServerTransport
	wg     *sync.WaitGroup
	stream *transport.Stream
}

// Server is a gRPC server to serve RPC requests.
type Server struct {
	opts serverOptions

	mu  sync.Mutex // guards following
	lis map[net.Listener]bool
	// conns contains all active server transports. It is a map keyed on a
	// listener address with the value being the set of active transports
	// belonging to that listener.
	conns    map[string]map[transport.ServerTransport]bool
	serve    bool
	drain    bool
	cv       *sync.Cond              // signaled when connections close for GracefulStop
	services map[string]*serviceInfo // service name -> service info
	events   trace.EventLog

	quit               *grpcsync.Event
	done               *grpcsync.Event
	channelzRemoveOnce sync.Once
	serveWG            sync.WaitGroup // counts active Serve goroutines for GracefulStop

	channelzID *channelz.Identifier
	czData     *channelzData

	serverWorkerChannels []chan *serverWorkerData
}

type serverOptions struct {
	creds                 credentials.TransportCredentials
	codec                 baseCodec
	cp                    Compressor
	dc                    Decompressor
	unaryInt              UnaryServerInterceptor
	streamInt             StreamServerInterceptor
	chainUnaryInts        []UnaryServerInterceptor
	chainStreamInts       []StreamServerInterceptor
	binaryLogger          binarylog.Logger
	inTapHandle           tap.ServerInHandle
	statsHandlers         []stats.Handler
	maxConcurrentStreams  uint32
	maxReceiveMessageSize int
	maxSendMessageSize    int
	unknownStreamDesc     *StreamDesc
	keepaliveParams       keepalive.ServerParameters
	keepalivePolicy       keepalive.EnforcementPolicy
	initialWindowSize     int32
	initialConnWindowSize int32
	writeBufferSize       int
	readBufferSize        int
	connectionTimeout     time.Duration
	maxHeaderListSize     *uint32
	headerTableSize       *uint32
	numServerWorkers      uint32
}

var defaultServerOptions = serverOptions{
	maxReceiveMessageSize: defaultServerMaxReceiveMessageSize,
	maxSendMessageSize:    defaultServerMaxSendMessageSize,
	connectionTimeout:     120 * time.Second,
	writeBufferSize:       defaultWriteBufSize,
	readBufferSize:        defaultReadBufSize,
}
var extraServerOptions []ServerOption

// A ServerOption sets options such as credentials, codec and keepalive parameters, etc.
type ServerOption interface {
	apply(*serverOptions)
}

// EmptyServerOption does not alter the server configuration. It can be embedded
// in another structure to build custom server options.
//
// # Experimental
//
// Notice: This type is EXPERIMENTAL and may be changed or removed in a
// later release.
type EmptyServerOption struct{}

func (EmptyServerOption) apply(*serverOptions) {}

// funcServerOption wraps a function that modifies serverOptions into an
// implementation of the ServerOption interface.
type funcServerOption struct {
	f func(*serverOptions)
}

func (fdo *funcServerOption) apply(do *serverOptions) {
	fdo.f(do)
}

func newFuncServerOption(f func(*serverOptions)) *funcServerOption {
	return &funcServerOption{
		f: f,
	}
}

// joinServerOption provides a way to combine arbitrary number of server
// options into one.
type joinServerOption struct {
	opts []ServerOption
}

func (mdo *joinServerOption) apply(do *serverOptions) {
	for _, opt := range mdo.opts {
		opt.apply(do)
	}
}

func newJoinServerOption(opts ...ServerOption) ServerOption {
	return &joinServerOption{opts: opts}
}

// WriteBufferSize determines how much data can be batched before doing a write
// on the wire. The corresponding memory allocation for this buffer will be
// twice the size to keep syscalls low. The default value for this buffer is
// 32KB. Zero or negative values will disable the write buffer such that each
// write goes directly to the underlying connection.
// Note: A Send call may not directly translate to a write.
func WriteBufferSize(s int) ServerOption {
	return newFuncServerOption(func(o *serverOptions) {
		o.writeBufferSize = s
	})
}

// ReadBufferSize lets you set the size of the read buffer; this determines how
// much data can be read at most in one read syscall. The default value for
// this buffer is 32KB. Zero or negative values will disable the read buffer
// for a connection so the data framer can access the underlying connection
// directly.
func ReadBufferSize(s int) ServerOption {
	return newFuncServerOption(func(o *serverOptions) {
		o.readBufferSize = s
	})
}
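
// As a usage sketch, both buffer sizes can be tuned when constructing a
// server; the 64KB values below are illustrative, not recommendations:
//
//	srv := grpc.NewServer(
//		grpc.WriteBufferSize(64*1024),
//		grpc.ReadBufferSize(64*1024),
//	)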

// InitialWindowSize returns a ServerOption that sets the window size for a
// stream. The lower bound for a window size is 64K and any value smaller than
// that will be ignored.
func InitialWindowSize(s int32) ServerOption {
	return newFuncServerOption(func(o *serverOptions) {
		o.initialWindowSize = s
	})
}

// InitialConnWindowSize returns a ServerOption that sets the window size for a
// connection. The lower bound for a window size is 64K and any value smaller
// than that will be ignored.
func InitialConnWindowSize(s int32) ServerOption {
	return newFuncServerOption(func(o *serverOptions) {
		o.initialConnWindowSize = s
	})
}

// KeepaliveParams returns a ServerOption that sets keepalive and max-age parameters for the server.
func KeepaliveParams(kp keepalive.ServerParameters) ServerOption {
	if kp.Time > 0 && kp.Time < time.Second {
		logger.Warning("Adjusting keepalive ping interval to minimum period of 1s")
		kp.Time = time.Second
	}

	return newFuncServerOption(func(o *serverOptions) {
		o.keepaliveParams = kp
	})
}

// KeepaliveEnforcementPolicy returns a ServerOption that sets keepalive enforcement policy for the server.
func KeepaliveEnforcementPolicy(kep keepalive.EnforcementPolicy) ServerOption {
	return newFuncServerOption(func(o *serverOptions) {
		o.keepalivePolicy = kep
	})
}

// CustomCodec returns a ServerOption that sets a codec for message marshaling and unmarshaling.
//
// This will override any lookups by content-subtype for Codecs registered with RegisterCodec.
//
// Deprecated: register codecs using encoding.RegisterCodec. The server will
// automatically use registered codecs based on the incoming requests' headers.
// See also
// https://github.com/grpc/grpc-go/blob/master/Documentation/encoding.md#using-a-codec.
// Will be supported throughout 1.x.
func CustomCodec(codec Codec) ServerOption {
	return newFuncServerOption(func(o *serverOptions) {
		o.codec = codec
	})
}

// ForceServerCodec returns a ServerOption that sets a codec for message
// marshaling and unmarshaling.
//
// This will override any lookups by content-subtype for Codecs registered
// with RegisterCodec.
//
// See Content-Type on
// https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#requests for
// more details. Also see the documentation on RegisterCodec and
// CallContentSubtype for more details on the interaction between encoding.Codec
// and content-subtype.
//
// This function is provided for advanced users; prefer to register codecs
// using encoding.RegisterCodec.
// The server will automatically use registered codecs based on the incoming
// requests' headers. See also
// https://github.com/grpc/grpc-go/blob/master/Documentation/encoding.md#using-a-codec.
// Will be supported throughout 1.x.
//
// # Experimental
//
// Notice: This API is EXPERIMENTAL and may be changed or removed in a
// later release.
func ForceServerCodec(codec encoding.Codec) ServerOption {
	return newFuncServerOption(func(o *serverOptions) {
		o.codec = codec
	})
}

// RPCCompressor returns a ServerOption that sets a compressor for outbound
// messages.  For backward compatibility, all outbound messages will be sent
// using this compressor, regardless of incoming message compression.  By
// default, server messages will be sent using the same compressor with which
// request messages were sent.
//
// Deprecated: use encoding.RegisterCompressor instead. Will be supported
// throughout 1.x.
func RPCCompressor(cp Compressor) ServerOption {
	return newFuncServerOption(func(o *serverOptions) {
		o.cp = cp
	})
}

// RPCDecompressor returns a ServerOption that sets a decompressor for inbound
// messages.  It has higher priority than decompressors registered via
// encoding.RegisterCompressor.
//
// Deprecated: use encoding.RegisterCompressor instead. Will be supported
// throughout 1.x.
func RPCDecompressor(dc Decompressor) ServerOption {
	return newFuncServerOption(func(o *serverOptions) {
		o.dc = dc
	})
}

// MaxMsgSize returns a ServerOption to set the max message size in bytes the server can receive.
// If this is not set, gRPC uses the default limit.
//
// Deprecated: use MaxRecvMsgSize instead. Will be supported throughout 1.x.
func MaxMsgSize(m int) ServerOption {
	return MaxRecvMsgSize(m)
}

// MaxRecvMsgSize returns a ServerOption to set the max message size in bytes the server can receive.
// If this is not set, gRPC uses the default 4MB.
func MaxRecvMsgSize(m int) ServerOption {
	return newFuncServerOption(func(o *serverOptions) {
		o.maxReceiveMessageSize = m
	})
}

// MaxSendMsgSize returns a ServerOption to set the max message size in bytes the server can send.
// If this is not set, gRPC uses the default `math.MaxInt32`.
func MaxSendMsgSize(m int) ServerOption {
	return newFuncServerOption(func(o *serverOptions) {
		o.maxSendMessageSize = m
	})
}

// MaxConcurrentStreams returns a ServerOption that will apply a limit on the number
// of concurrent streams to each ServerTransport.
func MaxConcurrentStreams(n uint32) ServerOption {
	return newFuncServerOption(func(o *serverOptions) {
		o.maxConcurrentStreams = n
	})
}

// Creds returns a ServerOption that sets credentials for server connections.
func Creds(c credentials.TransportCredentials) ServerOption {
	return newFuncServerOption(func(o *serverOptions) {
		o.creds = c
	})
}

// UnaryInterceptor returns a ServerOption that sets the UnaryServerInterceptor for the
// server. Only one unary interceptor can be installed. The construction of multiple
// interceptors (e.g., chaining) can be implemented at the caller.
func UnaryInterceptor(i UnaryServerInterceptor) ServerOption {
	return newFuncServerOption(func(o *serverOptions) {
		if o.unaryInt != nil {
			panic("The unary server interceptor was already set and may not be reset.")
		}
		o.unaryInt = i
	})
}

// ChainUnaryInterceptor returns a ServerOption that specifies the chained
// interceptor for unary RPCs. The first interceptor will be the outermost,
// while the last interceptor will be the innermost wrapper around the real call.
// All unary interceptors added by this method will be chained.
func ChainUnaryInterceptor(interceptors ...UnaryServerInterceptor) ServerOption {
	return newFuncServerOption(func(o *serverOptions) {
		o.chainUnaryInts = append(o.chainUnaryInts, interceptors...)
	})
}

// StreamInterceptor returns a ServerOption that sets the StreamServerInterceptor for the
// server. Only one stream interceptor can be installed.
func StreamInterceptor(i StreamServerInterceptor) ServerOption {
	return newFuncServerOption(func(o *serverOptions) {
		if o.streamInt != nil {
			panic("The stream server interceptor was already set and may not be reset.")
		}
		o.streamInt = i
	})
}

// ChainStreamInterceptor returns a ServerOption that specifies the chained
// interceptor for streaming RPCs. The first interceptor will be the outermost,
// while the last interceptor will be the innermost wrapper around the real call.
// All stream interceptors added by this method will be chained.
func ChainStreamInterceptor(interceptors ...StreamServerInterceptor) ServerOption {
	return newFuncServerOption(func(o *serverOptions) {
		o.chainStreamInts = append(o.chainStreamInts, interceptors...)
	})
}
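
// For example, with the chain below a request passes through "first", then
// "second", and finally the handler; this sketch only illustrates ordering:
//
//	srv := grpc.NewServer(grpc.ChainUnaryInterceptor(
//		func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
//			log.Println("first")
//			return handler(ctx, req)
//		},
//		func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
//			log.Println("second")
//			return handler(ctx, req)
//		},
//	))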

// InTapHandle returns a ServerOption that sets the tap handle for all the
// server transports to be created. Only one can be installed.
//
// # Experimental
//
// Notice: This API is EXPERIMENTAL and may be changed or removed in a
// later release.
func InTapHandle(h tap.ServerInHandle) ServerOption {
	return newFuncServerOption(func(o *serverOptions) {
		if o.inTapHandle != nil {
			panic("The tap handle was already set and may not be reset.")
		}
		o.inTapHandle = h
	})
}

// StatsHandler returns a ServerOption that sets the stats handler for the server.
func StatsHandler(h stats.Handler) ServerOption {
	return newFuncServerOption(func(o *serverOptions) {
		if h == nil {
			logger.Error("ignoring nil parameter in grpc.StatsHandler ServerOption")
			// Do not allow a nil stats handler, which would otherwise cause
			// panics.
			return
		}
		o.statsHandlers = append(o.statsHandlers, h)
	})
}

// binaryLogger returns a ServerOption that can set the binary logger for the
// server.
func binaryLogger(bl binarylog.Logger) ServerOption {
	return newFuncServerOption(func(o *serverOptions) {
		o.binaryLogger = bl
	})
}

// UnknownServiceHandler returns a ServerOption that allows for adding a custom
// unknown service handler. The provided method is a bidi-streaming RPC service
// handler that will be invoked instead of returning the "unimplemented" gRPC
// error whenever a request is received for an unregistered service or method.
// The handling function and stream interceptor (if set) have full access to
// the ServerStream, including its Context.
func UnknownServiceHandler(streamHandler StreamHandler) ServerOption {
	return newFuncServerOption(func(o *serverOptions) {
		o.unknownStreamDesc = &StreamDesc{
			StreamName: "unknown_service_handler",
			Handler:    streamHandler,
			// We need to assume that the users of the streamHandler will want to use both.
			ClientStreams: true,
			ServerStreams: true,
		}
	})
}
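
// A minimal fallback handler might, for instance, reject unknown methods with
// a custom status; this is a sketch, and the error text is illustrative:
//
//	srv := grpc.NewServer(grpc.UnknownServiceHandler(
//		func(srv interface{}, stream grpc.ServerStream) error {
//			method, _ := grpc.MethodFromServerStream(stream)
//			return status.Errorf(codes.Unimplemented, "no handler registered for %q", method)
//		},
//	))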

// ConnectionTimeout returns a ServerOption that sets the timeout for
// connection establishment (up to and including HTTP/2 handshaking) for all
// new connections.  If this is not set, the default is 120 seconds.  A zero or
// negative value will result in an immediate timeout.
//
// # Experimental
//
// Notice: This API is EXPERIMENTAL and may be changed or removed in a
// later release.
func ConnectionTimeout(d time.Duration) ServerOption {
	return newFuncServerOption(func(o *serverOptions) {
		o.connectionTimeout = d
	})
}

// MaxHeaderListSize returns a ServerOption that sets the max (uncompressed) size
// of header list that the server is prepared to accept.
func MaxHeaderListSize(s uint32) ServerOption {
	return newFuncServerOption(func(o *serverOptions) {
		o.maxHeaderListSize = &s
	})
}

// HeaderTableSize returns a ServerOption that sets the size of the dynamic
// header table for streams.
//
// # Experimental
//
// Notice: This API is EXPERIMENTAL and may be changed or removed in a
// later release.
func HeaderTableSize(s uint32) ServerOption {
	return newFuncServerOption(func(o *serverOptions) {
		o.headerTableSize = &s
	})
}

// NumStreamWorkers returns a ServerOption that sets the number of worker
// goroutines that should be used to process incoming streams. Setting this to
// zero (default) will disable workers and spawn a new goroutine for each
// stream.
//
// # Experimental
//
// Notice: This API is EXPERIMENTAL and may be changed or removed in a
// later release.
func NumStreamWorkers(numServerWorkers uint32) ServerOption {
	// TODO: If/when this API gets stabilized (i.e. stream workers become the
	// only way streams are processed), change the behavior of the zero value to
	// a sane default. Preliminary experiments suggest that a value equal to the
	// number of CPUs available is most performant; requires thorough testing.
	return newFuncServerOption(func(o *serverOptions) {
		o.numServerWorkers = numServerWorkers
	})
}

// serverWorkerResetThreshold defines how often the stack must be reset. Every
// N requests, by spawning a new goroutine in its place, a worker can reset its
// stack so that large stacks don't live in memory forever. 2^16 should allow
// each goroutine stack to live for at least a few seconds in a typical
// workload (assuming a QPS of a few thousand requests/sec).
const serverWorkerResetThreshold = 1 << 16

// serverWorker blocks on a *transport.Stream channel forever and waits for
// data to be fed by serveStreams. This allows different requests to be
// processed by the same goroutine, removing the need for expensive stack
// re-allocations (see the runtime.morestack problem [1]).
//
// [1] https://github.com/golang/go/issues/18138
func (s *Server) serverWorker(ch chan *serverWorkerData) {
	// To make sure all server workers don't reset at the same time, choose a
	// random number of iterations before resetting.
	threshold := serverWorkerResetThreshold + grpcrand.Intn(serverWorkerResetThreshold)
	for completed := 0; completed < threshold; completed++ {
		data, ok := <-ch
		if !ok {
			return
		}
		s.handleStream(data.st, data.stream, s.traceInfo(data.st, data.stream))
		data.wg.Done()
	}
	go s.serverWorker(ch)
}

// initServerWorkers creates worker goroutines and channels to process incoming
// connections to reduce the time spent overall on runtime.morestack.
func (s *Server) initServerWorkers() {
	s.serverWorkerChannels = make([]chan *serverWorkerData, s.opts.numServerWorkers)
	for i := uint32(0); i < s.opts.numServerWorkers; i++ {
		s.serverWorkerChannels[i] = make(chan *serverWorkerData)
		go s.serverWorker(s.serverWorkerChannels[i])
	}
}

func (s *Server) stopServerWorkers() {
	for i := uint32(0); i < s.opts.numServerWorkers; i++ {
		close(s.serverWorkerChannels[i])
	}
}

// NewServer creates a gRPC server which has no service registered and has not
// started to accept requests yet.
func NewServer(opt ...ServerOption) *Server {
	opts := defaultServerOptions
	for _, o := range extraServerOptions {
		o.apply(&opts)
	}
	for _, o := range opt {
		o.apply(&opts)
	}
	s := &Server{
		lis:      make(map[net.Listener]bool),
		opts:     opts,
		conns:    make(map[string]map[transport.ServerTransport]bool),
		services: make(map[string]*serviceInfo),
		quit:     grpcsync.NewEvent(),
		done:     grpcsync.NewEvent(),
		czData:   new(channelzData),
	}
	chainUnaryServerInterceptors(s)
	chainStreamServerInterceptors(s)
	s.cv = sync.NewCond(&s.mu)
	if EnableTracing {
		_, file, line, _ := runtime.Caller(1)
		s.events = trace.NewEventLog("grpc.Server", fmt.Sprintf("%s:%d", file, line))
	}

	if s.opts.numServerWorkers > 0 {
		s.initServerWorkers()
	}

	s.channelzID = channelz.RegisterServer(&channelzServer{s: s}, "")
	channelz.Info(logger, s.channelzID, "Server created")
	return s
}
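
// Typical usage pairs NewServer with service registration (usually via
// generated code) and Serve. A minimal sketch, where pb.RegisterEchoServer
// and echoServer are hypothetical generated/user types:
//
//	lis, err := net.Listen("tcp", ":50051")
//	if err != nil {
//		log.Fatalf("failed to listen: %v", err)
//	}
//	srv := grpc.NewServer()
//	pb.RegisterEchoServer(srv, &echoServer{})
//	if err := srv.Serve(lis); err != nil {
//		log.Fatalf("failed to serve: %v", err)
//	}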

// printf records an event in s's event log, unless s has been stopped.
// REQUIRES s.mu is held.
func (s *Server) printf(format string, a ...interface{}) {
	if s.events != nil {
		s.events.Printf(format, a...)
	}
}

// errorf records an error in s's event log, unless s has been stopped.
// REQUIRES s.mu is held.
func (s *Server) errorf(format string, a ...interface{}) {
	if s.events != nil {
		s.events.Errorf(format, a...)
	}
}

// ServiceRegistrar wraps a single method that supports service registration. It
// enables users to pass concrete types other than grpc.Server to the service
// registration methods exported by the IDL generated code.
type ServiceRegistrar interface {
	// RegisterService registers a service and its implementation to the
	// concrete type implementing this interface.  It may not be called
	// once the server has started serving.
	// desc describes the service and its methods and handlers. impl is the
	// service implementation which is passed to the method handlers.
	RegisterService(desc *ServiceDesc, impl interface{})
}

// RegisterService registers a service and its implementation to the gRPC
// server. It is called from the IDL generated code. This must be called before
// invoking Serve. If ss is non-nil (for legacy code), its type is checked to
// ensure it implements sd.HandlerType.
func (s *Server) RegisterService(sd *ServiceDesc, ss interface{}) {
	if ss != nil {
		ht := reflect.TypeOf(sd.HandlerType).Elem()
		st := reflect.TypeOf(ss)
		if !st.Implements(ht) {
			logger.Fatalf("grpc: Server.RegisterService found the handler of type %v that does not satisfy %v", st, ht)
		}
	}
	s.register(sd, ss)
}
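
// Outside generated code, RegisterService can also be called directly with a
// hand-built *ServiceDesc, e.g. (using the hypothetical echoServiceDesc from
// the ServiceDesc sketch above):
//
//	srv := grpc.NewServer()
//	srv.RegisterService(&echoServiceDesc, &echoServer{})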

func (s *Server) register(sd *ServiceDesc, ss interface{}) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.printf("RegisterService(%q)", sd.ServiceName)
	if s.serve {
		logger.Fatalf("grpc: Server.RegisterService after Server.Serve for %q", sd.ServiceName)
	}
	if _, ok := s.services[sd.ServiceName]; ok {
		logger.Fatalf("grpc: Server.RegisterService found duplicate service registration for %q", sd.ServiceName)
	}
	info := &serviceInfo{
		serviceImpl: ss,
		methods:     make(map[string]*MethodDesc),
		streams:     make(map[string]*StreamDesc),
		mdata:       sd.Metadata,
	}
	for i := range sd.Methods {
		d := &sd.Methods[i]
		info.methods[d.MethodName] = d
	}
	for i := range sd.Streams {
		d := &sd.Streams[i]
		info.streams[d.StreamName] = d
	}
	s.services[sd.ServiceName] = info
}

// MethodInfo contains the information of an RPC including its method name and type.
type MethodInfo struct {
	// Name is the method name only, without the service name or package name.
	Name string
	// IsClientStream indicates whether the RPC is a client streaming RPC.
	IsClientStream bool
	// IsServerStream indicates whether the RPC is a server streaming RPC.
	IsServerStream bool
}

// ServiceInfo contains unary RPC method info, streaming RPC method info and metadata for a service.
type ServiceInfo struct {
	Methods []MethodInfo
	// Metadata is the metadata specified in ServiceDesc when registering service.
	Metadata interface{}
}

// GetServiceInfo returns a map from service names to ServiceInfo.
// Service names include the package names, in the form of <package>.<service>.
func (s *Server) GetServiceInfo() map[string]ServiceInfo {
	ret := make(map[string]ServiceInfo)
	for n, srv := range s.services {
		methods := make([]MethodInfo, 0, len(srv.methods)+len(srv.streams))
		for m := range srv.methods {
			methods = append(methods, MethodInfo{
				Name:           m,
				IsClientStream: false,
				IsServerStream: false,
			})
		}
		for m, d := range srv.streams {
			methods = append(methods, MethodInfo{
				Name:           m,
				IsClientStream: d.ClientStreams,
				IsServerStream: d.ServerStreams,
			})
		}

		ret[n] = ServiceInfo{
			Methods:  methods,
			Metadata: srv.mdata,
		}
	}
	return ret
}

// ErrServerStopped indicates that the operation is now illegal because of
// the server being stopped.
var ErrServerStopped = errors.New("grpc: the server has been stopped")

type listenSocket struct {
	net.Listener
	channelzID *channelz.Identifier
}

func (l *listenSocket) ChannelzMetric() *channelz.SocketInternalMetric {
	return &channelz.SocketInternalMetric{
		SocketOptions: channelz.GetSocketOption(l.Listener),
		LocalAddr:     l.Listener.Addr(),
	}
}

func (l *listenSocket) Close() error {
	err := l.Listener.Close()
	channelz.RemoveEntry(l.channelzID)
	channelz.Info(logger, l.channelzID, "ListenSocket deleted")
	return err
}

// Serve accepts incoming connections on the listener lis, creating a new
// ServerTransport and service goroutine for each. The service goroutines
// read gRPC requests and then call the registered handlers to reply to them.
// Serve returns when lis.Accept fails with fatal errors.  lis will be closed when
// this method returns.
// Serve will return a non-nil error unless Stop or GracefulStop is called.
func (s *Server) Serve(lis net.Listener) error {
	s.mu.Lock()
	s.printf("serving")
	s.serve = true
	if s.lis == nil {
		// Serve called after Stop or GracefulStop.
		s.mu.Unlock()
		lis.Close()
		return ErrServerStopped
	}

	s.serveWG.Add(1)
	defer func() {
		s.serveWG.Done()
		if s.quit.HasFired() {
			// Stop or GracefulStop called; block until done and return nil.
			<-s.done.Done()
		}
	}()

	ls := &listenSocket{Listener: lis}
	s.lis[ls] = true

	defer func() {
		s.mu.Lock()
		if s.lis != nil && s.lis[ls] {
			ls.Close()
			delete(s.lis, ls)
		}
		s.mu.Unlock()
	}()

	var err error
	ls.channelzID, err = channelz.RegisterListenSocket(ls, s.channelzID, lis.Addr().String())
	if err != nil {
		s.mu.Unlock()
		return err
	}
	s.mu.Unlock()
	channelz.Info(logger, ls.channelzID, "ListenSocket created")

	var tempDelay time.Duration // how long to sleep on accept failure
	for {
		rawConn, err := lis.Accept()
		if err != nil {
			if ne, ok := err.(interface {
				Temporary() bool
			}); ok && ne.Temporary() {
				if tempDelay == 0 {
					tempDelay = 5 * time.Millisecond
				} else {
					tempDelay *= 2
				}
				if max := 1 * time.Second; tempDelay > max {
					tempDelay = max
				}
				s.mu.Lock()
				s.printf("Accept error: %v; retrying in %v", err, tempDelay)
				s.mu.Unlock()
				timer := time.NewTimer(tempDelay)
				select {
				case <-timer.C:
				case <-s.quit.Done():
					timer.Stop()
					return nil
				}
				continue
			}
			s.mu.Lock()
			s.printf("done serving; Accept = %v", err)
			s.mu.Unlock()

			if s.quit.HasFired() {
				return nil
			}
			return err
		}
		tempDelay = 0
		// Start a new goroutine to deal with rawConn so we don't stall this Accept
		// loop goroutine.
		//
		// Make sure we account for the goroutine so GracefulStop doesn't nil out
		// s.conns before this conn can be added.
		s.serveWG.Add(1)
		go func() {
			s.handleRawConn(lis.Addr().String(), rawConn)
			s.serveWG.Done()
		}()
	}
}

// handleRawConn forks a goroutine to handle a just-accepted connection that
// has not had any I/O performed on it yet.
func (s *Server) handleRawConn(lisAddr string, rawConn net.Conn) {
	if s.quit.HasFired() {
		rawConn.Close()
		return
	}
	rawConn.SetDeadline(time.Now().Add(s.opts.connectionTimeout))

	// Finish handshaking (HTTP2)
	st := s.newHTTP2Transport(rawConn)
	rawConn.SetDeadline(time.Time{})
	if st == nil {
		return
	}

	if !s.addConn(lisAddr, st) {
		return
	}
	go func() {
		s.serveStreams(st)
		s.removeConn(lisAddr, st)
	}()
}

func (s *Server) drainServerTransports(addr string) {
	s.mu.Lock()
	conns := s.conns[addr]
	for st := range conns {
		st.Drain()
	}
	s.mu.Unlock()
}

// newHTTP2Transport sets up an HTTP/2 transport (using the
// gRPC http2 server transport in transport/http2_server.go).
func (s *Server) newHTTP2Transport(c net.Conn) transport.ServerTransport {
	config := &transport.ServerConfig{
		MaxStreams:            s.opts.maxConcurrentStreams,
		ConnectionTimeout:     s.opts.connectionTimeout,
		Credentials:           s.opts.creds,
		InTapHandle:           s.opts.inTapHandle,
		StatsHandlers:         s.opts.statsHandlers,
		KeepaliveParams:       s.opts.keepaliveParams,
		KeepalivePolicy:       s.opts.keepalivePolicy,
		InitialWindowSize:     s.opts.initialWindowSize,
		InitialConnWindowSize: s.opts.initialConnWindowSize,
		WriteBufferSize:       s.opts.writeBufferSize,
		ReadBufferSize:        s.opts.readBufferSize,
		ChannelzParentID:      s.channelzID,
		MaxHeaderListSize:     s.opts.maxHeaderListSize,
		HeaderTableSize:       s.opts.headerTableSize,
	}
	st, err := transport.NewServerTransport(c, config)
	if err != nil {
		s.mu.Lock()
		s.errorf("NewServerTransport(%q) failed: %v", c.RemoteAddr(), err)
		s.mu.Unlock()
		// ErrConnDispatched means that the connection was dispatched away from
		// gRPC; those connections should be left open.
		if err != credentials.ErrConnDispatched {
			// Don't log on ErrConnDispatched and io.EOF to prevent log spam.
			if err != io.EOF {
				channelz.Info(logger, s.channelzID, "grpc: Server.Serve failed to create ServerTransport: ", err)
			}
			c.Close()
		}
		return nil
	}

	return st
}

func (s *Server) serveStreams(st transport.ServerTransport) {
	defer st.Close(errors.New("finished serving streams for the server transport"))
	var wg sync.WaitGroup

	var roundRobinCounter uint32
	st.HandleStreams(func(stream *transport.Stream) {
		wg.Add(1)
		if s.opts.numServerWorkers > 0 {
			data := &serverWorkerData{st: st, wg: &wg, stream: stream}
			select {
			case s.serverWorkerChannels[atomic.AddUint32(&roundRobinCounter, 1)%s.opts.numServerWorkers] <- data:
			default:
				// If all stream workers are busy, fallback to the default code path.
				go func() {
					s.handleStream(st, stream, s.traceInfo(st, stream))
					wg.Done()
				}()
			}
		} else {
			go func() {
				defer wg.Done()
				s.handleStream(st, stream, s.traceInfo(st, stream))
			}()
		}
	}, func(ctx context.Context, method string) context.Context {
		if !EnableTracing {
			return ctx
		}
		tr := trace.New("grpc.Recv."+methodFamily(method), method)
		return trace.NewContext(ctx, tr)
	})
	wg.Wait()
}

var _ http.Handler = (*Server)(nil)

// ServeHTTP implements the Go standard library's http.Handler
// interface by responding to the gRPC request r, by looking up
// the requested gRPC method in the gRPC server s.
//
// The provided HTTP request must have arrived on an HTTP/2
// connection. When using the Go standard library's server,
// practically this means that the Request must also have arrived
// over TLS.
//
// To share one port (such as 443 for https) between gRPC and an
// existing http.Handler, use a root http.Handler such as:
//
//	if r.ProtoMajor == 2 && strings.HasPrefix(
//		r.Header.Get("Content-Type"), "application/grpc") {
//		grpcServer.ServeHTTP(w, r)
//	} else {
//		yourMux.ServeHTTP(w, r)
//	}
//
// Note that ServeHTTP uses Go's HTTP/2 server implementation which is totally
// separate from grpc-go's HTTP/2 server. Performance and features may vary
// between the two paths. ServeHTTP does not support some gRPC features
// available through grpc-go's HTTP/2 server.
//
// # Experimental
//
// Notice: This API is EXPERIMENTAL and may be changed or removed in a
// later release.
func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	st, err := transport.NewServerHandlerTransport(w, r, s.opts.statsHandlers)
	if err != nil {
		// Errors returned from transport.NewServerHandlerTransport have
		// already been written to w.
		return
	}
	if !s.addConn(listenerAddressForServeHTTP, st) {
		return
	}
	defer s.removeConn(listenerAddressForServeHTTP, st)
	s.serveStreams(st)
}
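
// As a fuller sketch of the port-sharing pattern from the comment above, the
// handler path is typically wired through an http.Server with TLS; all names
// and file paths here are illustrative:
//
//	mixedHandler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
//		if r.ProtoMajor == 2 && strings.HasPrefix(r.Header.Get("Content-Type"), "application/grpc") {
//			grpcServer.ServeHTTP(w, r)
//			return
//		}
//		httpMux.ServeHTTP(w, r)
//	})
//	httpServer := &http.Server{Addr: ":443", Handler: mixedHandler}
//	log.Fatal(httpServer.ListenAndServeTLS("cert.pem", "key.pem"))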

// traceInfo returns a traceInfo and associates it with stream, if tracing is enabled.
// If tracing is not enabled, it returns nil.
func (s *Server) traceInfo(st transport.ServerTransport, stream *transport.Stream) (trInfo *traceInfo) {
	if !EnableTracing {
		return nil
	}
	tr, ok := trace.FromContext(stream.Context())
	if !ok {
		return nil
	}

	trInfo = &traceInfo{
		tr: tr,
		firstLine: firstLine{
			client:     false,
			remoteAddr: st.RemoteAddr(),
		},
	}
	if dl, ok := stream.Context().Deadline(); ok {
		trInfo.firstLine.deadline = time.Until(dl)
	}
	return trInfo
}

func (s *Server) addConn(addr string, st transport.ServerTransport) bool {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.conns == nil {
		st.Close(errors.New("Server.addConn called when server has already been stopped"))
		return false
	}
	if s.drain {
		// Transport added after we drained our existing conns: drain it
		// immediately.
		st.Drain()
	}

	if s.conns[addr] == nil {
		// Create a map entry if this is the first connection on this listener.
		s.conns[addr] = make(map[transport.ServerTransport]bool)
	}
	s.conns[addr][st] = true
	return true
}

func (s *Server) removeConn(addr string, st transport.ServerTransport) {
	s.mu.Lock()
	defer s.mu.Unlock()

	conns := s.conns[addr]
	if conns != nil {
		delete(conns, st)
		if len(conns) == 0 {
			// If the last connection for this address is being removed, also
			// remove the map entry corresponding to the address. This is used
			// in GracefulStop() when waiting for all connections to be closed.
			delete(s.conns, addr)
		}
		s.cv.Broadcast()
	}
}

func (s *Server) channelzMetric() *channelz.ServerInternalMetric {
	return &channelz.ServerInternalMetric{
		CallsStarted:             atomic.LoadInt64(&s.czData.callsStarted),
		CallsSucceeded:           atomic.LoadInt64(&s.czData.callsSucceeded),
		CallsFailed:              atomic.LoadInt64(&s.czData.callsFailed),
		LastCallStartedTimestamp: time.Unix(0, atomic.LoadInt64(&s.czData.lastCallStartedTime)),
	}
}

func (s *Server) incrCallsStarted() {
	atomic.AddInt64(&s.czData.callsStarted, 1)
	atomic.StoreInt64(&s.czData.lastCallStartedTime, time.Now().UnixNano())
}

func (s *Server) incrCallsSucceeded() {
	atomic.AddInt64(&s.czData.callsSucceeded, 1)
}

func (s *Server) incrCallsFailed() {
	atomic.AddInt64(&s.czData.callsFailed, 1)
}

func (s *Server) sendResponse(t transport.ServerTransport, stream *transport.Stream, msg interface{}, cp Compressor, opts *transport.Options, comp encoding.Compressor) error {
	data, err := encode(s.getCodec(stream.ContentSubtype()), msg)
	if err != nil {
		channelz.Error(logger, s.channelzID, "grpc: server failed to encode response: ", err)
		return err
	}
	compData, err := compress(data, cp, comp)
	if err != nil {
		channelz.Error(logger, s.channelzID, "grpc: server failed to compress response: ", err)
		return err
	}
	hdr, payload := msgHeader(data, compData)
	// TODO(dfawley): should we be checking len(data) instead?
	if len(payload) > s.opts.maxSendMessageSize {
		return status.Errorf(codes.ResourceExhausted, "grpc: trying to send message larger than max (%d vs. %d)", len(payload), s.opts.maxSendMessageSize)
	}
	err = t.Write(stream, hdr, payload, opts)
	if err == nil {
		for _, sh := range s.opts.statsHandlers {
			sh.HandleRPC(stream.Context(), outPayload(false, msg, data, payload, time.Now()))
		}
	}
	return err
}

// chainUnaryServerInterceptors chains all unary server interceptors into one.
func chainUnaryServerInterceptors(s *Server) {
	// Prepend opts.unaryInt to the chaining interceptors if it exists, since unaryInt will
	// be executed before any other chained interceptors.
	interceptors := s.opts.chainUnaryInts
	if s.opts.unaryInt != nil {
		interceptors = append([]UnaryServerInterceptor{s.opts.unaryInt}, s.opts.chainUnaryInts...)
	}

	var chainedInt UnaryServerInterceptor
	if len(interceptors) == 0 {
		chainedInt = nil
	} else if len(interceptors) == 1 {
		chainedInt = interceptors[0]
	} else {
		chainedInt = chainUnaryInterceptors(interceptors)
	}

	s.opts.unaryInt = chainedInt
}

func chainUnaryInterceptors(interceptors []UnaryServerInterceptor) UnaryServerInterceptor {
	return func(ctx context.Context, req interface{}, info *UnaryServerInfo, handler UnaryHandler) (interface{}, error) {
		return interceptors[0](ctx, req, info, getChainUnaryHandler(interceptors, 0, info, handler))
	}
}

func getChainUnaryHandler(interceptors []UnaryServerInterceptor, curr int, info *UnaryServerInfo, finalHandler UnaryHandler) UnaryHandler {
	if curr == len(interceptors)-1 {
		return finalHandler
	}
	return func(ctx context.Context, req interface{}) (interface{}, error) {
		return interceptors[curr+1](ctx, req, info, getChainUnaryHandler(interceptors, curr+1, info, finalHandler))
	}
}

func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.Stream, info *serviceInfo, md *MethodDesc, trInfo *traceInfo) (err error) {
	shs := s.opts.statsHandlers
	if len(shs) != 0 || trInfo != nil || channelz.IsOn() {
		if channelz.IsOn() {
			s.incrCallsStarted()
		}
		var statsBegin *stats.Begin
		for _, sh := range shs {
			beginTime := time.Now()
			statsBegin = &stats.Begin{
				BeginTime:      beginTime,
				IsClientStream: false,
				IsServerStream: false,
			}
			sh.HandleRPC(stream.Context(), statsBegin)
		}
		if trInfo != nil {
			trInfo.tr.LazyLog(&trInfo.firstLine, false)
		}
		// The deferred error handling for tracing, stats handler and channelz are
		// combined into one function to reduce stack usage -- a defer takes ~56-64
		// bytes on the stack, so overflowing the stack will require a stack
		// re-allocation, which is expensive.
		//
		// To maintain behavior similar to separate deferred statements, statements
		// should be executed in the reverse order. That is, tracing first, stats
		// handler second, and channelz last. Note that panics *within* defers will
		// lead to different behavior, but that's an acceptable compromise; that
		// would be undefined behavior territory anyway.
		defer func() {
			if trInfo != nil {
				if err != nil && err != io.EOF {
					trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
					trInfo.tr.SetError()
				}
				trInfo.tr.Finish()
			}

			for _, sh := range shs {
				end := &stats.End{
					BeginTime: statsBegin.BeginTime,
					EndTime:   time.Now(),
				}
				if err != nil && err != io.EOF {
					end.Error = toRPCErr(err)
				}
				sh.HandleRPC(stream.Context(), end)
			}

			if channelz.IsOn() {
				if err != nil && err != io.EOF {
					s.incrCallsFailed()
				} else {
					s.incrCallsSucceeded()
				}
			}
		}()
	}
	var binlogs []binarylog.MethodLogger
	if ml := binarylog.GetMethodLogger(stream.Method()); ml != nil {
		binlogs = append(binlogs, ml)
	}
	if s.opts.binaryLogger != nil {
		if ml := s.opts.binaryLogger.GetMethodLogger(stream.Method()); ml != nil {
			binlogs = append(binlogs, ml)
		}
	}
	if len(binlogs) != 0 {
		ctx := stream.Context()
		md, _ := metadata.FromIncomingContext(ctx)
		logEntry := &binarylog.ClientHeader{
			Header:     md,
			MethodName: stream.Method(),
			PeerAddr:   nil,
		}
		if deadline, ok := ctx.Deadline(); ok {
			logEntry.Timeout = time.Until(deadline)
			if logEntry.Timeout < 0 {
				logEntry.Timeout = 0
			}
		}
		if a := md[":authority"]; len(a) > 0 {
			logEntry.Authority = a[0]
		}
		if peer, ok := peer.FromContext(ctx); ok {
			logEntry.PeerAddr = peer.Addr
		}
		for _, binlog := range binlogs {
			binlog.Log(logEntry)
		}
	}

	// comp and cp are used for compression.  decomp and dc are used for
	// decompression.  If comp and decomp are both set, they are the same;
	// however they are kept separate to ensure that at most one of the
	// compressor/decompressor variable pairs are set for use later.
	var comp, decomp encoding.Compressor
	var cp Compressor
	var dc Decompressor

	// If dc is set and matches the stream's compression, use it.  Otherwise, try
	// to find a matching registered compressor for decomp.
	if rc := stream.RecvCompress(); s.opts.dc != nil && s.opts.dc.Type() == rc {
		dc = s.opts.dc
	} else if rc != "" && rc != encoding.Identity {
		decomp = encoding.GetCompressor(rc)
		if decomp == nil {
			st := status.Newf(codes.Unimplemented, "grpc: Decompressor is not installed for grpc-encoding %q", rc)
			t.WriteStatus(stream, st)
			return st.Err()
		}
	}

	// If cp is set, use it.  Otherwise, attempt to compress the response using
	// the incoming message compression method.
	//
	// NOTE: this needs to be ahead of all handling, https://github.com/grpc/grpc-go/issues/686.
	if s.opts.cp != nil {
		cp = s.opts.cp
		stream.SetSendCompress(cp.Type())
	} else if rc := stream.RecvCompress(); rc != "" && rc != encoding.Identity {
		// Legacy compressor not specified; attempt to respond with same encoding.
		comp = encoding.GetCompressor(rc)
		if comp != nil {
			stream.SetSendCompress(rc)
		}
	}

	var payInfo *payloadInfo
	if len(shs) != 0 || len(binlogs) != 0 {
		payInfo = &payloadInfo{}
	}
	d, err := recvAndDecompress(&parser{r: stream}, stream, dc, s.opts.maxReceiveMessageSize, payInfo, decomp)
	if err != nil {
		if e := t.WriteStatus(stream, status.Convert(err)); e != nil {
			channelz.Warningf(logger, s.channelzID, "grpc: Server.processUnaryRPC failed to write status: %v", e)
		}
		return err
	}
	if channelz.IsOn() {
		t.IncrMsgRecv()
	}
	df := func(v interface{}) error {
		if err := s.getCodec(stream.ContentSubtype()).Unmarshal(d, v); err != nil {
			return status.Errorf(codes.Internal, "grpc: error unmarshalling request: %v", err)
		}
		for _, sh := range shs {
			sh.HandleRPC(stream.Context(), &stats.InPayload{
				RecvTime:   time.Now(),
				Payload:    v,
				WireLength: payInfo.wireLength + headerLen,
				Data:       d,
				Length:     len(d),
			})
		}
		if len(binlogs) != 0 {
			cm := &binarylog.ClientMessage{
				Message: d,
			}
			for _, binlog := range binlogs {
				binlog.Log(cm)
			}
		}
		if trInfo != nil {
			trInfo.tr.LazyLog(&payload{sent: false, msg: v}, true)
		}
		return nil
	}
	ctx := NewContextWithServerTransportStream(stream.Context(), stream)
	reply, appErr := md.Handler(info.serviceImpl, ctx, df, s.opts.unaryInt)
	if appErr != nil {
		appStatus, ok := status.FromError(appErr)
		if !ok {
			// Convert non-status application error to a status error with code
			// Unknown, but handle context errors specifically.
			appStatus = status.FromContextError(appErr)
			appErr = appStatus.Err()
		}
		if trInfo != nil {
			trInfo.tr.LazyLog(stringer(appStatus.Message()), true)
			trInfo.tr.SetError()
		}
		if e := t.WriteStatus(stream, appStatus); e != nil {
			channelz.Warningf(logger, s.channelzID, "grpc: Server.processUnaryRPC failed to write status: %v", e)
		}
		if len(binlogs) != 0 {
			if h, _ := stream.Header(); h.Len() > 0 {
				// Only log serverHeader if there was header. Otherwise it can
				// be trailer only.
				sh := &binarylog.ServerHeader{
					Header: h,
				}
				for _, binlog := range binlogs {
					binlog.Log(sh)
				}
			}
			st := &binarylog.ServerTrailer{
				Trailer: stream.Trailer(),
				Err:     appErr,
			}
			for _, binlog := range binlogs {
				binlog.Log(st)
			}
		}
		return appErr
	}
	if trInfo != nil {
		trInfo.tr.LazyLog(stringer("OK"), false)
	}
	opts := &transport.Options{Last: true}

	if err := s.sendResponse(t, stream, reply, cp, opts, comp); err != nil {
		if err == io.EOF {
			// The entire stream is done (for unary RPC only).
			return err
		}
		if sts, ok := status.FromError(err); ok {
			if e := t.WriteStatus(stream, sts); e != nil {
				channelz.Warningf(logger, s.channelzID, "grpc: Server.processUnaryRPC failed to write status: %v", e)
			}
		} else {
			switch st := err.(type) {
			case transport.ConnectionError:
				// Nothing to do here.
			default:
				panic(fmt.Sprintf("grpc: Unexpected error (%T) from sendResponse: %v", st, st))
			}
		}
		if len(binlogs) != 0 {
			h, _ := stream.Header()
			sh := &binarylog.ServerHeader{
				Header: h,
			}
			st := &binarylog.ServerTrailer{
				Trailer: stream.Trailer(),
				Err:     appErr,
			}
			for _, binlog := range binlogs {
				binlog.Log(sh)
				binlog.Log(st)
			}
		}
		return err
	}
	if len(binlogs) != 0 {
		h, _ := stream.Header()
		sh := &binarylog.ServerHeader{
			Header: h,
		}
		sm := &binarylog.ServerMessage{
			Message: reply,
		}
		for _, binlog := range binlogs {
			binlog.Log(sh)
			binlog.Log(sm)
		}
	}
	if channelz.IsOn() {
		t.IncrMsgSent()
	}
	if trInfo != nil {
		trInfo.tr.LazyLog(&payload{sent: true, msg: reply}, true)
	}
	// TODO: Should we be logging if writing status failed here, like above?
	// Should the logging be in WriteStatus?  Should we ignore the WriteStatus
	// error or allow the stats handler to see it?
	err = t.WriteStatus(stream, statusOK)
	if len(binlogs) != 0 {
		st := &binarylog.ServerTrailer{
			Trailer: stream.Trailer(),
			Err:     err,
		}
		for _, binlog := range binlogs {
			binlog.Log(st)
		}
	}
	return err
}

// chainStreamServerInterceptors chains all stream server interceptors into one.
func chainStreamServerInterceptors(s *Server) {
	// Prepend opts.streamInt to the chaining interceptors if it exists, since streamInt will
	// be executed before any other chained interceptors.
	interceptors := s.opts.chainStreamInts
	if s.opts.streamInt != nil {
		interceptors = append([]StreamServerInterceptor{s.opts.streamInt}, s.opts.chainStreamInts...)
	}

	var chainedInt StreamServerInterceptor
	if len(interceptors) == 0 {
		chainedInt = nil
	} else if len(interceptors) == 1 {
		chainedInt = interceptors[0]
	} else {
		chainedInt = chainStreamInterceptors(interceptors)
	}

	s.opts.streamInt = chainedInt
}

func chainStreamInterceptors(interceptors []StreamServerInterceptor) StreamServerInterceptor {
	return func(srv interface{}, ss ServerStream, info *StreamServerInfo, handler StreamHandler) error {
		return interceptors[0](srv, ss, info, getChainStreamHandler(interceptors, 0, info, handler))
	}
}

func getChainStreamHandler(interceptors []StreamServerInterceptor, curr int, info *StreamServerInfo, finalHandler StreamHandler) StreamHandler {
	if curr == len(interceptors)-1 {
		return finalHandler
	}
	return func(srv interface{}, stream ServerStream) error {
		return interceptors[curr+1](srv, stream, info, getChainStreamHandler(interceptors, curr+1, info, finalHandler))
	}
}

func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transport.Stream, info *serviceInfo, sd *StreamDesc, trInfo *traceInfo) (err error) {
	if channelz.IsOn() {
		s.incrCallsStarted()
	}
	shs := s.opts.statsHandlers
	var statsBegin *stats.Begin
	if len(shs) != 0 {
		beginTime := time.Now()
		statsBegin = &stats.Begin{
			BeginTime:      beginTime,
			IsClientStream: sd.ClientStreams,
			IsServerStream: sd.ServerStreams,
		}
		for _, sh := range shs {
			sh.HandleRPC(stream.Context(), statsBegin)
		}
	}
	ctx := NewContextWithServerTransportStream(stream.Context(), stream)
	ss := &serverStream{
		ctx:                   ctx,
		t:                     t,
		s:                     stream,
		p:                     &parser{r: stream},
		codec:                 s.getCodec(stream.ContentSubtype()),
		maxReceiveMessageSize: s.opts.maxReceiveMessageSize,
		maxSendMessageSize:    s.opts.maxSendMessageSize,
		trInfo:                trInfo,
		statsHandler:          shs,
	}

	if len(shs) != 0 || trInfo != nil || channelz.IsOn() {
		// See comment in processUnaryRPC on defers.
		defer func() {
			if trInfo != nil {
				ss.mu.Lock()
				if err != nil && err != io.EOF {
					ss.trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
					ss.trInfo.tr.SetError()
				}
				ss.trInfo.tr.Finish()
				ss.trInfo.tr = nil
				ss.mu.Unlock()
			}

			if len(shs) != 0 {
				end := &stats.End{
					BeginTime: statsBegin.BeginTime,
					EndTime:   time.Now(),
				}
				if err != nil && err != io.EOF {
					end.Error = toRPCErr(err)
				}
				for _, sh := range shs {
					sh.HandleRPC(stream.Context(), end)
				}
			}

			if channelz.IsOn() {
				if err != nil && err != io.EOF {
					s.incrCallsFailed()
				} else {
					s.incrCallsSucceeded()
				}
			}
		}()
	}

	if ml := binarylog.GetMethodLogger(stream.Method()); ml != nil {
		ss.binlogs = append(ss.binlogs, ml)
	}
	if s.opts.binaryLogger != nil {
		if ml := s.opts.binaryLogger.GetMethodLogger(stream.Method()); ml != nil {
			ss.binlogs = append(ss.binlogs, ml)
		}
	}
	if len(ss.binlogs) != 0 {
		md, _ := metadata.FromIncomingContext(ctx)
		logEntry := &binarylog.ClientHeader{
			Header:     md,
			MethodName: stream.Method(),
			PeerAddr:   nil,
		}
		if deadline, ok := ctx.Deadline(); ok {
			logEntry.Timeout = time.Until(deadline)
			if logEntry.Timeout < 0 {
				logEntry.Timeout = 0
			}
		}
		if a := md[":authority"]; len(a) > 0 {
			logEntry.Authority = a[0]
		}
		if peer, ok := peer.FromContext(ss.Context()); ok {
			logEntry.PeerAddr = peer.Addr
		}
		for _, binlog := range ss.binlogs {
			binlog.Log(logEntry)
		}
	}

	// If dc is set and matches the stream's compression, use it.  Otherwise, try
	// to find a matching registered compressor for decomp.
	if rc := stream.RecvCompress(); s.opts.dc != nil && s.opts.dc.Type() == rc {
		ss.dc = s.opts.dc
	} else if rc != "" && rc != encoding.Identity {
		ss.decomp = encoding.GetCompressor(rc)
		if ss.decomp == nil {
			st := status.Newf(codes.Unimplemented, "grpc: Decompressor is not installed for grpc-encoding %q", rc)
			t.WriteStatus(ss.s, st)
			return st.Err()
		}
	}

	// If cp is set, use it.  Otherwise, attempt to compress the response using
	// the incoming message compression method.
	//
	// NOTE: this needs to be ahead of all handling, https://github.com/grpc/grpc-go/issues/686.
	if s.opts.cp != nil {
		ss.cp = s.opts.cp
		stream.SetSendCompress(s.opts.cp.Type())
	} else if rc := stream.RecvCompress(); rc != "" && rc != encoding.Identity {
		// Legacy compressor not specified; attempt to respond with same encoding.
		ss.comp = encoding.GetCompressor(rc)
		if ss.comp != nil {
			stream.SetSendCompress(rc)
		}
	}

	ss.ctx = newContextWithRPCInfo(ss.ctx, false, ss.codec, ss.cp, ss.comp)

	if trInfo != nil {
		trInfo.tr.LazyLog(&trInfo.firstLine, false)
	}
	var appErr error
	var server interface{}
	if info != nil {
		server = info.serviceImpl
	}
	if s.opts.streamInt == nil {
		appErr = sd.Handler(server, ss)
	} else {
		info := &StreamServerInfo{
			FullMethod:     stream.Method(),
			IsClientStream: sd.ClientStreams,
			IsServerStream: sd.ServerStreams,
		}
		appErr = s.opts.streamInt(server, ss, info, sd.Handler)
	}
	if appErr != nil {
		appStatus, ok := status.FromError(appErr)
		if !ok {
			// Convert non-status application error to a status error with code
			// Unknown, but handle context errors specifically.
			appStatus = status.FromContextError(appErr)
			appErr = appStatus.Err()
		}
		if trInfo != nil {
			ss.mu.Lock()
			ss.trInfo.tr.LazyLog(stringer(appStatus.Message()), true)
			ss.trInfo.tr.SetError()
			ss.mu.Unlock()
		}
		t.WriteStatus(ss.s, appStatus)
		if len(ss.binlogs) != 0 {
			st := &binarylog.ServerTrailer{
				Trailer: ss.s.Trailer(),
				Err:     appErr,
			}
			for _, binlog := range ss.binlogs {
				binlog.Log(st)
			}
		}
		// TODO: Should we log an error from WriteStatus here and below?
		return appErr
	}
	if trInfo != nil {
		ss.mu.Lock()
		ss.trInfo.tr.LazyLog(stringer("OK"), false)
		ss.mu.Unlock()
	}
	err = t.WriteStatus(ss.s, statusOK)
	if len(ss.binlogs) != 0 {
		st := &binarylog.ServerTrailer{
			Trailer: ss.s.Trailer(),
			Err:     err,
		}
		for _, binlog := range ss.binlogs {
			binlog.Log(st)
		}
	}
	return err
}

func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Stream, trInfo *traceInfo) {
	sm := stream.Method()
	if sm != "" && sm[0] == '/' {
		sm = sm[1:]
	}
	pos := strings.LastIndex(sm, "/")
	if pos == -1 {
		if trInfo != nil {
			trInfo.tr.LazyLog(&fmtStringer{"Malformed method name %q", []interface{}{sm}}, true)
			trInfo.tr.SetError()
		}
		errDesc := fmt.Sprintf("malformed method name: %q", stream.Method())
		if err := t.WriteStatus(stream, status.New(codes.Unimplemented, errDesc)); err != nil {
			if trInfo != nil {
				trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
				trInfo.tr.SetError()
			}
			channelz.Warningf(logger, s.channelzID, "grpc: Server.handleStream failed to write status: %v", err)
		}
		if trInfo != nil {
			trInfo.tr.Finish()
		}
		return
	}
	service := sm[:pos]
	method := sm[pos+1:]

	srv, knownService := s.services[service]
	if knownService {
		if md, ok := srv.methods[method]; ok {
			s.processUnaryRPC(t, stream, srv, md, trInfo)
			return
		}
		if sd, ok := srv.streams[method]; ok {
			s.processStreamingRPC(t, stream, srv, sd, trInfo)
			return
		}
	}
	// Unknown service, or known service with an unknown method.
	if unknownDesc := s.opts.unknownStreamDesc; unknownDesc != nil {
		s.processStreamingRPC(t, stream, nil, unknownDesc, trInfo)
		return
	}
	var errDesc string
	if !knownService {
		errDesc = fmt.Sprintf("unknown service %v", service)
	} else {
		errDesc = fmt.Sprintf("unknown method %v for service %v", method, service)
	}
	if trInfo != nil {
		trInfo.tr.LazyPrintf("%s", errDesc)
		trInfo.tr.SetError()
	}
	if err := t.WriteStatus(stream, status.New(codes.Unimplemented, errDesc)); err != nil {
		if trInfo != nil {
			trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
			trInfo.tr.SetError()
		}
		channelz.Warningf(logger, s.channelzID, "grpc: Server.handleStream failed to write status: %v", err)
	}
	if trInfo != nil {
		trInfo.tr.Finish()
	}
}

// The key to save ServerTransportStream in the context.
type streamKey struct{}

// NewContextWithServerTransportStream creates a new context from ctx and
// attaches stream to it.
//
// # Experimental
//
// Notice: This API is EXPERIMENTAL and may be changed or removed in a
// later release.
func NewContextWithServerTransportStream(ctx context.Context, stream ServerTransportStream) context.Context {
	return context.WithValue(ctx, streamKey{}, stream)
}

// ServerTransportStream is a minimal interface that a transport stream must
// implement. This can be used to mock an actual transport stream for tests of
// handler code that use, for example, grpc.SetHeader (which requires some
// stream to be in context).
//
// See also NewContextWithServerTransportStream.
//
// # Experimental
//
// Notice: This type is EXPERIMENTAL and may be changed or removed in a
// later release.
type ServerTransportStream interface {
	Method() string
	SetHeader(md metadata.MD) error
	SendHeader(md metadata.MD) error
	SetTrailer(md metadata.MD) error
}
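
// For handler tests, a minimal mock satisfying this interface might look like
// the sketch below (type and field names are hypothetical):
//
//	type mockServerTransportStream struct {
//		method          string
//		header, trailer metadata.MD
//	}
//
//	func (m *mockServerTransportStream) Method() string { return m.method }
//	func (m *mockServerTransportStream) SetHeader(md metadata.MD) error {
//		m.header = metadata.Join(m.header, md)
//		return nil
//	}
//	func (m *mockServerTransportStream) SendHeader(md metadata.MD) error { return m.SetHeader(md) }
//	func (m *mockServerTransportStream) SetTrailer(md metadata.MD) error {
//		m.trailer = metadata.Join(m.trailer, md)
//		return nil
//	}
//
//	ctx := grpc.NewContextWithServerTransportStream(
//		context.Background(), &mockServerTransportStream{method: "/svc/Method"})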

// ServerTransportStreamFromContext returns the ServerTransportStream saved in
// ctx. Returns nil if the given context has no stream associated with it
// (which implies it is not an RPC invocation context).
//
// # Experimental
//
// Notice: This API is EXPERIMENTAL and may be changed or removed in a
// later release.
func ServerTransportStreamFromContext(ctx context.Context) ServerTransportStream {
	s, _ := ctx.Value(streamKey{}).(ServerTransportStream)
	return s
}

// Stop stops the gRPC server. It immediately closes all open
// connections and listeners.
// It cancels all active RPCs on the server side, and the corresponding
// pending RPCs on the client side will be notified by connection
// errors.
func (s *Server) Stop() {
	s.quit.Fire()

	defer func() {
		s.serveWG.Wait()
		s.done.Fire()
	}()

	s.channelzRemoveOnce.Do(func() { channelz.RemoveEntry(s.channelzID) })

	s.mu.Lock()
	listeners := s.lis
	s.lis = nil
	conns := s.conns
	s.conns = nil
	// interrupt GracefulStop if Stop and GracefulStop are called concurrently.
	s.cv.Broadcast()
	s.mu.Unlock()

	for lis := range listeners {
		lis.Close()
	}
	for _, cs := range conns {
		for st := range cs {
			st.Close(errors.New("Server.Stop called"))
		}
	}
	if s.opts.numServerWorkers > 0 {
		s.stopServerWorkers()
	}

	s.mu.Lock()
	if s.events != nil {
		s.events.Finish()
		s.events = nil
	}
	s.mu.Unlock()
}

// GracefulStop stops the gRPC server gracefully. It stops the server from
// accepting new connections and RPCs and blocks until all the pending RPCs are
// finished.
func (s *Server) GracefulStop() {
	s.quit.Fire()
	defer s.done.Fire()

	s.channelzRemoveOnce.Do(func() { channelz.RemoveEntry(s.channelzID) })
	s.mu.Lock()
	if s.conns == nil {
		s.mu.Unlock()
		return
	}

	for lis := range s.lis {
		lis.Close()
	}
	s.lis = nil
	if !s.drain {
		for _, conns := range s.conns {
			for st := range conns {
				st.Drain()
			}
		}
		s.drain = true
	}

	// Wait for serving threads to be ready to exit.  Only then can we be sure no
	// new conns will be created.
	s.mu.Unlock()
	s.serveWG.Wait()
	s.mu.Lock()

	for len(s.conns) != 0 {
		s.cv.Wait()
	}
	s.conns = nil
	if s.events != nil {
		s.events.Finish()
		s.events = nil
	}
	s.mu.Unlock()
}

// contentSubtype must be lowercase
// cannot return nil
func (s *Server) getCodec(contentSubtype string) baseCodec {
	if s.opts.codec != nil {
		return s.opts.codec
	}
	if contentSubtype == "" {
		return encoding.GetCodec(proto.Name)
	}
	codec := encoding.GetCodec(contentSubtype)
	if codec == nil {
		return encoding.GetCodec(proto.Name)
	}
	return codec
}

// SetHeader sets the header metadata to be sent from the server to the client.
// The context provided must be the context passed to the server's handler.
//
// Streaming RPCs should prefer the SetHeader method of the ServerStream.
//
// When called multiple times, all the provided metadata will be merged.  All
// the metadata will be sent out when one of the following happens:
//
//   - grpc.SendHeader is called, or for streaming handlers, stream.SendHeader.
//   - The first response message is sent.  For unary handlers, this occurs when
//     the handler returns; for streaming handlers, this can happen when stream's
//     SendMsg method is called.
//   - An RPC status is sent out (error or success).  This occurs when the handler
//     returns.
//
// SetHeader will fail if called after any of the events above.
//
// The error returned is compatible with the status package.  However, the
// status code will often not match the RPC status as seen by the client
// application, and therefore, should not be relied upon for this purpose.
func SetHeader(ctx context.Context, md metadata.MD) error {
	if md.Len() == 0 {
		return nil
	}
	stream := ServerTransportStreamFromContext(ctx)
	if stream == nil {
		return status.Errorf(codes.Internal, "grpc: failed to fetch the stream from the context %v", ctx)
	}
	return stream.SetHeader(md)
}
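
// Inside a unary handler this is typically used as in the sketch below; the
// metadata key/value and message types are illustrative:
//
//	func (s *server) UnaryEcho(ctx context.Context, req *pb.EchoRequest) (*pb.EchoResponse, error) {
//		if err := grpc.SetHeader(ctx, metadata.Pairs("x-request-id", "abc123")); err != nil {
//			return nil, err
//		}
//		return &pb.EchoResponse{Message: req.Message}, nil
//	}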

// SendHeader sends header metadata. It may be called at most once, and may not
// be called after any event that causes headers to be sent (see SetHeader for
// a complete list).  The provided md and headers set by SetHeader() will be
// sent.
//
// The error returned is compatible with the status package.  However, the
// status code will often not match the RPC status as seen by the client
// application, and therefore, should not be relied upon for this purpose.
func SendHeader(ctx context.Context, md metadata.MD) error {
	stream := ServerTransportStreamFromContext(ctx)
	if stream == nil {
		return status.Errorf(codes.Internal, "grpc: failed to fetch the stream from the context %v", ctx)
	}
	if err := stream.SendHeader(md); err != nil {
		return toRPCErr(err)
	}
	return nil
}

// SetTrailer sets the trailer metadata that will be sent when an RPC returns.
// When called more than once, all the provided metadata will be merged.
//
// The error returned is compatible with the status package.  However, the
// status code will often not match the RPC status as seen by the client
// application, and therefore, should not be relied upon for this purpose.
func SetTrailer(ctx context.Context, md metadata.MD) error {
	if md.Len() == 0 {
		return nil
	}
	stream := ServerTransportStreamFromContext(ctx)
	if stream == nil {
		return status.Errorf(codes.Internal, "grpc: failed to fetch the stream from the context %v", ctx)
	}
	return stream.SetTrailer(md)
}

// Method returns the method string for the server context.  The returned
// string is in the format of "/service/method".
func Method(ctx context.Context) (string, bool) {
	s := ServerTransportStreamFromContext(ctx)
	if s == nil {
		return "", false
	}
	return s.Method(), true
}
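
// For example, a logging interceptor might use Method to tag its output
// (sketch only):
//
//	func loggingInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
//		if m, ok := grpc.Method(ctx); ok {
//			log.Printf("handling %s", m)
//		}
//		return handler(ctx, req)
//	}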

type channelzServer struct {
	s *Server
}

func (c *channelzServer) ChannelzMetric() *channelz.ServerInternalMetric {
	return c.s.channelzMetric()
}