package transport
import (
	"context"
	"fmt"
	"io"
	"math"
	"net"
	"net/http"
	"path/filepath"
	"strconv"
	"strings"
	"sync"
	"sync/atomic"
	"time"

	"golang.org/x/net/http2"
	"golang.org/x/net/http2/hpack"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/credentials"
	"google.golang.org/grpc/internal/channelz"
	icredentials "google.golang.org/grpc/internal/credentials"
	"google.golang.org/grpc/internal/grpcsync"
	"google.golang.org/grpc/internal/grpcutil"
	imetadata "google.golang.org/grpc/internal/metadata"
	istatus "google.golang.org/grpc/internal/status"
	"google.golang.org/grpc/internal/syscall"
	"google.golang.org/grpc/internal/transport/networktype"
	"google.golang.org/grpc/keepalive"
	"google.golang.org/grpc/metadata"
	"google.golang.org/grpc/peer"
	"google.golang.org/grpc/resolver"
	"google.golang.org/grpc/stats"
	"google.golang.org/grpc/status"
)
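// clientConnectionCounter counts the number of connections a client has
// initiated (equal to the number of http2Clients created). Must be accessed
// atomically.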
var clientConnectionCounter uint64
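// http2Client implements the ClientTransport interface with HTTP2.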
type http2Client struct {
lastRead int64
ctx context.Context
cancel context.CancelFunc
ctxDone <-chan struct{}
userAgent string
address resolver.Address
md metadata.MD
conn net.Conn
loopy *loopyWriter
remoteAddr net.Addr
localAddr net.Addr
authInfo credentials.AuthInfo
readerDone chan struct{}
writerDone chan struct{}
goAway chan struct{}
framer *framer
controlBuf *controlBuffer
fc *trInFlow
scheme string
isSecure bool
perRPCCreds []credentials.PerRPCCredentials
kp keepalive.ClientParameters
keepaliveEnabled bool
statsHandlers []stats.Handler
initialWindowSize int32
maxSendHeaderListSize *uint32
bdpEst *bdpEstimator
maxConcurrentStreams uint32
streamQuota int64
streamsQuotaAvailable chan struct{}
waitingStreams uint32
nextID uint32
registeredCompressors string
mu sync.Mutex
state transportState
activeStreams map[uint32]*Stream
prevGoAwayID uint32
goAwayReason GoAwayReason
goAwayDebugMessage string
kpDormancyCond *sync.Cond
kpDormant bool
channelzID *channelz.Identifier
czData *channelzData
onClose func(GoAwayReason)
bufferPool *bufferPool
connectionID uint64
}
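// dial connects to the address, using the custom dialer if one was provided,
// and otherwise falling back to a proxy-aware TCP dial.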
func dial(ctx context.Context, fn func(context.Context, string) (net.Conn, error), addr resolver.Address, useProxy bool, grpcUA string) (net.Conn, error) {
	address := addr.Addr
	networkType, ok := networktype.Get(addr)
	if fn != nil {
		// Special handling for unix scheme with custom dialer: the addresses
		// produced by the unix resolvers are prefixed back with "unix:" or
		// "unix://" so the custom dialer sees the target the user supplied.
		if networkType == "unix" && !strings.HasPrefix(address, "\x00") {
			if filepath.IsAbs(address) {
				return fn(ctx, "unix://"+address)
			}
			return fn(ctx, "unix:"+address)
		}
		return fn(ctx, address)
	}
	if !ok {
		networkType, address = parseDialTarget(address)
	}
	if networkType == "tcp" && useProxy {
		return proxyDial(ctx, address, grpcUA)
	}
	return (&net.Dialer{}).DialContext(ctx, networkType, address)
}
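// isTemporary reports whether a dial error should be treated as temporary:
// errors implementing Temporary() or Timeout() are consulted; anything else
// is assumed retryable.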
func isTemporary(err error) bool {
	switch err := err.(type) {
	case interface {
		Temporary() bool
	}:
		return err.Temporary()
	case interface {
		Timeout() bool
	}:
		// Timeouts may be resolved upon retry, and are thus treated as
		// temporary.
		return err.Timeout()
	}
	return true
}
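// newHTTP2Client constructs a connected ClientTransport to addr based on HTTP2
// options and starts to receive messages on it. A non-nil error is returned if
// construction fails.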
func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts ConnectOptions, onClose func(GoAwayReason)) (_ *http2Client, err error) {
	scheme := "http"
	ctx, cancel := context.WithCancel(ctx)
	defer func() {
		if err != nil {
			cancel()
		}
	}()

	// Pass address attributes to the dialer and credential handshaker via
	// connectCtx so address-specific data can reach them.
	connectCtx = icredentials.NewClientHandshakeInfoContext(connectCtx, credentials.ClientHandshakeInfo{Attributes: addr.Attributes})

	conn, err := dial(connectCtx, opts.Dialer, addr, opts.UseProxy, opts.UserAgent)
	if err != nil {
		if opts.FailOnNonTempDialError {
			return nil, connectionErrorf(isTemporary(err), err, "transport: error while dialing: %v", err)
		}
		return nil, connectionErrorf(true, err, "transport: Error while dialing: %v", err)
	}

	// Any further errors will close the underlying connection.
	defer func(conn net.Conn) {
		if err != nil {
			conn.Close()
		}
	}(conn)

	// Monitor connectCtx for cancelation and deadline: on expiration the
	// connection is hard closed and this function fails as a result. The
	// defer waits for the goroutine to exit so the context is no longer
	// monitored (and the connection can no longer be closed) after returning.
	ctxMonitorDone := grpcsync.NewEvent()
	newClientCtx, newClientDone := context.WithCancel(connectCtx)
	defer func() {
		newClientDone()         // Awaken the goroutine below if connectCtx hasn't expired.
		<-ctxMonitorDone.Done() // Wait for the goroutine below to exit.
	}()
	go func(conn net.Conn) {
		defer ctxMonitorDone.Fire() // Signal this goroutine has exited.
		<-newClientCtx.Done()       // Block until connectCtx expires or the defer above executes.
		if err := connectCtx.Err(); err != nil {
			// connectCtx expired before exiting the function. Hard close the connection.
			if logger.V(logLevel) {
				logger.Infof("newClientTransport: aborting due to connectCtx: %v", err)
			}
			conn.Close()
		}
	}(conn)

	kp := opts.KeepaliveParams
	// Validate keepalive parameters.
	if kp.Time == 0 {
		kp.Time = defaultClientKeepaliveTime
	}
	if kp.Timeout == 0 {
		kp.Timeout = defaultClientKeepaliveTimeout
	}
	keepaliveEnabled := false
	if kp.Time != infinity {
		if err = syscall.SetTCPUserTimeout(conn, kp.Timeout); err != nil {
			return nil, connectionErrorf(false, err, "transport: failed to set TCP_USER_TIMEOUT: %v", err)
		}
		keepaliveEnabled = true
	}
	var (
		isSecure bool
		authInfo credentials.AuthInfo
	)
	transportCreds := opts.TransportCredentials
	perRPCCreds := opts.PerRPCCredentials

	if b := opts.CredsBundle; b != nil {
		if t := b.TransportCredentials(); t != nil {
			transportCreds = t
		}
		if t := b.PerRPCCredentials(); t != nil {
			perRPCCreds = append(perRPCCreds, t)
		}
	}
	if transportCreds != nil {
		conn, authInfo, err = transportCreds.ClientHandshake(connectCtx, addr.ServerName, conn)
		if err != nil {
			return nil, connectionErrorf(isTemporary(err), err, "transport: authentication handshake failed: %v", err)
		}
		for _, cd := range perRPCCreds {
			if cd.RequireTransportSecurity() {
				if ci, ok := authInfo.(interface {
					GetCommonAuthInfo() credentials.CommonAuthInfo
				}); ok {
					secLevel := ci.GetCommonAuthInfo().SecurityLevel
					if secLevel != credentials.InvalidSecurityLevel && secLevel < credentials.PrivacyAndIntegrity {
						return nil, connectionErrorf(true, nil, "transport: cannot send secure credentials on an insecure connection")
					}
				}
			}
		}
		isSecure = true
		if transportCreds.Info().SecurityProtocol == "tls" {
			scheme = "https"
		}
	}
	dynamicWindow := true
	icwz := int32(initialWindowSize)
	if opts.InitialConnWindowSize >= defaultWindowSize {
		icwz = opts.InitialConnWindowSize
		dynamicWindow = false
	}
	writeBufSize := opts.WriteBufferSize
	readBufSize := opts.ReadBufferSize
	maxHeaderListSize := defaultClientMaxHeaderListSize
	if opts.MaxHeaderListSize != nil {
		maxHeaderListSize = *opts.MaxHeaderListSize
	}
	t := &http2Client{
		ctx:                   ctx,
		ctxDone:               ctx.Done(), // Cache Done chan.
		cancel:                cancel,
		userAgent:             opts.UserAgent,
		registeredCompressors: grpcutil.RegisteredCompressors(),
		address:               addr,
		conn:                  conn,
		remoteAddr:            conn.RemoteAddr(),
		localAddr:             conn.LocalAddr(),
		authInfo:              authInfo,
		readerDone:            make(chan struct{}),
		writerDone:            make(chan struct{}),
		goAway:                make(chan struct{}),
		framer:                newFramer(conn, writeBufSize, readBufSize, maxHeaderListSize),
		fc:                    &trInFlow{limit: uint32(icwz)},
		scheme:                scheme,
		activeStreams:         make(map[uint32]*Stream),
		isSecure:              isSecure,
		perRPCCreds:           perRPCCreds,
		kp:                    kp,
		statsHandlers:         opts.StatsHandlers,
		initialWindowSize:     initialWindowSize,
		nextID:                1,
		maxConcurrentStreams:  defaultMaxStreamsClient,
		streamQuota:           defaultMaxStreamsClient,
		streamsQuotaAvailable: make(chan struct{}, 1),
		czData:                new(channelzData),
		keepaliveEnabled:      keepaliveEnabled,
		bufferPool:            newBufferPool(),
		onClose:               onClose,
	}
	// Add peer information to the http2client context.
	t.ctx = peer.NewContext(t.ctx, t.getPeer())

	if md, ok := addr.Metadata.(*metadata.MD); ok {
		t.md = *md
	} else if md := imetadata.Get(addr); md != nil {
		t.md = md
	}
	t.controlBuf = newControlBuffer(t.ctxDone)
	if opts.InitialWindowSize >= defaultWindowSize {
		t.initialWindowSize = opts.InitialWindowSize
		dynamicWindow = false
	}
	if dynamicWindow {
		t.bdpEst = &bdpEstimator{
			bdp:               initialWindowSize,
			updateFlowControl: t.updateFlowControl,
		}
	}
	for _, sh := range t.statsHandlers {
		t.ctx = sh.TagConn(t.ctx, &stats.ConnTagInfo{
			RemoteAddr: t.remoteAddr,
			LocalAddr:  t.localAddr,
		})
		connBegin := &stats.ConnBegin{
			Client: true,
		}
		sh.HandleConn(t.ctx, connBegin)
	}
	t.channelzID, err = channelz.RegisterNormalSocket(t, opts.ChannelzParentID, fmt.Sprintf("%s -> %s", t.localAddr, t.remoteAddr))
	if err != nil {
		return nil, err
	}
	if t.keepaliveEnabled {
		t.kpDormancyCond = sync.NewCond(&t.mu)
		go t.keepalive()
	}

	// Start the reader goroutine for incoming messages. Each transport has a
	// dedicated goroutine which reads HTTP2 frames from the network. When the
	// server preface is received, readerErrCh is closed; if an error occurs
	// first, it is pushed to the channel. This must be checked before
	// returning from this function.
	readerErrCh := make(chan error, 1)
	go t.reader(readerErrCh)
	defer func() {
		if err == nil {
			err = <-readerErrCh
		}
		if err != nil {
			t.Close(err)
		}
	}()

	// Send connection preface to server.
	n, err := t.conn.Write(clientPreface)
	if err != nil {
		err = connectionErrorf(true, err, "transport: failed to write client preface: %v", err)
		return nil, err
	}
	if n != len(clientPreface) {
		err = connectionErrorf(true, nil, "transport: preface mismatch, wrote %d bytes; want %d", n, len(clientPreface))
		return nil, err
	}
	var ss []http2.Setting

	if t.initialWindowSize != defaultWindowSize {
		ss = append(ss, http2.Setting{
			ID:  http2.SettingInitialWindowSize,
			Val: uint32(t.initialWindowSize),
		})
	}
	if opts.MaxHeaderListSize != nil {
		ss = append(ss, http2.Setting{
			ID:  http2.SettingMaxHeaderListSize,
			Val: *opts.MaxHeaderListSize,
		})
	}
	err = t.framer.fr.WriteSettings(ss...)
	if err != nil {
		err = connectionErrorf(true, err, "transport: failed to write initial settings frame: %v", err)
		return nil, err
	}
	// Adjust the connection flow control window if needed.
	if delta := uint32(icwz - defaultWindowSize); delta > 0 {
		if err := t.framer.fr.WriteWindowUpdate(0, delta); err != nil {
			err = connectionErrorf(true, err, "transport: failed to write window update: %v", err)
			return nil, err
		}
	}

	t.connectionID = atomic.AddUint64(&clientConnectionCounter, 1)

	if err := t.framer.writer.Flush(); err != nil {
		return nil, err
	}
	go func() {
		t.loopy = newLoopyWriter(clientSide, t.framer, t.controlBuf, t.bdpEst)
		err := t.loopy.run()
		if logger.V(logLevel) {
			logger.Infof("transport: loopyWriter exited. Closing connection. Err: %v", err)
		}
		// Do not close the transport here; let the reader goroutine handle it
		// since there might still be data in the buffers.
		t.conn.Close()
		t.controlBuf.finish()
		close(t.writerDone)
	}()
	return t, nil
}
func (t *http2Client) newStream(ctx context.Context, callHdr *CallHdr) *Stream {
	s := &Stream{
		ct:             t,
		done:           make(chan struct{}),
		method:         callHdr.Method,
		sendCompress:   callHdr.SendCompress,
		buf:            newRecvBuffer(),
		headerChan:     make(chan struct{}),
		contentSubtype: callHdr.ContentSubtype,
		doneFunc:       callHdr.DoneFunc,
	}
	s.wq = newWriteQuota(defaultWriteQuota, s.done)
	s.requestRead = func(n int) {
		t.adjustWindow(s, uint32(n))
	}
	// The client-side stream context has exactly the same life cycle as the
	// user-provided context, so the original context is used directly.
	s.ctx = ctx
	s.trReader = &transportReader{
		reader: &recvBufferReader{
			ctx:     s.ctx,
			ctxDone: s.ctx.Done(),
			recv:    s.buf,
			closeStream: func(err error) {
				t.CloseStream(s, err)
			},
			freeBuffer: t.bufferPool.put,
		},
		windowHandler: func(n int) {
			t.updateWindow(s, uint32(n))
		},
	}
	return s
}
func (t *http2Client) getPeer() *peer.Peer {
	return &peer.Peer{
		Addr:     t.remoteAddr,
		AuthInfo: t.authInfo,
	}
}
func (t *http2Client) createHeaderFields(ctx context.Context, callHdr *CallHdr) ([]hpack.HeaderField, error) {
	aud := t.createAudience(callHdr)
	ri := credentials.RequestInfo{
		Method:   callHdr.Method,
		AuthInfo: t.authInfo,
	}
	ctxWithRequestInfo := icredentials.NewRequestInfoContext(ctx, ri)
	authData, err := t.getTrAuthData(ctxWithRequestInfo, aud)
	if err != nil {
		return nil, err
	}
	callAuthData, err := t.getCallAuthData(ctxWithRequestInfo, aud, callHdr)
	if err != nil {
		return nil, err
	}
	// Make the slice of a predictable size to reduce allocations made by append.
	hfLen := 7 // :method, :scheme, :path, :authority, content-type, user-agent, te
	hfLen += len(authData) + len(callAuthData)
	headerFields := make([]hpack.HeaderField, 0, hfLen)
	headerFields = append(headerFields, hpack.HeaderField{Name: ":method", Value: "POST"})
	headerFields = append(headerFields, hpack.HeaderField{Name: ":scheme", Value: t.scheme})
	headerFields = append(headerFields, hpack.HeaderField{Name: ":path", Value: callHdr.Method})
	headerFields = append(headerFields, hpack.HeaderField{Name: ":authority", Value: callHdr.Host})
	headerFields = append(headerFields, hpack.HeaderField{Name: "content-type", Value: grpcutil.ContentType(callHdr.ContentSubtype)})
	headerFields = append(headerFields, hpack.HeaderField{Name: "user-agent", Value: t.userAgent})
	headerFields = append(headerFields, hpack.HeaderField{Name: "te", Value: "trailers"})
	if callHdr.PreviousAttempts > 0 {
		headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-previous-rpc-attempts", Value: strconv.Itoa(callHdr.PreviousAttempts)})
	}

	registeredCompressors := t.registeredCompressors
	if callHdr.SendCompress != "" {
		headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-encoding", Value: callHdr.SendCompress})
		// Include the outgoing compressor name when the compressor is not
		// registered via encoding.RegisterCompressor (e.g. WithCompressor).
		if !grpcutil.IsCompressorNameRegistered(callHdr.SendCompress) {
			if registeredCompressors != "" {
				registeredCompressors += ","
			}
			registeredCompressors += callHdr.SendCompress
		}
	}
	if registeredCompressors != "" {
		headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-accept-encoding", Value: registeredCompressors})
	}
	if dl, ok := ctx.Deadline(); ok {
		// Send out timeout regardless of its value. The server can detect a
		// timeout context by itself.
		timeout := time.Until(dl)
		headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-timeout", Value: grpcutil.EncodeDuration(timeout)})
	}
	for k, v := range authData {
		headerFields = append(headerFields, hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)})
	}
	for k, v := range callAuthData {
		headerFields = append(headerFields, hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)})
	}
	if b := stats.OutgoingTags(ctx); b != nil {
		headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-tags-bin", Value: encodeBinHeader(b)})
	}
	if b := stats.OutgoingTrace(ctx); b != nil {
		headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-trace-bin", Value: encodeBinHeader(b)})
	}

	if md, added, ok := metadata.FromOutgoingContextRaw(ctx); ok {
		var k string
		for k, vv := range md {
			// HTTP doesn't allow setting pseudo headers after non-pseudo headers.
			if isReservedHeader(k) {
				continue
			}
			for _, v := range vv {
				headerFields = append(headerFields, hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)})
			}
		}
		for _, vv := range added {
			for i, v := range vv {
				if i%2 == 0 {
					k = strings.ToLower(v)
					continue
				}
				if isReservedHeader(k) {
					continue
				}
				headerFields = append(headerFields, hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)})
			}
		}
	}
	for k, vv := range t.md {
		if isReservedHeader(k) {
			continue
		}
		for _, v := range vv {
			headerFields = append(headerFields, hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)})
		}
	}
	return headerFields, nil
}
func (t *http2Client) createAudience(callHdr *CallHdr) string {
	// Create an audience string only if needed.
	if len(t.perRPCCreds) == 0 && callHdr.Creds == nil {
		return ""
	}
	// Construct URI required to get auth request metadata.
	// Omit port if it is the default one.
	host := strings.TrimSuffix(callHdr.Host, ":443")
	pos := strings.LastIndex(callHdr.Method, "/")
	if pos == -1 {
		pos = len(callHdr.Method)
	}
	return "https://" + host + callHdr.Method[:pos]
}
func (t *http2Client) getTrAuthData(ctx context.Context, audience string) (map[string]string, error) {
	if len(t.perRPCCreds) == 0 {
		return nil, nil
	}
	authData := map[string]string{}
	for _, c := range t.perRPCCreds {
		data, err := c.GetRequestMetadata(ctx, audience)
		if err != nil {
			if st, ok := status.FromError(err); ok {
				// Restrict the code to the list allowed by gRFC A54.
				if istatus.IsRestrictedControlPlaneCode(st) {
					err = status.Errorf(codes.Internal, "transport: received per-RPC creds error with illegal status: %v", err)
				}
				return nil, err
			}
			return nil, status.Errorf(codes.Unauthenticated, "transport: per-RPC creds failed due to error: %v", err)
		}
		for k, v := range data {
			// Capital header names are illegal in HTTP/2.
			k = strings.ToLower(k)
			authData[k] = v
		}
	}
	return authData, nil
}
func (t *http2Client) getCallAuthData(ctx context.Context, audience string, callHdr *CallHdr) (map[string]string, error) {
	var callAuthData map[string]string
	// Check if credentials.PerRPCCredentials were provided via call options.
	// Note: if these credentials are provided both via dial options and call
	// options, then both sets of credentials are applied.
	if callCreds := callHdr.Creds; callCreds != nil {
		if callCreds.RequireTransportSecurity() {
			ri, _ := credentials.RequestInfoFromContext(ctx)
			if !t.isSecure || credentials.CheckSecurityLevel(ri.AuthInfo, credentials.PrivacyAndIntegrity) != nil {
				return nil, status.Error(codes.Unauthenticated, "transport: cannot send secure credentials on an insecure connection")
			}
		}
		data, err := callCreds.GetRequestMetadata(ctx, audience)
		if err != nil {
			if st, ok := status.FromError(err); ok {
				// Restrict the code to the list allowed by gRFC A54.
				if istatus.IsRestrictedControlPlaneCode(st) {
					err = status.Errorf(codes.Internal, "transport: received per-RPC creds error with illegal status: %v", err)
				}
				return nil, err
			}
			return nil, status.Errorf(codes.Internal, "transport: per-RPC creds failed due to error: %v", err)
		}
		callAuthData = make(map[string]string, len(data))
		for k, v := range data {
			// Capital header names are illegal in HTTP/2.
			k = strings.ToLower(k)
			callAuthData[k] = v
		}
	}
	return callAuthData, nil
}
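// NewStreamError wraps an error and reports whether or not the RPC is safe to
// retry transparently on a new transport.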
type NewStreamError struct {
Err error
AllowTransparentRetry bool
}
func (e NewStreamError) Error() string {
	return e.Err.Error()
}
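// NewStream creates a stream and registers it into the transport as an
// "active" stream. All non-nil errors returned will be *NewStreamError.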
func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (*Stream, error) {
	ctx = peer.NewContext(ctx, t.getPeer())

	// The ServerName field of the resolver-returned address takes precedence
	// over the Host field of CallHdr when determining the :authority header,
	// because ServerName is what is used for server authentication during the
	// TLS handshake and the :authority header should match it.
	if t.address.ServerName != "" {
		newCallHdr := *callHdr
		newCallHdr.Host = t.address.ServerName
		callHdr = &newCallHdr
	}

	headerFields, err := t.createHeaderFields(ctx, callHdr)
	if err != nil {
		return nil, &NewStreamError{Err: err, AllowTransparentRetry: false}
	}
	s := t.newStream(ctx, callHdr)
	cleanup := func(err error) {
		if s.swapState(streamDone) == streamDone {
			// If it was already done, return.
			return
		}
		// The stream was unprocessed by the server.
		atomic.StoreUint32(&s.unprocessed, 1)
		s.write(recvMsg{err: err})
		close(s.done)
		// If headerChan isn't closed, then close it.
		if atomic.CompareAndSwapUint32(&s.headerChanClosed, 0, 1) {
			close(s.headerChan)
		}
	}
	hdr := &headerFrame{
		hf:        headerFields,
		endStream: false,
		initStream: func(id uint32) error {
			t.mu.Lock()
			// initStream is never called when the transport is draining.
			if t.state == closing {
				t.mu.Unlock()
				cleanup(ErrConnClosing)
				return ErrConnClosing
			}
			if channelz.IsOn() {
				atomic.AddInt64(&t.czData.streamsStarted, 1)
				atomic.StoreInt64(&t.czData.lastStreamCreatedTime, time.Now().UnixNano())
			}
			// If the keepalive goroutine has gone dormant, wake it up.
			if t.kpDormant {
				t.kpDormancyCond.Signal()
			}
			t.mu.Unlock()
			return nil
		},
		onOrphaned: cleanup,
		wq:         s.wq,
	}
	firstTry := true
	var ch chan struct{}
	transportDrainRequired := false
	checkForStreamQuota := func(it interface{}) bool {
		if t.streamQuota <= 0 { // Can go negative if server decreases it.
			if firstTry {
				t.waitingStreams++
			}
			ch = t.streamsQuotaAvailable
			return false
		}
		if !firstTry {
			t.waitingStreams--
		}
		t.streamQuota--
		h := it.(*headerFrame)
		h.streamID = t.nextID
		t.nextID += 2
		// Drain the client transport if nextID > MaxStreamID, which signals
		// that the connection is closed and a new one must be created for
		// subsequent RPCs.
		transportDrainRequired = t.nextID > MaxStreamID

		s.id = h.streamID
		s.fc = &inFlow{limit: uint32(t.initialWindowSize)}
		t.mu.Lock()
		if t.activeStreams == nil { // Can be niled from Close().
			t.mu.Unlock()
			return false // Don't create a stream if the transport is already closed.
		}
		t.activeStreams[s.id] = s
		t.mu.Unlock()
		if t.streamQuota > 0 && t.waitingStreams > 0 {
			select {
			case t.streamsQuotaAvailable <- struct{}{}:
			default:
			}
		}
		return true
	}
	var hdrListSizeErr error
	checkForHeaderListSize := func(it interface{}) bool {
		if t.maxSendHeaderListSize == nil {
			return true
		}
		hdrFrame := it.(*headerFrame)
		var sz int64
		for _, f := range hdrFrame.hf {
			if sz += int64(f.Size()); sz > int64(*t.maxSendHeaderListSize) {
				hdrListSizeErr = status.Errorf(codes.Internal, "header list size to send violates the maximum size (%d bytes) set by server", *t.maxSendHeaderListSize)
				return false
			}
		}
		return true
	}
	for {
		success, err := t.controlBuf.executeAndPut(func(it interface{}) bool {
			return checkForHeaderListSize(it) && checkForStreamQuota(it)
		}, hdr)
		if err != nil {
			// Connection closed.
			return nil, &NewStreamError{Err: err, AllowTransparentRetry: true}
		}
		if success {
			break
		}
		if hdrListSizeErr != nil {
			return nil, &NewStreamError{Err: hdrListSizeErr}
		}
		firstTry = false
		select {
		case <-ch:
		case <-ctx.Done():
			return nil, &NewStreamError{Err: ContextErr(ctx.Err())}
		case <-t.goAway:
			return nil, &NewStreamError{Err: errStreamDrain, AllowTransparentRetry: true}
		case <-t.ctx.Done():
			return nil, &NewStreamError{Err: ErrConnClosing, AllowTransparentRetry: true}
		}
	}
	if len(t.statsHandlers) != 0 {
		header, ok := metadata.FromOutgoingContext(ctx)
		if ok {
			header.Set("user-agent", t.userAgent)
		} else {
			header = metadata.Pairs("user-agent", t.userAgent)
		}
		for _, sh := range t.statsHandlers {
			// Note: the header fields are compressed with hpack after this
			// call returns, so no WireLength field is set here.
			outHeader := &stats.OutHeader{
				Client:      true,
				FullMethod:  callHdr.Method,
				RemoteAddr:  t.remoteAddr,
				LocalAddr:   t.localAddr,
				Compression: callHdr.SendCompress,
				Header:      header,
			}
			sh.HandleRPC(s.ctx, outHeader)
		}
	}
	if transportDrainRequired {
		if logger.V(logLevel) {
			logger.Infof("transport: t.nextID > MaxStreamID. Draining")
		}
		t.GracefulClose()
	}
	return s, nil
}
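// CloseStream clears the footprint of a stream when the stream is no longer
// needed, sending an RST_STREAM if err is non-nil.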
func (t *http2Client) CloseStream(s *Stream, err error) {
	var (
		rst     bool
		rstCode http2.ErrCode
	)
	if err != nil {
		rst = true
		rstCode = http2.ErrCodeCancel
	}
	t.closeStream(s, err, rst, rstCode, status.Convert(err), nil, false)
}
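// closeStream marks the stream done, delivers its final status and trailers,
// removes it from the active streams map, and returns its quota to the
// transport.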
func (t *http2Client) closeStream(s *Stream, err error, rst bool, rstCode http2.ErrCode, st *status.Status, mdata map[string][]string, eosReceived bool) {
	// Set stream status to done.
	if s.swapState(streamDone) == streamDone {
		// If it was already done, wait for the first closeStream call to
		// finish and return.
		<-s.done
		return
	}
	// status and trailers can be updated here without any synchronization
	// because the stream goroutine only reads them after it sees an io.EOF
	// error from read or write, and those errors are written only after
	// updating these fields.
	s.status = st
	if len(mdata) > 0 {
		s.trailer = mdata
	}
	if err != nil {
		// This will unblock reads eventually.
		s.write(recvMsg{err: err})
	}
	// If headerChan isn't closed, then close it.
	if atomic.CompareAndSwapUint32(&s.headerChanClosed, 0, 1) {
		s.noHeaders = true
		close(s.headerChan)
	}
	cleanup := &cleanupStream{
		streamID: s.id,
		onWrite: func() {
			t.mu.Lock()
			if t.activeStreams != nil {
				delete(t.activeStreams, s.id)
			}
			t.mu.Unlock()
			if channelz.IsOn() {
				if eosReceived {
					atomic.AddInt64(&t.czData.streamsSucceeded, 1)
				} else {
					atomic.AddInt64(&t.czData.streamsFailed, 1)
				}
			}
		},
		rst:     rst,
		rstCode: rstCode,
	}
	addBackStreamQuota := func(interface{}) bool {
		t.streamQuota++
		if t.streamQuota > 0 && t.waitingStreams > 0 {
			select {
			case t.streamsQuotaAvailable <- struct{}{}:
			default:
			}
		}
		return true
	}
	t.controlBuf.executeAndPut(addBackStreamQuota, cleanup)
	// This will unblock write.
	close(s.done)
	if s.doneFunc != nil {
		s.doneFunc()
	}
}
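// Close kills the transport and all streams on it. The supplied error is
// reported to all pending RPCs on those streams.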
func (t *http2Client) Close(err error) {
	t.mu.Lock()
	// Make sure we only close once.
	if t.state == closing {
		t.mu.Unlock()
		return
	}
	if logger.V(logLevel) {
		logger.Infof("transport: closing: %v", err)
	}
	// Call t.onClose ASAP to prevent the client from attempting to create new
	// streams.
	if t.state != draining {
		t.onClose(GoAwayInvalid)
	}
	t.state = closing
	streams := t.activeStreams
	t.activeStreams = nil
	if t.kpDormant {
		// If the keepalive goroutine is blocked on this condition variable,
		// unblock it so it eventually exits.
		t.kpDormancyCond.Signal()
	}
	t.mu.Unlock()
	t.controlBuf.finish()
	t.cancel()
	t.conn.Close()
	channelz.RemoveEntry(t.channelzID)
	// Append info about previous goaways if there were any, since this may be
	// important for understanding the root cause of the connection closure.
	_, goAwayDebugMessage := t.GetGoAwayReason()

	var st *status.Status
	if len(goAwayDebugMessage) > 0 {
		st = status.Newf(codes.Unavailable, "closing transport due to: %v, received prior goaway: %v", err, goAwayDebugMessage)
		err = st.Err()
	} else {
		st = status.New(codes.Unavailable, err.Error())
	}

	// Notify all active streams.
	for _, s := range streams {
		t.closeStream(s, err, false, http2.ErrCodeNo, st, nil, false)
	}
	for _, sh := range t.statsHandlers {
		connEnd := &stats.ConnEnd{
			Client: true,
		}
		sh.HandleConn(t.ctx, connEnd)
	}
}
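// GracefulClose sets the state to draining, which prevents new streams from
// being created and causes the transport to be closed when the last active
// stream is closed. If there are no active streams, the transport is closed
// immediately. This does nothing if the transport is already draining or
// closing.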
func (t *http2Client) GracefulClose() {
	t.mu.Lock()
	// Make sure we move to draining only from active.
	if t.state == draining || t.state == closing {
		t.mu.Unlock()
		return
	}
	if logger.V(logLevel) {
		logger.Infof("transport: GracefulClose called")
	}
	t.onClose(GoAwayInvalid)
	t.state = draining
	active := len(t.activeStreams)
	t.mu.Unlock()
	if active == 0 {
		t.Close(connectionErrorf(true, nil, "no active streams left to process while draining"))
		return
	}
	t.controlBuf.put(&incomingGoAway{})
}
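// Write formats the data into HTTP2 data frame(s) and sends it out. The caller
// should proceed only if Write returns nil.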
func (t *http2Client) Write(s *Stream, hdr []byte, data []byte, opts *Options) error {
	if opts.Last {
		// If it's the last message, update stream state.
		if !s.compareAndSwapState(streamActive, streamWriteDone) {
			return errStreamDone
		}
	} else if s.getState() != streamActive {
		return errStreamDone
	}
	df := &dataFrame{
		streamID:  s.id,
		endStream: opts.Last,
		h:         hdr,
		d:         data,
	}
	if hdr != nil || data != nil { // If it's not an empty data frame, check quota.
		if err := s.wq.get(int32(len(hdr) + len(data))); err != nil {
			return err
		}
	}
	return t.controlBuf.put(df)
}
func (t *http2Client) getStream(f http2.Frame) *Stream {
	t.mu.Lock()
	s := t.activeStreams[f.Header().StreamID]
	t.mu.Unlock()
	return s
}
// adjustWindow sends out an extra window update over the initial window size
// of the stream if the application is requesting data larger in size than
// the window.
func (t *http2Client) adjustWindow(s *Stream, n uint32) {
	if w := s.fc.maybeAdjust(n); w > 0 {
		t.controlBuf.put(&outgoingWindowUpdate{streamID: s.id, increment: w})
	}
}
// updateWindow adjusts the inbound quota for the stream. Window updates are
// sent out when the cumulative quota exceeds the corresponding threshold.
func (t *http2Client) updateWindow(s *Stream, n uint32) {
	if w := s.fc.onRead(n); w > 0 {
		t.controlBuf.put(&outgoingWindowUpdate{streamID: s.id, increment: w})
	}
}
// updateFlowControl updates the incoming flow control windows for the
// transport and the streams based on the current bdp estimation.
func (t *http2Client) updateFlowControl(n uint32) {
	updateIWS := func(interface{}) bool {
		t.initialWindowSize = int32(n)
		t.mu.Lock()
		for _, s := range t.activeStreams {
			s.fc.newLimit(n)
		}
		t.mu.Unlock()
		return true
	}
	t.controlBuf.executeAndPut(updateIWS, &outgoingWindowUpdate{streamID: 0, increment: t.fc.newLimit(n)})
	t.controlBuf.put(&outgoingSettings{
		ss: []http2.Setting{
			{
				ID:  http2.SettingInitialWindowSize,
				Val: n,
			},
		},
	})
}
func (t *http2Client) handleData(f *http2.DataFrame) {
	size := f.Header().Length
	var sendBDPPing bool
	if t.bdpEst != nil {
		sendBDPPing = t.bdpEst.add(size)
	}
	// The connection's flow control is decoupled from the application's read,
	// so that a connection window update does not depend on whether the user
	// application has read the data; slow or inactive streams then cannot
	// starve other active streams at the connection level.
	if w := t.fc.onData(size); w > 0 {
		t.controlBuf.put(&outgoingWindowUpdate{
			streamID:  0,
			increment: w,
		})
	}
	if sendBDPPing {
		// Avoid excessive ping detection (e.g. in an L7 proxy) by sending a
		// window update prior to the BDP ping.
		if w := t.fc.reset(); w > 0 {
			t.controlBuf.put(&outgoingWindowUpdate{
				streamID:  0,
				increment: w,
			})
		}
		t.controlBuf.put(bdpPing)
	}
	// Select the right stream to dispatch.
	s := t.getStream(f)
	if s == nil {
		return
	}
	if size > 0 {
		if err := s.fc.onData(size); err != nil {
			t.closeStream(s, io.EOF, true, http2.ErrCodeFlowControl, status.New(codes.Internal, err.Error()), nil, false)
			return
		}
		if f.Header().Flags.Has(http2.FlagDataPadded) {
			if w := s.fc.onRead(size - uint32(len(f.Data()))); w > 0 {
				t.controlBuf.put(&outgoingWindowUpdate{s.id, w})
			}
		}
		// A copy is required here because there is no guarantee f.Data() is
		// consumed before the arrival of the next frame.
		if len(f.Data()) > 0 {
			buffer := t.bufferPool.get()
			buffer.Reset()
			buffer.Write(f.Data())
			s.write(recvMsg{buffer: buffer})
		}
	}
	// The server has closed the stream without sending trailers. Record that
	// the read direction is closed, and set the status appropriately.
	if f.StreamEnded() {
		t.closeStream(s, io.EOF, false, http2.ErrCodeNo, status.New(codes.Internal, "server closed the stream without sending trailers"), nil, true)
	}
}
func (t *http2Client) handleRSTStream(f *http2.RSTStreamFrame) {
	s := t.getStream(f)
	if s == nil {
		return
	}
	if f.ErrCode == http2.ErrCodeRefusedStream {
		// The stream was unprocessed by the server.
		atomic.StoreUint32(&s.unprocessed, 1)
	}
	statusCode, ok := http2ErrConvTab[f.ErrCode]
	if !ok {
		if logger.V(logLevel) {
			logger.Warningf("transport: http2Client.handleRSTStream found no mapped gRPC status for the received http2 error: %v", f.ErrCode)
		}
		statusCode = codes.Unknown
	}
	if statusCode == codes.Canceled {
		if d, ok := s.ctx.Deadline(); ok && !d.After(time.Now()) {
			// Our deadline was already exceeded, and that was likely the cause
			// of this cancelation. Alter the status code accordingly.
			statusCode = codes.DeadlineExceeded
		}
	}
	t.closeStream(s, io.EOF, false, http2.ErrCodeNo, status.Newf(statusCode, "stream terminated by RST_STREAM with error code: %v", f.ErrCode), nil, false)
}
func (t *http2Client) handleSettings(f *http2.SettingsFrame, isFirst bool) {
	if f.IsAck() {
		return
	}
	var maxStreams *uint32
	var ss []http2.Setting
	var updateFuncs []func()
	f.ForeachSetting(func(s http2.Setting) error {
		switch s.ID {
		case http2.SettingMaxConcurrentStreams:
			maxStreams = new(uint32)
			*maxStreams = s.Val
		case http2.SettingMaxHeaderListSize:
			updateFuncs = append(updateFuncs, func() {
				t.maxSendHeaderListSize = new(uint32)
				*t.maxSendHeaderListSize = s.Val
			})
		default:
			ss = append(ss, s)
		}
		return nil
	})
	if isFirst && maxStreams == nil {
		maxStreams = new(uint32)
		*maxStreams = math.MaxUint32
	}
	sf := &incomingSettings{
		ss: ss,
	}
	if maxStreams != nil {
		updateStreamQuota := func() {
			delta := int64(*maxStreams) - int64(t.maxConcurrentStreams)
			t.maxConcurrentStreams = *maxStreams
			t.streamQuota += delta
			if delta > 0 && t.waitingStreams > 0 {
				close(t.streamsQuotaAvailable) // wake all of them up.
				t.streamsQuotaAvailable = make(chan struct{}, 1)
			}
		}
		updateFuncs = append(updateFuncs, updateStreamQuota)
	}
	t.controlBuf.executeAndPut(func(interface{}) bool {
		for _, f := range updateFuncs {
			f()
		}
		return true
	}, sf)
}
func (t *http2Client) handlePing(f *http2.PingFrame) {
	if f.IsAck() {
		// Maybe it's a BDP ping.
		if t.bdpEst != nil {
			t.bdpEst.calculate(f.Data)
		}
		return
	}
	pingAck := &ping{ack: true}
	copy(pingAck.data[:], f.Data[:])
	t.controlBuf.put(pingAck)
}
func (t *http2Client) handleGoAway(f *http2.GoAwayFrame) {
	t.mu.Lock()
	if t.state == closing {
		t.mu.Unlock()
		return
	}
	if f.ErrCode == http2.ErrCodeEnhanceYourCalm {
		if logger.V(logLevel) {
			logger.Infof("Client received GoAway with http2.ErrCodeEnhanceYourCalm.")
		}
	}
	id := f.LastStreamID
	if id > 0 && id%2 == 0 {
		t.mu.Unlock()
		t.Close(connectionErrorf(true, nil, "received goaway with non-zero even-numbered stream id: %v", id))
		return
	}
	// A client can receive multiple GoAways from the server (see
	// https://github.com/grpc/grpc-go/issues/1387). The first GoAway is sent
	// with an ID of MaxInt32 and the second one, after an RTT delay, with the
	// ID of the last stream the server will process. Therefore the first
	// GoAway does not necessarily close any streams, while the second closes
	// all streams created after its GoAway ID, so that in-flight streams are
	// not killed.
	select {
	case <-t.goAway: // t.goAway has been closed (i.e., multiple GoAways).
		// If there are multiple GoAways, the first one should always have an
		// ID greater than the following ones.
		if id > t.prevGoAwayID {
			t.mu.Unlock()
			t.Close(connectionErrorf(true, nil, "received goaway with stream id: %v, which exceeds stream id of previous goaway: %v", id, t.prevGoAwayID))
			return
		}
	default:
		t.setGoAwayReason(f)
		close(t.goAway)
		defer t.controlBuf.put(&incomingGoAway{})
		// Notify the clientconn about the GOAWAY before setting the state to
		// draining, so the client stops attempting to create streams before
		// new streams are disallowed on this connection.
		if t.state != draining {
			t.onClose(t.goAwayReason)
			t.state = draining
		}
	}
	// All streams with IDs greater than the GoAway ID and smaller than the
	// previous GoAway ID should be killed.
	upperLimit := t.prevGoAwayID
	if upperLimit == 0 { // This is the first GoAway frame.
		upperLimit = math.MaxUint32 // Kill all streams after the GoAway ID.
	}
	t.prevGoAwayID = id
	if len(t.activeStreams) == 0 {
		t.mu.Unlock()
		t.Close(connectionErrorf(true, nil, "received goaway and there are no active streams"))
		return
	}

	streamsToClose := make([]*Stream, 0)
	for streamID, stream := range t.activeStreams {
		if streamID > id && streamID <= upperLimit {
			// The stream was unprocessed by the server.
			atomic.StoreUint32(&stream.unprocessed, 1)
			streamsToClose = append(streamsToClose, stream)
		}
	}
	t.mu.Unlock()
	// Called outside t.mu because closeStream can take controlBuf's mu, which
	// could induce deadlock and is not allowed.
	for _, stream := range streamsToClose {
		t.closeStream(stream, errStreamDrain, false, http2.ErrCodeNo, statusGoAway, nil, false)
	}
}
func (t *http2Client) setGoAwayReason(f *http2.GoAwayFrame) {
	t.goAwayReason = GoAwayNoReason
	switch f.ErrCode {
	case http2.ErrCodeEnhanceYourCalm:
		if string(f.DebugData()) == "too_many_pings" {
			t.goAwayReason = GoAwayTooManyPings
		}
	}
	if len(f.DebugData()) == 0 {
		t.goAwayDebugMessage = fmt.Sprintf("code: %s", f.ErrCode)
	} else {
		t.goAwayDebugMessage = fmt.Sprintf("code: %s, debug data: %q", f.ErrCode, string(f.DebugData()))
	}
}
func (t *http2Client) GetGoAwayReason() (GoAwayReason, string) {
	t.mu.Lock()
	defer t.mu.Unlock()
	return t.goAwayReason, t.goAwayDebugMessage
}
func (t *http2Client) handleWindowUpdate(f *http2.WindowUpdateFrame) {
	t.controlBuf.put(&incomingWindowUpdate{
		streamID:  f.Header().StreamID,
		increment: f.Increment,
	})
}
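// operateHeaders takes action on the decoded headers received on a stream.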
func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) {
	s := t.getStream(frame)
	if s == nil {
		return
	}
	endStream := frame.StreamEnded()
	atomic.StoreUint32(&s.bytesReceived, 1)
	initialHeader := atomic.LoadUint32(&s.headerChanClosed) == 0

	if !initialHeader && !endStream {
		// As specified by gRPC over HTTP2, a HEADERS frame (and associated
		// CONTINUATION frames) can only appear at the start or end of a
		// stream. Therefore, a second HEADERS frame must have the EOS bit set.
		st := status.New(codes.Internal, "a HEADERS frame cannot appear in the middle of a stream")
		t.closeStream(s, st.Err(), true, http2.ErrCodeProtocol, st, nil, false)
		return
	}

	// frame.Truncated is set to true when the framer detects that the current
	// header list size hits the MaxHeaderListSize limit.
	if frame.Truncated {
		se := status.New(codes.Internal, "peer header list size exceeded limit")
		t.closeStream(s, se.Err(), true, http2.ErrCodeFrameSize, se, nil, endStream)
		return
	}

	var (
		// If gRPC Response-Headers have already been received, then the peer
		// is speaking gRPC and we are in gRPC mode.
		isGRPC         = !initialHeader
		mdata          = make(map[string][]string)
		contentTypeErr = "malformed header: missing HTTP content-type"
		grpcMessage    string
		statusGen      *status.Status
		recvCompress   string
		httpStatusCode *int
		httpStatusErr  string
		rawStatusCode  = codes.Unknown
		// headerError is set if an error is encountered while parsing the headers.
		headerError string
	)

	if initialHeader {
		httpStatusErr = "malformed header: missing HTTP status"
	}

	for _, hf := range frame.Fields {
		switch hf.Name {
		case "content-type":
			if _, validContentType := grpcutil.ContentSubtype(hf.Value); !validContentType {
				contentTypeErr = fmt.Sprintf("transport: received unexpected content-type %q", hf.Value)
				break
			}
			contentTypeErr = ""
			mdata[hf.Name] = append(mdata[hf.Name], hf.Value)
			isGRPC = true
		case "grpc-encoding":
			recvCompress = hf.Value
		case "grpc-status":
			code, err := strconv.ParseInt(hf.Value, 10, 32)
			if err != nil {
				se := status.New(codes.Internal, fmt.Sprintf("transport: malformed grpc-status: %v", err))
				t.closeStream(s, se.Err(), true, http2.ErrCodeProtocol, se, nil, endStream)
				return
			}
			rawStatusCode = codes.Code(uint32(code))
		case "grpc-message":
			grpcMessage = decodeGrpcMessage(hf.Value)
		case "grpc-status-details-bin":
			var err error
			statusGen, err = decodeGRPCStatusDetails(hf.Value)
			if err != nil {
				headerError = fmt.Sprintf("transport: malformed grpc-status-details-bin: %v", err)
			}
		case ":status":
			if hf.Value == "200" {
				httpStatusErr = ""
				statusCode := 200
				httpStatusCode = &statusCode
				break
			}

			c, err := strconv.ParseInt(hf.Value, 10, 32)
			if err != nil {
				se := status.New(codes.Internal, fmt.Sprintf("transport: malformed http-status: %v", err))
				t.closeStream(s, se.Err(), true, http2.ErrCodeProtocol, se, nil, endStream)
				return
			}
			statusCode := int(c)
			httpStatusCode = &statusCode

			httpStatusErr = fmt.Sprintf(
				"unexpected HTTP status code received from server: %d (%s)",
				statusCode,
				http.StatusText(statusCode),
			)
		default:
			if isReservedHeader(hf.Name) && !isWhitelistedHeader(hf.Name) {
				break
			}
			v, err := decodeMetadataHeader(hf.Name, hf.Value)
			if err != nil {
				headerError = fmt.Sprintf("transport: malformed %s: %v", hf.Name, err)
				logger.Warningf("Failed to decode metadata header (%q, %q): %v", hf.Name, hf.Value, err)
				break
			}
			mdata[hf.Name] = append(mdata[hf.Name], v)
		}
	}

	if !isGRPC || httpStatusErr != "" {
		var code = codes.Internal // when header does not include HTTP status, return INTERNAL

		if httpStatusCode != nil {
			var ok bool
			code, ok = HTTPStatusConvTab[*httpStatusCode]
			if !ok {
				code = codes.Unknown
			}
		}
		var errs []string
		if httpStatusErr != "" {
			errs = append(errs, httpStatusErr)
		}
		if contentTypeErr != "" {
			errs = append(errs, contentTypeErr)
		}
		// Verify the HTTP response is a 200.
		se := status.New(code, strings.Join(errs, "; "))
		t.closeStream(s, se.Err(), true, http2.ErrCodeProtocol, se, nil, endStream)
		return
	}

	if headerError != "" {
		se := status.New(codes.Internal, headerError)
		t.closeStream(s, se.Err(), true, http2.ErrCodeProtocol, se, nil, endStream)
		return
	}

	isHeader := false

	// If headerChan hasn't been closed yet (expected, given we checked it
	// above, but something else could have potentially closed the whole
	// stream).
	if atomic.CompareAndSwapUint32(&s.headerChanClosed, 0, 1) {
		s.headerValid = true
		if !endStream {
			// HEADERS frame block carries Response-Headers.
			isHeader = true
			// These values can be set without any synchronization because the
			// stream goroutine reads them only after seeing a closed
			// headerChan, which we close after setting them.
			s.recvCompress = recvCompress
			if len(mdata) > 0 {
				s.header = mdata
			}
		} else {
			// HEADERS frame block carries Trailers-Only.
			s.noHeaders = true
		}
		close(s.headerChan)
	}

	for _, sh := range t.statsHandlers {
		if isHeader {
			inHeader := &stats.InHeader{
				Client:      true,
				WireLength:  int(frame.Header().Length),
				Header:      metadata.MD(mdata).Copy(),
				Compression: s.recvCompress,
			}
			sh.HandleRPC(s.ctx, inHeader)
		} else {
			inTrailer := &stats.InTrailer{
				Client:     true,
				WireLength: int(frame.Header().Length),
				Trailer:    metadata.MD(mdata).Copy(),
			}
			sh.HandleRPC(s.ctx, inTrailer)
		}
	}

	if !endStream {
		return
	}

	if statusGen == nil {
		statusGen = status.New(rawStatusCode, grpcMessage)
	}

	// If the client received END_STREAM from the server while the stream was
	// still active, send RST_STREAM.
	rstStream := s.getState() == streamActive
	t.closeStream(s, io.EOF, rstStream, http2.ErrCodeNo, statusGen, mdata, true)
}
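// readServerPreface reads and handles the initial settings frame from the
// server.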
func (t *http2Client) readServerPreface() error {
	frame, err := t.framer.fr.ReadFrame()
	if err != nil {
		return connectionErrorf(true, err, "error reading server preface: %v", err)
	}
	sf, ok := frame.(*http2.SettingsFrame)
	if !ok {
		return connectionErrorf(true, nil, "initial http2 frame from server is not a settings frame: %T", frame)
	}
	t.handleSettings(sf, true)
	return nil
}
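// reader runs as a separate goroutine in charge of reading data from the
// network connection and dispatching frames to the corresponding streams.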
func (t *http2Client) reader(errCh chan<- error) {
	defer close(t.readerDone)

	if err := t.readServerPreface(); err != nil {
		errCh <- err
		return
	}
	close(errCh)
	if t.keepaliveEnabled {
		atomic.StoreInt64(&t.lastRead, time.Now().UnixNano())
	}

	// loop to keep reading incoming messages on this transport.
	for {
		t.controlBuf.throttle()
		frame, err := t.framer.fr.ReadFrame()
		if t.keepaliveEnabled {
			atomic.StoreInt64(&t.lastRead, time.Now().UnixNano())
		}
		if err != nil {
			// Abort an active stream if the http2.Framer returns a
			// http2.StreamError. This can happen only if the server's response
			// is malformed http2.
			if se, ok := err.(http2.StreamError); ok {
				t.mu.Lock()
				s := t.activeStreams[se.StreamID]
				t.mu.Unlock()
				if s != nil {
					// Use error detail to provide a better error message.
					code := http2ErrConvTab[se.Code]
					errorDetail := t.framer.fr.ErrorDetail()
					var msg string
					if errorDetail != nil {
						msg = errorDetail.Error()
					} else {
						msg = "received invalid frame"
					}
					t.closeStream(s, status.Error(code, msg), true, http2.ErrCodeProtocol, status.New(code, msg), nil, false)
				}
				continue
			} else {
				// Transport error.
				t.Close(connectionErrorf(true, err, "error reading from server: %v", err))
				return
			}
		}
		switch frame := frame.(type) {
		case *http2.MetaHeadersFrame:
			t.operateHeaders(frame)
		case *http2.DataFrame:
			t.handleData(frame)
		case *http2.RSTStreamFrame:
			t.handleRSTStream(frame)
		case *http2.SettingsFrame:
			t.handleSettings(frame, false)
		case *http2.PingFrame:
			t.handlePing(frame)
		case *http2.GoAwayFrame:
			t.handleGoAway(frame)
		case *http2.WindowUpdateFrame:
			t.handleWindowUpdate(frame)
		default:
			if logger.V(logLevel) {
				logger.Errorf("transport: http2Client.reader got unhandled frame type %v.", frame)
			}
		}
	}
}
func minTime(a, b time.Duration) time.Duration {
	if a < b {
		return a
	}
	return b
}
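// keepalive runs in a separate goroutine and makes sure the connection is
// alive by sending pings.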
func (t *http2Client) keepalive() {
	p := &ping{data: [8]byte{}}
	// True iff a ping has been sent, and no data has been received since then.
	outstandingPing := false
	// Amount of time remaining before which we should receive an ACK for the
	// last sent ping.
	timeoutLeft := time.Duration(0)
	// Records the last value of t.lastRead before we go block on the timer.
	// This is required to check for read activity since then.
	prevNano := time.Now().UnixNano()
	timer := time.NewTimer(t.kp.Time)
	for {
		select {
		case <-timer.C:
			lastRead := atomic.LoadInt64(&t.lastRead)
			if lastRead > prevNano {
				// There has been read activity since the last time we were here.
				outstandingPing = false
				// Next timer should fire at kp.Time seconds from lastRead time.
				timer.Reset(time.Duration(lastRead) + t.kp.Time - time.Duration(time.Now().UnixNano()))
				prevNano = lastRead
				continue
			}
			if outstandingPing && timeoutLeft <= 0 {
				t.Close(connectionErrorf(true, nil, "keepalive ping failed to receive ACK within timeout"))
				return
			}
			t.mu.Lock()
			if t.state == closing {
				// If the transport is closing, exit the keepalive goroutine
				// here to avoid racing the Signal() from Close() against the
				// Wait() below, which would otherwise block forever.
				t.mu.Unlock()
				return
			}
			if len(t.activeStreams) < 1 && !t.kp.PermitWithoutStream {
				// If a previously sent ping wasn't acked and its timeout
				// hadn't fired, but we are about to go dormant, make sure we
				// unconditionally send a ping once we awaken.
				outstandingPing = false
				t.kpDormant = true
				t.kpDormancyCond.Wait()
			}
			t.kpDormant = false
			t.mu.Unlock()

			// We get here either because we were dormant and a new stream was
			// created, which unblocked the Wait() call, or because the
			// keepalive timer expired. In both cases, we need to send a ping.
			if !outstandingPing {
				if channelz.IsOn() {
					atomic.AddInt64(&t.czData.kpCount, 1)
				}
				t.controlBuf.put(p)
				timeoutLeft = t.kp.Timeout
				outstandingPing = true
			}
			// Sleep for the minimum of kp.Time and timeoutLeft, so that we
			// wait only kp.Time before sending the next ping when the current
			// one is acked.
			sleepDuration := minTime(t.kp.Time, timeoutLeft)
			timeoutLeft -= sleepDuration
			timer.Reset(sleepDuration)
		case <-t.ctx.Done():
			if !timer.Stop() {
				<-timer.C
			}
			return
		}
	}
}
func (t *http2Client) Error() <-chan struct{} {
	return t.ctx.Done()
}
func (t *http2Client) GoAway() <-chan struct{} {
	return t.goAway
}
func (t *http2Client) ChannelzMetric() *channelz.SocketInternalMetric {
	s := channelz.SocketInternalMetric{
		StreamsStarted:                  atomic.LoadInt64(&t.czData.streamsStarted),
		StreamsSucceeded:                atomic.LoadInt64(&t.czData.streamsSucceeded),
		StreamsFailed:                   atomic.LoadInt64(&t.czData.streamsFailed),
		MessagesSent:                    atomic.LoadInt64(&t.czData.msgSent),
		MessagesReceived:                atomic.LoadInt64(&t.czData.msgRecv),
		KeepAlivesSent:                  atomic.LoadInt64(&t.czData.kpCount),
		LastLocalStreamCreatedTimestamp: time.Unix(0, atomic.LoadInt64(&t.czData.lastStreamCreatedTime)),
		LastMessageSentTimestamp:        time.Unix(0, atomic.LoadInt64(&t.czData.lastMsgSentTime)),
		LastMessageReceivedTimestamp:    time.Unix(0, atomic.LoadInt64(&t.czData.lastMsgRecvTime)),
		LocalFlowControlWindow:          int64(t.fc.getSize()),
		SocketOptions:                   channelz.GetSocketOption(t.conn),
		LocalAddr:                       t.localAddr,
		RemoteAddr:                      t.remoteAddr,
	}
	if au, ok := t.authInfo.(credentials.ChannelzSecurityInfo); ok {
		s.Security = au.GetSecurityValue()
	}
	s.RemoteFlowControlWindow = t.getOutFlowWindow()
	return &s
}
func (t *http2Client) RemoteAddr() net.Addr { return t.remoteAddr }
func (t *http2Client) IncrMsgSent() {
	atomic.AddInt64(&t.czData.msgSent, 1)
	atomic.StoreInt64(&t.czData.lastMsgSentTime, time.Now().UnixNano())
}
func (t *http2Client) IncrMsgRecv() {
	atomic.AddInt64(&t.czData.msgRecv, 1)
	atomic.StoreInt64(&t.czData.lastMsgRecvTime, time.Now().UnixNano())
}
func (t *http2Client) getOutFlowWindow() int64 {
	resp := make(chan uint32, 1)
	timer := time.NewTimer(time.Second)
	defer timer.Stop()
	t.controlBuf.put(&outFlowControlSizeRequest{resp})
	select {
	case sz := <-resp:
		return int64(sz)
	case <-t.ctxDone:
		return -1
	case <-timer.C:
		return -2
	}
}
func (t *http2Client) stateForTesting() transportState {
	t.mu.Lock()
	defer t.mu.Unlock()
	return t.state
}