diff --git a/cmd/stateSnapshot.go b/cmd/stateSnapshot.go
index d16df12..e48fdb2 100644
--- a/cmd/stateSnapshot.go
+++ b/cmd/stateSnapshot.go
@@ -37,20 +37,21 @@ var stateSnapshotCmd = &cobra.Command{
 }
 func stateSnapshot() {
-	snapConfig := snapshot.Config{}
-	snapConfig.Init()
-	snapshotService, err := snapshot.NewSnapshotService(snapConfig)
+	serviceConfig := snapshot.ServiceConfig{}
+	serviceConfig.Init()
+	snapshotService, err := snapshot.NewSnapshotService(serviceConfig)
 	if err != nil {
 		logWithCommand.Fatal(err)
 	}
 	height := viper.GetInt64("snapshot.blockHeight")
+	doAsync := viper.GetBool("snapshot.async")
 	if height < 0 {
-		if err := snapshotService.CreateLatestSnapshot(); err != nil {
+		if err := snapshotService.CreateLatestSnapshot(doAsync); err != nil {
 			logWithCommand.Fatal(err)
 		}
 	} else {
-		uHeight := uint64(height)
-		if err := snapshotService.CreateSnapshot(uHeight); err != nil {
+		params := snapshot.SnapshotParams{Height: uint64(height), Async: doAsync}
+		if err := snapshotService.CreateSnapshot(params); err != nil {
 			logWithCommand.Fatal(err)
 		}
 	}
 }
@@ -63,8 +64,10 @@ func init() {
 	stateSnapshotCmd.PersistentFlags().String("leveldb-path", "", "path to primary datastore")
 	stateSnapshotCmd.PersistentFlags().String("ancient-path", "", "path to ancient datastore")
 	stateSnapshotCmd.PersistentFlags().String("block-height", "", "blockheight to extract state at")
+	stateSnapshotCmd.PersistentFlags().Bool("async", false, "use the async iterator")
 
 	viper.BindPFlag("leveldb.path", stateSnapshotCmd.PersistentFlags().Lookup("leveldb-path"))
 	viper.BindPFlag("leveldb.ancient", stateSnapshotCmd.PersistentFlags().Lookup("ancient-path"))
 	viper.BindPFlag("snapshot.blockHeight", stateSnapshotCmd.PersistentFlags().Lookup("block-height"))
+	viper.BindPFlag("snapshot.async", stateSnapshotCmd.PersistentFlags().Lookup("async"))
 }
diff --git a/pkg/snapshot/config.go b/pkg/snapshot/config.go
index 7c3a90e..e5f6ade 100644
--- a/pkg/snapshot/config.go
+++ b/pkg/snapshot/config.go
@@ -31,14 +31,14 @@ const (
 	LVL_DB_PATH = "LVL_DB_PATH"
 )
 
-type Config struct {
+type ServiceConfig struct {
 	LevelDBPath   string
 	AncientDBPath string
 	Node          core.Node
 	DBConfig      config.Database
 }
 
-func (c *Config) Init() {
+func (c *ServiceConfig) Init() {
 	c.DBConfig.Init()
 	viper.BindEnv("leveldb.path", LVL_DB_PATH)
 	viper.BindEnv("ethereum.nodeID", ETH_NODE_ID)
diff --git a/pkg/snapshot/service.go b/pkg/snapshot/service.go
index 40407fb..d831338 100644
--- a/pkg/snapshot/service.go
+++ b/pkg/snapshot/service.go
@@ -19,6 +19,7 @@ import (
 	"bytes"
 	"errors"
 	"fmt"
+	"sync"
 
 	"github.com/ethereum/go-ethereum/common"
 	"github.com/ethereum/go-ethereum/core/rawdb"
@@ -37,6 +38,7 @@ var (
 	emptyNode, _      = rlp.EncodeToBytes([]byte{})
 	emptyCodeHash     = crypto.Keccak256([]byte{})
 	emptyContractRoot = crypto.Keccak256Hash(emptyNode)
+	spawnDepth        = 2
 )
 
 type Service struct {
@@ -45,7 +47,8 @@ type Service struct {
 	ipfsPublisher *Publisher
 }
 
-func NewSnapshotService(con Config) (*Service, error) {
+
+func NewSnapshotService(con ServiceConfig) (*Service, error) {
 	pgdb, err := postgres.NewDB(con.DBConfig, con.Node)
 	if err != nil {
 		return nil, err
@@ -61,121 +64,211 @@ func NewSnapshotService(con ServiceConfig) (*Service, error) {
 	}, nil
 }
 
-func (s *Service) CreateLatestSnapshot() error {
+type SnapshotParams struct {
+	Height uint64
+	Async  bool
+}
+
+type workerParams struct {
+	trieDB    *trie.Database
+	headerID  int64
+	waitGroup *sync.WaitGroup
+}
+
+func (s *Service) CreateSnapshot(par SnapshotParams) error {
 	// extract header from lvldb and publish to PG-IPFS
 	// hold onto the headerID so that we can link the state nodes to this header
+	logrus.Infof("Creating snapshot at height %d", par.Height)
+	hash := rawdb.ReadCanonicalHash(s.ethDB, par.Height)
+	header := rawdb.ReadHeader(s.ethDB, hash, par.Height)
+	if header == nil {
+		return fmt.Errorf("unable to read canonical header at height %d", par.Height)
+	}
+	headerID, err := s.ipfsPublisher.PublishHeader(header)
+	if err != nil {
+		return err
+	}
+	t, err := s.stateDB.OpenTrie(header.Root)
+	if err != nil {
+		return err
+	}
+	trieDB := s.stateDB.TrieDB()
+	params := workerParams{trieDB: trieDB, headerID: headerID}
+	if par.Async {
+		var wg sync.WaitGroup
+		params.waitGroup = &wg
+		// wait for all spawned workers to finish before reporting the result
+		err = s.createSnapshotAsync(t.NodeIterator([]byte{}), 0, params)
+		wg.Wait()
+	} else {
+		err = s.createSnapshot(t.NodeIterator([]byte{}), params)
+	}
+	return err
+}
+
+// Create snapshot up to head (ignores height param)
+func (s *Service) CreateLatestSnapshot(doAsync bool) error {
 	logrus.Info("Creating snapshot at head")
 	hash := rawdb.ReadHeadHeaderHash(s.ethDB)
 	height := rawdb.ReadHeaderNumber(s.ethDB, hash)
 	if height == nil {
 		return fmt.Errorf("unable to read header height for header hash %s", hash.String())
 	}
-	header := rawdb.ReadHeader(s.ethDB, hash, *height)
-	if header == nil {
-		return fmt.Errorf("unable to read canonical header at height %d", height)
-	}
-	logrus.Infof("head hash: %s head height: %d", hash.Hex(), *height)
-	headerID, err := s.ipfsPublisher.PublishHeader(header)
-	if err != nil {
-		return err
-	}
-	t, err := s.stateDB.OpenTrie(header.Root)
-	if err != nil {
-		return err
-	}
-	trieDB := s.stateDB.TrieDB()
-	return s.createSnapshot(t.NodeIterator([]byte{}), trieDB, headerID)
+	return s.CreateSnapshot(SnapshotParams{Height: *height, Async: doAsync})
 }
 
-func (s *Service) CreateSnapshot(height uint64) error {
-	// extract header from lvldb and publish to PG-IPFS
-	// hold onto the headerID so that we can link the state nodes to this header
-	logrus.Infof("Creating snapshot at height %d", height)
-	hash := rawdb.ReadCanonicalHash(s.ethDB, height)
-	header := rawdb.ReadHeader(s.ethDB, hash, height)
-	if header == nil {
-		return fmt.Errorf("unable to read canonical header at height %d", height)
-	}
-	headerID, err := s.ipfsPublisher.PublishHeader(header)
-	if err != nil {
-		return err
-	}
-	t, err := s.stateDB.OpenTrie(header.Root)
-	if err != nil {
-		return err
-	}
-	trieDB := s.stateDB.TrieDB()
-	return s.createSnapshot(t.NodeIterator([]byte{}), trieDB, headerID)
+// cache the elements
+type nodeResult struct {
+	node     Node
+	elements []interface{}
 }
 
-func (s *Service) createSnapshot(it trie.NodeIterator, trieDB *trie.Database, headerID int64) error {
-	for it.Next(true) {
-		if it.Leaf() { // "leaf" nodes are actually "value" nodes, whose parents are the actual leaves
-			continue
-		}
-		if bytes.Equal(nullHash.Bytes(), it.Hash().Bytes()) {
-			continue
-		}
-		nodePath := make([]byte, len(it.Path()))
-		copy(nodePath, it.Path())
-		node, err := trieDB.Node(it.Hash())
-		if err != nil {
-			return err
-		}
-		var nodeElements []interface{}
-		if err := rlp.DecodeBytes(node, &nodeElements); err != nil {
-			return err
-		}
-		ty, err := CheckKeyType(nodeElements)
-		if err != nil {
-			return err
-		}
-		stateNode := Node{
+func resolveNode(it trie.NodeIterator, trieDB *trie.Database) (*nodeResult, error) {
+	nodePath := make([]byte, len(it.Path()))
+	copy(nodePath, it.Path())
+	node, err := trieDB.Node(it.Hash())
+	if err != nil {
+		return nil, err
+	}
+	var nodeElements []interface{}
+	if err := rlp.DecodeBytes(node, &nodeElements); err != nil {
+		return nil, err
+	}
+	ty, err := CheckKeyType(nodeElements)
+	if err != nil {
+		return nil, err
+	}
+	return &nodeResult{
+		node: Node{
 			NodeType: ty,
 			Path:     nodePath,
 			Value:    node,
+		},
+		elements: nodeElements,
+	}, nil
+}
+
+
+func (s *Service) processNode(it trie.NodeIterator, par workerParams) error {
+	logrus.Debugf("processNode at path %v", it.Path())
+
+	if it.Leaf() { // "leaf" nodes are actually "value" nodes, whose parents are the actual leaves
+		return nil
+	}
+	if bytes.Equal(nullHash.Bytes(), it.Hash().Bytes()) {
+		return nil
+	}
+	res, err := resolveNode(it, par.trieDB)
+	if err != nil {
+		return err
+	}
+	switch res.node.NodeType {
+	case Leaf:
+		// if the node is a leaf, decode the account and publish the associated storage trie nodes if there are any
+		var account state.Account
+		if err := rlp.DecodeBytes(res.elements[1].([]byte), &account); err != nil {
+			return fmt.Errorf(
+				"error decoding account for leaf node at path %x\nerror: %v", res.node.Path, err)
 		}
-		switch ty {
-		case Leaf:
-			// if the node is a leaf, decode the account and publish the associated storage trie nodes if there are any
-			var account state.Account
-			if err := rlp.DecodeBytes(nodeElements[1].([]byte), &account); err != nil {
-				return fmt.Errorf("error decoding account for leaf node at path %x nerror: %v", nodePath, err)
-			}
-			partialPath := trie.CompactToHex(nodeElements[0].([]byte))
-			valueNodePath := append(nodePath, partialPath...)
-			encodedPath := trie.HexToCompact(valueNodePath)
-			leafKey := encodedPath[1:]
-			stateNode.Key = common.BytesToHash(leafKey)
-			stateID, err := s.ipfsPublisher.PublishStateNode(stateNode, headerID)
+		partialPath := trie.CompactToHex(res.elements[0].([]byte))
+		valueNodePath := append(res.node.Path, partialPath...)
+		encodedPath := trie.HexToCompact(valueNodePath)
+		leafKey := encodedPath[1:]
+		res.node.Key = common.BytesToHash(leafKey)
+		stateID, err := s.ipfsPublisher.PublishStateNode(res.node, par.headerID)
+		if err != nil {
+			return err
+		}
+		// publish any non-nil code referenced by codehash
+		if !bytes.Equal(account.CodeHash, emptyCodeHash) {
+			codeBytes, err := s.ethDB.Get(account.CodeHash)
 			if err != nil {
 				return err
 			}
-			// publish any non-nil code referenced by codehash
-			if !bytes.Equal(account.CodeHash, emptyCodeHash) {
-				codeBytes, err := s.ethDB.Get(account.CodeHash)
-				if err != nil {
-					return err
-				}
-				if err := s.ipfsPublisher.PublishCode(codeBytes); err != nil {
-					return err
-				}
-			}
-			if err := s.storageSnapshot(account.Root, stateID); err != nil {
-				return fmt.Errorf("failed building storage snapshot for account %+v\r\nerror: %v", account, err)
-			}
-		case Extension, Branch:
-			stateNode.Key = common.BytesToHash([]byte{})
-			if _, err := s.ipfsPublisher.PublishStateNode(stateNode, headerID); err != nil {
+			if err := s.ipfsPublisher.PublishCode(codeBytes); err != nil {
 				return err
 			}
-		default:
-			return errors.New("unexpected node type")
+		}
+		if err := s.storageSnapshot(account.Root, stateID); err != nil {
+			return fmt.Errorf("failed building storage snapshot for account %+v\r\nerror: %v", account, err)
+		}
+	case Extension, Branch:
+		res.node.Key = common.BytesToHash([]byte{})
+		if _, err := s.ipfsPublisher.PublishStateNode(res.node, par.headerID); err != nil {
+			return err
+		}
+	default:
+		return errors.New("unexpected node type")
+	}
+	return nil
+}
+
+func (s *Service) createSnapshot(it trie.NodeIterator, par workerParams) error {
+	if par.waitGroup != nil {
+		defer par.waitGroup.Done()
+	}
+	for it.Next(true) {
+		if err := s.processNode(it, par); err != nil {
+			return err
 		}
 	}
 	return it.Error()
 }
 
+// Iterate sequentially up to spawnDepth, then create workers to snapshot the subtrees
+func (s *Service) processNodeAsync(it trie.NodeIterator, depth int, par workerParams) error {
+	logrus.Debugf("processNodeAsync at depth %d path %v", depth, it.Path())
+
+	if it.Leaf() { // "leaf" nodes are actually "value" nodes, whose parents are the actual leaves
+		return nil
+	}
+	if bytes.Equal(nullHash.Bytes(), it.Hash().Bytes()) {
+		return nil
+	}
+	res, err := resolveNode(it, par.trieDB)
+	if err != nil {
+		return err
+	}
+	switch res.node.NodeType {
+	case Leaf:
+		depth = spawnDepth // if it's a leaf, just spawn early
+	case Extension, Branch:
+		depth += 1
+	default:
+		return errors.New("unexpected node type")
+	}
+
+	// get a new iterator for the subtrie rooted at the current node
+	t, err := s.stateDB.OpenTrie(it.Hash())
+	if err != nil {
+		return err
+	}
+	return s.createSnapshotAsync(t.NodeIterator(nil), depth, par)
+}
+
+func (s *Service) createSnapshotAsync(it trie.NodeIterator, depth int, par workerParams) error {
+	if depth >= spawnDepth {
+		logrus.Debugf("createSnapshotAsync: handing subtrie off to a worker")
+		par.waitGroup.Add(1)
+		go func() {
+			// worker errors cannot be returned to the caller; log them instead of dropping them
+			if err := s.createSnapshot(it, par); err != nil {
+				logrus.Errorf("error creating partial snapshot: %v", err)
+			}
+		}()
+		return nil
+	}
+
+	// shallow traversal so we can control the depth (?)
+	for it.Next(true) {
+		err := s.processNodeAsync(it, depth, par)
+		if err != nil {
+			return err
+		}
+	}
+	return it.Error()
+}
+
 func (s *Service) storageSnapshot(sr common.Hash, stateID int64) error {
 	if bytes.Equal(sr.Bytes(), emptyContractRoot.Bytes()) {
 		return nil
 	}
@@ -193,38 +286,23 @@ func (s *Service) storageSnapshot(sr common.Hash, stateID int64) error {
 		if bytes.Equal(nullHash.Bytes(), it.Hash().Bytes()) {
 			continue
 		}
-		nodePath := make([]byte, len(it.Path()))
-		copy(nodePath, it.Path())
-		node, err := s.stateDB.TrieDB().Node(it.Hash())
+		res, err := resolveNode(it, s.stateDB.TrieDB())
 		if err != nil {
 			return err
 		}
-		var nodeElements []interface{}
-		if err := rlp.DecodeBytes(node, &nodeElements); err != nil {
-			return err
-		}
-		ty, err := CheckKeyType(nodeElements)
-		if err != nil {
-			return err
-		}
-		storageNode := Node{
-			NodeType: ty,
-			Path:     nodePath,
-			Value:    node,
-		}
-		switch ty {
+		switch res.node.NodeType {
 		case Leaf:
-			partialPath := trie.CompactToHex(nodeElements[0].([]byte))
-			valueNodePath := append(nodePath, partialPath...)
+			partialPath := trie.CompactToHex(res.elements[0].([]byte))
+			valueNodePath := append(res.node.Path, partialPath...)
 			encodedPath := trie.HexToCompact(valueNodePath)
 			leafKey := encodedPath[1:]
-			storageNode.Key = common.BytesToHash(leafKey)
+			res.node.Key = common.BytesToHash(leafKey)
 		case Extension, Branch:
-			storageNode.Key = common.BytesToHash([]byte{})
+			res.node.Key = common.BytesToHash([]byte{})
 		default:
 			return errors.New("unexpected node type")
 		}
-		if err := s.ipfsPublisher.PublishStorageNode(storageNode, stateID); err != nil {
+		if err := s.ipfsPublisher.PublishStorageNode(res.node, stateID); err != nil {
 			return err
 		}
 	}