This chang creates a GaugeInfo metrics type for registering informational (textual) metrics, e.g. geth version number. It also improves the testing for backend-exporters, and uses a shared subpackage in 'internal' to provide sample datasets and ordered registry. Implements #21783 --------- Co-authored-by: Martin Holst Swende <martin@swende.se>
		
			
				
	
	
		
			153 lines
		
	
	
		
			3.7 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			153 lines
		
	
	
		
			3.7 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
| package influxdb
 | |
| 
 | |
| import (
 | |
| 	"fmt"
 | |
| 	uurl "net/url"
 | |
| 	"time"
 | |
| 
 | |
| 	"github.com/ethereum/go-ethereum/log"
 | |
| 	"github.com/ethereum/go-ethereum/metrics"
 | |
| 	client "github.com/influxdata/influxdb1-client/v2"
 | |
| )
 | |
| 
 | |
| type reporter struct {
 | |
| 	reg      metrics.Registry
 | |
| 	interval time.Duration
 | |
| 
 | |
| 	url       uurl.URL
 | |
| 	database  string
 | |
| 	username  string
 | |
| 	password  string
 | |
| 	namespace string
 | |
| 	tags      map[string]string
 | |
| 
 | |
| 	client client.Client
 | |
| 
 | |
| 	cache map[string]int64
 | |
| }
 | |
| 
 | |
| // InfluxDB starts a InfluxDB reporter which will post the from the given metrics.Registry at each d interval.
 | |
| func InfluxDB(r metrics.Registry, d time.Duration, url, database, username, password, namespace string) {
 | |
| 	InfluxDBWithTags(r, d, url, database, username, password, namespace, nil)
 | |
| }
 | |
| 
 | |
| // InfluxDBWithTags starts a InfluxDB reporter which will post the from the given metrics.Registry at each d interval with the specified tags
 | |
| func InfluxDBWithTags(r metrics.Registry, d time.Duration, url, database, username, password, namespace string, tags map[string]string) {
 | |
| 	u, err := uurl.Parse(url)
 | |
| 	if err != nil {
 | |
| 		log.Warn("Unable to parse InfluxDB", "url", url, "err", err)
 | |
| 		return
 | |
| 	}
 | |
| 
 | |
| 	rep := &reporter{
 | |
| 		reg:       r,
 | |
| 		interval:  d,
 | |
| 		url:       *u,
 | |
| 		database:  database,
 | |
| 		username:  username,
 | |
| 		password:  password,
 | |
| 		namespace: namespace,
 | |
| 		tags:      tags,
 | |
| 		cache:     make(map[string]int64),
 | |
| 	}
 | |
| 	if err := rep.makeClient(); err != nil {
 | |
| 		log.Warn("Unable to make InfluxDB client", "err", err)
 | |
| 		return
 | |
| 	}
 | |
| 
 | |
| 	rep.run()
 | |
| }
 | |
| 
 | |
| // InfluxDBWithTagsOnce runs once an InfluxDB reporter and post the given metrics.Registry with the specified tags
 | |
| func InfluxDBWithTagsOnce(r metrics.Registry, url, database, username, password, namespace string, tags map[string]string) error {
 | |
| 	u, err := uurl.Parse(url)
 | |
| 	if err != nil {
 | |
| 		return fmt.Errorf("unable to parse InfluxDB. url: %s, err: %v", url, err)
 | |
| 	}
 | |
| 
 | |
| 	rep := &reporter{
 | |
| 		reg:       r,
 | |
| 		url:       *u,
 | |
| 		database:  database,
 | |
| 		username:  username,
 | |
| 		password:  password,
 | |
| 		namespace: namespace,
 | |
| 		tags:      tags,
 | |
| 		cache:     make(map[string]int64),
 | |
| 	}
 | |
| 	if err := rep.makeClient(); err != nil {
 | |
| 		return fmt.Errorf("unable to make InfluxDB client. err: %v", err)
 | |
| 	}
 | |
| 
 | |
| 	if err := rep.send(0); err != nil {
 | |
| 		return fmt.Errorf("unable to send to InfluxDB. err: %v", err)
 | |
| 	}
 | |
| 
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| func (r *reporter) makeClient() (err error) {
 | |
| 	r.client, err = client.NewHTTPClient(client.HTTPConfig{
 | |
| 		Addr:     r.url.String(),
 | |
| 		Username: r.username,
 | |
| 		Password: r.password,
 | |
| 		Timeout:  10 * time.Second,
 | |
| 	})
 | |
| 
 | |
| 	return
 | |
| }
 | |
| 
 | |
| func (r *reporter) run() {
 | |
| 	intervalTicker := time.NewTicker(r.interval)
 | |
| 	pingTicker := time.NewTicker(time.Second * 5)
 | |
| 
 | |
| 	defer intervalTicker.Stop()
 | |
| 	defer pingTicker.Stop()
 | |
| 
 | |
| 	for {
 | |
| 		select {
 | |
| 		case <-intervalTicker.C:
 | |
| 			if err := r.send(0); err != nil {
 | |
| 				log.Warn("Unable to send to InfluxDB", "err", err)
 | |
| 			}
 | |
| 		case <-pingTicker.C:
 | |
| 			_, _, err := r.client.Ping(0)
 | |
| 			if err != nil {
 | |
| 				log.Warn("Got error while sending a ping to InfluxDB, trying to recreate client", "err", err)
 | |
| 
 | |
| 				if err = r.makeClient(); err != nil {
 | |
| 					log.Warn("Unable to make InfluxDB client", "err", err)
 | |
| 				}
 | |
| 			}
 | |
| 		}
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // send sends the measurements. If provided tstamp is >0, it is used. Otherwise,
 | |
| // a 'fresh' timestamp is used.
 | |
| func (r *reporter) send(tstamp int64) error {
 | |
| 	bps, err := client.NewBatchPoints(
 | |
| 		client.BatchPointsConfig{
 | |
| 			Database: r.database,
 | |
| 		})
 | |
| 	if err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 	r.reg.Each(func(name string, i interface{}) {
 | |
| 		var now time.Time
 | |
| 		if tstamp <= 0 {
 | |
| 			now = time.Now()
 | |
| 		} else {
 | |
| 			now = time.Unix(tstamp, 0)
 | |
| 		}
 | |
| 		measurement, fields := readMeter(r.namespace, name, i)
 | |
| 		if fields == nil {
 | |
| 			return
 | |
| 		}
 | |
| 		if p, err := client.NewPoint(measurement, r.tags, fields, now); err == nil {
 | |
| 			bps.AddPoint(p)
 | |
| 		}
 | |
| 	})
 | |
| 	return r.client.Write(bps)
 | |
| }
 |