diff --git a/builder.go b/builder.go index fc5aed3..4a896d0 100644 --- a/builder.go +++ b/builder.go @@ -27,6 +27,7 @@ import ( "time" iterutils "github.com/cerc-io/eth-iterator-utils" + "github.com/cerc-io/eth-iterator-utils/tracker" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/crypto" @@ -58,7 +59,7 @@ type Builder interface { WriteStateDiff(Args, Params, sdtypes.StateNodeSink, sdtypes.IPLDSink) error } -type StateDiffBuilder struct { +type builder struct { // state cache is safe for concurrent reads stateCache adapt.StateView subtrieWorkers uint @@ -88,8 +89,8 @@ func syncedAppender[T any](to *[]T) func(T) error { } // NewBuilder is used to create a statediff builder -func NewBuilder(stateCache adapt.StateView) *StateDiffBuilder { - return &StateDiffBuilder{ +func NewBuilder(stateCache adapt.StateView) Builder { + return &builder{ stateCache: stateCache, subtrieWorkers: defaultSubtrieWorkers, } @@ -97,7 +98,7 @@ func NewBuilder(stateCache adapt.StateView) *StateDiffBuilder { // SetSubtrieWorkers sets the number of disjoint subtries to divide among parallel workers. // Passing 0 will reset this to the default value. -func (sdb *StateDiffBuilder) SetSubtrieWorkers(n uint) { +func (sdb *builder) SetSubtrieWorkers(n uint) { if n == 0 { n = defaultSubtrieWorkers } @@ -105,7 +106,7 @@ func (sdb *StateDiffBuilder) SetSubtrieWorkers(n uint) { } // BuildStateDiffObject builds a statediff object from two blocks and the provided parameters -func (sdb *StateDiffBuilder) BuildStateDiffObject(args Args, params Params) (sdtypes.StateObject, error) { +func (sdb *builder) BuildStateDiffObject(args Args, params Params) (sdtypes.StateObject, error) { defer metrics.UpdateDuration(time.Now(), metrics.IndexerMetrics.BuildStateDiffObjectTimer) var stateNodes []sdtypes.StateLeafNode var iplds []sdtypes.IPLD @@ -122,7 +123,7 @@ func (sdb *StateDiffBuilder) BuildStateDiffObject(args Args, params Params) (sdt } // WriteStateDiff writes a statediff object to output sinks -func (sdb *StateDiffBuilder) WriteStateDiff( +func (sdb *builder) WriteStateDiff( args Args, params Params, nodeSink sdtypes.StateNodeSink, ipldSink sdtypes.IPLDSink, @@ -141,14 +142,82 @@ func (sdb *StateDiffBuilder) WriteStateDiff( subitersB := iterutils.SubtrieIterators(trieb.NodeIterator, uint(sdb.subtrieWorkers)) logger := log.New("hash", args.BlockHash, "number", args.BlockNumber) - // errgroup will cancel if any gr fails + // errgroup will cancel if any group fails g, ctx := errgroup.WithContext(context.Background()) for i := uint(0); i < sdb.subtrieWorkers; i++ { func(subdiv uint) { g.Go(func() error { a, b := subitersA[subdiv], subitersB[subdiv] + it, aux := utils.NewSymmetricDifferenceIterator(a, b) return sdb.processAccounts(ctx, - a, b, params.watchedAddressesLeafPaths, + it, aux, + params.watchedAddressesLeafPaths, + nodeSink, ipldSink, logger, + ) + }) + }(i) + } + return g.Wait() +} + +// WriteStateDiff writes a statediff object to output sinks +func (sdb *builder) WriteStateDiffTracked( + args Args, params Params, + nodeSink sdtypes.StateNodeSink, + ipldSink sdtypes.IPLDSink, + tracker *tracker.Tracker, +) error { + defer metrics.UpdateDuration(time.Now(), metrics.IndexerMetrics.WriteStateDiffTimer) + // Load tries for old and new states + triea, err := sdb.stateCache.OpenTrie(args.OldStateRoot) + if err != nil { + return fmt.Errorf("error opening old state trie: %w", err) + } + trieb, err := sdb.stateCache.OpenTrie(args.NewStateRoot) + if err != nil { + return fmt.Errorf("error opening new state trie: %w", err) + } + + var subiters []trie.NodeIterator + var auxes []*utils.SymmDiffAux + // Constructor for difference iterator at a specific (recovered) path + makeIterator := func(key []byte) trie.NodeIterator { + a := triea.NodeIterator(key) + b := trieb.NodeIterator(key) + diffit, aux := utils.NewSymmetricDifferenceIterator(a, b) + // iterators are constructed in-order, so these will align + auxes = append(auxes, aux) + return diffit + } + subiters, err = tracker.Restore(makeIterator) + if err != nil { + return fmt.Errorf("error restoring iterators: %w", err) + } + + if subiters != nil { + if len(subiters) != int(sdb.subtrieWorkers) { + return fmt.Errorf("expected to restore %d iterators, got %d", sdb.subtrieWorkers, len(subiters)) + } + } else { + subitersA := iterutils.SubtrieIterators(triea.NodeIterator, uint(sdb.subtrieWorkers)) + subitersB := iterutils.SubtrieIterators(trieb.NodeIterator, uint(sdb.subtrieWorkers)) + for i := 0; i < int(sdb.subtrieWorkers); i++ { + it, aux := utils.NewSymmetricDifferenceIterator(subitersA[i], subitersB[i]) + it = tracker.Tracked(it) + subiters = append(subiters, it) + auxes = append(auxes, aux) + } + } + logger := log.New("hash", args.BlockHash, "number", args.BlockNumber) + // errgroup will cancel if any group fails + g, ctx := errgroup.WithContext(context.Background()) + for i := uint(0); i < sdb.subtrieWorkers; i++ { + func(subdiv uint) { + g.Go(func() error { + // a, b := subitersA[subdiv], subitersB[subdiv] + return sdb.processAccounts(ctx, + subiters[subdiv], auxes[subdiv], + params.watchedAddressesLeafPaths, nodeSink, ipldSink, logger, ) }) @@ -159,9 +228,10 @@ func (sdb *StateDiffBuilder) WriteStateDiff( // processAccounts processes account creations and deletions, and returns a set of updated // existing accounts, indexed by leaf key. -func (sdb *StateDiffBuilder) processAccounts( +func (sdb *builder) processAccounts( ctx context.Context, - a, b trie.NodeIterator, watchedAddressesLeafPaths [][]byte, + it trie.NodeIterator, aux *utils.SymmDiffAux, + watchedAddressesLeafPaths [][]byte, nodeSink sdtypes.StateNodeSink, ipldSink sdtypes.IPLDSink, logger log.Logger, ) error { @@ -172,7 +242,6 @@ func (sdb *StateDiffBuilder) processAccounts( updates := make(accountUpdateMap) // Cache the RLP of the previous node. When we hit a value node this will be the parent blob. var prevBlob []byte - it, itCount := utils.NewSymmetricDifferenceIterator(a, b) prevBlob = it.NodeBlob() for it.Next(true) { select { @@ -185,7 +254,7 @@ func (sdb *StateDiffBuilder) processAccounts( if !isWatchedPathPrefix(watchedAddressesLeafPaths, it.Path()) { continue } - if it.FromA() { // Node exists in the old trie + if aux.FromA() { // Node exists in the old trie if it.Leaf() { var account types.StateAccount if err := rlp.DecodeBytes(it.LeafBlob(), &account); err != nil { @@ -194,7 +263,7 @@ func (sdb *StateDiffBuilder) processAccounts( leafKey := make([]byte, len(it.LeafKey())) copy(leafKey, it.LeafKey()) - if it.CommonPath() { + if aux.CommonPath() { // If B also contains this leaf node, this is the old state of an updated account. if update, ok := updates[string(leafKey)]; ok { update.oldRoot = account.Root @@ -219,7 +288,7 @@ func (sdb *StateDiffBuilder) processAccounts( return err } - if it.CommonPath() { + if aux.CommonPath() { // If A also contains this leaf node, this is the new state of an updated account. if update, ok := updates[string(accountW.LeafKey)]; ok { update.new = *accountW @@ -288,11 +357,11 @@ func (sdb *StateDiffBuilder) processAccounts( } } - metrics.IndexerMetrics.DifferenceIteratorCounter.Inc(int64(*itCount)) + metrics.IndexerMetrics.DifferenceIteratorCounter.Inc(int64(aux.Count())) return it.Error() } -func (sdb *StateDiffBuilder) processAccountDeletion( +func (sdb *builder) processAccountDeletion( leafKey []byte, account types.StateAccount, nodeSink sdtypes.StateNodeSink, ) error { diff := sdtypes.StateLeafNode{ @@ -309,7 +378,7 @@ func (sdb *StateDiffBuilder) processAccountDeletion( return nodeSink(diff) } -func (sdb *StateDiffBuilder) processAccountCreation( +func (sdb *builder) processAccountCreation( accountW *sdtypes.AccountWrapper, ipldSink sdtypes.IPLDSink, nodeSink sdtypes.StateNodeSink, ) error { diff := sdtypes.StateLeafNode{ @@ -340,7 +409,7 @@ func (sdb *StateDiffBuilder) processAccountCreation( // decodes account at leaf and encodes RLP data to CID // reminder: it.Leaf() == true when the iterator is positioned at a "value node" (which is not something // that actually exists in an MMPT), therefore we pass the parent node blob as the leaf RLP. -func (sdb *StateDiffBuilder) decodeStateLeaf(it trie.NodeIterator, parentBlob []byte) (*sdtypes.AccountWrapper, error) { +func (sdb *builder) decodeStateLeaf(it trie.NodeIterator, parentBlob []byte) (*sdtypes.AccountWrapper, error) { var account types.StateAccount if err := rlp.DecodeBytes(it.LeafBlob(), &account); err != nil { return nil, fmt.Errorf("error decoding account at leaf key %x: %w", it.LeafKey(), err) @@ -357,7 +426,7 @@ func (sdb *StateDiffBuilder) decodeStateLeaf(it trie.NodeIterator, parentBlob [] // processStorageCreations processes the storage node records for a newly created account // i.e. it returns all the storage nodes at this state, since there is no previous state. -func (sdb *StateDiffBuilder) processStorageCreations( +func (sdb *builder) processStorageCreations( sr common.Hash, storageSink sdtypes.StorageNodeSink, ipldSink sdtypes.IPLDSink, ) error { defer metrics.UpdateDuration(time.Now(), metrics.IndexerMetrics.ProcessStorageCreationsTimer) @@ -395,7 +464,7 @@ func (sdb *StateDiffBuilder) processStorageCreations( } // processStorageUpdates builds the storage diff node objects for all nodes that exist in a different state at B than A -func (sdb *StateDiffBuilder) processStorageUpdates( +func (sdb *builder) processStorageUpdates( oldroot common.Hash, newroot common.Hash, storageSink sdtypes.StorageNodeSink, ipldSink sdtypes.IPLDSink, @@ -416,10 +485,10 @@ func (sdb *StateDiffBuilder) processStorageUpdates( var prevBlob []byte a, b := oldTrie.NodeIterator(nil), newTrie.NodeIterator(nil) - it, _ := utils.NewSymmetricDifferenceIterator(a, b) + it, aux := utils.NewSymmetricDifferenceIterator(a, b) for it.Next(true) { - if it.FromA() { - if it.Leaf() && !it.CommonPath() { + if aux.FromA() { + if it.Leaf() && !aux.CommonPath() { // If this node's leaf key is absent from B, the storage slot was vacated. // In that case, emit an empty "removed" storage node record. if err := storageSink(sdtypes.StorageLeafNode{ @@ -457,7 +526,7 @@ func (sdb *StateDiffBuilder) processStorageUpdates( } // processRemovedAccountStorage builds the "removed" diffs for all the storage nodes for a destroyed account -func (sdb *StateDiffBuilder) processRemovedAccountStorage( +func (sdb *builder) processRemovedAccountStorage( sr common.Hash, storageSink sdtypes.StorageNodeSink, ) error { defer metrics.UpdateDuration(time.Now(), metrics.IndexerMetrics.ProcessRemovedAccountStorageTimer) @@ -491,7 +560,7 @@ func (sdb *StateDiffBuilder) processRemovedAccountStorage( // decodes slot at leaf and encodes RLP data to CID // reminder: it.Leaf() == true when the iterator is positioned at a "value node" (which is not something // that actually exists in an MMPT), therefore we pass the parent node blob as the leaf RLP. -func (sdb *StateDiffBuilder) decodeStorageLeaf(it trie.NodeIterator, parentBlob []byte) sdtypes.StorageLeafNode { +func (sdb *builder) decodeStorageLeaf(it trie.NodeIterator, parentBlob []byte) sdtypes.StorageLeafNode { leafKey := make([]byte, len(it.LeafKey())) copy(leafKey, it.LeafKey()) value := make([]byte, len(it.LeafBlob())) diff --git a/service.go b/service.go index 604b2a6..6a15311 100644 --- a/service.go +++ b/service.go @@ -824,6 +824,7 @@ func (sds *Service) writeStateDiff(block *types.Block, parentRoot common.Hash, p return err } + // TODO: review/remove the need to sync here var nodeMtx, ipldMtx sync.Mutex nodeSink := func(node types2.StateLeafNode) error { defer metrics.UpdateDuration(time.Now(), metrics.IndexerMetrics.OutputTimer)