Statediff Prometheus metrics refactor #39
@ -14,6 +14,8 @@
|
|||||||
// You should have received a copy of the GNU Affero General Public License
|
// You should have received a copy of the GNU Affero General Public License
|
||||||
// along with this program. If not, see <http://www.gnu.org/licenses/>.
|
// along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
|
||||||
|
// This package provides an interface for pushing and indexing IPLD objects into a Postgres database
|
||||||
|
// Metrics for reporting processing and connection stats are defined in ./metrics.go
|
||||||
package indexer
|
package indexer
|
||||||
|
|
||||||
import (
|
import (
|
||||||
@ -26,6 +28,7 @@ import (
|
|||||||
"github.com/ethereum/go-ethereum/core/types"
|
"github.com/ethereum/go-ethereum/core/types"
|
||||||
"github.com/ethereum/go-ethereum/crypto"
|
"github.com/ethereum/go-ethereum/crypto"
|
||||||
"github.com/ethereum/go-ethereum/log"
|
"github.com/ethereum/go-ethereum/log"
|
||||||
|
"github.com/ethereum/go-ethereum/metrics"
|
||||||
"github.com/ethereum/go-ethereum/params"
|
"github.com/ethereum/go-ethereum/params"
|
||||||
"github.com/ethereum/go-ethereum/rlp"
|
"github.com/ethereum/go-ethereum/rlp"
|
||||||
|
|
||||||
@ -36,16 +39,21 @@ import (
|
|||||||
"github.com/ethereum/go-ethereum/statediff/indexer/ipfs/ipld"
|
"github.com/ethereum/go-ethereum/statediff/indexer/ipfs/ipld"
|
||||||
"github.com/ethereum/go-ethereum/statediff/indexer/models"
|
"github.com/ethereum/go-ethereum/statediff/indexer/models"
|
||||||
"github.com/ethereum/go-ethereum/statediff/indexer/postgres"
|
"github.com/ethereum/go-ethereum/statediff/indexer/postgres"
|
||||||
"github.com/ethereum/go-ethereum/statediff/indexer/prom"
|
|
||||||
"github.com/ethereum/go-ethereum/statediff/indexer/shared"
|
"github.com/ethereum/go-ethereum/statediff/indexer/shared"
|
||||||
sdtypes "github.com/ethereum/go-ethereum/statediff/types"
|
sdtypes "github.com/ethereum/go-ethereum/statediff/types"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
var (
|
||||||
|
indexerMetrics = RegisterIndexerMetrics(metrics.DefaultRegistry)
|
||||||
|
dbMetrics = RegisterDBMetrics(metrics.DefaultRegistry)
|
||||||
|
)
|
||||||
|
|
||||||
// Indexer interface to allow substitution of mocks for testing
|
// Indexer interface to allow substitution of mocks for testing
|
||||||
type Indexer interface {
|
type Indexer interface {
|
||||||
PushBlock(block *types.Block, receipts types.Receipts, totalDifficulty *big.Int) (*BlockTx, error)
|
PushBlock(block *types.Block, receipts types.Receipts, totalDifficulty *big.Int) (*BlockTx, error)
|
||||||
PushStateNode(tx *BlockTx, stateNode sdtypes.StateNode) error
|
PushStateNode(tx *BlockTx, stateNode sdtypes.StateNode) error
|
||||||
PushCodeAndCodeHash(tx *BlockTx, codeAndCodeHash sdtypes.CodeAndCodeHash) error
|
PushCodeAndCodeHash(tx *BlockTx, codeAndCodeHash sdtypes.CodeAndCodeHash) error
|
||||||
|
ReportDBMetrics(delay time.Duration, quit <-chan bool)
|
||||||
}
|
}
|
||||||
|
|
||||||
// StateDiffIndexer satisfies the Indexer interface for ethereum statediff objects
|
// StateDiffIndexer satisfies the Indexer interface for ethereum statediff objects
|
||||||
@ -70,6 +78,25 @@ type BlockTx struct {
|
|||||||
Close func() error
|
Close func() error
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Reporting function to run as goroutine
|
||||||
|
func (sdi *StateDiffIndexer) ReportDBMetrics(delay time.Duration, quit <-chan bool) {
|
||||||
|
if !metrics.Enabled {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
ticker := time.NewTicker(delay)
|
||||||
|
go func() {
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-ticker.C:
|
||||||
|
dbMetrics.Update(sdi.dbWriter.db.Stats())
|
||||||
|
case <-quit:
|
||||||
|
ticker.Stop()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
|
||||||
// Pushes and indexes block data in database, except state & storage nodes (includes header, uncles, transactions & receipts)
|
// Pushes and indexes block data in database, except state & storage nodes (includes header, uncles, transactions & receipts)
|
||||||
// Returns an initiated DB transaction which must be Closed via defer to commit or rollback
|
// Returns an initiated DB transaction which must be Closed via defer to commit or rollback
|
||||||
func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receipts, totalDifficulty *big.Int) (*BlockTx, error) {
|
func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receipts, totalDifficulty *big.Int) (*BlockTx, error) {
|
||||||
@ -109,12 +136,12 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
|
|||||||
panic(p)
|
panic(p)
|
||||||
} else {
|
} else {
|
||||||
tDiff := time.Now().Sub(t)
|
tDiff := time.Now().Sub(t)
|
||||||
prom.SetTimeMetric("t_state_store_code_processing", tDiff)
|
indexerMetrics.tStateStoreCodeProcessing.Update(tDiff)
|
||||||
traceMsg += fmt.Sprintf("state, storage, and code storage processing time: %s\r\n", tDiff.String())
|
traceMsg += fmt.Sprintf("state, storage, and code storage processing time: %s\r\n", tDiff.String())
|
||||||
t = time.Now()
|
t = time.Now()
|
||||||
err = tx.Commit()
|
err = tx.Commit()
|
||||||
tDiff = time.Now().Sub(t)
|
tDiff = time.Now().Sub(t)
|
||||||
prom.SetTimeMetric("t_postgres_commit", tDiff)
|
indexerMetrics.tPostgresCommit.Update(tDiff)
|
||||||
traceMsg += fmt.Sprintf("postgres transaction commit duration: %s\r\n", tDiff.String())
|
traceMsg += fmt.Sprintf("postgres transaction commit duration: %s\r\n", tDiff.String())
|
||||||
}
|
}
|
||||||
traceMsg += fmt.Sprintf(" TOTAL PROCESSING DURATION: %s\r\n", time.Now().Sub(start).String())
|
traceMsg += fmt.Sprintf(" TOTAL PROCESSING DURATION: %s\r\n", time.Now().Sub(start).String())
|
||||||
@ -123,7 +150,8 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
|
|||||||
},
|
},
|
||||||
}
|
}
|
||||||
tDiff := time.Now().Sub(t)
|
tDiff := time.Now().Sub(t)
|
||||||
prom.SetTimeMetric("t_free_postgres", tDiff)
|
indexerMetrics.tFreePostgres.Update(tDiff)
|
||||||
|
|
||||||
traceMsg += fmt.Sprintf("time spent waiting for free postgres tx: %s:\r\n", tDiff.String())
|
traceMsg += fmt.Sprintf("time spent waiting for free postgres tx: %s:\r\n", tDiff.String())
|
||||||
t = time.Now()
|
t = time.Now()
|
||||||
|
|
||||||
@ -133,7 +161,7 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
tDiff = time.Now().Sub(t)
|
tDiff = time.Now().Sub(t)
|
||||||
prom.SetTimeMetric("t_header_processing", tDiff)
|
indexerMetrics.tHeaderProcessing.Update(tDiff)
|
||||||
traceMsg += fmt.Sprintf("header processing time: %s\r\n", tDiff.String())
|
traceMsg += fmt.Sprintf("header processing time: %s\r\n", tDiff.String())
|
||||||
t = time.Now()
|
t = time.Now()
|
||||||
// Publish and index uncles
|
// Publish and index uncles
|
||||||
@ -141,7 +169,7 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
tDiff = time.Now().Sub(t)
|
tDiff = time.Now().Sub(t)
|
||||||
prom.SetTimeMetric("t_uncle_processing", tDiff)
|
indexerMetrics.tUncleProcessing.Update(tDiff)
|
||||||
traceMsg += fmt.Sprintf("uncle processing time: %s\r\n", tDiff.String())
|
traceMsg += fmt.Sprintf("uncle processing time: %s\r\n", tDiff.String())
|
||||||
t = time.Now()
|
t = time.Now()
|
||||||
// Publish and index receipts and txs
|
// Publish and index receipts and txs
|
||||||
@ -158,7 +186,7 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
tDiff = time.Now().Sub(t)
|
tDiff = time.Now().Sub(t)
|
||||||
prom.SetTimeMetric("t_tx_receipt_processing", tDiff)
|
indexerMetrics.tTxAndRecProcessing.Update(tDiff)
|
||||||
traceMsg += fmt.Sprintf("tx and receipt processing time: %s\r\n", tDiff.String())
|
traceMsg += fmt.Sprintf("tx and receipt processing time: %s\r\n", tDiff.String())
|
||||||
t = time.Now()
|
t = time.Now()
|
||||||
|
|
||||||
|
124
statediff/indexer/metrics.go
Normal file
124
statediff/indexer/metrics.go
Normal file
@ -0,0 +1,124 @@
|
|||||||
|
package indexer
|
||||||
|
|
||||||
|
import (
|
||||||
|
"database/sql"
|
||||||
|
"strings"
|
||||||
|
|
||||||
|
"github.com/ethereum/go-ethereum/metrics"
|
||||||
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
indexerNamespace = "indexer"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Build a fully qualified metric name
|
||||||
|
func metricName(subsystem, name string) string {
|
||||||
|
if name == "" {
|
||||||
|
return ""
|
||||||
|
}
|
||||||
|
parts := []string{indexerNamespace, name}
|
||||||
|
if subsystem != "" {
|
||||||
|
parts = []string{indexerNamespace, subsystem, name}
|
||||||
|
}
|
||||||
|
// Prometheus uses _ but geth metrics uses / and replaces
|
||||||
|
return strings.Join(parts, "/")
|
||||||
|
}
|
||||||
|
|
||||||
|
type indexerMetricsHandles struct {
|
||||||
|
// The total number of processed blocks
|
||||||
|
blocks metrics.Counter
|
||||||
|
// The total number of processed transactions
|
||||||
|
transactions metrics.Counter
|
||||||
|
// The total number of processed receipts
|
||||||
|
receipts metrics.Counter
|
||||||
|
// Time spent waiting for free postgres tx
|
||||||
|
tFreePostgres metrics.Timer
|
||||||
|
// Postgres transaction commit duration
|
||||||
|
tPostgresCommit metrics.Timer
|
||||||
|
// Header processing time
|
||||||
|
tHeaderProcessing metrics.Timer
|
||||||
|
// Uncle processing time
|
||||||
|
tUncleProcessing metrics.Timer
|
||||||
|
// Tx and receipt processing time
|
||||||
|
tTxAndRecProcessing metrics.Timer
|
||||||
|
// State, storage, and code combined processing time
|
||||||
|
tStateStoreCodeProcessing metrics.Timer
|
||||||
|
}
|
||||||
|
|
||||||
|
func RegisterIndexerMetrics(reg metrics.Registry) indexerMetricsHandles {
|
||||||
|
ctx := indexerMetricsHandles{
|
||||||
|
blocks: metrics.NewCounter(),
|
||||||
|
transactions: metrics.NewCounter(),
|
||||||
|
receipts: metrics.NewCounter(),
|
||||||
|
tFreePostgres: metrics.NewTimer(),
|
||||||
|
tPostgresCommit: metrics.NewTimer(),
|
||||||
|
tHeaderProcessing: metrics.NewTimer(),
|
||||||
|
tUncleProcessing: metrics.NewTimer(),
|
||||||
|
tTxAndRecProcessing: metrics.NewTimer(),
|
||||||
|
tStateStoreCodeProcessing: metrics.NewTimer(),
|
||||||
|
}
|
||||||
|
subsys := "" // todo
|
||||||
|
reg.Register(metricName(subsys, "blocks"), ctx.blocks)
|
||||||
|
reg.Register(metricName(subsys, "transactions"), ctx.transactions)
|
||||||
|
reg.Register(metricName(subsys, "receipts"), ctx.receipts)
|
||||||
|
reg.Register(metricName(subsys, "t_free_postgres"), ctx.tFreePostgres)
|
||||||
|
reg.Register(metricName(subsys, "t_postgres_commit"), ctx.tPostgresCommit)
|
||||||
|
reg.Register(metricName(subsys, "t_header_processing"), ctx.tHeaderProcessing)
|
||||||
|
reg.Register(metricName(subsys, "t_uncle_processing"), ctx.tUncleProcessing)
|
||||||
|
reg.Register(metricName(subsys, "t_tx_receipt_processing"), ctx.tTxAndRecProcessing)
|
||||||
|
reg.Register(metricName(subsys, "t_state_store_code_processing"), ctx.tStateStoreCodeProcessing)
|
||||||
|
return ctx
|
||||||
|
}
|
||||||
|
|
||||||
|
type dbMetricsHandles struct {
|
||||||
|
// Maximum number of open connections to the database
|
||||||
|
maxOpen metrics.Gauge
|
||||||
|
// The number of established connections both in use and idle
|
||||||
|
open metrics.Gauge
|
||||||
|
// The number of connections currently in use
|
||||||
|
inUse metrics.Gauge
|
||||||
|
// The number of idle connections
|
||||||
|
idle metrics.Gauge
|
||||||
|
// The total number of connections waited for
|
||||||
|
waitedFor metrics.Counter
|
||||||
|
// The total time blocked waiting for a new connection
|
||||||
|
blockedMilliseconds metrics.Counter
|
||||||
|
// The total number of connections closed due to SetMaxIdleConns
|
||||||
|
closedMaxIdle metrics.Counter
|
||||||
|
// The total number of connections closed due to SetConnMaxLifetime
|
||||||
|
closedMaxLifetime metrics.Counter
|
||||||
|
}
|
||||||
|
|
||||||
|
func RegisterDBMetrics(reg metrics.Registry) dbMetricsHandles {
|
||||||
|
ctx := dbMetricsHandles{
|
||||||
|
maxOpen: metrics.NewGauge(),
|
||||||
|
open: metrics.NewGauge(),
|
||||||
|
inUse: metrics.NewGauge(),
|
||||||
|
idle: metrics.NewGauge(),
|
||||||
|
waitedFor: metrics.NewCounter(),
|
||||||
|
blockedMilliseconds: metrics.NewCounter(),
|
||||||
|
closedMaxIdle: metrics.NewCounter(),
|
||||||
|
closedMaxLifetime: metrics.NewCounter(),
|
||||||
|
}
|
||||||
|
subsys := "connections"
|
||||||
|
reg.Register(metricName(subsys, "max_open"), ctx.maxOpen)
|
||||||
|
reg.Register(metricName(subsys, "open"), ctx.open)
|
||||||
|
reg.Register(metricName(subsys, "in_use"), ctx.inUse)
|
||||||
|
reg.Register(metricName(subsys, "idle"), ctx.idle)
|
||||||
|
reg.Register(metricName(subsys, "waited_for"), ctx.waitedFor)
|
||||||
|
reg.Register(metricName(subsys, "blocked_milliseconds"), ctx.blockedMilliseconds)
|
||||||
|
reg.Register(metricName(subsys, "closed_max_idle"), ctx.closedMaxIdle)
|
||||||
|
reg.Register(metricName(subsys, "closed_max_lifetime"), ctx.closedMaxLifetime)
|
||||||
|
return ctx
|
||||||
|
}
|
||||||
|
|
||||||
|
func (met *dbMetricsHandles) Update(stats sql.DBStats) {
|
||||||
|
met.maxOpen.Update(int64(stats.MaxOpenConnections))
|
||||||
|
met.open.Update(int64(stats.OpenConnections))
|
||||||
|
met.inUse.Update(int64(stats.InUse))
|
||||||
|
met.idle.Update(int64(stats.Idle))
|
||||||
|
met.waitedFor.Inc(int64(stats.WaitCount))
|
||||||
|
met.blockedMilliseconds.Inc(int64(stats.WaitDuration.Milliseconds()))
|
||||||
|
met.closedMaxIdle.Inc(int64(stats.MaxIdleClosed))
|
||||||
|
met.closedMaxLifetime.Inc(int64(stats.MaxLifetimeClosed))
|
||||||
|
}
|
@ -1,146 +0,0 @@
|
|||||||
package prom
|
|
||||||
|
|
||||||
import (
|
|
||||||
"database/sql"
|
|
||||||
|
|
||||||
"github.com/prometheus/client_golang/prometheus"
|
|
||||||
)
|
|
||||||
|
|
||||||
const (
|
|
||||||
namespace = "ipld_eth_indexer"
|
|
||||||
subsystem = "connections"
|
|
||||||
)
|
|
||||||
|
|
||||||
// DBStatsGetter is an interface that gets sql.DBStats.
|
|
||||||
type DBStatsGetter interface {
|
|
||||||
Stats() sql.DBStats
|
|
||||||
}
|
|
||||||
|
|
||||||
// DBStatsCollector implements the prometheus.Collector interface.
|
|
||||||
type DBStatsCollector struct {
|
|
||||||
sg DBStatsGetter
|
|
||||||
|
|
||||||
// descriptions of exported metrics
|
|
||||||
maxOpenDesc *prometheus.Desc
|
|
||||||
openDesc *prometheus.Desc
|
|
||||||
inUseDesc *prometheus.Desc
|
|
||||||
idleDesc *prometheus.Desc
|
|
||||||
waitedForDesc *prometheus.Desc
|
|
||||||
blockedSecondsDesc *prometheus.Desc
|
|
||||||
closedMaxIdleDesc *prometheus.Desc
|
|
||||||
closedMaxLifetimeDesc *prometheus.Desc
|
|
||||||
}
|
|
||||||
|
|
||||||
// NewDBStatsCollector creates a new DBStatsCollector.
|
|
||||||
func NewDBStatsCollector(dbName string, sg DBStatsGetter) *DBStatsCollector {
|
|
||||||
labels := prometheus.Labels{"db_name": dbName}
|
|
||||||
return &DBStatsCollector{
|
|
||||||
sg: sg,
|
|
||||||
maxOpenDesc: prometheus.NewDesc(
|
|
||||||
prometheus.BuildFQName(namespace, subsystem, "max_open"),
|
|
||||||
"Maximum number of open connections to the database.",
|
|
||||||
nil,
|
|
||||||
labels,
|
|
||||||
),
|
|
||||||
openDesc: prometheus.NewDesc(
|
|
||||||
prometheus.BuildFQName(namespace, subsystem, "open"),
|
|
||||||
"The number of established connections both in use and idle.",
|
|
||||||
nil,
|
|
||||||
labels,
|
|
||||||
),
|
|
||||||
inUseDesc: prometheus.NewDesc(
|
|
||||||
prometheus.BuildFQName(namespace, subsystem, "in_use"),
|
|
||||||
"The number of connections currently in use.",
|
|
||||||
nil,
|
|
||||||
labels,
|
|
||||||
),
|
|
||||||
idleDesc: prometheus.NewDesc(
|
|
||||||
prometheus.BuildFQName(namespace, subsystem, "idle"),
|
|
||||||
"The number of idle connections.",
|
|
||||||
nil,
|
|
||||||
labels,
|
|
||||||
),
|
|
||||||
waitedForDesc: prometheus.NewDesc(
|
|
||||||
prometheus.BuildFQName(namespace, subsystem, "waited_for"),
|
|
||||||
"The total number of connections waited for.",
|
|
||||||
nil,
|
|
||||||
labels,
|
|
||||||
),
|
|
||||||
blockedSecondsDesc: prometheus.NewDesc(
|
|
||||||
prometheus.BuildFQName(namespace, subsystem, "blocked_seconds"),
|
|
||||||
"The total time blocked waiting for a new connection.",
|
|
||||||
nil,
|
|
||||||
labels,
|
|
||||||
),
|
|
||||||
closedMaxIdleDesc: prometheus.NewDesc(
|
|
||||||
prometheus.BuildFQName(namespace, subsystem, "closed_max_idle"),
|
|
||||||
"The total number of connections closed due to SetMaxIdleConns.",
|
|
||||||
nil,
|
|
||||||
labels,
|
|
||||||
),
|
|
||||||
closedMaxLifetimeDesc: prometheus.NewDesc(
|
|
||||||
prometheus.BuildFQName(namespace, subsystem, "closed_max_lifetime"),
|
|
||||||
"The total number of connections closed due to SetConnMaxLifetime.",
|
|
||||||
nil,
|
|
||||||
labels,
|
|
||||||
),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Describe implements the prometheus.Collector interface.
|
|
||||||
func (c DBStatsCollector) Describe(ch chan<- *prometheus.Desc) {
|
|
||||||
ch <- c.maxOpenDesc
|
|
||||||
ch <- c.openDesc
|
|
||||||
ch <- c.inUseDesc
|
|
||||||
ch <- c.idleDesc
|
|
||||||
ch <- c.waitedForDesc
|
|
||||||
ch <- c.blockedSecondsDesc
|
|
||||||
ch <- c.closedMaxIdleDesc
|
|
||||||
ch <- c.closedMaxLifetimeDesc
|
|
||||||
}
|
|
||||||
|
|
||||||
// Collect implements the prometheus.Collector interface.
|
|
||||||
func (c DBStatsCollector) Collect(ch chan<- prometheus.Metric) {
|
|
||||||
stats := c.sg.Stats()
|
|
||||||
|
|
||||||
ch <- prometheus.MustNewConstMetric(
|
|
||||||
c.maxOpenDesc,
|
|
||||||
prometheus.GaugeValue,
|
|
||||||
float64(stats.MaxOpenConnections),
|
|
||||||
)
|
|
||||||
ch <- prometheus.MustNewConstMetric(
|
|
||||||
c.openDesc,
|
|
||||||
prometheus.GaugeValue,
|
|
||||||
float64(stats.OpenConnections),
|
|
||||||
)
|
|
||||||
ch <- prometheus.MustNewConstMetric(
|
|
||||||
c.inUseDesc,
|
|
||||||
prometheus.GaugeValue,
|
|
||||||
float64(stats.InUse),
|
|
||||||
)
|
|
||||||
ch <- prometheus.MustNewConstMetric(
|
|
||||||
c.idleDesc,
|
|
||||||
prometheus.GaugeValue,
|
|
||||||
float64(stats.Idle),
|
|
||||||
)
|
|
||||||
ch <- prometheus.MustNewConstMetric(
|
|
||||||
c.waitedForDesc,
|
|
||||||
prometheus.CounterValue,
|
|
||||||
float64(stats.WaitCount),
|
|
||||||
)
|
|
||||||
ch <- prometheus.MustNewConstMetric(
|
|
||||||
c.blockedSecondsDesc,
|
|
||||||
prometheus.CounterValue,
|
|
||||||
stats.WaitDuration.Seconds(),
|
|
||||||
)
|
|
||||||
ch <- prometheus.MustNewConstMetric(
|
|
||||||
c.closedMaxIdleDesc,
|
|
||||||
prometheus.CounterValue,
|
|
||||||
float64(stats.MaxIdleClosed),
|
|
||||||
)
|
|
||||||
ch <- prometheus.MustNewConstMetric(
|
|
||||||
c.closedMaxLifetimeDesc,
|
|
||||||
prometheus.CounterValue,
|
|
||||||
float64(stats.MaxLifetimeClosed),
|
|
||||||
)
|
|
||||||
}
|
|
@ -1,151 +0,0 @@
|
|||||||
package prom
|
|
||||||
|
|
||||||
import (
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/jmoiron/sqlx"
|
|
||||||
"github.com/prometheus/client_golang/prometheus"
|
|
||||||
"github.com/prometheus/client_golang/prometheus/promauto"
|
|
||||||
)
|
|
||||||
|
|
||||||
const statsSubsystem = "stats"
|
|
||||||
|
|
||||||
var (
|
|
||||||
metrics bool
|
|
||||||
|
|
||||||
receipts prometheus.Counter
|
|
||||||
transactions prometheus.Counter
|
|
||||||
blocks prometheus.Counter
|
|
||||||
|
|
||||||
lenPayloadChan prometheus.Gauge
|
|
||||||
|
|
||||||
tPayloadDecode prometheus.Histogram
|
|
||||||
tFreePostgres prometheus.Histogram
|
|
||||||
tPostgresCommit prometheus.Histogram
|
|
||||||
tHeaderProcessing prometheus.Histogram
|
|
||||||
tUncleProcessing prometheus.Histogram
|
|
||||||
tTxAndRecProcessing prometheus.Histogram
|
|
||||||
tStateAndStoreProcessing prometheus.Histogram
|
|
||||||
tCodeAndCodeHashProcessing prometheus.Histogram
|
|
||||||
)
|
|
||||||
|
|
||||||
// Init module initialization
|
|
||||||
func Init() {
|
|
||||||
metrics = true
|
|
||||||
|
|
||||||
blocks = promauto.NewCounter(prometheus.CounterOpts{
|
|
||||||
Namespace: namespace,
|
|
||||||
Name: "blocks",
|
|
||||||
Help: "The total number of processed blocks",
|
|
||||||
})
|
|
||||||
transactions = promauto.NewCounter(prometheus.CounterOpts{
|
|
||||||
Namespace: namespace,
|
|
||||||
Name: "transactions",
|
|
||||||
Help: "The total number of processed transactions",
|
|
||||||
})
|
|
||||||
receipts = promauto.NewCounter(prometheus.CounterOpts{
|
|
||||||
Namespace: namespace,
|
|
||||||
Name: "receipts",
|
|
||||||
Help: "The total number of processed receipts",
|
|
||||||
})
|
|
||||||
|
|
||||||
lenPayloadChan = promauto.NewGauge(prometheus.GaugeOpts{
|
|
||||||
Namespace: namespace,
|
|
||||||
Name: "len_payload_chan",
|
|
||||||
Help: "Current length of publishPayload",
|
|
||||||
})
|
|
||||||
|
|
||||||
tFreePostgres = promauto.NewHistogram(prometheus.HistogramOpts{
|
|
||||||
Namespace: namespace,
|
|
||||||
Subsystem: statsSubsystem,
|
|
||||||
Name: "t_free_postgres",
|
|
||||||
Help: "Time spent waiting for free postgres tx",
|
|
||||||
})
|
|
||||||
tPostgresCommit = promauto.NewHistogram(prometheus.HistogramOpts{
|
|
||||||
Namespace: namespace,
|
|
||||||
Subsystem: statsSubsystem,
|
|
||||||
Name: "t_postgres_commit",
|
|
||||||
Help: "Postgres transaction commit duration",
|
|
||||||
})
|
|
||||||
tHeaderProcessing = promauto.NewHistogram(prometheus.HistogramOpts{
|
|
||||||
Namespace: namespace,
|
|
||||||
Subsystem: statsSubsystem,
|
|
||||||
Name: "t_header_processing",
|
|
||||||
Help: "Header processing time",
|
|
||||||
})
|
|
||||||
tUncleProcessing = promauto.NewHistogram(prometheus.HistogramOpts{
|
|
||||||
Namespace: namespace,
|
|
||||||
Subsystem: statsSubsystem,
|
|
||||||
Name: "t_uncle_processing",
|
|
||||||
Help: "Uncle processing time",
|
|
||||||
})
|
|
||||||
tTxAndRecProcessing = promauto.NewHistogram(prometheus.HistogramOpts{
|
|
||||||
Namespace: namespace,
|
|
||||||
Subsystem: statsSubsystem,
|
|
||||||
Name: "t_tx_receipt_processing",
|
|
||||||
Help: "Tx and receipt processing time",
|
|
||||||
})
|
|
||||||
tStateAndStoreProcessing = promauto.NewHistogram(prometheus.HistogramOpts{
|
|
||||||
Namespace: namespace,
|
|
||||||
Subsystem: statsSubsystem,
|
|
||||||
Name: "t_state_store_code_processing",
|
|
||||||
Help: "State, storage, and code combinedprocessing time",
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
// RegisterDBCollector create metric colletor for given connection
|
|
||||||
func RegisterDBCollector(name string, db *sqlx.DB) {
|
|
||||||
if metrics {
|
|
||||||
prometheus.Register(NewDBStatsCollector(name, db))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// BlockInc block counter increment
|
|
||||||
func BlockInc() {
|
|
||||||
if metrics {
|
|
||||||
blocks.Inc()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// TransactionInc transaction counter increment
|
|
||||||
func TransactionInc() {
|
|
||||||
if metrics {
|
|
||||||
transactions.Inc()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// ReceiptInc receipt counter increment
|
|
||||||
func ReceiptInc() {
|
|
||||||
if metrics {
|
|
||||||
receipts.Inc()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// SetLenPayloadChan set chan length
|
|
||||||
func SetLenPayloadChan(ln int) {
|
|
||||||
if metrics {
|
|
||||||
lenPayloadChan.Set(float64(ln))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// SetTimeMetric time metric observation
|
|
||||||
func SetTimeMetric(name string, t time.Duration) {
|
|
||||||
if !metrics {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
tAsF64 := t.Seconds()
|
|
||||||
switch name {
|
|
||||||
case "t_free_postgres":
|
|
||||||
tFreePostgres.Observe(tAsF64)
|
|
||||||
case "t_postgres_commit":
|
|
||||||
tPostgresCommit.Observe(tAsF64)
|
|
||||||
case "t_header_processing":
|
|
||||||
tHeaderProcessing.Observe(tAsF64)
|
|
||||||
case "t_uncle_processing":
|
|
||||||
tUncleProcessing.Observe(tAsF64)
|
|
||||||
case "t_tx_receipt_processing":
|
|
||||||
tTxAndRecProcessing.Observe(tAsF64)
|
|
||||||
case "t_state_store_code_processing":
|
|
||||||
tStateAndStoreProcessing.Observe(tAsF64)
|
|
||||||
}
|
|
||||||
}
|
|
@ -17,12 +17,11 @@
|
|||||||
package indexer
|
package indexer
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"github.com/ethereum/go-ethereum/common"
|
|
||||||
"github.com/jmoiron/sqlx"
|
"github.com/jmoiron/sqlx"
|
||||||
|
|
||||||
|
"github.com/ethereum/go-ethereum/common"
|
||||||
"github.com/ethereum/go-ethereum/statediff/indexer/models"
|
"github.com/ethereum/go-ethereum/statediff/indexer/models"
|
||||||
"github.com/ethereum/go-ethereum/statediff/indexer/postgres"
|
"github.com/ethereum/go-ethereum/statediff/indexer/postgres"
|
||||||
"github.com/ethereum/go-ethereum/statediff/indexer/prom"
|
|
||||||
"github.com/ethereum/go-ethereum/statediff/indexer/shared"
|
"github.com/ethereum/go-ethereum/statediff/indexer/shared"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -30,7 +29,7 @@ var (
|
|||||||
nullHash = common.HexToHash("0x0000000000000000000000000000000000000000000000000000000000000000")
|
nullHash = common.HexToHash("0x0000000000000000000000000000000000000000000000000000000000000000")
|
||||||
)
|
)
|
||||||
|
|
||||||
// Indexer satisfies the Indexer interface for ethereum
|
// Handles processing and writing of indexed IPLD objects to Postgres
|
||||||
type PostgresCIDWriter struct {
|
type PostgresCIDWriter struct {
|
||||||
db *postgres.DB
|
db *postgres.DB
|
||||||
}
|
}
|
||||||
@ -51,7 +50,7 @@ func (in *PostgresCIDWriter) upsertHeaderCID(tx *sqlx.Tx, header models.HeaderMo
|
|||||||
header.BlockNumber, header.BlockHash, header.ParentHash, header.CID, header.TotalDifficulty, in.db.NodeID, header.Reward, header.StateRoot, header.TxRoot,
|
header.BlockNumber, header.BlockHash, header.ParentHash, header.CID, header.TotalDifficulty, in.db.NodeID, header.Reward, header.StateRoot, header.TxRoot,
|
||||||
header.RctRoot, header.UncleRoot, header.Bloom, header.Timestamp, header.MhKey, 1).Scan(&headerID)
|
header.RctRoot, header.UncleRoot, header.Bloom, header.Timestamp, header.MhKey, 1).Scan(&headerID)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
prom.BlockInc()
|
indexerMetrics.blocks.Inc(1)
|
||||||
}
|
}
|
||||||
return headerID, err
|
return headerID, err
|
||||||
}
|
}
|
||||||
@ -73,7 +72,7 @@ func (in *PostgresCIDWriter) upsertTransactionAndReceiptCIDs(tx *sqlx.Tx, payloa
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
prom.TransactionInc()
|
indexerMetrics.transactions.Inc(1)
|
||||||
receiptCidMeta, ok := payload.ReceiptCIDs[common.HexToHash(trxCidMeta.TxHash)]
|
receiptCidMeta, ok := payload.ReceiptCIDs[common.HexToHash(trxCidMeta.TxHash)]
|
||||||
if ok {
|
if ok {
|
||||||
if err := in.upsertReceiptCID(tx, receiptCidMeta, txID); err != nil {
|
if err := in.upsertReceiptCID(tx, receiptCidMeta, txID); err != nil {
|
||||||
@ -91,7 +90,7 @@ func (in *PostgresCIDWriter) upsertTransactionCID(tx *sqlx.Tx, transaction model
|
|||||||
RETURNING id`,
|
RETURNING id`,
|
||||||
headerID, transaction.TxHash, transaction.CID, transaction.Dst, transaction.Src, transaction.Index, transaction.MhKey, transaction.Data).Scan(&txID)
|
headerID, transaction.TxHash, transaction.CID, transaction.Dst, transaction.Src, transaction.Index, transaction.MhKey, transaction.Data).Scan(&txID)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
prom.TransactionInc()
|
indexerMetrics.transactions.Inc(1)
|
||||||
}
|
}
|
||||||
return txID, err
|
return txID, err
|
||||||
}
|
}
|
||||||
@ -101,7 +100,7 @@ func (in *PostgresCIDWriter) upsertReceiptCID(tx *sqlx.Tx, rct models.ReceiptMod
|
|||||||
ON CONFLICT (tx_id) DO UPDATE SET (cid, contract, contract_hash, topic0s, topic1s, topic2s, topic3s, log_contracts, mh_key) = ($2, $3, $4, $5, $6, $7, $8, $9, $10)`,
|
ON CONFLICT (tx_id) DO UPDATE SET (cid, contract, contract_hash, topic0s, topic1s, topic2s, topic3s, log_contracts, mh_key) = ($2, $3, $4, $5, $6, $7, $8, $9, $10)`,
|
||||||
txID, rct.CID, rct.Contract, rct.ContractHash, rct.Topic0s, rct.Topic1s, rct.Topic2s, rct.Topic3s, rct.LogContracts, rct.MhKey)
|
txID, rct.CID, rct.Contract, rct.ContractHash, rct.Topic0s, rct.Topic1s, rct.Topic2s, rct.Topic3s, rct.LogContracts, rct.MhKey)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
prom.ReceiptInc()
|
indexerMetrics.receipts.Inc(1)
|
||||||
}
|
}
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -22,6 +22,7 @@ import (
|
|||||||
"strconv"
|
"strconv"
|
||||||
"sync"
|
"sync"
|
||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/ethereum/go-ethereum/common"
|
"github.com/ethereum/go-ethereum/common"
|
||||||
"github.com/ethereum/go-ethereum/core"
|
"github.com/ethereum/go-ethereum/core"
|
||||||
@ -40,7 +41,6 @@ import (
|
|||||||
ind "github.com/ethereum/go-ethereum/statediff/indexer"
|
ind "github.com/ethereum/go-ethereum/statediff/indexer"
|
||||||
nodeinfo "github.com/ethereum/go-ethereum/statediff/indexer/node"
|
nodeinfo "github.com/ethereum/go-ethereum/statediff/indexer/node"
|
||||||
"github.com/ethereum/go-ethereum/statediff/indexer/postgres"
|
"github.com/ethereum/go-ethereum/statediff/indexer/postgres"
|
||||||
"github.com/ethereum/go-ethereum/statediff/indexer/prom"
|
|
||||||
. "github.com/ethereum/go-ethereum/statediff/types"
|
. "github.com/ethereum/go-ethereum/statediff/types"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -74,7 +74,7 @@ type IService interface {
|
|||||||
// Main event loop for processing state diffs
|
// Main event loop for processing state diffs
|
||||||
Loop(chainEventCh chan core.ChainEvent)
|
Loop(chainEventCh chan core.ChainEvent)
|
||||||
// Method to subscribe to receive state diff processing output
|
// Method to subscribe to receive state diff processing output
|
||||||
Subscribe(id rpc.ID, sub chan<- Payload, quitChanogr chan<- bool, params Params)
|
Subscribe(id rpc.ID, sub chan<- Payload, quitChan chan<- bool, params Params)
|
||||||
// Method to unsubscribe from state diff processing
|
// Method to unsubscribe from state diff processing
|
||||||
Unsubscribe(id rpc.ID) error
|
Unsubscribe(id rpc.ID) error
|
||||||
// Method to get state diff object at specific block
|
// Method to get state diff object at specific block
|
||||||
@ -139,7 +139,6 @@ func New(stack *node.Node, ethServ *eth.Ethereum, dbParams *DBParams, enableWrit
|
|||||||
}
|
}
|
||||||
indexer = ind.NewStateDiffIndexer(blockChain.Config(), db)
|
indexer = ind.NewStateDiffIndexer(blockChain.Config(), db)
|
||||||
}
|
}
|
||||||
prom.Init()
|
|
||||||
sds := &Service{
|
sds := &Service{
|
||||||
Mutex: sync.Mutex{},
|
Mutex: sync.Mutex{},
|
||||||
BlockChain: blockChain,
|
BlockChain: blockChain,
|
||||||
@ -416,6 +415,7 @@ func (sds *Service) Start() error {
|
|||||||
if sds.enableWriteLoop {
|
if sds.enableWriteLoop {
|
||||||
log.Info("Starting statediff DB write loop", "params", writeLoopParams)
|
log.Info("Starting statediff DB write loop", "params", writeLoopParams)
|
||||||
go sds.WriteLoop(make(chan core.ChainEvent, chainEventChanSize))
|
go sds.WriteLoop(make(chan core.ChainEvent, chainEventChanSize))
|
||||||
|
go sds.indexer.ReportDBMetrics(5*time.Second, sds.QuitChan)
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
@ -521,7 +521,7 @@ func (sds *Service) WriteStateDiffAt(blockNumber uint64, params Params) error {
|
|||||||
|
|
||||||
// Writes a state diff from the current block, parent state root, and provided params
|
// Writes a state diff from the current block, parent state root, and provided params
|
||||||
func (sds *Service) writeStateDiff(block *types.Block, parentRoot common.Hash, params Params) error {
|
func (sds *Service) writeStateDiff(block *types.Block, parentRoot common.Hash, params Params) error {
|
||||||
log.Info("writing state diff", "block height", block.Number().Uint64())
|
log.Info("Writing state diff", "block height", block.Number().Uint64())
|
||||||
var totalDifficulty *big.Int
|
var totalDifficulty *big.Int
|
||||||
var receipts types.Receipts
|
var receipts types.Receipts
|
||||||
if params.IncludeTD {
|
if params.IncludeTD {
|
||||||
|
Loading…
Reference in New Issue
Block a user