go-ethereum/metrics/influxdb/influxdbv2.go
Felix Lange b628d72766
build: upgrade to go 1.19 (#25726)
This changes the CI / release builds to use the latest Go version. It also
upgrades golangci-lint to a newer version compatible with Go 1.19.

In Go 1.19, godoc has gained official support for links and lists. The
syntax for code blocks in doc comments has changed and now requires a
leading tab character. gofmt adapts comments to the new syntax
automatically, so there are a lot of comment re-formatting changes in this
PR. We need to apply the new format in order to pass the CI lint stage with
Go 1.19.

With the linter upgrade, I have decided to disable 'gosec' - it produces
too many false-positive warnings. The 'deadcode' and 'varcheck' linters
have also been removed because golangci-lint warns about them being
unmaintained. 'unused' provides similar coverage and we already have it
enabled, so we don't lose much with this change.
2022-09-10 13:25:40 +02:00

214 lines
5.2 KiB
Go

package influxdb
import (
"context"
"fmt"
"time"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/metrics"
influxdb2 "github.com/influxdata/influxdb-client-go/v2"
"github.com/influxdata/influxdb-client-go/v2/api"
)
type v2Reporter struct {
reg metrics.Registry
interval time.Duration
endpoint string
token string
bucket string
organization string
namespace string
tags map[string]string
client influxdb2.Client
write api.WriteAPI
cache map[string]int64
}
// InfluxDBWithTags starts a InfluxDB reporter which will post the from the given metrics.Registry at each d interval with the specified tags
func InfluxDBV2WithTags(r metrics.Registry, d time.Duration, endpoint string, token string, bucket string, organization string, namespace string, tags map[string]string) {
rep := &v2Reporter{
reg: r,
interval: d,
endpoint: endpoint,
token: token,
bucket: bucket,
organization: organization,
namespace: namespace,
tags: tags,
cache: make(map[string]int64),
}
rep.client = influxdb2.NewClient(rep.endpoint, rep.token)
defer rep.client.Close()
// async write client
rep.write = rep.client.WriteAPI(rep.organization, rep.bucket)
errorsCh := rep.write.Errors()
// have to handle write errors in a separate goroutine like this b/c the channel is unbuffered and will block writes if not read
go func() {
for err := range errorsCh {
log.Warn("write error", "err", err.Error())
}
}()
rep.run()
}
func (r *v2Reporter) run() {
intervalTicker := time.NewTicker(r.interval)
pingTicker := time.NewTicker(time.Second * 5)
for {
select {
case <-intervalTicker.C:
r.send()
case <-pingTicker.C:
_, err := r.client.Health(context.Background())
if err != nil {
log.Warn("Got error from influxdb client health check", "err", err.Error())
}
}
}
}
func (r *v2Reporter) send() {
r.reg.Each(func(name string, i interface{}) {
now := time.Now()
namespace := r.namespace
switch metric := i.(type) {
case metrics.Counter:
v := metric.Count()
l := r.cache[name]
measurement := fmt.Sprintf("%s%s.count", namespace, name)
fields := map[string]interface{}{
"value": v - l,
}
pt := influxdb2.NewPoint(measurement, r.tags, fields, now)
r.write.WritePoint(pt)
r.cache[name] = v
case metrics.Gauge:
ms := metric.Snapshot()
measurement := fmt.Sprintf("%s%s.gauge", namespace, name)
fields := map[string]interface{}{
"value": ms.Value(),
}
pt := influxdb2.NewPoint(measurement, r.tags, fields, now)
r.write.WritePoint(pt)
case metrics.GaugeFloat64:
ms := metric.Snapshot()
measurement := fmt.Sprintf("%s%s.gauge", namespace, name)
fields := map[string]interface{}{
"value": ms.Value(),
}
pt := influxdb2.NewPoint(measurement, r.tags, fields, now)
r.write.WritePoint(pt)
case metrics.Histogram:
ms := metric.Snapshot()
if ms.Count() > 0 {
ps := ms.Percentiles([]float64{0.5, 0.75, 0.95, 0.99, 0.999, 0.9999})
measurement := fmt.Sprintf("%s%s.histogram", namespace, name)
fields := map[string]interface{}{
"count": ms.Count(),
"max": ms.Max(),
"mean": ms.Mean(),
"min": ms.Min(),
"stddev": ms.StdDev(),
"variance": ms.Variance(),
"p50": ps[0],
"p75": ps[1],
"p95": ps[2],
"p99": ps[3],
"p999": ps[4],
"p9999": ps[5],
}
pt := influxdb2.NewPoint(measurement, r.tags, fields, now)
r.write.WritePoint(pt)
}
case metrics.Meter:
ms := metric.Snapshot()
measurement := fmt.Sprintf("%s%s.meter", namespace, name)
fields := map[string]interface{}{
"count": ms.Count(),
"m1": ms.Rate1(),
"m5": ms.Rate5(),
"m15": ms.Rate15(),
"mean": ms.RateMean(),
}
pt := influxdb2.NewPoint(measurement, r.tags, fields, now)
r.write.WritePoint(pt)
case metrics.Timer:
ms := metric.Snapshot()
ps := ms.Percentiles([]float64{0.5, 0.75, 0.95, 0.99, 0.999, 0.9999})
measurement := fmt.Sprintf("%s%s.timer", namespace, name)
fields := map[string]interface{}{
"count": ms.Count(),
"max": ms.Max(),
"mean": ms.Mean(),
"min": ms.Min(),
"stddev": ms.StdDev(),
"variance": ms.Variance(),
"p50": ps[0],
"p75": ps[1],
"p95": ps[2],
"p99": ps[3],
"p999": ps[4],
"p9999": ps[5],
"m1": ms.Rate1(),
"m5": ms.Rate5(),
"m15": ms.Rate15(),
"meanrate": ms.RateMean(),
}
pt := influxdb2.NewPoint(measurement, r.tags, fields, now)
r.write.WritePoint(pt)
case metrics.ResettingTimer:
t := metric.Snapshot()
if len(t.Values()) > 0 {
ps := t.Percentiles([]float64{50, 95, 99})
val := t.Values()
measurement := fmt.Sprintf("%s%s.span", namespace, name)
fields := map[string]interface{}{
"count": len(val),
"max": val[len(val)-1],
"mean": t.Mean(),
"min": val[0],
"p50": ps[0],
"p95": ps[1],
"p99": ps[2],
}
pt := influxdb2.NewPoint(measurement, r.tags, fields, now)
r.write.WritePoint(pt)
}
}
})
// Force all unwritten data to be sent
r.write.Flush()
}