Patch for concurrent iterator & others (onto v1.11.6) #386

Closed
roysc wants to merge 1565 commits from v1.11.6-statediff-v5 into master
17 changed files with 312 additions and 13 deletions
Showing only changes of commit 7dc100714d - Show all commits

153
metrics/counter_float64.go Normal file
View File

@ -0,0 +1,153 @@
package metrics
import (
"sync"
)
// CounterFloat64 holds a float64 value that can be incremented and decremented.
type CounterFloat64 interface {
Clear()
Count() float64
Dec(float64)
Inc(float64)
Snapshot() CounterFloat64
}
// GetOrRegisterCounterFloat64 returns an existing CounterFloat64 or constructs and registers
// a new StandardCounterFloat64.
func GetOrRegisterCounterFloat64(name string, r Registry) CounterFloat64 {
if nil == r {
r = DefaultRegistry
}
return r.GetOrRegister(name, NewCounterFloat64).(CounterFloat64)
}
// GetOrRegisterCounterFloat64Forced returns an existing CounterFloat64 or constructs and registers a
// new CounterFloat64 no matter the global switch is enabled or not.
// Be sure to unregister the counter from the registry once it is of no use to
// allow for garbage collection.
func GetOrRegisterCounterFloat64Forced(name string, r Registry) CounterFloat64 {
if nil == r {
r = DefaultRegistry
}
return r.GetOrRegister(name, NewCounterFloat64Forced).(CounterFloat64)
}
// NewCounterFloat64 constructs a new StandardCounterFloat64.
func NewCounterFloat64() CounterFloat64 {
if !Enabled {
return NilCounterFloat64{}
}
return &StandardCounterFloat64{count: 0.0}
}
// NewCounterFloat64Forced constructs a new StandardCounterFloat64 and returns it no matter if
// the global switch is enabled or not.
func NewCounterFloat64Forced() CounterFloat64 {
return &StandardCounterFloat64{count: 0.0}
}
// NewRegisteredCounterFloat64 constructs and registers a new StandardCounterFloat64.
func NewRegisteredCounterFloat64(name string, r Registry) CounterFloat64 {
c := NewCounterFloat64()
if nil == r {
r = DefaultRegistry
}
r.Register(name, c)
return c
}
// NewRegisteredCounterFloat64Forced constructs and registers a new StandardCounterFloat64
// and launches a goroutine no matter the global switch is enabled or not.
// Be sure to unregister the counter from the registry once it is of no use to
// allow for garbage collection.
func NewRegisteredCounterFloat64Forced(name string, r Registry) CounterFloat64 {
c := NewCounterFloat64Forced()
if nil == r {
r = DefaultRegistry
}
r.Register(name, c)
return c
}
// CounterFloat64Snapshot is a read-only copy of another CounterFloat64.
type CounterFloat64Snapshot float64
// Clear panics.
func (CounterFloat64Snapshot) Clear() {
panic("Clear called on a CounterFloat64Snapshot")
}
// Count returns the value at the time the snapshot was taken.
func (c CounterFloat64Snapshot) Count() float64 { return float64(c) }
// Dec panics.
func (CounterFloat64Snapshot) Dec(float64) {
panic("Dec called on a CounterFloat64Snapshot")
}
// Inc panics.
func (CounterFloat64Snapshot) Inc(float64) {
panic("Inc called on a CounterFloat64Snapshot")
}
// Snapshot returns the snapshot.
func (c CounterFloat64Snapshot) Snapshot() CounterFloat64 { return c }
// NilCounterFloat64 is a no-op CounterFloat64.
type NilCounterFloat64 struct{}
// Clear is a no-op.
func (NilCounterFloat64) Clear() {}
// Count is a no-op.
func (NilCounterFloat64) Count() float64 { return 0.0 }
// Dec is a no-op.
func (NilCounterFloat64) Dec(i float64) {}
// Inc is a no-op.
func (NilCounterFloat64) Inc(i float64) {}
// Snapshot is a no-op.
func (NilCounterFloat64) Snapshot() CounterFloat64 { return NilCounterFloat64{} }
// StandardCounterFloat64 is the standard implementation of a CounterFloat64 and uses the
// sync.Mutex package to manage a single float64 value.
type StandardCounterFloat64 struct {
mutex sync.Mutex
count float64
}
// Clear sets the counter to zero.
func (c *StandardCounterFloat64) Clear() {
c.mutex.Lock()
defer c.mutex.Unlock()
c.count = 0.0
}
// Count returns the current value.
func (c *StandardCounterFloat64) Count() float64 {
c.mutex.Lock()
defer c.mutex.Unlock()
return c.count
}
// Dec decrements the counter by the given amount.
func (c *StandardCounterFloat64) Dec(v float64) {
c.mutex.Lock()
defer c.mutex.Unlock()
c.count -= v
}
// Inc increments the counter by the given amount.
func (c *StandardCounterFloat64) Inc(v float64) {
c.mutex.Lock()
defer c.mutex.Unlock()
c.count += v
}
// Snapshot returns a read-only copy of the counter.
func (c *StandardCounterFloat64) Snapshot() CounterFloat64 {
return CounterFloat64Snapshot(c.Count())
}

