package transport

import (
	"bytes"
	"context"
	"errors"
	"fmt"
	"io"
	"math"
	"net"
	"net/http"
	"strconv"
	"sync"
	"sync/atomic"
	"time"

	"github.com/golang/protobuf/proto"
	"golang.org/x/net/http2"
	"golang.org/x/net/http2/hpack"
	"google.golang.org/grpc/internal/grpcutil"
	"google.golang.org/grpc/internal/syscall"

	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/credentials"
	"google.golang.org/grpc/internal/channelz"
	"google.golang.org/grpc/internal/grpcrand"
	"google.golang.org/grpc/internal/grpcsync"
	"google.golang.org/grpc/keepalive"
	"google.golang.org/grpc/metadata"
	"google.golang.org/grpc/peer"
	"google.golang.org/grpc/stats"
	"google.golang.org/grpc/status"
	"google.golang.org/grpc/tap"
)

var (
	// ErrIllegalHeaderWrite indicates that setting header is illegal because of
	// the stream's state.
	ErrIllegalHeaderWrite = status.Error(codes.Internal, "transport: SendHeader called multiple times")
	// ErrHeaderListSizeLimitViolation indicates that the header list size is
	// larger than the limit set by peer.
	ErrHeaderListSizeLimitViolation = status.Error(codes.Internal, "transport: trying to send header list size larger than the limit set by peer")
)

