WriteStateDiffTracked

This commit is contained in:
Roy Crihfield 2023-08-27 01:20:29 +08:00
parent 1781675b39
commit 1e3daaeaee
2 changed files with 95 additions and 25 deletions

View File

@ -27,6 +27,7 @@ import (
"time"
iterutils "github.com/cerc-io/eth-iterator-utils"
"github.com/cerc-io/eth-iterator-utils/tracker"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto"
@ -58,7 +59,7 @@ type Builder interface {
WriteStateDiff(Args, Params, sdtypes.StateNodeSink, sdtypes.IPLDSink) error
}
type StateDiffBuilder struct {
type builder struct {
// state cache is safe for concurrent reads
stateCache adapt.StateView
subtrieWorkers uint
@ -88,8 +89,8 @@ func syncedAppender[T any](to *[]T) func(T) error {
}
// NewBuilder is used to create a statediff builder
func NewBuilder(stateCache adapt.StateView) *StateDiffBuilder {
return &StateDiffBuilder{
func NewBuilder(stateCache adapt.StateView) Builder {
return &builder{
stateCache: stateCache,
subtrieWorkers: defaultSubtrieWorkers,
}
@ -97,7 +98,7 @@ func NewBuilder(stateCache adapt.StateView) *StateDiffBuilder {
// SetSubtrieWorkers sets the number of disjoint subtries to divide among parallel workers.
// Passing 0 will reset this to the default value.
func (sdb *StateDiffBuilder) SetSubtrieWorkers(n uint) {
func (sdb *builder) SetSubtrieWorkers(n uint) {
if n == 0 {
n = defaultSubtrieWorkers
}
@ -105,7 +106,7 @@ func (sdb *StateDiffBuilder) SetSubtrieWorkers(n uint) {
}
// BuildStateDiffObject builds a statediff object from two blocks and the provided parameters
func (sdb *StateDiffBuilder) BuildStateDiffObject(args Args, params Params) (sdtypes.StateObject, error) {
func (sdb *builder) BuildStateDiffObject(args Args, params Params) (sdtypes.StateObject, error) {
defer metrics.UpdateDuration(time.Now(), metrics.IndexerMetrics.BuildStateDiffObjectTimer)
var stateNodes []sdtypes.StateLeafNode
var iplds []sdtypes.IPLD
@ -122,7 +123,7 @@ func (sdb *StateDiffBuilder) BuildStateDiffObject(args Args, params Params) (sdt
}
// WriteStateDiff writes a statediff object to output sinks
func (sdb *StateDiffBuilder) WriteStateDiff(
func (sdb *builder) WriteStateDiff(
args Args, params Params,
nodeSink sdtypes.StateNodeSink,
ipldSink sdtypes.IPLDSink,
@ -141,14 +142,82 @@ func (sdb *StateDiffBuilder) WriteStateDiff(
subitersB := iterutils.SubtrieIterators(trieb.NodeIterator, uint(sdb.subtrieWorkers))
logger := log.New("hash", args.BlockHash, "number", args.BlockNumber)
// errgroup will cancel if any gr fails
// errgroup will cancel if any group fails
g, ctx := errgroup.WithContext(context.Background())
for i := uint(0); i < sdb.subtrieWorkers; i++ {
func(subdiv uint) {
g.Go(func() error {
a, b := subitersA[subdiv], subitersB[subdiv]
it, aux := utils.NewSymmetricDifferenceIterator(a, b)
return sdb.processAccounts(ctx,
a, b, params.watchedAddressesLeafPaths,
it, aux,
params.watchedAddressesLeafPaths,
nodeSink, ipldSink, logger,
)
})
}(i)
}
return g.Wait()
}
// WriteStateDiff writes a statediff object to output sinks
func (sdb *builder) WriteStateDiffTracked(
args Args, params Params,
nodeSink sdtypes.StateNodeSink,
ipldSink sdtypes.IPLDSink,
tracker *tracker.Tracker,
) error {
defer metrics.UpdateDuration(time.Now(), metrics.IndexerMetrics.WriteStateDiffTimer)
// Load tries for old and new states
triea, err := sdb.stateCache.OpenTrie(args.OldStateRoot)
if err != nil {
return fmt.Errorf("error opening old state trie: %w", err)
}
trieb, err := sdb.stateCache.OpenTrie(args.NewStateRoot)
if err != nil {
return fmt.Errorf("error opening new state trie: %w", err)
}
var subiters []trie.NodeIterator
var auxes []*utils.SymmDiffAux
// Constructor for difference iterator at a specific (recovered) path
makeIterator := func(key []byte) trie.NodeIterator {
a := triea.NodeIterator(key)
b := trieb.NodeIterator(key)
diffit, aux := utils.NewSymmetricDifferenceIterator(a, b)
// iterators are constructed in-order, so these will align
auxes = append(auxes, aux)
return diffit
}
subiters, err = tracker.Restore(makeIterator)
if err != nil {
return fmt.Errorf("error restoring iterators: %w", err)
}
if subiters != nil {
if len(subiters) != int(sdb.subtrieWorkers) {
return fmt.Errorf("expected to restore %d iterators, got %d", sdb.subtrieWorkers, len(subiters))
}
} else {
subitersA := iterutils.SubtrieIterators(triea.NodeIterator, uint(sdb.subtrieWorkers))
subitersB := iterutils.SubtrieIterators(trieb.NodeIterator, uint(sdb.subtrieWorkers))
for i := 0; i < int(sdb.subtrieWorkers); i++ {
it, aux := utils.NewSymmetricDifferenceIterator(subitersA[i], subitersB[i])
it = tracker.Tracked(it)
subiters = append(subiters, it)
auxes = append(auxes, aux)
}
}
logger := log.New("hash", args.BlockHash, "number", args.BlockNumber)
// errgroup will cancel if any group fails
g, ctx := errgroup.WithContext(context.Background())
for i := uint(0); i < sdb.subtrieWorkers; i++ {
func(subdiv uint) {
g.Go(func() error {
// a, b := subitersA[subdiv], subitersB[subdiv]
return sdb.processAccounts(ctx,
subiters[subdiv], auxes[subdiv],
params.watchedAddressesLeafPaths,
nodeSink, ipldSink, logger,
)
})
@ -159,9 +228,10 @@ func (sdb *StateDiffBuilder) WriteStateDiff(
// processAccounts processes account creations and deletions, and returns a set of updated
// existing accounts, indexed by leaf key.
func (sdb *StateDiffBuilder) processAccounts(
func (sdb *builder) processAccounts(
ctx context.Context,
a, b trie.NodeIterator, watchedAddressesLeafPaths [][]byte,
it trie.NodeIterator, aux *utils.SymmDiffAux,
watchedAddressesLeafPaths [][]byte,
nodeSink sdtypes.StateNodeSink, ipldSink sdtypes.IPLDSink,
logger log.Logger,
) error {
@ -172,7 +242,6 @@ func (sdb *StateDiffBuilder) processAccounts(
updates := make(accountUpdateMap)
// Cache the RLP of the previous node. When we hit a value node this will be the parent blob.
var prevBlob []byte
it, itCount := utils.NewSymmetricDifferenceIterator(a, b)
prevBlob = it.NodeBlob()
for it.Next(true) {
select {
@ -185,7 +254,7 @@ func (sdb *StateDiffBuilder) processAccounts(
if !isWatchedPathPrefix(watchedAddressesLeafPaths, it.Path()) {
continue
}
if it.FromA() { // Node exists in the old trie
if aux.FromA() { // Node exists in the old trie
if it.Leaf() {
var account types.StateAccount
if err := rlp.DecodeBytes(it.LeafBlob(), &account); err != nil {
@ -194,7 +263,7 @@ func (sdb *StateDiffBuilder) processAccounts(
leafKey := make([]byte, len(it.LeafKey()))
copy(leafKey, it.LeafKey())
if it.CommonPath() {
if aux.CommonPath() {
// If B also contains this leaf node, this is the old state of an updated account.
if update, ok := updates[string(leafKey)]; ok {
update.oldRoot = account.Root
@ -219,7 +288,7 @@ func (sdb *StateDiffBuilder) processAccounts(
return err
}
if it.CommonPath() {
if aux.CommonPath() {
// If A also contains this leaf node, this is the new state of an updated account.
if update, ok := updates[string(accountW.LeafKey)]; ok {
update.new = *accountW
@ -288,11 +357,11 @@ func (sdb *StateDiffBuilder) processAccounts(
}
}
metrics.IndexerMetrics.DifferenceIteratorCounter.Inc(int64(*itCount))
metrics.IndexerMetrics.DifferenceIteratorCounter.Inc(int64(aux.Count()))
return it.Error()
}
func (sdb *StateDiffBuilder) processAccountDeletion(
func (sdb *builder) processAccountDeletion(
leafKey []byte, account types.StateAccount, nodeSink sdtypes.StateNodeSink,
) error {
diff := sdtypes.StateLeafNode{
@ -309,7 +378,7 @@ func (sdb *StateDiffBuilder) processAccountDeletion(
return nodeSink(diff)
}
func (sdb *StateDiffBuilder) processAccountCreation(
func (sdb *builder) processAccountCreation(
accountW *sdtypes.AccountWrapper, ipldSink sdtypes.IPLDSink, nodeSink sdtypes.StateNodeSink,
) error {
diff := sdtypes.StateLeafNode{
@ -340,7 +409,7 @@ func (sdb *StateDiffBuilder) processAccountCreation(
// decodes account at leaf and encodes RLP data to CID
// reminder: it.Leaf() == true when the iterator is positioned at a "value node" (which is not something
// that actually exists in an MMPT), therefore we pass the parent node blob as the leaf RLP.
func (sdb *StateDiffBuilder) decodeStateLeaf(it trie.NodeIterator, parentBlob []byte) (*sdtypes.AccountWrapper, error) {
func (sdb *builder) decodeStateLeaf(it trie.NodeIterator, parentBlob []byte) (*sdtypes.AccountWrapper, error) {
var account types.StateAccount
if err := rlp.DecodeBytes(it.LeafBlob(), &account); err != nil {
return nil, fmt.Errorf("error decoding account at leaf key %x: %w", it.LeafKey(), err)
@ -357,7 +426,7 @@ func (sdb *StateDiffBuilder) decodeStateLeaf(it trie.NodeIterator, parentBlob []
// processStorageCreations processes the storage node records for a newly created account
// i.e. it returns all the storage nodes at this state, since there is no previous state.
func (sdb *StateDiffBuilder) processStorageCreations(
func (sdb *builder) processStorageCreations(
sr common.Hash, storageSink sdtypes.StorageNodeSink, ipldSink sdtypes.IPLDSink,
) error {
defer metrics.UpdateDuration(time.Now(), metrics.IndexerMetrics.ProcessStorageCreationsTimer)
@ -395,7 +464,7 @@ func (sdb *StateDiffBuilder) processStorageCreations(
}
// processStorageUpdates builds the storage diff node objects for all nodes that exist in a different state at B than A
func (sdb *StateDiffBuilder) processStorageUpdates(
func (sdb *builder) processStorageUpdates(
oldroot common.Hash, newroot common.Hash,
storageSink sdtypes.StorageNodeSink,
ipldSink sdtypes.IPLDSink,
@ -416,10 +485,10 @@ func (sdb *StateDiffBuilder) processStorageUpdates(
var prevBlob []byte
a, b := oldTrie.NodeIterator(nil), newTrie.NodeIterator(nil)
it, _ := utils.NewSymmetricDifferenceIterator(a, b)
it, aux := utils.NewSymmetricDifferenceIterator(a, b)
for it.Next(true) {
if it.FromA() {
if it.Leaf() && !it.CommonPath() {
if aux.FromA() {
if it.Leaf() && !aux.CommonPath() {
// If this node's leaf key is absent from B, the storage slot was vacated.
// In that case, emit an empty "removed" storage node record.
if err := storageSink(sdtypes.StorageLeafNode{
@ -457,7 +526,7 @@ func (sdb *StateDiffBuilder) processStorageUpdates(
}
// processRemovedAccountStorage builds the "removed" diffs for all the storage nodes for a destroyed account
func (sdb *StateDiffBuilder) processRemovedAccountStorage(
func (sdb *builder) processRemovedAccountStorage(
sr common.Hash, storageSink sdtypes.StorageNodeSink,
) error {
defer metrics.UpdateDuration(time.Now(), metrics.IndexerMetrics.ProcessRemovedAccountStorageTimer)
@ -491,7 +560,7 @@ func (sdb *StateDiffBuilder) processRemovedAccountStorage(
// decodes slot at leaf and encodes RLP data to CID
// reminder: it.Leaf() == true when the iterator is positioned at a "value node" (which is not something
// that actually exists in an MMPT), therefore we pass the parent node blob as the leaf RLP.
func (sdb *StateDiffBuilder) decodeStorageLeaf(it trie.NodeIterator, parentBlob []byte) sdtypes.StorageLeafNode {
func (sdb *builder) decodeStorageLeaf(it trie.NodeIterator, parentBlob []byte) sdtypes.StorageLeafNode {
leafKey := make([]byte, len(it.LeafKey()))
copy(leafKey, it.LeafKey())
value := make([]byte, len(it.LeafBlob()))

View File

@ -824,6 +824,7 @@ func (sds *Service) writeStateDiff(block *types.Block, parentRoot common.Hash, p
return err
}
// TODO: review/remove the need to sync here
var nodeMtx, ipldMtx sync.Mutex
nodeSink := func(node types2.StateLeafNode) error {
defer metrics.UpdateDuration(time.Now(), metrics.IndexerMetrics.OutputTimer)