statediff: Use a worker pool #43

Merged
telackey merged 10 commits from 41-statediff-workerpool into v1.9.24-statediff 2020-11-25 10:29:32 +00:00
8 changed files with 178 additions and 36 deletions

View File

@ -191,8 +191,17 @@ func makeFullNode(ctx *cli.Context) (*node.Node, ethapi.Backend) {
} else { } else {
utils.Fatalf("Must specify client name for statediff DB output") utils.Fatalf("Must specify client name for statediff DB output")
} }
} else {
if ctx.GlobalBool(utils.StateDiffWritingFlag.Name) {
utils.Fatalf("Must pass DB parameters if enabling statediff write loop")
} }
utils.RegisterStateDiffService(stack, backend, dbParams, ctx.GlobalBool(utils.StateDiffWritingFlag.Name)) }
params := statediff.ServiceParams{
DBParams: dbParams,
EnableWriteLoop: ctx.GlobalBool(utils.StateDiffWritingFlag.Name),
NumWorkers: ctx.GlobalUint(utils.StateDiffWorkersFlag.Name),
}
utils.RegisterStateDiffService(stack, backend, params)
} }
// Configure GraphQL if requested // Configure GraphQL if requested

View File

@ -162,6 +162,7 @@ var (
utils.StateDiffDBNodeIDFlag, utils.StateDiffDBNodeIDFlag,
utils.StateDiffDBClientNameFlag, utils.StateDiffDBClientNameFlag,
utils.StateDiffWritingFlag, utils.StateDiffWritingFlag,
utils.StateDiffWorkersFlag,
configFileFlag, configFileFlag,
} }

View File

@ -242,6 +242,7 @@ var AppHelpFlagGroups = []flags.FlagGroup{
utils.StateDiffDBNodeIDFlag, utils.StateDiffDBNodeIDFlag,
utils.StateDiffDBClientNameFlag, utils.StateDiffDBClientNameFlag,
utils.StateDiffWritingFlag, utils.StateDiffWritingFlag,
utils.StateDiffWorkersFlag,
}, },
}, },
{ {

View File

@ -746,6 +746,10 @@ var (
Name: "statediff.writing", Name: "statediff.writing",
Usage: "Activates progressive writing of state diffs to database as new block are synced", Usage: "Activates progressive writing of state diffs to database as new block are synced",
} }
StateDiffWorkersFlag = cli.UintFlag{
Name: "statediff.workers",
Usage: "Number of concurrent workers to use during statediff processing (0 = 1)",
}
) )
// MakeDataDir retrieves the currently requested data directory, terminating // MakeDataDir retrieves the currently requested data directory, terminating
@ -1744,9 +1748,8 @@ func RegisterGraphQLService(stack *node.Node, backend ethapi.Backend, cfg node.C
} }
// RegisterStateDiffService configures and registers a service to stream state diff data over RPC // RegisterStateDiffService configures and registers a service to stream state diff data over RPC
// dbParams are: Postgres connection URI, Node ID, client name func RegisterStateDiffService(stack *node.Node, ethServ *eth.Ethereum, params statediff.ServiceParams) {
func RegisterStateDiffService(stack *node.Node, ethServ *eth.Ethereum, dbParams *statediff.DBParams, startWriteLoop bool) { if err := statediff.New(stack, ethServ, params); err != nil {
if err := statediff.New(stack, ethServ, dbParams, startWriteLoop); err != nil {
Fatalf("Failed to register the Statediff service: %v", err) Fatalf("Failed to register the Statediff service: %v", err)
} }
} }

View File

