From 0f81dc32d24d43f4c57dd7f9aaefe1814da108dd Mon Sep 17 00:00:00 2001 From: i-norden Date: Thu, 21 Oct 2021 11:57:44 -0500 Subject: [PATCH 1/5] prom pkg + monitoring --- monitoring/prometheus.yml | 7 ++ pkg/prom/db_stats_collector.go | 162 +++++++++++++++++++++++++ pkg/prom/prom.go | 209 +++++++++++++++++++++++++++++++++ pkg/prom/web.go | 40 +++++++ 4 files changed, 418 insertions(+) create mode 100644 monitoring/prometheus.yml create mode 100644 pkg/prom/db_stats_collector.go create mode 100644 pkg/prom/prom.go create mode 100644 pkg/prom/web.go diff --git a/monitoring/prometheus.yml b/monitoring/prometheus.yml new file mode 100644 index 0000000..033137d --- /dev/null +++ b/monitoring/prometheus.yml @@ -0,0 +1,7 @@ +global: + scrape_interval: 10s + +scrape_configs: + - job_name: 'eth-statediiff-service' + static_configs: + - targets: ['localhost:8100'] diff --git a/pkg/prom/db_stats_collector.go b/pkg/prom/db_stats_collector.go new file mode 100644 index 0000000..738f60d --- /dev/null +++ b/pkg/prom/db_stats_collector.go @@ -0,0 +1,162 @@ +// VulcanizeDB +// Copyright © 2021 Vulcanize + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. + +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package prom + +import ( + "database/sql" + + "github.com/prometheus/client_golang/prometheus" +) + +const ( + namespace = "eth-statediff-service" + subsystem = "connections" +) + +// DBStatsGetter is an interface that gets sql.DBStats. +type DBStatsGetter interface { + Stats() sql.DBStats +} + +// DBStatsCollector implements the prometheus.Collector interface. +type DBStatsCollector struct { + sg DBStatsGetter + + // descriptions of exported metrics + maxOpenDesc *prometheus.Desc + openDesc *prometheus.Desc + inUseDesc *prometheus.Desc + idleDesc *prometheus.Desc + waitedForDesc *prometheus.Desc + blockedSecondsDesc *prometheus.Desc + closedMaxIdleDesc *prometheus.Desc + closedMaxLifetimeDesc *prometheus.Desc +} + +// NewDBStatsCollector creates a new DBStatsCollector. +func NewDBStatsCollector(dbName string, sg DBStatsGetter) *DBStatsCollector { + labels := prometheus.Labels{"db_name": dbName} + return &DBStatsCollector{ + sg: sg, + maxOpenDesc: prometheus.NewDesc( + prometheus.BuildFQName(namespace, subsystem, "max_open"), + "Maximum number of open connections to the database.", + nil, + labels, + ), + openDesc: prometheus.NewDesc( + prometheus.BuildFQName(namespace, subsystem, "open"), + "The number of established connections both in use and idle.", + nil, + labels, + ), + inUseDesc: prometheus.NewDesc( + prometheus.BuildFQName(namespace, subsystem, "in_use"), + "The number of connections currently in use.", + nil, + labels, + ), + idleDesc: prometheus.NewDesc( + prometheus.BuildFQName(namespace, subsystem, "idle"), + "The number of idle connections.", + nil, + labels, + ), + waitedForDesc: prometheus.NewDesc( + prometheus.BuildFQName(namespace, subsystem, "waited_for"), + "The total number of connections waited for.", + nil, + labels, + ), + blockedSecondsDesc: prometheus.NewDesc( + prometheus.BuildFQName(namespace, subsystem, "blocked_seconds"), + "The total time blocked waiting for a new connection.", + nil, + labels, + ), + closedMaxIdleDesc: prometheus.NewDesc( + prometheus.BuildFQName(namespace, subsystem, "closed_max_idle"), + "The total number of connections closed due to SetMaxIdleConns.", + nil, + labels, + ), + closedMaxLifetimeDesc: prometheus.NewDesc( + prometheus.BuildFQName(namespace, subsystem, "closed_max_lifetime"), + "The total number of connections closed due to SetConnMaxLifetime.", + nil, + labels, + ), + } +} + +// Describe implements the prometheus.Collector interface. +func (c DBStatsCollector) Describe(ch chan<- *prometheus.Desc) { + ch <- c.maxOpenDesc + ch <- c.openDesc + ch <- c.inUseDesc + ch <- c.idleDesc + ch <- c.waitedForDesc + ch <- c.blockedSecondsDesc + ch <- c.closedMaxIdleDesc + ch <- c.closedMaxLifetimeDesc +} + +// Collect implements the prometheus.Collector interface. +func (c DBStatsCollector) Collect(ch chan<- prometheus.Metric) { + stats := c.sg.Stats() + + ch <- prometheus.MustNewConstMetric( + c.maxOpenDesc, + prometheus.GaugeValue, + float64(stats.MaxOpenConnections), + ) + ch <- prometheus.MustNewConstMetric( + c.openDesc, + prometheus.GaugeValue, + float64(stats.OpenConnections), + ) + ch <- prometheus.MustNewConstMetric( + c.inUseDesc, + prometheus.GaugeValue, + float64(stats.InUse), + ) + ch <- prometheus.MustNewConstMetric( + c.idleDesc, + prometheus.GaugeValue, + float64(stats.Idle), + ) + ch <- prometheus.MustNewConstMetric( + c.waitedForDesc, + prometheus.CounterValue, + float64(stats.WaitCount), + ) + ch <- prometheus.MustNewConstMetric( + c.blockedSecondsDesc, + prometheus.CounterValue, + stats.WaitDuration.Seconds(), + ) + ch <- prometheus.MustNewConstMetric( + c.closedMaxIdleDesc, + prometheus.CounterValue, + float64(stats.MaxIdleClosed), + ) + ch <- prometheus.MustNewConstMetric( + c.closedMaxLifetimeDesc, + prometheus.CounterValue, + float64(stats.MaxLifetimeClosed), + ) +} diff --git a/pkg/prom/prom.go b/pkg/prom/prom.go new file mode 100644 index 0000000..75245cb --- /dev/null +++ b/pkg/prom/prom.go @@ -0,0 +1,209 @@ +// VulcanizeDB +// Copyright © 2021 Vulcanize + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. + +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package prom + +import ( + "time" + + "github.com/jmoiron/sqlx" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" +) + +const statsSubsystem = "stats" + +var ( + metrics bool + + receipts prometheus.Counter + transactions prometheus.Counter + blocks prometheus.Counter + logs prometheus.Counter + accessListEntries prometheus.Counter + + lenPayloadChan prometheus.Gauge + + tPayloadDecode prometheus.Histogram + tFreePostgres prometheus.Histogram + tPostgresCommit prometheus.Histogram + tHeaderProcessing prometheus.Histogram + tUncleProcessing prometheus.Histogram + tTxAndRecProcessing prometheus.Histogram + tStateAndStoreProcessing prometheus.Histogram + tCodeAndCodeHashProcessing prometheus.Histogram +) + +// Init module initialization +func Init() { + metrics = true + + blocks = promauto.NewCounter(prometheus.CounterOpts{ + Namespace: namespace, + Name: "blocks", + Help: "The total number of processed blocks", + }) + transactions = promauto.NewCounter(prometheus.CounterOpts{ + Namespace: namespace, + Name: "transactions", + Help: "The total number of processed transactions", + }) + receipts = promauto.NewCounter(prometheus.CounterOpts{ + Namespace: namespace, + Name: "receipts", + Help: "The total number of processed receipts", + }) + logs = promauto.NewCounter(prometheus.CounterOpts{ + Namespace: namespace, + Name: "logs", + Help: "The total number of processed logs", + }) + accessListEntries = promauto.NewCounter(prometheus.CounterOpts{ + Namespace: namespace, + Name: "access_list_entries", + Help: "The total number of processed access list entries", + }) + + lenPayloadChan = promauto.NewGauge(prometheus.GaugeOpts{ + Namespace: namespace, + Name: "len_payload_chan", + Help: "Current length of publishPayload", + }) + + tPayloadDecode = promauto.NewHistogram(prometheus.HistogramOpts{ + Namespace: namespace, + Subsystem: statsSubsystem, + Name: "t_payload_decode", + Help: "Payload decoding time", + }) + tFreePostgres = promauto.NewHistogram(prometheus.HistogramOpts{ + Namespace: namespace, + Subsystem: statsSubsystem, + Name: "t_free_postgres", + Help: "Time spent waiting for free postgres tx", + }) + tPostgresCommit = promauto.NewHistogram(prometheus.HistogramOpts{ + Namespace: namespace, + Subsystem: statsSubsystem, + Name: "t_postgres_commit", + Help: "Postgres transaction commit duration", + }) + tHeaderProcessing = promauto.NewHistogram(prometheus.HistogramOpts{ + Namespace: namespace, + Subsystem: statsSubsystem, + Name: "t_header_processing", + Help: "Header processing time", + }) + tUncleProcessing = promauto.NewHistogram(prometheus.HistogramOpts{ + Namespace: namespace, + Subsystem: statsSubsystem, + Name: "t_uncle_processing", + Help: "Uncle processing time", + }) + tTxAndRecProcessing = promauto.NewHistogram(prometheus.HistogramOpts{ + Namespace: namespace, + Subsystem: statsSubsystem, + Name: "t_tx_receipt_processing", + Help: "Tx and receipt processing time", + }) + tStateAndStoreProcessing = promauto.NewHistogram(prometheus.HistogramOpts{ + Namespace: namespace, + Subsystem: statsSubsystem, + Name: "t_state_store_processing", + Help: "State and storage processing time", + }) + tCodeAndCodeHashProcessing = promauto.NewHistogram(prometheus.HistogramOpts{ + Namespace: namespace, + Subsystem: statsSubsystem, + Name: "t_code_codehash_processing", + Help: "Code and codehash processing time", + }) +} + +// RegisterDBCollector create metric colletor for given connection +func RegisterDBCollector(name string, db *sqlx.DB) { + if metrics { + prometheus.Register(NewDBStatsCollector(name, db)) + } +} + +// BlockInc block counter increment +func BlockInc() { + if metrics { + blocks.Inc() + } +} + +// TransactionInc transaction counter increment +func TransactionInc() { + if metrics { + transactions.Inc() + } +} + +// ReceiptInc receipt counter increment +func ReceiptInc() { + if metrics { + receipts.Inc() + } +} + +// LogInc log counter increment +func LogInc() { + if metrics { + logs.Inc() + } +} + +// AccessListElementInc access list element counter increment +func AccessListElementInc() { + if metrics { + accessListEntries.Inc() + } +} + +// SetLenPayloadChan set chan length +func SetLenPayloadChan(ln int) { + if metrics { + lenPayloadChan.Set(float64(ln)) + } +} + +// SetTimeMetric time metric observation +func SetTimeMetric(name string, t time.Duration) { + if !metrics { + return + } + tAsF64 := t.Seconds() + switch name { + case "t_payload_decode": + tPayloadDecode.Observe(tAsF64) + case "t_free_postgres": + tFreePostgres.Observe(tAsF64) + case "t_postgres_commit": + tPostgresCommit.Observe(tAsF64) + case "t_header_processing": + tHeaderProcessing.Observe(tAsF64) + case "t_uncle_processing": + tUncleProcessing.Observe(tAsF64) + case "t_tx_receipt_processing": + tTxAndRecProcessing.Observe(tAsF64) + case "t_state_store_processing": + tStateAndStoreProcessing.Observe(tAsF64) + case "t_code_codehash_processing": + tCodeAndCodeHashProcessing.Observe(tAsF64) + } +} diff --git a/pkg/prom/web.go b/pkg/prom/web.go new file mode 100644 index 0000000..78fb5a0 --- /dev/null +++ b/pkg/prom/web.go @@ -0,0 +1,40 @@ +// VulcanizeDB +// Copyright © 2021 Vulcanize + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. + +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package prom + +import ( + "net/http" + + "github.com/prometheus/client_golang/prometheus/promhttp" + "github.com/sirupsen/logrus" +) + +// Listen start listening http +func Listen(addr string) *http.Server { + mux := http.NewServeMux() + mux.Handle("/metrics", promhttp.Handler()) + srv := http.Server{ + Addr: addr, + Handler: mux, + } + go func() { + if err := srv.ListenAndServe(); err != nil { + logrus.Fatal(err) + } + }() + return &srv +} From 5a35f84de35903b831909c1e83011fb2874f6d00 Mon Sep 17 00:00:00 2001 From: i-norden Date: Thu, 21 Oct 2021 12:06:06 -0500 Subject: [PATCH 2/5] cli/cmd updates --- cmd/env.go | 10 ++++++++ cmd/root.go | 26 ++++++++++++++++++++ cmd/serve.go | 51 ++------------------------------------- cmd/util.go | 67 ++++++++++++++++++++++++++++++++++++++++++++++++++++ cmd/write.go | 51 ++------------------------------------- 5 files changed, 107 insertions(+), 98 deletions(-) create mode 100644 cmd/util.go diff --git a/cmd/env.go b/cmd/env.go index 9763de6..934d318 100644 --- a/cmd/env.go +++ b/cmd/env.go @@ -35,6 +35,11 @@ const ( LVLDB_ANCIENT = "LVLDB_ANCIENT" STATEDIFF_WORKERS = "STATEDIFF_WORKERS" WRITE_SERVER = "WRITE_SERVER" + + PROM_METRICS = "PROM_METRICS" + PROM_HTTP = "PROM_HTTP" + PROM_HTTP_ADDR = "PROM_HTTP_ADDR" + PROM_HTTP_PORT = "PROM_HTTP_PORT" ) // Bind env vars for eth node and DB configuration @@ -60,5 +65,10 @@ func init() { viper.BindEnv("leveldb.path", LVLDB_PATH) viper.BindEnv("leveldb.ancient", LVLDB_ANCIENT) + viper.BindEnv("prom.metrics", PROM_METRICS) + viper.BindEnv("prom.http", PROM_HTTP) + viper.BindEnv("prom.httpAddr", PROM_HTTP_ADDR) + viper.BindEnv("prom.httpPort", PROM_HTTP_PORT) + viper.BindEnv("statediff.workers", STATEDIFF_WORKERS) } diff --git a/cmd/root.go b/cmd/root.go index ebb92d6..34e322a 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -26,6 +26,8 @@ import ( log "github.com/sirupsen/logrus" "github.com/spf13/cobra" "github.com/spf13/viper" + + "github.com/vulcanize/eth-statediff-service/pkg/prom" ) var ( @@ -64,6 +66,19 @@ func initFuncs(cmd *cobra.Command, args []string) { if err := logLevel(); err != nil { log.Fatal("Could not set log level: ", err) } + + if viper.GetBool("prom.metrics") { + prom.Init() + } + + if viper.GetBool("prom.http") { + addr := fmt.Sprintf( + "%s:%s", + viper.GetString("prom.httpAddr"), + viper.GetString("prom.httpPort"), + ) + prom.Listen(addr) + } } func logLevel() error { @@ -106,9 +121,16 @@ func init() { "0xd4e56740f876aef8c010b86a40d5f56745a118d0906a34e69aec8c0db1cb8fa3", "eth genesis block hash") rootCmd.PersistentFlags().String("eth-network-id", "1", "eth network id") rootCmd.PersistentFlags().String("eth-chain-id", "1", "eth chain id") + rootCmd.PersistentFlags().Int("cache-db", 1024, "megabytes of memory allocated to database cache") rootCmd.PersistentFlags().Int("cache-trie", 1024, "Megabytes of memory allocated to trie cache") + rootCmd.PersistentFlags().Bool("prom-http", false, "enable prometheus http service") + rootCmd.PersistentFlags().String("prom-http-addr", "127.0.0.1", "prometheus http host") + rootCmd.PersistentFlags().String("prom-http-port", "8080", "prometheus http port") + + rootCmd.PersistentFlags().Bool("metrics", false, "enable metrics") + viper.BindPFlag("log.file", rootCmd.PersistentFlags().Lookup("log-file")) viper.BindPFlag("log.level", rootCmd.PersistentFlags().Lookup("log-level")) viper.BindPFlag("statediff.workers", rootCmd.PersistentFlags().Lookup("workers")) @@ -126,6 +148,10 @@ func init() { viper.BindPFlag("ethereum.chainID", rootCmd.PersistentFlags().Lookup("eth-chain-id")) viper.BindPFlag("cache.database", rootCmd.PersistentFlags().Lookup("cache-db")) viper.BindPFlag("cache.trie", rootCmd.PersistentFlags().Lookup("cache-trie")) + viper.BindPFlag("prom.http", rootCmd.PersistentFlags().Lookup("prom-http")) + viper.BindPFlag("prom.httpAddr", rootCmd.PersistentFlags().Lookup("prom-http-addr")) + viper.BindPFlag("prom.httpPort", rootCmd.PersistentFlags().Lookup("prom-http-port")) + viper.BindPFlag("prom.metrics", rootCmd.PersistentFlags().Lookup("metrics")) } func initConfig() { diff --git a/cmd/serve.go b/cmd/serve.go index ba7baf5..c096f1e 100644 --- a/cmd/serve.go +++ b/cmd/serve.go @@ -24,14 +24,10 @@ import ( "github.com/ethereum/go-ethereum/node" "github.com/ethereum/go-ethereum/params" "github.com/ethereum/go-ethereum/rpc" - "github.com/ethereum/go-ethereum/trie" "github.com/sirupsen/logrus" "github.com/spf13/cobra" "github.com/spf13/viper" - ind "github.com/ethereum/go-ethereum/statediff/indexer" - "github.com/ethereum/go-ethereum/statediff/indexer/postgres" - sd "github.com/vulcanize/eth-statediff-service/pkg" ) @@ -50,52 +46,9 @@ var serveCmd = &cobra.Command{ } func serve() { - logWithCommand.Info("starting statediff RPC service") + logWithCommand.Info("Running eth-statediff-service serve command") - // load params - path := viper.GetString("leveldb.path") - ancientPath := viper.GetString("leveldb.ancient") - if path == "" || ancientPath == "" { - logWithCommand.Fatal("require a valid eth leveldb primary datastore path and ancient datastore path") - } - - nodeInfo := GetEthNodeInfo() - config, err := chainConfig(nodeInfo.ChainID) - if err != nil { - logWithCommand.Fatal(err) - } - - // create leveldb reader - logWithCommand.Info("Creating leveldb reader") - conf := sd.ReaderConfig{ - TrieConfig: &trie.Config{ - Cache: viper.GetInt("cache.trie"), - Journal: "", - Preimages: false, - }, - ChainConfig: config, - Path: path, - AncientPath: ancientPath, - DBCacheSize: viper.GetInt("cache.database"), - } - lvlDBReader, err := sd.NewLvlDBReader(conf) - if err != nil { - logWithCommand.Fatal(err) - } - - // create statediff service - logWithCommand.Info("Creating statediff service") - db, err := postgres.NewDB(postgres.DbConnectionString(GetDBParams()), GetDBConfig(), nodeInfo) - if err != nil { - logWithCommand.Fatal(err) - } - - indexer, err := ind.NewStateDiffIndexer(config, db) - if err != nil { - logWithCommand.Fatal(err) - } - - statediffService, err := sd.NewStateDiffService(lvlDBReader, indexer, viper.GetUint("statediff.workers")) + statediffService, err := createStateDiffService() if err != nil { logWithCommand.Fatal(err) } diff --git a/cmd/util.go b/cmd/util.go new file mode 100644 index 0000000..c3e9d4d --- /dev/null +++ b/cmd/util.go @@ -0,0 +1,67 @@ +package cmd + +import ( + ind "github.com/ethereum/go-ethereum/statediff/indexer" + "github.com/ethereum/go-ethereum/statediff/indexer/node" + "github.com/ethereum/go-ethereum/statediff/indexer/postgres" + "github.com/ethereum/go-ethereum/trie" + "github.com/spf13/viper" + + sd "github.com/vulcanize/eth-statediff-service/pkg" + "github.com/vulcanize/eth-statediff-service/pkg/prom" +) + +func createStateDiffService() (sd.IService, error) { + logWithCommand.Info("Loading statediff service parameters") + path := viper.GetString("leveldb.path") + ancientPath := viper.GetString("leveldb.ancient") + if path == "" || ancientPath == "" { + logWithCommand.Fatal("require a valid eth leveldb primary datastore path and ancient datastore path") + } + + nodeInfo := GetEthNodeInfo() + config, err := chainConfig(nodeInfo.ChainID) + if err != nil { + logWithCommand.Fatal(err) + } + + // create leveldb reader + logWithCommand.Info("Creating leveldb reader") + conf := sd.ReaderConfig{ + TrieConfig: &trie.Config{ + Cache: viper.GetInt("cache.trie"), + Journal: "", + Preimages: false, + }, + ChainConfig: config, + Path: path, + AncientPath: ancientPath, + DBCacheSize: viper.GetInt("cache.database"), + } + lvlDBReader, err := sd.NewLvlDBReader(conf) + if err != nil { + logWithCommand.Fatal(err) + } + + // create statediff service + logWithCommand.Info("Creating statediff service") + db, err := setupPostgres(nodeInfo) + if err != nil { + logWithCommand.Fatal(err) + } + indexer, err := ind.NewStateDiffIndexer(config, db) + if err != nil { + logWithCommand.Fatal(err) + } + return sd.NewStateDiffService(lvlDBReader, indexer, viper.GetUint("statediff.workers")) +} + +func setupPostgres(nodeInfo node.Info) (*postgres.DB, error) { + params := GetDBParams() + db, err := postgres.NewDB(postgres.DbConnectionString(params), GetDBConfig(), nodeInfo) + if err != nil { + return nil, err + } + prom.RegisterDBCollector(params.Name, db.DB) + return db, nil +} diff --git a/cmd/write.go b/cmd/write.go index 2c2c43a..47bee0c 100644 --- a/cmd/write.go +++ b/cmd/write.go @@ -23,14 +23,9 @@ import ( "time" gethsd "github.com/ethereum/go-ethereum/statediff" - ind "github.com/ethereum/go-ethereum/statediff/indexer" - "github.com/ethereum/go-ethereum/statediff/indexer/postgres" - "github.com/ethereum/go-ethereum/trie" "github.com/sirupsen/logrus" "github.com/spf13/cobra" "github.com/spf13/viper" - - sd "github.com/vulcanize/eth-statediff-service/pkg" ) var writeCmd = &cobra.Command{ @@ -55,53 +50,11 @@ func init() { } func write() { - logWithCommand.Info("Starting statediff writer") - // load params + logWithCommand.Info("Running eth-statediff-service write command") viper.BindEnv("write.serve", WRITE_SERVER) addr := viper.GetString("write.serve") - path := viper.GetString("leveldb.path") - ancientPath := viper.GetString("leveldb.ancient") - if path == "" || ancientPath == "" { - logWithCommand.Fatal("require a valid eth leveldb primary datastore path and ancient datastore path") - } - nodeInfo := GetEthNodeInfo() - config, err := chainConfig(nodeInfo.ChainID) - if err != nil { - logWithCommand.Fatal(err) - } - - // create leveldb reader - logWithCommand.Info("Creating leveldb reader") - conf := sd.ReaderConfig{ - TrieConfig: &trie.Config{ - Cache: viper.GetInt("cache.trie"), - Journal: "", - Preimages: false, - }, - ChainConfig: config, - Path: path, - AncientPath: ancientPath, - DBCacheSize: viper.GetInt("cache.database"), - } - lvlDBReader, err := sd.NewLvlDBReader(conf) - if err != nil { - logWithCommand.Fatal(err) - } - - // create statediff service - logWithCommand.Info("Creating statediff service") - db, err := postgres.NewDB(postgres.DbConnectionString(GetDBParams()), GetDBConfig(), nodeInfo) - if err != nil { - logWithCommand.Fatal(err) - } - - indexer, err := ind.NewStateDiffIndexer(config, db) - if err != nil { - logWithCommand.Fatal(err) - } - - statediffService, err := sd.NewStateDiffService(lvlDBReader, indexer, viper.GetUint("statediff.workers")) + statediffService, err := createStateDiffService() if err != nil { logWithCommand.Fatal(err) } From 072f035a0256315e9ab1c8ece2b4f65d8b858264 Mon Sep 17 00:00:00 2001 From: i-norden Date: Thu, 21 Oct 2021 13:00:30 -0500 Subject: [PATCH 3/5] wire up; customize metrics --- go.mod | 3 +- go.sum | 6 ++ pkg/prom/prom.go | 166 +++++++++++------------------------------------ pkg/service.go | 24 +++++-- 4 files changed, 65 insertions(+), 134 deletions(-) diff --git a/go.mod b/go.mod index 8c5b844..23512d1 100644 --- a/go.mod +++ b/go.mod @@ -4,6 +4,8 @@ go 1.13 require ( github.com/ethereum/go-ethereum v1.10.9 + github.com/jmoiron/sqlx v1.2.0 + github.com/prometheus/client_golang v1.0.0 github.com/sirupsen/logrus v1.7.0 github.com/spf13/cobra v1.1.1 github.com/spf13/viper v1.7.1 @@ -12,4 +14,3 @@ require ( ) replace github.com/ethereum/go-ethereum v1.10.9 => github.com/vulcanize/go-ethereum v1.10.11-statediff-0.0.27 - diff --git a/go.sum b/go.sum index a32f64e..d34f251 100644 --- a/go.sum +++ b/go.sum @@ -62,6 +62,7 @@ github.com/aws/aws-sdk-go-v2/service/sso v1.1.1/go.mod h1:SuZJxklHxLAXgLTc1iFXbE github.com/aws/aws-sdk-go-v2/service/sts v1.1.1/go.mod h1:Wi0EBZwiz/K44YliU0EKxqTCJGUfYTWXrrBwkq736bM= github.com/aws/smithy-go v1.1.0/go.mod h1:EzMw8dbp/YJL4A5/sbhGddag+NPT7q084agLbB9LgIw= github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q= +github.com/beorn7/perks v1.0.0 h1:HWo1m869IqiPhD389kmkxeTalrjNbbJTC8LXupb+sl0= github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8= github.com/bgentry/speakeasy v0.1.0/go.mod h1:+zsyZBPWlz7T6j88CTgSN5bM796AkVf0kBD4zp0CCIs= github.com/bketelsen/crypt v0.0.3-0.20200106085610-5cbc8cc4026c/go.mod h1:MKsuJmJgSg28kpZDP6UIiPt0e0Oz0kqKNGyRaWEPv84= @@ -364,6 +365,7 @@ github.com/mattn/go-sqlite3 v1.11.0/go.mod h1:FPy6KqzDD04eiIsT53CuJW3U88zkxoIYsO github.com/mattn/go-sqlite3 v1.14.7 h1:fxWBnXkxfM6sRiuH3bqJ4CfzZojMOLVc0UTsTglEghA= github.com/mattn/go-sqlite3 v1.14.7/go.mod h1:NyWgC/yNuGj7Q9rpYnZvas74GogHl5/Z4A/KQRfk6bU= github.com/mattn/go-tty v0.0.0-20180907095812-13ff1204f104/go.mod h1:XPvLUNfbS4fJH25nqRHfWLMa1ONC8Amw+mIA639KxkE= +github.com/matttproud/golang_protobuf_extensions v1.0.1 h1:4hp9jkHxhMHkqkrB3Ix0jegS5sx/RkqARlsWZ6pIwiU= github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0= github.com/miekg/dns v1.0.14/go.mod h1:W1PPwlIAgtquWBMBEV9nkV9Cazfe8ScdGz/Lj7v3Nrg= github.com/minio/blake2b-simd v0.0.0-20160723061019-3f5f724cb5b1 h1:lYpkrQH5ajf0OXOcUbGjvZxxijuBwbbmlSxLiuofa+g= @@ -441,16 +443,20 @@ github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZN github.com/posener/complete v1.1.1/go.mod h1:em0nMJCgc9GFtwrmVmEMR/ZL6WyhyjMBndrE9hABlRI= github.com/prometheus/client_golang v0.9.1/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXPKyh/dDVn+NZz0KFw= github.com/prometheus/client_golang v0.9.3/go.mod h1:/TN21ttK/J9q6uSwhBd54HahCDft0ttaMvbicHlPoso= +github.com/prometheus/client_golang v1.0.0 h1:vrDKnkGzuGvhNAL56c7DBz29ZL+KxnoR0x7enabFceM= github.com/prometheus/client_golang v1.0.0/go.mod h1:db9x61etRT2tGnBNRi70OPL5FsnadC4Ky3P0J6CfImo= github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo= github.com/prometheus/client_model v0.0.0-20190129233127-fd36f4220a90/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= +github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4 h1:gQz4mCbXsO+nc9n1hCxHcGA3Zx3Eo+UHZoInFGUIXNM= github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= github.com/prometheus/common v0.0.0-20181113130724-41aa239b4cce/go.mod h1:daVV7qP5qjZbuso7PdcryaAu0sAZbrN9i7WWcTMWvro= github.com/prometheus/common v0.4.0/go.mod h1:TNfzLD0ON7rHzMJeJkieUDPYmFC7Snx/y86RQel1bk4= github.com/prometheus/common v0.4.1/go.mod h1:TNfzLD0ON7rHzMJeJkieUDPYmFC7Snx/y86RQel1bk4= +github.com/prometheus/common v0.6.0 h1:kRhiuYSXR3+uv2IbVbZhUxK5zVD/2pp3Gd2PpvPkpEo= github.com/prometheus/common v0.6.0/go.mod h1:eBmuwkDJBwy6iBfxCBob6t6dR6ENT/y+J+Zk0j9GMYc= github.com/prometheus/procfs v0.0.0-20181005140218-185b4288413d/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk= github.com/prometheus/procfs v0.0.0-20190507164030-5867b95ac084/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsTZCD3I8kEA= +github.com/prometheus/procfs v0.0.2 h1:6LJUbpNm42llc4HRCuvApCSWB/WfhuNo9K98Q9sNGfs= github.com/prometheus/procfs v0.0.2/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsTZCD3I8kEA= github.com/prometheus/tsdb v0.7.1 h1:YZcsG11NqnK4czYLrWd9mpEuAJIHVQLwdrleYfszMAA= github.com/prometheus/tsdb v0.7.1/go.mod h1:qhTCs0VvXwvX/y3TZrWD7rabWM+ijKTux40TwIPHuXU= diff --git a/pkg/prom/prom.go b/pkg/prom/prom.go index 75245cb..dee6167 100644 --- a/pkg/prom/prom.go +++ b/pkg/prom/prom.go @@ -29,107 +29,53 @@ const statsSubsystem = "stats" var ( metrics bool - receipts prometheus.Counter - transactions prometheus.Counter - blocks prometheus.Counter - logs prometheus.Counter - accessListEntries prometheus.Counter + lastLoadedHeight prometheus.Gauge + lastProcessedHeight prometheus.Gauge - lenPayloadChan prometheus.Gauge - - tPayloadDecode prometheus.Histogram - tFreePostgres prometheus.Histogram - tPostgresCommit prometheus.Histogram - tHeaderProcessing prometheus.Histogram - tUncleProcessing prometheus.Histogram - tTxAndRecProcessing prometheus.Histogram - tStateAndStoreProcessing prometheus.Histogram - tCodeAndCodeHashProcessing prometheus.Histogram + tBlockLoad prometheus.Histogram + tBlockProcessing prometheus.Histogram + tStateProcessing prometheus.Histogram + tTxCommit prometheus.Histogram ) // Init module initialization func Init() { metrics = true - blocks = promauto.NewCounter(prometheus.CounterOpts{ + lastLoadedHeight = promauto.NewGauge(prometheus.GaugeOpts{ Namespace: namespace, - Name: "blocks", - Help: "The total number of processed blocks", + Name: "loaded_height", + Help: "The last block that was loaded for processing", }) - transactions = promauto.NewCounter(prometheus.CounterOpts{ + lastProcessedHeight = promauto.NewGauge(prometheus.GaugeOpts{ Namespace: namespace, - Name: "transactions", - Help: "The total number of processed transactions", - }) - receipts = promauto.NewCounter(prometheus.CounterOpts{ - Namespace: namespace, - Name: "receipts", - Help: "The total number of processed receipts", - }) - logs = promauto.NewCounter(prometheus.CounterOpts{ - Namespace: namespace, - Name: "logs", - Help: "The total number of processed logs", - }) - accessListEntries = promauto.NewCounter(prometheus.CounterOpts{ - Namespace: namespace, - Name: "access_list_entries", - Help: "The total number of processed access list entries", + Name: "processed_height", + Help: "The last block that was processed", }) - lenPayloadChan = promauto.NewGauge(prometheus.GaugeOpts{ - Namespace: namespace, - Name: "len_payload_chan", - Help: "Current length of publishPayload", - }) - - tPayloadDecode = promauto.NewHistogram(prometheus.HistogramOpts{ + tBlockLoad = promauto.NewHistogram(prometheus.HistogramOpts{ Namespace: namespace, Subsystem: statsSubsystem, - Name: "t_payload_decode", - Help: "Payload decoding time", + Name: "t_block_load", + Help: "Block loading time", }) - tFreePostgres = promauto.NewHistogram(prometheus.HistogramOpts{ + tBlockProcessing = promauto.NewHistogram(prometheus.HistogramOpts{ Namespace: namespace, Subsystem: statsSubsystem, - Name: "t_free_postgres", - Help: "Time spent waiting for free postgres tx", + Name: "t_block_processing", + Help: "Block (header, uncles, txs, rcts, tx trie, rct trie) processing time", }) - tPostgresCommit = promauto.NewHistogram(prometheus.HistogramOpts{ + tStateProcessing = promauto.NewHistogram(prometheus.HistogramOpts{ Namespace: namespace, Subsystem: statsSubsystem, - Name: "t_postgres_commit", - Help: "Postgres transaction commit duration", + Name: "t_state_processing", + Help: "State (state trie, storage tries, and code) processing time", }) - tHeaderProcessing = promauto.NewHistogram(prometheus.HistogramOpts{ + tTxCommit = promauto.NewHistogram(prometheus.HistogramOpts{ Namespace: namespace, Subsystem: statsSubsystem, - Name: "t_header_processing", - Help: "Header processing time", - }) - tUncleProcessing = promauto.NewHistogram(prometheus.HistogramOpts{ - Namespace: namespace, - Subsystem: statsSubsystem, - Name: "t_uncle_processing", - Help: "Uncle processing time", - }) - tTxAndRecProcessing = promauto.NewHistogram(prometheus.HistogramOpts{ - Namespace: namespace, - Subsystem: statsSubsystem, - Name: "t_tx_receipt_processing", - Help: "Tx and receipt processing time", - }) - tStateAndStoreProcessing = promauto.NewHistogram(prometheus.HistogramOpts{ - Namespace: namespace, - Subsystem: statsSubsystem, - Name: "t_state_store_processing", - Help: "State and storage processing time", - }) - tCodeAndCodeHashProcessing = promauto.NewHistogram(prometheus.HistogramOpts{ - Namespace: namespace, - Subsystem: statsSubsystem, - Name: "t_code_codehash_processing", - Help: "Code and codehash processing time", + Name: "t_postgres_tx_commit", + Help: "Postgres tx commit time", }) } @@ -140,45 +86,17 @@ func RegisterDBCollector(name string, db *sqlx.DB) { } } -// BlockInc block counter increment -func BlockInc() { +// SetLastLoadedHeight sets last loaded height +func SetLastLoadedHeight(height int64) { if metrics { - blocks.Inc() + lastLoadedHeight.Set(float64(height)) } } -// TransactionInc transaction counter increment -func TransactionInc() { +// SetLastProcessedHeight sets last processed height +func SetLastProcessedHeight(height int64) { if metrics { - transactions.Inc() - } -} - -// ReceiptInc receipt counter increment -func ReceiptInc() { - if metrics { - receipts.Inc() - } -} - -// LogInc log counter increment -func LogInc() { - if metrics { - logs.Inc() - } -} - -// AccessListElementInc access list element counter increment -func AccessListElementInc() { - if metrics { - accessListEntries.Inc() - } -} - -// SetLenPayloadChan set chan length -func SetLenPayloadChan(ln int) { - if metrics { - lenPayloadChan.Set(float64(ln)) + lastProcessedHeight.Set(float64(height)) } } @@ -189,21 +107,13 @@ func SetTimeMetric(name string, t time.Duration) { } tAsF64 := t.Seconds() switch name { - case "t_payload_decode": - tPayloadDecode.Observe(tAsF64) - case "t_free_postgres": - tFreePostgres.Observe(tAsF64) - case "t_postgres_commit": - tPostgresCommit.Observe(tAsF64) - case "t_header_processing": - tHeaderProcessing.Observe(tAsF64) - case "t_uncle_processing": - tUncleProcessing.Observe(tAsF64) - case "t_tx_receipt_processing": - tTxAndRecProcessing.Observe(tAsF64) - case "t_state_store_processing": - tStateAndStoreProcessing.Observe(tAsF64) - case "t_code_codehash_processing": - tCodeAndCodeHashProcessing.Observe(tAsF64) + case "t_block_load": + tBlockLoad.Observe(tAsF64) + case "t_block_processing": + tBlockProcessing.Observe(tAsF64) + case "t_state_processing": + tStateProcessing.Observe(tAsF64) + case "t_postgres_tx_commit": + tTxCommit.Observe(tAsF64) } } diff --git a/pkg/service.go b/pkg/service.go index 8ba3f55..00f02dd 100644 --- a/pkg/service.go +++ b/pkg/service.go @@ -20,6 +20,7 @@ import ( "fmt" "math/big" "sync" + "time" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/state" @@ -31,6 +32,7 @@ import ( sd "github.com/ethereum/go-ethereum/statediff" sdtypes "github.com/ethereum/go-ethereum/statediff/types" "github.com/sirupsen/logrus" + "github.com/vulcanize/eth-statediff-service/pkg/prom" ind "github.com/ethereum/go-ethereum/statediff/indexer" ) @@ -251,6 +253,7 @@ func (sds *Service) Stop() error { // for historical data func (sds *Service) WriteStateDiffAt(blockNumber uint64, params sd.Params) error { logrus.Info(fmt.Sprintf("Writing state diff at block %d", blockNumber)) + t := time.Now() currentBlock, err := sds.lvlDBReader.GetBlockByNumber(blockNumber) if err != nil { return err @@ -263,7 +266,7 @@ func (sds *Service) WriteStateDiffAt(blockNumber uint64, params sd.Params) error } parentRoot = parentBlock.Root() } - return sds.writeStateDiff(currentBlock, parentRoot, params) + return sds.writeStateDiff(currentBlock, parentRoot, params, t) } // WriteStateDiffFor writes a state diff for the specific blockHash directly to the database @@ -271,6 +274,7 @@ func (sds *Service) WriteStateDiffAt(blockNumber uint64, params sd.Params) error // for historical data func (sds *Service) WriteStateDiffFor(blockHash common.Hash, params sd.Params) error { logrus.Info(fmt.Sprintf("Writing state diff for block %s", blockHash.Hex())) + t := time.Now() currentBlock, err := sds.lvlDBReader.GetBlockByHash(blockHash) if err != nil { return err @@ -283,11 +287,11 @@ func (sds *Service) WriteStateDiffFor(blockHash common.Hash, params sd.Params) e } parentRoot = parentBlock.Root() } - return sds.writeStateDiff(currentBlock, parentRoot, params) + return sds.writeStateDiff(currentBlock, parentRoot, params, t) } // Writes a state diff from the current block, parent state root, and provided params -func (sds *Service) writeStateDiff(block *types.Block, parentRoot common.Hash, params sd.Params) error { +func (sds *Service) writeStateDiff(block *types.Block, parentRoot common.Hash, params sd.Params, t time.Time) error { var totalDifficulty *big.Int var receipts types.Receipts var err error @@ -303,7 +307,10 @@ func (sds *Service) writeStateDiff(block *types.Block, parentRoot common.Hash, p if err != nil { return err } - + height := block.Number().Int64() + prom.SetLastLoadedHeight(height) + prom.SetTimeMetric("t_block_load", time.Now().Sub(t)) + t = time.Now() tx, err := sds.indexer.PushBlock(block, receipts, totalDifficulty) if err != nil { return err @@ -315,9 +322,16 @@ func (sds *Service) writeStateDiff(block *types.Block, parentRoot common.Hash, p codeOutput := func(c sdtypes.CodeAndCodeHash) error { return sds.indexer.PushCodeAndCodeHash(tx, c) } + prom.SetTimeMetric("t_block_processing", time.Now().Sub(t)) + t = time.Now() err = sds.Builder.WriteStateDiffObject(sd.StateRoots{ NewStateRoot: block.Root(), OldStateRoot: parentRoot, }, params, output, codeOutput) - return tx.Close(err) + prom.SetTimeMetric("t_state_processing", time.Now().Sub(t)) + t = time.Now() + err = tx.Close(err) + prom.SetLastProcessedHeight(height) + prom.SetTimeMetric("t_postgres_tx_commit", time.Now().Sub(t)) + return err } From ca9d2a7bec11640dd615481eac355ba429d5a3de Mon Sep 17 00:00:00 2001 From: i-norden Date: Thu, 21 Oct 2021 13:10:16 -0500 Subject: [PATCH 4/5] update config --- environments/example.toml | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/environments/example.toml b/environments/example.toml index 940a48a..995cdd5 100644 --- a/environments/example.toml +++ b/environments/example.toml @@ -32,3 +32,9 @@ [cache] database = 1024 trie = 1024 + +[prom] + metrics = true + http = true + addr = "localhost" + port = "8889" From 7dc6c52dec2bacbf06d259c376aca2cb5141bbbe Mon Sep 17 00:00:00 2001 From: i-norden Date: Thu, 21 Oct 2021 13:29:30 -0500 Subject: [PATCH 5/5] fix type --- monitoring/prometheus.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/monitoring/prometheus.yml b/monitoring/prometheus.yml index 033137d..00ca7f5 100644 --- a/monitoring/prometheus.yml +++ b/monitoring/prometheus.yml @@ -2,6 +2,6 @@ global: scrape_interval: 10s scrape_configs: - - job_name: 'eth-statediiff-service' + - job_name: 'eth-statediff-service' static_configs: - targets: ['localhost:8100']