Package-Level Type Names (total 54, in which 10 are exported)
CallHdr carries the information of a particular RPC.
ContentSubtype specifies the content-subtype for a request. For example, a
content-subtype of "proto" will result in a content-type of
"application/grpc+proto". The value of ContentSubtype must be all
lowercase, otherwise the behavior is undefined. See
https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#requests
for more details.
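As a minimal sketch of the mapping described above: the helper name contentTypeFor and the constant baseContentType are assumptions made for illustration and are not part of this package's documented API. Lowercasing the subtype here is a defensive choice of the sketch, since the listing only says that non-lowercase values lead to undefined behavior.

```go
package main

import (
	"fmt"
	"strings"
)

// baseContentType is the content-type prefix named in the text above.
const baseContentType = "application/grpc"

// contentTypeFor is a hypothetical helper. It appends the content-subtype
// to the base content-type, lowercasing the subtype defensively.
func contentTypeFor(subtype string) string {
	if subtype == "" {
		return baseContentType
	}
	return baseContentType + "+" + strings.ToLower(subtype)
}

func main() {
	fmt.Println(contentTypeFor("proto")) // application/grpc+proto
}
```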
Creds specifies credentials.PerRPCCredentials for a call.
// called when the stream is finished
Host specifies the peer's host.
Method specifies the operation to perform.
// value of grpc-previous-rpc-attempts header to set
SendCompress specifies the compression algorithm applied on
outbound message.
func ClientTransport.NewStream(ctx context.Context, callHdr *CallHdr) (*Stream, error)
ClientTransport is the common interface for all gRPC client-side transport
implementations.
Close tears down this transport. Once it returns, the transport
should not be accessed any more. The caller must make sure this
is called only once.
CloseStream clears the footprint of a stream when the stream is
not needed any more. The err indicates the error incurred when
CloseStream is called. Must be called when a stream is finished
unless the associated transport is closing.
Error returns a channel that is closed when some I/O error
happens. Typically the caller should have a goroutine to monitor
this in order to take action (e.g., close the current transport
and create a new one) in error case. It should not return nil
once the transport is initiated.
GetGoAwayReason returns the reason why GoAway frame was received, along
with a human readable string with debug info.
GoAway returns a channel that is closed when ClientTransport
receives the draining signal from the server (e.g., GOAWAY frame in
HTTP/2).
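The Error and GoAway channels described above are meant to be watched from a goroutine. The sketch below shows one possible shape of such a monitor. The signaler interface and fakeTransport type are hypothetical stand-ins that carry only the two channel accessors; they are not the real ClientTransport.

```go
package main

import "fmt"

// signaler is a hypothetical, trimmed-down view of a transport that exposes
// only the two notification channels.
type signaler interface {
	Error() <-chan struct{}
	GoAway() <-chan struct{}
}

// fakeTransport is a stand-in used only for this sketch.
type fakeTransport struct {
	errCh    chan struct{}
	goAwayCh chan struct{}
}

func (f *fakeTransport) Error() <-chan struct{}  { return f.errCh }
func (f *fakeTransport) GoAway() <-chan struct{} { return f.goAwayCh }

// monitor blocks until one of the channels is closed and reports which one.
// A real caller would react here, e.g. by closing the transport and creating
// a new one.
func monitor(s signaler) string {
	select {
	case <-s.Error():
		return "error"
	case <-s.GoAway():
		return "goaway"
	}
}

func main() {
	ft := &fakeTransport{errCh: make(chan struct{}), goAwayCh: make(chan struct{})}
	result := make(chan string)
	go func() { result <- monitor(ft) }()
	close(ft.errCh) // simulate an I/O error on the transport
	fmt.Println(<-result)
}
```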
GracefulClose starts to tear down the transport: the transport will stop
accepting new RPCs and NewStream will return error. Once all streams are
finished, the transport will close.
It does not block.
IncrMsgRecv increments the number of messages received through this transport.
IncrMsgSent increments the number of messages sent through this transport.
NewStream creates a Stream for an RPC.
RemoteAddr returns the remote network address.
Write sends the data for the given stream. A nil stream indicates
the write is to be performed on the transport as a whole.
*http2Client
func NewClientTransport(connectCtx, ctx context.Context, addr resolver.Address, opts ConnectOptions, onClose func(GoAwayReason)) (ClientTransport, error)
func google.golang.org/grpc.(*ClientConn).getTransport(ctx context.Context, failfast bool, method string) (ClientTransport, balancer.PickResult, error)
func google.golang.org/grpc.newNonRetryClientStream(ctx context.Context, desc *grpc.StreamDesc, method string, t ClientTransport, ac *grpc.addrConn, opts ...grpc.CallOption) (_ grpc.ClientStream, err error)
ConnectionError is an error that results in the termination of the
entire connection and the retry of all the active streams.
Desc string
err error
temp bool
(ConnectionError) Error() string
Origin returns the original error of this connection error.
Temporary indicates if this connection error is temporary or fatal.
Unwrap returns the original error of this connection error or nil when the
origin is nil.
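To illustrate how an error type with this shape cooperates with the standard errors package, here is a small sketch. The connError type is a local stand-in that only mirrors the documented fields and methods; it is not the package's ConnectionError, and the message format is an assumption.

```go
package main

import (
	"errors"
	"fmt"
	"io"
)

// connError is a local stand-in with a description, an original error,
// and a temporary flag.
type connError struct {
	desc string
	err  error
	temp bool
}

func (e connError) Error() string   { return "connection error: desc = " + e.desc }
func (e connError) Temporary() bool { return e.temp }

// Unwrap returns the original error, or nil when there is none.
func (e connError) Unwrap() error { return e.err }

func main() {
	var err error = connError{desc: "read failed", err: io.ErrUnexpectedEOF, temp: true}

	// errors.Is follows Unwrap to reach the original error.
	fmt.Println(errors.Is(err, io.ErrUnexpectedEOF))

	// errors.As recovers the concrete type so Temporary can be consulted.
	var ce connError
	if errors.As(err, &ce) {
		fmt.Println(ce.Temporary())
	}
}
```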
ConnectionError : error
ConnectionError : net.temporary
func connectionErrorf(temp bool, e error, format string, a ...interface{}) ConnectionError
var ErrConnClosing
ConnectOptions covers all relevant options for communicating with the server.
ChannelzParentID sets the addrConn id which initiated the creation of this client transport.
CredsBundle is the credentials bundle to be used. Only one of
TransportCredentials and CredsBundle is non-nil.
Dialer specifies how to dial a network address.
FailOnNonTempDialError specifies if gRPC fails on non-temporary dial errors.
InitialConnWindowSize sets the initial window size for a connection.
InitialWindowSize sets the initial window size for a stream.
KeepaliveParams stores the keepalive parameters.
MaxHeaderListSize sets the max (uncompressed) size of header list that is prepared to be received.
PerRPCCredentials stores the PerRPCCredentials required to issue RPCs.
ReadBufferSize sets the size of read buffer, which in turn determines how much data can be read at most for one read syscall.
StatsHandlers stores the handler for stats.
TransportCredentials stores the Authenticator required to setup a client
connection. Only one of TransportCredentials and CredsBundle is non-nil.
UseProxy specifies if a proxy should be used.
UserAgent is the application user agent.
WriteBufferSize sets the size of write buffer which in turn determines how much data can be batched before it's written on the wire.
func NewClientTransport(connectCtx, ctx context.Context, addr resolver.Address, opts ConnectOptions, onClose func(GoAwayReason)) (ClientTransport, error)
func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts ConnectOptions, onClose func(GoAwayReason)) (_ *http2Client, err error)
NewStreamError wraps an error and reports additional information. Typically
NewStream errors result in transparent retry, as they mean nothing went onto
the wire. However, there are two notable exceptions:
1. If the stream headers violate the max header list size allowed by the
server. It's possible this could succeed on another transport, even if
it's unlikely, but do not transparently retry.
2. If the credentials errored when requesting their headers. In this case,
it's possible a retry can fix the problem, but indefinitely transparently
retrying is not appropriate as it is likely the credentials, if they can
eventually succeed, would need I/O to do so.
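The retry rule above can be sketched as a small decision function. The newStreamErr type below is a local stand-in with the two documented fields, and shouldTransparentlyRetry and errHeaderTooLarge are hypothetical names; treating non-matching errors as "do not retry" is an assumption of this sketch.

```go
package main

import (
	"errors"
	"fmt"
)

// newStreamErr is a local stand-in carrying the two documented fields.
type newStreamErr struct {
	Err                   error
	AllowTransparentRetry bool
}

func (e newStreamErr) Error() string { return e.Err.Error() }

// errHeaderTooLarge is a hypothetical error for exception 1 above.
var errHeaderTooLarge = errors.New("header list size exceeds server limit")

// shouldTransparentlyRetry is a hypothetical helper. It retries only when
// the error is a newStreamErr that explicitly allows it.
func shouldTransparentlyRetry(err error) bool {
	var nse newStreamErr
	if errors.As(err, &nse) {
		return nse.AllowTransparentRetry
	}
	return false
}

func main() {
	onWire := newStreamErr{Err: errors.New("transport is draining"), AllowTransparentRetry: true}
	tooBig := newStreamErr{Err: errHeaderTooLarge, AllowTransparentRetry: false}
	fmt.Println(shouldTransparentlyRetry(onWire), shouldTransparentlyRetry(tooBig))
}
```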
AllowTransparentRetry bool
Err error
(NewStreamError) Error() string
NewStreamError : error
ServerTransport is the common interface for all gRPC server-side transport
implementations.
Methods may be called concurrently from multiple goroutines, but
Write methods for a given Stream will be called serially.
Close tears down the transport. Once it is called, the transport
should not be accessed any more. All the pending streams and their
handlers will be terminated asynchronously.
Drain notifies the client this ServerTransport stops accepting new RPCs.
HandleStreams receives incoming streams using the given handler.
IncrMsgRecv increments the number of messages received through this transport.
IncrMsgSent increments the number of messages sent through this transport.
RemoteAddr returns the remote network address.
Write sends the data for the given stream.
Write may not be called on all streams.
WriteHeader sends the header metadata for the given stream.
WriteHeader may not be called on all streams.
WriteStatus sends the status of a stream to the client. WriteStatus is
the final call made on a stream and always occurs.
*http2Server
*serverHandlerTransport
func NewServerHandlerTransport(w http.ResponseWriter, r *http.Request, stats []stats.Handler) (ServerTransport, error)
func NewServerTransport(conn net.Conn, config *ServerConfig) (_ ServerTransport, err error)
func google.golang.org/grpc.(*Server).newHTTP2Transport(c net.Conn) ServerTransport
func google.golang.org/grpc.(*Server).addConn(addr string, st ServerTransport) bool
func google.golang.org/grpc.(*Server).handleStream(t ServerTransport, stream *Stream, trInfo *grpc.traceInfo)
func google.golang.org/grpc.(*Server).processStreamingRPC(t ServerTransport, stream *Stream, info *grpc.serviceInfo, sd *grpc.StreamDesc, trInfo *grpc.traceInfo) (err error)
func google.golang.org/grpc.(*Server).processUnaryRPC(t ServerTransport, stream *Stream, info *grpc.serviceInfo, md *grpc.MethodDesc, trInfo *grpc.traceInfo) (err error)
func google.golang.org/grpc.(*Server).removeConn(addr string, st ServerTransport)
func google.golang.org/grpc.(*Server).sendResponse(t ServerTransport, stream *Stream, msg interface{}, cp grpc.Compressor, opts *Options, comp encoding.Compressor) error
func google.golang.org/grpc.(*Server).serveStreams(st ServerTransport)
func google.golang.org/grpc.(*Server).traceInfo(st ServerTransport, stream *Stream) (trInfo *grpc.traceInfo)
Stream represents an RPC in the transport layer.
buf *recvBuffer
// indicates whether any bytes have been received on this stream
// always nil for client side Stream
contentSubtype is the content-subtype for requests.
this must be lowercase or the behavior is undefined.
// nil for server side Stream
// the associated context of the stream
// same as done chan but for server side. Cache of ctx.Done() (for performance)
// closed at the end of stream to unblock writers. On the client side.
// invoked at the end of stream on client side.
fc *inFlow
hdrMu protects header and trailer metadata on the server-side.
On client side, header keeps the received header metadata.
On server side, header keeps the header set by SetHeader(). The complete
header will be merged into this after t.WriteHeader() is called.
// closed to indicate the end of header metadata.
// set when headerChan is closed. Used to avoid closing headerChan multiple times.
On the server-side, headerSent is atomically set to 1 when the headers are sent out.
headerValid indicates whether a valid header was received. Only
meaningful after headerChan is closed (always call waitOnHeader() before
reading its value). Not valid on server side.
id uint32
// the associated RPC method of the stream
// set if the client never received headers (set only after the stream is done).
recvCompress string
Callback to state application's intentions to read data. This
is used to adjust flow control, if needed.
sendCompress string
// nil for client side Stream
state streamState
On client-side it is the status error received from the server.
On server-side it is unused.
trReader io.Reader
// the key-value map of trailer metadata.
// set if the server sends a refused stream or GOAWAY including this stream
wq *writeQuota
BytesReceived indicates whether any bytes have been received on this stream.
ContentSubtype returns the content-subtype for a request. For example, a
content-subtype of "proto" will result in a content-type of
"application/grpc+proto". This will always be lowercase. See
https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#requests for
more details.
Context returns the context of the stream.
Done returns a channel which is closed when it receives the final status
from the server.
GoString is implemented by Stream so context.String() won't
race when printing %#v.
Header returns the header metadata of the stream.
On client side, it acquires the key-value pairs of header metadata once it is
available. It blocks until i) the metadata is ready or ii) there is no header
metadata or iii) the stream is canceled/expired.
On server side, it returns the out header after t.WriteHeader is called. It
does not block and must not be called until after WriteHeader.
Method returns the method for the stream.
Read reads all p bytes from the wire for this stream.
RecvCompress returns the compression algorithm applied to the inbound
message. It is empty string if there is no compression applied.
SendHeader sends the given header metadata. The given metadata is
combined with any metadata set by previous calls to SetHeader and
then written to the transport stream.
SetHeader sets the header metadata. This can be called multiple times.
Server side only.
This should not be called in parallel to other data writes.
SetSendCompress sets the compression algorithm to the stream.
SetTrailer sets the trailer metadata which will be sent with the RPC status
by the server. This can be called multiple times. Server side only.
This should not be called in parallel to other data writes.
Status returns the status received from the server.
Status can be read safely only after the stream has ended,
that is, after Done() is closed.
Trailer returns the cached trailer metadata. Note that if it is not called
after the entire stream is done, it could return an empty MD. Client
side only.
It can be safely read only after the stream has ended, that is, after either
read or write has returned io.EOF.
TrailersOnly blocks until a header or trailers-only frame is received and
then returns true if the stream was trailers-only. If the stream ends
before headers are received, returns true, nil. Client-side only.
Unprocessed indicates whether the server did not process this stream --
i.e. it sent a refused stream or GOAWAY including this stream ID.
(*Stream) compareAndSwapState(oldState, newState streamState) bool
(*Stream) getState() streamState
isHeaderSent is only valid on the server-side.
(*Stream) swapState(st streamState) streamState
updateHeaderSent updates headerSent and returns true
if it was already set. It is valid only on server-side.
(*Stream) waitOnHeader()
(*Stream) write(m recvMsg)
*Stream : google.golang.org/grpc.ServerTransportStream
*Stream : fmt.GoStringer
*Stream : io.Reader
func ClientTransport.NewStream(ctx context.Context, callHdr *CallHdr) (*Stream, error)
func ClientTransport.CloseStream(stream *Stream, err error)
func ClientTransport.Write(s *Stream, hdr []byte, data []byte, opts *Options) error
func ServerTransport.Write(s *Stream, hdr []byte, data []byte, opts *Options) error
func ServerTransport.WriteHeader(s *Stream, md metadata.MD) error
func ServerTransport.WriteStatus(s *Stream, st *status.Status) error
func google.golang.org/grpc.recv(p *grpc.parser, c grpc.baseCodec, s *Stream, dc grpc.Decompressor, m interface{}, maxReceiveMessageSize int, payInfo *grpc.payloadInfo, compressor encoding.Compressor) error
func google.golang.org/grpc.recvAndDecompress(p *grpc.parser, s *Stream, dc grpc.Decompressor, maxReceiveMessageSize int, payInfo *grpc.payloadInfo, compressor encoding.Compressor) ([]byte, error)
func google.golang.org/grpc.(*Server).handleStream(t ServerTransport, stream *Stream, trInfo *grpc.traceInfo)
func google.golang.org/grpc.(*Server).processStreamingRPC(t ServerTransport, stream *Stream, info *grpc.serviceInfo, sd *grpc.StreamDesc, trInfo *grpc.traceInfo) (err error)
func google.golang.org/grpc.(*Server).processUnaryRPC(t ServerTransport, stream *Stream, info *grpc.serviceInfo, md *grpc.MethodDesc, trInfo *grpc.traceInfo) (err error)
func google.golang.org/grpc.(*Server).sendResponse(t ServerTransport, stream *Stream, msg interface{}, cp grpc.Compressor, opts *Options, comp encoding.Compressor) error
func google.golang.org/grpc.(*Server).traceInfo(st ServerTransport, stream *Stream) (trInfo *grpc.traceInfo)
bdp is the current bdp estimate.
bwMax is the maximum bandwidth noted so far (bytes/sec).
bool to keep track of the beginning of a new measurement cycle.
mu sync.Mutex
round trip time (seconds)
sample is the number of bytes received in one measurement cycle.
sampleCount is the number of samples taken so far.
sentAt is the time when the ping was sent.
Callback to update the window sizes.
add adds bytes to the current sample for calculating bdp.
It returns true only if a ping must be sent. This can be used
by the caller (handleData) to make decision about batching
a window update with it.
calculate is called when an ack for a bdp ping is received.
Here we calculate the current bdp and bandwidth sample and
decide if the flow control windows should go up.
timesnap registers the time bdp ping was sent out so that
network rtt can be calculated when its ack is received.
It is called (by controller) when the bdpPing is
being written on the wire.
func newLoopyWriter(s side, fr *framer, cbuf *controlBuffer, bdpEst *bdpEstimator) *loopyWriter
To read a response from a net.Conn, http.ReadResponse() takes a bufio.Reader.
It's possible that this reader reads more than what's needed for the response and stores
those bytes in the buffer.
bufConn wraps the original net.Conn and the bufio.Reader to make sure we don't lose the
bytes in the buffer.
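The wrapping idea described above can be sketched with the standard library alone. The bufferedConn type below mirrors the documented shape (an embedded net.Conn plus a reader used for Read), but it is a local re-creation for illustration; readHeadThenRest and the payload are hypothetical.

```go
package main

import (
	"bufio"
	"fmt"
	"io"
	"net"
)

// bufferedConn delegates everything to the embedded net.Conn except Read,
// which goes through r so that bytes already buffered are not lost.
type bufferedConn struct {
	net.Conn
	r io.Reader
}

func (b *bufferedConn) Read(p []byte) (int, error) { return b.r.Read(p) }

// readHeadThenRest reads 4 bytes through a bufio.Reader (which may pull more
// than 4 bytes off the connection), then reads the remainder through the
// wrapper.
func readHeadThenRest() (head, rest string, err error) {
	client, server := net.Pipe()
	defer client.Close()
	defer server.Close()

	go func() {
		client.Write([]byte("HEADbody")) // error ignored in this sketch
	}()

	br := bufio.NewReader(server)
	h := make([]byte, 4)
	if _, err = io.ReadFull(br, h); err != nil {
		return "", "", err
	}

	conn := &bufferedConn{Conn: server, r: br}
	b := make([]byte, 4)
	if _, err = io.ReadFull(conn, b); err != nil {
		return "", "", err
	}
	return string(h), string(b), nil
}

func main() {
	head, rest, err := readHeadThenRest()
	fmt.Println(head, rest, err)
}
```

Reading the remainder directly from the raw connection instead of the wrapper would block in this demo, because the bufio.Reader has already consumed those bytes.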
Conn net.Conn
r io.Reader
Close closes the connection.
Any blocked Read or Write operations will be unblocked and return errors.
LocalAddr returns the local network address, if known.
(*bufConn) Read(b []byte) (int, error)
RemoteAddr returns the remote network address, if known.
SetDeadline sets the read and write deadlines associated
with the connection. It is equivalent to calling both
SetReadDeadline and SetWriteDeadline.
A deadline is an absolute time after which I/O operations
fail instead of blocking. The deadline applies to all future
and pending I/O, not just the immediately following call to
Read or Write. After a deadline has been exceeded, the
connection can be refreshed by setting a deadline in the future.
If the deadline is exceeded a call to Read or Write or to other
I/O methods will return an error that wraps os.ErrDeadlineExceeded.
This can be tested using errors.Is(err, os.ErrDeadlineExceeded).
The error's Timeout method will return true, but note that there
are other possible errors for which the Timeout method will
return true even if the deadline has not been exceeded.
An idle timeout can be implemented by repeatedly extending
the deadline after successful Read or Write calls.
A zero value for t means I/O operations will not time out.
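The idle-timeout pattern mentioned above can be sketched as follows. The function names readUntilIdle and idleDemo are hypothetical, and net.Pipe is used only so the example is self-contained. Only the read deadline is extended here because the loop only reads.

```go
package main

import (
	"errors"
	"fmt"
	"net"
	"os"
	"time"
)

// readUntilIdle keeps reading from conn, pushing the read deadline forward
// before every Read. It returns once a Read fails, for example because no
// data arrived within the idle period.
func readUntilIdle(conn net.Conn, idle time.Duration) (int, error) {
	buf := make([]byte, 1024)
	total := 0
	for {
		if err := conn.SetReadDeadline(time.Now().Add(idle)); err != nil {
			return total, err
		}
		n, err := conn.Read(buf)
		total += n
		if err != nil {
			return total, err
		}
	}
}

// idleDemo sends four bytes and then goes silent, so the reader times out.
func idleDemo() (int, bool) {
	client, server := net.Pipe()
	defer client.Close()
	defer server.Close()

	go func() {
		client.Write([]byte("ping")) // error ignored in this sketch
	}()

	n, err := readUntilIdle(server, 50*time.Millisecond)
	return n, errors.Is(err, os.ErrDeadlineExceeded)
}

func main() {
	n, timedOut := idleDemo()
	fmt.Println(n, timedOut)
}
```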
SetReadDeadline sets the deadline for future Read calls
and any currently-blocked Read call.
A zero value for t means Read will not time out.
SetWriteDeadline sets the deadline for future Write calls
and any currently-blocked Write call.
Even if write times out, it may return n > 0, indicating that
some of the data was successfully written.
A zero value for t means Write will not time out.
Write writes data to the connection.
Write can be made to time out and return an error after a fixed
time limit; see SetDeadline and SetWriteDeadline.
bufConn : io.Closer
*bufConn : io.ReadCloser
*bufConn : io.Reader
*bufConn : io.ReadWriteCloser
*bufConn : io.ReadWriter
bufConn : io.WriteCloser
bufConn : io.Writer
*bufConn : net.Conn
channelzData is used to store channelz related data for http2Client and http2Server.
These fields cannot be embedded in the original structs (e.g. http2Client), because atomic
operations on an int64 variable on a 32-bit machine require the user to enforce memory alignment.
Here, by grouping those int64 fields inside a struct, we are enforcing the alignment.
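A minimal sketch of the grouping technique, assuming the counters are reached through a pointer so that the struct is allocated on its own (the first word of an allocated struct can be relied upon to be 64-bit aligned). The type and method names below are hypothetical and only loosely modeled on the fields in this listing.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// counters holds only int64 fields, so every field sits at a multiple of
// 8 bytes from the start of the struct.
type counters struct {
	msgSent int64
	msgRecv int64
}

// transportSketch keeps a pointer to counters. Placing the int64 fields
// directly after a bool could leave them misaligned on 32-bit platforms.
type transportSketch struct {
	closed bool
	cz     *counters
}

func newTransportSketch() *transportSketch {
	return &transportSketch{cz: &counters{}}
}

func (t *transportSketch) incrMsgSent() { atomic.AddInt64(&t.cz.msgSent, 1) }
func (t *transportSketch) sent() int64  { return atomic.LoadInt64(&t.cz.msgSent) }

func main() {
	tr := newTransportSketch()
	tr.incrMsgSent()
	tr.incrMsgSent()
	fmt.Println(tr.sent())
}
```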
kpCount int64
lastMsgRecvTime int64
lastMsgSentTime int64
lastStreamCreatedTime stores the timestamp that the last stream gets created. It is of int64 type
instead of time.Time since it's more costly to atomically update time.Time variable than int64
variable. The same goes for lastMsgSentTime and lastMsgRecvTime.
msgRecv int64
msgSent int64
streamsFailed int64
The number of streams that have started, including already finished ones.
Client side: The number of streams that have ended successfully by receiving
EoS bit set frame from server.
Server side: The number of streams that have ended successfully by sending
frame with EoS bit set.
closeConnection is an instruction to tell the loopy writer to flush the
framer and exit, which will cause the transport's connection to be closed
(by the client or server). The transport itself will close after the reader
encounters the EOF caused by the connection closure.
(closeConnection) isTransportResponseFrame() bool
closeConnection : cbItem
controlBuffer is a way to pass information to loopy.
Information is passed as specific struct types called control frames.
A control frame not only represents data, messages or headers to be sent out
but can also be used to instruct loopy to update its internal state.
It shouldn't be confused with an HTTP2 frame, although some of the control frames
like dataFrame and headerFrame do go out on wire as HTTP2 frames.
ch chan struct{}
consumerWaiting bool
done <-chan struct{}
err error
list *itemList
mu sync.Mutex
transportResponseFrames counts the number of queued items that represent
the response of an action initiated by the peer. trfChan is created
when transportResponseFrames >= maxQueuedTransportResponseFrames and is
closed and nilled when transportResponseFrames drops below the
threshold. Both fields are protected by mu.
// chan struct{}
Note argument f should never be nil.
(*controlBuffer) executeAndPut(f func(it interface{}) bool, it cbItem) (bool, error)
(*controlBuffer) finish()
(*controlBuffer) get(block bool) (interface{}, error)
(*controlBuffer) put(it cbItem) error
throttle blocks if there are too many incomingSettings/cleanupStreams in the
controlbuf.
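The throttling scheme described for transportResponseFrames and trfChan can be sketched as a counter plus a channel that exists only while the queue is at or above the threshold. Everything below is a simplified illustration: throttledQueue, maxQueuedResponses and its value of 3 are assumptions, and the real control buffer also stores the queued items themselves.

```go
package main

import (
	"fmt"
	"sync"
)

// maxQueuedResponses is a hypothetical threshold chosen for the demo.
const maxQueuedResponses = 3

// throttledQueue tracks how many response items are queued. trfChan is
// non-nil only while the count is at or above the threshold.
type throttledQueue struct {
	mu        sync.Mutex
	responses int
	trfChan   chan struct{}
}

// put records one queued response and creates trfChan at the threshold.
func (q *throttledQueue) put() {
	q.mu.Lock()
	defer q.mu.Unlock()
	q.responses++
	if q.responses == maxQueuedResponses {
		q.trfChan = make(chan struct{})
	}
}

// get records one consumed response. When the count is about to drop below
// the threshold, trfChan is closed and set to nil to release waiters.
func (q *throttledQueue) get() {
	q.mu.Lock()
	defer q.mu.Unlock()
	if q.responses == 0 {
		return
	}
	if q.responses == maxQueuedResponses {
		close(q.trfChan)
		q.trfChan = nil
	}
	q.responses--
}

// waitChan returns the current throttle channel, or nil if not throttled.
func (q *throttledQueue) waitChan() <-chan struct{} {
	q.mu.Lock()
	defer q.mu.Unlock()
	return q.trfChan
}

// throttle blocks while the queue is over the threshold, or until done closes.
func (q *throttledQueue) throttle(done <-chan struct{}) {
	ch := q.waitChan()
	if ch == nil {
		return
	}
	select {
	case <-ch:
	case <-done:
	}
}

func main() {
	q := &throttledQueue{}
	for i := 0; i < maxQueuedResponses; i++ {
		q.put()
	}
	fmt.Println(q.waitChan() != nil) // throttled
	go q.get()                       // the consumer drains one item
	q.throttle(nil)                  // returns once the count drops
	fmt.Println(q.waitChan() != nil)
}
```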
func newControlBuffer(done <-chan struct{}) *controlBuffer
func newLoopyWriter(s side, fr *framer, cbuf *controlBuffer, bdpEst *bdpEstimator) *loopyWriter
headerFrame is also used to register stream on the client-side.
// Valid on the server side.
// Valid on server side.
hf []hpack.HeaderField
// Used only on the client side.
// Valid on client-side
onWrite func()
streamID uint32
// write quota for the stream created.
(*headerFrame) isTransportResponseFrame() bool
*headerFrame : cbItem
http2Client implements the ClientTransport interface with HTTP2.
activeStreams map[uint32]*Stream
address contains the resolver returned address for this transport.
If the `ServerName` field is set, it takes precedence over `CallHdr.Host`
passed to `NewStream`, when determining the :authority header.
// auth info about the connection
bdpEst *bdpEstimator
bufferPool *bufferPool
cancel context.CancelFunc
Fields below are for channelz metric collection.
// underlying communication channel
connectionID uint64
controlBuf delivers all the control related tasks (e.g., window
updates, reset streams, and various settings) to the controller.
Do not access controlBuf with mu held.
ctx context.Context
// Cache the ctx.Done() chan.
czData *channelzData
fc *trInFlow
framer *framer
goAway is closed to notify the upper layer (i.e., addrConn.transportMonitor)
that the server sent GoAway on this transport.
goAwayDebugMessage contains a detailed human readable string about a
GoAway frame, useful for error messages.
goAwayReason records the http2.ErrCode and debug data received with the
GoAway frame.
initialWindowSize int32
isSecure bool
keepaliveEnabled bool
kp keepalive.ClientParameters
A condition variable used to signal when the keepalive goroutine should
go dormant. The condition for dormancy is based on the number of active
streams and the `PermitWithoutStream` keepalive client parameter. And
since the number of active streams is guarded by the above mutex, we use
the same for this condition variable as well.
A boolean to track whether the keepalive goroutine is dormant or not.
This is checked before attempting to signal the above condition
variable.
// Keep this field 64-bit aligned. Accessed atomically.
localAddr net.Addr
loopy *loopyWriter
maxConcurrentStreams uint32
configured by peer through SETTINGS_MAX_HEADER_LIST_SIZE
md metadata.MD
Do not access controlBuf with mu held.
// guard the following variables
nextID uint32
onClose func(GoAwayReason)
perRPCCreds []credentials.PerRPCCredentials
prevGoAwayID records the Last-Stream-ID in the previous GoAway frame.
// sync point to enable testing.
registeredCompressors string
remoteAddr net.Addr
The scheme used: https if TLS is on, http otherwise.
state transportState
statsHandlers []stats.Handler
streamQuota int64
streamsQuotaAvailable chan struct{}
userAgent string
waitingStreams uint32
// sync point to enable testing.
(*http2Client) ChannelzMetric() *channelz.SocketInternalMetric
Close kicks off the shutdown process of the transport. This should be called
only once on a transport. Once it is called, the transport should not be
accessed any more.
CloseStream clears the footprint of a stream when the stream is not needed any more.
This must not be executed in reader's goroutine.
(*http2Client) Error() <-chan struct{}
(*http2Client) GetGoAwayReason() (GoAwayReason, string)
(*http2Client) GoAway() <-chan struct{}
GracefulClose sets the state to draining, which prevents new streams from
being created and causes the transport to be closed when the last active
stream is closed. If there are no active streams, the transport is closed
immediately. This does nothing if the transport is already draining or
closing.
(*http2Client) IncrMsgRecv()
(*http2Client) IncrMsgSent()
NewStream creates a stream and registers it into the transport as "active"
streams. All non-nil errors returned will be *NewStreamError.
(*http2Client) RemoteAddr() net.Addr
Write formats the data into HTTP2 data frame(s) and sends it out. The caller
should proceed only if Write returns nil.
adjustWindow sends out extra window update over the initial window size
of stream if the application is requesting data larger in size than
the window.
(*http2Client) closeStream(s *Stream, err error, rst bool, rstCode http2.ErrCode, st *status.Status, mdata map[string][]string, eosReceived bool)
(*http2Client) createAudience(callHdr *CallHdr) string
(*http2Client) createHeaderFields(ctx context.Context, callHdr *CallHdr) ([]hpack.HeaderField, error)
(*http2Client) getCallAuthData(ctx context.Context, audience string, callHdr *CallHdr) (map[string]string, error)
(*http2Client) getOutFlowWindow() int64
(*http2Client) getPeer() *peer.Peer
(*http2Client) getStream(f http2.Frame) *Stream
(*http2Client) getTrAuthData(ctx context.Context, audience string) (map[string]string, error)
(*http2Client) handleData(f *http2.DataFrame)
(*http2Client) handleGoAway(f *http2.GoAwayFrame)
(*http2Client) handlePing(f *http2.PingFrame)
(*http2Client) handleRSTStream(f *http2.RSTStreamFrame)
(*http2Client) handleSettings(f *http2.SettingsFrame, isFirst bool)
(*http2Client) handleWindowUpdate(f *http2.WindowUpdateFrame)
keepalive running in a separate goroutine makes sure the connection is alive by sending pings.
(*http2Client) newStream(ctx context.Context, callHdr *CallHdr) *Stream
operateHeaders takes action on the decoded headers.
readServerPreface reads and handles the initial settings frame from the
server.
reader verifies the server preface and reads all subsequent data from
network connection. If the server preface is not read successfully, an
error is pushed to errCh; otherwise errCh is closed with no error.
setGoAwayReason sets the value of t.goAwayReason based
on the GoAway frame received.
It expects a lock on transport's mutex to be held by
the caller.
(*http2Client) stateForTesting() transportState
updateFlowControl updates the incoming flow control windows
for the transport and the stream based on the current bdp
estimation.
updateWindow adjusts the inbound quota for the stream.
Window updates will be sent out when the cumulative quota
exceeds the corresponding threshold.
*http2Client : ClientTransport
*http2Client : google.golang.org/grpc/internal/channelz.Socket
func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts ConnectOptions, onClose func(GoAwayReason)) (_ *http2Client, err error)
http2Server implements the ServerTransport interface with HTTP2.
activeStreams map[uint32]*Stream
// auth info about the connection
bdpEst *bdpEstimator
bufferPool *bufferPool
Fields below are for channelz metric collection.
conn net.Conn
connectionID uint64
controlBuf delivers all the control related tasks (e.g., window
updates, reset streams, and various settings) to the controller.
ctx context.Context
czData *channelzData
done chan struct{}
drainEvent is initialized when Drain() is called the first time, after
which the server writes out the first GoAway (with ID 2^31-1) frame. Then
an independent goroutine will be launched to later send the second
GoAway. During this time we don't want to write another first GoAway (with
ID 2^31-1) frame. Thus a call to Drain() will be a no-op if drainEvent is
already initialized, since draining is already underway.
fc *trInFlow
framer *framer
idle is the time instant when the connection went idle.
This is either the beginning of the connection or when the number of
RPCs goes down to 0.
When the connection is busy, this value is set to 0.
inTapHandle tap.ServerInHandle
initialWindowSize int32
Keepalive enforcement policy.
Keepalive and max-age parameters for the server.
The time instant at which the last ping was received.
// Keep this field 64-bit aligned. Accessed atomically.
localAddr net.Addr
loopy *loopyWriter
maxSendHeaderListSize *uint32
// max stream ID ever seen
maxStreamMu guards the maximum stream ID
This lock may not be taken if mu is already held.
The max number of concurrent streams.
// guard the following
Number of times the client has violated keepalive ping policy so far.
// sync point to enable testing.
remoteAddr net.Addr
Flag to signify that number of ping strikes should be reset to 0.
This is set whenever data or header frames are sent.
1 means yes.
// Accessed atomically.
state transportState
stats []stats.Handler
// sync point to enable testing.
(*http2Server) ChannelzMetric() *channelz.SocketInternalMetric
Close starts shutting down the http2Server transport.
TODO(zhaoq): Now the destruction is not blocked on any pending streams. This
could cause some resource issue. Revisit this later.
(*http2Server) Drain()
HandleStreams receives incoming streams using the given handler. This is
typically run in a separate goroutine.
traceCtx attaches trace to ctx and returns the new context.
(*http2Server) IncrMsgRecv()
(*http2Server) IncrMsgSent()
(*http2Server) RemoteAddr() net.Addr
Write converts the data into an HTTP2 data frame and sends it out. A non-nil error
is returned if it fails (e.g., framing error, transport error).
WriteHeader sends the header metadata md back to the client.
WriteStatus sends stream status to the client and terminates the stream.
No further I/O operations can be performed on this stream.
TODO(zhaoq): Now it indicates the end of entire stream. Revisit if early
OK is adopted.
adjustWindow sends out extra window update over the initial window size
of stream if the application is requesting data larger in size than
the window.
(*http2Server) checkForHeaderListSize(it interface{}) bool
closeStream clears the footprint of a stream when the stream is not needed any more.
deleteStream deletes the stream s from transport's active streams.
finishStream closes the stream and puts the trailing headerFrame into controlbuf.
(*http2Server) getOutFlowWindow() int64
(*http2Server) getPeer() *peer.Peer
(*http2Server) getStream(f http2.Frame) (*Stream, bool)
(*http2Server) handleData(f *http2.DataFrame)
(*http2Server) handlePing(f *http2.PingFrame)
(*http2Server) handleRSTStream(f *http2.RSTStreamFrame)
(*http2Server) handleSettings(f *http2.SettingsFrame)
(*http2Server) handleWindowUpdate(f *http2.WindowUpdateFrame)
keepalive running in a separate goroutine does the following:
1. Gracefully closes an idle connection after a duration of keepalive.MaxConnectionIdle.
2. Gracefully closes any connection after a duration of keepalive.MaxConnectionAge.
3. Forcibly closes a connection after an additive period of keepalive.MaxConnectionAgeGrace over keepalive.MaxConnectionAge.
4. Makes sure a connection is alive by sending pings with a frequency of keepalive.Time and closes a non-responsive connection
after an additional duration of keepalive.Timeout.
operateHeaders takes action on the decoded headers. Returns an error if fatal
error encountered and transport needs to close, otherwise returns nil.
Handles outgoing GoAway and returns true if loopy needs to put itself
in draining mode.
(*http2Server) setResetPingStrikes()
(*http2Server) streamContextErr(s *Stream) error
updateFlowControl updates the incoming flow control windows
for the transport and the stream based on the current bdp
estimation.
updateWindow adjusts the inbound quota for the stream and the transport.
Window updates will be delivered to the controller for sending when
the cumulative quota exceeds the corresponding threshold.
(*http2Server) writeHeaderLocked(s *Stream) error
*http2Server : ServerTransport
*http2Server : google.golang.org/grpc/internal/channelz.Socket
TODO(mmukhi): Simplify this code.
inFlow deals with inbound flow control
delta is the extra window update given by receiver when an application
is reading data bigger in size than the inFlow limit.
The inbound flow control limit for pending data.
mu sync.Mutex
pendingData is the total data that has been received but not yet
consumed by applications.
The amount of data the application has consumed but for which grpc has not
yet sent a window update. Used to reduce window update frequency.
(*inFlow) maybeAdjust(n uint32) uint32
newLimit updates the inflow window to a new value n.
It assumes that n is always greater than the old limit.
onData is invoked when some data frame is received. It updates pendingData.
onRead is invoked when the application reads the data. It returns the window size
to be sent to the peer.
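The interplay between onData and onRead can be sketched as follows. This is a simplified, hypothetical model (inFlowSketch): it omits delta, and the quarter-of-the-limit batching threshold is an assumption standing in for whatever threshold the package uses to reduce window update frequency.

```go
package main

import (
	"fmt"
	"sync"
)

// inFlowSketch is a simplified model of inbound flow control.
type inFlowSketch struct {
	mu            sync.Mutex
	limit         uint32 // inbound window limit for pending data
	pendingData   uint32 // received but not yet read by the application
	pendingUpdate uint32 // read by the application but not yet acknowledged
}

// onData records n newly received bytes and fails if the peer has sent
// more than the window allows.
func (f *inFlowSketch) onData(n uint32) error {
	f.mu.Lock()
	defer f.mu.Unlock()
	f.pendingData += n
	if total := f.pendingData + f.pendingUpdate; total > f.limit {
		return fmt.Errorf("received %d bytes, window limit is %d", total, f.limit)
	}
	return nil
}

// onRead records that the application consumed n bytes. It returns the
// window update to send to the peer, or 0 while updates are being batched.
func (f *inFlowSketch) onRead(n uint32) uint32 {
	f.mu.Lock()
	defer f.mu.Unlock()
	if n > f.pendingData {
		n = f.pendingData
	}
	f.pendingData -= n
	f.pendingUpdate += n
	if f.pendingUpdate >= f.limit/4 { // assumed batching threshold
		wu := f.pendingUpdate
		f.pendingUpdate = 0
		return wu
	}
	return 0
}

func main() {
	f := &inFlowSketch{limit: 64}
	_ = f.onData(40)
	fmt.Println(f.onRead(10)) // 0: below the threshold, update is held back
	fmt.Println(f.onRead(10)) // 20: the batched update is released
}
```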
Loopy receives frames from the control buffer.
Each frame is handled individually; most of the work done by loopy goes
into handling data frames. Loopy maintains a queue of active streams, and each
stream maintains a queue of data frames; as loopy receives data frames,
they are added to the queue of the relevant stream.
Loopy goes over this list of active streams by processing one node every iteration,
thereby closely resembling round-robin scheduling over all streams. While
processing a stream, loopy writes out data bytes from this stream capped by the min
of http2MaxFrameLen, connection-level flow control and stream-level flow control.
activeStreams is a linked-list of all streams that have data to send and some
stream-level flow control quota.
Each of these streams internally has a list of data items (and perhaps trailers
on the server-side) to be sent out.
bdpEst *bdpEstimator
cbuf *controlBuffer
draining bool
estdStreams is a map of all established streams that are not cleaned-up yet.
On client-side, this is all streams whose headers were sent out.
On server-side, this is all streams whose headers were received.
// Established streams.
framer *framer
// The buffer for HPACK encoding.
// HPACK encoder.
// outbound initial window size.
sendQuota uint32
side side
Side-specific handlers
(*loopyWriter) applySettings(ss []http2.Setting) error
(*loopyWriter) cleanupStreamHandler(c *cleanupStream) error
(*loopyWriter) closeConnectionHandler() error
(*loopyWriter) earlyAbortStreamHandler(eas *earlyAbortStream) error
(*loopyWriter) goAwayHandler(g *goAway) error
(*loopyWriter) handle(i interface{}) error
(*loopyWriter) headerHandler(h *headerFrame) error
(*loopyWriter) incomingGoAwayHandler(*incomingGoAway) error
(*loopyWriter) incomingSettingsHandler(s *incomingSettings) error
(*loopyWriter) incomingWindowUpdateHandler(w *incomingWindowUpdate) error
(*loopyWriter) originateStream(str *outStream, hdr *headerFrame) error
(*loopyWriter) outFlowControlSizeRequestHandler(o *outFlowControlSizeRequest) error
(*loopyWriter) outgoingSettingsHandler(s *outgoingSettings) error
(*loopyWriter) outgoingWindowUpdateHandler(w *outgoingWindowUpdate) error
(*loopyWriter) pingHandler(p *ping) error
(*loopyWriter) preprocessData(df *dataFrame) error
processData removes the first stream from active streams, writes out at most 16KB
of its data and then puts it at the end of activeStreams if there's still more data
to be sent and stream has some stream-level flow control.
(*loopyWriter) registerStreamHandler(h *registerStream) error
run should be run in a separate goroutine.
It reads control frames from controlBuf and processes them by:
1. Updating loopy's internal state, and/or
2. Writing out HTTP2 frames on the wire.
Loopy keeps all active streams with data to send in a linked-list.
All streams in the activeStreams linked-list must have both:
1. Data to send, and
2. Stream level flow control quota available.
In each iteration of run loop, other than processing the incoming control
frame, loopy calls processData, which processes one node from the activeStreams linked-list.
This results in writing of HTTP2 frames into an underlying write buffer.
When there are no more control frames to read from controlBuf, loopy flushes the write buffer.
As an optimization, to increase the batch size for each flush, loopy yields the processor once
if the batch size is too low, giving stream goroutines a chance to fill it up.
(*loopyWriter) writeHeader(streamID uint32, endStream bool, hf []hpack.HeaderField, onWrite func()) error
func newLoopyWriter(s side, fr *framer, cbuf *controlBuffer, bdpEst *bdpEstimator) *loopyWriter
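The round-robin behaviour described for processData can be illustrated with a minimal sketch. It is not the loopyWriter implementation: the stream type, processOne, and the slice-based queue are hypothetical stand-ins, and only the 16KB cap and the "re-queue if data and quota remain" rule come from the text above.

```go
package main

import "fmt"

// maxFrameLen mirrors the 16KB per-iteration cap described for processData.
const maxFrameLen = 16384

// stream is a hypothetical stand-in for an active stream.
type stream struct {
	id      int
	pending int // bytes still waiting to be sent
	quota   int // remaining stream-level flow control quota
}

// minInt returns the smallest of its arguments.
func minInt(first int, rest ...int) int {
	m := first
	for _, v := range rest {
		if v < m {
			m = v
		}
	}
	return m
}

// processOne removes the first active stream, writes at most
// min(maxFrameLen, connQuota, stream quota, pending data) bytes, and puts the
// stream back at the end only if it still has both data and quota.
// It returns the updated queue, the remaining connection quota, and the
// number of bytes written.
func processOne(active []*stream, connQuota int) ([]*stream, int, int) {
	if len(active) == 0 || connQuota == 0 {
		return active, connQuota, 0
	}
	s := active[0]
	active = active[1:]
	n := minInt(maxFrameLen, connQuota, s.quota, s.pending)
	s.pending -= n
	s.quota -= n
	connQuota -= n
	if s.pending > 0 && s.quota > 0 {
		active = append(active, s)
	}
	return active, connQuota, n
}

func main() {
	active := []*stream{
		{id: 1, pending: 40000, quota: 65535},
		{id: 2, pending: 100, quota: 65535},
	}
	connQuota := 65535
	for len(active) > 0 && connQuota > 0 {
		id := active[0].id
		var n int
		active, connQuota, n = processOne(active, connQuota)
		fmt.Printf("stream %d wrote %d bytes\n", id, n)
	}
}
```

Because each iteration handles exactly one stream, a small stream queued behind a large one is served after at most one frame of the large stream.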
Following are sentinel objects that mark the
beginning and end of the list. They do not
contain any item lists. All valid objects are
inserted in between them.
This is needed so that an outStream object can
deleteSelf() in O(1) time without knowing which
list it belongs to.
tail *outStream
remove from the beginning of the list.
(*outStreamList) enqueue(s *outStream)
func newOutStreamList() *outStreamList
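The sentinel arrangement described above can be sketched as follows. This is a minimal illustration under assumed names (node, list, newList); only enqueue and deleteSelf echo identifiers from the listing, and the bodies are not taken from the package.

```go
package main

import "fmt"

// node is a hypothetical list element; sentinels simply carry no payload.
type node struct {
	id         int
	prev, next *node
}

// list keeps two sentinel nodes so that every real node always has
// non-nil neighbours while it is linked.
type list struct {
	head, tail *node
}

func newList() *list {
	head, tail := new(node), new(node)
	head.next = tail
	tail.prev = head
	return &list{head: head, tail: tail}
}

// enqueue links n just before the tail sentinel.
func (l *list) enqueue(n *node) {
	last := l.tail.prev
	last.next = n
	n.prev = last
	n.next = l.tail
	l.tail.prev = n
}

// dequeue removes and returns the first real node, or nil if the list is empty.
func (l *list) dequeue() *node {
	n := l.head.next
	if n == l.tail {
		return nil
	}
	n.deleteSelf()
	return n
}

// deleteSelf unlinks n in O(1) without needing a reference to its list.
func (n *node) deleteSelf() {
	if n.prev != nil {
		n.prev.next = n.next
	}
	if n.next != nil {
		n.next.prev = n.prev
	}
	n.prev, n.next = nil, nil
}

func main() {
	l := newList()
	a, b, c := &node{id: 1}, &node{id: 2}, &node{id: 3}
	l.enqueue(a)
	l.enqueue(b)
	l.enqueue(c)
	b.deleteSelf() // no list pointer required
	for n := l.dequeue(); n != nil; n = l.dequeue() {
		fmt.Println(n.id)
	}
}
```

Since the neighbours of a linked node are never nil, deleteSelf needs no special cases for the first or last element.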
recvBuffer is an unbounded channel of recvMsg structs.
Note: recvBuffer differs from buffer.Unbounded only in the fact that it
holds a channel of recvMsg structs instead of objects implementing "item"
interface. recvBuffer is written to much more often and using strict recvMsg
structs helps avoid allocation in "recvBuffer.put"
backlog []recvMsg
c chan recvMsg
err error
mu sync.Mutex
get returns the channel that receives a recvMsg in the buffer.
Upon receipt of a recvMsg, the caller should call load to send another
recvMsg onto the channel if there is any.
(*recvBuffer) load()
(*recvBuffer) put(r recvMsg)
func newRecvBuffer() *recvBuffer
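The get/load/put contract can be sketched with a one-slot channel backed by a slice. This is a simplified illustration, not the package's code: msg and unboundedBuffer are hypothetical names, and error handling (the err field) is left out.

```go
package main

import (
	"fmt"
	"sync"
)

// msg is a hypothetical stand-in for recvMsg.
type msg struct{ data string }

// unboundedBuffer never blocks the producer: at most one message sits in
// the channel, and everything else waits in backlog.
type unboundedBuffer struct {
	c       chan msg
	mu      sync.Mutex
	backlog []msg
}

func newUnboundedBuffer() *unboundedBuffer {
	return &unboundedBuffer{c: make(chan msg, 1)}
}

// put hands m to the channel if nothing is queued ahead of it,
// otherwise appends it to the backlog to preserve ordering.
func (b *unboundedBuffer) put(m msg) {
	b.mu.Lock()
	defer b.mu.Unlock()
	if len(b.backlog) == 0 {
		select {
		case b.c <- m:
			return
		default:
		}
	}
	b.backlog = append(b.backlog, m)
}

// load moves the oldest backlog entry onto the channel if the slot is free.
func (b *unboundedBuffer) load() {
	b.mu.Lock()
	defer b.mu.Unlock()
	if len(b.backlog) > 0 {
		select {
		case b.c <- b.backlog[0]:
			b.backlog[0] = msg{} // drop the reference held by the slice
			b.backlog = b.backlog[1:]
		default:
		}
	}
}

// get returns the channel to receive from; call load after each receive.
func (b *unboundedBuffer) get() <-chan msg { return b.c }

func main() {
	b := newUnboundedBuffer()
	for _, s := range []string{"a", "b", "c"} {
		b.put(msg{data: s})
	}
	for i := 0; i < 3; i++ {
		m := <-b.get()
		b.load()
		fmt.Println(m.data)
	}
}
```

Using a concrete message struct rather than an interface value is what the note above credits with avoiding an allocation on each put.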
recvBufferReader implements io.Reader interface to read the data from
recvBuffer.
// Closes the client transport stream with the given error and nil trailer metadata.
ctx context.Context
// cache of ctx.Done() (for performance).
err error
freeBuffer func(*bytes.Buffer)
// Stores the remaining data in the previous calls.
recv *recvBuffer
Read reads the next len(p) bytes from last. If last is drained, it tries to
read additional data from recv. It blocks if there is no additional data available
in recv. If Read returns any non-nil error, it will continue to return that error.
(*recvBufferReader) read(p []byte) (n int, err error)
(*recvBufferReader) readAdditional(m recvMsg, p []byte) (n int, err error)
(*recvBufferReader) readClient(p []byte) (n int, err error)
*recvBufferReader : io.Reader
recvMsg represents the received msg from the transport. All transport
protocol specific info has been removed.
buffer *bytes.Buffer
nil: received some data
io.EOF: stream is completed. data is nil.
other non-nil error: transport failure. data is nil.
func (*Stream).write(m recvMsg)
serverHandlerTransport is an implementation of ServerTransport
which replies to exactly one gRPC request (exactly one HTTP request),
using the net/http.Handler interface. This http.Handler is guaranteed
at this point to be speaking over HTTP/2, so it's able to speak valid
gRPC.
closeOnce sync.Once
// closed on Close
we store both contentType and contentSubtype so we don't keep recreating them
TODO make sure this is consistent across handler_server and http2_server
we just mirror the request content-type
headerMD metadata.MD
req *http.Request
rw http.ResponseWriter
stats []stats.Handler
timeout time.Duration
timeoutSet bool
block concurrent WriteStatus calls
e.g. grpc/(*serverStream).SendMsg/RecvMsg
writes is a channel of code to run serialized in the
ServeHTTP (HandleStreams) goroutine. The channel is closed
when WriteStatus is called.
(*serverHandlerTransport) Close(err error)
(*serverHandlerTransport) Drain()
(*serverHandlerTransport) HandleStreams(startStream func(*Stream), traceCtx func(context.Context, string) context.Context)
(*serverHandlerTransport) IncrMsgRecv()
(*serverHandlerTransport) IncrMsgSent()
(*serverHandlerTransport) RemoteAddr() net.Addr
(*serverHandlerTransport) Write(s *Stream, hdr []byte, data []byte, opts *Options) error
(*serverHandlerTransport) WriteHeader(s *Stream, md metadata.MD) error
(*serverHandlerTransport) WriteStatus(s *Stream, st *status.Status) error
do runs fn in the ServeHTTP goroutine.
(*serverHandlerTransport) runStream()
writeCommonHeaders sets common headers on the first write
call (Write, WriteHeader, or WriteStatus).
writeCustomHeaders sets custom headers set on the stream via SetHeader
on the first write call (Write, WriteHeader, or WriteStatus).
writePendingHeaders sets common and custom headers on the first
write call (Write, WriteHeader, or WriteStatus)
*serverHandlerTransport : ServerTransport
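The "channel of code to run serialized" idea behind writes and do can be sketched as below. This is an assumption-laden illustration: serialRunner, errClosed, run, and close are hypothetical names, and the sketch signals shutdown through a separate closed channel rather than by closing the work channel itself.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// errClosed is a hypothetical error returned once the runner is shut down.
var errClosed = errors.New("runner closed")

// serialRunner executes submitted functions one at a time on the
// goroutine that calls run.
type serialRunner struct {
	writes   chan func()
	closedCh chan struct{}
	once     sync.Once
}

func newSerialRunner() *serialRunner {
	return &serialRunner{
		writes:   make(chan func()),
		closedCh: make(chan struct{}),
	}
}

// do hands fn to the running goroutine, or fails if the runner is closed.
func (r *serialRunner) do(fn func()) error {
	select {
	case <-r.closedCh:
		return errClosed
	case r.writes <- fn:
		return nil
	}
}

// run executes submitted functions in arrival order until close is called.
func (r *serialRunner) run() {
	for {
		select {
		case fn := <-r.writes:
			fn()
		case <-r.closedCh:
			return
		}
	}
}

// close stops the runner; it is safe to call more than once.
func (r *serialRunner) close() {
	r.once.Do(func() { close(r.closedCh) })
}

func main() {
	r := newSerialRunner()
	finished := make(chan struct{})
	go func() { r.run(); close(finished) }()

	done := make(chan struct{})
	_ = r.do(func() { fmt.Println("write headers") })
	_ = r.do(func() { fmt.Println("write status"); close(done) })
	<-done
	r.close()
	<-finished
	fmt.Println(r.do(func() {}) != nil) // true: no work is accepted after close
}
```

Funnelling every write through one goroutine means the functions never need their own locking around the shared response writer.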
transportReader reads all the data available for this Stream from the transport and
passes them into the decoder, which converts them into a gRPC message stream.
The error is io.EOF when the stream is done or another non-nil error if
the stream broke.
er error
reader io.Reader
The handler to control the window update procedure for both this
particular stream and the associated transport.
(*transportReader) Read(p []byte) (n int, err error)
*transportReader : io.Reader
writeQuota is a soft limit on the amount of data a stream can
schedule before some of it is written out.
get waits to read from this channel when quota drops to zero or below.
replenish writes to it when quota becomes positive again.
done is triggered in error case.
quota int32
replenish is called by loopyWriter to give quota back.
It is implemented as a field so that it can be updated
by tests.
(*writeQuota) get(sz int32) error
(*writeQuota) realReplenish(n int)
func newWriteQuota(sz int32, done <-chan struct{}) *writeQuota
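A soft write quota of this kind might look like the following sketch. The names quotaSketch and errDone are hypothetical, and the details (a one-slot signal channel, atomics on the counter) are assumptions chosen to match the description of get, replenish, and done above.

```go
package main

import (
	"errors"
	"fmt"
	"sync/atomic"
)

// errDone is a hypothetical error returned when the stream is finished.
var errDone = errors.New("stream done")

// quotaSketch is a soft limit: a caller may overdraw it once, but later
// callers block until the quota is positive again.
type quotaSketch struct {
	quota int32
	ch    chan struct{}   // signalled when quota turns positive
	done  <-chan struct{} // closed on error or stream completion
}

func newQuotaSketch(sz int32, done <-chan struct{}) *quotaSketch {
	return &quotaSketch{quota: sz, ch: make(chan struct{}, 1), done: done}
}

// get reserves sz bytes. It succeeds whenever quota is positive, even if
// sz exceeds what is left, which is what makes the limit "soft".
func (w *quotaSketch) get(sz int32) error {
	for {
		if atomic.LoadInt32(&w.quota) > 0 {
			atomic.AddInt32(&w.quota, -sz)
			return nil
		}
		select {
		case <-w.ch:
			continue
		case <-w.done:
			return errDone
		}
	}
}

// replenish returns n bytes and wakes a waiter if quota crossed from
// non-positive to positive.
func (w *quotaSketch) replenish(n int32) {
	after := atomic.AddInt32(&w.quota, n)
	before := after - n
	if before <= 0 && after > 0 {
		select {
		case w.ch <- struct{}{}:
		default:
		}
	}
}

func main() {
	done := make(chan struct{})
	w := newQuotaSketch(10, done)
	fmt.Println(w.get(25)) // <nil>: overdraws to -15
	w.replenish(20)        // quota is now 5
	fmt.Println(w.get(1))  // <nil>
	close(done)
	_ = w.get(10)         // still allowed: quota was 4, now -6
	fmt.Println(w.get(1)) // stream done
}
```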
Package-Level Functions (total 42, in which 5 are exported)
ContextErr converts the error from context package into a status error.
GetConnection gets the connection from the context.
NewClientTransport establishes the transport with the required ConnectOptions
and returns it to the caller.
NewServerHandlerTransport returns a ServerTransport handling gRPC from
inside an http.Handler, or writes an HTTP error to w and returns an error.
It requires that the http Server supports HTTP/2.
NewServerTransport creates a http2 transport with conn and configuration
options from config.
It returns a non-nil transport and a nil error on success. On failure, it
returns a nil transport and a non-nil error. For a special case where the
underlying conn gets closed before the client preface could be read, it
returns a nil transport and a nil error.
encodeGrpcMessage is used to encode status code in header field
"grpc-message". It does percent encoding and also replaces invalid utf-8
characters with Unicode replacement character.
It checks whether each individual byte in msg is an allowable byte, and
then either percent-encodes it or passes it through. When percent-encoding,
the byte is converted into hexadecimal notation with a '%' prepended.
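The encoding rule can be sketched as below. The function name encodeMessageSketch is hypothetical, and the set of "allowable" bytes (printable ASCII from space to '~', excluding '%') is an assumption, since the text above does not spell it out.

```go
package main

import (
	"fmt"
	"strings"
	"unicode/utf8"
)

// encodeMessageSketch percent-encodes msg byte by byte. Invalid UTF-8 is
// first replaced by U+FFFD, whose bytes are then percent-encoded.
func encodeMessageSketch(msg string) string {
	var sb strings.Builder
	for len(msg) > 0 {
		// For an invalid byte this returns (utf8.RuneError, 1).
		r, size := utf8.DecodeRuneInString(msg)
		for _, b := range []byte(string(r)) {
			// Assumed allowable range: ' ' through '~', except '%'.
			if size == 1 && b >= ' ' && b <= '~' && b != '%' {
				sb.WriteByte(b)
				continue
			}
			fmt.Fprintf(&sb, "%%%02X", b)
		}
		msg = msg[size:]
	}
	return sb.String()
}

func main() {
	fmt.Println(encodeMessageSketch("100% ok\n")) // 100%25 ok%0A
	fmt.Println(encodeMessageSketch("caf\u00e9")) // caf%C3%A9
	fmt.Println(encodeMessageSketch("\xff"))      // %EF%BF%BD
}
```

The last call shows the replacement step: the lone byte 0xFF is not valid UTF-8, so the three bytes of U+FFFD are emitted in its place.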
isReservedHeader checks whether hdr belongs to HTTP2 headers
reserved by gRPC protocol. Any other headers are classified as the
user-specified metadata.
mapRecvMsgError converts the non-nil err into the appropriate
error value as expected by callers of *grpc.parser.recvMsg.
In particular, it can only be:
- io.EOF
- io.ErrUnexpectedEOF
- of type transport.ConnectionError
- an error from the status package
newHTTP2Client constructs a connected ClientTransport to addr based on HTTP2
and starts to receive messages on it. It returns a non-nil error if construction
fails.
parseDialTarget returns the network and address to pass to dialer.
proxyDial dials, connecting to a proxy first if necessary. Checks if a proxy
is necessary, dials, does the HTTP CONNECT handshake, and returns the
connection.
SetConnection adds the connection to the context to be able to get
information about the destination ip and port for an incoming RPC. This also
allows any unary or streaming interceptors to see the connection.
Package-Level Variables (total 18, in which 6 are exported)
ErrConnClosing indicates that the transport is closing.
ErrHeaderListSizeLimitViolation indicates that the header list size is larger
than the limit set by peer.
ErrIllegalHeaderWrite indicates that setting header is illegal because of
the stream's state.
ErrNoHeaders is used as a signal that a trailers only response was received,
and is not a real error.
HTTPStatusConvTab is the HTTP status code to gRPC error code conversion table.
MaxStreamID is the upper bound for the stream ID before the current
transport gracefully closes and new transport is created for subsequent RPCs.
This is set to 75% of 2^31-1. Streams are identified with an unsigned 31-bit
integer. It's exported so that tests can override it.
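As a worked check of the "75% of 2^31-1" figure, the value can be computed with integer constant arithmetic. The constant name maxStreamIDSketch is hypothetical, and the exact expression is an assumption about how such a bound could be written.

```go
package main

import (
	"fmt"
	"math"
)

// maxStreamIDSketch is three quarters of the largest 31-bit stream ID,
// using integer division (the fractional part is dropped).
const maxStreamIDSketch = uint32(math.MaxInt32 * 3 / 4)

func main() {
	fmt.Println(math.MaxInt32)     // 2147483647
	fmt.Println(maxStreamIDSketch) // 1610612735
}
```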
Adding arbitrary data to ping so that its ack can be identified.
Easter-egg: what does the ping message say?
clientConnectionCounter counts the number of connections a client has
initiated (equal to the number of http2Clients created). Must be accessed
atomically.
errStreamDone is returned from write at the client side to indicate an error
to the application layer.
errStreamDrain indicates that the stream is rejected because the
connection is draining. This could be caused by goaway or balancer
removing the address.
alpha is a constant factor used to keep a moving average
of RTTs.
bdpLimit is the maximum value the flow control windows will be increased
to. TCP typically limits this to 4MB, but some systems go up to 16MB.
Since this is only a limit, it is safe to make it optimistic.
If the current bdp sample is greater than or equal to
our beta * our estimated bdp and the current bandwidth
sample is the maximum bandwidth observed so far, we
increase our bdp estimate by a factor of gamma.
To put our bdp to be smaller than or equal to twice the real BDP,
we should multiply our current sample with 4/3, however to round things out
we use 2 as the multiplication factor.
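The update rule described by beta, gamma, and bdpLimit can be sketched as a pure function. All names here are hypothetical, and the numeric values of betaSketch (0.66) and bdpLimitSketch (16MB) are assumptions for illustration; only the factor of 2 is stated in the text above.

```go
package main

import "fmt"

const (
	bdpLimitSketch = 16 << 20 // assumed cap on the flow control window (16MB)
	betaSketch     = 0.66     // assumed fraction of the estimate a sample must reach
	gammaSketch    = 2        // multiplication factor applied to the sample
)

// nextBDP returns the new estimate given the current estimate, the number
// of bytes seen in the latest sample, and whether that sample's bandwidth
// is the highest observed so far.
func nextBDP(bdp, sample uint32, isMaxBandwidth bool) uint32 {
	if float64(sample) >= betaSketch*float64(bdp) && isMaxBandwidth && bdp != bdpLimitSketch {
		next := uint64(gammaSketch) * uint64(sample)
		if next > bdpLimitSketch {
			return bdpLimitSketch
		}
		return uint32(next)
	}
	return bdp
}

func main() {
	fmt.Println(nextBDP(65535, 50000, true))  // 100000: sample is large enough
	fmt.Println(nextBDP(65535, 40000, true))  // 65535: sample below beta * bdp
	fmt.Println(nextBDP(65535, 50000, false)) // 65535: not the max bandwidth
}
```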
maxQueuedTransportResponseFrames is the most queued "transport response"
frames we will buffer before preventing new reads from occurring on the
transport. These are control frames sent in response to client requests,
such as RST_STREAM due to bad headers or settings acks.