Retry aborted transaction when the deadlock is detected. #166
@ -20,6 +20,7 @@ import (
|
||||
"bytes"
|
||||
"math/big"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
@ -46,8 +47,12 @@ import (
|
||||
. "github.com/ethereum/go-ethereum/statediff/types"
|
||||
)
|
||||
|
||||
const chainEventChanSize = 20000
|
||||
const genesisBlockNumber = 0
|
||||
const (
|
||||
chainEventChanSize = 20000
|
||||
genesisBlockNumber = 0
|
||||
defaultRetryLimit = 3 // default retry limit once deadlock is detected.
|
||||
deadlockDetected = "deadlock detected" // 40P01 https://www.postgresql.org/docs/current/errcodes-appendix.html
|
||||
)
|
||||
|
||||
var writeLoopParams = Params{
|
||||
IntermediateStateNodes: true,
|
||||
@ -131,6 +136,8 @@ type Service struct {
|
||||
enableWriteLoop bool
|
||||
// Size of the worker pool
|
||||
numWorkers uint
|
||||
// Number of retry for aborted transactions due to deadlock.
|
||||
maxRetry uint
|
||||
}
|
||||
|
||||
// Wrap the cached last block for safe access from different service loops
|
||||
@ -189,6 +196,7 @@ func New(stack *node.Node, ethServ *eth.Ethereum, cfg *ethconfig.Config, params
|
||||
indexer: indexer,
|
||||
enableWriteLoop: params.EnableWriteLoop,
|
||||
numWorkers: workers,
|
||||
maxRetry: defaultRetryLimit,
|
||||
}
|
||||
stack.RegisterLifecycle(sds)
|
||||
stack.RegisterAPIs(sds.APIs())
|
||||
@ -270,7 +278,7 @@ func (sds *Service) WriteLoop(chainEventCh chan core.ChainEvent) {
|
||||
func (sds *Service) writeGenesisStateDiff(currBlock *types.Block, workerId uint) {
|
||||
// For genesis block we need to return the entire state trie hence we diff it with an empty trie.
|
||||
log.Info("Writing state diff", "block height", genesisBlockNumber, "worker", workerId)
|
||||
err := sds.writeStateDiff(currBlock, common.Hash{}, writeLoopParams)
|
||||
err := sds.writeStateDiffWithRetry(currBlock, common.Hash{}, writeLoopParams)
|
||||
if err != nil {
|
||||
log.Error("statediff.Service.WriteLoop: processing error", "block height",
|
||||
genesisBlockNumber, "error", err.Error(), "worker", workerId)
|
||||
@ -299,7 +307,7 @@ func (sds *Service) writeLoopWorker(params workerParams) {
|
||||
}
|
||||
|
||||
log.Info("Writing state diff", "block height", currentBlock.Number().Uint64(), "worker", params.id)
|
||||
err := sds.writeStateDiff(currentBlock, parentBlock.Root(), writeLoopParams)
|
||||
err := sds.writeStateDiffWithRetry(currentBlock, parentBlock.Root(), writeLoopParams)
|
||||
if err != nil {
|
||||
log.Error("statediff.Service.WriteLoop: processing error", "block height", currentBlock.Number().Uint64(), "error", err.Error(), "worker", params.id)
|
||||
continue
|
||||
@ -638,7 +646,7 @@ func (sds *Service) WriteStateDiffAt(blockNumber uint64, params Params) error {
|
||||
parentBlock := sds.BlockChain.GetBlockByHash(currentBlock.ParentHash())
|
||||
parentRoot = parentBlock.Root()
|
||||
}
|
||||
return sds.writeStateDiff(currentBlock, parentRoot, params)
|
||||
return sds.writeStateDiffWithRetry(currentBlock, parentRoot, params)
|
||||
}
|
||||
|
||||
// WriteStateDiffFor writes a state diff for the specific blockhash directly to the database
|
||||
@ -651,7 +659,7 @@ func (sds *Service) WriteStateDiffFor(blockHash common.Hash, params Params) erro
|
||||
parentBlock := sds.BlockChain.GetBlockByHash(currentBlock.ParentHash())
|
||||
parentRoot = parentBlock.Root()
|
||||
}
|
||||
return sds.writeStateDiff(currentBlock, parentRoot, params)
|
||||
return sds.writeStateDiffWithRetry(currentBlock, parentRoot, params)
|
||||
}
|
||||
|
||||
// Writes a state diff from the current block, parent state root, and provided params
|
||||
@ -691,3 +699,17 @@ func (sds *Service) writeStateDiff(block *types.Block, parentRoot common.Hash, p
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Wrapper function on writeStateDiff to retry when the deadlock is detected.
|
||||
func (sds *Service) writeStateDiffWithRetry(block *types.Block, parentRoot common.Hash, params Params) error {
|
||||
var err error
|
||||
for i := uint(0); i < sds.maxRetry; i++ {
|
||||
err = sds.writeStateDiff(block, parentRoot, params)
|
||||
if err != nil && strings.Contains(err.Error(), deadlockDetected) {
|
||||
// Retry only when the deadlock is detected.
|
||||
continue
|
||||
}
|
||||
break
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user