ipld-eth-state-snapshot/pkg/snapshot/service.go

519 lines
17 KiB
Go
Raw Normal View History

2020-07-01 18:44:59 +00:00
// Copyright © 2020 Vulcanize, Inc
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package snapshot
import (
"bytes"
Account selective snapshot (#46) * snapshotter ignores nodes not along a path along those derived from a list of account addresses if one is provided * config and env updates * cmd update * Encode watched address path bytes to hex for comparison * actually ignore the subtries that are not along the paths of interest * Fixes for account selective snapshot * Use non-concurrent iterator when having a single worker * Only index root node when starting path of an iterator is nil * Upgrade deps * Avoid tracking iterators and skip recovery test * Fix recovery mechanism, use sync Map instead of buffered channels * Add test for account selective snapshot * Continue traversal with concurrent iterators with starting path nil * Use errgroup to simplify error handling with concurrent iterators * Check if all the nodes are indexed in the recovery test * Use concurrency safe sync Map in account selective snapshot test * Only track concurrent iterators and refactor code * Fix node and recovered path comparison * Revert back to using buffered channels for tracking iterators * Add a metric to monitor number of active iterators * Update docs * Update seeked path after node is processed * Return error on context cancellation from subtrie iteration * Add tests for account selective snapshot recovery * Explicity enforce concurrent iterator bounds to avoid duplicate nodes * Update full snapshot test to check nodes being indexed * Refactor code to simplify snapshot logic * Remove unnecessary function argument * Use ctx cancellation for handling signals * Add descriptive comments Co-authored-by: prathamesh0 <prathamesh.musale0@gmail.com>
2022-08-03 11:35:04 +00:00
"context"
2020-07-01 23:07:56 +00:00
"errors"
2020-07-01 18:44:59 +00:00
"fmt"
"math/big"
2020-07-01 18:44:59 +00:00
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/core/state"
2022-01-08 03:32:45 +00:00
"github.com/ethereum/go-ethereum/core/types"
2020-07-01 18:44:59 +00:00
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/rlp"
"github.com/ethereum/go-ethereum/trie"
iter "github.com/ethereum/go-ethereum/trie/concurrent_iterator"
2022-03-09 13:37:33 +00:00
log "github.com/sirupsen/logrus"
Account selective snapshot (#46) * snapshotter ignores nodes not along a path along those derived from a list of account addresses if one is provided * config and env updates * cmd update * Encode watched address path bytes to hex for comparison * actually ignore the subtries that are not along the paths of interest * Fixes for account selective snapshot * Use non-concurrent iterator when having a single worker * Only index root node when starting path of an iterator is nil * Upgrade deps * Avoid tracking iterators and skip recovery test * Fix recovery mechanism, use sync Map instead of buffered channels * Add test for account selective snapshot * Continue traversal with concurrent iterators with starting path nil * Use errgroup to simplify error handling with concurrent iterators * Check if all the nodes are indexed in the recovery test * Use concurrency safe sync Map in account selective snapshot test * Only track concurrent iterators and refactor code * Fix node and recovered path comparison * Revert back to using buffered channels for tracking iterators * Add a metric to monitor number of active iterators * Update docs * Update seeked path after node is processed * Return error on context cancellation from subtrie iteration * Add tests for account selective snapshot recovery * Explicity enforce concurrent iterator bounds to avoid duplicate nodes * Update full snapshot test to check nodes being indexed * Refactor code to simplify snapshot logic * Remove unnecessary function argument * Use ctx cancellation for handling signals * Add descriptive comments Co-authored-by: prathamesh0 <prathamesh.musale0@gmail.com>
2022-08-03 11:35:04 +00:00
"golang.org/x/sync/errgroup"
2020-07-01 18:44:59 +00:00
"github.com/cerc-io/ipld-eth-state-snapshot/pkg/prom"
. "github.com/cerc-io/ipld-eth-state-snapshot/pkg/types"
2020-07-01 18:44:59 +00:00
)
var (
emptyNode, _ = rlp.EncodeToBytes(&[]byte{})
emptyCodeHash = crypto.Keccak256([]byte{})
2020-07-01 18:44:59 +00:00
emptyContractRoot = crypto.Keccak256Hash(emptyNode)
defaultBatchSize = uint(100)
2020-07-01 18:44:59 +00:00
)
2021-12-14 06:50:19 +00:00
// Service holds ethDB and stateDB to read data from lvldb and Publisher
// to publish trie in postgres DB.
2020-07-01 18:44:59 +00:00
type Service struct {
Account selective snapshot (#46) * snapshotter ignores nodes not along a path along those derived from a list of account addresses if one is provided * config and env updates * cmd update * Encode watched address path bytes to hex for comparison * actually ignore the subtries that are not along the paths of interest * Fixes for account selective snapshot * Use non-concurrent iterator when having a single worker * Only index root node when starting path of an iterator is nil * Upgrade deps * Avoid tracking iterators and skip recovery test * Fix recovery mechanism, use sync Map instead of buffered channels * Add test for account selective snapshot * Continue traversal with concurrent iterators with starting path nil * Use errgroup to simplify error handling with concurrent iterators * Check if all the nodes are indexed in the recovery test * Use concurrency safe sync Map in account selective snapshot test * Only track concurrent iterators and refactor code * Fix node and recovered path comparison * Revert back to using buffered channels for tracking iterators * Add a metric to monitor number of active iterators * Update docs * Update seeked path after node is processed * Return error on context cancellation from subtrie iteration * Add tests for account selective snapshot recovery * Explicity enforce concurrent iterator bounds to avoid duplicate nodes * Update full snapshot test to check nodes being indexed * Refactor code to simplify snapshot logic * Remove unnecessary function argument * Use ctx cancellation for handling signals * Add descriptive comments Co-authored-by: prathamesh0 <prathamesh.musale0@gmail.com>
2022-08-03 11:35:04 +00:00
watchingAddresses bool
ethDB ethdb.Database
stateDB state.Database
ipfsPublisher Publisher
maxBatchSize uint
tracker iteratorTracker
recoveryFile string
2020-07-01 18:44:59 +00:00
}
2022-01-11 05:37:27 +00:00
func NewLevelDB(con *EthConfig) (ethdb.Database, error) {
kvdb, _ := rawdb.NewLevelDBDatabase(con.LevelDBPath, 1024, 256, "ipld-eth-state-snapshot", true)
edb, err := rawdb.NewDatabaseWithFreezer(kvdb, con.AncientDBPath, "ipld-eth-state-snapshot", true)
if err != nil {
return nil, fmt.Errorf("unable to create NewLevelDBDatabaseWithFreezer: %s", err)
}
return edb, nil
2022-01-11 05:37:27 +00:00
}
2022-01-11 05:37:27 +00:00
// NewSnapshotService creates Service.
func NewSnapshotService(edb ethdb.Database, pub Publisher, recoveryFile string) (*Service, error) {
2020-07-01 18:44:59 +00:00
return &Service{
ethDB: edb,
stateDB: state.NewDatabase(edb),
2022-01-11 00:59:26 +00:00
ipfsPublisher: pub,
maxBatchSize: defaultBatchSize,
recoveryFile: recoveryFile,
2020-07-01 18:44:59 +00:00
}, nil
}
2020-08-20 10:23:36 +00:00
type SnapshotParams struct {
Account selective snapshot (#46) * snapshotter ignores nodes not along a path along those derived from a list of account addresses if one is provided * config and env updates * cmd update * Encode watched address path bytes to hex for comparison * actually ignore the subtries that are not along the paths of interest * Fixes for account selective snapshot * Use non-concurrent iterator when having a single worker * Only index root node when starting path of an iterator is nil * Upgrade deps * Avoid tracking iterators and skip recovery test * Fix recovery mechanism, use sync Map instead of buffered channels * Add test for account selective snapshot * Continue traversal with concurrent iterators with starting path nil * Use errgroup to simplify error handling with concurrent iterators * Check if all the nodes are indexed in the recovery test * Use concurrency safe sync Map in account selective snapshot test * Only track concurrent iterators and refactor code * Fix node and recovered path comparison * Revert back to using buffered channels for tracking iterators * Add a metric to monitor number of active iterators * Update docs * Update seeked path after node is processed * Return error on context cancellation from subtrie iteration * Add tests for account selective snapshot recovery * Explicity enforce concurrent iterator bounds to avoid duplicate nodes * Update full snapshot test to check nodes being indexed * Refactor code to simplify snapshot logic * Remove unnecessary function argument * Use ctx cancellation for handling signals * Add descriptive comments Co-authored-by: prathamesh0 <prathamesh.musale0@gmail.com>
2022-08-03 11:35:04 +00:00
WatchedAddresses map[common.Address]struct{}
Height uint64
Workers uint
2020-08-20 10:23:36 +00:00
}
2020-08-23 04:38:31 +00:00
func (s *Service) CreateSnapshot(params SnapshotParams) error {
Account selective snapshot (#46) * snapshotter ignores nodes not along a path along those derived from a list of account addresses if one is provided * config and env updates * cmd update * Encode watched address path bytes to hex for comparison * actually ignore the subtries that are not along the paths of interest * Fixes for account selective snapshot * Use non-concurrent iterator when having a single worker * Only index root node when starting path of an iterator is nil * Upgrade deps * Avoid tracking iterators and skip recovery test * Fix recovery mechanism, use sync Map instead of buffered channels * Add test for account selective snapshot * Continue traversal with concurrent iterators with starting path nil * Use errgroup to simplify error handling with concurrent iterators * Check if all the nodes are indexed in the recovery test * Use concurrency safe sync Map in account selective snapshot test * Only track concurrent iterators and refactor code * Fix node and recovered path comparison * Revert back to using buffered channels for tracking iterators * Add a metric to monitor number of active iterators * Update docs * Update seeked path after node is processed * Return error on context cancellation from subtrie iteration * Add tests for account selective snapshot recovery * Explicity enforce concurrent iterator bounds to avoid duplicate nodes * Update full snapshot test to check nodes being indexed * Refactor code to simplify snapshot logic * Remove unnecessary function argument * Use ctx cancellation for handling signals * Add descriptive comments Co-authored-by: prathamesh0 <prathamesh.musale0@gmail.com>
2022-08-03 11:35:04 +00:00
paths := make([][]byte, 0, len(params.WatchedAddresses))
for addr := range params.WatchedAddresses {
paths = append(paths, keybytesToHex(crypto.Keccak256(addr.Bytes())))
}
s.watchingAddresses = len(paths) > 0
// extract header from lvldb and publish to PG-IPFS
// hold onto the headerID so that we can link the state nodes to this header
2022-03-09 13:37:33 +00:00
log.Infof("Creating snapshot at height %d", params.Height)
2020-08-23 04:38:31 +00:00
hash := rawdb.ReadCanonicalHash(s.ethDB, params.Height)
header := rawdb.ReadHeader(s.ethDB, hash, params.Height)
if header == nil {
2020-08-23 04:38:31 +00:00
return fmt.Errorf("unable to read canonical header at height %d", params.Height)
}
2022-03-09 13:37:33 +00:00
log.Infof("head hash: %s head height: %d", hash.Hex(), params.Height)
err := s.ipfsPublisher.PublishHeader(header)
if err != nil {
return err
}
2020-08-23 04:38:31 +00:00
tree, err := s.stateDB.OpenTrie(header.Root)
if err != nil {
return err
}
2022-06-08 12:08:17 +00:00
headerID := header.Hash().String()
Account selective snapshot (#46) * snapshotter ignores nodes not along a path along those derived from a list of account addresses if one is provided * config and env updates * cmd update * Encode watched address path bytes to hex for comparison * actually ignore the subtries that are not along the paths of interest * Fixes for account selective snapshot * Use non-concurrent iterator when having a single worker * Only index root node when starting path of an iterator is nil * Upgrade deps * Avoid tracking iterators and skip recovery test * Fix recovery mechanism, use sync Map instead of buffered channels * Add test for account selective snapshot * Continue traversal with concurrent iterators with starting path nil * Use errgroup to simplify error handling with concurrent iterators * Check if all the nodes are indexed in the recovery test * Use concurrency safe sync Map in account selective snapshot test * Only track concurrent iterators and refactor code * Fix node and recovered path comparison * Revert back to using buffered channels for tracking iterators * Add a metric to monitor number of active iterators * Update docs * Update seeked path after node is processed * Return error on context cancellation from subtrie iteration * Add tests for account selective snapshot recovery * Explicity enforce concurrent iterator bounds to avoid duplicate nodes * Update full snapshot test to check nodes being indexed * Refactor code to simplify snapshot logic * Remove unnecessary function argument * Use ctx cancellation for handling signals * Add descriptive comments Co-authored-by: prathamesh0 <prathamesh.musale0@gmail.com>
2022-08-03 11:35:04 +00:00
ctx, cancelCtx := context.WithCancel(context.Background())
2022-03-11 14:28:32 +00:00
s.tracker = newTracker(s.recoveryFile, int(params.Workers))
Account selective snapshot (#46) * snapshotter ignores nodes not along a path along those derived from a list of account addresses if one is provided * config and env updates * cmd update * Encode watched address path bytes to hex for comparison * actually ignore the subtries that are not along the paths of interest * Fixes for account selective snapshot * Use non-concurrent iterator when having a single worker * Only index root node when starting path of an iterator is nil * Upgrade deps * Avoid tracking iterators and skip recovery test * Fix recovery mechanism, use sync Map instead of buffered channels * Add test for account selective snapshot * Continue traversal with concurrent iterators with starting path nil * Use errgroup to simplify error handling with concurrent iterators * Check if all the nodes are indexed in the recovery test * Use concurrency safe sync Map in account selective snapshot test * Only track concurrent iterators and refactor code * Fix node and recovered path comparison * Revert back to using buffered channels for tracking iterators * Add a metric to monitor number of active iterators * Update docs * Update seeked path after node is processed * Return error on context cancellation from subtrie iteration * Add tests for account selective snapshot recovery * Explicity enforce concurrent iterator bounds to avoid duplicate nodes * Update full snapshot test to check nodes being indexed * Refactor code to simplify snapshot logic * Remove unnecessary function argument * Use ctx cancellation for handling signals * Add descriptive comments Co-authored-by: prathamesh0 <prathamesh.musale0@gmail.com>
2022-08-03 11:35:04 +00:00
s.tracker.captureSignal(cancelCtx)
var iters []trie.NodeIterator
// attempt to restore from recovery file if it exists
2022-03-11 14:28:32 +00:00
iters, err = s.tracker.restore(tree)
if err != nil {
2022-05-26 10:20:42 +00:00
log.Errorf("restore error: %s", err.Error())
return err
}
2022-06-08 12:08:17 +00:00
if iters != nil {
2022-06-08 12:08:17 +00:00
log.Debugf("restored iterators; count: %d", len(iters))
if params.Workers < uint(len(iters)) {
return fmt.Errorf(
"number of recovered workers (%d) is greater than number configured (%d)",
len(iters), params.Workers,
)
}
Account selective snapshot (#46) * snapshotter ignores nodes not along a path along those derived from a list of account addresses if one is provided * config and env updates * cmd update * Encode watched address path bytes to hex for comparison * actually ignore the subtries that are not along the paths of interest * Fixes for account selective snapshot * Use non-concurrent iterator when having a single worker * Only index root node when starting path of an iterator is nil * Upgrade deps * Avoid tracking iterators and skip recovery test * Fix recovery mechanism, use sync Map instead of buffered channels * Add test for account selective snapshot * Continue traversal with concurrent iterators with starting path nil * Use errgroup to simplify error handling with concurrent iterators * Check if all the nodes are indexed in the recovery test * Use concurrency safe sync Map in account selective snapshot test * Only track concurrent iterators and refactor code * Fix node and recovered path comparison * Revert back to using buffered channels for tracking iterators * Add a metric to monitor number of active iterators * Update docs * Update seeked path after node is processed * Return error on context cancellation from subtrie iteration * Add tests for account selective snapshot recovery * Explicity enforce concurrent iterator bounds to avoid duplicate nodes * Update full snapshot test to check nodes being indexed * Refactor code to simplify snapshot logic * Remove unnecessary function argument * Use ctx cancellation for handling signals * Add descriptive comments Co-authored-by: prathamesh0 <prathamesh.musale0@gmail.com>
2022-08-03 11:35:04 +00:00
} else {
// nothing to restore
2022-06-08 12:08:17 +00:00
log.Debugf("no iterators to restore")
if params.Workers > 1 {
iters = iter.SubtrieIterators(tree, params.Workers)
} else {
iters = []trie.NodeIterator{tree.NodeIterator(nil)}
}
for i, it := range iters {
Account selective snapshot (#46) * snapshotter ignores nodes not along a path along those derived from a list of account addresses if one is provided * config and env updates * cmd update * Encode watched address path bytes to hex for comparison * actually ignore the subtries that are not along the paths of interest * Fixes for account selective snapshot * Use non-concurrent iterator when having a single worker * Only index root node when starting path of an iterator is nil * Upgrade deps * Avoid tracking iterators and skip recovery test * Fix recovery mechanism, use sync Map instead of buffered channels * Add test for account selective snapshot * Continue traversal with concurrent iterators with starting path nil * Use errgroup to simplify error handling with concurrent iterators * Check if all the nodes are indexed in the recovery test * Use concurrency safe sync Map in account selective snapshot test * Only track concurrent iterators and refactor code * Fix node and recovered path comparison * Revert back to using buffered channels for tracking iterators * Add a metric to monitor number of active iterators * Update docs * Update seeked path after node is processed * Return error on context cancellation from subtrie iteration * Add tests for account selective snapshot recovery * Explicity enforce concurrent iterator bounds to avoid duplicate nodes * Update full snapshot test to check nodes being indexed * Refactor code to simplify snapshot logic * Remove unnecessary function argument * Use ctx cancellation for handling signals * Add descriptive comments Co-authored-by: prathamesh0 <prathamesh.musale0@gmail.com>
2022-08-03 11:35:04 +00:00
// recovered path is nil for fresh iterators
iters[i] = s.tracker.tracked(it, nil)
}
}
defer func() {
2022-03-11 14:28:32 +00:00
err := s.tracker.haltAndDump()
if err != nil {
2022-05-26 10:20:42 +00:00
log.Errorf("failed to write recovery file: %v", err)
}
}()
Account selective snapshot (#46) * snapshotter ignores nodes not along a path along those derived from a list of account addresses if one is provided * config and env updates * cmd update * Encode watched address path bytes to hex for comparison * actually ignore the subtries that are not along the paths of interest * Fixes for account selective snapshot * Use non-concurrent iterator when having a single worker * Only index root node when starting path of an iterator is nil * Upgrade deps * Avoid tracking iterators and skip recovery test * Fix recovery mechanism, use sync Map instead of buffered channels * Add test for account selective snapshot * Continue traversal with concurrent iterators with starting path nil * Use errgroup to simplify error handling with concurrent iterators * Check if all the nodes are indexed in the recovery test * Use concurrency safe sync Map in account selective snapshot test * Only track concurrent iterators and refactor code * Fix node and recovered path comparison * Revert back to using buffered channels for tracking iterators * Add a metric to monitor number of active iterators * Update docs * Update seeked path after node is processed * Return error on context cancellation from subtrie iteration * Add tests for account selective snapshot recovery * Explicity enforce concurrent iterator bounds to avoid duplicate nodes * Update full snapshot test to check nodes being indexed * Refactor code to simplify snapshot logic * Remove unnecessary function argument * Use ctx cancellation for handling signals * Add descriptive comments Co-authored-by: prathamesh0 <prathamesh.musale0@gmail.com>
2022-08-03 11:35:04 +00:00
switch {
case len(iters) > 1:
return s.createSnapshotAsync(ctx, iters, headerID, new(big.Int).SetUint64(params.Height), paths)
case len(iters) == 1:
return s.createSnapshot(ctx, iters[0], headerID, new(big.Int).SetUint64(params.Height), paths)
default:
return nil
2020-08-20 10:23:36 +00:00
}
}
2020-08-20 10:23:36 +00:00
// Create snapshot up to head (ignores height param)
Account selective snapshot (#46) * snapshotter ignores nodes not along a path along those derived from a list of account addresses if one is provided * config and env updates * cmd update * Encode watched address path bytes to hex for comparison * actually ignore the subtries that are not along the paths of interest * Fixes for account selective snapshot * Use non-concurrent iterator when having a single worker * Only index root node when starting path of an iterator is nil * Upgrade deps * Avoid tracking iterators and skip recovery test * Fix recovery mechanism, use sync Map instead of buffered channels * Add test for account selective snapshot * Continue traversal with concurrent iterators with starting path nil * Use errgroup to simplify error handling with concurrent iterators * Check if all the nodes are indexed in the recovery test * Use concurrency safe sync Map in account selective snapshot test * Only track concurrent iterators and refactor code * Fix node and recovered path comparison * Revert back to using buffered channels for tracking iterators * Add a metric to monitor number of active iterators * Update docs * Update seeked path after node is processed * Return error on context cancellation from subtrie iteration * Add tests for account selective snapshot recovery * Explicity enforce concurrent iterator bounds to avoid duplicate nodes * Update full snapshot test to check nodes being indexed * Refactor code to simplify snapshot logic * Remove unnecessary function argument * Use ctx cancellation for handling signals * Add descriptive comments Co-authored-by: prathamesh0 <prathamesh.musale0@gmail.com>
2022-08-03 11:35:04 +00:00
func (s *Service) CreateLatestSnapshot(workers uint, watchedAddresses map[common.Address]struct{}) error {
2022-03-09 13:37:33 +00:00
log.Info("Creating snapshot at head")
2020-08-20 10:23:36 +00:00
hash := rawdb.ReadHeadHeaderHash(s.ethDB)
height := rawdb.ReadHeaderNumber(s.ethDB, hash)
if height == nil {
return fmt.Errorf("unable to read header height for header hash %s", hash.String())
2020-07-16 15:02:16 +00:00
}
Account selective snapshot (#46) * snapshotter ignores nodes not along a path along those derived from a list of account addresses if one is provided * config and env updates * cmd update * Encode watched address path bytes to hex for comparison * actually ignore the subtries that are not along the paths of interest * Fixes for account selective snapshot * Use non-concurrent iterator when having a single worker * Only index root node when starting path of an iterator is nil * Upgrade deps * Avoid tracking iterators and skip recovery test * Fix recovery mechanism, use sync Map instead of buffered channels * Add test for account selective snapshot * Continue traversal with concurrent iterators with starting path nil * Use errgroup to simplify error handling with concurrent iterators * Check if all the nodes are indexed in the recovery test * Use concurrency safe sync Map in account selective snapshot test * Only track concurrent iterators and refactor code * Fix node and recovered path comparison * Revert back to using buffered channels for tracking iterators * Add a metric to monitor number of active iterators * Update docs * Update seeked path after node is processed * Return error on context cancellation from subtrie iteration * Add tests for account selective snapshot recovery * Explicity enforce concurrent iterator bounds to avoid duplicate nodes * Update full snapshot test to check nodes being indexed * Refactor code to simplify snapshot logic * Remove unnecessary function argument * Use ctx cancellation for handling signals * Add descriptive comments Co-authored-by: prathamesh0 <prathamesh.musale0@gmail.com>
2022-08-03 11:35:04 +00:00
return s.CreateSnapshot(SnapshotParams{Height: *height, Workers: workers, WatchedAddresses: watchedAddresses})
2020-08-20 10:23:36 +00:00
}
type nodeResult struct {
node Node
2020-08-20 10:23:36 +00:00
elements []interface{}
}
Account selective snapshot (#46) * snapshotter ignores nodes not along a path along those derived from a list of account addresses if one is provided * config and env updates * cmd update * Encode watched address path bytes to hex for comparison * actually ignore the subtries that are not along the paths of interest * Fixes for account selective snapshot * Use non-concurrent iterator when having a single worker * Only index root node when starting path of an iterator is nil * Upgrade deps * Avoid tracking iterators and skip recovery test * Fix recovery mechanism, use sync Map instead of buffered channels * Add test for account selective snapshot * Continue traversal with concurrent iterators with starting path nil * Use errgroup to simplify error handling with concurrent iterators * Check if all the nodes are indexed in the recovery test * Use concurrency safe sync Map in account selective snapshot test * Only track concurrent iterators and refactor code * Fix node and recovered path comparison * Revert back to using buffered channels for tracking iterators * Add a metric to monitor number of active iterators * Update docs * Update seeked path after node is processed * Return error on context cancellation from subtrie iteration * Add tests for account selective snapshot recovery * Explicity enforce concurrent iterator bounds to avoid duplicate nodes * Update full snapshot test to check nodes being indexed * Refactor code to simplify snapshot logic * Remove unnecessary function argument * Use ctx cancellation for handling signals * Add descriptive comments Co-authored-by: prathamesh0 <prathamesh.musale0@gmail.com>
2022-08-03 11:35:04 +00:00
func resolveNode(nodePath []byte, it trie.NodeIterator, trieDB *trie.Database) (*nodeResult, error) {
// "leaf" nodes are actually "value" nodes, whose parents are the actual leaves
if it.Leaf() {
return nil, nil
}
if IsNullHash(it.Hash()) {
return nil, nil
}
Account selective snapshot (#46) * snapshotter ignores nodes not along a path along those derived from a list of account addresses if one is provided * config and env updates * cmd update * Encode watched address path bytes to hex for comparison * actually ignore the subtries that are not along the paths of interest * Fixes for account selective snapshot * Use non-concurrent iterator when having a single worker * Only index root node when starting path of an iterator is nil * Upgrade deps * Avoid tracking iterators and skip recovery test * Fix recovery mechanism, use sync Map instead of buffered channels * Add test for account selective snapshot * Continue traversal with concurrent iterators with starting path nil * Use errgroup to simplify error handling with concurrent iterators * Check if all the nodes are indexed in the recovery test * Use concurrency safe sync Map in account selective snapshot test * Only track concurrent iterators and refactor code * Fix node and recovered path comparison * Revert back to using buffered channels for tracking iterators * Add a metric to monitor number of active iterators * Update docs * Update seeked path after node is processed * Return error on context cancellation from subtrie iteration * Add tests for account selective snapshot recovery * Explicity enforce concurrent iterator bounds to avoid duplicate nodes * Update full snapshot test to check nodes being indexed * Refactor code to simplify snapshot logic * Remove unnecessary function argument * Use ctx cancellation for handling signals * Add descriptive comments Co-authored-by: prathamesh0 <prathamesh.musale0@gmail.com>
2022-08-03 11:35:04 +00:00
// use full node path
// (it.Path() will give partial path in case of subtrie iterators)
path := make([]byte, len(nodePath))
copy(path, nodePath)
2022-01-11 00:06:29 +00:00
n, err := trieDB.Node(it.Hash())
2020-07-01 18:44:59 +00:00
if err != nil {
2020-08-20 10:23:36 +00:00
return nil, err
2020-07-01 18:44:59 +00:00
}
2022-01-11 00:06:29 +00:00
var elements []interface{}
if err := rlp.DecodeBytes(n, &elements); err != nil {
2020-08-20 10:23:36 +00:00
return nil, err
}
2022-01-11 00:06:29 +00:00
ty, err := CheckKeyType(elements)
2020-07-01 18:44:59 +00:00
if err != nil {
2020-08-20 10:23:36 +00:00
return nil, err
2020-07-01 18:44:59 +00:00
}
2020-08-20 10:23:36 +00:00
return &nodeResult{
node: Node{
NodeType: ty,
Path: path,
Value: n,
2020-08-20 10:23:36 +00:00
},
2022-01-11 00:06:29 +00:00
elements: elements,
2020-08-20 10:23:36 +00:00
}, nil
2020-07-01 18:44:59 +00:00
}
Account selective snapshot (#46) * snapshotter ignores nodes not along a path along those derived from a list of account addresses if one is provided * config and env updates * cmd update * Encode watched address path bytes to hex for comparison * actually ignore the subtries that are not along the paths of interest * Fixes for account selective snapshot * Use non-concurrent iterator when having a single worker * Only index root node when starting path of an iterator is nil * Upgrade deps * Avoid tracking iterators and skip recovery test * Fix recovery mechanism, use sync Map instead of buffered channels * Add test for account selective snapshot * Continue traversal with concurrent iterators with starting path nil * Use errgroup to simplify error handling with concurrent iterators * Check if all the nodes are indexed in the recovery test * Use concurrency safe sync Map in account selective snapshot test * Only track concurrent iterators and refactor code * Fix node and recovered path comparison * Revert back to using buffered channels for tracking iterators * Add a metric to monitor number of active iterators * Update docs * Update seeked path after node is processed * Return error on context cancellation from subtrie iteration * Add tests for account selective snapshot recovery * Explicity enforce concurrent iterator bounds to avoid duplicate nodes * Update full snapshot test to check nodes being indexed * Refactor code to simplify snapshot logic * Remove unnecessary function argument * Use ctx cancellation for handling signals * Add descriptive comments Co-authored-by: prathamesh0 <prathamesh.musale0@gmail.com>
2022-08-03 11:35:04 +00:00
// validPath reports whether currentPath is a prefix of at least one path in seekingPaths.
// An empty (or nil) currentPath is a prefix of every path; an empty list never matches.
func validPath(currentPath []byte, seekingPaths [][]byte) bool {
	prefixLen := len(currentPath)
	for i := range seekingPaths {
		candidate := seekingPaths[i]
		// a path shorter than currentPath cannot contain it as a prefix
		if len(candidate) < prefixLen {
			continue
		}
		if bytes.Equal(candidate[:prefixLen], currentPath) {
			return true
		}
	}
	return false
}
// createSnapshot performs traversal using the given iterator and indexes the nodes
// optionally filtering them according to a list of paths
func (s *Service) createSnapshot(ctx context.Context, it trie.NodeIterator, headerID string, height *big.Int, seekingPaths [][]byte) error {
tx, err := s.ipfsPublisher.BeginTx()
2022-01-11 00:06:29 +00:00
if err != nil {
return err
}
defer func() {
err = CommitOrRollback(tx, err)
if err != nil {
Account selective snapshot (#46) * snapshotter ignores nodes not along a path along those derived from a list of account addresses if one is provided * config and env updates * cmd update * Encode watched address path bytes to hex for comparison * actually ignore the subtries that are not along the paths of interest * Fixes for account selective snapshot * Use non-concurrent iterator when having a single worker * Only index root node when starting path of an iterator is nil * Upgrade deps * Avoid tracking iterators and skip recovery test * Fix recovery mechanism, use sync Map instead of buffered channels * Add test for account selective snapshot * Continue traversal with concurrent iterators with starting path nil * Use errgroup to simplify error handling with concurrent iterators * Check if all the nodes are indexed in the recovery test * Use concurrency safe sync Map in account selective snapshot test * Only track concurrent iterators and refactor code * Fix node and recovered path comparison * Revert back to using buffered channels for tracking iterators * Add a metric to monitor number of active iterators * Update docs * Update seeked path after node is processed * Return error on context cancellation from subtrie iteration * Add tests for account selective snapshot recovery * Explicity enforce concurrent iterator bounds to avoid duplicate nodes * Update full snapshot test to check nodes being indexed * Refactor code to simplify snapshot logic * Remove unnecessary function argument * Use ctx cancellation for handling signals * Add descriptive comments Co-authored-by: prathamesh0 <prathamesh.musale0@gmail.com>
2022-08-03 11:35:04 +00:00
log.Errorf("CommitOrRollback failed: %s", err)
}
}()
2022-01-11 00:06:29 +00:00
Account selective snapshot (#46) * snapshotter ignores nodes not along a path along those derived from a list of account addresses if one is provided * config and env updates * cmd update * Encode watched address path bytes to hex for comparison * actually ignore the subtries that are not along the paths of interest * Fixes for account selective snapshot * Use non-concurrent iterator when having a single worker * Only index root node when starting path of an iterator is nil * Upgrade deps * Avoid tracking iterators and skip recovery test * Fix recovery mechanism, use sync Map instead of buffered channels * Add test for account selective snapshot * Continue traversal with concurrent iterators with starting path nil * Use errgroup to simplify error handling with concurrent iterators * Check if all the nodes are indexed in the recovery test * Use concurrency safe sync Map in account selective snapshot test * Only track concurrent iterators and refactor code * Fix node and recovered path comparison * Revert back to using buffered channels for tracking iterators * Add a metric to monitor number of active iterators * Update docs * Update seeked path after node is processed * Return error on context cancellation from subtrie iteration * Add tests for account selective snapshot recovery * Explicity enforce concurrent iterator bounds to avoid duplicate nodes * Update full snapshot test to check nodes being indexed * Refactor code to simplify snapshot logic * Remove unnecessary function argument * Use ctx cancellation for handling signals * Add descriptive comments Co-authored-by: prathamesh0 <prathamesh.musale0@gmail.com>
2022-08-03 11:35:04 +00:00
// path (from recovery dump) to be seeked on recovery
// nil in case of a fresh iterator
var recoveredPath []byte
Account selective snapshot (#46) * snapshotter ignores nodes not along a path along those derived from a list of account addresses if one is provided * config and env updates * cmd update * Encode watched address path bytes to hex for comparison * actually ignore the subtries that are not along the paths of interest * Fixes for account selective snapshot * Use non-concurrent iterator when having a single worker * Only index root node when starting path of an iterator is nil * Upgrade deps * Avoid tracking iterators and skip recovery test * Fix recovery mechanism, use sync Map instead of buffered channels * Add test for account selective snapshot * Continue traversal with concurrent iterators with starting path nil * Use errgroup to simplify error handling with concurrent iterators * Check if all the nodes are indexed in the recovery test * Use concurrency safe sync Map in account selective snapshot test * Only track concurrent iterators and refactor code * Fix node and recovered path comparison * Revert back to using buffered channels for tracking iterators * Add a metric to monitor number of active iterators * Update docs * Update seeked path after node is processed * Return error on context cancellation from subtrie iteration * Add tests for account selective snapshot recovery * Explicity enforce concurrent iterator bounds to avoid duplicate nodes * Update full snapshot test to check nodes being indexed * Refactor code to simplify snapshot logic * Remove unnecessary function argument * Use ctx cancellation for handling signals * Add descriptive comments Co-authored-by: prathamesh0 <prathamesh.musale0@gmail.com>
2022-08-03 11:35:04 +00:00
// latest path seeked from the concurrent iterator
// (updated after a node processed)
// nil in case of a fresh iterator; initially holds the recovered path in case of a recovered iterator
var seekedPath *[]byte
Account selective snapshot (#46) * snapshotter ignores nodes not along a path along those derived from a list of account addresses if one is provided * config and env updates * cmd update * Encode watched address path bytes to hex for comparison * actually ignore the subtries that are not along the paths of interest * Fixes for account selective snapshot * Use non-concurrent iterator when having a single worker * Only index root node when starting path of an iterator is nil * Upgrade deps * Avoid tracking iterators and skip recovery test * Fix recovery mechanism, use sync Map instead of buffered channels * Add test for account selective snapshot * Continue traversal with concurrent iterators with starting path nil * Use errgroup to simplify error handling with concurrent iterators * Check if all the nodes are indexed in the recovery test * Use concurrency safe sync Map in account selective snapshot test * Only track concurrent iterators and refactor code * Fix node and recovered path comparison * Revert back to using buffered channels for tracking iterators * Add a metric to monitor number of active iterators * Update docs * Update seeked path after node is processed * Return error on context cancellation from subtrie iteration * Add tests for account selective snapshot recovery * Explicity enforce concurrent iterator bounds to avoid duplicate nodes * Update full snapshot test to check nodes being indexed * Refactor code to simplify snapshot logic * Remove unnecessary function argument * Use ctx cancellation for handling signals * Add descriptive comments Co-authored-by: prathamesh0 <prathamesh.musale0@gmail.com>
2022-08-03 11:35:04 +00:00
// end path for the concurrent iterator
var endPath []byte
if iter, ok := it.(*trackedIter); ok {
seekedPath = &iter.seekedPath
recoveredPath = append(recoveredPath, *seekedPath...)
endPath = iter.endPath
} else {
return errors.New("untracked iterator")
}
return s.createSubTrieSnapshot(ctx, tx, nil, it, recoveredPath, seekedPath, endPath, headerID, height, seekingPaths)
}
// createSubTrieSnapshot processes nodes at the next level of a trie using the given subtrie iterator
// continually updating seekedPath with path of the latest processed node
func (s *Service) createSubTrieSnapshot(ctx context.Context, tx Tx, prefixPath []byte, subTrieIt trie.NodeIterator, recoveredPath []byte, seekedPath *[]byte, endPath []byte, headerID string, height *big.Int, seekingPaths [][]byte) error {
prom.IncActiveIterCount()
defer prom.DecActiveIterCount()
// descend in the first loop iteration to reach first child node
descend := true
for {
select {
case <-ctx.Done():
return errors.New("ctx cancelled")
default:
if ok := subTrieIt.Next(descend); !ok {
return subTrieIt.Error()
2020-07-01 23:07:56 +00:00
}
Account selective snapshot (#46) * snapshotter ignores nodes not along a path along those derived from a list of account addresses if one is provided * config and env updates * cmd update * Encode watched address path bytes to hex for comparison * actually ignore the subtries that are not along the paths of interest * Fixes for account selective snapshot * Use non-concurrent iterator when having a single worker * Only index root node when starting path of an iterator is nil * Upgrade deps * Avoid tracking iterators and skip recovery test * Fix recovery mechanism, use sync Map instead of buffered channels * Add test for account selective snapshot * Continue traversal with concurrent iterators with starting path nil * Use errgroup to simplify error handling with concurrent iterators * Check if all the nodes are indexed in the recovery test * Use concurrency safe sync Map in account selective snapshot test * Only track concurrent iterators and refactor code * Fix node and recovered path comparison * Revert back to using buffered channels for tracking iterators * Add a metric to monitor number of active iterators * Update docs * Update seeked path after node is processed * Return error on context cancellation from subtrie iteration * Add tests for account selective snapshot recovery * Explicity enforce concurrent iterator bounds to avoid duplicate nodes * Update full snapshot test to check nodes being indexed * Refactor code to simplify snapshot logic * Remove unnecessary function argument * Use ctx cancellation for handling signals * Add descriptive comments Co-authored-by: prathamesh0 <prathamesh.musale0@gmail.com>
2022-08-03 11:35:04 +00:00
// to avoid descending further
descend = false
// move on to next node if current path is empty
// occurs when reaching root node or just before reaching the first child of a subtrie in case of some concurrent iterators
if bytes.Equal(subTrieIt.Path(), []byte{}) {
// if node path is empty and prefix is nil, it's the root node
if prefixPath == nil {
// create snapshot of node, if it is a leaf this will also create snapshot of entire storage trie
if err := s.createNodeSnapshot(tx, subTrieIt.Path(), subTrieIt, headerID, height); err != nil {
return err
}
updateSeekedPath(seekedPath, subTrieIt.Path())
2022-01-08 03:32:45 +00:00
}
Account selective snapshot (#46) * snapshotter ignores nodes not along a path along those derived from a list of account addresses if one is provided * config and env updates * cmd update * Encode watched address path bytes to hex for comparison * actually ignore the subtries that are not along the paths of interest * Fixes for account selective snapshot * Use non-concurrent iterator when having a single worker * Only index root node when starting path of an iterator is nil * Upgrade deps * Avoid tracking iterators and skip recovery test * Fix recovery mechanism, use sync Map instead of buffered channels * Add test for account selective snapshot * Continue traversal with concurrent iterators with starting path nil * Use errgroup to simplify error handling with concurrent iterators * Check if all the nodes are indexed in the recovery test * Use concurrency safe sync Map in account selective snapshot test * Only track concurrent iterators and refactor code * Fix node and recovered path comparison * Revert back to using buffered channels for tracking iterators * Add a metric to monitor number of active iterators * Update docs * Update seeked path after node is processed * Return error on context cancellation from subtrie iteration * Add tests for account selective snapshot recovery * Explicity enforce concurrent iterator bounds to avoid duplicate nodes * Update full snapshot test to check nodes being indexed * Refactor code to simplify snapshot logic * Remove unnecessary function argument * Use ctx cancellation for handling signals * Add descriptive comments Co-authored-by: prathamesh0 <prathamesh.musale0@gmail.com>
2022-08-03 11:35:04 +00:00
if ok := subTrieIt.Next(true); !ok {
// return if no further nodes available
return subTrieIt.Error()
2022-01-08 03:32:45 +00:00
}
}
Account selective snapshot (#46) * snapshotter ignores nodes not along a path along those derived from a list of account addresses if one is provided * config and env updates * cmd update * Encode watched address path bytes to hex for comparison * actually ignore the subtries that are not along the paths of interest * Fixes for account selective snapshot * Use non-concurrent iterator when having a single worker * Only index root node when starting path of an iterator is nil * Upgrade deps * Avoid tracking iterators and skip recovery test * Fix recovery mechanism, use sync Map instead of buffered channels * Add test for account selective snapshot * Continue traversal with concurrent iterators with starting path nil * Use errgroup to simplify error handling with concurrent iterators * Check if all the nodes are indexed in the recovery test * Use concurrency safe sync Map in account selective snapshot test * Only track concurrent iterators and refactor code * Fix node and recovered path comparison * Revert back to using buffered channels for tracking iterators * Add a metric to monitor number of active iterators * Update docs * Update seeked path after node is processed * Return error on context cancellation from subtrie iteration * Add tests for account selective snapshot recovery * Explicity enforce concurrent iterator bounds to avoid duplicate nodes * Update full snapshot test to check nodes being indexed * Refactor code to simplify snapshot logic * Remove unnecessary function argument * Use ctx cancellation for handling signals * Add descriptive comments Co-authored-by: prathamesh0 <prathamesh.musale0@gmail.com>
2022-08-03 11:35:04 +00:00
// create the full node path as it.Path() doesn't include the path before subtrie root
nodePath := append(prefixPath, subTrieIt.Path()...)
// check iterator upper bound before processing the node
// required to avoid processing duplicate nodes:
// if a node is considered more than once,
// it's whole subtrie is re-processed giving large number of duplicate nodoes
if !checkUpperPathBound(nodePath, endPath) {
// fmt.Println("failed checkUpperPathBound", nodePath, endPath)
// explicitly stop the iterator in tracker if upper bound check fails
// required since it won't be marked as stopped if further nodes are still available
if trackedSubtrieIt, ok := subTrieIt.(*trackedIter); ok {
s.tracker.stopIter(trackedSubtrieIt)
}
return subTrieIt.Error()
2022-01-08 03:32:45 +00:00
}
Account selective snapshot (#46) * snapshotter ignores nodes not along a path along those derived from a list of account addresses if one is provided * config and env updates * cmd update * Encode watched address path bytes to hex for comparison * actually ignore the subtries that are not along the paths of interest * Fixes for account selective snapshot * Use non-concurrent iterator when having a single worker * Only index root node when starting path of an iterator is nil * Upgrade deps * Avoid tracking iterators and skip recovery test * Fix recovery mechanism, use sync Map instead of buffered channels * Add test for account selective snapshot * Continue traversal with concurrent iterators with starting path nil * Use errgroup to simplify error handling with concurrent iterators * Check if all the nodes are indexed in the recovery test * Use concurrency safe sync Map in account selective snapshot test * Only track concurrent iterators and refactor code * Fix node and recovered path comparison * Revert back to using buffered channels for tracking iterators * Add a metric to monitor number of active iterators * Update docs * Update seeked path after node is processed * Return error on context cancellation from subtrie iteration * Add tests for account selective snapshot recovery * Explicity enforce concurrent iterator bounds to avoid duplicate nodes * Update full snapshot test to check nodes being indexed * Refactor code to simplify snapshot logic * Remove unnecessary function argument * Use ctx cancellation for handling signals * Add descriptive comments Co-authored-by: prathamesh0 <prathamesh.musale0@gmail.com>
2022-08-03 11:35:04 +00:00
// skip the current node if it's before recovered path and not along the recovered path
// nodes at the same level that are before recovered path are ignored to avoid duplicate nodes
// however, nodes along the recovered path are re-considered for redundancy
if bytes.Compare(recoveredPath, nodePath) > 0 &&
// a node is along the recovered path if it's path is shorter or equal in length
// and is part of the recovered path
!(len(nodePath) <= len(recoveredPath) && bytes.Equal(recoveredPath[:len(nodePath)], nodePath)) {
continue
}
// ignore node if it is not along paths of interest
if s.watchingAddresses && !validPath(nodePath, seekingPaths) {
// consider this node as processed since it is getting ignored
// and update the seeked path
updateSeekedPath(seekedPath, nodePath)
// move on to the next node
continue
}
// if the node is along paths of interest
// create snapshot of node, if it is a leaf this will also create snapshot of entire storage trie
if err := s.createNodeSnapshot(tx, nodePath, subTrieIt, headerID, height); err != nil {
return err
}
// update seeked path after node has been processed
updateSeekedPath(seekedPath, nodePath)
// create an iterator to traverse and process the next level of this subTrie
nextSubTrieIt, err := s.createSubTrieIt(nodePath, subTrieIt.Hash(), recoveredPath)
if err != nil {
return err
}
// pass on the seekedPath of the tracked concurrent iterator to be updated
if err := s.createSubTrieSnapshot(ctx, tx, nodePath, nextSubTrieIt, recoveredPath, seekedPath, endPath, headerID, height, seekingPaths); err != nil {
2020-07-01 23:07:56 +00:00
return err
}
2020-08-20 10:23:36 +00:00
}
2020-07-01 18:44:59 +00:00
}
Account selective snapshot (#46) * snapshotter ignores nodes not along a path along those derived from a list of account addresses if one is provided * config and env updates * cmd update * Encode watched address path bytes to hex for comparison * actually ignore the subtries that are not along the paths of interest * Fixes for account selective snapshot * Use non-concurrent iterator when having a single worker * Only index root node when starting path of an iterator is nil * Upgrade deps * Avoid tracking iterators and skip recovery test * Fix recovery mechanism, use sync Map instead of buffered channels * Add test for account selective snapshot * Continue traversal with concurrent iterators with starting path nil * Use errgroup to simplify error handling with concurrent iterators * Check if all the nodes are indexed in the recovery test * Use concurrency safe sync Map in account selective snapshot test * Only track concurrent iterators and refactor code * Fix node and recovered path comparison * Revert back to using buffered channels for tracking iterators * Add a metric to monitor number of active iterators * Update docs * Update seeked path after node is processed * Return error on context cancellation from subtrie iteration * Add tests for account selective snapshot recovery * Explicity enforce concurrent iterator bounds to avoid duplicate nodes * Update full snapshot test to check nodes being indexed * Refactor code to simplify snapshot logic * Remove unnecessary function argument * Use ctx cancellation for handling signals * Add descriptive comments Co-authored-by: prathamesh0 <prathamesh.musale0@gmail.com>
2022-08-03 11:35:04 +00:00
}
// createSubTrieIt opens the trie rooted at hash and returns an iterator over it.
// When the recovered path runs through prefixPath and extends beyond it, the iterator is
// positioned using the recovered path's element at the corresponding level (if available);
// otherwise it starts from an empty start path.
func (s *Service) createSubTrieIt(prefixPath []byte, hash common.Hash, recoveredPath []byte) (trie.NodeIterator, error) {
	var startPath []byte

	// skipping ahead is applicable only if the recovered path is strictly longer than the
	// prefix path and includes it; such a path always compares greater than the prefix path
	depth := len(prefixPath)
	if len(recoveredPath) > depth && bytes.HasPrefix(recoveredPath, prefixPath) {
		// the element of the recovered path right below the prefix path
		startPath = []byte{recoveredPath[depth]}

		// force the lower bound path to an even length
		// (required by HexToKeyBytes())
		if len(startPath)%2 != 0 {
			// decrement first to avoid skipped nodes
			decrementPath(startPath)
			startPath = append(startPath, 0)
		}
	}

	// open the subtrie with the given hash and iterate from the computed start key
	subTrie, err := s.stateDB.OpenTrie(hash)
	if err != nil {
		return nil, err
	}
	startKey := iter.HexToKeyBytes(startPath)
	return subTrie.NodeIterator(startKey), nil
}
// createNodeSnapshot indexes the current node
// entire storage trie is also indexed (if available)
func (s *Service) createNodeSnapshot(tx Tx, path []byte, it trie.NodeIterator, headerID string, height *big.Int) error {
res, err := resolveNode(path, it, s.stateDB.TrieDB())
if err != nil {
return err
}
if res == nil {
return nil
}
tx, err = s.ipfsPublisher.PrepareTxForBatch(tx, s.maxBatchSize)
if err != nil {
return err
}
switch res.node.NodeType {
case Leaf:
// if the node is a leaf, decode the account and publish the associated storage trie
// nodes if there are any
var account types.StateAccount
if err := rlp.DecodeBytes(res.elements[1].([]byte), &account); err != nil {
return fmt.Errorf(
"error decoding account for leaf node at path %x nerror: %v", res.node.Path, err)
}
partialPath := trie.CompactToHex(res.elements[0].([]byte))
valueNodePath := append(res.node.Path, partialPath...)
encodedPath := trie.HexToCompact(valueNodePath)
leafKey := encodedPath[1:]
res.node.Key = common.BytesToHash(leafKey)
if err := s.ipfsPublisher.PublishStateNode(&res.node, headerID, height, tx); err != nil {
return err
}
// publish any non-nil code referenced by codehash
if !bytes.Equal(account.CodeHash, emptyCodeHash) {
codeHash := common.BytesToHash(account.CodeHash)
codeBytes := rawdb.ReadCode(s.ethDB, codeHash)
if len(codeBytes) == 0 {
log.Error("Code is missing", "account", common.BytesToHash(it.LeafKey()))
return errors.New("missing code")
}
if err = s.ipfsPublisher.PublishCode(height, codeHash, codeBytes, tx); err != nil {
return err
}
}
if _, err = s.storageSnapshot(account.Root, headerID, height, res.node.Path, tx); err != nil {
return fmt.Errorf("failed building storage snapshot for account %+v\r\nerror: %w", account, err)
}
case Extension, Branch:
res.node.Key = common.BytesToHash([]byte{})
if err := s.ipfsPublisher.PublishStateNode(&res.node, headerID, height, tx); err != nil {
return err
}
default:
return errors.New("unexpected node type")
}
2020-08-03 15:46:35 +00:00
return it.Error()
2020-07-01 18:44:59 +00:00
}
2022-01-08 03:32:45 +00:00
// Full-trie concurrent snapshot
Account selective snapshot (#46) * snapshotter ignores nodes not along a path along those derived from a list of account addresses if one is provided * config and env updates * cmd update * Encode watched address path bytes to hex for comparison * actually ignore the subtries that are not along the paths of interest * Fixes for account selective snapshot * Use non-concurrent iterator when having a single worker * Only index root node when starting path of an iterator is nil * Upgrade deps * Avoid tracking iterators and skip recovery test * Fix recovery mechanism, use sync Map instead of buffered channels * Add test for account selective snapshot * Continue traversal with concurrent iterators with starting path nil * Use errgroup to simplify error handling with concurrent iterators * Check if all the nodes are indexed in the recovery test * Use concurrency safe sync Map in account selective snapshot test * Only track concurrent iterators and refactor code * Fix node and recovered path comparison * Revert back to using buffered channels for tracking iterators * Add a metric to monitor number of active iterators * Update docs * Update seeked path after node is processed * Return error on context cancellation from subtrie iteration * Add tests for account selective snapshot recovery * Explicity enforce concurrent iterator bounds to avoid duplicate nodes * Update full snapshot test to check nodes being indexed * Refactor code to simplify snapshot logic * Remove unnecessary function argument * Use ctx cancellation for handling signals * Add descriptive comments Co-authored-by: prathamesh0 <prathamesh.musale0@gmail.com>
2022-08-03 11:35:04 +00:00
func (s *Service) createSnapshotAsync(ctx context.Context, iters []trie.NodeIterator, headerID string, height *big.Int, seekingPaths [][]byte) error {
// use errgroup with a context to stop all concurrent iterators if one runs into an error
// each concurrent iterator completes processing it's current node before stopping
g, ctx := errgroup.WithContext(ctx)
for _, it := range iters {
Account selective snapshot (#46) * snapshotter ignores nodes not along a path along those derived from a list of account addresses if one is provided * config and env updates * cmd update * Encode watched address path bytes to hex for comparison * actually ignore the subtries that are not along the paths of interest * Fixes for account selective snapshot * Use non-concurrent iterator when having a single worker * Only index root node when starting path of an iterator is nil * Upgrade deps * Avoid tracking iterators and skip recovery test * Fix recovery mechanism, use sync Map instead of buffered channels * Add test for account selective snapshot * Continue traversal with concurrent iterators with starting path nil * Use errgroup to simplify error handling with concurrent iterators * Check if all the nodes are indexed in the recovery test * Use concurrency safe sync Map in account selective snapshot test * Only track concurrent iterators and refactor code * Fix node and recovered path comparison * Revert back to using buffered channels for tracking iterators * Add a metric to monitor number of active iterators * Update docs * Update seeked path after node is processed * Return error on context cancellation from subtrie iteration * Add tests for account selective snapshot recovery * Explicity enforce concurrent iterator bounds to avoid duplicate nodes * Update full snapshot test to check nodes being indexed * Refactor code to simplify snapshot logic * Remove unnecessary function argument * Use ctx cancellation for handling signals * Add descriptive comments Co-authored-by: prathamesh0 <prathamesh.musale0@gmail.com>
2022-08-03 11:35:04 +00:00
func(it trie.NodeIterator) {
g.Go(func() error {
return s.createSnapshot(ctx, it, headerID, height, seekingPaths)
})
}(it)
2020-09-06 07:36:36 +00:00
}
Account selective snapshot (#46) * snapshotter ignores nodes not along a path along those derived from a list of account addresses if one is provided * config and env updates * cmd update * Encode watched address path bytes to hex for comparison * actually ignore the subtries that are not along the paths of interest * Fixes for account selective snapshot * Use non-concurrent iterator when having a single worker * Only index root node when starting path of an iterator is nil * Upgrade deps * Avoid tracking iterators and skip recovery test * Fix recovery mechanism, use sync Map instead of buffered channels * Add test for account selective snapshot * Continue traversal with concurrent iterators with starting path nil * Use errgroup to simplify error handling with concurrent iterators * Check if all the nodes are indexed in the recovery test * Use concurrency safe sync Map in account selective snapshot test * Only track concurrent iterators and refactor code * Fix node and recovered path comparison * Revert back to using buffered channels for tracking iterators * Add a metric to monitor number of active iterators * Update docs * Update seeked path after node is processed * Return error on context cancellation from subtrie iteration * Add tests for account selective snapshot recovery * Explicity enforce concurrent iterator bounds to avoid duplicate nodes * Update full snapshot test to check nodes being indexed * Refactor code to simplify snapshot logic * Remove unnecessary function argument * Use ctx cancellation for handling signals * Add descriptive comments Co-authored-by: prathamesh0 <prathamesh.musale0@gmail.com>
2022-08-03 11:35:04 +00:00
return g.Wait()
2020-08-20 10:23:36 +00:00
}
func (s *Service) storageSnapshot(sr common.Hash, headerID string, height *big.Int, statePath []byte, tx Tx) (Tx, error) {
2020-07-01 18:44:59 +00:00
if bytes.Equal(sr.Bytes(), emptyContractRoot.Bytes()) {
return tx, nil
2020-07-01 18:44:59 +00:00
}
2020-07-01 18:44:59 +00:00
sTrie, err := s.stateDB.OpenTrie(sr)
if err != nil {
return nil, err
2020-07-01 18:44:59 +00:00
}
2020-07-01 18:44:59 +00:00
it := sTrie.NodeIterator(make([]byte, 0))
for it.Next(true) {
Account selective snapshot (#46) * snapshotter ignores nodes not along a path along those derived from a list of account addresses if one is provided * config and env updates * cmd update * Encode watched address path bytes to hex for comparison * actually ignore the subtries that are not along the paths of interest * Fixes for account selective snapshot * Use non-concurrent iterator when having a single worker * Only index root node when starting path of an iterator is nil * Upgrade deps * Avoid tracking iterators and skip recovery test * Fix recovery mechanism, use sync Map instead of buffered channels * Add test for account selective snapshot * Continue traversal with concurrent iterators with starting path nil * Use errgroup to simplify error handling with concurrent iterators * Check if all the nodes are indexed in the recovery test * Use concurrency safe sync Map in account selective snapshot test * Only track concurrent iterators and refactor code * Fix node and recovered path comparison * Revert back to using buffered channels for tracking iterators * Add a metric to monitor number of active iterators * Update docs * Update seeked path after node is processed * Return error on context cancellation from subtrie iteration * Add tests for account selective snapshot recovery * Explicity enforce concurrent iterator bounds to avoid duplicate nodes * Update full snapshot test to check nodes being indexed * Refactor code to simplify snapshot logic * Remove unnecessary function argument * Use ctx cancellation for handling signals * Add descriptive comments Co-authored-by: prathamesh0 <prathamesh.musale0@gmail.com>
2022-08-03 11:35:04 +00:00
res, err := resolveNode(it.Path(), it, s.stateDB.TrieDB())
2020-07-01 18:44:59 +00:00
if err != nil {
return nil, err
}
if res == nil {
continue
}
tx, err = s.ipfsPublisher.PrepareTxForBatch(tx, s.maxBatchSize)
2020-07-01 18:44:59 +00:00
if err != nil {
return nil, err
2020-07-01 18:44:59 +00:00
}
2022-01-11 00:06:29 +00:00
var nodeData []byte
nodeData, err = s.stateDB.TrieDB().Node(it.Hash())
2020-07-01 18:44:59 +00:00
if err != nil {
return nil, err
2020-07-01 18:44:59 +00:00
}
res.node.Value = nodeData
2022-01-11 00:06:29 +00:00
switch res.node.NodeType {
case Leaf:
2020-08-20 10:23:36 +00:00
partialPath := trie.CompactToHex(res.elements[0].([]byte))
valueNodePath := append(res.node.Path, partialPath...)
2020-07-01 18:44:59 +00:00
encodedPath := trie.HexToCompact(valueNodePath)
leafKey := encodedPath[1:]
res.node.Key = common.BytesToHash(leafKey)
case Extension, Branch:
res.node.Key = common.BytesToHash([]byte{})
2020-07-01 18:44:59 +00:00
default:
return nil, errors.New("unexpected node type")
2020-07-01 23:07:56 +00:00
}
2022-04-19 10:19:49 +00:00
if err = s.ipfsPublisher.PublishStorageNode(&res.node, headerID, height, statePath, tx); err != nil {
2022-01-11 00:06:29 +00:00
return nil, err
2020-07-01 18:44:59 +00:00
}
}
return tx, it.Error()
2020-07-01 18:44:59 +00:00
}