[wip] use new state iterator

This commit is contained in:
Roy Crihfield 2020-08-22 23:38:31 -05:00
parent 072cd0bcdb
commit f651864ff8
4 changed files with 55 additions and 81 deletions

View File

@ -24,4 +24,5 @@ Config format:
[snapshot] [snapshot]
blockHeight = 0 blockHeight = 0
divideDepth = 1
``` ```

View File

@ -44,13 +44,14 @@ func stateSnapshot() {
logWithCommand.Fatal(err) logWithCommand.Fatal(err)
} }
height := viper.GetInt64("snapshot.blockHeight") height := viper.GetInt64("snapshot.blockHeight")
doAsync := viper.GetBool("snapshot.async") divideDepth := viper.GetInt("snapshot.divideDepth")
params := snapshot.SnapshotParams{DivideDepth: divideDepth}
if height < 0 { if height < 0 {
if err := snapshotService.CreateLatestSnapshot(doAsync); err != nil { if err := snapshotService.CreateLatestSnapshot(params); err != nil {
logWithCommand.Fatal(err) logWithCommand.Fatal(err)
} }
} else { } else {
params := snapshot.SnapshotParams{Height: uint64(height), Async: doAsync} params.Height = uint64(height)
if err := snapshotService.CreateSnapshot(params); err != nil { if err := snapshotService.CreateSnapshot(params); err != nil {
logWithCommand.Fatal(err) logWithCommand.Fatal(err)
} }
@ -64,10 +65,10 @@ func init() {
stateSnapshotCmd.PersistentFlags().String("leveldb-path", "", "path to primary datastore") stateSnapshotCmd.PersistentFlags().String("leveldb-path", "", "path to primary datastore")
stateSnapshotCmd.PersistentFlags().String("ancient-path", "", "path to ancient datastore") stateSnapshotCmd.PersistentFlags().String("ancient-path", "", "path to ancient datastore")
stateSnapshotCmd.PersistentFlags().String("block-height", "", "blockheight to extract state at") stateSnapshotCmd.PersistentFlags().String("block-height", "", "blockheight to extract state at")
stateSnapshotCmd.PersistentFlags().Bool("async", false, "use the async iterator") stateSnapshotCmd.PersistentFlags().Int("divide-depth", 0, "trie depth to divide concurrent work at")
viper.BindPFlag("leveldb.path", stateSnapshotCmd.PersistentFlags().Lookup("leveldb-path")) viper.BindPFlag("leveldb.path", stateSnapshotCmd.PersistentFlags().Lookup("leveldb-path"))
viper.BindPFlag("leveldb.ancient", stateSnapshotCmd.PersistentFlags().Lookup("ancient-path")) viper.BindPFlag("leveldb.ancient", stateSnapshotCmd.PersistentFlags().Lookup("ancient-path"))
viper.BindPFlag("snapshot.blockHeight", stateSnapshotCmd.PersistentFlags().Lookup("block-height")) viper.BindPFlag("snapshot.blockHeight", stateSnapshotCmd.PersistentFlags().Lookup("block-height"))
viper.BindPFlag("snapshot.async", stateSnapshotCmd.PersistentFlags().Lookup("async")) viper.BindPFlag("snapshot.divideDepth", stateSnapshotCmd.PersistentFlags().Lookup("divide-depth"))
} }

3
go.mod
View File

@ -10,7 +10,10 @@ require (
github.com/sirupsen/logrus v1.6.0 github.com/sirupsen/logrus v1.6.0
github.com/spf13/cobra v1.0.0 github.com/spf13/cobra v1.0.0
github.com/spf13/viper v1.7.0 github.com/spf13/viper v1.7.0
github.com/vulcanize/go-eth-state-node-iterator v0.1.0
github.com/vulcanize/ipfs-blockchain-watcher v0.0.11-alpha github.com/vulcanize/ipfs-blockchain-watcher v0.0.11-alpha
) )
replace github.com/ethereum/go-ethereum v1.9.11 => github.com/vulcanize/go-ethereum v1.9.11-statediff-0.0.2 replace github.com/ethereum/go-ethereum v1.9.11 => github.com/vulcanize/go-ethereum v1.9.11-statediff-0.0.2
replace github.com/vulcanize/go-eth-state-node-iterator v0.1.0 => /home/roy/vulcanize/go-eth-state-node-iterator

View File

