update service

commit 22ecd4065a
parent 382ad92701
@@ -229,7 +229,7 @@ func (p *publisher) PublishStateLeafNode(stateNode *models.StateNodeModel, snapT
         stateNode.HeaderID,
         stateNode.StateKey,
         stateNode.CID,
-        true,
+        false,
         stateNode.Balance,
         strconv.FormatUint(stateNode.Nonce, 10),
         stateNode.CodeHash,
@@ -257,7 +257,7 @@ func (p *publisher) PublishStorageLeafNode(storageNode *models.StorageNodeModel,
         storageNode.StateKey,
         storageNode.StorageKey,
         storageNode.CID,
-        true,
+        false,
         storageNode.Value,
         false)
     if err != nil {
@@ -9,12 +9,13 @@ import (
     "path/filepath"
     "testing"
 
+    "github.com/ethereum/go-ethereum/statediff/indexer/shared/schema"
+
     "github.com/ethereum/go-ethereum/statediff/indexer/database/sql/postgres"
     "github.com/ethereum/go-ethereum/statediff/indexer/ipld"
     "github.com/ethereum/go-ethereum/statediff/indexer/test_helpers"
 
     fixt "github.com/cerc-io/ipld-eth-state-snapshot/fixture"
-    snapt "github.com/cerc-io/ipld-eth-state-snapshot/pkg/types"
     "github.com/cerc-io/ipld-eth-state-snapshot/test"
 )
 
@@ -22,12 +23,12 @@ var (
     pgConfig = test.DefaultPgConfig
     nodeInfo = test.DefaultNodeInfo
     // tables ordered according to fkey depedencies
-    allTables = []*snapt.Table{
-        &snapt.TableIPLDBlock,
-        &snapt.TableNodeInfo,
-        &snapt.TableHeader,
-        &snapt.TableStateNode,
-        &snapt.TableStorageNode,
+    allTables = []*schema.Table{
+        &schema.TableIPLDBlock,
+        &schema.TableNodeInfo,
+        &schema.TableHeader,
+        &schema.TableStateNode,
+        &schema.TableStorageNode,
     }
 )
 
@@ -39,7 +40,7 @@ func writeFiles(t *testing.T, dir string) *publisher {
     test.NoError(t, err)
 
     headerID := fixt.Block1_Header.Hash().String()
-    test.NoError(t, pub.PublishStateNode(&fixt.Block1_StateNode0, headerID, fixt.Block1_Header.Number, tx))
+    test.NoError(t, pub.PublishStateLeafNode(&fixt.Block1_StateNode0, headerID, fixt.Block1_Header.Number, tx))
 
     test.NoError(t, tx.Commit())
     return pub
@@ -47,7 +48,7 @@ func writeFiles(t *testing.T, dir string) *publisher {
 
 // verify that we can parse the csvs
 // TODO check actual data
-func verifyFileData(t *testing.T, path string, tbl *snapt.Table) {
+func verifyFileData(t *testing.T, path string, tbl *schema.Table) {
     file, err := os.Open(path)
     test.NoError(t, err)
     r := csv.NewReader(file)
@@ -160,7 +160,7 @@ func (p *publisher) PublishStateLeafNode(stateNode *models.StateNodeModel, snapT
         stateNode.HeaderID,
         stateNode.StateKey,
         stateNode.CID,
-        true,
+        false,
         stateNode.Balance,
         stateNode.Nonce,
         stateNode.CodeHash,
@@ -187,7 +187,7 @@ func (p *publisher) PublishStorageLeafNode(storageNode *models.StorageNodeModel,
         storageNode.StateKey,
         storageNode.StorageKey,
         storageNode.CID,
-        true,
+        false,
         storageNode.Value,
         false)
     if err != nil {
@@ -4,12 +4,13 @@ import (
     "context"
     "testing"
 
+    "github.com/ethereum/go-ethereum/statediff/indexer/shared/schema"
+
     "github.com/ethereum/go-ethereum/statediff/indexer/database/sql/postgres"
     "github.com/ethereum/go-ethereum/statediff/indexer/ipld"
     "github.com/ethereum/go-ethereum/statediff/indexer/test_helpers"
 
     fixt "github.com/cerc-io/ipld-eth-state-snapshot/fixture"
-    snapt "github.com/cerc-io/ipld-eth-state-snapshot/pkg/types"
     "github.com/cerc-io/ipld-eth-state-snapshot/test"
 )
 
@@ -17,12 +18,12 @@ var (
     pgConfig = test.DefaultPgConfig
     nodeInfo = test.DefaultNodeInfo
     // tables ordered according to fkey depedencies
-    allTables = []*snapt.Table{
-        &snapt.TableIPLDBlock,
-        &snapt.TableNodeInfo,
-        &snapt.TableHeader,
-        &snapt.TableStateNode,
-        &snapt.TableStorageNode,
+    allTables = []*schema.Table{
+        &schema.TableIPLDBlock,
+        &schema.TableNodeInfo,
+        &schema.TableHeader,
+        &schema.TableStateNode,
+        &schema.TableStorageNode,
     }
 )
 
@@ -33,7 +34,8 @@ func writeData(t *testing.T, db *postgres.DB) *publisher {
     test.NoError(t, err)
 
     headerID := fixt.Block1_Header.Hash().String()
-    test.NoError(t, pub.PublishStateNode(&fixt.Block1_StateNode0, headerID, fixt.Block1_Header.Number, tx))
+    stateNode := &fixt.Block1_StateNode0
+    test.NoError(t, pub.PublishStateLeafNode(stateNode, headerID, fixt.Block1_Header.Number, tx))
 
     test.NoError(t, tx.Commit())
     return pub
@@ -29,6 +29,8 @@ import (
     "github.com/ethereum/go-ethereum/crypto"
     "github.com/ethereum/go-ethereum/ethdb"
     "github.com/ethereum/go-ethereum/rlp"
+    "github.com/ethereum/go-ethereum/statediff/indexer/ipld"
+    "github.com/ethereum/go-ethereum/statediff/indexer/models"
     "github.com/ethereum/go-ethereum/trie"
     iter "github.com/ethereum/go-ethereum/trie/concurrent_iterator"
     log "github.com/sirupsen/logrus"
@@ -164,7 +166,7 @@ func (s *Service) CreateSnapshot(params SnapshotParams) error {
     }
 }
 
-// Create snapshot up to head (ignores height param)
+// CreateLatestSnapshot snapshot at head (ignores height param)
 func (s *Service) CreateLatestSnapshot(workers uint, watchedAddresses map[common.Address]struct{}) error {
     log.Info("Creating snapshot at head")
     hash := rawdb.ReadHeadHeaderHash(s.ethDB)
@@ -175,54 +177,20 @@ func (s *Service) CreateLatestSnapshot(workers uint, watchedAddresses map[common
     return s.CreateSnapshot(SnapshotParams{Height: *height, Workers: workers, WatchedAddresses: watchedAddresses})
 }
 
-type nodeResult struct {
-    node     Node
-    elements []interface{}
+// Full-trie concurrent snapshot
+func (s *Service) createSnapshotAsync(ctx context.Context, iters []trie.NodeIterator, headerID string, height *big.Int, seekingPaths [][]byte) error {
+    // use errgroup with a context to stop all concurrent iterators if one runs into an error
+    // each concurrent iterator completes processing it's current node before stopping
+    g, ctx := errgroup.WithContext(ctx)
+    for _, it := range iters {
+        func(it trie.NodeIterator) {
+            g.Go(func() error {
+                return s.createSnapshot(ctx, it, headerID, height, seekingPaths)
+            })
+        }(it)
     }
 
-func resolveNode(nodePath []byte, it trie.NodeIterator, trieDB *trie.Database) (*nodeResult, error) {
-    // "leaf" nodes are actually "value" nodes, whose parents are the actual leaves
-    if it.Leaf() {
-        return nil, nil
-    }
-    if IsNullHash(it.Hash()) {
-        return nil, nil
-    }
-
-    // use full node path
-    // (it.Path() will give partial path in case of subtrie iterators)
-    path := make([]byte, len(nodePath))
-    copy(path, nodePath)
-    n, err := trieDB.Node(it.Hash())
-    if err != nil {
-        return nil, err
-    }
-    var elements []interface{}
-    if err := rlp.DecodeBytes(n, &elements); err != nil {
-        return nil, err
-    }
-    ty, err := CheckKeyType(elements)
-    if err != nil {
-        return nil, err
-    }
-    return &nodeResult{
-        node: Node{
-            NodeType: ty,
-            Path:     path,
-            Value:    n,
-        },
-        elements: elements,
-    }, nil
-}
-
-// validPath checks if a path is prefix to any one of the paths in the given list
-func validPath(currentPath []byte, seekingPaths [][]byte) bool {
-    for _, seekingPath := range seekingPaths {
-        if bytes.HasPrefix(seekingPath, currentPath) {
-            return true
-        }
-    }
-    return false
+    return g.Wait()
 }
 
 // createSnapshot performs traversal using the given iterator and indexes the nodes
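Aside: the `createSnapshotAsync` added above follows the standard errgroup fan-out pattern. A minimal, self-contained sketch of that pattern (the `processPart` worker and part count are illustrative stand-ins, not this repository's API):

```go
package main

import (
	"context"
	"fmt"

	"golang.org/x/sync/errgroup"
)

func processPart(ctx context.Context, part int) error {
	select {
	case <-ctx.Done():
		return ctx.Err() // another worker failed; stop early
	default:
		fmt.Println("processed part", part)
		return nil
	}
}

func main() {
	// WithContext cancels ctx as soon as any goroutine returns an error.
	g, ctx := errgroup.WithContext(context.Background())
	for part := 0; part < 4; part++ {
		part := part // rebind so each goroutine captures its own copy (pre-Go 1.22 semantics)
		g.Go(func() error {
			return processPart(ctx, part)
		})
	}
	// Wait blocks until all workers finish and returns the first non-nil error.
	if err := g.Wait(); err != nil {
		fmt.Println("snapshot failed:", err)
	}
}
```

The `func(it trie.NodeIterator) { ... }(it)` wrapper in the diff serves the same purpose as the `part := part` rebinding here: each goroutine must capture its own copy of the loop variable.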
@@ -264,7 +232,8 @@ func (s *Service) createSnapshot(ctx context.Context, it trie.NodeIterator, head
 
 // createSubTrieSnapshot processes nodes at the next level of a trie using the given subtrie iterator
 // continually updating seekedPath with path of the latest processed node
-func (s *Service) createSubTrieSnapshot(ctx context.Context, tx Tx, prefixPath []byte, subTrieIt trie.NodeIterator, recoveredPath []byte, seekedPath *[]byte, endPath []byte, headerID string, height *big.Int, seekingPaths [][]byte) error {
+func (s *Service) createSubTrieSnapshot(ctx context.Context, tx Tx, prefixPath []byte, subTrieIt trie.NodeIterator,
+    recoveredPath []byte, seekedPath *[]byte, endPath []byte, headerID string, height *big.Int, seekingPaths [][]byte) error {
     prom.IncActiveIterCount()
     defer prom.DecActiveIterCount()
 
@@ -288,7 +257,7 @@ func (s *Service) createSubTrieSnapshot(ctx context.Context, tx Tx, prefixPath [
     // if node path is empty and prefix is nil, it's the root node
     if prefixPath == nil {
         // create snapshot of node, if it is a leaf this will also create snapshot of entire storage trie
-        if err := s.createNodeSnapshot(tx, subTrieIt.Path(), subTrieIt, headerID, height); err != nil {
+        if err := s.createNodeSnapshot(tx, subTrieIt, headerID, height, seekingPaths); err != nil {
             return err
         }
         updateSeekedPath(seekedPath, subTrieIt.Path())
@@ -338,7 +307,7 @@ func (s *Service) createSubTrieSnapshot(ctx context.Context, tx Tx, prefixPath [
 
     // if the node is along paths of interest
     // create snapshot of node, if it is a leaf this will also create snapshot of entire storage trie
-    if err := s.createNodeSnapshot(tx, nodePath, subTrieIt, headerID, height); err != nil {
+    if err := s.createNodeSnapshot(tx, subTrieIt, headerID, height, seekingPaths); err != nil {
         return err
     }
     // update seeked path after node has been processed
@@ -389,83 +358,123 @@ func (s *Service) createSubTrieIt(prefixPath []byte, hash common.Hash, recovered
 
 // createNodeSnapshot indexes the current node
 // entire storage trie is also indexed (if available)
-func (s *Service) createNodeSnapshot(tx Tx, path []byte, it trie.NodeIterator, headerID string, height *big.Int) error {
-    res, err := resolveNode(path, it, s.stateDB.TrieDB())
+func (s *Service) createNodeSnapshot(tx Tx, it trie.NodeIterator, headerID string, height *big.Int, watchedAddressesLeafPaths [][]byte) error {
+    tx, err := s.ipfsPublisher.PrepareTxForBatch(tx, s.maxBatchSize)
     if err != nil {
         return err
     }
-    if res == nil {
+
+    // index values by leaf key
+    if it.Leaf() {
+        // if it is a "value" node, we will index the value by leaf key
+        // publish codehash => code mappings
+        // take storage snapshot
+        if err := s.processStateValueNode(it, headerID, height, watchedAddressesLeafPaths, tx); err != nil {
+            return err
+        }
+    } else { // trie nodes will be written to blockstore only
+        // reminder that this includes leaf nodes, since the geth iterator.Leaf() actually signifies a "value" node
+        // so this is also where we publish the IPLD block corresponding to the "value" nodes indexed above
+        if IsNullHash(it.Hash()) {
+            // skip null node
             return nil
         }
-
-    tx, err = s.ipfsPublisher.PrepareTxForBatch(tx, s.maxBatchSize)
+        nodeVal := make([]byte, len(it.NodeBlob()))
+        copy(nodeVal, it.NodeBlob())
+        if len(watchedAddressesLeafPaths) > 0 {
+            var elements []interface{}
+            if err := rlp.DecodeBytes(nodeVal, &elements); err != nil {
+                return err
+            }
+            ok, err := isLeaf(elements)
             if err != nil {
                 return err
             }
-    switch res.node.NodeType {
-    case Leaf:
-        // if the node is a leaf, decode the account and publish the associated storage trie
-        // nodes if there are any
-        var account types.StateAccount
-        if err := rlp.DecodeBytes(res.elements[1].([]byte), &account); err != nil {
-            return fmt.Errorf(
-                "error decoding account for leaf node at path %x\nerror: %v", res.node.Path, err)
+            if ok {
+                nodePath := make([]byte, len(it.Path()))
+                copy(nodePath, it.Path())
+                partialPath := trie.CompactToHex(elements[0].([]byte))
+                valueNodePath := append(nodePath, partialPath...)
+                if !isWatchedAddress(watchedAddressesLeafPaths, valueNodePath) {
+                    // skip this node
+                    return nil
                 }
-        partialPath := trie.CompactToHex(res.elements[0].([]byte))
-        valueNodePath := append(res.node.Path, partialPath...)
-        encodedPath := trie.HexToCompact(valueNodePath)
-        leafKey := encodedPath[1:]
-        res.node.Key = common.BytesToHash(leafKey)
-        if err := s.ipfsPublisher.PublishStateNode(&res.node, headerID, height, tx); err != nil {
-            return err
             }
-
-        // publish any non-nil code referenced by codehash
-        if !bytes.Equal(account.CodeHash, emptyCodeHash) {
-            codeHash := common.BytesToHash(account.CodeHash)
-            codeBytes := rawdb.ReadCode(s.ethDB, codeHash)
-            if len(codeBytes) == 0 {
-                log.Error("Code is missing", "account", common.BytesToHash(it.LeafKey()))
-                return errors.New("missing code")
         }
-
-            if err = s.ipfsPublisher.PublishCode(height, codeHash, codeBytes, tx); err != nil {
+        nodeHash := make([]byte, len(it.Hash().Bytes()))
+        copy(nodeHash, it.Hash().Bytes())
+        if _, err := s.ipfsPublisher.PublishIPLD(ipld.Keccak256ToCid(ipld.MEthStateTrie, nodeHash), nodeVal, height, tx); err != nil {
             return err
         }
     }
 
-        if _, err = s.storageSnapshot(account.Root, headerID, height, res.node.Path, tx); err != nil {
-            return fmt.Errorf("failed building storage snapshot for account %+v\r\nerror: %w", account, err)
-        }
-    case Extension, Branch:
-        res.node.Key = common.BytesToHash([]byte{})
-        if err := s.ipfsPublisher.PublishStateNode(&res.node, headerID, height, tx); err != nil {
-            return err
-        }
-    default:
-        return errors.New("unexpected node type")
-    }
     return it.Error()
 }
 
-// Full-trie concurrent snapshot
-func (s *Service) createSnapshotAsync(ctx context.Context, iters []trie.NodeIterator, headerID string, height *big.Int, seekingPaths [][]byte) error {
-    // use errgroup with a context to stop all concurrent iterators if one runs into an error
-    // each concurrent iterator completes processing it's current node before stopping
-    g, ctx := errgroup.WithContext(ctx)
-    for _, it := range iters {
-        func(it trie.NodeIterator) {
-            g.Go(func() error {
-                return s.createSnapshot(ctx, it, headerID, height, seekingPaths)
-            })
-        }(it)
+// reminder: it.Leaf() == true when the iterator is positioned at a "value node" which is not something that actually exists in an MMPT
+func (s *Service) processStateValueNode(it trie.NodeIterator, headerID string, height *big.Int,
+    watchedAddressesLeafPaths [][]byte, tx Tx) error {
+    // skip if it is not a watched address
+    // If we aren't watching any specific addresses, we are watching everything
+    if len(watchedAddressesLeafPaths) > 0 && !isWatchedAddress(watchedAddressesLeafPaths, it.Path()) {
+        return nil
     }
 
-    return g.Wait()
+    // created vs updated is important for leaf nodes since we need to diff their storage
+    // so we need to map all changed accounts at B to their leafkey, since account can change pathes but not leafkey
+    var account types.StateAccount
+    accountRLP := make([]byte, len(it.LeafBlob()))
+    copy(accountRLP, it.LeafBlob())
+    if err := rlp.DecodeBytes(accountRLP, &account); err != nil {
+        return fmt.Errorf("error decoding account for leaf value at leaf key %x\nerror: %v", it.LeafKey(), err)
+    }
+    leafKey := make([]byte, len(it.LeafKey()))
+    copy(leafKey, it.LeafKey())
+
+    // write codehash => code mappings if we have a contract
+    if !bytes.Equal(account.CodeHash, emptyCodeHash) {
+        codeHash := common.BytesToHash(account.CodeHash)
+        code, err := s.stateDB.ContractCode(common.Hash{}, codeHash)
+        if err != nil {
+            return fmt.Errorf("failed to retrieve code for codehash %s\r\n error: %v", codeHash.String(), err)
+        }
+        if _, err := s.ipfsPublisher.PublishIPLD(ipld.Keccak256ToCid(ipld.RawBinary, codeHash.Bytes()), code, height, tx); err != nil {
+            return err
+        }
     }
 
-func (s *Service) storageSnapshot(sr common.Hash, headerID string, height *big.Int, statePath []byte, tx Tx) (Tx, error) {
+    // since this is a "value node", we need to move up to the "parent" node which is the actual leaf node
+    // it should be in the fastcache since it necessarily was recently accessed to reach the current node
+    parentNodeRLP, err := s.stateDB.TrieDB().Node(it.Parent())
+    if err != nil {
+        return err
+    }
+
+    // publish the state leaf model
+    stateKeyStr := common.BytesToHash(leafKey).String()
+    stateLeafNodeModel := &models.StateNodeModel{
+        BlockNumber: height.String(),
+        HeaderID:    headerID,
+        StateKey:    stateKeyStr,
+        Removed:     false,
+        CID:         ipld.Keccak256ToCid(ipld.MEthStateTrie, crypto.Keccak256(parentNodeRLP)).String(),
+        Diff:        false,
+        Balance:     account.Balance.String(),
+        Nonce:       account.Nonce,
+        CodeHash:    common.BytesToHash(account.CodeHash).String(),
+        StorageRoot: account.Root.String(),
+    }
+    if err := s.ipfsPublisher.PublishStateLeafNode(stateLeafNodeModel, tx); err != nil {
+        return fmt.Errorf("failed publishing state leaf node for leaf key %s\r\nerror: %w", stateKeyStr, err)
+    }
+
+    // create storage snapshot
+    // this short circuits if storage is empty
+    if _, err := s.storageSnapshot(account.Root, stateKeyStr, headerID, height, tx); err != nil {
+        return fmt.Errorf("failed building storage snapshot for account %+v\r\nerror: %w", account, err)
+    }
+    return nil
+}
+
+func (s *Service) storageSnapshot(sr common.Hash, stateKey, headerID string, height *big.Int, tx Tx) (Tx, error) {
     if bytes.Equal(sr.Bytes(), emptyContractRoot.Bytes()) {
         return tx, nil
     }
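Aside: `isWatchedAddress` (added in the next hunk) compares full value-node paths against precomputed leaf paths for the watched accounts. A hedged sketch of how an address could map to such a path, assuming the usual secure-trie keying (keccak256 of the address, expanded to nibbles; geth's own path form additionally appends a 0x10 terminator nibble, omitted here for simplicity):

```go
package main

import (
	"fmt"

	"github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/crypto"
)

// keybytesToNibbles expands each key byte into two nibbles, mirroring the
// path representation a trie.NodeIterator reports (minus the terminator).
func keybytesToNibbles(key []byte) []byte {
	nibbles := make([]byte, len(key)*2)
	for i, b := range key {
		nibbles[i*2] = b / 16
		nibbles[i*2+1] = b % 16
	}
	return nibbles
}

func main() {
	addr := common.HexToAddress("0x0000000000000000000000000000000000000001")
	leafKey := crypto.Keccak256(addr.Bytes()) // 32-byte secure-trie key
	leafPath := keybytesToNibbles(leafKey)    // 64 nibbles, comparable to nodePath+partialPath
	fmt.Printf("leaf key:  %x\nleaf path: %v\n", leafKey, leafPath)
}
```

This is also why the snapshot can filter by account at the leaf level: the iterator's path plus the leaf's compact-decoded partial path reconstructs exactly this keccak-derived key.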
@@ -477,42 +486,96 @@ func (s *Service) storageSnapshot(sr common.Hash, headerID string, height *big.I
 
     it := sTrie.NodeIterator(make([]byte, 0))
     for it.Next(true) {
-        res, err := resolveNode(it.Path(), it, s.stateDB.TrieDB())
-        if err != nil {
+        if it.Leaf() {
+            if err := s.processStorageValueNode(it, stateKey, headerID, height, tx); err != nil {
                 return nil, err
             }
-        if res == nil {
-            continue
-        }
-
-        tx, err = s.ipfsPublisher.PrepareTxForBatch(tx, s.maxBatchSize)
-        if err != nil {
+        } else {
+            nodeVal := make([]byte, len(it.NodeBlob()))
+            copy(nodeVal, it.NodeBlob())
+            nodeHash := make([]byte, len(it.Hash().Bytes()))
+            copy(nodeHash, it.Hash().Bytes())
+            if _, err := s.ipfsPublisher.PublishIPLD(ipld.Keccak256ToCid(ipld.MEthStorageTrie, nodeHash), nodeVal, height, tx); err != nil {
                 return nil, err
             }
-
-        var nodeData []byte
-        nodeData, err = s.stateDB.TrieDB().Node(it.Hash())
-        if err != nil {
-            return nil, err
-        }
-        res.node.Value = nodeData
-
-        switch res.node.NodeType {
-        case Leaf:
-            partialPath := trie.CompactToHex(res.elements[0].([]byte))
-            valueNodePath := append(res.node.Path, partialPath...)
-            encodedPath := trie.HexToCompact(valueNodePath)
-            leafKey := encodedPath[1:]
-            res.node.Key = common.BytesToHash(leafKey)
-        case Extension, Branch:
-            res.node.Key = common.BytesToHash([]byte{})
-        default:
-            return nil, errors.New("unexpected node type")
-        }
-        if err = s.ipfsPublisher.PublishStorageNode(&res.node, headerID, height, statePath, tx); err != nil {
-            return nil, err
         }
     }
 
     return tx, it.Error()
 }
 
+// reminder: it.Leaf() == true when the iterator is positioned at a "value node" which is not something that actually exists in an MMPT
+func (s *Service) processStorageValueNode(it trie.NodeIterator, stateKey, headerID string, height *big.Int, tx Tx) error {
+    // skip if it is not a watched address
+    leafKey := make([]byte, len(it.LeafKey()))
+    copy(leafKey, it.LeafKey())
+    value := make([]byte, len(it.LeafBlob()))
+    copy(value, it.LeafBlob())
+
+    // since this is a "value node", we need to move up to the "parent" node which is the actual leaf node
+    // it should be in the fastcache since it necessarily was recently accessed to reach the current node
+    parentNodeRLP, err := s.stateDB.TrieDB().Node(it.Parent())
+    if err != nil {
+        return err
+    }
+
+    // publish storage leaf node model
+    storageLeafKeyStr := common.BytesToHash(leafKey).String()
+    storageLeafNodeModel := &models.StorageNodeModel{
+        BlockNumber: height.String(),
+        HeaderID:    headerID,
+        StateKey:    stateKey,
+        StorageKey:  storageLeafKeyStr,
+        Removed:     false,
+        CID:         ipld.Keccak256ToCid(ipld.MEthStorageTrie, crypto.Keccak256(parentNodeRLP)).String(),
+        Diff:        false,
+        Value:       value,
+    }
+    if err := s.ipfsPublisher.PublishStorageLeafNode(storageLeafNodeModel, tx); err != nil {
+        return fmt.Errorf("failed to publish storage leaf node for state leaf key %s and storage leaf key %s\r\nerr: %w", stateKey, storageLeafKeyStr, err)
+    }
+    return nil
+}
+
+// validPath checks if a path is prefix to any one of the paths in the given list
+func validPath(currentPath []byte, seekingPaths [][]byte) bool {
+    for _, seekingPath := range seekingPaths {
+        if bytes.HasPrefix(seekingPath, currentPath) {
+            return true
+        }
+    }
+    return false
+}
+
+// isWatchedAddress is used to check if a state account corresponds to one of the addresses the builder is configured to watch
+func isWatchedAddress(watchedAddressesLeafPaths [][]byte, valueNodePath []byte) bool {
+    for _, watchedAddressPath := range watchedAddressesLeafPaths {
+        if bytes.Equal(watchedAddressPath, valueNodePath) {
+            return true
+        }
+    }
+
+    return false
+}
+
+// isLeaf checks if the node we are at is a leaf
+func isLeaf(elements []interface{}) (bool, error) {
+    if len(elements) > 2 {
+        return false, nil
+    }
+    if len(elements) < 2 {
+        return false, fmt.Errorf("node cannot be less than two elements in length")
+    }
+    switch elements[0].([]byte)[0] / 16 {
+    case '\x00':
+        return false, nil
+    case '\x01':
+        return false, nil
+    case '\x02':
+        return true, nil
+    case '\x03':
+        return true, nil
+    default:
+        return false, fmt.Errorf("unknown hex prefix")
+    }
+}
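Aside: the `isLeaf` helper added above switches on the hex-prefix (compact) encoding flag carried in the high nibble of the first byte of a node's encoded key: 0/1 mark extension nodes, 2/3 mark leaves, with the even/odd variants distinguishing path lengths. An illustrative standalone sketch of that decoding rule:

```go
package main

import "fmt"

// describeHexPrefix interprets the high nibble of the first byte of a
// compact-encoded trie key, the same value isLeaf switches on.
func describeHexPrefix(firstKeyByte byte) string {
	switch firstKeyByte / 16 {
	case 0:
		return "extension node, even-length path"
	case 1:
		return "extension node, odd-length path"
	case 2:
		return "leaf node, even-length path"
	case 3:
		return "leaf node, odd-length path"
	default:
		return "not a valid hex-prefix flag"
	}
}

func main() {
	// 0x20 prefixes a leaf with an even number of nibbles;
	// 0x3_ packs the leaf flag together with the first odd nibble.
	for _, b := range []byte{0x00, 0x1a, 0x20, 0x3f} {
		fmt.Printf("%#02x: %s\n", b, describeHexPrefix(b))
	}
}
```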
@@ -9,7 +9,7 @@ import (
 
     "github.com/cerc-io/ipld-eth-state-snapshot/pkg/prom"
     file "github.com/cerc-io/ipld-eth-state-snapshot/pkg/snapshot/file"
-    pg "github.com/cerc-io/ipld-eth-state-snapshot/pkg/snapshot/pg"
+    "github.com/cerc-io/ipld-eth-state-snapshot/pkg/snapshot/pg"
     snapt "github.com/cerc-io/ipld-eth-state-snapshot/pkg/types"
 )
 