package transport
import (
icredentials
imetadata
istatus
isyscall
)
var clientConnectionCounter uint64
var goAwayLoopyWriterTimeout = 5 * time.Second
var metadataFromOutgoingContextRaw = internal.FromOutgoingContextRaw.(func(context.Context) (metadata.MD, [][]string, bool))
type http2Client struct {
lastRead int64
ctx context.Context
cancel context.CancelFunc
ctxDone <-chan struct{}
userAgent string
address resolver.Address
md metadata.MD
conn net.Conn
loopy *loopyWriter
remoteAddr net.Addr
localAddr net.Addr
authInfo credentials.AuthInfo
readerDone chan struct{}
writerDone chan struct{}
goAway chan struct{}
keepaliveDone chan struct{}
framer *framer
controlBuf *controlBuffer
fc *trInFlow
scheme string
isSecure bool
perRPCCreds []credentials.PerRPCCredentials
kp keepalive.ClientParameters
keepaliveEnabled bool
statsHandlers []stats.Handler
initialWindowSize int32
maxSendHeaderListSize *uint32
bdpEst *bdpEstimator
maxConcurrentStreams uint32
streamQuota int64
streamsQuotaAvailable chan struct{}
waitingStreams uint32
registeredCompressors string
mu sync.Mutex
nextID uint32
state transportState
activeStreams map[uint32]*ClientStream
prevGoAwayID uint32
goAwayReason GoAwayReason
goAwayDebugMessage string
kpDormancyCond *sync.Cond
kpDormant bool
channelz *channelz.Socket
onClose func(GoAwayReason)
bufferPool mem.BufferPool
connectionID uint64
logger *grpclog.PrefixLogger
}
func ( context.Context, func(context.Context, string) (net.Conn, error), resolver.Address, string) (net.Conn, error) {
:= .Addr
, := networktype.Get()
if != nil {
if == "unix" && !strings.HasPrefix(, "\x00") {
if filepath.IsAbs() {
return (, "unix://"+)
}
return (, "unix:"+)
}
return (, )
}
if ! {
, = ParseDialTarget()
}
if , := proxyattributes.Get(); {
return proxyDial(, , , )
}
return internal.NetDialerWithTCPKeepalive().DialContext(, , )
}
func ( error) bool {
switch err := .(type) {
case interface {
() bool
}:
return .()
case interface {
() bool
}:
return .()
}
return true
}
func (, context.Context, resolver.Address, ConnectOptions, func(GoAwayReason)) ( ClientTransport, error) {
:= "http"
, := context.WithCancel()
defer func() {
if != nil {
()
}
}()
= icredentials.NewClientHandshakeInfoContext(, credentials.ClientHandshakeInfo{Attributes: .Attributes})
, := dial(, .Dialer, , .UserAgent)
if != nil {
if .FailOnNonTempDialError {
return nil, connectionErrorf(isTemporary(), , "transport: error while dialing: %v", )
}
return nil, connectionErrorf(true, , "transport: Error while dialing: %v", )
}
defer func( net.Conn) {
if != nil {
.Close()
}
}()
:= grpcsync.NewEvent()
, := context.WithCancel()
defer func() {
()
<-.Done()
}()
go func( net.Conn) {
defer .Fire()
<-.Done()
if := .Err(); != nil {
if logger.V(logLevel) {
logger.Infof("Aborting due to connect deadline expiring: %v", )
}
.Close()
}
}()
:= .KeepaliveParams
if .Time == 0 {
.Time = defaultClientKeepaliveTime
}
if .Timeout == 0 {
.Timeout = defaultClientKeepaliveTimeout
}
:= false
if .Time != infinity {
if = isyscall.SetTCPUserTimeout(, .Timeout); != nil {
return nil, connectionErrorf(false, , "transport: failed to set TCP_USER_TIMEOUT: %v", )
}
= true
}
var (
bool
credentials.AuthInfo
)
:= .TransportCredentials
:= .PerRPCCredentials
if := .CredsBundle; != nil {
if := .TransportCredentials(); != nil {
=
}
if := .PerRPCCredentials(); != nil {
= append(, )
}
}
if != nil {
, , = .ClientHandshake(, .ServerName, )
if != nil {
return nil, connectionErrorf(isTemporary(), , "transport: authentication handshake failed: %v", )
}
for , := range {
if .RequireTransportSecurity() {
if , := .(interface {
() credentials.CommonAuthInfo
}); {
:= .().SecurityLevel
if != credentials.InvalidSecurityLevel && < credentials.PrivacyAndIntegrity {
return nil, connectionErrorf(true, nil, "transport: cannot send secure credentials on an insecure connection")
}
}
}
}
= true
if .Info().SecurityProtocol == "tls" {
= "https"
}
}
:= int32(initialWindowSize)
if .InitialConnWindowSize >= defaultWindowSize {
= .InitialConnWindowSize
}
:= .WriteBufferSize
:= .ReadBufferSize
:= defaultClientMaxHeaderListSize
if .MaxHeaderListSize != nil {
= *.MaxHeaderListSize
}
:= &http2Client{
ctx: ,
ctxDone: .Done(),
cancel: ,
userAgent: .UserAgent,
registeredCompressors: grpcutil.RegisteredCompressors(),
address: ,
conn: ,
remoteAddr: .RemoteAddr(),
localAddr: .LocalAddr(),
authInfo: ,
readerDone: make(chan struct{}),
writerDone: make(chan struct{}),
goAway: make(chan struct{}),
keepaliveDone: make(chan struct{}),
framer: newFramer(, , , .SharedWriteBuffer, ),
fc: &trInFlow{limit: uint32()},
scheme: ,
activeStreams: make(map[uint32]*ClientStream),
isSecure: ,
perRPCCreds: ,
kp: ,
statsHandlers: .StatsHandlers,
initialWindowSize: initialWindowSize,
nextID: 1,
maxConcurrentStreams: defaultMaxStreamsClient,
streamQuota: defaultMaxStreamsClient,
streamsQuotaAvailable: make(chan struct{}, 1),
keepaliveEnabled: ,
bufferPool: .BufferPool,
onClose: ,
}
var credentials.ChannelzSecurityValue
if , := .(credentials.ChannelzSecurityInfo); {
= .GetSecurityValue()
}
.channelz = channelz.RegisterSocket(
&channelz.Socket{
SocketType: channelz.SocketTypeNormal,
Parent: .ChannelzParent,
SocketMetrics: channelz.SocketMetrics{},
EphemeralMetrics: .socketMetrics,
LocalAddr: .localAddr,
RemoteAddr: .remoteAddr,
SocketOptions: channelz.GetSocketOption(.conn),
Security: ,
})
.logger = prefixLoggerForClientTransport()
.ctx = peer.NewContext(.ctx, .getPeer())
if , := .Metadata.(*metadata.MD); {
.md = *
} else if := imetadata.Get(); != nil {
.md =
}
.controlBuf = newControlBuffer(.ctxDone)
if .InitialWindowSize >= defaultWindowSize {
.initialWindowSize = .InitialWindowSize
}
if !.StaticWindowSize {
.bdpEst = &bdpEstimator{
bdp: initialWindowSize,
updateFlowControl: .updateFlowControl,
}
}
for , := range .statsHandlers {
.ctx = .TagConn(.ctx, &stats.ConnTagInfo{
RemoteAddr: .remoteAddr,
LocalAddr: .localAddr,
})
:= &stats.ConnBegin{
Client: true,
}
.HandleConn(.ctx, )
}
if .keepaliveEnabled {
.kpDormancyCond = sync.NewCond(&.mu)
go .keepalive()
}
:= make(chan error, 1)
go .reader()
defer func() {
if != nil {
close(.writerDone)
.Close()
}
}()
, := .conn.Write(clientPreface)
if != nil {
= connectionErrorf(true, , "transport: failed to write client preface: %v", )
return nil,
}
if != len(clientPreface) {
= connectionErrorf(true, nil, "transport: preface mismatch, wrote %d bytes; want %d", , len(clientPreface))
return nil,
}
var []http2.Setting
if .initialWindowSize != defaultWindowSize {
= append(, http2.Setting{
ID: http2.SettingInitialWindowSize,
Val: uint32(.initialWindowSize),
})
}
if .MaxHeaderListSize != nil {
= append(, http2.Setting{
ID: http2.SettingMaxHeaderListSize,
Val: *.MaxHeaderListSize,
})
}
= .framer.fr.WriteSettings(...)
if != nil {
= connectionErrorf(true, , "transport: failed to write initial settings frame: %v", )
return nil,
}
if := uint32( - defaultWindowSize); > 0 {
if := .framer.fr.WriteWindowUpdate(0, ); != nil {
= connectionErrorf(true, , "transport: failed to write window update: %v", )
return nil,
}
}
.connectionID = atomic.AddUint64(&clientConnectionCounter, 1)
if := .framer.writer.Flush(); != nil {
return nil,
}
if = <-; != nil {
return nil,
}
go func() {
.loopy = newLoopyWriter(clientSide, .framer, .controlBuf, .bdpEst, .conn, .logger, .outgoingGoAwayHandler, .bufferPool)
if := .loopy.run(); !isIOError() {
.conn.Close()
}
close(.writerDone)
}()
return , nil
}
func ( *http2Client) ( context.Context, *CallHdr) *ClientStream {
:= &ClientStream{
Stream: &Stream{
method: .Method,
sendCompress: .SendCompress,
buf: newRecvBuffer(),
contentSubtype: .ContentSubtype,
},
ct: ,
done: make(chan struct{}),
headerChan: make(chan struct{}),
doneFunc: .DoneFunc,
}
.wq = newWriteQuota(defaultWriteQuota, .done)
.requestRead = func( int) {
.adjustWindow(, uint32())
}
.ctx =
.trReader = &transportReader{
reader: &recvBufferReader{
ctx: .ctx,
ctxDone: .ctx.Done(),
recv: .buf,
closeStream: func( error) {
.Close()
},
},
windowHandler: func( int) {
.updateWindow(, uint32())
},
}
return
}
func ( *http2Client) () *peer.Peer {
return &peer.Peer{
Addr: .remoteAddr,
AuthInfo: .authInfo,
LocalAddr: .localAddr,
}
}
func ( *http2Client) ( *goAway) (bool, error) {
.mu.Lock()
:= .nextID - 2
.mu.Unlock()
if := .framer.fr.WriteGoAway(, http2.ErrCodeNo, .debugData); != nil {
return false,
}
return false, .closeConn
}
func ( *http2Client) ( context.Context, *CallHdr) ([]hpack.HeaderField, error) {
:= .createAudience()
:= credentials.RequestInfo{
Method: .Method,
AuthInfo: .authInfo,
}
:= credentials.NewContextWithRequestInfo(, )
, := .getTrAuthData(, )
if != nil {
return nil,
}
, := .getCallAuthData(, , )
if != nil {
return nil,
}
:= 7
+= len() + len()
:= make([]hpack.HeaderField, 0, )
= append(, hpack.HeaderField{Name: ":method", Value: "POST"})
= append(, hpack.HeaderField{Name: ":scheme", Value: .scheme})
= append(, hpack.HeaderField{Name: ":path", Value: .Method})
= append(, hpack.HeaderField{Name: ":authority", Value: .Host})
= append(, hpack.HeaderField{Name: "content-type", Value: grpcutil.ContentType(.ContentSubtype)})
= append(, hpack.HeaderField{Name: "user-agent", Value: .userAgent})
= append(, hpack.HeaderField{Name: "te", Value: "trailers"})
if .PreviousAttempts > 0 {
= append(, hpack.HeaderField{Name: "grpc-previous-rpc-attempts", Value: strconv.Itoa(.PreviousAttempts)})
}
:= .registeredCompressors
if .SendCompress != "" {
= append(, hpack.HeaderField{Name: "grpc-encoding", Value: .SendCompress})
if !grpcutil.IsCompressorNameRegistered(.SendCompress) {
if != "" {
+= ","
}
+= .SendCompress
}
}
if != "" {
= append(, hpack.HeaderField{Name: "grpc-accept-encoding", Value: })
}
if , := .Deadline(); {
:= time.Until()
if <= 0 {
return nil, status.Error(codes.DeadlineExceeded, context.DeadlineExceeded.Error())
}
= append(, hpack.HeaderField{Name: "grpc-timeout", Value: grpcutil.EncodeDuration()})
}
for , := range {
= append(, hpack.HeaderField{Name: , Value: encodeMetadataHeader(, )})
}
for , := range {
= append(, hpack.HeaderField{Name: , Value: encodeMetadataHeader(, )})
}
if , , := metadataFromOutgoingContextRaw(); {
var string
for , := range {
if isReservedHeader() {
continue
}
for , := range {
= append(, hpack.HeaderField{Name: , Value: encodeMetadataHeader(, )})
}
}
for , := range {
for , := range {
if %2 == 0 {
= strings.ToLower()
continue
}
if isReservedHeader() {
continue
}
= append(, hpack.HeaderField{Name: , Value: encodeMetadataHeader(, )})
}
}
}
for , := range .md {
if isReservedHeader() {
continue
}
for , := range {
= append(, hpack.HeaderField{Name: , Value: encodeMetadataHeader(, )})
}
}
return , nil
}
func ( *http2Client) ( *CallHdr) string {
if len(.perRPCCreds) == 0 && .Creds == nil {
return ""
}
:= strings.TrimSuffix(.Host, ":443")
:= strings.LastIndex(.Method, "/")
if == -1 {
= len(.Method)
}
return "https://" + + .Method[:]
}
func ( *http2Client) ( context.Context, string) (map[string]string, error) {
if len(.perRPCCreds) == 0 {
return nil, nil
}
:= map[string]string{}
for , := range .perRPCCreds {
, := .GetRequestMetadata(, )
if != nil {
if , := status.FromError(); {
if istatus.IsRestrictedControlPlaneCode() {
= status.Errorf(codes.Internal, "transport: received per-RPC creds error with illegal status: %v", )
}
return nil,
}
return nil, status.Errorf(codes.Unauthenticated, "transport: per-RPC creds failed due to error: %v", )
}
for , := range {
= strings.ToLower()
[] =
}
}
return , nil
}
func ( *http2Client) ( context.Context, string, *CallHdr) (map[string]string, error) {
var map[string]string
if := .Creds; != nil {
if .RequireTransportSecurity() {
, := credentials.RequestInfoFromContext()
if !.isSecure || credentials.CheckSecurityLevel(.AuthInfo, credentials.PrivacyAndIntegrity) != nil {
return nil, status.Error(codes.Unauthenticated, "transport: cannot send secure credentials on an insecure connection")
}
}
, := .GetRequestMetadata(, )
if != nil {
if , := status.FromError(); {
if istatus.IsRestrictedControlPlaneCode() {
= status.Errorf(codes.Internal, "transport: received per-RPC creds error with illegal status: %v", )
}
return nil,
}
return nil, status.Errorf(codes.Internal, "transport: per-RPC creds failed due to error: %v", )
}
= make(map[string]string, len())
for , := range {
= strings.ToLower()
[] =
}
}
return , nil
}
type NewStreamError struct {
Err error
AllowTransparentRetry bool
}
func ( NewStreamError) () string {
return .Err.Error()
}
func ( *http2Client) ( context.Context, *CallHdr) (*ClientStream, error) {
= peer.NewContext(, .getPeer())
if .address.ServerName != "" {
:= *
.Host = .address.ServerName
= &
}
if .Authority != "" {
, := .authInfo.(credentials.AuthorityValidator)
if ! {
return nil, &NewStreamError{Err: status.Errorf(codes.Unavailable, "credentials type %q does not implement the AuthorityValidator interface, but authority override specified with CallAuthority call option", .authInfo.AuthType())}
}
if := .ValidateAuthority(.Authority); != nil {
return nil, &NewStreamError{Err: status.Errorf(codes.Unavailable, "failed to validate authority %q : %v", .Authority, )}
}
:= *
.Host = .Authority
= &
}
, := .createHeaderFields(, )
if != nil {
return nil, &NewStreamError{Err: , AllowTransparentRetry: false}
}
:= .newStream(, )
:= func( error) {
if .swapState(streamDone) == streamDone {
return
}
.unprocessed.Store(true)
.write(recvMsg{err: })
close(.done)
if atomic.CompareAndSwapUint32(&.headerChanClosed, 0, 1) {
close(.headerChan)
}
}
:= &headerFrame{
hf: ,
endStream: false,
initStream: func(uint32) error {
.mu.Lock()
if .state == closing {
.mu.Unlock()
(ErrConnClosing)
return ErrConnClosing
}
if channelz.IsOn() {
.channelz.SocketMetrics.StreamsStarted.Add(1)
.channelz.SocketMetrics.LastLocalStreamCreatedTimestamp.Store(time.Now().UnixNano())
}
if .kpDormant {
.kpDormancyCond.Signal()
}
.mu.Unlock()
return nil
},
onOrphaned: ,
wq: .wq,
}
:= true
var chan struct{}
:= false
:= func() bool {
if .streamQuota <= 0 {
if {
.waitingStreams++
}
= .streamsQuotaAvailable
return false
}
if ! {
.waitingStreams--
}
.streamQuota--
.mu.Lock()
if .state == draining || .activeStreams == nil {
.mu.Unlock()
return false
}
.streamID = .nextID
.nextID += 2
= .nextID > MaxStreamID
.id = .streamID
.fc = &inFlow{limit: uint32(.initialWindowSize)}
.activeStreams[.id] =
.mu.Unlock()
if .streamQuota > 0 && .waitingStreams > 0 {
select {
case .streamsQuotaAvailable <- struct{}{}:
default:
}
}
return true
}
var error
:= func() bool {
if .maxSendHeaderListSize == nil {
return true
}
var int64
for , := range .hf {
if += int64(.Size()); > int64(*.maxSendHeaderListSize) {
= status.Errorf(codes.Internal, "header list size to send violates the maximum size (%d bytes) set by server", *.maxSendHeaderListSize)
return false
}
}
return true
}
for {
, := .controlBuf.executeAndPut(func() bool {
return () && ()
}, )
if != nil {
return nil, &NewStreamError{Err: , AllowTransparentRetry: true}
}
if {
break
}
if != nil {
return nil, &NewStreamError{Err: }
}
= false
select {
case <-:
case <-.Done():
return nil, &NewStreamError{Err: ContextErr(.Err())}
case <-.goAway:
return nil, &NewStreamError{Err: errStreamDrain, AllowTransparentRetry: true}
case <-.ctx.Done():
return nil, &NewStreamError{Err: ErrConnClosing, AllowTransparentRetry: true}
}
}
if len(.statsHandlers) != 0 {
, := metadata.FromOutgoingContext()
if {
.Set("user-agent", .userAgent)
} else {
= metadata.Pairs("user-agent", .userAgent)
}
for , := range .statsHandlers {
:= &stats.OutHeader{
Client: true,
FullMethod: .Method,
RemoteAddr: .remoteAddr,
LocalAddr: .localAddr,
Compression: .SendCompress,
Header: ,
}
.HandleRPC(.ctx, )
}
}
if {
if .logger.V(logLevel) {
.logger.Infof("Draining transport: t.nextID > MaxStreamID")
}
.GracefulClose()
}
return , nil
}
func ( *http2Client) ( *ClientStream, error, bool, http2.ErrCode, *status.Status, map[string][]string, bool) {
if .swapState(streamDone) == streamDone {
<-.done
return
}
.status =
if len() > 0 {
.trailer =
}
if != nil {
.write(recvMsg{err: })
}
if atomic.CompareAndSwapUint32(&.headerChanClosed, 0, 1) {
.noHeaders = true
close(.headerChan)
}
:= &cleanupStream{
streamID: .id,
onWrite: func() {
.mu.Lock()
if .activeStreams != nil {
delete(.activeStreams, .id)
}
.mu.Unlock()
if channelz.IsOn() {
if {
.channelz.SocketMetrics.StreamsSucceeded.Add(1)
} else {
.channelz.SocketMetrics.StreamsFailed.Add(1)
}
}
},
rst: ,
rstCode: ,
}
:= func() bool {
.streamQuota++
if .streamQuota > 0 && .waitingStreams > 0 {
select {
case .streamsQuotaAvailable <- struct{}{}:
default:
}
}
return true
}
.controlBuf.executeAndPut(, )
close(.done)
if .doneFunc != nil {
.doneFunc()
}
}
func ( *http2Client) ( error) {
.conn.SetWriteDeadline(time.Now().Add(time.Second * 10))
.mu.Lock()
if .state == closing {
.mu.Unlock()
return
}
if .logger.V(logLevel) {
.logger.Infof("Closing: %v", )
}
if .state != draining {
.onClose(GoAwayInvalid)
}
.state = closing
:= .activeStreams
.activeStreams = nil
if .kpDormant {
.kpDormancyCond.Signal()
}
:= .goAwayDebugMessage
.mu.Unlock()
.controlBuf.put(&goAway{code: http2.ErrCodeNo, debugData: []byte("client transport shutdown"), closeConn: })
:= time.NewTimer(goAwayLoopyWriterTimeout)
defer .Stop()
select {
case <-.writerDone:
case <-.C:
.logger.Infof("Failed to write a GOAWAY frame as part of connection close after %s. Giving up and closing the transport.", goAwayLoopyWriterTimeout)
}
.cancel()
.conn.Close()
<-.readerDone
if .keepaliveEnabled {
<-.keepaliveDone
}
channelz.RemoveEntry(.channelz.ID)
var *status.Status
if len() > 0 {
= status.Newf(codes.Unavailable, "closing transport due to: %v, received prior goaway: %v", , )
= .Err()
} else {
= status.New(codes.Unavailable, .Error())
}
for , := range {
.closeStream(, , false, http2.ErrCodeNo, , nil, false)
}
for , := range .statsHandlers {
:= &stats.ConnEnd{
Client: true,
}
.HandleConn(.ctx, )
}
}
func ( *http2Client) () {
.mu.Lock()
if .state == draining || .state == closing {
.mu.Unlock()
return
}
if .logger.V(logLevel) {
.logger.Infof("GracefulClose called")
}
.onClose(GoAwayInvalid)
.state = draining
:= len(.activeStreams)
.mu.Unlock()
if == 0 {
.Close(connectionErrorf(true, nil, "no active streams left to process while draining"))
return
}
.controlBuf.put(&incomingGoAway{})
}
func ( *http2Client) ( *ClientStream, []byte, mem.BufferSlice, *WriteOptions) error {
if .Last {
if !.compareAndSwapState(streamActive, streamWriteDone) {
return errStreamDone
}
} else if .getState() != streamActive {
return errStreamDone
}
:= &dataFrame{
streamID: .id,
endStream: .Last,
h: ,
data: ,
}
:= .Len()
if != nil || != 0 {
if := .wq.get(int32(len() + )); != nil {
return
}
}
.Ref()
if := .controlBuf.put(); != nil {
.Free()
return
}
.incrMsgSent()
return nil
}
func ( *http2Client) ( http2.Frame) *ClientStream {
.mu.Lock()
:= .activeStreams[.Header().StreamID]
.mu.Unlock()
return
}
func ( *http2Client) ( *ClientStream, uint32) {
if := .fc.maybeAdjust(); > 0 {
.controlBuf.put(&outgoingWindowUpdate{streamID: .id, increment: })
}
}
func ( *http2Client) ( *ClientStream, uint32) {
if := .fc.onRead(); > 0 {
.controlBuf.put(&outgoingWindowUpdate{streamID: .id, increment: })
}
}
func ( *http2Client) ( uint32) {
:= func() bool {
.initialWindowSize = int32()
.mu.Lock()
for , := range .activeStreams {
.fc.newLimit()
}
.mu.Unlock()
return true
}
.controlBuf.executeAndPut(, &outgoingWindowUpdate{streamID: 0, increment: .fc.newLimit()})
.controlBuf.put(&outgoingSettings{
ss: []http2.Setting{
{
ID: http2.SettingInitialWindowSize,
Val: ,
},
},
})
}
func ( *http2Client) ( *http2.DataFrame) {
:= .Header().Length
var bool
if .bdpEst != nil {
= .bdpEst.add()
}
if := .fc.onData(); > 0 {
.controlBuf.put(&outgoingWindowUpdate{
streamID: 0,
increment: ,
})
}
if {
if := .fc.reset(); > 0 {
.controlBuf.put(&outgoingWindowUpdate{
streamID: 0,
increment: ,
})
}
.controlBuf.put(bdpPing)
}
:= .getStream()
if == nil {
return
}
if > 0 {
if := .fc.onData(); != nil {
.closeStream(, io.EOF, true, http2.ErrCodeFlowControl, status.New(codes.Internal, .Error()), nil, false)
return
}
if .Header().Flags.Has(http2.FlagDataPadded) {
if := .fc.onRead( - uint32(len(.Data()))); > 0 {
.controlBuf.put(&outgoingWindowUpdate{.id, })
}
}
if len(.Data()) > 0 {
:= .bufferPool
if == nil {
= mem.DefaultBufferPool()
}
.write(recvMsg{buffer: mem.Copy(.Data(), )})
}
}
if .StreamEnded() {
.closeStream(, io.EOF, false, http2.ErrCodeNo, status.New(codes.Internal, "server closed the stream without sending trailers"), nil, true)
}
}
func ( *http2Client) ( *http2.RSTStreamFrame) {
:= .getStream()
if == nil {
return
}
if .ErrCode == http2.ErrCodeRefusedStream {
.unprocessed.Store(true)
}
, := http2ErrConvTab[.ErrCode]
if ! {
if .logger.V(logLevel) {
.logger.Infof("Received a RST_STREAM frame with code %q, but found no mapped gRPC status", .ErrCode)
}
= codes.Unknown
}
if == codes.Canceled {
if , := .ctx.Deadline(); && !.After(time.Now()) {
= codes.DeadlineExceeded
}
}
:= status.Newf(, "stream terminated by RST_STREAM with error code: %v", .ErrCode)
.closeStream(, .Err(), false, http2.ErrCodeNo, , nil, false)
}
func ( *http2Client) ( *http2.SettingsFrame, bool) {
if .IsAck() {
return
}
var *uint32
var []http2.Setting
var []func()
.ForeachSetting(func( http2.Setting) error {
switch .ID {
case http2.SettingMaxConcurrentStreams:
= new(uint32)
* = .Val
case http2.SettingMaxHeaderListSize:
= append(, func() {
.maxSendHeaderListSize = new(uint32)
*.maxSendHeaderListSize = .Val
})
default:
= append(, )
}
return nil
})
if && == nil {
= new(uint32)
* = math.MaxUint32
}
:= &incomingSettings{
ss: ,
}
if != nil {
:= func() {
:= int64(*) - int64(.maxConcurrentStreams)
.maxConcurrentStreams = *
.streamQuota +=
if > 0 && .waitingStreams > 0 {
close(.streamsQuotaAvailable)
.streamsQuotaAvailable = make(chan struct{}, 1)
}
}
= append(, )
}
.controlBuf.executeAndPut(func() bool {
for , := range {
()
}
return true
}, )
}
func ( *http2Client) ( *http2.PingFrame) {
if .IsAck() {
if .bdpEst != nil {
.bdpEst.calculate(.Data)
}
return
}
:= &ping{ack: true}
copy(.data[:], .Data[:])
.controlBuf.put()
}
func ( *http2Client) ( *http2.GoAwayFrame) error {
.mu.Lock()
if .state == closing {
.mu.Unlock()
return nil
}
if .ErrCode == http2.ErrCodeEnhanceYourCalm && string(.DebugData()) == "too_many_pings" {
logger.Errorf("Client received GoAway with error code ENHANCE_YOUR_CALM and debug data equal to ASCII \"too_many_pings\".")
}
:= .LastStreamID
if > 0 && %2 == 0 {
.mu.Unlock()
return connectionErrorf(true, nil, "received goaway with non-zero even-numbered stream id: %v", )
}
select {
case <-.goAway:
if > .prevGoAwayID {
.mu.Unlock()
return connectionErrorf(true, nil, "received goaway with stream id: %v, which exceeds stream id of previous goaway: %v", , .prevGoAwayID)
}
default:
.setGoAwayReason()
close(.goAway)
defer .controlBuf.put(&incomingGoAway{})
if .state != draining {
.onClose(.goAwayReason)
.state = draining
}
}
:= .prevGoAwayID
if == 0 {
= math.MaxUint32
}
.prevGoAwayID =
if len(.activeStreams) == 0 {
.mu.Unlock()
return connectionErrorf(true, nil, "received goaway and there are no active streams")
}
:= make([]*ClientStream, 0)
for , := range .activeStreams {
if > && <= {
.unprocessed.Store(true)
= append(, )
}
}
.mu.Unlock()
for , := range {
.closeStream(, errStreamDrain, false, http2.ErrCodeNo, statusGoAway, nil, false)
}
return nil
}
func ( *http2Client) ( *http2.GoAwayFrame) {
.goAwayReason = GoAwayNoReason
if .ErrCode == http2.ErrCodeEnhanceYourCalm {
if string(.DebugData()) == "too_many_pings" {
.goAwayReason = GoAwayTooManyPings
}
}
if len(.DebugData()) == 0 {
.goAwayDebugMessage = fmt.Sprintf("code: %s", .ErrCode)
} else {
.goAwayDebugMessage = fmt.Sprintf("code: %s, debug data: %q", .ErrCode, string(.DebugData()))
}
}
func ( *http2Client) () (GoAwayReason, string) {
.mu.Lock()
defer .mu.Unlock()
return .goAwayReason, .goAwayDebugMessage
}
func ( *http2Client) ( *http2.WindowUpdateFrame) {
.controlBuf.put(&incomingWindowUpdate{
streamID: .Header().StreamID,
increment: .Increment,
})
}
func ( *http2Client) ( *http2.MetaHeadersFrame) {
:= .getStream()
if == nil {
return
}
:= .StreamEnded()
.bytesReceived.Store(true)
:= atomic.LoadUint32(&.headerChanClosed) == 0
if ! && ! {
:= status.New(codes.Internal, "a HEADERS frame cannot appear in the middle of a stream")
.closeStream(, .Err(), true, http2.ErrCodeProtocol, , nil, false)
return
}
if .Truncated {
:= status.New(codes.Internal, "peer header list size exceeded limit")
.closeStream(, .Err(), true, http2.ErrCodeFrameSize, , nil, )
return
}
var (
= !
= make(map[string][]string)
= "malformed header: missing HTTP content-type"
string
string
*int
string
= codes.Unknown
string
)
if {
= "malformed header: missing HTTP status"
}
for , := range .Fields {
switch .Name {
case "content-type":
if , := grpcutil.ContentSubtype(.Value); ! {
= fmt.Sprintf("transport: received unexpected content-type %q", .Value)
break
}
= ""
[.Name] = append([.Name], .Value)
= true
case "grpc-encoding":
= .Value
case "grpc-status":
, := strconv.ParseInt(.Value, 10, 32)
if != nil {
:= status.New(codes.Internal, fmt.Sprintf("transport: malformed grpc-status: %v", ))
.closeStream(, .Err(), true, http2.ErrCodeProtocol, , nil, )
return
}
= codes.Code(uint32())
case "grpc-message":
= decodeGrpcMessage(.Value)
case ":status":
if .Value == "200" {
= ""
:= 200
= &
break
}
, := strconv.ParseInt(.Value, 10, 32)
if != nil {
:= status.New(codes.Internal, fmt.Sprintf("transport: malformed http-status: %v", ))
.closeStream(, .Err(), true, http2.ErrCodeProtocol, , nil, )
return
}
:= int()
= &
= fmt.Sprintf(
"unexpected HTTP status code received from server: %d (%s)",
,
http.StatusText(),
)
default:
if isReservedHeader(.Name) && !isWhitelistedHeader(.Name) {
break
}
, := decodeMetadataHeader(.Name, .Value)
if != nil {
= fmt.Sprintf("transport: malformed %s: %v", .Name, )
logger.Warningf("Failed to decode metadata header (%q, %q): %v", .Name, .Value, )
break
}
[.Name] = append([.Name], )
}
}
if ! || != "" {
var = codes.Internal
if != nil {
var bool
, = HTTPStatusConvTab[*]
if ! {
= codes.Unknown
}
}
var []string
if != "" {
= append(, )
}
if != "" {
= append(, )
}
:= status.New(, strings.Join(, "; "))
.closeStream(, .Err(), true, http2.ErrCodeProtocol, , nil, )
return
}
if != "" {
:= status.New(codes.Internal, )
.closeStream(, .Err(), true, http2.ErrCodeProtocol, , nil, )
return
}
if ! {
if atomic.CompareAndSwapUint32(&.headerChanClosed, 0, 1) {
.headerValid = true
.recvCompress =
if len() > 0 {
.header =
}
close(.headerChan)
}
}
for , := range .statsHandlers {
if ! {
:= &stats.InHeader{
Client: true,
WireLength: int(.Header().Length),
Header: metadata.MD().Copy(),
Compression: .recvCompress,
}
.HandleRPC(.ctx, )
} else {
:= &stats.InTrailer{
Client: true,
WireLength: int(.Header().Length),
Trailer: metadata.MD().Copy(),
}
.HandleRPC(.ctx, )
}
}
if ! {
return
}
:= istatus.NewWithProto(, , [grpcStatusDetailsBinHeader])
:= .getState() == streamActive
.closeStream(, io.EOF, , http2.ErrCodeNo, , , true)
}
func ( *http2Client) () error {
, := .framer.fr.ReadFrame()
if != nil {
return connectionErrorf(true, , "error reading server preface: %v", )
}
, := .(*http2.SettingsFrame)
if ! {
return connectionErrorf(true, nil, "initial http2 frame from server is not a settings frame: %T", )
}
.handleSettings(, true)
return nil
}
func ( *http2Client) ( chan<- error) {
var error
defer func() {
close(.readerDone)
if != nil {
.Close()
}
}()
if := .readServerPreface(); != nil {
<-
return
}
close()
if .keepaliveEnabled {
atomic.StoreInt64(&.lastRead, time.Now().UnixNano())
}
for {
.controlBuf.throttle()
, := .framer.fr.ReadFrame()
if .keepaliveEnabled {
atomic.StoreInt64(&.lastRead, time.Now().UnixNano())
}
if != nil {
if , := .(http2.StreamError); {
.mu.Lock()
:= .activeStreams[.StreamID]
.mu.Unlock()
if != nil {
:= http2ErrConvTab[.Code]
:= .framer.fr.ErrorDetail()
var string
if != nil {
= .Error()
} else {
= "received invalid frame"
}
.closeStream(, status.Error(, ), true, http2.ErrCodeProtocol, status.New(, ), nil, false)
}
continue
}
= connectionErrorf(true, , "error reading from server: %v", )
return
}
switch frame := .(type) {
case *http2.MetaHeadersFrame:
.operateHeaders()
case *http2.DataFrame:
.handleData()
case *http2.RSTStreamFrame:
.handleRSTStream()
case *http2.SettingsFrame:
.handleSettings(, false)
case *http2.PingFrame:
.handlePing()
case *http2.GoAwayFrame:
= .handleGoAway()
case *http2.WindowUpdateFrame:
.handleWindowUpdate()
default:
if logger.V(logLevel) {
logger.Errorf("transport: http2Client.reader got unhandled frame type %v.", )
}
}
}
}
func ( *http2Client) () {
var error
defer func() {
close(.keepaliveDone)
if != nil {
.Close()
}
}()
:= &ping{data: [8]byte{}}
:= false
:= time.Duration(0)
:= time.Now().UnixNano()
:= time.NewTimer(.kp.Time)
for {
select {
case <-.C:
:= atomic.LoadInt64(&.lastRead)
if > {
= false
.Reset(time.Duration() + .kp.Time - time.Duration(time.Now().UnixNano()))
=
continue
}
if && <= 0 {
= connectionErrorf(true, nil, "keepalive ping failed to receive ACK within timeout")
return
}
.mu.Lock()
if .state == closing {
.mu.Unlock()
return
}
if len(.activeStreams) < 1 && !.kp.PermitWithoutStream {
= false
.kpDormant = true
.kpDormancyCond.Wait()
}
.kpDormant = false
.mu.Unlock()
if ! {
if channelz.IsOn() {
.channelz.SocketMetrics.KeepAlivesSent.Add(1)
}
.controlBuf.put()
= .kp.Timeout
= true
}
:= min(.kp.Time, )
-=
.Reset()
case <-.ctx.Done():
if !.Stop() {
<-.C
}
return
}
}
}
func ( *http2Client) () <-chan struct{} {
return .ctx.Done()
}
func ( *http2Client) () <-chan struct{} {
return .goAway
}
func ( *http2Client) () *channelz.EphemeralSocketMetrics {
return &channelz.EphemeralSocketMetrics{
LocalFlowControlWindow: int64(.fc.getSize()),
RemoteFlowControlWindow: .getOutFlowWindow(),
}
}
func ( *http2Client) () net.Addr { return .remoteAddr }
func ( *http2Client) () {
if channelz.IsOn() {
.channelz.SocketMetrics.MessagesSent.Add(1)
.channelz.SocketMetrics.LastMessageSentTimestamp.Store(time.Now().UnixNano())
}
}
func ( *http2Client) () {
if channelz.IsOn() {
.channelz.SocketMetrics.MessagesReceived.Add(1)
.channelz.SocketMetrics.LastMessageReceivedTimestamp.Store(time.Now().UnixNano())
}
}
func ( *http2Client) () int64 {
:= make(chan uint32, 1)
:= time.NewTimer(time.Second)
defer .Stop()
.controlBuf.put(&outFlowControlSizeRequest{})
select {
case := <-:
return int64()
case <-.ctxDone:
return -1
case <-.C:
return -2
}
}
func ( *http2Client) () transportState {
.mu.Lock()
defer .mu.Unlock()
return .state
}