@ -31,6 +31,7 @@ import (
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/postgres" "github.com/vulcanize/ipfs-blockchain-watcher/pkg/postgres"
iter "github.com/vulcanize/go-eth-state-node-iterator/pkg/iterator"
) )
var ( var (
@ -66,54 +67,45 @@ func NewSnapshotService(con ServiceConfig) (*Service, error) {
type SnapshotParams struct { type SnapshotParams struct {
Height uint64 Height uint64
Async bool DivideDepth int
} }
type workerParams struct { func (s *Service) CreateSnapshot(params SnapshotParams) error {
trieDB *trie.Database
headerID int64
waitGroup *sync.WaitGroup
}
func (s *Service) CreateSnapshot(par SnapshotParams) error {
// extract header from lvldb and publish to PG-IPFS // extract header from lvldb and publish to PG-IPFS
// hold onto the headerID so that we can link the state nodes to this header // hold onto the headerID so that we can link the state nodes to this header
logrus.Infof("Creating snapshot at height %d", par.Height) logrus.Infof("Creating snapshot at height %d", params.Height)
hash := rawdb.ReadCanonicalHash(s.ethDB, par.Height) hash := rawdb.ReadCanonicalHash(s.ethDB, params.Height)
header := rawdb.ReadHeader(s.ethDB, hash, par.Height) header := rawdb.ReadHeader(s.ethDB, hash, params.Height)
if header == nil { if header == nil {
return fmt.Errorf("unable to read canonical header at height %d", par.Height) return fmt.Errorf("unable to read canonical header at height %d", params.Height)
} }
headerID, err := s.ipfsPublisher.PublishHeader(header) headerID, err := s.ipfsPublisher.PublishHeader(header)
if err != nil { if err != nil {
return err return err
} }
t, err := s.stateDB.OpenTrie(header.Root) t, err := s.stateDB.OpenTrie(header.Root)
if err != nil { if err != nil {
return err return err
} }
trieDB := s.stateDB.TrieDB() if params.DivideDepth > 0 {
params := workerParams{trieDB: trieDB, headerID: headerID} return s.createSnapshotAsync(t, headerID, params.DivideDepth)
if par.Async {
var wg sync.WaitGroup
params.waitGroup = &wg
return s.createSnapshotAsync(t.NodeIterator([]byte{}), 0, params)
wg.Wait()
} else { } else {
return s.createSnapshot(t.NodeIterator([]byte{}), params) return s.createSnapshot(t.NodeIterator(nil), headerID)
} }
return nil return nil
} }
// Create snapshot up to head (ignores height param) // Create snapshot up to head (ignores height param)
func (s *Service) CreateLatestSnapshot(doAsync bool) error { func (s *Service) CreateLatestSnapshot(params SnapshotParams) error {
logrus.Info("Creating snapshot at head") logrus.Info("Creating snapshot at head")
hash := rawdb.ReadHeadHeaderHash(s.ethDB) hash := rawdb.ReadHeadHeaderHash(s.ethDB)
height := rawdb.ReadHeaderNumber(s.ethDB, hash) height := rawdb.ReadHeaderNumber(s.ethDB, hash)
if height == nil { if height == nil {
return fmt.Errorf("unable to read header height for header hash %s", hash.String()) return fmt.Errorf("unable to read header height for header hash %s", hash.String())
} }
return s.CreateSnapshot(SnapshotParams{Height: *height, Async: doAsync}) params.Height = *height
return s.CreateSnapshot(params)
} }
// cache the elements // cache the elements
@ -122,7 +114,7 @@ type nodeResult struct {
elements []interface{} elements []interface{}
} }
func resolveNode(it trie.NodeIterator, trieDB *trie.Database) (*nodeResult, error) { func resolveNode(it iter.NodeIterator, trieDB *trie.Database) (*nodeResult, error) {
nodePath := make([]byte, len(it.Path())) nodePath := make([]byte, len(it.Path()))
copy(nodePath, it.Path()) copy(nodePath, it.Path())
node, err := trieDB.Node(it.Hash()) node, err := trieDB.Node(it.Hash())
@ -147,17 +139,14 @@ func resolveNode(it trie.NodeIterator, trieDB *trie.Database) (*nodeResult, erro
}, nil }, nil
} }
func (s *Service) processNode(it iter.NodeIterator, headerID int64) error {
func (s *Service) processNode(it trie.NodeIterator, par workerParams) error {
fmt.Printf(" [!]: %v\n", it.Path())
if it.Leaf() { // "leaf" nodes are actually "value" nodes, whose parents are the actual leaves if it.Leaf() { // "leaf" nodes are actually "value" nodes, whose parents are the actual leaves
return nil return nil
} }
if bytes.Equal(nullHash.Bytes(), it.Hash().Bytes()) { if bytes.Equal(nullHash.Bytes(), it.Hash().Bytes()) {
return nil return nil
} }
res, err := resolveNode(it, par.trieDB) res, err := resolveNode(it, s.stateDB.TrieDB())
if err != nil { if err != nil {
return err return err
} }
@ -174,7 +163,7 @@ func (s *Service) processNode(it trie.NodeIterator, par workerParams) error {
encodedPath := trie.HexToCompact(valueNodePath) encodedPath := trie.HexToCompact(valueNodePath)
leafKey := encodedPath[1:] leafKey := encodedPath[1:]
res.node.Key = common.BytesToHash(leafKey) res.node.Key = common.BytesToHash(leafKey)
stateID, err := s.ipfsPublisher.PublishStateNode(res.node, par.headerID) stateID, err := s.ipfsPublisher.PublishStateNode(res.node, headerID)
if err != nil { if err != nil {
return err return err
} }
@ -193,7 +182,7 @@ func (s *Service) processNode(it trie.NodeIterator, par workerParams) error {
} }
case Extension, Branch: case Extension, Branch:
res.node.Key = common.BytesToHash([]byte{}) res.node.Key = common.BytesToHash([]byte{})
if _, err := s.ipfsPublisher.PublishStateNode(res.node, par.headerID); err != nil { if _, err := s.ipfsPublisher.PublishStateNode(res.node, headerID); err != nil {
return err return err
} }
default: default:
@ -202,62 +191,42 @@ func (s *Service) processNode(it trie.NodeIterator, par workerParams) error {
return nil return nil
} }
func (s *Service) createSnapshot(it trie.NodeIterator, par workerParams) error { func (s *Service) createSnapshot(it iter.NodeIterator, headerID int64) error {
if par.waitGroup != nil {
defer par.waitGroup.Done()
}
for it.Next(true) { for it.Next(true) {
if err := s.processNode(it, par); err != nil { if err := s.processNode(it, headerID); err != nil {
return err return err
} }
} }
return it.Error() return it.Error()
} }
// Iterate sequentially up to spawnDepth, then create workers to snapshot the subtrees // Full-trie snapshot using goroutines
func (s *Service) processNodeAsync(it trie.NodeIterator, depth int, par workerParams) error { func (s *Service) createSnapshotAsync(tree state.Trie, headerID int64, depth int) error {
fmt.Printf(" [&, %v]: %v\n", depth, it.Path()) errors := make(chan error)
finished := make(chan bool)
var wg sync.WaitGroup
if it.Leaf() { // "leaf" nodes are actually "value" nodes, whose parents are the actual leaves iter.VisitSubtries(tree, depth, func (it iter.NodeIterator) {
return nil wg.Add(1)
} go func() {
if bytes.Equal(nullHash.Bytes(), it.Hash().Bytes()) { defer wg.Done()
return nil if err := s.createSnapshot(it, headerID); err != nil {
} errors <- err
res, err := resolveNode(it, par.trieDB) }
if err != nil { }()
})
go func() {
defer close(finished)
wg.Wait()
}()
select {
case <-finished:
break
case err := <-errors:
return err return err
} }
switch res.node.NodeType {
case Leaf:
depth = spawnDepth // if it's a leaf, just spawn early
case Extension, Branch:
depth += 1
default:
return errors.New("unexpected node type")
}
// get a new iterator for the child
// XXX
t, err := s.stateDB.OpenTrie(it.Hash())
return s.createSnapshotAsync(t.NodeIterator(nil), depth, par)
}
func (s *Service) createSnapshotAsync(it trie.NodeIterator, depth int, par workerParams) error {
if depth >= spawnDepth {
print(" worker handoff\n")
par.waitGroup.Add(1)
go s.createSnapshot(it, par)
return nil
}
// shallow traversal so we can control the depth (?)
for it.Next(true) {
err := s.processNodeAsync(it, depth, par)
if err != nil {
return err
}
}
return nil return nil
} }