// Copyright 2015 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

// Package timeseries implements a time series structure for stats collection.
package timeseries // import "golang.org/x/net/internal/timeseries"

import (
	"fmt"
	"log"
	"time"
)

const (
	timeSeriesNumBuckets       = 64
	minuteHourSeriesNumBuckets = 60
)

// timeSeriesResolutions lists the bucket duration of each level of a
// TimeSeries, from finest to coarsest. It must be strictly increasing.
var timeSeriesResolutions = []time.Duration{
	1 * time.Second,
	10 * time.Second,
	1 * time.Minute,
	10 * time.Minute,
	1 * time.Hour,
	6 * time.Hour,
	24 * time.Hour,          // 1 day
	7 * 24 * time.Hour,      // 1 week
	4 * 7 * 24 * time.Hour,  // 4 weeks
	16 * 7 * 24 * time.Hour, // 16 weeks
}

// minuteHourSeriesResolutions lists the bucket duration of each level of a
// MinuteHourSeries.
var minuteHourSeriesResolutions = []time.Duration{
	1 * time.Second,
	1 * time.Minute,
}

// An Observable is a kind of data that can be aggregated in a time series.
type Observable interface {
	Multiply(ratio float64)    // Multiplies the data in self by a given ratio
	Add(other Observable)      // Adds the data from a different observation to self
	Clear()                    // Clears the observation so it can be reused.
	CopyFrom(other Observable) // Copies the contents of a given observation to self
}

// Float attaches the methods of Observable to a float64.
type Float float64

// NewFloat returns a Float.
func NewFloat() Observable {
	f := Float(0)
	return &f
}

// String returns the float as a string.
func (f *Float) String() string { return fmt.Sprintf("%g", f.Value()) }

// Value returns the float's value.
func (f *Float) Value() float64 { return float64(*f) }

// Multiply scales the float by ratio.
func (f *Float) Multiply(ratio float64) { *f *= Float(ratio) }

// Add adds other to the float. It panics if other is not a *Float.
func (f *Float) Add(other Observable) {
	o := other.(*Float)
	*f += *o
}

// Clear resets the float to zero.
func (f *Float) Clear() { *f = 0 }

// CopyFrom overwrites the float with the value of other. It panics if other
// is not a *Float.
func (f *Float) CopyFrom(other Observable) {
	o := other.(*Float)
	*f = *o
}

// A Clock tells the current time.
type Clock interface {
	Time() time.Time
}

// defaultClock is the Clock backed by time.Now.
type defaultClock int

var defaultClockInstance defaultClock

// Time returns time.Now().
func (defaultClock) Time() time.Time { return time.Now() }

// Information kept per level. Each level consists of a circular list of
// observations. The start of the level may be derived from end and
// len(buckets) * size.
type tsLevel struct {
	oldest   int               // index to oldest bucketed Observable
	newest   int               // index to newest bucketed Observable
	end      time.Time         // end timestamp for this level
	size     time.Duration     // duration of the bucketed Observable
	buckets  []Observable      // collections of observations
	provider func() Observable // used for creating new Observable
}

// Clear empties the level: every bucket is cleared and dropped, the ring
// indices are rewound and the end timestamp is reset to the zero time.
func (l *tsLevel) Clear() {
	l.oldest = 0
	l.newest = len(l.buckets) - 1
	l.end = time.Time{}
	for i, b := range l.buckets {
		if b != nil {
			b.Clear()
			l.buckets[i] = nil
		}
	}
}

// InitLevel sets the bucket duration and the Observable factory of the
// level and allocates numBuckets empty (nil) buckets.
func (l *tsLevel) InitLevel(size time.Duration, numBuckets int, f func() Observable) {
	l.size = size
	l.provider = f
	l.buckets = make([]Observable, numBuckets)
}

// Keeps a sequence of levels. Each level is responsible for storing data at
// a given resolution. For example, the first level stores data at a one
// minute resolution while the second level stores data at a one hour
// resolution.
//
// Each level is represented by a sequence of buckets. Each bucket spans an
// interval equal to the resolution of the level. New observations are added
// to the last bucket.
type timeSeries struct {
	provider    func() Observable // make more Observable
	numBuckets  int               // number of buckets in each level
	levels      []*tsLevel        // levels of bucketed Observable
	lastAdd     time.Time         // time of last Observable tracked
	total       Observable        // convenient aggregation of all Observable
	clock       Clock             // Clock for getting current time
	pending     Observable        // observations not yet bucketed
	pendingTime time.Time         // what time are we keeping in pending
	dirty       bool              // if there are pending observations
}