// serverConnectionCounter counts the number of connections a server has seen
// (equal to the number of http2Servers created). Must be accessed atomically.
var serverConnectionCounter uint64
// http2Server implements the ServerTransport interface with HTTP2.
type http2Server struct {
lastRead int64
ctx context.Context
done chan struct{}
conn net.Conn
loopy *loopyWriter
readerDone chan struct{}
writerDone chan struct{}
remoteAddr net.Addr
localAddr net.Addr
authInfo credentials.AuthInfo
inTapHandle tap.ServerInHandle
framer *framer
maxStreams uint32
controlBuf *controlBuffer
fc *trInFlow
stats []stats.Handler
kp keepalive.ServerParameters
kep keepalive.EnforcementPolicy
lastPingAt time.Time
pingStrikes uint8
resetPingStrikes uint32
initialWindowSize int32
bdpEst *bdpEstimator
maxSendHeaderListSize *uint32
mu sync.Mutex
drainEvent *grpcsync.Event
state transportState
activeStreams map[uint32]*Stream
idle time.Time
channelzID *channelz.Identifier
czData *channelzData
bufferPool *bufferPool
connectionID uint64
maxStreamMu sync.Mutex
maxStreamID uint32
}
// NewServerTransport creates a http2 transport with conn and configuration
// options from config.
//
// It returns a non-nil transport and a nil error on success. On failure, it
// returns a nil transport and a non-nil error. For a special case where the
// underlying conn gets closed before the client preface could be read, it
// returns a nil transport and a nil error.
func NewServerTransport(conn net.Conn, config *ServerConfig) (_ ServerTransport, err error) {
	var authInfo credentials.AuthInfo
	rawConn := conn
	if config.Credentials != nil {
		var err error
		conn, authInfo, err = config.Credentials.ServerHandshake(rawConn)
		if err != nil {
			// ErrConnDispatched means that the connection was dispatched away
			// from gRPC; those connections should be left open. io.EOF means
			// the connection was closed before handshake completed and is fine
			// to ignore.
			if err == credentials.ErrConnDispatched || err == io.EOF {
				return nil, err
			}
			return nil, connectionErrorf(false, err, "ServerHandshake(%q) failed: %v", rawConn.RemoteAddr(), err)
		}
	}
	writeBufSize := config.WriteBufferSize
	readBufSize := config.ReadBufferSize
	maxHeaderListSize := defaultServerMaxHeaderListSize
	if config.MaxHeaderListSize != nil {
		maxHeaderListSize = *config.MaxHeaderListSize
	}
	framer := newFramer(conn, writeBufSize, readBufSize, maxHeaderListSize)
	// Send initial settings as connection preface to client.
	isettings := []http2.Setting{{
		ID:  http2.SettingMaxFrameSize,
		Val: http2MaxFrameLen,
	}}
	maxStreams := config.MaxStreams
	if maxStreams == 0 {
		maxStreams = math.MaxUint32
	} else {
		isettings = append(isettings, http2.Setting{
			ID:  http2.SettingMaxConcurrentStreams,
			Val: maxStreams,
		})
	}
	dynamicWindow := true
	iwz := int32(initialWindowSize)
	if config.InitialWindowSize >= defaultWindowSize {
		iwz = config.InitialWindowSize
		dynamicWindow = false
	}
	icwz := int32(initialWindowSize)
	if config.InitialConnWindowSize >= defaultWindowSize {
		icwz = config.InitialConnWindowSize
		dynamicWindow = false
	}
	if iwz != defaultWindowSize {
		isettings = append(isettings, http2.Setting{
			ID:  http2.SettingInitialWindowSize,
			Val: uint32(iwz)})
	}
	if config.MaxHeaderListSize != nil {
		isettings = append(isettings, http2.Setting{
			ID:  http2.SettingMaxHeaderListSize,
			Val: *config.MaxHeaderListSize,
		})
	}
	if config.HeaderTableSize != nil {
		isettings = append(isettings, http2.Setting{
			ID:  http2.SettingHeaderTableSize,
			Val: *config.HeaderTableSize,
		})
	}
	if err := framer.fr.WriteSettings(isettings...); err != nil {
		return nil, connectionErrorf(false, err, "transport: %v", err)
	}
	// Adjust the connection flow control window if needed.
	if delta := uint32(icwz - defaultWindowSize); delta > 0 {
		if err := framer.fr.WriteWindowUpdate(0, delta); err != nil {
			return nil, connectionErrorf(false, err, "transport: %v", err)
		}
	}
	kp := config.KeepaliveParams
	if kp.MaxConnectionIdle == 0 {
		kp.MaxConnectionIdle = defaultMaxConnectionIdle
	}
	if kp.MaxConnectionAge == 0 {
		kp.MaxConnectionAge = defaultMaxConnectionAge
	}
	// Add a jitter to MaxConnectionAge.
	kp.MaxConnectionAge += getJitter(kp.MaxConnectionAge)
	if kp.MaxConnectionAgeGrace == 0 {
		kp.MaxConnectionAgeGrace = defaultMaxConnectionAgeGrace
	}
	if kp.Time == 0 {
		kp.Time = defaultServerKeepaliveTime
	}
	if kp.Timeout == 0 {
		kp.Timeout = defaultServerKeepaliveTimeout
	}
	if kp.Time != infinity {
		if err = syscall.SetTCPUserTimeout(conn, kp.Timeout); err != nil {
			return nil, connectionErrorf(false, err, "transport: failed to set TCP_USER_TIMEOUT: %v", err)
		}
	}
	kep := config.KeepalivePolicy
	if kep.MinTime == 0 {
		kep.MinTime = defaultKeepalivePolicyMinTime
	}
	done := make(chan struct{})
	t := &http2Server{
		ctx:               setConnection(context.Background(), rawConn),
		done:              done,
		conn:              conn,
		remoteAddr:        conn.RemoteAddr(),
		localAddr:         conn.LocalAddr(),
		authInfo:          authInfo,
		framer:            framer,
		readerDone:        make(chan struct{}),
		writerDone:        make(chan struct{}),
		maxStreams:        maxStreams,
		inTapHandle:       config.InTapHandle,
		fc:                &trInFlow{limit: uint32(icwz)},
		state:             reachable,
		activeStreams:     make(map[uint32]*Stream),
		stats:             config.StatsHandlers,
		kp:                kp,
		idle:              time.Now(),
		kep:               kep,
		initialWindowSize: iwz,
		czData:            new(channelzData),
		bufferPool:        newBufferPool(),
	}
	t.ctx = peer.NewContext(t.ctx, t.getPeer())
	t.controlBuf = newControlBuffer(t.done)
	if dynamicWindow {
		t.bdpEst = &bdpEstimator{
			bdp:               initialWindowSize,
			updateFlowControl: t.updateFlowControl,
		}
	}
	for _, sh := range t.stats {
		t.ctx = sh.TagConn(t.ctx, &stats.ConnTagInfo{
			RemoteAddr: t.remoteAddr,
			LocalAddr:  t.localAddr,
		})
		connBegin := &stats.ConnBegin{}
		sh.HandleConn(t.ctx, connBegin)
	}
	t.channelzID, err = channelz.RegisterNormalSocket(t, config.ChannelzParentID, fmt.Sprintf("%s -> %s", t.remoteAddr, t.localAddr))
	if err != nil {
		return nil, err
	}

	t.connectionID = atomic.AddUint64(&serverConnectionCounter, 1)
	t.framer.writer.Flush()

	defer func() {
		if err != nil {
			t.Close(err)
		}
	}()

	// Check the validity of client preface.
	preface := make([]byte, len(clientPreface))
	if _, err := io.ReadFull(t.conn, preface); err != nil {
		// In deployments where a gRPC server runs behind a cloud load balancer
		// which performs regular TCP level health checks, the connection is
		// closed immediately by the latter. Returning io.EOF here allows the
		// grpc server implementation to recognize this scenario and suppress
		// logging to reduce spam.
		if err == io.EOF {
			return nil, io.EOF
		}
		return nil, connectionErrorf(false, err, "transport: http2Server.HandleStreams failed to receive the preface from client: %v", err)
	}
	if !bytes.Equal(preface, clientPreface) {
		return nil, connectionErrorf(false, nil, "transport: http2Server.HandleStreams received bogus greeting from client: %q", preface)
	}

	frame, err := t.framer.fr.ReadFrame()
	if err == io.EOF || err == io.ErrUnexpectedEOF {
		return nil, err
	}
	if err != nil {
		return nil, connectionErrorf(false, err, "transport: http2Server.HandleStreams failed to read initial settings frame: %v", err)
	}
	atomic.StoreInt64(&t.lastRead, time.Now().UnixNano())
	sf, ok := frame.(*http2.SettingsFrame)
	if !ok {
		return nil, connectionErrorf(false, nil, "transport: http2Server.HandleStreams saw invalid preface type %T from client", frame)
	}
	t.handleSettings(sf)

	go func() {
		t.loopy = newLoopyWriter(serverSide, t.framer, t.controlBuf, t.bdpEst)
		t.loopy.ssGoAwayHandler = t.outgoingGoAwayHandler
		err := t.loopy.run()
		if logger.V(logLevel) {
			logger.Infof("transport: loopyWriter exited. Closing connection. Err: %v", err)
		}
		t.conn.Close()
		t.controlBuf.finish()
		close(t.writerDone)
	}()
	go t.keepalive()
	return t, nil
}
// operateHeaders takes action on the decoded headers.
func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(*Stream), traceCtx func(context.Context, string) context.Context) error {
	// Acquire max stream ID lock for entire duration.
	t.maxStreamMu.Lock()
	defer t.maxStreamMu.Unlock()

	streamID := frame.Header().StreamID

	// frame.Truncated is set to true when framer detects that the current
	// header list size hits MaxHeaderListSize limit.
	if frame.Truncated {
		t.controlBuf.put(&cleanupStream{
			streamID: streamID,
			rst:      true,
			rstCode:  http2.ErrCodeFrameSize,
			onWrite:  func() {},
		})
		return nil
	}

	if streamID%2 != 1 || streamID <= t.maxStreamID {
		// illegal gRPC stream id.
		return fmt.Errorf("received an illegal stream id: %v. headers frame: %+v", streamID, frame)
	}
	t.maxStreamID = streamID

	buf := newRecvBuffer()
	s := &Stream{
		id:  streamID,
		st:  t,
		buf: buf,
		fc:  &inFlow{limit: uint32(t.initialWindowSize)},
	}
	var (
		// if false, content-type was missing or invalid.
		isGRPC      = false
		contentType = ""
		mdata       = make(map[string][]string)
		httpMethod  string
		// these are set if an error is encountered while parsing the headers.
		protocolError bool
		headerError   *status.Status

		timeoutSet bool
		timeout    time.Duration
	)
	for _, hf := range frame.Fields {
		switch hf.Name {
		case "content-type":
			contentSubtype, validContentType := grpcutil.ContentSubtype(hf.Value)
			if !validContentType {
				contentType = hf.Value
				break
			}
			mdata[hf.Name] = append(mdata[hf.Name], hf.Value)
			s.contentSubtype = contentSubtype
			isGRPC = true
		case "grpc-encoding":
			s.recvCompress = hf.Value
		case ":method":
			httpMethod = hf.Value
		case ":path":
			s.method = hf.Value
		case "grpc-timeout":
			timeoutSet = true
			var err error
			if timeout, err = decodeTimeout(hf.Value); err != nil {
				headerError = status.Newf(codes.Internal, "malformed grpc-timeout: %v", err)
			}
		// "Transports must consider requests containing the Connection header
		// as malformed." - A41
		case "connection":
			if logger.V(logLevel) {
				logger.Errorf("transport: http2Server.operateHeaders parsed a :connection header which makes a request malformed as per the HTTP/2 spec")
			}
			protocolError = true
		default:
			if isReservedHeader(hf.Name) && !isWhitelistedHeader(hf.Name) {
				break
			}
			v, err := decodeMetadataHeader(hf.Name, hf.Value)
			if err != nil {
				headerError = status.Newf(codes.Internal, "malformed binary metadata %q in header %q: %v", hf.Value, hf.Name, err)
				logger.Warningf("Failed to decode metadata header (%q, %q): %v", hf.Name, hf.Value, err)
				break
			}
			mdata[hf.Name] = append(mdata[hf.Name], v)
		}
	}
	// "If multiple Host headers or multiple :authority headers are present, the
	// request must be rejected with an HTTP status code 400 as required by Host
	// validation in RFC 7230 §5.4". Since this is a HTTP/2 error, it takes
	// precedence over a client not speaking gRPC.
	if len(mdata[":authority"]) > 1 || len(mdata["host"]) > 1 {
		errMsg := fmt.Sprintf("num values of :authority: %v, num values of host: %v, both must only have 1 value as per HTTP/2 spec", len(mdata[":authority"]), len(mdata["host"]))
		if logger.V(logLevel) {
			logger.Errorf("transport: %v", errMsg)
		}
		t.controlBuf.put(&earlyAbortStream{
			httpStatus:     http.StatusBadRequest,
			streamID:       streamID,
			contentSubtype: s.contentSubtype,
			status:         status.New(codes.Internal, errMsg),
			rst:            !frame.StreamEnded(),
		})
		return nil
	}

	if protocolError {
		t.controlBuf.put(&cleanupStream{
			streamID: streamID,
			rst:      true,
			rstCode:  http2.ErrCodeProtocol,
			onWrite:  func() {},
		})
		return nil
	}
	if !isGRPC {
		t.controlBuf.put(&earlyAbortStream{
			httpStatus:     http.StatusUnsupportedMediaType,
			streamID:       streamID,
			contentSubtype: s.contentSubtype,
			status:         status.Newf(codes.InvalidArgument, "invalid gRPC request content-type %q", contentType),
			rst:            !frame.StreamEnded(),
		})
		return nil
	}
	if headerError != nil {
		t.controlBuf.put(&earlyAbortStream{
			httpStatus:     http.StatusBadRequest,
			streamID:       streamID,
			contentSubtype: s.contentSubtype,
			status:         headerError,
			rst:            !frame.StreamEnded(),
		})
		return nil
	}

	// "If :authority is missing, Host must be renamed to :authority." - A41
	if len(mdata[":authority"]) == 0 {
		// No-op if host isn't present; no eventual :authority header is a
		// valid RPC.
		if host, ok := mdata["host"]; ok {
			mdata[":authority"] = host
			delete(mdata, "host")
		}
	} else {
		// "If :authority is present, Host must be discarded." - A41
		delete(mdata, "host")
	}
	if frame.StreamEnded() {
		// s is just created by the caller. No lock needed.
		s.state = streamReadDone
	}
	if timeoutSet {
		s.ctx, s.cancel = context.WithTimeout(t.ctx, timeout)
	} else {
		s.ctx, s.cancel = context.WithCancel(t.ctx)
	}

	// Attach the received metadata to the context.
	if len(mdata) > 0 {
		s.ctx = metadata.NewIncomingContext(s.ctx, mdata)
		if statsTags := mdata["grpc-tags-bin"]; len(statsTags) > 0 {
			s.ctx = stats.SetIncomingTags(s.ctx, []byte(statsTags[len(statsTags)-1]))
		}
		if statsTrace := mdata["grpc-trace-bin"]; len(statsTrace) > 0 {
			s.ctx = stats.SetIncomingTrace(s.ctx, []byte(statsTrace[len(statsTrace)-1]))
		}
	}
	t.mu.Lock()
	if t.state != reachable {
		t.mu.Unlock()
		s.cancel()
		return nil
	}
	if uint32(len(t.activeStreams)) >= t.maxStreams {
		t.mu.Unlock()
		t.controlBuf.put(&cleanupStream{
			streamID: streamID,
			rst:      true,
			rstCode:  http2.ErrCodeRefusedStream,
			onWrite:  func() {},
		})
		s.cancel()
		return nil
	}
	if httpMethod != http.MethodPost {
		t.mu.Unlock()
		errMsg := fmt.Sprintf("http2Server.operateHeaders parsed a :method field: %v which should be POST", httpMethod)
		if logger.V(logLevel) {
			logger.Infof("transport: %v", errMsg)
		}
		t.controlBuf.put(&earlyAbortStream{
			httpStatus:     405,
			streamID:       streamID,
			contentSubtype: s.contentSubtype,
			status:         status.New(codes.Internal, errMsg),
			rst:            !frame.StreamEnded(),
		})
		s.cancel()
		return nil
	}
	if t.inTapHandle != nil {
		var err error
		if s.ctx, err = t.inTapHandle(s.ctx, &tap.Info{FullMethodName: s.method}); err != nil {
			t.mu.Unlock()
			if logger.V(logLevel) {
				logger.Infof("transport: http2Server.operateHeaders got an error from InTapHandle: %v", err)
			}
			stat, ok := status.FromError(err)
			if !ok {
				stat = status.New(codes.PermissionDenied, err.Error())
			}
			t.controlBuf.put(&earlyAbortStream{
				httpStatus:     200,
				streamID:       s.id,
				contentSubtype: s.contentSubtype,
				status:         stat,
				rst:            !frame.StreamEnded(),
			})
			return nil
		}
	}
	t.activeStreams[streamID] = s
	if len(t.activeStreams) == 1 {
		t.idle = time.Time{}
	}
	t.mu.Unlock()
	if channelz.IsOn() {
		atomic.AddInt64(&t.czData.streamsStarted, 1)
		atomic.StoreInt64(&t.czData.lastStreamCreatedTime, time.Now().UnixNano())
	}
	s.requestRead = func(n int) {
		t.adjustWindow(s, uint32(n))
	}
	s.ctx = traceCtx(s.ctx, s.method)
	for _, sh := range t.stats {
		s.ctx = sh.TagRPC(s.ctx, &stats.RPCTagInfo{FullMethodName: s.method})
		inHeader := &stats.InHeader{
			FullMethod:  s.method,
			RemoteAddr:  t.remoteAddr,
			LocalAddr:   t.localAddr,
			Compression: s.recvCompress,
			WireLength:  int(frame.Header().Length),
			Header:      metadata.MD(mdata).Copy(),
		}
		sh.HandleRPC(s.ctx, inHeader)
	}
	s.ctxDone = s.ctx.Done()
	s.wq = newWriteQuota(defaultWriteQuota, s.ctxDone)
	s.trReader = &transportReader{
		reader: &recvBufferReader{
			ctx:        s.ctx,
			ctxDone:    s.ctxDone,
			recv:       s.buf,
			freeBuffer: t.bufferPool.put,
		},
		windowHandler: func(n int) {
			t.updateWindow(s, uint32(n))
		},
	}
	// Register the stream with loopy.
	t.controlBuf.put(&registerStream{
		streamID: s.id,
		wq:       s.wq,
	})
	handle(s)
	return nil
}
// HandleStreams receives incoming streams using the given handler. This is
// typically run in a separate goroutine.
// traceCtx attaches trace to ctx and returns the new context.
func (t *http2Server) HandleStreams(handle func(*Stream), traceCtx func(context.Context, string) context.Context) {
	defer close(t.readerDone)
	for {
		t.controlBuf.throttle()
		frame, err := t.framer.fr.ReadFrame()
		atomic.StoreInt64(&t.lastRead, time.Now().UnixNano())
		if err != nil {
			if se, ok := err.(http2.StreamError); ok {
				if logger.V(logLevel) {
					logger.Warningf("transport: http2Server.HandleStreams encountered http2.StreamError: %v", se)
				}
				t.mu.Lock()
				s := t.activeStreams[se.StreamID]
				t.mu.Unlock()
				if s != nil {
					t.closeStream(s, true, se.Code, false)
				} else {
					t.controlBuf.put(&cleanupStream{
						streamID: se.StreamID,
						rst:      true,
						rstCode:  se.Code,
						onWrite:  func() {},
					})
				}
				continue
			}
			if err == io.EOF || err == io.ErrUnexpectedEOF {
				t.Close(err)
				return
			}
			t.Close(err)
			return
		}
		switch frame := frame.(type) {
		case *http2.MetaHeadersFrame:
			if err := t.operateHeaders(frame, handle, traceCtx); err != nil {
				t.Close(err)
				break
			}
		case *http2.DataFrame:
			t.handleData(frame)
		case *http2.RSTStreamFrame:
			t.handleRSTStream(frame)
		case *http2.SettingsFrame:
			t.handleSettings(frame)
		case *http2.PingFrame:
			t.handlePing(frame)
		case *http2.WindowUpdateFrame:
			t.handleWindowUpdate(frame)
		case *http2.GoAwayFrame:
			// TODO: Handle GoAway from the client appropriately.
		default:
			if logger.V(logLevel) {
				logger.Errorf("transport: http2Server.HandleStreams found unhandled frame type %v.", frame)
			}
		}
	}
}
func (t *http2Server) getStream(f http2.Frame) (*Stream, bool) {
	t.mu.Lock()
	defer t.mu.Unlock()
	if t.activeStreams == nil {
		// The transport is closing.
		return nil, false
	}
	s, ok := t.activeStreams[f.Header().StreamID]
	if !ok {
		// The stream is already done.
		return nil, false
	}
	return s, true
}
// adjustWindow sends out an extra window update over the initial window size
// of the stream if the application is requesting data larger in size than the
// window.
func (t *http2Server) adjustWindow(s *Stream, n uint32) {
	if w := s.fc.maybeAdjust(n); w > 0 {
		t.controlBuf.put(&outgoingWindowUpdate{streamID: s.id, increment: w})
	}
}

