statediff WriteLoop service + CLI flag

This commit is contained in:
Roy Crihfield 2020-10-28 14:33:11 +08:00
parent 20dcefa703
commit 1984446ad2
5 changed files with 84 additions and 14 deletions

View File

@ -174,7 +174,7 @@ func makeFullNode(ctx *cli.Context) *node.Node {
utils.Fatalf("Must specify client name for statediff DB output")
}
}
utils.RegisterStateDiffService(stack, dbParams)
utils.RegisterStateDiffService(stack, dbParams, ctx.GlobalBool(utils.StateDiffWritingFlag.Name))
}
// Whisper must be explicitly enabled by specifying at least 1 whisper flag or in dev mode

View File

@ -150,6 +150,7 @@ var (
utils.StateDiffDBFlag,
utils.StateDiffDBNodeIDFlag,
utils.StateDiffDBClientNameFlag,
utils.StateDiffWritingFlag,
configFileFlag,
}

View File

@ -258,6 +258,7 @@ var AppHelpFlagGroups = []flagGroup{
utils.StateDiffDBFlag,
utils.StateDiffDBNodeIDFlag,
utils.StateDiffDBClientNameFlag,
utils.StateDiffWritingFlag,
},
},
{

View File

@ -763,7 +763,7 @@ var (
}
StateDiffDBFlag = cli.StringFlag{
Name: "statediff.db",
Usage: "Postgres database connection string for writing state diffs",
Usage: "PostgreSQL database connection string for writing state diffs",
}
StateDiffDBNodeIDFlag = cli.StringFlag{
Name: "statediff.dbnodeid",
@ -773,6 +773,10 @@ var (
Name: "statediff.dbclientname",
Usage: "Client name to use when writing state diffs to database",
}
StateDiffWritingFlag = cli.BoolFlag{
Name: "statediff.writing",
Usage: "Activates progressive writing of state diffs to database as new block are synced",
}
)
// MakeDataDir retrieves the currently requested data directory, terminating
@ -1646,14 +1650,14 @@ func RegisterGraphQLService(stack *node.Node, endpoint string, cors, vhosts []st
// RegisterStateDiffService configures and registers a service to stream state diff data over RPC
// dbParams are: Postgres connection URI, Node ID, client name
func RegisterStateDiffService(stack *node.Node, dbParams *[3]string) {
func RegisterStateDiffService(stack *node.Node, dbParams *[3]string, startWriteLoop bool) {
if err := stack.Register(func(ctx *node.ServiceContext) (node.Service, error) {
var ethServ *eth.Ethereum
err := ctx.Service(&ethServ)
if err != nil {
return nil, err
}
return statediff.NewStateDiffService(ethServ, dbParams)
return statediff.NewStateDiffService(ethServ, dbParams, startWriteLoop)
}); err != nil {
Fatalf("Failed to register State Diff Service", err)
}

View File

@ -47,6 +47,15 @@ import (
const chainEventChanSize = 20000
var writeLoopParams = Params{
IntermediateStateNodes: true,
IntermediateStorageNodes: true,
IncludeBlock: true,
IncludeReceipts: true,
IncludeTD: true,
IncludeCode: true,
}
type blockChain interface {
SubscribeChainEvent(ch chan<- core.ChainEvent) event.Subscription
GetBlockByHash(hash common.Hash) *types.Block
@ -75,6 +84,8 @@ type IService interface {
StreamCodeAndCodeHash(blockNumber uint64, outChan chan<- CodeAndCodeHash, quitChan chan<- bool)
// Method to write state diff object directly to DB
WriteStateDiffAt(blockNumber uint64, params Params) error
// Event loop for progressively processing and writing diffs directly to DB
WriteLoop(chainEventCh chan core.ChainEvent)
}
// Service is the underlying struct for the state diffing service
@ -92,15 +103,23 @@ type Service struct {
// A mapping of subscription params rlp hash to the corresponding subscription params
SubscriptionTypes map[common.Hash]Params
// Cache the last block so that we can avoid having to lookup the next block's parent
lastBlock *types.Block
lastBlock lastBlockCache
// Whether or not we have any subscribers; only if we do, do we processes state diffs
subscribers int32
// Interface for publishing statediffs as PG-IPLD objects
indexer ind.Indexer
// Whether to enable writing state diffs directly to track blochain head
enableWriteLoop bool
}
// Wrap the cached last block for safe access from different service loops
type lastBlockCache struct {
sync.Mutex
block *types.Block
}
// NewStateDiffService creates a new statediff.Service
func NewStateDiffService(ethServ *eth.Ethereum, dbParams *[3]string) (*Service, error) {
func NewStateDiffService(ethServ *eth.Ethereum, dbParams *[3]string, enableWriteLoop bool) (*Service, error) {
blockChain := ethServ.BlockChain()
var indexer ind.Indexer
if dbParams != nil {
@ -130,6 +149,7 @@ func NewStateDiffService(ethServ *eth.Ethereum, dbParams *[3]string) (*Service,
Subscriptions: make(map[common.Hash]map[rpc.ID]Subscription),
SubscriptionTypes: make(map[common.Hash]Params),
indexer: indexer,
enableWriteLoop: enableWriteLoop,
}, nil
}
@ -150,6 +170,52 @@ func (sds *Service) APIs() []rpc.API {
}
}
func (lbc *lastBlockCache) replace(currentBlock *types.Block, bc blockChain) *types.Block {
lbc.Lock()
parentHash := currentBlock.ParentHash()
var parentBlock *types.Block
if lbc.block != nil && bytes.Equal(lbc.block.Hash().Bytes(), parentHash.Bytes()) {
parentBlock = lbc.block
} else {
parentBlock = bc.GetBlockByHash(parentHash)
}
lbc.block = currentBlock
lbc.Unlock()
return parentBlock
}
func (sds *Service) WriteLoop(chainEventCh chan core.ChainEvent) {
chainEventSub := sds.BlockChain.SubscribeChainEvent(chainEventCh)
defer chainEventSub.Unsubscribe()
errCh := chainEventSub.Err()
for {
select {
//Notify chain event channel of events
case chainEvent := <-chainEventCh:
log.Debug("(WriteLoop) Event received from chainEventCh", "event", chainEvent)
currentBlock := chainEvent.Block
parentBlock := sds.lastBlock.replace(currentBlock, sds.BlockChain)
if parentBlock == nil {
log.Error(fmt.Sprintf("Parent block is nil, skipping this block (%d)", currentBlock.Number()))
continue
}
err := sds.writeStateDiff(currentBlock, parentBlock.Root(), writeLoopParams)
if err != nil {
log.Error(fmt.Sprintf("statediff (DB write) processing error at blockheight %d: err: %s", currentBlock.Number().Uint64(), err.Error()))
continue
}
case err := <-errCh:
log.Warn("Error from chain event subscription", "error", err)
sds.close()
return
case <-sds.QuitChan:
log.Info("Quitting the statediffing process")
sds.close()
return
}
}
}
// Loop is the main processing method
func (sds *Service) Loop(chainEventCh chan core.ChainEvent) {
chainEventSub := sds.BlockChain.SubscribeChainEvent(chainEventCh)
@ -166,14 +232,7 @@ func (sds *Service) Loop(chainEventCh chan core.ChainEvent) {
continue
}
currentBlock := chainEvent.Block
parentHash := currentBlock.ParentHash()
var parentBlock *types.Block
if sds.lastBlock != nil && bytes.Equal(sds.lastBlock.Hash().Bytes(), currentBlock.ParentHash().Bytes()) {
parentBlock = sds.lastBlock
} else {
parentBlock = sds.BlockChain.GetBlockByHash(parentHash)
}
sds.lastBlock = currentBlock
parentBlock := sds.lastBlock.replace(currentBlock, sds.BlockChain)
if parentBlock == nil {
log.Error(fmt.Sprintf("Parent block is nil, skipping this block (%d)", currentBlock.Number()))
continue
@ -352,6 +411,11 @@ func (sds *Service) Start(*p2p.Server) error {
chainEventCh := make(chan core.ChainEvent, chainEventChanSize)
go sds.Loop(chainEventCh)
if sds.enableWriteLoop {
log.Debug("Starting statediff DB write loop (parameters: %+v)", writeLoopParams)
go sds.WriteLoop(make(chan core.ChainEvent, chainEventChanSize))
}
return nil
}