Statediff Prometheus metrics refactor #39
@ -14,6 +14,8 @@
|
||||
// You should have received a copy of the GNU Affero General Public License
|
||||
// along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
// This package provides an interface for pushing and indexing IPLD objects into a Postgres database
|
||||
// Metrics for reporting processing and connection stats are defined in ./metrics.go
|
||||
package indexer
|
||||
|
||||
import (
|
||||
@ -26,6 +28,7 @@ import (
|
||||
"github.com/ethereum/go-ethereum/core/types"
|
||||
"github.com/ethereum/go-ethereum/crypto"
|
||||
"github.com/ethereum/go-ethereum/log"
|
||||
"github.com/ethereum/go-ethereum/metrics"
|
||||
"github.com/ethereum/go-ethereum/params"
|
||||
"github.com/ethereum/go-ethereum/rlp"
|
||||
|
||||
@ -36,16 +39,21 @@ import (
|
||||
"github.com/ethereum/go-ethereum/statediff/indexer/ipfs/ipld"
|
||||
"github.com/ethereum/go-ethereum/statediff/indexer/models"
|
||||
"github.com/ethereum/go-ethereum/statediff/indexer/postgres"
|
||||
"github.com/ethereum/go-ethereum/statediff/indexer/prom"
|
||||
"github.com/ethereum/go-ethereum/statediff/indexer/shared"
|
||||
sdtypes "github.com/ethereum/go-ethereum/statediff/types"
|
||||
)
|
||||
|
||||
var (
|
||||
indexerMetrics = RegisterIndexerMetrics(metrics.DefaultRegistry)
|
||||
dbMetrics = RegisterDBMetrics(metrics.DefaultRegistry)
|
||||
)
|
||||
|
||||
// Indexer interface to allow substitution of mocks for testing
|
||||
type Indexer interface {
|
||||
PushBlock(block *types.Block, receipts types.Receipts, totalDifficulty *big.Int) (*BlockTx, error)
|
||||
PushStateNode(tx *BlockTx, stateNode sdtypes.StateNode) error
|
||||
PushCodeAndCodeHash(tx *BlockTx, codeAndCodeHash sdtypes.CodeAndCodeHash) error
|
||||
ReportDBMetrics(delay time.Duration, quit <-chan bool)
|
||||
}
|
||||
|
||||
// StateDiffIndexer satisfies the Indexer interface for ethereum statediff objects
|
||||
@ -70,6 +78,25 @@ type BlockTx struct {
|
||||
Close func() error
|
||||
}
|
||||
|
||||
// Reporting function to run as goroutine
|
||||
func (sdi *StateDiffIndexer) ReportDBMetrics(delay time.Duration, quit <-chan bool) {
|
||||
if !metrics.Enabled {
|
||||
return
|
||||
}
|
||||
ticker := time.NewTicker(delay)
|
||||
go func() {
|
||||
for {
|
||||
select {
|
||||
case <-ticker.C:
|
||||
dbMetrics.Update(sdi.dbWriter.db.Stats())
|
||||
case <-quit:
|
||||
ticker.Stop()
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
// Pushes and indexes block data in database, except state & storage nodes (includes header, uncles, transactions & receipts)
|
||||
// Returns an initiated DB transaction which must be Closed via defer to commit or rollback
|
||||
func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receipts, totalDifficulty *big.Int) (*BlockTx, error) {
|
||||
@ -109,12 +136,12 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
|
||||
panic(p)
|
||||
} else {
|
||||
tDiff := time.Now().Sub(t)
|
||||
prom.SetTimeMetric("t_state_store_code_processing", tDiff)
|
||||
indexerMetrics.tStateStoreCodeProcessing.Update(tDiff)
|
||||
traceMsg += fmt.Sprintf("state, storage, and code storage processing time: %s\r\n", tDiff.String())
|
||||
t = time.Now()
|
||||
err = tx.Commit()
|
||||
tDiff = time.Now().Sub(t)
|
||||
prom.SetTimeMetric("t_postgres_commit", tDiff)
|
||||
indexerMetrics.tPostgresCommit.Update(tDiff)
|
||||
traceMsg += fmt.Sprintf("postgres transaction commit duration: %s\r\n", tDiff.String())
|
||||
}
|
||||
traceMsg += fmt.Sprintf(" TOTAL PROCESSING DURATION: %s\r\n", time.Now().Sub(start).String())
|
||||
@ -123,7 +150,8 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
|
||||
},
|
||||
}
|
||||
tDiff := time.Now().Sub(t)
|
||||
prom.SetTimeMetric("t_free_postgres", tDiff)
|
||||
indexerMetrics.tFreePostgres.Update(tDiff)
|
||||
|
||||
traceMsg += fmt.Sprintf("time spent waiting for free postgres tx: %s:\r\n", tDiff.String())
|
||||
t = time.Now()
|
||||
|
||||
@ -133,7 +161,7 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
|
||||
return nil, err
|
||||
}
|
||||
tDiff = time.Now().Sub(t)
|
||||
prom.SetTimeMetric("t_header_processing", tDiff)
|
||||
indexerMetrics.tHeaderProcessing.Update(tDiff)
|
||||
traceMsg += fmt.Sprintf("header processing time: %s\r\n", tDiff.String())
|
||||
t = time.Now()
|
||||
// Publish and index uncles
|
||||
@ -141,7 +169,7 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
|
||||
return nil, err
|
||||
}
|
||||
tDiff = time.Now().Sub(t)
|
||||
prom.SetTimeMetric("t_uncle_processing", tDiff)
|
||||
indexerMetrics.tUncleProcessing.Update(tDiff)
|
||||
traceMsg += fmt.Sprintf("uncle processing time: %s\r\n", tDiff.String())
|
||||
t = time.Now()
|
||||
// Publish and index receipts and txs
|
||||
@ -158,7 +186,7 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
|
||||
return nil, err
|
||||
}
|
||||
tDiff = time.Now().Sub(t)
|
||||
prom.SetTimeMetric("t_tx_receipt_processing", tDiff)
|
||||
indexerMetrics.tTxAndRecProcessing.Update(tDiff)
|
||||
traceMsg += fmt.Sprintf("tx and receipt processing time: %s\r\n", tDiff.String())
|
||||
t = time.Now()
|
||||
|
||||
|
124
statediff/indexer/metrics.go
Normal file
124
statediff/indexer/metrics.go
Normal file
@ -0,0 +1,124 @@
|
||||
package indexer
|
||||
|
||||
import (
|
||||
"database/sql"
|
||||
"strings"
|
||||
|
||||
"github.com/ethereum/go-ethereum/metrics"
|
||||
)
|
||||
|
||||
const (
|
||||
indexerNamespace = "indexer"
|
||||
)
|
||||
|
||||
// Build a fully qualified metric name
|
||||
func metricName(subsystem, name string) string {
|
||||
if name == "" {
|
||||
return ""
|
||||
}
|
||||
parts := []string{indexerNamespace, name}
|
||||
if subsystem != "" {
|
||||
parts = []string{indexerNamespace, subsystem, name}
|
||||
}
|
||||
// Prometheus uses _ but geth metrics uses / and replaces
|
||||
return strings.Join(parts, "/")
|
||||
}
|
||||
|
||||
type indexerMetricsHandles struct {
|
||||
// The total number of processed blocks
|
||||
blocks metrics.Counter
|
||||
// The total number of processed transactions
|
||||
transactions metrics.Counter
|
||||
// The total number of processed receipts
|
||||
receipts metrics.Counter
|
||||
// Time spent waiting for free postgres tx
|
||||
tFreePostgres metrics.Timer
|
||||
// Postgres transaction commit duration
|
||||
tPostgresCommit metrics.Timer
|
||||
// Header processing time
|
||||
tHeaderProcessing metrics.Timer
|
||||
// Uncle processing time
|
||||
tUncleProcessing metrics.Timer
|
||||
// Tx and receipt processing time
|
||||
tTxAndRecProcessing metrics.Timer
|
||||
// State, storage, and code combined processing time
|
||||
tStateStoreCodeProcessing metrics.Timer
|
||||
}
|
||||
|
||||
func RegisterIndexerMetrics(reg metrics.Registry) indexerMetricsHandles {
|
||||
ctx := indexerMetricsHandles{
|
||||
blocks: metrics.NewCounter(),
|
||||
transactions: metrics.NewCounter(),
|
||||
receipts: metrics.NewCounter(),
|
||||
tFreePostgres: metrics.NewTimer(),
|
||||
tPostgresCommit: metrics.NewTimer(),
|
||||
tHeaderProcessing: metrics.NewTimer(),
|
||||
tUncleProcessing: metrics.NewTimer(),
|
||||
tTxAndRecProcessing: metrics.NewTimer(),
|
||||
tStateStoreCodeProcessing: metrics.NewTimer(),
|
||||
}
|
||||
subsys := "" // todo
|
||||
reg.Register(metricName(subsys, "blocks"), ctx.blocks)
|
||||
reg.Register(metricName(subsys, "transactions"), ctx.transactions)
|
||||
reg.Register(metricName(subsys, "receipts"), ctx.receipts)
|
||||
reg.Register(metricName(subsys, "t_free_postgres"), ctx.tFreePostgres)
|
||||
reg.Register(metricName(subsys, "t_postgres_commit"), ctx.tPostgresCommit)
|
||||
reg.Register(metricName(subsys, "t_header_processing"), ctx.tHeaderProcessing)
|
||||
reg.Register(metricName(subsys, "t_uncle_processing"), ctx.tUncleProcessing)
|
||||
reg.Register(metricName(subsys, "t_tx_receipt_processing"), ctx.tTxAndRecProcessing)
|
||||
reg.Register(metricName(subsys, "t_state_store_code_processing"), ctx.tStateStoreCodeProcessing)
|
||||
return ctx
|
||||
}
|
||||
|
||||
type dbMetricsHandles struct {
|
||||
// Maximum number of open connections to the database
|
||||
maxOpen metrics.Gauge
|
||||
// The number of established connections both in use and idle
|
||||
open metrics.Gauge
|
||||
// The number of connections currently in use
|
||||
inUse metrics.Gauge
|
||||
// The number of idle connections
|
||||
idle metrics.Gauge
|
||||
// The total number of connections waited for
|
||||
waitedFor metrics.Counter
|
||||
// The total time blocked waiting for a new connection
|
||||
blockedMilliseconds metrics.Counter
|
||||
// The total number of connections closed due to SetMaxIdleConns
|
||||
closedMaxIdle metrics.Counter
|
||||
// The total number of connections closed due to SetConnMaxLifetime
|
||||
closedMaxLifetime metrics.Counter
|
||||
}
|
||||
|
||||
func RegisterDBMetrics(reg metrics.Registry) dbMetricsHandles {
|
||||
ctx := dbMetricsHandles{
|
||||
maxOpen: metrics.NewGauge(),
|
||||
open: metrics.NewGauge(),
|
||||
inUse: metrics.NewGauge(),
|
||||
idle: metrics.NewGauge(),
|
||||
waitedFor: metrics.NewCounter(),
|
||||
blockedMilliseconds: metrics.NewCounter(),
|
||||
closedMaxIdle: metrics.NewCounter(),
|
||||
closedMaxLifetime: metrics.NewCounter(),
|
||||
}
|
||||
subsys := "connections"
|
||||
reg.Register(metricName(subsys, "max_open"), ctx.maxOpen)
|
||||
reg.Register(metricName(subsys, "open"), ctx.open)
|
||||
reg.Register(metricName(subsys, "in_use"), ctx.inUse)
|
||||
reg.Register(metricName(subsys, "idle"), ctx.idle)
|
||||
reg.Register(metricName(subsys, "waited_for"), ctx.waitedFor)
|
||||
reg.Register(metricName(subsys, "blocked_milliseconds"), ctx.blockedMilliseconds)
|
||||
reg.Register(metricName(subsys, "closed_max_idle"), ctx.closedMaxIdle)
|
||||
reg.Register(metricName(subsys, "closed_max_lifetime"), ctx.closedMaxLifetime)
|
||||
return ctx
|
||||
}
|
||||
|
||||
func (met *dbMetricsHandles) Update(stats sql.DBStats) {
|
||||
met.maxOpen.Update(int64(stats.MaxOpenConnections))
|
||||
met.open.Update(int64(stats.OpenConnections))
|
||||
met.inUse.Update(int64(stats.InUse))
|
||||
met.idle.Update(int64(stats.Idle))
|
||||
met.waitedFor.Inc(int64(stats.WaitCount))
|
||||
met.blockedMilliseconds.Inc(int64(stats.WaitDuration.Milliseconds()))
|
||||
met.closedMaxIdle.Inc(int64(stats.MaxIdleClosed))
|
||||
met.closedMaxLifetime.Inc(int64(stats.MaxLifetimeClosed))
|
||||
}
|
@ -1,146 +0,0 @@
|
||||
package prom
|
||||
|
||||
import (
|
||||
"database/sql"
|
||||
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
)
|
||||
|
||||
const (
|
||||
namespace = "ipld_eth_indexer"
|
||||
subsystem = "connections"
|
||||
)
|
||||
|
||||
// DBStatsGetter is an interface that gets sql.DBStats.
|
||||
type DBStatsGetter interface {
|
||||
Stats() sql.DBStats
|
||||
}
|
||||
|
||||
// DBStatsCollector implements the prometheus.Collector interface.
|
||||
type DBStatsCollector struct {
|
||||
sg DBStatsGetter
|
||||
|
||||
// descriptions of exported metrics
|
||||
maxOpenDesc *prometheus.Desc
|
||||
openDesc *prometheus.Desc
|
||||
inUseDesc *prometheus.Desc
|
||||
idleDesc *prometheus.Desc
|
||||
waitedForDesc *prometheus.Desc
|
||||
blockedSecondsDesc *prometheus.Desc
|
||||
closedMaxIdleDesc *prometheus.Desc
|
||||
closedMaxLifetimeDesc *prometheus.Desc
|
||||
}
|
||||
|
||||
// NewDBStatsCollector creates a new DBStatsCollector.
|
||||
func NewDBStatsCollector(dbName string, sg DBStatsGetter) *DBStatsCollector {
|
||||
labels := prometheus.Labels{"db_name": dbName}
|
||||
return &DBStatsCollector{
|
||||
sg: sg,
|
||||
maxOpenDesc: prometheus.NewDesc(
|
||||
prometheus.BuildFQName(namespace, subsystem, "max_open"),
|
||||
"Maximum number of open connections to the database.",
|
||||
nil,
|
||||
labels,
|
||||
),
|
||||
openDesc: prometheus.NewDesc(
|
||||
prometheus.BuildFQName(namespace, subsystem, "open"),
|
||||
"The number of established connections both in use and idle.",
|
||||
nil,
|
||||
labels,
|
||||
),
|
||||
inUseDesc: prometheus.NewDesc(
|
||||
prometheus.BuildFQName(namespace, subsystem, "in_use"),
|
||||
"The number of connections currently in use.",
|
||||
nil,
|
||||
labels,
|
||||
),
|
||||
idleDesc: prometheus.NewDesc(
|
||||
prometheus.BuildFQName(namespace, subsystem, "idle"),
|
||||
"The number of idle connections.",
|
||||
nil,
|
||||
labels,
|
||||
),
|
||||
waitedForDesc: prometheus.NewDesc(
|
||||
prometheus.BuildFQName(namespace, subsystem, "waited_for"),
|
||||
"The total number of connections waited for.",
|
||||
nil,
|
||||
labels,
|
||||
),
|
||||
blockedSecondsDesc: prometheus.NewDesc(
|
||||
prometheus.BuildFQName(namespace, subsystem, "blocked_seconds"),
|
||||
"The total time blocked waiting for a new connection.",
|
||||
nil,
|
||||
labels,
|
||||
),
|
||||
closedMaxIdleDesc: prometheus.NewDesc(
|
||||
prometheus.BuildFQName(namespace, subsystem, "closed_max_idle"),
|
||||
"The total number of connections closed due to SetMaxIdleConns.",
|
||||
nil,
|
||||
labels,
|
||||
),
|
||||
closedMaxLifetimeDesc: prometheus.NewDesc(
|
||||
prometheus.BuildFQName(namespace, subsystem, "closed_max_lifetime"),
|
||||
"The total number of connections closed due to SetConnMaxLifetime.",
|
||||
nil,
|
||||
labels,
|
||||
),
|
||||
}
|
||||
}
|
||||
|
||||
// Describe implements the prometheus.Collector interface.
|
||||
func (c DBStatsCollector) Describe(ch chan<- *prometheus.Desc) {
|
||||
ch <- c.maxOpenDesc
|
||||
ch <- c.openDesc
|
||||
ch <- c.inUseDesc
|
||||
ch <- c.idleDesc
|
||||
ch <- c.waitedForDesc
|
||||
ch <- c.blockedSecondsDesc
|
||||
ch <- c.closedMaxIdleDesc
|
||||
ch <- c.closedMaxLifetimeDesc
|
||||
}
|
||||
|
||||
// Collect implements the prometheus.Collector interface.
|
||||
func (c DBStatsCollector) Collect(ch chan<- prometheus.Metric) {
|
||||
stats := c.sg.Stats()
|
||||
|
||||
ch <- prometheus.MustNewConstMetric(
|
||||
c.maxOpenDesc,
|
||||
prometheus.GaugeValue,
|
||||
float64(stats.MaxOpenConnections),
|
||||
)
|
||||
ch <- prometheus.MustNewConstMetric(
|
||||
c.openDesc,
|
||||
prometheus.GaugeValue,
|
||||
float64(stats.OpenConnections),
|
||||
)
|
||||
ch <- prometheus.MustNewConstMetric(
|
||||
c.inUseDesc,
|
||||
prometheus.GaugeValue,
|
||||
float64(stats.InUse),
|
||||
)
|
||||
ch <- prometheus.MustNewConstMetric(
|
||||
c.idleDesc,
|
||||
prometheus.GaugeValue,
|
||||
float64(stats.Idle),
|
||||
)
|
||||
ch <- prometheus.MustNewConstMetric(
|
||||
c.waitedForDesc,
|
||||
prometheus.CounterValue,
|
||||
float64(stats.WaitCount),
|
||||
)
|
||||
ch <- prometheus.MustNewConstMetric(
|
||||
c.blockedSecondsDesc,
|
||||
prometheus.CounterValue,
|
||||
stats.WaitDuration.Seconds(),
|
||||
)
|
||||
ch <- prometheus.MustNewConstMetric(
|
||||
c.closedMaxIdleDesc,
|
||||
prometheus.CounterValue,
|
||||
float64(stats.MaxIdleClosed),
|
||||
)
|
||||
ch <- prometheus.MustNewConstMetric(
|
||||
c.closedMaxLifetimeDesc,
|
||||
prometheus.CounterValue,
|
||||
float64(stats.MaxLifetimeClosed),
|
||||
)
|
||||
}
|
@ -1,151 +0,0 @@
|
||||
package prom
|
||||
|
||||
import (
|
||||
"time"
|
||||
|
||||
"github.com/jmoiron/sqlx"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
"github.com/prometheus/client_golang/prometheus/promauto"
|
||||
)
|
||||
|
||||
const statsSubsystem = "stats"
|
||||
|
||||
var (
|
||||
metrics bool
|
||||
|
||||
receipts prometheus.Counter
|
||||
transactions prometheus.Counter
|
||||
blocks prometheus.Counter
|
||||
|
||||
lenPayloadChan prometheus.Gauge
|
||||
|
||||
tPayloadDecode prometheus.Histogram
|
||||
tFreePostgres prometheus.Histogram
|
||||
tPostgresCommit prometheus.Histogram
|
||||
tHeaderProcessing prometheus.Histogram
|
||||
tUncleProcessing prometheus.Histogram
|
||||
tTxAndRecProcessing prometheus.Histogram
|
||||
tStateAndStoreProcessing prometheus.Histogram
|
||||
tCodeAndCodeHashProcessing prometheus.Histogram
|
||||
)
|
||||
|
||||
// Init module initialization
|
||||
func Init() {
|
||||
metrics = true
|
||||
|
||||
blocks = promauto.NewCounter(prometheus.CounterOpts{
|
||||
Namespace: namespace,
|
||||
Name: "blocks",
|
||||
Help: "The total number of processed blocks",
|
||||
})
|
||||
transactions = promauto.NewCounter(prometheus.CounterOpts{
|
||||
Namespace: namespace,
|
||||
Name: "transactions",
|
||||
Help: "The total number of processed transactions",
|
||||
})
|
||||
receipts = promauto.NewCounter(prometheus.CounterOpts{
|
||||
Namespace: namespace,
|
||||
Name: "receipts",
|
||||
Help: "The total number of processed receipts",
|
||||
})
|
||||
|
||||
lenPayloadChan = promauto.NewGauge(prometheus.GaugeOpts{
|
||||
Namespace: namespace,
|
||||
Name: "len_payload_chan",
|
||||
Help: "Current length of publishPayload",
|
||||
})
|
||||
|
||||
tFreePostgres = promauto.NewHistogram(prometheus.HistogramOpts{
|
||||
Namespace: namespace,
|
||||
Subsystem: statsSubsystem,
|
||||
Name: "t_free_postgres",
|
||||
Help: "Time spent waiting for free postgres tx",
|
||||
})
|
||||
tPostgresCommit = promauto.NewHistogram(prometheus.HistogramOpts{
|
||||
Namespace: namespace,
|
||||
Subsystem: statsSubsystem,
|
||||
Name: "t_postgres_commit",
|
||||
Help: "Postgres transaction commit duration",
|
||||
})
|
||||
tHeaderProcessing = promauto.NewHistogram(prometheus.HistogramOpts{
|
||||
Namespace: namespace,
|
||||
Subsystem: statsSubsystem,
|
||||
Name: "t_header_processing",
|
||||
Help: "Header processing time",
|
||||
})
|
||||
tUncleProcessing = promauto.NewHistogram(prometheus.HistogramOpts{
|
||||
Namespace: namespace,
|
||||
Subsystem: statsSubsystem,
|
||||
Name: "t_uncle_processing",
|
||||
Help: "Uncle processing time",
|
||||
})
|
||||
tTxAndRecProcessing = promauto.NewHistogram(prometheus.HistogramOpts{
|
||||
Namespace: namespace,
|
||||
Subsystem: statsSubsystem,
|
||||
Name: "t_tx_receipt_processing",
|
||||
Help: "Tx and receipt processing time",
|
||||
})
|
||||
tStateAndStoreProcessing = promauto.NewHistogram(prometheus.HistogramOpts{
|
||||
Namespace: namespace,
|
||||
Subsystem: statsSubsystem,
|
||||
Name: "t_state_store_code_processing",
|
||||
Help: "State, storage, and code combinedprocessing time",
|
||||
})
|
||||
}
|
||||
|
||||
// RegisterDBCollector create metric colletor for given connection
|
||||
func RegisterDBCollector(name string, db *sqlx.DB) {
|
||||
if metrics {
|
||||
prometheus.Register(NewDBStatsCollector(name, db))
|
||||
}
|
||||
}
|
||||
|
||||
// BlockInc block counter increment
|
||||
func BlockInc() {
|
||||
if metrics {
|
||||
blocks.Inc()
|
||||
}
|
||||
}
|
||||
|
||||
// TransactionInc transaction counter increment
|
||||
func TransactionInc() {
|
||||
if metrics {
|
||||
transactions.Inc()
|
||||
}
|
||||
}
|
||||
|
||||
// ReceiptInc receipt counter increment
|
||||
func ReceiptInc() {
|
||||
if metrics {
|
||||
receipts.Inc()
|
||||
}
|
||||
}
|
||||
|
||||
// SetLenPayloadChan set chan length
|
||||
func SetLenPayloadChan(ln int) {
|
||||
if metrics {
|
||||
lenPayloadChan.Set(float64(ln))
|
||||
}
|
||||
}
|
||||
|
||||
// SetTimeMetric time metric observation
|
||||
func SetTimeMetric(name string, t time.Duration) {
|
||||
if !metrics {
|
||||
return
|
||||
}
|
||||
tAsF64 := t.Seconds()
|
||||
switch name {
|
||||
case "t_free_postgres":
|
||||
tFreePostgres.Observe(tAsF64)
|
||||
case "t_postgres_commit":
|
||||
tPostgresCommit.Observe(tAsF64)
|
||||
case "t_header_processing":
|
||||
tHeaderProcessing.Observe(tAsF64)
|
||||
case "t_uncle_processing":
|
||||
tUncleProcessing.Observe(tAsF64)
|
||||
case "t_tx_receipt_processing":
|
||||
tTxAndRecProcessing.Observe(tAsF64)
|
||||
case "t_state_store_code_processing":
|
||||
tStateAndStoreProcessing.Observe(tAsF64)
|
||||
}
|
||||
}
|
@ -17,12 +17,11 @@
|
||||
package indexer
|
||||
|
||||
import (
|
||||
"github.com/ethereum/go-ethereum/common"
|
||||
"github.com/jmoiron/sqlx"
|
||||
|
||||
"github.com/ethereum/go-ethereum/common"
|
||||
"github.com/ethereum/go-ethereum/statediff/indexer/models"
|
||||
"github.com/ethereum/go-ethereum/statediff/indexer/postgres"
|
||||
"github.com/ethereum/go-ethereum/statediff/indexer/prom"
|
||||
"github.com/ethereum/go-ethereum/statediff/indexer/shared"
|
||||
)
|
||||
|
||||
@ -30,7 +29,7 @@ var (
|
||||
nullHash = common.HexToHash("0x0000000000000000000000000000000000000000000000000000000000000000")
|
||||
)
|
||||
|
||||
// Indexer satisfies the Indexer interface for ethereum
|
||||
// Handles processing and writing of indexed IPLD objects to Postgres
|
||||
type PostgresCIDWriter struct {
|
||||
db *postgres.DB
|
||||
}
|
||||
@ -51,7 +50,7 @@ func (in *PostgresCIDWriter) upsertHeaderCID(tx *sqlx.Tx, header models.HeaderMo
|
||||
header.BlockNumber, header.BlockHash, header.ParentHash, header.CID, header.TotalDifficulty, in.db.NodeID, header.Reward, header.StateRoot, header.TxRoot,
|
||||
header.RctRoot, header.UncleRoot, header.Bloom, header.Timestamp, header.MhKey, 1).Scan(&headerID)
|
||||
if err == nil {
|
||||
prom.BlockInc()
|
||||
indexerMetrics.blocks.Inc(1)
|
||||
}
|
||||
return headerID, err
|
||||
}
|
||||
@ -73,7 +72,7 @@ func (in *PostgresCIDWriter) upsertTransactionAndReceiptCIDs(tx *sqlx.Tx, payloa
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
prom.TransactionInc()
|
||||
indexerMetrics.transactions.Inc(1)
|
||||
receiptCidMeta, ok := payload.ReceiptCIDs[common.HexToHash(trxCidMeta.TxHash)]
|
||||
if ok {
|
||||
if err := in.upsertReceiptCID(tx, receiptCidMeta, txID); err != nil {
|
||||
@ -91,7 +90,7 @@ func (in *PostgresCIDWriter) upsertTransactionCID(tx *sqlx.Tx, transaction model
|
||||
RETURNING id`,
|
||||
headerID, transaction.TxHash, transaction.CID, transaction.Dst, transaction.Src, transaction.Index, transaction.MhKey, transaction.Data).Scan(&txID)
|
||||
if err == nil {
|
||||
prom.TransactionInc()
|
||||
indexerMetrics.transactions.Inc(1)
|
||||
}
|
||||
return txID, err
|
||||
}
|
||||
@ -101,7 +100,7 @@ func (in *PostgresCIDWriter) upsertReceiptCID(tx *sqlx.Tx, rct models.ReceiptMod
|
||||
ON CONFLICT (tx_id) DO UPDATE SET (cid, contract, contract_hash, topic0s, topic1s, topic2s, topic3s, log_contracts, mh_key) = ($2, $3, $4, $5, $6, $7, $8, $9, $10)`,
|
||||
txID, rct.CID, rct.Contract, rct.ContractHash, rct.Topic0s, rct.Topic1s, rct.Topic2s, rct.Topic3s, rct.LogContracts, rct.MhKey)
|
||||
if err == nil {
|
||||
prom.ReceiptInc()
|
||||
indexerMetrics.receipts.Inc(1)
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
@ -22,6 +22,7 @@ import (
|
||||
"strconv"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/ethereum/go-ethereum/common"
|
||||
"github.com/ethereum/go-ethereum/core"
|
||||
@ -40,7 +41,6 @@ import (
|
||||
ind "github.com/ethereum/go-ethereum/statediff/indexer"
|
||||
nodeinfo "github.com/ethereum/go-ethereum/statediff/indexer/node"
|
||||
"github.com/ethereum/go-ethereum/statediff/indexer/postgres"
|
||||
"github.com/ethereum/go-ethereum/statediff/indexer/prom"
|
||||
. "github.com/ethereum/go-ethereum/statediff/types"
|
||||
)
|
||||
|
||||
@ -74,7 +74,7 @@ type IService interface {
|
||||
// Main event loop for processing state diffs
|
||||
Loop(chainEventCh chan core.ChainEvent)
|
||||
// Method to subscribe to receive state diff processing output
|
||||
Subscribe(id rpc.ID, sub chan<- Payload, quitChanogr chan<- bool, params Params)
|
||||
Subscribe(id rpc.ID, sub chan<- Payload, quitChan chan<- bool, params Params)
|
||||
// Method to unsubscribe from state diff processing
|
||||
Unsubscribe(id rpc.ID) error
|
||||
// Method to get state diff object at specific block
|
||||
@ -139,7 +139,6 @@ func New(stack *node.Node, ethServ *eth.Ethereum, dbParams *DBParams, enableWrit
|
||||
}
|
||||
indexer = ind.NewStateDiffIndexer(blockChain.Config(), db)
|
||||
}
|
||||
prom.Init()
|
||||
sds := &Service{
|
||||
Mutex: sync.Mutex{},
|
||||
BlockChain: blockChain,
|
||||
@ -416,6 +415,7 @@ func (sds *Service) Start() error {
|
||||
if sds.enableWriteLoop {
|
||||
log.Info("Starting statediff DB write loop", "params", writeLoopParams)
|
||||
go sds.WriteLoop(make(chan core.ChainEvent, chainEventChanSize))
|
||||
go sds.indexer.ReportDBMetrics(5*time.Second, sds.QuitChan)
|
||||
}
|
||||
|
||||
return nil
|
||||
@ -521,7 +521,7 @@ func (sds *Service) WriteStateDiffAt(blockNumber uint64, params Params) error {
|
||||
|
||||
// Writes a state diff from the current block, parent state root, and provided params
|
||||
func (sds *Service) writeStateDiff(block *types.Block, parentRoot common.Hash, params Params) error {
|
||||
log.Info("writing state diff", "block height", block.Number().Uint64())
|
||||
log.Info("Writing state diff", "block height", block.Number().Uint64())
|
||||
var totalDifficulty *big.Int
|
||||
var receipts types.Receipts
|
||||
if params.IncludeTD {
|
||||
|
Loading…
Reference in New Issue
Block a user