diff --git a/cmd/geth/config.go b/cmd/geth/config.go index ae5332e30..99eed3d4c 100644 --- a/cmd/geth/config.go +++ b/cmd/geth/config.go @@ -268,6 +268,9 @@ func makeFullNode(ctx *cli.Context) (*node.Node, ethapi.Backend) { if ctx.IsSet(utils.StateDiffLogStatements.Name) { pgConfig.LogStatements = ctx.Bool(utils.StateDiffLogStatements.Name) } + if ctx.IsSet(utils.StateDiffCopyFrom.Name) { + pgConfig.CopyFrom = ctx.Bool(utils.StateDiffCopyFrom.Name) + } indexerConfig = pgConfig case shared.DUMP: dumpTypeStr := ctx.String(utils.StateDiffDBDumpDst.Name) diff --git a/cmd/geth/main.go b/cmd/geth/main.go index 16c549fed..5b3b93d5c 100644 --- a/cmd/geth/main.go +++ b/cmd/geth/main.go @@ -180,6 +180,7 @@ var ( utils.StateDiffWatchedAddressesFilePath, utils.StateDiffUpsert, utils.StateDiffLogStatements, + utils.StateDiffCopyFrom, configFileFlag, }, utils.NetworkFlags, utils.DatabasePathFlags) diff --git a/cmd/utils/flags.go b/cmd/utils/flags.go index 9f2624d3c..ffab935d8 100644 --- a/cmd/utils/flags.go +++ b/cmd/utils/flags.go @@ -1083,6 +1083,13 @@ var ( Usage: "Should the statediff service log all database statements? (Note: pgx only)", Value: false, } + + StateDiffCopyFrom = &cli.BoolFlag{ + Name: "statediff.db.copyfrom", + Usage: "Should the statediff service use COPY FROM for multiple inserts? 
(Note: pgx only)", + Value: false, + } + StateDiffWritingFlag = &cli.BoolFlag{ Name: "statediff.writing", Usage: "Activates progressive writing of state diffs to database as new block are synced", diff --git a/consensus/ethash/consensus.go b/consensus/ethash/consensus.go index 1c38b80ea..b6e7b9511 100644 --- a/consensus/ethash/consensus.go +++ b/consensus/ethash/consensus.go @@ -31,6 +31,7 @@ import ( "github.com/ethereum/go-ethereum/consensus/misc" "github.com/ethereum/go-ethereum/core/state" "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/params" "github.com/ethereum/go-ethereum/rlp" "github.com/ethereum/go-ethereum/trie" @@ -330,7 +331,13 @@ func (ethash *Ethash) verifyHeader(chain consensus.ChainHeaderReader, header, pa // the difficulty that a new block should have when created at time // given the parent block's time and difficulty. func (ethash *Ethash) CalcDifficulty(chain consensus.ChainHeaderReader, time uint64, parent *types.Header) *big.Int { - return CalcDifficulty(chain.Config(), time, parent) + var config = chain.Config() + var ret = CalcDifficulty(config, time, parent) + if nil != config.CappedMaximumDifficulty && ret.Cmp(config.CappedMaximumDifficulty) >= 0 { + log.Info(fmt.Sprintf("Using capped difficulty %d", config.CappedMaximumDifficulty)) + return config.CappedMaximumDifficulty + } + return ret } // CalcDifficulty is the difficulty adjustment algorithm. It returns diff --git a/ethdb/leveldb/leveldb.go b/ethdb/leveldb/leveldb.go index 15bd4e6eb..2ee1dd9d2 100644 --- a/ethdb/leveldb/leveldb.go +++ b/ethdb/leveldb/leveldb.go @@ -63,6 +63,13 @@ type Database struct { fn string // filename for reporting db *leveldb.DB // LevelDB instance + getTimer metrics.Timer // Timer/counter for measuring time and invocations of Get(). + putTimer metrics.Timer // Timer/counter for measuring time and invocations of Put(). 
+ deleteTimer metrics.Timer // Timer/counter for measuring time and invocations of Delete(). + hasTimer metrics.Timer // Timer/counter for measuring time and invocations of Has(). + batchWriteTimer metrics.Timer // Timer/counter for measuring time and invocations of batch writes. + batchItemCounter metrics.Counter // Counter for measuring number of batched items written. + compTimeMeter metrics.Meter // Meter for measuring the total time spent in database compaction compReadMeter metrics.Meter // Meter for measuring the data read during compaction compWriteMeter metrics.Meter // Meter for measuring the data written during compaction @@ -144,6 +151,13 @@ func NewCustom(file string, namespace string, customize func(options *opt.Option ldb.nonlevel0CompGauge = metrics.NewRegisteredGauge(namespace+"compact/nonlevel0", nil) ldb.seekCompGauge = metrics.NewRegisteredGauge(namespace+"compact/seek", nil) + ldb.getTimer = metrics.NewRegisteredTimer(namespace+"db/get/time", nil) + ldb.putTimer = metrics.NewRegisteredTimer(namespace+"db/put/time", nil) + ldb.deleteTimer = metrics.NewRegisteredTimer(namespace+"db/delete/time", nil) + ldb.hasTimer = metrics.NewRegisteredTimer(namespace+"db/has/time", nil) + ldb.batchWriteTimer = metrics.NewRegisteredTimer(namespace+"db/batch_write/time", nil) + ldb.batchItemCounter = metrics.NewRegisteredCounter(namespace+"db/batch_item/count", nil) + // Start up the metrics gathering and return go ldb.meter(metricsGatheringInterval) return ldb, nil @@ -182,11 +196,17 @@ func (db *Database) Close() error { // Has retrieves if a key is present in the key-value store. func (db *Database) Has(key []byte) (bool, error) { + if nil != db.hasTimer { + defer func(start time.Time) { db.hasTimer.UpdateSince(start) }(time.Now()) + } return db.db.Has(key, nil) } // Get retrieves the given key if it's present in the key-value store. 
func (db *Database) Get(key []byte) ([]byte, error) { + if nil != db.getTimer { + defer func(start time.Time) { db.getTimer.UpdateSince(start) }(time.Now()) + } dat, err := db.db.Get(key, nil) if err != nil { return nil, err @@ -196,11 +216,17 @@ func (db *Database) Get(key []byte) ([]byte, error) { // Put inserts the given value into the key-value store. func (db *Database) Put(key []byte, value []byte) error { + if nil != db.putTimer { + defer func(start time.Time) { db.putTimer.UpdateSince(start) }(time.Now()) + } return db.db.Put(key, value, nil) } // Delete removes the key from the key-value store. func (db *Database) Delete(key []byte) error { + if nil != db.deleteTimer { + defer func(start time.Time) { db.deleteTimer.UpdateSince(start) }(time.Now()) + } return db.db.Delete(key, nil) } @@ -208,16 +234,20 @@ func (db *Database) Delete(key []byte) error { // database until a final write is called. func (db *Database) NewBatch() ethdb.Batch { return &batch{ - db: db.db, - b: new(leveldb.Batch), + db: db.db, + b: new(leveldb.Batch), + writeTimer: &db.batchWriteTimer, + itemCounter: &db.batchItemCounter, } } // NewBatchWithSize creates a write-only database batch with pre-allocated buffer. func (db *Database) NewBatchWithSize(size int) ethdb.Batch { return &batch{ - db: db.db, - b: leveldb.MakeBatch(size), + db: db.db, + b: leveldb.MakeBatch(size), + writeTimer: &db.batchWriteTimer, + itemCounter: &db.batchItemCounter, } } @@ -468,9 +498,11 @@ func (db *Database) meter(refresh time.Duration) { // batch is a write-only leveldb batch that commits changes to its host database // when Write is called. A batch cannot be used concurrently. type batch struct { - db *leveldb.DB - b *leveldb.Batch - size int + db *leveldb.DB + b *leveldb.Batch + size int + writeTimer *metrics.Timer + itemCounter *metrics.Counter } // Put inserts the given value into the batch for later committing. 
@@ -494,6 +526,12 @@ func (b *batch) ValueSize() int { // Write flushes any accumulated data to disk. func (b *batch) Write() error { + if nil != *b.writeTimer { + defer func(start time.Time) { (*b.writeTimer).UpdateSince(start) }(time.Now()) + } + if nil != *b.itemCounter { + (*b.itemCounter).Inc(int64(b.size)) + } return b.db.Write(b.b, nil) } diff --git a/go.mod b/go.mod index 0be358207..97dea73a8 100644 --- a/go.mod +++ b/go.mod @@ -63,7 +63,7 @@ require ( github.com/rs/cors v1.7.0 github.com/shirou/gopsutil v3.21.11+incompatible github.com/status-im/keycard-go v0.0.0-20190316090335-8537d3370df4 - github.com/stretchr/testify v1.8.0 + github.com/stretchr/testify v1.7.0 github.com/supranational/blst v0.3.8 github.com/syndtr/goleveldb v1.0.1-0.20210819022825-2ae1ddf74ef7 github.com/thoas/go-funk v0.9.2 @@ -120,7 +120,7 @@ require ( github.com/pmezard/go-difflib v1.0.0 // indirect github.com/russross/blackfriday/v2 v2.1.0 // indirect github.com/spaolacci/murmur3 v1.1.0 // indirect - github.com/stretchr/objx v0.4.0 // indirect + github.com/stretchr/objx v0.3.0 // indirect github.com/tklauser/numcpus v0.2.2 // indirect github.com/xrash/smetrics v0.0.0-20201216005158-039620a65673 // indirect github.com/yusufpapurcu/wmi v1.2.2 // indirect diff --git a/go.sum b/go.sum index a1a93a6fb..85f989d51 100644 --- a/go.sum +++ b/go.sum @@ -537,17 +537,15 @@ github.com/status-im/keycard-go v0.0.0-20190316090335-8537d3370df4/go.mod h1:RZL github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.2.0/go.mod h1:qt09Ya8vawLte6SNmTgCsAVtYtaKzEcn8ATUoHMkEqE= -github.com/stretchr/objx v0.4.0 h1:M2gUjqZET1qApGOWNSnZ49BAIMX4F/1plDv3+l31EJ4= -github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= +github.com/stretchr/objx v0.3.0 h1:NGXK3lHquSN08v5vWalVI/L8XU9hdzE/G6xsrze47As= +github.com/stretchr/objx v0.3.0/go.mod 
h1:qt09Ya8vawLte6SNmTgCsAVtYtaKzEcn8ATUoHMkEqE= github.com/stretchr/testify v1.2.0/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= +github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= -github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= -github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk= -github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= github.com/supranational/blst v0.3.8 h1:glwLF4oBRSJOTr05lRBgNwGQST0ndP2wg29fSeTRKCY= github.com/supranational/blst v0.3.8/go.mod h1:jZJtfjgudtNl4en1tzwPIV3KjUnQUvG3/j+w+fVonLw= github.com/syndtr/goleveldb v1.0.1-0.20210819022825-2ae1ddf74ef7 h1:epCh84lMvA70Z7CTTCmYQn2CKbY8j86K7/FAIr141uY= diff --git a/metrics/prometheus/collector.go b/metrics/prometheus/collector.go index e8d5e4f5d..76bece915 100644 --- a/metrics/prometheus/collector.go +++ b/metrics/prometheus/collector.go @@ -82,6 +82,9 @@ func (c *collector) addTimer(name string, m metrics.Timer) { c.writeSummaryPercentile(name, strconv.FormatFloat(pv[i], 'f', -1, 64), ps[i]) } c.buff.WriteRune('\n') + + c.buff.WriteString(fmt.Sprintf(typeGaugeTpl, mutateKey(name+"_total"))) + c.buff.WriteString(fmt.Sprintf(keyValueTpl, mutateKey(name+"_total"), m.Total())) } func (c *collector) addResettingTimer(name string, m metrics.ResettingTimer) { diff --git a/metrics/prometheus/collector_test.go b/metrics/prometheus/collector_test.go index 43f2f804d..194ff5541 100644 --- 
a/metrics/prometheus/collector_test.go +++ b/metrics/prometheus/collector_test.go @@ -92,6 +92,9 @@ test_timer {quantile="0.99"} 1.2e+08 test_timer {quantile="0.999"} 1.2e+08 test_timer {quantile="0.9999"} 1.2e+08 +# TYPE test_timer_total gauge +test_timer_total 230000000 + # TYPE test_resetting_timer_count counter test_resetting_timer_count 6 diff --git a/metrics/timer.go b/metrics/timer.go index a63c9dfb6..3d68c3135 100644 --- a/metrics/timer.go +++ b/metrics/timer.go @@ -25,6 +25,7 @@ type Timer interface { Update(time.Duration) UpdateSince(time.Time) Variance() float64 + Total() int64 } // GetOrRegisterTimer returns an existing Timer or constructs and registers a @@ -47,6 +48,7 @@ func NewCustomTimer(h Histogram, m Meter) Timer { return &StandardTimer{ histogram: h, meter: m, + total: NewCounter(), } } @@ -72,6 +74,7 @@ func NewTimer() Timer { return &StandardTimer{ histogram: NewHistogram(NewExpDecaySample(1028, 0.015)), meter: NewMeter(), + total: NewCounter(), } } @@ -134,11 +137,15 @@ func (NilTimer) UpdateSince(time.Time) {} // Variance is a no-op. func (NilTimer) Variance() float64 { return 0.0 } +// Total is a no-op. +func (NilTimer) Total() int64 { return int64(0) } + // StandardTimer is the standard implementation of a Timer and uses a Histogram // and Meter. type StandardTimer struct { histogram Histogram meter Meter + total Counter mutex sync.Mutex } @@ -200,6 +207,7 @@ func (t *StandardTimer) Snapshot() Timer { return &TimerSnapshot{ histogram: t.histogram.Snapshot().(*HistogramSnapshot), meter: t.meter.Snapshot().(*MeterSnapshot), + total: t.total.Snapshot().(CounterSnapshot), } } @@ -231,14 +239,12 @@ func (t *StandardTimer) Update(d time.Duration) { defer t.mutex.Unlock() t.histogram.Update(int64(d)) t.meter.Mark(1) + t.total.Inc(int64(d)) } // Record the duration of an event that started at a time and ends now. 
func (t *StandardTimer) UpdateSince(ts time.Time) { - t.mutex.Lock() - defer t.mutex.Unlock() - t.histogram.Update(int64(time.Since(ts))) - t.meter.Mark(1) + t.Update(time.Since(ts)) } // Variance returns the variance of the values in the sample. @@ -246,10 +252,18 @@ func (t *StandardTimer) Variance() float64 { return t.histogram.Variance() } +// Total returns the total time the timer has run in nanoseconds. +// This differs from Sum in that it is a simple counter, not based +// on a histogram Sample. +func (t *StandardTimer) Total() int64 { + return t.total.Count() +} + // TimerSnapshot is a read-only copy of another Timer. type TimerSnapshot struct { histogram *HistogramSnapshot meter *MeterSnapshot + total CounterSnapshot } // Count returns the number of events recorded at the time the snapshot was @@ -324,3 +338,6 @@ func (*TimerSnapshot) UpdateSince(time.Time) { // Variance returns the variance of the values at the time the snapshot was // taken. func (t *TimerSnapshot) Variance() float64 { return t.histogram.Variance() } + +// Total returns the total time the timer has run in nanoseconds. +func (t *TimerSnapshot) Total() int64 { return t.total.Count() } diff --git a/params/config.go b/params/config.go index 80e671f9b..b56a3a606 100644 --- a/params/config.go +++ b/params/config.go @@ -270,16 +270,16 @@ var ( // // This configuration is intentionally not using keyed fields to force anyone // adding flags to the config to also have to set these fields. 
- AllEthashProtocolChanges = &ChainConfig{big.NewInt(1337), big.NewInt(0), nil, false, big.NewInt(0), common.Hash{}, big.NewInt(0), big.NewInt(0), big.NewInt(0), big.NewInt(0), big.NewInt(0), big.NewInt(0), big.NewInt(0), big.NewInt(0), big.NewInt(0), big.NewInt(0), big.NewInt(0), nil, nil, nil, nil, false, new(EthashConfig), nil} + AllEthashProtocolChanges = &ChainConfig{big.NewInt(1337), big.NewInt(0), nil, false, big.NewInt(0), common.Hash{}, big.NewInt(0), big.NewInt(0), big.NewInt(0), big.NewInt(0), big.NewInt(0), big.NewInt(0), big.NewInt(0), big.NewInt(0), big.NewInt(0), big.NewInt(0), big.NewInt(0), nil, nil, nil, nil, nil, false, new(EthashConfig), nil} // AllCliqueProtocolChanges contains every protocol change (EIPs) introduced // and accepted by the Ethereum core developers into the Clique consensus. // // This configuration is intentionally not using keyed fields to force anyone // adding flags to the config to also have to set these fields. - AllCliqueProtocolChanges = &ChainConfig{big.NewInt(1337), big.NewInt(0), nil, false, big.NewInt(0), common.Hash{}, big.NewInt(0), big.NewInt(0), big.NewInt(0), big.NewInt(0), big.NewInt(0), big.NewInt(0), big.NewInt(0), big.NewInt(0), big.NewInt(0), nil, nil, nil, nil, nil, nil, false, nil, &CliqueConfig{Period: 0, Epoch: 30000}} + AllCliqueProtocolChanges = &ChainConfig{big.NewInt(1337), big.NewInt(0), nil, false, big.NewInt(0), common.Hash{}, big.NewInt(0), big.NewInt(0), big.NewInt(0), big.NewInt(0), big.NewInt(0), big.NewInt(0), big.NewInt(0), big.NewInt(0), big.NewInt(0), nil, nil, nil, nil, nil, nil, nil, false, nil, &CliqueConfig{Period: 0, Epoch: 30000}} - TestChainConfig = &ChainConfig{big.NewInt(1), big.NewInt(0), nil, false, big.NewInt(0), common.Hash{}, big.NewInt(0), big.NewInt(0), big.NewInt(0), big.NewInt(0), big.NewInt(0), big.NewInt(0), big.NewInt(0), big.NewInt(0), big.NewInt(0), big.NewInt(0), big.NewInt(0), nil, nil, nil, nil, false, new(EthashConfig), nil} + TestChainConfig = 
&ChainConfig{big.NewInt(1), big.NewInt(0), nil, false, big.NewInt(0), common.Hash{}, big.NewInt(0), big.NewInt(0), big.NewInt(0), big.NewInt(0), big.NewInt(0), big.NewInt(0), big.NewInt(0), big.NewInt(0), big.NewInt(0), big.NewInt(0), big.NewInt(0), nil, nil, nil, nil, nil, false, new(EthashConfig), nil} TestRules = TestChainConfig.Rules(new(big.Int), false) ) @@ -377,6 +377,9 @@ type ChainConfig struct { // the network that triggers the consensus upgrade. TerminalTotalDifficulty *big.Int `json:"terminalTotalDifficulty,omitempty"` + // Cap the maximum total difficulty (for testnet use only). + CappedMaximumDifficulty *big.Int `json:"cappedMaximumDifficulty,omitempty"` + // TerminalTotalDifficultyPassed is a flag specifying that the network already // passed the terminal total difficulty. Its purpose is to disable legacy sync // even without having seen the TTD locally (safer long term). @@ -419,7 +422,11 @@ func (c *ChainConfig) String() string { switch { case c.Ethash != nil: if c.TerminalTotalDifficulty == nil { - banner += "Consensus: Ethash (proof-of-work)\n" + if nil == c.CappedMaximumDifficulty { + banner += "Consensus: Ethash (proof-of-work)\n" + } else { + banner += fmt.Sprintf("Consensus: Ethash (proof-of-work, capped difficulty at %d)\n", c.CappedMaximumDifficulty) + } } else if !c.TerminalTotalDifficultyPassed { banner += "Consensus: Beacon (proof-of-stake), merging from Ethash (proof-of-work)\n" } else { @@ -434,7 +441,11 @@ func (c *ChainConfig) String() string { banner += "Consensus: Beacon (proof-of-stake), merged from Clique (proof-of-authority)\n" } default: - banner += "Consensus: unknown\n" + if nil == c.CappedMaximumDifficulty { + banner += "Consensus: unknown\n" + } else { + banner += fmt.Sprintf("Consensus: unknown (capped difficulty at %d)\n", c.CappedMaximumDifficulty) + } } banner += "\n" diff --git a/statediff/builder.go b/statediff/builder.go index 8fdbd5b37..9b79b1dee 100644 --- a/statediff/builder.go +++ b/statediff/builder.go @@ -22,6 
+22,7 @@ package statediff import ( "bytes" "fmt" + "time" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/state" @@ -29,6 +30,7 @@ import ( "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/rlp" + metrics2 "github.com/ethereum/go-ethereum/statediff/indexer/database/metrics" ipld2 "github.com/ethereum/go-ethereum/statediff/indexer/ipld" "github.com/ethereum/go-ethereum/statediff/indexer/shared" "github.com/ethereum/go-ethereum/statediff/trie_helpers" @@ -45,7 +47,7 @@ var ( // Builder interface exposes the method for building a state diff between two blocks type Builder interface { BuildStateDiffObject(args Args, params Params) (types2.StateObject, error) - WriteStateDiffObject(args types2.StateRoots, params Params, output types2.StateNodeSink, ipldOutput types2.IPLDSink) error + WriteStateDiffObject(args Args, params Params, output types2.StateNodeSink, ipldOutput types2.IPLDSink) error } type StateDiffBuilder struct { @@ -84,11 +86,10 @@ func NewBuilder(stateCache state.Database) Builder { // BuildStateDiffObject builds a statediff object from two blocks and the provided parameters func (sdb *StateDiffBuilder) BuildStateDiffObject(args Args, params Params) (types2.StateObject, error) { + defer metrics2.UpdateDuration(time.Now(), metrics2.IndexerMetrics.BuildStateDiffObjectTimer) var stateNodes []types2.StateLeafNode var iplds []types2.IPLD - err := sdb.WriteStateDiffObject( - types2.StateRoots{OldStateRoot: args.OldStateRoot, NewStateRoot: args.NewStateRoot}, - params, StateNodeAppender(&stateNodes), IPLDMappingAppender(&iplds)) + err := sdb.WriteStateDiffObject(args, params, StateNodeAppender(&stateNodes), IPLDMappingAppender(&iplds)) if err != nil { return types2.StateObject{}, err } @@ -101,8 +102,9 @@ func (sdb *StateDiffBuilder) BuildStateDiffObject(args Args, params Params) (typ } // WriteStateDiffObject writes a statediff object to output sinks -func (sdb 
*StateDiffBuilder) WriteStateDiffObject(args types2.StateRoots, params Params, output types2.StateNodeSink, +func (sdb *StateDiffBuilder) WriteStateDiffObject(args Args, params Params, output types2.StateNodeSink, ipldOutput types2.IPLDSink) error { + defer metrics2.UpdateDuration(time.Now(), metrics2.IndexerMetrics.WriteStateDiffObjectTimer) // Load tries for old and new states oldTrie, err := sdb.StateCache.OpenTrie(args.OldStateRoot) if err != nil { @@ -128,16 +130,19 @@ func (sdb *StateDiffBuilder) WriteStateDiffObject(args types2.StateRoots, params }, } - return sdb.BuildStateDiffWithIntermediateStateNodes(iterPairs, params, output, ipldOutput) + logger := log.New("hash", args.BlockHash.Hex(), "number", args.BlockNumber) + return sdb.BuildStateDiffWithIntermediateStateNodes(iterPairs, params, output, ipldOutput, logger) } func (sdb *StateDiffBuilder) BuildStateDiffWithIntermediateStateNodes(iterPairs []IterPair, params Params, - output types2.StateNodeSink, ipldOutput types2.IPLDSink) error { + output types2.StateNodeSink, ipldOutput types2.IPLDSink, logger log.Logger) error { + logger.Debug("statediff BEGIN BuildStateDiffWithIntermediateStateNodes") + defer metrics2.ReportAndUpdateDuration("statediff END BuildStateDiffWithIntermediateStateNodes", time.Now(), logger, metrics2.IndexerMetrics.BuildStateDiffWithIntermediateStateNodesTimer) // collect a slice of all the nodes that were touched and exist at B (B-A) // a map of their leafkey to all the accounts that were touched and exist at B // and a slice of all the paths for the nodes in both of the above sets diffAccountsAtB, err := sdb.createdAndUpdatedState( - iterPairs[0].Older, iterPairs[0].Newer, params.watchedAddressesLeafPaths, ipldOutput) + iterPairs[0].Older, iterPairs[0].Newer, params.watchedAddressesLeafPaths, ipldOutput, logger) if err != nil { return fmt.Errorf("error collecting createdAndUpdatedNodes: %v", err) } @@ -146,28 +151,34 @@ func (sdb *StateDiffBuilder) 
BuildStateDiffWithIntermediateStateNodes(iterPairs // a map of their leafkey to all the accounts that were touched and exist at A diffAccountsAtA, err := sdb.deletedOrUpdatedState( iterPairs[1].Older, iterPairs[1].Newer, diffAccountsAtB, - params.watchedAddressesLeafPaths, output) + params.watchedAddressesLeafPaths, output, logger) if err != nil { return fmt.Errorf("error collecting deletedOrUpdatedNodes: %v", err) } // collect and sort the leafkey keys for both account mappings into a slice + t := time.Now() createKeys := trie_helpers.SortKeys(diffAccountsAtB) deleteKeys := trie_helpers.SortKeys(diffAccountsAtA) + logger.Debug(fmt.Sprintf("statediff BuildStateDiffWithIntermediateStateNodes sort duration=%dms", time.Since(t).Milliseconds())) // and then find the intersection of these keys // these are the leafkeys for the accounts which exist at both A and B but are different // this also mutates the passed in createKeys and deleteKeys, removing the intersection keys // and leaving the truly created or deleted keys in place + t = time.Now() updatedKeys := trie_helpers.FindIntersection(createKeys, deleteKeys) + logger.Debug(fmt.Sprintf("statediff BuildStateDiffWithIntermediateStateNodes intersection count=%d duration=%dms", + len(updatedKeys), + time.Since(t).Milliseconds())) // build the diff nodes for the updated accounts using the mappings at both A and B as directed by the keys found as the intersection of the two - err = sdb.buildAccountUpdates(diffAccountsAtB, diffAccountsAtA, updatedKeys, output, ipldOutput) + err = sdb.buildAccountUpdates(diffAccountsAtB, diffAccountsAtA, updatedKeys, output, ipldOutput, logger) if err != nil { return fmt.Errorf("error building diff for updated accounts: %v", err) } // build the diff nodes for created accounts - err = sdb.buildAccountCreations(diffAccountsAtB, output, ipldOutput) + err = sdb.buildAccountCreations(diffAccountsAtB, output, ipldOutput, logger) if err != nil { return fmt.Errorf("error building diff for created 
accounts: %v", err) } @@ -179,11 +190,13 @@ func (sdb *StateDiffBuilder) BuildStateDiffWithIntermediateStateNodes(iterPairs // a mapping of their leafkeys to all the accounts that exist in a different state at B than A // and a slice of the paths for all of the nodes included in both func (sdb *StateDiffBuilder) createdAndUpdatedState(a, b trie.NodeIterator, - watchedAddressesLeafPaths [][]byte, output types2.IPLDSink) (types2.AccountMap, error) { + watchedAddressesLeafPaths [][]byte, output types2.IPLDSink, logger log.Logger) (types2.AccountMap, error) { + logger.Debug("statediff BEGIN createdAndUpdatedState") + defer metrics2.ReportAndUpdateDuration("statediff END createdAndUpdatedState", time.Now(), logger, metrics2.IndexerMetrics.CreatedAndUpdatedStateTimer) diffAccountsAtB := make(types2.AccountMap) watchingAddresses := len(watchedAddressesLeafPaths) > 0 - it, _ := trie.NewDifferenceIterator(a, b) + it, itCount := trie.NewDifferenceIterator(a, b) for it.Next(true) { // ignore node if it is not along paths of interest if watchingAddresses && !isValidPrefixPath(watchedAddressesLeafPaths, it.Path()) { @@ -236,6 +249,8 @@ func (sdb *StateDiffBuilder) createdAndUpdatedState(a, b trie.NodeIterator, } } } + logger.Debug("statediff COUNTS createdAndUpdatedStateWithIntermediateNodes", "it", itCount, "diffAccountsAtB", len(diffAccountsAtB)) + metrics2.IndexerMetrics.DifferenceIteratorCounter.Inc(int64(*itCount)) return diffAccountsAtB, it.Error() } @@ -275,7 +290,9 @@ func (sdb *StateDiffBuilder) processStateValueNode(it trie.NodeIterator, watched // deletedOrUpdatedState returns a slice of all the pathes that are emptied at B // and a mapping of their leafkeys to all the accounts that exist in a different state at A than B func (sdb *StateDiffBuilder) deletedOrUpdatedState(a, b trie.NodeIterator, diffAccountsAtB types2.AccountMap, - watchedAddressesLeafPaths [][]byte, output types2.StateNodeSink) (types2.AccountMap, error) { + watchedAddressesLeafPaths [][]byte, output 
types2.StateNodeSink, logger log.Logger) (types2.AccountMap, error) { + logger.Debug("statediff BEGIN deletedOrUpdatedState") + defer metrics2.ReportAndUpdateDuration("statediff END deletedOrUpdatedState", time.Now(), logger, metrics2.IndexerMetrics.DeletedOrUpdatedStateTimer) diffAccountAtA := make(types2.AccountMap) watchingAddresses := len(watchedAddressesLeafPaths) > 0 @@ -330,7 +347,9 @@ func (sdb *StateDiffBuilder) deletedOrUpdatedState(a, b trie.NodeIterator, diffA // needs to be called before building account creations and deletions as this mutates // those account maps to remove the accounts which were updated func (sdb *StateDiffBuilder) buildAccountUpdates(creations, deletions types2.AccountMap, updatedKeys []string, - output types2.StateNodeSink, ipldOutput types2.IPLDSink) error { + output types2.StateNodeSink, ipldOutput types2.IPLDSink, logger log.Logger) error { + logger.Debug("statediff BEGIN buildAccountUpdates", "creations", len(creations), "deletions", len(deletions), "updatedKeys", len(updatedKeys)) + defer metrics2.ReportAndUpdateDuration("statediff END buildAccountUpdates ", time.Now(), logger, metrics2.IndexerMetrics.BuildAccountUpdatesTimer) var err error for _, key := range updatedKeys { createdAcc := creations[key] @@ -361,7 +380,10 @@ func (sdb *StateDiffBuilder) buildAccountUpdates(creations, deletions types2.Acc // buildAccountCreations returns the statediff node objects for all the accounts that exist at B but not at A // it also returns the code and codehash for created contract accounts -func (sdb *StateDiffBuilder) buildAccountCreations(accounts types2.AccountMap, output types2.StateNodeSink, ipldOutput types2.IPLDSink) error { +func (sdb *StateDiffBuilder) buildAccountCreations(accounts types2.AccountMap, output types2.StateNodeSink, + ipldOutput types2.IPLDSink, logger log.Logger) error { + logger.Debug("statediff BEGIN buildAccountCreations") + defer metrics2.ReportAndUpdateDuration("statediff END buildAccountCreations", 
time.Now(), logger, metrics2.IndexerMetrics.BuildAccountCreationsTimer) for _, val := range accounts { diff := types2.StateLeafNode{ AccountWrapper: val, @@ -400,6 +422,7 @@ func (sdb *StateDiffBuilder) buildAccountCreations(accounts types2.AccountMap, o // i.e. it returns all the storage nodes at this state, since there is no previous state func (sdb *StateDiffBuilder) buildStorageNodesEventual(sr common.Hash, output types2.StorageNodeSink, ipldOutput types2.IPLDSink) error { + defer metrics2.UpdateDuration(time.Now(), metrics2.IndexerMetrics.BuildStorageNodesEventualTimer) if bytes.Equal(sr.Bytes(), emptyContractRoot.Bytes()) { return nil } @@ -421,6 +444,7 @@ func (sdb *StateDiffBuilder) buildStorageNodesEventual(sr common.Hash, output ty // including intermediate nodes can be turned on or off func (sdb *StateDiffBuilder) buildStorageNodesFromTrie(it trie.NodeIterator, output types2.StorageNodeSink, ipldOutput types2.IPLDSink) error { + defer metrics2.UpdateDuration(time.Now(), metrics2.IndexerMetrics.BuildStorageNodesFromTrieTimer) for it.Next(true) { if it.Leaf() { storageLeafNode, err := sdb.processStorageValueNode(it) @@ -470,6 +494,7 @@ func (sdb *StateDiffBuilder) processStorageValueNode(it trie.NodeIterator) (type // buildRemovedAccountStorageNodes builds the "removed" diffs for all the storage nodes for a destroyed account func (sdb *StateDiffBuilder) buildRemovedAccountStorageNodes(sr common.Hash, output types2.StorageNodeSink) error { + defer metrics2.UpdateDuration(time.Now(), metrics2.IndexerMetrics.BuildRemovedAccountStorageNodesTimer) if bytes.Equal(sr.Bytes(), emptyContractRoot.Bytes()) { return nil } @@ -489,6 +514,7 @@ func (sdb *StateDiffBuilder) buildRemovedAccountStorageNodes(sr common.Hash, out // buildRemovedStorageNodesFromTrie returns diffs for all the storage nodes in the provided node interator func (sdb *StateDiffBuilder) buildRemovedStorageNodesFromTrie(it trie.NodeIterator, output types2.StorageNodeSink) error { + defer 
metrics2.UpdateDuration(time.Now(), metrics2.IndexerMetrics.BuildRemovedStorageNodesFromTrieTimer) for it.Next(true) { if it.Leaf() { // only leaf values are indexed, don't need to demarcate removed intermediate nodes leafKey := make([]byte, len(it.LeafKey())) @@ -509,6 +535,7 @@ func (sdb *StateDiffBuilder) buildRemovedStorageNodesFromTrie(it trie.NodeIterat // buildStorageNodesIncremental builds the storage diff node objects for all nodes that exist in a different state at B than A func (sdb *StateDiffBuilder) buildStorageNodesIncremental(oldSR common.Hash, newSR common.Hash, output types2.StorageNodeSink, ipldOutput types2.IPLDSink) error { + defer metrics2.UpdateDuration(time.Now(), metrics2.IndexerMetrics.BuildStorageNodesIncrementalTimer) if bytes.Equal(newSR.Bytes(), oldSR.Bytes()) { return nil } @@ -537,6 +564,7 @@ func (sdb *StateDiffBuilder) buildStorageNodesIncremental(oldSR common.Hash, new func (sdb *StateDiffBuilder) createdAndUpdatedStorage(a, b trie.NodeIterator, output types2.StorageNodeSink, ipldOutput types2.IPLDSink) (map[string]bool, error) { + defer metrics2.UpdateDuration(time.Now(), metrics2.IndexerMetrics.CreatedAndUpdatedStorageTimer) diffSlotsAtB := make(map[string]bool) it, _ := trie.NewDifferenceIterator(a, b) for it.Next(true) { @@ -566,6 +594,7 @@ func (sdb *StateDiffBuilder) createdAndUpdatedStorage(a, b trie.NodeIterator, ou } func (sdb *StateDiffBuilder) deletedOrUpdatedStorage(a, b trie.NodeIterator, diffSlotsAtB map[string]bool, output types2.StorageNodeSink) error { + defer metrics2.UpdateDuration(time.Now(), metrics2.IndexerMetrics.DeletedOrUpdatedStorageTimer) it, _ := trie.NewDifferenceIterator(b, a) for it.Next(true) { if it.Leaf() { @@ -602,6 +631,7 @@ func isValidPrefixPath(watchedAddressesLeafPaths [][]byte, currentPath []byte) b // isWatchedAddress is used to check if a state account corresponds to one of the addresses the builder is configured to watch func isWatchedAddress(watchedAddressesLeafPaths [][]byte, 
valueNodePath []byte) bool { + defer metrics2.UpdateDuration(time.Now(), metrics2.IndexerMetrics.IsWatchedAddressTimer) for _, watchedAddressPath := range watchedAddressesLeafPaths { if bytes.Equal(watchedAddressPath, valueNodePath) { return true diff --git a/statediff/indexer/database/dump/indexer.go b/statediff/indexer/database/dump/indexer.go index d0a0fddce..307e95b9c 100644 --- a/statediff/indexer/database/dump/indexer.go +++ b/statediff/indexer/database/dump/indexer.go @@ -29,9 +29,9 @@ import ( "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/log" - "github.com/ethereum/go-ethereum/metrics" "github.com/ethereum/go-ethereum/params" "github.com/ethereum/go-ethereum/rlp" + "github.com/ethereum/go-ethereum/statediff/indexer/database/metrics" "github.com/ethereum/go-ethereum/statediff/indexer/interfaces" "github.com/ethereum/go-ethereum/statediff/indexer/ipld" "github.com/ethereum/go-ethereum/statediff/indexer/models" @@ -41,10 +41,6 @@ import ( var _ interfaces.StateDiffIndexer = &StateDiffIndexer{} -var ( - indexerMetrics = RegisterIndexerMetrics(metrics.DefaultRegistry) -) - // StateDiffIndexer satisfies the indexer.StateDiffIndexer interface for ethereum statediff objects on top of a void type StateDiffIndexer struct { dump io.WriteCloser @@ -106,7 +102,7 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip close(self.quit) close(self.iplds) tDiff := time.Since(t) - indexerMetrics.tStateStoreCodeProcessing.Update(tDiff) + metrics.IndexerMetrics.StateStoreCodeProcessingTimer.Update(tDiff) traceMsg += fmt.Sprintf("state, storage, and code storage processing time: %s\r\n", tDiff.String()) t = time.Now() if err := self.flush(); err != nil { @@ -115,7 +111,7 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip return err } tDiff = time.Since(t) - indexerMetrics.tPostgresCommit.Update(tDiff) + 
metrics.IndexerMetrics.PostgresCommitTimer.Update(tDiff) traceMsg += fmt.Sprintf("postgres transaction commit duration: %s\r\n", tDiff.String()) traceMsg += fmt.Sprintf(" TOTAL PROCESSING DURATION: %s\r\n", time.Since(start).String()) log.Debug(traceMsg) @@ -125,7 +121,7 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip go blockTx.cache() tDiff := time.Since(t) - indexerMetrics.tFreePostgres.Update(tDiff) + metrics.IndexerMetrics.FreePostgresTimer.Update(tDiff) traceMsg += fmt.Sprintf("time spent waiting for free postgres tx: %s:\r\n", tDiff.String()) t = time.Now() @@ -137,7 +133,7 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip return nil, err } tDiff = time.Since(t) - indexerMetrics.tHeaderProcessing.Update(tDiff) + metrics.IndexerMetrics.HeaderProcessingTimer.Update(tDiff) traceMsg += fmt.Sprintf("header processing time: %s\r\n", tDiff.String()) t = time.Now() // Publish and index uncles @@ -146,7 +142,7 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip return nil, err } tDiff = time.Since(t) - indexerMetrics.tUncleProcessing.Update(tDiff) + metrics.IndexerMetrics.UncleProcessingTimer.Update(tDiff) traceMsg += fmt.Sprintf("uncle processing time: %s\r\n", tDiff.String()) t = time.Now() // Publish and index receipts and txs @@ -163,7 +159,7 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip return nil, err } tDiff = time.Since(t) - indexerMetrics.tTxAndRecProcessing.Update(tDiff) + metrics.IndexerMetrics.TxAndRecProcessingTimer.Update(tDiff) traceMsg += fmt.Sprintf("tx and receipt processing time: %s\r\n", tDiff.String()) t = time.Now() diff --git a/statediff/indexer/database/dump/metrics.go b/statediff/indexer/database/dump/metrics.go deleted file mode 100644 index 700e42dc0..000000000 --- a/statediff/indexer/database/dump/metrics.go +++ /dev/null @@ -1,94 +0,0 @@ -// VulcanizeDB -// Copyright © 2021 Vulcanize - -// This 
program is free software: you can redistribute it and/or modify -// it under the terms of the GNU Affero General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. - -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Affero General Public License for more details. - -// You should have received a copy of the GNU Affero General Public License -// along with this program. If not, see . - -package dump - -import ( - "strings" - - "github.com/ethereum/go-ethereum/metrics" -) - -const ( - namespace = "statediff" -) - -// Build a fully qualified metric name -func metricName(subsystem, name string) string { - if name == "" { - return "" - } - parts := []string{namespace, name} - if subsystem != "" { - parts = []string{namespace, subsystem, name} - } - // Prometheus uses _ but geth metrics uses / and replaces - return strings.Join(parts, "/") -} - -type indexerMetricsHandles struct { - // The total number of processed blocks - blocks metrics.Counter - // The total number of processed transactions - transactions metrics.Counter - // The total number of processed receipts - receipts metrics.Counter - // The total number of processed logs - logs metrics.Counter - // The total number of access list entries processed - accessListEntries metrics.Counter - // Time spent waiting for free postgres tx - tFreePostgres metrics.Timer - // Postgres transaction commit duration - tPostgresCommit metrics.Timer - // Header processing time - tHeaderProcessing metrics.Timer - // Uncle processing time - tUncleProcessing metrics.Timer - // Tx and receipt processing time - tTxAndRecProcessing metrics.Timer - // State, storage, and code combined processing time - tStateStoreCodeProcessing metrics.Timer -} - -func RegisterIndexerMetrics(reg 
metrics.Registry) indexerMetricsHandles { - ctx := indexerMetricsHandles{ - blocks: metrics.NewCounter(), - transactions: metrics.NewCounter(), - receipts: metrics.NewCounter(), - logs: metrics.NewCounter(), - accessListEntries: metrics.NewCounter(), - tFreePostgres: metrics.NewTimer(), - tPostgresCommit: metrics.NewTimer(), - tHeaderProcessing: metrics.NewTimer(), - tUncleProcessing: metrics.NewTimer(), - tTxAndRecProcessing: metrics.NewTimer(), - tStateStoreCodeProcessing: metrics.NewTimer(), - } - subsys := "indexer" - reg.Register(metricName(subsys, "blocks"), ctx.blocks) - reg.Register(metricName(subsys, "transactions"), ctx.transactions) - reg.Register(metricName(subsys, "receipts"), ctx.receipts) - reg.Register(metricName(subsys, "logs"), ctx.logs) - reg.Register(metricName(subsys, "access_list_entries"), ctx.accessListEntries) - reg.Register(metricName(subsys, "t_free_postgres"), ctx.tFreePostgres) - reg.Register(metricName(subsys, "t_postgres_commit"), ctx.tPostgresCommit) - reg.Register(metricName(subsys, "t_header_processing"), ctx.tHeaderProcessing) - reg.Register(metricName(subsys, "t_uncle_processing"), ctx.tUncleProcessing) - reg.Register(metricName(subsys, "t_tx_receipt_processing"), ctx.tTxAndRecProcessing) - reg.Register(metricName(subsys, "t_state_store_code_processing"), ctx.tStateStoreCodeProcessing) - return ctx -} diff --git a/statediff/indexer/database/file/csv_writer.go b/statediff/indexer/database/file/csv_writer.go index 0261735a6..23e92296a 100644 --- a/statediff/indexer/database/file/csv_writer.go +++ b/statediff/indexer/database/file/csv_writer.go @@ -28,6 +28,7 @@ import ( "github.com/thoas/go-funk" "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/statediff/indexer/database/metrics" "github.com/ethereum/go-ethereum/statediff/indexer/ipld" "github.com/ethereum/go-ethereum/statediff/indexer/models" nodeinfo "github.com/ethereum/go-ethereum/statediff/indexer/node" @@ -235,7 +236,7 @@ func (csw *CSVWriter) 
upsertHeaderCID(header models.HeaderModel) { header.TotalDifficulty, header.NodeIDs, header.Reward, header.StateRoot, header.TxRoot, header.RctRoot, header.UnclesHash, header.Bloom, strconv.FormatUint(header.Timestamp, 10), header.Coinbase) csw.rows <- tableRow{schema.TableHeader, values} - indexerMetrics.blocks.Inc(1) + metrics.IndexerMetrics.BlocksCounter.Inc(1) } func (csw *CSVWriter) upsertUncleCID(uncle models.UncleModel) { @@ -250,7 +251,7 @@ func (csw *CSVWriter) upsertTransactionCID(transaction models.TxModel) { values = append(values, transaction.BlockNumber, transaction.HeaderID, transaction.TxHash, transaction.CID, transaction.Dst, transaction.Src, transaction.Index, transaction.Type, transaction.Value) csw.rows <- tableRow{schema.TableTransaction, values} - indexerMetrics.transactions.Inc(1) + metrics.IndexerMetrics.TransactionsCounter.Inc(1) } func (csw *CSVWriter) upsertReceiptCID(rct *models.ReceiptModel) { @@ -258,7 +259,7 @@ func (csw *CSVWriter) upsertReceiptCID(rct *models.ReceiptModel) { values = append(values, rct.BlockNumber, rct.HeaderID, rct.TxID, rct.CID, rct.Contract, rct.PostState, rct.PostStatus) csw.rows <- tableRow{schema.TableReceipt, values} - indexerMetrics.receipts.Inc(1) + metrics.IndexerMetrics.ReceiptsCounter.Inc(1) } func (csw *CSVWriter) upsertLogCID(logs []*models.LogsModel) { @@ -267,7 +268,7 @@ func (csw *CSVWriter) upsertLogCID(logs []*models.LogsModel) { values = append(values, l.BlockNumber, l.HeaderID, l.CID, l.ReceiptID, l.Address, l.Index, l.Topic0, l.Topic1, l.Topic2, l.Topic3) csw.rows <- tableRow{schema.TableLog, values} - indexerMetrics.logs.Inc(1) + metrics.IndexerMetrics.LogsCounter.Inc(1) } } @@ -284,13 +285,8 @@ func (csw *CSVWriter) upsertStateCID(stateNode models.StateNodeModel) { } func (csw *CSVWriter) upsertStorageCID(storageCID models.StorageNodeModel) { - var storageKey string - if storageCID.StorageKey != nullHash.String() { - storageKey = storageCID.StorageKey - } - var values []interface{} - values = 
append(values, storageCID.BlockNumber, storageCID.HeaderID, storageCID.StateKey, storageKey, storageCID.CID, + values = append(values, storageCID.BlockNumber, storageCID.HeaderID, storageCID.StateKey, storageCID.StorageKey, storageCID.CID, true, storageCID.Value, storageCID.Removed) csw.rows <- tableRow{schema.TableStorageNode, values} } diff --git a/statediff/indexer/database/file/indexer.go b/statediff/indexer/database/file/indexer.go index cc1515b8b..ae9d8694a 100644 --- a/statediff/indexer/database/file/indexer.go +++ b/statediff/indexer/database/file/indexer.go @@ -34,9 +34,9 @@ import ( "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/log" - "github.com/ethereum/go-ethereum/metrics" "github.com/ethereum/go-ethereum/params" "github.com/ethereum/go-ethereum/rlp" + "github.com/ethereum/go-ethereum/statediff/indexer/database/metrics" "github.com/ethereum/go-ethereum/statediff/indexer/interfaces" "github.com/ethereum/go-ethereum/statediff/indexer/ipld" "github.com/ethereum/go-ethereum/statediff/indexer/models" @@ -53,10 +53,6 @@ const watchedAddressesInsert = "INSERT INTO eth_meta.watched_addresses (address, var _ interfaces.StateDiffIndexer = &StateDiffIndexer{} -var ( - indexerMetrics = RegisterIndexerMetrics(metrics.DefaultRegistry) -) - // StateDiffIndexer satisfies the indexer.StateDiffIndexer interface for ethereum statediff objects on top of a void type StateDiffIndexer struct { fileWriter FileWriter @@ -172,12 +168,12 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip BlockNumber: block.Number().String(), submit: func(self *BatchTx, err error) error { tDiff := time.Since(t) - indexerMetrics.tStateStoreCodeProcessing.Update(tDiff) + metrics.IndexerMetrics.StateStoreCodeProcessingTimer.Update(tDiff) traceMsg += fmt.Sprintf("state, storage, and code storage processing time: %s\r\n", tDiff.String()) t = time.Now() sdi.fileWriter.Flush() tDiff = time.Since(t) 
- indexerMetrics.tPostgresCommit.Update(tDiff) + metrics.IndexerMetrics.PostgresCommitTimer.Update(tDiff) traceMsg += fmt.Sprintf("postgres transaction commit duration: %s\r\n", tDiff.String()) traceMsg += fmt.Sprintf(" TOTAL PROCESSING DURATION: %s\r\n", time.Since(start).String()) log.Debug(traceMsg) @@ -185,21 +181,21 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip }, } tDiff := time.Since(t) - indexerMetrics.tFreePostgres.Update(tDiff) + metrics.IndexerMetrics.FreePostgresTimer.Update(tDiff) traceMsg += fmt.Sprintf("time spent waiting for free postgres tx: %s:\r\n", tDiff.String()) t = time.Now() // write header, collect headerID headerID := sdi.processHeader(block.Header(), headerNode, reward, totalDifficulty) tDiff = time.Since(t) - indexerMetrics.tHeaderProcessing.Update(tDiff) + metrics.IndexerMetrics.HeaderProcessingTimer.Update(tDiff) traceMsg += fmt.Sprintf("header processing time: %s\r\n", tDiff.String()) t = time.Now() // write uncles sdi.processUncles(headerID, block.Number(), block.UncleHash(), block.Uncles()) tDiff = time.Since(t) - indexerMetrics.tUncleProcessing.Update(tDiff) + metrics.IndexerMetrics.UncleProcessingTimer.Update(tDiff) traceMsg += fmt.Sprintf("uncle processing time: %s\r\n", tDiff.String()) t = time.Now() @@ -217,7 +213,7 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip return nil, err } tDiff = time.Since(t) - indexerMetrics.tTxAndRecProcessing.Update(tDiff) + metrics.IndexerMetrics.TxAndRecProcessingTimer.Update(tDiff) traceMsg += fmt.Sprintf("tx and receipt processing time: %s\r\n", tDiff.String()) t = time.Now() diff --git a/statediff/indexer/database/file/metrics.go b/statediff/indexer/database/file/metrics.go deleted file mode 100644 index ca6e88f2b..000000000 --- a/statediff/indexer/database/file/metrics.go +++ /dev/null @@ -1,94 +0,0 @@ -// VulcanizeDB -// Copyright © 2021 Vulcanize - -// This program is free software: you can redistribute it and/or 
modify -// it under the terms of the GNU Affero General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. - -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Affero General Public License for more details. - -// You should have received a copy of the GNU Affero General Public License -// along with this program. If not, see . - -package file - -import ( - "strings" - - "github.com/ethereum/go-ethereum/metrics" -) - -const ( - namespace = "statediff" -) - -// Build a fully qualified metric name -func metricName(subsystem, name string) string { - if name == "" { - return "" - } - parts := []string{namespace, name} - if subsystem != "" { - parts = []string{namespace, subsystem, name} - } - // Prometheus uses _ but geth metrics uses / and replaces - return strings.Join(parts, "/") -} - -type indexerMetricsHandles struct { - // The total number of processed blocks - blocks metrics.Counter - // The total number of processed transactions - transactions metrics.Counter - // The total number of processed receipts - receipts metrics.Counter - // The total number of processed logs - logs metrics.Counter - // The total number of access list entries processed - accessListEntries metrics.Counter - // Time spent waiting for free postgres tx - tFreePostgres metrics.Timer - // Postgres transaction commit duration - tPostgresCommit metrics.Timer - // Header processing time - tHeaderProcessing metrics.Timer - // Uncle processing time - tUncleProcessing metrics.Timer - // Tx and receipt processing time - tTxAndRecProcessing metrics.Timer - // State, storage, and code combined processing time - tStateStoreCodeProcessing metrics.Timer -} - -func RegisterIndexerMetrics(reg metrics.Registry) indexerMetricsHandles { - ctx := 
indexerMetricsHandles{ - blocks: metrics.NewCounter(), - transactions: metrics.NewCounter(), - receipts: metrics.NewCounter(), - logs: metrics.NewCounter(), - accessListEntries: metrics.NewCounter(), - tFreePostgres: metrics.NewTimer(), - tPostgresCommit: metrics.NewTimer(), - tHeaderProcessing: metrics.NewTimer(), - tUncleProcessing: metrics.NewTimer(), - tTxAndRecProcessing: metrics.NewTimer(), - tStateStoreCodeProcessing: metrics.NewTimer(), - } - subsys := "indexer" - reg.Register(metricName(subsys, "blocks"), ctx.blocks) - reg.Register(metricName(subsys, "transactions"), ctx.transactions) - reg.Register(metricName(subsys, "receipts"), ctx.receipts) - reg.Register(metricName(subsys, "logs"), ctx.logs) - reg.Register(metricName(subsys, "access_list_entries"), ctx.accessListEntries) - reg.Register(metricName(subsys, "t_free_postgres"), ctx.tFreePostgres) - reg.Register(metricName(subsys, "t_postgres_commit"), ctx.tPostgresCommit) - reg.Register(metricName(subsys, "t_header_processing"), ctx.tHeaderProcessing) - reg.Register(metricName(subsys, "t_uncle_processing"), ctx.tUncleProcessing) - reg.Register(metricName(subsys, "t_tx_receipt_processing"), ctx.tTxAndRecProcessing) - reg.Register(metricName(subsys, "t_state_store_code_processing"), ctx.tStateStoreCodeProcessing) - return ctx -} diff --git a/statediff/indexer/database/file/sql_writer.go b/statediff/indexer/database/file/sql_writer.go index c79ed843e..1e0acb21f 100644 --- a/statediff/indexer/database/file/sql_writer.go +++ b/statediff/indexer/database/file/sql_writer.go @@ -28,6 +28,7 @@ import ( "github.com/thoas/go-funk" "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/statediff/indexer/database/metrics" "github.com/ethereum/go-ethereum/statediff/indexer/ipld" "github.com/ethereum/go-ethereum/statediff/indexer/models" nodeinfo "github.com/ethereum/go-ethereum/statediff/indexer/node" @@ -35,7 +36,6 @@ import ( ) var ( - nullHash = 
common.HexToHash("0x0000000000000000000000000000000000000000000000000000000000000000") pipeSize = 65336 // min(linuxPipeSize, macOSPipeSize) writeBufferSize = pipeSize * 16 * 96 ) @@ -191,7 +191,7 @@ func (sqw *SQLWriter) upsertHeaderCID(header models.HeaderModel) { header.TotalDifficulty, formatPostgresStringArray(header.NodeIDs), header.Reward, header.StateRoot, header.TxRoot, header.RctRoot, header.UnclesHash, header.Bloom, header.Timestamp, header.Coinbase) sqw.stmts <- []byte(stmt) - indexerMetrics.blocks.Inc(1) + metrics.IndexerMetrics.BlocksCounter.Inc(1) } func (sqw *SQLWriter) upsertUncleCID(uncle models.UncleModel) { @@ -202,20 +202,20 @@ func (sqw *SQLWriter) upsertUncleCID(uncle models.UncleModel) { func (sqw *SQLWriter) upsertTransactionCID(transaction models.TxModel) { sqw.stmts <- []byte(fmt.Sprintf(txInsert, transaction.BlockNumber, transaction.HeaderID, transaction.TxHash, transaction.CID, transaction.Dst, transaction.Src, transaction.Index, transaction.Type, transaction.Value)) - indexerMetrics.transactions.Inc(1) + metrics.IndexerMetrics.TransactionsCounter.Inc(1) } func (sqw *SQLWriter) upsertReceiptCID(rct *models.ReceiptModel) { sqw.stmts <- []byte(fmt.Sprintf(rctInsert, rct.BlockNumber, rct.HeaderID, rct.TxID, rct.CID, rct.Contract, rct.PostState, rct.PostStatus)) - indexerMetrics.receipts.Inc(1) + metrics.IndexerMetrics.ReceiptsCounter.Inc(1) } func (sqw *SQLWriter) upsertLogCID(logs []*models.LogsModel) { for _, l := range logs { sqw.stmts <- []byte(fmt.Sprintf(logInsert, l.BlockNumber, l.HeaderID, l.CID, l.ReceiptID, l.Address, l.Index, l.Topic0, l.Topic1, l.Topic2, l.Topic3)) - indexerMetrics.logs.Inc(1) + metrics.IndexerMetrics.LogsCounter.Inc(1) } } diff --git a/statediff/indexer/database/metrics/metrics.go b/statediff/indexer/database/metrics/metrics.go new file mode 100644 index 000000000..34cb3a69b --- /dev/null +++ b/statediff/indexer/database/metrics/metrics.go @@ -0,0 +1,263 @@ +// VulcanizeDB +// Copyright © 2021 Vulcanize + +// 
This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. + +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package metrics + +import ( + "fmt" + "strings" + "time" + + "github.com/ethereum/go-ethereum/log" + + "github.com/ethereum/go-ethereum/metrics" +) + +const ( + namespace = "statediff" +) + +var ( + IndexerMetrics = RegisterIndexerMetrics(metrics.DefaultRegistry) + DBMetrics = RegisterDBMetrics(metrics.DefaultRegistry) +) + +// Build a fully qualified metric name +func metricName(subsystem, name string) string { + if name == "" { + return "" + } + parts := []string{namespace, name} + if subsystem != "" { + parts = []string{namespace, subsystem, name} + } + // Prometheus uses _ but geth metrics uses / and replaces + return strings.Join(parts, "/") +} + +type IndexerMetricsHandles struct { + // The total number of processed BlocksCounter + BlocksCounter metrics.Counter + // The total number of processed transactions + TransactionsCounter metrics.Counter + // The total number of processed receipts + ReceiptsCounter metrics.Counter + // The total number of processed logs + LogsCounter metrics.Counter + // The total number of access list entries processed + AccessListEntriesCounter metrics.Counter + // Time spent waiting for free postgres tx + FreePostgresTimer metrics.Timer + // Postgres transaction commit duration + PostgresCommitTimer metrics.Timer + // Header processing time + HeaderProcessingTimer metrics.Timer + // Uncle processing 
time + UncleProcessingTimer metrics.Timer + // Tx and receipt processing time + TxAndRecProcessingTimer metrics.Timer + // State, storage, and code combined processing time + StateStoreCodeProcessingTimer metrics.Timer + + // Fine-grained code timers + BuildStateDiffWithIntermediateStateNodesTimer metrics.Timer + BuildStateDiffWithoutIntermediateStateNodesTimer metrics.Timer + CreatedAndUpdatedStateWithIntermediateNodesTimer metrics.Timer + DeletedOrUpdatedStateTimer metrics.Timer + BuildAccountUpdatesTimer metrics.Timer + BuildAccountCreationsTimer metrics.Timer + ResolveNodeTimer metrics.Timer + SortKeysTimer metrics.Timer + FindIntersectionTimer metrics.Timer + OutputTimer metrics.Timer + IPLDOutputTimer metrics.Timer + DifferenceIteratorNextTimer metrics.Timer + DifferenceIteratorCounter metrics.Counter + DeletedOrUpdatedStorageTimer metrics.Timer + CreatedAndUpdatedStorageTimer metrics.Timer + BuildStorageNodesIncrementalTimer metrics.Timer + BuildStateTrieObjectTimer metrics.Timer + BuildStateTrieTimer metrics.Timer + BuildStateDiffObjectTimer metrics.Timer + WriteStateDiffObjectTimer metrics.Timer + CreatedAndUpdatedStateTimer metrics.Timer + BuildStorageNodesEventualTimer metrics.Timer + BuildStorageNodesFromTrieTimer metrics.Timer + BuildRemovedAccountStorageNodesTimer metrics.Timer + BuildRemovedStorageNodesFromTrieTimer metrics.Timer + IsWatchedAddressTimer metrics.Timer +} + +func RegisterIndexerMetrics(reg metrics.Registry) IndexerMetricsHandles { + ctx := IndexerMetricsHandles{ + BlocksCounter: metrics.NewCounter(), + TransactionsCounter: metrics.NewCounter(), + ReceiptsCounter: metrics.NewCounter(), + LogsCounter: metrics.NewCounter(), + AccessListEntriesCounter: metrics.NewCounter(), + FreePostgresTimer: metrics.NewTimer(), + PostgresCommitTimer: metrics.NewTimer(), + HeaderProcessingTimer: metrics.NewTimer(), + UncleProcessingTimer: metrics.NewTimer(), + TxAndRecProcessingTimer: metrics.NewTimer(), + StateStoreCodeProcessingTimer: 
metrics.NewTimer(), + BuildStateDiffWithIntermediateStateNodesTimer: metrics.NewTimer(), + BuildStateDiffWithoutIntermediateStateNodesTimer: metrics.NewTimer(), + CreatedAndUpdatedStateWithIntermediateNodesTimer: metrics.NewTimer(), + DeletedOrUpdatedStateTimer: metrics.NewTimer(), + BuildAccountUpdatesTimer: metrics.NewTimer(), + BuildAccountCreationsTimer: metrics.NewTimer(), + ResolveNodeTimer: metrics.NewTimer(), + SortKeysTimer: metrics.NewTimer(), + FindIntersectionTimer: metrics.NewTimer(), + OutputTimer: metrics.NewTimer(), + IPLDOutputTimer: metrics.NewTimer(), + DifferenceIteratorNextTimer: metrics.NewTimer(), + DifferenceIteratorCounter: metrics.NewCounter(), + DeletedOrUpdatedStorageTimer: metrics.NewTimer(), + CreatedAndUpdatedStorageTimer: metrics.NewTimer(), + BuildStorageNodesIncrementalTimer: metrics.NewTimer(), + BuildStateTrieObjectTimer: metrics.NewTimer(), + BuildStateTrieTimer: metrics.NewTimer(), + BuildStateDiffObjectTimer: metrics.NewTimer(), + WriteStateDiffObjectTimer: metrics.NewTimer(), + CreatedAndUpdatedStateTimer: metrics.NewTimer(), + BuildStorageNodesEventualTimer: metrics.NewTimer(), + BuildStorageNodesFromTrieTimer: metrics.NewTimer(), + BuildRemovedAccountStorageNodesTimer: metrics.NewTimer(), + BuildRemovedStorageNodesFromTrieTimer: metrics.NewTimer(), + IsWatchedAddressTimer: metrics.NewTimer(), + } + subsys := "indexer" + reg.Register(metricName(subsys, "blocks"), ctx.BlocksCounter) + reg.Register(metricName(subsys, "transactions"), ctx.TransactionsCounter) + reg.Register(metricName(subsys, "receipts"), ctx.ReceiptsCounter) + reg.Register(metricName(subsys, "logs"), ctx.LogsCounter) + reg.Register(metricName(subsys, "access_list_entries"), ctx.AccessListEntriesCounter) + reg.Register(metricName(subsys, "t_free_postgres"), ctx.FreePostgresTimer) + reg.Register(metricName(subsys, "t_postgres_commit"), ctx.PostgresCommitTimer) + reg.Register(metricName(subsys, "t_header_processing"), ctx.HeaderProcessingTimer) + 
reg.Register(metricName(subsys, "t_uncle_processing"), ctx.UncleProcessingTimer) + reg.Register(metricName(subsys, "t_tx_receipt_processing"), ctx.TxAndRecProcessingTimer) + reg.Register(metricName(subsys, "t_state_store_code_processing"), ctx.StateStoreCodeProcessingTimer) + reg.Register(metricName(subsys, "t_build_statediff_with_intermediate_state_nodes"), ctx.BuildStateDiffWithIntermediateStateNodesTimer) + reg.Register(metricName(subsys, "t_build_statediff_without_intermediate_state_nodes"), ctx.BuildStateDiffWithoutIntermediateStateNodesTimer) + reg.Register(metricName(subsys, "t_created_and_update_state_with_intermediate_nodes"), ctx.CreatedAndUpdatedStateWithIntermediateNodesTimer) + reg.Register(metricName(subsys, "t_deleted_or_updated_state"), ctx.DeletedOrUpdatedStateTimer) + reg.Register(metricName(subsys, "t_build_account_updates"), ctx.BuildAccountUpdatesTimer) + reg.Register(metricName(subsys, "t_build_account_creations"), ctx.BuildAccountCreationsTimer) + reg.Register(metricName(subsys, "t_resolve_node"), ctx.ResolveNodeTimer) + reg.Register(metricName(subsys, "t_sort_keys"), ctx.SortKeysTimer) + reg.Register(metricName(subsys, "t_find_intersection"), ctx.FindIntersectionTimer) + reg.Register(metricName(subsys, "t_output_fn"), ctx.OutputTimer) + reg.Register(metricName(subsys, "t_ipld_output_fn"), ctx.IPLDOutputTimer) + reg.Register(metricName(subsys, "t_difference_iterator_next"), ctx.DifferenceIteratorNextTimer) + reg.Register(metricName(subsys, "difference_iterator_counter"), ctx.DifferenceIteratorCounter) + reg.Register(metricName(subsys, "t_created_and_updated_storage"), ctx.CreatedAndUpdatedStorageTimer) + reg.Register(metricName(subsys, "t_deleted_or_updated_storage"), ctx.DeletedOrUpdatedStorageTimer) + reg.Register(metricName(subsys, "t_build_storage_nodes_incremental"), ctx.BuildStorageNodesIncrementalTimer) + reg.Register(metricName(subsys, "t_build_state_trie_object"), ctx.BuildStateTrieObjectTimer) + reg.Register(metricName(subsys, 
"t_build_state_trie"), ctx.BuildStateTrieTimer) + reg.Register(metricName(subsys, "t_build_statediff_object"), ctx.BuildStateDiffObjectTimer) + reg.Register(metricName(subsys, "t_write_statediff_object"), ctx.WriteStateDiffObjectTimer) + reg.Register(metricName(subsys, "t_created_and_updated_state"), ctx.CreatedAndUpdatedStateTimer) + reg.Register(metricName(subsys, "t_build_storage_nodes_eventual"), ctx.BuildStorageNodesEventualTimer) + reg.Register(metricName(subsys, "t_build_storage_nodes_from_trie"), ctx.BuildStorageNodesFromTrieTimer) + reg.Register(metricName(subsys, "t_build_removed_accounts_storage_nodes"), ctx.BuildRemovedAccountStorageNodesTimer) + reg.Register(metricName(subsys, "t_build_removed_storage_nodes_from_trie"), ctx.BuildRemovedStorageNodesFromTrieTimer) + reg.Register(metricName(subsys, "t_is_watched_address"), ctx.IsWatchedAddressTimer) + + log.Debug("Registering statediff indexer metrics.") + return ctx +} + +type dbMetricsHandles struct { + // Maximum number of open connections to the sql + maxOpen metrics.Gauge + // The number of established connections both in use and idle + open metrics.Gauge + // The number of connections currently in use + inUse metrics.Gauge + // The number of idle connections + idle metrics.Gauge + // The total number of connections waited for + waitedFor metrics.Counter + // The total time blocked waiting for a new connection + blockedMilliseconds metrics.Counter + // The total number of connections closed due to SetMaxIdleConns + closedMaxIdle metrics.Counter + // The total number of connections closed due to SetConnMaxLifetime + closedMaxLifetime metrics.Counter +} + +func RegisterDBMetrics(reg metrics.Registry) dbMetricsHandles { + ctx := dbMetricsHandles{ + maxOpen: metrics.NewGauge(), + open: metrics.NewGauge(), + inUse: metrics.NewGauge(), + idle: metrics.NewGauge(), + waitedFor: metrics.NewCounter(), + blockedMilliseconds: metrics.NewCounter(), + closedMaxIdle: metrics.NewCounter(), + closedMaxLifetime: 
metrics.NewCounter(), + } + subsys := "connections" + reg.Register(metricName(subsys, "max_open"), ctx.maxOpen) + reg.Register(metricName(subsys, "open"), ctx.open) + reg.Register(metricName(subsys, "in_use"), ctx.inUse) + reg.Register(metricName(subsys, "idle"), ctx.idle) + reg.Register(metricName(subsys, "waited_for"), ctx.waitedFor) + reg.Register(metricName(subsys, "blocked_milliseconds"), ctx.blockedMilliseconds) + reg.Register(metricName(subsys, "closed_max_idle"), ctx.closedMaxIdle) + reg.Register(metricName(subsys, "closed_max_lifetime"), ctx.closedMaxLifetime) + + log.Debug("Registering statediff DB metrics.") + return ctx +} + +// DbStats interface to accommodate different concrete sql stats types +type DbStats interface { + MaxOpen() int64 + Open() int64 + InUse() int64 + Idle() int64 + WaitCount() int64 + WaitDuration() time.Duration + MaxIdleClosed() int64 + MaxLifetimeClosed() int64 +} + +func (met *dbMetricsHandles) Update(stats DbStats) { + met.maxOpen.Update(stats.MaxOpen()) + met.open.Update(stats.Open()) + met.inUse.Update(stats.InUse()) + met.idle.Update(stats.Idle()) + met.waitedFor.Inc(stats.WaitCount()) + met.blockedMilliseconds.Inc(stats.WaitDuration().Milliseconds()) + met.closedMaxIdle.Inc(stats.MaxIdleClosed()) + met.closedMaxLifetime.Inc(stats.MaxLifetimeClosed()) +} + +func ReportAndUpdateDuration(msg string, start time.Time, logger log.Logger, timer metrics.Timer) { + since := UpdateDuration(start, timer) + logger.Debug(fmt.Sprintf("%s duration=%dms", msg, since.Milliseconds())) +} + +func UpdateDuration(start time.Time, timer metrics.Timer) time.Duration { + since := time.Since(start) + timer.Update(since) + return since +} diff --git a/statediff/indexer/database/sql/indexer.go b/statediff/indexer/database/sql/indexer.go index 2960c90c5..a41e1631a 100644 --- a/statediff/indexer/database/sql/indexer.go +++ b/statediff/indexer/database/sql/indexer.go @@ -35,6 +35,7 @@ import ( "github.com/ethereum/go-ethereum/metrics" 
"github.com/ethereum/go-ethereum/params" "github.com/ethereum/go-ethereum/rlp" + metrics2 "github.com/ethereum/go-ethereum/statediff/indexer/database/metrics" "github.com/ethereum/go-ethereum/statediff/indexer/interfaces" "github.com/ethereum/go-ethereum/statediff/indexer/ipld" "github.com/ethereum/go-ethereum/statediff/indexer/models" @@ -44,11 +45,6 @@ import ( var _ interfaces.StateDiffIndexer = &StateDiffIndexer{} -var ( - indexerMetrics = RegisterIndexerMetrics(metrics.DefaultRegistry) - dbMetrics = RegisterDBMetrics(metrics.DefaultRegistry) -) - // StateDiffIndexer satisfies the indexer.StateDiffIndexer interface for ethereum statediff objects on top of an SQL sql type StateDiffIndexer struct { ctx context.Context @@ -75,7 +71,7 @@ func (sdi *StateDiffIndexer) ReportDBMetrics(delay time.Duration, quit <-chan bo for { select { case <-ticker.C: - dbMetrics.Update(sdi.dbWriter.db.Stats()) + metrics2.DBMetrics.Update(sdi.dbWriter.db.Stats()) case <-quit: ticker.Stop() return @@ -156,7 +152,7 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip rollback(sdi.ctx, tx) } else { tDiff := time.Since(t) - indexerMetrics.tStateStoreCodeProcessing.Update(tDiff) + metrics2.IndexerMetrics.StateStoreCodeProcessingTimer.Update(tDiff) traceMsg += fmt.Sprintf("state, storage, and code storage processing time: %s\r\n", tDiff.String()) t = time.Now() if err := self.flush(); err != nil { @@ -167,7 +163,7 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip } err = tx.Commit(sdi.ctx) tDiff = time.Since(t) - indexerMetrics.tPostgresCommit.Update(tDiff) + metrics2.IndexerMetrics.PostgresCommitTimer.Update(tDiff) traceMsg += fmt.Sprintf("postgres transaction commit duration: %s\r\n", tDiff.String()) } traceMsg += fmt.Sprintf(" TOTAL PROCESSING DURATION: %s\r\n", time.Since(start).String()) @@ -178,7 +174,7 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip go blockTx.cache() tDiff := 
time.Since(t) - indexerMetrics.tFreePostgres.Update(tDiff) + metrics2.IndexerMetrics.FreePostgresTimer.Update(tDiff) traceMsg += fmt.Sprintf("time spent waiting for free postgres tx: %s:\r\n", tDiff.String()) t = time.Now() @@ -190,7 +186,7 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip return nil, err } tDiff = time.Since(t) - indexerMetrics.tHeaderProcessing.Update(tDiff) + metrics2.IndexerMetrics.HeaderProcessingTimer.Update(tDiff) traceMsg += fmt.Sprintf("header processing time: %s\r\n", tDiff.String()) t = time.Now() // Publish and index uncles @@ -199,7 +195,7 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip return nil, err } tDiff = time.Since(t) - indexerMetrics.tUncleProcessing.Update(tDiff) + metrics2.IndexerMetrics.UncleProcessingTimer.Update(tDiff) traceMsg += fmt.Sprintf("uncle processing time: %s\r\n", tDiff.String()) t = time.Now() // Publish and index receipts and txs @@ -216,7 +212,7 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip return nil, err } tDiff = time.Since(t) - indexerMetrics.tTxAndRecProcessing.Update(tDiff) + metrics2.IndexerMetrics.TxAndRecProcessingTimer.Update(tDiff) traceMsg += fmt.Sprintf("tx and receipt processing time: %s\r\n", tDiff.String()) t = time.Now() diff --git a/statediff/indexer/database/sql/interfaces.go b/statediff/indexer/database/sql/interfaces.go index 3fee858d6..f964a2a90 100644 --- a/statediff/indexer/database/sql/interfaces.go +++ b/statediff/indexer/database/sql/interfaces.go @@ -19,7 +19,8 @@ package sql import ( "context" "io" - "time" + + "github.com/ethereum/go-ethereum/statediff/indexer/database/metrics" ) // Database interfaces required by the sql indexer @@ -30,12 +31,13 @@ type Database interface { // Driver interface has all the methods required by a driver implementation to support the sql indexer type Driver interface { + UseCopyFrom() bool QueryRow(ctx context.Context, sql string, args 
...interface{}) ScannableRow Exec(ctx context.Context, sql string, args ...interface{}) (Result, error) Select(ctx context.Context, dest interface{}, query string, args ...interface{}) error Get(ctx context.Context, dest interface{}, query string, args ...interface{}) error Begin(ctx context.Context) (Tx, error) - Stats() Stats + Stats() metrics.DbStats NodeID() string Context() context.Context io.Closer @@ -52,12 +54,25 @@ type Statements interface { InsertStorageStm() string InsertIPLDStm() string InsertIPLDsStm() string + + // Table/column descriptions for use with CopyFrom and similar commands. + LogTableName() []string + LogColumnNames() []string + RctTableName() []string + RctColumnNames() []string + StateTableName() []string + StateColumnNames() []string + StorageTableName() []string + StorageColumnNames() []string + TxTableName() []string + TxColumnNames() []string } // Tx interface to accommodate different concrete SQL transaction types type Tx interface { QueryRow(ctx context.Context, sql string, args ...interface{}) ScannableRow Exec(ctx context.Context, sql string, args ...interface{}) (Result, error) + CopyFrom(ctx context.Context, tableName []string, columnNames []string, rows [][]interface{}) (int64, error) Commit(ctx context.Context) error Rollback(ctx context.Context) error } @@ -71,15 +86,3 @@ type ScannableRow interface { type Result interface { RowsAffected() (int64, error) } - -// Stats interface to accommodate different concrete sql stats types -type Stats interface { - MaxOpen() int64 - Open() int64 - InUse() int64 - Idle() int64 - WaitCount() int64 - WaitDuration() time.Duration - MaxIdleClosed() int64 - MaxLifetimeClosed() int64 -} diff --git a/statediff/indexer/database/sql/lazy_tx.go b/statediff/indexer/database/sql/lazy_tx.go index 922bf84a0..b2445e0d8 100644 --- a/statediff/indexer/database/sql/lazy_tx.go +++ b/statediff/indexer/database/sql/lazy_tx.go @@ -2,10 +2,16 @@ package sql import ( "context" + "reflect" + + 
"github.com/ethereum/go-ethereum/log" ) +// Changing this to 1 would make sure only sequential COPYs were combined. +const copyFromCheckLimit = 100 + type DelayedTx struct { - cache []cachedStmt + cache []interface{} db Database } type cachedStmt struct { @@ -13,6 +19,20 @@ type cachedStmt struct { args []interface{} } +type copyFrom struct { + tableName []string + columnNames []string + rows [][]interface{} +} + +func (cf *copyFrom) appendRows(rows [][]interface{}) { + cf.rows = append(cf.rows, rows...) +} + +func (cf *copyFrom) matches(tableName []string, columnNames []string) bool { + return reflect.DeepEqual(cf.tableName, tableName) && reflect.DeepEqual(cf.columnNames, columnNames) +} + func NewDelayedTx(db Database) *DelayedTx { return &DelayedTx{db: db} } @@ -21,6 +41,28 @@ func (tx *DelayedTx) QueryRow(ctx context.Context, sql string, args ...interface) { return tx.db.QueryRow(ctx, sql, args...) } +func (tx *DelayedTx) findPrevCopyFrom(tableName []string, columnNames []string, limit int) (*copyFrom, int) { + for pos, count := len(tx.cache)-1, 0; pos >= 0 && count < limit; pos, count = pos-1, count+1 { + prevCopy, ok := tx.cache[pos].(*copyFrom) + if ok && prevCopy.matches(tableName, columnNames) { + return prevCopy, count + } + } + return nil, -1 +} + +func (tx *DelayedTx) CopyFrom(ctx context.Context, tableName []string, columnNames []string, rows [][]interface{}) (int64, error) { + if prevCopy, distance := tx.findPrevCopyFrom(tableName, columnNames, copyFromCheckLimit); nil != prevCopy { + log.Trace("statediff lazy_tx : Appending to COPY", "table", tableName, + "current", len(prevCopy.rows), "new", len(rows), "distance", distance) + prevCopy.appendRows(rows) + } else { + tx.cache = append(tx.cache, &copyFrom{tableName, columnNames, rows}) + } + + return 0, nil +} + func (tx *DelayedTx) Exec(ctx context.Context, sql string, args ...interface{}) (Result, error) { tx.cache = append(tx.cache, cachedStmt{sql, args}) return nil, nil @@ -39,10 +81,19 @@ func (tx
*DelayedTx) Commit(ctx context.Context) error { rollback(ctx, base) } }() - for _, stmt := range tx.cache { - _, err := base.Exec(ctx, stmt.sql, stmt.args...) - if err != nil { - return err + for _, item := range tx.cache { + switch item := item.(type) { + case *copyFrom: + _, err := base.CopyFrom(ctx, item.tableName, item.columnNames, item.rows) + if err != nil { + log.Error("COPY error", "table", item.tableName, "err", err) + return err + } + case cachedStmt: + _, err := base.Exec(ctx, item.sql, item.args...) + if err != nil { + return err + } } } tx.cache = nil diff --git a/statediff/indexer/database/sql/metrics.go b/statediff/indexer/database/sql/metrics.go deleted file mode 100644 index b0946a722..000000000 --- a/statediff/indexer/database/sql/metrics.go +++ /dev/null @@ -1,147 +0,0 @@ -// VulcanizeDB -// Copyright © 2021 Vulcanize - -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU Affero General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. - -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Affero General Public License for more details. - -// You should have received a copy of the GNU Affero General Public License -// along with this program. If not, see . 
- -package sql - -import ( - "strings" - - "github.com/ethereum/go-ethereum/metrics" -) - -const ( - namespace = "statediff" -) - -// Build a fully qualified metric name -func metricName(subsystem, name string) string { - if name == "" { - return "" - } - parts := []string{namespace, name} - if subsystem != "" { - parts = []string{namespace, subsystem, name} - } - // Prometheus uses _ but geth metrics uses / and replaces - return strings.Join(parts, "/") -} - -type indexerMetricsHandles struct { - // The total number of processed blocks - blocks metrics.Counter - // The total number of processed transactions - transactions metrics.Counter - // The total number of processed receipts - receipts metrics.Counter - // The total number of processed logs - logs metrics.Counter - // The total number of access list entries processed - accessListEntries metrics.Counter - // Time spent waiting for free postgres tx - tFreePostgres metrics.Timer - // Postgres transaction commit duration - tPostgresCommit metrics.Timer - // Header processing time - tHeaderProcessing metrics.Timer - // Uncle processing time - tUncleProcessing metrics.Timer - // Tx and receipt processing time - tTxAndRecProcessing metrics.Timer - // State, storage, and code combined processing time - tStateStoreCodeProcessing metrics.Timer -} - -func RegisterIndexerMetrics(reg metrics.Registry) indexerMetricsHandles { - ctx := indexerMetricsHandles{ - blocks: metrics.NewCounter(), - transactions: metrics.NewCounter(), - receipts: metrics.NewCounter(), - logs: metrics.NewCounter(), - accessListEntries: metrics.NewCounter(), - tFreePostgres: metrics.NewTimer(), - tPostgresCommit: metrics.NewTimer(), - tHeaderProcessing: metrics.NewTimer(), - tUncleProcessing: metrics.NewTimer(), - tTxAndRecProcessing: metrics.NewTimer(), - tStateStoreCodeProcessing: metrics.NewTimer(), - } - subsys := "indexer" - reg.Register(metricName(subsys, "blocks"), ctx.blocks) - reg.Register(metricName(subsys, "transactions"), 
ctx.transactions) - reg.Register(metricName(subsys, "receipts"), ctx.receipts) - reg.Register(metricName(subsys, "logs"), ctx.logs) - reg.Register(metricName(subsys, "access_list_entries"), ctx.accessListEntries) - reg.Register(metricName(subsys, "t_free_postgres"), ctx.tFreePostgres) - reg.Register(metricName(subsys, "t_postgres_commit"), ctx.tPostgresCommit) - reg.Register(metricName(subsys, "t_header_processing"), ctx.tHeaderProcessing) - reg.Register(metricName(subsys, "t_uncle_processing"), ctx.tUncleProcessing) - reg.Register(metricName(subsys, "t_tx_receipt_processing"), ctx.tTxAndRecProcessing) - reg.Register(metricName(subsys, "t_state_store_code_processing"), ctx.tStateStoreCodeProcessing) - return ctx -} - -type dbMetricsHandles struct { - // Maximum number of open connections to the sql - maxOpen metrics.Gauge - // The number of established connections both in use and idle - open metrics.Gauge - // The number of connections currently in use - inUse metrics.Gauge - // The number of idle connections - idle metrics.Gauge - // The total number of connections waited for - waitedFor metrics.Counter - // The total time blocked waiting for a new connection - blockedMilliseconds metrics.Counter - // The total number of connections closed due to SetMaxIdleConns - closedMaxIdle metrics.Counter - // The total number of connections closed due to SetConnMaxLifetime - closedMaxLifetime metrics.Counter -} - -func RegisterDBMetrics(reg metrics.Registry) dbMetricsHandles { - ctx := dbMetricsHandles{ - maxOpen: metrics.NewGauge(), - open: metrics.NewGauge(), - inUse: metrics.NewGauge(), - idle: metrics.NewGauge(), - waitedFor: metrics.NewCounter(), - blockedMilliseconds: metrics.NewCounter(), - closedMaxIdle: metrics.NewCounter(), - closedMaxLifetime: metrics.NewCounter(), - } - subsys := "connections" - reg.Register(metricName(subsys, "max_open"), ctx.maxOpen) - reg.Register(metricName(subsys, "open"), ctx.open) - reg.Register(metricName(subsys, "in_use"), ctx.inUse) - 
reg.Register(metricName(subsys, "idle"), ctx.idle) - reg.Register(metricName(subsys, "waited_for"), ctx.waitedFor) - reg.Register(metricName(subsys, "blocked_milliseconds"), ctx.blockedMilliseconds) - reg.Register(metricName(subsys, "closed_max_idle"), ctx.closedMaxIdle) - reg.Register(metricName(subsys, "closed_max_lifetime"), ctx.closedMaxLifetime) - return ctx -} - -func (met *dbMetricsHandles) Update(stats Stats) { - met.maxOpen.Update(stats.MaxOpen()) - met.open.Update(stats.Open()) - met.inUse.Update(stats.InUse()) - met.idle.Update(stats.Idle()) - met.waitedFor.Inc(stats.WaitCount()) - met.blockedMilliseconds.Inc(stats.WaitDuration().Milliseconds()) - met.closedMaxIdle.Inc(stats.MaxIdleClosed()) - met.closedMaxLifetime.Inc(stats.MaxLifetimeClosed()) -} diff --git a/statediff/indexer/database/sql/pgx_indexer_legacy_test.go b/statediff/indexer/database/sql/pgx_indexer_legacy_test.go index 292548b75..80094a8d0 100644 --- a/statediff/indexer/database/sql/pgx_indexer_legacy_test.go +++ b/statediff/indexer/database/sql/pgx_indexer_legacy_test.go @@ -28,7 +28,7 @@ import ( ) func setupLegacyPGXIndexer(t *testing.T) { - db, err = postgres.SetupPGXDB() + db, err = postgres.SetupPGXDB(postgres.DefaultConfig) if err != nil { t.Fatal(err) } diff --git a/statediff/indexer/database/sql/pgx_indexer_test.go b/statediff/indexer/database/sql/pgx_indexer_test.go index 1dbf2dfa0..c0ce57c1f 100644 --- a/statediff/indexer/database/sql/pgx_indexer_test.go +++ b/statediff/indexer/database/sql/pgx_indexer_test.go @@ -29,8 +29,8 @@ import ( "github.com/ethereum/go-ethereum/statediff/indexer/test" ) -func setupPGXIndexer(t *testing.T) { - db, err = postgres.SetupPGXDB() +func setupPGXIndexer(t *testing.T, config postgres.Config) { + db, err = postgres.SetupPGXDB(config) if err != nil { t.Fatal(err) } @@ -39,12 +39,16 @@ func setupPGXIndexer(t *testing.T) { } func setupPGX(t *testing.T) { - setupPGXIndexer(t) + setupPGXWithConfig(t, postgres.DefaultConfig) +} + +func 
setupPGXWithConfig(t *testing.T, config postgres.Config) { + setupPGXIndexer(t, config) test.SetupTestData(t, ind) } func setupPGXNonCanonical(t *testing.T) { - setupPGXIndexer(t) + setupPGXIndexer(t, postgres.DefaultConfig) test.SetupTestDataNonCanonical(t, ind) } @@ -97,6 +101,20 @@ func TestPGXIndexer(t *testing.T) { test.TestPublishAndIndexStorageIPLDs(t, db) }) + + t.Run("Publish and index with CopyFrom enabled.", func(t *testing.T) { + config := postgres.DefaultConfig + config.CopyFrom = true + + setupPGXWithConfig(t, config) + defer tearDown(t) + defer checkTxClosure(t, 1, 0, 1) + + test.TestPublishAndIndexStateIPLDs(t, db) + test.TestPublishAndIndexStorageIPLDs(t, db) + test.TestPublishAndIndexReceiptIPLDs(t, db) + test.TestPublishAndIndexLogIPLDs(t, db) + }) } // Test indexer for a canonical + a non-canonical block at London height + a non-canonical block at London height + 1 @@ -151,7 +169,7 @@ func TestPGXIndexerNonCanonical(t *testing.T) { } func TestPGXWatchAddressMethods(t *testing.T) { - setupPGXIndexer(t) + setupPGXIndexer(t, postgres.DefaultConfig) defer tearDown(t) defer checkTxClosure(t, 1, 0, 1) diff --git a/statediff/indexer/database/sql/postgres/config.go b/statediff/indexer/database/sql/postgres/config.go index 2038bf1f5..1cbec55c1 100644 --- a/statediff/indexer/database/sql/postgres/config.go +++ b/statediff/indexer/database/sql/postgres/config.go @@ -92,6 +92,9 @@ type Config struct { // toggle on/off upserts Upsert bool + + // toggle on/off CopyFrom + CopyFrom bool } // Type satisfies interfaces.Config diff --git a/statediff/indexer/database/sql/postgres/database.go b/statediff/indexer/database/sql/postgres/database.go index a508da83f..b371a83b2 100644 --- a/statediff/indexer/database/sql/postgres/database.go +++ b/statediff/indexer/database/sql/postgres/database.go @@ -84,3 +84,43 @@ func (db *DB) InsertIPLDStm() string { func (db *DB) InsertIPLDsStm() string { return `INSERT INTO ipld.blocks (block_number, key, data) VALUES 
(unnest($1::BIGINT[]), unnest($2::TEXT[]), unnest($3::BYTEA[])) ON CONFLICT DO NOTHING` } + +func (db *DB) LogTableName() []string { + return []string{"eth", "log_cids"} +} + +func (db *DB) LogColumnNames() []string { + return []string{"block_number", "header_id", "cid", "rct_id", "address", "index", "topic0", "topic1", "topic2", "topic3"} +} + +func (db *DB) RctTableName() []string { + return []string{"eth", "receipt_cids"} +} + +func (db *DB) RctColumnNames() []string { + return []string{"block_number", "header_id", "tx_id", "cid", "contract", "post_state", "post_status"} +} + +func (db *DB) StateTableName() []string { + return []string{"eth", "state_cids"} +} + +func (db *DB) StateColumnNames() []string { + return []string{"block_number", "header_id", "state_leaf_key", "cid", "diff", "balance", "nonce", "code_hash", "storage_root", "removed"} +} + +func (db *DB) StorageTableName() []string { + return []string{"eth", "storage_cids"} +} + +func (db *DB) StorageColumnNames() []string { + return []string{"block_number", "header_id", "state_leaf_key", "storage_leaf_key", "cid", "diff", "val", "removed"} +} + +func (db *DB) TxTableName() []string { + return []string{"eth", "transaction_cids"} +} + +func (db *DB) TxColumnNames() []string { + return []string{"block_number", "header_id", "tx_hash", "cid", "dst", "src", "index", "tx_type", "value"} +} diff --git a/statediff/indexer/database/sql/postgres/pgx.go b/statediff/indexer/database/sql/postgres/pgx.go index 073d92744..7825e34bb 100644 --- a/statediff/indexer/database/sql/postgres/pgx.go +++ b/statediff/indexer/database/sql/postgres/pgx.go @@ -27,6 +27,7 @@ import ( "github.com/jackc/pgx/v4" "github.com/jackc/pgx/v4/pgxpool" + "github.com/ethereum/go-ethereum/statediff/indexer/database/metrics" "github.com/ethereum/go-ethereum/statediff/indexer/database/sql" "github.com/ethereum/go-ethereum/statediff/indexer/node" ) @@ -37,6 +38,7 @@ type PGXDriver struct { pool *pgxpool.Pool nodeInfo node.Info nodeID string + 
config Config } // ConnectPGX initializes and returns a PGX connection pool @@ -55,7 +57,7 @@ func NewPGXDriver(ctx context.Context, config Config, node node.Info) (*PGXDrive if err != nil { return nil, ErrDBConnectionFailed(err) } - pg := &PGXDriver{ctx: ctx, pool: dbPool, nodeInfo: node} + pg := &PGXDriver{ctx: ctx, pool: dbPool, nodeInfo: node, config: config} nodeErr := pg.createNode() if nodeErr != nil { return &PGXDriver{}, ErrUnableToSetNode(nodeErr) @@ -144,7 +146,7 @@ func (pgx *PGXDriver) Begin(ctx context.Context) (sql.Tx, error) { return pgxTxWrapper{tx: tx}, nil } -func (pgx *PGXDriver) Stats() sql.Stats { +func (pgx *PGXDriver) Stats() metrics.DbStats { stats := pgx.pool.Stat() return pgxStatsWrapper{stats: stats} } @@ -165,6 +167,11 @@ func (pgx *PGXDriver) Context() context.Context { return pgx.ctx } +// HasCopy satisfies sql.Database +func (pgx *PGXDriver) UseCopyFrom() bool { + return pgx.config.CopyFrom +} + type resultWrapper struct { ct pgconn.CommandTag } @@ -178,43 +185,43 @@ type pgxStatsWrapper struct { stats *pgxpool.Stat } -// MaxOpen satisfies sql.Stats +// MaxOpen satisfies metrics.DbStats func (s pgxStatsWrapper) MaxOpen() int64 { return int64(s.stats.MaxConns()) } -// Open satisfies sql.Stats +// Open satisfies metrics.DbStats func (s pgxStatsWrapper) Open() int64 { return int64(s.stats.TotalConns()) } -// InUse satisfies sql.Stats +// InUse satisfies metrics.DbStats func (s pgxStatsWrapper) InUse() int64 { return int64(s.stats.AcquiredConns()) } -// Idle satisfies sql.Stats +// Idle satisfies metrics.DbStats func (s pgxStatsWrapper) Idle() int64 { return int64(s.stats.IdleConns()) } -// WaitCount satisfies sql.Stats +// WaitCount satisfies metrics.DbStats func (s pgxStatsWrapper) WaitCount() int64 { return s.stats.EmptyAcquireCount() } -// WaitDuration satisfies sql.Stats +// WaitDuration satisfies metrics.DbStats func (s pgxStatsWrapper) WaitDuration() time.Duration { return s.stats.AcquireDuration() } -// MaxIdleClosed satisfies 
sql.Stats +// MaxIdleClosed satisfies metrics.DbStats func (s pgxStatsWrapper) MaxIdleClosed() int64 { // this stat isn't supported by pgxpool, but we don't want to panic return 0 } -// MaxLifetimeClosed satisfies sql.Stats +// MaxLifetimeClosed satisfies metrics.DbStats func (s pgxStatsWrapper) MaxLifetimeClosed() int64 { return s.stats.CanceledAcquireCount() } @@ -243,3 +250,7 @@ func (t pgxTxWrapper) Commit(ctx context.Context) error { func (t pgxTxWrapper) Rollback(ctx context.Context) error { return t.tx.Rollback(ctx) } + +func (t pgxTxWrapper) CopyFrom(ctx context.Context, tableName []string, columnNames []string, rows [][]interface{}) (int64, error) { + return t.tx.CopyFrom(ctx, tableName, columnNames, pgx.CopyFromRows(rows)) +} diff --git a/statediff/indexer/database/sql/postgres/sqlx.go b/statediff/indexer/database/sql/postgres/sqlx.go index c41d39828..452b4988a 100644 --- a/statediff/indexer/database/sql/postgres/sqlx.go +++ b/statediff/indexer/database/sql/postgres/sqlx.go @@ -19,10 +19,12 @@ package postgres import ( "context" coresql "database/sql" + "errors" "time" "github.com/jmoiron/sqlx" + "github.com/ethereum/go-ethereum/statediff/indexer/database/metrics" "github.com/ethereum/go-ethereum/statediff/indexer/database/sql" "github.com/ethereum/go-ethereum/statediff/indexer/node" ) @@ -109,7 +111,7 @@ func (driver *SQLXDriver) Begin(_ context.Context) (sql.Tx, error) { return sqlxTxWrapper{tx: tx}, nil } -func (driver *SQLXDriver) Stats() sql.Stats { +func (driver *SQLXDriver) Stats() metrics.DbStats { stats := driver.db.Stats() return sqlxStatsWrapper{stats: stats} } @@ -129,46 +131,52 @@ func (driver *SQLXDriver) Context() context.Context { return driver.ctx } +// HasCopy satisfies sql.Database +func (driver *SQLXDriver) UseCopyFrom() bool { + // sqlx does not currently support COPY. 
+ return false +} + type sqlxStatsWrapper struct { stats coresql.DBStats } -// MaxOpen satisfies sql.Stats +// MaxOpen satisfies metrics.DbStats func (s sqlxStatsWrapper) MaxOpen() int64 { return int64(s.stats.MaxOpenConnections) } -// Open satisfies sql.Stats +// Open satisfies metrics.DbStats func (s sqlxStatsWrapper) Open() int64 { return int64(s.stats.OpenConnections) } -// InUse satisfies sql.Stats +// InUse satisfies metrics.DbStats func (s sqlxStatsWrapper) InUse() int64 { return int64(s.stats.InUse) } -// Idle satisfies sql.Stats +// Idle satisfies metrics.DbStats func (s sqlxStatsWrapper) Idle() int64 { return int64(s.stats.Idle) } -// WaitCount satisfies sql.Stats +// WaitCount satisfies metrics.DbStats func (s sqlxStatsWrapper) WaitCount() int64 { return s.stats.WaitCount } -// WaitDuration satisfies sql.Stats +// WaitDuration satisfies metrics.DbStats func (s sqlxStatsWrapper) WaitDuration() time.Duration { return s.stats.WaitDuration } -// MaxIdleClosed satisfies sql.Stats +// MaxIdleClosed satisfies metrics.DbStats func (s sqlxStatsWrapper) MaxIdleClosed() int64 { return s.stats.MaxIdleClosed } -// MaxLifetimeClosed satisfies sql.Stats +// MaxLifetimeClosed satisfies metrics.DbStats func (s sqlxStatsWrapper) MaxLifetimeClosed() int64 { return s.stats.MaxLifetimeClosed } @@ -196,3 +204,7 @@ func (t sqlxTxWrapper) Commit(ctx context.Context) error { func (t sqlxTxWrapper) Rollback(ctx context.Context) error { return t.tx.Rollback() } + +func (t sqlxTxWrapper) CopyFrom(ctx context.Context, tableName []string, columnNames []string, rows [][]interface{}) (int64, error) { + return 0, errors.New("Unsupported Operation") +} diff --git a/statediff/indexer/database/sql/postgres/test_helpers.go b/statediff/indexer/database/sql/postgres/test_helpers.go index f8311b413..cb5255429 100644 --- a/statediff/indexer/database/sql/postgres/test_helpers.go +++ b/statediff/indexer/database/sql/postgres/test_helpers.go @@ -35,8 +35,8 @@ func SetupSQLXDB() (sql.Database, 
error) { } // SetupPGXDB is used to setup a pgx db for tests -func SetupPGXDB() (sql.Database, error) { - driver, err := NewPGXDriver(context.Background(), DefaultConfig, node.Info{}) +func SetupPGXDB(config Config) (sql.Database, error) { + driver, err := NewPGXDriver(context.Background(), config, node.Info{}) if err != nil { return nil, err } diff --git a/statediff/indexer/database/sql/writer.go b/statediff/indexer/database/sql/writer.go index bd6fb5b67..faa63cd21 100644 --- a/statediff/indexer/database/sql/writer.go +++ b/statediff/indexer/database/sql/writer.go @@ -18,9 +18,11 @@ package sql import ( "fmt" + "strconv" "github.com/lib/pq" + "github.com/ethereum/go-ethereum/statediff/indexer/database/metrics" "github.com/ethereum/go-ethereum/statediff/indexer/models" ) @@ -66,7 +68,7 @@ func (w *Writer) upsertHeaderCID(tx Tx, header models.HeaderModel) error { if err != nil { return insertError{"eth.header_cids", err, w.db.InsertHeaderStm(), header} } - indexerMetrics.blocks.Inc(1) + metrics.IndexerMetrics.BlocksCounter.Inc(1) return nil } @@ -94,20 +96,39 @@ INSERT INTO eth.transaction_cids (block_number, header_id, tx_hash, cid, dst, sr ON CONFLICT (tx_hash, header_id, block_number) DO NOTHING */ func (w *Writer) upsertTransactionCID(tx Tx, transaction models.TxModel) error { - _, err := tx.Exec(w.db.Context(), w.db.InsertTxStm(), - transaction.BlockNumber, - transaction.HeaderID, - transaction.TxHash, - transaction.CID, - transaction.Dst, - transaction.Src, - transaction.Index, - transaction.Type, - transaction.Value) - if err != nil { - return insertError{"eth.transaction_cids", err, w.db.InsertTxStm(), transaction} + if w.useCopyForTx(tx) { + blockNum, err := strconv.ParseInt(transaction.BlockNumber, 10, 64) + if err != nil { + return insertError{"eth.transaction_cids", err, "COPY", transaction} + } + + value, err := strconv.ParseFloat(transaction.Value, 64) + if err != nil { + return insertError{"eth.transaction_cids", err, "COPY", transaction} + } + + _, 
err = tx.CopyFrom(w.db.Context(), w.db.TxTableName(), w.db.TxColumnNames(), + toRows(toRow(blockNum, transaction.HeaderID, transaction.TxHash, transaction.CID, transaction.Dst, + transaction.Src, transaction.Index, int(transaction.Type), value))) + if err != nil { + return insertError{"eth.transaction_cids", err, "COPY", transaction} + } + } else { + _, err := tx.Exec(w.db.Context(), w.db.InsertTxStm(), + transaction.BlockNumber, + transaction.HeaderID, + transaction.TxHash, + transaction.CID, + transaction.Dst, + transaction.Src, + transaction.Index, + transaction.Type, + transaction.Value) + if err != nil { + return insertError{"eth.transaction_cids", err, w.db.InsertTxStm(), transaction} + } } - indexerMetrics.transactions.Inc(1) + metrics.IndexerMetrics.TransactionsCounter.Inc(1) return nil } @@ -116,18 +137,32 @@ INSERT INTO eth.receipt_cids (block_number, header_id, tx_id, cid, contract, pos ON CONFLICT (tx_id, header_id, block_number) DO NOTHING */ func (w *Writer) upsertReceiptCID(tx Tx, rct *models.ReceiptModel) error { - _, err := tx.Exec(w.db.Context(), w.db.InsertRctStm(), - rct.BlockNumber, - rct.HeaderID, - rct.TxID, - rct.CID, - rct.Contract, - rct.PostState, - rct.PostStatus) - if err != nil { - return insertError{"eth.receipt_cids", err, w.db.InsertRctStm(), *rct} + if w.useCopyForTx(tx) { + blockNum, err := strconv.ParseInt(rct.BlockNumber, 10, 64) + if err != nil { + return insertError{"eth.receipt_cids", err, "COPY", rct} + } + + _, err = tx.CopyFrom(w.db.Context(), w.db.RctTableName(), w.db.RctColumnNames(), + toRows(toRow(blockNum, rct.HeaderID, rct.TxID, rct.CID, rct.Contract, + rct.PostState, int(rct.PostStatus)))) + if err != nil { + return insertError{"eth.receipt_cids", err, "COPY", rct} + } + } else { + _, err := tx.Exec(w.db.Context(), w.db.InsertRctStm(), + rct.BlockNumber, + rct.HeaderID, + rct.TxID, + rct.CID, + rct.Contract, + rct.PostState, + rct.PostStatus) + if err != nil { + return insertError{"eth.receipt_cids", err, 
w.db.InsertRctStm(), *rct} + } } - indexerMetrics.receipts.Inc(1) + metrics.IndexerMetrics.ReceiptsCounter.Inc(1) return nil } @@ -136,22 +171,42 @@ INSERT INTO eth.log_cids (block_number, header_id, cid, rct_id, address, index, ON CONFLICT (rct_id, index, header_id, block_number) DO NOTHING */ func (w *Writer) upsertLogCID(tx Tx, logs []*models.LogsModel) error { - for _, log := range logs { - _, err := tx.Exec(w.db.Context(), w.db.InsertLogStm(), - log.BlockNumber, - log.HeaderID, - log.CID, - log.ReceiptID, - log.Address, - log.Index, - log.Topic0, - log.Topic1, - log.Topic2, - log.Topic3) - if err != nil { - return insertError{"eth.log_cids", err, w.db.InsertLogStm(), *log} + if w.useCopyForTx(tx) { + var rows [][]interface{} + for _, log := range logs { + blockNum, err := strconv.ParseInt(log.BlockNumber, 10, 64) + if err != nil { + return insertError{"eth.log_cids", err, "COPY", log} + } + + rows = append(rows, toRow(blockNum, log.HeaderID, log.CID, log.ReceiptID, + log.Address, log.Index, log.Topic0, log.Topic1, log.Topic2, log.Topic3)) + } + if nil != rows && len(rows) >= 0 { + _, err := tx.CopyFrom(w.db.Context(), w.db.LogTableName(), w.db.LogColumnNames(), rows) + if err != nil { + return insertError{"eth.log_cids", err, "COPY", rows} + } + metrics.IndexerMetrics.LogsCounter.Inc(int64(len(rows))) + } + } else { + for _, log := range logs { + _, err := tx.Exec(w.db.Context(), w.db.InsertLogStm(), + log.BlockNumber, + log.HeaderID, + log.CID, + log.ReceiptID, + log.Address, + log.Index, + log.Topic0, + log.Topic1, + log.Topic2, + log.Topic3) + if err != nil { + return insertError{"eth.log_cids", err, w.db.InsertLogStm(), *log} + } + metrics.IndexerMetrics.LogsCounter.Inc(1) } - indexerMetrics.logs.Inc(1) } return nil } @@ -165,20 +220,39 @@ func (w *Writer) upsertStateCID(tx Tx, stateNode models.StateNodeModel) error { if stateNode.Removed { balance = "0" } - _, err := tx.Exec(w.db.Context(), w.db.InsertStateStm(), - stateNode.BlockNumber, - 
stateNode.HeaderID, - stateNode.StateKey, - stateNode.CID, - true, - balance, - stateNode.Nonce, - stateNode.CodeHash, - stateNode.StorageRoot, - stateNode.Removed, - ) - if err != nil { - return insertError{"eth.state_cids", err, w.db.InsertStateStm(), stateNode} + + if w.useCopyForTx(tx) { + blockNum, err := strconv.ParseInt(stateNode.BlockNumber, 10, 64) + if err != nil { + return insertError{"eth.state_cids", err, "COPY", stateNode} + } + balInt, err := strconv.ParseInt(balance, 10, 64) + if err != nil { + return insertError{"eth.state_cids", err, "COPY", stateNode} + } + + _, err = tx.CopyFrom(w.db.Context(), w.db.StateTableName(), w.db.StateColumnNames(), + toRows(toRow(blockNum, stateNode.HeaderID, stateNode.StateKey, stateNode.CID, + true, balInt, stateNode.Nonce, stateNode.CodeHash, stateNode.StorageRoot, stateNode.Removed))) + if err != nil { + return insertError{"eth.state_cids", err, "COPY", stateNode} + } + } else { + _, err := tx.Exec(w.db.Context(), w.db.InsertStateStm(), + stateNode.BlockNumber, + stateNode.HeaderID, + stateNode.StateKey, + stateNode.CID, + true, + balance, + stateNode.Nonce, + stateNode.CodeHash, + stateNode.StorageRoot, + stateNode.Removed, + ) + if err != nil { + return insertError{"eth.state_cids", err, w.db.InsertStateStm(), stateNode} + } } return nil } @@ -188,22 +262,57 @@ INSERT INTO eth.storage_cids (block_number, header_id, state_leaf_key, storage_l ON CONFLICT (header_id, state_leaf_key, storage_leaf_key, block_number) DO NOTHING */ func (w *Writer) upsertStorageCID(tx Tx, storageCID models.StorageNodeModel) error { - _, err := tx.Exec(w.db.Context(), w.db.InsertStorageStm(), - storageCID.BlockNumber, - storageCID.HeaderID, - storageCID.StateKey, - storageCID.StorageKey, - storageCID.CID, - true, - storageCID.Value, - storageCID.Removed, - ) - if err != nil { - return insertError{"eth.storage_cids", err, w.db.InsertStorageStm(), storageCID} + if w.useCopyForTx(tx) { + blockNum, err := 
strconv.ParseInt(storageCID.BlockNumber, 10, 64) + if err != nil { + return insertError{"eth.storage_cids", err, "COPY", storageCID} + } + + _, err = tx.CopyFrom(w.db.Context(), w.db.StorageTableName(), w.db.StorageColumnNames(), + toRows(toRow(blockNum, storageCID.HeaderID, storageCID.StateKey, storageCID.StorageKey, storageCID.CID, + true, storageCID.Value, storageCID.Removed))) + if err != nil { + return insertError{"eth.storage_cids", err, "COPY", storageCID} + } + } else { + _, err := tx.Exec(w.db.Context(), w.db.InsertStorageStm(), + storageCID.BlockNumber, + storageCID.HeaderID, + storageCID.StateKey, + storageCID.StorageKey, + storageCID.CID, + true, + storageCID.Value, + storageCID.Removed, + ) + if err != nil { + return insertError{"eth.storage_cids", err, w.db.InsertStorageStm(), storageCID} + } } return nil } +func (w *Writer) useCopyForTx(tx Tx) bool { + // Using COPY instead of INSERT only makes much sense if also using a DelayedTx, so that operations + // can be collected over time and then all submitted within in a single TX. + if _, ok := tx.(*DelayedTx); ok { + return w.db.UseCopyFrom() + } + return false +} + +// combine args into a row +func toRow(args ...interface{}) []interface{} { + var row []interface{} + row = append(row, args...) 
+ return row +} + +// combine row (or rows) into a slice of rows for CopyFrom +func toRows(rows ...[]interface{}) [][]interface{} { + return rows +} + type insertError struct { table string err error diff --git a/statediff/service.go b/statediff/service.go index 4bc5cda84..7325971c1 100644 --- a/statediff/service.go +++ b/statediff/service.go @@ -26,8 +26,6 @@ import ( "sync/atomic" "time" - "github.com/ethereum/go-ethereum/trie" - "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core/state" @@ -43,9 +41,11 @@ import ( "github.com/ethereum/go-ethereum/rlp" "github.com/ethereum/go-ethereum/rpc" ind "github.com/ethereum/go-ethereum/statediff/indexer" + "github.com/ethereum/go-ethereum/statediff/indexer/database/metrics" "github.com/ethereum/go-ethereum/statediff/indexer/interfaces" nodeinfo "github.com/ethereum/go-ethereum/statediff/indexer/node" types2 "github.com/ethereum/go-ethereum/statediff/types" + "github.com/ethereum/go-ethereum/trie" "github.com/thoas/go-funk" ) @@ -794,15 +794,23 @@ func (sds *Service) writeStateDiff(block *types.Block, parentRoot common.Hash, p } output := func(node types2.StateLeafNode) error { + defer func() { + // This is very noisy so we log at Trace. 
+ since := metrics.UpdateDuration(time.Now(), metrics.IndexerMetrics.OutputTimer) + logger.Trace(fmt.Sprintf("statediff output duration=%dms", since.Milliseconds())) + }() return sds.indexer.PushStateNode(tx, node, block.Hash().String()) } ipldOutput := func(c types2.IPLD) error { + defer metrics.ReportAndUpdateDuration("statediff ipldOutput", time.Now(), logger, metrics.IndexerMetrics.IPLDOutputTimer) return sds.indexer.PushIPLD(tx, c) } - err = sds.Builder.WriteStateDiffObject(types2.StateRoots{ + err = sds.Builder.WriteStateDiffObject(Args{ NewStateRoot: block.Root(), OldStateRoot: parentRoot, + BlockHash: block.Hash(), + BlockNumber: block.Number(), }, params, output, ipldOutput) // TODO this anti-pattern needs to be sorted out eventually if err := tx.Submit(err); err != nil { diff --git a/statediff/service_test.go b/statediff/service_test.go index e921d0893..1df068608 100644 --- a/statediff/service_test.go +++ b/statediff/service_test.go @@ -16,7 +16,6 @@ package statediff_test -/* import ( "bytes" "math/big" @@ -438,4 +437,3 @@ func testGetSyncStatus(t *testing.T) { } } } -*/ diff --git a/statediff/test_helpers/mocks/builder.go b/statediff/test_helpers/mocks/builder.go index 9e3ba0ec5..490393e53 100644 --- a/statediff/test_helpers/mocks/builder.go +++ b/statediff/test_helpers/mocks/builder.go @@ -44,8 +44,8 @@ func (builder *Builder) BuildStateDiffObject(args statediff.Args, params statedi } // BuildStateDiffObject mock method -func (builder *Builder) WriteStateDiffObject(args sdtypes.StateRoots, params statediff.Params, output sdtypes.StateNodeSink, iplds sdtypes.IPLDSink) error { - builder.StateRoots = args +func (builder *Builder) WriteStateDiffObject(args statediff.Args, params statediff.Params, output sdtypes.StateNodeSink, iplds sdtypes.IPLDSink) error { + builder.StateRoots = sdtypes.StateRoots{OldStateRoot: args.OldStateRoot, NewStateRoot: args.NewStateRoot} builder.Params = params return builder.builderError diff --git 
a/statediff/trie_helpers/helpers.go b/statediff/trie_helpers/helpers.go index 0f5152774..d6c024ee4 100644 --- a/statediff/trie_helpers/helpers.go +++ b/statediff/trie_helpers/helpers.go @@ -22,12 +22,16 @@ package trie_helpers import ( "sort" "strings" + "time" + + metrics2 "github.com/ethereum/go-ethereum/statediff/indexer/database/metrics" "github.com/ethereum/go-ethereum/statediff/types" ) // SortKeys sorts the keys in the account map func SortKeys(data types.AccountMap) []string { + defer metrics2.UpdateDuration(time.Now(), metrics2.IndexerMetrics.SortKeysTimer) keys := make([]string, 0, len(data)) for key := range data { keys = append(keys, key) @@ -41,6 +45,7 @@ func SortKeys(data types.AccountMap) []string { // a and b must first be sorted // this is used to find which keys have been both "deleted" and "created" i.e. they were updated func FindIntersection(a, b []string) []string { + defer metrics2.UpdateDuration(time.Now(), metrics2.IndexerMetrics.FindIntersectionTimer) lenA := len(a) lenB := len(b) iOfA, iOfB := 0, 0 diff --git a/trie/iterator.go b/trie/iterator.go index d2c8b6a78..3c2bc4ada 100644 --- a/trie/iterator.go +++ b/trie/iterator.go @@ -20,6 +20,9 @@ import ( "bytes" "container/heap" "errors" + "time" + + "github.com/ethereum/go-ethereum/statediff/indexer/database/metrics" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/ethdb" @@ -584,6 +587,7 @@ func (it *differenceIterator) AddResolver(resolver ethdb.KeyValueReader) { } func (it *differenceIterator) Next(bool) bool { + defer metrics.UpdateDuration(time.Now(), metrics.IndexerMetrics.DifferenceIteratorNextTimer) // Invariants: // - We always advance at least one element in b. // - At the start of this function, a's path is lexically greater than b's.