/* * * Copyright 2014 gRPC authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. * */package transportimport ()// writeQuota is a soft limit on the amount of data a stream can// schedule before some of it is written out.typewriteQuotastruct {quotaint32// get waits on read from when quota goes less than or equal to zero. // replenish writes on it when quota goes positive again.chchanstruct{}// done is triggered in error case.done <-chanstruct{}// replenish is called by loopyWriter to give quota back to. // It is implemented as a field so that it can be updated // by tests.replenishfunc(n int)}func ( int32, <-chanstruct{}) *writeQuota { := &writeQuota{quota: ,ch: make(chanstruct{}, 1),done: , } .replenish = .realReplenishreturn}func ( *writeQuota) ( int32) error {for {ifatomic.LoadInt32(&.quota) > 0 {atomic.AddInt32(&.quota, -)returnnil }select {case<-.ch:continuecase<-.done:returnerrStreamDone } }}func ( *writeQuota) ( int) { := int32() := atomic.AddInt32(&.quota, ) := - if <= 0 && > 0 {select {case .ch<-struct{}{}:default: } }}typetrInFlowstruct {limituint32unackeduint32effectiveWindowSizeuint32}func ( *trInFlow) ( uint32) uint32 { := - .limit .limit = .updateEffectiveWindowSize()return}func ( *trInFlow) ( uint32) uint32 { .unacked += if .unacked < .limit/4 { .updateEffectiveWindowSize()return0 }return .reset()}func ( *trInFlow) () uint32 { := .unacked .unacked = 0 .updateEffectiveWindowSize()return}func ( *trInFlow) () {atomic.StoreUint32(&.effectiveWindowSize, .limit-.unacked)}func ( *trInFlow) () uint32 {returnatomic.LoadUint32(&.effectiveWindowSize)}// TODO(mmukhi): Simplify this code.// inFlow deals with inbound flow controltypeinFlowstruct {musync.Mutex// The inbound flow control limit for pending data.limituint32// pendingData is the overall data which have been received but not been // consumed by applications.pendingDatauint32// The amount of data the application has consumed but grpc has not sent // window update for them. Used to reduce window update frequency.pendingUpdateuint32// delta is the extra window update given by receiver when an application // is reading data bigger in size than the inFlow limit.deltauint32}// newLimit updates the inflow window to a new value n.// It assumes that n is always greater than the old limit.func ( *inFlow) ( uint32) { .mu.Lock() .limit = .mu.Unlock()}func ( *inFlow) ( uint32) uint32 {if > uint32(math.MaxInt32) { = uint32(math.MaxInt32) } .mu.Lock()defer .mu.Unlock()// estSenderQuota is the receiver's view of the maximum number of bytes the sender // can send without a window update. := int32(.limit - (.pendingData + .pendingUpdate))// estUntransmittedData is the maximum number of bytes the sends might not have put // on the wire yet. A value of 0 or less means that we have already received all or // more bytes than the application is requesting to read. := int32( - .pendingData) // Casting into int32 since it could be negative.// This implies that unless we send a window update, the sender won't be able to send all the bytes // for this message. Therefore we must send an update over the limit since there's an active read // request from the application.if > {// Sender's window shouldn't go more than 2^31 - 1 as specified in the HTTP spec.if .limit+ > maxWindowSize { .delta = maxWindowSize - .limit } else {// Send a window update for the whole message and not just the difference between // estUntransmittedData and estSenderQuota. This will be helpful in case the message // is padded; We will fallback on the current available window(at least a 1/4th of the limit). .delta = }return .delta }return0}// onData is invoked when some data frame is received. It updates pendingData.func ( *inFlow) ( uint32) error { .mu.Lock() .pendingData += if .pendingData+.pendingUpdate > .limit+.delta { := .limit := .pendingData + .pendingUpdate .mu.Unlock()returnfmt.Errorf("received %d-bytes data exceeding the limit %d bytes", , ) } .mu.Unlock()returnnil}// onRead is invoked when the application reads the data. It returns the window size// to be sent to the peer.func ( *inFlow) ( uint32) uint32 { .mu.Lock()if .pendingData == 0 { .mu.Unlock()return0 } .pendingData -= if > .delta { -= .delta .delta = 0 } else { .delta -= = 0 } .pendingUpdate += if .pendingUpdate >= .limit/4 { := .pendingUpdate .pendingUpdate = 0 .mu.Unlock()return } .mu.Unlock()return0}
The pages are generated with Goldsv0.7.6. (GOOS=linux GOARCH=amd64)