/*
 *
 * Copyright 2024 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

package transport

import (
	

	
	
	
	
)

// ClientStream implements streaming functionality for a gRPC client.
type ClientStream struct {
	*Stream // Embed for common stream functionality.

	ct       *http2Client
	done     chan struct{} // closed at the end of stream to unblock writers.
	doneFunc func()        // invoked at the end of stream.

	headerChan       chan struct{} // closed to indicate the end of header metadata.
	headerChanClosed uint32        // set when headerChan is closed. Used to avoid closing headerChan multiple times.
	// headerValid indicates whether a valid header was received.  Only
	// meaningful after headerChan is closed (always call waitOnHeader() before
	// reading its value).
	headerValid bool
	header      metadata.MD // the received header metadata
	noHeaders   bool        // set if the client never received headers (set only after the stream is done).

	bytesReceived atomic.Bool // indicates whether any bytes have been received on this stream
	unprocessed   atomic.Bool // set if the server sends a refused stream or GOAWAY including this stream

	status *status.Status // the status error received from the server
}

// Read reads an n byte message from the input stream.
func ( *ClientStream) ( int) (mem.BufferSlice, error) {
	,  := .Stream.read()
	if  == nil {
		.ct.incrMsgRecv()
	}
	return , 
}

// Close closes the stream and propagates err to any readers.
func ( *ClientStream) ( error) {
	var (
		     bool
		 http2.ErrCode
	)
	if  != nil {
		 = true
		 = http2.ErrCodeCancel
	}
	.ct.closeStream(, , , , status.Convert(), nil, false)
}

// Write writes the hdr and data bytes to the output stream.
func ( *ClientStream) ( []byte,  mem.BufferSlice,  *WriteOptions) error {
	return .ct.write(, , , )
}

// BytesReceived indicates whether any bytes have been received on this stream.
func ( *ClientStream) () bool {
	return .bytesReceived.Load()
}

// Unprocessed indicates whether the server did not process this stream --
// i.e. it sent a refused stream or GOAWAY including this stream ID.
func ( *ClientStream) () bool {
	return .unprocessed.Load()
}

func ( *ClientStream) () {
	select {
	case <-.ctx.Done():
		// Close the stream to prevent headers/trailers from changing after
		// this function returns.
		.Close(ContextErr(.ctx.Err()))
		// headerChan could possibly not be closed yet if closeStream raced
		// with operateHeaders; wait until it is closed explicitly here.
		<-.headerChan
	case <-.headerChan:
	}
}

// RecvCompress returns the compression algorithm applied to the inbound
// message. It is empty string if there is no compression applied.
func ( *ClientStream) () string {
	.waitOnHeader()
	return .recvCompress
}

// Done returns a channel which is closed when it receives the final status
// from the server.
func ( *ClientStream) () <-chan struct{} {
	return .done
}

// Header returns the header metadata of the stream. Acquires the key-value
// pairs of header metadata once it is available. It blocks until i) the
// metadata is ready or ii) there is no header metadata or iii) the stream is
// canceled/expired.
func ( *ClientStream) () (metadata.MD, error) {
	.waitOnHeader()

	if !.headerValid || .noHeaders {
		return nil, .status.Err()
	}

	return .header.Copy(), nil
}

// TrailersOnly blocks until a header or trailers-only frame is received and
// then returns true if the stream was trailers-only.  If the stream ends
// before headers are received, returns true, nil.
func ( *ClientStream) () bool {
	.waitOnHeader()
	return .noHeaders
}

// Status returns the status received from the server.
// Status can be read safely only after the stream has ended,
// that is, after Done() is closed.
func ( *ClientStream) () *status.Status {
	return .status
}