actually ignore the subtries that are not along the paths of interest

This commit is contained in:
i-norden 2022-06-16 10:45:44 -05:00 committed by nabarun
parent 038b25a41d
commit 9856419371
2 changed files with 94 additions and 45 deletions

View File

@ -82,6 +82,7 @@ func NewInPlaceSnapshotConfig() *Config {
&EthConfig{}, &EthConfig{},
&DBConfig{}, &DBConfig{},
&FileConfig{}, &FileConfig{},
&ServiceConfig{},
} }
ret.DB.Init() ret.DB.Init()

View File

@ -87,7 +87,7 @@ type SnapshotParams struct {
func (s *Service) CreateSnapshot(params SnapshotParams) error { func (s *Service) CreateSnapshot(params SnapshotParams) error {
paths := make([][]byte, 0, len(params.WatchedAddresses)) paths := make([][]byte, 0, len(params.WatchedAddresses))
for addr := range params.WatchedAddresses { for addr := range params.WatchedAddresses {
paths = append(paths, keybytesToHex(crypto.Keccak256(addr.Bytes()))) paths = append(paths, crypto.Keccak256(addr.Bytes()))
} }
s.watchingAddresses = len(paths) > 0 s.watchingAddresses = len(paths) > 0
// extract header from lvldb and publish to PG-IPFS // extract header from lvldb and publish to PG-IPFS
@ -214,6 +214,7 @@ func validPath(currentPath []byte, seekingPaths [][]byte) bool {
} }
return false return false
} }
func (s *Service) createSnapshot(it trie.NodeIterator, headerID string, height *big.Int, seekingPaths [][]byte) error { func (s *Service) createSnapshot(it trie.NodeIterator, headerID string, height *big.Int, seekingPaths [][]byte) error {
tx, err := s.ipfsPublisher.BeginTx() tx, err := s.ipfsPublisher.BeginTx()
if err != nil { if err != nil {
@ -226,67 +227,114 @@ func (s *Service) createSnapshot(it trie.NodeIterator, headerID string, height *
} }
}() }()
for it.Next(true) { // iterate all the nodes at this level
for it.Next(false) {
// ignore node if it is not along paths of interest
if s.watchingAddresses && !validPath(it.Path(), seekingPaths) { if s.watchingAddresses && !validPath(it.Path(), seekingPaths) {
continue continue
} }
res, err := resolveNode(it, s.stateDB.TrieDB()) // if the node is along paths of interest
// create snapshot of node, if it is a leaf this will also create snapshot of entire storage trie
if err := s.createNodeSnapshot(tx, it, headerID, height, seekingPaths); err != nil {
return err
}
// create subTrie iterator for this node
subTrie, err := s.stateDB.OpenTrie(it.Hash())
if err != nil { if err != nil {
return err return err
} }
if res == nil { subTrieIt := subTrie.NodeIterator(nil)
// traverse and process the next level of this subTrie
if err := s.createSubTrieSnapshot(tx, subTrieIt, headerID, height, seekingPaths); err != nil {
return err
}
}
return it.Error()
}
func (s *Service) createSubTrieSnapshot(tx Tx, it trie.NodeIterator, headerID string, height *big.Int, seekingPaths [][]byte) error {
// iterate all the nodes at this level
for it.Next(false) {
// ignore node if it is not along paths of interest
if s.watchingAddresses && !validPath(it.Path(), seekingPaths) {
continue continue
} }
// if the node is along paths of interest
// create snapshot of node, if it is a leaf this will also create snapshot of entire storage trie
if err := s.createNodeSnapshot(tx, it, headerID, height, seekingPaths); err != nil {
return err
}
// create subTrie iterator for this node
subTrie, err := s.stateDB.OpenTrie(it.Hash())
if err != nil {
return err
}
subTrieIt := subTrie.NodeIterator(nil)
// traverse and process the next level of this subTrie
if err := s.createSubTrieSnapshot(tx, subTrieIt, headerID, height, seekingPaths); err != nil {
return err
}
}
return it.Error()
}
tx, err = s.ipfsPublisher.PrepareTxForBatch(tx, s.maxBatchSize) func (s *Service) createNodeSnapshot(tx Tx, it trie.NodeIterator, headerID string, height *big.Int, seekingPaths [][]byte) error {
res, err := resolveNode(it, s.stateDB.TrieDB())
if err != nil {
return err
}
if res == nil {
return nil
}
tx, err = s.ipfsPublisher.PrepareTxForBatch(tx, s.maxBatchSize)
if err != nil {
return err
}
switch res.node.NodeType {
case Leaf:
// if the node is a leaf, decode the account and publish the associated storage trie
// nodes if there are any
var account types.StateAccount
if err := rlp.DecodeBytes(res.elements[1].([]byte), &account); err != nil {
return fmt.Errorf(
"error decoding account for leaf node at path %x nerror: %v", res.node.Path, err)
}
partialPath := trie.CompactToHex(res.elements[0].([]byte))
valueNodePath := append(res.node.Path, partialPath...)
encodedPath := trie.HexToCompact(valueNodePath)
leafKey := encodedPath[1:]
res.node.Key = common.BytesToHash(leafKey)
err := s.ipfsPublisher.PublishStateNode(&res.node, headerID, height, tx)
if err != nil { if err != nil {
return err return err
} }
switch res.node.NodeType { // publish any non-nil code referenced by codehash
case Leaf: if !bytes.Equal(account.CodeHash, emptyCodeHash) {
// if the node is a leaf, decode the account and publish the associated storage trie codeHash := common.BytesToHash(account.CodeHash)
// nodes if there are any codeBytes := rawdb.ReadCode(s.ethDB, codeHash)
var account types.StateAccount if len(codeBytes) == 0 {
if err := rlp.DecodeBytes(res.elements[1].([]byte), &account); err != nil { log.Error("Code is missing", "account", common.BytesToHash(it.LeafKey()))
return fmt.Errorf( return errors.New("missing code")
"error decoding account for leaf node at path %x nerror: %v", res.node.Path, err)
} }
partialPath := trie.CompactToHex(res.elements[0].([]byte))
valueNodePath := append(res.node.Path, partialPath...) if err = s.ipfsPublisher.PublishCode(height, codeHash, codeBytes, tx); err != nil {
encodedPath := trie.HexToCompact(valueNodePath)
leafKey := encodedPath[1:]
res.node.Key = common.BytesToHash(leafKey)
err := s.ipfsPublisher.PublishStateNode(&res.node, headerID, height, tx)
if err != nil {
return err return err
} }
// publish any non-nil code referenced by codehash
if !bytes.Equal(account.CodeHash, emptyCodeHash) {
codeHash := common.BytesToHash(account.CodeHash)
codeBytes := rawdb.ReadCode(s.ethDB, codeHash)
if len(codeBytes) == 0 {
log.Error("Code is missing", "account", common.BytesToHash(it.LeafKey()))
return errors.New("missing code")
}
if err = s.ipfsPublisher.PublishCode(height, codeHash, codeBytes, tx); err != nil {
return err
}
}
if tx, err = s.storageSnapshot(account.Root, headerID, height, res.node.Path, tx); err != nil {
return fmt.Errorf("failed building storage snapshot for account %+v\r\nerror: %w", account, err)
}
case Extension, Branch:
res.node.Key = common.BytesToHash([]byte{})
if err := s.ipfsPublisher.PublishStateNode(&res.node, headerID, height, tx); err != nil {
return err
}
default:
return errors.New("unexpected node type")
} }
if tx, err = s.storageSnapshot(account.Root, headerID, height, res.node.Path, tx); err != nil {
return fmt.Errorf("failed building storage snapshot for account %+v\r\nerror: %w", account, err)
}
case Extension, Branch:
res.node.Key = common.BytesToHash([]byte{})
if err := s.ipfsPublisher.PublishStateNode(&res.node, headerID, height, tx); err != nil {
return err
}
default:
return errors.New("unexpected node type")
} }
return it.Error() return it.Error()
} }