diff --git a/cmd/geth/config.go b/cmd/geth/config.go index eab7f699c..bb4d2f2e2 100644 --- a/cmd/geth/config.go +++ b/cmd/geth/config.go @@ -191,8 +191,17 @@ func makeFullNode(ctx *cli.Context) (*node.Node, ethapi.Backend) { } else { utils.Fatalf("Must specify client name for statediff DB output") } + } else { + if ctx.GlobalBool(utils.StateDiffWritingFlag.Name) { + utils.Fatalf("Must pass DB parameters if enabling statediff write loop") + } } - utils.RegisterStateDiffService(stack, backend, dbParams, ctx.GlobalBool(utils.StateDiffWritingFlag.Name)) + params := statediff.ServiceParams{ + DBParams: dbParams, + EnableWriteLoop: ctx.GlobalBool(utils.StateDiffWritingFlag.Name), + NumWorkers: ctx.GlobalUint(utils.StateDiffWorkersFlag.Name), + } + utils.RegisterStateDiffService(stack, backend, params) } // Configure GraphQL if requested diff --git a/cmd/geth/main.go b/cmd/geth/main.go index 0e657f636..a30720897 100644 --- a/cmd/geth/main.go +++ b/cmd/geth/main.go @@ -162,6 +162,7 @@ var ( utils.StateDiffDBNodeIDFlag, utils.StateDiffDBClientNameFlag, utils.StateDiffWritingFlag, + utils.StateDiffWorkersFlag, configFileFlag, } diff --git a/cmd/geth/usage.go b/cmd/geth/usage.go index e3b52daa1..3acd40145 100644 --- a/cmd/geth/usage.go +++ b/cmd/geth/usage.go @@ -242,6 +242,7 @@ var AppHelpFlagGroups = []flags.FlagGroup{ utils.StateDiffDBNodeIDFlag, utils.StateDiffDBClientNameFlag, utils.StateDiffWritingFlag, + utils.StateDiffWorkersFlag, }, }, { diff --git a/cmd/utils/flags.go b/cmd/utils/flags.go index 6863364db..57481200a 100644 --- a/cmd/utils/flags.go +++ b/cmd/utils/flags.go @@ -746,6 +746,10 @@ var ( Name: "statediff.writing", Usage: "Activates progressive writing of state diffs to database as new block are synced", } + StateDiffWorkersFlag = cli.UintFlag{ + Name: "statediff.workers", + Usage: "Number of concurrent workers to use during statediff processing (0 = 1)", + } ) // MakeDataDir retrieves the currently requested data directory, terminating @@ -1744,9 +1748,8 @@ func RegisterGraphQLService(stack *node.Node, backend ethapi.Backend, cfg node.C } // RegisterStateDiffService configures and registers a service to stream state diff data over RPC -// dbParams are: Postgres connection URI, Node ID, client name -func RegisterStateDiffService(stack *node.Node, ethServ *eth.Ethereum, dbParams *statediff.DBParams, startWriteLoop bool) { - if err := statediff.New(stack, ethServ, dbParams, startWriteLoop); err != nil { +func RegisterStateDiffService(stack *node.Node, ethServ *eth.Ethereum, params statediff.ServiceParams) { + if err := statediff.New(stack, ethServ, params); err != nil { Fatalf("Failed to register the Statediff service: %v", err) } } diff --git a/statediff/indexer/metrics.go b/statediff/indexer/metrics.go index 5ee3426a3..fc0727eda 100644 --- a/statediff/indexer/metrics.go +++ b/statediff/indexer/metrics.go @@ -8,7 +8,7 @@ import ( ) const ( - indexerNamespace = "indexer" + namespace = "statediff" ) // Build a fully qualified metric name @@ -16,9 +16,9 @@ func metricName(subsystem, name string) string { if name == "" { return "" } - parts := []string{indexerNamespace, name} + parts := []string{namespace, name} if subsystem != "" { - parts = []string{indexerNamespace, subsystem, name} + parts = []string{namespace, subsystem, name} } // Prometheus uses _ but geth metrics uses / and replaces return strings.Join(parts, "/") @@ -57,7 +57,7 @@ func RegisterIndexerMetrics(reg metrics.Registry) indexerMetricsHandles { tTxAndRecProcessing: metrics.NewTimer(), tStateStoreCodeProcessing: metrics.NewTimer(), } - subsys := "" // todo + subsys := "indexer" reg.Register(metricName(subsys, "blocks"), ctx.blocks) reg.Register(metricName(subsys, "transactions"), ctx.transactions) reg.Register(metricName(subsys, "receipts"), ctx.receipts) diff --git a/statediff/metrics.go b/statediff/metrics.go new file mode 100644 index 000000000..7e7d6e328 --- /dev/null +++ b/statediff/metrics.go @@ -0,0 +1,54 @@ +package statediff + +import ( + "strings" + + "github.com/ethereum/go-ethereum/metrics" +) + +const ( + namespace = "statediff" +) + +// Build a fully qualified metric name +func metricName(subsystem, name string) string { + if name == "" { + return "" + } + parts := []string{namespace, name} + if subsystem != "" { + parts = []string{namespace, subsystem, name} + } + // Prometheus uses _ but geth metrics uses / and replaces + return strings.Join(parts, "/") +} + +type statediffMetricsHandles struct { + // Height of latest synced by core.BlockChain + // FIXME + lastSyncHeight metrics.Gauge + // Height of the latest block received from chainEvent channel + lastEventHeight metrics.Gauge + // Height of latest state diff + lastStatediffHeight metrics.Gauge + // Current length of chainEvent channels + serviceLoopChannelLen metrics.Gauge + writeLoopChannelLen metrics.Gauge +} + +func RegisterStatediffMetrics(reg metrics.Registry) statediffMetricsHandles { + ctx := statediffMetricsHandles{ + lastSyncHeight: metrics.NewGauge(), + lastEventHeight: metrics.NewGauge(), + lastStatediffHeight: metrics.NewGauge(), + serviceLoopChannelLen: metrics.NewGauge(), + writeLoopChannelLen: metrics.NewGauge(), + } + subsys := "service" + reg.Register(metricName(subsys, "last_sync_height"), ctx.lastSyncHeight) + reg.Register(metricName(subsys, "last_event_height"), ctx.lastEventHeight) + reg.Register(metricName(subsys, "last_statediff_height"), ctx.lastStatediffHeight) + reg.Register(metricName(subsys, "service_loop_channel_len"), ctx.serviceLoopChannelLen) + reg.Register(metricName(subsys, "write_loop_channel_len"), ctx.writeLoopChannelLen) + return ctx +} diff --git a/statediff/service.go b/statediff/service.go index cee137bf5..2f16fc4ab 100644 --- a/statediff/service.go +++ b/statediff/service.go @@ -22,7 +22,6 @@ import ( "strconv" "sync" "sync/atomic" - "time" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core" @@ -32,6 +31,7 @@ import ( "github.com/ethereum/go-ethereum/eth" "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/metrics" "github.com/ethereum/go-ethereum/node" "github.com/ethereum/go-ethereum/p2p" "github.com/ethereum/go-ethereum/rlp" @@ -55,6 +55,8 @@ var writeLoopParams = Params{ IncludeCode: true, } +var statediffMetrics = RegisterStatediffMetrics(metrics.DefaultRegistry) + type blockChain interface { SubscribeChainEvent(ch chan<- core.ChainEvent) event.Subscription GetBlockByHash(hash common.Hash) *types.Block @@ -89,6 +91,15 @@ type IService interface { WriteLoop(chainEventCh chan core.ChainEvent) } +// Wraps consructor parameters +type ServiceParams struct { + DBParams *DBParams + // Whether to enable writing state diffs directly to track blochain head + EnableWriteLoop bool + // Size of the worker pool + NumWorkers uint +} + // Service is the underlying struct for the state diffing service type Service struct { // Used to sync access to the Subscriptions @@ -104,41 +115,56 @@ type Service struct { // A mapping of subscription params rlp hash to the corresponding subscription params SubscriptionTypes map[common.Hash]Params // Cache the last block so that we can avoid having to lookup the next block's parent - lastBlock lastBlockCache + BlockCache blockCache // Whether or not we have any subscribers; only if we do, do we processes state diffs subscribers int32 // Interface for publishing statediffs as PG-IPLD objects indexer ind.Indexer // Whether to enable writing state diffs directly to track blochain head enableWriteLoop bool + // Size of the worker pool + numWorkers uint } // Wrap the cached last block for safe access from different service loops -type lastBlockCache struct { +type blockCache struct { sync.Mutex - block *types.Block + blocks map[common.Hash]*types.Block + maxSize uint +} + +func NewBlockCache(max uint) blockCache { + return blockCache{ + blocks: make(map[common.Hash]*types.Block), + maxSize: max, + } } // New creates a new statediff.Service -func New(stack *node.Node, ethServ *eth.Ethereum, dbParams *DBParams, enableWriteLoop bool) error { +// func New(stack *node.Node, ethServ *eth.Ethereum, dbParams *DBParams, enableWriteLoop bool) error { +func New(stack *node.Node, ethServ *eth.Ethereum, params ServiceParams) error { blockChain := ethServ.BlockChain() var indexer ind.Indexer - if dbParams != nil { + if params.DBParams != nil { info := nodeinfo.Info{ GenesisBlock: blockChain.Genesis().Hash().Hex(), NetworkID: strconv.FormatUint(ethServ.NetVersion(), 10), ChainID: blockChain.Config().ChainID.Uint64(), - ID: dbParams.ID, - ClientName: dbParams.ClientName, + ID: params.DBParams.ID, + ClientName: params.DBParams.ClientName, } // TODO: pass max idle, open, lifetime? - db, err := postgres.NewDB(dbParams.ConnectionURL, postgres.ConnectionConfig{}, info) + db, err := postgres.NewDB(params.DBParams.ConnectionURL, postgres.ConnectionConfig{}, info) if err != nil { return err } indexer = ind.NewStateDiffIndexer(blockChain.Config(), db) } + workers := params.NumWorkers + if workers == 0 { + workers = 1 + } sds := &Service{ Mutex: sync.Mutex{}, BlockChain: blockChain, @@ -146,8 +172,10 @@ func New(stack *node.Node, ethServ *eth.Ethereum, dbParams *DBParams, enableWrit QuitChan: make(chan bool), Subscriptions: make(map[common.Hash]map[rpc.ID]Subscription), SubscriptionTypes: make(map[common.Hash]Params), + BlockCache: NewBlockCache(workers), indexer: indexer, - enableWriteLoop: enableWriteLoop, + enableWriteLoop: params.EnableWriteLoop, + numWorkers: workers, } stack.RegisterLifecycle(sds) stack.RegisterAPIs(sds.APIs()) @@ -171,46 +199,88 @@ func (sds *Service) APIs() []rpc.API { } } -func (lbc *lastBlockCache) replace(currentBlock *types.Block, bc blockChain) *types.Block { +// Return the parent block of currentBlock, using the cached block if available; +// and cache the passed block +func (lbc *blockCache) getParentBlock(currentBlock *types.Block, bc blockChain) *types.Block { lbc.Lock() parentHash := currentBlock.ParentHash() var parentBlock *types.Block - if lbc.block != nil && bytes.Equal(lbc.block.Hash().Bytes(), parentHash.Bytes()) { - parentBlock = lbc.block + if block, ok := lbc.blocks[parentHash]; ok { + parentBlock = block + if len(lbc.blocks) > int(lbc.maxSize) { + delete(lbc.blocks, parentHash) + } } else { parentBlock = bc.GetBlockByHash(parentHash) } - lbc.block = currentBlock + lbc.blocks[currentBlock.Hash()] = currentBlock lbc.Unlock() return parentBlock } +type workerParams struct { + chainEventCh <-chan core.ChainEvent + errCh <-chan error + wg *sync.WaitGroup + id uint +} + func (sds *Service) WriteLoop(chainEventCh chan core.ChainEvent) { chainEventSub := sds.BlockChain.SubscribeChainEvent(chainEventCh) defer chainEventSub.Unsubscribe() errCh := chainEventSub.Err() + var wg sync.WaitGroup + // Process metrics for chain events, then forward to workers + chainEventFwd := make(chan core.ChainEvent, chainEventChanSize) + wg.Add(1) + go func() { + defer wg.Done() + for { + select { + case chainEvent := <-chainEventCh: + statediffMetrics.lastEventHeight.Update(int64(chainEvent.Block.Number().Uint64())) + statediffMetrics.writeLoopChannelLen.Update(int64(len(chainEventCh))) + chainEventFwd <- chainEvent + case <-sds.QuitChan: + return + } + } + }() + wg.Add(int(sds.numWorkers)) + for worker := uint(0); worker < sds.numWorkers; worker++ { + params := workerParams{chainEventCh: chainEventFwd, errCh: errCh, wg: &wg, id: worker} + go sds.writeLoopWorker(params) + } + wg.Wait() +} + +func (sds *Service) writeLoopWorker(params workerParams) { + defer params.wg.Done() for { select { //Notify chain event channel of events - case chainEvent := <-chainEventCh: - log.Debug("(WriteLoop) Event received from chainEventCh", "event", chainEvent) + case chainEvent := <-params.chainEventCh: + log.Debug("WriteLoop(): chain event received", "event", chainEvent) currentBlock := chainEvent.Block - parentBlock := sds.lastBlock.replace(currentBlock, sds.BlockChain) + parentBlock := sds.BlockCache.getParentBlock(currentBlock, sds.BlockChain) if parentBlock == nil { log.Error("Parent block is nil, skipping this block", "block height", currentBlock.Number()) continue } + log.Info("Writing state diff", "block height", currentBlock.Number().Uint64(), "worker", params.id) err := sds.writeStateDiff(currentBlock, parentBlock.Root(), writeLoopParams) if err != nil { - log.Error("statediff (DB write) processing error", "block height", currentBlock.Number().Uint64(), "error", err.Error()) + log.Error("statediff.Service.WriteLoop: processing error", "block height", currentBlock.Number().Uint64(), "error", err.Error(), "worker", params.id) continue } - case err := <-errCh: - log.Warn("Error from chain event subscription", "error", err) + // TODO: how to handle with concurrent workers + statediffMetrics.lastStatediffHeight.Update(int64(currentBlock.Number().Uint64())) + case err := <-params.errCh: + log.Warn("Error from chain event subscription", "error", err, "worker", params.id) sds.close() return case <-sds.QuitChan: - log.Info("Quitting the statediff writing process") + log.Info("Quitting the statediff writing process", "worker", params.id) sds.close() return } @@ -226,16 +296,17 @@ func (sds *Service) Loop(chainEventCh chan core.ChainEvent) { select { //Notify chain event channel of events case chainEvent := <-chainEventCh: - log.Debug("Event received from chainEventCh", "event", chainEvent) + statediffMetrics.serviceLoopChannelLen.Update(int64(len(chainEventCh))) + log.Debug("Loop(): chain event received", "event", chainEvent) // if we don't have any subscribers, do not process a statediff if atomic.LoadInt32(&sds.subscribers) == 0 { log.Debug("Currently no subscribers to the statediffing service; processing is halted") continue } currentBlock := chainEvent.Block - parentBlock := sds.lastBlock.replace(currentBlock, sds.BlockChain) + parentBlock := sds.BlockCache.getParentBlock(currentBlock, sds.BlockChain) if parentBlock == nil { - log.Error("Parent block is nil, skipping this block", "number", currentBlock.Number()) + log.Error("Parent block is nil, skipping this block", "block height", currentBlock.Number()) continue } sds.streamStateDiff(currentBlock, parentBlock.Root()) @@ -414,8 +485,8 @@ func (sds *Service) Start() error { if sds.enableWriteLoop { log.Info("Starting statediff DB write loop", "params", writeLoopParams) - go sds.WriteLoop(make(chan core.ChainEvent, chainEventChanSize)) - go sds.indexer.ReportDBMetrics(5*time.Second, sds.QuitChan) + chainEventCh := make(chan core.ChainEvent, chainEventChanSize) + go sds.WriteLoop(chainEventCh) } return nil @@ -473,7 +544,7 @@ func (sds *Service) StreamCodeAndCodeHash(blockNumber uint64, outChan chan<- Cod log.Info("sending code and codehash", "block height", blockNumber) currentTrie, err := sds.BlockChain.StateCache().OpenTrie(current.Root()) if err != nil { - log.Error("error creating trie for block", "number", current.Number(), "err", err) + log.Error("error creating trie for block", "block height", current.Number(), "err", err) close(quitChan) return } @@ -521,7 +592,7 @@ func (sds *Service) WriteStateDiffAt(blockNumber uint64, params Params) error { // Writes a state diff from the current block, parent state root, and provided params func (sds *Service) writeStateDiff(block *types.Block, parentRoot common.Hash, params Params) error { - log.Info("Writing state diff", "block height", block.Number().Uint64()) + // log.Info("Writing state diff", "block height", block.Number().Uint64()) var totalDifficulty *big.Int var receipts types.Receipts if params.IncludeTD { diff --git a/statediff/service_test.go b/statediff/service_test.go index ef3c1bb2c..ca9a483a5 100644 --- a/statediff/service_test.go +++ b/statediff/service_test.go @@ -94,6 +94,7 @@ func testErrorInChainEventLoop(t *testing.T) { QuitChan: serviceQuit, Subscriptions: make(map[common.Hash]map[rpc.ID]statediff.Subscription), SubscriptionTypes: make(map[common.Hash]statediff.Params), + BlockCache: statediff.NewBlockCache(1), } payloadChan := make(chan statediff.Payload, 2) quitChan := make(chan bool) @@ -177,6 +178,7 @@ func testErrorInBlockLoop(t *testing.T) { QuitChan: make(chan bool), Subscriptions: make(map[common.Hash]map[rpc.ID]statediff.Subscription), SubscriptionTypes: make(map[common.Hash]statediff.Params), + BlockCache: statediff.NewBlockCache(1), } payloadChan := make(chan statediff.Payload) quitChan := make(chan bool) @@ -256,6 +258,7 @@ func testErrorInStateDiffAt(t *testing.T) { QuitChan: make(chan bool), Subscriptions: make(map[common.Hash]map[rpc.ID]statediff.Subscription), SubscriptionTypes: make(map[common.Hash]statediff.Params), + BlockCache: statediff.NewBlockCache(1), } stateDiffPayload, err := service.StateDiffAt(testBlock1.NumberU64(), defaultParams) if err != nil {