package http2
import (
)
const (
prefaceTimeout = 10 * time.Second
firstSettingsTimeout = 2 * time.Second
handlerChunkWriteSize = 4 << 10
defaultMaxStreams = 250
maxQueuedControlFrames = 10000
)
var (
errClientDisconnected = errors.New("client disconnected")
errClosedBody = errors.New("body closed by handler")
errHandlerComplete = errors.New("http2: request body closed due to handler exiting")
errStreamClosed = errors.New("http2: stream closed")
)
var responseWriterStatePool = sync.Pool{
New: func() interface{} {
:= &responseWriterState{}
.bw = bufio.NewWriterSize(chunkWriter{}, handlerChunkWriteSize)
return
},
}
var (
testHookOnConn func()
testHookGetServerConn func(*serverConn)
testHookOnPanicMu *sync.Mutex
testHookOnPanic func(sc *serverConn, panicVal interface{}) (rePanic bool)
)
type Server struct {
MaxHandlers int
MaxConcurrentStreams uint32
MaxDecoderHeaderTableSize uint32
MaxEncoderHeaderTableSize uint32
MaxReadFrameSize uint32
PermitProhibitedCipherSuites bool
IdleTimeout time.Duration
MaxUploadBufferPerConnection int32
MaxUploadBufferPerStream int32
NewWriteScheduler func() WriteScheduler
CountError func(errType string)
state *serverInternalState
}
func ( *Server) () int32 {
if .MaxUploadBufferPerConnection >= initialWindowSize {
return .MaxUploadBufferPerConnection
}
return 1 << 20
}
func ( *Server) () int32 {
if .MaxUploadBufferPerStream > 0 {
return .MaxUploadBufferPerStream
}
return 1 << 20
}
func ( *Server) () uint32 {
if := .MaxReadFrameSize; >= minMaxFrameSize && <= maxFrameSize {
return
}
return defaultMaxReadFrameSize
}
func ( *Server) () uint32 {
if := .MaxConcurrentStreams; > 0 {
return
}
return defaultMaxStreams
}
func ( *Server) () uint32 {
if := .MaxDecoderHeaderTableSize; > 0 {
return
}
return initialHeaderTableSize
}
func ( *Server) () uint32 {
if := .MaxEncoderHeaderTableSize; > 0 {
return
}
return initialHeaderTableSize
}
func ( *Server) () int {
return maxQueuedControlFrames
}
type serverInternalState struct {
mu sync.Mutex
activeConns map[*serverConn]struct{}
}
func ( *serverInternalState) ( *serverConn) {
if == nil {
return
}
.mu.Lock()
.activeConns[] = struct{}{}
.mu.Unlock()
}
func ( *serverInternalState) ( *serverConn) {
if == nil {
return
}
.mu.Lock()
delete(.activeConns, )
.mu.Unlock()
}
func ( *serverInternalState) () {
if == nil {
return
}
.mu.Lock()
for := range .activeConns {
.startGracefulShutdown()
}
.mu.Unlock()
}
func ( *http.Server, *Server) error {
if == nil {
panic("nil *http.Server")
}
if == nil {
= new(Server)
}
.state = &serverInternalState{activeConns: make(map[*serverConn]struct{})}
if , := , ; .IdleTimeout == 0 {
if .IdleTimeout != 0 {
.IdleTimeout = .IdleTimeout
} else {
.IdleTimeout = .ReadTimeout
}
}
.RegisterOnShutdown(.state.startGracefulShutdown)
if .TLSConfig == nil {
.TLSConfig = new(tls.Config)
} else if .TLSConfig.CipherSuites != nil && .TLSConfig.MinVersion < tls.VersionTLS13 {
:= false
for , := range .TLSConfig.CipherSuites {
switch {
case tls.TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256,
tls.TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256:
= true
}
}
if ! {
return fmt.Errorf("http2: TLSConfig.CipherSuites is missing an HTTP/2-required AES_128_GCM_SHA256 cipher (need at least one of TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256 or TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256)")
}
}
.TLSConfig.PreferServerCipherSuites = true
if !strSliceContains(.TLSConfig.NextProtos, NextProtoTLS) {
.TLSConfig.NextProtos = append(.TLSConfig.NextProtos, NextProtoTLS)
}
if !strSliceContains(.TLSConfig.NextProtos, "http/1.1") {
.TLSConfig.NextProtos = append(.TLSConfig.NextProtos, "http/1.1")
}
if .TLSNextProto == nil {
.TLSNextProto = map[string]func(*http.Server, *tls.Conn, http.Handler){}
}
:= func( *http.Server, *tls.Conn, http.Handler) {
if testHookOnConn != nil {
testHookOnConn()
}
var context.Context
type interface {
() context.Context
}
if , := .(); {
= .()
}
.ServeConn(, &ServeConnOpts{
Context: ,
Handler: ,
BaseConfig: ,
})
}
.TLSNextProto[NextProtoTLS] =
return nil
}
type ServeConnOpts struct {
Context context.Context
BaseConfig *http.Server
Handler http.Handler
UpgradeRequest *http.Request
Settings []byte
SawClientPreface bool
}
func ( *ServeConnOpts) () context.Context {
if != nil && .Context != nil {
return .Context
}
return context.Background()
}
func ( *ServeConnOpts) () *http.Server {
if != nil && .BaseConfig != nil {
return .BaseConfig
}
return new(http.Server)
}
func ( *ServeConnOpts) () http.Handler {
if != nil {
if .Handler != nil {
return .Handler
}
if .BaseConfig != nil && .BaseConfig.Handler != nil {
return .BaseConfig.Handler
}
}
return http.DefaultServeMux
}
func ( *Server) ( net.Conn, *ServeConnOpts) {
, := serverConnBaseContext(, )
defer ()
:= &serverConn{
srv: ,
hs: .baseConfig(),
conn: ,
baseCtx: ,
remoteAddrStr: .RemoteAddr().String(),
bw: newBufferedWriter(),
handler: .handler(),
streams: make(map[uint32]*stream),
readFrameCh: make(chan readFrameResult),
wantWriteFrameCh: make(chan FrameWriteRequest, 8),
serveMsgCh: make(chan interface{}, 8),
wroteFrameCh: make(chan frameWriteResult, 1),
bodyReadCh: make(chan bodyReadMsg),
doneServing: make(chan struct{}),
clientMaxStreams: math.MaxUint32,
advMaxStreams: .maxConcurrentStreams(),
initialStreamSendWindowSize: initialWindowSize,
maxFrameSize: initialMaxFrameSize,
serveG: newGoroutineLock(),
pushEnabled: true,
sawClientPreface: .SawClientPreface,
}
.state.registerConn()
defer .state.unregisterConn()
if .hs.WriteTimeout != 0 {
.conn.SetWriteDeadline(time.Time{})
}
if .NewWriteScheduler != nil {
.writeSched = .NewWriteScheduler()
} else {
.writeSched = NewPriorityWriteScheduler(nil)
}
.flow.add(initialWindowSize)
.inflow.init(initialWindowSize)
.hpackEncoder = hpack.NewEncoder(&.headerWriteBuf)
.hpackEncoder.SetMaxDynamicTableSizeLimit(.maxEncoderHeaderTableSize())
:= NewFramer(.bw, )
if .CountError != nil {
.countError = .CountError
}
.ReadMetaHeaders = hpack.NewDecoder(.maxDecoderHeaderTableSize(), nil)
.MaxHeaderListSize = .maxHeaderListSize()
.SetMaxReadFrameSize(.maxReadFrameSize())
.framer =
if , := .(connectionStater); {
.tlsState = new(tls.ConnectionState)
*.tlsState = .ConnectionState()
if .tlsState.Version < tls.VersionTLS12 {
.rejectConn(ErrCodeInadequateSecurity, "TLS version too low")
return
}
if .tlsState.ServerName == "" {
}
if !.PermitProhibitedCipherSuites && isBadCipher(.tlsState.CipherSuite) {
.rejectConn(ErrCodeInadequateSecurity, fmt.Sprintf("Prohibited TLS 1.2 Cipher Suite: %x", .tlsState.CipherSuite))
return
}
}
if .Settings != nil {
:= &SettingsFrame{
FrameHeader: FrameHeader{valid: true},
p: .Settings,
}
if := .ForeachSetting(.processSetting); != nil {
.rejectConn(ErrCodeProtocol, "invalid settings")
return
}
.Settings = nil
}
if := testHookGetServerConn; != nil {
()
}
if .UpgradeRequest != nil {
.upgradeRequest(.UpgradeRequest)
.UpgradeRequest = nil
}
.serve()
}
func ( net.Conn, *ServeConnOpts) ( context.Context, func()) {
, = context.WithCancel(.context())
= context.WithValue(, http.LocalAddrContextKey, .LocalAddr())
if := .baseConfig(); != nil {
= context.WithValue(, http.ServerContextKey, )
}
return
}
func ( *serverConn) ( ErrCode, string) {
.vlogf("http2: server rejecting conn: %v, %s", , )
.framer.WriteGoAway(0, , []byte())
.bw.Flush()
.conn.Close()
}
type serverConn struct {
srv *Server
hs *http.Server
conn net.Conn
bw *bufferedWriter
handler http.Handler
baseCtx context.Context
framer *Framer
doneServing chan struct{}
readFrameCh chan readFrameResult
wantWriteFrameCh chan FrameWriteRequest
wroteFrameCh chan frameWriteResult
bodyReadCh chan bodyReadMsg
serveMsgCh chan interface{}
flow outflow
inflow inflow
tlsState *tls.ConnectionState
remoteAddrStr string
writeSched WriteScheduler
serveG goroutineLock
pushEnabled bool
sawClientPreface bool
sawFirstSettings bool
needToSendSettingsAck bool
unackedSettings int
queuedControlFrames int
clientMaxStreams uint32
advMaxStreams uint32
curClientStreams uint32
curPushedStreams uint32
maxClientStreamID uint32
maxPushPromiseID uint32
streams map[uint32]*stream
initialStreamSendWindowSize int32
maxFrameSize int32
peerMaxHeaderListSize uint32
canonHeader map[string]string
canonHeaderKeysSize int
writingFrame bool
writingFrameAsync bool
needsFrameFlush bool
inGoAway bool
inFrameScheduleLoop bool
needToSendGoAway bool
goAwayCode ErrCode
shutdownTimer *time.Timer
idleTimer *time.Timer
headerWriteBuf bytes.Buffer
hpackEncoder *hpack.Encoder
shutdownOnce sync.Once
}
func ( *serverConn) () uint32 {
:= .hs.MaxHeaderBytes
if <= 0 {
= http.DefaultMaxHeaderBytes
}
const = 32
const = 10
return uint32( + *)
}
func ( *serverConn) () uint32 {
.serveG.check()
return .curClientStreams + .curPushedStreams
}
type stream struct {
sc *serverConn
id uint32
body *pipe
cw closeWaiter
ctx context.Context
cancelCtx func()
bodyBytes int64
declBodyBytes int64
flow outflow
inflow inflow
state streamState
resetQueued bool
gotTrailerHeader bool
wroteHeaders bool
readDeadline *time.Timer
writeDeadline *time.Timer
closeErr error
trailer http.Header
reqTrailer http.Header
}
func ( *serverConn) () *Framer { return .framer }
func ( *serverConn) () error { return .conn.Close() }
func ( *serverConn) () error { return .bw.Flush() }
func ( *serverConn) () (*hpack.Encoder, *bytes.Buffer) {
return .hpackEncoder, &.headerWriteBuf
}
func ( *serverConn) ( uint32) (streamState, *stream) {
.serveG.check()
if , := .streams[]; {
return .state,
}
if %2 == 1 {
if <= .maxClientStreamID {
return stateClosed, nil
}
} else {
if <= .maxPushPromiseID {
return stateClosed, nil
}
}
return stateIdle, nil
}
func ( *serverConn) ( http.ConnState) {
if .hs.ConnState != nil {
.hs.ConnState(.conn, )
}
}
func ( *serverConn) ( string, ...interface{}) {
if VerboseLogs {
.logf(, ...)
}
}
func ( *serverConn) ( string, ...interface{}) {
if := .hs.ErrorLog; != nil {
.Printf(, ...)
} else {
log.Printf(, ...)
}
}
func ( error) uintptr {
if := reflect.ValueOf(); .Kind() == reflect.Uintptr {
return uintptr(.Uint())
}
return 0
}
func ( error) bool {
if == nil {
return false
}
:= .Error()
if strings.Contains(, "use of closed network connection") {
return true
}
if runtime.GOOS == "windows" {
if , := .(*net.OpError); && .Op == "read" {
if , := .Err.(*os.SyscallError); && .Syscall == "wsarecv" {
const = 10053
const = 10054
if := errno(.Err); == || == {
return true
}
}
}
}
return false
}
func ( *serverConn) ( error, string, ...interface{}) {
if == nil {
return
}
if == io.EOF || == io.ErrUnexpectedEOF || isClosedConnError() || == errPrefaceTimeout {
.vlogf(, ...)
} else {
.logf(, ...)
}
}
const maxCachedCanonicalHeadersKeysSize = 2048
func ( *serverConn) ( string) string {
.serveG.check()
buildCommonHeaderMapsOnce()
, := commonCanonHeader[]
if {
return
}
, = .canonHeader[]
if {
return
}
if .canonHeader == nil {
.canonHeader = make(map[string]string)
}
= http.CanonicalHeaderKey()
:= 100 + len()*2
if .canonHeaderKeysSize+ <= maxCachedCanonicalHeadersKeysSize {
.canonHeader[] =
.canonHeaderKeysSize +=
}
return
}
type readFrameResult struct {
f Frame
err error
readMore func()
}
func ( *serverConn) () {
:= make(gate)
:= .Done
for {
, := .framer.ReadFrame()
select {
case .readFrameCh <- readFrameResult{, , }:
case <-.doneServing:
return
}
select {
case <-:
case <-.doneServing:
return
}
if terminalReadFrameError() {
return
}
}
}
type frameWriteResult struct {
_ incomparable
wr FrameWriteRequest
err error
}
func ( *serverConn) ( FrameWriteRequest, *writeData) {
var error
if == nil {
= .write.writeFrame()
} else {
= .framer.endWrite()
}
.wroteFrameCh <- frameWriteResult{wr: , err: }
}
func ( *serverConn) () {
.serveG.check()
for , := range .streams {
.closeStream(, errClientDisconnected)
}
}
func ( *serverConn) () {
.serveG.check()
if := .shutdownTimer; != nil {
.Stop()
}
}
func ( *serverConn) () {
if testHookOnPanicMu != nil {
testHookOnPanicMu.Lock()
defer testHookOnPanicMu.Unlock()
}
if testHookOnPanic != nil {
if := recover(); != nil {
if testHookOnPanic(, ) {
panic()
}
}
}
}
func ( *serverConn) () {
.serveG.check()
defer .notePanic()
defer .conn.Close()
defer .closeAllStreamsOnConnClose()
defer .stopShutdownTimer()
defer close(.doneServing)
if VerboseLogs {
.vlogf("http2: server connection from %v on %p", .conn.RemoteAddr(), .hs)
}
.writeFrame(FrameWriteRequest{
write: writeSettings{
{SettingMaxFrameSize, .srv.maxReadFrameSize()},
{SettingMaxConcurrentStreams, .advMaxStreams},
{SettingMaxHeaderListSize, .maxHeaderListSize()},
{SettingHeaderTableSize, .srv.maxDecoderHeaderTableSize()},
{SettingInitialWindowSize, uint32(.srv.initialStreamRecvWindowSize())},
},
})
.unackedSettings++
if := .srv.initialConnRecvWindowSize() - initialWindowSize; > 0 {
.sendWindowUpdate(nil, int())
}
if := .readPreface(); != nil {
.condlogf(, "http2: server: error reading preface from client %v: %v", .conn.RemoteAddr(), )
return
}
.setConnState(http.StateActive)
.setConnState(http.StateIdle)
if .srv.IdleTimeout != 0 {
.idleTimer = time.AfterFunc(.srv.IdleTimeout, .onIdleTimer)
defer .idleTimer.Stop()
}
go .readFrames()
:= time.AfterFunc(firstSettingsTimeout, .onSettingsTimer)
defer .Stop()
:= 0
for {
++
select {
case := <-.wantWriteFrameCh:
if , := .write.(StreamError); {
.resetStream()
break
}
.writeFrame()
case := <-.wroteFrameCh:
.wroteFrame()
case := <-.readFrameCh:
if .writingFrameAsync {
select {
case := <-.wroteFrameCh:
.wroteFrame()
default:
}
}
if !.processFrameFromReader() {
return
}
.readMore()
if != nil {
.Stop()
= nil
}
case := <-.bodyReadCh:
.noteBodyRead(.st, .n)
case := <-.serveMsgCh:
switch v := .(type) {
case func(int):
()
case *serverMessage:
switch {
case settingsTimerMsg:
.logf("timeout waiting for SETTINGS frames from %v", .conn.RemoteAddr())
return
case idleTimerMsg:
.vlogf("connection is idle")
.goAway(ErrCodeNo)
case shutdownTimerMsg:
.vlogf("GOAWAY close timer fired; closing conn from %v", .conn.RemoteAddr())
return
case gracefulShutdownMsg:
.startGracefulShutdownInternal()
default:
panic("unknown timer")
}
case *startPushRequest:
.startPush()
case func(*serverConn):
()
default:
panic(fmt.Sprintf("unexpected type %T", ))
}
}
if .queuedControlFrames > .srv.maxQueuedControlFrames() {
.vlogf("http2: too many control frames in send queue, closing connection")
return
}
:= .inGoAway && !.needToSendGoAway && !.writingFrame
:= .goAwayCode == ErrCodeNo && .curOpenStreams() == 0
if && .shutdownTimer == nil && (.goAwayCode != ErrCodeNo || ) {
.shutDownIn(goAwayTimeout)
}
}
}
func ( *serverConn) ( <-chan struct{}, chan struct{}) {
select {
case <-.doneServing:
case <-:
close()
}
}
type serverMessage int
var (
settingsTimerMsg = new(serverMessage)
idleTimerMsg = new(serverMessage)
shutdownTimerMsg = new(serverMessage)
gracefulShutdownMsg = new(serverMessage)
)
func ( *serverConn) () { .sendServeMsg(settingsTimerMsg) }
func ( *serverConn) () { .sendServeMsg(idleTimerMsg) }
func ( *serverConn) () { .sendServeMsg(shutdownTimerMsg) }
func ( *serverConn) ( interface{}) {
.serveG.checkNotOn()
select {
case .serveMsgCh <- :
case <-.doneServing:
}
}
var errPrefaceTimeout = errors.New("timeout waiting for client preface")
func ( *serverConn) () error {
if .sawClientPreface {
return nil
}
:= make(chan error, 1)
go func() {
:= make([]byte, len(ClientPreface))
if , := io.ReadFull(.conn, ); != nil {
<-
} else if !bytes.Equal(, clientPreface) {
<- fmt.Errorf("bogus greeting %q", )
} else {
<- nil
}
}()
:= time.NewTimer(prefaceTimeout)
defer .Stop()
select {
case <-.C:
return errPrefaceTimeout
case := <-:
if == nil {
if VerboseLogs {
.vlogf("http2: server: client %v said hello", .conn.RemoteAddr())
}
}
return
}
}
var errChanPool = sync.Pool{
New: func() interface{} { return make(chan error, 1) },
}
var writeDataPool = sync.Pool{
New: func() interface{} { return new(writeData) },
}
func ( *serverConn) ( *stream, []byte, bool) error {
:= errChanPool.Get().(chan error)
:= writeDataPool.Get().(*writeData)
* = writeData{.id, , }
:= .writeFrameFromHandler(FrameWriteRequest{
write: ,
stream: ,
done: ,
})
if != nil {
return
}
var bool
select {
case = <-:
= true
case <-.doneServing:
return errClientDisconnected
case <-.cw:
select {
case = <-:
= true
default:
return errStreamClosed
}
}
errChanPool.Put()
if {
writeDataPool.Put()
}
return
}
func ( *serverConn) ( FrameWriteRequest) error {
.serveG.checkNotOn()
select {
case .wantWriteFrameCh <- :
return nil
case <-.doneServing:
return errClientDisconnected
}
}
func ( *serverConn) ( FrameWriteRequest) {
.serveG.check()
var bool
if .StreamID() != 0 {
, := .write.(StreamError)
if , := .state(.StreamID()); == stateClosed && ! {
= true
}
}
switch .write.(type) {
case *writeResHeaders:
.stream.wroteHeaders = true
case write100ContinueHeadersFrame:
if .stream.wroteHeaders {
if .done != nil {
panic("wr.done != nil for write100ContinueHeadersFrame")
}
= true
}
}
if ! {
if .isControl() {
.queuedControlFrames++
if .queuedControlFrames < 0 {
.conn.Close()
}
}
.writeSched.Push()
}
.scheduleFrameWrite()
}
func ( *serverConn) ( FrameWriteRequest) {
.serveG.check()
if .writingFrame {
panic("internal error: can only be writing one frame at a time")
}
:= .stream
if != nil {
switch .state {
case stateHalfClosedLocal:
switch .write.(type) {
case StreamError, handlerPanicRST, writeWindowUpdate:
default:
panic(fmt.Sprintf("internal error: attempt to send frame on a half-closed-local stream: %v", ))
}
case stateClosed:
panic(fmt.Sprintf("internal error: attempt to send frame on a closed stream: %v", ))
}
}
if , := .write.(*writePushPromise); {
var error
.promisedID, = .allocatePromisedID()
if != nil {
.writingFrameAsync = false
.replyToWriter()
return
}
}
.writingFrame = true
.needsFrameFlush = true
if .write.staysWithinBuffer(.bw.Available()) {
.writingFrameAsync = false
:= .write.writeFrame()
.wroteFrame(frameWriteResult{wr: , err: })
} else if , := .write.(*writeData); {
.framer.startWriteDataPadded(.streamID, .endStream, .p, nil)
.writingFrameAsync = true
go .writeFrameAsync(, )
} else {
.writingFrameAsync = true
go .writeFrameAsync(, nil)
}
}
var errHandlerPanicked = errors.New("http2: handler panicked")
func ( *serverConn) ( frameWriteResult) {
.serveG.check()
if !.writingFrame {
panic("internal error: expected to be already writing a frame")
}
.writingFrame = false
.writingFrameAsync = false
:= .wr
if writeEndsStream(.write) {
:= .stream
if == nil {
panic("internal error: expecting non-nil stream")
}
switch .state {
case stateOpen:
.state = stateHalfClosedLocal
.resetStream(streamError(.id, ErrCodeNo))
case stateHalfClosedRemote:
.closeStream(, errHandlerComplete)
}
} else {
switch v := .write.(type) {
case StreamError:
if , := .streams[.StreamID]; {
.closeStream(, )
}
case handlerPanicRST:
.closeStream(.stream, errHandlerPanicked)
}
}
.replyToWriter(.err)
.scheduleFrameWrite()
}
func ( *serverConn) () {
.serveG.check()
if .writingFrame || .inFrameScheduleLoop {
return
}
.inFrameScheduleLoop = true
for !.writingFrameAsync {
if .needToSendGoAway {
.needToSendGoAway = false
.startFrameWrite(FrameWriteRequest{
write: &writeGoAway{
maxStreamID: .maxClientStreamID,
code: .goAwayCode,
},
})
continue
}
if .needToSendSettingsAck {
.needToSendSettingsAck = false
.startFrameWrite(FrameWriteRequest{write: writeSettingsAck{}})
continue
}
if !.inGoAway || .goAwayCode == ErrCodeNo {
if , := .writeSched.Pop(); {
if .isControl() {
.queuedControlFrames--
}
.startFrameWrite()
continue
}
}
if .needsFrameFlush {
.startFrameWrite(FrameWriteRequest{write: flushFrameWriter{}})
.needsFrameFlush = false
continue
}
break
}
.inFrameScheduleLoop = false
}
func ( *serverConn) () {
.serveG.checkNotOn()
.shutdownOnce.Do(func() { .sendServeMsg(gracefulShutdownMsg) })
}
var goAwayTimeout = 1 * time.Second
func ( *serverConn) () {
.goAway(ErrCodeNo)
}
func ( *serverConn) ( ErrCode) {
.serveG.check()
if .inGoAway {
if .goAwayCode == ErrCodeNo {
.goAwayCode =
}
return
}
.inGoAway = true
.needToSendGoAway = true
.goAwayCode =
.scheduleFrameWrite()
}
func ( *serverConn) ( time.Duration) {
.serveG.check()
.shutdownTimer = time.AfterFunc(, .onShutdownTimer)
}
func ( *serverConn) ( StreamError) {
.serveG.check()
.writeFrame(FrameWriteRequest{write: })
if , := .streams[.StreamID]; {
.resetQueued = true
}
}
func ( *serverConn) ( readFrameResult) bool {
.serveG.check()
:= .err
if != nil {
if == ErrFrameTooLarge {
.goAway(ErrCodeFrameSize)
return true
}
:= == io.EOF || == io.ErrUnexpectedEOF || isClosedConnError()
if {
return false
}
} else {
:= .f
if VerboseLogs {
.vlogf("http2: server read frame %v", summarizeFrame())
}
= .processFrame()
if == nil {
return true
}
}
switch ev := .(type) {
case StreamError:
.resetStream()
return true
case goAwayFlowError:
.goAway(ErrCodeFlowControl)
return true
case ConnectionError:
.logf("http2: server connection error from %v: %v", .conn.RemoteAddr(), )
.goAway(ErrCode())
return true
default:
if .err != nil {
.vlogf("http2: server closing client connection; error reading frame from client %s: %v", .conn.RemoteAddr(), )
} else {
.logf("http2: server closing client connection: %v", )
}
return false
}
}
func ( *serverConn) ( Frame) error {
.serveG.check()
if !.sawFirstSettings {
if , := .(*SettingsFrame); ! {
return .countError("first_settings", ConnectionError(ErrCodeProtocol))
}
.sawFirstSettings = true
}
if .inGoAway && (.goAwayCode != ErrCodeNo || .Header().StreamID > .maxClientStreamID) {
if , := .(*DataFrame); {
if !.inflow.take(.Length) {
return .countError("data_flow", streamError(.Header().StreamID, ErrCodeFlowControl))
}
.sendWindowUpdate(nil, int(.Length))
}
return nil
}
switch f := .(type) {
case *SettingsFrame:
return .processSettings()
case *MetaHeadersFrame:
return .processHeaders()
case *WindowUpdateFrame:
return .processWindowUpdate()
case *PingFrame:
return .processPing()
case *DataFrame:
return .processData()
case *RSTStreamFrame:
return .processResetStream()
case *PriorityFrame:
return .processPriority()
case *GoAwayFrame:
return .processGoAway()
case *PushPromiseFrame:
return .countError("push_promise", ConnectionError(ErrCodeProtocol))
default:
.vlogf("http2: server ignoring frame: %v", .Header())
return nil
}
}
func ( *serverConn) ( *PingFrame) error {
.serveG.check()
if .IsAck() {
return nil
}
if .StreamID != 0 {
return .countError("ping_on_stream", ConnectionError(ErrCodeProtocol))
}
.writeFrame(FrameWriteRequest{write: writePingAck{}})
return nil
}
func ( *serverConn) ( *WindowUpdateFrame) error {
.serveG.check()
switch {
case .StreamID != 0:
, := .state(.StreamID)
if == stateIdle {
return .countError("stream_idle", ConnectionError(ErrCodeProtocol))
}
if == nil {
return nil
}
if !.flow.add(int32(.Increment)) {
return .countError("bad_flow", streamError(.StreamID, ErrCodeFlowControl))
}
default:
if !.flow.add(int32(.Increment)) {
return goAwayFlowError{}
}
}
.scheduleFrameWrite()
return nil
}
func ( *serverConn) ( *RSTStreamFrame) error {
.serveG.check()
, := .state(.StreamID)
if == stateIdle {
return .countError("reset_idle_stream", ConnectionError(ErrCodeProtocol))
}
if != nil {
.cancelCtx()
.closeStream(, streamError(.StreamID, .ErrCode))
}
return nil
}
func ( *serverConn) ( *stream, error) {
.serveG.check()
if .state == stateIdle || .state == stateClosed {
panic(fmt.Sprintf("invariant; can't close stream in state %v", .state))
}
.state = stateClosed
if .readDeadline != nil {
.readDeadline.Stop()
}
if .writeDeadline != nil {
.writeDeadline.Stop()
}
if .isPushed() {
.curPushedStreams--
} else {
.curClientStreams--
}
delete(.streams, .id)
if len(.streams) == 0 {
.setConnState(http.StateIdle)
if .srv.IdleTimeout != 0 {
.idleTimer.Reset(.srv.IdleTimeout)
}
if h1ServerKeepAlivesDisabled(.hs) {
.startGracefulShutdownInternal()
}
}
if := .body; != nil {
.sendWindowUpdate(nil, .Len())
.CloseWithError()
}
if , := .(StreamError); {
if .Cause != nil {
= .Cause
} else {
= errStreamClosed
}
}
.closeErr =
.cw.Close()
.writeSched.CloseStream(.id)
}
func ( *serverConn) ( *SettingsFrame) error {
.serveG.check()
if .IsAck() {
.unackedSettings--
if .unackedSettings < 0 {
return .countError("ack_mystery", ConnectionError(ErrCodeProtocol))
}
return nil
}
if .NumSettings() > 100 || .HasDuplicates() {
return .countError("settings_big_or_dups", ConnectionError(ErrCodeProtocol))
}
if := .ForeachSetting(.processSetting); != nil {
return
}
.needToSendSettingsAck = true
.scheduleFrameWrite()
return nil
}
func ( *serverConn) ( Setting) error {
.serveG.check()
if := .Valid(); != nil {
return
}
if VerboseLogs {
.vlogf("http2: server processing setting %v", )
}
switch .ID {
case SettingHeaderTableSize:
.hpackEncoder.SetMaxDynamicTableSize(.Val)
case SettingEnablePush:
.pushEnabled = .Val != 0
case SettingMaxConcurrentStreams:
.clientMaxStreams = .Val
case SettingInitialWindowSize:
return .processSettingInitialWindowSize(.Val)
case SettingMaxFrameSize:
.maxFrameSize = int32(.Val)
case SettingMaxHeaderListSize:
.peerMaxHeaderListSize = .Val
default:
if VerboseLogs {
.vlogf("http2: server ignoring unknown setting %v", )
}
}
return nil
}
func ( *serverConn) ( uint32) error {
.serveG.check()
:= .initialStreamSendWindowSize
.initialStreamSendWindowSize = int32()
:= int32() -
for , := range .streams {
if !.flow.add() {
return .countError("setting_win_size", ConnectionError(ErrCodeFlowControl))
}
}
return nil
}
func ( *serverConn) ( *DataFrame) error {
.serveG.check()
:= .Header().StreamID
:= .Data()
, := .state()
if == 0 || == stateIdle {
return .countError("data_on_idle", ConnectionError(ErrCodeProtocol))
}
if == nil || != stateOpen || .gotTrailerHeader || .resetQueued {
if !.inflow.take(.Length) {
return .countError("data_flow", streamError(, ErrCodeFlowControl))
}
.sendWindowUpdate(nil, int(.Length))
if != nil && .resetQueued {
return nil
}
return .countError("closed", streamError(, ErrCodeStreamClosed))
}
if .body == nil {
panic("internal error: should have a body in this state")
}
if .declBodyBytes != -1 && .bodyBytes+int64(len()) > .declBodyBytes {
if !.inflow.take(.Length) {
return .countError("data_flow", streamError(, ErrCodeFlowControl))
}
.sendWindowUpdate(nil, int(.Length))
.body.CloseWithError(fmt.Errorf("sender tried to send more than declared Content-Length of %d bytes", .declBodyBytes))
return .countError("send_too_much", streamError(, ErrCodeProtocol))
}
if .Length > 0 {
if !takeInflows(&.inflow, &.inflow, .Length) {
return .countError("flow_on_data_length", streamError(, ErrCodeFlowControl))
}
if len() > 0 {
, := .body.Write()
if != nil {
.sendWindowUpdate(nil, int(.Length)-)
return .countError("body_write_err", streamError(, ErrCodeStreamClosed))
}
if != len() {
panic("internal error: bad Writer")
}
.bodyBytes += int64(len())
}
:= int32(.Length) - int32(len())
.sendWindowUpdate32(nil, )
.sendWindowUpdate32(, )
}
if .StreamEnded() {
.endStream()
}
return nil
}
func ( *serverConn) ( *GoAwayFrame) error {
.serveG.check()
if .ErrCode != ErrCodeNo {
.logf("http2: received GOAWAY %+v, starting graceful shutdown", )
} else {
.vlogf("http2: received GOAWAY %+v, starting graceful shutdown", )
}
.startGracefulShutdownInternal()
.pushEnabled = false
return nil
}
func ( *stream) () bool {
return .id%2 == 0
}
func ( *stream) () {
:= .sc
.serveG.check()
if .declBodyBytes != -1 && .declBodyBytes != .bodyBytes {
.body.CloseWithError(fmt.Errorf("request declared a Content-Length of %d but only wrote %d bytes",
.declBodyBytes, .bodyBytes))
} else {
.body.closeWithErrorAndCode(io.EOF, .copyTrailersToHandlerRequest)
.body.CloseWithError(io.EOF)
}
.state = stateHalfClosedRemote
}
func ( *stream) () {
for , := range .trailer {
if , := .reqTrailer[]; {
.reqTrailer[] =
}
}
}
func ( *stream) () {
.body.CloseWithError(fmt.Errorf("%w", os.ErrDeadlineExceeded))
}
func ( *stream) () {
.sc.writeFrameFromHandler(FrameWriteRequest{write: StreamError{
StreamID: .id,
Code: ErrCodeInternal,
Cause: os.ErrDeadlineExceeded,
}})
}
func ( *serverConn) ( *MetaHeadersFrame) error {
.serveG.check()
:= .StreamID
if %2 != 1 {
return .countError("headers_even", ConnectionError(ErrCodeProtocol))
}
if := .streams[.StreamID]; != nil {
if .resetQueued {
return nil
}
if .state == stateHalfClosedRemote {
return .countError("headers_half_closed", streamError(, ErrCodeStreamClosed))
}
return .processTrailerHeaders()
}
if <= .maxClientStreamID {
return .countError("stream_went_down", ConnectionError(ErrCodeProtocol))
}
.maxClientStreamID =
if .idleTimer != nil {
.idleTimer.Stop()
}
if .curClientStreams+1 > .advMaxStreams {
if .unackedSettings == 0 {
return .countError("over_max_streams", streamError(, ErrCodeProtocol))
}
return .countError("over_max_streams_race", streamError(, ErrCodeRefusedStream))
}
:= stateOpen
if .StreamEnded() {
= stateHalfClosedRemote
}
:= .newStream(, 0, )
if .HasPriority() {
if := .checkPriority(.StreamID, .Priority); != nil {
return
}
.writeSched.AdjustStream(.id, .Priority)
}
, , := .newWriterAndRequest(, )
if != nil {
return
}
.reqTrailer = .Trailer
if .reqTrailer != nil {
.trailer = make(http.Header)
}
.body = .Body.(*requestBody).pipe
.declBodyBytes = .ContentLength
:= .handler.ServeHTTP
if .Truncated {
= handleHeaderListTooLong
} else if := checkValidHTTP2RequestHeaders(.Header); != nil {
= new400Handler()
}
if .hs.ReadTimeout != 0 {
.conn.SetReadDeadline(time.Time{})
if .body != nil {
.readDeadline = time.AfterFunc(.hs.ReadTimeout, .onReadTimeout)
}
}
go .runHandler(, , )
return nil
}
func ( *serverConn) ( *http.Request) {
.serveG.check()
:= uint32(1)
.maxClientStreamID =
:= .newStream(, 0, stateHalfClosedRemote)
.reqTrailer = .Trailer
if .reqTrailer != nil {
.trailer = make(http.Header)
}
:= .newResponseWriter(, )
if .hs.ReadTimeout != 0 {
.conn.SetReadDeadline(time.Time{})
}
go .runHandler(, , .handler.ServeHTTP)
}
func ( *stream) ( *MetaHeadersFrame) error {
:= .sc
.serveG.check()
if .gotTrailerHeader {
return .countError("dup_trailers", ConnectionError(ErrCodeProtocol))
}
.gotTrailerHeader = true
if !.StreamEnded() {
return .countError("trailers_not_ended", streamError(.id, ErrCodeProtocol))
}
if len(.PseudoFields()) > 0 {
return .countError("trailers_pseudo", streamError(.id, ErrCodeProtocol))
}
if .trailer != nil {
for , := range .RegularFields() {
:= .canonicalHeader(.Name)
if !httpguts.ValidTrailerHeader() {
return .countError("trailers_bogus", streamError(.id, ErrCodeProtocol))
}
.trailer[] = append(.trailer[], .Value)
}
}
.endStream()
return nil
}
func ( *serverConn) ( uint32, PriorityParam) error {
if == .StreamDep {
return .countError("priority", streamError(, ErrCodeProtocol))
}
return nil
}
func ( *serverConn) ( *PriorityFrame) error {
if := .checkPriority(.StreamID, .PriorityParam); != nil {
return
}
.writeSched.AdjustStream(.StreamID, .PriorityParam)
return nil
}
func ( *serverConn) (, uint32, streamState) *stream {
.serveG.check()
if == 0 {
panic("internal error: cannot create stream with id 0")
}
, := context.WithCancel(.baseCtx)
:= &stream{
sc: ,
id: ,
state: ,
ctx: ,
cancelCtx: ,
}
.cw.Init()
.flow.conn = &.flow
.flow.add(.initialStreamSendWindowSize)
.inflow.init(.srv.initialStreamRecvWindowSize())
if .hs.WriteTimeout != 0 {
.writeDeadline = time.AfterFunc(.hs.WriteTimeout, .onWriteTimeout)
}
.streams[] =
.writeSched.OpenStream(.id, OpenStreamOptions{PusherID: })
if .isPushed() {
.curPushedStreams++
} else {
.curClientStreams++
}
if .curOpenStreams() == 1 {
.setConnState(http.StateActive)
}
return
}
func ( *serverConn) ( *stream, *MetaHeadersFrame) (*responseWriter, *http.Request, error) {
.serveG.check()
:= requestParam{
method: .PseudoValue("method"),
scheme: .PseudoValue("scheme"),
authority: .PseudoValue("authority"),
path: .PseudoValue("path"),
}
:= .method == "CONNECT"
if {
if .path != "" || .scheme != "" || .authority == "" {
return nil, nil, .countError("bad_connect", streamError(.StreamID, ErrCodeProtocol))
}
} else if .method == "" || .path == "" || (.scheme != "https" && .scheme != "http") {
return nil, nil, .countError("bad_path_method", streamError(.StreamID, ErrCodeProtocol))
}
.header = make(http.Header)
for , := range .RegularFields() {
.header.Add(.canonicalHeader(.Name), .Value)
}
if .authority == "" {
.authority = .header.Get("Host")
}
, , := .newWriterAndRequestNoBody(, )
if != nil {
return nil, nil,
}
:= !.StreamEnded()
if {
if , := .header["Content-Length"]; {
if , := strconv.ParseUint([0], 10, 63); == nil {
.ContentLength = int64()
} else {
.ContentLength = 0
}
} else {
.ContentLength = -1
}
.Body.(*requestBody).pipe = &pipe{
b: &dataBuffer{expected: .ContentLength},
}
}
return , , nil
}
type requestParam struct {
method string
scheme, authority, path string
header http.Header
}
func ( *serverConn) ( *stream, requestParam) (*responseWriter, *http.Request, error) {
.serveG.check()
var *tls.ConnectionState
if .scheme == "https" {
= .tlsState
}
:= httpguts.HeaderValuesContainsToken(.header["Expect"], "100-continue")
if {
.header.Del("Expect")
}
if := .header["Cookie"]; len() > 1 {
.header.Set("Cookie", strings.Join(, "; "))
}
var http.Header
for , := range .header["Trailer"] {
for , := range strings.Split(, ",") {
= http.CanonicalHeaderKey(textproto.TrimString())
switch {
case "Transfer-Encoding", "Trailer", "Content-Length":
default:
if == nil {
= make(http.Header)
}
[] = nil
}
}
}
delete(.header, "Trailer")
var *url.URL
var string
if .method == "CONNECT" {
= &url.URL{Host: .authority}
= .authority
} else {
var error
, = url.ParseRequestURI(.path)
if != nil {
return nil, nil, .countError("bad_path", streamError(.id, ErrCodeProtocol))
}
= .path
}
:= &requestBody{
conn: ,
stream: ,
needsContinue: ,
}
:= &http.Request{
Method: .method,
URL: ,
RemoteAddr: .remoteAddrStr,
Header: .header,
RequestURI: ,
Proto: "HTTP/2.0",
ProtoMajor: 2,
ProtoMinor: 0,
TLS: ,
Host: .authority,
Body: ,
Trailer: ,
}
= .WithContext(.ctx)
:= .newResponseWriter(, )
return , , nil
}
func ( *serverConn) ( *stream, *http.Request) *responseWriter {
:= responseWriterStatePool.Get().(*responseWriterState)
:= .bw
* = responseWriterState{}
.conn =
.bw =
.bw.Reset(chunkWriter{})
.stream =
.req =
return &responseWriter{rws: }
}
func ( *serverConn) ( *responseWriter, *http.Request, func(http.ResponseWriter, *http.Request)) {
:= true
defer func() {
.rws.stream.cancelCtx()
if .MultipartForm != nil {
.MultipartForm.RemoveAll()
}
if {
:= recover()
.writeFrameFromHandler(FrameWriteRequest{
write: handlerPanicRST{.rws.stream.id},
stream: .rws.stream,
})
if != nil && != http.ErrAbortHandler {
const = 64 << 10
:= make([]byte, )
= [:runtime.Stack(, false)]
.logf("http2: panic serving %v: %v\n%s", .conn.RemoteAddr(), , )
}
return
}
.handlerDone()
}()
(, )
= false
}
func ( http.ResponseWriter, *http.Request) {
const = 431
.WriteHeader()
io.WriteString(, "<h1>HTTP Error 431</h1><p>Request Header Field(s) Too Large</p>")
}
func ( *serverConn) ( *stream, *writeResHeaders) error {
.serveG.checkNotOn()
var chan error
if .h != nil {
= errChanPool.Get().(chan error)
}
if := .writeFrameFromHandler(FrameWriteRequest{
write: ,
stream: ,
done: ,
}); != nil {
return
}
if != nil {
select {
case := <-:
errChanPool.Put()
return
case <-.doneServing:
return errClientDisconnected
case <-.cw:
return errStreamClosed
}
}
return nil
}
func ( *serverConn) ( *stream) {
.writeFrameFromHandler(FrameWriteRequest{
write: write100ContinueHeadersFrame{.id},
stream: ,
})
}
type bodyReadMsg struct {
st *stream
n int
}
func ( *serverConn) ( *stream, int, error) {
.serveG.checkNotOn()
if > 0 {
select {
case .bodyReadCh <- bodyReadMsg{, }:
case <-.doneServing:
}
}
}
func ( *serverConn) ( *stream, int) {
.serveG.check()
.sendWindowUpdate(nil, )
if .state != stateHalfClosedRemote && .state != stateClosed {
.sendWindowUpdate(, )
}
}
func ( *serverConn) ( *stream, int32) {
.sendWindowUpdate(, int())
}
func ( *serverConn) ( *stream, int) {
.serveG.check()
var uint32
var int32
if == nil {
= .inflow.add()
} else {
= .id
= .inflow.add()
}
if == 0 {
return
}
.writeFrame(FrameWriteRequest{
write: writeWindowUpdate{streamID: , n: uint32()},
stream: ,
})
}
type requestBody struct {
_ incomparable
stream *stream
conn *serverConn
closeOnce sync.Once
sawEOF bool
pipe *pipe
needsContinue bool
}
func ( *requestBody) () error {
.closeOnce.Do(func() {
if .pipe != nil {
.pipe.BreakWithError(errClosedBody)
}
})
return nil
}
func ( *requestBody) ( []byte) ( int, error) {
if .needsContinue {
.needsContinue = false
.conn.write100ContinueHeaders(.stream)
}
if .pipe == nil || .sawEOF {
return 0, io.EOF
}
, = .pipe.Read()
if == io.EOF {
.sawEOF = true
}
if .conn == nil && inTests {
return
}
.conn.noteBodyReadFromHandler(.stream, , )
return
}
type responseWriter struct {
rws *responseWriterState
}
var (
_ http.CloseNotifier = (*responseWriter)(nil)
_ http.Flusher = (*responseWriter)(nil)
_ stringWriter = (*responseWriter)(nil)
)
type responseWriterState struct {
stream *stream
req *http.Request
conn *serverConn
bw *bufio.Writer
handlerHeader http.Header
snapHeader http.Header
trailers []string
status int
wroteHeader bool
sentHeader bool
handlerDone bool
dirty bool
sentContentLen int64
wroteBytes int64
closeNotifierMu sync.Mutex
closeNotifierCh chan bool
}
type chunkWriter struct{ rws *responseWriterState }
func ( chunkWriter) ( []byte) ( int, error) {
, = .rws.writeChunk()
if == errStreamClosed {
= .rws.stream.closeErr
}
return ,
}
func ( *responseWriterState) () bool { return len(.trailers) > 0 }
func ( *responseWriterState) () bool {
for , := range .trailers {
if , := .handlerHeader[]; {
return true
}
}
return false
}
func ( *responseWriterState) ( string) {
= http.CanonicalHeaderKey()
if !httpguts.ValidTrailerHeader() {
.conn.logf("ignoring invalid trailer %q", )
return
}
if !strSliceContains(.trailers, ) {
.trailers = append(.trailers, )
}
}
func ( *responseWriterState) ( []byte) ( int, error) {
if !.wroteHeader {
.writeHeader(200)
}
if .handlerDone {
.promoteUndeclaredTrailers()
}
:= .req.Method == "HEAD"
if !.sentHeader {
.sentHeader = true
var , string
if = .snapHeader.Get("Content-Length"); != "" {
.snapHeader.Del("Content-Length")
if , := strconv.ParseUint(, 10, 63); == nil {
.sentContentLen = int64()
} else {
= ""
}
}
if == "" && .handlerDone && bodyAllowedForStatus(.status) && (len() > 0 || !) {
= strconv.Itoa(len())
}
, := .snapHeader["Content-Type"]
:= .snapHeader.Get("Content-Encoding")
:= len() > 0
if ! && ! && bodyAllowedForStatus(.status) && len() > 0 {
= http.DetectContentType()
}
var string
if , := .snapHeader["Date"]; ! {
= time.Now().UTC().Format(http.TimeFormat)
}
for , := range .snapHeader["Trailer"] {
foreachHeaderElement(, .declareTrailer)
}
if , := .snapHeader["Connection"]; {
:= .snapHeader.Get("Connection")
delete(.snapHeader, "Connection")
if == "close" {
.conn.startGracefulShutdown()
}
}
:= (.handlerDone && !.hasTrailers() && len() == 0) ||
= .conn.writeHeaders(.stream, &writeResHeaders{
streamID: .stream.id,
httpResCode: .status,
h: .snapHeader,
endStream: ,
contentType: ,
contentLength: ,
date: ,
})
if != nil {
.dirty = true
return 0,
}
if {
return 0, nil
}
}
if {
return len(), nil
}
if len() == 0 && !.handlerDone {
return 0, nil
}
:= .hasNonemptyTrailers()
:= .handlerDone && !
if len() > 0 || {
if := .conn.writeDataFromHandler(.stream, , ); != nil {
.dirty = true
return 0,
}
}
if .handlerDone && {
= .conn.writeHeaders(.stream, &writeResHeaders{
streamID: .stream.id,
h: .handlerHeader,
trailers: .trailers,
endStream: true,
})
if != nil {
.dirty = true
}
return len(),
}
return len(), nil
}
const TrailerPrefix = "Trailer:"
func ( *responseWriterState) () {
for , := range .handlerHeader {
if !strings.HasPrefix(, TrailerPrefix) {
continue
}
:= strings.TrimPrefix(, TrailerPrefix)
.declareTrailer()
.handlerHeader[http.CanonicalHeaderKey()] =
}
if len(.trailers) > 1 {
:= sorterPool.Get().(*sorter)
.SortStrings(.trailers)
sorterPool.Put()
}
}
func ( *responseWriter) ( time.Time) error {
:= .rws.stream
if !.IsZero() && .Before(time.Now()) {
.onReadTimeout()
return nil
}
.rws.conn.sendServeMsg(func( *serverConn) {
if .readDeadline != nil {
if !.readDeadline.Stop() {
return
}
}
if .IsZero() {
.readDeadline = nil
} else if .readDeadline == nil {
.readDeadline = time.AfterFunc(.Sub(time.Now()), .onReadTimeout)
} else {
.readDeadline.Reset(.Sub(time.Now()))
}
})
return nil
}
func ( *responseWriter) ( time.Time) error {
:= .rws.stream
if !.IsZero() && .Before(time.Now()) {
.onWriteTimeout()
return nil
}
.rws.conn.sendServeMsg(func( *serverConn) {
if .writeDeadline != nil {
if !.writeDeadline.Stop() {
return
}
}
if .IsZero() {
.writeDeadline = nil
} else if .writeDeadline == nil {
.writeDeadline = time.AfterFunc(.Sub(time.Now()), .onWriteTimeout)
} else {
.writeDeadline.Reset(.Sub(time.Now()))
}
})
return nil
}
func ( *responseWriter) () {
.FlushError()
}
func ( *responseWriter) () error {
:= .rws
if == nil {
panic("Header called after Handler finished")
}
var error
if .bw.Buffered() > 0 {
= .bw.Flush()
} else {
_, = chunkWriter{}.Write(nil)
if == nil {
select {
case <-.stream.cw:
= .stream.closeErr
default:
}
}
}
return
}
func ( *responseWriter) () <-chan bool {
:= .rws
if == nil {
panic("CloseNotify called after Handler finished")
}
.closeNotifierMu.Lock()
:= .closeNotifierCh
if == nil {
= make(chan bool, 1)
.closeNotifierCh =
:= .stream.cw
go func() {
.Wait()
<- true
}()
}
.closeNotifierMu.Unlock()
return
}
func ( *responseWriter) () http.Header {
:= .rws
if == nil {
panic("Header called after Handler finished")
}
if .handlerHeader == nil {
.handlerHeader = make(http.Header)
}
return .handlerHeader
}
func ( int) {
if < 100 || > 999 {
panic(fmt.Sprintf("invalid WriteHeader code %v", ))
}
}
func ( *responseWriter) ( int) {
:= .rws
if == nil {
panic("WriteHeader called after Handler finished")
}
.writeHeader()
}
func ( *responseWriterState) ( int) {
if .wroteHeader {
return
}
checkWriteHeaderCode()
if >= 100 && <= 199 {
:= .handlerHeader
, := ["Content-Length"]
, := ["Transfer-Encoding"]
if || {
= .Clone()
.Del("Content-Length")
.Del("Transfer-Encoding")
}
if .conn.writeHeaders(.stream, &writeResHeaders{
streamID: .stream.id,
httpResCode: ,
h: ,
endStream: .handlerDone && !.hasTrailers(),
}) != nil {
.dirty = true
}
return
}
.wroteHeader = true
.status =
if len(.handlerHeader) > 0 {
.snapHeader = cloneHeader(.handlerHeader)
}
}
func ( http.Header) http.Header {
:= make(http.Header, len())
for , := range {
:= make([]string, len())
copy(, )
[] =
}
return
}
func ( *responseWriter) ( []byte) ( int, error) {
return .write(len(), , "")
}
func ( *responseWriter) ( string) ( int, error) {
return .write(len(), nil, )
}
func ( *responseWriter) ( int, []byte, string) ( int, error) {
:= .rws
if == nil {
panic("Write called after Handler finished")
}
if !.wroteHeader {
.WriteHeader(200)
}
if !bodyAllowedForStatus(.status) {
return 0, http.ErrBodyNotAllowed
}
.wroteBytes += int64(len()) + int64(len())
if .sentContentLen != 0 && .wroteBytes > .sentContentLen {
return 0, errors.New("http2: handler wrote more than declared Content-Length")
}
if != nil {
return .bw.Write()
} else {
return .bw.WriteString()
}
}
func ( *responseWriter) () {
:= .rws
:= .dirty
.handlerDone = true
.Flush()
.rws = nil
if ! {
responseWriterStatePool.Put()
}
}
var (
ErrRecursivePush = errors.New("http2: recursive push not allowed")
ErrPushLimitReached = errors.New("http2: push would exceed peer's SETTINGS_MAX_CONCURRENT_STREAMS")
)
var _ http.Pusher = (*responseWriter)(nil)
func ( *responseWriter) ( string, *http.PushOptions) error {
:= .rws.stream
:= .sc
.serveG.checkNotOn()
if .isPushed() {
return ErrRecursivePush
}
if == nil {
= new(http.PushOptions)
}
if .Method == "" {
.Method = "GET"
}
if .Header == nil {
.Header = http.Header{}
}
:= "http"
if .rws.req.TLS != nil {
= "https"
}
, := url.Parse()
if != nil {
return
}
if .Scheme == "" {
if !strings.HasPrefix(, "/") {
return fmt.Errorf("target must be an absolute URL or an absolute path: %q", )
}
.Scheme =
.Host = .rws.req.Host
} else {
if .Scheme != {
return fmt.Errorf("cannot push URL with scheme %q from request with scheme %q", .Scheme, )
}
if .Host == "" {
return errors.New("URL must have a host")
}
}
for := range .Header {
if strings.HasPrefix(, ":") {
return fmt.Errorf("promised request headers cannot include pseudo header %q", )
}
if asciiEqualFold(, "content-length") ||
asciiEqualFold(, "content-encoding") ||
asciiEqualFold(, "trailer") ||
asciiEqualFold(, "te") ||
asciiEqualFold(, "expect") ||
asciiEqualFold(, "host") {
return fmt.Errorf("promised request headers cannot include %q", )
}
}
if := checkValidHTTP2RequestHeaders(.Header); != nil {
return
}
if .Method != "GET" && .Method != "HEAD" {
return fmt.Errorf("method %q must be GET or HEAD", .Method)
}
:= &startPushRequest{
parent: ,
method: .Method,
url: ,
header: cloneHeader(.Header),
done: errChanPool.Get().(chan error),
}
select {
case <-.doneServing:
return errClientDisconnected
case <-.cw:
return errStreamClosed
case .serveMsgCh <- :
}
select {
case <-.doneServing:
return errClientDisconnected
case <-.cw:
return errStreamClosed
case := <-.done:
errChanPool.Put(.done)
return
}
}
type startPushRequest struct {
parent *stream
method string
url *url.URL
header http.Header
done chan error
}
func ( *serverConn) ( *startPushRequest) {
.serveG.check()
if .parent.state != stateOpen && .parent.state != stateHalfClosedRemote {
.done <- errStreamClosed
return
}
if !.pushEnabled {
.done <- http.ErrNotSupported
return
}
:= func() (uint32, error) {
.serveG.check()
if !.pushEnabled {
return 0, http.ErrNotSupported
}
if .curPushedStreams+1 > .clientMaxStreams {
return 0, ErrPushLimitReached
}
if .maxPushPromiseID+2 >= 1<<31 {
.startGracefulShutdownInternal()
return 0, ErrPushLimitReached
}
.maxPushPromiseID += 2
:= .maxPushPromiseID
:= .newStream(, .parent.id, stateHalfClosedRemote)
, , := .newWriterAndRequestNoBody(, requestParam{
method: .method,
scheme: .url.Scheme,
authority: .url.Host,
path: .url.RequestURI(),
header: cloneHeader(.header),
})
if != nil {
panic(fmt.Sprintf("newWriterAndRequestNoBody(%+v): %v", .url, ))
}
go .runHandler(, , .handler.ServeHTTP)
return , nil
}
.writeFrame(FrameWriteRequest{
write: &writePushPromise{
streamID: .parent.id,
method: .method,
url: .url,
h: .header,
allocatePromisedID: ,
},
stream: .parent,
done: .done,
})
}
func ( string, func(string)) {
= textproto.TrimString()
if == "" {
return
}
if !strings.Contains(, ",") {
()
return
}
for , := range strings.Split(, ",") {
if = textproto.TrimString(); != "" {
()
}
}
}
var connHeaders = []string{
"Connection",
"Keep-Alive",
"Proxy-Connection",
"Transfer-Encoding",
"Upgrade",
}
func ( http.Header) error {
for , := range connHeaders {
if , := []; {
return fmt.Errorf("request header %q is not valid in HTTP/2", )
}
}
:= ["Te"]
if len() > 0 && (len() > 1 || ([0] != "trailers" && [0] != "")) {
return errors.New(`request header "TE" may only be "trailers" in HTTP/2`)
}
return nil
}
func ( error) http.HandlerFunc {
return func( http.ResponseWriter, *http.Request) {
http.Error(, .Error(), http.StatusBadRequest)
}
}
func ( *http.Server) bool {
var interface{} =
type interface {
() bool
}
if , := .(); {
return !.()
}
return false
}
func ( *serverConn) ( string, error) error {
if == nil || .srv == nil {
return
}
:= .srv.CountError
if == nil {
return
}
var string
var ErrCode
switch e := .(type) {
case ConnectionError:
= "conn"
= ErrCode()
case StreamError:
= "stream"
= ErrCode(.Code)
default:
return
}
:= errCodeName[]
if == "" {
= strconv.Itoa(int())
}
(fmt.Sprintf("%s_%s_%s", , , ))
return
}