package grpc
import (
)
type Compressor interface {
Do(w io.Writer, p []byte) error
Type() string
}
type gzipCompressor struct {
pool sync.Pool
}
func () Compressor {
, := NewGZIPCompressorWithLevel(gzip.DefaultCompression)
return
}
func ( int) (Compressor, error) {
if < gzip.DefaultCompression || > gzip.BestCompression {
return nil, fmt.Errorf("grpc: invalid compression level: %d", )
}
return &gzipCompressor{
pool: sync.Pool{
New: func() interface{} {
, := gzip.NewWriterLevel(io.Discard, )
if != nil {
panic()
}
return
},
},
}, nil
}
func ( *gzipCompressor) ( io.Writer, []byte) error {
:= .pool.Get().(*gzip.Writer)
defer .pool.Put()
.Reset()
if , := .Write(); != nil {
return
}
return .Close()
}
func ( *gzipCompressor) () string {
return "gzip"
}
type Decompressor interface {
Do(r io.Reader) ([]byte, error)
Type() string
}
type gzipDecompressor struct {
pool sync.Pool
}
func () Decompressor {
return &gzipDecompressor{}
}
func ( *gzipDecompressor) ( io.Reader) ([]byte, error) {
var *gzip.Reader
switch maybeZ := .pool.Get().(type) {
case nil:
, := gzip.NewReader()
if != nil {
return nil,
}
=
case *gzip.Reader:
=
if := .Reset(); != nil {
.pool.Put()
return nil,
}
}
defer func() {
.Close()
.pool.Put()
}()
return io.ReadAll()
}
func ( *gzipDecompressor) () string {
return "gzip"
}
type callInfo struct {
compressorType string
failFast bool
maxReceiveMessageSize *int
maxSendMessageSize *int
creds credentials.PerRPCCredentials
contentSubtype string
codec baseCodec
maxRetryRPCBufferSize int
}
func () *callInfo {
return &callInfo{
failFast: true,
maxRetryRPCBufferSize: 256 * 1024,
}
}
type CallOption interface {
before(*callInfo) error
after(*callInfo, *csAttempt)
}
type EmptyCallOption struct{}
func (EmptyCallOption) (*callInfo) error { return nil }
func (EmptyCallOption) (*callInfo, *csAttempt) {}
func ( *metadata.MD) CallOption {
return HeaderCallOption{HeaderAddr: }
}
type HeaderCallOption struct {
HeaderAddr *metadata.MD
}
func ( HeaderCallOption) ( *callInfo) error { return nil }
func ( HeaderCallOption) ( *callInfo, *csAttempt) {
*.HeaderAddr, _ = .s.Header()
}
func ( *metadata.MD) CallOption {
return TrailerCallOption{TrailerAddr: }
}
type TrailerCallOption struct {
TrailerAddr *metadata.MD
}
func ( TrailerCallOption) ( *callInfo) error { return nil }
func ( TrailerCallOption) ( *callInfo, *csAttempt) {
*.TrailerAddr = .s.Trailer()
}
func ( *peer.Peer) CallOption {
return PeerCallOption{PeerAddr: }
}
type PeerCallOption struct {
PeerAddr *peer.Peer
}
func ( PeerCallOption) ( *callInfo) error { return nil }
func ( PeerCallOption) ( *callInfo, *csAttempt) {
if , := peer.FromContext(.s.Context()); {
*.PeerAddr = *
}
}
func ( bool) CallOption {
return FailFastCallOption{FailFast: !}
}
func ( bool) CallOption {
return FailFastCallOption{FailFast: }
}
type FailFastCallOption struct {
FailFast bool
}
func ( FailFastCallOption) ( *callInfo) error {
.failFast = .FailFast
return nil
}
func ( FailFastCallOption) ( *callInfo, *csAttempt) {}
func ( int) CallOption {
return MaxRecvMsgSizeCallOption{MaxRecvMsgSize: }
}
type MaxRecvMsgSizeCallOption struct {
MaxRecvMsgSize int
}
func ( MaxRecvMsgSizeCallOption) ( *callInfo) error {
.maxReceiveMessageSize = &.MaxRecvMsgSize
return nil
}
func ( MaxRecvMsgSizeCallOption) ( *callInfo, *csAttempt) {}
func ( int) CallOption {
return MaxSendMsgSizeCallOption{MaxSendMsgSize: }
}
type MaxSendMsgSizeCallOption struct {
MaxSendMsgSize int
}
func ( MaxSendMsgSizeCallOption) ( *callInfo) error {
.maxSendMessageSize = &.MaxSendMsgSize
return nil
}
func ( MaxSendMsgSizeCallOption) ( *callInfo, *csAttempt) {}
func ( credentials.PerRPCCredentials) CallOption {
return PerRPCCredsCallOption{Creds: }
}
type PerRPCCredsCallOption struct {
Creds credentials.PerRPCCredentials
}
func ( PerRPCCredsCallOption) ( *callInfo) error {
.creds = .Creds
return nil
}
func ( PerRPCCredsCallOption) ( *callInfo, *csAttempt) {}
func ( string) CallOption {
return CompressorCallOption{CompressorType: }
}
type CompressorCallOption struct {
CompressorType string
}
func ( CompressorCallOption) ( *callInfo) error {
.compressorType = .CompressorType
return nil
}
func ( CompressorCallOption) ( *callInfo, *csAttempt) {}
func ( string) CallOption {
return ContentSubtypeCallOption{ContentSubtype: strings.ToLower()}
}
type ContentSubtypeCallOption struct {
ContentSubtype string
}
func ( ContentSubtypeCallOption) ( *callInfo) error {
.contentSubtype = .ContentSubtype
return nil
}
func ( ContentSubtypeCallOption) ( *callInfo, *csAttempt) {}
func ( encoding.Codec) CallOption {
return ForceCodecCallOption{Codec: }
}
type ForceCodecCallOption struct {
Codec encoding.Codec
}
func ( ForceCodecCallOption) ( *callInfo) error {
.codec = .Codec
return nil
}
func ( ForceCodecCallOption) ( *callInfo, *csAttempt) {}
func ( Codec) CallOption {
return CustomCodecCallOption{Codec: }
}
type CustomCodecCallOption struct {
Codec Codec
}
func ( CustomCodecCallOption) ( *callInfo) error {
.codec = .Codec
return nil
}
func ( CustomCodecCallOption) ( *callInfo, *csAttempt) {}
func ( int) CallOption {
return MaxRetryRPCBufferSizeCallOption{}
}
type MaxRetryRPCBufferSizeCallOption struct {
MaxRetryRPCBufferSize int
}
func ( MaxRetryRPCBufferSizeCallOption) ( *callInfo) error {
.maxRetryRPCBufferSize = .MaxRetryRPCBufferSize
return nil
}
func ( MaxRetryRPCBufferSizeCallOption) ( *callInfo, *csAttempt) {}
type payloadFormat uint8
const (
compressionNone payloadFormat = 0
compressionMade payloadFormat = 1
)
type parser struct {
r io.Reader
header [5]byte
}
func ( *parser) ( int) ( payloadFormat, []byte, error) {
if , := .r.Read(.header[:]); != nil {
return 0, nil,
}
= payloadFormat(.header[0])
:= binary.BigEndian.Uint32(.header[1:])
if == 0 {
return , nil, nil
}
if int64() > int64(maxInt) {
return 0, nil, status.Errorf(codes.ResourceExhausted, "grpc: received message larger than max length allowed on current machine (%d vs. %d)", , maxInt)
}
if int() > {
return 0, nil, status.Errorf(codes.ResourceExhausted, "grpc: received message larger than max (%d vs. %d)", , )
}
= make([]byte, int())
if , := .r.Read(); != nil {
if == io.EOF {
= io.ErrUnexpectedEOF
}
return 0, nil,
}
return , , nil
}
func ( baseCodec, interface{}) ([]byte, error) {
if == nil {
return nil, nil
}
, := .Marshal()
if != nil {
return nil, status.Errorf(codes.Internal, "grpc: error while marshaling: %v", .Error())
}
if uint(len()) > math.MaxUint32 {
return nil, status.Errorf(codes.ResourceExhausted, "grpc: message too large (%d bytes)", len())
}
return , nil
}
func ( []byte, Compressor, encoding.Compressor) ([]byte, error) {
if == nil && == nil {
return nil, nil
}
:= func( error) error {
return status.Errorf(codes.Internal, "grpc: error while compressing: %v", .Error())
}
:= &bytes.Buffer{}
if != nil {
, := .Compress()
if != nil {
return nil, ()
}
if , := .Write(); != nil {
return nil, ()
}
if := .Close(); != nil {
return nil, ()
}
} else {
if := .Do(, ); != nil {
return nil, ()
}
}
return .Bytes(), nil
}
const (
payloadLen = 1
sizeLen = 4
headerLen = payloadLen + sizeLen
)
func (, []byte) ( []byte, []byte) {
= make([]byte, headerLen)
if != nil {
[0] = byte(compressionMade)
=
} else {
[0] = byte(compressionNone)
}
binary.BigEndian.PutUint32([payloadLen:], uint32(len()))
return ,
}
func ( bool, interface{}, , []byte, time.Time) *stats.OutPayload {
return &stats.OutPayload{
Client: ,
Payload: ,
Data: ,
Length: len(),
WireLength: len() + headerLen,
SentTime: ,
}
}
func ( payloadFormat, string, bool) *status.Status {
switch {
case compressionNone:
case compressionMade:
if == "" || == encoding.Identity {
return status.New(codes.Internal, "grpc: compressed flag set with identity or empty encoding")
}
if ! {
return status.Newf(codes.Unimplemented, "grpc: Decompressor is not installed for grpc-encoding %q", )
}
default:
return status.Newf(codes.Internal, "grpc: received unexpected payload format %d", )
}
return nil
}
type payloadInfo struct {
wireLength int
uncompressedBytes []byte
}
func ( *parser, *transport.Stream, Decompressor, int, *payloadInfo, encoding.Compressor) ([]byte, error) {
, , := .recvMsg()
if != nil {
return nil,
}
if != nil {
.wireLength = len()
}
if := checkRecvPayload(, .RecvCompress(), != nil || != nil); != nil {
return nil, .Err()
}
var int
if == compressionMade {
if != nil {
, = .Do(bytes.NewReader())
= len()
} else {
, , = decompress(, , )
}
if != nil {
return nil, status.Errorf(codes.Internal, "grpc: failed to decompress the received message: %v", )
}
if > {
return nil, status.Errorf(codes.ResourceExhausted, "grpc: received message after decompression larger than max (%d vs. %d)", , )
}
}
return , nil
}
func ( encoding.Compressor, []byte, int) ([]byte, int, error) {
, := .Decompress(bytes.NewReader())
if != nil {
return nil, 0,
}
if , := .(interface {
( []byte) int
}); {
if := .(); >= 0 {
if > {
return nil, , nil
}
:= bytes.NewBuffer(make([]byte, 0, +bytes.MinRead))
, := .ReadFrom(io.LimitReader(, int64()+1))
return .Bytes(), int(),
}
}
, = io.ReadAll(io.LimitReader(, int64()+1))
return , len(),
}
func ( *parser, baseCodec, *transport.Stream, Decompressor, interface{}, int, *payloadInfo, encoding.Compressor) error {
, := recvAndDecompress(, , , , , )
if != nil {
return
}
if := .Unmarshal(, ); != nil {
return status.Errorf(codes.Internal, "grpc: failed to unmarshal the received message: %v", )
}
if != nil {
.uncompressedBytes =
}
return nil
}
type rpcInfo struct {
failfast bool
preloaderInfo *compressorInfo
}
type compressorInfo struct {
codec baseCodec
cp Compressor
comp encoding.Compressor
}
type rpcInfoContextKey struct{}
func ( context.Context, bool, baseCodec, Compressor, encoding.Compressor) context.Context {
return context.WithValue(, rpcInfoContextKey{}, &rpcInfo{
failfast: ,
preloaderInfo: &compressorInfo{
codec: ,
cp: ,
comp: ,
},
})
}
func ( context.Context) ( *rpcInfo, bool) {
, = .Value(rpcInfoContextKey{}).(*rpcInfo)
return
}
func ( error) codes.Code {
return status.Code()
}
func ( error) string {
return status.Convert().Message()
}
func ( codes.Code, string, ...interface{}) error {
return status.Errorf(, , ...)
}
func ( error) error {
switch {
case nil, io.EOF:
return
case context.DeadlineExceeded:
return status.Error(codes.DeadlineExceeded, .Error())
case context.Canceled:
return status.Error(codes.Canceled, .Error())
case io.ErrUnexpectedEOF:
return status.Error(codes.Internal, .Error())
}
switch e := .(type) {
case transport.ConnectionError:
return status.Error(codes.Unavailable, .Desc)
case *transport.NewStreamError:
return (.Err)
}
if , := status.FromError(); {
return
}
return status.Error(codes.Unknown, .Error())
}
func ( *callInfo) error {
if .codec != nil {
if .contentSubtype == "" {
if , := .codec.(encoding.Codec); {
.contentSubtype = strings.ToLower(.Name())
}
}
return nil
}
if .contentSubtype == "" {
.codec = encoding.GetCodec(proto.Name)
return nil
}
.codec = encoding.GetCodec(.contentSubtype)
if .codec == nil {
return status.Errorf(codes.Internal, "no codec registered for content-subtype %s", .contentSubtype)
}
return nil
}
type channelzData struct {
callsStarted int64
callsFailed int64
callsSucceeded int64
lastCallStartedTime int64
}
const (
SupportPackageIsVersion3 = true
SupportPackageIsVersion4 = true
SupportPackageIsVersion5 = true
SupportPackageIsVersion6 = true
SupportPackageIsVersion7 = true
)
const grpcUA = "grpc-go/" + Version