Use symmetric difference iterator #11

Merged
roysc merged 27 commits from symmetric-diff-iterator into main 2023-09-20 03:22:19 +00:00
3 changed files with 78 additions and 184 deletions
Showing only changes of commit 91549f449c - Show all commits

View File

@ -138,7 +138,9 @@ func (sdb *StateDiffBuilder) processAccounts(a, b trie.NodeIterator,
nodeSink sdtypes.StateNodeSink, ipldSink sdtypes.IPLDSink,
logger log.Logger,
) error {
defer metrics.UpdateDuration(time.Now(), metrics.IndexerMetrics.CreatedAndUpdatedStateTimer)
logger.Trace("statediff/processAccounts BEGIN")
defer metrics.ReportAndUpdateDuration("statediff/processAccounts END",
time.Now(), logger, metrics.IndexerMetrics.ProcessAccountsTimer)
updates := make(accountUpdateMap)
// Cache the RLP of the previous node. When we hit a value node this will be the parent blob.
@ -235,7 +237,7 @@ func (sdb *StateDiffBuilder) processAccounts(a, b trie.NodeIterator,
for key, update := range updates {
var storageDiff []sdtypes.StorageLeafNode
err := sdb.processUpdatedAccountStorage(
err := sdb.processStorageUpdates(
update.oldRoot, update.new.Account.Root,
appender(&storageDiff), ipldSink,
)
@ -280,7 +282,7 @@ func (sdb *StateDiffBuilder) processAccountCreation(
}
if !bytes.Equal(accountW.Account.CodeHash, nullCodeHash) {
// For contract creations, any storage node contained is a diff
err := sdb.processCreatedAccountStorage(accountW.Account.Root, appender(&diff.StorageDiff), ipldSink)
err := sdb.processStorageCreations(accountW.Account.Root, appender(&diff.StorageDiff), ipldSink)
if err != nil {
return fmt.Errorf("failed building eventual storage diffs for node with leaf key %x\r\nerror: %w", accountW.LeafKey, err)
}
@ -318,12 +320,12 @@ func (sdb *StateDiffBuilder) decodeStateLeaf(it trie.NodeIterator, parentBlob []
}, nil
}
// processCreatedAccountStorage processes the storage node records for a newly created account
// processStorageCreations processes the storage node records for a newly created account
// i.e. it returns all the storage nodes at this state, since there is no previous state.
func (sdb *StateDiffBuilder) processCreatedAccountStorage(
func (sdb *StateDiffBuilder) processStorageCreations(
sr common.Hash, storageSink sdtypes.StorageNodeSink, ipldSink sdtypes.IPLDSink,
) error {
defer metrics.UpdateDuration(time.Now(), metrics.IndexerMetrics.BuildStorageNodesEventualTimer)
defer metrics.UpdateDuration(time.Now(), metrics.IndexerMetrics.ProcessStorageCreationsTimer)
if sr == emptyContractRoot {
return nil
}
@ -357,45 +359,13 @@ func (sdb *StateDiffBuilder) processCreatedAccountStorage(
return it.Error()
}
// processRemovedAccountStorage builds the "removed" diffs for all the storage nodes for a destroyed account
func (sdb *StateDiffBuilder) processRemovedAccountStorage(
sr common.Hash, storageSink sdtypes.StorageNodeSink,
) error {
defer metrics.UpdateDuration(time.Now(), metrics.IndexerMetrics.BuildRemovedAccountStorageNodesTimer)
if sr == emptyContractRoot {
return nil
}
log.Debug("Storage root for removed diffs", "root", sr)
sTrie, err := sdb.stateCache.OpenTrie(sr)
if err != nil {
log.Info("error in build removed account storage diffs", "error", err)
return err
}
it := sTrie.NodeIterator(nil)
for it.Next(true) {
if it.Leaf() { // only leaf values are indexed, don't need to demarcate removed intermediate nodes
leafKey := make([]byte, len(it.LeafKey()))
copy(leafKey, it.LeafKey())
if err := storageSink(sdtypes.StorageLeafNode{
CID: shared.RemovedNodeStorageCID,
Removed: true,
LeafKey: leafKey,
Value: []byte{},
}); err != nil {
return err
}
}
}
return it.Error()
}
// processUpdatedAccountStorage builds the storage diff node objects for all nodes that exist in a different state at B than A
func (sdb *StateDiffBuilder) processUpdatedAccountStorage(
// processStorageUpdates builds the storage diff node objects for all nodes that exist in a different state at B than A
func (sdb *StateDiffBuilder) processStorageUpdates(
oldroot common.Hash, newroot common.Hash,
storageSink sdtypes.StorageNodeSink,
ipldSink sdtypes.IPLDSink,
) error {
defer metrics.UpdateDuration(time.Now(), metrics.IndexerMetrics.BuildStorageNodesIncrementalTimer)
defer metrics.UpdateDuration(time.Now(), metrics.IndexerMetrics.ProcessStorageUpdatesTimer)
if newroot == oldroot {
return nil
}
@ -451,6 +421,38 @@ func (sdb *StateDiffBuilder) processUpdatedAccountStorage(
return it.Error()
}
// processRemovedAccountStorage builds the "removed" diffs for all the storage nodes for a destroyed account
func (sdb *StateDiffBuilder) processRemovedAccountStorage(
sr common.Hash, storageSink sdtypes.StorageNodeSink,
) error {
defer metrics.UpdateDuration(time.Now(), metrics.IndexerMetrics.ProcessRemovedAccountStorageTimer)
if sr == emptyContractRoot {
return nil
}
log.Debug("Storage root for removed diffs", "root", sr)
sTrie, err := sdb.stateCache.OpenTrie(sr)
if err != nil {
log.Info("error in build removed account storage diffs", "error", err)
return err
}
it := sTrie.NodeIterator(nil)
for it.Next(true) {
if it.Leaf() { // only leaf values are indexed, don't need to demarcate removed intermediate nodes
leafKey := make([]byte, len(it.LeafKey()))
copy(leafKey, it.LeafKey())
if err := storageSink(sdtypes.StorageLeafNode{
CID: shared.RemovedNodeStorageCID,
Removed: true,
LeafKey: leafKey,
Value: []byte{},
}); err != nil {
return err
}
}
}
return it.Error()
}
// decodes slot at leaf and encodes RLP data to CID
// reminder: it.Leaf() == true when the iterator is positioned at a "value node" (which is not something
// that actually exists in an MMPT), therefore we pass the parent node blob as the leaf RLP.

View File

@ -72,24 +72,15 @@ type IndexerMetricsHandles struct {
StateStoreCodeProcessingTimer metrics.Timer
// Fine-grained code timers
BuildStateDiffTimer metrics.Timer
CreatedAndUpdatedStateTimer metrics.Timer
DeletedStateTimer metrics.Timer
BuildAccountUpdatesTimer metrics.Timer
BuildAccountCreationsTimer metrics.Timer
ResolveNodeTimer metrics.Timer
SortKeysTimer metrics.Timer
FindIntersectionTimer metrics.Timer
ProcessAccountsTimer metrics.Timer
OutputTimer metrics.Timer
IPLDOutputTimer metrics.Timer
DifferenceIteratorNextTimer metrics.Timer
DifferenceIteratorCounter metrics.Counter
BuildStorageNodesIncrementalTimer metrics.Timer
BuildStateDiffObjectTimer metrics.Timer
WriteStateDiffTimer metrics.Timer
BuildStorageNodesEventualTimer metrics.Timer
BuildRemovedAccountStorageNodesTimer metrics.Timer
BuildRemovedStorageNodesFromTrieTimer metrics.Timer
ProcessStorageUpdatesTimer metrics.Timer
ProcessStorageCreationsTimer metrics.Timer
ProcessRemovedAccountStorageTimer metrics.Timer
IsWatchedAddressTimer metrics.Timer
}
@ -106,24 +97,15 @@ func RegisterIndexerMetrics(reg metrics.Registry) IndexerMetricsHandles {
UncleProcessingTimer: metrics.NewTimer(),
TxAndRecProcessingTimer: metrics.NewTimer(),
StateStoreCodeProcessingTimer: metrics.NewTimer(),
BuildStateDiffTimer: metrics.NewTimer(),
CreatedAndUpdatedStateTimer: metrics.NewTimer(),
DeletedStateTimer: metrics.NewTimer(),
BuildAccountUpdatesTimer: metrics.NewTimer(),
BuildAccountCreationsTimer: metrics.NewTimer(),
ResolveNodeTimer: metrics.NewTimer(),
SortKeysTimer: metrics.NewTimer(),
FindIntersectionTimer: metrics.NewTimer(),
ProcessAccountsTimer: metrics.NewTimer(),
OutputTimer: metrics.NewTimer(),
IPLDOutputTimer: metrics.NewTimer(),
DifferenceIteratorNextTimer: metrics.NewTimer(),
DifferenceIteratorCounter: metrics.NewCounter(),
BuildStorageNodesIncrementalTimer: metrics.NewTimer(),
BuildStateDiffObjectTimer: metrics.NewTimer(),
WriteStateDiffTimer: metrics.NewTimer(),
BuildStorageNodesEventualTimer: metrics.NewTimer(),
BuildRemovedAccountStorageNodesTimer: metrics.NewTimer(),
BuildRemovedStorageNodesFromTrieTimer: metrics.NewTimer(),
ProcessStorageUpdatesTimer: metrics.NewTimer(),
ProcessStorageCreationsTimer: metrics.NewTimer(),
ProcessRemovedAccountStorageTimer: metrics.NewTimer(),
IsWatchedAddressTimer: metrics.NewTimer(),
}
subsys := "indexer"
@ -138,25 +120,15 @@ func RegisterIndexerMetrics(reg metrics.Registry) IndexerMetricsHandles {
reg.Register(metricName(subsys, "t_uncle_processing"), ctx.UncleProcessingTimer)
reg.Register(metricName(subsys, "t_tx_receipt_processing"), ctx.TxAndRecProcessingTimer)
reg.Register(metricName(subsys, "t_state_store_code_processing"), ctx.StateStoreCodeProcessingTimer)
reg.Register(metricName(subsys, "t_build_statediff"), ctx.BuildStateDiffTimer)
reg.Register(metricName(subsys, "t_created_and_update_state"), ctx.CreatedAndUpdatedStateTimer)
reg.Register(metricName(subsys, "t_deleted_or_updated_state"), ctx.DeletedStateTimer)
reg.Register(metricName(subsys, "t_build_account_updates"), ctx.BuildAccountUpdatesTimer)
reg.Register(metricName(subsys, "t_build_account_creations"), ctx.BuildAccountCreationsTimer)
reg.Register(metricName(subsys, "t_resolve_node"), ctx.ResolveNodeTimer)
reg.Register(metricName(subsys, "t_sort_keys"), ctx.SortKeysTimer)
reg.Register(metricName(subsys, "t_find_intersection"), ctx.FindIntersectionTimer)
reg.Register(metricName(subsys, "t_output_fn"), ctx.OutputTimer)
reg.Register(metricName(subsys, "t_ipld_output_fn"), ctx.IPLDOutputTimer)
reg.Register(metricName(subsys, "t_difference_iterator_next"), ctx.DifferenceIteratorNextTimer)
reg.Register(metricName(subsys, "difference_iterator_counter"), ctx.DifferenceIteratorCounter)
reg.Register(metricName(subsys, "t_build_storage_nodes_incremental"), ctx.BuildStorageNodesIncrementalTimer)
reg.Register(metricName(subsys, "t_build_statediff_object"), ctx.BuildStateDiffObjectTimer)
reg.Register(metricName(subsys, "t_write_statediff_object"), ctx.WriteStateDiffTimer)
reg.Register(metricName(subsys, "t_created_and_updated_state"), ctx.CreatedAndUpdatedStateTimer)
reg.Register(metricName(subsys, "t_build_storage_nodes_eventual"), ctx.BuildStorageNodesEventualTimer)
reg.Register(metricName(subsys, "t_build_removed_accounts_storage_nodes"), ctx.BuildRemovedAccountStorageNodesTimer)
reg.Register(metricName(subsys, "t_build_removed_storage_nodes_from_trie"), ctx.BuildRemovedStorageNodesFromTrieTimer)
reg.Register(metricName(subsys, "t_process_accounts"), ctx.ProcessAccountsTimer)
reg.Register(metricName(subsys, "t_process_storage_updates"), ctx.ProcessStorageUpdatesTimer)
reg.Register(metricName(subsys, "t_process_storage_creations"), ctx.ProcessStorageCreationsTimer)
reg.Register(metricName(subsys, "t_process_removed_account_storage"), ctx.ProcessRemovedAccountStorageTimer)
reg.Register(metricName(subsys, "t_is_watched_address"), ctx.IsWatchedAddressTimer)
log.Debug("Registering statediff indexer metrics.")

View File

@ -1,80 +0,0 @@
// Copyright 2019 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
// Contains a batch of utility type declarations used by the tests. As the node
// operates on unique types, a lot of them are needed to check various features.
package trie_helpers
import (
"sort"
"strings"
"time"
metrics2 "github.com/cerc-io/plugeth-statediff/indexer/database/metrics"
"github.com/cerc-io/plugeth-statediff/types"
)
// SortKeys sorts the keys in the account map
func SortKeys(data types.AccountMap) []string {
defer metrics2.UpdateDuration(time.Now(), metrics2.IndexerMetrics.SortKeysTimer)
keys := make([]string, 0, len(data))
for key := range data {
keys = append(keys, key)
}
sort.Strings(keys)
return keys
}
// FindIntersection finds the set of strings from both arrays that are equivalent
// a and b must first be sorted
// this is used to find which keys have been both "deleted" and "created" i.e. they were updated
func FindIntersection(a, b []string) []string {
defer metrics2.UpdateDuration(time.Now(), metrics2.IndexerMetrics.FindIntersectionTimer)
lenA := len(a)
lenB := len(b)
iOfA, iOfB := 0, 0
updates := make([]string, 0)
if iOfA >= lenA || iOfB >= lenB {
return updates
}
for {
switch strings.Compare(a[iOfA], b[iOfB]) {
// -1 when a[iOfA] < b[iOfB]
case -1:
iOfA++
if iOfA >= lenA {
return updates
}
// 0 when a[iOfA] == b[iOfB]
case 0:
updates = append(updates, a[iOfA])
iOfA++
iOfB++
if iOfA >= lenA || iOfB >= lenB {
return updates
}
// 1 when a[iOfA] > b[iOfB]
case 1:
iOfB++
if iOfB >= lenB {
return updates
}
}
}
}