diff --git a/cmd/geth/config.go b/cmd/geth/config.go index 8d5f2ad17..dcf3a80f1 100644 --- a/cmd/geth/config.go +++ b/cmd/geth/config.go @@ -174,7 +174,7 @@ func makeFullNode(ctx *cli.Context) *node.Node { utils.Fatalf("Must specify client name for statediff DB output") } } - utils.RegisterStateDiffService(stack, dbParams) + utils.RegisterStateDiffService(stack, dbParams, ctx.GlobalBool(utils.StateDiffWritingFlag.Name)) } // Whisper must be explicitly enabled by specifying at least 1 whisper flag or in dev mode diff --git a/cmd/geth/main.go b/cmd/geth/main.go index a34cf0190..b420f2e3c 100644 --- a/cmd/geth/main.go +++ b/cmd/geth/main.go @@ -150,6 +150,7 @@ var ( utils.StateDiffDBFlag, utils.StateDiffDBNodeIDFlag, utils.StateDiffDBClientNameFlag, + utils.StateDiffWritingFlag, configFileFlag, } diff --git a/cmd/geth/usage.go b/cmd/geth/usage.go index 28dd44861..4751a1763 100644 --- a/cmd/geth/usage.go +++ b/cmd/geth/usage.go @@ -258,6 +258,7 @@ var AppHelpFlagGroups = []flagGroup{ utils.StateDiffDBFlag, utils.StateDiffDBNodeIDFlag, utils.StateDiffDBClientNameFlag, + utils.StateDiffWritingFlag, }, }, { diff --git a/cmd/utils/flags.go b/cmd/utils/flags.go index 4efbfa197..b0e6f5930 100644 --- a/cmd/utils/flags.go +++ b/cmd/utils/flags.go @@ -763,7 +763,7 @@ var ( } StateDiffDBFlag = cli.StringFlag{ Name: "statediff.db", - Usage: "Postgres database connection string for writing state diffs", + Usage: "PostgreSQL database connection string for writing state diffs", } StateDiffDBNodeIDFlag = cli.StringFlag{ Name: "statediff.dbnodeid", @@ -773,6 +773,10 @@ var ( Name: "statediff.dbclientname", Usage: "Client name to use when writing state diffs to database", } + StateDiffWritingFlag = cli.BoolFlag{ + Name: "statediff.writing", + Usage: "Activates progressive writing of state diffs to database as new block are synced", + } ) // MakeDataDir retrieves the currently requested data directory, terminating @@ -1646,14 +1650,14 @@ func RegisterGraphQLService(stack *node.Node, endpoint string, cors, vhosts []st // RegisterStateDiffService configures and registers a service to stream state diff data over RPC // dbParams are: Postgres connection URI, Node ID, client name -func RegisterStateDiffService(stack *node.Node, dbParams *[3]string) { +func RegisterStateDiffService(stack *node.Node, dbParams *[3]string, startWriteLoop bool) { if err := stack.Register(func(ctx *node.ServiceContext) (node.Service, error) { var ethServ *eth.Ethereum err := ctx.Service(ðServ) if err != nil { return nil, err } - return statediff.NewStateDiffService(ethServ, dbParams) + return statediff.NewStateDiffService(ethServ, dbParams, startWriteLoop) }); err != nil { Fatalf("Failed to register State Diff Service", err) } diff --git a/statediff/service.go b/statediff/service.go index c382f0b91..89ed39cfe 100644 --- a/statediff/service.go +++ b/statediff/service.go @@ -47,6 +47,15 @@ import ( const chainEventChanSize = 20000 +var writeLoopParams = Params{ + IntermediateStateNodes: true, + IntermediateStorageNodes: true, + IncludeBlock: true, + IncludeReceipts: true, + IncludeTD: true, + IncludeCode: true, +} + type blockChain interface { SubscribeChainEvent(ch chan<- core.ChainEvent) event.Subscription GetBlockByHash(hash common.Hash) *types.Block @@ -75,6 +84,8 @@ type IService interface { StreamCodeAndCodeHash(blockNumber uint64, outChan chan<- CodeAndCodeHash, quitChan chan<- bool) // Method to write state diff object directly to DB WriteStateDiffAt(blockNumber uint64, params Params) error + // Event loop for progressively processing and writing diffs directly to DB + WriteLoop(chainEventCh chan core.ChainEvent) } // Service is the underlying struct for the state diffing service @@ -92,15 +103,23 @@ type Service struct { // A mapping of subscription params rlp hash to the corresponding subscription params SubscriptionTypes map[common.Hash]Params // Cache the last block so that we can avoid having to lookup the next block's parent - lastBlock *types.Block + lastBlock lastBlockCache // Whether or not we have any subscribers; only if we do, do we processes state diffs subscribers int32 // Interface for publishing statediffs as PG-IPLD objects indexer ind.Indexer + // Whether to enable writing state diffs directly to track blochain head + enableWriteLoop bool +} + +// Wrap the cached last block for safe access from different service loops +type lastBlockCache struct { + sync.Mutex + block *types.Block } // NewStateDiffService creates a new statediff.Service -func NewStateDiffService(ethServ *eth.Ethereum, dbParams *[3]string) (*Service, error) { +func NewStateDiffService(ethServ *eth.Ethereum, dbParams *[3]string, enableWriteLoop bool) (*Service, error) { blockChain := ethServ.BlockChain() var indexer ind.Indexer if dbParams != nil { @@ -130,6 +149,7 @@ func NewStateDiffService(ethServ *eth.Ethereum, dbParams *[3]string) (*Service, Subscriptions: make(map[common.Hash]map[rpc.ID]Subscription), SubscriptionTypes: make(map[common.Hash]Params), indexer: indexer, + enableWriteLoop: enableWriteLoop, }, nil } @@ -150,6 +170,52 @@ func (sds *Service) APIs() []rpc.API { } } +func (lbc *lastBlockCache) replace(currentBlock *types.Block, bc blockChain) *types.Block { + lbc.Lock() + parentHash := currentBlock.ParentHash() + var parentBlock *types.Block + if lbc.block != nil && bytes.Equal(lbc.block.Hash().Bytes(), parentHash.Bytes()) { + parentBlock = lbc.block + } else { + parentBlock = bc.GetBlockByHash(parentHash) + } + lbc.block = currentBlock + lbc.Unlock() + return parentBlock +} + +func (sds *Service) WriteLoop(chainEventCh chan core.ChainEvent) { + chainEventSub := sds.BlockChain.SubscribeChainEvent(chainEventCh) + defer chainEventSub.Unsubscribe() + errCh := chainEventSub.Err() + for { + select { + //Notify chain event channel of events + case chainEvent := <-chainEventCh: + log.Debug("(WriteLoop) Event received from chainEventCh", "event", chainEvent) + currentBlock := chainEvent.Block + parentBlock := sds.lastBlock.replace(currentBlock, sds.BlockChain) + if parentBlock == nil { + log.Error(fmt.Sprintf("Parent block is nil, skipping this block (%d)", currentBlock.Number())) + continue + } + err := sds.writeStateDiff(currentBlock, parentBlock.Root(), writeLoopParams) + if err != nil { + log.Error(fmt.Sprintf("statediff (DB write) processing error at blockheight %d: err: %s", currentBlock.Number().Uint64(), err.Error())) + continue + } + case err := <-errCh: + log.Warn("Error from chain event subscription", "error", err) + sds.close() + return + case <-sds.QuitChan: + log.Info("Quitting the statediffing process") + sds.close() + return + } + } +} + // Loop is the main processing method func (sds *Service) Loop(chainEventCh chan core.ChainEvent) { chainEventSub := sds.BlockChain.SubscribeChainEvent(chainEventCh) @@ -166,14 +232,7 @@ func (sds *Service) Loop(chainEventCh chan core.ChainEvent) { continue } currentBlock := chainEvent.Block - parentHash := currentBlock.ParentHash() - var parentBlock *types.Block - if sds.lastBlock != nil && bytes.Equal(sds.lastBlock.Hash().Bytes(), currentBlock.ParentHash().Bytes()) { - parentBlock = sds.lastBlock - } else { - parentBlock = sds.BlockChain.GetBlockByHash(parentHash) - } - sds.lastBlock = currentBlock + parentBlock := sds.lastBlock.replace(currentBlock, sds.BlockChain) if parentBlock == nil { log.Error(fmt.Sprintf("Parent block is nil, skipping this block (%d)", currentBlock.Number())) continue @@ -352,6 +411,11 @@ func (sds *Service) Start(*p2p.Server) error { chainEventCh := make(chan core.ChainEvent, chainEventChanSize) go sds.Loop(chainEventCh) + if sds.enableWriteLoop { + log.Debug("Starting statediff DB write loop (parameters: %+v)", writeLoopParams) + go sds.WriteLoop(make(chan core.ChainEvent, chainEventChanSize)) + } + return nil }