From dc18a76e3089fe44f609ee71d9b5be0eaa5f9595 Mon Sep 17 00:00:00 2001 From: Roy Crihfield Date: Sat, 24 Oct 2020 21:29:55 +0800 Subject: [PATCH] prometheus metrics output --- go.mod | 1 + go.sum | 6 + statediff/indexer/indexer.go | 29 +++- statediff/indexer/prom/db_stats_collector.go | 146 ++++++++++++++++++ statediff/indexer/prom/prom.go | 151 +++++++++++++++++++ 5 files changed, 326 insertions(+), 7 deletions(-) create mode 100644 statediff/indexer/prom/db_stats_collector.go create mode 100644 statediff/indexer/prom/prom.go diff --git a/go.mod b/go.mod index f4bcf4cf9..a42ccde0e 100644 --- a/go.mod +++ b/go.mod @@ -56,6 +56,7 @@ require ( github.com/onsi/gomega v1.4.3 github.com/pborman/uuid v0.0.0-20170112150404-1b00554d8222 github.com/peterh/liner v1.1.1-0.20190123174540-a2c9a5303de7 + github.com/prometheus/client_golang v0.9.3 github.com/prometheus/tsdb v0.7.1 github.com/rjeczalik/notify v0.9.1 github.com/rs/cors v0.0.0-20160617231935-a62a804a8a00 diff --git a/go.sum b/go.sum index d4b813f87..b7277644b 100644 --- a/go.sum +++ b/go.sum @@ -53,6 +53,7 @@ github.com/armon/go-radix v0.0.0-20180808171621-7fddfc383310/go.mod h1:ufUuZ+zHj github.com/aws/aws-sdk-go v1.25.48 h1:J82DYDGZHOKHdhx6hD24Tm30c2C3GchYGfN0mf9iKUk= github.com/aws/aws-sdk-go v1.25.48/go.mod h1:KmX6BPdI08NWTb3/sm4ZGu5ShLoqVDhKgpiN924inxo= github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q= +github.com/beorn7/perks v1.0.0 h1:HWo1m869IqiPhD389kmkxeTalrjNbbJTC8LXupb+sl0= github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8= github.com/bgentry/speakeasy v0.1.0/go.mod h1:+zsyZBPWlz7T6j88CTgSN5bM796AkVf0kBD4zp0CCIs= github.com/bketelsen/crypt v0.0.3-0.20200106085610-5cbc8cc4026c/go.mod h1:MKsuJmJgSg28kpZDP6UIiPt0e0Oz0kqKNGyRaWEPv84= @@ -269,6 +270,7 @@ github.com/mattn/go-runewidth v0.0.3/go.mod h1:LwmH8dsx7+W8Uxz3IHJYH5QSwggIsqBzp github.com/mattn/go-runewidth v0.0.4 h1:2BvfKmzob6Bmd4YsL0zygOqfdFnK7GR4QL06Do4/p7Y= github.com/mattn/go-runewidth v0.0.4/go.mod h1:LwmH8dsx7+W8Uxz3IHJYH5QSwggIsqBzpuz5H//U1FU= github.com/mattn/go-sqlite3 v1.9.0/go.mod h1:FPy6KqzDD04eiIsT53CuJW3U88zkxoIYsOqkbpncsNc= +github.com/matttproud/golang_protobuf_extensions v1.0.1 h1:4hp9jkHxhMHkqkrB3Ix0jegS5sx/RkqARlsWZ6pIwiU= github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0= github.com/miekg/dns v1.0.14/go.mod h1:W1PPwlIAgtquWBMBEV9nkV9Cazfe8ScdGz/Lj7v3Nrg= github.com/minio/blake2b-simd v0.0.0-20160723061019-3f5f724cb5b1 h1:lYpkrQH5ajf0OXOcUbGjvZxxijuBwbbmlSxLiuofa+g= @@ -333,12 +335,16 @@ github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZb github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/posener/complete v1.1.1/go.mod h1:em0nMJCgc9GFtwrmVmEMR/ZL6WyhyjMBndrE9hABlRI= github.com/prometheus/client_golang v0.9.1/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXPKyh/dDVn+NZz0KFw= +github.com/prometheus/client_golang v0.9.3 h1:9iH4JKXLzFbOAdtqv/a+j8aewx2Y8lAjAydhbaScPF8= github.com/prometheus/client_golang v0.9.3/go.mod h1:/TN21ttK/J9q6uSwhBd54HahCDft0ttaMvbicHlPoso= github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo= +github.com/prometheus/client_model v0.0.0-20190129233127-fd36f4220a90 h1:S/YWwWx/RA8rT8tKFRuGUZhuA90OyIBpPCXkcbwU8DE= github.com/prometheus/client_model v0.0.0-20190129233127-fd36f4220a90/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= github.com/prometheus/common v0.0.0-20181113130724-41aa239b4cce/go.mod h1:daVV7qP5qjZbuso7PdcryaAu0sAZbrN9i7WWcTMWvro= +github.com/prometheus/common v0.4.0 h1:7etb9YClo3a6HjLzfl6rIQaU+FDfi0VSX39io3aQ+DM= github.com/prometheus/common v0.4.0/go.mod h1:TNfzLD0ON7rHzMJeJkieUDPYmFC7Snx/y86RQel1bk4= github.com/prometheus/procfs v0.0.0-20181005140218-185b4288413d/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk= +github.com/prometheus/procfs v0.0.0-20190507164030-5867b95ac084 h1:sofwID9zm4tzrgykg80hfFph1mryUeLRsUfoocVVmRY= github.com/prometheus/procfs v0.0.0-20190507164030-5867b95ac084/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsTZCD3I8kEA= github.com/prometheus/tsdb v0.6.2-0.20190402121629-4f204dcbc150 h1:ZeU+auZj1iNzN8iVhff6M38Mfu73FQiJve/GEXYJBjE= github.com/prometheus/tsdb v0.6.2-0.20190402121629-4f204dcbc150/go.mod h1:qhTCs0VvXwvX/y3TZrWD7rabWM+ijKTux40TwIPHuXU= diff --git a/statediff/indexer/indexer.go b/statediff/indexer/indexer.go index 1dafe6fde..1e5b749f0 100644 --- a/statediff/indexer/indexer.go +++ b/statediff/indexer/indexer.go @@ -36,6 +36,7 @@ import ( "github.com/ethereum/go-ethereum/statediff/indexer/ipfs/ipld" "github.com/ethereum/go-ethereum/statediff/indexer/models" "github.com/ethereum/go-ethereum/statediff/indexer/postgres" + "github.com/ethereum/go-ethereum/statediff/indexer/prom" "github.com/ethereum/go-ethereum/statediff/indexer/shared" sdtypes "github.com/ethereum/go-ethereum/statediff/types" ) @@ -92,7 +93,6 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip } // Calculate reward reward := CalcEthBlockReward(block.Header(), block.Uncles(), block.Transactions(), receipts) - traceMsg += fmt.Sprintf("payload decoding duration: %s\r\n", time.Now().Sub(t).String()) t = time.Now() // Begin new db tx for everything tx, err := sdi.dbWriter.db.Beginx() @@ -108,15 +108,23 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip shared.Rollback(tx) panic(p) } else { + tDiff := time.Now().Sub(t) + prom.SetTimeMetric("t_state_store_code_processing", tDiff) + traceMsg += fmt.Sprintf("state, storage, and code storage processing time: %s\r\n", tDiff.String()) + t = time.Now() err = tx.Commit() - traceMsg += fmt.Sprintf("postgres transaction commit duration: %s\r\n", time.Now().Sub(t).String()) + tDiff = time.Now().Sub(t) + prom.SetTimeMetric("t_postgres_commit", tDiff) + traceMsg += fmt.Sprintf("postgres transaction commit duration: %s\r\n", tDiff.String()) } traceMsg += fmt.Sprintf(" TOTAL PROCESSING DURATION: %s\r\n", time.Now().Sub(start).String()) log.Info(traceMsg) return err }, } - traceMsg += fmt.Sprintf("time spent waiting for free postgres tx: %s:\r\n", time.Now().Sub(t).String()) + tDiff := time.Now().Sub(t) + prom.SetTimeMetric("t_free_postgres", tDiff) + traceMsg += fmt.Sprintf("time spent waiting for free postgres tx: %s:\r\n", tDiff.String()) t = time.Now() // Publish and index header, collect headerID @@ -124,13 +132,17 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip if err != nil { return nil, err } - traceMsg += fmt.Sprintf("header processing duration: %s\r\n", time.Now().Sub(t).String()) + tDiff = time.Now().Sub(t) + prom.SetTimeMetric("t_header_processing", tDiff) + traceMsg += fmt.Sprintf("header processing time: %s\r\n", tDiff.String()) t = time.Now() // Publish and index uncles if err := sdi.processUncles(tx, headerID, height, uncleNodes); err != nil { return nil, err } - traceMsg += fmt.Sprintf("uncle processing duration: %s\r\n", time.Now().Sub(t).String()) + tDiff = time.Now().Sub(t) + prom.SetTimeMetric("t_uncle_processing", tDiff) + traceMsg += fmt.Sprintf("uncle processing time: %s\r\n", tDiff.String()) t = time.Now() // Publish and index receipts and txs if err := sdi.processReceiptsAndTxs(tx, processArgs{ @@ -145,8 +157,11 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip }); err != nil { return nil, err } - traceMsg += fmt.Sprintf("tx and receipt processing duration: %s\r\n", time.Now().Sub(t).String()) - // t = time.Now() + tDiff = time.Now().Sub(t) + prom.SetTimeMetric("t_tx_receipt_processing", tDiff) + traceMsg += fmt.Sprintf("tx and receipt processing time: %s\r\n", tDiff.String()) + t = time.Now() + blocktx.BlockNumber = height blocktx.headerID = headerID return &blocktx, err // return error explicity so that the defer() assigns to it diff --git a/statediff/indexer/prom/db_stats_collector.go b/statediff/indexer/prom/db_stats_collector.go new file mode 100644 index 000000000..3bab65730 --- /dev/null +++ b/statediff/indexer/prom/db_stats_collector.go @@ -0,0 +1,146 @@ +package prom + +import ( + "database/sql" + + "github.com/prometheus/client_golang/prometheus" +) + +const ( + namespace = "ipld_eth_indexer" + subsystem = "connections" +) + +// DBStatsGetter is an interface that gets sql.DBStats. +type DBStatsGetter interface { + Stats() sql.DBStats +} + +// DBStatsCollector implements the prometheus.Collector interface. +type DBStatsCollector struct { + sg DBStatsGetter + + // descriptions of exported metrics + maxOpenDesc *prometheus.Desc + openDesc *prometheus.Desc + inUseDesc *prometheus.Desc + idleDesc *prometheus.Desc + waitedForDesc *prometheus.Desc + blockedSecondsDesc *prometheus.Desc + closedMaxIdleDesc *prometheus.Desc + closedMaxLifetimeDesc *prometheus.Desc +} + +// NewDBStatsCollector creates a new DBStatsCollector. +func NewDBStatsCollector(dbName string, sg DBStatsGetter) *DBStatsCollector { + labels := prometheus.Labels{"db_name": dbName} + return &DBStatsCollector{ + sg: sg, + maxOpenDesc: prometheus.NewDesc( + prometheus.BuildFQName(namespace, subsystem, "max_open"), + "Maximum number of open connections to the database.", + nil, + labels, + ), + openDesc: prometheus.NewDesc( + prometheus.BuildFQName(namespace, subsystem, "open"), + "The number of established connections both in use and idle.", + nil, + labels, + ), + inUseDesc: prometheus.NewDesc( + prometheus.BuildFQName(namespace, subsystem, "in_use"), + "The number of connections currently in use.", + nil, + labels, + ), + idleDesc: prometheus.NewDesc( + prometheus.BuildFQName(namespace, subsystem, "idle"), + "The number of idle connections.", + nil, + labels, + ), + waitedForDesc: prometheus.NewDesc( + prometheus.BuildFQName(namespace, subsystem, "waited_for"), + "The total number of connections waited for.", + nil, + labels, + ), + blockedSecondsDesc: prometheus.NewDesc( + prometheus.BuildFQName(namespace, subsystem, "blocked_seconds"), + "The total time blocked waiting for a new connection.", + nil, + labels, + ), + closedMaxIdleDesc: prometheus.NewDesc( + prometheus.BuildFQName(namespace, subsystem, "closed_max_idle"), + "The total number of connections closed due to SetMaxIdleConns.", + nil, + labels, + ), + closedMaxLifetimeDesc: prometheus.NewDesc( + prometheus.BuildFQName(namespace, subsystem, "closed_max_lifetime"), + "The total number of connections closed due to SetConnMaxLifetime.", + nil, + labels, + ), + } +} + +// Describe implements the prometheus.Collector interface. +func (c DBStatsCollector) Describe(ch chan<- *prometheus.Desc) { + ch <- c.maxOpenDesc + ch <- c.openDesc + ch <- c.inUseDesc + ch <- c.idleDesc + ch <- c.waitedForDesc + ch <- c.blockedSecondsDesc + ch <- c.closedMaxIdleDesc + ch <- c.closedMaxLifetimeDesc +} + +// Collect implements the prometheus.Collector interface. +func (c DBStatsCollector) Collect(ch chan<- prometheus.Metric) { + stats := c.sg.Stats() + + ch <- prometheus.MustNewConstMetric( + c.maxOpenDesc, + prometheus.GaugeValue, + float64(stats.MaxOpenConnections), + ) + ch <- prometheus.MustNewConstMetric( + c.openDesc, + prometheus.GaugeValue, + float64(stats.OpenConnections), + ) + ch <- prometheus.MustNewConstMetric( + c.inUseDesc, + prometheus.GaugeValue, + float64(stats.InUse), + ) + ch <- prometheus.MustNewConstMetric( + c.idleDesc, + prometheus.GaugeValue, + float64(stats.Idle), + ) + ch <- prometheus.MustNewConstMetric( + c.waitedForDesc, + prometheus.CounterValue, + float64(stats.WaitCount), + ) + ch <- prometheus.MustNewConstMetric( + c.blockedSecondsDesc, + prometheus.CounterValue, + stats.WaitDuration.Seconds(), + ) + ch <- prometheus.MustNewConstMetric( + c.closedMaxIdleDesc, + prometheus.CounterValue, + float64(stats.MaxIdleClosed), + ) + ch <- prometheus.MustNewConstMetric( + c.closedMaxLifetimeDesc, + prometheus.CounterValue, + float64(stats.MaxLifetimeClosed), + ) +} diff --git a/statediff/indexer/prom/prom.go b/statediff/indexer/prom/prom.go new file mode 100644 index 000000000..2b9436462 --- /dev/null +++ b/statediff/indexer/prom/prom.go @@ -0,0 +1,151 @@ +package prom + +import ( + "time" + + "github.com/jmoiron/sqlx" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" +) + +const statsSubsystem = "stats" + +var ( + metrics bool + + receipts prometheus.Counter + transactions prometheus.Counter + blocks prometheus.Counter + + lenPayloadChan prometheus.Gauge + + tPayloadDecode prometheus.Histogram + tFreePostgres prometheus.Histogram + tPostgresCommit prometheus.Histogram + tHeaderProcessing prometheus.Histogram + tUncleProcessing prometheus.Histogram + tTxAndRecProcessing prometheus.Histogram + tStateAndStoreProcessing prometheus.Histogram + tCodeAndCodeHashProcessing prometheus.Histogram +) + +// Init module initialization +func Init() { + metrics = true + + blocks = promauto.NewCounter(prometheus.CounterOpts{ + Namespace: namespace, + Name: "blocks", + Help: "The total number of processed blocks", + }) + transactions = promauto.NewCounter(prometheus.CounterOpts{ + Namespace: namespace, + Name: "transactions", + Help: "The total number of processed transactions", + }) + receipts = promauto.NewCounter(prometheus.CounterOpts{ + Namespace: namespace, + Name: "receipts", + Help: "The total number of processed receipts", + }) + + lenPayloadChan = promauto.NewGauge(prometheus.GaugeOpts{ + Namespace: namespace, + Name: "len_payload_chan", + Help: "Current length of publishPayload", + }) + + tFreePostgres = promauto.NewHistogram(prometheus.HistogramOpts{ + Namespace: namespace, + Subsystem: statsSubsystem, + Name: "t_free_postgres", + Help: "Time spent waiting for free postgres tx", + }) + tPostgresCommit = promauto.NewHistogram(prometheus.HistogramOpts{ + Namespace: namespace, + Subsystem: statsSubsystem, + Name: "t_postgres_commit", + Help: "Postgres transaction commit duration", + }) + tHeaderProcessing = promauto.NewHistogram(prometheus.HistogramOpts{ + Namespace: namespace, + Subsystem: statsSubsystem, + Name: "t_header_processing", + Help: "Header processing time", + }) + tUncleProcessing = promauto.NewHistogram(prometheus.HistogramOpts{ + Namespace: namespace, + Subsystem: statsSubsystem, + Name: "t_uncle_processing", + Help: "Uncle processing time", + }) + tTxAndRecProcessing = promauto.NewHistogram(prometheus.HistogramOpts{ + Namespace: namespace, + Subsystem: statsSubsystem, + Name: "t_tx_receipt_processing", + Help: "Tx and receipt processing time", + }) + tStateAndStoreProcessing = promauto.NewHistogram(prometheus.HistogramOpts{ + Namespace: namespace, + Subsystem: statsSubsystem, + Name: "t_state_store_code_processing", + Help: "State, storage, and code combinedprocessing time", + }) +} + +// RegisterDBCollector create metric colletor for given connection +func RegisterDBCollector(name string, db *sqlx.DB) { + if metrics { + prometheus.Register(NewDBStatsCollector(name, db)) + } +} + +// BlockInc block counter increment +func BlockInc() { + if metrics { + blocks.Inc() + } +} + +// TransactionInc transaction counter increment +func TransactionInc() { + if metrics { + transactions.Inc() + } +} + +// ReceiptInc receipt counter increment +func ReceiptInc() { + if metrics { + receipts.Inc() + } +} + +// SetLenPayloadChan set chan length +func SetLenPayloadChan(ln int) { + if metrics { + lenPayloadChan.Set(float64(ln)) + } +} + +// SetTimeMetric time metric observation +func SetTimeMetric(name string, t time.Duration) { + if !metrics { + return + } + tAsF64 := t.Seconds() + switch name { + case "t_free_postgres": + tFreePostgres.Observe(tAsF64) + case "t_postgres_commit": + tPostgresCommit.Observe(tAsF64) + case "t_header_processing": + tHeaderProcessing.Observe(tAsF64) + case "t_uncle_processing": + tUncleProcessing.Observe(tAsF64) + case "t_tx_receipt_processing": + tTxAndRecProcessing.Observe(tAsF64) + case "t_state_store_code_processing": + tStateAndStoreProcessing.Observe(tAsF64) + } +}