/*
 *
 * Copyright 2014 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

package transport

import (
	"bytes"
	"errors"
	"fmt"
	"runtime"
	"strconv"
	"sync"
	"sync/atomic"

	"golang.org/x/net/http2"
	"golang.org/x/net/http2/hpack"
	"google.golang.org/grpc/internal/grpcutil"
	"google.golang.org/grpc/status"
)

var updateHeaderTblSize = func(e *hpack.Encoder, v uint32) {
	e.SetMaxDynamicTableSizeLimit(v)
}

type itemNode struct {
	it   interface{}
	next *itemNode
}

type itemList struct {
	head *itemNode
	tail *itemNode
}

// enqueue adds i to the end of the list.
func (il *itemList) enqueue(i interface{}) {
	n := &itemNode{it: i}
	if il.tail == nil {
		il.head, il.tail = n, n
		return
	}
	il.tail.next = n
	il.tail = n
}

// peek returns the first item in the list without removing it from the
// list.
func (il *itemList) peek() interface{} {
	return il.head.it
}

// dequeue removes and returns the first item in the list, or nil if the list
// is empty.
func (il *itemList) dequeue() interface{} {
	if il.head == nil {
		return nil
	}
	i := il.head.it
	il.head = il.head.next
	if il.head == nil {
		il.tail = nil
	}
	return i
}

// dequeueAll removes all items from the list and returns the former head node.
func (il *itemList) dequeueAll() *itemNode {
	h := il.head
	il.head, il.tail = nil, nil
	return h
}

func (il *itemList) isEmpty() bool {
	return il.head == nil
}
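
// Note that itemList is an unsynchronized, singly linked FIFO; its callers are
// expected to serialize access (controlBuffer does so under its mutex, and
// loopy touches per-stream lists only from its single goroutine).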

// The following defines various control items which could flow through
// the control buffer of the transport. They represent different aspects of
// control tasks, e.g., flow control, settings, stream resetting, etc.

// maxQueuedTransportResponseFrames is the most queued "transport response"
// frames we will buffer before preventing new reads from occurring on the
// transport.  These are control frames sent in response to client requests,
// such as RST_STREAM due to bad headers or settings acks.
const maxQueuedTransportResponseFrames = 50

type cbItem interface {
	isTransportResponseFrame() bool
}

// registerStream is used to register an incoming stream with loopy writer.
type registerStream struct {
	streamID uint32
	wq       *writeQuota
}

func (*registerStream) isTransportResponseFrame() bool { return false }

// headerFrame is also used to register a stream on the client side.
type headerFrame struct {
	streamID   uint32
	hf         []hpack.HeaderField
	endStream  bool               // Valid on server side.
	initStream func(uint32) error // Used only on the client side.
	onWrite    func()
	wq         *writeQuota    // write quota for the stream created.
	cleanup    *cleanupStream // Valid on the server side.
	onOrphaned func(error)    // Valid on client-side
}

func (h *headerFrame) isTransportResponseFrame() bool {
	return h.cleanup != nil && h.cleanup.rst // Results in a RST_STREAM
}

type cleanupStream struct {
	streamID uint32
	rst      bool
	rstCode  http2.ErrCode
	onWrite  func()
}

func (c *cleanupStream) isTransportResponseFrame() bool { return c.rst } // Results in a RST_STREAM

type earlyAbortStream struct {
	httpStatus     uint32
	streamID       uint32
	contentSubtype string
	status         *status.Status
	rst            bool
}

func (*earlyAbortStream) isTransportResponseFrame() bool { return false }

type dataFrame struct {
	streamID  uint32
	endStream bool
	h         []byte // gRPC message header.
	d         []byte // Actual message data.
	// onEachWrite is called every time
	// a part of d is written out.
	onEachWrite func()
}

func (*dataFrame) isTransportResponseFrame() bool { return false }

type incomingWindowUpdate struct {
	streamID  uint32
	increment uint32
}

func (*incomingWindowUpdate) isTransportResponseFrame() bool { return false }

type outgoingWindowUpdate struct {
	streamID  uint32
	increment uint32
}

func (*outgoingWindowUpdate) isTransportResponseFrame() bool {
	return false // window updates are throttled by thresholds
}

type incomingSettings struct {
	ss []http2.Setting
}

func (*incomingSettings) isTransportResponseFrame() bool { return true } // Results in a settings ACK

type outgoingSettings struct {
	ss []http2.Setting
}

func (*outgoingSettings) isTransportResponseFrame() bool { return false }

type incomingGoAway struct {
}

func (*incomingGoAway) isTransportResponseFrame() bool { return false }

type goAway struct {
	code      http2.ErrCode
	debugData []byte
	headsUp   bool
	closeConn error // if set, loopyWriter will exit, resulting in conn closure
}

func (*goAway) isTransportResponseFrame() bool { return false }

type ping struct {
	ack  bool
	data [8]byte
}

func (*ping) isTransportResponseFrame() bool { return true }

type outFlowControlSizeRequest struct {
	resp chan uint32
}

func (*outFlowControlSizeRequest) isTransportResponseFrame() bool { return false }

// closeConnection is an instruction to tell the loopy writer to flush the
// framer and exit, which will cause the transport's connection to be closed
// (by the client or server).  The transport itself will close after the reader
// encounters the EOF caused by the connection closure.
type closeConnection struct{}

func (closeConnection) isTransportResponseFrame() bool { return false }

type outStreamState int

const (
	active outStreamState = iota
	empty
	waitingOnStreamQuota
)
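
// An outStream starts out empty, becomes active once it has queued data and
// available stream-level quota, and is parked in waitingOnStreamQuota when its
// outstanding bytes reach the outbound initial window size; a subsequent
// window update (or a larger initial window size setting) moves it back to
// active. See incomingWindowUpdateHandler, applySettings and processData.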

type outStream struct {
	id               uint32
	state            outStreamState
	itl              *itemList
	bytesOutStanding int
	wq               *writeQuota

	next *outStream
	prev *outStream
}

func (s *outStream) deleteSelf() {
	if s.prev != nil {
		s.prev.next = s.next
	}
	if s.next != nil {
		s.next.prev = s.prev
	}
	s.next, s.prev = nil, nil
}

type outStreamList struct {
	// Following are sentinel objects that mark the
	// beginning and end of the list. They do not
	// contain any item lists. All valid objects are
	// inserted in between them.
	// This is needed so that an outStream object can
	// deleteSelf() in O(1) time without knowing which
	// list it belongs to.
	head *outStream
	tail *outStream
}

func newOutStreamList() *outStreamList {
	head, tail := new(outStream), new(outStream)
	head.next = tail
	tail.prev = head
	return &outStreamList{
		head: head,
		tail: tail,
	}
}

func (l *outStreamList) enqueue(s *outStream) {
	e := l.tail.prev
	e.next = s
	s.prev = e
	s.next = l.tail
	l.tail.prev = s
}

// dequeue removes and returns the outStream at the beginning of the list, or
// nil if the list is empty.
func (l *outStreamList) dequeue() *outStream {
	b := l.head.next
	if b == l.tail {
		return nil
	}
	b.deleteSelf()
	return b
}

// controlBuffer is a way to pass information to loopy.
// Information is passed as specific struct types called control frames.
// A control frame not only represents data, messages or headers to be sent out
// but can also be used to instruct loopy to update its internal state.
// It shouldn't be confused with an HTTP2 frame, although some of the control frames
// like dataFrame and headerFrame do go out on the wire as HTTP2 frames.
type controlBuffer struct {
	ch              chan struct{}
	done            <-chan struct{}
	mu              sync.Mutex
	consumerWaiting bool
	list            *itemList
	err             error

	// transportResponseFrames counts the number of queued items that represent
	// the response of an action initiated by the peer.  trfChan is created
	// when transportResponseFrames >= maxQueuedTransportResponseFrames and is
	// closed and nilled when transportResponseFrames drops below the
	// threshold.  Both fields are protected by mu.
	transportResponseFrames int
	trfChan                 atomic.Value // chan struct{}
}

func newControlBuffer(done <-chan struct{}) *controlBuffer {
	return &controlBuffer{
		ch:   make(chan struct{}, 1),
		list: &itemList{},
		done: done,
	}
}
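
// A rough usage sketch; the done channel and the calling goroutines live in
// the transport code outside this file, and the item shown is illustrative:
//
//	cbuf := newControlBuffer(done)
//
//	// Producer side, e.g. the transport's reader goroutine:
//	cbuf.throttle()             // block while too many response frames are queued
//	cbuf.put(&ping{ack: true})  // queue a control item for loopy
//
//	// Consumer side, the loopy writer:
//	it, err := cbuf.get(true)   // block until an item or an error is available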

// throttle blocks if there are too many incomingSettings/cleanupStreams in the
// controlbuf.
func (c *controlBuffer) throttle() {
	ch, _ := c.trfChan.Load().(chan struct{})
	if ch != nil {
		select {
		case <-ch:
		case <-c.done:
		}
	}
}

// put adds an item to the buffer, to be picked up by the consumer (loopy).
func (c *controlBuffer) put(it cbItem) error {
	_, err := c.executeAndPut(nil, it)
	return err
}

// executeAndPut runs f, if it is non-nil, while holding the buffer's lock; if
// f returns true (or is nil), the item is enqueued and a waiting consumer is
// woken up.
func (c *controlBuffer) executeAndPut(f func(it interface{}) bool, it cbItem) (bool, error) {
	var wakeUp bool
	c.mu.Lock()
	if c.err != nil {
		c.mu.Unlock()
		return false, c.err
	}
	if f != nil {
		if !f(it) { // f wasn't successful
			c.mu.Unlock()
			return false, nil
		}
	}
	if c.consumerWaiting {
		wakeUp = true
		c.consumerWaiting = false
	}
	c.list.enqueue(it)
	if it.isTransportResponseFrame() {
		c.transportResponseFrames++
		if c.transportResponseFrames == maxQueuedTransportResponseFrames {
			// We are adding the frame that puts us over the threshold; create
			// a throttling channel.
			c.trfChan.Store(make(chan struct{}))
		}
	}
	c.mu.Unlock()
	if wakeUp {
		select {
		case c.ch <- struct{}{}:
		default:
		}
	}
	return true, nil
}

// Note argument f should never be nil.
func (c *controlBuffer) execute(f func(it interface{}) bool, it interface{}) (bool, error) {
	c.mu.Lock()
	if c.err != nil {
		c.mu.Unlock()
		return false, c.err
	}
	if !f(it) { // f wasn't successful
		c.mu.Unlock()
		return false, nil
	}
	c.mu.Unlock()
	return true, nil
}

// get returns the next item in the buffer. If block is true, it waits until an
// item is available or the transport is done; otherwise it returns nil
// immediately when the buffer is empty.
func (c *controlBuffer) get(block bool) (interface{}, error) {
	for {
		c.mu.Lock()
		if c.err != nil {
			c.mu.Unlock()
			return nil, c.err
		}
		if !c.list.isEmpty() {
			h := c.list.dequeue().(cbItem)
			if h.isTransportResponseFrame() {
				if c.transportResponseFrames == maxQueuedTransportResponseFrames {
					// We are removing the frame that put us over the
					// threshold; close and clear the throttling channel.
					ch := c.trfChan.Load().(chan struct{})
					close(ch)
					c.trfChan.Store((chan struct{})(nil))
				}
				c.transportResponseFrames--
			}
			c.mu.Unlock()
			return h, nil
		}
		if !block {
			c.mu.Unlock()
			return nil, nil
		}
		c.consumerWaiting = true
		c.mu.Unlock()
		select {
		case <-c.ch:
		case <-c.done:
			return nil, errors.New("transport closed by client")
		}
	}
}

// finish marks the control buffer as closed, orphans any queued headerFrames,
// and unblocks a throttle() that may be in flight.
func (c *controlBuffer) finish() {
	c.mu.Lock()
	if c.err != nil {
		c.mu.Unlock()
		return
	}
	c.err = ErrConnClosing
	// There may be headers for streams in the control buffer.
	// These streams need to be cleaned out since the transport
	// is still not aware of them yet.
	for head := c.list.dequeueAll(); head != nil; head = head.next {
		hf, ok := head.it.(*headerFrame)
		if !ok {
			continue
		}
		if hf.onOrphaned != nil { // It will be nil on the server-side.
			hf.onOrphaned(ErrConnClosing)
		}
	}
	// In case throttle() is currently in flight, it needs to be unblocked.
	// Otherwise, the transport may not close, since the transport is closed by
	// the reader encountering the connection error.
	ch, _ := c.trfChan.Load().(chan struct{})
	if ch != nil {
		close(ch)
	}
	c.trfChan.Store((chan struct{})(nil))
	c.mu.Unlock()
}

type side int

const (
	clientSide side = iota
	serverSide
)

// Loopy receives frames from the control buffer.
// Each frame is handled individually; most of the work done by loopy goes
// into handling data frames. Loopy maintains a queue of active streams, and each
// stream maintains a queue of data frames; as loopy receives data frames
// they are added to the queue of the relevant stream.
// Loopy goes over this list of active streams by processing one node every iteration,
// thereby closely resembling round-robin scheduling over all streams. While
// processing a stream, loopy writes out data bytes from this stream capped by the min
// of http2MaxFrameLen, connection-level flow control and stream-level flow control.
type loopyWriter struct {
	side      side
	cbuf      *controlBuffer
	sendQuota uint32
	oiws      uint32 // outbound initial window size.
	// estdStreams is a map of all established streams that are not cleaned up yet.
	// On the client side, this is all streams whose headers were sent out.
	// On the server side, this is all streams whose headers were received.
	estdStreams map[uint32]*outStream // Established streams.
	// activeStreams is a linked list of all streams that have data to send and some
	// stream-level flow control quota.
	// Each of these streams internally has a list of data items (and perhaps trailers
	// on the server side) to be sent out.
	activeStreams *outStreamList
	framer        *framer
	hBuf          *bytes.Buffer  // The buffer for HPACK encoding.
	hEnc          *hpack.Encoder // HPACK encoder.
	bdpEst        *bdpEstimator
	draining      bool

	// Side-specific handlers
	ssGoAwayHandler func(*goAway) (bool, error)
}

func newLoopyWriter(s side, fr *framer, cbuf *controlBuffer, bdpEst *bdpEstimator) *loopyWriter {
	var buf bytes.Buffer
	l := &loopyWriter{
		side:          s,
		cbuf:          cbuf,
		sendQuota:     defaultWindowSize,
		oiws:          defaultWindowSize,
		estdStreams:   make(map[uint32]*outStream),
		activeStreams: newOutStreamList(),
		framer:        fr,
		hBuf:          &buf,
		hEnc:          hpack.NewEncoder(&buf),
		bdpEst:        bdpEst,
	}
	return l
}
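
// The transport is expected to run loopy on its own goroutine, roughly like
// the sketch below (the surrounding setup and error handling live in the
// transport code, not in this file):
//
//	l := newLoopyWriter(clientSide, fr, cbuf, bdpEst)
//	go func() {
//		// run returns only when the transport is shutting down.
//		l.run()
//	}()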

const minBatchSize = 1000

// run should be run in a separate goroutine.
// It reads control frames from controlBuf and processes them by:
// 1. Updating loopy's internal state, and/or
// 2. Writing out HTTP2 frames on the wire.
//
// Loopy keeps all active streams with data to send in a linked list.
// All streams in the activeStreams linked list must have both:
// 1. Data to send, and
// 2. Stream-level flow control quota available.
//
// In each iteration of the run loop, besides processing the incoming control
// frame, loopy calls processData, which processes one node from the activeStreams linked list.
// This results in writing of HTTP2 frames into an underlying write buffer.
// When there are no more control frames to read from controlBuf, loopy flushes the write buffer.
// As an optimization, to increase the batch size for each flush, loopy yields the processor once,
// if the batch size is too low, to give stream goroutines a chance to fill it up.
func (l *loopyWriter) run() (err error) {
	// Always flush the writer before exiting in case there are pending frames
	// to be sent.
	defer l.framer.writer.Flush()
	for {
		it, err := l.cbuf.get(true)
		if err != nil {
			return err
		}
		if err = l.handle(it); err != nil {
			return err
		}
		if _, err = l.processData(); err != nil {
			return err
		}
		gosched := true
	hasdata:
		for {
			it, err := l.cbuf.get(false)
			if err != nil {
				return err
			}
			if it != nil {
				if err = l.handle(it); err != nil {
					return err
				}
				if _, err = l.processData(); err != nil {
					return err
				}
				continue hasdata
			}
			isEmpty, err := l.processData()
			if err != nil {
				return err
			}
			if !isEmpty {
				continue hasdata
			}
			if gosched {
				gosched = false
				if l.framer.writer.offset < minBatchSize {
					runtime.Gosched()
					continue hasdata
				}
			}
			l.framer.writer.Flush()
			break hasdata
		}
	}
}

func (l *loopyWriter) outgoingWindowUpdateHandler(w *outgoingWindowUpdate) error {
	return l.framer.fr.WriteWindowUpdate(w.streamID, w.increment)
}

func (l *loopyWriter) incomingWindowUpdateHandler(w *incomingWindowUpdate) error {
	// A streamID of 0 is a connection-level window update; update the send quota.
	if w.streamID == 0 {
		l.sendQuota += w.increment
		return nil
	}
	// Find the stream and update it.
	if str, ok := l.estdStreams[w.streamID]; ok {
		str.bytesOutStanding -= int(w.increment)
		if strQuota := int(l.oiws) - str.bytesOutStanding; strQuota > 0 && str.state == waitingOnStreamQuota {
			str.state = active
			l.activeStreams.enqueue(str)
			return nil
		}
	}
	return nil
}

func (l *loopyWriter) outgoingSettingsHandler(s *outgoingSettings) error {
	return l.framer.fr.WriteSettings(s.ss...)
}

func (l *loopyWriter) incomingSettingsHandler(s *incomingSettings) error {
	if err := l.applySettings(s.ss); err != nil {
		return err
	}
	return l.framer.fr.WriteSettingsAck()
}

func (l *loopyWriter) registerStreamHandler(h *registerStream) error {
	str := &outStream{
		id:    h.streamID,
		state: empty,
		itl:   &itemList{},
		wq:    h.wq,
	}
	l.estdStreams[h.streamID] = str
	return nil
}

func (l *loopyWriter) headerHandler(h *headerFrame) error {
	if l.side == serverSide {
		str, ok := l.estdStreams[h.streamID]
		if !ok {
			if logger.V(logLevel) {
				logger.Warningf("transport: loopy doesn't recognize the stream: %d", h.streamID)
			}
			return nil
		}
		// Case 1.A: Server is responding back with headers.
		if !h.endStream {
			return l.writeHeader(h.streamID, h.endStream, h.hf, h.onWrite)
		}
		// else: Case 1.B: Server wants to close the stream.

		if str.state != empty { // either active or waiting on stream quota.
			// add it to str's list of items.
			str.itl.enqueue(h)
			return nil
		}
		if err := l.writeHeader(h.streamID, h.endStream, h.hf, h.onWrite); err != nil {
			return err
		}
		return l.cleanupStreamHandler(h.cleanup)
	}
	// Case 2: Client wants to originate a stream.
	str := &outStream{
		id:    h.streamID,
		state: empty,
		itl:   &itemList{},
		wq:    h.wq,
	}
	return l.originateStream(str, h)
}

func (l *loopyWriter) originateStream(str *outStream, hdr *headerFrame) error {
	// l.draining is set when handling GoAway, in which case we want to avoid
	// creating new streams.
	if l.draining {
		// TODO: provide a better error with the reason we are in draining.
		hdr.onOrphaned(errStreamDrain)
		return nil
	}
	if err := hdr.initStream(str.id); err != nil {
		return err
	}
	if err := l.writeHeader(str.id, hdr.endStream, hdr.hf, hdr.onWrite); err != nil {
		return err
	}
	l.estdStreams[str.id] = str
	return nil
}

func (l *loopyWriter) writeHeader(streamID uint32, endStream bool, hf []hpack.HeaderField, onWrite func()) error {
	if onWrite != nil {
		onWrite()
	}
	l.hBuf.Reset()
	for _, f := range hf {
		if err := l.hEnc.WriteField(f); err != nil {
			if logger.V(logLevel) {
				logger.Warningf("transport: loopyWriter.writeHeader encountered error while encoding headers: %v", err)
			}
		}
	}
	var (
		err               error
		endHeaders, first bool
	)
	first = true
	for !endHeaders {
		size := l.hBuf.Len()
		if size > http2MaxFrameLen {
			size = http2MaxFrameLen
		} else {
			endHeaders = true
		}
		if first {
			first = false
			err = l.framer.fr.WriteHeaders(http2.HeadersFrameParam{
				StreamID:      streamID,
				BlockFragment: l.hBuf.Next(size),
				EndStream:     endStream,
				EndHeaders:    endHeaders,
			})
		} else {
			err = l.framer.fr.WriteContinuation(
				streamID,
				endHeaders,
				l.hBuf.Next(size),
			)
		}
		if err != nil {
			return err
		}
	}
	return nil
}

func (l *loopyWriter) preprocessData(df *dataFrame) error {
	str, ok := l.estdStreams[df.streamID]
	if !ok {
		return nil
	}
	// If we got data for a stream it means that
	// stream was originated and the headers were sent out.
	str.itl.enqueue(df)
	if str.state == empty {
		str.state = active
		l.activeStreams.enqueue(str)
	}
	return nil
}

func (l *loopyWriter) pingHandler(p *ping) error {
	if !p.ack {
		l.bdpEst.timesnap(p.data)
	}
	return l.framer.fr.WritePing(p.ack, p.data)
}

func (l *loopyWriter) outFlowControlSizeRequestHandler(o *outFlowControlSizeRequest) error {
	o.resp <- l.sendQuota
	return nil
}

func (l *loopyWriter) cleanupStreamHandler(c *cleanupStream) error {
	c.onWrite()
	if str, ok := l.estdStreams[c.streamID]; ok {
		// On the server side it could be a trailers-only response or
		// a RST_STREAM before stream initialization; thus the stream might
		// not be established yet.
		delete(l.estdStreams, c.streamID)
		str.deleteSelf()
	}
	if c.rst { // If RST_STREAM needs to be sent.
		if err := l.framer.fr.WriteRSTStream(c.streamID, c.rstCode); err != nil {
			return err
		}
	}
	if l.draining && len(l.estdStreams) == 0 {
		return errors.New("finished processing active streams while in draining mode")
	}
	return nil
}

func (l *loopyWriter) earlyAbortStreamHandler(eas *earlyAbortStream) error {
	if l.side == clientSide {
		return errors.New("earlyAbortStream not handled on client")
	}
	// In case the caller forgets to set the http status, default to 200.
	if eas.httpStatus == 0 {
		eas.httpStatus = 200
	}
	headerFields := []hpack.HeaderField{
		{Name: ":status", Value: strconv.Itoa(int(eas.httpStatus))},
		{Name: "content-type", Value: grpcutil.ContentType(eas.contentSubtype)},
		{Name: "grpc-status", Value: strconv.Itoa(int(eas.status.Code()))},
		{Name: "grpc-message", Value: encodeGrpcMessage(eas.status.Message())},
	}

	if err := l.writeHeader(eas.streamID, true, headerFields, nil); err != nil {
		return err
	}
	if eas.rst {
		if err := l.framer.fr.WriteRSTStream(eas.streamID, http2.ErrCodeNo); err != nil {
			return err
		}
	}
	return nil
}

func (l *loopyWriter) incomingGoAwayHandler(*incomingGoAway) error {
	if l.side == clientSide {
		l.draining = true
		if len(l.estdStreams) == 0 {
			return errors.New("received GOAWAY with no active streams")
		}
	}
	return nil
}

func (l *loopyWriter) goAwayHandler(g *goAway) error {
	// Handling of outgoing GoAway is very specific to side.
	if l.ssGoAwayHandler != nil {
		draining, err := l.ssGoAwayHandler(g)
		if err != nil {
			return err
		}
		l.draining = draining
	}
	return nil
}

func (l *loopyWriter) closeConnectionHandler() error {
	// Exit loopyWriter entirely by returning an error here.  This will lead to
	// the transport closing the connection, and, ultimately, transport
	// closure.
	return ErrConnClosing
}

// handle dispatches a control item to the appropriate handler based on its type.
func (l *loopyWriter) handle(i interface{}) error {
	switch i := i.(type) {
	case *incomingWindowUpdate:
		return l.incomingWindowUpdateHandler(i)
	case *outgoingWindowUpdate:
		return l.outgoingWindowUpdateHandler(i)
	case *incomingSettings:
		return l.incomingSettingsHandler(i)
	case *outgoingSettings:
		return l.outgoingSettingsHandler(i)
	case *headerFrame:
		return l.headerHandler(i)
	case *registerStream:
		return l.registerStreamHandler(i)
	case *cleanupStream:
		return l.cleanupStreamHandler(i)
	case *earlyAbortStream:
		return l.earlyAbortStreamHandler(i)
	case *incomingGoAway:
		return l.incomingGoAwayHandler(i)
	case *dataFrame:
		return l.preprocessData(i)
	case *ping:
		return l.pingHandler(i)
	case *goAway:
		return l.goAwayHandler(i)
	case *outFlowControlSizeRequest:
		return l.outFlowControlSizeRequestHandler(i)
	case closeConnection:
		return l.closeConnectionHandler()
	default:
		return fmt.Errorf("transport: unknown control message type %T", i)
	}
}

func (l *loopyWriter) applySettings(ss []http2.Setting) error {
	for _, s := range ss {
		switch s.ID {
		case http2.SettingInitialWindowSize:
			o := l.oiws
			l.oiws = s.Val
			if o < l.oiws {
				// If the new limit is greater make all depleted streams active.
				for _, stream := range l.estdStreams {
					if stream.state == waitingOnStreamQuota {
						stream.state = active
						l.activeStreams.enqueue(stream)
					}
				}
			}
		case http2.SettingHeaderTableSize:
			updateHeaderTblSize(l.hEnc, s.Val)
		}
	}
	return nil
}

// processData removes the first stream from active streams, writes out at most 16KB
// of its data, and then puts it at the end of activeStreams if there's still more data
// to be sent and the stream has some stream-level flow control quota left.
func (l *loopyWriter) processData() (bool, error) {
	if l.sendQuota == 0 {
		return true, nil
	}
	str := l.activeStreams.dequeue() // Remove the first stream.
	if str == nil {
		return true, nil
	}
	dataItem := str.itl.peek().(*dataFrame) // Peek at the first data item of this stream.
	// A data item is represented by a dataFrame, since it later translates into
	// multiple HTTP2 data frames.
	// Every dataFrame has two buffers: h holds the grpc-message header and d holds the actual data.
	// As an optimization to keep wire traffic low, data from d is copied into h to make the frames
	// as big as possible, up to the maximum HTTP2 frame size.

	if len(dataItem.h) == 0 && len(dataItem.d) == 0 { // Empty data frame
		// Client sends out an empty data frame with endStream = true.
		if err := l.framer.fr.WriteData(dataItem.streamID, dataItem.endStream, nil); err != nil {
			return false, err
		}
		str.itl.dequeue() // remove the empty data item from stream
		if str.itl.isEmpty() {
			str.state = empty
		} else if trailer, ok := str.itl.peek().(*headerFrame); ok { // the next item is trailers.
			if err := l.writeHeader(trailer.streamID, trailer.endStream, trailer.hf, trailer.onWrite); err != nil {
				return false, err
			}
			if err := l.cleanupStreamHandler(trailer.cleanup); err != nil {
				return false, err
			}
		} else {
			l.activeStreams.enqueue(str)
		}
		return false, nil
	}
	var buf []byte
	// Figure out the maximum size we can send.
	maxSize := http2MaxFrameLen
	if strQuota := int(l.oiws) - str.bytesOutStanding; strQuota <= 0 { // stream-level flow control.
		str.state = waitingOnStreamQuota
		return false, nil
	} else if maxSize > strQuota {
		maxSize = strQuota
	}
	if maxSize > int(l.sendQuota) { // connection-level flow control.
		maxSize = int(l.sendQuota)
	}
	// Compute how much of the header and data we can send within quota and max frame length.
	hSize := min(maxSize, len(dataItem.h))
	dSize := min(maxSize-hSize, len(dataItem.d))
	if hSize != 0 {
		if dSize == 0 {
			buf = dataItem.h
		} else {
			// We can add some data to the grpc message header to distribute bytes more equally across frames.
			// Copy on the stack to avoid generating garbage.
			var localBuf [http2MaxFrameLen]byte
			copy(localBuf[:hSize], dataItem.h)
			copy(localBuf[hSize:], dataItem.d[:dSize])
			buf = localBuf[:hSize+dSize]
		}
	} else {
		buf = dataItem.d
	}

	size := hSize + dSize

	// Now that outgoing flow controls are checked we can replenish str's write quota.
	str.wq.replenish(size)
	var endStream bool
	// If this is the last data message on this stream and all of it can be written in this iteration.
	if dataItem.endStream && len(dataItem.h)+len(dataItem.d) <= size {
		endStream = true
	}
	if dataItem.onEachWrite != nil {
		dataItem.onEachWrite()
	}
	if err := l.framer.fr.WriteData(dataItem.streamID, endStream, buf[:size]); err != nil {
		return false, err
	}
	str.bytesOutStanding += size
	l.sendQuota -= uint32(size)
	dataItem.h = dataItem.h[hSize:]
	dataItem.d = dataItem.d[dSize:]

	if len(dataItem.h) == 0 && len(dataItem.d) == 0 { // All the data from that message was written out.
		str.itl.dequeue()
	}
	if str.itl.isEmpty() {
		str.state = empty
	} else if trailer, ok := str.itl.peek().(*headerFrame); ok { // The next item is trailers.
		if err := l.writeHeader(trailer.streamID, trailer.endStream, trailer.hf, trailer.onWrite); err != nil {
			return false, err
		}
		if err := l.cleanupStreamHandler(trailer.cleanup); err != nil {
			return false, err
		}
	} else if int(l.oiws)-str.bytesOutStanding <= 0 { // Ran out of stream quota.
		str.state = waitingOnStreamQuota
	} else { // Otherwise add it back to the list of active streams.
		l.activeStreams.enqueue(str)
	}
	return false, nil
}

func min(a, b int) int {
	if a < b {
		return a
	}
	return b
}