diff --git a/cmd/geth/config.go b/cmd/geth/config.go index eab7f699c..bb4d2f2e2 100644 --- a/cmd/geth/config.go +++ b/cmd/geth/config.go @@ -191,8 +191,17 @@ func makeFullNode(ctx *cli.Context) (*node.Node, ethapi.Backend) { } else { utils.Fatalf("Must specify client name for statediff DB output") } + } else { + if ctx.GlobalBool(utils.StateDiffWritingFlag.Name) { + utils.Fatalf("Must pass DB parameters if enabling statediff write loop") + } } - utils.RegisterStateDiffService(stack, backend, dbParams, ctx.GlobalBool(utils.StateDiffWritingFlag.Name)) + params := statediff.ServiceParams{ + DBParams: dbParams, + EnableWriteLoop: ctx.GlobalBool(utils.StateDiffWritingFlag.Name), + NumWorkers: ctx.GlobalUint(utils.StateDiffWorkersFlag.Name), + } + utils.RegisterStateDiffService(stack, backend, params) } // Configure GraphQL if requested diff --git a/cmd/geth/main.go b/cmd/geth/main.go index 0e657f636..a30720897 100644 --- a/cmd/geth/main.go +++ b/cmd/geth/main.go @@ -162,6 +162,7 @@ var ( utils.StateDiffDBNodeIDFlag, utils.StateDiffDBClientNameFlag, utils.StateDiffWritingFlag, + utils.StateDiffWorkersFlag, configFileFlag, } diff --git a/cmd/utils/flags.go b/cmd/utils/flags.go index 6863364db..57481200a 100644 --- a/cmd/utils/flags.go +++ b/cmd/utils/flags.go @@ -746,6 +746,10 @@ var ( Name: "statediff.writing", Usage: "Activates progressive writing of state diffs to database as new block are synced", } + StateDiffWorkersFlag = cli.UintFlag{ + Name: "statediff.workers", + Usage: "Number of concurrent workers to use during statediff processing (0 = 1)", + } ) // MakeDataDir retrieves the currently requested data directory, terminating @@ -1744,9 +1748,8 @@ func RegisterGraphQLService(stack *node.Node, backend ethapi.Backend, cfg node.C } // RegisterStateDiffService configures and registers a service to stream state diff data over RPC -// dbParams are: Postgres connection URI, Node ID, client name -func RegisterStateDiffService(stack *node.Node, ethServ *eth.Ethereum, dbParams *statediff.DBParams, startWriteLoop bool) { - if err := statediff.New(stack, ethServ, dbParams, startWriteLoop); err != nil { +func RegisterStateDiffService(stack *node.Node, ethServ *eth.Ethereum, params statediff.ServiceParams) { + if err := statediff.New(stack, ethServ, params); err != nil { Fatalf("Failed to register the Statediff service: %v", err) } } diff --git a/statediff/service.go b/statediff/service.go index c3f270bb4..121279f2e 100644 --- a/statediff/service.go +++ b/statediff/service.go @@ -92,6 +92,15 @@ type IService interface { WriteLoop(chainEventCh chan core.ChainEvent) } +// Wraps consructor parameters +type ServiceParams struct { + DBParams *DBParams + // Whether to enable writing state diffs directly to track blochain head + EnableWriteLoop bool + // Size of the worker pool + NumWorkers uint +} + // Service is the underlying struct for the state diffing service type Service struct { // Used to sync access to the Subscriptions @@ -107,41 +116,56 @@ type Service struct { // A mapping of subscription params rlp hash to the corresponding subscription params SubscriptionTypes map[common.Hash]Params // Cache the last block so that we can avoid having to lookup the next block's parent - lastBlock lastBlockCache + lastBlock blockCache // Whether or not we have any subscribers; only if we do, do we processes state diffs subscribers int32 // Interface for publishing statediffs as PG-IPLD objects indexer ind.Indexer // Whether to enable writing state diffs directly to track blochain head enableWriteLoop bool + // Size of the worker pool + numWorkers uint } // Wrap the cached last block for safe access from different service loops -type lastBlockCache struct { +type blockCache struct { sync.Mutex - block *types.Block + blocks map[common.Hash]*types.Block + maxSize uint +} + +func newBlockCache(max uint) blockCache { + return blockCache{ + blocks: make(map[common.Hash]*types.Block), + maxSize: max, + } } // New creates a new statediff.Service -func New(stack *node.Node, ethServ *eth.Ethereum, dbParams *DBParams, enableWriteLoop bool) error { +// func New(stack *node.Node, ethServ *eth.Ethereum, dbParams *DBParams, enableWriteLoop bool) error { +func New(stack *node.Node, ethServ *eth.Ethereum, params ServiceParams) error { blockChain := ethServ.BlockChain() var indexer ind.Indexer - if dbParams != nil { + if params.DBParams != nil { info := nodeinfo.Info{ GenesisBlock: blockChain.Genesis().Hash().Hex(), NetworkID: strconv.FormatUint(ethServ.NetVersion(), 10), ChainID: blockChain.Config().ChainID.Uint64(), - ID: dbParams.ID, - ClientName: dbParams.ClientName, + ID: params.DBParams.ID, + ClientName: params.DBParams.ClientName, } // TODO: pass max idle, open, lifetime? - db, err := postgres.NewDB(dbParams.ConnectionURL, postgres.ConnectionConfig{}, info) + db, err := postgres.NewDB(params.DBParams.ConnectionURL, postgres.ConnectionConfig{}, info) if err != nil { return err } indexer = ind.NewStateDiffIndexer(blockChain.Config(), db) } + workers := params.NumWorkers + if workers == 0 { + workers = 1 + } sds := &Service{ Mutex: sync.Mutex{}, BlockChain: blockChain, @@ -149,8 +173,10 @@ func New(stack *node.Node, ethServ *eth.Ethereum, dbParams *DBParams, enableWrit QuitChan: make(chan bool), Subscriptions: make(map[common.Hash]map[rpc.ID]Subscription), SubscriptionTypes: make(map[common.Hash]Params), + lastBlock: newBlockCache(workers), indexer: indexer, - enableWriteLoop: enableWriteLoop, + enableWriteLoop: params.EnableWriteLoop, + numWorkers: workers, } stack.RegisterLifecycle(sds) stack.RegisterAPIs(sds.APIs()) @@ -174,16 +200,20 @@ func (sds *Service) APIs() []rpc.API { } } -func (lbc *lastBlockCache) replace(currentBlock *types.Block, bc blockChain) *types.Block { +// Return the parent block of currentBlock, using the cached block if available +func (lbc *blockCache) replace(currentBlock *types.Block, bc blockChain) *types.Block { lbc.Lock() parentHash := currentBlock.ParentHash() var parentBlock *types.Block - if lbc.block != nil && bytes.Equal(lbc.block.Hash().Bytes(), parentHash.Bytes()) { - parentBlock = lbc.block + if block, ok := lbc.blocks[parentHash]; ok { + parentBlock = block + if len(lbc.blocks) > int(lbc.maxSize) { + delete(lbc.blocks, parentHash) + } } else { parentBlock = bc.GetBlockByHash(parentHash) } - lbc.block = currentBlock + lbc.blocks[currentBlock.Hash()] = currentBlock lbc.Unlock() return parentBlock } @@ -417,13 +447,18 @@ func (sds *Service) Unsubscribe(id rpc.ID) error { func (sds *Service) Start() error { log.Info("Starting statediff service") - chainEventCh := make(chan core.ChainEvent, chainEventChanSize) - go sds.Loop(chainEventCh) + { + // TODO: also use worker pool here? + chainEventCh := make(chan core.ChainEvent, chainEventChanSize) + go sds.Loop(chainEventCh) + } if sds.enableWriteLoop { log.Info("Starting statediff DB write loop", "params", writeLoopParams) - go sds.WriteLoop(make(chan core.ChainEvent, chainEventChanSize)) - go sds.indexer.ReportDBMetrics(5*time.Second, sds.QuitChan) + chainEventCh := make(chan core.ChainEvent, chainEventChanSize) + for worker := uint(0); worker < sds.numWorkers; worker++ { + go sds.WriteLoop(chainEventCh) + } } return nil