package grpc
import (
	"context"
	"errors"
	"io"
	"math"
	rand "math/rand/v2"
	"strconv"
	"sync"
	"time"

	"google.golang.org/grpc/balancer"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/encoding"
	"google.golang.org/grpc/internal"
	"google.golang.org/grpc/internal/balancerload"
	"google.golang.org/grpc/internal/binarylog"
	"google.golang.org/grpc/internal/channelz"
	"google.golang.org/grpc/internal/grpcutil"
	imetadata "google.golang.org/grpc/internal/metadata"
	iresolver "google.golang.org/grpc/internal/resolver"
	"google.golang.org/grpc/internal/serviceconfig"
	istatus "google.golang.org/grpc/internal/status"
	"google.golang.org/grpc/internal/transport"
	"google.golang.org/grpc/mem"
	"google.golang.org/grpc/metadata"
	"google.golang.org/grpc/peer"
	"google.golang.org/grpc/stats"
	"google.golang.org/grpc/status"
)
// metadataFromOutgoingContextRaw is the internal accessor that extracts the
// outgoing metadata plus the raw appended key/value pairs from a context.
// It is asserted from the internal package hook at init time.
var metadataFromOutgoingContextRaw = internal.FromOutgoingContextRaw.(func(context.Context) (metadata.MD, [][]string, bool))
// StreamHandler defines the handler called by gRPC server to complete the
// execution of a streaming RPC.
type StreamHandler func(srv any, stream ServerStream) error

// StreamDesc represents a streaming RPC service's method specification.  Used
// on the server when registering services and on the client when initiating
// new streams.
type StreamDesc struct {
	// StreamName is the name of the method, excluding the service.
	StreamName string
	// Handler is the method handler invoked on the server.
	Handler StreamHandler
	// ServerStreams indicates whether the RPC is server-streaming.
	ServerStreams bool
	// ClientStreams indicates whether the RPC is client-streaming.
	ClientStreams bool
}
// Stream defines the common interface a client or server stream has to
// satisfy.
type Stream interface {
	// Context returns the context for this stream.
	Context() context.Context
	// SendMsg blocks until it sends m, the stream is done or the stream breaks.
	SendMsg(m any) error
	// RecvMsg blocks until it receives a message into m or the stream is done.
	RecvMsg(m any) error
}

