diff --git a/statediff/service.go b/statediff/service.go index 1154e4750..6411ba68e 100644 --- a/statediff/service.go +++ b/statediff/service.go @@ -20,6 +20,7 @@ import ( "bytes" "math/big" "strconv" + "strings" "sync" "sync/atomic" "time" @@ -46,8 +47,12 @@ import ( . "github.com/ethereum/go-ethereum/statediff/types" ) -const chainEventChanSize = 20000 -const genesisBlockNumber = 0 +const ( + chainEventChanSize = 20000 + genesisBlockNumber = 0 + defaultRetryLimit = 3 // default retry limit once deadlock is detected. + deadlockDetected = "deadlock detected" // 40P01 https://www.postgresql.org/docs/current/errcodes-appendix.html +) var writeLoopParams = Params{ IntermediateStateNodes: true, @@ -131,6 +136,8 @@ type Service struct { enableWriteLoop bool // Size of the worker pool numWorkers uint + // Number of retry for aborted transactions due to deadlock. + maxRetry uint } // Wrap the cached last block for safe access from different service loops @@ -189,6 +196,7 @@ func New(stack *node.Node, ethServ *eth.Ethereum, cfg *ethconfig.Config, params indexer: indexer, enableWriteLoop: params.EnableWriteLoop, numWorkers: workers, + maxRetry: defaultRetryLimit, } stack.RegisterLifecycle(sds) stack.RegisterAPIs(sds.APIs()) @@ -270,7 +278,7 @@ func (sds *Service) WriteLoop(chainEventCh chan core.ChainEvent) { func (sds *Service) writeGenesisStateDiff(currBlock *types.Block, workerId uint) { // For genesis block we need to return the entire state trie hence we diff it with an empty trie. log.Info("Writing state diff", "block height", genesisBlockNumber, "worker", workerId) - err := sds.writeStateDiff(currBlock, common.Hash{}, writeLoopParams) + err := sds.writeStateDiffWithRetry(currBlock, common.Hash{}, writeLoopParams) if err != nil { log.Error("statediff.Service.WriteLoop: processing error", "block height", genesisBlockNumber, "error", err.Error(), "worker", workerId) @@ -299,7 +307,7 @@ func (sds *Service) writeLoopWorker(params workerParams) { } log.Info("Writing state diff", "block height", currentBlock.Number().Uint64(), "worker", params.id) - err := sds.writeStateDiff(currentBlock, parentBlock.Root(), writeLoopParams) + err := sds.writeStateDiffWithRetry(currentBlock, parentBlock.Root(), writeLoopParams) if err != nil { log.Error("statediff.Service.WriteLoop: processing error", "block height", currentBlock.Number().Uint64(), "error", err.Error(), "worker", params.id) continue @@ -638,7 +646,7 @@ func (sds *Service) WriteStateDiffAt(blockNumber uint64, params Params) error { parentBlock := sds.BlockChain.GetBlockByHash(currentBlock.ParentHash()) parentRoot = parentBlock.Root() } - return sds.writeStateDiff(currentBlock, parentRoot, params) + return sds.writeStateDiffWithRetry(currentBlock, parentRoot, params) } // WriteStateDiffFor writes a state diff for the specific blockhash directly to the database @@ -651,7 +659,7 @@ func (sds *Service) WriteStateDiffFor(blockHash common.Hash, params Params) erro parentBlock := sds.BlockChain.GetBlockByHash(currentBlock.ParentHash()) parentRoot = parentBlock.Root() } - return sds.writeStateDiff(currentBlock, parentRoot, params) + return sds.writeStateDiffWithRetry(currentBlock, parentRoot, params) } // Writes a state diff from the current block, parent state root, and provided params @@ -691,3 +699,17 @@ func (sds *Service) writeStateDiff(block *types.Block, parentRoot common.Hash, p } return nil } + +// Wrapper function on writeStateDiff to retry when the deadlock is detected. +func (sds *Service) writeStateDiffWithRetry(block *types.Block, parentRoot common.Hash, params Params) error { + var err error + for i := uint(0); i < sds.maxRetry; i++ { + err = sds.writeStateDiff(block, parentRoot, params) + if err != nil && strings.Contains(err.Error(), deadlockDetected) { + // Retry only when the deadlock is detected. + continue + } + break + } + return err +}