package retry
import (
	"math"
	"sync"
	"time"

	// NOTE(review): the package that provides sdk.NowTime must also be
	// imported here; its import path is not visible in this file.
)
// adaptiveRateLimit holds the state of a rate limiter whose token-bucket fill
// rate is recomputed from the measured request rate and from throttle
// feedback.
type adaptiveRateLimit struct {
	// tokenBucketEnabled gates token acquisition; while false, acquisition
	// always succeeds without touching the bucket.
	tokenBucketEnabled bool

	// Tuning constants.

	// smooth is the weight given to the newest sample when measuredTxRate is
	// updated; the previous value keeps weight 1-smooth.
	smooth float64
	// beta is the factor the rate is multiplied by when a throttle is seen.
	beta float64
	// scaleConstant is the coefficient of the cubic term of the recovery curve.
	scaleConstant float64
	// minFillRate is the lower bound applied to fillRate.
	minFillRate float64

	// Running state.

	// fillRate is the number of tokens credited to the bucket per second.
	fillRate float64
	// calculatedRate is the most recent output of the cubic throttle/success
	// computation.
	calculatedRate float64
	// lastRefilled is when the bucket was last refilled; zero until the first
	// refill call.
	lastRefilled time.Time
	// measuredTxRate is the smoothed observed request rate, in requests per
	// second.
	measuredTxRate float64
	// lastTxRateBucket is the timestamp, in seconds, of the last time bucket
	// at which measuredTxRate was updated.
	lastTxRateBucket float64
	// requestCount counts requests seen since lastTxRateBucket.
	requestCount int64
	// lastMaxRate is the rate that was in effect when the last throttle was
	// recorded.
	lastMaxRate float64
	// lastThrottleTime is when the last throttle was recorded.
	lastThrottleTime time.Time
	// timeWindow is the offset, in seconds, subtracted from the elapsed time
	// in the cubic recovery curve.
	timeWindow float64

	// tokenBucket stores the tokens handed out to callers.
	tokenBucket *adaptiveTokenBucket

	// mu is locked by the entry-point methods before they touch the other
	// fields.
	mu sync.Mutex
}
// NOTE(review): the function name and the local that receives sdk.NowTime()
// are missing from this definition as it appears in the file, so it does not
// parse as Go; restore them before building.
//
// The function returns a new *adaptiveRateLimit with its tuning constants
// preset (smooth 0.8, beta 0.7, scaleConstant 0.4, minFillRate 0.5) and a
// token bucket created with newAdaptiveTokenBucket(0). Fields not listed in
// the literal (tokenBucketEnabled, fillRate, lastRefilled, ...) keep their
// zero values.
func () *adaptiveRateLimit {
// Capture the current time from the sdk package.
:= sdk.NowTime()
return &adaptiveRateLimit{
smooth: 0.8,
beta: 0.7,
scaleConstant: 0.4,
minFillRate: 0.5,
// Whole-second floor of a timestamp. The argument is missing here
// (presumably the time captured above; confirm).
lastTxRateBucket: math.Floor(timeFloat64Seconds()),
// The value is missing here (presumably the time captured above; confirm).
lastThrottleTime: ,
tokenBucket: newAdaptiveTokenBucket(0),
}
}
// NOTE(review): the method name, receiver name and parameter name are missing
// from this definition as it appears in the file, so it does not parse as Go;
// restore them before building.
//
// The method takes one bool and, while holding mu, assigns to
// tokenBucketEnabled. The right-hand side of the assignment is missing
// (presumably the bool parameter; confirm).
func ( *adaptiveRateLimit) ( bool) {
.mu.Lock()
defer .mu.Unlock()
.tokenBucketEnabled =
}
// NOTE(review): the method name, receiver name, parameter name, result names
// (if any) and local variable names are missing from this definition as it
// appears in the file, so it does not parse as Go; restore them before
// building.
//
// The method takes a uint and returns a bool and a time.Duration. With mu
// held it returns (true, 0) immediately when tokenBucketEnabled is false.
// Otherwise it refills the bucket and calls tokenBucket.Retrieve; on failure
// it returns false together with a duration, on success (true, 0).
func ( *adaptiveRateLimit) ( uint) (
bool, time.Duration,
) {
.mu.Lock()
defer .mu.Unlock()
// Rate limiting is off: grant the request without consulting the bucket.
if !.tokenBucketEnabled {
return true, 0
}
// Credit tokens accrued since the last refill before trying to take any.
.tokenBucketRefill()
// Retrieve yields two values; the names receiving them are missing. The
// argument to float64() is missing as well (presumably the uint parameter;
// confirm).
, := .tokenBucket.Retrieve(float64())
if ! {
// Duration computed as (float64(x) - y) / fillRate seconds. Presumably x
// is the requested amount and y the first value returned by Retrieve, so
// this is the time needed to accrue the shortfall at the current fill
// rate (confirm).
:= float64Seconds((float64() - ) / .fillRate)
return false,
}
return true, 0
}
// NOTE(review): the method name, receiver name, parameter name and local
// variable names are missing from this definition as it appears in the file,
// so it does not parse as Go; restore them before building.
//
// The method takes one bool and, with mu held, recomputes the limiter's rate.
// It first updates the measured request rate, then recomputes calculatedRate
// in one of two branches, and finally calls tokenBucketUpdateRate.
func ( *adaptiveRateLimit) ( bool) {
.mu.Lock()
defer .mu.Unlock()
// Count this call and refresh measuredTxRate if a new time bucket started.
.updateMeasuredRate()
// The condition operand is missing (presumably the bool parameter, true
// meaning the request was throttled; confirm).
if {
// Start from the measured rate; when the token bucket is already enabled,
// cap it at the current fill rate.
:= .measuredTxRate
if .tokenBucketEnabled {
= math.Min(.measuredTxRate, .fillRate)
}
// The right-hand side is missing (presumably the rate chosen above;
// confirm).
.lastMaxRate =
.calculateTimeWindow()
.lastThrottleTime = sdk.NowTime()
// The argument is missing (presumably the rate chosen above; confirm).
// cubicThrottle multiplies its argument by beta.
.calculatedRate = .cubicThrottle()
// This branch turns the token bucket on.
.tokenBucketEnabled = true
} else {
// Recompute the rate from the cubic curve using the time elapsed since
// lastThrottleTime.
.calculateTimeWindow()
.calculatedRate = .cubicSuccess(sdk.NowTime())
}
// The new rate is the smaller of calculatedRate and twice measuredTxRate.
// The name receiving it and the argument on the next line are missing
// (presumably the same value; confirm).
:= math.Min(.calculatedRate, 2*.measuredTxRate)
.tokenBucketUpdateRate()
}
func ( *adaptiveRateLimit) ( time.Time) float64 {
:= secondsFloat64(.Sub(.lastThrottleTime))
return (.scaleConstant * math.Pow(-.timeWindow, 3)) + .lastMaxRate
}
func ( *adaptiveRateLimit) ( float64) float64 {
return * .beta
}
func ( *adaptiveRateLimit) () {
.timeWindow = math.Pow((.lastMaxRate*(1.-.beta))/.scaleConstant, 1./3.)
}
func ( *adaptiveRateLimit) ( float64) {
.tokenBucketRefill()
.fillRate = math.Max(, .minFillRate)
.tokenBucket.Resize()
}
func ( *adaptiveRateLimit) () {
:= sdk.NowTime()
:= math.Floor(timeFloat64Seconds()*2.) / 2.
.requestCount++
if > .lastTxRateBucket {
:= float64(.requestCount) / ( - .lastTxRateBucket)
.measuredTxRate = ( * .smooth) + (.measuredTxRate * (1. - .smooth))
.requestCount = 0
.lastTxRateBucket =
}
}
func ( *adaptiveRateLimit) () {
:= sdk.NowTime()
if .lastRefilled.IsZero() {
.lastRefilled =
return
}
:= secondsFloat64(.Sub(.lastRefilled)) * .fillRate
.tokenBucket.Refund()
.lastRefilled =
}
// float64Seconds converts v, a number of seconds, into a time.Duration.
// Fractions finer than a nanosecond are truncated by the conversion.
func float64Seconds(v float64) time.Duration {
	return time.Duration(v * float64(time.Second))
}
// secondsFloat64 converts the duration v into a number of seconds, keeping
// the fractional part.
func secondsFloat64(v time.Duration) float64 {
	return float64(v) / float64(time.Second)
}
// timeFloat64Seconds returns v as seconds since the Unix epoch, keeping the
// fractional part.
func timeFloat64Seconds(v time.Time) float64 {
	return float64(v.UnixNano()) / float64(time.Second)
}