From 22ecd4065ad2ba34b6d5f0c71847f33fc993280c Mon Sep 17 00:00:00 2001
From: i-norden
Date: Wed, 12 Apr 2023 13:07:42 -0500
Subject: [PATCH] update service

---
 pkg/snapshot/file/publisher.go      |   4 +-
 pkg/snapshot/file/publisher_test.go |  19 +-
 pkg/snapshot/pg/publisher.go        |   4 +-
 pkg/snapshot/pg/publisher_test.go   |  18 +-
 pkg/snapshot/service.go             | 347 ++++++++++++++++------------
 pkg/snapshot/util.go                |   2 +-
 6 files changed, 230 insertions(+), 164 deletions(-)

diff --git a/pkg/snapshot/file/publisher.go b/pkg/snapshot/file/publisher.go
index 943d18f..5604100 100644
--- a/pkg/snapshot/file/publisher.go
+++ b/pkg/snapshot/file/publisher.go
@@ -229,7 +229,7 @@ func (p *publisher) PublishStateLeafNode(stateNode *models.StateNodeModel, snapT
 		stateNode.HeaderID,
 		stateNode.StateKey,
 		stateNode.CID,
-		true,
+		false,
 		stateNode.Balance,
 		strconv.FormatUint(stateNode.Nonce, 10),
 		stateNode.CodeHash,
@@ -257,7 +257,7 @@ func (p *publisher) PublishStorageLeafNode(storageNode *models.StorageNodeModel,
 		storageNode.StateKey,
 		storageNode.StorageKey,
 		storageNode.CID,
-		true,
+		false,
 		storageNode.Value,
 		false)
 	if err != nil {
diff --git a/pkg/snapshot/file/publisher_test.go b/pkg/snapshot/file/publisher_test.go
index 0575aa0..1f2d8a1 100644
--- a/pkg/snapshot/file/publisher_test.go
+++ b/pkg/snapshot/file/publisher_test.go
@@ -9,12 +9,13 @@ import (
 	"path/filepath"
 	"testing"
 
+	"github.com/ethereum/go-ethereum/statediff/indexer/shared/schema"
+
 	"github.com/ethereum/go-ethereum/statediff/indexer/database/sql/postgres"
 	"github.com/ethereum/go-ethereum/statediff/indexer/ipld"
 	"github.com/ethereum/go-ethereum/statediff/indexer/test_helpers"
 
 	fixt "github.com/cerc-io/ipld-eth-state-snapshot/fixture"
-	snapt "github.com/cerc-io/ipld-eth-state-snapshot/pkg/types"
 	"github.com/cerc-io/ipld-eth-state-snapshot/test"
 )
 
@@ -22,12 +23,12 @@ var (
 	pgConfig = test.DefaultPgConfig
 	nodeInfo = test.DefaultNodeInfo
 	// tables ordered according to fkey dependencies
-	allTables = []*snapt.Table{
-		&snapt.TableIPLDBlock,
-		&snapt.TableNodeInfo,
-		&snapt.TableHeader,
-		&snapt.TableStateNode,
-		&snapt.TableStorageNode,
+	allTables = []*schema.Table{
+		&schema.TableIPLDBlock,
+		&schema.TableNodeInfo,
+		&schema.TableHeader,
+		&schema.TableStateNode,
+		&schema.TableStorageNode,
 	}
 )
 
@@ -39,7 +40,7 @@ func writeFiles(t *testing.T, dir string) *publisher {
 	test.NoError(t, err)
 
 	headerID := fixt.Block1_Header.Hash().String()
-	test.NoError(t, pub.PublishStateNode(&fixt.Block1_StateNode0, headerID, fixt.Block1_Header.Number, tx))
+	test.NoError(t, pub.PublishStateLeafNode(&fixt.Block1_StateNode0, headerID, fixt.Block1_Header.Number, tx))
 
 	test.NoError(t, tx.Commit())
 	return pub
@@ -47,7 +48,7 @@ func writeFiles(t *testing.T, dir string) *publisher {
 
 // verify that we can parse the csvs
 // TODO check actual data
-func verifyFileData(t *testing.T, path string, tbl *snapt.Table) {
+func verifyFileData(t *testing.T, path string, tbl *schema.Table) {
 	file, err := os.Open(path)
 	test.NoError(t, err)
 	r := csv.NewReader(file)
diff --git a/pkg/snapshot/pg/publisher.go b/pkg/snapshot/pg/publisher.go
index d3ce792..7378e09 100644
--- a/pkg/snapshot/pg/publisher.go
+++ b/pkg/snapshot/pg/publisher.go
@@ -160,7 +160,7 @@ func (p *publisher) PublishStateLeafNode(stateNode *models.StateNodeModel, snapT
 		stateNode.HeaderID,
 		stateNode.StateKey,
 		stateNode.CID,
-		true,
+		false,
 		stateNode.Balance,
 		stateNode.Nonce,
 		stateNode.CodeHash,
@@ -187,7 +187,7 @@ func (p *publisher) PublishStorageLeafNode(storageNode *models.StorageNodeModel,
 		storageNode.StateKey,
 		storageNode.StorageKey,
 		storageNode.CID,
-		true,
+		false,
 		storageNode.Value,
 		false)
 	if err != nil {
diff --git a/pkg/snapshot/pg/publisher_test.go b/pkg/snapshot/pg/publisher_test.go
index 25c1290..d084bf4 100644
--- a/pkg/snapshot/pg/publisher_test.go
+++ b/pkg/snapshot/pg/publisher_test.go
@@ -4,12 +4,13 @@ import (
 	"context"
 	"testing"
 
+	"github.com/ethereum/go-ethereum/statediff/indexer/shared/schema"
+
 	"github.com/ethereum/go-ethereum/statediff/indexer/database/sql/postgres"
 	"github.com/ethereum/go-ethereum/statediff/indexer/ipld"
 	"github.com/ethereum/go-ethereum/statediff/indexer/test_helpers"
 
 	fixt "github.com/cerc-io/ipld-eth-state-snapshot/fixture"
-	snapt "github.com/cerc-io/ipld-eth-state-snapshot/pkg/types"
 	"github.com/cerc-io/ipld-eth-state-snapshot/test"
 )
 
@@ -17,12 +18,12 @@ var (
 	pgConfig = test.DefaultPgConfig
 	nodeInfo = test.DefaultNodeInfo
 	// tables ordered according to fkey dependencies
-	allTables = []*snapt.Table{
-		&snapt.TableIPLDBlock,
-		&snapt.TableNodeInfo,
-		&snapt.TableHeader,
-		&snapt.TableStateNode,
-		&snapt.TableStorageNode,
+	allTables = []*schema.Table{
+		&schema.TableIPLDBlock,
+		&schema.TableNodeInfo,
+		&schema.TableHeader,
+		&schema.TableStateNode,
+		&schema.TableStorageNode,
 	}
 )
 
@@ -33,7 +34,8 @@ func writeData(t *testing.T, db *postgres.DB) *publisher {
 	test.NoError(t, err)
 
 	headerID := fixt.Block1_Header.Hash().String()
-	test.NoError(t, pub.PublishStateNode(&fixt.Block1_StateNode0, headerID, fixt.Block1_Header.Number, tx))
+	stateNode := &fixt.Block1_StateNode0
+	test.NoError(t, pub.PublishStateLeafNode(stateNode, headerID, fixt.Block1_Header.Number, tx))
 
 	test.NoError(t, tx.Commit())
 	return pub
diff --git a/pkg/snapshot/service.go b/pkg/snapshot/service.go
index 302a673..4053961 100644
--- a/pkg/snapshot/service.go
+++ b/pkg/snapshot/service.go
@@ -29,6 +29,8 @@ import (
 	"github.com/ethereum/go-ethereum/crypto"
 	"github.com/ethereum/go-ethereum/ethdb"
 	"github.com/ethereum/go-ethereum/rlp"
+	"github.com/ethereum/go-ethereum/statediff/indexer/ipld"
+	"github.com/ethereum/go-ethereum/statediff/indexer/models"
 	"github.com/ethereum/go-ethereum/trie"
 	iter "github.com/ethereum/go-ethereum/trie/concurrent_iterator"
 	log "github.com/sirupsen/logrus"
@@ -164,7 +166,7 @@ func (s *Service) CreateSnapshot(params SnapshotParams) error {
 	}
 }
 
-// Create snapshot up to head (ignores height param)
+// CreateLatestSnapshot creates a snapshot at the current head (ignores the height param)
 func (s *Service) CreateLatestSnapshot(workers uint, watchedAddresses map[common.Address]struct{}) error {
 	log.Info("Creating snapshot at head")
 	hash := rawdb.ReadHeadHeaderHash(s.ethDB)
@@ -175,54 +177,20 @@ func (s *Service) CreateLatestSnapshot(workers uint, watchedAddresses map[common
 	return s.CreateSnapshot(SnapshotParams{Height: *height, Workers: workers, WatchedAddresses: watchedAddresses})
 }
 
-type nodeResult struct {
-	node     Node
-	elements []interface{}
-}
-
-func resolveNode(nodePath []byte, it trie.NodeIterator, trieDB *trie.Database) (*nodeResult, error) {
-	// "leaf" nodes are actually "value" nodes, whose parents are the actual leaves
-	if it.Leaf() {
-		return nil, nil
-	}
-	if IsNullHash(it.Hash()) {
-		return nil, nil
+// Full-trie concurrent snapshot
+func (s *Service) createSnapshotAsync(ctx context.Context, iters []trie.NodeIterator, headerID string, height *big.Int, seekingPaths [][]byte) error {
+	// use errgroup with a context to stop all concurrent iterators if one runs into an error
+	// each concurrent iterator completes processing its current node before stopping
+	g, ctx := errgroup.WithContext(ctx)
+	for _, it := range iters {
+		func(it trie.NodeIterator) {
+			g.Go(func() error {
+				return s.createSnapshot(ctx, it, headerID, height, seekingPaths)
+			})
+		}(it)
 	}
-	// use full node path
-	// (it.Path() will give partial path in case of subtrie iterators)
-	path := make([]byte, len(nodePath))
-	copy(path, nodePath)
-	n, err := trieDB.Node(it.Hash())
-	if err != nil {
-		return nil, err
-	}
-	var elements []interface{}
-	if err := rlp.DecodeBytes(n, &elements); err != nil {
-		return nil, err
-	}
-	ty, err := CheckKeyType(elements)
-	if err != nil {
-		return nil, err
-	}
-	return &nodeResult{
-		node: Node{
-			NodeType: ty,
-			Path:     path,
-			Value:    n,
-		},
-		elements: elements,
-	}, nil
-}
-
-// validPath checks if a path is prefix to any one of the paths in the given list
-func validPath(currentPath []byte, seekingPaths [][]byte) bool {
-	for _, seekingPath := range seekingPaths {
-		if bytes.HasPrefix(seekingPath, currentPath) {
-			return true
-		}
-	}
-	return false
+	return g.Wait()
 }
 
 // createSnapshot performs traversal using the given iterator and indexes the nodes
@@ -264,7 +232,8 @@
 // createSubTrieSnapshot processes nodes at the next level of a trie using the given subtrie iterator
 // continually updating seekedPath with path of the latest processed node
-func (s *Service) createSubTrieSnapshot(ctx context.Context, tx Tx, prefixPath []byte, subTrieIt trie.NodeIterator, recoveredPath []byte, seekedPath *[]byte, endPath []byte, headerID string, height *big.Int, seekingPaths [][]byte) error {
+func (s *Service) createSubTrieSnapshot(ctx context.Context, tx Tx, prefixPath []byte, subTrieIt trie.NodeIterator,
+	recoveredPath []byte, seekedPath *[]byte, endPath []byte, headerID string, height *big.Int, seekingPaths [][]byte) error {
 	prom.IncActiveIterCount()
 	defer prom.DecActiveIterCount()
@@ -288,7 +257,7 @@ func (s *Service) createSubTrieSnapshot(ctx context.Context, tx Tx, prefixPath [
 	// if node path is empty and prefix is nil, it's the root node
 	if prefixPath == nil {
 		// create snapshot of node, if it is a leaf this will also create snapshot of entire storage trie
-		if err := s.createNodeSnapshot(tx, subTrieIt.Path(), subTrieIt, headerID, height); err != nil {
+		if err := s.createNodeSnapshot(tx, subTrieIt, headerID, height, seekingPaths); err != nil {
 			return err
 		}
 		updateSeekedPath(seekedPath, subTrieIt.Path())
@@ -338,7 +307,7 @@ func (s *Service) createSubTrieSnapshot(ctx context.Context, tx Tx, prefixPath [
 
 	// if the node is along paths of interest
 	// create snapshot of node, if it is a leaf this will also create snapshot of entire storage trie
-	if err := s.createNodeSnapshot(tx, nodePath, subTrieIt, headerID, height); err != nil {
+	if err := s.createNodeSnapshot(tx, subTrieIt, headerID, height, seekingPaths); err != nil {
 		return err
 	}
 	// update seeked path after node has been processed
@@ -389,83 +358,123 @@ func (s *Service) createSubTrieIt(prefixPath []byte, hash common.Hash, recovered
 
 // createNodeSnapshot indexes the current node
 // entire storage trie is also indexed (if available)
-func (s *Service) createNodeSnapshot(tx Tx, path []byte, it trie.NodeIterator, headerID string, height *big.Int) error {
-	res, err := resolveNode(path, it, s.stateDB.TrieDB())
-	if err != nil {
-		return err
-	}
-	if res == nil {
-		return nil
-	}
-
-	tx, err = s.ipfsPublisher.PrepareTxForBatch(tx, s.maxBatchSize)
+func (s *Service) createNodeSnapshot(tx Tx, it trie.NodeIterator, headerID string,
+	height *big.Int, watchedAddressesLeafPaths [][]byte) error {
+	tx, err := s.ipfsPublisher.PrepareTxForBatch(tx, s.maxBatchSize)
 	if err != nil {
 		return err
 	}
 
-	switch res.node.NodeType {
-	case Leaf:
-		// if the node is a leaf, decode the account and publish the associated storage trie
-		// nodes if there are any
-		var account types.StateAccount
-		if err := rlp.DecodeBytes(res.elements[1].([]byte), &account); err != nil {
-			return fmt.Errorf(
-				"error decoding account for leaf node at path %x nerror: %v", res.node.Path, err)
-		}
-		partialPath := trie.CompactToHex(res.elements[0].([]byte))
-		valueNodePath := append(res.node.Path, partialPath...)
-		encodedPath := trie.HexToCompact(valueNodePath)
-		leafKey := encodedPath[1:]
-		res.node.Key = common.BytesToHash(leafKey)
-		if err := s.ipfsPublisher.PublishStateNode(&res.node, headerID, height, tx); err != nil {
+	// index values by leaf key
+	if it.Leaf() {
+		// if it is a "value" node, we will index the value by leaf key
+		// publish codehash => code mappings
+		// take storage snapshot
+		if err := s.processStateValueNode(it, headerID, height, watchedAddressesLeafPaths, tx); err != nil {
 			return err
 		}
-
-		// publish any non-nil code referenced by codehash
-		if !bytes.Equal(account.CodeHash, emptyCodeHash) {
-			codeHash := common.BytesToHash(account.CodeHash)
-			codeBytes := rawdb.ReadCode(s.ethDB, codeHash)
-			if len(codeBytes) == 0 {
-				log.Error("Code is missing", "account", common.BytesToHash(it.LeafKey()))
-				return errors.New("missing code")
-			}
-
-			if err = s.ipfsPublisher.PublishCode(height, codeHash, codeBytes, tx); err != nil {
+	} else { // trie nodes will be written to blockstore only
+		// reminder that this includes leaf nodes, since the geth iterator.Leaf() actually signifies a "value" node
+		// so this is also where we publish the IPLD block corresponding to the "value" nodes indexed above
+		if IsNullHash(it.Hash()) {
+			// skip null node
+			return nil
+		}
+		nodeVal := make([]byte, len(it.NodeBlob()))
+		copy(nodeVal, it.NodeBlob())
+		if len(watchedAddressesLeafPaths) > 0 {
+			var elements []interface{}
+			if err := rlp.DecodeBytes(nodeVal, &elements); err != nil {
 				return err
 			}
+			ok, err := isLeaf(elements)
+			if err != nil {
+				return err
+			}
+			if ok {
+				nodePath := make([]byte, len(it.Path()))
+				copy(nodePath, it.Path())
+				partialPath := trie.CompactToHex(elements[0].([]byte))
+				valueNodePath := append(nodePath, partialPath...)
+				if !isWatchedAddress(watchedAddressesLeafPaths, valueNodePath) {
+					// skip this node
+					return nil
+				}
+			}
 		}
-
-		if _, err = s.storageSnapshot(account.Root, headerID, height, res.node.Path, tx); err != nil {
-			return fmt.Errorf("failed building storage snapshot for account %+v\r\nerror: %w", account, err)
-		}
-	case Extension, Branch:
-		res.node.Key = common.BytesToHash([]byte{})
-		if err := s.ipfsPublisher.PublishStateNode(&res.node, headerID, height, tx); err != nil {
+		nodeHash := make([]byte, len(it.Hash().Bytes()))
+		copy(nodeHash, it.Hash().Bytes())
+		if _, err := s.ipfsPublisher.PublishIPLD(ipld.Keccak256ToCid(ipld.MEthStateTrie, nodeHash), nodeVal, height, tx); err != nil {
 			return err
 		}
-	default:
-		return errors.New("unexpected node type")
 	}
+	return it.Error()
 }
 
-// Full-trie concurrent snapshot
-func (s *Service) createSnapshotAsync(ctx context.Context, iters []trie.NodeIterator, headerID string, height *big.Int, seekingPaths [][]byte) error {
-	// use errgroup with a context to stop all concurrent iterators if one runs into an error
-	// each concurrent iterator completes processing it's current node before stopping
-	g, ctx := errgroup.WithContext(ctx)
-	for _, it := range iters {
-		func(it trie.NodeIterator) {
-			g.Go(func() error {
-				return s.createSnapshot(ctx, it, headerID, height, seekingPaths)
-			})
-		}(it)
+// reminder: it.Leaf() == true when the iterator is positioned at a "value node" which is not something that actually exists in an MMPT
+func (s *Service) processStateValueNode(it trie.NodeIterator, headerID string, height *big.Int,
+	watchedAddressesLeafPaths [][]byte, tx Tx) error {
+	// skip if it is not a watched address
+	// If we aren't watching any specific addresses, we are watching everything
+	if len(watchedAddressesLeafPaths) > 0 && !isWatchedAddress(watchedAddressesLeafPaths, it.Path()) {
+		return nil
 	}
-	return g.Wait()
+	// created vs updated is important for leaf nodes since we need to diff their storage
+	// so we need to map all changed accounts at B to their leafkey, since an account can change paths but not leafkey
+	var account types.StateAccount
+	accountRLP := make([]byte, len(it.LeafBlob()))
+	copy(accountRLP, it.LeafBlob())
+	if err := rlp.DecodeBytes(accountRLP, &account); err != nil {
+		return fmt.Errorf("error decoding account for leaf value at leaf key %x\nerror: %v", it.LeafKey(), err)
+	}
+	leafKey := make([]byte, len(it.LeafKey()))
+	copy(leafKey, it.LeafKey())
+
+	// write codehash => code mappings if we have a contract
+	if !bytes.Equal(account.CodeHash, emptyCodeHash) {
+		codeHash := common.BytesToHash(account.CodeHash)
+		code, err := s.stateDB.ContractCode(common.Hash{}, codeHash)
+		if err != nil {
+			return fmt.Errorf("failed to retrieve code for codehash %s\r\n error: %v", codeHash.String(), err)
+		}
+		if _, err := s.ipfsPublisher.PublishIPLD(ipld.Keccak256ToCid(ipld.RawBinary, codeHash.Bytes()), code, height, tx); err != nil {
+			return err
+		}
+	}
+
+	// since this is a "value node", we need to move up to the "parent" node which is the actual leaf node
+	// it should be in the fastcache since it necessarily was recently accessed to reach the current node
+	parentNodeRLP, err := s.stateDB.TrieDB().Node(it.Parent())
+	if err != nil {
+		return err
+	}
+	// publish the state leaf model
+	stateKeyStr := common.BytesToHash(leafKey).String()
+	stateLeafNodeModel := &models.StateNodeModel{
+		BlockNumber: height.String(),
+		HeaderID:    headerID,
+		StateKey:    stateKeyStr,
+		Removed:     false,
+		CID:         ipld.Keccak256ToCid(ipld.MEthStateTrie, crypto.Keccak256(parentNodeRLP)).String(),
+		Diff:        false,
+		Balance:     account.Balance.String(),
+		Nonce:       account.Nonce,
+		CodeHash:    common.BytesToHash(account.CodeHash).String(),
+		StorageRoot: account.Root.String(),
+	}
+	if err := s.ipfsPublisher.PublishStateLeafNode(stateLeafNodeModel, tx); err != nil {
+		return fmt.Errorf("failed publishing state leaf node for leaf key %s\r\nerror: %w", stateKeyStr, err)
+	}
+	// create storage snapshot
+	// this short circuits if storage is empty
+	if _, err := s.storageSnapshot(account.Root, stateKeyStr, headerID, height, tx); err != nil {
+		return fmt.Errorf("failed building storage snapshot for account %+v\r\nerror: %w", account, err)
+	}
+	return nil
 }
 
-func (s *Service) storageSnapshot(sr common.Hash, headerID string, height *big.Int, statePath []byte, tx Tx) (Tx, error) {
+func (s *Service) storageSnapshot(sr common.Hash, stateKey, headerID string, height *big.Int, tx Tx) (Tx, error) {
 	if bytes.Equal(sr.Bytes(), emptyContractRoot.Bytes()) {
 		return tx, nil
 	}
@@ -477,42 +486,96 @@ func (s *Service) storageSnapshot(sr common.Hash, headerID string, height *big.I
 
 	it := sTrie.NodeIterator(make([]byte, 0))
 	for it.Next(true) {
-		res, err := resolveNode(it.Path(), it, s.stateDB.TrieDB())
-		if err != nil {
-			return nil, err
-		}
-		if res == nil {
-			continue
-		}
-
-		tx, err = s.ipfsPublisher.PrepareTxForBatch(tx, s.maxBatchSize)
-		if err != nil {
-			return nil, err
-		}
-
-		var nodeData []byte
-		nodeData, err = s.stateDB.TrieDB().Node(it.Hash())
-		if err != nil {
-			return nil, err
-		}
-		res.node.Value = nodeData
-
-		switch res.node.NodeType {
-		case Leaf:
-			partialPath := trie.CompactToHex(res.elements[0].([]byte))
-			valueNodePath := append(res.node.Path, partialPath...)
-			encodedPath := trie.HexToCompact(valueNodePath)
-			leafKey := encodedPath[1:]
-			res.node.Key = common.BytesToHash(leafKey)
-		case Extension, Branch:
-			res.node.Key = common.BytesToHash([]byte{})
-		default:
-			return nil, errors.New("unexpected node type")
-		}
-		if err = s.ipfsPublisher.PublishStorageNode(&res.node, headerID, height, statePath, tx); err != nil {
-			return nil, err
+		if it.Leaf() {
+			if err := s.processStorageValueNode(it, stateKey, headerID, height, tx); err != nil {
+				return nil, err
+			}
+		} else {
+			nodeVal := make([]byte, len(it.NodeBlob()))
+			copy(nodeVal, it.NodeBlob())
+			nodeHash := make([]byte, len(it.Hash().Bytes()))
+			copy(nodeHash, it.Hash().Bytes())
+			if _, err := s.ipfsPublisher.PublishIPLD(ipld.Keccak256ToCid(ipld.MEthStorageTrie, nodeHash), nodeVal, height, tx); err != nil {
+				return nil, err
+			}
 		}
 	}
 
 	return tx, it.Error()
 }
+
+// reminder: it.Leaf() == true when the iterator is positioned at a "value node" which is not something that actually exists in an MMPT
+func (s *Service) processStorageValueNode(it trie.NodeIterator, stateKey, headerID string, height *big.Int, tx Tx) error {
+	leafKey := make([]byte, len(it.LeafKey()))
+	copy(leafKey, it.LeafKey())
+	value := make([]byte, len(it.LeafBlob()))
+	copy(value, it.LeafBlob())
+
+	// since this is a "value node", we need to move up to the "parent" node which is the actual leaf node
+	// it should be in the fastcache since it necessarily was recently accessed to reach the current node
+	parentNodeRLP, err := s.stateDB.TrieDB().Node(it.Parent())
+	if err != nil {
+		return err
+	}
+
+	// publish storage leaf node model
+	storageLeafKeyStr := common.BytesToHash(leafKey).String()
+	storageLeafNodeModel := &models.StorageNodeModel{
+		BlockNumber: height.String(),
+		HeaderID:    headerID,
+		StateKey:    stateKey,
+		StorageKey:  storageLeafKeyStr,
+		Removed:     false,
+		CID:         ipld.Keccak256ToCid(ipld.MEthStorageTrie, crypto.Keccak256(parentNodeRLP)).String(),
+		Diff:        false,
+		Value:       value,
+	}
+	if err := s.ipfsPublisher.PublishStorageLeafNode(storageLeafNodeModel, tx); err != nil {
+		return fmt.Errorf("failed to publish storage leaf node for state leaf key %s and storage leaf key %s\r\nerr: %w", stateKey, storageLeafKeyStr, err)
+	}
+	return nil
+}
+
+// validPath checks if a path is prefix to any one of the paths in the given list
+func validPath(currentPath []byte, seekingPaths [][]byte) bool {
+	for _, seekingPath := range seekingPaths {
+		if bytes.HasPrefix(seekingPath, currentPath) {
+			return true
+		}
+	}
+	return false
+}
+
+// isWatchedAddress is used to check if a state account corresponds to one of the addresses the builder is configured to watch
+func isWatchedAddress(watchedAddressesLeafPaths [][]byte, valueNodePath []byte) bool {
+	for _, watchedAddressPath := range watchedAddressesLeafPaths {
+		if bytes.Equal(watchedAddressPath, valueNodePath) {
+			return true
+		}
+	}
+
+	return false
+}
+
+// isLeaf checks if the node we are at is a leaf
+func isLeaf(elements []interface{}) (bool, error) {
+	if len(elements) > 2 {
+		return false, nil
+	}
+	if len(elements) < 2 {
+		return false, fmt.Errorf("node must have at least two elements")
+	}
+	switch elements[0].([]byte)[0] / 16 {
+	case '\x00':
+		return false, nil
+	case '\x01':
+		return false, nil
	case '\x02':
+		return true, nil
+	case '\x03':
+		return true, nil
+	default:
+		return false, fmt.Errorf("unknown hex prefix")
+	}
}
diff --git a/pkg/snapshot/util.go b/pkg/snapshot/util.go
index 19a4881..7e9556a 100644
--- a/pkg/snapshot/util.go
+++ b/pkg/snapshot/util.go
@@ -9,7 +9,7 @@ import (
 
 	"github.com/cerc-io/ipld-eth-state-snapshot/pkg/prom"
 	file "github.com/cerc-io/ipld-eth-state-snapshot/pkg/snapshot/file"
-	pg "github.com/cerc-io/ipld-eth-state-snapshot/pkg/snapshot/pg"
+	"github.com/cerc-io/ipld-eth-state-snapshot/pkg/snapshot/pg"
 	snapt "github.com/cerc-io/ipld-eth-state-snapshot/pkg/types"
 )
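
Note on the new isLeaf helper (editorial addition, not part of the patch): it relies on Ethereum's hex-prefix (HP) encoding, in which the high nibble of the first byte of a node's compact-encoded partial path is 0 or 1 for an extension node and 2 or 3 for a leaf. The patch computes that nibble as elements[0].([]byte)[0] / 16, and its rune literals '\x00' through '\x03' compare equal to the integers 0 through 3. A minimal standalone sketch of the same rule follows; isLeafPath and its sample inputs are illustrative only, not identifiers from this repository:

package main

import "fmt"

// isLeafPath applies Ethereum's hex-prefix rule to a compact-encoded
// partial path: high nibble 0/1 => extension node, 2/3 => leaf node.
// The patch's isLeaf derives the same nibble from the RLP-decoded
// first element of a two-element trie node.
func isLeafPath(compact []byte) (bool, error) {
	if len(compact) == 0 {
		return false, fmt.Errorf("empty compact path")
	}
	switch compact[0] >> 4 { // high nibble of the first byte
	case 0, 1:
		return false, nil // extension node (even/odd partial path)
	case 2, 3:
		return true, nil // leaf node (even/odd partial path)
	default:
		return false, fmt.Errorf("unknown hex prefix nibble %d", compact[0]>>4)
	}
}

func main() {
	fmt.Println(isLeafPath([]byte{0x20})) // true <nil>: leaf with even-length path
	fmt.Println(isLeafPath([]byte{0x1a})) // false <nil>: extension with odd path
}

Under this rule, compact byte 0x20 marks a leaf with an even-length partial path, while 0x1a marks an extension whose first path nibble is 0xa; this is why the watched-address check in createNodeSnapshot can recover the full value-node path by appending trie.CompactToHex(elements[0]) to the node path.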