package metrics

import (
	"math"
	"math/rand"
	"sync"
	"time"

	"golang.org/x/exp/slices"
)

const rescaleThreshold = time.Hour

// Samples maintain a statistically-significant selection of values from
// a stream.
type Sample interface {
	Clear()
	Count() int64
	Max() int64
	Mean() float64
	Min() int64
	Percentile(float64) float64
	Percentiles([]float64) []float64
	Size() int
	Snapshot() Sample
	StdDev() float64
	Sum() int64
	Update(int64)
	Values() []int64
	Variance() float64
}

// ExpDecaySample is an exponentially-decaying sample using a forward-decaying
// priority reservoir. See Cormode et al's "Forward Decay: A Practical Time
// Decay Model for Streaming Systems".
type ExpDecaySample struct {
	alpha         float64
	count         int64
	mutex         sync.Mutex
	reservoirSize int
	t0, t1        time.Time
	values        *expDecaySampleHeap
	rand          *rand.Rand
}

// NewExpDecaySample constructs a new exponentially-decaying sample with the
// given reservoir size and alpha.
func NewExpDecaySample(reservoirSize int, alpha float64) Sample {
	if !Enabled {
		return NilSample{}
	}
	s := &ExpDecaySample{
		alpha:         alpha,
		reservoirSize: reservoirSize,
		t0:            time.Now(),
		values:        newExpDecaySampleHeap(reservoirSize),
	}
	s.t1 = s.t0.Add(rescaleThreshold)
	return s
}

// SetRand sets the random source (useful in tests)
func (s *ExpDecaySample) SetRand(prng *rand.Rand) Sample {
	s.rand = prng
	return s
}

// Clear clears all samples.
func (s *ExpDecaySample) Clear() {
	s.mutex.Lock()
	defer s.mutex.Unlock()
	s.count = 0
	s.t0 = time.Now()
	s.t1 = s.t0.Add(rescaleThreshold)
	s.values.Clear()
}

// Count returns the number of samples recorded, which may exceed the
// reservoir size.
func (s *ExpDecaySample) Count() int64 {
	s.mutex.Lock()
	defer s.mutex.Unlock()
	return s.count
}

// Max returns the maximum value in the sample, which may not be the maximum
// value ever to be part of the sample.
func (s *ExpDecaySample) Max() int64 {
	return SampleMax(s.Values())
}

// Mean returns the mean of the values in the sample.
func (s *ExpDecaySample) Mean() float64 {
	return SampleMean(s.Values())
}

// Min returns the minimum value in the sample, which may not be the minimum
// value ever to be part of the sample.
func (s *ExpDecaySample) Min() int64 {
	return SampleMin(s.Values())
}

// Percentile returns an arbitrary percentile of values in the sample.
func (s *ExpDecaySample) Percentile(p float64) float64 {
	return SamplePercentile(s.Values(), p)
}

// Percentiles returns a slice of arbitrary percentiles of values in the
// sample.
func (s *ExpDecaySample) Percentiles(ps []float64) []float64 {
	return SamplePercentiles(s.Values(), ps)
}

// Size returns the size of the sample, which is at most the reservoir size.
func (s *ExpDecaySample) Size() int {
	s.mutex.Lock()
	defer s.mutex.Unlock()
	return s.values.Size()
}

// Snapshot returns a read-only copy of the sample.
func (s *ExpDecaySample) Snapshot() Sample {
	s.mutex.Lock()
	defer s.mutex.Unlock()
	vals := s.values.Values()
	values := make([]int64, len(vals))
	for i, v := range vals {
		values[i] = v.v
	}
	return &SampleSnapshot{
		count:  s.count,
		values: values,
	}
}

// StdDev returns the standard deviation of the values in the sample.
func (s *ExpDecaySample) StdDev() float64 {
	return SampleStdDev(s.Values())
}

// Sum returns the sum of the values in the sample.
func (s *ExpDecaySample) Sum() int64 {
	return SampleSum(s.Values())
}

// Update samples a new value.
func (s *ExpDecaySample) Update(v int64) {
	s.update(time.Now(), v)
}
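
// exampleExpDecaySampleUsage is an illustrative sketch, not part of the
// original API: it shows how a caller might feed values into an
// exponentially-decaying sample and read percentiles back from a snapshot.
// The reservoir size of 1028 and alpha of 0.015 are conventional choices for
// this sample type, and a live sample is only returned when the package-level
// Enabled flag is set; otherwise NewExpDecaySample yields a no-op NilSample.
func exampleExpDecaySampleUsage() (float64, []float64) {
	s := NewExpDecaySample(1028, 0.015)
	for i := int64(1); i <= 10000; i++ {
		s.Update(i)
	}
	snap := s.Snapshot() // read-only copy; safe to query repeatedly
	return snap.Percentile(0.95), snap.Percentiles([]float64{0.5, 0.75, 0.99})
}
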
// Values returns a copy of the values in the sample.
func (s *ExpDecaySample) Values() []int64 {
	s.mutex.Lock()
	defer s.mutex.Unlock()
	vals := s.values.Values()
	values := make([]int64, len(vals))
	for i, v := range vals {
		values[i] = v.v
	}
	return values
}

// Variance returns the variance of the values in the sample.
func (s *ExpDecaySample) Variance() float64 {
	return SampleVariance(s.Values())
}

// update samples a new value at a particular timestamp. This is a method all
// its own to facilitate testing.
func (s *ExpDecaySample) update(t time.Time, v int64) {
	s.mutex.Lock()
	defer s.mutex.Unlock()
	s.count++
	if s.values.Size() == s.reservoirSize {
		s.values.Pop()
	}
	var f64 float64
	if s.rand != nil {
		f64 = s.rand.Float64()
	} else {
		f64 = rand.Float64()
	}
	s.values.Push(expDecaySample{
		k: math.Exp(t.Sub(s.t0).Seconds()*s.alpha) / f64,
		v: v,
	})
	if t.After(s.t1) {
		values := s.values.Values()
		t0 := s.t0
		s.values.Clear()
		s.t0 = t
		s.t1 = s.t0.Add(rescaleThreshold)
		for _, v := range values {
			v.k = v.k * math.Exp(-s.alpha*s.t0.Sub(t0).Seconds())
			s.values.Push(v)
		}
	}
}

// NilSample is a no-op Sample.
type NilSample struct{}

// Clear is a no-op.
func (NilSample) Clear() {}

// Count is a no-op.
func (NilSample) Count() int64 { return 0 }

// Max is a no-op.
func (NilSample) Max() int64 { return 0 }

// Mean is a no-op.
func (NilSample) Mean() float64 { return 0.0 }

// Min is a no-op.
func (NilSample) Min() int64 { return 0 }

// Percentile is a no-op.
func (NilSample) Percentile(p float64) float64 { return 0.0 }

// Percentiles is a no-op.
func (NilSample) Percentiles(ps []float64) []float64 {
	return make([]float64, len(ps))
}

// Size is a no-op.
func (NilSample) Size() int { return 0 }

// Snapshot is a no-op.
func (NilSample) Snapshot() Sample { return NilSample{} }

// StdDev is a no-op.
func (NilSample) StdDev() float64 { return 0.0 }

// Sum is a no-op.
func (NilSample) Sum() int64 { return 0 }

// Update is a no-op.
func (NilSample) Update(v int64) {}

// Values is a no-op.
func (NilSample) Values() []int64 { return []int64{} }

// Variance is a no-op.
func (NilSample) Variance() float64 { return 0.0 }

// SampleMax returns the maximum value of the slice of int64.
func SampleMax(values []int64) int64 {
	if len(values) == 0 {
		return 0
	}
	var max int64 = math.MinInt64
	for _, v := range values {
		if max < v {
			max = v
		}
	}
	return max
}

// SampleMean returns the mean value of the slice of int64.
func SampleMean(values []int64) float64 {
	if len(values) == 0 {
		return 0.0
	}
	return float64(SampleSum(values)) / float64(len(values))
}

// SampleMin returns the minimum value of the slice of int64.
func SampleMin(values []int64) int64 {
	if len(values) == 0 {
		return 0
	}
	var min int64 = math.MaxInt64
	for _, v := range values {
		if min > v {
			min = v
		}
	}
	return min
}

// SamplePercentile returns an arbitrary percentile of the slice of int64.
func SamplePercentile(values []int64, p float64) float64 {
	return SamplePercentiles(values, []float64{p})[0]
}

// SamplePercentiles returns a slice of arbitrary percentiles of the slice of
// int64.
func SamplePercentiles(values []int64, ps []float64) []float64 {
	scores := make([]float64, len(ps))
	size := len(values)
	if size > 0 {
		slices.Sort(values)
		for i, p := range ps {
			pos := p * float64(size+1)
			if pos < 1.0 {
				scores[i] = float64(values[0])
			} else if pos >= float64(size) {
				scores[i] = float64(values[size-1])
			} else {
				lower := float64(values[int(pos)-1])
				upper := float64(values[int(pos)])
				scores[i] = lower + (pos-math.Floor(pos))*(upper-lower)
			}
		}
	}
	return scores
}
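
// examplePercentileInterpolation is an explanatory sketch, not part of the
// original API, of the interpolation performed by SamplePercentiles above:
// for the five values 1..5 and p = 0.5 the position is 0.5*(5+1) = 3, which
// lands exactly on the third sorted value, so the result is 3.0. Note that
// SamplePercentiles sorts its input slice in place.
func examplePercentileInterpolation() float64 {
	return SamplePercentile([]int64{5, 1, 4, 2, 3}, 0.5) // == 3.0
}
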
// SampleSnapshot is a read-only copy of another Sample.
type SampleSnapshot struct {
	count  int64
	values []int64
}

// NewSampleSnapshot constructs a read-only sample from the given count and
// values.
func NewSampleSnapshot(count int64, values []int64) *SampleSnapshot {
	return &SampleSnapshot{
		count:  count,
		values: values,
	}
}

// Clear panics.
func (*SampleSnapshot) Clear() {
	panic("Clear called on a SampleSnapshot")
}

// Count returns the count of inputs at the time the snapshot was taken.
func (s *SampleSnapshot) Count() int64 { return s.count }

// Max returns the maximal value at the time the snapshot was taken.
func (s *SampleSnapshot) Max() int64 { return SampleMax(s.values) }

// Mean returns the mean value at the time the snapshot was taken.
func (s *SampleSnapshot) Mean() float64 { return SampleMean(s.values) }

// Min returns the minimal value at the time the snapshot was taken.
func (s *SampleSnapshot) Min() int64 { return SampleMin(s.values) }

// Percentile returns an arbitrary percentile of values at the time the
// snapshot was taken.
func (s *SampleSnapshot) Percentile(p float64) float64 {
	return SamplePercentile(s.values, p)
}

// Percentiles returns a slice of arbitrary percentiles of values at the time
// the snapshot was taken.
func (s *SampleSnapshot) Percentiles(ps []float64) []float64 {
	return SamplePercentiles(s.values, ps)
}

// Size returns the size of the sample at the time the snapshot was taken.
func (s *SampleSnapshot) Size() int { return len(s.values) }

// Snapshot returns the snapshot.
func (s *SampleSnapshot) Snapshot() Sample { return s }

// StdDev returns the standard deviation of values at the time the snapshot was
// taken.
func (s *SampleSnapshot) StdDev() float64 { return SampleStdDev(s.values) }

// Sum returns the sum of values at the time the snapshot was taken.
func (s *SampleSnapshot) Sum() int64 { return SampleSum(s.values) }

// Update panics.
func (*SampleSnapshot) Update(int64) {
	panic("Update called on a SampleSnapshot")
}

// Values returns a copy of the values in the sample.
func (s *SampleSnapshot) Values() []int64 {
	values := make([]int64, len(s.values))
	copy(values, s.values)
	return values
}

// Variance returns the variance of values at the time the snapshot was taken.
func (s *SampleSnapshot) Variance() float64 { return SampleVariance(s.values) }

// SampleStdDev returns the standard deviation of the slice of int64.
func SampleStdDev(values []int64) float64 {
	return math.Sqrt(SampleVariance(values))
}

// SampleSum returns the sum of the slice of int64.
func SampleSum(values []int64) int64 {
	var sum int64
	for _, v := range values {
		sum += v
	}
	return sum
}

// SampleVariance returns the variance of the slice of int64.
func SampleVariance(values []int64) float64 {
	if len(values) == 0 {
		return 0.0
	}
	m := SampleMean(values)
	var sum float64
	for _, v := range values {
		d := float64(v) - m
		sum += d * d
	}
	return sum / float64(len(values))
}

// UniformSample is a uniform sample using Vitter's Algorithm R.
type UniformSample struct {
	count         int64
	mutex         sync.Mutex
	reservoirSize int
	values        []int64
	rand          *rand.Rand
}

// NewUniformSample constructs a new uniform sample with the given reservoir
// size.
func NewUniformSample(reservoirSize int) Sample {
	if !Enabled {
		return NilSample{}
	}
	return &UniformSample{
		reservoirSize: reservoirSize,
		values:        make([]int64, 0, reservoirSize),
	}
}

// SetRand sets the random source (useful in tests)
func (s *UniformSample) SetRand(prng *rand.Rand) Sample {
	s.rand = prng
	return s
}
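
// exampleUniformSampleUsage is an illustrative sketch, not part of the
// original API: with Algorithm R every value seen so far has an equal chance
// of being in the reservoir, so the snapshot below holds at most 1028 of the
// 10000 updates, chosen uniformly at random (assuming the package-level
// Enabled flag is set so a live sample is returned).
func exampleUniformSampleUsage() float64 {
	s := NewUniformSample(1028)
	for i := int64(1); i <= 10000; i++ {
		s.Update(i)
	}
	return s.Snapshot().Mean()
}
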
// Clear clears all samples.
func (s *UniformSample) Clear() {
	s.mutex.Lock()
	defer s.mutex.Unlock()
	s.count = 0
	s.values = make([]int64, 0, s.reservoirSize)
}

// Count returns the number of samples recorded, which may exceed the
// reservoir size.
func (s *UniformSample) Count() int64 {
	s.mutex.Lock()
	defer s.mutex.Unlock()
	return s.count
}

// Max returns the maximum value in the sample, which may not be the maximum
// value ever to be part of the sample.
func (s *UniformSample) Max() int64 {
	s.mutex.Lock()
	defer s.mutex.Unlock()
	return SampleMax(s.values)
}

// Mean returns the mean of the values in the sample.
func (s *UniformSample) Mean() float64 {
	s.mutex.Lock()
	defer s.mutex.Unlock()
	return SampleMean(s.values)
}

// Min returns the minimum value in the sample, which may not be the minimum
// value ever to be part of the sample.
func (s *UniformSample) Min() int64 {
	s.mutex.Lock()
	defer s.mutex.Unlock()
	return SampleMin(s.values)
}

// Percentile returns an arbitrary percentile of values in the sample.
func (s *UniformSample) Percentile(p float64) float64 {
	s.mutex.Lock()
	defer s.mutex.Unlock()
	return SamplePercentile(s.values, p)
}

// Percentiles returns a slice of arbitrary percentiles of values in the
// sample.
func (s *UniformSample) Percentiles(ps []float64) []float64 {
	s.mutex.Lock()
	defer s.mutex.Unlock()
	return SamplePercentiles(s.values, ps)
}

// Size returns the size of the sample, which is at most the reservoir size.
func (s *UniformSample) Size() int {
	s.mutex.Lock()
	defer s.mutex.Unlock()
	return len(s.values)
}

// Snapshot returns a read-only copy of the sample.
func (s *UniformSample) Snapshot() Sample {
	s.mutex.Lock()
	defer s.mutex.Unlock()
	values := make([]int64, len(s.values))
	copy(values, s.values)
	return &SampleSnapshot{
		count:  s.count,
		values: values,
	}
}

// StdDev returns the standard deviation of the values in the sample.
func (s *UniformSample) StdDev() float64 {
	s.mutex.Lock()
	defer s.mutex.Unlock()
	return SampleStdDev(s.values)
}

// Sum returns the sum of the values in the sample.
func (s *UniformSample) Sum() int64 {
	s.mutex.Lock()
	defer s.mutex.Unlock()
	return SampleSum(s.values)
}

// Update samples a new value.
func (s *UniformSample) Update(v int64) {
	s.mutex.Lock()
	defer s.mutex.Unlock()
	s.count++
	if len(s.values) < s.reservoirSize {
		s.values = append(s.values, v)
	} else {
		var r int64
		if s.rand != nil {
			r = s.rand.Int63n(s.count)
		} else {
			r = rand.Int63n(s.count)
		}
		if r < int64(len(s.values)) {
			s.values[int(r)] = v
		}
	}
}

// Values returns a copy of the values in the sample.
func (s *UniformSample) Values() []int64 {
	s.mutex.Lock()
	defer s.mutex.Unlock()
	values := make([]int64, len(s.values))
	copy(values, s.values)
	return values
}

// Variance returns the variance of the values in the sample.
func (s *UniformSample) Variance() float64 {
	s.mutex.Lock()
	defer s.mutex.Unlock()
	return SampleVariance(s.values)
}

// expDecaySample represents an individual sample in a heap.
type expDecaySample struct {
	k float64
	v int64
}

func newExpDecaySampleHeap(reservoirSize int) *expDecaySampleHeap {
	return &expDecaySampleHeap{make([]expDecaySample, 0, reservoirSize)}
}
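
// exampleDecayKey is an explanatory sketch, not part of the original API, of
// the priority assigned in ExpDecaySample.update: k = exp(alpha * seconds
// since the landmark t0) / u for a uniform u in (0, 1]. With alpha = 0.015, a
// value observed 60s after t0 gets a key of roughly exp(0.9)/u ≈ 2.46/u, so
// newer values tend to carry exponentially larger keys and the heap's
// minimum-key entry (typically an older one) is the one evicted when the
// reservoir is full.
func exampleDecayKey(elapsedSeconds, u float64) float64 {
	const alpha = 0.015
	return math.Exp(elapsedSeconds*alpha) / u
}
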
// expDecaySampleHeap is a min-heap of expDecaySamples.
// The internal implementation is copied from the standard library's container/heap.
type expDecaySampleHeap struct {
	s []expDecaySample
}

func (h *expDecaySampleHeap) Clear() {
	h.s = h.s[:0]
}

func (h *expDecaySampleHeap) Push(s expDecaySample) {
	n := len(h.s)
	h.s = h.s[0 : n+1]
	h.s[n] = s
	h.up(n)
}

func (h *expDecaySampleHeap) Pop() expDecaySample {
	n := len(h.s) - 1
	h.s[0], h.s[n] = h.s[n], h.s[0]
	h.down(0, n)

	n = len(h.s)
	s := h.s[n-1]
	h.s = h.s[0 : n-1]
	return s
}

func (h *expDecaySampleHeap) Size() int {
	return len(h.s)
}

func (h *expDecaySampleHeap) Values() []expDecaySample {
	return h.s
}

func (h *expDecaySampleHeap) up(j int) {
	for {
		i := (j - 1) / 2 // parent
		if i == j || !(h.s[j].k < h.s[i].k) {
			break
		}
		h.s[i], h.s[j] = h.s[j], h.s[i]
		j = i
	}
}

func (h *expDecaySampleHeap) down(i, n int) {
	for {
		j1 := 2*i + 1
		if j1 >= n || j1 < 0 { // j1 < 0 after int overflow
			break
		}
		j := j1 // left child
		if j2 := j1 + 1; j2 < n && !(h.s[j1].k < h.s[j2].k) {
			j = j2 // = 2*i + 2  // right child
		}
		if !(h.s[j].k < h.s[i].k) {
			break
		}
		h.s[i], h.s[j] = h.s[j], h.s[i]
		i = j
	}
}
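
// exampleHeapOrdering is an explanatory sketch, not part of the original API,
// showing the min-heap invariant maintained by up and down: Pop always
// returns the entry with the smallest key, which is how ExpDecaySample evicts
// the lowest-priority value when its reservoir is full.
func exampleHeapOrdering() expDecaySample {
	h := newExpDecaySampleHeap(3)
	h.Push(expDecaySample{k: 2.0, v: 20})
	h.Push(expDecaySample{k: 0.5, v: 5})
	h.Push(expDecaySample{k: 1.0, v: 10})
	return h.Pop() // {k: 0.5, v: 5}
}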