View File

@ -0,0 +1,77 @@
package metrics
import "testing"
func BenchmarkCounterFloat64(b *testing.B) {
c := NewCounterFloat64()
b.ResetTimer()
for i := 0; i < b.N; i++ {
c.Inc(1.0)
}
}
func TestCounterFloat64Clear(t *testing.T) {
c := NewCounterFloat64()
c.Inc(1.0)
c.Clear()
if count := c.Count(); count != 0 {
t.Errorf("c.Count(): 0 != %v\n", count)
}
}
func TestCounterFloat64Dec1(t *testing.T) {
c := NewCounterFloat64()
c.Dec(1.0)
if count := c.Count(); count != -1.0 {
t.Errorf("c.Count(): -1.0 != %v\n", count)
}
}
func TestCounterFloat64Dec2(t *testing.T) {
c := NewCounterFloat64()
c.Dec(2.0)
if count := c.Count(); count != -2.0 {
t.Errorf("c.Count(): -2.0 != %v\n", count)
}
}
func TestCounterFloat64Inc1(t *testing.T) {
c := NewCounterFloat64()
c.Inc(1.0)
if count := c.Count(); count != 1.0 {
t.Errorf("c.Count(): 1.0 != %v\n", count)
}
}
func TestCounterFloat64Inc2(t *testing.T) {
c := NewCounterFloat64()
c.Inc(2.0)
if count := c.Count(); count != 2.0 {
t.Errorf("c.Count(): 2.0 != %v\n", count)
}
}
func TestCounterFloat64Snapshot(t *testing.T) {
c := NewCounterFloat64()
c.Inc(1.0)
snapshot := c.Snapshot()
c.Inc(1.0)
if count := snapshot.Count(); count != 1.0 {
t.Errorf("c.Count(): 1.0 != %v\n", count)
}
}
func TestCounterFloat64Zero(t *testing.T) {
c := NewCounterFloat64()
if count := c.Count(); count != 0 {
t.Errorf("c.Count(): 0 != %v\n", count)
}
}
func TestGetOrRegisterCounterFloat64(t *testing.T) {
r := NewRegistry()
NewRegisteredCounterFloat64("foo", r).Inc(47.0)
if c := GetOrRegisterCounterFloat64("foo", r); c.Count() != 47.0 {
t.Fatal(c)
}
}

View File

