Source File
transport.go
Belonging Package
google.golang.org/grpc/internal/transport
/*** Copyright 2014 gRPC authors.** Licensed under the Apache License, Version 2.0 (the "License");* you may not use this file except in compliance with the License.* You may obtain a copy of the License at** http://www.apache.org/licenses/LICENSE-2.0** Unless required by applicable law or agreed to in writing, software* distributed under the License is distributed on an "AS IS" BASIS,* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.* See the License for the specific language governing permissions and* limitations under the License.**/// Package transport defines and implements message oriented communication// channel to complete various transactions (e.g., an RPC). It is meant for// grpc-internal usage and is not intended to be imported directly by users.package transportimport ()const logLevel = 2// recvMsg represents the received msg from the transport. All transport// protocol specific info has been removed.type recvMsg struct {buffer mem.Buffer// nil: received some data// io.EOF: stream is completed. data is nil.// other non-nil error: transport failure. data is nil.err error}// recvBuffer is an unbounded channel of recvMsg structs.//// Note: recvBuffer differs from buffer.Unbounded only in the fact that it// holds a channel of recvMsg structs instead of objects implementing "item"// interface. recvBuffer is written to much more often and using strict recvMsg// structs helps avoid allocation in "recvBuffer.put"type recvBuffer struct {c chan recvMsgmu sync.Mutexbacklog []recvMsgerr error}func () *recvBuffer {:= &recvBuffer{c: make(chan recvMsg, 1),}return}func ( *recvBuffer) ( recvMsg) {.mu.Lock()if .err != nil {// drop the buffer on the floor. Since b.err is not nil, any subsequent reads// will always return an error, making this buffer inaccessible..buffer.Free().mu.Unlock()// An error had occurred earlier, don't accept more// data or errors.return}.err = .errif len(.backlog) == 0 {select {case .c <- :.mu.Unlock()returndefault:}}.backlog = append(.backlog, ).mu.Unlock()}func ( *recvBuffer) () {.mu.Lock()if len(.backlog) > 0 {select {case .c <- .backlog[0]:.backlog[0] = recvMsg{}.backlog = .backlog[1:]default:}}.mu.Unlock()}// get returns the channel that receives a recvMsg in the buffer.//// Upon receipt of a recvMsg, the caller should call load to send another// recvMsg onto the channel if there is any.func ( *recvBuffer) () <-chan recvMsg {return .c}// recvBufferReader implements io.Reader interface to read the data from// recvBuffer.type recvBufferReader struct {closeStream func(error) // Closes the client transport stream with the given error and nil trailer metadata.ctx context.ContextctxDone <-chan struct{} // cache of ctx.Done() (for performance).recv *recvBufferlast mem.Buffer // Stores the remaining data in the previous calls.err error}func ( *recvBufferReader) ( []byte) ( int, error) {if .err != nil {return 0, .err}if .last != nil {, .last = mem.ReadUnsafe(, .last)return , nil}if .closeStream != nil {, .err = .readMessageHeaderClient()} else {, .err = .readMessageHeader()}return , .err}// Read reads the next n bytes from last. If last is drained, it tries to read// additional data from recv. It blocks if there no additional data available in// recv. If Read returns any non-nil error, it will continue to return that// error.func ( *recvBufferReader) ( int) ( mem.Buffer, error) {if .err != nil {return nil, .err}if .last != nil {= .lastif .last.Len() > {, .last = mem.SplitUnsafe(, )} else {.last = nil}return , nil}if .closeStream != nil {, .err = .readClient()} else {, .err = .read()}return , .err}func ( *recvBufferReader) ( []byte) ( int, error) {select {case <-.ctxDone:return 0, ContextErr(.ctx.Err())case := <-.recv.get():return .readMessageHeaderAdditional(, )}}func ( *recvBufferReader) ( int) ( mem.Buffer, error) {select {case <-.ctxDone:return nil, ContextErr(.ctx.Err())case := <-.recv.get():return .readAdditional(, )}}func ( *recvBufferReader) ( []byte) ( int, error) {// If the context is canceled, then closes the stream with nil metadata.// closeStream writes its error parameter to r.recv as a recvMsg.// r.readAdditional acts on that message and returns the necessary error.select {case <-.ctxDone:// Note that this adds the ctx error to the end of recv buffer, and// reads from the head. This will delay the error until recv buffer is// empty, thus will delay ctx cancellation in Recv().//// It's done this way to fix a race between ctx cancel and trailer. The// race was, stream.Recv() may return ctx error if ctxDone wins the// race, but stream.Trailer() may return a non-nil md because the stream// was not marked as done when trailer is received. This closeStream// call will mark stream as done, thus fix the race.//// TODO: delaying ctx error seems like a unnecessary side effect. What// we really want is to mark the stream as done, and return ctx error// faster..closeStream(ContextErr(.ctx.Err())):= <-.recv.get()return .readMessageHeaderAdditional(, )case := <-.recv.get():return .readMessageHeaderAdditional(, )}}func ( *recvBufferReader) ( int) ( mem.Buffer, error) {// If the context is canceled, then closes the stream with nil metadata.// closeStream writes its error parameter to r.recv as a recvMsg.// r.readAdditional acts on that message and returns the necessary error.select {case <-.ctxDone:// Note that this adds the ctx error to the end of recv buffer, and// reads from the head. This will delay the error until recv buffer is// empty, thus will delay ctx cancellation in Recv().//// It's done this way to fix a race between ctx cancel and trailer. The// race was, stream.Recv() may return ctx error if ctxDone wins the// race, but stream.Trailer() may return a non-nil md because the stream// was not marked as done when trailer is received. This closeStream// call will mark stream as done, thus fix the race.//// TODO: delaying ctx error seems like a unnecessary side effect. What// we really want is to mark the stream as done, and return ctx error// faster..closeStream(ContextErr(.ctx.Err())):= <-.recv.get()return .readAdditional(, )case := <-.recv.get():return .readAdditional(, )}}func ( *recvBufferReader) ( recvMsg, []byte) ( int, error) {.recv.load()if .err != nil {if .buffer != nil {.buffer.Free()}return 0, .err}, .last = mem.ReadUnsafe(, .buffer)return , nil}func ( *recvBufferReader) ( recvMsg, int) ( mem.Buffer, error) {.recv.load()if .err != nil {if .buffer != nil {.buffer.Free()}return nil, .err}if .buffer.Len() > {.buffer, .last = mem.SplitUnsafe(.buffer, )}return .buffer, nil}type streamState uint32const (streamActive streamState = iotastreamWriteDone // EndStream sentstreamReadDone // EndStream receivedstreamDone // the entire stream is finished.)// Stream represents an RPC in the transport layer.type Stream struct {id uint32ctx context.Context // the associated context of the streammethod string // the associated RPC method of the streamrecvCompress stringsendCompress stringbuf *recvBuffertrReader *transportReaderfc *inFlowwq *writeQuota// Callback to state application's intentions to read data. This// is used to adjust flow control, if needed.requestRead func(int)state streamState// contentSubtype is the content-subtype for requests.// this must be lowercase or the behavior is undefined.contentSubtype stringtrailer metadata.MD // the key-value map of trailer metadata.}func ( *Stream) ( streamState) streamState {return streamState(atomic.SwapUint32((*uint32)(&.state), uint32()))}func ( *Stream) (, streamState) bool {return atomic.CompareAndSwapUint32((*uint32)(&.state), uint32(), uint32())}func ( *Stream) () streamState {return streamState(atomic.LoadUint32((*uint32)(&.state)))}// Trailer returns the cached trailer metadata. Note that if it is not called// after the entire stream is done, it could return an empty MD.// It can be safely read only after stream has ended that is either read// or write have returned io.EOF.func ( *Stream) () metadata.MD {return .trailer.Copy()}// Context returns the context of the stream.func ( *Stream) () context.Context {return .ctx}// Method returns the method for the stream.func ( *Stream) () string {return .method}func ( *Stream) ( recvMsg) {.buf.put()}// ReadMessageHeader reads data into the provided header slice from the stream.// It first checks if there was an error during a previous read operation and// returns it if present. It then requests a read operation for the length of// the header. It continues to read from the stream until the entire header// slice is filled or an error occurs. If an `io.EOF` error is encountered with// partially read data, it is converted to `io.ErrUnexpectedEOF` to indicate an// unexpected end of the stream. The method returns any error encountered during// the read process or nil if the header was successfully read.func ( *Stream) ( []byte) ( error) {// Don't request a read if there was an error earlierif := .trReader.er; != nil {return}.requestRead(len())for len() != 0 {, := .trReader.ReadMessageHeader()= [:]if len() == 0 {= nil}if != nil {if > 0 && == io.EOF {= io.ErrUnexpectedEOF}return}}return nil}// Read reads n bytes from the wire for this stream.func ( *Stream) ( int) ( mem.BufferSlice, error) {// Don't request a read if there was an error earlierif := .trReader.er; != nil {return nil,}.requestRead()for != 0 {, := .trReader.Read()var intif != nil {= .Len()}-=if == 0 {= nil}if != nil {if > 0 && == io.EOF {= io.ErrUnexpectedEOF}.Free()return nil,}= append(, )}return , nil}// transportReader reads all the data available for this Stream from the transport and// passes them into the decoder, which converts them into a gRPC message stream.// The error is io.EOF when the stream is done or another non-nil error if// the stream broke.type transportReader struct {reader *recvBufferReader// The handler to control the window update procedure for both this// particular stream and the associated transport.windowHandler func(int)er error}func ( *transportReader) ( []byte) (int, error) {, := .reader.ReadMessageHeader()if != nil {.er =return 0,}.windowHandler()return , nil}func ( *transportReader) ( int) (mem.Buffer, error) {, := .reader.Read()if != nil {.er =return ,}.windowHandler(.Len())return , nil}// GoString is implemented by Stream so context.String() won't// race when printing %#v.func ( *Stream) () string {return fmt.Sprintf("<stream: %p, %v>", , .method)}// state of transporttype transportState intconst (reachable transportState = iotaclosingdraining)// ServerConfig consists of all the configurations to establish a server transport.type ServerConfig struct {MaxStreams uint32ConnectionTimeout time.DurationCredentials credentials.TransportCredentialsInTapHandle tap.ServerInHandleStatsHandlers []stats.HandlerKeepaliveParams keepalive.ServerParametersKeepalivePolicy keepalive.EnforcementPolicyInitialWindowSize int32InitialConnWindowSize int32WriteBufferSize intReadBufferSize intSharedWriteBuffer boolChannelzParent *channelz.ServerMaxHeaderListSize *uint32HeaderTableSize *uint32BufferPool mem.BufferPoolStaticWindowSize bool}// ConnectOptions covers all relevant options for communicating with the server.type ConnectOptions struct {// UserAgent is the application user agent.UserAgent string// Dialer specifies how to dial a network address.Dialer func(context.Context, string) (net.Conn, error)// FailOnNonTempDialError specifies if gRPC fails on non-temporary dial errors.FailOnNonTempDialError bool// PerRPCCredentials stores the PerRPCCredentials required to issue RPCs.PerRPCCredentials []credentials.PerRPCCredentials// TransportCredentials stores the Authenticator required to setup a client// connection. Only one of TransportCredentials and CredsBundle is non-nil.TransportCredentials credentials.TransportCredentials// CredsBundle is the credentials bundle to be used. Only one of// TransportCredentials and CredsBundle is non-nil.CredsBundle credentials.Bundle// KeepaliveParams stores the keepalive parameters.KeepaliveParams keepalive.ClientParameters// StatsHandlers stores the handler for stats.StatsHandlers []stats.Handler// InitialWindowSize sets the initial window size for a stream.InitialWindowSize int32// InitialConnWindowSize sets the initial window size for a connection.InitialConnWindowSize int32// WriteBufferSize sets the size of write buffer which in turn determines how much data can be batched before it's written on the wire.WriteBufferSize int// ReadBufferSize sets the size of read buffer, which in turn determines how much data can be read at most for one read syscall.ReadBufferSize int// SharedWriteBuffer indicates whether connections should reuse write bufferSharedWriteBuffer bool// ChannelzParent sets the addrConn id which initiated the creation of this client transport.ChannelzParent *channelz.SubChannel// MaxHeaderListSize sets the max (uncompressed) size of header list that is prepared to be received.MaxHeaderListSize *uint32// The mem.BufferPool to use when reading/writing to the wire.BufferPool mem.BufferPool// StaticWindowSize controls whether dynamic window sizing is enabled.StaticWindowSize bool}// WriteOptions provides additional hints and information for message// transmission.type WriteOptions struct {// Last indicates whether this write is the last piece for// this stream.Last bool}// CallHdr carries the information of a particular RPC.type CallHdr struct {// Host specifies the peer's host.Host string// Method specifies the operation to perform.Method string// SendCompress specifies the compression algorithm applied on// outbound message.SendCompress string// Creds specifies credentials.PerRPCCredentials for a call.Creds credentials.PerRPCCredentials// ContentSubtype specifies the content-subtype for a request. For example, a// content-subtype of "proto" will result in a content-type of// "application/grpc+proto". The value of ContentSubtype must be all// lowercase, otherwise the behavior is undefined. See// https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#requests// for more details.ContentSubtype stringPreviousAttempts int // value of grpc-previous-rpc-attempts header to setDoneFunc func() // called when the stream is finished// Authority is used to explicitly override the `:authority` header. If set,// this value takes precedence over the Host field and will be used as the// value for the `:authority` header.Authority string}// ClientTransport is the common interface for all gRPC client-side transport// implementations.type ClientTransport interface {// Close tears down this transport. Once it returns, the transport// should not be accessed any more. The caller must make sure this// is called only once.Close(err error)// GracefulClose starts to tear down the transport: the transport will stop// accepting new RPCs and NewStream will return error. Once all streams are// finished, the transport will close.//// It does not block.GracefulClose()// NewStream creates a Stream for an RPC.NewStream(ctx context.Context, callHdr *CallHdr) (*ClientStream, error)// Error returns a channel that is closed when some I/O error// happens. Typically the caller should have a goroutine to monitor// this in order to take action (e.g., close the current transport// and create a new one) in error case. It should not return nil// once the transport is initiated.Error() <-chan struct{}// GoAway returns a channel that is closed when ClientTransport// receives the draining signal from the server (e.g., GOAWAY frame in// HTTP/2).GoAway() <-chan struct{}// GetGoAwayReason returns the reason why GoAway frame was received, along// with a human readable string with debug info.GetGoAwayReason() (GoAwayReason, string)// RemoteAddr returns the remote network address.RemoteAddr() net.Addr}// ServerTransport is the common interface for all gRPC server-side transport// implementations.//// Methods may be called concurrently from multiple goroutines, but// Write methods for a given Stream will be called serially.type ServerTransport interface {// HandleStreams receives incoming streams using the given handler.HandleStreams(context.Context, func(*ServerStream))// Close tears down the transport. Once it is called, the transport// should not be accessed any more. All the pending streams and their// handlers will be terminated asynchronously.Close(err error)// Peer returns the peer of the server transport.Peer() *peer.Peer// Drain notifies the client this ServerTransport stops accepting new RPCs.Drain(debugData string)}type internalServerTransport interface {ServerTransportwriteHeader(s *ServerStream, md metadata.MD) errorwrite(s *ServerStream, hdr []byte, data mem.BufferSlice, opts *WriteOptions) errorwriteStatus(s *ServerStream, st *status.Status) errorincrMsgRecv()}// connectionErrorf creates an ConnectionError with the specified error description.func ( bool, error, string, ...any) ConnectionError {return ConnectionError{Desc: fmt.Sprintf(, ...),temp: ,err: ,}}// ConnectionError is an error that results in the termination of the// entire connection and the retry of all the active streams.type ConnectionError struct {Desc stringtemp boolerr error}func ( ConnectionError) () string {return fmt.Sprintf("connection error: desc = %q", .Desc)}// Temporary indicates if this connection error is temporary or fatal.func ( ConnectionError) () bool {return .temp}// Origin returns the original error of this connection error.func ( ConnectionError) () error {// Never return nil error here.// If the original error is nil, return itself.if .err == nil {return}return .err}// Unwrap returns the original error of this connection error or nil when the// origin is nil.func ( ConnectionError) () error {return .err}var (// ErrConnClosing indicates that the transport is closing.ErrConnClosing = connectionErrorf(true, nil, "transport is closing")// errStreamDrain indicates that the stream is rejected because the// connection is draining. This could be caused by goaway or balancer// removing the address.errStreamDrain = status.Error(codes.Unavailable, "the connection is draining")// errStreamDone is returned from write at the client side to indicate application// layer of an error.errStreamDone = errors.New("the stream is done")// StatusGoAway indicates that the server sent a GOAWAY that included this// stream's ID in unprocessed RPCs.statusGoAway = status.New(codes.Unavailable, "the stream is rejected because server is draining the connection"))// GoAwayReason contains the reason for the GoAway frame received.type GoAwayReason uint8const (// GoAwayInvalid indicates that no GoAway frame is received.GoAwayInvalid GoAwayReason = 0// GoAwayNoReason is the default value when GoAway frame is received.GoAwayNoReason GoAwayReason = 1// GoAwayTooManyPings indicates that a GoAway frame with// ErrCodeEnhanceYourCalm was received and that the debug data said// "too_many_pings".GoAwayTooManyPings GoAwayReason = 2)// ContextErr converts the error from context package into a status error.func ( error) error {switch {case context.DeadlineExceeded:return status.Error(codes.DeadlineExceeded, .Error())case context.Canceled:return status.Error(codes.Canceled, .Error())}return status.Errorf(codes.Internal, "Unexpected error from context packet: %v", )}
The pages are generated with Golds v0.7.6. (GOOS=linux GOARCH=amd64)