package transport
import (
)
// Constructor for serverHandlerTransport: validates an incoming net/http
// request as a gRPC call and, when acceptable, returns a transport wrapping
// the ResponseWriter/Request pair together with the supplied stats handlers.
//
// NOTE(review): the function name, parameter names and local names are missing
// in this view of the file; the descriptions below are inferred from the
// selectors, literals and call targets that remain — confirm against the
// original source.
//
// On every rejection an HTTP error is written with http.Error and the function
// returns a nil transport plus a non-nil error:
//   - ProtoMajor other than 2                     -> 400 Bad Request
//   - Method other than "POST"                    -> 400 Bad Request
//   - Content-Type refused by ContentSubtype      -> 415 Unsupported Media Type
//   - http.Flusher type assertion fails           -> 500 Internal Server Error
//   - grpc-timeout fails decodeTimeout            -> 400 Bad Request
//   - a header value fails decodeMetadataHeader   -> 400 Bad Request
func ( http.ResponseWriter, *http.Request, []stats.Handler) (ServerTransport, error) {
// Only HTTP/2 requests are accepted.
if .ProtoMajor != 2 {
:= "gRPC requires HTTP/2"
http.Error(, , http.StatusBadRequest)
return nil, errors.New()
}
// Only POST is accepted.
if .Method != "POST" {
:= fmt.Sprintf("invalid gRPC request method %q", .Method)
http.Error(, , http.StatusBadRequest)
return nil, errors.New()
}
// The Content-Type header is handed to grpcutil.ContentSubtype; its second
// result decides whether the content type is valid for gRPC.
:= .Header.Get("Content-Type")
, := grpcutil.ContentSubtype()
if ! {
:= fmt.Sprintf("invalid gRPC request content-type %q", )
http.Error(, , http.StatusUnsupportedMediaType)
return nil, errors.New()
}
// The write paths in this file call rw.(http.Flusher).Flush() without a
// checked assertion, so the capability is verified once here.
if , := .(http.Flusher); ! {
:= "gRPC requires a ResponseWriter supporting http.Flusher"
http.Error(, , http.StatusInternalServerError)
return nil, errors.New()
}
// closedCh and writes are both unbuffered channels.
:= &serverHandlerTransport{
rw: ,
req: ,
closedCh: make(chan struct{}),
writes: make(chan func()),
contentType: ,
contentSubtype: ,
stats: ,
}
// An optional grpc-timeout header is decoded and recorded on the transport.
if := .Header.Get("grpc-timeout"); != "" {
, := decodeTimeout()
if != nil {
:= fmt.Sprintf("malformed grpc-timeout: %v", )
http.Error(, , http.StatusBadRequest)
// NOTE(review): the HTTP reply is 400 while the returned status uses
// codes.Internal — confirm the mismatch is intended.
return nil, status.Error(codes.Internal, )
}
.timeoutSet = true
.timeout =
}
// Build a flat key/value list for metadata.Pairs, starting with the
// content-type entry and, when the request has a Host, an ":authority" entry.
:= []string{"content-type", }
if .Host != "" {
= append(, ":authority", .Host)
}
// Copy request headers into the list: keys are lower-cased, and entries for
// which isReservedHeader is true are skipped unless isWhitelistedHeader is
// also true.
for , := range .Header {
= strings.ToLower()
if isReservedHeader() && !isWhitelistedHeader() {
continue
}
for , := range {
// Each value goes through decodeMetadataHeader; a decode failure
// rejects the whole request.
, := decodeMetadataHeader(, )
if != nil {
:= fmt.Sprintf("malformed binary metadata %q in header %q: %v", , , )
http.Error(, , http.StatusBadRequest)
return nil, status.Error(codes.Internal, )
}
= append(, , )
}
}
.headerMD = metadata.Pairs(...)
return , nil
}
// serverHandlerTransport is the transport state built around a single
// net/http request/response pair. The constructor in this file returns a
// pointer to it as a ServerTransport.
type serverHandlerTransport struct {
// rw is the response writer; the write paths in this file assert it to
// http.Flusher.
rw http.ResponseWriter
// req is the incoming HTTP request.
req *http.Request
// timeoutSet is true when the request carried a non-empty, well-formed
// grpc-timeout header; timeout then holds the decoded duration.
timeoutSet bool
timeout time.Duration
// headerMD is the metadata assembled from the request headers by the
// constructor.
headerMD metadata.MD
// closeOnce guards the single close of closedCh.
closeOnce sync.Once
// closedCh is closed when the transport is closed; senders and the write
// loop select on it to stop.
closedCh chan struct{}
// writes is an unbuffered channel of functions; they are received and
// executed one at a time by the write loop in this file.
writes chan func()
// writeStatusMu is held for the whole duration of the status-writing method.
writeStatusMu sync.Mutex
// contentType is written back as the response Content-Type header.
contentType string
// contentSubtype is copied into the Stream created for the request.
contentSubtype string
// stats are the handlers notified of RPC events (TagRPC / HandleRPC).
stats []stats.Handler
}
// Close-style method taking an error argument: closes closedCh exactly once
// (guarded by closeOnce), logging first when verbose logging at logLevel is
// enabled. Repeated calls are no-ops because of sync.Once.
// NOTE(review): the method name and the Infof argument are missing in this
// view; presumably the error parameter is what gets logged — confirm.
func ( *serverHandlerTransport) ( error) {
.closeOnce.Do(func() {
if logger.V(logLevel) {
logger.Infof("Closing serverHandlerTransport: %v", )
}
close(.closedCh)
})
}
// Returns the request's RemoteAddr string wrapped in strAddr so that it can be
// handed out as a net.Addr.
func ( *serverHandlerTransport) () net.Addr { return strAddr(.req.RemoteAddr) }
// strAddr is a string-backed address type. It is returned as a net.Addr
// elsewhere in this file, so the two string-returning methods below are
// presumably Network and String (their names are missing in this view).
type strAddr string
// Returns "tcp" when the compared value is non-empty, otherwise "".
// NOTE(review): the compared operand is missing here; presumably it is the
// receiver — confirm.
func ( strAddr) () string {
if != "" {
return "tcp"
}
return ""
}
// Returns a plain string conversion (presumably of the receiver).
func ( strAddr) () string { return string() }
// Hands a function to the write loop by sending it on the writes channel.
//
// Returns ErrConnClosing if closedCh is already closed (or is closed while the
// send is blocked), and nil once the send has completed. Because writes is
// unbuffered, a nil return means a receiver has accepted the function, not
// that the function has finished running.
func ( *serverHandlerTransport) ( func()) error {
select {
case <-.closedCh:
return ErrConnClosing
case .writes <- :
return nil
}
}
// Writes the final RPC status for a stream as HTTP trailers and then closes
// the transport.
//
// The whole method runs under writeStatusMu. The actual header manipulation is
// scheduled through do(), so it executes on the goroutine that drains the
// writes channel. After scheduling, stats handlers receive an OutTrailer event
// if the nil check below passes, and the transport's Close method is called
// with "finished writing status".
// NOTE(review): identifiers are missing in this view; the value checked against
// nil and returned is presumably the error from do() — confirm.
func ( *serverHandlerTransport) ( *Stream, *status.Status) error {
.writeStatusMu.Lock()
defer .writeStatusMu.Unlock()
:= .updateHeaderSent()
:= .do(func() {
// Emit the pending response headers first if they have not gone out yet.
if ! {
.writePendingHeaders()
}
// Flush before any trailer value is set, so that whatever headers exist
// are sent ahead of the Grpc-* values written below.
.rw.(http.Flusher).Flush()
:= .rw.Header()
.Set("Grpc-Status", fmt.Sprintf("%d", .Code()))
// Grpc-Message is only set for a non-empty message.
if := .Message(); != "" {
.Set("Grpc-Message", encodeGrpcMessage())
}
// Status details are only attached when the status proto carries Details.
if := .Proto(); != nil && len(.Details) > 0 {
, := proto.Marshal()
if != nil {
// A marshal failure here panics rather than returning an error.
panic()
}
.Set("Grpc-Status-Details-Bin", encodeBinHeader())
}
// Stream trailer metadata: keys for which isReservedHeader is true are
// skipped; the rest are added under http2.TrailerPrefix.
if := .Trailer(); len() > 0 {
for , := range {
if isReservedHeader() {
continue
}
for , := range {
.Add(http2.TrailerPrefix+, encodeMetadataHeader(, ))
}
}
}
})
if == nil {
for , := range .stats {
.HandleRPC(.Context(), &stats.OutTrailer{
Trailer: .trailer.Copy(),
})
}
}
// The transport is closed whether or not scheduling succeeded.
.Close(errors.New("finished writing status"))
return
}
// Populates the response headers for a stream by calling writeCommonHeaders
// followed by writeCustomHeaders. Callers in this file invoke it only when
// updateHeaderSent reported false.
func ( *serverHandlerTransport) ( *Stream) {
.writeCommonHeaders()
.writeCustomHeaders()
}
// Sets the response headers that do not depend on stream metadata:
// Content-Type (from the transport's contentType), the three Trailer
// announcements for the Grpc-* status keys, and Grpc-Encoding when the
// stream's sendCompress is non-empty.
func ( *serverHandlerTransport) ( *Stream) {
:= .rw.Header()
// A nil value for "Date" suppresses net/http's automatic Date header.
["Date"] = nil
.Set("Content-Type", .contentType)
// Announce the trailer keys up front; their values are set later by the
// status-writing method.
.Add("Trailer", "Grpc-Status")
.Add("Trailer", "Grpc-Message")
.Add("Trailer", "Grpc-Status-Details-Bin")
if .sendCompress != "" {
.Set("Grpc-Encoding", .sendCompress)
}
}
// Copies the stream's header metadata into the HTTP response headers.
// The iteration runs with hdrMu held. Keys for which isReservedHeader is true
// are skipped, and each value is passed through encodeMetadataHeader before
// being added.
func ( *serverHandlerTransport) ( *Stream) {
:= .rw.Header()
.hdrMu.Lock()
for , := range .header {
if isReservedHeader() {
continue
}
for , := range {
.Add(, encodeMetadataHeader(, ))
}
}
.hdrMu.Unlock()
}
// Writes one message to the response: schedules, via do(), a function that
// emits pending headers if needed, writes two byte slices to the
// ResponseWriter in order, and flushes.
//
// Returns whatever do() returns, i.e. ErrConnClosing when the transport is
// already closed.
// NOTE(review): the two []byte parameters are unnamed in this view; presumably
// a message header followed by the payload — confirm. The results of both
// rw.Write calls are discarded, so write failures are not reported here.
func ( *serverHandlerTransport) ( *Stream, []byte, []byte, *Options) error {
:= .updateHeaderSent()
return .do(func() {
if ! {
.writePendingHeaders()
}
.rw.Write()
.rw.Write()
.rw.(http.Flusher).Flush()
})
}
// Sends the response headers for a stream.
//
// The supplied metadata is first passed to SetHeader; an error from that call
// is returned immediately. Otherwise a function is scheduled via do() that
// emits pending headers if needed, writes HTTP status 200 and flushes. If the
// nil check after scheduling passes, each stats handler receives an OutHeader
// event with a copy of the header metadata and the stream's sendCompress value.
// NOTE(review): identifiers are missing in this view; the value checked against
// nil and returned is presumably the error from do() — confirm.
func ( *serverHandlerTransport) ( *Stream, metadata.MD) error {
if := .SetHeader(); != nil {
return
}
:= .updateHeaderSent()
:= .do(func() {
if ! {
.writePendingHeaders()
}
.rw.WriteHeader(200)
.rw.(http.Flusher).Flush()
})
if == nil {
for , := range .stats {
.HandleRPC(.Context(), &stats.OutHeader{
Header: .Copy(),
Compression: .sendCompress,
})
}
}
return
}
// Entry point that serves the single RPC carried by this transport's HTTP
// request. It blocks until the write loop (runStream) has returned and the
// body-reading goroutine has exited.
//
// Visible sequence:
//  1. derive a context from the request context (with a timeout when
//     timeoutSet, otherwise cancellable);
//  2. start a watcher goroutine that calls the transport's Close method;
//  3. build the Stream, attach peer info and incoming metadata to its context,
//     and notify stats handlers (TagRPC, then HandleRPC with InHeader);
//  4. start a goroutine that copies the request body into the stream's
//     receive buffer;
//  5. invoke a callback, then run the write loop;
//  6. close the request body and wait for the reader goroutine.
//
// NOTE(review): parameter and local names are missing in this view. The first
// func(*Stream) parameter is presumably the stream handler invoked in step 5
// and the second a trace-context hook that does not appear to be called in
// this body — confirm.
func ( *serverHandlerTransport) ( func(*Stream), func(context.Context, string) context.Context) {
:= .req.Context()
var context.CancelFunc
if .timeoutSet {
, = context.WithTimeout(, .timeout)
} else {
, = context.WithCancel()
}
// Signal channel; presumably the one closed after runStream returns below.
:= make(chan struct{})
// Watcher: wakes on the first of three events (a local channel, transport
// close, or request-context cancellation), then calls a function (presumably
// the cancel func) and closes the transport.
go func() {
select {
case <-:
case <-.closedCh:
case <-.req.Context().Done():
}
()
.Close(errors.New("request is done processing"))
}()
:= .req
// The stream id is 0 and requestRead is a no-op func.
:= &Stream{
id: 0,
requestRead: func(int) {},
cancel: ,
buf: newRecvBuffer(),
st: ,
method: .URL.Path,
recvCompress: .Header.Get("grpc-encoding"),
contentSubtype: .contentSubtype,
}
:= &peer.Peer{
Addr: .RemoteAddr(),
}
// TLS connection state is exposed as peer AuthInfo when the request has it.
if .TLS != nil {
.AuthInfo = credentials.TLSInfo{State: *.TLS, CommonAuthInfo: credentials.CommonAuthInfo{SecurityLevel: credentials.PrivacyAndIntegrity}}
}
= metadata.NewIncomingContext(, .headerMD)
.ctx = peer.NewContext(, )
// Each stats handler may replace the stream context through TagRPC before
// receiving the InHeader event.
for , := range .stats {
.ctx = .TagRPC(.ctx, &stats.RPCTagInfo{FullMethodName: .method})
:= &stats.InHeader{
FullMethod: .method,
RemoteAddr: .RemoteAddr(),
Compression: .recvCompress,
}
.HandleRPC(.ctx, )
}
// freeBuffer and windowHandler are no-op funcs here.
.trReader = &transportReader{
reader: &recvBufferReader{ctx: .ctx, ctxDone: .ctx.Done(), recv: .buf, freeBuffer: func(*bytes.Buffer) {}},
windowHandler: func(int) {},
}
// Closed by the reader goroutine on exit; waited on at the end of this method.
:= make(chan struct{})
go func() {
defer close()
// NOTE(review): 8196 looks like it may be intended as 8192 — confirm.
const = 8196
for := make([]byte, ); ; {
, := .Body.Read()
if > 0 {
// Bytes read are handed to the receive buffer using a three-index
// slice expression; the scratch slice is then re-sliced.
.buf.put(recvMsg{buffer: bytes.NewBuffer([::])})
= [:]
}
if != nil {
// Any read error, mapped through mapRecvMsgError, is queued for
// the consumer and ends this goroutine.
.buf.put(recvMsg{err: mapRecvMsgError()})
return
}
// A fresh chunk is allocated once the length check below hits zero.
if len() == 0 {
= make([]byte, )
}
}
}()
()
// Run queued write functions on this goroutine until the transport is closed.
.runStream()
close()
// Close the body, then wait for the reader goroutine to finish.
.Body.Close()
<-
}
// Write loop: repeatedly receives a function from the writes channel and runs
// it on the calling goroutine, returning once closedCh is closed. This is the
// receiving side of do().
func ( *serverHandlerTransport) () {
for {
select {
case := <-.writes:
()
case <-.closedCh:
return
}
}
}
// Two intentionally empty methods follow.
// NOTE(review): their names are missing in this view; presumably they exist
// only to satisfy the ServerTransport interface — confirm.
func ( *serverHandlerTransport) () {}
func ( *serverHandlerTransport) () {}
// Unsupported on this transport: always panics. The panic message identifies
// the method as Drain().
func ( *serverHandlerTransport) () {
panic("Drain() is not implemented")
}
// Converts an error from reading the request body into the error queued for
// the stream consumer. Cases are checked in order:
//   - io.EOF / io.ErrUnexpectedEOF: returned from the first branch (presumably
//     unchanged; the returned identifier is missing in this view);
//   - an http2.StreamError whose Code is present in http2ErrConvTab: a status
//     error built from the mapped value and the original message;
//   - a message containing "body closed by handler": a codes.Canceled status;
//   - anything else: connectionErrorf with its first argument set to true.
func ( error) error {
if == io.EOF || == io.ErrUnexpectedEOF {
return
}
if , := .(http2.StreamError); {
if , := http2ErrConvTab[.Code]; {
return status.Error(, .Error())
}
}
// NOTE(review): matching on message text is fragile; it depends on the exact
// wording produced by the HTTP/2 server implementation.
if strings.Contains(.Error(), "body closed by handler") {
return status.Error(codes.Canceled, .Error())
}
return connectionErrorf(true, , .Error())
}