diff --git a/README.md b/README.md
index 1700096..add34b1 100644
--- a/README.md
+++ b/README.md
@@ -24,4 +24,5 @@ Config format:
 
 [snapshot]
 blockHeight = 0
+divideDepth = 1
 ```
diff --git a/cmd/stateSnapshot.go b/cmd/stateSnapshot.go
index e48fdb2..1ea4451 100644
--- a/cmd/stateSnapshot.go
+++ b/cmd/stateSnapshot.go
@@ -44,13 +44,14 @@ func stateSnapshot() {
 		logWithCommand.Fatal(err)
 	}
 	height := viper.GetInt64("snapshot.blockHeight")
-	doAsync := viper.GetBool("snapshot.async")
+	divideDepth := viper.GetInt("snapshot.divideDepth")
+	params := snapshot.SnapshotParams{DivideDepth: divideDepth}
 	if height < 0 {
-		if err := snapshotService.CreateLatestSnapshot(doAsync); err != nil {
+		if err := snapshotService.CreateLatestSnapshot(params); err != nil {
 			logWithCommand.Fatal(err)
 		}
 	} else {
-		params := snapshot.SnapshotParams{Height: uint64(height), Async: doAsync}
+		params.Height = uint64(height)
 		if err := snapshotService.CreateSnapshot(params); err != nil {
 			logWithCommand.Fatal(err)
 		}
@@ -64,10 +65,10 @@ func init() {
 	stateSnapshotCmd.PersistentFlags().String("leveldb-path", "", "path to primary datastore")
 	stateSnapshotCmd.PersistentFlags().String("ancient-path", "", "path to ancient datastore")
 	stateSnapshotCmd.PersistentFlags().String("block-height", "", "blockheight to extract state at")
-	stateSnapshotCmd.PersistentFlags().Bool("async", false, "use the async iterator")
+	stateSnapshotCmd.PersistentFlags().Int("divide-depth", 0, "trie depth to divide concurrent work at")
 
 	viper.BindPFlag("leveldb.path", stateSnapshotCmd.PersistentFlags().Lookup("leveldb-path"))
 	viper.BindPFlag("leveldb.ancient", stateSnapshotCmd.PersistentFlags().Lookup("ancient-path"))
 	viper.BindPFlag("snapshot.blockHeight", stateSnapshotCmd.PersistentFlags().Lookup("block-height"))
-	viper.BindPFlag("snapshot.async", stateSnapshotCmd.PersistentFlags().Lookup("async"))
+	viper.BindPFlag("snapshot.divideDepth", stateSnapshotCmd.PersistentFlags().Lookup("divide-depth"))
 }
diff --git a/go.mod b/go.mod
index 64b6473..cd885bc 100644
--- a/go.mod
+++ b/go.mod
@@ -10,7 +10,10 @@ require (
 	github.com/sirupsen/logrus v1.6.0
 	github.com/spf13/cobra v1.0.0
 	github.com/spf13/viper v1.7.0
+	github.com/vulcanize/go-eth-state-node-iterator v0.1.0
 	github.com/vulcanize/ipfs-blockchain-watcher v0.0.11-alpha
 )
 
 replace github.com/ethereum/go-ethereum v1.9.11 => github.com/vulcanize/go-ethereum v1.9.11-statediff-0.0.2
+
+replace github.com/vulcanize/go-eth-state-node-iterator v0.1.0 => /home/roy/vulcanize/go-eth-state-node-iterator
diff --git a/pkg/snapshot/service.go b/pkg/snapshot/service.go
index d831338..bcee440 100644
--- a/pkg/snapshot/service.go
+++ b/pkg/snapshot/service.go
@@ -31,6 +31,7 @@ import (
 	"github.com/sirupsen/logrus"
 
 	"github.com/vulcanize/ipfs-blockchain-watcher/pkg/postgres"
+	iter "github.com/vulcanize/go-eth-state-node-iterator/pkg/iterator"
 )
 
 var (
@@ -66,54 +67,45 @@ func NewSnapshotService(con ServiceConfig) (*Service, error) {
 
 type SnapshotParams struct {
 	Height uint64
-	Async  bool
+	DivideDepth int
 }
 
-type workerParams struct {
-	trieDB    *trie.Database
-	headerID  int64
-	waitGroup *sync.WaitGroup
-}
-
-func (s *Service) CreateSnapshot(par SnapshotParams) error {
+func (s *Service) CreateSnapshot(params SnapshotParams) error {
 	// extract header from lvldb and publish to PG-IPFS
 	// hold onto the headerID so that we can link the state nodes to this header
-	logrus.Infof("Creating snapshot at height %d", par.Height)
-	hash := rawdb.ReadCanonicalHash(s.ethDB, par.Height)
-	header := rawdb.ReadHeader(s.ethDB, hash, par.Height)
+	logrus.Infof("Creating snapshot at height %d", params.Height)
+	hash := rawdb.ReadCanonicalHash(s.ethDB, params.Height)
+	header := rawdb.ReadHeader(s.ethDB, hash, params.Height)
 	if header == nil {
-		return fmt.Errorf("unable to read canonical header at height %d", par.Height)
+		return fmt.Errorf("unable to read canonical header at height %d", params.Height)
 	}
 	headerID, err := s.ipfsPublisher.PublishHeader(header)
 	if err != nil {
 		return err
 	}
+
 	t, err := s.stateDB.OpenTrie(header.Root)
 	if err != nil {
 		return err
 	}
-	trieDB := s.stateDB.TrieDB()
-	params := workerParams{trieDB: trieDB, headerID: headerID}
-	if par.Async {
-		var wg sync.WaitGroup
-		params.waitGroup = &wg
-		return s.createSnapshotAsync(t.NodeIterator([]byte{}), 0, params)
-		wg.Wait()
+	if params.DivideDepth > 0 {
+		return s.createSnapshotAsync(t, headerID, params.DivideDepth)
 	} else {
-		return s.createSnapshot(t.NodeIterator([]byte{}), params)
+		return s.createSnapshot(t.NodeIterator(nil), headerID)
 	}
 	return nil
 }
 
 // Create snapshot up to head (ignores height param)
-func (s *Service) CreateLatestSnapshot(doAsync bool) error {
+func (s *Service) CreateLatestSnapshot(params SnapshotParams) error {
 	logrus.Info("Creating snapshot at head")
 	hash := rawdb.ReadHeadHeaderHash(s.ethDB)
 	height := rawdb.ReadHeaderNumber(s.ethDB, hash)
 	if height == nil {
 		return fmt.Errorf("unable to read header height for header hash %s", hash.String())
 	}
-	return s.CreateSnapshot(SnapshotParams{Height: *height, Async: doAsync})
+	params.Height = *height
+	return s.CreateSnapshot(params)
 }
 
 // cache the elements
@@ -122,7 +114,7 @@ type nodeResult struct {
 	elements []interface{}
 }
 
-func resolveNode(it trie.NodeIterator, trieDB *trie.Database) (*nodeResult, error) {
+func resolveNode(it iter.NodeIterator, trieDB *trie.Database) (*nodeResult, error) {
 	nodePath := make([]byte, len(it.Path()))
 	copy(nodePath, it.Path())
 	node, err := trieDB.Node(it.Hash())
@@ -147,17 +139,14 @@ func resolveNode(it trie.NodeIterator, trieDB *trie.Database) (*nodeResult, erro
 	}, nil
 }
 
-
-func (s *Service) processNode(it trie.NodeIterator, par workerParams) error {
-	fmt.Printf("    [!]: %v\n", it.Path())
-
+func (s *Service) processNode(it iter.NodeIterator, headerID int64) error {
 	if it.Leaf() { // "leaf" nodes are actually "value" nodes, whose parents are the actual leaves
 		return nil
 	}
 	if bytes.Equal(nullHash.Bytes(), it.Hash().Bytes()) {
 		return nil
 	}
-	res, err := resolveNode(it, par.trieDB)
+	res, err := resolveNode(it, s.stateDB.TrieDB())
 	if err != nil {
 		return err
 	}
@@ -174,7 +163,7 @@ func (s *Service) processNode(it trie.NodeIterator, par workerParams) error {
 			encodedPath := trie.HexToCompact(valueNodePath)
 			leafKey := encodedPath[1:]
 			res.node.Key = common.BytesToHash(leafKey)
-			stateID, err := s.ipfsPublisher.PublishStateNode(res.node, par.headerID)
+			stateID, err := s.ipfsPublisher.PublishStateNode(res.node, headerID)
 			if err != nil {
 				return err
 			}
@@ -193,7 +182,7 @@ func (s *Service) processNode(it trie.NodeIterator, par workerParams) error {
 		}
 	case Extension, Branch:
 		res.node.Key = common.BytesToHash([]byte{})
-		if _, err := s.ipfsPublisher.PublishStateNode(res.node, par.headerID); err != nil {
+		if _, err := s.ipfsPublisher.PublishStateNode(res.node, headerID); err != nil {
 			return err
 		}
 	default:
@@ -202,62 +191,42 @@ func (s *Service) processNode(it trie.NodeIterator, par workerParams) error {
 	return nil
 }
 
-func (s *Service) createSnapshot(it trie.NodeIterator, par workerParams) error {
-	if par.waitGroup != nil {
-		defer par.waitGroup.Done()
-	}
+func (s *Service) createSnapshot(it iter.NodeIterator, headerID int64) error {
 	for it.Next(true) {
-		if err := s.processNode(it, par); err != nil {
+		if err := s.processNode(it, headerID); err != nil {
 			return err
 		}
 	}
 	return it.Error()
 }
 
-// Iterate sequentially up to spawnDepth, then create workers to snapshot the subtrees
-func (s *Service) processNodeAsync(it trie.NodeIterator, depth int, par workerParams) error {
-	fmt.Printf("    [&, %v]: %v\n", depth, it.Path())
+// Full-trie snapshot using goroutines
+func (s *Service) createSnapshotAsync(tree state.Trie, headerID int64, depth int) error {
+	errors := make(chan error)
+	finished := make(chan bool)
+	var wg sync.WaitGroup
 
-	if it.Leaf() { // "leaf" nodes are actually "value" nodes, whose parents are the actual leaves
-		return nil
-	}
-	if bytes.Equal(nullHash.Bytes(), it.Hash().Bytes()) {
-		return nil
-	}
-	res, err := resolveNode(it, par.trieDB)
-	if err != nil {
+	iter.VisitSubtries(tree, depth, func(it iter.NodeIterator) {
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
+			if err := s.createSnapshot(it, headerID); err != nil {
+				errors <- err
+			}
+		}()
+	})
+
+	go func() {
+		defer close(finished)
+		wg.Wait()
+	}()
+
+	select {
+	case <-finished:
+		break
+	case err := <-errors:
 		return err
 	}
-	switch res.node.NodeType {
-	case Leaf:
-		depth = spawnDepth // if it's a leaf, just spawn early
-	case Extension, Branch:
-		depth += 1
-	default:
-		return errors.New("unexpected node type")
-	}
-
-	// get a new iterator for the child
-	// XXX
-	t, err := s.stateDB.OpenTrie(it.Hash())
-	return s.createSnapshotAsync(t.NodeIterator(nil), depth, par)
-}
-
-func (s *Service) createSnapshotAsync(it trie.NodeIterator, depth int, par workerParams) error {
-	if depth >= spawnDepth {
-		print(" worker handoff\n")
-		par.waitGroup.Add(1)
-		go s.createSnapshot(it, par)
-		return nil
-	}
-
-	// shallow traversal so we can control the depth (?)
-	for it.Next(true) {
-		err := s.processNodeAsync(it, depth, par)
-		if err != nil {
-			return err
-		}
-	}
 	return nil
 }
 
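Note on the concurrency pattern introduced above: `createSnapshotAsync` divides the state trie into subtries at a fixed depth, walks each subtrie in its own goroutine, and surfaces failures through an error channel. The sketch below reproduces that fan-out/error-collection shape in isolation; `snapshotSubtrie` and `snapshotAll` are hypothetical stand-ins (not part of this codebase) for `Service.createSnapshot` and `createSnapshotAsync`, and buffering the error channel is one possible way to guarantee a failing worker never blocks.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// snapshotSubtrie stands in for Service.createSnapshot: it walks one subtrie
// and returns the first error it encounters.
func snapshotSubtrie(id int) error {
	if id == 3 {
		return errors.New("subtrie 3 failed")
	}
	return nil
}

// snapshotAll mirrors the fan-out in createSnapshotAsync: one goroutine per
// subtrie, a WaitGroup to detect completion, and a channel to collect errors.
// The channel is buffered to the worker count, so every goroutine can send
// its error and exit even when several workers fail at once.
func snapshotAll(workers int) error {
	errc := make(chan error, workers)
	var wg sync.WaitGroup
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			if err := snapshotSubtrie(i); err != nil {
				errc <- err
			}
		}(i)
	}
	wg.Wait()
	close(errc)
	return <-errc // first buffered error, or nil if the closed channel is empty
}

func main() {
	// With 4 workers, worker 3 fails and its error is reported.
	fmt.Println("snapshot result:", snapshotAll(4))
}
```

Since each branch node of the hexary state trie has 16 children, dividing at depth `d` yields up to 16^d subtries, so the `divideDepth = 1` shown in the README example caps the fan-out at 16 concurrent walks.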