package grpc
import (
	"context"
	"errors"
	"io"
	"math"
	"strconv"
	"sync"
	"time"

	"golang.org/x/net/trace"
	"google.golang.org/grpc/balancer"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/encoding"
	"google.golang.org/grpc/internal/balancerload"
	"google.golang.org/grpc/internal/binarylog"
	"google.golang.org/grpc/internal/channelz"
	"google.golang.org/grpc/internal/grpcrand"
	"google.golang.org/grpc/internal/grpcutil"
	imetadata "google.golang.org/grpc/internal/metadata"
	iresolver "google.golang.org/grpc/internal/resolver"
	"google.golang.org/grpc/internal/serviceconfig"
	istatus "google.golang.org/grpc/internal/status"
	"google.golang.org/grpc/internal/transport"
	"google.golang.org/grpc/metadata"
	"google.golang.org/grpc/peer"
	"google.golang.org/grpc/stats"
	"google.golang.org/grpc/status"
)
// StreamHandler defines the handler called by gRPC server to complete the
// execution of a streaming RPC.
type StreamHandler func(srv interface{}, stream ServerStream) error

// StreamDesc represents a streaming RPC service's method specification.
type StreamDesc struct {
	// StreamName and Handler are only used when registering handlers on a server.
	StreamName string        // the name of the method excluding the service
	Handler    StreamHandler // the handler called for the method

	// At least one of ServerStreams and ClientStreams must be true.
	ServerStreams bool // indicates the server can perform streaming sends
	ClientStreams bool // indicates the client can perform streaming sends
}
// Stream defines the common interface a client or server stream has to satisfy.
//
// Deprecated: see ClientStream and ServerStream documentation instead.
type Stream interface {
	Context() context.Context
	SendMsg(m interface{}) error
	RecvMsg(m interface{}) error
}

// ClientStream defines the client-side behavior of a streaming RPC.  Errors
// returned from its methods are compatible with the status package.
type ClientStream interface {
	// Header returns the header metadata received from the server, if any.
	Header() (metadata.MD, error)
	// Trailer returns the trailer metadata from the server, if any.  It must
	// only be called after RecvMsg has returned a non-nil error (including io.EOF).
	Trailer() metadata.MD
	// CloseSend closes the send direction of the stream.
	CloseSend() error
	// Context returns the context for this stream.
	Context() context.Context
	// SendMsg sends a message on the stream.
	SendMsg(m interface{}) error
	// RecvMsg blocks until it receives a message into m or the stream is done.
	RecvMsg(m interface{}) error
}
// NewStream creates a new Stream for the client side. This is typically
// called by generated code. ctx is used for the lifetime of the stream.
func (cc *ClientConn) NewStream(ctx context.Context, desc *StreamDesc, method string, opts ...CallOption) (ClientStream, error) {
	// Allow the interceptor to see all applicable call options, i.e. those
	// configured as defaults from dial options as well as per-call options.
	opts = combine(cc.dopts.callOptions, opts)

	if cc.dopts.streamInt != nil {
		return cc.dopts.streamInt(ctx, desc, cc, method, newClientStream, opts...)
	}
	return newClientStream(ctx, desc, cc, method, opts...)
}
// NewClientStream is a wrapper for ClientConn.NewStream.
func NewClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption) (ClientStream, error) {
	return cc.NewStream(ctx, desc, method, opts...)
}
func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption) (_ ClientStream, err error) {
	if md, _, ok := metadata.FromOutgoingContextRaw(ctx); ok {
		if err := imetadata.Validate(md); err != nil {
			return nil, status.Error(codes.Internal, err.Error())
		}
	}
	if channelz.IsOn() {
		cc.incrCallsStarted()
		defer func() {
			if err != nil {
				cc.incrCallsFailed()
			}
		}()
	}
	// Provide an opportunity for the first RPC to see the first service
	// config provided by the resolver.
	if err := cc.waitForResolvedAddrs(ctx); err != nil {
		return nil, err
	}

	var mc serviceconfig.MethodConfig
	var onCommit func()
	var newStream = func(ctx context.Context, done func()) (iresolver.ClientStream, error) {
		return newClientStreamWithParams(ctx, desc, cc, method, mc, onCommit, done, opts...)
	}

	rpcInfo := iresolver.RPCInfo{Context: ctx, Method: method}
	rpcConfig, err := cc.safeConfigSelector.SelectConfig(rpcInfo)
	if err != nil {
		if st, ok := status.FromError(err); ok {
			// Restrict the code to the list allowed by gRFC A54.
			if istatus.IsRestrictedControlPlaneCode(st) {
				err = status.Errorf(codes.Internal, "config selector returned illegal status: %v", err)
			}
			return nil, err
		}
		return nil, toRPCErr(err)
	}

	if rpcConfig != nil {
		if rpcConfig.Context != nil {
			ctx = rpcConfig.Context
		}
		mc = rpcConfig.MethodConfig
		onCommit = rpcConfig.OnCommitted
		if rpcConfig.Interceptor != nil {
			rpcInfo.Context = nil
			ns := newStream
			newStream = func(ctx context.Context, done func()) (iresolver.ClientStream, error) {
				cs, err := rpcConfig.Interceptor.NewStream(ctx, rpcInfo, done, ns)
				if err != nil {
					return nil, toRPCErr(err)
				}
				return cs, nil
			}
		}
	}

	return newStream(ctx, func() {})
}
func newClientStreamWithParams(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, mc serviceconfig.MethodConfig, onCommit, doneFunc func(), opts ...CallOption) (_ iresolver.ClientStream, err error) {
	c := defaultCallInfo()
	if mc.WaitForReady != nil {
		c.failFast = !*mc.WaitForReady
	}

	// The cancel function for the child context we create will only be called
	// when RecvMsg returns a non-nil error, if the ClientConn is closed, or
	// if an error is generated by SendMsg.
	// https://github.com/grpc/grpc-go/issues/1818.
	var cancel context.CancelFunc
	if mc.Timeout != nil && *mc.Timeout >= 0 {
		ctx, cancel = context.WithTimeout(ctx, *mc.Timeout)
	} else {
		ctx, cancel = context.WithCancel(ctx)
	}
	defer func() {
		if err != nil {
			cancel()
		}
	}()

	for _, o := range opts {
		if err := o.before(c); err != nil {
			return nil, toRPCErr(err)
		}
	}
	c.maxSendMessageSize = getMaxSize(mc.MaxReqSize, c.maxSendMessageSize, defaultClientMaxSendMessageSize)
	c.maxReceiveMessageSize = getMaxSize(mc.MaxRespSize, c.maxReceiveMessageSize, defaultClientMaxReceiveMessageSize)
	if err := setCallInfoCodec(c); err != nil {
		return nil, err
	}

	callHdr := &transport.CallHdr{
		Host:           cc.authority,
		Method:         method,
		ContentSubtype: c.contentSubtype,
		DoneFunc:       doneFunc,
	}

	// Set our outgoing compression according to the UseCompressor CallOption,
	// if set.  In that case, also find the compressor from the encoding
	// package.  Otherwise, use the compressor configured by the
	// WithCompressor DialOption, if set.
	var cp Compressor
	var comp encoding.Compressor
	if ct := c.compressorType; ct != "" {
		callHdr.SendCompress = ct
		if ct != encoding.Identity {
			comp = encoding.GetCompressor(ct)
			if comp == nil {
				return nil, status.Errorf(codes.Internal, "grpc: Compressor is not installed for requested grpc-encoding %q", ct)
			}
		}
	} else if cc.dopts.cp != nil {
		callHdr.SendCompress = cc.dopts.cp.Type()
		cp = cc.dopts.cp
	}
	if c.creds != nil {
		callHdr.Creds = c.creds
	}

	cs := &clientStream{
		callHdr:      callHdr,
		ctx:          ctx,
		methodConfig: &mc,
		opts:         opts,
		callInfo:     c,
		cc:           cc,
		desc:         desc,
		codec:        c.codec,
		cp:           cp,
		comp:         comp,
		cancel:       cancel,
		firstAttempt: true,
		onCommit:     onCommit,
	}
	if !cc.dopts.disableRetry {
		cs.retryThrottler = cc.retryThrottler.Load().(*retryThrottler)
	}
	if ml := binarylog.GetMethodLogger(method); ml != nil {
		cs.binlogs = append(cs.binlogs, ml)
	}
	if cc.dopts.binaryLogger != nil {
		if ml := cc.dopts.binaryLogger.GetMethodLogger(method); ml != nil {
			cs.binlogs = append(cs.binlogs, ml)
		}
	}

	// Pick the transport to use and create a new stream on the transport.
	// Assign cs.attempt upon success.
	op := func(a *csAttempt) error {
		if err := a.getTransport(); err != nil {
			return err
		}
		if err := a.newStream(); err != nil {
			return err
		}
		// Because this operation is always called either here (while creating
		// the clientStream) or by the retry code while locked when replaying
		// the operation, it is safe to access cs.attempt directly.
		cs.attempt = a
		return nil
	}
	if err := cs.withRetry(op, func() { cs.bufferForRetryLocked(0, op) }); err != nil {
		return nil, err
	}

	if len(cs.binlogs) != 0 {
		md, _ := metadata.FromOutgoingContext(ctx)
		logEntry := &binarylog.ClientHeader{
			OnClientSide: true,
			Header:       md,
			MethodName:   method,
			Authority:    cs.cc.authority,
		}
		if deadline, ok := ctx.Deadline(); ok {
			logEntry.Timeout = time.Until(deadline)
			if logEntry.Timeout < 0 {
				logEntry.Timeout = 0
			}
		}
		for _, binlog := range cs.binlogs {
			binlog.Log(logEntry)
		}
	}

	if desc != unaryStreamDesc {
		// Listen on cc and stream contexts to clean up when the user closes
		// the ClientConn or cancels the stream context.  In all other cases,
		// an error should already be injected into the recv buffer by the
		// transport, which the client will eventually receive, and then we
		// will cancel the stream's context in clientStream.finish.
		go func() {
			select {
			case <-cc.ctx.Done():
				cs.finish(ErrClientConnClosing)
			case <-ctx.Done():
				cs.finish(toRPCErr(ctx.Err()))
			}
		}()
	}
	return cs, nil
}
// newAttemptLocked creates a new csAttempt without a transport or stream.
func (cs *clientStream) newAttemptLocked(isTransparent bool) (*csAttempt, error) {
	if err := cs.ctx.Err(); err != nil {
		return nil, toRPCErr(err)
	}
	if err := cs.cc.ctx.Err(); err != nil {
		return nil, ErrClientConnClosing
	}

	ctx := newContextWithRPCInfo(cs.ctx, cs.callInfo.failFast, cs.callInfo.codec, cs.cp, cs.comp)
	method := cs.callHdr.Method
	var beginTime time.Time
	shs := cs.cc.dopts.copts.StatsHandlers
	for _, sh := range shs {
		ctx = sh.TagRPC(ctx, &stats.RPCTagInfo{FullMethodName: method, FailFast: cs.callInfo.failFast})
		beginTime = time.Now()
		begin := &stats.Begin{
			Client:                    true,
			BeginTime:                 beginTime,
			FailFast:                  cs.callInfo.failFast,
			IsClientStream:            cs.desc.ClientStreams,
			IsServerStream:            cs.desc.ServerStreams,
			IsTransparentRetryAttempt: isTransparent,
		}
		sh.HandleRPC(ctx, begin)
	}

	var trInfo *traceInfo
	if EnableTracing {
		trInfo = &traceInfo{
			tr: trace.New("grpc.Sent."+methodFamily(method), method),
			firstLine: firstLine{
				client: true,
			},
		}
		if deadline, ok := ctx.Deadline(); ok {
			trInfo.firstLine.deadline = time.Until(deadline)
		}
		trInfo.tr.LazyLog(&trInfo.firstLine, false)
		ctx = trace.NewContext(ctx, trInfo.tr)
	}

	if cs.cc.parsedTarget.URL.Scheme == "xds" {
		// Add extra metadata (metadata that will be added by transport) to
		// the context so the balancer can see it.
		ctx = grpcutil.WithExtraMetadata(ctx, metadata.Pairs(
			"content-type", grpcutil.ContentType(cs.callHdr.ContentSubtype),
		))
	}

	return &csAttempt{
		ctx:           ctx,
		beginTime:     beginTime,
		cs:            cs,
		dc:            cs.cc.dopts.dc,
		statsHandlers: shs,
		trInfo:        trInfo,
	}, nil
}
// getTransport picks a transport for this attempt via the ClientConn's picker.
func (a *csAttempt) getTransport() error {
	cs := a.cs

	var err error
	a.t, a.pickResult, err = cs.cc.getTransport(a.ctx, cs.callInfo.failFast, cs.callHdr.Method)
	if err != nil {
		if de, ok := err.(dropError); ok {
			err = de.error
			a.drop = true
		}
		return err
	}
	if a.trInfo != nil {
		a.trInfo.firstLine.SetRemoteAddr(a.t.RemoteAddr())
	}
	return nil
}
// newStream creates a transport stream for this attempt.
func (a *csAttempt) newStream() error {
	cs := a.cs
	cs.callHdr.PreviousAttempts = cs.numRetries

	// Merge metadata stored in PickResult, if any, with existing call
	// metadata.  It is safe to overwrite the csAttempt's context here, since
	// all state maintained in it is local to the attempt.
	if a.pickResult.Metatada != nil {
		md, _ := metadata.FromOutgoingContext(a.ctx)
		md = metadata.Join(md, a.pickResult.Metatada)
		a.ctx = metadata.NewOutgoingContext(a.ctx, md)
	}

	s, err := a.t.NewStream(a.ctx, cs.callHdr)
	if err != nil {
		nse, ok := err.(*transport.NewStreamError)
		if !ok {
			// Unexpected.
			return err
		}
		if nse.AllowTransparentRetry {
			a.allowTransparentRetry = true
		}
		// Unwrap and convert error.
		return toRPCErr(nse.Err)
	}
	a.s = s
	a.p = &parser{r: s}
	return nil
}
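// clientStream implements a client side Stream.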
type clientStream struct {
callHdr *transport.CallHdr
opts []CallOption
callInfo *callInfo
cc *ClientConn
desc *StreamDesc
codec baseCodec
cp Compressor
comp encoding.Compressor
cancel context.CancelFunc
sentLast bool
methodConfig *MethodConfig
ctx context.Context
retryThrottler *retryThrottler
binlogs []binarylog.MethodLogger
serverHeaderBinlogged bool
mu sync.Mutex
firstAttempt bool
numRetries int
numRetriesSincePushback int
finished bool
attempt *csAttempt
committed bool
onCommit func()
buffer []func(a *csAttempt) error
bufferSize int
}
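// csAttempt implements a single transport stream attempt within a
// clientStream.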
type csAttempt struct {
ctx context.Context
cs *clientStream
t transport.ClientTransport
s *transport.Stream
p *parser
pickResult balancer.PickResult
finished bool
dc Decompressor
decomp encoding.Compressor
decompSet bool
mu sync.Mutex
trInfo *traceInfo
statsHandlers []stats.Handler
beginTime time.Time
allowTransparentRetry bool
drop bool
}
func (cs *clientStream) commitAttemptLocked() {
	if !cs.committed && cs.onCommit != nil {
		cs.onCommit()
	}
	cs.committed = true
	cs.buffer = nil
}

func (cs *clientStream) commitAttempt() {
	cs.mu.Lock()
	cs.commitAttemptLocked()
	cs.mu.Unlock()
}
// shouldRetry returns nil if the RPC should be retried; otherwise it returns
// the error that should be returned by the operation.  If the RPC should be
// retried, the bool indicates whether it is being retried transparently.
func (a *csAttempt) shouldRetry(err error) (bool, error) {
	cs := a.cs

	if cs.finished || cs.committed || a.drop {
		// RPC is finished or committed or was dropped by the picker; cannot retry.
		return false, err
	}
	if a.s == nil && a.allowTransparentRetry {
		return true, nil
	}
	// Wait for the trailers.
	unprocessed := false
	if a.s != nil {
		<-a.s.Done()
		unprocessed = a.s.Unprocessed()
	}
	if cs.firstAttempt && unprocessed {
		// First attempt, stream unprocessed: transparently retry.
		return true, nil
	}
	if cs.cc.dopts.disableRetry {
		return false, err
	}

	pushback := 0
	hasPushback := false
	if a.s != nil {
		if !a.s.TrailersOnly() {
			return false, err
		}

		// TODO(retry): Move down if the spec changes to not check server pushback
		// before considering this a failure for throttling.
		sps := a.s.Trailer()["grpc-retry-pushback-ms"]
		if len(sps) == 1 {
			var e error
			if pushback, e = strconv.Atoi(sps[0]); e != nil || pushback < 0 {
				channelz.Infof(logger, cs.cc.channelzID, "Server retry pushback specified to abort (%q).", sps[0])
				cs.retryThrottler.throttle()
				return false, err
			}
			hasPushback = true
		} else if len(sps) > 1 {
			channelz.Warningf(logger, cs.cc.channelzID, "Server retry pushback specified multiple values (%q); not retrying.", sps)
			cs.retryThrottler.throttle()
			return false, err
		}
	}

	var code codes.Code
	if a.s != nil {
		code = a.s.Status().Code()
	} else {
		code = status.Code(err)
	}

	rp := cs.methodConfig.RetryPolicy
	if rp == nil || !rp.RetryableStatusCodes[code] {
		return false, err
	}

	// Note: the ordering here is important; we count this as a failure
	// only if the code matched a retryable code.
	if cs.retryThrottler.throttle() {
		return false, err
	}
	if cs.numRetries+1 >= rp.MaxAttempts {
		return false, err
	}

	var dur time.Duration
	if hasPushback {
		dur = time.Millisecond * time.Duration(pushback)
		cs.numRetriesSincePushback = 0
	} else {
		fact := math.Pow(rp.BackoffMultiplier, float64(cs.numRetriesSincePushback))
		cur := float64(rp.InitialBackoff) * fact
		if max := float64(rp.MaxBackoff); cur > max {
			cur = max
		}
		dur = time.Duration(grpcrand.Int63n(int64(cur)))
		cs.numRetriesSincePushback++
	}

	t := time.NewTimer(dur)
	select {
	case <-t.C:
		cs.numRetries++
		return false, nil
	case <-cs.ctx.Done():
		t.Stop()
		return false, status.FromContextError(cs.ctx.Err()).Err()
	}
}
// retryLocked returns nil if a retry was performed and succeeded; error otherwise.
func (cs *clientStream) retryLocked(attempt *csAttempt, lastErr error) error {
	for {
		attempt.finish(toRPCErr(lastErr))
		isTransparent, err := attempt.shouldRetry(lastErr)
		if err != nil {
			cs.commitAttemptLocked()
			return err
		}
		cs.firstAttempt = false
		attempt, err = cs.newAttemptLocked(isTransparent)
		if err != nil {
			// Only returns error if the clientconn is closed or the context
			// of the stream is canceled.
			return err
		}
		// Note that the first op in the replay buffer always sets cs.attempt
		// if it is able to pick a transport and create a stream.
		if lastErr = cs.replayBufferLocked(attempt); lastErr == nil {
			return nil
		}
	}
}
func (cs *clientStream) Context() context.Context {
	cs.commitAttempt()
	// No need to lock before using attempt, since we know it is committed and
	// cannot change.
	if cs.attempt.s != nil {
		return cs.attempt.s.Context()
	}
	return cs.ctx
}
// withRetry runs op against the current attempt, creating new attempts and
// retrying as allowed; onSuccess is called (while holding cs.mu) when op
// reports success so it can buffer the op for replay on later retries.
func (cs *clientStream) withRetry(op func(a *csAttempt) error, onSuccess func()) error {
	cs.mu.Lock()
	for {
		if cs.committed {
			cs.mu.Unlock()
			// toRPCErr is used in case the error from the attempt comes from
			// NewClientStream due to a buffer overflow.
			return toRPCErr(op(cs.attempt))
		}
		if len(cs.buffer) == 0 {
			// For the first op, which controls creation of the stream and
			// assigns cs.attempt, we need to create a new attempt inline
			// before executing the first op.  On subsequent ops, the attempt
			// is created immediately before replaying the ops.
			var err error
			if cs.attempt, err = cs.newAttemptLocked(false /* isTransparent */); err != nil {
				cs.mu.Unlock()
				cs.finish(err)
				return err
			}
		}
		a := cs.attempt
		cs.mu.Unlock()
		err := op(a)
		cs.mu.Lock()
		if a != cs.attempt {
			// We started another attempt already.
			continue
		}
		if err == io.EOF {
			<-a.s.Done()
		}
		if err == nil || (err == io.EOF && a.s.Status().Code() == codes.OK) {
			onSuccess()
			cs.mu.Unlock()
			return err
		}
		if err := cs.retryLocked(a, err); err != nil {
			cs.mu.Unlock()
			return err
		}
	}
}
func (cs *clientStream) Header() (metadata.MD, error) {
	var m metadata.MD
	noHeader := false
	err := cs.withRetry(func(a *csAttempt) error {
		var err error
		m, err = a.s.Header()
		if err == transport.ErrNoHeaders {
			noHeader = true
			return nil
		}
		return toRPCErr(err)
	}, cs.commitAttemptLocked)

	if err != nil {
		cs.finish(err)
		return nil, err
	}

	if len(cs.binlogs) != 0 && !cs.serverHeaderBinlogged && !noHeader {
		// Only log if binary log is on, the header has not been logged, and
		// there are actually headers to log.
		logEntry := &binarylog.ServerHeader{
			OnClientSide: true,
			Header:       m,
			PeerAddr:     nil,
		}
		if peer, ok := peer.FromContext(cs.Context()); ok {
			logEntry.PeerAddr = peer.Addr
		}
		cs.serverHeaderBinlogged = true
		for _, binlog := range cs.binlogs {
			binlog.Log(logEntry)
		}
	}
	return m, nil
}
func (cs *clientStream) Trailer() metadata.MD {
	// On RPC failure, we never need to retry, because usage requires that
	// RecvMsg() returned a non-nil error before calling this function is
	// valid.  Commit the attempt anyway, just in case users are not following
	// those directions -- it will prevent races and should not meaningfully
	// impact performance.
	cs.commitAttempt()
	if cs.attempt.s == nil {
		return nil
	}
	return cs.attempt.s.Trailer()
}
func (cs *clientStream) replayBufferLocked(attempt *csAttempt) error {
	for _, f := range cs.buffer {
		if err := f(attempt); err != nil {
			return err
		}
	}
	return nil
}
func (cs *clientStream) bufferForRetryLocked(sz int, op func(a *csAttempt) error) {
	// Buffer the op for replay on retry, but commit the RPC to this attempt
	// (and stop buffering) once the retry buffer limit is exceeded.
	if cs.committed {
		return
	}
	cs.bufferSize += sz
	if cs.bufferSize > cs.callInfo.maxRetryRPCBufferSize {
		cs.commitAttemptLocked()
		return
	}
	cs.buffer = append(cs.buffer, op)
}
func (cs *clientStream) SendMsg(m interface{}) (err error) {
	defer func() {
		if err != nil && err != io.EOF {
			// Call finish on the client stream for errors generated by this
			// SendMsg call, as these indicate problems created by this client.
			// (Transport errors are converted to an io.EOF error in
			// csAttempt.sendMsg; the real error will be returned from RecvMsg
			// eventually in that case, or be retried.)
			cs.finish(err)
		}
	}()
	if cs.sentLast {
		return status.Errorf(codes.Internal, "SendMsg called after CloseSend")
	}
	if !cs.desc.ClientStreams {
		cs.sentLast = true
	}

	// load hdr, payload, data
	hdr, payload, data, err := prepareMsg(m, cs.codec, cs.cp, cs.comp)
	if err != nil {
		return err
	}

	// TODO(dfawley): should we be checking len(data) instead?
	if len(payload) > *cs.callInfo.maxSendMessageSize {
		return status.Errorf(codes.ResourceExhausted, "trying to send message larger than max (%d vs. %d)", len(payload), *cs.callInfo.maxSendMessageSize)
	}
	op := func(a *csAttempt) error {
		return a.sendMsg(m, hdr, payload, data)
	}
	err = cs.withRetry(op, func() { cs.bufferForRetryLocked(len(hdr)+len(payload), op) })
	if len(cs.binlogs) != 0 && err == nil {
		cm := &binarylog.ClientMessage{
			OnClientSide: true,
			Message:      data,
		}
		for _, binlog := range cs.binlogs {
			binlog.Log(cm)
		}
	}
	return err
}
func (cs *clientStream) RecvMsg(m interface{}) error {
	if len(cs.binlogs) != 0 && !cs.serverHeaderBinlogged {
		// Call Header() to binary log header if it's not already logged.
		cs.Header()
	}
	var recvInfo *payloadInfo
	if len(cs.binlogs) != 0 {
		recvInfo = &payloadInfo{}
	}
	err := cs.withRetry(func(a *csAttempt) error {
		return a.recvMsg(m, recvInfo)
	}, cs.commitAttemptLocked)
	if len(cs.binlogs) != 0 && err == nil {
		sm := &binarylog.ServerMessage{
			OnClientSide: true,
			Message:      recvInfo.uncompressedBytes,
		}
		for _, binlog := range cs.binlogs {
			binlog.Log(sm)
		}
	}
	if err != nil || !cs.desc.ServerStreams {
		// err != nil or non-server-streaming indicates end of stream.
		cs.finish(err)

		if len(cs.binlogs) != 0 {
			// finish will not log Trailer.  Log Trailer here.
			logEntry := &binarylog.ServerTrailer{
				OnClientSide: true,
				Trailer:      cs.Trailer(),
				Err:          err,
			}
			if logEntry.Err == io.EOF {
				logEntry.Err = nil
			}
			if peer, ok := peer.FromContext(cs.Context()); ok {
				logEntry.PeerAddr = peer.Addr
			}
			for _, binlog := range cs.binlogs {
				binlog.Log(logEntry)
			}
		}
	}
	return err
}
func (cs *clientStream) CloseSend() error {
	if cs.sentLast {
		// TODO: return an error and finish the stream instead, due to API misuse?
		return nil
	}
	cs.sentLast = true
	op := func(a *csAttempt) error {
		a.t.Write(a.s, nil, nil, &transport.Options{Last: true})
		// Always return nil; io.EOF is the only error that might make sense
		// instead, but there is no need to signal the client to call RecvMsg
		// as the only use left for the stream after CloseSend is to call
		// RecvMsg.  This also matches historical behavior.
		return nil
	}
	cs.withRetry(op, func() { cs.bufferForRetryLocked(0, op) })
	if len(cs.binlogs) != 0 {
		chc := &binarylog.ClientHalfClose{
			OnClientSide: true,
		}
		for _, binlog := range cs.binlogs {
			binlog.Log(chc)
		}
	}
	// We never returned an error here for reasons.
	return nil
}
func (cs *clientStream) finish(err error) {
	if err == io.EOF {
		// Ending a stream with EOF indicates a success.
		err = nil
	}
	cs.mu.Lock()
	if cs.finished {
		cs.mu.Unlock()
		return
	}
	cs.finished = true
	cs.commitAttemptLocked()
	if cs.attempt != nil {
		cs.attempt.finish(err)
		// after functions all rely upon having a stream.
		if cs.attempt.s != nil {
			for _, o := range cs.opts {
				o.after(cs.callInfo, cs.attempt)
			}
		}
	}
	cs.mu.Unlock()
	// For binary logging, only log cancel in finish (could be caused by RPC
	// ctx canceled or ClientConn closed).  Trailer will be logged in RecvMsg.
	if len(cs.binlogs) != 0 && status.Code(err) == codes.Canceled {
		c := &binarylog.Cancel{
			OnClientSide: true,
		}
		for _, binlog := range cs.binlogs {
			binlog.Log(c)
		}
	}
	if err == nil {
		cs.retryThrottler.successfulRPC()
	}
	if channelz.IsOn() {
		if err != nil {
			cs.cc.incrCallsFailed()
		} else {
			cs.cc.incrCallsSucceeded()
		}
	}
	cs.cancel()
}
func (a *csAttempt) sendMsg(m interface{}, hdr, payld, data []byte) error {
	cs := a.cs
	if a.trInfo != nil {
		a.mu.Lock()
		if a.trInfo.tr != nil {
			a.trInfo.tr.LazyLog(&payload{sent: true, msg: m}, true)
		}
		a.mu.Unlock()
	}
	if err := a.t.Write(a.s, hdr, payld, &transport.Options{Last: !cs.desc.ClientStreams}); err != nil {
		if !cs.desc.ClientStreams {
			// For non-client-streaming RPCs, we return nil instead of EOF on
			// error because the generated code requires it.  finish is not
			// called; RecvMsg() will call it with the stream's status
			// independently.
			return nil
		}
		return io.EOF
	}
	for _, sh := range a.statsHandlers {
		sh.HandleRPC(a.ctx, outPayload(true, m, data, payld, time.Now()))
	}
	if channelz.IsOn() {
		a.t.IncrMsgSent()
	}
	return nil
}
func (a *csAttempt) recvMsg(m interface{}, payInfo *payloadInfo) (err error) {
	cs := a.cs
	if len(a.statsHandlers) != 0 && payInfo == nil {
		payInfo = &payloadInfo{}
	}

	if !a.decompSet {
		// Block until we receive headers containing received message encoding.
		if ct := a.s.RecvCompress(); ct != "" && ct != encoding.Identity {
			if a.dc == nil || a.dc.Type() != ct {
				// No configured decompressor, or it does not match the
				// incoming message encoding; attempt to find a registered
				// compressor that does.
				a.dc = nil
				a.decomp = encoding.GetCompressor(ct)
			}
		} else {
			// No compression is used; disable our decompressor.
			a.dc = nil
		}
		// Only initialize this state once per stream.
		a.decompSet = true
	}
	err = recv(a.p, cs.codec, a.s, a.dc, m, *cs.callInfo.maxReceiveMessageSize, payInfo, a.decomp)
	if err != nil {
		if err == io.EOF {
			if statusErr := a.s.Status().Err(); statusErr != nil {
				return statusErr
			}
			return io.EOF // indicates successful end of stream.
		}
		return toRPCErr(err)
	}
	if a.trInfo != nil {
		a.mu.Lock()
		if a.trInfo.tr != nil {
			a.trInfo.tr.LazyLog(&payload{sent: false, msg: m}, true)
		}
		a.mu.Unlock()
	}
	for _, sh := range a.statsHandlers {
		sh.HandleRPC(a.ctx, &stats.InPayload{
			Client:   true,
			RecvTime: time.Now(),
			Payload:  m,
			// TODO truncate large payload.
			Data:       payInfo.uncompressedBytes,
			WireLength: payInfo.wireLength + headerLen,
			Length:     len(payInfo.uncompressedBytes),
		})
	}
	if channelz.IsOn() {
		a.t.IncrMsgRecv()
	}
	if cs.desc.ServerStreams {
		// Subsequent messages should be received by subsequent RecvMsg calls.
		return nil
	}
	// Special handling for non-server-stream rpcs.
	// This recv expects EOF or errors, so we don't collect inPayload.
	err = recv(a.p, cs.codec, a.s, a.dc, m, *cs.callInfo.maxReceiveMessageSize, nil, a.decomp)
	if err == nil {
		return toRPCErr(errors.New("grpc: client streaming protocol violation: get <nil>, want <EOF>"))
	}
	if err == io.EOF {
		return a.s.Status().Err() // non-server streaming Recv returns nil on success
	}
	return toRPCErr(err)
}
func (a *csAttempt) finish(err error) {
	a.mu.Lock()
	if a.finished {
		a.mu.Unlock()
		return
	}
	a.finished = true
	if err == io.EOF {
		// Ending a stream with EOF indicates a success.
		err = nil
	}
	var tr metadata.MD
	if a.s != nil {
		a.t.CloseStream(a.s, err)
		tr = a.s.Trailer()
	}

	if a.pickResult.Done != nil {
		br := false
		if a.s != nil {
			br = a.s.BytesReceived()
		}
		a.pickResult.Done(balancer.DoneInfo{
			Err:           err,
			Trailer:       tr,
			BytesSent:     a.s != nil,
			BytesReceived: br,
			ServerLoad:    balancerload.Parse(tr),
		})
	}
	for _, sh := range a.statsHandlers {
		end := &stats.End{
			Client:    true,
			BeginTime: a.beginTime,
			EndTime:   time.Now(),
			Trailer:   tr,
			Error:     err,
		}
		sh.HandleRPC(a.ctx, end)
	}
	if a.trInfo != nil && a.trInfo.tr != nil {
		if err == nil {
			a.trInfo.tr.LazyPrintf("RPC: [OK]")
		} else {
			a.trInfo.tr.LazyPrintf("RPC: [%v]", err)
			a.trInfo.tr.SetError()
		}
		a.trInfo.tr.Finish()
		a.trInfo.tr = nil
	}
	a.mu.Unlock()
}
// newNonRetryClientStream creates a ClientStream with the specified transport,
// on the given addrConn.
//
// It's expected that the given transport is either the same one in addrConn,
// or is already closed. To avoid race, transport is specified separately,
// instead of using ac.transport.
//
// Main difference between this and ClientConn.NewStream:
// - no retry
// - no service config (or wait for service config)
// - no tracing or stats
func newNonRetryClientStream(ctx context.Context, desc *StreamDesc, method string, t transport.ClientTransport, ac *addrConn, opts ...CallOption) (_ ClientStream, err error) {
	if t == nil {
		// TODO: return RPC error here?
		return nil, errors.New("transport provided is nil")
	}
	// defaultCallInfo contains unnecessary info (i.e. failfast), so we just
	// initialize an empty struct.
	c := &callInfo{}

	// The cancel function for the child context we create will only be
	// called when RecvMsg returns a non-nil error, if the ClientConn is
	// closed, or if an error is generated by SendMsg.
	// https://github.com/grpc/grpc-go/issues/1818.
	ctx, cancel := context.WithCancel(ctx)
	defer func() {
		if err != nil {
			cancel()
		}
	}()

	for _, o := range opts {
		if err := o.before(c); err != nil {
			return nil, toRPCErr(err)
		}
	}
	c.maxReceiveMessageSize = getMaxSize(nil, c.maxReceiveMessageSize, defaultClientMaxReceiveMessageSize)
	c.maxSendMessageSize = getMaxSize(nil, c.maxSendMessageSize, defaultServerMaxSendMessageSize)
	if err := setCallInfoCodec(c); err != nil {
		return nil, err
	}

	callHdr := &transport.CallHdr{
		Host:           ac.cc.authority,
		Method:         method,
		ContentSubtype: c.contentSubtype,
	}

	// Set our outgoing compression according to the UseCompressor CallOption,
	// if set.  In that case, also find the compressor from the encoding
	// package.  Otherwise, use the compressor configured by the
	// WithCompressor DialOption, if set.
	var cp Compressor
	var comp encoding.Compressor
	if ct := c.compressorType; ct != "" {
		callHdr.SendCompress = ct
		if ct != encoding.Identity {
			comp = encoding.GetCompressor(ct)
			if comp == nil {
				return nil, status.Errorf(codes.Internal, "grpc: Compressor is not installed for requested grpc-encoding %q", ct)
			}
		}
	} else if ac.cc.dopts.cp != nil {
		callHdr.SendCompress = ac.cc.dopts.cp.Type()
		cp = ac.cc.dopts.cp
	}
	if c.creds != nil {
		callHdr.Creds = c.creds
	}

	// Use a special addrConnStream to avoid retry.
	as := &addrConnStream{
		callHdr:  callHdr,
		ac:       ac,
		ctx:      ctx,
		cancel:   cancel,
		opts:     opts,
		callInfo: c,
		desc:     desc,
		codec:    c.codec,
		cp:       cp,
		comp:     comp,
		t:        t,
	}

	s, err := as.t.NewStream(as.ctx, as.callHdr)
	if err != nil {
		err = toRPCErr(err)
		return nil, err
	}
	as.s = s
	as.p = &parser{r: s}
	ac.incrCallsStarted()
	if desc != unaryStreamDesc {
		// Listen on stream context to clean up when the stream context is
		// canceled.  Also listen for the addrConn's context in case the
		// addrConn is closed or reconnects to a different address.  In all
		// other cases, an error should already be injected into the recv
		// buffer by the transport, which the client will eventually receive,
		// and then we will cancel the stream's context in
		// addrConnStream.finish.
		go func() {
			select {
			case <-ac.ctx.Done():
				as.finish(status.Error(codes.Canceled, "grpc: the SubConn is closing"))
			case <-ctx.Done():
				as.finish(toRPCErr(ctx.Err()))
			}
		}()
	}
	return as, nil
}
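// addrConnStream implements ClientStream on a specific transport and addrConn,
// without retry support or service config; see newNonRetryClientStream.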
type addrConnStream struct {
s *transport.Stream
ac *addrConn
callHdr *transport.CallHdr
cancel context.CancelFunc
opts []CallOption
callInfo *callInfo
t transport.ClientTransport
ctx context.Context
sentLast bool
desc *StreamDesc
codec baseCodec
cp Compressor
comp encoding.Compressor
decompSet bool
dc Decompressor
decomp encoding.Compressor
p *parser
mu sync.Mutex
finished bool
}
func (as *addrConnStream) Header() (metadata.MD, error) {
	m, err := as.s.Header()
	if err != nil {
		as.finish(toRPCErr(err))
	}
	return m, err
}

func (as *addrConnStream) Trailer() metadata.MD {
	return as.s.Trailer()
}
func (as *addrConnStream) CloseSend() error {
	if as.sentLast {
		// TODO: return an error and finish the stream instead, due to API misuse?
		return nil
	}
	as.sentLast = true

	as.t.Write(as.s, nil, nil, &transport.Options{Last: true})
	// Always return nil; io.EOF is the only error that might make sense
	// instead, but there is no need to signal the client to call RecvMsg as
	// the only use left for the stream after CloseSend is to call RecvMsg.
	return nil
}

func (as *addrConnStream) Context() context.Context {
	return as.s.Context()
}
func (as *addrConnStream) SendMsg(m interface{}) (err error) {
	defer func() {
		if err != nil && err != io.EOF {
			// Call finish on the client stream for errors generated by this
			// SendMsg call, as these indicate problems created by this client.
			as.finish(err)
		}
	}()
	if as.sentLast {
		return status.Errorf(codes.Internal, "SendMsg called after CloseSend")
	}
	if !as.desc.ClientStreams {
		as.sentLast = true
	}

	// load hdr, payload, data
	hdr, payld, _, err := prepareMsg(m, as.codec, as.cp, as.comp)
	if err != nil {
		return err
	}

	// TODO(dfawley): should we be checking len(data) instead?
	if len(payld) > *as.callInfo.maxSendMessageSize {
		return status.Errorf(codes.ResourceExhausted, "trying to send message larger than max (%d vs. %d)", len(payld), *as.callInfo.maxSendMessageSize)
	}

	if err := as.t.Write(as.s, hdr, payld, &transport.Options{Last: !as.desc.ClientStreams}); err != nil {
		if !as.desc.ClientStreams {
			// For non-client-streaming RPCs, we return nil instead of EOF on
			// error because the generated code requires it.
			return nil
		}
		return io.EOF
	}

	if channelz.IsOn() {
		as.t.IncrMsgSent()
	}
	return nil
}
func (as *addrConnStream) RecvMsg(m interface{}) (err error) {
	defer func() {
		if err != nil || !as.desc.ServerStreams {
			// err != nil or non-server-streaming indicates end of stream.
			as.finish(err)
		}
	}()

	if !as.decompSet {
		// Block until we receive headers containing received message encoding.
		if ct := as.s.RecvCompress(); ct != "" && ct != encoding.Identity {
			if as.dc == nil || as.dc.Type() != ct {
				// No configured decompressor, or it does not match the
				// incoming message encoding; attempt to find a registered
				// compressor that does.
				as.dc = nil
				as.decomp = encoding.GetCompressor(ct)
			}
		} else {
			// No compression is used; disable our decompressor.
			as.dc = nil
		}
		// Only initialize this state once per stream.
		as.decompSet = true
	}
	err = recv(as.p, as.codec, as.s, as.dc, m, *as.callInfo.maxReceiveMessageSize, nil, as.decomp)
	if err != nil {
		if err == io.EOF {
			if statusErr := as.s.Status().Err(); statusErr != nil {
				return statusErr
			}
			return io.EOF // indicates successful end of stream.
		}
		return toRPCErr(err)
	}

	if channelz.IsOn() {
		as.t.IncrMsgRecv()
	}
	if as.desc.ServerStreams {
		// Subsequent messages should be received by subsequent RecvMsg calls.
		return nil
	}

	// Special handling for non-server-stream rpcs.
	// This recv expects EOF or errors, so we don't collect inPayload.
	err = recv(as.p, as.codec, as.s, as.dc, m, *as.callInfo.maxReceiveMessageSize, nil, as.decomp)
	if err == nil {
		return toRPCErr(errors.New("grpc: client streaming protocol violation: get <nil>, want <EOF>"))
	}
	if err == io.EOF {
		return as.s.Status().Err() // non-server streaming Recv returns nil on success
	}
	return toRPCErr(err)
}
func (as *addrConnStream) finish(err error) {
	as.mu.Lock()
	if as.finished {
		as.mu.Unlock()
		return
	}
	as.finished = true
	if err == io.EOF {
		// Ending a stream with EOF indicates a success.
		err = nil
	}
	if as.s != nil {
		as.t.CloseStream(as.s, err)
	}

	if err != nil {
		as.ac.incrCallsFailed()
	} else {
		as.ac.incrCallsSucceeded()
	}
	as.cancel()
	as.mu.Unlock()
}
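// ServerStream defines the server-side behavior of a streaming RPC.
//
// Errors returned from ServerStream methods are compatible with the status
// package.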
type ServerStream interface {
SetHeader(metadata.MD) error
SendHeader(metadata.MD) error
SetTrailer(metadata.MD)
Context() context.Context
SendMsg(m interface{}) error
RecvMsg(m interface{}) error
}
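// serverStream implements a server side Stream.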
type serverStream struct {
ctx context.Context
t transport.ServerTransport
s *transport.Stream
p *parser
codec baseCodec
cp Compressor
dc Decompressor
comp encoding.Compressor
decomp encoding.Compressor
maxReceiveMessageSize int
maxSendMessageSize int
trInfo *traceInfo
statsHandler []stats.Handler
binlogs []binarylog.MethodLogger
serverHeaderBinlogged bool
mu sync.Mutex
}
func (ss *serverStream) Context() context.Context {
	return ss.ctx
}

func (ss *serverStream) SetHeader(md metadata.MD) error {
	if md.Len() == 0 {
		return nil
	}
	err := imetadata.Validate(md)
	if err != nil {
		return status.Error(codes.Internal, err.Error())
	}
	return ss.s.SetHeader(md)
}
func (ss *serverStream) SendHeader(md metadata.MD) error {
	err := imetadata.Validate(md)
	if err != nil {
		return status.Error(codes.Internal, err.Error())
	}

	err = ss.t.WriteHeader(ss.s, md)
	if len(ss.binlogs) != 0 && !ss.serverHeaderBinlogged {
		h, _ := ss.s.Header()
		sh := &binarylog.ServerHeader{
			Header: h,
		}
		ss.serverHeaderBinlogged = true
		for _, binlog := range ss.binlogs {
			binlog.Log(sh)
		}
	}
	return err
}
func (ss *serverStream) SetTrailer(md metadata.MD) {
	if md.Len() == 0 {
		return
	}
	if err := imetadata.Validate(md); err != nil {
		logger.Errorf("stream: failed to validate md when setting trailer, err: %v", err)
	}
	ss.s.SetTrailer(md)
}
func (ss *serverStream) SendMsg(m interface{}) (err error) {
	defer func() {
		if ss.trInfo != nil {
			ss.mu.Lock()
			if ss.trInfo.tr != nil {
				if err == nil {
					ss.trInfo.tr.LazyLog(&payload{sent: true, msg: m}, true)
				} else {
					ss.trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
					ss.trInfo.tr.SetError()
				}
			}
			ss.mu.Unlock()
		}
		if err != nil && err != io.EOF {
			st, _ := status.FromError(toRPCErr(err))
			ss.t.WriteStatus(ss.s, st)
			// Non-user specified status was sent out. This should be an error
			// case (as a server side Cancel maybe).
			//
			// This is not handled specifically now.  The user will return a
			// final status from the service handler, and we will log that
			// error instead.  This behavior is similar to an interceptor.
		}
		if channelz.IsOn() && err == nil {
			ss.t.IncrMsgSent()
		}
	}()

	// load hdr, payload, data
	hdr, payload, data, err := prepareMsg(m, ss.codec, ss.cp, ss.comp)
	if err != nil {
		return err
	}

	// TODO(dfawley): should we be checking len(data) instead?
	if len(payload) > ss.maxSendMessageSize {
		return status.Errorf(codes.ResourceExhausted, "trying to send message larger than max (%d vs. %d)", len(payload), ss.maxSendMessageSize)
	}
	if err := ss.t.Write(ss.s, hdr, payload, &transport.Options{Last: false}); err != nil {
		return toRPCErr(err)
	}
	if len(ss.binlogs) != 0 {
		if !ss.serverHeaderBinlogged {
			h, _ := ss.s.Header()
			sh := &binarylog.ServerHeader{
				Header: h,
			}
			ss.serverHeaderBinlogged = true
			for _, binlog := range ss.binlogs {
				binlog.Log(sh)
			}
		}
		sm := &binarylog.ServerMessage{
			Message: data,
		}
		for _, binlog := range ss.binlogs {
			binlog.Log(sm)
		}
	}
	if len(ss.statsHandler) != 0 {
		for _, sh := range ss.statsHandler {
			sh.HandleRPC(ss.s.Context(), outPayload(false, m, data, payload, time.Now()))
		}
	}
	return nil
}
func (ss *serverStream) RecvMsg(m interface{}) (err error) {
	defer func() {
		if ss.trInfo != nil {
			ss.mu.Lock()
			if ss.trInfo.tr != nil {
				if err == nil {
					ss.trInfo.tr.LazyLog(&payload{sent: false, msg: m}, true)
				} else if err != io.EOF {
					ss.trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
					ss.trInfo.tr.SetError()
				}
			}
			ss.mu.Unlock()
		}
		if err != nil && err != io.EOF {
			st, _ := status.FromError(toRPCErr(err))
			ss.t.WriteStatus(ss.s, st)
			// Non-user specified status was sent out. This should be an error
			// case (as a server side Cancel maybe).
		}
		if channelz.IsOn() && err == nil {
			ss.t.IncrMsgRecv()
		}
	}()
	var payInfo *payloadInfo
	if len(ss.statsHandler) != 0 || len(ss.binlogs) != 0 {
		payInfo = &payloadInfo{}
	}
	if err := recv(ss.p, ss.codec, ss.s, ss.dc, m, ss.maxReceiveMessageSize, payInfo, ss.decomp); err != nil {
		if err == io.EOF {
			if len(ss.binlogs) != 0 {
				chc := &binarylog.ClientHalfClose{}
				for _, binlog := range ss.binlogs {
					binlog.Log(chc)
				}
			}
			return err
		}
		if err == io.ErrUnexpectedEOF {
			err = status.Errorf(codes.Internal, io.ErrUnexpectedEOF.Error())
		}
		return toRPCErr(err)
	}
	if len(ss.statsHandler) != 0 {
		for _, sh := range ss.statsHandler {
			sh.HandleRPC(ss.s.Context(), &stats.InPayload{
				RecvTime: time.Now(),
				Payload:  m,
				// TODO truncate large payload.
				Data:       payInfo.uncompressedBytes,
				WireLength: payInfo.wireLength + headerLen,
				Length:     len(payInfo.uncompressedBytes),
			})
		}
	}
	if len(ss.binlogs) != 0 {
		cm := &binarylog.ClientMessage{
			Message: payInfo.uncompressedBytes,
		}
		for _, binlog := range ss.binlogs {
			binlog.Log(cm)
		}
	}
	return nil
}
// MethodFromServerStream returns the method string for the input stream.
// The returned string is in the format of "/service/method".
func MethodFromServerStream(stream ServerStream) (string, bool) {
	return Method(stream.Context())
}
// prepareMsg returns the hdr, payload and data using the compressors passed
// or using the passed preparedmsg.
func prepareMsg(m interface{}, codec baseCodec, cp Compressor, comp encoding.Compressor) (hdr, payload, data []byte, err error) {
	if preparedMsg, ok := m.(*PreparedMsg); ok {
		return preparedMsg.hdr, preparedMsg.payload, preparedMsg.encodedData, nil
	}
	// The input interface is not a prepared msg.
	// Marshal and compress the data at this point.
	data, err = encode(codec, m)
	if err != nil {
		return nil, nil, nil, err
	}
	compData, err := compress(data, cp, comp)
	if err != nil {
		return nil, nil, nil, err
	}
	hdr, payload = msgHeader(data, compData)
	return hdr, payload, data, nil
}