Source File
	client_stream.go
Belonging Package
	google.golang.org/grpc/internal/transport
/*** Copyright 2024 gRPC authors.** Licensed under the Apache License, Version 2.0 (the "License");* you may not use this file except in compliance with the License.* You may obtain a copy of the License at** http://www.apache.org/licenses/LICENSE-2.0** Unless required by applicable law or agreed to in writing, software* distributed under the License is distributed on an "AS IS" BASIS,* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.* See the License for the specific language governing permissions and* limitations under the License.**/package transportimport ()// ClientStream implements streaming functionality for a gRPC client.type ClientStream struct {*Stream // Embed for common stream functionality.ct *http2Clientdone chan struct{} // closed at the end of stream to unblock writers.doneFunc func() // invoked at the end of stream.headerChan chan struct{} // closed to indicate the end of header metadata.headerChanClosed uint32 // set when headerChan is closed. Used to avoid closing headerChan multiple times.// headerValid indicates whether a valid header was received. Only// meaningful after headerChan is closed (always call waitOnHeader() before// reading its value).headerValid boolheader metadata.MD // the received header metadatanoHeaders bool // set if the client never received headers (set only after the stream is done).bytesReceived atomic.Bool // indicates whether any bytes have been received on this streamunprocessed atomic.Bool // set if the server sends a refused stream or GOAWAY including this streamstatus *status.Status // the status error received from the server}// Read reads an n byte message from the input stream.func ( *ClientStream) ( int) (mem.BufferSlice, error) {, := .Stream.read()if == nil {.ct.incrMsgRecv()}return ,}// Close closes the stream and propagates err to any readers.func ( *ClientStream) ( error) {var (boolhttp2.ErrCode)if != nil {= true= http2.ErrCodeCancel}.ct.closeStream(, , , , status.Convert(), nil, false)}// Write writes the hdr and data bytes to the output stream.func ( *ClientStream) ( []byte, mem.BufferSlice, *WriteOptions) error {return .ct.write(, , , )}// BytesReceived indicates whether any bytes have been received on this stream.func ( *ClientStream) () bool {return .bytesReceived.Load()}// Unprocessed indicates whether the server did not process this stream --// i.e. it sent a refused stream or GOAWAY including this stream ID.func ( *ClientStream) () bool {return .unprocessed.Load()}func ( *ClientStream) () {select {case <-.ctx.Done():// Close the stream to prevent headers/trailers from changing after// this function returns..Close(ContextErr(.ctx.Err()))// headerChan could possibly not be closed yet if closeStream raced// with operateHeaders; wait until it is closed explicitly here.<-.headerChancase <-.headerChan:}}// RecvCompress returns the compression algorithm applied to the inbound// message. It is empty string if there is no compression applied.func ( *ClientStream) () string {.waitOnHeader()return .recvCompress}// Done returns a channel which is closed when it receives the final status// from the server.func ( *ClientStream) () <-chan struct{} {return .done}// Header returns the header metadata of the stream. Acquires the key-value// pairs of header metadata once it is available. It blocks until i) the// metadata is ready or ii) there is no header metadata or iii) the stream is// canceled/expired.func ( *ClientStream) () (metadata.MD, error) {.waitOnHeader()if !.headerValid || .noHeaders {return nil, .status.Err()}return .header.Copy(), nil}// TrailersOnly blocks until a header or trailers-only frame is received and// then returns true if the stream was trailers-only. If the stream ends// before headers are received, returns true, nil.func ( *ClientStream) () bool {.waitOnHeader()return .noHeaders}// Status returns the status received from the server.// Status can be read safely only after the stream has ended,// that is, after Done() is closed.func ( *ClientStream) () *status.Status {return .status}
The pages are generated with Golds v0.7.6. (GOOS=linux GOARCH=amd64)