package http2
import (
mathrand
)
// Transport-wide defaults used by the client connection code in this file.
const (
// transportDefaultConnFlow is the connection-level window increment sent in
// the initial WINDOW_UPDATE when a client connection is set up (1 GiB).
transportDefaultConnFlow = 1 << 30
// transportDefaultStreamFlow is advertised as SETTINGS_INITIAL_WINDOW_SIZE
// and used to initialise each new stream's inbound flow (4 MiB).
transportDefaultStreamFlow = 4 << 20
// defaultUserAgent is sent as "user-agent" when the request carries no
// User-Agent header of its own.
defaultUserAgent = "Go-http-client/2.0"
// initialMaxConcurrentStreams is the concurrent-stream limit a new
// connection starts with.
initialMaxConcurrentStreams = 100
// defaultMaxConcurrentStreams is not referenced in the visible part of the
// file. NOTE(review): presumably the limit applied once the peer's SETTINGS
// arrive without an explicit value — confirm at the use site.
defaultMaxConcurrentStreams = 1000
)
// Transport is the HTTP/2 client transport. Its exported fields are
// configuration knobs; the unexported ones hold the wrapped HTTP/1 transport
// and the lazily chosen connection pool.
type Transport struct {
// DialTLSContext, if set, is used to dial TLS connections and takes
// precedence over DialTLS.
DialTLSContext func(ctx context.Context, network, addr string, cfg *tls.Config) (net.Conn, error)
// DialTLS, if set (and DialTLSContext is nil), is used to dial TLS
// connections.
DialTLS func(network, addr string, cfg *tls.Config) (net.Conn, error)
// TLSClientConfig, if non-nil, is cloned as the base TLS configuration for
// each dial.
TLSClientConfig *tls.Config
// ConnPool, if non-nil, is used instead of the default client conn pool.
ConnPool ClientConnPool
// DisableCompression prevents the transport from requesting gzip on its own.
DisableCompression bool
// AllowHTTP permits requests with the plain "http" scheme.
AllowHTTP bool
// MaxHeaderListSize: 0 selects a 10 MiB default; 0xffffffff means no limit
// is advertised.
MaxHeaderListSize uint32
// MaxReadFrameSize: 0 leaves the framer default; other values are clamped to
// [minMaxFrameSize, maxFrameSize].
MaxReadFrameSize uint32
// MaxDecoderHeaderTableSize and MaxEncoderHeaderTableSize fall back to
// initialHeaderTableSize when zero.
MaxDecoderHeaderTableSize uint32
MaxEncoderHeaderTableSize uint32
// StrictMaxConcurrentStreams makes a connection always report that it can
// take a new request, as far as the concurrent-stream limit is concerned.
StrictMaxConcurrentStreams bool
// ReadIdleTimeout, if non-zero, is the period of the read-loop health-check
// timer.
ReadIdleTimeout time.Duration
// PingTimeout bounds a health-check ping; zero selects 15 seconds.
PingTimeout time.Duration
// WriteByteTimeout, if non-zero, is applied as a write deadline that is
// extended while a write keeps making progress.
WriteByteTimeout time.Duration
// CountError, if set, is called with a short token describing certain
// errors (lost ping, read-frame errors).
CountError func(errType string)
// t1 is the wrapped net/http Transport, if any; several settings are read
// from it.
t1 *http.Transport
// connPoolOnce guards one-time initialisation of connPoolOrDef.
connPoolOnce sync.Once
// connPoolOrDef is ConnPool if set, otherwise a default clientConnPool.
connPoolOrDef ClientConnPool
}
// maxHeaderListSize (name taken from its call sites in the connection setup
// code) resolves the MaxHeaderListSize setting:
//   - 0 (unset) yields the 10 MiB default;
//   - 0xffffffff yields 0, which the caller treats as "send no limit";
//   - anything else is returned unchanged.
func ( *Transport) () uint32 {
if .MaxHeaderListSize == 0 {
return 10 << 20
}
if .MaxHeaderListSize == 0xffffffff {
return 0
}
return .MaxHeaderListSize
}
// maxFrameReadSize (name taken from its call sites) resolves the
// MaxReadFrameSize setting. Zero is passed through (callers skip configuring
// the framer in that case); any other value is clamped to the range
// [minMaxFrameSize, maxFrameSize].
func ( *Transport) () uint32 {
if .MaxReadFrameSize == 0 {
return 0
}
if .MaxReadFrameSize < minMaxFrameSize {
return minMaxFrameSize
}
if .MaxReadFrameSize > maxFrameSize {
return maxFrameSize
}
return .MaxReadFrameSize
}
// disableCompression (name taken from its call site in the request writer)
// reports whether compression is disabled either on this Transport or on the
// wrapped HTTP/1 transport t1.
func ( *Transport) () bool {
return .DisableCompression || (.t1 != nil && .t1.DisableCompression)
}
// pingTimeout (name taken from its call site in the health check) returns
// PingTimeout, defaulting to 15 seconds when it is zero.
func ( *Transport) () time.Duration {
if .PingTimeout == 0 {
return 15 * time.Second
}
return .PingTimeout
}
// Thin wrapper around ConfigureTransports for callers that only want an
// error result; the other result of ConfigureTransports is discarded.
// NOTE(review): the function name and local identifiers are missing in this
// copy; presumably this is the exported ConfigureTransport — confirm.
func ( *http.Transport) error {
, := ConfigureTransports()
return
}
// ConfigureTransports (name taken from its call site) delegates to
// configureTransports and returns the resulting HTTP/2 Transport and error.
func ( *http.Transport) (*Transport, error) {
return configureTransports()
}
// configureTransports (name taken from its call site) wires an HTTP/2
// Transport into a net/http Transport:
//   - creates a clientConnPool and a Transport that uses a non-dialing pool;
//   - registers the HTTPS protocol handler, returning its error if any;
//   - ensures the TLS config exists and that NextProtos contains "h2"
//     (prepended) and "http/1.1" (appended);
//   - installs an "h2" entry in TLSNextProto that hands upgraded TLS
//     connections to the pool.
func ( *http.Transport) (*Transport, error) {
:= new(clientConnPool)
:= &Transport{
ConnPool: noDialClientConnPool{},
t1: ,
}
.t =
if := registerHTTPSProtocol(, noDialH2RoundTripper{}); != nil {
return nil,
}
if .TLSClientConfig == nil {
.TLSClientConfig = new(tls.Config)
}
if !strSliceContains(.TLSClientConfig.NextProtos, "h2") {
.TLSClientConfig.NextProtos = append([]string{"h2"}, .TLSClientConfig.NextProtos...)
}
if !strSliceContains(.TLSClientConfig.NextProtos, "http/1.1") {
.TLSClientConfig.NextProtos = append(.TLSClientConfig.NextProtos, "http/1.1")
}
// Upgrade callback: offer the negotiated TLS conn to the pool. On error, or
// when the pool did not need the conn, it is closed in the background.
:= func( string, *tls.Conn) http.RoundTripper {
:= authorityAddr("https", )
if , := .addConnIfNeeded(, , ); != nil {
go .Close()
return erringRoundTripper{}
} else if ! {
go .Close()
}
return
}
// Create the TLSNextProto map if it is empty, otherwise add to it in place.
if := .TLSNextProto; len() == 0 {
.TLSNextProto = map[string]func(string, *tls.Conn) http.RoundTripper{
"h2": ,
}
} else {
["h2"] =
}
return , nil
}
// connPool (name taken from its call sites) returns the connection pool in
// use, running initConnPool exactly once on first use.
func ( *Transport) () ClientConnPool {
.connPoolOnce.Do(.initConnPool)
return .connPoolOrDef
}
// initConnPool (name taken from the sync.Once call above it in the file)
// selects the pool: the user-supplied ConnPool if non-nil, otherwise a fresh
// clientConnPool.
func ( *Transport) () {
if .ConnPool != nil {
.connPoolOrDef = .ConnPool
} else {
.connPoolOrDef = &clientConnPool{t: }
}
}
// ClientConn is the state of a single HTTP/2 client connection to a server.
type ClientConn struct {
// t is the owning Transport; tconn is the underlying network connection.
t *Transport
tconn net.Conn
tconnClosed bool
// tlsState is captured at connection setup when the conn can report a TLS
// connection state, and is copied onto responses.
tlsState *tls.ConnectionState
// reused is flipped 0 -> 1 atomically the first time the conn is handed out,
// so later uses can be reported as reused.
reused uint32
// singleUse is set from the connection constructor's argument.
singleUse bool
getConnCalled bool
// readerDone is closed when the read loop's cleanup runs; readerErr holds
// the read loop's result.
readerDone chan struct{}
readerErr error
// idleTimeout and idleTimer are only set when the transport reports a
// non-zero idle connection timeout.
idleTimeout time.Duration
idleTimer *time.Timer
// mu guards the fields below; cond is created on mu.
mu sync.Mutex
cond *sync.Cond
// flow and inflow are the connection-level outbound and inbound flow
// control state.
flow outflow
inflow inflow
doNotReuse bool
closing bool
closed bool
seenSettings bool
wantSettingsAck bool
// goAway is the most recent GOAWAY frame; goAwayDebug keeps the first
// non-empty debug data seen.
goAway *GoAwayFrame
goAwayDebug string
// streams maps stream ID to active stream; streamsReserved counts reserved
// but not yet started requests.
streams map[uint32]*clientStream
streamsReserved int
// nextStreamID advances by 2 for each new stream.
nextStreamID uint32
// pendingRequests counts requests waiting for a free stream slot.
pendingRequests int
pings map[[8]byte]chan struct{}
br *bufio.Reader
lastActive time.Time
lastIdle time.Time
// The following start at fixed initial values in the connection
// constructor. NOTE(review): presumably updated from the peer's SETTINGS —
// the update site is not in view.
maxFrameSize uint32
maxConcurrentStreams uint32
peerMaxHeaderListSize uint64
peerMaxHeaderTableSize uint32
initialWindowSize uint32
// reqHeaderMu is a 1-buffered channel used as a lock held from stream
// admission until the request headers have been written.
reqHeaderMu chan struct{}
// wmu is held while writing frames; the fields below are used on the write
// path.
wmu sync.Mutex
bw *bufio.Writer
fr *Framer
// werr is the sticky write error recorded by the connection's writer.
werr error
// hbuf is the HPACK encoding buffer that henc writes into.
hbuf bytes.Buffer
henc *hpack.Encoder
}
// clientStream is the state of one request/response exchange on a
// ClientConn.
type clientStream struct {
cc *ClientConn
// ctx and reqCancel are the request's context and legacy Cancel channel;
// both are watched at every blocking point.
ctx context.Context
reqCancel <-chan struct{}
trace *httptrace.ClientTrace
// ID is assigned when the stream is added to the connection; it is zero
// until then.
ID uint32
// bufPipe carries response body data. NOTE(review): producer/consumer sides
// are not in view.
bufPipe pipe
// requestedGzip is set when the transport itself asks for gzip.
requestedGzip bool
isHead bool
// abortOnce ensures abortErr is set and abort is closed at most once.
abortOnce sync.Once
abort chan struct{}
abortErr error
// peerClosed is waited on as the "peer finished" signal; donec is closed
// when request cleanup has finished.
peerClosed chan struct{}
donec chan struct{}
// on100 is signalled when a 100 Continue response arrives.
on100 chan struct{}
// respHeaderRecv is closed once res has been populated from the response
// headers.
respHeaderRecv chan struct{}
res *http.Response
flow outflow
inflow inflow
bytesRemain int64
readErr error
reqBody io.ReadCloser
// reqBodyContentLength is -1 when the length is unknown.
reqBodyContentLength int64
// reqBodyClosed is non-nil once closing of the request body has started and
// is closed when that close completes.
reqBodyClosed chan struct{}
sentEndStream bool
sentHeaders bool
// The flags below are maintained by the read loop.
firstByte bool
pastHeaders bool
pastTrailers bool
// num1xx counts 1xx informational responses received on this stream.
num1xx uint8
readClosed bool
readAborted bool
trailer http.Header
resTrailer *http.Header
}
// got1xxFuncForTests, when non-nil, overrides the 1xx callback returned by
// the method below.
var got1xxFuncForTests func(int, textproto.MIMEHeader) error
// get1xxTraceFunc (name taken from its call site in the response-header
// handling) returns the callback to invoke for 1xx informational responses:
// the test hook if set, otherwise whatever traceGot1xxResponseFunc derives
// from the stream's trace.
func ( *clientStream) () func(int, textproto.MIMEHeader) error {
if := got1xxFuncForTests; != nil {
return
}
return traceGot1xxResponseFunc(.trace)
}
// abortStream (name taken from its call sites) aborts the stream with the
// given error. It takes the connection mutex and defers to
// abortStreamLocked.
func ( *clientStream) ( error) {
.cc.mu.Lock()
defer .cc.mu.Unlock()
.abortStreamLocked()
}
// abortStreamLocked (name taken from its call sites) is called with the
// connection mutex held. Only the first call records abortErr and closes the
// abort channel. Every call also starts closing the request body, if there
// is one, and wakes any goroutines waiting on the connection's condition
// variable.
func ( *clientStream) ( error) {
.abortOnce.Do(func() {
.abortErr =
close(.abort)
})
if .reqBody != nil {
.closeReqBodyLocked()
}
// cond may be nil (e.g. on a partially constructed connection), so guard it.
if .cc.cond != nil {
.cc.cond.Broadcast()
}
}
// abortRequestBodyWrite (name taken from its call site in the response
// handling) stops an in-progress request body write. Under the connection
// mutex, if there is a body whose close has not yet started, it starts the
// close and wakes waiters on the condition variable.
func ( *clientStream) () {
:= .cc
.mu.Lock()
defer .mu.Unlock()
if .reqBody != nil && .reqBodyClosed == nil {
.closeReqBodyLocked()
.cond.Broadcast()
}
}
// closeReqBodyLocked (name taken from its call sites) starts closing the
// request body at most once. It creates reqBodyClosed, then closes the body
// in a separate goroutine and closes the channel when that is done, so the
// caller does not block on Close while holding the lock.
func ( *clientStream) () {
if .reqBodyClosed != nil {
return
}
.reqBodyClosed = make(chan struct{})
:= .reqBodyClosed
go func() {
.reqBody.Close()
close()
}()
}
// stickyErrWriter wraps a net.Conn for writing. Once a write fails, the
// error is stored through err and every later write returns it immediately.
type stickyErrWriter struct {
conn net.Conn
// timeout, if non-zero, is applied as a write deadline before each write.
timeout time.Duration
// err points at the shared sticky error.
err *error
}
// Write method of stickyErrWriter (the method name is missing in this copy;
// the signature matches io.Writer). It returns the sticky error immediately
// if one is set. Otherwise it writes to the conn, refreshing the write
// deadline before each attempt when timeout is non-zero. It retries after a
// deadline-exceeded error as long as the previous attempt wrote at least one
// byte and data remains. The final error (possibly nil) is stored as the
// sticky error.
func ( stickyErrWriter) ( []byte) ( int, error) {
if *.err != nil {
return 0, *.err
}
for {
if .timeout != 0 {
.conn.SetWriteDeadline(time.Now().Add(.timeout))
}
, := .conn.Write([:])
+=
// Keep going only while progress is being made despite the deadline.
if < len() && > 0 && errors.Is(, os.ErrDeadlineExceeded) {
continue
}
// Clear the deadline so it does not affect later, unrelated writes.
if .timeout != 0 {
.conn.SetWriteDeadline(time.Time{})
}
*.err =
return ,
}
}
// noCachedConnError is the concrete type of ErrNoCachedConn. It has a no-op
// marker method and a method returning the fixed error text (both method
// names are missing in this copy).
type noCachedConnError struct{}
func (noCachedConnError) () {}
func (noCachedConnError) () string { return "http2: no cached connection was available" }
// Reports whether the given error implements a particular single-method
// interface, detected by type assertion rather than by value comparison.
// NOTE(review): the function and interface method names are missing in this
// copy; presumably this checks for noCachedConnError's marker method —
// confirm.
func ( error) bool {
, := .(interface{ () })
return
}
// ErrNoCachedConn is the error value whose message is "http2: no cached
// connection was available".
var ErrNoCachedConn error = noCachedConnError{}
// RoundTripOpt carries per-call options for the Transport's RoundTripOpt
// method.
type RoundTripOpt struct {
// OnlyCachedConn is not consulted in the visible part of this file.
// NOTE(review): presumably restricts the round trip to already-cached
// connections — confirm in the conn pool.
OnlyCachedConn bool
}
// Performs a round trip with default (zero-value) options by delegating to
// RoundTripOpt. NOTE(review): the method name is missing in this copy; the
// signature matches http.RoundTripper's RoundTrip.
func ( *Transport) ( *http.Request) (*http.Response, error) {
return .RoundTripOpt(, RoundTripOpt{})
}
// authorityAddr (name taken from its call sites) turns a scheme and an
// authority into a "host:port" dial address. When the authority has no
// parsable port, the port defaults to "443", or "80" for the "http" scheme.
// The host is converted with idna.ToASCII when that succeeds. A host that is
// already bracketed is joined by hand so net.JoinHostPort does not add a
// second pair of brackets.
func ( string, string) ( string) {
, , := net.SplitHostPort()
if != nil {
= "443"
if == "http" {
= "80"
}
=
}
if , := idna.ToASCII(); == nil {
=
}
if strings.HasPrefix(, "[") && strings.HasSuffix(, "]") {
return + ":" +
}
return net.JoinHostPort(, )
}
// retryBackoffHook, when non-nil, replaces time.NewTimer for retry backoff.
var retryBackoffHook func(time.Duration) *time.Timer
// backoffNewTimer (name taken from its call site in the retry loop) returns
// a timer for the given duration, using retryBackoffHook when it is set.
func ( time.Duration) *time.Timer {
if retryBackoffHook != nil {
return retryBackoffHook()
}
return time.NewTimer()
}
// RoundTripOpt (name taken from its call site) executes a request over
// HTTP/2 with retries.
//
// Only "https" is accepted, plus "http" when AllowHTTP is set. Each
// iteration obtains a connection from the pool and performs the round trip.
// On failure, while the attempt counter is at most 6, shouldRetryRequest
// decides whether the request can be retried. The first retry is immediate.
// Later retries wait roughly 2^(n-1) seconds plus up to 10% jitter, and the
// wait ends early if the request context is done.
func ( *Transport) ( *http.Request, RoundTripOpt) (*http.Response, error) {
if !(.URL.Scheme == "https" || (.URL.Scheme == "http" && .AllowHTTP)) {
return nil, errors.New("http2: unsupported scheme")
}
:= authorityAddr(.URL.Scheme, .URL.Host)
for := 0; ; ++ {
, := .connPool().GetClientConn(, )
if != nil {
.vlogf("http2: Transport failed to get client conn for %s: %v", , )
return nil,
}
// The CAS succeeds only on the first use of the conn, so its negation
// means "this conn has been used before".
:= !atomic.CompareAndSwapUint32(&.reused, 0, 1)
traceGotConn(, , )
, := .RoundTrip()
if != nil && <= 6 {
if , = shouldRetryRequest(, ); == nil {
if == 0 {
.vlogf("RoundTrip retrying after failure: %v", )
continue
}
// Exponential backoff with jitter.
:= float64(uint(1) << (uint() - 1))
+= * (0.1 * mathrand.Float64())
:= time.Second * time.Duration()
:= backoffNewTimer()
select {
case <-.C:
.vlogf("RoundTrip retrying after failure: %v", )
continue
case <-.Context().Done():
.Stop()
= .Context().Err()
}
}
}
if != nil {
.vlogf("RoundTrip failure: %v", )
return nil,
}
return , nil
}
}
// Asks the connection pool to close its idle connections, if the pool
// implements clientConnPoolIdleCloser; otherwise does nothing.
// NOTE(review): the method name is missing in this copy; presumably
// CloseIdleConnections.
func ( *Transport) () {
if , := .connPool().(clientConnPoolIdleCloser); {
.closeIdleConnections()
}
}
// Sentinel errors describing why a client connection cannot serve a request.
var (
// errClientConnClosed is returned while waiting for flow control on a
// closed connection.
errClientConnClosed = errors.New("http2: client conn is closed")
// errClientConnUnusable is returned when a connection cannot take a new
// request; it is treated as retryable.
errClientConnUnusable = errors.New("http2: client conn not usable")
// errClientConnGotGoAway aborts streams above the GOAWAY's last stream ID;
// it is treated as retryable.
errClientConnGotGoAway = errors.New("http2: Transport received Server's graceful shutdown GOAWAY")
)
// shouldRetryRequest (name taken from its call site in the retry loop)
// decides whether a failed request may be retried and returns the request to
// retry with, or an error if it may not.
//
//   - Non-retryable errors (per canRetryError) are returned as-is.
//   - Requests with no body are retried unchanged.
//   - Requests with GetBody are retried on a shallow copy with a fresh Body.
//   - Otherwise the request is retried unchanged only for
//     errClientConnUnusable; any other error yields a "cannot retry" error.
func ( *http.Request, error) (*http.Request, error) {
if !canRetryError() {
return nil,
}
if .Body == nil || .Body == http.NoBody {
return , nil
}
if .GetBody != nil {
, := .GetBody()
if != nil {
return nil,
}
:= *
.Body =
return &, nil
}
if == errClientConnUnusable {
return , nil
}
return nil, fmt.Errorf("http2: Transport: cannot retry err [%v] after Request.Body was written; define Request.GetBody to avoid this error", )
}
// canRetryError (name taken from its call site) reports whether an error is
// one after which a request may be retried. That covers the unusable-conn
// and GOAWAY sentinels, a StreamError with ErrCodeProtocol whose cause is
// errFromPeer, and a StreamError with ErrCodeRefusedStream.
func ( error) bool {
if == errClientConnUnusable || == errClientConnGotGoAway {
return true
}
if , := .(StreamError); {
if .Code == ErrCodeProtocol && .Cause == errFromPeer {
return true
}
return .Code == ErrCodeRefusedStream
}
return false
}
// Dials a new client connection: splits the address into host and port,
// dials TLS over "tcp" with a config from newTLSConfig, and wraps the
// resulting conn via newClientConn. Any error along the way is returned with
// a nil connection. NOTE(review): the method name is missing in this copy;
// presumably dialClientConn.
func ( *Transport) ( context.Context, string, bool) (*ClientConn, error) {
, , := net.SplitHostPort()
if != nil {
return nil,
}
, := .dialTLS(, "tcp", , .newTLSConfig())
if != nil {
return nil,
}
return .newClientConn(, )
}
// newTLSConfig (name taken from its call site in the dial path) builds the
// TLS configuration for a dial. It starts from a clone of TLSClientConfig
// when set, otherwise from an empty config. NextProtoTLS is prepended to
// NextProtos if missing, and ServerName is filled in from the string
// argument when it is empty.
func ( *Transport) ( string) *tls.Config {
:= new(tls.Config)
if .TLSClientConfig != nil {
* = *.TLSClientConfig.Clone()
}
if !strSliceContains(.NextProtos, NextProtoTLS) {
.NextProtos = append([]string{NextProtoTLS}, .NextProtos...)
}
if .ServerName == "" {
.ServerName =
}
return
}
// dialTLS (name taken from its call site) establishes the TLS connection.
// User-supplied dialers take precedence: DialTLSContext first, then DialTLS,
// and their results are returned without further checks here. With the
// built-in dialer, the negotiated ALPN protocol must equal NextProtoTLS and
// must have been negotiated mutually; otherwise an error is returned.
func ( *Transport) ( context.Context, , string, *tls.Config) (net.Conn, error) {
if .DialTLSContext != nil {
return .DialTLSContext(, , , )
} else if .DialTLS != nil {
return .DialTLS(, , )
}
, := .dialTLSWithContext(, , , )
if != nil {
return nil,
}
:= .ConnectionState()
if := .NegotiatedProtocol; != NextProtoTLS {
return nil, fmt.Errorf("http2: unexpected ALPN protocol %q; want %q", , NextProtoTLS)
}
if !.NegotiatedProtocolIsMutual {
return nil, errors.New("http2: could not negotiate protocol mutually")
}
return , nil
}
// disableKeepAlives (name taken from its call sites) reports whether the
// wrapped HTTP/1 transport exists and has DisableKeepAlives set.
func ( *Transport) () bool {
return .t1 != nil && .t1.DisableKeepAlives
}
// expectContinueTimeout (name taken from its call site in the request
// writer) returns the wrapped HTTP/1 transport's ExpectContinueTimeout, or
// zero when there is no wrapped transport.
func ( *Transport) () time.Duration {
if .t1 == nil {
return 0
}
return .t1.ExpectContinueTimeout
}
// maxDecoderHeaderTableSize (name taken from its call site) returns
// MaxDecoderHeaderTableSize when positive, otherwise initialHeaderTableSize.
func ( *Transport) () uint32 {
if := .MaxDecoderHeaderTableSize; > 0 {
return
}
return initialHeaderTableSize
}
// maxEncoderHeaderTableSize (name taken from its call site) returns
// MaxEncoderHeaderTableSize when positive, otherwise initialHeaderTableSize.
func ( *Transport) () uint32 {
if := .MaxEncoderHeaderTableSize; > 0 {
return
}
return initialHeaderTableSize
}
// Wraps an existing net.Conn in a ClientConn by calling newClientConn, with
// the single-use flag taken from disableKeepAlives. NOTE(review): the method
// name is missing in this copy; presumably the exported NewClientConn.
func ( *Transport) ( net.Conn) (*ClientConn, error) {
return .newClientConn(, .disableKeepAlives())
}
// newClientConn (name taken from its call sites) builds a ClientConn around
// the given net.Conn, sends the client preface and initial frames, and
// starts the read loop. The bool argument becomes the conn's singleUse flag.
//
// If writing the preface, SETTINGS or WINDOW_UPDATE leaves a sticky write
// error, the conn is closed and that error is returned.
func ( *Transport) ( net.Conn, bool) (*ClientConn, error) {
// Initial protocol parameters used before any peer SETTINGS are processed.
:= &ClientConn{
t: ,
tconn: ,
readerDone: make(chan struct{}),
nextStreamID: 1,
maxFrameSize: 16 << 10,
initialWindowSize: 65535,
maxConcurrentStreams: initialMaxConcurrentStreams,
peerMaxHeaderListSize: 0xffffffffffffffff,
streams: make(map[uint32]*clientStream),
singleUse: ,
wantSettingsAck: true,
pings: make(map[[8]byte]chan struct{}),
reqHeaderMu: make(chan struct{}, 1),
}
// Arm the idle timer only when an idle timeout is configured.
if := .idleConnTimeout(); != 0 {
.idleTimeout =
.idleTimer = time.AfterFunc(, .onIdleTimeout)
}
if VerboseLogs {
.vlogf("http2: Transport creating client conn %p to %v", , .RemoteAddr())
}
.cond = sync.NewCond(&.mu)
.flow.add(int32(initialWindowSize))
// All writes go through a stickyErrWriter so the first failure is recorded
// in werr.
.bw = bufio.NewWriter(stickyErrWriter{
conn: ,
timeout: .WriteByteTimeout,
err: &.werr,
})
.br = bufio.NewReader()
.fr = NewFramer(.bw, .br)
if .maxFrameReadSize() != 0 {
.fr.SetMaxReadFrameSize(.maxFrameReadSize())
}
if .CountError != nil {
.fr.countError = .CountError
}
:= .maxDecoderHeaderTableSize()
.fr.ReadMetaHeaders = hpack.NewDecoder(, nil)
.fr.MaxHeaderListSize = .maxHeaderListSize()
.henc = hpack.NewEncoder(&.hbuf)
.henc.SetMaxDynamicTableSizeLimit(.maxEncoderHeaderTableSize())
.peerMaxHeaderTableSize = initialHeaderTableSize
// With AllowHTTP set, stream IDs start at 3 instead of 1.
if .AllowHTTP {
.nextStreamID = 3
}
if , := .(connectionStater); {
:= .ConnectionState()
.tlsState = &
}
// Initial SETTINGS: push disabled and the stream window advertised; the
// optional settings are added only when they are non-zero or differ from
// the default.
:= []Setting{
{ID: SettingEnablePush, Val: 0},
{ID: SettingInitialWindowSize, Val: transportDefaultStreamFlow},
}
if := .maxFrameReadSize(); != 0 {
= append(, Setting{ID: SettingMaxFrameSize, Val: })
}
if := .maxHeaderListSize(); != 0 {
= append(, Setting{ID: SettingMaxHeaderListSize, Val: })
}
if != initialHeaderTableSize {
= append(, Setting{ID: SettingHeaderTableSize, Val: })
}
// Individual write errors are not checked here; they end up in werr, which
// is inspected once after the flush.
.bw.Write(clientPreface)
.fr.WriteSettings(...)
.fr.WriteWindowUpdate(0, transportDefaultConnFlow)
.inflow.init(transportDefaultConnFlow + initialWindowSize)
.bw.Flush()
if .werr != nil {
.Close()
return nil, .werr
}
go .readLoop()
return , nil
}
// healthCheck (name taken from the read loop's idle timer) sends a ping
// bounded by the transport's ping timeout. On failure it logs and closes the
// connection via closeForLostPing; on success it only logs.
func ( *ClientConn) () {
:= .t.pingTimeout()
, := context.WithTimeout(context.Background(), )
defer ()
.vlogf("http2: Transport sending health check")
:= .Ping()
if != nil {
.vlogf("http2: Transport health check failure: %v", )
.closeForLostPing()
} else {
.vlogf("http2: Transport health check success")
}
}
// Marks the connection as not reusable by setting doNotReuse under the
// connection mutex. NOTE(review): the method name is missing in this copy;
// presumably SetDoNotReuse.
func ( *ClientConn) () {
.mu.Lock()
defer .mu.Unlock()
.doNotReuse = true
}
// Records a received GOAWAY frame under the connection mutex. The frame
// replaces any previous one. The first non-empty debug data is kept, and a
// previous non-ErrCodeNo error code is carried over onto the new frame.
// Every active stream whose ID is above the frame's LastStreamID is aborted
// with errClientConnGotGoAway. NOTE(review): the method name is missing in
// this copy; presumably setGoAway.
func ( *ClientConn) ( *GoAwayFrame) {
.mu.Lock()
defer .mu.Unlock()
:= .goAway
.goAway =
if .goAwayDebug == "" {
.goAwayDebug = string(.DebugData())
}
if != nil && .ErrCode != ErrCodeNo {
.goAway.ErrCode = .ErrCode
}
:= .LastStreamID
for , := range .streams {
if > {
.abortStreamLocked(errClientConnGotGoAway)
}
}
}
// Reports whether the connection can take a new request: the locking
// wrapper around canTakeNewRequestLocked. NOTE(review): the method name is
// missing in this copy; presumably CanTakeNewRequest.
func ( *ClientConn) () bool {
.mu.Lock()
defer .mu.Unlock()
return .canTakeNewRequestLocked()
}
// Reserves a slot for a future request. Under the connection mutex, it
// returns false if the connection cannot take a new request; otherwise it
// increments streamsReserved and returns true. NOTE(review): the method name
// is missing in this copy; presumably ReserveNewRequest.
func ( *ClientConn) () bool {
.mu.Lock()
defer .mu.Unlock()
if := .idleStateLocked(); !.canTakeNewRequest {
return false
}
.streamsReserved++
return true
}
// ClientConnState is a snapshot of a ClientConn's state.
type ClientConnState struct {
// Closed mirrors the connection's closed flag.
Closed bool
// Closing is true when the connection is closing, is single-use, is marked
// do-not-reuse, or has received a GOAWAY.
Closing bool
// StreamsActive is the number of entries in the stream map.
StreamsActive int
// StreamsReserved is the number of reserved-but-unstarted requests.
StreamsReserved int
// StreamsPending is the number of requests waiting for a stream slot.
StreamsPending int
// MaxConcurrentStreams is the current limit, or zero until the peer's
// SETTINGS have been seen.
MaxConcurrentStreams uint32
// LastIdle is the time the connection last became idle.
LastIdle time.Time
}
// Returns a ClientConnState snapshot. The concurrent-stream limit is read
// under wmu and reported as zero until seenSettings is true; the remaining
// fields are read under mu. NOTE(review): the method name is missing in this
// copy; presumably State.
func ( *ClientConn) () ClientConnState {
.wmu.Lock()
:= .maxConcurrentStreams
if !.seenSettings {
= 0
}
.wmu.Unlock()
.mu.Lock()
defer .mu.Unlock()
return ClientConnState{
Closed: .closed,
Closing: .closing || .singleUse || .doNotReuse || .goAway != nil,
StreamsActive: len(.streams),
StreamsReserved: .streamsReserved,
StreamsPending: .pendingRequests,
LastIdle: .lastIdle,
MaxConcurrentStreams: ,
}
}
// clientConnIdleState is the result of evaluating whether a connection can
// accept another request.
type clientConnIdleState struct {
canTakeNewRequest bool
}
// Locking wrapper around idleStateLocked. NOTE(review): the method name is
// missing in this copy; presumably idleState.
func ( *ClientConn) () clientConnIdleState {
.mu.Lock()
defer .mu.Unlock()
return .idleStateLocked()
}
// idleStateLocked (name taken from its call sites) computes whether the
// connection can take a new request. It is called with mu held.
//
// A single-use connection that has already started a stream never can.
// Otherwise every one of these must hold:
//   - no GOAWAY received, not closed, not closing, not marked do-not-reuse;
//   - the concurrent-stream check passes (always true under
//     StrictMaxConcurrentStreams; otherwise active + reserved + 1 must not
//     exceed the limit);
//   - stream IDs, counting pending requests, stay below math.MaxInt32;
//   - the connection has not been idle for too long.
func ( *ClientConn) () ( clientConnIdleState) {
if .singleUse && .nextStreamID > 1 {
return
}
var bool
if .t.StrictMaxConcurrentStreams {
= true
} else {
= int64(len(.streams)+.streamsReserved+1) <= int64(.maxConcurrentStreams)
}
.canTakeNewRequest = .goAway == nil && !.closed && !.closing && &&
!.doNotReuse &&
int64(.nextStreamID)+2*int64(.pendingRequests) < math.MaxInt32 &&
!.tooIdleLocked()
return
}
// canTakeNewRequestLocked (name taken from its call sites) returns the
// canTakeNewRequest field of idleStateLocked. Called with mu held.
func ( *ClientConn) () bool {
:= .idleStateLocked()
return .canTakeNewRequest
}
// tooIdleLocked (name taken from its call site) reports whether an idle
// timeout is configured, the connection has a recorded idle time, and more
// than idleTimeout has elapsed since then. Round(0) strips the monotonic
// clock reading from lastIdle before the comparison.
func ( *ClientConn) () bool {
return .idleTimeout != 0 && !.lastIdle.IsZero() && time.Since(.lastIdle.Round(0)) > .idleTimeout
}
// onIdleTimeout (name taken from the idle timer set up in the connection
// constructor) is the idle timer callback; it closes the connection if it is
// idle.
func ( *ClientConn) () {
.closeIfIdle()
}
// closeConn (name taken from its call sites) closes the underlying network
// connection. A 250ms timer runs forceCloseConn if Close has not returned by
// then; the timer is stopped once Close returns.
func ( *ClientConn) () {
:= time.AfterFunc(250*time.Millisecond, .forceCloseConn)
defer .Stop()
.tconn.Close()
}
// forceCloseConn (name taken from the timer in closeConn) closes the
// connection underneath a *tls.Conn directly. It does nothing when tconn is
// not a *tls.Conn or when tlsUnderlyingConn returns nil.
func ( *ClientConn) () {
, := .tconn.(*tls.Conn)
if ! {
return
}
if := tlsUnderlyingConn(); != nil {
.Close()
}
}
// closeIfIdle (name taken from its call site) closes the connection only if
// it has no active and no reserved streams. The closed flag is set under mu;
// the actual close happens after the lock is released.
func ( *ClientConn) () {
.mu.Lock()
if len(.streams) > 0 || .streamsReserved > 0 {
.mu.Unlock()
return
}
.closed = true
// Captured under the lock for the log line below.
:= .nextStreamID
.mu.Unlock()
if VerboseLogs {
.vlogf("http2: Transport closing idle conn %p (forSingleUse=%v, maxStream=%v)", , .singleUse, -2)
}
.closeConn()
}
// Reports, under mu, whether the connection is marked do-not-reuse and has
// no active streams. NOTE(review): the method name is missing in this copy;
// presumably isDoNotReuseAndIdle.
func ( *ClientConn) () bool {
.mu.Lock()
defer .mu.Unlock()
return .doNotReuse && len(.streams) == 0
}
// shutdownEnterWaitStateHook is called just before the shutdown method below
// starts waiting; it is a no-op by default.
var shutdownEnterWaitStateHook = func() {}
// Gracefully shuts the connection down: sends GOAWAY, then waits until all
// streams have finished or the context is done. NOTE(review): the method
// name is missing in this copy; presumably Shutdown.
//
// A helper goroutine waits on the condition variable until the stream map is
// empty (or the conn is already closed), marks the conn closed and signals
// completion. If the context ends first, a cancel flag is set under mu and
// the condition variable is broadcast so the helper exits; the context error
// is returned.
func ( *ClientConn) ( context.Context) error {
if := .sendGoAway(); != nil {
return
}
:= make(chan struct{})
:= false
go func() {
.mu.Lock()
defer .mu.Unlock()
for {
if len(.streams) == 0 || .closed {
.closed = true
close()
break
}
if {
break
}
.cond.Wait()
}
}()
shutdownEnterWaitStateHook()
select {
case <-:
.closeConn()
return nil
case <-.Done():
.mu.Lock()
= true
.cond.Broadcast()
.mu.Unlock()
return .Err()
}
}
// sendGoAway (name taken from its call site) marks the connection as closing
// and sends a GOAWAY frame with ErrCodeNo. If the connection was already
// closing, nothing is sent and nil is returned. Write and flush errors are
// returned.
func ( *ClientConn) () error {
.mu.Lock()
:= .closing
.closing = true
:= .nextStreamID
.mu.Unlock()
if {
return nil
}
.wmu.Lock()
defer .wmu.Unlock()
if := .fr.WriteGoAway(, ErrCodeNo, nil); != nil {
return
}
if := .bw.Flush(); != nil {
return
}
return nil
}
// closeForError (name taken from its call sites) tears the connection down
// because of an error. Under mu it marks the conn closed, aborts every
// active stream and wakes all waiters; it then closes the network
// connection.
func ( *ClientConn) ( error) {
.mu.Lock()
.closed = true
for , := range .streams {
.abortStreamLocked()
}
.cond.Broadcast()
.mu.Unlock()
.closeConn()
}
// Close (name taken from its call sites) force-closes the connection. It
// goes through closeForError with a fixed "force closed" error and always
// returns nil.
func ( *ClientConn) () error {
:= errors.New("http2: client connection force closed via ClientConn.Close")
.closeForError()
return nil
}
// closeForLostPing (name taken from its call site in the health check)
// closes the connection with a "connection lost" error. It first reports
// "conn_close_lost_ping" through the transport's CountError hook when that
// is set.
func ( *ClientConn) () {
:= errors.New("http2: client connection lost")
if := .t.CountError; != nil {
("conn_close_lost_ping")
}
.closeForError()
}
// errRequestCanceled is returned when the request's Cancel channel fires.
// The text uses the "net/http:" prefix rather than "http2:".
var errRequestCanceled = errors.New("net/http: request canceled")
// commaSeparatedTrailers (name taken from its call site) returns the
// request's trailer keys, canonicalized, sorted and joined with commas. It
// returns "" when there are no trailers. The keys "Transfer-Encoding",
// "Trailer" and "Content-Length" are rejected with an error.
func ( *http.Request) (string, error) {
:= make([]string, 0, len(.Trailer))
for := range .Trailer {
= canonicalHeader()
switch {
case "Transfer-Encoding", "Trailer", "Content-Length":
return "", fmt.Errorf("invalid Trailer key %q", )
}
= append(, )
}
if len() > 0 {
// Map iteration order is random; sort for deterministic output.
sort.Strings()
return strings.Join(, ","), nil
}
return "", nil
}
// responseHeaderTimeout (name taken from its call site) returns the wrapped
// HTTP/1 transport's ResponseHeaderTimeout, or zero when there is no wrapped
// transport.
func ( *ClientConn) () time.Duration {
if .t.t1 != nil {
return .t.t1.ResponseHeaderTimeout
}
return 0
}
// checkConnHeaders (name taken from its call site) rejects request headers
// this transport does not accept:
//   - any non-empty Upgrade header;
//   - Transfer-Encoding unless it is a single value that is "" or "chunked";
//   - Connection unless it is a single value that is "", "close" or
//     "keep-alive" (compared case-insensitively).
func ( *http.Request) error {
if := .Header.Get("Upgrade"); != "" {
return fmt.Errorf("http2: invalid Upgrade request header: %q", .Header["Upgrade"])
}
if := .Header["Transfer-Encoding"]; len() > 0 && (len() > 1 || [0] != "" && [0] != "chunked") {
return fmt.Errorf("http2: invalid Transfer-Encoding request header: %q", )
}
if := .Header["Connection"]; len() > 0 && (len() > 1 || [0] != "" && !asciiEqualFold([0], "close") && !asciiEqualFold([0], "keep-alive")) {
return fmt.Errorf("http2: invalid Connection request header: %q", )
}
return nil
}
// actualContentLength (name taken from its call sites) returns the request
// body length to use: 0 when there is no body (nil or http.NoBody), the
// declared ContentLength when it is non-zero, and -1 (unknown) when a body
// is present but ContentLength is zero.
func ( *http.Request) int64 {
if .Body == nil || .Body == http.NoBody {
return 0
}
if .ContentLength != 0 {
return .ContentLength
}
return -1
}
// decrStreamReservations (name taken from its call site) is the locking
// wrapper around decrStreamReservationsLocked.
func ( *ClientConn) () {
.mu.Lock()
defer .mu.Unlock()
.decrStreamReservationsLocked()
}
// decrStreamReservationsLocked (name taken from its call sites) releases one
// stream reservation and never goes below zero. Called with mu held.
func ( *ClientConn) () {
if .streamsReserved > 0 {
.streamsReserved--
}
}
// RoundTrip (name taken from its call site in the transport's retry loop)
// performs one request on this connection and returns once response headers
// are available or the request fails.
//
// The request is written by a separate goroutine (doRequest). This function
// waits for response headers, a stream abort, context cancellation, or the
// legacy Cancel channel.
func ( *ClientConn) ( *http.Request) (*http.Response, error) {
:= .Context()
:= &clientStream{
cc: ,
ctx: ,
reqCancel: .Cancel,
isHead: .Method == "HEAD",
reqBody: .Body,
reqBodyContentLength: actualContentLength(),
trace: httptrace.ContextClientTrace(),
peerClosed: make(chan struct{}),
abort: make(chan struct{}),
respHeaderRecv: make(chan struct{}),
donec: make(chan struct{}),
}
go .doRequest()
// Helper: block until the request goroutine has finished cleanup, or the
// request is cancelled.
:= func() error {
select {
case <-.donec:
return nil
case <-.Done():
return .Err()
case <-.reqCancel:
return errRequestCanceled
}
}
// Helper: finalize and return the response once headers have arrived.
:= func() (*http.Response, error) {
:= .res
// Any status above 299 stops an in-progress request body write.
if .StatusCode > 299 {
.abortRequestBodyWrite()
}
.Request =
.TLS = .tlsState
// With neither a response body nor a request body, wait for the stream to
// be fully done before returning.
if .Body == noBody && actualContentLength() == 0 {
if := (); != nil {
return nil,
}
}
return , nil
}
for {
select {
case <-.respHeaderRecv:
return ()
case <-.abort:
// Headers may have arrived at the same time as the abort; prefer returning
// the response in that case.
select {
case <-.respHeaderRecv:
return ()
default:
()
return nil, .abortErr
}
case <-.Done():
:= .Err()
.abortStream()
return nil,
case <-.reqCancel:
.abortStream(errRequestCanceled)
return nil, errRequestCanceled
}
}
}
// doRequest (name taken from its call site) runs in its own goroutine. It
// writes the request and then always runs the cleanup step with the write's
// result.
func ( *clientStream) ( *http.Request) {
:= .writeRequest()
.cleanupWriteRequest()
}
// writeRequest (name taken from its call site) sends the request headers and
// body, then waits until the peer finishes the stream or the request is
// aborted, cancelled, or times out waiting for response headers.
//
// Steps:
//  1. Validate connection-level headers.
//  2. Acquire reqHeaderMu (a channel used as a lock), giving up on cancel.
//  3. Under mu: stop the idle timer, release one reservation, wait for a
//     stream slot and register the stream.
//  4. Decide whether to request gzip and whether to wait for 100-continue.
//  5. Write headers, release reqHeaderMu, then write the body if there is
//     one.
//  6. Wait for the outcome.
func ( *clientStream) ( *http.Request) ( error) {
:= .cc
:= .ctx
if := checkConnHeaders(); != nil {
return
}
if .reqHeaderMu == nil {
panic("RoundTrip on uninitialized ClientConn")
}
select {
case .reqHeaderMu <- struct{}{}:
case <-.reqCancel:
return errRequestCanceled
case <-.Done():
return .Err()
}
.mu.Lock()
if .idleTimer != nil {
.idleTimer.Stop()
}
.decrStreamReservationsLocked()
if := .awaitOpenSlotForStreamLocked(); != nil {
.mu.Unlock()
<-.reqHeaderMu
return
}
.addStreamLocked()
if isConnectionCloseRequest() {
.doNotReuse = true
}
.mu.Unlock()
// Ask for gzip only when compression is enabled, the caller set neither
// Accept-Encoding nor Range, and the request is not HEAD.
if !.t.disableCompression() &&
.Header.Get("Accept-Encoding") == "" &&
.Header.Get("Range") == "" &&
!.isHead {
.requestedGzip = true
}
// The continue timeout only applies to requests that carry
// "Expect: 100-continue".
:= .t.expectContinueTimeout()
if != 0 {
if !httpguts.HeaderValuesContainsToken(.Header["Expect"], "100-continue") {
= 0
} else {
.on100 = make(chan struct{}, 1)
}
}
= .encodeAndWriteHeaders()
<-.reqHeaderMu
if != nil {
return
}
:= .reqBodyContentLength != 0
if ! {
.sentEndStream = true
} else {
if != 0 {
// Wait for 100 Continue, the timeout, or an abort/cancel before sending the
// body. The timeout and the 100 both mean "proceed".
traceWait100Continue(.trace)
:= time.NewTimer()
select {
case <-.C:
= nil
case <-.on100:
= nil
case <-.abort:
= .abortErr
case <-.Done():
= .Err()
case <-.reqCancel:
= errRequestCanceled
}
.Stop()
if != nil {
traceWroteRequest(.trace, )
return
}
}
// errStopReqBodyWrite is not treated as a failure here: the function falls
// through to the wait loop without marking the stream as ended.
if = .writeRequestBody(); != nil {
if != errStopReqBodyWrite {
traceWroteRequest(.trace, )
return
}
} else {
.sentEndStream = true
}
}
traceWroteRequest(.trace, )
// The response-header timer is armed only when a timeout is configured;
// otherwise both channels stay nil and their select cases never fire.
var <-chan time.Time
var chan struct{}
if := .responseHeaderTimeout(); != 0 {
:= time.NewTimer()
defer .Stop()
= .C
= .respHeaderRecv
}
for {
select {
case <-.peerClosed:
return nil
case <-:
return errTimeout
case <-:
// Headers arrived: disable both the timer case and this case.
= nil
= nil
case <-.abort:
return .abortErr
case <-.Done():
return .Err()
case <-.reqCancel:
return errRequestCanceled
}
}
}
// encodeAndWriteHeaders (name taken from its call site) HPACK-encodes the
// request headers and writes them as HEADERS/CONTINUATION frames, holding
// wmu throughout. It returns early, without writing, if the stream was
// already aborted or the request cancelled.
func ( *clientStream) ( *http.Request) error {
:= .cc
:= .ctx
.wmu.Lock()
defer .wmu.Unlock()
// Non-blocking check: don't write headers for a request that is already
// dead.
select {
case <-.abort:
return .abortErr
case <-.Done():
return .Err()
case <-.reqCancel:
return errRequestCanceled
default:
}
, := commaSeparatedTrailers()
if != nil {
return
}
:= != ""
:= actualContentLength()
:= != 0
, := .encodeHeaders(, .requestedGzip, , )
if != nil {
return
}
// END_STREAM goes on the HEADERS frame only when there is neither a body
// nor trailers to follow.
:= ! && !
.sentHeaders = true
= .writeHeaders(.ID, , int(.maxFrameSize), )
traceWroteHeaders(.trace)
return
}
// cleanupWriteRequest (name taken from its call site) finishes a request
// after the write path returns with the given error (possibly nil).
//
// It releases the reservation if no stream ID was ever assigned, closes the
// request body and waits for that close to complete, resets the stream when
// necessary, forgets the stream ID, closes the whole connection if a write
// error is recorded, and finally closes donec.
func ( *clientStream) ( error) {
:= .cc
if .ID == 0 {
.decrStreamReservations()
}
// Decide under the lock whether this call is the one that closes the body;
// the Close itself runs outside the lock.
.mu.Lock()
:= false
if .reqBody != nil && .reqBodyClosed == nil {
= true
.reqBodyClosed = make(chan struct{})
}
:= .reqBodyClosed
.mu.Unlock()
if {
.reqBody.Close()
close()
}
// Wait for any body close, including one started elsewhere, to finish.
if != nil {
<-
}
// If the full request was sent and the peer already closed the stream, the
// error is dropped.
if != nil && .sentEndStream {
select {
case <-.peerClosed:
= nil
default:
}
}
if != nil {
.abortStream()
if .sentHeaders {
// Send RST_STREAM unless the error came from the peer. A StreamError keeps
// its own code; anything else uses ErrCodeCancel.
if , := .(StreamError); {
if .Cause != errFromPeer {
.writeStreamReset(.ID, .Code, )
}
} else {
.writeStreamReset(.ID, ErrCodeCancel, )
}
}
.bufPipe.CloseWithError()
} else {
// Success, but the request was not fully sent: reset the stream with
// ErrCodeNo.
if .sentHeaders && !.sentEndStream {
.writeStreamReset(.ID, ErrCodeNo, nil)
}
.bufPipe.CloseWithError(errRequestCanceled)
}
if .ID != 0 {
.forgetStreamID(.ID)
}
.wmu.Lock()
:= .werr
.wmu.Unlock()
if != nil {
.Close()
}
close(.donec)
}
// awaitOpenSlotForStreamLocked (name taken from its call site) blocks, with
// mu held, until the connection has room for another stream. It returns
// errClientConnUnusable if the connection is closed or can no longer take
// requests, and the stream's abort error if the stream is aborted while
// waiting. pendingRequests is incremented for the duration of each wait.
func ( *ClientConn) ( *clientStream) error {
for {
.lastActive = time.Now()
if .closed || !.canTakeNewRequestLocked() {
return errClientConnUnusable
}
.lastIdle = time.Time{}
if int64(len(.streams)) < int64(.maxConcurrentStreams) {
return nil
}
.pendingRequests++
.cond.Wait()
.pendingRequests--
select {
case <-.abort:
return .abortErr
default:
}
}
}
// writeHeaders (name taken from its call sites) writes an encoded header
// block, split into chunks of at most the given size. The first chunk goes
// out as a HEADERS frame (carrying the END_STREAM flag argument) and the
// rest as CONTINUATION frames; the last chunk is marked END_HEADERS. Writing
// stops early once a sticky write error is recorded. The writer is flushed
// and the sticky error returned. The callers in this file hold wmu.
func ( *ClientConn) ( uint32, bool, int, []byte) error {
:= true
for len() > 0 && .werr == nil {
:=
if len() > {
= [:]
}
= [len():]
:= len() == 0
if {
.fr.WriteHeaders(HeadersFrameParam{
StreamID: ,
BlockFragment: ,
EndStream: ,
EndHeaders: ,
})
= false
} else {
.fr.WriteContinuation(, , )
}
}
.bw.Flush()
return .werr
}
// Internal errors used while writing a request body.
var (
// errStopReqBodyWrite signals that the body write was stopped because the
// body is being closed; callers do not treat it as a request failure.
errStopReqBodyWrite = errors.New("http2: aborting request body write")
// errStopReqBodyWriteAndCancel is not referenced in the visible part of the
// file.
errStopReqBodyWriteAndCancel = errors.New("http2: canceling request")
// errReqBodyTooLong is returned when the body yields more bytes than the
// declared content length.
errReqBodyTooLong = errors.New("http2: request body larger than specified content length")
)
// frameScratchBufferLen (name taken from its call site) chooses the scratch
// buffer size for reading the request body. It starts from the int argument
// (the caller passes the max frame size), caps it at 512 KiB, shrinks it to
// content length + 1 when the length is known and smaller, and never returns
// less than 1.
func ( *clientStream) ( int) int {
const = 512 << 10
:= int64()
if > {
=
}
// One byte beyond the declared length leaves room to observe an over-long
// body.
if := .reqBodyContentLength; != -1 && +1 < {
= + 1
}
if < 1 {
return 1
}
return int()
}
// bufPool holds *[]byte scratch buffers for reading request bodies.
var bufPool sync.Pool
// writeRequestBody (name taken from its call site) streams the request body
// as DATA frames, respecting flow control, and finishes the stream with
// either END_STREAM on the last DATA frame or a trailers HEADERS frame.
//
// It returns errReqBodyTooLong when the body exceeds the declared content
// length, and errStopReqBodyWrite when a read fails after the body has
// started closing.
func ( *clientStream) ( *http.Request) ( error) {
:= .cc
:= .reqBody
:= false
:= .Trailer != nil
:= .reqBodyContentLength
:= != -1
.mu.Lock()
:= int(.maxFrameSize)
.mu.Unlock()
:= .frameScratchBufferLen()
// Reuse a pooled buffer when it is large enough; otherwise allocate one and
// put it in the pool on return.
var []byte
if , := bufPool.Get().(*[]byte); && len(*) >= {
defer bufPool.Put()
= *
} else {
= make([]byte, )
defer bufPool.Put(&)
}
var bool
for ! {
, := .Read()
if {
-= int64()
// When exactly the declared length has been read without an error, probe
// with a one-byte read: it surfaces EOF now, or detects an over-long body.
if == 0 && == nil {
var [1]byte
var int
, = .Read([:])
-= int64()
}
if < 0 {
= errReqBodyTooLong
return
}
}
if != nil {
.mu.Lock()
:= .reqBodyClosed != nil
.mu.Unlock()
switch {
case :
return errStopReqBodyWrite
case == io.EOF:
= true
= nil
default:
return
}
}
// Send what was read, in pieces no larger than flow control allows.
:= [:]
for len() > 0 && == nil {
var int32
, = .awaitFlowControl(len())
if != nil {
return
}
.wmu.Lock()
:= [:]
= [:]
// END_STREAM only on the very last piece, and only when no trailers follow.
= && len() == 0 && !
= .fr.WriteData(.ID, , )
if == nil {
= .bw.Flush()
}
.wmu.Unlock()
}
if != nil {
return
}
}
if {
return nil
}
// END_STREAM has not been sent yet. Re-read the trailers now, since the
// caller may have filled them in while the body was being read.
.mu.Lock()
:= .Trailer
= .abortErr
.mu.Unlock()
if != nil {
return
}
.wmu.Lock()
defer .wmu.Unlock()
var []byte
if len() > 0 {
, = .encodeTrailers()
if != nil {
return
}
}
// Finish with trailers if there are any, otherwise an empty DATA frame with
// END_STREAM.
if len() > 0 {
= .writeHeaders(.ID, true, , )
} else {
= .fr.WriteData(.ID, true, nil)
}
if := .bw.Flush(); != nil && == nil {
=
}
return
}
// awaitFlowControl (name taken from its call site) blocks until some
// outbound flow-control window is available and takes up to the requested
// number of bytes, capped at the connection's max frame size. It returns the
// number of bytes taken.
//
// It fails with errClientConnClosed if the connection is closed, with
// errStopReqBodyWrite if the request body is being closed, and with the
// abort or cancel error if the stream is aborted or the request cancelled.
func ( *clientStream) ( int) ( int32, error) {
:= .cc
:= .ctx
.mu.Lock()
defer .mu.Unlock()
for {
if .closed {
return 0, errClientConnClosed
}
if .reqBodyClosed != nil {
return 0, errStopReqBodyWrite
}
select {
case <-.abort:
return 0, .abortErr
case <-.Done():
return 0, .Err()
case <-.reqCancel:
return 0, errRequestCanceled
default:
}
if := .flow.available(); > 0 {
:=
if int() > {
= int32()
}
if > int32(.maxFrameSize) {
= int32(.maxFrameSize)
}
.flow.take()
return , nil
}
.cond.Wait()
}
}
// errNilRequestURL is returned when a request has no URL.
var errNilRequestURL = errors.New("http2: Request.URI is nil")
// encodeHeaders (name taken from its call site) HPACK-encodes the request's
// header block into the connection's shared buffer and returns the encoded
// bytes. The arguments are the request, whether to add
// "accept-encoding: gzip", the comma-separated trailer names, and the
// content length.
//
// Validation happens before anything is encoded, because encoding mutates
// shared HPACK state. Header enumeration is done twice through the same
// closure: once to measure the total size against the peer's limit, and once
// to actually encode.
func ( *ClientConn) ( *http.Request, bool, string, int64) ([]byte, error) {
.hbuf.Reset()
if .URL == nil {
return nil, errNilRequestURL
}
// Prefer the request's Host field, falling back to the URL's host.
:= .Host
if == "" {
= .URL.Host
}
, := httpguts.PunycodeHostPort()
if != nil {
return nil,
}
// CONNECT requests carry no :path. For everything else the path must
// validate, after stripping a leading scheme://host prefix if needed.
var string
if .Method != "CONNECT" {
= .URL.RequestURI()
if !validPseudoPath() {
:=
= strings.TrimPrefix(, .URL.Scheme+"://"+)
if !validPseudoPath() {
if .URL.Opaque != "" {
return nil, fmt.Errorf("invalid request :path %q from URL.Opaque = %q", , .URL.Opaque)
} else {
return nil, fmt.Errorf("invalid request :path %q", )
}
}
}
}
// Reject invalid header names and values up front.
for , := range .Header {
if !httpguts.ValidHeaderFieldName() {
return nil, fmt.Errorf("invalid HTTP header name %q", )
}
for , := range {
if !httpguts.ValidHeaderFieldValue() {
return nil, fmt.Errorf("invalid HTTP header value for header %q", )
}
}
}
// Closure that enumerates every header field to send, pseudo-headers first,
// calling the supplied function for each name/value pair.
:= func( func(, string)) {
(":authority", )
:= .Method
if == "" {
= http.MethodGet
}
(":method", )
if .Method != "CONNECT" {
(":path", )
(":scheme", .URL.Scheme)
}
if != "" {
("trailer", )
}
var bool
for , := range .Header {
if asciiEqualFold(, "host") || asciiEqualFold(, "content-length") {
// Skipped here: the host is sent as :authority above and content-length is
// added below.
continue
} else if asciiEqualFold(, "connection") ||
asciiEqualFold(, "proxy-connection") ||
asciiEqualFold(, "transfer-encoding") ||
asciiEqualFold(, "upgrade") ||
asciiEqualFold(, "keep-alive") {
// Connection-specific headers are dropped.
continue
} else if asciiEqualFold(, "user-agent") {
// Send at most one User-Agent value. An explicitly empty one suppresses the
// header entirely, including the default.
= true
if len() < 1 {
continue
}
= [:1]
if [0] == "" {
continue
}
} else if asciiEqualFold(, "cookie") {
// Split each Cookie value on ';' into separate "cookie" fields, skipping
// spaces after each semicolon.
for , := range {
for {
:= strings.IndexByte(, ';')
if < 0 {
break
}
("cookie", [:])
++
for +1 <= len() && [] == ' ' {
++
}
= [:]
}
if len() > 0 {
("cookie", )
}
}
continue
}
for , := range {
(, )
}
}
if shouldSendReqContentLength(.Method, ) {
("content-length", strconv.FormatInt(, 10))
}
if {
("accept-encoding", "gzip")
}
if ! {
("user-agent", defaultUserAgent)
}
}
// First pass: measure the header list size and enforce the peer's limit.
:= uint64(0)
(func(, string) {
:= hpack.HeaderField{Name: , Value: }
+= uint64(.Size())
})
if > .peerMaxHeaderListSize {
return nil, errRequestHeaderListSize
}
:= httptrace.ContextClientTrace(.Context())
:= traceHasWroteHeaderField()
// Second pass: lower-case and encode each field. Names that lowerHeader
// rejects are skipped.
(func(, string) {
, := lowerHeader()
if ! {
return
}
.writeHeader(, )
if {
traceWroteHeaderField(, , )
}
})
return .hbuf.Bytes(), nil
}
// shouldSendReqContentLength (name taken from its call site) reports whether
// a content-length header should be sent. A positive length always sends it,
// a negative (unknown) length never does, and a zero length sends it only
// for POST, PUT and PATCH.
func ( string, int64) bool {
if > 0 {
return true
}
if < 0 {
return false
}
switch {
case "POST", "PUT", "PATCH":
return true
default:
return false
}
}
// encodeTrailers (name taken from its call site) HPACK-encodes the given
// trailer header into the connection's shared buffer and returns the encoded
// bytes. The total field size is checked against the peer's header list
// limit before anything is encoded. Names that lowerHeader rejects are
// skipped.
func ( *ClientConn) ( http.Header) ([]byte, error) {
.hbuf.Reset()
:= uint64(0)
for , := range {
for , := range {
:= hpack.HeaderField{Name: , Value: }
+= uint64(.Size())
}
}
if > .peerMaxHeaderListSize {
return nil, errRequestHeaderListSize
}
for , := range {
, := lowerHeader()
if ! {
continue
}
for , := range {
.writeHeader(, )
}
}
return .hbuf.Bytes(), nil
}
// writeHeader (name taken from its call sites) encodes one header field with
// the connection's HPACK encoder, logging it first when VerboseLogs is set.
func ( *ClientConn) (, string) {
if VerboseLogs {
log.Printf("http2: Transport encoding header %q = %q", , )
}
.henc.WriteField(hpack.HeaderField{Name: , Value: })
}
// resAndError pairs a response with an error. The blank incomparable field
// is presumably there to forbid comparing values with == (the incomparable
// type is declared elsewhere).
type resAndError struct {
_ incomparable
res *http.Response
err error
}
// addStreamLocked (name taken from its call site) registers a new stream on
// the connection. It initialises the stream's flow-control windows, assigns
// the next stream ID (advancing the counter by 2), and records the stream in
// the stream map. Called with mu held. It panics if the assigned ID is 0.
func ( *ClientConn) ( *clientStream) {
.flow.add(int32(.initialWindowSize))
.flow.setConnFlow(&.flow)
.inflow.init(transportDefaultStreamFlow)
.ID = .nextStreamID
.nextStreamID += 2
.streams[.ID] =
if .ID == 0 {
panic("assigned stream ID 0")
}
}
// forgetStreamID (name taken from its call site) removes a finished stream
// from the connection and panics if the ID was not present.
//
// When the last stream goes away and an idle timer exists, the timer is
// re-armed and the idle time recorded. Waiters on the condition variable are
// woken, since a stream slot has been freed. If the connection should not be
// kept (single-use, do-not-reuse, keep-alives disabled, or GOAWAY received)
// and nothing is active or reserved, it is marked closed and the network
// connection is closed via a deferred call, which runs after the mutex is
// released.
func ( *ClientConn) ( uint32) {
.mu.Lock()
:= len(.streams)
delete(.streams, )
if len(.streams) != -1 {
panic("forgetting unknown stream id")
}
.lastActive = time.Now()
if len(.streams) == 0 && .idleTimer != nil {
.idleTimer.Reset(.idleTimeout)
.lastIdle = time.Now()
}
.cond.Broadcast()
:= .singleUse || .doNotReuse || .t.disableKeepAlives() || .goAway != nil
if && .streamsReserved == 0 && len(.streams) == 0 {
if VerboseLogs {
.vlogf("http2: Transport closing idle conn %p (forSingleUse=%v, maxStream=%v)", , .singleUse, .nextStreamID-2)
}
.closed = true
defer .closeConn()
}
.mu.Unlock()
}
// clientConnReadLoop is the state of the goroutine that reads frames from a
// ClientConn.
type clientConnReadLoop struct {
_ incomparable
cc *ClientConn
}
// readLoop (name taken from the connection constructor, which starts it in a
// goroutine) runs the frame-reading loop until it fails and records the
// error in readerErr. If the error is a ConnectionError, a GOAWAY carrying
// that error code is written to the peer. Cleanup is deferred and therefore
// runs last.
func ( *ClientConn) () {
:= &clientConnReadLoop{cc: }
defer .cleanup()
.readerErr = .run()
if , := .readerErr.(ConnectionError); {
.wmu.Lock()
.fr.WriteGoAway(0, ErrCode(), nil)
.wmu.Unlock()
}
}
// GoAwayError is the error given to streams when the connection ends with
// EOF or a network read error after the server has sent GOAWAY. Its fields
// are copied from the recorded GOAWAY state.
type GoAwayError struct {
LastStreamID uint32
ErrCode ErrCode
DebugData string
}
// Formats the GOAWAY details (last stream ID, error code, debug data) as a
// string. NOTE(review): the method name is missing in this copy; presumably
// Error, making GoAwayError satisfy the error interface.
func ( GoAwayError) () string {
return fmt.Sprintf("http2: server sent GOAWAY and closed the connection; LastStreamID=%v, ErrCode=%v, debug=%q",
.LastStreamID, .ErrCode, .DebugData)
}
// isEOFOrNetReadError (name taken from its call site) reports whether the
// error is io.EOF or a *net.OpError whose Op is "read".
func ( error) bool {
if == io.EOF {
return true
}
, := .(*net.OpError)
return && .Op == "read"
}
// cleanup (name taken from the deferred call in the read loop entry point)
// runs when the read loop exits. It marks the connection dead in the pool,
// stops the idle timer, and aborts every stream the peer has not already
// closed. Closing readerDone and closing the network connection are deferred
// and so happen at the end.
//
// The error given to streams is a GoAwayError when a GOAWAY was received and
// the read error is EOF or a network read error. A bare io.EOF becomes
// io.ErrUnexpectedEOF. Anything else is passed through unchanged.
func ( *clientConnReadLoop) () {
:= .cc
.t.connPool().MarkDead()
defer .closeConn()
defer close(.readerDone)
if .idleTimer != nil {
.idleTimer.Stop()
}
:= .readerErr
.mu.Lock()
if .goAway != nil && isEOFOrNetReadError() {
= GoAwayError{
LastStreamID: .goAway.LastStreamID,
ErrCode: .goAway.ErrCode,
DebugData: .goAwayDebug,
}
} else if == io.EOF {
= io.ErrUnexpectedEOF
}
.closed = true
for , := range .streams {
// Streams whose peer side has already closed are left alone.
select {
case <-.peerClosed:
default:
.abortStreamLocked()
}
}
.cond.Broadcast()
.mu.Unlock()
}
// countReadFrameError (name taken from its call site) reports a frame-read
// error through the transport's CountError hook. It does nothing when the
// hook or the error is nil. Otherwise it classifies the error as a
// connection error (with its code token), EOF, unexpected EOF,
// frame-too-large, or "other".
func ( *ClientConn) ( error) {
:= .t.CountError
if == nil || == nil {
return
}
if , := .(ConnectionError); {
:= ErrCode()
(fmt.Sprintf("read_frame_conn_error_%s", .stringToken()))
return
}
if errors.Is(, io.EOF) {
("read_frame_eof")
return
}
if errors.Is(, io.ErrUnexpectedEOF) {
("read_frame_unexpected_eof")
return
}
if errors.Is(, ErrFrameTooLarge) {
("read_frame_too_large")
return
}
("read_frame_other")
}
// run (name taken from its call site) is the main frame-reading loop. It
// reads frames until a read or processing error occurs and returns that
// error.
//
// When ReadIdleTimeout is set, a health-check timer is armed and reset after
// every read attempt. A StreamError from the framer ends only the affected
// stream and the loop continues. The first frame must be SETTINGS; anything
// else is a protocol error.
func ( *clientConnReadLoop) () error {
:= .cc
:= false
:= .t.ReadIdleTimeout
var *time.Timer
if != 0 {
= time.AfterFunc(, .healthCheck)
defer .Stop()
}
for {
, := .fr.ReadFrame()
if != nil {
.Reset()
}
if != nil {
.vlogf("http2: Transport readFrame error on conn %p: (%T) %v", , , )
}
if , := .(StreamError); {
if := .streamByID(.StreamID); != nil {
// Attach the framer's error detail when the error carries no cause of its
// own.
if .Cause == nil {
.Cause = .fr.errDetail
}
.endStreamError(, )
}
continue
} else if != nil {
.countReadFrameError()
return
}
if VerboseLogs {
.vlogf("http2: Transport received %s", summarizeFrame())
}
if ! {
if , := .(*SettingsFrame); ! {
.logf("protocol error: received %T before a SETTINGS frame", )
return ConnectionError(ErrCodeProtocol)
}
= true
}
// Dispatch on frame type; unknown types are logged and ignored.
switch f := .(type) {
case *MetaHeadersFrame:
= .processHeaders()
case *DataFrame:
= .processData()
case *GoAwayFrame:
= .processGoAway()
case *RSTStreamFrame:
= .processResetStream()
case *SettingsFrame:
= .processSettings()
case *PushPromiseFrame:
= .processPushPromise()
case *WindowUpdateFrame:
= .processWindowUpdate()
case *PingFrame:
= .processPing()
default:
.logf("Transport: unhandled response frame type %T", )
}
if != nil {
if VerboseLogs {
.vlogf("http2: Transport conn %p received error from processing frame %v: %v", , summarizeFrame(), )
}
return
}
}
}
// processHeaders (name taken from the frame dispatch in the read loop)
// handles a decoded HEADERS frame.
//
// Frames for unknown streams are ignored. Headers after END_STREAM end the
// stream with a protocol error. The first header block on a stream is
// treated as the response headers; later blocks are handed to
// processTrailers. A ConnectionError from handleResponse is returned, which
// terminates the read loop; any other error only ends this stream. A nil
// response with a nil error is ignored. Otherwise the response is published
// on the stream and respHeaderRecv is closed.
func ( *clientConnReadLoop) ( *MetaHeadersFrame) error {
:= .streamByID(.StreamID)
if == nil {
return nil
}
if .readClosed {
.endStreamError(, StreamError{
StreamID: .StreamID,
Code: ErrCodeProtocol,
Cause: errors.New("protocol error: headers after END_STREAM"),
})
return nil
}
if !.firstByte {
if .trace != nil {
traceFirstResponseByte(.trace)
}
.firstByte = true
}
if !.pastHeaders {
.pastHeaders = true
} else {
return .processTrailers(, )
}
, := .handleResponse(, )
if != nil {
if , := .(ConnectionError); {
return
}
.endStreamError(, StreamError{
StreamID: .StreamID,
Code: ErrCodeProtocol,
Cause: ,
})
return nil
}
if == nil {
return nil
}
.resTrailer = &.Trailer
.res =
close(.respHeaderRecv)
if .StreamEnded() {
.endStream()
}
return nil
}
// Builds an *http.Response from a response HEADERS frame.
//
// It returns an error for a truncated header list, a missing or non-numeric
// "status" pseudo header, a 1xx response carrying END_STREAM, more than five
// 1xx responses on one stream, or an error from the 1xx trace callback.
// For an accepted 1xx response it returns (nil, nil) after clearing
// pastHeaders, so the next HEADERS frame is again treated as response headers.
func ( *clientConnReadLoop) ( *clientStream, *MetaHeadersFrame) (*http.Response, error) {
if .Truncated {
return nil, errResponseHeaderListSize
}
:= .PseudoValue("status")
if == "" {
return nil, errors.New("malformed response from server: missing status pseudo header")
}
, := strconv.Atoi()
if != nil {
return nil, errors.New("malformed response from server: malformed non-numeric status pseudo header")
}
:= .RegularFields()
// One []string with an element per regular field; see the note in the loop
// below for how it is consumed.
:= make([]string, len())
:= make(http.Header, len())
:= &http.Response{
Proto: "HTTP/2.0",
ProtoMajor: 2,
Header: ,
StatusCode: ,
Status: + " " + http.StatusText(),
}
for , := range {
:= canonicalHeader(.Name)
if == "Trailer" {
// A "Trailer" header announces trailer keys: each listed element is
// entered into the response's Trailer map with a nil value.
:= .Trailer
if == nil {
= make(http.Header)
.Trailer =
}
foreachHeaderElement(.Value, func( string) {
[canonicalHeader()] = nil
})
} else {
:= []
// NOTE(review): appears to carve a one-element, capacity-one slice off
// the preallocated []string for the first value of a key, so that no
// per-key slice is allocated; later values for the same key use append.
// Operand names are missing — confirm.
if == nil && len() > 0 {
, = [:1:1], [1:]
[0] = .Value
[] =
} else {
[] = append(, .Value)
}
}
}
// Informational (1xx) responses are reported but not returned as a response.
if >= 100 && <= 199 {
if .StreamEnded() {
return nil, errors.New("1xx informational response with END_STREAM flag")
}
.num1xx++
// Upper bound on the number of 1xx responses accepted per stream.
const = 5
if .num1xx > {
return nil, errors.New("http2: too many 1xx informational responses")
}
if := .get1xxTraceFunc(); != nil {
if := (, textproto.MIMEHeader()); != nil {
return nil,
}
}
if == 100 {
traceGot100Continue(.trace)
// Non-blocking notification on the stream's on100 channel.
select {
case .on100 <- struct{}{}:
default:
}
}
// Allow the next HEADERS frame to be processed as response headers.
.pastHeaders = false
return nil, nil
}
// ContentLength stays -1 (unknown) unless exactly one parsable
// Content-Length value is present, or the stream ends with the headers on a
// non-HEAD request.
.ContentLength = -1
if := .Header["Content-Length"]; len() == 1 {
// Parsing with bitSize 63 guarantees the value fits in an int64.
if , := strconv.ParseUint([0], 10, 63); == nil {
.ContentLength = int64()
} else {
// No-op: an unparsable Content-Length leaves ContentLength at -1.
}
} else if len() > 1 {
// No-op: multiple Content-Length values leave ContentLength at -1.
} else if .StreamEnded() && !.isHead {
.ContentLength = 0
}
// HEAD responses never carry a body.
if .isHead {
.Body = noBody
return , nil
}
// The stream ended with the headers: there is no body to read. If a
// positive Content-Length was declared, use missingBody, whose reads fail
// with io.ErrUnexpectedEOF.
if .StreamEnded() {
if .ContentLength > 0 {
.Body = missingBody{}
} else {
.Body = noBody
}
return , nil
}
// A body will follow in DATA frames: set up the stream's pipe buffer and the
// remaining-bytes counter from the declared length.
.bufPipe.setBuffer(&dataBuffer{expected: .ContentLength})
.bytesRemain = .ContentLength
.Body = transportResponseBody{}
// If the stream's requestedGzip flag is set and the response is gzip-encoded,
// decompress transparently: drop the encoding and length headers and wrap
// the body in a gzipReader.
if .requestedGzip && asciiEqualFold(.Header.Get("Content-Encoding"), "gzip") {
.Header.Del("Content-Encoding")
.Header.Del("Content-Length")
.ContentLength = -1
.Body = &gzipReader{body: .Body}
.Uncompressed = true
}
return , nil
}
// Handles a HEADERS frame that arrives after the response headers, i.e. the
// trailers (processHeaders calls processTrailers once pastHeaders is set).
// It returns ConnectionError(ErrCodeProtocol) when trailers were already seen,
// when the frame does not end the stream, or when it contains pseudo fields.
// Otherwise the regular fields are stored on the stream and the stream is
// ended.
func ( *clientConnReadLoop) ( *clientStream, *MetaHeadersFrame) error {
// A second trailers frame on the same stream is a protocol error.
if .pastTrailers {
return ConnectionError(ErrCodeProtocol)
}
.pastTrailers = true
// Trailers must carry END_STREAM.
if !.StreamEnded() {
return ConnectionError(ErrCodeProtocol)
}
// Pseudo header fields are not accepted in trailers.
if len(.PseudoFields()) > 0 {
return ConnectionError(ErrCodeProtocol)
}
// Collect the regular fields under their canonical header names.
:= make(http.Header)
for , := range .RegularFields() {
:= canonicalHeader(.Name)
[] = append([], .Value)
}
.trailer =
.endStream()
return nil
}
// transportResponseBody is the value assigned to http.Response.Body for
// responses whose body arrives in DATA frames; it wraps the client stream the
// body belongs to.
type transportResponseBody struct {
cs *clientStream
}
// Reads response body bytes from the stream's pipe, enforces the declared
// content length tracked in bytesRemain, and returns consumed bytes to the
// inbound flow-control windows, sending WINDOW_UPDATE frames when the inflow
// accounting reports a non-zero amount. A sticky readErr is returned on
// subsequent calls once a length violation has been detected.
func ( transportResponseBody) ( []byte) ( int, error) {
:= .cs
:= .cc
if .readErr != nil {
return 0, .readErr
}
, = .cs.bufPipe.Read()
// bytesRemain == -1 means no length is being enforced.
if .bytesRemain != -1 {
// More data than declared: truncate to the remaining count and, unless
// the pipe already reported an error, abort the stream.
if int64() > .bytesRemain {
= int(.bytesRemain)
if == nil {
= errors.New("net/http: server replied with more than declared Content-Length; truncated")
.abortStream()
}
.readErr =
return int(.bytesRemain),
}
.bytesRemain -= int64()
// EOF before the declared length was delivered is an unexpected EOF.
if == io.EOF && .bytesRemain > 0 {
= io.ErrUnexpectedEOF
.readErr =
return ,
}
}
// Nothing was read, so there is no flow control to give back.
if == 0 {
return
}
// Credit the connection-level inflow; the stream-level inflow is credited
// only when the read returned no error.
.mu.Lock()
:= .inflow.add()
var int32
if == nil {
= .inflow.add()
}
.mu.Unlock()
// Send WINDOW_UPDATE for the connection (stream 0) and/or the stream, then
// flush, all under the write mutex.
if != 0 || != 0 {
.wmu.Lock()
defer .wmu.Unlock()
if != 0 {
.fr.WriteWindowUpdate(0, mustUint31())
}
if != 0 {
.fr.WriteWindowUpdate(.ID, mustUint31())
}
.bw.Flush()
}
return
}
// errClosedResponseBody is the error used to break the body pipe and abort the
// stream when the response body is closed.
var errClosedResponseBody = errors.New("http2: response body closed")

