This commit is contained in:
Roy Crihfield 2023-08-30 11:09:53 +08:00
parent 1f477f24ea
commit 7c4f8432db
4 changed files with 52 additions and 54 deletions

View File

@ -89,7 +89,7 @@ func syncedAppender[T any](to *[]T) func(T) error {
} }
// NewBuilder is used to create a statediff builder // NewBuilder is used to create a statediff builder
func NewBuilder(stateCache adapt.StateView) Builder { func NewBuilder(stateCache adapt.StateView) *builder {
return &builder{ return &builder{
stateCache: stateCache, stateCache: stateCache,
subtrieWorkers: defaultSubtrieWorkers, subtrieWorkers: defaultSubtrieWorkers,
@ -195,39 +195,36 @@ func (sdb *builder) WriteStateDiffTracked(
} }
if subiters != nil { if subiters != nil {
if len(subiters) != int(sdb.subtrieWorkers) { // Completed iterators are not saved by the tracker, so restoring fewer than configured is ok,
return fmt.Errorf("expected to restore %d iterators, got %d", sdb.subtrieWorkers, len(subiters)) // but having too many is not expected.
if len(subiters) > int(sdb.subtrieWorkers) {
return fmt.Errorf("restored too many iterators: expected %d, got %d",
sdb.subtrieWorkers, len(subiters))
} }
} else { } else {
subitersA := iterutils.SubtrieIterators(triea.NodeIterator, uint(sdb.subtrieWorkers)) subiters = iterutils.SubtrieIterators(makeIterator, uint(sdb.subtrieWorkers))
subitersB := iterutils.SubtrieIterators(trieb.NodeIterator, uint(sdb.subtrieWorkers)) for i := range subiters {
for i := 0; i < int(sdb.subtrieWorkers); i++ { subiters[i] = tracker.Tracked(subiters[i])
it, aux := utils.NewSymmetricDifferenceIterator(subitersA[i], subitersB[i])
it = tracker.Tracked(it)
subiters = append(subiters, it)
auxes = append(auxes, aux)
} }
} }
logger := log.New("hash", args.BlockHash, "number", args.BlockNumber) logger := log.New("hash", args.BlockHash, "number", args.BlockNumber)
// errgroup will cancel if any group fails // errgroup will cancel if any group fails
g, ctx := errgroup.WithContext(context.Background()) g, ctx := errgroup.WithContext(context.Background())
for i := uint(0); i < sdb.subtrieWorkers; i++ { for i := range subiters {
func(subdiv uint) { func(subdiv uint) {
g.Go(func() error { g.Go(func() error {
// a, b := subitersA[subdiv], subitersB[subdiv]
return sdb.processAccounts(ctx, return sdb.processAccounts(ctx,
subiters[subdiv], auxes[subdiv], subiters[subdiv], auxes[subdiv],
params.watchedAddressesLeafPaths, params.watchedAddressesLeafPaths,
nodeSink, ipldSink, logger, nodeSink, ipldSink, logger,
) )
}) })
}(i) }(uint(i))
} }
return g.Wait() return g.Wait()
} }
// processAccounts processes account creations and deletions, and returns a set of updated // processAccounts processes account creations, deletions, and updates
// existing accounts, indexed by leaf key.
func (sdb *builder) processAccounts( func (sdb *builder) processAccounts(
ctx context.Context, ctx context.Context,
it trie.NodeIterator, aux *utils.SymmDiffAux, it trie.NodeIterator, aux *utils.SymmDiffAux,
@ -241,8 +238,7 @@ func (sdb *builder) processAccounts(
updates := make(accountUpdateMap) updates := make(accountUpdateMap)
// Cache the RLP of the previous node. When we hit a value node this will be the parent blob. // Cache the RLP of the previous node. When we hit a value node this will be the parent blob.
var prevBlob []byte var prevBlob = it.NodeBlob()
prevBlob = it.NodeBlob()
for it.Next(true) { for it.Next(true) {
select { select {
case <-ctx.Done(): case <-ctx.Done():
@ -281,7 +277,7 @@ func (sdb *builder) processAccounts(
} }
continue continue
} }
// Node exists in the new trie // Node exists in the new trie (B)
if it.Leaf() { if it.Leaf() {
accountW, err := sdb.decodeStateLeaf(it, prevBlob) accountW, err := sdb.decodeStateLeaf(it, prevBlob)
if err != nil { if err != nil {
@ -301,42 +297,43 @@ func (sdb *builder) processAccounts(
return err return err
} }
} }
} else { continue
// New trie nodes will be written to blockstore only. }
// Reminder: this includes leaf nodes, since the geth iterator.Leaf() actually // New inner trie nodes will be written to blockstore only.
// signifies a "value" node. // Reminder: this includes leaf nodes, since the geth iterator.Leaf() actually
if it.Hash() == zeroHash { // signifies a "value" node.
continue
} // TODO: A zero hash indicates what?
// TODO - this can be handled when value node is (craeted?) if it.Hash() == zeroHash {
nodeVal := make([]byte, len(it.NodeBlob())) continue
copy(nodeVal, it.NodeBlob()) }
// if doing a selective diff, we need to ensure this is a watched path nodeVal := make([]byte, len(it.NodeBlob()))
if len(watchedAddressesLeafPaths) > 0 { copy(nodeVal, it.NodeBlob())
var elements []interface{} // if doing a selective diff, we need to ensure this is a watched path
if err := rlp.DecodeBytes(nodeVal, &elements); err != nil { if len(watchedAddressesLeafPaths) > 0 {
return err var elements []interface{}
} if err := rlp.DecodeBytes(nodeVal, &elements); err != nil {
ok, err := isLeaf(elements)
if err != nil {
return err
}
if ok {
partialPath := utils.CompactToHex(elements[0].([]byte))
valueNodePath := append(it.Path(), partialPath...)
if !isWatchedPath(watchedAddressesLeafPaths, valueNodePath) {
continue
}
}
}
if err := ipldSink(sdtypes.IPLD{
CID: ipld.Keccak256ToCid(ipld.MEthStateTrie, it.Hash().Bytes()).String(),
Content: nodeVal,
}); err != nil {
return err return err
} }
prevBlob = nodeVal ok, err := isLeaf(elements)
if err != nil {
return err
}
if ok {
partialPath := utils.CompactToHex(elements[0].([]byte))
valueNodePath := append(it.Path(), partialPath...)
if !isWatchedPath(watchedAddressesLeafPaths, valueNodePath) {
continue
}
}
} }
if err := ipldSink(sdtypes.IPLD{
CID: ipld.Keccak256ToCid(ipld.MEthStateTrie, it.Hash().Bytes()).String(),
Content: nodeVal,
}); err != nil {
return err
}
prevBlob = nodeVal
} }
for key, update := range updates { for key, update := range updates {

View File

@ -206,7 +206,7 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
return blockTx, err return blockTx, err
} }
// processHeader write a header IPLD insert SQL stmt to a file // PushHeader write a header IPLD insert SQL stmt to a file
// it returns the headerID // it returns the headerID
func (sdi *StateDiffIndexer) PushHeader(_ interfaces.Batch, header *types.Header, reward, td *big.Int) (string, error) { func (sdi *StateDiffIndexer) PushHeader(_ interfaces.Batch, header *types.Header, reward, td *big.Int) (string, error) {
// Process the header // Process the header

View File

@ -43,7 +43,8 @@ var testHeaderTable = Table{
"mh_key", "mh_key",
"times_validated", "times_validated",
"coinbase", "coinbase",
)} ),
}
func TestTable(t *testing.T) { func TestTable(t *testing.T) {
headerUpsert := `INSERT INTO eth.header_cids (block_number, block_hash, parent_hash, cid, td, node_id, reward, state_root, tx_root, receipt_root, uncle_root, bloom, timestamp, mh_key, times_validated, coinbase) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16) ON CONFLICT (block_hash, block_number) DO UPDATE SET (parent_hash, cid, td, node_id, reward, state_root, tx_root, receipt_root, uncle_root, bloom, timestamp, mh_key, times_validated, coinbase) = ($3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16)` headerUpsert := `INSERT INTO eth.header_cids (block_number, block_hash, parent_hash, cid, td, node_id, reward, state_root, tx_root, receipt_root, uncle_root, bloom, timestamp, mh_key, times_validated, coinbase) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16) ON CONFLICT (block_hash, block_number) DO UPDATE SET (parent_hash, cid, td, node_id, reward, state_root, tx_root, receipt_root, uncle_root, bloom, timestamp, mh_key, times_validated, coinbase) = ($3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16)`

View File

@ -53,7 +53,7 @@ type StateLeafNode struct {
StorageDiff []StorageLeafNode StorageDiff []StorageLeafNode
} }
// StorageLeafNode holds the data for a single storage diff node leaf node // StorageLeafNode holds the data for a single storage diff leaf node
type StorageLeafNode struct { type StorageLeafNode struct {
Removed bool Removed bool
Value []byte Value []byte