/*
 *
 * Copyright 2014 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

package transport

import (
	"bytes"
	"context"
	"errors"
	"fmt"
	"io"
	"math"
	"net"
	"net/http"
	"strconv"
	"sync"
	"sync/atomic"
	"time"

	"github.com/golang/protobuf/proto"
	"golang.org/x/net/http2"
	"golang.org/x/net/http2/hpack"
	"google.golang.org/grpc/internal/grpcutil"
	"google.golang.org/grpc/internal/syscall"

	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/credentials"
	"google.golang.org/grpc/internal/channelz"
	"google.golang.org/grpc/internal/grpcrand"
	"google.golang.org/grpc/internal/grpcsync"
	"google.golang.org/grpc/keepalive"
	"google.golang.org/grpc/metadata"
	"google.golang.org/grpc/peer"
	"google.golang.org/grpc/stats"
	"google.golang.org/grpc/status"
	"google.golang.org/grpc/tap"
)

var (
	// ErrIllegalHeaderWrite indicates that setting a header is illegal because
	// of the stream's state.
	ErrIllegalHeaderWrite = status.Error(codes.Internal, "transport: SendHeader called multiple times")
	// ErrHeaderListSizeLimitViolation indicates that the header list size is larger
	// than the limit set by peer.
	ErrHeaderListSizeLimitViolation = status.Error(codes.Internal, "transport: trying to send header list size larger than the limit set by peer")
)

// serverConnectionCounter counts the number of connections a server has seen
// (equal to the number of http2Servers created). Must be accessed atomically.
var serverConnectionCounter uint64

// http2Server implements the ServerTransport interface with HTTP2.
type http2Server struct {
	lastRead    int64 // Keep this field 64-bit aligned. Accessed atomically.
	ctx         context.Context
	done        chan struct{}
	conn        net.Conn
	loopy       *loopyWriter
	readerDone  chan struct{} // sync point to enable testing.
	writerDone  chan struct{} // sync point to enable testing.
	remoteAddr  net.Addr
	localAddr   net.Addr
	authInfo    credentials.AuthInfo // auth info about the connection
	inTapHandle tap.ServerInHandle
	framer      *framer
	// The max number of concurrent streams.
	maxStreams uint32
	// controlBuf delivers all the control related tasks (e.g., window
	// updates, reset streams, and various settings) to the controller.
	controlBuf *controlBuffer
	fc         *trInFlow
	stats      []stats.Handler
	// Keepalive and max-age parameters for the server.
	kp keepalive.ServerParameters
	// Keepalive enforcement policy.
	kep keepalive.EnforcementPolicy
	// The time instance when the last ping was received.
	lastPingAt time.Time
	// Number of times the client has violated keepalive ping policy so far.
	pingStrikes uint8
	// Flag to signify that number of ping strikes should be reset to 0.
	// This is set whenever data or header frames are sent.
	// 1 means yes.
	resetPingStrikes      uint32 // Accessed atomically.
	initialWindowSize     int32
	bdpEst                *bdpEstimator
	maxSendHeaderListSize *uint32

	mu sync.Mutex // guard the following

	// drainEvent is initialized when Drain() is called the first time. After
	// that the server writes out the first GoAway (with ID 2^31-1) frame, and
	// an independent goroutine is launched to later send the second GoAway.
	// During this time we don't want to write another first GoAway (with ID
	// 2^31-1) frame. Thus a call to Drain() is a no-op if drainEvent is
	// already initialized, since draining is already underway.
	drainEvent    *grpcsync.Event
	state         transportState
	activeStreams map[uint32]*Stream
	// idle is the time instant when the connection went idle.
	// This is either the beginning of the connection or when the number of
	// RPCs goes down to 0.
	// When the connection is busy, this value is set to 0.
	idle time.Time

	// Fields below are for channelz metric collection.
	channelzID *channelz.Identifier
	czData     *channelzData
	bufferPool *bufferPool

	connectionID uint64

	// maxStreamMu guards the maximum stream ID
	// This lock may not be taken if mu is already held.
	maxStreamMu sync.Mutex
	maxStreamID uint32 // max stream ID ever seen
}

// NewServerTransport creates an HTTP/2 transport with conn and configuration
// options from config.
//
// It returns a non-nil transport and a nil error on success. On failure, it
// returns a nil transport and a non-nil error. For a special case where the
// underlying conn gets closed before the client preface could be read, it
// returns a nil transport and a nil error.
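//
// A minimal usage sketch (for illustration only; handleStream and traceCtx
// are hypothetical placeholders supplied by the caller, not part of this
// file):
//
//	st, err := NewServerTransport(conn, &ServerConfig{})
//	if err != nil {
//		// Handle the error; note that a nil transport with a nil error
//		// means the conn was closed before the client preface was read.
//	}
//	go st.HandleStreams(handleStream, traceCtx)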
func NewServerTransport(conn net.Conn, config *ServerConfig) (_ ServerTransport, err error) {
	var authInfo credentials.AuthInfo
	rawConn := conn
	if config.Credentials != nil {
		var err error
		conn, authInfo, err = config.Credentials.ServerHandshake(rawConn)
		if err != nil {
			// ErrConnDispatched means that the connection was dispatched away
			// from gRPC; those connections should be left open. io.EOF means
			// the connection was closed before handshaking completed, which can
			// happen naturally from probers. Return these errors directly.
			if err == credentials.ErrConnDispatched || err == io.EOF {
				return nil, err
			}
			return nil, connectionErrorf(false, err, "ServerHandshake(%q) failed: %v", rawConn.RemoteAddr(), err)
		}
	}
	writeBufSize := config.WriteBufferSize
	readBufSize := config.ReadBufferSize
	maxHeaderListSize := defaultServerMaxHeaderListSize
	if config.MaxHeaderListSize != nil {
		maxHeaderListSize = *config.MaxHeaderListSize
	}
	framer := newFramer(conn, writeBufSize, readBufSize, maxHeaderListSize)
	// Send initial settings as connection preface to client.
	isettings := []http2.Setting{{
		ID:  http2.SettingMaxFrameSize,
		Val: http2MaxFrameLen,
	}}
	// TODO(zhaoq): Have a better way to signal "no limit" because 0 is
	// permitted in the HTTP2 spec.
	maxStreams := config.MaxStreams
	if maxStreams == 0 {
		maxStreams = math.MaxUint32
	} else {
		isettings = append(isettings, http2.Setting{
			ID:  http2.SettingMaxConcurrentStreams,
			Val: maxStreams,
		})
	}
	dynamicWindow := true
	iwz := int32(initialWindowSize)
	if config.InitialWindowSize >= defaultWindowSize {
		iwz = config.InitialWindowSize
		dynamicWindow = false
	}
	icwz := int32(initialWindowSize)
	if config.InitialConnWindowSize >= defaultWindowSize {
		icwz = config.InitialConnWindowSize
		dynamicWindow = false
	}
	if iwz != defaultWindowSize {
		isettings = append(isettings, http2.Setting{
			ID:  http2.SettingInitialWindowSize,
			Val: uint32(iwz)})
	}
	if config.MaxHeaderListSize != nil {
		isettings = append(isettings, http2.Setting{
			ID:  http2.SettingMaxHeaderListSize,
			Val: *config.MaxHeaderListSize,
		})
	}
	if config.HeaderTableSize != nil {
		isettings = append(isettings, http2.Setting{
			ID:  http2.SettingHeaderTableSize,
			Val: *config.HeaderTableSize,
		})
	}
	if err := framer.fr.WriteSettings(isettings...); err != nil {
		return nil, connectionErrorf(false, err, "transport: %v", err)
	}
	// Adjust the connection flow control window if needed.
	if delta := uint32(icwz - defaultWindowSize); delta > 0 {
		if err := framer.fr.WriteWindowUpdate(0, delta); err != nil {
			return nil, connectionErrorf(false, err, "transport: %v", err)
		}
	}
	kp := config.KeepaliveParams
	if kp.MaxConnectionIdle == 0 {
		kp.MaxConnectionIdle = defaultMaxConnectionIdle
	}
	if kp.MaxConnectionAge == 0 {
		kp.MaxConnectionAge = defaultMaxConnectionAge
	}
	// Add a jitter to MaxConnectionAge.
	kp.MaxConnectionAge += getJitter(kp.MaxConnectionAge)
	if kp.MaxConnectionAgeGrace == 0 {
		kp.MaxConnectionAgeGrace = defaultMaxConnectionAgeGrace
	}
	if kp.Time == 0 {
		kp.Time = defaultServerKeepaliveTime
	}
	if kp.Timeout == 0 {
		kp.Timeout = defaultServerKeepaliveTimeout
	}
	if kp.Time != infinity {
		if err = syscall.SetTCPUserTimeout(conn, kp.Timeout); err != nil {
			return nil, connectionErrorf(false, err, "transport: failed to set TCP_USER_TIMEOUT: %v", err)
		}
	}
	kep := config.KeepalivePolicy
	if kep.MinTime == 0 {
		kep.MinTime = defaultKeepalivePolicyMinTime
	}

	done := make(chan struct{})
	t := &http2Server{
		ctx:               setConnection(context.Background(), rawConn),
		done:              done,
		conn:              conn,
		remoteAddr:        conn.RemoteAddr(),
		localAddr:         conn.LocalAddr(),
		authInfo:          authInfo,
		framer:            framer,
		readerDone:        make(chan struct{}),
		writerDone:        make(chan struct{}),
		maxStreams:        maxStreams,
		inTapHandle:       config.InTapHandle,
		fc:                &trInFlow{limit: uint32(icwz)},
		state:             reachable,
		activeStreams:     make(map[uint32]*Stream),
		stats:             config.StatsHandlers,
		kp:                kp,
		idle:              time.Now(),
		kep:               kep,
		initialWindowSize: iwz,
		czData:            new(channelzData),
		bufferPool:        newBufferPool(),
	}
	// Add peer information to the http2server context.
	t.ctx = peer.NewContext(t.ctx, t.getPeer())

	t.controlBuf = newControlBuffer(t.done)
	if dynamicWindow {
		t.bdpEst = &bdpEstimator{
			bdp:               initialWindowSize,
			updateFlowControl: t.updateFlowControl,
		}
	}
	for _, sh := range t.stats {
		t.ctx = sh.TagConn(t.ctx, &stats.ConnTagInfo{
			RemoteAddr: t.remoteAddr,
			LocalAddr:  t.localAddr,
		})
		connBegin := &stats.ConnBegin{}
		sh.HandleConn(t.ctx, connBegin)
	}
	t.channelzID, err = channelz.RegisterNormalSocket(t, config.ChannelzParentID, fmt.Sprintf("%s -> %s", t.remoteAddr, t.localAddr))
	if err != nil {
		return nil, err
	}

	t.connectionID = atomic.AddUint64(&serverConnectionCounter, 1)
	t.framer.writer.Flush()

	defer func() {
		if err != nil {
			t.Close(err)
		}
	}()

	// Check the validity of client preface.
	preface := make([]byte, len(clientPreface))
	if _, err := io.ReadFull(t.conn, preface); err != nil {
		// In deployments where a gRPC server runs behind a cloud load balancer
		// which performs regular TCP level health checks, the connection is
		// closed immediately by the latter.  Returning io.EOF here allows the
		// grpc server implementation to recognize this scenario and suppress
		// logging to reduce spam.
		if err == io.EOF {
			return nil, io.EOF
		}
		return nil, connectionErrorf(false, err, "transport: http2Server.HandleStreams failed to receive the preface from client: %v", err)
	}
	if !bytes.Equal(preface, clientPreface) {
		return nil, connectionErrorf(false, nil, "transport: http2Server.HandleStreams received bogus greeting from client: %q", preface)
	}

	frame, err := t.framer.fr.ReadFrame()
	if err == io.EOF || err == io.ErrUnexpectedEOF {
		return nil, err
	}
	if err != nil {
		return nil, connectionErrorf(false, err, "transport: http2Server.HandleStreams failed to read initial settings frame: %v", err)
	}
	atomic.StoreInt64(&t.lastRead, time.Now().UnixNano())
	sf, ok := frame.(*http2.SettingsFrame)
	if !ok {
		return nil, connectionErrorf(false, nil, "transport: http2Server.HandleStreams saw invalid preface type %T from client", frame)
	}
	t.handleSettings(sf)

	go func() {
		t.loopy = newLoopyWriter(serverSide, t.framer, t.controlBuf, t.bdpEst)
		t.loopy.ssGoAwayHandler = t.outgoingGoAwayHandler
		err := t.loopy.run()
		if logger.V(logLevel) {
			logger.Infof("transport: loopyWriter exited. Closing connection. Err: %v", err)
		}
		t.conn.Close()
		t.controlBuf.finish()
		close(t.writerDone)
	}()
	go t.keepalive()
	return t, nil
}

// operateHeaders takes action on the decoded headers. It returns an error if
// a fatal error is encountered and the transport needs to close; otherwise
// it returns nil.
func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(*Stream), traceCtx func(context.Context, string) context.Context) error {
	// Acquire max stream ID lock for entire duration
	t.maxStreamMu.Lock()
	defer t.maxStreamMu.Unlock()

	streamID := frame.Header().StreamID

	// frame.Truncated is set to true when framer detects that the current header
	// list size hits MaxHeaderListSize limit.
	if frame.Truncated {
		t.controlBuf.put(&cleanupStream{
			streamID: streamID,
			rst:      true,
			rstCode:  http2.ErrCodeFrameSize,
			onWrite:  func() {},
		})
		return nil
	}

	if streamID%2 != 1 || streamID <= t.maxStreamID {
		// illegal gRPC stream id.
		return fmt.Errorf("received an illegal stream id: %v. headers frame: %+v", streamID, frame)
	}
	t.maxStreamID = streamID

	buf := newRecvBuffer()
	s := &Stream{
		id:  streamID,
		st:  t,
		buf: buf,
		fc:  &inFlow{limit: uint32(t.initialWindowSize)},
	}
	var (
		// if false, content-type was missing or invalid
		isGRPC      = false
		contentType = ""
		mdata       = make(map[string][]string)
		httpMethod  string
		// these are set if an error is encountered while parsing the headers
		protocolError bool
		headerError   *status.Status

		timeoutSet bool
		timeout    time.Duration
	)

	for _, hf := range frame.Fields {
		switch hf.Name {
		case "content-type":
			contentSubtype, validContentType := grpcutil.ContentSubtype(hf.Value)
			if !validContentType {
				contentType = hf.Value
				break
			}
			mdata[hf.Name] = append(mdata[hf.Name], hf.Value)
			s.contentSubtype = contentSubtype
			isGRPC = true
		case "grpc-encoding":
			s.recvCompress = hf.Value
		case ":method":
			httpMethod = hf.Value
		case ":path":
			s.method = hf.Value
		case "grpc-timeout":
			timeoutSet = true
			var err error
			if timeout, err = decodeTimeout(hf.Value); err != nil {
				headerError = status.Newf(codes.Internal, "malformed grpc-timeout: %v", err)
			}
		// "Transports must consider requests containing the Connection header
		// as malformed." - A41
		case "connection":
			if logger.V(logLevel) {
				logger.Errorf("transport: http2Server.operateHeaders parsed a :connection header which makes a request malformed as per the HTTP/2 spec")
			}
			protocolError = true
		default:
			if isReservedHeader(hf.Name) && !isWhitelistedHeader(hf.Name) {
				break
			}
			v, err := decodeMetadataHeader(hf.Name, hf.Value)
			if err != nil {
				headerError = status.Newf(codes.Internal, "malformed binary metadata %q in header %q: %v", hf.Value, hf.Name, err)
				logger.Warningf("Failed to decode metadata header (%q, %q): %v", hf.Name, hf.Value, err)
				break
			}
			mdata[hf.Name] = append(mdata[hf.Name], v)
		}
	}

	// "If multiple Host headers or multiple :authority headers are present, the
	// request must be rejected with an HTTP status code 400 as required by Host
	// validation in RFC 7230 §5.4, gRPC status code INTERNAL, or RST_STREAM
	// with HTTP/2 error code PROTOCOL_ERROR." - A41. Since this is a HTTP/2
	// error, this takes precedence over a client not speaking gRPC.
	if len(mdata[":authority"]) > 1 || len(mdata["host"]) > 1 {
		errMsg := fmt.Sprintf("num values of :authority: %v, num values of host: %v, both must only have 1 value as per HTTP/2 spec", len(mdata[":authority"]), len(mdata["host"]))
		if logger.V(logLevel) {
			logger.Errorf("transport: %v", errMsg)
		}
		t.controlBuf.put(&earlyAbortStream{
			httpStatus:     http.StatusBadRequest,
			streamID:       streamID,
			contentSubtype: s.contentSubtype,
			status:         status.New(codes.Internal, errMsg),
			rst:            !frame.StreamEnded(),
		})
		return nil
	}

	if protocolError {
		t.controlBuf.put(&cleanupStream{
			streamID: streamID,
			rst:      true,
			rstCode:  http2.ErrCodeProtocol,
			onWrite:  func() {},
		})
		return nil
	}
	if !isGRPC {
		t.controlBuf.put(&earlyAbortStream{
			httpStatus:     http.StatusUnsupportedMediaType,
			streamID:       streamID,
			contentSubtype: s.contentSubtype,
			status:         status.Newf(codes.InvalidArgument, "invalid gRPC request content-type %q", contentType),
			rst:            !frame.StreamEnded(),
		})
		return nil
	}
	if headerError != nil {
		t.controlBuf.put(&earlyAbortStream{
			httpStatus:     http.StatusBadRequest,
			streamID:       streamID,
			contentSubtype: s.contentSubtype,
			status:         headerError,
			rst:            !frame.StreamEnded(),
		})
		return nil
	}

	// "If :authority is missing, Host must be renamed to :authority." - A41
	if len(mdata[":authority"]) == 0 {
		// No-op if host isn't present, no eventual :authority header is a valid
		// RPC.
		if host, ok := mdata["host"]; ok {
			mdata[":authority"] = host
			delete(mdata, "host")
		}
	} else {
		// "If :authority is present, Host must be discarded" - A41
		delete(mdata, "host")
	}

	if frame.StreamEnded() {
		// s is just created by the caller. No lock needed.
		s.state = streamReadDone
	}
	if timeoutSet {
		s.ctx, s.cancel = context.WithTimeout(t.ctx, timeout)
	} else {
		s.ctx, s.cancel = context.WithCancel(t.ctx)
	}

	// Attach the received metadata to the context.
	if len(mdata) > 0 {
		s.ctx = metadata.NewIncomingContext(s.ctx, mdata)
		if statsTags := mdata["grpc-tags-bin"]; len(statsTags) > 0 {
			s.ctx = stats.SetIncomingTags(s.ctx, []byte(statsTags[len(statsTags)-1]))
		}
		if statsTrace := mdata["grpc-trace-bin"]; len(statsTrace) > 0 {
			s.ctx = stats.SetIncomingTrace(s.ctx, []byte(statsTrace[len(statsTrace)-1]))
		}
	}
	t.mu.Lock()
	if t.state != reachable {
		t.mu.Unlock()
		s.cancel()
		return nil
	}
	if uint32(len(t.activeStreams)) >= t.maxStreams {
		t.mu.Unlock()
		t.controlBuf.put(&cleanupStream{
			streamID: streamID,
			rst:      true,
			rstCode:  http2.ErrCodeRefusedStream,
			onWrite:  func() {},
		})
		s.cancel()
		return nil
	}
	if httpMethod != http.MethodPost {
		t.mu.Unlock()
		errMsg := fmt.Sprintf("http2Server.operateHeaders parsed a :method field: %v which should be POST", httpMethod)
		if logger.V(logLevel) {
			logger.Infof("transport: %v", errMsg)
		}
		t.controlBuf.put(&earlyAbortStream{
			httpStatus:     405,
			streamID:       streamID,
			contentSubtype: s.contentSubtype,
			status:         status.New(codes.Internal, errMsg),
			rst:            !frame.StreamEnded(),
		})
		s.cancel()
		return nil
	}
	if t.inTapHandle != nil {
		var err error
		if s.ctx, err = t.inTapHandle(s.ctx, &tap.Info{FullMethodName: s.method}); err != nil {
			t.mu.Unlock()
			if logger.V(logLevel) {
				logger.Infof("transport: http2Server.operateHeaders got an error from InTapHandle: %v", err)
			}
			stat, ok := status.FromError(err)
			if !ok {
				stat = status.New(codes.PermissionDenied, err.Error())
			}
			t.controlBuf.put(&earlyAbortStream{
				httpStatus:     200,
				streamID:       s.id,
				contentSubtype: s.contentSubtype,
				status:         stat,
				rst:            !frame.StreamEnded(),
			})
			return nil
		}
	}
	t.activeStreams[streamID] = s
	if len(t.activeStreams) == 1 {
		t.idle = time.Time{}
	}
	t.mu.Unlock()
	if channelz.IsOn() {
		atomic.AddInt64(&t.czData.streamsStarted, 1)
		atomic.StoreInt64(&t.czData.lastStreamCreatedTime, time.Now().UnixNano())
	}
	s.requestRead = func(n int) {
		t.adjustWindow(s, uint32(n))
	}
	s.ctx = traceCtx(s.ctx, s.method)
	for _, sh := range t.stats {
		s.ctx = sh.TagRPC(s.ctx, &stats.RPCTagInfo{FullMethodName: s.method})
		inHeader := &stats.InHeader{
			FullMethod:  s.method,
			RemoteAddr:  t.remoteAddr,
			LocalAddr:   t.localAddr,
			Compression: s.recvCompress,
			WireLength:  int(frame.Header().Length),
			Header:      metadata.MD(mdata).Copy(),
		}
		sh.HandleRPC(s.ctx, inHeader)
	}
	s.ctxDone = s.ctx.Done()
	s.wq = newWriteQuota(defaultWriteQuota, s.ctxDone)
	s.trReader = &transportReader{
		reader: &recvBufferReader{
			ctx:        s.ctx,
			ctxDone:    s.ctxDone,
			recv:       s.buf,
			freeBuffer: t.bufferPool.put,
		},
		windowHandler: func(n int) {
			t.updateWindow(s, uint32(n))
		},
	}
	// Register the stream with loopy.
	t.controlBuf.put(&registerStream{
		streamID: s.id,
		wq:       s.wq,
	})
	handle(s)
	return nil
}

// HandleStreams receives incoming streams using the given handler. This is
// typically run in a separate goroutine.
// traceCtx attaches trace to ctx and returns the new context.
func (t *http2Server) HandleStreams(handle func(*Stream), traceCtx func(context.Context, string) context.Context) {
	defer close(t.readerDone)
	for {
		t.controlBuf.throttle()
		frame, err := t.framer.fr.ReadFrame()
		atomic.StoreInt64(&t.lastRead, time.Now().UnixNano())
		if err != nil {
			if se, ok := err.(http2.StreamError); ok {
				if logger.V(logLevel) {
					logger.Warningf("transport: http2Server.HandleStreams encountered http2.StreamError: %v", se)
				}
				t.mu.Lock()
				s := t.activeStreams[se.StreamID]
				t.mu.Unlock()
				if s != nil {
					t.closeStream(s, true, se.Code, false)
				} else {
					t.controlBuf.put(&cleanupStream{
						streamID: se.StreamID,
						rst:      true,
						rstCode:  se.Code,
						onWrite:  func() {},
					})
				}
				continue
			}
			if err == io.EOF || err == io.ErrUnexpectedEOF {
				t.Close(err)
				return
			}
			t.Close(err)
			return
		}
		switch frame := frame.(type) {
		case *http2.MetaHeadersFrame:
			if err := t.operateHeaders(frame, handle, traceCtx); err != nil {
				t.Close(err)
				break
			}
		case *http2.DataFrame:
			t.handleData(frame)
		case *http2.RSTStreamFrame:
			t.handleRSTStream(frame)
		case *http2.SettingsFrame:
			t.handleSettings(frame)
		case *http2.PingFrame:
			t.handlePing(frame)
		case *http2.WindowUpdateFrame:
			t.handleWindowUpdate(frame)
		case *http2.GoAwayFrame:
			// TODO: Handle GoAway from the client appropriately.
		default:
			if logger.V(logLevel) {
				logger.Errorf("transport: http2Server.HandleStreams found unhandled frame type %v.", frame)
			}
		}
	}
}

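// getStream looks up the active stream for the given frame's stream ID. It
// returns false if the transport is closing or if the stream has already
// been removed from the active streams map.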
func (t *http2Server) getStream(f http2.Frame) (*Stream, bool) {
	t.mu.Lock()
	defer t.mu.Unlock()
	if t.activeStreams == nil {
		// The transport is closing.
		return nil, false
	}
	s, ok := t.activeStreams[f.Header().StreamID]
	if !ok {
		// The stream is already done.
		return nil, false
	}
	return s, true
}

// adjustWindow sends out an extra window update over the initial window size
// of the stream if the application is requesting more data than the window
// can hold.
func (t *http2Server) adjustWindow(s *Stream, n uint32) {
	if w := s.fc.maybeAdjust(n); w > 0 {
		t.controlBuf.put(&outgoingWindowUpdate{streamID: s.id, increment: w})
	}
}

// updateWindow adjusts the inbound quota for the stream and the transport.
// Window updates are delivered to the controller for sending when the
// cumulative quota exceeds the corresponding threshold.
func (t *http2Server) updateWindow(s *Stream, n uint32) {
	if w := s.fc.onRead(n); w > 0 {
		t.controlBuf.put(&outgoingWindowUpdate{streamID: s.id,
			increment: w,
		})
	}
}

// updateFlowControl updates the incoming flow control windows
// for the transport and the stream based on the current bdp
// estimation.
func (t *http2Server) updateFlowControl(n uint32) {
	t.mu.Lock()
	for _, s := range t.activeStreams {
		s.fc.newLimit(n)
	}
	t.initialWindowSize = int32(n)
	t.mu.Unlock()
	t.controlBuf.put(&outgoingWindowUpdate{
		streamID:  0,
		increment: t.fc.newLimit(n),
	})
	t.controlBuf.put(&outgoingSettings{
		ss: []http2.Setting{
			{
				ID:  http2.SettingInitialWindowSize,
				Val: n,
			},
		},
	})
}

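// handleData processes an incoming DATA frame. It first replenishes the
// connection-level flow control window (independently of whether the
// application has consumed the data), optionally triggers a BDP ping, and
// then dispatches the payload to the matching stream's receive buffer.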
func (t *http2Server) handleData(f *http2.DataFrame) {
	size := f.Header().Length
	var sendBDPPing bool
	if t.bdpEst != nil {
		sendBDPPing = t.bdpEst.add(size)
	}
	// Decouple connection's flow control from application's read.
	// An update on connection's flow control should not depend on
	// whether user application has read the data or not. Such a
	// restriction is already imposed on the stream's flow control,
	// and therefore the sender will be blocked anyways.
	// Decoupling the connection flow control will prevent other
	// active(fast) streams from starving in presence of slow or
	// inactive streams.
	if w := t.fc.onData(size); w > 0 {
		t.controlBuf.put(&outgoingWindowUpdate{
			streamID:  0,
			increment: w,
		})
	}
	if sendBDPPing {
		// Avoid excessive ping detection (e.g. in an L7 proxy)
		// by sending a window update prior to the BDP ping.
		if w := t.fc.reset(); w > 0 {
			t.controlBuf.put(&outgoingWindowUpdate{
				streamID:  0,
				increment: w,
			})
		}
		t.controlBuf.put(bdpPing)
	}
	// Select the right stream to dispatch.
	s, ok := t.getStream(f)
	if !ok {
		return
	}
	if s.getState() == streamReadDone {
		t.closeStream(s, true, http2.ErrCodeStreamClosed, false)
		return
	}
	if size > 0 {
		if err := s.fc.onData(size); err != nil {
			t.closeStream(s, true, http2.ErrCodeFlowControl, false)
			return
		}
		if f.Header().Flags.Has(http2.FlagDataPadded) {
			if w := s.fc.onRead(size - uint32(len(f.Data()))); w > 0 {
				t.controlBuf.put(&outgoingWindowUpdate{s.id, w})
			}
		}
		// TODO(bradfitz, zhaoq): A copy is required here because there is no
		// guarantee f.Data() is consumed before the arrival of next frame.
		// Can this copy be eliminated?
		if len(f.Data()) > 0 {
			buffer := t.bufferPool.get()
			buffer.Reset()
			buffer.Write(f.Data())
			s.write(recvMsg{buffer: buffer})
		}
	}
	if f.StreamEnded() {
		// Received the end of stream from the client.
		s.compareAndSwapState(streamActive, streamReadDone)
		s.write(recvMsg{err: io.EOF})
	}
}

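// handleRSTStream processes an RST_STREAM frame from the client by closing
// the matching stream, or, if the stream is already gone from the active
// streams map, by asking the loopy writer to clean up its bookkeeping.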
func (t *http2Server) handleRSTStream(f *http2.RSTStreamFrame) {
	// If the stream is not deleted from the transport's active streams map, then do a regular close stream.
	if s, ok := t.getStream(f); ok {
		t.closeStream(s, false, 0, false)
		return
	}
	// If the stream is already deleted from the active streams map, then put a cleanupStream item into controlbuf to delete the stream from loopy writer's established streams map.
	t.controlBuf.put(&cleanupStream{
		streamID: f.Header().StreamID,
		rst:      false,
		rstCode:  0,
		onWrite:  func() {},
	})
}

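// handleSettings processes a SETTINGS frame from the client. Most settings
// are forwarded to loopy via the control buffer; MaxHeaderListSize is
// recorded locally since it constrains the headers this server may send.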
func (t *http2Server) handleSettings(f *http2.SettingsFrame) {
	if f.IsAck() {
		return
	}
	var ss []http2.Setting
	var updateFuncs []func()
	f.ForeachSetting(func(s http2.Setting) error {
		switch s.ID {
		case http2.SettingMaxHeaderListSize:
			updateFuncs = append(updateFuncs, func() {
				t.maxSendHeaderListSize = new(uint32)
				*t.maxSendHeaderListSize = s.Val
			})
		default:
			ss = append(ss, s)
		}
		return nil
	})
	t.controlBuf.executeAndPut(func(interface{}) bool {
		for _, f := range updateFuncs {
			f()
		}
		return true
	}, &incomingSettings{
		ss: ss,
	})
}

const (
	maxPingStrikes     = 2
	defaultPingTimeout = 2 * time.Hour
)

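// handlePing acks client pings and enforces the server's keepalive ping
// policy: pings that arrive too frequently accumulate "strikes", and after
// more than maxPingStrikes the connection is closed with ENHANCE_YOUR_CALM.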
func (t *http2Server) handlePing(f *http2.PingFrame) {
	if f.IsAck() {
		if f.Data == goAwayPing.data && t.drainEvent != nil {
			t.drainEvent.Fire()
			return
		}
		// Maybe it's a BDP ping.
		if t.bdpEst != nil {
			t.bdpEst.calculate(f.Data)
		}
		return
	}
	pingAck := &ping{ack: true}
	copy(pingAck.data[:], f.Data[:])
	t.controlBuf.put(pingAck)

	now := time.Now()
	defer func() {
		t.lastPingAt = now
	}()
	// A reset ping strikes means that we don't need to check for policy
	// violation for this ping and the pingStrikes counter should be set
	// to 0.
	if atomic.CompareAndSwapUint32(&t.resetPingStrikes, 1, 0) {
		t.pingStrikes = 0
		return
	}
	t.mu.Lock()
	ns := len(t.activeStreams)
	t.mu.Unlock()
	if ns < 1 && !t.kep.PermitWithoutStream {
		// Keepalive shouldn't be active; thus, this new ping should
		// have come after at least defaultPingTimeout.
		if t.lastPingAt.Add(defaultPingTimeout).After(now) {
			t.pingStrikes++
		}
	} else {
		// Check if keepalive policy is respected.
		if t.lastPingAt.Add(t.kep.MinTime).After(now) {
			t.pingStrikes++
		}
	}

	if t.pingStrikes > maxPingStrikes {
		// Send goaway and close the connection.
		t.controlBuf.put(&goAway{code: http2.ErrCodeEnhanceYourCalm, debugData: []byte("too_many_pings"), closeConn: errors.New("got too many pings from the client")})
	}
}

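// handleWindowUpdate forwards a WINDOW_UPDATE frame from the client to
// loopy via the control buffer.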
func (t *http2Server) handleWindowUpdate(f *http2.WindowUpdateFrame) {
	t.controlBuf.put(&incomingWindowUpdate{
		streamID:  f.Header().StreamID,
		increment: f.Increment,
	})
}

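// appendHeaderFieldsFromMD converts the metadata md into hpack header fields
// and appends them to headerFields, skipping reserved header names.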
func appendHeaderFieldsFromMD(headerFields []hpack.HeaderField, md metadata.MD) []hpack.HeaderField {
	for k, vv := range md {
		if isReservedHeader(k) {
			// Clients don't tolerate reading restricted headers after some non-restricted ones were sent.
			continue
		}
		for _, v := range vv {
			headerFields = append(headerFields, hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)})
		}
	}
	return headerFields
}

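// checkForHeaderListSize reports whether the given headerFrame fits within
// the max header list size advertised by the client, if any.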
func (t *http2Server) checkForHeaderListSize(it interface{}) bool {
	if t.maxSendHeaderListSize == nil {
		return true
	}
	hdrFrame := it.(*headerFrame)
	var sz int64
	for _, f := range hdrFrame.hf {
		if sz += int64(f.Size()); sz > int64(*t.maxSendHeaderListSize) {
			if logger.V(logLevel) {
				logger.Errorf("header list size to send violates the maximum size (%d bytes) set by client", *t.maxSendHeaderListSize)
			}
			return false
		}
	}
	return true
}

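// streamContextErr maps a stream's terminal condition to an error: it
// returns ErrConnClosing if the transport is shutting down, and otherwise
// wraps the stream's context error.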
func (t *http2Server) streamContextErr(s *Stream) error {
	select {
	case <-t.done:
		return ErrConnClosing
	default:
	}
	return ContextErr(s.ctx.Err())
}

// WriteHeader sends the header metadata md back to the client.
func (t *http2Server) WriteHeader(s *Stream, md metadata.MD) error {
	s.hdrMu.Lock()
	defer s.hdrMu.Unlock()
	if s.getState() == streamDone {
		return t.streamContextErr(s)
	}

	if s.updateHeaderSent() {
		return ErrIllegalHeaderWrite
	}

	if md.Len() > 0 {
		if s.header.Len() > 0 {
			s.header = metadata.Join(s.header, md)
		} else {
			s.header = md
		}
	}
	if err := t.writeHeaderLocked(s); err != nil {
		return status.Convert(err).Err()
	}
	return nil
}

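// setResetPingStrikes requests that the ping strike counter be reset the
// next time a ping is received; it is invoked whenever data or header
// frames are written out.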
func (t *http2Server) setResetPingStrikes() {
	atomic.StoreUint32(&t.resetPingStrikes, 1)
}

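// writeHeaderLocked assembles the response headers (:status, content-type,
// grpc-encoding, plus any application metadata) and hands them to loopy.
// The caller must hold s.hdrMu.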
func (t *http2Server) writeHeaderLocked(s *Stream) error {
	// TODO(mmukhi): Benchmark if the performance gets better if count the metadata and other header fields
	// first and create a slice of that exact size.
	headerFields := make([]hpack.HeaderField, 0, 2) // at least :status, content-type will be there if none else.
	headerFields = append(headerFields, hpack.HeaderField{Name: ":status", Value: "200"})
	headerFields = append(headerFields, hpack.HeaderField{Name: "content-type", Value: grpcutil.ContentType(s.contentSubtype)})
	if s.sendCompress != "" {
		headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-encoding", Value: s.sendCompress})
	}
	headerFields = appendHeaderFieldsFromMD(headerFields, s.header)
	success, err := t.controlBuf.executeAndPut(t.checkForHeaderListSize, &headerFrame{
		streamID:  s.id,
		hf:        headerFields,
		endStream: false,
		onWrite:   t.setResetPingStrikes,
	})
	if !success {
		if err != nil {
			return err
		}
		t.closeStream(s, true, http2.ErrCodeInternal, false)
		return ErrHeaderListSizeLimitViolation
	}
	for _, sh := range t.stats {
		// Note: Headers are compressed with hpack after this call returns.
		// No WireLength field is set here.
		outHeader := &stats.OutHeader{
			Header:      s.header.Copy(),
			Compression: s.sendCompress,
		}
		sh.HandleRPC(s.Context(), outHeader)
	}
	return nil
}

// WriteStatus sends stream status to the client and terminates the stream.
// No further I/O operations can be performed on this stream.
// TODO(zhaoq): Now it indicates the end of entire stream. Revisit if early
// OK is adopted.
func (t *http2Server) WriteStatus(s *Stream, st *status.Status) error {
	s.hdrMu.Lock()
	defer s.hdrMu.Unlock()

	if s.getState() == streamDone {
		return nil
	}

	// TODO(mmukhi): Benchmark if the performance gets better if count the metadata and other header fields
	// first and create a slice of that exact size.
	headerFields := make([]hpack.HeaderField, 0, 2) // grpc-status and grpc-message will be there if none else.
	if !s.updateHeaderSent() {                      // No headers have been sent.
		if len(s.header) > 0 { // Send a separate header frame.
			if err := t.writeHeaderLocked(s); err != nil {
				return err
			}
		} else { // Send a trailer only response.
			headerFields = append(headerFields, hpack.HeaderField{Name: ":status", Value: "200"})
			headerFields = append(headerFields, hpack.HeaderField{Name: "content-type", Value: grpcutil.ContentType(s.contentSubtype)})
		}
	}
	headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-status", Value: strconv.Itoa(int(st.Code()))})
	headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-message", Value: encodeGrpcMessage(st.Message())})

	if p := st.Proto(); p != nil && len(p.Details) > 0 {
		stBytes, err := proto.Marshal(p)
		if err != nil {
			// TODO: return error instead, when callers are able to handle it.
			logger.Errorf("transport: failed to marshal rpc status: %v, error: %v", p, err)
		} else {
			headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-status-details-bin", Value: encodeBinHeader(stBytes)})
		}
	}

	// Attach the trailer metadata.
	headerFields = appendHeaderFieldsFromMD(headerFields, s.trailer)
	trailingHeader := &headerFrame{
		streamID:  s.id,
		hf:        headerFields,
		endStream: true,
		onWrite:   t.setResetPingStrikes,
	}

	success, err := t.controlBuf.execute(t.checkForHeaderListSize, trailingHeader)
	if !success {
		if err != nil {
			return err
		}
		t.closeStream(s, true, http2.ErrCodeInternal, false)
		return ErrHeaderListSizeLimitViolation
	}
	// Send a RST_STREAM after the trailers if the client has not already half-closed.
	rst := s.getState() == streamActive
	t.finishStream(s, rst, http2.ErrCodeNo, trailingHeader, true)
	for _, sh := range t.stats {
		// Note: The trailer fields are compressed with hpack after this call returns.
		// No WireLength field is set here.
		sh.HandleRPC(s.Context(), &stats.OutTrailer{
			Trailer: s.trailer.Copy(),
		})
	}
	return nil
}

// Write converts the data into an HTTP/2 data frame and sends it out. A
// non-nil error is returned if it fails (e.g., framing error, transport
// error).
func (t *http2Server) Write(s *Stream, hdr []byte, data []byte, opts *Options) error {
	if !s.isHeaderSent() { // Headers haven't been written yet.
		if err := t.WriteHeader(s, nil); err != nil {
			return err
		}
	} else {
		// Writing headers checks for this condition.
		if s.getState() == streamDone {
			return t.streamContextErr(s)
		}
	}
	df := &dataFrame{
		streamID:    s.id,
		h:           hdr,
		d:           data,
		onEachWrite: t.setResetPingStrikes,
	}
	if err := s.wq.get(int32(len(hdr) + len(data))); err != nil {
		return t.streamContextErr(s)
	}
	return t.controlBuf.put(df)
}

// keepalive running in a separate goroutine does the following:
// 1. Gracefully closes an idle connection after a duration of keepalive.MaxConnectionIdle.
// 2. Gracefully closes any connection after a duration of keepalive.MaxConnectionAge.
// 3. Forcibly closes a connection after an additive period of keepalive.MaxConnectionAgeGrace over keepalive.MaxConnectionAge.
// 4. Makes sure a connection is alive by sending pings with a frequency of keepalive.Time and closes a non-responsive connection
// after an additional duration of keepalive.Timeout.
func (t *http2Server) keepalive() {
	p := &ping{}
	// True iff a ping has been sent, and no data has been received since then.
	outstandingPing := false
	// Amount of time remaining before which we should receive an ACK for the
	// last sent ping.
	kpTimeoutLeft := time.Duration(0)
	// Records the last value of t.lastRead before we go block on the timer.
	// This is required to check for read activity since then.
	prevNano := time.Now().UnixNano()
	// Initialize the different timers to their default values.
	idleTimer := time.NewTimer(t.kp.MaxConnectionIdle)
	ageTimer := time.NewTimer(t.kp.MaxConnectionAge)
	kpTimer := time.NewTimer(t.kp.Time)
	defer func() {
		// We need to drain the underlying channel in these timers after a call
		// to Stop(), only if we are interested in resetting them. Clearly we
		// are not interested in resetting them here.
		idleTimer.Stop()
		ageTimer.Stop()
		kpTimer.Stop()
	}()

	for {
		select {
		case <-idleTimer.C:
			t.mu.Lock()
			idle := t.idle
			if idle.IsZero() { // The connection is non-idle.
				t.mu.Unlock()
				idleTimer.Reset(t.kp.MaxConnectionIdle)
				continue
			}
			val := t.kp.MaxConnectionIdle - time.Since(idle)
			t.mu.Unlock()
			if val <= 0 {
				// The connection has been idle for a duration of keepalive.MaxConnectionIdle or more.
				// Gracefully close the connection.
				t.Drain()
				return
			}
			idleTimer.Reset(val)
		case <-ageTimer.C:
			t.Drain()
			ageTimer.Reset(t.kp.MaxConnectionAgeGrace)
			select {
			case <-ageTimer.C:
				// Close the connection after grace period.
				if logger.V(logLevel) {
					logger.Infof("transport: closing server transport due to maximum connection age.")
				}
				t.controlBuf.put(closeConnection{})
			case <-t.done:
			}
			return
		case <-kpTimer.C:
			lastRead := atomic.LoadInt64(&t.lastRead)
			if lastRead > prevNano {
				// There has been read activity since the last time we were
				// here. Setup the timer to fire at kp.Time seconds from
				// lastRead time and continue.
				outstandingPing = false
				kpTimer.Reset(time.Duration(lastRead) + t.kp.Time - time.Duration(time.Now().UnixNano()))
				prevNano = lastRead
				continue
			}
			if outstandingPing && kpTimeoutLeft <= 0 {
				t.Close(fmt.Errorf("keepalive ping not acked within timeout %s", t.kp.Time))
				return
			}
			if !outstandingPing {
				if channelz.IsOn() {
					atomic.AddInt64(&t.czData.kpCount, 1)
				}
				t.controlBuf.put(p)
				kpTimeoutLeft = t.kp.Timeout
				outstandingPing = true
			}
			// The amount of time to sleep here is the minimum of kp.Time and
			// timeoutLeft. This will ensure that we wait only for kp.Time
			// before sending out the next ping (for cases where the ping is
			// acked).
			sleepDuration := minTime(t.kp.Time, kpTimeoutLeft)
			kpTimeoutLeft -= sleepDuration
			kpTimer.Reset(sleepDuration)
		case <-t.done:
			return
		}
	}
}

// Close starts shutting down the http2Server transport.
// TODO(zhaoq): Now the destruction is not blocked on any pending streams. This
// could cause some resource issue. Revisit this later.
func (t *http2Server) Close(err error) {
	t.mu.Lock()
	if t.state == closing {
		t.mu.Unlock()
		return
	}
	if logger.V(logLevel) {
		logger.Infof("transport: closing: %v", err)
	}
	t.state = closing
	streams := t.activeStreams
	t.activeStreams = nil
	t.mu.Unlock()
	t.controlBuf.finish()
	close(t.done)
	if err := t.conn.Close(); err != nil && logger.V(logLevel) {
		logger.Infof("transport: error closing conn during Close: %v", err)
	}
	channelz.RemoveEntry(t.channelzID)
	// Cancel all active streams.
	for _, s := range streams {
		s.cancel()
	}
	for _, sh := range t.stats {
		connEnd := &stats.ConnEnd{}
		sh.HandleConn(t.ctx, connEnd)
	}
}

// deleteStream deletes the stream s from transport's active streams.
func (t *http2Server) deleteStream(s *Stream, eosReceived bool) {
	t.mu.Lock()
	if _, ok := t.activeStreams[s.id]; ok {
		delete(t.activeStreams, s.id)
		if len(t.activeStreams) == 0 {
			t.idle = time.Now()
		}
	}
	t.mu.Unlock()

	if channelz.IsOn() {
		if eosReceived {
			atomic.AddInt64(&t.czData.streamsSucceeded, 1)
		} else {
			atomic.AddInt64(&t.czData.streamsFailed, 1)
		}
	}
}

// finishStream closes the stream and puts the trailing headerFrame into controlbuf.
func (t *http2Server) finishStream(s *Stream, rst bool, rstCode http2.ErrCode, hdr *headerFrame, eosReceived bool) {
	// In case stream sending and receiving are invoked in separate
	// goroutines (e.g., bi-directional streaming), cancel needs to be
	// called to interrupt the potential blocking on other goroutines.
	s.cancel()

	oldState := s.swapState(streamDone)
	if oldState == streamDone {
		// If the stream was already done, return.
		return
	}

	hdr.cleanup = &cleanupStream{
		streamID: s.id,
		rst:      rst,
		rstCode:  rstCode,
		onWrite: func() {
			t.deleteStream(s, eosReceived)
		},
	}
	t.controlBuf.put(hdr)
}

// closeStream clears the footprint of a stream when the stream is not needed any more.
func (t *http2Server) closeStream(s *Stream, rst bool, rstCode http2.ErrCode, eosReceived bool) {
	// In case stream sending and receiving are invoked in separate
	// goroutines (e.g., bi-directional streaming), cancel needs to be
	// called to interrupt the potential blocking on other goroutines.
	s.cancel()

	s.swapState(streamDone)
	t.deleteStream(s, eosReceived)

	t.controlBuf.put(&cleanupStream{
		streamID: s.id,
		rst:      rst,
		rstCode:  rstCode,
		onWrite:  func() {},
	})
}

func (t *http2Server) RemoteAddr() net.Addr {
	return t.remoteAddr
}

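// Drain initiates a graceful shutdown by sending the first GoAway frame.
// Subsequent calls are no-ops since draining is already underway.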
func (t *http2Server) Drain() {
	t.mu.Lock()
	defer t.mu.Unlock()
	if t.drainEvent != nil {
		return
	}
	t.drainEvent = grpcsync.NewEvent()
	t.controlBuf.put(&goAway{code: http2.ErrCodeNo, debugData: []byte{}, headsUp: true})
}

var goAwayPing = &ping{data: [8]byte{1, 6, 1, 8, 0, 3, 3, 9}}

// outgoingGoAwayHandler handles an outgoing GoAway and returns true if loopy
// needs to put itself in draining mode.
func (t *http2Server) outgoingGoAwayHandler(g *goAway) (bool, error) {
	t.maxStreamMu.Lock()
	t.mu.Lock()
	if t.state == closing { // TODO(mmukhi): This seems unnecessary.
		t.mu.Unlock()
		t.maxStreamMu.Unlock()
		// The transport is closing.
		return false, ErrConnClosing
	}
	if !g.headsUp {
		// Stop accepting more streams now.
		t.state = draining
		sid := t.maxStreamID
		retErr := g.closeConn
		if len(t.activeStreams) == 0 {
			retErr = errors.New("second GOAWAY written and no active streams left to process")
		}
		t.mu.Unlock()
		t.maxStreamMu.Unlock()
		if err := t.framer.fr.WriteGoAway(sid, g.code, g.debugData); err != nil {
			return false, err
		}
		if retErr != nil {
			// Abruptly close the connection following the GoAway (via
			// loopywriter).  But flush out what's inside the buffer first.
			t.framer.writer.Flush()
			return false, retErr
		}
		return true, nil
	}
	t.mu.Unlock()
	t.maxStreamMu.Unlock()
	// For a graceful close, send out a GoAway with stream ID of MaxUInt32.
	// Follow that with a ping and wait for the ack to come back or a timer
	// to expire. During this time accept new streams since they might have
	// originated before the GoAway reaches the client.
	// After getting the ack or timer expiration send out another GoAway this
	// time with an ID of the max stream server intends to process.
	if err := t.framer.fr.WriteGoAway(math.MaxUint32, http2.ErrCodeNo, []byte{}); err != nil {
		return false, err
	}
	if err := t.framer.fr.WritePing(false, goAwayPing.data); err != nil {
		return false, err
	}
	go func() {
		timer := time.NewTimer(time.Minute)
		defer timer.Stop()
		select {
		case <-t.drainEvent.Done():
		case <-timer.C:
		case <-t.done:
			return
		}
		t.controlBuf.put(&goAway{code: g.code, debugData: g.debugData})
	}()
	return false, nil
}

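// ChannelzMetric returns a snapshot of the transport's channelz socket
// metrics.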
func (t *http2Server) ChannelzMetric() *channelz.SocketInternalMetric {
	s := channelz.SocketInternalMetric{
		StreamsStarted:                   atomic.LoadInt64(&t.czData.streamsStarted),
		StreamsSucceeded:                 atomic.LoadInt64(&t.czData.streamsSucceeded),
		StreamsFailed:                    atomic.LoadInt64(&t.czData.streamsFailed),
		MessagesSent:                     atomic.LoadInt64(&t.czData.msgSent),
		MessagesReceived:                 atomic.LoadInt64(&t.czData.msgRecv),
		KeepAlivesSent:                   atomic.LoadInt64(&t.czData.kpCount),
		LastRemoteStreamCreatedTimestamp: time.Unix(0, atomic.LoadInt64(&t.czData.lastStreamCreatedTime)),
		LastMessageSentTimestamp:         time.Unix(0, atomic.LoadInt64(&t.czData.lastMsgSentTime)),
		LastMessageReceivedTimestamp:     time.Unix(0, atomic.LoadInt64(&t.czData.lastMsgRecvTime)),
		LocalFlowControlWindow:           int64(t.fc.getSize()),
		SocketOptions:                    channelz.GetSocketOption(t.conn),
		LocalAddr:                        t.localAddr,
		RemoteAddr:                       t.remoteAddr,
		// RemoteName :
	}
	if au, ok := t.authInfo.(credentials.ChannelzSecurityInfo); ok {
		s.Security = au.GetSecurityValue()
	}
	s.RemoteFlowControlWindow = t.getOutFlowWindow()
	return &s
}

func (t *http2Server) IncrMsgSent() {
	atomic.AddInt64(&t.czData.msgSent, 1)
	atomic.StoreInt64(&t.czData.lastMsgSentTime, time.Now().UnixNano())
}

func (t *http2Server) IncrMsgRecv() {
	atomic.AddInt64(&t.czData.msgRecv, 1)
	atomic.StoreInt64(&t.czData.lastMsgRecvTime, time.Now().UnixNano())
}

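// getOutFlowWindow asks loopy for the current outbound flow control window
// size. It returns -1 if the transport is done and -2 on timeout.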
func (t *http2Server) getOutFlowWindow() int64 {
	resp := make(chan uint32, 1)
	timer := time.NewTimer(time.Second)
	defer timer.Stop()
	t.controlBuf.put(&outFlowControlSizeRequest{resp})
	select {
	case sz := <-resp:
		return int64(sz)
	case <-t.done:
		return -1
	case <-timer.C:
		return -2
	}
}

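// getPeer returns the peer (remote address and auth info) for this
// connection.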
func (t *http2Server) getPeer() *peer.Peer {
	return &peer.Peer{
		Addr:     t.remoteAddr,
		AuthInfo: t.authInfo, // Can be nil
	}
}

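// getJitter returns a random duration in the range of +/- 10% of v, used to
// spread out MaxConnectionAge expirations across connections.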
func getJitter(v time.Duration) time.Duration {
	if v == infinity {
		return 0
	}
	// Generate a jitter between +/- 10% of the value.
	r := int64(v / 10)
	j := grpcrand.Int63n(2*r) - r
	return time.Duration(j)
}

type connectionKey struct{}

// GetConnection gets the connection from the context.
func GetConnection(ctx context.Context) net.Conn {
	conn, _ := ctx.Value(connectionKey{}).(net.Conn)
	return conn
}

// setConnection adds the connection to the context to be able to get
// information about the destination IP and port for an incoming RPC. This
// also allows any unary or streaming interceptors to see the connection.
func setConnection(ctx context.Context, conn net.Conn) context.Context {
	return context.WithValue(ctx, connectionKey{}, conn)
}