// Closes the response body: returns flow control for any unread buffered data
// to the connection, aborts the stream with errClosedResponseBody, and then
// waits for the stream to finish or for the request to be canceled.
// It returns errRequestCanceled when reqCancel fires and nil otherwise.
func ( transportResponseBody) () error {
:= .cs
:= .cc
:= .bufPipe.Len()
// Unread bytes still in the pipe are credited back to the connection-level
// inflow; a WINDOW_UPDATE on stream 0 is written when add reports a positive
// amount.
if > 0 {
.mu.Lock()
:= .inflow.add()
.mu.Unlock()
.wmu.Lock()
if > 0 {
.fr.WriteWindowUpdate(0, uint32())
}
.bw.Flush()
.wmu.Unlock()
}
.bufPipe.BreakWithError(errClosedResponseBody)
.abortStream(errClosedResponseBody)
// Block until the stream is done, its context is done (not treated as an
// error), or the request is canceled.
select {
case <-.donec:
case <-.ctx.Done():
return nil
case <-.reqCancel:
return errRequestCanceled
}
return nil
}
// Handles a *DataFrame (the read loop dispatches this frame type to
// processData): validates the stream state, charges the frame against the
// inbound flow-control windows, writes the payload into the stream's pipe and
// sends WINDOW_UPDATE frames for bytes that will not be returned later.
// A non-nil return is always a ConnectionError; per-stream problems end only
// the stream and return nil.
func ( *clientConnReadLoop) ( *DataFrame) error {
:= .cc
:= .streamByID(.StreamID)
:= .Data()
if == nil {
// Unknown stream. IDs at or above nextStreamID were never opened by this
// connection, so DATA for them is a protocol error.
.mu.Lock()
:= .nextStreamID
.mu.Unlock()
if .StreamID >= {
.logf("http2: Transport received unsolicited DATA frame; closing connection")
return ConnectionError(ErrCodeProtocol)
}
// For a lower stream ID the frame is discarded, but its length is still
// taken from and immediately added back to the connection-level inflow.
if .Length > 0 {
.mu.Lock()
:= .inflow.take(.Length)
:= .inflow.add(int(.Length))
.mu.Unlock()
// take failing means the peer exceeded the connection window.
if ! {
return ConnectionError(ErrCodeFlowControl)
}
if > 0 {
.wmu.Lock()
.fr.WriteWindowUpdate(0, uint32())
.bw.Flush()
.wmu.Unlock()
}
}
return nil
}
if .readClosed {
.logf("protocol error: received DATA after END_STREAM")
.endStreamError(, StreamError{
StreamID: .StreamID,
Code: ErrCodeProtocol,
})
return nil
}
if !.firstByte {
.logf("protocol error: received DATA before a HEADERS frame")
.endStreamError(, StreamError{
StreamID: .StreamID,
Code: ErrCodeProtocol,
})
return nil
}
if .Length > 0 {
if .isHead && len() > 0 {
.logf("protocol error: received DATA on a HEAD request")
.endStreamError(, StreamError{
StreamID: .StreamID,
Code: ErrCodeProtocol,
})
return nil
}
// Charge the full frame length against both the connection and the stream
// inflow; failure is a connection-level flow-control error.
.mu.Lock()
if !takeInflows(&.inflow, &.inflow, .Length) {
.mu.Unlock()
return ConnectionError(ErrCodeFlowControl)
}
// Bytes counted in Length but absent from Data() (presumably padding) are
// refunded right away, since they never reach the body reader.
var int
if := int(.Length) - len(); > 0 {
+=
}
:= false
var error
if len() > 0 {
// If the pipe rejects the write, the payload will never be read, so it is
// refunded as well and the stream-level refund below is skipped.
if _, = .bufPipe.Write(); != nil {
= true
+= len()
}
}
:= .inflow.add()
var int32
if ! {
= .inflow.add()
}
.mu.Unlock()
// Send any resulting WINDOW_UPDATE frames under the write mutex.
if > 0 || > 0 {
.wmu.Lock()
if > 0 {
.fr.WriteWindowUpdate(0, uint32())
}
if > 0 {
.fr.WriteWindowUpdate(.ID, uint32())
}
.bw.Flush()
.wmu.Unlock()
}
// A failed pipe write ends the stream with that error.
if != nil {
.endStreamError(, )
return nil
}
}
if .StreamEnded() {
.endStream()
}
return nil
}
// Marks the read side of a stream as closed (called as endStream by the frame
// handlers). On the first call only, it closes the stream's pipe with io.EOF,
// passing copyTrailers as the function argument, and closes peerClosed. Both
// happen while the connection mutex is held.
func ( *clientConnReadLoop) ( *clientStream) {
if !.readClosed {
.readClosed = true
.cc.mu.Lock()
defer .cc.mu.Unlock()
.bufPipe.closeWithErrorAndCode(io.EOF, .copyTrailers)
close(.peerClosed)
}
}
// Ends a stream because of an error (called as endStreamError by the frame
// handlers): sets readAborted, which makes streamByID stop returning the
// stream, and then calls abortStream.
func ( *clientConnReadLoop) ( *clientStream, error) {
.readAborted = true
.abortStream()
}
// Looks up a stream in the connection's streams map under the connection mutex
// (called as streamByID by the frame handlers). It returns nil when there is
// no entry for the ID or when the stream has readAborted set.
func ( *clientConnReadLoop) ( uint32) *clientStream {
.cc.mu.Lock()
defer .cc.mu.Unlock()
:= .cc.streams[]
if != nil && !.readAborted {
return
}
return nil
}
// Copies every entry of the stream's trailer map into the header map that
// resTrailer points to, allocating that map first if it is nil. endStream
// passes this method (as copyTrailers) to the pipe's closeWithErrorAndCode.
func ( *clientStream) () {
for , := range .trailer {
:= .resTrailer
if * == nil {
* = make(http.Header)
}
(*)[] =
}
}
// Handles a *GoAwayFrame (dispatched as processGoAway): marks the connection
// dead in the transport's connection pool, logs and counts a non-zero error
// code, and records the frame via setGoAway. It always returns nil.
func ( *clientConnReadLoop) ( *GoAwayFrame) error {
:= .cc
.t.connPool().MarkDead()
if .ErrCode != 0 {
.vlogf("transport got GOAWAY with error code = %v", .ErrCode)
// Report the error code through the optional CountError hook.
if := .t.CountError; != nil {
("recv_goaway_" + .ErrCode.stringToken())
}
}
.setGoAway()
return nil
}
// Handles a *SettingsFrame (dispatched as processSettings). The write mutex is
// held for the whole call: the settings are applied by processSettingsNoWrite
// and, unless the frame is itself an acknowledgement, a SETTINGS ACK is
// written and flushed.
func ( *clientConnReadLoop) ( *SettingsFrame) error {
:= .cc
.wmu.Lock()
defer .wmu.Unlock()
if := .processSettingsNoWrite(); != nil {
return
}
if !.IsAck() {
.fr.WriteSettingsAck()
.bw.Flush()
}
return nil
}
// Applies a SETTINGS frame to the connection state under the connection mutex
// without writing anything to the peer (called as processSettingsNoWrite).
// It returns ConnectionError(ErrCodeProtocol) for an unexpected ACK and
// ConnectionError(ErrCodeFlowControl) for an initial window size above
// math.MaxInt32.
func ( *clientConnReadLoop) ( *SettingsFrame) error {
:= .cc
.mu.Lock()
defer .mu.Unlock()
if .IsAck() {
// An ACK is only valid while one is awaited.
if .wantSettingsAck {
.wantSettingsAck = false
return nil
}
return ConnectionError(ErrCodeProtocol)
}
// Set when the frame carries SettingMaxConcurrentStreams.
var bool
:= .ForeachSetting(func( Setting) error {
switch .ID {
case SettingMaxFrameSize:
.maxFrameSize = .Val
case SettingMaxConcurrentStreams:
.maxConcurrentStreams = .Val
= true
case SettingMaxHeaderListSize:
.peerMaxHeaderListSize = uint64(.Val)
case SettingInitialWindowSize:
if .Val > math.MaxInt32 {
return ConnectionError(ErrCodeFlowControl)
}
// Adjust the send window of every open stream by the difference between
// the new and the previous initial window size, then wake waiters on the
// connection's condition variable.
:= int32(.Val) - int32(.initialWindowSize)
for , := range .streams {
.flow.add()
}
.cond.Broadcast()
.initialWindowSize = .Val
case SettingHeaderTableSize:
.henc.SetMaxDynamicTableSize(.Val)
.peerMaxHeaderTableSize = .Val
default:
.vlogf("Unhandled Setting: %v", )
}
return nil
})
if != nil {
return
}
// On the first SETTINGS frame, a missing max-concurrent-streams setting is
// replaced by defaultMaxConcurrentStreams.
if !.seenSettings {
if ! {
.maxConcurrentStreams = defaultMaxConcurrentStreams
}
.seenSettings = true
}
return nil
}
// Handles a *WindowUpdateFrame (dispatched as processWindowUpdate): adds the
// increment to the connection's flow (stream ID 0) or to the addressed
// stream's flow, then broadcasts on the connection's condition variable.
// It returns ConnectionError(ErrCodeFlowControl) when the add is rejected.
func ( *clientConnReadLoop) ( *WindowUpdateFrame) error {
:= .cc
:= .streamByID(.StreamID)
// Updates for a non-zero stream ID that streamByID does not return are
// ignored.
if .StreamID != 0 && == nil {
return nil
}
.mu.Lock()
defer .mu.Unlock()
// Default to the connection-level flow; use the stream's flow when a stream
// was found.
:= &.flow
if != nil {
= &.flow
}
if !.add(int32(.Increment)) {
return ConnectionError(ErrCodeFlowControl)
}
.cond.Broadcast()
return nil
}
// Handles a *RSTStreamFrame (dispatched as processResetStream): aborts the
// addressed stream with a stream error whose Cause is errFromPeer and closes
// its pipe with the same error. It always returns nil.
func ( *clientConnReadLoop) ( *RSTStreamFrame) error {
:= .streamByID(.StreamID)
// Resets for streams that streamByID does not return are ignored.
if == nil {
return nil
}
:= streamError(.ID, .ErrCode)
.Cause = errFromPeer
// A protocol-error reset also marks the connection as not reusable.
if .ErrCode == ErrCodeProtocol {
.cc.SetDoNotReuse()
}
// Report the error code through the optional CountError hook.
if := .cc.t.CountError; != nil {
("recv_rststream_" + .ErrCode.stringToken())
}
.abortStream()
.bufPipe.CloseWithError()
return nil
}
// Sends a PING frame with a random 8-byte payload and waits for the outcome.
// It returns nil once the channel registered in the pings map is signaled
// (processPing closes it when the matching ACK arrives), the write or flush
// error if sending fails, the context's error when the context is done, or
// readerErr when readerDone fires.
func ( *ClientConn) ( context.Context) error {
:= make(chan struct{})
var [8]byte
// Draw random payloads until one is not already present in the pings map,
// then register the channel under it.
for {
if , := rand.Read([:]); != nil {
return
}
.mu.Lock()
if , := .pings[]; ! {
.pings[] =
.mu.Unlock()
break
}
.mu.Unlock()
}
// Buffered with capacity 1 so the writer goroutine can report a failure
// without blocking, even if nobody receives it.
:= make(chan error, 1)
// Write and flush the PING in a separate goroutine, under the write mutex.
go func() {
.wmu.Lock()
defer .wmu.Unlock()
if := .fr.WritePing(false, ); != nil {
<-
return
}
if := .bw.Flush(); != nil {
<-
return
}
}()
select {
case <-:
return nil
case := <-:
return
case <-.Done():
return .Err()
case <-.readerDone:
return .readerErr
}
}
// Handles a *PingFrame (dispatched as processPing). For an ACK, the channel
// registered in the pings map under the frame's payload, if any, is closed and
// removed. For a non-ACK PING, an ACK carrying the same payload is written and
// flushed under the write mutex, and any write or flush error is returned.
func ( *clientConnReadLoop) ( *PingFrame) error {
if .IsAck() {
:= .cc
.mu.Lock()
defer .mu.Unlock()
if , := .pings[.Data]; {
close()
delete(.pings, .Data)
}
return nil
}
:= .cc
.wmu.Lock()
defer .wmu.Unlock()
if := .fr.WritePing(true, .Data); != nil {
return
}
return .bw.Flush()
}
// Handles a *PushPromiseFrame (dispatched as processPushPromise): any
// PUSH_PROMISE is answered with a connection-level protocol error.
func ( *clientConnReadLoop) ( *PushPromiseFrame) error {
return ConnectionError(ErrCodeProtocol)
}
// Writes an RST_STREAM frame and flushes the buffered writer while holding the
// write mutex. Errors from the write and the flush are not checked.
func ( *ClientConn) ( uint32, ErrCode, error) {
.wmu.Lock()
.fr.WriteRSTStream(, )
.bw.Flush()
.wmu.Unlock()
}
// Sentinel errors for header lists that exceed a size limit.
var (
// Returned by handleResponse when the received header frame is marked
// Truncated.
errResponseHeaderListSize = errors.New("http2: response header list larger than advertised limit")
// Request-side counterpart; it is not referenced in this part of the file.
errRequestHeaderListSize = errors.New("http2: request header list larger than peer's advertised limit")
)
// Forwards a printf-style message to the owning transport's logf.
func ( *ClientConn) ( string, ...interface{}) {
.t.logf(, ...)
}
// Forwards a printf-style message to the owning transport's vlogf.
func ( *ClientConn) ( string, ...interface{}) {
.t.vlogf(, ...)
}
// Logs a printf-style message through logf, but only when VerboseLogs is set.
func ( *Transport) ( string, ...interface{}) {
if VerboseLogs {
.logf(, ...)
}
}
// Logs a printf-style message unconditionally via the standard log package.
func ( *Transport) ( string, ...interface{}) {
log.Printf(, ...)
}
// noBody is the shared empty response body used by handleResponse for HEAD
// responses and for responses that end with their headers.
var noBody io.ReadCloser = noBodyReader{}

