Concurrent statediff iteration #12
19
builder.go
19
builder.go
@ -49,7 +49,7 @@ var (
|
||||
nullCodeHash = crypto.Keccak256([]byte{})
|
||||
zeroHash common.Hash
|
||||
|
||||
defaultSubtrieWorkers = 8
|
||||
defaultSubtrieWorkers uint = 1
|
||||
)
|
||||
|
||||
// Builder interface exposes the method for building a state diff between two blocks
|
||||
@ -61,7 +61,7 @@ type Builder interface {
|
||||
type StateDiffBuilder struct {
|
||||
// state cache is safe for concurrent reads
|
||||
stateCache adapt.StateView
|
||||
subtrieWorkers int
|
||||
subtrieWorkers uint
|
||||
}
|
||||
|
||||
type accountUpdate struct {
|
||||
@ -88,13 +88,22 @@ func syncedAppender[T any](to *[]T) func(T) error {
|
||||
}
|
||||
|
||||
// NewBuilder is used to create a statediff builder
|
||||
func NewBuilder(stateCache adapt.StateView) Builder {
|
||||
func NewBuilder(stateCache adapt.StateView) *StateDiffBuilder {
|
||||
return &StateDiffBuilder{
|
||||
stateCache: stateCache,
|
||||
subtrieWorkers: defaultSubtrieWorkers,
|
||||
}
|
||||
}
|
||||
|
||||
// SetSubtrieWorkers sets the number of disjoint subtries to divide among parallel workers.
|
||||
// Passing 0 will reset this to the default value.
|
||||
func (sdb *StateDiffBuilder) SetSubtrieWorkers(n uint) {
|
||||
if n == 0 {
|
||||
n = defaultSubtrieWorkers
|
||||
}
|
||||
sdb.subtrieWorkers = n
|
||||
}
|
||||
|
||||
// BuildStateDiffObject builds a statediff object from two blocks and the provided parameters
|
||||
func (sdb *StateDiffBuilder) BuildStateDiffObject(args Args, params Params) (sdtypes.StateObject, error) {
|
||||
defer metrics.UpdateDuration(time.Now(), metrics.IndexerMetrics.BuildStateDiffObjectTimer)
|
||||
@ -134,8 +143,8 @@ func (sdb *StateDiffBuilder) WriteStateDiff(
|
||||
logger := log.New("hash", args.BlockHash, "number", args.BlockNumber)
|
||||
// errgroup will cancel if any gr fails
|
||||
g, ctx := errgroup.WithContext(context.Background())
|
||||
for i := 0; i < sdb.subtrieWorkers; i++ {
|
||||
func(subdiv int) {
|
||||
for i := uint(0); i < sdb.subtrieWorkers; i++ {
|
||||
func(subdiv uint) {
|
||||
g.Go(func() error {
|
||||
a, b := subitersA[subdiv], subitersB[subdiv]
|
||||
return sdb.processAccounts(ctx,
|
||||
|
@ -795,13 +795,13 @@ func TestBuilder(t *testing.T) {
|
||||
},
|
||||
}
|
||||
|
||||
test_helpers.RunBuilderTests(t, chain.StateCache(),
|
||||
tests, params, test_helpers.CheckedRoots{
|
||||
block0: bankAccountAtBlock0LeafNode,
|
||||
block1: block1BranchRootNode,
|
||||
block2: block2BranchRootNode,
|
||||
block3: block3BranchRootNode,
|
||||
})
|
||||
test_helpers.RunBuilderTests(t, chain.StateCache(), tests, params, []uint{1, 8, 32})
|
||||
test_helpers.CheckedRoots{
|
||||
block0: bankAccountAtBlock0LeafNode,
|
||||
block1: block1BranchRootNode,
|
||||
block2: block2BranchRootNode,
|
||||
block3: block3BranchRootNode,
|
||||
}.Check(t)
|
||||
}
|
||||
|
||||
func TestBuilderWithWatchedAddressList(t *testing.T) {
|
||||
@ -1009,12 +1009,13 @@ func TestBuilderWithWatchedAddressList(t *testing.T) {
|
||||
},
|
||||
}
|
||||
|
||||
test_helpers.RunBuilderTests(t, chain.StateCache(), tests, params, test_helpers.CheckedRoots{
|
||||
test_helpers.RunBuilderTests(t, chain.StateCache(), tests, params, []uint{1, 8, 32})
|
||||
test_helpers.CheckedRoots{
|
||||
block0: bankAccountAtBlock0LeafNode,
|
||||
block1: block1BranchRootNode,
|
||||
block2: block2BranchRootNode,
|
||||
block3: block3BranchRootNode,
|
||||
})
|
||||
}.Check(t)
|
||||
}
|
||||
|
||||
func TestBuilderWithRemovedAccountAndStorage(t *testing.T) {
|
||||
@ -1259,11 +1260,12 @@ func TestBuilderWithRemovedAccountAndStorage(t *testing.T) {
|
||||
},
|
||||
}
|
||||
|
||||
test_helpers.RunBuilderTests(t, chain.StateCache(), tests, params, test_helpers.CheckedRoots{
|
||||
test_helpers.RunBuilderTests(t, chain.StateCache(), tests, params, []uint{1, 8, 32})
|
||||
test_helpers.CheckedRoots{
|
||||
block4: block4BranchRootNode,
|
||||
block5: block5BranchRootNode,
|
||||
block6: block6BranchRootNode,
|
||||
})
|
||||
}.Check(t)
|
||||
}
|
||||
|
||||
func TestBuilderWithRemovedNonWatchedAccount(t *testing.T) {
|
||||
@ -1393,11 +1395,12 @@ func TestBuilderWithRemovedNonWatchedAccount(t *testing.T) {
|
||||
},
|
||||
}
|
||||
|
||||
test_helpers.RunBuilderTests(t, chain.StateCache(), tests, params, test_helpers.CheckedRoots{
|
||||
test_helpers.RunBuilderTests(t, chain.StateCache(), tests, params, []uint{1, 8, 32})
|
||||
test_helpers.CheckedRoots{
|
||||
block4: block4BranchRootNode,
|
||||
block5: block5BranchRootNode,
|
||||
block6: block6BranchRootNode,
|
||||
})
|
||||
}.Check(t)
|
||||
}
|
||||
|
||||
func TestBuilderWithRemovedWatchedAccount(t *testing.T) {
|
||||
@ -1596,11 +1599,12 @@ func TestBuilderWithRemovedWatchedAccount(t *testing.T) {
|
||||
},
|
||||
}
|
||||
|
||||
test_helpers.RunBuilderTests(t, chain.StateCache(), tests, params, test_helpers.CheckedRoots{
|
||||
test_helpers.RunBuilderTests(t, chain.StateCache(), tests, params, []uint{1, 8, 32})
|
||||
test_helpers.CheckedRoots{
|
||||
block4: block4BranchRootNode,
|
||||
block5: block5BranchRootNode,
|
||||
block6: block6BranchRootNode,
|
||||
})
|
||||
}.Check(t)
|
||||
}
|
||||
|
||||
var (
|
||||
@ -1823,10 +1827,11 @@ func TestBuilderWithMovedAccount(t *testing.T) {
|
||||
},
|
||||
}
|
||||
|
||||
test_helpers.RunBuilderTests(t, chain.StateCache(), tests, params, test_helpers.CheckedRoots{
|
||||
test_helpers.RunBuilderTests(t, chain.StateCache(), tests, params, []uint{1, 8, 32})
|
||||
test_helpers.CheckedRoots{
|
||||
block1: block01BranchRootNode,
|
||||
block2: bankAccountAtBlock02LeafNode,
|
||||
})
|
||||
}.Check(t)
|
||||
}
|
||||
|
||||
/*
|
||||
@ -2349,11 +2354,12 @@ func TestBuilderWithInternalizedLeafNode(t *testing.T) {
|
||||
},
|
||||
}
|
||||
|
||||
test_helpers.RunBuilderTests(t, chain.StateCache(), tests, params, test_helpers.CheckedRoots{
|
||||
test_helpers.RunBuilderTests(t, chain.StateCache(), tests, params, []uint{1, 8, 32})
|
||||
test_helpers.CheckedRoots{
|
||||
block1: block1bBranchRootNode,
|
||||
block2: block2bBranchRootNode,
|
||||
block3: block3bBranchRootNode,
|
||||
})
|
||||
}.Check(t)
|
||||
}
|
||||
|
||||
func TestBuilderWithInternalizedLeafNodeAndWatchedAddress(t *testing.T) {
|
||||
@ -2550,11 +2556,12 @@ func TestBuilderWithInternalizedLeafNodeAndWatchedAddress(t *testing.T) {
|
||||
},
|
||||
}
|
||||
|
||||
test_helpers.RunBuilderTests(t, chain.StateCache(), tests, params, test_helpers.CheckedRoots{
|
||||
test_helpers.RunBuilderTests(t, chain.StateCache(), tests, params, []uint{1, 8, 32})
|
||||
test_helpers.CheckedRoots{
|
||||
block1: block1bBranchRootNode,
|
||||
block2: block2bBranchRootNode,
|
||||
block3: block3bBranchRootNode,
|
||||
})
|
||||
}.Check(t)
|
||||
}
|
||||
|
||||
/*
|
||||
|
@ -622,11 +622,10 @@ func TestBuilderOnMainnetBlocks(t *testing.T) {
|
||||
},
|
||||
}
|
||||
|
||||
test_helpers.RunBuilderTests(t,
|
||||
chain.StateCache(),
|
||||
tests, params, test_helpers.CheckedRoots{
|
||||
block1: block1RootBranchNode,
|
||||
block2: block2RootBranchNode,
|
||||
block3: block3RootBranchNode,
|
||||
})
|
||||
test_helpers.RunBuilderTests(t, chain.StateCache(), tests, params, []uint{1, 8, 32})
|
||||
test_helpers.CheckedRoots{
|
||||
block1: block1RootBranchNode,
|
||||
block2: block2RootBranchNode,
|
||||
block3: block3RootBranchNode,
|
||||
}.Check(t)
|
||||
}
|
||||
|
@ -2,6 +2,7 @@ package test_helpers
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"fmt"
|
||||
"sort"
|
||||
"testing"
|
||||
|
||||
@ -20,28 +21,34 @@ type TestCase struct {
|
||||
Expected *sdtypes.StateObject
|
||||
}
|
||||
|
||||
type CheckedRoots = map[*types.Block][]byte
|
||||
type CheckedRoots map[*types.Block][]byte
|
||||
|
||||
func RunBuilderTests(
|
||||
t *testing.T,
|
||||
sdb state.Database,
|
||||
tests []TestCase,
|
||||
params statediff.Params,
|
||||
roots CheckedRoots,
|
||||
subtrieCounts []uint,
|
||||
) {
|
||||
builder := statediff.NewBuilder(adapt.GethStateView(sdb))
|
||||
for _, test := range tests {
|
||||
t.Run(test.Name, func(t *testing.T) {
|
||||
diff, err := builder.BuildStateDiffObject(test.Args, params)
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
for _, subtries := range subtrieCounts {
|
||||
t.Run(fmt.Sprintf("%s with %d subtries", test.Name, subtries), func(t *testing.T) {
|
||||
builder.SetSubtrieWorkers(subtries)
|
||||
diff, err := builder.BuildStateDiffObject(test.Args, params)
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
|
||||
normalize(test.Expected)
|
||||
normalize(&diff)
|
||||
require.Equal(t, *test.Expected, diff)
|
||||
})
|
||||
normalize(test.Expected)
|
||||
normalize(&diff)
|
||||
require.Equal(t, *test.Expected, diff)
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (roots CheckedRoots) Check(t *testing.T) {
|
||||
// Let's also confirm that our root state nodes form the state root hash in the headers
|
||||
for block, node := range roots {
|
||||
require.Equal(t, block.Root(), crypto.Keccak256Hash(node),
|
||||
|
Loading…
Reference in New Issue
Block a user