Source File
server_stream.go
Belonging Package
google.golang.org/grpc/internal/transport
/*
*
* Copyright 2024 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package transport
import (
)
// ServerStream implements streaming functionality for a gRPC server.
type ServerStream struct {
*Stream // Embed for common stream functionality.
st internalServerTransport
ctxDone <-chan struct{} // closed at the end of stream. Cache of ctx.Done() (for performance)
// cancel is invoked at the end of stream to cancel ctx. It also stops the
// timer for monitoring the rpc deadline if configured.
cancel func()
// Holds compressor names passed in grpc-accept-encoding metadata from the
// client.
clientAdvertisedCompressors string
headerWireLength int
// hdrMu protects outgoing header and trailer metadata.
hdrMu sync.Mutex
header metadata.MD // the outgoing header metadata. Updated by WriteHeader.
headerSent atomic.Bool // atomically set when the headers are sent out.
}
// Read reads an n byte message from the input stream.
func ( *ServerStream) ( int) (mem.BufferSlice, error) {
, := .Stream.read()
if == nil {
.st.incrMsgRecv()
}
return ,
}
// SendHeader sends the header metadata for the given stream.
func ( *ServerStream) ( metadata.MD) error {
return .st.writeHeader(, )
}
// Write writes the hdr and data bytes to the output stream.
func ( *ServerStream) ( []byte, mem.BufferSlice, *WriteOptions) error {
return .st.write(, , , )
}
// WriteStatus sends the status of a stream to the client. WriteStatus is
// the final call made on a stream and always occurs.
func ( *ServerStream) ( *status.Status) error {
return .st.writeStatus(, )
}
// isHeaderSent indicates whether headers have been sent.
func ( *ServerStream) () bool {
return .headerSent.Load()
}
// updateHeaderSent updates headerSent and returns true
// if it was already set.
func ( *ServerStream) () bool {
return .headerSent.Swap(true)
}
// RecvCompress returns the compression algorithm applied to the inbound
// message. It is empty string if there is no compression applied.
func ( *ServerStream) () string {
return .recvCompress
}
// SendCompress returns the send compressor name.
func ( *ServerStream) () string {
return .sendCompress
}
// ContentSubtype returns the content-subtype for a request. For example, a
// content-subtype of "proto" will result in a content-type of
// "application/grpc+proto". This will always be lowercase. See
// https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#requests for
// more details.
func ( *ServerStream) () string {
return .contentSubtype
}
// SetSendCompress sets the compression algorithm to the stream.
func ( *ServerStream) ( string) error {
if .isHeaderSent() || .getState() == streamDone {
return errors.New("transport: set send compressor called after headers sent or stream done")
}
.sendCompress =
return nil
}
// SetContext sets the context of the stream. This will be deleted once the
// stats handler callouts all move to gRPC layer.
func ( *ServerStream) ( context.Context) {
.ctx =
}
// ClientAdvertisedCompressors returns the compressor names advertised by the
// client via grpc-accept-encoding header.
func ( *ServerStream) () []string {
:= strings.Split(.clientAdvertisedCompressors, ",")
for , := range {
[] = strings.TrimSpace()
}
return
}
// Header returns the header metadata of the stream. It returns the out header
// after t.WriteHeader is called. It does not block and must not be called
// until after WriteHeader.
func ( *ServerStream) () (metadata.MD, error) {
// Return the header in stream. It will be the out
// header after t.WriteHeader is called.
return .header.Copy(), nil
}
// HeaderWireLength returns the size of the headers of the stream as received
// from the wire.
func ( *ServerStream) () int {
return .headerWireLength
}
// SetHeader sets the header metadata. This can be called multiple times.
// This should not be called in parallel to other data writes.
func ( *ServerStream) ( metadata.MD) error {
if .Len() == 0 {
return nil
}
if .isHeaderSent() || .getState() == streamDone {
return ErrIllegalHeaderWrite
}
.hdrMu.Lock()
.header = metadata.Join(.header, )
.hdrMu.Unlock()
return nil
}
// SetTrailer sets the trailer metadata which will be sent with the RPC status
// by the server. This can be called multiple times.
// This should not be called parallel to other data writes.
func ( *ServerStream) ( metadata.MD) error {
if .Len() == 0 {
return nil
}
if .getState() == streamDone {
return ErrIllegalHeaderWrite
}
.hdrMu.Lock()
.trailer = metadata.Join(.trailer, )
.hdrMu.Unlock()
return nil
}
The pages are generated with Golds v0.7.6. (GOOS=linux GOARCH=amd64)