[wip] use symmetric difference iterator
Some checks failed
Test / Run unit tests (pull_request) Failing after 5s
Test / Run integration tests (pull_request) Failing after 8s

This commit is contained in:
Roy Crihfield 2023-07-24 01:00:37 +08:00
parent b9d988e0ec
commit 79178e0f29
2 changed files with 93 additions and 151 deletions

View File

@ -120,47 +120,27 @@ func (sdb *StateDiffBuilder) WriteStateDiff(args Args, params Params, nodeSink s
return fmt.Errorf("error creating trie for newStateRoot: %w", err) return fmt.Errorf("error creating trie for newStateRoot: %w", err)
} }
// we do two state trie iterations: iters := IterPair{
// one for new/updated nodes, Older: oldTrie.NodeIterator(nil),
// one for deleted/updated nodes; Newer: newTrie.NodeIterator(nil),
// prepare 2 iterator instances for each task
iterPairs := []IterPair{
{
Older: oldTrie.NodeIterator([]byte{}),
Newer: newTrie.NodeIterator([]byte{}),
},
{
Older: oldTrie.NodeIterator([]byte{}),
Newer: newTrie.NodeIterator([]byte{}),
},
} }
logger := log.New("hash", args.BlockHash, "number", args.BlockNumber) logger := log.New("hash", args.BlockHash, "number", args.BlockNumber)
return sdb.BuildStateDiff(iterPairs, params, nodeSink, ipldSink, logger) return sdb.BuildStateDiff(iters, params, nodeSink, ipldSink, logger)
} }
func (sdb *StateDiffBuilder) BuildStateDiff(iterPairs []IterPair, params Params, func (sdb *StateDiffBuilder) BuildStateDiff(iterPair IterPair, params Params,
sinkNode sdtypes.StateNodeSink, sinkIpld sdtypes.IPLDSink, logger log.Logger) error { nodeSink sdtypes.StateNodeSink, ipldSink sdtypes.IPLDSink, logger log.Logger) error {
logger.Trace("statediff BEGIN BuildStateDiff") logger.Trace("statediff BEGIN BuildStateDiff")
defer metrics.ReportAndUpdateDuration("statediff END BuildStateDiff", time.Now(), logger, metrics.IndexerMetrics.BuildStateDiffTimer) defer metrics.ReportAndUpdateDuration("statediff END BuildStateDiff", time.Now(), logger, metrics.IndexerMetrics.BuildStateDiffTimer)
// collect a slice of all the nodes that were touched and exist at B (B-A) // collect a slice of all the nodes that were touched and exist at B (B-A)
// a map of their leafkey to all the accounts that were touched and exist at B // a map of their leafkey to all the accounts that were touched and exist at B
// and a slice of all the paths for the nodes in both of the above sets // and a slice of all the paths for the nodes in both of the above sets
diffAccountsAtB, err := sdb.createdAndUpdatedState( diffAccountsAtB, diffAccountsAtA, err := sdb.createdAndUpdatedState(
iterPairs[0].Older, iterPairs[0].Newer, params.watchedAddressesLeafPaths, sinkIpld, logger) iterPair.Older, iterPair.Newer, params.watchedAddressesLeafPaths, nodeSink, ipldSink, logger)
if err != nil { if err != nil {
return fmt.Errorf("error collecting createdAndUpdatedNodes: %w", err) return fmt.Errorf("error collecting createdAndUpdatedNodes: %w", err)
} }
// collect a slice of all the nodes that existed at a path in A that doesn't exist in B
// a map of their leafkey to all the accounts that were touched and exist at A
diffAccountsAtA, err := sdb.deletedState(
iterPairs[1].Older, iterPairs[1].Newer, diffAccountsAtB,
params.watchedAddressesLeafPaths, sinkNode, logger)
if err != nil {
return fmt.Errorf("error collecting deletedOrUpdatedNodes: %w", err)
}
// collect and sort the leafkey keys for both account mappings into a slice // collect and sort the leafkey keys for both account mappings into a slice
t := time.Now() t := time.Now()
createKeys := trie_helpers.SortKeys(diffAccountsAtB) createKeys := trie_helpers.SortKeys(diffAccountsAtB)
@ -178,12 +158,12 @@ func (sdb *StateDiffBuilder) BuildStateDiff(iterPairs []IterPair, params Params,
"duration", time.Since(t)) "duration", time.Since(t))
// build the diff nodes for the updated accounts using the mappings at both A and B as directed by the keys found as the intersection of the two // build the diff nodes for the updated accounts using the mappings at both A and B as directed by the keys found as the intersection of the two
err = sdb.buildAccountUpdates(diffAccountsAtB, diffAccountsAtA, updatedKeys, sinkNode, sinkIpld, logger) err = sdb.buildAccountUpdates(diffAccountsAtB, diffAccountsAtA, updatedKeys, nodeSink, ipldSink, logger)
if err != nil { if err != nil {
return fmt.Errorf("error building diff for updated accounts: %w", err) return fmt.Errorf("error building diff for updated accounts: %w", err)
} }
// build the diff nodes for created accounts // build the diff nodes for created accounts
err = sdb.buildAccountCreations(diffAccountsAtB, sinkNode, sinkIpld, logger) err = sdb.buildAccountCreations(diffAccountsAtB, nodeSink, ipldSink, logger)
if err != nil { if err != nil {
return fmt.Errorf("error building diff for created accounts: %w", err) return fmt.Errorf("error building diff for created accounts: %w", err)
} }
@ -195,25 +175,46 @@ func (sdb *StateDiffBuilder) BuildStateDiff(iterPairs []IterPair, params Params,
// a mapping of their leafkeys to all the accounts that exist in a different state at B than A // a mapping of their leafkeys to all the accounts that exist in a different state at B than A
// and a slice of the paths for all of the nodes included in both // and a slice of the paths for all of the nodes included in both
func (sdb *StateDiffBuilder) createdAndUpdatedState(a, b trie.NodeIterator, func (sdb *StateDiffBuilder) createdAndUpdatedState(a, b trie.NodeIterator,
watchedAddressesLeafPaths [][]byte, ipldSink sdtypes.IPLDSink, logger log.Logger) (sdtypes.AccountMap, error) { watchedAddressesLeafPaths [][]byte,
nodeSink sdtypes.StateNodeSink, ipldSink sdtypes.IPLDSink,
logger log.Logger,
) (sdtypes.AccountMap, sdtypes.AccountMap, error) {
logger.Trace("statediff BEGIN createdAndUpdatedState") logger.Trace("statediff BEGIN createdAndUpdatedState")
defer metrics.ReportAndUpdateDuration("statediff END createdAndUpdatedState", time.Now(), logger, metrics.IndexerMetrics.CreatedAndUpdatedStateTimer) defer metrics.ReportAndUpdateDuration("statediff END createdAndUpdatedState", time.Now(), logger, metrics.IndexerMetrics.CreatedAndUpdatedStateTimer)
diffAccountsAtB := make(sdtypes.AccountMap) diffAccountsAtB := make(sdtypes.AccountMap)
diffAccountsAtA := make(sdtypes.AccountMap)
// cache the RLP of the previous node, so when we hit a leaf we have the parent (containing) node // Cache the RLP of the previous node on A and B. When we hit a value node this will be the parent blob.
var prevBlob []byte var prevBlobFromA, prevBlobFromB []byte
it, itCount := trie.NewDifferenceIterator(a, b) it, itCount := utils.NewSymmetricDifferenceIterator(a, b)
for it.Next(true) { for it.Next(true) {
// ignore node if it is not along paths of interest // ignore node if it is not along paths of interest
if !isWatchedPathPrefix(watchedAddressesLeafPaths, it.Path()) { if !isWatchedPathPrefix(watchedAddressesLeafPaths, it.Path()) {
continue continue
} }
if it.FromA() {
if it.Leaf() {
accountW, err := sdb.processStateValueNode(it, prevBlobFromB)
if err != nil {
return nil, nil, err
}
if accountW == nil {
continue
}
leafKey := hex.EncodeToString(accountW.LeafKey)
diffAccountsAtA[leafKey] = *accountW
} else {
prevBlobFromB = make([]byte, len(it.NodeBlob()))
copy(prevBlobFromB, it.NodeBlob())
}
continue
}
// index values by leaf key // index values by leaf key
if it.Leaf() { if it.Leaf() {
// if it is a "value" node, we will index the value by leaf key // if it is a "value" node, we will index the value by leaf key
accountW, err := sdb.processStateValueNode(it, prevBlob) accountW, err := sdb.processStateValueNode(it, prevBlobFromA)
if err != nil { if err != nil {
return nil, err return nil, nil, err
} }
if accountW == nil { if accountW == nil {
continue continue
@ -235,11 +236,11 @@ func (sdb *StateDiffBuilder) createdAndUpdatedState(a, b trie.NodeIterator,
if len(watchedAddressesLeafPaths) > 0 { if len(watchedAddressesLeafPaths) > 0 {
var elements []interface{} var elements []interface{}
if err := rlp.DecodeBytes(nodeVal, &elements); err != nil { if err := rlp.DecodeBytes(nodeVal, &elements); err != nil {
return nil, err return nil, nil, err
} }
ok, err := isLeaf(elements) ok, err := isLeaf(elements)
if err != nil { if err != nil {
return nil, err return nil, nil, err
} }
partialPath := utils.CompactToHex(elements[0].([]byte)) partialPath := utils.CompactToHex(elements[0].([]byte))
valueNodePath := append(it.Path(), partialPath...) valueNodePath := append(it.Path(), partialPath...)
@ -251,14 +252,40 @@ func (sdb *StateDiffBuilder) createdAndUpdatedState(a, b trie.NodeIterator,
CID: ipld.Keccak256ToCid(ipld.MEthStateTrie, it.Hash().Bytes()).String(), CID: ipld.Keccak256ToCid(ipld.MEthStateTrie, it.Hash().Bytes()).String(),
Content: nodeVal, Content: nodeVal,
}); err != nil { }); err != nil {
return nil, err return nil, nil, err
} }
prevBlob = nodeVal prevBlobFromA = nodeVal
}
}
for leafKey, accountW := range diffAccountsAtA {
if _, ok := diffAccountsAtB[leafKey]; ok {
continue
}
// if this node's leaf key did not show up in diffAccountsAtB
// that means the account was deleted
// in that case, emit an empty "removed" diff state node
// include empty "removed" diff storage nodes for all the storage slots
diff := sdtypes.StateLeafNode{
AccountWrapper: sdtypes.AccountWrapper{
Account: nil,
LeafKey: accountW.LeafKey,
CID: shared.RemovedNodeStateCID,
},
Removed: true,
}
err := sdb.buildRemovedAccountStorageNodes(accountW.Account.Root, StorageNodeAppender(&diff.StorageDiff))
if err != nil {
return nil, nil, fmt.Errorf("failed building storage diffs for removed state account with key %x\r\nerror: %w", leafKey, err)
}
if err := nodeSink(diff); err != nil {
return nil, nil, err
} }
} }
logger.Debug("statediff COUNTS createdAndUpdatedState", "it", itCount, "diffAccountsAtB", len(diffAccountsAtB)) logger.Debug("statediff COUNTS createdAndUpdatedState", "it", itCount, "diffAccountsAtB", len(diffAccountsAtB))
metrics.IndexerMetrics.DifferenceIteratorCounter.Inc(int64(*itCount)) metrics.IndexerMetrics.DifferenceIteratorCounter.Inc(int64(*itCount))
return diffAccountsAtB, it.Error() return diffAccountsAtB, diffAccountsAtA, it.Error()
} }
// decodes account at leaf and encodes RLP data to CID // decodes account at leaf and encodes RLP data to CID
@ -277,63 +304,6 @@ func (sdb *StateDiffBuilder) processStateValueNode(it trie.NodeIterator, parentB
}, nil }, nil
} }
// deletedState returns a slice of all the paths that are emptied at B
// and a mapping of their leafkeys to all the accounts that exist in a different state at A than B
func (sdb *StateDiffBuilder) deletedState(a, b trie.NodeIterator, diffAccountsAtB sdtypes.AccountMap,
watchedAddressesLeafPaths [][]byte, nodeSink sdtypes.StateNodeSink, logger log.Logger) (sdtypes.AccountMap, error) {
logger.Trace("statediff BEGIN deletedState")
defer metrics.ReportAndUpdateDuration("statediff END deletedState", time.Now(), logger, metrics.IndexerMetrics.DeletedStateTimer)
diffAccountAtA := make(sdtypes.AccountMap)
var prevBlob []byte
it, _ := trie.NewDifferenceIterator(b, a)
for it.Next(true) {
if !isWatchedPathPrefix(watchedAddressesLeafPaths, it.Path()) {
continue
}
if it.Leaf() {
accountW, err := sdb.processStateValueNode(it, prevBlob)
if err != nil {
return nil, err
}
if accountW == nil {
continue
}
leafKey := hex.EncodeToString(accountW.LeafKey)
diffAccountAtA[leafKey] = *accountW
// if this node's leaf key did not show up in diffAccountsAtB
// that means the account was deleted
// in that case, emit an empty "removed" diff state node
// include empty "removed" diff storage nodes for all the storage slots
if _, ok := diffAccountsAtB[leafKey]; !ok {
diff := sdtypes.StateLeafNode{
AccountWrapper: sdtypes.AccountWrapper{
Account: nil,
LeafKey: accountW.LeafKey,
CID: shared.RemovedNodeStateCID,
},
Removed: true,
}
storageDiff := make([]sdtypes.StorageLeafNode, 0)
err := sdb.buildRemovedAccountStorageNodes(accountW.Account.Root, StorageNodeAppender(&storageDiff))
if err != nil {
return nil, fmt.Errorf("failed building storage diffs for removed state account with key %x\r\nerror: %w", leafKey, err)
}
diff.StorageDiff = storageDiff
if err := nodeSink(diff); err != nil {
return nil, err
}
}
} else {
prevBlob = make([]byte, len(it.NodeBlob()))
copy(prevBlob, it.NodeBlob())
}
}
return diffAccountAtA, it.Error()
}
// buildAccountUpdates uses the account diffs maps for A => B and B => A and the known intersection of their leafkeys // buildAccountUpdates uses the account diffs maps for A => B and B => A and the known intersection of their leafkeys
// to generate the statediff node objects for all of the accounts that existed at both A and B but in different states // to generate the statediff node objects for all of the accounts that existed at both A and B but in different states
// needs to be called before building account creations and deletions as this mutates // needs to be called before building account creations and deletions as this mutates
@ -348,7 +318,7 @@ func (sdb *StateDiffBuilder) buildAccountUpdates(creations, deletions sdtypes.Ac
for _, key := range updatedKeys { for _, key := range updatedKeys {
createdAcc := creations[key] createdAcc := creations[key]
deletedAcc := deletions[key] deletedAcc := deletions[key]
storageDiff := make([]sdtypes.StorageLeafNode, 0) var storageDiff []sdtypes.StorageLeafNode
if deletedAcc.Account != nil && createdAcc.Account != nil { if deletedAcc.Account != nil && createdAcc.Account != nil {
err = sdb.buildStorageNodesIncremental( err = sdb.buildStorageNodesIncremental(
deletedAcc.Account.Root, createdAcc.Account.Root, deletedAcc.Account.Root, createdAcc.Account.Root,
@ -360,7 +330,6 @@ func (sdb *StateDiffBuilder) buildAccountUpdates(creations, deletions sdtypes.Ac
} }
if err = nodeSink(sdtypes.StateLeafNode{ if err = nodeSink(sdtypes.StateLeafNode{
AccountWrapper: createdAcc, AccountWrapper: createdAcc,
Removed: false,
StorageDiff: storageDiff, StorageDiff: storageDiff,
}); err != nil { }); err != nil {
return err return err
@ -382,16 +351,13 @@ func (sdb *StateDiffBuilder) buildAccountCreations(accounts sdtypes.AccountMap,
for _, val := range accounts { for _, val := range accounts {
diff := sdtypes.StateLeafNode{ diff := sdtypes.StateLeafNode{
AccountWrapper: val, AccountWrapper: val,
Removed: false,
} }
if !bytes.Equal(val.Account.CodeHash, nullCodeHash) { if !bytes.Equal(val.Account.CodeHash, nullCodeHash) {
// For contract creations, any storage node contained is a diff // For contract creations, any storage node contained is a diff
storageDiff := make([]sdtypes.StorageLeafNode, 0) err := sdb.buildStorageNodesEventual(val.Account.Root, StorageNodeAppender(&diff.StorageDiff), ipldSink)
err := sdb.buildStorageNodesEventual(val.Account.Root, StorageNodeAppender(&storageDiff), ipldSink)
if err != nil { if err != nil {
return fmt.Errorf("failed building eventual storage diffs for node with leaf key %x\r\nerror: %w", val.LeafKey, err) return fmt.Errorf("failed building eventual storage diffs for node with leaf key %x\r\nerror: %w", val.LeafKey, err)
} }
diff.StorageDiff = storageDiff
// emit codehash => code mappings for contract // emit codehash => code mappings for contract
codeHash := common.BytesToHash(val.Account.CodeHash) codeHash := common.BytesToHash(val.Account.CodeHash)
code, err := sdb.stateCache.ContractCode(codeHash) code, err := sdb.stateCache.ContractCode(codeHash)
@ -520,29 +486,23 @@ func (sdb *StateDiffBuilder) buildStorageNodesIncremental(oldroot common.Hash, n
return err return err
} }
diffSlotsAtB, err := sdb.createdAndUpdatedStorage( a, b := oldTrie.NodeIterator(nil), newTrie.NodeIterator(nil)
oldTrie.NodeIterator(nil), newTrie.NodeIterator(nil), storageSink, ipldSink) changedSlots := make(map[string]bool)
if err != nil {
return err
}
return sdb.deletedOrUpdatedStorage(oldTrie.NodeIterator(nil), newTrie.NodeIterator(nil),
diffSlotsAtB, storageSink)
}
func (sdb *StateDiffBuilder) createdAndUpdatedStorage(a, b trie.NodeIterator, storageSink sdtypes.StorageNodeSink,
ipldSink sdtypes.IPLDSink) (map[string]bool, error) {
defer metrics.UpdateDuration(time.Now(), metrics.IndexerMetrics.CreatedAndUpdatedStorageTimer)
diffSlotsAtB := make(map[string]bool)
var prevBlob []byte var prevBlob []byte
it, _ := trie.NewDifferenceIterator(a, b) it, _ := utils.NewSymmetricDifferenceIterator(a, b)
for it.Next(true) { for it.Next(true) {
if it.FromA() {
if it.Leaf() {
changedSlots[string(it.LeafKey())] = true
}
continue
}
if it.Leaf() { if it.Leaf() {
storageLeafNode := sdb.processStorageValueNode(it, prevBlob) storageLeafNode := sdb.processStorageValueNode(it, prevBlob)
if err := storageSink(storageLeafNode); err != nil { if err := storageSink(storageLeafNode); err != nil {
return nil, err return err
} }
diffSlotsAtB[hex.EncodeToString(storageLeafNode.LeafKey)] = true delete(changedSlots, string(storageLeafNode.LeafKey))
} else { } else {
if it.Hash() == zeroHash { if it.Hash() == zeroHash {
continue continue
@ -553,34 +513,22 @@ func (sdb *StateDiffBuilder) createdAndUpdatedStorage(a, b trie.NodeIterator, st
CID: ipld.Keccak256ToCid(ipld.MEthStorageTrie, it.Hash().Bytes()).String(), CID: ipld.Keccak256ToCid(ipld.MEthStorageTrie, it.Hash().Bytes()).String(),
Content: nodeVal, Content: nodeVal,
}); err != nil { }); err != nil {
return nil, err return err
} }
prevBlob = nodeVal prevBlob = nodeVal
} }
} }
return diffSlotsAtB, it.Error() for leafKey := range changedSlots {
} // if this node's leaf key did not show up in diffSlotsAtB
// that means the storage slot was vacated
func (sdb *StateDiffBuilder) deletedOrUpdatedStorage(a, b trie.NodeIterator, diffSlotsAtB map[string]bool, storageSink sdtypes.StorageNodeSink) error { // in that case, emit an empty "removed" diff storage node
defer metrics.UpdateDuration(time.Now(), metrics.IndexerMetrics.DeletedOrUpdatedStorageTimer) if err := storageSink(sdtypes.StorageLeafNode{
it, _ := trie.NewDifferenceIterator(b, a) CID: shared.RemovedNodeStorageCID,
for it.Next(true) { Removed: true,
if it.Leaf() { LeafKey: []byte(leafKey),
leafKey := make([]byte, len(it.LeafKey())) Value: []byte{},
copy(leafKey, it.LeafKey()) }); err != nil {
// if this node's leaf key did not show up in diffSlotsAtB return err
// that means the storage slot was vacated
// in that case, emit an empty "removed" diff storage node
if _, ok := diffSlotsAtB[hex.EncodeToString(leafKey)]; !ok {
if err := storageSink(sdtypes.StorageLeafNode{
CID: shared.RemovedNodeStorageCID,
Removed: true,
LeafKey: leafKey,
Value: []byte{},
}); err != nil {
return err
}
}
} }
} }
return it.Error() return it.Error()

View File

@ -84,8 +84,6 @@ type IndexerMetricsHandles struct {
IPLDOutputTimer metrics.Timer IPLDOutputTimer metrics.Timer
DifferenceIteratorNextTimer metrics.Timer DifferenceIteratorNextTimer metrics.Timer
DifferenceIteratorCounter metrics.Counter DifferenceIteratorCounter metrics.Counter
DeletedOrUpdatedStorageTimer metrics.Timer
CreatedAndUpdatedStorageTimer metrics.Timer
BuildStorageNodesIncrementalTimer metrics.Timer BuildStorageNodesIncrementalTimer metrics.Timer
BuildStateDiffObjectTimer metrics.Timer BuildStateDiffObjectTimer metrics.Timer
WriteStateDiffTimer metrics.Timer WriteStateDiffTimer metrics.Timer
@ -120,8 +118,6 @@ func RegisterIndexerMetrics(reg metrics.Registry) IndexerMetricsHandles {
IPLDOutputTimer: metrics.NewTimer(), IPLDOutputTimer: metrics.NewTimer(),
DifferenceIteratorNextTimer: metrics.NewTimer(), DifferenceIteratorNextTimer: metrics.NewTimer(),
DifferenceIteratorCounter: metrics.NewCounter(), DifferenceIteratorCounter: metrics.NewCounter(),
DeletedOrUpdatedStorageTimer: metrics.NewTimer(),
CreatedAndUpdatedStorageTimer: metrics.NewTimer(),
BuildStorageNodesIncrementalTimer: metrics.NewTimer(), BuildStorageNodesIncrementalTimer: metrics.NewTimer(),
BuildStateDiffObjectTimer: metrics.NewTimer(), BuildStateDiffObjectTimer: metrics.NewTimer(),
WriteStateDiffTimer: metrics.NewTimer(), WriteStateDiffTimer: metrics.NewTimer(),
@ -154,8 +150,6 @@ func RegisterIndexerMetrics(reg metrics.Registry) IndexerMetricsHandles {
reg.Register(metricName(subsys, "t_ipld_output_fn"), ctx.IPLDOutputTimer) reg.Register(metricName(subsys, "t_ipld_output_fn"), ctx.IPLDOutputTimer)
reg.Register(metricName(subsys, "t_difference_iterator_next"), ctx.DifferenceIteratorNextTimer) reg.Register(metricName(subsys, "t_difference_iterator_next"), ctx.DifferenceIteratorNextTimer)
reg.Register(metricName(subsys, "difference_iterator_counter"), ctx.DifferenceIteratorCounter) reg.Register(metricName(subsys, "difference_iterator_counter"), ctx.DifferenceIteratorCounter)
reg.Register(metricName(subsys, "t_created_and_updated_storage"), ctx.CreatedAndUpdatedStorageTimer)
reg.Register(metricName(subsys, "t_deleted_or_updated_storage"), ctx.DeletedOrUpdatedStorageTimer)
reg.Register(metricName(subsys, "t_build_storage_nodes_incremental"), ctx.BuildStorageNodesIncrementalTimer) reg.Register(metricName(subsys, "t_build_storage_nodes_incremental"), ctx.BuildStorageNodesIncrementalTimer)
reg.Register(metricName(subsys, "t_build_statediff_object"), ctx.BuildStateDiffObjectTimer) reg.Register(metricName(subsys, "t_build_statediff_object"), ctx.BuildStateDiffObjectTimer)
reg.Register(metricName(subsys, "t_write_statediff_object"), ctx.WriteStateDiffTimer) reg.Register(metricName(subsys, "t_write_statediff_object"), ctx.WriteStateDiffTimer)