// noBodyReader implements io.ReadCloser with no content.
type noBodyReader struct{}

// Closing is a no-op that always succeeds.
func (noBodyReader) () error { return nil }

// Reading always reports io.EOF without producing data.
func (noBodyReader) ([]byte) (int, error) { return 0, io.EOF }
// missingBody is the response body used by handleResponse when the stream
// ended with the headers although a positive Content-Length was declared.
type missingBody struct{}

// Closing is a no-op that always succeeds.
func (missingBody) () error { return nil }

// Reading always fails with io.ErrUnexpectedEOF.
func (missingBody) ([]byte) (int, error) { return 0, io.ErrUnexpectedEOF }
// Reports whether a string slice contains an element equal to the given
// string, using a linear scan. An empty or nil slice yields false.
func ( []string, string) bool {
for , := range {
if == {
return true
}
}
return false
}
// erringRoundTripper carries a fixed error and fails every round trip with it.
type erringRoundTripper struct{ err error }

// Returns the stored error.
func ( erringRoundTripper) () error { return .err }

// Ignores the request and always returns a nil response with the stored error.
func ( erringRoundTripper) (*http.Request) (*http.Response, error) { return nil, .err }
// gzipReader wraps a response body and decompresses it as gzip when read.
// handleResponse installs it when it transparently decodes a gzip response.
type gzipReader struct {
_ incomparable
body io.ReadCloser // underlying (compressed) body
zr *gzip.Reader // created on the first read
zerr error // sticky error returned by all subsequent reads
}
// Reads decompressed data. The gzip.Reader is created lazily on the first
// call; a failure to create it is remembered in zerr and returned by every
// later call. A non-nil zerr (also set by closing) short-circuits the read.
func ( *gzipReader) ( []byte) ( int, error) {
if .zerr != nil {
return 0, .zerr
}
if .zr == nil {
.zr, = gzip.NewReader(.body)
if != nil {
.zerr =
return 0,
}
}
return .zr.Read()
}
// Closes the underlying body. If that fails the error is returned and zerr is
// left untouched; on success zerr is set to fs.ErrClosed so later reads fail.
func ( *gzipReader) () error {
if := .body.Close(); != nil {
return
}
.zerr = fs.ErrClosed
return nil
}
// errorReader is a reader that never yields data and always returns its stored
// error.
type errorReader struct{ err error }