@ -8,7 +8,7 @@ import (
) )
const ( const (
indexerNamespace = "indexer" namespace = "statediff"
) )
// Build a fully qualified metric name // Build a fully qualified metric name
@ -16,9 +16,9 @@ func metricName(subsystem, name string) string {
if name == "" { if name == "" {
return "" return ""
} }
parts := []string{indexerNamespace, name} parts := []string{namespace, name}
if subsystem != "" { if subsystem != "" {
parts = []string{indexerNamespace, subsystem, name} parts = []string{namespace, subsystem, name}
} }
// Prometheus uses _ but geth metrics uses / and replaces // Prometheus uses _ but geth metrics uses / and replaces
return strings.Join(parts, "/") return strings.Join(parts, "/")
@ -57,7 +57,7 @@ func RegisterIndexerMetrics(reg metrics.Registry) indexerMetricsHandles {
tTxAndRecProcessing: metrics.NewTimer(), tTxAndRecProcessing: metrics.NewTimer(),
tStateStoreCodeProcessing: metrics.NewTimer(), tStateStoreCodeProcessing: metrics.NewTimer(),
} }
subsys := "" // todo subsys := "indexer"
reg.Register(metricName(subsys, "blocks"), ctx.blocks) reg.Register(metricName(subsys, "blocks"), ctx.blocks)
reg.Register(metricName(subsys, "transactions"), ctx.transactions) reg.Register(metricName(subsys, "transactions"), ctx.transactions)
reg.Register(metricName(subsys, "receipts"), ctx.receipts) reg.Register(metricName(subsys, "receipts"), ctx.receipts)

54
statediff/metrics.go Normal file
View File

@ -0,0 +1,54 @@
package statediff
import (
"strings"
"github.com/ethereum/go-ethereum/metrics"
)
const (
namespace = "statediff"
)
// Build a fully qualified metric name
func metricName(subsystem, name string) string {
if name == "" {
return ""
}
parts := []string{namespace, name}
if subsystem != "" {
parts = []string{namespace, subsystem, name}
}
// Prometheus uses _ but geth metrics uses / and replaces
return strings.Join(parts, "/")
}
type statediffMetricsHandles struct {
// Height of latest synced by core.BlockChain
// FIXME
lastSyncHeight metrics.Gauge
// Height of the latest block received from chainEvent channel
lastEventHeight metrics.Gauge
// Height of latest state diff
lastStatediffHeight metrics.Gauge
// Current length of chainEvent channels
serviceLoopChannelLen metrics.Gauge
writeLoopChannelLen metrics.Gauge
}
func RegisterStatediffMetrics(reg metrics.Registry) statediffMetricsHandles {
ctx := statediffMetricsHandles{
lastSyncHeight: metrics.NewGauge(),
lastEventHeight: metrics.NewGauge(),
lastStatediffHeight: metrics.NewGauge(),
serviceLoopChannelLen: metrics.NewGauge(),
writeLoopChannelLen: metrics.NewGauge(),
}
subsys := "service"
reg.Register(metricName(subsys, "last_sync_height"), ctx.lastSyncHeight)
reg.Register(metricName(subsys, "last_event_height"), ctx.lastEventHeight)
reg.Register(metricName(subsys, "last_statediff_height"), ctx.lastStatediffHeight)
reg.Register(metricName(subsys, "service_loop_channel_len"), ctx.serviceLoopChannelLen)
reg.Register(metricName(subsys, "write_loop_channel_len"), ctx.writeLoopChannelLen)
return ctx
}

View File