// init initializes the time series with one level per resolution.
//
// resolutions must be strictly increasing. If it is not, the problem is
// logged and only the leading, correctly ordered levels are kept.
func (ts *timeSeries) init(resolutions []time.Duration, f func() Observable, numBuckets int, clock Clock) {
	ts.provider = f
	ts.numBuckets = numBuckets
	ts.clock = clock
	ts.levels = make([]*tsLevel, len(resolutions))

	for i, res := range resolutions {
		if i > 0 && resolutions[i-1] >= res {
			log.Print("timeseries: resolutions must be monotonically increasing")
			// Drop the uninitialized tail so that no nil level is ever
			// dereferenced (Clear below walks every level).
			ts.levels = ts.levels[:i]
			break
		}
		level := new(tsLevel)
		level.InitLevel(res, ts.numBuckets, ts.provider)
		ts.levels[i] = level
	}

	ts.Clear()
}

// Clear removes all observations from the time series.
func (ts *timeSeries) Clear() {
	ts.lastAdd = time.Time{}
	ts.total = ts.resetObservation(ts.total)
	ts.pending = ts.resetObservation(ts.pending)
	ts.pendingTime = time.Time{}
	ts.dirty = false

	for _, level := range ts.levels {
		level.Clear()
	}
}

// Add records an observation at the current time.
func (ts *timeSeries) Add(observation Observable) {
	ts.AddWithTime(observation, ts.clock.Time())
}

// AddWithTime records an observation at the specified time.
func (ts *timeSeries) AddWithTime(observation Observable, t time.Time) {
	smallBucketDuration := ts.levels[0].size

	if t.After(ts.lastAdd) {
		ts.lastAdd = t
	}

	switch {
	case t.After(ts.pendingTime):
		// A newer bucket is needed: rotate the levels, flush what was
		// pending and start a new pending observation.
		ts.advance(t)
		ts.mergePendingUpdates()
		ts.pendingTime = ts.levels[0].end
		ts.pending.CopyFrom(observation)
		ts.dirty = true
	case t.After(ts.pendingTime.Add(-1 * smallBucketDuration)):
		// The observation is close enough to go into the pending bucket.
		// This compensates for clock skewing and small scheduling delays
		// by letting the update stay in the fast path.
		ts.pending.Add(observation)
		ts.dirty = true
	default:
		ts.mergeValue(observation, t)
	}
}

// mergeValue inserts the observation at the specified time in the past into
// all levels. Levels whose window no longer covers t are skipped; the total
// is always updated.
func (ts *timeSeries) mergeValue(observation Observable, t time.Time) {
	for _, level := range ts.levels {
		index := (ts.numBuckets - 1) - int(level.end.Sub(t)/level.size)
		if 0 <= index && index < ts.numBuckets {
			bucketNumber := (level.oldest + index) % ts.numBuckets
			if level.buckets[bucketNumber] == nil {
				level.buckets[bucketNumber] = level.provider()
			}
			level.buckets[bucketNumber].Add(observation)
		}
	}
	ts.total.Add(observation)
}

// mergePendingUpdates applies the pending updates into all levels.
func (ts *timeSeries) mergePendingUpdates() {
	if !ts.dirty {
		return
	}
	ts.mergeValue(ts.pending, ts.pendingTime)
	ts.pending = ts.resetObservation(ts.pending)
	ts.dirty = false
}

// advance cycles the buckets at each level until the latest bucket in
// each level can hold the time specified.
func (ts *timeSeries) advance(t time.Time) {
	if !t.After(ts.levels[0].end) {
		return
	}
	for _, level := range ts.levels {
		if !level.end.Before(t) {
			break
		}

		// If the time is sufficiently far, just clear the level and advance
		// directly.
		if !t.Before(level.end.Add(level.size * time.Duration(ts.numBuckets))) {
			for _, b := range level.buckets {
				if b != nil {
					b.Clear()
				}
			}
			size := level.size.Nanoseconds()
			level.end = time.Unix(0, (t.UnixNano()/size)*size)
		}

		for t.After(level.end) {
			level.end = level.end.Add(level.size)
			level.newest = level.oldest
			level.oldest = (level.oldest + 1) % ts.numBuckets
			// The slot being reused as the newest bucket must start empty.
			if b := level.buckets[level.newest]; b != nil {
				b.Clear()
			}
		}

		// Coarser levels only need to reach the end of this level.
		t = level.end
	}
}

