package transport
import (
)
// updateHeaderTblSize is a package-level function variable taking an HPACK
// encoder and a uint32; its body calls SetMaxDynamicTableSizeLimit. The
// settings-application code in this file invokes it with the writer's encoder
// and the value of an http2.SettingHeaderTableSize setting.
// NOTE(review): being a variable, it can be swapped out (presumably by tests —
// confirm).
var updateHeaderTblSize = func( *hpack.Encoder, uint32) {
.SetMaxDynamicTableSizeLimit()
}
// itemNode is one link of an itemList.
type itemNode struct {
	it   interface{} // payload carried by this node
	next *itemNode   // following node; nil at the tail
}

// itemList is a minimal singly linked FIFO queue of arbitrary items.
//
// The zero value is an empty, ready-to-use list. The list does no locking of
// its own; callers must serialize access.
type itemList struct {
	head *itemNode
	tail *itemNode
}

// enqueue appends i to the back of the list.
func (il *itemList) enqueue(i interface{}) {
	n := &itemNode{it: i}
	if il.tail == nil {
		// Empty list: the new node is both first and last.
		il.head, il.tail = n, n
		return
	}
	il.tail.next = n
	il.tail = n
}

// peek returns the first item without removing it, or nil if the list is
// empty. The empty-list guard mirrors dequeue so that peeking an empty list
// does not dereference a nil head.
func (il *itemList) peek() interface{} {
	if il.head == nil {
		return nil
	}
	return il.head.it
}

// dequeue removes and returns the first item, or nil if the list is empty.
func (il *itemList) dequeue() interface{} {
	if il.head == nil {
		return nil
	}
	i := il.head.it
	il.head = il.head.next
	if il.head == nil {
		// Removed the last node; keep tail consistent with head.
		il.tail = nil
	}
	return i
}

// dequeueAll empties the list and returns the former head node so the caller
// can walk the detached chain through next. It returns nil for an empty list.
func (il *itemList) dequeueAll() *itemNode {
	h := il.head
	il.head, il.tail = nil, nil
	return h
}

