WriteStateDiffTracked

This commit is contained in:
Roy Crihfield 2023-08-27 01:20:29 +08:00
parent 66f7637086
commit 366d6d4b32
2 changed files with 98 additions and 24 deletions

View File

@ -27,6 +27,7 @@ import (
"time" "time"
iterutils "github.com/cerc-io/eth-iterator-utils" iterutils "github.com/cerc-io/eth-iterator-utils"
"github.com/cerc-io/eth-iterator-utils/tracker"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/crypto"
@ -58,7 +59,7 @@ type Builder interface {
WriteStateDiff(Args, Params, sdtypes.StateNodeSink, sdtypes.IPLDSink) error WriteStateDiff(Args, Params, sdtypes.StateNodeSink, sdtypes.IPLDSink) error
} }
type StateDiffBuilder struct { type builder struct {
// state cache is safe for concurrent reads // state cache is safe for concurrent reads
stateCache adapt.StateView stateCache adapt.StateView
subtrieWorkers uint subtrieWorkers uint
@ -89,14 +90,18 @@ func syncedAppender[T any](to *[]T) func(T) error {
// NewBuilder is used to create a statediff builder // NewBuilder is used to create a statediff builder
func NewBuilder(stateCache adapt.StateView) Builder { func NewBuilder(stateCache adapt.StateView) Builder {
return &StateDiffBuilder{ return &builder{
stateCache: stateCache, stateCache: stateCache,
subtrieWorkers: defaultSubtrieWorkers, subtrieWorkers: defaultSubtrieWorkers,
} }
} }
func (sdb *builder) SetSubtrieWorkers(n uint) {
sdb.subtrieWorkers = n
}
// BuildStateDiffObject builds a statediff object from two blocks and the provided parameters // BuildStateDiffObject builds a statediff object from two blocks and the provided parameters
func (sdb *StateDiffBuilder) BuildStateDiffObject(args Args, params Params) (sdtypes.StateObject, error) { func (sdb *builder) BuildStateDiffObject(args Args, params Params) (sdtypes.StateObject, error) {
defer metrics.UpdateDuration(time.Now(), metrics.IndexerMetrics.BuildStateDiffObjectTimer) defer metrics.UpdateDuration(time.Now(), metrics.IndexerMetrics.BuildStateDiffObjectTimer)
var stateNodes []sdtypes.StateLeafNode var stateNodes []sdtypes.StateLeafNode
var iplds []sdtypes.IPLD var iplds []sdtypes.IPLD
@ -113,7 +118,7 @@ func (sdb *StateDiffBuilder) BuildStateDiffObject(args Args, params Params) (sdt
} }
// WriteStateDiff writes a statediff object to output sinks // WriteStateDiff writes a statediff object to output sinks
func (sdb *StateDiffBuilder) WriteStateDiff( func (sdb *builder) WriteStateDiff(
args Args, params Params, args Args, params Params,
nodeSink sdtypes.StateNodeSink, nodeSink sdtypes.StateNodeSink,
ipldSink sdtypes.IPLDSink, ipldSink sdtypes.IPLDSink,
@ -132,14 +137,82 @@ func (sdb *StateDiffBuilder) WriteStateDiff(
subitersB := iterutils.SubtrieIterators(trieb.NodeIterator, uint(sdb.subtrieWorkers)) subitersB := iterutils.SubtrieIterators(trieb.NodeIterator, uint(sdb.subtrieWorkers))
logger := log.New("hash", args.BlockHash, "number", args.BlockNumber) logger := log.New("hash", args.BlockHash, "number", args.BlockNumber)
// errgroup will cancel if any gr fails // errgroup will cancel if any group fails
g, ctx := errgroup.WithContext(context.Background()) g, ctx := errgroup.WithContext(context.Background())
for i := uint(0); i < sdb.subtrieWorkers; i++ { for i := uint(0); i < sdb.subtrieWorkers; i++ {
func(subdiv uint) { func(subdiv uint) {
g.Go(func() error { g.Go(func() error {
a, b := subitersA[subdiv], subitersB[subdiv] a, b := subitersA[subdiv], subitersB[subdiv]
it, aux := utils.NewSymmetricDifferenceIterator(a, b)
return sdb.processAccounts(ctx, return sdb.processAccounts(ctx,
a, b, params.watchedAddressesLeafPaths, it, aux,
params.watchedAddressesLeafPaths,
nodeSink, ipldSink, logger,
)
})
}(i)
}
return g.Wait()
}
// WriteStateDiff writes a statediff object to output sinks
func (sdb *builder) WriteStateDiffTracked(
args Args, params Params,
nodeSink sdtypes.StateNodeSink,
ipldSink sdtypes.IPLDSink,
tracker *tracker.Tracker,
) error {
defer metrics.UpdateDuration(time.Now(), metrics.IndexerMetrics.WriteStateDiffTimer)
// Load tries for old and new states
triea, err := sdb.stateCache.OpenTrie(args.OldStateRoot)
if err != nil {
return fmt.Errorf("error opening old state trie: %w", err)
}
trieb, err := sdb.stateCache.OpenTrie(args.NewStateRoot)
if err != nil {
return fmt.Errorf("error opening new state trie: %w", err)
}
var subiters []trie.NodeIterator
var auxes []*utils.SymmDiffAux
// Constructor for difference iterator at a specific (recovered) path
makeIterator := func(key []byte) trie.NodeIterator {
a := triea.NodeIterator(key)
b := trieb.NodeIterator(key)
diffit, aux := utils.NewSymmetricDifferenceIterator(a, b)
// iterators are constructed in-order, so these will align
auxes = append(auxes, aux)
return diffit
}
subiters, err = tracker.Restore(makeIterator)
if err != nil {
return fmt.Errorf("error restoring iterators: %w", err)
}
if subiters != nil {
if len(subiters) != int(sdb.subtrieWorkers) {
return fmt.Errorf("expected to restore %d iterators, got %d", sdb.subtrieWorkers, len(subiters))
}
} else {
subitersA := iterutils.SubtrieIterators(triea.NodeIterator, uint(sdb.subtrieWorkers))
subitersB := iterutils.SubtrieIterators(trieb.NodeIterator, uint(sdb.subtrieWorkers))
for i := 0; i < int(sdb.subtrieWorkers); i++ {
it, aux := utils.NewSymmetricDifferenceIterator(subitersA[i], subitersB[i])
it = tracker.Tracked(it)
subiters = append(subiters, it)
auxes = append(auxes, aux)
}
}
logger := log.New("hash", args.BlockHash, "number", args.BlockNumber)
// errgroup will cancel if any group fails
g, ctx := errgroup.WithContext(context.Background())
for i := uint(0); i < sdb.subtrieWorkers; i++ {
func(subdiv uint) {
g.Go(func() error {
// a, b := subitersA[subdiv], subitersB[subdiv]
return sdb.processAccounts(ctx,
subiters[subdiv], auxes[subdiv],
params.watchedAddressesLeafPaths,
nodeSink, ipldSink, logger, nodeSink, ipldSink, logger,
) )
}) })
@ -150,9 +223,10 @@ func (sdb *StateDiffBuilder) WriteStateDiff(
// processAccounts processes account creations and deletions, and returns a set of updated // processAccounts processes account creations and deletions, and returns a set of updated
// existing accounts, indexed by leaf key. // existing accounts, indexed by leaf key.
func (sdb *StateDiffBuilder) processAccounts( func (sdb *builder) processAccounts(
ctx context.Context, ctx context.Context,
a, b trie.NodeIterator, watchedAddressesLeafPaths [][]byte, it trie.NodeIterator, aux *utils.SymmDiffAux,
watchedAddressesLeafPaths [][]byte,
nodeSink sdtypes.StateNodeSink, ipldSink sdtypes.IPLDSink, nodeSink sdtypes.StateNodeSink, ipldSink sdtypes.IPLDSink,
logger log.Logger, logger log.Logger,
) error { ) error {
@ -163,7 +237,6 @@ func (sdb *StateDiffBuilder) processAccounts(
updates := make(accountUpdateMap) updates := make(accountUpdateMap)
// Cache the RLP of the previous node. When we hit a value node this will be the parent blob. // Cache the RLP of the previous node. When we hit a value node this will be the parent blob.
var prevBlob []byte var prevBlob []byte
it, itCount := utils.NewSymmetricDifferenceIterator(a, b)
prevBlob = it.NodeBlob() prevBlob = it.NodeBlob()
for it.Next(true) { for it.Next(true) {
select { select {
@ -176,7 +249,7 @@ func (sdb *StateDiffBuilder) processAccounts(
if !isWatchedPathPrefix(watchedAddressesLeafPaths, it.Path()) { if !isWatchedPathPrefix(watchedAddressesLeafPaths, it.Path()) {
continue continue
} }
if it.FromA() { // Node exists in the old trie if aux.FromA() { // Node exists in the old trie
if it.Leaf() { if it.Leaf() {
var account types.StateAccount var account types.StateAccount
if err := rlp.DecodeBytes(it.LeafBlob(), &account); err != nil { if err := rlp.DecodeBytes(it.LeafBlob(), &account); err != nil {
@ -185,7 +258,7 @@ func (sdb *StateDiffBuilder) processAccounts(
leafKey := make([]byte, len(it.LeafKey())) leafKey := make([]byte, len(it.LeafKey()))
copy(leafKey, it.LeafKey()) copy(leafKey, it.LeafKey())
if it.CommonPath() { if aux.CommonPath() {
// If B also contains this leaf node, this is the old state of an updated account. // If B also contains this leaf node, this is the old state of an updated account.
if update, ok := updates[string(leafKey)]; ok { if update, ok := updates[string(leafKey)]; ok {
update.oldRoot = account.Root update.oldRoot = account.Root
@ -210,7 +283,7 @@ func (sdb *StateDiffBuilder) processAccounts(
return err return err
} }
if it.CommonPath() { if aux.CommonPath() {
// If A also contains this leaf node, this is the new state of an updated account. // If A also contains this leaf node, this is the new state of an updated account.
if update, ok := updates[string(accountW.LeafKey)]; ok { if update, ok := updates[string(accountW.LeafKey)]; ok {
update.new = *accountW update.new = *accountW
@ -279,11 +352,11 @@ func (sdb *StateDiffBuilder) processAccounts(
} }
} }
metrics.IndexerMetrics.DifferenceIteratorCounter.Inc(int64(*itCount)) metrics.IndexerMetrics.DifferenceIteratorCounter.Inc(int64(aux.Count()))
return it.Error() return it.Error()
} }
func (sdb *StateDiffBuilder) processAccountDeletion( func (sdb *builder) processAccountDeletion(
leafKey []byte, account types.StateAccount, nodeSink sdtypes.StateNodeSink, leafKey []byte, account types.StateAccount, nodeSink sdtypes.StateNodeSink,
) error { ) error {
diff := sdtypes.StateLeafNode{ diff := sdtypes.StateLeafNode{
@ -300,7 +373,7 @@ func (sdb *StateDiffBuilder) processAccountDeletion(
return nodeSink(diff) return nodeSink(diff)
} }
func (sdb *StateDiffBuilder) processAccountCreation( func (sdb *builder) processAccountCreation(
accountW *sdtypes.AccountWrapper, ipldSink sdtypes.IPLDSink, nodeSink sdtypes.StateNodeSink, accountW *sdtypes.AccountWrapper, ipldSink sdtypes.IPLDSink, nodeSink sdtypes.StateNodeSink,
) error { ) error {
diff := sdtypes.StateLeafNode{ diff := sdtypes.StateLeafNode{
@ -331,7 +404,7 @@ func (sdb *StateDiffBuilder) processAccountCreation(
// decodes account at leaf and encodes RLP data to CID // decodes account at leaf and encodes RLP data to CID
// reminder: it.Leaf() == true when the iterator is positioned at a "value node" (which is not something // reminder: it.Leaf() == true when the iterator is positioned at a "value node" (which is not something
// that actually exists in an MMPT), therefore we pass the parent node blob as the leaf RLP. // that actually exists in an MMPT), therefore we pass the parent node blob as the leaf RLP.
func (sdb *StateDiffBuilder) decodeStateLeaf(it trie.NodeIterator, parentBlob []byte) (*sdtypes.AccountWrapper, error) { func (sdb *builder) decodeStateLeaf(it trie.NodeIterator, parentBlob []byte) (*sdtypes.AccountWrapper, error) {
var account types.StateAccount var account types.StateAccount
if err := rlp.DecodeBytes(it.LeafBlob(), &account); err != nil { if err := rlp.DecodeBytes(it.LeafBlob(), &account); err != nil {
return nil, fmt.Errorf("error decoding account at leaf key %x: %w", it.LeafKey(), err) return nil, fmt.Errorf("error decoding account at leaf key %x: %w", it.LeafKey(), err)
@ -348,7 +421,7 @@ func (sdb *StateDiffBuilder) decodeStateLeaf(it trie.NodeIterator, parentBlob []
// processStorageCreations processes the storage node records for a newly created account // processStorageCreations processes the storage node records for a newly created account
// i.e. it returns all the storage nodes at this state, since there is no previous state. // i.e. it returns all the storage nodes at this state, since there is no previous state.
func (sdb *StateDiffBuilder) processStorageCreations( func (sdb *builder) processStorageCreations(
sr common.Hash, storageSink sdtypes.StorageNodeSink, ipldSink sdtypes.IPLDSink, sr common.Hash, storageSink sdtypes.StorageNodeSink, ipldSink sdtypes.IPLDSink,
) error { ) error {
defer metrics.UpdateDuration(time.Now(), metrics.IndexerMetrics.ProcessStorageCreationsTimer) defer metrics.UpdateDuration(time.Now(), metrics.IndexerMetrics.ProcessStorageCreationsTimer)
@ -386,7 +459,7 @@ func (sdb *StateDiffBuilder) processStorageCreations(
} }
// processStorageUpdates builds the storage diff node objects for all nodes that exist in a different state at B than A // processStorageUpdates builds the storage diff node objects for all nodes that exist in a different state at B than A
func (sdb *StateDiffBuilder) processStorageUpdates( func (sdb *builder) processStorageUpdates(
oldroot common.Hash, newroot common.Hash, oldroot common.Hash, newroot common.Hash,
storageSink sdtypes.StorageNodeSink, storageSink sdtypes.StorageNodeSink,
ipldSink sdtypes.IPLDSink, ipldSink sdtypes.IPLDSink,
@ -407,10 +480,10 @@ func (sdb *StateDiffBuilder) processStorageUpdates(
var prevBlob []byte var prevBlob []byte
a, b := oldTrie.NodeIterator(nil), newTrie.NodeIterator(nil) a, b := oldTrie.NodeIterator(nil), newTrie.NodeIterator(nil)
it, _ := utils.NewSymmetricDifferenceIterator(a, b) it, aux := utils.NewSymmetricDifferenceIterator(a, b)
for it.Next(true) { for it.Next(true) {
if it.FromA() { if aux.FromA() {
if it.Leaf() && !it.CommonPath() { if it.Leaf() && !aux.CommonPath() {
// If this node's leaf key is absent from B, the storage slot was vacated. // If this node's leaf key is absent from B, the storage slot was vacated.
// In that case, emit an empty "removed" storage node record. // In that case, emit an empty "removed" storage node record.
if err := storageSink(sdtypes.StorageLeafNode{ if err := storageSink(sdtypes.StorageLeafNode{
@ -448,10 +521,10 @@ func (sdb *StateDiffBuilder) processStorageUpdates(
} }
// processRemovedAccountStorage builds the "removed" diffs for all the storage nodes for a destroyed account // processRemovedAccountStorage builds the "removed" diffs for all the storage nodes for a destroyed account
func (sdb *StateDiffBuilder) processRemovedAccountStorage( func (sdb *builder) processRemovedAccountStorage(
sr common.Hash, storageSink sdtypes.StorageNodeSink, sr common.Hash, storageSink sdtypes.StorageNodeSink,
) error { ) error {
defer metrics.UpdateDuration(time.Now(), metrics.IndexerMetrics.BuildRemovedAccountStorageNodesTimer) defer metrics.UpdateDuration(time.Now(), metrics.IndexerMetrics.ProcessRemovedAccountStorageTimer)
if sr == emptyContractRoot { if sr == emptyContractRoot {
return nil return nil
} }
@ -482,7 +555,7 @@ func (sdb *StateDiffBuilder) processRemovedAccountStorage(
// decodes slot at leaf and encodes RLP data to CID // decodes slot at leaf and encodes RLP data to CID
// reminder: it.Leaf() == true when the iterator is positioned at a "value node" (which is not something // reminder: it.Leaf() == true when the iterator is positioned at a "value node" (which is not something
// that actually exists in an MMPT), therefore we pass the parent node blob as the leaf RLP. // that actually exists in an MMPT), therefore we pass the parent node blob as the leaf RLP.
func (sdb *StateDiffBuilder) decodeStorageLeaf(it trie.NodeIterator, parentBlob []byte) sdtypes.StorageLeafNode { func (sdb *builder) decodeStorageLeaf(it trie.NodeIterator, parentBlob []byte) sdtypes.StorageLeafNode {
leafKey := make([]byte, len(it.LeafKey())) leafKey := make([]byte, len(it.LeafKey()))
copy(leafKey, it.LeafKey()) copy(leafKey, it.LeafKey())
value := make([]byte, len(it.LeafBlob())) value := make([]byte, len(it.LeafBlob()))

View File

@ -822,6 +822,7 @@ func (sds *Service) writeStateDiff(block *types.Block, parentRoot common.Hash, p
return err return err
} }
// TODO: review/remove the need to sync here
var nodeMtx, ipldMtx sync.Mutex var nodeMtx, ipldMtx sync.Mutex
nodeSink := func(node types2.StateLeafNode) error { nodeSink := func(node types2.StateLeafNode) error {
defer metrics.UpdateDuration(time.Now(), metrics.IndexerMetrics.OutputTimer) defer metrics.UpdateDuration(time.Now(), metrics.IndexerMetrics.OutputTimer)