/*
 *
 * Copyright 2014 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

package transport

import (
	"bytes"
	"context"
	"errors"
	"fmt"
	"io"
	"math"
	rand "math/rand/v2"
	"net"
	"net/http"
	"strconv"
	"sync"
	"sync/atomic"
	"time"

	"golang.org/x/net/http2"
	"golang.org/x/net/http2/hpack"
	"google.golang.org/grpc/internal"
	"google.golang.org/grpc/internal/grpcsync"
	"google.golang.org/grpc/internal/grpcutil"
	"google.golang.org/grpc/internal/pretty"
	istatus "google.golang.org/grpc/internal/status"
	"google.golang.org/grpc/internal/syscall"
	"google.golang.org/grpc/mem"
	"google.golang.org/protobuf/proto"

	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/credentials"
	"google.golang.org/grpc/internal/channelz"
	"google.golang.org/grpc/internal/grpclog"
	"google.golang.org/grpc/keepalive"
	"google.golang.org/grpc/metadata"
	"google.golang.org/grpc/peer"
	"google.golang.org/grpc/stats"
	"google.golang.org/grpc/status"
	"google.golang.org/grpc/tap"
)

var (
	// ErrIllegalHeaderWrite indicates that setting header is illegal because of
	// the stream's state.
	ErrIllegalHeaderWrite = status.Error(codes.Internal, "transport: SendHeader called multiple times")
	// ErrHeaderListSizeLimitViolation indicates that the header list size is larger
	// than the limit set by peer.
	ErrHeaderListSizeLimitViolation = status.Error(codes.Internal, "transport: trying to send header list size larger than the limit set by peer")
)

// serverConnectionCounter counts the number of connections a server has seen
// (equal to the number of http2Servers created). Must be accessed atomically.
var serverConnectionCounter uint64

// http2Server implements the ServerTransport interface with HTTP2.
type http2Server struct {
	lastRead        int64 // Keep this field 64-bit aligned. Accessed atomically.
	done            chan struct{}
	conn            net.Conn
	loopy           *loopyWriter
	readerDone      chan struct{} // sync point to enable testing.
	loopyWriterDone chan struct{}
	peer            peer.Peer
	inTapHandle     tap.ServerInHandle
	framer          *framer
	// The max number of concurrent streams.
	maxStreams uint32
	// controlBuf delivers all the control related tasks (e.g., window
	// updates, reset streams, and various settings) to the controller.
	controlBuf *controlBuffer
	fc         *trInFlow
	stats      []stats.Handler
	// Keepalive and max-age parameters for the server.
	kp keepalive.ServerParameters
	// Keepalive enforcement policy.
	kep keepalive.EnforcementPolicy
	// The time instance when the last ping was received.
	lastPingAt time.Time
	// Number of times the client has violated keepalive ping policy so far.
	pingStrikes uint8
	// Flag to signify that number of ping strikes should be reset to 0.
	// This is set whenever data or header frames are sent.
	// 1 means yes.
	resetPingStrikes      uint32 // Accessed atomically.
	initialWindowSize     int32
	bdpEst                *bdpEstimator
	maxSendHeaderListSize *uint32

	mu sync.Mutex // guard the following

	// drainEvent is initialized when Drain() is called the first time. After
	// that, the server writes out the first GoAway (with ID 2^31-1) frame, and
	// an independent goroutine is launched to later send the second GoAway.
	// During this time we don't want to write another first GoAway (with ID
	// 2^31-1) frame. Thus, a call to Drain() is a no-op if drainEvent is
	// already initialized, since draining is already underway.
	drainEvent    *grpcsync.Event
	state         transportState
	activeStreams map[uint32]*ServerStream
	// idle is the time instant when the connection went idle.
	// This is either the beginning of the connection or when the number of
	// RPCs go down to 0.
	// When the connection is busy, this value is set to 0.
	idle time.Time

	// Fields below are for channelz metric collection.
	channelz   *channelz.Socket
	bufferPool mem.BufferPool

	connectionID uint64

	// maxStreamMu guards the maximum stream ID
	// This lock may not be taken if mu is already held.
	maxStreamMu sync.Mutex
	maxStreamID uint32 // max stream ID ever seen

	logger *grpclog.PrefixLogger
	// setResetPingStrikes is stored as a closure instead of making this a
	// method on http2Server to avoid a heap allocation when converting a
	// method to a closure for passing to frame objects.
	setResetPingStrikes func()
}

// NewServerTransport creates an http2 transport with conn and configuration
// options from config.
//
// It returns a non-nil transport and a nil error on success. On failure, it
// returns a nil transport and a non-nil error. For a special case where the
// underlying conn gets closed before the client preface could be read, it
// returns a nil transport and a nil error.
func NewServerTransport(conn net.Conn, config *ServerConfig) (_ ServerTransport, err error) {
	var authInfo credentials.AuthInfo
	rawConn := conn
	if config.Credentials != nil {
		var err error
		conn, authInfo, err = config.Credentials.ServerHandshake(rawConn)
		if err != nil {
			// ErrConnDispatched means that the connection was dispatched away
			// from gRPC; those connections should be left open. io.EOF means
			// the connection was closed before handshaking completed, which can
			// happen naturally from probers. Return these errors directly.
			if err == credentials.ErrConnDispatched || err == io.EOF {
				return nil, err
			}
			return nil, connectionErrorf(false, err, "ServerHandshake(%q) failed: %v", rawConn.RemoteAddr(), err)
		}
	}
	writeBufSize := config.WriteBufferSize
	readBufSize := config.ReadBufferSize
	maxHeaderListSize := defaultServerMaxHeaderListSize
	if config.MaxHeaderListSize != nil {
		maxHeaderListSize = *config.MaxHeaderListSize
	}
	framer := newFramer(conn, writeBufSize, readBufSize, config.SharedWriteBuffer, maxHeaderListSize)
	// Send initial settings as connection preface to client.
	isettings := []http2.Setting{{
		ID:  http2.SettingMaxFrameSize,
		Val: http2MaxFrameLen,
	}}
	if config.MaxStreams != math.MaxUint32 {
		isettings = append(isettings, http2.Setting{
			ID:  http2.SettingMaxConcurrentStreams,
			Val: config.MaxStreams,
		})
	}
	iwz := int32(initialWindowSize)
	if config.InitialWindowSize >= defaultWindowSize {
		iwz = config.InitialWindowSize
	}
	icwz := int32(initialWindowSize)
	if config.InitialConnWindowSize >= defaultWindowSize {
		icwz = config.InitialConnWindowSize
	}
	if iwz != defaultWindowSize {
		isettings = append(isettings, http2.Setting{
			ID:  http2.SettingInitialWindowSize,
			Val: uint32(iwz)})
	}
	if config.MaxHeaderListSize != nil {
		isettings = append(isettings, http2.Setting{
			ID:  http2.SettingMaxHeaderListSize,
			Val: *config.MaxHeaderListSize,
		})
	}
	if config.HeaderTableSize != nil {
		isettings = append(isettings, http2.Setting{
			ID:  http2.SettingHeaderTableSize,
			Val: *config.HeaderTableSize,
		})
	}
	if err := framer.fr.WriteSettings(isettings...); err != nil {
		return nil, connectionErrorf(false, err, "transport: %v", err)
	}
	// Adjust the connection flow control window if needed.
	if delta := uint32(icwz - defaultWindowSize); delta > 0 {
		if err := framer.fr.WriteWindowUpdate(0, delta); err != nil {
			return nil, connectionErrorf(false, err, "transport: %v", err)
		}
	}
	kp := config.KeepaliveParams
	if kp.MaxConnectionIdle == 0 {
		kp.MaxConnectionIdle = defaultMaxConnectionIdle
	}
	if kp.MaxConnectionAge == 0 {
		kp.MaxConnectionAge = defaultMaxConnectionAge
	}
	// Add a jitter to MaxConnectionAge.
	kp.MaxConnectionAge += getJitter(kp.MaxConnectionAge)
	if kp.MaxConnectionAgeGrace == 0 {
		kp.MaxConnectionAgeGrace = defaultMaxConnectionAgeGrace
	}
	if kp.Time == 0 {
		kp.Time = defaultServerKeepaliveTime
	}
	if kp.Timeout == 0 {
		kp.Timeout = defaultServerKeepaliveTimeout
	}
	if kp.Time != infinity {
		if err = syscall.SetTCPUserTimeout(rawConn, kp.Timeout); err != nil {
			return nil, connectionErrorf(false, err, "transport: failed to set TCP_USER_TIMEOUT: %v", err)
		}
	}
	kep := config.KeepalivePolicy
	if kep.MinTime == 0 {
		kep.MinTime = defaultKeepalivePolicyMinTime
	}

	done := make(chan struct{})
	peer := peer.Peer{
		Addr:      conn.RemoteAddr(),
		LocalAddr: conn.LocalAddr(),
		AuthInfo:  authInfo,
	}
	t := &http2Server{
		done:              done,
		conn:              conn,
		peer:              peer,
		framer:            framer,
		readerDone:        make(chan struct{}),
		loopyWriterDone:   make(chan struct{}),
		maxStreams:        config.MaxStreams,
		inTapHandle:       config.InTapHandle,
		fc:                &trInFlow{limit: uint32(icwz)},
		state:             reachable,
		activeStreams:     make(map[uint32]*ServerStream),
		stats:             config.StatsHandlers,
		kp:                kp,
		idle:              time.Now(),
		kep:               kep,
		initialWindowSize: iwz,
		bufferPool:        config.BufferPool,
	}
	t.setResetPingStrikes = func() {
		atomic.StoreUint32(&t.resetPingStrikes, 1)
	}
	var czSecurity credentials.ChannelzSecurityValue
	if au, ok := authInfo.(credentials.ChannelzSecurityInfo); ok {
		czSecurity = au.GetSecurityValue()
	}
	t.channelz = channelz.RegisterSocket(
		&channelz.Socket{
			SocketType:       channelz.SocketTypeNormal,
			Parent:           config.ChannelzParent,
			SocketMetrics:    channelz.SocketMetrics{},
			EphemeralMetrics: t.socketMetrics,
			LocalAddr:        t.peer.LocalAddr,
			RemoteAddr:       t.peer.Addr,
			SocketOptions:    channelz.GetSocketOption(t.conn),
			Security:         czSecurity,
		},
	)
	t.logger = prefixLoggerForServerTransport(t)

	t.controlBuf = newControlBuffer(t.done)
	if !config.StaticWindowSize {
		t.bdpEst = &bdpEstimator{
			bdp:               initialWindowSize,
			updateFlowControl: t.updateFlowControl,
		}
	}

	t.connectionID = atomic.AddUint64(&serverConnectionCounter, 1)
	t.framer.writer.Flush()

	defer func() {
		if err != nil {
			t.Close(err)
		}
	}()

	// Check the validity of client preface.
	preface := make([]byte, len(clientPreface))
	if _, err := io.ReadFull(t.conn, preface); err != nil {
		// In deployments where a gRPC server runs behind a cloud load balancer
		// which performs regular TCP level health checks, the connection is
		// closed immediately by the latter.  Returning io.EOF here allows the
		// grpc server implementation to recognize this scenario and suppress
		// logging to reduce spam.
		if err == io.EOF {
			return nil, io.EOF
		}
		return nil, connectionErrorf(false, err, "transport: http2Server.HandleStreams failed to receive the preface from client: %v", err)
	}
	if !bytes.Equal(preface, clientPreface) {
		return nil, connectionErrorf(false, nil, "transport: http2Server.HandleStreams received bogus greeting from client: %q", preface)
	}

	frame, err := t.framer.fr.ReadFrame()
	if err == io.EOF || err == io.ErrUnexpectedEOF {
		return nil, err
	}
	if err != nil {
		return nil, connectionErrorf(false, err, "transport: http2Server.HandleStreams failed to read initial settings frame: %v", err)
	}
	atomic.StoreInt64(&t.lastRead, time.Now().UnixNano())
	sf, ok := frame.(*http2.SettingsFrame)
	if !ok {
		return nil, connectionErrorf(false, nil, "transport: http2Server.HandleStreams saw invalid preface type %T from client", frame)
	}
	t.handleSettings(sf)

	go func() {
		t.loopy = newLoopyWriter(serverSide, t.framer, t.controlBuf, t.bdpEst, t.conn, t.logger, t.outgoingGoAwayHandler, t.bufferPool)
		err := t.loopy.run()
		close(t.loopyWriterDone)
		if !isIOError(err) {
			// Close the connection if a non-I/O error occurs (for I/O errors
			// the reader will also encounter the error and close).  Wait 1
			// second before closing the connection, or when the reader is done
			// (i.e. the client already closed the connection or a connection
			// error occurred).  This avoids the potential problem where there
			// is unread data on the receive side of the connection, which, if
			// closed, would lead to a TCP RST instead of FIN, and the client
			// encountering errors.  For more info:
			// https://github.com/grpc/grpc-go/issues/5358
			timer := time.NewTimer(time.Second)
			defer timer.Stop()
			select {
			case <-t.readerDone:
			case <-timer.C:
			}
			t.conn.Close()
		}
	}()
	go t.keepalive()
	return t, nil
}
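
// Illustrative only: a minimal sketch of how a caller might drive this
// transport, assuming a net.Listener and a populated *ServerConfig already
// exist (handleStream is a hypothetical per-stream callback):
//
//	conn, _ := lis.Accept()
//	st, err := NewServerTransport(conn, serverConfig)
//	if err != nil || st == nil {
//		return // (nil, nil) means the conn was closed before the preface.
//	}
//	go st.HandleStreams(ctx, handleStream)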

// operateHeaders takes action on the decoded headers. Returns an error if a
// fatal error is encountered and the transport needs to close; otherwise
// returns nil.
func (t *http2Server) operateHeaders(ctx context.Context, frame *http2.MetaHeadersFrame, handle func(*ServerStream)) error {
	// Acquire max stream ID lock for entire duration
	t.maxStreamMu.Lock()
	defer t.maxStreamMu.Unlock()

	streamID := frame.Header().StreamID

	// frame.Truncated is set to true when framer detects that the current header
	// list size hits MaxHeaderListSize limit.
	if frame.Truncated {
		t.controlBuf.put(&cleanupStream{
			streamID: streamID,
			rst:      true,
			rstCode:  http2.ErrCodeFrameSize,
			onWrite:  func() {},
		})
		return nil
	}

	if streamID%2 != 1 || streamID <= t.maxStreamID {
		// illegal gRPC stream id.
		return fmt.Errorf("received an illegal stream id: %v. headers frame: %+v", streamID, frame)
	}
	t.maxStreamID = streamID

	buf := newRecvBuffer()
	s := &ServerStream{
		Stream: &Stream{
			id:  streamID,
			buf: buf,
			fc:  &inFlow{limit: uint32(t.initialWindowSize)},
		},
		st:               t,
		headerWireLength: int(frame.Header().Length),
	}
	var (
		// if false, content-type was missing or invalid
		isGRPC      = false
		contentType = ""
		mdata       = make(metadata.MD, len(frame.Fields))
		httpMethod  string
		// these are set if an error is encountered while parsing the headers
		protocolError bool
		headerError   *status.Status

		timeoutSet bool
		timeout    time.Duration
	)

	for _, hf := range frame.Fields {
		switch hf.Name {
		case "content-type":
			contentSubtype, validContentType := grpcutil.ContentSubtype(hf.Value)
			if !validContentType {
				contentType = hf.Value
				break
			}
			mdata[hf.Name] = append(mdata[hf.Name], hf.Value)
			s.contentSubtype = contentSubtype
			isGRPC = true

		case "grpc-accept-encoding":
			mdata[hf.Name] = append(mdata[hf.Name], hf.Value)
			if hf.Value == "" {
				continue
			}
			compressors := hf.Value
			if s.clientAdvertisedCompressors != "" {
				compressors = s.clientAdvertisedCompressors + "," + compressors
			}
			s.clientAdvertisedCompressors = compressors
		case "grpc-encoding":
			s.recvCompress = hf.Value
		case ":method":
			httpMethod = hf.Value
		case ":path":
			s.method = hf.Value
		case "grpc-timeout":
			timeoutSet = true
			var err error
			if timeout, err = decodeTimeout(hf.Value); err != nil {
				headerError = status.Newf(codes.Internal, "malformed grpc-timeout: %v", err)
			}
		// "Transports must consider requests containing the Connection header
		// as malformed." - A41
		case "connection":
			if t.logger.V(logLevel) {
				t.logger.Infof("Received a HEADERS frame with a :connection header which makes the request malformed, as per the HTTP/2 spec")
			}
			protocolError = true
		default:
			if isReservedHeader(hf.Name) && !isWhitelistedHeader(hf.Name) {
				break
			}
			v, err := decodeMetadataHeader(hf.Name, hf.Value)
			if err != nil {
				headerError = status.Newf(codes.Internal, "malformed binary metadata %q in header %q: %v", hf.Value, hf.Name, err)
				t.logger.Warningf("Failed to decode metadata header (%q, %q): %v", hf.Name, hf.Value, err)
				break
			}
			mdata[hf.Name] = append(mdata[hf.Name], v)
		}
	}
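
	// A note on the grpc-timeout parsing above: the wire format defined by
	// the gRPC over HTTP/2 spec is an ASCII integer followed by a single
	// unit letter (H, M, S, m, u, or n). For example, "5S" means five
	// seconds and "100m" means 100 milliseconds; decodeTimeout converts
	// such a string into a time.Duration.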

	// "If multiple Host headers or multiple :authority headers are present, the
	// request must be rejected with an HTTP status code 400 as required by Host
	// validation in RFC 7230 §5.4, gRPC status code INTERNAL, or RST_STREAM
	// with HTTP/2 error code PROTOCOL_ERROR." - A41. Since this is a HTTP/2
	// error, this takes precedence over a client not speaking gRPC.
	if len([":authority"]) > 1 || len(["host"]) > 1 {
		 := fmt.Sprintf("num values of :authority: %v, num values of host: %v, both must only have 1 value as per HTTP/2 spec", len([":authority"]), len(["host"]))
		if .logger.V(logLevel) {
			.logger.Infof("Aborting the stream early: %v", )
		}
		.controlBuf.put(&earlyAbortStream{
			httpStatus:     http.StatusBadRequest,
			streamID:       ,
			contentSubtype: .contentSubtype,
			status:         status.New(codes.Internal, ),
			rst:            !.StreamEnded(),
		})
		return nil
	}

	if  {
		.controlBuf.put(&cleanupStream{
			streamID: ,
			rst:      true,
			rstCode:  http2.ErrCodeProtocol,
			onWrite:  func() {},
		})
		return nil
	}
	if ! {
		.controlBuf.put(&earlyAbortStream{
			httpStatus:     http.StatusUnsupportedMediaType,
			streamID:       ,
			contentSubtype: .contentSubtype,
			status:         status.Newf(codes.InvalidArgument, "invalid gRPC request content-type %q", ),
			rst:            !.StreamEnded(),
		})
		return nil
	}
	if  != nil {
		.controlBuf.put(&earlyAbortStream{
			httpStatus:     http.StatusBadRequest,
			streamID:       ,
			contentSubtype: .contentSubtype,
			status:         ,
			rst:            !.StreamEnded(),
		})
		return nil
	}

	// "If :authority is missing, Host must be renamed to :authority." - A41
	if len([":authority"]) == 0 {
		// No-op if host isn't present, no eventual :authority header is a valid
		// RPC.
		if ,  := ["host"];  {
			[":authority"] = 
			delete(, "host")
		}
	} else {
		// "If :authority is present, Host must be discarded" - A41
		delete(, "host")
	}

	if .StreamEnded() {
		// s is just created by the caller. No lock needed.
		.state = streamReadDone
	}
	if  {
		.ctx, .cancel = context.WithTimeout(, )
	} else {
		.ctx, .cancel = context.WithCancel()
	}

	// Attach the received metadata to the context.
	if len() > 0 {
		.ctx = metadata.NewIncomingContext(.ctx, )
	}
	.mu.Lock()
	if .state != reachable {
		.mu.Unlock()
		.cancel()
		return nil
	}
	if uint32(len(.activeStreams)) >= .maxStreams {
		.mu.Unlock()
		.controlBuf.put(&cleanupStream{
			streamID: ,
			rst:      true,
			rstCode:  http2.ErrCodeRefusedStream,
			onWrite:  func() {},
		})
		.cancel()
		return nil
	}
	if  != http.MethodPost {
		.mu.Unlock()
		 := fmt.Sprintf("Received a HEADERS frame with :method %q which should be POST", )
		if .logger.V(logLevel) {
			.logger.Infof("Aborting the stream early: %v", )
		}
		.controlBuf.put(&earlyAbortStream{
			httpStatus:     http.StatusMethodNotAllowed,
			streamID:       ,
			contentSubtype: .contentSubtype,
			status:         status.New(codes.Internal, ),
			rst:            !.StreamEnded(),
		})
		.cancel()
		return nil
	}
	if .inTapHandle != nil {
		var  error
		if .ctx,  = .inTapHandle(.ctx, &tap.Info{FullMethodName: .method, Header: });  != nil {
			.mu.Unlock()
			if .logger.V(logLevel) {
				.logger.Infof("Aborting the stream early due to InTapHandle failure: %v", )
			}
			,  := status.FromError()
			if ! {
				 = status.New(codes.PermissionDenied, .Error())
			}
			.controlBuf.put(&earlyAbortStream{
				httpStatus:     http.StatusOK,
				streamID:       .id,
				contentSubtype: .contentSubtype,
				status:         ,
				rst:            !.StreamEnded(),
			})
			return nil
		}
	}

	if .ctx.Err() != nil {
		.mu.Unlock()
		// Early abort in case the timeout was zero or so low it already fired.
		.controlBuf.put(&earlyAbortStream{
			httpStatus:     http.StatusOK,
			streamID:       .id,
			contentSubtype: .contentSubtype,
			status:         status.New(codes.DeadlineExceeded, context.DeadlineExceeded.Error()),
			rst:            !.StreamEnded(),
		})
		return nil
	}

	.activeStreams[] = 
	if len(.activeStreams) == 1 {
		.idle = time.Time{}
	}

	// Start a timer to close the stream on reaching the deadline.
	if  {
		// We need to wait for s.cancel to be updated before calling
		// t.closeStream to avoid data races.
		 := make(chan struct{})
		 := internal.TimeAfterFunc(, func() {
			<-
			.closeStream(, true, http2.ErrCodeCancel, false)
		})
		 := .cancel
		.cancel = func() {
			()
			.Stop()
		}
		close()
	}
	.mu.Unlock()
	if channelz.IsOn() {
		.channelz.SocketMetrics.StreamsStarted.Add(1)
		.channelz.SocketMetrics.LastRemoteStreamCreatedTimestamp.Store(time.Now().UnixNano())
	}
	.requestRead = func( int) {
		.adjustWindow(, uint32())
	}
	.ctxDone = .ctx.Done()
	.wq = newWriteQuota(defaultWriteQuota, .ctxDone)
	.trReader = &transportReader{
		reader: &recvBufferReader{
			ctx:     .ctx,
			ctxDone: .ctxDone,
			recv:    .buf,
		},
		windowHandler: func( int) {
			.updateWindow(, uint32())
		},
	}
	// Register the stream with loopy.
	.controlBuf.put(&registerStream{
		streamID: .id,
		wq:       .wq,
	})
	()
	return nil
}

// HandleStreams receives incoming streams using the given handler. This is
// typically run in a separate goroutine.
func (t *http2Server) HandleStreams(ctx context.Context, handle func(*ServerStream)) {
	defer func() {
		close(t.readerDone)
		<-t.loopyWriterDone
	}()
	for {
		t.controlBuf.throttle()
		frame, err := t.framer.fr.ReadFrame()
		atomic.StoreInt64(&t.lastRead, time.Now().UnixNano())
		if err != nil {
			if se, ok := err.(http2.StreamError); ok {
				if t.logger.V(logLevel) {
					t.logger.Warningf("Encountered http2.StreamError: %v", se)
				}
				t.mu.Lock()
				s := t.activeStreams[se.StreamID]
				t.mu.Unlock()
				if s != nil {
					t.closeStream(s, true, se.Code, false)
				} else {
					t.controlBuf.put(&cleanupStream{
						streamID: se.StreamID,
						rst:      true,
						rstCode:  se.Code,
						onWrite:  func() {},
					})
				}
				continue
			}
			t.Close(err)
			return
		}
		switch frame := frame.(type) {
		case *http2.MetaHeadersFrame:
			if err := t.operateHeaders(ctx, frame, handle); err != nil {
				// Any error processing client headers, e.g. invalid stream ID,
				// is considered a protocol violation.
				t.controlBuf.put(&goAway{
					code:      http2.ErrCodeProtocol,
					debugData: []byte(err.Error()),
					closeConn: err,
				})
				continue
			}
		case *http2.DataFrame:
			t.handleData(frame)
		case *http2.RSTStreamFrame:
			t.handleRSTStream(frame)
		case *http2.SettingsFrame:
			t.handleSettings(frame)
		case *http2.PingFrame:
			t.handlePing(frame)
		case *http2.WindowUpdateFrame:
			t.handleWindowUpdate(frame)
		case *http2.GoAwayFrame:
			// TODO: Handle GoAway from the client appropriately.
		default:
			if t.logger.V(logLevel) {
				t.logger.Infof("Received unsupported frame type %T", frame)
			}
		}
	}
}
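
// Note the two failure modes in the read loop above: an http2.StreamError
// only resets the offending stream with RST_STREAM, while any other read
// error is treated as connection-level and tears down the whole transport.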

func (t *http2Server) getStream(f http2.Frame) (*ServerStream, bool) {
	t.mu.Lock()
	defer t.mu.Unlock()
	if t.activeStreams == nil {
		// The transport is closing.
		return nil, false
	}
	s, ok := t.activeStreams[f.Header().StreamID]
	if !ok {
		// The stream is already done.
		return nil, false
	}
	return s, true
}

// adjustWindow sends out an extra window update over the initial window size
// of the stream if the application is requesting more data than the current
// window can hold.
func (t *http2Server) adjustWindow(s *ServerStream, n uint32) {
	if w := s.fc.maybeAdjust(n); w > 0 {
		t.controlBuf.put(&outgoingWindowUpdate{streamID: s.id, increment: w})
	}
}

// updateWindow adjusts the inbound quota for the stream and the transport.
// Window updates are delivered to the controller for sending when the
// cumulative quota exceeds the corresponding threshold.
func (t *http2Server) updateWindow(s *ServerStream, n uint32) {
	if w := s.fc.onRead(n); w > 0 {
		t.controlBuf.put(&outgoingWindowUpdate{streamID: s.id,
			increment: w,
		})
	}
}

// updateFlowControl updates the incoming flow control windows
// for the transport and the stream based on the current bdp
// estimation.
func (t *http2Server) updateFlowControl(n uint32) {
	t.mu.Lock()
	for _, s := range t.activeStreams {
		s.fc.newLimit(n)
	}
	t.initialWindowSize = int32(n)
	t.mu.Unlock()
	t.controlBuf.put(&outgoingWindowUpdate{
		streamID:  0,
		increment: t.fc.newLimit(n),
	})
	t.controlBuf.put(&outgoingSettings{
		ss: []http2.Setting{
			{
				ID:  http2.SettingInitialWindowSize,
				Val: n,
			},
		},
	})
}

func (t *http2Server) handleData(f *http2.DataFrame) {
	size := f.Header().Length
	var sendBDPPing bool
	if t.bdpEst != nil {
		sendBDPPing = t.bdpEst.add(size)
	}
	// Decouple connection's flow control from application's read.
	// An update on connection's flow control should not depend on
	// whether user application has read the data or not. Such a
	// restriction is already imposed on the stream's flow control,
	// and therefore the sender will be blocked anyways.
	// Decoupling the connection flow control will prevent other
	// active(fast) streams from starving in presence of slow or
	// inactive streams.
	if w := t.fc.onData(size); w > 0 {
		t.controlBuf.put(&outgoingWindowUpdate{
			streamID:  0,
			increment: w,
		})
	}
	if sendBDPPing {
		// Avoid excessive ping detection (e.g. in an L7 proxy)
		// by sending a window update prior to the BDP ping.
		if w := t.fc.reset(); w > 0 {
			t.controlBuf.put(&outgoingWindowUpdate{
				streamID:  0,
				increment: w,
			})
		}
		t.controlBuf.put(bdpPing)
	}
	// Select the right stream to dispatch.
	s, ok := t.getStream(f)
	if !ok {
		return
	}
	if s.getState() == streamReadDone {
		t.closeStream(s, true, http2.ErrCodeStreamClosed, false)
		return
	}
	if size > 0 {
		if err := s.fc.onData(size); err != nil {
			t.closeStream(s, true, http2.ErrCodeFlowControl, false)
			return
		}
		if f.Header().Flags.Has(http2.FlagDataPadded) {
			if w := s.fc.onRead(size - uint32(len(f.Data()))); w > 0 {
				t.controlBuf.put(&outgoingWindowUpdate{s.id, w})
			}
		}
		// TODO(bradfitz, zhaoq): A copy is required here because there is no
		// guarantee f.Data() is consumed before the arrival of next frame.
		// Can this copy be eliminated?
		if len(f.Data()) > 0 {
			pool := t.bufferPool
			if pool == nil {
				// Note that this is only supposed to be nil in tests. Otherwise,
				// the stream is always initialized with a BufferPool.
				pool = mem.DefaultBufferPool()
			}
			s.write(recvMsg{buffer: mem.Copy(f.Data(), pool)})
		}
	}
	if f.StreamEnded() {
		// Received the end of stream from the client.
		s.compareAndSwapState(streamActive, streamReadDone)
		s.write(recvMsg{err: io.EOF})
	}
}

func (t *http2Server) handleRSTStream(f *http2.RSTStreamFrame) {
	// If the stream is still in the transport's active streams map, do a
	// regular close.
	if s, ok := t.getStream(f); ok {
		t.closeStream(s, false, 0, false)
		return
	}
	// If the stream is already deleted from the active streams map, put a
	// cleanupStream item into the controlbuf to delete the stream from the
	// loopy writer's established streams map.
	t.controlBuf.put(&cleanupStream{
		streamID: f.Header().StreamID,
		rst:      false,
		rstCode:  0,
		onWrite:  func() {},
	})
}

func (t *http2Server) handleSettings(f *http2.SettingsFrame) {
	if f.IsAck() {
		return
	}
	var ss []http2.Setting
	var updateFuncs []func()
	f.ForeachSetting(func(s http2.Setting) error {
		switch s.ID {
		case http2.SettingMaxHeaderListSize:
			updateFuncs = append(updateFuncs, func() {
				t.maxSendHeaderListSize = new(uint32)
				*t.maxSendHeaderListSize = s.Val
			})
		default:
			ss = append(ss, s)
		}
		return nil
	})
	t.controlBuf.executeAndPut(func() bool {
		for _, f := range updateFuncs {
			f()
		}
		return true
	}, &incomingSettings{
		ss: ss,
	})
}
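
// Only SETTINGS_MAX_HEADER_LIST_SIZE is applied directly on this transport;
// the remaining settings are forwarded to the loopy writer as
// incomingSettings so that, for example, SETTINGS_INITIAL_WINDOW_SIZE is
// applied to per-stream send quotas in one place.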

const (
	maxPingStrikes     = 2
	defaultPingTimeout = 2 * time.Hour
)
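
// Enforcement summary for the handler below: each ping that arrives sooner
// than the keepalive policy allows counts as a strike, and once the count
// exceeds maxPingStrikes the server sends a GOAWAY with ENHANCE_YOUR_CALM
// and debug data "too_many_pings", closing the connection.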

func (t *http2Server) handlePing(f *http2.PingFrame) {
	if f.IsAck() {
		if f.Data == goAwayPing.data && t.drainEvent != nil {
			t.drainEvent.Fire()
			return
		}
		// Maybe it's a BDP ping.
		if t.bdpEst != nil {
			t.bdpEst.calculate(f.Data)
		}
		return
	}
	pingAck := &ping{ack: true}
	copy(pingAck.data[:], f.Data[:])
	t.controlBuf.put(pingAck)

	now := time.Now()
	defer func() {
		t.lastPingAt = now
	}()
	// A reset of ping strikes means that we don't need to check for a policy
	// violation for this ping, and the pingStrikes counter should be set to 0.
	if atomic.CompareAndSwapUint32(&t.resetPingStrikes, 1, 0) {
		t.pingStrikes = 0
		return
	}
	t.mu.Lock()
	ns := len(t.activeStreams)
	t.mu.Unlock()
	if ns < 1 && !t.kep.PermitWithoutStream {
		// Keepalive shouldn't be active; thus, this new ping should
		// have come after at least defaultPingTimeout.
		if t.lastPingAt.Add(defaultPingTimeout).After(now) {
			t.pingStrikes++
		}
	} else {
		// Check if keepalive policy is respected.
		if t.lastPingAt.Add(t.kep.MinTime).After(now) {
			t.pingStrikes++
		}
	}

	if t.pingStrikes > maxPingStrikes {
		// Send goaway and close the connection.
		t.controlBuf.put(&goAway{code: http2.ErrCodeEnhanceYourCalm, debugData: []byte("too_many_pings"), closeConn: errors.New("got too many pings from the client")})
	}
}

func (t *http2Server) handleWindowUpdate(f *http2.WindowUpdateFrame) {
	t.controlBuf.put(&incomingWindowUpdate{
		streamID:  f.Header().StreamID,
		increment: f.Increment,
	})
}

func appendHeaderFieldsFromMD(headerFields []hpack.HeaderField, md metadata.MD) []hpack.HeaderField {
	for k, vv := range md {
		if isReservedHeader(k) {
			// Clients don't tolerate reading restricted headers after some non restricted ones were sent.
			continue
		}
		for _, v := range vv {
			headerFields = append(headerFields, hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)})
		}
	}
	return headerFields
}
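
// Note on encoding: metadata keys ending in "-bin" carry binary values, and
// encodeMetadataHeader base64-encodes those values for the wire, while
// values for other keys pass through unchanged. For example, a "trace-bin"
// value of raw bytes would be sent base64-encoded, but "user-agent" is sent
// as-is.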

func (t *http2Server) checkForHeaderListSize(it any) bool {
	if t.maxSendHeaderListSize == nil {
		return true
	}
	hdrFrame := it.(*headerFrame)
	var sz int64
	for _, f := range hdrFrame.hf {
		if sz += int64(f.Size()); sz > int64(*t.maxSendHeaderListSize) {
			if t.logger.V(logLevel) {
				t.logger.Infof("Header list size to send violates the maximum size (%d bytes) set by client", *t.maxSendHeaderListSize)
			}
			return false
		}
	}
	return true
}

func (t *http2Server) streamContextErr(s *ServerStream) error {
	select {
	case <-t.done:
		return ErrConnClosing
	default:
	}
	return ContextErr(s.ctx.Err())
}

// writeHeader sends the header metadata md back to the client.
func (t *http2Server) writeHeader(s *ServerStream, md metadata.MD) error {
	s.hdrMu.Lock()
	defer s.hdrMu.Unlock()
	if s.getState() == streamDone {
		return t.streamContextErr(s)
	}

	if s.updateHeaderSent() {
		return ErrIllegalHeaderWrite
	}

	if md.Len() > 0 {
		if s.header.Len() > 0 {
			s.header = metadata.Join(s.header, md)
		} else {
			s.header = md
		}
	}
	if err := t.writeHeaderLocked(s); err != nil {
		switch e := err.(type) {
		case ConnectionError:
			return status.Error(codes.Unavailable, e.Desc)
		default:
			return status.Convert(err).Err()
		}
	}
	return nil
}

func (t *http2Server) writeHeaderLocked(s *ServerStream) error {
	// TODO(mmukhi): Benchmark if the performance gets better if count the metadata and other header fields
	// first and create a slice of that exact size.
	headerFields := make([]hpack.HeaderField, 0, 2) // at least :status, content-type will be there if none else.
	headerFields = append(headerFields, hpack.HeaderField{Name: ":status", Value: "200"})
	headerFields = append(headerFields, hpack.HeaderField{Name: "content-type", Value: grpcutil.ContentType(s.contentSubtype)})
	if s.sendCompress != "" {
		headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-encoding", Value: s.sendCompress})
	}
	headerFields = appendHeaderFieldsFromMD(headerFields, s.header)
	hf := &headerFrame{
		streamID:  s.id,
		hf:        headerFields,
		endStream: false,
		onWrite:   t.setResetPingStrikes,
	}
	success, err := t.controlBuf.executeAndPut(func() bool { return t.checkForHeaderListSize(hf) }, hf)
	if !success {
		if err != nil {
			return err
		}
		t.closeStream(s, true, http2.ErrCodeInternal, false)
		return ErrHeaderListSizeLimitViolation
	}
	for _, sh := range t.stats {
		// Note: Headers are compressed with hpack after this call returns.
		// No WireLength field is set here.
		outHeader := &stats.OutHeader{
			Header:      s.header.Copy(),
			Compression: s.sendCompress,
		}
		sh.HandleRPC(s.Context(), outHeader)
	}
	return nil
}

// writeStatus sends the stream status to the client and terminates the
// stream. No further I/O can be performed on the stream afterwards.
// TODO(zhaoq): Now it indicates the end of entire stream. Revisit if early
// OK is adopted.
func (t *http2Server) writeStatus(s *ServerStream, st *status.Status) error {
	s.hdrMu.Lock()
	defer s.hdrMu.Unlock()

	if s.getState() == streamDone {
		return nil
	}

	// TODO(mmukhi): Benchmark if the performance gets better if count the metadata and other header fields
	// first and create a slice of that exact size.
	headerFields := make([]hpack.HeaderField, 0, 2) // grpc-status and grpc-message will be there if none else.
	if !s.updateHeaderSent() { // No headers have been sent.
		if len(s.header) > 0 { // Send a separate header frame.
			if err := t.writeHeaderLocked(s); err != nil {
				return err
			}
		} else { // Send a trailer only response.
			headerFields = append(headerFields, hpack.HeaderField{Name: ":status", Value: "200"})
			headerFields = append(headerFields, hpack.HeaderField{Name: "content-type", Value: grpcutil.ContentType(s.contentSubtype)})
		}
	}
	headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-status", Value: strconv.Itoa(int(st.Code()))})
	headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-message", Value: encodeGrpcMessage(st.Message())})

	if p := istatus.RawStatusProto(st); len(p.GetDetails()) > 0 {
		// Do not use the user's grpc-status-details-bin (if present) if we are
		// even attempting to set our own.
		delete(s.trailer, grpcStatusDetailsBinHeader)
		stBytes, err := proto.Marshal(p)
		if err != nil {
			// TODO: return error instead, when callers are able to handle it.
			t.logger.Errorf("Failed to marshal rpc status: %s, error: %v", pretty.ToJSON(p), err)
		} else {
			headerFields = append(headerFields, hpack.HeaderField{Name: grpcStatusDetailsBinHeader, Value: encodeBinHeader(stBytes)})
		}
	}

	// Attach the trailer metadata.
	headerFields = appendHeaderFieldsFromMD(headerFields, s.trailer)
	trailingHeader := &headerFrame{
		streamID:  s.id,
		hf:        headerFields,
		endStream: true,
		onWrite:   t.setResetPingStrikes,
	}

	success, err := t.controlBuf.executeAndPut(func() bool {
		return t.checkForHeaderListSize(trailingHeader)
	}, nil)
	if !success {
		if err != nil {
			return err
		}
		t.closeStream(s, true, http2.ErrCodeInternal, false)
		return ErrHeaderListSizeLimitViolation
	}
	// Send a RST_STREAM after the trailers if the client has not already half-closed.
	rst := s.getState() == streamActive
	t.finishStream(s, rst, http2.ErrCodeNo, trailingHeader, true)
	for _, sh := range t.stats {
		// Note: The trailer fields are compressed with hpack after this call returns.
		// No WireLength field is set here.
		sh.HandleRPC(s.Context(), &stats.OutTrailer{
			Trailer: s.trailer.Copy(),
		})
	}
	return nil
}
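
// For illustration: an RPC finishing with codes.NotFound and message
// "missing" would produce trailers containing at least
//
//	grpc-status: 5
//	grpc-message: missing
//
// followed by any user-set trailer metadata. encodeGrpcMessage
// percent-encodes any message bytes that cannot be sent raw on the wire.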

// write converts the data into an HTTP/2 data frame and sends it out. A
// non-nil error is returned if it fails (e.g., framing error, transport
// error).
func (t *http2Server) write(s *ServerStream, hdr []byte, data mem.BufferSlice, _ *WriteOptions) error {
	if !s.isHeaderSent() { // Headers haven't been written yet.
		if err := t.writeHeader(s, nil); err != nil {
			return err
		}
	} else {
		// Writing headers checks for this condition.
		if s.getState() == streamDone {
			return t.streamContextErr(s)
		}
	}

	df := &dataFrame{
		streamID:    s.id,
		h:           hdr,
		data:        data,
		onEachWrite: t.setResetPingStrikes,
	}
	dataLen := data.Len()
	if err := s.wq.get(int32(len(hdr) + dataLen)); err != nil {
		return t.streamContextErr(s)
	}
	data.Ref()
	if err := t.controlBuf.put(df); err != nil {
		data.Free()
		return err
	}
	t.incrMsgSent()
	return nil
}

// keepalive running in a separate goroutine does the following:
// 1. Gracefully closes an idle connection after a duration of keepalive.MaxConnectionIdle.
// 2. Gracefully closes any connection after a duration of keepalive.MaxConnectionAge.
// 3. Forcibly closes a connection after an additive period of keepalive.MaxConnectionAgeGrace over keepalive.MaxConnectionAge.
// 4. Makes sure a connection is alive by sending pings with a frequency of keepalive.Time and closes a non-responsive connection
// after an additional duration of keepalive.Timeout.
func (t *http2Server) keepalive() {
	p := &ping{}
	// True iff a ping has been sent, and no data has been received since then.
	outstandingPing := false
	// Amount of time remaining before which we should receive an ACK for the
	// last sent ping.
	kpTimeoutLeft := time.Duration(0)
	// Records the last value of t.lastRead before we go block on the timer.
	// This is required to check for read activity since then.
	prevNano := time.Now().UnixNano()
	// Initialize the different timers to their default values.
	idleTimer := time.NewTimer(t.kp.MaxConnectionIdle)
	ageTimer := time.NewTimer(t.kp.MaxConnectionAge)
	kpTimer := time.NewTimer(t.kp.Time)
	defer func() {
		// We need to drain the underlying channel in these timers after a call
		// to Stop(), only if we are interested in resetting them. Clearly we
		// are not interested in resetting them here.
		idleTimer.Stop()
		ageTimer.Stop()
		kpTimer.Stop()
	}()

	for {
		select {
		case <-idleTimer.C:
			t.mu.Lock()
			idle := t.idle
			if idle.IsZero() { // The connection is non-idle.
				t.mu.Unlock()
				idleTimer.Reset(t.kp.MaxConnectionIdle)
				continue
			}
			val := t.kp.MaxConnectionIdle - time.Since(idle)
			t.mu.Unlock()
			if val <= 0 {
				// The connection has been idle for a duration of keepalive.MaxConnectionIdle or more.
				// Gracefully close the connection.
				t.Drain("max_idle")
				return
			}
			idleTimer.Reset(val)
		case <-ageTimer.C:
			t.Drain("max_age")
			ageTimer.Reset(t.kp.MaxConnectionAgeGrace)
			select {
			case <-ageTimer.C:
				// Close the connection after grace period.
				if t.logger.V(logLevel) {
					t.logger.Infof("Closing server transport due to maximum connection age")
				}
				t.controlBuf.put(closeConnection{})
			case <-t.done:
			}
			return
		case <-kpTimer.C:
			lastRead := atomic.LoadInt64(&t.lastRead)
			if lastRead > prevNano {
				// There has been read activity since the last time we were
				// here. Setup the timer to fire at kp.Time seconds from
				// lastRead time and continue.
				outstandingPing = false
				kpTimer.Reset(time.Duration(lastRead) + t.kp.Time - time.Duration(time.Now().UnixNano()))
				prevNano = lastRead
				continue
			}
			if outstandingPing && kpTimeoutLeft <= 0 {
				t.Close(fmt.Errorf("keepalive ping not acked within timeout %s", t.kp.Timeout))
				return
			}
			if !outstandingPing {
				if channelz.IsOn() {
					t.channelz.SocketMetrics.KeepAlivesSent.Add(1)
				}
				t.controlBuf.put(p)
				kpTimeoutLeft = t.kp.Timeout
				outstandingPing = true
			}
			// The amount of time to sleep here is the minimum of kp.Time and
			// timeoutLeft. This will ensure that we wait only for kp.Time
			// before sending out the next ping (for cases where the ping is
			// acked).
			sleepDuration := min(t.kp.Time, kpTimeoutLeft)
			kpTimeoutLeft -= sleepDuration
			kpTimer.Reset(sleepDuration)
		case <-t.done:
			return
		}
	}
}
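
// For illustration, assuming a caller configures the server with
// keepalive.ServerParameters like the following, the loop above would drain
// idle connections after 15 minutes, drain any connection after roughly 30
// minutes (plus or minus the 10% jitter added in NewServerTransport), and
// probe silent connections every 2 hours, closing them if a ping is not
// acked within 20 seconds:
//
//	sp := keepalive.ServerParameters{
//		MaxConnectionIdle:     15 * time.Minute,
//		MaxConnectionAge:      30 * time.Minute,
//		MaxConnectionAgeGrace: 5 * time.Minute,
//		Time:                  2 * time.Hour,
//		Timeout:               20 * time.Second,
//	}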

// Close starts shutting down the http2Server transport.
// TODO(zhaoq): Now the destruction is not blocked on any pending streams. This
// could cause some resource issue. Revisit this later.
func (t *http2Server) Close(err error) {
	t.mu.Lock()
	if t.state == closing {
		t.mu.Unlock()
		return
	}
	if t.logger.V(logLevel) {
		t.logger.Infof("Closing: %v", err)
	}
	t.state = closing
	streams := t.activeStreams
	t.activeStreams = nil
	t.mu.Unlock()
	t.controlBuf.finish()
	close(t.done)
	if err := t.conn.Close(); err != nil && t.logger.V(logLevel) {
		t.logger.Infof("Error closing underlying net.Conn during Close: %v", err)
	}
	channelz.RemoveEntry(t.channelz.ID)
	// Cancel all active streams.
	for _, s := range streams {
		s.cancel()
	}
}

// deleteStream deletes the stream s from transport's active streams.
func (t *http2Server) deleteStream(s *ServerStream, eosReceived bool) {
	t.mu.Lock()
	if _, ok := t.activeStreams[s.id]; ok {
		delete(t.activeStreams, s.id)
		if len(t.activeStreams) == 0 {
			t.idle = time.Now()
		}
	}
	t.mu.Unlock()

	if channelz.IsOn() {
		if eosReceived {
			t.channelz.SocketMetrics.StreamsSucceeded.Add(1)
		} else {
			t.channelz.SocketMetrics.StreamsFailed.Add(1)
		}
	}
}

// finishStream closes the stream and puts the trailing headerFrame into controlbuf.
func (t *http2Server) finishStream(s *ServerStream, rst bool, rstCode http2.ErrCode, hdr *headerFrame, eosReceived bool) {
	// In case stream sending and receiving are invoked in separate
	// goroutines (e.g., bi-directional streaming), cancel needs to be
	// called to interrupt the potential blocking on other goroutines.
	s.cancel()

	oldState := s.swapState(streamDone)
	if oldState == streamDone {
		// If the stream was already done, return.
		return
	}

	hdr.cleanup = &cleanupStream{
		streamID: s.id,
		rst:      rst,
		rstCode:  rstCode,
		onWrite: func() {
			t.deleteStream(s, eosReceived)
		},
	}
	t.controlBuf.put(hdr)
}

// closeStream clears the footprint of a stream when the stream is not needed any more.
func (t *http2Server) closeStream(s *ServerStream, rst bool, rstCode http2.ErrCode, eosReceived bool) {
	// In case stream sending and receiving are invoked in separate
	// goroutines (e.g., bi-directional streaming), cancel needs to be
	// called to interrupt the potential blocking on other goroutines.
	s.cancel()

	oldState := s.swapState(streamDone)
	if oldState == streamDone {
		return
	}
	t.deleteStream(s, eosReceived)

	t.controlBuf.put(&cleanupStream{
		streamID: s.id,
		rst:      rst,
		rstCode:  rstCode,
		onWrite:  func() {},
	})
}

func (t *http2Server) Drain(debugData string) {
	t.mu.Lock()
	defer t.mu.Unlock()
	if t.drainEvent != nil {
		return
	}
	t.drainEvent = grpcsync.NewEvent()
	t.controlBuf.put(&goAway{code: http2.ErrCodeNo, debugData: []byte(debugData), headsUp: true})
}

var goAwayPing = &ping{data: [8]byte{1, 6, 1, 8, 0, 3, 3, 9}}

// Handles outgoing GoAway and returns true if loopy needs to put itself
// in draining mode.
func (t *http2Server) outgoingGoAwayHandler(g *goAway) (bool, error) {
	t.maxStreamMu.Lock()
	t.mu.Lock()
	if t.state == closing { // TODO(mmukhi): This seems unnecessary.
		t.mu.Unlock()
		t.maxStreamMu.Unlock()
		// The transport is closing.
		return false, ErrConnClosing
	}
	if !g.headsUp {
		// Stop accepting more streams now.
		t.state = draining
		sid := t.maxStreamID
		retErr := g.closeConn
		if len(t.activeStreams) == 0 {
			retErr = errors.New("second GOAWAY written and no active streams left to process")
		}
		t.mu.Unlock()
		t.maxStreamMu.Unlock()
		if err := t.framer.fr.WriteGoAway(sid, g.code, g.debugData); err != nil {
			return false, err
		}
		t.framer.writer.Flush()
		if retErr != nil {
			return false, retErr
		}
		return true, nil
	}
	t.mu.Unlock()
	t.maxStreamMu.Unlock()
	// For a graceful close, send out a GoAway with stream ID of MaxUInt32.
	// Follow that with a ping and wait for the ack to come back, or for a
	// timer to expire. During this time, accept new streams since they might
	// have originated before the GoAway reached the client. After getting the
	// ack or on timer expiration, send out another GoAway, this time with an
	// ID of the max stream the server intends to process.
	if err := t.framer.fr.WriteGoAway(math.MaxUint32, http2.ErrCodeNo, g.debugData); err != nil {
		return false, err
	}
	if err := t.framer.fr.WritePing(false, goAwayPing.data); err != nil {
		return false, err
	}
	go func() {
		timer := time.NewTimer(5 * time.Second)
		defer timer.Stop()
		select {
		case <-t.drainEvent.Done():
		case <-timer.C:
		case <-t.done:
			return
		}
		t.controlBuf.put(&goAway{code: g.code, debugData: g.debugData})
	}()
	return false, nil
}
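
// The graceful drain sequence implemented above, in order:
//
//	server -> GOAWAY (max stream ID) + PING    (headsUp == true)
//	client -> PING ack (or the 5s timer fires)
//	server -> GOAWAY (t.maxStreamID)           (headsUp == false)
//
// New streams are still accepted between the two GOAWAYs because they may
// have been created before the client received the first one.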

func (t *http2Server) socketMetrics() *channelz.EphemeralSocketMetrics {
	return &channelz.EphemeralSocketMetrics{
		LocalFlowControlWindow:  int64(t.fc.getSize()),
		RemoteFlowControlWindow: t.getOutFlowWindow(),
	}
}

func (t *http2Server) incrMsgSent() {
	if channelz.IsOn() {
		t.channelz.SocketMetrics.MessagesSent.Add(1)
		t.channelz.SocketMetrics.LastMessageSentTimestamp.Add(1)
	}
}

func (t *http2Server) incrMsgRecv() {
	if channelz.IsOn() {
		t.channelz.SocketMetrics.MessagesReceived.Add(1)
		t.channelz.SocketMetrics.LastMessageReceivedTimestamp.Add(1)
	}
}

func (t *http2Server) getOutFlowWindow() int64 {
	resp := make(chan uint32, 1)
	timer := time.NewTimer(time.Second)
	defer timer.Stop()
	t.controlBuf.put(&outFlowControlSizeRequest{resp})
	select {
	case sz := <-resp:
		return int64(sz)
	case <-t.done:
		return -1
	case <-timer.C:
		return -2
	}
}

// Peer returns the peer of the transport.
func (t *http2Server) Peer() *peer.Peer {
	return &peer.Peer{
		Addr:      t.peer.Addr,
		LocalAddr: t.peer.LocalAddr,
		AuthInfo:  t.peer.AuthInfo, // Can be nil
	}
}

func getJitter(v time.Duration) time.Duration {
	if v == infinity {
		return 0
	}
	// Generate a jitter between +/- 10% of the value.
	r := int64(v / 10)
	j := rand.Int64N(2*r) - r
	return time.Duration(j)
}
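
// For example, with a MaxConnectionAge of 30 minutes, getJitter draws a
// value uniformly from [-3m, +3m), spreading the effective age over roughly
// 27 to 33 minutes so that connections created at the same time do not all
// hit their max age at the same instant.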

type connectionKey struct{}

// GetConnection gets the connection from the context.
func GetConnection(ctx context.Context) net.Conn {
	conn, _ := ctx.Value(connectionKey{}).(net.Conn)
	return conn
}

// SetConnection adds the connection to the context to be able to get
// information about the destination ip and port for an incoming RPC. This also
// allows any unary or streaming interceptors to see the connection.
func SetConnection(ctx context.Context, conn net.Conn) context.Context {
	return context.WithValue(ctx, connectionKey{}, conn)
}