From f251d82d9dbc78c98d1ba214d602f2c63f9c7fa1 Mon Sep 17 00:00:00 2001 From: Roy Crihfield Date: Mon, 14 Sep 2020 22:46:50 -0500 Subject: [PATCH 1/4] refactor config for workers --- cmd/root.go | 2 ++ cmd/serve.go | 3 ++- pkg/api.go | 4 ++-- pkg/builder.go | 27 ++++++++++++++------------- pkg/service.go | 26 ++++++++++++++++---------- pkg/types.go | 27 --------------------------- 6 files changed, 36 insertions(+), 53 deletions(-) diff --git a/cmd/root.go b/cmd/root.go index 543138a..baf7747 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -86,9 +86,11 @@ func init() { rootCmd.PersistentFlags().StringVar(&cfgFile, "config", "", "config file location") rootCmd.PersistentFlags().String("log-file", "", "file path for logging") rootCmd.PersistentFlags().String("log-level", log.InfoLevel.String(), "log level (trace, debug, info, warn, error, fatal, panic") + rootCmd.PersistentFlags().Int("workers", 0, "number of concurrent workers to use") viper.BindPFlag("log.file", rootCmd.PersistentFlags().Lookup("log-file")) viper.BindPFlag("log.level", rootCmd.PersistentFlags().Lookup("log-level")) + viper.BindPFlag("statediff.workers", rootCmd.PersistentFlags().Lookup("workers")) } func initConfig() { diff --git a/cmd/serve.go b/cmd/serve.go index 227aefe..58fedb4 100644 --- a/cmd/serve.go +++ b/cmd/serve.go @@ -68,7 +68,8 @@ func serve() { // create statediff service logWithCommand.Info("creating statediff service") - statediffService, err := sd.NewStateDiffService(lvlDBReader) + statediffService, err := sd.NewStateDiffService( + lvlDBReader, sd.Config{Workers: viper.GetUint("statediff.workers")}) if err != nil { logWithCommand.Fatal(err) } diff --git a/pkg/api.go b/pkg/api.go index 19a6f0d..f22db38 100644 --- a/pkg/api.go +++ b/pkg/api.go @@ -40,11 +40,11 @@ func NewPublicStateDiffAPI(sds IService) *PublicStateDiffAPI { } // StateDiffAt returns a state diff payload at the specific blockheight -func (api *PublicStateDiffAPI) StateDiffAt(ctx context.Context, blockNumber uint64, params Params) (*sd.Payload, error) { +func (api *PublicStateDiffAPI) StateDiffAt(ctx context.Context, blockNumber uint64, params sd.Params) (*sd.Payload, error) { return api.sds.StateDiffAt(blockNumber, params) } // StateTrieAt returns a state trie payload at the specific blockheight -func (api *PublicStateDiffAPI) StateTrieAt(ctx context.Context, blockNumber uint64, params Params) (*sd.Payload, error) { +func (api *PublicStateDiffAPI) StateTrieAt(ctx context.Context, blockNumber uint64, params sd.Params) (*sd.Payload, error) { return api.sds.StateTrieAt(blockNumber, params) } diff --git a/pkg/builder.go b/pkg/builder.go index 20e1887..a8435b4 100644 --- a/pkg/builder.go +++ b/pkg/builder.go @@ -44,12 +44,13 @@ var ( // Builder interface exposes the method for building a state diff between two blocks type Builder interface { - BuildStateDiffObject(args Args, params Params) (sd.StateObject, error) + BuildStateDiffObject(args sd.Args, params sd.Params) (sd.StateObject, error) BuildStateTrieObject(current *types.Block) (sd.StateObject, error) } type builder struct { stateCache state.Database + numWorkers uint } type iterPair struct { @@ -57,9 +58,13 @@ type iterPair struct { } // NewBuilder is used to create a statediff builder -func NewBuilder(stateCache state.Database) Builder { +func NewBuilder(stateCache state.Database, workers uint) Builder { + if workers == 0 { + workers = 1 + } return &builder{ stateCache: stateCache, // state cache is safe for concurrent reads + numWorkers: workers, } } @@ -149,7 +154,7 @@ func (sdb *builder) buildStateTrie(it trie.NodeIterator) ([]sd.StateNode, error) } // BuildStateDiff builds a statediff object from two blocks and the provided parameters -func (sdb *builder) BuildStateDiffObject(args Args, params Params) (sd.StateObject, error) { +func (sdb *builder) BuildStateDiffObject(args sd.Args, params sd.Params) (sd.StateObject, error) { if len(params.WatchedAddresses) > 0 { // if we are watching only specific accounts then we are only diffing leaf nodes log.Info("Ignoring intermediate state nodes because WatchedAddresses was passed") @@ -165,18 +170,14 @@ func (sdb *builder) BuildStateDiffObject(args Args, params Params) (sd.StateObje if err != nil { return sd.StateObject{}, fmt.Errorf("error creating trie for new state root: %v", err) } - nWorkers := params.Workers - if nWorkers == 0 { - nWorkers = 1 - } // Split old and new tries into corresponding subtrie iterators - oldIterFac := iter.NewSubtrieIteratorFactory(oldTrie, nWorkers) - newIterFac := iter.NewSubtrieIteratorFactory(newTrie, nWorkers) - iterChan := make(chan []iterPair, nWorkers) + oldIterFac := iter.NewSubtrieIteratorFactory(oldTrie, sdb.numWorkers) + newIterFac := iter.NewSubtrieIteratorFactory(newTrie, sdb.numWorkers) + iterChan := make(chan []iterPair, sdb.numWorkers) // Create iterators ahead of time to avoid race condition in state.Trie access - for i := uint(0); i < nWorkers; i++ { + for i := uint(0); i < sdb.numWorkers; i++ { // two state iterations per diff build iterChan <- []iterPair{ iterPair{ @@ -193,7 +194,7 @@ func (sdb *builder) BuildStateDiffObject(args Args, params Params) (sd.StateObje nodeChan := make(chan []sd.StateNode) var wg sync.WaitGroup - for w := uint(0); w < nWorkers; w++ { + for w := uint(0); w < sdb.numWorkers; w++ { wg.Add(1) go func(iterChan <-chan []iterPair) error { defer wg.Done() @@ -226,7 +227,7 @@ func (sdb *builder) BuildStateDiffObject(args Args, params Params) (sd.StateObje }, nil } -func (sdb *builder) buildStateDiff(args []iterPair, params Params) ([]sd.StateNode, error) { +func (sdb *builder) buildStateDiff(args []iterPair, params sd.Params) ([]sd.StateNode, error) { // collect a slice of all the intermediate nodes that were touched and exist at B // a map of their leafkey to all the accounts that were touched and exist at B // and a slice of all the paths for the nodes in both of the above sets diff --git a/pkg/service.go b/pkg/service.go index 6739e61..a4baa85 100644 --- a/pkg/service.go +++ b/pkg/service.go @@ -48,9 +48,15 @@ type IService interface { // Main event loop for processing state diffs Loop(wg *sync.WaitGroup) // Method to get state diff object at specific block - StateDiffAt(blockNumber uint64, params Params) (*sd.Payload, error) + StateDiffAt(blockNumber uint64, params sd.Params) (*sd.Payload, error) // Method to get state trie object at specific block - StateTrieAt(blockNumber uint64, params Params) (*sd.Payload, error) + StateTrieAt(blockNumber uint64, params sd.Params) (*sd.Payload, error) +} + +// Server configuration +type Config struct { + // Number of goroutines to use + Workers uint } // Service is the underlying struct for the state diffing service @@ -64,10 +70,10 @@ type Service struct { } // NewStateDiffService creates a new Service -func NewStateDiffService(lvlDBReader lvlDBReader) (*Service, error) { +func NewStateDiffService(lvlDBReader lvlDBReader, cfg Config) (*Service, error) { return &Service{ lvlDBReader: lvlDBReader, - Builder: NewBuilder(lvlDBReader.StateDB()), + Builder: NewBuilder(lvlDBReader.StateDB(), cfg.Workers), QuitChan: make(chan bool), }, nil } @@ -104,7 +110,7 @@ func (sds *Service) Loop(wg *sync.WaitGroup) { // StateDiffAt returns a state diff object payload at the specific blockheight // This operation cannot be performed back past the point of db pruning; it requires an archival node for historical data -func (sds *Service) StateDiffAt(blockNumber uint64, params Params) (*sd.Payload, error) { +func (sds *Service) StateDiffAt(blockNumber uint64, params sd.Params) (*sd.Payload, error) { currentBlock, err := sds.lvlDBReader.GetBlockByNumber(blockNumber) if err != nil { return nil, err @@ -121,8 +127,8 @@ func (sds *Service) StateDiffAt(blockNumber uint64, params Params) (*sd.Payload, } // processStateDiff method builds the state diff payload from the current block, parent state root, and provided params -func (sds *Service) processStateDiff(currentBlock *types.Block, parentRoot common.Hash, params Params) (*sd.Payload, error) { - stateDiff, err := sds.Builder.BuildStateDiffObject(Args{ +func (sds *Service) processStateDiff(currentBlock *types.Block, parentRoot common.Hash, params sd.Params) (*sd.Payload, error) { + stateDiff, err := sds.Builder.BuildStateDiffObject(sd.Args{ BlockHash: currentBlock.Hash(), BlockNumber: currentBlock.Number(), OldStateRoot: parentRoot, @@ -139,7 +145,7 @@ func (sds *Service) processStateDiff(currentBlock *types.Block, parentRoot commo return sds.newPayload(stateDiffRlp, currentBlock, params) } -func (sds *Service) newPayload(stateObject []byte, block *types.Block, params Params) (*sd.Payload, error) { +func (sds *Service) newPayload(stateObject []byte, block *types.Block, params sd.Params) (*sd.Payload, error) { payload := &sd.Payload{ StateObjectRlp: stateObject, } @@ -173,7 +179,7 @@ func (sds *Service) newPayload(stateObject []byte, block *types.Block, params Pa // StateTrieAt returns a state trie object payload at the specified blockheight // This operation cannot be performed back past the point of db pruning; it requires an archival node for historical data -func (sds *Service) StateTrieAt(blockNumber uint64, params Params) (*sd.Payload, error) { +func (sds *Service) StateTrieAt(blockNumber uint64, params sd.Params) (*sd.Payload, error) { currentBlock, err := sds.lvlDBReader.GetBlockByNumber(blockNumber) if err != nil { return nil, err @@ -182,7 +188,7 @@ func (sds *Service) StateTrieAt(blockNumber uint64, params Params) (*sd.Payload, return sds.processStateTrie(currentBlock, params) } -func (sds *Service) processStateTrie(block *types.Block, params Params) (*sd.Payload, error) { +func (sds *Service) processStateTrie(block *types.Block, params sd.Params) (*sd.Payload, error) { stateNodes, err := sds.Builder.BuildStateTrieObject(block) if err != nil { return nil, err diff --git a/pkg/types.go b/pkg/types.go index 8a68687..867d011 100644 --- a/pkg/types.go +++ b/pkg/types.go @@ -20,37 +20,10 @@ package statediff import ( - "math/big" - - "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/state" sd "github.com/ethereum/go-ethereum/statediff" ) -// Subscription struct holds our subscription channels -type Subscription struct { - PayloadChan chan<- sd.Payload - QuitChan chan<- bool -} - -// Params is used to carry in parameters from subscribing/requesting clients configuration -type Params struct { - IntermediateStateNodes bool - IntermediateStorageNodes bool - IncludeBlock bool - IncludeReceipts bool - IncludeTD bool - WatchedAddresses []common.Address - WatchedStorageSlots []common.Hash - Workers uint -} - -// Args bundles the arguments for the state diff builder -type Args struct { - OldStateRoot, NewStateRoot, BlockHash common.Hash - BlockNumber *big.Int -} - // AccountMap is a mapping of hex encoded path => account wrapper type AccountMap map[string]accountWrapper From e9e7144ac638977151abe47b4f869ec81fde82fa Mon Sep 17 00:00:00 2001 From: Roy Crihfield Date: Mon, 14 Sep 2020 22:47:25 -0500 Subject: [PATCH 2/4] refactor tests --- pkg/builder_test.go | 30 +++++++++++------------------- 1 file changed, 11 insertions(+), 19 deletions(-) diff --git a/pkg/builder_test.go b/pkg/builder_test.go index fdf03f6..aa5c091 100644 --- a/pkg/builder_test.go +++ b/pkg/builder_test.go @@ -27,8 +27,9 @@ import ( "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/rlp" + statediff "github.com/ethereum/go-ethereum/statediff" - statediff "github.com/vulcanize/eth-statediff-service/pkg" + pkg "github.com/vulcanize/eth-statediff-service/pkg" "github.com/vulcanize/eth-statediff-service/pkg/testhelpers" ) @@ -38,7 +39,7 @@ var ( emptyDiffs = make([]statediff.StateNode, 0) emptyStorage = make([]statediff.StorageNode, 0) block0, block1, block2, block3, block4, block5, block6 *types.Block - builder statediff.Builder + builder pkg.Builder miningReward = int64(2000000000000000000) minerAddress = common.HexToAddress("0x0") minerLeafKey = testhelpers.AddressToLeafKey(minerAddress) @@ -474,7 +475,6 @@ func TestBuilder(t *testing.T) { block2 = blocks[1] block3 = blocks[2] params := statediff.Params{} - builder = statediff.NewBuilder(chain.StateCache()) var tests = []struct { name string @@ -669,7 +669,7 @@ func TestBuilder(t *testing.T) { } for _, workers := range workerCounts { - params.Workers = workers + builder := pkg.NewBuilder(chain.StateCache(), workers) for _, test := range tests { diff, err := builder.BuildStateDiffObject(test.startingArguments, params) if err != nil { @@ -706,7 +706,6 @@ func TestBuilderWithIntermediateNodes(t *testing.T) { IntermediateStateNodes: true, IntermediateStorageNodes: true, } - builder = statediff.NewBuilder(chain.StateCache()) var tests = []struct { name string @@ -929,7 +928,7 @@ func TestBuilderWithIntermediateNodes(t *testing.T) { } for _, workers := range workerCounts { - params.Workers = workers + builder = pkg.NewBuilder(chain.StateCache(), workers) for i, test := range tests { diff, err := builder.BuildStateDiffObject(test.startingArguments, params) if err != nil { @@ -978,7 +977,6 @@ func TestBuilderWithWatchedAddressList(t *testing.T) { params := statediff.Params{ WatchedAddresses: []common.Address{testhelpers.Account1Addr, testhelpers.ContractAddr}, } - builder = statediff.NewBuilder(chain.StateCache()) var tests = []struct { name string @@ -1115,7 +1113,7 @@ func TestBuilderWithWatchedAddressList(t *testing.T) { } for _, workers := range workerCounts { - params.Workers = workers + builder = pkg.NewBuilder(chain.StateCache(), workers) for _, test := range tests { diff, err := builder.BuildStateDiffObject(test.startingArguments, params) if err != nil { @@ -1151,7 +1149,6 @@ func TestBuilderWithWatchedAddressAndStorageKeyList(t *testing.T) { WatchedAddresses: []common.Address{testhelpers.Account1Addr, testhelpers.ContractAddr}, WatchedStorageSlots: []common.Hash{slot1StorageKey}, } - builder = statediff.NewBuilder(chain.StateCache()) var tests = []struct { name string @@ -1275,7 +1272,7 @@ func TestBuilderWithWatchedAddressAndStorageKeyList(t *testing.T) { } for _, workers := range workerCounts { - params.Workers = workers + builder = pkg.NewBuilder(chain.StateCache(), workers) for _, test := range tests { diff, err := builder.BuildStateDiffObject(test.startingArguments, params) if err != nil { @@ -1311,7 +1308,6 @@ func TestBuilderWithRemovedAccountAndStorage(t *testing.T) { IntermediateStateNodes: true, IntermediateStorageNodes: true, } - builder = statediff.NewBuilder(chain.StateCache()) var tests = []struct { name string @@ -1485,7 +1481,7 @@ func TestBuilderWithRemovedAccountAndStorage(t *testing.T) { } for _, workers := range workerCounts { - params.Workers = workers + builder = pkg.NewBuilder(chain.StateCache(), workers) for _, test := range tests { diff, err := builder.BuildStateDiffObject(test.startingArguments, params) if err != nil { @@ -1521,7 +1517,6 @@ func TestBuilderWithRemovedAccountAndStorageWithoutIntermediateNodes(t *testing. IntermediateStateNodes: false, IntermediateStorageNodes: false, } - builder = statediff.NewBuilder(chain.StateCache()) var tests = []struct { name string @@ -1672,7 +1667,7 @@ func TestBuilderWithRemovedAccountAndStorageWithoutIntermediateNodes(t *testing. } for _, workers := range workerCounts { - params.Workers = workers + builder = pkg.NewBuilder(chain.StateCache(), workers) for _, test := range tests { diff, err := builder.BuildStateDiffObject(test.startingArguments, params) if err != nil { @@ -1788,7 +1783,6 @@ func TestBuilderWithMovedAccount(t *testing.T) { IntermediateStateNodes: true, IntermediateStorageNodes: true, } - builder = statediff.NewBuilder(chain.StateCache()) var tests = []struct { name string @@ -1883,7 +1877,7 @@ func TestBuilderWithMovedAccount(t *testing.T) { } for _, workers := range workerCounts { - params.Workers = workers + builder = pkg.NewBuilder(chain.StateCache(), workers) for _, test := range tests { diff, err := builder.BuildStateDiffObject(test.startingArguments, params) if err != nil { @@ -1918,7 +1912,6 @@ func TestBuilderWithMovedAccountOnlyLeafs(t *testing.T) { IntermediateStateNodes: false, IntermediateStorageNodes: false, } - builder = statediff.NewBuilder(chain.StateCache()) var tests = []struct { name string @@ -2002,7 +1995,7 @@ func TestBuilderWithMovedAccountOnlyLeafs(t *testing.T) { } for _, workers := range workerCounts { - params.Workers = workers + builder = pkg.NewBuilder(chain.StateCache(), workers) for _, test := range tests { diff, err := builder.BuildStateDiffObject(test.startingArguments, params) if err != nil { @@ -2033,7 +2026,6 @@ func TestBuildStateTrie(t *testing.T) { block1 = blocks[0] block2 = blocks[1] block3 = blocks[2] - builder = statediff.NewBuilder(chain.StateCache()) var tests = []struct { name string From d711fa7d7f2c54f521ec26c8f35b0f8461225f9c Mon Sep 17 00:00:00 2001 From: Roy Crihfield Date: Mon, 14 Sep 2020 23:19:13 -0500 Subject: [PATCH 3/4] add workers to example.toml --- environments/example.toml | 3 +++ 1 file changed, 3 insertions(+) diff --git a/environments/example.toml b/environments/example.toml index 7442fe6..f8d3fee 100644 --- a/environments/example.toml +++ b/environments/example.toml @@ -6,6 +6,9 @@ ipcPath = ".ipc" httpPath = "127.0.0.1:8545" +[statediff] + workers = 4 + [log] file = "" level = "info" From 03018fa58a7773c15e43b2365268fc41980d0ce7 Mon Sep 17 00:00:00 2001 From: Roy Crihfield Date: Tue, 15 Sep 2020 01:03:46 -0500 Subject: [PATCH 4/4] proper fail for worker count --- pkg/builder.go | 8 ++++++-- pkg/builder_test.go | 16 ++++++++-------- pkg/service.go | 6 +++++- 3 files changed, 19 insertions(+), 11 deletions(-) diff --git a/pkg/builder.go b/pkg/builder.go index a8435b4..5b8968e 100644 --- a/pkg/builder.go +++ b/pkg/builder.go @@ -23,6 +23,7 @@ import ( "bytes" "fmt" "sync" + "math/bits" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/state" @@ -58,14 +59,17 @@ type iterPair struct { } // NewBuilder is used to create a statediff builder -func NewBuilder(stateCache state.Database, workers uint) Builder { +func NewBuilder(stateCache state.Database, workers uint) (Builder, error) { if workers == 0 { workers = 1 } + if bits.OnesCount(workers) != 1 { + return nil, fmt.Errorf("workers must be a power of 2") + } return &builder{ stateCache: stateCache, // state cache is safe for concurrent reads numWorkers: workers, - } + }, nil } // BuildStateTrieObject builds a state trie object from the provided block diff --git a/pkg/builder_test.go b/pkg/builder_test.go index aa5c091..1681dcf 100644 --- a/pkg/builder_test.go +++ b/pkg/builder_test.go @@ -669,7 +669,7 @@ func TestBuilder(t *testing.T) { } for _, workers := range workerCounts { - builder := pkg.NewBuilder(chain.StateCache(), workers) + builder, _ = pkg.NewBuilder(chain.StateCache(), workers) for _, test := range tests { diff, err := builder.BuildStateDiffObject(test.startingArguments, params) if err != nil { @@ -928,7 +928,7 @@ func TestBuilderWithIntermediateNodes(t *testing.T) { } for _, workers := range workerCounts { - builder = pkg.NewBuilder(chain.StateCache(), workers) + builder, _ = pkg.NewBuilder(chain.StateCache(), workers) for i, test := range tests { diff, err := builder.BuildStateDiffObject(test.startingArguments, params) if err != nil { @@ -1113,7 +1113,7 @@ func TestBuilderWithWatchedAddressList(t *testing.T) { } for _, workers := range workerCounts { - builder = pkg.NewBuilder(chain.StateCache(), workers) + builder, _ = pkg.NewBuilder(chain.StateCache(), workers) for _, test := range tests { diff, err := builder.BuildStateDiffObject(test.startingArguments, params) if err != nil { @@ -1272,7 +1272,7 @@ func TestBuilderWithWatchedAddressAndStorageKeyList(t *testing.T) { } for _, workers := range workerCounts { - builder = pkg.NewBuilder(chain.StateCache(), workers) + builder, _ = pkg.NewBuilder(chain.StateCache(), workers) for _, test := range tests { diff, err := builder.BuildStateDiffObject(test.startingArguments, params) if err != nil { @@ -1481,7 +1481,7 @@ func TestBuilderWithRemovedAccountAndStorage(t *testing.T) { } for _, workers := range workerCounts { - builder = pkg.NewBuilder(chain.StateCache(), workers) + builder, _ = pkg.NewBuilder(chain.StateCache(), workers) for _, test := range tests { diff, err := builder.BuildStateDiffObject(test.startingArguments, params) if err != nil { @@ -1667,7 +1667,7 @@ func TestBuilderWithRemovedAccountAndStorageWithoutIntermediateNodes(t *testing. } for _, workers := range workerCounts { - builder = pkg.NewBuilder(chain.StateCache(), workers) + builder, _ = pkg.NewBuilder(chain.StateCache(), workers) for _, test := range tests { diff, err := builder.BuildStateDiffObject(test.startingArguments, params) if err != nil { @@ -1877,7 +1877,7 @@ func TestBuilderWithMovedAccount(t *testing.T) { } for _, workers := range workerCounts { - builder = pkg.NewBuilder(chain.StateCache(), workers) + builder, _ = pkg.NewBuilder(chain.StateCache(), workers) for _, test := range tests { diff, err := builder.BuildStateDiffObject(test.startingArguments, params) if err != nil { @@ -1995,7 +1995,7 @@ func TestBuilderWithMovedAccountOnlyLeafs(t *testing.T) { } for _, workers := range workerCounts { - builder = pkg.NewBuilder(chain.StateCache(), workers) + builder, _ = pkg.NewBuilder(chain.StateCache(), workers) for _, test := range tests { diff, err := builder.BuildStateDiffObject(test.startingArguments, params) if err != nil { diff --git a/pkg/service.go b/pkg/service.go index a4baa85..31a4fb8 100644 --- a/pkg/service.go +++ b/pkg/service.go @@ -71,9 +71,13 @@ type Service struct { // NewStateDiffService creates a new Service func NewStateDiffService(lvlDBReader lvlDBReader, cfg Config) (*Service, error) { + builder, err := NewBuilder(lvlDBReader.StateDB(), cfg.Workers) + if err != nil { + return nil, err + } return &Service{ lvlDBReader: lvlDBReader, - Builder: NewBuilder(lvlDBReader.StateDB(), cfg.Workers), + Builder: builder, QuitChan: make(chan bool), }, nil }