metrics: use atomic type (#27121)

This commit is contained in:
s7v7nislands 2023-04-20 15:36:54 +08:00 committed by GitHub
parent 3f7afc3f57
commit ae93e0b484
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 31 additions and 28 deletions

View File

@ -38,13 +38,13 @@ func NewCounter() Counter {
if !Enabled { if !Enabled {
return NilCounter{} return NilCounter{}
} }
return &StandardCounter{0} return &StandardCounter{}
} }
// NewCounterForced constructs a new StandardCounter and returns it no matter if // NewCounterForced constructs a new StandardCounter and returns it no matter if
// the global switch is enabled or not. // the global switch is enabled or not.
func NewCounterForced() Counter { func NewCounterForced() Counter {
return &StandardCounter{0} return &StandardCounter{}
} }
// NewRegisteredCounter constructs and registers a new StandardCounter. // NewRegisteredCounter constructs and registers a new StandardCounter.
@ -115,27 +115,27 @@ func (NilCounter) Snapshot() Counter { return NilCounter{} }
// StandardCounter is the standard implementation of a Counter and uses the // StandardCounter is the standard implementation of a Counter and uses the
// sync/atomic package to manage a single int64 value. // sync/atomic package to manage a single int64 value.
type StandardCounter struct { type StandardCounter struct {
count int64 count atomic.Int64
} }
// Clear sets the counter to zero. // Clear sets the counter to zero.
func (c *StandardCounter) Clear() { func (c *StandardCounter) Clear() {
atomic.StoreInt64(&c.count, 0) c.count.Store(0)
} }
// Count returns the current count. // Count returns the current count.
func (c *StandardCounter) Count() int64 { func (c *StandardCounter) Count() int64 {
return atomic.LoadInt64(&c.count) return c.count.Load()
} }
// Dec decrements the counter by the given amount. // Dec decrements the counter by the given amount.
func (c *StandardCounter) Dec(i int64) { func (c *StandardCounter) Dec(i int64) {
atomic.AddInt64(&c.count, -i) c.count.Add(-i)
} }
// Inc increments the counter by the given amount. // Inc increments the counter by the given amount.
func (c *StandardCounter) Inc(i int64) { func (c *StandardCounter) Inc(i int64) {
atomic.AddInt64(&c.count, i) c.count.Add(i)
} }
// Snapshot returns a read-only copy of the counter. // Snapshot returns a read-only copy of the counter.

View File