// Always returns zero bytes and the stored error.
func ( errorReader) ( []byte) (int, error) { return 0, .err }
// Reports whether the request asks for the connection to be closed: either its
// Close field is set or its "Connection" header values contain the token
// "close".
func ( *http.Request) bool {
return .Close || httpguts.HeaderValuesContainsToken(.Header["Connection"], "close")
}
// Registers the given round tripper for the "https" scheme on an
// *http.Transport (called as registerHTTPSProtocol near the top of the file).
// A panic raised by RegisterProtocol is recovered and returned as an error
// formatted with "%v"; otherwise the result is nil.
func ( *http.Transport, noDialH2RoundTripper) ( error) {
// The named result lets the deferred function replace the return value.
defer func() {
if := recover(); != nil {
= fmt.Errorf("%v", )
}
}()
.RegisterProtocol("https", )
return nil
}
// noDialH2RoundTripper embeds *Transport and adapts its RoundTrip for use as a
// protocol handler registered on an http.Transport.
type noDialH2RoundTripper struct{ *Transport }

// Delegates to the embedded Transport's RoundTrip. When the resulting error
// satisfies isNoCachedConnError, it is replaced by http.ErrSkipAltProtocol;
// every other result is returned unchanged.
func ( noDialH2RoundTripper) ( *http.Request) (*http.Response, error) {
, := .Transport.RoundTrip()
if isNoCachedConnError() {
return nil, http.ErrSkipAltProtocol
}
return ,
}
// Returns the IdleConnTimeout of the associated *http.Transport (t1), or 0
// when t1 is nil.
func ( *Transport) () time.Duration {
if .t1 != nil {
return .t1.IdleConnTimeout
}
return 0
}
// Invokes the GetConn hook of the httptrace.ClientTrace found in the request's
// context, if a trace and that hook are present.
func ( *http.Request, string) {
:= httptrace.ContextClientTrace(.Context())
if == nil || .GetConn == nil {
return
}
.GetConn()
}
// Invokes the GotConn hook of the httptrace.ClientTrace found in the request's
// context, if a trace and that hook are present. The GotConnInfo reports the
// connection's tconn, the Reused flag, whether the connection had no streams
// (combined with a second operand that is missing in this copy — presumably
// the reused flag; confirm), and, when idle with a non-zero lastActive, the
// time elapsed since lastActive.
func ( *http.Request, *ClientConn, bool) {
:= httptrace.ContextClientTrace(.Context())
if == nil || .GotConn == nil {
return
}
:= httptrace.GotConnInfo{Conn: .tconn}
.Reused =
// streams and lastActive are read under the connection mutex.
.mu.Lock()
.WasIdle = len(.streams) == 0 &&
if .WasIdle && !.lastActive.IsZero() {
.IdleTime = time.Since(.lastActive)
}
.mu.Unlock()
.GotConn()
}
// Calls the trace's WroteHeaders hook when both the trace and the hook are
// non-nil.
func ( *httptrace.ClientTrace) {
if != nil && .WroteHeaders != nil {
.WroteHeaders()
}
}
// Calls the trace's Got100Continue hook when both the trace and the hook are
// non-nil. handleResponse calls this (as traceGot100Continue) for a status-100
// response.
func ( *httptrace.ClientTrace) {
if != nil && .Got100Continue != nil {
.Got100Continue()
}
}
// Calls the trace's Wait100Continue hook when both the trace and the hook are
// non-nil.
func ( *httptrace.ClientTrace) {
if != nil && .Wait100Continue != nil {
.Wait100Continue()
}
}
// Calls the trace's WroteRequest hook with a WroteRequestInfo carrying the
// given error, when both the trace and the hook are non-nil.
func ( *httptrace.ClientTrace, error) {
if != nil && .WroteRequest != nil {
.WroteRequest(httptrace.WroteRequestInfo{Err: })
}
}
// Calls the trace's GotFirstResponseByte hook when both the trace and the hook
// are non-nil. processHeaders calls this (as traceFirstResponseByte) the first
// time headers arrive on a stream.
func ( *httptrace.ClientTrace) {
if != nil && .GotFirstResponseByte != nil {
.GotFirstResponseByte()
}
}