metrics, cmd/geth: informational metrics (prometheus, influxdb, opentsb) (#24877)

This chang creates a GaugeInfo metrics type for registering informational (textual) metrics, e.g. geth version number. It also improves the testing for backend-exporters, and uses a shared subpackage in 'internal' to provide sample datasets and ordered registry. 

Implements #21783

---------

Co-authored-by: Martin Holst Swende <martin@swende.se>
This commit is contained in:
Jorge 2023-08-31 19:37:17 +02:00 committed by GitHub
parent 5b159498bb
commit 53f3c2ae65
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
26 changed files with 751 additions and 144 deletions

View File

@ -22,10 +22,10 @@ import (
"fmt"
"os"
"reflect"
"runtime"
"strings"
"unicode"
"github.com/urfave/cli/v2"
"github.com/ethereum/go-ethereum/accounts"
"github.com/ethereum/go-ethereum/accounts/external"
"github.com/ethereum/go-ethereum/accounts/keystore"
@ -43,6 +43,7 @@ import (
"github.com/ethereum/go-ethereum/node"
"github.com/ethereum/go-ethereum/params"
"github.com/naoina/toml"
"github.com/urfave/cli/v2"
)
var (
@ -177,6 +178,20 @@ func makeFullNode(ctx *cli.Context) (*node.Node, ethapi.Backend) {
}
backend, eth := utils.RegisterEthService(stack, &cfg.Eth)
// Create gauge with geth system and build information
if eth != nil { // The 'eth' backend may be nil in light mode
var protos []string
for _, p := range eth.Protocols() {
protos = append(protos, fmt.Sprintf("%v/%d", p.Name, p.Version))
}
metrics.NewRegisteredGaugeInfo("geth/info", nil).Update(metrics.GaugeInfoValue{
"arch": runtime.GOARCH,
"os": runtime.GOOS,
"version": cfg.Node.Version,
"eth_protocols": strings.Join(protos, ","),
})
}
// Configure log filter RPC API.
filterSystem := utils.RegisterFilterAPI(stack, backend, &cfg.Eth)

View File

@ -60,6 +60,8 @@ var (
headFinalizedBlockGauge = metrics.NewRegisteredGauge("chain/head/finalized", nil)
headSafeBlockGauge = metrics.NewRegisteredGauge("chain/head/safe", nil)
chainInfoGauge = metrics.NewRegisteredGaugeInfo("chain/info", nil)
accountReadTimer = metrics.NewRegisteredTimer("chain/account/reads", nil)
accountHashTimer = metrics.NewRegisteredTimer("chain/account/hashes", nil)
accountUpdateTimer = metrics.NewRegisteredTimer("chain/account/updates", nil)
@ -322,6 +324,9 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, genesis *Genesis
bc.currentFinalBlock.Store(nil)
bc.currentSafeBlock.Store(nil)
// Update chain info data metrics
chainInfoGauge.Update(metrics.GaugeInfoValue{"chain_id": bc.chainConfig.ChainID.String()})
// If Geth is initialized with an external ancient store, re-initialize the
// missing chain indexes and chain flags. This procedure can survive crash
// and can be resumed in next restart since chain flags are updated in last step.

View File

@ -95,6 +95,20 @@ func (exp *exp) getFloat(name string) *expvar.Float {
return v
}
func (exp *exp) getInfo(name string) *expvar.String {
var v *expvar.String
exp.expvarLock.Lock()
p := expvar.Get(name)
if p != nil {
v = p.(*expvar.String)
} else {
v = new(expvar.String)
expvar.Publish(name, v)
}
exp.expvarLock.Unlock()
return v
}
func (exp *exp) publishCounter(name string, metric metrics.Counter) {
v := exp.getInt(name)
v.Set(metric.Count())
@ -113,6 +127,10 @@ func (exp *exp) publishGaugeFloat64(name string, metric metrics.GaugeFloat64) {
exp.getFloat(name).Set(metric.Value())
}
func (exp *exp) publishGaugeInfo(name string, metric metrics.GaugeInfo) {
exp.getInfo(name).Set(metric.Value().String())
}
func (exp *exp) publishHistogram(name string, metric metrics.Histogram) {
h := metric.Snapshot()
ps := h.Percentiles([]float64{0.5, 0.75, 0.95, 0.99, 0.999})
@ -178,6 +196,8 @@ func (exp *exp) syncToExpvar() {
exp.publishGauge(name, i)
case metrics.GaugeFloat64:
exp.publishGaugeFloat64(name, i)
case metrics.GaugeInfo:
exp.publishGaugeInfo(name, i)
case metrics.Histogram:
exp.publishHistogram(name, i)
case metrics.Meter:

144
metrics/gauge_info.go Normal file
View File

@ -0,0 +1,144 @@
package metrics
import (
"encoding/json"
"sync"
)
// GaugeInfos hold a GaugeInfoValue value that can be set arbitrarily.
type GaugeInfo interface {
Snapshot() GaugeInfo
Update(GaugeInfoValue)
Value() GaugeInfoValue
}
// GaugeInfoValue is a mappng of (string) keys to (string) values
type GaugeInfoValue map[string]string
func (val GaugeInfoValue) String() string {
data, _ := json.Marshal(val)
return string(data)
}
// GetOrRegisterGaugeInfo returns an existing GaugeInfo or constructs and registers a
// new StandardGaugeInfo.
func GetOrRegisterGaugeInfo(name string, r Registry) GaugeInfo {
if nil == r {
r = DefaultRegistry
}
return r.GetOrRegister(name, NewGaugeInfo()).(GaugeInfo)
}
// NewGaugeInfo constructs a new StandardGaugeInfo.
func NewGaugeInfo() GaugeInfo {
if !Enabled {
return NilGaugeInfo{}
}
return &StandardGaugeInfo{
value: GaugeInfoValue{},
}
}
// NewRegisteredGaugeInfo constructs and registers a new StandardGaugeInfo.
func NewRegisteredGaugeInfo(name string, r Registry) GaugeInfo {
c := NewGaugeInfo()
if nil == r {
r = DefaultRegistry
}
r.Register(name, c)
return c
}
// NewFunctionalGauge constructs a new FunctionalGauge.
func NewFunctionalGaugeInfo(f func() GaugeInfoValue) GaugeInfo {
if !Enabled {
return NilGaugeInfo{}
}
return &FunctionalGaugeInfo{value: f}
}
// NewRegisteredFunctionalGauge constructs and registers a new StandardGauge.
func NewRegisteredFunctionalGaugeInfo(name string, r Registry, f func() GaugeInfoValue) GaugeInfo {
c := NewFunctionalGaugeInfo(f)
if nil == r {
r = DefaultRegistry
}
r.Register(name, c)
return c
}
// GaugeInfoSnapshot is a read-only copy of another GaugeInfo.
type GaugeInfoSnapshot GaugeInfoValue
// Snapshot returns the snapshot.
func (g GaugeInfoSnapshot) Snapshot() GaugeInfo { return g }
// Update panics.
func (GaugeInfoSnapshot) Update(GaugeInfoValue) {
panic("Update called on a GaugeInfoSnapshot")
}
// Value returns the value at the time the snapshot was taken.
func (g GaugeInfoSnapshot) Value() GaugeInfoValue { return GaugeInfoValue(g) }
// NilGauge is a no-op Gauge.
type NilGaugeInfo struct{}
// Snapshot is a no-op.
func (NilGaugeInfo) Snapshot() GaugeInfo { return NilGaugeInfo{} }
// Update is a no-op.
func (NilGaugeInfo) Update(v GaugeInfoValue) {}
// Value is a no-op.
func (NilGaugeInfo) Value() GaugeInfoValue { return GaugeInfoValue{} }
// StandardGaugeInfo is the standard implementation of a GaugeInfo and uses
// sync.Mutex to manage a single string value.
type StandardGaugeInfo struct {
mutex sync.Mutex
value GaugeInfoValue
}
// Snapshot returns a read-only copy of the gauge.
func (g *StandardGaugeInfo) Snapshot() GaugeInfo {
return GaugeInfoSnapshot(g.Value())
}
// Update updates the gauge's value.
func (g *StandardGaugeInfo) Update(v GaugeInfoValue) {
g.mutex.Lock()
defer g.mutex.Unlock()
g.value = v
}
// Value returns the gauge's current value.
func (g *StandardGaugeInfo) Value() GaugeInfoValue {
g.mutex.Lock()
defer g.mutex.Unlock()
return g.value
}
// FunctionalGaugeInfo returns value from given function
type FunctionalGaugeInfo struct {
value func() GaugeInfoValue
}
// Value returns the gauge's current value.
func (g FunctionalGaugeInfo) Value() GaugeInfoValue {
return g.value()
}
// Value returns the gauge's current value in JSON string format
func (g FunctionalGaugeInfo) ValueJsonString() string {
data, _ := json.Marshal(g.value())
return string(data)
}
// Snapshot returns the snapshot.
func (g FunctionalGaugeInfo) Snapshot() GaugeInfo { return GaugeInfoSnapshot(g.Value()) }
// Update panics.
func (FunctionalGaugeInfo) Update(GaugeInfoValue) {
panic("Update called on a FunctionalGaugeInfo")
}

View File

@ -0,0 +1,75 @@
package metrics
import (
"strconv"
"testing"
)
func TestGaugeInfoJsonString(t *testing.T) {
g := NewGaugeInfo()
g.Update(GaugeInfoValue{
"chain_id": "5",
"anotherKey": "any_string_value",
"third_key": "anything",
},
)
want := `{"anotherKey":"any_string_value","chain_id":"5","third_key":"anything"}`
if have := g.Value().String(); have != want {
t.Errorf("\nhave: %v\nwant: %v\n", have, want)
}
}
func TestGaugeInfoSnapshot(t *testing.T) {
g := NewGaugeInfo()
g.Update(GaugeInfoValue{"value": "original"})
snapshot := g.Snapshot() // Snapshot @chainid 5
g.Update(GaugeInfoValue{"value": "updated"})
// The 'g' should be updated
if have, want := g.Value().String(), `{"value":"updated"}`; have != want {
t.Errorf("\nhave: %v\nwant: %v\n", have, want)
}
// Snapshot should be unupdated
if have, want := snapshot.Value().String(), `{"value":"original"}`; have != want {
t.Errorf("\nhave: %v\nwant: %v\n", have, want)
}
}
func TestGetOrRegisterGaugeInfo(t *testing.T) {
r := NewRegistry()
NewRegisteredGaugeInfo("foo", r).Update(
GaugeInfoValue{"chain_id": "5"})
g := GetOrRegisterGaugeInfo("foo", r)
if have, want := g.Value().String(), `{"chain_id":"5"}`; have != want {
t.Errorf("have\n%v\nwant\n%v\n", have, want)
}
}
func TestFunctionalGaugeInfo(t *testing.T) {
info := GaugeInfoValue{"chain_id": "0"}
counter := 1
// A "functional" gauge invokes the method to obtain the value
fg := NewFunctionalGaugeInfo(func() GaugeInfoValue {
info["chain_id"] = strconv.Itoa(counter)
counter++
return info
})
fg.Value()
fg.Value()
if have, want := info["chain_id"], "2"; have != want {
t.Errorf("have %v want %v", have, want)
}
}
func TestGetOrRegisterFunctionalGaugeInfo(t *testing.T) {
r := NewRegistry()
NewRegisteredFunctionalGaugeInfo("foo", r, func() GaugeInfoValue {
return GaugeInfoValue{
"chain_id": "5",
}
})
want := `{"chain_id":"5"}`
have := GetOrRegisterGaugeInfo("foo", r).Value().String()
if have != want {
t.Errorf("have\n%v\nwant\n%v\n", have, want)
}
}

View File

@ -73,6 +73,8 @@ func graphite(c *GraphiteConfig) error {
fmt.Fprintf(w, "%s.%s.value %d %d\n", c.Prefix, name, metric.Value(), now)
case GaugeFloat64:
fmt.Fprintf(w, "%s.%s.value %f %d\n", c.Prefix, name, metric.Value(), now)
case GaugeInfo:
fmt.Fprintf(w, "%s.%s.value %s %d\n", c.Prefix, name, metric.Value().String(), now)
case Histogram:
h := metric.Snapshot()
ps := h.Percentiles(c.Percentiles)

View File

@ -32,6 +32,13 @@ func readMeter(namespace, name string, i interface{}) (string, map[string]interf
"value": metric.Snapshot().Value(),
}
return measurement, fields
case metrics.GaugeInfo:
ms := metric.Snapshot()
measurement := fmt.Sprintf("%s%s.gauge", namespace, name)
fields := map[string]interface{}{
"value": ms.Value().String(),
}
return measurement, fields
case metrics.Histogram:
ms := metric.Snapshot()
if ms.Count() <= 0 {

View File

@ -0,0 +1,114 @@
// Copyright 2023 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package influxdb
import (
"fmt"
"io"
"net/http"
"net/http/httptest"
"net/url"
"os"
"strings"
"testing"
"github.com/ethereum/go-ethereum/metrics"
"github.com/ethereum/go-ethereum/metrics/internal"
influxdb2 "github.com/influxdata/influxdb-client-go/v2"
)
func TestMain(m *testing.M) {
metrics.Enabled = true
os.Exit(m.Run())
}
func TestExampleV1(t *testing.T) {
r := internal.ExampleMetrics()
var have, want string
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
haveB, _ := io.ReadAll(r.Body)
have = string(haveB)
r.Body.Close()
}))
defer ts.Close()
u, _ := url.Parse(ts.URL)
rep := &reporter{
reg: r,
url: *u,
namespace: "goth.",
}
if err := rep.makeClient(); err != nil {
t.Fatal(err)
}
if err := rep.send(978307200); err != nil {
t.Fatal(err)
}
if wantB, err := os.ReadFile("./testdata/influxdbv1.want"); err != nil {
t.Fatal(err)
} else {
want = string(wantB)
}
if have != want {
t.Errorf("\nhave:\n%v\nwant:\n%v\n", have, want)
t.Logf("have vs want:\n%v", findFirstDiffPos(have, want))
}
}
func TestExampleV2(t *testing.T) {
r := internal.ExampleMetrics()
var have, want string
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
haveB, _ := io.ReadAll(r.Body)
have = string(haveB)
r.Body.Close()
}))
defer ts.Close()
rep := &v2Reporter{
reg: r,
endpoint: ts.URL,
namespace: "goth.",
}
rep.client = influxdb2.NewClient(rep.endpoint, rep.token)
defer rep.client.Close()
rep.write = rep.client.WriteAPI(rep.organization, rep.bucket)
rep.send(978307200)
if wantB, err := os.ReadFile("./testdata/influxdbv2.want"); err != nil {
t.Fatal(err)
} else {
want = string(wantB)
}
if have != want {
t.Errorf("\nhave:\n%v\nwant:\n%v\n", have, want)
t.Logf("have vs want:\n %v", findFirstDiffPos(have, want))
}
}
func findFirstDiffPos(a, b string) string {
yy := strings.Split(b, "\n")
for i, x := range strings.Split(a, "\n") {
if i >= len(yy) {
return fmt.Sprintf("have:%d: %s\nwant:%d: <EOF>", i, x, i)
}
if y := yy[i]; x != y {
return fmt.Sprintf("have:%d: %s\nwant:%d: %s", i, x, i, y)
}
}
return ""
}

View File

@ -79,7 +79,7 @@ func InfluxDBWithTagsOnce(r metrics.Registry, url, database, username, password,
return fmt.Errorf("unable to make InfluxDB client. err: %v", err)
}
if err := rep.send(); err != nil {
if err := rep.send(0); err != nil {
return fmt.Errorf("unable to send to InfluxDB. err: %v", err)
}
@ -107,7 +107,7 @@ func (r *reporter) run() {
for {
select {
case <-intervalTicker.C:
if err := r.send(); err != nil {
if err := r.send(0); err != nil {
log.Warn("Unable to send to InfluxDB", "err", err)
}
case <-pingTicker.C:
@ -123,7 +123,9 @@ func (r *reporter) run() {
}
}
func (r *reporter) send() error {
// send sends the measurements. If provided tstamp is >0, it is used. Otherwise,
// a 'fresh' timestamp is used.
func (r *reporter) send(tstamp int64) error {
bps, err := client.NewBatchPoints(
client.BatchPointsConfig{
Database: r.database,
@ -132,7 +134,12 @@ func (r *reporter) send() error {
return err
}
r.reg.Each(func(name string, i interface{}) {
now := time.Now()
var now time.Time
if tstamp <= 0 {
now = time.Now()
} else {
now = time.Unix(tstamp, 0)
}
measurement, fields := readMeter(r.namespace, name, i)
if fields == nil {
return

View File

@ -64,7 +64,7 @@ func (r *v2Reporter) run() {
for {
select {
case <-intervalTicker.C:
r.send()
r.send(0)
case <-pingTicker.C:
_, err := r.client.Health(context.Background())
if err != nil {
@ -74,9 +74,16 @@ func (r *v2Reporter) run() {
}
}
func (r *v2Reporter) send() {
// send sends the measurements. If provided tstamp is >0, it is used. Otherwise,
// a 'fresh' timestamp is used.
func (r *v2Reporter) send(tstamp int64) {
r.reg.Each(func(name string, i interface{}) {
now := time.Now()
var now time.Time
if tstamp <= 0 {
now = time.Now()
} else {
now = time.Unix(tstamp, 0)
}
measurement, fields := readMeter(r.namespace, name, i)
if fields == nil {
return

View File

@ -0,0 +1,9 @@
goth.test/counter.count value=12345 978307200000000000
goth.test/counter_float64.count value=54321.98 978307200000000000
goth.test/gauge.gauge value=23456i 978307200000000000
goth.test/gauge_float64.gauge value=34567.89 978307200000000000
goth.test/gauge_info.gauge value="{\"arch\":\"amd64\",\"commit\":\"7caa2d8163ae3132c1c2d6978c76610caee2d949\",\"os\":\"linux\",\"protocol_versions\":\"64 65 66\",\"version\":\"1.10.18-unstable\"}" 978307200000000000
goth.test/histogram.histogram count=3i,max=3i,mean=2,min=1i,p25=1,p50=2,p75=3,p95=3,p99=3,p999=3,p9999=3,stddev=0.816496580927726,variance=0.6666666666666666 978307200000000000
goth.test/meter.meter count=0i,m1=0,m15=0,m5=0,mean=0 978307200000000000
goth.test/resetting_timer.span count=6i,max=120000000i,mean=30000000,min=10000000i,p50=12000000i,p95=120000000i,p99=120000000i 978307200000000000
goth.test/timer.timer count=6i,m1=0,m15=0,m5=0,max=120000000i,mean=38333333.333333336,meanrate=0,min=20000000i,p50=22500000,p75=48000000,p95=120000000,p99=120000000,p999=120000000,p9999=120000000,stddev=36545253.529775314,variance=1335555555555555.2 978307200000000000

View File

@ -0,0 +1,9 @@
goth.test/counter.count value=12345 978307200000000000
goth.test/counter_float64.count value=54321.98 978307200000000000
goth.test/gauge.gauge value=23456i 978307200000000000
goth.test/gauge_float64.gauge value=34567.89 978307200000000000
goth.test/gauge_info.gauge value="{\"arch\":\"amd64\",\"commit\":\"7caa2d8163ae3132c1c2d6978c76610caee2d949\",\"os\":\"linux\",\"protocol_versions\":\"64 65 66\",\"version\":\"1.10.18-unstable\"}" 978307200000000000
goth.test/histogram.histogram count=3i,max=3i,mean=2,min=1i,p25=1,p50=2,p75=3,p95=3,p99=3,p999=3,p9999=3,stddev=0.816496580927726,variance=0.6666666666666666 978307200000000000
goth.test/meter.meter count=0i,m1=0,m15=0,m5=0,mean=0 978307200000000000
goth.test/resetting_timer.span count=6i,max=120000000i,mean=30000000,min=10000000i,p50=12000000i,p95=120000000i,p99=120000000i 978307200000000000
goth.test/timer.timer count=6i,m1=0,m15=0,m5=0,max=120000000i,mean=38333333.333333336,meanrate=0,min=20000000i,p50=22500000,p75=48000000,p95=120000000,p99=120000000,p999=120000000,p9999=120000000,stddev=36545253.529775314,variance=1335555555555555.2 978307200000000000

View File

@ -0,0 +1,64 @@
// Copyright 2023 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package internal
import (
"time"
"github.com/ethereum/go-ethereum/metrics"
)
// ExampleMetrics returns an ordered registry populated with a sample of metrics.
func ExampleMetrics() metrics.Registry {
var registry = metrics.NewOrderedRegistry()
metrics.NewRegisteredCounterFloat64("test/counter", registry).Inc(12345)
metrics.NewRegisteredCounterFloat64("test/counter_float64", registry).Inc(54321.98)
metrics.NewRegisteredGauge("test/gauge", registry).Update(23456)
metrics.NewRegisteredGaugeFloat64("test/gauge_float64", registry).Update(34567.89)
metrics.NewRegisteredGaugeInfo("test/gauge_info", registry).Update(
metrics.GaugeInfoValue{
"version": "1.10.18-unstable",
"arch": "amd64",
"os": "linux",
"commit": "7caa2d8163ae3132c1c2d6978c76610caee2d949",
"protocol_versions": "64 65 66",
})
metrics.NewRegisteredHistogram("test/histogram", registry, metrics.NewSampleSnapshot(3, []int64{1, 2, 3}))
registry.Register("test/meter", metrics.NewInactiveMeter())
{
timer := metrics.NewRegisteredResettingTimer("test/resetting_timer", registry)
timer.Update(10 * time.Millisecond)
timer.Update(11 * time.Millisecond)
timer.Update(12 * time.Millisecond)
timer.Update(120 * time.Millisecond)
timer.Update(13 * time.Millisecond)
timer.Update(14 * time.Millisecond)
}
{
timer := metrics.NewRegisteredTimer("test/timer", registry)
timer.Update(20 * time.Millisecond)
timer.Update(21 * time.Millisecond)
timer.Update(22 * time.Millisecond)
timer.Update(120 * time.Millisecond)
timer.Update(23 * time.Millisecond)
timer.Update(24 * time.Millisecond)
timer.Stop()
}
registry.Register("test/empty_resetting_timer", metrics.NewResettingTimer().Snapshot())
return registry
}

View File

@ -126,6 +126,10 @@ func (rep *Reporter) BuildRequest(now time.Time, r metrics.Registry) (snapshot B
measurement[Name] = name
measurement[Value] = m.Value()
snapshot.Gauges = append(snapshot.Gauges, measurement)
case metrics.GaugeInfo:
measurement[Name] = name
measurement[Value] = m.Value()
snapshot.Gauges = append(snapshot.Gauges, measurement)
case metrics.Histogram:
if m.Count() > 0 {
gauges := make([]Measurement, histogramGaugeCount)

View File

@ -33,6 +33,9 @@ func LogScaled(r Registry, freq time.Duration, scale time.Duration, l Logger) {
case GaugeFloat64:
l.Printf("gauge %s\n", name)
l.Printf(" value: %f\n", metric.Value())
case GaugeInfo:
l.Printf("gauge %s\n", name)
l.Printf(" value: %s\n", metric.Value())
case Healthcheck:
metric.Check()
l.Printf("healthcheck %s\n", name)

View File

@ -58,6 +58,16 @@ func NewMeter() Meter {
return m
}
// NewInactiveMeter returns a meter but does not start any goroutines. This
// method is mainly intended for testing.
func NewInactiveMeter() Meter {
if !Enabled {
return NilMeter{}
}
m := newStandardMeter()
return m
}
// NewMeterForced constructs a new StandardMeter and launches a goroutine no matter
// the global switch is enabled or not.
// Be sure to call Stop() once the meter is of no use to allow for garbage collection.

View File

@ -3,6 +3,7 @@ package metrics
import (
"bufio"
"fmt"
"io"
"log"
"net"
"os"
@ -57,16 +58,10 @@ func getShortHostname() string {
return shortHostName
}
func openTSDB(c *OpenTSDBConfig) error {
shortHostname := getShortHostname()
now := time.Now().Unix()
// writeRegistry writes the registry-metrics on the opentsb format.
func (c *OpenTSDBConfig) writeRegistry(w io.Writer, now int64, shortHostname string) {
du := float64(c.DurationUnit)
conn, err := net.DialTCP("tcp", nil, c.Addr)
if nil != err {
return err
}
defer conn.Close()
w := bufio.NewWriter(conn)
c.Registry.Each(func(name string, i interface{}) {
switch metric := i.(type) {
case Counter:
@ -77,6 +72,8 @@ func openTSDB(c *OpenTSDBConfig) error {
fmt.Fprintf(w, "put %s.%s.value %d %d host=%s\n", c.Prefix, name, now, metric.Value(), shortHostname)
case GaugeFloat64:
fmt.Fprintf(w, "put %s.%s.value %d %f host=%s\n", c.Prefix, name, now, metric.Value(), shortHostname)
case GaugeInfo:
fmt.Fprintf(w, "put %s.%s.value %d %s host=%s\n", c.Prefix, name, now, metric.Value().String(), shortHostname)
case Histogram:
h := metric.Snapshot()
ps := h.Percentiles([]float64{0.5, 0.75, 0.95, 0.99, 0.999})
@ -115,7 +112,17 @@ func openTSDB(c *OpenTSDBConfig) error {
fmt.Fprintf(w, "put %s.%s.fifteen-minute %d %.2f host=%s\n", c.Prefix, name, now, t.Rate15(), shortHostname)
fmt.Fprintf(w, "put %s.%s.mean-rate %d %.2f host=%s\n", c.Prefix, name, now, t.RateMean(), shortHostname)
}
w.Flush()
})
}
func openTSDB(c *OpenTSDBConfig) error {
conn, err := net.DialTCP("tcp", nil, c.Addr)
if nil != err {
return err
}
defer conn.Close()
w := bufio.NewWriter(conn)
c.writeRegistry(w, time.Now().Unix(), getShortHostname())
w.Flush()
return nil
}

View File

@ -2,6 +2,9 @@ package metrics
import (
"net"
"os"
"strings"
"testing"
"time"
)
@ -19,3 +22,30 @@ func ExampleOpenTSDBWithConfig() {
DurationUnit: time.Millisecond,
})
}
func TestExampleOpenTSB(t *testing.T) {
r := NewOrderedRegistry()
NewRegisteredGaugeInfo("foo", r).Update(GaugeInfoValue{"chain_id": "5"})
NewRegisteredGaugeFloat64("pi", r).Update(3.14)
NewRegisteredCounter("months", r).Inc(12)
NewRegisteredCounterFloat64("tau", r).Inc(1.57)
NewRegisteredMeter("elite", r).Mark(1337)
NewRegisteredTimer("second", r).Update(time.Second)
NewRegisteredCounterFloat64("tau", r).Inc(1.57)
NewRegisteredCounterFloat64("tau", r).Inc(1.57)
w := new(strings.Builder)
(&OpenTSDBConfig{
Registry: r,
DurationUnit: time.Millisecond,
Prefix: "pre",
}).writeRegistry(w, 978307200, "hal9000")
wantB, err := os.ReadFile("./testdata/opentsb.want")
if err != nil {
t.Fatal(err)
}
if have, want := w.String(), string(wantB); have != want {
t.Errorf("\nhave:\n%v\nwant:\n%v\n", have, want)
}
}

View File

@ -19,6 +19,7 @@ package prometheus
import (
"bytes"
"fmt"
"sort"
"strconv"
"strings"
@ -46,6 +47,34 @@ func newCollector() *collector {
}
}
// Add adds the metric i to the collector. This method returns an error if the
// metric type is not supported/known.
func (c *collector) Add(name string, i any) error {
switch m := i.(type) {
case metrics.Counter:
c.addCounter(name, m.Snapshot())
case metrics.CounterFloat64:
c.addCounterFloat64(name, m.Snapshot())
case metrics.Gauge:
c.addGauge(name, m.Snapshot())
case metrics.GaugeFloat64:
c.addGaugeFloat64(name, m.Snapshot())
case metrics.GaugeInfo:
c.addGaugeInfo(name, m.Snapshot())
case metrics.Histogram:
c.addHistogram(name, m.Snapshot())
case metrics.Meter:
c.addMeter(name, m.Snapshot())
case metrics.Timer:
c.addTimer(name, m.Snapshot())
case metrics.ResettingTimer:
c.addResettingTimer(name, m.Snapshot())
default:
return fmt.Errorf("unknown prometheus metric type %T", i)
}
return nil
}
func (c *collector) addCounter(name string, m metrics.Counter) {
c.writeGaugeCounter(name, m.Count())
}
@ -62,6 +91,10 @@ func (c *collector) addGaugeFloat64(name string, m metrics.GaugeFloat64) {
c.writeGaugeCounter(name, m.Value())
}
func (c *collector) addGaugeInfo(name string, m metrics.GaugeInfo) {
c.writeGaugeInfo(name, m.Value())
}
func (c *collector) addHistogram(name string, m metrics.Histogram) {
pv := []float64{0.5, 0.75, 0.95, 0.99, 0.999, 0.9999}
ps := m.Percentiles(pv)
@ -102,6 +135,19 @@ func (c *collector) addResettingTimer(name string, m metrics.ResettingTimer) {
c.buff.WriteRune('\n')
}
func (c *collector) writeGaugeInfo(name string, value metrics.GaugeInfoValue) {
name = mutateKey(name)
c.buff.WriteString(fmt.Sprintf(typeGaugeTpl, name))
c.buff.WriteString(name)
c.buff.WriteString(" ")
var kvs []string
for k, v := range value {
kvs = append(kvs, fmt.Sprintf("%v=%q", k, v))
}
sort.Strings(kvs)
c.buff.WriteString(fmt.Sprintf("{%v} 1\n\n", strings.Join(kvs, ", ")))
}
func (c *collector) writeGaugeCounter(name string, value interface{}) {
name = mutateKey(name)
c.buff.WriteString(fmt.Sprintf(typeGaugeTpl, name))

View File

@ -1,11 +1,29 @@
// Copyright 2023 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package prometheus
import (
"fmt"
"os"
"strings"
"testing"
"time"
"github.com/ethereum/go-ethereum/metrics"
"github.com/ethereum/go-ethereum/metrics/internal"
)
func TestMain(m *testing.M) {
@ -14,104 +32,34 @@ func TestMain(m *testing.M) {
}
func TestCollector(t *testing.T) {
c := newCollector()
counter := metrics.NewCounter()
counter.Inc(12345)
c.addCounter("test/counter", counter)
counterfloat64 := metrics.NewCounterFloat64()
counterfloat64.Inc(54321.98)
c.addCounterFloat64("test/counter_float64", counterfloat64)
gauge := metrics.NewGauge()
gauge.Update(23456)
c.addGauge("test/gauge", gauge)
gaugeFloat64 := metrics.NewGaugeFloat64()
gaugeFloat64.Update(34567.89)
c.addGaugeFloat64("test/gauge_float64", gaugeFloat64)
histogram := metrics.NewHistogram(&metrics.NilSample{})
c.addHistogram("test/histogram", histogram)
meter := metrics.NewMeter()
defer meter.Stop()
meter.Mark(9999999)
c.addMeter("test/meter", meter)
timer := metrics.NewTimer()
defer timer.Stop()
timer.Update(20 * time.Millisecond)
timer.Update(21 * time.Millisecond)
timer.Update(22 * time.Millisecond)
timer.Update(120 * time.Millisecond)
timer.Update(23 * time.Millisecond)
timer.Update(24 * time.Millisecond)
c.addTimer("test/timer", timer)
resettingTimer := metrics.NewResettingTimer()
resettingTimer.Update(10 * time.Millisecond)
resettingTimer.Update(11 * time.Millisecond)
resettingTimer.Update(12 * time.Millisecond)
resettingTimer.Update(120 * time.Millisecond)
resettingTimer.Update(13 * time.Millisecond)
resettingTimer.Update(14 * time.Millisecond)
c.addResettingTimer("test/resetting_timer", resettingTimer.Snapshot())
emptyResettingTimer := metrics.NewResettingTimer().Snapshot()
c.addResettingTimer("test/empty_resetting_timer", emptyResettingTimer)
const expectedOutput = `# TYPE test_counter gauge
test_counter 12345
# TYPE test_counter_float64 gauge
test_counter_float64 54321.98
# TYPE test_gauge gauge
test_gauge 23456
# TYPE test_gauge_float64 gauge
test_gauge_float64 34567.89
# TYPE test_histogram_count counter
test_histogram_count 0
# TYPE test_histogram summary
test_histogram {quantile="0.5"} 0
test_histogram {quantile="0.75"} 0
test_histogram {quantile="0.95"} 0
test_histogram {quantile="0.99"} 0
test_histogram {quantile="0.999"} 0
test_histogram {quantile="0.9999"} 0
# TYPE test_meter gauge
test_meter 9999999
# TYPE test_timer_count counter
test_timer_count 6
# TYPE test_timer summary
test_timer {quantile="0.5"} 2.25e+07
test_timer {quantile="0.75"} 4.8e+07
test_timer {quantile="0.95"} 1.2e+08
test_timer {quantile="0.99"} 1.2e+08
test_timer {quantile="0.999"} 1.2e+08
test_timer {quantile="0.9999"} 1.2e+08
# TYPE test_resetting_timer_count counter
test_resetting_timer_count 6
# TYPE test_resetting_timer summary
test_resetting_timer {quantile="0.50"} 12000000
test_resetting_timer {quantile="0.95"} 120000000
test_resetting_timer {quantile="0.99"} 120000000
`
exp := c.buff.String()
if exp != expectedOutput {
t.Log("Expected Output:\n", expectedOutput)
t.Log("Actual Output:\n", exp)
t.Fatal("unexpected collector output")
var (
c = newCollector()
want string
)
internal.ExampleMetrics().Each(func(name string, i interface{}) {
c.Add(name, i)
})
if wantB, err := os.ReadFile("./testdata/prometheus.want"); err != nil {
t.Fatal(err)
} else {
want = string(wantB)
}
if have := c.buff.String(); have != want {
t.Logf("have\n%v", have)
t.Logf("have vs want:\n%v", findFirstDiffPos(have, want))
t.Fatalf("unexpected collector output")
}
}
func findFirstDiffPos(a, b string) string {
yy := strings.Split(b, "\n")
for i, x := range strings.Split(a, "\n") {
if i >= len(yy) {
return fmt.Sprintf("a:%d: %s\nb:%d: <EOF>", i, x, i)
}
if y := yy[i]; x != y {
return fmt.Sprintf("a:%d: %s\nb:%d: %s", i, x, i, y)
}
}
return ""
}

View File

@ -41,25 +41,7 @@ func Handler(reg metrics.Registry) http.Handler {
for _, name := range names {
i := reg.Get(name)
switch m := i.(type) {
case metrics.Counter:
c.addCounter(name, m.Snapshot())
case metrics.CounterFloat64:
c.addCounterFloat64(name, m.Snapshot())
case metrics.Gauge:
c.addGauge(name, m.Snapshot())
case metrics.GaugeFloat64:
c.addGaugeFloat64(name, m.Snapshot())
case metrics.Histogram:
c.addHistogram(name, m.Snapshot())
case metrics.Meter:
c.addMeter(name, m.Snapshot())
case metrics.Timer:
c.addTimer(name, m.Snapshot())
case metrics.ResettingTimer:
c.addResettingTimer(name, m.Snapshot())
default:
if err := c.Add(name, i); err != nil {
log.Warn("Unknown Prometheus metric type", "type", fmt.Sprintf("%T", i))
}
}

View File

@ -0,0 +1,48 @@
# TYPE test_counter gauge
test_counter 12345
# TYPE test_counter_float64 gauge
test_counter_float64 54321.98
# TYPE test_gauge gauge
test_gauge 23456
# TYPE test_gauge_float64 gauge
test_gauge_float64 34567.89
# TYPE test_gauge_info gauge
test_gauge_info {arch="amd64", commit="7caa2d8163ae3132c1c2d6978c76610caee2d949", os="linux", protocol_versions="64 65 66", version="1.10.18-unstable"} 1
# TYPE test_histogram_count counter
test_histogram_count 3
# TYPE test_histogram summary
test_histogram {quantile="0.5"} 2
test_histogram {quantile="0.75"} 3
test_histogram {quantile="0.95"} 3
test_histogram {quantile="0.99"} 3
test_histogram {quantile="0.999"} 3
test_histogram {quantile="0.9999"} 3
# TYPE test_meter gauge
test_meter 0
# TYPE test_resetting_timer_count counter
test_resetting_timer_count 6
# TYPE test_resetting_timer summary
test_resetting_timer {quantile="0.50"} 12000000
test_resetting_timer {quantile="0.95"} 120000000
test_resetting_timer {quantile="0.99"} 120000000
# TYPE test_timer_count counter
test_timer_count 6
# TYPE test_timer summary
test_timer {quantile="0.5"} 2.25e+07
test_timer {quantile="0.75"} 4.8e+07
test_timer {quantile="0.95"} 1.2e+08
test_timer {quantile="0.99"} 1.2e+08
test_timer {quantile="0.999"} 1.2e+08
test_timer {quantile="0.9999"} 1.2e+08

View File

@ -3,6 +3,7 @@ package metrics
import (
"fmt"
"reflect"
"sort"
"strings"
"sync"
)
@ -47,17 +48,39 @@ type Registry interface {
Unregister(string)
}
type orderedRegistry struct {
StandardRegistry
}
// Call the given function for each registered metric.
func (r *orderedRegistry) Each(f func(string, interface{})) {
var names []string
reg := r.registered()
for name := range reg {
names = append(names, name)
}
sort.Strings(names)
for _, name := range names {
f(name, reg[name])
}
}
// NewRegistry creates a new registry.
func NewRegistry() Registry {
return new(StandardRegistry)
}
// NewOrderedRegistry creates a new ordered registry (for testing).
func NewOrderedRegistry() Registry {
return new(orderedRegistry)
}
// The standard implementation of a Registry uses sync.map
// of names to metrics.
type StandardRegistry struct {
metrics sync.Map
}
// Create a new registry.
func NewRegistry() Registry {
return &StandardRegistry{}
}
// Call the given function for each registered metric.
func (r *StandardRegistry) Each(f func(string, interface{})) {
for name, i := range r.registered() {
@ -191,7 +214,7 @@ func (r *StandardRegistry) Unregister(name string) {
func (r *StandardRegistry) loadOrRegister(name string, i interface{}) (interface{}, bool, bool) {
switch i.(type) {
case Counter, CounterFloat64, Gauge, GaugeFloat64, Healthcheck, Histogram, Meter, Timer, ResettingTimer:
case Counter, CounterFloat64, Gauge, GaugeFloat64, GaugeInfo, Healthcheck, Histogram, Meter, Timer, ResettingTimer:
default:
return nil, false, false
}

View File

@ -23,6 +23,8 @@ func Syslog(r Registry, d time.Duration, w *syslog.Writer) {
w.Info(fmt.Sprintf("gauge %s: value: %d", name, metric.Value()))
case GaugeFloat64:
w.Info(fmt.Sprintf("gauge %s: value: %f", name, metric.Value()))
case GaugeInfo:
w.Info(fmt.Sprintf("gauge %s: value: %s", name, metric.Value()))
case Healthcheck:
metric.Check()
w.Info(fmt.Sprintf("healthcheck %s: error: %v", name, metric.Error()))

23
metrics/testdata/opentsb.want vendored Normal file
View File

@ -0,0 +1,23 @@
put pre.elite.count 978307200 0 host=hal9000
put pre.elite.one-minute 978307200 0.00 host=hal9000
put pre.elite.five-minute 978307200 0.00 host=hal9000
put pre.elite.fifteen-minute 978307200 0.00 host=hal9000
put pre.elite.mean 978307200 0.00 host=hal9000
put pre.foo.value 978307200 {"chain_id":"5"} host=hal9000
put pre.months.count 978307200 12 host=hal9000
put pre.pi.value 978307200 3.140000 host=hal9000
put pre.second.count 978307200 1 host=hal9000
put pre.second.min 978307200 1000 host=hal9000
put pre.second.max 978307200 1000 host=hal9000
put pre.second.mean 978307200 1000.00 host=hal9000
put pre.second.std-dev 978307200 0.00 host=hal9000
put pre.second.50-percentile 978307200 1000.00 host=hal9000
put pre.second.75-percentile 978307200 1000.00 host=hal9000
put pre.second.95-percentile 978307200 1000.00 host=hal9000
put pre.second.99-percentile 978307200 1000.00 host=hal9000
put pre.second.999-percentile 978307200 1000.00 host=hal9000
put pre.second.one-minute 978307200 0.00 host=hal9000
put pre.second.five-minute 978307200 0.00 host=hal9000
put pre.second.fifteen-minute 978307200 0.00 host=hal9000
put pre.second.mean-rate 978307200 0.00 host=hal9000
put pre.tau.count 978307200 1.570000 host=hal9000

View File

@ -39,6 +39,9 @@ func WriteOnce(r Registry, w io.Writer) {
case GaugeFloat64:
fmt.Fprintf(w, "gauge %s\n", namedMetric.name)
fmt.Fprintf(w, " value: %f\n", metric.Value())
case GaugeInfo:
fmt.Fprintf(w, "gauge %s\n", namedMetric.name)
fmt.Fprintf(w, " value: %s\n", metric.Value().String())
case Healthcheck:
metric.Check()
fmt.Fprintf(w, "healthcheck %s\n", namedMetric.name)