@ -75,7 +75,7 @@ func (NilEWMA) Update(n int64) {}
// of uncounted events and processes them on each tick. It uses the // of uncounted events and processes them on each tick. It uses the
// sync/atomic package to manage uncounted events. // sync/atomic package to manage uncounted events.
type StandardEWMA struct { type StandardEWMA struct {
uncounted int64 // /!\ this should be the first member to ensure 64-bit alignment uncounted atomic.Int64
alpha float64 alpha float64
rate float64 rate float64
init bool init bool
@ -97,8 +97,8 @@ func (a *StandardEWMA) Snapshot() EWMA {
// Tick ticks the clock to update the moving average. It assumes it is called // Tick ticks the clock to update the moving average. It assumes it is called
// every five seconds. // every five seconds.
func (a *StandardEWMA) Tick() { func (a *StandardEWMA) Tick() {
count := atomic.LoadInt64(&a.uncounted) count := a.uncounted.Load()
atomic.AddInt64(&a.uncounted, -count) a.uncounted.Add(-count)
instantRate := float64(count) / float64(5*time.Second) instantRate := float64(count) / float64(5*time.Second)
a.mutex.Lock() a.mutex.Lock()
defer a.mutex.Unlock() defer a.mutex.Unlock()
@ -112,5 +112,5 @@ func (a *StandardEWMA) Tick() {
// Update adds n uncounted events. // Update adds n uncounted events.
func (a *StandardEWMA) Update(n int64) { func (a *StandardEWMA) Update(n int64) {
atomic.AddInt64(&a.uncounted, n) a.uncounted.Add(n)
} }

View File

@ -25,7 +25,7 @@ func NewGauge() Gauge {
if !Enabled { if !Enabled {
return NilGauge{} return NilGauge{}
} }
return &StandardGauge{0} return &StandardGauge{}
} }
// NewRegisteredGauge constructs and registers a new StandardGauge. // NewRegisteredGauge constructs and registers a new StandardGauge.
@ -101,7 +101,7 @@ func (NilGauge) Value() int64 { return 0 }
// StandardGauge is the standard implementation of a Gauge and uses the // StandardGauge is the standard implementation of a Gauge and uses the
// sync/atomic package to manage a single int64 value. // sync/atomic package to manage a single int64 value.
type StandardGauge struct { type StandardGauge struct {
value int64 value atomic.Int64
} }
// Snapshot returns a read-only copy of the gauge. // Snapshot returns a read-only copy of the gauge.
@ -111,22 +111,22 @@ func (g *StandardGauge) Snapshot() Gauge {
// Update updates the gauge's value. // Update updates the gauge's value.
func (g *StandardGauge) Update(v int64) { func (g *StandardGauge) Update(v int64) {
atomic.StoreInt64(&g.value, v) g.value.Store(v)
} }
// Value returns the gauge's current value. // Value returns the gauge's current value.
func (g *StandardGauge) Value() int64 { func (g *StandardGauge) Value() int64 {
return atomic.LoadInt64(&g.value) return g.value.Load()
} }
// Dec decrements the gauge's current value by the given amount. // Dec decrements the gauge's current value by the given amount.
func (g *StandardGauge) Dec(i int64) { func (g *StandardGauge) Dec(i int64) {
atomic.AddInt64(&g.value, -i) g.value.Add(-i)
} }
// Inc increments the gauge's current value by the given amount. // Inc increments the gauge's current value by the given amount.
func (g *StandardGauge) Inc(i int64) { func (g *StandardGauge) Inc(i int64) {
atomic.AddInt64(&g.value, i) g.value.Add(i)
} }
// FunctionalGauge returns value from given function // FunctionalGauge returns value from given function

View File

@ -101,11 +101,7 @@ func NewRegisteredMeterForced(name string, r Registry) Meter {
// MeterSnapshot is a read-only copy of another Meter. // MeterSnapshot is a read-only copy of another Meter.
type MeterSnapshot struct { type MeterSnapshot struct {
// WARNING: The `temp` field is accessed atomically. temp atomic.Int64
// On 32 bit platforms, only 64-bit aligned fields can be atomic. The struct is
// guaranteed to be so aligned, so take advantage of that. For more information,
// see https://golang.org/pkg/sync/atomic/#pkg-note-BUG.
temp int64
count int64 count int64
rate1, rate5, rate15, rateMean float64 rate1, rate5, rate15, rateMean float64
} }
@ -173,7 +169,7 @@ type StandardMeter struct {
snapshot *MeterSnapshot snapshot *MeterSnapshot
a1, a5, a15 EWMA a1, a5, a15 EWMA
startTime time.Time startTime time.Time
stopped uint32 stopped atomic.Bool
} }
func newStandardMeter() *StandardMeter { func newStandardMeter() *StandardMeter {
@ -188,8 +184,8 @@ func newStandardMeter() *StandardMeter {
// Stop stops the meter, Mark() will be a no-op if you use it after being stopped. // Stop stops the meter, Mark() will be a no-op if you use it after being stopped.
func (m *StandardMeter) Stop() { func (m *StandardMeter) Stop() {
stopped := atomic.SwapUint32(&m.stopped, 1) stopped := m.stopped.Swap(true)
if stopped != 1 { if !stopped {
arbiter.Lock() arbiter.Lock()
delete(arbiter.meters, m) delete(arbiter.meters, m)
arbiter.Unlock() arbiter.Unlock()
@ -207,7 +203,7 @@ func (m *StandardMeter) Count() int64 {
// Mark records the occurrence of n events. // Mark records the occurrence of n events.
func (m *StandardMeter) Mark(n int64) { func (m *StandardMeter) Mark(n int64) {
atomic.AddInt64(&m.snapshot.temp, n) m.snapshot.temp.Add(n)
} }
// Rate1 returns the one-minute moving average rate of events per second. // Rate1 returns the one-minute moving average rate of events per second.
@ -241,7 +237,14 @@ func (m *StandardMeter) RateMean() float64 {
// Snapshot returns a read-only copy of the meter. // Snapshot returns a read-only copy of the meter.
func (m *StandardMeter) Snapshot() Meter { func (m *StandardMeter) Snapshot() Meter {
m.lock.RLock() m.lock.RLock()
snapshot := *m.snapshot snapshot := MeterSnapshot{
count: m.snapshot.count,
rate1: m.snapshot.rate1,
rate5: m.snapshot.rate5,
rate15: m.snapshot.rate15,
rateMean: m.snapshot.rateMean,
}
snapshot.temp.Store(m.snapshot.temp.Load())
m.lock.RUnlock() m.lock.RUnlock()
return &snapshot return &snapshot
} }
@ -257,7 +260,7 @@ func (m *StandardMeter) updateSnapshot() {
func (m *StandardMeter) updateMeter() { func (m *StandardMeter) updateMeter() {
// should only run with write lock held on m.lock // should only run with write lock held on m.lock
n := atomic.SwapInt64(&m.snapshot.temp, 0) n := m.snapshot.temp.Swap(0)
m.snapshot.count += n m.snapshot.count += n
m.a1.Update(n) m.a1.Update(n)
m.a5.Update(n) m.a5.Update(n)