cherry pick Thomas' work #349

Merged
telackey merged 13 commits from ian/v5_cherry_pick into v1.10.26-statediff-v5 2023-03-24 14:13:45 +00:00
36 changed files with 835 additions and 543 deletions

View File

@ -268,6 +268,9 @@ func makeFullNode(ctx *cli.Context) (*node.Node, ethapi.Backend) {
if ctx.IsSet(utils.StateDiffLogStatements.Name) {
pgConfig.LogStatements = ctx.Bool(utils.StateDiffLogStatements.Name)
}
if ctx.IsSet(utils.StateDiffCopyFrom.Name) {
pgConfig.CopyFrom = ctx.Bool(utils.StateDiffCopyFrom.Name)
}
indexerConfig = pgConfig
case shared.DUMP:
dumpTypeStr := ctx.String(utils.StateDiffDBDumpDst.Name)

View File

@ -180,6 +180,7 @@ var (
utils.StateDiffWatchedAddressesFilePath,
utils.StateDiffUpsert,
utils.StateDiffLogStatements,
utils.StateDiffCopyFrom,
configFileFlag,
}, utils.NetworkFlags, utils.DatabasePathFlags)

View File

@ -1083,6 +1083,13 @@ var (
Usage: "Should the statediff service log all database statements? (Note: pgx only)",
Value: false,
}
StateDiffCopyFrom = &cli.BoolFlag{
Name: "statediff.db.copyfrom",
Usage: "Should the statediff service use COPY FROM for multiple inserts? (Note: pgx only)",
Value: false,
}
StateDiffWritingFlag = &cli.BoolFlag{
Name: "statediff.writing",
Usage: "Activates progressive writing of state diffs to database as new block are synced",

View File

@ -31,6 +31,7 @@ import (
"github.com/ethereum/go-ethereum/consensus/misc"
"github.com/ethereum/go-ethereum/core/state"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/params"
"github.com/ethereum/go-ethereum/rlp"
"github.com/ethereum/go-ethereum/trie"
@ -330,7 +331,13 @@ func (ethash *Ethash) verifyHeader(chain consensus.ChainHeaderReader, header, pa
// the difficulty that a new block should have when created at time
// given the parent block's time and difficulty.
func (ethash *Ethash) CalcDifficulty(chain consensus.ChainHeaderReader, time uint64, parent *types.Header) *big.Int {
return CalcDifficulty(chain.Config(), time, parent)
var config = chain.Config()
var ret = CalcDifficulty(config, time, parent)
if nil != config.CappedMaximumDifficulty && ret.Cmp(config.CappedMaximumDifficulty) >= 0 {
log.Info(fmt.Sprintf("Using capped difficulty %d", config.CappedMaximumDifficulty))
return config.CappedMaximumDifficulty
}
return ret
}
// CalcDifficulty is the difficulty adjustment algorithm. It returns

View File

@ -63,6 +63,13 @@ type Database struct {
fn string // filename for reporting
db *leveldb.DB // LevelDB instance
getTimer metrics.Timer // Timer/counter for measuring time and invocations of Get().
putTimer metrics.Timer // Timer/counter for measuring time and invocations of Put().
deleteTimer metrics.Timer // Timer/counter for measuring time and invocations of Delete().
hasTimer metrics.Timer // Timer/counter for measuring time and invocations of Has().
batchWriteTimer metrics.Timer // Timer/counter for measuring time and invocations of batch writes.
batchItemCounter metrics.Counter // Counter for measuring number of batched items written.
compTimeMeter metrics.Meter // Meter for measuring the total time spent in database compaction
compReadMeter metrics.Meter // Meter for measuring the data read during compaction
compWriteMeter metrics.Meter // Meter for measuring the data written during compaction
@ -144,6 +151,13 @@ func NewCustom(file string, namespace string, customize func(options *opt.Option
ldb.nonlevel0CompGauge = metrics.NewRegisteredGauge(namespace+"compact/nonlevel0", nil)
ldb.seekCompGauge = metrics.NewRegisteredGauge(namespace+"compact/seek", nil)
ldb.getTimer = metrics.NewRegisteredTimer(namespace+"db/get/time", nil)
ldb.putTimer = metrics.NewRegisteredTimer(namespace+"db/put/time", nil)
ldb.deleteTimer = metrics.NewRegisteredTimer(namespace+"db/delete/time", nil)
ldb.hasTimer = metrics.NewRegisteredTimer(namespace+"db/has/time", nil)
ldb.batchWriteTimer = metrics.NewRegisteredTimer(namespace+"db/batch_write/time", nil)
ldb.batchItemCounter = metrics.NewRegisteredCounter(namespace+"db/batch_item/count", nil)
// Start up the metrics gathering and return
go ldb.meter(metricsGatheringInterval)
return ldb, nil
@ -182,11 +196,17 @@ func (db *Database) Close() error {
// Has retrieves if a key is present in the key-value store.
func (db *Database) Has(key []byte) (bool, error) {
if nil != db.hasTimer {
defer func(start time.Time) { db.hasTimer.UpdateSince(start) }(time.Now())
}
return db.db.Has(key, nil)
}
// Get retrieves the given key if it's present in the key-value store.
func (db *Database) Get(key []byte) ([]byte, error) {
if nil != db.getTimer {
defer func(start time.Time) { db.getTimer.UpdateSince(start) }(time.Now())
}
dat, err := db.db.Get(key, nil)
if err != nil {
return nil, err
@ -196,11 +216,17 @@ func (db *Database) Get(key []byte) ([]byte, error) {
// Put inserts the given value into the key-value store.
func (db *Database) Put(key []byte, value []byte) error {
if nil != db.putTimer {
defer func(start time.Time) { db.putTimer.UpdateSince(start) }(time.Now())
}
return db.db.Put(key, value, nil)
}
// Delete removes the key from the key-value store.
func (db *Database) Delete(key []byte) error {
if nil != db.deleteTimer {
defer func(start time.Time) { db.deleteTimer.UpdateSince(start) }(time.Now())
}
return db.db.Delete(key, nil)
}
@ -208,16 +234,20 @@ func (db *Database) Delete(key []byte) error {
// database until a final write is called.
func (db *Database) NewBatch() ethdb.Batch {
return &batch{
db: db.db,
b: new(leveldb.Batch),
db: db.db,
b: new(leveldb.Batch),
writeTimer: &db.batchWriteTimer,
itemCounter: &db.batchItemCounter,
}
}
// NewBatchWithSize creates a write-only database batch with pre-allocated buffer.
func (db *Database) NewBatchWithSize(size int) ethdb.Batch {
return &batch{
db: db.db,
b: leveldb.MakeBatch(size),
db: db.db,
b: leveldb.MakeBatch(size),
writeTimer: &db.batchWriteTimer,
itemCounter: &db.batchItemCounter,
}
}
@ -468,9 +498,11 @@ func (db *Database) meter(refresh time.Duration) {
// batch is a write-only leveldb batch that commits changes to its host database
// when Write is called. A batch cannot be used concurrently.
type batch struct {
db *leveldb.DB
b *leveldb.Batch
size int
db *leveldb.DB
b *leveldb.Batch
size int
writeTimer *metrics.Timer
itemCounter *metrics.Counter
}
// Put inserts the given value into the batch for later committing.
@ -494,6 +526,12 @@ func (b *batch) ValueSize() int {
// Write flushes any accumulated data to disk.
func (b *batch) Write() error {
if nil != *b.writeTimer {
defer func(start time.Time) { (*b.writeTimer).UpdateSince(start) }(time.Now())
}
if nil != *b.itemCounter {
(*b.itemCounter).Inc(int64(b.size))
}
return b.db.Write(b.b, nil)
}

4
go.mod
View File

@ -63,7 +63,7 @@ require (
github.com/rs/cors v1.7.0
github.com/shirou/gopsutil v3.21.11+incompatible
github.com/status-im/keycard-go v0.0.0-20190316090335-8537d3370df4
github.com/stretchr/testify v1.8.0
github.com/stretchr/testify v1.7.0
github.com/supranational/blst v0.3.8
github.com/syndtr/goleveldb v1.0.1-0.20210819022825-2ae1ddf74ef7
github.com/thoas/go-funk v0.9.2
@ -120,7 +120,7 @@ require (
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/russross/blackfriday/v2 v2.1.0 // indirect
github.com/spaolacci/murmur3 v1.1.0 // indirect
github.com/stretchr/objx v0.4.0 // indirect
github.com/stretchr/objx v0.3.0 // indirect
github.com/tklauser/numcpus v0.2.2 // indirect
github.com/xrash/smetrics v0.0.0-20201216005158-039620a65673 // indirect
github.com/yusufpapurcu/wmi v1.2.2 // indirect

8
go.sum
View File

@ -537,17 +537,15 @@ github.com/status-im/keycard-go v0.0.0-20190316090335-8537d3370df4/go.mod h1:RZL
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.2.0/go.mod h1:qt09Ya8vawLte6SNmTgCsAVtYtaKzEcn8ATUoHMkEqE=
github.com/stretchr/objx v0.4.0 h1:M2gUjqZET1qApGOWNSnZ49BAIMX4F/1plDv3+l31EJ4=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/objx v0.3.0 h1:NGXK3lHquSN08v5vWalVI/L8XU9hdzE/G6xsrze47As=
github.com/stretchr/objx v0.3.0/go.mod h1:qt09Ya8vawLte6SNmTgCsAVtYtaKzEcn8ATUoHMkEqE=
github.com/stretchr/testify v1.2.0/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA=
github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk=
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/supranational/blst v0.3.8 h1:glwLF4oBRSJOTr05lRBgNwGQST0ndP2wg29fSeTRKCY=
github.com/supranational/blst v0.3.8/go.mod h1:jZJtfjgudtNl4en1tzwPIV3KjUnQUvG3/j+w+fVonLw=
github.com/syndtr/goleveldb v1.0.1-0.20210819022825-2ae1ddf74ef7 h1:epCh84lMvA70Z7CTTCmYQn2CKbY8j86K7/FAIr141uY=

View File

@ -82,6 +82,9 @@ func (c *collector) addTimer(name string, m metrics.Timer) {
c.writeSummaryPercentile(name, strconv.FormatFloat(pv[i], 'f', -1, 64), ps[i])
}
c.buff.WriteRune('\n')
c.buff.WriteString(fmt.Sprintf(typeGaugeTpl, mutateKey(name+"_total")))
c.buff.WriteString(fmt.Sprintf(keyValueTpl, mutateKey(name+"_total"), m.Total()))
}
func (c *collector) addResettingTimer(name string, m metrics.ResettingTimer) {

View File

@ -92,6 +92,9 @@ test_timer {quantile="0.99"} 1.2e+08
test_timer {quantile="0.999"} 1.2e+08
test_timer {quantile="0.9999"} 1.2e+08
# TYPE test_timer_total gauge
test_timer_total 230000000
# TYPE test_resetting_timer_count counter
test_resetting_timer_count 6

View File

@ -25,6 +25,7 @@ type Timer interface {
Update(time.Duration)
UpdateSince(time.Time)
Variance() float64
Total() int64
}
// GetOrRegisterTimer returns an existing Timer or constructs and registers a
@ -47,6 +48,7 @@ func NewCustomTimer(h Histogram, m Meter) Timer {
return &StandardTimer{
histogram: h,
meter: m,
total: NewCounter(),
}
}
@ -72,6 +74,7 @@ func NewTimer() Timer {
return &StandardTimer{
histogram: NewHistogram(NewExpDecaySample(1028, 0.015)),
meter: NewMeter(),
total: NewCounter(),
}
}
@ -134,11 +137,15 @@ func (NilTimer) UpdateSince(time.Time) {}
// Variance is a no-op.
func (NilTimer) Variance() float64 { return 0.0 }
// Total is a no-op.
func (NilTimer) Total() int64 { return int64(0) }
// StandardTimer is the standard implementation of a Timer and uses a Histogram
// and Meter.
type StandardTimer struct {
histogram Histogram
meter Meter
total Counter
mutex sync.Mutex
}
@ -200,6 +207,7 @@ func (t *StandardTimer) Snapshot() Timer {
return &TimerSnapshot{
histogram: t.histogram.Snapshot().(*HistogramSnapshot),
meter: t.meter.Snapshot().(*MeterSnapshot),
total: t.total.Snapshot().(CounterSnapshot),
}
}
@ -231,14 +239,12 @@ func (t *StandardTimer) Update(d time.Duration) {
defer t.mutex.Unlock()
t.histogram.Update(int64(d))
t.meter.Mark(1)
t.total.Inc(int64(d))
}
// Record the duration of an event that started at a time and ends now.
func (t *StandardTimer) UpdateSince(ts time.Time) {
t.mutex.Lock()
defer t.mutex.Unlock()
t.histogram.Update(int64(time.Since(ts)))
t.meter.Mark(1)
t.Update(time.Since(ts))
}
// Variance returns the variance of the values in the sample.
@ -246,10 +252,18 @@ func (t *StandardTimer) Variance() float64 {
return t.histogram.Variance()
}
// Total returns the total time the timer has run in nanoseconds.
// This differs from Sum in that it is a simple counter, not based
// on a histogram Sample.
func (t *StandardTimer) Total() int64 {
return t.total.Count()
}
// TimerSnapshot is a read-only copy of another Timer.
type TimerSnapshot struct {
histogram *HistogramSnapshot
meter *MeterSnapshot
total CounterSnapshot
}
// Count returns the number of events recorded at the time the snapshot was
@ -324,3 +338,6 @@ func (*TimerSnapshot) UpdateSince(time.Time) {
// Variance returns the variance of the values at the time the snapshot was
// taken.
func (t *TimerSnapshot) Variance() float64 { return t.histogram.Variance() }
// Total returns the total time the timer has run in nanoseconds.
func (t *TimerSnapshot) Total() int64 { return t.total.Count() }

View File

@ -270,16 +270,16 @@ var (
//
// This configuration is intentionally not using keyed fields to force anyone
// adding flags to the config to also have to set these fields.
AllEthashProtocolChanges = &ChainConfig{big.NewInt(1337), big.NewInt(0), nil, false, big.NewInt(0), common.Hash{}, big.NewInt(0), big.NewInt(0), big.NewInt(0), big.NewInt(0), big.NewInt(0), big.NewInt(0), big.NewInt(0), big.NewInt(0), big.NewInt(0), big.NewInt(0), big.NewInt(0), nil, nil, nil, nil, false, new(EthashConfig), nil}
AllEthashProtocolChanges = &ChainConfig{big.NewInt(1337), big.NewInt(0), nil, false, big.NewInt(0), common.Hash{}, big.NewInt(0), big.NewInt(0), big.NewInt(0), big.NewInt(0), big.NewInt(0), big.NewInt(0), big.NewInt(0), big.NewInt(0), big.NewInt(0), big.NewInt(0), big.NewInt(0), nil, nil, nil, nil, nil, false, new(EthashConfig), nil}
// AllCliqueProtocolChanges contains every protocol change (EIPs) introduced
// and accepted by the Ethereum core developers into the Clique consensus.
//
// This configuration is intentionally not using keyed fields to force anyone
// adding flags to the config to also have to set these fields.
AllCliqueProtocolChanges = &ChainConfig{big.NewInt(1337), big.NewInt(0), nil, false, big.NewInt(0), common.Hash{}, big.NewInt(0), big.NewInt(0), big.NewInt(0), big.NewInt(0), big.NewInt(0), big.NewInt(0), big.NewInt(0), big.NewInt(0), big.NewInt(0), nil, nil, nil, nil, nil, nil, false, nil, &CliqueConfig{Period: 0, Epoch: 30000}}
AllCliqueProtocolChanges = &ChainConfig{big.NewInt(1337), big.NewInt(0), nil, false, big.NewInt(0), common.Hash{}, big.NewInt(0), big.NewInt(0), big.NewInt(0), big.NewInt(0), big.NewInt(0), big.NewInt(0), big.NewInt(0), big.NewInt(0), big.NewInt(0), nil, nil, nil, nil, nil, nil, nil, false, nil, &CliqueConfig{Period: 0, Epoch: 30000}}
TestChainConfig = &ChainConfig{big.NewInt(1), big.NewInt(0), nil, false, big.NewInt(0), common.Hash{}, big.NewInt(0), big.NewInt(0), big.NewInt(0), big.NewInt(0), big.NewInt(0), big.NewInt(0), big.NewInt(0), big.NewInt(0), big.NewInt(0), big.NewInt(0), big.NewInt(0), nil, nil, nil, nil, false, new(EthashConfig), nil}
TestChainConfig = &ChainConfig{big.NewInt(1), big.NewInt(0), nil, false, big.NewInt(0), common.Hash{}, big.NewInt(0), big.NewInt(0), big.NewInt(0), big.NewInt(0), big.NewInt(0), big.NewInt(0), big.NewInt(0), big.NewInt(0), big.NewInt(0), big.NewInt(0), big.NewInt(0), nil, nil, nil, nil, nil, false, new(EthashConfig), nil}
TestRules = TestChainConfig.Rules(new(big.Int), false)
)
@ -377,6 +377,9 @@ type ChainConfig struct {
// the network that triggers the consensus upgrade.
TerminalTotalDifficulty *big.Int `json:"terminalTotalDifficulty,omitempty"`
// Cap the maximum total difficulty (for testnet use only).
CappedMaximumDifficulty *big.Int `json:"cappedMaximumDifficulty,omitempty"`
// TerminalTotalDifficultyPassed is a flag specifying that the network already
// passed the terminal total difficulty. Its purpose is to disable legacy sync
// even without having seen the TTD locally (safer long term).
@ -419,7 +422,11 @@ func (c *ChainConfig) String() string {
switch {
case c.Ethash != nil:
if c.TerminalTotalDifficulty == nil {
banner += "Consensus: Ethash (proof-of-work)\n"
if nil == c.CappedMaximumDifficulty {
banner += "Consensus: Ethash (proof-of-work)\n"
} else {
banner += fmt.Sprintf("Consensus: Ethash (proof-of-work, capped difficulty at %d)\n", c.CappedMaximumDifficulty)
}
} else if !c.TerminalTotalDifficultyPassed {
banner += "Consensus: Beacon (proof-of-stake), merging from Ethash (proof-of-work)\n"
} else {
@ -434,7 +441,11 @@ func (c *ChainConfig) String() string {
banner += "Consensus: Beacon (proof-of-stake), merged from Clique (proof-of-authority)\n"
}
default:
banner += "Consensus: unknown\n"
if nil == c.CappedMaximumDifficulty {
banner += "Consensus: unknown\n"
} else {
banner += fmt.Sprintf("Consensus: unknown (capped difficulty at %d)\n", c.CappedMaximumDifficulty)
}
}
banner += "\n"

View File

@ -22,6 +22,7 @@ package statediff
import (
"bytes"
"fmt"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/state"
@ -29,6 +30,7 @@ import (
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/rlp"
metrics2 "github.com/ethereum/go-ethereum/statediff/indexer/database/metrics"
ipld2 "github.com/ethereum/go-ethereum/statediff/indexer/ipld"
"github.com/ethereum/go-ethereum/statediff/indexer/shared"
"github.com/ethereum/go-ethereum/statediff/trie_helpers"
@ -45,7 +47,7 @@ var (
// Builder interface exposes the method for building a state diff between two blocks
type Builder interface {
BuildStateDiffObject(args Args, params Params) (types2.StateObject, error)
WriteStateDiffObject(args types2.StateRoots, params Params, output types2.StateNodeSink, ipldOutput types2.IPLDSink) error
WriteStateDiffObject(args Args, params Params, output types2.StateNodeSink, ipldOutput types2.IPLDSink) error
}
type StateDiffBuilder struct {
@ -84,11 +86,10 @@ func NewBuilder(stateCache state.Database) Builder {
// BuildStateDiffObject builds a statediff object from two blocks and the provided parameters
func (sdb *StateDiffBuilder) BuildStateDiffObject(args Args, params Params) (types2.StateObject, error) {
defer metrics2.UpdateDuration(time.Now(), metrics2.IndexerMetrics.BuildStateDiffObjectTimer)
var stateNodes []types2.StateLeafNode
var iplds []types2.IPLD
err := sdb.WriteStateDiffObject(
types2.StateRoots{OldStateRoot: args.OldStateRoot, NewStateRoot: args.NewStateRoot},
params, StateNodeAppender(&stateNodes), IPLDMappingAppender(&iplds))
err := sdb.WriteStateDiffObject(args, params, StateNodeAppender(&stateNodes), IPLDMappingAppender(&iplds))
if err != nil {
return types2.StateObject{}, err
}
@ -101,8 +102,9 @@ func (sdb *StateDiffBuilder) BuildStateDiffObject(args Args, params Params) (typ
}
// WriteStateDiffObject writes a statediff object to output sinks
func (sdb *StateDiffBuilder) WriteStateDiffObject(args types2.StateRoots, params Params, output types2.StateNodeSink,
func (sdb *StateDiffBuilder) WriteStateDiffObject(args Args, params Params, output types2.StateNodeSink,
ipldOutput types2.IPLDSink) error {
defer metrics2.UpdateDuration(time.Now(), metrics2.IndexerMetrics.WriteStateDiffObjectTimer)
// Load tries for old and new states
oldTrie, err := sdb.StateCache.OpenTrie(args.OldStateRoot)
if err != nil {
@ -128,16 +130,19 @@ func (sdb *StateDiffBuilder) WriteStateDiffObject(args types2.StateRoots, params
},
}
return sdb.BuildStateDiffWithIntermediateStateNodes(iterPairs, params, output, ipldOutput)
logger := log.New("hash", args.BlockHash.Hex(), "number", args.BlockNumber)
return sdb.BuildStateDiffWithIntermediateStateNodes(iterPairs, params, output, ipldOutput, logger)
}
func (sdb *StateDiffBuilder) BuildStateDiffWithIntermediateStateNodes(iterPairs []IterPair, params Params,
output types2.StateNodeSink, ipldOutput types2.IPLDSink) error {
output types2.StateNodeSink, ipldOutput types2.IPLDSink, logger log.Logger) error {
logger.Debug("statediff BEGIN BuildStateDiffWithIntermediateStateNodes")
defer metrics2.ReportAndUpdateDuration("statediff END BuildStateDiffWithIntermediateStateNodes", time.Now(), logger, metrics2.IndexerMetrics.BuildStateDiffWithIntermediateStateNodesTimer)
// collect a slice of all the nodes that were touched and exist at B (B-A)
// a map of their leafkey to all the accounts that were touched and exist at B
// and a slice of all the paths for the nodes in both of the above sets
diffAccountsAtB, err := sdb.createdAndUpdatedState(
iterPairs[0].Older, iterPairs[0].Newer, params.watchedAddressesLeafPaths, ipldOutput)
iterPairs[0].Older, iterPairs[0].Newer, params.watchedAddressesLeafPaths, ipldOutput, logger)
if err != nil {
return fmt.Errorf("error collecting createdAndUpdatedNodes: %v", err)
}
@ -146,28 +151,34 @@ func (sdb *StateDiffBuilder) BuildStateDiffWithIntermediateStateNodes(iterPairs
// a map of their leafkey to all the accounts that were touched and exist at A
diffAccountsAtA, err := sdb.deletedOrUpdatedState(
iterPairs[1].Older, iterPairs[1].Newer, diffAccountsAtB,
params.watchedAddressesLeafPaths, output)
params.watchedAddressesLeafPaths, output, logger)
if err != nil {
return fmt.Errorf("error collecting deletedOrUpdatedNodes: %v", err)
}
// collect and sort the leafkey keys for both account mappings into a slice
t := time.Now()
createKeys := trie_helpers.SortKeys(diffAccountsAtB)
deleteKeys := trie_helpers.SortKeys(diffAccountsAtA)
logger.Debug(fmt.Sprintf("statediff BuildStateDiffWithIntermediateStateNodes sort duration=%dms", time.Since(t).Milliseconds()))
// and then find the intersection of these keys
// these are the leafkeys for the accounts which exist at both A and B but are different
// this also mutates the passed in createKeys and deleteKeys, removing the intersection keys
// and leaving the truly created or deleted keys in place
t = time.Now()
updatedKeys := trie_helpers.FindIntersection(createKeys, deleteKeys)
logger.Debug(fmt.Sprintf("statediff BuildStateDiffWithIntermediateStateNodes intersection count=%d duration=%dms",
len(updatedKeys),
time.Since(t).Milliseconds()))
// build the diff nodes for the updated accounts using the mappings at both A and B as directed by the keys found as the intersection of the two
err = sdb.buildAccountUpdates(diffAccountsAtB, diffAccountsAtA, updatedKeys, output, ipldOutput)
err = sdb.buildAccountUpdates(diffAccountsAtB, diffAccountsAtA, updatedKeys, output, ipldOutput, logger)
if err != nil {
return fmt.Errorf("error building diff for updated accounts: %v", err)
}
// build the diff nodes for created accounts
err = sdb.buildAccountCreations(diffAccountsAtB, output, ipldOutput)
err = sdb.buildAccountCreations(diffAccountsAtB, output, ipldOutput, logger)
if err != nil {
return fmt.Errorf("error building diff for created accounts: %v", err)
}
@ -179,11 +190,13 @@ func (sdb *StateDiffBuilder) BuildStateDiffWithIntermediateStateNodes(iterPairs
// a mapping of their leafkeys to all the accounts that exist in a different state at B than A
// and a slice of the paths for all of the nodes included in both
func (sdb *StateDiffBuilder) createdAndUpdatedState(a, b trie.NodeIterator,
watchedAddressesLeafPaths [][]byte, output types2.IPLDSink) (types2.AccountMap, error) {
watchedAddressesLeafPaths [][]byte, output types2.IPLDSink, logger log.Logger) (types2.AccountMap, error) {
logger.Debug("statediff BEGIN createdAndUpdatedState")
defer metrics2.ReportAndUpdateDuration("statediff END createdAndUpdatedState", time.Now(), logger, metrics2.IndexerMetrics.CreatedAndUpdatedStateTimer)
diffAccountsAtB := make(types2.AccountMap)
watchingAddresses := len(watchedAddressesLeafPaths) > 0
it, _ := trie.NewDifferenceIterator(a, b)
it, itCount := trie.NewDifferenceIterator(a, b)
for it.Next(true) {
// ignore node if it is not along paths of interest
if watchingAddresses && !isValidPrefixPath(watchedAddressesLeafPaths, it.Path()) {
@ -236,6 +249,8 @@ func (sdb *StateDiffBuilder) createdAndUpdatedState(a, b trie.NodeIterator,
}
}
}
logger.Debug("statediff COUNTS createdAndUpdatedStateWithIntermediateNodes", "it", itCount, "diffAccountsAtB", len(diffAccountsAtB))
metrics2.IndexerMetrics.DifferenceIteratorCounter.Inc(int64(*itCount))
return diffAccountsAtB, it.Error()
}
@ -275,7 +290,9 @@ func (sdb *StateDiffBuilder) processStateValueNode(it trie.NodeIterator, watched
// deletedOrUpdatedState returns a slice of all the pathes that are emptied at B
// and a mapping of their leafkeys to all the accounts that exist in a different state at A than B
func (sdb *StateDiffBuilder) deletedOrUpdatedState(a, b trie.NodeIterator, diffAccountsAtB types2.AccountMap,
watchedAddressesLeafPaths [][]byte, output types2.StateNodeSink) (types2.AccountMap, error) {
watchedAddressesLeafPaths [][]byte, output types2.StateNodeSink, logger log.Logger) (types2.AccountMap, error) {
logger.Debug("statediff BEGIN deletedOrUpdatedState")
defer metrics2.ReportAndUpdateDuration("statediff END deletedOrUpdatedState", time.Now(), logger, metrics2.IndexerMetrics.DeletedOrUpdatedStateTimer)
diffAccountAtA := make(types2.AccountMap)
watchingAddresses := len(watchedAddressesLeafPaths) > 0
@ -330,7 +347,9 @@ func (sdb *StateDiffBuilder) deletedOrUpdatedState(a, b trie.NodeIterator, diffA
// needs to be called before building account creations and deletions as this mutates
// those account maps to remove the accounts which were updated
func (sdb *StateDiffBuilder) buildAccountUpdates(creations, deletions types2.AccountMap, updatedKeys []string,
output types2.StateNodeSink, ipldOutput types2.IPLDSink) error {
output types2.StateNodeSink, ipldOutput types2.IPLDSink, logger log.Logger) error {
logger.Debug("statediff BEGIN buildAccountUpdates", "creations", len(creations), "deletions", len(deletions), "updatedKeys", len(updatedKeys))
defer metrics2.ReportAndUpdateDuration("statediff END buildAccountUpdates ", time.Now(), logger, metrics2.IndexerMetrics.BuildAccountUpdatesTimer)
var err error
for _, key := range updatedKeys {
createdAcc := creations[key]
@ -361,7 +380,10 @@ func (sdb *StateDiffBuilder) buildAccountUpdates(creations, deletions types2.Acc
// buildAccountCreations returns the statediff node objects for all the accounts that exist at B but not at A
// it also returns the code and codehash for created contract accounts
func (sdb *StateDiffBuilder) buildAccountCreations(accounts types2.AccountMap, output types2.StateNodeSink, ipldOutput types2.IPLDSink) error {
func (sdb *StateDiffBuilder) buildAccountCreations(accounts types2.AccountMap, output types2.StateNodeSink,
ipldOutput types2.IPLDSink, logger log.Logger) error {
logger.Debug("statediff BEGIN buildAccountCreations")
defer metrics2.ReportAndUpdateDuration("statediff END buildAccountCreations", time.Now(), logger, metrics2.IndexerMetrics.BuildAccountCreationsTimer)
for _, val := range accounts {
diff := types2.StateLeafNode{
AccountWrapper: val,
@ -400,6 +422,7 @@ func (sdb *StateDiffBuilder) buildAccountCreations(accounts types2.AccountMap, o
// i.e. it returns all the storage nodes at this state, since there is no previous state
func (sdb *StateDiffBuilder) buildStorageNodesEventual(sr common.Hash, output types2.StorageNodeSink,
ipldOutput types2.IPLDSink) error {
defer metrics2.UpdateDuration(time.Now(), metrics2.IndexerMetrics.BuildStorageNodesEventualTimer)
if bytes.Equal(sr.Bytes(), emptyContractRoot.Bytes()) {
return nil
}
@ -421,6 +444,7 @@ func (sdb *StateDiffBuilder) buildStorageNodesEventual(sr common.Hash, output ty
// including intermediate nodes can be turned on or off
func (sdb *StateDiffBuilder) buildStorageNodesFromTrie(it trie.NodeIterator, output types2.StorageNodeSink,
ipldOutput types2.IPLDSink) error {
defer metrics2.UpdateDuration(time.Now(), metrics2.IndexerMetrics.BuildStorageNodesFromTrieTimer)
for it.Next(true) {
if it.Leaf() {
storageLeafNode, err := sdb.processStorageValueNode(it)
@ -470,6 +494,7 @@ func (sdb *StateDiffBuilder) processStorageValueNode(it trie.NodeIterator) (type
// buildRemovedAccountStorageNodes builds the "removed" diffs for all the storage nodes for a destroyed account
func (sdb *StateDiffBuilder) buildRemovedAccountStorageNodes(sr common.Hash, output types2.StorageNodeSink) error {
defer metrics2.UpdateDuration(time.Now(), metrics2.IndexerMetrics.BuildRemovedAccountStorageNodesTimer)
if bytes.Equal(sr.Bytes(), emptyContractRoot.Bytes()) {
return nil
}
@ -489,6 +514,7 @@ func (sdb *StateDiffBuilder) buildRemovedAccountStorageNodes(sr common.Hash, out
// buildRemovedStorageNodesFromTrie returns diffs for all the storage nodes in the provided node interator
func (sdb *StateDiffBuilder) buildRemovedStorageNodesFromTrie(it trie.NodeIterator, output types2.StorageNodeSink) error {
defer metrics2.UpdateDuration(time.Now(), metrics2.IndexerMetrics.BuildRemovedStorageNodesFromTrieTimer)
for it.Next(true) {
if it.Leaf() { // only leaf values are indexed, don't need to demarcate removed intermediate nodes
leafKey := make([]byte, len(it.LeafKey()))
@ -509,6 +535,7 @@ func (sdb *StateDiffBuilder) buildRemovedStorageNodesFromTrie(it trie.NodeIterat
// buildStorageNodesIncremental builds the storage diff node objects for all nodes that exist in a different state at B than A
func (sdb *StateDiffBuilder) buildStorageNodesIncremental(oldSR common.Hash, newSR common.Hash, output types2.StorageNodeSink,
ipldOutput types2.IPLDSink) error {
defer metrics2.UpdateDuration(time.Now(), metrics2.IndexerMetrics.BuildStorageNodesIncrementalTimer)
if bytes.Equal(newSR.Bytes(), oldSR.Bytes()) {
return nil
}
@ -537,6 +564,7 @@ func (sdb *StateDiffBuilder) buildStorageNodesIncremental(oldSR common.Hash, new
func (sdb *StateDiffBuilder) createdAndUpdatedStorage(a, b trie.NodeIterator, output types2.StorageNodeSink,
ipldOutput types2.IPLDSink) (map[string]bool, error) {
defer metrics2.UpdateDuration(time.Now(), metrics2.IndexerMetrics.CreatedAndUpdatedStorageTimer)
diffSlotsAtB := make(map[string]bool)
it, _ := trie.NewDifferenceIterator(a, b)
for it.Next(true) {
@ -566,6 +594,7 @@ func (sdb *StateDiffBuilder) createdAndUpdatedStorage(a, b trie.NodeIterator, ou
}
func (sdb *StateDiffBuilder) deletedOrUpdatedStorage(a, b trie.NodeIterator, diffSlotsAtB map[string]bool, output types2.StorageNodeSink) error {
defer metrics2.UpdateDuration(time.Now(), metrics2.IndexerMetrics.DeletedOrUpdatedStorageTimer)
it, _ := trie.NewDifferenceIterator(b, a)
for it.Next(true) {
if it.Leaf() {
@ -602,6 +631,7 @@ func isValidPrefixPath(watchedAddressesLeafPaths [][]byte, currentPath []byte) b
// isWatchedAddress is used to check if a state account corresponds to one of the addresses the builder is configured to watch
func isWatchedAddress(watchedAddressesLeafPaths [][]byte, valueNodePath []byte) bool {
defer metrics2.UpdateDuration(time.Now(), metrics2.IndexerMetrics.IsWatchedAddressTimer)
for _, watchedAddressPath := range watchedAddressesLeafPaths {
if bytes.Equal(watchedAddressPath, valueNodePath) {
return true

View File

@ -29,9 +29,9 @@ import (
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/metrics"
"github.com/ethereum/go-ethereum/params"
"github.com/ethereum/go-ethereum/rlp"
"github.com/ethereum/go-ethereum/statediff/indexer/database/metrics"
"github.com/ethereum/go-ethereum/statediff/indexer/interfaces"
"github.com/ethereum/go-ethereum/statediff/indexer/ipld"
"github.com/ethereum/go-ethereum/statediff/indexer/models"
@ -41,10 +41,6 @@ import (
var _ interfaces.StateDiffIndexer = &StateDiffIndexer{}
var (
indexerMetrics = RegisterIndexerMetrics(metrics.DefaultRegistry)
)
// StateDiffIndexer satisfies the indexer.StateDiffIndexer interface for ethereum statediff objects on top of a void
type StateDiffIndexer struct {
dump io.WriteCloser
@ -106,7 +102,7 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
close(self.quit)
close(self.iplds)
tDiff := time.Since(t)
indexerMetrics.tStateStoreCodeProcessing.Update(tDiff)
metrics.IndexerMetrics.StateStoreCodeProcessingTimer.Update(tDiff)
traceMsg += fmt.Sprintf("state, storage, and code storage processing time: %s\r\n", tDiff.String())
t = time.Now()
if err := self.flush(); err != nil {
@ -115,7 +111,7 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
return err
}
tDiff = time.Since(t)
indexerMetrics.tPostgresCommit.Update(tDiff)
metrics.IndexerMetrics.PostgresCommitTimer.Update(tDiff)
traceMsg += fmt.Sprintf("postgres transaction commit duration: %s\r\n", tDiff.String())
traceMsg += fmt.Sprintf(" TOTAL PROCESSING DURATION: %s\r\n", time.Since(start).String())
log.Debug(traceMsg)
@ -125,7 +121,7 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
go blockTx.cache()
tDiff := time.Since(t)
indexerMetrics.tFreePostgres.Update(tDiff)
metrics.IndexerMetrics.FreePostgresTimer.Update(tDiff)
traceMsg += fmt.Sprintf("time spent waiting for free postgres tx: %s:\r\n", tDiff.String())
t = time.Now()
@ -137,7 +133,7 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
return nil, err
}
tDiff = time.Since(t)
indexerMetrics.tHeaderProcessing.Update(tDiff)
metrics.IndexerMetrics.HeaderProcessingTimer.Update(tDiff)
traceMsg += fmt.Sprintf("header processing time: %s\r\n", tDiff.String())
t = time.Now()
// Publish and index uncles
@ -146,7 +142,7 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
return nil, err
}
tDiff = time.Since(t)
indexerMetrics.tUncleProcessing.Update(tDiff)
metrics.IndexerMetrics.UncleProcessingTimer.Update(tDiff)
traceMsg += fmt.Sprintf("uncle processing time: %s\r\n", tDiff.String())
t = time.Now()
// Publish and index receipts and txs
@ -163,7 +159,7 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
return nil, err
}
tDiff = time.Since(t)
indexerMetrics.tTxAndRecProcessing.Update(tDiff)
metrics.IndexerMetrics.TxAndRecProcessingTimer.Update(tDiff)
traceMsg += fmt.Sprintf("tx and receipt processing time: %s\r\n", tDiff.String())
t = time.Now()

View File

@ -1,94 +0,0 @@
// VulcanizeDB
// Copyright © 2021 Vulcanize
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package dump
import (
"strings"
"github.com/ethereum/go-ethereum/metrics"
)
const (
namespace = "statediff"
)
// Build a fully qualified metric name
func metricName(subsystem, name string) string {
if name == "" {
return ""
}
parts := []string{namespace, name}
if subsystem != "" {
parts = []string{namespace, subsystem, name}
}
// Prometheus uses _ but geth metrics uses / and replaces
return strings.Join(parts, "/")
}
type indexerMetricsHandles struct {
// The total number of processed blocks
blocks metrics.Counter
// The total number of processed transactions
transactions metrics.Counter
// The total number of processed receipts
receipts metrics.Counter
// The total number of processed logs
logs metrics.Counter
// The total number of access list entries processed
accessListEntries metrics.Counter
// Time spent waiting for free postgres tx
tFreePostgres metrics.Timer
// Postgres transaction commit duration
tPostgresCommit metrics.Timer
// Header processing time
tHeaderProcessing metrics.Timer
// Uncle processing time
tUncleProcessing metrics.Timer
// Tx and receipt processing time
tTxAndRecProcessing metrics.Timer
// State, storage, and code combined processing time
tStateStoreCodeProcessing metrics.Timer
}
func RegisterIndexerMetrics(reg metrics.Registry) indexerMetricsHandles {
ctx := indexerMetricsHandles{
blocks: metrics.NewCounter(),
transactions: metrics.NewCounter(),
receipts: metrics.NewCounter(),
logs: metrics.NewCounter(),
accessListEntries: metrics.NewCounter(),
tFreePostgres: metrics.NewTimer(),
tPostgresCommit: metrics.NewTimer(),
tHeaderProcessing: metrics.NewTimer(),
tUncleProcessing: metrics.NewTimer(),
tTxAndRecProcessing: metrics.NewTimer(),
tStateStoreCodeProcessing: metrics.NewTimer(),
}
subsys := "indexer"
reg.Register(metricName(subsys, "blocks"), ctx.blocks)
reg.Register(metricName(subsys, "transactions"), ctx.transactions)
reg.Register(metricName(subsys, "receipts"), ctx.receipts)
reg.Register(metricName(subsys, "logs"), ctx.logs)
reg.Register(metricName(subsys, "access_list_entries"), ctx.accessListEntries)
reg.Register(metricName(subsys, "t_free_postgres"), ctx.tFreePostgres)
reg.Register(metricName(subsys, "t_postgres_commit"), ctx.tPostgresCommit)
reg.Register(metricName(subsys, "t_header_processing"), ctx.tHeaderProcessing)
reg.Register(metricName(subsys, "t_uncle_processing"), ctx.tUncleProcessing)
reg.Register(metricName(subsys, "t_tx_receipt_processing"), ctx.tTxAndRecProcessing)
reg.Register(metricName(subsys, "t_state_store_code_processing"), ctx.tStateStoreCodeProcessing)
return ctx
}

View File

@ -28,6 +28,7 @@ import (
"github.com/thoas/go-funk"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/statediff/indexer/database/metrics"
"github.com/ethereum/go-ethereum/statediff/indexer/ipld"
"github.com/ethereum/go-ethereum/statediff/indexer/models"
nodeinfo "github.com/ethereum/go-ethereum/statediff/indexer/node"
@ -235,7 +236,7 @@ func (csw *CSVWriter) upsertHeaderCID(header models.HeaderModel) {
header.TotalDifficulty, header.NodeIDs, header.Reward, header.StateRoot, header.TxRoot,
header.RctRoot, header.UnclesHash, header.Bloom, strconv.FormatUint(header.Timestamp, 10), header.Coinbase)
csw.rows <- tableRow{schema.TableHeader, values}
indexerMetrics.blocks.Inc(1)
metrics.IndexerMetrics.BlocksCounter.Inc(1)
}
func (csw *CSVWriter) upsertUncleCID(uncle models.UncleModel) {
@ -250,7 +251,7 @@ func (csw *CSVWriter) upsertTransactionCID(transaction models.TxModel) {
values = append(values, transaction.BlockNumber, transaction.HeaderID, transaction.TxHash, transaction.CID, transaction.Dst,
transaction.Src, transaction.Index, transaction.Type, transaction.Value)
csw.rows <- tableRow{schema.TableTransaction, values}
indexerMetrics.transactions.Inc(1)
metrics.IndexerMetrics.TransactionsCounter.Inc(1)
}
func (csw *CSVWriter) upsertReceiptCID(rct *models.ReceiptModel) {
@ -258,7 +259,7 @@ func (csw *CSVWriter) upsertReceiptCID(rct *models.ReceiptModel) {
values = append(values, rct.BlockNumber, rct.HeaderID, rct.TxID, rct.CID, rct.Contract,
rct.PostState, rct.PostStatus)
csw.rows <- tableRow{schema.TableReceipt, values}
indexerMetrics.receipts.Inc(1)
metrics.IndexerMetrics.ReceiptsCounter.Inc(1)
}
func (csw *CSVWriter) upsertLogCID(logs []*models.LogsModel) {
@ -267,7 +268,7 @@ func (csw *CSVWriter) upsertLogCID(logs []*models.LogsModel) {
values = append(values, l.BlockNumber, l.HeaderID, l.CID, l.ReceiptID, l.Address, l.Index, l.Topic0,
l.Topic1, l.Topic2, l.Topic3)
csw.rows <- tableRow{schema.TableLog, values}
indexerMetrics.logs.Inc(1)
metrics.IndexerMetrics.LogsCounter.Inc(1)
}
}
@ -284,13 +285,8 @@ func (csw *CSVWriter) upsertStateCID(stateNode models.StateNodeModel) {
}
func (csw *CSVWriter) upsertStorageCID(storageCID models.StorageNodeModel) {
var storageKey string
if storageCID.StorageKey != nullHash.String() {
storageKey = storageCID.StorageKey
}
var values []interface{}
values = append(values, storageCID.BlockNumber, storageCID.HeaderID, storageCID.StateKey, storageKey, storageCID.CID,
values = append(values, storageCID.BlockNumber, storageCID.HeaderID, storageCID.StateKey, storageCID.StorageKey, storageCID.CID,
true, storageCID.Value, storageCID.Removed)
csw.rows <- tableRow{schema.TableStorageNode, values}
}

View File

@ -34,9 +34,9 @@ import (
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/metrics"
"github.com/ethereum/go-ethereum/params"
"github.com/ethereum/go-ethereum/rlp"
"github.com/ethereum/go-ethereum/statediff/indexer/database/metrics"
"github.com/ethereum/go-ethereum/statediff/indexer/interfaces"
"github.com/ethereum/go-ethereum/statediff/indexer/ipld"
"github.com/ethereum/go-ethereum/statediff/indexer/models"
@ -53,10 +53,6 @@ const watchedAddressesInsert = "INSERT INTO eth_meta.watched_addresses (address,
var _ interfaces.StateDiffIndexer = &StateDiffIndexer{}
var (
indexerMetrics = RegisterIndexerMetrics(metrics.DefaultRegistry)
)
// StateDiffIndexer satisfies the indexer.StateDiffIndexer interface for ethereum statediff objects on top of a void
type StateDiffIndexer struct {
fileWriter FileWriter
@ -172,12 +168,12 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
BlockNumber: block.Number().String(),
submit: func(self *BatchTx, err error) error {
tDiff := time.Since(t)
indexerMetrics.tStateStoreCodeProcessing.Update(tDiff)
metrics.IndexerMetrics.StateStoreCodeProcessingTimer.Update(tDiff)
traceMsg += fmt.Sprintf("state, storage, and code storage processing time: %s\r\n", tDiff.String())
t = time.Now()
sdi.fileWriter.Flush()
tDiff = time.Since(t)
indexerMetrics.tPostgresCommit.Update(tDiff)
metrics.IndexerMetrics.PostgresCommitTimer.Update(tDiff)
traceMsg += fmt.Sprintf("postgres transaction commit duration: %s\r\n", tDiff.String())
traceMsg += fmt.Sprintf(" TOTAL PROCESSING DURATION: %s\r\n", time.Since(start).String())
log.Debug(traceMsg)
@ -185,21 +181,21 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
},
}
tDiff := time.Since(t)
indexerMetrics.tFreePostgres.Update(tDiff)
metrics.IndexerMetrics.FreePostgresTimer.Update(tDiff)
traceMsg += fmt.Sprintf("time spent waiting for free postgres tx: %s:\r\n", tDiff.String())
t = time.Now()
// write header, collect headerID
headerID := sdi.processHeader(block.Header(), headerNode, reward, totalDifficulty)
tDiff = time.Since(t)
indexerMetrics.tHeaderProcessing.Update(tDiff)
metrics.IndexerMetrics.HeaderProcessingTimer.Update(tDiff)
traceMsg += fmt.Sprintf("header processing time: %s\r\n", tDiff.String())
t = time.Now()
// write uncles
sdi.processUncles(headerID, block.Number(), block.UncleHash(), block.Uncles())
tDiff = time.Since(t)
indexerMetrics.tUncleProcessing.Update(tDiff)
metrics.IndexerMetrics.UncleProcessingTimer.Update(tDiff)
traceMsg += fmt.Sprintf("uncle processing time: %s\r\n", tDiff.String())
t = time.Now()
@ -217,7 +213,7 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
return nil, err
}
tDiff = time.Since(t)
indexerMetrics.tTxAndRecProcessing.Update(tDiff)
metrics.IndexerMetrics.TxAndRecProcessingTimer.Update(tDiff)
traceMsg += fmt.Sprintf("tx and receipt processing time: %s\r\n", tDiff.String())
t = time.Now()

View File

@ -1,94 +0,0 @@
// VulcanizeDB
// Copyright © 2021 Vulcanize
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package file
import (
"strings"
"github.com/ethereum/go-ethereum/metrics"
)
const (
namespace = "statediff"
)
// Build a fully qualified metric name
func metricName(subsystem, name string) string {
if name == "" {
return ""
}
parts := []string{namespace, name}
if subsystem != "" {
parts = []string{namespace, subsystem, name}
}
// Prometheus uses _ but geth metrics uses / and replaces
return strings.Join(parts, "/")
}
type indexerMetricsHandles struct {
// The total number of processed blocks
blocks metrics.Counter
// The total number of processed transactions
transactions metrics.Counter
// The total number of processed receipts
receipts metrics.Counter
// The total number of processed logs
logs metrics.Counter
// The total number of access list entries processed
accessListEntries metrics.Counter
// Time spent waiting for free postgres tx
tFreePostgres metrics.Timer
// Postgres transaction commit duration
tPostgresCommit metrics.Timer
// Header processing time
tHeaderProcessing metrics.Timer
// Uncle processing time
tUncleProcessing metrics.Timer
// Tx and receipt processing time
tTxAndRecProcessing metrics.Timer
// State, storage, and code combined processing time
tStateStoreCodeProcessing metrics.Timer
}
func RegisterIndexerMetrics(reg metrics.Registry) indexerMetricsHandles {
ctx := indexerMetricsHandles{
blocks: metrics.NewCounter(),
transactions: metrics.NewCounter(),
receipts: metrics.NewCounter(),
logs: metrics.NewCounter(),
accessListEntries: metrics.NewCounter(),
tFreePostgres: metrics.NewTimer(),
tPostgresCommit: metrics.NewTimer(),
tHeaderProcessing: metrics.NewTimer(),
tUncleProcessing: metrics.NewTimer(),
tTxAndRecProcessing: metrics.NewTimer(),
tStateStoreCodeProcessing: metrics.NewTimer(),
}
subsys := "indexer"
reg.Register(metricName(subsys, "blocks"), ctx.blocks)
reg.Register(metricName(subsys, "transactions"), ctx.transactions)
reg.Register(metricName(subsys, "receipts"), ctx.receipts)
reg.Register(metricName(subsys, "logs"), ctx.logs)
reg.Register(metricName(subsys, "access_list_entries"), ctx.accessListEntries)
reg.Register(metricName(subsys, "t_free_postgres"), ctx.tFreePostgres)
reg.Register(metricName(subsys, "t_postgres_commit"), ctx.tPostgresCommit)
reg.Register(metricName(subsys, "t_header_processing"), ctx.tHeaderProcessing)
reg.Register(metricName(subsys, "t_uncle_processing"), ctx.tUncleProcessing)
reg.Register(metricName(subsys, "t_tx_receipt_processing"), ctx.tTxAndRecProcessing)
reg.Register(metricName(subsys, "t_state_store_code_processing"), ctx.tStateStoreCodeProcessing)
return ctx
}

View File

@ -28,6 +28,7 @@ import (
"github.com/thoas/go-funk"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/statediff/indexer/database/metrics"
"github.com/ethereum/go-ethereum/statediff/indexer/ipld"
"github.com/ethereum/go-ethereum/statediff/indexer/models"
nodeinfo "github.com/ethereum/go-ethereum/statediff/indexer/node"
@ -35,7 +36,6 @@ import (
)
var (
nullHash = common.HexToHash("0x0000000000000000000000000000000000000000000000000000000000000000")
pipeSize = 65336 // min(linuxPipeSize, macOSPipeSize)
writeBufferSize = pipeSize * 16 * 96
)
@ -191,7 +191,7 @@ func (sqw *SQLWriter) upsertHeaderCID(header models.HeaderModel) {
header.TotalDifficulty, formatPostgresStringArray(header.NodeIDs), header.Reward, header.StateRoot, header.TxRoot,
header.RctRoot, header.UnclesHash, header.Bloom, header.Timestamp, header.Coinbase)
sqw.stmts <- []byte(stmt)
indexerMetrics.blocks.Inc(1)
metrics.IndexerMetrics.BlocksCounter.Inc(1)
}
func (sqw *SQLWriter) upsertUncleCID(uncle models.UncleModel) {
@ -202,20 +202,20 @@ func (sqw *SQLWriter) upsertUncleCID(uncle models.UncleModel) {
func (sqw *SQLWriter) upsertTransactionCID(transaction models.TxModel) {
sqw.stmts <- []byte(fmt.Sprintf(txInsert, transaction.BlockNumber, transaction.HeaderID, transaction.TxHash, transaction.CID, transaction.Dst,
transaction.Src, transaction.Index, transaction.Type, transaction.Value))
indexerMetrics.transactions.Inc(1)
metrics.IndexerMetrics.TransactionsCounter.Inc(1)
}
func (sqw *SQLWriter) upsertReceiptCID(rct *models.ReceiptModel) {
sqw.stmts <- []byte(fmt.Sprintf(rctInsert, rct.BlockNumber, rct.HeaderID, rct.TxID, rct.CID, rct.Contract,
rct.PostState, rct.PostStatus))
indexerMetrics.receipts.Inc(1)
metrics.IndexerMetrics.ReceiptsCounter.Inc(1)
}
func (sqw *SQLWriter) upsertLogCID(logs []*models.LogsModel) {
for _, l := range logs {
sqw.stmts <- []byte(fmt.Sprintf(logInsert, l.BlockNumber, l.HeaderID, l.CID, l.ReceiptID, l.Address, l.Index, l.Topic0,
l.Topic1, l.Topic2, l.Topic3))
indexerMetrics.logs.Inc(1)
metrics.IndexerMetrics.LogsCounter.Inc(1)
}
}

View File

@ -0,0 +1,263 @@
// VulcanizeDB
// Copyright © 2021 Vulcanize
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package metrics
import (
"fmt"
"strings"
"time"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/metrics"
)
const (
namespace = "statediff"
)
var (
IndexerMetrics = RegisterIndexerMetrics(metrics.DefaultRegistry)
DBMetrics = RegisterDBMetrics(metrics.DefaultRegistry)
)
// Build a fully qualified metric name
func metricName(subsystem, name string) string {
if name == "" {
return ""
}
parts := []string{namespace, name}
if subsystem != "" {
parts = []string{namespace, subsystem, name}
}
// Prometheus uses _ but geth metrics uses / and replaces
return strings.Join(parts, "/")
}
type IndexerMetricsHandles struct {
// The total number of processed BlocksCounter
BlocksCounter metrics.Counter
// The total number of processed transactions
TransactionsCounter metrics.Counter
// The total number of processed receipts
ReceiptsCounter metrics.Counter
// The total number of processed logs
LogsCounter metrics.Counter
// The total number of access list entries processed
AccessListEntriesCounter metrics.Counter
// Time spent waiting for free postgres tx
FreePostgresTimer metrics.Timer
// Postgres transaction commit duration
PostgresCommitTimer metrics.Timer
// Header processing time
HeaderProcessingTimer metrics.Timer
// Uncle processing time
UncleProcessingTimer metrics.Timer
// Tx and receipt processing time
TxAndRecProcessingTimer metrics.Timer
// State, storage, and code combined processing time
StateStoreCodeProcessingTimer metrics.Timer
// Fine-grained code timers
BuildStateDiffWithIntermediateStateNodesTimer metrics.Timer
BuildStateDiffWithoutIntermediateStateNodesTimer metrics.Timer
CreatedAndUpdatedStateWithIntermediateNodesTimer metrics.Timer
DeletedOrUpdatedStateTimer metrics.Timer
BuildAccountUpdatesTimer metrics.Timer
BuildAccountCreationsTimer metrics.Timer
ResolveNodeTimer metrics.Timer
SortKeysTimer metrics.Timer
FindIntersectionTimer metrics.Timer
OutputTimer metrics.Timer
IPLDOutputTimer metrics.Timer
DifferenceIteratorNextTimer metrics.Timer
DifferenceIteratorCounter metrics.Counter
DeletedOrUpdatedStorageTimer metrics.Timer
CreatedAndUpdatedStorageTimer metrics.Timer
BuildStorageNodesIncrementalTimer metrics.Timer
BuildStateTrieObjectTimer metrics.Timer
BuildStateTrieTimer metrics.Timer
BuildStateDiffObjectTimer metrics.Timer
WriteStateDiffObjectTimer metrics.Timer
CreatedAndUpdatedStateTimer metrics.Timer
BuildStorageNodesEventualTimer metrics.Timer
BuildStorageNodesFromTrieTimer metrics.Timer
BuildRemovedAccountStorageNodesTimer metrics.Timer
BuildRemovedStorageNodesFromTrieTimer metrics.Timer
IsWatchedAddressTimer metrics.Timer
}
func RegisterIndexerMetrics(reg metrics.Registry) IndexerMetricsHandles {
ctx := IndexerMetricsHandles{
BlocksCounter: metrics.NewCounter(),
TransactionsCounter: metrics.NewCounter(),
ReceiptsCounter: metrics.NewCounter(),
LogsCounter: metrics.NewCounter(),
AccessListEntriesCounter: metrics.NewCounter(),
FreePostgresTimer: metrics.NewTimer(),
PostgresCommitTimer: metrics.NewTimer(),
HeaderProcessingTimer: metrics.NewTimer(),
UncleProcessingTimer: metrics.NewTimer(),
TxAndRecProcessingTimer: metrics.NewTimer(),
StateStoreCodeProcessingTimer: metrics.NewTimer(),
BuildStateDiffWithIntermediateStateNodesTimer: metrics.NewTimer(),
BuildStateDiffWithoutIntermediateStateNodesTimer: metrics.NewTimer(),
CreatedAndUpdatedStateWithIntermediateNodesTimer: metrics.NewTimer(),
DeletedOrUpdatedStateTimer: metrics.NewTimer(),
BuildAccountUpdatesTimer: metrics.NewTimer(),
BuildAccountCreationsTimer: metrics.NewTimer(),
ResolveNodeTimer: metrics.NewTimer(),
SortKeysTimer: metrics.NewTimer(),
FindIntersectionTimer: metrics.NewTimer(),
OutputTimer: metrics.NewTimer(),
IPLDOutputTimer: metrics.NewTimer(),
DifferenceIteratorNextTimer: metrics.NewTimer(),
DifferenceIteratorCounter: metrics.NewCounter(),
DeletedOrUpdatedStorageTimer: metrics.NewTimer(),
CreatedAndUpdatedStorageTimer: metrics.NewTimer(),
BuildStorageNodesIncrementalTimer: metrics.NewTimer(),
BuildStateTrieObjectTimer: metrics.NewTimer(),
BuildStateTrieTimer: metrics.NewTimer(),
BuildStateDiffObjectTimer: metrics.NewTimer(),
WriteStateDiffObjectTimer: metrics.NewTimer(),
CreatedAndUpdatedStateTimer: metrics.NewTimer(),
BuildStorageNodesEventualTimer: metrics.NewTimer(),
BuildStorageNodesFromTrieTimer: metrics.NewTimer(),
BuildRemovedAccountStorageNodesTimer: metrics.NewTimer(),
BuildRemovedStorageNodesFromTrieTimer: metrics.NewTimer(),
IsWatchedAddressTimer: metrics.NewTimer(),
}
subsys := "indexer"
reg.Register(metricName(subsys, "blocks"), ctx.BlocksCounter)
reg.Register(metricName(subsys, "transactions"), ctx.TransactionsCounter)
reg.Register(metricName(subsys, "receipts"), ctx.ReceiptsCounter)
reg.Register(metricName(subsys, "logs"), ctx.LogsCounter)
reg.Register(metricName(subsys, "access_list_entries"), ctx.AccessListEntriesCounter)
reg.Register(metricName(subsys, "t_free_postgres"), ctx.FreePostgresTimer)
reg.Register(metricName(subsys, "t_postgres_commit"), ctx.PostgresCommitTimer)
reg.Register(metricName(subsys, "t_header_processing"), ctx.HeaderProcessingTimer)
reg.Register(metricName(subsys, "t_uncle_processing"), ctx.UncleProcessingTimer)
reg.Register(metricName(subsys, "t_tx_receipt_processing"), ctx.TxAndRecProcessingTimer)
reg.Register(metricName(subsys, "t_state_store_code_processing"), ctx.StateStoreCodeProcessingTimer)
reg.Register(metricName(subsys, "t_build_statediff_with_intermediate_state_nodes"), ctx.BuildStateDiffWithIntermediateStateNodesTimer)
reg.Register(metricName(subsys, "t_build_statediff_without_intermediate_state_nodes"), ctx.BuildStateDiffWithoutIntermediateStateNodesTimer)
reg.Register(metricName(subsys, "t_created_and_update_state_with_intermediate_nodes"), ctx.CreatedAndUpdatedStateWithIntermediateNodesTimer)
reg.Register(metricName(subsys, "t_deleted_or_updated_state"), ctx.DeletedOrUpdatedStateTimer)
reg.Register(metricName(subsys, "t_build_account_updates"), ctx.BuildAccountUpdatesTimer)
reg.Register(metricName(subsys, "t_build_account_creations"), ctx.BuildAccountCreationsTimer)
reg.Register(metricName(subsys, "t_resolve_node"), ctx.ResolveNodeTimer)
reg.Register(metricName(subsys, "t_sort_keys"), ctx.SortKeysTimer)
reg.Register(metricName(subsys, "t_find_intersection"), ctx.FindIntersectionTimer)
reg.Register(metricName(subsys, "t_output_fn"), ctx.OutputTimer)
reg.Register(metricName(subsys, "t_ipld_output_fn"), ctx.IPLDOutputTimer)
reg.Register(metricName(subsys, "t_difference_iterator_next"), ctx.DifferenceIteratorNextTimer)
reg.Register(metricName(subsys, "difference_iterator_counter"), ctx.DifferenceIteratorCounter)
reg.Register(metricName(subsys, "t_created_and_updated_storage"), ctx.CreatedAndUpdatedStorageTimer)
reg.Register(metricName(subsys, "t_deleted_or_updated_storage"), ctx.DeletedOrUpdatedStorageTimer)
reg.Register(metricName(subsys, "t_build_storage_nodes_incremental"), ctx.BuildStorageNodesIncrementalTimer)
reg.Register(metricName(subsys, "t_build_state_trie_object"), ctx.BuildStateTrieObjectTimer)
reg.Register(metricName(subsys, "t_build_state_trie"), ctx.BuildStateTrieTimer)
reg.Register(metricName(subsys, "t_build_statediff_object"), ctx.BuildStateDiffObjectTimer)
reg.Register(metricName(subsys, "t_write_statediff_object"), ctx.WriteStateDiffObjectTimer)
reg.Register(metricName(subsys, "t_created_and_updated_state"), ctx.CreatedAndUpdatedStateTimer)
reg.Register(metricName(subsys, "t_build_storage_nodes_eventual"), ctx.BuildStorageNodesEventualTimer)
reg.Register(metricName(subsys, "t_build_storage_nodes_from_trie"), ctx.BuildStorageNodesFromTrieTimer)
reg.Register(metricName(subsys, "t_build_removed_accounts_storage_nodes"), ctx.BuildRemovedAccountStorageNodesTimer)
reg.Register(metricName(subsys, "t_build_removed_storage_nodes_from_trie"), ctx.BuildRemovedStorageNodesFromTrieTimer)
reg.Register(metricName(subsys, "t_is_watched_address"), ctx.IsWatchedAddressTimer)
log.Debug("Registering statediff indexer metrics.")
return ctx
}
type dbMetricsHandles struct {
// Maximum number of open connections to the sql
maxOpen metrics.Gauge
// The number of established connections both in use and idle
open metrics.Gauge
// The number of connections currently in use
inUse metrics.Gauge
// The number of idle connections
idle metrics.Gauge
// The total number of connections waited for
waitedFor metrics.Counter
// The total time blocked waiting for a new connection
blockedMilliseconds metrics.Counter
// The total number of connections closed due to SetMaxIdleConns
closedMaxIdle metrics.Counter
// The total number of connections closed due to SetConnMaxLifetime
closedMaxLifetime metrics.Counter
}
func RegisterDBMetrics(reg metrics.Registry) dbMetricsHandles {
ctx := dbMetricsHandles{
maxOpen: metrics.NewGauge(),
open: metrics.NewGauge(),
inUse: metrics.NewGauge(),
idle: metrics.NewGauge(),
waitedFor: metrics.NewCounter(),
blockedMilliseconds: metrics.NewCounter(),
closedMaxIdle: metrics.NewCounter(),
closedMaxLifetime: metrics.NewCounter(),
}
subsys := "connections"
reg.Register(metricName(subsys, "max_open"), ctx.maxOpen)
reg.Register(metricName(subsys, "open"), ctx.open)
reg.Register(metricName(subsys, "in_use"), ctx.inUse)
reg.Register(metricName(subsys, "idle"), ctx.idle)
reg.Register(metricName(subsys, "waited_for"), ctx.waitedFor)
reg.Register(metricName(subsys, "blocked_milliseconds"), ctx.blockedMilliseconds)
reg.Register(metricName(subsys, "closed_max_idle"), ctx.closedMaxIdle)
reg.Register(metricName(subsys, "closed_max_lifetime"), ctx.closedMaxLifetime)
log.Debug("Registering statediff DB metrics.")
return ctx
}
// DbStats interface to accommodate different concrete sql stats types
type DbStats interface {
MaxOpen() int64
Open() int64
InUse() int64
Idle() int64
WaitCount() int64
WaitDuration() time.Duration
MaxIdleClosed() int64
MaxLifetimeClosed() int64
}
func (met *dbMetricsHandles) Update(stats DbStats) {
met.maxOpen.Update(stats.MaxOpen())
met.open.Update(stats.Open())
met.inUse.Update(stats.InUse())
met.idle.Update(stats.Idle())
met.waitedFor.Inc(stats.WaitCount())
met.blockedMilliseconds.Inc(stats.WaitDuration().Milliseconds())
met.closedMaxIdle.Inc(stats.MaxIdleClosed())
met.closedMaxLifetime.Inc(stats.MaxLifetimeClosed())
}
func ReportAndUpdateDuration(msg string, start time.Time, logger log.Logger, timer metrics.Timer) {
since := UpdateDuration(start, timer)
logger.Debug(fmt.Sprintf("%s duration=%dms", msg, since.Milliseconds()))
}
func UpdateDuration(start time.Time, timer metrics.Timer) time.Duration {
since := time.Since(start)
timer.Update(since)
return since
}

View File

@ -35,6 +35,7 @@ import (
"github.com/ethereum/go-ethereum/metrics"
"github.com/ethereum/go-ethereum/params"
"github.com/ethereum/go-ethereum/rlp"
metrics2 "github.com/ethereum/go-ethereum/statediff/indexer/database/metrics"
"github.com/ethereum/go-ethereum/statediff/indexer/interfaces"
"github.com/ethereum/go-ethereum/statediff/indexer/ipld"
"github.com/ethereum/go-ethereum/statediff/indexer/models"
@ -44,11 +45,6 @@ import (
var _ interfaces.StateDiffIndexer = &StateDiffIndexer{}
var (
indexerMetrics = RegisterIndexerMetrics(metrics.DefaultRegistry)
dbMetrics = RegisterDBMetrics(metrics.DefaultRegistry)
)
// StateDiffIndexer satisfies the indexer.StateDiffIndexer interface for ethereum statediff objects on top of an SQL sql
type StateDiffIndexer struct {
ctx context.Context
@ -75,7 +71,7 @@ func (sdi *StateDiffIndexer) ReportDBMetrics(delay time.Duration, quit <-chan bo
for {
select {
case <-ticker.C:
dbMetrics.Update(sdi.dbWriter.db.Stats())
metrics2.DBMetrics.Update(sdi.dbWriter.db.Stats())
case <-quit:
ticker.Stop()
return
@ -156,7 +152,7 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
rollback(sdi.ctx, tx)
} else {
tDiff := time.Since(t)
indexerMetrics.tStateStoreCodeProcessing.Update(tDiff)
metrics2.IndexerMetrics.StateStoreCodeProcessingTimer.Update(tDiff)
traceMsg += fmt.Sprintf("state, storage, and code storage processing time: %s\r\n", tDiff.String())
t = time.Now()
if err := self.flush(); err != nil {
@ -167,7 +163,7 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
}
err = tx.Commit(sdi.ctx)
tDiff = time.Since(t)
indexerMetrics.tPostgresCommit.Update(tDiff)
metrics2.IndexerMetrics.PostgresCommitTimer.Update(tDiff)
traceMsg += fmt.Sprintf("postgres transaction commit duration: %s\r\n", tDiff.String())
}
traceMsg += fmt.Sprintf(" TOTAL PROCESSING DURATION: %s\r\n", time.Since(start).String())
@ -178,7 +174,7 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
go blockTx.cache()
tDiff := time.Since(t)
indexerMetrics.tFreePostgres.Update(tDiff)
metrics2.IndexerMetrics.FreePostgresTimer.Update(tDiff)
traceMsg += fmt.Sprintf("time spent waiting for free postgres tx: %s:\r\n", tDiff.String())
t = time.Now()
@ -190,7 +186,7 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
return nil, err
}
tDiff = time.Since(t)
indexerMetrics.tHeaderProcessing.Update(tDiff)
metrics2.IndexerMetrics.HeaderProcessingTimer.Update(tDiff)
traceMsg += fmt.Sprintf("header processing time: %s\r\n", tDiff.String())
t = time.Now()
// Publish and index uncles
@ -199,7 +195,7 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
return nil, err
}
tDiff = time.Since(t)
indexerMetrics.tUncleProcessing.Update(tDiff)
metrics2.IndexerMetrics.UncleProcessingTimer.Update(tDiff)
traceMsg += fmt.Sprintf("uncle processing time: %s\r\n", tDiff.String())
t = time.Now()
// Publish and index receipts and txs
@ -216,7 +212,7 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
return nil, err
}
tDiff = time.Since(t)
indexerMetrics.tTxAndRecProcessing.Update(tDiff)
metrics2.IndexerMetrics.TxAndRecProcessingTimer.Update(tDiff)
traceMsg += fmt.Sprintf("tx and receipt processing time: %s\r\n", tDiff.String())
t = time.Now()

View File

@ -19,7 +19,8 @@ package sql
import (
"context"
"io"
"time"
"github.com/ethereum/go-ethereum/statediff/indexer/database/metrics"
)
// Database interfaces required by the sql indexer
@ -30,12 +31,13 @@ type Database interface {
// Driver interface has all the methods required by a driver implementation to support the sql indexer
type Driver interface {
UseCopyFrom() bool
QueryRow(ctx context.Context, sql string, args ...interface{}) ScannableRow
Exec(ctx context.Context, sql string, args ...interface{}) (Result, error)
Select(ctx context.Context, dest interface{}, query string, args ...interface{}) error
Get(ctx context.Context, dest interface{}, query string, args ...interface{}) error
Begin(ctx context.Context) (Tx, error)
Stats() Stats
Stats() metrics.DbStats
NodeID() string
Context() context.Context
io.Closer
@ -52,12 +54,25 @@ type Statements interface {
InsertStorageStm() string
InsertIPLDStm() string
InsertIPLDsStm() string
// Table/column descriptions for use with CopyFrom and similar commands.
LogTableName() []string
LogColumnNames() []string
RctTableName() []string
RctColumnNames() []string
StateTableName() []string
StateColumnNames() []string
StorageTableName() []string
StorageColumnNames() []string
TxTableName() []string
TxColumnNames() []string
}
// Tx interface to accommodate different concrete SQL transaction types
type Tx interface {
QueryRow(ctx context.Context, sql string, args ...interface{}) ScannableRow
Exec(ctx context.Context, sql string, args ...interface{}) (Result, error)
CopyFrom(ctx context.Context, tableName []string, columnNames []string, rows [][]interface{}) (int64, error)
Commit(ctx context.Context) error
Rollback(ctx context.Context) error
}
@ -71,15 +86,3 @@ type ScannableRow interface {
type Result interface {
RowsAffected() (int64, error)
}
// Stats interface to accommodate different concrete sql stats types
type Stats interface {
MaxOpen() int64
Open() int64
InUse() int64
Idle() int64
WaitCount() int64
WaitDuration() time.Duration
MaxIdleClosed() int64
MaxLifetimeClosed() int64
}

View File

@ -2,10 +2,16 @@ package sql
import (
"context"
"reflect"
"github.com/ethereum/go-ethereum/log"
)
// Changing this to 1 would make sure only sequential COPYs were combined.
const copyFromCheckLimit = 100
type DelayedTx struct {
cache []cachedStmt
cache []interface{}
db Database
}
type cachedStmt struct {
@ -13,6 +19,20 @@ type cachedStmt struct {
args []interface{}
}
type copyFrom struct {
tableName []string
columnNames []string
rows [][]interface{}
}
func (cf *copyFrom) appendRows(rows [][]interface{}) {
cf.rows = append(cf.rows, rows...)
}
func (cf *copyFrom) matches(tableName []string, columnNames []string) bool {
return reflect.DeepEqual(cf.tableName, tableName) && reflect.DeepEqual(cf.columnNames, columnNames)
}
func NewDelayedTx(db Database) *DelayedTx {
return &DelayedTx{db: db}
}
@ -21,6 +41,28 @@ func (tx *DelayedTx) QueryRow(ctx context.Context, sql string, args ...interface
return tx.db.QueryRow(ctx, sql, args...)
}
func (tx *DelayedTx) findPrevCopyFrom(tableName []string, columnNames []string, limit int) (*copyFrom, int) {
for pos, count := len(tx.cache)-1, 0; pos >= 0 && count < limit; pos, count = pos-1, count+1 {
prevCopy, ok := tx.cache[pos].(*copyFrom)
if ok && prevCopy.matches(tableName, columnNames) {
return prevCopy, count
}
}
return nil, -1
}
func (tx *DelayedTx) CopyFrom(ctx context.Context, tableName []string, columnNames []string, rows [][]interface{}) (int64, error) {
if prevCopy, distance := tx.findPrevCopyFrom(tableName, columnNames, copyFromCheckLimit); nil != prevCopy {
log.Trace("statediff lazy_tx : Appending to COPY", "table", tableName,
"current", len(prevCopy.rows), "new", len(rows), "distance", distance)
prevCopy.appendRows(rows)
} else {
tx.cache = append(tx.cache, &copyFrom{tableName, columnNames, rows})
}
return 0, nil
}
func (tx *DelayedTx) Exec(ctx context.Context, sql string, args ...interface{}) (Result, error) {
tx.cache = append(tx.cache, cachedStmt{sql, args})
return nil, nil
@ -39,10 +81,19 @@ func (tx *DelayedTx) Commit(ctx context.Context) error {
rollback(ctx, base)
}
}()
for _, stmt := range tx.cache {
_, err := base.Exec(ctx, stmt.sql, stmt.args...)
if err != nil {
return err
for _, item := range tx.cache {
switch item := item.(type) {
case *copyFrom:
_, err := base.CopyFrom(ctx, item.tableName, item.columnNames, item.rows)
if err != nil {
log.Error("COPY error", "table", item.tableName, "err", err)
return err
}
case cachedStmt:
_, err := base.Exec(ctx, item.sql, item.args...)
if err != nil {
return err
}
}
}
tx.cache = nil

View File

@ -1,147 +0,0 @@
// VulcanizeDB
// Copyright © 2021 Vulcanize
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package sql
import (
"strings"
"github.com/ethereum/go-ethereum/metrics"
)
const (
namespace = "statediff"
)
// Build a fully qualified metric name
func metricName(subsystem, name string) string {
if name == "" {
return ""
}
parts := []string{namespace, name}
if subsystem != "" {
parts = []string{namespace, subsystem, name}
}
// Prometheus uses _ but geth metrics uses / and replaces
return strings.Join(parts, "/")
}
type indexerMetricsHandles struct {
// The total number of processed blocks
blocks metrics.Counter
// The total number of processed transactions
transactions metrics.Counter
// The total number of processed receipts
receipts metrics.Counter
// The total number of processed logs
logs metrics.Counter
// The total number of access list entries processed
accessListEntries metrics.Counter
// Time spent waiting for free postgres tx
tFreePostgres metrics.Timer
// Postgres transaction commit duration
tPostgresCommit metrics.Timer
// Header processing time
tHeaderProcessing metrics.Timer
// Uncle processing time
tUncleProcessing metrics.Timer
// Tx and receipt processing time
tTxAndRecProcessing metrics.Timer
// State, storage, and code combined processing time
tStateStoreCodeProcessing metrics.Timer
}
func RegisterIndexerMetrics(reg metrics.Registry) indexerMetricsHandles {
ctx := indexerMetricsHandles{
blocks: metrics.NewCounter(),
transactions: metrics.NewCounter(),
receipts: metrics.NewCounter(),
logs: metrics.NewCounter(),
accessListEntries: metrics.NewCounter(),
tFreePostgres: metrics.NewTimer(),
tPostgresCommit: metrics.NewTimer(),
tHeaderProcessing: metrics.NewTimer(),
tUncleProcessing: metrics.NewTimer(),
tTxAndRecProcessing: metrics.NewTimer(),
tStateStoreCodeProcessing: metrics.NewTimer(),
}
subsys := "indexer"
reg.Register(metricName(subsys, "blocks"), ctx.blocks)
reg.Register(metricName(subsys, "transactions"), ctx.transactions)
reg.Register(metricName(subsys, "receipts"), ctx.receipts)
reg.Register(metricName(subsys, "logs"), ctx.logs)
reg.Register(metricName(subsys, "access_list_entries"), ctx.accessListEntries)
reg.Register(metricName(subsys, "t_free_postgres"), ctx.tFreePostgres)
reg.Register(metricName(subsys, "t_postgres_commit"), ctx.tPostgresCommit)
reg.Register(metricName(subsys, "t_header_processing"), ctx.tHeaderProcessing)
reg.Register(metricName(subsys, "t_uncle_processing"), ctx.tUncleProcessing)
reg.Register(metricName(subsys, "t_tx_receipt_processing"), ctx.tTxAndRecProcessing)
reg.Register(metricName(subsys, "t_state_store_code_processing"), ctx.tStateStoreCodeProcessing)
return ctx
}
type dbMetricsHandles struct {
// Maximum number of open connections to the sql
maxOpen metrics.Gauge
// The number of established connections both in use and idle
open metrics.Gauge
// The number of connections currently in use
inUse metrics.Gauge
// The number of idle connections
idle metrics.Gauge
// The total number of connections waited for
waitedFor metrics.Counter
// The total time blocked waiting for a new connection
blockedMilliseconds metrics.Counter
// The total number of connections closed due to SetMaxIdleConns
closedMaxIdle metrics.Counter
// The total number of connections closed due to SetConnMaxLifetime
closedMaxLifetime metrics.Counter
}
func RegisterDBMetrics(reg metrics.Registry) dbMetricsHandles {
ctx := dbMetricsHandles{
maxOpen: metrics.NewGauge(),
open: metrics.NewGauge(),
inUse: metrics.NewGauge(),
idle: metrics.NewGauge(),
waitedFor: metrics.NewCounter(),
blockedMilliseconds: metrics.NewCounter(),
closedMaxIdle: metrics.NewCounter(),
closedMaxLifetime: metrics.NewCounter(),
}
subsys := "connections"
reg.Register(metricName(subsys, "max_open"), ctx.maxOpen)
reg.Register(metricName(subsys, "open"), ctx.open)
reg.Register(metricName(subsys, "in_use"), ctx.inUse)
reg.Register(metricName(subsys, "idle"), ctx.idle)
reg.Register(metricName(subsys, "waited_for"), ctx.waitedFor)
reg.Register(metricName(subsys, "blocked_milliseconds"), ctx.blockedMilliseconds)
reg.Register(metricName(subsys, "closed_max_idle"), ctx.closedMaxIdle)
reg.Register(metricName(subsys, "closed_max_lifetime"), ctx.closedMaxLifetime)
return ctx
}
func (met *dbMetricsHandles) Update(stats Stats) {
met.maxOpen.Update(stats.MaxOpen())
met.open.Update(stats.Open())
met.inUse.Update(stats.InUse())
met.idle.Update(stats.Idle())
met.waitedFor.Inc(stats.WaitCount())
met.blockedMilliseconds.Inc(stats.WaitDuration().Milliseconds())
met.closedMaxIdle.Inc(stats.MaxIdleClosed())
met.closedMaxLifetime.Inc(stats.MaxLifetimeClosed())
}

View File

@ -28,7 +28,7 @@ import (
)
func setupLegacyPGXIndexer(t *testing.T) {
db, err = postgres.SetupPGXDB()
db, err = postgres.SetupPGXDB(postgres.DefaultConfig)
if err != nil {
t.Fatal(err)
}

View File

@ -29,8 +29,8 @@ import (
"github.com/ethereum/go-ethereum/statediff/indexer/test"
)
func setupPGXIndexer(t *testing.T) {
db, err = postgres.SetupPGXDB()
func setupPGXIndexer(t *testing.T, config postgres.Config) {
db, err = postgres.SetupPGXDB(config)
if err != nil {
t.Fatal(err)
}
@ -39,12 +39,16 @@ func setupPGXIndexer(t *testing.T) {
}
func setupPGX(t *testing.T) {
setupPGXIndexer(t)
setupPGXWithConfig(t, postgres.DefaultConfig)
}
func setupPGXWithConfig(t *testing.T, config postgres.Config) {
setupPGXIndexer(t, config)
test.SetupTestData(t, ind)
}
func setupPGXNonCanonical(t *testing.T) {
setupPGXIndexer(t)
setupPGXIndexer(t, postgres.DefaultConfig)
test.SetupTestDataNonCanonical(t, ind)
}
@ -97,6 +101,20 @@ func TestPGXIndexer(t *testing.T) {
test.TestPublishAndIndexStorageIPLDs(t, db)
})
t.Run("Publish and index with CopyFrom enabled.", func(t *testing.T) {
config := postgres.DefaultConfig
config.CopyFrom = true
setupPGXWithConfig(t, config)
defer tearDown(t)
defer checkTxClosure(t, 1, 0, 1)
test.TestPublishAndIndexStateIPLDs(t, db)
test.TestPublishAndIndexStorageIPLDs(t, db)
test.TestPublishAndIndexReceiptIPLDs(t, db)
test.TestPublishAndIndexLogIPLDs(t, db)
})
}
// Test indexer for a canonical + a non-canonical block at London height + a non-canonical block at London height + 1
@ -151,7 +169,7 @@ func TestPGXIndexerNonCanonical(t *testing.T) {
}
func TestPGXWatchAddressMethods(t *testing.T) {
setupPGXIndexer(t)
setupPGXIndexer(t, postgres.DefaultConfig)
defer tearDown(t)
defer checkTxClosure(t, 1, 0, 1)

View File

@ -92,6 +92,9 @@ type Config struct {
// toggle on/off upserts
Upsert bool
// toggle on/off CopyFrom
CopyFrom bool
}
// Type satisfies interfaces.Config

View File

@ -84,3 +84,43 @@ func (db *DB) InsertIPLDStm() string {
func (db *DB) InsertIPLDsStm() string {
return `INSERT INTO ipld.blocks (block_number, key, data) VALUES (unnest($1::BIGINT[]), unnest($2::TEXT[]), unnest($3::BYTEA[])) ON CONFLICT DO NOTHING`
}
func (db *DB) LogTableName() []string {
return []string{"eth", "log_cids"}
}
func (db *DB) LogColumnNames() []string {
return []string{"block_number", "header_id", "cid", "rct_id", "address", "index", "topic0", "topic1", "topic2", "topic3"}
}
func (db *DB) RctTableName() []string {
return []string{"eth", "receipt_cids"}
}
func (db *DB) RctColumnNames() []string {
return []string{"block_number", "header_id", "tx_id", "cid", "contract", "post_state", "post_status"}
}
func (db *DB) StateTableName() []string {
return []string{"eth", "state_cids"}
}
func (db *DB) StateColumnNames() []string {
return []string{"block_number", "header_id", "state_leaf_key", "cid", "diff", "balance", "nonce", "code_hash", "storage_root", "removed"}
}
func (db *DB) StorageTableName() []string {
return []string{"eth", "storage_cids"}
}
func (db *DB) StorageColumnNames() []string {
return []string{"block_number", "header_id", "state_leaf_key", "storage_leaf_key", "cid", "diff", "val", "removed"}
}
func (db *DB) TxTableName() []string {
return []string{"eth", "transaction_cids"}
}
func (db *DB) TxColumnNames() []string {
return []string{"block_number", "header_id", "tx_hash", "cid", "dst", "src", "index", "tx_type", "value"}
}

View File

@ -27,6 +27,7 @@ import (
"github.com/jackc/pgx/v4"
"github.com/jackc/pgx/v4/pgxpool"
"github.com/ethereum/go-ethereum/statediff/indexer/database/metrics"
"github.com/ethereum/go-ethereum/statediff/indexer/database/sql"
"github.com/ethereum/go-ethereum/statediff/indexer/node"
)
@ -37,6 +38,7 @@ type PGXDriver struct {
pool *pgxpool.Pool
nodeInfo node.Info
nodeID string
config Config
}
// ConnectPGX initializes and returns a PGX connection pool
@ -55,7 +57,7 @@ func NewPGXDriver(ctx context.Context, config Config, node node.Info) (*PGXDrive
if err != nil {
return nil, ErrDBConnectionFailed(err)
}
pg := &PGXDriver{ctx: ctx, pool: dbPool, nodeInfo: node}
pg := &PGXDriver{ctx: ctx, pool: dbPool, nodeInfo: node, config: config}
nodeErr := pg.createNode()
if nodeErr != nil {
return &PGXDriver{}, ErrUnableToSetNode(nodeErr)
@ -144,7 +146,7 @@ func (pgx *PGXDriver) Begin(ctx context.Context) (sql.Tx, error) {
return pgxTxWrapper{tx: tx}, nil
}
func (pgx *PGXDriver) Stats() sql.Stats {
func (pgx *PGXDriver) Stats() metrics.DbStats {
stats := pgx.pool.Stat()
return pgxStatsWrapper{stats: stats}
}
@ -165,6 +167,11 @@ func (pgx *PGXDriver) Context() context.Context {
return pgx.ctx
}
// HasCopy satisfies sql.Database
func (pgx *PGXDriver) UseCopyFrom() bool {
return pgx.config.CopyFrom
}
type resultWrapper struct {
ct pgconn.CommandTag
}
@ -178,43 +185,43 @@ type pgxStatsWrapper struct {
stats *pgxpool.Stat
}
// MaxOpen satisfies sql.Stats
// MaxOpen satisfies metrics.DbStats
func (s pgxStatsWrapper) MaxOpen() int64 {
return int64(s.stats.MaxConns())
}
// Open satisfies sql.Stats
// Open satisfies metrics.DbStats
func (s pgxStatsWrapper) Open() int64 {
return int64(s.stats.TotalConns())
}
// InUse satisfies sql.Stats
// InUse satisfies metrics.DbStats
func (s pgxStatsWrapper) InUse() int64 {
return int64(s.stats.AcquiredConns())
}
// Idle satisfies sql.Stats
// Idle satisfies metrics.DbStats
func (s pgxStatsWrapper) Idle() int64 {
return int64(s.stats.IdleConns())
}
// WaitCount satisfies sql.Stats
// WaitCount satisfies metrics.DbStats
func (s pgxStatsWrapper) WaitCount() int64 {
return s.stats.EmptyAcquireCount()
}
// WaitDuration satisfies sql.Stats
// WaitDuration satisfies metrics.DbStats
func (s pgxStatsWrapper) WaitDuration() time.Duration {
return s.stats.AcquireDuration()
}
// MaxIdleClosed satisfies sql.Stats
// MaxIdleClosed satisfies metrics.DbStats
func (s pgxStatsWrapper) MaxIdleClosed() int64 {
// this stat isn't supported by pgxpool, but we don't want to panic
return 0
}
// MaxLifetimeClosed satisfies sql.Stats
// MaxLifetimeClosed satisfies metrics.DbStats
func (s pgxStatsWrapper) MaxLifetimeClosed() int64 {
return s.stats.CanceledAcquireCount()
}
@ -243,3 +250,7 @@ func (t pgxTxWrapper) Commit(ctx context.Context) error {
func (t pgxTxWrapper) Rollback(ctx context.Context) error {
return t.tx.Rollback(ctx)
}
func (t pgxTxWrapper) CopyFrom(ctx context.Context, tableName []string, columnNames []string, rows [][]interface{}) (int64, error) {
return t.tx.CopyFrom(ctx, tableName, columnNames, pgx.CopyFromRows(rows))
}

View File

@ -19,10 +19,12 @@ package postgres
import (
"context"
coresql "database/sql"
"errors"
"time"
"github.com/jmoiron/sqlx"
"github.com/ethereum/go-ethereum/statediff/indexer/database/metrics"
"github.com/ethereum/go-ethereum/statediff/indexer/database/sql"
"github.com/ethereum/go-ethereum/statediff/indexer/node"
)
@ -109,7 +111,7 @@ func (driver *SQLXDriver) Begin(_ context.Context) (sql.Tx, error) {
return sqlxTxWrapper{tx: tx}, nil
}
func (driver *SQLXDriver) Stats() sql.Stats {
func (driver *SQLXDriver) Stats() metrics.DbStats {
stats := driver.db.Stats()
return sqlxStatsWrapper{stats: stats}
}
@ -129,46 +131,52 @@ func (driver *SQLXDriver) Context() context.Context {
return driver.ctx
}
// HasCopy satisfies sql.Database
func (driver *SQLXDriver) UseCopyFrom() bool {
// sqlx does not currently support COPY.
return false
}
type sqlxStatsWrapper struct {
stats coresql.DBStats
}
// MaxOpen satisfies sql.Stats
// MaxOpen satisfies metrics.DbStats
func (s sqlxStatsWrapper) MaxOpen() int64 {
return int64(s.stats.MaxOpenConnections)
}
// Open satisfies sql.Stats
// Open satisfies metrics.DbStats
func (s sqlxStatsWrapper) Open() int64 {
return int64(s.stats.OpenConnections)
}
// InUse satisfies sql.Stats
// InUse satisfies metrics.DbStats
func (s sqlxStatsWrapper) InUse() int64 {
return int64(s.stats.InUse)
}
// Idle satisfies sql.Stats
// Idle satisfies metrics.DbStats
func (s sqlxStatsWrapper) Idle() int64 {
return int64(s.stats.Idle)
}
// WaitCount satisfies sql.Stats
// WaitCount satisfies metrics.DbStats
func (s sqlxStatsWrapper) WaitCount() int64 {
return s.stats.WaitCount
}
// WaitDuration satisfies sql.Stats
// WaitDuration satisfies metrics.DbStats
func (s sqlxStatsWrapper) WaitDuration() time.Duration {
return s.stats.WaitDuration
}
// MaxIdleClosed satisfies sql.Stats
// MaxIdleClosed satisfies metrics.DbStats
func (s sqlxStatsWrapper) MaxIdleClosed() int64 {
return s.stats.MaxIdleClosed
}
// MaxLifetimeClosed satisfies sql.Stats
// MaxLifetimeClosed satisfies metrics.DbStats
func (s sqlxStatsWrapper) MaxLifetimeClosed() int64 {
return s.stats.MaxLifetimeClosed
}
@ -196,3 +204,7 @@ func (t sqlxTxWrapper) Commit(ctx context.Context) error {
func (t sqlxTxWrapper) Rollback(ctx context.Context) error {
return t.tx.Rollback()
}
func (t sqlxTxWrapper) CopyFrom(ctx context.Context, tableName []string, columnNames []string, rows [][]interface{}) (int64, error) {
return 0, errors.New("Unsupported Operation")
}

View File

@ -35,8 +35,8 @@ func SetupSQLXDB() (sql.Database, error) {
}
// SetupPGXDB is used to setup a pgx db for tests
func SetupPGXDB() (sql.Database, error) {
driver, err := NewPGXDriver(context.Background(), DefaultConfig, node.Info{})
func SetupPGXDB(config Config) (sql.Database, error) {
driver, err := NewPGXDriver(context.Background(), config, node.Info{})
if err != nil {
return nil, err
}

View File

@ -18,9 +18,11 @@ package sql
import (
"fmt"
"strconv"
"github.com/lib/pq"
"github.com/ethereum/go-ethereum/statediff/indexer/database/metrics"
"github.com/ethereum/go-ethereum/statediff/indexer/models"
)
@ -66,7 +68,7 @@ func (w *Writer) upsertHeaderCID(tx Tx, header models.HeaderModel) error {
if err != nil {
return insertError{"eth.header_cids", err, w.db.InsertHeaderStm(), header}
}
indexerMetrics.blocks.Inc(1)
metrics.IndexerMetrics.BlocksCounter.Inc(1)
return nil
}
@ -94,20 +96,39 @@ INSERT INTO eth.transaction_cids (block_number, header_id, tx_hash, cid, dst, sr
ON CONFLICT (tx_hash, header_id, block_number) DO NOTHING
*/
func (w *Writer) upsertTransactionCID(tx Tx, transaction models.TxModel) error {
_, err := tx.Exec(w.db.Context(), w.db.InsertTxStm(),
transaction.BlockNumber,
transaction.HeaderID,
transaction.TxHash,
transaction.CID,
transaction.Dst,
transaction.Src,
transaction.Index,
transaction.Type,
transaction.Value)
if err != nil {
return insertError{"eth.transaction_cids", err, w.db.InsertTxStm(), transaction}
if w.useCopyForTx(tx) {
blockNum, err := strconv.ParseInt(transaction.BlockNumber, 10, 64)
if err != nil {
return insertError{"eth.transaction_cids", err, "COPY", transaction}
}
value, err := strconv.ParseFloat(transaction.Value, 64)
if err != nil {
return insertError{"eth.transaction_cids", err, "COPY", transaction}
}
_, err = tx.CopyFrom(w.db.Context(), w.db.TxTableName(), w.db.TxColumnNames(),
toRows(toRow(blockNum, transaction.HeaderID, transaction.TxHash, transaction.CID, transaction.Dst,
transaction.Src, transaction.Index, int(transaction.Type), value)))
if err != nil {
return insertError{"eth.transaction_cids", err, "COPY", transaction}
}
} else {
_, err := tx.Exec(w.db.Context(), w.db.InsertTxStm(),
transaction.BlockNumber,
transaction.HeaderID,
transaction.TxHash,
transaction.CID,
transaction.Dst,
transaction.Src,
transaction.Index,
transaction.Type,
transaction.Value)
if err != nil {
return insertError{"eth.transaction_cids", err, w.db.InsertTxStm(), transaction}
}
}
indexerMetrics.transactions.Inc(1)
metrics.IndexerMetrics.TransactionsCounter.Inc(1)
return nil
}
@ -116,18 +137,32 @@ INSERT INTO eth.receipt_cids (block_number, header_id, tx_id, cid, contract, pos
ON CONFLICT (tx_id, header_id, block_number) DO NOTHING
*/
func (w *Writer) upsertReceiptCID(tx Tx, rct *models.ReceiptModel) error {
_, err := tx.Exec(w.db.Context(), w.db.InsertRctStm(),
rct.BlockNumber,
rct.HeaderID,
rct.TxID,
rct.CID,
rct.Contract,
rct.PostState,
rct.PostStatus)
if err != nil {
return insertError{"eth.receipt_cids", err, w.db.InsertRctStm(), *rct}
if w.useCopyForTx(tx) {
blockNum, err := strconv.ParseInt(rct.BlockNumber, 10, 64)
if err != nil {
return insertError{"eth.receipt_cids", err, "COPY", rct}
}
_, err = tx.CopyFrom(w.db.Context(), w.db.RctTableName(), w.db.RctColumnNames(),
toRows(toRow(blockNum, rct.HeaderID, rct.TxID, rct.CID, rct.Contract,
rct.PostState, int(rct.PostStatus))))
if err != nil {
return insertError{"eth.receipt_cids", err, "COPY", rct}
}
} else {
_, err := tx.Exec(w.db.Context(), w.db.InsertRctStm(),
rct.BlockNumber,
rct.HeaderID,
rct.TxID,
rct.CID,
rct.Contract,
rct.PostState,
rct.PostStatus)
if err != nil {
return insertError{"eth.receipt_cids", err, w.db.InsertRctStm(), *rct}
}
}
indexerMetrics.receipts.Inc(1)
metrics.IndexerMetrics.ReceiptsCounter.Inc(1)
return nil
}
@ -136,22 +171,42 @@ INSERT INTO eth.log_cids (block_number, header_id, cid, rct_id, address, index,
ON CONFLICT (rct_id, index, header_id, block_number) DO NOTHING
*/
func (w *Writer) upsertLogCID(tx Tx, logs []*models.LogsModel) error {
for _, log := range logs {
_, err := tx.Exec(w.db.Context(), w.db.InsertLogStm(),
log.BlockNumber,
log.HeaderID,
log.CID,
log.ReceiptID,
log.Address,
log.Index,
log.Topic0,
log.Topic1,
log.Topic2,
log.Topic3)
if err != nil {
return insertError{"eth.log_cids", err, w.db.InsertLogStm(), *log}
if w.useCopyForTx(tx) {
var rows [][]interface{}
for _, log := range logs {
blockNum, err := strconv.ParseInt(log.BlockNumber, 10, 64)
if err != nil {
return insertError{"eth.log_cids", err, "COPY", log}
}
rows = append(rows, toRow(blockNum, log.HeaderID, log.CID, log.ReceiptID,
log.Address, log.Index, log.Topic0, log.Topic1, log.Topic2, log.Topic3))
}
if nil != rows && len(rows) >= 0 {
_, err := tx.CopyFrom(w.db.Context(), w.db.LogTableName(), w.db.LogColumnNames(), rows)
if err != nil {
return insertError{"eth.log_cids", err, "COPY", rows}
}
metrics.IndexerMetrics.LogsCounter.Inc(int64(len(rows)))
}
} else {
for _, log := range logs {
_, err := tx.Exec(w.db.Context(), w.db.InsertLogStm(),
log.BlockNumber,
log.HeaderID,
log.CID,
log.ReceiptID,
log.Address,
log.Index,
log.Topic0,
log.Topic1,
log.Topic2,
log.Topic3)
if err != nil {
return insertError{"eth.log_cids", err, w.db.InsertLogStm(), *log}
}
metrics.IndexerMetrics.LogsCounter.Inc(1)
}
indexerMetrics.logs.Inc(1)
}
return nil
}
@ -165,20 +220,39 @@ func (w *Writer) upsertStateCID(tx Tx, stateNode models.StateNodeModel) error {
if stateNode.Removed {
balance = "0"
}
_, err := tx.Exec(w.db.Context(), w.db.InsertStateStm(),
stateNode.BlockNumber,
stateNode.HeaderID,
stateNode.StateKey,
stateNode.CID,
true,
balance,
stateNode.Nonce,
stateNode.CodeHash,
stateNode.StorageRoot,
stateNode.Removed,
)
if err != nil {
return insertError{"eth.state_cids", err, w.db.InsertStateStm(), stateNode}
if w.useCopyForTx(tx) {
blockNum, err := strconv.ParseInt(stateNode.BlockNumber, 10, 64)
if err != nil {
return insertError{"eth.state_cids", err, "COPY", stateNode}
}
balInt, err := strconv.ParseInt(balance, 10, 64)
if err != nil {
return insertError{"eth.state_cids", err, "COPY", stateNode}
}
_, err = tx.CopyFrom(w.db.Context(), w.db.StateTableName(), w.db.StateColumnNames(),
toRows(toRow(blockNum, stateNode.HeaderID, stateNode.StateKey, stateNode.CID,
true, balInt, stateNode.Nonce, stateNode.CodeHash, stateNode.StorageRoot, stateNode.Removed)))
if err != nil {
return insertError{"eth.state_cids", err, "COPY", stateNode}
}
} else {
_, err := tx.Exec(w.db.Context(), w.db.InsertStateStm(),
stateNode.BlockNumber,
stateNode.HeaderID,
stateNode.StateKey,
stateNode.CID,
true,
balance,
stateNode.Nonce,
stateNode.CodeHash,
stateNode.StorageRoot,
stateNode.Removed,
)
if err != nil {
return insertError{"eth.state_cids", err, w.db.InsertStateStm(), stateNode}
}
}
return nil
}
@ -188,22 +262,57 @@ INSERT INTO eth.storage_cids (block_number, header_id, state_leaf_key, storage_l
ON CONFLICT (header_id, state_leaf_key, storage_leaf_key, block_number) DO NOTHING
*/
func (w *Writer) upsertStorageCID(tx Tx, storageCID models.StorageNodeModel) error {
_, err := tx.Exec(w.db.Context(), w.db.InsertStorageStm(),
storageCID.BlockNumber,
storageCID.HeaderID,
storageCID.StateKey,
storageCID.StorageKey,
storageCID.CID,
true,
storageCID.Value,
storageCID.Removed,
)
if err != nil {
return insertError{"eth.storage_cids", err, w.db.InsertStorageStm(), storageCID}
if w.useCopyForTx(tx) {
blockNum, err := strconv.ParseInt(storageCID.BlockNumber, 10, 64)
if err != nil {
return insertError{"eth.storage_cids", err, "COPY", storageCID}
}
_, err = tx.CopyFrom(w.db.Context(), w.db.StorageTableName(), w.db.StorageColumnNames(),
toRows(toRow(blockNum, storageCID.HeaderID, storageCID.StateKey, storageCID.StorageKey, storageCID.CID,
true, storageCID.Value, storageCID.Removed)))
if err != nil {
return insertError{"eth.storage_cids", err, "COPY", storageCID}
}
} else {
_, err := tx.Exec(w.db.Context(), w.db.InsertStorageStm(),
storageCID.BlockNumber,
storageCID.HeaderID,
storageCID.StateKey,
storageCID.StorageKey,
storageCID.CID,
true,
storageCID.Value,
storageCID.Removed,
)
if err != nil {
return insertError{"eth.storage_cids", err, w.db.InsertStorageStm(), storageCID}
}
}
return nil
}
func (w *Writer) useCopyForTx(tx Tx) bool {
// Using COPY instead of INSERT only makes much sense if also using a DelayedTx, so that operations
// can be collected over time and then all submitted within in a single TX.
if _, ok := tx.(*DelayedTx); ok {
return w.db.UseCopyFrom()
}
return false
}
// combine args into a row
func toRow(args ...interface{}) []interface{} {
var row []interface{}
row = append(row, args...)
return row
}
// combine row (or rows) into a slice of rows for CopyFrom
func toRows(rows ...[]interface{}) [][]interface{} {
return rows
}
type insertError struct {
table string
err error

View File

@ -26,8 +26,6 @@ import (
"sync/atomic"
"time"
"github.com/ethereum/go-ethereum/trie"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/state"
@ -43,9 +41,11 @@ import (
"github.com/ethereum/go-ethereum/rlp"
"github.com/ethereum/go-ethereum/rpc"
ind "github.com/ethereum/go-ethereum/statediff/indexer"
"github.com/ethereum/go-ethereum/statediff/indexer/database/metrics"
"github.com/ethereum/go-ethereum/statediff/indexer/interfaces"
nodeinfo "github.com/ethereum/go-ethereum/statediff/indexer/node"
types2 "github.com/ethereum/go-ethereum/statediff/types"
"github.com/ethereum/go-ethereum/trie"
"github.com/thoas/go-funk"
)
@ -794,15 +794,23 @@ func (sds *Service) writeStateDiff(block *types.Block, parentRoot common.Hash, p
}
output := func(node types2.StateLeafNode) error {
defer func() {
// This is very noisy so we log at Trace.
since := metrics.UpdateDuration(time.Now(), metrics.IndexerMetrics.OutputTimer)
logger.Trace(fmt.Sprintf("statediff output duration=%dms", since.Milliseconds()))
}()
return sds.indexer.PushStateNode(tx, node, block.Hash().String())
}
ipldOutput := func(c types2.IPLD) error {
defer metrics.ReportAndUpdateDuration("statediff ipldOutput", time.Now(), logger, metrics.IndexerMetrics.IPLDOutputTimer)
return sds.indexer.PushIPLD(tx, c)
}
err = sds.Builder.WriteStateDiffObject(types2.StateRoots{
err = sds.Builder.WriteStateDiffObject(Args{
NewStateRoot: block.Root(),
OldStateRoot: parentRoot,
BlockHash: block.Hash(),
BlockNumber: block.Number(),
}, params, output, ipldOutput)
// TODO this anti-pattern needs to be sorted out eventually
if err := tx.Submit(err); err != nil {

View File

@ -16,7 +16,6 @@
package statediff_test
/*
import (
"bytes"
"math/big"
@ -438,4 +437,3 @@ func testGetSyncStatus(t *testing.T) {
}
}
}
*/

View File

@ -44,8 +44,8 @@ func (builder *Builder) BuildStateDiffObject(args statediff.Args, params statedi
}
// BuildStateDiffObject mock method
func (builder *Builder) WriteStateDiffObject(args sdtypes.StateRoots, params statediff.Params, output sdtypes.StateNodeSink, iplds sdtypes.IPLDSink) error {
builder.StateRoots = args
func (builder *Builder) WriteStateDiffObject(args statediff.Args, params statediff.Params, output sdtypes.StateNodeSink, iplds sdtypes.IPLDSink) error {
builder.StateRoots = sdtypes.StateRoots{OldStateRoot: args.OldStateRoot, NewStateRoot: args.NewStateRoot}
builder.Params = params
return builder.builderError

View File

@ -22,12 +22,16 @@ package trie_helpers
import (
"sort"
"strings"
"time"
metrics2 "github.com/ethereum/go-ethereum/statediff/indexer/database/metrics"
"github.com/ethereum/go-ethereum/statediff/types"
)
// SortKeys sorts the keys in the account map
func SortKeys(data types.AccountMap) []string {
defer metrics2.UpdateDuration(time.Now(), metrics2.IndexerMetrics.SortKeysTimer)
keys := make([]string, 0, len(data))
for key := range data {
keys = append(keys, key)
@ -41,6 +45,7 @@ func SortKeys(data types.AccountMap) []string {
// a and b must first be sorted
// this is used to find which keys have been both "deleted" and "created" i.e. they were updated
func FindIntersection(a, b []string) []string {
defer metrics2.UpdateDuration(time.Now(), metrics2.IndexerMetrics.FindIntersectionTimer)
lenA := len(a)
lenB := len(b)
iOfA, iOfB := 0, 0

View File

@ -20,6 +20,9 @@ import (
"bytes"
"container/heap"
"errors"
"time"
"github.com/ethereum/go-ethereum/statediff/indexer/database/metrics"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/ethdb"
@ -584,6 +587,7 @@ func (it *differenceIterator) AddResolver(resolver ethdb.KeyValueReader) {
}
func (it *differenceIterator) Next(bool) bool {
defer metrics.UpdateDuration(time.Now(), metrics.IndexerMetrics.DifferenceIteratorNextTimer)
// Invariants:
// - We always advance at least one element in b.
// - At the start of this function, a's path is lexically greater than b's.