/*
 *
 * Copyright 2016 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

// This file is the implementation of a gRPC server using HTTP/2 which
// uses the standard Go http2 Server implementation (via the
// http.Handler interface), rather than speaking low-level HTTP/2
// frames itself. It is the implementation of *grpc.Server.ServeHTTP.

package transport

import (
	
	
	
	
	
	
	
	
	
	

	
	
	
	
	
	
	
	
	
)

// NewServerHandlerTransport returns a ServerTransport handling gRPC from
// inside an http.Handler, or writes an HTTP error to w and returns an error.
// It requires that the http Server supports HTTP/2.
func ( http.ResponseWriter,  *http.Request,  []stats.Handler) (ServerTransport, error) {
	if .ProtoMajor != 2 {
		 := "gRPC requires HTTP/2"
		http.Error(, , http.StatusBadRequest)
		return nil, errors.New()
	}
	if .Method != "POST" {
		 := fmt.Sprintf("invalid gRPC request method %q", .Method)
		http.Error(, , http.StatusBadRequest)
		return nil, errors.New()
	}
	 := .Header.Get("Content-Type")
	// TODO: do we assume contentType is lowercase? we did before
	,  := grpcutil.ContentSubtype()
	if ! {
		 := fmt.Sprintf("invalid gRPC request content-type %q", )
		http.Error(, , http.StatusUnsupportedMediaType)
		return nil, errors.New()
	}
	if ,  := .(http.Flusher); ! {
		 := "gRPC requires a ResponseWriter supporting http.Flusher"
		http.Error(, , http.StatusInternalServerError)
		return nil, errors.New()
	}

	 := &serverHandlerTransport{
		rw:             ,
		req:            ,
		closedCh:       make(chan struct{}),
		writes:         make(chan func()),
		contentType:    ,
		contentSubtype: ,
		stats:          ,
	}

	if  := .Header.Get("grpc-timeout");  != "" {
		,  := decodeTimeout()
		if  != nil {
			 := fmt.Sprintf("malformed grpc-timeout: %v", )
			http.Error(, , http.StatusBadRequest)
			return nil, status.Error(codes.Internal, )
		}
		.timeoutSet = true
		.timeout = 
	}

	 := []string{"content-type", }
	if .Host != "" {
		 = append(, ":authority", .Host)
	}
	for ,  := range .Header {
		 = strings.ToLower()
		if isReservedHeader() && !isWhitelistedHeader() {
			continue
		}
		for ,  := range  {
			,  := decodeMetadataHeader(, )
			if  != nil {
				 := fmt.Sprintf("malformed binary metadata %q in header %q: %v", , , )
				http.Error(, , http.StatusBadRequest)
				return nil, status.Error(codes.Internal, )
			}
			 = append(, , )
		}
	}
	.headerMD = metadata.Pairs(...)

	return , nil
}

// serverHandlerTransport is an implementation of ServerTransport
// which replies to exactly one gRPC request (exactly one HTTP request),
// using the net/http.Handler interface. This http.Handler is guaranteed
// at this point to be speaking over HTTP/2, so it's able to speak valid
// gRPC.
type serverHandlerTransport struct {
	rw         http.ResponseWriter
	req        *http.Request
	timeoutSet bool
	timeout    time.Duration

	headerMD metadata.MD

	closeOnce sync.Once
	closedCh  chan struct{} // closed on Close

	// writes is a channel of code to run serialized in the
	// ServeHTTP (HandleStreams) goroutine. The channel is closed
	// when WriteStatus is called.
	writes chan func()

	// block concurrent WriteStatus calls
	// e.g. grpc/(*serverStream).SendMsg/RecvMsg
	writeStatusMu sync.Mutex

	// we just mirror the request content-type
	contentType string
	// we store both contentType and contentSubtype so we don't keep recreating them
	// TODO make sure this is consistent across handler_server and http2_server
	contentSubtype string

	stats []stats.Handler
}

func ( *serverHandlerTransport) ( error) {
	.closeOnce.Do(func() {
		if logger.V(logLevel) {
			logger.Infof("Closing serverHandlerTransport: %v", )
		}
		close(.closedCh)
	})
}

func ( *serverHandlerTransport) () net.Addr { return strAddr(.req.RemoteAddr) }

// strAddr is a net.Addr backed by either a TCP "ip:port" string, or
// the empty string if unknown.
type strAddr string

func ( strAddr) () string {
	if  != "" {
		// Per the documentation on net/http.Request.RemoteAddr, if this is
		// set, it's set to the IP:port of the peer (hence, TCP):
		// https://golang.org/pkg/net/http/#Request
		//
		// If we want to support Unix sockets later, we can
		// add our own grpc-specific convention within the
		// grpc codebase to set RemoteAddr to a different
		// format, or probably better: we can attach it to the
		// context and use that from serverHandlerTransport.RemoteAddr.
		return "tcp"
	}
	return ""
}

func ( strAddr) () string { return string() }

// do runs fn in the ServeHTTP goroutine.
func ( *serverHandlerTransport) ( func()) error {
	select {
	case <-.closedCh:
		return ErrConnClosing
	case .writes <- :
		return nil
	}
}

func ( *serverHandlerTransport) ( *Stream,  *status.Status) error {
	.writeStatusMu.Lock()
	defer .writeStatusMu.Unlock()

	 := .updateHeaderSent()
	 := .do(func() {
		if ! {
			.writePendingHeaders()
		}

		// And flush, in case no header or body has been sent yet.
		// This forces a separation of headers and trailers if this is the
		// first call (for example, in end2end tests's TestNoService).
		.rw.(http.Flusher).Flush()

		 := .rw.Header()
		.Set("Grpc-Status", fmt.Sprintf("%d", .Code()))
		if  := .Message();  != "" {
			.Set("Grpc-Message", encodeGrpcMessage())
		}

		if  := .Proto();  != nil && len(.Details) > 0 {
			,  := proto.Marshal()
			if  != nil {
				// TODO: return error instead, when callers are able to handle it.
				panic()
			}

			.Set("Grpc-Status-Details-Bin", encodeBinHeader())
		}

		if  := .Trailer(); len() > 0 {
			for ,  := range  {
				// Clients don't tolerate reading restricted headers after some non restricted ones were sent.
				if isReservedHeader() {
					continue
				}
				for ,  := range  {
					// http2 ResponseWriter mechanism to send undeclared Trailers after
					// the headers have possibly been written.
					.Add(http2.TrailerPrefix+, encodeMetadataHeader(, ))
				}
			}
		}
	})

	if  == nil { // transport has not been closed
		// Note: The trailer fields are compressed with hpack after this call returns.
		// No WireLength field is set here.
		for ,  := range .stats {
			.HandleRPC(.Context(), &stats.OutTrailer{
				Trailer: .trailer.Copy(),
			})
		}
	}
	.Close(errors.New("finished writing status"))
	return 
}

// writePendingHeaders sets common and custom headers on the first
// write call (Write, WriteHeader, or WriteStatus)
func ( *serverHandlerTransport) ( *Stream) {
	.writeCommonHeaders()
	.writeCustomHeaders()
}

// writeCommonHeaders sets common headers on the first write
// call (Write, WriteHeader, or WriteStatus).
func ( *serverHandlerTransport) ( *Stream) {
	 := .rw.Header()
	["Date"] = nil // suppress Date to make tests happy; TODO: restore
	.Set("Content-Type", .contentType)

	// Predeclare trailers we'll set later in WriteStatus (after the body).
	// This is a SHOULD in the HTTP RFC, and the way you add (known)
	// Trailers per the net/http.ResponseWriter contract.
	// See https://golang.org/pkg/net/http/#ResponseWriter
	// and https://golang.org/pkg/net/http/#example_ResponseWriter_trailers
	.Add("Trailer", "Grpc-Status")
	.Add("Trailer", "Grpc-Message")
	.Add("Trailer", "Grpc-Status-Details-Bin")

	if .sendCompress != "" {
		.Set("Grpc-Encoding", .sendCompress)
	}
}

// writeCustomHeaders sets custom headers set on the stream via SetHeader
// on the first write call (Write, WriteHeader, or WriteStatus).
func ( *serverHandlerTransport) ( *Stream) {
	 := .rw.Header()

	.hdrMu.Lock()
	for ,  := range .header {
		if isReservedHeader() {
			continue
		}
		for ,  := range  {
			.Add(, encodeMetadataHeader(, ))
		}
	}

	.hdrMu.Unlock()
}

func ( *serverHandlerTransport) ( *Stream,  []byte,  []byte,  *Options) error {
	 := .updateHeaderSent()
	return .do(func() {
		if ! {
			.writePendingHeaders()
		}
		.rw.Write()
		.rw.Write()
		.rw.(http.Flusher).Flush()
	})
}

func ( *serverHandlerTransport) ( *Stream,  metadata.MD) error {
	if  := .SetHeader();  != nil {
		return 
	}

	 := .updateHeaderSent()
	 := .do(func() {
		if ! {
			.writePendingHeaders()
		}

		.rw.WriteHeader(200)
		.rw.(http.Flusher).Flush()
	})

	if  == nil {
		for ,  := range .stats {
			// Note: The header fields are compressed with hpack after this call returns.
			// No WireLength field is set here.
			.HandleRPC(.Context(), &stats.OutHeader{
				Header:      .Copy(),
				Compression: .sendCompress,
			})
		}
	}
	return 
}

func ( *serverHandlerTransport) ( func(*Stream),  func(context.Context, string) context.Context) {
	// With this transport type there will be exactly 1 stream: this HTTP request.

	 := .req.Context()
	var  context.CancelFunc
	if .timeoutSet {
		,  = context.WithTimeout(, .timeout)
	} else {
		,  = context.WithCancel()
	}

	// requestOver is closed when the status has been written via WriteStatus.
	 := make(chan struct{})
	go func() {
		select {
		case <-:
		case <-.closedCh:
		case <-.req.Context().Done():
		}
		()
		.Close(errors.New("request is done processing"))
	}()

	 := .req

	 := &Stream{
		id:             0, // irrelevant
		requestRead:    func(int) {},
		cancel:         ,
		buf:            newRecvBuffer(),
		st:             ,
		method:         .URL.Path,
		recvCompress:   .Header.Get("grpc-encoding"),
		contentSubtype: .contentSubtype,
	}
	 := &peer.Peer{
		Addr: .RemoteAddr(),
	}
	if .TLS != nil {
		.AuthInfo = credentials.TLSInfo{State: *.TLS, CommonAuthInfo: credentials.CommonAuthInfo{SecurityLevel: credentials.PrivacyAndIntegrity}}
	}
	 = metadata.NewIncomingContext(, .headerMD)
	.ctx = peer.NewContext(, )
	for ,  := range .stats {
		.ctx = .TagRPC(.ctx, &stats.RPCTagInfo{FullMethodName: .method})
		 := &stats.InHeader{
			FullMethod:  .method,
			RemoteAddr:  .RemoteAddr(),
			Compression: .recvCompress,
		}
		.HandleRPC(.ctx, )
	}
	.trReader = &transportReader{
		reader:        &recvBufferReader{ctx: .ctx, ctxDone: .ctx.Done(), recv: .buf, freeBuffer: func(*bytes.Buffer) {}},
		windowHandler: func(int) {},
	}

	// readerDone is closed when the Body.Read-ing goroutine exits.
	 := make(chan struct{})
	go func() {
		defer close()

		// TODO: minimize garbage, optimize recvBuffer code/ownership
		const  = 8196
		for  := make([]byte, ); ; {
			,  := .Body.Read()
			if  > 0 {
				.buf.put(recvMsg{buffer: bytes.NewBuffer([::])})
				 = [:]
			}
			if  != nil {
				.buf.put(recvMsg{err: mapRecvMsgError()})
				return
			}
			if len() == 0 {
				 = make([]byte, )
			}
		}
	}()

	// startStream is provided by the *grpc.Server's serveStreams.
	// It starts a goroutine serving s and exits immediately.
	// The goroutine that is started is the one that then calls
	// into ht, calling WriteHeader, Write, WriteStatus, Close, etc.
	()

	.runStream()
	close()

	// Wait for reading goroutine to finish.
	.Body.Close()
	<-
}

func ( *serverHandlerTransport) () {
	for {
		select {
		case  := <-.writes:
			()
		case <-.closedCh:
			return
		}
	}
}

func ( *serverHandlerTransport) () {}

func ( *serverHandlerTransport) () {}

func ( *serverHandlerTransport) () {
	panic("Drain() is not implemented")
}

// mapRecvMsgError returns the non-nil err into the appropriate
// error value as expected by callers of *grpc.parser.recvMsg.
// In particular, in can only be:
//   - io.EOF
//   - io.ErrUnexpectedEOF
//   - of type transport.ConnectionError
//   - an error from the status package
func ( error) error {
	if  == io.EOF ||  == io.ErrUnexpectedEOF {
		return 
	}
	if ,  := .(http2.StreamError);  {
		if ,  := http2ErrConvTab[.Code];  {
			return status.Error(, .Error())
		}
	}
	if strings.Contains(.Error(), "body closed by handler") {
		return status.Error(codes.Canceled, .Error())
	}
	return connectionErrorf(true, , .Error())
}