diff --git a/builder.go b/builder.go index 45b56ee..fc5aed3 100644 --- a/builder.go +++ b/builder.go @@ -21,14 +21,18 @@ package statediff import ( "bytes" + "context" "fmt" + "sync" "time" + iterutils "github.com/cerc-io/eth-iterator-utils" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/rlp" "github.com/ethereum/go-ethereum/trie" + "golang.org/x/sync/errgroup" "github.com/cerc-io/plugeth-statediff/adapt" "github.com/cerc-io/plugeth-statediff/indexer/database/metrics" @@ -44,6 +48,8 @@ var ( emptyContractRoot = crypto.Keccak256Hash(emptyNode) nullCodeHash = crypto.Keccak256([]byte{}) zeroHash common.Hash + + defaultSubtrieWorkers uint = 1 ) // Builder interface exposes the method for building a state diff between two blocks @@ -54,11 +60,8 @@ type Builder interface { type StateDiffBuilder struct { // state cache is safe for concurrent reads - stateCache adapt.StateView -} - -type iterPair struct { - Older, Newer trie.NodeIterator + stateCache adapt.StateView + subtrieWorkers uint } type accountUpdate struct { @@ -74,19 +77,39 @@ func appender[T any](to *[]T) func(T) error { } } -// NewBuilder is used to create a statediff builder -func NewBuilder(stateCache adapt.StateView) Builder { - return &StateDiffBuilder{ - stateCache: stateCache, +func syncedAppender[T any](to *[]T) func(T) error { + var mtx sync.Mutex + return func(a T) error { + mtx.Lock() + *to = append(*to, a) + mtx.Unlock() + return nil } } +// NewBuilder is used to create a statediff builder +func NewBuilder(stateCache adapt.StateView) *StateDiffBuilder { + return &StateDiffBuilder{ + stateCache: stateCache, + subtrieWorkers: defaultSubtrieWorkers, + } +} + +// SetSubtrieWorkers sets the number of disjoint subtries to divide among parallel workers. +// Passing 0 will reset this to the default value. +func (sdb *StateDiffBuilder) SetSubtrieWorkers(n uint) { + if n == 0 { + n = defaultSubtrieWorkers + } + sdb.subtrieWorkers = n +} + // BuildStateDiffObject builds a statediff object from two blocks and the provided parameters func (sdb *StateDiffBuilder) BuildStateDiffObject(args Args, params Params) (sdtypes.StateObject, error) { defer metrics.UpdateDuration(time.Now(), metrics.IndexerMetrics.BuildStateDiffObjectTimer) var stateNodes []sdtypes.StateLeafNode var iplds []sdtypes.IPLD - err := sdb.WriteStateDiff(args, params, appender(&stateNodes), appender(&iplds)) + err := sdb.WriteStateDiff(args, params, syncedAppender(&stateNodes), syncedAppender(&iplds)) if err != nil { return sdtypes.StateObject{}, err } @@ -106,35 +129,39 @@ func (sdb *StateDiffBuilder) WriteStateDiff( ) error { defer metrics.UpdateDuration(time.Now(), metrics.IndexerMetrics.WriteStateDiffTimer) // Load tries for old and new states - oldTrie, err := sdb.stateCache.OpenTrie(args.OldStateRoot) + triea, err := sdb.stateCache.OpenTrie(args.OldStateRoot) if err != nil { return fmt.Errorf("error opening old state trie: %w", err) } - newTrie, err := sdb.stateCache.OpenTrie(args.NewStateRoot) + trieb, err := sdb.stateCache.OpenTrie(args.NewStateRoot) if err != nil { return fmt.Errorf("error opening new state trie: %w", err) } + subitersA := iterutils.SubtrieIterators(triea.NodeIterator, uint(sdb.subtrieWorkers)) + subitersB := iterutils.SubtrieIterators(trieb.NodeIterator, uint(sdb.subtrieWorkers)) - iters := iterPair{ - Older: oldTrie.NodeIterator(nil), - Newer: newTrie.NodeIterator(nil), - } logger := log.New("hash", args.BlockHash, "number", args.BlockNumber) - - err = sdb.processAccounts( - iters.Older, iters.Newer, - params.watchedAddressesLeafPaths, - nodeSink, ipldSink, logger) - if err != nil { - return fmt.Errorf("error collecting createdAndUpdatedNodes: %w", err) + // errgroup will cancel if any gr fails + g, ctx := errgroup.WithContext(context.Background()) + for i := uint(0); i < sdb.subtrieWorkers; i++ { + func(subdiv uint) { + g.Go(func() error { + a, b := subitersA[subdiv], subitersB[subdiv] + return sdb.processAccounts(ctx, + a, b, params.watchedAddressesLeafPaths, + nodeSink, ipldSink, logger, + ) + }) + }(i) } - return nil + return g.Wait() } // processAccounts processes account creations and deletions, and returns a set of updated // existing accounts, indexed by leaf key. -func (sdb *StateDiffBuilder) processAccounts(a, b trie.NodeIterator, - watchedAddressesLeafPaths [][]byte, +func (sdb *StateDiffBuilder) processAccounts( + ctx context.Context, + a, b trie.NodeIterator, watchedAddressesLeafPaths [][]byte, nodeSink sdtypes.StateNodeSink, ipldSink sdtypes.IPLDSink, logger log.Logger, ) error { @@ -146,7 +173,14 @@ func (sdb *StateDiffBuilder) processAccounts(a, b trie.NodeIterator, // Cache the RLP of the previous node. When we hit a value node this will be the parent blob. var prevBlob []byte it, itCount := utils.NewSymmetricDifferenceIterator(a, b) + prevBlob = it.NodeBlob() for it.Next(true) { + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + // ignore node if it is not along paths of interest if !isWatchedPathPrefix(watchedAddressesLeafPaths, it.Path()) { continue @@ -205,6 +239,7 @@ func (sdb *StateDiffBuilder) processAccounts(a, b trie.NodeIterator, if it.Hash() == zeroHash { continue } + // TODO - this can be handled when value node is (craeted?) nodeVal := make([]byte, len(it.NodeBlob())) copy(nodeVal, it.NodeBlob()) // if doing a selective diff, we need to ensure this is a watched path diff --git a/builder_test.go b/builder_test.go index e17be82..68e7ea2 100644 --- a/builder_test.go +++ b/builder_test.go @@ -795,13 +795,13 @@ func TestBuilder(t *testing.T) { }, } - test_helpers.RunBuilderTests(t, chain.StateCache(), - tests, params, test_helpers.CheckedRoots{ - block0: bankAccountAtBlock0LeafNode, - block1: block1BranchRootNode, - block2: block2BranchRootNode, - block3: block3BranchRootNode, - }) + test_helpers.RunBuilderTests(t, chain.StateCache(), tests, params, []uint{1, 8, 32}) + test_helpers.CheckedRoots{ + block0: bankAccountAtBlock0LeafNode, + block1: block1BranchRootNode, + block2: block2BranchRootNode, + block3: block3BranchRootNode, + }.Check(t) } func TestBuilderWithWatchedAddressList(t *testing.T) { @@ -1009,12 +1009,13 @@ func TestBuilderWithWatchedAddressList(t *testing.T) { }, } - test_helpers.RunBuilderTests(t, chain.StateCache(), tests, params, test_helpers.CheckedRoots{ + test_helpers.RunBuilderTests(t, chain.StateCache(), tests, params, []uint{1, 8, 32}) + test_helpers.CheckedRoots{ block0: bankAccountAtBlock0LeafNode, block1: block1BranchRootNode, block2: block2BranchRootNode, block3: block3BranchRootNode, - }) + }.Check(t) } func TestBuilderWithRemovedAccountAndStorage(t *testing.T) { @@ -1259,11 +1260,12 @@ func TestBuilderWithRemovedAccountAndStorage(t *testing.T) { }, } - test_helpers.RunBuilderTests(t, chain.StateCache(), tests, params, test_helpers.CheckedRoots{ + test_helpers.RunBuilderTests(t, chain.StateCache(), tests, params, []uint{1, 8, 32}) + test_helpers.CheckedRoots{ block4: block4BranchRootNode, block5: block5BranchRootNode, block6: block6BranchRootNode, - }) + }.Check(t) } func TestBuilderWithRemovedNonWatchedAccount(t *testing.T) { @@ -1393,11 +1395,12 @@ func TestBuilderWithRemovedNonWatchedAccount(t *testing.T) { }, } - test_helpers.RunBuilderTests(t, chain.StateCache(), tests, params, test_helpers.CheckedRoots{ + test_helpers.RunBuilderTests(t, chain.StateCache(), tests, params, []uint{1, 8, 32}) + test_helpers.CheckedRoots{ block4: block4BranchRootNode, block5: block5BranchRootNode, block6: block6BranchRootNode, - }) + }.Check(t) } func TestBuilderWithRemovedWatchedAccount(t *testing.T) { @@ -1596,11 +1599,12 @@ func TestBuilderWithRemovedWatchedAccount(t *testing.T) { }, } - test_helpers.RunBuilderTests(t, chain.StateCache(), tests, params, test_helpers.CheckedRoots{ + test_helpers.RunBuilderTests(t, chain.StateCache(), tests, params, []uint{1, 8, 32}) + test_helpers.CheckedRoots{ block4: block4BranchRootNode, block5: block5BranchRootNode, block6: block6BranchRootNode, - }) + }.Check(t) } var ( @@ -1823,10 +1827,11 @@ func TestBuilderWithMovedAccount(t *testing.T) { }, } - test_helpers.RunBuilderTests(t, chain.StateCache(), tests, params, test_helpers.CheckedRoots{ + test_helpers.RunBuilderTests(t, chain.StateCache(), tests, params, []uint{1, 8, 32}) + test_helpers.CheckedRoots{ block1: block01BranchRootNode, block2: bankAccountAtBlock02LeafNode, - }) + }.Check(t) } /* @@ -2349,11 +2354,12 @@ func TestBuilderWithInternalizedLeafNode(t *testing.T) { }, } - test_helpers.RunBuilderTests(t, chain.StateCache(), tests, params, test_helpers.CheckedRoots{ + test_helpers.RunBuilderTests(t, chain.StateCache(), tests, params, []uint{1, 8, 32}) + test_helpers.CheckedRoots{ block1: block1bBranchRootNode, block2: block2bBranchRootNode, block3: block3bBranchRootNode, - }) + }.Check(t) } func TestBuilderWithInternalizedLeafNodeAndWatchedAddress(t *testing.T) { @@ -2550,11 +2556,12 @@ func TestBuilderWithInternalizedLeafNodeAndWatchedAddress(t *testing.T) { }, } - test_helpers.RunBuilderTests(t, chain.StateCache(), tests, params, test_helpers.CheckedRoots{ + test_helpers.RunBuilderTests(t, chain.StateCache(), tests, params, []uint{1, 8, 32}) + test_helpers.CheckedRoots{ block1: block1bBranchRootNode, block2: block2bBranchRootNode, block3: block3bBranchRootNode, - }) + }.Check(t) } /* diff --git a/config.go b/config.go index 4b8246a..ac16f74 100644 --- a/config.go +++ b/config.go @@ -44,6 +44,8 @@ type Config struct { BackfillCheckPastBlocks uint64 // Size of the worker pool NumWorkers uint + // Number of subtries to iterate in parallel + SubtrieWorkers uint // Should the statediff service wait until geth has synced to the head of the blockchain? WaitForSync bool // Context used during DB initialization diff --git a/go.mod b/go.mod index 851f888..ffb599b 100644 --- a/go.mod +++ b/go.mod @@ -3,6 +3,7 @@ module github.com/cerc-io/plugeth-statediff go 1.19 require ( + github.com/cerc-io/eth-iterator-utils v0.1.1 github.com/cerc-io/eth-testing v0.2.1 github.com/ethereum/go-ethereum v1.11.6 github.com/georgysavva/scany v0.2.9 @@ -18,8 +19,9 @@ require ( github.com/openrelayxyz/plugeth-utils v1.2.0 github.com/pganalyze/pg_query_go/v4 v4.2.1 github.com/shopspring/decimal v1.2.0 - github.com/stretchr/testify v1.8.1 - github.com/thoas/go-funk v0.9.2 + github.com/stretchr/testify v1.8.2 + github.com/thoas/go-funk v0.9.3 + golang.org/x/sync v0.1.0 ) require ( @@ -109,7 +111,6 @@ require ( golang.org/x/crypto v0.1.0 // indirect golang.org/x/exp v0.0.0-20230206171751-46f607a40771 // indirect golang.org/x/net v0.8.0 // indirect - golang.org/x/sync v0.1.0 // indirect golang.org/x/sys v0.6.0 // indirect golang.org/x/term v0.6.0 // indirect golang.org/x/text v0.8.0 // indirect @@ -123,6 +124,7 @@ require ( ) replace ( + github.com/cerc-io/eth-iterator-utils => git.vdb.to/cerc-io/eth-iterator-utils v0.1.1 github.com/cerc-io/eth-testing => git.vdb.to/cerc-io/eth-testing v0.2.1 github.com/ethereum/go-ethereum => git.vdb.to/cerc-io/plugeth v0.0.0-20230808125822-691dc334fab1 github.com/openrelayxyz/plugeth-utils => git.vdb.to/cerc-io/plugeth-utils v0.0.0-20230706160122-cd41de354c46 diff --git a/go.sum b/go.sum index 29dc9cc..0cd8482 100644 --- a/go.sum +++ b/go.sum @@ -1,4 +1,6 @@ cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= +git.vdb.to/cerc-io/eth-iterator-utils v0.1.1 h1:AGen4U2GaYJVzPjEo3U+GPczSfOUEMkM1nWTM+cq5Dk= +git.vdb.to/cerc-io/eth-iterator-utils v0.1.1/go.mod h1:uiocO9elfDe78kd3c/VZ2in26V+gyXJuN+sdTxK4Xag= git.vdb.to/cerc-io/eth-testing v0.2.1 h1:IZAX7DVgzPkSmu1xdKZ5aOemdEYbvtgae7GUl/TUNtQ= git.vdb.to/cerc-io/eth-testing v0.2.1/go.mod h1:qdvpc/W1xvf2MKx3rMOqvFvYaYIHG77Z1g0lwsmw0Uk= git.vdb.to/cerc-io/plugeth v0.0.0-20230808125822-691dc334fab1 h1:KLjxHwp9Zp7xhECccmJS00RiL+VwTuUGLU7qeIctg8g= @@ -493,12 +495,12 @@ github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5 github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= -github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk= -github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= +github.com/stretchr/testify v1.8.2 h1:+h33VjcLVPDHtOdpUCuF+7gSuG3yGIftsP1YvFihtJ8= +github.com/stretchr/testify v1.8.2/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= github.com/syndtr/goleveldb v1.0.1-0.20210819022825-2ae1ddf74ef7 h1:epCh84lMvA70Z7CTTCmYQn2CKbY8j86K7/FAIr141uY= github.com/syndtr/goleveldb v1.0.1-0.20210819022825-2ae1ddf74ef7/go.mod h1:q4W45IWZaF22tdD+VEXcAWRA037jwmWEB5VWYORlTpc= -github.com/thoas/go-funk v0.9.2 h1:oKlNYv0AY5nyf9g+/GhMgS/UO2ces0QRdPKwkhY3VCk= -github.com/thoas/go-funk v0.9.2/go.mod h1:+IWnUfUmFO1+WVYQWQtIJHeRRdaIyyYglZN7xzUPe4Q= +github.com/thoas/go-funk v0.9.3 h1:7+nAEx3kn5ZJcnDm2Bh23N2yOtweO14bi//dvRtgLpw= +github.com/thoas/go-funk v0.9.3/go.mod h1:+IWnUfUmFO1+WVYQWQtIJHeRRdaIyyYglZN7xzUPe4Q= github.com/tklauser/go-sysconf v0.3.5 h1:uu3Xl4nkLzQfXNsWn15rPc/HQCJKObbt1dKJeWp3vU4= github.com/tklauser/go-sysconf v0.3.5/go.mod h1:MkWzOF4RMCshBAMXuhXJs64Rte09mITnppBXY/rYEFI= github.com/tklauser/numcpus v0.2.2 h1:oyhllyrScuYI6g+h/zUvNXNp1wy7x8qQy3t/piefldA= diff --git a/indexer/database/file/indexer.go b/indexer/database/file/indexer.go index 85d6958..1177b54 100644 --- a/indexer/database/file/indexer.go +++ b/indexer/database/file/indexer.go @@ -57,7 +57,7 @@ type StateDiffIndexer struct { chainConfig *params.ChainConfig nodeID string wg *sync.WaitGroup - removedCacheFlag *uint32 + removedCacheFlag uint32 } // NewStateDiffIndexer creates a void implementation of interfaces.StateDiffIndexer @@ -130,7 +130,6 @@ func (sdi *StateDiffIndexer) ReportDBMetrics(time.Duration, <-chan bool) {} // PushBlock pushes and indexes block data in sql, except state & storage nodes (includes header, uncles, transactions & receipts) // Returns an initiated DB transaction which must be Closed via defer to commit or rollback func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receipts, totalDifficulty *big.Int) (interfaces.Batch, error) { - sdi.removedCacheFlag = new(uint32) start, t := time.Now(), time.Now() blockHash := block.Hash() blockHashStr := blockHash.String() @@ -223,11 +222,6 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip func (sdi *StateDiffIndexer) processHeader(header *types.Header, headerNode ipld.IPLD, reward, td *big.Int) string { sdi.fileWriter.upsertIPLDNode(header.Number.String(), headerNode) - var baseFee *string - if header.BaseFee != nil { - baseFee = new(string) - *baseFee = header.BaseFee.String() - } headerID := header.Hash().String() sdi.fileWriter.upsertHeaderCID(models.HeaderModel{ NodeIDs: pq.StringArray([]string{sdi.nodeID}), @@ -388,8 +382,8 @@ func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdt // publish the state node var stateModel models.StateNodeModel if stateNode.Removed { - if atomic.LoadUint32(sdi.removedCacheFlag) == 0 { - atomic.StoreUint32(sdi.removedCacheFlag, 1) + if atomic.LoadUint32(&sdi.removedCacheFlag) == 0 { + atomic.StoreUint32(&sdi.removedCacheFlag, 1) sdi.fileWriter.upsertIPLDDirect(tx.BlockNumber, shared.RemovedNodeStateCID, []byte{}) } stateModel = models.StateNodeModel{ @@ -419,8 +413,8 @@ func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdt // if there are any storage nodes associated with this node, publish and index them for _, storageNode := range stateNode.StorageDiff { if storageNode.Removed { - if atomic.LoadUint32(sdi.removedCacheFlag) == 0 { - atomic.StoreUint32(sdi.removedCacheFlag, 1) + if atomic.LoadUint32(&sdi.removedCacheFlag) == 0 { + atomic.StoreUint32(&sdi.removedCacheFlag, 1) sdi.fileWriter.upsertIPLDDirect(tx.BlockNumber, shared.RemovedNodeStorageCID, []byte{}) } storageModel := models.StorageNodeModel{ diff --git a/indexer/database/sql/indexer.go b/indexer/database/sql/indexer.go index 2fc2ae5..d41f3d6 100644 --- a/indexer/database/sql/indexer.go +++ b/indexer/database/sql/indexer.go @@ -84,9 +84,8 @@ func (sdi *StateDiffIndexer) ReportDBMetrics(delay time.Duration, quit <-chan bo func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receipts, totalDifficulty *big.Int) (interfaces.Batch, error) { start, t := time.Now(), time.Now() blockHash := block.Hash() - blockHashStr := blockHash.String() height := block.NumberU64() - traceMsg := fmt.Sprintf("indexer stats for statediff at %d with hash %s:\r\n", height, blockHashStr) + traceMsg := fmt.Sprintf("indexer stats for statediff at %d with hash %s:\r\n", height, blockHash) transactions := block.Transactions() // Derive any missing fields if err := receipts.DeriveFields(sdi.chainConfig, blockHash, height, block.BaseFee(), transactions); err != nil { @@ -155,20 +154,20 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip } else { tDiff := time.Since(t) metrics2.IndexerMetrics.StateStoreCodeProcessingTimer.Update(tDiff) - traceMsg += fmt.Sprintf("state, storage, and code storage processing time: %s\r\n", tDiff.String()) + traceMsg += fmt.Sprintf("state, storage, and code storage processing time: %s\r\n", tDiff) t = time.Now() if err := self.flush(); err != nil { rollback(sdi.ctx, tx) - traceMsg += fmt.Sprintf(" TOTAL PROCESSING DURATION: %s\r\n", time.Since(start).String()) + traceMsg += fmt.Sprintf(" TOTAL PROCESSING DURATION: %s\r\n", time.Since(start)) log.Debug(traceMsg) return err } err = tx.Commit(sdi.ctx) tDiff = time.Since(t) metrics2.IndexerMetrics.PostgresCommitTimer.Update(tDiff) - traceMsg += fmt.Sprintf("postgres transaction commit duration: %s\r\n", tDiff.String()) + traceMsg += fmt.Sprintf("postgres transaction commit duration: %s\r\n", tDiff) } - traceMsg += fmt.Sprintf(" TOTAL PROCESSING DURATION: %s\r\n", time.Since(start).String()) + traceMsg += fmt.Sprintf(" TOTAL PROCESSING DURATION: %s\r\n", time.Since(start)) log.Debug(traceMsg) return err }, @@ -178,7 +177,7 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip tDiff := time.Since(t) metrics2.IndexerMetrics.FreePostgresTimer.Update(tDiff) - traceMsg += fmt.Sprintf("time spent waiting for free postgres tx: %s:\r\n", tDiff.String()) + traceMsg += fmt.Sprintf("time spent waiting for free postgres tx: %s:\r\n", tDiff) t = time.Now() // Publish and index header, collect headerID @@ -189,7 +188,7 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip } tDiff = time.Since(t) metrics2.IndexerMetrics.HeaderProcessingTimer.Update(tDiff) - traceMsg += fmt.Sprintf("header processing time: %s\r\n", tDiff.String()) + traceMsg += fmt.Sprintf("header processing time: %s\r\n", tDiff) t = time.Now() // Publish and index uncles err = sdi.processUncles(blockTx, headerID, block.Number(), block.UncleHash(), block.Uncles()) @@ -198,7 +197,7 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip } tDiff = time.Since(t) metrics2.IndexerMetrics.UncleProcessingTimer.Update(tDiff) - traceMsg += fmt.Sprintf("uncle processing time: %s\r\n", tDiff.String()) + traceMsg += fmt.Sprintf("uncle processing time: %s\r\n", tDiff) t = time.Now() // Publish and index receipts and txs err = sdi.processReceiptsAndTxs(blockTx, processArgs{ @@ -215,7 +214,7 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip } tDiff = time.Since(t) metrics2.IndexerMetrics.TxAndRecProcessingTimer.Update(tDiff) - traceMsg += fmt.Sprintf("tx and receipt processing time: %s\r\n", tDiff.String()) + traceMsg += fmt.Sprintf("tx and receipt processing time: %s\r\n", tDiff) t = time.Now() return blockTx, err @@ -236,11 +235,6 @@ func (sdi *StateDiffIndexer) DetectGaps(beginBlockNumber uint64, endBlockNumber func (sdi *StateDiffIndexer) processHeader(tx *BatchTx, header *types.Header, headerNode ipld.IPLD, reward, td *big.Int) (string, error) { tx.cacheIPLD(headerNode) - var baseFee *string - if header.BaseFee != nil { - baseFee = new(string) - *baseFee = header.BaseFee.String() - } headerID := header.Hash().String() // index header return headerID, sdi.dbWriter.upsertHeaderCID(tx.dbtx, models.HeaderModel{ diff --git a/indexer/database/sql/interfaces.go b/indexer/database/sql/interfaces.go index db13ac7..845f603 100644 --- a/indexer/database/sql/interfaces.go +++ b/indexer/database/sql/interfaces.go @@ -58,18 +58,6 @@ type Statements interface { InsertStorageStm() string InsertIPLDStm() string InsertIPLDsStm() string - - // Table/column descriptions for use with CopyFrom and similar commands. - LogTableName() []string - LogColumnNames() []string - RctTableName() []string - RctColumnNames() []string - StateTableName() []string - StateColumnNames() []string - StorageTableName() []string - StorageColumnNames() []string - TxTableName() []string - TxColumnNames() []string } // Tx interface to accommodate different concrete SQL transaction types diff --git a/indexer/database/sql/postgres/database.go b/indexer/database/sql/postgres/database.go index ecebd7d..f73b882 100644 --- a/indexer/database/sql/postgres/database.go +++ b/indexer/database/sql/postgres/database.go @@ -107,43 +107,3 @@ func (db *DB) InsertIPLDStm() string { func (db *DB) InsertIPLDsStm() string { return `INSERT INTO ipld.blocks (block_number, key, data) VALUES (unnest($1::BIGINT[]), unnest($2::TEXT[]), unnest($3::BYTEA[])) ON CONFLICT DO NOTHING` } - -func (db *DB) LogTableName() []string { - return []string{"eth", "log_cids"} -} - -func (db *DB) LogColumnNames() []string { - return []string{"block_number", "header_id", "cid", "rct_id", "address", "index", "topic0", "topic1", "topic2", "topic3"} -} - -func (db *DB) RctTableName() []string { - return []string{"eth", "receipt_cids"} -} - -func (db *DB) RctColumnNames() []string { - return []string{"block_number", "header_id", "tx_id", "cid", "contract", "post_state", "post_status"} -} - -func (db *DB) StateTableName() []string { - return []string{"eth", "state_cids"} -} - -func (db *DB) StateColumnNames() []string { - return []string{"block_number", "header_id", "state_leaf_key", "cid", "diff", "balance", "nonce", "code_hash", "storage_root", "removed"} -} - -func (db *DB) StorageTableName() []string { - return []string{"eth", "storage_cids"} -} - -func (db *DB) StorageColumnNames() []string { - return []string{"block_number", "header_id", "state_leaf_key", "storage_leaf_key", "cid", "diff", "val", "removed"} -} - -func (db *DB) TxTableName() []string { - return []string{"eth", "transaction_cids"} -} - -func (db *DB) TxColumnNames() []string { - return []string{"block_number", "header_id", "tx_hash", "cid", "dst", "src", "index", "tx_type", "value"} -} diff --git a/indexer/database/sql/postgres/pgx_test.go b/indexer/database/sql/postgres/pgx_test.go index 5dff430..7b01502 100644 --- a/indexer/database/sql/postgres/pgx_test.go +++ b/indexer/database/sql/postgres/pgx_test.go @@ -36,12 +36,6 @@ var ( ctx = context.Background() ) -func expectContainsSubstring(t *testing.T, full string, sub string) { - if !strings.Contains(full, sub) { - t.Fatalf("Expected \"%v\" to contain substring \"%v\"\n", full, sub) - } -} - func TestPostgresPGX(t *testing.T) { t.Run("connects to the sql", func(t *testing.T) { dbPool, err := pgxpool.ConnectConfig(context.Background(), pgxConfig) @@ -105,7 +99,7 @@ func TestPostgresPGX(t *testing.T) { t.Fatal("Expected an error") } - expectContainsSubstring(t, err.Error(), postgres.DbConnectionFailedMsg) + require.Contains(t, err.Error(), postgres.DbConnectionFailedMsg) }) t.Run("throws error when can't create node", func(t *testing.T) { @@ -117,6 +111,6 @@ func TestPostgresPGX(t *testing.T) { t.Fatal("Expected an error") } - expectContainsSubstring(t, err.Error(), postgres.SettingNodeFailedMsg) + require.Contains(t, err.Error(), postgres.SettingNodeFailedMsg) }) } diff --git a/indexer/database/sql/postgres/sqlx_test.go b/indexer/database/sql/postgres/sqlx_test.go index 28f4473..f9bd2bd 100644 --- a/indexer/database/sql/postgres/sqlx_test.go +++ b/indexer/database/sql/postgres/sqlx_test.go @@ -102,7 +102,7 @@ func TestPostgresSQLX(t *testing.T) { t.Fatal("Expected an error") } - expectContainsSubstring(t, err.Error(), postgres.DbConnectionFailedMsg) + require.Contains(t, err.Error(), postgres.DbConnectionFailedMsg) }) t.Run("throws error when can't create node", func(t *testing.T) { @@ -114,6 +114,6 @@ func TestPostgresSQLX(t *testing.T) { t.Fatal("Expected an error") } - expectContainsSubstring(t, err.Error(), postgres.SettingNodeFailedMsg) + require.Contains(t, err.Error(), postgres.SettingNodeFailedMsg) }) } diff --git a/indexer/database/sql/writer.go b/indexer/database/sql/writer.go index 6d6dd31..e5e0c81 100644 --- a/indexer/database/sql/writer.go +++ b/indexer/database/sql/writer.go @@ -29,6 +29,7 @@ import ( "github.com/cerc-io/plugeth-statediff/indexer/database/metrics" "github.com/cerc-io/plugeth-statediff/indexer/interfaces" "github.com/cerc-io/plugeth-statediff/indexer/models" + "github.com/cerc-io/plugeth-statediff/indexer/shared/schema" ) // Writer handles processing and writing of indexed IPLD objects to Postgres @@ -65,7 +66,8 @@ func (w *Writer) detectGaps(beginBlockNumber uint64, endBlockNumber uint64) ([]* // pgx misdetects the parameter OIDs and selects int8, which can overflow. // unfortunately there is no good place to override it, so it is safer to pass the uint64s as text // and let PG handle the cast - err := w.db.Select(w.db.Context(), &gaps, w.db.DetectGapsStm(), strconv.FormatUint(beginBlockNumber, 10), strconv.FormatUint(endBlockNumber, 10)) + err := w.db.Select(w.db.Context(), &gaps, w.db.DetectGapsStm(), + strconv.FormatUint(beginBlockNumber, 10), strconv.FormatUint(endBlockNumber, 10)) return gaps, err } @@ -177,7 +179,8 @@ func (w *Writer) upsertTransactionCID(tx Tx, transaction models.TxModel) error { return insertError{"eth.transaction_cids", err, "COPY", transaction} } - _, err = tx.CopyFrom(w.db.Context(), w.db.TxTableName(), w.db.TxColumnNames(), + _, err = tx.CopyFrom(w.db.Context(), + schema.TableTransaction.TableName(), schema.TableTransaction.ColumnNames(), toRows(toRow(blockNum, transaction.HeaderID, transaction.TxHash, transaction.CID, transaction.Dst, transaction.Src, transaction.Index, int(transaction.Type), value))) if err != nil { @@ -213,7 +216,7 @@ func (w *Writer) upsertReceiptCID(tx Tx, rct *models.ReceiptModel) error { return insertError{"eth.receipt_cids", err, "COPY", rct} } - _, err = tx.CopyFrom(w.db.Context(), w.db.RctTableName(), w.db.RctColumnNames(), + _, err = tx.CopyFrom(w.db.Context(), schema.TableReceipt.TableName(), schema.TableReceipt.ColumnNames(), toRows(toRow(blockNum, rct.HeaderID, rct.TxID, rct.CID, rct.Contract, rct.PostState, int(rct.PostStatus)))) if err != nil { @@ -253,7 +256,7 @@ func (w *Writer) upsertLogCID(tx Tx, logs []*models.LogsModel) error { log.Address, log.Index, log.Topic0, log.Topic1, log.Topic2, log.Topic3)) } if nil != rows && len(rows) >= 0 { - _, err := tx.CopyFrom(w.db.Context(), w.db.LogTableName(), w.db.LogColumnNames(), rows) + _, err := tx.CopyFrom(w.db.Context(), schema.TableLog.TableName(), schema.TableLog.ColumnNames(), rows) if err != nil { return insertError{"eth.log_cids", err, "COPY", rows} } @@ -302,7 +305,8 @@ func (w *Writer) upsertStateCID(tx Tx, stateNode models.StateNodeModel) error { return insertError{"eth.state_cids", err, "COPY", stateNode} } - _, err = tx.CopyFrom(w.db.Context(), w.db.StateTableName(), w.db.StateColumnNames(), + _, err = tx.CopyFrom(w.db.Context(), + schema.TableStateNode.TableName(), schema.TableStateNode.ColumnNames(), toRows(toRow(blockNum, stateNode.HeaderID, stateNode.StateKey, stateNode.CID, true, balance, stateNode.Nonce, stateNode.CodeHash, stateNode.StorageRoot, stateNode.Removed))) if err != nil { @@ -339,7 +343,8 @@ func (w *Writer) upsertStorageCID(tx Tx, storageCID models.StorageNodeModel) err return insertError{"eth.storage_cids", err, "COPY", storageCID} } - _, err = tx.CopyFrom(w.db.Context(), w.db.StorageTableName(), w.db.StorageColumnNames(), + _, err = tx.CopyFrom(w.db.Context(), + schema.TableStorageNode.TableName(), schema.TableStorageNode.ColumnNames(), toRows(toRow(blockNum, storageCID.HeaderID, storageCID.StateKey, storageCID.StorageKey, storageCID.CID, true, storageCID.Value, storageCID.Removed))) if err != nil { diff --git a/indexer/shared/schema/table.go b/indexer/shared/schema/table.go index 9bc19ac..bf6968e 100644 --- a/indexer/shared/schema/table.go +++ b/indexer/shared/schema/table.go @@ -45,6 +45,7 @@ type Column struct { Type colType Array bool } + type Table struct { Name string Columns []Column @@ -117,6 +118,20 @@ func (tbl *Table) ToInsertStatement(upsert bool) string { ) } +// TableName returns a pgx-compatible table name. +func (tbl *Table) TableName() []string { + return strings.Split(tbl.Name, ".") +} + +// ColumnNames returns the ordered list of column names. +func (tbl *Table) ColumnNames() []string { + var names []string + for _, col := range tbl.Columns { + names = append(names, col.Name) + } + return names +} + func sprintf(f string) colfmt { return func(x interface{}) string { return fmt.Sprintf(f, x) } } diff --git a/main/flags.go b/main/flags.go index fa2a723..c1f5c1a 100644 --- a/main/flags.go +++ b/main/flags.go @@ -46,7 +46,11 @@ func init() { ) Flags.UintVar(&config.NumWorkers, "statediff.workers", 1, - "Number of concurrent workers to use during statediff processing (default 1)", + "Number of concurrent workers to dispatch to during statediff processing", + ) + Flags.UintVar(&config.SubtrieWorkers, + "statediff.subtries", 1, + "Number of subtries to iterate in parallel", ) Flags.BoolVar(&config.WaitForSync, "statediff.waitforsync", false, diff --git a/mainnet_tests/builder_test.go b/mainnet_tests/builder_test.go index 11c60fe..e428643 100644 --- a/mainnet_tests/builder_test.go +++ b/mainnet_tests/builder_test.go @@ -566,7 +566,9 @@ func TestBuilderOnMainnetBlocks(t *testing.T) { }, StorageDiff: emptyStorage, }, - { // this is the new account created due to the coinbase mining a block, it's creation shouldn't affect 0x 0e 05 07 + { + // this is the new account created due to the coinbase mining a block, its + // creation shouldn't affect 0x 0e 05 07 Removed: false, AccountWrapper: sdtypes.AccountWrapper{ Account: block3CoinbaseAccount, @@ -622,11 +624,10 @@ func TestBuilderOnMainnetBlocks(t *testing.T) { }, } - test_helpers.RunBuilderTests(t, - chain.StateCache(), - tests, params, test_helpers.CheckedRoots{ - block1: block1RootBranchNode, - block2: block2RootBranchNode, - block3: block3RootBranchNode, - }) + test_helpers.RunBuilderTests(t, chain.StateCache(), tests, params, []uint{1, 8, 32}) + test_helpers.CheckedRoots{ + block1: block1RootBranchNode, + block2: block2RootBranchNode, + block3: block3RootBranchNode, + }.Check(t) } diff --git a/service.go b/service.go index fe1ac69..7cc7f80 100644 --- a/service.go +++ b/service.go @@ -166,10 +166,12 @@ func NewService(cfg Config, blockChain BlockChain, backend plugeth.Backend, inde workers = 1 } + builder := NewBuilder(blockChain.StateCache()) + builder.SetSubtrieWorkers(cfg.SubtrieWorkers) quitCh := make(chan bool) sds := &Service{ BlockChain: blockChain, - Builder: NewBuilder(blockChain.StateCache()), + Builder: builder, QuitChan: quitCh, Subscriptions: make(map[common.Hash]map[SubID]Subscription), SubscriptionTypes: make(map[common.Hash]Params), @@ -785,6 +787,10 @@ func (sds *Service) claimExclusiveAccess(block *types.Block) (bool, func()) { // Writes a state diff from the current block, parent state root, and provided params func (sds *Service) writeStateDiff(block *types.Block, parentRoot common.Hash, params Params) error { + if sds.indexer == nil { + return fmt.Errorf("indexer is not set; cannot write indexed diffs") + } + log := log.New("hash", block.Hash(), "number", block.Number()) if granted, relinquish := sds.claimExclusiveAccess(block); granted { defer relinquish() @@ -804,9 +810,6 @@ func (sds *Service) writeStateDiff(block *types.Block, parentRoot common.Hash, p start := countStateDiffBegin(block, log) defer countStateDiffEnd(start, log, &err) - if sds.indexer == nil { - return fmt.Errorf("indexer is not set; cannot write indexed diffs") - } if params.IncludeTD { totalDifficulty = sds.BlockChain.GetTd(block.Hash(), block.NumberU64()) @@ -819,14 +822,17 @@ func (sds *Service) writeStateDiff(block *types.Block, parentRoot common.Hash, p return err } + var nodeMtx, ipldMtx sync.Mutex nodeSink := func(node types2.StateLeafNode) error { - defer metrics.ReportAndUpdateDuration("statediff output", time.Now(), log, - metrics.IndexerMetrics.OutputTimer) + defer metrics.UpdateDuration(time.Now(), metrics.IndexerMetrics.OutputTimer) + nodeMtx.Lock() + defer nodeMtx.Unlock() return sds.indexer.PushStateNode(tx, node, block.Hash().String()) } ipldSink := func(c types2.IPLD) error { - defer metrics.ReportAndUpdateDuration("statediff ipldOutput", time.Now(), log, - metrics.IndexerMetrics.IPLDOutputTimer) + defer metrics.UpdateDuration(time.Now(), metrics.IndexerMetrics.IPLDOutputTimer) + ipldMtx.Lock() + defer ipldMtx.Unlock() return sds.indexer.PushIPLD(tx, c) } diff --git a/test_helpers/builder.go b/test_helpers/builder.go index f0d62be..9a82851 100644 --- a/test_helpers/builder.go +++ b/test_helpers/builder.go @@ -2,12 +2,15 @@ package test_helpers import ( "bytes" + "fmt" + "math/big" "sort" "testing" statediff "github.com/cerc-io/plugeth-statediff" "github.com/cerc-io/plugeth-statediff/adapt" sdtypes "github.com/cerc-io/plugeth-statediff/types" + "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/state" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/crypto" @@ -20,28 +23,42 @@ type TestCase struct { Expected *sdtypes.StateObject } -type CheckedRoots = map[*types.Block][]byte +type CheckedRoots map[*types.Block][]byte + +// Replicates the statediff object, but indexes nodes by CID +type normalizedStateDiff struct { + BlockNumber *big.Int + BlockHash common.Hash + Nodes map[string]sdtypes.StateLeafNode + IPLDs map[string]sdtypes.IPLD +} func RunBuilderTests( t *testing.T, sdb state.Database, tests []TestCase, params statediff.Params, - roots CheckedRoots, + subtrieCounts []uint, ) { builder := statediff.NewBuilder(adapt.GethStateView(sdb)) for _, test := range tests { - t.Run(test.Name, func(t *testing.T) { - diff, err := builder.BuildStateDiffObject(test.Args, params) - if err != nil { - t.Error(err) - } - - normalize(test.Expected) - normalize(&diff) - require.Equal(t, *test.Expected, diff) - }) + for _, subtries := range subtrieCounts { + t.Run(fmt.Sprintf("%s with %d subtries", test.Name, subtries), func(t *testing.T) { + builder.SetSubtrieWorkers(subtries) + diff, err := builder.BuildStateDiffObject(test.Args, params) + if err != nil { + t.Error(err) + } + require.Equal(t, + normalize(test.Expected), + normalize(&diff), + ) + }) + } } +} + +func (roots CheckedRoots) Check(t *testing.T) { // Let's also confirm that our root state nodes form the state root hash in the headers for block, node := range roots { require.Equal(t, block.Root(), crypto.Keccak256Hash(node), @@ -49,17 +66,13 @@ func RunBuilderTests( } } -// Sorts contained state nodes, storage nodes, and IPLDs -func normalize(diff *sdtypes.StateObject) { - sort.Slice(diff.IPLDs, func(i, j int) bool { - return diff.IPLDs[i].CID < diff.IPLDs[j].CID - }) - sort.Slice(diff.Nodes, func(i, j int) bool { - return bytes.Compare( - diff.Nodes[i].AccountWrapper.LeafKey, - diff.Nodes[j].AccountWrapper.LeafKey, - ) < 0 - }) +func normalize(diff *sdtypes.StateObject) normalizedStateDiff { + norm := normalizedStateDiff{ + BlockNumber: diff.BlockNumber, + BlockHash: diff.BlockHash, + Nodes: make(map[string]sdtypes.StateLeafNode), + IPLDs: make(map[string]sdtypes.IPLD), + } for _, node := range diff.Nodes { sort.Slice(node.StorageDiff, func(i, j int) bool { return bytes.Compare( @@ -67,5 +80,10 @@ func normalize(diff *sdtypes.StateObject) { node.StorageDiff[j].LeafKey, ) < 0 }) + norm.Nodes[node.AccountWrapper.CID] = node } + for _, ipld := range diff.IPLDs { + norm.IPLDs[ipld.CID] = ipld + } + return norm } diff --git a/utils/iterator_test.go b/utils/iterator_test.go index 57760d2..e7210ac 100644 --- a/utils/iterator_test.go +++ b/utils/iterator_test.go @@ -58,6 +58,7 @@ func TestSymmetricDifferenceIterator(t *testing.T) { } assert.Equal(t, 2, *count) + // TODO will fail until fixed https://github.com/ethereum/go-ethereum/pull/27838 trieb := trie.NewEmpty(db) di, count = utils.NewSymmetricDifferenceIterator( triea.NodeIterator([]byte("jars")),