[wip] async traversal

This commit is contained in:
Roy Crihfield 2020-08-20 05:23:36 -05:00
parent 36c783a479
commit 49bd60d9f1
3 changed files with 193 additions and 120 deletions

View File

@ -37,20 +37,21 @@ var stateSnapshotCmd = &cobra.Command{
} }
func stateSnapshot() { func stateSnapshot() {
snapConfig := snapshot.Config{} serviceConfig := snapshot.ServiceConfig{}
snapConfig.Init() serviceConfig.Init()
snapshotService, err := snapshot.NewSnapshotService(snapConfig) snapshotService, err := snapshot.NewSnapshotService(serviceConfig)
if err != nil { if err != nil {
logWithCommand.Fatal(err) logWithCommand.Fatal(err)
} }
height := viper.GetInt64("snapshot.blockHeight") height := viper.GetInt64("snapshot.blockHeight")
doAsync := viper.GetBool("snapshot.async")
if height < 0 { if height < 0 {
if err := snapshotService.CreateLatestSnapshot(); err != nil { if err := snapshotService.CreateLatestSnapshot(doAsync); err != nil {
logWithCommand.Fatal(err) logWithCommand.Fatal(err)
} }
} else { } else {
uHeight := uint64(height) params := snapshot.SnapshotParams{Height: uint64(height), Async: doAsync}
if err := snapshotService.CreateSnapshot(uHeight); err != nil { if err := snapshotService.CreateSnapshot(params); err != nil {
logWithCommand.Fatal(err) logWithCommand.Fatal(err)
} }
} }
@ -63,8 +64,10 @@ func init() {
stateSnapshotCmd.PersistentFlags().String("leveldb-path", "", "path to primary datastore") stateSnapshotCmd.PersistentFlags().String("leveldb-path", "", "path to primary datastore")
stateSnapshotCmd.PersistentFlags().String("ancient-path", "", "path to ancient datastore") stateSnapshotCmd.PersistentFlags().String("ancient-path", "", "path to ancient datastore")
stateSnapshotCmd.PersistentFlags().String("block-height", "", "blockheight to extract state at") stateSnapshotCmd.PersistentFlags().String("block-height", "", "blockheight to extract state at")
stateSnapshotCmd.PersistentFlags().Bool("async", false, "use the async iterator")
viper.BindPFlag("leveldb.path", stateSnapshotCmd.PersistentFlags().Lookup("leveldb-path")) viper.BindPFlag("leveldb.path", stateSnapshotCmd.PersistentFlags().Lookup("leveldb-path"))
viper.BindPFlag("leveldb.ancient", stateSnapshotCmd.PersistentFlags().Lookup("ancient-path")) viper.BindPFlag("leveldb.ancient", stateSnapshotCmd.PersistentFlags().Lookup("ancient-path"))
viper.BindPFlag("snapshot.blockHeight", stateSnapshotCmd.PersistentFlags().Lookup("block-height")) viper.BindPFlag("snapshot.blockHeight", stateSnapshotCmd.PersistentFlags().Lookup("block-height"))
viper.BindPFlag("snapshot.async", stateSnapshotCmd.PersistentFlags().Lookup("async"))
} }

View File

@ -31,14 +31,14 @@ const (
LVL_DB_PATH = "LVL_DB_PATH" LVL_DB_PATH = "LVL_DB_PATH"
) )
type Config struct { type ServiceConfig struct {
LevelDBPath string LevelDBPath string
AncientDBPath string AncientDBPath string
Node core.Node Node core.Node
DBConfig config.Database DBConfig config.Database
} }
func (c *Config) Init() { func (c *ServiceConfig) Init() {
c.DBConfig.Init() c.DBConfig.Init()
viper.BindEnv("leveldb.path", LVL_DB_PATH) viper.BindEnv("leveldb.path", LVL_DB_PATH)
viper.BindEnv("ethereum.nodeID", ETH_NODE_ID) viper.BindEnv("ethereum.nodeID", ETH_NODE_ID)

View File

@ -19,6 +19,7 @@ import (
"bytes" "bytes"
"errors" "errors"
"fmt" "fmt"
"sync"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/rawdb" "github.com/ethereum/go-ethereum/core/rawdb"
@ -37,6 +38,7 @@ var (
emptyNode, _ = rlp.EncodeToBytes([]byte{}) emptyNode, _ = rlp.EncodeToBytes([]byte{})
emptyCodeHash = crypto.Keccak256([]byte{}) emptyCodeHash = crypto.Keccak256([]byte{})
emptyContractRoot = crypto.Keccak256Hash(emptyNode) emptyContractRoot = crypto.Keccak256Hash(emptyNode)
spawnDepth = 2
) )
type Service struct { type Service struct {
@ -45,7 +47,8 @@ type Service struct {
ipfsPublisher *Publisher ipfsPublisher *Publisher
} }
func NewSnapshotService(con Config) (*Service, error) {
func NewSnapshotService(con ServiceConfig) (*Service, error) {
pgdb, err := postgres.NewDB(con.DBConfig, con.Node) pgdb, err := postgres.NewDB(con.DBConfig, con.Node)
if err != nil { if err != nil {
return nil, err return nil, err
@ -61,93 +64,117 @@ func NewSnapshotService(con Config) (*Service, error) {
}, nil }, nil
} }
func (s *Service) CreateLatestSnapshot() error { type SnapshotParams struct {
Height uint64
Async bool
}
type workerParams struct {
trieDB *trie.Database
headerID int64
waitGroup *sync.WaitGroup
}
func (s *Service) CreateSnapshot(par SnapshotParams) error {
// extract header from lvldb and publish to PG-IPFS // extract header from lvldb and publish to PG-IPFS
// hold onto the headerID so that we can link the state nodes to this header // hold onto the headerID so that we can link the state nodes to this header
logrus.Infof("Creating snapshot at height %d", par.Height)
hash := rawdb.ReadCanonicalHash(s.ethDB, par.Height)
header := rawdb.ReadHeader(s.ethDB, hash, par.Height)
if header == nil {
return fmt.Errorf("unable to read canonical header at height %d", par.Height)
}
headerID, err := s.ipfsPublisher.PublishHeader(header)
if err != nil {
return err
}
t, err := s.stateDB.OpenTrie(header.Root)
if err != nil {
return err
}
trieDB := s.stateDB.TrieDB()
params := workerParams{trieDB: trieDB, headerID: headerID}
if par.Async {
var wg sync.WaitGroup
params.waitGroup = &wg
return s.createSnapshotAsync(t.NodeIterator([]byte{}), 0, params)
wg.Wait()
} else {
return s.createSnapshot(t.NodeIterator([]byte{}), params)
}
return nil
}
// Create snapshot up to head (ignores height param)
func (s *Service) CreateLatestSnapshot(doAsync bool) error {
logrus.Info("Creating snapshot at head") logrus.Info("Creating snapshot at head")
hash := rawdb.ReadHeadHeaderHash(s.ethDB) hash := rawdb.ReadHeadHeaderHash(s.ethDB)
height := rawdb.ReadHeaderNumber(s.ethDB, hash) height := rawdb.ReadHeaderNumber(s.ethDB, hash)
if height == nil { if height == nil {
return fmt.Errorf("unable to read header height for header hash %s", hash.String()) return fmt.Errorf("unable to read header height for header hash %s", hash.String())
} }
header := rawdb.ReadHeader(s.ethDB, hash, *height) return s.CreateSnapshot(SnapshotParams{Height: *height, Async: doAsync})
if header == nil {
return fmt.Errorf("unable to read canonical header at height %d", height)
}
logrus.Infof("head hash: %s head height: %d", hash.Hex(), *height)
headerID, err := s.ipfsPublisher.PublishHeader(header)
if err != nil {
return err
}
t, err := s.stateDB.OpenTrie(header.Root)
if err != nil {
return err
}
trieDB := s.stateDB.TrieDB()
return s.createSnapshot(t.NodeIterator([]byte{}), trieDB, headerID)
} }
func (s *Service) CreateSnapshot(height uint64) error { // cache the elements
// extract header from lvldb and publish to PG-IPFS type nodeResult struct {
// hold onto the headerID so that we can link the state nodes to this header node Node
logrus.Infof("Creating snapshot at height %d", height) elements []interface{}
hash := rawdb.ReadCanonicalHash(s.ethDB, height)
header := rawdb.ReadHeader(s.ethDB, hash, height)
if header == nil {
return fmt.Errorf("unable to read canonical header at height %d", height)
}
headerID, err := s.ipfsPublisher.PublishHeader(header)
if err != nil {
return err
}
t, err := s.stateDB.OpenTrie(header.Root)
if err != nil {
return err
}
trieDB := s.stateDB.TrieDB()
return s.createSnapshot(t.NodeIterator([]byte{}), trieDB, headerID)
} }
func (s *Service) createSnapshot(it trie.NodeIterator, trieDB *trie.Database, headerID int64) error { func resolveNode(it trie.NodeIterator, trieDB *trie.Database) (*nodeResult, error) {
for it.Next(true) {
if it.Leaf() { // "leaf" nodes are actually "value" nodes, whose parents are the actual leaves
continue
}
if bytes.Equal(nullHash.Bytes(), it.Hash().Bytes()) {
continue
}
nodePath := make([]byte, len(it.Path())) nodePath := make([]byte, len(it.Path()))
copy(nodePath, it.Path()) copy(nodePath, it.Path())
node, err := trieDB.Node(it.Hash()) node, err := trieDB.Node(it.Hash())
if err != nil { if err != nil {
return err return nil, err
} }
var nodeElements []interface{} var nodeElements []interface{}
if err := rlp.DecodeBytes(node, &nodeElements); err != nil { if err := rlp.DecodeBytes(node, &nodeElements); err != nil {
return err return nil, err
} }
ty, err := CheckKeyType(nodeElements) ty, err := CheckKeyType(nodeElements)
if err != nil { if err != nil {
return err return nil, err
} }
stateNode := Node{ return &nodeResult{
node: Node{
NodeType: ty, NodeType: ty,
Path: nodePath, Path: nodePath,
Value: node, Value: node,
},
elements: nodeElements,
}, nil
}
func (s *Service) processNode(it trie.NodeIterator, par workerParams) error {
fmt.Printf(" [!]: %v\n", it.Path())
if it.Leaf() { // "leaf" nodes are actually "value" nodes, whose parents are the actual leaves
return nil
} }
switch ty { if bytes.Equal(nullHash.Bytes(), it.Hash().Bytes()) {
return nil
}
res, err := resolveNode(it, par.trieDB)
if err != nil {
return err
}
switch res.node.NodeType {
case Leaf: case Leaf:
// if the node is a leaf, decode the account and publish the associated storage trie nodes if there are any // if the node is a leaf, decode the account and publish the associated storage trie nodes if there are any
var account state.Account var account state.Account
if err := rlp.DecodeBytes(nodeElements[1].([]byte), &account); err != nil { if err := rlp.DecodeBytes(res.elements[1].([]byte), &account); err != nil {
return fmt.Errorf("error decoding account for leaf node at path %x nerror: %v", nodePath, err) return fmt.Errorf(
"error decoding account for leaf node at path %x nerror: %v", res.node.Path, err)
} }
partialPath := trie.CompactToHex(nodeElements[0].([]byte)) partialPath := trie.CompactToHex(res.elements[0].([]byte))
valueNodePath := append(nodePath, partialPath...) valueNodePath := append(res.node.Path, partialPath...)
encodedPath := trie.HexToCompact(valueNodePath) encodedPath := trie.HexToCompact(valueNodePath)
leafKey := encodedPath[1:] leafKey := encodedPath[1:]
stateNode.Key = common.BytesToHash(leafKey) res.node.Key = common.BytesToHash(leafKey)
stateID, err := s.ipfsPublisher.PublishStateNode(stateNode, headerID) stateID, err := s.ipfsPublisher.PublishStateNode(res.node, par.headerID)
if err != nil { if err != nil {
return err return err
} }
@ -165,17 +192,75 @@ func (s *Service) createSnapshot(it trie.NodeIterator, trieDB *trie.Database, he
return fmt.Errorf("failed building storage snapshot for account %+v\r\nerror: %v", account, err) return fmt.Errorf("failed building storage snapshot for account %+v\r\nerror: %v", account, err)
} }
case Extension, Branch: case Extension, Branch:
stateNode.Key = common.BytesToHash([]byte{}) res.node.Key = common.BytesToHash([]byte{})
if _, err := s.ipfsPublisher.PublishStateNode(stateNode, headerID); err != nil { if _, err := s.ipfsPublisher.PublishStateNode(res.node, par.headerID); err != nil {
return err return err
} }
default: default:
return errors.New("unexpected node type") return errors.New("unexpected node type")
} }
return nil
}
func (s *Service) createSnapshot(it trie.NodeIterator, par workerParams) error {
if par.waitGroup != nil {
defer par.waitGroup.Done()
}
for it.Next(true) {
if err := s.processNode(it, par); err != nil {
return err
}
} }
return it.Error() return it.Error()
} }
// Iterate sequentially up to spawnDepth, then create workers to snapshot the subtrees
func (s *Service) processNodeAsync(it trie.NodeIterator, depth int, par workerParams) error {
fmt.Printf(" [&, %v]: %v\n", depth, it.Path())
if it.Leaf() { // "leaf" nodes are actually "value" nodes, whose parents are the actual leaves
return nil
}
if bytes.Equal(nullHash.Bytes(), it.Hash().Bytes()) {
return nil
}
res, err := resolveNode(it, par.trieDB)
if err != nil {
return err
}
switch res.node.NodeType {
case Leaf:
depth = spawnDepth // if it's a leaf, just spawn early
case Extension, Branch:
depth += 1
default:
return errors.New("unexpected node type")
}
// get a new iterator for the child
// XXX
t, err := s.stateDB.OpenTrie(it.Hash())
return s.createSnapshotAsync(t.NodeIterator(nil), depth, par)
}
func (s *Service) createSnapshotAsync(it trie.NodeIterator, depth int, par workerParams) error {
if depth >= spawnDepth {
print(" worker handoff\n")
par.waitGroup.Add(1)
go s.createSnapshot(it, par)
return nil
}
// shallow traversal so we can control the depth (?)
for it.Next(true) {
err := s.processNodeAsync(it, depth, par)
if err != nil {
return err
}
}
return nil
}
func (s *Service) storageSnapshot(sr common.Hash, stateID int64) error { func (s *Service) storageSnapshot(sr common.Hash, stateID int64) error {
if bytes.Equal(sr.Bytes(), emptyContractRoot.Bytes()) { if bytes.Equal(sr.Bytes(), emptyContractRoot.Bytes()) {
return nil return nil
@ -193,38 +278,23 @@ func (s *Service) storageSnapshot(sr common.Hash, stateID int64) error {
if bytes.Equal(nullHash.Bytes(), it.Hash().Bytes()) { if bytes.Equal(nullHash.Bytes(), it.Hash().Bytes()) {
continue continue
} }
nodePath := make([]byte, len(it.Path())) res, err := resolveNode(it, s.stateDB.TrieDB())
copy(nodePath, it.Path())
node, err := s.stateDB.TrieDB().Node(it.Hash())
if err != nil { if err != nil {
return err return err
} }
var nodeElements []interface{} switch res.node.NodeType {
if err := rlp.DecodeBytes(node, &nodeElements); err != nil {
return err
}
ty, err := CheckKeyType(nodeElements)
if err != nil {
return err
}
storageNode := Node{
NodeType: ty,
Path: nodePath,
Value: node,
}
switch ty {
case Leaf: case Leaf:
partialPath := trie.CompactToHex(nodeElements[0].([]byte)) partialPath := trie.CompactToHex(res.elements[0].([]byte))
valueNodePath := append(nodePath, partialPath...) valueNodePath := append(res.node.Path, partialPath...)
encodedPath := trie.HexToCompact(valueNodePath) encodedPath := trie.HexToCompact(valueNodePath)
leafKey := encodedPath[1:] leafKey := encodedPath[1:]
storageNode.Key = common.BytesToHash(leafKey) res.node.Key = common.BytesToHash(leafKey)
case Extension, Branch: case Extension, Branch:
storageNode.Key = common.BytesToHash([]byte{}) res.node.Key = common.BytesToHash([]byte{})
default: default:
return errors.New("unexpected node type") return errors.New("unexpected node type")
} }
if err := s.ipfsPublisher.PublishStorageNode(storageNode, stateID); err != nil { if err := s.ipfsPublisher.PublishStorageNode(res.node, stateID); err != nil {
return err return err
} }
} }