Retry aborted transaction when the deadlock is detected. #166

Merged
arijitAD merged 2 commits from retry-on-deadlock into v1.10.12-statediff 2021-12-14 20:43:55 +00:00

View File

@ -20,6 +20,7 @@ import (
"bytes" "bytes"
"math/big" "math/big"
"strconv" "strconv"
"strings"
"sync" "sync"
"sync/atomic" "sync/atomic"
"time" "time"
@ -46,8 +47,12 @@ import (
. "github.com/ethereum/go-ethereum/statediff/types" . "github.com/ethereum/go-ethereum/statediff/types"
) )
const chainEventChanSize = 20000 const (
const genesisBlockNumber = 0 chainEventChanSize = 20000
genesisBlockNumber = 0
defaultRetryLimit = 3 // default retry limit once deadlock is detected.
deadlockDetected = "deadlock detected" // 40P01 https://www.postgresql.org/docs/current/errcodes-appendix.html
)
var writeLoopParams = Params{ var writeLoopParams = Params{
IntermediateStateNodes: true, IntermediateStateNodes: true,
@ -131,6 +136,8 @@ type Service struct {
enableWriteLoop bool enableWriteLoop bool
// Size of the worker pool // Size of the worker pool
numWorkers uint numWorkers uint
// Number of retry for aborted transactions due to deadlock.
maxRetry uint
} }
// Wrap the cached last block for safe access from different service loops // Wrap the cached last block for safe access from different service loops
@ -189,6 +196,7 @@ func New(stack *node.Node, ethServ *eth.Ethereum, cfg *ethconfig.Config, params
indexer: indexer, indexer: indexer,
enableWriteLoop: params.EnableWriteLoop, enableWriteLoop: params.EnableWriteLoop,
numWorkers: workers, numWorkers: workers,
maxRetry: defaultRetryLimit,
} }
stack.RegisterLifecycle(sds) stack.RegisterLifecycle(sds)
stack.RegisterAPIs(sds.APIs()) stack.RegisterAPIs(sds.APIs())
@ -270,7 +278,7 @@ func (sds *Service) WriteLoop(chainEventCh chan core.ChainEvent) {
func (sds *Service) writeGenesisStateDiff(currBlock *types.Block, workerId uint) { func (sds *Service) writeGenesisStateDiff(currBlock *types.Block, workerId uint) {
// For genesis block we need to return the entire state trie hence we diff it with an empty trie. // For genesis block we need to return the entire state trie hence we diff it with an empty trie.
log.Info("Writing state diff", "block height", genesisBlockNumber, "worker", workerId) log.Info("Writing state diff", "block height", genesisBlockNumber, "worker", workerId)
err := sds.writeStateDiff(currBlock, common.Hash{}, writeLoopParams) err := sds.writeStateDiffWithRetry(currBlock, common.Hash{}, writeLoopParams)
if err != nil { if err != nil {
log.Error("statediff.Service.WriteLoop: processing error", "block height", log.Error("statediff.Service.WriteLoop: processing error", "block height",
genesisBlockNumber, "error", err.Error(), "worker", workerId) genesisBlockNumber, "error", err.Error(), "worker", workerId)
@ -299,7 +307,7 @@ func (sds *Service) writeLoopWorker(params workerParams) {
} }
log.Info("Writing state diff", "block height", currentBlock.Number().Uint64(), "worker", params.id) log.Info("Writing state diff", "block height", currentBlock.Number().Uint64(), "worker", params.id)
err := sds.writeStateDiff(currentBlock, parentBlock.Root(), writeLoopParams) err := sds.writeStateDiffWithRetry(currentBlock, parentBlock.Root(), writeLoopParams)
if err != nil { if err != nil {
log.Error("statediff.Service.WriteLoop: processing error", "block height", currentBlock.Number().Uint64(), "error", err.Error(), "worker", params.id) log.Error("statediff.Service.WriteLoop: processing error", "block height", currentBlock.Number().Uint64(), "error", err.Error(), "worker", params.id)
continue continue
@ -638,7 +646,7 @@ func (sds *Service) WriteStateDiffAt(blockNumber uint64, params Params) error {
parentBlock := sds.BlockChain.GetBlockByHash(currentBlock.ParentHash()) parentBlock := sds.BlockChain.GetBlockByHash(currentBlock.ParentHash())
parentRoot = parentBlock.Root() parentRoot = parentBlock.Root()
} }
return sds.writeStateDiff(currentBlock, parentRoot, params) return sds.writeStateDiffWithRetry(currentBlock, parentRoot, params)
} }
// WriteStateDiffFor writes a state diff for the specific blockhash directly to the database // WriteStateDiffFor writes a state diff for the specific blockhash directly to the database
@ -651,7 +659,7 @@ func (sds *Service) WriteStateDiffFor(blockHash common.Hash, params Params) erro
parentBlock := sds.BlockChain.GetBlockByHash(currentBlock.ParentHash()) parentBlock := sds.BlockChain.GetBlockByHash(currentBlock.ParentHash())
parentRoot = parentBlock.Root() parentRoot = parentBlock.Root()
} }
return sds.writeStateDiff(currentBlock, parentRoot, params) return sds.writeStateDiffWithRetry(currentBlock, parentRoot, params)
} }
// Writes a state diff from the current block, parent state root, and provided params // Writes a state diff from the current block, parent state root, and provided params
@ -691,3 +699,17 @@ func (sds *Service) writeStateDiff(block *types.Block, parentRoot common.Hash, p
} }
return nil return nil
} }
// Wrapper function on writeStateDiff to retry when the deadlock is detected.
func (sds *Service) writeStateDiffWithRetry(block *types.Block, parentRoot common.Hash, params Params) error {
var err error
for i := uint(0); i < sds.maxRetry; i++ {
err = sds.writeStateDiff(block, parentRoot, params)
if err != nil && strings.Contains(err.Error(), deadlockDetected) {
// Retry only when the deadlock is detected.
continue
}
break
}
return err
}