/*
 *
 * Copyright 2024 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

package transport

import (
	
	
	
	
	

	
	
	
)

// ServerStream implements streaming functionality for a gRPC server.
//
// It embeds *Stream for the functionality common to all streams and adds the
// server-side state declared below: the transport that writes are delegated
// to, end-of-stream cancellation, the compressors advertised by the client,
// and the bookkeeping for outgoing header metadata.
type ServerStream struct {
	*Stream // Embed for common stream functionality.

	// st is the transport the methods of ServerStream delegate to for writing
	// headers, data and status.
	st      internalServerTransport
	ctxDone <-chan struct{} // closed at the end of stream.  Cache of ctx.Done() (for performance)
	// cancel is invoked at the end of stream to cancel ctx. It also stops the
	// timer for monitoring the rpc deadline if configured.
	cancel func()

	// Holds compressor names passed in grpc-accept-encoding metadata from the
	// client. Stored as the raw comma-separated string; it is split and
	// trimmed on demand by the accessor method.
	clientAdvertisedCompressors string
	// headerWireLength is the size of the stream's headers as received from
	// the wire.
	headerWireLength            int

	// hdrMu protects outgoing header and trailer metadata.
	hdrMu      sync.Mutex
	header     metadata.MD // the outgoing header metadata.  Updated by WriteHeader.
	headerSent atomic.Bool // atomically set when the headers are sent out.
}

// Read reads an n byte message from the input stream.
//
// The read itself is performed by the embedded Stream's read method. After
// the read, st.incrMsgRecv is invoked behind a nil check, and the read's
// results are then returned to the caller.
func ( *ServerStream) ( int) (mem.BufferSlice, error) {
	,  := .Stream.read()
	if  == nil {
		.st.incrMsgRecv()
	}
	return , 
}

// SendHeader sends the header metadata for the given stream.
//
// The call is forwarded to st.writeHeader and the error it returns is passed
// back to the caller unchanged.
func ( *ServerStream) ( metadata.MD) error {
	return .st.writeHeader(, )
}

// Write writes the hdr and data bytes to the output stream.
//
// It takes a []byte, a mem.BufferSlice and a *WriteOptions, forwards the call
// to st.write, and returns that call's error unchanged.
func ( *ServerStream) ( []byte,  mem.BufferSlice,  *WriteOptions) error {
	return .st.write(, , , )
}

// WriteStatus sends the status of a stream to the client.  WriteStatus is
// the final call made on a stream and always occurs.
//
// The call is forwarded to st.writeStatus and the error it returns is passed
// back to the caller unchanged.
func ( *ServerStream) ( *status.Status) error {
	return .st.writeStatus(, )
}

// isHeaderSent indicates whether headers have been sent. It is an atomic load
// of the headerSent flag and takes no lock.
func ( *ServerStream) () bool {
	return .headerSent.Load()
}

// updateHeaderSent updates headerSent and returns true
// if it was already set.
//
// The store and the read of the previous value happen in one atomic Swap, so
// a caller that gets false back is the one that flipped the flag.
func ( *ServerStream) () bool {
	return .headerSent.Swap(true)
}

// RecvCompress returns the compression algorithm applied to the inbound
// message. It is empty string if there is no compression applied.
//
// This is a plain read of the recvCompress field.
func ( *ServerStream) () string {
	return .recvCompress
}

// SendCompress returns the send compressor name, i.e. the current value of
// the sendCompress field.
func ( *ServerStream) () string {
	return .sendCompress
}

// ContentSubtype returns the content-subtype for a request. For example, a
// content-subtype of "proto" will result in a content-type of
// "application/grpc+proto". This will always be lowercase.  See
// https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#requests for
// more details.
//
// This is a plain read of the contentSubtype field; no normalization is done
// here.
func ( *ServerStream) () string {
	return .contentSubtype
}

// SetSendCompress sets the compression algorithm to the stream.
//
// It returns an error, and leaves sendCompress untouched, if headers have
// already been sent or the stream state is streamDone. Otherwise the given
// name is stored in sendCompress as-is and nil is returned.
func ( *ServerStream) ( string) error {
	// Once headers are out or the stream is done, the compressor can no
	// longer be changed.
	if .isHeaderSent() || .getState() == streamDone {
		return errors.New("transport: set send compressor called after headers sent or stream done")
	}

	.sendCompress = 
	return nil
}

// SetContext sets the context of the stream. This will be deleted once the
// stats handler callouts all move to gRPC layer.
//
// The ctx field is overwritten directly; no other field is updated here.
func ( *ServerStream) ( context.Context) {
	.ctx = 
}

// ClientAdvertisedCompressors returns the compressor names advertised by the
// client via grpc-accept-encoding header.
//
// The stored clientAdvertisedCompressors string is split on "," and
// strings.TrimSpace is applied inside the loop over the pieces.
//
// NOTE(review): strings.Split returns a one-element slice holding "" when its
// input is empty, so a stream with no advertised compressors presumably
// yields [""] rather than an empty slice — confirm callers tolerate that.
func ( *ServerStream) () []string {
	 := strings.Split(.clientAdvertisedCompressors, ",")
	for ,  := range  {
		[] = strings.TrimSpace()
	}
	return 
}

// Header returns the header metadata of the stream.  It returns the out header
// after t.WriteHeader is called.  It does not block and must not be called
// until after WriteHeader.
//
// The returned value is a copy made with header.Copy(). The error result is
// always nil.
//
// NOTE(review): header is read here without holding hdrMu, whereas SetHeader
// replaces it under hdrMu. This presumably relies on the "only after
// WriteHeader" requirement above — confirm.
func ( *ServerStream) () (metadata.MD, error) {
	// Return the header in stream. It will be the out
	// header after t.WriteHeader is called.
	return .header.Copy(), nil
}

// HeaderWireLength returns the size of the headers of the stream as received
// from the wire. It is a plain read of the headerWireLength field.
func ( *ServerStream) () int {
	return .headerWireLength
}

// SetHeader sets the header metadata. This can be called multiple times.
// This should not be called in parallel to other data writes.
//
// A call whose Len() check reports zero is a no-op that returns nil.
// ErrIllegalHeaderWrite is returned if headers have already been sent or the
// stream state is streamDone. Otherwise the new metadata is merged into the
// existing header with metadata.Join while hdrMu is held.
func ( *ServerStream) ( metadata.MD) error {
	if .Len() == 0 {
		return nil
	}
	// The sent/done check is made before hdrMu is taken; only the update of
	// header itself is done under the lock.
	if .isHeaderSent() || .getState() == streamDone {
		return ErrIllegalHeaderWrite
	}
	.hdrMu.Lock()
	.header = metadata.Join(.header, )
	.hdrMu.Unlock()
	return nil
}

// SetTrailer sets the trailer metadata which will be sent with the RPC status
// by the server. This can be called multiple times.
// This should not be called in parallel to other data writes.
//
// A call whose Len() check reports zero is a no-op that returns nil.
// ErrIllegalHeaderWrite is returned if the stream state is streamDone.
// Otherwise the new metadata is merged into the existing trailer with
// metadata.Join while hdrMu is held.
func ( *ServerStream) ( metadata.MD) error {
	if .Len() == 0 {
		return nil
	}
	// Only a finished stream is rejected here; there is no isHeaderSent check
	// in this method.
	if .getState() == streamDone {
		return ErrIllegalHeaderWrite
	}
	.hdrMu.Lock()
	.trailer = metadata.Join(.trailer, )
	.hdrMu.Unlock()
	return nil
}