Merge pull request #75 from cerc-io/ian/v5_dev

fix overshadowing of snap.Tx

commit dd86f02997

.gitignore (vendored): 1 line changed
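
Reviewer note (added context, not part of the commit): createNodeSnapshot and createSubTrieSnapshot obtained a fresh transaction from PrepareTxForBatch but returned only an error, so the rebinding of tx stayed local to the callee and the caller — including the deferred CommitOrRollback in createSnapshot — kept operating on a stale snap.Tx handle. The fix threads the possibly-replaced Tx back through every return path. A minimal standalone sketch of the pitfall and the fix; the toy Tx type and helper names below are illustrative, not the real snapshot package API:

package main

import "fmt"

// Tx stands in for snap.Tx; the real type is an interface over a DB transaction.
type Tx struct{ id int }

// prepareTxForBatch mimics the publisher's PrepareTxForBatch: when a batch
// fills up it commits the old transaction and hands back a brand-new one.
func prepareTxForBatch(tx *Tx) (*Tx, error) {
	return &Tx{id: tx.id + 1}, nil
}

// buggy rebinds only its local copy of tx. Go passes the handle by value,
// so the caller's variable still refers to the old, already-spent transaction.
func buggy(tx *Tx) error {
	tx, err := prepareTxForBatch(tx)
	if err != nil {
		return err
	}
	_ = tx // work continues on the new tx, invisible to the caller
	return nil
}

// fixed returns the (possibly replaced) tx so the caller can rebind it,
// mirroring the (Tx, error) signatures introduced in this PR.
func fixed(tx *Tx) (*Tx, error) {
	return prepareTxForBatch(tx)
}

func main() {
	tx := &Tx{id: 0}
	_ = buggy(tx)
	fmt.Println("after buggy:", tx.id) // still 0: a deferred commit would hit a stale tx

	tx, _ = fixed(tx)
	fmt.Println("after fixed:", tx.id) // 1: the caller now tracks the live tx
}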
--- a/.gitignore
+++ b/.gitignore
@@ -5,3 +5,4 @@ mocks/
 .vscode
 output_dir*/
 log_file
+recovery_file
@@ -202,9 +202,6 @@ func (s *Service) createSnapshot(ctx context.Context, it trie.NodeIterator, head
 	}
 	defer func() {
 		err = CommitOrRollback(tx, err)
-		if err != nil {
-			log.Errorf("CommitOrRollback failed: %s", err)
-		}
 	}()
 
 	// path (from recovery dump) to be seeked on recovery
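
Reviewer note: dropping the log call works because the deferred err = CommitOrRollback(tx, err) evidently writes into a named error return, so a commit/rollback failure now reaches the caller instead of only being logged. A self-contained sketch of that named-return/defer idiom (commitOrRollback and doWork are hypothetical stand-ins):

package main

import (
	"errors"
	"fmt"
)

// commitOrRollback mimics CommitOrRollback: commit on success, roll back on
// failure, and report whichever error results.
func commitOrRollback(prior error) error {
	if prior != nil {
		return prior // rollback path keeps the original error
	}
	return nil // commit succeeded
}

// doWork declares a named return value, so an assignment to err inside the
// deferred closure changes what the caller actually receives.
func doWork(fail bool) (err error) {
	defer func() {
		err = commitOrRollback(err)
	}()
	if fail {
		err = errors.New("work failed")
		return err
	}
	return nil
}

func main() {
	fmt.Println(doWork(false)) // <nil>
	fmt.Println(doWork(true))  // work failed
}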
@@ -224,28 +221,31 @@ func (s *Service) createSnapshot(ctx context.Context, it trie.NodeIterator, head
 		recoveredPath = append(recoveredPath, *seekedPath...)
 		endPath = i.endPath
 	} else {
-		return errors.New("untracked iterator")
+		err = errors.New("untracked iterator")
+		return err
 	}
 
-	return s.createSubTrieSnapshot(ctx, tx, nil, it, recoveredPath, seekedPath, endPath, headerID, height, seekingPaths)
+	tx, err = s.createSubTrieSnapshot(ctx, tx, nil, it, recoveredPath, seekedPath, endPath, headerID, height, seekingPaths)
+	return err
 }
 
 // createSubTrieSnapshot processes nodes at the next level of a trie using the given subtrie iterator
 // continually updating seekedPath with path of the latest processed node
 func (s *Service) createSubTrieSnapshot(ctx context.Context, tx Tx, prefixPath []byte, subTrieIt trie.NodeIterator,
-	recoveredPath []byte, seekedPath *[]byte, endPath []byte, headerID string, height *big.Int, seekingPaths [][]byte) error {
+	recoveredPath []byte, seekedPath *[]byte, endPath []byte, headerID string, height *big.Int, seekingPaths [][]byte) (Tx, error) {
 	prom.IncActiveIterCount()
 	defer prom.DecActiveIterCount()
 
 	// descend in the first loop iteration to reach first child node
+	var err error
 	descend := true
 	for {
 		select {
 		case <-ctx.Done():
-			return errors.New("ctx cancelled")
+			return tx, errors.New("ctx cancelled")
 		default:
 			if ok := subTrieIt.Next(descend); !ok {
-				return subTrieIt.Error()
+				return tx, subTrieIt.Error()
 			}
 
 			// to avoid descending further
@@ -257,15 +257,16 @@ func (s *Service) createSubTrieSnapshot(ctx context.Context, tx Tx, prefixPath [
 			// if node path is empty and prefix is nil, it's the root node
 			if prefixPath == nil {
 				// create snapshot of node, if it is a leaf this will also create snapshot of entire storage trie
-				if err := s.createNodeSnapshot(tx, subTrieIt, headerID, height, seekingPaths, prefixPath); err != nil {
-					return err
+				tx, err = s.createNodeSnapshot(tx, subTrieIt, headerID, height, seekingPaths, prefixPath)
+				if err != nil {
+					return tx, err
 				}
 				updateSeekedPath(seekedPath, subTrieIt.Path())
 			}
 
 			if ok := subTrieIt.Next(true); !ok {
 				// return if no further nodes available
-				return subTrieIt.Error()
+				return tx, subTrieIt.Error()
 			}
 		}
 
@@ -283,7 +284,7 @@ func (s *Service) createSubTrieSnapshot(ctx context.Context, tx Tx, prefixPath [
 		if trackedSubtrieIt, ok := subTrieIt.(*trackedIter); ok {
 			s.tracker.stopIter(trackedSubtrieIt)
 		}
-		return subTrieIt.Error()
+		return tx, subTrieIt.Error()
 	}
 
 	// skip the current node if it's before recovered path and not along the recovered path
@@ -307,8 +308,9 @@ func (s *Service) createSubTrieSnapshot(ctx context.Context, tx Tx, prefixPath [
 
 		// if the node is along paths of interest
 		// create snapshot of node, if it is a leaf this will also create snapshot of entire storage trie
-		if err := s.createNodeSnapshot(tx, subTrieIt, headerID, height, seekingPaths, prefixPath); err != nil {
-			return err
+		tx, err = s.createNodeSnapshot(tx, subTrieIt, headerID, height, seekingPaths, prefixPath)
+		if err != nil {
+			return tx, err
 		}
 		// update seeked path after node has been processed
 		updateSeekedPath(seekedPath, nodePath)
@@ -316,11 +318,12 @@ func (s *Service) createSubTrieSnapshot(ctx context.Context, tx Tx, prefixPath [
 		// create an iterator to traverse and process the next level of this subTrie
 		nextSubTrieIt, err := s.createSubTrieIt(nodePath, subTrieIt.Hash(), recoveredPath)
 		if err != nil {
-			return err
+			return tx, err
 		}
 		// pass on the seekedPath of the tracked concurrent iterator to be updated
-		if err := s.createSubTrieSnapshot(ctx, tx, nodePath, nextSubTrieIt, recoveredPath, seekedPath, endPath, headerID, height, seekingPaths); err != nil {
-			return err
+		tx, err = s.createSubTrieSnapshot(ctx, tx, nodePath, nextSubTrieIt, recoveredPath, seekedPath, endPath, headerID, height, seekingPaths)
+		if err != nil {
+			return tx, err
 		}
 	}
 }
@@ -359,10 +362,10 @@ func (s *Service) createSubTrieIt(prefixPath []byte, hash common.Hash, recovered
 // createNodeSnapshot indexes the current node
 // entire storage trie is also indexed (if available)
 func (s *Service) createNodeSnapshot(tx Tx, it trie.NodeIterator, headerID string, height *big.Int,
-	watchedAddressesLeafPaths [][]byte, prefixPath []byte) error {
+	watchedAddressesLeafPaths [][]byte, prefixPath []byte) (Tx, error) {
 	tx, err := s.ipfsPublisher.PrepareTxForBatch(tx, s.maxBatchSize)
 	if err != nil {
-		return err
+		return tx, err
 	}
 
 	// index values by leaf key
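
Reviewer note: this PrepareTxForBatch call is where the stale handle originated. Presumably the publisher commits and replaces the transaction once maxBatchSize is reached; with the old error-only signature the rebound tx could never escape createNodeSnapshot, which is why every return above and below now carries the Tx.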
@@ -371,25 +374,25 @@ func (s *Service) createNodeSnapshot(tx Tx, it trie.NodeIterator, headerID strin
 		// publish codehash => code mappings
 		// take storage snapshot
 		if err := s.processStateValueNode(it, headerID, height, prefixPath, watchedAddressesLeafPaths, tx); err != nil {
-			return err
+			return tx, err
 		}
 	} else { // trie nodes will be written to blockstore only
 		// reminder that this includes leaf nodes, since the geth iterator.Leaf() actually signifies a "value" node
 		// so this is also where we publish the IPLD block corresponding to the "value" nodes indexed above
 		if IsNullHash(it.Hash()) {
 			// skip null node
-			return nil
+			return tx, nil
 		}
 		nodeVal := make([]byte, len(it.NodeBlob()))
 		copy(nodeVal, it.NodeBlob())
 		if len(watchedAddressesLeafPaths) > 0 {
 			var elements []interface{}
 			if err := rlp.DecodeBytes(nodeVal, &elements); err != nil {
-				return err
+				return tx, err
 			}
 			ok, err := isLeaf(elements)
 			if err != nil {
-				return err
+				return tx, err
 			}
 			if ok {
 				// create the full node path as it.Path() doesn't include the path before subtrie root
@@ -398,18 +401,18 @@ func (s *Service) createNodeSnapshot(tx Tx, it trie.NodeIterator, headerID strin
 				valueNodePath := append(nodePath, partialPath...)
 				if !isWatchedAddress(watchedAddressesLeafPaths, valueNodePath) {
 					// skip this node
-					return nil
+					return tx, nil
 				}
 			}
 		}
 		nodeHash := make([]byte, len(it.Hash().Bytes()))
 		copy(nodeHash, it.Hash().Bytes())
 		if err := s.ipfsPublisher.PublishIPLD(ipld.Keccak256ToCid(ipld.MEthStateTrie, nodeHash), nodeVal, height, tx); err != nil {
-			return err
+			return tx, err
 		}
 	}
 
-	return it.Error()
+	return tx, it.Error()
 }
 
 // reminder: it.Leaf() == true when the iterator is positioned at a "value node" which is not something that actually exists in an MMPT
@@ -423,15 +426,6 @@ func (s *Service) processStateValueNode(it trie.NodeIterator, headerID string, h
 		return nil
 	}
 
-	// created vs updated is important for leaf nodes since we need to diff their storage
-	// so we need to map all changed accounts at B to their leafkey, since account can change pathes but not leafkey
-	var account types.StateAccount
-	accountRLP := make([]byte, len(it.LeafBlob()))
-	copy(accountRLP, it.LeafBlob())
-	if err := rlp.DecodeBytes(accountRLP, &account); err != nil {
-		return fmt.Errorf("error decoding account for leaf value at leaf key %x\nerror: %v", it.LeafKey(), err)
-	}
-
 	// since this is a "value node", we need to move up to the "parent" node which is the actual leaf node
 	// it should be in the fastcache since it necessarily was recently accessed to reach the current "node"
 	parentNodeRLP, err := s.stateDB.TrieDB().Node(it.Parent())
@@ -450,6 +444,15 @@ func (s *Service) processStateValueNode(it trie.NodeIterator, headerID string, h
 	encodedPath := trie.HexToCompact(valueNodePath)
 	leafKey := encodedPath[1:]
 
+	// created vs updated is important for leaf nodes since we need to diff their storage
+	// so we need to map all changed accounts at B to their leafkey, since account can change pathes but not leafkey
+	var account types.StateAccount
+	accountRLP := make([]byte, len(it.LeafBlob()))
+	copy(accountRLP, it.LeafBlob())
+	if err := rlp.DecodeBytes(accountRLP, &account); err != nil {
+		return fmt.Errorf("error decoding account for leaf value at leaf key %x\nerror: %v", leafKey, err)
+	}
+
 	// write codehash => code mappings if we have a contract
 	if !bytes.Equal(account.CodeHash, emptyCodeHash) {
 		codeHash := common.BytesToHash(account.CodeHash)
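
Reviewer note on the last two hunks: the account-decoding block is moved, not rewritten. Placing it after leafKey is computed lets the decode error report the compact-encoded leafKey instead of it.LeafKey(); behavior is otherwise unchanged.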
@@ -3,6 +3,7 @@ package snapshot
 import (
 	"errors"
 	"fmt"
+	"math/big"
 	"math/rand"
 	"os"
 	"path/filepath"
@@ -11,6 +12,8 @@ import (
 	"testing"
 	"time"
 
+	"github.com/ipfs/go-cid"
+
 	"github.com/ethereum/go-ethereum/common"
 	"github.com/ethereum/go-ethereum/statediff/indexer/models"
 	"github.com/golang/mock/gomock"
@@ -192,8 +195,6 @@ func TestAccountSelectiveSnapshot(t *testing.T) {
 		expectedStorageNodes.Store(keys, value)
 	}
 
-	var count int
-
 	pub, tx := makeMocks(t)
 	pub.EXPECT().PublishHeader(gomock.Eq(&fixt.Chain2_Block32_Header))
 	pub.EXPECT().BeginTx().Return(tx, nil).
@@ -206,7 +207,6 @@ func TestAccountSelectiveSnapshot(t *testing.T) {
 		gomock.Any(),
 		gomock.Eq(tx)).
 		Do(func(stateNode *models.StateNodeModel, _ snapt.Tx) error {
-			count++
 			if stateNode.BlockNumber != fixt.Chain2_Block32_Header.Number.String() {
 				t.Fatalf(unexpectedStateNodeErr, stateNode.StateKey)
 			}
@@ -261,7 +261,12 @@ func TestAccountSelectiveSnapshot(t *testing.T) {
 		}).
 		AnyTimes()
 	pub.EXPECT().PublishIPLD(gomock.Any(), gomock.Any(), gomock.Eq(fixt.Chain2_Block32_Header.Number), gomock.Eq(tx)).
-		AnyTimes()
+		Do(func(_ cid.Cid, _ []byte, height *big.Int, _ snapt.Tx) {
+			if height.String() != fixt.Chain2_Block32_Header.Number.String() {
+				t.Fatalf("wrong blockheight for ipld publish: %s", height.String())
+			}
+		}).
+		MaxTimes(len(fixt.Chain2_Block32_StateIPLDs) + len(fixt.Chain2_Block32_StorageIPLDs) + 1 + 2)
 
 	chainDataPath, ancientDataPath := fixt.GetChainDataPath("chain2data")
 	config := testConfig(chainDataPath, ancientDataPath)
@@ -318,6 +323,7 @@ func TestRecovery(t *testing.T) {
 	pub.EXPECT().PublishHeader(gomock.Eq(&fixt.Block1_Header))
 	pub.EXPECT().BeginTx().Return(tx, nil).MaxTimes(workers)
 	pub.EXPECT().PrepareTxForBatch(gomock.Any(), gomock.Any()).Return(tx, nil).AnyTimes()
+	tx.EXPECT().Rollback().MaxTimes(workers)
 	tx.EXPECT().Commit().MaxTimes(workers)
 	pub.EXPECT().PublishStateLeafNode(
 		gomock.Any(),
@@ -481,7 +487,8 @@ func TestAccountSelectiveRecovery(t *testing.T) {
 	pub.EXPECT().PublishHeader(gomock.Eq(&fixt.Chain2_Block32_Header))
 	pub.EXPECT().BeginTx().Return(tx, nil).Times(workers)
 	pub.EXPECT().PrepareTxForBatch(gomock.Any(), gomock.Any()).Return(tx, nil).AnyTimes()
-	tx.EXPECT().Commit().Times(workers)
+	tx.EXPECT().Commit().MaxTimes(workers)
+	tx.EXPECT().Rollback().MaxTimes(workers)
 	pub.EXPECT().PublishStateLeafNode(
 		gomock.Any(),
 		gomock.Eq(tx)).
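
Reviewer note on the mock expectations: gomock's Times(n) requires exactly n calls while MaxTimes(n) allows anywhere from zero to n. Relaxing Commit() and allowing Rollback() fits the new flow, where a worker's final transaction may be rolled back rather than committed, so the per-worker split between commits and rollbacks is no longer deterministic.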