// isEmpty reports whether the list holds no items.
func (il *itemList) isEmpty() bool {
	return il.head == nil
}
// maxQueuedTransportResponseFrames is the number of queued items reporting
// isTransportResponseFrame() == true at which the control buffer installs its
// throttling channel; the channel is closed again when the item that brings
// the count back down from this value is dequeued.
const maxQueuedTransportResponseFrames = 50
// cbItem is the interface required of every item enqueued on the control
// buffer.
type cbItem interface {
// isTransportResponseFrame reports whether the item counts toward the
// control buffer's transportResponseFrames throttling counter.
isTransportResponseFrame() bool
}
// registerStream is a control item whose handler records a new outStream (in
// the empty state) for streamID in the writer's established-stream map.
type registerStream struct {
streamID uint32
// wq is copied into the created outStream.
wq *writeQuota
}
// Always false. Presumably this is the type's isTransportResponseFrame
// implementation (the method name is not visible here — confirm).
func (*registerStream) () bool { return false }
// headerFrame is a control item carrying a header block to be written for a
// stream. On the non-server path its handler also creates the outStream.
type headerFrame struct {
streamID uint32
// hf holds the header fields that are HPACK-encoded and written.
hf []hpack.HeaderField
// endStream is forwarded as the END_STREAM flag; on the server side it also
// selects the "close the stream" path in the header handler.
endStream bool
// initStream is called with the stream id before the headers are written
// when a stream is originated; a non-nil error aborts the write.
initStream func(uint32) error
// onWrite, if non-nil, is invoked just before the headers are encoded.
onWrite func()
// wq is copied into the outStream created on the non-server path.
wq *writeQuota
// cleanup is handed to the cleanup handler after end-of-stream headers are
// written on the server side.
cleanup *cleanupStream
// onOrphaned is called with errStreamDrain when the writer is draining and
// with ErrConnClosing when the control buffer is shut down with this frame
// still queued.
onOrphaned func(error)
}
// True only when a cleanup is attached and that cleanup will send a reset
// (cleanup.rst). Presumably the isTransportResponseFrame implementation (method
// name not visible here — confirm).
func ( *headerFrame) () bool {
return .cleanup != nil && .cleanup.rst
}
// cleanupStream is a control item whose handler removes a stream from the
// writer's bookkeeping and optionally sends RST_STREAM.
type cleanupStream struct {
streamID uint32
// rst requests that a RST_STREAM frame be written.
rst bool
// rstCode is the error code used for that RST_STREAM.
rstCode http2.ErrCode
// onWrite is invoked unconditionally at the start of the cleanup handler.
onWrite func()
}
// Mirrors rst: the item counts as a transport response frame only when it
// will emit a RST_STREAM. Presumably the isTransportResponseFrame
// implementation (method name not visible here — confirm).
func ( *cleanupStream) () bool { return .rst }
// earlyAbortStream is a control item whose handler (server side only) writes
// an end-of-stream header block describing a failure status.
type earlyAbortStream struct {
// httpStatus is sent as ":status"; the handler replaces 0 with 200.
httpStatus uint32
streamID uint32
// contentSubtype is fed to grpcutil.ContentType for "content-type".
contentSubtype string
// status supplies the "grpc-status" code and "grpc-message" text.
status *status.Status
// rst requests a trailing RST_STREAM with http2.ErrCodeNo.
rst bool
}
// Always false. Presumably the isTransportResponseFrame implementation (method
// name not visible here — confirm).
func (*earlyAbortStream) () bool { return false }
// dataFrame is a control item holding payload bytes to be written for a
// stream, possibly across several DATA frames.
type dataFrame struct {
streamID uint32
// endStream marks the last data of the stream; END_STREAM is set on the
// frame that carries the final remaining bytes.
endStream bool
// h is written before d; both are advanced (re-sliced) as bytes go out.
h []byte
d []byte
// onEachWrite, if non-nil, is invoked before every non-empty DATA frame
// written for this item.
onEachWrite func()
}
// Always false. Presumably the isTransportResponseFrame implementation (method
// name not visible here — confirm).
func (*dataFrame) () bool { return false }
// incomingWindowUpdate is a control item describing extra send window. Its
// handler adds increment to the connection send quota when streamID is 0, and
// otherwise subtracts it from that stream's outstanding byte count.
type incomingWindowUpdate struct {
streamID uint32
increment uint32
}
// Always false. Presumably the isTransportResponseFrame implementation (method
// name not visible here — confirm).
func (*incomingWindowUpdate) () bool { return false }
// outgoingWindowUpdate is a control item whose handler writes a WINDOW_UPDATE
// frame for streamID with the given increment.
type outgoingWindowUpdate struct {
streamID uint32
increment uint32
}
// Always false. Presumably the isTransportResponseFrame implementation (method
// name not visible here — confirm).
func (*outgoingWindowUpdate) () bool {
return false
}
// incomingSettings is a control item carrying settings to apply to the
// writer; its handler applies them and then writes a SETTINGS ack.
type incomingSettings struct {
ss []http2.Setting
}
// Always true, so these items count toward the control buffer's throttling
// counter. Presumably the isTransportResponseFrame implementation (method name
// not visible here — confirm).
func (*incomingSettings) () bool { return true }
// outgoingSettings is a control item whose handler writes a SETTINGS frame
// containing ss.
type outgoingSettings struct {
ss []http2.Setting
}
// Always false. Presumably the isTransportResponseFrame implementation (method
// name not visible here — confirm).
func (*outgoingSettings) () bool { return false }
// incomingGoAway is a field-less control item. On the client side its handler
// puts the writer into draining mode.
type incomingGoAway struct {
}
// Always false. Presumably the isTransportResponseFrame implementation (method
// name not visible here — confirm).
func (*incomingGoAway) () bool { return false }
// goAway is a control item that the writer forwards to its ssGoAwayHandler
// callback when one is installed. None of its fields are read by the code
// visible here; the callback is presumably what interprets them.
type goAway struct {
code http2.ErrCode
debugData []byte
headsUp bool
closeConn error
}
// Always false. Presumably the isTransportResponseFrame implementation (method
// name not visible here — confirm).
func (*goAway) () bool { return false }
// ping is a control item whose handler writes a PING frame with the given ack
// flag and payload. For non-ack pings the handler first passes data to the
// BDP estimator's timesnap.
type ping struct {
ack bool
data [8]byte
}
// Always true, so pings count toward the control buffer's throttling counter.
// Presumably the isTransportResponseFrame implementation (method name not
// visible here — confirm).
func (*ping) () bool { return true }
// outFlowControlSizeRequest is a control item used to query the writer: its
// handler sends the current connection send quota on resp.
type outFlowControlSizeRequest struct {
// resp receives the quota. The handler's send blocks until it is received
// unless the channel is buffered.
resp chan uint32
}
// Always false. Presumably the isTransportResponseFrame implementation (method
// name not visible here — confirm).
func (*outFlowControlSizeRequest) () bool { return false }
// closeConnection is an empty control item; its handler returns
// ErrConnClosing, which makes the writer's main loop exit. Unlike the other
// control items it is dispatched and implemented by value, not by pointer.
type closeConnection struct{}
// Always false. Presumably the isTransportResponseFrame implementation (method
// name not visible here — confirm).
func (closeConnection) () bool { return false }
// outStreamState is the scheduling state of an outStream inside the writer.
type outStreamState int
const (
// active: the stream has queued items and sits in the active-stream list.
active outStreamState = iota
// empty: the stream has no queued items.
empty
// waitingOnStreamQuota: the stream has data but no stream-level window
// left; it is re-activated by a window update or a larger initial window.
waitingOnStreamQuota
)
// outStream is the writer's per-stream send state. It also serves as a node
// of the doubly linked outStreamList through next/prev.
type outStream struct {
id uint32
state outStreamState
// itl holds the items queued for this stream.
itl *itemList
// bytesOutStanding grows by the size of every DATA frame written and
// shrinks by the increment of every incoming stream window update.
bytesOutStanding int
// wq is replenished with the size of each chunk written.
wq *writeQuota
next *outStream
prev *outStream
}
// Unlinks the receiver from the list it is in by joining its neighbours, then
// clears its own links. Calling it on an unlinked stream (both links nil) is a
// no-op. Callers in this file invoke it as deleteSelf.
func ( *outStream) () {
if .prev != nil {
.prev.next = .next
}
if .next != nil {
.next.prev = .prev
}
.next, .prev = nil, nil
}
// outStreamList is a doubly linked FIFO of outStreams. head and tail are
// placeholder nodes allocated by the constructor; real streams are always
// linked between them, which lets a stream unlink itself using only its own
// next/prev pointers.
type outStreamList struct {
head *outStream
tail *outStream
}
// Builds an empty list: two fresh placeholder nodes linked to each other.
// Presumably this is newOutStreamList, which the writer's constructor calls
// (function name not visible here — confirm).
func () *outStreamList {
, := new(outStream), new(outStream)
.next =
.prev =
return &outStreamList{
head: ,
tail: ,
}
}
// Appends the given stream at the back, i.e. immediately before the tail
// placeholder.
func ( *outStreamList) ( *outStream) {
:= .tail.prev
.next =
.prev =
.next = .tail
.tail.prev =
}
// Removes and returns the stream at the front, or nil when only the
// placeholders remain.
func ( *outStreamList) () *outStream {
:= .head.next
if == .tail {
return nil
}
.deleteSelf()
return
}
// controlBuffer is a mutex-protected queue of control items with a single
// blocking consumer, plus a throttle that engages when too many "transport
// response" items are queued.
type controlBuffer struct {
// ch (capacity 1) is used by producers to wake a consumer blocked in get.
ch chan struct{}
// done unblocks a blocked get and a blocked throttle wait.
done <-chan struct{}
// mu is held around accesses to consumerWaiting, list, err and
// transportResponseFrames in the methods below.
mu sync.Mutex
// consumerWaiting is set by get before it blocks and cleared by the
// producer that takes responsibility for waking it.
consumerWaiting bool
list *itemList
// err, once set, is returned by every subsequent put/execute/get.
err error
// transportResponseFrames counts queued items whose
// isTransportResponseFrame() is true.
transportResponseFrames int
// trfChan stores a chan struct{}: a live channel while throttling is in
// effect, a typed nil channel after it has been lifted.
trfChan atomic.Value
}
// Constructs a controlBuffer with an empty item list, a wake-up channel of
// capacity 1 and the supplied done channel. (Function name not visible here;
// presumably the controlBuffer constructor.)
func ( <-chan struct{}) *controlBuffer {
return &controlBuffer{
ch: make(chan struct{}, 1),
list: &itemList{},
done: ,
}
}
// Blocks while throttling is in effect: if trfChan currently holds a non-nil
// channel, waits until that channel is closed or done fires. Returns
// immediately otherwise. The comma-ok assertion makes an empty atomic.Value
// behave like "no channel".
func ( *controlBuffer) () {
, := .trfChan.Load().(chan struct{})
if != nil {
select {
case <-:
case <-.done:
}
}
}
// Enqueues the given item unconditionally by calling executeAndPut with a nil
// predicate, and returns only the error part of its result.
func ( *controlBuffer) ( cbItem) error {
, := .executeAndPut(nil, )
return
}
// Runs the optional predicate and, if it passes, enqueues the item, all under
// the buffer's mutex.
//
// Returns (false, err) if the buffer already has an error, (false, nil) if a
// non-nil predicate returns false (nothing is enqueued), and (true, nil) once
// the item has been enqueued.
func ( *controlBuffer) ( func( interface{}) bool, cbItem) (bool, error) {
var bool
.mu.Lock()
if .err != nil {
.mu.Unlock()
return false, .err
}
if != nil {
if !() {
.mu.Unlock()
return false, nil
}
}
// Remember whether a consumer is parked so it can be woken after unlocking.
if .consumerWaiting {
= true
.consumerWaiting = false
}
.list.enqueue()
if .isTransportResponseFrame() {
.transportResponseFrames++
// Reaching the limit installs the channel that the throttle wait blocks on.
if .transportResponseFrames == maxQueuedTransportResponseFrames {
.trfChan.Store(make(chan struct{}))
}
}
.mu.Unlock()
if {
// Non-blocking send: a token already sitting in ch is enough to wake the
// consumer.
select {
case .ch <- struct{}{}:
default:
}
}
return true, nil
}
// Runs the given predicate under the buffer's mutex without enqueueing
// anything.
//
// Returns (false, err) if the buffer already has an error, otherwise the
// predicate's result with a nil error. The predicate is called without a nil
// check, so it must not be nil.
func ( *controlBuffer) ( func( interface{}) bool, interface{}) (bool, error) {
.mu.Lock()
if .err != nil {
.mu.Unlock()
return false, .err
}
if !() {
.mu.Unlock()
return false, nil
}
.mu.Unlock()
return true, nil
}
// Removes and returns the next queued item.
//
// With a true argument it blocks until an item is available or done fires;
// with false it returns (nil, nil) immediately when the queue is empty.
// Returns the buffer's stored error if one is set, and a "transport closed by
// client" error when done fires while waiting.
func ( *controlBuffer) ( bool) (interface{}, error) {
for {
.mu.Lock()
if .err != nil {
.mu.Unlock()
return nil, .err
}
if !.list.isEmpty() {
:= .list.dequeue().(cbItem)
if .isTransportResponseFrame() {
// Dropping back below the limit lifts the throttle: close the channel
// so waiters proceed, then store a typed nil channel.
if .transportResponseFrames == maxQueuedTransportResponseFrames {
:= .trfChan.Load().(chan struct{})
close()
.trfChan.Store((chan struct{})(nil))
}
.transportResponseFrames--
}
.mu.Unlock()
return , nil
}
if ! {
.mu.Unlock()
return nil, nil
}
// Announce that the consumer is about to park so the next producer sends a
// wake-up token, then loop to re-check the queue once woken.
.consumerWaiting = true
.mu.Unlock()
select {
case <-.ch:
case <-.done:
return nil, errors.New("transport closed by client")
}
}
}
// Shuts the buffer down. It is a no-op if an error is already recorded;
// otherwise it records ErrConnClosing (so later put/execute/get calls fail),
// discards every queued item, and releases any throttle waiter.
func ( *controlBuffer) () {
.mu.Lock()
if .err != nil {
.mu.Unlock()
return
}
.err = ErrConnClosing
// Queued header frames will never be written; tell their owners via
// onOrphaned when that callback is set. All other item types are dropped.
for := .list.dequeueAll(); != nil; = .next {
, := .it.(*headerFrame)
if ! {
continue
}
if .onOrphaned != nil {
.onOrphaned(ErrConnClosing)
}
}
// Close a live throttling channel so a blocked throttle wait returns, then
// store a typed nil channel.
, := .trfChan.Load().(chan struct{})
if != nil {
close()
}
.trfChan.Store((chan struct{})(nil))
.mu.Unlock()
}
// side identifies which end of the connection a loopyWriter serves; several
// handlers branch on it.
type side int
const (
clientSide side = iota
serverSide
)
// loopyWriter consumes items from a controlBuffer and turns them into HTTP/2
// frames on its framer, tracking connection- and stream-level send quota.
type loopyWriter struct {
side side
cbuf *controlBuffer
// sendQuota is the connection-level quota: reduced by every DATA frame
// written, increased by stream-0 window updates.
sendQuota uint32
// oiws is the per-stream window limit that bytesOutStanding is compared
// against; it is overwritten by http2.SettingInitialWindowSize.
oiws uint32
// estdStreams maps stream id to the state of every stream the writer knows.
estdStreams map[uint32]*outStream
// activeStreams queues the streams that currently have data to send.
activeStreams *outStreamList
framer *framer
// hBuf is the buffer hEnc encodes into; it is reset before each header
// block is encoded.
hBuf *bytes.Buffer
hEnc *hpack.Encoder
bdpEst *bdpEstimator
// draining makes stream origination orphan the header frame instead, and
// makes cleanup return an error once no established streams remain.
draining bool
// ssGoAwayHandler, if non-nil, handles goAway items; its boolean result
// becomes the new value of draining.
ssGoAwayHandler func(*goAway) (bool, error)
}
// Constructs a loopyWriter for the given side, framer, control buffer and BDP
// estimator. Both sendQuota and oiws start at defaultWindowSize, the stream
// map and active list start empty, and the HPACK encoder is bound to the same
// buffer that hBuf points to. (Function name not visible here; presumably the
// loopyWriter constructor.)
func ( side, *framer, *controlBuffer, *bdpEstimator) *loopyWriter {
var bytes.Buffer
:= &loopyWriter{
side: ,
cbuf: ,
sendQuota: defaultWindowSize,
oiws: defaultWindowSize,
estdStreams: make(map[uint32]*outStream),
activeStreams: newOutStreamList(),
framer: ,
hBuf: &,
hEnc: hpack.NewEncoder(&),
bdpEst: ,
}
return
}
// minBatchSize is the threshold compared against framer.writer.offset in the
// writer's main loop: when idle with less than this buffered (presumably
// bytes — confirm), the loop yields once before flushing.
const minBatchSize = 1000
// Main loop of the writer. It returns only with a non-nil error: the first
// one produced by cbuf.get, handle or processData. The deferred Flush runs on
// every exit path.
//
// Each outer iteration:
//  1. blocks in cbuf.get(true) for a control item, handles it, and calls
//     processData once;
//  2. then loops without blocking: while cbuf.get(false) yields items they are
//     handled (each followed by one processData call); when none is queued,
//     processData is called and the loop continues for as long as it reports
//     that it was not idle (first result false);
//  3. when both are idle, on the first idle pass only, it yields with
//     runtime.Gosched and retries if framer.writer.offset is below
//     minBatchSize; otherwise it flushes the framer's writer and goes back to
//     the blocking get.
//
// NOTE(review): the loop label is missing in this listing (the lone ":" line
// and the bare continue/break statements); the description assumes they
// target the inner for loop — confirm.
func ( *loopyWriter) () ( error) {
defer .framer.writer.Flush()
for {
, := .cbuf.get(true)
if != nil {
return
}
if = .handle(); != nil {
return
}
if _, = .processData(); != nil {
return
}
// One-shot flag: allow a single yield per batch before flushing.
:= true
:
for {
, := .cbuf.get(false)
if != nil {
return
}
if != nil {
if = .handle(); != nil {
return
}
if _, = .processData(); != nil {
return
}
continue
}
, := .processData()
if != nil {
return
}
if ! {
continue
}
if {
= false
if .framer.writer.offset < minBatchSize {
runtime.Gosched()
continue
}
}
.framer.writer.Flush()
break
}
}
}
// Writes a WINDOW_UPDATE frame for the item's stream id and increment and
// returns the framer's error.
func ( *loopyWriter) ( *outgoingWindowUpdate) error {
return .framer.fr.WriteWindowUpdate(.streamID, .increment)
}
// Applies an incoming window update. Stream id 0 adds the increment to the
// connection-level sendQuota. For a known stream the increment is subtracted
// from bytesOutStanding and, if the stream was waitingOnStreamQuota and now
// has positive stream quota, it is marked active and queued. Updates for
// unknown streams are ignored. Always returns nil.
func ( *loopyWriter) ( *incomingWindowUpdate) error {
if .streamID == 0 {
.sendQuota += .increment
return nil
}
if , := .estdStreams[.streamID]; {
.bytesOutStanding -= int(.increment)
if := int(.oiws) - .bytesOutStanding; > 0 && .state == waitingOnStreamQuota {
.state = active
.activeStreams.enqueue()
return nil
}
}
return nil
}
// Writes a SETTINGS frame containing the item's settings and returns the
// framer's error.
func ( *loopyWriter) ( *outgoingSettings) error {
return .framer.fr.WriteSettings(.ss...)
}
// Applies the item's settings to the writer via applySettings and then writes
// a SETTINGS ack. An error from applySettings is returned without acking.
func ( *loopyWriter) ( *incomingSettings) error {
if := .applySettings(.ss); != nil {
return
}
return .framer.fr.WriteSettingsAck()
}
// Records a new outStream for the item's stream id in estdStreams, in the
// empty state with a fresh item list and the item's write quota. An existing
// entry with the same id is overwritten. Always returns nil.
func ( *loopyWriter) ( *registerStream) error {
:= &outStream{
id: .streamID,
state: empty,
itl: &itemList{},
wq: .wq,
}
.estdStreams[.streamID] =
return nil
}
// Handles a headerFrame.
//
// Server side: a frame for an unknown stream is dropped (with a warning at
// logLevel verbosity). Headers without endStream are written immediately.
// Headers with endStream are queued on the stream's item list when the stream
// is not in the empty state; the data-processing code writes them once they
// reach the front of that list. Otherwise they are written now, followed by
// the attached cleanup.
//
// Any other side: a new outStream is built from the frame and handed to
// originateStream.
func ( *loopyWriter) ( *headerFrame) error {
if .side == serverSide {
, := .estdStreams[.streamID]
if ! {
if logger.V(logLevel) {
logger.Warningf("transport: loopy doesn't recognize the stream: %d", .streamID)
}
return nil
}
if !.endStream {
return .writeHeader(.streamID, .endStream, .hf, .onWrite)
}
if .state != empty {
.itl.enqueue()
return nil
}
if := .writeHeader(.streamID, .endStream, .hf, .onWrite); != nil {
return
}
return .cleanupStreamHandler(.cleanup)
}
:= &outStream{
id: .streamID,
state: empty,
itl: &itemList{},
wq: .wq,
}
return .originateStream(, )
}
// Starts a new stream: runs the frame's initStream callback with the stream
// id, writes the headers, and only then records the stream in estdStreams.
// An error from either step is returned and the stream is not recorded.
//
// While draining, no stream is created; the frame's onOrphaned callback is
// called with errStreamDrain and nil is returned.
// NOTE(review): onOrphaned and initStream are called without nil checks here.
func ( *loopyWriter) ( *outStream, *headerFrame) error {
if .draining {
.onOrphaned(errStreamDrain)
return nil
}
if := .initStream(.id); != nil {
return
}
if := .writeHeader(.id, .endStream, .hf, .onWrite); != nil {
return
}
.estdStreams[.id] =
return nil
}
// HPACK-encodes the given header fields and writes them for the stream as one
// HEADERS frame followed by as many CONTINUATION frames as needed, each
// fragment being at most http2MaxFrameLen bytes.
//
// Parameters, in order: stream id, END_STREAM flag (applied to the HEADERS
// frame), header fields, and an optional callback invoked before encoding.
// Returns the first framer write error. Field-encoding errors are only logged
// (at logLevel verbosity) and do not stop the write.
func ( *loopyWriter) ( uint32, bool, []hpack.HeaderField, func()) error {
if != nil {
()
}
.hBuf.Reset()
for , := range {
if := .hEnc.WriteField(); != nil {
if logger.V(logLevel) {
logger.Warningf("transport: loopyWriter.writeHeader encountered error while encoding headers: %v", )
}
}
}
var (
error
, bool
)
= true
// Emit fragments until the one that drains hBuf; that fragment carries
// END_HEADERS. An empty header block still produces one HEADERS frame.
for ! {
:= .hBuf.Len()
if > http2MaxFrameLen {
= http2MaxFrameLen
} else {
= true
}
if {
= false
= .framer.fr.WriteHeaders(http2.HeadersFrameParam{
StreamID: ,
BlockFragment: .hBuf.Next(),
EndStream: ,
EndHeaders: ,
})
} else {
= .framer.fr.WriteContinuation(
,
,
.hBuf.Next(),
)
}
if != nil {
return
}
}
return nil
}
// Queues a dataFrame on its stream's item list; nothing is written here. If
// the stream was empty it becomes active and is added to the active-stream
// list. Data for an unknown stream is silently dropped. Always returns nil.
func ( *loopyWriter) ( *dataFrame) error {
, := .estdStreams[.streamID]
if ! {
return nil
}
.itl.enqueue()
if .state == empty {
.state = active
.activeStreams.enqueue()
}
return nil
}
// Writes a PING frame with the item's ack flag and payload. For non-ack pings
// the payload is first passed to the BDP estimator's timesnap.
func ( *loopyWriter) ( *ping) error {
if !.ack {
.bdpEst.timesnap(.data)
}
return .framer.fr.WritePing(.ack, .data)
}
// Sends the current connection-level sendQuota on the request's resp channel.
// The send is a plain blocking send. Always returns nil.
func ( *loopyWriter) ( *outFlowControlSizeRequest) error {
.resp <- .sendQuota
return nil
}
// Tears down a stream: invokes the item's onWrite, forgets the stream (if
// known) and unlinks it from the active list, and writes RST_STREAM with
// rstCode when rst is set.
//
// Returns the RST_STREAM write error, or, when the writer is draining and no
// established streams remain, an error that makes the main loop exit.
// NOTE(review): onWrite is called without a nil check.
func ( *loopyWriter) ( *cleanupStream) error {
.onWrite()
if , := .estdStreams[.streamID]; {
delete(.estdStreams, .streamID)
.deleteSelf()
}
if .rst {
if := .framer.fr.WriteRSTStream(.streamID, .rstCode); != nil {
return
}
}
if .draining && len(.estdStreams) == 0 {
return errors.New("finished processing active streams while in draining mode")
}
return nil
}
// Writes an end-of-stream header block (":status", "content-type",
// "grpc-status", "grpc-message") for the item's stream, then RST_STREAM with
// http2.ErrCodeNo when rst is set.
//
// Returns an error on the client side, where this item is not handled, or the
// first framer write error. An httpStatus of 0 is replaced with 200; this
// mutates the item.
func ( *loopyWriter) ( *earlyAbortStream) error {
if .side == clientSide {
return errors.New("earlyAbortStream not handled on client")
}
if .httpStatus == 0 {
.httpStatus = 200
}
:= []hpack.HeaderField{
{Name: ":status", Value: strconv.Itoa(int(.httpStatus))},
{Name: "content-type", Value: grpcutil.ContentType(.contentSubtype)},
{Name: "grpc-status", Value: strconv.Itoa(int(.status.Code()))},
{Name: "grpc-message", Value: encodeGrpcMessage(.status.Message())},
}
if := .writeHeader(.streamID, true, , nil); != nil {
return
}
if .rst {
if := .framer.fr.WriteRSTStream(.streamID, http2.ErrCodeNo); != nil {
return
}
}
return nil
}
// On the client side, switches the writer into draining mode; if no streams
// are established at that moment it returns an error, which makes the main
// loop exit. On the server side it does nothing and returns nil.
func ( *loopyWriter) (*incomingGoAway) error {
if .side == clientSide {
.draining = true
if len(.estdStreams) == 0 {
return errors.New("received GOAWAY with no active streams")
}
}
return nil
}
// Delegates a goAway item to ssGoAwayHandler when that callback is installed:
// its error is returned as-is, otherwise its boolean result becomes the new
// draining state. Without a callback the item is ignored.
func ( *loopyWriter) ( *goAway) error {
if .ssGoAwayHandler != nil {
, := .ssGoAwayHandler()
if != nil {
return
}
.draining =
}
return nil
}
// Always returns ErrConnClosing. Because the main loop returns on any handler
// error, handling a closeConnection item stops the writer.
func ( *loopyWriter) () error {
return ErrConnClosing
}
// Dispatches a control item to the handler for its concrete type and returns
// that handler's result. All item types are matched by pointer except
// closeConnection, which is matched by value. An unrecognized type yields an
// error naming the type.
func ( *loopyWriter) ( interface{}) error {
switch i := .(type) {
case *incomingWindowUpdate:
return .incomingWindowUpdateHandler()
case *outgoingWindowUpdate:
return .outgoingWindowUpdateHandler()
case *incomingSettings:
return .incomingSettingsHandler()
case *outgoingSettings:
return .outgoingSettingsHandler()
case *headerFrame:
return .headerHandler()
case *registerStream:
return .registerStreamHandler()
case *cleanupStream:
return .cleanupStreamHandler()
case *earlyAbortStream:
return .earlyAbortStreamHandler()
case *incomingGoAway:
return .incomingGoAwayHandler()
case *dataFrame:
// Data is only queued here; frames are written by processData.
return .preprocessData()
case *ping:
return .pingHandler()
case *goAway:
return .goAwayHandler()
case *outFlowControlSizeRequest:
return .outFlowControlSizeRequestHandler()
case closeConnection:
return .closeConnectionHandler()
default:
return fmt.Errorf("transport: unknown control message type %T", )
}
}
// Applies the settings this writer cares about; all other setting IDs are
// ignored. Always returns nil.
//
//   - SettingInitialWindowSize replaces oiws. If the window grew, every stream
//     that was waitingOnStreamQuota is made active and queued again.
//   - SettingHeaderTableSize is forwarded to updateHeaderTblSize together with
//     the writer's HPACK encoder.
func ( *loopyWriter) ( []http2.Setting) error {
for , := range {
switch .ID {
case http2.SettingInitialWindowSize:
:= .oiws
.oiws = .Val
if < .oiws {
for , := range .estdStreams {
if .state == waitingOnStreamQuota {
.state = active
.activeStreams.enqueue()
}
}
}
case http2.SettingHeaderTableSize:
updateHeaderTblSize(.hEnc, .Val)
}
}
return nil
}
func ( *loopyWriter) () (bool, error) {
if .sendQuota == 0 {
return true, nil
}
:= .activeStreams.dequeue()
if == nil {
return true, nil
}
:= .itl.peek().(*dataFrame)
if len(.h) == 0 && len(.d) == 0 {
if := .framer.fr.WriteData(.streamID, .endStream, nil); != nil {
return false,
}
.itl.dequeue()
if .itl.isEmpty() {
.state = empty
} else if , := .itl.peek().(*headerFrame); {
if := .writeHeader(.streamID, .endStream, .hf, .onWrite); != nil {
return false,
}
if := .cleanupStreamHandler(.cleanup); != nil {
return false, nil
}
} else {
.activeStreams.enqueue()
}
return false, nil
}
var (
[]byte
)
:= http2MaxFrameLen
if := int(.oiws) - .bytesOutStanding; <= 0 {
.state = waitingOnStreamQuota
return false, nil
} else if > {
=
}
if > int(.sendQuota) {
= int(.sendQuota)
}
:= min(, len(.h))
:= min(-, len(.d))
if != 0 {
if == 0 {
= .h
} else {
var [http2MaxFrameLen]byte
copy([:], .h)
copy([:], .d[:])
= [:+]
}
} else {
= .d
}
:= +
.wq.replenish()
var bool
if .endStream && len(.h)+len(.d) <= {
= true
}
if .onEachWrite != nil {
.onEachWrite()
}
if := .framer.fr.WriteData(.streamID, , [:]); != nil {
return false,
}
.bytesOutStanding +=
.sendQuota -= uint32()
.h = .h[:]
.d = .d[:]
if len(.h) == 0 && len(.d) == 0 {
.itl.dequeue()
}
if .itl.isEmpty() {
.state = empty
} else if , := .itl.peek().(*headerFrame); {
if := .writeHeader(.streamID, .endStream, .hf, .onWrite); != nil {
return false,
}
if := .cleanupStreamHandler(.cleanup); != nil {
return false,
}
} else if int(.oiws)-.bytesOutStanding <= 0 {
.state = waitingOnStreamQuota
} else {
.activeStreams.enqueue()
}
return false, nil
}
// min returns the smaller of a and b.
func min(a, b int) int {
	if a < b {
		return a
	}
	return b
}