// ClientStream defines the client-side behavior of a streaming RPC.
type ClientStream interface {
	// Header returns the header metadata received from the server if there
	// is any. It blocks if the metadata is not ready to read.
	Header() (metadata.MD, error)
	// Trailer returns the trailer metadata from the server, if there is any.
	// It must only be called after stream.CloseAndRecv has returned, or
	// stream.Recv has returned a non-nil error (including io.EOF).
	Trailer() metadata.MD
	// CloseSend closes the send direction of the stream.
	CloseSend() error
	// Context returns the context for this stream.
	Context() context.Context
	// SendMsg sends a message on the stream.
	SendMsg(m any) error
	// RecvMsg blocks until it receives a message into m or the stream is done.
	RecvMsg(m any) error
}
func ( *ClientConn) ( context.Context, *StreamDesc, string, ...CallOption) (ClientStream, error) {
= combine(.dopts.callOptions, )
if .dopts.streamInt != nil {
return .dopts.streamInt(, , , , newClientStream, ...)
}
return newClientStream(, , , , ...)
}
func ( context.Context, *StreamDesc, *ClientConn, string, ...CallOption) (ClientStream, error) {
return .NewStream(, , , ...)
}
func ( context.Context, *StreamDesc, *ClientConn, string, ...CallOption) ( ClientStream, error) {
if := .idlenessMgr.OnCallBegin(); != nil {
return nil,
}
= append([]CallOption{OnFinish(func(error) { .idlenessMgr.OnCallEnd() })}, ...)
if , , := metadataFromOutgoingContextRaw(); {
if := imetadata.Validate(); != nil {
return nil, status.Error(codes.Internal, .Error())
}
for , := range {
for := 0; < len(); += 2 {
if := imetadata.ValidatePair([], [+1]); != nil {
return nil, status.Error(codes.Internal, .Error())
}
}
}
}
if channelz.IsOn() {
.incrCallsStarted()
defer func() {
if != nil {
.incrCallsFailed()
}
}()
}
, := .waitForResolvedAddrs()
if != nil {
return nil,
}
var serviceconfig.MethodConfig
var func()
:= func( context.Context, func()) (iresolver.ClientStream, error) {
return newClientStreamWithParams(, , , , , , , , ...)
}
:= iresolver.RPCInfo{Context: , Method: }
, := .safeConfigSelector.SelectConfig()
if != nil {
if , := status.FromError(); {
if istatus.IsRestrictedControlPlaneCode() {
= status.Errorf(codes.Internal, "config selector returned illegal status: %v", )
}
return nil,
}
return nil, toRPCErr()
}
if != nil {
if .Context != nil {
= .Context
}
= .MethodConfig
= .OnCommitted
if .Interceptor != nil {
.Context = nil
:=
= func( context.Context, func()) (iresolver.ClientStream, error) {
, := .Interceptor.NewStream(, , , )
if != nil {
return nil, toRPCErr()
}
return , nil
}
}
}
return (, func() {})
}
func ( context.Context, *StreamDesc, *ClientConn, string, serviceconfig.MethodConfig, , func(), bool, ...CallOption) ( iresolver.ClientStream, error) {
:= defaultCallInfo()
if .WaitForReady != nil {
.failFast = !*.WaitForReady
}
var context.CancelFunc
if .Timeout != nil && *.Timeout >= 0 {
, = context.WithTimeout(, *.Timeout)
} else {
, = context.WithCancel()
}
defer func() {
if != nil {
()
}
}()
for , := range {
if := .before(); != nil {
return nil, toRPCErr()
}
}
.maxSendMessageSize = getMaxSize(.MaxReqSize, .maxSendMessageSize, defaultClientMaxSendMessageSize)
.maxReceiveMessageSize = getMaxSize(.MaxRespSize, .maxReceiveMessageSize, defaultClientMaxReceiveMessageSize)
if := setCallInfoCodec(); != nil {
return nil,
}
:= &transport.CallHdr{
Host: .authority,
Method: ,
ContentSubtype: .contentSubtype,
DoneFunc: ,
Authority: .authority,
}
var Compressor
var encoding.Compressor
if := .compressorName; != "" {
.SendCompress =
if != encoding.Identity {
= encoding.GetCompressor()
if == nil {
return nil, status.Errorf(codes.Internal, "grpc: Compressor is not installed for requested grpc-encoding %q", )
}
}
} else if .dopts.compressorV0 != nil {
.SendCompress = .dopts.compressorV0.Type()
= .dopts.compressorV0
}
if .creds != nil {
.Creds = .creds
}
:= &clientStream{
callHdr: ,
ctx: ,
methodConfig: &,
opts: ,
callInfo: ,
cc: ,
desc: ,
codec: .codec,
compressorV0: ,
compressorV1: ,
cancel: ,
firstAttempt: true,
onCommit: ,
nameResolutionDelay: ,
}
if !.dopts.disableRetry {
.retryThrottler = .retryThrottler.Load().(*retryThrottler)
}
if := binarylog.GetMethodLogger(); != nil {
.binlogs = append(.binlogs, )
}
if .dopts.binaryLogger != nil {
if := .dopts.binaryLogger.GetMethodLogger(); != nil {
.binlogs = append(.binlogs, )
}
}
:= func( *csAttempt) error {
if := .getTransport(); != nil {
return
}
if := .newStream(); != nil {
return
}
.attempt =
return nil
}
if := .withRetry(, func() { .bufferForRetryLocked(0, , nil) }); != nil {
return nil,
}
if len(.binlogs) != 0 {
, := metadata.FromOutgoingContext()
:= &binarylog.ClientHeader{
OnClientSide: true,
Header: ,
MethodName: ,
Authority: .cc.authority,
}
if , := .Deadline(); {
.Timeout = time.Until()
if .Timeout < 0 {
.Timeout = 0
}
}
for , := range .binlogs {
.Log(.ctx, )
}
}
if != unaryStreamDesc {
go func() {
select {
case <-.ctx.Done():
.finish(ErrClientConnClosing)
case <-.Done():
.finish(toRPCErr(.Err()))
}
}()
}
return , nil
}
func ( *clientStream) ( bool) (*csAttempt, error) {
if := .ctx.Err(); != nil {
return nil, toRPCErr()
}
if := .cc.ctx.Err(); != nil {
return nil, ErrClientConnClosing
}
:= newContextWithRPCInfo(.ctx, .callInfo.failFast, .callInfo.codec, .compressorV0, .compressorV1)
:= .callHdr.Method
var time.Time
:= .cc.dopts.copts.StatsHandlers
for , := range {
= .TagRPC(, &stats.RPCTagInfo{FullMethodName: , FailFast: .callInfo.failFast, NameResolutionDelay: .nameResolutionDelay})
= time.Now()
:= &stats.Begin{
Client: true,
BeginTime: ,
FailFast: .callInfo.failFast,
IsClientStream: .desc.ClientStreams,
IsServerStream: .desc.ServerStreams,
IsTransparentRetryAttempt: ,
}
.HandleRPC(, )
}
var *traceInfo
if EnableTracing {
= &traceInfo{
tr: newTrace("grpc.Sent."+methodFamily(), ),
firstLine: firstLine{
client: true,
},
}
if , := .Deadline(); {
.firstLine.deadline = time.Until()
}
.tr.LazyLog(&.firstLine, false)
= newTraceContext(, .tr)
}
if .cc.parsedTarget.URL.Scheme == internal.GRPCResolverSchemeExtraMetadata {
= grpcutil.WithExtraMetadata(, metadata.Pairs(
"content-type", grpcutil.ContentType(.callHdr.ContentSubtype),
))
}
return &csAttempt{
ctx: ,
beginTime: ,
cs: ,
decompressorV0: .cc.dopts.dc,
statsHandlers: ,
trInfo: ,
}, nil
}
func ( *csAttempt) () error {
:= .cs
:= balancer.PickInfo{Ctx: .ctx, FullMethodName: .callHdr.Method}
, := .cc.pickerWrapper.pick(.ctx, .callInfo.failFast, )
.transport, .pickResult = .transport, .result
if != nil {
if , := .(dropError); {
= .error
.drop = true
}
return
}
if .trInfo != nil {
.trInfo.firstLine.SetRemoteAddr(.transport.RemoteAddr())
}
if .blocked {
for , := range .statsHandlers {
.HandleRPC(.ctx, &stats.DelayedPickComplete{})
}
}
return nil
}
func ( *csAttempt) () error {
:= .cs
.callHdr.PreviousAttempts = .numRetries
if .pickResult.Metadata != nil {
, := metadata.FromOutgoingContext(.ctx)
= metadata.Join(, .pickResult.Metadata)
.ctx = metadata.NewOutgoingContext(.ctx, )
}
, := .transport.NewStream(.ctx, .callHdr)
if != nil {
, := .(*transport.NewStreamError)
if ! {
return
}
if .AllowTransparentRetry {
.allowTransparentRetry = true
}
return toRPCErr(.Err)
}
.transportStream =
.ctx = .Context()
.parser = &parser{r: , bufferPool: .cs.cc.dopts.copts.BufferPool}
return nil
}
// clientStream implements a client side Stream.
type clientStream struct {
	callHdr  *transport.CallHdr
	opts     []CallOption
	callInfo *callInfo
	cc       *ClientConn
	desc     *StreamDesc

	codec baseCodec
	// compressorV0/compressorV1: at most one is set (DialOption vs.
	// registered encoding compressor).
	compressorV0 Compressor
	compressorV1 encoding.Compressor

	cancel context.CancelFunc // cancels all attempts

	sentLast bool // sent an end stream

	methodConfig *MethodConfig

	ctx context.Context // the application's context, wrapped by stats/tracing

	retryThrottler *retryThrottler // The throttler active when the RPC began.

	binlogs []binarylog.MethodLogger
	// serverHeaderBinlogged is a boolean for whether server header has been
	// logged. Server header will be logged when the first time one of those
	// happens: stream.Header(), stream.Recv().
	serverHeaderBinlogged bool

	// mu guards the mutable state below; see usage in withRetry/finish.
	mu                      sync.Mutex
	firstAttempt            bool // if true, transparent retry is valid
	numRetries              int  // exclusive of transparent retry attempt(s)
	numRetriesSincePushback int  // retries since pushback; to reset backoff
	finished                bool // TODO: replace with atomic cmpxchg or sync.Once?
	// attempt is the active client stream attempt.
	// The only place where it is written is the newAttemptLocked method and this method never writes nil.
	// So, attempt can be nil only inside newClientStream function when clientStream is first created.
	attempt *csAttempt
	// TODO(hedging): hedging will have multiple attempts simultaneously.
	committed        bool // active attempt committed for retry?
	onCommit         func()
	replayBuffer     []replayOp // operations to replay on retry
	replayBufferSize int        // current size of replayBuffer
	// nameResolutionDelay indicates if there was a delay in the name resolution.
	nameResolutionDelay bool
}

