diff --git a/builder.go b/builder.go index e495fec..e4011fe 100644 --- a/builder.go +++ b/builder.go @@ -27,6 +27,7 @@ import ( "time" iterutils "github.com/cerc-io/eth-iterator-utils" + "github.com/cerc-io/eth-iterator-utils/tracker" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/crypto" @@ -58,7 +59,7 @@ type Builder interface { WriteStateDiff(Args, Params, sdtypes.StateNodeSink, sdtypes.IPLDSink) error } -type StateDiffBuilder struct { +type builder struct { // state cache is safe for concurrent reads stateCache adapt.StateView subtrieWorkers uint @@ -89,14 +90,18 @@ func syncedAppender[T any](to *[]T) func(T) error { // NewBuilder is used to create a statediff builder func NewBuilder(stateCache adapt.StateView) Builder { - return &StateDiffBuilder{ + return &builder{ stateCache: stateCache, subtrieWorkers: defaultSubtrieWorkers, } } +func (sdb *builder) SetSubtrieWorkers(n uint) { + sdb.subtrieWorkers = n +} + // BuildStateDiffObject builds a statediff object from two blocks and the provided parameters -func (sdb *StateDiffBuilder) BuildStateDiffObject(args Args, params Params) (sdtypes.StateObject, error) { +func (sdb *builder) BuildStateDiffObject(args Args, params Params) (sdtypes.StateObject, error) { defer metrics.UpdateDuration(time.Now(), metrics.IndexerMetrics.BuildStateDiffObjectTimer) var stateNodes []sdtypes.StateLeafNode var iplds []sdtypes.IPLD @@ -113,7 +118,7 @@ func (sdb *StateDiffBuilder) BuildStateDiffObject(args Args, params Params) (sdt } // WriteStateDiff writes a statediff object to output sinks -func (sdb *StateDiffBuilder) WriteStateDiff( +func (sdb *builder) WriteStateDiff( args Args, params Params, nodeSink sdtypes.StateNodeSink, ipldSink sdtypes.IPLDSink, @@ -132,14 +137,82 @@ func (sdb *StateDiffBuilder) WriteStateDiff( subitersB := iterutils.SubtrieIterators(trieb.NodeIterator, uint(sdb.subtrieWorkers)) logger := log.New("hash", args.BlockHash, "number", args.BlockNumber) - // errgroup will cancel if any gr fails + // errgroup will cancel if any group fails g, ctx := errgroup.WithContext(context.Background()) for i := uint(0); i < sdb.subtrieWorkers; i++ { func(subdiv uint) { g.Go(func() error { a, b := subitersA[subdiv], subitersB[subdiv] + it, aux := utils.NewSymmetricDifferenceIterator(a, b) return sdb.processAccounts(ctx, - a, b, params.watchedAddressesLeafPaths, + it, aux, + params.watchedAddressesLeafPaths, + nodeSink, ipldSink, logger, + ) + }) + }(i) + } + return g.Wait() +} + +// WriteStateDiff writes a statediff object to output sinks +func (sdb *builder) WriteStateDiffTracked( + args Args, params Params, + nodeSink sdtypes.StateNodeSink, + ipldSink sdtypes.IPLDSink, + tracker *tracker.Tracker, +) error { + defer metrics.UpdateDuration(time.Now(), metrics.IndexerMetrics.WriteStateDiffTimer) + // Load tries for old and new states + triea, err := sdb.stateCache.OpenTrie(args.OldStateRoot) + if err != nil { + return fmt.Errorf("error opening old state trie: %w", err) + } + trieb, err := sdb.stateCache.OpenTrie(args.NewStateRoot) + if err != nil { + return fmt.Errorf("error opening new state trie: %w", err) + } + + var subiters []trie.NodeIterator + var auxes []*utils.SymmDiffAux + // Constructor for difference iterator at a specific (recovered) path + makeIterator := func(key []byte) trie.NodeIterator { + a := triea.NodeIterator(key) + b := trieb.NodeIterator(key) + diffit, aux := utils.NewSymmetricDifferenceIterator(a, b) + // iterators are constructed in-order, so these will align + auxes = append(auxes, aux) + return diffit + } + subiters, err = tracker.Restore(makeIterator) + if err != nil { + return fmt.Errorf("error restoring iterators: %w", err) + } + + if subiters != nil { + if len(subiters) != int(sdb.subtrieWorkers) { + return fmt.Errorf("expected to restore %d iterators, got %d", sdb.subtrieWorkers, len(subiters)) + } + } else { + subitersA := iterutils.SubtrieIterators(triea.NodeIterator, uint(sdb.subtrieWorkers)) + subitersB := iterutils.SubtrieIterators(trieb.NodeIterator, uint(sdb.subtrieWorkers)) + for i := 0; i < int(sdb.subtrieWorkers); i++ { + it, aux := utils.NewSymmetricDifferenceIterator(subitersA[i], subitersB[i]) + it = tracker.Tracked(it) + subiters = append(subiters, it) + auxes = append(auxes, aux) + } + } + logger := log.New("hash", args.BlockHash, "number", args.BlockNumber) + // errgroup will cancel if any group fails + g, ctx := errgroup.WithContext(context.Background()) + for i := uint(0); i < sdb.subtrieWorkers; i++ { + func(subdiv uint) { + g.Go(func() error { + // a, b := subitersA[subdiv], subitersB[subdiv] + return sdb.processAccounts(ctx, + subiters[subdiv], auxes[subdiv], + params.watchedAddressesLeafPaths, nodeSink, ipldSink, logger, ) }) @@ -150,9 +223,10 @@ func (sdb *StateDiffBuilder) WriteStateDiff( // processAccounts processes account creations and deletions, and returns a set of updated // existing accounts, indexed by leaf key. -func (sdb *StateDiffBuilder) processAccounts( +func (sdb *builder) processAccounts( ctx context.Context, - a, b trie.NodeIterator, watchedAddressesLeafPaths [][]byte, + it trie.NodeIterator, aux *utils.SymmDiffAux, + watchedAddressesLeafPaths [][]byte, nodeSink sdtypes.StateNodeSink, ipldSink sdtypes.IPLDSink, logger log.Logger, ) error { @@ -163,7 +237,6 @@ func (sdb *StateDiffBuilder) processAccounts( updates := make(accountUpdateMap) // Cache the RLP of the previous node. When we hit a value node this will be the parent blob. var prevBlob []byte - it, itCount := utils.NewSymmetricDifferenceIterator(a, b) prevBlob = it.NodeBlob() for it.Next(true) { select { @@ -176,7 +249,7 @@ func (sdb *StateDiffBuilder) processAccounts( if !isWatchedPathPrefix(watchedAddressesLeafPaths, it.Path()) { continue } - if it.FromA() { // Node exists in the old trie + if aux.FromA() { // Node exists in the old trie if it.Leaf() { var account types.StateAccount if err := rlp.DecodeBytes(it.LeafBlob(), &account); err != nil { @@ -185,7 +258,7 @@ func (sdb *StateDiffBuilder) processAccounts( leafKey := make([]byte, len(it.LeafKey())) copy(leafKey, it.LeafKey()) - if it.CommonPath() { + if aux.CommonPath() { // If B also contains this leaf node, this is the old state of an updated account. if update, ok := updates[string(leafKey)]; ok { update.oldRoot = account.Root @@ -210,7 +283,7 @@ func (sdb *StateDiffBuilder) processAccounts( return err } - if it.CommonPath() { + if aux.CommonPath() { // If A also contains this leaf node, this is the new state of an updated account. if update, ok := updates[string(accountW.LeafKey)]; ok { update.new = *accountW @@ -279,11 +352,11 @@ func (sdb *StateDiffBuilder) processAccounts( } } - metrics.IndexerMetrics.DifferenceIteratorCounter.Inc(int64(*itCount)) + metrics.IndexerMetrics.DifferenceIteratorCounter.Inc(int64(aux.Count())) return it.Error() } -func (sdb *StateDiffBuilder) processAccountDeletion( +func (sdb *builder) processAccountDeletion( leafKey []byte, account types.StateAccount, nodeSink sdtypes.StateNodeSink, ) error { diff := sdtypes.StateLeafNode{ @@ -300,7 +373,7 @@ func (sdb *StateDiffBuilder) processAccountDeletion( return nodeSink(diff) } -func (sdb *StateDiffBuilder) processAccountCreation( +func (sdb *builder) processAccountCreation( accountW *sdtypes.AccountWrapper, ipldSink sdtypes.IPLDSink, nodeSink sdtypes.StateNodeSink, ) error { diff := sdtypes.StateLeafNode{ @@ -331,7 +404,7 @@ func (sdb *StateDiffBuilder) processAccountCreation( // decodes account at leaf and encodes RLP data to CID // reminder: it.Leaf() == true when the iterator is positioned at a "value node" (which is not something // that actually exists in an MMPT), therefore we pass the parent node blob as the leaf RLP. -func (sdb *StateDiffBuilder) decodeStateLeaf(it trie.NodeIterator, parentBlob []byte) (*sdtypes.AccountWrapper, error) { +func (sdb *builder) decodeStateLeaf(it trie.NodeIterator, parentBlob []byte) (*sdtypes.AccountWrapper, error) { var account types.StateAccount if err := rlp.DecodeBytes(it.LeafBlob(), &account); err != nil { return nil, fmt.Errorf("error decoding account at leaf key %x: %w", it.LeafKey(), err) @@ -348,7 +421,7 @@ func (sdb *StateDiffBuilder) decodeStateLeaf(it trie.NodeIterator, parentBlob [] // processStorageCreations processes the storage node records for a newly created account // i.e. it returns all the storage nodes at this state, since there is no previous state. -func (sdb *StateDiffBuilder) processStorageCreations( +func (sdb *builder) processStorageCreations( sr common.Hash, storageSink sdtypes.StorageNodeSink, ipldSink sdtypes.IPLDSink, ) error { defer metrics.UpdateDuration(time.Now(), metrics.IndexerMetrics.ProcessStorageCreationsTimer) @@ -386,7 +459,7 @@ func (sdb *StateDiffBuilder) processStorageCreations( } // processStorageUpdates builds the storage diff node objects for all nodes that exist in a different state at B than A -func (sdb *StateDiffBuilder) processStorageUpdates( +func (sdb *builder) processStorageUpdates( oldroot common.Hash, newroot common.Hash, storageSink sdtypes.StorageNodeSink, ipldSink sdtypes.IPLDSink, @@ -407,10 +480,10 @@ func (sdb *StateDiffBuilder) processStorageUpdates( var prevBlob []byte a, b := oldTrie.NodeIterator(nil), newTrie.NodeIterator(nil) - it, _ := utils.NewSymmetricDifferenceIterator(a, b) + it, aux := utils.NewSymmetricDifferenceIterator(a, b) for it.Next(true) { - if it.FromA() { - if it.Leaf() && !it.CommonPath() { + if aux.FromA() { + if it.Leaf() && !aux.CommonPath() { // If this node's leaf key is absent from B, the storage slot was vacated. // In that case, emit an empty "removed" storage node record. if err := storageSink(sdtypes.StorageLeafNode{ @@ -448,10 +521,10 @@ func (sdb *StateDiffBuilder) processStorageUpdates( } // processRemovedAccountStorage builds the "removed" diffs for all the storage nodes for a destroyed account -func (sdb *StateDiffBuilder) processRemovedAccountStorage( +func (sdb *builder) processRemovedAccountStorage( sr common.Hash, storageSink sdtypes.StorageNodeSink, ) error { - defer metrics.UpdateDuration(time.Now(), metrics.IndexerMetrics.BuildRemovedAccountStorageNodesTimer) + defer metrics.UpdateDuration(time.Now(), metrics.IndexerMetrics.ProcessRemovedAccountStorageTimer) if sr == emptyContractRoot { return nil } @@ -482,7 +555,7 @@ func (sdb *StateDiffBuilder) processRemovedAccountStorage( // decodes slot at leaf and encodes RLP data to CID // reminder: it.Leaf() == true when the iterator is positioned at a "value node" (which is not something // that actually exists in an MMPT), therefore we pass the parent node blob as the leaf RLP. -func (sdb *StateDiffBuilder) decodeStorageLeaf(it trie.NodeIterator, parentBlob []byte) sdtypes.StorageLeafNode { +func (sdb *builder) decodeStorageLeaf(it trie.NodeIterator, parentBlob []byte) sdtypes.StorageLeafNode { leafKey := make([]byte, len(it.LeafKey())) copy(leafKey, it.LeafKey()) value := make([]byte, len(it.LeafBlob())) diff --git a/service.go b/service.go index 472b689..3c7d8f6 100644 --- a/service.go +++ b/service.go @@ -822,6 +822,7 @@ func (sds *Service) writeStateDiff(block *types.Block, parentRoot common.Hash, p return err } + // TODO: review/remove the need to sync here var nodeMtx, ipldMtx sync.Mutex nodeSink := func(node types2.StateLeafNode) error { defer metrics.UpdateDuration(time.Now(), metrics.IndexerMetrics.OutputTimer)