// updateWindow adjusts the inbound quota for the stream. Window updates will
// be sent out when the cumulative quota exceeds the corresponding threshold.
func (t *http2Server) updateWindow(s *Stream, n uint32) {
	if w := s.fc.onRead(n); w > 0 {
		t.controlBuf.put(&outgoingWindowUpdate{streamID: s.id,
			increment: w,
		})
	}
}
// updateFlowControl updates the incoming flow control windows for the
// transport and the stream based on the current bdp estimation.
func (t *http2Server) updateFlowControl(n uint32) {
	t.mu.Lock()
	for _, s := range t.activeStreams {
		s.fc.newLimit(n)
	}
	t.initialWindowSize = int32(n)
	t.mu.Unlock()
	t.controlBuf.put(&outgoingWindowUpdate{
		streamID:  0,
		increment: t.fc.newLimit(n),
	})
	t.controlBuf.put(&outgoingSettings{
		ss: []http2.Setting{
			{
				ID:  http2.SettingInitialWindowSize,
				Val: n,
			},
		},
	})
}
func (t *http2Server) handleData(f *http2.DataFrame) {
	size := f.Header().Length
	var sendBDPPing bool
	if t.bdpEst != nil {
		sendBDPPing = t.bdpEst.add(size)
	}
	// Decouple connection's flow control from application's read.
	// An update on connection's flow control should not depend on whether the
	// user application has read the data or not, so send a window update for
	// the whole frame right away.
	if w := t.fc.onData(size); w > 0 {
		t.controlBuf.put(&outgoingWindowUpdate{
			streamID:  0,
			increment: w,
		})
	}
	if sendBDPPing {
		// Avoid excessive ping detection (e.g. in an L7 proxy) by sending a
		// window update prior to the BDP ping.
		if w := t.fc.reset(); w > 0 {
			t.controlBuf.put(&outgoingWindowUpdate{
				streamID:  0,
				increment: w,
			})
		}
		t.controlBuf.put(bdpPing)
	}
	// Select the right stream to dispatch.
	s, ok := t.getStream(f)
	if !ok {
		return
	}
	if s.getState() == streamReadDone {
		t.closeStream(s, true, http2.ErrCodeStreamClosed, false)
		return
	}
	if size > 0 {
		if err := s.fc.onData(size); err != nil {
			t.closeStream(s, true, http2.ErrCodeFlowControl, false)
			return
		}
		if f.Header().Flags.Has(http2.FlagDataPadded) {
			if w := s.fc.onRead(size - uint32(len(f.Data()))); w > 0 {
				t.controlBuf.put(&outgoingWindowUpdate{s.id, w})
			}
		}
		// A copy is required here because there is no guarantee f.Data() is
		// consumed before the arrival of the next frame.
		if len(f.Data()) > 0 {
			buffer := t.bufferPool.get()
			buffer.Reset()
			buffer.Write(f.Data())
			s.write(recvMsg{buffer: buffer})
		}
	}
	if f.StreamEnded() {
		// Received the end of stream from the client.
		s.compareAndSwapState(streamActive, streamReadDone)
		s.write(recvMsg{err: io.EOF})
	}
}
func (t *http2Server) handleRSTStream(f *http2.RSTStreamFrame) {
	// If the stream is still in the transport's active streams map, do a
	// regular close stream.
	if s, ok := t.getStream(f); ok {
		t.closeStream(s, false, 0, false)
		return
	}
	// If the stream is already deleted from the active streams map, put a
	// cleanupStream item into controlbuf to delete the stream from loopy
	// writer's established streams map.
	t.controlBuf.put(&cleanupStream{
		streamID: f.Header().StreamID,
		rst:      false,
		rstCode:  0,
		onWrite:  func() {},
	})
}
func (t *http2Server) handleSettings(f *http2.SettingsFrame) {
	if f.IsAck() {
		return
	}
	var ss []http2.Setting
	var updateFuncs []func()
	f.ForeachSetting(func(s http2.Setting) error {
		switch s.ID {
		case http2.SettingMaxHeaderListSize:
			updateFuncs = append(updateFuncs, func() {
				t.maxSendHeaderListSize = new(uint32)
				*t.maxSendHeaderListSize = s.Val
			})
		default:
			ss = append(ss, s)
		}
		return nil
	})
	t.controlBuf.executeAndPut(func(interface{}) bool {
		for _, f := range updateFuncs {
			f()
		}
		return true
	}, &incomingSettings{
		ss: ss,
	})
}
const (
maxPingStrikes = 2
defaultPingTimeout = 2 * time.Hour
)
func (t *http2Server) handlePing(f *http2.PingFrame) {
	if f.IsAck() {
		if f.Data == goAwayPing.data && t.drainEvent != nil {
			t.drainEvent.Fire()
			return
		}
		// Maybe it's a BDP ping.
		if t.bdpEst != nil {
			t.bdpEst.calculate(f.Data)
		}
		return
	}
	pingAck := &ping{ack: true}
	copy(pingAck.data[:], f.Data[:])
	t.controlBuf.put(pingAck)

	now := time.Now()
	defer func() {
		t.lastPingAt = now
	}()
	// A reset ping strikes means that we don't need to check for policy
	// violation for this ping and the pingStrikes counter should be set
	// to 0.
	if atomic.CompareAndSwapUint32(&t.resetPingStrikes, 1, 0) {
		t.pingStrikes = 0
		return
	}
	t.mu.Lock()
	ns := len(t.activeStreams)
	t.mu.Unlock()
	if ns < 1 && !t.kep.PermitWithoutStream {
		// Keepalive shouldn't be active thus, this new ping should
		// have come after at least defaultPingTimeout.
		if t.lastPingAt.Add(defaultPingTimeout).After(now) {
			t.pingStrikes++
		}
	} else {
		// Check if keepalive policy is respected.
		if t.lastPingAt.Add(t.kep.MinTime).After(now) {
			t.pingStrikes++
		}
	}

	if t.pingStrikes > maxPingStrikes {
		// Send goaway and close the connection.
		t.controlBuf.put(&goAway{code: http2.ErrCodeEnhanceYourCalm, debugData: []byte("too_many_pings"), closeConn: errors.New("got too many pings from the client")})
	}
}
func (t *http2Server) handleWindowUpdate(f *http2.WindowUpdateFrame) {
	t.controlBuf.put(&incomingWindowUpdate{
		streamID:  f.Header().StreamID,
		increment: f.Increment,
	})
}
func appendHeaderFieldsFromMD(headerFields []hpack.HeaderField, md metadata.MD) []hpack.HeaderField {
	for k, vv := range md {
		if isReservedHeader(k) {
			// Clients don't tolerate reading restricted headers after some
			// non restricted ones were sent.
			continue
		}
		for _, v := range vv {
			headerFields = append(headerFields, hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)})
		}
	}
	return headerFields
}
func (t *http2Server) checkForHeaderListSize(it interface{}) bool {
	if t.maxSendHeaderListSize == nil {
		return true
	}
	hdrFrame := it.(*headerFrame)
	var sz int64
	for _, f := range hdrFrame.hf {
		if sz += int64(f.Size()); sz > int64(*t.maxSendHeaderListSize) {
			if logger.V(logLevel) {
				logger.Errorf("header list size to send violates the maximum size (%d bytes) set by client", *t.maxSendHeaderListSize)
			}
			return false
		}
	}
	return true
}
func (t *http2Server) streamContextErr(s *Stream) error {
	select {
	case <-t.done:
		return ErrConnClosing
	default:
	}
	return ContextErr(s.ctx.Err())
}
// WriteHeader sends the header metadata md back to the client.
func (t *http2Server) WriteHeader(s *Stream, md metadata.MD) error {
	s.hdrMu.Lock()
	defer s.hdrMu.Unlock()
	if s.getState() == streamDone {
		return t.streamContextErr(s)
	}

	if s.updateHeaderSent() {
		return ErrIllegalHeaderWrite
	}

	if md.Len() > 0 {
		if s.header.Len() > 0 {
			s.header = metadata.Join(s.header, md)
		} else {
			s.header = md
		}
	}
	if err := t.writeHeaderLocked(s); err != nil {
		return status.Convert(err).Err()
	}
	return nil
}

