fix overshadowing of snap.Tx

This commit is contained in:
i-norden 2023-05-16 09:22:15 -05:00
parent 0cab03b98e
commit 891fca89bd
3 changed files with 51 additions and 40 deletions

1
.gitignore vendored
View File

@ -5,3 +5,4 @@ mocks/
.vscode .vscode
output_dir*/ output_dir*/
log_file log_file
recovery_file

View File

@ -202,9 +202,6 @@ func (s *Service) createSnapshot(ctx context.Context, it trie.NodeIterator, head
} }
defer func() { defer func() {
err = CommitOrRollback(tx, err) err = CommitOrRollback(tx, err)
if err != nil {
log.Errorf("CommitOrRollback failed: %s", err)
}
}() }()
// path (from recovery dump) to be seeked on recovery // path (from recovery dump) to be seeked on recovery
@ -224,28 +221,31 @@ func (s *Service) createSnapshot(ctx context.Context, it trie.NodeIterator, head
recoveredPath = append(recoveredPath, *seekedPath...) recoveredPath = append(recoveredPath, *seekedPath...)
endPath = i.endPath endPath = i.endPath
} else { } else {
return errors.New("untracked iterator") err = errors.New("untracked iterator")
return err
} }
return s.createSubTrieSnapshot(ctx, tx, nil, it, recoveredPath, seekedPath, endPath, headerID, height, seekingPaths) tx, err = s.createSubTrieSnapshot(ctx, tx, nil, it, recoveredPath, seekedPath, endPath, headerID, height, seekingPaths)
return err
} }
// createSubTrieSnapshot processes nodes at the next level of a trie using the given subtrie iterator // createSubTrieSnapshot processes nodes at the next level of a trie using the given subtrie iterator
// continually updating seekedPath with path of the latest processed node // continually updating seekedPath with path of the latest processed node
func (s *Service) createSubTrieSnapshot(ctx context.Context, tx Tx, prefixPath []byte, subTrieIt trie.NodeIterator, func (s *Service) createSubTrieSnapshot(ctx context.Context, tx Tx, prefixPath []byte, subTrieIt trie.NodeIterator,
recoveredPath []byte, seekedPath *[]byte, endPath []byte, headerID string, height *big.Int, seekingPaths [][]byte) error { recoveredPath []byte, seekedPath *[]byte, endPath []byte, headerID string, height *big.Int, seekingPaths [][]byte) (Tx, error) {
prom.IncActiveIterCount() prom.IncActiveIterCount()
defer prom.DecActiveIterCount() defer prom.DecActiveIterCount()
// descend in the first loop iteration to reach first child node // descend in the first loop iteration to reach first child node
var err error
descend := true descend := true
for { for {
select { select {
case <-ctx.Done(): case <-ctx.Done():
return errors.New("ctx cancelled") return tx, errors.New("ctx cancelled")
default: default:
if ok := subTrieIt.Next(descend); !ok { if ok := subTrieIt.Next(descend); !ok {
return subTrieIt.Error() return tx, subTrieIt.Error()
} }
// to avoid descending further // to avoid descending further
@ -257,15 +257,16 @@ func (s *Service) createSubTrieSnapshot(ctx context.Context, tx Tx, prefixPath [
// if node path is empty and prefix is nil, it's the root node // if node path is empty and prefix is nil, it's the root node
if prefixPath == nil { if prefixPath == nil {
// create snapshot of node, if it is a leaf this will also create snapshot of entire storage trie // create snapshot of node, if it is a leaf this will also create snapshot of entire storage trie
if err := s.createNodeSnapshot(tx, subTrieIt, headerID, height, seekingPaths, prefixPath); err != nil { tx, err = s.createNodeSnapshot(tx, subTrieIt, headerID, height, seekingPaths, prefixPath)
return err if err != nil {
return tx, err
} }
updateSeekedPath(seekedPath, subTrieIt.Path()) updateSeekedPath(seekedPath, subTrieIt.Path())
} }
if ok := subTrieIt.Next(true); !ok { if ok := subTrieIt.Next(true); !ok {
// return if no further nodes available // return if no further nodes available
return subTrieIt.Error() return tx, subTrieIt.Error()
} }
} }
@ -283,7 +284,7 @@ func (s *Service) createSubTrieSnapshot(ctx context.Context, tx Tx, prefixPath [
if trackedSubtrieIt, ok := subTrieIt.(*trackedIter); ok { if trackedSubtrieIt, ok := subTrieIt.(*trackedIter); ok {
s.tracker.stopIter(trackedSubtrieIt) s.tracker.stopIter(trackedSubtrieIt)
} }
return subTrieIt.Error() return tx, subTrieIt.Error()
} }
// skip the current node if it's before recovered path and not along the recovered path // skip the current node if it's before recovered path and not along the recovered path
@ -307,8 +308,9 @@ func (s *Service) createSubTrieSnapshot(ctx context.Context, tx Tx, prefixPath [
// if the node is along paths of interest // if the node is along paths of interest
// create snapshot of node, if it is a leaf this will also create snapshot of entire storage trie // create snapshot of node, if it is a leaf this will also create snapshot of entire storage trie
if err := s.createNodeSnapshot(tx, subTrieIt, headerID, height, seekingPaths, prefixPath); err != nil { tx, err = s.createNodeSnapshot(tx, subTrieIt, headerID, height, seekingPaths, prefixPath)
return err if err != nil {
return tx, err
} }
// update seeked path after node has been processed // update seeked path after node has been processed
updateSeekedPath(seekedPath, nodePath) updateSeekedPath(seekedPath, nodePath)
@ -316,11 +318,12 @@ func (s *Service) createSubTrieSnapshot(ctx context.Context, tx Tx, prefixPath [
// create an iterator to traverse and process the next level of this subTrie // create an iterator to traverse and process the next level of this subTrie
nextSubTrieIt, err := s.createSubTrieIt(nodePath, subTrieIt.Hash(), recoveredPath) nextSubTrieIt, err := s.createSubTrieIt(nodePath, subTrieIt.Hash(), recoveredPath)
if err != nil { if err != nil {
return err return tx, err
} }
// pass on the seekedPath of the tracked concurrent iterator to be updated // pass on the seekedPath of the tracked concurrent iterator to be updated
if err := s.createSubTrieSnapshot(ctx, tx, nodePath, nextSubTrieIt, recoveredPath, seekedPath, endPath, headerID, height, seekingPaths); err != nil { tx, err = s.createSubTrieSnapshot(ctx, tx, nodePath, nextSubTrieIt, recoveredPath, seekedPath, endPath, headerID, height, seekingPaths)
return err if err != nil {
return tx, err
} }
} }
} }
@ -359,10 +362,10 @@ func (s *Service) createSubTrieIt(prefixPath []byte, hash common.Hash, recovered
// createNodeSnapshot indexes the current node // createNodeSnapshot indexes the current node
// entire storage trie is also indexed (if available) // entire storage trie is also indexed (if available)
func (s *Service) createNodeSnapshot(tx Tx, it trie.NodeIterator, headerID string, height *big.Int, func (s *Service) createNodeSnapshot(tx Tx, it trie.NodeIterator, headerID string, height *big.Int,
watchedAddressesLeafPaths [][]byte, prefixPath []byte) error { watchedAddressesLeafPaths [][]byte, prefixPath []byte) (Tx, error) {
tx, err := s.ipfsPublisher.PrepareTxForBatch(tx, s.maxBatchSize) tx, err := s.ipfsPublisher.PrepareTxForBatch(tx, s.maxBatchSize)
if err != nil { if err != nil {
return err return tx, err
} }
// index values by leaf key // index values by leaf key
@ -371,25 +374,25 @@ func (s *Service) createNodeSnapshot(tx Tx, it trie.NodeIterator, headerID strin
// publish codehash => code mappings // publish codehash => code mappings
// take storage snapshot // take storage snapshot
if err := s.processStateValueNode(it, headerID, height, prefixPath, watchedAddressesLeafPaths, tx); err != nil { if err := s.processStateValueNode(it, headerID, height, prefixPath, watchedAddressesLeafPaths, tx); err != nil {
return err return tx, err
} }
} else { // trie nodes will be written to blockstore only } else { // trie nodes will be written to blockstore only
// reminder that this includes leaf nodes, since the geth iterator.Leaf() actually signifies a "value" node // reminder that this includes leaf nodes, since the geth iterator.Leaf() actually signifies a "value" node
// so this is also where we publish the IPLD block corresponding to the "value" nodes indexed above // so this is also where we publish the IPLD block corresponding to the "value" nodes indexed above
if IsNullHash(it.Hash()) { if IsNullHash(it.Hash()) {
// skip null node // skip null node
return nil return tx, nil
} }
nodeVal := make([]byte, len(it.NodeBlob())) nodeVal := make([]byte, len(it.NodeBlob()))
copy(nodeVal, it.NodeBlob()) copy(nodeVal, it.NodeBlob())
if len(watchedAddressesLeafPaths) > 0 { if len(watchedAddressesLeafPaths) > 0 {
var elements []interface{} var elements []interface{}
if err := rlp.DecodeBytes(nodeVal, &elements); err != nil { if err := rlp.DecodeBytes(nodeVal, &elements); err != nil {
return err return tx, err
} }
ok, err := isLeaf(elements) ok, err := isLeaf(elements)
if err != nil { if err != nil {
return err return tx, err
} }
if ok { if ok {
// create the full node path as it.Path() doesn't include the path before subtrie root // create the full node path as it.Path() doesn't include the path before subtrie root
@ -398,18 +401,18 @@ func (s *Service) createNodeSnapshot(tx Tx, it trie.NodeIterator, headerID strin
valueNodePath := append(nodePath, partialPath...) valueNodePath := append(nodePath, partialPath...)
if !isWatchedAddress(watchedAddressesLeafPaths, valueNodePath) { if !isWatchedAddress(watchedAddressesLeafPaths, valueNodePath) {
// skip this node // skip this node
return nil return tx, nil
} }
} }
} }
nodeHash := make([]byte, len(it.Hash().Bytes())) nodeHash := make([]byte, len(it.Hash().Bytes()))
copy(nodeHash, it.Hash().Bytes()) copy(nodeHash, it.Hash().Bytes())
if err := s.ipfsPublisher.PublishIPLD(ipld.Keccak256ToCid(ipld.MEthStateTrie, nodeHash), nodeVal, height, tx); err != nil { if err := s.ipfsPublisher.PublishIPLD(ipld.Keccak256ToCid(ipld.MEthStateTrie, nodeHash), nodeVal, height, tx); err != nil {
return err return tx, err
} }
} }
return it.Error() return tx, it.Error()
} }
// reminder: it.Leaf() == true when the iterator is positioned at a "value node" which is not something that actually exists in an MMPT // reminder: it.Leaf() == true when the iterator is positioned at a "value node" which is not something that actually exists in an MMPT
@ -423,15 +426,6 @@ func (s *Service) processStateValueNode(it trie.NodeIterator, headerID string, h
return nil return nil
} }
// created vs updated is important for leaf nodes since we need to diff their storage
// so we need to map all changed accounts at B to their leafkey, since account can change pathes but not leafkey
var account types.StateAccount
accountRLP := make([]byte, len(it.LeafBlob()))
copy(accountRLP, it.LeafBlob())
if err := rlp.DecodeBytes(accountRLP, &account); err != nil {
return fmt.Errorf("error decoding account for leaf value at leaf key %x\nerror: %v", it.LeafKey(), err)
}
// since this is a "value node", we need to move up to the "parent" node which is the actual leaf node // since this is a "value node", we need to move up to the "parent" node which is the actual leaf node
// it should be in the fastcache since it necessarily was recently accessed to reach the current "node" // it should be in the fastcache since it necessarily was recently accessed to reach the current "node"
parentNodeRLP, err := s.stateDB.TrieDB().Node(it.Parent()) parentNodeRLP, err := s.stateDB.TrieDB().Node(it.Parent())
@ -450,6 +444,15 @@ func (s *Service) processStateValueNode(it trie.NodeIterator, headerID string, h
encodedPath := trie.HexToCompact(valueNodePath) encodedPath := trie.HexToCompact(valueNodePath)
leafKey := encodedPath[1:] leafKey := encodedPath[1:]
// created vs updated is important for leaf nodes since we need to diff their storage
// so we need to map all changed accounts at B to their leafkey, since account can change pathes but not leafkey
var account types.StateAccount
accountRLP := make([]byte, len(it.LeafBlob()))
copy(accountRLP, it.LeafBlob())
if err := rlp.DecodeBytes(accountRLP, &account); err != nil {
return fmt.Errorf("error decoding account for leaf value at leaf key %x\nerror: %v", leafKey, err)
}
// write codehash => code mappings if we have a contract // write codehash => code mappings if we have a contract
if !bytes.Equal(account.CodeHash, emptyCodeHash) { if !bytes.Equal(account.CodeHash, emptyCodeHash) {
codeHash := common.BytesToHash(account.CodeHash) codeHash := common.BytesToHash(account.CodeHash)

View File

@ -3,6 +3,7 @@ package snapshot
import ( import (
"errors" "errors"
"fmt" "fmt"
"math/big"
"math/rand" "math/rand"
"os" "os"
"path/filepath" "path/filepath"
@ -11,6 +12,8 @@ import (
"testing" "testing"
"time" "time"
"github.com/ipfs/go-cid"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/statediff/indexer/models" "github.com/ethereum/go-ethereum/statediff/indexer/models"
"github.com/golang/mock/gomock" "github.com/golang/mock/gomock"
@ -192,8 +195,6 @@ func TestAccountSelectiveSnapshot(t *testing.T) {
expectedStorageNodes.Store(keys, value) expectedStorageNodes.Store(keys, value)
} }
var count int
pub, tx := makeMocks(t) pub, tx := makeMocks(t)
pub.EXPECT().PublishHeader(gomock.Eq(&fixt.Chain2_Block32_Header)) pub.EXPECT().PublishHeader(gomock.Eq(&fixt.Chain2_Block32_Header))
pub.EXPECT().BeginTx().Return(tx, nil). pub.EXPECT().BeginTx().Return(tx, nil).
@ -206,7 +207,6 @@ func TestAccountSelectiveSnapshot(t *testing.T) {
gomock.Any(), gomock.Any(),
gomock.Eq(tx)). gomock.Eq(tx)).
Do(func(stateNode *models.StateNodeModel, _ snapt.Tx) error { Do(func(stateNode *models.StateNodeModel, _ snapt.Tx) error {
count++
if stateNode.BlockNumber != fixt.Chain2_Block32_Header.Number.String() { if stateNode.BlockNumber != fixt.Chain2_Block32_Header.Number.String() {
t.Fatalf(unexpectedStateNodeErr, stateNode.StateKey) t.Fatalf(unexpectedStateNodeErr, stateNode.StateKey)
} }
@ -261,7 +261,12 @@ func TestAccountSelectiveSnapshot(t *testing.T) {
}). }).
AnyTimes() AnyTimes()
pub.EXPECT().PublishIPLD(gomock.Any(), gomock.Any(), gomock.Eq(fixt.Chain2_Block32_Header.Number), gomock.Eq(tx)). pub.EXPECT().PublishIPLD(gomock.Any(), gomock.Any(), gomock.Eq(fixt.Chain2_Block32_Header.Number), gomock.Eq(tx)).
AnyTimes() Do(func(_ cid.Cid, _ []byte, height *big.Int, _ snapt.Tx) {
if height.String() != fixt.Chain2_Block32_Header.Number.String() {
t.Fatalf("wrong blockheight for ipld publish: %s", height.String())
}
}).
MaxTimes(len(fixt.Chain2_Block32_StateIPLDs) + len(fixt.Chain2_Block32_StorageIPLDs) + 1 + 2)
chainDataPath, ancientDataPath := fixt.GetChainDataPath("chain2data") chainDataPath, ancientDataPath := fixt.GetChainDataPath("chain2data")
config := testConfig(chainDataPath, ancientDataPath) config := testConfig(chainDataPath, ancientDataPath)
@ -318,6 +323,7 @@ func TestRecovery(t *testing.T) {
pub.EXPECT().PublishHeader(gomock.Eq(&fixt.Block1_Header)) pub.EXPECT().PublishHeader(gomock.Eq(&fixt.Block1_Header))
pub.EXPECT().BeginTx().Return(tx, nil).MaxTimes(workers) pub.EXPECT().BeginTx().Return(tx, nil).MaxTimes(workers)
pub.EXPECT().PrepareTxForBatch(gomock.Any(), gomock.Any()).Return(tx, nil).AnyTimes() pub.EXPECT().PrepareTxForBatch(gomock.Any(), gomock.Any()).Return(tx, nil).AnyTimes()
tx.EXPECT().Rollback().MaxTimes(workers)
tx.EXPECT().Commit().MaxTimes(workers) tx.EXPECT().Commit().MaxTimes(workers)
pub.EXPECT().PublishStateLeafNode( pub.EXPECT().PublishStateLeafNode(
gomock.Any(), gomock.Any(),
@ -481,7 +487,8 @@ func TestAccountSelectiveRecovery(t *testing.T) {
pub.EXPECT().PublishHeader(gomock.Eq(&fixt.Chain2_Block32_Header)) pub.EXPECT().PublishHeader(gomock.Eq(&fixt.Chain2_Block32_Header))
pub.EXPECT().BeginTx().Return(tx, nil).Times(workers) pub.EXPECT().BeginTx().Return(tx, nil).Times(workers)
pub.EXPECT().PrepareTxForBatch(gomock.Any(), gomock.Any()).Return(tx, nil).AnyTimes() pub.EXPECT().PrepareTxForBatch(gomock.Any(), gomock.Any()).Return(tx, nil).AnyTimes()
tx.EXPECT().Commit().Times(workers) tx.EXPECT().Commit().MaxTimes(workers)
tx.EXPECT().Rollback().MaxTimes(workers)
pub.EXPECT().PublishStateLeafNode( pub.EXPECT().PublishStateLeafNode(
gomock.Any(), gomock.Any(),
gomock.Eq(tx)). gomock.Eq(tx)).