diff --git a/.gitignore b/.gitignore
index 4b9b68b..c678610 100644
--- a/.gitignore
+++ b/.gitignore
@@ -5,3 +5,4 @@ mocks/
 .vscode
 output_dir*/
 log_file
+recovery_file
diff --git a/pkg/snapshot/service.go b/pkg/snapshot/service.go
index 1ef6374..0517af0 100644
--- a/pkg/snapshot/service.go
+++ b/pkg/snapshot/service.go
@@ -202,9 +202,6 @@ func (s *Service) createSnapshot(ctx context.Context, it trie.NodeIterator, head
 	}
 	defer func() {
 		err = CommitOrRollback(tx, err)
-		if err != nil {
-			log.Errorf("CommitOrRollback failed: %s", err)
-		}
 	}()
 
 	// path (from recovery dump) to be seeked on recovery
@@ -224,28 +221,31 @@
 		recoveredPath = append(recoveredPath, *seekedPath...)
 		endPath = i.endPath
 	} else {
-		return errors.New("untracked iterator")
+		err = errors.New("untracked iterator")
+		return err
 	}
 
-	return s.createSubTrieSnapshot(ctx, tx, nil, it, recoveredPath, seekedPath, endPath, headerID, height, seekingPaths)
+	tx, err = s.createSubTrieSnapshot(ctx, tx, nil, it, recoveredPath, seekedPath, endPath, headerID, height, seekingPaths)
+	return err
 }
 
 // createSubTrieSnapshot processes nodes at the next level of a trie using the given subtrie iterator
 // continually updating seekedPath with path of the latest processed node
 func (s *Service) createSubTrieSnapshot(ctx context.Context, tx Tx, prefixPath []byte, subTrieIt trie.NodeIterator,
-	recoveredPath []byte, seekedPath *[]byte, endPath []byte, headerID string, height *big.Int, seekingPaths [][]byte) error {
+	recoveredPath []byte, seekedPath *[]byte, endPath []byte, headerID string, height *big.Int, seekingPaths [][]byte) (Tx, error) {
 	prom.IncActiveIterCount()
 	defer prom.DecActiveIterCount()
 
 	// descend in the first loop iteration to reach first child node
+	var err error
 	descend := true
 	for {
 		select {
 		case <-ctx.Done():
-			return errors.New("ctx cancelled")
+			return tx, errors.New("ctx cancelled")
 		default:
 			if ok := subTrieIt.Next(descend); !ok {
-				return subTrieIt.Error()
+				return tx, subTrieIt.Error()
 			}
 
 			// to avoid descending further
@@ -257,15 +257,16 @@ func (s *Service) createSubTrieSnapshot(ctx context.Context, tx Tx, prefixPath [
 				// if node path is empty and prefix is nil, it's the root node
 				if prefixPath == nil {
 					// create snapshot of node, if it is a leaf this will also create snapshot of entire storage trie
-					if err := s.createNodeSnapshot(tx, subTrieIt, headerID, height, seekingPaths, prefixPath); err != nil {
-						return err
+					tx, err = s.createNodeSnapshot(tx, subTrieIt, headerID, height, seekingPaths, prefixPath)
+					if err != nil {
+						return tx, err
 					}
 					updateSeekedPath(seekedPath, subTrieIt.Path())
 				}
 
 				if ok := subTrieIt.Next(true); !ok {
 					// return if no further nodes available
-					return subTrieIt.Error()
+					return tx, subTrieIt.Error()
 				}
 			}
 
@@ -283,7 +284,7 @@ func (s *Service) createSubTrieSnapshot(ctx context.Context, tx Tx, prefixPath [
 				if trackedSubtrieIt, ok := subTrieIt.(*trackedIter); ok {
 					s.tracker.stopIter(trackedSubtrieIt)
 				}
-				return subTrieIt.Error()
+				return tx, subTrieIt.Error()
 			}
 
 			// skip the current node if it's before recovered path and not along the recovered path
@@ -307,8 +308,9 @@ func (s *Service) createSubTrieSnapshot(ctx context.Context, tx Tx, prefixPath [
 
 			// if the node is along paths of interest
 			// create snapshot of node, if it is a leaf this will also create snapshot of entire storage trie
-			if err := s.createNodeSnapshot(tx, subTrieIt, headerID, height, seekingPaths, prefixPath); err != nil {
-				return err
+			tx, err = s.createNodeSnapshot(tx, subTrieIt, headerID, height, seekingPaths, prefixPath)
+			if err != nil {
+				return tx, err
 			}
 			// update seeked path after node has been processed
 			updateSeekedPath(seekedPath, nodePath)
@@ -316,11 +318,12 @@ func (s *Service) createSubTrieSnapshot(ctx context.Context, tx Tx, prefixPath [
 			// create an iterator to traverse and process the next level of this subTrie
 			nextSubTrieIt, err := s.createSubTrieIt(nodePath, subTrieIt.Hash(), recoveredPath)
 			if err != nil {
-				return err
+				return tx, err
 			}
 			// pass on the seekedPath of the tracked concurrent iterator to be updated
-			if err := s.createSubTrieSnapshot(ctx, tx, nodePath, nextSubTrieIt, recoveredPath, seekedPath, endPath, headerID, height, seekingPaths); err != nil {
-				return err
+			tx, err = s.createSubTrieSnapshot(ctx, tx, nodePath, nextSubTrieIt, recoveredPath, seekedPath, endPath, headerID, height, seekingPaths)
+			if err != nil {
+				return tx, err
 			}
 		}
 	}
@@ -359,10 +362,10 @@ func (s *Service) createSubTrieIt(prefixPath []byte, hash common.Hash, recovered
 // createNodeSnapshot indexes the current node
 // entire storage trie is also indexed (if available)
 func (s *Service) createNodeSnapshot(tx Tx, it trie.NodeIterator, headerID string, height *big.Int,
-	watchedAddressesLeafPaths [][]byte, prefixPath []byte) error {
+	watchedAddressesLeafPaths [][]byte, prefixPath []byte) (Tx, error) {
 	tx, err := s.ipfsPublisher.PrepareTxForBatch(tx, s.maxBatchSize)
 	if err != nil {
-		return err
+		return tx, err
 	}
 
 	// index values by leaf key
@@ -371,25 +374,25 @@ func (s *Service) createNodeSnapshot(tx Tx, it trie.NodeIterator, headerID strin
 		// publish codehash => code mappings
 		// take storage snapshot
 		if err := s.processStateValueNode(it, headerID, height, prefixPath, watchedAddressesLeafPaths, tx); err != nil {
-			return err
+			return tx, err
 		}
 	} else {
 		// trie nodes will be written to blockstore only
 		// reminder that this includes leaf nodes, since the geth iterator.Leaf() actually signifies a "value" node
 		// so this is also where we publish the IPLD block corresponding to the "value" nodes indexed above
 		if IsNullHash(it.Hash()) {
 			// skip null node
-			return nil
+			return tx, nil
 		}
 		nodeVal := make([]byte, len(it.NodeBlob()))
 		copy(nodeVal, it.NodeBlob())
 		if len(watchedAddressesLeafPaths) > 0 {
 			var elements []interface{}
			if err := rlp.DecodeBytes(nodeVal, &elements); err != nil {
-				return err
+				return tx, err
 			}
 			ok, err := isLeaf(elements)
 			if err != nil {
-				return err
+				return tx, err
 			}
 			if ok {
 				// create the full node path as it.Path() doesn't include the path before subtrie root
@@ -398,18 +401,18 @@ func (s *Service) createNodeSnapshot(tx Tx, it trie.NodeIterator, headerID strin
 				valueNodePath := append(nodePath, partialPath...)
 				if !isWatchedAddress(watchedAddressesLeafPaths, valueNodePath) {
 					// skip this node
-					return nil
+					return tx, nil
 				}
 			}
 		}
 		nodeHash := make([]byte, len(it.Hash().Bytes()))
 		copy(nodeHash, it.Hash().Bytes())
 		if err := s.ipfsPublisher.PublishIPLD(ipld.Keccak256ToCid(ipld.MEthStateTrie, nodeHash), nodeVal, height, tx); err != nil {
-			return err
+			return tx, err
 		}
 	}
 
-	return it.Error()
+	return tx, it.Error()
 }
 
 // reminder: it.Leaf() == true when the iterator is positioned at a "value node" which is not something that actually exists in an MMPT
@@ -423,15 +426,6 @@ func (s *Service) processStateValueNode(it trie.NodeIterator, headerID string, h
 		return nil
 	}
 
-	// created vs updated is important for leaf nodes since we need to diff their storage
-	// so we need to map all changed accounts at B to their leafkey, since account can change pathes but not leafkey
-	var account types.StateAccount
-	accountRLP := make([]byte, len(it.LeafBlob()))
-	copy(accountRLP, it.LeafBlob())
-	if err := rlp.DecodeBytes(accountRLP, &account); err != nil {
-		return fmt.Errorf("error decoding account for leaf value at leaf key %x\nerror: %v", it.LeafKey(), err)
-	}
-
 	// since this is a "value node", we need to move up to the "parent" node which is the actual leaf node
 	// it should be in the fastcache since it necessarily was recently accessed to reach the current "node"
 	parentNodeRLP, err := s.stateDB.TrieDB().Node(it.Parent())
@@ -450,6 +444,15 @@ func (s *Service) processStateValueNode(it trie.NodeIterator, headerID string, h
 	encodedPath := trie.HexToCompact(valueNodePath)
 	leafKey := encodedPath[1:]
 
+	// created vs updated is important for leaf nodes since we need to diff their storage
+	// so we need to map all changed accounts at B to their leafkey, since account can change pathes but not leafkey
+	var account types.StateAccount
+	accountRLP := make([]byte, len(it.LeafBlob()))
+	copy(accountRLP, it.LeafBlob())
+	if err := rlp.DecodeBytes(accountRLP, &account); err != nil {
+		return fmt.Errorf("error decoding account for leaf value at leaf key %x\nerror: %v", leafKey, err)
+	}
+
 	// write codehash => code mappings if we have a contract
 	if !bytes.Equal(account.CodeHash, emptyCodeHash) {
 		codeHash := common.BytesToHash(account.CodeHash)
diff --git a/pkg/snapshot/service_test.go b/pkg/snapshot/service_test.go
index 93f0c08..7dc16dd 100644
--- a/pkg/snapshot/service_test.go
+++ b/pkg/snapshot/service_test.go
@@ -3,6 +3,7 @@ package snapshot
 import (
 	"errors"
 	"fmt"
+	"math/big"
 	"math/rand"
 	"os"
 	"path/filepath"
@@ -11,6 +12,8 @@ import (
 	"testing"
 	"time"
 
+	"github.com/ipfs/go-cid"
+
 	"github.com/ethereum/go-ethereum/common"
 	"github.com/ethereum/go-ethereum/statediff/indexer/models"
 	"github.com/golang/mock/gomock"
@@ -192,8 +195,6 @@ func TestAccountSelectiveSnapshot(t *testing.T) {
 		expectedStorageNodes.Store(keys, value)
 	}
 
-	var count int
-
 	pub, tx := makeMocks(t)
 	pub.EXPECT().PublishHeader(gomock.Eq(&fixt.Chain2_Block32_Header))
 	pub.EXPECT().BeginTx().Return(tx, nil).
@@ -206,7 +207,6 @@
 		gomock.Any(),
 		gomock.Eq(tx)).
 		Do(func(stateNode *models.StateNodeModel, _ snapt.Tx) error {
-			count++
 			if stateNode.BlockNumber != fixt.Chain2_Block32_Header.Number.String() {
 				t.Fatalf(unexpectedStateNodeErr, stateNode.StateKey)
 			}
@@ -261,7 +261,12 @@
 		}).
 		AnyTimes()
 	pub.EXPECT().PublishIPLD(gomock.Any(), gomock.Any(), gomock.Eq(fixt.Chain2_Block32_Header.Number), gomock.Eq(tx)).
-		AnyTimes()
+		Do(func(_ cid.Cid, _ []byte, height *big.Int, _ snapt.Tx) {
+			if height.String() != fixt.Chain2_Block32_Header.Number.String() {
+				t.Fatalf("wrong blockheight for ipld publish: %s", height.String())
+			}
+		}).
+		MaxTimes(len(fixt.Chain2_Block32_StateIPLDs) + len(fixt.Chain2_Block32_StorageIPLDs) + 1 + 2)
 
 	chainDataPath, ancientDataPath := fixt.GetChainDataPath("chain2data")
 	config := testConfig(chainDataPath, ancientDataPath)
@@ -318,6 +323,7 @@ func TestRecovery(t *testing.T) {
 	pub.EXPECT().PublishHeader(gomock.Eq(&fixt.Block1_Header))
 	pub.EXPECT().BeginTx().Return(tx, nil).MaxTimes(workers)
 	pub.EXPECT().PrepareTxForBatch(gomock.Any(), gomock.Any()).Return(tx, nil).AnyTimes()
+	tx.EXPECT().Rollback().MaxTimes(workers)
 	tx.EXPECT().Commit().MaxTimes(workers)
 	pub.EXPECT().PublishStateLeafNode(
 		gomock.Any(),
@@ -481,7 +487,8 @@ func TestAccountSelectiveRecovery(t *testing.T) {
 	pub.EXPECT().PublishHeader(gomock.Eq(&fixt.Chain2_Block32_Header))
 	pub.EXPECT().BeginTx().Return(tx, nil).Times(workers)
 	pub.EXPECT().PrepareTxForBatch(gomock.Any(), gomock.Any()).Return(tx, nil).AnyTimes()
-	tx.EXPECT().Commit().Times(workers)
+	tx.EXPECT().Commit().MaxTimes(workers)
+	tx.EXPECT().Rollback().MaxTimes(workers)
 	pub.EXPECT().PublishStateLeafNode(
 		gomock.Any(),
 		gomock.Eq(tx)).