Source File
client_stream.go
Belonging Package
google.golang.org/grpc/internal/transport
/*
*
* Copyright 2024 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package transport
import (
)
// ClientStream implements streaming functionality for a gRPC client.
type ClientStream struct {
*Stream // Embed for common stream functionality.
ct *http2Client
done chan struct{} // closed at the end of stream to unblock writers.
doneFunc func() // invoked at the end of stream.
headerChan chan struct{} // closed to indicate the end of header metadata.
headerChanClosed uint32 // set when headerChan is closed. Used to avoid closing headerChan multiple times.
// headerValid indicates whether a valid header was received. Only
// meaningful after headerChan is closed (always call waitOnHeader() before
// reading its value).
headerValid bool
header metadata.MD // the received header metadata
noHeaders bool // set if the client never received headers (set only after the stream is done).
bytesReceived atomic.Bool // indicates whether any bytes have been received on this stream
unprocessed atomic.Bool // set if the server sends a refused stream or GOAWAY including this stream
status *status.Status // the status error received from the server
}
// Read reads an n byte message from the input stream.
func ( *ClientStream) ( int) (mem.BufferSlice, error) {
, := .Stream.read()
if == nil {
.ct.incrMsgRecv()
}
return ,
}
// Close closes the stream and propagates err to any readers.
func ( *ClientStream) ( error) {
var (
bool
http2.ErrCode
)
if != nil {
= true
= http2.ErrCodeCancel
}
.ct.closeStream(, , , , status.Convert(), nil, false)
}
// Write writes the hdr and data bytes to the output stream.
func ( *ClientStream) ( []byte, mem.BufferSlice, *WriteOptions) error {
return .ct.write(, , , )
}
// BytesReceived indicates whether any bytes have been received on this stream.
func ( *ClientStream) () bool {
return .bytesReceived.Load()
}
// Unprocessed indicates whether the server did not process this stream --
// i.e. it sent a refused stream or GOAWAY including this stream ID.
func ( *ClientStream) () bool {
return .unprocessed.Load()
}
func ( *ClientStream) () {
select {
case <-.ctx.Done():
// Close the stream to prevent headers/trailers from changing after
// this function returns.
.Close(ContextErr(.ctx.Err()))
// headerChan could possibly not be closed yet if closeStream raced
// with operateHeaders; wait until it is closed explicitly here.
<-.headerChan
case <-.headerChan:
}
}
// RecvCompress returns the compression algorithm applied to the inbound
// message. It is empty string if there is no compression applied.
func ( *ClientStream) () string {
.waitOnHeader()
return .recvCompress
}
// Done returns a channel which is closed when it receives the final status
// from the server.
func ( *ClientStream) () <-chan struct{} {
return .done
}
// Header returns the header metadata of the stream. Acquires the key-value
// pairs of header metadata once it is available. It blocks until i) the
// metadata is ready or ii) there is no header metadata or iii) the stream is
// canceled/expired.
func ( *ClientStream) () (metadata.MD, error) {
.waitOnHeader()
if !.headerValid || .noHeaders {
return nil, .status.Err()
}
return .header.Copy(), nil
}
// TrailersOnly blocks until a header or trailers-only frame is received and
// then returns true if the stream was trailers-only. If the stream ends
// before headers are received, returns true, nil.
func ( *ClientStream) () bool {
.waitOnHeader()
return .noHeaders
}
// Status returns the status received from the server.
// Status can be read safely only after the stream has ended,
// that is, after Done() is closed.
func ( *ClientStream) () *status.Status {
return .status
}
The pages are generated with Golds v0.7.6. (GOOS=linux GOARCH=amd64)