diff --git a/.gitea/workflows/test.yml b/.gitea/workflows/test.yml index 3b40968..dcd898a 100644 --- a/.gitea/workflows/test.yml +++ b/.gitea/workflows/test.yml @@ -89,3 +89,47 @@ jobs: pip install pytest pip install -r requirements.txt pytest -v -k test_basic_db + + compliance-test: + name: Run compliance tests + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v3 + with: + path: ./plugeth-statediff + - uses: actions/checkout@v3 + with: + repository: cerc-io/eth-statediff-compliance + ref: v0.1.0 + path: ./eth-statediff-compliance + token: ${{ secrets.CICD_REPO_TOKEN }} + - uses: actions/setup-go@v4 + with: + go-version-file: './eth-statediff-compliance/go.mod' + check-latest: true + - name: Install jq + run: apt-get update && apt-get install -yq jq + - name: Set up Gitea access token + env: + TOKEN: ${{ secrets.CICD_REPO_TOKEN }} + run: | + git config --global url."https://$TOKEN:@git.vdb.to/".insteadOf https://git.vdb.to/ + + - name: Update go.mod for dumpdiff-geth + working-directory: ./eth-statediff-compliance/ + run: ./scripts/update-mod.sh ../plugeth-statediff dumpdiff-geth/ + - name: Update go.mod for dumpdiff-plugeth + working-directory: ./eth-statediff-compliance/ + run: ./scripts/update-mod.sh ../plugeth-statediff dumpdiff-plugeth/ + - name: Update go.mod for dumpdiff-plugeth-parallel + working-directory: ./eth-statediff-compliance/ + run: ./scripts/update-mod.sh ../plugeth-statediff dumpdiff-plugeth-parallel/ + - name: Build tools + working-directory: ./eth-statediff-compliance/ + run: make all + - name: Compare output of geth and plugeth + working-directory: ./eth-statediff-compliance/ + run: ./scripts/compare-diffs.sh geth plugeth + - name: Compare output of geth and plugeth-parallel + working-directory: ./eth-statediff-compliance/ + run: ./scripts/compare-diffs.sh geth plugeth-parallel diff --git a/builder.go b/builder.go index fc5aed3..80cdf88 100644 --- a/builder.go +++ b/builder.go @@ -27,6 +27,7 @@ import ( "time" iterutils "github.com/cerc-io/eth-iterator-utils" + "github.com/cerc-io/eth-iterator-utils/tracker" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/crypto" @@ -58,7 +59,7 @@ type Builder interface { WriteStateDiff(Args, Params, sdtypes.StateNodeSink, sdtypes.IPLDSink) error } -type StateDiffBuilder struct { +type builder struct { // state cache is safe for concurrent reads stateCache adapt.StateView subtrieWorkers uint @@ -88,8 +89,8 @@ func syncedAppender[T any](to *[]T) func(T) error { } // NewBuilder is used to create a statediff builder -func NewBuilder(stateCache adapt.StateView) *StateDiffBuilder { - return &StateDiffBuilder{ +func NewBuilder(stateCache adapt.StateView) *builder { + return &builder{ stateCache: stateCache, subtrieWorkers: defaultSubtrieWorkers, } @@ -97,7 +98,7 @@ func NewBuilder(stateCache adapt.StateView) *StateDiffBuilder { // SetSubtrieWorkers sets the number of disjoint subtries to divide among parallel workers. // Passing 0 will reset this to the default value. -func (sdb *StateDiffBuilder) SetSubtrieWorkers(n uint) { +func (sdb *builder) SetSubtrieWorkers(n uint) { if n == 0 { n = defaultSubtrieWorkers } @@ -105,7 +106,7 @@ func (sdb *StateDiffBuilder) SetSubtrieWorkers(n uint) { } // BuildStateDiffObject builds a statediff object from two blocks and the provided parameters -func (sdb *StateDiffBuilder) BuildStateDiffObject(args Args, params Params) (sdtypes.StateObject, error) { +func (sdb *builder) BuildStateDiffObject(args Args, params Params) (sdtypes.StateObject, error) { defer metrics.UpdateDuration(time.Now(), metrics.IndexerMetrics.BuildStateDiffObjectTimer) var stateNodes []sdtypes.StateLeafNode var iplds []sdtypes.IPLD @@ -122,7 +123,7 @@ func (sdb *StateDiffBuilder) BuildStateDiffObject(args Args, params Params) (sdt } // WriteStateDiff writes a statediff object to output sinks -func (sdb *StateDiffBuilder) WriteStateDiff( +func (sdb *builder) WriteStateDiff( args Args, params Params, nodeSink sdtypes.StateNodeSink, ipldSink sdtypes.IPLDSink, @@ -141,14 +142,16 @@ func (sdb *StateDiffBuilder) WriteStateDiff( subitersB := iterutils.SubtrieIterators(trieb.NodeIterator, uint(sdb.subtrieWorkers)) logger := log.New("hash", args.BlockHash, "number", args.BlockNumber) - // errgroup will cancel if any gr fails + // errgroup will cancel if any group fails g, ctx := errgroup.WithContext(context.Background()) for i := uint(0); i < sdb.subtrieWorkers; i++ { func(subdiv uint) { g.Go(func() error { a, b := subitersA[subdiv], subitersB[subdiv] + it := utils.NewSymmetricDifferenceIterator(a, b) return sdb.processAccounts(ctx, - a, b, params.watchedAddressesLeafPaths, + it, &it.SymmDiffState, + params.watchedAddressesLeafPaths, nodeSink, ipldSink, logger, ) }) @@ -157,11 +160,60 @@ func (sdb *StateDiffBuilder) WriteStateDiff( return g.Wait() } -// processAccounts processes account creations and deletions, and returns a set of updated -// existing accounts, indexed by leaf key. -func (sdb *StateDiffBuilder) processAccounts( +// WriteStateDiff writes a statediff object to output sinks +func (sdb *builder) WriteStateSnapshot( + stateRoot common.Hash, params Params, + nodeSink sdtypes.StateNodeSink, + ipldSink sdtypes.IPLDSink, + tracker tracker.IteratorTracker, +) error { + defer metrics.UpdateDuration(time.Now(), metrics.IndexerMetrics.WriteStateDiffTimer) + // Load tries for old and new states + tree, err := sdb.stateCache.OpenTrie(stateRoot) + if err != nil { + return fmt.Errorf("error opening new state trie: %w", err) + } + + subiters, _, err := tracker.Restore(tree.NodeIterator) + if err != nil { + return fmt.Errorf("error restoring iterators: %w", err) + } + if len(subiters) != 0 { + // Completed iterators are not saved by the tracker, so restoring fewer than configured is ok, + // but having too many is a problem. + if len(subiters) > int(sdb.subtrieWorkers) { + return fmt.Errorf("restored too many iterators: expected %d, got %d", + sdb.subtrieWorkers, len(subiters)) + } + } else { + subiters = iterutils.SubtrieIterators(tree.NodeIterator, uint(sdb.subtrieWorkers)) + for i := range subiters { + subiters[i] = tracker.Tracked(subiters[i]) + } + } + // errgroup will cancel if any group fails + g, ctx := errgroup.WithContext(context.Background()) + for i := range subiters { + func(subdiv uint) { + g.Go(func() error { + symdiff := utils.AlwaysBState() + return sdb.processAccounts(ctx, + subiters[subdiv], &symdiff, + params.watchedAddressesLeafPaths, + nodeSink, ipldSink, log.DefaultLogger, + ) + }) + }(uint(i)) + } + return g.Wait() +} + +// processAccounts processes account creations, deletions, and updates +// the NodeIterator and SymmDiffIterator instances should refer to the same object, will only be used +func (sdb *builder) processAccounts( ctx context.Context, - a, b trie.NodeIterator, watchedAddressesLeafPaths [][]byte, + it trie.NodeIterator, symdiff *utils.SymmDiffState, + watchedAddressesLeafPaths [][]byte, nodeSink sdtypes.StateNodeSink, ipldSink sdtypes.IPLDSink, logger log.Logger, ) error { @@ -171,9 +223,7 @@ func (sdb *StateDiffBuilder) processAccounts( updates := make(accountUpdateMap) // Cache the RLP of the previous node. When we hit a value node this will be the parent blob. - var prevBlob []byte - it, itCount := utils.NewSymmetricDifferenceIterator(a, b) - prevBlob = it.NodeBlob() + var prevBlob = it.NodeBlob() for it.Next(true) { select { case <-ctx.Done(): @@ -185,7 +235,7 @@ func (sdb *StateDiffBuilder) processAccounts( if !isWatchedPathPrefix(watchedAddressesLeafPaths, it.Path()) { continue } - if it.FromA() { // Node exists in the old trie + if symdiff.FromA() { // Node exists in the old trie if it.Leaf() { var account types.StateAccount if err := rlp.DecodeBytes(it.LeafBlob(), &account); err != nil { @@ -194,7 +244,7 @@ func (sdb *StateDiffBuilder) processAccounts( leafKey := make([]byte, len(it.LeafKey())) copy(leafKey, it.LeafKey()) - if it.CommonPath() { + if symdiff.CommonPath() { // If B also contains this leaf node, this is the old state of an updated account. if update, ok := updates[string(leafKey)]; ok { update.oldRoot = account.Root @@ -212,14 +262,14 @@ func (sdb *StateDiffBuilder) processAccounts( } continue } - // Node exists in the new trie + // Node exists in the new trie (B) if it.Leaf() { accountW, err := sdb.decodeStateLeaf(it, prevBlob) if err != nil { return err } - if it.CommonPath() { + if symdiff.CommonPath() { // If A also contains this leaf node, this is the new state of an updated account. if update, ok := updates[string(accountW.LeafKey)]; ok { update.new = *accountW @@ -232,42 +282,41 @@ func (sdb *StateDiffBuilder) processAccounts( return err } } - } else { - // New trie nodes will be written to blockstore only. - // Reminder: this includes leaf nodes, since the geth iterator.Leaf() actually - // signifies a "value" node. - if it.Hash() == zeroHash { - continue - } - // TODO - this can be handled when value node is (craeted?) - nodeVal := make([]byte, len(it.NodeBlob())) - copy(nodeVal, it.NodeBlob()) - // if doing a selective diff, we need to ensure this is a watched path - if len(watchedAddressesLeafPaths) > 0 { - var elements []interface{} - if err := rlp.DecodeBytes(nodeVal, &elements); err != nil { - return err - } - ok, err := isLeaf(elements) - if err != nil { - return err - } - if ok { - partialPath := utils.CompactToHex(elements[0].([]byte)) - valueNodePath := append(it.Path(), partialPath...) - if !isWatchedPath(watchedAddressesLeafPaths, valueNodePath) { - continue - } - } - } - if err := ipldSink(sdtypes.IPLD{ - CID: ipld.Keccak256ToCid(ipld.MEthStateTrie, it.Hash().Bytes()).String(), - Content: nodeVal, - }); err != nil { + continue + } + // New inner trie nodes will be written to blockstore only. + // Reminder: this includes leaf nodes, since the geth iterator.Leaf() actually + // signifies a "value" node. + if it.Hash() == zeroHash { + continue + } + nodeVal := make([]byte, len(it.NodeBlob())) + copy(nodeVal, it.NodeBlob()) + // if doing a selective diff, we need to ensure this is a watched path + if len(watchedAddressesLeafPaths) > 0 { + var elements []interface{} + if err := rlp.DecodeBytes(nodeVal, &elements); err != nil { return err } - prevBlob = nodeVal + ok, err := isLeaf(elements) + if err != nil { + return err + } + if ok { + partialPath := utils.CompactToHex(elements[0].([]byte)) + valueNodePath := append(it.Path(), partialPath...) + if !isWatchedPath(watchedAddressesLeafPaths, valueNodePath) { + continue + } + } } + if err := ipldSink(sdtypes.IPLD{ + CID: ipld.Keccak256ToCid(ipld.MEthStateTrie, it.Hash().Bytes()).String(), + Content: nodeVal, + }); err != nil { + return err + } + prevBlob = nodeVal } for key, update := range updates { @@ -287,12 +336,10 @@ func (sdb *StateDiffBuilder) processAccounts( return err } } - - metrics.IndexerMetrics.DifferenceIteratorCounter.Inc(int64(*itCount)) return it.Error() } -func (sdb *StateDiffBuilder) processAccountDeletion( +func (sdb *builder) processAccountDeletion( leafKey []byte, account types.StateAccount, nodeSink sdtypes.StateNodeSink, ) error { diff := sdtypes.StateLeafNode{ @@ -309,7 +356,7 @@ func (sdb *StateDiffBuilder) processAccountDeletion( return nodeSink(diff) } -func (sdb *StateDiffBuilder) processAccountCreation( +func (sdb *builder) processAccountCreation( accountW *sdtypes.AccountWrapper, ipldSink sdtypes.IPLDSink, nodeSink sdtypes.StateNodeSink, ) error { diff := sdtypes.StateLeafNode{ @@ -340,7 +387,7 @@ func (sdb *StateDiffBuilder) processAccountCreation( // decodes account at leaf and encodes RLP data to CID // reminder: it.Leaf() == true when the iterator is positioned at a "value node" (which is not something // that actually exists in an MMPT), therefore we pass the parent node blob as the leaf RLP. -func (sdb *StateDiffBuilder) decodeStateLeaf(it trie.NodeIterator, parentBlob []byte) (*sdtypes.AccountWrapper, error) { +func (sdb *builder) decodeStateLeaf(it trie.NodeIterator, parentBlob []byte) (*sdtypes.AccountWrapper, error) { var account types.StateAccount if err := rlp.DecodeBytes(it.LeafBlob(), &account); err != nil { return nil, fmt.Errorf("error decoding account at leaf key %x: %w", it.LeafKey(), err) @@ -357,7 +404,7 @@ func (sdb *StateDiffBuilder) decodeStateLeaf(it trie.NodeIterator, parentBlob [] // processStorageCreations processes the storage node records for a newly created account // i.e. it returns all the storage nodes at this state, since there is no previous state. -func (sdb *StateDiffBuilder) processStorageCreations( +func (sdb *builder) processStorageCreations( sr common.Hash, storageSink sdtypes.StorageNodeSink, ipldSink sdtypes.IPLDSink, ) error { defer metrics.UpdateDuration(time.Now(), metrics.IndexerMetrics.ProcessStorageCreationsTimer) @@ -394,8 +441,9 @@ func (sdb *StateDiffBuilder) processStorageCreations( return it.Error() } -// processStorageUpdates builds the storage diff node objects for all nodes that exist in a different state at B than A -func (sdb *StateDiffBuilder) processStorageUpdates( +// processStorageUpdates builds the storage diff node objects for all nodes that exist in a +// different state at B than A +func (sdb *builder) processStorageUpdates( oldroot common.Hash, newroot common.Hash, storageSink sdtypes.StorageNodeSink, ipldSink sdtypes.IPLDSink, @@ -416,7 +464,7 @@ func (sdb *StateDiffBuilder) processStorageUpdates( var prevBlob []byte a, b := oldTrie.NodeIterator(nil), newTrie.NodeIterator(nil) - it, _ := utils.NewSymmetricDifferenceIterator(a, b) + it := utils.NewSymmetricDifferenceIterator(a, b) for it.Next(true) { if it.FromA() { if it.Leaf() && !it.CommonPath() { @@ -457,7 +505,7 @@ func (sdb *StateDiffBuilder) processStorageUpdates( } // processRemovedAccountStorage builds the "removed" diffs for all the storage nodes for a destroyed account -func (sdb *StateDiffBuilder) processRemovedAccountStorage( +func (sdb *builder) processRemovedAccountStorage( sr common.Hash, storageSink sdtypes.StorageNodeSink, ) error { defer metrics.UpdateDuration(time.Now(), metrics.IndexerMetrics.ProcessRemovedAccountStorageTimer) @@ -491,7 +539,7 @@ func (sdb *StateDiffBuilder) processRemovedAccountStorage( // decodes slot at leaf and encodes RLP data to CID // reminder: it.Leaf() == true when the iterator is positioned at a "value node" (which is not something // that actually exists in an MMPT), therefore we pass the parent node blob as the leaf RLP. -func (sdb *StateDiffBuilder) decodeStorageLeaf(it trie.NodeIterator, parentBlob []byte) sdtypes.StorageLeafNode { +func (sdb *builder) decodeStorageLeaf(it trie.NodeIterator, parentBlob []byte) sdtypes.StorageLeafNode { leafKey := make([]byte, len(it.LeafKey())) copy(leafKey, it.LeafKey()) value := make([]byte, len(it.LeafBlob())) diff --git a/builder_snapshot_test.go b/builder_snapshot_test.go new file mode 100644 index 0000000..a071c38 --- /dev/null +++ b/builder_snapshot_test.go @@ -0,0 +1,538 @@ +package statediff_test + +import ( + "testing" + + statediff "github.com/cerc-io/plugeth-statediff" + "github.com/cerc-io/plugeth-statediff/indexer/ipld" + "github.com/cerc-io/plugeth-statediff/test_helpers" + sdtypes "github.com/cerc-io/plugeth-statediff/types" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/crypto" +) + +func TestBuilderSnapshot(t *testing.T) { + blocks, chain := test_helpers.MakeChain(3, test_helpers.Genesis, test_helpers.TestChainGen) + contractLeafKey = test_helpers.AddressToLeafKey(test_helpers.ContractAddr) + defer chain.Stop() + block0 = test_helpers.Genesis + block1 = blocks[0] + block2 = blocks[1] + block3 = blocks[2] + params := statediff.Params{} + + tests := []test_helpers.SnapshotTestCase{ + { + "testEmptyDiff", + common.Hash{}, + &sdtypes.StateObject{ + Nodes: emptyDiffs, + }, + }, + { + "testBlock0", + //10000 transferred from testBankAddress to account1Addr + block0.Root(), + &sdtypes.StateObject{ + Nodes: []sdtypes.StateLeafNode{ + { + Removed: false, + AccountWrapper: sdtypes.AccountWrapper{ + Account: bankAccountAtBlock0, + LeafKey: test_helpers.BankLeafKey, + CID: ipld.Keccak256ToCid(ipld.MEthStateTrie, crypto.Keccak256(bankAccountAtBlock0LeafNode)).String()}, + StorageDiff: emptyStorage, + }, + }, + IPLDs: []sdtypes.IPLD{ + { + CID: ipld.Keccak256ToCid(ipld.MEthStateTrie, crypto.Keccak256(bankAccountAtBlock0LeafNode)).String(), + Content: bankAccountAtBlock0LeafNode, + }, + }, + }, + }, + { + "testBlock1", + //10000 transferred from testBankAddress to account1Addr + block1.Root(), + &sdtypes.StateObject{ + Nodes: []sdtypes.StateLeafNode{ + { + Removed: false, + AccountWrapper: sdtypes.AccountWrapper{ + Account: bankAccountAtBlock1, + LeafKey: test_helpers.BankLeafKey, + CID: ipld.Keccak256ToCid(ipld.MEthStateTrie, crypto.Keccak256(bankAccountAtBlock1LeafNode)).String()}, + StorageDiff: emptyStorage, + }, + { + Removed: false, + AccountWrapper: sdtypes.AccountWrapper{ + Account: minerAccountAtBlock1, + LeafKey: minerLeafKey, + CID: ipld.Keccak256ToCid(ipld.MEthStateTrie, crypto.Keccak256(minerAccountAtBlock1LeafNode)).String()}, + StorageDiff: emptyStorage, + }, + { + Removed: false, + AccountWrapper: sdtypes.AccountWrapper{ + Account: account1AtBlock1, + LeafKey: test_helpers.Account1LeafKey, + CID: ipld.Keccak256ToCid(ipld.MEthStateTrie, crypto.Keccak256(account1AtBlock1LeafNode)).String()}, + StorageDiff: emptyStorage, + }, + }, + IPLDs: []sdtypes.IPLD{ + { + CID: ipld.Keccak256ToCid(ipld.MEthStateTrie, crypto.Keccak256(block1BranchRootNode)).String(), + Content: block1BranchRootNode, + }, + { + CID: ipld.Keccak256ToCid(ipld.MEthStateTrie, crypto.Keccak256(bankAccountAtBlock1LeafNode)).String(), + Content: bankAccountAtBlock1LeafNode, + }, + { + CID: ipld.Keccak256ToCid(ipld.MEthStateTrie, crypto.Keccak256(minerAccountAtBlock1LeafNode)).String(), + Content: minerAccountAtBlock1LeafNode, + }, + { + CID: ipld.Keccak256ToCid(ipld.MEthStateTrie, crypto.Keccak256(account1AtBlock1LeafNode)).String(), + Content: account1AtBlock1LeafNode, + }, + }, + }, + }, + { + "testBlock2", + //1000 transferred from testBankAddress to account1Addr + //1000 transferred from account1Addr to account2Addr + block2.Root(), + &sdtypes.StateObject{ + Nodes: []sdtypes.StateLeafNode{ + { + Removed: false, + AccountWrapper: sdtypes.AccountWrapper{ + Account: bankAccountAtBlock2, + LeafKey: test_helpers.BankLeafKey, + CID: ipld.Keccak256ToCid(ipld.MEthStateTrie, crypto.Keccak256(bankAccountAtBlock2LeafNode)).String()}, + StorageDiff: emptyStorage, + }, + { + Removed: false, + AccountWrapper: sdtypes.AccountWrapper{ + Account: minerAccountAtBlock2, + LeafKey: minerLeafKey, + CID: ipld.Keccak256ToCid(ipld.MEthStateTrie, crypto.Keccak256(minerAccountAtBlock2LeafNode)).String()}, + StorageDiff: emptyStorage, + }, + { + Removed: false, + AccountWrapper: sdtypes.AccountWrapper{ + Account: account1AtBlock2, + LeafKey: test_helpers.Account1LeafKey, + CID: ipld.Keccak256ToCid(ipld.MEthStateTrie, crypto.Keccak256(account1AtBlock2LeafNode)).String()}, + StorageDiff: emptyStorage, + }, + { + Removed: false, + AccountWrapper: sdtypes.AccountWrapper{ + Account: contractAccountAtBlock2, + LeafKey: contractLeafKey, + CID: ipld.Keccak256ToCid(ipld.MEthStateTrie, crypto.Keccak256(contractAccountAtBlock2LeafNode)).String()}, + StorageDiff: []sdtypes.StorageLeafNode{ + { + Removed: false, + Value: slot0StorageValue, + LeafKey: slot0StorageKey.Bytes(), + CID: ipld.Keccak256ToCid(ipld.MEthStorageTrie, crypto.Keccak256(slot0StorageLeafNode)).String(), + }, + { + Removed: false, + Value: slot1StorageValue, + LeafKey: slot1StorageKey.Bytes(), + CID: ipld.Keccak256ToCid(ipld.MEthStorageTrie, crypto.Keccak256(slot1StorageLeafNode)).String(), + }, + }, + }, + { + Removed: false, + AccountWrapper: sdtypes.AccountWrapper{ + Account: account2AtBlock2, + LeafKey: test_helpers.Account2LeafKey, + CID: ipld.Keccak256ToCid(ipld.MEthStateTrie, crypto.Keccak256(account2AtBlock2LeafNode)).String()}, + StorageDiff: emptyStorage, + }, + }, + IPLDs: []sdtypes.IPLD{ + { + CID: ipld.Keccak256ToCid(ipld.RawBinary, test_helpers.CodeHash.Bytes()).String(), + Content: test_helpers.ByteCodeAfterDeployment, + }, + { + CID: ipld.Keccak256ToCid(ipld.MEthStateTrie, crypto.Keccak256(block2BranchRootNode)).String(), + Content: block2BranchRootNode, + }, + { + CID: ipld.Keccak256ToCid(ipld.MEthStateTrie, crypto.Keccak256(bankAccountAtBlock2LeafNode)).String(), + Content: bankAccountAtBlock2LeafNode, + }, + { + CID: ipld.Keccak256ToCid(ipld.MEthStateTrie, crypto.Keccak256(minerAccountAtBlock2LeafNode)).String(), + Content: minerAccountAtBlock2LeafNode, + }, + { + CID: ipld.Keccak256ToCid(ipld.MEthStateTrie, crypto.Keccak256(account1AtBlock2LeafNode)).String(), + Content: account1AtBlock2LeafNode, + }, + { + CID: ipld.Keccak256ToCid(ipld.MEthStateTrie, crypto.Keccak256(contractAccountAtBlock2LeafNode)).String(), + Content: contractAccountAtBlock2LeafNode, + }, + { + CID: ipld.Keccak256ToCid(ipld.MEthStorageTrie, crypto.Keccak256(block2StorageBranchRootNode)).String(), + Content: block2StorageBranchRootNode, + }, + { + CID: ipld.Keccak256ToCid(ipld.MEthStorageTrie, crypto.Keccak256(slot0StorageLeafNode)).String(), + Content: slot0StorageLeafNode, + }, + { + CID: ipld.Keccak256ToCid(ipld.MEthStorageTrie, crypto.Keccak256(slot1StorageLeafNode)).String(), + Content: slot1StorageLeafNode, + }, + { + CID: ipld.Keccak256ToCid(ipld.MEthStateTrie, crypto.Keccak256(account2AtBlock2LeafNode)).String(), + Content: account2AtBlock2LeafNode, + }, + }, + }, + }, + { + "testBlock3", + //the contract's storage is changed + //and the block is mined by account 2 + block3.Root(), + &sdtypes.StateObject{ + Nodes: []sdtypes.StateLeafNode{ + { + Removed: false, + AccountWrapper: sdtypes.AccountWrapper{ + Account: minerAccountAtBlock2, + LeafKey: minerLeafKey, + CID: ipld.Keccak256ToCid(ipld.MEthStateTrie, crypto.Keccak256(minerAccountAtBlock2LeafNode)).String()}, + StorageDiff: emptyStorage, + }, + { + Removed: false, + AccountWrapper: sdtypes.AccountWrapper{ + Account: account1AtBlock2, + LeafKey: test_helpers.Account1LeafKey, + CID: ipld.Keccak256ToCid(ipld.MEthStateTrie, crypto.Keccak256(account1AtBlock2LeafNode)).String()}, + StorageDiff: emptyStorage, + }, + { + Removed: false, + AccountWrapper: sdtypes.AccountWrapper{ + Account: bankAccountAtBlock3, + LeafKey: test_helpers.BankLeafKey, + CID: ipld.Keccak256ToCid(ipld.MEthStateTrie, crypto.Keccak256(bankAccountAtBlock3LeafNode)).String()}, + StorageDiff: emptyStorage, + }, + { + Removed: false, + AccountWrapper: sdtypes.AccountWrapper{ + Account: contractAccountAtBlock3, + LeafKey: contractLeafKey, + CID: ipld.Keccak256ToCid(ipld.MEthStateTrie, crypto.Keccak256(contractAccountAtBlock3LeafNode)).String()}, + StorageDiff: []sdtypes.StorageLeafNode{ + + { + Removed: false, + Value: slot0StorageValue, + LeafKey: slot0StorageKey.Bytes(), + CID: ipld.Keccak256ToCid(ipld.MEthStorageTrie, crypto.Keccak256(slot0StorageLeafNode)).String(), + }, + { + Removed: false, + Value: slot1StorageValue, + LeafKey: slot1StorageKey.Bytes(), + CID: ipld.Keccak256ToCid(ipld.MEthStorageTrie, crypto.Keccak256(slot1StorageLeafNode)).String(), + }, + + { + Removed: false, + Value: slot3StorageValue, + LeafKey: slot3StorageKey.Bytes(), + CID: ipld.Keccak256ToCid(ipld.MEthStorageTrie, crypto.Keccak256(slot3StorageLeafNode)).String(), + }, + }, + }, + { + Removed: false, + AccountWrapper: sdtypes.AccountWrapper{ + Account: account2AtBlock3, + LeafKey: test_helpers.Account2LeafKey, + CID: ipld.Keccak256ToCid(ipld.MEthStateTrie, crypto.Keccak256(account2AtBlock3LeafNode)).String()}, + StorageDiff: emptyStorage, + }, + }, + IPLDs: []sdtypes.IPLD{ + { + CID: ipld.Keccak256ToCid(ipld.RawBinary, test_helpers.CodeHash.Bytes()).String(), + Content: test_helpers.ByteCodeAfterDeployment, + }, + { + CID: ipld.Keccak256ToCid(ipld.MEthStateTrie, crypto.Keccak256(minerAccountAtBlock2LeafNode)).String(), + Content: minerAccountAtBlock2LeafNode, + }, + { + CID: ipld.Keccak256ToCid(ipld.MEthStateTrie, crypto.Keccak256(account1AtBlock2LeafNode)).String(), + Content: account1AtBlock2LeafNode, + }, + { + CID: ipld.Keccak256ToCid(ipld.MEthStorageTrie, crypto.Keccak256(slot0StorageLeafNode)).String(), + Content: slot0StorageLeafNode, + }, + { + CID: ipld.Keccak256ToCid(ipld.MEthStorageTrie, crypto.Keccak256(slot1StorageLeafNode)).String(), + Content: slot1StorageLeafNode, + }, + { + CID: ipld.Keccak256ToCid(ipld.MEthStateTrie, crypto.Keccak256(block3BranchRootNode)).String(), + Content: block3BranchRootNode, + }, + { + CID: ipld.Keccak256ToCid(ipld.MEthStateTrie, crypto.Keccak256(bankAccountAtBlock3LeafNode)).String(), + Content: bankAccountAtBlock3LeafNode, + }, + { + CID: ipld.Keccak256ToCid(ipld.MEthStateTrie, crypto.Keccak256(contractAccountAtBlock3LeafNode)).String(), + Content: contractAccountAtBlock3LeafNode, + }, + { + CID: ipld.Keccak256ToCid(ipld.MEthStorageTrie, crypto.Keccak256(block3StorageBranchRootNode)).String(), + Content: block3StorageBranchRootNode, + }, + { + CID: ipld.Keccak256ToCid(ipld.MEthStorageTrie, crypto.Keccak256(slot3StorageLeafNode)).String(), + Content: slot3StorageLeafNode, + }, + { + CID: ipld.Keccak256ToCid(ipld.MEthStateTrie, crypto.Keccak256(account2AtBlock3LeafNode)).String(), + Content: account2AtBlock3LeafNode, + }, + }, + }, + }, + } + + for _, test := range tests { + test_helpers.RunStateSnapshot(t, chain.StateCache(), test, params) + } +} + +func TestBuilderSnapshotWithWatchedAddressList(t *testing.T) { + blocks, chain := test_helpers.MakeChain(3, test_helpers.Genesis, test_helpers.TestChainGen) + contractLeafKey = test_helpers.AddressToLeafKey(test_helpers.ContractAddr) + defer chain.Stop() + block0 = test_helpers.Genesis + block1 = blocks[0] + block2 = blocks[1] + block3 = blocks[2] + params := statediff.Params{ + WatchedAddresses: []common.Address{test_helpers.Account1Addr, test_helpers.ContractAddr}, + } + params.ComputeWatchedAddressesLeafPaths() + + var tests = []test_helpers.SnapshotTestCase{ + { + "testBlock0", + //10000 transferred from testBankAddress to account1Addr + block0.Root(), + &sdtypes.StateObject{ + Nodes: emptyDiffs, + }, + }, + { + "testBlock1", + //10000 transferred from testBankAddress to account1Addr + block1.Root(), + &sdtypes.StateObject{ + Nodes: []sdtypes.StateLeafNode{ + { + Removed: false, + AccountWrapper: sdtypes.AccountWrapper{ + Account: account1AtBlock1, + LeafKey: test_helpers.Account1LeafKey, + CID: ipld.Keccak256ToCid(ipld.MEthStateTrie, crypto.Keccak256(account1AtBlock1LeafNode)).String()}, + StorageDiff: emptyStorage, + }, + }, + IPLDs: []sdtypes.IPLD{ + { + CID: ipld.Keccak256ToCid(ipld.MEthStateTrie, crypto.Keccak256(block1BranchRootNode)).String(), + Content: block1BranchRootNode, + }, + { + CID: ipld.Keccak256ToCid(ipld.MEthStateTrie, crypto.Keccak256(account1AtBlock1LeafNode)).String(), + Content: account1AtBlock1LeafNode, + }, + }, + }, + }, + { + "testBlock2", + //1000 transferred from testBankAddress to account1Addr + //1000 transferred from account1Addr to account2Addr + block2.Root(), + &sdtypes.StateObject{ + Nodes: []sdtypes.StateLeafNode{ + { + Removed: false, + AccountWrapper: sdtypes.AccountWrapper{ + Account: contractAccountAtBlock2, + LeafKey: contractLeafKey, + CID: ipld.Keccak256ToCid(ipld.MEthStateTrie, crypto.Keccak256(contractAccountAtBlock2LeafNode)).String(), + }, + StorageDiff: []sdtypes.StorageLeafNode{ + { + Removed: false, + Value: slot0StorageValue, + LeafKey: slot0StorageKey.Bytes(), + CID: ipld.Keccak256ToCid(ipld.MEthStorageTrie, crypto.Keccak256(slot0StorageLeafNode)).String(), + }, + { + Removed: false, + Value: slot1StorageValue, + LeafKey: slot1StorageKey.Bytes(), + CID: ipld.Keccak256ToCid(ipld.MEthStorageTrie, crypto.Keccak256(slot1StorageLeafNode)).String(), + }, + }, + }, + { + Removed: false, + AccountWrapper: sdtypes.AccountWrapper{ + Account: account1AtBlock2, + LeafKey: test_helpers.Account1LeafKey, + CID: ipld.Keccak256ToCid(ipld.MEthStateTrie, crypto.Keccak256(account1AtBlock2LeafNode)).String()}, + StorageDiff: emptyStorage, + }, + }, + IPLDs: []sdtypes.IPLD{ + { + CID: ipld.Keccak256ToCid(ipld.RawBinary, test_helpers.CodeHash.Bytes()).String(), + Content: test_helpers.ByteCodeAfterDeployment, + }, + { + CID: ipld.Keccak256ToCid(ipld.MEthStateTrie, crypto.Keccak256(block2BranchRootNode)).String(), + Content: block2BranchRootNode, + }, + { + CID: ipld.Keccak256ToCid(ipld.MEthStateTrie, crypto.Keccak256(contractAccountAtBlock2LeafNode)).String(), + Content: contractAccountAtBlock2LeafNode, + }, + { + CID: ipld.Keccak256ToCid(ipld.MEthStorageTrie, crypto.Keccak256(block2StorageBranchRootNode)).String(), + Content: block2StorageBranchRootNode, + }, + { + CID: ipld.Keccak256ToCid(ipld.MEthStorageTrie, crypto.Keccak256(slot0StorageLeafNode)).String(), + Content: slot0StorageLeafNode, + }, + { + CID: ipld.Keccak256ToCid(ipld.MEthStorageTrie, crypto.Keccak256(slot1StorageLeafNode)).String(), + Content: slot1StorageLeafNode, + }, + { + CID: ipld.Keccak256ToCid(ipld.MEthStateTrie, crypto.Keccak256(account1AtBlock2LeafNode)).String(), + Content: account1AtBlock2LeafNode, + }, + }, + }, + }, + { + "testBlock3", + //the contract's storage is changed + //and the block is mined by account 2 + block3.Root(), + &sdtypes.StateObject{ + Nodes: []sdtypes.StateLeafNode{ + { + Removed: false, + AccountWrapper: sdtypes.AccountWrapper{ + Account: account1AtBlock2, + LeafKey: test_helpers.Account1LeafKey, + CID: ipld.Keccak256ToCid(ipld.MEthStateTrie, crypto.Keccak256(account1AtBlock2LeafNode)).String()}, + StorageDiff: emptyStorage, + }, + { + Removed: false, + AccountWrapper: sdtypes.AccountWrapper{ + Account: contractAccountAtBlock3, + LeafKey: contractLeafKey, + CID: ipld.Keccak256ToCid(ipld.MEthStateTrie, crypto.Keccak256(contractAccountAtBlock3LeafNode)).String()}, + StorageDiff: []sdtypes.StorageLeafNode{ + { + Removed: false, + Value: slot0StorageValue, + LeafKey: slot0StorageKey.Bytes(), + CID: ipld.Keccak256ToCid(ipld.MEthStorageTrie, crypto.Keccak256(slot0StorageLeafNode)).String(), + }, + { + Removed: false, + Value: slot1StorageValue, + LeafKey: slot1StorageKey.Bytes(), + CID: ipld.Keccak256ToCid(ipld.MEthStorageTrie, crypto.Keccak256(slot1StorageLeafNode)).String(), + }, + { + Removed: false, + Value: slot3StorageValue, + LeafKey: slot3StorageKey.Bytes(), + CID: ipld.Keccak256ToCid(ipld.MEthStorageTrie, crypto.Keccak256(slot3StorageLeafNode)).String(), + }, + }, + }, + }, + IPLDs: []sdtypes.IPLD{ + { + CID: ipld.Keccak256ToCid(ipld.RawBinary, test_helpers.CodeHash.Bytes()).String(), + Content: test_helpers.ByteCodeAfterDeployment, + }, + { + CID: ipld.Keccak256ToCid(ipld.MEthStateTrie, crypto.Keccak256(account1AtBlock2LeafNode)).String(), + Content: account1AtBlock2LeafNode, + }, + { + CID: ipld.Keccak256ToCid(ipld.MEthStorageTrie, crypto.Keccak256(slot0StorageLeafNode)).String(), + Content: slot0StorageLeafNode, + }, + { + CID: ipld.Keccak256ToCid(ipld.MEthStorageTrie, crypto.Keccak256(slot1StorageLeafNode)).String(), + Content: slot1StorageLeafNode, + }, + { + CID: ipld.Keccak256ToCid(ipld.MEthStateTrie, crypto.Keccak256(block3BranchRootNode)).String(), + Content: block3BranchRootNode, + }, + { + CID: ipld.Keccak256ToCid(ipld.MEthStateTrie, crypto.Keccak256(contractAccountAtBlock3LeafNode)).String(), + Content: contractAccountAtBlock3LeafNode, + }, + { + CID: ipld.Keccak256ToCid(ipld.MEthStorageTrie, crypto.Keccak256(block3StorageBranchRootNode)).String(), + Content: block3StorageBranchRootNode, + }, + { + CID: ipld.Keccak256ToCid(ipld.MEthStorageTrie, crypto.Keccak256(slot3StorageLeafNode)).String(), + Content: slot3StorageLeafNode, + }, + }, + }, + }, + } + + for _, test := range tests { + test_helpers.RunStateSnapshot(t, chain.StateCache(), test, params) + } +} diff --git a/builder_test.go b/builder_test.go index 68e7ea2..1540880 100644 --- a/builder_test.go +++ b/builder_test.go @@ -503,7 +503,7 @@ func TestBuilder(t *testing.T) { block3 = blocks[2] params := statediff.Params{} - var tests = []test_helpers.TestCase{ + var tests = []test_helpers.DiffTestCase{ { "testEmptyDiff", statediff.Args{ @@ -795,7 +795,7 @@ func TestBuilder(t *testing.T) { }, } - test_helpers.RunBuilderTests(t, chain.StateCache(), tests, params, []uint{1, 8, 32}) + test_helpers.RunBuildStateDiff(t, chain.StateCache(), tests, params) test_helpers.CheckedRoots{ block0: bankAccountAtBlock0LeafNode, block1: block1BranchRootNode, @@ -817,7 +817,7 @@ func TestBuilderWithWatchedAddressList(t *testing.T) { } params.ComputeWatchedAddressesLeafPaths() - var tests = []test_helpers.TestCase{ + var tests = []test_helpers.DiffTestCase{ { "testEmptyDiff", statediff.Args{ @@ -1009,7 +1009,7 @@ func TestBuilderWithWatchedAddressList(t *testing.T) { }, } - test_helpers.RunBuilderTests(t, chain.StateCache(), tests, params, []uint{1, 8, 32}) + test_helpers.RunBuildStateDiff(t, chain.StateCache(), tests, params) test_helpers.CheckedRoots{ block0: bankAccountAtBlock0LeafNode, block1: block1BranchRootNode, @@ -1028,7 +1028,7 @@ func TestBuilderWithRemovedAccountAndStorage(t *testing.T) { block6 = blocks[5] params := statediff.Params{} - var tests = []test_helpers.TestCase{ + var tests = []test_helpers.DiffTestCase{ // blocks 0-3 are the same as in TestBuilderWithIntermediateNodes { "testBlock4", @@ -1260,7 +1260,7 @@ func TestBuilderWithRemovedAccountAndStorage(t *testing.T) { }, } - test_helpers.RunBuilderTests(t, chain.StateCache(), tests, params, []uint{1, 8, 32}) + test_helpers.RunBuildStateDiff(t, chain.StateCache(), tests, params) test_helpers.CheckedRoots{ block4: block4BranchRootNode, block5: block5BranchRootNode, @@ -1281,7 +1281,7 @@ func TestBuilderWithRemovedNonWatchedAccount(t *testing.T) { } params.ComputeWatchedAddressesLeafPaths() - var tests = []test_helpers.TestCase{ + var tests = []test_helpers.DiffTestCase{ { "testBlock4", statediff.Args{ @@ -1395,7 +1395,7 @@ func TestBuilderWithRemovedNonWatchedAccount(t *testing.T) { }, } - test_helpers.RunBuilderTests(t, chain.StateCache(), tests, params, []uint{1, 8, 32}) + test_helpers.RunBuildStateDiff(t, chain.StateCache(), tests, params) test_helpers.CheckedRoots{ block4: block4BranchRootNode, block5: block5BranchRootNode, @@ -1416,7 +1416,7 @@ func TestBuilderWithRemovedWatchedAccount(t *testing.T) { } params.ComputeWatchedAddressesLeafPaths() - var tests = []test_helpers.TestCase{ + var tests = []test_helpers.DiffTestCase{ { "testBlock4", statediff.Args{ @@ -1599,7 +1599,7 @@ func TestBuilderWithRemovedWatchedAccount(t *testing.T) { }, } - test_helpers.RunBuilderTests(t, chain.StateCache(), tests, params, []uint{1, 8, 32}) + test_helpers.RunBuildStateDiff(t, chain.StateCache(), tests, params) test_helpers.CheckedRoots{ block4: block4BranchRootNode, block5: block5BranchRootNode, @@ -1700,7 +1700,7 @@ func TestBuilderWithMovedAccount(t *testing.T) { block2 = blocks[1] params := statediff.Params{} - var tests = []test_helpers.TestCase{ + var tests = []test_helpers.DiffTestCase{ { "testBlock1", statediff.Args{ @@ -1827,7 +1827,7 @@ func TestBuilderWithMovedAccount(t *testing.T) { }, } - test_helpers.RunBuilderTests(t, chain.StateCache(), tests, params, []uint{1, 8, 32}) + test_helpers.RunBuildStateDiff(t, chain.StateCache(), tests, params) test_helpers.CheckedRoots{ block1: block01BranchRootNode, block2: bankAccountAtBlock02LeafNode, @@ -2088,7 +2088,7 @@ func TestBuilderWithInternalizedLeafNode(t *testing.T) { block3 = blocks[2] params := statediff.Params{} - var tests = []test_helpers.TestCase{ + var tests = []test_helpers.DiffTestCase{ { "testEmptyDiff", statediff.Args{ @@ -2354,7 +2354,7 @@ func TestBuilderWithInternalizedLeafNode(t *testing.T) { }, } - test_helpers.RunBuilderTests(t, chain.StateCache(), tests, params, []uint{1, 8, 32}) + test_helpers.RunBuildStateDiff(t, chain.StateCache(), tests, params) test_helpers.CheckedRoots{ block1: block1bBranchRootNode, block2: block2bBranchRootNode, @@ -2377,7 +2377,7 @@ func TestBuilderWithInternalizedLeafNodeAndWatchedAddress(t *testing.T) { } params.ComputeWatchedAddressesLeafPaths() - var tests = []test_helpers.TestCase{ + var tests = []test_helpers.DiffTestCase{ { "testEmptyDiff", statediff.Args{ @@ -2556,7 +2556,7 @@ func TestBuilderWithInternalizedLeafNodeAndWatchedAddress(t *testing.T) { }, } - test_helpers.RunBuilderTests(t, chain.StateCache(), tests, params, []uint{1, 8, 32}) + test_helpers.RunBuildStateDiff(t, chain.StateCache(), tests, params) test_helpers.CheckedRoots{ block1: block1bBranchRootNode, block2: block2bBranchRootNode, diff --git a/config.go b/config.go index ac16f74..960b44a 100644 --- a/config.go +++ b/config.go @@ -48,16 +48,16 @@ type Config struct { SubtrieWorkers uint // Should the statediff service wait until geth has synced to the head of the blockchain? WaitForSync bool - // Context used during DB initialization + // Context passed to all DB method calls Context context.Context } // Params contains config parameters for the state diff builder type Params struct { - IncludeBlock bool + IncludeBlock bool // TODO: not used in write-requests IncludeReceipts bool IncludeTD bool - IncludeCode bool + IncludeCode bool // TODO: not used by anything? WatchedAddresses []common.Address watchedAddressesLeafPaths [][]byte } diff --git a/go.mod b/go.mod index ffb599b..d84ab58 100644 --- a/go.mod +++ b/go.mod @@ -124,8 +124,8 @@ require ( ) replace ( - github.com/cerc-io/eth-iterator-utils => git.vdb.to/cerc-io/eth-iterator-utils v0.1.1 - github.com/cerc-io/eth-testing => git.vdb.to/cerc-io/eth-testing v0.2.1 + github.com/cerc-io/eth-iterator-utils => git.vdb.to/cerc-io/eth-iterator-utils v0.1.2 + github.com/cerc-io/eth-testing => git.vdb.to/cerc-io/eth-testing v0.3.1 github.com/ethereum/go-ethereum => git.vdb.to/cerc-io/plugeth v0.0.0-20230808125822-691dc334fab1 github.com/openrelayxyz/plugeth-utils => git.vdb.to/cerc-io/plugeth-utils v0.0.0-20230706160122-cd41de354c46 ) diff --git a/go.sum b/go.sum index 0cd8482..39e1e6b 100644 --- a/go.sum +++ b/go.sum @@ -1,8 +1,8 @@ cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= -git.vdb.to/cerc-io/eth-iterator-utils v0.1.1 h1:AGen4U2GaYJVzPjEo3U+GPczSfOUEMkM1nWTM+cq5Dk= -git.vdb.to/cerc-io/eth-iterator-utils v0.1.1/go.mod h1:uiocO9elfDe78kd3c/VZ2in26V+gyXJuN+sdTxK4Xag= -git.vdb.to/cerc-io/eth-testing v0.2.1 h1:IZAX7DVgzPkSmu1xdKZ5aOemdEYbvtgae7GUl/TUNtQ= -git.vdb.to/cerc-io/eth-testing v0.2.1/go.mod h1:qdvpc/W1xvf2MKx3rMOqvFvYaYIHG77Z1g0lwsmw0Uk= +git.vdb.to/cerc-io/eth-iterator-utils v0.1.2 h1:PdMR5B9wrQSYuYpFhN+9Kc8AEZ0pTt5eKCmu8oCtFcY= +git.vdb.to/cerc-io/eth-iterator-utils v0.1.2/go.mod h1:OvXbdWbZ5viBXC/Ui1EkhsSmGB+AUX+TjGa3UDAfjfg= +git.vdb.to/cerc-io/eth-testing v0.3.1 h1:sPnlMev6oEgTjsW7GtUkSsjKNG/+X6P9q0izSejLGpM= +git.vdb.to/cerc-io/eth-testing v0.3.1/go.mod h1:qdvpc/W1xvf2MKx3rMOqvFvYaYIHG77Z1g0lwsmw0Uk= git.vdb.to/cerc-io/plugeth v0.0.0-20230808125822-691dc334fab1 h1:KLjxHwp9Zp7xhECccmJS00RiL+VwTuUGLU7qeIctg8g= git.vdb.to/cerc-io/plugeth v0.0.0-20230808125822-691dc334fab1/go.mod h1:cYXZu70+6xmDgIgrTD81GPasv16piiAFJnKyAbwVPMU= git.vdb.to/cerc-io/plugeth-utils v0.0.0-20230706160122-cd41de354c46 h1:KYcbbne/RXd7AuxbUd/3hgk1jPN+33k2CKiNsUsMCC0= diff --git a/indexer/constructor.go b/indexer/constructor.go index c44ab64..13f6d65 100644 --- a/indexer/constructor.go +++ b/indexer/constructor.go @@ -33,7 +33,16 @@ import ( ) // NewStateDiffIndexer creates and returns an implementation of the StateDiffIndexer interface. -func NewStateDiffIndexer(ctx context.Context, chainConfig *params.ChainConfig, nodeInfo node.Info, config interfaces.Config) (sql.Database, interfaces.StateDiffIndexer, error) { +func NewStateDiffIndexer( + ctx context.Context, + chainConfig *params.ChainConfig, + nodeInfo node.Info, + config interfaces.Config, +) ( + sql.Database, + interfaces.StateDiffIndexer, + error, +) { switch config.Type() { case shared.FILE: log.Info("Starting statediff service in SQL file writing mode") @@ -41,8 +50,7 @@ func NewStateDiffIndexer(ctx context.Context, chainConfig *params.ChainConfig, n if !ok { return nil, nil, fmt.Errorf("file config is not the correct type: got %T, expected %T", config, file.Config{}) } - fc.NodeInfo = nodeInfo - ind, err := file.NewStateDiffIndexer(chainConfig, fc) + ind, err := file.NewStateDiffIndexer(chainConfig, fc, nodeInfo) return nil, ind, err case shared.POSTGRES: log.Info("Starting statediff service in Postgres writing mode") diff --git a/indexer/database/dump/batch_tx.go b/indexer/database/dump/batch_tx.go index 1923622..a36d7ce 100644 --- a/indexer/database/dump/batch_tx.go +++ b/indexer/database/dump/batch_tx.go @@ -19,26 +19,55 @@ package dump import ( "fmt" "io" + "math/big" "github.com/cerc-io/plugeth-statediff/indexer/ipld" - "github.com/cerc-io/plugeth-statediff/indexer/models" + "github.com/cerc-io/plugeth-statediff/utils/log" ) // BatchTx wraps a void with the state necessary for building the tx concurrently during trie difference iteration type BatchTx struct { - BlockNumber string - dump io.Writer - quit chan struct{} - iplds chan models.IPLDModel - ipldCache models.IPLDBatch - - submit func(blockTx *BatchTx, err error) error + blockNum string + dump io.Writer + quit chan struct{} + iplds chan models.IPLDModel + ipldCache models.IPLDBatch } -// Submit satisfies indexer.AtomicTx -func (tx *BatchTx) Submit(err error) error { - return tx.submit(tx, err) +func NewBatch(number *big.Int, dest io.Writer) *BatchTx { + batch := &BatchTx{ + blockNum: number.String(), + dump: dest, + iplds: make(chan models.IPLDModel), + quit: make(chan struct{}), + ipldCache: models.IPLDBatch{}, + } + go batch.cache() + return batch +} + +func (self *BatchTx) Submit() error { + close(self.quit) + close(self.iplds) + + if err := self.flush(); err != nil { + return err + } + return nil +} + +func (tx *BatchTx) BlockNumber() string { + return tx.blockNum +} + +func (tx *BatchTx) RollbackOnFailure(err error) { + if p := recover(); p != nil { + log.Info("panic detected before tx submission, but rollback not supported", "panic", p) + panic(p) + } else if err != nil { + log.Info("error detected before tx submission, but rollback not supported", "error", err) + } } func (tx *BatchTx) flush() error { @@ -65,7 +94,7 @@ func (tx *BatchTx) cache() { func (tx *BatchTx) cacheDirect(key string, value []byte) { tx.iplds <- models.IPLDModel{ - BlockNumber: tx.BlockNumber, + BlockNumber: tx.BlockNumber(), Key: key, Data: value, } @@ -73,7 +102,7 @@ func (tx *BatchTx) cacheDirect(key string, value []byte) { func (tx *BatchTx) cacheIPLD(i ipld.IPLD) { tx.iplds <- models.IPLDModel{ - BlockNumber: tx.BlockNumber, + BlockNumber: tx.BlockNumber(), Key: i.Cid().String(), Data: i.RawData(), } diff --git a/indexer/database/dump/indexer.go b/indexer/database/dump/indexer.go index 70e86b7..7307989 100644 --- a/indexer/database/dump/indexer.go +++ b/indexer/database/dump/indexer.go @@ -17,6 +17,7 @@ package dump import ( + "context" "encoding/hex" "fmt" "io" @@ -37,7 +38,6 @@ import ( "github.com/cerc-io/plugeth-statediff/indexer/models" "github.com/cerc-io/plugeth-statediff/indexer/shared" sdtypes "github.com/cerc-io/plugeth-statediff/types" - "github.com/cerc-io/plugeth-statediff/utils/log" ) var _ interfaces.StateDiffIndexer = &StateDiffIndexer{} @@ -62,7 +62,7 @@ func (sdi *StateDiffIndexer) ReportDBMetrics(time.Duration, <-chan bool) {} // PushBlock pushes and indexes block data in sql, except state & storage nodes (includes header, uncles, transactions & receipts) // Returns an initiated DB transaction which must be Closed via defer to commit or rollback func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receipts, totalDifficulty *big.Int) (interfaces.Batch, error) { - start, t := time.Now(), time.Now() + t := time.Now() blockHash := block.Hash() blockHashStr := blockHash.String() height := block.NumberU64() @@ -74,7 +74,7 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip } // Generate the block iplds - headerNode, txNodes, rctNodes, logNodes, err := ipld.FromBlockAndReceipts(block, receipts) + txNodes, rctNodes, logNodes, err := ipld.FromBlockAndReceipts(block, receipts) if err != nil { return nil, fmt.Errorf("error creating IPLD nodes from block and receipts: %v", err) } @@ -91,49 +91,17 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip } else { reward = shared.CalcEthBlockReward(block.Header(), block.Uncles(), block.Transactions(), receipts) } - t = time.Now() - blockTx := &BatchTx{ - BlockNumber: block.Number().String(), - dump: sdi.dump, - iplds: make(chan models.IPLDModel), - quit: make(chan struct{}), - ipldCache: models.IPLDBatch{}, - submit: func(self *BatchTx, err error) error { - close(self.quit) - close(self.iplds) - tDiff := time.Since(t) - metrics.IndexerMetrics.StateStoreCodeProcessingTimer.Update(tDiff) - traceMsg += fmt.Sprintf("state, storage, and code storage processing time: %s\r\n", tDiff.String()) - t = time.Now() - if err := self.flush(); err != nil { - traceMsg += fmt.Sprintf(" TOTAL PROCESSING DURATION: %s\r\n", time.Since(start).String()) - log.Debug(traceMsg) - return err - } - tDiff = time.Since(t) - metrics.IndexerMetrics.PostgresCommitTimer.Update(tDiff) - traceMsg += fmt.Sprintf("postgres transaction commit duration: %s\r\n", tDiff.String()) - traceMsg += fmt.Sprintf(" TOTAL PROCESSING DURATION: %s\r\n", time.Since(start).String()) - log.Debug(traceMsg) - return err - }, - } - go blockTx.cache() - - tDiff := time.Since(t) - metrics.IndexerMetrics.FreePostgresTimer.Update(tDiff) - - traceMsg += fmt.Sprintf("time spent waiting for free postgres tx: %s:\r\n", tDiff.String()) + blockTx := NewBatch(block.Number(), sdi.dump) t = time.Now() // Publish and index header, collect headerID var headerID string - headerID, err = sdi.processHeader(blockTx, block.Header(), headerNode, reward, totalDifficulty) + headerID, err = sdi.PushHeader(blockTx, block.Header(), reward, totalDifficulty) if err != nil { return nil, err } - tDiff = time.Since(t) + tDiff := time.Since(t) metrics.IndexerMetrics.HeaderProcessingTimer.Update(tDiff) traceMsg += fmt.Sprintf("header processing time: %s\r\n", tDiff.String()) t = time.Now() @@ -167,9 +135,17 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip return blockTx, err } -// processHeader publishes and indexes a header IPLD in Postgres +// PushHeader publishes and indexes a header IPLD in Postgres // it returns the headerID -func (sdi *StateDiffIndexer) processHeader(tx *BatchTx, header *types.Header, headerNode ipld.IPLD, reward, td *big.Int) (string, error) { +func (sdi *StateDiffIndexer) PushHeader(batch interfaces.Batch, header *types.Header, reward, td *big.Int) (string, error) { + tx, ok := batch.(*BatchTx) + if !ok { + return "", fmt.Errorf("sql: batch is expected to be of type %T, got %T", &BatchTx{}, batch) + } + headerNode, err := ipld.NewEthHeader(header) + if err != nil { + return "", err + } tx.cacheIPLD(headerNode) headerID := header.Hash().String() @@ -189,7 +165,7 @@ func (sdi *StateDiffIndexer) processHeader(tx *BatchTx, header *types.Header, he Coinbase: header.Coinbase.String(), Canonical: true, } - _, err := fmt.Fprintf(sdi.dump, "%+v\r\n", mod) + _, err = fmt.Fprintf(sdi.dump, "%+v\r\n", mod) return headerID, err } @@ -344,7 +320,7 @@ func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdt // short circuit if it is a Removed node // this assumes the db has been initialized and a ipld.blocks entry for the Removed node is present stateModel = models.StateNodeModel{ - BlockNumber: tx.BlockNumber, + BlockNumber: tx.BlockNumber(), HeaderID: headerID, StateKey: common.BytesToHash(stateNode.AccountWrapper.LeafKey).String(), CID: shared.RemovedNodeStateCID, @@ -352,7 +328,7 @@ func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdt } } else { stateModel = models.StateNodeModel{ - BlockNumber: tx.BlockNumber, + BlockNumber: tx.BlockNumber(), HeaderID: headerID, StateKey: common.BytesToHash(stateNode.AccountWrapper.LeafKey).String(), CID: stateNode.AccountWrapper.CID, @@ -375,7 +351,7 @@ func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdt // short circuit if it is a Removed node // this assumes the db has been initialized and a ipld.blocks entry for the Removed node is present storageModel := models.StorageNodeModel{ - BlockNumber: tx.BlockNumber, + BlockNumber: tx.BlockNumber(), HeaderID: headerID, StateKey: common.BytesToHash(stateNode.AccountWrapper.LeafKey).String(), StorageKey: common.BytesToHash(storageNode.LeafKey).String(), @@ -388,7 +364,7 @@ func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdt continue } storageModel := models.StorageNodeModel{ - BlockNumber: tx.BlockNumber, + BlockNumber: tx.BlockNumber(), HeaderID: headerID, StateKey: common.BytesToHash(stateNode.AccountWrapper.LeafKey).String(), StorageKey: common.BytesToHash(storageNode.LeafKey).String(), @@ -432,6 +408,10 @@ func (sdi *StateDiffIndexer) DetectGaps(beginBlockNumber uint64, endBlockNumber return nil, nil } +func (sdi *StateDiffIndexer) BeginTx(number *big.Int, _ context.Context) interfaces.Batch { + return NewBatch(number, sdi.dump) +} + // Close satisfies io.Closer func (sdi *StateDiffIndexer) Close() error { return sdi.dump.Close() diff --git a/indexer/database/file/batch_tx.go b/indexer/database/file/batch_tx.go index d38bd12..3096204 100644 --- a/indexer/database/file/batch_tx.go +++ b/indexer/database/file/batch_tx.go @@ -16,14 +16,29 @@ package file +import "github.com/cerc-io/plugeth-statediff/utils/log" + // BatchTx wraps a void with the state necessary for building the tx concurrently during trie difference iteration type BatchTx struct { - BlockNumber string - - submit func(blockTx *BatchTx, err error) error + blockNum string + fileWriter FileWriter } // Submit satisfies indexer.AtomicTx -func (tx *BatchTx) Submit(err error) error { - return tx.submit(tx, err) +func (tx *BatchTx) Submit() error { + tx.fileWriter.Flush() + return nil +} + +func (tx *BatchTx) BlockNumber() string { + return tx.blockNum +} + +func (tx *BatchTx) RollbackOnFailure(err error) { + if p := recover(); p != nil { + log.Info("panic detected before tx submission, but rollback not supported", "panic", p) + panic(p) + } else if err != nil { + log.Info("error detected before tx submission, but rollback not supported", "error", err) + } } diff --git a/indexer/database/file/config.go b/indexer/database/file/config.go index fc8fd8c..9c01327 100644 --- a/indexer/database/file/config.go +++ b/indexer/database/file/config.go @@ -20,7 +20,6 @@ import ( "fmt" "strings" - "github.com/cerc-io/plugeth-statediff/indexer/node" "github.com/cerc-io/plugeth-statediff/indexer/shared" ) @@ -30,7 +29,6 @@ type Config struct { OutputDir string FilePath string WatchedAddressesFilePath string - NodeInfo node.Info } // FileMode to explicitly type the mode of file writer we are using @@ -70,20 +68,11 @@ func (c Config) Type() shared.DBType { return shared.FILE } -var nodeInfo = node.Info{ - GenesisBlock: "0xd4e56740f876aef8c010b86a40d5f56745a118d0906a34e69aec8c0db1cb8fa3", - NetworkID: "1", - ChainID: 1, - ID: "mockNodeID", - ClientName: "go-ethereum", -} - // CSVTestConfig config for unit tests var CSVTestConfig = Config{ Mode: CSV, OutputDir: "./statediffing_test", WatchedAddressesFilePath: "./statediffing_watched_addresses_test_file.csv", - NodeInfo: nodeInfo, } // SQLTestConfig config for unit tests @@ -91,5 +80,4 @@ var SQLTestConfig = Config{ Mode: SQL, FilePath: "./statediffing_test_file.sql", WatchedAddressesFilePath: "./statediffing_watched_addresses_test_file.sql", - NodeInfo: nodeInfo, } diff --git a/indexer/database/file/csv_indexer_legacy_test.go b/indexer/database/file/csv_indexer_legacy_test.go index c117f75..238423b 100644 --- a/indexer/database/file/csv_indexer_legacy_test.go +++ b/indexer/database/file/csv_indexer_legacy_test.go @@ -43,7 +43,7 @@ func setupLegacyCSVIndexer(t *testing.T) { require.NoError(t, err) } - ind, err = file.NewStateDiffIndexer(test.LegacyConfig, file.CSVTestConfig) + ind, err = file.NewStateDiffIndexer(test.LegacyConfig, file.CSVTestConfig, test.LegacyNodeInfo) require.NoError(t, err) db, err = postgres.SetupSQLXDB() diff --git a/indexer/database/file/csv_indexer_test.go b/indexer/database/file/csv_indexer_test.go index 4e6526f..06aa366 100644 --- a/indexer/database/file/csv_indexer_test.go +++ b/indexer/database/file/csv_indexer_test.go @@ -41,7 +41,7 @@ func setupCSVIndexer(t *testing.T) { require.NoError(t, err) } - ind, err = file.NewStateDiffIndexer(mocks.TestChainConfig, file.CSVTestConfig) + ind, err = file.NewStateDiffIndexer(mocks.TestChainConfig, file.CSVTestConfig, test.LegacyNodeInfo) require.NoError(t, err) db, err = postgres.SetupSQLXDB() diff --git a/indexer/database/file/indexer.go b/indexer/database/file/indexer.go index 1177b54..56719b8 100644 --- a/indexer/database/file/indexer.go +++ b/indexer/database/file/indexer.go @@ -17,6 +17,7 @@ package file import ( + "context" "errors" "fmt" "math/big" @@ -37,6 +38,7 @@ import ( "github.com/cerc-io/plugeth-statediff/indexer/interfaces" "github.com/cerc-io/plugeth-statediff/indexer/ipld" "github.com/cerc-io/plugeth-statediff/indexer/models" + "github.com/cerc-io/plugeth-statediff/indexer/node" "github.com/cerc-io/plugeth-statediff/indexer/shared" sdtypes "github.com/cerc-io/plugeth-statediff/types" "github.com/cerc-io/plugeth-statediff/utils/log" @@ -61,7 +63,7 @@ type StateDiffIndexer struct { } // NewStateDiffIndexer creates a void implementation of interfaces.StateDiffIndexer -func NewStateDiffIndexer(chainConfig *params.ChainConfig, config Config) (*StateDiffIndexer, error) { +func NewStateDiffIndexer(chainConfig *params.ChainConfig, config Config, nodeInfo node.Info) (*StateDiffIndexer, error) { var err error var writer FileWriter @@ -114,12 +116,12 @@ func NewStateDiffIndexer(chainConfig *params.ChainConfig, config Config) (*State wg := new(sync.WaitGroup) writer.Loop() - writer.upsertNode(config.NodeInfo) + writer.upsertNode(nodeInfo) return &StateDiffIndexer{ fileWriter: writer, chainConfig: chainConfig, - nodeID: config.NodeInfo.ID, + nodeID: nodeInfo.ID, wg: wg, }, nil } @@ -130,7 +132,7 @@ func (sdi *StateDiffIndexer) ReportDBMetrics(time.Duration, <-chan bool) {} // PushBlock pushes and indexes block data in sql, except state & storage nodes (includes header, uncles, transactions & receipts) // Returns an initiated DB transaction which must be Closed via defer to commit or rollback func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receipts, totalDifficulty *big.Int) (interfaces.Batch, error) { - start, t := time.Now(), time.Now() + t := time.Now() blockHash := block.Hash() blockHashStr := blockHash.String() height := block.NumberU64() @@ -142,7 +144,7 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip } // Generate the block iplds - headerNode, txNodes, rctNodes, logNodes, err := ipld.FromBlockAndReceipts(block, receipts) + txNodes, rctNodes, logNodes, err := ipld.FromBlockAndReceipts(block, receipts) if err != nil { return nil, fmt.Errorf("error creating IPLD nodes from block and receipts: %v", err) } @@ -159,32 +161,19 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip } else { reward = shared.CalcEthBlockReward(block.Header(), block.Uncles(), block.Transactions(), receipts) } - t = time.Now() blockTx := &BatchTx{ - BlockNumber: block.Number().String(), - submit: func(self *BatchTx, err error) error { - tDiff := time.Since(t) - metrics.IndexerMetrics.StateStoreCodeProcessingTimer.Update(tDiff) - traceMsg += fmt.Sprintf("state, storage, and code storage processing time: %s\r\n", tDiff.String()) - t = time.Now() - sdi.fileWriter.Flush() - tDiff = time.Since(t) - metrics.IndexerMetrics.PostgresCommitTimer.Update(tDiff) - traceMsg += fmt.Sprintf("postgres transaction commit duration: %s\r\n", tDiff.String()) - traceMsg += fmt.Sprintf(" TOTAL PROCESSING DURATION: %s\r\n", time.Since(start).String()) - log.Trace(traceMsg) - return err - }, + blockNum: block.Number().String(), + fileWriter: sdi.fileWriter, } - tDiff := time.Since(t) - metrics.IndexerMetrics.FreePostgresTimer.Update(tDiff) - traceMsg += fmt.Sprintf("time spent waiting for free postgres tx: %s:\r\n", tDiff.String()) t = time.Now() // write header, collect headerID - headerID := sdi.processHeader(block.Header(), headerNode, reward, totalDifficulty) - tDiff = time.Since(t) + headerID, err := sdi.PushHeader(blockTx, block.Header(), reward, totalDifficulty) + if err != nil { + return nil, err + } + tDiff := time.Since(t) metrics.IndexerMetrics.HeaderProcessingTimer.Update(tDiff) traceMsg += fmt.Sprintf("header processing time: %s\r\n", tDiff.String()) t = time.Now() @@ -217,9 +206,14 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip return blockTx, err } -// processHeader write a header IPLD insert SQL stmt to a file +// PushHeader write a header IPLD insert SQL stmt to a file // it returns the headerID -func (sdi *StateDiffIndexer) processHeader(header *types.Header, headerNode ipld.IPLD, reward, td *big.Int) string { +func (sdi *StateDiffIndexer) PushHeader(_ interfaces.Batch, header *types.Header, reward, td *big.Int) (string, error) { + // Process the header + headerNode, err := ipld.NewEthHeader(header) + if err != nil { + return "", err + } sdi.fileWriter.upsertIPLDNode(header.Number.String(), headerNode) headerID := header.Hash().String() @@ -240,7 +234,7 @@ func (sdi *StateDiffIndexer) processHeader(header *types.Header, headerNode ipld Coinbase: header.Coinbase.String(), Canonical: true, }) - return headerID + return headerID, nil } // processUncles publishes and indexes uncle IPLDs in Postgres @@ -374,20 +368,16 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(args processArgs) error { } // PushStateNode writes a state diff node object (including any child storage nodes) IPLD insert SQL stmt to a file -func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdtypes.StateLeafNode, headerID string) error { - tx, ok := batch.(*BatchTx) - if !ok { - return fmt.Errorf("file: batch is expected to be of type %T, got %T", &BatchTx{}, batch) - } +func (sdi *StateDiffIndexer) PushStateNode(tx interfaces.Batch, stateNode sdtypes.StateLeafNode, headerID string) error { // publish the state node var stateModel models.StateNodeModel if stateNode.Removed { if atomic.LoadUint32(&sdi.removedCacheFlag) == 0 { atomic.StoreUint32(&sdi.removedCacheFlag, 1) - sdi.fileWriter.upsertIPLDDirect(tx.BlockNumber, shared.RemovedNodeStateCID, []byte{}) + sdi.fileWriter.upsertIPLDDirect(tx.BlockNumber(), shared.RemovedNodeStateCID, []byte{}) } stateModel = models.StateNodeModel{ - BlockNumber: tx.BlockNumber, + BlockNumber: tx.BlockNumber(), HeaderID: headerID, StateKey: common.BytesToHash(stateNode.AccountWrapper.LeafKey).String(), CID: shared.RemovedNodeStateCID, @@ -395,7 +385,7 @@ func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdt } } else { stateModel = models.StateNodeModel{ - BlockNumber: tx.BlockNumber, + BlockNumber: tx.BlockNumber(), HeaderID: headerID, StateKey: common.BytesToHash(stateNode.AccountWrapper.LeafKey).String(), CID: stateNode.AccountWrapper.CID, @@ -415,10 +405,10 @@ func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdt if storageNode.Removed { if atomic.LoadUint32(&sdi.removedCacheFlag) == 0 { atomic.StoreUint32(&sdi.removedCacheFlag, 1) - sdi.fileWriter.upsertIPLDDirect(tx.BlockNumber, shared.RemovedNodeStorageCID, []byte{}) + sdi.fileWriter.upsertIPLDDirect(tx.BlockNumber(), shared.RemovedNodeStorageCID, []byte{}) } storageModel := models.StorageNodeModel{ - BlockNumber: tx.BlockNumber, + BlockNumber: tx.BlockNumber(), HeaderID: headerID, StateKey: common.BytesToHash(stateNode.AccountWrapper.LeafKey).String(), StorageKey: common.BytesToHash(storageNode.LeafKey).String(), @@ -430,7 +420,7 @@ func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdt continue } storageModel := models.StorageNodeModel{ - BlockNumber: tx.BlockNumber, + BlockNumber: tx.BlockNumber(), HeaderID: headerID, StateKey: common.BytesToHash(stateNode.AccountWrapper.LeafKey).String(), StorageKey: common.BytesToHash(storageNode.LeafKey).String(), @@ -445,12 +435,8 @@ func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdt } // PushIPLD writes iplds to ipld.blocks -func (sdi *StateDiffIndexer) PushIPLD(batch interfaces.Batch, ipld sdtypes.IPLD) error { - tx, ok := batch.(*BatchTx) - if !ok { - return fmt.Errorf("file: batch is expected to be of type %T, got %T", &BatchTx{}, batch) - } - sdi.fileWriter.upsertIPLDDirect(tx.BlockNumber, ipld.CID, ipld.Content) +func (sdi *StateDiffIndexer) PushIPLD(tx interfaces.Batch, ipld sdtypes.IPLD) error { + sdi.fileWriter.upsertIPLDDirect(tx.BlockNumber(), ipld.CID, ipld.Content) return nil } @@ -472,6 +458,13 @@ func (sdi *StateDiffIndexer) HasBlock(hash common.Hash, number uint64) (bool, er return false, nil } +func (sdi *StateDiffIndexer) BeginTx(number *big.Int, _ context.Context) interfaces.Batch { + return &BatchTx{ + blockNum: number.String(), + fileWriter: sdi.fileWriter, + } +} + // Close satisfies io.Closer func (sdi *StateDiffIndexer) Close() error { return sdi.fileWriter.Close() diff --git a/indexer/database/file/mainnet_tests/indexer_test.go b/indexer/database/file/mainnet_tests/indexer_test.go index 3fe53a0..cdb8042 100644 --- a/indexer/database/file/mainnet_tests/indexer_test.go +++ b/indexer/database/file/mainnet_tests/indexer_test.go @@ -83,7 +83,7 @@ func setupMainnetIndexer(t *testing.T) { require.NoError(t, err) } - ind, err = file.NewStateDiffIndexer(chainConf, file.CSVTestConfig) + ind, err = file.NewStateDiffIndexer(chainConf, file.CSVTestConfig, test.LegacyNodeInfo) require.NoError(t, err) db, err = postgres.SetupSQLXDB() diff --git a/indexer/database/file/sql_indexer_legacy_test.go b/indexer/database/file/sql_indexer_legacy_test.go index b46348a..6d721d5 100644 --- a/indexer/database/file/sql_indexer_legacy_test.go +++ b/indexer/database/file/sql_indexer_legacy_test.go @@ -44,7 +44,7 @@ func setupLegacySQLIndexer(t *testing.T) { require.NoError(t, err) } - ind, err = file.NewStateDiffIndexer(test.LegacyConfig, file.SQLTestConfig) + ind, err = file.NewStateDiffIndexer(test.LegacyConfig, file.SQLTestConfig, test.LegacyNodeInfo) require.NoError(t, err) db, err = postgres.SetupSQLXDB() diff --git a/indexer/database/file/sql_indexer_test.go b/indexer/database/file/sql_indexer_test.go index 8663f27..3aab7b8 100644 --- a/indexer/database/file/sql_indexer_test.go +++ b/indexer/database/file/sql_indexer_test.go @@ -41,7 +41,7 @@ func setupIndexer(t *testing.T) { require.NoError(t, err) } - ind, err = file.NewStateDiffIndexer(mocks.TestChainConfig, file.SQLTestConfig) + ind, err = file.NewStateDiffIndexer(mocks.TestChainConfig, file.SQLTestConfig, test.LegacyNodeInfo) require.NoError(t, err) db, err = postgres.SetupSQLXDB() diff --git a/indexer/database/sql/batch_tx.go b/indexer/database/sql/batch_tx.go index 5c35e88..3225aa7 100644 --- a/indexer/database/sql/batch_tx.go +++ b/indexer/database/sql/batch_tx.go @@ -18,11 +18,14 @@ package sql import ( "context" + "math/big" "sync" "sync/atomic" + "time" "github.com/lib/pq" + "github.com/cerc-io/plugeth-statediff/indexer/database/metrics" "github.com/cerc-io/plugeth-statediff/indexer/ipld" "github.com/cerc-io/plugeth-statediff/indexer/models" "github.com/cerc-io/plugeth-statediff/utils/log" @@ -32,7 +35,7 @@ const startingCacheCapacity = 1024 * 24 // BatchTx wraps a sql tx with the state necessary for building the tx concurrently during trie difference iteration type BatchTx struct { - BlockNumber string + blockNum string ctx context.Context dbtx Tx stm string @@ -42,13 +45,68 @@ type BatchTx struct { removedCacheFlag *uint32 // Tracks expected cache size and ensures cache is caught up before flush cacheWg sync.WaitGroup - - submit func(blockTx *BatchTx, err error) error } -// Submit satisfies indexer.AtomicTx -func (tx *BatchTx) Submit(err error) error { - return tx.submit(tx, err) +func NewBatch(insertStm string, ctx context.Context, number *big.Int, tx Tx) *BatchTx { + blockTx := &BatchTx{ + removedCacheFlag: new(uint32), + ctx: ctx, + blockNum: number.String(), + stm: insertStm, + iplds: make(chan models.IPLDModel), + quit: make(chan (chan<- struct{})), + ipldCache: models.IPLDBatch{ + BlockNumbers: make([]string, 0, startingCacheCapacity), + Keys: make([]string, 0, startingCacheCapacity), + Values: make([][]byte, 0, startingCacheCapacity), + }, + dbtx: tx, + } + go blockTx.cache() + return blockTx +} + +// Submit satisfies indexer.Batch +func (tx *BatchTx) Submit() error { + defer tx.close() + + t := time.Now() + if err := tx.flush(); err != nil { + rollback(tx.ctx, tx.dbtx) + return err + } + err := tx.dbtx.Commit(tx.ctx) + metrics.IndexerMetrics.PostgresCommitTimer.Update(time.Since(t)) + return err +} + +func (tx *BatchTx) BlockNumber() string { + return tx.blockNum +} + +func (tx *BatchTx) RollbackOnFailure(err error) { + if p := recover(); p != nil { + defer tx.close() + log.Info("panic detected before tx submission, rolling back the tx", "panic", p) + rollback(tx.ctx, tx.dbtx) + panic(p) + } else if err != nil { + defer tx.close() + log.Info("error detected before tx submission, rolling back the tx", "error", err) + rollback(tx.ctx, tx.dbtx) + } +} + +func (tx *BatchTx) close() { + if tx.quit == nil { + return + } + confirm := make(chan struct{}) + tx.quit <- confirm + close(tx.quit) + <-confirm + close(tx.iplds) + tx.quit = nil } func (tx *BatchTx) flush() error { @@ -92,7 +150,7 @@ func (tx *BatchTx) cache() { func (tx *BatchTx) cacheDirect(key string, value []byte) { tx.cacheWg.Add(1) tx.iplds <- models.IPLDModel{ - BlockNumber: tx.BlockNumber, + BlockNumber: tx.BlockNumber(), Key: key, Data: value, } @@ -101,7 +159,7 @@ func (tx *BatchTx) cacheDirect(key string, value []byte) { func (tx *BatchTx) cacheIPLD(i ipld.IPLD) { tx.cacheWg.Add(1) tx.iplds <- models.IPLDModel{ - BlockNumber: tx.BlockNumber, + BlockNumber: tx.BlockNumber(), Key: i.Cid().String(), Data: i.RawData(), } @@ -112,7 +170,7 @@ func (tx *BatchTx) cacheRemoved(key string, value []byte) { atomic.StoreUint32(tx.removedCacheFlag, 1) tx.cacheWg.Add(1) tx.iplds <- models.IPLDModel{ - BlockNumber: tx.BlockNumber, + BlockNumber: tx.BlockNumber(), Key: key, Data: value, } diff --git a/indexer/database/sql/indexer.go b/indexer/database/sql/indexer.go index d41f3d6..6dab963 100644 --- a/indexer/database/sql/indexer.go +++ b/indexer/database/sql/indexer.go @@ -39,7 +39,6 @@ import ( "github.com/cerc-io/plugeth-statediff/indexer/models" "github.com/cerc-io/plugeth-statediff/indexer/shared" sdtypes "github.com/cerc-io/plugeth-statediff/types" - "github.com/cerc-io/plugeth-statediff/utils/log" ) var _ interfaces.StateDiffIndexer = &StateDiffIndexer{} @@ -82,24 +81,26 @@ func (sdi *StateDiffIndexer) ReportDBMetrics(delay time.Duration, quit <-chan bo // PushBlock pushes and indexes block data in sql, except state & storage nodes (includes header, uncles, transactions & receipts) // Returns an initiated DB transaction which must be Closed via defer to commit or rollback func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receipts, totalDifficulty *big.Int) (interfaces.Batch, error) { - start, t := time.Now(), time.Now() + t := time.Now() blockHash := block.Hash() height := block.NumberU64() - traceMsg := fmt.Sprintf("indexer stats for statediff at %d with hash %s:\r\n", height, blockHash) transactions := block.Transactions() + var err error + // Derive any missing fields if err := receipts.DeriveFields(sdi.chainConfig, blockHash, height, block.BaseFee(), transactions); err != nil { return nil, err } // Generate the block iplds - headerNode, txNodes, rctNodes, logNodes, err := ipld.FromBlockAndReceipts(block, receipts) + txNodes, rctNodes, logNodes, err := ipld.FromBlockAndReceipts(block, receipts) if err != nil { - return nil, fmt.Errorf("error creating IPLD nodes from block and receipts: %v", err) + return nil, fmt.Errorf("error creating IPLD nodes from block and receipts: %w", err) } if len(txNodes) != len(rctNodes) { - return nil, fmt.Errorf("expected number of transactions (%d), receipts (%d)", len(txNodes), len(rctNodes)) + return nil, fmt.Errorf("expected number of transactions (%d) does not match number of receipts (%d)", + len(txNodes), len(rctNodes)) } // Calculate reward @@ -108,99 +109,35 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip if sdi.chainConfig.Clique != nil { reward = big.NewInt(0) } else { - reward = shared.CalcEthBlockReward(block.Header(), block.Uncles(), block.Transactions(), receipts) + reward = shared.CalcEthBlockReward(block.Header(), block.Uncles(), transactions, receipts) } t = time.Now() // Begin new DB tx for everything - tx := NewDelayedTx(sdi.dbWriter.db) - defer func() { - if p := recover(); p != nil { - rollback(sdi.ctx, tx) - panic(p) - } else if err != nil { - rollback(sdi.ctx, tx) - } - }() - blockTx := &BatchTx{ - removedCacheFlag: new(uint32), - ctx: sdi.ctx, - BlockNumber: block.Number().String(), - stm: sdi.dbWriter.db.InsertIPLDsStm(), - iplds: make(chan models.IPLDModel), - quit: make(chan (chan<- struct{})), - ipldCache: models.IPLDBatch{ - BlockNumbers: make([]string, 0, startingCacheCapacity), - Keys: make([]string, 0, startingCacheCapacity), - Values: make([][]byte, 0, startingCacheCapacity), - }, - dbtx: tx, - // handle transaction commit or rollback for any return case - submit: func(self *BatchTx, err error) error { - defer func() { - confirm := make(chan struct{}) - self.quit <- confirm - close(self.quit) - <-confirm - close(self.iplds) - }() - if p := recover(); p != nil { - log.Info("panic detected before tx submission, rolling back the tx", "panic", p) - rollback(sdi.ctx, tx) - panic(p) - } else if err != nil { - log.Info("error detected before tx submission, rolling back the tx", "error", err) - rollback(sdi.ctx, tx) - } else { - tDiff := time.Since(t) - metrics2.IndexerMetrics.StateStoreCodeProcessingTimer.Update(tDiff) - traceMsg += fmt.Sprintf("state, storage, and code storage processing time: %s\r\n", tDiff) - t = time.Now() - if err := self.flush(); err != nil { - rollback(sdi.ctx, tx) - traceMsg += fmt.Sprintf(" TOTAL PROCESSING DURATION: %s\r\n", time.Since(start)) - log.Debug(traceMsg) - return err - } - err = tx.Commit(sdi.ctx) - tDiff = time.Since(t) - metrics2.IndexerMetrics.PostgresCommitTimer.Update(tDiff) - traceMsg += fmt.Sprintf("postgres transaction commit duration: %s\r\n", tDiff) - } - traceMsg += fmt.Sprintf(" TOTAL PROCESSING DURATION: %s\r\n", time.Since(start)) - log.Debug(traceMsg) - return err - }, - } - go blockTx.cache() - - tDiff := time.Since(t) - metrics2.IndexerMetrics.FreePostgresTimer.Update(tDiff) - - traceMsg += fmt.Sprintf("time spent waiting for free postgres tx: %s:\r\n", tDiff) - t = time.Now() + batch := NewBatch( + sdi.dbWriter.db.InsertIPLDsStm(), sdi.ctx, + block.Number(), + NewDelayedTx(sdi.dbWriter.db), + ) + // handle transaction rollback for failures in this scope + defer batch.RollbackOnFailure(err) // Publish and index header, collect headerID - var headerID string - headerID, err = sdi.processHeader(blockTx, block.Header(), headerNode, reward, totalDifficulty) + headerID, err := sdi.PushHeader(batch, block.Header(), reward, totalDifficulty) if err != nil { return nil, err } - tDiff = time.Since(t) - metrics2.IndexerMetrics.HeaderProcessingTimer.Update(tDiff) - traceMsg += fmt.Sprintf("header processing time: %s\r\n", tDiff) + metrics2.IndexerMetrics.HeaderProcessingTimer.Update(time.Since(t)) t = time.Now() // Publish and index uncles - err = sdi.processUncles(blockTx, headerID, block.Number(), block.UncleHash(), block.Uncles()) + err = sdi.processUncles(batch, headerID, block.Number(), block.UncleHash(), block.Uncles()) if err != nil { return nil, err } - tDiff = time.Since(t) - metrics2.IndexerMetrics.UncleProcessingTimer.Update(tDiff) - traceMsg += fmt.Sprintf("uncle processing time: %s\r\n", tDiff) + metrics2.IndexerMetrics.UncleProcessingTimer.Update(time.Since(t)) t = time.Now() // Publish and index receipts and txs - err = sdi.processReceiptsAndTxs(blockTx, processArgs{ + err = sdi.processReceiptsAndTxs(batch, processArgs{ headerID: headerID, blockNumber: block.Number(), receipts: receipts, @@ -212,12 +149,9 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip if err != nil { return nil, err } - tDiff = time.Since(t) - metrics2.IndexerMetrics.TxAndRecProcessingTimer.Update(tDiff) - traceMsg += fmt.Sprintf("tx and receipt processing time: %s\r\n", tDiff) - t = time.Now() + metrics2.IndexerMetrics.TxAndRecProcessingTimer.Update(time.Since(t)) - return blockTx, err + return batch, err } // CurrentBlock returns the HeaderModel of the highest existing block in the database. @@ -230,9 +164,18 @@ func (sdi *StateDiffIndexer) DetectGaps(beginBlockNumber uint64, endBlockNumber return sdi.dbWriter.detectGaps(beginBlockNumber, endBlockNumber) } -// processHeader publishes and indexes a header IPLD in Postgres +// PushHeader publishes and indexes a header IPLD in Postgres // it returns the headerID -func (sdi *StateDiffIndexer) processHeader(tx *BatchTx, header *types.Header, headerNode ipld.IPLD, reward, td *big.Int) (string, error) { +func (sdi *StateDiffIndexer) PushHeader(batch interfaces.Batch, header *types.Header, reward, td *big.Int) (string, error) { + tx, ok := batch.(*BatchTx) + if !ok { + return "", fmt.Errorf("sql: batch is expected to be of type %T, got %T", &BatchTx{}, batch) + } + // Process the header + headerNode, err := ipld.NewEthHeader(header) + if err != nil { + return "", err + } tx.cacheIPLD(headerNode) headerID := header.Hash().String() @@ -406,7 +349,7 @@ func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdt if stateNode.Removed { tx.cacheRemoved(shared.RemovedNodeStateCID, []byte{}) stateModel = models.StateNodeModel{ - BlockNumber: tx.BlockNumber, + BlockNumber: tx.BlockNumber(), HeaderID: headerID, StateKey: common.BytesToHash(stateNode.AccountWrapper.LeafKey).String(), CID: shared.RemovedNodeStateCID, @@ -414,7 +357,7 @@ func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdt } } else { stateModel = models.StateNodeModel{ - BlockNumber: tx.BlockNumber, + BlockNumber: tx.BlockNumber(), HeaderID: headerID, StateKey: common.BytesToHash(stateNode.AccountWrapper.LeafKey).String(), CID: stateNode.AccountWrapper.CID, @@ -436,7 +379,7 @@ func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdt if storageNode.Removed { tx.cacheRemoved(shared.RemovedNodeStorageCID, []byte{}) storageModel := models.StorageNodeModel{ - BlockNumber: tx.BlockNumber, + BlockNumber: tx.BlockNumber(), HeaderID: headerID, StateKey: common.BytesToHash(stateNode.AccountWrapper.LeafKey).String(), StorageKey: common.BytesToHash(storageNode.LeafKey).String(), @@ -450,7 +393,7 @@ func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdt continue } storageModel := models.StorageNodeModel{ - BlockNumber: tx.BlockNumber, + BlockNumber: tx.BlockNumber(), HeaderID: headerID, StateKey: common.BytesToHash(stateNode.AccountWrapper.LeafKey).String(), StorageKey: common.BytesToHash(storageNode.LeafKey).String(), @@ -481,6 +424,15 @@ func (sdi *StateDiffIndexer) HasBlock(hash common.Hash, number uint64) (bool, er return sdi.dbWriter.hasHeader(hash, number) } +func (sdi *StateDiffIndexer) BeginTx(number *big.Int, ctx context.Context) interfaces.Batch { + return NewBatch( + sdi.dbWriter.db.InsertIPLDsStm(), + ctx, + number, + NewDelayedTx(sdi.dbWriter.db), + ) +} + // Close satisfies io.Closer func (sdi *StateDiffIndexer) Close() error { return sdi.dbWriter.Close() diff --git a/indexer/database/sql/lazy_tx.go b/indexer/database/sql/lazy_tx.go index 7543944..d34d8ae 100644 --- a/indexer/database/sql/lazy_tx.go +++ b/indexer/database/sql/lazy_tx.go @@ -3,7 +3,9 @@ package sql import ( "context" "reflect" + "time" + "github.com/cerc-io/plugeth-statediff/indexer/database/metrics" "github.com/cerc-io/plugeth-statediff/utils/log" ) @@ -69,10 +71,12 @@ func (tx *DelayedTx) Exec(ctx context.Context, sql string, args ...interface{}) } func (tx *DelayedTx) Commit(ctx context.Context) error { + t := time.Now() base, err := tx.db.Begin(ctx) if err != nil { return err } + metrics.IndexerMetrics.FreePostgresTimer.Update(time.Since(t)) defer func() { if p := recover(); p != nil { rollback(ctx, base) diff --git a/indexer/database/sql/postgres/config.go b/indexer/database/sql/postgres/config.go index a2c63b4..cb26e53 100644 --- a/indexer/database/sql/postgres/config.go +++ b/indexer/database/sql/postgres/config.go @@ -44,10 +44,6 @@ type Config struct { ConnTimeout time.Duration LogStatements bool - // node info params - ID string - ClientName string - // driver type Driver DriverType diff --git a/indexer/interfaces/interfaces.go b/indexer/interfaces/interfaces.go index ad3f42c..18dc735 100644 --- a/indexer/interfaces/interfaces.go +++ b/indexer/interfaces/interfaces.go @@ -17,6 +17,7 @@ package interfaces import ( + "context" "math/big" "time" @@ -34,10 +35,13 @@ type StateDiffIndexer interface { CurrentBlock() (*models.HeaderModel, error) HasBlock(hash common.Hash, number uint64) (bool, error) PushBlock(block *types.Block, receipts types.Receipts, totalDifficulty *big.Int) (Batch, error) + PushHeader(batch Batch, header *types.Header, reward, td *big.Int) (string, error) PushStateNode(tx Batch, stateNode sdtypes.StateLeafNode, headerID string) error PushIPLD(tx Batch, ipld sdtypes.IPLD) error ReportDBMetrics(delay time.Duration, quit <-chan bool) + BeginTx(number *big.Int, ctx context.Context) Batch + // Methods used by WatchAddress API/functionality LoadWatchedAddresses() ([]common.Address, error) InsertWatchedAddresses(addresses []sdtypes.WatchAddressArg, currentBlock *big.Int) error @@ -50,7 +54,12 @@ type StateDiffIndexer interface { // Batch required for indexing data atomically type Batch interface { - Submit(err error) error + // Submit commits the batch transaction + Submit() error + // BlockNumber is the block number of the header this batch contains + BlockNumber() string + // RollbackOnFailure rolls back the batch transaction if the error is not nil + RollbackOnFailure(error) } // Config used to configure different underlying implementations diff --git a/indexer/ipld/eth_parser.go b/indexer/ipld/eth_parser.go index 9ce7155..5ec8bf9 100644 --- a/indexer/ipld/eth_parser.go +++ b/indexer/ipld/eth_parser.go @@ -22,23 +22,17 @@ import ( // FromBlockAndReceipts takes a block and processes it // to return it a set of IPLD nodes for further processing. -func FromBlockAndReceipts(block *types.Block, receipts []*types.Receipt) (*EthHeader, []*EthTx, []*EthReceipt, [][]*EthLog, error) { - // Process the header - headerNode, err := NewEthHeader(block.Header()) - if err != nil { - return nil, nil, nil, nil, err - } - +func FromBlockAndReceipts(block *types.Block, receipts []*types.Receipt) ([]*EthTx, []*EthReceipt, [][]*EthLog, error) { // Process the txs txNodes, err := processTransactions(block.Transactions()) if err != nil { - return nil, nil, nil, nil, err + return nil, nil, nil, err } // Process the receipts and logs rctNodes, logNodes, err := processReceiptsAndLogs(receipts) - return headerNode, txNodes, rctNodes, logNodes, err + return txNodes, rctNodes, logNodes, err } // processTransactions will take the found transactions in a parsed block body diff --git a/indexer/ipld/eth_parser_test.go b/indexer/ipld/eth_parser_test.go index fd44058..8deb260 100644 --- a/indexer/ipld/eth_parser_test.go +++ b/indexer/ipld/eth_parser_test.go @@ -92,7 +92,7 @@ func loadBlockData(t *testing.T) []testCase { func TestFromBlockAndReceipts(t *testing.T) { testCases := loadBlockData(t) for _, tc := range testCases { - _, _, _, _, err := FromBlockAndReceipts(tc.block, tc.receipts) + _, _, _, err := FromBlockAndReceipts(tc.block, tc.receipts) if err != nil { t.Fatalf("error generating IPLDs from block and receipts, err %v, kind %s, block hash %s", err, tc.kind, tc.block.Hash()) } diff --git a/indexer/reexport.go b/indexer/reexport.go new file mode 100644 index 0000000..49d46e4 --- /dev/null +++ b/indexer/reexport.go @@ -0,0 +1,7 @@ +package indexer + +import "github.com/cerc-io/plugeth-statediff/indexer/interfaces" + +type Indexer = interfaces.StateDiffIndexer +type Batch = interfaces.Batch +type Config = interfaces.Config diff --git a/indexer/shared/schema/table_test.go b/indexer/shared/schema/table_test.go index 692a839..aaa026b 100644 --- a/indexer/shared/schema/table_test.go +++ b/indexer/shared/schema/table_test.go @@ -43,7 +43,8 @@ var testHeaderTable = Table{ "mh_key", "times_validated", "coinbase", - )} + ), +} func TestTable(t *testing.T) { headerUpsert := `INSERT INTO eth.header_cids (block_number, block_hash, parent_hash, cid, td, node_id, reward, state_root, tx_root, receipt_root, uncle_root, bloom, timestamp, mh_key, times_validated, coinbase) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16) ON CONFLICT (block_hash, block_number) DO UPDATE SET (parent_hash, cid, td, node_id, reward, state_root, tx_root, receipt_root, uncle_root, bloom, timestamp, mh_key, times_validated, coinbase) = ($3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16)` diff --git a/indexer/test/test.go b/indexer/test/test.go index ad43529..5f9cf7c 100644 --- a/indexer/test/test.go +++ b/indexer/test/test.go @@ -28,7 +28,6 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "github.com/cerc-io/plugeth-statediff/indexer/database/file" "github.com/cerc-io/plugeth-statediff/indexer/database/sql" "github.com/cerc-io/plugeth-statediff/indexer/interfaces" "github.com/cerc-io/plugeth-statediff/indexer/mocks" @@ -48,7 +47,7 @@ func SetupTestData(t *testing.T, ind interfaces.StateDiffIndexer) { t.Fatal(err) } defer func() { - if err := tx.Submit(err); err != nil { + if err := tx.Submit(); err != nil { t.Fatal(err) } }() @@ -61,11 +60,7 @@ func SetupTestData(t *testing.T, ind interfaces.StateDiffIndexer) { require.NoError(t, err) } - if batchTx, ok := tx.(*sql.BatchTx); ok { - require.Equal(t, mocks.BlockNumber.String(), batchTx.BlockNumber) - } else if batchTx, ok := tx.(*file.BatchTx); ok { - require.Equal(t, mocks.BlockNumber.String(), batchTx.BlockNumber) - } + require.Equal(t, mocks.BlockNumber.String(), tx.BlockNumber()) } func DoTestPublishAndIndexHeaderIPLDs(t *testing.T, db sql.Database) { @@ -547,13 +542,9 @@ func SetupTestDataNonCanonical(t *testing.T, ind interfaces.StateDiffIndexer) { require.NoError(t, err) } - if batchTx, ok := tx1.(*sql.BatchTx); ok { - require.Equal(t, mocks.BlockNumber.String(), batchTx.BlockNumber) - } else if batchTx, ok := tx1.(*file.BatchTx); ok { - require.Equal(t, mocks.BlockNumber.String(), batchTx.BlockNumber) - } + require.Equal(t, mocks.BlockNumber.String(), tx1.BlockNumber()) - if err := tx1.Submit(err); err != nil { + if err := tx1.Submit(); err != nil { t.Fatal(err) } @@ -572,13 +563,9 @@ func SetupTestDataNonCanonical(t *testing.T, ind interfaces.StateDiffIndexer) { require.NoError(t, err) } - if tx, ok := tx2.(*sql.BatchTx); ok { - require.Equal(t, mocks.BlockNumber.String(), tx.BlockNumber) - } else if tx, ok := tx2.(*sql.BatchTx); ok { - require.Equal(t, mocks.BlockNumber.String(), tx.BlockNumber) - } + require.Equal(t, mocks.BlockNumber.String(), tx2.BlockNumber()) - if err := tx2.Submit(err); err != nil { + if err := tx2.Submit(); err != nil { t.Fatal(err) } @@ -597,13 +584,9 @@ func SetupTestDataNonCanonical(t *testing.T, ind interfaces.StateDiffIndexer) { require.NoError(t, err) } - if batchTx, ok := tx3.(*sql.BatchTx); ok { - require.Equal(t, mocks.Block2Number.String(), batchTx.BlockNumber) - } else if batchTx, ok := tx3.(*file.BatchTx); ok { - require.Equal(t, mocks.Block2Number.String(), batchTx.BlockNumber) - } + require.Equal(t, mocks.Block2Number.String(), tx3.BlockNumber()) - if err := tx3.Submit(err); err != nil { + if err := tx3.Submit(); err != nil { t.Fatal(err) } } diff --git a/indexer/test/test_legacy.go b/indexer/test/test_legacy.go index 6b93f79..efe5664 100644 --- a/indexer/test/test_legacy.go +++ b/indexer/test/test_legacy.go @@ -20,11 +20,11 @@ import ( "context" "testing" - "github.com/cerc-io/plugeth-statediff/indexer/database/file" "github.com/cerc-io/plugeth-statediff/indexer/database/sql" "github.com/cerc-io/plugeth-statediff/indexer/interfaces" "github.com/cerc-io/plugeth-statediff/indexer/ipld" "github.com/cerc-io/plugeth-statediff/indexer/mocks" + "github.com/cerc-io/plugeth-statediff/indexer/node" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/params" "github.com/ipfs/go-cid" @@ -37,6 +37,14 @@ var ( legacyData = mocks.NewLegacyData(LegacyConfig) mockLegacyBlock *types.Block legacyHeaderCID cid.Cid + // Mainnet node info + LegacyNodeInfo = node.Info{ + GenesisBlock: "0xd4e56740f876aef8c010b86a40d5f56745a118d0906a34e69aec8c0db1cb8fa3", + NetworkID: "1", + ChainID: 1, + ID: "mockNodeID", + ClientName: "go-ethereum", + } ) func SetupLegacyTestData(t *testing.T, ind interfaces.StateDiffIndexer) { @@ -51,7 +59,7 @@ func SetupLegacyTestData(t *testing.T, ind interfaces.StateDiffIndexer) { require.NoError(t, err) defer func() { - if err := tx.Submit(err); err != nil { + if err := tx.Submit(); err != nil { t.Fatal(err) } }() @@ -60,11 +68,7 @@ func SetupLegacyTestData(t *testing.T, ind interfaces.StateDiffIndexer) { require.NoError(t, err) } - if batchTx, ok := tx.(*sql.BatchTx); ok { - require.Equal(t, legacyData.BlockNumber.String(), batchTx.BlockNumber) - } else if batchTx, ok := tx.(*file.BatchTx); ok { - require.Equal(t, legacyData.BlockNumber.String(), batchTx.BlockNumber) - } + require.Equal(t, legacyData.BlockNumber.String(), tx.BlockNumber()) } func TestLegacyIndexer(t *testing.T, db sql.Database) { diff --git a/indexer/test/test_mainnet.go b/indexer/test/test_mainnet.go index 01289ab..6e06458 100644 --- a/indexer/test/test_mainnet.go +++ b/indexer/test/test_mainnet.go @@ -19,8 +19,6 @@ package test import ( "testing" - "github.com/cerc-io/plugeth-statediff/indexer/database/file" - "github.com/cerc-io/plugeth-statediff/indexer/database/sql" "github.com/cerc-io/plugeth-statediff/indexer/interfaces" "github.com/cerc-io/plugeth-statediff/indexer/mocks" "github.com/ethereum/go-ethereum/core/types" @@ -36,7 +34,7 @@ func TestBlock(t *testing.T, ind interfaces.StateDiffIndexer, testBlock *types.B require.NoError(t, err) defer func() { - if err := tx.Submit(err); err != nil { + if err := tx.Submit(); err != nil { t.Fatal(err) } }() @@ -45,9 +43,5 @@ func TestBlock(t *testing.T, ind interfaces.StateDiffIndexer, testBlock *types.B require.NoError(t, err) } - if batchTx, ok := tx.(*sql.BatchTx); ok { - require.Equal(t, testBlock.Number().String(), batchTx.BlockNumber) - } else if batchTx, ok := tx.(*file.BatchTx); ok { - require.Equal(t, testBlock.Number().String(), batchTx.BlockNumber) - } + require.Equal(t, testBlock.Number().String(), tx.BlockNumber()) } diff --git a/main/flags.go b/main/flags.go index c1f5c1a..27cd497 100644 --- a/main/flags.go +++ b/main/flags.go @@ -173,8 +173,6 @@ func initConfig() { case shared.FILE: indexerConfig = fileConfig case shared.POSTGRES: - dbConfig.ID = config.ID - dbConfig.ClientName = config.ClientName indexerConfig = dbConfig case shared.DUMP: switch dbDumpDst { diff --git a/main/main.go b/main/main.go index 0ab262f..20f9aca 100644 --- a/main/main.go +++ b/main/main.go @@ -47,8 +47,12 @@ func InitializeNode(stack core.Node, b core.Backend) { ClientName: serviceConfig.ClientName, } var err error - _, indexer, err = ind.NewStateDiffIndexer(serviceConfig.Context, - adapt.ChainConfig(backend.ChainConfig()), info, serviceConfig.IndexerConfig) + _, indexer, err = ind.NewStateDiffIndexer( + serviceConfig.Context, + adapt.ChainConfig(backend.ChainConfig()), + info, + serviceConfig.IndexerConfig, + ) if err != nil { log.Error("failed to construct indexer", "error", err) } diff --git a/mainnet_tests/builder_test.go b/mainnet_tests/builder_test.go index e428643..ded3ba4 100644 --- a/mainnet_tests/builder_test.go +++ b/mainnet_tests/builder_test.go @@ -444,7 +444,7 @@ func TestBuilderOnMainnetBlocks(t *testing.T) { } params := statediff.Params{} - var tests = []test_helpers.TestCase{ + var tests = []test_helpers.DiffTestCase{ // note that block0 (genesis) has over 1000 nodes due to the pre-allocation for the crowd-sale // it is not feasible to write a unit test of that size at this time { @@ -624,7 +624,7 @@ func TestBuilderOnMainnetBlocks(t *testing.T) { }, } - test_helpers.RunBuilderTests(t, chain.StateCache(), tests, params, []uint{1, 8, 32}) + test_helpers.RunBuildStateDiff(t, chain.StateCache(), tests, params) test_helpers.CheckedRoots{ block1: block1RootBranchNode, block2: block2RootBranchNode, diff --git a/service.go b/service.go index 7cc7f80..402b71c 100644 --- a/service.go +++ b/service.go @@ -817,11 +817,15 @@ func (sds *Service) writeStateDiff(block *types.Block, parentRoot common.Hash, p if params.IncludeReceipts { receipts = sds.BlockChain.GetReceiptsByHash(block.Hash()) } + + t := time.Now() tx, err = sds.indexer.PushBlock(block, receipts, totalDifficulty) if err != nil { return err } + defer tx.RollbackOnFailure(err) + // TODO: review/remove the need to sync here var nodeMtx, ipldMtx sync.Mutex nodeSink := func(node types2.StateLeafNode) error { defer metrics.UpdateDuration(time.Now(), metrics.IndexerMetrics.OutputTimer) @@ -842,9 +846,12 @@ func (sds *Service) writeStateDiff(block *types.Block, parentRoot common.Hash, p BlockHash: block.Hash(), BlockNumber: block.Number(), }, params, nodeSink, ipldSink) + if err != nil { + return err + } - // TODO this anti-pattern needs to be sorted out eventually - if err = tx.Submit(err); err != nil { + metrics.IndexerMetrics.StateStoreCodeProcessingTimer.Update(time.Since(t)) + if err = tx.Submit(); err != nil { return fmt.Errorf("batch transaction submission failed: %w", err) } return nil diff --git a/test_helpers/builder.go b/test_helpers/builder.go index 9a82851..44f46bf 100644 --- a/test_helpers/builder.go +++ b/test_helpers/builder.go @@ -4,9 +4,13 @@ import ( "bytes" "fmt" "math/big" + "math/rand" + "path/filepath" "sort" + "sync" "testing" + "github.com/cerc-io/eth-iterator-utils/tracker" statediff "github.com/cerc-io/plugeth-statediff" "github.com/cerc-io/plugeth-statediff/adapt" sdtypes "github.com/cerc-io/plugeth-statediff/types" @@ -17,12 +21,20 @@ import ( "github.com/stretchr/testify/require" ) -type TestCase struct { +var subtrieCounts = []uint{1, 8, 32} + +type DiffTestCase struct { Name string Args statediff.Args Expected *sdtypes.StateObject } +type SnapshotTestCase struct { + Name string + StateRoot common.Hash + Expected *sdtypes.StateObject +} + type CheckedRoots map[*types.Block][]byte // Replicates the statediff object, but indexes nodes by CID @@ -33,12 +45,11 @@ type normalizedStateDiff struct { IPLDs map[string]sdtypes.IPLD } -func RunBuilderTests( +func RunBuildStateDiff( t *testing.T, sdb state.Database, - tests []TestCase, + tests []DiffTestCase, params statediff.Params, - subtrieCounts []uint, ) { builder := statediff.NewBuilder(adapt.GethStateView(sdb)) for _, test := range tests { @@ -58,6 +69,79 @@ func RunBuilderTests( } } +func RunStateSnapshot( + t *testing.T, + sdb state.Database, + test SnapshotTestCase, + params statediff.Params, +) { + builder := statediff.NewBuilder(adapt.GethStateView(sdb)) + + for _, subtries := range subtrieCounts { + // Skip the recovery test for empty diffs + doRecovery := len(test.Expected.Nodes) != 0 + + t.Run(fmt.Sprintf("%s with %d subtries", test.Name, subtries), func(t *testing.T) { + builder.SetSubtrieWorkers(subtries) + var stateNodes []sdtypes.StateLeafNode + var iplds []sdtypes.IPLD + interrupt := randomInterrupt(len(test.Expected.IPLDs)) + stateAppender := failingSyncedAppender(&stateNodes, -1) + ipldAppender := failingSyncedAppender(&iplds, interrupt) + recoveryFile := filepath.Join(t.TempDir(), "recovery.txt") + build := func() error { + tr := tracker.New(recoveryFile, subtries) + defer tr.CloseAndSave() + return builder.WriteStateSnapshot( + test.StateRoot, params, stateAppender, ipldAppender, tr, + ) + } + if doRecovery { + // First attempt fails, second succeeds + if build() == nil { + t.Fatal("expected an error") + } + require.FileExists(t, recoveryFile) + } + ipldAppender = failingSyncedAppender(&iplds, -1) + if err := build(); err != nil { + t.Fatal(err) + } + diff := sdtypes.StateObject{ + Nodes: stateNodes, + IPLDs: iplds, + } + require.Equal(t, + normalize(test.Expected), + normalize(&diff), + ) + }) + } + +} + +// an appender which fails on a configured trigger +func failingSyncedAppender[T any](to *[]T, failAt int) func(T) error { + var mtx sync.Mutex + return func(item T) error { + mtx.Lock() + defer mtx.Unlock() + if len(*to) == failAt { + return fmt.Errorf("failing at %d items", failAt) + } + *to = append(*to, item) + return nil + } +} + +// function to pick random int between N/4 and 3N/4 +func randomInterrupt(N int) int { + if N < 2 { + return 0 + } + return rand.Intn(N/2) + N/4 +} + func (roots CheckedRoots) Check(t *testing.T) { // Let's also confirm that our root state nodes form the state root hash in the headers for block, node := range roots { diff --git a/test_helpers/mocks/indexer.go b/test_helpers/mocks/indexer.go index e4ca5fb..403b553 100644 --- a/test_helpers/mocks/indexer.go +++ b/test_helpers/mocks/indexer.go @@ -17,6 +17,7 @@ package mocks import ( + context "context" "math/big" "time" @@ -52,6 +53,10 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip return &batch{}, nil } +func (sdi *StateDiffIndexer) PushHeader(batch interfaces.Batch, header *types.Header, reward, td *big.Int) (string, error) { + return "", nil +} + func (sdi *StateDiffIndexer) PushStateNode(txi interfaces.Batch, stateNode sdtypes.StateLeafNode, headerID string) error { return nil } @@ -80,10 +85,21 @@ func (sdi *StateDiffIndexer) ClearWatchedAddresses() error { return nil } +func (sdi *StateDiffIndexer) BeginTx(number *big.Int, ctx context.Context) interfaces.Batch { + return &batch{} +} + func (sdi *StateDiffIndexer) Close() error { return nil } -func (tx *batch) Submit(err error) error { +func (tx *batch) RollbackOnFailure(error) {} + +func (tx *batch) Submit() error { return nil } + +// batch.BlockNumber +func (tx *batch) BlockNumber() string { + return "0" +} diff --git a/types/types.go b/types/types.go index 9f3f60b..9d92fd2 100644 --- a/types/types.go +++ b/types/types.go @@ -53,7 +53,7 @@ type StateLeafNode struct { StorageDiff []StorageLeafNode } -// StorageLeafNode holds the data for a single storage diff node leaf node +// StorageLeafNode holds the data for a single storage diff leaf node type StorageLeafNode struct { Removed bool Value []byte diff --git a/utils/iterator.go b/utils/iterator.go index d02cff7..5aac38e 100644 --- a/utils/iterator.go +++ b/utils/iterator.go @@ -7,24 +7,9 @@ import ( "github.com/ethereum/go-ethereum/trie" ) -type symmDiffIterator struct { - a, b iterState // Nodes returned are those in b - a and a - b (keys only) - yieldFromA bool // Whether next node comes from a - count int // Number of nodes scanned on either trie - eqPathIndex int // Count index of last pair of equal paths, to detect an updated key -} - -// NewSymmetricDifferenceIterator constructs a trie.NodeIterator that iterates over the symmetric difference -// of elements in a and b, i.e., the elements in a that are not in b, and vice versa. -// Returns the iterator, and a pointer to an integer recording the number of nodes seen. -func NewSymmetricDifferenceIterator(a, b trie.NodeIterator) (*symmDiffIterator, *int) { - it := &symmDiffIterator{ - a: iterState{a, true}, - b: iterState{b, true}, - // common paths are detected by a distance <=1 from this index, so put it out of reach - eqPathIndex: -2, - } - return it, &it.count +type SymmDiffIterator struct { + a, b iterState // Nodes returned are those in b - a and a - b (keys only) + SymmDiffState } // pairs an iterator with a cache of its valid status @@ -33,66 +18,93 @@ type iterState struct { valid bool } +// SymmDiffState exposes state specific to symmetric difference iteration, which is not accessible +// from the NodeIterator interface. This includes the number of nodes seen, whether the current key +// is common to both A and B, and whether the current node is sourced from A or B. +type SymmDiffState struct { + yieldFromA bool // Whether next node comes from a + count int // Number of nodes scanned on either trie + eqPathIndex int // Count index of last pair of equal paths, to detect an updated key +} + +// NewSymmetricDifferenceIterator constructs a trie.NodeIterator that iterates over the symmetric difference +// of elements in a and b, i.e., the elements in a that are not in b, and vice versa. +// Returns the iterator, and a pointer to an auxiliary object for accessing the state not exposed by the NodeIterator interface recording the number of nodes seen. +func NewSymmetricDifferenceIterator(a, b trie.NodeIterator) *SymmDiffIterator { + it := &SymmDiffIterator{ + a: iterState{a, true}, + b: iterState{b, true}, + // common paths are detected by a distance <=1 between count and this index, so we start at -2 + SymmDiffState: SymmDiffState{eqPathIndex: -2}, + } + return it +} + func (st *iterState) Next(descend bool) bool { st.valid = st.NodeIterator.Next(descend) return st.valid } -func (it *symmDiffIterator) curr() *iterState { +// FromA returns true if the current node is sourced from A. +func (it *SymmDiffState) FromA() bool { + return it.yieldFromA +} + +// CommonPath returns true if a node with the current path exists in each sub-iterator - i.e. it +// represents an updated node. +func (it *SymmDiffState) CommonPath() bool { + return it.count-it.eqPathIndex <= 1 +} + +// Count returns the number of nodes seen. +func (it *SymmDiffState) Count() int { + return it.count +} + +func (it *SymmDiffIterator) curr() *iterState { if it.yieldFromA { return &it.a } return &it.b } -// FromA returns true if the current node is sourced from A. -func (it *symmDiffIterator) FromA() bool { - return it.yieldFromA -} - -// CommonPath returns true if a node with the current path exists in each sub-iterator - i.e. it -// represents an updated node. -func (it *symmDiffIterator) CommonPath() bool { - return it.count-it.eqPathIndex <= 1 -} - -func (it *symmDiffIterator) Hash() common.Hash { +func (it *SymmDiffIterator) Hash() common.Hash { return it.curr().Hash() } -func (it *symmDiffIterator) Parent() common.Hash { +func (it *SymmDiffIterator) Parent() common.Hash { return it.curr().Parent() } -func (it *symmDiffIterator) Leaf() bool { +func (it *SymmDiffIterator) Leaf() bool { return it.curr().Leaf() } -func (it *symmDiffIterator) LeafKey() []byte { +func (it *SymmDiffIterator) LeafKey() []byte { return it.curr().LeafKey() } -func (it *symmDiffIterator) LeafBlob() []byte { +func (it *SymmDiffIterator) LeafBlob() []byte { return it.curr().LeafBlob() } -func (it *symmDiffIterator) LeafProof() [][]byte { +func (it *SymmDiffIterator) LeafProof() [][]byte { return it.curr().LeafProof() } -func (it *symmDiffIterator) Path() []byte { +func (it *SymmDiffIterator) Path() []byte { return it.curr().Path() } -func (it *symmDiffIterator) NodeBlob() []byte { +func (it *SymmDiffIterator) NodeBlob() []byte { return it.curr().NodeBlob() } -func (it *symmDiffIterator) AddResolver(resolver trie.NodeResolver) { +func (it *SymmDiffIterator) AddResolver(resolver trie.NodeResolver) { panic("not implemented") } -func (it *symmDiffIterator) Next(bool) bool { +func (it *SymmDiffIterator) Next(bool) bool { // NodeIterators start in a "pre-valid" state, so the first Next advances to a valid node. if it.count == 0 { if it.a.Next(true) { @@ -110,7 +122,7 @@ func (it *symmDiffIterator) Next(bool) bool { return it.a.valid || it.b.valid } -func (it *symmDiffIterator) seek() { +func (it *SymmDiffIterator) seek() { // Invariants: // - At the end of the function, the sub-iterator with the lexically lesser path // points to the next element @@ -151,7 +163,7 @@ func (it *symmDiffIterator) seek() { } } -func (it *symmDiffIterator) Error() error { +func (it *SymmDiffIterator) Error() error { if err := it.a.Error(); err != nil { return err } @@ -172,3 +184,9 @@ func compareNodes(a, b trie.NodeIterator) int { } return 0 } + +// AlwaysBState returns a dummy SymmDiffState that indicates all elements are from B, and have no +// common paths with A. This is equivalent to a diff against an empty A. +func AlwaysBState() SymmDiffState { + return SymmDiffState{yieldFromA: false, eqPathIndex: -2} +} diff --git a/utils/iterator_test.go b/utils/iterator_test.go index e7210ac..b440fbd 100644 --- a/utils/iterator_test.go +++ b/utils/iterator_test.go @@ -45,37 +45,33 @@ func TestSymmetricDifferenceIterator(t *testing.T) { t.Run("with no difference", func(t *testing.T) { db := trie.NewDatabase(rawdb.NewMemoryDatabase()) triea := trie.NewEmpty(db) - di, count := utils.NewSymmetricDifferenceIterator(triea.NodeIterator(nil), triea.NodeIterator(nil)) + di := utils.NewSymmetricDifferenceIterator(triea.NodeIterator(nil), triea.NodeIterator(nil)) for di.Next(true) { t.Errorf("iterator should not yield any elements") } - assert.Equal(t, 0, *count) + assert.Equal(t, 0, di.Count()) triea.MustUpdate([]byte("foo"), []byte("bar")) - di, count = utils.NewSymmetricDifferenceIterator(triea.NodeIterator(nil), triea.NodeIterator(nil)) + di = utils.NewSymmetricDifferenceIterator(triea.NodeIterator(nil), triea.NodeIterator(nil)) for di.Next(true) { t.Errorf("iterator should not yield any elements") } - assert.Equal(t, 2, *count) + // two nodes visited: the leaf (value) and its parent + assert.Equal(t, 2, di.Count()) - // TODO will fail until fixed https://github.com/ethereum/go-ethereum/pull/27838 trieb := trie.NewEmpty(db) - di, count = utils.NewSymmetricDifferenceIterator( - triea.NodeIterator([]byte("jars")), - trieb.NodeIterator(nil)) + di = utils.NewSymmetricDifferenceIterator(triea.NodeIterator([]byte("jars")), trieb.NodeIterator(nil)) for di.Next(true) { - t.Errorf("iterator should not yield any elements, but got key %s", di.Path()) + t.Errorf("iterator should not yield any elements") } - assert.Equal(t, 0, *count) + assert.Equal(t, 0, di.Count()) - // // TODO will fail until merged: https://github.com/ethereum/go-ethereum/pull/27838 - // di, count = utils.NewSymmetricDifferenceIterator( - // triea.NodeIterator([]byte("food")), - // trieb.NodeIterator(nil)) + // TODO will fail until merged: https://github.com/ethereum/go-ethereum/pull/27838 + // di, aux = utils.NewSymmetricDifferenceIterator(triea.NodeIterator([]byte("food")), trieb.NodeIterator(nil)) // for di.Next(true) { - // t.Errorf("iterator should not yield any elements, but got key %s", di.Path()) + // t.Errorf("iterator should not yield any elements") // } - // assert.Equal(t, 0, *count) + // assert.Equal(t, 0, di.Count()) }) t.Run("small difference", func(t *testing.T) { @@ -86,7 +82,7 @@ func TestSymmetricDifferenceIterator(t *testing.T) { trieb := trie.NewEmpty(dbb) trieb.MustUpdate([]byte("foo"), []byte("bar")) - di, count := utils.NewSymmetricDifferenceIterator(triea.NodeIterator(nil), trieb.NodeIterator(nil)) + di := utils.NewSymmetricDifferenceIterator(triea.NodeIterator(nil), trieb.NodeIterator(nil)) leaves := 0 for di.Next(true) { if di.Leaf() { @@ -97,10 +93,10 @@ func TestSymmetricDifferenceIterator(t *testing.T) { } } assert.Equal(t, 1, leaves) - assert.Equal(t, 2, *count) + assert.Equal(t, 2, di.Count()) trieb.MustUpdate([]byte("quux"), []byte("bars")) - di, count = utils.NewSymmetricDifferenceIterator(triea.NodeIterator(nil), trieb.NodeIterator([]byte("quux"))) + di = utils.NewSymmetricDifferenceIterator(triea.NodeIterator(nil), trieb.NodeIterator([]byte("quux"))) leaves = 0 for di.Next(true) { if di.Leaf() { @@ -111,7 +107,7 @@ func TestSymmetricDifferenceIterator(t *testing.T) { } } assert.Equal(t, 1, leaves) - assert.Equal(t, 1, *count) + assert.Equal(t, 1, di.Count()) }) dba := trie.NewDatabase(rawdb.NewMemoryDatabase()) @@ -128,7 +124,7 @@ func TestSymmetricDifferenceIterator(t *testing.T) { onlyA := make(map[string]string) onlyB := make(map[string]string) var deletions, creations []string - it, _ := utils.NewSymmetricDifferenceIterator(triea.NodeIterator(nil), trieb.NodeIterator(nil)) + it := utils.NewSymmetricDifferenceIterator(triea.NodeIterator(nil), trieb.NodeIterator(nil)) for it.Next(true) { if !it.Leaf() { continue @@ -209,7 +205,7 @@ func TestCompareDifferenceIterators(t *testing.T) { pathsA = append(pathsA, itAonly.Path()) } - itSym, _ := utils.NewSymmetricDifferenceIterator(treeA.NodeIterator(nil), treeB.NodeIterator(nil)) + itSym := utils.NewSymmetricDifferenceIterator(treeA.NodeIterator(nil), treeB.NodeIterator(nil)) var idxA, idxB int for itSym.Next(true) { if itSym.FromA() { diff --git a/utils/log/log.go b/utils/log/log.go index 2cdeb82..307d936 100644 --- a/utils/log/log.go +++ b/utils/log/log.go @@ -17,6 +17,7 @@ func init() { // The plugeth logger is only initialized with the geth runtime, // but tests expect to have a logger available, so default to this. DefaultLogger = TestLogger + TestLogger.SetLevel(int(log15.LvlInfo)) } func Trace(m string, a ...interface{}) { DefaultLogger.Trace(m, a...) }