// replayOp represents an operation to replay on retry, plus an optional
// cleanup to run when the stream is committed and the buffer is released.
type replayOp struct {
	op      func(a *csAttempt) error
	cleanup func()
}

// csAttempt implements a single transport stream attempt within a
// clientStream.
type csAttempt struct {
	ctx             context.Context
	cs              *clientStream
	transport       transport.ClientTransport
	transportStream *transport.ClientStream
	parser          *parser
	pickResult      balancer.PickResult

	finished       bool
	decompressorV0 Decompressor
	decompressorV1 encoding.Compressor
	// decompressorSet records whether the decompressor fields have been
	// initialized from the received headers (done once per stream).
	decompressorSet bool

	mu sync.Mutex // guards trInfo.tr
	// trInfo may be nil (if EnableTracing is false).
	// trInfo.tr is set when created (if EnableTracing is true),
	// and cleared when the finish method is called.
	trInfo *traceInfo

	statsHandlers []stats.Handler
	beginTime     time.Time

	// set for newStream errors that may be transparently retried
	allowTransparentRetry bool
	// set for pick errors that are returned as a status
	drop bool
}
func ( *clientStream) () {
if !.committed && .onCommit != nil {
.onCommit()
}
.committed = true
for , := range .replayBuffer {
if .cleanup != nil {
.cleanup()
}
}
.replayBuffer = nil
}
func ( *clientStream) () {
.mu.Lock()
.commitAttemptLocked()
.mu.Unlock()
}
func ( *csAttempt) ( error) (bool, error) {
:= .cs
if .finished || .committed || .drop {
return false,
}
if .transportStream == nil && .allowTransparentRetry {
return true, nil
}
:= false
if .transportStream != nil {
<-.transportStream.Done()
= .transportStream.Unprocessed()
}
if .firstAttempt && {
return true, nil
}
if .cc.dopts.disableRetry {
return false,
}
:= 0
:= false
if .transportStream != nil {
if !.transportStream.TrailersOnly() {
return false,
}
:= .transportStream.Trailer()["grpc-retry-pushback-ms"]
if len() == 1 {
var error
if , = strconv.Atoi([0]); != nil || < 0 {
channelz.Infof(logger, .cc.channelz, "Server retry pushback specified to abort (%q).", [0])
.retryThrottler.throttle()
return false,
}
= true
} else if len() > 1 {
channelz.Warningf(logger, .cc.channelz, "Server retry pushback specified multiple values (%q); not retrying.", )
.retryThrottler.throttle()
return false,
}
}
var codes.Code
if .transportStream != nil {
= .transportStream.Status().Code()
} else {
= status.Code()
}
:= .methodConfig.RetryPolicy
if == nil || !.RetryableStatusCodes[] {
return false,
}
if .retryThrottler.throttle() {
return false,
}
if .numRetries+1 >= .MaxAttempts {
return false,
}
var time.Duration
if {
= time.Millisecond * time.Duration()
.numRetriesSincePushback = 0
} else {
:= math.Pow(.BackoffMultiplier, float64(.numRetriesSincePushback))
:= min(float64(.InitialBackoff)*, float64(.MaxBackoff))
*= 0.8 + 0.4*rand.Float64()
= time.Duration(int64())
.numRetriesSincePushback++
}
:= time.NewTimer()
select {
case <-.C:
.numRetries++
return false, nil
case <-.ctx.Done():
.Stop()
return false, status.FromContextError(.ctx.Err()).Err()
}
}
func ( *clientStream) ( *csAttempt, error) error {
for {
.finish(toRPCErr())
, := .shouldRetry()
if != nil {
.commitAttemptLocked()
return
}
.firstAttempt = false
, = .newAttemptLocked()
if != nil {
return
}
if = .replayBufferLocked(); == nil {
return nil
}
}
}
func ( *clientStream) () context.Context {
.commitAttempt()
if .attempt.transportStream != nil {
return .attempt.transportStream.Context()
}
return .ctx
}
func ( *clientStream) ( func( *csAttempt) error, func()) error {
.mu.Lock()
for {
if .committed {
.mu.Unlock()
return toRPCErr((.attempt))
}
if len(.replayBuffer) == 0 {
var error
if .attempt, = .newAttemptLocked(false ); != nil {
.mu.Unlock()
.finish()
return
}
}
:= .attempt
.mu.Unlock()
:= ()
.mu.Lock()
if != .attempt {
continue
}
if == io.EOF {
<-.transportStream.Done()
}
if == nil || ( == io.EOF && .transportStream.Status().Code() == codes.OK) {
()
.mu.Unlock()
return
}
if := .retryLocked(, ); != nil {
.mu.Unlock()
return
}
}
}
func ( *clientStream) () (metadata.MD, error) {
var metadata.MD
:= .withRetry(func( *csAttempt) error {
var error
, = .transportStream.Header()
return toRPCErr()
}, .commitAttemptLocked)
if == nil && == nil {
= io.EOF
}
if != nil {
.finish()
return nil, nil
}
if len(.binlogs) != 0 && !.serverHeaderBinlogged && != nil {
:= &binarylog.ServerHeader{
OnClientSide: true,
Header: ,
PeerAddr: nil,
}
if , := peer.FromContext(.Context()); {
.PeerAddr = .Addr
}
.serverHeaderBinlogged = true
for , := range .binlogs {
.Log(.ctx, )
}
}
return , nil
}
func ( *clientStream) () metadata.MD {
.commitAttempt()
if .attempt.transportStream == nil {
return nil
}
return .attempt.transportStream.Trailer()
}
func ( *clientStream) ( *csAttempt) error {
for , := range .replayBuffer {
if := .op(); != nil {
return
}
}
return nil
}
func ( *clientStream) ( int, func( *csAttempt) error, func()) {
if .committed {
return
}
.replayBufferSize +=
if .replayBufferSize > .callInfo.maxRetryRPCBufferSize {
.commitAttemptLocked()
()
return
}
.replayBuffer = append(.replayBuffer, replayOp{op: , cleanup: })
}
func ( *clientStream) ( any) ( error) {
defer func() {
if != nil && != io.EOF {
.finish()
}
}()
if .sentLast {
return status.Errorf(codes.Internal, "SendMsg called after CloseSend")
}
if !.desc.ClientStreams {
.sentLast = true
}
, , , , := prepareMsg(, .codec, .compressorV0, .compressorV1, .cc.dopts.copts.BufferPool)
if != nil {
return
}
defer func() {
.Free()
if .isCompressed() {
.Free()
}
}()
:= .Len()
:= .Len()
if > *.callInfo.maxSendMessageSize {
return status.Errorf(codes.ResourceExhausted, "trying to send message larger than max (%d vs. %d)", , *.callInfo.maxSendMessageSize)
}
.Ref()
:= func( *csAttempt) error {
return .sendMsg(, , , , )
}
:= false
= .withRetry(, func() {
.bufferForRetryLocked(len()+, , .Free)
= true
})
if ! {
.Free()
}
if len(.binlogs) != 0 && == nil {
:= &binarylog.ClientMessage{
OnClientSide: true,
Message: .Materialize(),
}
for , := range .binlogs {
.Log(.ctx, )
}
}
return
}
func ( *clientStream) ( any) error {
if len(.binlogs) != 0 && !.serverHeaderBinlogged {
.Header()
}
var *payloadInfo
if len(.binlogs) != 0 {
= &payloadInfo{}
defer .free()
}
:= .withRetry(func( *csAttempt) error {
return .recvMsg(, )
}, .commitAttemptLocked)
if len(.binlogs) != 0 && == nil {
:= &binarylog.ServerMessage{
OnClientSide: true,
Message: .uncompressedBytes.Materialize(),
}
for , := range .binlogs {
.Log(.ctx, )
}
}
if != nil || !.desc.ServerStreams {
.finish()
}
return
}
func ( *clientStream) () error {
if .sentLast {
return nil
}
.sentLast = true
:= func( *csAttempt) error {
.transportStream.Write(nil, nil, &transport.WriteOptions{Last: true})
return nil
}
.withRetry(, func() { .bufferForRetryLocked(0, , nil) })
if len(.binlogs) != 0 {
:= &binarylog.ClientHalfClose{
OnClientSide: true,
}
for , := range .binlogs {
.Log(.ctx, )
}
}
return nil
}
func ( *clientStream) ( error) {
if == io.EOF {
= nil
}
.mu.Lock()
if .finished {
.mu.Unlock()
return
}
.finished = true
for , := range .callInfo.onFinish {
()
}
.commitAttemptLocked()
if .attempt != nil {
.attempt.finish()
if .attempt.transportStream != nil {
for , := range .opts {
.after(.callInfo, .attempt)
}
}
}
.mu.Unlock()
if len(.binlogs) != 0 {
switch {
case errContextCanceled, errContextDeadline, ErrClientConnClosing:
:= &binarylog.Cancel{
OnClientSide: true,
}
for , := range .binlogs {
.Log(.ctx, )
}
default:
:= &binarylog.ServerTrailer{
OnClientSide: true,
Trailer: .Trailer(),
Err: ,
}
if , := peer.FromContext(.Context()); {
.PeerAddr = .Addr
}
for , := range .binlogs {
.Log(.ctx, )
}
}
}
if == nil {
.retryThrottler.successfulRPC()
}
if channelz.IsOn() {
if != nil {
.cc.incrCallsFailed()
} else {
.cc.incrCallsSucceeded()
}
}
.cancel()
}
func ( *csAttempt) ( any, []byte, mem.BufferSlice, , int) error {
:= .cs
if .trInfo != nil {
.mu.Lock()
if .trInfo.tr != nil {
.trInfo.tr.LazyLog(&payload{sent: true, msg: }, true)
}
.mu.Unlock()
}
if := .transportStream.Write(, , &transport.WriteOptions{Last: !.desc.ClientStreams}); != nil {
if !.desc.ClientStreams {
return nil
}
return io.EOF
}
if len(.statsHandlers) != 0 {
for , := range .statsHandlers {
.HandleRPC(.ctx, outPayload(true, , , , time.Now()))
}
}
return nil
}
func ( *csAttempt) ( any, *payloadInfo) ( error) {
:= .cs
if len(.statsHandlers) != 0 && == nil {
= &payloadInfo{}
defer .free()
}
if !.decompressorSet {
if := .transportStream.RecvCompress(); != "" && != encoding.Identity {
if .decompressorV0 == nil || .decompressorV0.Type() != {
.decompressorV0 = nil
.decompressorV1 = encoding.GetCompressor()
}
} else {
.decompressorV0 = nil
}
.decompressorSet = true
}
if := recv(.parser, .codec, .transportStream, .decompressorV0, , *.callInfo.maxReceiveMessageSize, , .decompressorV1, false); != nil {
if == io.EOF {
if := .transportStream.Status().Err(); != nil {
return
}
return io.EOF
}
return toRPCErr()
}
if .trInfo != nil {
.mu.Lock()
if .trInfo.tr != nil {
.trInfo.tr.LazyLog(&payload{sent: false, msg: }, true)
}
.mu.Unlock()
}
for , := range .statsHandlers {
.HandleRPC(.ctx, &stats.InPayload{
Client: true,
RecvTime: time.Now(),
Payload: ,
WireLength: .compressedLength + headerLen,
CompressedLength: .compressedLength,
Length: .uncompressedBytes.Len(),
})
}
if .desc.ServerStreams {
return nil
}
if := recv(.parser, .codec, .transportStream, .decompressorV0, , *.callInfo.maxReceiveMessageSize, nil, .decompressorV1, false); == io.EOF {
return .transportStream.Status().Err()
} else if != nil {
return toRPCErr()
}
return status.Errorf(codes.Internal, "cardinality violation: expected <EOF> for non server-streaming RPCs, but received another message")
}
func ( *csAttempt) ( error) {
.mu.Lock()
if .finished {
.mu.Unlock()
return
}
.finished = true
if == io.EOF {
= nil
}
var metadata.MD
if .transportStream != nil {
.transportStream.Close()
= .transportStream.Trailer()
}
if .pickResult.Done != nil {
:= false
if .transportStream != nil {
= .transportStream.BytesReceived()
}
.pickResult.Done(balancer.DoneInfo{
Err: ,
Trailer: ,
BytesSent: .transportStream != nil,
BytesReceived: ,
ServerLoad: balancerload.Parse(),
})
}
for , := range .statsHandlers {
:= &stats.End{
Client: true,
BeginTime: .beginTime,
EndTime: time.Now(),
Trailer: ,
Error: ,
}
.HandleRPC(.ctx, )
}
if .trInfo != nil && .trInfo.tr != nil {
if == nil {
.trInfo.tr.LazyPrintf("RPC: [OK]")
} else {
.trInfo.tr.LazyPrintf("RPC: [%v]", )
.trInfo.tr.SetError()
}
.trInfo.tr.Finish()
.trInfo.tr = nil
}
.mu.Unlock()
}
func ( context.Context, *StreamDesc, string, transport.ClientTransport, *addrConn, ...CallOption) ( ClientStream, error) {
if == nil {
return nil, errors.New("transport provided is nil")
}
:= &callInfo{}
, := context.WithCancel()
defer func() {
if != nil {
()
}
}()
for , := range {
if := .before(); != nil {
return nil, toRPCErr()
}
}
.maxReceiveMessageSize = getMaxSize(nil, .maxReceiveMessageSize, defaultClientMaxReceiveMessageSize)
.maxSendMessageSize = getMaxSize(nil, .maxSendMessageSize, defaultServerMaxSendMessageSize)
if := setCallInfoCodec(); != nil {
return nil,
}
:= &transport.CallHdr{
Host: .cc.authority,
Method: ,
ContentSubtype: .contentSubtype,
}
var Compressor
var encoding.Compressor
if := .compressorName; != "" {
.SendCompress =
if != encoding.Identity {
= encoding.GetCompressor()
if == nil {
return nil, status.Errorf(codes.Internal, "grpc: Compressor is not installed for requested grpc-encoding %q", )
}
}
} else if .cc.dopts.compressorV0 != nil {
.SendCompress = .cc.dopts.compressorV0.Type()
= .cc.dopts.compressorV0
}
if .creds != nil {
.Creds = .creds
}
:= &addrConnStream{
callHdr: ,
ac: ,
ctx: ,
cancel: ,
opts: ,
callInfo: ,
desc: ,
codec: .codec,
sendCompressorV0: ,
sendCompressorV1: ,
transport: ,
}
, := .transport.NewStream(.ctx, .callHdr)
if != nil {
= toRPCErr()
return nil,
}
.transportStream =
.parser = &parser{r: , bufferPool: .dopts.copts.BufferPool}
.incrCallsStarted()
if != unaryStreamDesc {
go func() {
.mu.Lock()
:= .ctx
.mu.Unlock()
select {
case <-.Done():
.finish(status.Error(codes.Canceled, "grpc: the SubConn is closing"))
case <-.Done():
.finish(toRPCErr(.Err()))
}
}()
}
return , nil
}
// addrConnStream implements ClientStream for RPCs created by
// newNonRetryClientStream: a single-attempt stream bound to one addrConn,
// with no retry or stats/tracing support.
type addrConnStream struct {
	transportStream *transport.ClientStream
	ac              *addrConn
	callHdr         *transport.CallHdr
	cancel          context.CancelFunc
	opts            []CallOption
	callInfo        *callInfo
	transport       transport.ClientTransport
	ctx             context.Context
	sentLast        bool // half-close (or unary send) already performed
	desc            *StreamDesc
	codec           baseCodec
	// sendCompressorV0/V1: at most one is set, mirroring clientStream.
	sendCompressorV0 Compressor
	sendCompressorV1 encoding.Compressor
	// decompressorSet records whether the decompressor fields were
	// initialized from the received headers (done once per stream).
	decompressorSet bool
	decompressorV0  Decompressor
	decompressorV1  encoding.Compressor
	parser          *parser
	// mu guards finished (and the finish path).
	mu       sync.Mutex
	finished bool
}
func ( *addrConnStream) () (metadata.MD, error) {
, := .transportStream.Header()
if != nil {
.finish(toRPCErr())
}
return ,
}
func ( *addrConnStream) () metadata.MD {
return .transportStream.Trailer()
}
func ( *addrConnStream) () error {
if .sentLast {
return nil
}
.sentLast = true
.transportStream.Write(nil, nil, &transport.WriteOptions{Last: true})
return nil
}
func ( *addrConnStream) () context.Context {
return .transportStream.Context()
}
func ( *addrConnStream) ( any) ( error) {
defer func() {
if != nil && != io.EOF {
.finish()
}
}()
if .sentLast {
return status.Errorf(codes.Internal, "SendMsg called after CloseSend")
}
if !.desc.ClientStreams {
.sentLast = true
}
, , , , := prepareMsg(, .codec, .sendCompressorV0, .sendCompressorV1, .ac.dopts.copts.BufferPool)
if != nil {
return
}
defer func() {
.Free()
if .isCompressed() {
.Free()
}
}()
if .Len() > *.callInfo.maxSendMessageSize {
return status.Errorf(codes.ResourceExhausted, "trying to send message larger than max (%d vs. %d)", .Len(), *.callInfo.maxSendMessageSize)
}
if := .transportStream.Write(, , &transport.WriteOptions{Last: !.desc.ClientStreams}); != nil {
if !.desc.ClientStreams {
return nil
}
return io.EOF
}
return nil
}
func ( *addrConnStream) ( any) ( error) {
defer func() {
if != nil || !.desc.ServerStreams {
.finish()
}
}()
if !.decompressorSet {
if := .transportStream.RecvCompress(); != "" && != encoding.Identity {
if .decompressorV0 == nil || .decompressorV0.Type() != {
.decompressorV0 = nil
.decompressorV1 = encoding.GetCompressor()
}
} else {
.decompressorV0 = nil
}
.decompressorSet = true
}
if := recv(.parser, .codec, .transportStream, .decompressorV0, , *.callInfo.maxReceiveMessageSize, nil, .decompressorV1, false); != nil {
if == io.EOF {
if := .transportStream.Status().Err(); != nil {
return
}
return io.EOF
}
return toRPCErr()
}
if .desc.ServerStreams {
return nil
}
if := recv(.parser, .codec, .transportStream, .decompressorV0, , *.callInfo.maxReceiveMessageSize, nil, .decompressorV1, false); == io.EOF {
return .transportStream.Status().Err()
} else if != nil {
return toRPCErr()
}
return status.Errorf(codes.Internal, "cardinality violation: expected <EOF> for non server-streaming RPCs, but received another message")
}
func ( *addrConnStream) ( error) {
.mu.Lock()
if .finished {
.mu.Unlock()
return
}
.finished = true
if == io.EOF {
= nil
}
if .transportStream != nil {
.transportStream.Close()
}
if != nil {
.ac.incrCallsFailed()
} else {
.ac.incrCallsSucceeded()
}
.cancel()
.mu.Unlock()
}
// ServerStream defines the server-side behavior of a streaming RPC.
type ServerStream interface {
	// SetHeader sets the header metadata. It may be called multiple times.
	// It will not be sent to the client until SendHeader or the first
	// SendMsg is called.
	SetHeader(metadata.MD) error
	// SendHeader sends the header metadata; it may be called at most once.
	SendHeader(metadata.MD) error
	// SetTrailer sets the trailer metadata which will be sent with the RPC
	// status.  May be called multiple times.
	SetTrailer(metadata.MD)
	// Context returns the context for this stream.
	Context() context.Context
	// SendMsg sends a message on the stream.
	SendMsg(m any) error
	// RecvMsg blocks until it receives a message into m or the stream is done.
	RecvMsg(m any) error
}