@ -22,7 +22,6 @@ import (
"strconv" "strconv"
"sync" "sync"
"sync/atomic" "sync/atomic"
"time"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core"
@ -32,6 +31,7 @@ import (
"github.com/ethereum/go-ethereum/eth" "github.com/ethereum/go-ethereum/eth"
"github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/metrics"
"github.com/ethereum/go-ethereum/node" "github.com/ethereum/go-ethereum/node"
"github.com/ethereum/go-ethereum/p2p" "github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/rlp" "github.com/ethereum/go-ethereum/rlp"
@ -55,6 +55,8 @@ var writeLoopParams = Params{
IncludeCode: true, IncludeCode: true,
} }
var statediffMetrics = RegisterStatediffMetrics(metrics.DefaultRegistry)
type blockChain interface { type blockChain interface {
SubscribeChainEvent(ch chan<- core.ChainEvent) event.Subscription SubscribeChainEvent(ch chan<- core.ChainEvent) event.Subscription
GetBlockByHash(hash common.Hash) *types.Block GetBlockByHash(hash common.Hash) *types.Block
@ -89,6 +91,15 @@ type IService interface {
WriteLoop(chainEventCh chan core.ChainEvent) WriteLoop(chainEventCh chan core.ChainEvent)
} }
// Wraps consructor parameters
type ServiceParams struct {
DBParams *DBParams
// Whether to enable writing state diffs directly to track blochain head
EnableWriteLoop bool
// Size of the worker pool
NumWorkers uint
}
// Service is the underlying struct for the state diffing service // Service is the underlying struct for the state diffing service
type Service struct { type Service struct {
// Used to sync access to the Subscriptions // Used to sync access to the Subscriptions
@ -104,41 +115,56 @@ type Service struct {
// A mapping of subscription params rlp hash to the corresponding subscription params // A mapping of subscription params rlp hash to the corresponding subscription params
SubscriptionTypes map[common.Hash]Params SubscriptionTypes map[common.Hash]Params
// Cache the last block so that we can avoid having to lookup the next block's parent // Cache the last block so that we can avoid having to lookup the next block's parent
lastBlock lastBlockCache BlockCache blockCache
Review

I like the params getting passed together like this!

I like the params getting passed together like this!
// Whether or not we have any subscribers; only if we do, do we processes state diffs // Whether or not we have any subscribers; only if we do, do we processes state diffs
subscribers int32 subscribers int32
// Interface for publishing statediffs as PG-IPLD objects // Interface for publishing statediffs as PG-IPLD objects
indexer ind.Indexer indexer ind.Indexer
// Whether to enable writing state diffs directly to track blochain head // Whether to enable writing state diffs directly to track blochain head
enableWriteLoop bool enableWriteLoop bool
// Size of the worker pool
numWorkers uint
} }
// Wrap the cached last block for safe access from different service loops // Wrap the cached last block for safe access from different service loops
type lastBlockCache struct { type blockCache struct {
sync.Mutex sync.Mutex
block *types.Block blocks map[common.Hash]*types.Block
maxSize uint
}
func NewBlockCache(max uint) blockCache {
return blockCache{
blocks: make(map[common.Hash]*types.Block),
maxSize: max,
}
} }
// New creates a new statediff.Service // New creates a new statediff.Service
func New(stack *node.Node, ethServ *eth.Ethereum, dbParams *DBParams, enableWriteLoop bool) error { // func New(stack *node.Node, ethServ *eth.Ethereum, dbParams *DBParams, enableWriteLoop bool) error {
func New(stack *node.Node, ethServ *eth.Ethereum, params ServiceParams) error {
blockChain := ethServ.BlockChain() blockChain := ethServ.BlockChain()
var indexer ind.Indexer var indexer ind.Indexer
if dbParams != nil { if params.DBParams != nil {
info := nodeinfo.Info{ info := nodeinfo.Info{
GenesisBlock: blockChain.Genesis().Hash().Hex(), GenesisBlock: blockChain.Genesis().Hash().Hex(),
NetworkID: strconv.FormatUint(ethServ.NetVersion(), 10), NetworkID: strconv.FormatUint(ethServ.NetVersion(), 10),
ChainID: blockChain.Config().ChainID.Uint64(), ChainID: blockChain.Config().ChainID.Uint64(),
ID: dbParams.ID, ID: params.DBParams.ID,
ClientName: dbParams.ClientName, ClientName: params.DBParams.ClientName,
} }
// TODO: pass max idle, open, lifetime? // TODO: pass max idle, open, lifetime?
db, err := postgres.NewDB(dbParams.ConnectionURL, postgres.ConnectionConfig{}, info) db, err := postgres.NewDB(params.DBParams.ConnectionURL, postgres.ConnectionConfig{}, info)
if err != nil { if err != nil {
return err return err
} }
indexer = ind.NewStateDiffIndexer(blockChain.Config(), db) indexer = ind.NewStateDiffIndexer(blockChain.Config(), db)
} }
workers := params.NumWorkers
if workers == 0 {
workers = 1
}
sds := &Service{ sds := &Service{
Mutex: sync.Mutex{}, Mutex: sync.Mutex{},
BlockChain: blockChain, BlockChain: blockChain,
@ -146,8 +172,10 @@ func New(stack *node.Node, ethServ *eth.Ethereum, dbParams *DBParams, enableWrit
QuitChan: make(chan bool), QuitChan: make(chan bool),
Subscriptions: make(map[common.Hash]map[rpc.ID]Subscription), Subscriptions: make(map[common.Hash]map[rpc.ID]Subscription),
SubscriptionTypes: make(map[common.Hash]Params), SubscriptionTypes: make(map[common.Hash]Params),
BlockCache: NewBlockCache(workers),
indexer: indexer, indexer: indexer,
enableWriteLoop: enableWriteLoop, enableWriteLoop: params.EnableWriteLoop,
numWorkers: workers,
} }
stack.RegisterLifecycle(sds) stack.RegisterLifecycle(sds)
stack.RegisterAPIs(sds.APIs()) stack.RegisterAPIs(sds.APIs())
@ -171,46 +199,88 @@ func (sds *Service) APIs() []rpc.API {
} }
} }
func (lbc *lastBlockCache) replace(currentBlock *types.Block, bc blockChain) *types.Block { // Return the parent block of currentBlock, using the cached block if available;
// and cache the passed block
func (lbc *blockCache) getParentBlock(currentBlock *types.Block, bc blockChain) *types.Block {
lbc.Lock() lbc.Lock()
parentHash := currentBlock.ParentHash() parentHash := currentBlock.ParentHash()
var parentBlock *types.Block var parentBlock *types.Block
if lbc.block != nil && bytes.Equal(lbc.block.Hash().Bytes(), parentHash.Bytes()) { if block, ok := lbc.blocks[parentHash]; ok {
parentBlock = lbc.block parentBlock = block
if len(lbc.blocks) > int(lbc.maxSize) {
delete(lbc.blocks, parentHash)
}
} else { } else {
parentBlock = bc.GetBlockByHash(parentHash) parentBlock = bc.GetBlockByHash(parentHash)
} }
lbc.block = currentBlock lbc.blocks[currentBlock.Hash()] = currentBlock
lbc.Unlock() lbc.Unlock()
return parentBlock return parentBlock
} }
type workerParams struct {
chainEventCh <-chan core.ChainEvent
errCh <-chan error
wg *sync.WaitGroup
id uint
}
func (sds *Service) WriteLoop(chainEventCh chan core.ChainEvent) { func (sds *Service) WriteLoop(chainEventCh chan core.ChainEvent) {
chainEventSub := sds.BlockChain.SubscribeChainEvent(chainEventCh) chainEventSub := sds.BlockChain.SubscribeChainEvent(chainEventCh)
defer chainEventSub.Unsubscribe() defer chainEventSub.Unsubscribe()
errCh := chainEventSub.Err() errCh := chainEventSub.Err()
var wg sync.WaitGroup
// Process metrics for chain events, then forward to workers
chainEventFwd := make(chan core.ChainEvent, chainEventChanSize)
wg.Add(1)
go func() {
defer wg.Done()
for {
select {
case chainEvent := <-chainEventCh:
statediffMetrics.lastEventHeight.Update(int64(chainEvent.Block.Number().Uint64()))
statediffMetrics.writeLoopChannelLen.Update(int64(len(chainEventCh)))
chainEventFwd <- chainEvent
case <-sds.QuitChan:
return
}
}
}()
wg.Add(int(sds.numWorkers))
for worker := uint(0); worker < sds.numWorkers; worker++ {
params := workerParams{chainEventCh: chainEventFwd, errCh: errCh, wg: &wg, id: worker}
go sds.writeLoopWorker(params)
}
wg.Wait()
}
func (sds *Service) writeLoopWorker(params workerParams) {
defer params.wg.Done()
for { for {
select { select {
//Notify chain event channel of events //Notify chain event channel of events
case chainEvent := <-chainEventCh: case chainEvent := <-params.chainEventCh:
log.Debug("(WriteLoop) Event received from chainEventCh", "event", chainEvent) log.Debug("WriteLoop(): chain event received", "event", chainEvent)
currentBlock := chainEvent.Block currentBlock := chainEvent.Block
parentBlock := sds.lastBlock.replace(currentBlock, sds.BlockChain) parentBlock := sds.BlockCache.getParentBlock(currentBlock, sds.BlockChain)
if parentBlock == nil { if parentBlock == nil {
log.Error("Parent block is nil, skipping this block", "block height", currentBlock.Number()) log.Error("Parent block is nil, skipping this block", "block height", currentBlock.Number())
continue continue
} }
log.Info("Writing state diff", "block height", currentBlock.Number().Uint64(), "worker", params.id)
err := sds.writeStateDiff(currentBlock, parentBlock.Root(), writeLoopParams) err := sds.writeStateDiff(currentBlock, parentBlock.Root(), writeLoopParams)
if err != nil { if err != nil {
log.Error("statediff (DB write) processing error", "block height", currentBlock.Number().Uint64(), "error", err.Error()) log.Error("statediff.Service.WriteLoop: processing error", "block height", currentBlock.Number().Uint64(), "error", err.Error(), "worker", params.id)
continue continue
} }
case err := <-errCh: // TODO: how to handle with concurrent workers
log.Warn("Error from chain event subscription", "error", err) statediffMetrics.lastStatediffHeight.Update(int64(currentBlock.Number().Uint64()))
case err := <-params.errCh:
log.Warn("Error from chain event subscription", "error", err, "worker", params.id)
sds.close() sds.close()
return return
case <-sds.QuitChan: case <-sds.QuitChan:
log.Info("Quitting the statediff writing process") log.Info("Quitting the statediff writing process", "worker", params.id)
sds.close() sds.close()
return return
} }
@ -226,16 +296,17 @@ func (sds *Service) Loop(chainEventCh chan core.ChainEvent) {
select { select {
//Notify chain event channel of events //Notify chain event channel of events
case chainEvent := <-chainEventCh: case chainEvent := <-chainEventCh:
log.Debug("Event received from chainEventCh", "event", chainEvent) statediffMetrics.serviceLoopChannelLen.Update(int64(len(chainEventCh)))
log.Debug("Loop(): chain event received", "event", chainEvent)
// if we don't have any subscribers, do not process a statediff // if we don't have any subscribers, do not process a statediff
if atomic.LoadInt32(&sds.subscribers) == 0 { if atomic.LoadInt32(&sds.subscribers) == 0 {
log.Debug("Currently no subscribers to the statediffing service; processing is halted") log.Debug("Currently no subscribers to the statediffing service; processing is halted")
continue continue
} }
currentBlock := chainEvent.Block currentBlock := chainEvent.Block
parentBlock := sds.lastBlock.replace(currentBlock, sds.BlockChain) parentBlock := sds.BlockCache.getParentBlock(currentBlock, sds.BlockChain)
if parentBlock == nil { if parentBlock == nil {
log.Error("Parent block is nil, skipping this block", "number", currentBlock.Number()) log.Error("Parent block is nil, skipping this block", "block height", currentBlock.Number())
continue continue
} }
sds.streamStateDiff(currentBlock, parentBlock.Root()) sds.streamStateDiff(currentBlock, parentBlock.Root())
@ -414,8 +485,8 @@ func (sds *Service) Start() error {
if sds.enableWriteLoop { if sds.enableWriteLoop {
log.Info("Starting statediff DB write loop", "params", writeLoopParams) log.Info("Starting statediff DB write loop", "params", writeLoopParams)
go sds.WriteLoop(make(chan core.ChainEvent, chainEventChanSize)) chainEventCh := make(chan core.ChainEvent, chainEventChanSize)
go sds.indexer.ReportDBMetrics(5*time.Second, sds.QuitChan) go sds.WriteLoop(chainEventCh)
} }
return nil return nil
@ -473,7 +544,7 @@ func (sds *Service) StreamCodeAndCodeHash(blockNumber uint64, outChan chan<- Cod
log.Info("sending code and codehash", "block height", blockNumber) log.Info("sending code and codehash", "block height", blockNumber)
currentTrie, err := sds.BlockChain.StateCache().OpenTrie(current.Root()) currentTrie, err := sds.BlockChain.StateCache().OpenTrie(current.Root())
if err != nil { if err != nil {
log.Error("error creating trie for block", "number", current.Number(), "err", err) log.Error("error creating trie for block", "block height", current.Number(), "err", err)
close(quitChan) close(quitChan)
return return
} }
@ -521,7 +592,7 @@ func (sds *Service) WriteStateDiffAt(blockNumber uint64, params Params) error {
// Writes a state diff from the current block, parent state root, and provided params // Writes a state diff from the current block, parent state root, and provided params
func (sds *Service) writeStateDiff(block *types.Block, parentRoot common.Hash, params Params) error { func (sds *Service) writeStateDiff(block *types.Block, parentRoot common.Hash, params Params) error {
log.Info("Writing state diff", "block height", block.Number().Uint64()) // log.Info("Writing state diff", "block height", block.Number().Uint64())
var totalDifficulty *big.Int var totalDifficulty *big.Int
var receipts types.Receipts var receipts types.Receipts
if params.IncludeTD { if params.IncludeTD {

View File

@ -94,6 +94,7 @@ func testErrorInChainEventLoop(t *testing.T) {
QuitChan: serviceQuit, QuitChan: serviceQuit,
Subscriptions: make(map[common.Hash]map[rpc.ID]statediff.Subscription), Subscriptions: make(map[common.Hash]map[rpc.ID]statediff.Subscription),
SubscriptionTypes: make(map[common.Hash]statediff.Params), SubscriptionTypes: make(map[common.Hash]statediff.Params),
BlockCache: statediff.NewBlockCache(1),
} }
payloadChan := make(chan statediff.Payload, 2) payloadChan := make(chan statediff.Payload, 2)
quitChan := make(chan bool) quitChan := make(chan bool)
@ -177,6 +178,7 @@ func testErrorInBlockLoop(t *testing.T) {
QuitChan: make(chan bool), QuitChan: make(chan bool),
Subscriptions: make(map[common.Hash]map[rpc.ID]statediff.Subscription), Subscriptions: make(map[common.Hash]map[rpc.ID]statediff.Subscription),
SubscriptionTypes: make(map[common.Hash]statediff.Params), SubscriptionTypes: make(map[common.Hash]statediff.Params),
BlockCache: statediff.NewBlockCache(1),
} }
payloadChan := make(chan statediff.Payload) payloadChan := make(chan statediff.Payload)
quitChan := make(chan bool) quitChan := make(chan bool)
@ -256,6 +258,7 @@ func testErrorInStateDiffAt(t *testing.T) {
QuitChan: make(chan bool), QuitChan: make(chan bool),
Subscriptions: make(map[common.Hash]map[rpc.ID]statediff.Subscription), Subscriptions: make(map[common.Hash]map[rpc.ID]statediff.Subscription),
SubscriptionTypes: make(map[common.Hash]statediff.Params), SubscriptionTypes: make(map[common.Hash]statediff.Params),
BlockCache: statediff.NewBlockCache(1),
} }
stateDiffPayload, err := service.StateDiffAt(testBlock1.NumberU64(), defaultParams) stateDiffPayload, err := service.StateDiffAt(testBlock1.NumberU64(), defaultParams)
if err != nil { if err != nil {