package s3
import (
smithy
smithysync
smithyhttp
)
// SelectObjectContentEventStreamReader is the read-side contract for a
// SelectObjectContent event stream.
type SelectObjectContentEventStreamReader interface {
// Events returns a receive-only channel of
// types.SelectObjectContentEventStream values.
Events() <-chan types.SelectObjectContentEventStream
// Close returns an error; presumably it releases the reader's resources
// (confirm against the concrete implementation).
Close() error
// Err returns an error; presumably the one that terminated the stream,
// if any (confirm against the concrete implementation).
Err() error
}
// selectObjectContentEventStreamReader holds the state used to decode
// messages from an io.ReadCloser and publish them on a channel.
type selectObjectContentEventStreamReader struct {
// stream carries decoded events to the consumer.
stream chan types.SelectObjectContentEventStream
// decoder turns raw bytes from eventStream into event-stream messages.
decoder *eventstream.Decoder
// eventStream is the underlying byte source; it is closed on shutdown.
eventStream io.ReadCloser
// err stores the error recorded for this reader (see smithysync.OnceErr).
err *smithysync.OnceErr
// payloadBuf is a scratch buffer handed to the decoder on every Decode call.
payloadBuf []byte
// done is closed on shutdown to signal the read loop to stop.
done chan struct{}
// closeOnce ensures the shutdown routine runs at most once.
closeOnce sync.Once
}
// Constructor for selectObjectContentEventStreamReader. It takes the byte
// source (io.ReadCloser) and the message decoder, builds the reader, and starts
// the read loop in its own goroutine before returning.
// NOTE(review): presumably this is newSelectObjectContentEventStreamReader,
// which the deserialize middleware calls with the same two argument types —
// confirm.
func ( io.ReadCloser, *eventstream.Decoder) *selectObjectContentEventStreamReader {
:= &selectObjectContentEventStreamReader{
// Unbuffered: each send blocks until a consumer receives or done is closed.
stream: make(chan types.SelectObjectContentEventStream),
decoder: ,
eventStream: ,
err: smithysync.NewOnceErr(),
done: make(chan struct{}),
// 10 KiB scratch buffer handed to the decoder.
payloadBuf: make([]byte, 10*1024),
}
// The goroutine ends when the read loop returns (EOF, error, or done closed).
go .readEventStream()
return
}
// Returns the reader's stream channel as receive-only. The read loop closes
// this channel when it exits.
// NOTE(review): presumably this is the Events method of
// SelectObjectContentEventStreamReader — confirm.
func ( *selectObjectContentEventStreamReader) () <-chan types.SelectObjectContentEventStream {
return .stream
}
// Read loop: repeatedly decodes one message from eventStream, deserializes it
// into an event, and sends it on the stream channel. The loop stops on io.EOF,
// on a decode or deserialize error, or once done is closed.
// NOTE(review): presumably this is readEventStream, which the constructor
// starts in a goroutine — confirm.
func ( *selectObjectContentEventStreamReader) () {
// Deferred calls run in LIFO order: the stream channel is closed first,
// then Close is invoked.
defer .Close()
defer close(.stream)
for {
// Truncate to zero length so the same backing array is offered to every
// Decode call.
.payloadBuf = .payloadBuf[0:0]
, := .decoder.Decode(.eventStream, .payloadBuf)
if != nil {
// io.EOF ends the loop without recording an error.
if == io.EOF {
return
}
select {
case <-.done:
// done is already closed, so the failure is not recorded; presumably
// it was caused by the shutdown closing eventStream — confirm.
return
default:
.err.SetError()
return
}
}
, := .deserializeEventMessage(&)
if != nil {
.err.SetError()
return
}
// Block until the event is delivered, or give up if done is closed.
select {
case .stream <- :
case <-.done:
return
}
}
}
// Converts one decoded *eventstream.Message into either an event value or an
// error, dispatching on the message-type header
// (eventstreamapi.MessageTypeHeader).
//
// Outcomes by message type:
//   - event: returns the deserialized event, or the deserializer's error.
//   - exception: returns nil and whatever the exception deserializer returns.
//   - error: returns nil and a *smithy.GenericAPIError.
//   - anything else: returns nil and an *UnknownEventMessageError.
//
// A missing message-type header is itself reported as an error.
// NOTE(review): presumably this is deserializeEventMessage, called from the
// read loop — confirm.
func ( *selectObjectContentEventStreamReader) ( *eventstream.Message) (types.SelectObjectContentEventStream, error) {
:= .Headers.Get(eventstreamapi.MessageTypeHeader)
if == nil {
return nil, fmt.Errorf("%s event header not present", eventstreamapi.MessageTypeHeader)
}
switch .String() {
case eventstreamapi.EventMessageType:
var types.SelectObjectContentEventStream
if := awsRestxml_deserializeEventStreamSelectObjectContentEventStream(&, ); != nil {
return nil,
}
return , nil
case eventstreamapi.ExceptionMessageType:
return nil, awsRestxml_deserializeEventStreamExceptionSelectObjectContentEventStream()
case eventstreamapi.ErrorMessageType:
// "UnknownError" is the fallback; the ErrorCodeHeader and ErrorMessageHeader
// values, when present, overwrite the values assigned here. The initializer
// of the second variable is not visible in this view.
:= "UnknownError"
:=
if := .Headers.Get(eventstreamapi.ErrorCodeHeader); != nil {
= .String()
}
if := .Headers.Get(eventstreamapi.ErrorMessageHeader); != nil {
= .String()
}
return nil, &smithy.GenericAPIError{
Code: ,
Message: ,
}
default:
// Clone is taken before storing the message in the error; presumably so the
// error does not alias the decoder's reused payload buffer — confirm.
:= .Clone()
return nil, &UnknownEventMessageError{
Type: .String(),
Message: &,
}
}
}
// Returns the channel obtained from the reader's OnceErr via ErrorSet.
// NOTE(review): presumably that channel is closed once an error has been
// recorded — confirm against smithysync.OnceErr.
func ( *selectObjectContentEventStreamReader) () <-chan struct{} {
return .err.ErrorSet()
}
// Runs the shutdown routine at most once (guarded by closeOnce) and then
// returns the result of Err. Repeated calls skip the shutdown and only report
// the current error.
// NOTE(review): presumably this is the Close method of
// SelectObjectContentEventStreamReader — confirm.
func ( *selectObjectContentEventStreamReader) () error {
.closeOnce.Do(.safeClose)
return .Err()
}
// Shutdown routine: closes done, then closes the underlying eventStream. The
// error returned by eventStream.Close is discarded. Closing done twice would
// panic, so this is expected to run only once.
// NOTE(review): presumably this is safeClose, invoked through closeOnce —
// confirm.
func ( *selectObjectContentEventStreamReader) () {
close(.done)
.eventStream.Close()
}
// Returns the error held by the reader's OnceErr.
// NOTE(review): presumably this is the Err method of
// SelectObjectContentEventStreamReader — confirm.
func ( *selectObjectContentEventStreamReader) () error {
return .err.Err()
}
// Returns the done channel as receive-only. The shutdown routine closes this
// channel, so a receive on it unblocks once the reader has been shut down.
func ( *selectObjectContentEventStreamReader) () <-chan struct{} {
return .done
}
// awsRestxml_deserializeOpEventStreamSelectObjectContent is the deserialize
// middleware that attaches an event-stream reader to the SelectObjectContent
// output.
type awsRestxml_deserializeOpEventStreamSelectObjectContent struct {
// LogEventStreamWrites is populated at registration time; it is not read
// anywhere in the code visible here.
LogEventStreamWrites bool
// LogEventStreamReads is copied into the decoder's LogMessages option.
LogEventStreamReads bool
}
// Returns the constant string "OperationEventStreamDeserializer".
// NOTE(review): presumably this is the middleware's ID method — confirm.
func (*awsRestxml_deserializeOpEventStreamSelectObjectContent) () string {
return "OperationEventStreamDeserializer"
}
// Deserialize step for the SelectObjectContent event stream. It calls the next
// handler, then wraps the HTTP response body in an event-stream reader and
// attaches the resulting SelectObjectContentEventStream to the operation
// output.
//
// It returns an error when the request or raw response is not the expected
// smithyhttp type, when the next handler fails, or when a non-nil Result is not
// a *SelectObjectContentOutput.
// NOTE(review): presumably this is the middleware's HandleDeserialize method —
// confirm.
func ( *awsRestxml_deserializeOpEventStreamSelectObjectContent) ( context.Context, middleware.DeserializeInput, middleware.DeserializeHandler) (
middleware.DeserializeOutput, middleware.Metadata, error,
) {
// Cleanup on return: does nothing when the checked value is nil, otherwise
// drains and closes the response body.
// NOTE(review): the checked value is presumably the returned error — confirm.
defer func() {
if == nil {
return
}
.closeResponseBody()
}()
:= middleware.GetLogger()
, := .Request.(*smithyhttp.Request)
if ! {
return , , fmt.Errorf("unknown transport type: %T", .Request)
}
_ =
, , = .HandleDeserialize(, )
if != nil {
return , ,
}
, := .RawResponse.(*smithyhttp.Response)
if ! {
return , , fmt.Errorf("unknown transport type: %T", .RawResponse)
}
_ =
// A nil Result is replaced with a fresh output; a non-nil Result of any other
// type is rejected.
, := .Result.(*SelectObjectContentOutput)
if .Result != nil && ! {
return , , fmt.Errorf("unexpected output result type: %T", .Result)
} else if .Result == nil {
= &SelectObjectContentOutput{}
.Result =
}
// The reader takes the response body and a decoder configured with the logger
// and the LogEventStreamReads flag.
:= newSelectObjectContentEventStreamReader(
.Body,
eventstream.NewDecoder(func( *eventstream.DecoderOptions) {
.Logger =
.LogMessages = .LogEventStreamReads
}),
)
// Second cleanup: closes the reader unless the checked value is nil.
defer func() {
if == nil {
return
}
_ = .Close()
}()
.eventStream = NewSelectObjectContentEventStream(func( *SelectObjectContentEventStream) {
.Reader =
})
// waitStreamClose runs in its own goroutine; its body is not visible here.
go .eventStream.waitStreamClose()
return , , nil
}
// Drains and closes the HTTP response body, if there is one. It does nothing
// unless RawResponse is a non-nil *smithyhttp.Response with a non-nil Body.
// Errors from the copy and the close are deliberately discarded.
// NOTE(review): presumably this is closeResponseBody, used by the deserialize
// handler's cleanup — confirm.
// NOTE(review): ioutil.Discard is deprecated since Go 1.16 in favour of
// io.Discard.
func (*awsRestxml_deserializeOpEventStreamSelectObjectContent) ( middleware.DeserializeOutput) {
if , := .RawResponse.(*smithyhttp.Response); && != nil && .Body != nil {
_, _ = io.Copy(ioutil.Discard, .Body)
_ = .Body.Close()
}
}
// Registers the event-stream deserialize middleware on the given stack. The
// middleware is inserted into the Deserialize step relative to
// "OperationDeserializer" with middleware.Before. Its two logging flags come
// from the options' ClientLogMode. Returns the Insert error, or nil on success.
func ( *middleware.Stack, Options) error {
if := .Deserialize.Insert(&awsRestxml_deserializeOpEventStreamSelectObjectContent{
LogEventStreamWrites: .ClientLogMode.IsRequestEventMessage(),
LogEventStreamReads: .ClientLogMode.IsResponseEventMessage(),
}, "OperationDeserializer", middleware.Before); != nil {
return
}
return nil
}
// UnknownEventMessageError is the error type used for an event-stream message
// whose message type is not handled.
type UnknownEventMessageError struct {
// Type is the message-type string, included in the Error text.
Type string
// Message points to the event-stream message associated with the error.
Message *eventstream.Message
}
// Returns the fixed prefix "unknown event stream message type, " followed by
// the Type field.
// NOTE(review): presumably this is the Error method that satisfies the error
// interface — confirm.
func ( *UnknownEventMessageError) () string {
return "unknown event stream message type, " + .Type
}
// Adjusts the client log mode on the given Options according to a string key.
// For "SelectObjectContent" it calls toggleEventStreamClientLogMode with the
// flags (false, true); any other key leaves the options untouched.
// NOTE(review): the key is presumably the operation name — confirm.
func ( *Options, string) {
switch {
case "SelectObjectContent":
toggleEventStreamClientLogMode(, false, true)
return
default:
return
}
}
// Rewrites the Options' ClientLogMode under the control of two bool flags.
// Each guarded branch clears a "with body" setting (ClearRequestWithBody /
// ClearResponseWithBody) and ORs in the plain aws.LogRequest / aws.LogResponse
// bit. The adjusted mode is then written back to the Options.
// NOTE(review): presumably the first flag gates the request branch and the
// second the response branch; the flag names are not visible here — confirm.
// NOTE(review): presumably this is toggleEventStreamClientLogMode, called from
// the per-operation switch — confirm.
func ( *Options, , bool) {
:= .ClientLogMode
if && .IsRequestWithBody() {
.ClearRequestWithBody()
|= aws.LogRequest
}
if && .IsResponseWithBody() {
.ClearResponseWithBody()
|= aws.LogResponse
}
.ClientLogMode =
}