From 1d84d12f75883e34491404b51421eafb9e207c0a Mon Sep 17 00:00:00 2001 From: Roy Crihfield Date: Tue, 3 Oct 2023 18:56:52 +0800 Subject: [PATCH] handle updated accounts in shared map there's a possibly (though yet unseen) that an updated account could be moved outside the domain of a bounded iterater, in which case it would not see the update after traversal. previously this would have caused an error, but this should prevent it from happening. --- builder.go | 101 ++++++++++++++++++++++++++++++++++------------------- 1 file changed, 65 insertions(+), 36 deletions(-) diff --git a/builder.go b/builder.go index 80cdf88..232a1a6 100644 --- a/builder.go +++ b/builder.go @@ -69,8 +69,20 @@ type accountUpdate struct { new sdtypes.AccountWrapper oldRoot common.Hash } + type accountUpdateMap map[string]*accountUpdate +type accountUpdateLens struct { + state accountUpdateMap + sync.Mutex +} + +func (l *accountUpdateLens) update(fn func(accountUpdateMap)) { + l.Lock() + defer l.Unlock() + fn(l.state) +} + func appender[T any](to *[]T) func(T) error { return func(a T) error { *to = append(*to, a) @@ -141,8 +153,11 @@ func (sdb *builder) WriteStateDiff( subitersA := iterutils.SubtrieIterators(triea.NodeIterator, uint(sdb.subtrieWorkers)) subitersB := iterutils.SubtrieIterators(trieb.NodeIterator, uint(sdb.subtrieWorkers)) + updates := accountUpdateLens{ + state: make(accountUpdateMap), + } logger := log.New("hash", args.BlockHash, "number", args.BlockNumber) - // errgroup will cancel if any group fails + // errgroup will cancel if any worker fails g, ctx := errgroup.WithContext(context.Background()) for i := uint(0); i < sdb.subtrieWorkers; i++ { func(subdiv uint) { @@ -152,12 +167,35 @@ func (sdb *builder) WriteStateDiff( return sdb.processAccounts(ctx, it, &it.SymmDiffState, params.watchedAddressesLeafPaths, - nodeSink, ipldSink, logger, + nodeSink, ipldSink, &updates, + logger, ) }) }(i) } - return g.Wait() + + if err = g.Wait(); err != nil { + return err + } + + for key, update := range updates.state { + var storageDiff []sdtypes.StorageLeafNode + err := sdb.processStorageUpdates( + update.oldRoot, update.new.Account.Root, + appender(&storageDiff), ipldSink, + ) + if err != nil { + return fmt.Errorf("error processing incremental storage diffs for account with leafkey %x\r\nerror: %w", key, err) + } + + if err = nodeSink(sdtypes.StateLeafNode{ + AccountWrapper: update.new, + StorageDiff: storageDiff, + }); err != nil { + return err + } + } + return nil } // WriteStateDiff writes a statediff object to output sinks @@ -191,7 +229,11 @@ func (sdb *builder) WriteStateSnapshot( subiters[i] = tracker.Tracked(subiters[i]) } } - // errgroup will cancel if any group fails + updates := accountUpdateLens{ + state: make(accountUpdateMap), + } + + // errgroup will cancel if any worker fails g, ctx := errgroup.WithContext(context.Background()) for i := range subiters { func(subdiv uint) { @@ -200,7 +242,8 @@ func (sdb *builder) WriteStateSnapshot( return sdb.processAccounts(ctx, subiters[subdiv], &symdiff, params.watchedAddressesLeafPaths, - nodeSink, ipldSink, log.DefaultLogger, + nodeSink, ipldSink, &updates, + log.DefaultLogger, ) }) }(uint(i)) @@ -215,13 +258,13 @@ func (sdb *builder) processAccounts( it trie.NodeIterator, symdiff *utils.SymmDiffState, watchedAddressesLeafPaths [][]byte, nodeSink sdtypes.StateNodeSink, ipldSink sdtypes.IPLDSink, + updateLens *accountUpdateLens, logger log.Logger, ) error { logger.Trace("statediff/processAccounts BEGIN") defer metrics.ReportAndUpdateDuration("statediff/processAccounts END", time.Now(), logger, metrics.IndexerMetrics.ProcessAccountsTimer) - updates := make(accountUpdateMap) // Cache the RLP of the previous node. When we hit a value node this will be the parent blob. var prevBlob = it.NodeBlob() for it.Next(true) { @@ -245,12 +288,14 @@ func (sdb *builder) processAccounts( copy(leafKey, it.LeafKey()) if symdiff.CommonPath() { - // If B also contains this leaf node, this is the old state of an updated account. - if update, ok := updates[string(leafKey)]; ok { - update.oldRoot = account.Root - } else { - updates[string(leafKey)] = &accountUpdate{oldRoot: account.Root} - } + updateLens.update(func(updates accountUpdateMap) { + // If B also contains this leaf node, this is the old state of an updated account. + if update, ok := updates[string(leafKey)]; ok { + update.oldRoot = account.Root + } else { + updates[string(leafKey)] = &accountUpdate{oldRoot: account.Root} + } + }) } else { // This node was removed, meaning the account was deleted. Emit empty // "removed" records for the state node and all storage all storage slots. @@ -270,12 +315,14 @@ func (sdb *builder) processAccounts( } if symdiff.CommonPath() { - // If A also contains this leaf node, this is the new state of an updated account. - if update, ok := updates[string(accountW.LeafKey)]; ok { - update.new = *accountW - } else { - updates[string(accountW.LeafKey)] = &accountUpdate{new: *accountW} - } + updateLens.update(func(updates accountUpdateMap) { + // If A also contains this leaf node, this is the new state of an updated account. + if update, ok := updates[string(accountW.LeafKey)]; ok { + update.new = *accountW + } else { + updates[string(accountW.LeafKey)] = &accountUpdate{new: *accountW} + } + }) } else { // account was created err := sdb.processAccountCreation(accountW, ipldSink, nodeSink) if err != nil { @@ -318,24 +365,6 @@ func (sdb *builder) processAccounts( } prevBlob = nodeVal } - - for key, update := range updates { - var storageDiff []sdtypes.StorageLeafNode - err := sdb.processStorageUpdates( - update.oldRoot, update.new.Account.Root, - appender(&storageDiff), ipldSink, - ) - if err != nil { - return fmt.Errorf("error processing incremental storage diffs for account with leafkey %x\r\nerror: %w", key, err) - } - - if err = nodeSink(sdtypes.StateLeafNode{ - AccountWrapper: update.new, - StorageDiff: storageDiff, - }); err != nil { - return err - } - } return it.Error() }