func (t *http2Server) setResetPingStrikes() {
	atomic.StoreUint32(&t.resetPingStrikes, 1)
}
func (t *http2Server) writeHeaderLocked(s *Stream) error {
	// At least :status and content-type will be there if none else.
	headerFields := make([]hpack.HeaderField, 0, 2)
	headerFields = append(headerFields, hpack.HeaderField{Name: ":status", Value: "200"})
	headerFields = append(headerFields, hpack.HeaderField{Name: "content-type", Value: grpcutil.ContentType(s.contentSubtype)})
	if s.sendCompress != "" {
		headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-encoding", Value: s.sendCompress})
	}
	headerFields = appendHeaderFieldsFromMD(headerFields, s.header)
	success, err := t.controlBuf.executeAndPut(t.checkForHeaderListSize, &headerFrame{
		streamID:  s.id,
		hf:        headerFields,
		endStream: false,
		onWrite:   t.setResetPingStrikes,
	})
	if !success {
		if err != nil {
			return err
		}
		t.closeStream(s, true, http2.ErrCodeInternal, false)
		return ErrHeaderListSizeLimitViolation
	}
	for _, sh := range t.stats {
		// Note: Headers are compressed with hpack after this call returns.
		// No WireLength field is set here.
		outHeader := &stats.OutHeader{
			Header:      s.header.Copy(),
			Compression: s.sendCompress,
		}
		sh.HandleRPC(s.Context(), outHeader)
	}
	return nil
}
// WriteStatus sends stream status to the client and terminates the stream.
// There is no further I/O operations being able to perform on this stream.
func (t *http2Server) WriteStatus(s *Stream, st *status.Status) error {
	s.hdrMu.Lock()
	defer s.hdrMu.Unlock()

	if s.getState() == streamDone {
		return nil
	}

	// grpc-status and grpc-message will be there if none else.
	headerFields := make([]hpack.HeaderField, 0, 2)
	if !s.updateHeaderSent() { // No headers have been sent.
		if len(s.header) > 0 { // Send a separate header frame.
			if err := t.writeHeaderLocked(s); err != nil {
				return err
			}
		} else { // Send a trailer only response.
			headerFields = append(headerFields, hpack.HeaderField{Name: ":status", Value: "200"})
			headerFields = append(headerFields, hpack.HeaderField{Name: "content-type", Value: grpcutil.ContentType(s.contentSubtype)})
		}
	}
	headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-status", Value: strconv.Itoa(int(st.Code()))})
	headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-message", Value: encodeGrpcMessage(st.Message())})

	if p := st.Proto(); p != nil && len(p.Details) > 0 {
		stBytes, err := proto.Marshal(p)
		if err != nil {
			// TODO: return error instead, when callers are able to handle it.
			logger.Errorf("transport: failed to marshal rpc status: %v, error: %v", p, err)
		} else {
			headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-status-details-bin", Value: encodeBinHeader(stBytes)})
		}
	}

	// Attach the trailer metadata.
	headerFields = appendHeaderFieldsFromMD(headerFields, s.trailer)
	trailingHeader := &headerFrame{
		streamID:  s.id,
		hf:        headerFields,
		endStream: true,
		onWrite:   t.setResetPingStrikes,
	}

	success, err := t.controlBuf.execute(t.checkForHeaderListSize, trailingHeader)
	if !success {
		if err != nil {
			return err
		}
		t.closeStream(s, true, http2.ErrCodeInternal, false)
		return ErrHeaderListSizeLimitViolation
	}
	// Send a RST_STREAM after the trailers if the client has not already
	// half-closed.
	rst := s.getState() == streamActive
	t.finishStream(s, rst, http2.ErrCodeNo, trailingHeader, true)
	for _, sh := range t.stats {
		// Note: The trailer fields are compressed with hpack after this call
		// returns. No WireLength field is set here.
		sh.HandleRPC(s.Context(), &stats.OutTrailer{
			Trailer: s.trailer.Copy(),
		})
	}
	return nil
}
// Write converts the data into HTTP2 data frame and sends it out. A non-nil
// error is returned if it fails (e.g., framing error, transport error).
func (t *http2Server) Write(s *Stream, hdr []byte, data []byte, opts *Options) error {
	if !s.isHeaderSent() { // Headers haven't been written yet.
		if err := t.WriteHeader(s, nil); err != nil {
			return err
		}
	} else {
		// Writing headers checks for this condition.
		if s.getState() == streamDone {
			return t.streamContextErr(s)
		}
	}
	df := &dataFrame{
		streamID:    s.id,
		h:           hdr,
		d:           data,
		onEachWrite: t.setResetPingStrikes,
	}
	if err := s.wq.get(int32(len(hdr) + len(data))); err != nil {
		return t.streamContextErr(s)
	}
	return t.controlBuf.put(df)
}
// keepalive running in a separate goroutine does the following:
// 1. Gracefully closes an idle connection after a duration of keepalive.MaxConnectionIdle.
// 2. Gracefully closes any connection after a duration of keepalive.MaxConnectionAge.
// 3. Forcibly closes a connection after an additive period of keepalive.MaxConnectionAgeGrace over keepalive.MaxConnectionAge.
// 4. Makes sure a connection is alive by sending pings with a frequency of keepalive.Time and closes a non-responsive connection
// after an additional duration of keepalive.Timeout.
func (t *http2Server) keepalive() {
	p := &ping{}
	// True iff a ping has been sent, and no data has been received since then.
	outstandingPing := false
	// Amount of time remaining before which we should receive an ACK for the
	// last sent ping.
	kpTimeoutLeft := time.Duration(0)
	// Records the last value of t.lastRead before we go block on the timer.
	// This is required to check for read activity since then.
	prevNano := time.Now().UnixNano()
	// Initialize the different timers to their default values.
	idleTimer := time.NewTimer(t.kp.MaxConnectionIdle)
	ageTimer := time.NewTimer(t.kp.MaxConnectionAge)
	kpTimer := time.NewTimer(t.kp.Time)
	defer func() {
		// We need to drain the underlying channel in these timers after a call
		// to Stop(), only if we are interested in resetting them. Clearly we
		// are not interested in resetting them here.
		idleTimer.Stop()
		ageTimer.Stop()
		kpTimer.Stop()
	}()

	for {
		select {
		case <-idleTimer.C:
			t.mu.Lock()
			idle := t.idle
			if idle.IsZero() { // The connection is non-idle.
				t.mu.Unlock()
				idleTimer.Reset(t.kp.MaxConnectionIdle)
				continue
			}
			val := t.kp.MaxConnectionIdle - time.Since(idle)
			t.mu.Unlock()
			if val <= 0 {
				// The connection has been idle for a duration of
				// keepalive.MaxConnectionIdle or more. Gracefully close the
				// connection.
				t.Drain()
				return
			}
			idleTimer.Reset(val)
		case <-ageTimer.C:
			t.Drain()
			ageTimer.Reset(t.kp.MaxConnectionAgeGrace)
			select {
			case <-ageTimer.C:
				// Close the connection after grace period.
				if logger.V(logLevel) {
					logger.Infof("transport: closing server transport due to maximum connection age.")
				}
				t.controlBuf.put(closeConnection{})
			case <-t.done:
			}
			return
		case <-kpTimer.C:
			lastRead := atomic.LoadInt64(&t.lastRead)
			if lastRead > prevNano {
				// There has been read activity since the last time we were
				// here. Setup the timer to fire at kp.Time seconds from
				// lastRead time and continue.
				outstandingPing = false
				kpTimer.Reset(time.Duration(lastRead) + t.kp.Time - time.Duration(time.Now().UnixNano()))
				prevNano = lastRead
				continue
			}
			if outstandingPing && kpTimeoutLeft <= 0 {
				t.Close(fmt.Errorf("keepalive ping not acked within timeout %s", t.kp.Time))
				return
			}
			if !outstandingPing {
				if channelz.IsOn() {
					atomic.AddInt64(&t.czData.kpCount, 1)
				}
				t.controlBuf.put(p)
				kpTimeoutLeft = t.kp.Timeout
				outstandingPing = true
			}
			// The amount of time to sleep here is the minimum of kp.Time and
			// timeoutLeft. This will ensure that we wait only for kp.Time
			// before sending out the next ping (for cases where the ping is
			// acked).
			sleepDuration := minTime(t.kp.Time, kpTimeoutLeft)
			kpTimeoutLeft -= sleepDuration
			kpTimer.Reset(sleepDuration)
		case <-t.done:
			return
		}
	}
}
// Close starts shutting down the http2Server transport.
func (t *http2Server) Close(err error) {
	t.mu.Lock()
	if t.state == closing {
		t.mu.Unlock()
		return
	}
	if logger.V(logLevel) {
		logger.Infof("transport: closing: %v", err)
	}
	t.state = closing
	streams := t.activeStreams
	t.activeStreams = nil
	t.mu.Unlock()
	t.controlBuf.finish()
	close(t.done)
	if err := t.conn.Close(); err != nil && logger.V(logLevel) {
		logger.Infof("transport: error closing conn during Close: %v", err)
	}
	channelz.RemoveEntry(t.channelzID)
	// Cancel all active streams.
	for _, s := range streams {
		s.cancel()
	}
	for _, sh := range t.stats {
		connEnd := &stats.ConnEnd{}
		sh.HandleConn(t.ctx, connEnd)
	}
}
// deleteStream deletes the stream s from transport's active streams.
func (t *http2Server) deleteStream(s *Stream, eosReceived bool) {
	t.mu.Lock()
	if _, ok := t.activeStreams[s.id]; ok {
		delete(t.activeStreams, s.id)
		if len(t.activeStreams) == 0 {
			t.idle = time.Now()
		}
	}
	t.mu.Unlock()

	if channelz.IsOn() {
		if eosReceived {
			atomic.AddInt64(&t.czData.streamsSucceeded, 1)
		} else {
			atomic.AddInt64(&t.czData.streamsFailed, 1)
		}
	}
}
// finishStream closes the stream and puts the trailing headerFrame into
// controlbuf.
func (t *http2Server) finishStream(s *Stream, rst bool, rstCode http2.ErrCode, hdr *headerFrame, eosReceived bool) {
	// In case stream sending and receiving are invoked in separate goroutines
	// (e.g., bi-directional streaming), cancel needs to be called to interrupt
	// the potential blocking on other goroutines.
	s.cancel()

	oldState := s.swapState(streamDone)
	if oldState == streamDone {
		// If the stream was already done, return.
		return
	}

	hdr.cleanup = &cleanupStream{
		streamID: s.id,
		rst:      rst,
		rstCode:  rstCode,
		onWrite: func() {
			t.deleteStream(s, eosReceived)
		},
	}
	t.controlBuf.put(hdr)
}
// closeStream clears the footprint of a stream when the stream is not needed
// any more.
func (t *http2Server) closeStream(s *Stream, rst bool, rstCode http2.ErrCode, eosReceived bool) {
	// In case stream sending and receiving are invoked in separate goroutines
	// (e.g., bi-directional streaming), cancel needs to be called to interrupt
	// the potential blocking on other goroutines.
	s.cancel()

	s.swapState(streamDone)
	t.deleteStream(s, eosReceived)

	t.controlBuf.put(&cleanupStream{
		streamID: s.id,
		rst:      rst,
		rstCode:  rstCode,
		onWrite:  func() {},
	})
}
func (t *http2Server) RemoteAddr() net.Addr {
	return t.remoteAddr
}