@ -100,6 +100,11 @@ func (exp *exp) publishCounter(name string, metric metrics.Counter) {
v.Set(metric.Count()) v.Set(metric.Count())
} }
func (exp *exp) publishCounterFloat64(name string, metric metrics.CounterFloat64) {
v := exp.getFloat(name)
v.Set(metric.Count())
}
func (exp *exp) publishGauge(name string, metric metrics.Gauge) { func (exp *exp) publishGauge(name string, metric metrics.Gauge) {
v := exp.getInt(name) v := exp.getInt(name)
v.Set(metric.Value()) v.Set(metric.Value())
@ -167,6 +172,8 @@ func (exp *exp) syncToExpvar() {
switch i := i.(type) { switch i := i.(type) {
case metrics.Counter: case metrics.Counter:
exp.publishCounter(name, i) exp.publishCounter(name, i)
case metrics.CounterFloat64:
exp.publishCounterFloat64(name, i)
case metrics.Gauge: case metrics.Gauge:
exp.publishGauge(name, i) exp.publishGauge(name, i)
case metrics.GaugeFloat64: case metrics.GaugeFloat64:

View File

@ -67,6 +67,8 @@ func graphite(c *GraphiteConfig) error {
switch metric := i.(type) { switch metric := i.(type) {
case Counter: case Counter:
fmt.Fprintf(w, "%s.%s.count %d %d\n", c.Prefix, name, metric.Count(), now) fmt.Fprintf(w, "%s.%s.count %d %d\n", c.Prefix, name, metric.Count(), now)
case CounterFloat64:
fmt.Fprintf(w, "%s.%s.count %f %d\n", c.Prefix, name, metric.Count(), now)
case Gauge: case Gauge:
fmt.Fprintf(w, "%s.%s.value %d %d\n", c.Prefix, name, metric.Value(), now) fmt.Fprintf(w, "%s.%s.value %d %d\n", c.Prefix, name, metric.Value(), now)
case GaugeFloat64: case GaugeFloat64:

View File

@ -141,6 +141,16 @@ func (r *reporter) send() error {
}, },
Time: now, Time: now,
}) })
case metrics.CounterFloat64:
count := metric.Count()
pts = append(pts, client.Point{
Measurement: fmt.Sprintf("%s%s.count", namespace, name),
Tags: r.tags,
Fields: map[string]interface{}{
"value": count,
},
Time: now,
})
case metrics.Gauge: case metrics.Gauge:
ms := metric.Snapshot() ms := metric.Snapshot()
pts = append(pts, client.Point{ pts = append(pts, client.Point{

View File

@ -24,8 +24,6 @@ type v2Reporter struct {
client influxdb2.Client client influxdb2.Client
write api.WriteAPI write api.WriteAPI
cache map[string]int64
} }
// InfluxDBWithTags starts a InfluxDB reporter which will post the from the given metrics.Registry at each d interval with the specified tags // InfluxDBWithTags starts a InfluxDB reporter which will post the from the given metrics.Registry at each d interval with the specified tags
@ -39,7 +37,6 @@ func InfluxDBV2WithTags(r metrics.Registry, d time.Duration, endpoint string, to
organization: organization, organization: organization,
namespace: namespace, namespace: namespace,
tags: tags, tags: tags,
cache: make(map[string]int64),
} }
rep.client = influxdb2.NewClient(rep.endpoint, rep.token) rep.client = influxdb2.NewClient(rep.endpoint, rep.token)
@ -86,17 +83,25 @@ func (r *v2Reporter) send() {
switch metric := i.(type) { switch metric := i.(type) {
case metrics.Counter: case metrics.Counter:
v := metric.Count() v := metric.Count()
l := r.cache[name]
measurement := fmt.Sprintf("%s%s.count", namespace, name) measurement := fmt.Sprintf("%s%s.count", namespace, name)
fields := map[string]interface{}{ fields := map[string]interface{}{
"value": v - l, "value": v,
} }
pt := influxdb2.NewPoint(measurement, r.tags, fields, now) pt := influxdb2.NewPoint(measurement, r.tags, fields, now)
r.write.WritePoint(pt) r.write.WritePoint(pt)
r.cache[name] = v case metrics.CounterFloat64:
v := metric.Count()
measurement := fmt.Sprintf("%s%s.count", namespace, name)
fields := map[string]interface{}{
"value": v,
}
pt := influxdb2.NewPoint(measurement, r.tags, fields, now)
r.write.WritePoint(pt)
case metrics.Gauge: case metrics.Gauge:
ms := metric.Snapshot() ms := metric.Snapshot()

View File

@ -107,6 +107,17 @@ func (rep *Reporter) BuildRequest(now time.Time, r metrics.Registry) (snapshot B
} }
snapshot.Counters = append(snapshot.Counters, measurement) snapshot.Counters = append(snapshot.Counters, measurement)
} }
case metrics.CounterFloat64:
if m.Count() > 0 {
measurement[Name] = fmt.Sprintf("%s.%s", name, "count")
measurement[Value] = m.Count()
measurement[Attributes] = map[string]interface{}{
DisplayUnitsLong: Operations,
DisplayUnitsShort: OperationsShort,
DisplayMin: "0",
}
snapshot.Counters = append(snapshot.Counters, measurement)
}
case metrics.Gauge: case metrics.Gauge:
measurement[Name] = name measurement[Name] = name
measurement[Value] = float64(m.Value()) measurement[Value] = float64(m.Value())

View File

@ -24,6 +24,9 @@ func LogScaled(r Registry, freq time.Duration, scale time.Duration, l Logger) {
case Counter: case Counter:
l.Printf("counter %s\n", name) l.Printf("counter %s\n", name)
l.Printf(" count: %9d\n", metric.Count()) l.Printf(" count: %9d\n", metric.Count())
case CounterFloat64:
l.Printf("counter %s\n", name)
l.Printf(" count: %f\n", metric.Count())
case Gauge: case Gauge:
l.Printf("gauge %s\n", name) l.Printf("gauge %s\n", name)
l.Printf(" value: %9d\n", metric.Value()) l.Printf(" value: %9d\n", metric.Value())

View File

@ -144,6 +144,9 @@ func CollectProcessMetrics(refresh time.Duration) {
cpuSysLoad = GetOrRegisterGauge("system/cpu/sysload", DefaultRegistry) cpuSysLoad = GetOrRegisterGauge("system/cpu/sysload", DefaultRegistry)
cpuSysWait = GetOrRegisterGauge("system/cpu/syswait", DefaultRegistry) cpuSysWait = GetOrRegisterGauge("system/cpu/syswait", DefaultRegistry)
cpuProcLoad = GetOrRegisterGauge("system/cpu/procload", DefaultRegistry) cpuProcLoad = GetOrRegisterGauge("system/cpu/procload", DefaultRegistry)
cpuSysLoadTotal = GetOrRegisterCounterFloat64("system/cpu/sysload/total", DefaultRegistry)
cpuSysWaitTotal = GetOrRegisterCounterFloat64("system/cpu/syswait/total", DefaultRegistry)
cpuProcLoadTotal = GetOrRegisterCounterFloat64("system/cpu/procload/total", DefaultRegistry)
cpuThreads = GetOrRegisterGauge("system/cpu/threads", DefaultRegistry) cpuThreads = GetOrRegisterGauge("system/cpu/threads", DefaultRegistry)
cpuGoroutines = GetOrRegisterGauge("system/cpu/goroutines", DefaultRegistry) cpuGoroutines = GetOrRegisterGauge("system/cpu/goroutines", DefaultRegistry)
cpuSchedLatency = getOrRegisterRuntimeHistogram("system/cpu/schedlatency", secondsToNs, nil) cpuSchedLatency = getOrRegisterRuntimeHistogram("system/cpu/schedlatency", secondsToNs, nil)
@ -172,13 +175,17 @@ func CollectProcessMetrics(refresh time.Duration) {
secondsSinceLastCollect := collectTime.Sub(lastCollectTime).Seconds() secondsSinceLastCollect := collectTime.Sub(lastCollectTime).Seconds()
lastCollectTime = collectTime lastCollectTime = collectTime
if secondsSinceLastCollect > 0 { if secondsSinceLastCollect > 0 {
sysLoad := (cpustats[now].GlobalTime - cpustats[prev].GlobalTime) / secondsSinceLastCollect sysLoad := cpustats[now].GlobalTime - cpustats[prev].GlobalTime
sysWait := (cpustats[now].GlobalWait - cpustats[prev].GlobalWait) / secondsSinceLastCollect sysWait := cpustats[now].GlobalWait - cpustats[prev].GlobalWait
procLoad := (cpustats[now].LocalTime - cpustats[prev].LocalTime) / secondsSinceLastCollect procLoad := cpustats[now].LocalTime - cpustats[prev].LocalTime
// Convert to integer percentage. // Convert to integer percentage.
cpuSysLoad.Update(int64(sysLoad * 100)) cpuSysLoad.Update(int64(sysLoad / secondsSinceLastCollect * 100))
cpuSysWait.Update(int64(sysWait * 100)) cpuSysWait.Update(int64(sysWait / secondsSinceLastCollect * 100))
cpuProcLoad.Update(int64(procLoad * 100)) cpuProcLoad.Update(int64(procLoad / secondsSinceLastCollect * 100))
// increment counters (ms)
cpuSysLoadTotal.Inc(sysLoad)
cpuSysWaitTotal.Inc(sysWait)
cpuProcLoadTotal.Inc(procLoad)
} }
// Threads // Threads

View File

@ -18,6 +18,7 @@ func TestReadRuntimeValues(t *testing.T) {
func BenchmarkMetrics(b *testing.B) { func BenchmarkMetrics(b *testing.B) {
r := NewRegistry() r := NewRegistry()
c := NewRegisteredCounter("counter", r) c := NewRegisteredCounter("counter", r)
cf := NewRegisteredCounterFloat64("counterfloat64", r)
g := NewRegisteredGauge("gauge", r) g := NewRegisteredGauge("gauge", r)
gf := NewRegisteredGaugeFloat64("gaugefloat64", r) gf := NewRegisteredGaugeFloat64("gaugefloat64", r)
h := NewRegisteredHistogram("histogram", r, NewUniformSample(100)) h := NewRegisteredHistogram("histogram", r, NewUniformSample(100))
@ -71,6 +72,7 @@ func BenchmarkMetrics(b *testing.B) {
//log.Println("go", i) //log.Println("go", i)
for i := 0; i < b.N; i++ { for i := 0; i < b.N; i++ {
c.Inc(1) c.Inc(1)
cf.Inc(1.0)
g.Update(int64(i)) g.Update(int64(i))
gf.Update(float64(i)) gf.Update(float64(i))
h.Update(int64(i)) h.Update(int64(i))

View File

@ -71,6 +71,8 @@ func openTSDB(c *OpenTSDBConfig) error {
switch metric := i.(type) { switch metric := i.(type) {
case Counter: case Counter:
fmt.Fprintf(w, "put %s.%s.count %d %d host=%s\n", c.Prefix, name, now, metric.Count(), shortHostname) fmt.Fprintf(w, "put %s.%s.count %d %d host=%s\n", c.Prefix, name, now, metric.Count(), shortHostname)
case CounterFloat64:
fmt.Fprintf(w, "put %s.%s.count %d %f host=%s\n", c.Prefix, name, now, metric.Count(), shortHostname)
case Gauge: case Gauge:
fmt.Fprintf(w, "put %s.%s.value %d %d host=%s\n", c.Prefix, name, now, metric.Value(), shortHostname) fmt.Fprintf(w, "put %s.%s.value %d %d host=%s\n", c.Prefix, name, now, metric.Value(), shortHostname)
case GaugeFloat64: case GaugeFloat64:

View File

@ -50,6 +50,10 @@ func (c *collector) addCounter(name string, m metrics.Counter) {
c.writeGaugeCounter(name, m.Count()) c.writeGaugeCounter(name, m.Count())
} }
func (c *collector) addCounterFloat64(name string, m metrics.CounterFloat64) {
c.writeGaugeCounter(name, m.Count())
}
func (c *collector) addGauge(name string, m metrics.Gauge) { func (c *collector) addGauge(name string, m metrics.Gauge) {
c.writeGaugeCounter(name, m.Value()) c.writeGaugeCounter(name, m.Value())
} }

View File

@ -20,6 +20,10 @@ func TestCollector(t *testing.T) {
counter.Inc(12345) counter.Inc(12345)
c.addCounter("test/counter", counter) c.addCounter("test/counter", counter)
counterfloat64 := metrics.NewCounterFloat64()
counterfloat64.Inc(54321.98)
c.addCounterFloat64("test/counter_float64", counterfloat64)
gauge := metrics.NewGauge() gauge := metrics.NewGauge()
gauge.Update(23456) gauge.Update(23456)
c.addGauge("test/gauge", gauge) c.addGauge("test/gauge", gauge)
@ -61,6 +65,9 @@ func TestCollector(t *testing.T) {
const expectedOutput = `# TYPE test_counter gauge const expectedOutput = `# TYPE test_counter gauge
test_counter 12345 test_counter 12345
# TYPE test_counter_float64 gauge
test_counter_float64 54321.98
# TYPE test_gauge gauge # TYPE test_gauge gauge
test_gauge 23456 test_gauge 23456

View File

@ -45,6 +45,8 @@ func Handler(reg metrics.Registry) http.Handler {
switch m := i.(type) { switch m := i.(type) {
case metrics.Counter: case metrics.Counter:
c.addCounter(name, m.Snapshot()) c.addCounter(name, m.Snapshot())
case metrics.CounterFloat64:
c.addCounterFloat64(name, m.Snapshot())
case metrics.Gauge: case metrics.Gauge:
c.addGauge(name, m.Snapshot()) c.addGauge(name, m.Snapshot())
case metrics.GaugeFloat64: case metrics.GaugeFloat64:

View File

@ -120,6 +120,8 @@ func (r *StandardRegistry) GetAll() map[string]map[string]interface{} {
switch metric := i.(type) { switch metric := i.(type) {
case Counter: case Counter:
values["count"] = metric.Count() values["count"] = metric.Count()
case CounterFloat64:
values["count"] = metric.Count()
case Gauge: case Gauge:
values["value"] = metric.Value() values["value"] = metric.Value()
case GaugeFloat64: case GaugeFloat64:
@ -196,7 +198,7 @@ func (r *StandardRegistry) register(name string, i interface{}) error {
return DuplicateMetric(name) return DuplicateMetric(name)
} }
switch i.(type) { switch i.(type) {
case Counter, Gauge, GaugeFloat64, Healthcheck, Histogram, Meter, Timer, ResettingTimer: case Counter, CounterFloat64, Gauge, GaugeFloat64, Healthcheck, Histogram, Meter, Timer, ResettingTimer:
r.metrics[name] = i r.metrics[name] = i
} }
return nil return nil

View File

@ -17,6 +17,8 @@ func Syslog(r Registry, d time.Duration, w *syslog.Writer) {
switch metric := i.(type) { switch metric := i.(type) {
case Counter: case Counter:
w.Info(fmt.Sprintf("counter %s: count: %d", name, metric.Count())) w.Info(fmt.Sprintf("counter %s: count: %d", name, metric.Count()))
case CounterFloat64:
w.Info(fmt.Sprintf("counter %s: count: %f", name, metric.Count()))
case Gauge: case Gauge:
w.Info(fmt.Sprintf("gauge %s: value: %d", name, metric.Value())) w.Info(fmt.Sprintf("gauge %s: value: %d", name, metric.Value()))
case GaugeFloat64: case GaugeFloat64:

View File

@ -29,6 +29,9 @@ func WriteOnce(r Registry, w io.Writer) {
case Counter: case Counter:
fmt.Fprintf(w, "counter %s\n", namedMetric.name) fmt.Fprintf(w, "counter %s\n", namedMetric.name)
fmt.Fprintf(w, " count: %9d\n", metric.Count()) fmt.Fprintf(w, " count: %9d\n", metric.Count())
case CounterFloat64:
fmt.Fprintf(w, "counter %s\n", namedMetric.name)
fmt.Fprintf(w, " count: %f\n", metric.Count())
case Gauge: case Gauge:
fmt.Fprintf(w, "gauge %s\n", namedMetric.name) fmt.Fprintf(w, "gauge %s\n", namedMetric.name)
fmt.Fprintf(w, " value: %9d\n", metric.Value()) fmt.Fprintf(w, " value: %9d\n", metric.Value())