/*
 *
 * Copyright 2014 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

package grpc

import (
	"context"
	"errors"
	"fmt"
	"io"
	"math"
	"net"
	"net/http"
	"reflect"
	"runtime"
	"strings"
	"sync"
	"sync/atomic"
	"time"

	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/credentials"
	"google.golang.org/grpc/encoding"
	"google.golang.org/grpc/encoding/proto"
	estats "google.golang.org/grpc/experimental/stats"
	"google.golang.org/grpc/grpclog"
	"google.golang.org/grpc/internal"
	"google.golang.org/grpc/internal/binarylog"
	"google.golang.org/grpc/internal/channelz"
	"google.golang.org/grpc/internal/grpcsync"
	"google.golang.org/grpc/internal/grpcutil"
	istats "google.golang.org/grpc/internal/stats"
	"google.golang.org/grpc/internal/transport"
	"google.golang.org/grpc/keepalive"
	"google.golang.org/grpc/mem"
	"google.golang.org/grpc/metadata"
	"google.golang.org/grpc/peer"
	"google.golang.org/grpc/stats"
	"google.golang.org/grpc/status"
	"google.golang.org/grpc/tap"
)

const (
	defaultServerMaxReceiveMessageSize = 1024 * 1024 * 4
	defaultServerMaxSendMessageSize    = math.MaxInt32

	// Server transports are tracked in a map which is keyed on listener
	// address. For regular gRPC traffic, connections are accepted in Serve()
	// through a call to Accept(), and we use the actual listener address as key
	// when we add it to the map. But for connections received through
	// ServeHTTP(), we do not have a listener and hence use this dummy value.
	listenerAddressForServeHTTP = "listenerAddressForServeHTTP"
)

func init() {
	internal.GetServerCredentials = func(srv *Server) credentials.TransportCredentials {
		return srv.opts.creds
	}
	internal.IsRegisteredMethod = func(srv *Server, method string) bool {
		return srv.isRegisteredMethod(method)
	}
	internal.ServerFromContext = serverFromContext
	internal.AddGlobalServerOptions = func(opt ...ServerOption) {
		globalServerOptions = append(globalServerOptions, opt...)
	}
	internal.ClearGlobalServerOptions = func() {
		globalServerOptions = nil
	}
	internal.BinaryLogger = binaryLogger
	internal.JoinServerOptions = newJoinServerOption
	internal.BufferPool = bufferPool
	internal.MetricsRecorderForServer = func(srv *Server) estats.MetricsRecorder {
		return istats.NewMetricsRecorderList(srv.opts.statsHandlers)
	}
}

var statusOK = status.New(codes.OK, "")
var logger = grpclog.Component("core")

// MethodHandler is a function type that processes a unary RPC method call.
type MethodHandler func(srv any, ctx context.Context, dec func(any) error, interceptor UnaryServerInterceptor) (any, error)

// MethodDesc represents an RPC service's method specification.
type MethodDesc struct {
	MethodName string
	Handler    MethodHandler
}

// ServiceDesc represents an RPC service's specification.
type ServiceDesc struct {
	ServiceName string
	// The pointer to the service interface. Used to check whether the
	// user-provided implementation satisfies the interface requirements.
	HandlerType any
	Methods     []MethodDesc
	Streams     []StreamDesc
	Metadata    any
}

// serviceInfo wraps information about a service. It is very similar to
// ServiceDesc and is constructed from it for internal purposes.
type serviceInfo struct {
	// Contains the implementation for the methods in this service.
	serviceImpl any
	methods     map[string]*MethodDesc
	streams     map[string]*StreamDesc
	mdata       any
}

// Server is a gRPC server to serve RPC requests.
type Server struct {
	opts serverOptions

	mu  sync.Mutex // guards following
	lis map[net.Listener]bool
	// conns contains all active server transports. It is a map keyed on a
	// listener address with the value being the set of active transports
	// belonging to that listener.
	conns    map[string]map[transport.ServerTransport]bool
	serve    bool
	drain    bool
	cv       *sync.Cond              // signaled when connections close for GracefulStop
	services map[string]*serviceInfo // service name -> service info
	events   traceEventLog

	quit               *grpcsync.Event
	done               *grpcsync.Event
	channelzRemoveOnce sync.Once
	serveWG            sync.WaitGroup // counts active Serve goroutines for Stop/GracefulStop
	handlersWG         sync.WaitGroup // counts active method handler goroutines

	channelz *channelz.Server

	serverWorkerChannel      chan func()
	serverWorkerChannelClose func()
}

type serverOptions struct {
	creds                 credentials.TransportCredentials
	codec                 baseCodec
	cp                    Compressor
	dc                    Decompressor
	unaryInt              UnaryServerInterceptor
	streamInt             StreamServerInterceptor
	chainUnaryInts        []UnaryServerInterceptor
	chainStreamInts       []StreamServerInterceptor
	binaryLogger          binarylog.Logger
	inTapHandle           tap.ServerInHandle
	statsHandlers         []stats.Handler
	maxConcurrentStreams  uint32
	maxReceiveMessageSize int
	maxSendMessageSize    int
	unknownStreamDesc     *StreamDesc
	keepaliveParams       keepalive.ServerParameters
	keepalivePolicy       keepalive.EnforcementPolicy
	initialWindowSize     int32
	initialConnWindowSize int32
	writeBufferSize       int
	readBufferSize        int
	sharedWriteBuffer     bool
	connectionTimeout     time.Duration
	maxHeaderListSize     *uint32
	headerTableSize       *uint32
	numServerWorkers      uint32
	bufferPool            mem.BufferPool
	waitForHandlers       bool
	staticWindowSize      bool
}

var defaultServerOptions = serverOptions{
	maxConcurrentStreams:  math.MaxUint32,
	maxReceiveMessageSize: defaultServerMaxReceiveMessageSize,
	maxSendMessageSize:    defaultServerMaxSendMessageSize,
	connectionTimeout:     120 * time.Second,
	writeBufferSize:       defaultWriteBufSize,
	readBufferSize:        defaultReadBufSize,
	bufferPool:            mem.DefaultBufferPool(),
}
var globalServerOptions []ServerOption

// A ServerOption sets options such as credentials, codec and keepalive parameters, etc.
type ServerOption interface {
	apply(*serverOptions)
}

// EmptyServerOption does not alter the server configuration. It can be embedded
// in another structure to build custom server options.
//
// # Experimental
//
// Notice: This type is EXPERIMENTAL and may be changed or removed in a
// later release.
type EmptyServerOption struct{}

func (EmptyServerOption) apply(*serverOptions) {}

// funcServerOption wraps a function that modifies serverOptions into an
// implementation of the ServerOption interface.
type funcServerOption struct {
	f func(*serverOptions)
}

func (fdo *funcServerOption) apply(do *serverOptions) {
	fdo.f(do)
}

func newFuncServerOption(f func(*serverOptions)) *funcServerOption {
	return &funcServerOption{
		f: f,
	}
}

// joinServerOption provides a way to combine an arbitrary number of server
// options into one.
type joinServerOption struct {
	opts []ServerOption
}

func (mdo *joinServerOption) apply(do *serverOptions) {
	for _, opt := range mdo.opts {
		opt.apply(do)
	}
}

func newJoinServerOption(opts ...ServerOption) ServerOption {
	return &joinServerOption{opts: opts}
}

// SharedWriteBuffer allows reusing the per-connection transport write buffer.
// If this option is set to true, every connection will release the buffer after
// flushing the data on the wire.
//
// # Experimental
//
// Notice: This API is EXPERIMENTAL and may be changed or removed in a
// later release.
func SharedWriteBuffer(val bool) ServerOption {
	return newFuncServerOption(func(o *serverOptions) {
		o.sharedWriteBuffer = val
	})
}

// WriteBufferSize determines how much data can be batched before doing a write
// on the wire. The default value for this buffer is 32KB. Zero or negative
// values will disable the write buffer such that each write will be on the
// underlying connection. Note: A Send call may not directly translate to a
// write.
func WriteBufferSize(s int) ServerOption {
	return newFuncServerOption(func(o *serverOptions) {
		o.writeBufferSize = s
	})
}

// ReadBufferSize lets you set the size of the read buffer; this determines how
// much data can be read at most for one read syscall. The default value for
// this buffer is 32KB. Zero or negative values will disable the read buffer
// for a connection so the data framer can access the underlying conn directly.
func ReadBufferSize(s int) ServerOption {
	return newFuncServerOption(func(o *serverOptions) {
		o.readBufferSize = s
	})
}

// InitialWindowSize returns a ServerOption that sets the window size for a
// stream. The lower bound for window size is 64K and any value smaller than
// that will be ignored.
func InitialWindowSize(s int32) ServerOption {
	return newFuncServerOption(func(o *serverOptions) {
		o.initialWindowSize = s
		o.staticWindowSize = true
	})
}

// InitialConnWindowSize returns a ServerOption that sets the window size for a
// connection. The lower bound for window size is 64K and any value smaller
// than that will be ignored.
func InitialConnWindowSize(s int32) ServerOption {
	return newFuncServerOption(func(o *serverOptions) {
		o.initialConnWindowSize = s
		o.staticWindowSize = true
	})
}

// StaticStreamWindowSize returns a ServerOption to set the initial stream
// window size to the value provided and disables dynamic flow control.
// The lower bound for window size is 64K and any value smaller than that
// will be ignored.
func StaticStreamWindowSize(s int32) ServerOption {
	return newFuncServerOption(func(o *serverOptions) {
		o.initialWindowSize = s
		o.staticWindowSize = true
	})
}

// StaticConnWindowSize returns a ServerOption to set the initial connection
// window size to the value provided and disables dynamic flow control.
// The lower bound for window size is 64K and any value smaller than that
// will be ignored.
func StaticConnWindowSize(s int32) ServerOption {
	return newFuncServerOption(func(o *serverOptions) {
		o.initialConnWindowSize = s
		o.staticWindowSize = true
	})
}

// KeepaliveParams returns a ServerOption that sets keepalive and max-age parameters for the server.
func KeepaliveParams(kp keepalive.ServerParameters) ServerOption {
	if kp.Time > 0 && kp.Time < internal.KeepaliveMinServerPingTime {
		logger.Warning("Adjusting keepalive ping interval to minimum period of 1s")
		kp.Time = internal.KeepaliveMinServerPingTime
	}

	return newFuncServerOption(func(o *serverOptions) {
		o.keepaliveParams = kp
	})
}

// KeepaliveEnforcementPolicy returns a ServerOption that sets keepalive enforcement policy for the server.
func KeepaliveEnforcementPolicy(kep keepalive.EnforcementPolicy) ServerOption {
	return newFuncServerOption(func(o *serverOptions) {
		o.keepalivePolicy = kep
	})
}

// CustomCodec returns a ServerOption that sets a codec for message marshaling and unmarshaling.
//
// This will override any lookups by content-subtype for Codecs registered with RegisterCodec.
//
// Deprecated: register codecs using encoding.RegisterCodec. The server will
// automatically use registered codecs based on the incoming requests' headers.
// See also
// https://github.com/grpc/grpc-go/blob/master/Documentation/encoding.md#using-a-codec.
// Will be supported throughout 1.x.
func CustomCodec(codec Codec) ServerOption {
	return newFuncServerOption(func(o *serverOptions) {
		o.codec = newCodecV0Bridge(codec)
	})
}

// ForceServerCodec returns a ServerOption that sets a codec for message
// marshaling and unmarshaling.
//
// This will override any lookups by content-subtype for Codecs registered
// with RegisterCodec.
//
// See Content-Type on
// https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#requests for
// more details. Also see the documentation on RegisterCodec and
// CallContentSubtype for more details on the interaction between encoding.Codec
// and content-subtype.
//
// This function is provided for advanced users; prefer to register codecs
// using encoding.RegisterCodec.
// The server will automatically use registered codecs based on the incoming
// requests' headers. See also
// https://github.com/grpc/grpc-go/blob/master/Documentation/encoding.md#using-a-codec.
// Will be supported throughout 1.x.
//
// # Experimental
//
// Notice: This API is EXPERIMENTAL and may be changed or removed in a
// later release.
func ForceServerCodec(codec encoding.Codec) ServerOption {
	return newFuncServerOption(func(o *serverOptions) {
		o.codec = newCodecV1Bridge(codec)
	})
}

// ForceServerCodecV2 is the equivalent of ForceServerCodec, but for the new
// CodecV2 interface.
//
// Will be supported throughout 1.x.
//
// # Experimental
//
// Notice: This API is EXPERIMENTAL and may be changed or removed in a
// later release.
func ForceServerCodecV2(codecV2 encoding.CodecV2) ServerOption {
	return newFuncServerOption(func(o *serverOptions) {
		o.codec = codecV2
	})
}

// RPCCompressor returns a ServerOption that sets a compressor for outbound
// messages.  For backward compatibility, all outbound messages will be sent
// using this compressor, regardless of incoming message compression.  By
// default, server messages will be sent using the same compressor with which
// request messages were sent.
//
// Deprecated: use encoding.RegisterCompressor instead. Will be supported
// throughout 1.x.
func RPCCompressor(cp Compressor) ServerOption {
	return newFuncServerOption(func(o *serverOptions) {
		o.cp = cp
	})
}

// RPCDecompressor returns a ServerOption that sets a decompressor for inbound
// messages.  It has higher priority than decompressors registered via
// encoding.RegisterCompressor.
//
// Deprecated: use encoding.RegisterCompressor instead. Will be supported
// throughout 1.x.
func RPCDecompressor(dc Decompressor) ServerOption {
	return newFuncServerOption(func(o *serverOptions) {
		o.dc = dc
	})
}

// MaxMsgSize returns a ServerOption to set the max message size in bytes the server can receive.
// If this is not set, gRPC uses the default limit.
//
// Deprecated: use MaxRecvMsgSize instead. Will be supported throughout 1.x.
func MaxMsgSize(m int) ServerOption {
	return MaxRecvMsgSize(m)
}

// MaxRecvMsgSize returns a ServerOption to set the max message size in bytes the server can receive.
// If this is not set, gRPC uses the default 4MB.
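//
// A minimal usage sketch (the 8MB figure here is illustrative, not a
// recommended value):
//
//	srv := grpc.NewServer(grpc.MaxRecvMsgSize(8 * 1024 * 1024))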
func MaxRecvMsgSize(m int) ServerOption {
	return newFuncServerOption(func(o *serverOptions) {
		o.maxReceiveMessageSize = m
	})
}

// MaxSendMsgSize returns a ServerOption to set the max message size in bytes the server can send.
// If this is not set, gRPC uses the default `math.MaxInt32`.
func MaxSendMsgSize(m int) ServerOption {
	return newFuncServerOption(func(o *serverOptions) {
		o.maxSendMessageSize = m
	})
}

// MaxConcurrentStreams returns a ServerOption that will apply a limit on the number
// of concurrent streams to each ServerTransport.
func MaxConcurrentStreams(n uint32) ServerOption {
	if n == 0 {
		n = math.MaxUint32
	}
	return newFuncServerOption(func(o *serverOptions) {
		o.maxConcurrentStreams = n
	})
}

// Creds returns a ServerOption that sets credentials for server connections.
func Creds(c credentials.TransportCredentials) ServerOption {
	return newFuncServerOption(func(o *serverOptions) {
		o.creds = c
	})
}

// UnaryInterceptor returns a ServerOption that sets the UnaryServerInterceptor for the
// server. Only one unary interceptor can be installed. The construction of multiple
// interceptors (e.g., chaining) can be implemented at the caller.
func UnaryInterceptor(i UnaryServerInterceptor) ServerOption {
	return newFuncServerOption(func(o *serverOptions) {
		if o.unaryInt != nil {
			panic("The unary server interceptor was already set and may not be reset.")
		}
		o.unaryInt = i
	})
}

// ChainUnaryInterceptor returns a ServerOption that specifies the chained interceptor
// for unary RPCs. The first interceptor will be the outermost, while the last
// interceptor will be the innermost wrapper around the real call.
// All unary interceptors added by this method will be chained.
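//
// For example, in the following sketch (logging and auth are placeholders for
// caller-provided interceptors):
//
//	grpc.NewServer(grpc.ChainUnaryInterceptor(logging, auth))
//
// logging is invoked first, logging then calls auth, and auth calls the
// actual method handler.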
func ChainUnaryInterceptor(interceptors ...UnaryServerInterceptor) ServerOption {
	return newFuncServerOption(func(o *serverOptions) {
		o.chainUnaryInts = append(o.chainUnaryInts, interceptors...)
	})
}

// StreamInterceptor returns a ServerOption that sets the StreamServerInterceptor for the
// server. Only one stream interceptor can be installed.
func StreamInterceptor(i StreamServerInterceptor) ServerOption {
	return newFuncServerOption(func(o *serverOptions) {
		if o.streamInt != nil {
			panic("The stream server interceptor was already set and may not be reset.")
		}
		o.streamInt = i
	})
}

// ChainStreamInterceptor returns a ServerOption that specifies the chained interceptor
// for streaming RPCs. The first interceptor will be the outermost, while the
// last interceptor will be the innermost wrapper around the real call.
// All stream interceptors added by this method will be chained.
func ChainStreamInterceptor(interceptors ...StreamServerInterceptor) ServerOption {
	return newFuncServerOption(func(o *serverOptions) {
		o.chainStreamInts = append(o.chainStreamInts, interceptors...)
	})
}

// InTapHandle returns a ServerOption that sets the tap handle for all the
// server transports to be created. Only one can be installed.
//
// # Experimental
//
// Notice: This API is EXPERIMENTAL and may be changed or removed in a
// later release.
func InTapHandle(h tap.ServerInHandle) ServerOption {
	return newFuncServerOption(func(o *serverOptions) {
		if o.inTapHandle != nil {
			panic("The tap handle was already set and may not be reset.")
		}
		o.inTapHandle = h
	})
}

// StatsHandler returns a ServerOption that sets the stats handler for the server.
func StatsHandler(h stats.Handler) ServerOption {
	return newFuncServerOption(func(o *serverOptions) {
		if h == nil {
			logger.Error("ignoring nil parameter in grpc.StatsHandler ServerOption")
			// Do not allow a nil stats handler, which would otherwise cause
			// panics.
			return
		}
		o.statsHandlers = append(o.statsHandlers, h)
	})
}

// binaryLogger returns a ServerOption that can set the binary logger for the
// server.
func binaryLogger(bl binarylog.Logger) ServerOption {
	return newFuncServerOption(func(o *serverOptions) {
		o.binaryLogger = bl
	})
}

// UnknownServiceHandler returns a ServerOption that allows for adding a custom
// unknown service handler. The provided method is a bidi-streaming RPC service
// handler that will be invoked instead of returning the "unimplemented" gRPC
// error whenever a request is received for an unregistered service or method.
// The handling function and stream interceptor (if set) have full access to
// the ServerStream, including its Context.
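//
// A minimal sketch of a fallback handler (the status message is illustrative
// only):
//
//	srv := grpc.NewServer(grpc.UnknownServiceHandler(
//		func(srv any, stream grpc.ServerStream) error {
//			return status.Error(codes.Unimplemented, "no proxy route configured")
//		}))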
func UnknownServiceHandler(streamHandler StreamHandler) ServerOption {
	return newFuncServerOption(func(o *serverOptions) {
		o.unknownStreamDesc = &StreamDesc{
			StreamName: "unknown_service_handler",
			Handler:    streamHandler,
			// We need to assume that the users of the streamHandler will want to use both.
			ClientStreams: true,
			ServerStreams: true,
		}
	})
}

// ConnectionTimeout returns a ServerOption that sets the timeout for
// connection establishment (up to and including HTTP/2 handshaking) for all
// new connections.  If this is not set, the default is 120 seconds.  A zero or
// negative value will result in an immediate timeout.
//
// # Experimental
//
// Notice: This API is EXPERIMENTAL and may be changed or removed in a
// later release.
func ConnectionTimeout(d time.Duration) ServerOption {
	return newFuncServerOption(func(o *serverOptions) {
		o.connectionTimeout = d
	})
}

// MaxHeaderListSizeServerOption is a ServerOption that sets the max
// (uncompressed) size of header list that the server is prepared to accept.
type MaxHeaderListSizeServerOption struct {
	MaxHeaderListSize uint32
}

func (o MaxHeaderListSizeServerOption) apply(so *serverOptions) {
	so.maxHeaderListSize = &o.MaxHeaderListSize
}

// MaxHeaderListSize returns a ServerOption that sets the max (uncompressed) size
// of header list that the server is prepared to accept.
func MaxHeaderListSize(s uint32) ServerOption {
	return MaxHeaderListSizeServerOption{
		MaxHeaderListSize: s,
	}
}

// HeaderTableSize returns a ServerOption that sets the size of the dynamic
// header table for streams.
//
// # Experimental
//
// Notice: This API is EXPERIMENTAL and may be changed or removed in a
// later release.
func HeaderTableSize(s uint32) ServerOption {
	return newFuncServerOption(func(o *serverOptions) {
		o.headerTableSize = &s
	})
}

// NumStreamWorkers returns a ServerOption that sets the number of worker
// goroutines that should be used to process incoming streams. Setting this to
// zero (default) will disable workers and spawn a new goroutine for each
// stream.
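//
// A sketch following the preliminary guidance in the TODO inside this
// function, which suggests a worker count equal to the number of CPUs:
//
//	grpc.NumStreamWorkers(uint32(runtime.NumCPU()))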
//
// # Experimental
//
// Notice: This API is EXPERIMENTAL and may be changed or removed in a
// later release.
func NumStreamWorkers(numServerWorkers uint32) ServerOption {
	// TODO: If/when this API gets stabilized (i.e. stream workers become the
	// only way streams are processed), change the behavior of the zero value to
	// a sane default. Preliminary experiments suggest that a value equal to the
	// number of CPUs available is most performant; requires thorough testing.
	return newFuncServerOption(func(o *serverOptions) {
		o.numServerWorkers = numServerWorkers
	})
}

// WaitForHandlers causes Stop to wait until all outstanding method handlers
// have exited before returning.  If false, Stop will return as soon as all
// connections have closed, but method handlers may still be running. By
// default, Stop does not wait for method handlers to return.
//
// # Experimental
//
// Notice: This API is EXPERIMENTAL and may be changed or removed in a
// later release.
func WaitForHandlers(w bool) ServerOption {
	return newFuncServerOption(func(o *serverOptions) {
		o.waitForHandlers = w
	})
}

// bufferPool returns a ServerOption that sets the buffer pool the server uses
// for its message buffers.
func bufferPool(bufferPool mem.BufferPool) ServerOption {
	return newFuncServerOption(func(o *serverOptions) {
		o.bufferPool = bufferPool
	})
}

// serverWorkerResetThreshold defines how often the stack must be reset. Every
// N requests, a worker resets its stack by spawning a new goroutine in its
// place, so that large stacks don't live in memory forever. 2^16 should allow
// each goroutine stack to live for at least a few seconds in a typical
// workload (assuming a QPS of a few thousand requests/sec).
const serverWorkerResetThreshold = 1 << 16

// serverWorker blocks on a *transport.ServerStream channel forever and waits
// for data to be fed by serveStreams. This allows multiple requests to be
// processed by the same goroutine, removing the need for expensive stack
// re-allocations (see the runtime.morestack problem [1]).
//
// [1] https://github.com/golang/go/issues/18138
func (s *Server) serverWorker() {
	for completed := 0; completed < serverWorkerResetThreshold; completed++ {
		f, ok := <-s.serverWorkerChannel
		if !ok {
			return
		}
		f()
	}
	go s.serverWorker()
}

// initServerWorkers creates worker goroutines and a channel to process incoming
// connections to reduce the time spent overall on runtime.morestack.
func (s *Server) initServerWorkers() {
	s.serverWorkerChannel = make(chan func())
	s.serverWorkerChannelClose = sync.OnceFunc(func() {
		close(s.serverWorkerChannel)
	})
	for i := uint32(0); i < s.opts.numServerWorkers; i++ {
		go s.serverWorker()
	}
}

// NewServer creates a gRPC server which has no service registered and has not
// started to accept requests yet.
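//
// For example (a sketch; creds stands in for a caller-supplied
// TransportCredentials value):
//
//	s := grpc.NewServer(grpc.Creds(creds), grpc.MaxConcurrentStreams(100))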
func NewServer(opt ...ServerOption) *Server {
	opts := defaultServerOptions
	for _, o := range globalServerOptions {
		o.apply(&opts)
	}
	for _, o := range opt {
		o.apply(&opts)
	}
	s := &Server{
		lis:      make(map[net.Listener]bool),
		opts:     opts,
		conns:    make(map[string]map[transport.ServerTransport]bool),
		services: make(map[string]*serviceInfo),
		quit:     grpcsync.NewEvent(),
		done:     grpcsync.NewEvent(),
		channelz: channelz.RegisterServer(""),
	}
	chainUnaryServerInterceptors(s)
	chainStreamServerInterceptors(s)
	s.cv = sync.NewCond(&s.mu)
	if EnableTracing {
		_, file, line, _ := runtime.Caller(1)
		s.events = newTraceEventLog("grpc.Server", fmt.Sprintf("%s:%d", file, line))
	}

	if s.opts.numServerWorkers > 0 {
		s.initServerWorkers()
	}

	channelz.Info(logger, s.channelz, "Server created")
	return s
}

// printf records an event in s's event log, unless s has been stopped.
// REQUIRES s.mu is held.
func (s *Server) printf(format string, a ...any) {
	if s.events != nil {
		s.events.Printf(format, a...)
	}
}

// errorf records an error in s's event log, unless s has been stopped.
// REQUIRES s.mu is held.
func (s *Server) errorf(format string, a ...any) {
	if s.events != nil {
		s.events.Errorf(format, a...)
	}
}

// ServiceRegistrar wraps a single method that supports service registration. It
// enables users to pass concrete types other than grpc.Server to the service
// registration methods exported by the IDL generated code.
type ServiceRegistrar interface {
	// RegisterService registers a service and its implementation to the
	// concrete type implementing this interface.  It may not be called
	// once the server has started serving.
	// desc describes the service and its methods and handlers. impl is the
	// service implementation which is passed to the method handlers.
	RegisterService(desc *ServiceDesc, impl any)
}

// RegisterService registers a service and its implementation to the gRPC
// server. It is called from the IDL generated code. This must be called before
// invoking Serve. If ss is non-nil (for legacy code), its type is checked to
// ensure it implements sd.HandlerType.
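//
// Typically this is invoked indirectly through generated code, e.g. (pb and
// greeterServer are hypothetical names from a generated package):
//
//	pb.RegisterGreeterServer(s, &greeterServer{})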
func (s *Server) RegisterService(sd *ServiceDesc, ss any) {
	if ss != nil {
		ht := reflect.TypeOf(sd.HandlerType).Elem()
		st := reflect.TypeOf(ss)
		if !st.Implements(ht) {
			logger.Fatalf("grpc: Server.RegisterService found the handler of type %v that does not satisfy %v", st, ht)
		}
	}
	s.register(sd, ss)
}

func (s *Server) register(sd *ServiceDesc, ss any) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.printf("RegisterService(%q)", sd.ServiceName)
	if s.serve {
		logger.Fatalf("grpc: Server.RegisterService after Server.Serve for %q", sd.ServiceName)
	}
	if _, ok := s.services[sd.ServiceName]; ok {
		logger.Fatalf("grpc: Server.RegisterService found duplicate service registration for %q", sd.ServiceName)
	}
	info := &serviceInfo{
		serviceImpl: ss,
		methods:     make(map[string]*MethodDesc),
		streams:     make(map[string]*StreamDesc),
		mdata:       sd.Metadata,
	}
	for i := range sd.Methods {
		d := &sd.Methods[i]
		info.methods[d.MethodName] = d
	}
	for i := range sd.Streams {
		d := &sd.Streams[i]
		info.streams[d.StreamName] = d
	}
	s.services[sd.ServiceName] = info
}

// MethodInfo contains the information of an RPC including its method name and type.
type MethodInfo struct {
	// Name is the method name only, without the service name or package name.
	Name string
	// IsClientStream indicates whether the RPC is a client streaming RPC.
	IsClientStream bool
	// IsServerStream indicates whether the RPC is a server streaming RPC.
	IsServerStream bool
}

// ServiceInfo contains unary RPC method info, streaming RPC method info and metadata for a service.
type ServiceInfo struct {
	Methods []MethodInfo
	// Metadata is the metadata specified in ServiceDesc when registering service.
	Metadata any
}

// GetServiceInfo returns a map from service names to ServiceInfo.
// Service names include the package names, in the form of <package>.<service>.
func (s *Server) GetServiceInfo() map[string]ServiceInfo {
	ret := make(map[string]ServiceInfo)
	for n, srv := range s.services {
		methods := make([]MethodInfo, 0, len(srv.methods)+len(srv.streams))
		for m := range srv.methods {
			methods = append(methods, MethodInfo{
				Name:           m,
				IsClientStream: false,
				IsServerStream: false,
			})
		}
		for m, d := range srv.streams {
			methods = append(methods, MethodInfo{
				Name:           m,
				IsClientStream: d.ClientStreams,
				IsServerStream: d.ServerStreams,
			})
		}

		ret[n] = ServiceInfo{
			Methods:  methods,
			Metadata: srv.mdata,
		}
	}
	return ret
}

// ErrServerStopped indicates that the operation is now illegal because of
// the server being stopped.
var ErrServerStopped = errors.New("grpc: the server has been stopped")

type listenSocket struct {
	net.Listener
	channelz *channelz.Socket
}

func (l *listenSocket) Close() error {
	err := l.Listener.Close()
	channelz.RemoveEntry(l.channelz.ID)
	channelz.Info(logger, l.channelz, "ListenSocket deleted")
	return err
}

// Serve accepts incoming connections on the listener lis, creating a new
// ServerTransport and service goroutine for each. The service goroutines
// read gRPC requests and then call the registered handlers to reply to them.
// Serve returns when lis.Accept fails with fatal errors.  lis will be closed when
// this method returns.
// Serve will return a non-nil error unless Stop or GracefulStop is called.
//
// Note: All supported releases of Go (as of December 2023) override the OS
// defaults for TCP keepalive time and interval to 15s. To enable TCP keepalive
// with OS defaults for keepalive time and interval, callers need to do the
// following two things:
//   - pass a net.Listener created by calling the Listen method on a
//     net.ListenConfig with the `KeepAlive` field set to a negative value. This
//     will result in the Go standard library not overriding OS defaults for TCP
//     keepalive interval and time. But this will also result in the Go standard
//     library not enabling TCP keepalives by default.
//   - override the Accept method on the passed in net.Listener and set the
//     SO_KEEPALIVE socket option to enable TCP keepalives, with OS defaults.
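//
// For example, a listener with Go's TCP keepalive overrides disabled could be
// created as follows (a sketch; wrapping Accept to set SO_KEEPALIVE is left
// to the caller):
//
//	lc := net.ListenConfig{KeepAlive: -1}
//	lis, err := lc.Listen(context.Background(), "tcp", ":50051")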
func (s *Server) Serve(lis net.Listener) error {
	s.mu.Lock()
	s.printf("serving")
	s.serve = true
	if s.lis == nil {
		// Serve called after Stop or GracefulStop.
		s.mu.Unlock()
		lis.Close()
		return ErrServerStopped
	}

	s.serveWG.Add(1)
	defer func() {
		s.serveWG.Done()
		if s.quit.HasFired() {
			// Stop or GracefulStop called; block until done and return nil.
			<-s.done.Done()
		}
	}()

	ls := &listenSocket{
		Listener: lis,
		channelz: channelz.RegisterSocket(&channelz.Socket{
			SocketType:    channelz.SocketTypeListen,
			Parent:        s.channelz,
			RefName:       lis.Addr().String(),
			LocalAddr:     lis.Addr(),
			SocketOptions: channelz.GetSocketOption(lis)},
		),
	}
	s.lis[ls] = true

	defer func() {
		s.mu.Lock()
		if s.lis != nil && s.lis[ls] {
			ls.Close()
			delete(s.lis, ls)
		}
		s.mu.Unlock()
	}()

	s.mu.Unlock()
	channelz.Info(logger, ls.channelz, "ListenSocket created")

	var tempDelay time.Duration // how long to sleep on accept failure
	for {
		rawConn, err := lis.Accept()
		if err != nil {
			if ne, ok := err.(interface {
				Temporary() bool
			}); ok && ne.Temporary() {
				if tempDelay == 0 {
					tempDelay = 5 * time.Millisecond
				} else {
					tempDelay *= 2
				}
				if max := 1 * time.Second; tempDelay > max {
					tempDelay = max
				}
				s.mu.Lock()
				s.printf("Accept error: %v; retrying in %v", err, tempDelay)
				s.mu.Unlock()
				timer := time.NewTimer(tempDelay)
				select {
				case <-timer.C:
				case <-s.quit.Done():
					timer.Stop()
					return nil
				}
				continue
			}
			s.mu.Lock()
			s.printf("done serving; Accept = %v", err)
			s.mu.Unlock()

			if s.quit.HasFired() {
				return nil
			}
			return err
		}
		tempDelay = 0
		// Start a new goroutine to deal with rawConn so we don't stall this Accept
		// loop goroutine.
		//
		// Make sure we account for the goroutine so GracefulStop doesn't nil out
		// s.conns before this conn can be added.
		s.serveWG.Add(1)
		go func() {
			s.handleRawConn(lis.Addr().String(), rawConn)
			s.serveWG.Done()
		}()
	}
}

// handleRawConn forks a goroutine to handle a just-accepted connection that
// has not had any I/O performed on it yet.
func (s *Server) handleRawConn(lisAddr string, rawConn net.Conn) {
	if s.quit.HasFired() {
		rawConn.Close()
		return
	}
	rawConn.SetDeadline(time.Now().Add(s.opts.connectionTimeout))

	// Finish handshaking (HTTP2)
	st := s.newHTTP2Transport(rawConn)
	rawConn.SetDeadline(time.Time{})
	if st == nil {
		return
	}

	if cc, ok := rawConn.(interface {
		PassServerTransport(transport.ServerTransport)
	}); ok {
		cc.PassServerTransport(st)
	}

	if !s.addConn(lisAddr, st) {
		return
	}
	go func() {
		s.serveStreams(context.Background(), st, rawConn)
		s.removeConn(lisAddr, st)
	}()
}

// newHTTP2Transport sets up a http/2 transport (using the
// gRPC http2 server transport in transport/http2_server.go).
func (s *Server) newHTTP2Transport(c net.Conn) transport.ServerTransport {
	config := &transport.ServerConfig{
		MaxStreams:            s.opts.maxConcurrentStreams,
		ConnectionTimeout:     s.opts.connectionTimeout,
		Credentials:           s.opts.creds,
		InTapHandle:           s.opts.inTapHandle,
		StatsHandlers:         s.opts.statsHandlers,
		KeepaliveParams:       s.opts.keepaliveParams,
		KeepalivePolicy:       s.opts.keepalivePolicy,
		InitialWindowSize:     s.opts.initialWindowSize,
		InitialConnWindowSize: s.opts.initialConnWindowSize,
		WriteBufferSize:       s.opts.writeBufferSize,
		ReadBufferSize:        s.opts.readBufferSize,
		SharedWriteBuffer:     s.opts.sharedWriteBuffer,
		ChannelzParent:        s.channelz,
		MaxHeaderListSize:     s.opts.maxHeaderListSize,
		HeaderTableSize:       s.opts.headerTableSize,
		BufferPool:            s.opts.bufferPool,
		StaticWindowSize:      s.opts.staticWindowSize,
	}
	st, err := transport.NewServerTransport(c, config)
	if err != nil {
		s.mu.Lock()
		s.errorf("NewServerTransport(%q) failed: %v", c.RemoteAddr(), err)
		s.mu.Unlock()
		// ErrConnDispatched means that the connection was dispatched away from
		// gRPC; those connections should be left open.
		if err != credentials.ErrConnDispatched {
			// Don't log on ErrConnDispatched and io.EOF to prevent log spam.
			if err != io.EOF {
				channelz.Info(logger, s.channelz, "grpc: Server.Serve failed to create ServerTransport: ", err)
			}
			c.Close()
		}
		return nil
	}

	return st
}

func (s *Server) serveStreams(ctx context.Context, st transport.ServerTransport, rawConn net.Conn) {
	ctx = transport.SetConnection(ctx, rawConn)
	ctx = peer.NewContext(ctx, st.Peer())
	for _, sh := range s.opts.statsHandlers {
		ctx = sh.TagConn(ctx, &stats.ConnTagInfo{
			RemoteAddr: st.Peer().Addr,
			LocalAddr:  st.Peer().LocalAddr,
		})
		sh.HandleConn(ctx, &stats.ConnBegin{})
	}

	defer func() {
		st.Close(errors.New("finished serving streams for the server transport"))
		for _, sh := range s.opts.statsHandlers {
			sh.HandleConn(ctx, &stats.ConnEnd{})
		}
	}()

	streamQuota := newHandlerQuota(s.opts.maxConcurrentStreams)
	st.HandleStreams(ctx, func(stream *transport.ServerStream) {
		s.handlersWG.Add(1)
		streamQuota.acquire()
		f := func() {
			defer streamQuota.release()
			defer s.handlersWG.Done()
			s.handleStream(st, stream)
		}

		if s.opts.numServerWorkers > 0 {
			select {
			case s.serverWorkerChannel <- f:
				return
			default:
				// If all stream workers are busy, fallback to the default code path.
			}
		}
		go f()
	})
}

var _ http.Handler = (*Server)(nil)

// ServeHTTP implements the Go standard library's http.Handler
// interface by responding to the gRPC request r, by looking up
// the requested gRPC method in the gRPC server s.
//
// The provided HTTP request must have arrived on an HTTP/2
// connection. When using the Go standard library's server,
// practically this means that the Request must also have arrived
// over TLS.
//
// To share one port (such as 443 for https) between gRPC and an
// existing http.Handler, use a root http.Handler such as:
//
//	if r.ProtoMajor == 2 && strings.HasPrefix(
//		r.Header.Get("Content-Type"), "application/grpc") {
//		grpcServer.ServeHTTP(w, r)
//	} else {
//		yourMux.ServeHTTP(w, r)
//	}
//
// Note that ServeHTTP uses Go's HTTP/2 server implementation which is totally
// separate from grpc-go's HTTP/2 server. Performance and features may vary
// between the two paths. ServeHTTP does not support some gRPC features
// available through grpc-go's HTTP/2 server.
//
// # Experimental
//
// Notice: This API is EXPERIMENTAL and may be changed or removed in a
// later release.
func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	st, err := transport.NewServerHandlerTransport(w, r, s.opts.statsHandlers, s.opts.bufferPool)
	if err != nil {
		// Errors returned from transport.NewServerHandlerTransport have
		// already been written to w.
		return
	}
	if !s.addConn(listenerAddressForServeHTTP, st) {
		return
	}
	defer s.removeConn(listenerAddressForServeHTTP, st)
	s.serveStreams(r.Context(), st, nil)
}

func (s *Server) addConn(addr string, st transport.ServerTransport) bool {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.conns == nil {
		st.Close(errors.New("Server.addConn called when server has already been stopped"))
		return false
	}
	if s.drain {
		// Transport added after we drained our existing conns: drain it
		// immediately.
		st.Drain("")
	}

	if s.conns[addr] == nil {
		// Create a map entry if this is the first connection on this listener.
		s.conns[addr] = make(map[transport.ServerTransport]bool)
	}
	s.conns[addr][st] = true
	return true
}

func (s *Server) removeConn(addr string, st transport.ServerTransport) {
	s.mu.Lock()
	defer s.mu.Unlock()

	conns := s.conns[addr]
	if conns != nil {
		delete(conns, st)
		if len(conns) == 0 {
			// If the last connection for this address is being removed, also
			// remove the map entry corresponding to the address. This is used
			// in GracefulStop() when waiting for all connections to be closed.
			delete(s.conns, addr)
		}
		s.cv.Broadcast()
	}
}

func (s *Server) incrCallsStarted() {
	s.channelz.ServerMetrics.CallsStarted.Add(1)
	s.channelz.ServerMetrics.LastCallStartedTimestamp.Store(time.Now().UnixNano())
}

func (s *Server) incrCallsSucceeded() {
	s.channelz.ServerMetrics.CallsSucceeded.Add(1)
}

func (s *Server) incrCallsFailed() {
	s.channelz.ServerMetrics.CallsFailed.Add(1)
}

func (s *Server) sendResponse(ctx context.Context, stream *transport.ServerStream, msg any, cp Compressor, opts *transport.WriteOptions, comp encoding.Compressor) error {
	data, err := encode(s.getCodec(stream.ContentSubtype()), msg)
	if err != nil {
		channelz.Error(logger, s.channelz, "grpc: server failed to encode response: ", err)
		return err
	}

	compData, pf, err := compress(data, cp, comp, s.opts.bufferPool)
	if err != nil {
		data.Free()
		channelz.Error(logger, s.channelz, "grpc: server failed to compress response: ", err)
		return err
	}

	hdr, payload := msgHeader(data, compData, pf)

	defer func() {
		compData.Free()
		data.Free()
		// payload does not need to be freed here, it is either data or compData, both of
		// which are already freed.
	}()

	dataLen := data.Len()
	payloadLen := payload.Len()
	// TODO(dfawley): should we be checking len(data) instead?
	if payloadLen > s.opts.maxSendMessageSize {
		return status.Errorf(codes.ResourceExhausted, "grpc: trying to send message larger than max (%d vs. %d)", payloadLen, s.opts.maxSendMessageSize)
	}
	err = stream.Write(hdr, payload, opts)
	if err == nil {
		if len(s.opts.statsHandlers) != 0 {
			for _, sh := range s.opts.statsHandlers {
				sh.HandleRPC(ctx, outPayload(false, msg, dataLen, payloadLen, time.Now()))
			}
		}
	}
	return err
}

// chainUnaryServerInterceptors chains all unary server interceptors into one.
func chainUnaryServerInterceptors(s *Server) {
	// Prepend opts.unaryInt to the chaining interceptors if it exists, since unaryInt will
	// be executed before any other chained interceptors.
	interceptors := s.opts.chainUnaryInts
	if s.opts.unaryInt != nil {
		interceptors = append([]UnaryServerInterceptor{s.opts.unaryInt}, s.opts.chainUnaryInts...)
	}

	var chainedInt UnaryServerInterceptor
	if len(interceptors) == 0 {
		chainedInt = nil
	} else if len(interceptors) == 1 {
		chainedInt = interceptors[0]
	} else {
		chainedInt = chainUnaryInterceptors(interceptors)
	}

	s.opts.unaryInt = chainedInt
}

func chainUnaryInterceptors(interceptors []UnaryServerInterceptor) UnaryServerInterceptor {
	return func(ctx context.Context, req any, info *UnaryServerInfo, handler UnaryHandler) (any, error) {
		return interceptors[0](ctx, req, info, getChainUnaryHandler(interceptors, 0, info, handler))
	}
}

func getChainUnaryHandler(interceptors []UnaryServerInterceptor, curr int, info *UnaryServerInfo, finalHandler UnaryHandler) UnaryHandler {
	if curr == len(interceptors)-1 {
		return finalHandler
	}
	return func(ctx context.Context, req any) (any, error) {
		return interceptors[curr+1](ctx, req, info, getChainUnaryHandler(interceptors, curr+1, info, finalHandler))
	}
}

func (s *Server) processUnaryRPC(ctx context.Context, stream *transport.ServerStream, info *serviceInfo, md *MethodDesc, trInfo *traceInfo) (err error) {
	shs := s.opts.statsHandlers
	if len(shs) != 0 || trInfo != nil || channelz.IsOn() {
		if channelz.IsOn() {
			s.incrCallsStarted()
		}
		var statsBegin *stats.Begin
		for _, sh := range shs {
			beginTime := time.Now()
			statsBegin = &stats.Begin{
				BeginTime:      beginTime,
				IsClientStream: false,
				IsServerStream: false,
			}
			sh.HandleRPC(ctx, statsBegin)
		}
		if trInfo != nil {
			trInfo.tr.LazyLog(&trInfo.firstLine, false)
		}
		// The deferred error handling for tracing, stats handler and channelz are
		// combined into one function to reduce stack usage -- a defer takes ~56-64
		// bytes on the stack, so overflowing the stack will require a stack
		// re-allocation, which is expensive.
		//
		// To maintain behavior similar to separate deferred statements, statements
		// should be executed in the reverse order. That is, tracing first, stats
		// handler second, and channelz last. Note that panics *within* defers will
		// lead to different behavior, but that's an acceptable compromise; that
		// would be undefined behavior territory anyway.
		defer func() {
			if trInfo != nil {
				if err != nil && err != io.EOF {
					trInfo.tr.LazyLog(&fmtStringer{"%v", []any{err}}, true)
					trInfo.tr.SetError()
				}
				trInfo.tr.Finish()
			}

			for _, sh := range shs {
				end := &stats.End{
					BeginTime: statsBegin.BeginTime,
					EndTime:   time.Now(),
				}
				if err != nil && err != io.EOF {
					end.Error = toRPCErr(err)
				}
				sh.HandleRPC(ctx, end)
			}

			if channelz.IsOn() {
				if err != nil && err != io.EOF {
					s.incrCallsFailed()
				} else {
					s.incrCallsSucceeded()
				}
			}
		}()
	}
	var binlogs []binarylog.MethodLogger
	if ml := binarylog.GetMethodLogger(stream.Method()); ml != nil {
		binlogs = append(binlogs, ml)
	}
	if s.opts.binaryLogger != nil {
		if ml := s.opts.binaryLogger.GetMethodLogger(stream.Method()); ml != nil {
			binlogs = append(binlogs, ml)
		}
	}
	if len(binlogs) != 0 {
		md, _ := metadata.FromIncomingContext(ctx)
		logEntry := &binarylog.ClientHeader{
			Header:     md,
			MethodName: stream.Method(),
			PeerAddr:   nil,
		}
		if deadline, ok := ctx.Deadline(); ok {
			logEntry.Timeout = time.Until(deadline)
			if logEntry.Timeout < 0 {
				logEntry.Timeout = 0
			}
		}
		if a := md[":authority"]; len(a) > 0 {
			logEntry.Authority = a[0]
		}
		if peer, ok := peer.FromContext(ctx); ok {
			logEntry.PeerAddr = peer.Addr
		}
		for _, binlog := range binlogs {
			binlog.Log(ctx, logEntry)
		}
	}

	// comp and cp are used for compression.  decomp and dc are used for
	// decompression.  If comp and decomp are both set, they are the same;
	// however they are kept separate to ensure that at most one of the
	// compressor/decompressor variable pairs are set for use later.
	var comp, decomp encoding.Compressor
	var cp Compressor
	var dc Decompressor
	var sendCompressorName string

	// If dc is set and matches the stream's compression, use it.  Otherwise, try
	// to find a matching registered compressor for decomp.
	if rc := stream.RecvCompress(); s.opts.dc != nil && s.opts.dc.Type() == rc {
		dc = s.opts.dc
	} else if rc != "" && rc != encoding.Identity {
		decomp = encoding.GetCompressor(rc)
		if decomp == nil {
			st := status.Newf(codes.Unimplemented, "grpc: Decompressor is not installed for grpc-encoding %q", rc)
			stream.WriteStatus(st)
			return st.Err()
		}
	}

	// If cp is set, use it.  Otherwise, attempt to compress the response using
	// the incoming message compression method.
	//
	// NOTE: this needs to be ahead of all handling, https://github.com/grpc/grpc-go/issues/686.
	if s.opts.cp != nil {
		cp = s.opts.cp
		sendCompressorName = cp.Type()
	} else if rc := stream.RecvCompress(); rc != "" && rc != encoding.Identity {
		// Legacy compressor not specified; attempt to respond with same encoding.
		comp = encoding.GetCompressor(rc)
		if comp != nil {
			sendCompressorName = comp.Name()
		}
	}

	if sendCompressorName != "" {
		if err := stream.SetSendCompress(sendCompressorName); err != nil {
			return status.Errorf(codes.Internal, "grpc: failed to set send compressor: %v", err)
		}
	}

	var payInfo *payloadInfo
	if len(shs) != 0 || len(binlogs) != 0 {
		payInfo = &payloadInfo{}
		defer payInfo.free()
	}

	d, err := recvAndDecompress(&parser{r: stream, bufferPool: s.opts.bufferPool}, stream, dc, s.opts.maxReceiveMessageSize, payInfo, decomp, true)
	if err != nil {
		if e := stream.WriteStatus(status.Convert(err)); e != nil {
			channelz.Warningf(logger, s.channelz, "grpc: Server.processUnaryRPC failed to write status: %v", e)
		}
		return err
	}
	freed := false
	dataFree := func() {
		if !freed {
			d.Free()
			freed = true
		}
	}
	defer dataFree()
	df := func(v any) error {
		defer dataFree()
		if err := s.getCodec(stream.ContentSubtype()).Unmarshal(d, v); err != nil {
			return status.Errorf(codes.Internal, "grpc: error unmarshalling request: %v", err)
		}

		for _, sh := range shs {
			sh.HandleRPC(ctx, &stats.InPayload{
				RecvTime:         time.Now(),
				Payload:          v,
				Length:           d.Len(),
				WireLength:       payInfo.compressedLength + headerLen,
				CompressedLength: payInfo.compressedLength,
			})
		}
		if len(binlogs) != 0 {
			cm := &binarylog.ClientMessage{
				Message: d.Materialize(),
			}
			for _, binlog := range binlogs {
				binlog.Log(ctx, cm)
			}
		}
		if trInfo != nil {
			trInfo.tr.LazyLog(&payload{sent: false, msg: v}, true)
		}
		return nil
	}
	ctx = NewContextWithServerTransportStream(ctx, stream)
	reply, appErr := md.Handler(info.serviceImpl, ctx, df, s.opts.unaryInt)
	if appErr != nil {
		appStatus, ok := status.FromError(appErr)
		if !ok {
			// Convert non-status application error to a status error with code
			// Unknown, but handle context errors specifically.
			appStatus = status.FromContextError(appErr)
			appErr = appStatus.Err()
		}
		if trInfo != nil {
			trInfo.tr.LazyLog(stringer(appStatus.Message()), true)
			trInfo.tr.SetError()
		}
		if e := stream.WriteStatus(appStatus); e != nil {
			channelz.Warningf(logger, s.channelz, "grpc: Server.processUnaryRPC failed to write status: %v", e)
		}
		if len(binlogs) != 0 {
			if h, _ := stream.Header(); h.Len() > 0 {
				// Only log serverHeader if there was header. Otherwise it can
				// be trailer only.
				sh := &binarylog.ServerHeader{
					Header: h,
				}
				for _, binlog := range binlogs {
					binlog.Log(ctx, sh)
				}
			}
			st := &binarylog.ServerTrailer{
				Trailer: stream.Trailer(),
				Err:     appErr,
			}
			for _, binlog := range binlogs {
				binlog.Log(ctx, st)
			}
		}
		return appErr
	}
	if trInfo != nil {
		trInfo.tr.LazyLog(stringer("OK"), false)
	}
	opts := &transport.WriteOptions{Last: true}

	// Server handler could have set new compressor by calling SetSendCompressor.
	// In case it is set, we need to use it for compressing outbound message.
	if stream.SendCompress() != sendCompressorName {
		comp = encoding.GetCompressor(stream.SendCompress())
	}
	if err := s.sendResponse(ctx, stream, reply, cp, opts, comp); err != nil {
		if err == io.EOF {
			// The entire stream is done (for unary RPC only).
			return err
		}
		if sts, ok := status.FromError(err); ok {
			if e := stream.WriteStatus(sts); e != nil {
				channelz.Warningf(logger, s.channelz, "grpc: Server.processUnaryRPC failed to write status: %v", e)
			}
		} else {
			switch st := err.(type) {
			case transport.ConnectionError:
				// Nothing to do here.
			default:
				panic(fmt.Sprintf("grpc: Unexpected error (%T) from sendResponse: %v", st, st))
			}
		}
		if len(binlogs) != 0 {
			h, _ := stream.Header()
			sh := &binarylog.ServerHeader{
				Header: h,
			}
			st := &binarylog.ServerTrailer{
				Trailer: stream.Trailer(),
				Err:     appErr,
			}
			for _, binlog := range binlogs {
				binlog.Log(ctx, sh)
				binlog.Log(ctx, st)
			}
		}
		return err
	}
	if len(binlogs) != 0 {
		h, _ := stream.Header()
		sh := &binarylog.ServerHeader{
			Header: h,
		}
		sm := &binarylog.ServerMessage{
			Message: reply,
		}
		for _, binlog := range binlogs {
			binlog.Log(ctx, sh)
			binlog.Log(ctx, sm)
		}
	}
	if trInfo != nil {
		trInfo.tr.LazyLog(&payload{sent: true, msg: reply}, true)
	}
	// TODO: Should we be logging if writing status failed here, like above?
	// Should the logging be in WriteStatus?  Should we ignore the WriteStatus
	// error or allow the stats handler to see it?
	if len(binlogs) != 0 {
		st := &binarylog.ServerTrailer{
			Trailer: stream.Trailer(),
			Err:     appErr,
		}
		for _, binlog := range binlogs {
			binlog.Log(ctx, st)
		}
	}
	return stream.WriteStatus(statusOK)
}

// chainStreamServerInterceptors chains all stream server interceptors into one.
func chainStreamServerInterceptors(s *Server) {
	// Prepend opts.streamInt to the chaining interceptors if it exists, since streamInt will
	// be executed before any other chained interceptors.
	interceptors := s.opts.chainStreamInts
	if s.opts.streamInt != nil {
		interceptors = append([]StreamServerInterceptor{s.opts.streamInt}, s.opts.chainStreamInts...)
	}

	var chainedInt StreamServerInterceptor
	if len(interceptors) == 0 {
		chainedInt = nil
	} else if len(interceptors) == 1 {
		chainedInt = interceptors[0]
	} else {
		chainedInt = chainStreamInterceptors(interceptors)
	}

	s.opts.streamInt = chainedInt
}

func chainStreamInterceptors(interceptors []StreamServerInterceptor) StreamServerInterceptor {
	return func(srv any, ss ServerStream, info *StreamServerInfo, handler StreamHandler) error {
		return interceptors[0](srv, ss, info, getChainStreamHandler(interceptors, 0, info, handler))
	}
}

func getChainStreamHandler(interceptors []StreamServerInterceptor, curr int, info *StreamServerInfo, finalHandler StreamHandler) StreamHandler {
	if curr == len(interceptors)-1 {
		return finalHandler
	}
	return func(srv any, stream ServerStream) error {
		return interceptors[curr+1](srv, stream, info, getChainStreamHandler(interceptors, curr+1, info, finalHandler))
	}
}

func (s *Server) processStreamingRPC(ctx context.Context, stream *transport.ServerStream, info *serviceInfo, sd *StreamDesc, trInfo *traceInfo) (err error) {
	if channelz.IsOn() {
		s.incrCallsStarted()
	}
	shs := s.opts.statsHandlers
	var statsBegin *stats.Begin
	if len(shs) != 0 {
		beginTime := time.Now()
		statsBegin = &stats.Begin{
			BeginTime:      beginTime,
			IsClientStream: sd.ClientStreams,
			IsServerStream: sd.ServerStreams,
		}
		for _, sh := range shs {
			sh.HandleRPC(ctx, statsBegin)
		}
	}
	ctx = NewContextWithServerTransportStream(ctx, stream)
	ss := &serverStream{
		ctx:                   ctx,
		s:                     stream,
		p:                     &parser{r: stream, bufferPool: s.opts.bufferPool},
		codec:                 s.getCodec(stream.ContentSubtype()),
		desc:                  sd,
		maxReceiveMessageSize: s.opts.maxReceiveMessageSize,
		maxSendMessageSize:    s.opts.maxSendMessageSize,
		trInfo:                trInfo,
		statsHandler:          shs,
	}

	if len(shs) != 0 || trInfo != nil || channelz.IsOn() {
		// See comment in processUnaryRPC on defers.
		defer func() {
			if trInfo != nil {
				ss.mu.Lock()
				if err != nil && err != io.EOF {
					ss.trInfo.tr.LazyLog(&fmtStringer{"%v", []any{err}}, true)
					ss.trInfo.tr.SetError()
				}
				ss.trInfo.tr.Finish()
				ss.trInfo.tr = nil
				ss.mu.Unlock()
			}

			if len(shs) != 0 {
				end := &stats.End{
					BeginTime: statsBegin.BeginTime,
					EndTime:   time.Now(),
				}
				if err != nil && err != io.EOF {
					end.Error = toRPCErr(err)
				}
				for _, sh := range shs {
					sh.HandleRPC(ctx, end)
				}
			}

			if channelz.IsOn() {
				if err != nil && err != io.EOF {
					s.incrCallsFailed()
				} else {
					s.incrCallsSucceeded()
				}
			}
		}()
	}

	if ml := binarylog.GetMethodLogger(stream.Method()); ml != nil {
		ss.binlogs = append(ss.binlogs, ml)
	}
	if s.opts.binaryLogger != nil {
		if ml := s.opts.binaryLogger.GetMethodLogger(stream.Method()); ml != nil {
			ss.binlogs = append(ss.binlogs, ml)
		}
	}
	if len(ss.binlogs) != 0 {
		md, _ := metadata.FromIncomingContext(ctx)
		logEntry := &binarylog.ClientHeader{
			Header:     md,
			MethodName: stream.Method(),
			PeerAddr:   nil,
		}
		if deadline, ok := ctx.Deadline(); ok {
			logEntry.Timeout = time.Until(deadline)
			if logEntry.Timeout < 0 {
				logEntry.Timeout = 0
			}
		}
		if a := md[":authority"]; len(a) > 0 {
			logEntry.Authority = a[0]
		}
		if peer, ok := peer.FromContext(ss.Context()); ok {
			logEntry.PeerAddr = peer.Addr
		}
		for _, binlog := range ss.binlogs {
			binlog.Log(ctx, logEntry)
		}
	}

	// If dc is set and matches the stream's compression, use it.  Otherwise, try
	// to find a matching registered compressor for decomp.
	if rc := stream.RecvCompress(); s.opts.dc != nil && s.opts.dc.Type() == rc {
		ss.decompressorV0 = s.opts.dc
	} else if rc != "" && rc != encoding.Identity {
		ss.decompressorV1 = encoding.GetCompressor(rc)
		if ss.decompressorV1 == nil {
			st := status.Newf(codes.Unimplemented, "grpc: Decompressor is not installed for grpc-encoding %q", rc)
			ss.s.WriteStatus(st)
			return st.Err()
		}
	}

	// If cp is set, use it.  Otherwise, attempt to compress the response using
	// the incoming message compression method.
	//
	// NOTE: this needs to be ahead of all handling, https://github.com/grpc/grpc-go/issues/686.
	if s.opts.cp != nil {
		ss.compressorV0 = s.opts.cp
		ss.sendCompressorName = s.opts.cp.Type()
	} else if rc := stream.RecvCompress(); rc != "" && rc != encoding.Identity {
		// Legacy compressor not specified; attempt to respond with same encoding.
		ss.compressorV1 = encoding.GetCompressor(rc)
		if ss.compressorV1 != nil {
			ss.sendCompressorName = rc
		}
	}

	if ss.sendCompressorName != "" {
		if err := stream.SetSendCompress(ss.sendCompressorName); err != nil {
			return status.Errorf(codes.Internal, "grpc: failed to set send compressor: %v", err)
		}
	}

	ss.ctx = newContextWithRPCInfo(ss.ctx, false, ss.codec, ss.compressorV0, ss.compressorV1)

	if trInfo != nil {
		trInfo.tr.LazyLog(&trInfo.firstLine, false)
	}
	var appErr error
	var server any
	if info != nil {
		server = info.serviceImpl
	}
	if s.opts.streamInt == nil {
		appErr = sd.Handler(server, ss)
	} else {
		info := &StreamServerInfo{
			FullMethod:     stream.Method(),
			IsClientStream: sd.ClientStreams,
			IsServerStream: sd.ServerStreams,
		}
		appErr = s.opts.streamInt(server, ss, info, sd.Handler)
	}
	if appErr != nil {
		appStatus, ok := status.FromError(appErr)
		if !ok {
			// Convert non-status application error to a status error with code
			// Unknown, but handle context errors specifically.
			appStatus = status.FromContextError(appErr)
			appErr = appStatus.Err()
		}
		if trInfo != nil {
			ss.mu.Lock()
			ss.trInfo.tr.LazyLog(stringer(appStatus.Message()), true)
			ss.trInfo.tr.SetError()
			ss.mu.Unlock()
		}
		if len(ss.binlogs) != 0 {
			st := &binarylog.ServerTrailer{
				Trailer: ss.s.Trailer(),
				Err:     appErr,
			}
			for _, binlog := range ss.binlogs {
				binlog.Log(ctx, st)
			}
		}
		ss.s.WriteStatus(appStatus)
		// TODO: Should we log an error from WriteStatus here and below?
		return appErr
	}
	if trInfo != nil {
		ss.mu.Lock()
		ss.trInfo.tr.LazyLog(stringer("OK"), false)
		ss.mu.Unlock()
	}
	if len(ss.binlogs) != 0 {
		st := &binarylog.ServerTrailer{
			Trailer: ss.s.Trailer(),
			Err:     appErr,
		}
		for _, binlog := range ss.binlogs {
			binlog.Log(ctx, st)
		}
	}
	return ss.s.WriteStatus(statusOK)
}

func (s *Server) handleStream(t transport.ServerTransport, stream *transport.ServerStream) {
	ctx := stream.Context()
	ctx = contextWithServer(ctx, s)
	var ti *traceInfo
	if EnableTracing {
		tr := newTrace("grpc.Recv."+methodFamily(stream.Method()), stream.Method())
		ctx = newTraceContext(ctx, tr)
		ti = &traceInfo{
			tr: tr,
			firstLine: firstLine{
				client:     false,
				remoteAddr: t.Peer().Addr,
			},
		}
		if dl, ok := ctx.Deadline(); ok {
			ti.firstLine.deadline = time.Until(dl)
		}
	}

	sm := stream.Method()
	if sm != "" && sm[0] == '/' {
		sm = sm[1:]
	}
	pos := strings.LastIndex(sm, "/")
	if pos == -1 {
		if ti != nil {
			ti.tr.LazyLog(&fmtStringer{"Malformed method name %q", []any{sm}}, true)
			ti.tr.SetError()
		}
		errDesc := fmt.Sprintf("malformed method name: %q", stream.Method())
		if err := stream.WriteStatus(status.New(codes.Unimplemented, errDesc)); err != nil {
			if ti != nil {
				ti.tr.LazyLog(&fmtStringer{"%v", []any{err}}, true)
				ti.tr.SetError()
			}
			channelz.Warningf(logger, s.channelz, "grpc: Server.handleStream failed to write status: %v", err)
		}
		if ti != nil {
			ti.tr.Finish()
		}
		return
	}
	service := sm[:pos]
	method := sm[pos+1:]

	// FromIncomingContext is expensive: skip if there are no statsHandlers
	if len(s.opts.statsHandlers) > 0 {
		md, _ := metadata.FromIncomingContext(ctx)
		for _, sh := range s.opts.statsHandlers {
			ctx = sh.TagRPC(ctx, &stats.RPCTagInfo{FullMethodName: stream.Method()})
			sh.HandleRPC(ctx, &stats.InHeader{
				FullMethod:  stream.Method(),
				RemoteAddr:  t.Peer().Addr,
				LocalAddr:   t.Peer().LocalAddr,
				Compression: stream.RecvCompress(),
				WireLength:  stream.HeaderWireLength(),
				Header:      md,
			})
		}
	}
	// To have calls in stream callouts work. Will delete once all stats handler
	// calls come from the gRPC layer.
	stream.SetContext(ctx)

	srv, knownService := s.services[service]
	if knownService {
		if md, ok := srv.methods[method]; ok {
			s.processUnaryRPC(ctx, stream, srv, md, ti)
			return
		}
		if sd, ok := srv.streams[method]; ok {
			s.processStreamingRPC(ctx, stream, srv, sd, ti)
			return
		}
	}
	// Unknown service, or known service but unknown method.
	if unknownDesc := s.opts.unknownStreamDesc; unknownDesc != nil {
		s.processStreamingRPC(ctx, stream, nil, unknownDesc, ti)
		return
	}
	var errDesc string
	if !knownService {
		errDesc = fmt.Sprintf("unknown service %v", service)
	} else {
		errDesc = fmt.Sprintf("unknown method %v for service %v", method, service)
	}
	if ti != nil {
		ti.tr.LazyPrintf("%s", errDesc)
		ti.tr.SetError()
	}
	if err := stream.WriteStatus(status.New(codes.Unimplemented, errDesc)); err != nil {
		if ti != nil {
			ti.tr.LazyLog(&fmtStringer{"%v", []any{err}}, true)
			ti.tr.SetError()
		}
		channelz.Warningf(logger, s.channelz, "grpc: Server.handleStream failed to write status: %v", err)
	}
	if ti != nil {
		ti.tr.Finish()
	}
}

// The key to save ServerTransportStream in the context.
type streamKey struct{}

// NewContextWithServerTransportStream creates a new context from ctx and
// attaches stream to it.
//
// # Experimental
//
// Notice: This API is EXPERIMENTAL and may be changed or removed in a
// later release.
func NewContextWithServerTransportStream(ctx context.Context, stream ServerTransportStream) context.Context {
	return context.WithValue(ctx, streamKey{}, stream)
}

// ServerTransportStream is a minimal interface that a transport stream must
// implement. This can be used to mock an actual transport stream for tests of
// handler code that use, for example, grpc.SetHeader (which requires some
// stream to be in context).
//
// See also NewContextWithServerTransportStream.
//
// # Experimental
//
// Notice: This type is EXPERIMENTAL and may be changed or removed in a
// later release.
type ServerTransportStream interface {
	Method() string
	SetHeader(md metadata.MD) error
	SendHeader(md metadata.MD) error
	SetTrailer(md metadata.MD) error
}

// ServerTransportStreamFromContext returns the ServerTransportStream saved in
// ctx. Returns nil if the given context has no stream associated with it
// (which implies it is not an RPC invocation context).
//
// # Experimental
//
// Notice: This API is EXPERIMENTAL and may be changed or removed in a
// later release.
func ServerTransportStreamFromContext(ctx context.Context) ServerTransportStream {
	s, _ := ctx.Value(streamKey{}).(ServerTransportStream)
	return s
}

// Stop stops the gRPC server. It immediately closes all open
// connections and listeners.
// It cancels all active RPCs on the server side and the corresponding
// pending RPCs on the client side will get notified by connection
// errors.
func (s *Server) Stop() {
	s.stop(false)
}

// GracefulStop stops the gRPC server gracefully. It stops the server from
// accepting new connections and RPCs and blocks until all the pending RPCs are
// finished.
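//
// A common shutdown sketch that falls back to Stop after a deadline (the
// timeout value is illustrative):
//
//	stopped := make(chan struct{})
//	go func() { s.GracefulStop(); close(stopped) }()
//	select {
//	case <-stopped:
//	case <-time.After(10 * time.Second):
//		s.Stop()
//	}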
func (s *Server) GracefulStop() {
	s.stop(true)
}

func (s *Server) stop(graceful bool) {
	s.quit.Fire()
	defer s.done.Fire()

	s.channelzRemoveOnce.Do(func() { channelz.RemoveEntry(s.channelz.ID) })
	s.mu.Lock()
	s.closeListenersLocked()
	// Wait for serving threads to be ready to exit.  Only then can we be sure no
	// new conns will be created.
	s.mu.Unlock()
	s.serveWG.Wait()

	s.mu.Lock()
	defer s.mu.Unlock()

	if graceful {
		s.drainAllServerTransportsLocked()
	} else {
		s.closeServerTransportsLocked()
	}

	for len(s.conns) != 0 {
		s.cv.Wait()
	}
	s.conns = nil

	if s.opts.numServerWorkers > 0 {
		// Closing the channel (only once, via sync.OnceFunc) after all the
		// connections have been closed above ensures that there are no
		// goroutines executing the callback passed to st.HandleStreams (where
		// the channel is written to).
		s.serverWorkerChannelClose()
	}

	if graceful || s.opts.waitForHandlers {
		s.handlersWG.Wait()
	}

	if s.events != nil {
		s.events.Finish()
		s.events = nil
	}
}

// s.mu must be held by the caller.
func (s *Server) closeServerTransportsLocked() {
	for _, conns := range s.conns {
		for st := range conns {
			st.Close(errors.New("Server.Stop called"))
		}
	}
}

// s.mu must be held by the caller.
func (s *Server) drainAllServerTransportsLocked() {
	if !s.drain {
		for _, conns := range s.conns {
			for st := range conns {
				st.Drain("graceful_stop")
			}
		}
		s.drain = true
	}
}

// s.mu must be held by the caller.
func (s *Server) closeListenersLocked() {
	for lis := range s.lis {
		lis.Close()
	}
	s.lis = nil
}

// getCodec returns the codec for the given contentSubtype, which must be
// lowercase. It never returns nil.
func (s *Server) getCodec(contentSubtype string) baseCodec {
	if s.opts.codec != nil {
		return s.opts.codec
	}
	if contentSubtype == "" {
		return getCodec(proto.Name)
	}
	codec := getCodec(contentSubtype)
	if codec == nil {
		logger.Warningf("Unsupported codec %q. Defaulting to %q for now. This will start to fail in future releases.", contentSubtype, proto.Name)
		return getCodec(proto.Name)
	}
	return codec
}

type serverKey struct{}

// serverFromContext gets the Server from the context.
func serverFromContext(ctx context.Context) *Server {
	s, _ := ctx.Value(serverKey{}).(*Server)
	return s
}

// contextWithServer sets the Server in the context.
func contextWithServer(ctx context.Context, server *Server) context.Context {
	return context.WithValue(ctx, serverKey{}, server)
}

// isRegisteredMethod returns whether the passed in method is registered as a
// method on the server. /service/method and service/method will match if the
// service and method are registered on the server.
func (s *Server) isRegisteredMethod(serviceMethod string) bool {
	if serviceMethod != "" && serviceMethod[0] == '/' {
		serviceMethod = serviceMethod[1:]
	}
	pos := strings.LastIndex(serviceMethod, "/")
	if pos == -1 { // Invalid method name syntax.
		return false
	}
	service := serviceMethod[:pos]
	method := serviceMethod[pos+1:]
	srv, knownService := s.services[service]
	if knownService {
		if _, ok := srv.methods[method]; ok {
			return true
		}
		if _, ok := srv.streams[method]; ok {
			return true
		}
	}
	return false
}

// SetHeader sets the header metadata to be sent from the server to the client.
// The context provided must be the context passed to the server's handler.
//
// Streaming RPCs should prefer the SetHeader method of the ServerStream.
//
// When called multiple times, all the provided metadata will be merged.  All
// the metadata will be sent out when one of the following happens:
//
//   - grpc.SendHeader is called, or for streaming handlers, stream.SendHeader.
//   - The first response message is sent.  For unary handlers, this occurs when
//     the handler returns; for streaming handlers, this can happen when stream's
//     SendMsg method is called.
//   - An RPC status is sent out (error or success).  This occurs when the handler
//     returns.
//
// SetHeader will fail if called after any of the events above.
//
// The error returned is compatible with the status package.  However, the
// status code will often not match the RPC status as seen by the client
// application, and therefore, should not be relied upon for this purpose.
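//
// For example, from within a unary handler (a sketch; the header name and id
// value are illustrative):
//
//	grpc.SetHeader(ctx, metadata.Pairs("x-request-id", id))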
func SetHeader(ctx context.Context, md metadata.MD) error {
	if md.Len() == 0 {
		return nil
	}
	stream := ServerTransportStreamFromContext(ctx)
	if stream == nil {
		return status.Errorf(codes.Internal, "grpc: failed to fetch the stream from the context %v", ctx)
	}
	return stream.SetHeader(md)
}

// SendHeader sends header metadata. It may be called at most once, and may not
// be called after any event that causes headers to be sent (see SetHeader for
// a complete list).  The provided md and headers set by SetHeader() will be
// sent.
//
// The error returned is compatible with the status package.  However, the
// status code will often not match the RPC status as seen by the client
// application, and therefore, should not be relied upon for this purpose.
func SendHeader(ctx context.Context, md metadata.MD) error {
	stream := ServerTransportStreamFromContext(ctx)
	if stream == nil {
		return status.Errorf(codes.Internal, "grpc: failed to fetch the stream from the context %v", ctx)
	}
	if err := stream.SendHeader(md); err != nil {
		return toRPCErr(err)
	}
	return nil
}

// SetSendCompressor sets a compressor for outbound messages from the server.
// It must not be called after any event that causes headers to be sent
// (see ServerStream.SetHeader for the complete list). The provided compressor
// is used when the following conditions are met:
//
//   - compressor is registered via encoding.RegisterCompressor
//   - compressor name must exist in the client advertised compressor names
//     sent in grpc-accept-encoding header. Use ClientSupportedCompressors to
//     get client supported compressor names.
//
// The context provided must be the context passed to the server's handler.
// Note that the compressor name encoding.Identity disables outbound
// compression.
// By default, server messages will be sent using the same compressor with
// which request messages were sent.
//
// It is not safe to call SetSendCompressor concurrently with SendHeader and
// SendMsg.
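//
// For example, a handler may opt into gzip for its responses (a sketch using
// the encoding/gzip package; error handling is elided):
//
//	if err := grpc.SetSendCompressor(ctx, gzip.Name); err != nil {
//		// The client does not support gzip, or headers were already sent.
//	}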
//
// # Experimental
//
// Notice: This function is EXPERIMENTAL and may be changed or removed in a
// later release.
func SetSendCompressor(ctx context.Context, name string) error {
	stream, ok := ServerTransportStreamFromContext(ctx).(*transport.ServerStream)
	if !ok || stream == nil {
		return fmt.Errorf("failed to fetch the stream from the given context")
	}

	if err := validateSendCompressor(name, stream.ClientAdvertisedCompressors()); err != nil {
		return fmt.Errorf("unable to set send compressor: %w", err)
	}

	return stream.SetSendCompress(name)
}

// ClientSupportedCompressors returns compressor names advertised by the client
// via grpc-accept-encoding header.
//
// The context provided must be the context passed to the server's handler.
//
// # Experimental
//
// Notice: This function is EXPERIMENTAL and may be changed or removed in a
// later release.
func ClientSupportedCompressors(ctx context.Context) ([]string, error) {
	stream, ok := ServerTransportStreamFromContext(ctx).(*transport.ServerStream)
	if !ok || stream == nil {
		return nil, fmt.Errorf("failed to fetch the stream from the given context %v", ctx)
	}

	return stream.ClientAdvertisedCompressors(), nil
}

// SetTrailer sets the trailer metadata that will be sent when an RPC returns.
// When called more than once, all the provided metadata will be merged.
//
// The error returned is compatible with the status package.  However, the
// status code will often not match the RPC status as seen by the client
// application, and therefore, should not be relied upon for this purpose.
func SetTrailer(ctx context.Context, md metadata.MD) error {
	if md.Len() == 0 {
		return nil
	}
	stream := ServerTransportStreamFromContext(ctx)
	if stream == nil {
		return status.Errorf(codes.Internal, "grpc: failed to fetch the stream from the context %v", ctx)
	}
	return stream.SetTrailer(md)
}

// Method returns the method string for the server context.  The returned
// string is in the format of "/service/method".
func Method(ctx context.Context) (string, bool) {
	s := ServerTransportStreamFromContext(ctx)
	if s == nil {
		return "", false
	}
	return s.Method(), true
}

// validateSendCompressor returns an error when the given compressor name
// cannot be handled by the server or is not advertised by the client.
func validateSendCompressor(name string, clientCompressors []string) error {
	if name == encoding.Identity {
		return nil
	}

	if !grpcutil.IsCompressorNameRegistered(name) {
		return fmt.Errorf("compressor not registered %q", name)
	}

	for _, c := range clientCompressors {
		if c == name {
			return nil // found match
		}
	}
	return fmt.Errorf("client does not support compressor %q", name)
}

// atomicSemaphore implements a blocking, counting semaphore. acquire should be
// called synchronously; release may be called asynchronously.
type atomicSemaphore struct {
	n    atomic.Int64
	wait chan struct{}
}

func (q *atomicSemaphore) acquire() {
	if q.n.Add(-1) < 0 {
		// We ran out of quota.  Block until a release happens.
		<-q.wait
	}
}

func (q *atomicSemaphore) release() {
	// N.B. the "<= 0" check below should allow for this to work with multiple
	// concurrent calls to acquire, but also note that with synchronous calls to
	// acquire, as our system does, n will never be less than -1.  There are
	// fairness issues (queuing) to consider if this was to be generalized.
	if q.n.Add(1) <= 0 {
		// An acquire was waiting on us.  Unblock it.
		q.wait <- struct{}{}
	}
}

func newHandlerQuota(n uint32) *atomicSemaphore {
	a := &atomicSemaphore{wait: make(chan struct{}, 1)}
	a.n.Store(int64(n))
	return a
}