// serverStream implements a server side Stream.
type serverStream struct {
	ctx   context.Context
	s     *transport.ServerStream
	p     *parser
	codec baseCodec
	desc  *StreamDesc

	// compressorV0/V1 and decompressorV0/V1: at most one of each pair is set.
	compressorV0   Compressor
	compressorV1   encoding.Compressor
	decompressorV0 Decompressor
	decompressorV1 encoding.Compressor

	// sendCompressorName tracks the compressor last configured for outbound
	// messages; SendMsg re-resolves it when the handler changes it.
	sendCompressorName string

	// recvFirstMsg tracks whether at least one request message has been
	// received (used for cardinality checks).
	recvFirstMsg bool

	maxReceiveMessageSize int
	maxSendMessageSize    int
	trInfo                *traceInfo

	statsHandler []stats.Handler

	binlogs []binarylog.MethodLogger
	// serverHeaderBinlogged indicates whether the header has been logged; it
	// happens on the first of SendHeader or SendMsg.
	serverHeaderBinlogged bool

	// mu protects trInfo.tr after the service handler runs.
	mu sync.Mutex
}
func ( *serverStream) () context.Context {
return .ctx
}
func ( *serverStream) ( metadata.MD) error {
if .Len() == 0 {
return nil
}
:= imetadata.Validate()
if != nil {
return status.Error(codes.Internal, .Error())
}
return .s.SetHeader()
}
func ( *serverStream) ( metadata.MD) error {
:= imetadata.Validate()
if != nil {
return status.Error(codes.Internal, .Error())
}
= .s.SendHeader()
if len(.binlogs) != 0 && !.serverHeaderBinlogged {
, := .s.Header()
:= &binarylog.ServerHeader{
Header: ,
}
.serverHeaderBinlogged = true
for , := range .binlogs {
.Log(.ctx, )
}
}
return
}
func ( *serverStream) ( metadata.MD) {
if .Len() == 0 {
return
}
if := imetadata.Validate(); != nil {
logger.Errorf("stream: failed to validate md when setting trailer, err: %v", )
}
.s.SetTrailer()
}
func ( *serverStream) ( any) ( error) {
defer func() {
if .trInfo != nil {
.mu.Lock()
if .trInfo.tr != nil {
if == nil {
.trInfo.tr.LazyLog(&payload{sent: true, msg: }, true)
} else {
.trInfo.tr.LazyLog(&fmtStringer{"%v", []any{}}, true)
.trInfo.tr.SetError()
}
}
.mu.Unlock()
}
if != nil && != io.EOF {
, := status.FromError(toRPCErr())
.s.WriteStatus()
}
}()
if := .s.SendCompress(); != .sendCompressorName {
.compressorV1 = encoding.GetCompressor()
.sendCompressorName =
}
, , , , := prepareMsg(, .codec, .compressorV0, .compressorV1, .p.bufferPool)
if != nil {
return
}
defer func() {
.Free()
if .isCompressed() {
.Free()
}
}()
:= .Len()
:= .Len()
if > .maxSendMessageSize {
return status.Errorf(codes.ResourceExhausted, "trying to send message larger than max (%d vs. %d)", , .maxSendMessageSize)
}
if := .s.Write(, , &transport.WriteOptions{Last: false}); != nil {
return toRPCErr()
}
if len(.binlogs) != 0 {
if !.serverHeaderBinlogged {
, := .s.Header()
:= &binarylog.ServerHeader{
Header: ,
}
.serverHeaderBinlogged = true
for , := range .binlogs {
.Log(.ctx, )
}
}
:= &binarylog.ServerMessage{
Message: .Materialize(),
}
for , := range .binlogs {
.Log(.ctx, )
}
}
if len(.statsHandler) != 0 {
for , := range .statsHandler {
.HandleRPC(.s.Context(), outPayload(false, , , , time.Now()))
}
}
return nil
}
func ( *serverStream) ( any) ( error) {
defer func() {
if .trInfo != nil {
.mu.Lock()
if .trInfo.tr != nil {
if == nil {
.trInfo.tr.LazyLog(&payload{sent: false, msg: }, true)
} else if != io.EOF {
.trInfo.tr.LazyLog(&fmtStringer{"%v", []any{}}, true)
.trInfo.tr.SetError()
}
}
.mu.Unlock()
}
if != nil && != io.EOF {
, := status.FromError(toRPCErr())
.s.WriteStatus()
}
}()
var *payloadInfo
if len(.statsHandler) != 0 || len(.binlogs) != 0 {
= &payloadInfo{}
defer .free()
}
if := recv(.p, .codec, .s, .decompressorV0, , .maxReceiveMessageSize, , .decompressorV1, true); != nil {
if == io.EOF {
if len(.binlogs) != 0 {
:= &binarylog.ClientHalfClose{}
for , := range .binlogs {
.Log(.ctx, )
}
}
if !.desc.ClientStreams && !.recvFirstMsg {
return status.Error(codes.Internal, "cardinality violation: received no request message from non-client-streaming RPC")
}
return
}
if == io.ErrUnexpectedEOF {
= status.Error(codes.Internal, io.ErrUnexpectedEOF.Error())
}
return toRPCErr()
}
.recvFirstMsg = true
if len(.statsHandler) != 0 {
for , := range .statsHandler {
.HandleRPC(.s.Context(), &stats.InPayload{
RecvTime: time.Now(),
Payload: ,
Length: .uncompressedBytes.Len(),
WireLength: .compressedLength + headerLen,
CompressedLength: .compressedLength,
})
}
}
if len(.binlogs) != 0 {
:= &binarylog.ClientMessage{
Message: .uncompressedBytes.Materialize(),
}
for , := range .binlogs {
.Log(.ctx, )
}
}
if .desc.ClientStreams {
return nil
}
if := recv(.p, .codec, .s, .decompressorV0, , .maxReceiveMessageSize, nil, .decompressorV1, true); == io.EOF {
return nil
} else if != nil {
return
}
return status.Error(codes.Internal, "cardinality violation: received multiple request messages for non-client-streaming RPC")
}
func ( ServerStream) (string, bool) {
return Method(.Context())
}
func ( any, baseCodec, Compressor, encoding.Compressor, mem.BufferPool) ( []byte, , mem.BufferSlice, payloadFormat, error) {
if , := .(*PreparedMsg); {
return .hdr, .encodedData, .payload, .pf, nil
}
, = encode(, )
if != nil {
return nil, nil, nil, 0,
}
, , := compress(, , , )
if != nil {
.Free()
return nil, nil, nil, 0,
}
, = msgHeader(, , )
return , , , , nil
}