Retry aborted transaction when the deadlock is detected. #166
@ -20,6 +20,7 @@ import (
|
|||||||
"bytes"
|
"bytes"
|
||||||
"math/big"
|
"math/big"
|
||||||
"strconv"
|
"strconv"
|
||||||
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
"time"
|
"time"
|
||||||
@ -46,8 +47,12 @@ import (
|
|||||||
. "github.com/ethereum/go-ethereum/statediff/types"
|
. "github.com/ethereum/go-ethereum/statediff/types"
|
||||||
)
|
)
|
||||||
|
|
||||||
const chainEventChanSize = 20000
|
const (
|
||||||
const genesisBlockNumber = 0
|
chainEventChanSize = 20000
|
||||||
|
genesisBlockNumber = 0
|
||||||
|
defaultRetryLimit = 3 // default retry limit once deadlock is detected.
|
||||||
|
deadlockDetected = "deadlock detected" // 40P01 https://www.postgresql.org/docs/current/errcodes-appendix.html
|
||||||
|
)
|
||||||
|
|
||||||
var writeLoopParams = Params{
|
var writeLoopParams = Params{
|
||||||
IntermediateStateNodes: true,
|
IntermediateStateNodes: true,
|
||||||
@ -131,6 +136,8 @@ type Service struct {
|
|||||||
enableWriteLoop bool
|
enableWriteLoop bool
|
||||||
// Size of the worker pool
|
// Size of the worker pool
|
||||||
numWorkers uint
|
numWorkers uint
|
||||||
|
// Number of retry for aborted transactions due to deadlock.
|
||||||
|
maxRetry uint
|
||||||
}
|
}
|
||||||
|
|
||||||
// Wrap the cached last block for safe access from different service loops
|
// Wrap the cached last block for safe access from different service loops
|
||||||
@ -189,6 +196,7 @@ func New(stack *node.Node, ethServ *eth.Ethereum, cfg *ethconfig.Config, params
|
|||||||
indexer: indexer,
|
indexer: indexer,
|
||||||
enableWriteLoop: params.EnableWriteLoop,
|
enableWriteLoop: params.EnableWriteLoop,
|
||||||
numWorkers: workers,
|
numWorkers: workers,
|
||||||
|
maxRetry: defaultRetryLimit,
|
||||||
}
|
}
|
||||||
stack.RegisterLifecycle(sds)
|
stack.RegisterLifecycle(sds)
|
||||||
stack.RegisterAPIs(sds.APIs())
|
stack.RegisterAPIs(sds.APIs())
|
||||||
@ -270,7 +278,7 @@ func (sds *Service) WriteLoop(chainEventCh chan core.ChainEvent) {
|
|||||||
func (sds *Service) writeGenesisStateDiff(currBlock *types.Block, workerId uint) {
|
func (sds *Service) writeGenesisStateDiff(currBlock *types.Block, workerId uint) {
|
||||||
// For genesis block we need to return the entire state trie hence we diff it with an empty trie.
|
// For genesis block we need to return the entire state trie hence we diff it with an empty trie.
|
||||||
log.Info("Writing state diff", "block height", genesisBlockNumber, "worker", workerId)
|
log.Info("Writing state diff", "block height", genesisBlockNumber, "worker", workerId)
|
||||||
err := sds.writeStateDiff(currBlock, common.Hash{}, writeLoopParams)
|
err := sds.writeStateDiffWithRetry(currBlock, common.Hash{}, writeLoopParams)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error("statediff.Service.WriteLoop: processing error", "block height",
|
log.Error("statediff.Service.WriteLoop: processing error", "block height",
|
||||||
genesisBlockNumber, "error", err.Error(), "worker", workerId)
|
genesisBlockNumber, "error", err.Error(), "worker", workerId)
|
||||||
@ -299,7 +307,7 @@ func (sds *Service) writeLoopWorker(params workerParams) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
log.Info("Writing state diff", "block height", currentBlock.Number().Uint64(), "worker", params.id)
|
log.Info("Writing state diff", "block height", currentBlock.Number().Uint64(), "worker", params.id)
|
||||||
err := sds.writeStateDiff(currentBlock, parentBlock.Root(), writeLoopParams)
|
err := sds.writeStateDiffWithRetry(currentBlock, parentBlock.Root(), writeLoopParams)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error("statediff.Service.WriteLoop: processing error", "block height", currentBlock.Number().Uint64(), "error", err.Error(), "worker", params.id)
|
log.Error("statediff.Service.WriteLoop: processing error", "block height", currentBlock.Number().Uint64(), "error", err.Error(), "worker", params.id)
|
||||||
continue
|
continue
|
||||||
@ -638,7 +646,7 @@ func (sds *Service) WriteStateDiffAt(blockNumber uint64, params Params) error {
|
|||||||
parentBlock := sds.BlockChain.GetBlockByHash(currentBlock.ParentHash())
|
parentBlock := sds.BlockChain.GetBlockByHash(currentBlock.ParentHash())
|
||||||
parentRoot = parentBlock.Root()
|
parentRoot = parentBlock.Root()
|
||||||
}
|
}
|
||||||
return sds.writeStateDiff(currentBlock, parentRoot, params)
|
return sds.writeStateDiffWithRetry(currentBlock, parentRoot, params)
|
||||||
}
|
}
|
||||||
|
|
||||||
// WriteStateDiffFor writes a state diff for the specific blockhash directly to the database
|
// WriteStateDiffFor writes a state diff for the specific blockhash directly to the database
|
||||||
@ -651,7 +659,7 @@ func (sds *Service) WriteStateDiffFor(blockHash common.Hash, params Params) erro
|
|||||||
parentBlock := sds.BlockChain.GetBlockByHash(currentBlock.ParentHash())
|
parentBlock := sds.BlockChain.GetBlockByHash(currentBlock.ParentHash())
|
||||||
parentRoot = parentBlock.Root()
|
parentRoot = parentBlock.Root()
|
||||||
}
|
}
|
||||||
return sds.writeStateDiff(currentBlock, parentRoot, params)
|
return sds.writeStateDiffWithRetry(currentBlock, parentRoot, params)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Writes a state diff from the current block, parent state root, and provided params
|
// Writes a state diff from the current block, parent state root, and provided params
|
||||||
@ -691,3 +699,17 @@ func (sds *Service) writeStateDiff(block *types.Block, parentRoot common.Hash, p
|
|||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Wrapper function on writeStateDiff to retry when the deadlock is detected.
|
||||||
|
func (sds *Service) writeStateDiffWithRetry(block *types.Block, parentRoot common.Hash, params Params) error {
|
||||||
|
var err error
|
||||||
|
for i := uint(0); i < sds.maxRetry; i++ {
|
||||||
|
err = sds.writeStateDiff(block, parentRoot, params)
|
||||||
|
if err != nil && strings.Contains(err.Error(), deadlockDetected) {
|
||||||
|
// Retry only when the deadlock is detected.
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
break
|
||||||
|
}
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user