eth, miner: add timeout for building sealing block (#25407)

* eth, miner: add timeout for building sealing block

* eth, cmd, miner: add newpayloadtimeout flag

* eth, miner, cmd: address comments

* eth, miner: minor fixes

Co-authored-by: Martin Holst Swende <martin@swende.se>
This commit is contained in:
rjl493456442 2022-10-03 20:10:00 +08:00 committed by GitHub
parent f61b50b1e8
commit 1913b50111
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 112 additions and 55 deletions

View File

@ -127,6 +127,7 @@ var (
utils.MinerExtraDataFlag, utils.MinerExtraDataFlag,
utils.MinerRecommitIntervalFlag, utils.MinerRecommitIntervalFlag,
utils.MinerNoVerifyFlag, utils.MinerNoVerifyFlag,
utils.MinerNewPayloadTimeout,
utils.NATFlag, utils.NATFlag,
utils.NoDiscoverFlag, utils.NoDiscoverFlag,
utils.DiscoveryV5Flag, utils.DiscoveryV5Flag,

View File

@ -563,6 +563,12 @@ var (
Usage: "Disable remote sealing verification", Usage: "Disable remote sealing verification",
Category: flags.MinerCategory, Category: flags.MinerCategory,
} }
MinerNewPayloadTimeout = &cli.DurationFlag{
Name: "miner.newpayload-timeout",
Usage: "Specify the maximum time allowance for creating a new payload",
Value: ethconfig.Defaults.Miner.NewPayloadTimeout,
Category: flags.MinerCategory,
}
// Account settings // Account settings
UnlockedAccountFlag = &cli.StringFlag{ UnlockedAccountFlag = &cli.StringFlag{
@ -1658,6 +1664,9 @@ func setMiner(ctx *cli.Context, cfg *miner.Config) {
if ctx.IsSet(MinerNoVerifyFlag.Name) { if ctx.IsSet(MinerNoVerifyFlag.Name) {
cfg.Noverify = ctx.Bool(MinerNoVerifyFlag.Name) cfg.Noverify = ctx.Bool(MinerNoVerifyFlag.Name)
} }
if ctx.IsSet(MinerNewPayloadTimeout.Name) {
cfg.NewPayloadTimeout = ctx.Duration(MinerNewPayloadTimeout.Name)
}
} }
func setRequiredBlocks(ctx *cli.Context, cfg *ethconfig.Config) { func setRequiredBlocks(ctx *cli.Context, cfg *ethconfig.Config) {

View File

@ -476,10 +476,9 @@ func TestExchangeTransitionConfig(t *testing.T) {
genesis, preMergeBlocks := generatePreMergeChain(10) genesis, preMergeBlocks := generatePreMergeChain(10)
n, ethservice := startEthService(t, genesis, preMergeBlocks) n, ethservice := startEthService(t, genesis, preMergeBlocks)
defer n.Close() defer n.Close()
var (
api = NewConsensusAPI(ethservice)
)
// invalid ttd // invalid ttd
api := NewConsensusAPI(ethservice)
config := beacon.TransitionConfigurationV1{ config := beacon.TransitionConfigurationV1{
TerminalTotalDifficulty: (*hexutil.Big)(big.NewInt(0)), TerminalTotalDifficulty: (*hexutil.Big)(big.NewInt(0)),
TerminalBlockHash: common.Hash{}, TerminalBlockHash: common.Hash{},
@ -812,10 +811,8 @@ func TestInvalidBloom(t *testing.T) {
func TestNewPayloadOnInvalidTerminalBlock(t *testing.T) { func TestNewPayloadOnInvalidTerminalBlock(t *testing.T) {
genesis, preMergeBlocks := generatePreMergeChain(100) genesis, preMergeBlocks := generatePreMergeChain(100)
fmt.Println(genesis.Config.TerminalTotalDifficulty)
genesis.Config.TerminalTotalDifficulty = preMergeBlocks[0].Difficulty() //.Sub(genesis.Config.TerminalTotalDifficulty, preMergeBlocks[len(preMergeBlocks)-1].Difficulty()) genesis.Config.TerminalTotalDifficulty = preMergeBlocks[0].Difficulty() //.Sub(genesis.Config.TerminalTotalDifficulty, preMergeBlocks[len(preMergeBlocks)-1].Difficulty())
fmt.Println(genesis.Config.TerminalTotalDifficulty)
n, ethservice := startEthService(t, genesis, preMergeBlocks) n, ethservice := startEthService(t, genesis, preMergeBlocks)
defer n.Close() defer n.Close()

View File

@ -84,16 +84,12 @@ var Defaults = Config{
TrieTimeout: 60 * time.Minute, TrieTimeout: 60 * time.Minute,
SnapshotCache: 102, SnapshotCache: 102,
FilterLogCacheSize: 32, FilterLogCacheSize: 32,
Miner: miner.Config{ Miner: miner.DefaultConfig,
GasCeil: 30000000, TxPool: core.DefaultTxPoolConfig,
GasPrice: big.NewInt(params.GWei), RPCGasCap: 50000000,
Recommit: 3 * time.Second, RPCEVMTimeout: 5 * time.Second,
}, GPO: FullNodeGPO,
TxPool: core.DefaultTxPoolConfig, RPCTxFeeCap: 1, // 1 ether
RPCGasCap: 50000000,
RPCEVMTimeout: 5 * time.Second,
GPO: FullNodeGPO,
RPCTxFeeCap: 1, // 1 ether
} }
func init() { func init() {

View File

@ -53,6 +53,16 @@ type Config struct {
GasPrice *big.Int // Minimum gas price for mining a transaction GasPrice *big.Int // Minimum gas price for mining a transaction
Recommit time.Duration // The time interval for miner to re-create mining work. Recommit time.Duration // The time interval for miner to re-create mining work.
Noverify bool // Disable remote mining solution verification(only useful in ethash). Noverify bool // Disable remote mining solution verification(only useful in ethash).
NewPayloadTimeout time.Duration // The maximum time allowance for creating a new payload
}
// DefaultConfig contains default settings for miner.
var DefaultConfig = Config{
GasCeil: 30000000,
GasPrice: big.NewInt(params.GWei),
Recommit: 3 * time.Second,
NewPayloadTimeout: 2 * time.Second,
} }
// Miner creates blocks and searches for proof-of-work values. // Miner creates blocks and searches for proof-of-work values.

View File

@ -80,6 +80,7 @@ const (
var ( var (
errBlockInterruptedByNewHead = errors.New("new head arrived while building block") errBlockInterruptedByNewHead = errors.New("new head arrived while building block")
errBlockInterruptedByRecommit = errors.New("recommit interrupt while building block") errBlockInterruptedByRecommit = errors.New("recommit interrupt while building block")
errBlockInterruptedByTimeout = errors.New("timeout while building block")
) )
// environment is the worker's current environment and holds all // environment is the worker's current environment and holds all
@ -158,6 +159,7 @@ const (
commitInterruptNone int32 = iota commitInterruptNone int32 = iota
commitInterruptNewHead commitInterruptNewHead
commitInterruptResubmit commitInterruptResubmit
commitInterruptTimeout
) )
// newWorkReq represents a request for new sealing work submitting with relative interrupt notifier. // newWorkReq represents a request for new sealing work submitting with relative interrupt notifier.
@ -241,6 +243,13 @@ type worker struct {
// non-stop and no real transaction will be included. // non-stop and no real transaction will be included.
noempty uint32 noempty uint32
// newpayloadTimeout is the maximum timeout allowance for creating payload.
// The default value is 2 seconds but node operator can set it to arbitrary
// large value. A large timeout allowance may cause Geth to fail creating
// a non-empty payload within the specified time and eventually miss the slot
// in case there are some computation expensive transactions in txpool.
newpayloadTimeout time.Duration
// External functions // External functions
isLocalBlock func(header *types.Header) bool // Function used to determine whether the specified block is mined by local miner. isLocalBlock func(header *types.Header) bool // Function used to determine whether the specified block is mined by local miner.
@ -288,6 +297,16 @@ func newWorker(config *Config, chainConfig *params.ChainConfig, engine consensus
log.Warn("Sanitizing miner recommit interval", "provided", recommit, "updated", minRecommitInterval) log.Warn("Sanitizing miner recommit interval", "provided", recommit, "updated", minRecommitInterval)
recommit = minRecommitInterval recommit = minRecommitInterval
} }
// Sanitize the timeout config for creating payload.
newpayloadTimeout := worker.config.NewPayloadTimeout
if newpayloadTimeout == 0 {
log.Warn("Sanitizing new payload timeout to default", "provided", newpayloadTimeout, "updated", DefaultConfig.NewPayloadTimeout)
newpayloadTimeout = DefaultConfig.NewPayloadTimeout
}
if newpayloadTimeout < time.Millisecond*100 {
log.Warn("Low payload timeout may cause high amount of non-full blocks", "provided", newpayloadTimeout, "default", DefaultConfig.NewPayloadTimeout)
}
worker.newpayloadTimeout = newpayloadTimeout
worker.wg.Add(4) worker.wg.Add(4)
go worker.mainLoop() go worker.mainLoop()
@ -844,42 +863,26 @@ func (w *worker) commitTransactions(env *environment, txs *types.TransactionsByP
var coalescedLogs []*types.Log var coalescedLogs []*types.Log
for { for {
// In the following three cases, we will interrupt the execution of the transaction. // Check interruption signal and abort building if it's fired.
// (1) new head block event arrival, the interrupt signal is 1 if interrupt != nil {
// (2) worker start or restart, the interrupt signal is 1 if signal := atomic.LoadInt32(interrupt); signal != commitInterruptNone {
// (3) worker recreate the sealing block with any newly arrived transactions, the interrupt signal is 2. return signalToErr(signal)
// For the first two cases, the semi-finished work will be discarded.
// For the third case, the semi-finished work will be submitted to the consensus engine.
if interrupt != nil && atomic.LoadInt32(interrupt) != commitInterruptNone {
// Notify resubmit loop to increase resubmitting interval due to too frequent commits.
if atomic.LoadInt32(interrupt) == commitInterruptResubmit {
ratio := float64(gasLimit-env.gasPool.Gas()) / float64(gasLimit)
if ratio < 0.1 {
ratio = 0.1
}
w.resubmitAdjustCh <- &intervalAdjust{
ratio: ratio,
inc: true,
}
return errBlockInterruptedByRecommit
} }
return errBlockInterruptedByNewHead
} }
// If we don't have enough gas for any further transactions then we're done // If we don't have enough gas for any further transactions then we're done.
if env.gasPool.Gas() < params.TxGas { if env.gasPool.Gas() < params.TxGas {
log.Trace("Not enough gas for further transactions", "have", env.gasPool, "want", params.TxGas) log.Trace("Not enough gas for further transactions", "have", env.gasPool, "want", params.TxGas)
break break
} }
// Retrieve the next transaction and abort if all done // Retrieve the next transaction and abort if all done.
tx := txs.Peek() tx := txs.Peek()
if tx == nil { if tx == nil {
break break
} }
// Error may be ignored here. The error has already been checked // Error may be ignored here. The error has already been checked
// during transaction acceptance is the transaction pool. // during transaction acceptance is the transaction pool.
//
// We use the eip155 signer regardless of the current hf.
from, _ := types.Sender(env.signer, tx) from, _ := types.Sender(env.signer, tx)
// Check whether the tx is replay protected. If we're not in the EIP155 hf // Check whether the tx is replay protected. If we're not in the EIP155 hf
// phase, start ignoring the sender until we do. // phase, start ignoring the sender until we do.
if tx.Protected() && !w.chainConfig.IsEIP155(env.header.Number) { if tx.Protected() && !w.chainConfig.IsEIP155(env.header.Number) {
@ -926,7 +929,6 @@ func (w *worker) commitTransactions(env *environment, txs *types.TransactionsByP
txs.Shift() txs.Shift()
} }
} }
if !w.isRunning() && len(coalescedLogs) > 0 { if !w.isRunning() && len(coalescedLogs) > 0 {
// We don't push the pendingLogsEvent while we are sealing. The reason is that // We don't push the pendingLogsEvent while we are sealing. The reason is that
// when we are sealing, the worker will regenerate a sealing block every 3 seconds. // when we are sealing, the worker will regenerate a sealing block every 3 seconds.
@ -942,11 +944,6 @@ func (w *worker) commitTransactions(env *environment, txs *types.TransactionsByP
} }
w.pendingLogsFeed.Send(cpy) w.pendingLogsFeed.Send(cpy)
} }
// Notify resubmit loop to decrease resubmitting interval if current interval is larger
// than the user-specified one.
if interrupt != nil {
w.resubmitAdjustCh <- &intervalAdjust{inc: false}
}
return nil return nil
} }
@ -986,15 +983,15 @@ func (w *worker) prepareWork(genParams *generateParams) (*environment, error) {
} }
timestamp = parent.Time() + 1 timestamp = parent.Time() + 1
} }
// Construct the sealing block header, set the extra field if it's allowed // Construct the sealing block header.
num := parent.Number()
header := &types.Header{ header := &types.Header{
ParentHash: parent.Hash(), ParentHash: parent.Hash(),
Number: num.Add(num, common.Big1), Number: new(big.Int).Add(parent.Number(), common.Big1),
GasLimit: core.CalcGasLimit(parent.GasLimit(), w.config.GasCeil), GasLimit: core.CalcGasLimit(parent.GasLimit(), w.config.GasCeil),
Time: timestamp, Time: timestamp,
Coinbase: genParams.coinbase, Coinbase: genParams.coinbase,
} }
// Set the extra field if it's allowed.
if !genParams.noExtra && len(w.extra) != 0 { if !genParams.noExtra && len(w.extra) != 0 {
header.Extra = w.extra header.Extra = w.extra
} }
@ -1082,7 +1079,16 @@ func (w *worker) generateWork(params *generateParams) (*types.Block, error) {
defer work.discard() defer work.discard()
if !params.noTxs { if !params.noTxs {
w.fillTransactions(nil, work) interrupt := new(int32)
timer := time.AfterFunc(w.newpayloadTimeout, func() {
atomic.StoreInt32(interrupt, commitInterruptTimeout)
})
defer timer.Stop()
err := w.fillTransactions(interrupt, work)
if errors.Is(err, errBlockInterruptedByTimeout) {
log.Warn("Block building is interrupted", "allowance", common.PrettyDuration(w.newpayloadTimeout))
}
} }
return w.engine.FinalizeAndAssemble(w.chain, work.header, work.state, work.txs, work.unclelist(), work.receipts) return w.engine.FinalizeAndAssemble(w.chain, work.header, work.state, work.txs, work.unclelist(), work.receipts)
} }
@ -1113,13 +1119,36 @@ func (w *worker) commitWork(interrupt *int32, noempty bool, timestamp int64) {
if !noempty && atomic.LoadUint32(&w.noempty) == 0 { if !noempty && atomic.LoadUint32(&w.noempty) == 0 {
w.commit(work.copy(), nil, false, start) w.commit(work.copy(), nil, false, start)
} }
// Fill pending transactions from the txpool into the block.
// Fill pending transactions from the txpool
err = w.fillTransactions(interrupt, work) err = w.fillTransactions(interrupt, work)
if errors.Is(err, errBlockInterruptedByNewHead) { switch {
case err == nil:
// The entire block is filled, decrease resubmit interval in case
// of current interval is larger than the user-specified one.
w.resubmitAdjustCh <- &intervalAdjust{inc: false}
case errors.Is(err, errBlockInterruptedByRecommit):
// Notify resubmit loop to increase resubmitting interval if the
// interruption is due to frequent commits.
gaslimit := work.header.GasLimit
ratio := float64(gaslimit-work.gasPool.Gas()) / float64(gaslimit)
if ratio < 0.1 {
ratio = 0.1
}
w.resubmitAdjustCh <- &intervalAdjust{
ratio: ratio,
inc: true,
}
case errors.Is(err, errBlockInterruptedByNewHead):
// If the block building is interrupted by newhead event, discard it
// totally. Committing the interrupted block introduces unnecessary
// delay, and possibly causes miner to mine on the previous head,
// which could result in higher uncle rate.
work.discard() work.discard()
return return
} }
// Submit the generated block for consensus sealing.
w.commit(work.copy(), w.fullTaskHook, true, start) w.commit(work.copy(), w.fullTaskHook, true, start)
// Swap out the old work with the new one, terminating any leftover // Swap out the old work with the new one, terminating any leftover
@ -1231,3 +1260,18 @@ func totalFees(block *types.Block, receipts []*types.Receipt) *big.Float {
} }
return new(big.Float).Quo(new(big.Float).SetInt(feesWei), new(big.Float).SetInt(big.NewInt(params.Ether))) return new(big.Float).Quo(new(big.Float).SetInt(feesWei), new(big.Float).SetInt(big.NewInt(params.Ether)))
} }
// signalToErr converts the interruption signal to a concrete error type for return.
// The given signal must be a valid interruption signal.
func signalToErr(signal int32) error {
switch signal {
case commitInterruptNewHead:
return errBlockInterruptedByNewHead
case commitInterruptResubmit:
return errBlockInterruptedByRecommit
case commitInterruptTimeout:
return errBlockInterruptedByTimeout
default:
panic(fmt.Errorf("undefined signal %d", signal))
}
}

View File

@ -523,21 +523,21 @@ func testAdjustInterval(t *testing.T, chainConfig *params.ChainConfig, engine co
} }
func TestGetSealingWorkEthash(t *testing.T) { func TestGetSealingWorkEthash(t *testing.T) {
testGetSealingWork(t, ethashChainConfig, ethash.NewFaker(), false) testGetSealingWork(t, ethashChainConfig, ethash.NewFaker())
} }
func TestGetSealingWorkClique(t *testing.T) { func TestGetSealingWorkClique(t *testing.T) {
testGetSealingWork(t, cliqueChainConfig, clique.New(cliqueChainConfig.Clique, rawdb.NewMemoryDatabase()), false) testGetSealingWork(t, cliqueChainConfig, clique.New(cliqueChainConfig.Clique, rawdb.NewMemoryDatabase()))
} }
func TestGetSealingWorkPostMerge(t *testing.T) { func TestGetSealingWorkPostMerge(t *testing.T) {
local := new(params.ChainConfig) local := new(params.ChainConfig)
*local = *ethashChainConfig *local = *ethashChainConfig
local.TerminalTotalDifficulty = big.NewInt(0) local.TerminalTotalDifficulty = big.NewInt(0)
testGetSealingWork(t, local, ethash.NewFaker(), true) testGetSealingWork(t, local, ethash.NewFaker())
} }
func testGetSealingWork(t *testing.T, chainConfig *params.ChainConfig, engine consensus.Engine, postMerge bool) { func testGetSealingWork(t *testing.T, chainConfig *params.ChainConfig, engine consensus.Engine) {
defer engine.Close() defer engine.Close()
w, b := newTestWorker(t, chainConfig, engine, rawdb.NewMemoryDatabase(), 0) w, b := newTestWorker(t, chainConfig, engine, rawdb.NewMemoryDatabase(), 0)