/*
 *
 * Copyright 2014 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

package transport

import (
	"context"
	"fmt"
	"io"
	"math"
	"net"
	"net/http"
	"path/filepath"
	"strconv"
	"strings"
	"sync"
	"sync/atomic"
	"time"

	"golang.org/x/net/http2"
	"golang.org/x/net/http2/hpack"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/credentials"
	"google.golang.org/grpc/internal/channelz"
	icredentials "google.golang.org/grpc/internal/credentials"
	"google.golang.org/grpc/internal/grpcsync"
	"google.golang.org/grpc/internal/grpcutil"
	imetadata "google.golang.org/grpc/internal/metadata"
	istatus "google.golang.org/grpc/internal/status"
	"google.golang.org/grpc/internal/syscall"
	"google.golang.org/grpc/internal/transport/networktype"
	"google.golang.org/grpc/keepalive"
	"google.golang.org/grpc/metadata"
	"google.golang.org/grpc/peer"
	"google.golang.org/grpc/resolver"
	"google.golang.org/grpc/stats"
	"google.golang.org/grpc/status"
)

// clientConnectionCounter counts the number of connections a client has
// initiated (equal to the number of http2Clients created). Must be accessed
// atomically.
var clientConnectionCounter uint64

// http2Client implements the ClientTransport interface with HTTP2.
type http2Client struct {
	lastRead  int64 // Keep this field 64-bit aligned. Accessed atomically.
	ctx       context.Context
	cancel    context.CancelFunc
	ctxDone   <-chan struct{} // Cache the ctx.Done() chan.
	userAgent string
	// address contains the resolver returned address for this transport.
	// If the `ServerName` field is set, it takes precedence over `CallHdr.Host`
	// passed to `NewStream`, when determining the :authority header.
	address    resolver.Address
	md         metadata.MD
	conn       net.Conn // underlying communication channel
	loopy      *loopyWriter
	remoteAddr net.Addr
	localAddr  net.Addr
	authInfo   credentials.AuthInfo // auth info about the connection

	readerDone chan struct{} // sync point to enable testing.
	writerDone chan struct{} // sync point to enable testing.
	// goAway is closed to notify the upper layer (i.e., addrConn.transportMonitor)
	// that the server sent GoAway on this transport.
	goAway chan struct{}

	framer *framer
	// controlBuf delivers all the control related tasks (e.g., window
	// updates, reset streams, and various settings) to the controller.
	// Do not access controlBuf with mu held.
	controlBuf *controlBuffer
	fc         *trInFlow
	// The scheme used: https if TLS is on, http otherwise.
	scheme string

	isSecure bool

	perRPCCreds []credentials.PerRPCCredentials

	kp               keepalive.ClientParameters
	keepaliveEnabled bool

	statsHandlers []stats.Handler

	initialWindowSize int32

	// configured by peer through SETTINGS_MAX_HEADER_LIST_SIZE
	maxSendHeaderListSize *uint32

	bdpEst *bdpEstimator

	maxConcurrentStreams  uint32
	streamQuota           int64
	streamsQuotaAvailable chan struct{}
	waitingStreams        uint32
	nextID                uint32
	registeredCompressors string

	// Do not access controlBuf with mu held.
	mu            sync.Mutex // guard the following variables
	state         transportState
	activeStreams map[uint32]*Stream
	// prevGoAwayID records the Last-Stream-ID in the previous GoAway frame.
	prevGoAwayID uint32
	// goAwayReason records the http2.ErrCode and debug data received with the
	// GoAway frame.
	goAwayReason GoAwayReason
	// goAwayDebugMessage contains a detailed human-readable string about a
	// GoAway frame, useful for error messages.
	goAwayDebugMessage string
	// A condition variable used to signal when the keepalive goroutine should
	// go dormant. The condition for dormancy is based on the number of active
	// streams and the `PermitWithoutStream` keepalive client parameter. Since
	// the number of active streams is guarded by the mutex above, we use the
	// same mutex for this condition variable as well.
	kpDormancyCond *sync.Cond
	// A boolean to track whether the keepalive goroutine is dormant or not.
	// This is checked before attempting to signal the above condition
	// variable.
	kpDormant bool

	// Fields below are for channelz metric collection.
	channelzID *channelz.Identifier
	czData     *channelzData

	onClose func(GoAwayReason)

	bufferPool *bufferPool

	connectionID uint64
}

func dial(ctx context.Context, fn func(context.Context, string) (net.Conn, error), addr resolver.Address, useProxy bool, grpcUA string) (net.Conn, error) {
	address := addr.Addr
	networkType, ok := networktype.Get(addr)
	if fn != nil {
		// Special handling for unix scheme with custom dialer. Back in the day,
		// we did not have a unix resolver and therefore targets with a unix
		// scheme would end up using the passthrough resolver. So, users relied
		// on a custom dialer in this case and expected the original dial target
		// to be passed to the custom dialer. Now, we have a unix resolver. But
		// if a custom dialer is specified, we want to retain the old behavior
		// in terms of the address being passed to the custom dialer.
		if networkType == "unix" && !strings.HasPrefix(address, "\x00") {
			// Supported unix targets are either "unix://absolute-path" or
			// "unix:relative-path".
			if filepath.IsAbs(address) {
				return fn(ctx, "unix://"+address)
			}
			return fn(ctx, "unix:"+address)
		}
		return fn(ctx, address)
	}
	if !ok {
		networkType, address = parseDialTarget(address)
	}
	if networkType == "tcp" && useProxy {
		return proxyDial(ctx, address, grpcUA)
	}
	return (&net.Dialer{}).DialContext(ctx, networkType, address)
}
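
// For illustration, given a resolver.Address with Addr "/tmp/grpc.sock" and
// network type "unix", dial with a custom dialer fn is invoked (a sketch,
// assuming the address came from the unix resolver) as:
//
//	conn, err := fn(ctx, "unix:///tmp/grpc.sock")
//
// which preserves the dial-target format that pre-unix-resolver custom
// dialers were written against.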

func isTemporary(err error) bool {
	switch err := err.(type) {
	case interface {
		Temporary() bool
	}:
		return err.Temporary()
	case interface {
		Timeout() bool
	}:
		// Timeouts may be resolved upon retry, and are thus treated as
		// temporary.
		return err.Timeout()
	}
	return true
}
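
// As a sketch, a hypothetical error type can opt out of being treated as
// temporary (and therefore fail fast under FailOnNonTempDialError) by
// implementing the Temporary method checked above:
//
//	type permanentError struct{ error } // illustrative only, not part of this package
//
//	func (permanentError) Temporary() bool { return false }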

// newHTTP2Client constructs a connected ClientTransport to addr based on HTTP2
// and starts to receive messages on it. A non-nil error is returned if
// construction fails.
func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts ConnectOptions, onClose func(GoAwayReason)) (_ *http2Client, err error) {
	scheme := "http"
	ctx, cancel := context.WithCancel(ctx)
	defer func() {
		if err != nil {
			cancel()
		}
	}()

	// gRPC, resolver, balancer etc. can specify arbitrary data in the
	// Attributes field of resolver.Address, which is shoved into connectCtx
	// and passed to the dialer and credential handshaker. This makes it possible for
	// address specific arbitrary data to reach custom dialers and credential handshakers.
	connectCtx = icredentials.NewClientHandshakeInfoContext(connectCtx, credentials.ClientHandshakeInfo{Attributes: addr.Attributes})

	conn, err := dial(connectCtx, opts.Dialer, addr, opts.UseProxy, opts.UserAgent)
	if err != nil {
		if opts.FailOnNonTempDialError {
			return nil, connectionErrorf(isTemporary(err), err, "transport: error while dialing: %v", err)
		}
		return nil, connectionErrorf(true, err, "transport: Error while dialing: %v", err)
	}

	// Any further errors will close the underlying connection
	defer func(conn net.Conn) {
		if err != nil {
			conn.Close()
		}
	}(conn)

	// The following defer and goroutine monitor the connectCtx for cancelation
	// and deadline.  On context expiration, the connection is hard closed and
	// this function will naturally fail as a result.  Otherwise, the defer
	// waits for the goroutine to exit to prevent the context from being
	// monitored (and to prevent the connection from ever being closed) after
	// returning from this function.
	ctxMonitorDone := grpcsync.NewEvent()
	newClientCtx, newClientDone := context.WithCancel(connectCtx)
	defer func() {
		newClientDone()         // Awaken the goroutine below if connectCtx hasn't expired.
		<-ctxMonitorDone.Done() // Wait for the goroutine below to exit.
	}()
	go func(conn net.Conn) {
		defer ctxMonitorDone.Fire() // Signal this goroutine has exited.
		<-newClientCtx.Done()       // Block until connectCtx expires or the defer above executes.
		if err := connectCtx.Err(); err != nil {
			// connectCtx expired before exiting the function.  Hard close the connection.
			if logger.V(logLevel) {
				logger.Infof("newClientTransport: aborting due to connectCtx: %v", err)
			}
			conn.Close()
		}
	}(conn)

	kp := opts.KeepaliveParams
	// Validate keepalive parameters.
	if kp.Time == 0 {
		kp.Time = defaultClientKeepaliveTime
	}
	if kp.Timeout == 0 {
		kp.Timeout = defaultClientKeepaliveTimeout
	}
	keepaliveEnabled := false
	if kp.Time != infinity {
		if err = syscall.SetTCPUserTimeout(conn, kp.Timeout); err != nil {
			return nil, connectionErrorf(false, err, "transport: failed to set TCP_USER_TIMEOUT: %v", err)
		}
		keepaliveEnabled = true
	}
	var (
		isSecure bool
		authInfo credentials.AuthInfo
	)
	transportCreds := opts.TransportCredentials
	perRPCCreds := opts.PerRPCCredentials

	if b := opts.CredsBundle; b != nil {
		if t := b.TransportCredentials(); t != nil {
			transportCreds = t
		}
		if t := b.PerRPCCredentials(); t != nil {
			perRPCCreds = append(perRPCCreds, t)
		}
	}
	if transportCreds != nil {
		conn, authInfo, err = transportCreds.ClientHandshake(connectCtx, addr.ServerName, conn)
		if err != nil {
			return nil, connectionErrorf(isTemporary(err), err, "transport: authentication handshake failed: %v", err)
		}
		for _, cd := range perRPCCreds {
			if cd.RequireTransportSecurity() {
				if ci, ok := authInfo.(interface {
					GetCommonAuthInfo() credentials.CommonAuthInfo
				}); ok {
					secLevel := ci.GetCommonAuthInfo().SecurityLevel
					if secLevel != credentials.InvalidSecurityLevel && secLevel < credentials.PrivacyAndIntegrity {
						return nil, connectionErrorf(true, nil, "transport: cannot send secure credentials on an insecure connection")
					}
				}
			}
		}
		isSecure = true
		if transportCreds.Info().SecurityProtocol == "tls" {
			scheme = "https"
		}
	}
	dynamicWindow := true
	icwz := int32(initialWindowSize)
	if opts.InitialConnWindowSize >= defaultWindowSize {
		icwz = opts.InitialConnWindowSize
		dynamicWindow = false
	}
	writeBufSize := opts.WriteBufferSize
	readBufSize := opts.ReadBufferSize
	maxHeaderListSize := defaultClientMaxHeaderListSize
	if opts.MaxHeaderListSize != nil {
		maxHeaderListSize = *opts.MaxHeaderListSize
	}
	t := &http2Client{
		ctx:                   ctx,
		ctxDone:               ctx.Done(), // Cache Done chan.
		cancel:                cancel,
		userAgent:             opts.UserAgent,
		registeredCompressors: grpcutil.RegisteredCompressors(),
		address:               addr,
		conn:                  conn,
		remoteAddr:            conn.RemoteAddr(),
		localAddr:             conn.LocalAddr(),
		authInfo:              authInfo,
		readerDone:            make(chan struct{}),
		writerDone:            make(chan struct{}),
		goAway:                make(chan struct{}),
		framer:                newFramer(conn, writeBufSize, readBufSize, maxHeaderListSize),
		fc:                    &trInFlow{limit: uint32(icwz)},
		scheme:                scheme,
		activeStreams:         make(map[uint32]*Stream),
		isSecure:              isSecure,
		perRPCCreds:           perRPCCreds,
		kp:                    kp,
		statsHandlers:         opts.StatsHandlers,
		initialWindowSize:     initialWindowSize,
		nextID:                1,
		maxConcurrentStreams:  defaultMaxStreamsClient,
		streamQuota:           defaultMaxStreamsClient,
		streamsQuotaAvailable: make(chan struct{}, 1),
		czData:                new(channelzData),
		keepaliveEnabled:      keepaliveEnabled,
		bufferPool:            newBufferPool(),
		onClose:               onClose,
	}
	// Add peer information to the http2client context.
	t.ctx = peer.NewContext(t.ctx, t.getPeer())

	if md, ok := addr.Metadata.(*metadata.MD); ok {
		t.md = *md
	} else if md := imetadata.Get(addr); md != nil {
		t.md = md
	}
	t.controlBuf = newControlBuffer(t.ctxDone)
	if opts.InitialWindowSize >= defaultWindowSize {
		t.initialWindowSize = opts.InitialWindowSize
		dynamicWindow = false
	}
	if dynamicWindow {
		t.bdpEst = &bdpEstimator{
			bdp:               initialWindowSize,
			updateFlowControl: t.updateFlowControl,
		}
	}
	for _, sh := range t.statsHandlers {
		t.ctx = sh.TagConn(t.ctx, &stats.ConnTagInfo{
			RemoteAddr: t.remoteAddr,
			LocalAddr:  t.localAddr,
		})
		connBegin := &stats.ConnBegin{
			Client: true,
		}
		sh.HandleConn(t.ctx, connBegin)
	}
	t.channelzID, err = channelz.RegisterNormalSocket(t, opts.ChannelzParentID, fmt.Sprintf("%s -> %s", t.localAddr, t.remoteAddr))
	if err != nil {
		return nil, err
	}
	if t.keepaliveEnabled {
		t.kpDormancyCond = sync.NewCond(&t.mu)
		go t.keepalive()
	}

	// Start the reader goroutine for incoming messages. Each transport has a
	// dedicated goroutine which reads HTTP2 frames from the network. Then it
	// dispatches the frame to the corresponding stream entity.  When the
	// server preface is received, readerErrCh is closed.  If an error occurs
	// first, an error is pushed to the channel.  This must be checked before
	// returning from this function.
	readerErrCh := make(chan error, 1)
	go t.reader(readerErrCh)
	defer func() {
		if err == nil {
			err = <-readerErrCh
		}
		if err != nil {
			t.Close(err)
		}
	}()

	// Send connection preface to server.
	n, err := t.conn.Write(clientPreface)
	if err != nil {
		err = connectionErrorf(true, err, "transport: failed to write client preface: %v", err)
		return nil, err
	}
	if n != len(clientPreface) {
		err = connectionErrorf(true, nil, "transport: preface mismatch, wrote %d bytes; want %d", n, len(clientPreface))
		return nil, err
	}
	var ss []http2.Setting

	if t.initialWindowSize != defaultWindowSize {
		ss = append(ss, http2.Setting{
			ID:  http2.SettingInitialWindowSize,
			Val: uint32(t.initialWindowSize),
		})
	}
	if opts.MaxHeaderListSize != nil {
		ss = append(ss, http2.Setting{
			ID:  http2.SettingMaxHeaderListSize,
			Val: *opts.MaxHeaderListSize,
		})
	}
	err = t.framer.fr.WriteSettings(ss...)
	if err != nil {
		err = connectionErrorf(true, err, "transport: failed to write initial settings frame: %v", err)
		return nil, err
	}
	// Adjust the connection flow control window if needed.
	if delta := uint32(icwz - defaultWindowSize); delta > 0 {
		if err := t.framer.fr.WriteWindowUpdate(0, delta); err != nil {
			err = connectionErrorf(true, err, "transport: failed to write window update: %v", err)
			return nil, err
		}
	}

	t.connectionID = atomic.AddUint64(&clientConnectionCounter, 1)

	if err := t.framer.writer.Flush(); err != nil {
		return nil, err
	}
	go func() {
		t.loopy = newLoopyWriter(clientSide, t.framer, t.controlBuf, t.bdpEst)
		err := t.loopy.run()
		if logger.V(logLevel) {
			logger.Infof("transport: loopyWriter exited. Closing connection. Err: %v", err)
		}
		// Do not close the transport.  Let reader goroutine handle it since
		// there might be data in the buffers.
		t.conn.Close()
		t.controlBuf.finish()
		close(t.writerDone)
	}()
	return t, nil
}

func (t *http2Client) newStream(ctx context.Context, callHdr *CallHdr) *Stream {
	// TODO(zhaoq): Handle uint32 overflow of Stream.id.
	s := &Stream{
		ct:             t,
		done:           make(chan struct{}),
		method:         callHdr.Method,
		sendCompress:   callHdr.SendCompress,
		buf:            newRecvBuffer(),
		headerChan:     make(chan struct{}),
		contentSubtype: callHdr.ContentSubtype,
		doneFunc:       callHdr.DoneFunc,
	}
	s.wq = newWriteQuota(defaultWriteQuota, s.done)
	s.requestRead = func(n int) {
		t.adjustWindow(s, uint32(n))
	}
	// The client side stream context should have exactly the same life cycle
	// as the user-provided context. That means s.ctx should be read-only, and
	// s.ctx is done iff ctx is done. So we use the original context here
	// instead of creating a copy.
	s.ctx = ctx
	s.trReader = &transportReader{
		reader: &recvBufferReader{
			ctx:     s.ctx,
			ctxDone: s.ctx.Done(),
			recv:    s.buf,
			closeStream: func(err error) {
				t.CloseStream(s, err)
			},
			freeBuffer: t.bufferPool.put,
		},
		windowHandler: func(n int) {
			t.updateWindow(s, uint32(n))
		},
	}
	return s
}

func (t *http2Client) getPeer() *peer.Peer {
	return &peer.Peer{
		Addr:     t.remoteAddr,
		AuthInfo: t.authInfo, // Can be nil
	}
}

func (t *http2Client) createHeaderFields(ctx context.Context, callHdr *CallHdr) ([]hpack.HeaderField, error) {
	aud := t.createAudience(callHdr)
	ri := credentials.RequestInfo{
		Method:   callHdr.Method,
		AuthInfo: t.authInfo,
	}
	ctxWithRequestInfo := icredentials.NewRequestInfoContext(ctx, ri)
	authData, err := t.getTrAuthData(ctxWithRequestInfo, aud)
	if err != nil {
		return nil, err
	}
	callAuthData, err := t.getCallAuthData(ctxWithRequestInfo, aud, callHdr)
	if err != nil {
		return nil, err
	}
	// TODO(mmukhi): Benchmark whether performance improves if we count the
	// metadata and other header fields first and create a slice of that exact size.
	// Make the slice of certain predictable size to reduce allocations made by append.
	hfLen := 7 // :method, :scheme, :path, :authority, content-type, user-agent, te
	hfLen += len(authData) + len(callAuthData)
	headerFields := make([]hpack.HeaderField, 0, hfLen)
	headerFields = append(headerFields, hpack.HeaderField{Name: ":method", Value: "POST"})
	headerFields = append(headerFields, hpack.HeaderField{Name: ":scheme", Value: t.scheme})
	headerFields = append(headerFields, hpack.HeaderField{Name: ":path", Value: callHdr.Method})
	headerFields = append(headerFields, hpack.HeaderField{Name: ":authority", Value: callHdr.Host})
	headerFields = append(headerFields, hpack.HeaderField{Name: "content-type", Value: grpcutil.ContentType(callHdr.ContentSubtype)})
	headerFields = append(headerFields, hpack.HeaderField{Name: "user-agent", Value: t.userAgent})
	headerFields = append(headerFields, hpack.HeaderField{Name: "te", Value: "trailers"})
	if callHdr.PreviousAttempts > 0 {
		headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-previous-rpc-attempts", Value: strconv.Itoa(callHdr.PreviousAttempts)})
	}

	registeredCompressors := t.registeredCompressors
	if callHdr.SendCompress != "" {
		headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-encoding", Value: callHdr.SendCompress})
		// Include the outgoing compressor name when compressor is not registered
		// via encoding.RegisterCompressor. This is possible when client uses
		// WithCompressor dial option.
		if !grpcutil.IsCompressorNameRegistered(callHdr.SendCompress) {
			if registeredCompressors != "" {
				registeredCompressors += ","
			}
			registeredCompressors += callHdr.SendCompress
		}
	}

	if registeredCompressors != "" {
		headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-accept-encoding", Value: registeredCompressors})
	}
	if dl, ok := ctx.Deadline(); ok {
		// Send out timeout regardless of its value. The server can detect timeout context by itself.
		// TODO(mmukhi): Perhaps this field should be updated when actually writing out to the wire.
		timeout := time.Until(dl)
		headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-timeout", Value: grpcutil.EncodeDuration(timeout)})
	}
	for k, v := range authData {
		headerFields = append(headerFields, hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)})
	}
	for k, v := range callAuthData {
		headerFields = append(headerFields, hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)})
	}
	if b := stats.OutgoingTags(ctx); b != nil {
		headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-tags-bin", Value: encodeBinHeader(b)})
	}
	if b := stats.OutgoingTrace(ctx); b != nil {
		headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-trace-bin", Value: encodeBinHeader(b)})
	}

	if md, added, ok := metadata.FromOutgoingContextRaw(ctx); ok {
		var k string
		for k, vv := range md {
			// HTTP doesn't allow you to set pseudoheaders after non-pseudoheaders have been set.
			if isReservedHeader(k) {
				continue
			}
			for _, v := range vv {
				headerFields = append(headerFields, hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)})
			}
		}
		for _, vv := range added {
			for i, v := range vv {
				if i%2 == 0 {
					k = strings.ToLower(v)
					continue
				}
				// HTTP doesn't allow you to set pseudoheaders after non-pseudoheaders have been set.
				if isReservedHeader(k) {
					continue
				}
				headerFields = append(headerFields, hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)})
			}
		}
	}
	for k, vv := range t.md {
		if isReservedHeader(k) {
			continue
		}
		for _, v := range vv {
			headerFields = append(headerFields, hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)})
		}
	}
	return headerFields, nil
}

func (t *http2Client) createAudience(callHdr *CallHdr) string {
	// Create an audience string only if needed.
	if len(t.perRPCCreds) == 0 && callHdr.Creds == nil {
		return ""
	}
	// Construct URI required to get auth request metadata.
	// Omit port if it is the default one.
	host := strings.TrimSuffix(callHdr.Host, ":443")
	pos := strings.LastIndex(callHdr.Method, "/")
	if pos == -1 {
		pos = len(callHdr.Method)
	}
	return "https://" + host + callHdr.Method[:pos]
}
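
// For example, a CallHdr with Host "foo.example.com:443" and Method
// "/pkg.Service/Method" produces the audience
// "https://foo.example.com/pkg.Service".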

func (t *http2Client) getTrAuthData(ctx context.Context, audience string) (map[string]string, error) {
	if len(t.perRPCCreds) == 0 {
		return nil, nil
	}
	authData := map[string]string{}
	for _, c := range t.perRPCCreds {
		data, err := c.GetRequestMetadata(ctx, audience)
		if err != nil {
			if st, ok := status.FromError(err); ok {
				// Restrict the code to the list allowed by gRFC A54.
				if istatus.IsRestrictedControlPlaneCode(st) {
					err = status.Errorf(codes.Internal, "transport: received per-RPC creds error with illegal status: %v", err)
				}
				return nil, err
			}

			return nil, status.Errorf(codes.Unauthenticated, "transport: per-RPC creds failed due to error: %v", err)
		}
		for k, v := range data {
			// Capital header names are illegal in HTTP/2.
			k = strings.ToLower(k)
			authData[k] = v
		}
	}
	return authData, nil
}

func (t *http2Client) getCallAuthData(ctx context.Context, audience string, callHdr *CallHdr) (map[string]string, error) {
	var callAuthData map[string]string
	// Check if credentials.PerRPCCredentials were provided via call options.
	// Note: if these credentials are provided both via dial options and call
	// options, then both sets of credentials will be applied.
	if callCreds := callHdr.Creds; callCreds != nil {
		if callCreds.RequireTransportSecurity() {
			ri, _ := credentials.RequestInfoFromContext(ctx)
			if !t.isSecure || credentials.CheckSecurityLevel(ri.AuthInfo, credentials.PrivacyAndIntegrity) != nil {
				return nil, status.Error(codes.Unauthenticated, "transport: cannot send secure credentials on an insecure connection")
			}
		}
		data, err := callCreds.GetRequestMetadata(ctx, audience)
		if err != nil {
			if st, ok := status.FromError(err); ok {
				// Restrict the code to the list allowed by gRFC A54.
				if istatus.IsRestrictedControlPlaneCode(st) {
					err = status.Errorf(codes.Internal, "transport: received per-RPC creds error with illegal status: %v", err)
				}
				return nil, err
			}
			return nil, status.Errorf(codes.Internal, "transport: per-RPC creds failed due to error: %v", err)
		}
		callAuthData = make(map[string]string, len(data))
		for k, v := range data {
			// Capital header names are illegal in HTTP/2.
			k = strings.ToLower(k)
			callAuthData[k] = v
		}
	}
	return callAuthData, nil
}

// NewStreamError wraps an error and reports additional information.  Typically
// NewStream errors result in transparent retry, as they mean nothing went onto
// the wire.  However, there are two notable exceptions:
//
//  1. If the stream headers violate the max header list size allowed by the
//     server.  It's possible this could succeed on another transport, even if
//     it's unlikely, but do not transparently retry.
//  2. If the credentials errored when requesting their headers.  In this case,
//     it's possible a retry can fix the problem, but indefinitely transparently
//     retrying is not appropriate as it is likely the credentials, if they can
//     eventually succeed, would need I/O to do so.
type NewStreamError struct {
	Err error

	AllowTransparentRetry bool
}

func (e NewStreamError) Error() string {
	return e.Err.Error()
}
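
// A caller deciding on transparent retry might unwrap the error as follows
// (a sketch; retryStream is a hypothetical helper, not part of this package):
//
//	s, err := t.NewStream(ctx, callHdr)
//	if nse, ok := err.(*NewStreamError); ok && nse.AllowTransparentRetry {
//		return retryStream(ctx, callHdr) // nothing reached the wire
//	}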

// NewStream creates a stream and registers it into the transport as "active"
// streams.  All non-nil errors returned will be *NewStreamError.
func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (*Stream, error) {
	ctx = peer.NewContext(ctx, t.getPeer())

	// ServerName field of the resolver returned address takes precedence over
	// Host field of CallHdr to determine the :authority header. This is because
	// the ServerName field takes precedence for server authentication during
	// TLS handshake, and the :authority header should match the value used
	// for server authentication.
	if t.address.ServerName != "" {
		newCallHdr := *callHdr
		newCallHdr.Host = t.address.ServerName
		callHdr = &newCallHdr
	}

	headerFields, err := t.createHeaderFields(ctx, callHdr)
	if err != nil {
		return nil, &NewStreamError{Err: err, AllowTransparentRetry: false}
	}
	s := t.newStream(ctx, callHdr)
	cleanup := func(err error) {
		if s.swapState(streamDone) == streamDone {
			// If it was already done, return.
			return
		}
		// The stream was unprocessed by the server.
		atomic.StoreUint32(&s.unprocessed, 1)
		s.write(recvMsg{err: err})
		close(s.done)
		// If headerChan isn't closed, then close it.
		if atomic.CompareAndSwapUint32(&s.headerChanClosed, 0, 1) {
			close(s.headerChan)
		}
	}
	hdr := &headerFrame{
		hf:        headerFields,
		endStream: false,
		initStream: func(id uint32) error {
			t.mu.Lock()
			// TODO: handle transport closure in loopy instead and remove this
			// initStream is never called when transport is draining.
			if t.state == closing {
				t.mu.Unlock()
				cleanup(ErrConnClosing)
				return ErrConnClosing
			}
			if channelz.IsOn() {
				atomic.AddInt64(&t.czData.streamsStarted, 1)
				atomic.StoreInt64(&t.czData.lastStreamCreatedTime, time.Now().UnixNano())
			}
			// If the keepalive goroutine has gone dormant, wake it up.
			if t.kpDormant {
				t.kpDormancyCond.Signal()
			}
			t.mu.Unlock()
			return nil
		},
		onOrphaned: cleanup,
		wq:         s.wq,
	}
	firstTry := true
	var ch chan struct{}
	transportDrainRequired := false
	checkForStreamQuota := func(it interface{}) bool {
		if t.streamQuota <= 0 { // Can go negative if server decreases it.
			if firstTry {
				t.waitingStreams++
			}
			ch = t.streamsQuotaAvailable
			return false
		}
		if !firstTry {
			t.waitingStreams--
		}
		t.streamQuota--
		h := it.(*headerFrame)
		h.streamID = t.nextID
		t.nextID += 2

		// Drain client transport if nextID > MaxStreamID which signals gRPC that
		// the connection is closed and a new one must be created for subsequent RPCs.
		transportDrainRequired = t.nextID > MaxStreamID

		s.id = h.streamID
		s.fc = &inFlow{limit: uint32(t.initialWindowSize)}
		t.mu.Lock()
		if t.activeStreams == nil { // Can be niled from Close().
			t.mu.Unlock()
			return false // Don't create a stream if the transport is already closed.
		}
		t.activeStreams[s.id] = s
		t.mu.Unlock()
		if t.streamQuota > 0 && t.waitingStreams > 0 {
			select {
			case t.streamsQuotaAvailable <- struct{}{}:
			default:
			}
		}
		return true
	}
	var hdrListSizeErr error
	checkForHeaderListSize := func(it interface{}) bool {
		if t.maxSendHeaderListSize == nil {
			return true
		}
		hdrFrame := it.(*headerFrame)
		var sz int64
		for _, f := range hdrFrame.hf {
			if sz += int64(f.Size()); sz > int64(*t.maxSendHeaderListSize) {
				hdrListSizeErr = status.Errorf(codes.Internal, "header list size to send violates the maximum size (%d bytes) set by server", *t.maxSendHeaderListSize)
				return false
			}
		}
		return true
	}
	for {
		success, err := t.controlBuf.executeAndPut(func(it interface{}) bool {
			return checkForHeaderListSize(it) && checkForStreamQuota(it)
		}, hdr)
		if err != nil {
			// Connection closed.
			return nil, &NewStreamError{Err: err, AllowTransparentRetry: true}
		}
		if success {
			break
		}
		if hdrListSizeErr != nil {
			return nil, &NewStreamError{Err: hdrListSizeErr}
		}
		firstTry = false
		select {
		case <-ch:
		case <-ctx.Done():
			return nil, &NewStreamError{Err: ContextErr(ctx.Err())}
		case <-t.goAway:
			return nil, &NewStreamError{Err: errStreamDrain, AllowTransparentRetry: true}
		case <-t.ctx.Done():
			return nil, &NewStreamError{Err: ErrConnClosing, AllowTransparentRetry: true}
		}
	}
	if len(t.statsHandlers) != 0 {
		header, ok := metadata.FromOutgoingContext(ctx)
		if ok {
			header.Set("user-agent", t.userAgent)
		} else {
			header = metadata.Pairs("user-agent", t.userAgent)
		}
		for _, sh := range t.statsHandlers {
			// Note: The header fields are compressed with hpack after this call returns.
			// No WireLength field is set here.
			// Note: Creating a new stats object to prevent pollution.
			outHeader := &stats.OutHeader{
				Client:      true,
				FullMethod:  callHdr.Method,
				RemoteAddr:  t.remoteAddr,
				LocalAddr:   t.localAddr,
				Compression: callHdr.SendCompress,
				Header:      header,
			}
			sh.HandleRPC(s.ctx, outHeader)
		}
	}
	if transportDrainRequired {
		if logger.V(logLevel) {
			logger.Infof("transport: t.nextID > MaxStreamID. Draining")
		}
		t.GracefulClose()
	}
	return s, nil
}

// CloseStream clears the footprint of a stream when the stream is not needed
// anymore. This must not be executed in the reader's goroutine.
func (t *http2Client) CloseStream(s *Stream, err error) {
	var (
		rst     bool
		rstCode http2.ErrCode
	)
	if err != nil {
		rst = true
		rstCode = http2.ErrCodeCancel
	}
	t.closeStream(s, err, rst, rstCode, status.Convert(err), nil, false)
}

func (t *http2Client) closeStream(s *Stream, err error, rst bool, rstCode http2.ErrCode, st *status.Status, mdata map[string][]string, eosReceived bool) {
	// Set stream status to done.
	if s.swapState(streamDone) == streamDone {
		// If it was already done, return.  If multiple closeStream calls
		// happen simultaneously, wait for the first to finish.
		<-s.done
		return
	}
	// status and trailers can be updated here without any synchronization
	// because the stream goroutine will only read them after it sees an io.EOF
	// error from read or write, and we'll write those errors only after
	// updating this.
	s.status = st
	if len(mdata) > 0 {
		s.trailer = mdata
	}
	if err != nil {
		// This will unblock reads eventually.
		s.write(recvMsg{err: err})
	}
	// If headerChan isn't closed, then close it.
	if atomic.CompareAndSwapUint32(&s.headerChanClosed, 0, 1) {
		s.noHeaders = true
		close(s.headerChan)
	}
	cleanup := &cleanupStream{
		streamID: s.id,
		onWrite: func() {
			t.mu.Lock()
			if t.activeStreams != nil {
				delete(t.activeStreams, s.id)
			}
			t.mu.Unlock()
			if channelz.IsOn() {
				if eosReceived {
					atomic.AddInt64(&t.czData.streamsSucceeded, 1)
				} else {
					atomic.AddInt64(&t.czData.streamsFailed, 1)
				}
			}
		},
		rst:     rst,
		rstCode: rstCode,
	}
	addBackStreamQuota := func(interface{}) bool {
		t.streamQuota++
		if t.streamQuota > 0 && t.waitingStreams > 0 {
			select {
			case t.streamsQuotaAvailable <- struct{}{}:
			default:
			}
		}
		return true
	}
	t.controlBuf.executeAndPut(addBackStreamQuota, cleanup)
	// This will unblock write.
	close(s.done)
	if s.doneFunc != nil {
		s.doneFunc()
	}
}

// Close kicks off the shutdown process of the transport. This should be called
// only once on a transport. Once it is called, the transport should not be
// accessed any more.
func (t *http2Client) Close(err error) {
	t.mu.Lock()
	// Make sure we only close once.
	if t.state == closing {
		t.mu.Unlock()
		return
	}
	if logger.V(logLevel) {
		logger.Infof("transport: closing: %v", err)
	}
	// Call t.onClose ASAP to prevent the client from attempting to create new
	// streams.
	if t.state != draining {
		t.onClose(GoAwayInvalid)
	}
	t.state = closing
	streams := t.activeStreams
	t.activeStreams = nil
	if t.kpDormant {
		// If the keepalive goroutine is blocked on this condition variable, we
		// should unblock it so that the goroutine eventually exits.
		t.kpDormancyCond.Signal()
	}
	t.mu.Unlock()
	t.controlBuf.finish()
	t.cancel()
	t.conn.Close()
	channelz.RemoveEntry(t.channelzID)
	// Append info about previous goaways if there were any, since this may be important
	// for understanding the root cause for this connection to be closed.
	_, goAwayDebugMessage := t.GetGoAwayReason()

	var st *status.Status
	if len(goAwayDebugMessage) > 0 {
		st = status.Newf(codes.Unavailable, "closing transport due to: %v, received prior goaway: %v", err, goAwayDebugMessage)
		err = st.Err()
	} else {
		st = status.New(codes.Unavailable, err.Error())
	}

	// Notify all active streams.
	for _, s := range streams {
		t.closeStream(s, err, false, http2.ErrCodeNo, st, nil, false)
	}
	for _, sh := range t.statsHandlers {
		connEnd := &stats.ConnEnd{
			Client: true,
		}
		sh.HandleConn(t.ctx, connEnd)
	}
}

// GracefulClose sets the state to draining, which prevents new streams from
// being created and causes the transport to be closed when the last active
// stream is closed.  If there are no active streams, the transport is closed
// immediately.  This does nothing if the transport is already draining or
// closing.
func (t *http2Client) GracefulClose() {
	t.mu.Lock()
	// Make sure we move to draining only from active.
	if t.state == draining || t.state == closing {
		t.mu.Unlock()
		return
	}
	if logger.V(logLevel) {
		logger.Infof("transport: GracefulClose called")
	}
	t.onClose(GoAwayInvalid)
	t.state = draining
	active := len(t.activeStreams)
	t.mu.Unlock()
	if active == 0 {
		t.Close(connectionErrorf(true, nil, "no active streams left to process while draining"))
		return
	}
	t.controlBuf.put(&incomingGoAway{})
}

// Write formats the data into HTTP2 data frame(s) and sends it out. The caller
// should proceed only if Write returns nil.
func (t *http2Client) Write(s *Stream, hdr []byte, data []byte, opts *Options) error {
	if opts.Last {
		// If it's the last message, update stream state.
		if !s.compareAndSwapState(streamActive, streamWriteDone) {
			return errStreamDone
		}
	} else if s.getState() != streamActive {
		return errStreamDone
	}
	df := &dataFrame{
		streamID:  s.id,
		endStream: opts.Last,
		h:         hdr,
		d:         data,
	}
	if hdr != nil || data != nil { // If it's not an empty data frame, check quota.
		if err := s.wq.get(int32(len(hdr) + len(data))); err != nil {
			return err
		}
	}
	return t.controlBuf.put(df)
}

func (t *http2Client) getStream(f http2.Frame) *Stream {
	t.mu.Lock()
	s := t.activeStreams[f.Header().StreamID]
	t.mu.Unlock()
	return s
}

// adjustWindow sends out an extra window update over the initial window size
// of the stream if the application is requesting more data than the window
// holds.
func (t *http2Client) adjustWindow(s *Stream, n uint32) {
	if w := s.fc.maybeAdjust(n); w > 0 {
		t.controlBuf.put(&outgoingWindowUpdate{streamID: s.id, increment: w})
	}
}

// updateWindow adjusts the inbound quota for the stream.
// Window updates will be sent out when the cumulative quota
// exceeds the corresponding threshold.
func (t *http2Client) updateWindow(s *Stream, n uint32) {
	if w := s.fc.onRead(n); w > 0 {
		t.controlBuf.put(&outgoingWindowUpdate{streamID: s.id, increment: w})
	}
}

// updateFlowControl updates the incoming flow control windows
// for the transport and the stream based on the current bdp
// estimation.
func (t *http2Client) updateFlowControl(n uint32) {
	updateIWS := func(interface{}) bool {
		t.initialWindowSize = int32(n)
		t.mu.Lock()
		for _, s := range t.activeStreams {
			s.fc.newLimit(n)
		}
		t.mu.Unlock()
		return true
	}
	t.controlBuf.executeAndPut(updateIWS, &outgoingWindowUpdate{streamID: 0, increment: t.fc.newLimit(n)})
	t.controlBuf.put(&outgoingSettings{
		ss: []http2.Setting{
			{
				ID:  http2.SettingInitialWindowSize,
				Val: n,
			},
		},
	})
}

func (t *http2Client) handleData(f *http2.DataFrame) {
	size := f.Header().Length
	var sendBDPPing bool
	if t.bdpEst != nil {
		sendBDPPing = t.bdpEst.add(size)
	}
	// Decouple connection's flow control from application's read.
	// An update on connection's flow control should not depend on
	// whether the user application has read the data or not. Such a
	// restriction is already imposed on the stream's flow control,
	// and therefore the sender will be blocked anyways.
	// Decoupling the connection flow control will prevent other
	// active (fast) streams from starving in presence of slow or
	// inactive streams.
	if w := t.fc.onData(size); w > 0 {
		t.controlBuf.put(&outgoingWindowUpdate{
			streamID:  0,
			increment: w,
		})
	}
	if sendBDPPing {
		// Avoid excessive ping detection (e.g. in an L7 proxy)
		// by sending a window update prior to the BDP ping.

		if w := t.fc.reset(); w > 0 {
			t.controlBuf.put(&outgoingWindowUpdate{
				streamID:  0,
				increment: w,
			})
		}

		t.controlBuf.put(bdpPing)
	}
	// Select the right stream to dispatch.
	s := t.getStream(f)
	if s == nil {
		return
	}
	if size > 0 {
		if err := s.fc.onData(size); err != nil {
			t.closeStream(s, io.EOF, true, http2.ErrCodeFlowControl, status.New(codes.Internal, err.Error()), nil, false)
			return
		}
		if f.Header().Flags.Has(http2.FlagDataPadded) {
			if w := s.fc.onRead(size - uint32(len(f.Data()))); w > 0 {
				t.controlBuf.put(&outgoingWindowUpdate{s.id, w})
			}
		}
		// TODO(bradfitz, zhaoq): A copy is required here because there is no
		// guarantee f.Data() is consumed before the arrival of next frame.
		// Can this copy be eliminated?
		if len(f.Data()) > 0 {
			buffer := t.bufferPool.get()
			buffer.Reset()
			buffer.Write(f.Data())
			s.write(recvMsg{buffer: buffer})
		}
	}
	// The server has closed the stream without sending trailers.  Record that
	// the read direction is closed, and set the status appropriately.
	if f.StreamEnded() {
		t.closeStream(s, io.EOF, false, http2.ErrCodeNo, status.New(codes.Internal, "server closed the stream without sending trailers"), nil, true)
	}
}

func (t *http2Client) handleRSTStream(f *http2.RSTStreamFrame) {
	s := t.getStream(f)
	if s == nil {
		return
	}
	if f.ErrCode == http2.ErrCodeRefusedStream {
		// The stream was unprocessed by the server.
		atomic.StoreUint32(&s.unprocessed, 1)
	}
	statusCode, ok := http2ErrConvTab[f.ErrCode]
	if !ok {
		if logger.V(logLevel) {
			logger.Warningf("transport: http2Client.handleRSTStream found no mapped gRPC status for the received http2 error: %v", f.ErrCode)
		}
		statusCode = codes.Unknown
	}
	if statusCode == codes.Canceled {
		if d, ok := s.ctx.Deadline(); ok && !d.After(time.Now()) {
			// Our deadline was already exceeded, and that was likely the cause
			// of this cancelation.  Alter the status code accordingly.
			statusCode = codes.DeadlineExceeded
		}
	}
	t.closeStream(s, io.EOF, false, http2.ErrCodeNo, status.Newf(statusCode, "stream terminated by RST_STREAM with error code: %v", f.ErrCode), nil, false)
}

func (t *http2Client) handleSettings(f *http2.SettingsFrame, isFirst bool) {
	if f.IsAck() {
		return
	}
	var maxStreams *uint32
	var ss []http2.Setting
	var updateFuncs []func()
	f.ForeachSetting(func(s http2.Setting) error {
		switch s.ID {
		case http2.SettingMaxConcurrentStreams:
			maxStreams = new(uint32)
			*maxStreams = s.Val
		case http2.SettingMaxHeaderListSize:
			updateFuncs = append(updateFuncs, func() {
				t.maxSendHeaderListSize = new(uint32)
				*t.maxSendHeaderListSize = s.Val
			})
		default:
			ss = append(ss, s)
		}
		return nil
	})
	if isFirst && maxStreams == nil {
		maxStreams = new(uint32)
		*maxStreams = math.MaxUint32
	}
	sf := &incomingSettings{
		ss: ss,
	}
	if maxStreams != nil {
		updateStreamQuota := func() {
			delta := int64(*maxStreams) - int64(t.maxConcurrentStreams)
			t.maxConcurrentStreams = *maxStreams
			t.streamQuota += delta
			if delta > 0 && t.waitingStreams > 0 {
				close(t.streamsQuotaAvailable) // wake all of them up.
				t.streamsQuotaAvailable = make(chan struct{}, 1)
			}
		}
		updateFuncs = append(updateFuncs, updateStreamQuota)
	}
	t.controlBuf.executeAndPut(func(interface{}) bool {
		for _, f := range updateFuncs {
			f()
		}
		return true
	}, sf)
}

func (t *http2Client) handlePing(f *http2.PingFrame) {
	if f.IsAck() {
		// Maybe it's a BDP ping.
		if t.bdpEst != nil {
			t.bdpEst.calculate(f.Data)
		}
		return
	}
	pingAck := &ping{ack: true}
	copy(pingAck.data[:], f.Data[:])
	t.controlBuf.put(pingAck)
}

func (t *http2Client) handleGoAway(f *http2.GoAwayFrame) {
	t.mu.Lock()
	if t.state == closing {
		t.mu.Unlock()
		return
	}
	if f.ErrCode == http2.ErrCodeEnhanceYourCalm {
		if logger.V(logLevel) {
			logger.Infof("Client received GoAway with http2.ErrCodeEnhanceYourCalm.")
		}
	}
	id := f.LastStreamID
	if id > 0 && id%2 == 0 {
		t.mu.Unlock()
		t.Close(connectionErrorf(true, nil, "received goaway with non-zero even-numbered stream id: %v", id))
		return
	}
	// A client can receive multiple GoAways from the server (see
	// https://github.com/grpc/grpc-go/issues/1387).  The idea is that the first
	// GoAway will be sent with an ID of MaxInt32 and the second GoAway will be
	// sent after an RTT delay with the ID of the last stream the server will
	// process.
	//
	// Therefore, when we get the first GoAway we don't necessarily close any
	// streams. While in case of second GoAway we close all streams created after
	// the GoAwayId. This way streams that were in-flight while the GoAway from
	// server was being sent don't get killed.
	select {
	case <-t.goAway: // t.goAway has been closed (i.e., multiple GoAways).
		// If there are multiple GoAways the first one should always have an ID greater than the following ones.
		if id > t.prevGoAwayID {
			t.mu.Unlock()
			t.Close(connectionErrorf(true, nil, "received goaway with stream id: %v, which exceeds stream id of previous goaway: %v", id, t.prevGoAwayID))
			return
		}
	default:
		t.setGoAwayReason(f)
		close(t.goAway)
		defer t.controlBuf.put(&incomingGoAway{}) // Defer as t.mu is currently held.
		// Notify the clientconn about the GOAWAY before we set the state to
		// draining, to allow the client to stop attempting to create streams
		// before disallowing new streams on this connection.
		if t.state != draining {
			t.onClose(t.goAwayReason)
			t.state = draining
		}
	}
	// All streams with IDs greater than the GoAwayId
	// and smaller than the previous GoAway ID should be killed.
	upperLimit := t.prevGoAwayID
	if upperLimit == 0 { // This is the first GoAway Frame.
		upperLimit = math.MaxUint32 // Kill all streams after the GoAway ID.
	}

	t.prevGoAwayID = id
	if len(t.activeStreams) == 0 {
		t.mu.Unlock()
		t.Close(connectionErrorf(true, nil, "received goaway and there are no active streams"))
		return
	}

	streamsToClose := make([]*Stream, 0)
	for streamID, stream := range t.activeStreams {
		if streamID > id && streamID <= upperLimit {
			// The stream was unprocessed by the server.
			atomic.StoreUint32(&stream.unprocessed, 1)
			streamsToClose = append(streamsToClose, stream)
		}
	}
	t.mu.Unlock()
	// Called outside t.mu because closeStream can take controlBuf's mu, which
	// could induce deadlock and is not allowed.
	for _, stream := range streamsToClose {
		t.closeStream(stream, errStreamDrain, false, http2.ErrCodeNo, statusGoAway, nil, false)
	}
}

// setGoAwayReason sets the value of t.goAwayReason based
// on the GoAway frame received.
// It expects the transport's mutex to be held by the caller.
func (t *http2Client) setGoAwayReason(f *http2.GoAwayFrame) {
	t.goAwayReason = GoAwayNoReason
	switch f.ErrCode {
	case http2.ErrCodeEnhanceYourCalm:
		if string(f.DebugData()) == "too_many_pings" {
			t.goAwayReason = GoAwayTooManyPings
		}
	}
	if len(f.DebugData()) == 0 {
		t.goAwayDebugMessage = fmt.Sprintf("code: %s", f.ErrCode)
	} else {
		t.goAwayDebugMessage = fmt.Sprintf("code: %s, debug data: %q", f.ErrCode, string(f.DebugData()))
	}
}
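
// For example, a GOAWAY with code ENHANCE_YOUR_CALM and debug data
// "too_many_pings" sets goAwayReason to GoAwayTooManyPings and produces the
// debug message:
//
//	code: ENHANCE_YOUR_CALM, debug data: "too_many_pings"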

func (t *http2Client) GetGoAwayReason() (GoAwayReason, string) {
	t.mu.Lock()
	defer t.mu.Unlock()
	return t.goAwayReason, t.goAwayDebugMessage
}

func (t *http2Client) handleWindowUpdate(f *http2.WindowUpdateFrame) {
	t.controlBuf.put(&incomingWindowUpdate{
		streamID:  f.Header().StreamID,
		increment: f.Increment,
	})
}

// operateHeaders takes action on the decoded headers.
func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) {
	s := t.getStream(frame)
	if s == nil {
		return
	}
	endStream := frame.StreamEnded()
	atomic.StoreUint32(&s.bytesReceived, 1)
	initialHeader := atomic.LoadUint32(&s.headerChanClosed) == 0

	if !initialHeader && !endStream {
		// As specified by gRPC over HTTP2, a HEADERS frame (and associated
		// CONTINUATION frames) can only appear at the start or end of a
		// stream. Therefore, a second HEADERS frame must have the EOS bit set.
		st := status.New(codes.Internal, "a HEADERS frame cannot appear in the middle of a stream")
		t.closeStream(s, st.Err(), true, http2.ErrCodeProtocol, st, nil, false)
		return
	}

	// frame.Truncated is set to true when the framer detects that the current
	// header list size hits the MaxHeaderListSize limit.
	if frame.Truncated {
		se := status.New(codes.Internal, "peer header list size exceeded limit")
		t.closeStream(s, se.Err(), true, http2.ErrCodeFrameSize, se, nil, endStream)
		return
	}

	var (
		// If a gRPC Response-Headers has already been received, then it means
		// that the peer is speaking gRPC and we are in gRPC mode.
		isGRPC         = !initialHeader
		mdata          = make(map[string][]string)
		contentTypeErr = "malformed header: missing HTTP content-type"
		grpcMessage    string
		statusGen      *status.Status
		recvCompress   string
		httpStatusCode *int
		httpStatusErr  string
		rawStatusCode  = codes.Unknown
		// headerError is set if an error is encountered while parsing the headers
		headerError string
	)

	if initialHeader {
		httpStatusErr = "malformed header: missing HTTP status"
	}

	for _, hf := range frame.Fields {
		switch hf.Name {
		case "content-type":
			if _, validContentType := grpcutil.ContentSubtype(hf.Value); !validContentType {
				contentTypeErr = fmt.Sprintf("transport: received unexpected content-type %q", hf.Value)
				break
			}
			contentTypeErr = ""
			mdata[hf.Name] = append(mdata[hf.Name], hf.Value)
			isGRPC = true
		case "grpc-encoding":
			recvCompress = hf.Value
		case "grpc-status":
			code, err := strconv.ParseInt(hf.Value, 10, 32)
			if err != nil {
				se := status.New(codes.Internal, fmt.Sprintf("transport: malformed grpc-status: %v", err))
				t.closeStream(s, se.Err(), true, http2.ErrCodeProtocol, se, nil, endStream)
				return
			}
			rawStatusCode = codes.Code(uint32(code))
		case "grpc-message":
			grpcMessage = decodeGrpcMessage(hf.Value)
		case "grpc-status-details-bin":
			var err error
			statusGen, err = decodeGRPCStatusDetails(hf.Value)
			if err != nil {
				headerError = fmt.Sprintf("transport: malformed grpc-status-details-bin: %v", err)
			}
		case ":status":
			if hf.Value == "200" {
				httpStatusErr = ""
				statusCode := 200
				httpStatusCode = &statusCode
				break
			}

			c, err := strconv.ParseInt(hf.Value, 10, 32)
			if err != nil {
				se := status.New(codes.Internal, fmt.Sprintf("transport: malformed http-status: %v", err))
				t.closeStream(s, se.Err(), true, http2.ErrCodeProtocol, se, nil, endStream)
				return
			}
			statusCode := int(c)
			httpStatusCode = &statusCode

			httpStatusErr = fmt.Sprintf(
				"unexpected HTTP status code received from server: %d (%s)",
				statusCode,
				http.StatusText(statusCode),
			)
		default:
			if isReservedHeader(hf.Name) && !isWhitelistedHeader(hf.Name) {
				break
			}
			v, err := decodeMetadataHeader(hf.Name, hf.Value)
			if err != nil {
				headerError = fmt.Sprintf("transport: malformed %s: %v", hf.Name, err)
				logger.Warningf("Failed to decode metadata header (%q, %q): %v", hf.Name, hf.Value, err)
				break
			}
			mdata[hf.Name] = append(mdata[hf.Name], v)
		}
	}

	if !isGRPC || httpStatusErr != "" {
		var code = codes.Internal // when header does not include HTTP status, return INTERNAL

		if httpStatusCode != nil {
			var ok bool
			code, ok = HTTPStatusConvTab[*httpStatusCode]
			if !ok {
				code = codes.Unknown
			}
		}
		var errs []string
		if httpStatusErr != "" {
			errs = append(errs, httpStatusErr)
		}
		if contentTypeErr != "" {
			errs = append(errs, contentTypeErr)
		}
		// Verify the HTTP response is a 200.
		se := status.New(code, strings.Join(errs, "; "))
		t.closeStream(s, se.Err(), true, http2.ErrCodeProtocol, se, nil, endStream)
		return
	}

	if headerError != "" {
		se := status.New(codes.Internal, headerError)
		t.closeStream(s, se.Err(), true, http2.ErrCodeProtocol, se, nil, endStream)
		return
	}

	isHeader := false

	// If headerChan hasn't been closed yet
	if atomic.CompareAndSwapUint32(&s.headerChanClosed, 0, 1) {
		s.headerValid = true
		if !endStream {
			// HEADERS frame block carries a Response-Headers.
			isHeader = true
			// These values can be set without any synchronization because
			// stream goroutine will read it only after seeing a closed
			// headerChan which we'll close after setting this.
			s.recvCompress = recvCompress
			if len(mdata) > 0 {
				s.header = mdata
			}
		} else {
			// HEADERS frame block carries a Trailers-Only.
			s.noHeaders = true
		}
		close(s.headerChan)
	}

	for _, sh := range t.statsHandlers {
		if isHeader {
			inHeader := &stats.InHeader{
				Client:      true,
				WireLength:  int(frame.Header().Length),
				Header:      metadata.MD(mdata).Copy(),
				Compression: s.recvCompress,
			}
			sh.HandleRPC(s.ctx, inHeader)
		} else {
			inTrailer := &stats.InTrailer{
				Client:     true,
				WireLength: int(frame.Header().Length),
				Trailer:    metadata.MD(mdata).Copy(),
			}
			sh.HandleRPC(s.ctx, inTrailer)
		}
	}

	if !endStream {
		return
	}

	if statusGen == nil {
		statusGen = status.New(rawStatusCode, grpcMessage)
	}

	// If the client received END_STREAM from the server while the stream was
	// still active, send RST_STREAM.
	rstStream := s.getState() == streamActive
	t.closeStream(s, io.EOF, rstStream, http2.ErrCodeNo, statusGen, mdata, true)
}

// readServerPreface reads and handles the initial settings frame from the
// server.
func (t *http2Client) readServerPreface() error {
	frame, err := t.framer.fr.ReadFrame()
	if err != nil {
		return connectionErrorf(true, err, "error reading server preface: %v", err)
	}
	sf, ok := frame.(*http2.SettingsFrame)
	if !ok {
		return connectionErrorf(true, nil, "initial http2 frame from server is not a settings frame: %T", frame)
	}
	t.handleSettings(sf, true)
	return nil
}

// reader verifies the server preface and reads all subsequent data from
// network connection.  If the server preface is not read successfully, an
// error is pushed to errCh; otherwise errCh is closed with no error.
func (t *http2Client) reader(errCh chan<- error) {
	defer close(t.readerDone)

	if err := t.readServerPreface(); err != nil {
		errCh <- err
		return
	}
	close(errCh)
	if t.keepaliveEnabled {
		atomic.StoreInt64(&t.lastRead, time.Now().UnixNano())
	}

	// loop to keep reading incoming messages on this transport.
	for {
		t.controlBuf.throttle()
		frame, err := t.framer.fr.ReadFrame()
		if t.keepaliveEnabled {
			atomic.StoreInt64(&t.lastRead, time.Now().UnixNano())
		}
		if err != nil {
			// Abort an active stream if the http2.Framer returns a
			// http2.StreamError. This can happen only if the server's response
			// is malformed http2.
			if se, ok := err.(http2.StreamError); ok {
				t.mu.Lock()
				s := t.activeStreams[se.StreamID]
				t.mu.Unlock()
				if s != nil {
					// use error detail to provide better err message
					code := http2ErrConvTab[se.Code]
					errorDetail := t.framer.fr.ErrorDetail()
					var msg string
					if errorDetail != nil {
						msg = errorDetail.Error()
					} else {
						msg = "received invalid frame"
					}
					t.closeStream(s, status.Error(code, msg), true, http2.ErrCodeProtocol, status.New(code, msg), nil, false)
				}
				continue
			} else {
				// Transport error.
				t.Close(connectionErrorf(true, err, "error reading from server: %v", err))
				return
			}
		}
		switch frame := frame.(type) {
		case *http2.MetaHeadersFrame:
			t.operateHeaders(frame)
		case *http2.DataFrame:
			t.handleData(frame)
		case *http2.RSTStreamFrame:
			t.handleRSTStream(frame)
		case *http2.SettingsFrame:
			t.handleSettings(frame, false)
		case *http2.PingFrame:
			t.handlePing(frame)
		case *http2.GoAwayFrame:
			t.handleGoAway(frame)
		case *http2.WindowUpdateFrame:
			t.handleWindowUpdate(frame)
		default:
			if logger.V(logLevel) {
				logger.Errorf("transport: http2Client.reader got unhandled frame type %v.", frame)
			}
		}
	}
}

func minTime(a, b time.Duration) time.Duration {
	if a < b {
		return a
	}
	return b
}

// keepalive running in a separate goroutine makes sure the connection is alive by sending pings.
func (t *http2Client) keepalive() {
	p := &ping{data: [8]byte{}}
	// True iff a ping has been sent, and no data has been received since then.
	outstandingPing := false
	// Amount of time remaining before which we should receive an ACK for the
	// last sent ping.
	timeoutLeft := time.Duration(0)
	// Records the last value of t.lastRead before we go block on the timer.
	// This is required to check for read activity since then.
	prevNano := time.Now().UnixNano()
	timer := time.NewTimer(t.kp.Time)
	for {
		select {
		case <-timer.C:
			lastRead := atomic.LoadInt64(&t.lastRead)
			if lastRead > prevNano {
				// There has been read activity since the last time we were here.
				outstandingPing = false
				// Next timer should fire at kp.Time seconds from lastRead time.
				timer.Reset(time.Duration(lastRead) + t.kp.Time - time.Duration(time.Now().UnixNano()))
				prevNano = lastRead
				continue
			}
			if outstandingPing && timeoutLeft <= 0 {
				t.Close(connectionErrorf(true, nil, "keepalive ping failed to receive ACK within timeout"))
				return
			}
			t.mu.Lock()
			if t.state == closing {
				// If the transport is closing, we should exit from the
				// keepalive goroutine here. If not, we could have a race
				// between the call to Signal() from Close() and the call to
				// Wait() here, whereby the keepalive goroutine ends up
				// blocking on the condition variable which will never be
				// signalled again.
				t.mu.Unlock()
				return
			}
			if len(t.activeStreams) < 1 && !t.kp.PermitWithoutStream {
				// If a ping was sent out previously (because there were active
				// streams at that point) which wasn't acked and its timeout
				// hadn't fired, but we got here and are about to go dormant,
				// we should make sure that we unconditionally send a ping once
				// we awaken.
				outstandingPing = false
				t.kpDormant = true
				t.kpDormancyCond.Wait()
			}
			t.kpDormant = false
			t.mu.Unlock()

			// We get here either because we were dormant and a new stream was
			// created which unblocked the Wait() call, or because the
			// keepalive timer expired. In both cases, we need to send a ping.
			if !outstandingPing {
				if channelz.IsOn() {
					atomic.AddInt64(&t.czData.kpCount, 1)
				}
				t.controlBuf.put(p)
				timeoutLeft = t.kp.Timeout
				outstandingPing = true
			}
			// The amount of time to sleep here is the minimum of kp.Time and
			// timeoutLeft. This will ensure that we wait only for kp.Time
			// before sending out the next ping (for cases where the ping is
			// acked).
			sleepDuration := minTime(t.kp.Time, timeoutLeft)
			timeoutLeft -= sleepDuration
			timer.Reset(sleepDuration)
		case <-t.ctx.Done():
			if !timer.Stop() {
				<-timer.C
			}
			return
		}
	}
}
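
// As a usage sketch, the loop above is driven entirely by the client's
// keepalive.ClientParameters; the values below are illustrative only:
//
//	kp := keepalive.ClientParameters{
//		Time:                10 * time.Second, // ping after 10s without reads
//		Timeout:             2 * time.Second,  // close if no ACK within 2s
//		PermitWithoutStream: true,             // ping even with no active RPCs
//	}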

func (t *http2Client) Error() <-chan struct{} {
	return t.ctx.Done()
}

func (t *http2Client) GoAway() <-chan struct{} {
	return t.goAway
}

func (t *http2Client) ChannelzMetric() *channelz.SocketInternalMetric {
	s := channelz.SocketInternalMetric{
		StreamsStarted:                  atomic.LoadInt64(&t.czData.streamsStarted),
		StreamsSucceeded:                atomic.LoadInt64(&t.czData.streamsSucceeded),
		StreamsFailed:                   atomic.LoadInt64(&t.czData.streamsFailed),
		MessagesSent:                    atomic.LoadInt64(&t.czData.msgSent),
		MessagesReceived:                atomic.LoadInt64(&t.czData.msgRecv),
		KeepAlivesSent:                  atomic.LoadInt64(&t.czData.kpCount),
		LastLocalStreamCreatedTimestamp: time.Unix(0, atomic.LoadInt64(&t.czData.lastStreamCreatedTime)),
		LastMessageSentTimestamp:        time.Unix(0, atomic.LoadInt64(&t.czData.lastMsgSentTime)),
		LastMessageReceivedTimestamp:    time.Unix(0, atomic.LoadInt64(&t.czData.lastMsgRecvTime)),
		LocalFlowControlWindow:          int64(t.fc.getSize()),
		SocketOptions:                   channelz.GetSocketOption(t.conn),
		LocalAddr:                       t.localAddr,
		RemoteAddr:                      t.remoteAddr,
		// RemoteName :
	}
	if au, ok := t.authInfo.(credentials.ChannelzSecurityInfo); ok {
		s.Security = au.GetSecurityValue()
	}
	s.RemoteFlowControlWindow = t.getOutFlowWindow()
	return &s
}

func (t *http2Client) RemoteAddr() net.Addr { return t.remoteAddr }

func (t *http2Client) IncrMsgSent() {
	atomic.AddInt64(&t.czData.msgSent, 1)
	atomic.StoreInt64(&t.czData.lastMsgSentTime, time.Now().UnixNano())
}

func (t *http2Client) IncrMsgRecv() {
	atomic.AddInt64(&t.czData.msgRecv, 1)
	atomic.StoreInt64(&t.czData.lastMsgRecvTime, time.Now().UnixNano())
}

func (t *http2Client) getOutFlowWindow() int64 {
	resp := make(chan uint32, 1)
	timer := time.NewTimer(time.Second)
	defer timer.Stop()
	t.controlBuf.put(&outFlowControlSizeRequest{resp})
	select {
	case sz := <-resp:
		return int64(sz)
	case <-t.ctxDone:
		return -1
	case <-timer.C:
		return -2
	}
}

func (t *http2Client) stateForTesting() transportState {
	t.mu.Lock()
	defer t.mu.Unlock()
	return t.state
}