From 1f21a6547855578c3244c56c1c9f0d204e338e32 Mon Sep 17 00:00:00 2001 From: Roy Crihfield Date: Mon, 10 Jan 2022 18:06:29 -0600 Subject: [PATCH] fixes --- cmd/stateSnapshot.go | 5 ++- pkg/snapshot/service.go | 74 +++++++++++++++++++++++++++-------------- 2 files changed, 51 insertions(+), 28 deletions(-) diff --git a/cmd/stateSnapshot.go b/cmd/stateSnapshot.go index dc81439..d98bbd0 100644 --- a/cmd/stateSnapshot.go +++ b/cmd/stateSnapshot.go @@ -46,13 +46,12 @@ func stateSnapshot() { } height := viper.GetInt64("snapshot.blockHeight") workers := viper.GetUint("snapshot.workers") - params := snapshot.SnapshotParams{Workers: workers} if height < 0 { - if err := snapshotService.CreateLatestSnapshot(params); err != nil { + if err := snapshotService.CreateLatestSnapshot(workers); err != nil { logWithCommand.Fatal(err) } } else { - params.Height = uint64(height) + params := snapshot.SnapshotParams{Workers: workers, Height: uint64(height)} if err := snapshotService.CreateSnapshot(params); err != nil { logWithCommand.Fatal(err) } diff --git a/pkg/snapshot/service.go b/pkg/snapshot/service.go index ede7843..4fbad8f 100644 --- a/pkg/snapshot/service.go +++ b/pkg/snapshot/service.go @@ -29,6 +29,7 @@ import ( "github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/rlp" "github.com/ethereum/go-ethereum/statediff/indexer/postgres" + "github.com/ethereum/go-ethereum/statediff/indexer/shared" "github.com/ethereum/go-ethereum/trie" "github.com/jmoiron/sqlx" "github.com/sirupsen/logrus" @@ -120,36 +121,55 @@ func (s *Service) CreateLatestSnapshot(workers uint) error { } type nodeResult struct { - node Node + node node elements []interface{} } func resolveNode(it trie.NodeIterator, trieDB *trie.Database) (*nodeResult, error) { - nodePath := make([]byte, len(it.Path())) - copy(nodePath, it.Path()) - node, err := trieDB.Node(it.Hash()) + path := make([]byte, len(it.Path())) + copy(path, it.Path()) + n, err := trieDB.Node(it.Hash()) if err != nil { return nil, err } - var nodeElements []interface{} - if err := rlp.DecodeBytes(node, &nodeElements); err != nil { + var elements []interface{} + if err := rlp.DecodeBytes(n, &elements); err != nil { return nil, err } - ty, err := CheckKeyType(nodeElements) + ty, err := CheckKeyType(elements) if err != nil { return nil, err } return &nodeResult{ - node: Node{ - NodeType: ty, - Path: nodePath, - Value: node, + node: node{ + nodeType: ty, + path: path, + value: n, }, - elements: nodeElements, + elements: elements, }, nil } func (s *Service) createSnapshot(it trie.NodeIterator, headerID int64) error { + tx, err := s.ipfsPublisher.db.Beginx() + if err != nil { + return err + } + + go s.ipfsPublisher.logNodeCounters() + defer func() { + logrus.Info("----- final counts -----") + s.ipfsPublisher.printNodeCounters() + if rec := recover(); rec != nil { + shared.Rollback(tx) + panic(rec) + } else if err != nil { + shared.Rollback(tx) + } else { + err = tx.Commit() + } + }() + for it.Next(true) { if it.Leaf() { // "leaf" nodes are actually "value" nodes, whose parents are the actual leaves return nil @@ -167,21 +187,21 @@ func (s *Service) createSnapshot(it trie.NodeIterator, headerID int64) error { if err != nil { return err } - switch res.node.NodeType { + switch res.node.nodeType { case leaf: // if the node is a leaf, decode the account and publish the associated storage trie nodes if there are any // var account snapshot.Account var account types.StateAccount if err := rlp.DecodeBytes(res.elements[1].([]byte), &account); err != nil { return fmt.Errorf( - "error decoding account for leaf node at path %x nerror: %v", res.node.Path, err) + "error decoding account for leaf node at path %x nerror: %v", res.node.path, err) } partialPath := trie.CompactToHex(res.elements[0].([]byte)) - valueNodePath := append(res.node.Path, partialPath...) + valueNodePath := append(res.node.path, partialPath...) encodedPath := trie.HexToCompact(valueNodePath) leafKey := encodedPath[1:] - res.node.Key = common.BytesToHash(leafKey) - stateID, err := s.ipfsPublisher.PublishStateNode(res.node, headerID) + res.node.key = common.BytesToHash(leafKey) + stateID, err := s.ipfsPublisher.PublishStateNode(&res.node, headerID, tx) if err != nil { return err } @@ -204,8 +224,8 @@ func (s *Service) createSnapshot(it trie.NodeIterator, headerID int64) error { return fmt.Errorf("failed building storage snapshot for account %+v\r\nerror: %w", account, err) } case extension, branch: - stateNode.key = common.BytesToHash([]byte{}) - if _, err := s.ipfsPublisher.PublishStateNode(stateNode, headerID, tx); err != nil { + res.node.key = common.BytesToHash([]byte{}) + if _, err := s.ipfsPublisher.PublishStateNode(&res.node, headerID, tx); err != nil { return err } default: @@ -266,30 +286,34 @@ func (s *Service) storageSnapshot(sr common.Hash, stateID int64, tx *sqlx.Tx) (* if err != nil { return nil, err } + // if res == nil {continue} tx, err = s.ipfsPublisher.checkBatchSize(tx, s.maxBatchSize) if err != nil { return nil, err } + var nodeData []byte nodeData, err = s.stateDB.TrieDB().Node(it.Hash()) if err != nil { return nil, err } - switch res.node.NodeType { + res.node.value = nodeData + + switch res.node.nodeType { case leaf: partialPath := trie.CompactToHex(res.elements[0].([]byte)) - valueNodePath := append(res.node.Path, partialPath...) + valueNodePath := append(res.node.path, partialPath...) encodedPath := trie.HexToCompact(valueNodePath) leafKey := encodedPath[1:] - res.node.Key = common.BytesToHash(leafKey) + res.node.key = common.BytesToHash(leafKey) case extension, branch: - res.node.Key = common.BytesToHash([]byte{}) + res.node.key = common.BytesToHash([]byte{}) default: return nil, errors.New("unexpected node type") } - if err = s.ipfsPublisher.PublishStorageNode(res.node, stateID); err != nil { - return err + if err = s.ipfsPublisher.PublishStorageNode(&res.node, stateID, tx); err != nil { + return nil, err } }