package transport
import (
	"bytes"
	"context"
	"errors"
	"fmt"
	"io"
	"math"
	rand "math/rand/v2"
	"net"
	"net/http"
	"strconv"
	"sync"
	"sync/atomic"
	"time"

	"golang.org/x/net/http2"
	"golang.org/x/net/http2/hpack"
	"google.golang.org/protobuf/proto"

	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/credentials"
	"google.golang.org/grpc/internal"
	"google.golang.org/grpc/internal/channelz"
	"google.golang.org/grpc/internal/grpclog"
	"google.golang.org/grpc/internal/grpcsync"
	"google.golang.org/grpc/internal/grpcutil"
	"google.golang.org/grpc/internal/pretty"
	istatus "google.golang.org/grpc/internal/status"
	"google.golang.org/grpc/internal/syscall"
	"google.golang.org/grpc/keepalive"
	"google.golang.org/grpc/mem"
	"google.golang.org/grpc/metadata"
	"google.golang.org/grpc/peer"
	"google.golang.org/grpc/stats"
	"google.golang.org/grpc/status"
	"google.golang.org/grpc/tap"
)
var (
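	// ErrIllegalHeaderWrite indicates that setting header is illegal because of
	// the stream's state.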
ErrIllegalHeaderWrite = status.Error(codes.Internal, "transport: SendHeader called multiple times")
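	// ErrHeaderListSizeLimitViolation indicates that the header list size is
	// larger than the limit set by the peer.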
ErrHeaderListSizeLimitViolation = status.Error(codes.Internal, "transport: trying to send header list size larger than the limit set by peer")
)
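// serverConnectionCounter counts the number of connections a server has seen
// (equal to the number of http2Servers created). Must be accessed atomically.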
var serverConnectionCounter uint64
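// http2Server implements the ServerTransport interface with HTTP2.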
type http2Server struct {
lastRead int64
done chan struct{}
conn net.Conn
loopy *loopyWriter
readerDone chan struct{}
loopyWriterDone chan struct{}
peer peer.Peer
inTapHandle tap.ServerInHandle
framer *framer
maxStreams uint32
controlBuf *controlBuffer
fc *trInFlow
stats []stats.Handler
kp keepalive.ServerParameters
kep keepalive.EnforcementPolicy
lastPingAt time.Time
pingStrikes uint8
resetPingStrikes uint32
initialWindowSize int32
bdpEst *bdpEstimator
maxSendHeaderListSize *uint32
mu sync.Mutex
drainEvent *grpcsync.Event
state transportState
activeStreams map[uint32]*ServerStream
idle time.Time
channelz *channelz.Socket
bufferPool mem.BufferPool
connectionID uint64
maxStreamMu sync.Mutex
maxStreamID uint32
logger *grpclog.PrefixLogger
setResetPingStrikes func()
}
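// NewServerTransport creates an http2 transport with conn and configuration
// options from config.
//
// It returns a non-nil transport and a nil error on success. On failure, it
// returns a nil transport and a non-nil error. For a special case where the
// underlying conn gets closed before the client preface could be read, it
// returns a nil transport and a nil error.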
func NewServerTransport(conn net.Conn, config *ServerConfig) (_ ServerTransport, err error) {
	var authInfo credentials.AuthInfo
	rawConn := conn
	if config.Credentials != nil {
		var err error
		conn, authInfo, err = config.Credentials.ServerHandshake(rawConn)
		if err != nil {
			// ErrConnDispatched means that the connection was dispatched away
			// from gRPC; those connections should be left open. io.EOF means
			// the connection was closed before the handshake completed; neither
			// is wrapped as a transport error here.
			if err == credentials.ErrConnDispatched || err == io.EOF {
				return nil, err
			}
			return nil, connectionErrorf(false, err, "ServerHandshake(%q) failed: %v", rawConn.RemoteAddr(), err)
		}
	}
	writeBufSize := config.WriteBufferSize
	readBufSize := config.ReadBufferSize
	maxHeaderListSize := defaultServerMaxHeaderListSize
	if config.MaxHeaderListSize != nil {
		maxHeaderListSize = *config.MaxHeaderListSize
	}
	framer := newFramer(conn, writeBufSize, readBufSize, config.SharedWriteBuffer, maxHeaderListSize)
	// Send initial settings as connection preface to client.
	isettings := []http2.Setting{{
		ID:  http2.SettingMaxFrameSize,
		Val: http2MaxFrameLen,
	}}
	if config.MaxStreams != math.MaxUint32 {
		isettings = append(isettings, http2.Setting{
			ID:  http2.SettingMaxConcurrentStreams,
			Val: config.MaxStreams,
		})
	}
	iwz := int32(initialWindowSize)
	if config.InitialWindowSize >= defaultWindowSize {
		iwz = config.InitialWindowSize
	}
	icwz := int32(initialWindowSize)
	if config.InitialConnWindowSize >= defaultWindowSize {
		icwz = config.InitialConnWindowSize
	}
	if iwz != defaultWindowSize {
		isettings = append(isettings, http2.Setting{
			ID:  http2.SettingInitialWindowSize,
			Val: uint32(iwz)})
	}
	if config.MaxHeaderListSize != nil {
		isettings = append(isettings, http2.Setting{
			ID:  http2.SettingMaxHeaderListSize,
			Val: *config.MaxHeaderListSize,
		})
	}
	if config.HeaderTableSize != nil {
		isettings = append(isettings, http2.Setting{
			ID:  http2.SettingHeaderTableSize,
			Val: *config.HeaderTableSize,
		})
	}
	if err := framer.fr.WriteSettings(isettings...); err != nil {
		return nil, connectionErrorf(false, err, "transport: %v", err)
	}
	// Adjust the connection flow control window if needed.
	if delta := uint32(icwz - defaultWindowSize); delta > 0 {
		if err := framer.fr.WriteWindowUpdate(0, delta); err != nil {
			return nil, connectionErrorf(false, err, "transport: %v", err)
		}
	}
	kp := config.KeepaliveParams
	if kp.MaxConnectionIdle == 0 {
		kp.MaxConnectionIdle = defaultMaxConnectionIdle
	}
	if kp.MaxConnectionAge == 0 {
		kp.MaxConnectionAge = defaultMaxConnectionAge
	}
	// Add a jitter to MaxConnectionAge.
	kp.MaxConnectionAge += getJitter(kp.MaxConnectionAge)
	if kp.MaxConnectionAgeGrace == 0 {
		kp.MaxConnectionAgeGrace = defaultMaxConnectionAgeGrace
	}
	if kp.Time == 0 {
		kp.Time = defaultServerKeepaliveTime
	}
	if kp.Timeout == 0 {
		kp.Timeout = defaultServerKeepaliveTimeout
	}
	if kp.Time != infinity {
		if err = syscall.SetTCPUserTimeout(rawConn, kp.Timeout); err != nil {
			return nil, connectionErrorf(false, err, "transport: failed to set TCP_USER_TIMEOUT: %v", err)
		}
	}
	kep := config.KeepalivePolicy
	if kep.MinTime == 0 {
		kep.MinTime = defaultKeepalivePolicyMinTime
	}

	done := make(chan struct{})
	peer := peer.Peer{
		Addr:      conn.RemoteAddr(),
		LocalAddr: conn.LocalAddr(),
		AuthInfo:  authInfo,
	}
	t := &http2Server{
		done:              done,
		conn:              conn,
		peer:              peer,
		framer:            framer,
		readerDone:        make(chan struct{}),
		loopyWriterDone:   make(chan struct{}),
		maxStreams:        config.MaxStreams,
		inTapHandle:       config.InTapHandle,
		fc:                &trInFlow{limit: uint32(icwz)},
		state:             reachable,
		activeStreams:     make(map[uint32]*ServerStream),
		stats:             config.StatsHandlers,
		kp:                kp,
		idle:              time.Now(),
		kep:               kep,
		initialWindowSize: iwz,
		bufferPool:        config.BufferPool,
	}
	t.setResetPingStrikes = func() {
		atomic.StoreUint32(&t.resetPingStrikes, 1)
	}
	var czSecurity credentials.ChannelzSecurityValue
	if au, ok := authInfo.(credentials.ChannelzSecurityInfo); ok {
		czSecurity = au.GetSecurityValue()
	}
	t.channelz = channelz.RegisterSocket(
		&channelz.Socket{
			SocketType:       channelz.SocketTypeNormal,
			Parent:           config.ChannelzParent,
			SocketMetrics:    channelz.SocketMetrics{},
			EphemeralMetrics: t.socketMetrics,
			LocalAddr:        t.peer.LocalAddr,
			RemoteAddr:       t.peer.Addr,
			SocketOptions:    channelz.GetSocketOption(t.conn),
			Security:         czSecurity,
		},
	)
	t.logger = prefixLoggerForServerTransport(t)

	t.controlBuf = newControlBuffer(t.done)
	if !config.StaticWindowSize {
		t.bdpEst = &bdpEstimator{
			bdp:               initialWindowSize,
			updateFlowControl: t.updateFlowControl,
		}
	}
	t.connectionID = atomic.AddUint64(&serverConnectionCounter, 1)
	t.framer.writer.Flush()

	defer func() {
		if err != nil {
			t.Close(err)
		}
	}()

	// Check the validity of the client preface.
	preface := make([]byte, len(clientPreface))
	if _, err := io.ReadFull(t.conn, preface); err != nil {
		// In deployments where a gRPC server runs behind a cloud load balancer
		// that performs regular TCP-level health checks, the connection may be
		// closed immediately. Return io.EOF so the gRPC server can recognize
		// this scenario and suppress logging to reduce spam.
		if err == io.EOF {
			return nil, io.EOF
		}
		return nil, connectionErrorf(false, err, "transport: http2Server.HandleStreams failed to receive the preface from client: %v", err)
	}
	if !bytes.Equal(preface, clientPreface) {
		return nil, connectionErrorf(false, nil, "transport: http2Server.HandleStreams received bogus greeting from client: %q", preface)
	}

	frame, err := t.framer.fr.ReadFrame()
	if err == io.EOF || err == io.ErrUnexpectedEOF {
		return nil, err
	}
	if err != nil {
		return nil, connectionErrorf(false, err, "transport: http2Server.HandleStreams failed to read initial settings frame: %v", err)
	}
	atomic.StoreInt64(&t.lastRead, time.Now().UnixNano())
	sf, ok := frame.(*http2.SettingsFrame)
	if !ok {
		return nil, connectionErrorf(false, nil, "transport: http2Server.HandleStreams saw invalid preface type %T from client", frame)
	}
	t.handleSettings(sf)

	go func() {
		t.loopy = newLoopyWriter(serverSide, t.framer, t.controlBuf, t.bdpEst, t.conn, t.logger, t.outgoingGoAwayHandler, t.bufferPool)
		err := t.loopy.run()
		close(t.loopyWriterDone)
		if !isIOError(err) {
			// For non-I/O errors, wait up to one second for the reader to
			// finish before closing the connection so that unread data on the
			// receive side is consumed; closing earlier could cause a TCP RST
			// instead of FIN and surface errors on the client.
			timer := time.NewTimer(time.Second)
			defer timer.Stop()
			select {
			case <-t.readerDone:
			case <-timer.C:
			}
			t.conn.Close()
		}
	}()
	go t.keepalive()
	return t, nil
}
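// operateHeaders takes action on the decoded headers. It returns an error if a
// fatal error was encountered and the transport needs to close; otherwise it
// returns nil.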
func (t *http2Server) operateHeaders(ctx context.Context, frame *http2.MetaHeadersFrame, handle func(*ServerStream)) error {
	// Acquire max stream ID lock for the entire duration.
	t.maxStreamMu.Lock()
	defer t.maxStreamMu.Unlock()

	streamID := frame.Header().StreamID

	// frame.Truncated is set to true when the framer detects that the current
	// header list size hits the MaxHeaderListSize limit.
	if frame.Truncated {
		t.controlBuf.put(&cleanupStream{
			streamID: streamID,
			rst:      true,
			rstCode:  http2.ErrCodeFrameSize,
			onWrite:  func() {},
		})
		return nil
	}

	if streamID%2 != 1 || streamID <= t.maxStreamID {
		// illegal gRPC stream id.
		return fmt.Errorf("received an illegal stream id: %v. headers frame: %+v", streamID, frame)
	}
	t.maxStreamID = streamID

	buf := newRecvBuffer()
	s := &ServerStream{
		Stream: &Stream{
			id:  streamID,
			buf: buf,
			fc:  &inFlow{limit: uint32(t.initialWindowSize)},
		},
		st:               t,
		headerWireLength: int(frame.Header().Length),
	}
	var (
		// If isGRPC is false, the content-type was missing or unsupported.
		isGRPC      = false
		contentType = ""
		mdata       = make(metadata.MD, len(frame.Fields))
		httpMethod  string
		// These are set if an error is encountered while parsing the headers.
		protocolError bool
		headerError   *status.Status

		timeoutSet bool
		timeout    time.Duration
	)

	for _, hf := range frame.Fields {
		switch hf.Name {
		case "content-type":
			contentSubtype, validContentType := grpcutil.ContentSubtype(hf.Value)
			if !validContentType {
				contentType = hf.Value
				break
			}
			mdata[hf.Name] = append(mdata[hf.Name], hf.Value)
			s.contentSubtype = contentSubtype
			isGRPC = true
		case "grpc-accept-encoding":
			mdata[hf.Name] = append(mdata[hf.Name], hf.Value)
			if hf.Value == "" {
				continue
			}
			compressors := hf.Value
			if s.clientAdvertisedCompressors != "" {
				compressors = s.clientAdvertisedCompressors + "," + compressors
			}
			s.clientAdvertisedCompressors = compressors
		case "grpc-encoding":
			s.recvCompress = hf.Value
		case ":method":
			httpMethod = hf.Value
		case ":path":
			s.method = hf.Value
		case "grpc-timeout":
			timeoutSet = true
			var err error
			if timeout, err = decodeTimeout(hf.Value); err != nil {
				headerError = status.Newf(codes.Internal, "malformed grpc-timeout: %v", err)
			}
		// "Transports must consider requests containing the Connection header
		// as malformed." - A41
		case "connection":
			if t.logger.V(logLevel) {
				t.logger.Infof("Received a HEADERS frame with a :connection header which makes the request malformed, as per the HTTP/2 spec")
			}
			protocolError = true
		default:
			if isReservedHeader(hf.Name) && !isWhitelistedHeader(hf.Name) {
				break
			}
			v, err := decodeMetadataHeader(hf.Name, hf.Value)
			if err != nil {
				headerError = status.Newf(codes.Internal, "malformed binary metadata %q in header %q: %v", hf.Value, hf.Name, err)
				t.logger.Warningf("Failed to decode metadata header (%q, %q): %v", hf.Name, hf.Value, err)
				break
			}
			mdata[hf.Name] = append(mdata[hf.Name], v)
		}
	}

	// "If multiple Host headers or multiple :authority headers are present,
	// the request must be rejected with an HTTP status code 400." - A41
	if len(mdata[":authority"]) > 1 || len(mdata["host"]) > 1 {
		errMsg := fmt.Sprintf("num values of :authority: %v, num values of host: %v, both must only have 1 value as per HTTP/2 spec", len(mdata[":authority"]), len(mdata["host"]))
		if t.logger.V(logLevel) {
			t.logger.Infof("Aborting the stream early: %v", errMsg)
		}
		t.controlBuf.put(&earlyAbortStream{
			httpStatus:     http.StatusBadRequest,
			streamID:       streamID,
			contentSubtype: s.contentSubtype,
			status:         status.New(codes.Internal, errMsg),
			rst:            !frame.StreamEnded(),
		})
		return nil
	}

	if protocolError {
		t.controlBuf.put(&cleanupStream{
			streamID: streamID,
			rst:      true,
			rstCode:  http2.ErrCodeProtocol,
			onWrite:  func() {},
		})
		return nil
	}
	if !isGRPC {
		t.controlBuf.put(&earlyAbortStream{
			httpStatus:     http.StatusUnsupportedMediaType,
			streamID:       streamID,
			contentSubtype: s.contentSubtype,
			status:         status.Newf(codes.InvalidArgument, "invalid gRPC request content-type %q", contentType),
			rst:            !frame.StreamEnded(),
		})
		return nil
	}
	if headerError != nil {
		t.controlBuf.put(&earlyAbortStream{
			httpStatus:     http.StatusBadRequest,
			streamID:       streamID,
			contentSubtype: s.contentSubtype,
			status:         headerError,
			rst:            !frame.StreamEnded(),
		})
		return nil
	}

	// "If :authority is missing, Host must be renamed to :authority." - A41
	if len(mdata[":authority"]) == 0 {
		// No-op if host isn't present; no eventual :authority header is a
		// valid RPC.
		if host, ok := mdata["host"]; ok {
			mdata[":authority"] = host
			delete(mdata, "host")
		}
	} else {
		// "If :authority is present, Host must be discarded." - A41
		delete(mdata, "host")
	}

	if frame.StreamEnded() {
		// s is just created by this method; no lock needed.
		s.state = streamReadDone
	}
	if timeoutSet {
		s.ctx, s.cancel = context.WithTimeout(ctx, timeout)
	} else {
		s.ctx, s.cancel = context.WithCancel(ctx)
	}

	// Attach the received metadata to the context.
	if len(mdata) > 0 {
		s.ctx = metadata.NewIncomingContext(s.ctx, mdata)
	}
	t.mu.Lock()
	if t.state != reachable {
		t.mu.Unlock()
		s.cancel()
		return nil
	}
	if uint32(len(t.activeStreams)) >= t.maxStreams {
		t.mu.Unlock()
		t.controlBuf.put(&cleanupStream{
			streamID: streamID,
			rst:      true,
			rstCode:  http2.ErrCodeRefusedStream,
			onWrite:  func() {},
		})
		s.cancel()
		return nil
	}
	if httpMethod != http.MethodPost {
		t.mu.Unlock()
		errMsg := fmt.Sprintf("Received a HEADERS frame with :method %q which should be POST", httpMethod)
		if t.logger.V(logLevel) {
			t.logger.Infof("Aborting the stream early: %v", errMsg)
		}
		t.controlBuf.put(&earlyAbortStream{
			httpStatus:     http.StatusMethodNotAllowed,
			streamID:       streamID,
			contentSubtype: s.contentSubtype,
			status:         status.New(codes.Internal, errMsg),
			rst:            !frame.StreamEnded(),
		})
		s.cancel()
		return nil
	}
	if t.inTapHandle != nil {
		var err error
		if s.ctx, err = t.inTapHandle(s.ctx, &tap.Info{FullMethodName: s.method, Header: mdata}); err != nil {
			t.mu.Unlock()
			if t.logger.V(logLevel) {
				t.logger.Infof("Aborting the stream early due to InTapHandle failure: %v", err)
			}
			stat, ok := status.FromError(err)
			if !ok {
				stat = status.New(codes.PermissionDenied, err.Error())
			}
			t.controlBuf.put(&earlyAbortStream{
				httpStatus:     http.StatusOK,
				streamID:       s.id,
				contentSubtype: s.contentSubtype,
				status:         stat,
				rst:            !frame.StreamEnded(),
			})
			return nil
		}
	}
	if s.ctx.Err() != nil {
		t.mu.Unlock()
		// Abort early in case the deadline was already exceeded.
		t.controlBuf.put(&earlyAbortStream{
			httpStatus:     http.StatusOK,
			streamID:       s.id,
			contentSubtype: s.contentSubtype,
			status:         status.New(codes.DeadlineExceeded, context.DeadlineExceeded.Error()),
			rst:            !frame.StreamEnded(),
		})
		return nil
	}
	t.activeStreams[streamID] = s
	if len(t.activeStreams) == 1 {
		t.idle = time.Time{}
	}
	// Start a timer to close the stream on reaching the deadline.
	if timeoutSet {
		// Wait for s.cancel to be updated before calling t.closeStream to
		// avoid a data race.
		cancelUpdated := make(chan struct{})
		timer := internal.TimeAfterFunc(timeout, func() {
			<-cancelUpdated
			t.closeStream(s, true, http2.ErrCodeCancel, false)
		})
		oldCancel := s.cancel
		s.cancel = func() {
			oldCancel()
			timer.Stop()
		}
		close(cancelUpdated)
	}
	t.mu.Unlock()
	if channelz.IsOn() {
		t.channelz.SocketMetrics.StreamsStarted.Add(1)
		t.channelz.SocketMetrics.LastRemoteStreamCreatedTimestamp.Store(time.Now().UnixNano())
	}
	s.requestRead = func(n int) {
		t.adjustWindow(s, uint32(n))
	}
	s.ctxDone = s.ctx.Done()
	s.wq = newWriteQuota(defaultWriteQuota, s.ctxDone)
	s.trReader = &transportReader{
		reader: &recvBufferReader{
			ctx:     s.ctx,
			ctxDone: s.ctxDone,
			recv:    s.buf,
		},
		windowHandler: func(n int) {
			t.updateWindow(s, uint32(n))
		},
	}
	// Register the stream with loopy.
	t.controlBuf.put(&registerStream{
		streamID: s.id,
		wq:       s.wq,
	})
	handle(s)
	return nil
}
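// HandleStreams receives incoming streams using the given handler. This is
// typically run in a separate goroutine.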
func (t *http2Server) HandleStreams(ctx context.Context, handle func(*ServerStream)) {
	defer func() {
		close(t.readerDone)
		<-t.loopyWriterDone
	}()
	for {
		t.controlBuf.throttle()
		frame, err := t.framer.fr.ReadFrame()
		atomic.StoreInt64(&t.lastRead, time.Now().UnixNano())
		if err != nil {
			if se, ok := err.(http2.StreamError); ok {
				if t.logger.V(logLevel) {
					t.logger.Warningf("Encountered http2.StreamError: %v", se)
				}
				t.mu.Lock()
				s := t.activeStreams[se.StreamID]
				t.mu.Unlock()
				if s != nil {
					t.closeStream(s, true, se.Code, false)
				} else {
					t.controlBuf.put(&cleanupStream{
						streamID: se.StreamID,
						rst:      true,
						rstCode:  se.Code,
						onWrite:  func() {},
					})
				}
				continue
			}
			t.Close(err)
			return
		}
		switch frame := frame.(type) {
		case *http2.MetaHeadersFrame:
			if err := t.operateHeaders(ctx, frame, handle); err != nil {
				// Any error processing client headers, e.g. an invalid stream
				// ID, is treated as a protocol violation.
				t.controlBuf.put(&goAway{
					code:      http2.ErrCodeProtocol,
					debugData: []byte(err.Error()),
					closeConn: err,
				})
				continue
			}
		case *http2.DataFrame:
			t.handleData(frame)
		case *http2.RSTStreamFrame:
			t.handleRSTStream(frame)
		case *http2.SettingsFrame:
			t.handleSettings(frame)
		case *http2.PingFrame:
			t.handlePing(frame)
		case *http2.WindowUpdateFrame:
			t.handleWindowUpdate(frame)
		case *http2.GoAwayFrame:
			// TODO: Handle GOAWAY from the client appropriately.
		default:
			if t.logger.V(logLevel) {
				t.logger.Infof("Received unsupported frame type %T", frame)
			}
		}
	}
}
func (t *http2Server) getStream(f http2.Frame) (*ServerStream, bool) {
	t.mu.Lock()
	defer t.mu.Unlock()
	if t.activeStreams == nil {
		// The transport is closing.
		return nil, false
	}
	s, ok := t.activeStreams[f.Header().StreamID]
	if !ok {
		// The stream is already done.
		return nil, false
	}
	return s, true
}
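// adjustWindow sends out an extra window update over the initial window size
// of the stream if the application is requesting data larger in size than
// the window.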
func (t *http2Server) adjustWindow(s *ServerStream, n uint32) {
	if w := s.fc.maybeAdjust(n); w > 0 {
		t.controlBuf.put(&outgoingWindowUpdate{streamID: s.id, increment: w})
	}
}
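// updateWindow adjusts the inbound quota for the stream and the transport.
// Window updates are delivered to the controller for sending when the
// cumulative quota exceeds the corresponding threshold.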
func (t *http2Server) updateWindow(s *ServerStream, n uint32) {
	if w := s.fc.onRead(n); w > 0 {
		t.controlBuf.put(&outgoingWindowUpdate{streamID: s.id,
			increment: w,
		})
	}
}
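// updateFlowControl updates the incoming flow control windows for the
// transport and the streams based on the current bdp estimation.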
func (t *http2Server) updateFlowControl(n uint32) {
	t.mu.Lock()
	for _, s := range t.activeStreams {
		s.fc.newLimit(n)
	}
	t.initialWindowSize = int32(n)
	t.mu.Unlock()
	t.controlBuf.put(&outgoingWindowUpdate{
		streamID:  0,
		increment: t.fc.newLimit(n),
	})
	t.controlBuf.put(&outgoingSettings{
		ss: []http2.Setting{
			{
				ID:  http2.SettingInitialWindowSize,
				Val: n,
			},
		},
	})
}
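// handleData processes an incoming DATA frame: it updates connection- and
// stream-level flow control, optionally triggers a BDP ping, and delivers the
// payload to the matching stream.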
func (t *http2Server) handleData(f *http2.DataFrame) {
	size := f.Header().Length
	var sendBDPPing bool
	if t.bdpEst != nil {
		sendBDPPing = t.bdpEst.add(size)
	}
	// Decouple the connection's flow control from the application's read so
	// that other active (fast) streams are not starved by slow or inactive
	// streams.
	if w := t.fc.onData(size); w > 0 {
		t.controlBuf.put(&outgoingWindowUpdate{
			streamID:  0,
			increment: w,
		})
	}
	if sendBDPPing {
		// Avoid excessive ping detection (e.g. in an L7 proxy) by sending a
		// window update prior to the BDP ping.
		if w := t.fc.reset(); w > 0 {
			t.controlBuf.put(&outgoingWindowUpdate{
				streamID:  0,
				increment: w,
			})
		}
		t.controlBuf.put(bdpPing)
	}
	// Select the right stream to dispatch.
	s, ok := t.getStream(f)
	if !ok {
		return
	}
	if s.getState() == streamReadDone {
		t.closeStream(s, true, http2.ErrCodeStreamClosed, false)
		return
	}
	if size > 0 {
		if err := s.fc.onData(size); err != nil {
			t.closeStream(s, true, http2.ErrCodeFlowControl, false)
			return
		}
		if f.Header().Flags.Has(http2.FlagDataPadded) {
			if w := s.fc.onRead(size - uint32(len(f.Data()))); w > 0 {
				t.controlBuf.put(&outgoingWindowUpdate{s.id, w})
			}
		}
		if len(f.Data()) > 0 {
			// A copy is required because there is no guarantee f.Data() is
			// consumed before the arrival of the next frame.
			pool := t.bufferPool
			if pool == nil {
				// The buffer pool is only expected to be nil in tests.
				pool = mem.DefaultBufferPool()
			}
			s.write(recvMsg{buffer: mem.Copy(f.Data(), pool)})
		}
	}
	if f.StreamEnded() {
		// Received the end of stream from the client.
		s.compareAndSwapState(streamActive, streamReadDone)
		s.write(recvMsg{err: io.EOF})
	}
}
func (t *http2Server) handleRSTStream(f *http2.RSTStreamFrame) {
	// If the stream is still in the transport's active streams map, do a
	// regular close.
	if s, ok := t.getStream(f); ok {
		t.closeStream(s, false, 0, false)
		return
	}
	// Otherwise put a cleanupStream item into controlbuf to delete the stream
	// from loopy writer's established streams map.
	t.controlBuf.put(&cleanupStream{
		streamID: f.Header().StreamID,
		rst:      false,
		rstCode:  0,
		onWrite:  func() {},
	})
}
func (t *http2Server) handleSettings(f *http2.SettingsFrame) {
	if f.IsAck() {
		return
	}
	var ss []http2.Setting
	var updateFuncs []func()
	f.ForeachSetting(func(s http2.Setting) error {
		switch s.ID {
		case http2.SettingMaxHeaderListSize:
			updateFuncs = append(updateFuncs, func() {
				t.maxSendHeaderListSize = new(uint32)
				*t.maxSendHeaderListSize = s.Val
			})
		default:
			ss = append(ss, s)
		}
		return nil
	})
	t.controlBuf.executeAndPut(func() bool {
		for _, f := range updateFuncs {
			f()
		}
		return true
	}, &incomingSettings{
		ss: ss,
	})
}
const (
maxPingStrikes = 2
defaultPingTimeout = 2 * time.Hour
)
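// handlePing replies to pings from the client and enforces the server's
// keepalive policy; when the client pings too aggressively, a GOAWAY with
// ENHANCE_YOUR_CALM ("too_many_pings") is sent and the connection is closed.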
func (t *http2Server) handlePing(f *http2.PingFrame) {
	if f.IsAck() {
		if f.Data == goAwayPing.data && t.drainEvent != nil {
			t.drainEvent.Fire()
			return
		}
		// Maybe it's a BDP ping.
		if t.bdpEst != nil {
			t.bdpEst.calculate(f.Data)
		}
		return
	}
	pingAck := &ping{ack: true}
	copy(pingAck.data[:], f.Data[:])
	t.controlBuf.put(pingAck)

	now := time.Now()
	defer func() {
		t.lastPingAt = now
	}()
	// A reset ping strikes means that we don't need to check for a policy
	// violation for this ping, and the pingStrikes counter should be set to 0.
	if atomic.CompareAndSwapUint32(&t.resetPingStrikes, 1, 0) {
		t.pingStrikes = 0
		return
	}
	t.mu.Lock()
	ns := len(t.activeStreams)
	t.mu.Unlock()
	if ns < 1 && !t.kep.PermitWithoutStream {
		// Keepalive shouldn't be active, so this new ping should have come
		// after at least defaultPingTimeout.
		if t.lastPingAt.Add(defaultPingTimeout).After(now) {
			t.pingStrikes++
		}
	} else {
		// Check if the keepalive policy is respected.
		if t.lastPingAt.Add(t.kep.MinTime).After(now) {
			t.pingStrikes++
		}
	}

	if t.pingStrikes > maxPingStrikes {
		// Send a GOAWAY and close the connection.
		t.controlBuf.put(&goAway{code: http2.ErrCodeEnhanceYourCalm, debugData: []byte("too_many_pings"), closeConn: errors.New("got too many pings from the client")})
	}
}
func (t *http2Server) handleWindowUpdate(f *http2.WindowUpdateFrame) {
	t.controlBuf.put(&incomingWindowUpdate{
		streamID:  f.Header().StreamID,
		increment: f.Increment,
	})
}
func appendHeaderFieldsFromMD(headerFields []hpack.HeaderField, md metadata.MD) []hpack.HeaderField {
	for k, vv := range md {
		if isReservedHeader(k) {
			// Clients don't tolerate reading restricted headers after some
			// non-restricted ones were sent.
			continue
		}
		for _, v := range vv {
			headerFields = append(headerFields, hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)})
		}
	}
	return headerFields
}
func (t *http2Server) checkForHeaderListSize(it any) bool {
	if t.maxSendHeaderListSize == nil {
		return true
	}
	hdrFrame := it.(*headerFrame)
	var sz int64
	for _, f := range hdrFrame.hf {
		if sz += int64(f.Size()); sz > int64(*t.maxSendHeaderListSize) {
			if t.logger.V(logLevel) {
				t.logger.Infof("Header list size to send violates the maximum size (%d bytes) set by client", *t.maxSendHeaderListSize)
			}
			return false
		}
	}
	return true
}
func (t *http2Server) streamContextErr(s *ServerStream) error {
	select {
	case <-t.done:
		return ErrConnClosing
	default:
	}
	return ContextErr(s.ctx.Err())
}
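// writeHeader sends the header metadata md back to the client.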
func (t *http2Server) writeHeader(s *ServerStream, md metadata.MD) error {
	s.hdrMu.Lock()
	defer s.hdrMu.Unlock()
	if s.getState() == streamDone {
		return t.streamContextErr(s)
	}

	if s.updateHeaderSent() {
		return ErrIllegalHeaderWrite
	}

	if md.Len() > 0 {
		if s.header.Len() > 0 {
			s.header = metadata.Join(s.header, md)
		} else {
			s.header = md
		}
	}
	if err := t.writeHeaderLocked(s); err != nil {
		switch e := err.(type) {
		case ConnectionError:
			return status.Error(codes.Unavailable, e.Desc)
		default:
			return status.Convert(err).Err()
		}
	}
	return nil
}
func (t *http2Server) writeHeaderLocked(s *ServerStream) error {
	// At least :status and content-type will be present.
	headerFields := make([]hpack.HeaderField, 0, 2)
	headerFields = append(headerFields, hpack.HeaderField{Name: ":status", Value: "200"})
	headerFields = append(headerFields, hpack.HeaderField{Name: "content-type", Value: grpcutil.ContentType(s.contentSubtype)})
	if s.sendCompress != "" {
		headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-encoding", Value: s.sendCompress})
	}
	headerFields = appendHeaderFieldsFromMD(headerFields, s.header)
	hf := &headerFrame{
		streamID:  s.id,
		hf:        headerFields,
		endStream: false,
		onWrite:   t.setResetPingStrikes,
	}
	success, err := t.controlBuf.executeAndPut(func() bool { return t.checkForHeaderListSize(hf) }, hf)
	if !success {
		if err != nil {
			return err
		}
		t.closeStream(s, true, http2.ErrCodeInternal, false)
		return ErrHeaderListSizeLimitViolation
	}
	for _, sh := range t.stats {
		// Note: Headers are compressed with hpack after this call returns, so
		// no WireLength field is set here.
		outHeader := &stats.OutHeader{
			Header:      s.header.Copy(),
			Compression: s.sendCompress,
		}
		sh.HandleRPC(s.Context(), outHeader)
	}
	return nil
}
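// writeStatus sends stream status to the client and terminates the stream.
// No further I/O operations can be performed on the stream afterwards.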
func (t *http2Server) writeStatus(s *ServerStream, st *status.Status) error {
	s.hdrMu.Lock()
	defer s.hdrMu.Unlock()

	if s.getState() == streamDone {
		return nil
	}

	// grpc-status and grpc-message will be present if nothing else.
	headerFields := make([]hpack.HeaderField, 0, 2)
	if !s.updateHeaderSent() { // No headers have been sent.
		if len(s.header) > 0 { // Send a separate header frame.
			if err := t.writeHeaderLocked(s); err != nil {
				return err
			}
		} else { // Send a trailer-only response.
			headerFields = append(headerFields, hpack.HeaderField{Name: ":status", Value: "200"})
			headerFields = append(headerFields, hpack.HeaderField{Name: "content-type", Value: grpcutil.ContentType(s.contentSubtype)})
		}
	}
	headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-status", Value: strconv.Itoa(int(st.Code()))})
	headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-message", Value: encodeGrpcMessage(st.Message())})

	if p := istatus.RawStatusProto(st); len(p.GetDetails()) > 0 {
		// Do not use the user's grpc-status-details-bin (if present) if we are
		// even attempting to set our own.
		delete(s.trailer, grpcStatusDetailsBinHeader)
		stBytes, err := proto.Marshal(p)
		if err != nil {
			// TODO: return error instead, when callers are able to handle it.
			t.logger.Errorf("Failed to marshal rpc status: %s, error: %v", pretty.ToJSON(p), err)
		} else {
			headerFields = append(headerFields, hpack.HeaderField{Name: grpcStatusDetailsBinHeader, Value: encodeBinHeader(stBytes)})
		}
	}

	// Attach the trailer metadata.
	headerFields = appendHeaderFieldsFromMD(headerFields, s.trailer)
	trailingHeader := &headerFrame{
		streamID:  s.id,
		hf:        headerFields,
		endStream: true,
		onWrite:   t.setResetPingStrikes,
	}

	success, err := t.controlBuf.executeAndPut(func() bool {
		return t.checkForHeaderListSize(trailingHeader)
	}, nil)
	if !success {
		if err != nil {
			return err
		}
		t.closeStream(s, true, http2.ErrCodeInternal, false)
		return ErrHeaderListSizeLimitViolation
	}
	// Send a RST_STREAM after the trailers if the client has not already
	// half-closed.
	rst := s.getState() == streamActive
	t.finishStream(s, rst, http2.ErrCodeNo, trailingHeader, true)
	for _, sh := range t.stats {
		// Note: The trailer fields are compressed with hpack after this call
		// returns, so no WireLength field is set here.
		sh.HandleRPC(s.Context(), &stats.OutTrailer{
			Trailer: s.trailer.Copy(),
		})
	}
	return nil
}
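// write converts the data into HTTP/2 data frame(s) and sends it out. A
// non-nil error is returned if it fails (e.g., framing error, transport
// error).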
func (t *http2Server) write(s *ServerStream, hdr []byte, data mem.BufferSlice, _ *WriteOptions) error {
	if !s.isHeaderSent() { // Headers haven't been written yet.
		if err := t.writeHeader(s, nil); err != nil {
			return err
		}
	} else {
		// Writing headers checks for this condition.
		if s.getState() == streamDone {
			return t.streamContextErr(s)
		}
	}

	df := &dataFrame{
		streamID:    s.id,
		h:           hdr,
		data:        data,
		onEachWrite: t.setResetPingStrikes,
	}
	dataLen := data.Len()
	if err := s.wq.get(int32(len(hdr) + dataLen)); err != nil {
		return t.streamContextErr(s)
	}
	data.Ref()
	if err := t.controlBuf.put(df); err != nil {
		data.Free()
		return err
	}
	t.incrMsgSent()
	return nil
}
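// keepalive runs in a separate goroutine and does the following:
//  1. Gracefully closes an idle connection after a duration of keepalive.MaxConnectionIdle.
//  2. Gracefully closes any connection after a duration of keepalive.MaxConnectionAge.
//  3. Forcibly closes a connection after an additive period of keepalive.MaxConnectionAgeGrace over keepalive.MaxConnectionAge.
//  4. Makes sure a connection is alive by sending pings with a frequency of keepalive.Time and closes a non-responsive connection
//     after an additional duration of keepalive.Timeout.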
func (t *http2Server) keepalive() {
	p := &ping{}
	// True iff a ping has been sent, and no data has been received since then.
	outstandingPing := false
	// Amount of time remaining before which we should receive an ACK for the
	// last sent ping.
	kpTimeoutLeft := time.Duration(0)
	// Records the last value of t.lastRead before we go block on the timer.
	// This is required to check for read activity since then.
	prevNano := time.Now().UnixNano()
	// Initialize the different timers to their default values.
	idleTimer := time.NewTimer(t.kp.MaxConnectionIdle)
	ageTimer := time.NewTimer(t.kp.MaxConnectionAge)
	kpTimer := time.NewTimer(t.kp.Time)
	defer func() {
		// We are not interested in resetting these timers here, so there is no
		// need to drain their channels after Stop().
		idleTimer.Stop()
		ageTimer.Stop()
		kpTimer.Stop()
	}()

	for {
		select {
		case <-idleTimer.C:
			t.mu.Lock()
			idle := t.idle
			if idle.IsZero() { // The connection is non-idle.
				t.mu.Unlock()
				idleTimer.Reset(t.kp.MaxConnectionIdle)
				continue
			}
			val := t.kp.MaxConnectionIdle - time.Since(idle)
			t.mu.Unlock()
			if val <= 0 {
				// The connection has been idle for a duration of
				// keepalive.MaxConnectionIdle or more; gracefully close it.
				t.Drain("max_idle")
				return
			}
			idleTimer.Reset(val)
		case <-ageTimer.C:
			t.Drain("max_age")
			ageTimer.Reset(t.kp.MaxConnectionAgeGrace)
			select {
			case <-ageTimer.C:
				// Close the connection after the grace period.
				if t.logger.V(logLevel) {
					t.logger.Infof("Closing server transport due to maximum connection age")
				}
				t.controlBuf.put(closeConnection{})
			case <-t.done:
			}
			return
		case <-kpTimer.C:
			lastRead := atomic.LoadInt64(&t.lastRead)
			if lastRead > prevNano {
				// There has been read activity since the last time we were
				// here. Set the timer to fire at kp.Time from lastRead and
				// continue.
				outstandingPing = false
				kpTimer.Reset(time.Duration(lastRead) + t.kp.Time - time.Duration(time.Now().UnixNano()))
				prevNano = lastRead
				continue
			}
			if outstandingPing && kpTimeoutLeft <= 0 {
				t.Close(fmt.Errorf("keepalive ping not acked within timeout %s", t.kp.Timeout))
				return
			}
			if !outstandingPing {
				if channelz.IsOn() {
					t.channelz.SocketMetrics.KeepAlivesSent.Add(1)
				}
				t.controlBuf.put(p)
				kpTimeoutLeft = t.kp.Timeout
				outstandingPing = true
			}
			// Sleep for the minimum of kp.Time and kpTimeoutLeft so that we
			// wait only kp.Time before sending the next ping when the previous
			// one has been acked.
			sleepDuration := min(t.kp.Time, kpTimeoutLeft)
			kpTimeoutLeft -= sleepDuration
			kpTimer.Reset(sleepDuration)
		case <-t.done:
			return
		}
	}
}
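// Close starts shutting down the http2Server transport. Once called, the
// transport stops accepting new streams, closes the underlying connection, and
// cancels all active streams.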
func (t *http2Server) Close(err error) {
	t.mu.Lock()
	if t.state == closing {
		t.mu.Unlock()
		return
	}
	if t.logger.V(logLevel) {
		t.logger.Infof("Closing: %v", err)
	}
	t.state = closing
	streams := t.activeStreams
	t.activeStreams = nil
	t.mu.Unlock()
	t.controlBuf.finish()
	close(t.done)
	if err := t.conn.Close(); err != nil && t.logger.V(logLevel) {
		t.logger.Infof("Error closing underlying net.Conn during Close: %v", err)
	}
	channelz.RemoveEntry(t.channelz.ID)
	// Cancel all active streams.
	for _, s := range streams {
		s.cancel()
	}
}
// deleteStream deletes the stream s from the transport's active streams.
func (t *http2Server) deleteStream(s *ServerStream, eosReceived bool) {
	t.mu.Lock()
	if _, ok := t.activeStreams[s.id]; ok {
		delete(t.activeStreams, s.id)
		if len(t.activeStreams) == 0 {
			t.idle = time.Now()
		}
	}
	t.mu.Unlock()
	if channelz.IsOn() {
		if eosReceived {
			t.channelz.SocketMetrics.StreamsSucceeded.Add(1)
		} else {
			t.channelz.SocketMetrics.StreamsFailed.Add(1)
		}
	}
}
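// finishStream closes the stream and puts the trailing headerFrame into
// controlbuf.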
func (t *http2Server) finishStream(s *ServerStream, rst bool, rstCode http2.ErrCode, hdr *headerFrame, eosReceived bool) {
	// In case stream sending and receiving are invoked in separate goroutines
	// (e.g., bi-directional streaming), cancel needs to be called to interrupt
	// the potential blocking on other goroutines.
	s.cancel()

	oldState := s.swapState(streamDone)
	if oldState == streamDone {
		// If the stream was already done, return.
		return
	}

	hdr.cleanup = &cleanupStream{
		streamID: s.id,
		rst:      rst,
		rstCode:  rstCode,
		onWrite: func() {
			t.deleteStream(s, eosReceived)
		},
	}
	t.controlBuf.put(hdr)
}
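// closeStream clears the footprint of a stream when the stream is not needed
// any more.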
func (t *http2Server) closeStream(s *ServerStream, rst bool, rstCode http2.ErrCode, eosReceived bool) {
	// In case stream sending and receiving are invoked in separate goroutines
	// (e.g., bi-directional streaming), cancel needs to be called to interrupt
	// the potential blocking on other goroutines.
	s.cancel()

	oldState := s.swapState(streamDone)
	if oldState == streamDone {
		return
	}
	t.deleteStream(s, eosReceived)
	t.controlBuf.put(&cleanupStream{
		streamID: s.id,
		rst:      rst,
		rstCode:  rstCode,
		onWrite:  func() {},
	})
}
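// Drain sends a GOAWAY heads-up so the client stops creating new streams on
// this transport. It is a no-op if the transport is already draining.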
func (t *http2Server) Drain(debugData string) {
	t.mu.Lock()
	defer t.mu.Unlock()
	if t.drainEvent != nil {
		// Already draining.
		return
	}
	t.drainEvent = grpcsync.NewEvent()
	t.controlBuf.put(&goAway{code: http2.ErrCodeNo, debugData: []byte(debugData), headsUp: true})
}
var goAwayPing = &ping{data: [8]byte{1, 6, 1, 8, 0, 3, 3, 9}}
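// outgoingGoAwayHandler handles an outgoing GOAWAY and returns true if loopy
// needs to put itself in draining mode.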
func (t *http2Server) outgoingGoAwayHandler(g *goAway) (bool, error) {
	t.maxStreamMu.Lock()
	t.mu.Lock()
	if t.state == closing {
		t.mu.Unlock()
		t.maxStreamMu.Unlock()
		// The transport is closing.
		return false, ErrConnClosing
	}
	if !g.headsUp {
		// Stop accepting more streams now.
		t.state = draining
		sid := t.maxStreamID
		retErr := g.closeConn
		if len(t.activeStreams) == 0 {
			retErr = errors.New("second GOAWAY written and no active streams left to process")
		}
		t.mu.Unlock()
		t.maxStreamMu.Unlock()
		if err := t.framer.fr.WriteGoAway(sid, g.code, g.debugData); err != nil {
			return false, err
		}
		t.framer.writer.Flush()
		if retErr != nil {
			return false, retErr
		}
		return true, nil
	}
	t.mu.Unlock()
	t.maxStreamMu.Unlock()
	// For a graceful close, send out a GOAWAY with a stream ID of MaxUint32,
	// followed by a ping, and wait for the ack to come back or a timer to
	// expire. During this time accept new streams, since they might have been
	// created before the GOAWAY reached the client. After getting the ack or
	// on timer expiration, send out another GOAWAY, this time with the ID of
	// the max stream the server intends to process.
	if err := t.framer.fr.WriteGoAway(math.MaxUint32, http2.ErrCodeNo, g.debugData); err != nil {
		return false, err
	}
	if err := t.framer.fr.WritePing(false, goAwayPing.data); err != nil {
		return false, err
	}
	go func() {
		timer := time.NewTimer(5 * time.Second)
		defer timer.Stop()
		select {
		case <-t.drainEvent.Done():
		case <-timer.C:
		case <-t.done:
			return
		}
		t.controlBuf.put(&goAway{code: g.code, debugData: g.debugData})
	}()
	return false, nil
}
func (t *http2Server) socketMetrics() *channelz.EphemeralSocketMetrics {
	return &channelz.EphemeralSocketMetrics{
		LocalFlowControlWindow:  int64(t.fc.getSize()),
		RemoteFlowControlWindow: t.getOutFlowWindow(),
	}
}
func (t *http2Server) incrMsgSent() {
	if channelz.IsOn() {
		t.channelz.SocketMetrics.MessagesSent.Add(1)
		t.channelz.SocketMetrics.LastMessageSentTimestamp.Add(1)
	}
}
func (t *http2Server) incrMsgRecv() {
	if channelz.IsOn() {
		t.channelz.SocketMetrics.MessagesReceived.Add(1)
		t.channelz.SocketMetrics.LastMessageReceivedTimestamp.Add(1)
	}
}
func (t *http2Server) getOutFlowWindow() int64 {
	resp := make(chan uint32, 1)
	timer := time.NewTimer(time.Second)
	defer timer.Stop()
	t.controlBuf.put(&outFlowControlSizeRequest{resp})
	select {
	case sz := <-resp:
		return int64(sz)
	case <-t.done:
		return -1
	case <-timer.C:
		return -2
	}
}
// Peer returns the peer of the transport.
func (t *http2Server) Peer() *peer.Peer {
	return &peer.Peer{
		Addr:      t.peer.Addr,
		LocalAddr: t.peer.LocalAddr,
		AuthInfo:  t.peer.AuthInfo, // Can be nil.
	}
}
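// getJitter returns a random jitter of +/-10% of the given duration, or 0 for
// an infinite duration.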
func getJitter(v time.Duration) time.Duration {
	if v == infinity {
		return 0
	}
	r := int64(v / 10)
	j := rand.Int64N(2*r) - r
	return time.Duration(j)
}
type connectionKey struct{}
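// GetConnection gets the connection from the context.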
func GetConnection(ctx context.Context) net.Conn {
	conn, _ := ctx.Value(connectionKey{}).(net.Conn)
	return conn
}

// SetConnection adds the connection to the context so that information about
// the destination IP and port of an incoming RPC, as well as the connection
// itself, is available to unary and streaming interceptors.
func SetConnection(ctx context.Context, conn net.Conn) context.Context {
	return context.WithValue(ctx, connectionKey{}, conn)
}