refactor config for workers

This commit is contained in:
Roy Crihfield 2020-09-14 22:46:50 -05:00
parent d99ceddcf8
commit f251d82d9d
6 changed files with 36 additions and 53 deletions

View File

@ -86,9 +86,11 @@ func init() {
rootCmd.PersistentFlags().StringVar(&cfgFile, "config", "", "config file location") rootCmd.PersistentFlags().StringVar(&cfgFile, "config", "", "config file location")
rootCmd.PersistentFlags().String("log-file", "", "file path for logging") rootCmd.PersistentFlags().String("log-file", "", "file path for logging")
rootCmd.PersistentFlags().String("log-level", log.InfoLevel.String(), "log level (trace, debug, info, warn, error, fatal, panic") rootCmd.PersistentFlags().String("log-level", log.InfoLevel.String(), "log level (trace, debug, info, warn, error, fatal, panic")
rootCmd.PersistentFlags().Int("workers", 0, "number of concurrent workers to use")
viper.BindPFlag("log.file", rootCmd.PersistentFlags().Lookup("log-file")) viper.BindPFlag("log.file", rootCmd.PersistentFlags().Lookup("log-file"))
viper.BindPFlag("log.level", rootCmd.PersistentFlags().Lookup("log-level")) viper.BindPFlag("log.level", rootCmd.PersistentFlags().Lookup("log-level"))
viper.BindPFlag("statediff.workers", rootCmd.PersistentFlags().Lookup("workers"))
} }
func initConfig() { func initConfig() {

View File

@ -68,7 +68,8 @@ func serve() {
// create statediff service // create statediff service
logWithCommand.Info("creating statediff service") logWithCommand.Info("creating statediff service")
statediffService, err := sd.NewStateDiffService(lvlDBReader) statediffService, err := sd.NewStateDiffService(
lvlDBReader, sd.Config{Workers: viper.GetUint("statediff.workers")})
if err != nil { if err != nil {
logWithCommand.Fatal(err) logWithCommand.Fatal(err)
} }

View File

@ -40,11 +40,11 @@ func NewPublicStateDiffAPI(sds IService) *PublicStateDiffAPI {
} }
// StateDiffAt returns a state diff payload at the specific blockheight // StateDiffAt returns a state diff payload at the specific blockheight
func (api *PublicStateDiffAPI) StateDiffAt(ctx context.Context, blockNumber uint64, params Params) (*sd.Payload, error) { func (api *PublicStateDiffAPI) StateDiffAt(ctx context.Context, blockNumber uint64, params sd.Params) (*sd.Payload, error) {
return api.sds.StateDiffAt(blockNumber, params) return api.sds.StateDiffAt(blockNumber, params)
} }
// StateTrieAt returns a state trie payload at the specific blockheight // StateTrieAt returns a state trie payload at the specific blockheight
func (api *PublicStateDiffAPI) StateTrieAt(ctx context.Context, blockNumber uint64, params Params) (*sd.Payload, error) { func (api *PublicStateDiffAPI) StateTrieAt(ctx context.Context, blockNumber uint64, params sd.Params) (*sd.Payload, error) {
return api.sds.StateTrieAt(blockNumber, params) return api.sds.StateTrieAt(blockNumber, params)
} }

View File

@ -44,12 +44,13 @@ var (
// Builder interface exposes the method for building a state diff between two blocks // Builder interface exposes the method for building a state diff between two blocks
type Builder interface { type Builder interface {
BuildStateDiffObject(args Args, params Params) (sd.StateObject, error) BuildStateDiffObject(args sd.Args, params sd.Params) (sd.StateObject, error)
BuildStateTrieObject(current *types.Block) (sd.StateObject, error) BuildStateTrieObject(current *types.Block) (sd.StateObject, error)
} }
type builder struct { type builder struct {
stateCache state.Database stateCache state.Database
numWorkers uint
} }
type iterPair struct { type iterPair struct {
@ -57,9 +58,13 @@ type iterPair struct {
} }
// NewBuilder is used to create a statediff builder // NewBuilder is used to create a statediff builder
func NewBuilder(stateCache state.Database) Builder { func NewBuilder(stateCache state.Database, workers uint) Builder {
if workers == 0 {
workers = 1
}
return &builder{ return &builder{
stateCache: stateCache, // state cache is safe for concurrent reads stateCache: stateCache, // state cache is safe for concurrent reads
numWorkers: workers,
} }
} }
@ -149,7 +154,7 @@ func (sdb *builder) buildStateTrie(it trie.NodeIterator) ([]sd.StateNode, error)
} }
// BuildStateDiff builds a statediff object from two blocks and the provided parameters // BuildStateDiff builds a statediff object from two blocks and the provided parameters
func (sdb *builder) BuildStateDiffObject(args Args, params Params) (sd.StateObject, error) { func (sdb *builder) BuildStateDiffObject(args sd.Args, params sd.Params) (sd.StateObject, error) {
if len(params.WatchedAddresses) > 0 { if len(params.WatchedAddresses) > 0 {
// if we are watching only specific accounts then we are only diffing leaf nodes // if we are watching only specific accounts then we are only diffing leaf nodes
log.Info("Ignoring intermediate state nodes because WatchedAddresses was passed") log.Info("Ignoring intermediate state nodes because WatchedAddresses was passed")
@ -165,18 +170,14 @@ func (sdb *builder) BuildStateDiffObject(args Args, params Params) (sd.StateObje
if err != nil { if err != nil {
return sd.StateObject{}, fmt.Errorf("error creating trie for new state root: %v", err) return sd.StateObject{}, fmt.Errorf("error creating trie for new state root: %v", err)
} }
nWorkers := params.Workers
if nWorkers == 0 {
nWorkers = 1
}
// Split old and new tries into corresponding subtrie iterators // Split old and new tries into corresponding subtrie iterators
oldIterFac := iter.NewSubtrieIteratorFactory(oldTrie, nWorkers) oldIterFac := iter.NewSubtrieIteratorFactory(oldTrie, sdb.numWorkers)
newIterFac := iter.NewSubtrieIteratorFactory(newTrie, nWorkers) newIterFac := iter.NewSubtrieIteratorFactory(newTrie, sdb.numWorkers)
iterChan := make(chan []iterPair, nWorkers) iterChan := make(chan []iterPair, sdb.numWorkers)
// Create iterators ahead of time to avoid race condition in state.Trie access // Create iterators ahead of time to avoid race condition in state.Trie access
for i := uint(0); i < nWorkers; i++ { for i := uint(0); i < sdb.numWorkers; i++ {
// two state iterations per diff build // two state iterations per diff build
iterChan <- []iterPair{ iterChan <- []iterPair{
iterPair{ iterPair{
@ -193,7 +194,7 @@ func (sdb *builder) BuildStateDiffObject(args Args, params Params) (sd.StateObje
nodeChan := make(chan []sd.StateNode) nodeChan := make(chan []sd.StateNode)
var wg sync.WaitGroup var wg sync.WaitGroup
for w := uint(0); w < nWorkers; w++ { for w := uint(0); w < sdb.numWorkers; w++ {
wg.Add(1) wg.Add(1)
go func(iterChan <-chan []iterPair) error { go func(iterChan <-chan []iterPair) error {
defer wg.Done() defer wg.Done()
@ -226,7 +227,7 @@ func (sdb *builder) BuildStateDiffObject(args Args, params Params) (sd.StateObje
}, nil }, nil
} }
func (sdb *builder) buildStateDiff(args []iterPair, params Params) ([]sd.StateNode, error) { func (sdb *builder) buildStateDiff(args []iterPair, params sd.Params) ([]sd.StateNode, error) {
// collect a slice of all the intermediate nodes that were touched and exist at B // collect a slice of all the intermediate nodes that were touched and exist at B
// a map of their leafkey to all the accounts that were touched and exist at B // a map of their leafkey to all the accounts that were touched and exist at B
// and a slice of all the paths for the nodes in both of the above sets // and a slice of all the paths for the nodes in both of the above sets

View File

@ -48,9 +48,15 @@ type IService interface {
// Main event loop for processing state diffs // Main event loop for processing state diffs
Loop(wg *sync.WaitGroup) Loop(wg *sync.WaitGroup)
// Method to get state diff object at specific block // Method to get state diff object at specific block
StateDiffAt(blockNumber uint64, params Params) (*sd.Payload, error) StateDiffAt(blockNumber uint64, params sd.Params) (*sd.Payload, error)
// Method to get state trie object at specific block // Method to get state trie object at specific block
StateTrieAt(blockNumber uint64, params Params) (*sd.Payload, error) StateTrieAt(blockNumber uint64, params sd.Params) (*sd.Payload, error)
}
// Server configuration
type Config struct {
// Number of goroutines to use
Workers uint
} }
// Service is the underlying struct for the state diffing service // Service is the underlying struct for the state diffing service
@ -64,10 +70,10 @@ type Service struct {
} }
// NewStateDiffService creates a new Service // NewStateDiffService creates a new Service
func NewStateDiffService(lvlDBReader lvlDBReader) (*Service, error) { func NewStateDiffService(lvlDBReader lvlDBReader, cfg Config) (*Service, error) {
return &Service{ return &Service{
lvlDBReader: lvlDBReader, lvlDBReader: lvlDBReader,
Builder: NewBuilder(lvlDBReader.StateDB()), Builder: NewBuilder(lvlDBReader.StateDB(), cfg.Workers),
QuitChan: make(chan bool), QuitChan: make(chan bool),
}, nil }, nil
} }
@ -104,7 +110,7 @@ func (sds *Service) Loop(wg *sync.WaitGroup) {
// StateDiffAt returns a state diff object payload at the specific blockheight // StateDiffAt returns a state diff object payload at the specific blockheight
// This operation cannot be performed back past the point of db pruning; it requires an archival node for historical data // This operation cannot be performed back past the point of db pruning; it requires an archival node for historical data
func (sds *Service) StateDiffAt(blockNumber uint64, params Params) (*sd.Payload, error) { func (sds *Service) StateDiffAt(blockNumber uint64, params sd.Params) (*sd.Payload, error) {
currentBlock, err := sds.lvlDBReader.GetBlockByNumber(blockNumber) currentBlock, err := sds.lvlDBReader.GetBlockByNumber(blockNumber)
if err != nil { if err != nil {
return nil, err return nil, err
@ -121,8 +127,8 @@ func (sds *Service) StateDiffAt(blockNumber uint64, params Params) (*sd.Payload,
} }
// processStateDiff method builds the state diff payload from the current block, parent state root, and provided params // processStateDiff method builds the state diff payload from the current block, parent state root, and provided params
func (sds *Service) processStateDiff(currentBlock *types.Block, parentRoot common.Hash, params Params) (*sd.Payload, error) { func (sds *Service) processStateDiff(currentBlock *types.Block, parentRoot common.Hash, params sd.Params) (*sd.Payload, error) {
stateDiff, err := sds.Builder.BuildStateDiffObject(Args{ stateDiff, err := sds.Builder.BuildStateDiffObject(sd.Args{
BlockHash: currentBlock.Hash(), BlockHash: currentBlock.Hash(),
BlockNumber: currentBlock.Number(), BlockNumber: currentBlock.Number(),
OldStateRoot: parentRoot, OldStateRoot: parentRoot,
@ -139,7 +145,7 @@ func (sds *Service) processStateDiff(currentBlock *types.Block, parentRoot commo
return sds.newPayload(stateDiffRlp, currentBlock, params) return sds.newPayload(stateDiffRlp, currentBlock, params)
} }
func (sds *Service) newPayload(stateObject []byte, block *types.Block, params Params) (*sd.Payload, error) { func (sds *Service) newPayload(stateObject []byte, block *types.Block, params sd.Params) (*sd.Payload, error) {
payload := &sd.Payload{ payload := &sd.Payload{
StateObjectRlp: stateObject, StateObjectRlp: stateObject,
} }
@ -173,7 +179,7 @@ func (sds *Service) newPayload(stateObject []byte, block *types.Block, params Pa
// StateTrieAt returns a state trie object payload at the specified blockheight // StateTrieAt returns a state trie object payload at the specified blockheight
// This operation cannot be performed back past the point of db pruning; it requires an archival node for historical data // This operation cannot be performed back past the point of db pruning; it requires an archival node for historical data
func (sds *Service) StateTrieAt(blockNumber uint64, params Params) (*sd.Payload, error) { func (sds *Service) StateTrieAt(blockNumber uint64, params sd.Params) (*sd.Payload, error) {
currentBlock, err := sds.lvlDBReader.GetBlockByNumber(blockNumber) currentBlock, err := sds.lvlDBReader.GetBlockByNumber(blockNumber)
if err != nil { if err != nil {
return nil, err return nil, err
@ -182,7 +188,7 @@ func (sds *Service) StateTrieAt(blockNumber uint64, params Params) (*sd.Payload,
return sds.processStateTrie(currentBlock, params) return sds.processStateTrie(currentBlock, params)
} }
func (sds *Service) processStateTrie(block *types.Block, params Params) (*sd.Payload, error) { func (sds *Service) processStateTrie(block *types.Block, params sd.Params) (*sd.Payload, error) {
stateNodes, err := sds.Builder.BuildStateTrieObject(block) stateNodes, err := sds.Builder.BuildStateTrieObject(block)
if err != nil { if err != nil {
return nil, err return nil, err

View File

@ -20,37 +20,10 @@
package statediff package statediff
import ( import (
"math/big"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/state" "github.com/ethereum/go-ethereum/core/state"
sd "github.com/ethereum/go-ethereum/statediff" sd "github.com/ethereum/go-ethereum/statediff"
) )
// Subscription struct holds our subscription channels
type Subscription struct {
PayloadChan chan<- sd.Payload
QuitChan chan<- bool
}
// Params is used to carry in parameters from subscribing/requesting clients configuration
type Params struct {
IntermediateStateNodes bool
IntermediateStorageNodes bool
IncludeBlock bool
IncludeReceipts bool
IncludeTD bool
WatchedAddresses []common.Address
WatchedStorageSlots []common.Hash
Workers uint
}
// Args bundles the arguments for the state diff builder
type Args struct {
OldStateRoot, NewStateRoot, BlockHash common.Hash
BlockNumber *big.Int
}
// AccountMap is a mapping of hex encoded path => account wrapper // AccountMap is a mapping of hex encoded path => account wrapper
type AccountMap map[string]accountWrapper type AccountMap map[string]accountWrapper