diff --git a/cmd/geth/config.go b/cmd/geth/config.go
index ae5332e30..99eed3d4c 100644
--- a/cmd/geth/config.go
+++ b/cmd/geth/config.go
@@ -268,6 +268,9 @@ func makeFullNode(ctx *cli.Context) (*node.Node, ethapi.Backend) {
if ctx.IsSet(utils.StateDiffLogStatements.Name) {
pgConfig.LogStatements = ctx.Bool(utils.StateDiffLogStatements.Name)
}
+ if ctx.IsSet(utils.StateDiffCopyFrom.Name) {
+ pgConfig.CopyFrom = ctx.Bool(utils.StateDiffCopyFrom.Name)
+ }
indexerConfig = pgConfig
case shared.DUMP:
dumpTypeStr := ctx.String(utils.StateDiffDBDumpDst.Name)
diff --git a/cmd/geth/main.go b/cmd/geth/main.go
index 16c549fed..5b3b93d5c 100644
--- a/cmd/geth/main.go
+++ b/cmd/geth/main.go
@@ -180,6 +180,7 @@ var (
utils.StateDiffWatchedAddressesFilePath,
utils.StateDiffUpsert,
utils.StateDiffLogStatements,
+ utils.StateDiffCopyFrom,
configFileFlag,
}, utils.NetworkFlags, utils.DatabasePathFlags)
diff --git a/cmd/utils/flags.go b/cmd/utils/flags.go
index 9f2624d3c..ffab935d8 100644
--- a/cmd/utils/flags.go
+++ b/cmd/utils/flags.go
@@ -1083,6 +1083,13 @@ var (
Usage: "Should the statediff service log all database statements? (Note: pgx only)",
Value: false,
}
+
+ StateDiffCopyFrom = &cli.BoolFlag{
+ Name: "statediff.db.copyfrom",
+ Usage: "Should the statediff service use COPY FROM for multiple inserts? (Note: pgx only)",
+ Value: false,
+ }
+
StateDiffWritingFlag = &cli.BoolFlag{
Name: "statediff.writing",
Usage: "Activates progressive writing of state diffs to database as new block are synced",
diff --git a/consensus/ethash/consensus.go b/consensus/ethash/consensus.go
index 1c38b80ea..b6e7b9511 100644
--- a/consensus/ethash/consensus.go
+++ b/consensus/ethash/consensus.go
@@ -31,6 +31,7 @@ import (
"github.com/ethereum/go-ethereum/consensus/misc"
"github.com/ethereum/go-ethereum/core/state"
"github.com/ethereum/go-ethereum/core/types"
+ "github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/params"
"github.com/ethereum/go-ethereum/rlp"
"github.com/ethereum/go-ethereum/trie"
@@ -330,7 +331,13 @@ func (ethash *Ethash) verifyHeader(chain consensus.ChainHeaderReader, header, pa
// the difficulty that a new block should have when created at time
// given the parent block's time and difficulty.
func (ethash *Ethash) CalcDifficulty(chain consensus.ChainHeaderReader, time uint64, parent *types.Header) *big.Int {
- return CalcDifficulty(chain.Config(), time, parent)
+ config := chain.Config()
+ ret := CalcDifficulty(config, time, parent)
+ if config.CappedMaximumDifficulty != nil && ret.Cmp(config.CappedMaximumDifficulty) >= 0 {
+ log.Info("Using capped difficulty", "cap", config.CappedMaximumDifficulty)
+ return new(big.Int).Set(config.CappedMaximumDifficulty)
+ }
+ return ret
}
// CalcDifficulty is the difficulty adjustment algorithm. It returns
diff --git a/ethdb/leveldb/leveldb.go b/ethdb/leveldb/leveldb.go
index 15bd4e6eb..2ee1dd9d2 100644
--- a/ethdb/leveldb/leveldb.go
+++ b/ethdb/leveldb/leveldb.go
@@ -63,6 +63,13 @@ type Database struct {
fn string // filename for reporting
db *leveldb.DB // LevelDB instance
+ getTimer metrics.Timer // Timer/counter for measuring time and invocations of Get().
+ putTimer metrics.Timer // Timer/counter for measuring time and invocations of Put().
+ deleteTimer metrics.Timer // Timer/counter for measuring time and invocations of Delete().
+ hasTimer metrics.Timer // Timer/counter for measuring time and invocations of Has().
+ batchWriteTimer metrics.Timer // Timer/counter for measuring time and invocations of batch writes.
+ batchItemCounter metrics.Counter // Counter for measuring number of batched items written.
+
compTimeMeter metrics.Meter // Meter for measuring the total time spent in database compaction
compReadMeter metrics.Meter // Meter for measuring the data read during compaction
compWriteMeter metrics.Meter // Meter for measuring the data written during compaction
@@ -144,6 +151,13 @@ func NewCustom(file string, namespace string, customize func(options *opt.Option
ldb.nonlevel0CompGauge = metrics.NewRegisteredGauge(namespace+"compact/nonlevel0", nil)
ldb.seekCompGauge = metrics.NewRegisteredGauge(namespace+"compact/seek", nil)
+ ldb.getTimer = metrics.NewRegisteredTimer(namespace+"db/get/time", nil)
+ ldb.putTimer = metrics.NewRegisteredTimer(namespace+"db/put/time", nil)
+ ldb.deleteTimer = metrics.NewRegisteredTimer(namespace+"db/delete/time", nil)
+ ldb.hasTimer = metrics.NewRegisteredTimer(namespace+"db/has/time", nil)
+ ldb.batchWriteTimer = metrics.NewRegisteredTimer(namespace+"db/batch_write/time", nil)
+ ldb.batchItemCounter = metrics.NewRegisteredCounter(namespace+"db/batch_item/count", nil)
+
// Start up the metrics gathering and return
go ldb.meter(metricsGatheringInterval)
return ldb, nil
@@ -182,11 +196,17 @@ func (db *Database) Close() error {
// Has retrieves if a key is present in the key-value store.
func (db *Database) Has(key []byte) (bool, error) {
+ if db.hasTimer != nil {
+ defer func(start time.Time) { db.hasTimer.UpdateSince(start) }(time.Now())
+ }
return db.db.Has(key, nil)
}
// Get retrieves the given key if it's present in the key-value store.
func (db *Database) Get(key []byte) ([]byte, error) {
+ if db.getTimer != nil {
+ defer func(start time.Time) { db.getTimer.UpdateSince(start) }(time.Now())
+ }
dat, err := db.db.Get(key, nil)
if err != nil {
return nil, err
@@ -196,11 +216,17 @@ func (db *Database) Get(key []byte) ([]byte, error) {
// Put inserts the given value into the key-value store.
func (db *Database) Put(key []byte, value []byte) error {
+ if db.putTimer != nil {
+ defer func(start time.Time) { db.putTimer.UpdateSince(start) }(time.Now())
+ }
return db.db.Put(key, value, nil)
}
// Delete removes the key from the key-value store.
func (db *Database) Delete(key []byte) error {
+ if db.deleteTimer != nil {
+ defer func(start time.Time) { db.deleteTimer.UpdateSince(start) }(time.Now())
+ }
return db.db.Delete(key, nil)
}
@@ -208,16 +234,20 @@ func (db *Database) Delete(key []byte) error {
// database until a final write is called.
func (db *Database) NewBatch() ethdb.Batch {
return &batch{
- db: db.db,
- b: new(leveldb.Batch),
+ db: db.db,
+ b: new(leveldb.Batch),
+ writeTimer: db.batchWriteTimer,
+ itemCounter: db.batchItemCounter,
}
}
// NewBatchWithSize creates a write-only database batch with pre-allocated buffer.
func (db *Database) NewBatchWithSize(size int) ethdb.Batch {
return &batch{
- db: db.db,
- b: leveldb.MakeBatch(size),
+ db: db.db,
+ b: leveldb.MakeBatch(size),
+ writeTimer: db.batchWriteTimer,
+ itemCounter: db.batchItemCounter,
}
}
@@ -468,9 +498,11 @@ func (db *Database) meter(refresh time.Duration) {
// batch is a write-only leveldb batch that commits changes to its host database
// when Write is called. A batch cannot be used concurrently.
type batch struct {
- db *leveldb.DB
- b *leveldb.Batch
- size int
+ db *leveldb.DB
+ b *leveldb.Batch
+ size int
+ writeTimer metrics.Timer
+ itemCounter metrics.Counter
}
// Put inserts the given value into the batch for later committing.
@@ -494,6 +526,12 @@ func (b *batch) ValueSize() int {
// Write flushes any accumulated data to disk.
func (b *batch) Write() error {
+ if b.writeTimer != nil {
+ defer func(start time.Time) { b.writeTimer.UpdateSince(start) }(time.Now())
+ }
+ if b.itemCounter != nil {
+ b.itemCounter.Inc(int64(b.b.Len()))
+ }
return b.db.Write(b.b, nil)
}
diff --git a/go.mod b/go.mod
index 0be358207..97dea73a8 100644
--- a/go.mod
+++ b/go.mod
@@ -63,7 +63,7 @@ require (
github.com/rs/cors v1.7.0
github.com/shirou/gopsutil v3.21.11+incompatible
github.com/status-im/keycard-go v0.0.0-20190316090335-8537d3370df4
- github.com/stretchr/testify v1.8.0
+ github.com/stretchr/testify v1.7.0
github.com/supranational/blst v0.3.8
github.com/syndtr/goleveldb v1.0.1-0.20210819022825-2ae1ddf74ef7
github.com/thoas/go-funk v0.9.2
@@ -120,7 +120,7 @@ require (
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/russross/blackfriday/v2 v2.1.0 // indirect
github.com/spaolacci/murmur3 v1.1.0 // indirect
- github.com/stretchr/objx v0.4.0 // indirect
+ github.com/stretchr/objx v0.3.0 // indirect
github.com/tklauser/numcpus v0.2.2 // indirect
github.com/xrash/smetrics v0.0.0-20201216005158-039620a65673 // indirect
github.com/yusufpapurcu/wmi v1.2.2 // indirect
diff --git a/go.sum b/go.sum
index a1a93a6fb..85f989d51 100644
--- a/go.sum
+++ b/go.sum
@@ -537,17 +537,15 @@ github.com/status-im/keycard-go v0.0.0-20190316090335-8537d3370df4/go.mod h1:RZL
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.2.0/go.mod h1:qt09Ya8vawLte6SNmTgCsAVtYtaKzEcn8ATUoHMkEqE=
-github.com/stretchr/objx v0.4.0 h1:M2gUjqZET1qApGOWNSnZ49BAIMX4F/1plDv3+l31EJ4=
-github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
+github.com/stretchr/objx v0.3.0 h1:NGXK3lHquSN08v5vWalVI/L8XU9hdzE/G6xsrze47As=
+github.com/stretchr/objx v0.3.0/go.mod h1:qt09Ya8vawLte6SNmTgCsAVtYtaKzEcn8ATUoHMkEqE=
github.com/stretchr/testify v1.2.0/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA=
+github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
-github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
-github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk=
-github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/supranational/blst v0.3.8 h1:glwLF4oBRSJOTr05lRBgNwGQST0ndP2wg29fSeTRKCY=
github.com/supranational/blst v0.3.8/go.mod h1:jZJtfjgudtNl4en1tzwPIV3KjUnQUvG3/j+w+fVonLw=
github.com/syndtr/goleveldb v1.0.1-0.20210819022825-2ae1ddf74ef7 h1:epCh84lMvA70Z7CTTCmYQn2CKbY8j86K7/FAIr141uY=
diff --git a/metrics/prometheus/collector.go b/metrics/prometheus/collector.go
index e8d5e4f5d..76bece915 100644
--- a/metrics/prometheus/collector.go
+++ b/metrics/prometheus/collector.go
@@ -82,6 +82,9 @@ func (c *collector) addTimer(name string, m metrics.Timer) {
c.writeSummaryPercentile(name, strconv.FormatFloat(pv[i], 'f', -1, 64), ps[i])
}
c.buff.WriteRune('\n')
+
+ c.buff.WriteString(fmt.Sprintf(typeGaugeTpl, mutateKey(name+"_total")))
+ c.buff.WriteString(fmt.Sprintf(keyValueTpl, mutateKey(name+"_total"), m.Total()))
}
func (c *collector) addResettingTimer(name string, m metrics.ResettingTimer) {
diff --git a/metrics/prometheus/collector_test.go b/metrics/prometheus/collector_test.go
index 43f2f804d..194ff5541 100644
--- a/metrics/prometheus/collector_test.go
+++ b/metrics/prometheus/collector_test.go
@@ -92,6 +92,9 @@ test_timer {quantile="0.99"} 1.2e+08
test_timer {quantile="0.999"} 1.2e+08
test_timer {quantile="0.9999"} 1.2e+08
+# TYPE test_timer_total gauge
+test_timer_total 230000000
+
# TYPE test_resetting_timer_count counter
test_resetting_timer_count 6
diff --git a/metrics/timer.go b/metrics/timer.go
index a63c9dfb6..3d68c3135 100644
--- a/metrics/timer.go
+++ b/metrics/timer.go
@@ -25,6 +25,7 @@ type Timer interface {
Update(time.Duration)
UpdateSince(time.Time)
Variance() float64
+ Total() int64
}
// GetOrRegisterTimer returns an existing Timer or constructs and registers a
@@ -47,6 +48,7 @@ func NewCustomTimer(h Histogram, m Meter) Timer {
return &StandardTimer{
histogram: h,
meter: m,
+ total: NewCounter(),
}
}
@@ -72,6 +74,7 @@ func NewTimer() Timer {
return &StandardTimer{
histogram: NewHistogram(NewExpDecaySample(1028, 0.015)),
meter: NewMeter(),
+ total: NewCounter(),
}
}
@@ -134,11 +137,15 @@ func (NilTimer) UpdateSince(time.Time) {}
// Variance is a no-op.
func (NilTimer) Variance() float64 { return 0.0 }
+// Total is a no-op.
+func (NilTimer) Total() int64 { return int64(0) }
+
// StandardTimer is the standard implementation of a Timer and uses a Histogram
// and Meter.
type StandardTimer struct {
histogram Histogram
meter Meter
+ total Counter
mutex sync.Mutex
}
@@ -200,6 +207,7 @@ func (t *StandardTimer) Snapshot() Timer {
return &TimerSnapshot{
histogram: t.histogram.Snapshot().(*HistogramSnapshot),
meter: t.meter.Snapshot().(*MeterSnapshot),
+ total: t.total.Snapshot().(CounterSnapshot),
}
}
@@ -231,14 +239,12 @@ func (t *StandardTimer) Update(d time.Duration) {
defer t.mutex.Unlock()
t.histogram.Update(int64(d))
t.meter.Mark(1)
+ t.total.Inc(int64(d))
}
// Record the duration of an event that started at a time and ends now.
func (t *StandardTimer) UpdateSince(ts time.Time) {
- t.mutex.Lock()
- defer t.mutex.Unlock()
- t.histogram.Update(int64(time.Since(ts)))
- t.meter.Mark(1)
+ t.Update(time.Since(ts))
}
// Variance returns the variance of the values in the sample.
@@ -246,10 +252,18 @@ func (t *StandardTimer) Variance() float64 {
return t.histogram.Variance()
}
+// Total returns the total time the timer has run in nanoseconds.
+// This differs from Sum in that it is a simple counter, not based
+// on a histogram Sample.
+func (t *StandardTimer) Total() int64 {
+ return t.total.Count()
+}
+
// TimerSnapshot is a read-only copy of another Timer.
type TimerSnapshot struct {
histogram *HistogramSnapshot
meter *MeterSnapshot
+ total CounterSnapshot
}
// Count returns the number of events recorded at the time the snapshot was
@@ -324,3 +338,6 @@ func (*TimerSnapshot) UpdateSince(time.Time) {
// Variance returns the variance of the values at the time the snapshot was
// taken.
func (t *TimerSnapshot) Variance() float64 { return t.histogram.Variance() }
+
+// Total returns the total time the timer has run in nanoseconds.
+func (t *TimerSnapshot) Total() int64 { return t.total.Count() }
diff --git a/params/config.go b/params/config.go
index 80e671f9b..b56a3a606 100644
--- a/params/config.go
+++ b/params/config.go
@@ -270,16 +270,16 @@ var (
//
// This configuration is intentionally not using keyed fields to force anyone
// adding flags to the config to also have to set these fields.
- AllEthashProtocolChanges = &ChainConfig{big.NewInt(1337), big.NewInt(0), nil, false, big.NewInt(0), common.Hash{}, big.NewInt(0), big.NewInt(0), big.NewInt(0), big.NewInt(0), big.NewInt(0), big.NewInt(0), big.NewInt(0), big.NewInt(0), big.NewInt(0), big.NewInt(0), big.NewInt(0), nil, nil, nil, nil, false, new(EthashConfig), nil}
+ AllEthashProtocolChanges = &ChainConfig{big.NewInt(1337), big.NewInt(0), nil, false, big.NewInt(0), common.Hash{}, big.NewInt(0), big.NewInt(0), big.NewInt(0), big.NewInt(0), big.NewInt(0), big.NewInt(0), big.NewInt(0), big.NewInt(0), big.NewInt(0), big.NewInt(0), big.NewInt(0), nil, nil, nil, nil, nil, false, new(EthashConfig), nil}
// AllCliqueProtocolChanges contains every protocol change (EIPs) introduced
// and accepted by the Ethereum core developers into the Clique consensus.
//
// This configuration is intentionally not using keyed fields to force anyone
// adding flags to the config to also have to set these fields.
- AllCliqueProtocolChanges = &ChainConfig{big.NewInt(1337), big.NewInt(0), nil, false, big.NewInt(0), common.Hash{}, big.NewInt(0), big.NewInt(0), big.NewInt(0), big.NewInt(0), big.NewInt(0), big.NewInt(0), big.NewInt(0), big.NewInt(0), big.NewInt(0), nil, nil, nil, nil, nil, nil, false, nil, &CliqueConfig{Period: 0, Epoch: 30000}}
+ AllCliqueProtocolChanges = &ChainConfig{big.NewInt(1337), big.NewInt(0), nil, false, big.NewInt(0), common.Hash{}, big.NewInt(0), big.NewInt(0), big.NewInt(0), big.NewInt(0), big.NewInt(0), big.NewInt(0), big.NewInt(0), big.NewInt(0), big.NewInt(0), nil, nil, nil, nil, nil, nil, nil, false, nil, &CliqueConfig{Period: 0, Epoch: 30000}}
- TestChainConfig = &ChainConfig{big.NewInt(1), big.NewInt(0), nil, false, big.NewInt(0), common.Hash{}, big.NewInt(0), big.NewInt(0), big.NewInt(0), big.NewInt(0), big.NewInt(0), big.NewInt(0), big.NewInt(0), big.NewInt(0), big.NewInt(0), big.NewInt(0), big.NewInt(0), nil, nil, nil, nil, false, new(EthashConfig), nil}
+ TestChainConfig = &ChainConfig{big.NewInt(1), big.NewInt(0), nil, false, big.NewInt(0), common.Hash{}, big.NewInt(0), big.NewInt(0), big.NewInt(0), big.NewInt(0), big.NewInt(0), big.NewInt(0), big.NewInt(0), big.NewInt(0), big.NewInt(0), big.NewInt(0), big.NewInt(0), nil, nil, nil, nil, nil, false, new(EthashConfig), nil}
TestRules = TestChainConfig.Rules(new(big.Int), false)
)
@@ -377,6 +377,9 @@ type ChainConfig struct {
// the network that triggers the consensus upgrade.
TerminalTotalDifficulty *big.Int `json:"terminalTotalDifficulty,omitempty"`
+ // Cap the maximum total difficulty (for testnet use only).
+ CappedMaximumDifficulty *big.Int `json:"cappedMaximumDifficulty,omitempty"`
+
// TerminalTotalDifficultyPassed is a flag specifying that the network already
// passed the terminal total difficulty. Its purpose is to disable legacy sync
// even without having seen the TTD locally (safer long term).
@@ -419,7 +422,11 @@ func (c *ChainConfig) String() string {
switch {
case c.Ethash != nil:
if c.TerminalTotalDifficulty == nil {
- banner += "Consensus: Ethash (proof-of-work)\n"
+ if c.CappedMaximumDifficulty == nil {
+ banner += "Consensus: Ethash (proof-of-work)\n"
+ } else {
+ banner += fmt.Sprintf("Consensus: Ethash (proof-of-work, capped difficulty at %d)\n", c.CappedMaximumDifficulty)
+ }
} else if !c.TerminalTotalDifficultyPassed {
banner += "Consensus: Beacon (proof-of-stake), merging from Ethash (proof-of-work)\n"
} else {
@@ -434,7 +441,11 @@ func (c *ChainConfig) String() string {
banner += "Consensus: Beacon (proof-of-stake), merged from Clique (proof-of-authority)\n"
}
default:
- banner += "Consensus: unknown\n"
+ if c.CappedMaximumDifficulty == nil {
+ banner += "Consensus: unknown\n"
+ } else {
+ banner += fmt.Sprintf("Consensus: unknown (capped difficulty at %d)\n", c.CappedMaximumDifficulty)
+ }
}
banner += "\n"
diff --git a/statediff/builder.go b/statediff/builder.go
index 8fdbd5b37..9b79b1dee 100644
--- a/statediff/builder.go
+++ b/statediff/builder.go
@@ -22,6 +22,7 @@ package statediff
import (
"bytes"
"fmt"
+ "time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/state"
@@ -29,6 +30,7 @@ import (
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/rlp"
+ metrics2 "github.com/ethereum/go-ethereum/statediff/indexer/database/metrics"
ipld2 "github.com/ethereum/go-ethereum/statediff/indexer/ipld"
"github.com/ethereum/go-ethereum/statediff/indexer/shared"
"github.com/ethereum/go-ethereum/statediff/trie_helpers"
@@ -45,7 +47,7 @@ var (
// Builder interface exposes the method for building a state diff between two blocks
type Builder interface {
BuildStateDiffObject(args Args, params Params) (types2.StateObject, error)
- WriteStateDiffObject(args types2.StateRoots, params Params, output types2.StateNodeSink, ipldOutput types2.IPLDSink) error
+ WriteStateDiffObject(args Args, params Params, output types2.StateNodeSink, ipldOutput types2.IPLDSink) error
}
type StateDiffBuilder struct {
@@ -84,11 +86,10 @@ func NewBuilder(stateCache state.Database) Builder {
// BuildStateDiffObject builds a statediff object from two blocks and the provided parameters
func (sdb *StateDiffBuilder) BuildStateDiffObject(args Args, params Params) (types2.StateObject, error) {
+ defer metrics2.UpdateDuration(time.Now(), metrics2.IndexerMetrics.BuildStateDiffObjectTimer)
var stateNodes []types2.StateLeafNode
var iplds []types2.IPLD
- err := sdb.WriteStateDiffObject(
- types2.StateRoots{OldStateRoot: args.OldStateRoot, NewStateRoot: args.NewStateRoot},
- params, StateNodeAppender(&stateNodes), IPLDMappingAppender(&iplds))
+ err := sdb.WriteStateDiffObject(args, params, StateNodeAppender(&stateNodes), IPLDMappingAppender(&iplds))
if err != nil {
return types2.StateObject{}, err
}
@@ -101,8 +102,9 @@ func (sdb *StateDiffBuilder) BuildStateDiffObject(args Args, params Params) (typ
}
// WriteStateDiffObject writes a statediff object to output sinks
-func (sdb *StateDiffBuilder) WriteStateDiffObject(args types2.StateRoots, params Params, output types2.StateNodeSink,
+func (sdb *StateDiffBuilder) WriteStateDiffObject(args Args, params Params, output types2.StateNodeSink,
ipldOutput types2.IPLDSink) error {
+ defer metrics2.UpdateDuration(time.Now(), metrics2.IndexerMetrics.WriteStateDiffObjectTimer)
// Load tries for old and new states
oldTrie, err := sdb.StateCache.OpenTrie(args.OldStateRoot)
if err != nil {
@@ -128,16 +130,19 @@ func (sdb *StateDiffBuilder) WriteStateDiffObject(args types2.StateRoots, params
},
}
- return sdb.BuildStateDiffWithIntermediateStateNodes(iterPairs, params, output, ipldOutput)
+ logger := log.New("hash", args.BlockHash.Hex(), "number", args.BlockNumber)
+ return sdb.BuildStateDiffWithIntermediateStateNodes(iterPairs, params, output, ipldOutput, logger)
}
func (sdb *StateDiffBuilder) BuildStateDiffWithIntermediateStateNodes(iterPairs []IterPair, params Params,
- output types2.StateNodeSink, ipldOutput types2.IPLDSink) error {
+ output types2.StateNodeSink, ipldOutput types2.IPLDSink, logger log.Logger) error {
+ logger.Debug("statediff BEGIN BuildStateDiffWithIntermediateStateNodes")
+ defer metrics2.ReportAndUpdateDuration("statediff END BuildStateDiffWithIntermediateStateNodes", time.Now(), logger, metrics2.IndexerMetrics.BuildStateDiffWithIntermediateStateNodesTimer)
// collect a slice of all the nodes that were touched and exist at B (B-A)
// a map of their leafkey to all the accounts that were touched and exist at B
// and a slice of all the paths for the nodes in both of the above sets
diffAccountsAtB, err := sdb.createdAndUpdatedState(
- iterPairs[0].Older, iterPairs[0].Newer, params.watchedAddressesLeafPaths, ipldOutput)
+ iterPairs[0].Older, iterPairs[0].Newer, params.watchedAddressesLeafPaths, ipldOutput, logger)
if err != nil {
return fmt.Errorf("error collecting createdAndUpdatedNodes: %v", err)
}
@@ -146,28 +151,34 @@ func (sdb *StateDiffBuilder) BuildStateDiffWithIntermediateStateNodes(iterPairs
// a map of their leafkey to all the accounts that were touched and exist at A
diffAccountsAtA, err := sdb.deletedOrUpdatedState(
iterPairs[1].Older, iterPairs[1].Newer, diffAccountsAtB,
- params.watchedAddressesLeafPaths, output)
+ params.watchedAddressesLeafPaths, output, logger)
if err != nil {
return fmt.Errorf("error collecting deletedOrUpdatedNodes: %v", err)
}
// collect and sort the leafkey keys for both account mappings into a slice
+ t := time.Now()
createKeys := trie_helpers.SortKeys(diffAccountsAtB)
deleteKeys := trie_helpers.SortKeys(diffAccountsAtA)
+ logger.Debug(fmt.Sprintf("statediff BuildStateDiffWithIntermediateStateNodes sort duration=%dms", time.Since(t).Milliseconds()))
// and then find the intersection of these keys
// these are the leafkeys for the accounts which exist at both A and B but are different
// this also mutates the passed in createKeys and deleteKeys, removing the intersection keys
// and leaving the truly created or deleted keys in place
+ t = time.Now()
updatedKeys := trie_helpers.FindIntersection(createKeys, deleteKeys)
+ logger.Debug(fmt.Sprintf("statediff BuildStateDiffWithIntermediateStateNodes intersection count=%d duration=%dms",
+ len(updatedKeys),
+ time.Since(t).Milliseconds()))
// build the diff nodes for the updated accounts using the mappings at both A and B as directed by the keys found as the intersection of the two
- err = sdb.buildAccountUpdates(diffAccountsAtB, diffAccountsAtA, updatedKeys, output, ipldOutput)
+ err = sdb.buildAccountUpdates(diffAccountsAtB, diffAccountsAtA, updatedKeys, output, ipldOutput, logger)
if err != nil {
return fmt.Errorf("error building diff for updated accounts: %v", err)
}
// build the diff nodes for created accounts
- err = sdb.buildAccountCreations(diffAccountsAtB, output, ipldOutput)
+ err = sdb.buildAccountCreations(diffAccountsAtB, output, ipldOutput, logger)
if err != nil {
return fmt.Errorf("error building diff for created accounts: %v", err)
}
@@ -179,11 +190,13 @@ func (sdb *StateDiffBuilder) BuildStateDiffWithIntermediateStateNodes(iterPairs
// a mapping of their leafkeys to all the accounts that exist in a different state at B than A
// and a slice of the paths for all of the nodes included in both
func (sdb *StateDiffBuilder) createdAndUpdatedState(a, b trie.NodeIterator,
- watchedAddressesLeafPaths [][]byte, output types2.IPLDSink) (types2.AccountMap, error) {
+ watchedAddressesLeafPaths [][]byte, output types2.IPLDSink, logger log.Logger) (types2.AccountMap, error) {
+ logger.Debug("statediff BEGIN createdAndUpdatedState")
+ defer metrics2.ReportAndUpdateDuration("statediff END createdAndUpdatedState", time.Now(), logger, metrics2.IndexerMetrics.CreatedAndUpdatedStateTimer)
diffAccountsAtB := make(types2.AccountMap)
watchingAddresses := len(watchedAddressesLeafPaths) > 0
- it, _ := trie.NewDifferenceIterator(a, b)
+ it, itCount := trie.NewDifferenceIterator(a, b)
for it.Next(true) {
// ignore node if it is not along paths of interest
if watchingAddresses && !isValidPrefixPath(watchedAddressesLeafPaths, it.Path()) {
@@ -236,6 +249,8 @@ func (sdb *StateDiffBuilder) createdAndUpdatedState(a, b trie.NodeIterator,
}
}
}
+ logger.Debug("statediff COUNTS createdAndUpdatedState", "it", *itCount, "diffAccountsAtB", len(diffAccountsAtB))
+ metrics2.IndexerMetrics.DifferenceIteratorCounter.Inc(int64(*itCount))
return diffAccountsAtB, it.Error()
}
@@ -275,7 +290,9 @@ func (sdb *StateDiffBuilder) processStateValueNode(it trie.NodeIterator, watched
// deletedOrUpdatedState returns a slice of all the pathes that are emptied at B
// and a mapping of their leafkeys to all the accounts that exist in a different state at A than B
func (sdb *StateDiffBuilder) deletedOrUpdatedState(a, b trie.NodeIterator, diffAccountsAtB types2.AccountMap,
- watchedAddressesLeafPaths [][]byte, output types2.StateNodeSink) (types2.AccountMap, error) {
+ watchedAddressesLeafPaths [][]byte, output types2.StateNodeSink, logger log.Logger) (types2.AccountMap, error) {
+ logger.Debug("statediff BEGIN deletedOrUpdatedState")
+ defer metrics2.ReportAndUpdateDuration("statediff END deletedOrUpdatedState", time.Now(), logger, metrics2.IndexerMetrics.DeletedOrUpdatedStateTimer)
diffAccountAtA := make(types2.AccountMap)
watchingAddresses := len(watchedAddressesLeafPaths) > 0
@@ -330,7 +347,9 @@ func (sdb *StateDiffBuilder) deletedOrUpdatedState(a, b trie.NodeIterator, diffA
// needs to be called before building account creations and deletions as this mutates
// those account maps to remove the accounts which were updated
func (sdb *StateDiffBuilder) buildAccountUpdates(creations, deletions types2.AccountMap, updatedKeys []string,
- output types2.StateNodeSink, ipldOutput types2.IPLDSink) error {
+ output types2.StateNodeSink, ipldOutput types2.IPLDSink, logger log.Logger) error {
+ logger.Debug("statediff BEGIN buildAccountUpdates", "creations", len(creations), "deletions", len(deletions), "updatedKeys", len(updatedKeys))
+ defer metrics2.ReportAndUpdateDuration("statediff END buildAccountUpdates", time.Now(), logger, metrics2.IndexerMetrics.BuildAccountUpdatesTimer)
var err error
for _, key := range updatedKeys {
createdAcc := creations[key]
@@ -361,7 +380,10 @@ func (sdb *StateDiffBuilder) buildAccountUpdates(creations, deletions types2.Acc
// buildAccountCreations returns the statediff node objects for all the accounts that exist at B but not at A
// it also returns the code and codehash for created contract accounts
-func (sdb *StateDiffBuilder) buildAccountCreations(accounts types2.AccountMap, output types2.StateNodeSink, ipldOutput types2.IPLDSink) error {
+func (sdb *StateDiffBuilder) buildAccountCreations(accounts types2.AccountMap, output types2.StateNodeSink,
+ ipldOutput types2.IPLDSink, logger log.Logger) error {
+ logger.Debug("statediff BEGIN buildAccountCreations")
+ defer metrics2.ReportAndUpdateDuration("statediff END buildAccountCreations", time.Now(), logger, metrics2.IndexerMetrics.BuildAccountCreationsTimer)
for _, val := range accounts {
diff := types2.StateLeafNode{
AccountWrapper: val,
@@ -400,6 +422,7 @@ func (sdb *StateDiffBuilder) buildAccountCreations(accounts types2.AccountMap, o
// i.e. it returns all the storage nodes at this state, since there is no previous state
func (sdb *StateDiffBuilder) buildStorageNodesEventual(sr common.Hash, output types2.StorageNodeSink,
ipldOutput types2.IPLDSink) error {
+ defer metrics2.UpdateDuration(time.Now(), metrics2.IndexerMetrics.BuildStorageNodesEventualTimer)
if bytes.Equal(sr.Bytes(), emptyContractRoot.Bytes()) {
return nil
}
@@ -421,6 +444,7 @@ func (sdb *StateDiffBuilder) buildStorageNodesEventual(sr common.Hash, output ty
// including intermediate nodes can be turned on or off
func (sdb *StateDiffBuilder) buildStorageNodesFromTrie(it trie.NodeIterator, output types2.StorageNodeSink,
ipldOutput types2.IPLDSink) error {
+ defer metrics2.UpdateDuration(time.Now(), metrics2.IndexerMetrics.BuildStorageNodesFromTrieTimer)
for it.Next(true) {
if it.Leaf() {
storageLeafNode, err := sdb.processStorageValueNode(it)
@@ -470,6 +494,7 @@ func (sdb *StateDiffBuilder) processStorageValueNode(it trie.NodeIterator) (type
// buildRemovedAccountStorageNodes builds the "removed" diffs for all the storage nodes for a destroyed account
func (sdb *StateDiffBuilder) buildRemovedAccountStorageNodes(sr common.Hash, output types2.StorageNodeSink) error {
+ defer metrics2.UpdateDuration(time.Now(), metrics2.IndexerMetrics.BuildRemovedAccountStorageNodesTimer)
if bytes.Equal(sr.Bytes(), emptyContractRoot.Bytes()) {
return nil
}
@@ -489,6 +514,7 @@ func (sdb *StateDiffBuilder) buildRemovedAccountStorageNodes(sr common.Hash, out
// buildRemovedStorageNodesFromTrie returns diffs for all the storage nodes in the provided node interator
func (sdb *StateDiffBuilder) buildRemovedStorageNodesFromTrie(it trie.NodeIterator, output types2.StorageNodeSink) error {
+ defer metrics2.UpdateDuration(time.Now(), metrics2.IndexerMetrics.BuildRemovedStorageNodesFromTrieTimer)
for it.Next(true) {
if it.Leaf() { // only leaf values are indexed, don't need to demarcate removed intermediate nodes
leafKey := make([]byte, len(it.LeafKey()))
@@ -509,6 +535,7 @@ func (sdb *StateDiffBuilder) buildRemovedStorageNodesFromTrie(it trie.NodeIterat
// buildStorageNodesIncremental builds the storage diff node objects for all nodes that exist in a different state at B than A
func (sdb *StateDiffBuilder) buildStorageNodesIncremental(oldSR common.Hash, newSR common.Hash, output types2.StorageNodeSink,
ipldOutput types2.IPLDSink) error {
+ defer metrics2.UpdateDuration(time.Now(), metrics2.IndexerMetrics.BuildStorageNodesIncrementalTimer)
if bytes.Equal(newSR.Bytes(), oldSR.Bytes()) {
return nil
}
@@ -537,6 +564,7 @@ func (sdb *StateDiffBuilder) buildStorageNodesIncremental(oldSR common.Hash, new
func (sdb *StateDiffBuilder) createdAndUpdatedStorage(a, b trie.NodeIterator, output types2.StorageNodeSink,
ipldOutput types2.IPLDSink) (map[string]bool, error) {
+ defer metrics2.UpdateDuration(time.Now(), metrics2.IndexerMetrics.CreatedAndUpdatedStorageTimer)
diffSlotsAtB := make(map[string]bool)
it, _ := trie.NewDifferenceIterator(a, b)
for it.Next(true) {
@@ -566,6 +594,7 @@ func (sdb *StateDiffBuilder) createdAndUpdatedStorage(a, b trie.NodeIterator, ou
}
func (sdb *StateDiffBuilder) deletedOrUpdatedStorage(a, b trie.NodeIterator, diffSlotsAtB map[string]bool, output types2.StorageNodeSink) error {
+ defer metrics2.UpdateDuration(time.Now(), metrics2.IndexerMetrics.DeletedOrUpdatedStorageTimer)
it, _ := trie.NewDifferenceIterator(b, a)
for it.Next(true) {
if it.Leaf() {
@@ -602,6 +631,7 @@ func isValidPrefixPath(watchedAddressesLeafPaths [][]byte, currentPath []byte) b
// isWatchedAddress is used to check if a state account corresponds to one of the addresses the builder is configured to watch
func isWatchedAddress(watchedAddressesLeafPaths [][]byte, valueNodePath []byte) bool {
+ defer metrics2.UpdateDuration(time.Now(), metrics2.IndexerMetrics.IsWatchedAddressTimer)
for _, watchedAddressPath := range watchedAddressesLeafPaths {
if bytes.Equal(watchedAddressPath, valueNodePath) {
return true
diff --git a/statediff/indexer/database/dump/indexer.go b/statediff/indexer/database/dump/indexer.go
index d0a0fddce..307e95b9c 100644
--- a/statediff/indexer/database/dump/indexer.go
+++ b/statediff/indexer/database/dump/indexer.go
@@ -29,9 +29,9 @@ import (
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/log"
- "github.com/ethereum/go-ethereum/metrics"
"github.com/ethereum/go-ethereum/params"
"github.com/ethereum/go-ethereum/rlp"
+ "github.com/ethereum/go-ethereum/statediff/indexer/database/metrics"
"github.com/ethereum/go-ethereum/statediff/indexer/interfaces"
"github.com/ethereum/go-ethereum/statediff/indexer/ipld"
"github.com/ethereum/go-ethereum/statediff/indexer/models"
@@ -41,10 +41,6 @@ import (
var _ interfaces.StateDiffIndexer = &StateDiffIndexer{}
-var (
- indexerMetrics = RegisterIndexerMetrics(metrics.DefaultRegistry)
-)
-
// StateDiffIndexer satisfies the indexer.StateDiffIndexer interface for ethereum statediff objects on top of a void
type StateDiffIndexer struct {
dump io.WriteCloser
@@ -106,7 +102,7 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
close(self.quit)
close(self.iplds)
tDiff := time.Since(t)
- indexerMetrics.tStateStoreCodeProcessing.Update(tDiff)
+ metrics.IndexerMetrics.StateStoreCodeProcessingTimer.Update(tDiff)
traceMsg += fmt.Sprintf("state, storage, and code storage processing time: %s\r\n", tDiff.String())
t = time.Now()
if err := self.flush(); err != nil {
@@ -115,7 +111,7 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
return err
}
tDiff = time.Since(t)
- indexerMetrics.tPostgresCommit.Update(tDiff)
+ metrics.IndexerMetrics.PostgresCommitTimer.Update(tDiff)
traceMsg += fmt.Sprintf("postgres transaction commit duration: %s\r\n", tDiff.String())
traceMsg += fmt.Sprintf(" TOTAL PROCESSING DURATION: %s\r\n", time.Since(start).String())
log.Debug(traceMsg)
@@ -125,7 +121,7 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
go blockTx.cache()
tDiff := time.Since(t)
- indexerMetrics.tFreePostgres.Update(tDiff)
+ metrics.IndexerMetrics.FreePostgresTimer.Update(tDiff)
traceMsg += fmt.Sprintf("time spent waiting for free postgres tx: %s:\r\n", tDiff.String())
t = time.Now()
@@ -137,7 +133,7 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
return nil, err
}
tDiff = time.Since(t)
- indexerMetrics.tHeaderProcessing.Update(tDiff)
+ metrics.IndexerMetrics.HeaderProcessingTimer.Update(tDiff)
traceMsg += fmt.Sprintf("header processing time: %s\r\n", tDiff.String())
t = time.Now()
// Publish and index uncles
@@ -146,7 +142,7 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
return nil, err
}
tDiff = time.Since(t)
- indexerMetrics.tUncleProcessing.Update(tDiff)
+ metrics.IndexerMetrics.UncleProcessingTimer.Update(tDiff)
traceMsg += fmt.Sprintf("uncle processing time: %s\r\n", tDiff.String())
t = time.Now()
// Publish and index receipts and txs
@@ -163,7 +159,7 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
return nil, err
}
tDiff = time.Since(t)
- indexerMetrics.tTxAndRecProcessing.Update(tDiff)
+ metrics.IndexerMetrics.TxAndRecProcessingTimer.Update(tDiff)
traceMsg += fmt.Sprintf("tx and receipt processing time: %s\r\n", tDiff.String())
t = time.Now()
diff --git a/statediff/indexer/database/dump/metrics.go b/statediff/indexer/database/dump/metrics.go
deleted file mode 100644
index 700e42dc0..000000000
--- a/statediff/indexer/database/dump/metrics.go
+++ /dev/null
@@ -1,94 +0,0 @@
-// VulcanizeDB
-// Copyright © 2021 Vulcanize
-
-// This program is free software: you can redistribute it and/or modify
-// it under the terms of the GNU Affero General Public License as published by
-// the Free Software Foundation, either version 3 of the License, or
-// (at your option) any later version.
-
-// This program is distributed in the hope that it will be useful,
-// but WITHOUT ANY WARRANTY; without even the implied warranty of
-// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-// GNU Affero General Public License for more details.
-
-// You should have received a copy of the GNU Affero General Public License
-// along with this program. If not, see <http://www.gnu.org/licenses/>.
-
-package dump
-
-import (
- "strings"
-
- "github.com/ethereum/go-ethereum/metrics"
-)
-
-const (
- namespace = "statediff"
-)
-
-// Build a fully qualified metric name
-func metricName(subsystem, name string) string {
- if name == "" {
- return ""
- }
- parts := []string{namespace, name}
- if subsystem != "" {
- parts = []string{namespace, subsystem, name}
- }
- // Prometheus uses _ but geth metrics uses / and replaces
- return strings.Join(parts, "/")
-}
-
-type indexerMetricsHandles struct {
- // The total number of processed blocks
- blocks metrics.Counter
- // The total number of processed transactions
- transactions metrics.Counter
- // The total number of processed receipts
- receipts metrics.Counter
- // The total number of processed logs
- logs metrics.Counter
- // The total number of access list entries processed
- accessListEntries metrics.Counter
- // Time spent waiting for free postgres tx
- tFreePostgres metrics.Timer
- // Postgres transaction commit duration
- tPostgresCommit metrics.Timer
- // Header processing time
- tHeaderProcessing metrics.Timer
- // Uncle processing time
- tUncleProcessing metrics.Timer
- // Tx and receipt processing time
- tTxAndRecProcessing metrics.Timer
- // State, storage, and code combined processing time
- tStateStoreCodeProcessing metrics.Timer
-}
-
-func RegisterIndexerMetrics(reg metrics.Registry) indexerMetricsHandles {
- ctx := indexerMetricsHandles{
- blocks: metrics.NewCounter(),
- transactions: metrics.NewCounter(),
- receipts: metrics.NewCounter(),
- logs: metrics.NewCounter(),
- accessListEntries: metrics.NewCounter(),
- tFreePostgres: metrics.NewTimer(),
- tPostgresCommit: metrics.NewTimer(),
- tHeaderProcessing: metrics.NewTimer(),
- tUncleProcessing: metrics.NewTimer(),
- tTxAndRecProcessing: metrics.NewTimer(),
- tStateStoreCodeProcessing: metrics.NewTimer(),
- }
- subsys := "indexer"
- reg.Register(metricName(subsys, "blocks"), ctx.blocks)
- reg.Register(metricName(subsys, "transactions"), ctx.transactions)
- reg.Register(metricName(subsys, "receipts"), ctx.receipts)
- reg.Register(metricName(subsys, "logs"), ctx.logs)
- reg.Register(metricName(subsys, "access_list_entries"), ctx.accessListEntries)
- reg.Register(metricName(subsys, "t_free_postgres"), ctx.tFreePostgres)
- reg.Register(metricName(subsys, "t_postgres_commit"), ctx.tPostgresCommit)
- reg.Register(metricName(subsys, "t_header_processing"), ctx.tHeaderProcessing)
- reg.Register(metricName(subsys, "t_uncle_processing"), ctx.tUncleProcessing)
- reg.Register(metricName(subsys, "t_tx_receipt_processing"), ctx.tTxAndRecProcessing)
- reg.Register(metricName(subsys, "t_state_store_code_processing"), ctx.tStateStoreCodeProcessing)
- return ctx
-}
diff --git a/statediff/indexer/database/file/csv_writer.go b/statediff/indexer/database/file/csv_writer.go
index 0261735a6..23e92296a 100644
--- a/statediff/indexer/database/file/csv_writer.go
+++ b/statediff/indexer/database/file/csv_writer.go
@@ -28,6 +28,7 @@ import (
"github.com/thoas/go-funk"
"github.com/ethereum/go-ethereum/common"
+ "github.com/ethereum/go-ethereum/statediff/indexer/database/metrics"
"github.com/ethereum/go-ethereum/statediff/indexer/ipld"
"github.com/ethereum/go-ethereum/statediff/indexer/models"
nodeinfo "github.com/ethereum/go-ethereum/statediff/indexer/node"
@@ -235,7 +236,7 @@ func (csw *CSVWriter) upsertHeaderCID(header models.HeaderModel) {
header.TotalDifficulty, header.NodeIDs, header.Reward, header.StateRoot, header.TxRoot,
header.RctRoot, header.UnclesHash, header.Bloom, strconv.FormatUint(header.Timestamp, 10), header.Coinbase)
csw.rows <- tableRow{schema.TableHeader, values}
- indexerMetrics.blocks.Inc(1)
+ metrics.IndexerMetrics.BlocksCounter.Inc(1)
}
func (csw *CSVWriter) upsertUncleCID(uncle models.UncleModel) {
@@ -250,7 +251,7 @@ func (csw *CSVWriter) upsertTransactionCID(transaction models.TxModel) {
values = append(values, transaction.BlockNumber, transaction.HeaderID, transaction.TxHash, transaction.CID, transaction.Dst,
transaction.Src, transaction.Index, transaction.Type, transaction.Value)
csw.rows <- tableRow{schema.TableTransaction, values}
- indexerMetrics.transactions.Inc(1)
+ metrics.IndexerMetrics.TransactionsCounter.Inc(1)
}
func (csw *CSVWriter) upsertReceiptCID(rct *models.ReceiptModel) {
@@ -258,7 +259,7 @@ func (csw *CSVWriter) upsertReceiptCID(rct *models.ReceiptModel) {
values = append(values, rct.BlockNumber, rct.HeaderID, rct.TxID, rct.CID, rct.Contract,
rct.PostState, rct.PostStatus)
csw.rows <- tableRow{schema.TableReceipt, values}
- indexerMetrics.receipts.Inc(1)
+ metrics.IndexerMetrics.ReceiptsCounter.Inc(1)
}
func (csw *CSVWriter) upsertLogCID(logs []*models.LogsModel) {
@@ -267,7 +268,7 @@ func (csw *CSVWriter) upsertLogCID(logs []*models.LogsModel) {
values = append(values, l.BlockNumber, l.HeaderID, l.CID, l.ReceiptID, l.Address, l.Index, l.Topic0,
l.Topic1, l.Topic2, l.Topic3)
csw.rows <- tableRow{schema.TableLog, values}
- indexerMetrics.logs.Inc(1)
+ metrics.IndexerMetrics.LogsCounter.Inc(1)
}
}
@@ -284,13 +285,8 @@ func (csw *CSVWriter) upsertStateCID(stateNode models.StateNodeModel) {
}
func (csw *CSVWriter) upsertStorageCID(storageCID models.StorageNodeModel) {
- var storageKey string
- if storageCID.StorageKey != nullHash.String() {
- storageKey = storageCID.StorageKey
- }
-
var values []interface{}
- values = append(values, storageCID.BlockNumber, storageCID.HeaderID, storageCID.StateKey, storageKey, storageCID.CID,
+ values = append(values, storageCID.BlockNumber, storageCID.HeaderID, storageCID.StateKey, storageCID.StorageKey, storageCID.CID,
true, storageCID.Value, storageCID.Removed)
csw.rows <- tableRow{schema.TableStorageNode, values}
}
diff --git a/statediff/indexer/database/file/indexer.go b/statediff/indexer/database/file/indexer.go
index cc1515b8b..ae9d8694a 100644
--- a/statediff/indexer/database/file/indexer.go
+++ b/statediff/indexer/database/file/indexer.go
@@ -34,9 +34,9 @@ import (
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/log"
- "github.com/ethereum/go-ethereum/metrics"
"github.com/ethereum/go-ethereum/params"
"github.com/ethereum/go-ethereum/rlp"
+ "github.com/ethereum/go-ethereum/statediff/indexer/database/metrics"
"github.com/ethereum/go-ethereum/statediff/indexer/interfaces"
"github.com/ethereum/go-ethereum/statediff/indexer/ipld"
"github.com/ethereum/go-ethereum/statediff/indexer/models"
@@ -53,10 +53,6 @@ const watchedAddressesInsert = "INSERT INTO eth_meta.watched_addresses (address,
var _ interfaces.StateDiffIndexer = &StateDiffIndexer{}
-var (
- indexerMetrics = RegisterIndexerMetrics(metrics.DefaultRegistry)
-)
-
// StateDiffIndexer satisfies the indexer.StateDiffIndexer interface for ethereum statediff objects on top of a void
type StateDiffIndexer struct {
fileWriter FileWriter
@@ -172,12 +168,12 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
BlockNumber: block.Number().String(),
submit: func(self *BatchTx, err error) error {
tDiff := time.Since(t)
- indexerMetrics.tStateStoreCodeProcessing.Update(tDiff)
+ metrics.IndexerMetrics.StateStoreCodeProcessingTimer.Update(tDiff)
traceMsg += fmt.Sprintf("state, storage, and code storage processing time: %s\r\n", tDiff.String())
t = time.Now()
sdi.fileWriter.Flush()
tDiff = time.Since(t)
- indexerMetrics.tPostgresCommit.Update(tDiff)
+ metrics.IndexerMetrics.PostgresCommitTimer.Update(tDiff)
traceMsg += fmt.Sprintf("postgres transaction commit duration: %s\r\n", tDiff.String())
traceMsg += fmt.Sprintf(" TOTAL PROCESSING DURATION: %s\r\n", time.Since(start).String())
log.Debug(traceMsg)
@@ -185,21 +181,21 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
},
}
tDiff := time.Since(t)
- indexerMetrics.tFreePostgres.Update(tDiff)
+ metrics.IndexerMetrics.FreePostgresTimer.Update(tDiff)
traceMsg += fmt.Sprintf("time spent waiting for free postgres tx: %s:\r\n", tDiff.String())
t = time.Now()
// write header, collect headerID
headerID := sdi.processHeader(block.Header(), headerNode, reward, totalDifficulty)
tDiff = time.Since(t)
- indexerMetrics.tHeaderProcessing.Update(tDiff)
+ metrics.IndexerMetrics.HeaderProcessingTimer.Update(tDiff)
traceMsg += fmt.Sprintf("header processing time: %s\r\n", tDiff.String())
t = time.Now()
// write uncles
sdi.processUncles(headerID, block.Number(), block.UncleHash(), block.Uncles())
tDiff = time.Since(t)
- indexerMetrics.tUncleProcessing.Update(tDiff)
+ metrics.IndexerMetrics.UncleProcessingTimer.Update(tDiff)
traceMsg += fmt.Sprintf("uncle processing time: %s\r\n", tDiff.String())
t = time.Now()
@@ -217,7 +213,7 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
return nil, err
}
tDiff = time.Since(t)
- indexerMetrics.tTxAndRecProcessing.Update(tDiff)
+ metrics.IndexerMetrics.TxAndRecProcessingTimer.Update(tDiff)
traceMsg += fmt.Sprintf("tx and receipt processing time: %s\r\n", tDiff.String())
t = time.Now()
diff --git a/statediff/indexer/database/file/metrics.go b/statediff/indexer/database/file/metrics.go
deleted file mode 100644
index ca6e88f2b..000000000
--- a/statediff/indexer/database/file/metrics.go
+++ /dev/null
@@ -1,94 +0,0 @@
-// VulcanizeDB
-// Copyright © 2021 Vulcanize
-
-// This program is free software: you can redistribute it and/or modify
-// it under the terms of the GNU Affero General Public License as published by
-// the Free Software Foundation, either version 3 of the License, or
-// (at your option) any later version.
-
-// This program is distributed in the hope that it will be useful,
-// but WITHOUT ANY WARRANTY; without even the implied warranty of
-// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-// GNU Affero General Public License for more details.
-
-// You should have received a copy of the GNU Affero General Public License
-// along with this program. If not, see <http://www.gnu.org/licenses/>.
-
-package file
-
-import (
- "strings"
-
- "github.com/ethereum/go-ethereum/metrics"
-)
-
-const (
- namespace = "statediff"
-)
-
-// Build a fully qualified metric name
-func metricName(subsystem, name string) string {
- if name == "" {
- return ""
- }
- parts := []string{namespace, name}
- if subsystem != "" {
- parts = []string{namespace, subsystem, name}
- }
- // Prometheus uses _ but geth metrics uses / and replaces
- return strings.Join(parts, "/")
-}
-
-type indexerMetricsHandles struct {
- // The total number of processed blocks
- blocks metrics.Counter
- // The total number of processed transactions
- transactions metrics.Counter
- // The total number of processed receipts
- receipts metrics.Counter
- // The total number of processed logs
- logs metrics.Counter
- // The total number of access list entries processed
- accessListEntries metrics.Counter
- // Time spent waiting for free postgres tx
- tFreePostgres metrics.Timer
- // Postgres transaction commit duration
- tPostgresCommit metrics.Timer
- // Header processing time
- tHeaderProcessing metrics.Timer
- // Uncle processing time
- tUncleProcessing metrics.Timer
- // Tx and receipt processing time
- tTxAndRecProcessing metrics.Timer
- // State, storage, and code combined processing time
- tStateStoreCodeProcessing metrics.Timer
-}
-
-func RegisterIndexerMetrics(reg metrics.Registry) indexerMetricsHandles {
- ctx := indexerMetricsHandles{
- blocks: metrics.NewCounter(),
- transactions: metrics.NewCounter(),
- receipts: metrics.NewCounter(),
- logs: metrics.NewCounter(),
- accessListEntries: metrics.NewCounter(),
- tFreePostgres: metrics.NewTimer(),
- tPostgresCommit: metrics.NewTimer(),
- tHeaderProcessing: metrics.NewTimer(),
- tUncleProcessing: metrics.NewTimer(),
- tTxAndRecProcessing: metrics.NewTimer(),
- tStateStoreCodeProcessing: metrics.NewTimer(),
- }
- subsys := "indexer"
- reg.Register(metricName(subsys, "blocks"), ctx.blocks)
- reg.Register(metricName(subsys, "transactions"), ctx.transactions)
- reg.Register(metricName(subsys, "receipts"), ctx.receipts)
- reg.Register(metricName(subsys, "logs"), ctx.logs)
- reg.Register(metricName(subsys, "access_list_entries"), ctx.accessListEntries)
- reg.Register(metricName(subsys, "t_free_postgres"), ctx.tFreePostgres)
- reg.Register(metricName(subsys, "t_postgres_commit"), ctx.tPostgresCommit)
- reg.Register(metricName(subsys, "t_header_processing"), ctx.tHeaderProcessing)
- reg.Register(metricName(subsys, "t_uncle_processing"), ctx.tUncleProcessing)
- reg.Register(metricName(subsys, "t_tx_receipt_processing"), ctx.tTxAndRecProcessing)
- reg.Register(metricName(subsys, "t_state_store_code_processing"), ctx.tStateStoreCodeProcessing)
- return ctx
-}
diff --git a/statediff/indexer/database/file/sql_writer.go b/statediff/indexer/database/file/sql_writer.go
index c79ed843e..1e0acb21f 100644
--- a/statediff/indexer/database/file/sql_writer.go
+++ b/statediff/indexer/database/file/sql_writer.go
@@ -28,6 +28,7 @@ import (
"github.com/thoas/go-funk"
"github.com/ethereum/go-ethereum/common"
+ "github.com/ethereum/go-ethereum/statediff/indexer/database/metrics"
"github.com/ethereum/go-ethereum/statediff/indexer/ipld"
"github.com/ethereum/go-ethereum/statediff/indexer/models"
nodeinfo "github.com/ethereum/go-ethereum/statediff/indexer/node"
@@ -35,7 +36,6 @@ import (
)
var (
- nullHash = common.HexToHash("0x0000000000000000000000000000000000000000000000000000000000000000")
pipeSize = 65336 // min(linuxPipeSize, macOSPipeSize)
writeBufferSize = pipeSize * 16 * 96
)
@@ -191,7 +191,7 @@ func (sqw *SQLWriter) upsertHeaderCID(header models.HeaderModel) {
header.TotalDifficulty, formatPostgresStringArray(header.NodeIDs), header.Reward, header.StateRoot, header.TxRoot,
header.RctRoot, header.UnclesHash, header.Bloom, header.Timestamp, header.Coinbase)
sqw.stmts <- []byte(stmt)
- indexerMetrics.blocks.Inc(1)
+ metrics.IndexerMetrics.BlocksCounter.Inc(1)
}
func (sqw *SQLWriter) upsertUncleCID(uncle models.UncleModel) {
@@ -202,20 +202,20 @@ func (sqw *SQLWriter) upsertUncleCID(uncle models.UncleModel) {
func (sqw *SQLWriter) upsertTransactionCID(transaction models.TxModel) {
sqw.stmts <- []byte(fmt.Sprintf(txInsert, transaction.BlockNumber, transaction.HeaderID, transaction.TxHash, transaction.CID, transaction.Dst,
transaction.Src, transaction.Index, transaction.Type, transaction.Value))
- indexerMetrics.transactions.Inc(1)
+ metrics.IndexerMetrics.TransactionsCounter.Inc(1)
}
func (sqw *SQLWriter) upsertReceiptCID(rct *models.ReceiptModel) {
sqw.stmts <- []byte(fmt.Sprintf(rctInsert, rct.BlockNumber, rct.HeaderID, rct.TxID, rct.CID, rct.Contract,
rct.PostState, rct.PostStatus))
- indexerMetrics.receipts.Inc(1)
+ metrics.IndexerMetrics.ReceiptsCounter.Inc(1)
}
func (sqw *SQLWriter) upsertLogCID(logs []*models.LogsModel) {
for _, l := range logs {
sqw.stmts <- []byte(fmt.Sprintf(logInsert, l.BlockNumber, l.HeaderID, l.CID, l.ReceiptID, l.Address, l.Index, l.Topic0,
l.Topic1, l.Topic2, l.Topic3))
- indexerMetrics.logs.Inc(1)
+ metrics.IndexerMetrics.LogsCounter.Inc(1)
}
}
diff --git a/statediff/indexer/database/metrics/metrics.go b/statediff/indexer/database/metrics/metrics.go
new file mode 100644
index 000000000..34cb3a69b
--- /dev/null
+++ b/statediff/indexer/database/metrics/metrics.go
@@ -0,0 +1,263 @@
+// VulcanizeDB
+// Copyright © 2021 Vulcanize
+
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+
+// You should have received a copy of the GNU Affero General Public License
+// along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+package metrics
+
+import (
+ "fmt"
+ "strings"
+ "time"
+
+ "github.com/ethereum/go-ethereum/log"
+
+ "github.com/ethereum/go-ethereum/metrics"
+)
+
+const (
+ namespace = "statediff"
+)
+
+var (
+ IndexerMetrics = RegisterIndexerMetrics(metrics.DefaultRegistry)
+ DBMetrics = RegisterDBMetrics(metrics.DefaultRegistry)
+)
+
+// Build a fully qualified metric name
+func metricName(subsystem, name string) string {
+ if name == "" {
+ return ""
+ }
+ parts := []string{namespace, name}
+ if subsystem != "" {
+ parts = []string{namespace, subsystem, name}
+ }
+ // Prometheus uses _ but geth metrics uses / and replaces
+ return strings.Join(parts, "/")
+}
+
+type IndexerMetricsHandles struct {
+ // The total number of processed blocks
+ BlocksCounter metrics.Counter
+ // The total number of processed transactions
+ TransactionsCounter metrics.Counter
+ // The total number of processed receipts
+ ReceiptsCounter metrics.Counter
+ // The total number of processed logs
+ LogsCounter metrics.Counter
+ // The total number of access list entries processed
+ AccessListEntriesCounter metrics.Counter
+ // Time spent waiting for free postgres tx
+ FreePostgresTimer metrics.Timer
+ // Postgres transaction commit duration
+ PostgresCommitTimer metrics.Timer
+ // Header processing time
+ HeaderProcessingTimer metrics.Timer
+ // Uncle processing time
+ UncleProcessingTimer metrics.Timer
+ // Tx and receipt processing time
+ TxAndRecProcessingTimer metrics.Timer
+ // State, storage, and code combined processing time
+ StateStoreCodeProcessingTimer metrics.Timer
+
+ // Fine-grained code timers
+ BuildStateDiffWithIntermediateStateNodesTimer metrics.Timer
+ BuildStateDiffWithoutIntermediateStateNodesTimer metrics.Timer
+ CreatedAndUpdatedStateWithIntermediateNodesTimer metrics.Timer
+ DeletedOrUpdatedStateTimer metrics.Timer
+ BuildAccountUpdatesTimer metrics.Timer
+ BuildAccountCreationsTimer metrics.Timer
+ ResolveNodeTimer metrics.Timer
+ SortKeysTimer metrics.Timer
+ FindIntersectionTimer metrics.Timer
+ OutputTimer metrics.Timer
+ IPLDOutputTimer metrics.Timer
+ DifferenceIteratorNextTimer metrics.Timer
+ DifferenceIteratorCounter metrics.Counter
+ DeletedOrUpdatedStorageTimer metrics.Timer
+ CreatedAndUpdatedStorageTimer metrics.Timer
+ BuildStorageNodesIncrementalTimer metrics.Timer
+ BuildStateTrieObjectTimer metrics.Timer
+ BuildStateTrieTimer metrics.Timer
+ BuildStateDiffObjectTimer metrics.Timer
+ WriteStateDiffObjectTimer metrics.Timer
+ CreatedAndUpdatedStateTimer metrics.Timer
+ BuildStorageNodesEventualTimer metrics.Timer
+ BuildStorageNodesFromTrieTimer metrics.Timer
+ BuildRemovedAccountStorageNodesTimer metrics.Timer
+ BuildRemovedStorageNodesFromTrieTimer metrics.Timer
+ IsWatchedAddressTimer metrics.Timer
+}
+
+func RegisterIndexerMetrics(reg metrics.Registry) IndexerMetricsHandles {
+ ctx := IndexerMetricsHandles{
+ BlocksCounter: metrics.NewCounter(),
+ TransactionsCounter: metrics.NewCounter(),
+ ReceiptsCounter: metrics.NewCounter(),
+ LogsCounter: metrics.NewCounter(),
+ AccessListEntriesCounter: metrics.NewCounter(),
+ FreePostgresTimer: metrics.NewTimer(),
+ PostgresCommitTimer: metrics.NewTimer(),
+ HeaderProcessingTimer: metrics.NewTimer(),
+ UncleProcessingTimer: metrics.NewTimer(),
+ TxAndRecProcessingTimer: metrics.NewTimer(),
+ StateStoreCodeProcessingTimer: metrics.NewTimer(),
+ BuildStateDiffWithIntermediateStateNodesTimer: metrics.NewTimer(),
+ BuildStateDiffWithoutIntermediateStateNodesTimer: metrics.NewTimer(),
+ CreatedAndUpdatedStateWithIntermediateNodesTimer: metrics.NewTimer(),
+ DeletedOrUpdatedStateTimer: metrics.NewTimer(),
+ BuildAccountUpdatesTimer: metrics.NewTimer(),
+ BuildAccountCreationsTimer: metrics.NewTimer(),
+ ResolveNodeTimer: metrics.NewTimer(),
+ SortKeysTimer: metrics.NewTimer(),
+ FindIntersectionTimer: metrics.NewTimer(),
+ OutputTimer: metrics.NewTimer(),
+ IPLDOutputTimer: metrics.NewTimer(),
+ DifferenceIteratorNextTimer: metrics.NewTimer(),
+ DifferenceIteratorCounter: metrics.NewCounter(),
+ DeletedOrUpdatedStorageTimer: metrics.NewTimer(),
+ CreatedAndUpdatedStorageTimer: metrics.NewTimer(),
+ BuildStorageNodesIncrementalTimer: metrics.NewTimer(),
+ BuildStateTrieObjectTimer: metrics.NewTimer(),
+ BuildStateTrieTimer: metrics.NewTimer(),
+ BuildStateDiffObjectTimer: metrics.NewTimer(),
+ WriteStateDiffObjectTimer: metrics.NewTimer(),
+ CreatedAndUpdatedStateTimer: metrics.NewTimer(),
+ BuildStorageNodesEventualTimer: metrics.NewTimer(),
+ BuildStorageNodesFromTrieTimer: metrics.NewTimer(),
+ BuildRemovedAccountStorageNodesTimer: metrics.NewTimer(),
+ BuildRemovedStorageNodesFromTrieTimer: metrics.NewTimer(),
+ IsWatchedAddressTimer: metrics.NewTimer(),
+ }
+ subsys := "indexer"
+ reg.Register(metricName(subsys, "blocks"), ctx.BlocksCounter)
+ reg.Register(metricName(subsys, "transactions"), ctx.TransactionsCounter)
+ reg.Register(metricName(subsys, "receipts"), ctx.ReceiptsCounter)
+ reg.Register(metricName(subsys, "logs"), ctx.LogsCounter)
+ reg.Register(metricName(subsys, "access_list_entries"), ctx.AccessListEntriesCounter)
+ reg.Register(metricName(subsys, "t_free_postgres"), ctx.FreePostgresTimer)
+ reg.Register(metricName(subsys, "t_postgres_commit"), ctx.PostgresCommitTimer)
+ reg.Register(metricName(subsys, "t_header_processing"), ctx.HeaderProcessingTimer)
+ reg.Register(metricName(subsys, "t_uncle_processing"), ctx.UncleProcessingTimer)
+ reg.Register(metricName(subsys, "t_tx_receipt_processing"), ctx.TxAndRecProcessingTimer)
+ reg.Register(metricName(subsys, "t_state_store_code_processing"), ctx.StateStoreCodeProcessingTimer)
+ reg.Register(metricName(subsys, "t_build_statediff_with_intermediate_state_nodes"), ctx.BuildStateDiffWithIntermediateStateNodesTimer)
+ reg.Register(metricName(subsys, "t_build_statediff_without_intermediate_state_nodes"), ctx.BuildStateDiffWithoutIntermediateStateNodesTimer)
+ reg.Register(metricName(subsys, "t_created_and_update_state_with_intermediate_nodes"), ctx.CreatedAndUpdatedStateWithIntermediateNodesTimer)
+ reg.Register(metricName(subsys, "t_deleted_or_updated_state"), ctx.DeletedOrUpdatedStateTimer)
+ reg.Register(metricName(subsys, "t_build_account_updates"), ctx.BuildAccountUpdatesTimer)
+ reg.Register(metricName(subsys, "t_build_account_creations"), ctx.BuildAccountCreationsTimer)
+ reg.Register(metricName(subsys, "t_resolve_node"), ctx.ResolveNodeTimer)
+ reg.Register(metricName(subsys, "t_sort_keys"), ctx.SortKeysTimer)
+ reg.Register(metricName(subsys, "t_find_intersection"), ctx.FindIntersectionTimer)
+ reg.Register(metricName(subsys, "t_output_fn"), ctx.OutputTimer)
+ reg.Register(metricName(subsys, "t_ipld_output_fn"), ctx.IPLDOutputTimer)
+ reg.Register(metricName(subsys, "t_difference_iterator_next"), ctx.DifferenceIteratorNextTimer)
+ reg.Register(metricName(subsys, "difference_iterator_counter"), ctx.DifferenceIteratorCounter)
+ reg.Register(metricName(subsys, "t_created_and_updated_storage"), ctx.CreatedAndUpdatedStorageTimer)
+ reg.Register(metricName(subsys, "t_deleted_or_updated_storage"), ctx.DeletedOrUpdatedStorageTimer)
+ reg.Register(metricName(subsys, "t_build_storage_nodes_incremental"), ctx.BuildStorageNodesIncrementalTimer)
+ reg.Register(metricName(subsys, "t_build_state_trie_object"), ctx.BuildStateTrieObjectTimer)
+ reg.Register(metricName(subsys, "t_build_state_trie"), ctx.BuildStateTrieTimer)
+ reg.Register(metricName(subsys, "t_build_statediff_object"), ctx.BuildStateDiffObjectTimer)
+ reg.Register(metricName(subsys, "t_write_statediff_object"), ctx.WriteStateDiffObjectTimer)
+ reg.Register(metricName(subsys, "t_created_and_updated_state"), ctx.CreatedAndUpdatedStateTimer)
+ reg.Register(metricName(subsys, "t_build_storage_nodes_eventual"), ctx.BuildStorageNodesEventualTimer)
+ reg.Register(metricName(subsys, "t_build_storage_nodes_from_trie"), ctx.BuildStorageNodesFromTrieTimer)
+ reg.Register(metricName(subsys, "t_build_removed_accounts_storage_nodes"), ctx.BuildRemovedAccountStorageNodesTimer)
+ reg.Register(metricName(subsys, "t_build_removed_storage_nodes_from_trie"), ctx.BuildRemovedStorageNodesFromTrieTimer)
+ reg.Register(metricName(subsys, "t_is_watched_address"), ctx.IsWatchedAddressTimer)
+
+ log.Debug("Registering statediff indexer metrics.")
+ return ctx
+}
+
+type dbMetricsHandles struct {
+ // Maximum number of open connections to the sql
+ maxOpen metrics.Gauge
+ // The number of established connections both in use and idle
+ open metrics.Gauge
+ // The number of connections currently in use
+ inUse metrics.Gauge
+ // The number of idle connections
+ idle metrics.Gauge
+ // The total number of connections waited for
+ waitedFor metrics.Counter
+ // The total time blocked waiting for a new connection
+ blockedMilliseconds metrics.Counter
+ // The total number of connections closed due to SetMaxIdleConns
+ closedMaxIdle metrics.Counter
+ // The total number of connections closed due to SetConnMaxLifetime
+ closedMaxLifetime metrics.Counter
+}
+
+func RegisterDBMetrics(reg metrics.Registry) dbMetricsHandles {
+ ctx := dbMetricsHandles{
+ maxOpen: metrics.NewGauge(),
+ open: metrics.NewGauge(),
+ inUse: metrics.NewGauge(),
+ idle: metrics.NewGauge(),
+ waitedFor: metrics.NewCounter(),
+ blockedMilliseconds: metrics.NewCounter(),
+ closedMaxIdle: metrics.NewCounter(),
+ closedMaxLifetime: metrics.NewCounter(),
+ }
+ subsys := "connections"
+ reg.Register(metricName(subsys, "max_open"), ctx.maxOpen)
+ reg.Register(metricName(subsys, "open"), ctx.open)
+ reg.Register(metricName(subsys, "in_use"), ctx.inUse)
+ reg.Register(metricName(subsys, "idle"), ctx.idle)
+ reg.Register(metricName(subsys, "waited_for"), ctx.waitedFor)
+ reg.Register(metricName(subsys, "blocked_milliseconds"), ctx.blockedMilliseconds)
+ reg.Register(metricName(subsys, "closed_max_idle"), ctx.closedMaxIdle)
+ reg.Register(metricName(subsys, "closed_max_lifetime"), ctx.closedMaxLifetime)
+
+ log.Debug("Registering statediff DB metrics.")
+ return ctx
+}
+
+// DbStats interface to accommodate different concrete sql stats types
+type DbStats interface {
+ MaxOpen() int64
+ Open() int64
+ InUse() int64
+ Idle() int64
+ WaitCount() int64
+ WaitDuration() time.Duration
+ MaxIdleClosed() int64
+ MaxLifetimeClosed() int64
+}
+
+func (met *dbMetricsHandles) Update(stats DbStats) {
+ met.maxOpen.Update(stats.MaxOpen())
+ met.open.Update(stats.Open())
+ met.inUse.Update(stats.InUse())
+ met.idle.Update(stats.Idle())
+ met.waitedFor.Inc(stats.WaitCount())
+ met.blockedMilliseconds.Inc(stats.WaitDuration().Milliseconds())
+ met.closedMaxIdle.Inc(stats.MaxIdleClosed())
+ met.closedMaxLifetime.Inc(stats.MaxLifetimeClosed())
+}
+
+func ReportAndUpdateDuration(msg string, start time.Time, logger log.Logger, timer metrics.Timer) {
+ since := UpdateDuration(start, timer)
+ logger.Debug(fmt.Sprintf("%s duration=%dms", msg, since.Milliseconds()))
+}
+
+func UpdateDuration(start time.Time, timer metrics.Timer) time.Duration {
+ since := time.Since(start)
+ timer.Update(since)
+ return since
+}
diff --git a/statediff/indexer/database/sql/indexer.go b/statediff/indexer/database/sql/indexer.go
index 2960c90c5..a41e1631a 100644
--- a/statediff/indexer/database/sql/indexer.go
+++ b/statediff/indexer/database/sql/indexer.go
@@ -35,6 +35,7 @@ import (
"github.com/ethereum/go-ethereum/metrics"
"github.com/ethereum/go-ethereum/params"
"github.com/ethereum/go-ethereum/rlp"
+ metrics2 "github.com/ethereum/go-ethereum/statediff/indexer/database/metrics"
"github.com/ethereum/go-ethereum/statediff/indexer/interfaces"
"github.com/ethereum/go-ethereum/statediff/indexer/ipld"
"github.com/ethereum/go-ethereum/statediff/indexer/models"
@@ -44,11 +45,6 @@ import (
var _ interfaces.StateDiffIndexer = &StateDiffIndexer{}
-var (
- indexerMetrics = RegisterIndexerMetrics(metrics.DefaultRegistry)
- dbMetrics = RegisterDBMetrics(metrics.DefaultRegistry)
-)
-
// StateDiffIndexer satisfies the indexer.StateDiffIndexer interface for ethereum statediff objects on top of an SQL sql
type StateDiffIndexer struct {
ctx context.Context
@@ -75,7 +71,7 @@ func (sdi *StateDiffIndexer) ReportDBMetrics(delay time.Duration, quit <-chan bo
for {
select {
case <-ticker.C:
- dbMetrics.Update(sdi.dbWriter.db.Stats())
+ metrics2.DBMetrics.Update(sdi.dbWriter.db.Stats())
case <-quit:
ticker.Stop()
return
@@ -156,7 +152,7 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
rollback(sdi.ctx, tx)
} else {
tDiff := time.Since(t)
- indexerMetrics.tStateStoreCodeProcessing.Update(tDiff)
+ metrics2.IndexerMetrics.StateStoreCodeProcessingTimer.Update(tDiff)
traceMsg += fmt.Sprintf("state, storage, and code storage processing time: %s\r\n", tDiff.String())
t = time.Now()
if err := self.flush(); err != nil {
@@ -167,7 +163,7 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
}
err = tx.Commit(sdi.ctx)
tDiff = time.Since(t)
- indexerMetrics.tPostgresCommit.Update(tDiff)
+ metrics2.IndexerMetrics.PostgresCommitTimer.Update(tDiff)
traceMsg += fmt.Sprintf("postgres transaction commit duration: %s\r\n", tDiff.String())
}
traceMsg += fmt.Sprintf(" TOTAL PROCESSING DURATION: %s\r\n", time.Since(start).String())
@@ -178,7 +174,7 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
go blockTx.cache()
tDiff := time.Since(t)
- indexerMetrics.tFreePostgres.Update(tDiff)
+ metrics2.IndexerMetrics.FreePostgresTimer.Update(tDiff)
traceMsg += fmt.Sprintf("time spent waiting for free postgres tx: %s:\r\n", tDiff.String())
t = time.Now()
@@ -190,7 +186,7 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
return nil, err
}
tDiff = time.Since(t)
- indexerMetrics.tHeaderProcessing.Update(tDiff)
+ metrics2.IndexerMetrics.HeaderProcessingTimer.Update(tDiff)
traceMsg += fmt.Sprintf("header processing time: %s\r\n", tDiff.String())
t = time.Now()
// Publish and index uncles
@@ -199,7 +195,7 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
return nil, err
}
tDiff = time.Since(t)
- indexerMetrics.tUncleProcessing.Update(tDiff)
+ metrics2.IndexerMetrics.UncleProcessingTimer.Update(tDiff)
traceMsg += fmt.Sprintf("uncle processing time: %s\r\n", tDiff.String())
t = time.Now()
// Publish and index receipts and txs
@@ -216,7 +212,7 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
return nil, err
}
tDiff = time.Since(t)
- indexerMetrics.tTxAndRecProcessing.Update(tDiff)
+ metrics2.IndexerMetrics.TxAndRecProcessingTimer.Update(tDiff)
traceMsg += fmt.Sprintf("tx and receipt processing time: %s\r\n", tDiff.String())
t = time.Now()
diff --git a/statediff/indexer/database/sql/interfaces.go b/statediff/indexer/database/sql/interfaces.go
index 3fee858d6..f964a2a90 100644
--- a/statediff/indexer/database/sql/interfaces.go
+++ b/statediff/indexer/database/sql/interfaces.go
@@ -19,7 +19,8 @@ package sql
import (
"context"
"io"
- "time"
+
+ "github.com/ethereum/go-ethereum/statediff/indexer/database/metrics"
)
// Database interfaces required by the sql indexer
@@ -30,12 +31,13 @@ type Database interface {
// Driver interface has all the methods required by a driver implementation to support the sql indexer
type Driver interface {
+ UseCopyFrom() bool
QueryRow(ctx context.Context, sql string, args ...interface{}) ScannableRow
Exec(ctx context.Context, sql string, args ...interface{}) (Result, error)
Select(ctx context.Context, dest interface{}, query string, args ...interface{}) error
Get(ctx context.Context, dest interface{}, query string, args ...interface{}) error
Begin(ctx context.Context) (Tx, error)
- Stats() Stats
+ Stats() metrics.DbStats
NodeID() string
Context() context.Context
io.Closer
@@ -52,12 +54,25 @@ type Statements interface {
InsertStorageStm() string
InsertIPLDStm() string
InsertIPLDsStm() string
+
+ // Table/column descriptions for use with CopyFrom and similar commands.
+ LogTableName() []string
+ LogColumnNames() []string
+ RctTableName() []string
+ RctColumnNames() []string
+ StateTableName() []string
+ StateColumnNames() []string
+ StorageTableName() []string
+ StorageColumnNames() []string
+ TxTableName() []string
+ TxColumnNames() []string
}
// Tx interface to accommodate different concrete SQL transaction types
type Tx interface {
QueryRow(ctx context.Context, sql string, args ...interface{}) ScannableRow
Exec(ctx context.Context, sql string, args ...interface{}) (Result, error)
+ CopyFrom(ctx context.Context, tableName []string, columnNames []string, rows [][]interface{}) (int64, error)
Commit(ctx context.Context) error
Rollback(ctx context.Context) error
}
@@ -71,15 +86,3 @@ type ScannableRow interface {
type Result interface {
RowsAffected() (int64, error)
}
-
-// Stats interface to accommodate different concrete sql stats types
-type Stats interface {
- MaxOpen() int64
- Open() int64
- InUse() int64
- Idle() int64
- WaitCount() int64
- WaitDuration() time.Duration
- MaxIdleClosed() int64
- MaxLifetimeClosed() int64
-}
diff --git a/statediff/indexer/database/sql/lazy_tx.go b/statediff/indexer/database/sql/lazy_tx.go
index 922bf84a0..b2445e0d8 100644
--- a/statediff/indexer/database/sql/lazy_tx.go
+++ b/statediff/indexer/database/sql/lazy_tx.go
@@ -2,10 +2,16 @@ package sql
import (
"context"
+ "reflect"
+
+ "github.com/ethereum/go-ethereum/log"
)
+// Changing this to 1 would make sure only sequential COPYs were combined.
+const copyFromCheckLimit = 100
+
type DelayedTx struct {
- cache []cachedStmt
+ cache []interface{}
db Database
}
type cachedStmt struct {
@@ -13,6 +19,20 @@ type cachedStmt struct {
args []interface{}
}
+type copyFrom struct {
+ tableName []string
+ columnNames []string
+ rows [][]interface{}
+}
+
+func (cf *copyFrom) appendRows(rows [][]interface{}) {
+ cf.rows = append(cf.rows, rows...)
+}
+
+func (cf *copyFrom) matches(tableName []string, columnNames []string) bool {
+ return reflect.DeepEqual(cf.tableName, tableName) && reflect.DeepEqual(cf.columnNames, columnNames)
+}
+
func NewDelayedTx(db Database) *DelayedTx {
return &DelayedTx{db: db}
}
@@ -21,6 +41,28 @@ func (tx *DelayedTx) QueryRow(ctx context.Context, sql string, args ...interface
return tx.db.QueryRow(ctx, sql, args...)
}
+func (tx *DelayedTx) findPrevCopyFrom(tableName []string, columnNames []string, limit int) (*copyFrom, int) {
+ for pos, count := len(tx.cache)-1, 0; pos >= 0 && count < limit; pos, count = pos-1, count+1 {
+ prevCopy, ok := tx.cache[pos].(*copyFrom)
+ if ok && prevCopy.matches(tableName, columnNames) {
+ return prevCopy, count
+ }
+ }
+ return nil, -1
+}
+
+// CopyFrom caches a COPY operation, merging it into a recent matching COPY (same
+// table and columns, within copyFromCheckLimit entries) when possible so that rows
+// accumulate into a single COPY at Commit time. Always reports 0 rows affected,
+// since nothing is executed until Commit.
+func (tx *DelayedTx) CopyFrom(ctx context.Context, tableName []string, columnNames []string, rows [][]interface{}) (int64, error) {
+	if prevCopy, distance := tx.findPrevCopyFrom(tableName, columnNames, copyFromCheckLimit); nil != prevCopy {
+		log.Trace("statediff lazy_tx : Appending to COPY", "table", tableName,
+			"current", len(prevCopy.rows), "new", len(rows), "distance", distance)
+		prevCopy.appendRows(rows)
+	} else {
+		tx.cache = append(tx.cache, &copyFrom{tableName, columnNames, rows})
+	}
+
+	return 0, nil
+}
+
func (tx *DelayedTx) Exec(ctx context.Context, sql string, args ...interface{}) (Result, error) {
tx.cache = append(tx.cache, cachedStmt{sql, args})
return nil, nil
@@ -39,10 +81,19 @@ func (tx *DelayedTx) Commit(ctx context.Context) error {
rollback(ctx, base)
}
}()
- for _, stmt := range tx.cache {
- _, err := base.Exec(ctx, stmt.sql, stmt.args...)
- if err != nil {
- return err
+ for _, item := range tx.cache {
+ switch item := item.(type) {
+ case *copyFrom:
+ _, err := base.CopyFrom(ctx, item.tableName, item.columnNames, item.rows)
+ if err != nil {
+ log.Error("COPY error", "table", item.tableName, "err", err)
+ return err
+ }
+ case cachedStmt:
+ _, err := base.Exec(ctx, item.sql, item.args...)
+ if err != nil {
+ return err
+ }
}
}
tx.cache = nil
diff --git a/statediff/indexer/database/sql/metrics.go b/statediff/indexer/database/sql/metrics.go
deleted file mode 100644
index b0946a722..000000000
--- a/statediff/indexer/database/sql/metrics.go
+++ /dev/null
@@ -1,147 +0,0 @@
-// VulcanizeDB
-// Copyright © 2021 Vulcanize
-
-// This program is free software: you can redistribute it and/or modify
-// it under the terms of the GNU Affero General Public License as published by
-// the Free Software Foundation, either version 3 of the License, or
-// (at your option) any later version.
-
-// This program is distributed in the hope that it will be useful,
-// but WITHOUT ANY WARRANTY; without even the implied warranty of
-// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-// GNU Affero General Public License for more details.
-
-// You should have received a copy of the GNU Affero General Public License
-// along with this program. If not, see .
-
-package sql
-
-import (
- "strings"
-
- "github.com/ethereum/go-ethereum/metrics"
-)
-
-const (
- namespace = "statediff"
-)
-
-// Build a fully qualified metric name
-func metricName(subsystem, name string) string {
- if name == "" {
- return ""
- }
- parts := []string{namespace, name}
- if subsystem != "" {
- parts = []string{namespace, subsystem, name}
- }
- // Prometheus uses _ but geth metrics uses / and replaces
- return strings.Join(parts, "/")
-}
-
-type indexerMetricsHandles struct {
- // The total number of processed blocks
- blocks metrics.Counter
- // The total number of processed transactions
- transactions metrics.Counter
- // The total number of processed receipts
- receipts metrics.Counter
- // The total number of processed logs
- logs metrics.Counter
- // The total number of access list entries processed
- accessListEntries metrics.Counter
- // Time spent waiting for free postgres tx
- tFreePostgres metrics.Timer
- // Postgres transaction commit duration
- tPostgresCommit metrics.Timer
- // Header processing time
- tHeaderProcessing metrics.Timer
- // Uncle processing time
- tUncleProcessing metrics.Timer
- // Tx and receipt processing time
- tTxAndRecProcessing metrics.Timer
- // State, storage, and code combined processing time
- tStateStoreCodeProcessing metrics.Timer
-}
-
-func RegisterIndexerMetrics(reg metrics.Registry) indexerMetricsHandles {
- ctx := indexerMetricsHandles{
- blocks: metrics.NewCounter(),
- transactions: metrics.NewCounter(),
- receipts: metrics.NewCounter(),
- logs: metrics.NewCounter(),
- accessListEntries: metrics.NewCounter(),
- tFreePostgres: metrics.NewTimer(),
- tPostgresCommit: metrics.NewTimer(),
- tHeaderProcessing: metrics.NewTimer(),
- tUncleProcessing: metrics.NewTimer(),
- tTxAndRecProcessing: metrics.NewTimer(),
- tStateStoreCodeProcessing: metrics.NewTimer(),
- }
- subsys := "indexer"
- reg.Register(metricName(subsys, "blocks"), ctx.blocks)
- reg.Register(metricName(subsys, "transactions"), ctx.transactions)
- reg.Register(metricName(subsys, "receipts"), ctx.receipts)
- reg.Register(metricName(subsys, "logs"), ctx.logs)
- reg.Register(metricName(subsys, "access_list_entries"), ctx.accessListEntries)
- reg.Register(metricName(subsys, "t_free_postgres"), ctx.tFreePostgres)
- reg.Register(metricName(subsys, "t_postgres_commit"), ctx.tPostgresCommit)
- reg.Register(metricName(subsys, "t_header_processing"), ctx.tHeaderProcessing)
- reg.Register(metricName(subsys, "t_uncle_processing"), ctx.tUncleProcessing)
- reg.Register(metricName(subsys, "t_tx_receipt_processing"), ctx.tTxAndRecProcessing)
- reg.Register(metricName(subsys, "t_state_store_code_processing"), ctx.tStateStoreCodeProcessing)
- return ctx
-}
-
-type dbMetricsHandles struct {
- // Maximum number of open connections to the sql
- maxOpen metrics.Gauge
- // The number of established connections both in use and idle
- open metrics.Gauge
- // The number of connections currently in use
- inUse metrics.Gauge
- // The number of idle connections
- idle metrics.Gauge
- // The total number of connections waited for
- waitedFor metrics.Counter
- // The total time blocked waiting for a new connection
- blockedMilliseconds metrics.Counter
- // The total number of connections closed due to SetMaxIdleConns
- closedMaxIdle metrics.Counter
- // The total number of connections closed due to SetConnMaxLifetime
- closedMaxLifetime metrics.Counter
-}
-
-func RegisterDBMetrics(reg metrics.Registry) dbMetricsHandles {
- ctx := dbMetricsHandles{
- maxOpen: metrics.NewGauge(),
- open: metrics.NewGauge(),
- inUse: metrics.NewGauge(),
- idle: metrics.NewGauge(),
- waitedFor: metrics.NewCounter(),
- blockedMilliseconds: metrics.NewCounter(),
- closedMaxIdle: metrics.NewCounter(),
- closedMaxLifetime: metrics.NewCounter(),
- }
- subsys := "connections"
- reg.Register(metricName(subsys, "max_open"), ctx.maxOpen)
- reg.Register(metricName(subsys, "open"), ctx.open)
- reg.Register(metricName(subsys, "in_use"), ctx.inUse)
- reg.Register(metricName(subsys, "idle"), ctx.idle)
- reg.Register(metricName(subsys, "waited_for"), ctx.waitedFor)
- reg.Register(metricName(subsys, "blocked_milliseconds"), ctx.blockedMilliseconds)
- reg.Register(metricName(subsys, "closed_max_idle"), ctx.closedMaxIdle)
- reg.Register(metricName(subsys, "closed_max_lifetime"), ctx.closedMaxLifetime)
- return ctx
-}
-
-func (met *dbMetricsHandles) Update(stats Stats) {
- met.maxOpen.Update(stats.MaxOpen())
- met.open.Update(stats.Open())
- met.inUse.Update(stats.InUse())
- met.idle.Update(stats.Idle())
- met.waitedFor.Inc(stats.WaitCount())
- met.blockedMilliseconds.Inc(stats.WaitDuration().Milliseconds())
- met.closedMaxIdle.Inc(stats.MaxIdleClosed())
- met.closedMaxLifetime.Inc(stats.MaxLifetimeClosed())
-}
diff --git a/statediff/indexer/database/sql/pgx_indexer_legacy_test.go b/statediff/indexer/database/sql/pgx_indexer_legacy_test.go
index 292548b75..80094a8d0 100644
--- a/statediff/indexer/database/sql/pgx_indexer_legacy_test.go
+++ b/statediff/indexer/database/sql/pgx_indexer_legacy_test.go
@@ -28,7 +28,7 @@ import (
)
func setupLegacyPGXIndexer(t *testing.T) {
- db, err = postgres.SetupPGXDB()
+ db, err = postgres.SetupPGXDB(postgres.DefaultConfig)
if err != nil {
t.Fatal(err)
}
diff --git a/statediff/indexer/database/sql/pgx_indexer_test.go b/statediff/indexer/database/sql/pgx_indexer_test.go
index 1dbf2dfa0..c0ce57c1f 100644
--- a/statediff/indexer/database/sql/pgx_indexer_test.go
+++ b/statediff/indexer/database/sql/pgx_indexer_test.go
@@ -29,8 +29,8 @@ import (
"github.com/ethereum/go-ethereum/statediff/indexer/test"
)
-func setupPGXIndexer(t *testing.T) {
- db, err = postgres.SetupPGXDB()
+func setupPGXIndexer(t *testing.T, config postgres.Config) {
+ db, err = postgres.SetupPGXDB(config)
if err != nil {
t.Fatal(err)
}
@@ -39,12 +39,16 @@ func setupPGXIndexer(t *testing.T) {
}
func setupPGX(t *testing.T) {
- setupPGXIndexer(t)
+ setupPGXWithConfig(t, postgres.DefaultConfig)
+}
+
+func setupPGXWithConfig(t *testing.T, config postgres.Config) {
+ setupPGXIndexer(t, config)
test.SetupTestData(t, ind)
}
func setupPGXNonCanonical(t *testing.T) {
- setupPGXIndexer(t)
+ setupPGXIndexer(t, postgres.DefaultConfig)
test.SetupTestDataNonCanonical(t, ind)
}
@@ -97,6 +101,20 @@ func TestPGXIndexer(t *testing.T) {
test.TestPublishAndIndexStorageIPLDs(t, db)
})
+
+ t.Run("Publish and index with CopyFrom enabled.", func(t *testing.T) {
+ config := postgres.DefaultConfig
+ config.CopyFrom = true
+
+ setupPGXWithConfig(t, config)
+ defer tearDown(t)
+ defer checkTxClosure(t, 1, 0, 1)
+
+ test.TestPublishAndIndexStateIPLDs(t, db)
+ test.TestPublishAndIndexStorageIPLDs(t, db)
+ test.TestPublishAndIndexReceiptIPLDs(t, db)
+ test.TestPublishAndIndexLogIPLDs(t, db)
+ })
}
// Test indexer for a canonical + a non-canonical block at London height + a non-canonical block at London height + 1
@@ -151,7 +169,7 @@ func TestPGXIndexerNonCanonical(t *testing.T) {
}
func TestPGXWatchAddressMethods(t *testing.T) {
- setupPGXIndexer(t)
+ setupPGXIndexer(t, postgres.DefaultConfig)
defer tearDown(t)
defer checkTxClosure(t, 1, 0, 1)
diff --git a/statediff/indexer/database/sql/postgres/config.go b/statediff/indexer/database/sql/postgres/config.go
index 2038bf1f5..1cbec55c1 100644
--- a/statediff/indexer/database/sql/postgres/config.go
+++ b/statediff/indexer/database/sql/postgres/config.go
@@ -92,6 +92,9 @@ type Config struct {
// toggle on/off upserts
Upsert bool
+
+ // toggle on/off CopyFrom
+ CopyFrom bool
}
// Type satisfies interfaces.Config
diff --git a/statediff/indexer/database/sql/postgres/database.go b/statediff/indexer/database/sql/postgres/database.go
index a508da83f..b371a83b2 100644
--- a/statediff/indexer/database/sql/postgres/database.go
+++ b/statediff/indexer/database/sql/postgres/database.go
@@ -84,3 +84,43 @@ func (db *DB) InsertIPLDStm() string {
func (db *DB) InsertIPLDsStm() string {
return `INSERT INTO ipld.blocks (block_number, key, data) VALUES (unnest($1::BIGINT[]), unnest($2::TEXT[]), unnest($3::BYTEA[])) ON CONFLICT DO NOTHING`
}
+
+func (db *DB) LogTableName() []string {
+ return []string{"eth", "log_cids"}
+}
+
+func (db *DB) LogColumnNames() []string {
+ return []string{"block_number", "header_id", "cid", "rct_id", "address", "index", "topic0", "topic1", "topic2", "topic3"}
+}
+
+func (db *DB) RctTableName() []string {
+ return []string{"eth", "receipt_cids"}
+}
+
+func (db *DB) RctColumnNames() []string {
+ return []string{"block_number", "header_id", "tx_id", "cid", "contract", "post_state", "post_status"}
+}
+
+func (db *DB) StateTableName() []string {
+ return []string{"eth", "state_cids"}
+}
+
+func (db *DB) StateColumnNames() []string {
+ return []string{"block_number", "header_id", "state_leaf_key", "cid", "diff", "balance", "nonce", "code_hash", "storage_root", "removed"}
+}
+
+func (db *DB) StorageTableName() []string {
+ return []string{"eth", "storage_cids"}
+}
+
+func (db *DB) StorageColumnNames() []string {
+ return []string{"block_number", "header_id", "state_leaf_key", "storage_leaf_key", "cid", "diff", "val", "removed"}
+}
+
+func (db *DB) TxTableName() []string {
+ return []string{"eth", "transaction_cids"}
+}
+
+func (db *DB) TxColumnNames() []string {
+ return []string{"block_number", "header_id", "tx_hash", "cid", "dst", "src", "index", "tx_type", "value"}
+}
diff --git a/statediff/indexer/database/sql/postgres/pgx.go b/statediff/indexer/database/sql/postgres/pgx.go
index 073d92744..7825e34bb 100644
--- a/statediff/indexer/database/sql/postgres/pgx.go
+++ b/statediff/indexer/database/sql/postgres/pgx.go
@@ -27,6 +27,7 @@ import (
"github.com/jackc/pgx/v4"
"github.com/jackc/pgx/v4/pgxpool"
+ "github.com/ethereum/go-ethereum/statediff/indexer/database/metrics"
"github.com/ethereum/go-ethereum/statediff/indexer/database/sql"
"github.com/ethereum/go-ethereum/statediff/indexer/node"
)
@@ -37,6 +38,7 @@ type PGXDriver struct {
pool *pgxpool.Pool
nodeInfo node.Info
nodeID string
+ config Config
}
// ConnectPGX initializes and returns a PGX connection pool
@@ -55,7 +57,7 @@ func NewPGXDriver(ctx context.Context, config Config, node node.Info) (*PGXDrive
if err != nil {
return nil, ErrDBConnectionFailed(err)
}
- pg := &PGXDriver{ctx: ctx, pool: dbPool, nodeInfo: node}
+ pg := &PGXDriver{ctx: ctx, pool: dbPool, nodeInfo: node, config: config}
nodeErr := pg.createNode()
if nodeErr != nil {
return &PGXDriver{}, ErrUnableToSetNode(nodeErr)
@@ -144,7 +146,7 @@ func (pgx *PGXDriver) Begin(ctx context.Context) (sql.Tx, error) {
return pgxTxWrapper{tx: tx}, nil
}
-func (pgx *PGXDriver) Stats() sql.Stats {
+func (pgx *PGXDriver) Stats() metrics.DbStats {
stats := pgx.pool.Stat()
return pgxStatsWrapper{stats: stats}
}
@@ -165,6 +167,11 @@ func (pgx *PGXDriver) Context() context.Context {
return pgx.ctx
}
+// UseCopyFrom satisfies sql.Driver, reporting whether COPY FROM was enabled in the config.
+func (pgx *PGXDriver) UseCopyFrom() bool {
+	return pgx.config.CopyFrom
+}
+
type resultWrapper struct {
ct pgconn.CommandTag
}
@@ -178,43 +185,43 @@ type pgxStatsWrapper struct {
stats *pgxpool.Stat
}
-// MaxOpen satisfies sql.Stats
+// MaxOpen satisfies metrics.DbStats
func (s pgxStatsWrapper) MaxOpen() int64 {
return int64(s.stats.MaxConns())
}
-// Open satisfies sql.Stats
+// Open satisfies metrics.DbStats
func (s pgxStatsWrapper) Open() int64 {
return int64(s.stats.TotalConns())
}
-// InUse satisfies sql.Stats
+// InUse satisfies metrics.DbStats
func (s pgxStatsWrapper) InUse() int64 {
return int64(s.stats.AcquiredConns())
}
-// Idle satisfies sql.Stats
+// Idle satisfies metrics.DbStats
func (s pgxStatsWrapper) Idle() int64 {
return int64(s.stats.IdleConns())
}
-// WaitCount satisfies sql.Stats
+// WaitCount satisfies metrics.DbStats
func (s pgxStatsWrapper) WaitCount() int64 {
return s.stats.EmptyAcquireCount()
}
-// WaitDuration satisfies sql.Stats
+// WaitDuration satisfies metrics.DbStats
func (s pgxStatsWrapper) WaitDuration() time.Duration {
return s.stats.AcquireDuration()
}
-// MaxIdleClosed satisfies sql.Stats
+// MaxIdleClosed satisfies metrics.DbStats
func (s pgxStatsWrapper) MaxIdleClosed() int64 {
// this stat isn't supported by pgxpool, but we don't want to panic
return 0
}
-// MaxLifetimeClosed satisfies sql.Stats
+// MaxLifetimeClosed satisfies metrics.DbStats
func (s pgxStatsWrapper) MaxLifetimeClosed() int64 {
return s.stats.CanceledAcquireCount()
}
@@ -243,3 +250,7 @@ func (t pgxTxWrapper) Commit(ctx context.Context) error {
func (t pgxTxWrapper) Rollback(ctx context.Context) error {
return t.tx.Rollback(ctx)
}
+
+func (t pgxTxWrapper) CopyFrom(ctx context.Context, tableName []string, columnNames []string, rows [][]interface{}) (int64, error) {
+ return t.tx.CopyFrom(ctx, tableName, columnNames, pgx.CopyFromRows(rows))
+}
diff --git a/statediff/indexer/database/sql/postgres/sqlx.go b/statediff/indexer/database/sql/postgres/sqlx.go
index c41d39828..452b4988a 100644
--- a/statediff/indexer/database/sql/postgres/sqlx.go
+++ b/statediff/indexer/database/sql/postgres/sqlx.go
@@ -19,10 +19,12 @@ package postgres
import (
"context"
coresql "database/sql"
+ "errors"
"time"
"github.com/jmoiron/sqlx"
+ "github.com/ethereum/go-ethereum/statediff/indexer/database/metrics"
"github.com/ethereum/go-ethereum/statediff/indexer/database/sql"
"github.com/ethereum/go-ethereum/statediff/indexer/node"
)
@@ -109,7 +111,7 @@ func (driver *SQLXDriver) Begin(_ context.Context) (sql.Tx, error) {
return sqlxTxWrapper{tx: tx}, nil
}
-func (driver *SQLXDriver) Stats() sql.Stats {
+func (driver *SQLXDriver) Stats() metrics.DbStats {
stats := driver.db.Stats()
return sqlxStatsWrapper{stats: stats}
}
@@ -129,46 +131,52 @@ func (driver *SQLXDriver) Context() context.Context {
return driver.ctx
}
+// UseCopyFrom satisfies sql.Driver; always false for this driver.
+func (driver *SQLXDriver) UseCopyFrom() bool {
+	// sqlx does not currently support COPY.
+	return false
+}
+
type sqlxStatsWrapper struct {
stats coresql.DBStats
}
-// MaxOpen satisfies sql.Stats
+// MaxOpen satisfies metrics.DbStats
func (s sqlxStatsWrapper) MaxOpen() int64 {
return int64(s.stats.MaxOpenConnections)
}
-// Open satisfies sql.Stats
+// Open satisfies metrics.DbStats
func (s sqlxStatsWrapper) Open() int64 {
return int64(s.stats.OpenConnections)
}
-// InUse satisfies sql.Stats
+// InUse satisfies metrics.DbStats
func (s sqlxStatsWrapper) InUse() int64 {
return int64(s.stats.InUse)
}
-// Idle satisfies sql.Stats
+// Idle satisfies metrics.DbStats
func (s sqlxStatsWrapper) Idle() int64 {
return int64(s.stats.Idle)
}
-// WaitCount satisfies sql.Stats
+// WaitCount satisfies metrics.DbStats
func (s sqlxStatsWrapper) WaitCount() int64 {
return s.stats.WaitCount
}
-// WaitDuration satisfies sql.Stats
+// WaitDuration satisfies metrics.DbStats
func (s sqlxStatsWrapper) WaitDuration() time.Duration {
return s.stats.WaitDuration
}
-// MaxIdleClosed satisfies sql.Stats
+// MaxIdleClosed satisfies metrics.DbStats
func (s sqlxStatsWrapper) MaxIdleClosed() int64 {
return s.stats.MaxIdleClosed
}
-// MaxLifetimeClosed satisfies sql.Stats
+// MaxLifetimeClosed satisfies metrics.DbStats
func (s sqlxStatsWrapper) MaxLifetimeClosed() int64 {
return s.stats.MaxLifetimeClosed
}
@@ -196,3 +204,7 @@ func (t sqlxTxWrapper) Commit(ctx context.Context) error {
func (t sqlxTxWrapper) Rollback(ctx context.Context) error {
return t.tx.Rollback()
}
+
+func (t sqlxTxWrapper) CopyFrom(ctx context.Context, tableName []string, columnNames []string, rows [][]interface{}) (int64, error) {
+	return 0, errors.New("copy from is not supported by the sqlx driver") // Go convention: error strings are lowercase
+}
diff --git a/statediff/indexer/database/sql/postgres/test_helpers.go b/statediff/indexer/database/sql/postgres/test_helpers.go
index f8311b413..cb5255429 100644
--- a/statediff/indexer/database/sql/postgres/test_helpers.go
+++ b/statediff/indexer/database/sql/postgres/test_helpers.go
@@ -35,8 +35,8 @@ func SetupSQLXDB() (sql.Database, error) {
}
// SetupPGXDB is used to setup a pgx db for tests
-func SetupPGXDB() (sql.Database, error) {
- driver, err := NewPGXDriver(context.Background(), DefaultConfig, node.Info{})
+func SetupPGXDB(config Config) (sql.Database, error) {
+ driver, err := NewPGXDriver(context.Background(), config, node.Info{})
if err != nil {
return nil, err
}
diff --git a/statediff/indexer/database/sql/writer.go b/statediff/indexer/database/sql/writer.go
index bd6fb5b67..faa63cd21 100644
--- a/statediff/indexer/database/sql/writer.go
+++ b/statediff/indexer/database/sql/writer.go
@@ -18,9 +18,11 @@ package sql
import (
"fmt"
+ "strconv"
"github.com/lib/pq"
+ "github.com/ethereum/go-ethereum/statediff/indexer/database/metrics"
"github.com/ethereum/go-ethereum/statediff/indexer/models"
)
@@ -66,7 +68,7 @@ func (w *Writer) upsertHeaderCID(tx Tx, header models.HeaderModel) error {
if err != nil {
return insertError{"eth.header_cids", err, w.db.InsertHeaderStm(), header}
}
- indexerMetrics.blocks.Inc(1)
+ metrics.IndexerMetrics.BlocksCounter.Inc(1)
return nil
}
@@ -94,20 +96,39 @@ INSERT INTO eth.transaction_cids (block_number, header_id, tx_hash, cid, dst, sr
ON CONFLICT (tx_hash, header_id, block_number) DO NOTHING
*/
func (w *Writer) upsertTransactionCID(tx Tx, transaction models.TxModel) error {
- _, err := tx.Exec(w.db.Context(), w.db.InsertTxStm(),
- transaction.BlockNumber,
- transaction.HeaderID,
- transaction.TxHash,
- transaction.CID,
- transaction.Dst,
- transaction.Src,
- transaction.Index,
- transaction.Type,
- transaction.Value)
- if err != nil {
- return insertError{"eth.transaction_cids", err, w.db.InsertTxStm(), transaction}
+ if w.useCopyForTx(tx) {
+ blockNum, err := strconv.ParseInt(transaction.BlockNumber, 10, 64)
+ if err != nil {
+ return insertError{"eth.transaction_cids", err, "COPY", transaction}
+ }
+
+		value, err := strconv.ParseFloat(transaction.Value, 64) // NOTE(review): float64 loses precision for wei values > 2^53 — confirm acceptable or keep string
+		if err != nil {
+			return insertError{"eth.transaction_cids", err, "COPY", transaction}
+		}
+
+ _, err = tx.CopyFrom(w.db.Context(), w.db.TxTableName(), w.db.TxColumnNames(),
+ toRows(toRow(blockNum, transaction.HeaderID, transaction.TxHash, transaction.CID, transaction.Dst,
+ transaction.Src, transaction.Index, int(transaction.Type), value)))
+ if err != nil {
+ return insertError{"eth.transaction_cids", err, "COPY", transaction}
+ }
+ } else {
+ _, err := tx.Exec(w.db.Context(), w.db.InsertTxStm(),
+ transaction.BlockNumber,
+ transaction.HeaderID,
+ transaction.TxHash,
+ transaction.CID,
+ transaction.Dst,
+ transaction.Src,
+ transaction.Index,
+ transaction.Type,
+ transaction.Value)
+ if err != nil {
+ return insertError{"eth.transaction_cids", err, w.db.InsertTxStm(), transaction}
+ }
}
- indexerMetrics.transactions.Inc(1)
+ metrics.IndexerMetrics.TransactionsCounter.Inc(1)
return nil
}
@@ -116,18 +137,32 @@ INSERT INTO eth.receipt_cids (block_number, header_id, tx_id, cid, contract, pos
ON CONFLICT (tx_id, header_id, block_number) DO NOTHING
*/
func (w *Writer) upsertReceiptCID(tx Tx, rct *models.ReceiptModel) error {
- _, err := tx.Exec(w.db.Context(), w.db.InsertRctStm(),
- rct.BlockNumber,
- rct.HeaderID,
- rct.TxID,
- rct.CID,
- rct.Contract,
- rct.PostState,
- rct.PostStatus)
- if err != nil {
- return insertError{"eth.receipt_cids", err, w.db.InsertRctStm(), *rct}
+ if w.useCopyForTx(tx) {
+ blockNum, err := strconv.ParseInt(rct.BlockNumber, 10, 64)
+ if err != nil {
+ return insertError{"eth.receipt_cids", err, "COPY", rct}
+ }
+
+ _, err = tx.CopyFrom(w.db.Context(), w.db.RctTableName(), w.db.RctColumnNames(),
+ toRows(toRow(blockNum, rct.HeaderID, rct.TxID, rct.CID, rct.Contract,
+ rct.PostState, int(rct.PostStatus))))
+ if err != nil {
+ return insertError{"eth.receipt_cids", err, "COPY", rct}
+ }
+ } else {
+ _, err := tx.Exec(w.db.Context(), w.db.InsertRctStm(),
+ rct.BlockNumber,
+ rct.HeaderID,
+ rct.TxID,
+ rct.CID,
+ rct.Contract,
+ rct.PostState,
+ rct.PostStatus)
+ if err != nil {
+ return insertError{"eth.receipt_cids", err, w.db.InsertRctStm(), *rct}
+ }
}
- indexerMetrics.receipts.Inc(1)
+ metrics.IndexerMetrics.ReceiptsCounter.Inc(1)
return nil
}
@@ -136,22 +171,42 @@ INSERT INTO eth.log_cids (block_number, header_id, cid, rct_id, address, index,
ON CONFLICT (rct_id, index, header_id, block_number) DO NOTHING
*/
func (w *Writer) upsertLogCID(tx Tx, logs []*models.LogsModel) error {
- for _, log := range logs {
- _, err := tx.Exec(w.db.Context(), w.db.InsertLogStm(),
- log.BlockNumber,
- log.HeaderID,
- log.CID,
- log.ReceiptID,
- log.Address,
- log.Index,
- log.Topic0,
- log.Topic1,
- log.Topic2,
- log.Topic3)
- if err != nil {
- return insertError{"eth.log_cids", err, w.db.InsertLogStm(), *log}
+	if w.useCopyForTx(tx) {
+		var rows [][]interface{}
+		for _, log := range logs {
+			blockNum, err := strconv.ParseInt(log.BlockNumber, 10, 64)
+			if err != nil {
+				return insertError{"eth.log_cids", err, "COPY", log}
+			}
+
+			rows = append(rows, toRow(blockNum, log.HeaderID, log.CID, log.ReceiptID,
+				log.Address, log.Index, log.Topic0, log.Topic1, log.Topic2, log.Topic3))
+		}
+		// Only issue a COPY when there are rows (len > 0 also covers nil;
+		// the original `nil != rows && len(rows) >= 0` was tautological on the second clause).
+		if len(rows) > 0 {
+			_, err := tx.CopyFrom(w.db.Context(), w.db.LogTableName(), w.db.LogColumnNames(), rows)
+			if err != nil {
+				return insertError{"eth.log_cids", err, "COPY", rows}
+			}
+			metrics.IndexerMetrics.LogsCounter.Inc(int64(len(rows)))
+		}
+ } else {
+ for _, log := range logs {
+ _, err := tx.Exec(w.db.Context(), w.db.InsertLogStm(),
+ log.BlockNumber,
+ log.HeaderID,
+ log.CID,
+ log.ReceiptID,
+ log.Address,
+ log.Index,
+ log.Topic0,
+ log.Topic1,
+ log.Topic2,
+ log.Topic3)
+ if err != nil {
+ return insertError{"eth.log_cids", err, w.db.InsertLogStm(), *log}
+ }
+ metrics.IndexerMetrics.LogsCounter.Inc(1)
}
- indexerMetrics.logs.Inc(1)
}
return nil
}
@@ -165,20 +220,39 @@ func (w *Writer) upsertStateCID(tx Tx, stateNode models.StateNodeModel) error {
if stateNode.Removed {
balance = "0"
}
- _, err := tx.Exec(w.db.Context(), w.db.InsertStateStm(),
- stateNode.BlockNumber,
- stateNode.HeaderID,
- stateNode.StateKey,
- stateNode.CID,
- true,
- balance,
- stateNode.Nonce,
- stateNode.CodeHash,
- stateNode.StorageRoot,
- stateNode.Removed,
- )
- if err != nil {
- return insertError{"eth.state_cids", err, w.db.InsertStateStm(), stateNode}
+
+ if w.useCopyForTx(tx) {
+ blockNum, err := strconv.ParseInt(stateNode.BlockNumber, 10, 64)
+ if err != nil {
+ return insertError{"eth.state_cids", err, "COPY", stateNode}
+ }
+		balInt, err := strconv.ParseInt(balance, 10, 64) // NOTE(review): int64 overflows for balances >= 2^63 (balances are 256-bit) — confirm range or pass through as string/numeric
+		if err != nil {
+			return insertError{"eth.state_cids", err, "COPY", stateNode}
+		}
+
+ _, err = tx.CopyFrom(w.db.Context(), w.db.StateTableName(), w.db.StateColumnNames(),
+ toRows(toRow(blockNum, stateNode.HeaderID, stateNode.StateKey, stateNode.CID,
+ true, balInt, stateNode.Nonce, stateNode.CodeHash, stateNode.StorageRoot, stateNode.Removed)))
+ if err != nil {
+ return insertError{"eth.state_cids", err, "COPY", stateNode}
+ }
+ } else {
+ _, err := tx.Exec(w.db.Context(), w.db.InsertStateStm(),
+ stateNode.BlockNumber,
+ stateNode.HeaderID,
+ stateNode.StateKey,
+ stateNode.CID,
+ true,
+ balance,
+ stateNode.Nonce,
+ stateNode.CodeHash,
+ stateNode.StorageRoot,
+ stateNode.Removed,
+ )
+ if err != nil {
+ return insertError{"eth.state_cids", err, w.db.InsertStateStm(), stateNode}
+ }
}
return nil
}
@@ -188,22 +262,57 @@ INSERT INTO eth.storage_cids (block_number, header_id, state_leaf_key, storage_l
ON CONFLICT (header_id, state_leaf_key, storage_leaf_key, block_number) DO NOTHING
*/
func (w *Writer) upsertStorageCID(tx Tx, storageCID models.StorageNodeModel) error {
- _, err := tx.Exec(w.db.Context(), w.db.InsertStorageStm(),
- storageCID.BlockNumber,
- storageCID.HeaderID,
- storageCID.StateKey,
- storageCID.StorageKey,
- storageCID.CID,
- true,
- storageCID.Value,
- storageCID.Removed,
- )
- if err != nil {
- return insertError{"eth.storage_cids", err, w.db.InsertStorageStm(), storageCID}
+ if w.useCopyForTx(tx) {
+ blockNum, err := strconv.ParseInt(storageCID.BlockNumber, 10, 64)
+ if err != nil {
+ return insertError{"eth.storage_cids", err, "COPY", storageCID}
+ }
+
+ _, err = tx.CopyFrom(w.db.Context(), w.db.StorageTableName(), w.db.StorageColumnNames(),
+ toRows(toRow(blockNum, storageCID.HeaderID, storageCID.StateKey, storageCID.StorageKey, storageCID.CID,
+ true, storageCID.Value, storageCID.Removed)))
+ if err != nil {
+ return insertError{"eth.storage_cids", err, "COPY", storageCID}
+ }
+ } else {
+ _, err := tx.Exec(w.db.Context(), w.db.InsertStorageStm(),
+ storageCID.BlockNumber,
+ storageCID.HeaderID,
+ storageCID.StateKey,
+ storageCID.StorageKey,
+ storageCID.CID,
+ true,
+ storageCID.Value,
+ storageCID.Removed,
+ )
+ if err != nil {
+ return insertError{"eth.storage_cids", err, w.db.InsertStorageStm(), storageCID}
+ }
}
return nil
}
+func (w *Writer) useCopyForTx(tx Tx) bool {
+	// Using COPY instead of INSERT only makes sense when also using a DelayedTx, so that operations
+	// can be collected over time and then all submitted within a single TX.
+ if _, ok := tx.(*DelayedTx); ok {
+ return w.db.UseCopyFrom()
+ }
+ return false
+}
+
+// toRow combines its arguments into a single row (a slice of column values) for CopyFrom.
+func toRow(args ...interface{}) []interface{} {
+ var row []interface{}
+ row = append(row, args...)
+ return row
+}
+
+// toRows combines one or more rows into a slice of rows for CopyFrom.
+func toRows(rows ...[]interface{}) [][]interface{} {
+ return rows
+}
+
type insertError struct {
table string
err error
diff --git a/statediff/service.go b/statediff/service.go
index 4bc5cda84..7325971c1 100644
--- a/statediff/service.go
+++ b/statediff/service.go
@@ -26,8 +26,6 @@ import (
"sync/atomic"
"time"
- "github.com/ethereum/go-ethereum/trie"
-
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/state"
@@ -43,9 +41,11 @@ import (
"github.com/ethereum/go-ethereum/rlp"
"github.com/ethereum/go-ethereum/rpc"
ind "github.com/ethereum/go-ethereum/statediff/indexer"
+ "github.com/ethereum/go-ethereum/statediff/indexer/database/metrics"
"github.com/ethereum/go-ethereum/statediff/indexer/interfaces"
nodeinfo "github.com/ethereum/go-ethereum/statediff/indexer/node"
types2 "github.com/ethereum/go-ethereum/statediff/types"
+ "github.com/ethereum/go-ethereum/trie"
"github.com/thoas/go-funk"
)
@@ -794,15 +794,23 @@ func (sds *Service) writeStateDiff(block *types.Block, parentRoot common.Hash, p
}
output := func(node types2.StateLeafNode) error {
+ defer func() {
+			// This is very noisy, so we log at Trace.
+ since := metrics.UpdateDuration(time.Now(), metrics.IndexerMetrics.OutputTimer)
+ logger.Trace(fmt.Sprintf("statediff output duration=%dms", since.Milliseconds()))
+ }()
return sds.indexer.PushStateNode(tx, node, block.Hash().String())
}
ipldOutput := func(c types2.IPLD) error {
+ defer metrics.ReportAndUpdateDuration("statediff ipldOutput", time.Now(), logger, metrics.IndexerMetrics.IPLDOutputTimer)
return sds.indexer.PushIPLD(tx, c)
}
- err = sds.Builder.WriteStateDiffObject(types2.StateRoots{
+ err = sds.Builder.WriteStateDiffObject(Args{
NewStateRoot: block.Root(),
OldStateRoot: parentRoot,
+ BlockHash: block.Hash(),
+ BlockNumber: block.Number(),
}, params, output, ipldOutput)
// TODO this anti-pattern needs to be sorted out eventually
if err := tx.Submit(err); err != nil {
diff --git a/statediff/service_test.go b/statediff/service_test.go
index e921d0893..1df068608 100644
--- a/statediff/service_test.go
+++ b/statediff/service_test.go
@@ -16,7 +16,6 @@
package statediff_test
-/*
import (
"bytes"
"math/big"
@@ -438,4 +437,3 @@ func testGetSyncStatus(t *testing.T) {
}
}
}
-*/
diff --git a/statediff/test_helpers/mocks/builder.go b/statediff/test_helpers/mocks/builder.go
index 9e3ba0ec5..490393e53 100644
--- a/statediff/test_helpers/mocks/builder.go
+++ b/statediff/test_helpers/mocks/builder.go
@@ -44,8 +44,8 @@ func (builder *Builder) BuildStateDiffObject(args statediff.Args, params statedi
}
// BuildStateDiffObject mock method
-func (builder *Builder) WriteStateDiffObject(args sdtypes.StateRoots, params statediff.Params, output sdtypes.StateNodeSink, iplds sdtypes.IPLDSink) error {
- builder.StateRoots = args
+func (builder *Builder) WriteStateDiffObject(args statediff.Args, params statediff.Params, output sdtypes.StateNodeSink, iplds sdtypes.IPLDSink) error {
+ builder.StateRoots = sdtypes.StateRoots{OldStateRoot: args.OldStateRoot, NewStateRoot: args.NewStateRoot}
builder.Params = params
return builder.builderError
diff --git a/statediff/trie_helpers/helpers.go b/statediff/trie_helpers/helpers.go
index 0f5152774..d6c024ee4 100644
--- a/statediff/trie_helpers/helpers.go
+++ b/statediff/trie_helpers/helpers.go
@@ -22,12 +22,16 @@ package trie_helpers
import (
"sort"
"strings"
+ "time"
+
+ metrics2 "github.com/ethereum/go-ethereum/statediff/indexer/database/metrics"
"github.com/ethereum/go-ethereum/statediff/types"
)
// SortKeys sorts the keys in the account map
func SortKeys(data types.AccountMap) []string {
+ defer metrics2.UpdateDuration(time.Now(), metrics2.IndexerMetrics.SortKeysTimer)
keys := make([]string, 0, len(data))
for key := range data {
keys = append(keys, key)
@@ -41,6 +45,7 @@ func SortKeys(data types.AccountMap) []string {
// a and b must first be sorted
// this is used to find which keys have been both "deleted" and "created" i.e. they were updated
func FindIntersection(a, b []string) []string {
+ defer metrics2.UpdateDuration(time.Now(), metrics2.IndexerMetrics.FindIntersectionTimer)
lenA := len(a)
lenB := len(b)
iOfA, iOfB := 0, 0
diff --git a/trie/iterator.go b/trie/iterator.go
index d2c8b6a78..3c2bc4ada 100644
--- a/trie/iterator.go
+++ b/trie/iterator.go
@@ -20,6 +20,9 @@ import (
"bytes"
"container/heap"
"errors"
+ "time"
+
+ "github.com/ethereum/go-ethereum/statediff/indexer/database/metrics"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/ethdb"
@@ -584,6 +587,7 @@ func (it *differenceIterator) AddResolver(resolver ethdb.KeyValueReader) {
}
func (it *differenceIterator) Next(bool) bool {
+ defer metrics.UpdateDuration(time.Now(), metrics.IndexerMetrics.DifferenceIteratorNextTimer)
// Invariants:
// - We always advance at least one element in b.
// - At the start of this function, a's path is lexically greater than b's.