statediff: use a worker pool (for direct writes)
This commit is contained in:
parent
72a47729bb
commit
02c7e785c5
@ -191,8 +191,17 @@ func makeFullNode(ctx *cli.Context) (*node.Node, ethapi.Backend) {
|
||||
} else {
|
||||
utils.Fatalf("Must specify client name for statediff DB output")
|
||||
}
|
||||
} else {
|
||||
if ctx.GlobalBool(utils.StateDiffWritingFlag.Name) {
|
||||
utils.Fatalf("Must pass DB parameters if enabling statediff write loop")
|
||||
}
|
||||
utils.RegisterStateDiffService(stack, backend, dbParams, ctx.GlobalBool(utils.StateDiffWritingFlag.Name))
|
||||
}
|
||||
params := statediff.ServiceParams{
|
||||
DBParams: dbParams,
|
||||
EnableWriteLoop: ctx.GlobalBool(utils.StateDiffWritingFlag.Name),
|
||||
NumWorkers: ctx.GlobalUint(utils.StateDiffWorkersFlag.Name),
|
||||
}
|
||||
utils.RegisterStateDiffService(stack, backend, params)
|
||||
}
|
||||
|
||||
// Configure GraphQL if requested
|
||||
|
@ -162,6 +162,7 @@ var (
|
||||
utils.StateDiffDBNodeIDFlag,
|
||||
utils.StateDiffDBClientNameFlag,
|
||||
utils.StateDiffWritingFlag,
|
||||
utils.StateDiffWorkersFlag,
|
||||
configFileFlag,
|
||||
}
|
||||
|
||||
|
@ -746,6 +746,10 @@ var (
|
||||
Name: "statediff.writing",
|
||||
Usage: "Activates progressive writing of state diffs to database as new block are synced",
|
||||
}
|
||||
StateDiffWorkersFlag = cli.UintFlag{
|
||||
Name: "statediff.workers",
|
||||
Usage: "Number of concurrent workers to use during statediff processing (0 = 1)",
|
||||
}
|
||||
)
|
||||
|
||||
// MakeDataDir retrieves the currently requested data directory, terminating
|
||||
@ -1744,9 +1748,8 @@ func RegisterGraphQLService(stack *node.Node, backend ethapi.Backend, cfg node.C
|
||||
}
|
||||
|
||||
// RegisterStateDiffService configures and registers a service to stream state diff data over RPC
|
||||
// dbParams are: Postgres connection URI, Node ID, client name
|
||||
func RegisterStateDiffService(stack *node.Node, ethServ *eth.Ethereum, dbParams *statediff.DBParams, startWriteLoop bool) {
|
||||
if err := statediff.New(stack, ethServ, dbParams, startWriteLoop); err != nil {
|
||||
func RegisterStateDiffService(stack *node.Node, ethServ *eth.Ethereum, params statediff.ServiceParams) {
|
||||
if err := statediff.New(stack, ethServ, params); err != nil {
|
||||
Fatalf("Failed to register the Statediff service: %v", err)
|
||||
}
|
||||
}
|
||||
|
@ -92,6 +92,15 @@ type IService interface {
|
||||
WriteLoop(chainEventCh chan core.ChainEvent)
|
||||
}
|
||||
|
||||
// Wraps consructor parameters
|
||||
type ServiceParams struct {
|
||||
DBParams *DBParams
|
||||
// Whether to enable writing state diffs directly to track blochain head
|
||||
EnableWriteLoop bool
|
||||
// Size of the worker pool
|
||||
NumWorkers uint
|
||||
}
|
||||
|
||||
// Service is the underlying struct for the state diffing service
|
||||
type Service struct {
|
||||
// Used to sync access to the Subscriptions
|
||||
@ -107,41 +116,56 @@ type Service struct {
|
||||
// A mapping of subscription params rlp hash to the corresponding subscription params
|
||||
SubscriptionTypes map[common.Hash]Params
|
||||
// Cache the last block so that we can avoid having to lookup the next block's parent
|
||||
lastBlock lastBlockCache
|
||||
lastBlock blockCache
|
||||
// Whether or not we have any subscribers; only if we do, do we processes state diffs
|
||||
subscribers int32
|
||||
// Interface for publishing statediffs as PG-IPLD objects
|
||||
indexer ind.Indexer
|
||||
// Whether to enable writing state diffs directly to track blochain head
|
||||
enableWriteLoop bool
|
||||
// Size of the worker pool
|
||||
numWorkers uint
|
||||
}
|
||||
|
||||
// Wrap the cached last block for safe access from different service loops
|
||||
type lastBlockCache struct {
|
||||
type blockCache struct {
|
||||
sync.Mutex
|
||||
block *types.Block
|
||||
blocks map[common.Hash]*types.Block
|
||||
maxSize uint
|
||||
}
|
||||
|
||||
func newBlockCache(max uint) blockCache {
|
||||
return blockCache{
|
||||
blocks: make(map[common.Hash]*types.Block),
|
||||
maxSize: max,
|
||||
}
|
||||
}
|
||||
|
||||
// New creates a new statediff.Service
|
||||
func New(stack *node.Node, ethServ *eth.Ethereum, dbParams *DBParams, enableWriteLoop bool) error {
|
||||
// func New(stack *node.Node, ethServ *eth.Ethereum, dbParams *DBParams, enableWriteLoop bool) error {
|
||||
func New(stack *node.Node, ethServ *eth.Ethereum, params ServiceParams) error {
|
||||
blockChain := ethServ.BlockChain()
|
||||
var indexer ind.Indexer
|
||||
if dbParams != nil {
|
||||
if params.DBParams != nil {
|
||||
info := nodeinfo.Info{
|
||||
GenesisBlock: blockChain.Genesis().Hash().Hex(),
|
||||
NetworkID: strconv.FormatUint(ethServ.NetVersion(), 10),
|
||||
ChainID: blockChain.Config().ChainID.Uint64(),
|
||||
ID: dbParams.ID,
|
||||
ClientName: dbParams.ClientName,
|
||||
ID: params.DBParams.ID,
|
||||
ClientName: params.DBParams.ClientName,
|
||||
}
|
||||
|
||||
// TODO: pass max idle, open, lifetime?
|
||||
db, err := postgres.NewDB(dbParams.ConnectionURL, postgres.ConnectionConfig{}, info)
|
||||
db, err := postgres.NewDB(params.DBParams.ConnectionURL, postgres.ConnectionConfig{}, info)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
indexer = ind.NewStateDiffIndexer(blockChain.Config(), db)
|
||||
}
|
||||
workers := params.NumWorkers
|
||||
if workers == 0 {
|
||||
workers = 1
|
||||
}
|
||||
sds := &Service{
|
||||
Mutex: sync.Mutex{},
|
||||
BlockChain: blockChain,
|
||||
@ -149,8 +173,10 @@ func New(stack *node.Node, ethServ *eth.Ethereum, dbParams *DBParams, enableWrit
|
||||
QuitChan: make(chan bool),
|
||||
Subscriptions: make(map[common.Hash]map[rpc.ID]Subscription),
|
||||
SubscriptionTypes: make(map[common.Hash]Params),
|
||||
lastBlock: newBlockCache(workers),
|
||||
indexer: indexer,
|
||||
enableWriteLoop: enableWriteLoop,
|
||||
enableWriteLoop: params.EnableWriteLoop,
|
||||
numWorkers: workers,
|
||||
}
|
||||
stack.RegisterLifecycle(sds)
|
||||
stack.RegisterAPIs(sds.APIs())
|
||||
@ -174,16 +200,20 @@ func (sds *Service) APIs() []rpc.API {
|
||||
}
|
||||
}
|
||||
|
||||
func (lbc *lastBlockCache) replace(currentBlock *types.Block, bc blockChain) *types.Block {
|
||||
// Return the parent block of currentBlock, using the cached block if available
|
||||
func (lbc *blockCache) replace(currentBlock *types.Block, bc blockChain) *types.Block {
|
||||
lbc.Lock()
|
||||
parentHash := currentBlock.ParentHash()
|
||||
var parentBlock *types.Block
|
||||
if lbc.block != nil && bytes.Equal(lbc.block.Hash().Bytes(), parentHash.Bytes()) {
|
||||
parentBlock = lbc.block
|
||||
if block, ok := lbc.blocks[parentHash]; ok {
|
||||
parentBlock = block
|
||||
if len(lbc.blocks) > int(lbc.maxSize) {
|
||||
delete(lbc.blocks, parentHash)
|
||||
}
|
||||
} else {
|
||||
parentBlock = bc.GetBlockByHash(parentHash)
|
||||
}
|
||||
lbc.block = currentBlock
|
||||
lbc.blocks[currentBlock.Hash()] = currentBlock
|
||||
lbc.Unlock()
|
||||
return parentBlock
|
||||
}
|
||||
@ -417,13 +447,18 @@ func (sds *Service) Unsubscribe(id rpc.ID) error {
|
||||
func (sds *Service) Start() error {
|
||||
log.Info("Starting statediff service")
|
||||
|
||||
{
|
||||
// TODO: also use worker pool here?
|
||||
chainEventCh := make(chan core.ChainEvent, chainEventChanSize)
|
||||
go sds.Loop(chainEventCh)
|
||||
}
|
||||
|
||||
if sds.enableWriteLoop {
|
||||
log.Info("Starting statediff DB write loop", "params", writeLoopParams)
|
||||
go sds.WriteLoop(make(chan core.ChainEvent, chainEventChanSize))
|
||||
go sds.indexer.ReportDBMetrics(5*time.Second, sds.QuitChan)
|
||||
chainEventCh := make(chan core.ChainEvent, chainEventChanSize)
|
||||
for worker := uint(0); worker < sds.numWorkers; worker++ {
|
||||
go sds.WriteLoop(chainEventCh)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
|
Loading…
Reference in New Issue
Block a user