This commit is contained in:
Roy Crihfield 2022-01-10 18:06:29 -06:00
parent 317b277939
commit 1f21a65478
2 changed files with 51 additions and 28 deletions

View File

@ -46,13 +46,12 @@ func stateSnapshot() {
} }
height := viper.GetInt64("snapshot.blockHeight") height := viper.GetInt64("snapshot.blockHeight")
workers := viper.GetUint("snapshot.workers") workers := viper.GetUint("snapshot.workers")
params := snapshot.SnapshotParams{Workers: workers}
if height < 0 { if height < 0 {
if err := snapshotService.CreateLatestSnapshot(params); err != nil { if err := snapshotService.CreateLatestSnapshot(workers); err != nil {
logWithCommand.Fatal(err) logWithCommand.Fatal(err)
} }
} else { } else {
params.Height = uint64(height) params := snapshot.SnapshotParams{Workers: workers, Height: uint64(height)}
if err := snapshotService.CreateSnapshot(params); err != nil { if err := snapshotService.CreateSnapshot(params); err != nil {
logWithCommand.Fatal(err) logWithCommand.Fatal(err)
} }

View File

@ -29,6 +29,7 @@ import (
"github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/rlp" "github.com/ethereum/go-ethereum/rlp"
"github.com/ethereum/go-ethereum/statediff/indexer/postgres" "github.com/ethereum/go-ethereum/statediff/indexer/postgres"
"github.com/ethereum/go-ethereum/statediff/indexer/shared"
"github.com/ethereum/go-ethereum/trie" "github.com/ethereum/go-ethereum/trie"
"github.com/jmoiron/sqlx" "github.com/jmoiron/sqlx"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
@ -120,36 +121,55 @@ func (s *Service) CreateLatestSnapshot(workers uint) error {
} }
type nodeResult struct { type nodeResult struct {
node Node node node
elements []interface{} elements []interface{}
} }
func resolveNode(it trie.NodeIterator, trieDB *trie.Database) (*nodeResult, error) { func resolveNode(it trie.NodeIterator, trieDB *trie.Database) (*nodeResult, error) {
nodePath := make([]byte, len(it.Path())) path := make([]byte, len(it.Path()))
copy(nodePath, it.Path()) copy(path, it.Path())
node, err := trieDB.Node(it.Hash()) n, err := trieDB.Node(it.Hash())
if err != nil { if err != nil {
return nil, err return nil, err
} }
var nodeElements []interface{} var elements []interface{}
if err := rlp.DecodeBytes(node, &nodeElements); err != nil { if err := rlp.DecodeBytes(n, &elements); err != nil {
return nil, err return nil, err
} }
ty, err := CheckKeyType(nodeElements) ty, err := CheckKeyType(elements)
if err != nil { if err != nil {
return nil, err return nil, err
} }
return &nodeResult{ return &nodeResult{
node: Node{ node: node{
NodeType: ty, nodeType: ty,
Path: nodePath, path: path,
Value: node, value: n,
}, },
elements: nodeElements, elements: elements,
}, nil }, nil
} }
func (s *Service) createSnapshot(it trie.NodeIterator, headerID int64) error { func (s *Service) createSnapshot(it trie.NodeIterator, headerID int64) error {
tx, err := s.ipfsPublisher.db.Beginx()
if err != nil {
return err
}
go s.ipfsPublisher.logNodeCounters()
defer func() {
logrus.Info("----- final counts -----")
s.ipfsPublisher.printNodeCounters()
if rec := recover(); rec != nil {
shared.Rollback(tx)
panic(rec)
} else if err != nil {
shared.Rollback(tx)
} else {
err = tx.Commit()
}
}()
for it.Next(true) { for it.Next(true) {
if it.Leaf() { // "leaf" nodes are actually "value" nodes, whose parents are the actual leaves if it.Leaf() { // "leaf" nodes are actually "value" nodes, whose parents are the actual leaves
return nil return nil
@ -167,21 +187,21 @@ func (s *Service) createSnapshot(it trie.NodeIterator, headerID int64) error {
if err != nil { if err != nil {
return err return err
} }
switch res.node.NodeType { switch res.node.nodeType {
case leaf: case leaf:
// if the node is a leaf, decode the account and publish the associated storage trie nodes if there are any // if the node is a leaf, decode the account and publish the associated storage trie nodes if there are any
// var account snapshot.Account // var account snapshot.Account
var account types.StateAccount var account types.StateAccount
if err := rlp.DecodeBytes(res.elements[1].([]byte), &account); err != nil { if err := rlp.DecodeBytes(res.elements[1].([]byte), &account); err != nil {
return fmt.Errorf( return fmt.Errorf(
"error decoding account for leaf node at path %x nerror: %v", res.node.Path, err) "error decoding account for leaf node at path %x nerror: %v", res.node.path, err)
} }
partialPath := trie.CompactToHex(res.elements[0].([]byte)) partialPath := trie.CompactToHex(res.elements[0].([]byte))
valueNodePath := append(res.node.Path, partialPath...) valueNodePath := append(res.node.path, partialPath...)
encodedPath := trie.HexToCompact(valueNodePath) encodedPath := trie.HexToCompact(valueNodePath)
leafKey := encodedPath[1:] leafKey := encodedPath[1:]
res.node.Key = common.BytesToHash(leafKey) res.node.key = common.BytesToHash(leafKey)
stateID, err := s.ipfsPublisher.PublishStateNode(res.node, headerID) stateID, err := s.ipfsPublisher.PublishStateNode(&res.node, headerID, tx)
if err != nil { if err != nil {
return err return err
} }
@ -204,8 +224,8 @@ func (s *Service) createSnapshot(it trie.NodeIterator, headerID int64) error {
return fmt.Errorf("failed building storage snapshot for account %+v\r\nerror: %w", account, err) return fmt.Errorf("failed building storage snapshot for account %+v\r\nerror: %w", account, err)
} }
case extension, branch: case extension, branch:
stateNode.key = common.BytesToHash([]byte{}) res.node.key = common.BytesToHash([]byte{})
if _, err := s.ipfsPublisher.PublishStateNode(stateNode, headerID, tx); err != nil { if _, err := s.ipfsPublisher.PublishStateNode(&res.node, headerID, tx); err != nil {
return err return err
} }
default: default:
@ -266,30 +286,34 @@ func (s *Service) storageSnapshot(sr common.Hash, stateID int64, tx *sqlx.Tx) (*
if err != nil { if err != nil {
return nil, err return nil, err
} }
// if res == nil {continue}
tx, err = s.ipfsPublisher.checkBatchSize(tx, s.maxBatchSize) tx, err = s.ipfsPublisher.checkBatchSize(tx, s.maxBatchSize)
if err != nil { if err != nil {
return nil, err return nil, err
} }
var nodeData []byte
nodeData, err = s.stateDB.TrieDB().Node(it.Hash()) nodeData, err = s.stateDB.TrieDB().Node(it.Hash())
if err != nil { if err != nil {
return nil, err return nil, err
} }
switch res.node.NodeType { res.node.value = nodeData
switch res.node.nodeType {
case leaf: case leaf:
partialPath := trie.CompactToHex(res.elements[0].([]byte)) partialPath := trie.CompactToHex(res.elements[0].([]byte))
valueNodePath := append(res.node.Path, partialPath...) valueNodePath := append(res.node.path, partialPath...)
encodedPath := trie.HexToCompact(valueNodePath) encodedPath := trie.HexToCompact(valueNodePath)
leafKey := encodedPath[1:] leafKey := encodedPath[1:]
res.node.Key = common.BytesToHash(leafKey) res.node.key = common.BytesToHash(leafKey)
case extension, branch: case extension, branch:
res.node.Key = common.BytesToHash([]byte{}) res.node.key = common.BytesToHash([]byte{})
default: default:
return nil, errors.New("unexpected node type") return nil, errors.New("unexpected node type")
} }
if err = s.ipfsPublisher.PublishStorageNode(res.node, stateID); err != nil { if err = s.ipfsPublisher.PublishStorageNode(&res.node, stateID, tx); err != nil {
return err return nil, err
} }
} }