diff --git a/builder.go b/builder.go index e4011fe..2e56ba5 100644 --- a/builder.go +++ b/builder.go @@ -50,7 +50,7 @@ var ( nullCodeHash = crypto.Keccak256([]byte{}) zeroHash common.Hash - defaultSubtrieWorkers uint = 8 + defaultSubtrieWorkers uint = 1 ) // Builder interface exposes the method for building a state diff between two blocks @@ -89,14 +89,19 @@ func syncedAppender[T any](to *[]T) func(T) error { } // NewBuilder is used to create a statediff builder -func NewBuilder(stateCache adapt.StateView) Builder { +func NewBuilder(stateCache adapt.StateView) *builder { return &builder{ stateCache: stateCache, subtrieWorkers: defaultSubtrieWorkers, } } +// SetSubtrieWorkers sets the number of disjoint subtries to divide among concurrent workers. +// Passing 0 will reset this to the default value. func (sdb *builder) SetSubtrieWorkers(n uint) { + if n == 0 { + n = defaultSubtrieWorkers + } sdb.subtrieWorkers = n } @@ -190,39 +195,36 @@ func (sdb *builder) WriteStateDiffTracked( } if subiters != nil { - if len(subiters) != int(sdb.subtrieWorkers) { - return fmt.Errorf("expected to restore %d iterators, got %d", sdb.subtrieWorkers, len(subiters)) + // Completed iterators are not saved by the tracker, so restoring fewer than configured is ok, + // but having too many is not expected. + if len(subiters) > int(sdb.subtrieWorkers) { + return fmt.Errorf("restored too many iterators: expected %d, got %d", + sdb.subtrieWorkers, len(subiters)) } } else { - subitersA := iterutils.SubtrieIterators(triea.NodeIterator, uint(sdb.subtrieWorkers)) - subitersB := iterutils.SubtrieIterators(trieb.NodeIterator, uint(sdb.subtrieWorkers)) - for i := 0; i < int(sdb.subtrieWorkers); i++ { - it, aux := utils.NewSymmetricDifferenceIterator(subitersA[i], subitersB[i]) - it = tracker.Tracked(it) - subiters = append(subiters, it) - auxes = append(auxes, aux) + subiters = iterutils.SubtrieIterators(makeIterator, uint(sdb.subtrieWorkers)) + for i := range subiters { + subiters[i] = tracker.Tracked(subiters[i]) } } logger := log.New("hash", args.BlockHash, "number", args.BlockNumber) // errgroup will cancel if any group fails g, ctx := errgroup.WithContext(context.Background()) - for i := uint(0); i < sdb.subtrieWorkers; i++ { + for i := range subiters { func(subdiv uint) { g.Go(func() error { - // a, b := subitersA[subdiv], subitersB[subdiv] return sdb.processAccounts(ctx, subiters[subdiv], auxes[subdiv], params.watchedAddressesLeafPaths, nodeSink, ipldSink, logger, ) }) - }(i) + }(uint(i)) } return g.Wait() } -// processAccounts processes account creations and deletions, and returns a set of updated -// existing accounts, indexed by leaf key. +// processAccounts processes account creations, deletions, and updates func (sdb *builder) processAccounts( ctx context.Context, it trie.NodeIterator, aux *utils.SymmDiffAux, @@ -236,8 +238,7 @@ func (sdb *builder) processAccounts( updates := make(accountUpdateMap) // Cache the RLP of the previous node. When we hit a value node this will be the parent blob. - var prevBlob []byte - prevBlob = it.NodeBlob() + var prevBlob = it.NodeBlob() for it.Next(true) { select { case <-ctx.Done(): @@ -276,7 +277,7 @@ func (sdb *builder) processAccounts( } continue } - // Node exists in the new trie + // Node exists in the new trie (B) if it.Leaf() { accountW, err := sdb.decodeStateLeaf(it, prevBlob) if err != nil { @@ -296,42 +297,43 @@ func (sdb *builder) processAccounts( return err } } - } else { - // New trie nodes will be written to blockstore only. - // Reminder: this includes leaf nodes, since the geth iterator.Leaf() actually - // signifies a "value" node. - if it.Hash() == zeroHash { - continue - } - // TODO - this can be handled when value node is (craeted?) - nodeVal := make([]byte, len(it.NodeBlob())) - copy(nodeVal, it.NodeBlob()) - // if doing a selective diff, we need to ensure this is a watched path - if len(watchedAddressesLeafPaths) > 0 { - var elements []interface{} - if err := rlp.DecodeBytes(nodeVal, &elements); err != nil { - return err - } - ok, err := isLeaf(elements) - if err != nil { - return err - } - if ok { - partialPath := utils.CompactToHex(elements[0].([]byte)) - valueNodePath := append(it.Path(), partialPath...) - if !isWatchedPath(watchedAddressesLeafPaths, valueNodePath) { - continue - } - } - } - if err := ipldSink(sdtypes.IPLD{ - CID: ipld.Keccak256ToCid(ipld.MEthStateTrie, it.Hash().Bytes()).String(), - Content: nodeVal, - }); err != nil { + continue + } + // New inner trie nodes will be written to blockstore only. + // Reminder: this includes leaf nodes, since the geth iterator.Leaf() actually + // signifies a "value" node. + + // TODO: A zero hash indicates what? + if it.Hash() == zeroHash { + continue + } + nodeVal := make([]byte, len(it.NodeBlob())) + copy(nodeVal, it.NodeBlob()) + // if doing a selective diff, we need to ensure this is a watched path + if len(watchedAddressesLeafPaths) > 0 { + var elements []interface{} + if err := rlp.DecodeBytes(nodeVal, &elements); err != nil { return err } - prevBlob = nodeVal + ok, err := isLeaf(elements) + if err != nil { + return err + } + if ok { + partialPath := utils.CompactToHex(elements[0].([]byte)) + valueNodePath := append(it.Path(), partialPath...) + if !isWatchedPath(watchedAddressesLeafPaths, valueNodePath) { + continue + } + } } + if err := ipldSink(sdtypes.IPLD{ + CID: ipld.Keccak256ToCid(ipld.MEthStateTrie, it.Hash().Bytes()).String(), + Content: nodeVal, + }); err != nil { + return err + } + prevBlob = nodeVal } for key, update := range updates { diff --git a/indexer/database/file/indexer.go b/indexer/database/file/indexer.go index ce56acb..56719b8 100644 --- a/indexer/database/file/indexer.go +++ b/indexer/database/file/indexer.go @@ -206,7 +206,7 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip return blockTx, err } -// processHeader write a header IPLD insert SQL stmt to a file +// PushHeader write a header IPLD insert SQL stmt to a file // it returns the headerID func (sdi *StateDiffIndexer) PushHeader(_ interfaces.Batch, header *types.Header, reward, td *big.Int) (string, error) { // Process the header diff --git a/indexer/shared/schema/table_test.go b/indexer/shared/schema/table_test.go index 692a839..aaa026b 100644 --- a/indexer/shared/schema/table_test.go +++ b/indexer/shared/schema/table_test.go @@ -43,7 +43,8 @@ var testHeaderTable = Table{ "mh_key", "times_validated", "coinbase", - )} + ), +} func TestTable(t *testing.T) { headerUpsert := `INSERT INTO eth.header_cids (block_number, block_hash, parent_hash, cid, td, node_id, reward, state_root, tx_root, receipt_root, uncle_root, bloom, timestamp, mh_key, times_validated, coinbase) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16) ON CONFLICT (block_hash, block_number) DO UPDATE SET (parent_hash, cid, td, node_id, reward, state_root, tx_root, receipt_root, uncle_root, bloom, timestamp, mh_key, times_validated, coinbase) = ($3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16)` diff --git a/types/types.go b/types/types.go index 9f3f60b..9d92fd2 100644 --- a/types/types.go +++ b/types/types.go @@ -53,7 +53,7 @@ type StateLeafNode struct { StorageDiff []StorageLeafNode } -// StorageLeafNode holds the data for a single storage diff node leaf node +// StorageLeafNode holds the data for a single storage diff leaf node type StorageLeafNode struct { Removed bool Value []byte