func (t *http2Server) Drain() {
	t.mu.Lock()
	defer t.mu.Unlock()
	if t.drainEvent != nil {
		return
	}
	t.drainEvent = grpcsync.NewEvent()
	t.controlBuf.put(&goAway{code: http2.ErrCodeNo, debugData: []byte{}, headsUp: true})
}
var goAwayPing = &ping{data: [8]byte{1, 6, 1, 8, 0, 3, 3, 9}}
// outgoingGoAwayHandler handles outgoing GoAway and returns true if loopy
// needs to put itself in draining mode.
func (t *http2Server) outgoingGoAwayHandler(g *goAway) (bool, error) {
	t.maxStreamMu.Lock()
	t.mu.Lock()
	if t.state == closing { // The transport is closing.
		t.mu.Unlock()
		t.maxStreamMu.Unlock()
		return false, ErrConnClosing
	}
	if !g.headsUp {
		// Stop accepting more streams now.
		t.state = draining
		sid := t.maxStreamID
		retErr := g.closeConn
		if len(t.activeStreams) == 0 {
			retErr = errors.New("second GOAWAY written and no active streams left to process")
		}
		t.mu.Unlock()
		t.maxStreamMu.Unlock()
		if err := t.framer.fr.WriteGoAway(sid, g.code, g.debugData); err != nil {
			return false, err
		}
		if retErr != nil {
			// Abruptly close the connection following the GoAway (via
			// loopywriter). But flush out what's inside the buffer first.
			t.framer.writer.Flush()
			return false, retErr
		}
		return true, nil
	}
	t.mu.Unlock()
	t.maxStreamMu.Unlock()
	// For a graceful close, send out a GoAway with stream ID of MaxUInt32.
	// Follow that with a ping and wait for the ack to come back or a timer
	// to expire. During this time accept new streams since they might have
	// originated before the GoAway reaches the client. After getting the ack
	// or timer expiration send out another GoAway this time with an ID of the
	// max stream the server intends to process.
	if err := t.framer.fr.WriteGoAway(math.MaxUint32, http2.ErrCodeNo, []byte{}); err != nil {
		return false, err
	}
	if err := t.framer.fr.WritePing(false, goAwayPing.data); err != nil {
		return false, err
	}
	go func() {
		timer := time.NewTimer(time.Minute)
		defer timer.Stop()
		select {
		case <-t.drainEvent.Done():
		case <-timer.C:
		case <-t.done:
			return
		}
		t.controlBuf.put(&goAway{code: g.code, debugData: g.debugData})
	}()
	return false, nil
}
func (t *http2Server) ChannelzMetric() *channelz.SocketInternalMetric {
	s := channelz.SocketInternalMetric{
		StreamsStarted:                   atomic.LoadInt64(&t.czData.streamsStarted),
		StreamsSucceeded:                 atomic.LoadInt64(&t.czData.streamsSucceeded),
		StreamsFailed:                    atomic.LoadInt64(&t.czData.streamsFailed),
		MessagesSent:                     atomic.LoadInt64(&t.czData.msgSent),
		MessagesReceived:                 atomic.LoadInt64(&t.czData.msgRecv),
		KeepAlivesSent:                   atomic.LoadInt64(&t.czData.kpCount),
		LastRemoteStreamCreatedTimestamp: time.Unix(0, atomic.LoadInt64(&t.czData.lastStreamCreatedTime)),
		LastMessageSentTimestamp:         time.Unix(0, atomic.LoadInt64(&t.czData.lastMsgSentTime)),
		LastMessageReceivedTimestamp:     time.Unix(0, atomic.LoadInt64(&t.czData.lastMsgRecvTime)),
		LocalFlowControlWindow:           int64(t.fc.getSize()),
		SocketOptions:                    channelz.GetSocketOption(t.conn),
		LocalAddr:                        t.localAddr,
		RemoteAddr:                       t.remoteAddr,
	}
	if au, ok := t.authInfo.(credentials.ChannelzSecurityInfo); ok {
		s.Security = au.GetSecurityValue()
	}
	s.RemoteFlowControlWindow = t.getOutFlowWindow()
	return &s
}
func (t *http2Server) IncrMsgSent() {
	atomic.AddInt64(&t.czData.msgSent, 1)
	atomic.StoreInt64(&t.czData.lastMsgSentTime, time.Now().UnixNano())
}

