package grpc
import (
)
type Compressor interface {
Do(w io.Writer, p []byte) error
Type() string
}
type gzipCompressor struct {
pool sync.Pool
}
func () Compressor {
, := NewGZIPCompressorWithLevel(gzip.DefaultCompression)
return
}
func ( int) (Compressor, error) {
if < gzip.DefaultCompression || > gzip.BestCompression {
return nil, fmt.Errorf("grpc: invalid compression level: %d", )
}
return &gzipCompressor{
pool: sync.Pool{
New: func() any {
, := gzip.NewWriterLevel(io.Discard, )
if != nil {
panic()
}
return
},
},
}, nil
}
func ( *gzipCompressor) ( io.Writer, []byte) error {
:= .pool.Get().(*gzip.Writer)
defer .pool.Put()
.Reset()
if , := .Write(); != nil {
return
}
return .Close()
}
func ( *gzipCompressor) () string {
return "gzip"
}
type Decompressor interface {
Do(r io.Reader) ([]byte, error)
Type() string
}
type gzipDecompressor struct {
pool sync.Pool
}
func () Decompressor {
return &gzipDecompressor{}
}
func ( *gzipDecompressor) ( io.Reader) ([]byte, error) {
var *gzip.Reader
switch maybeZ := .pool.Get().(type) {
case nil:
, := gzip.NewReader()
if != nil {
return nil,
}
=
case *gzip.Reader:
=
if := .Reset(); != nil {
.pool.Put()
return nil,
}
}
defer func() {
.Close()
.pool.Put()
}()
return io.ReadAll()
}
func ( *gzipDecompressor) () string {
return "gzip"
}
type callInfo struct {
compressorName string
failFast bool
maxReceiveMessageSize *int
maxSendMessageSize *int
creds credentials.PerRPCCredentials
contentSubtype string
codec baseCodec
maxRetryRPCBufferSize int
onFinish []func(err error)
authority string
}
func () *callInfo {
return &callInfo{
failFast: true,
maxRetryRPCBufferSize: 256 * 1024,
}
}
type CallOption interface {
before(*callInfo) error
after(*callInfo, *csAttempt)
}
type EmptyCallOption struct{}
func (EmptyCallOption) (*callInfo) error { return nil }
func (EmptyCallOption) (*callInfo, *csAttempt) {}
func () CallOption {
return StaticMethodCallOption{}
}
type StaticMethodCallOption struct {
EmptyCallOption
}
func ( *metadata.MD) CallOption {
return HeaderCallOption{HeaderAddr: }
}
type HeaderCallOption struct {
HeaderAddr *metadata.MD
}
func ( HeaderCallOption) (*callInfo) error { return nil }
func ( HeaderCallOption) ( *callInfo, *csAttempt) {
*.HeaderAddr, _ = .transportStream.Header()
}
func ( *metadata.MD) CallOption {
return TrailerCallOption{TrailerAddr: }
}
type TrailerCallOption struct {
TrailerAddr *metadata.MD
}
func ( TrailerCallOption) (*callInfo) error { return nil }
func ( TrailerCallOption) ( *callInfo, *csAttempt) {
*.TrailerAddr = .transportStream.Trailer()
}
func ( *peer.Peer) CallOption {
return PeerCallOption{PeerAddr: }
}
type PeerCallOption struct {
PeerAddr *peer.Peer
}
func ( PeerCallOption) (*callInfo) error { return nil }
func ( PeerCallOption) ( *callInfo, *csAttempt) {
if , := peer.FromContext(.transportStream.Context()); {
*.PeerAddr = *
}
}
func ( bool) CallOption {
return FailFastCallOption{FailFast: !}
}
func ( bool) CallOption {
return FailFastCallOption{FailFast: }
}
type FailFastCallOption struct {
FailFast bool
}
func ( FailFastCallOption) ( *callInfo) error {
.failFast = .FailFast
return nil
}
func ( FailFastCallOption) (*callInfo, *csAttempt) {}
func ( func( error)) CallOption {
return OnFinishCallOption{
OnFinish: ,
}
}
type OnFinishCallOption struct {
OnFinish func(error)
}
func ( OnFinishCallOption) ( *callInfo) error {
.onFinish = append(.onFinish, .OnFinish)
return nil
}
func ( OnFinishCallOption) (*callInfo, *csAttempt) {}
func ( int) CallOption {
return MaxRecvMsgSizeCallOption{MaxRecvMsgSize: }
}
type MaxRecvMsgSizeCallOption struct {
MaxRecvMsgSize int
}
func ( MaxRecvMsgSizeCallOption) ( *callInfo) error {
.maxReceiveMessageSize = &.MaxRecvMsgSize
return nil
}
func ( MaxRecvMsgSizeCallOption) (*callInfo, *csAttempt) {}
func ( string) CallOption {
return AuthorityOverrideCallOption{Authority: }
}
type AuthorityOverrideCallOption struct {
Authority string
}
func ( AuthorityOverrideCallOption) ( *callInfo) error {
.authority = .Authority
return nil
}
func ( AuthorityOverrideCallOption) (*callInfo, *csAttempt) {}
func ( int) CallOption {
return MaxSendMsgSizeCallOption{MaxSendMsgSize: }
}
type MaxSendMsgSizeCallOption struct {
MaxSendMsgSize int
}
func ( MaxSendMsgSizeCallOption) ( *callInfo) error {
.maxSendMessageSize = &.MaxSendMsgSize
return nil
}
func ( MaxSendMsgSizeCallOption) (*callInfo, *csAttempt) {}
func ( credentials.PerRPCCredentials) CallOption {
return PerRPCCredsCallOption{Creds: }
}
type PerRPCCredsCallOption struct {
Creds credentials.PerRPCCredentials
}
func ( PerRPCCredsCallOption) ( *callInfo) error {
.creds = .Creds
return nil
}
func ( PerRPCCredsCallOption) (*callInfo, *csAttempt) {}
func ( string) CallOption {
return CompressorCallOption{CompressorType: }
}
type CompressorCallOption struct {
CompressorType string
}
func ( CompressorCallOption) ( *callInfo) error {
.compressorName = .CompressorType
return nil
}
func ( CompressorCallOption) (*callInfo, *csAttempt) {}
func ( string) CallOption {
return ContentSubtypeCallOption{ContentSubtype: strings.ToLower()}
}
type ContentSubtypeCallOption struct {
ContentSubtype string
}
func ( ContentSubtypeCallOption) ( *callInfo) error {
.contentSubtype = .ContentSubtype
return nil
}
func ( ContentSubtypeCallOption) (*callInfo, *csAttempt) {}
func ( encoding.Codec) CallOption {
return ForceCodecCallOption{Codec: }
}
type ForceCodecCallOption struct {
Codec encoding.Codec
}
func ( ForceCodecCallOption) ( *callInfo) error {
.codec = newCodecV1Bridge(.Codec)
return nil
}
func ( ForceCodecCallOption) (*callInfo, *csAttempt) {}
func ( encoding.CodecV2) CallOption {
return ForceCodecV2CallOption{CodecV2: }
}
type ForceCodecV2CallOption struct {
CodecV2 encoding.CodecV2
}
func ( ForceCodecV2CallOption) ( *callInfo) error {
.codec = .CodecV2
return nil
}
func ( ForceCodecV2CallOption) (*callInfo, *csAttempt) {}
func ( Codec) CallOption {
return CustomCodecCallOption{Codec: }
}
type CustomCodecCallOption struct {
Codec Codec
}
func ( CustomCodecCallOption) ( *callInfo) error {
.codec = newCodecV0Bridge(.Codec)
return nil
}
func ( CustomCodecCallOption) (*callInfo, *csAttempt) {}
func ( int) CallOption {
return MaxRetryRPCBufferSizeCallOption{}
}
type MaxRetryRPCBufferSizeCallOption struct {
MaxRetryRPCBufferSize int
}
func ( MaxRetryRPCBufferSizeCallOption) ( *callInfo) error {
.maxRetryRPCBufferSize = .MaxRetryRPCBufferSize
return nil
}
func ( MaxRetryRPCBufferSizeCallOption) (*callInfo, *csAttempt) {}
type payloadFormat uint8
const (
compressionNone payloadFormat = 0
compressionMade payloadFormat = 1
)
func ( payloadFormat) () bool {
return == compressionMade
}
type streamReader interface {
ReadMessageHeader(header []byte) error
Read(n int) (mem.BufferSlice, error)
}
type parser struct {
r streamReader
header [5]byte
bufferPool mem.BufferPool
}
func ( *parser) ( int) (payloadFormat, mem.BufferSlice, error) {
:= .r.ReadMessageHeader(.header[:])
if != nil {
return 0, nil,
}
:= payloadFormat(.header[0])
:= binary.BigEndian.Uint32(.header[1:])
if int64() > int64(maxInt) {
return 0, nil, status.Errorf(codes.ResourceExhausted, "grpc: received message larger than max length allowed on current machine (%d vs. %d)", , maxInt)
}
if int() > {
return 0, nil, status.Errorf(codes.ResourceExhausted, "grpc: received message larger than max (%d vs. %d)", , )
}
, := .r.Read(int())
if != nil {
if == io.EOF {
= io.ErrUnexpectedEOF
}
return 0, nil,
}
return , , nil
}
func ( baseCodec, any) (mem.BufferSlice, error) {
if == nil {
return nil, nil
}
, := .Marshal()
if != nil {
return nil, status.Errorf(codes.Internal, "grpc: error while marshaling: %v", .Error())
}
if := uint(.Len()); > math.MaxUint32 {
.Free()
return nil, status.Errorf(codes.ResourceExhausted, "grpc: message too large (%d bytes)", )
}
return , nil
}
func ( mem.BufferSlice, Compressor, encoding.Compressor, mem.BufferPool) (mem.BufferSlice, payloadFormat, error) {
if ( == nil && == nil) || .Len() == 0 {
return nil, compressionNone, nil
}
var mem.BufferSlice
:= mem.NewWriter(&, )
:= func( error) error {
.Free()
return status.Errorf(codes.Internal, "grpc: error while compressing: %v", .Error())
}
if != nil {
, := .Compress()
if != nil {
return nil, 0, ()
}
for , := range {
if , := .Write(.ReadOnlyData()); != nil {
return nil, 0, ()
}
}
if := .Close(); != nil {
return nil, 0, ()
}
} else {
:= .MaterializeToBuffer()
defer .Free()
if := .Do(, .ReadOnlyData()); != nil {
return nil, 0, ()
}
}
return , compressionMade, nil
}
const (
payloadLen = 1
sizeLen = 4
headerLen = payloadLen + sizeLen
)
func (, mem.BufferSlice, payloadFormat) ( []byte, mem.BufferSlice) {
= make([]byte, headerLen)
[0] = byte()
var uint32
if .isCompressed() {
= uint32(.Len())
=
} else {
= uint32(.Len())
=
}
binary.BigEndian.PutUint32([payloadLen:], )
return ,
}
func ( bool, any, , int, time.Time) *stats.OutPayload {
return &stats.OutPayload{
Client: ,
Payload: ,
Length: ,
WireLength: + headerLen,
CompressedLength: ,
SentTime: ,
}
}
func ( payloadFormat, string, bool, bool) *status.Status {
switch {
case compressionNone:
case compressionMade:
if == "" || == encoding.Identity {
return status.New(codes.Internal, "grpc: compressed flag set with identity or empty encoding")
}
if ! {
if {
return status.Newf(codes.Unimplemented, "grpc: Decompressor is not installed for grpc-encoding %q", )
}
return status.Newf(codes.Internal, "grpc: Decompressor is not installed for grpc-encoding %q", )
}
default:
return status.Newf(codes.Internal, "grpc: received unexpected payload format %d", )
}
return nil
}
type payloadInfo struct {
compressedLength int
uncompressedBytes mem.BufferSlice
}
func ( *payloadInfo) () {
if != nil && .uncompressedBytes != nil {
.uncompressedBytes.Free()
}
}
func ( *parser, recvCompressor, Decompressor, int, *payloadInfo, encoding.Compressor, bool,
) ( mem.BufferSlice, error) {
, , := .recvMsg()
if != nil {
return nil,
}
:= .Len()
if := checkRecvPayload(, .RecvCompress(), != nil || != nil, ); != nil {
.Free()
return nil, .Err()
}
if .isCompressed() {
defer .Free()
, = decompress(, , , , .bufferPool)
if != nil {
return nil,
}
} else {
=
}
if != nil {
.compressedLength =
.Ref()
.uncompressedBytes =
}
return , nil
}
func ( encoding.Compressor, mem.BufferSlice, Decompressor, int, mem.BufferPool) (mem.BufferSlice, error) {
if != nil {
, := .Do(.Reader())
if != nil {
return nil, status.Errorf(codes.Internal, "grpc: failed to decompress the received message: %v", )
}
if len() > {
return nil, status.Errorf(codes.ResourceExhausted, "grpc: message after decompression larger than max (%d vs. %d)", len(), )
}
return mem.BufferSlice{mem.SliceBuffer()}, nil
}
if != nil {
, := .Decompress(.Reader())
if != nil {
return nil, status.Errorf(codes.Internal, "grpc: failed to decompress the message: %v", )
}
if := int64(); < math.MaxInt64 {
= io.LimitReader(, +1)
}
, := mem.ReadAll(, )
if != nil {
.Free()
return nil, status.Errorf(codes.Internal, "grpc: failed to read decompressed data: %v", )
}
if .Len() > {
.Free()
return nil, status.Errorf(codes.ResourceExhausted, "grpc: received message after decompression larger than max %d", )
}
return , nil
}
return nil, status.Errorf(codes.Internal, "grpc: no decompressor available for compressed payload")
}
type recvCompressor interface {
RecvCompress() string
}
func ( *parser, baseCodec, recvCompressor, Decompressor, any, int, *payloadInfo, encoding.Compressor, bool) error {
, := recvAndDecompress(, , , , , , )
if != nil {
return
}
defer .Free()
if := .Unmarshal(, ); != nil {
return status.Errorf(codes.Internal, "grpc: failed to unmarshal the received message: %v", )
}
return nil
}
type rpcInfo struct {
failfast bool
preloaderInfo *compressorInfo
}
type compressorInfo struct {
codec baseCodec
cp Compressor
comp encoding.Compressor
}
type rpcInfoContextKey struct{}
func ( context.Context, bool, baseCodec, Compressor, encoding.Compressor) context.Context {
return context.WithValue(, rpcInfoContextKey{}, &rpcInfo{
failfast: ,
preloaderInfo: &compressorInfo{
codec: ,
cp: ,
comp: ,
},
})
}
func ( context.Context) ( *rpcInfo, bool) {
, = .Value(rpcInfoContextKey{}).(*rpcInfo)
return
}
func ( error) codes.Code {
return status.Code()
}
func ( error) string {
return status.Convert().Message()
}
func ( codes.Code, string, ...any) error {
return status.Errorf(, , ...)
}
var errContextCanceled = status.Error(codes.Canceled, context.Canceled.Error())
var errContextDeadline = status.Error(codes.DeadlineExceeded, context.DeadlineExceeded.Error())
func ( error) error {
switch {
case nil, io.EOF:
return
case context.DeadlineExceeded:
return errContextDeadline
case context.Canceled:
return errContextCanceled
case io.ErrUnexpectedEOF:
return status.Error(codes.Internal, .Error())
}
switch e := .(type) {
case transport.ConnectionError:
return status.Error(codes.Unavailable, .Desc)
case *transport.NewStreamError:
return (.Err)
}
if , := status.FromError(); {
return
}
return status.Error(codes.Unknown, .Error())
}
func ( *callInfo) error {
if .codec != nil {
if .contentSubtype == "" {
if , := .codec.(encoding.CodecV2); {
.contentSubtype = strings.ToLower(.Name())
}
}
return nil
}
if .contentSubtype == "" {
.codec = getCodec(proto.Name)
return nil
}
.codec = getCodec(.contentSubtype)
if .codec == nil {
return status.Errorf(codes.Internal, "no codec registered for content-subtype %s", .contentSubtype)
}
return nil
}
const (
SupportPackageIsVersion3 = true
SupportPackageIsVersion4 = true
SupportPackageIsVersion5 = true
SupportPackageIsVersion6 = true
SupportPackageIsVersion7 = true
SupportPackageIsVersion8 = true
SupportPackageIsVersion9 = true
)
const grpcUA = "grpc-go/" + Version