// Latest returns the sum of the num latest buckets from the level.
func (ts *timeSeries) Latest(level, num int) Observable {
	now := ts.clock.Time()
	if ts.levels[0].end.Before(now) {
		ts.advance(now)
	}

	ts.mergePendingUpdates()

	result := ts.provider()
	l := ts.levels[level]
	index := l.newest

	for i := 0; i < num; i++ {
		if l.buckets[index] != nil {
			result.Add(l.buckets[index])
		}
		// Walk the ring backwards, wrapping from slot 0 to the last slot.
		if index == 0 {
			index = ts.numBuckets
		}
		index--
	}

	return result
}

// LatestBuckets returns a copy of the num latest buckets from level.
//
// level must be a valid level index and num must be in [0, numBuckets);
// otherwise the problem is logged and nil is returned.
func (ts *timeSeries) LatestBuckets(level, num int) []Observable {
	// level == len(ts.levels) is already out of range for ts.levels[level].
	if level < 0 || level >= len(ts.levels) {
		log.Print("timeseries: bad level argument: ", level)
		return nil
	}
	if num < 0 || num >= ts.numBuckets {
		log.Print("timeseries: bad num argument: ", num)
		return nil
	}

	results := make([]Observable, num)
	now := ts.clock.Time()
	if ts.levels[0].end.Before(now) {
		ts.advance(now)
	}

	ts.mergePendingUpdates()

	l := ts.levels[level]
	index := l.newest

	for i := 0; i < num; i++ {
		result := ts.provider()
		results[i] = result
		if l.buckets[index] != nil {
			result.CopyFrom(l.buckets[index])
		}

		if index == 0 {
			index = ts.numBuckets
		}
		index--
	}
	return results
}

// ScaleBy updates observations by scaling by factor.
func (ts *timeSeries) ScaleBy(factor float64) {
	for _, l := range ts.levels {
		for _, b := range l.buckets {
			// Buckets are allocated lazily; an untouched bucket is nil and
			// has nothing to scale.
			if b != nil {
				b.Multiply(factor)
			}
		}
	}

	ts.total.Multiply(factor)
	ts.pending.Multiply(factor)
}

// Range returns the sum of observations added over the specified time range.
// If start or finish times don't fall on bucket boundaries of the same
// level, then return values are approximate answers.
func (ts *timeSeries) Range(start, finish time.Time) Observable {
	return ts.ComputeRange(start, finish, 1)[0]
}

// Recent returns the sum of observations from the last delta.
func (ts *timeSeries) Recent(delta time.Duration) Observable {
	now := ts.clock.Time()
	return ts.Range(now.Add(-delta), now)
}

// Total returns the total of all observations.
func (ts *timeSeries) Total() Observable {
	ts.mergePendingUpdates()
	return ts.total
}

// ComputeRange computes a specified number of values into a slice using
// the observations recorded over the specified time period. The return
// values are approximate if the start or finish times don't fall on the
// bucket boundaries at the same level or if the number of buckets spanning
// the range is not an integral multiple of num.
//
// It returns nil (after logging) if start is after finish or num is
// negative, and an empty slice if num is zero.
func (ts *timeSeries) ComputeRange(start, finish time.Time, num int) []Observable {
	if start.After(finish) {
		log.Printf("timeseries: start > finish, %v>%v", start, finish)
		return nil
	}

	if num < 0 {
		log.Printf("timeseries: num < 0, %v", num)
		return nil
	}

	results := make([]Observable, num)
	if num == 0 {
		// Nothing to compute; extract would divide the range by zero.
		return results
	}

	for _, l := range ts.levels {
		if !start.Before(l.end.Add(-l.size * time.Duration(ts.numBuckets))) {
			ts.extract(l, start, finish, num, results)
			return results
		}
	}

	// Failed to find a level that covers the desired range. So just
	// extract from the last level, even if it doesn't cover the entire
	// desired range.
	ts.extract(ts.levels[len(ts.levels)-1], start, finish, num, results)

	return results
}

// RecentList returns the specified number of values in slice over the most
// recent time period of the specified range.
func (ts *timeSeries) RecentList(delta time.Duration, num int) []Observable {
	if delta < 0 {
		return nil
	}
	now := ts.clock.Time()
	return ts.ComputeRange(now.Add(-delta), now, num)
}