func (t *http2Server) IncrMsgRecv() {
	atomic.AddInt64(&t.czData.msgRecv, 1)
	atomic.StoreInt64(&t.czData.lastMsgRecvTime, time.Now().UnixNano())
}
func (t *http2Server) getOutFlowWindow() int64 {
	resp := make(chan uint32, 1)
	timer := time.NewTimer(time.Second)
	defer timer.Stop()
	t.controlBuf.put(&outFlowControlSizeRequest{resp})
	select {
	case sz := <-resp:
		return int64(sz)
	case <-t.done:
		return -1
	case <-timer.C:
		return -2
	}
}
func (t *http2Server) getPeer() *peer.Peer {
	return &peer.Peer{
		Addr:     t.remoteAddr,
		AuthInfo: t.authInfo,
	}
}
func getJitter(v time.Duration) time.Duration {
	if v == infinity {
		return 0
	}
	// Generate a jitter between +/- 10% of the value.
	r := int64(v / 10)
	j := grpcrand.Int63n(2*r) - r
	return time.Duration(j)
}
type connectionKey struct{}
// GetConnection gets the connection from the context.
func GetConnection(ctx context.Context) net.Conn {
	conn, _ := ctx.Value(connectionKey{}).(net.Conn)
	return conn
}

// setConnection adds the connection to the context so that callers can obtain
// information about the destination ip and port for an incoming RPC. This also
// allows any unary or streaming interceptors to see the connection.
func setConnection(ctx context.Context, conn net.Conn) context.Context {
	return context.WithValue(ctx, connectionKey{}, conn)
}