Retry aborted transactions when a deadlock is detected. #166

Merged
arijitAD merged 2 commits from retry-on-deadlock into v1.10.12-statediff 2021-12-14 20:43:55 +00:00

View File

@ -20,6 +20,7 @@ import (
"bytes"
"math/big"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
@ -46,8 +47,12 @@ import (
. "github.com/ethereum/go-ethereum/statediff/types"
)
const chainEventChanSize = 20000
const genesisBlockNumber = 0
const (
// NOTE(review): presumably the buffer size of the chain event channel; its use is outside this view — confirm.
chainEventChanSize = 20000
// Block height logged when the genesis state diff is written.
genesisBlockNumber = 0
defaultRetryLimit = 3 // default retry limit once deadlock is detected; New stores it in Service.maxRetry.
// Substring looked for in a write error's text to decide whether the write should be retried.
deadlockDetected = "deadlock detected" // 40P01 https://www.postgresql.org/docs/current/errcodes-appendix.html
)
var writeLoopParams = Params{
IntermediateStateNodes: true,
@ -131,6 +136,8 @@ type Service struct {
enableWriteLoop bool
// Size of the worker pool
numWorkers uint
// Number of retries for transactions aborted due to deadlock.
maxRetry uint
}
// Wrap the cached last block for safe access from different service loops
@ -189,6 +196,7 @@ func New(stack *node.Node, ethServ *eth.Ethereum, cfg *ethconfig.Config, params
indexer: indexer,
enableWriteLoop: params.EnableWriteLoop,
numWorkers: workers,
maxRetry: defaultRetryLimit,
}
stack.RegisterLifecycle(sds)
stack.RegisterAPIs(sds.APIs())
@ -270,7 +278,7 @@ func (sds *Service) WriteLoop(chainEventCh chan core.ChainEvent) {
func (sds *Service) writeGenesisStateDiff(currBlock *types.Block, workerId uint) {
// For genesis block we need to return the entire state trie hence we diff it with an empty trie.
log.Info("Writing state diff", "block height", genesisBlockNumber, "worker", workerId)
err := sds.writeStateDiff(currBlock, common.Hash{}, writeLoopParams)
err := sds.writeStateDiffWithRetry(currBlock, common.Hash{}, writeLoopParams)
if err != nil {
log.Error("statediff.Service.WriteLoop: processing error", "block height",
genesisBlockNumber, "error", err.Error(), "worker", workerId)
@ -299,7 +307,7 @@ func (sds *Service) writeLoopWorker(params workerParams) {
}
log.Info("Writing state diff", "block height", currentBlock.Number().Uint64(), "worker", params.id)
err := sds.writeStateDiff(currentBlock, parentBlock.Root(), writeLoopParams)
err := sds.writeStateDiffWithRetry(currentBlock, parentBlock.Root(), writeLoopParams)
if err != nil {
log.Error("statediff.Service.WriteLoop: processing error", "block height", currentBlock.Number().Uint64(), "error", err.Error(), "worker", params.id)
continue
@ -638,7 +646,7 @@ func (sds *Service) WriteStateDiffAt(blockNumber uint64, params Params) error {
parentBlock := sds.BlockChain.GetBlockByHash(currentBlock.ParentHash())
parentRoot = parentBlock.Root()
}
return sds.writeStateDiff(currentBlock, parentRoot, params)
return sds.writeStateDiffWithRetry(currentBlock, parentRoot, params)
}
// WriteStateDiffFor writes a state diff for the specific blockhash directly to the database
@ -651,7 +659,7 @@ func (sds *Service) WriteStateDiffFor(blockHash common.Hash, params Params) erro
parentBlock := sds.BlockChain.GetBlockByHash(currentBlock.ParentHash())
parentRoot = parentBlock.Root()
}
return sds.writeStateDiff(currentBlock, parentRoot, params)
return sds.writeStateDiffWithRetry(currentBlock, parentRoot, params)
}
// Writes a state diff from the current block, parent state root, and provided params
@ -691,3 +699,17 @@ func (sds *Service) writeStateDiff(block *types.Block, parentRoot common.Hash, p
}
return nil
}
// writeStateDiffWithRetry wraps writeStateDiff and runs it again when the
// returned error reports a deadlock.
//
// The first attempt is always made. After it, at most sds.maxRetry further
// attempts follow, and only while the error text contains deadlockDetected.
// Success, or any error that is not a deadlock, ends the loop immediately.
//
// It returns the result of the last writeStateDiff call: nil on success,
// otherwise the last error seen (the deadlock error itself when every attempt
// deadlocked).
func (sds *Service) writeStateDiffWithRetry(block *types.Block, parentRoot common.Hash, params Params) error {
	// attempt counts the retries already used; attempt 0 is the initial try.
	// The write happens before the limit is checked, so a zero maxRetry still
	// writes once instead of skipping the write and returning nil.
	for attempt := uint(0); ; attempt++ {
		err := sds.writeStateDiff(block, parentRoot, params)
		if err == nil || !strings.Contains(err.Error(), deadlockDetected) {
			// Retry only when the deadlock is detected.
			return err
		}
		if attempt >= sds.maxRetry {
			// Retry budget exhausted: surface the deadlock error to the caller.
			return err
		}
	}
}