diff --git a/cmd/root.go b/cmd/root.go index 543138a..baf7747 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -86,9 +86,11 @@ func init() { rootCmd.PersistentFlags().StringVar(&cfgFile, "config", "", "config file location") rootCmd.PersistentFlags().String("log-file", "", "file path for logging") rootCmd.PersistentFlags().String("log-level", log.InfoLevel.String(), "log level (trace, debug, info, warn, error, fatal, panic") + rootCmd.PersistentFlags().Int("workers", 0, "number of concurrent workers to use") viper.BindPFlag("log.file", rootCmd.PersistentFlags().Lookup("log-file")) viper.BindPFlag("log.level", rootCmd.PersistentFlags().Lookup("log-level")) + viper.BindPFlag("statediff.workers", rootCmd.PersistentFlags().Lookup("workers")) } func initConfig() { diff --git a/cmd/serve.go b/cmd/serve.go index 227aefe..58fedb4 100644 --- a/cmd/serve.go +++ b/cmd/serve.go @@ -68,7 +68,8 @@ func serve() { // create statediff service logWithCommand.Info("creating statediff service") - statediffService, err := sd.NewStateDiffService(lvlDBReader) + statediffService, err := sd.NewStateDiffService( + lvlDBReader, sd.Config{Workers: viper.GetUint("statediff.workers")}) if err != nil { logWithCommand.Fatal(err) } diff --git a/pkg/api.go b/pkg/api.go index 19a6f0d..f22db38 100644 --- a/pkg/api.go +++ b/pkg/api.go @@ -40,11 +40,11 @@ func NewPublicStateDiffAPI(sds IService) *PublicStateDiffAPI { } // StateDiffAt returns a state diff payload at the specific blockheight -func (api *PublicStateDiffAPI) StateDiffAt(ctx context.Context, blockNumber uint64, params Params) (*sd.Payload, error) { +func (api *PublicStateDiffAPI) StateDiffAt(ctx context.Context, blockNumber uint64, params sd.Params) (*sd.Payload, error) { return api.sds.StateDiffAt(blockNumber, params) } // StateTrieAt returns a state trie payload at the specific blockheight -func (api *PublicStateDiffAPI) StateTrieAt(ctx context.Context, blockNumber uint64, params Params) (*sd.Payload, error) { +func (api *PublicStateDiffAPI) StateTrieAt(ctx context.Context, blockNumber uint64, params sd.Params) (*sd.Payload, error) { return api.sds.StateTrieAt(blockNumber, params) } diff --git a/pkg/builder.go b/pkg/builder.go index 20e1887..a8435b4 100644 --- a/pkg/builder.go +++ b/pkg/builder.go @@ -44,12 +44,13 @@ var ( // Builder interface exposes the method for building a state diff between two blocks type Builder interface { - BuildStateDiffObject(args Args, params Params) (sd.StateObject, error) + BuildStateDiffObject(args sd.Args, params sd.Params) (sd.StateObject, error) BuildStateTrieObject(current *types.Block) (sd.StateObject, error) } type builder struct { stateCache state.Database + numWorkers uint } type iterPair struct { @@ -57,9 +58,13 @@ type iterPair struct { } // NewBuilder is used to create a statediff builder -func NewBuilder(stateCache state.Database) Builder { +func NewBuilder(stateCache state.Database, workers uint) Builder { + if workers == 0 { + workers = 1 + } return &builder{ stateCache: stateCache, // state cache is safe for concurrent reads + numWorkers: workers, } } @@ -149,7 +154,7 @@ func (sdb *builder) buildStateTrie(it trie.NodeIterator) ([]sd.StateNode, error) } // BuildStateDiff builds a statediff object from two blocks and the provided parameters -func (sdb *builder) BuildStateDiffObject(args Args, params Params) (sd.StateObject, error) { +func (sdb *builder) BuildStateDiffObject(args sd.Args, params sd.Params) (sd.StateObject, error) { if len(params.WatchedAddresses) > 0 { // if we are watching only specific accounts then we are only diffing leaf nodes log.Info("Ignoring intermediate state nodes because WatchedAddresses was passed") @@ -165,18 +170,14 @@ func (sdb *builder) BuildStateDiffObject(args Args, params Params) (sd.StateObje if err != nil { return sd.StateObject{}, fmt.Errorf("error creating trie for new state root: %v", err) } - nWorkers := params.Workers - if nWorkers == 0 { - nWorkers = 1 - } // Split old and new tries into corresponding subtrie iterators - oldIterFac := iter.NewSubtrieIteratorFactory(oldTrie, nWorkers) - newIterFac := iter.NewSubtrieIteratorFactory(newTrie, nWorkers) - iterChan := make(chan []iterPair, nWorkers) + oldIterFac := iter.NewSubtrieIteratorFactory(oldTrie, sdb.numWorkers) + newIterFac := iter.NewSubtrieIteratorFactory(newTrie, sdb.numWorkers) + iterChan := make(chan []iterPair, sdb.numWorkers) // Create iterators ahead of time to avoid race condition in state.Trie access - for i := uint(0); i < nWorkers; i++ { + for i := uint(0); i < sdb.numWorkers; i++ { // two state iterations per diff build iterChan <- []iterPair{ iterPair{ @@ -193,7 +194,7 @@ func (sdb *builder) BuildStateDiffObject(args Args, params Params) (sd.StateObje nodeChan := make(chan []sd.StateNode) var wg sync.WaitGroup - for w := uint(0); w < nWorkers; w++ { + for w := uint(0); w < sdb.numWorkers; w++ { wg.Add(1) go func(iterChan <-chan []iterPair) error { defer wg.Done() @@ -226,7 +227,7 @@ func (sdb *builder) BuildStateDiffObject(args Args, params Params) (sd.StateObje }, nil } -func (sdb *builder) buildStateDiff(args []iterPair, params Params) ([]sd.StateNode, error) { +func (sdb *builder) buildStateDiff(args []iterPair, params sd.Params) ([]sd.StateNode, error) { // collect a slice of all the intermediate nodes that were touched and exist at B // a map of their leafkey to all the accounts that were touched and exist at B // and a slice of all the paths for the nodes in both of the above sets diff --git a/pkg/service.go b/pkg/service.go index 6739e61..a4baa85 100644 --- a/pkg/service.go +++ b/pkg/service.go @@ -48,9 +48,15 @@ type IService interface { // Main event loop for processing state diffs Loop(wg *sync.WaitGroup) // Method to get state diff object at specific block - StateDiffAt(blockNumber uint64, params Params) (*sd.Payload, error) + StateDiffAt(blockNumber uint64, params sd.Params) (*sd.Payload, error) // Method to get state trie object at specific block - StateTrieAt(blockNumber uint64, params Params) (*sd.Payload, error) + StateTrieAt(blockNumber uint64, params sd.Params) (*sd.Payload, error) +} + +// Server configuration +type Config struct { + // Number of goroutines to use + Workers uint } // Service is the underlying struct for the state diffing service @@ -64,10 +70,10 @@ type Service struct { } // NewStateDiffService creates a new Service -func NewStateDiffService(lvlDBReader lvlDBReader) (*Service, error) { +func NewStateDiffService(lvlDBReader lvlDBReader, cfg Config) (*Service, error) { return &Service{ lvlDBReader: lvlDBReader, - Builder: NewBuilder(lvlDBReader.StateDB()), + Builder: NewBuilder(lvlDBReader.StateDB(), cfg.Workers), QuitChan: make(chan bool), }, nil } @@ -104,7 +110,7 @@ func (sds *Service) Loop(wg *sync.WaitGroup) { // StateDiffAt returns a state diff object payload at the specific blockheight // This operation cannot be performed back past the point of db pruning; it requires an archival node for historical data -func (sds *Service) StateDiffAt(blockNumber uint64, params Params) (*sd.Payload, error) { +func (sds *Service) StateDiffAt(blockNumber uint64, params sd.Params) (*sd.Payload, error) { currentBlock, err := sds.lvlDBReader.GetBlockByNumber(blockNumber) if err != nil { return nil, err @@ -121,8 +127,8 @@ func (sds *Service) StateDiffAt(blockNumber uint64, params Params) (*sd.Payload, } // processStateDiff method builds the state diff payload from the current block, parent state root, and provided params -func (sds *Service) processStateDiff(currentBlock *types.Block, parentRoot common.Hash, params Params) (*sd.Payload, error) { - stateDiff, err := sds.Builder.BuildStateDiffObject(Args{ +func (sds *Service) processStateDiff(currentBlock *types.Block, parentRoot common.Hash, params sd.Params) (*sd.Payload, error) { + stateDiff, err := sds.Builder.BuildStateDiffObject(sd.Args{ BlockHash: currentBlock.Hash(), BlockNumber: currentBlock.Number(), OldStateRoot: parentRoot, @@ -139,7 +145,7 @@ func (sds *Service) processStateDiff(currentBlock *types.Block, parentRoot commo return sds.newPayload(stateDiffRlp, currentBlock, params) } -func (sds *Service) newPayload(stateObject []byte, block *types.Block, params Params) (*sd.Payload, error) { +func (sds *Service) newPayload(stateObject []byte, block *types.Block, params sd.Params) (*sd.Payload, error) { payload := &sd.Payload{ StateObjectRlp: stateObject, } @@ -173,7 +179,7 @@ func (sds *Service) newPayload(stateObject []byte, block *types.Block, params Pa // StateTrieAt returns a state trie object payload at the specified blockheight // This operation cannot be performed back past the point of db pruning; it requires an archival node for historical data -func (sds *Service) StateTrieAt(blockNumber uint64, params Params) (*sd.Payload, error) { +func (sds *Service) StateTrieAt(blockNumber uint64, params sd.Params) (*sd.Payload, error) { currentBlock, err := sds.lvlDBReader.GetBlockByNumber(blockNumber) if err != nil { return nil, err @@ -182,7 +188,7 @@ func (sds *Service) StateTrieAt(blockNumber uint64, params Params) (*sd.Payload, return sds.processStateTrie(currentBlock, params) } -func (sds *Service) processStateTrie(block *types.Block, params Params) (*sd.Payload, error) { +func (sds *Service) processStateTrie(block *types.Block, params sd.Params) (*sd.Payload, error) { stateNodes, err := sds.Builder.BuildStateTrieObject(block) if err != nil { return nil, err diff --git a/pkg/types.go b/pkg/types.go index 8a68687..867d011 100644 --- a/pkg/types.go +++ b/pkg/types.go @@ -20,37 +20,10 @@ package statediff import ( - "math/big" - - "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/state" sd "github.com/ethereum/go-ethereum/statediff" ) -// Subscription struct holds our subscription channels -type Subscription struct { - PayloadChan chan<- sd.Payload - QuitChan chan<- bool -} - -// Params is used to carry in parameters from subscribing/requesting clients configuration -type Params struct { - IntermediateStateNodes bool - IntermediateStorageNodes bool - IncludeBlock bool - IncludeReceipts bool - IncludeTD bool - WatchedAddresses []common.Address - WatchedStorageSlots []common.Hash - Workers uint -} - -// Args bundles the arguments for the state diff builder -type Args struct { - OldStateRoot, NewStateRoot, BlockHash common.Hash - BlockNumber *big.Int -} - // AccountMap is a mapping of hex encoded path => account wrapper type AccountMap map[string]accountWrapper