// extract returns a slice of specified number of observations from a given
// level over a given range. The values are written into results, which must
// hold at least num elements; num must be positive.
func (ts *timeSeries) extract(l *tsLevel, start, finish time.Time, num int, results []Observable) {
	ts.mergePendingUpdates()

	srcInterval := l.size
	dstInterval := finish.Sub(start) / time.Duration(num)
	dstStart := start
	srcStart := l.end.Add(-srcInterval * time.Duration(ts.numBuckets))

	srcIndex := 0

	// Where should scanning start?
	if dstStart.After(srcStart) {
		skip := int(dstStart.Sub(srcStart) / srcInterval)
		srcIndex += skip
		srcStart = srcStart.Add(time.Duration(skip) * srcInterval)
	}

	// The i'th value is computed as shown below.
	// interval = (finish - start) / num
	// i'th value = sum of observation in range
	//   [ start + i       * interval,
	//     start + (i + 1) * interval )
	for i := 0; i < num; i++ {
		results[i] = ts.resetObservation(results[i])
		dstEnd := dstStart.Add(dstInterval)
		for srcIndex < ts.numBuckets && srcStart.Before(dstEnd) {
			srcEnd := srcStart.Add(srcInterval)
			// Nothing was recorded after lastAdd, so do not spread a
			// bucket's value over time that has not been observed.
			if srcEnd.After(ts.lastAdd) {
				srcEnd = ts.lastAdd
			}

			if !srcEnd.Before(dstStart) {
				srcValue := l.buckets[(srcIndex+l.oldest)%ts.numBuckets]
				if !srcStart.Before(dstStart) && !srcEnd.After(dstEnd) {
					// dst completely contains src.
					if srcValue != nil {
						results[i].Add(srcValue)
					}
				} else {
					// dst partially overlaps src.
					overlapStart := maxTime(srcStart, dstStart)
					overlapEnd := minTime(srcEnd, dstEnd)
					base := srcEnd.Sub(srcStart)
					fraction := overlapEnd.Sub(overlapStart).Seconds() / base.Seconds()

					used := ts.provider()
					if srcValue != nil {
						used.CopyFrom(srcValue)
					}
					used.Multiply(fraction)
					results[i].Add(used)
				}

				if srcEnd.After(dstEnd) {
					// The rest of this source bucket belongs to the next
					// destination interval.
					break
				}
			}
			srcIndex++
			srcStart = srcStart.Add(srcInterval)
		}
		dstStart = dstStart.Add(dstInterval)
	}
}

// resetObservation clears the content so the struct may be reused. A nil
// observation is replaced by a fresh one from the provider.
func (ts *timeSeries) resetObservation(observation Observable) Observable {
	if observation == nil {
		return ts.provider()
	}
	observation.Clear()
	return observation
}

// TimeSeries tracks data at granularities from 1 second to 16 weeks.
type TimeSeries struct {
	timeSeries
}

// NewTimeSeries creates a new TimeSeries using the function provided for creating new Observable.
func NewTimeSeries(f func() Observable) *TimeSeries {
	return NewTimeSeriesWithClock(f, defaultClockInstance)
}

// NewTimeSeriesWithClock creates a new TimeSeries using the function provided for creating new Observable and the clock for
// assigning timestamps.
func NewTimeSeriesWithClock(f func() Observable, clock Clock) *TimeSeries {
	ts := new(TimeSeries)
	ts.timeSeries.init(timeSeriesResolutions, f, timeSeriesNumBuckets, clock)
	return ts
}

// MinuteHourSeries tracks data at granularities of 1 minute and 1 hour.
type MinuteHourSeries struct {
	timeSeries
}

// NewMinuteHourSeries creates a new MinuteHourSeries using the function provided for creating new Observable.
func NewMinuteHourSeries(f func() Observable) *MinuteHourSeries {
	return NewMinuteHourSeriesWithClock(f, defaultClockInstance)
}

// NewMinuteHourSeriesWithClock creates a new MinuteHourSeries using the function provided for creating new Observable and the clock for
// assigning timestamps.
func NewMinuteHourSeriesWithClock(f func() Observable, clock Clock) *MinuteHourSeries {
	ts := new(MinuteHourSeries)
	ts.timeSeries.init(minuteHourSeriesResolutions, f, minuteHourSeriesNumBuckets, clock)
	return ts
}

// Minute returns the sum of the 60 latest buckets of the one-second level.
//
// NOTE(review): the method name was reconstructed from the type's
// documentation; confirm it against callers.
func (ts *MinuteHourSeries) Minute() Observable {
	return ts.timeSeries.Latest(0, 60)
}

// Hour returns the sum of the 60 latest buckets of the one-minute level.
//
// NOTE(review): the method name was reconstructed from the type's
// documentation; confirm it against callers.
func (ts *MinuteHourSeries) Hour() Observable {
	return ts.timeSeries.Latest(1, 60)
}

// minTime returns the earlier of a and b.
func minTime(a, b time.Time) time.Time {
	if a.Before(b) {
		return a
	}
	return b
}

// maxTime returns the later of a and b.
func maxTime(a, b time.Time) time.Time {
	if a.After(b) {
		return a
	}
	return b
}