From 6de232666619b50051a8be5f3149423bd488bc2f Mon Sep 17 00:00:00 2001 From: Roy Crihfield Date: Mon, 24 Jul 2023 01:00:37 +0800 Subject: [PATCH] [wip] use symmetric difference iterator --- builder.go | 238 +++++++++++----------------- indexer/database/metrics/metrics.go | 6 - 2 files changed, 93 insertions(+), 151 deletions(-) diff --git a/builder.go b/builder.go index 456d5bb..362df49 100644 --- a/builder.go +++ b/builder.go @@ -120,47 +120,27 @@ func (sdb *StateDiffBuilder) WriteStateDiff(args Args, params Params, nodeSink s return fmt.Errorf("error creating trie for newStateRoot: %w", err) } - // we do two state trie iterations: - // one for new/updated nodes, - // one for deleted/updated nodes; - // prepare 2 iterator instances for each task - iterPairs := []IterPair{ - { - Older: oldTrie.NodeIterator([]byte{}), - Newer: newTrie.NodeIterator([]byte{}), - }, - { - Older: oldTrie.NodeIterator([]byte{}), - Newer: newTrie.NodeIterator([]byte{}), - }, + iters := IterPair{ + Older: oldTrie.NodeIterator(nil), + Newer: newTrie.NodeIterator(nil), } - logger := log.New("hash", args.BlockHash, "number", args.BlockNumber) - return sdb.BuildStateDiff(iterPairs, params, nodeSink, ipldSink, logger) + return sdb.BuildStateDiff(iters, params, nodeSink, ipldSink, logger) } -func (sdb *StateDiffBuilder) BuildStateDiff(iterPairs []IterPair, params Params, - sinkNode sdtypes.StateNodeSink, sinkIpld sdtypes.IPLDSink, logger log.Logger) error { +func (sdb *StateDiffBuilder) BuildStateDiff(iterPair IterPair, params Params, + nodeSink sdtypes.StateNodeSink, ipldSink sdtypes.IPLDSink, logger log.Logger) error { logger.Trace("statediff BEGIN BuildStateDiff") defer metrics.ReportAndUpdateDuration("statediff END BuildStateDiff", time.Now(), logger, metrics.IndexerMetrics.BuildStateDiffTimer) // collect a slice of all the nodes that were touched and exist at B (B-A) // a map of their leafkey to all the accounts that were touched and exist at B // and a slice of all the paths for the nodes in both of the above sets - diffAccountsAtB, err := sdb.createdAndUpdatedState( - iterPairs[0].Older, iterPairs[0].Newer, params.watchedAddressesLeafPaths, sinkIpld, logger) + diffAccountsAtB, diffAccountsAtA, err := sdb.createdAndUpdatedState( + iterPair.Older, iterPair.Newer, params.watchedAddressesLeafPaths, nodeSink, ipldSink, logger) if err != nil { return fmt.Errorf("error collecting createdAndUpdatedNodes: %w", err) } - // collect a slice of all the nodes that existed at a path in A that doesn't exist in B - // a map of their leafkey to all the accounts that were touched and exist at A - diffAccountsAtA, err := sdb.deletedState( - iterPairs[1].Older, iterPairs[1].Newer, diffAccountsAtB, - params.watchedAddressesLeafPaths, sinkNode, logger) - if err != nil { - return fmt.Errorf("error collecting deletedOrUpdatedNodes: %w", err) - } - // collect and sort the leafkey keys for both account mappings into a slice t := time.Now() createKeys := trie_helpers.SortKeys(diffAccountsAtB) @@ -178,12 +158,12 @@ func (sdb *StateDiffBuilder) BuildStateDiff(iterPairs []IterPair, params Params, "duration", time.Since(t)) // build the diff nodes for the updated accounts using the mappings at both A and B as directed by the keys found as the intersection of the two - err = sdb.buildAccountUpdates(diffAccountsAtB, diffAccountsAtA, updatedKeys, sinkNode, sinkIpld, logger) + err = sdb.buildAccountUpdates(diffAccountsAtB, diffAccountsAtA, updatedKeys, nodeSink, ipldSink, logger) if err != nil { return fmt.Errorf("error building diff for updated accounts: %w", err) } // build the diff nodes for created accounts - err = sdb.buildAccountCreations(diffAccountsAtB, sinkNode, sinkIpld, logger) + err = sdb.buildAccountCreations(diffAccountsAtB, nodeSink, ipldSink, logger) if err != nil { return fmt.Errorf("error building diff for created accounts: %w", err) } @@ -195,25 +175,46 @@ func (sdb *StateDiffBuilder) BuildStateDiff(iterPairs []IterPair, params Params, // a mapping of their leafkeys to all the accounts that exist in a different state at B than A // and a slice of the paths for all of the nodes included in both func (sdb *StateDiffBuilder) createdAndUpdatedState(a, b trie.NodeIterator, - watchedAddressesLeafPaths [][]byte, ipldSink sdtypes.IPLDSink, logger log.Logger) (sdtypes.AccountMap, error) { + watchedAddressesLeafPaths [][]byte, + nodeSink sdtypes.StateNodeSink, ipldSink sdtypes.IPLDSink, + logger log.Logger, +) (sdtypes.AccountMap, sdtypes.AccountMap, error) { logger.Trace("statediff BEGIN createdAndUpdatedState") defer metrics.ReportAndUpdateDuration("statediff END createdAndUpdatedState", time.Now(), logger, metrics.IndexerMetrics.CreatedAndUpdatedStateTimer) diffAccountsAtB := make(sdtypes.AccountMap) + diffAccountsAtA := make(sdtypes.AccountMap) - // cache the RLP of the previous node, so when we hit a leaf we have the parent (containing) node - var prevBlob []byte - it, itCount := trie.NewDifferenceIterator(a, b) + // Cache the RLP of the previous node on A and B. When we hit a value node this will be the parent blob. + var prevBlobFromA, prevBlobFromB []byte + it, itCount := utils.NewSymmetricDifferenceIterator(a, b) for it.Next(true) { // ignore node if it is not along paths of interest if !isWatchedPathPrefix(watchedAddressesLeafPaths, it.Path()) { continue } + if it.FromA() { + if it.Leaf() { + accountW, err := sdb.processStateValueNode(it, prevBlobFromB) + if err != nil { + return nil, nil, err + } + if accountW == nil { + continue + } + leafKey := hex.EncodeToString(accountW.LeafKey) + diffAccountsAtA[leafKey] = *accountW + } else { + prevBlobFromB = make([]byte, len(it.NodeBlob())) + copy(prevBlobFromB, it.NodeBlob()) + } + continue + } // index values by leaf key if it.Leaf() { // if it is a "value" node, we will index the value by leaf key - accountW, err := sdb.processStateValueNode(it, prevBlob) + accountW, err := sdb.processStateValueNode(it, prevBlobFromA) if err != nil { - return nil, err + return nil, nil, err } if accountW == nil { continue @@ -235,11 +236,11 @@ func (sdb *StateDiffBuilder) createdAndUpdatedState(a, b trie.NodeIterator, if len(watchedAddressesLeafPaths) > 0 { var elements []interface{} if err := rlp.DecodeBytes(nodeVal, &elements); err != nil { - return nil, err + return nil, nil, err } ok, err := isLeaf(elements) if err != nil { - return nil, err + return nil, nil, err } partialPath := utils.CompactToHex(elements[0].([]byte)) valueNodePath := append(it.Path(), partialPath...) @@ -251,14 +252,40 @@ func (sdb *StateDiffBuilder) createdAndUpdatedState(a, b trie.NodeIterator, CID: ipld.Keccak256ToCid(ipld.MEthStateTrie, it.Hash().Bytes()).String(), Content: nodeVal, }); err != nil { - return nil, err + return nil, nil, err } - prevBlob = nodeVal + prevBlobFromA = nodeVal + } + } + + for leafKey, accountW := range diffAccountsAtA { + if _, ok := diffAccountsAtB[leafKey]; ok { + continue + } + // if this node's leaf key did not show up in diffAccountsAtB + // that means the account was deleted + // in that case, emit an empty "removed" diff state node + // include empty "removed" diff storage nodes for all the storage slots + diff := sdtypes.StateLeafNode{ + AccountWrapper: sdtypes.AccountWrapper{ + Account: nil, + LeafKey: accountW.LeafKey, + CID: shared.RemovedNodeStateCID, + }, + Removed: true, + } + + err := sdb.buildRemovedAccountStorageNodes(accountW.Account.Root, StorageNodeAppender(&diff.StorageDiff)) + if err != nil { + return nil, nil, fmt.Errorf("failed building storage diffs for removed state account with key %x\r\nerror: %w", leafKey, err) + } + if err := nodeSink(diff); err != nil { + return nil, nil, err } } logger.Debug("statediff COUNTS createdAndUpdatedState", "it", itCount, "diffAccountsAtB", len(diffAccountsAtB)) metrics.IndexerMetrics.DifferenceIteratorCounter.Inc(int64(*itCount)) - return diffAccountsAtB, it.Error() + return diffAccountsAtB, diffAccountsAtA, it.Error() } // decodes account at leaf and encodes RLP data to CID @@ -277,63 +304,6 @@ func (sdb *StateDiffBuilder) processStateValueNode(it trie.NodeIterator, parentB }, nil } -// deletedState returns a slice of all the paths that are emptied at B -// and a mapping of their leafkeys to all the accounts that exist in a different state at A than B -func (sdb *StateDiffBuilder) deletedState(a, b trie.NodeIterator, diffAccountsAtB sdtypes.AccountMap, - watchedAddressesLeafPaths [][]byte, nodeSink sdtypes.StateNodeSink, logger log.Logger) (sdtypes.AccountMap, error) { - logger.Trace("statediff BEGIN deletedState") - defer metrics.ReportAndUpdateDuration("statediff END deletedState", time.Now(), logger, metrics.IndexerMetrics.DeletedStateTimer) - diffAccountAtA := make(sdtypes.AccountMap) - - var prevBlob []byte - it, _ := trie.NewDifferenceIterator(b, a) - for it.Next(true) { - if !isWatchedPathPrefix(watchedAddressesLeafPaths, it.Path()) { - continue - } - - if it.Leaf() { - accountW, err := sdb.processStateValueNode(it, prevBlob) - if err != nil { - return nil, err - } - if accountW == nil { - continue - } - leafKey := hex.EncodeToString(accountW.LeafKey) - diffAccountAtA[leafKey] = *accountW - // if this node's leaf key did not show up in diffAccountsAtB - // that means the account was deleted - // in that case, emit an empty "removed" diff state node - // include empty "removed" diff storage nodes for all the storage slots - if _, ok := diffAccountsAtB[leafKey]; !ok { - diff := sdtypes.StateLeafNode{ - AccountWrapper: sdtypes.AccountWrapper{ - Account: nil, - LeafKey: accountW.LeafKey, - CID: shared.RemovedNodeStateCID, - }, - Removed: true, - } - - storageDiff := make([]sdtypes.StorageLeafNode, 0) - err := sdb.buildRemovedAccountStorageNodes(accountW.Account.Root, StorageNodeAppender(&storageDiff)) - if err != nil { - return nil, fmt.Errorf("failed building storage diffs for removed state account with key %x\r\nerror: %w", leafKey, err) - } - diff.StorageDiff = storageDiff - if err := nodeSink(diff); err != nil { - return nil, err - } - } - } else { - prevBlob = make([]byte, len(it.NodeBlob())) - copy(prevBlob, it.NodeBlob()) - } - } - return diffAccountAtA, it.Error() -} - // buildAccountUpdates uses the account diffs maps for A => B and B => A and the known intersection of their leafkeys // to generate the statediff node objects for all of the accounts that existed at both A and B but in different states // needs to be called before building account creations and deletions as this mutates @@ -348,7 +318,7 @@ func (sdb *StateDiffBuilder) buildAccountUpdates(creations, deletions sdtypes.Ac for _, key := range updatedKeys { createdAcc := creations[key] deletedAcc := deletions[key] - storageDiff := make([]sdtypes.StorageLeafNode, 0) + var storageDiff []sdtypes.StorageLeafNode if deletedAcc.Account != nil && createdAcc.Account != nil { err = sdb.buildStorageNodesIncremental( deletedAcc.Account.Root, createdAcc.Account.Root, @@ -360,7 +330,6 @@ func (sdb *StateDiffBuilder) buildAccountUpdates(creations, deletions sdtypes.Ac } if err = nodeSink(sdtypes.StateLeafNode{ AccountWrapper: createdAcc, - Removed: false, StorageDiff: storageDiff, }); err != nil { return err @@ -382,16 +351,13 @@ func (sdb *StateDiffBuilder) buildAccountCreations(accounts sdtypes.AccountMap, for _, val := range accounts { diff := sdtypes.StateLeafNode{ AccountWrapper: val, - Removed: false, } if !bytes.Equal(val.Account.CodeHash, nullCodeHash) { // For contract creations, any storage node contained is a diff - storageDiff := make([]sdtypes.StorageLeafNode, 0) - err := sdb.buildStorageNodesEventual(val.Account.Root, StorageNodeAppender(&storageDiff), ipldSink) + err := sdb.buildStorageNodesEventual(val.Account.Root, StorageNodeAppender(&diff.StorageDiff), ipldSink) if err != nil { return fmt.Errorf("failed building eventual storage diffs for node with leaf key %x\r\nerror: %w", val.LeafKey, err) } - diff.StorageDiff = storageDiff // emit codehash => code mappings for contract codeHash := common.BytesToHash(val.Account.CodeHash) code, err := sdb.stateCache.ContractCode(codeHash) @@ -520,29 +486,23 @@ func (sdb *StateDiffBuilder) buildStorageNodesIncremental(oldroot common.Hash, n return err } - diffSlotsAtB, err := sdb.createdAndUpdatedStorage( - oldTrie.NodeIterator(nil), newTrie.NodeIterator(nil), storageSink, ipldSink) - if err != nil { - return err - } - return sdb.deletedOrUpdatedStorage(oldTrie.NodeIterator(nil), newTrie.NodeIterator(nil), - diffSlotsAtB, storageSink) -} - -func (sdb *StateDiffBuilder) createdAndUpdatedStorage(a, b trie.NodeIterator, storageSink sdtypes.StorageNodeSink, - ipldSink sdtypes.IPLDSink) (map[string]bool, error) { - defer metrics.UpdateDuration(time.Now(), metrics.IndexerMetrics.CreatedAndUpdatedStorageTimer) - diffSlotsAtB := make(map[string]bool) - + a, b := oldTrie.NodeIterator(nil), newTrie.NodeIterator(nil) + changedSlots := make(map[string]bool) var prevBlob []byte - it, _ := trie.NewDifferenceIterator(a, b) + it, _ := utils.NewSymmetricDifferenceIterator(a, b) for it.Next(true) { + if it.FromA() { + if it.Leaf() { + changedSlots[string(it.LeafKey())] = true + } + continue + } if it.Leaf() { storageLeafNode := sdb.processStorageValueNode(it, prevBlob) if err := storageSink(storageLeafNode); err != nil { - return nil, err + return err } - diffSlotsAtB[hex.EncodeToString(storageLeafNode.LeafKey)] = true + delete(changedSlots, string(storageLeafNode.LeafKey)) } else { if it.Hash() == zeroHash { continue @@ -553,34 +513,22 @@ func (sdb *StateDiffBuilder) createdAndUpdatedStorage(a, b trie.NodeIterator, st CID: ipld.Keccak256ToCid(ipld.MEthStorageTrie, it.Hash().Bytes()).String(), Content: nodeVal, }); err != nil { - return nil, err + return err } prevBlob = nodeVal } } - return diffSlotsAtB, it.Error() -} - -func (sdb *StateDiffBuilder) deletedOrUpdatedStorage(a, b trie.NodeIterator, diffSlotsAtB map[string]bool, storageSink sdtypes.StorageNodeSink) error { - defer metrics.UpdateDuration(time.Now(), metrics.IndexerMetrics.DeletedOrUpdatedStorageTimer) - it, _ := trie.NewDifferenceIterator(b, a) - for it.Next(true) { - if it.Leaf() { - leafKey := make([]byte, len(it.LeafKey())) - copy(leafKey, it.LeafKey()) - // if this node's leaf key did not show up in diffSlotsAtB - // that means the storage slot was vacated - // in that case, emit an empty "removed" diff storage node - if _, ok := diffSlotsAtB[hex.EncodeToString(leafKey)]; !ok { - if err := storageSink(sdtypes.StorageLeafNode{ - CID: shared.RemovedNodeStorageCID, - Removed: true, - LeafKey: leafKey, - Value: []byte{}, - }); err != nil { - return err - } - } + for leafKey := range changedSlots { + // if this node's leaf key did not show up in diffSlotsAtB + // that means the storage slot was vacated + // in that case, emit an empty "removed" diff storage node + if err := storageSink(sdtypes.StorageLeafNode{ + CID: shared.RemovedNodeStorageCID, + Removed: true, + LeafKey: []byte(leafKey), + Value: []byte{}, + }); err != nil { + return err } } return it.Error() diff --git a/indexer/database/metrics/metrics.go b/indexer/database/metrics/metrics.go index 34fbaba..f377bcb 100644 --- a/indexer/database/metrics/metrics.go +++ b/indexer/database/metrics/metrics.go @@ -84,8 +84,6 @@ type IndexerMetricsHandles struct { IPLDOutputTimer metrics.Timer DifferenceIteratorNextTimer metrics.Timer DifferenceIteratorCounter metrics.Counter - DeletedOrUpdatedStorageTimer metrics.Timer - CreatedAndUpdatedStorageTimer metrics.Timer BuildStorageNodesIncrementalTimer metrics.Timer BuildStateDiffObjectTimer metrics.Timer WriteStateDiffTimer metrics.Timer @@ -120,8 +118,6 @@ func RegisterIndexerMetrics(reg metrics.Registry) IndexerMetricsHandles { IPLDOutputTimer: metrics.NewTimer(), DifferenceIteratorNextTimer: metrics.NewTimer(), DifferenceIteratorCounter: metrics.NewCounter(), - DeletedOrUpdatedStorageTimer: metrics.NewTimer(), - CreatedAndUpdatedStorageTimer: metrics.NewTimer(), BuildStorageNodesIncrementalTimer: metrics.NewTimer(), BuildStateDiffObjectTimer: metrics.NewTimer(), WriteStateDiffTimer: metrics.NewTimer(), @@ -154,8 +150,6 @@ func RegisterIndexerMetrics(reg metrics.Registry) IndexerMetricsHandles { reg.Register(metricName(subsys, "t_ipld_output_fn"), ctx.IPLDOutputTimer) reg.Register(metricName(subsys, "t_difference_iterator_next"), ctx.DifferenceIteratorNextTimer) reg.Register(metricName(subsys, "difference_iterator_counter"), ctx.DifferenceIteratorCounter) - reg.Register(metricName(subsys, "t_created_and_updated_storage"), ctx.CreatedAndUpdatedStorageTimer) - reg.Register(metricName(subsys, "t_deleted_or_updated_storage"), ctx.DeletedOrUpdatedStorageTimer) reg.Register(metricName(subsys, "t_build_storage_nodes_incremental"), ctx.BuildStorageNodesIncrementalTimer) reg.Register(metricName(subsys, "t_build_statediff_object"), ctx.BuildStateDiffObjectTimer) reg.Register(metricName(subsys, "t_write_statediff_object"), ctx.WriteStateDiffTimer)